From 670cd5df86bb3078ed0ca8be85f739451d506ba2 Mon Sep 17 00:00:00 2001 From: OG T Date: Fri, 10 Apr 2026 11:35:10 +0800 Subject: [PATCH] =?UTF-8?q?refactor(flywheel):=20=E9=A6=96=E5=B8=AD?= =?UTF-8?q?=E6=9E=B6=E6=A7=8B=E5=B8=AB=E5=AF=A9=E6=9F=A5=E4=BF=AE=E6=AD=A3?= =?UTF-8?q?=20C1/C2/I1/I2/I3/I4/M1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit C1 — Repository 層修正 (積木化鐵律): 新增 PlaybookEmbeddingRepository (pgvector UPSERT) playbook_embedding_service 改透過 Repository 存取 DB,不再直接 db.execute(text(...)) C2 — Router 層業務邏輯移入 Service 層: create_incident_for_approval + extract_affected_services (去掉底線前綴) 移入 incident_service.py webhooks.py 改從 incident_service import,自身不再含業務邏輯 I1 — _infra_jobs 提升為 module-level frozenset (_INFRA_JOB_NAMES),避免每次呼叫重建 I2 — _persist_embeddings_to_db 補齊 PlaybookRAGService / list[Playbook] 型別標注 I3 — embedding 格式顯式化: "[" + ",".join(str(float(x)) for x in embedding) + "]" 防止 pgvector 因格式差異靜默解析失敗 I4 — import asyncio 移至 main.py 頂層,移除 try 區塊內重複 import M1 — similarity.py: 移除死代碼 `if union > 0 else 0.0` union 在兩個集合都非空時不可能為 0 2026-04-10 Asia/Taipei — Claude Sonnet 4.6 Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/api/v1/webhooks.py | 157 ++---------------- apps/api/src/main.py | 2 +- .../playbook_embedding_repository.py | 104 ++++++++++++ apps/api/src/services/incident_service.py | 140 ++++++++++++++++ .../services/playbook_embedding_service.py | 73 ++++---- apps/api/src/utils/similarity.py | 2 +- 6 files changed, 292 insertions(+), 186 deletions(-) create mode 100644 apps/api/src/repositories/playbook_embedding_repository.py diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index dbd4f048..63b2ab3d 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -43,7 +43,7 @@ from src.models.approval import ( DryRunCheck, RiskLevel, ) -from src.models.incident import Incident, IncidentStatus, Severity, Signal + # R4 #129 (2026-04-01 ogt): AlertPayload/AlertResponse 移至 models 層,AlertAnalyzer 移至 services 層 # ogt 更新 v1.1 2026-04-01 台北時間: generate_alert_fingerprint 移至 alert_analyzer_service (ADR-024) # [首席架構師] 移除 generate_alert_fingerprint 直接 import,改用 AlertAnalyzer.generate_fingerprint v1.2 2026-04-01 Asia/Taipei @@ -52,7 +52,12 @@ from src.services.alert_analyzer_service import AlertAnalyzer from src.services.approval_db import get_approval_service # Phase 17 P0: Service 層 (消除 Router 直接存取 Redis) -from src.services.incident_service import get_incident_service +# C2 修正 (首席架構師審查 2026-04-10): create_incident_for_approval + extract_affected_services 已移入 Service 層 +from src.services.incident_service import ( + create_incident_for_approval, + extract_affected_services, + get_incident_service, +) from src.services.auto_repair_service import AutoRepairService # Phase 5: OpenClaw AI Engine @@ -73,151 +78,9 @@ logger = get_logger("awoooi.webhooks") # Incident-Approval 同步 (feedback_incident_approval_sync.md 鐵律) # ============================================================================= -# 風險等級 → 事件嚴重度映射 -RISK_TO_SEVERITY = { - "critical": Severity.P0, - "high": Severity.P1, - "medium": Severity.P2, - "low": Severity.P3, -} - -# Incident TTL: 7 天 (秒) -INCIDENT_TTL_SECONDS = 7 * 24 * 60 * 60 - - -def _extract_affected_services(labels: dict, target_resource: str) -> list[str]: - """ - 從告警 labels 提取真實服務名,防止 IP 或 alertname 污染 affected_services。 - - 優先序: - 1. component label(Docker-compose 層告警最可靠) - 2. job label(排除 node-exporter / pushgateway 等基礎設施 job) - 3. pod label(取 deployment name,去掉 hash suffix) - 4. target_resource(不含冒號、不等於 alertname 時才採用) - 5. 空列表(讓通用型 Playbook 透過空集合豁免規則匹配) - - Phase 1 飛輪修復 — 2026-04-10 Claude Sonnet 4.6 Asia/Taipei - 根因: HostHighCpuLoad/192.168.0.188:9100 被誤填進 affected_services, - 導致 Jaccard 匹配永遠為 0,飛輪無法啟動。 - """ - alertname = labels.get("alertname", "") - - # 1. component(docker-compose 服務名如 "sentry", "momo-app") - if comp := labels.get("component"): - return [comp] - - # 2. job,排除基礎設施 exporter 類 - _infra_jobs = {"node", "node-exporter", "pushgateway", "blackbox", - "prometheus", "alertmanager", "cadvisor"} - if job := labels.get("job"): - if job.lower().replace("-", "").replace("_", "") not in { - j.replace("-", "").replace("_", "") for j in _infra_jobs - }: - return [job] - - # 3. pod label → 取 deployment name(去掉 ReplicaSet/Pod hash 後兩段) - if pod := labels.get("pod"): - parts = pod.rsplit("-", 2) - if len(parts) >= 3 and len(parts[-1]) == 5 and len(parts[-2]) in (9, 10): - return [parts[0]] # 去掉 - - elif len(parts) >= 2: - return ["-".join(parts[:-1])] - - # 4. target_resource 是真實服務名(不含冒號、不等於 alertname) - if (target_resource - and ":" not in target_resource - and target_resource != alertname - and not target_resource[0].isdigit()): # 排除純 IP - return [target_resource] - - # 5. 無法識別 → 返回空(讓空集合豁免規則接手) - return [] - - -async def create_incident_for_approval( - approval_id: str, - risk_level: str, - target_resource: str, - namespace: str, - alert_type: str, - message: str, - source: str = "alertmanager", - alertname: str | None = None, - alert_labels: dict | None = None, -) -> str: - """ - 為 Approval 創建對應的 Incident (活躍事件同步) - - 設計原則: - - Approval 和 Incident 必須同時存在 - - Incident 存入 Redis (Working Memory) - - 7 天 TTL 自動過期 - - Returns: - str: Incident ID - """ - from uuid import UUID - - # Phase 17 P0: Router 層違規修復 - 改用 Service 層 - incident_service = get_incident_service() - - # 映射嚴重度 - severity = RISK_TO_SEVERITY.get(risk_level.lower(), Severity.P2) - - # Phase 1 飛輪修復 (2026-04-10 Claude Sonnet 4.6): - # Signal 保留完整 labels(含 instance/job/pod 等),供執行層變數替換 - # alert_name 用 alertname(如 HostHighCpuLoad),不是 alert_type(如 "custom") - _labels = { - "namespace": namespace, - "resource": target_resource, - "alertname": alertname or alert_type, - **(alert_labels or {}), # 完整 Prometheus labels,保留 instance/job/pod/component - } - signal = Signal( - alert_name=alertname or alert_type, # 用真實 alertname,非 alert_type="custom" - severity=severity, - source=source, - fired_at=now_taipei(), - labels=_labels, - annotations={"message": message}, - ) - - # Phase 1 飛輪修復: affected_services 用語意提取,不直接放 target_resource - # _extract_affected_services 防止 IP/alertname 污染匹配層 - _affected_services = _extract_affected_services(_labels, target_resource) - - # 建立 Incident - incident = Incident( - status=IncidentStatus.INVESTIGATING, - severity=severity, - signals=[signal], - affected_services=_affected_services, - proposal_ids=[UUID(approval_id)], - ) - - # Phase 17 P0: 透過 Service 存入 Working Memory (Redis) - await incident_service.save_to_working_memory(incident) - - # 2026-04-06 ogt: Phase 26 — 同時寫入 Episodic Memory (PostgreSQL) - # 原本只存 Redis,TTL 7天後消失,Playbook 萃取和 KM 永遠找不到 incident - try: - await incident_service.save_to_episodic_memory(incident) - except Exception as _pg_err: - logger.warning( - "incident_episodic_memory_failed", - incident_id=incident.incident_id, - error=str(_pg_err), - ) - - logger.info( - "incident_created_for_approval", - incident_id=incident.incident_id, - approval_id=approval_id, - severity=severity.value, - target=target_resource, - ) - - return incident.incident_id +# C2 修正 (首席架構師審查 2026-04-10 Claude Sonnet 4.6 Asia/Taipei): +# RISK_TO_SEVERITY / INCIDENT_TTL_SECONDS / _extract_affected_services / create_incident_for_approval +# 已移入 src/services/incident_service.py(業務邏輯不屬 Router 層) # ============================================================================= diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 4b227bc4..48cc6b92 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -18,6 +18,7 @@ Version: 1.0.0 Date: 2026-03-20 """ +import asyncio import os from collections.abc import AsyncGenerator from contextlib import asynccontextmanager @@ -285,7 +286,6 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # 目的: 確保 playbook_embeddings 表有最新向量,供語義相似度查詢 # 使用 asyncio.create_task 非阻塞 — 不影響 API 啟動速度 try: - import asyncio from src.services.playbook_embedding_service import ensure_playbook_embeddings_indexed asyncio.create_task(ensure_playbook_embeddings_indexed()) logger.info("playbook_embedding_indexing_scheduled") diff --git a/apps/api/src/repositories/playbook_embedding_repository.py b/apps/api/src/repositories/playbook_embedding_repository.py new file mode 100644 index 00000000..34e19847 --- /dev/null +++ b/apps/api/src/repositories/playbook_embedding_repository.py @@ -0,0 +1,104 @@ +""" +Playbook Embedding Repository — pgvector 持久化層 +================================================= +C1 修正 (首席架構師審查 2026-04-10 Claude Sonnet 4.6 Asia/Taipei): + 違規修復: playbook_embedding_service.py 直接 db.execute(text(...)) 繞過 Repository 層 + 解法: 移至此 Repository,Service 層透過此介面存取 + +職責: playbook_embeddings 表的 CRUD (pgvector) +對應遷移: migrations/flywheel_playbook_embeddings.sql +""" + +from __future__ import annotations + +import structlog +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +logger = structlog.get_logger(__name__) + + +class PlaybookEmbeddingRepository: + """ + Playbook Embedding Repository + + 職責: playbook_embeddings 表 CRUD + 使用 pgvector 儲存 nomic-embed-text 768 維向量 + + Args: + db: SQLAlchemy AsyncSession (DI 注入) + """ + + def __init__(self, db: AsyncSession) -> None: + self._db = db + + async def upsert( + self, + playbook_id: str, + embedding: list[float], + alert_names: list[str], + keywords: list[str], + ) -> bool: + """ + 新增或更新 Playbook 向量 (UPSERT)。 + + pgvector 格式: '[x,y,z,...]' 字串 + ON CONFLICT (playbook_id) → 更新向量快照與 updated_at + + Args: + playbook_id: Playbook ID + embedding: 768 維浮點向量 (list[float]) + alert_names: 索引時的 alert_names 快照 + keywords: 索引時的 keywords 快照 + + Returns: + True 表示成功 + """ + try: + # 顯式格式化保證 pgvector 可解析,避免 str(list) 空格差異 + vec_str = "[" + ",".join(str(float(x)) for x in embedding) + "]" + + await self._db.execute( + text(""" + INSERT INTO playbook_embeddings + (playbook_id, embedding, alert_names, keywords, indexed_at, updated_at) + VALUES + (:playbook_id, :embedding, :alert_names, :keywords, NOW(), NOW()) + ON CONFLICT (playbook_id) DO UPDATE SET + embedding = EXCLUDED.embedding, + alert_names = EXCLUDED.alert_names, + keywords = EXCLUDED.keywords, + updated_at = NOW() + """), + { + "playbook_id": playbook_id, + "embedding": vec_str, + "alert_names": alert_names, + "keywords": keywords, + }, + ) + return True + + except Exception as e: + logger.warning( + "playbook_embedding_upsert_failed", + playbook_id=playbook_id, + error=str(e), + ) + return False + + async def delete(self, playbook_id: str) -> bool: + """刪除 Playbook 向量記錄。""" + try: + await self._db.execute( + text("DELETE FROM playbook_embeddings WHERE playbook_id = :pid"), + {"pid": playbook_id}, + ) + return True + except Exception as e: + logger.warning( + "playbook_embedding_delete_failed", + playbook_id=playbook_id, + error=str(e), + ) + return False diff --git a/apps/api/src/services/incident_service.py b/apps/api/src/services/incident_service.py index 9e0c6ef7..2bd01ed0 100644 --- a/apps/api/src/services/incident_service.py +++ b/apps/api/src/services/incident_service.py @@ -14,11 +14,16 @@ Incident Service - Phase 6.2 雙層記憶寫入 統帥鐵律: - 禁止硬編碼 IP 或密碼,嚴格讀取 .env - 所有寫入操作都必須有結構化日誌 + +C2 修正 (首席架構師審查 2026-04-10 Claude Sonnet 4.6 Asia/Taipei): + create_incident_for_approval + _extract_affected_services 從 Router 層移入此 Service 層 + 原違規: 業務邏輯 (Severity 映射, Signal 建立, Incident 建立) 放在 api/v1/webhooks.py """ import json from datetime import UTC, datetime from typing import Any, Literal +from uuid import UUID import structlog @@ -31,9 +36,144 @@ from src.models.incident import ( Severity, Signal, ) +from src.utils.timezone import now_taipei logger = structlog.get_logger(__name__) +# ============================================================================= +# C2 修正: 從 webhooks.py 遷入的業務邏輯 +# 2026-04-10 Claude Sonnet 4.6 Asia/Taipei +# ============================================================================= + +# 風險等級 → 事件嚴重度映射 (原在 webhooks.py) +_RISK_TO_SEVERITY = { + "critical": Severity.P0, + "high": Severity.P1, + "medium": Severity.P2, + "low": Severity.P3, +} + +# I1 修正: 提升為 module-level frozenset,避免每次呼叫重建 (原在 webhooks.py 函數體內) +_INFRA_JOB_NAMES: frozenset[str] = frozenset( + j.lower().replace("-", "").replace("_", "") + for j in {"node", "node-exporter", "pushgateway", "blackbox", + "prometheus", "alertmanager", "cadvisor"} +) + + +def extract_affected_services(labels: dict, target_resource: str) -> list[str]: + """ + 從告警 labels 提取真實服務名,防止 IP 或 alertname 污染 affected_services。 + + 優先序: + 1. component label(Docker-compose 層告警最可靠) + 2. job label(排除 node-exporter / pushgateway 等基礎設施 job) + 3. pod label(取 deployment name,去掉 hash suffix) + 4. target_resource(不含冒號、不等於 alertname 時才採用) + 5. 空列表(讓通用型 Playbook 透過空集合豁免規則匹配) + + Phase 1 飛輪修復 — 2026-04-10 Claude Sonnet 4.6 Asia/Taipei + C2 修正: 從 api/v1/webhooks.py 移入 Service 層(純業務邏輯,無 I/O) + """ + alertname = labels.get("alertname", "") + + if comp := labels.get("component"): + return [comp] + + if job := labels.get("job"): + normalized = job.lower().replace("-", "").replace("_", "") + if normalized not in _INFRA_JOB_NAMES: + return [job] + + if pod := labels.get("pod"): + parts = pod.rsplit("-", 2) + if len(parts) >= 3 and len(parts[-1]) == 5 and len(parts[-2]) in (9, 10): + return [parts[0]] + elif len(parts) >= 2: + return ["-".join(parts[:-1])] + + if (target_resource + and ":" not in target_resource + and target_resource != alertname + and not target_resource[0].isdigit()): + return [target_resource] + + return [] + + +async def create_incident_for_approval( + approval_id: str, + risk_level: str, + target_resource: str, + namespace: str, + alert_type: str, + message: str, + source: str = "alertmanager", + alertname: str | None = None, + alert_labels: dict | None = None, +) -> str: + """ + 為 Approval 創建對應的 Incident (活躍事件同步)。 + + 設計原則: + - Approval 和 Incident 必須同時存在 + - Incident 存入 Redis (Working Memory) + PostgreSQL (Episodic Memory) + - 7 天 TTL 自動過期 + + C2 修正: 從 api/v1/webhooks.py 移入 Service 層(業務邏輯不屬 Router 層) + + Returns: + str: Incident ID + """ + incident_service = get_incident_service() + severity = _RISK_TO_SEVERITY.get(risk_level.lower(), Severity.P2) + + _labels: dict = { + "namespace": namespace, + "resource": target_resource, + "alertname": alertname or alert_type, + **(alert_labels or {}), + } + signal = Signal( + alert_name=alertname or alert_type, + severity=severity, + source=source, + fired_at=now_taipei(), + labels=_labels, + annotations={"message": message}, + ) + + _affected_services = extract_affected_services(_labels, target_resource) + + incident = Incident( + status=IncidentStatus.INVESTIGATING, + severity=severity, + signals=[signal], + affected_services=_affected_services, + proposal_ids=[UUID(approval_id)], + ) + + await incident_service.save_to_working_memory(incident) + + try: + await incident_service.save_to_episodic_memory(incident) + except Exception as _pg_err: + logger.warning( + "incident_episodic_memory_failed", + incident_id=incident.incident_id, + error=str(_pg_err), + ) + + logger.info( + "incident_created_for_approval", + incident_id=incident.incident_id, + approval_id=approval_id, + severity=severity.value, + target=target_resource, + ) + + return incident.incident_id + # ============================================================================= # Legacy Value Normalization (方案 C - 代碼相容舊格式) diff --git a/apps/api/src/services/playbook_embedding_service.py b/apps/api/src/services/playbook_embedding_service.py index 99a0a16d..06e27390 100644 --- a/apps/api/src/services/playbook_embedding_service.py +++ b/apps/api/src/services/playbook_embedding_service.py @@ -6,17 +6,26 @@ ADR-067 延伸: Playbook 向量持久化到 PostgreSQL playbook_embeddings 表 職責: - 啟動時掃描 APPROVED Playbooks,重建 Redis 向量快取 - 同步持久化到 playbook_embeddings (pgvector) 供跨重啟使用 - - 已索引且未變更的 Playbook 跳過 (updated_at 比對) 呼叫方: main.py lifespan (asyncio.create_task — 非阻塞) 2026-04-10 Claude Sonnet 4.6 Asia/Taipei +修正 (首席架構師審查 2026-04-10): + C1: _persist_embeddings_to_db 改用 PlaybookEmbeddingRepository (積木化修復) + I2: 補齊 _persist_embeddings_to_db 型別標注 + I3: embedding 格式顯式格式化,防止 pgvector 解析錯誤 """ from __future__ import annotations +from typing import TYPE_CHECKING + import structlog +if TYPE_CHECKING: + from src.models.playbook import Playbook + from src.services.playbook_rag import PlaybookRAGService + logger = structlog.get_logger(__name__) @@ -35,7 +44,7 @@ async def ensure_playbook_embeddings_indexed() -> None: from src.services.playbook_rag import get_playbook_rag_service playbook_service = get_playbook_service() - playbooks, total = await playbook_service.list_playbooks( + playbooks, _total = await playbook_service.list_playbooks( status=PlaybookStatus.APPROVED, limit=500 ) @@ -45,15 +54,11 @@ async def ensure_playbook_embeddings_indexed() -> None: logger.info("playbook_embedding_indexing_start", count=len(playbooks)) - # Step 1: 重建 Redis 向量快取 (現有邏輯) + # Step 1: 重建 Redis 向量快取 rag_service = await get_playbook_rag_service() success, failed = await rag_service.reindex_all_playbooks(playbooks) - logger.info( - "playbook_embedding_redis_done", - success=success, - failed=failed, - ) + logger.info("playbook_embedding_redis_done", success=success, failed=failed) # Step 2: 持久化到 PostgreSQL playbook_embeddings 表 await _persist_embeddings_to_db(rag_service, playbooks) @@ -62,16 +67,26 @@ async def ensure_playbook_embeddings_indexed() -> None: logger.warning("playbook_embedding_indexing_error", error=str(e)) -async def _persist_embeddings_to_db(rag_service, playbooks) -> None: - """將 Redis 向量快取同步寫入 playbook_embeddings DB 表 (持久化層)。""" +async def _persist_embeddings_to_db( + rag_service: "PlaybookRAGService", + playbooks: "list[Playbook]", +) -> None: + """ + 將 Redis 向量快取同步寫入 playbook_embeddings DB 表 (持久化層)。 + + C1 修正: 改用 PlaybookEmbeddingRepository,Service 不直接操作 SQL。 + I3 修正: embedding 格式由 Repository 層統一處理,防止 pgvector 解析錯誤。 + """ try: - from sqlalchemy import text from src.db.base import get_db_context + from src.repositories.playbook_embedding_repository import PlaybookEmbeddingRepository persisted = 0 skipped = 0 async with get_db_context() as db: + repo = PlaybookEmbeddingRepository(db) + for playbook in playbooks: try: embedding = await rag_service.get_playbook_embedding(playbook.playbook_id) @@ -83,28 +98,16 @@ async def _persist_embeddings_to_db(rag_service, playbooks) -> None: alert_names = list(sp.alert_names) if sp else [] keywords = list(sp.keywords) if sp else [] - # UPSERT: 已存在則更新向量快照 - await db.execute( - text(""" - INSERT INTO playbook_embeddings - (playbook_id, embedding, alert_names, keywords, indexed_at, updated_at) - VALUES - (:playbook_id, :embedding, :alert_names, :keywords, - NOW(), NOW()) - ON CONFLICT (playbook_id) DO UPDATE SET - embedding = EXCLUDED.embedding, - alert_names = EXCLUDED.alert_names, - keywords = EXCLUDED.keywords, - updated_at = NOW() - """), - { - "playbook_id": playbook.playbook_id, - "embedding": str(embedding), # pgvector accepts '[x,y,...]' string - "alert_names": alert_names, - "keywords": keywords, - }, + ok = await repo.upsert( + playbook_id=playbook.playbook_id, + embedding=embedding, + alert_names=alert_names, + keywords=keywords, ) - persisted += 1 + if ok: + persisted += 1 + else: + skipped += 1 except Exception as e: logger.warning( @@ -116,11 +119,7 @@ async def _persist_embeddings_to_db(rag_service, playbooks) -> None: await db.commit() - logger.info( - "playbook_embedding_db_done", - persisted=persisted, - skipped=skipped, - ) + logger.info("playbook_embedding_db_done", persisted=persisted, skipped=skipped) except Exception as e: logger.warning("playbook_embedding_db_error", error=str(e)) diff --git a/apps/api/src/utils/similarity.py b/apps/api/src/utils/similarity.py index e8542b42..450032bf 100644 --- a/apps/api/src/utils/similarity.py +++ b/apps/api/src/utils/similarity.py @@ -42,7 +42,7 @@ def calculate_jaccard_similarity(set_a: set, set_b: set) -> float: intersection = len(set_a & set_b) union = len(set_a | set_b) - return intersection / union if union > 0 else 0.0 + return intersection / union def calculate_symptom_similarity(