Files
awoooi/apps/api/src/services/decision_manager.py
Your Name 7f4088bcd0 fix(aiops-p0): 六大病根 P0 全面修復(ADR-092 B4)
【P0.1】knowledge_extractor_service.py:210 — AttributeError 修復
- Signal.description 欄位不存在(100% 失敗,KM 每天+5 根因)
- 改用 alert_name + annotations.summary 拼接文字

【P0.2+P0.3】Gate 9+11 唯讀指令鬆綁
- blast_radius_calculator: kubectl get/top/describe/logs/version → score=1(非 50)
- operation_parser: 增加 INVESTIGATE 類型識別(唯讀 kubectl 不回 None)
- executor.py: OperationType 新增 INVESTIGATE enum
- approval_execution.py: INVESTIGATE 路徑直接呼叫 execute_kubectl_command

【P0.4】MCP SSH/K8s Provider 修復
- decision_manager: params= → parameters=(符合 MCPToolProvider.execute 簽名)
- decision_manager: MCPToolResult .get() → .success/.output(dataclass 用法)
- decision_manager + ssh_provider: 補入 hosts 120/121(原 default 缺失)
- auto_approve: phase2_agent_debate source bypass confidence 閾值

【P0.5】告警規則語義矛盾修復
- alert_rules.yaml: 8 條 kubectl 查詢規則 RESTART_DEPLOYMENT → NO_ACTION
  (CrashLoopBackOff/PostgreSQL 連線/慢查詢/MinIO 磁碟/K3s 節點/告警鏈路/SSL/CoreDNS 等)
- incident_service.py: cAdvisor/CoreDNS 從 general 拆出獨立分類

【P0.6】proactive_inspector 動態基線 PromQL 全修
- 5 個 MONITORED_METRICS PromQL 全部修正(cadvisor label/datname/blackbox)
- db_connection_pool: datname="awoooi" → "awoooi_prod"
- http_error_rate: 無效 http_requests_total → blackbox probe_success
- cpu/memory: namespace label → name=~"k8s_api_awoooi-api.*"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 15:32:23 +08:00

3033 lines
140 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Decision Manager - Phase 6.5 非同步決策狀態機
# 2026-04-10 Claude Sonnet 4.6: auto_execute ApprovalRequest 修復 + placeholder 替換
=============================================
實作「雙軌決策」(Dual-Engine Decision):
1. OpenClaw LLM (主要) - 智能提案
2. Expert System (備援) - 規則引擎
狀態機:
- INIT: 事件剛建立
- ANALYZING: 正在分析中 (LLM + Expert 並行)
- READY: 決策就緒,等待統帥親核
- EXECUTING: 已授權,正在執行
- COMPLETED: 執行完成
統帥鐵律:
- 永遠不能讓 UI 鎖死
- 30 秒內必須有 decision_token
- LLM 失敗時 Expert System 保底
"""
import asyncio
import json
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Protocol, runtime_checkable
from uuid import uuid4
import structlog
from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.auto_approve import get_auto_approve_policy
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
logger = structlog.get_logger(__name__)
# Phase 7.5: Playbook 優先閾值
PLAYBOOK_SIMILARITY_THRESHOLD = 0.85 # 相似度 >= 85% 直接使用 Playbook
# P1 fix 2026-04-11: background task GC guard — keep strong refs until done.
# asyncio only keeps weak references to tasks, so an un-referenced task may be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _on_background_task_done(task: asyncio.Task) -> None:
    """Release the strong reference and surface a failure of a background task.

    Without retrieving the exception here, a failing fire-and-forget coroutine
    would only show up as an "exception was never retrieved" message when the
    task object is finally collected.
    """
    _background_tasks.discard(task)
    if task.cancelled():
        # Task.exception() raises CancelledError on a cancelled task.
        return
    exc = task.exception()
    if exc is not None:
        logger.warning(
            "background_task_failed",
            task=task.get_name(),
            error=str(exc),
        )


def _fire_and_forget(coro) -> asyncio.Task:
    """Schedule ``coro`` as a background task protected from garbage collection.

    Args:
        coro: The coroutine object to run on the current event loop.

    Returns:
        The created task. It stays in ``_background_tasks`` until it completes,
        at which point it is removed and any exception it raised is logged.
    """
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)
    return task
# P1 fix 2026-04-11: kubectl action dangerous char whitelist
import re as _re_module
# Whitelist for kubectl action strings. Because the pattern is anchored with
# ^...$, a command matches only when it has exactly this shape:
#   kubectl <verb> <one argument> [(-n|--namespace) <namespace>]
# where <verb> is one of: rollout restart, rollout undo, scale, delete pod,
# get, describe, logs (two-word verbs need exactly one space between words).
# The argument is limited to [a-zA-Z0-9_./-] and the namespace to
# [a-zA-Z0-9_-], so characters such as ; | & $ ` = or extra arguments do not match.
_ALLOWED_KUBECTL_PATTERN = _re_module.compile(
r"^kubectl\s+(rollout restart|rollout undo|scale|delete pod|get|describe|logs)"
r"\s+[a-zA-Z0-9_./-]+(\s+(-n|--namespace)\s+[a-zA-Z0-9_-]+)?$"
)
# =============================================================================
# Phase 31 (ADR-067 2026-04-10): Log 異常摘要 — NemoTron deepseek-r1:14b
# =============================================================================
async def _send_log_summary(incident: "Incident") -> None:
    """Fetch a log anomaly summary for the incident's first service and post it.

    The summary is requested from the log summary service for the first entry
    of ``incident.affected_services`` and, when non-empty, sent through the
    Telegram gateway's ``send_as_nemotron``. Any failure is logged as a
    warning and never propagated.
    """
    try:
        services = incident.affected_services
        target = services[0] if services else None
        if not target:
            return

        # Namespace comes from the first signal's labels when available.
        namespace = "awoooi-prod"
        if incident.signals:
            namespace = incident.signals[0].labels.get("namespace", "awoooi-prod")

        from src.services.log_summary_service import get_log_summary_service

        summary = await get_log_summary_service().summarize_with_soft_timeout(
            pod_name=target,
            namespace=namespace,
        )
        if not summary:
            return

        from src.services.telegram_gateway import get_telegram_gateway

        message = f"📋 <b>Log 異常摘要</b> — <code>{target}</code>\n{summary}"
        await get_telegram_gateway().send_as_nemotron(message)
        logger.info("log_summary_sent", target=target, incident_id=incident.incident_id)
    except Exception as exc:
        logger.warning("log_summary_failed", error=str(exc))
# =============================================================================
# Telegram 推送 (Phase 6.5: 決策就緒通知)
# =============================================================================
async def _push_decision_to_telegram(
    incident: Incident,
    proposal_data: dict[str, Any],
) -> None:
    """Push a ready decision for ``incident`` to Telegram (best effort).

    Flow, in order:
      1. Skip when a dedup key for the incident exists in Redis, when the
         incident is resolved/closed, when the target resource is silenced,
         or when no bot token is configured.
      2. Normalise the proposal fields; if ``action`` looks like an enum name
         rather than a command, try to resolve a kubectl command through the
         alert rule engine.
      3. Overwrite the approval record's action with the agent's action.
      4. Suppress the broadcast when the analysis produced no usable output.
      5. Pick the card type and send the matching Telegram card.
      6. Store the Telegram message id (Redis + DB), schedule the log summary
         and optional advisory note, then set the 10-minute dedup key.

    Args:
        incident: The incident the decision belongs to.
        proposal_data: Decision payload; all keys are optional and read with
            defaults (``action``, ``description``, ``reasoning``,
            ``risk_level``, ``confidence``, ``blocked_reason``, ...).

    Returns:
        None. Every exception is caught and logged as a warning so that a
        Telegram failure never affects the caller.
    """
    try:
        # Imported lazily to avoid circular imports.
        from src.core.redis_client import get_redis
        from src.services.telegram_gateway import (
            classify_notification,
            get_telegram_gateway,
            NotificationType,
            _smart_truncate as _smt,
        )

        # Dedup: one push per incident within the dedup TTL.
        redis = get_redis()
        dedup_key = f"telegram_sent:{incident.incident_id}"
        if await redis.exists(dedup_key):
            logger.debug(
                "telegram_push_skipped",
                reason="Already sent within 10 minutes",
                incident_id=incident.incident_id,
            )
            return

        # 2026-04-09: never re-send for an incident that is already resolved
        # (the dedup key may have expired by then). Both str(status) and
        # status.value are checked so an Enum-typed status is recognised too.
        _status = incident.status
        _status_forms = (
            {str(_status).lower(), str(getattr(_status, "value", "")).lower()}
            if _status
            else set()
        )
        if _status_forms & {"resolved", "closed"}:
            logger.info(
                "telegram_push_skipped",
                reason="Incident already resolved",
                incident_id=incident.incident_id,
            )
            return

        # Silence check (2026-03-27): skip resources that have been silenced.
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        silence_key = f"telegram_silence:{target}"
        if await redis.exists(silence_key):
            logger.info(
                "telegram_push_silenced",
                reason="Resource is silenced",
                incident_id=incident.incident_id,
                resource=target,
            )
            return

        if not settings.OPENCLAW_TG_BOT_TOKEN:
            logger.debug(
                "telegram_push_skipped",
                reason="Bot token not configured",
                incident_id=incident.incident_id,
            )
            return

        gateway = get_telegram_gateway()

        import html as _html
        import re as _re

        def _strip_placeholders(s: str) -> str:
            """Remove ``<placeholder>`` tokens that would break Telegram HTML parsing."""
            return _re.sub(r'<[^>]+>', '', s).strip()

        # Section prefix -> result key for the coordinator's debate summary.
        _SECTION_PREFIXES = (
            ("診斷:", "diagnosis"),
            ("方案:", "plan"),
            ("安全審查:", "review"),
            ("質疑:", "critic"),
        )

        def _parse_debate_summary(reasoning: str) -> dict[str, str]:
            """Split a debate summary into diagnosis / plan / review / critic.

            Expected format: sections separated by a full-width semicolon,
            each starting with one of the prefixes in ``_SECTION_PREFIXES``.
            Sections without a known prefix are ignored; missing sections
            stay as empty strings.
            """
            result: dict[str, str] = {"diagnosis": "", "plan": "", "review": "", "critic": ""}
            # Fix: the separator must be non-empty (str.split("") raises
            # ValueError); the documented separator is ";".
            for part in reasoning.split(";"):
                part = part.strip()
                for prefix, field in _SECTION_PREFIXES:
                    if part.startswith(prefix):
                        result[field] = part[len(prefix):]
                        break
            return result

        risk_level = proposal_data.get("risk_level", "medium")
        # 2026-04-09: the action is deliberately NOT passed through
        # _strip_placeholders, which could cut a deployment name.
        # "or ''" guards against an explicit None stored in the payload.
        action = proposal_data.get("action", proposal_data.get("kubectl_command", "")) or ""

        # 2026-04-09: older payloads stored an enum name (e.g. RESTART_DEPLOYMENT)
        # instead of a command. If none of the command markers is present, ask
        # the rule engine for a kubectl command.
        _KUBECTL_MARKERS = ("kubectl", "ssh", "docker", "systemctl", "/")
        if action and not any(m in action for m in _KUBECTL_MARKERS):
            try:
                from src.services.alert_rule_engine import match_rule as _match_rule
                _labels = incident.signals[0].labels if incident.signals else {}
                # 2026-04-12: Incident has no title field; use the signal's
                # summary/description annotation, falling back to alert_name.
                _sig_msg = (
                    incident.signals[0].annotations.get("summary", "") or
                    incident.signals[0].annotations.get("description", "") or
                    incident.signals[0].alert_name
                ) if incident.signals else ""
                _rule_resp = _match_rule({
                    "labels": _labels,
                    "alert_type": _labels.get("alertname", target),
                    "message": _sig_msg,
                    "target_resource": target,
                    "namespace": incident.signals[0].labels.get("namespace", "awoooi-prod") if incident.signals else "awoooi-prod",
                    "severity": risk_level,
                })
                if _rule_resp and (_rule_resp.get("kubectl_command") or "").strip():
                    action = _rule_resp["kubectl_command"]
            except Exception as _rule_err:
                # A rule-engine failure must not block the notification; the
                # original action is kept, but the failure is now recorded.
                logger.debug(
                    "telegram_push_rule_engine_lookup_failed",
                    incident_id=incident.incident_id,
                    error=str(_rule_err),
                )

        description = proposal_data.get("description", "") or ""
        reasoning = _strip_placeholders(proposal_data.get("reasoning", "") or "")
        confidence = proposal_data.get("confidence", 0.0)  # 0.0 means "not analysed by AI"
        source = proposal_data.get("source", "unknown")
        ai_provider = proposal_data.get("provider", "")  # 2026-03-29: AI provider
        ai_model = proposal_data.get("model", "")  # 2026-04-04: underlying model name
        # 2026-04-02: Phase 22 Nemotron collaboration data
        nemotron_enabled = proposal_data.get("nemotron_enabled", False)
        nemotron_tools = proposal_data.get("nemotron_tools")
        nemotron_validation = proposal_data.get("nemotron_validation", "")
        nemotron_latency_ms = proposal_data.get("nemotron_latency_ms", 0.0)
        # 2026-04-09: tool-calling model / backend
        nemotron_tool_model = proposal_data.get("nemotron_tool_model", "")
        nemotron_tool_backend = proposal_data.get("nemotron_tool_backend", "")
        # 2026-04-16 (ADR-076): matched playbook name shown on the card
        _playbook_name = proposal_data.get("playbook_name", "")

        # 2026-04-16: overwrite the action stored on the approval record with
        # the agent's action, so that approving the agent card does not run a
        # stale action written earlier for the same incident_id. An empty
        # value never overwrites.
        _agent_action = action
        if not _agent_action and description:
            # No action: store a description excerpt so the record shows the
            # agent's advice.
            _agent_action = f"NO_ACTION - {description[:120]}"
        if _agent_action:
            try:
                from src.services.approval_db import get_approval_service as _get_approval_svc
                await _get_approval_svc().update_action_by_incident_id(
                    incident_id=incident.incident_id,
                    new_action=_agent_action,
                )
            except Exception as _update_err:
                logger.warning(
                    "approval_action_update_by_agent_failed",
                    incident_id=incident.incident_id,
                    error=str(_update_err),
                )

        # ADR-091 (2026-04-17): never broadcast a failed analysis. When the
        # description is the "pending analysis" marker and the action is one of
        # the failure tokens, exit silently; the DB record was already updated
        # above. The action is compared against a token set because the
        # fallback action text is a non-empty string, so an emptiness test
        # alone would let these cards through.
        _action_text = action.strip()
        _FAILED_ACTION_TOKENS = {"", "待分析", "NO_ACTION", "待分析 - 系統自動保護"}
        if description.strip() == "待分析" and _action_text in _FAILED_ACTION_TOKENS:
            logger.info(
                "telegram_push_suppressed_no_analysis",
                incident_id=incident.incident_id,
                action_text=_action_text,
                reason="Agent Debate 無有效輸出 (description=待分析, action 屬於失敗狀態集合)",
            )
            return

        # The incident id doubles as the approval id (2026-03-27: avoids
        # stacking the INC- prefix).
        approval_id = incident.incident_id

        # ADR-071: classify the notification to choose the card type.
        _auto_executed = proposal_data.get("auto_executed", False)
        _decision_state = proposal_data.get("decision_state", "")
        _mcp_all_failed = proposal_data.get("mcp_all_failed", False)
        _notif_type = classify_notification(
            incident=incident,
            confidence=confidence,
            auto_executed=_auto_executed,
            mcp_all_failed=_mcp_all_failed,
            decision_state=_decision_state,
        )
        # ADR-075 (2026-04-12): values shared by all branches below.
        _alert_category = getattr(incident, "alert_category", "") or ""
        _notification_type = getattr(incident, "notification_type", "") or (_notif_type.value if _notif_type else "")
        _alertname = incident.signals[0].labels.get("alertname", "MetaSystemAlert") if incident.signals else "MetaSystemAlert"

        # blocked_reason is set upstream; this layer only translates it:
        #   INVALID_TARGET            -> TYPE-4
        #   NO_ACTION + critical      -> TYPE-4
        #   NO_ACTION + non-critical  -> TYPE-1 (information only)
        _blocked_reason = proposal_data.get("blocked_reason", "") or ""
        if "INVALID_TARGET" in _blocked_reason:
            _notif_type = NotificationType.TYPE_4
        elif "NO_ACTION" in _blocked_reason:
            if risk_level == "critical":
                _notif_type = NotificationType.TYPE_4
            else:
                _notif_type = NotificationType.TYPE_1

        # 2026-04-12: an explicit notification type on the incident takes
        # precedence over the classifier result.
        if _notification_type == "TYPE-1":
            _notif_type = NotificationType.TYPE_1
        elif _notification_type == "TYPE-4D":
            _notif_type = NotificationType.TYPE_4_DRIFT
        elif _notification_type == "TYPE-8M":
            _notif_type = NotificationType.TYPE_8M

        if _notif_type == NotificationType.TYPE_1:
            # Information-only notification (no buttons). Only the diagnosis
            # section of the debate summary is shown, not the whole summary.
            _info_title = (
                incident.signals[0].labels.get("alertname", "") or
                incident.signals[0].alert_name
            ) if incident.signals else "告警通知"
            _parsed_info = _parse_debate_summary(reasoning)
            _info_msg = _smt(_parsed_info.get("diagnosis") or description, 200)
            tg_result = await gateway.send_info_notification(
                incident_id=incident.incident_id,
                title=_info_title or "告警通知",
                message=_info_msg,
                alertname=incident.signals[0].labels.get("alertname", "") if incident.signals else "",
                severity="info",
            )
        elif _notif_type == NotificationType.TYPE_4_DRIFT:
            # Config-drift card. The description may be a JSON object with
            # action_title / description / rollback; when it parses, render it
            # as readable lines, otherwise fall back to the truncated raw text.
            try:
                _drift_data = json.loads(description)
                _parts: list[str] = []
                if _drift_data.get("action_title"):
                    _parts.append(f"📝 建議操作:{_drift_data['action_title']}")
                if _drift_data.get("description"):
                    _parts.append(f"📖 說明:{_smt(_drift_data['description'], 200)}")
                if _drift_data.get("rollback"):
                    _parts.append(f"⏪ 回滾方案:{_smt(_drift_data['rollback'], 100)}")
                _diff_text = "\n".join(_parts) if _parts else description[:500]
            except (json.JSONDecodeError, TypeError, AttributeError):
                _diff_text = description[:500]
            tg_result = await gateway.send_drift_card(
                incident_id=incident.incident_id,
                approval_id=approval_id,
                resource_name=target[:50],
                diff_summary=_diff_text,
            )
        elif _notif_type == NotificationType.TYPE_8M or _alert_category in ("alertchain_health", "flywheel_health"):
            # TYPE-8M meta alert. Each card field is fed from a different
            # section of the debate summary so the fields do not repeat the
            # same text.
            _parsed = _parse_debate_summary(reasoning)
            _diag = _smt(_parsed.get("diagnosis") or description, 120) if (_parsed.get("diagnosis") or description) else "(無診斷)"
            _impact = _smt(_parsed.get("plan") or "", 150)
            _cause = _smt(_parsed.get("critic") or _parsed.get("review") or "", 100)
            tg_result = await gateway.send_meta_alert(
                incident_id=incident.incident_id,
                approval_id=approval_id,
                alertname=_alertname,
                alert_category=_alert_category,
                diagnosis=_diag,
                severity_level=risk_level,
                system_impact=_impact,
                probable_cause=_cause,
            )
        elif _alert_category == "secops":
            # TYPE-5S security event card (ADR-075 Step-5).
            _labels = incident.signals[0].labels if incident.signals else {}
            _threat_level = _labels.get("threat_level", risk_level)
            tg_result = await gateway.send_secops_card(
                incident_id=incident.incident_id,
                approval_id=approval_id,
                alertname=_alertname,
                threat_level=_threat_level,
                resource=target[:60],
                threat_behavior=_smt(_parse_debate_summary(reasoning).get("diagnosis") or reasoning, 150) if reasoning else description[:150],
            )
        elif _alert_category == "business":
            # TYPE-6B business / FinOps information alert (ADR-075 Step-5).
            _labels = incident.signals[0].labels if incident.signals else {}
            _business_domain = _labels.get("business_domain", "finops")
            tg_result = await gateway.send_business_alert(
                incident_id=incident.incident_id,
                alertname=_alertname,
                business_domain=_business_domain,
                metric_name=_labels.get("metric_name", _alertname),
                current_value=_labels.get("value", "--"),
                threshold=_labels.get("threshold", "--"),
            )
        else:
            # TYPE-2 / TYPE-3 / TYPE-4 all use the approval card. Only the
            # diagnosis section is shown as root cause.
            _parsed_card = _parse_debate_summary(reasoning)
            _card_root_cause = _smt(_parsed_card.get("diagnosis") or description, 300)
            tg_result = await gateway.send_approval_card(
                approval_id=approval_id,
                risk_level=risk_level,
                resource_name=target[:50],
                root_cause=_card_root_cause,
                # An empty action shows the fixed "pending analysis" marker;
                # the description must never leak into suggested_action.
                suggested_action=action[:120] if action else "待分析",
                estimated_downtime="5-15 min",
                primary_responsibility="INFRA",
                confidence=confidence,
                namespace=incident.signals[0].labels.get("namespace", "default") if incident.signals else "default",
                ai_provider=ai_provider,
                ai_model=ai_model,
                nemotron_enabled=nemotron_enabled,
                nemotron_tools=nemotron_tools,
                nemotron_validation=nemotron_validation,
                nemotron_latency_ms=nemotron_latency_ms,
                nemotron_tool_model=nemotron_tool_model,
                nemotron_tool_backend=nemotron_tool_backend,
                incident_id=incident.incident_id,
                alert_category=_alert_category,
                notification_type=_notification_type,
                playbook_name=_playbook_name,
            )

        # 2026-04-09: remember the message id so later status updates can
        # continue on the original message. Stored in Redis (24h) and in the DB.
        _tg_payload = tg_result.get("result") if isinstance(tg_result, dict) else None
        if not isinstance(_tg_payload, dict):
            # Tolerates a missing, None or non-dict "result" value.
            _tg_payload = {}
        tg_message_id = _tg_payload.get("message_id")
        tg_chat_id = (_tg_payload.get("chat") or {}).get("id")
        if tg_message_id:
            await redis.setex(f"tg_msg:{incident.incident_id}", 86400, str(tg_message_id))
            try:
                from src.services.approval_db import get_approval_service as _get_approval_svc
                _approval_svc = _get_approval_svc()
                await _approval_svc.update_telegram_message(
                    incident_id=incident.incident_id,
                    telegram_message_id=tg_message_id,
                    telegram_chat_id=tg_chat_id,
                )
            except Exception as _e:
                logger.warning("telegram_message_id_db_save_failed", incident_id=incident.incident_id, error=str(_e))

        # Phase 31 (ADR-067 2026-04-10): log anomaly summary, scheduled in the
        # background so it does not block this flow.
        _fire_and_forget(_send_log_summary(incident))

        # MCP Phase 4a (2026-04-11): optional second opinion, sent as a
        # follow-up message. The note is plain text, so it is HTML-escaped
        # before being embedded in the HTML-formatted message.
        _advisory_note = proposal_data.get("advisory_note", "")
        if _advisory_note:
            _fire_and_forget(
                gateway.send_as_nemotron(
                    f"🤔 <b>NemoClaw 第二意見</b> (信心={confidence:.2f})\n"
                    f"<i>{_html.escape(str(_advisory_note))}</i>"
                )
            )

        # Set the dedup key only after sending (TTL 10 minutes).
        await redis.setex(dedup_key, 600, "1")
        logger.info(
            "telegram_decision_pushed",
            incident_id=incident.incident_id,
            source=source,
            risk_level=risk_level,
        )
    except Exception as e:
        # A Telegram failure must not affect the main flow.
        logger.warning(
            "telegram_decision_push_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
async def _nemoclaw_second_opinion(incident: "Incident", primary_result: dict) -> str | None:
    """Ask a second model for an independent, text-only opinion on the incident.

    Builds a prompt from the first signal's labels, the affected services and
    the primary proposal, posts it to ``{OLLAMA_URL}/api/generate`` (30 s
    timeout, non-streaming) and returns the model's answer.

    Args:
        incident: Incident under analysis.
        primary_result: Primary proposal; ``action``, ``confidence`` and
            ``reasoning`` are read from it.

    Returns:
        The advisory text (anything up to and including ``</think>`` removed,
        at most 300 characters), or ``None`` when the answer is empty or any
        error occurs. Nothing is executed by this function.
    """
    try:
        import httpx as _httpx
        from src.services.model_registry import get_model as _get_model

        base_url = getattr(settings, "OLLAMA_URL", "http://192.168.0.111:11434")
        # D1 2026-04-11: model name is resolved from the model registry.
        model_name = _get_model("ollama", "nemoclaw")

        signals_summary = ""
        if incident.signals:
            first_labels = incident.signals[0].labels
            signals_summary = (
                f"alertname={first_labels.get('alertname','?')} "
                f"severity={first_labels.get('severity','?')} "
                f"instance={first_labels.get('instance','?')}"
            )

        prompt = (
            f"你是資深 SRE請對以下告警做獨立分析並提出建議。\n"
            f"告警摘要: {signals_summary}\n"
            f"受影響服務: {', '.join(incident.affected_services or [])}\n"
            f"主 AI 已提出: {primary_result.get('action','?')} "
            f"(信心={primary_result.get('confidence',0):.2f})\n"
            f"主 AI 根因: {primary_result.get('reasoning','')[:200]}\n\n"
            f"請用繁體中文100 字以內,給出你的獨立判斷與建議(若同意主 AI 則說明原因)。"
        )
        request_body = {"model": model_name, "prompt": prompt, "stream": False}

        async with _httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(f"{base_url}/api/generate", json=request_body)
            resp.raise_for_status()
            payload = resp.json()

        advisory = payload.get("response", "").strip()
        # Keep only the text after the chain-of-thought block, if present.
        if "</think>" in advisory:
            advisory = advisory.partition("</think>")[2].strip()
        if not advisory:
            return None
        return advisory[:300]
    except Exception as exc:
        logger.debug("nemoclaw_second_opinion_error", error=str(exc))
        return None
async def _generate_playbook_draft_if_new(incident: "Incident") -> None:
    """Create an AI-drafted playbook knowledge entry when none matches the alert.

    Steps:
      1. Read ``alertname`` from the first signal; return when it is missing.
      2. Return when a semantic search for the alertname (limit 1,
         threshold 0.92) already finds an entry.
      3. Ask the model registered as ``playbook_draft`` for a draft
         (45 s timeout); drafts shorter than 50 characters are discarded.
      4. Store the draft as a DRAFT knowledge entry, append a
         ``PLAYBOOK_DRAFT_CREATED`` operation-log record and send a
         best-effort Telegram notice.

    All failures are logged at debug level and never propagated.
    """
    try:
        import httpx as _httpx
        from src.models.knowledge import (
            EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate,
        )
        from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository
        from src.services.knowledge_service import get_knowledge_service

        if not incident.signals:
            return
        first_labels = incident.signals[0].labels
        alertname = first_labels.get("alertname", "")
        if not alertname:
            return

        # Skip when the knowledge base already has an entry for this alert.
        knowledge_svc = get_knowledge_service()
        if await knowledge_svc.semantic_search(alertname, limit=1, threshold=0.92):
            return

        severity = first_labels.get("severity", "warning")
        services = ", ".join(incident.affected_services or ["unknown"])
        prompt = (
            f"你是資深 SRE請為以下告警生成一份結構化 Playbook 草稿(繁體中文)。\n"
            f"告警名稱: {alertname}\n"
            f"嚴重度: {severity}\n"
            f"受影響服務: {services}\n\n"
            f"請按以下格式輸出(不超過 300 字):\n"
            f"## 症狀\n(描述此告警代表什麼)\n"
            f"## 根因假設\n(最常見的 2-3 個原因)\n"
            f"## 診斷步驟\nkubectl 或 shell 指令)\n"
            f"## 修復動作\n(具體修復指令,含 kubectl rollout restart 等)\n"
            f"## 驗收條件\n(如何確認修復成功)"
        )

        from src.services.model_registry import get_model as _get_model

        base_url = getattr(settings, "OLLAMA_URL", "http://192.168.0.111:11434")
        # D1 2026-04-11: model name is resolved from the model registry.
        draft_model = _get_model("ollama", "playbook_draft")
        async with _httpx.AsyncClient(timeout=45.0) as client:
            resp = await client.post(
                f"{base_url}/api/generate",
                json={"model": draft_model, "prompt": prompt, "stream": False},
            )
            resp.raise_for_status()
            content = resp.json().get("response", "").strip()
        if len(content) < 50:
            # Empty or too short to be a useful draft.
            return

        # Persist as a DRAFT entry.
        draft = KnowledgeEntryCreate(
            title=f"[AI草稿] {alertname} Playbook",
            content=content,
            entry_type=EntryType.AUTO_RUNBOOK,
            category="ai_system",
            tags=[alertname, severity, "ai_draft", "mcp_phase4c"],
            source=EntrySource.AI_EXTRACTED,
            status=EntryStatus.DRAFT,
            related_incident_id=incident.incident_id,
        )
        entry = await knowledge_svc.create_entry(draft)

        # Record the creation in the operation log.
        await get_alert_operation_log_repository().append(
            event_type="PLAYBOOK_DRAFT_CREATED",
            incident_id=incident.incident_id,
            actor="mcp_phase4c",
            action_detail=f"AI 草稿 Playbook: {entry.entry_id}",
            success=True,
            context={"alertname": alertname, "km_entry_id": entry.entry_id},
        )
        logger.info(
            "playbook_draft_created",
            incident_id=incident.incident_id,
            alertname=alertname,
            entry_id=entry.entry_id,
        )

        # 2026-04-16 (ADR-076): best-effort Telegram notice about the new entry.
        try:
            from src.services.telegram_gateway import get_telegram_gateway as _get_tg

            notice = (
                f"📚 <b>KM 新條目</b> — <code>{entry.entry_id}</code>\n"
                f"🏷️ 分類:<b>auto_generated</b> 狀態DRAFT\n"
                f"📋 告警:<code>{alertname}</code> 事件:<code>{incident.incident_id}</code>\n"
                f"標題:{str(getattr(entry, 'title', alertname))[:60]}"
            )
            await _get_tg().send_as_nemotron(notice)
        except Exception as notify_err:
            logger.debug("km_tg_notify_failed", error=str(notify_err))
    except Exception as exc:
        logger.debug("playbook_draft_failed", error=str(exc))
def _deployment_name_from_pod(pod: str) -> str:
    """Derive a workload name from a pod name by dropping its generated suffix.

    ``<name>-<9-10 chars>-<5 chars>`` (two-part suffix) yields ``<name>``; any
    other name containing a dash loses only its last dash-separated part; a
    name without a dash is returned unchanged.
    """
    parts = pod.rsplit("-", 2)
    if len(parts) >= 3 and len(parts[-1]) == 5 and len(parts[-2]) in (9, 10):
        return parts[0]
    if len(parts) >= 2:
        return "-".join(parts[:-1])
    return pod


async def _resolve_target_from_k8s(incident: "Incident", namespace: str) -> str | None:
    """Guess the affected workload for an alert that carries no pod/component label.

    Lists the pods of ``namespace`` through the K8s provider and picks one:
      * if the alertname maps to keywords, the first pod containing a keyword;
      * otherwise (or when no pod matches) the first pod that is not a
        monitoring pod (prometheus / alertmanager / grafana).

    Changes versus the previous implementation:
      * With an empty keyword list the first pod of any kind used to be
        returned; the non-infra filter described for this fallback is now
        actually applied.
      * Both paths use the same suffix-stripping rule.
      * The provider is called positionally and the result is read from either
        a mapping or an object with attributes, matching the other K8s
        provider call in this module. NOTE(review): confirm against the
        provider's ``execute`` signature and result type.

    Args:
        incident: Incident whose first signal supplies the alertname.
        namespace: Kubernetes namespace to list pods from.

    Returns:
        The derived workload name, or ``None`` when the provider is disabled,
        the query fails, no pod qualifies, or any error occurs.
    """

    def _field(result: Any, name: str, default: Any = None) -> Any:
        # Works for dict-style and attribute-style results alike.
        if isinstance(result, dict):
            return result.get(name, default)
        return getattr(result, name, default)

    try:
        from src.plugins.mcp.providers.k8s_provider import K8sProvider

        k8s = K8sProvider()
        if not k8s.enabled:
            return None

        alertname = ""
        if incident.signals:
            alertname = incident.signals[0].labels.get("alertname", "")

        # List all pods; the affected workload is inferred from the alertname.
        result = await k8s.execute(
            "kubectl_get",
            {"resource": "pods", "namespace": namespace, "output": "name"},
        )
        if not _field(result, "success", False):
            return None
        pods = [
            name
            for line in str(_field(result, "output", "") or "").splitlines()
            if (name := line.removeprefix("pod/").strip())
        ]
        if not pods:
            return None

        # alertname -> keywords for common host-level alerts. An empty list
        # means "no keyword; use the first non-infra pod".
        _ALERTNAME_KEYWORDS: dict[str, list[str]] = {
            "HostHighCpuLoad": ["api", "web"],
            "HostOutOfMemory": ["api", "web"],
            "DockerContainerUnhealthy": [],
            "DockerContainerExited": [],
            "HostOutOfDiskSpace": [],
        }
        keywords = _ALERTNAME_KEYWORDS.get(alertname, [])
        if not keywords and alertname in _ALERTNAME_KEYWORDS:
            logger.debug(
                "resolve_target_k8s_fallback_to_first_pod",
                alertname=alertname,
                reason="alertname 有對應但 keywords=[],走 fallback 取第一個非 infra pod",
            )

        # Preferred: the first pod whose name contains one of the keywords.
        if keywords:
            for pod in pods:
                if any(kw in pod for kw in keywords):
                    return _deployment_name_from_pod(pod)

        # Fallback: the first pod that is not part of the monitoring stack.
        _INFRA_MARKERS = ("prometheus", "alertmanager", "grafana")
        for pod in pods:
            if not any(marker in pod for marker in _INFRA_MARKERS):
                return _deployment_name_from_pod(pod)
    except Exception as e:
        logger.debug("resolve_target_from_k8s_failed", error=str(e))
    return None
async def _verify_k8s_deployment_exists(target: str, namespace: str, alertname: str = "") -> bool:
    """Check through the K8s provider whether ``target`` exists in ``namespace``.

    A deployment named ``target`` is looked up first; if that fails, pods with
    the label ``app=<target>`` are queried and must produce non-empty output.

    When the provider is disabled or raises:
      * alerts whose name starts with Host / Docker / Backup / Velero / SSH
        return ``False`` (blocks K8s operations for host-level problems);
      * all other alerts return ``True`` (permissive; kubectl reports the
        error itself).

    The provider is called positionally and its result is read from either a
    mapping or an object with attributes, matching the other K8s provider call
    in this module. NOTE(review): confirm against the provider's ``execute``
    signature and result type.

    Args:
        target: Deployment name / ``app`` label value to verify.
        namespace: Kubernetes namespace to query.
        alertname: Alert name used only for the host-level decision above.

    Returns:
        ``True`` when the target was found (or the permissive fallback
        applies), ``False`` otherwise.
    """
    # Host-level alert prefixes: their repair target is a container or host,
    # not a K8s deployment.
    _HOST_ALERTNAME_PREFIXES = ("Host", "Docker", "Backup", "Velero", "SSH")
    _is_host_alert = bool(alertname) and alertname.startswith(_HOST_ALERTNAME_PREFIXES)

    def _field(result: Any, name: str, default: Any = None) -> Any:
        # Works for dict-style and attribute-style results alike.
        if isinstance(result, dict):
            return result.get(name, default)
        return getattr(result, name, default)

    try:
        from src.plugins.mcp.providers.k8s_provider import K8sProvider

        k8s = K8sProvider()
        if not k8s.enabled:
            return not _is_host_alert

        result = await k8s.execute(
            "kubectl_get",
            {"resource": "deployment", "name": target, "namespace": namespace},
        )
        if _field(result, "success", False):
            return True

        # Some alerts refer to a pod rather than a deployment.
        result_pod = await k8s.execute(
            "kubectl_get",
            {"resource": "pod", "namespace": namespace, "selector": f"app={target}"},
        )
        pod_output = str(_field(result_pod, "output", "") or "")
        return bool(_field(result_pod, "success", False) and pod_output.strip())
    except Exception as e:
        logger.debug("verify_k8s_deployment_exists_failed", target=target, error=str(e))
        # On error: reject host-level alerts, allow the rest.
        return not _is_host_alert
async def _fetch_metrics_snapshot(incident: Incident) -> dict:
    """Collect a small metrics snapshot related to the incident (ADR-071-I).

    The queries depend on the first signal's ``alertname``:
      * HostHighCpuLoad / HostOutOfMemory -> ``cpu_pct`` (only when an
        ``instance`` label exists) and ``mem_pct``
      * HostOutOfDiskSpace -> ``disk_pct``
      * PodRestartingTooMuch / PodCrashLoopBackOff -> ``restart_count``
        (only when a ``pod`` or ``component`` label exists)

    Returns:
        A dict with the keys above that could be filled; an empty dict when
        the Prometheus provider is disabled, the alert is not covered, or any
        error occurs (errors are logged at debug level).
    """
    try:
        from src.plugins.mcp.providers.prometheus_provider import PrometheusProvider

        prom = PrometheusProvider()
        if not prom.enabled:
            return {}

        labels = incident.signals[0].labels if incident.signals else {}
        alertname = labels.get("alertname", "")
        instance = labels.get("instance", "")
        snapshots: dict = {}

        def _one_decimal(value: float) -> float:
            return round(value, 1)

        async def _record(key: str, query: str, convert) -> None:
            # _instant_query takes a dict and returns {"result": [...]};
            # every returned sample overwrites the key, so the last one wins.
            response = await prom._instant_query({"query": query})
            for sample in response.get("result", []):
                snapshots[key] = convert(float(sample["value"][1]))

        if alertname in ("HostHighCpuLoad", "HostOutOfMemory"):
            if instance:
                host = instance.split(":")[0]
                await _record(
                    "cpu_pct",
                    f'100 - (avg by(instance) (irate(node_cpu_seconds_total{{mode="idle",instance=~"{host}.*"}}[5m])) * 100)',
                    _one_decimal,
                )
                mem_query = f'(1 - (node_memory_MemAvailable_bytes{{instance=~"{instance}"}} / node_memory_MemTotal_bytes{{instance=~"{instance}"}})) * 100'
            else:
                mem_query = "100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)"
            await _record("mem_pct", mem_query, _one_decimal)
        elif alertname == "HostOutOfDiskSpace":
            await _record(
                "disk_pct",
                'max(100 - ((node_filesystem_avail_bytes{fstype!="tmpfs"} / node_filesystem_size_bytes{fstype!="tmpfs"}) * 100))',
                _one_decimal,
            )
        elif alertname in ("PodRestartingTooMuch", "PodCrashLoopBackOff"):
            pod = labels.get("pod", labels.get("component", ""))
            if pod:
                await _record(
                    "restart_count",
                    f'sum(kube_pod_container_status_restarts_total{{namespace="awoooi-prod",pod=~"{pod}.*"}})',
                    int,
                )
        return snapshots
    except Exception as _e:
        logger.debug("metrics_snapshot_failed", incident_id=incident.incident_id, error=str(_e))
        return {}
def _format_metrics_delta(before: dict, after: dict) -> str:
    """Format a before/after comparison of metric snapshots (ADR-071-I).

    Example: ``CPU 92%→23% | Mem 78%→45% | Restarts 5→2``

    Percentage metrics (``cpu_pct``, ``mem_pct``, ``disk_pct``) are shown when
    at least one side has a value; a one-sided value is tagged ``(before)`` or
    ``(after)``. ``restart_count`` is shown only when both sides have a value.

    Args:
        before: Snapshot taken before the action; ``None`` is treated as empty.
        after: Snapshot taken after the action; ``None`` is treated as empty.

    Returns:
        The segments joined with ``" | "``, or ``""`` when nothing can be shown.
    """
    before = before or {}
    after = after or {}
    if not before and not after:
        return ""

    parts = []
    for key, label in (("cpu_pct", "CPU"), ("mem_pct", "Mem"), ("disk_pct", "Disk")):
        b = before.get(key)
        a = after.get(key)
        if b is not None and a is not None:
            parts.append(f"{label} {b}%→{a}%")
        elif b is not None:
            parts.append(f"{label} {b}% (before)")
        elif a is not None:
            parts.append(f"{label} {a}% (after)")

    b = before.get("restart_count")
    a = after.get("restart_count")
    if b is not None and a is not None:
        # Fix: the two counts were concatenated without a separator
        # ("Restarts 52" for 5 -> 2).
        parts.append(f"Restarts {b}→{a}")
    return " | ".join(parts)
async def _push_auto_repair_result(
incident: Incident,
action: str,
success: bool,
error: str = "",
) -> None:
"""
After an automatic repair has run, append a status line to the original alert message.
Commander's requirement: every status change must continue on the original alert message; no new message is sent.
- append_incident_update() is tried first (per the original note: it looks up Redis tg_msg:{id}, replies to the original message and swaps the buttons)
- when it reports that nothing was appended, fall back to send_notification (degraded path)
ADR-071-I: on success, capture a metrics_after snapshot, write it to the incidents table, and show the before/after comparison in the notification
2026-04-09 Claude Sonnet 4.6 Asia/Taipei
2026-04-11 Claude Sonnet 4.6: +ADR-071-I metrics snapshot

Args:
incident: Incident whose alert message is being updated.
action: Repair action that was executed; truncated when rendered.
success: Whether the automatic repair succeeded.
error: Error text shown on failure; truncated when rendered.

Exceptions raised in the body are caught by the outer handler and logged as a warning.
"""
try:
from src.services.telegram_gateway import get_telegram_gateway
gateway = get_telegram_gateway()
# First affected service is used as the display target; "unknown" when the list is empty
target = incident.affected_services[0] if incident.affected_services else "unknown"
inc_id = incident.incident_id
# ADR-071-I: capture metrics_after (on success)
metrics_delta_text = ""
if success:
metrics_after = await _fetch_metrics_snapshot(incident)
# metrics_before is optional on the incident; a missing/None value becomes {}
metrics_before = getattr(incident, "metrics_before", None) or {}
# Write to the DB; a failure here is only logged and does not stop the main flow
if metrics_after:
try:
# NOTE(review): this write uses `Incident as IncidentORM` while the outcome write
# further down uses `IncidentRecord` — confirm both names exist in src.db.models.
from src.db.base import get_db_context
from src.db.models import Incident as IncidentORM
from sqlalchemy import update as _update
async with get_db_context() as db:
await db.execute(
_update(IncidentORM)
.where(IncidentORM.incident_id == inc_id)
.values(metrics_after=metrics_after)
)
await db.commit()
logger.info("metrics_after_saved", incident_id=inc_id, metrics=metrics_after)
except Exception as _e:
logger.warning("metrics_after_save_failed", incident_id=inc_id, error=str(_e))
metrics_delta_text = _format_metrics_delta(metrics_before, metrics_after)
# MCP Phase 4b: fetch the K8s Pod state and write it to k8s_state_after (2026-04-11 Claude Sonnet 4.6)
try:
from src.plugins.mcp.providers.k8s_provider import K8sProvider
_k8s = K8sProvider()
if _k8s.enabled:
_service = (incident.affected_services or [""])[0]
if _service:
_k8s_result = await _k8s.execute(
"kubectl_get",
{"resource_type": "pods", "namespace": "awoooi-prod",
"label_selector": f"app={_service}"},
)
# NOTE(review): reads `.data` on the provider result; the head commit message mentions
# `.success/.output` for MCPToolResult — confirm `.data` exists. An AttributeError here
# would be swallowed by the except below and only logged at debug level.
if _k8s_result.success and _k8s_result.data:
# Stored value is the string form of the result data, capped at 500 characters
_k8s_state = str(_k8s_result.data)[:500]
from src.db.base import get_db_context
from src.db.models import Incident as IncidentORM
from sqlalchemy import update as _upd2
async with get_db_context() as _db2:
await _db2.execute(
_upd2(IncidentORM)
.where(IncidentORM.incident_id == inc_id)
.values(k8s_state_after=_k8s_state)
)
await _db2.commit()
except Exception as _k8s_err:
logger.debug("k8s_state_after_failed", incident_id=inc_id, error=str(_k8s_err))
# Build the status line (Telegram HTML) for the success / failure case
if success:
delta_line = f"\n├ 指標: <code>{metrics_delta_text}</code>" if metrics_delta_text else ""
status_line = (
f"✅ <b>自動修復完成</b>\n"
f"├ <code>{action[:100] if action else '已執行'}</code>"
f"{delta_line}"
)
else:
status_line = (
f"❌ <b>自動修復失敗,請人工介入</b>\n"
f"├ 動作: <code>{action[:80] if action else '未知'}</code>\n"
f"└ 錯誤: {error[:100] if error else '未知錯誤'}"
)
# BUG-006 fix 2026-04-11: outcome + verification_result were always null
# Cause: _push_auto_repair_result only sent the Telegram message and never wrote to the DB
# Fix: write the outcome / verification_result columns of the incidents table
# NOTE(review): only `outcome` is written in the code below; verification_result is not set here.
try:
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from sqlalchemy import update as _upd_outcome
_outcome = "auto_repaired" if success else "auto_repair_failed"
async with get_db_context() as _odb:
await _odb.execute(
_upd_outcome(IncidentRecord)
.where(IncidentRecord.incident_id == inc_id)
.values(outcome=_outcome)
)
await _odb.commit()
logger.info("outcome_written", incident_id=inc_id, outcome=_outcome)
except Exception as _oe:
logger.warning("outcome_write_failed", incident_id=inc_id, error=str(_oe))
# Preferred path: reply to the original alert message and replace its buttons
appended = await gateway.append_incident_update(
incident_id=inc_id,
status_line=status_line,
keep_info_buttons=True, # keep details / re-diagnose / history, remove approve / reject
)
# Fallback: original message ID not found (old alert or expired Redis entry) -> send a new message
# 2026-04-15 ogt: switched to the ADR-075 TYPE-2 format (raw text is forbidden)
if not appended:
# NOTE(review): both branches yield an empty string as written — confirm the intended icons.
result_icon = "" if success else ""
result_label = "完成" if success else "失敗"
fallback_text = (
f"{result_icon} <b>TYPE-2 | 自動修復{result_label}</b>\n"
"──────────────────────\n"
f"├─ 事件: <code>{inc_id}</code>\n"
f"├─ 對象: <code>{target[:50]}</code>\n"
f"└─ {status_line}"
)
await gateway.send_notification(fallback_text)
logger.info("auto_repair_result_sent", incident_id=inc_id, success=success, appended=appended)
except Exception as e:
logger.warning("auto_repair_result_push_failed", incident_id=incident.incident_id, error=str(e))
# =============================================================================
# Decision States
# =============================================================================
class DecisionState(str, Enum):
    """States of the decision state machine; each member's value is a lowercase string."""

    # The incident has just been created
    INIT = "init"
    # Analysis is in progress
    ANALYZING = "analyzing"
    # A decision is ready
    READY = "ready"
    # The decision is being executed
    EXECUTING = "executing"
    # Execution has finished
    COMPLETED = "completed"
    # An error occurred
    ERROR = "error"
# =============================================================================
# Expert System - 規則引擎 (Local Fallback)
# =============================================================================
# Rule table for the local Expert System fallback.
# Each entry provides: "patterns" (substrings looked up in the lowercased alert names),
# an "action" template with a {target} placeholder, a "description", a "risk_level"
# and a "reasoning" text. expert_analyze() walks the entries in insertion order and
# takes the first non-default rule with a matching pattern, so the order is significant.
EXPERT_RULES: dict[str, dict[str, Any]] = {
# Pod crash -> restart
"pod_crash": {
"patterns": ["crash", "restart", "oom", "killed", "failed"],
"action": "kubectl rollout restart deployment/{target}",
"description": "Expert System: 偵測到 Pod 異常,建議重啟部署",
"risk_level": "medium",
"reasoning": "根據歷史數據,重啟可解決 85% 的 Pod 崩潰問題",
},
# High latency -> scale out
"high_latency": {
"patterns": ["latency", "slow", "timeout", "p99"],
"action": "kubectl scale deployment/{target} --replicas=3",
"description": "Expert System: 偵測到高延遲,建議擴容至 3 副本",
"risk_level": "low",
"reasoning": "擴容可分散負載,降低單一 Pod 壓力",
},
# High error rate -> roll back
"high_error_rate": {
"patterns": ["error", "5xx", "fail", "exception"],
"action": "kubectl rollout undo deployment/{target}",
"description": "Expert System: 偵測到高錯誤率,建議回滾至上一版",
"risk_level": "critical",
"reasoning": "錯誤率突增通常源自最近部署,回滾是最快修復方式",
},
# Resource exhaustion -> scale out
"resource_exhaustion": {
"patterns": ["cpu", "memory", "resource", "quota"],
"action": "kubectl scale deployment/{target} --replicas=2",
"description": "Expert System: 偵測到資源耗盡,建議擴容",
"risk_level": "medium",
"reasoning": "增加副本可分散資源壓力",
},
# Default -> restart (most conservative); has no patterns and is used when nothing else matches
"default": {
"patterns": [],
"action": "kubectl rollout restart deployment/{target}",
"description": "Expert System: 無法確定具體問題,建議安全重啟",
"risk_level": "medium",
"reasoning": "重啟是最安全的通用修復動作",
},
}
def _package_to_proposal_data(package: Any) -> dict[str, Any]:
"""
將 Phase 2 DecisionPackage 轉換為 proposal_data dict。
proposal_data 格式是既有 decision_manager 決策流程的共同語言,
所有下游auto_approve / Telegram / approval_record都依此格式讀取。
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太)
"""
action = package.recommended_action or ""
confidence = package.confidence
# requires_human_approval → risk_level 映射
# Phase 2 Reviewer + Critic 已做安全審查,不需要 human = low/medium
if package.requires_human_approval:
risk_level = "high" if confidence > 0 else "critical"
elif confidence >= 0.8:
risk_level = "low"
elif confidence >= 0.6:
risk_level = "medium"
else:
risk_level = "high"
# 組出人類可讀的 descriptionTelegram 卡片顯示用)
# 2026-04-16 ogt + Claude Sonnet 4.6: 修復 description=debate_summary garbage
diag = getattr(package, "diagnosis", None)
plan = getattr(package, "action_plan", None)
desc_parts = []
if diag and getattr(diag, "top_hypothesis", None):
h = diag.top_hypothesis
desc_parts.append(f"根因:{h.description[:150]}(信心 {h.confidence:.0%}")
if plan and getattr(plan, "top_candidate", None):
c = plan.top_candidate
desc_parts.append(f"方案:{c.action[:100]}")
# blocked_reason 是系統內部診斷,不能放進 descriptionTelegram 卡片顯示用)
# 2026-04-17 ogt + Claude Sonnet 4.6(亞太): 修復超時髒資料污染卡片
# 舊blocked_reason → desc_parts → description → suggested_action 欄位顯示「備注:全局超時 > 90.0s」
# 新blocked_reason 只寫入 proposal_data["blocked_reason"],供下游閘門邏輯用,禁止進卡片顯示
# 2026-04-24 ogt + Claude Sonnet 4.6: ADR-091 鐵閘 bypass 修復
# 問題Agent Debate 超時 (TIMEOUT/FAILED) → desc_parts 空 + action 空 → description="待分析"
# → ADR-091 鐵閘攔截description=待分析 AND action in FAILED_TOKENS→ PENDING 積壓
# 修法:超時/全降級時使用明確降級文字,讓鐵閘放行 → Telegram 推送降級通知 → 人工可見
if desc_parts:
description = "".join(desc_parts)
elif action:
description = action[:200]
else:
_status = getattr(package, "session_status", None)
_status_val = _status.value if _status and hasattr(_status, "value") else ""
if _status_val == "timeout":
description = "AI 分析超時90s降級至人工審核"
elif _status_val == "failed" or getattr(package, "all_agents_degraded", False):
description = "AI 分析降級,所有 Agent 無有效輸出,建議人工確認"
else:
description = "待分析"
return {
"action": action,
"kubectl_command": action if action.startswith("kubectl") else "",
"description": description,
"reasoning": package.debate_summary[:1000] if package.debate_summary else "",
"confidence": confidence,
"risk_level": risk_level,
"source": "phase2_agent_debate",
"requires_human_review": package.requires_human_approval,
# Phase 2 診斷摘要(供 Audit Trail / 學習閉環,不直接顯示給用戶)
"debate_summary": package.debate_summary or "",
"all_agents_degraded": package.all_agents_degraded,
"blocked_reason": package.blocked_reason or "",
"session_status": package.session_status.value if package.session_status else "",
# MINOR-2: 補齊 Expert System / LLM 路徑存在的 keys防止下游 .get() 靜默空缺
"is_rule_based": False,
"matched_rule": "",
"rule_id": "",
"from_cache": False,
}
def expert_analyze(incident: Incident) -> dict[str, Any]:
    """
    Rule-engine analysis of the Expert System.

    Works only on the incident's own data and the EXPERT_RULES table; it is
    the fallback the decision flow uses when the main analysis times out or
    raises.

    Args:
        incident: Incident whose signals' alert names select the rule and whose
            first affected service is the target ("unknown-service" when none).

    Returns:
        A proposal_data dict built from the first non-default rule with a
        pattern found in the lowercased alert names, or from the "default" rule.
    """
    services = incident.affected_services
    target = services[0] if services else "unknown-service"
    alert_names = " ".join(signal.alert_name.lower() for signal in incident.signals)

    # First rule, in table order, with a pattern contained in the alert names;
    # "default" is never matched by pattern and only serves as the fallback.
    matched_rule = next(
        (
            name
            for name, candidate in EXPERT_RULES.items()
            if name != "default"
            and any(pattern in alert_names for pattern in candidate["patterns"])
        ),
        "default",
    )
    rule = EXPERT_RULES[matched_rule]
    command = rule["action"].format(target=target)
    summary = rule["description"]

    # 2026-03-29 ogt: the Expert System must not pretend to have a high
    # confidence score; 0.0 marks the result as a rule match rather than an
    # AI arbitration.
    return {
        "source": "expert_system",
        "action": command,
        "description": summary,
        "diagnosis_description": summary,  # openclaw.py reads this key
        "risk_level": rule["risk_level"],
        "reasoning": f"[規則匹配] {rule['reasoning']}",  # states the origin explicitly
        "confidence": 0.0,  # rule match, not AI arbitration
        "kubectl_command": command,
        "matched_rule": matched_rule,
        "initial_diagnosis": matched_rule,  # openclaw.py reads this key
        "from_cache": False,
        "is_rule_based": True,
    }
# =============================================================================
# Decision Token (Redis)
# =============================================================================
class DecisionToken:
    """
    Decision token - the frontend can operate on a decision as long as it holds this token.

    Redis Key: decision:{token}
    TTL: 1 hour
    """

    def __init__(
        self,
        token: str,
        incident_id: str,
        state: DecisionState,
        proposal_data: dict[str, Any] | None = None,
        proposal_id: str | None = None,
        created_at: datetime | None = None,
        updated_at: datetime | None = None,
        error: str | None = None,
    ):
        """Store the given fields; a missing timestamp defaults to the current UTC time."""
        self.token, self.incident_id, self.state = token, incident_id, state
        self.proposal_data, self.proposal_id = proposal_data, proposal_id
        self.created_at = created_at if created_at else datetime.now(UTC)
        self.updated_at = updated_at if updated_at else datetime.now(UTC)
        self.error = error

    def to_dict(self) -> dict[str, Any]:
        """Return a plain dict: the state as its value, the timestamps as ISO-8601 strings."""
        payload: dict[str, Any] = {
            "token": self.token,
            "incident_id": self.incident_id,
            "state": self.state.value,
            "proposal_data": self.proposal_data,
            "proposal_id": self.proposal_id,
        }
        payload["created_at"] = self.created_at.isoformat()
        payload["updated_at"] = self.updated_at.isoformat()
        payload["error"] = self.error
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DecisionToken":
        """
        Rebuild a token from a dict shaped like the output of to_dict().

        "token", "incident_id" and "state" are required; the other keys are
        optional, and an absent or empty timestamp is passed on as None.
        """

        def _timestamp(field: str) -> datetime | None:
            raw = data.get(field)
            return datetime.fromisoformat(raw) if raw else None

        return cls(
            token=data["token"],
            incident_id=data["incident_id"],
            state=DecisionState(data["state"]),
            proposal_data=data.get("proposal_data"),
            proposal_id=data.get("proposal_id"),
            created_at=_timestamp("created_at"),
            updated_at=_timestamp("updated_at"),
            error=data.get("error"),
        )
# =============================================================================
# Protocol Interface (Phase 17 P1 - 紅區治理)
# =============================================================================
@runtime_checkable
class IDecisionManager(Protocol):
"""
DecisionManager 介面定義
用途:
- 依賴注入 (DI) 時的型別約束
- 測試時 Mock 的型別檢查
- 符合 leWOOOgo 積木化規範
Tier 3 紅區服務: 修改需首席架構師簽核
@see feedback_lewooogo_modular_enforcement.md
@see docs/RED_ZONES.md
"""
async def get_or_create_decision(
self,
incident: "Incident",
timeout_sec: float = 30.0,
) -> "DecisionToken":
"""取得或建立決策令牌"""
...
async def mark_executing(self, token: str) -> "DecisionToken | None":
"""標記決策為執行中"""
...
async def mark_completed(self, token: str, result: dict[str, Any] | None = None) -> "DecisionToken | None":
"""標記決策為已完成"""
...
# =============================================================================
# Decision Manager
# =============================================================================
# Prefix of the Redis key under which a decision token is stored (decision:{token})
DECISION_TOKEN_PREFIX = "decision:"
# Default token lifetime: 1 hour, expressed in seconds
DECISION_TOKEN_TTL = 60 * 60
class DecisionManager:
"""
決策管理器 - Phase 6.5 核心
職責:
1. 為每個 Incident 簽發 decision_token
2. 並行執行 LLM + Expert System
3. First-Win 或 Fallback 策略
4. 確保 UI 永遠有決策可操作
"""
def __init__(self):
    """Create the manager and attach its collaborators: OpenClaw, KnowledgeService and the SSH / K8s MCP providers."""
    self._openclaw = get_openclaw()

    # I2 fix (chief-architect review): KnowledgeService is injected here so
    # that other methods do not need a function-level import.
    # 2026-04-04 Claude Code
    from src.services.knowledge_service import get_knowledge_service

    knowledge_service = get_knowledge_service()
    self._knowledge_svc = knowledge_service

    # I2 follow-up 2026-04-12 Claude Sonnet 4.6 Asia/Taipei: MCP providers are
    # injected as well. _collect_mcp_context used to import and instantiate
    # them inside the function, which broke the modular-building-block rule.
    from src.plugins.mcp.providers.ssh_provider import SSHProvider
    from src.plugins.mcp.providers.k8s_provider import K8sProvider

    ssh_provider = SSHProvider()
    self._ssh = ssh_provider
    k8s_provider = K8sProvider()
    self._k8s = k8s_provider
async def get_or_create_decision(
self,
incident: Incident,
timeout_sec: float = 30.0,
) -> DecisionToken:
"""
Get or create the decision token for an incident.
Core logic:
1. Check whether a token already exists
2. If not, create a new token (the code below creates it directly in ANALYZING state)
3. Run the dual-engine analysis (ANALYZING)
4. Wait for the result; on timeout or error use the Expert System
Afterwards the YAML rule gate and the auto-approve policy decide whether the proposal is
short-circuited, auto-executed, or pushed to Telegram for human review.
Original note: this method is meant to return a usable token within timeout_sec
(in the code below only the analysis step is bounded by timeout_sec).

Args:
incident: Incident to decide on.
timeout_sec: Timeout, in seconds, passed to asyncio.wait_for for the dual-engine analysis.

Returns:
The existing token when one is found in READY, EXECUTING, COMPLETED or ANALYZING state;
the TYPE-1 informational token for TYPE-1 incidents; otherwise the newly created token.
"""
# NOTE(review): _redis_client is not used anywhere in this method — confirm whether get_redis() is called for a side effect.
_redis_client = get_redis()
# 1. Check for an existing token first (single entry point for every type)
# 2026-04-16 ogt + Claude Sonnet 4.6: fix — the TYPE-1 bypass did not check the existing token, so alerts
# such as HostBackupFailed were pushed repeatedly; the existing-token check must run before the TYPE-1 bypass
existing_token = await self._find_existing_token(incident.incident_id)
if existing_token:
# READY or EXECUTING: return as is
if existing_token.state in (DecisionState.READY, DecisionState.EXECUTING):
return existing_token
# COMPLETED: return as is, so no duplicate decision is created and Telegram is not flooded
if existing_token.state == DecisionState.COMPLETED:
return existing_token
# 2026-04-16 ogt + Claude Sonnet 4.6: root cause of duplicate cards — ANALYZING did not return early
# Problem: with several pods running concurrently, pod-A is ANALYZING while pod-B/C see a state that is not
# in the return condition -> each creates a new token -> agent_debate runs 3 times for one incident -> 3 TG cards
# Fix: ANALYZING also returns directly to avoid duplicate processing
if existing_token.state == DecisionState.ANALYZING:
logger.debug(
"decision_analyzing_in_progress",
incident_id=incident.incident_id,
token=existing_token.token,
reason="另一個 worker 正在分析中,跳過重複建立",
)
return existing_token
# ADR-073 Phase 3-1: TYPE-1 triage guard — purely informational alerts skip the LLM analysis
# classify_alert_early() has already set notification_type at the webhook entry point
# TYPE-1 (info/backup/heartbeat) needs no AI reasoning; push to Telegram and return
# 2026-04-12 ogt
if getattr(incident, "notification_type", None) == "TYPE-1":
_info_token = DecisionToken(
token=f"DEC-{uuid4().hex[:12].upper()}",
incident_id=incident.incident_id,
state=DecisionState.COMPLETED,
proposal_data={
"source": "triage_guard",
"notification_type": "TYPE-1",
"decision_state": "COMPLETED",
"auto_executed": False,
"confidence": 1.0,
"risk_level": "low",
"description": "純資訊通知,無需操作",
},
)
# 2026-04-16 ogt + Claude Sonnet 4.6: TYPE-1 token TTL of 24h to prevent flooding
# The previous 3600s caused the same HostBackupFailed/TYPE-1 alert to be pushed again every hour
await self._save_token(_info_token, ttl=86400)
_fire_and_forget(_push_decision_to_telegram(incident, _info_token.proposal_data))
logger.info(
"decision_type1_bypass",
incident_id=incident.incident_id,
notification_type="TYPE-1",
)
return _info_token
# 2. Create a new token
token = DecisionToken(
token=f"DEC-{uuid4().hex[:12].upper()}",
incident_id=incident.incident_id,
state=DecisionState.ANALYZING,
)
await self._save_token(token)
logger.info(
"decision_analyzing",
token=token.token,
incident_id=incident.incident_id,
)
# 3. Run the dual-engine decision, bounded by timeout_sec
try:
proposal_data = await asyncio.wait_for(
self._dual_engine_analyze(incident),
timeout=timeout_sec,
)
token.state = DecisionState.READY
token.proposal_data = proposal_data
token.updated_at = datetime.now(UTC)
logger.info(
"decision_ready",
token=token.token,
source=proposal_data.get("source", "unknown"),
)
# NOTE(review): catching the builtin TimeoutError covers asyncio.wait_for only where
# asyncio.TimeoutError is the same class (Python 3.11+) — confirm the runtime version.
except TimeoutError:
# Timeout: use the Expert System as the fallback
logger.warning(
"decision_timeout_using_expert",
token=token.token,
timeout_sec=timeout_sec,
)
expert_result = expert_analyze(incident)
token.state = DecisionState.READY
token.proposal_data = expert_result
token.updated_at = datetime.now(UTC)
except Exception as e:
# Any other error: use the Expert System as the fallback and record the error on the token
logger.exception(
"decision_error_using_expert",
token=token.token,
error=str(e),
)
expert_result = expert_analyze(incident)
token.state = DecisionState.READY
token.proposal_data = expert_result
token.error = str(e)
token.updated_at = datetime.now(UTC)
# 4. Save the final result
await self._save_token(token)
# 4b. 2026-04-16 Claude Sonnet 4.6: write the AI analysis result into incidents.decision_chain (long-term DB storage)
# Gap fixed: the decision token lives only in Redis (original note: TTL ~12min), so the AI analysis history was lost for good
# Written: diagnosis / confidence / risk_level / provider / source
if token.proposal_data:
_fire_and_forget(
self._persist_decision_to_db(incident.incident_id, token.proposal_data)
)
# 4c. YAML NO_ACTION gate — the decision routing hub (2026-04-17 ogt + Claude Sonnet 4.6)
# Architecture principle: the notification layer (_push_decision_to_telegram) only translates state and does not look up business logic.
# The NO_ACTION / INVALID_TARGET check therefore has to be done in this one gate, and every path must pass through it.
#
# Root cause (earlier blind spot): the Phase 2 path pushed to TG right after a rejection, without the YAML check in auto_execute();
# the Coordinator does not set blocked_reason -> TG receives an empty blocked_reason -> the notification type cannot be classified.
# Fix: query the YAML rules once, before auto_approve:
# NO_ACTION -> set blocked_reason + is_informational_only -> short-circuit, skip auto_approve
# INVALID_TARGET -> set blocked_reason -> short-circuit (TYPE-4 is translated by the TG layer)
if token.state == DecisionState.READY and token.proposal_data and incident.signals:
# Only the first signal's labels are consulted
_gate_alertname = incident.signals[0].labels.get("alertname", "")
if _gate_alertname and not token.proposal_data.get("blocked_reason", ""):
try:
from src.services.alert_rule_engine import match_rule as _gate_match
_gate_r = _gate_match({
"labels": incident.signals[0].labels,
"alert_type": _gate_alertname,
"message": "",
"target_resource": (
incident.affected_services[0]
if incident.affected_services else "unknown"
),
"namespace": "awoooi-prod",
})
if _gate_r:
_gate_blocked = _gate_r.get("blocked_reason", "")
_gate_action = _gate_r.get("suggested_action", "")
if "INVALID_TARGET" in _gate_blocked or _gate_action == "NO_ACTION":
# Set blocked_reason so the TG layer can translate the notification type correctly
if "INVALID_TARGET" in _gate_blocked:
token.proposal_data["blocked_reason"] = _gate_blocked
else:
token.proposal_data["blocked_reason"] = f"YAML: NO_ACTION for {_gate_alertname}"
token.proposal_data["is_informational_only"] = True
token.proposal_data["auto_executed"] = False
token.proposal_data["decision_state"] = token.state.value
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
logger.info(
"yaml_gate_short_circuit",
incident_id=incident.incident_id,
alertname=_gate_alertname,
blocked_reason=token.proposal_data["blocked_reason"],
)
return token # short-circuit — skips auto_approve + the Blast Radius evaluation
except Exception as _gate_err:
logger.debug("yaml_gate_error", error=str(_gate_err))
# Gate lookup failed -> degrade and continue with the normal flow (non-blocking)
# 5. ADR-030 Phase 4: decide on automatic execution
if token.state == DecisionState.READY and token.proposal_data:
# Evaluate whether the proposal may be executed automatically
auto_policy = get_auto_approve_policy()
auto_decision = auto_policy.evaluate(
proposal_data=token.proposal_data,
playbook=token.proposal_data.get("_matched_playbook"), # if present
)
if auto_decision.should_auto_approve:
# Automatic execution (skips the human review)
logger.info(
"auto_approve_triggered",
incident_id=incident.incident_id,
reason=auto_decision.reason.value,
detail=auto_decision.reason_detail,
)
token.state = DecisionState.EXECUTING
token.proposal_data["auto_approved"] = True
token.proposal_data["auto_approve_reason"] = auto_decision.reason_detail
await self._save_token(token)
# Trigger the automatic execution (non-blocking)
_fire_and_forget(
self._auto_execute(incident, token)
)
else:
# Human review required: push to Telegram
# ADR-071: inject decision_state + auto_executed for classify_notification
token.proposal_data["decision_state"] = token.state.value if token.state else ""
token.proposal_data["auto_executed"] = False
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return token
async def _auto_execute(self, incident: Incident, token: "DecisionToken") -> None:
"""
ADR-030 Phase 4: 自動執行已批准的操作
僅當 AutoApprovePolicy 判斷可自動執行時呼叫
執行後發 Telegram 結果通知 (統帥要求: 修復結果對應同一告警)
2026-04-09 Claude Sonnet 4.6 Asia/Taipei
"""
action = token.proposal_data.get("kubectl_command", "")
# 2026-04-15 ogt: YAML 規則引擎優先 — 架構斷點修復
# 根因LLM 生成的 kubectl_command 與 YAML 規則引擎的 NO_ACTION / SSH 指令完全脫節
# YAML 規則是人工審閱的權威來源LLM 只是輔助
# 修復策略:
# 1. YAML → NO_ACTION → 立即返回,不執行任何操作
# 2. YAML → SSH 指令(非 kubectl→ 覆蓋 LLM 生成的 action讓 SSH 路由生效
_alertname_for_yaml = incident.signals[0].labels.get("alertname", "") if incident.signals else ""
if _alertname_for_yaml:
try:
from src.services.alert_rule_engine import match_rule as _yaml_match
_yaml_r = _yaml_match({
"labels": incident.signals[0].labels if incident.signals else {},
"alert_type": _alertname_for_yaml,
"message": "",
"target_resource": incident.affected_services[0] if incident.affected_services else "unknown",
"namespace": "awoooi-prod",
})
if _yaml_r:
if _yaml_r.get("suggested_action") == "NO_ACTION":
logger.info(
"auto_execute_yaml_no_action",
incident_id=incident.incident_id,
alertname=_alertname_for_yaml,
reason="YAML 規則明確標記 NO_ACTION不執行自動修復",
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blocked_reason"] = f"YAML: NO_ACTION for {_alertname_for_yaml}"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# 2026-04-16 ogt + Claude Sonnet 4.6: INVALID_TARGET → 人工確認 TYPE-4
# 根因target 無法解析 → rule engine 清空 kubectl_command + 設 blocked_reason
# 系統不應繼續嘗試執行,提早返回讓 SRE 介入
_yaml_blocked = _yaml_r.get("blocked_reason", "")
if "INVALID_TARGET" in _yaml_blocked:
logger.warning(
"auto_execute_yaml_invalid_target",
incident_id=incident.incident_id,
alertname=_alertname_for_yaml,
blocked_reason=_yaml_blocked,
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blocked_reason"] = _yaml_blocked
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
_yaml_cmd = (_yaml_r.get("kubectl_command") or "").strip()
if _yaml_cmd and not _yaml_cmd.startswith("kubectl"):
# YAML 給出 SSH / docker 指令 → 覆蓋 LLM 生成的 kubectl action
action = _yaml_cmd
logger.info(
"auto_execute_yaml_cmd_override",
incident_id=incident.incident_id,
alertname=_alertname_for_yaml,
yaml_cmd=_yaml_cmd[:80],
)
except Exception as _yaml_err:
logger.debug("auto_execute_yaml_check_error", error=str(_yaml_err))
# Phase 6 ADR-087: 自我降級守衛AIOPS_P6_SELF_DEMOTION 控制)
# SLO 違反 → 全域信心閾值調高;連續違反 → 保守模式,所有自動執行降為人工
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
try:
from src.core.feature_flags import aiops_flags as _p6_flags
if _p6_flags.is_sub_flag_enabled("AIOPS_P6_SELF_DEMOTION"):
from src.db.base import get_session_factory as _p6_sf
from src.db.models import AiGovernanceEvent as _GovernanceEvent
from sqlalchemy import select as _p6_select, func as _p6_func
from datetime import timedelta as _p6_td
_now = __import__("src.utils.timezone", fromlist=["now_taipei"]).now_taipei()
async with _p6_sf()() as _p6_sess:
# 過去 7 天有幾筆未解決的 slo_violation
_viol_7d_q = await _p6_sess.execute(
_p6_select(_p6_func.count()).where(
_GovernanceEvent.event_type == "slo_violation",
_GovernanceEvent.resolved.is_(False),
_GovernanceEvent.triggered_at >= _now - _p6_td(days=7),
)
)
_viol_7d: int = _viol_7d_q.scalar() or 0
# 過去 14 天有幾筆未解決的 slo_violation
_viol_14d_q = await _p6_sess.execute(
_p6_select(_p6_func.count()).where(
_GovernanceEvent.event_type == "slo_violation",
_GovernanceEvent.resolved.is_(False),
_GovernanceEvent.triggered_at >= _now - _p6_td(days=14),
)
)
_viol_14d: int = _viol_14d_q.scalar() or 0
if _viol_14d >= 2:
# 連續 2 週違反 → 保守模式:全部降為人工
logger.warning(
"auto_execute_conservative_mode",
incident_id=incident.incident_id,
viol_14d=_viol_14d,
reason="Phase 6 保守模式:連續 SLO 違反,所有自動執行暫停",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["p6_conservative_mode"] = True
token.proposal_data["p6_reason"] = f"SLO 連續違反 {_viol_14d}d系統進入保守模式"
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
# 記錄保守模式事件
try:
from src.db.base import get_session_factory as _p6_sf2
async with _p6_sf2()() as _s2:
_s2.add(_GovernanceEvent(
event_type="conservative_mode",
details={
"incident_id": incident.incident_id,
"viol_14d": _viol_14d,
"triggered_at": _now.isoformat(),
},
resolved=False,
))
await _s2.commit()
except Exception:
pass
return
elif _viol_7d >= 1:
# 近 7 天有違反 → 自我降級:信心閾值提高,記錄 demotion 事件
_confidence = float(token.proposal_data.get("confidence", 0.0))
_raised_threshold = 0.75 # 原 0.70 → 調高 0.05
if _confidence < _raised_threshold:
logger.warning(
"auto_execute_self_demoted",
incident_id=incident.incident_id,
confidence=_confidence,
raised_threshold=_raised_threshold,
reason="Phase 6 自我降級:近 7d SLO 違反,信心閾值提高",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["p6_self_demoted"] = True
token.proposal_data["p6_reason"] = (
f"Phase 6 自我降級:近 7d SLO 違反,"
f"信心 {_confidence:.2f} < {_raised_threshold},升為人工"
)
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
try:
from src.db.base import get_session_factory as _p6_sf3
async with _p6_sf3()() as _s3:
_s3.add(_GovernanceEvent(
event_type="self_demotion",
details={
"incident_id": incident.incident_id,
"confidence": _confidence,
"raised_threshold": _raised_threshold,
"viol_7d": _viol_7d,
"triggered_at": _now.isoformat(),
},
resolved=False,
))
await _s3.commit()
except Exception:
pass
return
# confidence >= raised_threshold → 允許繼續自動執行
except Exception as _p6_err:
logger.warning("p6_self_demotion_check_error", error=str(_p6_err))
# 保守P6 check 出錯 → 不阻擋(避免因 P6 bug 把所有修復都堵住)
# ADR-073 Phase 3-5: action | parse fix (2026-04-12 ogt)
# LLM 有時輸出 "kubectl rollout restart X | kubectl get pods -n Y"
# | 後面是查詢指令,取第一個才是真正的修復操作
if action and "|" in action:
action = action.split("|")[0].strip()
logger.debug("action_pipe_stripped", incident_id=incident.incident_id, action=action)
# NO_ACTION 規則(備份失敗/E2E smoke test 等)— kubectl_command 為空,不執行,直接返回
# 2026-04-11 Claude Sonnet 4.6: 防止空 action 或 NO_ACTION 字串進入自動執行流程
_suggested_action = token.proposal_data.get("suggested_action", "")
if not action or _suggested_action == "NO_ACTION":
logger.info(
"auto_execute_skipped_no_action",
incident_id=incident.incident_id,
suggested_action=_suggested_action,
reason="規則標記 NO_ACTION 或 kubectl_command 為空,不執行自動修復",
)
token.state = DecisionState.READY
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return
# 替換所有 placeholder — {target}/{namespace}/<deployment_name> 等
_target = incident.affected_services[0] if incident.affected_services else "unknown"
_ns = "awoooi-prod"
if incident.signals:
_ns = incident.signals[0].labels.get("namespace", "awoooi-prod")
import re as _re
# BUG-002 修復 2026-04-11: 主機層告警(HostHighCpuLoad 等)無 component/job/pod label
# → affected_services=[] → target="unknown" → safety guard 攔截
# 補救:用 K8s MCP 依 alertname/host label 動態查詢受影響 Pod
if _target == "unknown":
_target = await _resolve_target_from_k8s(incident, _ns) or "unknown"
action = action.replace("{target}", _target).replace("{namespace}", _ns)
# <xxx> 格式佔位符 → 用 target 替換
action = _re.sub(r"<deployment_name>", _target, action)
action = _re.sub(r"<[^>]+>", _target, action)
# GAP-A4 Phase 2 (2026-04-14 Claude Sonnet 4.6): LLM 路徑 target 救援
# 真兇LLM 直接產出 `kubectl scale deployment HostHighCpuLoad` (target=alertname)
# GAP-A4 Phase 1 只修了 rule_engine._extract_varsLLM 路徑沒檢查
# 結果12 次 auto_execute_blocked_unresolved_placeholder 直接攔下 → 飛輪 0
try:
from src.services.alert_rule_engine import (
_extract_vars as _rule_extract_vars,
_is_bad_target as _rule_is_bad_target,
)
_alertname_for_rescue = (
incident.signals[0].labels.get("alertname", "") if incident.signals else ""
)
# 從 action 提取 deployment 名稱kubectl scale/restart deployment XXX 或 deployment/XXX
_kubectl_target_match = _re.search(
r"deployment[/\s]+([\w.\-]+)", action
)
if _kubectl_target_match:
_llm_target = _kubectl_target_match.group(1)
if _rule_is_bad_target(_llm_target, _alertname_for_rescue):
# LLM 把垃圾當 deployment 名alertname/unknown/IP→ 重推
_alert_ctx = {
"labels": incident.signals[0].labels if incident.signals else {},
"target_resource": _target,
"namespace": _ns,
"alert_type": _alertname_for_rescue,
}
_good_target = _rule_extract_vars(_alert_ctx).get("target", "")
if _good_target and _good_target != "unknown" and not _rule_is_bad_target(_good_target, _alertname_for_rescue):
_old_action = action
action = action.replace(_llm_target, _good_target)
logger.info(
"auto_execute_target_rescued",
incident_id=incident.incident_id,
llm_target=_llm_target,
rescued_target=_good_target,
old_action=_old_action[:120],
new_action=action[:120],
reason="LLM 產出垃圾 target從 labels 重推 deployment 名",
)
else:
logger.warning(
"auto_execute_target_rescue_failed",
incident_id=incident.incident_id,
llm_target=_llm_target,
alertname=_alertname_for_rescue,
reason="labels 也找不到合法 deployment將進 safety guard 攔截 → 人工",
)
except Exception as _rescue_err:
logger.debug("target_rescue_skipped", error=str(_rescue_err))
# ADR-073 Phase 3-2: infrastructure 告警 (Docker/Host) → SSH MCP routing (2026-04-12 ogt)
# alert_category = "infrastructure" 表示 Docker 告警,非 kubectl action → SSH
# P1-1 fix 2026-04-12: 必須在 kubectl safety guard 之前 routing否則 docker 指令被 _action_safe=False 攔截
_alert_category = getattr(incident, "alert_category", None) or ""
if _alert_category == "infrastructure" and action and not action.startswith("kubectl"):
await self._ssh_execute(incident, token, action, _target)
return
# 2026-04-15 ogt: host_resource 告警HostHighCpuLoad 等)不是 K8s workload 問題
# 不得執行 kubectl 操作,改降級人工審核
# 根因:原本只擋了 infrastructure忘記 host_resource 也不走 K8s
if _alert_category == "host_resource" and action and action.startswith("kubectl"):
logger.warning(
"auto_execute_blocked_host_resource_no_k8s",
incident_id=incident.incident_id,
alert_category=_alert_category,
action=action[:80],
reason="host_resource 告警不應執行 K8s kubectl 操作,降級人工審核",
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
token.proposal_data["blocked_reason"] = "host_resource 告警禁止 K8s kubectl請人工排查主機"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# 安全守衛: 替換後仍含 "unknown" 或未替換的 <...>/{...} → 拒絕執行
# 另外:若 target 等於 alertname代表 LLM 把告警名稱填入 deployment_name也拒絕
_alertname = incident.signals[0].labels.get("alertname", "") if incident.signals else ""
_target_is_alertname = bool(_alertname and _target == _alertname)
# P1 fix 2026-04-11: kubectl action 危險字元白名單 — 防止 && || ; > | 注入
_action_safe = bool(_ALLOWED_KUBECTL_PATTERN.match(action.strip()))
if "unknown" in action or _re.search(r"[<{][^>}]+[>}]", action) or _target_is_alertname or not _action_safe:
logger.warning(
"auto_execute_blocked_unresolved_placeholder",
incident_id=incident.incident_id,
action=action,
target=_target,
reason="action 含未解析的 placeholder、unknown、target==alertname、或危險字元拒絕執行",
)
# Safety guard 攔截 → 降級為人工審核,而非「修復失敗」
# 2026-04-11 Claude Sonnet 4.6: 不發 ❌ 失敗訊息,改發人工審核卡片
# 讓統帥看到告警並決定如何處理,而不是重複收到「無法確認 deployment 名稱」
token.state = DecisionState.READY # 回到 READY讓人工審核
token.error = f"Auto-execute blocked (safety guard): {action[:80]}"
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True # 標記讓 classify_notification → TYPE-4
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return
# BUG-003 修復 2026-04-11: 加入 K8s deployment 存在性驗證,
# 避免 LLM 產生的無效 deployment name<placeholder>/alertname/unknown通過 safety guard
# 但仍對 K8s 發出錯誤指令
if _target and _target != "unknown":
_k8s_verified = await _verify_k8s_deployment_exists(_target, _ns, alertname=_alertname)
if not _k8s_verified:
logger.warning(
"auto_execute_blocked_deployment_not_found",
incident_id=incident.incident_id,
target=_target,
namespace=_ns,
reason="K8s 中找不到此 deployment/pod拒絕執行",
)
# K8s 找不到 target → 降級為人工審核,同 safety guard 策略
# 2026-04-11 Claude Sonnet 4.6: 不發 ❌ 失敗,改發 TYPE-4 人工審核卡片
token.state = DecisionState.READY
token.error = f"Auto-execute blocked: deployment '{_target}' not found in K8s"
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return
# Phase 5 ADR-086: Blast Radius 分級守衛AIOPS_P5_BLAST_RADIUS_CHECK 控制)
# 評估修復動作的爆炸半徑,決定是否可自動執行或需升級審核
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
try:
from src.core.feature_flags import aiops_flags as _p5_flags
if _p5_flags.AIOPS_P5_BLAST_RADIUS_CHECK:
from src.services.blast_radius_calculator import get_blast_radius_calculator
from src.services.declarative_remediation import get_declarative_remediation
_calc = get_blast_radius_calculator()
_blast = _calc.calculate(action, namespace=_ns, target=_target)
_spec = get_declarative_remediation().evaluate(
action=action, target=_target, namespace=_ns,
description=token.proposal_data.get("description", ""),
)
# 記錄分級結果到 proposal_data供學習 + 審計)
token.proposal_data["blast_radius_score"] = _blast.score
token.proposal_data["blast_radius_tier"] = _blast.tier
token.proposal_data["blast_radius_reason"] = _blast.reason
if _blast.tier == "blocked":
# HARD_RULES 永擋
logger.warning(
"auto_execute_blast_radius_hard_blocked",
incident_id=incident.incident_id,
action=action[:80],
reason=_blast.reason,
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
token.proposal_data["blocked_reason"] = f"HARD_RULES 永擋:{_blast.reason}"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
elif _blast.tier in ("human", "dual"):
# 中高衝擊 → 升級人工審核,不自動執行
logger.info(
"auto_execute_blast_radius_escalated",
incident_id=incident.incident_id,
tier=_blast.tier,
score=_blast.score,
action=action[:80],
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["requires_human_review"] = True
token.proposal_data["blast_radius_escalated"] = True
await self._save_token(token)
# dual tier → 非同步建立 GitOps Issue
if _blast.tier == "dual" and _p5_flags.AIOPS_P5_GITOPS_PR:
from src.services.gitops_pr_service import get_gitops_pr_service
_fire_and_forget(
get_gitops_pr_service().create_repair_issue(
spec=_spec,
incident_id=incident.incident_id,
diagnosis=token.proposal_data.get("debate_summary", ""),
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# tier == "auto" → 繼續自動執行流程
except Exception as _blast_err:
# Blast Radius 計算失敗 → 保守:視為 human tier升級人工審核
logger.warning(
"blast_radius_check_failed_conservative_escalate",
incident_id=incident.incident_id,
error=str(_blast_err),
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blast_radius_tier"] = "unknown_conservative"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# 2026-04-15 ogt: 同一 target 5 分鐘內最多執行 2 次,防止修復風暴
# 根因:多個 incident 共享同一 target 時,各自獨立自動執行 → 重複重啟
try:
from src.core.redis_client import get_redis as _get_redis_dm
_redis_dm = _get_redis_dm()
_dm_cooldown_key = f"awoooi:auto_execute_cooldown:{_ns}:{_target}"
_dm_exec_count = await _redis_dm.get(_dm_cooldown_key)
if _dm_exec_count and int(_dm_exec_count) >= 2:
logger.warning(
"auto_execute_cooldown_blocked",
incident_id=incident.incident_id,
target=_target,
namespace=_ns,
exec_count=int(_dm_exec_count),
reason="同一 target 5 分鐘內已自動執行 2 次,冷卻中",
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["cooldown_blocked"] = True
await self._save_token(token)
return
await _redis_dm.incr(_dm_cooldown_key)
await _redis_dm.expire(_dm_cooldown_key, 300) # 5 分鐘
except Exception as _cd_err:
logger.debug("auto_execute_cooldown_check_error", error=str(_cd_err))
try:
# 延遲導入避免循環依賴
from src.models.approval import ApprovalRequest, ApprovalStatus
from src.services.approval_execution import ApprovalExecutionService
# 建立虛擬 ApprovalRequest (auto_execute — 不需人工審核)
_risk = token.proposal_data.get("risk_level", "low")
# ADR-083 Phase 3: 傳遞 matched_playbook_id學習迴路 EWMA 才能觸發
# Playbook RAG 命中時 proposal_data["playbook_id"] 會有值(見 _try_playbook_match
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 matched_playbook_id 修復
_matched_playbook_id = token.proposal_data.get("playbook_id")
approval = ApprovalRequest(
incident_id=incident.incident_id,
action=action,
description=token.proposal_data.get("description", action[:100]),
requested_by="auto_approve",
required_signatures=0,
status=ApprovalStatus.APPROVED,
risk_level=_risk,
matched_playbook_id=_matched_playbook_id,
)
# ADR-071-I: 執行前抓 metrics_before 快照 (2026-04-11 Claude Sonnet 4.6)
_metrics_before = await _fetch_metrics_snapshot(incident)
if _metrics_before:
try:
from src.db.base import get_db_context
from src.db.models import Incident as IncidentORM
from sqlalchemy import update as _sa_update
async with get_db_context() as _db:
await _db.execute(
_sa_update(IncidentORM)
.where(IncidentORM.incident_id == incident.incident_id)
.values(metrics_before=_metrics_before)
)
await _db.commit()
except Exception as _mb_err:
logger.debug("metrics_before_save_failed",
incident_id=incident.incident_id, error=str(_mb_err))
# 執行
# 2026-04-17 ogt + Claude Sonnet 4.6 (Checkpoint-1 假成功修復):
# 舊 bug: execute_approved_action 返回 None → 此處永遠傳 success=True 給
# _push_auto_repair_result → Telegram 顯示 ✅ 自動修復完成,即使 K8s 拒絕了指令
# 修復: execute_approved_action 現在返回 bool正確透傳給通知函數
executor = ApprovalExecutionService()
_exec_success = await executor.execute_approved_action(approval)
# 更新狀態
token.state = DecisionState.COMPLETED
token.proposal_data["auto_executed"] = True
token.proposal_data["exec_success"] = _exec_success
await self._save_token(token)
logger.info(
"auto_execute_completed",
incident_id=incident.incident_id,
action=approval.action,
exec_success=_exec_success,
)
# 2026-04-09 Claude Sonnet 4.6: 執行完成 → 發 Telegram 結果通知(成功或失敗皆發)
_fire_and_forget(
_push_auto_repair_result(incident, action, success=_exec_success)
)
except Exception as e:
logger.error(
"auto_execute_failed",
incident_id=incident.incident_id,
error=str(e),
)
token.state = DecisionState.ERROR
token.error = f"Auto-execute failed: {e}"
await self._save_token(token)
# 2026-04-09 Claude Sonnet 4.6: 執行失敗 → 發 Telegram 失敗通知 + fallback 人工
_fire_and_forget(
_push_auto_repair_result(incident, action, success=False, error=str(e))
)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
async def _query_kb_context_inner(self, incident: Incident) -> str:
    """Run the KB semantic search for an incident and render the hits as text.

    This is the un-guarded query; ``_query_kb_context`` wraps it with a timeout.

    The search text is the first signal's ``alert_name`` (when there is a
    signal) followed by the affected service names, with empty parts dropped.

    Returns:
        A Markdown-style block listing up to three hits (each body truncated
        to 500 characters), or ``""`` when the search returns nothing.
    """
    terms = list(incident.affected_services)
    if incident.signals:
        # The alert name goes first in the search text.
        terms = [getattr(incident.signals[0], "alert_name", ""), *terms]
    search_text = " ".join(term for term in terms if term)

    hits = await self._knowledge_svc.semantic_search(search_text, limit=3, threshold=0.4)
    if not hits:
        return ""

    rendered = ["## Knowledge Base Related Entries (KB RAG)"]
    for entry, score in hits:
        rendered.append(
            f"\n### [{entry.entry_type}] {entry.title} (similarity={score:.2f})"
        )
        rendered.append(entry.content[:500])
        if len(entry.content) > 500:
            rendered.append("... (truncated)")

    logger.info(
        "kb_rag_context_injected",
        incident_id=incident.incident_id,
        kb_hits=len(hits),
    )
    return "\n".join(rendered)
async def _query_kb_context(self, incident: Incident) -> str:
    """
    KB Phase 2: semantic-search related KB entries and return them as an LLM
    context string.

    The inner query is given a hard 5 second timeout so a slow embedding
    backend cannot eat into the overall decision budget. Every failure mode
    (timeout, connection problem, anything unexpected) degrades to an empty
    string so the main analysis is never interrupted.

    Returns:
        The rendered KB context, or ``""`` on any failure.
    """
    try:
        return await asyncio.wait_for(
            self._query_kb_context_inner(incident),
            timeout=5.0,
        )
    except asyncio.TimeoutError:
        logger.warning("kb_rag_timeout", incident_id=incident.incident_id)
    except (ConnectionError, OSError) as conn_err:
        # Connectivity problems are an expected, degradable condition.
        logger.warning("kb_rag_connection_error", incident_id=incident.incident_id, error=str(conn_err))
    except Exception as unexpected:
        # Anything else is unexpected: log at error level so monitoring notices.
        logger.error("kb_rag_unexpected_error", incident_id=incident.incident_id, error=str(unexpected))
    return ""
async def _collect_mcp_context(self, incident: Incident) -> str:
    """
    ADR-070: query live environment state through MCP before the analysis, so
    the LLM decides on observed state rather than on alert labels alone.

    Routing (driven by the first signal's labels):
      - alertname starting with a host/Docker-style prefix and a resolvable
        host -> SSH MCP (container status, plus top processes for
        CpuLoad/Memory alerts)
      - alertname starting with Kube/K3s, or a ``pod`` label present
        -> K8s MCP (events for that pod)

    Every MCP call is bounded by a 5 second timeout. Timeouts and errors are
    logged and swallowed; a failure in the first SSH call also skips the second
    one because both share a single ``try``.

    Returns:
        Newline-joined context lines, or ``""`` when nothing was collected.
    """
    if not incident.signals:
        return ""

    labels = incident.signals[0].labels
    alertname = labels.get("alertname", "")
    # Take the part of "instance" before the first ":"; fall back to "host".
    host = labels.get("instance", "").split(":")[0] or labels.get("host", "")
    # The conditional is parenthesised so it only governs the last alternative.
    container = (
        labels.get("name")
        or labels.get("container")
        or (incident.affected_services[0] if incident.affected_services else "")
    )
    ns = labels.get("namespace", "awoooi-prod")
    ctx_parts: list[str] = []

    # Per-call ceiling so MCP can never block the decision path.
    _MCP_TIMEOUT = 5.0
    _HOST_ALERT_PREFIXES = ("Host", "Docker", "Sentry", "Harbor", "Ollama", "Backup")
    # NOTE(review): presumably mirrors the ssh_provider default host list - confirm.
    _KNOWN_HOSTS = ("192.168.0.188", "192.168.0.110", "192.168.0.120", "192.168.0.121")

    # --- Host / Docker alerts: diagnose over SSH MCP ---
    if alertname.startswith(_HOST_ALERT_PREFIXES) and host:
        try:
            ssh = self._ssh
            if ssh.enabled:
                if host not in _KNOWN_HOSTS:
                    # Unknown hosts are reported rather than skipped silently.
                    logger.warning("mcp_context_unknown_host", host=host, known=_KNOWN_HOSTS)
                else:
                    # Container status; skipped when the "container" is just the alert name.
                    if container and container != alertname:
                        status_result = await asyncio.wait_for(
                            ssh.execute(
                                tool_name="ssh_get_container_status",
                                parameters={"host": host, "container_name": container},
                            ),
                            timeout=_MCP_TIMEOUT,
                        )
                        if status_result.success:
                            ctx_parts.append(f"[SSH] 容器 {container} 狀態: {(status_result.output or '')[:300]}")
                    # Host resource usage, only for CPU / memory alerts.
                    if any(marker in alertname for marker in ("CpuLoad", "Memory")):
                        top_result = await asyncio.wait_for(
                            ssh.execute(
                                tool_name="ssh_get_top_processes",
                                parameters={"host": host, "top_n": 5},
                            ),
                            timeout=_MCP_TIMEOUT,
                        )
                        if top_result.success:
                            ctx_parts.append(f"[SSH] 主機 {host} Top processes: {(top_result.output or '')[:300]}")
        except asyncio.TimeoutError:
            logger.warning("mcp_context_ssh_timeout", alertname=alertname, host=host, timeout=_MCP_TIMEOUT)
        except Exception as ssh_err:
            logger.debug("mcp_context_ssh_failed", alertname=alertname, error=str(ssh_err))

    # --- K8s alerts: fetch pod events over K8s MCP ---
    if alertname.startswith(("Kube", "K3s")) or labels.get("pod"):
        try:
            k8s = self._k8s
            pod = labels.get("pod", "") if k8s.enabled else ""
            if pod:
                events_result = await asyncio.wait_for(
                    k8s.execute(
                        tool_name="k8s_get_events",
                        parameters={"namespace": ns, "field_selector": f"involvedObject.name={pod}"},
                    ),
                    timeout=_MCP_TIMEOUT,
                )
                if events_result.success:
                    ctx_parts.append(f"[K8s] Pod {pod} 事件: {(events_result.output or '')[:300]}")
        except asyncio.TimeoutError:
            logger.warning("mcp_context_k8s_timeout", alertname=alertname, timeout=_MCP_TIMEOUT)
        except Exception as k8s_err:
            logger.debug("mcp_context_k8s_failed", alertname=alertname, error=str(k8s_err))

    return "\n".join(ctx_parts)
async def _dual_engine_analyze(
    self,
    incident: Incident,
) -> dict[str, Any]:
    """
    Three-track decision analysis (Phase 7.5 + KB Phase 2 RAG + ADR-070 MCP
    pre-collection).

    Flow:
      1. Collect live environment context: the PreDecisionInvestigator when the
         ``AIOPS_P1_PRE_DECISION_INVESTIGATOR`` sub-flag is on, otherwise
         ``_collect_mcp_context``.
      2. When phase 2 is enabled and an evidence snapshot is available, run the
         agent debate and return its proposal directly (Playbook / LLM skipped).
      3. Otherwise try a Playbook match and return it on a hit.
      4. Otherwise run the Expert System, fetch KB RAG context and ask the LLM,
         injecting MCP + KB + expert context.
      5. If the LLM fails or times out, return the Expert System result.

    Priority: Playbook > LLM > Expert System.

    Args:
        incident: The incident to analyse.

    Returns:
        Proposal data. LLM results carry ``source="llm_<provider>"`` and may
        carry ``advisory_note`` when a second opinion was obtained.
    """
    # ADR-081 Phase 1: PreDecisionInvestigator behind a feature flag.
    # NOTE(review): a failure inside investigate() is not caught here and
    # propagates to the caller, unlike the phase-2 snapshot collection below.
    evidence_snapshot = None
    from src.core.feature_flags import aiops_flags
    if aiops_flags.is_sub_flag_enabled("AIOPS_P1_PRE_DECISION_INVESTIGATOR"):
        from src.services.pre_decision_investigator import get_pre_decision_investigator
        investigator = get_pre_decision_investigator()
        evidence_snapshot = await investigator.investigate(incident)
        mcp_context = evidence_snapshot.evidence_summary or ""
    else:
        # ADR-070: the original MCP collection path.
        mcp_context = await self._collect_mcp_context(incident)

    # ADR-082 Phase 2: agent debate behind a feature flag. It needs an
    # evidence snapshot, so one is collected here when phase 1 did not run.
    if aiops_flags.is_phase_enabled(2):
        p2_snapshot = evidence_snapshot
        if p2_snapshot is None:
            try:
                from src.services.pre_decision_investigator import get_pre_decision_investigator
                p2_snapshot = await get_pre_decision_investigator().investigate(incident)
            except Exception:
                logger.warning(
                    "p2_snapshot_collect_failed",
                    incident_id=incident.incident_id,
                )
        if p2_snapshot is not None:
            from src.services.agent_orchestrator import run_agent_debate
            package = await run_agent_debate(
                snapshot=p2_snapshot,
                incident_id=incident.incident_id,
            )
            return _package_to_proposal_data(package)
        # Still no snapshot: degrade to the legacy path instead of blocking.
        logger.warning("p2_no_snapshot_fallback", incident_id=incident.incident_id)

    # Phase 7.5: try a Playbook match first.
    playbook_result = await self._try_playbook_match(incident)
    if playbook_result:
        return playbook_result

    # MCP Phase 4c: no Playbook hit -> draft an AI Playbook in the background.
    _fire_and_forget(_generate_playbook_draft_if_new(incident))

    # The Expert System is synchronous and doubles as the fallback result.
    expert_result = expert_analyze(incident)

    # KB Phase 2: related knowledge entries (returns "" on any failure).
    kb_context = await self._query_kb_context(incident)

    try:
        signals_dict = [s.model_dump() for s in incident.signals]

        # Inject MCP live state + KB context into the expert context for the
        # LLM. MCP goes first so observed state is read before anything else.
        llm_expert_context: dict[str, Any] = {**expert_result} if expert_result else {}
        existing = str(llm_expert_context.get("diagnosis_context", ""))
        context_parts = []
        if mcp_context:
            context_parts.append(f"## 當前環境狀態 (MCP 實時查詢)\n{mcp_context}")
        if kb_context:
            context_parts.append(f"## 相關歷史知識\n{kb_context}")
        if existing:
            context_parts.append(existing)
        if context_parts:
            llm_expert_context["diagnosis_context"] = "\n\n".join(context_parts)

        # GAP-B4: 25s hard timeout for the LLM call; a timeout is logged
        # explicitly below and degrades to the Expert System result.
        llm_result, provider, success = await asyncio.wait_for(
            self._openclaw.generate_incident_proposal_with_tools(
                incident_id=incident.incident_id,
                severity=incident.severity.value,
                signals=signals_dict,
                affected_services=incident.affected_services,
                expert_context=llm_expert_context if llm_expert_context else None,
            ),
            timeout=25.0,
        )

        if success and llm_result:
            logger.info(
                "dual_engine_llm_win",
                incident_id=incident.incident_id,
                provider=provider,
                kb_rag=bool(kb_context),
            )
            result = {**llm_result, "source": f"llm_{provider}"}

            # ADR-073 Phase 3-6: a risk level from the YAML alert rules takes
            # precedence over the LLM's estimate.
            try:
                from src.services.alert_rule_engine import get_risk_for_alertname
                _alertname_for_risk = (
                    incident.signals[0].labels.get("alertname", "")
                    if incident.signals else ""
                )
                if _alertname_for_risk:
                    _yaml_risk = get_risk_for_alertname(_alertname_for_risk)
                    if _yaml_risk and _yaml_risk != result.get("risk_level"):
                        logger.info(
                            "risk_level_yaml_override",
                            incident_id=incident.incident_id,
                            llm_risk=result.get("risk_level"),
                            yaml_risk=_yaml_risk,
                        )
                        result["risk_level"] = _yaml_risk
            except Exception as _risk_err:
                logger.debug("risk_level_yaml_override_failed", error=str(_risk_err))

            # MCP Phase 4a: confidence < 0.7 -> ask NemoClaw for a second opinion.
            # The conversion is guarded: a null / non-numeric confidence used to
            # raise here, and the outer handler then threw away an otherwise
            # valid LLM proposal. An unparseable value is treated as low
            # confidence so the advisory is still consulted.
            _raw_conf = result.get("confidence", 1.0)
            try:
                _conf = float(_raw_conf)
            except (TypeError, ValueError):
                logger.warning(
                    "llm_confidence_unparseable",
                    incident_id=incident.incident_id,
                    confidence=repr(_raw_conf),
                )
                _conf = 0.0
            if _conf < 0.7:
                try:
                    # Advisory only: 3s timeout so it cannot stall the decision.
                    _advisory = await asyncio.wait_for(
                        _nemoclaw_second_opinion(incident, result),
                        timeout=3.0,
                    )
                    if _advisory:
                        result["advisory_note"] = _advisory
                        logger.info(
                            "nemoclaw_second_opinion_added",
                            incident_id=incident.incident_id,
                            confidence=_conf,
                        )
                except asyncio.TimeoutError:
                    logger.warning("nemoclaw_second_opinion_timeout",
                                   incident_id=incident.incident_id)
                except Exception as _soe:
                    logger.warning("nemoclaw_second_opinion_failed",
                                   incident_id=incident.incident_id, error=str(_soe))

            return result

    except asyncio.TimeoutError:
        # GAP-B4: LLM timed out -> flag it explicitly, then fall back.
        logger.warning(
            "llm_timeout_fallback",
            incident_id=incident.incident_id,
            timeout_sec=25.0,
            action="降級 Expert System",
        )
    except Exception as e:
        logger.warning(
            "dual_engine_llm_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )

    # LLM failed, timed out, or returned nothing usable: use the Expert System.
    logger.info(
        "dual_engine_expert_fallback",
        incident_id=incident.incident_id,
    )
    return expert_result
async def _try_playbook_match(
    self,
    incident: Incident,
) -> dict[str, Any] | None:
    """
    Phase 7.5: try to answer the incident from a stored Playbook.

    The top recommendation is accepted only when:
      - its similarity score is at least ``PLAYBOOK_SIMILARITY_THRESHOLD``, and
      - its success rate is at least 80% (checked only when the Playbook has
        execution history).

    NOTE(review): no Playbook status check is done here; presumably
    ``get_recommendations`` only returns approved Playbooks - confirm.

    Returns:
        Proposal data built from the Playbook, or ``None`` when nothing
        qualifies or any error occurs (errors are logged and swallowed).
    """
    try:
        playbook_service = get_playbook_service()

        # Describe the incident as a symptom pattern for the recommender.
        severity_range = [incident.severity.value] if incident.severity else ["P2"]
        symptoms = SymptomPattern(
            alert_names=[s.alert_name for s in incident.signals] if incident.signals else [],
            affected_services=incident.affected_services or [],
            severity_range=severity_range,
        )

        # Only the single best recommendation is considered.
        recommendations = await playbook_service.get_recommendations(
            symptoms=symptoms,
            top_k=1,
        )
        if not recommendations:
            logger.debug(
                "playbook_no_match",
                incident_id=incident.incident_id,
            )
            return None

        best_match = recommendations[0]
        playbook = best_match.playbook

        # Gate 1: similarity threshold.
        if best_match.similarity_score < PLAYBOOK_SIMILARITY_THRESHOLD:
            logger.debug(
                "playbook_similarity_below_threshold",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                similarity=best_match.similarity_score,
                threshold=PLAYBOOK_SIMILARITY_THRESHOLD,
            )
            return None

        # Gate 2: success rate, only meaningful once the Playbook has been run.
        has_history = playbook.total_executions > 0
        if has_history and playbook.success_rate < 0.8:
            logger.debug(
                "playbook_low_success_rate",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                success_rate=playbook.success_rate,
            )
            return None

        # Accepted: render the first repair step for the first affected service.
        steps = playbook.repair_steps
        kubectl_command = ""
        if steps:
            target = incident.affected_services[0] if incident.affected_services else "unknown"
            kubectl_command = steps[0].command.format(target=target)

        logger.info(
            "playbook_match_success",
            incident_id=incident.incident_id,
            playbook_id=playbook.playbook_id,
            playbook_name=playbook.name,
            similarity=best_match.similarity_score,
            success_rate=playbook.success_rate,
        )

        risk_level = steps[0].risk_level.value.lower() if steps else "medium"
        return {
            "source": "playbook",
            "playbook_id": playbook.playbook_id,
            "playbook_name": playbook.name,
            "action": kubectl_command,
            "kubectl_command": kubectl_command,
            "description": playbook.description,
            "risk_level": risk_level,
            "reasoning": f"Playbook 匹配 ({best_match.similarity_score:.0%} 相似度, {playbook.success_rate:.0%} 成功率): {best_match.reason}",
            # A Playbook match is retrieval, not AI analysis, so confidence is 0.
            "confidence": 0.0,
            "matched_symptoms": best_match.matched_symptoms,
            "from_cache": False,
        }
    except Exception as match_err:
        logger.warning(
            "playbook_match_error",
            incident_id=incident.incident_id,
            error=str(match_err),
        )
        return None
async def _find_existing_token(
    self,
    incident_id: str,
) -> DecisionToken | None:
    """Find the stored decision token that belongs to ``incident_id``.

    Walks every Redis key under ``DECISION_TOKEN_PREFIX`` with SCAN and returns
    the first token whose ``incident_id`` matches.

    Args:
        incident_id: Incident whose token is wanted.

    Returns:
        The first matching ``DecisionToken``, or ``None`` when no key matches.
        Entries that cannot be read or decoded are skipped (and logged at
        debug level) rather than aborting the scan.
    """
    import json

    redis_client = get_redis()
    cursor = 0
    while True:
        cursor, keys = await redis_client.scan(
            cursor=cursor,
            match=f"{DECISION_TOKEN_PREFIX}*",
            count=100,
        )
        for key in keys:
            try:
                data = await redis_client.get(key)
                if not data:
                    continue
                token_data = json.loads(data)
                if token_data.get("incident_id") == incident_id:
                    return DecisionToken.from_dict(token_data)
            except Exception as scan_err:
                # Best-effort scan: one corrupt entry must not hide the others,
                # but leave a trace because a skipped match leads the caller to
                # treat the incident as having no token.
                logger.debug(
                    "decision_token_scan_entry_skipped",
                    key=str(key),
                    error=str(scan_err),
                )
                continue
        # SCAN signals the end of the iteration with cursor 0.
        if cursor == 0:
            break
    return None
async def _persist_decision_to_db(
    self, incident_id: str, proposal_data: dict
) -> None:
    """
    Append a summary of one analysis result to ``incidents.decision_chain``.

    Decision tokens only live in Redis with a TTL, so this keeps a record of
    each analysis in the database. The column is handled as a JSON array; one
    entry is appended per call with: ts / confidence / risk_level / provider /
    source / diagnosis (first 200 chars) / playbook_id / playbook_name /
    alert_category.

    Nothing is written when the incident row does not exist. All errors are
    logged and swallowed.
    """
    try:
        from src.db.base import get_db_context
        from sqlalchemy import text as _sa_text
        import json as _json
        from src.utils.timezone import now_taipei

        source = proposal_data.get("source", "")
        diagnosis = proposal_data.get("diagnosis", proposal_data.get("description", ""))
        entry = {
            "ts": now_taipei().isoformat(),
            "confidence": proposal_data.get("confidence", 0.0),
            "risk_level": proposal_data.get("risk_level", "unknown"),
            "provider": proposal_data.get("provider", source),
            "source": source,
            "diagnosis": str(diagnosis)[:200],
            "playbook_id": proposal_data.get("playbook_id", ""),
            "playbook_name": proposal_data.get("playbook_name", ""),
            "alert_category": proposal_data.get("alert_category", ""),
        }

        async with get_db_context() as db:
            # Load the current chain; it may be NULL, a JSON string, or a list.
            lookup = await db.execute(
                _sa_text(
                    "SELECT decision_chain FROM incidents WHERE incident_id = :iid"
                ),
                {"iid": incident_id},
            )
            row = lookup.fetchone()
            if row is None:
                return

            chain = row[0] or []
            if isinstance(chain, str):
                try:
                    chain = _json.loads(chain)
                except Exception:
                    chain = []
            # Anything that is not a list is replaced by a fresh chain.
            history = chain if isinstance(chain, list) else []
            history.append(entry)

            # CAST(:param AS jsonb) is used instead of the :param::type form,
            # which the original author notes asyncpg rejects.
            await db.execute(
                _sa_text(
                    "UPDATE incidents SET decision_chain = CAST(:dc AS jsonb) WHERE incident_id = :iid"
                ),
                {"dc": _json.dumps(history), "iid": incident_id},
            )
            await db.commit()

        logger.debug(
            "decision_chain_persisted",
            incident_id=incident_id,
            confidence=entry["confidence"],
            provider=entry["provider"],
        )
    except Exception as persist_err:
        logger.warning("decision_chain_persist_failed", incident_id=incident_id, error=str(persist_err))
async def _save_token(self, token: DecisionToken, ttl: int = DECISION_TOKEN_TTL) -> None:
    """Serialize a decision token to JSON and store it in Redis.

    Args:
        token: Token to store; the key is ``DECISION_TOKEN_PREFIX`` + ``token.token``.
        ttl: Expiry in seconds, defaulting to ``DECISION_TOKEN_TTL``. Callers
            may pass a longer value (the original notes 24h for purely
            informational notifications, to suppress repeats).
    """
    import json

    store = get_redis()
    redis_key = f"{DECISION_TOKEN_PREFIX}{token.token}"
    payload = json.dumps(token.to_dict())
    await store.set(redis_key, payload, ex=ttl)
async def get_token(self, token_id: str) -> DecisionToken | None:
    """Load a decision token from Redis by its id.

    Returns:
        The decoded ``DecisionToken``, or ``None`` when the key is missing
        or holds an empty value.
    """
    import json

    store = get_redis()
    raw = await store.get(f"{DECISION_TOKEN_PREFIX}{token_id}")
    if not raw:
        return None
    return DecisionToken.from_dict(json.loads(raw))
async def update_token_state(
self,
token_id: str,
new_state: DecisionState,
proposal_id: str | None = None,
) -> DecisionToken | None:
"""更新決策狀態"""
token = await self.get_token(token_id)
if not token:
return None
token.state = new_state
token.updated_at = datetime.now(UTC)
if proposal_id:
token.proposal_id = proposal_id
await self._save_token(token)
return token
async def get_or_create_decision_with_consensus(
    self,
    incident: Incident,
    timeout_sec: float = 30.0,
    use_consensus: bool = True,
) -> DecisionToken:
    """
    Get or create a decision token, using Agent Teams consensus for severe
    incidents (Phase 9.4).

    The ConsensusEngine is used only when ``use_consensus`` is true AND the
    incident severity is P0 or P1; every other case is delegated to
    ``get_or_create_decision``.

    Args:
        incident: The incident to decide on.
        timeout_sec: Budget for the consensus run, in seconds.
        use_consensus: Set to False to force the regular dual-engine path.

    Returns:
        An existing token when one is already READY, EXECUTING or COMPLETED
        for this incident; otherwise a new token in READY state holding either
        the consensus proposal or, on timeout/error, the Expert System result.
    """
    should_use_consensus = use_consensus and incident.severity.value in ["P0", "P1"]
    if not should_use_consensus:
        # Regular dual-engine decision.
        return await self.get_or_create_decision(incident, timeout_sec)

    # Phase 9.4: ConsensusEngine path.
    from src.services.consensus_engine import get_consensus_engine
    consensus_engine = get_consensus_engine()

    # Reuse a token that is already usable. COMPLETED is included so a finished
    # decision is not re-created (which would re-send Telegram notifications).
    existing_token = await self._find_existing_token(incident.incident_id)
    if existing_token and existing_token.state in (
        DecisionState.READY,
        DecisionState.EXECUTING,
        DecisionState.COMPLETED,
    ):
        return existing_token

    token = DecisionToken(
        token=f"DEC-{uuid4().hex[:12].upper()}",
        incident_id=incident.incident_id,
        state=DecisionState.ANALYZING,
    )
    await self._save_token(token)
    logger.info(
        "decision_analyzing_with_consensus",
        token=token.token,
        incident_id=incident.incident_id,
    )

    def _fall_back_to_expert() -> None:
        # Shared degradation path: the Expert System result becomes the proposal.
        token.state = DecisionState.READY
        token.proposal_data = expert_analyze(incident)

    try:
        consensus_result = await asyncio.wait_for(
            consensus_engine.run_consensus(incident, timeout_sec),
            timeout=timeout_sec,
        )
        # Convert the consensus result into the proposal_data shape.
        proposal_data = {
            "source": "consensus_engine",
            "consensus_id": consensus_result.consensus_id,
            "consensus_score": consensus_result.consensus_score,
            "action": consensus_result.recommended_action,
            "description": consensus_result.final_reasoning,
            "risk_level": consensus_result.risk_level,
            "kubectl_command": consensus_result.recommended_kubectl,
            "reasoning": consensus_result.final_reasoning,
            # The consensus score is not an AI confidence, so confidence stays 0.
            "confidence": 0.0,
            "agent_count": len(consensus_result.opinions),
            "dissenting_opinions": consensus_result.dissenting_opinions,
            "from_cache": False,
        }
        token.state = DecisionState.READY
        token.proposal_data = proposal_data
        token.updated_at = datetime.now(UTC)
        logger.info(
            "decision_ready_with_consensus",
            token=token.token,
            consensus_id=consensus_result.consensus_id,
            consensus_score=consensus_result.consensus_score,
        )
    except asyncio.TimeoutError:
        # asyncio.TimeoutError matches the rest of this file; it is the builtin
        # TimeoutError on Python 3.11+ and a separate class before that.
        logger.warning(
            "consensus_timeout_using_expert",
            token=token.token,
            timeout_sec=timeout_sec,
        )
        _fall_back_to_expert()
        token.updated_at = datetime.now(UTC)
    except Exception as e:
        logger.exception(
            "consensus_error_using_expert",
            token=token.token,
            error=str(e),
        )
        _fall_back_to_expert()
        token.error = str(e)
        token.updated_at = datetime.now(UTC)

    await self._save_token(token)
    return token
async def resend_stale_ready_tokens(self) -> int:
    """
    BUG-005: re-push the Telegram review card for every READY decision token
    whose ``telegram_sent:<incident_id>`` dedup key no longer exists.

    Tokens are skipped when the incident is missing, already resolved/closed,
    older than 3 days, or has no proposal data.

    Pushes run as background tasks limited to 5 at a time by a semaphore, and
    each task pauses 200 ms after finishing so the embedding backend is not
    flooded. The tasks are NOT awaited here; the method returns as soon as the
    scan is done.

    Returns:
        Number of push tasks dispatched (not the number that succeeded).
    """
    import asyncio as _asyncio
    import json as _json
    from datetime import datetime as _dt, timedelta as _td, timezone as _tz

    from src.core.redis_client import get_redis
    from src.db.base import get_db_context
    from src.repositories.incident_repository import IncidentDBRepository

    redis = get_redis()
    resent = 0
    # Incidents older than this are skipped: re-processing them would only add
    # load and re-surface outdated cards.
    _STALE_DAYS = 3
    _sem = _asyncio.Semaphore(5)

    # Hold strong references to the background tasks until they finish, as the
    # asyncio documentation for create_task recommends.
    _inflight = getattr(self, "_stale_resend_tasks", None)
    if _inflight is None:
        _inflight = set()
        self._stale_resend_tasks = _inflight

    async def _bounded_push(incident_obj, proposal_data_obj, _id, _token):
        async with _sem:
            try:
                await _push_decision_to_telegram(incident_obj, proposal_data_obj)
                logger.info(
                    "stale_ready_token_resent",
                    incident_id=_id,
                    token=_token,
                )
            except Exception as _e:
                logger.warning(
                    "stale_ready_token_resend_failed",
                    incident_id=_id,
                    error=str(_e),
                )
            # Pause while still holding the slot so the backend can recover.
            await _asyncio.sleep(0.2)

    try:
        cursor = 0
        tasks: list[_asyncio.Task] = []
        while True:
            # Same key prefix the other token helpers in this class use.
            cursor, keys = await redis.scan(
                cursor, match=f"{DECISION_TOKEN_PREFIX}*", count=200
            )
            for key in keys:
                try:
                    raw = await redis.get(key)
                    if not raw:
                        continue
                    data = _json.loads(raw)
                    if data.get("state") != DecisionState.READY.value:
                        continue

                    incident_id = data.get("incident_id", "")
                    dedup_key = f"telegram_sent:{incident_id}"
                    if await redis.exists(dedup_key):
                        continue  # dedup key still present: skip

                    # Load the incident and confirm it is still open.
                    async with get_db_context() as _db:
                        incident = await IncidentDBRepository(_db).get_by_id(incident_id)
                    if not incident:
                        continue
                    if str(getattr(incident, "status", "")).lower() in ("resolved", "closed"):
                        continue

                    _created_at = getattr(incident, "created_at", None)
                    if _created_at:
                        _now = _dt.now(_tz.utc)
                        _cutoff = _now - _td(days=_STALE_DAYS)
                        # Naive timestamps are taken to be UTC.
                        if _created_at.tzinfo is None:
                            _created_at = _created_at.replace(tzinfo=_tz.utc)
                        if _created_at < _cutoff:
                            logger.debug(
                                "stale_ready_token_skipped_too_old",
                                incident_id=incident_id,
                                age_days=(_now - _created_at).days,
                                cutoff_days=_STALE_DAYS,
                            )
                            continue

                    proposal_data = data.get("proposal_data") or {}
                    if not proposal_data:
                        continue

                    _task = _asyncio.create_task(
                        _bounded_push(incident, proposal_data, incident_id, data.get("token", ""))
                    )
                    _inflight.add(_task)
                    _task.add_done_callback(_inflight.discard)
                    tasks.append(_task)
                    resent += 1
                except Exception as _te:
                    logger.debug("stale_ready_token_scan_error", error=str(_te))
            if cursor == 0:
                break

        # Fire-and-forget: the tasks keep running after this method returns.
        if tasks:
            logger.info("stale_ready_tokens_throttled_dispatch",
                        total=len(tasks), max_concurrent=5)
    except Exception as e:
        logger.warning("resend_stale_ready_tokens_failed", error=str(e))

    logger.info("stale_ready_tokens_scan_done", resent=resent)
    return resent
async def _ssh_execute(
    self,
    incident: "Incident",
    token: "DecisionToken",
    action: str,
    target: str,
) -> None:
    """
    ADR-073 Phase 3-2: run an infrastructure repair through the SSH MCP
    provider (Docker/host alerts do not go through the K8s executor).

    Supported actions (matched case-insensitively by prefix):
      - ``docker restart ...``    -> tool ``docker_restart`` on container ``target``
      - ``systemctl restart ...`` -> tool ``service_restart`` on service ``target``

    Any other action, a missing host, or an execution error downgrades the
    token to READY and pushes a manual-review card instead of executing.

    Args:
        incident: Incident being repaired; its first signal's ``instance``
            label selects the host.
        token: Decision token to update and save.
        action: Proposed command text.
        target: Container or service name to restart.
    """
    import os as _os

    # Host selection: the instance label (port stripped) when it is in
    # SSH_MCP_ALLOWED_HOSTS, otherwise the first allowed host.
    # NOTE(review): when the label names a host outside the allow-list the
    # command runs on the FIRST allowed host, not the alerting one - confirm
    # this redirect is intended.
    _instance = incident.signals[0].labels.get("instance", "") if incident.signals else ""
    _host = _instance.split(":")[0]
    _allowed = [h.strip() for h in _os.environ.get("SSH_MCP_ALLOWED_HOSTS", "").split(",") if h.strip()]
    if not _host or _host not in _allowed:
        _host = _allowed[0] if _allowed else ""

    if not _host:
        logger.warning(
            "ssh_execute_no_host",
            incident_id=incident.incident_id,
            reason="SSH_MCP_ALLOWED_HOSTS 未設定或 instance label 不在白名單",
        )
        token.state = DecisionState.READY
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        token.proposal_data["mcp_all_failed"] = True
        await self._save_token(token)
        _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
        return

    # Map the action text to an SSH tool and its parameters.
    _action_lower = action.lower().strip()
    params: dict = {"host": _host}
    if _action_lower.startswith("docker restart"):
        _tool = "docker_restart"
        params["container"] = target
    elif _action_lower.startswith("systemctl restart"):
        _tool = "service_restart"
        params["service"] = target
    else:
        logger.info(
            "ssh_execute_unknown_action",
            incident_id=incident.incident_id,
            action=action,
            reason="不支援的 SSH action 格式,降級為人工審核",
        )
        token.state = DecisionState.READY
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        await self._save_token(token)
        _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
        return

    try:
        result = await self._ssh.execute(tool_name=_tool, parameters=params)
        success = result.success
        logger.info(
            "ssh_execute_result",
            incident_id=incident.incident_id,
            tool=_tool,
            host=_host,
            success=success,
            output=result.output[:200] if result.output else "",
        )
        token.state = DecisionState.COMPLETED
        token.proposal_data["auto_executed"] = True
        # Record the real outcome, as the K8s auto-execute path does, so a
        # rejected command is distinguishable from a successful repair.
        token.proposal_data["exec_success"] = success
        await self._save_token(token)
        _fire_and_forget(
            _push_auto_repair_result(incident, action, success=success)
        )
    except Exception as e:
        logger.error(
            "ssh_execute_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
        # Execution raised: hand the decision back for manual review.
        token.state = DecisionState.READY
        token.error = str(e)
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        await self._save_token(token)
        _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
# =============================================================================
# Singleton
# =============================================================================
# Process-wide DecisionManager instance, created on first use.
_decision_manager: DecisionManager | None = None


def get_decision_manager() -> DecisionManager:
    """Return the shared DecisionManager, constructing it on the first call."""
    global _decision_manager
    manager = _decision_manager
    if manager is None:
        manager = DecisionManager()
        _decision_manager = manager
    return manager