diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index d099f7d4..17899ec2 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -198,6 +198,19 @@ async def _escalate_auto_repair_unavailable( ) +def _auto_repair_action_label(result, fallback_target: str) -> str: + """Build a verifier label that includes the actual playbook steps.""" + playbook_id = getattr(result, "playbook_id", None) or "unknown" + steps = getattr(result, "executed_steps", None) or [] + step_text = " | ".join(str(step) for step in steps).strip() + if not step_text: + step_text = fallback_target + step_text = " ".join(step_text.split()) + if len(step_text) > 240: + step_text = f"{step_text[:237]}..." + return f"auto_repair_playbook:{playbook_id} {step_text}".strip() + + async def _try_auto_repair_background( incident_id: str, approval_id: str, @@ -287,6 +300,46 @@ async def _try_auto_repair_background( }, ) + _pre_execution_snapshot = None + try: + from src.core.feature_flags import aiops_flags + + if aiops_flags.is_sub_flag_enabled("AIOPS_P1_PRE_DECISION_INVESTIGATOR"): + from src.services.evidence_snapshot import get_latest_snapshot + from src.services.post_execution_verifier import get_post_execution_verifier + + _pre_execution_snapshot = await get_latest_snapshot(incident_id) + if _pre_execution_snapshot is None: + from src.services.pre_decision_investigator import ( + get_pre_decision_investigator, + ) + + _pre_execution_snapshot = await asyncio.wait_for( + get_pre_decision_investigator().investigate(incident), + timeout=60.0, + ) + if _pre_execution_snapshot is not None: + await asyncio.wait_for( + get_post_execution_verifier().capture_pre_execution_state( + incident, + _pre_execution_snapshot, + ), + timeout=30.0, + ) + except asyncio.TimeoutError: + logger.warning( + "auto_repair_pre_state_capture_timeout", + incident_id=incident_id, + approval_id=approval_id, + ) + except Exception as _pre_state_err: + logger.warning( + "auto_repair_pre_state_capture_failed", + incident_id=incident_id, + approval_id=approval_id, + error=str(_pre_state_err), + ) + # 執行自動修復 logger.info( "auto_repair_executing", @@ -386,11 +439,10 @@ async def _try_auto_repair_background( from src.services.evidence_snapshot import get_latest_snapshot from src.services.learning_service import get_learning_service - _snapshot = await get_latest_snapshot(incident_id) - _action_label = ( - f"{target_resource}:{namespace}" - if not result.success - else f"auto_repair_playbook:{result.playbook_id}" + _snapshot = _pre_execution_snapshot or await get_latest_snapshot(incident_id) + _action_label = _auto_repair_action_label( + result, + fallback_target=f"{target_resource}:{namespace}", ) _verifier = get_post_execution_verifier() _verify_result = await asyncio.wait_for( diff --git a/apps/api/src/services/evidence_snapshot.py b/apps/api/src/services/evidence_snapshot.py index 7d10ebfd..0bfaef91 100644 --- a/apps/api/src/services/evidence_snapshot.py +++ b/apps/api/src/services/evidence_snapshot.py @@ -297,6 +297,45 @@ class EvidenceSnapshot: ) raise + async def update_pre_execution( + self, + pre_state: dict[str, Any], + ) -> None: + """ + 補填執行前狀態。 + + Auto-repair 背景路徑會在 playbook 執行前呼叫 + PostExecutionVerifier.capture_pre_execution_state(),同一份 evidence row + 後續再寫入 post_execution_state / verification_result,讓 truth-chain 能看出 + 「執行前 → 執行後」是否真的改善。 + """ + self.pre_execution_state = pre_state + + try: + async with get_db_context() as db: + stmt_result = await db.execute( + update(IncidentEvidence) + .where(IncidentEvidence.id == self.snapshot_id) + .values(pre_execution_state=pre_state) + ) + + if stmt_result.rowcount < 1: + logger.warning( + "evidence_snapshot_pre_update_no_rows", + snapshot_id=self.snapshot_id, + ) + else: + logger.info( + "evidence_snapshot_pre_execution_updated", + snapshot_id=self.snapshot_id, + ) + except Exception: + logger.exception( + "evidence_snapshot_pre_update_error", + snapshot_id=self.snapshot_id, + ) + raise + async def update_self_healing( self, score: float, diff --git a/apps/api/src/services/incident_memory.py b/apps/api/src/services/incident_memory.py index 48ae8ce5..750cd101 100644 --- a/apps/api/src/services/incident_memory.py +++ b/apps/api/src/services/incident_memory.py @@ -25,11 +25,106 @@ from src.core.redis_client import get_redis from src.db.base import get_db_context from src.db.models import IncidentRecord from src.models.incident import Incident -from src.utils.incident_converter import brain_to_local logger = structlog.get_logger(__name__) +def _signal_to_dict(signal: Any) -> dict[str, Any]: + """Normalize brain/local Signal objects and raw dicts into one shape.""" + if isinstance(signal, dict): + return signal + if hasattr(signal, "model_dump"): + return signal.model_dump(mode="json") + return { + "alert_name": getattr(signal, "alert_name", None), + "severity": getattr(signal, "severity", None), + "source": getattr(signal, "source", None), + "labels": getattr(signal, "labels", None) or {}, + "annotations": getattr(signal, "annotations", None) or {}, + "fingerprint": getattr(signal, "fingerprint", None), + } + + +def _derive_incident_alert_metadata(incident: Incident) -> dict[str, Any]: + """Derive alert metadata for incidents saved through the lewooogo bridge.""" + first_signal = incident.signals[0] if incident.signals else None + signal = _signal_to_dict(first_signal) if first_signal else {} + labels = signal.get("labels") or {} + annotations = signal.get("annotations") or {} + + alertname = ( + labels.get("alertname") + or signal.get("alert_name") + or signal.get("alertname") + or "" + ) + severity = ( + signal.get("severity") + or getattr(incident.severity, "value", incident.severity) + or labels.get("severity") + or "warning" + ) + severity = getattr(severity, "value", severity) + + alert_category = None + notification_type = None + if alertname: + from src.services.incident_service import classify_alert_early + + alert_category, notification_type = classify_alert_early( + str(alertname), + str(severity), + labels, + ) + + description = ( + annotations.get("message") + or annotations.get("description") + or annotations.get("summary") + or "" + ) + + return { + "alertname": str(alertname) if alertname else None, + "severity": str(severity) if severity else None, + "alert_category": alert_category, + "notification_type": notification_type, + "description": str(description) if description else None, + "actor": signal.get("source") or labels.get("source") or "signal_worker", + } + + +async def _add_signal_timeline_event( + incident: Incident, + metadata: dict[str, Any], +) -> None: + """Best-effort timeline seed for incidents created outside Alertmanager.""" + alertname = metadata.get("alertname") + if not alertname: + return + + try: + from src.services.approval_db import get_timeline_service + + await get_timeline_service().add_event( + event_type="webhook", + status="success", + title=f"Signal received: {alertname}", + description=metadata.get("description"), + actor=metadata.get("actor"), + actor_role="signal_worker", + risk_level=getattr(incident.severity, "value", incident.severity), + incident_id=incident.incident_id, + ) + except Exception as exc: + logger.warning( + "incident_signal_timeline_seed_failed", + incident_id=incident.incident_id, + alertname=alertname, + error=str(exc), + ) + + # ============================================================================= # Phase 16: IncidentDbAdapter (DI 注入實現) # ============================================================================= @@ -63,6 +158,8 @@ class IncidentDbAdapter: async def save(self, incident: Incident) -> bool: """儲存 Incident 到 PostgreSQL (upsert)""" + metadata = _derive_incident_alert_metadata(incident) + created = False try: async with get_db_context() as db: from sqlalchemy import select @@ -85,6 +182,12 @@ class IncidentDbAdapter: existing.resolved_at = incident.resolved_at if incident.closed_at: existing.closed_at = incident.closed_at + if metadata.get("alertname") and not existing.alertname: + existing.alertname = metadata["alertname"] + if metadata.get("notification_type") and not existing.notification_type: + existing.notification_type = metadata["notification_type"] + if metadata.get("alert_category") and not existing.alert_category: + existing.alert_category = metadata["alert_category"] else: record = IncidentRecord( incident_id=incident.incident_id, @@ -111,9 +214,15 @@ class IncidentDbAdapter: closed_at=incident.closed_at, ttl_days=getattr(incident, 'ttl_days', 30), vectorized=getattr(incident, 'vectorized', False), + alertname=metadata.get("alertname"), + notification_type=metadata.get("notification_type"), + alert_category=metadata.get("alert_category"), ) db.add(record) + created = True + if created: + await _add_signal_timeline_event(incident, metadata) logger.debug("db_adapter_save_success", incident_id=incident.incident_id) return True diff --git a/apps/api/src/services/post_execution_verifier.py b/apps/api/src/services/post_execution_verifier.py index 5d0eb7d4..9a45b1ec 100644 --- a/apps/api/src/services/post_execution_verifier.py +++ b/apps/api/src/services/post_execution_verifier.py @@ -208,10 +208,28 @@ class PostExecutionVerifier: timeout=TOOL_TIMEOUT_SEC, ) snapshot.pre_execution_state = state + try: + await snapshot.update_pre_execution(state) + except Exception as exc: + logger.warning( + "verifier_pre_state_persist_failed", + incident_id=incident_id, + snapshot_id=snapshot.snapshot_id, + error=str(exc), + ) logger.debug("verifier_pre_state_captured", incident_id=incident_id) except Exception: logger.warning("verifier_pre_state_failed", incident_id=incident_id) snapshot.pre_execution_state = {} + try: + await snapshot.update_pre_execution({}) + except Exception as exc: + logger.warning( + "verifier_empty_pre_state_persist_failed", + incident_id=incident_id, + snapshot_id=snapshot.snapshot_id, + error=str(exc), + ) async def _collect_post_state(self, incident: "Incident") -> dict[str, Any]: """ diff --git a/apps/api/tests/test_incident_memory_adapter.py b/apps/api/tests/test_incident_memory_adapter.py new file mode 100644 index 00000000..0ce7a60e --- /dev/null +++ b/apps/api/tests/test_incident_memory_adapter.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from types import SimpleNamespace + +from src.models.incident import Severity +from src.services.incident_memory import _derive_incident_alert_metadata + + +def test_signal_worker_incident_metadata_uses_signal_alert_name() -> None: + incident = SimpleNamespace( + incident_id="INC-TEST", + severity=Severity.P2, + signals=[ + SimpleNamespace( + alert_name="HostErrorLogFlood", + severity=Severity.P2, + source="journal", + labels={"error_count": "29"}, + annotations={"summary": "29 ERROR log entries in last 5min"}, + fingerprint="b000a87cfe4bc658", + ) + ], + ) + + metadata = _derive_incident_alert_metadata(incident) + + assert metadata["alertname"] == "HostErrorLogFlood" + assert metadata["alert_category"] == "host_resource" + assert metadata["notification_type"] == "TYPE-3" + assert metadata["description"] == "29 ERROR log entries in last 5min" + assert metadata["actor"] == "journal" + diff --git a/apps/api/tests/test_post_execution_verifier.py b/apps/api/tests/test_post_execution_verifier.py index 273dc30a..28bdb343 100644 --- a/apps/api/tests/test_post_execution_verifier.py +++ b/apps/api/tests/test_post_execution_verifier.py @@ -122,6 +122,20 @@ class TestAssessRecovery: post = {"status": "Running"} assert _assess_recovery(pre, post, "restart_service:api") == "success" + def test_rollout_restart_step_success_when_pre_and_post_running(self): + """auto-repair label 必須帶實際 rollout restart 步驟,才能驗證為真修復。""" + pre = {"status": "Running"} + post = {"status": "Running"} + action = "auto_repair_playbook:PB-TEST kubectl rollout restart deployment/api" + assert _assess_recovery(pre, post, action) == "success" + + def test_diagnosis_only_step_is_not_verified_repair(self): + """診斷型 playbook 即使 post 狀態健康,也不能被算成 verified repair。""" + pre = {"status": "Running"} + post = {"status": "Running"} + action = "auto_repair_playbook:PB-TEST mcp:ssh_diagnose docker stats" + assert _assess_recovery(pre, post, action) == "degraded" + def test_pre_running_post_running_delete_is_success(self): """kubectl delete 動作,前後都 Running → success""" pre = {"status": "Running"} diff --git a/apps/api/tests/test_webhooks_auto_repair_labels.py b/apps/api/tests/test_webhooks_auto_repair_labels.py new file mode 100644 index 00000000..1467ae7f --- /dev/null +++ b/apps/api/tests/test_webhooks_auto_repair_labels.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from types import SimpleNamespace + +from src.api.v1.webhooks import _auto_repair_action_label + + +def test_auto_repair_action_label_includes_executed_steps() -> None: + result = SimpleNamespace( + playbook_id="PB-TEST", + executed_steps=["kubectl rollout restart deployment/api -n awoooi-prod"], + ) + + label = _auto_repair_action_label(result, fallback_target="api:awoooi-prod") + + assert label == ( + "auto_repair_playbook:PB-TEST " + "kubectl rollout restart deployment/api -n awoooi-prod" + ) + + +def test_auto_repair_action_label_uses_target_when_steps_missing() -> None: + result = SimpleNamespace(playbook_id="PB-TEST", executed_steps=[]) + + label = _auto_repair_action_label(result, fallback_target="api:awoooi-prod") + + assert label == "auto_repair_playbook:PB-TEST api:awoooi-prod" + diff --git a/scripts/backfill_alertname.py b/scripts/backfill_alertname.py index 60a63d1f..b8b0561f 100644 --- a/scripts/backfill_alertname.py +++ b/scripts/backfill_alertname.py @@ -17,35 +17,7 @@ sys.path.insert(0, "/app") from sqlalchemy import text from src.db.base import get_db_context - - -def _classify_alert(alertname: str, severity: str) -> tuple[str, str]: - """Python 版分類邏輯,與 classify_alert_early() 保持一致 (ADR-075 更新)""" - alertname_lower = alertname.lower() - if alertname in ("ConfigurationDrift", "KubeConfigDrift"): - return "config_drift", "TYPE-4D" - if severity in ("info", "none"): - return "info", "TYPE-1" - if "watchdog" in alertname_lower or alertname in ("Heartbeat",): - return "backup", "TYPE-1" - # ADR-075 新增: SecOps 優先 - if any(alertname.startswith(p) for p in ("UnauthorizedSSH", "KubeAudit", "CVECritical", "WAFAttack", "PodAbnormal", "SecurityBreach")): - return "secops", "TYPE-5S" - # ADR-075 新增: Flywheel/META - if alertname in ("AutoRepairLowSuccessRate", "PermanentFixRequired") or any( - alertname.startswith(p) for p in ("Flywheel", "MCPProvider", "OllamaDown", "NemotronDown") - ): - return "flywheel_health", "TYPE-8M" - # ADR-075 新增: Business/FinOps - if any(alertname.startswith(p) for p in ("AITokenCost", "GeminiAPIError", "SLOBurn", "APIErrorBudget", "MomoScraper", "ScraperSuccess")): - return "business", "TYPE-6B" - if alertname.startswith(("Docker", "Host")): - return "infrastructure", "TYPE-3" - if alertname.startswith(("Kube", "Pod", "Deploy", "Node", "Velero", "ArgoCD")): - return "kubernetes", "TYPE-3" - if alertname.startswith(("Postgres", "Redis")): - return "database", "TYPE-3" - return "general", "TYPE-3" +from src.services.incident_service import classify_alert_early async def main() -> None: @@ -95,9 +67,10 @@ async def main() -> None: updated = 0 for row in rows: - alert_category, notification_type = _classify_alert( + alert_category, notification_type = classify_alert_early( alertname=row.alertname or "", severity=row.severity or "warning", + labels={}, ) await db.execute(text(""" UPDATE incidents @@ -123,7 +96,7 @@ async def main() -> None: FROM incidents """)) f = final_r.fetchone() - print(f"\n最終 NULL 統計:") + print("\n最終 NULL 統計:") print(f" alertname NULL: {f.alertname_null}") print(f" notification_type NULL: {f.notification_type_null}") print(f" alert_category NULL: {f.alert_category_null}")