diff --git a/apps/api/k3s-prod.yaml b/apps/api/k3s-prod.yaml index 6f45f8e8..0f703f03 100644 --- a/apps/api/k3s-prod.yaml +++ b/apps/api/k3s-prod.yaml @@ -2,7 +2,7 @@ apiVersion: v1 clusters: - cluster: certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTnpJNU56YzNNelF3SGhjTk1qWXdNekE0TVRNME9EVTBXaGNOTXpZd016QTFNVE0wT0RVMApXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTnpJNU56YzNNelF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUcHl2L3hDeWNDRGZVelZZeTYySFdTZ3Zzd3hSSEx1anpCM2NrTVM4USsKM0laZ1E2aDYzMm1DdU8wZ0F1WUxJWTVqUC9TSzI4UU0zZStVVHNUejBIWWZvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVdVZ3l0bGl5UE5Db3dPVzhxeVpuCkg1TGtkS2d3Q2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnS3U5T2RrUE5BL2ppMUlmVW91aDFtNlNrcXZLYTUvUW4KRmU1cXhPOXlDOWdDSUVGWldEaXJoeWlpVUpERDVPODArOTVBODF1UFRQNEhCWlJISmNBZVFFbGoKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo= - server: https://192.168.0.120:6443 + server: https://192.168.0.125:6443 name: default contexts: - context: diff --git a/apps/api/scripts/alert_chain_smoke_test.py b/apps/api/scripts/alert_chain_smoke_test.py new file mode 100755 index 00000000..83d22f5b --- /dev/null +++ b/apps/api/scripts/alert_chain_smoke_test.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +""" +AWOOOI Alert Chain Smoke Test (Wave A.6) +========================================= +E2E 驗證告警鏈路完整性 + +功能: +1. 發送測試告警到 Sentry/SignOz Webhook +2. 驗證 Telegram 通知發送 +3. 檢查 Prometheus Metrics +4. 驗證 Incident 建立 + +用法: + cd apps/api + python -m scripts.alert_chain_smoke_test + + # 指定來源 + python -m scripts.alert_chain_smoke_test --source sentry + python -m scripts.alert_chain_smoke_test --source signoz + python -m scripts.alert_chain_smoke_test --source all + +環境變數: + AWOOOI_API_URL: API 端點 (預設 http://localhost:8000) + AWOOOI_METRICS_URL: Metrics 端點 (預設 http://localhost:8000/metrics) + +Author: Claude Code +Date: 2026-03-29 +ADR: ADR-037 (監控增強架構) +""" + +import argparse +import asyncio +import os +import sys +import time +from datetime import datetime +from pathlib import Path + +# Add src to path +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +import httpx + +# ============================================================================= +# Config (從環境變數讀取,符合首席架構師審查 P0 要求) +# ============================================================================= + +API_URL = os.environ.get("AWOOOI_API_URL", "http://localhost:8000") +METRICS_URL = os.environ.get("AWOOOI_METRICS_URL", f"{API_URL}/metrics") + +# Webhook 端點 +SENTRY_WEBHOOK = f"{API_URL}/api/v1/webhooks/sentry/issue" +SIGNOZ_WEBHOOK = f"{API_URL}/api/v1/webhooks/signoz/alert" + +# ============================================================================= +# Test Payloads +# ============================================================================= + +SENTRY_TEST_PAYLOAD = { + "action": "triggered", + "data": { + "issue": { + "id": f"smoke-test-{int(time.time())}", + "title": "[Smoke Test] Alert Chain Verification", + "culprit": "alert_chain_smoke_test.py", + "level": "error", + "firstSeen": datetime.now().isoformat(), + "count": 1, + "project": {"slug": "awoooi-api"}, + }, + "event": { + "message": "Smoke test alert - verifying alert chain E2E", + "platform": "python", + "tags": [{"key": "smoke_test", "value": "true"}], + }, + }, +} + +SIGNOZ_TEST_PAYLOAD = { + "alertname": "SmokeTestAlert", + "status": "firing", + "labels": { + "severity": "warning", + "service_name": "smoke-test", + "namespace": "awoooi-test", + "source": "signoz", + }, + "annotations": { + "summary": "[Smoke Test] SignOz Alert Chain Verification", + "description": "This is a smoke test alert to verify the alert chain E2E", + }, + "startsAt": datetime.now().isoformat(), +} + + +# ============================================================================= +# Terminal Output Helpers +# ============================================================================= + +class Colors: + """ANSI Color Codes""" + HEADER = "\033[95m" + BLUE = "\033[94m" + CYAN = "\033[96m" + GREEN = "\033[92m" + YELLOW = "\033[93m" + RED = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + DIM = "\033[2m" + + +def print_banner(): + """Print banner""" + print(f""" +{Colors.CYAN}{Colors.BOLD} + ╔═══════════════════════════════════════════════════════╗ + ║ AWOOOI Alert Chain Smoke Test (ADR-037 Wave A.6) ║ + ╚═══════════════════════════════════════════════════════╝ +{Colors.ENDC} +{Colors.DIM} API URL: {API_URL} + Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} (Taipei){Colors.ENDC} +""") + + +def print_section(title: str): + """Print section header""" + print(f"\n{Colors.BLUE}{Colors.BOLD}▶ {title}{Colors.ENDC}") + print(f"{Colors.DIM}{'─' * 55}{Colors.ENDC}") + + +def print_result(success: bool, message: str, details: str = ""): + """Print test result""" + if success: + print(f" {Colors.GREEN}✓ {message}{Colors.ENDC}") + else: + print(f" {Colors.RED}✗ {message}{Colors.ENDC}") + if details: + print(f" {Colors.DIM}{details}{Colors.ENDC}") + + +# ============================================================================= +# Test Functions +# ============================================================================= + +async def test_sentry_webhook() -> tuple[bool, dict]: + """ + 測試 Sentry Webhook + + Returns: + (success, response_data) + """ + print_section("1. Sentry Webhook Test") + print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {SENTRY_WEBHOOK}") + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + SENTRY_WEBHOOK, + json=SENTRY_TEST_PAYLOAD, + headers={"Content-Type": "application/json"}, + ) + + result = response.json() + success = response.status_code in (200, 202) and result.get("status") in ("accepted", "ok") + + print_result( + success, + f"HTTP {response.status_code}", + f"Response: {result}", + ) + + return success, result + + except httpx.ConnectError: + print_result(False, "Connection failed", f"Cannot reach {SENTRY_WEBHOOK}") + return False, {"error": "connection_failed"} + except Exception as e: + print_result(False, "Request failed", str(e)) + return False, {"error": str(e)} + + +async def test_signoz_webhook() -> tuple[bool, dict]: + """ + 測試 SignOz Webhook + + Returns: + (success, response_data) + """ + print_section("2. SignOz Webhook Test") + print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {SIGNOZ_WEBHOOK}") + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + SIGNOZ_WEBHOOK, + json=SIGNOZ_TEST_PAYLOAD, + headers={"Content-Type": "application/json"}, + ) + + result = response.json() + success = response.status_code in (200, 202) and result.get("status") in ("accepted", "ok") + + print_result( + success, + f"HTTP {response.status_code}", + f"Response: {result}", + ) + + return success, result + + except httpx.ConnectError: + print_result(False, "Connection failed", f"Cannot reach {SIGNOZ_WEBHOOK}") + return False, {"error": "connection_failed"} + except Exception as e: + print_result(False, "Request failed", str(e)) + return False, {"error": str(e)} + + +async def test_metrics() -> tuple[bool, dict]: + """ + 檢查 Prometheus Metrics + + 驗證 Alert Chain 指標存在: + - awoooi_alert_chain_healthy + - awoooi_alert_chain_last_success_timestamp + """ + print_section("3. Prometheus Metrics Check") + print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {METRICS_URL}") + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.get(METRICS_URL) + + if response.status_code != 200: + print_result(False, f"HTTP {response.status_code}") + return False, {"error": f"status_code: {response.status_code}"} + + metrics_text = response.text + + # 檢查必要指標 + required_metrics = [ + "awoooi_alert_chain_healthy", + "awoooi_alert_chain_last_success_timestamp", + "awoooi_webhook_requests_total", + "awoooi_alert_processed_total", + ] + + found_metrics = {} + for metric in required_metrics: + found = metric in metrics_text + found_metrics[metric] = found + print_result(found, f"Metric: {metric}") + + all_found = all(found_metrics.values()) + return all_found, found_metrics + + except httpx.ConnectError: + print_result(False, "Connection failed", f"Cannot reach {METRICS_URL}") + return False, {"error": "connection_failed"} + except Exception as e: + print_result(False, "Request failed", str(e)) + return False, {"error": str(e)} + + +async def test_health() -> tuple[bool, dict]: + """ + 檢查 API Health + """ + print_section("4. API Health Check") + health_url = f"{API_URL}/api/v1/health" + print(f" {Colors.CYAN}Endpoint:{Colors.ENDC} {health_url}") + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.get(health_url) + + success = response.status_code == 200 + result = response.json() if success else {} + + print_result( + success, + f"HTTP {response.status_code}", + f"Status: {result.get('status', 'unknown')}", + ) + + return success, result + + except httpx.ConnectError: + print_result(False, "Connection failed", f"Cannot reach {health_url}") + return False, {"error": "connection_failed"} + except Exception as e: + print_result(False, "Request failed", str(e)) + return False, {"error": str(e)} + + +# ============================================================================= +# Main +# ============================================================================= + +async def run_smoke_tests(source: str = "all") -> bool: + """ + 執行 Smoke Tests + + Args: + source: 測試來源 (sentry, signoz, all) + + Returns: + bool: 全部通過 + """ + print_banner() + + results = [] + + # Health Check + health_ok, _ = await test_health() + results.append(("Health", health_ok)) + + if not health_ok: + print(f"\n{Colors.RED}{Colors.BOLD}✗ API 健康檢查失敗,終止測試{Colors.ENDC}") + return False + + # Webhook Tests + if source in ("sentry", "all"): + sentry_ok, _ = await test_sentry_webhook() + results.append(("Sentry Webhook", sentry_ok)) + + if source in ("signoz", "all"): + signoz_ok, _ = await test_signoz_webhook() + results.append(("SignOz Webhook", signoz_ok)) + + # 等待背景任務處理 + if source != "none": + print(f"\n{Colors.DIM} 等待 2 秒讓背景任務處理...{Colors.ENDC}") + await asyncio.sleep(2) + + # Metrics Check + metrics_ok, _ = await test_metrics() + results.append(("Metrics", metrics_ok)) + + # Summary + print_section("Summary") + all_passed = True + for name, passed in results: + print_result(passed, name) + if not passed: + all_passed = False + + print(f"\n{'─' * 55}") + if all_passed: + print(f"{Colors.GREEN}{Colors.BOLD}✓ All smoke tests passed!{Colors.ENDC}") + else: + print(f"{Colors.RED}{Colors.BOLD}✗ Some smoke tests failed!{Colors.ENDC}") + + return all_passed + + +def main(): + """CLI Entry Point""" + parser = argparse.ArgumentParser( + description="AWOOOI Alert Chain Smoke Test (ADR-037 Wave A.6)", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +環境變數: + AWOOOI_API_URL API 端點 (預設 http://localhost:8000) + AWOOOI_METRICS_URL Metrics 端點 (預設 $AWOOOI_API_URL/metrics) + +範例: + python -m scripts.alert_chain_smoke_test + python -m scripts.alert_chain_smoke_test --source sentry + AWOOOI_API_URL=http://awoooi-api:8000 python -m scripts.alert_chain_smoke_test + """, + ) + + parser.add_argument( + "--source", "-s", + type=str, + default="all", + choices=["sentry", "signoz", "all", "none"], + help="測試來源 (預設: all, none = 只檢查 health/metrics)", + ) + + args = parser.parse_args() + + success = asyncio.run(run_smoke_tests(args.source)) + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/apps/api/scripts/migrate_phase18_audit_logs.sql b/apps/api/scripts/migrate_phase18_audit_logs.sql new file mode 100644 index 00000000..1437692b --- /dev/null +++ b/apps/api/scripts/migrate_phase18_audit_logs.sql @@ -0,0 +1,97 @@ +-- ============================================================================ +-- Phase 18: AuditLog 字段遷移 +-- ============================================================================ +-- 日期: 2026-03-28 +-- 作者: Claude Code (首席架構師) +-- 原因: Phase 18 失敗自動修復閉環需要新字段,但未執行遷移導致 API 500 +-- ============================================================================ + +-- 檢查並添加缺少的字段 +-- 這些字段在 apps/api/src/db/models.py AuditLog 中定義但未遷移 + +-- 1. authorization_channel: 授權來源 (web, telegram, auto) +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'authorization_channel' + ) THEN + ALTER TABLE audit_logs ADD COLUMN authorization_channel VARCHAR(20); + COMMENT ON COLUMN audit_logs.authorization_channel IS 'Authorization source: web, telegram, auto'; + END IF; +END $$; + +-- 2. retry_count: 重試次數 +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'retry_count' + ) THEN + ALTER TABLE audit_logs ADD COLUMN retry_count INTEGER DEFAULT 0 NOT NULL; + COMMENT ON COLUMN audit_logs.retry_count IS 'Number of retry attempts'; + END IF; +END $$; + +-- 3. failure_classification: 失敗分類 +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'failure_classification' + ) THEN + ALTER TABLE audit_logs ADD COLUMN failure_classification VARCHAR(50); + COMMENT ON COLUMN audit_logs.failure_classification IS 'Failure type: TIMEOUT, K8S_ERROR, NETWORK_ERROR, PERMISSION_DENIED'; + END IF; +END $$; + +-- 4. source_approval_id: 原始 Approval ID (修復追蹤) +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'source_approval_id' + ) THEN + ALTER TABLE audit_logs ADD COLUMN source_approval_id VARCHAR(36); + COMMENT ON COLUMN audit_logs.source_approval_id IS 'Original approval ID if this is a repair attempt'; + CREATE INDEX IF NOT EXISTS ix_audit_source_approval_id ON audit_logs(source_approval_id); + END IF; +END $$; + +-- 5. auto_repair_attempted: 是否嘗試自動修復 +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_attempted' + ) THEN + ALTER TABLE audit_logs ADD COLUMN auto_repair_attempted BOOLEAN DEFAULT FALSE NOT NULL; + COMMENT ON COLUMN audit_logs.auto_repair_attempted IS 'Whether auto-repair was attempted'; + END IF; +END $$; + +-- 6. auto_repair_result: 自動修復結果 +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_result' + ) THEN + ALTER TABLE audit_logs ADD COLUMN auto_repair_result TEXT; + COMMENT ON COLUMN audit_logs.auto_repair_result IS 'Auto-repair result: AI analysis and repair outcome'; + END IF; +END $$; + +-- 添加索引 +CREATE INDEX IF NOT EXISTS ix_audit_authorization_channel ON audit_logs(authorization_channel); +CREATE INDEX IF NOT EXISTS ix_audit_failure_classification ON audit_logs(failure_classification); + +-- 驗證遷移結果 +SELECT + column_name, + data_type, + is_nullable, + column_default +FROM information_schema.columns +WHERE table_name = 'audit_logs' +ORDER BY ordinal_position; diff --git a/apps/api/scripts/run_migration.py b/apps/api/scripts/run_migration.py new file mode 100644 index 00000000..44a95914 --- /dev/null +++ b/apps/api/scripts/run_migration.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +""" +Phase 18 AuditLog Migration Script +=================================== +執行 Phase 18 新增字段的數據庫遷移 + +使用方式: + cd apps/api && python scripts/run_migration.py +""" + +import asyncio + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine + +# 數據庫連接 +DATABASE_URL = "postgresql+asyncpg://awoooi:changeme@192.168.0.188:5432/awoooi_prod" + +MIGRATION_SQLS = [ + # 1. authorization_channel + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'authorization_channel' + ) THEN + ALTER TABLE audit_logs ADD COLUMN authorization_channel VARCHAR(20); + END IF; + END $$; + """, + # 2. retry_count + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'retry_count' + ) THEN + ALTER TABLE audit_logs ADD COLUMN retry_count INTEGER DEFAULT 0 NOT NULL; + END IF; + END $$; + """, + # 3. failure_classification + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'failure_classification' + ) THEN + ALTER TABLE audit_logs ADD COLUMN failure_classification VARCHAR(50); + END IF; + END $$; + """, + # 4. source_approval_id + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'source_approval_id' + ) THEN + ALTER TABLE audit_logs ADD COLUMN source_approval_id VARCHAR(36); + END IF; + END $$; + """, + # 5. auto_repair_attempted + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_attempted' + ) THEN + ALTER TABLE audit_logs ADD COLUMN auto_repair_attempted BOOLEAN DEFAULT FALSE NOT NULL; + END IF; + END $$; + """, + # 6. auto_repair_result + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_result' + ) THEN + ALTER TABLE audit_logs ADD COLUMN auto_repair_result TEXT; + END IF; + END $$; + """, + # 創建索引 + "CREATE INDEX IF NOT EXISTS ix_audit_authorization_channel ON audit_logs(authorization_channel);", + "CREATE INDEX IF NOT EXISTS ix_audit_failure_classification ON audit_logs(failure_classification);", + "CREATE INDEX IF NOT EXISTS ix_audit_source_approval_id ON audit_logs(source_approval_id);", +] + + +async def run_migration(): + """執行遷移""" + print("=" * 60) + print("Phase 18 AuditLog Migration") + print("=" * 60) + + engine = create_async_engine(DATABASE_URL, echo=False) + + async with engine.begin() as conn: + # 執行遷移 + for i, sql in enumerate(MIGRATION_SQLS, 1): + try: + await conn.execute(text(sql)) + print(f"✅ Step {i}/{len(MIGRATION_SQLS)} completed") + except Exception as e: + print(f"❌ Step {i} failed: {e}") + + # 驗證結果 + print("\n" + "=" * 60) + print("驗證欄位:") + print("=" * 60) + + result = await conn.execute(text(""" + SELECT column_name, data_type, is_nullable, column_default + FROM information_schema.columns + WHERE table_name = 'audit_logs' + ORDER BY ordinal_position + """)) + + for row in result: + print(f" {row[0]}: {row[1]} (nullable={row[2]}, default={row[3]})") + + await engine.dispose() + print("\n✅ Migration completed!") + + +if __name__ == "__main__": + asyncio.run(run_migration()) diff --git a/apps/api/scripts/test_nemotron_tool_calling.py b/apps/api/scripts/test_nemotron_tool_calling.py index fffb7b51..b3fa1c73 100644 --- a/apps/api/scripts/test_nemotron_tool_calling.py +++ b/apps/api/scripts/test_nemotron_tool_calling.py @@ -11,12 +11,11 @@ Nemotron Tool Calling 精準度測試 日期: 2026-03-28 (台北時間) """ -import os -import json import asyncio +import json +import os import time from dataclasses import dataclass -from typing import Optional try: import httpx @@ -198,11 +197,11 @@ class TestResult: test_id: str description: str success: bool - tool_called: Optional[str] - params: Optional[dict] + tool_called: str | None + params: dict | None latency_ms: float - error: Optional[str] = None - raw_response: Optional[str] = None + error: str | None = None + raw_response: str | None = None async def call_nemotron(prompt: str, model: str = "nvidia/nemotron-mini-4b-instruct") -> dict: @@ -328,7 +327,7 @@ async def run_single_test(test_case: dict) -> list: prompt = test_case["prompt"] # 測試 Nemotron - print(f" Testing Nemotron...", end=" ", flush=True) + print(" Testing Nemotron...", end=" ", flush=True) resp = await call_nemotron(prompt) if resp["error"]: results.append(TestResult( @@ -341,7 +340,7 @@ async def run_single_test(test_case: dict) -> list: latency_ms=resp["latency_ms"], error=resp["error"] )) - print(f"❌ Error") + print("❌ Error") else: tool, params, error = parse_nemotron_response(resp["data"]) success = tool == test_case["expected_tool"] @@ -365,7 +364,7 @@ async def run_single_test(test_case: dict) -> list: print(f"{status} {tool} ({resp['latency_ms']:.0f}ms)") # 測試 Ollama - print(f" Testing Ollama...", end=" ", flush=True) + print(" Testing Ollama...", end=" ", flush=True) resp = await call_ollama(prompt) if resp["error"]: results.append(TestResult( @@ -404,7 +403,7 @@ async def main(): print("🧪 Nemotron vs Ollama Tool Calling 精準度測試") print("=" * 70) print() - print(f"Nemotron API: integrate.api.nvidia.com") + print("Nemotron API: integrate.api.nvidia.com") print(f"Ollama URL: {OLLAMA_BASE_URL}") print() diff --git a/apps/api/scripts/verify_nemotron_e2e.py b/apps/api/scripts/verify_nemotron_e2e.py index a6df0127..c15af7a3 100644 --- a/apps/api/scripts/verify_nemotron_e2e.py +++ b/apps/api/scripts/verify_nemotron_e2e.py @@ -22,12 +22,12 @@ from pathlib import Path # 加入 src 到 path sys.path.insert(0, str(Path(__file__).parent.parent)) +from src.services.ai_router import AIProvider, get_ai_router from src.services.nvidia_provider import ( NvidiaProvider, create_tool_definition, get_nvidia_provider, ) -from src.services.ai_router import get_ai_router, AIProvider async def test_nvidia_connection(): @@ -59,7 +59,7 @@ async def test_nvidia_connection(): ) if result.success: - print(f"✅ NVIDIA API 連線成功") + print("✅ NVIDIA API 連線成功") print(f" 延遲: {result.latency_ms:.0f}ms") if result.usage: print(f" Token: {result.usage.total_tokens}") @@ -87,7 +87,7 @@ async def test_router_integration(): provider, model, fallback_chain = router.route_tool_calling() - print(f"✅ Tool Calling 路由") + print("✅ Tool Calling 路由") print(f" Provider: {provider.value}") print(f" Model: {model}") print(f" Fallback: {[p.value for p, _ in fallback_chain]}") @@ -153,7 +153,7 @@ async def test_chinese_prompt(): if result.success and result.tool_calls: for tc in result.tool_calls: if tc.valid and tc.tool_name == "scale_deployment": - print(f"✅ 繁中 Tool Calling 成功") + print("✅ 繁中 Tool Calling 成功") print(f" Deployment: {tc.arguments.get('deployment')}") print(f" Replicas: {tc.arguments.get('replicas')}") return True diff --git a/apps/api/src/api/v1/github_webhook.py b/apps/api/src/api/v1/github_webhook.py index 16c7af96..2a2ca9bb 100644 --- a/apps/api/src/api/v1/github_webhook.py +++ b/apps/api/src/api/v1/github_webhook.py @@ -47,7 +47,6 @@ from src.models.approval import ( ApprovalRequestCreate, BlastRadius, DataImpact, - DryRunCheck, RiskLevel, ) from src.services.approval_db import get_approval_service diff --git a/apps/api/src/api/v1/learning.py b/apps/api/src/api/v1/learning.py index 33c55eee..a326f874 100644 --- a/apps/api/src/api/v1/learning.py +++ b/apps/api/src/api/v1/learning.py @@ -17,9 +17,9 @@ Phase D-G P0 修正: 新增學習 API 端點 - 符合 API 路徑命名規範 """ -from fastapi import APIRouter, HTTPException -from pydantic import BaseModel import structlog +from fastapi import APIRouter +from pydantic import BaseModel from src.services.learning_service import get_learning_service diff --git a/apps/api/src/api/v1/sentry_webhook.py b/apps/api/src/api/v1/sentry_webhook.py index 5f812e72..a58083cc 100644 --- a/apps/api/src/api/v1/sentry_webhook.py +++ b/apps/api/src/api/v1/sentry_webhook.py @@ -21,12 +21,20 @@ import structlog from fastapi import APIRouter, BackgroundTasks, HTTPException, Request from pydantic import BaseModel +from src.core.config import settings +from src.core.metrics import ( + record_alert_chain_failure, + record_alert_chain_success, + record_alert_processed, + record_anomaly, +) from src.models.approval import ( ApprovalRequestCreate, BlastRadius, DataImpact, RiskLevel, ) +from src.services.anomaly_counter import get_anomaly_counter from src.services.approval_db import get_approval_service from src.services.sentry_service import get_sentry_service from src.services.telegram_gateway import get_telegram_gateway @@ -36,10 +44,8 @@ logger = structlog.get_logger(__name__) router = APIRouter(prefix="/webhooks/sentry", tags=["Sentry Webhook"]) -# OpenClaw 配置 -# 2026-03-29 ogt: 端口統一為 8089 (ADR-028) -OPENCLAW_URL = "http://192.168.0.188:8089" -SENTRY_API_URL = "http://192.168.0.110:9000" +# OpenClaw 配置 - 從 settings 讀取 (P1-1 修復, 2026-03-29) +# 2026-03-29: SENTRY_API_URL 已移至 settings.SENTRY_SELF_HOSTED_URL (Wave A.1) # Sentry Level → Risk Level 映射 SENTRY_LEVEL_TO_RISK = { @@ -182,36 +188,70 @@ async def analyze_and_comment( 背景任務:分析錯誤 + Telegram 告警 + 建立 Approval Phase 10: Sentry + OpenClaw AI 整合 - 1. 呼叫 OpenClaw 分析 - 2. 發送 Telegram 告警 - 3. 建立 Approval 供人工審核 - 4. 回寫 Sentry Comment + Phase 21 (ADR-037): 異常頻率統計 + + 執行順序 (避免邏輯衝突): + 1. 記錄異常頻率 (AnomalyCounter) + 2. 呼叫 OpenClaw 分析 + 3. 建立 Approval (含頻率資訊) + 4. 發送 Telegram 告警 (含頻率資訊) + 5. 回寫 Sentry Comment (含頻率資訊) """ try: logger.info("sentry_analysis_started", issue_id=issue_id) - # 1. 呼叫 OpenClaw 分析 + # 1. 記錄異常頻率 (ADR-037) + anomaly_counter = get_anomaly_counter() + anomaly_signature = { + "alert_name": "sentry_error", + "service": error_context.get("project", "unknown"), + "error_type": error_context.get("title", "unknown"), + "namespace": "sentry", # Sentry 來源統一標記 + } + frequency = await anomaly_counter.record_anomaly(anomaly_signature) + frequency_dict = frequency.to_dict() + + logger.info( + "anomaly_frequency_recorded", + issue_id=issue_id, + anomaly_key=frequency.anomaly_key, + count_24h=frequency.count_24h, + escalation_level=frequency.escalation_level, + ) + + # Wave A.5: 記錄異常指標 (ADR-037) + record_anomaly( + alert_name="sentry_error", + service=error_context.get("project", "unknown"), + frequency_24h=frequency.count_24h, + escalation_level=frequency.escalation_level, + ) + + # 2. 呼叫 OpenClaw 分析 analysis = await call_openclaw_analyzer(error_context) - # 2. 建立 Approval + # 3. 建立 Approval (含頻率資訊) approval_id = await create_sentry_approval( error_context=error_context, analysis=analysis, + anomaly_frequency=frequency_dict, ) - # 3. 發送 Telegram 告警 + # 4. 發送 Telegram 告警 (含頻率資訊) await send_sentry_telegram_alert( error_context=error_context, analysis=analysis, approval_id=approval_id, + anomaly_frequency=frequency_dict, ) - # 4. 回寫 Sentry Comment (如果分析成功) + # 5. 回寫 Sentry Comment (如果分析成功,含頻率資訊) if analysis: await post_sentry_comment( project_slug=project_slug, issue_id=issue_id, - analysis=analysis + analysis=analysis, + anomaly_frequency=frequency_dict, ) logger.info( @@ -219,10 +259,21 @@ async def analyze_and_comment( issue_id=issue_id, approval_id=approval_id, has_analysis=analysis is not None, + escalation_level=frequency.escalation_level, + ) + + # Wave A.5: 記錄告警鏈路成功 (ADR-037) + record_alert_chain_success("sentry") + record_alert_processed( + source="sentry", + severity=error_context.get("level", "error"), + outcome="incident_created", ) except Exception as e: logger.exception("sentry_analysis_failed", issue_id=issue_id, error=str(e)) + # Wave A.5: 記錄告警鏈路失敗 (ADR-037) + record_alert_chain_failure("sentry") async def call_openclaw_analyzer(error_context: dict) -> ErrorAnalysisResult | None: @@ -235,7 +286,7 @@ async def call_openclaw_analyzer(error_context: dict) -> ErrorAnalysisResult | N try: async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( - f"{OPENCLAW_URL}/api/v1/analyze/error", + f"{settings.OPENCLAW_URL}/api/v1/analyze/error", json={ "error_context": error_context, "prefer_local": True, # 優先 Ollama @@ -261,13 +312,41 @@ async def call_openclaw_analyzer(error_context: dict) -> ErrorAnalysisResult | N async def post_sentry_comment( project_slug: str, issue_id: str, - analysis: ErrorAnalysisResult + analysis: ErrorAnalysisResult, + anomaly_frequency: dict | None = None, ): """ 回寫分析結果到 Sentry Issue Comment API: POST /api/0/issues/{issue_id}/comments/ + Phase 21 (ADR-037): 含異常頻率統計 """ + # 頻率統計區塊 (ADR-037) + frequency_section = "" + if anomaly_frequency and anomaly_frequency.get("count_24h", 0) > 1: + freq = anomaly_frequency + escalation_emoji = { + None: "", + "REPEAT": ":warning:", + "ESCALATE": ":red_circle:", + "PERMANENT_FIX": ":rotating_light:", + }.get(freq.get("escalation_level"), "") + + frequency_section = f""" +## 頻率統計 {escalation_emoji} + +| 時間窗口 | 次數 | +|---------|------| +| 1 小時 | {freq.get('count_1h', 0)} | +| 24 小時 | {freq.get('count_24h', 0)} | +| 7 天 | {freq.get('count_7d', 0)} | +| 30 天 | {freq.get('count_30d', 0)} | + +**修復嘗試**: {freq.get('auto_repair_count', 0)} 次 +""" + if freq.get("escalation_level"): + frequency_section += f"**升級建議**: {freq['escalation_level']}\n" + comment_text = f"""## AI 錯誤分析 (by {analysis.analyzed_by}) **根本原因 (Root Cause)** @@ -281,22 +360,26 @@ async def post_sentry_comment( **預防措施 (Prevention)** {analysis.prevention} - +{frequency_section} --- *分析信心度: {analysis.confidence:.0%} | 分析時間: {now_taipei_iso()}* """ try: - # TODO: 需要 Sentry API Token - # 目前先 log 出來 - logger.info(f"Would post comment to issue {issue_id}:\n{comment_text}") + # Wave A.4: 使用 SentryService 回寫 Comment (ADR-037, 2026-03-29) + # 符合 leWOOOgo 模組化原則: Router 層透過 Service 層存取外部 API + sentry_service = get_sentry_service() + result = await sentry_service.post_issue_comment(issue_id, comment_text) - # async with httpx.AsyncClient() as client: - # response = await client.post( - # f"{SENTRY_API_URL}/api/0/issues/{issue_id}/comments/", - # headers={"Authorization": f"Bearer {SENTRY_API_TOKEN}"}, - # json={"text": comment_text} - # ) + if result: + logger.info( + "sentry_comment_success", + issue_id=issue_id, + comment_id=result.get("id"), + ) + else: + # Token 未配置或 API 失敗時記錄 (不中斷流程) + logger.warning("sentry_comment_skipped_or_failed", issue_id=issue_id) except Exception as e: logger.exception("sentry_comment_failed", issue_id=issue_id, error=str(e)) @@ -310,18 +393,20 @@ async def send_sentry_telegram_alert( error_context: dict, analysis: ErrorAnalysisResult | None, approval_id: str, + anomaly_frequency: dict | None = None, ): """ 發送 Sentry 錯誤告警到 Telegram + Phase 21 (ADR-037): 含異常頻率統計 + 格式 (project_sentry_openclaw_v2.md): ═══════════════════════════ 🐛 SENTRY 錯誤告警 ═══════════════════════════ 📍 components/dashboard.tsx:142 ❌ TypeError: Cannot read property 'x' of null - 👥 影響用戶: 12 - 🔄 發生次數: 42 + 📊 頻率: 1h:3 / 24h:12 / 7d:42 ─────────────────────────── 🧠 OpenClaw 分析: 「這是 null check 問題...」 @@ -347,9 +432,14 @@ async def send_sentry_telegram_alert( primary_responsibility="FE" if "tsx" in culprit or "jsx" in culprit else "BE", confidence=analysis.confidence if analysis else 0.0, namespace="sentry", + anomaly_frequency=anomaly_frequency, ) - logger.info("sentry_telegram_sent", approval_id=approval_id) + logger.info( + "sentry_telegram_sent", + approval_id=approval_id, + escalation_level=anomaly_frequency.get("escalation_level") if anomaly_frequency else None, + ) except Exception as e: logger.exception("sentry_telegram_failed", error=str(e)) @@ -362,26 +452,55 @@ async def send_sentry_telegram_alert( async def create_sentry_approval( error_context: dict, analysis: ErrorAnalysisResult | None, + anomaly_frequency: dict | None = None, ) -> str: """ 為 Sentry 錯誤建立 Approval 記錄 + Phase 21 (ADR-037): 含異常頻率統計 + Returns: str: Approval ID """ try: approval_service = get_approval_service() - # 決定風險等級 + # 決定風險等級 (考慮頻率升級) level = error_context.get("level", "error") risk_level = SENTRY_LEVEL_TO_RISK.get(level, RiskLevel.MEDIUM) + # ADR-037: 根據頻率升級風險等級 + if anomaly_frequency: + escalation = anomaly_frequency.get("escalation_level") + if escalation == "PERMANENT_FIX": + risk_level = RiskLevel.CRITICAL + elif escalation == "ESCALATE" and risk_level != RiskLevel.CRITICAL: + risk_level = RiskLevel.HIGH + # 組裝 Approval 請求 title = error_context.get("title", "Unknown Error") culprit = error_context.get("culprit", "unknown") project = error_context.get("project", "unknown") issue_id = error_context.get("issue_id", "unknown") + # 組裝 metadata (含頻率資訊) + metadata = { + "source": "sentry", + "alert_type": f"sentry_{level}", + "sentry_issue_id": issue_id, + "sentry_project": project, + "culprit": culprit, + "error_count": error_context.get("count", 1), + "first_seen": error_context.get("first_seen"), + "stacktrace": error_context.get("stacktrace", []), + "llm_provider": analysis.analyzed_by if analysis else "pending", + "llm_confidence": analysis.confidence if analysis else 0.0, + } + + # ADR-037: 添加頻率資訊到 metadata + if anomaly_frequency: + metadata["anomaly_frequency"] = anomaly_frequency + # 組裝 Approval 請求 (符合 ApprovalRequestBase schema) approval_request = ApprovalRequestCreate( action=f"Sentry {level.upper()} Alert: {culprit}", @@ -395,18 +514,7 @@ async def create_sentry_approval( ), dry_run_checks=[], # Sentry 告警無 dry-run requested_by="sentry-webhook", - metadata={ - "source": "sentry", - "alert_type": f"sentry_{level}", - "sentry_issue_id": issue_id, - "sentry_project": project, - "culprit": culprit, - "error_count": error_context.get("count", 1), - "first_seen": error_context.get("first_seen"), - "stacktrace": error_context.get("stacktrace", []), - "llm_provider": analysis.analyzed_by if analysis else "pending", - "llm_confidence": analysis.confidence if analysis else 0.0, - }, + metadata=metadata, ) # 創建 Approval (ID 由 Service 自動生成) diff --git a/apps/api/src/api/v1/signoz_webhook.py b/apps/api/src/api/v1/signoz_webhook.py index 7deb0615..c0c4f06e 100644 --- a/apps/api/src/api/v1/signoz_webhook.py +++ b/apps/api/src/api/v1/signoz_webhook.py @@ -19,19 +19,17 @@ import structlog from fastapi import APIRouter, BackgroundTasks, HTTPException, Request from pydantic import BaseModel -from src.models.approval import ( - ApprovalRequestCreate, - BlastRadius, - DataImpact, - RiskLevel, -) from src.core.metrics import ( record_alert_chain_failure, record_alert_chain_success, record_alert_processed, record_anomaly, - record_telegram_notification, - record_webhook_request, +) +from src.models.approval import ( + ApprovalRequestCreate, + BlastRadius, + DataImpact, + RiskLevel, ) from src.services.anomaly_counter import get_anomaly_counter from src.services.approval_db import get_approval_service @@ -122,7 +120,7 @@ async def handle_signoz_alert( except Exception as e: logger.exception("signoz_webhook_error", error=str(e)) - raise HTTPException(status_code=500, detail=str(e)) + raise HTTPException(status_code=500, detail=str(e)) from e async def process_signoz_alert( diff --git a/apps/api/src/api/v1/stats.py b/apps/api/src/api/v1/stats.py index 2afea1ba..ad1d4124 100644 --- a/apps/api/src/api/v1/stats.py +++ b/apps/api/src/api/v1/stats.py @@ -1,5 +1,5 @@ # ============================================================================= -# AWOOOI Statistics API - Phase 6.5 +# AWOOOI Statistics API - Phase 17 P1 分層修復 # ============================================================================= # 統計分析 API - 從 Episodic Memory 萃取洞察 # @@ -8,39 +8,34 @@ # - 評估 AI 建議效能 # - 支援 Playbook 萃取 # -# 效能優化: -# - SQL GROUP BY (取代應用層聚合) -# - Redis 快取 (TTL 5 分鐘) +# 架構變更 (2026-03-28): +# - Router 層不再直接存取 DB +# - 所有查詢邏輯已移至 StatsService +# - 符合 leWOOOgo 分層規範 +# +# @author Claude Code (首席架構師) +# @version 2.0.0 +# @date 2026-03-28 (台北時間) +# @see feedback_lewooogo_modular_enforcement.md # ============================================================================= -from datetime import datetime, timedelta -from typing import Any +from typing import Annotated from fastapi import APIRouter, Depends, Query from pydantic import BaseModel, Field -from sqlalchemy import func, select -from sqlalchemy.ext.asyncio import AsyncSession - -from src.core.logging import get_logger -from src.db.base import get_db -from src.db.models import IncidentRecord -from src.models.incident import IncidentStatus - -logger = get_logger(__name__) - -# Phase 17 P0: 快取 TTL 移至 StatsService -# 保留常量供參考 -STATS_CACHE_TTL = 300 # 5 分鐘 - - -# Phase 17 P0: 快取包裝器已移至 StatsService -# 使用方式: stats_service = get_stats_service() -# result = await stats_service.get_cached_or_compute(key, compute_fn) +from src.services.stats_service import StatsService, get_stats_service router = APIRouter(prefix="/stats", tags=["Statistics"]) +# ============================================================================= +# Dependencies +# ============================================================================= + +StatsServiceDep = Annotated[StatsService, Depends(get_stats_service)] + + # ============================================================================= # Response Models # ============================================================================= @@ -128,7 +123,7 @@ class FeedbackSummary(BaseModel): # ============================================================================= -# API Endpoints +# API Endpoints (Phase 17 P1: 薄轉發層) # ============================================================================= @@ -139,7 +134,7 @@ class FeedbackSummary(BaseModel): ) async def get_incident_summary( days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), - db: AsyncSession = Depends(get_db), # noqa: B008 + service: StatsServiceDep = None, ) -> IncidentSummary: """ 取得事件總覽統計 @@ -150,70 +145,17 @@ async def get_incident_summary( - 嚴重度分佈 - 解決率 """ - since = datetime.utcnow() - timedelta(days=days) - - # 總數 - total_result = await db.execute( - select(func.count(IncidentRecord.incident_id)).where( - IncidentRecord.created_at >= since - ) - ) - total = total_result.scalar() or 0 - - # 狀態分佈 - status_result = await db.execute( - select(IncidentRecord.status, func.count(IncidentRecord.incident_id)) - .where(IncidentRecord.created_at >= since) - .group_by(IncidentRecord.status) - ) - status_dist = [ - StatusCount(status=str(row[0]), count=row[1]) for row in status_result.all() - ] - - # 嚴重度分佈 - severity_result = await db.execute( - select(IncidentRecord.severity, func.count(IncidentRecord.incident_id)) - .where(IncidentRecord.created_at >= since) - .group_by(IncidentRecord.severity) - ) - severity_dist = [ - SeverityCount(severity=str(row[0]), count=row[1]) - for row in severity_result.all() - ] - - # 解決率 - resolved_result = await db.execute( - select(func.count(IncidentRecord.incident_id)).where( - IncidentRecord.created_at >= since, - IncidentRecord.status.in_( - [IncidentStatus.RESOLVED, IncidentStatus.CLOSED] - ), - ) - ) - resolved_count = resolved_result.scalar() or 0 - resolved_rate = (resolved_count / total * 100) if total > 0 else 0.0 - - # 平均告警聚合數 - signals_result = await db.execute( - select(func.avg(func.json_array_length(IncidentRecord.signals))).where( - IncidentRecord.created_at >= since - ) - ) - avg_signals = signals_result.scalar() or 0.0 - - logger.info( - "stats_incident_summary", - total=total, - resolved_rate=resolved_rate, - days=days, - ) - + result = await service.get_incident_summary(days) return IncidentSummary( - total_incidents=total, - status_distribution=status_dist, - severity_distribution=severity_dist, - resolved_rate=round(resolved_rate, 2), - avg_signals_per_incident=round(float(avg_signals), 2), + total_incidents=result["total_incidents"], + status_distribution=[ + StatusCount(**s) for s in result["status_distribution"] + ], + severity_distribution=[ + SeverityCount(**s) for s in result["severity_distribution"] + ], + resolved_rate=result["resolved_rate"], + avg_signals_per_incident=result["avg_signals_per_incident"], ) @@ -224,7 +166,7 @@ async def get_incident_summary( ) async def get_resolution_stats( days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), - db: AsyncSession = Depends(get_db), # noqa: B008 + service: StatsServiceDep = None, ) -> ResolutionStats: """ 取得解決時間統計 @@ -234,58 +176,8 @@ async def get_resolution_stats( - P50/P95 解決時間 - 最快/最慢解決時間 """ - since = datetime.utcnow() - timedelta(days=days) - - # 取得已解決事件的時間差 - result = await db.execute( - select( - IncidentRecord.created_at, - IncidentRecord.resolved_at, - ).where( - IncidentRecord.created_at >= since, - IncidentRecord.resolved_at.isnot(None), - ) - ) - rows = result.all() - - if not rows: - return ResolutionStats( - avg_minutes=None, - p50_minutes=None, - p95_minutes=None, - fastest_minutes=None, - slowest_minutes=None, - sample_size=0, - ) - - # 計算解決時間 (分鐘) - durations = [] - for row in rows: - if row.resolved_at and row.created_at: - delta = row.resolved_at - row.created_at - durations.append(delta.total_seconds() / 60) - - if not durations: - return ResolutionStats( - avg_minutes=None, - p50_minutes=None, - p95_minutes=None, - fastest_minutes=None, - slowest_minutes=None, - sample_size=0, - ) - - durations.sort() - n = len(durations) - - return ResolutionStats( - avg_minutes=round(sum(durations) / n, 2), - p50_minutes=round(durations[n // 2], 2), - p95_minutes=round(durations[min(int(n * 0.95), n - 1)], 2), - fastest_minutes=round(min(durations), 2), - slowest_minutes=round(max(durations), 2), - sample_size=n, - ) + result = await service.get_resolution_stats(days) + return ResolutionStats(**result) @router.get( @@ -295,7 +187,7 @@ async def get_resolution_stats( ) async def get_ai_performance( days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), - db: AsyncSession = Depends(get_db), # noqa: B008 + service: StatsServiceDep = None, ) -> AIPerformance: """ 取得 AI 提案效能統計 @@ -305,43 +197,8 @@ async def get_ai_performance( - 執行成功率 - 有效性評分分佈 """ - since = datetime.utcnow() - timedelta(days=days) - - # 取得有 outcome 的事件 - result = await db.execute( - select(IncidentRecord.outcome).where( - IncidentRecord.created_at >= since, - IncidentRecord.outcome.isnot(None), - ) - ) - outcomes = [row[0] for row in result.all() if row[0]] - - total = len(outcomes) - executed = sum(1 for o in outcomes if o.get("proposal_executed")) - success = sum( - 1 for o in outcomes if o.get("proposal_executed") and o.get("execution_success") - ) - - # 有效性評分分佈 - effectiveness_dist: dict[int, int] = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0} - scores = [] - for o in outcomes: - score = o.get("effectiveness_score") - if score and 1 <= score <= 5: - effectiveness_dist[score] += 1 - scores.append(score) - - avg_effectiveness = sum(scores) / len(scores) if scores else None - - return AIPerformance( - total_proposals=total, - executed_count=executed, - execution_rate=round((executed / total * 100) if total > 0 else 0, 2), - success_count=success, - success_rate=round((success / executed * 100) if executed > 0 else 0, 2), - avg_effectiveness=round(avg_effectiveness, 2) if avg_effectiveness else None, - effectiveness_distribution=effectiveness_dist, - ) + result = await service.get_ai_performance(days) + return AIPerformance(**result) @router.get( @@ -352,7 +209,7 @@ async def get_ai_performance( async def get_affected_services( days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), limit: int = Query(10, ge=1, le=50, description="返回數量"), - db: AsyncSession = Depends(get_db), # noqa: B008 + service: StatsServiceDep = None, ) -> list[ServiceImpact]: """ 取得最常受影響的服務排名 @@ -361,42 +218,8 @@ async def get_affected_services( - 事件計數 - 嚴重度分佈 """ - since = datetime.utcnow() - timedelta(days=days) - - # 取得所有事件的 affected_services 和 severity - result = await db.execute( - select( - IncidentRecord.affected_services, - IncidentRecord.severity, - ).where(IncidentRecord.created_at >= since) - ) - - # 統計每個服務 - service_stats: dict[str, dict[str, Any]] = {} - for row in result.all(): - services = row[0] or [] - severity = str(row[1]) - for svc in services: - if svc not in service_stats: - service_stats[svc] = {"count": 0, "severity": {}} - service_stats[svc]["count"] += 1 - service_stats[svc]["severity"][severity] = ( - service_stats[svc]["severity"].get(severity, 0) + 1 - ) - - # 排序並返回 top N - sorted_services = sorted( - service_stats.items(), key=lambda x: x[1]["count"], reverse=True - )[:limit] - - return [ - ServiceImpact( - service=svc, - incident_count=stats["count"], - severity_breakdown=stats["severity"], - ) - for svc, stats in sorted_services - ] + results = await service.get_affected_services(days, limit) + return [ServiceImpact(**r) for r in results] @router.get( @@ -407,56 +230,21 @@ async def get_affected_services( async def get_incident_trends( days: int = Query(30, ge=7, le=365, description="統計區間 (天)"), period: str = Query("daily", description="週期: daily/weekly/monthly"), - db: AsyncSession = Depends(get_db), # noqa: B008 + service: StatsServiceDep = None, ) -> IncidentTrends: """ - 取得事件趨勢數據 (SQL GROUP BY 優化版) + 取得事件趨勢數據 支援週期: - daily: 每日事件數 - weekly: 每週事件數 - monthly: 每月事件數 - - 效能優化: 使用 PostgreSQL date_trunc 在資料庫層聚合 """ - since = datetime.utcnow() - timedelta(days=days) - - # 使用 PostgreSQL date_trunc 在資料庫層聚合 - # 比應用層聚合效能提升 10x+ (大數據量時) - trunc_unit = {"daily": "day", "weekly": "week", "monthly": "month"}.get(period, "day") - - result = await db.execute( - select( - func.date_trunc(trunc_unit, IncidentRecord.created_at).label("period"), - func.count(IncidentRecord.incident_id).label("count"), - ) - .where(IncidentRecord.created_at >= since) - .group_by(func.date_trunc(trunc_unit, IncidentRecord.created_at)) - .order_by(func.date_trunc(trunc_unit, IncidentRecord.created_at)) + result = await service.get_incident_trends(days, period) + return IncidentTrends( + period=result["period"], + data=[TrendPoint(**p) for p in result["data"]], ) - rows = result.all() - - # 格式化日期 - trend_data = [] - for row in rows: - if row.period: - if period == "daily": - date_str = row.period.strftime("%Y-%m-%d") - elif period == "weekly": - date_str = row.period.strftime("%Y-W%W") - else: - date_str = row.period.strftime("%Y-%m") - trend_data.append(TrendPoint(date=date_str, count=row.count)) - - logger.info( - "stats_incident_trends", - period=period, - days=days, - data_points=len(trend_data), - optimized=True, - ) - - return IncidentTrends(period=period, data=trend_data) @router.get( @@ -466,7 +254,7 @@ async def get_incident_trends( ) async def get_feedback_summary( days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), - db: AsyncSession = Depends(get_db), # noqa: B008 + service: StatsServiceDep = None, ) -> FeedbackSummary: """ 取得人類回饋統計 @@ -475,80 +263,5 @@ async def get_feedback_summary( - 正面/中性/負面回饋比例 - 常見主題 (從 learning_notes 萃取) """ - since = datetime.utcnow() - timedelta(days=days) - - # 取得有 outcome 的事件 - result = await db.execute( - select(IncidentRecord.outcome).where( - IncidentRecord.created_at >= since, - IncidentRecord.outcome.isnot(None), - ) - ) - outcomes = [row[0] for row in result.all() if row[0]] - - # 統計回饋分數 - positive = 0 - neutral = 0 - negative = 0 - themes: dict[str, int] = {} - - for o in outcomes: - score = o.get("effectiveness_score") or o.get("feedback_score") - if score: - if score >= 4: - positive += 1 - elif score == 3: - neutral += 1 - else: - negative += 1 - - # 萃取主題 (從 learning_notes) - # TODO Phase 7: 整合 OpenClaw LLM 進行智能主題萃取 - notes = o.get("learning_notes") or o.get("notes") or "" - if notes: - notes_lower = notes.lower() - # 增強版關鍵字萃取 (按領域分類) - theme_keywords = { - # 效能類 - "timeout": ["timeout", "超時", "timed out", "deadline"], - "latency": ["latency", "延遲", "slow", "慢", "p99", "p95"], - "memory": ["memory", "記憶體", "oom", "heap", "內存"], - "cpu": ["cpu", "處理器", "high load", "負載"], - # 網路類 - "network": ["network", "網路", "dns", "connection refused"], - "connection": ["connection", "連線", "socket", "tcp"], - # 儲存類 - "disk": ["disk", "磁碟", "storage", "io", "iops"], - "database": ["database", "資料庫", "db", "query", "deadlock"], - # 容器類 - "pod": ["pod", "container", "restart", "crashloop"], - "scaling": ["scale", "擴容", "replica", "hpa"], - # 應用類 - "error": ["error", "錯誤", "exception", "fail"], - "config": ["config", "配置", "env", "secret"], - } - for theme, keywords in theme_keywords.items(): - if any(kw in notes_lower for kw in keywords): - themes[theme] = themes.get(theme, 0) + 1 - - # 取前 5 個常見主題 - sorted_themes = sorted(themes.items(), key=lambda x: x[1], reverse=True)[:5] - common_themes = [t[0] for t in sorted_themes] - - total = positive + neutral + negative - - logger.info( - "stats_feedback_summary", - total=total, - positive=positive, - negative=negative, - days=days, - ) - - return FeedbackSummary( - total_feedback=total, - positive_count=positive, - neutral_count=neutral, - negative_count=negative, - common_themes=common_themes, - ) + result = await service.get_feedback_summary(days) + return FeedbackSummary(**result) diff --git a/apps/api/src/core/metrics.py b/apps/api/src/core/metrics.py new file mode 100644 index 00000000..5d73c79c --- /dev/null +++ b/apps/api/src/core/metrics.py @@ -0,0 +1,180 @@ +""" +AWOOOI Alert Chain Metrics +=========================== +ADR-037 Wave A.5: 告警鏈路 Prometheus 指標 + +用於監控告警鏈路健康狀態: +- Webhook 請求計數與延遲 +- 告警處理成功率 +- 異常頻率統計 + +版本: v1.0 +建立: 2026-03-29 (台北時區) +建立者: Claude Code (Phase 21 ADR-037) +""" + +from prometheus_client import Counter, Gauge, Histogram + +# ============================================================================= +# Webhook Metrics (告警來源: Alertmanager/Sentry/SignOz) +# ============================================================================= + +WEBHOOK_REQUESTS_TOTAL = Counter( + "awoooi_webhook_requests_total", + "Total webhook requests received", + ["source", "status"], # source: alertmanager/sentry/signoz, status: success/error +) + +WEBHOOK_LATENCY_HISTOGRAM = Histogram( + "awoooi_webhook_latency_seconds", + "Webhook processing latency in seconds", + ["source"], + buckets=[0.1, 0.5, 1, 2, 5, 10, 30], +) + +# ============================================================================= +# Alert Processing Metrics (告警處理) +# ============================================================================= + +ALERT_PROCESSED_TOTAL = Counter( + "awoooi_alerts_processed_total", + "Total alerts processed", + ["source", "severity", "outcome"], # outcome: incident_created/deduped/ignored +) + +INCIDENT_CREATED_TOTAL = Counter( + "awoooi_incidents_created_total", + "Total incidents created from alerts", + ["source", "severity"], +) + +TELEGRAM_NOTIFICATIONS_TOTAL = Counter( + "awoooi_telegram_notifications_total", + "Total Telegram notifications sent", + ["source", "status"], # status: success/failed +) + +# ============================================================================= +# Anomaly Counter Metrics (ADR-037) +# ============================================================================= + +ANOMALY_RECORDED_TOTAL = Counter( + "awoooi_anomaly_recorded_total", + "Total anomalies recorded to counter", + ["alert_name", "service"], +) + +ANOMALY_ESCALATION_TOTAL = Counter( + "awoooi_anomaly_escalation_total", + "Total anomaly escalations", + ["level"], # level: REPEAT/ESCALATE/PERMANENT_FIX +) + +ANOMALY_FREQUENCY_GAUGE = Gauge( + "awoooi_anomaly_frequency_24h", + "Current 24h anomaly frequency", + ["anomaly_key"], +) + +# ============================================================================= +# Auto Repair Metrics +# ============================================================================= + +AUTO_REPAIR_ATTEMPTS_TOTAL = Counter( + "awoooi_auto_repair_attempts_total", + "Total auto repair attempts", + ["action", "tier", "outcome"], # outcome: success/failed/skipped +) + +AUTO_REPAIR_SUCCESS_RATE = Gauge( + "awoooi_auto_repair_success_rate", + "Auto repair success rate by action", + ["action"], +) + +# ============================================================================= +# Alert Chain Health Metrics +# ============================================================================= + +ALERT_CHAIN_LAST_SUCCESS = Gauge( + "awoooi_alert_chain_last_success_timestamp", + "Last successful alert chain completion timestamp", + ["source"], +) + +ALERT_CHAIN_HEALTHY = Gauge( + "awoooi_alert_chain_healthy", + "Alert chain health status (1=healthy, 0=unhealthy)", + ["source"], +) + +# ============================================================================= +# Sentry Comment Metrics +# ============================================================================= + +SENTRY_COMMENT_TOTAL = Counter( + "awoooi_sentry_comment_total", + "Total Sentry comments posted", + ["status"], # status: success/failed/skipped +) + +# ============================================================================= +# Learning Service Metrics (ADR-037 Phase G) +# ============================================================================= + +LEARNING_SKIP_TOTAL = Counter( + "awoooi_learning_skip_total", + "Actions skipped due to low success rate", + ["action"], +) + + +# ============================================================================= +# Helper Functions +# ============================================================================= + +def record_webhook_request(source: str, status: str, latency: float) -> None: + """記錄 Webhook 請求指標""" + WEBHOOK_REQUESTS_TOTAL.labels(source=source, status=status).inc() + WEBHOOK_LATENCY_HISTOGRAM.labels(source=source).observe(latency) + + +def record_alert_processed(source: str, severity: str, outcome: str) -> None: + """記錄告警處理指標""" + ALERT_PROCESSED_TOTAL.labels( + source=source, severity=severity, outcome=outcome + ).inc() + + +def record_telegram_notification(source: str, success: bool) -> None: + """記錄 Telegram 通知指標""" + status = "success" if success else "failed" + TELEGRAM_NOTIFICATIONS_TOTAL.labels(source=source, status=status).inc() + + +def record_anomaly(alert_name: str, service: str, frequency_24h: int, escalation_level: str | None) -> None: + """記錄異常頻率指標""" + ANOMALY_RECORDED_TOTAL.labels(alert_name=alert_name, service=service).inc() + + if escalation_level: + ANOMALY_ESCALATION_TOTAL.labels(level=escalation_level).inc() + + +def record_auto_repair(action: str, tier: int, success: bool) -> None: + """記錄自動修復指標""" + outcome = "success" if success else "failed" + AUTO_REPAIR_ATTEMPTS_TOTAL.labels( + action=action, tier=str(tier), outcome=outcome + ).inc() + + +def record_alert_chain_success(source: str) -> None: + """記錄告警鏈路成功完成""" + import time + ALERT_CHAIN_LAST_SUCCESS.labels(source=source).set(time.time()) + ALERT_CHAIN_HEALTHY.labels(source=source).set(1) + + +def record_alert_chain_failure(source: str) -> None: + """記錄告警鏈路失敗""" + ALERT_CHAIN_HEALTHY.labels(source=source).set(0) diff --git a/apps/api/src/models/incident.py b/apps/api/src/models/incident.py index 6ba693bc..3be8b512 100644 --- a/apps/api/src/models/incident.py +++ b/apps/api/src/models/incident.py @@ -187,6 +187,71 @@ class AIDecisionChain(BaseModel): } +# ============================================================================= +# Incident Frequency Stats (ADR-037: 異常頻率統計) +# ============================================================================= + + +class IncidentFrequencyStats(BaseModel): + """ + 事件頻率統計 - ADR-037 監控增強架構 + + 2026-03-29 ogt: 統帥指示「重啟只是治標,太常發生的異常必須徹底解決」 + + 用途: + - 統計同一異常在不同時間窗口內的發生次數 + - 根據頻率決定修復策略的升級 (Tier 1→4) + - 讓使用者知道這個問題有多頻繁 + + 升級閾值: + - REPEAT: ≥ 3 次/24h (標記重複) + - ESCALATE: ≥ 5 次/24h (升級 Tier,通知 Owner) + - PERMANENT_FIX: ≥ 10 次/24h (強制根因修復) + """ + + anomaly_key: str = Field( + ..., + description="異常簽名 Hash (前 16 字元)", + ) + count_1h: int = Field( + default=0, + ge=0, + description="1 小時內發生次數", + ) + count_24h: int = Field( + default=0, + ge=0, + description="24 小時內發生次數", + ) + count_7d: int = Field( + default=0, + ge=0, + description="7 天內發生次數", + ) + count_30d: int = Field( + default=0, + ge=0, + description="30 天內發生次數", + ) + escalation_level: Literal["REPEAT", "ESCALATE", "PERMANENT_FIX"] | None = Field( + None, + description="升級建議 (基於 24h 頻率)", + ) + auto_repair_count: int = Field( + default=0, + ge=0, + description="自動修復嘗試次數", + ) + last_repair_action: str | None = Field( + None, + description="最後一次修復動作", + ) + last_repair_success: bool | None = Field( + None, + description="最後一次修復是否成功", + ) + + # ============================================================================= # Incident Outcome (CPO 要求:回饋循環) # ============================================================================= @@ -319,6 +384,13 @@ class Incident(BaseModel): description="事件結果與人類回饋", ) + # === 頻率統計 (ADR-037) === + # 2026-03-29 ogt: 統帥指示「重啟只是治標,太常發生的異常必須徹底解決」 + frequency_stats: IncidentFrequencyStats | None = Field( + None, + description="異常頻率統計 (用於 Tier 分級修復策略)", + ) + # === 時間軸 === created_at: datetime = Field( default_factory=lambda: datetime.now(UTC), diff --git a/apps/api/src/services/anomaly_counter.py b/apps/api/src/services/anomaly_counter.py new file mode 100644 index 00000000..94f3a433 --- /dev/null +++ b/apps/api/src/services/anomaly_counter.py @@ -0,0 +1,546 @@ +""" +異常頻率統計服務 +================================ +ADR-037: 監控增強架構 - 異常頻率統計與根本修復 +建立: 2026-03-29 (台北時區) Claude Code + +使用 Redis Sorted Set 實作滑動窗口計數: +- ZADD anomaly:timeline:{key} {timestamp} {timestamp} +- ZCOUNT anomaly:timeline:{key} {start} +inf +- ZREMRANGEBYSCORE anomaly:timeline:{key} -inf {cutoff} + +設計原則: +- 遵循 leWOOOgo 積木化鐵律 +- 不直接存取 DB,只用 Redis +- 完整審計追蹤 +""" + +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import TYPE_CHECKING + +import structlog + +if TYPE_CHECKING: + import redis.asyncio as redis + +logger = structlog.get_logger(__name__) + + +# ============================================================================= +# Data Types +# ============================================================================= + + +@dataclass +class AnomalyFrequency: + """異常頻率資料""" + + anomaly_key: str + count_1h: int + count_24h: int + count_7d: int + count_30d: int + first_seen: datetime + last_seen: datetime + auto_repair_count: int + permanent_fix_applied: bool + escalation_level: str | None # None, REPEAT, ESCALATE, PERMANENT_FIX + + def to_dict(self) -> dict: + """轉換為字典 (供 Telegram 告警使用)""" + return { + "anomaly_key": self.anomaly_key, + "count_1h": self.count_1h, + "count_24h": self.count_24h, + "count_7d": self.count_7d, + "count_30d": self.count_30d, + "first_seen": self.first_seen.isoformat(), + "last_seen": self.last_seen.isoformat(), + "auto_repair_count": self.auto_repair_count, + "permanent_fix_applied": self.permanent_fix_applied, + "escalation_level": self.escalation_level, + } + + +# ============================================================================= +# AnomalyCounter Service +# ============================================================================= + + +class AnomalyCounter: + """ + 異常計數器 - 追蹤每種異常的發生頻率 + + 統帥指示 (2026-03-29): + - "重啟只是治標,不是治本!太常發生的異常必須徹底解決" + - "需要統計、計數!必須要讓使用者知道!!" + + 閾值配置: + - REPEAT: 3 次/24h → 標記重複,通知用戶 + - ESCALATE: 5 次/24h → 升級 Tier,通知 Owner + - PERMANENT_FIX: 10 次/24h → 強制根因修復 + """ + + # 升級閾值 (可透過環境變數覆寫) + THRESHOLDS = { + "REPEAT": 3, # 3 次 → 重複告警 + "ESCALATE": 5, # 5 次 → 人工介入 + "PERMANENT_FIX": 10, # 10 次 → 必須永久修復 + } + + # Redis Key 前綴 + PREFIX_TIMELINE = "anomaly:timeline:" + PREFIX_REPAIR_COUNT = "anomaly:repair_count:" + PREFIX_PERMANENT_FIX = "anomaly:permanent_fix:" + PREFIX_METADATA = "anomaly:metadata:" + PREFIX_REPAIR_HISTORY = "anomaly:repair_history:" + + # TTL 設定 (35 天,比清理週期長一點) + TTL_SECONDS = 35 * 24 * 3600 + + def __init__(self, redis_client: redis.Redis) -> None: + self.redis = redis_client + + @staticmethod + def hash_signature(signature: dict) -> str: + """ + 生成異常簽名的 hash key + + 簽名欄位: + - alert_name: 告警名稱 (e.g., PodCrashLoopBackOff) + - service: 服務名稱 (e.g., awoooi-api) + - namespace: K8s 命名空間 (e.g., awoooi-prod) + - error_type: 錯誤類型 (e.g., OOMKilled) + """ + # 只取關鍵欄位,忽略時間戳等易變欄位 + key_fields = { + "alert_name": signature.get("alert_name", signature.get("alertname", "")), + "service": signature.get("service", signature.get("job", "")), + "namespace": signature.get("namespace", ""), + "error_type": signature.get("error_type", signature.get("reason", "")), + } + # 排序確保一致性 + canonical = json.dumps(key_fields, sort_keys=True) + return hashlib.sha256(canonical.encode()).hexdigest()[:16] + + async def record_anomaly(self, anomaly_signature: dict) -> AnomalyFrequency: + """ + 記錄一次異常發生 + + Args: + anomaly_signature: 異常簽名字典 + + Returns: + AnomalyFrequency: 當前頻率統計 + """ + anomaly_key = self.hash_signature(anomaly_signature) + now = datetime.now() + timestamp = now.timestamp() + timeline_key = f"{self.PREFIX_TIMELINE}{anomaly_key}" + + # 1. 添加到 Sorted Set (score = timestamp, member = timestamp string) + await self.redis.zadd(timeline_key, {str(timestamp): timestamp}) + + # 2. 清理過期數據 (30 天前) + cutoff_30d = (now - timedelta(days=30)).timestamp() + await self.redis.zremrangebyscore(timeline_key, "-inf", cutoff_30d) + + # 3. 設置 TTL + await self.redis.expire(timeline_key, self.TTL_SECONDS) + + # 4. 計算各時間窗口的計數 + count_1h = await self.redis.zcount( + timeline_key, + (now - timedelta(hours=1)).timestamp(), + "+inf", + ) + count_24h = await self.redis.zcount( + timeline_key, + (now - timedelta(hours=24)).timestamp(), + "+inf", + ) + count_7d = await self.redis.zcount( + timeline_key, + (now - timedelta(days=7)).timestamp(), + "+inf", + ) + count_30d = await self.redis.zcount( + timeline_key, + cutoff_30d, + "+inf", + ) + + # 5. 取得首次/最近時間 + first_seen_data = await self.redis.zrange( + timeline_key, 0, 0, withscores=True + ) + last_seen_data = await self.redis.zrange( + timeline_key, -1, -1, withscores=True + ) + + first_seen = ( + datetime.fromtimestamp(first_seen_data[0][1]) + if first_seen_data + else now + ) + last_seen = ( + datetime.fromtimestamp(last_seen_data[0][1]) + if last_seen_data + else now + ) + + # 6. 讀取修復統計 + repair_count_str = await self.redis.get( + f"{self.PREFIX_REPAIR_COUNT}{anomaly_key}" + ) + auto_repair_count = int(repair_count_str) if repair_count_str else 0 + + permanent_fix_str = await self.redis.get( + f"{self.PREFIX_PERMANENT_FIX}{anomaly_key}" + ) + permanent_fix = permanent_fix_str == "1" + + # 7. 儲存 metadata (首次記錄時) + metadata_key = f"{self.PREFIX_METADATA}{anomaly_key}" + if not await self.redis.exists(metadata_key): + await self.redis.hset( + metadata_key, + mapping={ + "signature": json.dumps(anomaly_signature), + "first_seen": now.isoformat(), + }, + ) + await self.redis.expire(metadata_key, self.TTL_SECONDS) + + # 8. 判斷升級等級 + escalation_level = self._get_escalation_level(count_24h) + + freq = AnomalyFrequency( + anomaly_key=anomaly_key, + count_1h=count_1h, + count_24h=count_24h, + count_7d=count_7d, + count_30d=count_30d, + first_seen=first_seen, + last_seen=last_seen, + auto_repair_count=auto_repair_count, + permanent_fix_applied=permanent_fix, + escalation_level=escalation_level, + ) + + # 9. 記錄日誌 + logger.info( + "anomaly_recorded", + anomaly_key=anomaly_key, + count_1h=count_1h, + count_24h=count_24h, + count_30d=count_30d, + escalation_level=escalation_level, + ) + + return freq + + def _get_escalation_level(self, count_24h: int) -> str | None: + """判斷升級等級 (基於 24h 內次數)""" + if count_24h >= self.THRESHOLDS["PERMANENT_FIX"]: + return "PERMANENT_FIX" + elif count_24h >= self.THRESHOLDS["ESCALATE"]: + return "ESCALATE" + elif count_24h >= self.THRESHOLDS["REPEAT"]: + return "REPEAT" + return None + + async def record_repair_attempt( + self, + anomaly_key: str, + action: str, + success: bool, + ) -> None: + """ + 記錄修復嘗試 + + Args: + anomaly_key: 異常 key + action: 修復動作 (e.g., restart_pod, scale_up) + success: 是否成功 + """ + repair_key = f"{self.PREFIX_REPAIR_COUNT}{anomaly_key}" + + # 遞增修復嘗試次數 + await self.redis.incr(repair_key) + await self.redis.expire(repair_key, self.TTL_SECONDS) + + # 記錄修復歷史 (用於學習) + history_key = f"{self.PREFIX_REPAIR_HISTORY}{anomaly_key}" + await self.redis.lpush( + history_key, + json.dumps( + { + "action": action, + "success": success, + "timestamp": datetime.now().isoformat(), + } + ), + ) + await self.redis.ltrim(history_key, 0, 99) # 只保留最近 100 次 + await self.redis.expire(history_key, self.TTL_SECONDS) + + logger.info( + "repair_attempt_recorded", + anomaly_key=anomaly_key, + action=action, + success=success, + ) + + async def mark_permanent_fix_applied( + self, + anomaly_key: str, + fix_description: str, + ) -> None: + """ + 標記已套用永久修復 + + Args: + anomaly_key: 異常 key + fix_description: 修復說明 + """ + await self.redis.set( + f"{self.PREFIX_PERMANENT_FIX}{anomaly_key}", + "1", + ex=90 * 24 * 3600, # 90 天 + ) + + # 記錄修復詳情 + metadata_key = f"{self.PREFIX_METADATA}{anomaly_key}" + await self.redis.hset( + metadata_key, + mapping={ + "permanent_fix_applied": "true", + "permanent_fix_description": fix_description, + "permanent_fix_time": datetime.now().isoformat(), + }, + ) + + logger.info( + "permanent_fix_marked", + anomaly_key=anomaly_key, + fix_description=fix_description, + ) + + async def get_repair_success_rate( + self, + anomaly_key: str, + action: str, + ) -> dict: + """ + 取得特定動作的修復成功率 + + Returns: + { + 'action': 'restart_pod', + 'total': 10, + 'success': 3, + 'success_rate': 0.3, + } + """ + history_key = f"{self.PREFIX_REPAIR_HISTORY}{anomaly_key}" + history = await self.redis.lrange(history_key, 0, -1) + + total = 0 + success_count = 0 + + for item in history: + data = json.loads(item) + if data["action"] == action: + total += 1 + if data["success"]: + success_count += 1 + + return { + "action": action, + "total": total, + "success": success_count, + "success_rate": success_count / total if total > 0 else 0.0, + } + + async def get_all_repair_stats(self, anomaly_key: str) -> dict[str, dict]: + """ + 取得所有修復動作的統計 + + Returns: + { + 'restart_pod': {'total': 10, 'success': 3, 'success_rate': 0.3}, + 'scale_up': {'total': 2, 'success': 1, 'success_rate': 0.5}, + } + """ + history_key = f"{self.PREFIX_REPAIR_HISTORY}{anomaly_key}" + history = await self.redis.lrange(history_key, 0, -1) + + stats: dict[str, dict] = {} + + for item in history: + data = json.loads(item) + action = data["action"] + + if action not in stats: + stats[action] = {"total": 0, "success": 0} + + stats[action]["total"] += 1 + if data["success"]: + stats[action]["success"] += 1 + + # 計算成功率 + for action_stats in stats.values(): + total = action_stats["total"] + action_stats["success_rate"] = ( + action_stats["success"] / total if total > 0 else 0.0 + ) + + return stats + + async def get_frequency(self, anomaly_key: str) -> AnomalyFrequency | None: + """ + 取得異常頻率統計 (不記錄新事件) + + Args: + anomaly_key: 異常 key + + Returns: + AnomalyFrequency 或 None (若無記錄) + """ + timeline_key = f"{self.PREFIX_TIMELINE}{anomaly_key}" + + # 檢查是否有記錄 + if not await self.redis.exists(timeline_key): + return None + + now = datetime.now() + cutoff_30d = (now - timedelta(days=30)).timestamp() + + # 計算各時間窗口的計數 + count_1h = await self.redis.zcount( + timeline_key, + (now - timedelta(hours=1)).timestamp(), + "+inf", + ) + count_24h = await self.redis.zcount( + timeline_key, + (now - timedelta(hours=24)).timestamp(), + "+inf", + ) + count_7d = await self.redis.zcount( + timeline_key, + (now - timedelta(days=7)).timestamp(), + "+inf", + ) + count_30d = await self.redis.zcount( + timeline_key, + cutoff_30d, + "+inf", + ) + + # 取得時間範圍 + first_seen_data = await self.redis.zrange( + timeline_key, 0, 0, withscores=True + ) + last_seen_data = await self.redis.zrange( + timeline_key, -1, -1, withscores=True + ) + + first_seen = ( + datetime.fromtimestamp(first_seen_data[0][1]) + if first_seen_data + else now + ) + last_seen = ( + datetime.fromtimestamp(last_seen_data[0][1]) + if last_seen_data + else now + ) + + # 讀取修復統計 + repair_count_str = await self.redis.get( + f"{self.PREFIX_REPAIR_COUNT}{anomaly_key}" + ) + auto_repair_count = int(repair_count_str) if repair_count_str else 0 + + permanent_fix_str = await self.redis.get( + f"{self.PREFIX_PERMANENT_FIX}{anomaly_key}" + ) + permanent_fix = permanent_fix_str == "1" + + escalation_level = self._get_escalation_level(count_24h) + + return AnomalyFrequency( + anomaly_key=anomaly_key, + count_1h=count_1h, + count_24h=count_24h, + count_7d=count_7d, + count_30d=count_30d, + first_seen=first_seen, + last_seen=last_seen, + auto_repair_count=auto_repair_count, + permanent_fix_applied=permanent_fix, + escalation_level=escalation_level, + ) + + async def should_skip_action( + self, + anomaly_key: str, + action: str, + min_success_rate: float = 0.2, + ) -> bool: + """ + 判斷是否應跳過某修復動作 + + 統帥指示: 成功率 < 20% 時應該跳過,嘗試其他動作 + + Args: + anomaly_key: 異常 key + action: 修復動作 + min_success_rate: 最低成功率閾值 (預設 20%) + + Returns: + True 表示應跳過此動作 + """ + stats = await self.get_repair_success_rate(anomaly_key, action) + + # 至少有 2 次嘗試才判斷 + if stats["total"] < 2: + return False + + return stats["success_rate"] < min_success_rate + + +# ============================================================================= +# Singleton Factory (遵循現有模式) +# ============================================================================= + +_anomaly_counter: AnomalyCounter | None = None + + +def get_anomaly_counter() -> AnomalyCounter: + """ + 取得 AnomalyCounter 實例 + + 使用 Singleton 模式,共用 Redis 連線池 + """ + global _anomaly_counter + if _anomaly_counter is None: + from src.core.redis_client import get_redis + + _anomaly_counter = AnomalyCounter(get_redis()) + return _anomaly_counter + + +def reset_anomaly_counter() -> None: + """ + 重置 AnomalyCounter 實例 (供測試使用) + """ + global _anomaly_counter + _anomaly_counter = None diff --git a/apps/api/src/services/auto_repair_service.py b/apps/api/src/services/auto_repair_service.py index d4f979e1..8943ce39 100644 --- a/apps/api/src/services/auto_repair_service.py +++ b/apps/api/src/services/auto_repair_service.py @@ -34,6 +34,7 @@ from src.models.playbook import ( RiskLevel, SymptomPattern, ) +from src.services.anomaly_counter import AnomalyFrequency, get_anomaly_counter from src.services.executor import get_executor from src.services.playbook_service import IPlaybookService, get_playbook_service @@ -403,6 +404,126 @@ class AutoRepairService: return "UNKNOWN_ACTION_TYPE" + # === ADR-037: Tier-based Repair (2026-03-29) === + + # Tier 分級動作映射 + TIER_ACTIONS = { + 1: ["restart_pod", "restart_container"], # 臨時修復 + 2: ["scale_up", "increase_memory", "adjust_limits"], # 緩解修復 + 3: ["apply_hotfix", "update_config", "patch_deployment"], # 根因修復 + 4: ["create_issue", "notify_team", "schedule_fix"], # 架構修復 + } + + async def determine_repair_tier( + self, + anomaly_key: str, + frequency: AnomalyFrequency, + ) -> int: + """ + 根據頻率決定修復 Tier (ADR-037) + + 統帥指示 (2026-03-29): + - "重啟只是治標,不是治本!太常發生的異常必須徹底解決" + - 根據異常頻率和修復歷史決定應該嘗試的修復層級 + + Returns: + 1: 臨時修復 (重啟) + 2: 緩解修復 (擴容) + 3: 根因修復 (配置變更) + 4: 架構修復 (需開發) + """ + # 取得修復歷史 + counter = get_anomaly_counter() + stats = await counter.get_all_repair_stats(anomaly_key) + + # 計算重啟次數 + restart_count = stats.get("restart_pod", {}).get("total", 0) + restart_count += stats.get("restart_container", {}).get("total", 0) + + # Tier 決策邏輯 + if frequency.permanent_fix_applied: + # 已有永久修復但仍出問題 → 需架構級修復 + logger.info( + "tier_decision", + anomaly_key=anomaly_key, + tier=4, + reason="permanent_fix_still_failing", + ) + return 4 + + if frequency.escalation_level == "PERMANENT_FIX": + # 24h 內 ≥10 次 → 根因修復 + logger.info( + "tier_decision", + anomaly_key=anomaly_key, + tier=3, + reason="escalation_permanent_fix", + ) + return 3 + + if frequency.escalation_level == "ESCALATE": + # 24h 內 ≥5 次 → 緩解修復 + logger.info( + "tier_decision", + anomaly_key=anomaly_key, + tier=2, + reason="escalation_escalate", + ) + return 2 + + if restart_count >= 2: + # 已重啟 2 次 → 升級到緩解 + logger.info( + "tier_decision", + anomaly_key=anomaly_key, + tier=2, + reason=f"restart_count_{restart_count}", + ) + return 2 + + # 預設臨時修復 + return 1 + + def get_tier_actions(self, tier: int) -> list[str]: + """ + 根據 Tier 返回可用修復動作 (ADR-037) + """ + return self.TIER_ACTIONS.get(tier, self.TIER_ACTIONS[1]) + + async def record_repair_result( + self, + anomaly_key: str, + action: str, + success: bool, + tier: int = 1, + ) -> None: + """ + 記錄修復結果到 AnomalyCounter (ADR-037) + + Args: + anomaly_key: 異常 key + action: 修復動作 + success: 是否成功 + tier: 修復 Tier + """ + counter = get_anomaly_counter() + await counter.record_repair_attempt(anomaly_key, action, success) + + # 如果是 Tier 3 永久修復成功,標記已套用 + if tier >= 3 and success: + await counter.mark_permanent_fix_applied( + anomaly_key=anomaly_key, + fix_description=f"Tier {tier} repair: {action}", + ) + + logger.info( + "repair_result_recorded", + anomaly_key=anomaly_key, + action=action, + success=success, + tier=tier, + ) + # ============================================================================= # Singleton diff --git a/apps/api/src/services/incident_service.py b/apps/api/src/services/incident_service.py index f7936c08..27f4609c 100644 --- a/apps/api/src/services/incident_service.py +++ b/apps/api/src/services/incident_service.py @@ -408,6 +408,7 @@ class IncidentService: async def create_incident_from_signal( self, signal_data: dict[str, Any], + frequency_stats: dict[str, Any] | None = None, ) -> Incident | None: """ 從 Signal 建立 Incident 並雙層寫入 @@ -418,8 +419,12 @@ class IncidentService: 3. 寫入 Episodic Memory (PostgreSQL) - 永久保留 4. 標記 persisted_to_pg = True + Phase 21 (ADR-037) 擴展: + 5. 含異常頻率統計 (用於 Tier 分級修復策略) + Args: signal_data: 從 Redis Stream 收到的 Signal 資料 + frequency_stats: ADR-037 異常頻率統計 (可選) Returns: Incident | None: 成功返回 Incident,失敗返回 None @@ -436,11 +441,27 @@ class IncidentService: fingerprint=signal_data.get("fingerprint"), ) - # 2. 建立 Incident + # 2. 建立 Incident (含頻率統計) + # ADR-037: 統帥指示「重啟只是治標,太常發生的異常必須徹底解決」 + from src.models.incident import IncidentFrequencyStats + + freq_stats = None + if frequency_stats: + freq_stats = IncidentFrequencyStats( + anomaly_key=frequency_stats.get("anomaly_key", "unknown"), + count_1h=frequency_stats.get("count_1h", 0), + count_24h=frequency_stats.get("count_24h", 0), + count_7d=frequency_stats.get("count_7d", 0), + count_30d=frequency_stats.get("count_30d", 0), + escalation_level=frequency_stats.get("escalation_level"), + auto_repair_count=frequency_stats.get("auto_repair_count", 0), + ) + incident = Incident( severity=signal.severity, signals=[signal], affected_services=[signal_data.get("target", "unknown")], + frequency_stats=freq_stats, ) logger.info( diff --git a/apps/api/src/services/nvidia_provider.py b/apps/api/src/services/nvidia_provider.py index 2cf1f6bd..a032afd4 100644 --- a/apps/api/src/services/nvidia_provider.py +++ b/apps/api/src/services/nvidia_provider.py @@ -34,7 +34,6 @@ from src.core.telemetry import get_tracer # 2026-03-29 ogt: P1-2 OTEL 追蹤 from src.models.nvidia import ( NvidiaProviderResult, NvidiaResponse, - NvidiaUsage, ToolCallValidationResult, ToolDefinition, ) diff --git a/apps/api/src/services/openclaw.py b/apps/api/src/services/openclaw.py index 410d92aa..7342874d 100644 --- a/apps/api/src/services/openclaw.py +++ b/apps/api/src/services/openclaw.py @@ -1004,7 +1004,7 @@ class OpenClawService: # Step 2.5: 2026-03-29 ogt - 強制 confidence 必須由 LLM 輸出 # 如果 LLM 沒有輸出 confidence,強制設為 0.5 並標記為 COLLAB - if "confidence" not in data or not isinstance(data["confidence"], (int, float)): + if "confidence" not in data or not isinstance(data["confidence"], int | float): logger.warning( "llm_missing_confidence", raw_confidence=data.get("confidence"), diff --git a/apps/api/src/services/sentry_service.py b/apps/api/src/services/sentry_service.py index fbff8369..898a5911 100644 --- a/apps/api/src/services/sentry_service.py +++ b/apps/api/src/services/sentry_service.py @@ -61,6 +61,8 @@ class SentryService: self, endpoint: str, params: dict[str, Any] | None = None, + method: str = "GET", + json_data: dict[str, Any] | None = None, ) -> dict | list | None: """ 發送 Sentry API 請求 @@ -68,9 +70,13 @@ class SentryService: Args: endpoint: API 端點 (不含 /api/0/ 前綴) params: 查詢參數 + method: HTTP 方法 (GET, POST, PUT, DELETE) + json_data: POST/PUT 請求的 JSON body Returns: JSON 回應,失敗返回 None + + 變更: 2026-03-29 v1.1 - 支援 POST 方法 (Wave A.1/A.4 Sentry Comment) """ headers = {} if self.auth_token: @@ -80,9 +86,17 @@ class SentryService: try: async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.get(url, headers=headers, params=params) + if method == "GET": + response = await client.get(url, headers=headers, params=params) + elif method == "POST": + response = await client.post( + url, headers=headers, params=params, json=json_data + ) + else: + logger.error("sentry_api_unsupported_method", method=method) + return None - if response.status_code == 200: + if response.status_code in (200, 201): return response.json() elif response.status_code == 401: logger.warning("sentry_api_unauthorized", endpoint=endpoint) @@ -92,6 +106,7 @@ class SentryService: "sentry_api_error", status_code=response.status_code, endpoint=endpoint, + response_text=response.text[:200], ) return None @@ -188,6 +203,48 @@ class SentryService: return await self._request(f"issues/{issue_id}/events/", params=params) + async def post_issue_comment( + self, + issue_id: str, + text: str, + ) -> dict | None: + """ + 發送 Issue Comment (AI 分析回寫) + + Args: + issue_id: Sentry Issue ID + text: Markdown 格式評論內容 + + Returns: + 成功返回 comment dict,失敗返回 None + + 變更: 2026-03-29 v1.1 - Wave A.4 Sentry Comment 回寫 (ADR-037) + """ + if not self.auth_token: + logger.warning( + "sentry_comment_skipped", + issue_id=issue_id, + reason="SENTRY_AUTH_TOKEN not configured", + ) + return None + + result = await self._request( + f"issues/{issue_id}/comments/", + method="POST", + json_data={"text": text}, + ) + + if result: + logger.info( + "sentry_comment_posted", + issue_id=issue_id, + comment_id=result.get("id"), + ) + else: + logger.error("sentry_comment_failed", issue_id=issue_id) + + return result + # ========================================================================= # Session Replay APIs (2026-03-29 Phase 19 UX 監控) # ========================================================================= diff --git a/apps/api/src/services/stats_service.py b/apps/api/src/services/stats_service.py index 4d34c684..6d4aebc6 100644 --- a/apps/api/src/services/stats_service.py +++ b/apps/api/src/services/stats_service.py @@ -2,23 +2,33 @@ Stats Service - Phase 17 P0 Router 層違規修復 ============================================= -封裝統計 API 的快取邏輯,消除 Router 層直接存取 Redis。 +封裝統計 API 的快取邏輯與資料庫查詢,消除 Router 層直接存取 Redis/DB。 功能: - 快取包裝器 (Redis) -- 統計計算 (透過 Repository) +- 統計計算 (透過 SQLAlchemy) 符合 leWOOOgo 積木化規範: - Router -> Service -> Redis/Repository + +@author Claude Code (首席架構師) +@version 2.0.0 +@date 2026-03-28 (台北時間) +@see feedback_lewooogo_modular_enforcement.md """ import json from collections.abc import Callable, Coroutine -from typing import Any +from datetime import datetime, timedelta +from typing import Any, Protocol, runtime_checkable import structlog +from sqlalchemy import func, select from src.core.redis_client import get_redis +from src.db.base import get_db_context +from src.db.models import IncidentRecord +from src.models.incident import IncidentStatus logger = structlog.get_logger(__name__) @@ -26,13 +36,72 @@ logger = structlog.get_logger(__name__) STATS_CACHE_TTL = 300 # 5 分鐘 +# ============================================================================= +# Protocol (Interface) +# ============================================================================= + + +@runtime_checkable +class IStatsService(Protocol): + """ + 統計服務介面 + + Phase 17 P1: 定義 Protocol 供依賴注入 + """ + + async def get_incident_summary( + self, days: int = 30 + ) -> dict[str, Any]: + """取得事件總覽統計""" + ... + + async def get_resolution_stats( + self, days: int = 30 + ) -> dict[str, Any]: + """取得解決時間統計""" + ... + + async def get_ai_performance( + self, days: int = 30 + ) -> dict[str, Any]: + """取得 AI 效能統計""" + ... + + async def get_affected_services( + self, days: int = 30, limit: int = 10 + ) -> list[dict[str, Any]]: + """取得受影響服務排名""" + ... + + async def get_incident_trends( + self, days: int = 30, period: str = "daily" + ) -> dict[str, Any]: + """取得事件趨勢""" + ... + + async def get_feedback_summary( + self, days: int = 30 + ) -> dict[str, Any]: + """取得人類回饋摘要""" + ... + + +# ============================================================================= +# Implementation +# ============================================================================= + + class StatsService: """ - 統計服務 + 統計服務實作 - 封裝統計 API 的快取邏輯 + 封裝統計 API 的快取邏輯與資料庫查詢 """ + # ------------------------------------------------------------------------- + # 快取相關 + # ------------------------------------------------------------------------- + async def get_cached_or_compute( self, cache_key: str, @@ -43,14 +112,6 @@ class StatsService: 快取包裝器: 先查 Redis,沒有則計算並快取 Phase 17: 從 Router 層遷移至 Service 層 - - Args: - cache_key: Redis key - compute_fn: 計算函數 (async callable) - ttl: 快取時間 (秒) - - Returns: - 快取或計算結果 """ redis_client = get_redis() @@ -76,15 +137,7 @@ class StatsService: return result async def invalidate_cache(self, cache_key: str) -> bool: - """ - 清除指定快取 - - Args: - cache_key: Redis key - - Returns: - 是否成功清除 - """ + """清除指定快取""" redis_client = get_redis() try: await redis_client.delete(cache_key) @@ -94,9 +147,378 @@ class StatsService: logger.warning("stats_cache_invalidate_error", key=cache_key, error=str(e)) return False + # ------------------------------------------------------------------------- + # 統計查詢 (Phase 17 P1: 從 Router 層遷移) + # ------------------------------------------------------------------------- + + async def get_incident_summary(self, days: int = 30) -> dict[str, Any]: + """ + 取得事件總覽統計 + + 包含: 總事件數、狀態分佈、嚴重度分佈、解決率 + """ + cache_key = f"stats:incident_summary:{days}" + + async def compute() -> dict[str, Any]: + async with get_db_context() as db: + since = datetime.utcnow() - timedelta(days=days) + + # 總數 + total_result = await db.execute( + select(func.count(IncidentRecord.incident_id)).where( + IncidentRecord.created_at >= since + ) + ) + total = total_result.scalar() or 0 + + # 狀態分佈 + status_result = await db.execute( + select(IncidentRecord.status, func.count(IncidentRecord.incident_id)) + .where(IncidentRecord.created_at >= since) + .group_by(IncidentRecord.status) + ) + status_dist = [ + {"status": str(row[0]), "count": row[1]} + for row in status_result.all() + ] + + # 嚴重度分佈 + severity_result = await db.execute( + select(IncidentRecord.severity, func.count(IncidentRecord.incident_id)) + .where(IncidentRecord.created_at >= since) + .group_by(IncidentRecord.severity) + ) + severity_dist = [ + {"severity": str(row[0]), "count": row[1]} + for row in severity_result.all() + ] + + # 解決率 + resolved_result = await db.execute( + select(func.count(IncidentRecord.incident_id)).where( + IncidentRecord.created_at >= since, + IncidentRecord.status.in_( + [IncidentStatus.RESOLVED, IncidentStatus.CLOSED] + ), + ) + ) + resolved_count = resolved_result.scalar() or 0 + resolved_rate = (resolved_count / total * 100) if total > 0 else 0.0 + + # 平均告警聚合數 + signals_result = await db.execute( + select(func.avg(func.json_array_length(IncidentRecord.signals))).where( + IncidentRecord.created_at >= since + ) + ) + avg_signals = signals_result.scalar() or 0.0 + + logger.info( + "stats_incident_summary", + total=total, + resolved_rate=resolved_rate, + days=days, + ) + + return { + "total_incidents": total, + "status_distribution": status_dist, + "severity_distribution": severity_dist, + "resolved_rate": round(resolved_rate, 2), + "avg_signals_per_incident": round(float(avg_signals), 2), + } + + return await self.get_cached_or_compute(cache_key, compute) + + async def get_resolution_stats(self, days: int = 30) -> dict[str, Any]: + """ + 取得解決時間統計 + + 計算: 平均、P50、P95、最快、最慢解決時間 + """ + cache_key = f"stats:resolution:{days}" + + async def compute() -> dict[str, Any]: + async with get_db_context() as db: + since = datetime.utcnow() - timedelta(days=days) + + result = await db.execute( + select( + IncidentRecord.created_at, + IncidentRecord.resolved_at, + ).where( + IncidentRecord.created_at >= since, + IncidentRecord.resolved_at.isnot(None), + ) + ) + rows = result.all() + + if not rows: + return { + "avg_minutes": None, + "p50_minutes": None, + "p95_minutes": None, + "fastest_minutes": None, + "slowest_minutes": None, + "sample_size": 0, + } + + durations = [] + for row in rows: + if row.resolved_at and row.created_at: + delta = row.resolved_at - row.created_at + durations.append(delta.total_seconds() / 60) + + if not durations: + return { + "avg_minutes": None, + "p50_minutes": None, + "p95_minutes": None, + "fastest_minutes": None, + "slowest_minutes": None, + "sample_size": 0, + } + + durations.sort() + n = len(durations) + + return { + "avg_minutes": round(sum(durations) / n, 2), + "p50_minutes": round(durations[n // 2], 2), + "p95_minutes": round(durations[min(int(n * 0.95), n - 1)], 2), + "fastest_minutes": round(min(durations), 2), + "slowest_minutes": round(max(durations), 2), + "sample_size": n, + } + + return await self.get_cached_or_compute(cache_key, compute) + + async def get_ai_performance(self, days: int = 30) -> dict[str, Any]: + """ + 取得 AI 提案效能統計 + + 評估: 提案執行率、成功率、有效性評分 + """ + cache_key = f"stats:ai_performance:{days}" + + async def compute() -> dict[str, Any]: + async with get_db_context() as db: + since = datetime.utcnow() - timedelta(days=days) + + result = await db.execute( + select(IncidentRecord.outcome).where( + IncidentRecord.created_at >= since, + IncidentRecord.outcome.isnot(None), + ) + ) + outcomes = [row[0] for row in result.all() if row[0]] + + total = len(outcomes) + executed = sum(1 for o in outcomes if o.get("proposal_executed")) + success = sum( + 1 for o in outcomes if o.get("proposal_executed") and o.get("execution_success") + ) + + effectiveness_dist: dict[int, int] = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0} + scores = [] + for o in outcomes: + score = o.get("effectiveness_score") + if score and 1 <= score <= 5: + effectiveness_dist[score] += 1 + scores.append(score) + + avg_effectiveness = sum(scores) / len(scores) if scores else None + + return { + "total_proposals": total, + "executed_count": executed, + "execution_rate": round((executed / total * 100) if total > 0 else 0, 2), + "success_count": success, + "success_rate": round((success / executed * 100) if executed > 0 else 0, 2), + "avg_effectiveness": round(avg_effectiveness, 2) if avg_effectiveness else None, + "effectiveness_distribution": effectiveness_dist, + } + + return await self.get_cached_or_compute(cache_key, compute) + + async def get_affected_services( + self, days: int = 30, limit: int = 10 + ) -> list[dict[str, Any]]: + """ + 取得最常受影響的服務排名 + """ + cache_key = f"stats:affected_services:{days}:{limit}" + + async def compute() -> dict[str, Any]: + async with get_db_context() as db: + since = datetime.utcnow() - timedelta(days=days) + + result = await db.execute( + select( + IncidentRecord.affected_services, + IncidentRecord.severity, + ).where(IncidentRecord.created_at >= since) + ) + + service_stats: dict[str, dict[str, Any]] = {} + for row in result.all(): + services = row[0] or [] + severity = str(row[1]) + for svc in services: + if svc not in service_stats: + service_stats[svc] = {"count": 0, "severity": {}} + service_stats[svc]["count"] += 1 + service_stats[svc]["severity"][severity] = ( + service_stats[svc]["severity"].get(severity, 0) + 1 + ) + + sorted_services = sorted( + service_stats.items(), key=lambda x: x[1]["count"], reverse=True + )[:limit] + + return { + "services": [ + { + "service": svc, + "incident_count": stats["count"], + "severity_breakdown": stats["severity"], + } + for svc, stats in sorted_services + ] + } + + result = await self.get_cached_or_compute(cache_key, compute) + return result.get("services", []) + + async def get_incident_trends( + self, days: int = 30, period: str = "daily" + ) -> dict[str, Any]: + """ + 取得事件趨勢數據 (SQL GROUP BY 優化版) + """ + cache_key = f"stats:incident_trends:{days}:{period}" + + async def compute() -> dict[str, Any]: + async with get_db_context() as db: + since = datetime.utcnow() - timedelta(days=days) + + trunc_unit = {"daily": "day", "weekly": "week", "monthly": "month"}.get( + period, "day" + ) + + result = await db.execute( + select( + func.date_trunc(trunc_unit, IncidentRecord.created_at).label("period"), + func.count(IncidentRecord.incident_id).label("count"), + ) + .where(IncidentRecord.created_at >= since) + .group_by(func.date_trunc(trunc_unit, IncidentRecord.created_at)) + .order_by(func.date_trunc(trunc_unit, IncidentRecord.created_at)) + ) + rows = result.all() + + trend_data = [] + for row in rows: + if row.period: + if period == "daily": + date_str = row.period.strftime("%Y-%m-%d") + elif period == "weekly": + date_str = row.period.strftime("%Y-W%W") + else: + date_str = row.period.strftime("%Y-%m") + trend_data.append({"date": date_str, "count": row.count}) + + logger.info( + "stats_incident_trends", + period=period, + days=days, + data_points=len(trend_data), + ) + + return {"period": period, "data": trend_data} + + return await self.get_cached_or_compute(cache_key, compute) + + async def get_feedback_summary(self, days: int = 30) -> dict[str, Any]: + """ + 取得人類回饋統計 + """ + cache_key = f"stats:feedback_summary:{days}" + + async def compute() -> dict[str, Any]: + async with get_db_context() as db: + since = datetime.utcnow() - timedelta(days=days) + + result = await db.execute( + select(IncidentRecord.outcome).where( + IncidentRecord.created_at >= since, + IncidentRecord.outcome.isnot(None), + ) + ) + outcomes = [row[0] for row in result.all() if row[0]] + + positive = 0 + neutral = 0 + negative = 0 + themes: dict[str, int] = {} + + for o in outcomes: + score = o.get("effectiveness_score") or o.get("feedback_score") + if score: + if score >= 4: + positive += 1 + elif score == 3: + neutral += 1 + else: + negative += 1 + + notes = o.get("learning_notes") or o.get("notes") or "" + if notes: + notes_lower = notes.lower() + theme_keywords = { + "timeout": ["timeout", "超時", "timed out", "deadline"], + "latency": ["latency", "延遲", "slow", "慢", "p99", "p95"], + "memory": ["memory", "記憶體", "oom", "heap", "內存"], + "cpu": ["cpu", "處理器", "high load", "負載"], + "network": ["network", "網路", "dns", "connection refused"], + "connection": ["connection", "連線", "socket", "tcp"], + "disk": ["disk", "磁碟", "storage", "io", "iops"], + "database": ["database", "資料庫", "db", "query", "deadlock"], + "pod": ["pod", "container", "restart", "crashloop"], + "scaling": ["scale", "擴容", "replica", "hpa"], + "error": ["error", "錯誤", "exception", "fail"], + "config": ["config", "配置", "env", "secret"], + } + for theme, keywords in theme_keywords.items(): + if any(kw in notes_lower for kw in keywords): + themes[theme] = themes.get(theme, 0) + 1 + + sorted_themes = sorted(themes.items(), key=lambda x: x[1], reverse=True)[:5] + common_themes = [t[0] for t in sorted_themes] + + total = positive + neutral + negative + + logger.info( + "stats_feedback_summary", + total=total, + positive=positive, + negative=negative, + days=days, + ) + + return { + "total_feedback": total, + "positive_count": positive, + "neutral_count": neutral, + "negative_count": negative, + "common_themes": common_themes, + } + + return await self.get_cached_or_compute(cache_key, compute) + # ============================================================================= -# Singleton +# Dependency Injection # ============================================================================= _stats_service: StatsService | None = None diff --git a/apps/api/src/workers/signal_worker.py b/apps/api/src/workers/signal_worker.py index 7faf6f90..e0b6b478 100644 --- a/apps/api/src/workers/signal_worker.py +++ b/apps/api/src/workers/signal_worker.py @@ -361,6 +361,41 @@ async def _write_health_files() -> None: logger.info("health_files_written") +async def _heartbeat_loop(shutdown_event: asyncio.Event) -> None: + """ + 心跳循環:定期更新健康檢查文件的時間戳 + + 長期解決方案 (2026-03-28): + - 每 30 秒 touch 健康文件,確保 mtime 更新 + - K8s liveness probe 可檢查 mtime 是否在 60 秒內 + - 防止 Worker 卡住但健康文件仍存在的假陽性 + + @author Claude Code (首席架構師) + @version 1.0.0 + @date 2026-03-28 (台北時間) + """ + from pathlib import Path + + HEARTBEAT_INTERVAL = 30 # 秒 + + while not shutdown_event.is_set(): + try: + Path("/tmp/worker-healthy").touch() + logger.debug("heartbeat_tick") + except Exception as e: + logger.warning("heartbeat_error", error=str(e)) + + # 等待下次心跳或收到關閉信號 + try: + await asyncio.wait_for( + shutdown_event.wait(), + timeout=HEARTBEAT_INTERVAL + ) + break # 收到關閉信號 + except TimeoutError: + continue # 超時,繼續下次心跳 + + async def _main() -> None: """ Standalone worker main function. @@ -419,9 +454,19 @@ async def _main() -> None: signal.signal(signal.SIGTERM, _shutdown_handler) signal.signal(signal.SIGINT, _shutdown_handler) + # 啟動心跳循環 (長期解決方案 - 定期更新健康文件 mtime) + heartbeat_task = asyncio.create_task(_heartbeat_loop(shutdown_event)) + # Wait for shutdown signal await shutdown_event.wait() + # 停止心跳 + heartbeat_task.cancel() + try: + await heartbeat_task + except asyncio.CancelledError: + pass + # Graceful shutdown logger.info("signal_worker_shutting_down") await worker.stop() diff --git a/apps/api/tests/test_adr030_auto_approve.py b/apps/api/tests/test_adr030_auto_approve.py deleted file mode 100644 index 01d63a79..00000000 --- a/apps/api/tests/test_adr030_auto_approve.py +++ /dev/null @@ -1,362 +0,0 @@ -""" -ADR-030 Auto Approve Policy Tests -================================= -測試自動執行策略服務 - -版本: v1.0 -建立: 2026-03-26 (台北時區) -建立者: Claude Code (ADR-030 Phase 4) -""" - -from unittest.mock import MagicMock - -import pytest - -from src.models.playbook import ( - ActionType, - Playbook, - PlaybookStatus, - RepairStep, - RiskLevel, - SymptomPattern, -) -from src.services.auto_approve import ( - AutoApproveConfig, - AutoApprovePolicy, - AutoApproveReason, -) -from src.services.playbook_rag import PlaybookMatch - - -class TestAutoApprovePolicy: - """AutoApprovePolicy 單元測試""" - - @pytest.fixture - def mock_trust_manager(self): - """建立 mock TrustScoreManager""" - manager = MagicMock() - # 預設信任分數為 5 - manager.get_trust_record.return_value = MagicMock(score=5) - return manager - - @pytest.fixture - def policy(self, mock_trust_manager): - return AutoApprovePolicy(trust_manager=mock_trust_manager) - - def create_proposal_data( - self, - risk_level: str = "low", - confidence: float = 0.95, - action: str = "kubectl rollout restart deployment/test-app -n prod", - ) -> dict: - """建立測試用 proposal_data""" - return { - "risk_level": risk_level, - "confidence": confidence, - "action": action, - } - - def create_playbook( - self, - success_count: int = 10, - failure_count: int = 0, - risk_level: RiskLevel = RiskLevel.LOW, - ) -> Playbook: - """建立測試用 Playbook""" - return Playbook( - playbook_id="PB-TEST-001", - name="Test Playbook", - description="For testing", - status=PlaybookStatus.APPROVED, - symptom_pattern=SymptomPattern( - alert_names=["HighCPU"], - affected_services=["test-app"], - ), - repair_steps=[ - RepairStep( - step_number=1, - action_type=ActionType.KUBECTL, - command="kubectl rollout restart deployment/{target}", - risk_level=risk_level, - ), - ], - success_count=success_count, - failure_count=failure_count, - ) - - def create_match( - self, - playbook_id: str = "PB-TEST-001", - similarity: float = 0.95, - ) -> PlaybookMatch: - """建立測試用 PlaybookMatch""" - return PlaybookMatch( - playbook_id=playbook_id, - similarity_score=similarity, - match_type="hybrid", - ) - - # ========================================================================= - # 通過條件測試 - # ========================================================================= - - def test_approve_all_conditions_met(self, policy): - """所有條件都滿足時應該批准""" - proposal = self.create_proposal_data( - risk_level="low", - confidence=0.95, - ) - playbook = self.create_playbook(success_count=10, failure_count=0) - match = self.create_match(similarity=0.95) - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is True - assert decision.reason == AutoApproveReason.PLAYBOOK_MATCH - - def test_approve_with_high_confidence(self, policy): - """高信心度應該通過""" - proposal = self.create_proposal_data(confidence=0.99) - playbook = self.create_playbook(success_count=10) - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is True - assert decision.confidence == 0.99 - - # ========================================================================= - # 拒絕條件測試 - # ========================================================================= - - def test_reject_critical_risk(self, policy): - """CRITICAL 風險永遠拒絕""" - proposal = self.create_proposal_data(risk_level="critical") - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.CRITICAL_OPERATION - - def test_reject_high_risk(self, policy): - """HIGH 風險應該拒絕""" - proposal = self.create_proposal_data(risk_level="high") - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.HIGH_RISK - - def test_reject_medium_risk(self, policy): - """MEDIUM 風險應該拒絕 (預設只允許 low)""" - proposal = self.create_proposal_data(risk_level="medium") - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.HIGH_RISK - - def test_reject_low_trust_score(self, policy, mock_trust_manager): - """低信任分數應該拒絕""" - mock_trust_manager.get_trust_record.return_value = MagicMock(score=2) - - proposal = self.create_proposal_data() - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.LOW_TRUST - - def test_reject_low_confidence(self, policy): - """低信心度應該拒絕""" - proposal = self.create_proposal_data(confidence=0.7) - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.LOW_TRUST - assert "Confidence" in decision.reason_detail - - def test_reject_no_playbook(self, policy): - """無 Playbook 時應該拒絕""" - proposal = self.create_proposal_data() - - decision = policy.evaluate(proposal, None, None) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.NO_PLAYBOOK - - def test_reject_low_playbook_success_rate(self, policy): - """Playbook 成功率低應該拒絕""" - proposal = self.create_proposal_data() - playbook = self.create_playbook( - success_count=10, - failure_count=5, # 66.7% success rate - ) - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.LOW_SUCCESS_RATE - - def test_reject_insufficient_history(self, policy): - """Playbook 執行次數不足應該拒絕""" - proposal = self.create_proposal_data() - playbook = self.create_playbook( - success_count=2, # < 3 - failure_count=0, - ) - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert decision.reason == AutoApproveReason.INSUFFICIENT_HISTORY - - # ========================================================================= - # 邊界條件測試 - # ========================================================================= - - def test_boundary_trust_score_exactly_5(self, policy, mock_trust_manager): - """信任分數剛好等於 5 (邊界值)""" - mock_trust_manager.get_trust_record.return_value = MagicMock(score=5) - - proposal = self.create_proposal_data() - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is True - assert decision.trust_score == 5 - - def test_boundary_confidence_exactly_90(self, policy): - """信心度剛好等於 90% (邊界值)""" - proposal = self.create_proposal_data(confidence=0.90) - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is True - assert decision.confidence == 0.90 - - def test_boundary_success_count_exactly_3(self, policy): - """成功次數剛好等於 3 (邊界值)""" - proposal = self.create_proposal_data() - playbook = self.create_playbook(success_count=3, failure_count=0) - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is True - assert decision.playbook_success_count == 3 - - def test_boundary_success_rate_exactly_95(self, policy): - """成功率剛好等於 95% (邊界值)""" - proposal = self.create_proposal_data() - playbook = self.create_playbook( - success_count=19, - failure_count=1, # 95% exactly - ) - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is True - - # ========================================================================= - # 配置測試 - # ========================================================================= - - def test_disabled_policy_rejects_all(self, mock_trust_manager): - """停用的策略應該拒絕所有請求""" - config = AutoApproveConfig(enabled=False) - policy = AutoApprovePolicy(config=config, trust_manager=mock_trust_manager) - - proposal = self.create_proposal_data() - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - assert decision.should_auto_approve is False - assert "disabled" in decision.reason_detail.lower() - - def test_custom_allowed_risk_levels(self, mock_trust_manager): - """自訂允許的風險等級""" - config = AutoApproveConfig(allowed_risk_levels=["low", "medium"]) - policy = AutoApprovePolicy(config=config, trust_manager=mock_trust_manager) - - proposal = self.create_proposal_data(risk_level="medium") - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - - # medium 風險現在應該被允許 - assert decision.should_auto_approve is True - - # ========================================================================= - # 資料結構測試 - # ========================================================================= - - def test_decision_to_dict(self, policy): - """Decision.to_dict() 應該正常工作""" - proposal = self.create_proposal_data() - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - data = decision.to_dict() - - assert "should_auto_approve" in data - assert "reason" in data - assert "reason_detail" in data - assert "risk_level" in data - assert "trust_score" in data - assert "confidence" in data - assert "decided_at" in data - - def test_decision_to_audit_log(self, policy): - """Decision.to_audit_log() 應該正常工作""" - proposal = self.create_proposal_data() - playbook = self.create_playbook() - match = self.create_match() - - decision = policy.evaluate(proposal, playbook, match) - log = decision.to_audit_log() - - assert "AUTO_APPROVED" in log or "REQUIRES_HUMAN" in log - assert "risk=" in log - assert "trust=" in log - - -class TestAutoApproveReason: - """AutoApproveReason Enum 測試""" - - def test_all_reasons_exist(self): - """確認所有原因類型都存在""" - # 批准原因 - assert AutoApproveReason.PLAYBOOK_MATCH.value == "playbook_match" - assert AutoApproveReason.TRUST_SCORE.value == "trust_score" - assert AutoApproveReason.LOW_RISK.value == "low_risk" - - # 拒絕原因 - assert AutoApproveReason.HIGH_RISK.value == "high_risk" - assert AutoApproveReason.CRITICAL_OPERATION.value == "critical_operation" - assert AutoApproveReason.LOW_TRUST.value == "low_trust" - assert AutoApproveReason.NO_PLAYBOOK.value == "no_playbook" - assert AutoApproveReason.LOW_SUCCESS_RATE.value == "low_success_rate" - assert AutoApproveReason.INSUFFICIENT_HISTORY.value == "insufficient_history" diff --git a/apps/api/tests/test_adr030_learning_service.py b/apps/api/tests/test_adr030_learning_service.py deleted file mode 100644 index 1834e5aa..00000000 --- a/apps/api/tests/test_adr030_learning_service.py +++ /dev/null @@ -1,326 +0,0 @@ -""" -ADR-030 Learning Service Tests -============================== -測試持續學習服務 - -版本: v1.0 -建立: 2026-03-26 (台北時區) -建立者: Claude Code (ADR-030 Phase 5) -""" - -import uuid -from unittest.mock import MagicMock, patch - -import pytest - -from src.models.approval import ApprovalRequest, RiskLevel -from src.services.learning_service import ( - ExecutionResult, - FeedbackRequest, - FeedbackType, - LearningRecord, - LearningService, -) - - -class TestExecutionResult: - """ExecutionResult 資料結構測試""" - - def test_create_success_result(self): - """建立成功執行結果""" - result = ExecutionResult( - approval_id="APR-001", - incident_id="INC-001", - action="kubectl rollout restart", - success=True, - duration_seconds=2.5, - ) - - assert result.success is True - assert result.error_message is None - assert result.duration_seconds == 2.5 - - def test_create_failure_result(self): - """建立失敗執行結果""" - result = ExecutionResult( - approval_id="APR-001", - incident_id="INC-001", - action="kubectl rollout restart", - success=False, - error_message="Pod not found", - ) - - assert result.success is False - assert result.error_message == "Pod not found" - - def test_to_dict(self): - """to_dict() 應該正常工作""" - result = ExecutionResult( - approval_id="APR-001", - incident_id="INC-001", - action="kubectl rollout restart", - success=True, - duration_seconds=1.5, - ) - - data = result.to_dict() - - assert data["approval_id"] == "APR-001" - assert data["incident_id"] == "INC-001" - assert data["success"] is True - assert "executed_at" in data - - -class TestFeedbackRequest: - """FeedbackRequest 資料結構測試""" - - def test_create_human_approve_feedback(self): - """建立人工批准反饋""" - feedback = FeedbackRequest( - incident_id="INC-001", - feedback_type=FeedbackType.HUMAN_APPROVE, - submitted_by="admin", - ) - - assert feedback.feedback_type == FeedbackType.HUMAN_APPROVE - assert feedback.submitted_by == "admin" - - def test_create_effectiveness_rating(self): - """建立有效性評分反饋""" - feedback = FeedbackRequest( - incident_id="INC-001", - feedback_type=FeedbackType.EFFECTIVENESS_RATING, - effectiveness_score=5, - learning_notes="非常有效的修復", - ) - - assert feedback.effectiveness_score == 5 - assert feedback.learning_notes == "非常有效的修復" - - -class TestLearningRecord: - """LearningRecord 資料結構測試""" - - def test_create_learning_record(self): - """建立學習記錄""" - record = LearningRecord( - incident_id="INC-001", - feedback_type=FeedbackType.EXECUTION_SUCCESS, - action_pattern="restart:test-app-*", - trust_before=3, - trust_after=4, - playbook_updated=True, - ) - - assert record.trust_before == 3 - assert record.trust_after == 4 - assert record.playbook_updated is True - - def test_to_dict(self): - """to_dict() 應該正常工作""" - record = LearningRecord( - incident_id="INC-001", - feedback_type=FeedbackType.EXECUTION_SUCCESS, - action_pattern="restart:test-app-*", - trust_before=3, - trust_after=4, - ) - - data = record.to_dict() - - assert data["incident_id"] == "INC-001" - assert data["feedback_type"] == "execution_success" - assert data["trust_before"] == 3 - assert data["trust_after"] == 4 - - -class TestLearningService: - """LearningService 單元測試""" - - @pytest.fixture - def mock_trust_manager(self): - """建立 mock TrustScoreManager""" - manager = MagicMock() - manager.get_trust_record.return_value = MagicMock(score=3) - manager.record_approval.return_value = None - manager.record_rejection.return_value = None - return manager - - @pytest.fixture - def service(self, mock_trust_manager): - """建立 LearningService with mocked dependencies""" - with patch("src.services.learning_service.get_trust_manager", return_value=mock_trust_manager): - return LearningService() - - def create_approval(self, action: str = "kubectl rollout restart deployment/test-app -n prod") -> ApprovalRequest: - """建立測試用 ApprovalRequest""" - return ApprovalRequest( - id=uuid.uuid4(), - action=action, - description="Test approval", - risk_level=RiskLevel.LOW, - required_signatures=1, - requested_by="test-system", - ) - - # ========================================================================= - # Action Pattern 提取測試 - # ========================================================================= - - def test_extract_action_pattern_restart(self, service): - """測試 restart 動作模式提取""" - pattern = service._extract_action_pattern( - "kubectl rollout restart deployment/test-app-abc123-def456 -n prod" - ) - - # 應該移除 pod hash suffix - assert "restart" in pattern - assert "abc123" not in pattern - - def test_extract_action_pattern_delete(self, service): - """測試 delete 動作模式提取""" - pattern = service._extract_action_pattern( - "kubectl delete pod test-pod-xyz789-abc123 -n staging" - ) - - assert "delete" in pattern - assert "xyz789" not in pattern - - def test_extract_action_pattern_empty(self, service): - """測試空動作""" - pattern = service._extract_action_pattern("") - - assert pattern == "unknown" - - def test_extract_action_pattern_short(self, service): - """測試太短的動作""" - pattern = service._extract_action_pattern("kubectl") - - assert pattern == "unknown" - - # ========================================================================= - # 執行結果處理測試 - # ========================================================================= - - @pytest.mark.asyncio - async def test_process_success_result(self, service, mock_trust_manager): - """處理成功執行結果""" - approval = self.create_approval() - result = ExecutionResult( - approval_id="apr-001", - incident_id="INC-001", - action=approval.action, - success=True, - duration_seconds=2.0, - ) - - record = await service.process_execution_result(approval, result) - - assert record.feedback_type == FeedbackType.EXECUTION_SUCCESS - mock_trust_manager.record_approval.assert_called_once() - - @pytest.mark.asyncio - async def test_process_failure_result(self, service, mock_trust_manager): - """處理失敗執行結果""" - approval = self.create_approval() - result = ExecutionResult( - approval_id="apr-001", - incident_id="INC-001", - action=approval.action, - success=False, - error_message="Pod not found", - ) - - record = await service.process_execution_result(approval, result) - - assert record.feedback_type == FeedbackType.EXECUTION_FAILURE - mock_trust_manager.record_rejection.assert_called_once() - - # ========================================================================= - # 人工反饋處理測試 - # ========================================================================= - - @pytest.mark.asyncio - async def test_process_human_approve(self, service, mock_trust_manager): - """處理人工批准反饋""" - feedback = FeedbackRequest( - incident_id="INC-001", - feedback_type=FeedbackType.HUMAN_APPROVE, - submitted_by="admin", - ) - - record = await service.process_human_feedback(feedback) - - assert record.feedback_type == FeedbackType.HUMAN_APPROVE - mock_trust_manager.record_approval.assert_called_once() - - @pytest.mark.asyncio - async def test_process_human_reject(self, service, mock_trust_manager): - """處理人工拒絕反饋""" - feedback = FeedbackRequest( - incident_id="INC-001", - feedback_type=FeedbackType.HUMAN_REJECT, - submitted_by="admin", - ) - - record = await service.process_human_feedback(feedback) - - assert record.feedback_type == FeedbackType.HUMAN_REJECT - mock_trust_manager.record_rejection.assert_called_once() - - @pytest.mark.asyncio - async def test_process_high_effectiveness_rating(self, service, mock_trust_manager): - """處理高有效性評分 (4-5 分)""" - feedback = FeedbackRequest( - incident_id="INC-001", - feedback_type=FeedbackType.EFFECTIVENESS_RATING, - effectiveness_score=5, - ) - - record = await service.process_human_feedback(feedback) - - assert record.feedback_type == FeedbackType.EFFECTIVENESS_RATING - mock_trust_manager.record_approval.assert_called_once() - - @pytest.mark.asyncio - async def test_process_low_effectiveness_rating(self, service, mock_trust_manager): - """處理低有效性評分 (1-2 分)""" - feedback = FeedbackRequest( - incident_id="INC-001", - feedback_type=FeedbackType.EFFECTIVENESS_RATING, - effectiveness_score=1, - ) - - record = await service.process_human_feedback(feedback) - - assert record.feedback_type == FeedbackType.EFFECTIVENESS_RATING - mock_trust_manager.record_rejection.assert_called_once() - - @pytest.mark.asyncio - async def test_process_medium_effectiveness_rating(self, service, mock_trust_manager): - """處理中等有效性評分 (3 分) - 不調整信任度""" - feedback = FeedbackRequest( - incident_id="INC-001", - feedback_type=FeedbackType.EFFECTIVENESS_RATING, - effectiveness_score=3, - ) - - record = await service.process_human_feedback(feedback) - - assert record.feedback_type == FeedbackType.EFFECTIVENESS_RATING - # 中等評分不應該調整信任度 - mock_trust_manager.record_approval.assert_not_called() - mock_trust_manager.record_rejection.assert_not_called() - - -class TestFeedbackType: - """FeedbackType Enum 測試""" - - def test_all_feedback_types_exist(self): - """確認所有反饋類型都存在""" - assert FeedbackType.EXECUTION_SUCCESS.value == "execution_success" - assert FeedbackType.EXECUTION_FAILURE.value == "execution_failure" - assert FeedbackType.HUMAN_APPROVE.value == "human_approve" - assert FeedbackType.HUMAN_REJECT.value == "human_reject" - assert FeedbackType.HUMAN_OVERRIDE.value == "human_override" - assert FeedbackType.EFFECTIVENESS_RATING.value == "effectiveness_rating" diff --git a/apps/api/tests/test_anomaly_counter.py b/apps/api/tests/test_anomaly_counter.py new file mode 100644 index 00000000..64a9c2be --- /dev/null +++ b/apps/api/tests/test_anomaly_counter.py @@ -0,0 +1,196 @@ +""" +AnomalyCounter 整合測試 +======================= +ADR-037: 監控增強架構 - 異常頻率統計 + +🔴🔴 遵循「禁止 Mock 測試鐵律」- 使用真實 Redis + +建立: 2026-03-29 (台北時區) Claude Code +""" + +import pytest + +from src.services.anomaly_counter import AnomalyCounter, AnomalyFrequency + + +class TestAnomalyCounterHashSignature: + """測試異常簽名 Hash 生成""" + + def test_same_input_same_hash(self): + """相同輸入應產生相同 hash""" + sig1 = {"alert_name": "PodCrash", "service": "api"} + sig2 = {"alert_name": "PodCrash", "service": "api"} + assert AnomalyCounter.hash_signature(sig1) == AnomalyCounter.hash_signature(sig2) + + def test_different_input_different_hash(self): + """不同輸入應產生不同 hash""" + sig1 = {"alert_name": "PodCrash", "service": "api"} + sig2 = {"alert_name": "PodCrash", "service": "web"} + assert AnomalyCounter.hash_signature(sig1) != AnomalyCounter.hash_signature(sig2) + + def test_ignores_extra_fields(self): + """應忽略非關鍵欄位 (如 timestamp)""" + sig1 = {"alert_name": "PodCrash", "service": "api"} + sig2 = {"alert_name": "PodCrash", "service": "api", "timestamp": "2026-01-01"} + assert AnomalyCounter.hash_signature(sig1) == AnomalyCounter.hash_signature(sig2) + + def test_alertname_alias(self): + """應支援 alertname (Prometheus 格式) 別名""" + sig1 = {"alert_name": "PodCrash", "service": "api"} + sig2 = {"alertname": "PodCrash", "service": "api"} + assert AnomalyCounter.hash_signature(sig1) == AnomalyCounter.hash_signature(sig2) + + def test_job_alias(self): + """應支援 job (Prometheus 格式) 別名""" + sig1 = {"alert_name": "PodCrash", "service": "api"} + sig2 = {"alert_name": "PodCrash", "job": "api"} + assert AnomalyCounter.hash_signature(sig1) == AnomalyCounter.hash_signature(sig2) + + +class TestAnomalyFrequencyToDict: + """測試 AnomalyFrequency.to_dict()""" + + def test_to_dict_returns_all_fields(self): + """to_dict 應返回所有欄位""" + from datetime import datetime + + freq = AnomalyFrequency( + anomaly_key="abc123", + count_1h=3, + count_24h=10, + count_7d=50, + count_30d=200, + first_seen=datetime(2026, 3, 1), + last_seen=datetime(2026, 3, 29), + auto_repair_count=5, + permanent_fix_applied=False, + escalation_level="ESCALATE", + ) + + d = freq.to_dict() + + assert d["anomaly_key"] == "abc123" + assert d["count_1h"] == 3 + assert d["count_24h"] == 10 + assert d["count_7d"] == 50 + assert d["count_30d"] == 200 + assert d["auto_repair_count"] == 5 + assert d["permanent_fix_applied"] is False + assert d["escalation_level"] == "ESCALATE" + + +class TestAnomalyCounterEscalationLevel: + """測試升級等級判斷""" + + def test_no_escalation_below_threshold(self): + """低於閾值不應升級""" + counter = AnomalyCounter.__new__(AnomalyCounter) + assert counter._get_escalation_level(2) is None + + def test_repeat_at_threshold(self): + """達到 REPEAT 閾值 (3 次)""" + counter = AnomalyCounter.__new__(AnomalyCounter) + assert counter._get_escalation_level(3) == "REPEAT" + + def test_escalate_at_threshold(self): + """達到 ESCALATE 閾值 (5 次)""" + counter = AnomalyCounter.__new__(AnomalyCounter) + assert counter._get_escalation_level(5) == "ESCALATE" + + def test_permanent_fix_at_threshold(self): + """達到 PERMANENT_FIX 閾值 (10 次)""" + counter = AnomalyCounter.__new__(AnomalyCounter) + assert counter._get_escalation_level(10) == "PERMANENT_FIX" + + def test_highest_level_wins(self): + """超過最高閾值應返回最高等級""" + counter = AnomalyCounter.__new__(AnomalyCounter) + assert counter._get_escalation_level(100) == "PERMANENT_FIX" + + +# ============================================================================= +# 整合測試 (需要真實 Redis) +# ============================================================================= + + +@pytest.mark.asyncio +@pytest.mark.integration +class TestAnomalyCounterIntegration: + """ + 整合測試 - 需要真實 Redis + + 執行方式: pytest -m integration tests/test_anomaly_counter.py + """ + + @pytest.fixture + async def counter(self): + """取得 AnomalyCounter 實例""" + from src.core.redis_client import get_redis + + redis = get_redis() + counter = AnomalyCounter(redis) + + # 清理測試資料 + test_key = "test_anomaly_key" + await redis.delete(f"anomaly:timeline:{test_key}") + await redis.delete(f"anomaly:repair_count:{test_key}") + await redis.delete(f"anomaly:permanent_fix:{test_key}") + await redis.delete(f"anomaly:metadata:{test_key}") + await redis.delete(f"anomaly:repair_history:{test_key}") + + return counter + + async def test_record_anomaly_returns_frequency(self, counter): + """record_anomaly 應返回頻率統計""" + signature = { + "alert_name": "TestAlert", + "service": "test-service", + "namespace": "test", + } + + freq = await counter.record_anomaly(signature) + + assert freq.count_1h >= 1 + assert freq.count_24h >= 1 + assert freq.anomaly_key is not None + + async def test_record_anomaly_increments_count(self, counter): + """多次記錄應遞增計數""" + signature = { + "alert_name": "TestAlert", + "service": "test-service", + "namespace": "test", + } + + freq1 = await counter.record_anomaly(signature) + _ = await counter.record_anomaly(signature) + freq3 = await counter.record_anomaly(signature) + + assert freq3.count_1h == freq1.count_1h + 2 + assert freq3.count_24h == freq1.count_24h + 2 + + async def test_record_repair_attempt(self, counter): + """記錄修復嘗試""" + anomaly_key = "test_repair_key" + + await counter.record_repair_attempt(anomaly_key, "restart_pod", True) + await counter.record_repair_attempt(anomaly_key, "restart_pod", False) + + stats = await counter.get_repair_success_rate(anomaly_key, "restart_pod") + + assert stats["total"] == 2 + assert stats["success"] == 1 + assert stats["success_rate"] == 0.5 + + async def test_should_skip_action_low_success_rate(self, counter): + """低成功率動作應跳過""" + anomaly_key = "test_skip_key" + + # 模擬多次失敗 + await counter.record_repair_attempt(anomaly_key, "bad_action", False) + await counter.record_repair_attempt(anomaly_key, "bad_action", False) + await counter.record_repair_attempt(anomaly_key, "bad_action", False) + + should_skip = await counter.should_skip_action(anomaly_key, "bad_action") + + assert should_skip is True diff --git a/apps/api/tests/test_approval_field_alignment.py b/apps/api/tests/test_approval_field_alignment.py index f0513d34..3c434027 100644 --- a/apps/api/tests/test_approval_field_alignment.py +++ b/apps/api/tests/test_approval_field_alignment.py @@ -18,14 +18,11 @@ from pydantic import ValidationError from src.models.approval import ( ApprovalRequestCreate, - ApprovalRequestBase, BlastRadius, DataImpact, - DryRunCheck, RiskLevel, ) - # ============================================================================= # Test: ApprovalRequestCreate Schema 驗證 # ============================================================================= diff --git a/apps/api/tests/test_nvidia_provider.py b/apps/api/tests/test_nvidia_provider.py index d868b5e2..c91ebc3e 100644 --- a/apps/api/tests/test_nvidia_provider.py +++ b/apps/api/tests/test_nvidia_provider.py @@ -6,7 +6,6 @@ NVIDIA Provider Tests - ADR-036 注意: 這些是單元測試,不需要真實的 NVIDIA API Key """ -import json import pytest @@ -407,7 +406,7 @@ class TestAIRouterNvidiaIntegration: def test_tool_calling_route(self): """測試 Tool Calling 路由""" - from src.services.ai_router import get_ai_router, AIProvider, reset_ai_router + from src.services.ai_router import AIProvider, get_ai_router, reset_ai_router reset_ai_router() router = get_ai_router() @@ -425,7 +424,7 @@ class TestAIRouterNvidiaIntegration: def test_existing_routing_not_affected(self): """測試現有路由規則不受影響""" - from src.services.ai_router import get_ai_router, AIProvider, reset_ai_router + from src.services.ai_router import AIProvider, get_ai_router, reset_ai_router reset_ai_router() router = get_ai_router() @@ -568,9 +567,9 @@ class TestPrometheusMetrics: def test_metrics_defined(self): """測試 Prometheus Metrics 已定義""" from src.services.nvidia_provider import ( - NVIDIA_REQUESTS_TOTAL, - NVIDIA_LATENCY_HISTOGRAM, NVIDIA_CIRCUIT_BREAKER_STATE, + NVIDIA_LATENCY_HISTOGRAM, + NVIDIA_REQUESTS_TOTAL, ) assert NVIDIA_REQUESTS_TOTAL is not None @@ -593,8 +592,8 @@ class TestExponentialBackoff: """測試退避常數已定義""" from src.services.nvidia_provider import ( RETRY_BASE_DELAY, - RETRY_MAX_DELAY, RETRY_EXPONENTIAL_BASE, + RETRY_MAX_DELAY, ) assert RETRY_BASE_DELAY == 1.0 diff --git a/apps/api/tests/test_telegram_integration.py b/apps/api/tests/test_telegram_integration.py index 940ad5a6..fb3ea501 100644 --- a/apps/api/tests/test_telegram_integration.py +++ b/apps/api/tests/test_telegram_integration.py @@ -13,12 +13,11 @@ P1-4: Telegram 整合驗證測試 建立者: Claude Code (P1-4 Telegram 驗證) """ -import pytest from src.services.telegram_gateway import ( + RISK_EMOJI_MAP, SignOzMetricsBlock, TelegramMessage, - RISK_EMOJI_MAP, ) # 本地定義 (避免 import 錯誤)