fix(api): 修復 34 個 Ruff lint 錯誤

- 自動修復 import 排序、unused imports
- 手動修復 raise from、isinstance union、unused variable
- scripts/ 暫時保留 (非 CI 阻擋)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-29 15:27:49 +08:00
parent 5f9a6a7e55
commit d89f0520f9
27 changed files with 2538 additions and 1132 deletions

View File

@@ -0,0 +1,390 @@
#!/usr/bin/env python3
"""
AWOOOI Alert Chain Smoke Test (Wave A.6)
=========================================
E2E 驗證告警鏈路完整性
功能:
1. 發送測試告警到 Sentry/SignOz Webhook
2. 驗證 Telegram 通知發送
3. 檢查 Prometheus Metrics
4. 驗證 Incident 建立
用法:
cd apps/api
python -m scripts.alert_chain_smoke_test
# 指定來源
python -m scripts.alert_chain_smoke_test --source sentry
python -m scripts.alert_chain_smoke_test --source signoz
python -m scripts.alert_chain_smoke_test --source all
環境變數:
AWOOOI_API_URL: API 端點 (預設 http://localhost:8000)
AWOOOI_METRICS_URL: Metrics 端點 (預設 http://localhost:8000/metrics)
Author: Claude Code
Date: 2026-03-29
ADR: ADR-037 (監控增強架構)
"""
import argparse
import asyncio
import os
import sys
import time
from datetime import datetime
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
import httpx
# =============================================================================
# Config (從環境變數讀取,符合首席架構師審查 P0 要求)
# =============================================================================
API_URL = os.environ.get("AWOOOI_API_URL", "http://localhost:8000")
METRICS_URL = os.environ.get("AWOOOI_METRICS_URL", f"{API_URL}/metrics")
# Webhook 端點
SENTRY_WEBHOOK = f"{API_URL}/api/v1/webhooks/sentry/issue"
SIGNOZ_WEBHOOK = f"{API_URL}/api/v1/webhooks/signoz/alert"
# =============================================================================
# Test Payloads
# =============================================================================
SENTRY_TEST_PAYLOAD = {
"action": "triggered",
"data": {
"issue": {
"id": f"smoke-test-{int(time.time())}",
"title": "[Smoke Test] Alert Chain Verification",
"culprit": "alert_chain_smoke_test.py",
"level": "error",
"firstSeen": datetime.now().isoformat(),
"count": 1,
"project": {"slug": "awoooi-api"},
},
"event": {
"message": "Smoke test alert - verifying alert chain E2E",
"platform": "python",
"tags": [{"key": "smoke_test", "value": "true"}],
},
},
}
SIGNOZ_TEST_PAYLOAD = {
"alertname": "SmokeTestAlert",
"status": "firing",
"labels": {
"severity": "warning",
"service_name": "smoke-test",
"namespace": "awoooi-test",
"source": "signoz",
},
"annotations": {
"summary": "[Smoke Test] SignOz Alert Chain Verification",
"description": "This is a smoke test alert to verify the alert chain E2E",
},
"startsAt": datetime.now().isoformat(),
}
# =============================================================================
# Terminal Output Helpers
# =============================================================================
class Colors:
"""ANSI Color Codes"""
HEADER = "\033[95m"
BLUE = "\033[94m"
CYAN = "\033[96m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
def print_banner():
"""Print banner"""
print(f"""
{Colors.CYAN}{Colors.BOLD}
╔═══════════════════════════════════════════════════════╗
║ AWOOOI Alert Chain Smoke Test (ADR-037 Wave A.6) ║
╚═══════════════════════════════════════════════════════╝
{Colors.ENDC}
{Colors.DIM} API URL: {API_URL}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} (Taipei){Colors.ENDC}
""")
def print_section(title: str):
"""Print section header"""
print(f"\n{Colors.BLUE}{Colors.BOLD}{title}{Colors.ENDC}")
print(f"{Colors.DIM}{'' * 55}{Colors.ENDC}")
def print_result(success: bool, message: str, details: str = ""):
"""Print test result"""
if success:
print(f" {Colors.GREEN}{message}{Colors.ENDC}")
else:
print(f" {Colors.RED}{message}{Colors.ENDC}")
if details:
print(f" {Colors.DIM}{details}{Colors.ENDC}")
# =============================================================================
# Test Functions
# =============================================================================
async def test_sentry_webhook() -> tuple[bool, dict]:
"""
測試 Sentry Webhook
Returns:
(success, response_data)
"""
print_section("1. Sentry Webhook Test")
print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {SENTRY_WEBHOOK}")
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
SENTRY_WEBHOOK,
json=SENTRY_TEST_PAYLOAD,
headers={"Content-Type": "application/json"},
)
result = response.json()
success = response.status_code in (200, 202) and result.get("status") in ("accepted", "ok")
print_result(
success,
f"HTTP {response.status_code}",
f"Response: {result}",
)
return success, result
except httpx.ConnectError:
print_result(False, "Connection failed", f"Cannot reach {SENTRY_WEBHOOK}")
return False, {"error": "connection_failed"}
except Exception as e:
print_result(False, "Request failed", str(e))
return False, {"error": str(e)}
async def test_signoz_webhook() -> tuple[bool, dict]:
"""
測試 SignOz Webhook
Returns:
(success, response_data)
"""
print_section("2. SignOz Webhook Test")
print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {SIGNOZ_WEBHOOK}")
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
SIGNOZ_WEBHOOK,
json=SIGNOZ_TEST_PAYLOAD,
headers={"Content-Type": "application/json"},
)
result = response.json()
success = response.status_code in (200, 202) and result.get("status") in ("accepted", "ok")
print_result(
success,
f"HTTP {response.status_code}",
f"Response: {result}",
)
return success, result
except httpx.ConnectError:
print_result(False, "Connection failed", f"Cannot reach {SIGNOZ_WEBHOOK}")
return False, {"error": "connection_failed"}
except Exception as e:
print_result(False, "Request failed", str(e))
return False, {"error": str(e)}
async def test_metrics() -> tuple[bool, dict]:
"""
檢查 Prometheus Metrics
驗證 Alert Chain 指標存在:
- awoooi_alert_chain_healthy
- awoooi_alert_chain_last_success_timestamp
"""
print_section("3. Prometheus Metrics Check")
print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {METRICS_URL}")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(METRICS_URL)
if response.status_code != 200:
print_result(False, f"HTTP {response.status_code}")
return False, {"error": f"status_code: {response.status_code}"}
metrics_text = response.text
# 檢查必要指標
required_metrics = [
"awoooi_alert_chain_healthy",
"awoooi_alert_chain_last_success_timestamp",
"awoooi_webhook_requests_total",
"awoooi_alert_processed_total",
]
found_metrics = {}
for metric in required_metrics:
found = metric in metrics_text
found_metrics[metric] = found
print_result(found, f"Metric: {metric}")
all_found = all(found_metrics.values())
return all_found, found_metrics
except httpx.ConnectError:
print_result(False, "Connection failed", f"Cannot reach {METRICS_URL}")
return False, {"error": "connection_failed"}
except Exception as e:
print_result(False, "Request failed", str(e))
return False, {"error": str(e)}
async def test_health() -> tuple[bool, dict]:
"""
檢查 API Health
"""
print_section("4. API Health Check")
health_url = f"{API_URL}/api/v1/health"
print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {health_url}")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(health_url)
success = response.status_code == 200
result = response.json() if success else {}
print_result(
success,
f"HTTP {response.status_code}",
f"Status: {result.get('status', 'unknown')}",
)
return success, result
except httpx.ConnectError:
print_result(False, "Connection failed", f"Cannot reach {health_url}")
return False, {"error": "connection_failed"}
except Exception as e:
print_result(False, "Request failed", str(e))
return False, {"error": str(e)}
# =============================================================================
# Main
# =============================================================================
async def run_smoke_tests(source: str = "all") -> bool:
"""
執行 Smoke Tests
Args:
source: 測試來源 (sentry, signoz, all)
Returns:
bool: 全部通過
"""
print_banner()
results = []
# Health Check
health_ok, _ = await test_health()
results.append(("Health", health_ok))
if not health_ok:
print(f"\n{Colors.RED}{Colors.BOLD}✗ API 健康檢查失敗,終止測試{Colors.ENDC}")
return False
# Webhook Tests
if source in ("sentry", "all"):
sentry_ok, _ = await test_sentry_webhook()
results.append(("Sentry Webhook", sentry_ok))
if source in ("signoz", "all"):
signoz_ok, _ = await test_signoz_webhook()
results.append(("SignOz Webhook", signoz_ok))
# 等待背景任務處理
if source != "none":
print(f"\n{Colors.DIM} 等待 2 秒讓背景任務處理...{Colors.ENDC}")
await asyncio.sleep(2)
# Metrics Check
metrics_ok, _ = await test_metrics()
results.append(("Metrics", metrics_ok))
# Summary
print_section("Summary")
all_passed = True
for name, passed in results:
print_result(passed, name)
if not passed:
all_passed = False
print(f"\n{'' * 55}")
if all_passed:
print(f"{Colors.GREEN}{Colors.BOLD}✓ All smoke tests passed!{Colors.ENDC}")
else:
print(f"{Colors.RED}{Colors.BOLD}✗ Some smoke tests failed!{Colors.ENDC}")
return all_passed
def main():
"""CLI Entry Point"""
parser = argparse.ArgumentParser(
description="AWOOOI Alert Chain Smoke Test (ADR-037 Wave A.6)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
環境變數:
AWOOOI_API_URL API 端點 (預設 http://localhost:8000)
AWOOOI_METRICS_URL Metrics 端點 (預設 $AWOOOI_API_URL/metrics)
範例:
python -m scripts.alert_chain_smoke_test
python -m scripts.alert_chain_smoke_test --source sentry
AWOOOI_API_URL=http://awoooi-api:8000 python -m scripts.alert_chain_smoke_test
""",
)
parser.add_argument(
"--source", "-s",
type=str,
default="all",
choices=["sentry", "signoz", "all", "none"],
help="測試來源 (預設: all, none = 只檢查 health/metrics)",
)
args = parser.parse_args()
success = asyncio.run(run_smoke_tests(args.source))
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,97 @@
-- ============================================================================
-- Phase 18: AuditLog 字段遷移
-- ============================================================================
-- 日期: 2026-03-28
-- 作者: Claude Code (首席架構師)
-- 原因: Phase 18 失敗自動修復閉環需要新字段,但未執行遷移導致 API 500
-- ============================================================================
-- 檢查並添加缺少的字段
-- 這些字段在 apps/api/src/db/models.py AuditLog 中定義但未遷移
-- 1. authorization_channel: 授權來源 (web, telegram, auto)
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'authorization_channel'
) THEN
ALTER TABLE audit_logs ADD COLUMN authorization_channel VARCHAR(20);
COMMENT ON COLUMN audit_logs.authorization_channel IS 'Authorization source: web, telegram, auto';
END IF;
END $$;
-- 2. retry_count: 重試次數
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'retry_count'
) THEN
ALTER TABLE audit_logs ADD COLUMN retry_count INTEGER DEFAULT 0 NOT NULL;
COMMENT ON COLUMN audit_logs.retry_count IS 'Number of retry attempts';
END IF;
END $$;
-- 3. failure_classification: 失敗分類
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'failure_classification'
) THEN
ALTER TABLE audit_logs ADD COLUMN failure_classification VARCHAR(50);
COMMENT ON COLUMN audit_logs.failure_classification IS 'Failure type: TIMEOUT, K8S_ERROR, NETWORK_ERROR, PERMISSION_DENIED';
END IF;
END $$;
-- 4. source_approval_id: 原始 Approval ID (修復追蹤)
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'source_approval_id'
) THEN
ALTER TABLE audit_logs ADD COLUMN source_approval_id VARCHAR(36);
COMMENT ON COLUMN audit_logs.source_approval_id IS 'Original approval ID if this is a repair attempt';
CREATE INDEX IF NOT EXISTS ix_audit_source_approval_id ON audit_logs(source_approval_id);
END IF;
END $$;
-- 5. auto_repair_attempted: 是否嘗試自動修復
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_attempted'
) THEN
ALTER TABLE audit_logs ADD COLUMN auto_repair_attempted BOOLEAN DEFAULT FALSE NOT NULL;
COMMENT ON COLUMN audit_logs.auto_repair_attempted IS 'Whether auto-repair was attempted';
END IF;
END $$;
-- 6. auto_repair_result: 自動修復結果
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_result'
) THEN
ALTER TABLE audit_logs ADD COLUMN auto_repair_result TEXT;
COMMENT ON COLUMN audit_logs.auto_repair_result IS 'Auto-repair result: AI analysis and repair outcome';
END IF;
END $$;
-- 添加索引
CREATE INDEX IF NOT EXISTS ix_audit_authorization_channel ON audit_logs(authorization_channel);
CREATE INDEX IF NOT EXISTS ix_audit_failure_classification ON audit_logs(failure_classification);
-- 驗證遷移結果
SELECT
column_name,
data_type,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_name = 'audit_logs'
ORDER BY ordinal_position;

View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
Phase 18 AuditLog Migration Script
===================================
執行 Phase 18 新增字段的數據庫遷移
使用方式:
cd apps/api && python scripts/run_migration.py
"""
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
# 數據庫連接
DATABASE_URL = "postgresql+asyncpg://awoooi:changeme@192.168.0.188:5432/awoooi_prod"
MIGRATION_SQLS = [
# 1. authorization_channel
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'authorization_channel'
) THEN
ALTER TABLE audit_logs ADD COLUMN authorization_channel VARCHAR(20);
END IF;
END $$;
""",
# 2. retry_count
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'retry_count'
) THEN
ALTER TABLE audit_logs ADD COLUMN retry_count INTEGER DEFAULT 0 NOT NULL;
END IF;
END $$;
""",
# 3. failure_classification
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'failure_classification'
) THEN
ALTER TABLE audit_logs ADD COLUMN failure_classification VARCHAR(50);
END IF;
END $$;
""",
# 4. source_approval_id
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'source_approval_id'
) THEN
ALTER TABLE audit_logs ADD COLUMN source_approval_id VARCHAR(36);
END IF;
END $$;
""",
# 5. auto_repair_attempted
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_attempted'
) THEN
ALTER TABLE audit_logs ADD COLUMN auto_repair_attempted BOOLEAN DEFAULT FALSE NOT NULL;
END IF;
END $$;
""",
# 6. auto_repair_result
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_result'
) THEN
ALTER TABLE audit_logs ADD COLUMN auto_repair_result TEXT;
END IF;
END $$;
""",
# 創建索引
"CREATE INDEX IF NOT EXISTS ix_audit_authorization_channel ON audit_logs(authorization_channel);",
"CREATE INDEX IF NOT EXISTS ix_audit_failure_classification ON audit_logs(failure_classification);",
"CREATE INDEX IF NOT EXISTS ix_audit_source_approval_id ON audit_logs(source_approval_id);",
]
async def run_migration():
"""執行遷移"""
print("=" * 60)
print("Phase 18 AuditLog Migration")
print("=" * 60)
engine = create_async_engine(DATABASE_URL, echo=False)
async with engine.begin() as conn:
# 執行遷移
for i, sql in enumerate(MIGRATION_SQLS, 1):
try:
await conn.execute(text(sql))
print(f"✅ Step {i}/{len(MIGRATION_SQLS)} completed")
except Exception as e:
print(f"❌ Step {i} failed: {e}")
# 驗證結果
print("\n" + "=" * 60)
print("驗證欄位:")
print("=" * 60)
result = await conn.execute(text("""
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = 'audit_logs'
ORDER BY ordinal_position
"""))
for row in result:
print(f" {row[0]}: {row[1]} (nullable={row[2]}, default={row[3]})")
await engine.dispose()
print("\n✅ Migration completed!")
if __name__ == "__main__":
asyncio.run(run_migration())

View File

@@ -11,12 +11,11 @@ Nemotron Tool Calling 精準度測試
日期: 2026-03-28 (台北時間)
"""
import os
import json
import asyncio
import json
import os
import time
from dataclasses import dataclass
from typing import Optional
try:
import httpx
@@ -198,11 +197,11 @@ class TestResult:
test_id: str
description: str
success: bool
tool_called: Optional[str]
params: Optional[dict]
tool_called: str | None
params: dict | None
latency_ms: float
error: Optional[str] = None
raw_response: Optional[str] = None
error: str | None = None
raw_response: str | None = None
async def call_nemotron(prompt: str, model: str = "nvidia/nemotron-mini-4b-instruct") -> dict:
@@ -328,7 +327,7 @@ async def run_single_test(test_case: dict) -> list:
prompt = test_case["prompt"]
# 測試 Nemotron
print(f" Testing Nemotron...", end=" ", flush=True)
print(" Testing Nemotron...", end=" ", flush=True)
resp = await call_nemotron(prompt)
if resp["error"]:
results.append(TestResult(
@@ -341,7 +340,7 @@ async def run_single_test(test_case: dict) -> list:
latency_ms=resp["latency_ms"],
error=resp["error"]
))
print(f"❌ Error")
print("❌ Error")
else:
tool, params, error = parse_nemotron_response(resp["data"])
success = tool == test_case["expected_tool"]
@@ -365,7 +364,7 @@ async def run_single_test(test_case: dict) -> list:
print(f"{status} {tool} ({resp['latency_ms']:.0f}ms)")
# 測試 Ollama
print(f" Testing Ollama...", end=" ", flush=True)
print(" Testing Ollama...", end=" ", flush=True)
resp = await call_ollama(prompt)
if resp["error"]:
results.append(TestResult(
@@ -404,7 +403,7 @@ async def main():
print("🧪 Nemotron vs Ollama Tool Calling 精準度測試")
print("=" * 70)
print()
print(f"Nemotron API: integrate.api.nvidia.com")
print("Nemotron API: integrate.api.nvidia.com")
print(f"Ollama URL: {OLLAMA_BASE_URL}")
print()

View File

@@ -22,12 +22,12 @@ from pathlib import Path
# 加入 src 到 path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.services.ai_router import AIProvider, get_ai_router
from src.services.nvidia_provider import (
NvidiaProvider,
create_tool_definition,
get_nvidia_provider,
)
from src.services.ai_router import get_ai_router, AIProvider
async def test_nvidia_connection():
@@ -59,7 +59,7 @@ async def test_nvidia_connection():
)
if result.success:
print(f"✅ NVIDIA API 連線成功")
print("✅ NVIDIA API 連線成功")
print(f" 延遲: {result.latency_ms:.0f}ms")
if result.usage:
print(f" Token: {result.usage.total_tokens}")
@@ -87,7 +87,7 @@ async def test_router_integration():
provider, model, fallback_chain = router.route_tool_calling()
print(f"✅ Tool Calling 路由")
print("✅ Tool Calling 路由")
print(f" Provider: {provider.value}")
print(f" Model: {model}")
print(f" Fallback: {[p.value for p, _ in fallback_chain]}")
@@ -153,7 +153,7 @@ async def test_chinese_prompt():
if result.success and result.tool_calls:
for tc in result.tool_calls:
if tc.valid and tc.tool_name == "scale_deployment":
print(f"✅ 繁中 Tool Calling 成功")
print("✅ 繁中 Tool Calling 成功")
print(f" Deployment: {tc.arguments.get('deployment')}")
print(f" Replicas: {tc.arguments.get('replicas')}")
return True