diff --git a/apps/api/src/services/awooop_truth_chain_service.py b/apps/api/src/services/awooop_truth_chain_service.py index 53cb947d..8809c5af 100644 --- a/apps/api/src/services/awooop_truth_chain_service.py +++ b/apps/api/src/services/awooop_truth_chain_service.py @@ -97,6 +97,10 @@ def _operation_ids(automation_ops: list[dict[str, Any]]) -> list[str]: return [str(row["op_id"]) for row in automation_ops if row.get("op_id")] +def _auto_repair_ids(auto_repair_executions: list[dict[str, Any]]) -> list[str]: + return [str(row["id"]) for row in auto_repair_executions if row.get("id")] + + def build_incident_reconciliation( *, incident: dict[str, Any] | None, @@ -229,6 +233,7 @@ def _truth_status( gateway_mcp_total: int, legacy_mcp_total: int, outbound_visible_total: int, + auto_repair_executions: list[dict[str, Any]] | None = None, ) -> dict[str, Any]: """Derive the current operator-visible truth-chain stage.""" blockers: list[str] = [] @@ -249,6 +254,8 @@ def _truth_status( if incident is not None: incident_status = str(incident.get("status") or "unknown") + repair_rows = auto_repair_executions or [] + has_execution_records = bool(automation_ops or repair_rows) stage = "received" stage_status = incident_status.lower() if incident_status in {"RESOLVED", "CLOSED"}: @@ -272,7 +279,7 @@ def _truth_status( stage = "approval_required" stage_status = "waiting" needs_human = True - elif "APPROVED" in approval_statuses and not automation_ops: + elif "APPROVED" in approval_statuses and not has_execution_records: if "NO_ACTION" in approval_actions: stage = "manual_required" stage_status = "blocked" @@ -285,11 +292,12 @@ def _truth_status( blockers.append("approved_without_execution_record") op_statuses = {str(row.get("status") or "").lower() for row in automation_ops} - if op_statuses: - if op_statuses & {"success", "completed"}: + repair_successes = {row.get("success") for row in repair_rows} + if op_statuses or repair_successes: + if (op_statuses & {"success", "completed"}) 
or True in repair_successes: stage = "execution_succeeded" stage_status = "success" - elif op_statuses & {"failed", "error"}: + elif (op_statuses & {"failed", "error"}) or False in repair_successes: stage = "execution_failed" stage_status = "error" needs_human = True @@ -297,7 +305,7 @@ def _truth_status( stage = "execution_started" stage_status = "running" - if incident_status == "INVESTIGATING" and automation_ops == [] and approvals: + if incident_status == "INVESTIGATING" and not has_execution_records and approvals: blockers.append("incident_still_investigating_after_approval") if gateway_mcp_total == 0: @@ -315,6 +323,172 @@ def _truth_status( } +def _latest_verification_result( + incident: dict[str, Any] | None, + evidence_rows: list[dict[str, Any]], +) -> str | None: + if incident and incident.get("verification_result"): + return str(incident["verification_result"]) + for row in evidence_rows: + if row.get("verification_result"): + return str(row["verification_result"]) + return None + + +def build_automation_quality( + *, + incident: dict[str, Any] | None, + approvals: list[dict[str, Any]], + evidence_rows: list[dict[str, Any]], + automation_ops: list[dict[str, Any]], + auto_repair_executions: list[dict[str, Any]], + gateway_mcp_summary: dict[str, Any], + legacy_mcp_summary: dict[str, Any], + outbound_rows: list[dict[str, Any]], + km_entries: list[dict[str, Any]], + timeline_events: list[dict[str, Any]], +) -> dict[str, Any]: + """Summarize whether a card reached the real automation flywheel.""" + if incident is None: + return { + "schema_version": "automation_quality_v1", + "applicable": False, + "verdict": "not_applicable", + "score": 0, + "gates": [], + "facts": {}, + "blockers": [], + } + + blockers: list[str] = [] + gates: list[dict[str, Any]] = [] + + def gate(name: str, status: str, detail: str | None = None) -> None: + gates.append({"name": name, "status": status, "detail": detail}) + if status in {"failed", "missing"}: + blockers.append(name) + + 
evidence_attempted = sum(int(row.get("sensors_attempted") or 0) for row in evidence_rows) + evidence_succeeded = sum(int(row.get("sensors_succeeded") or 0) for row in evidence_rows) + gateway_total = int(gateway_mcp_summary.get("total") or 0) + legacy_total = int(legacy_mcp_summary.get("total") or 0) + automation_statuses = {str(row.get("status") or "").lower() for row in automation_ops} + auto_repair_successes = {row.get("success") for row in auto_repair_executions} + has_execution = bool(automation_ops or auto_repair_executions) + verification_result = _latest_verification_result(incident, evidence_rows) + approval_statuses = {str(row.get("status") or "").upper() for row in approvals} + approval_actions = " ".join(str(row.get("action") or "") for row in approvals).upper() + + gate("source_persisted", "passed", str(incident.get("incident_id"))) + gate("outbound_recorded", "passed" if outbound_rows else "missing", str(len(outbound_rows))) + if not evidence_rows: + gate("evidence_collected", "missing", "no incident_evidence rows") + elif evidence_attempted > 0 and evidence_succeeded == 0: + gate("evidence_collected", "failed", f"{evidence_succeeded}/{evidence_attempted}") + elif evidence_succeeded > 0: + gate("evidence_collected", "passed", f"{evidence_succeeded}/{evidence_attempted}") + else: + gate("evidence_collected", "warning", f"{evidence_succeeded}/{evidence_attempted}") + + if gateway_total > 0: + gate("mcp_gateway_observed", "passed", str(gateway_total)) + elif legacy_total > 0: + gate("mcp_gateway_observed", "warning", f"legacy_only={legacy_total}") + else: + gate("mcp_gateway_observed", "missing", "no mcp audit") + + if any(status in {"PENDING", "WAITING_APPROVAL"} for status in approval_statuses): + gate("approval_state", "warning", "waiting_approval") + elif "APPROVED" in approval_statuses and "NO_ACTION" in approval_actions and not has_execution: + gate("approval_state", "failed", "approved_no_action_without_execution") + elif approvals: + 
gate("approval_state", "passed", ",".join(sorted(approval_statuses))) + else: + gate("approval_state", "not_applicable", "no approval") + + gate("execution_recorded", "passed" if has_execution else "missing", str(len(automation_ops) + len(auto_repair_executions))) + gate("auto_repair_recorded", "passed" if auto_repair_executions else "missing", str(len(auto_repair_executions))) + + if not has_execution: + gate("verification_recorded", "not_applicable", "no execution") + elif verification_result: + gate("verification_recorded", "passed", verification_result) + else: + gate("verification_recorded", "missing", "execution without verification_result") + + if not has_execution: + gate("learning_recorded", "not_applicable", "no execution") + elif km_entries: + gate("learning_recorded", "passed", str(len(km_entries))) + else: + gate("learning_recorded", "missing", "execution without KM entry") + + gate("timeline_recorded", "passed" if timeline_events else "missing", str(len(timeline_events))) + + if has_execution and verification_result == "success": + verdict = "auto_repaired_verified" + elif has_execution and ( + False in auto_repair_successes or automation_statuses & {"failed", "error"} + ): + verdict = "execution_failed" + elif has_execution: + verdict = "execution_unverified" + elif "APPROVED" in approval_statuses and "NO_ACTION" in approval_actions: + verdict = "manual_required_no_action" + elif any(status in {"PENDING", "WAITING_APPROVAL"} for status in approval_statuses): + verdict = "approval_required" + elif evidence_rows or gateway_total or legacy_total: + verdict = "observed_not_executed" + else: + verdict = "received_only" + + score_weights = { + "source_persisted": 10, + "outbound_recorded": 10, + "evidence_collected": 15, + "mcp_gateway_observed": 15, + "approval_state": 10, + "execution_recorded": 15, + "auto_repair_recorded": 10, + "verification_recorded": 10, + "learning_recorded": 3, + "timeline_recorded": 2, + } + score = 0 + for row in gates: + weight 
= score_weights.get(str(row["name"]), 0) + if row["status"] == "passed": + score += weight + elif row["status"] == "not_applicable" and row["name"] == "approval_state": + score += weight + elif row["status"] == "warning": + score += weight // 2 + + return { + "schema_version": "automation_quality_v1", + "applicable": True, + "verdict": verdict, + "score": score, + "gates": gates, + "facts": { + "incident_id": incident.get("incident_id"), + "evidence_records": len(evidence_rows), + "sensors_attempted": evidence_attempted, + "sensors_succeeded": evidence_succeeded, + "mcp_gateway_total": gateway_total, + "legacy_mcp_total": legacy_total, + "approvals": len(approvals), + "automation_operation_records": len(automation_ops), + "auto_repair_execution_records": len(auto_repair_executions), + "verification_result": verification_result, + "knowledge_entries": len(km_entries), + "timeline_events": len(timeline_events), + "outbound_messages": len(outbound_rows), + }, + "blockers": blockers, + } + + def _summarize_mcp(rows: list[dict[str, Any]]) -> dict[str, Any]: by_tool: dict[str, dict[str, Any]] = {} success_count = 0 @@ -543,6 +717,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ timeline_events: list[dict[str, Any]] = [] legacy_mcp_rows: list[dict[str, Any]] = [] automation_ops: list[dict[str, Any]] = [] + auto_repair_executions: list[dict[str, Any]] = [] km_entries: list[dict[str, Any]] = [] if incident is not None: @@ -686,6 +861,29 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ """, {"incident_id": incident_id, "needle": f"%{incident_id}%", "limit": _MAX_ROWS}, ) + auto_repair_executions = await _fetch_all( + db, + """ + SELECT + id, + incident_id, + playbook_id, + playbook_name, + success, + executed_steps, + error_message, + triggered_by, + similarity_score, + risk_level, + execution_time_ms, + created_at + FROM auto_repair_executions + WHERE incident_id = :incident_id + ORDER BY created_at 
DESC + LIMIT :limit + """, + {"incident_id": incident_id, "limit": _MAX_ROWS}, + ) km_entries = await _fetch_all( db, """ @@ -812,6 +1010,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ gateway_mcp_total=len(gateway_mcp_rows), legacy_mcp_total=legacy_mcp_summary["total"], outbound_visible_total=len(outbound_rows), + auto_repair_executions=auto_repair_executions, ) if incident is None and drift is None and not runs and gateway_mcp_rows: truth_status = { @@ -834,6 +1033,18 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ "sensors_succeeded": sum(int(row.get("sensors_succeeded") or 0) for row in evidence_rows), "failed_tools": _failed_mcp_tools(evidence_rows), } + automation_quality = build_automation_quality( + incident=incident, + approvals=approvals, + evidence_rows=evidence_rows, + automation_ops=automation_ops, + auto_repair_executions=auto_repair_executions, + gateway_mcp_summary=gateway_mcp_summary, + legacy_mcp_summary=legacy_mcp_summary, + outbound_rows=outbound_rows, + km_entries=km_entries, + timeline_events=timeline_events, + ) result = { "project_id": project_id, @@ -847,6 +1058,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ "run_ids": [row["run_id"] for row in runs], "drift_report_id": drift.get("report_id") if drift else None, "operation_ids": _operation_ids(automation_ops), + "auto_repair_execution_ids": _auto_repair_ids(auto_repair_executions), }, "incident": incident, "drift": { @@ -870,8 +1082,10 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ }, "execution": { "automation_operation_log": automation_ops, + "auto_repair_executions": auto_repair_executions, "ansible": build_ansible_truth(automation_ops, incident=incident, drift=drift), }, + "automation_quality": automation_quality, "reconciliation": reconciliation, "learning": { "knowledge_entries": km_entries, diff --git 
a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index f00d3ef1..22aa60f1 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -128,6 +128,39 @@ def _format_gateway_summary_lines(summary: dict[str, object] | None) -> list[str return lines +def _format_automation_quality_lines(quality: dict[str, object] | None) -> list[str]: + if not quality or quality.get("applicable") is False: + return [] + + facts = quality.get("facts") if isinstance(quality.get("facts"), dict) else {} + blockers = quality.get("blockers") if isinstance(quality.get("blockers"), list) else [] + verdict = html.escape(str(quality.get("verdict") or "unknown")) + score = int(quality.get("score") or 0) + + lines = [ + "", + "🧪 自動化品質", + f"判定: {verdict} / {score}", + ( + "執行: " + f"auto-repair {int(facts.get('auto_repair_execution_records') or 0)} / " + f"ops {int(facts.get('automation_operation_records') or 0)} / " + f"verify {html.escape(str(facts.get('verification_result') or 'missing'))}" + ), + ( + "證據: " + f"sensors {int(facts.get('sensors_succeeded') or 0)}/" + f"{int(facts.get('sensors_attempted') or 0)} / " + f"gateway {int(facts.get('mcp_gateway_total') or 0)} / " + f"KM {int(facts.get('knowledge_entries') or 0)}" + ), + ] + + if blockers: + lines.append("缺口: " + html.escape(", ".join(str(item) for item in blockers[:4]))) + return lines + + def _sanitize_telegram_error(text: str) -> str: """遮蔽 Telegram Bot URL 中的 token,避免例外字串污染 log / trace。""" return _TELEGRAM_BOT_URL_RE.sub(r"\1", text) @@ -5140,6 +5173,9 @@ class TelegramGateway: .get("awooop_gateway") ) lines += _format_gateway_summary_lines(gateway_summary) + lines += _format_automation_quality_lines( + truth_chain.get("automation_quality") + ) except Exception as truth_exc: logger.warning( "incident_detail_truth_chain_summary_failed", diff --git a/apps/api/tests/test_awooop_truth_chain_service.py b/apps/api/tests/test_awooop_truth_chain_service.py 
def test_automation_quality_marks_no_action_without_execution() -> None:
    """An approved NO_ACTION card without execution rows is reported as manual work."""
    report = build_automation_quality(
        incident={"incident_id": "INC-1", "status": "INVESTIGATING"},
        approvals=[{"status": "APPROVED", "action": "未知操作 | NO_ACTION"}],
        evidence_rows=[{"sensors_attempted": 8, "sensors_succeeded": 0}],
        automation_ops=[],
        auto_repair_executions=[],
        gateway_mcp_summary={"total": 8},
        legacy_mcp_summary={"total": 8},
        outbound_rows=[{"message_id": "m1"}],
        km_entries=[],
        timeline_events=[],
    )

    status_by_gate: dict[str, str] = {}
    for entry in report["gates"]:
        status_by_gate[entry["name"]] = entry["status"]

    assert report["schema_version"] == "automation_quality_v1"
    assert report["verdict"] == "manual_required_no_action"
    assert report["facts"]["auto_repair_execution_records"] == 0
    assert status_by_gate["execution_recorded"] == "missing"
    assert status_by_gate["verification_recorded"] == "not_applicable"
    assert "execution_recorded" in report["blockers"]


def test_automation_quality_marks_verified_auto_repair() -> None:
    """A successful auto-repair with a success verification scores a full 100."""
    resolved_incident = {
        "incident_id": "INC-2",
        "status": "RESOLVED",
        "verification_result": "success",
    }
    repair_row = {
        "id": "repair-1",
        "success": True,
        "playbook_id": "pb-1",
    }
    report = build_automation_quality(
        incident=resolved_incident,
        approvals=[],
        evidence_rows=[{"sensors_attempted": 3, "sensors_succeeded": 3}],
        automation_ops=[],
        auto_repair_executions=[repair_row],
        gateway_mcp_summary={"total": 3},
        legacy_mcp_summary={"total": 3},
        outbound_rows=[{"message_id": "m1"}],
        km_entries=[{"id": "km-1"}],
        timeline_events=[{"id": "tl-1"}],
    )

    status_by_gate: dict[str, str] = {}
    for entry in report["gates"]:
        status_by_gate[entry["name"]] = entry["status"]

    assert report["verdict"] == "auto_repaired_verified"
    assert report["facts"]["verification_result"] == "success"
    assert report["score"] == 100
    assert status_by_gate["auto_repair_recorded"] == "passed"
    assert status_by_gate["verification_recorded"] == "passed"
    assert report["blockers"] == []
verification_result」、「是否有 KM/Playbook 回寫」。 +- 避免低風險卡片只顯示 `ACTION REQUIRED` 或 AI 研判,卻看不出其實是 `NO_ACTION`、無 execution、無 verification、需人工。 + +**變更**: + +- truth-chain 新增 `automation_quality`: + - `verdict`: `auto_repaired_verified` / `execution_unverified` / `execution_failed` / `manual_required_no_action` / `approval_required` / `observed_not_executed` / `received_only` + - `score`: 0-100 可量化分數 + - gates: source / outbound / evidence / MCP Gateway / approval / execution / auto_repair / verification / learning / timeline + - facts: sensors、MCP、approval、automation_operation、auto_repair_execution、verification、KM、timeline、outbound counts +- truth-chain now fetches `auto_repair_executions` and exposes `linked_ids.auto_repair_execution_ids` plus `execution.auto_repair_executions`. +- Telegram incident detail 顯示「自動化品質」摘要,包含判定、分數、auto-repair / ops / verify / sensors / gateway / KM / 缺口。 + +**local verification**: + +```text +DATABASE_URL=postgresql+asyncpg://u:p@localhost:5432/db python -m pytest tests/test_awooop_truth_chain_service.py tests/test_telegram_adr050.py tests/test_telegram_gateway_error_sanitizer.py tests/test_channel_hub_grouped_alert_events.py -q +53 passed + +python -m ruff check --select F821 src/services/awooop_truth_chain_service.py src/services/telegram_gateway.py tests/test_awooop_truth_chain_service.py tests/test_telegram_adr050.py +All checks passed + +python -m py_compile src/services/awooop_truth_chain_service.py src/services/telegram_gateway.py tests/test_awooop_truth_chain_service.py tests/test_telegram_adr050.py +OK +``` + +**目前整體進度**:約 71%。