diff --git a/apps/api/src/services/incident_service.py b/apps/api/src/services/incident_service.py
index 4c06e474..ee7af0ba 100644
--- a/apps/api/src/services/incident_service.py
+++ b/apps/api/src/services/incident_service.py
@@ -138,11 +138,28 @@ def classify_alert_early(alertname: str, severity: str, labels: dict | None = No
):
return "alertchain_health", "TYPE-8M"
- # 3. 飛輪/AI 系統健康(優先於 severity 判斷)
- if alertname in ("AutoRepairLowSuccessRate", "PermanentFixRequired") or alertname.startswith("Flywheel"):
+ # 3. 資安告警(高優先,防止被 severity/prefix 規則覆蓋)
+ # ADR-075 TYPE-5S (2026-04-12 ogt)
+ if any(alertname.startswith(p) for p in (
+ "UnauthorizedSSH", "KubeAudit", "CVECritical", "WAFAttack",
+ "PodAbnormal", "SecurityBreach",
+ )):
+ return "secops", "TYPE-5S"
+
+ # 4. 飛輪/AI 系統健康(優先於 severity 判斷)
+ if alertname in ("AutoRepairLowSuccessRate", "PermanentFixRequired") or any(
+ alertname.startswith(p) for p in ("Flywheel", "MCPProvider", "OllamaDown", "NemotronDown")
+ ):
return "flywheel_health", "TYPE-8M"
- # 4. 純資訊
+ # 4a. 業務/FinOps 告警(ADR-075 TYPE-6B)
+ if any(alertname.startswith(p) for p in (
+ "AITokenCost", "GeminiAPIError", "SLOBurn", "APIErrorBudget",
+ "MomoScraper", "ScraperSuccess",
+ )):
+ return "business", "TYPE-6B"
+
+ # 5. 純資訊
if severity in ("info", "none"):
return "info", "TYPE-1"
@@ -185,8 +202,10 @@ def classify_alert_early(alertname: str, severity: str, labels: dict | None = No
return "external_site", "TYPE-3"
# 13. SSL 憑證(ADR-075 修正:從 general 分離)
+ # ≥14 天→TYPE-1(提醒,無需審核);<14 天→TYPE-3(緊急審核)
if alertname.startswith(("ExternalSiteSSL", "TLSCert")):
- return "ssl_cert", "TYPE-3"
+ days = int((labels or {}).get("days_remaining", 0)) if labels else 0
+ return "ssl_cert", ("TYPE-1" if days >= 14 else "TYPE-3")
return "general", "TYPE-3"
diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py
index f8421417..94a52a7f 100644
--- a/apps/api/src/services/telegram_gateway.py
+++ b/apps/api/src/services/telegram_gateway.py
@@ -1391,7 +1391,24 @@ class TelegramGateway:
("🔍 查健康狀態", f"action:check_health:{incident_id}"),
("📋 查 Log", f"action:check_log:{incident_id}"),
],
- # alertchain_health / flywheel_health → TYPE-8M → send_meta_alert,不走此字典
+ # ADR-075 新增分類按鈕 (2026-04-12 ogt)
+ "secops": [
+ ("🚫 隔離資源", f"secops_isolate:{incident_id}"),
+ ("⛔ 封鎖來源 IP", f"secops_block_ip:{incident_id}"),
+ ("🔄 強制驅逐", f"secops_evict:{incident_id}"),
+ ("✅ 確認授權", f"secops_authorize:{incident_id}"),
+ ],
+ "business": [
+ ("⏸️ 暫停 1h", f"action:pause_1h:{incident_id}"),
+ ("🔍 查 SignOz", f"action:open_signoz:{incident_id}"),
+ ("❌ 忽略", f"action:ignore:{incident_id}"),
+ ],
+ "flywheel_health": [
+ ("🔄 觸發診斷", f"flywheel_diagnose:{incident_id}"),
+ ("📊 飛輪面板", f"action:open_flywheel:{incident_id}"),
+ ("🔕 靜默", f"action:silence:{incident_id}"),
+ ],
+ # alertchain_health → TYPE-8M → send_meta_alert,不走此字典
"ai_system": [
("🔀 切換 Provider", f"action:switch_provider:{incident_id}"),
],
@@ -1924,6 +1941,192 @@ class TelegramGateway:
},
)
+ async def send_secops_card(
+ self,
+ incident_id: str,
+ approval_id: str,
+ alertname: str,
+ threat_level: str,
+ source: str = "",
+ threat_behavior: str = "",
+ defense_action: str = "",
+ resource: str = "",
+ ) -> dict:
+ """
+ TYPE-5S SecOps 資安威脅告警卡片。
+
+ ADR-075 (2026-04-12 ogt)
+ 按鈕: [隔離資源] [封鎖來源IP] [強制驅逐] [確認授權]
+ 只發個人 DM(指令敏感,不發群組)。
+ """
+ level_icon = {"critical": "🔴", "warning": "🟠"}.get(threat_level.lower(), "⚠️")
+
+ text = (
+ f"🥷 SECOPS | {level_icon} 資安威脅\n"
+ f"━━━━━━━━━━━━━━━━━━━\n"
+ f"📋 {html.escape(incident_id)}\n"
+ f"🚨 威脅類型:{html.escape(alertname)}\n"
+ )
+ if resource:
+ text += f"🎯 受害資源:{html.escape(resource)}\n"
+ text += "\n🧠 AI 威脅分析\n"
+ if source:
+ text += f"├─ 來源:{html.escape(source)}\n"
+ if threat_behavior:
+ text += f"├─ 異常行為:{html.escape(threat_behavior[:200])}\n"
+ text += f"└─ 風險評估:{html.escape(threat_level)}\n"
+ if defense_action:
+ text += f"\n🛡️ 建議防禦動作\n{html.escape(defense_action[:200])}\n"
+
+ isolate_nonce = self._security.generate_callback_nonce(approval_id, "secops_isolate")
+ auth_nonce = self._security.generate_callback_nonce(approval_id, "secops_authorize")
+
+ keyboard = {
+ "inline_keyboard": [
+ [
+ {"text": "🚫 隔離資源", "callback_data": isolate_nonce},
+ {"text": "⛔ 封鎖來源 IP", "callback_data": f"secops_block_ip:{incident_id}"},
+ ],
+ [
+ {"text": "🔄 強制驅逐 Pod", "callback_data": f"secops_evict:{incident_id}"},
+ {"text": "✅ 確認授權操作", "callback_data": auth_nonce},
+ ],
+ ]
+ }
+
+ return await self._make_request(
+ "sendMessage",
+ {
+ "chat_id": settings.OPENCLAW_TG_CHAT_ID,
+ "text": text,
+ "parse_mode": "HTML",
+ "reply_markup": keyboard,
+ },
+ )
+
+ async def send_business_alert(
+ self,
+ incident_id: str,
+ alertname: str,
+ business_domain: str,
+ metric_name: str,
+ current_value: str,
+ threshold: str,
+ loss_rate: str = "",
+ group_chat_id: str | None = None,
+ ) -> dict:
+ """
+ TYPE-6B 業務/FinOps 告警。
+
+ ADR-075 (2026-04-12 ogt)
+ 路由: TYPE-1 發群組,此為 TYPE-6B 也發群組(業務趨勢數字)
+ 按鈕: [暫停] [查 SignOz] [忽略]
+ """
+ text = (
+ f"📉 SLO ALERT | 業務指標異常\n"
+ f"━━━━━━━━━━━━━━━━━━━\n"
+ f"📋 {html.escape(incident_id)}\n"
+ f"🚨 告警:{html.escape(alertname)}\n"
+ f"🎯 影響業務:{html.escape(business_domain)}\n"
+ f"📊 異常指標:{html.escape(metric_name)}\n"
+ f"\n🧠 業務衝擊分析\n"
+ f"├─ 當前狀態:{html.escape(current_value)} (閾值: {html.escape(threshold)})\n"
+ )
+ if loss_rate:
+ text += f"└─ 損失速率:{html.escape(loss_rate)}\n"
+
+ keyboard = {
+ "inline_keyboard": [
+ [
+ {"text": "⏸️ 暫停 1h", "callback_data": f"action:pause_1h:{incident_id}"},
+ {"text": "❌ 忽略", "callback_data": f"action:ignore:{incident_id}"},
+ ],
+ ]
+ }
+
+ target_chat = group_chat_id or settings.OPENCLAW_TG_CHAT_ID
+ return await self._make_request(
+ "sendMessage",
+ {
+ "chat_id": target_chat,
+ "text": text,
+ "parse_mode": "HTML",
+ "reply_markup": keyboard,
+ },
+ )
+
+ async def send_escalation_card(
+ self,
+ incident_id: str,
+ original_alertname: str,
+ duration_min: int,
+ priority: int = 0,
+ attempted_actions: str = "",
+ failure_reason: str = "",
+ current_impact: str = "",
+ group_chat_id: str | None = None,
+ ) -> dict:
+ """
+ TYPE-7E 重大事故升級通知。
+
+ ADR-075 (2026-04-12 ogt)
+ 觸發: SLA 超時(P0: 15分鐘; P1: 45分鐘)
+ 路由: 個人 DM + 群組(緊急事故全員知情)
+ 按鈕: [建立戰情室] [Postmortem草稿] [DR手冊] [確認接手]
+ """
+ duration_str = f"{duration_min} 分鐘" if duration_min < 60 else f"{duration_min//60} 小時 {duration_min%60} 分"
+
+ text = (
+ f"🚨 ESCALATION | P{priority} 事故升級\n"
+ f"━━━━━━━━━━━━━━━━━━━\n"
+ f"📋 {html.escape(incident_id)} | 已持續 {duration_str}\n"
+ f"⚠️ 超出自動修復能力範圍\n"
+ f"🎯 核心問題:{html.escape(original_alertname)}\n"
+ )
+ if attempted_actions or failure_reason or current_impact:
+ text += "\n🧠 AI 戰局總結\n"
+ if attempted_actions:
+ text += f"├─ 嘗試動作:{html.escape(attempted_actions[:100])}\n"
+ if failure_reason:
+ text += f"├─ 失敗原因:{html.escape(failure_reason[:100])}\n"
+ if current_impact:
+ text += f"└─ 目前影響:{html.escape(current_impact[:100])}\n"
+
+ keyboard = {
+ "inline_keyboard": [
+ [
+ {"text": "📄 產生 Postmortem 草稿", "callback_data": f"action:postmortem:{incident_id}"},
+ ],
+ [
+ {"text": "✅ 確認已接手處理", "callback_data": f"action:escalation_ack:{incident_id}"},
+ {"text": "📖 DR 手冊", "callback_data": f"action:dr_manual:{incident_id}"},
+ ],
+ ]
+ }
+
+ results = []
+ # 發個人 DM
+ results.append(await self._make_request(
+ "sendMessage",
+ {
+ "chat_id": settings.OPENCLAW_TG_CHAT_ID,
+ "text": text,
+ "parse_mode": "HTML",
+ "reply_markup": keyboard,
+ },
+ ))
+ # 發群組(若有)
+ if group_chat_id:
+ results.append(await self._make_request(
+ "sendMessage",
+ {
+ "chat_id": group_chat_id,
+ "text": text + "\n📣 @所有人 事故升級,請協助!",
+ "parse_mode": "HTML",
+ },
+ ))
+ return results[0]
+
# =========================================================================
# 新訊息發送方法 (2026-03-29 ogt: ADR-038)
# =========================================================================