From 7795f027d2c95cfd2dadc61c789c02158b89dbbd Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 1 May 2026 20:34:33 +0800 Subject: [PATCH] fix(aiops): persist emergency intervention traces --- .agents/skills/05-awoooi-sre-qa.md | 7 + apps/api/src/services/callback_dispatcher.py | 128 +++++++++++++++++- .../services/emergency_escalation_service.py | 41 ++++++ apps/api/tests/test_callback_dispatcher.py | 31 +++++ .../test_emergency_escalation_service.py | 71 ++++++++++ docs/LOGBOOK.md | 14 ++ .../ADR-075-telegram-notification-standard.md | 14 +- 7 files changed, 297 insertions(+), 9 deletions(-) create mode 100644 apps/api/tests/test_emergency_escalation_service.py diff --git a/.agents/skills/05-awoooi-sre-qa.md b/.agents/skills/05-awoooi-sre-qa.md index 1b9b9cee..b8e0cac1 100644 --- a/.agents/skills/05-awoooi-sre-qa.md +++ b/.agents/skills/05-awoooi-sre-qa.md @@ -804,6 +804,7 @@ kubectl -n awoooi-prod logs -l app=awoooi-api --tail=50 | \ | Drift TYPE-4D | view diff, adopt, rollback, ignore | 看 diff、採納變更、回滾、忽略 | | Backup / host diagnosis | restart only when rule allows, charts/logs/details, cleanup when safe | 不得提供 K8s-only repair button 當 host/backup 主動作 | | Post-verification degraded/failed | rollback proposal, investigate, details | 不自動 rollback,需人工或 emergency AI Agent 接手 | +| SecOps authorize/isolate/block | record authorization, multi-sig gate | 不直接執行危險隔離;必須寫 Redis TTL、AOL、timeline | Regression test target: button callback names emitted by `telegram_gateway.py` must stay in sync with `callback_action_spec.yaml`; stale buttons are a @@ -815,6 +816,12 @@ registered MCP providers (`kubernetes`, `ssh_host`) before `get_provider()`. `backup_failure` cards must expose read-only diagnostics before any write action: host disk, backup jobs, and Velero backup status. +Emergency intervention is not complete until it is queryable later. Any +auto-repair-unavailable, drift-auto-adopt-blocked, or SecOps authorization path +must write both `alert_operation_log` and `timeline_events` using existing enum +values (`APPROVAL_ESCALATED` / `USER_ACTION`) unless a migration has already +landed. Telegram-only escalation is a silent learning-loop failure. + All Telegram alert lifecycle operations must use `TelegramGateway.alert_chat_id`: initial send, analyzing placeholder, delete, editMessageText, editMessageReplyMarkup, CI progress, and action-result updates. Sending the diff --git a/apps/api/src/services/callback_dispatcher.py b/apps/api/src/services/callback_dispatcher.py index aa9ac037..e90b9ddf 100644 --- a/apps/api/src/services/callback_dispatcher.py +++ b/apps/api/src/services/callback_dispatcher.py @@ -23,6 +23,7 @@ Phase 5 Sprint 5.0-5.1 — 2026-04-14 Claude Sonnet 4.6 from __future__ import annotations +import json import time from dataclasses import dataclass from functools import lru_cache @@ -264,7 +265,12 @@ async def dispatch_action( try: # internal provider: 特殊 URL builder(無 MCP call) if spec.mcp_provider == "internal": - result_text = _handle_internal_action(spec, resolved_params) + result_text = await _handle_internal_action( + spec, + resolved_params, + incident_id=incident_id, + user_id=user_id, + ) duration = (time.perf_counter() - start) * 1000 logger.info("dispatch_action_internal", action=action_name, duration_ms=round(duration, 1)) return DispatchResult( @@ -361,7 +367,13 @@ async def dispatch_action( ) -def _handle_internal_action(spec: ActionSpec, params: dict) -> str: +async def _handle_internal_action( + spec: ActionSpec, + params: dict, + *, + incident_id: str, + user_id: int | None, +) -> str: """ Internal actions — 不走 MCP,直接產生 URL/文字回覆 @@ -379,12 +391,20 @@ def _handle_internal_action(spec: ActionSpec, params: dict) -> str: return f"{spec.emoji} {spec.label}\nhttps://awoooi.wooo.work/flywheel" if tool == "record_authorization": - # Sprint 5.4 會實作真實授權記錄,這裡先返回確認 - user_id = params.get("user_id", 0) + recorded = await _record_authorization_audit( + spec=spec, + params=params, + incident_id=incident_id, + user_id=user_id, + ) + _user_id = params.get("user_id", user_id or 0) source = params.get("source", "unknown") + action = params.get("action", "authorize") + suffix = "已寫入審計與時間線" if recorded else "已受理;審計寫入將由後續補償" return ( f"{spec.emoji} {spec.label}\n" - f"已記錄 user={user_id} 授權 source={source}(24h 內同源告警將靜默)" + f"已記錄 user={_user_id} 授權 source={source} action={action}(24h 內同源告警將靜默)\n" + f"{suffix}" ) # 未知的 internal tool @@ -394,6 +414,104 @@ def _handle_internal_action(spec: ActionSpec, params: dict) -> str: ) +async def _record_authorization_audit( + *, + spec: ActionSpec, + params: dict, + incident_id: str, + user_id: int | None, +) -> bool: + """Best-effort persistence for internal authorization actions.""" + + source = str(params.get("source") or "unknown") + requested_action = str(params.get("action") or spec.name) + source_ip = str(params.get("source_ip") or "") + actor = f"telegram:{user_id or params.get('user_id') or 0}" + context = { + "action": spec.name, + "label": spec.label, + "risk": spec.risk, + "category": spec.category, + "requested_action": requested_action, + "source": source, + "source_ip": source_ip, + "user_id": user_id or params.get("user_id") or 0, + "requires_multi_sig": spec.requires_multi_sig, + } + wrote_any = False + + try: + from src.core.redis_client import get_redis + + redis = get_redis() + redis_key = f"secops:authorization:{source}" + await redis.set(redis_key, json.dumps(context, ensure_ascii=False), ex=86400) + wrote_any = True + except Exception as exc: + logger.warning( + "record_authorization_redis_failed", + incident_id=incident_id, + source=source, + error=str(exc), + ) + + try: + from src.repositories.alert_operation_log_repository import ( + get_alert_operation_log_repository, + ) + + event_type = "APPROVAL_ESCALATED" if spec.requires_multi_sig or spec.risk == "critical" else "USER_ACTION" + record = await get_alert_operation_log_repository().append( + event_type, + incident_id=incident_id, + actor=actor, + action_detail=f"telegram_authorization:{requested_action}"[:200], + success=True, + context=context, + ) + wrote_any = wrote_any or bool(record) + except Exception as exc: + logger.warning( + "record_authorization_aol_failed", + incident_id=incident_id, + source=source, + error=str(exc), + ) + + try: + from src.services.approval_db import get_timeline_service + + await get_timeline_service().add_event( + event_type="security", + status="warning" if spec.requires_multi_sig or spec.risk == "critical" else "info", + title="Telegram authorization recorded", + description=( + f"action={requested_action} source={source} source_ip={source_ip or 'unknown'}" + )[:500], + actor=actor, + actor_role="secops_authorization", + risk_level=spec.risk, + incident_id=incident_id, + ) + wrote_any = True + except Exception as exc: + logger.warning( + "record_authorization_timeline_failed", + incident_id=incident_id, + source=source, + error=str(exc), + ) + + logger.info( + "record_authorization_audit_complete", + incident_id=incident_id, + source=source, + action=requested_action, + wrote_any=wrote_any, + ) + return wrote_any + + def _format_reply( mcp_result: Any, reply_format: str, label: str, emoji: str ) -> str: diff --git a/apps/api/src/services/emergency_escalation_service.py b/apps/api/src/services/emergency_escalation_service.py index 0abcc7ee..dd38053d 100644 --- a/apps/api/src/services/emergency_escalation_service.py +++ b/apps/api/src/services/emergency_escalation_service.py @@ -119,6 +119,10 @@ async def escalate_drift_auto_adopt_blocked( return try: + from src.repositories.alert_operation_log_repository import ( + get_alert_operation_log_repository, + ) + from src.services.approval_db import get_timeline_service from src.services.telegram_gateway import get_telegram_gateway actionable_count = sum( @@ -143,6 +147,43 @@ async def escalate_drift_auto_adopt_blocked( ), group_chat_id=settings.SRE_GROUP_CHAT_ID or None, ) + await get_alert_operation_log_repository().append( + "APPROVAL_ESCALATED", + incident_id=report.report_id, + actor="drift_auto_adopt", + action_detail="drift_auto_adopt_blocked_emergency_channel", + success=True, + context={ + "namespace": report.namespace, + "reason": reason, + "high_count": report.high_count, + "medium_count": report.medium_count, + "actionable_count": actionable_count, + "intent": intent, + "confidence": confidence, + "risk": risk, + }, + ) + try: + await get_timeline_service().add_event( + event_type="agent", + status="warning", + title="Drift emergency intervention requested", + description=( + f"{reason} | namespace={report.namespace} " + f"high={report.high_count} medium={report.medium_count} " + f"intent={intent} confidence={confidence:.0%}" + )[:500], + actor="drift_auto_adopt", + actor_role="emergency_intervention", + incident_id=report.report_id, + ) + except Exception as timeline_exc: + logger.warning( + "drift_emergency_timeline_failed", + report_id=report.report_id, + error=str(timeline_exc), + ) logger.warning( "drift_auto_adopt_emergency_escalated", report_id=report.report_id, diff --git a/apps/api/tests/test_callback_dispatcher.py b/apps/api/tests/test_callback_dispatcher.py index 58c0ad49..e80eecb5 100644 --- a/apps/api/tests/test_callback_dispatcher.py +++ b/apps/api/tests/test_callback_dispatcher.py @@ -15,6 +15,7 @@ Phase 5 Sprint 5.0-5.1 Callback Dispatcher 單元測試 import pytest +from src.services import callback_dispatcher as callback_dispatcher_module from src.services.callback_dispatcher import ( dispatch_action, get_action_spec, @@ -269,6 +270,36 @@ class TestInternalActions: assert result.success is True assert "12345" in result.result_text + async def test_record_authorization_persists_audit_intent(self, monkeypatch): + captured = {} + + async def fake_record_authorization_audit(*, spec, params, incident_id, user_id): + captured["spec"] = spec + captured["params"] = params + captured["incident_id"] = incident_id + captured["user_id"] = user_id + return True + + monkeypatch.setattr( + callback_dispatcher_module, + "_record_authorization_audit", + fake_record_authorization_audit, + ) + + result = await dispatch_action( + action_name="secops_isolate", + incident_id="INC-SEC-AUTH", + user_id=67890, + labels={"instance": "192.168.0.110"}, + ) + + assert result.success is True + assert "已寫入審計與時間線" in result.result_text + assert captured["spec"].name == "secops_isolate" + assert captured["params"]["action"] == "request_network_isolation" + assert captured["incident_id"] == "INC-SEC-AUTH" + assert captured["user_id"] == 67890 + # ============================================================================= # Sprint 5.2 — MCP 呼叫失敗路徑(Provider 未註冊) diff --git a/apps/api/tests/test_emergency_escalation_service.py b/apps/api/tests/test_emergency_escalation_service.py new file mode 100644 index 00000000..5d4a0e55 --- /dev/null +++ b/apps/api/tests/test_emergency_escalation_service.py @@ -0,0 +1,71 @@ +from types import SimpleNamespace + +import pytest + +from src.services import emergency_escalation_service as service + + +@pytest.mark.asyncio +async def test_drift_emergency_escalation_writes_aol_and_timeline(monkeypatch): + sent_cards = [] + aol_calls = [] + timeline_calls = [] + + async def fake_dedup(*args, **kwargs): + return True + + class FakeGateway: + async def send_escalation_card(self, **kwargs): + sent_cards.append(kwargs) + + class FakeRepo: + async def append(self, *args, **kwargs): + aol_calls.append((args, kwargs)) + return object() + + class FakeTimeline: + async def add_event(self, **kwargs): + timeline_calls.append(kwargs) + return {"id": "timeline-1"} + + monkeypatch.setattr(service, "_dedup_first_send", fake_dedup) + monkeypatch.setattr( + "src.services.telegram_gateway.get_telegram_gateway", + lambda: FakeGateway(), + ) + monkeypatch.setattr( + "src.repositories.alert_operation_log_repository.get_alert_operation_log_repository", + lambda: FakeRepo(), + ) + monkeypatch.setattr( + "src.services.approval_db.get_timeline_service", + lambda: FakeTimeline(), + ) + + report = SimpleNamespace( + report_id="drift-123", + namespace="awoooi-prod", + high_count=1, + medium_count=2, + items=[ + SimpleNamespace(is_allowlisted=False), + SimpleNamespace(is_allowlisted=True), + ], + ) + interpretation = SimpleNamespace( + intent=SimpleNamespace(value="emergency_hotfix"), + confidence=0.72, + risk="high", + ) + + await service.escalate_drift_auto_adopt_blocked( + report=report, + reason="unsafe drift", + interpretation=interpretation, + ) + + assert sent_cards and sent_cards[0]["incident_id"] == "drift-123" + assert aol_calls and aol_calls[0][0][0] == "APPROVAL_ESCALATED" + assert aol_calls[0][1]["actor"] == "drift_auto_adopt" + assert aol_calls[0][1]["context"]["intent"] == "emergency_hotfix" + assert timeline_calls and timeline_calls[0]["actor_role"] == "emergency_intervention" diff --git a/docs/LOGBOOK.md b/docs/LOGBOOK.md index 9e6b1c76..13e3a634 100644 --- a/docs/LOGBOOK.md +++ b/docs/LOGBOOK.md @@ -6,6 +6,20 @@ --- +## 2026-05-01 | Emergency intervention 留痕閉環 + +承接 SSH/backup 自動修復閉環與 Telegram ghost-button 補洞:SecOps 隔離/封鎖按鈕已降級成授權記錄,但若只回文字、不寫 Redis/AOL/timeline,就會形成「看似有人接手,系統沒有記憶」的新斷點;drift auto-adopt 被擋時也需要同樣進 WarRoom timeline。 + +### 完成 +- `callback_dispatcher` 的 `internal.record_authorization` 改成 async 持久化:寫 `secops:authorization:{source}` 24h TTL、寫 `alert_operation_log`,並新增 `timeline_events` security warning/info。 +- 高風險或 multi-sig SecOps 授權統一用既有 `APPROVAL_ESCALATED` AOL event,避免新增 enum 造成 migration 漏洞;一般授權用 `USER_ACTION`。 +- `EmergencyEscalationService.escalate_drift_auto_adopt_blocked()` 補 AOL + timeline,config drift 無法 auto-adopt 時不再只有 Telegram 卡片。 +- 補 regression tests,鎖住「按鈕回覆文字」之外必須落到審計與處理歷程。 + +### 驗證 +- `python3 -m py_compile apps/api/src/services/callback_dispatcher.py apps/api/src/services/emergency_escalation_service.py` 通過。 +- `cd apps/api && DATABASE_URL=postgresql://test:test@localhost:5432/test pytest tests/test_callback_dispatcher.py tests/test_emergency_escalation_service.py tests/test_alertmanager_rule_bypass.py -q` → 45 passed。 + ## 2026-05-01 | SSH 自動修復閉環 + Telegram ghost-button 補洞 承接 HostBackupFailed / SSH MCP live 驗證:`backup_failure` 已進 SSH route,但實際 188 只接受 `ollama@192.168.0.188`,而 provider 仍用預設 `wooo`;同時 `DecisionManager._ssh_execute()` 使用未註冊的 `ssh_diagnose`、錯誤的 restart tool 名稱,且 SSH 失敗或只讀診斷成功時仍可能被標成自動修復完成。 diff --git a/docs/adr/ADR-075-telegram-notification-standard.md b/docs/adr/ADR-075-telegram-notification-standard.md index c355380d..6fb4dca6 100644 --- a/docs/adr/ADR-075-telegram-notification-standard.md +++ b/docs/adr/ADR-075-telegram-notification-standard.md @@ -63,9 +63,9 @@ ADR-071(2026-04-11)設計了 TYPE-1/2/3/4/4D 五種通知類型,並實作 | TYPE-3 | 需人工審核(預設)| 依 category 動態 ≤4 個 | SRE 群組 | | TYPE-4 | AI 無法判斷 | [手動記錄][查面板][忽略] | SRE 群組 | | TYPE-4D | Config Drift | [查Diff][採納][回滾][忽略] | SRE 群組 | -| TYPE-5S | 資安防禦(未來)| [隔離][封鎖IP][驅逐Pod][確認授權] | SRE 群組 | +| TYPE-5S | 資安防禦 | [隔離][封鎖IP][驅逐Pod][確認授權];危險動作先記授權/多簽 | SRE 群組 | | TYPE-6B | 業務/FinOps(未來)| [暫停][查SignOz][忽略] | SRE 群組 | -| TYPE-7E | 重大事故升級(未來)| [建立戰情室][Postmortem][DR手冊][確認接手] | SRE 群組 | +| TYPE-7E | 重大事故升級 / auto-repair unavailable | 無 ghost callback;人工/AI 接手先靠卡片與 timeline/AOL 留痕,按鈕需有 dispatcher 後才可開 | SRE 群組 | | TYPE-8M | 飛輪/告警鏈路健康 | [觸發診斷][查看面板][靜默] | SRE 群組 | ### D4:雙頻道路由規則 @@ -87,13 +87,19 @@ NOTIFICATION_TYPE_RULES = { "TYPE-3": "最多 4 個 Callback Button,依 alert_category 動態選擇", "TYPE-4": "固定 3 個按鈕:[手動記錄][查看面板][忽略]", "TYPE-4D": "固定 4 個按鈕:[查看Diff][採納][回滾][忽略]", - "TYPE-5S": "固定 4 個按鈕:[隔離][封鎖IP][驅逐Pod][確認授權]", + "TYPE-5S": "固定 4 個按鈕:[隔離][封鎖IP][驅逐Pod][確認授權],危險動作只記授權/多簽", "TYPE-6B": "最多 3 個按鈕:[暫停][查看SignOz][忽略]", - "TYPE-7E": "固定 4 個按鈕:[建立戰情室][Postmortem草稿][DR手冊][確認接手]", + "TYPE-7E": "無 ghost callback;未落地 dispatcher 前不顯示 callback button", "TYPE-8M": "固定 3 個按鈕:[觸發診斷][飛輪面板][靜默]", } ``` +2026-05-01 補充:TYPE-7E 已用於 `auto_repair_unavailable` 與 +`drift_auto_adopt_blocked` 緊急通道。Telegram 卡片本身不是閉環;每次升級 +必須寫入 `alert_operation_log` 與 `timeline_events`,讓 WarRoom、KM 與 +learning loop 能反查。TYPE-5S 的 `record_authorization` 也必須寫 Redis TTL +和 AOL/timeline;不得只回 Telegram toast。 + --- ## 實施計畫