fix(awooop): persist signal metadata and auto-repair prestate
Some checks failed
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m14s
CD Pipeline / build-and-deploy (push) Failing after 3m33s
CD Pipeline / post-deploy-checks (push) Has been skipped

This commit is contained in:
Your Name
2026-05-18 10:59:54 +08:00
parent 5c240744eb
commit 1a2b04f5cf
8 changed files with 302 additions and 37 deletions

View File

@@ -198,6 +198,19 @@ async def _escalate_auto_repair_unavailable(
)
def _auto_repair_action_label(result, fallback_target: str) -> str:
"""Build a verifier label that includes the actual playbook steps."""
playbook_id = getattr(result, "playbook_id", None) or "unknown"
steps = getattr(result, "executed_steps", None) or []
step_text = " | ".join(str(step) for step in steps).strip()
if not step_text:
step_text = fallback_target
step_text = " ".join(step_text.split())
if len(step_text) > 240:
step_text = f"{step_text[:237]}..."
return f"auto_repair_playbook:{playbook_id} {step_text}".strip()
async def _try_auto_repair_background(
incident_id: str,
approval_id: str,
@@ -287,6 +300,46 @@ async def _try_auto_repair_background(
},
)
_pre_execution_snapshot = None
try:
from src.core.feature_flags import aiops_flags
if aiops_flags.is_sub_flag_enabled("AIOPS_P1_PRE_DECISION_INVESTIGATOR"):
from src.services.evidence_snapshot import get_latest_snapshot
from src.services.post_execution_verifier import get_post_execution_verifier
_pre_execution_snapshot = await get_latest_snapshot(incident_id)
if _pre_execution_snapshot is None:
from src.services.pre_decision_investigator import (
get_pre_decision_investigator,
)
_pre_execution_snapshot = await asyncio.wait_for(
get_pre_decision_investigator().investigate(incident),
timeout=60.0,
)
if _pre_execution_snapshot is not None:
await asyncio.wait_for(
get_post_execution_verifier().capture_pre_execution_state(
incident,
_pre_execution_snapshot,
),
timeout=30.0,
)
except asyncio.TimeoutError:
logger.warning(
"auto_repair_pre_state_capture_timeout",
incident_id=incident_id,
approval_id=approval_id,
)
except Exception as _pre_state_err:
logger.warning(
"auto_repair_pre_state_capture_failed",
incident_id=incident_id,
approval_id=approval_id,
error=str(_pre_state_err),
)
# 執行自動修復
logger.info(
"auto_repair_executing",
@@ -386,11 +439,10 @@ async def _try_auto_repair_background(
from src.services.evidence_snapshot import get_latest_snapshot
from src.services.learning_service import get_learning_service
_snapshot = await get_latest_snapshot(incident_id)
_action_label = (
f"{target_resource}:{namespace}"
if not result.success
else f"auto_repair_playbook:{result.playbook_id}"
_snapshot = _pre_execution_snapshot or await get_latest_snapshot(incident_id)
_action_label = _auto_repair_action_label(
result,
fallback_target=f"{target_resource}:{namespace}",
)
_verifier = get_post_execution_verifier()
_verify_result = await asyncio.wait_for(

View File

@@ -297,6 +297,45 @@ class EvidenceSnapshot:
)
raise
async def update_pre_execution(
self,
pre_state: dict[str, Any],
) -> None:
"""
補填執行前狀態。
Auto-repair 背景路徑會在 playbook 執行前呼叫
PostExecutionVerifier.capture_pre_execution_state(),同一份 evidence row
後續再寫入 post_execution_state / verification_result讓 truth-chain 能看出
「執行前 → 執行後」是否真的改善。
"""
self.pre_execution_state = pre_state
try:
async with get_db_context() as db:
stmt_result = await db.execute(
update(IncidentEvidence)
.where(IncidentEvidence.id == self.snapshot_id)
.values(pre_execution_state=pre_state)
)
if stmt_result.rowcount < 1:
logger.warning(
"evidence_snapshot_pre_update_no_rows",
snapshot_id=self.snapshot_id,
)
else:
logger.info(
"evidence_snapshot_pre_execution_updated",
snapshot_id=self.snapshot_id,
)
except Exception:
logger.exception(
"evidence_snapshot_pre_update_error",
snapshot_id=self.snapshot_id,
)
raise
async def update_self_healing(
self,
score: float,

View File

@@ -25,11 +25,106 @@ from src.core.redis_client import get_redis
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from src.models.incident import Incident
from src.utils.incident_converter import brain_to_local
logger = structlog.get_logger(__name__)
def _signal_to_dict(signal: Any) -> dict[str, Any]:
"""Normalize brain/local Signal objects and raw dicts into one shape."""
if isinstance(signal, dict):
return signal
if hasattr(signal, "model_dump"):
return signal.model_dump(mode="json")
return {
"alert_name": getattr(signal, "alert_name", None),
"severity": getattr(signal, "severity", None),
"source": getattr(signal, "source", None),
"labels": getattr(signal, "labels", None) or {},
"annotations": getattr(signal, "annotations", None) or {},
"fingerprint": getattr(signal, "fingerprint", None),
}
def _derive_incident_alert_metadata(incident: Incident) -> dict[str, Any]:
"""Derive alert metadata for incidents saved through the lewooogo bridge."""
first_signal = incident.signals[0] if incident.signals else None
signal = _signal_to_dict(first_signal) if first_signal else {}
labels = signal.get("labels") or {}
annotations = signal.get("annotations") or {}
alertname = (
labels.get("alertname")
or signal.get("alert_name")
or signal.get("alertname")
or ""
)
severity = (
signal.get("severity")
or getattr(incident.severity, "value", incident.severity)
or labels.get("severity")
or "warning"
)
severity = getattr(severity, "value", severity)
alert_category = None
notification_type = None
if alertname:
from src.services.incident_service import classify_alert_early
alert_category, notification_type = classify_alert_early(
str(alertname),
str(severity),
labels,
)
description = (
annotations.get("message")
or annotations.get("description")
or annotations.get("summary")
or ""
)
return {
"alertname": str(alertname) if alertname else None,
"severity": str(severity) if severity else None,
"alert_category": alert_category,
"notification_type": notification_type,
"description": str(description) if description else None,
"actor": signal.get("source") or labels.get("source") or "signal_worker",
}
async def _add_signal_timeline_event(
incident: Incident,
metadata: dict[str, Any],
) -> None:
"""Best-effort timeline seed for incidents created outside Alertmanager."""
alertname = metadata.get("alertname")
if not alertname:
return
try:
from src.services.approval_db import get_timeline_service
await get_timeline_service().add_event(
event_type="webhook",
status="success",
title=f"Signal received: {alertname}",
description=metadata.get("description"),
actor=metadata.get("actor"),
actor_role="signal_worker",
risk_level=getattr(incident.severity, "value", incident.severity),
incident_id=incident.incident_id,
)
except Exception as exc:
logger.warning(
"incident_signal_timeline_seed_failed",
incident_id=incident.incident_id,
alertname=alertname,
error=str(exc),
)
# =============================================================================
# Phase 16: IncidentDbAdapter (DI 注入實現)
# =============================================================================
@@ -63,6 +158,8 @@ class IncidentDbAdapter:
async def save(self, incident: Incident) -> bool:
"""儲存 Incident 到 PostgreSQL (upsert)"""
metadata = _derive_incident_alert_metadata(incident)
created = False
try:
async with get_db_context() as db:
from sqlalchemy import select
@@ -85,6 +182,12 @@ class IncidentDbAdapter:
existing.resolved_at = incident.resolved_at
if incident.closed_at:
existing.closed_at = incident.closed_at
if metadata.get("alertname") and not existing.alertname:
existing.alertname = metadata["alertname"]
if metadata.get("notification_type") and not existing.notification_type:
existing.notification_type = metadata["notification_type"]
if metadata.get("alert_category") and not existing.alert_category:
existing.alert_category = metadata["alert_category"]
else:
record = IncidentRecord(
incident_id=incident.incident_id,
@@ -111,9 +214,15 @@ class IncidentDbAdapter:
closed_at=incident.closed_at,
ttl_days=getattr(incident, 'ttl_days', 30),
vectorized=getattr(incident, 'vectorized', False),
alertname=metadata.get("alertname"),
notification_type=metadata.get("notification_type"),
alert_category=metadata.get("alert_category"),
)
db.add(record)
created = True
if created:
await _add_signal_timeline_event(incident, metadata)
logger.debug("db_adapter_save_success", incident_id=incident.incident_id)
return True

View File

@@ -208,10 +208,28 @@ class PostExecutionVerifier:
timeout=TOOL_TIMEOUT_SEC,
)
snapshot.pre_execution_state = state
try:
await snapshot.update_pre_execution(state)
except Exception as exc:
logger.warning(
"verifier_pre_state_persist_failed",
incident_id=incident_id,
snapshot_id=snapshot.snapshot_id,
error=str(exc),
)
logger.debug("verifier_pre_state_captured", incident_id=incident_id)
except Exception:
logger.warning("verifier_pre_state_failed", incident_id=incident_id)
snapshot.pre_execution_state = {}
try:
await snapshot.update_pre_execution({})
except Exception as exc:
logger.warning(
"verifier_empty_pre_state_persist_failed",
incident_id=incident_id,
snapshot_id=snapshot.snapshot_id,
error=str(exc),
)
async def _collect_post_state(self, incident: "Incident") -> dict[str, Any]:
"""

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from types import SimpleNamespace
from src.models.incident import Severity
from src.services.incident_memory import _derive_incident_alert_metadata
def test_signal_worker_incident_metadata_uses_signal_alert_name() -> None:
incident = SimpleNamespace(
incident_id="INC-TEST",
severity=Severity.P2,
signals=[
SimpleNamespace(
alert_name="HostErrorLogFlood",
severity=Severity.P2,
source="journal",
labels={"error_count": "29"},
annotations={"summary": "29 ERROR log entries in last 5min"},
fingerprint="b000a87cfe4bc658",
)
],
)
metadata = _derive_incident_alert_metadata(incident)
assert metadata["alertname"] == "HostErrorLogFlood"
assert metadata["alert_category"] == "host_resource"
assert metadata["notification_type"] == "TYPE-3"
assert metadata["description"] == "29 ERROR log entries in last 5min"
assert metadata["actor"] == "journal"

View File

@@ -122,6 +122,20 @@ class TestAssessRecovery:
post = {"status": "Running"}
assert _assess_recovery(pre, post, "restart_service:api") == "success"
def test_rollout_restart_step_success_when_pre_and_post_running(self):
"""auto-repair label 必須帶實際 rollout restart 步驟,才能驗證為真修復。"""
pre = {"status": "Running"}
post = {"status": "Running"}
action = "auto_repair_playbook:PB-TEST kubectl rollout restart deployment/api"
assert _assess_recovery(pre, post, action) == "success"
def test_diagnosis_only_step_is_not_verified_repair(self):
"""診斷型 playbook 即使 post 狀態健康,也不能被算成 verified repair。"""
pre = {"status": "Running"}
post = {"status": "Running"}
action = "auto_repair_playbook:PB-TEST mcp:ssh_diagnose docker stats"
assert _assess_recovery(pre, post, action) == "degraded"
def test_pre_running_post_running_delete_is_success(self):
"""kubectl delete 動作,前後都 Running → success"""
pre = {"status": "Running"}

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from types import SimpleNamespace
from src.api.v1.webhooks import _auto_repair_action_label
def test_auto_repair_action_label_includes_executed_steps() -> None:
result = SimpleNamespace(
playbook_id="PB-TEST",
executed_steps=["kubectl rollout restart deployment/api -n awoooi-prod"],
)
label = _auto_repair_action_label(result, fallback_target="api:awoooi-prod")
assert label == (
"auto_repair_playbook:PB-TEST "
"kubectl rollout restart deployment/api -n awoooi-prod"
)
def test_auto_repair_action_label_uses_target_when_steps_missing() -> None:
result = SimpleNamespace(playbook_id="PB-TEST", executed_steps=[])
label = _auto_repair_action_label(result, fallback_target="api:awoooi-prod")
assert label == "auto_repair_playbook:PB-TEST api:awoooi-prod"

View File

@@ -17,35 +17,7 @@ sys.path.insert(0, "/app")
from sqlalchemy import text
from src.db.base import get_db_context
def _classify_alert(alertname: str, severity: str) -> tuple[str, str]:
"""Python 版分類邏輯,與 classify_alert_early() 保持一致 (ADR-075 更新)"""
alertname_lower = alertname.lower()
if alertname in ("ConfigurationDrift", "KubeConfigDrift"):
return "config_drift", "TYPE-4D"
if severity in ("info", "none"):
return "info", "TYPE-1"
if "watchdog" in alertname_lower or alertname in ("Heartbeat",):
return "backup", "TYPE-1"
# ADR-075 新增: SecOps 優先
if any(alertname.startswith(p) for p in ("UnauthorizedSSH", "KubeAudit", "CVECritical", "WAFAttack", "PodAbnormal", "SecurityBreach")):
return "secops", "TYPE-5S"
# ADR-075 新增: Flywheel/META
if alertname in ("AutoRepairLowSuccessRate", "PermanentFixRequired") or any(
alertname.startswith(p) for p in ("Flywheel", "MCPProvider", "OllamaDown", "NemotronDown")
):
return "flywheel_health", "TYPE-8M"
# ADR-075 新增: Business/FinOps
if any(alertname.startswith(p) for p in ("AITokenCost", "GeminiAPIError", "SLOBurn", "APIErrorBudget", "MomoScraper", "ScraperSuccess")):
return "business", "TYPE-6B"
if alertname.startswith(("Docker", "Host")):
return "infrastructure", "TYPE-3"
if alertname.startswith(("Kube", "Pod", "Deploy", "Node", "Velero", "ArgoCD")):
return "kubernetes", "TYPE-3"
if alertname.startswith(("Postgres", "Redis")):
return "database", "TYPE-3"
return "general", "TYPE-3"
from src.services.incident_service import classify_alert_early
async def main() -> None:
@@ -95,9 +67,10 @@ async def main() -> None:
updated = 0
for row in rows:
alert_category, notification_type = _classify_alert(
alert_category, notification_type = classify_alert_early(
alertname=row.alertname or "",
severity=row.severity or "warning",
labels={},
)
await db.execute(text("""
UPDATE incidents
@@ -123,7 +96,7 @@ async def main() -> None:
FROM incidents
"""))
f = final_r.fetchone()
print(f"\n最終 NULL 統計:")
print("\n最終 NULL 統計:")
print(f" alertname NULL: {f.alertname_null}")
print(f" notification_type NULL: {f.notification_type_null}")
print(f" alert_category NULL: {f.alert_category_null}")