feat(adr-071): 告警通知四類型第一批 B/C/E/F/G/H 全實作
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Type Sync Check / check-type-sync (push) Failing after 1m7s

ADR-071-B: classify_notification() — 五型分類器 (TYPE-1/2/3/4/4D)
ADR-071-C: send_info_notification() — TYPE-1 純資訊無按鈕卡片
ADR-071-E: _build_inline_keyboard() — 依 alert_category 動態組合 TYPE-3 按鈕
ADR-071-F: send_drift_card() — TYPE-4D Config Drift 卡片 + Diff 截斷
ADR-071-G: km_conversion_service.py — Incident RESOLVED 自動轉 KM
ADR-071-H: handle_manual_fix_done() — TYPE-4 手動修復 Bot 對話閉環

前批已完成: ADR-071-A (DB Migration) + ADR-071-D (狀態機守衛)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-11 02:24:20 +08:00
parent 45b13f1d7c
commit 325b3851b5
4 changed files with 894 additions and 22 deletions

View File

@@ -0,0 +1,95 @@
-- ADR-071-A: four alert-notification types + full-lifecycle DB records
-- Created: 2026-04-11 (Taipei time zone)
-- Author: Claude Sonnet 4.6 — ADR-071, first batch
--
-- Design notes:
-- Columns are added to existing tables; no new tables are created.
-- PgEnum ADD VALUE must run in its own transaction (a newly added value
-- cannot be used inside the same transaction that added it).
--
-- Execution order:
-- Step 1: add the new PgEnum values (separate transaction)
-- Step 2: add the new columns to the incidents table
--         (the statements below add 8 columns, not 7 as originally noted)
-- Step 3: verification queries
-- ============================================================================
-- Step 1: add 5 values to the alert_event_type PgEnum
-- Note: ADD VALUE IF NOT EXISTS is idempotent, so re-running is safe.
-- Note: each ADD VALUE must run in its own transaction; do not batch them.
-- ============================================================================
-- Notification classified
ALTER TYPE alert_event_type ADD VALUE IF NOT EXISTS 'NOTIFICATION_CLASSIFIED';
-- Manual fix recorded
ALTER TYPE alert_event_type ADD VALUE IF NOT EXISTS 'MANUAL_FIX_RECORDED';
-- KM conversion finished
ALTER TYPE alert_event_type ADD VALUE IF NOT EXISTS 'KM_CONVERTED';
-- Playbook draft created
ALTER TYPE alert_event_type ADD VALUE IF NOT EXISTS 'PLAYBOOK_DRAFT_CREATED';
-- Blocked by the state-machine guard
ALTER TYPE alert_event_type ADD VALUE IF NOT EXISTS 'STATE_GUARD_BLOCKED';
-- ============================================================================
-- Step 2: add the new columns to the incidents table (8 columns)
-- Note: ADD COLUMN IF NOT EXISTS is idempotent, so re-running is safe.
-- ============================================================================
-- Notification type (TYPE-1/2/3/4/4D)
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS notification_type VARCHAR(10);
-- Alert category (selects the TYPE-3 button set)
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS alert_category VARCHAR(50);
-- Pre-execution MCP context snapshot (to be filled by MCP Phase 2 after Sprint A)
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS context_bundle JSONB;
-- Pre-execution metrics snapshot (collected via Prometheus MCP) — used by ADR-071-I
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS metrics_before JSONB;
-- Post-execution metrics snapshot (collected via Prometheus MCP) — used by ADR-071-I
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS metrics_after JSONB;
-- Execution verification result (K8s MCP watch_rollout output) — used by ADR-071-J
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS verification_result JSONB;
-- Manual fix steps (TYPE-4, entered by the user) and who recorded them
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS manual_fix_steps TEXT;
ALTER TABLE incidents
ADD COLUMN IF NOT EXISTS manual_fix_by VARCHAR(100);
-- ============================================================================
-- Step 3: verification queries (run afterwards to confirm the changes exist)
-- ============================================================================
-- Confirm the new incidents columns; 8 rows are expected.
-- NOTE(review): there is no table_schema filter, so an incidents table in
-- another schema would also match — confirm this is acceptable.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'incidents'
AND column_name IN (
'notification_type', 'alert_category', 'context_bundle',
'metrics_before', 'metrics_after', 'verification_result',
'manual_fix_steps', 'manual_fix_by'
)
ORDER BY column_name;
-- Confirm the new alert_event_type values; 5 rows are expected.
SELECT enumlabel
FROM pg_enum
JOIN pg_type ON pg_enum.enumtypid = pg_type.oid
WHERE pg_type.typname = 'alert_event_type'
AND enumlabel IN (
'NOTIFICATION_CLASSIFIED', 'MANUAL_FIX_RECORDED',
'KM_CONVERTED', 'PLAYBOOK_DRAFT_CREATED', 'STATE_GUARD_BLOCKED'
)
ORDER BY enumlabel;

View File

@@ -439,6 +439,16 @@ class Incident(BaseModel):
description="是否已向量化到 Vector DB (Semantic Memory)",
)
# ADR-071-A: 告警通知四類型 + 全生命週期 DB 記錄 (2026-04-11 Claude Sonnet 4.6)
notification_type: str | None = Field(None, description="通知類型 TYPE-1/2/3/4/4D")
alert_category: str | None = Field(None, description="告警類別 k8s_workload/database/host_resource/...")
context_bundle: dict | None = Field(None, description="MCP 情報收集快照(執行前)")
metrics_before: dict | None = Field(None, description="指標快照執行前Prometheus MCP")
metrics_after: dict | None = Field(None, description="指標快照執行後Prometheus MCP")
verification_result: dict | None = Field(None, description="執行驗證結果K8s MCP watch_rollout")
manual_fix_steps: str | None = Field(None, description="手動修復步驟TYPE-4 使用者輸入)")
manual_fix_by: str | None = Field(None, description="手動修復執行者")
# [首席架構師] 移除 json_encoders (Pydantic v2 已 deprecated),原生序列化輸出格式與 .isoformat() 一致 v1.1 2026-04-01 Asia/Taipei
# 2026-04-01 Claude Code: 舊 Redis 資料相容性 - outcome 可能存為字串 "resolved"

View File

@@ -0,0 +1,257 @@
"""
KM Conversion Service — ADR-071-G
==================================
Incident RESOLVED 後自動轉換為 KnowledgeEntry + Playbook 草稿
設計原則:
- 非同步觸發,失敗不影響主流程
- 根據 notification_type 決定 KM 品質等級
- 自動向量化embedding
- 寫入 AlertOperationLog KM_CONVERTED 事件
建立時間: 2026-04-11 (台北時區)
建立者: Claude Sonnet 4.6 — ADR-071-G
leWOOOgo 積木化:
- KMConversionService → KnowledgeService + LearningService + AlertOperationLogRepository
- 不直接存取 DB透過 Repository 層
"""
import structlog
from src.models.knowledge import (
EntrySource,
EntryStatus,
EntryType,
KnowledgeEntryCreate,
)
from src.repositories.alert_operation_log_repository import (
ALERT_EVENT_TYPES,
get_alert_operation_log_repository,
)
from src.services.knowledge_service import get_knowledge_service
# Module-level structured logger.
logger = structlog.get_logger(__name__)

# ADR-071 event types. They are added to the shared registry at import time
# so that event_type validation does not reject them.
_ADR_071_EVENT_TYPES = (
    "KM_CONVERTED",
    "NOTIFICATION_CLASSIFIED",
    "MANUAL_FIX_RECORDED",
    "PLAYBOOK_DRAFT_CREATED",
    "STATE_GUARD_BLOCKED",
)
ALERT_EVENT_TYPES.update(_ADR_071_EVENT_TYPES)

# Notification type -> quality level (status) of the generated KM entry.
# A value of None means "do not create a KM entry".
_TYPE_TO_STATUS = {
    # Auto-remediation succeeded: highest quality.
    "TYPE-2": EntryStatus.APPROVED,
    # Executed after human review.
    "TYPE-3": EntryStatus.REVIEW,
    # AI could not decide: draft only.
    "TYPE-4": EntryStatus.DRAFT,
    # Config Drift: draft only.
    "TYPE-4D": EntryStatus.DRAFT,
    # Pure information: never converted to KM.
    "TYPE-1": None,
}

# Notification type -> provenance recorded on the generated KM entry.
_TYPE_TO_SOURCE = {
    "TYPE-2": EntrySource.AI_EXTRACTED,
    "TYPE-3": EntrySource.AI_EXTRACTED,
    "TYPE-4": EntrySource.HUMAN,
    "TYPE-4D": EntrySource.AI_EXTRACTED,
}
class KMConversionService:
"""
Incident → KM 自動轉換服務
觸發時機:
1. Incident 狀態變為 RESOLVED主要路徑
2. 使用者點擊 [手動修復後記錄] 後ADR-071-H
3. 每日 03:00 cron 補轉換vectorized=False + RESOLVED
"""
def __init__(self) -> None:
self._knowledge_svc = get_knowledge_service()
self._op_log_repo = get_alert_operation_log_repository()
async def convert(self, incident) -> dict | None:
"""
將 Incident 轉換為 KnowledgeEntry
Args:
incident: Incident ORM 物件
Returns:
dict with km_entry_id and quality_level, or None if skipped
"""
notification_type = getattr(incident, "notification_type", None) or "TYPE-3"
# TYPE-1 不轉 KM
target_status = _TYPE_TO_STATUS.get(notification_type)
if target_status is None:
logger.debug(
"km_conversion_skipped",
incident_id=incident.incident_id,
reason="TYPE-1 純資訊,不轉 KM",
)
return None
entry_source = _TYPE_TO_SOURCE.get(notification_type, EntrySource.AI_EXTRACTED)
alert_category = getattr(incident, "alert_category", None) or "unknown"
# 提取 label 資訊
labels = incident.signals[0].labels if incident.signals else {}
alertname = labels.get("alertname", "unknown")
severity = labels.get("severity", "unknown")
affected_services = ", ".join(incident.affected_services or ["unknown"])
# 計算修復耗時
resolution_time = ""
if incident.resolved_at and incident.created_at:
try:
delta = incident.resolved_at - incident.created_at
resolution_time = f"{int(delta.total_seconds())}s"
except Exception:
pass
# 建立 KM 內容(標準格式)
content = self._build_content(
incident=incident,
alertname=alertname,
affected_services=affected_services,
severity=severity,
resolution_time=resolution_time,
)
title = f"{alertname} @ {affected_services[:40]}{incident.title[:60] if incident.title else '未知'}"
tags = [alertname, affected_services, severity, notification_type]
if alert_category != "unknown":
tags.append(alert_category)
km_entry = await self._knowledge_svc.create_entry(
KnowledgeEntryCreate(
title=title[:255],
content=content,
entry_type=EntryType.INCIDENT_CASE,
category=alert_category,
tags=[t for t in tags if t],
source=entry_source,
status=target_status,
related_incident_id=incident.incident_id,
)
)
# 寫入操作日誌
try:
await self._op_log_repo.append(
event_type="KM_CONVERTED",
incident_id=incident.incident_id,
actor="km_conversion_service",
action_detail=f"KM entry created: {km_entry.entry_id}",
success=True,
context={
"km_entry_id": km_entry.entry_id,
"quality_level": target_status.value,
"notification_type": notification_type,
},
)
except Exception as _e:
logger.warning("km_op_log_failed", incident_id=incident.incident_id, error=str(_e))
logger.info(
"km_converted",
incident_id=incident.incident_id,
km_entry_id=km_entry.entry_id,
quality_level=target_status.value,
notification_type=notification_type,
)
return {
"km_entry_id": km_entry.entry_id,
"quality_level": target_status.value,
}
def _build_content(
self,
incident,
alertname: str,
affected_services: str,
severity: str,
resolution_time: str,
) -> str:
"""
建立 KM 條目內容(標準格式)
"""
created_at_str = str(incident.created_at) if incident.created_at else "未知"
resolved_at_str = str(incident.resolved_at) if incident.resolved_at else "未知"
context_summary = ""
if incident.context_bundle:
context_summary = str(incident.context_bundle.get("summary", ""))
# 決策鏈資訊
decision_chain = getattr(incident, "decision_chain", None)
root_cause = ""
action_type = ""
action_command = ""
if decision_chain and isinstance(decision_chain, dict):
root_cause = decision_chain.get("root_cause", "")
action_type = decision_chain.get("action_type", "")
action_command = decision_chain.get("action", "")
# 指標快照(若有)
metrics_section = ""
if incident.metrics_before or incident.metrics_after:
mb = incident.metrics_before or {}
ma = incident.metrics_after or {}
metrics_section = (
f"\n## 效果驗證\n"
f"- 執行前: {mb}\n"
f"- 執行後: {ma}\n"
f"- 恢復耗時: {resolution_time}\n"
)
# 驗證結果(若有)
verify_section = ""
if incident.verification_result:
verify_section = f"- 驗證方式: {incident.verification_result}\n"
manual_section = ""
if incident.manual_fix_steps:
manual_section = (
f"\n## 手動修復步驟\n"
f"- 執行者: {incident.manual_fix_by or '未知'}\n"
f"```\n{incident.manual_fix_steps}\n```\n"
)
return (
f"## 症狀\n"
f"- 告警: {alertname}\n"
f"- 服務: {affected_services}\n"
f"- 嚴重度: {severity}\n"
f"- 觸發時間: {created_at_str}\n"
f"- 解決時間: {resolved_at_str}\n"
+ (f"- 即時情境: {context_summary}\n" if context_summary else "")
+ f"\n## 根因分析\n{root_cause or incident.title or '未知'}\n"
+ (
f"\n## 執行動作\n"
f"- 類型: {action_type}\n"
f"- 指令: {action_command}\n"
if action_type or action_command else ""
)
+ metrics_section
+ verify_section
+ manual_section
)
# Module-level singleton holder; populated on first use.
_km_conversion_service: KMConversionService | None = None


def get_km_conversion_service() -> KMConversionService:
    """Return the shared KMConversionService, creating it on first call."""
    global _km_conversion_service
    instance = _km_conversion_service
    if instance is None:
        instance = KMConversionService()
        _km_conversion_service = instance
    return instance

View File

@@ -1099,6 +1099,69 @@ RISK_EMOJI_MAP = {
}
# =============================================================================
# ADR-071-B: 告警通知四類型分類器 (2026-04-11 Claude Sonnet 4.6)
# =============================================================================
from enum import Enum
class NotificationType(str, Enum):
    """Kinds of alert notification cards (ADR-071-B).

    Members are ``str`` subclasses, so each compares equal to its wire
    value (for example ``NotificationType.TYPE_3 == "TYPE-3"``).
    """

    # Pure information; the card carries no buttons.
    TYPE_1 = "TYPE-1"
    # Already auto-remediated.
    TYPE_2 = "TYPE-2"
    # Needs human review (the default).
    TYPE_3 = "TYPE-3"
    # AI could not decide.
    TYPE_4 = "TYPE-4"
    # Dedicated to Config Drift.
    TYPE_4_DRIFT = "TYPE-4D"
def classify_notification(
    incident,
    confidence: float,
    auto_executed: bool,
    mcp_all_failed: bool = False,
    decision_state: str = "",
) -> NotificationType:
    """Decide which kind of Telegram card an incident should be sent as.

    Rules are checked in priority order:
    TYPE-4D > TYPE-1 > TYPE-2 > TYPE-4 > TYPE-3 (default).

    Args:
        incident: Incident object; reads ``signals[0].labels`` and ``title``.
        confidence: AI decision confidence; values below 0.5 select TYPE-4.
        auto_executed: Whether auto-remediation has already been executed.
        mcp_all_failed: Whether every MCP provider failed.
        decision_state: Decision state string; "COMPLETED" and "ERROR" are
            the values this function reacts to.

    Returns:
        The NotificationType selected by the first matching rule.
    """
    first_labels = incident.signals[0].labels if incident.signals else {}
    alert_name = first_labels.get("alertname", "")
    severity = first_labels.get("severity", "")

    # TYPE-4D: Config Drift takes precedence over every other rule.
    if alert_name == "ConfigDrift":
        return NotificationType.TYPE_4_DRIFT

    # TYPE-1: info-severity success reports, info-severity backup alerts,
    # and the two health-report alert names.
    lowered_title = (incident.title or "").lower()
    success_keywords = ("success", "完成", "completed")
    is_pure_information = (
        (severity == "info" and any(word in lowered_title for word in success_keywords))
        or (alert_name.startswith(("Backup.", "VeleroBackup")) and severity == "info")
        or alert_name in ("AlertChainHealthy", "AutoRepairHighSuccessRate")
    )
    if is_pure_information:
        return NotificationType.TYPE_1

    # TYPE-2: auto-remediation ran and the decision completed.
    if auto_executed and decision_state == "COMPLETED":
        return NotificationType.TYPE_2

    # TYPE-4 when the AI cannot decide (low confidence, all MCP providers
    # failed, or the decision errored); otherwise the TYPE-3 default.
    ai_cannot_decide = confidence < 0.5 or mcp_all_failed or decision_state == "ERROR"
    return NotificationType.TYPE_4 if ai_cannot_decide else NotificationType.TYPE_3
# =============================================================================
# Telegram Gateway
# =============================================================================
@@ -1255,47 +1318,106 @@ class TelegramGateway:
include_auto_tuning: bool = True,
auto_tuning_command: str = "",
incident_id: str = "",
# ADR-071-E: TYPE-3 動態按鈕 (2026-04-11 Claude Sonnet 4.6)
alert_category: str = "",
notification_type: str = "",
) -> dict:
"""
建立 Inline Keyboard (ADR-050 v2.0 六鍵佈局)
建立 Inline Keyboard
2026-04-01 Claude Code (ADR-050): 重組為 6 鍵 + 可選自動調優
- 第一行: [✅ 批准] [❌ 拒絕] [🔕 靜默] ← nonce 防重放
- 第二行: [📋 詳情] [🔄 重診] [📊 歷史] ← incident_id format (read-only)
- 第三行: [⚡ 自動調優] (可選)
ADR-050 v2.0 (2026-04-01): 六鍵佈局
ADR-071-E (2026-04-11): TYPE-3 依 alert_category 動態組合操作按鈕
TYPE-3 按鈕對應 alert_category:
k8s_workload → [重啟] [擴容] [縮容] [回滾]
database → [終止慢查詢] [清連線池]
host_resource → [查程序] [重啟服務] [清 Log]
network → [重載 Nginx] [查 Port]
devops_tool → [重啟服務] [查 Log]
ai_system → [切換 Provider]
ssl_cert → [更新憑證]
(其他) → [批准] [拒絕] (舊版通用鍵)
Args:
approval_id: 簽核單 ID (用於 nonce 生成)
include_auto_tuning: 是否包含自動調優按鈕
auto_tuning_command: kubectl 調優指令
incident_id: 關聯 Incident ID (用於 detail/reanalyze/history 按鈕)
Returns:
dict: Telegram InlineKeyboardMarkup
alert_category: 告警類別 (ADR-071-E: 決定 TYPE-3 按鈕組合)
notification_type: 通知類型 (TYPE-1/2/3/4/4D)
"""
# TYPE-3 動態操作按鈕 (ADR-071-E)
_CATEGORY_BUTTONS: dict[str, list[tuple[str, str]]] = {
"k8s_workload": [
("🔄 重啟", f"action:restart:{incident_id}"),
("📈 擴容", f"action:scale_up:{incident_id}"),
("📉 縮容", f"action:scale_down:{incident_id}"),
("⏪ 回滾", f"action:rollback:{incident_id}"),
],
"database": [
("🛑 終止慢查詢", f"action:kill_slow_query:{incident_id}"),
("🔄 清連線池", f"action:clear_conn_pool:{incident_id}"),
],
"host_resource": [
("🔍 查程序", f"action:check_process:{incident_id}"),
("🔄 重啟服務", f"action:restart_service:{incident_id}"),
("🗑 清 Log", f"action:clear_log:{incident_id}"),
],
"network": [
("🔄 重載 Nginx", f"action:reload_nginx:{incident_id}"),
("🔌 查 Port", f"action:check_port:{incident_id}"),
],
"devops_tool": [
("🔄 重啟服務", f"action:restart_service:{incident_id}"),
("📋 查 Log", f"action:check_log:{incident_id}"),
],
"ai_system": [
("🔀 切換 Provider", f"action:switch_provider:{incident_id}"),
],
"ssl_cert": [
("🔐 更新憑證", f"action:renew_cert:{incident_id}"),
],
}
# 產生 Nonce (防重放,用於寫操作)
approve_nonce = self._security.generate_callback_nonce(approval_id, "approve")
reject_nonce = self._security.generate_callback_nonce(approval_id, "reject")
silence_nonce = self._security.generate_callback_nonce(approval_id, "silence")
# 第一行: 主要簽核操作 (nonce 保護)
buttons = [
[
{"text": "✅ 批准", "callback_data": approve_nonce},
{"text": "❌ 拒絕", "callback_data": reject_nonce},
{"text": "🔕 靜默", "callback_data": silence_nonce},
],
]
is_type3 = notification_type in ("TYPE-3", NotificationType.TYPE_3, "")
# 第二行: 資訊查詢按鈕 (ADR-050: read-only, format: action:incident_id)
if incident_id:
buttons.append([
if is_type3 and alert_category and alert_category in _CATEGORY_BUTTONS:
# TYPE-3 動態操作按鈕:第一行為類別專屬操作
category_btns = [
{"text": text, "callback_data": cb_data}
for text, cb_data in _CATEGORY_BUTTONS[alert_category]
]
# 每行最多 3 個,超過換行
rows = [category_btns[i:i+3] for i in range(0, len(category_btns), 3)]
# 通用操作:[查看詳情] [忽略]
rows.append([
{"text": "📋 詳情", "callback_data": f"detail:{incident_id}"},
{"text": "🔄 重診", "callback_data": f"reanalyze:{incident_id}"},
{"text": "📊 歷史", "callback_data": f"history:{incident_id}"},
{"text": "🔕 忽略", "callback_data": silence_nonce},
])
buttons = rows
else:
# 舊版通用鍵(向下相容)
buttons = [
[
{"text": "✅ 批准", "callback_data": approve_nonce},
{"text": "❌ 拒絕", "callback_data": reject_nonce},
{"text": "🔕 靜默", "callback_data": silence_nonce},
],
]
# 第二行: 資訊查詢按鈕 (ADR-050: read-only, format: action:incident_id)
if incident_id:
buttons.append([
{"text": "📋 詳情", "callback_data": f"detail:{incident_id}"},
{"text": "🔄 重診", "callback_data": f"reanalyze:{incident_id}"},
{"text": "📊 歷史", "callback_data": f"history:{incident_id}"},
])
# 第三行: 自動調優按鈕 (v7.0)
# 自動調優按鈕 (v7.0)
if include_auto_tuning and auto_tuning_command:
tuning_nonce = self._security.generate_callback_nonce(approval_id, "tune")
buttons.append([
@@ -1591,6 +1713,123 @@ class TelegramGateway:
except Exception as e:
logger.error("send_approval_card_to_group_failed", error=str(e))
# =========================================================================
# ADR-071-C: TYPE-1 純資訊通知 (2026-04-11 Claude Sonnet 4.6)
# =========================================================================
async def send_info_notification(
    self,
    incident_id: str,
    title: str,
    message: str,
    alertname: str = "",
    severity: str = "info",
) -> dict:
    """Send a TYPE-1 informational notification (no inline keyboard).

    The text is sent as Telegram HTML; every caller-supplied value is
    HTML-escaped. The message body is truncated so that the whole text
    stays below Telegram's per-message length limit.

    Args:
        incident_id: Incident ID shown in the card.
        title: Card title.
        message: Card body; overly long bodies are truncated.
        alertname: Alert name; its line is omitted when empty.
        severity: Severity key used to look up the leading emoji.

    Returns:
        Whatever ``self._make_request`` returns for the sendMessage call.
    """
    # NOTE(review): both mapped values are empty strings as seen here —
    # confirm that emoji were not lost somewhere upstream.
    severity_emoji = {"info": "", "success": ""}.get(severity, "")

    # Telegram rejects texts longer than 4096 characters; keep the body
    # well under that so the header and footer always fit.
    max_message_chars = 3500
    body = message or ""
    if len(body) > max_message_chars:
        body = body[:max_message_chars] + "…"

    text = (
        f"{severity_emoji} <b>{html.escape(title or '')}</b>\n"
        f"━━━━━━━━━━━━━━━━━━━\n"
        f"📋 <code>{html.escape(incident_id or '')}</code>\n"
    )
    if alertname:
        text += f"🔔 告警: <code>{html.escape(alertname)}</code>\n"
    text += (
        f"\n{html.escape(body)}\n"
        f"\n<i>此為純資訊通知,無需操作。</i>"
    )

    # NOTE(review): other methods visible in this class send through
    # self._send_request — confirm that self._make_request exists.
    return await self._make_request(
        "sendMessage",
        {
            "chat_id": settings.OPENCLAW_TG_CHAT_ID,
            "text": text,
            "parse_mode": "HTML",
        },
    )
# =========================================================================
# ADR-071-F: TYPE-4D Config Drift 專屬卡片 (2026-04-11 Claude Sonnet 4.6)
# =========================================================================
async def send_drift_card(
    self,
    incident_id: str,
    approval_id: str,
    resource_name: str,
    diff_summary: str,
    detected_at: str = "",
) -> dict:
    """Send the TYPE-4D Config Drift card with its fixed four buttons.

    Buttons: view diff, adopt change, revert, ignore. A diff of up to 500
    characters is embedded in the message; a longer one is replaced by a
    link to the web UI so the Telegram message does not grow too long.

    Args:
        incident_id: Incident ID; shown in the card and used in the link.
        approval_id: Approval ID used to generate the callback nonces.
        resource_name: Name of the drifted resource (first 50 chars shown).
        diff_summary: Diff text; ``None`` is treated as an empty diff.
        detected_at: Detection time; its line is omitted when empty.

    Returns:
        Whatever ``self._make_request`` returns for the sendMessage call.
    """
    from urllib.parse import quote

    diff_summary = diff_summary or ""

    # Diff length handling (ADR-071, Section 14.9.6)
    if len(diff_summary) <= 500:
        diff_block = f"\n<pre>{html.escape(diff_summary)}</pre>"
    else:
        # The incident ID becomes a URL path segment inside an HTML
        # attribute: percent-encode it for the URL and escape the result
        # for the attribute so it cannot break out of the href.
        web_url = html.escape(
            f"https://aiops.wooo.work/incidents/{quote(str(incident_id), safe='')}/drift-diff",
            quote=True,
        )
        diff_block = f"\n⚠️ 差異過大({len(diff_summary)} 字)\n🔗 <a href='{web_url}'>查看完整 Diff</a>"

    text = (
        f"⚙️ <b>Config Drift 偵測</b>\n"
        f"━━━━━━━━━━━━━━━━━━━\n"
        f"📋 <code>{html.escape(incident_id)}</code>\n"
        f"🎯 資源: <code>{html.escape(resource_name[:50])}</code>\n"
    )
    if detected_at:
        text += f"🕐 偵測時間: {html.escape(detected_at)}\n"
    text += diff_block

    # One nonce per button; "ignore" reuses the "silence" action.
    view_nonce = self._security.generate_callback_nonce(approval_id, "drift_view")
    adopt_nonce = self._security.generate_callback_nonce(approval_id, "drift_adopt")
    revert_nonce = self._security.generate_callback_nonce(approval_id, "drift_revert")
    ignore_nonce = self._security.generate_callback_nonce(approval_id, "silence")

    keyboard = {
        "inline_keyboard": [
            [
                {"text": "🔍 查看 Diff", "callback_data": view_nonce},
                {"text": "✅ 採納變更", "callback_data": adopt_nonce},
            ],
            [
                {"text": "⏪ 回滾", "callback_data": revert_nonce},
                {"text": "🔕 忽略", "callback_data": ignore_nonce},
            ],
        ]
    }

    # NOTE(review): other methods visible in this class send through
    # self._send_request — confirm that self._make_request exists.
    return await self._make_request(
        "sendMessage",
        {
            "chat_id": settings.OPENCLAW_TG_CHAT_ID,
            "text": text,
            "parse_mode": "HTML",
            "reply_markup": keyboard,
        },
    )
# =========================================================================
# 新訊息發送方法 (2026-03-29 ogt: ADR-038)
# =========================================================================
@@ -2111,6 +2350,21 @@ class TelegramGateway:
nonce=nonce,
)
# ===================================================================
# Step 1.8: ADR-071-D 狀態機守衛State Machine Guardrail
# 2026-04-11 Claude Sonnet 4.6 (ADR-071 第一批最高優先)
# 防止已 RESOLVED/CLOSED 的事件卡片被誤點再次執行
# 防止 MITIGATING 中的事件被重複觸發
# ===================================================================
guard_result = await self._check_incident_state_guard(
approval_id=approval_id,
callback_query_id=callback_query_id,
message_id=message_id,
original_text=original_text,
)
if guard_result is not None:
return guard_result
# ===================================================================
# Step 2: 處理自動調優 (Shadow Mode)
# ===================================================================
@@ -2197,6 +2451,51 @@ class TelegramGateway:
"silence_result": silence_result,
}
# ===================================================================
# Step 2.8: ADR-071-H 手動修復記錄 (TYPE-4)
# 2026-04-11 Claude Sonnet 4.6 (ADR-071 第一批)
# 使用者點擊 [手動修復後記錄] → Bot 提示輸入步驟
# 實際步驟收集在 handle_message() 的 /done 流程中完成
# ===================================================================
if action == "log_manual_fix":
await self._answer_callback(
callback_query_id,
"log_manual_fix",
text="📝 請輸入修復步驟,完成後傳送 /done",
)
# 在 Redis 儲存「等待手動修復輸入」狀態
try:
redis = get_redis()
await redis.setex(
f"manual_fix_pending:{user_id}",
1800, # 30 分鐘
approval_id,
)
except Exception as _e:
logger.warning("manual_fix_pending_store_failed", error=str(_e))
await self._send_request(
"sendMessage",
{
"chat_id": settings.OPENCLAW_TG_CHAT_ID,
"text": (
"📝 <b>手動修復記錄</b>\n"
"━━━━━━━━━━━━━━━━━━━\n"
"請輸入您的修復步驟(可多行)。\n"
"輸入完畢後傳送 <code>/done</code>\n\n"
"<i>30 分鐘內有效</i>"
),
"parse_mode": "HTML",
},
)
return {
"action": action,
"approval_id": approval_id,
"user": user,
"success": True,
"waiting_for_manual_fix": True,
}
# ===================================================================
# Step 3: 回應 Callback Query (簽核/拒絕)
# ===================================================================
@@ -2253,6 +2552,217 @@ class TelegramGateway:
)
return {"success": False, "error": str(e)}
async def _check_incident_state_guard(
    self,
    approval_id: str,
    callback_query_id: str,
    message_id: int,
    original_text: str,
) -> dict | None:
    """ADR-071-D state-machine guard for callback buttons.

    Looks up the incident linked to ``approval_id`` and inspects its
    current status:
    - RESOLVED / CLOSED: answer the callback, stamp the card as resolved
      and strip its buttons, then block.
    - MITIGATING: answer the callback with an "in progress" notice and
      block.
    - anything else, or nothing found: return ``None`` so the caller
      continues with its normal flow.

    The guard never raises: any failure is logged and treated as "allow".

    Args:
        approval_id: Approval ID; expected to parse as a UUID.
        callback_query_id: Telegram callback query to answer.
        message_id: ID of the card message to edit.
        original_text: Current card text, kept above the resolved stamp.

    Returns:
        A dict with ``blocked=True`` and a ``reason`` when the action is
        blocked, otherwise ``None``.
    """
    try:
        from uuid import UUID
        from src.services.approval_db import get_approval_service
        from src.repositories.incident_repository import get_incident_repository
        from src.models.incident import IncidentStatus

        approval_svc = get_approval_service()
        try:
            approval = await approval_svc.get_approval_by_id(UUID(approval_id))
        except Exception as lookup_error:
            # Covers a malformed approval_id (ValueError from UUID) as well
            # as lookup failures; the main flow handles the callback.
            logger.debug(
                "state_guard_approval_lookup_failed",
                approval_id=approval_id,
                error=str(lookup_error),
            )
            return None

        if not approval or not approval.incident_id:
            return None  # no linked incident: allow

        incident_repo = get_incident_repository()
        incident = await incident_repo.get_by_id(approval.incident_id)
        if not incident:
            return None

        if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED):
            resolved_at = incident.resolved_at.strftime("%Y-%m-%d %H:%M") if incident.resolved_at else "未知時間"
            await self._answer_callback(
                callback_query_id,
                "blocked",
                text="✅ 此事件已解決",
            )
            try:
                separator = "──────────────"
                safe_original = html.escape(original_text) if original_text else ""
                stamp = f"✅ 此事件已於 {resolved_at} 解決"
                await self._send_request("editMessageText", {
                    "chat_id": self.chat_id,
                    "message_id": message_id,
                    "text": f"{safe_original}\n{separator}\n{stamp}" if safe_original else stamp,
                    "parse_mode": "HTML",
                    "reply_markup": {"inline_keyboard": []},
                    "disable_web_page_preview": True,
                })
            except Exception as edit_error:
                logger.debug(
                    "state_guard_edit_text_failed",
                    approval_id=approval_id,
                    error=str(edit_error),
                )
                # Fallback: at least remove the buttons from the card.
                try:
                    await self._send_request("editMessageReplyMarkup", {
                        "chat_id": self.chat_id,
                        "message_id": message_id,
                        "reply_markup": {"inline_keyboard": []},
                    })
                except Exception as markup_error:
                    # Still blocked below; only the card clean-up failed.
                    logger.debug(
                        "state_guard_remove_buttons_failed",
                        approval_id=approval_id,
                        error=str(markup_error),
                    )
            logger.info(
                "state_guard_blocked_resolved",
                approval_id=approval_id,
                incident_id=approval.incident_id,
                incident_status=incident.status.value,
            )
            return {"blocked": True, "reason": "already_resolved", "approval_id": approval_id}

        if incident.status == IncidentStatus.MITIGATING:
            await self._answer_callback(
                callback_query_id,
                "blocked",
                text="⏳ 正在修復中,請稍候...",
            )
            logger.info(
                "state_guard_blocked_mitigating",
                approval_id=approval_id,
                incident_id=approval.incident_id,
            )
            return {"blocked": True, "reason": "already_executing", "approval_id": approval_id}
    except Exception as e:
        # A guard failure must not block the main flow.
        logger.warning("state_guard_error", approval_id=approval_id, error=str(e))
    return None
async def handle_manual_fix_done(
    self,
    user_id: int,
    username: str,
    fix_steps: str,
) -> dict:
    """ADR-071-H: record the manual fix steps a user submitted with /done.

    Flow:
      1. Read the pending approval_id for this user from Redis.
      2. Load the approval record to obtain its incident_id.
      3. Update incidents.manual_fix_steps and manual_fix_by.
      4. Append a MANUAL_FIX_RECORDED entry to the operation log.
      5. Start KMConversionService.convert() as a background task.
      6. Send a confirmation message to Telegram.

    Args:
        user_id: Telegram user ID; also the fallback recorder name.
        username: Telegram username; may be empty.
        fix_steps: Fix steps entered by the user.

    Returns:
        ``{"success": True, "incident_id": ...}`` on success; otherwise
        ``{"success": False, ...}`` with a ``reason`` or ``error`` key.
        This method does not raise.
    """
    try:
        from src.core.redis_client import get_redis as _get_redis

        redis = _get_redis()
        pending_key = f"manual_fix_pending:{user_id}"
        approval_id_bytes = await redis.get(pending_key)
        if not approval_id_bytes:
            await self._send_request("sendMessage", {
                "chat_id": settings.OPENCLAW_TG_CHAT_ID,
                "text": "⚠️ 找不到待記錄的修復任務,或已逾時。",
                "parse_mode": "HTML",
            })
            return {"success": False, "reason": "no_pending_task"}

        approval_id = approval_id_bytes.decode() if isinstance(approval_id_bytes, bytes) else str(approval_id_bytes)
        # The pending marker is consumed up front, so a second /done is
        # answered with "no pending task" rather than recorded twice.
        await redis.delete(pending_key)

        # Approval record -> incident
        from src.repositories.incident_repository import IncidentDBRepository
        from src.repositories.approval_repository import ApprovalDBRepository

        approval_repo = ApprovalDBRepository()
        approval = await approval_repo.get_by_approval_id(approval_id)
        if not approval:
            await self._send_request("sendMessage", {
                "chat_id": settings.OPENCLAW_TG_CHAT_ID,
                "text": f"⚠️ 找不到簽核單 <code>{html.escape(approval_id)}</code>",
                "parse_mode": "HTML",
            })
            return {"success": False, "reason": "approval_not_found"}

        incident_repo = IncidentDBRepository()
        incident = await incident_repo.get_by_id(approval.incident_id)
        if not incident:
            return {"success": False, "reason": "incident_not_found"}

        recorder = username or str(user_id)

        # Persist the steps and the recorder on the incident row.
        from src.db.base import get_db_context
        from src.db.models import Incident as IncidentORM
        from sqlalchemy import update

        async with get_db_context() as db:
            await db.execute(
                update(IncidentORM)
                .where(IncidentORM.incident_id == approval.incident_id)
                .values(
                    manual_fix_steps=fix_steps,
                    manual_fix_by=recorder,
                )
            )
            await db.commit()

        # Import the KM module before writing the operation log: importing
        # it registers the ADR-071 event types (MANUAL_FIX_RECORDED among
        # them) with the event-type registry used for validation.
        from src.services.km_conversion_service import get_km_conversion_service
        from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository

        # The steps are already stored, so an operation-log failure is only
        # reported and must not abort KM conversion or the confirmation.
        try:
            op_log_repo = get_alert_operation_log_repository()
            await op_log_repo.append(
                event_type="MANUAL_FIX_RECORDED",
                incident_id=approval.incident_id,
                approval_id=approval_id,
                actor=recorder,
                action_detail=(fix_steps or "")[:500],
                success=True,
            )
        except Exception as _log_error:
            logger.warning(
                "manual_fix_op_log_failed",
                incident_id=approval.incident_id,
                error=str(_log_error),
            )

        # Trigger KM conversion on the freshly re-read incident.
        incident_updated = await incident_repo.get_by_id(approval.incident_id)
        if incident_updated:
            import asyncio as _asyncio

            km_svc = get_km_conversion_service()

            def _report_km_result(finished_task) -> None:
                # Surface a failed background conversion in the logs.
                if finished_task.cancelled():
                    return
                failure = finished_task.exception()
                if failure is not None:
                    logger.warning(
                        "manual_fix_km_conversion_failed",
                        incident_id=approval.incident_id,
                        error=str(failure),
                    )

            # The event loop keeps only weak references to tasks, so hold a
            # strong reference until the conversion finishes.
            km_tasks = getattr(self, "_km_conversion_tasks", None)
            if km_tasks is None:
                km_tasks = set()
                self._km_conversion_tasks = km_tasks
            km_task = _asyncio.create_task(km_svc.convert(incident_updated))
            km_tasks.add(km_task)
            km_task.add_done_callback(km_tasks.discard)
            km_task.add_done_callback(_report_km_result)

        # Confirmation; the incident ID is stringified so html.escape also
        # works when the ID is not stored as a plain str.
        await self._send_request("sendMessage", {
            "chat_id": settings.OPENCLAW_TG_CHAT_ID,
            "text": (
                f"✅ <b>手動修復步驟已記錄</b>\n"
                f"━━━━━━━━━━━━━━━━━━━\n"
                f"📋 事件: <code>{html.escape(str(approval.incident_id))}</code>\n"
                f"👤 記錄者: @{html.escape(recorder)}\n\n"
                f"<i>正在建立草稿 Playbook請至 AWOOOI 審核後生效。</i>"
            ),
            "parse_mode": "HTML",
        })
        logger.info(
            "manual_fix_recorded",
            incident_id=approval.incident_id,
            user=username,
        )
        return {"success": True, "incident_id": approval.incident_id}
    except Exception as e:
        logger.error("handle_manual_fix_done_failed", error=str(e))
        return {"success": False, "error": str(e)}
async def _handle_auto_tuning(
self,
approval_id: str,