"""
Decision Manager - Phase 6.5 非同步決策狀態機
# 2026-04-10 Claude Sonnet 4.6: auto_execute ApprovalRequest 修復 + placeholder 替換
=============================================
實作「雙軌決策」(Dual-Engine Decision):
1. OpenClaw LLM (主要) - 智能提案
2. Expert System (備援) - 規則引擎
狀態機:
- INIT: 事件剛建立
- ANALYZING: 正在分析中 (LLM + Expert 並行)
- READY: 決策就緒,等待統帥親核
- EXECUTING: 已授權,正在執行
- COMPLETED: 執行完成
統帥鐵律:
- 永遠不能讓 UI 鎖死
- 30 秒內必須有 decision_token
- LLM 失敗時 Expert System 保底
"""
import asyncio
import json
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Protocol, runtime_checkable
from uuid import uuid4
import structlog
from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.action_parser import parse_kubectl_action
from src.services.auto_approve import get_auto_approve_policy
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
from src.services.telegram_gateway import SILENCE_KEY_PREFIX # P1-24: 統一常數,禁止重複定義
logger = structlog.get_logger(__name__)
# Phase 7.5: Playbook 優先閾值
PLAYBOOK_SIMILARITY_THRESHOLD = 0.85 # 相似度 >= 85% 直接使用 Playbook
# P1 fix 2026-04-11: background task GC guard — keep strong refs until done
_background_tasks: set[asyncio.Task] = set()
def _fire_and_forget(coro) -> asyncio.Task:
"""Create a background task with GC protection via _background_tasks."""
task = asyncio.create_task(coro)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
return task
def _incident_alertname_for_dedup(incident: Incident) -> str:
"""Return a stable alert name for Telegram fingerprint dedup."""
if incident.signals:
signal = incident.signals[0]
return (
signal.labels.get("alertname")
or signal.alert_name
or signal.annotations.get("summary")
or signal.annotations.get("description")
or incident.incident_id
)
return incident.incident_id
def _phase2_fallback_reason(package: Any) -> str | None:
"""Return why a Phase 2 package should continue to Playbook/LLM fallback.
Phase 2 is a deliberation layer, not the final brake. If the debate times
out or produces no actionable recommendation, the flywheel must still try
deterministic Playbook/LLM/expert paths before opening a manual gate.
"""
status = getattr(package, "session_status", None)
status_value = status.value if status and hasattr(status, "value") else str(status or "")
if status_value in {"timeout", "failed"}:
return f"session_{status_value}"
if getattr(package, "all_agents_degraded", False):
return "all_agents_degraded"
action = str(getattr(package, "recommended_action", "") or "").strip()
if not action and getattr(package, "requires_human_approval", False):
return "empty_action_requires_human"
return None
def _incident_llm_timeout_seconds() -> float:
"""Return the outer timeout for incident LLM proposals.
The provider layer already has per-provider timeouts. This outer guard must
not be shorter than the GCP Ollama lane, or alert diagnosis will be cut off
before the free/local-first route can answer.
"""
configured = getattr(settings, "INCIDENT_LLM_TIMEOUT_SECONDS", None)
try:
timeout = float(configured)
except (TypeError, ValueError):
timeout = 240.0
return max(timeout, float(getattr(settings, "OPENCLAW_TIMEOUT", 30)))
def _should_escalate_auto_approve_rejection(reason: Any) -> bool:
"""Return True for manual gates that mean the automation path went blind."""
value = getattr(reason, "value", str(reason or "")).lower()
return value in {
"no_playbook",
"no_executable_action",
"low_trust",
}
_HOST_LAYER_SSH_CATEGORIES = {"infrastructure", "host_resource", "backup_failure"}
_NON_K8S_HOST_CATEGORIES = {"host_resource", "backup_failure"}
def _is_host_layer_ssh_category(category: str | None) -> bool:
"""Return True when DecisionManager must route non-kubectl actions to SSH."""
return (category or "") in _HOST_LAYER_SSH_CATEGORIES
def _is_non_k8s_host_category(category: str | None) -> bool:
"""Return True for host/backup alerts that must not auto-run kubectl."""
return (category or "") in _NON_K8S_HOST_CATEGORIES
async def _escalate_decision_auto_repair_unavailable(
*,
incident: Incident,
token: Any,
failure_reason: str,
attempted_actions: str,
) -> None:
"""Send the same emergency channel used by auto-repair when decision gates block."""
try:
from src.services.emergency_escalation_service import (
escalate_auto_repair_unavailable,
)
labels = incident.signals[0].labels if incident.signals else {}
alertname = labels.get("alertname") or getattr(incident, "alertname", "") or "DecisionAutoRepairBlocked"
target = (
(incident.affected_services or [None])[0]
or labels.get("deployment")
or labels.get("pod")
or labels.get("service")
or labels.get("component")
or labels.get("instance")
or "unknown"
)
namespace = labels.get("namespace") or "awoooi-prod"
await escalate_auto_repair_unavailable(
incident_id=incident.incident_id,
approval_id=getattr(token, "proposal_id", None) or getattr(token, "token", ""),
alert_type=str(alertname),
target_resource=str(target),
namespace=str(namespace),
failure_reason=failure_reason,
attempted_actions=attempted_actions,
)
except Exception as exc:
logger.warning(
"decision_auto_repair_escalation_failed",
incident_id=getattr(incident, "incident_id", ""),
error=str(exc),
)
# =============================================================================
# Phase 31 (ADR-067 2026-04-10): Log 異常摘要 — NemoTron deepseek-r1:14b
# =============================================================================
async def _send_log_summary(incident: "Incident") -> None:
"""
非同步取得 Pod log 異常摘要,用 NemoTron bot 發到 SRE 群組
觸發點:_push_decision_to_telegram 發完審批卡後
"""
try:
target = incident.affected_services[0] if incident.affected_services else None
if not target:
return
namespace = "awoooi-prod"
if incident.signals:
namespace = incident.signals[0].labels.get("namespace", "awoooi-prod")
from src.services.log_summary_service import get_log_summary_service
svc = get_log_summary_service()
summary = await svc.summarize_with_soft_timeout(pod_name=target, namespace=namespace)
if not summary:
return
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
await tg.send_as_nemotron(
f"📋 Log 異常摘要 — {target}\n{summary}"
)
import structlog as _sl
_sl.get_logger(__name__).info("log_summary_sent", target=target, incident_id=incident.incident_id)
except Exception as e:
import structlog as _sl
_sl.get_logger(__name__).warning("log_summary_failed", error=str(e))
# =============================================================================
# Telegram 推送 (Phase 6.5: 決策就緒通知)
# =============================================================================
async def _push_decision_to_telegram(
incident: Incident,
proposal_data: dict[str, Any],
) -> None:
"""
決策就緒時推送到 Telegram
Phase 6.5: 整合 Signal Worker 流程與 Telegram 通知
2026-03-27 ogt: 加入 Redis 去重機制 (10 分鐘 TTL)
"""
try:
# 延遲導入避免循環依賴
from src.core.redis_client import get_redis
from src.services.telegram_gateway import (
classify_notification,
get_telegram_gateway,
NotificationType,
_smart_truncate as _smt,
)
# 🔴 去重檢查:同一 fingerprint (alertname+target) 24h 內只發一次 decision card
# 2026-05-02 Claude Opus 4.7 + 統帥 ogt:原本 dedup_key 用 incident.incident_id,
# 但 incident_id = uuid4()[:6] 隨機,且 incident_analysis_sweeper 每 1h 重觸決策、
# alertmanager 預設 repeat_interval=4h,導致同症狀 24h 內推 15 次(INC-6FE3BD 鐵證)。
# 改成 alertname+target 構造的 fingerprint key + TTL 86400s,同症狀共用 dedup。
# Incident 真正 RESOLVED/CLOSED 時走 line 220-226 的 status check 提早 return,不影響復發偵測。
redis = get_redis()
_alertname_fp = _incident_alertname_for_dedup(incident).strip().lower().replace(" ", "_")[:60]
_target_fp = (
incident.affected_services[0] if incident.affected_services else "unknown"
).lower()[:40]
dedup_key = f"telegram_sent:fp:{_alertname_fp}:{_target_fp}"
if await redis.exists(dedup_key):
logger.debug(
"telegram_push_skipped",
reason="Already sent within 24h (fingerprint dedup)",
incident_id=incident.incident_id,
fingerprint=f"{_alertname_fp}:{_target_fp}",
)
return
# 2026-04-09 Claude Code: resolved Incident 不重送 Telegram
# 場景: dedup TTL 過期後,已 resolve 的 Incident 仍被重新推送
if incident.status and str(incident.status).lower() in ("resolved", "closed"):
logger.info(
"telegram_push_skipped",
reason="Incident already resolved",
incident_id=incident.incident_id,
)
return
# 🔴 靜默檢查:此資源是否被靜默 (2026-03-27 P1 優化)
target = incident.affected_services[0] if incident.affected_services else "unknown"
silence_key = f"{SILENCE_KEY_PREFIX}{target}"
if await redis.exists(silence_key):
logger.info(
"telegram_push_silenced",
reason="Resource is silenced",
incident_id=incident.incident_id,
resource=target,
)
return
# 檢查是否有設定 Bot Token
if not settings.OPENCLAW_TG_BOT_TOKEN:
logger.debug(
"telegram_push_skipped",
reason="Bot token not configured",
incident_id=incident.incident_id,
)
return
gateway = get_telegram_gateway()
# 從 proposal_data 提取資料
import re as _re
def _strip_placeholders(s: str) -> str:
"""移除 佔位符,避免 Telegram HTML parse 錯誤"""
return _re.sub(r'<[^>]+>', '', s).strip()
def _parse_debate_summary(reasoning: str) -> dict[str, str]:
"""
解析 coordinator debate_summary → {diagnosis, plan, review, critic}
格式:「診斷:{...};方案:{...};安全審查:{...};質疑:{...}」
2026-04-17 ogt + Claude Sonnet 4.6: 修復 TYPE-8M 三欄重複渲染
根因:diagnosis/system_impact/probable_cause 全用 reasoning[:100] → 同一段字
"""
result: dict[str, str] = {"diagnosis": "", "plan": "", "review": "", "critic": ""}
for part in reasoning.split(";"):
part = part.strip()
if part.startswith("診斷:"):
result["diagnosis"] = part[3:]
elif part.startswith("方案:"):
result["plan"] = part[3:]
elif part.startswith("安全審查:"):
result["review"] = part[5:]
elif part.startswith("質疑:"):
result["critic"] = part[3:]
return result
target = incident.affected_services[0] if incident.affected_services else "unknown"
risk_level = proposal_data.get("risk_level", "medium")
# 2026-04-09 Claude Code: action 不用 _strip_placeholders,避免截掉 deployment name
# 應在 nemotron 補正後已填入真實值
action = proposal_data.get("action", proposal_data.get("kubectl_command", ""))
# 2026-04-09 Claude Code: 修復舊 Incident proposal_data 存 enum string 導致建議空白
# 舊 code 存 action="RESTART_DEPLOYMENT" 而非 kubectl command
# 偵測:無 kubectl/ssh/docker 關鍵字 → 用規則引擎重新查
_KUBECTL_MARKERS = ("kubectl", "ssh", "docker", "systemctl", "/")
if action and not any(m in action for m in _KUBECTL_MARKERS):
# action 是 enum string,嘗試用規則引擎補出 kubectl command
try:
from src.services.alert_rule_engine import match_rule as _match_rule
_labels = incident.signals[0].labels if incident.signals else {}
# 2026-04-12 ogt: Incident 沒有 title 欄位,用 signal annotation summary
_sig_msg = (
incident.signals[0].annotations.get("summary", "") or
incident.signals[0].annotations.get("description", "") or
incident.signals[0].alert_name
) if incident.signals else ""
_rule_resp = _match_rule({
"labels": _labels,
"alert_type": _labels.get("alertname", target),
"message": _sig_msg,
"target_resource": target,
"namespace": incident.signals[0].labels.get("namespace", "awoooi-prod") if incident.signals else "awoooi-prod",
"severity": risk_level,
})
if _rule_resp and _rule_resp.get("kubectl_command", "").strip():
action = _rule_resp["kubectl_command"]
except Exception:
pass # 規則引擎失敗不影響通知,保留原 action
description = proposal_data.get("description", "")
reasoning = _strip_placeholders(proposal_data.get("reasoning", ""))
confidence = proposal_data.get("confidence", 0.0) # 🔴 預設 0.0 表示未經 AI 分析
source = proposal_data.get("source", "unknown")
ai_provider = proposal_data.get("provider", "") # 2026-03-29 ogt: AI 模型來源
ai_model = proposal_data.get("model", "") # 2026-04-04 ogt: 底層模型名稱
# 2026-04-02 ogt: Phase 22 Nemotron 協作資料
nemotron_enabled = proposal_data.get("nemotron_enabled", False)
nemotron_tools = proposal_data.get("nemotron_tools")
nemotron_validation = proposal_data.get("nemotron_validation", "")
nemotron_latency_ms = proposal_data.get("nemotron_latency_ms", 0.0)
# 2026-04-09 Claude Sonnet 4.6: Tool Calling 模型/後端
nemotron_tool_model = proposal_data.get("nemotron_tool_model", "")
nemotron_tool_backend = proposal_data.get("nemotron_tool_backend", "")
# 2026-04-16 ogt + Claude Sonnet 4.6: Playbook 匹配名稱穿透到 TG 卡片 (ADR-076)
_playbook_name = proposal_data.get("playbook_name", "")
# 2026-04-16 ogt + Claude Sonnet 4.6: 覆寫 Webhook 的垃圾 action
# 問題:Webhook inline LLM 建立 ApprovalRecord 時寫入通用 action(如 kubectl rollout restart)
# Agent 分析正確但只發新 Telegram 卡,未覆寫 ApprovalRecord.action
# 用戶批准 Agent 卡 → 系統查 incident_id → 執行舊 webhook 垃圾 action
# 修復:Agent 確認 action 後立即覆寫 ApprovalRecord(只要有值,空字串不覆寫)
_agent_action = action # action 已由 _package_to_proposal_data + rule engine 確定
if not _agent_action and description:
# NO_ACTION:用描述摘要讓用戶知道 Agent 的建議是「觀察/調查」
_agent_action = f"NO_ACTION - {description[:120]}"
if _agent_action:
try:
from src.services.approval_db import get_approval_service as _get_approval_svc
await _get_approval_svc().update_action_by_incident_id(
incident_id=incident.incident_id,
new_action=_agent_action,
)
except Exception as _update_err:
logger.warning(
"approval_action_update_by_agent_failed",
incident_id=incident.incident_id,
error=str(_update_err),
)
# 🚨 ADR-091 鐵律 (2026-04-17 ogt + Claude Sonnet 4.6): 禁止分析失敗廣播
# 問題: GET /incidents 觸發 Agent Debate → LLM 全部失敗 → description="待分析" + action=""
# → 系統每隔幾分鐘廣播「待分析」喪屍卡片 → 告警疲勞(SRE 最致命的殺手)
# 規則: description="待分析" + action 屬於失敗狀態集合 = Phase 2 所有 Agent 無有效輸出 → 靜默退出
# 已完成: DB 狀態已在上方更新 (update_action_by_incident_id),不需要 Telegram 廣播
# 禁止改動: 此閘門保護全域告警信噪比,任何「加例外」需首席架構師書面授權
# 2026-04-17 ogt + Claude Sonnet 4.6 (hotfix): 修復 d5dbfc9 邏輯漏洞
# 舊 bug: `not action.strip()` 在 action="待分析" 時為 False → 閘門失效 → 喪屍卡片突圍
# 原因: c759b4e P1 修復讓 suggested_action fallback 為 "待分析"(非空字串)而非 ""
# 修復: action 必須不在失敗狀態集合才允許廣播
_action_text = action.strip()
_FAILED_ACTION_TOKENS = {"", "待分析", "NO_ACTION", "待分析 - 系統自動保護"}
if description.strip() == "待分析" and _action_text in _FAILED_ACTION_TOKENS:
logger.info(
"telegram_push_suppressed_no_analysis",
incident_id=incident.incident_id,
action_text=_action_text,
reason="Agent Debate 無有效輸出 (description=待分析, action 屬於失敗狀態集合)",
)
return
# 建立 approval_id (使用 incident_id 作為追蹤)
# 2026-03-27 ogt: 修復 INC-INC-INC- 重複前綴 bug
approval_id = incident.incident_id # 已經是 INC-xxx 格式
# ADR-071: 通知分類器 — 依告警類型/狀態決定卡片種類
# 2026-04-11 Claude Sonnet 4.6: 接通 classify_notification(),原本死程式碼
_auto_executed = proposal_data.get("auto_executed", False)
_decision_state = proposal_data.get("decision_state", "")
_mcp_all_failed = proposal_data.get("mcp_all_failed", False)
_notif_type = classify_notification(
incident=incident,
confidence=confidence,
auto_executed=_auto_executed,
mcp_all_failed=_mcp_all_failed,
decision_state=_decision_state,
)
# 2026-04-12 ogt: ADR-075 — 從 Incident 提取 alert_category/notification_type(各分支共用)
_alert_category = getattr(incident, "alert_category", "") or ""
_notification_type = getattr(incident, "notification_type", "") or (_notif_type.value if _notif_type else "")
_alertname = incident.signals[0].labels.get("alertname", "MetaSystemAlert") if incident.signals else "MetaSystemAlert"
# blocked_reason 由上游「決策路由閘門」統一設定(decide() 第 4c 步)
# 此處只做狀態轉譯,通知層禁止查詢業務邏輯(架構師鐵律 2026-04-17)
# ① INVALID_TARGET → TYPE-4:目標無法解析,SRE 需人工調查
# ② NO_ACTION + critical → TYPE-4:critical 事件不可靜默
# ③ NO_ACTION + non-critical → TYPE-1:純資訊卡
_blocked_reason = proposal_data.get("blocked_reason", "")
if "INVALID_TARGET" in _blocked_reason:
_notif_type = NotificationType.TYPE_4
elif "NO_ACTION" in _blocked_reason:
if risk_level == "critical":
_notif_type = NotificationType.TYPE_4
else:
_notif_type = NotificationType.TYPE_1
# 2026-04-12 ogt: classify_alert_early() 設的 notification_type 優先於 classify_notification()
# 場景:backup/info 告警被 classify_notification() 誤判為 TYPE-3(confidence=0, 無 auto_executed)
# 規則:incident.notification_type 明確為 TYPE-1 → 強制走 info 路徑
if _notification_type == "TYPE-1":
_notif_type = NotificationType.TYPE_1
elif _notification_type == "TYPE-4D":
_notif_type = NotificationType.TYPE_4_DRIFT
elif _notification_type == "TYPE-8M":
_notif_type = NotificationType.TYPE_8M
if _notif_type == NotificationType.TYPE_1:
# 純資訊通知 — 無按鈕
# 2026-04-12 ogt: Incident 沒有 title 欄位,用 alertname
# 2026-04-17 ogt + Claude Sonnet 4.6: BUG-A 修復 — 只取 diagnosis 欄位
# 舊:message=reasoning[:200] → 整條 debate_summary 生文字傾倒
# 新:解析 debate_summary,只取「診斷:...」部分給 SRE 看
_info_title = (
incident.signals[0].labels.get("alertname", "") or
incident.signals[0].alert_name
) if incident.signals else "告警通知"
_parsed_info = _parse_debate_summary(reasoning)
_info_msg = _smt(_parsed_info.get("diagnosis") or description, 200)
tg_result = await gateway.send_info_notification(
incident_id=incident.incident_id,
title=_info_title or "告警通知",
message=_info_msg,
alertname=incident.signals[0].labels.get("alertname", "") if incident.signals else "",
severity="info",
)
elif _notif_type == NotificationType.TYPE_4_DRIFT:
# Config Drift 專屬卡片
# 2026-04-17 ogt + Claude Sonnet 4.6: BUG-B JSON Catcher
# LLM 可能輸出 JSON 結構({"action_title":"...","description":"...","rollback":"..."})
# 解析成功 → 格式化可讀文字;失敗 → 平滑降級為原始截斷字串(不可拋出 Exception)
try:
_drift_data = json.loads(description)
_parts: list[str] = []
if _drift_data.get("action_title"):
_parts.append(f"📝 建議操作:{_drift_data['action_title']}")
if _drift_data.get("description"):
_parts.append(f"📖 說明:{_smt(_drift_data['description'], 200)}")
if _drift_data.get("rollback"):
_parts.append(f"⏪ 回滾方案:{_smt(_drift_data['rollback'], 100)}")
_diff_text = "\n".join(_parts) if _parts else description[:500]
except (json.JSONDecodeError, TypeError, AttributeError):
_diff_text = description[:500]
tg_result = await gateway.send_drift_card(
incident_id=incident.incident_id,
approval_id=approval_id,
resource_name=target[:50],
diff_summary=_diff_text,
)
elif _notif_type == NotificationType.TYPE_8M or _alert_category in ("alertchain_health", "flywheel_health"):
# TYPE-8M:飛輪/告警鏈路健康異常,統一發到 SRE 戰情室群組。
# 2026-04-17 ogt + Claude Sonnet 4.6: 解析 debate_summary,各欄位用不同組件
# 根因:diagnosis/system_impact/probable_cause 全取 reasoning[:100] → 三欄重複同一段字
_parsed = _parse_debate_summary(reasoning)
_diag = _smt(_parsed.get("diagnosis") or description, 120) if (_parsed.get("diagnosis") or description) else "(無診斷)"
_impact = _smt(_parsed.get("plan") or "", 150)
_cause = _smt(_parsed.get("critic") or _parsed.get("review") or "", 100)
tg_result = await gateway.send_meta_alert(
incident_id=incident.incident_id,
approval_id=approval_id,
alertname=_alertname,
alert_category=_alert_category,
diagnosis=_diag,
severity_level=risk_level,
system_impact=_impact,
probable_cause=_cause,
)
elif _alert_category == "secops":
# TYPE-5S:資安事件 — 隔離/封鎖審核卡,統一發到 SRE 戰情室群組。
# 2026-04-12 ogt (ADR-075 Step-5)
_labels = incident.signals[0].labels if incident.signals else {}
_threat_level = _labels.get("threat_level", risk_level)
tg_result = await gateway.send_secops_card(
incident_id=incident.incident_id,
approval_id=approval_id,
alertname=_alertname,
threat_level=_threat_level,
resource=target[:60],
threat_behavior=_smt(_parse_debate_summary(reasoning).get("diagnosis") or reasoning, 150) if reasoning else description[:150],
)
elif _alert_category == "business":
# TYPE-6B:業務/FinOps 資訊告警 — 發到 SRE 群組(無審核按鈕)(ADR-075 Step-5)
# 2026-04-12 ogt (ADR-075 Step-5)
_labels = incident.signals[0].labels if incident.signals else {}
_business_domain = _labels.get("business_domain", "finops")
tg_result = await gateway.send_business_alert(
incident_id=incident.incident_id,
alertname=_alertname,
business_domain=_business_domain,
metric_name=_labels.get("metric_name", _alertname),
current_value=_labels.get("value", "--"),
threshold=_labels.get("threshold", "--"),
)
else:
# TYPE-2 / TYPE-3 / TYPE-4 都走 send_approval_card(按鈕組合由 alert_category 決定)
# 2026-04-17 ogt + Claude Sonnet 4.6 (BUG-C): TYPE-3 root_cause 清洗
# 舊:root_cause=_smt(reasoning, 500) → debate_summary 全文傾倒到 AI 診斷欄位
# 新:_parse_debate_summary 只取 diagnosis;方案/審查/質疑不在此卡顯示
_parsed_card = _parse_debate_summary(reasoning)
_card_root_cause = _smt(_parsed_card.get("diagnosis") or description, 300)
tg_result = await gateway.send_approval_card(
approval_id=approval_id,
risk_level=risk_level,
resource_name=target[:50],
root_cause=_card_root_cause,
# 2026-04-17 ogt + Claude Sonnet 4.6(亞太): 修復超時降級髒資料
# 舊:action="" 時 fallback 到 description,而 description 可能是「待分析」或診斷摘要
# 這導致 description 中的診斷文字(如「根因:...」)出現在「建議修復動作」欄位
# 新:action="" 時固定顯示「待分析」,禁止 description 流進 suggested_action
suggested_action=action[:120] if action else "待分析",
estimated_downtime="5-15 min",
primary_responsibility="INFRA",
confidence=confidence,
namespace=incident.signals[0].labels.get("namespace", "default") if incident.signals else "default",
ai_provider=ai_provider,
ai_model=ai_model,
nemotron_enabled=nemotron_enabled,
nemotron_tools=nemotron_tools,
nemotron_validation=nemotron_validation,
nemotron_latency_ms=nemotron_latency_ms,
nemotron_tool_model=nemotron_tool_model,
nemotron_tool_backend=nemotron_tool_backend,
incident_id=incident.incident_id,
alert_category=_alert_category,
notification_type=_notification_type,
playbook_name=_playbook_name,
)
# 2026-04-09 Claude Sonnet 4.6: 存 message_id → 後續狀態更新在原訊息延續
# 同時寫 Redis (快速查詢) 和 DB (持久化,不受 TTL 限制)
tg_message_id = tg_result.get("result", {}).get("message_id") if isinstance(tg_result, dict) else None
tg_chat_id = tg_result.get("result", {}).get("chat", {}).get("id") if isinstance(tg_result, dict) else None
if tg_message_id:
await redis.setex(f"tg_msg:{incident.incident_id}", 86400, str(tg_message_id))
# 持久化到 DB
try:
from src.services.approval_db import get_approval_service as _get_approval_svc
_approval_svc = _get_approval_svc()
await _approval_svc.update_telegram_message(
incident_id=incident.incident_id,
telegram_message_id=tg_message_id,
telegram_chat_id=tg_chat_id,
)
except Exception as _e:
logger.warning("telegram_message_id_db_save_failed", incident_id=incident.incident_id, error=str(_e))
# Phase 31 (ADR-067 2026-04-10): Log 異常摘要 — NemoTron deepseek-r1:14b
# 非同步執行,不阻塞主流程
_fire_and_forget(_send_log_summary(incident))
# MCP Phase 4a: NemoClaw second opinion (2026-04-11 Claude Sonnet 4.6)
# 若 proposal_data 有 advisory_note,用 NemoClaw bot 身分追加一條訊息
_advisory_note = proposal_data.get("advisory_note", "")
if _advisory_note:
_fire_and_forget(
gateway.send_as_nemotron(
f"🤔 NemoClaw 第二意見 (信心={confidence:.2f})\n"
f"{_advisory_note}"
)
)
# 🔴 發送成功後設置去重 key (TTL 24 小時)
# 2026-05-02 Claude Opus 4.7 + 統帥 ogt:TTL 從 600s 拉到 86400s。
# 原因:incident_analysis_sweeper 每 1h 重觸 decision、alertmanager 預設 4h repeat,
# 600s TTL 早就過期 → 重發;24h TTL 蓋住兩個週期,徹底治住「同症狀 24h 推 15 次」。
await redis.setex(dedup_key, 86400, "1")
logger.info(
"telegram_decision_pushed",
incident_id=incident.incident_id,
source=source,
risk_level=risk_level,
)
except Exception as e:
# Telegram 失敗不影響主流程
logger.warning(
"telegram_decision_push_failed",
incident_id=incident.incident_id,
error=str(e),
)
async def _nemoclaw_second_opinion(incident: "Incident", primary_result: dict) -> str | None:
"""
MCP Phase 4a: NemoClaw second opinion — 信心 < 0.7 時觸發
============================================================
用 deepseek-r1:14b (設定的 Ollama primary) 對同一份資料做獨立推理,
輸出純文字 advisory_note,不執行任何操作。
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
"""
try:
from src.core.config import settings
import httpx as _httpx
from src.services.model_registry import get_model as _get_model
ollama_url = getattr(settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.nemoclaw 讀取
model = _get_model("ollama", "nemoclaw")
signals_summary = ""
if incident.signals:
lbl = incident.signals[0].labels
signals_summary = (
f"alertname={lbl.get('alertname','?')} "
f"severity={lbl.get('severity','?')} "
f"instance={lbl.get('instance','?')}"
)
prompt = (
f"你是資深 SRE,請對以下告警做獨立分析並提出建議。\n"
f"告警摘要: {signals_summary}\n"
f"受影響服務: {', '.join(incident.affected_services or [])}\n"
f"主 AI 已提出: {primary_result.get('action','?')} "
f"(信心={primary_result.get('confidence',0):.2f})\n"
f"主 AI 根因: {primary_result.get('reasoning','')[:200]}\n\n"
f"請用繁體中文,100 字以內,給出你的獨立判斷與建議(若同意主 AI 則說明原因)。"
)
async with _httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{ollama_url}/api/generate",
json={"model": model, "prompt": prompt, "stream": False},
)
resp.raise_for_status()
data = resp.json()
advisory = data.get("response", "").strip()
# 截取 ... 後的正文(deepseek-r1 CoT 格式)
if "" in advisory:
advisory = advisory.split("", 1)[-1].strip()
return advisory[:300] if advisory else None
except Exception as e:
import structlog as _sl
_sl.get_logger(__name__).debug("nemoclaw_second_opinion_error", error=str(e))
return None
async def _generate_playbook_draft_if_new(incident: "Incident") -> None:
"""
MCP Phase 4c: Playbook 無命中時,自動生成 AI 草稿 Playbook 寫入 KM
=====================================================================
- 僅在 KM 中不存在同 alertname 的 Playbook 時觸發(避免重複)
- 用 qwen2.5:7b-instruct (設定的 Ollama primary) 生成結構化 Playbook 草稿
- 寫入 KnowledgeEntry,status=DRAFT,需人工審核後升為 APPROVED
- 寫入 AlertOperationLog PLAYBOOK_DRAFT_CREATED 事件
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
"""
try:
import httpx as _httpx
from src.core.config import settings
from src.models.knowledge import (
EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate,
)
from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository
from src.services.knowledge_service import get_knowledge_service
alertname = ""
if incident.signals:
alertname = incident.signals[0].labels.get("alertname", "")
if not alertname:
return
# 已存在同 alertname 的 KM 條目則跳過
knowledge_svc = get_knowledge_service()
existing = await knowledge_svc.semantic_search(alertname, limit=1, threshold=0.92)
if existing:
return
# 用 qwen2.5:7b-instruct 生成 Playbook 草稿
severity = incident.signals[0].labels.get("severity", "warning") if incident.signals else "warning"
services = ", ".join(incident.affected_services or ["unknown"])
prompt = (
f"你是資深 SRE,請為以下告警生成一份結構化 Playbook 草稿(繁體中文)。\n"
f"告警名稱: {alertname}\n"
f"嚴重度: {severity}\n"
f"受影響服務: {services}\n\n"
f"請按以下格式輸出(不超過 300 字):\n"
f"## 症狀\n(描述此告警代表什麼)\n"
f"## 根因假設\n(最常見的 2-3 個原因)\n"
f"## 診斷步驟\n(kubectl 或 shell 指令)\n"
f"## 修復動作\n(具體修復指令,含 kubectl rollout restart 等)\n"
f"## 驗收條件\n(如何確認修復成功)"
)
from src.services.model_registry import get_model as _get_model
ollama_url = getattr(settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.playbook_draft 讀取
_pb_model = _get_model("ollama", "playbook_draft")
async with _httpx.AsyncClient(timeout=45.0) as client:
resp = await client.post(
f"{ollama_url}/api/generate",
json={"model": _pb_model, "prompt": prompt, "stream": False},
)
resp.raise_for_status()
content = resp.json().get("response", "").strip()
if not content or len(content) < 50:
return
# 寫入 KM,status=DRAFT
entry = await knowledge_svc.create_entry(
KnowledgeEntryCreate(
title=f"[AI草稿] {alertname} Playbook",
content=content,
entry_type=EntryType.AUTO_RUNBOOK,
category="ai_system",
tags=[alertname, severity, "ai_draft", "mcp_phase4c"],
source=EntrySource.AI_EXTRACTED,
status=EntryStatus.DRAFT,
related_incident_id=incident.incident_id,
)
)
# 寫入操作日誌
op_repo = get_alert_operation_log_repository()
await op_repo.append(
event_type="PLAYBOOK_DRAFT_CREATED",
incident_id=incident.incident_id,
actor="mcp_phase4c",
action_detail=f"AI 草稿 Playbook: {entry.entry_id}",
success=True,
context={"alertname": alertname, "km_entry_id": entry.entry_id},
)
import structlog as _sl
_sl.get_logger(__name__).info(
"playbook_draft_created",
incident_id=incident.incident_id,
alertname=alertname,
entry_id=entry.entry_id,
)
# 2026-04-16 ogt + Claude Sonnet 4.6: KM 建立後推 Telegram 通知 (ADR-076)
# best-effort — 不阻塞主流程
try:
from src.services.telegram_gateway import get_telegram_gateway as _get_tg
_tg = _get_tg()
await _tg.send_as_nemotron(
f"📚 KM 新條目 — {entry.entry_id}\n"
f"🏷️ 分類:auto_generated 狀態:DRAFT\n"
f"📋 告警:{alertname} 事件:{incident.incident_id}\n"
f"標題:{str(getattr(entry, 'title', alertname))[:60]}"
)
except Exception as _tg_err:
_sl.get_logger(__name__).debug("km_tg_notify_failed", error=str(_tg_err))
except Exception as e:
import structlog as _sl
_sl.get_logger(__name__).debug("playbook_draft_failed", error=str(e))
async def _resolve_target_from_k8s(incident: "Incident", namespace: str) -> str | None:
"""
BUG-002 補救:主機層告警無 component/job/pod label 時,
用 K8s MCP kubectl get pods 依 alertname/host label 動態查詢受影響 Pod name,
回傳 deployment name(去掉 hash suffix)或 None。
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
"""
try:
from src.plugins.mcp.providers.k8s_provider import K8sProvider
k8s = K8sProvider()
if not k8s.enabled:
return None
alertname = ""
if incident.signals:
labels = incident.signals[0].labels
alertname = labels.get("alertname", "")
# 用 kubectl get pods 列出所有 pods,再根據 alertname 推測受影響的 deployment
result = await k8s.execute(
tool_name="kubectl_get",
params={"resource": "pods", "namespace": namespace, "output": "name"},
)
if not result.get("success"):
return None
pod_lines: list[str] = (result.get("output", "") or "").splitlines()
if not pod_lines:
return None
# alertname → 關鍵字映射(主機層告警常見類型)
# I2 修復 2026-04-11: HostHighDiskUsage → HostOutOfDiskSpace(與 alerts-unified.yml 一致)
# DockerContainerUnhealthy/HostOutOfDiskSpace keywords=[] 走 fallback(找第一個非 infra pod)
# 並加 log 便於追蹤 fallback 路徑
_ALERTNAME_KEYWORDS: dict[str, list[str]] = {
"HostHighCpuLoad": ["api", "web"],
"HostOutOfMemory": ["api", "web"],
"DockerContainerUnhealthy": [],
"DockerContainerExited": [],
"HostOutOfDiskSpace": [],
}
keywords = _ALERTNAME_KEYWORDS.get(alertname, [])
if not keywords and alertname in _ALERTNAME_KEYWORDS:
logger.debug(
"resolve_target_k8s_fallback_to_first_pod",
alertname=alertname,
reason="alertname 有對應但 keywords=[],走 fallback 取第一個非 infra pod",
)
for line in pod_lines:
pod = line.removeprefix("pod/").strip()
if not pod:
continue
# 優先找關鍵字命中的 pod
if keywords and not any(kw in pod for kw in keywords):
continue
# 去掉 hash suffix → deployment name
parts = pod.rsplit("-", 2)
if len(parts) >= 3 and len(parts[-1]) == 5 and len(parts[-2]) in (9, 10):
return parts[0]
if len(parts) >= 2:
return "-".join(parts[:-1])
return pod
# 無關鍵字命中時,回傳第一個 non-infra pod
for line in pod_lines:
pod = line.removeprefix("pod/").strip()
if pod and not any(inf in pod for inf in ("prometheus", "alertmanager", "grafana")):
parts = pod.rsplit("-", 2)
if len(parts) >= 3:
return parts[0]
return pod
except Exception as e:
logger.debug("resolve_target_from_k8s_failed", error=str(e))
return None
async def _verify_k8s_deployment_exists(target: str, namespace: str, alertname: str = "") -> bool:
"""
BUG-003 補救:呼叫 K8s MCP 確認 deployment/pod 是否真實存在。
K8s MCP 不可用時:
- 主機層告警 (Host*/Docker*) → 返回 False(阻止 K8s 操作)
- K8s 層告警 → 返回 True(保守放行,讓 kubectl 自行報錯)
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
"""
# 主機層告警前綴 — 這類告警的修復目標是 Docker container/主機,不是 K8s deployment
_HOST_ALERTNAME_PREFIXES = ("Host", "Docker", "Backup", "Velero", "SSH")
_is_host_alert = alertname.startswith(_HOST_ALERTNAME_PREFIXES) if alertname else False
try:
from src.plugins.mcp.providers.k8s_provider import K8sProvider
k8s = K8sProvider()
if not k8s.enabled:
# 主機層告警:K8s MCP 不可用 → 拒絕(不應對主機層問題執行 K8s 操作)
# K8s 層告警:保守放行,讓 kubectl 自行報錯
return not _is_host_alert
result = await k8s.execute(
tool_name="kubectl_get",
params={"resource": "deployment", "name": target, "namespace": namespace},
)
if result.get("success"):
return True
# 嘗試 pod(有些告警對應的是 pod 而非 deployment)
result_pod = await k8s.execute(
tool_name="kubectl_get",
params={"resource": "pod", "namespace": namespace, "selector": f"app={target}"},
)
return bool(result_pod.get("success") and result_pod.get("output", "").strip())
except Exception as e:
logger.debug("verify_k8s_deployment_exists_failed", target=target, error=str(e))
# 例外時:主機層告警拒絕,其他保守放行
return not _is_host_alert
async def _fetch_metrics_snapshot(incident: Incident) -> dict:
"""
ADR-071-I: 從 Prometheus 抓取與此 incident 相關的指標快照
失敗時靜默返回空 dict,不阻塞主流程
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
"""
try:
from src.plugins.mcp.providers.prometheus_provider import PrometheusProvider
prom = PrometheusProvider()
if not prom.enabled:
return {}
labels = incident.signals[0].labels if incident.signals else {}
alertname = labels.get("alertname", "")
instance = labels.get("instance", "")
snapshots: dict = {}
# 根據 alertname 選擇最相關的指標
if alertname in ("HostHighCpuLoad", "HostOutOfMemory"):
if instance:
host = instance.split(":")[0]
# P0 fix 2026-04-11: _instant_query 要求 dict,回傳 {"result": [...]}
r = await prom._instant_query({"query": f'100 - (avg by(instance) (irate(node_cpu_seconds_total{{mode="idle",instance=~"{host}.*"}}[5m])) * 100)'})
for item in r.get("result", []):
snapshots["cpu_pct"] = round(float(item["value"][1]), 1)
cpu_query = (
f'(1 - (node_memory_MemAvailable_bytes{{instance=~"{instance}"}} / node_memory_MemTotal_bytes{{instance=~"{instance}"}})) * 100'
if instance else "100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)"
)
r2 = await prom._instant_query({"query": cpu_query})
for item in r2.get("result", []):
snapshots["mem_pct"] = round(float(item["value"][1]), 1)
elif alertname == "HostOutOfDiskSpace":
r = await prom._instant_query({"query": 'max(100 - ((node_filesystem_avail_bytes{fstype!="tmpfs"} / node_filesystem_size_bytes{fstype!="tmpfs"}) * 100))'})
for item in r.get("result", []):
snapshots["disk_pct"] = round(float(item["value"][1]), 1)
elif alertname in ("PodRestartingTooMuch", "PodCrashLoopBackOff"):
pod = labels.get("pod", labels.get("component", ""))
if pod:
r = await prom._instant_query({"query": f'sum(kube_pod_container_status_restarts_total{{namespace="awoooi-prod",pod=~"{pod}.*"}})'})
for item in r.get("result", []):
snapshots["restart_count"] = int(float(item["value"][1]))
return snapshots
except Exception as _e:
logger.debug("metrics_snapshot_failed", incident_id=incident.incident_id, error=str(_e))
return {}
def _format_metrics_delta(before: dict, after: dict) -> str:
"""
ADR-071-I: 格式化指標前後對比文字
例:CPU 92%→23% | Mem 78%→45%
"""
if not before and not after:
return ""
parts = []
for key, label in [("cpu_pct", "CPU"), ("mem_pct", "Mem"), ("disk_pct", "Disk")]:
b = before.get(key)
a = after.get(key)
if b is not None and a is not None:
parts.append(f"{label} {b}%→{a}%")
elif b is not None:
parts.append(f"{label} {b}% (before)")
elif a is not None:
parts.append(f"{label} {a}% (after)")
for key, label in [("restart_count", "Restarts")]:
b = before.get(key)
a = after.get(key)
if b is not None and a is not None:
parts.append(f"{label} {b}→{a}")
return " | ".join(parts)
async def _push_auto_repair_result(
incident: Incident,
action: str,
success: bool,
error: str = "",
) -> None:
"""
自動修復執行後,在原始告警訊息追加狀態行。
統帥要求: 所有狀態變更必須在原告警訊息延續,不發新訊息。
- append_incident_update() 取 Redis tg_msg:{id} → reply 原訊息 + 換按鈕
- 找不到 message_id 時 fallback 到 send_notification(降級)
ADR-071-I: 成功時抓 metrics_after 快照,寫入 incidents 表,並在通知中顯示前後對比
2026-04-09 Claude Sonnet 4.6 Asia/Taipei
2026-04-11 Claude Sonnet 4.6: +ADR-071-I 指標快照
"""
try:
from src.services.telegram_gateway import get_telegram_gateway
gateway = get_telegram_gateway()
target = incident.affected_services[0] if incident.affected_services else "unknown"
inc_id = incident.incident_id
# ADR-071-I: 抓 metrics_after(成功時)
metrics_delta_text = ""
if success:
metrics_after = await _fetch_metrics_snapshot(incident)
metrics_before = getattr(incident, "metrics_before", None) or {}
# 寫入 DB(不阻塞主流程)
if metrics_after:
try:
from src.db.base import get_db_context
from src.db.models import Incident as IncidentORM
from sqlalchemy import update as _update
async with get_db_context() as db:
await db.execute(
_update(IncidentORM)
.where(IncidentORM.incident_id == inc_id)
.values(metrics_after=metrics_after)
)
await db.commit()
logger.info("metrics_after_saved", incident_id=inc_id, metrics=metrics_after)
except Exception as _e:
logger.warning("metrics_after_save_failed", incident_id=inc_id, error=str(_e))
metrics_delta_text = _format_metrics_delta(metrics_before, metrics_after)
# MCP Phase 4b: 抓 K8s Pod 狀態寫入 k8s_state_after (2026-04-11 Claude Sonnet 4.6)
try:
from src.plugins.mcp.providers.k8s_provider import K8sProvider
_k8s = K8sProvider()
if _k8s.enabled:
_service = (incident.affected_services or [""])[0]
if _service:
_k8s_result = await _k8s.execute(
"kubectl_get",
{"resource_type": "pods", "namespace": "awoooi-prod",
"label_selector": f"app={_service}"},
)
if _k8s_result.success and _k8s_result.data:
_k8s_state = str(_k8s_result.data)[:500]
from src.db.base import get_db_context
from src.db.models import Incident as IncidentORM
from sqlalchemy import update as _upd2
async with get_db_context() as _db2:
await _db2.execute(
_upd2(IncidentORM)
.where(IncidentORM.incident_id == inc_id)
.values(k8s_state_after=_k8s_state)
)
await _db2.commit()
except Exception as _k8s_err:
logger.debug("k8s_state_after_failed", incident_id=inc_id, error=str(_k8s_err))
# 2026-05-02 ogt + Claude Sonnet 4.6: 強制標記 [AUTO],避免事後抵賴
# 統帥要求「就算是自動化處理,也要發告警訊息出來」—— 所有自治動作必須留痕,
# 且 Telegram 上能明顯與人工點擊區隔。
if success:
delta_line = f"\n├ 指標: {metrics_delta_text}" if metrics_delta_text else ""
status_line = (
f"🤖 [AUTO] AI 自動修復完成\n"
f"├ 動作: {action[:100] if action else '已執行'}\n"
f"├ Actor: leWOOOgo (autonomous)"
f"{delta_line}"
)
else:
status_line = (
f"🤖❌ [AUTO] AI 自動修復失敗,已升級人工介入\n"
f"├ 動作: {action[:80] if action else '未知'}\n"
f"├ Actor: leWOOOgo (autonomous)\n"
f"└ 錯誤: {error[:100] if error else '未知錯誤'}"
)
# BUG-006 修復 2026-04-11: outcome + verification_result 全為 null
# 原因:_push_auto_repair_result 只送 Telegram,沒寫 DB
# 修復:寫入 incidents 表 outcome/verification_result 欄位
try:
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from sqlalchemy import update as _upd_outcome
_outcome = "auto_repaired" if success else "auto_repair_failed"
async with get_db_context() as _odb:
await _odb.execute(
_upd_outcome(IncidentRecord)
.where(IncidentRecord.incident_id == inc_id)
.values(outcome=_outcome)
)
await _odb.commit()
logger.info("outcome_written", incident_id=inc_id, outcome=_outcome)
except Exception as _oe:
logger.warning("outcome_write_failed", incident_id=inc_id, error=str(_oe))
# 優先: reply 原告警訊息並換掉按鈕
appended = await gateway.append_incident_update(
incident_id=inc_id,
status_line=status_line,
keep_info_buttons=True, # 保留詳情/重診/歷史,移除批准/拒絕
)
# Fallback: 找不到原訊息 ID(舊告警或 Redis 過期)→ 發新訊息
# 2026-04-15 ogt: 改為 ADR-075 TYPE-2 格式(禁止 raw text)
if not appended:
result_icon = "✅" if success else "❌"
result_label = "完成" if success else "失敗"
fallback_text = (
f"{result_icon} TYPE-2 | 自動修復{result_label}\n"
"──────────────────────\n"
f"├─ 事件: {inc_id}\n"
f"├─ 對象: {target[:50]}\n"
f"└─ {status_line}"
)
await gateway.send_alert_notification(fallback_text)
logger.info("auto_repair_result_sent", incident_id=inc_id, success=success, appended=appended)
except Exception as e:
logger.warning("auto_repair_result_push_failed", incident_id=incident.incident_id, error=str(e))
# =============================================================================
# Decision States
# =============================================================================
class DecisionState(str, Enum):
"""決策狀態機"""
INIT = "init" # 事件剛建立
ANALYZING = "analyzing" # 正在分析
READY = "ready" # 決策就緒
EXECUTING = "executing" # 正在執行
COMPLETED = "completed" # 已完成
ERROR = "error" # 錯誤
# =============================================================================
# Expert System - 規則引擎 (Local Fallback)
# =============================================================================
EXPERT_RULES: dict[str, dict[str, Any]] = {
# Pod 崩潰 → 重啟
"pod_crash": {
"patterns": ["crash", "restart", "oom", "killed", "failed"],
"action": "kubectl rollout restart deployment/{target}",
"description": "Expert System: 偵測到 Pod 異常,建議重啟部署",
"risk_level": "medium",
"reasoning": "根據歷史數據,重啟可解決 85% 的 Pod 崩潰問題",
},
# 高延遲 → 擴容
"high_latency": {
"patterns": ["latency", "slow", "timeout", "p99"],
"action": "kubectl scale deployment/{target} --replicas=3",
"description": "Expert System: 偵測到高延遲,建議擴容至 3 副本",
"risk_level": "low",
"reasoning": "擴容可分散負載,降低單一 Pod 壓力",
},
# 高錯誤率 → 回滾
"high_error_rate": {
"patterns": ["error", "5xx", "fail", "exception"],
"action": "kubectl rollout undo deployment/{target}",
"description": "Expert System: 偵測到高錯誤率,建議回滾至上一版",
"risk_level": "critical",
"reasoning": "錯誤率突增通常源自最近部署,回滾是最快修復方式",
},
# 資源耗盡 → 擴容
"resource_exhaustion": {
"patterns": ["cpu", "memory", "resource", "quota"],
"action": "kubectl scale deployment/{target} --replicas=2",
"description": "Expert System: 偵測到資源耗盡,建議擴容",
"risk_level": "medium",
"reasoning": "增加副本可分散資源壓力",
},
# 預設 → 重啟 (最保守)
"default": {
"patterns": [],
"action": "kubectl rollout restart deployment/{target}",
"description": "Expert System: 無法確定具體問題,建議安全重啟",
"risk_level": "medium",
"reasoning": "重啟是最安全的通用修復動作",
},
}
def _package_to_proposal_data(package: Any) -> dict[str, Any]:
"""
將 Phase 2 DecisionPackage 轉換為 proposal_data dict。
proposal_data 格式是既有 decision_manager 決策流程的共同語言,
所有下游(auto_approve / Telegram / approval_record)都依此格式讀取。
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太)
"""
action = package.recommended_action or ""
confidence = package.confidence
# requires_human_approval → risk_level 映射
# Phase 2 Reviewer + Critic 已做安全審查,不需要 human = low/medium
if package.requires_human_approval:
risk_level = "high" if confidence > 0 else "critical"
elif confidence >= 0.8:
risk_level = "low"
elif confidence >= 0.6:
risk_level = "medium"
else:
risk_level = "high"
# 組出人類可讀的 description(Telegram 卡片顯示用)
# 2026-04-16 ogt + Claude Sonnet 4.6: 修復 description=debate_summary garbage
diag = getattr(package, "diagnosis", None)
plan = getattr(package, "action_plan", None)
desc_parts = []
if diag and getattr(diag, "top_hypothesis", None):
h = diag.top_hypothesis
desc_parts.append(f"根因:{h.description[:150]}(信心 {h.confidence:.0%})")
if plan and getattr(plan, "top_candidate", None):
c = plan.top_candidate
desc_parts.append(f"方案:{c.action[:100]}")
# blocked_reason 是系統內部診斷,不能放進 description(Telegram 卡片顯示用)
# 2026-04-17 ogt + Claude Sonnet 4.6(亞太): 修復超時髒資料污染卡片
# 舊:blocked_reason → desc_parts → description → suggested_action 欄位顯示「備注:全局超時 > 90.0s」
# 新:blocked_reason 只寫入 proposal_data["blocked_reason"],供下游閘門邏輯用,禁止進卡片顯示
# 2026-04-24 ogt + Claude Sonnet 4.6: ADR-091 鐵閘 bypass 修復
# 問題:Agent Debate 超時 (TIMEOUT/FAILED) → desc_parts 空 + action 空 → description="待分析"
# → ADR-091 鐵閘攔截(description=待分析 AND action in FAILED_TOKENS)→ PENDING 積壓
# 修法:超時/全降級時使用明確降級文字,讓鐵閘放行 → Telegram 推送降級通知 → 人工可見
if desc_parts:
description = ";".join(desc_parts)
elif action:
description = action[:200]
else:
_status = getattr(package, "session_status", None)
_status_val = _status.value if _status and hasattr(_status, "value") else ""
if _status_val == "timeout":
description = "AI 分析超時(90s),降級至人工審核"
elif _status_val == "failed" or getattr(package, "all_agents_degraded", False):
description = "AI 分析降級,所有 Agent 無有效輸出,建議人工確認"
else:
description = "待分析"
return {
"action": action,
"kubectl_command": action if action.startswith("kubectl") else "",
"description": description,
"reasoning": package.debate_summary[:1000] if package.debate_summary else "",
"confidence": confidence,
"risk_level": risk_level,
"source": "phase2_agent_debate",
"provider": "phase2_agents", # 2026-04-24 ogt: 讓 Telegram 顯示 "Phase2 Agents" 而非通用 "AI 仲裁"
"requires_human_review": package.requires_human_approval,
# Phase 2 診斷摘要(供 Audit Trail / 學習閉環,不直接顯示給用戶)
"debate_summary": package.debate_summary or "",
"all_agents_degraded": package.all_agents_degraded,
"blocked_reason": package.blocked_reason or "",
"session_status": package.session_status.value if package.session_status else "",
# MINOR-2: 補齊 Expert System / LLM 路徑存在的 keys,防止下游 .get() 靜默空缺
"is_rule_based": False,
"matched_rule": "",
"rule_id": "",
"from_cache": False,
}
def expert_analyze(incident: Incident) -> dict[str, Any]:
"""
Expert System 規則引擎分析
這是 100% 本地執行,永不失敗的保底方案
"""
target = incident.affected_services[0] if incident.affected_services else "unknown-service"
alert_names = " ".join([s.alert_name.lower() for s in incident.signals])
# 匹配規則
matched_rule = "default"
for rule_name, rule in EXPERT_RULES.items():
if rule_name == "default":
continue
if any(pattern in alert_names for pattern in rule["patterns"]):
matched_rule = rule_name
break
rule = EXPERT_RULES[matched_rule]
# 2026-03-29 ogt: Expert System 不應該假裝有高信心分數
# 設為 0.0 強制標記為規則匹配,而非 AI 仲裁
return {
"source": "expert_system",
"action": rule["action"].format(target=target),
"description": rule["description"],
"diagnosis_description": rule["description"], # openclaw.py reads this key
"risk_level": rule["risk_level"],
"reasoning": f"[規則匹配] {rule['reasoning']}", # 明確標示來源
"confidence": 0.0, # 🔴 規則匹配不是 AI 仲裁,信心度設 0
"kubectl_command": rule["action"].format(target=target),
"matched_rule": matched_rule,
"initial_diagnosis": matched_rule, # openclaw.py reads this key
"from_cache": False,
"is_rule_based": True,
}
# =============================================================================
# Decision Token (Redis)
# =============================================================================
class DecisionToken:
"""
決策令牌 - 前端持有此 token 即可操作
Redis Key: decision:{token}
TTL: 1 小時
"""
def __init__(
self,
token: str,
incident_id: str,
state: DecisionState,
proposal_data: dict[str, Any] | None = None,
proposal_id: str | None = None,
created_at: datetime | None = None,
updated_at: datetime | None = None,
error: str | None = None,
):
self.token = token
self.incident_id = incident_id
self.state = state
self.proposal_data = proposal_data
self.proposal_id = proposal_id
self.created_at = created_at or datetime.now(UTC)
self.updated_at = updated_at or datetime.now(UTC)
self.error = error
def to_dict(self) -> dict[str, Any]:
return {
"token": self.token,
"incident_id": self.incident_id,
"state": self.state.value,
"proposal_data": self.proposal_data,
"proposal_id": self.proposal_id,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"error": self.error,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DecisionToken":
return cls(
token=data["token"],
incident_id=data["incident_id"],
state=DecisionState(data["state"]),
proposal_data=data.get("proposal_data"),
proposal_id=data.get("proposal_id"),
created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None,
updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else None,
error=data.get("error"),
)
# =============================================================================
# Protocol Interface (Phase 17 P1 - 紅區治理)
# =============================================================================
@runtime_checkable
class IDecisionManager(Protocol):
"""
DecisionManager 介面定義
用途:
- 依賴注入 (DI) 時的型別約束
- 測試時 Mock 的型別檢查
- 符合 leWOOOgo 積木化規範
Tier 3 紅區服務: 修改需首席架構師簽核
@see feedback_lewooogo_modular_enforcement.md
@see docs/RED_ZONES.md
"""
async def get_or_create_decision(
self,
incident: "Incident",
timeout_sec: float = 30.0,
) -> "DecisionToken":
"""取得或建立決策令牌"""
...
async def mark_executing(self, token: str) -> "DecisionToken | None":
"""標記決策為執行中"""
...
async def mark_completed(self, token: str, result: dict[str, Any] | None = None) -> "DecisionToken | None":
"""標記決策為已完成"""
...
# =============================================================================
# Decision Manager
# =============================================================================
DECISION_TOKEN_PREFIX = "decision:"
DECISION_TOKEN_TTL = 3600 # 1 小時
class DecisionManager:
"""
決策管理器 - Phase 6.5 核心
職責:
1. 為每個 Incident 簽發 decision_token
2. 並行執行 LLM + Expert System
3. First-Win 或 Fallback 策略
4. 確保 UI 永遠有決策可操作
"""
def __init__(self):
self._openclaw = get_openclaw()
# I2 修復 (首席架構師 Review): 注入 KnowledgeService 避免函數內 import 耦合
# 2026-04-04 Claude Code
from src.services.knowledge_service import get_knowledge_service
self._knowledge_svc = get_knowledge_service()
# I2 續修 2026-04-12 Claude Sonnet 4.6 Asia/Taipei: 注入 MCP Providers
# 原 _collect_mcp_context 在函數內 import + 直接實例化,違反積木化鐵律
from src.plugins.mcp.providers.ssh_provider import SSHProvider
from src.plugins.mcp.providers.k8s_provider import K8sProvider
self._ssh = SSHProvider()
self._k8s = K8sProvider()
async def get_or_create_decision(
self,
incident: Incident,
timeout_sec: float = 30.0,
) -> DecisionToken:
"""
取得或建立決策令牌
核心邏輯:
1. 檢查是否已有 token
2. 沒有則建立新 token (INIT)
3. 啟動非同步分析 (ANALYZING)
4. 等待結果或 timeout 後使用 Expert System
這個方法保證在 timeout_sec 內返回有效 token
"""
_redis_client = get_redis()
# 1. 先檢查現有 token(所有類型統一入口)
# 2026-04-16 ogt + Claude Sonnet 4.6: 修復 TYPE-1 bypass 未檢查 existing token 導致
# HostBackupFailed 等告警重複洗版 — existing token 檢查必須在 TYPE-1 bypass 前執行
existing_token = await self._find_existing_token(incident.incident_id)
if existing_token:
# READY 或 EXECUTING 狀態: 直接返回
if existing_token.state in (DecisionState.READY, DecisionState.EXECUTING):
return existing_token
# COMPLETED 狀態: 直接返回,避免重複建立 decision 導致 Telegram 轟炸
if existing_token.state == DecisionState.COMPLETED:
return existing_token
# 2026-04-16 ogt + Claude Sonnet 4.6: 修復重複卡片根因 — ANALYZING 未早返回
# 問題:多 pod 並發時 pod-A 在 ANALYZING,pod-B/C 發現 ANALYZING 不在返回條件
# → 各自建新 token → 同一 incident 跑 3 次 agent_debate → 送出 3 張 TG 卡
# 修復:ANALYZING 狀態也直接返回,避免重複處理
if existing_token.state == DecisionState.ANALYZING:
logger.debug(
"decision_analyzing_in_progress",
incident_id=incident.incident_id,
token=existing_token.token,
reason="另一個 worker 正在分析中,跳過重複建立",
)
return existing_token
# ADR-073 Phase 3-1: TYPE-1 triage guard — 純資訊告警跳過 LLM 分析
# classify_alert_early() 已在 webhook 入口設定 notification_type
# TYPE-1 (info/backup/heartbeat) 不需 AI 推理,直接推 Telegram 後返回
# 2026-04-12 ogt
if getattr(incident, "notification_type", None) == "TYPE-1":
_info_token = DecisionToken(
token=f"DEC-{uuid4().hex[:12].upper()}",
incident_id=incident.incident_id,
state=DecisionState.COMPLETED,
proposal_data={
"source": "triage_guard",
"notification_type": "TYPE-1",
"decision_state": "COMPLETED",
"auto_executed": False,
"confidence": 1.0,
"risk_level": "low",
"description": "純資訊通知,無需操作",
},
)
# 2026-04-16 ogt + Claude Sonnet 4.6: TYPE-1 token TTL 24h 防洗版
# 原 3600s 導致每小時重推同一 HostBackupFailed/TYPE-1 告警
await self._save_token(_info_token, ttl=86400)
_fire_and_forget(_push_decision_to_telegram(incident, _info_token.proposal_data))
logger.info(
"decision_type1_bypass",
incident_id=incident.incident_id,
notification_type="TYPE-1",
)
return _info_token
# 2. 建立新 token
token = DecisionToken(
token=f"DEC-{uuid4().hex[:12].upper()}",
incident_id=incident.incident_id,
state=DecisionState.ANALYZING,
)
await self._save_token(token)
logger.info(
"decision_analyzing",
token=token.token,
incident_id=incident.incident_id,
)
# 3. 並行執行雙軌決策
try:
proposal_data = await asyncio.wait_for(
self._dual_engine_analyze(incident),
timeout=timeout_sec,
)
token.state = DecisionState.READY
token.proposal_data = proposal_data
token.updated_at = datetime.now(UTC)
# 2026-04-27 Wave8-B1 by Claude — EvidenceSnapshot 不可 JSON 序列化,
# 從 proposal_data pop 後存入 local var,供下方 fusion block 讀取
_pre_fusion_evidence = token.proposal_data.pop("_evidence_snapshot_ref", None)
logger.info(
"decision_ready",
token=token.token,
source=proposal_data.get("source", "unknown"),
)
except TimeoutError:
# Timeout: 使用 Expert System 保底
logger.warning(
"decision_timeout_using_expert",
token=token.token,
timeout_sec=timeout_sec,
)
expert_result = expert_analyze(incident)
token.state = DecisionState.READY
token.proposal_data = expert_result
token.updated_at = datetime.now(UTC)
except Exception as e:
# 任何錯誤: 使用 Expert System 保底
logger.exception(
"decision_error_using_expert",
token=token.token,
error=str(e),
)
expert_result = expert_analyze(incident)
token.state = DecisionState.READY
token.proposal_data = expert_result
token.error = str(e)
token.updated_at = datetime.now(UTC)
# 4. 儲存最終結果
await self._save_token(token)
# 4b. 2026-04-16 Claude Sonnet 4.6: 將 AI 分析結果寫入 incidents.decision_chain (DB 長期保存)
# 修復 Gap: decision token 只有 Redis TTL ~12min,AI 分析結果歷史永久丟失
# 寫入: diagnosis / confidence / risk_level / provider / source
if token.proposal_data:
_fire_and_forget(
self._persist_decision_to_db(incident.incident_id, token.proposal_data)
)
# 4c. YAML NO_ACTION 閘門 — 決策路由中樞(2026-04-17 ogt + Claude Sonnet 4.6)
# 架構原則:通知層(_push_decision_to_telegram)只做狀態轉譯,不查業務邏輯。
# 因此 NO_ACTION / INVALID_TARGET 判斷必須在此閘門統一完成,任何路徑都必須通過。
#
# 根因(先前盲點):Phase 2 路徑拒絕後直接推 TG,不經 auto_execute() 的 YAML check,
# Coordinator 不設 blocked_reason → TG 收到空 blocked_reason → 無法正確分類通知類型。
# 修復:在 auto_approve 之前,統一查詢 YAML 規則:
# NO_ACTION → 標記 blocked_reason + is_informational_only → 短路跳過 auto_approve
# INVALID_TARGET → 標記 blocked_reason → 短路,TYPE-4 由 TG 層轉譯
if token.state == DecisionState.READY and token.proposal_data and incident.signals:
_gate_alertname = incident.signals[0].labels.get("alertname", "")
if _gate_alertname and not token.proposal_data.get("blocked_reason", ""):
try:
from src.services.alert_rule_engine import match_rule as _gate_match
_gate_r = _gate_match({
"labels": incident.signals[0].labels,
"alert_type": _gate_alertname,
"message": "",
"target_resource": (
incident.affected_services[0]
if incident.affected_services else "unknown"
),
"namespace": "awoooi-prod",
})
if _gate_r:
_gate_blocked = _gate_r.get("blocked_reason", "")
_gate_action = _gate_r.get("suggested_action", "")
if "INVALID_TARGET" in _gate_blocked or _gate_action == "NO_ACTION":
# 標記 blocked_reason,讓 TG 層正確轉譯通知類型
if "INVALID_TARGET" in _gate_blocked:
token.proposal_data["blocked_reason"] = _gate_blocked
else:
token.proposal_data["blocked_reason"] = f"YAML: NO_ACTION for {_gate_alertname}"
token.proposal_data["is_informational_only"] = True
token.proposal_data["auto_executed"] = False
token.proposal_data["decision_state"] = token.state.value
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
logger.info(
"yaml_gate_short_circuit",
incident_id=incident.incident_id,
alertname=_gate_alertname,
blocked_reason=token.proposal_data["blocked_reason"],
)
return token # 短路 — 跳過 auto_approve + Blast Radius 評估
except Exception as _gate_err:
logger.debug("yaml_gate_error", error=str(_gate_err))
# 閘門查詢失敗 → 降級繼續正常流程(不阻塞)
# 4d. P2.1 決策融合(方法 III)— feature flag 守衛,失敗不阻塞主流程
# 2026-04-26 P2.1 by Claude — decision fusion 方法 III
# 融合分數寫入 token.proposal_data["decision_fusion"],供 TG 卡片 + 學習服務使用。
# 不修改 auto_approve 邏輯(遵循最小變更原則),僅補充 metadata。
if token.state == DecisionState.READY and token.proposal_data:
try:
from src.core.feature_flags import aiops_flags as _ff
if _ff.is_sub_flag_enabled("AIOPS_P2_FUSION_ENABLED"):
from src.services.decision_fusion import (
get_decision_fusion_engine as _get_fusion,
complexity_from_score as _complexity_from_score,
)
_fusion_engine = _get_fusion()
# 2026-04-27 Wave8-B2 by Claude — fusion 三斷鏈修復:
# complexity_score 從未被寫入 token,導致 fusion 永遠使用預設值 3。
# 修法:在 fusion 前呼叫 ComplexityScorer.score(),結果寫入 token。
if not token.proposal_data.get("complexity_score"):
try:
from src.services.complexity_scorer import (
get_complexity_scorer as _get_complexity_scorer,
)
_cs_context = {
"affected_services": incident.affected_services or [],
"resource_count": len(incident.affected_services or []),
"severity": (
incident.severity.value
if hasattr(incident.severity, "value")
else "medium"
),
}
_cs_result = _get_complexity_scorer().score(_cs_context)
token.proposal_data["complexity_score"] = _cs_result.score
except Exception as _cs_err:
logger.debug("complexity_score_calc_error", error=str(_cs_err))
# 計算失敗 → 保持預設值 3(下面 .get() 仍會正確 fallback)
_complexity_score = token.proposal_data.get("complexity_score", 3)
_complexity_tier = _complexity_from_score(
int(_complexity_score) if isinstance(_complexity_score, (int, float, str)) else 3
)
_openclaw_proposal = (
token.proposal_data.get("kubectl_command", "")
or token.proposal_data.get("description", "")
or ""
)
# 2026-04-27 Wave8-B1 by Claude — fusion 三斷鏈修復:
# 從 local var 讀取(已在 line 1428 pop 出,避免 EvidenceSnapshot
# object 污染 _save_token 的 json.dumps 序列化)
_fusion_evidence = _pre_fusion_evidence
_fusion_score = await _fusion_engine.fuse_decision(
incident=incident,
openclaw_proposal=_openclaw_proposal,
evidence=_fusion_evidence,
complexity=_complexity_tier,
)
token.proposal_data["decision_fusion"] = _fusion_score.to_dict()
await self._save_token(token)
# ADR-085 鐵律:fusion 分數落地 PG(失敗不阻塞主流程)
# 2026-04-26 P2-DB-Fix by Claude — db-expert P0 三修(P0.3)
try:
from src.services.approval_db import get_approval_service as _get_appr_svc
await _get_appr_svc().update_decision_fusion(
incident_id=incident.incident_id,
composite_score=_fusion_score.composite,
complexity_tier=_complexity_tier.value,
fusion_details=_fusion_score.to_dict(),
)
except Exception as _appr_err:
logger.debug("decision_fusion_pg_write_error", error=str(_appr_err))
except Exception as _fusion_err:
logger.debug("decision_fusion_error", error=str(_fusion_err))
# fusion 失敗不阻塞主流程
# 5. ADR-030 Phase 4: 自動執行判斷
if token.state == DecisionState.READY and token.proposal_data:
# 評估是否可以自動執行
auto_policy = get_auto_approve_policy()
auto_decision = auto_policy.evaluate(
proposal_data=token.proposal_data,
playbook=token.proposal_data.get("_matched_playbook"), # 如果有
)
if auto_decision.should_auto_approve:
# 自動執行 (跳過人工審核)
logger.info(
"auto_approve_triggered",
incident_id=incident.incident_id,
reason=auto_decision.reason.value,
detail=auto_decision.reason_detail,
)
token.state = DecisionState.EXECUTING
token.proposal_data["auto_approved"] = True
token.proposal_data["auto_approve_reason"] = auto_decision.reason_detail
await self._save_token(token)
# 觸發自動執行 (非阻塞)
_fire_and_forget(
self._auto_execute(incident, token)
)
else:
# 需人工審核: 推送到 Telegram
# ADR-071: 注入 decision_state + auto_executed 供 classify_notification 使用
token.proposal_data["decision_state"] = token.state.value if token.state else ""
token.proposal_data["auto_executed"] = False
token.proposal_data["auto_approve_reason"] = auto_decision.reason.value
token.proposal_data["blocked_reason"] = auto_decision.reason_detail
if _should_escalate_auto_approve_rejection(auto_decision.reason):
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason=auto_decision.reason_detail,
attempted_actions=(
"decision_manager -> auto_approve_policy:"
f"{auto_decision.reason.value} -> emergency_intervention"
),
)
)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return token
async def _auto_execute(self, incident: Incident, token: "DecisionToken") -> None:
"""
ADR-030 Phase 4: 自動執行已批准的操作
僅當 AutoApprovePolicy 判斷可自動執行時呼叫
執行後發 Telegram 結果通知 (統帥要求: 修復結果對應同一告警)
2026-04-09 Claude Sonnet 4.6 Asia/Taipei
"""
action = token.proposal_data.get("kubectl_command", "") or token.proposal_data.get("action", "")
_alert_labels = incident.signals[0].labels if incident.signals else {}
# 2026-05-02 ogt + Claude Sonnet 4.6: YAML 是權威,先覆蓋 LLM 生成的 action
# 根因:LLM/Phase2 會先產出 node-exporter/kubectl 的錯域建議,導致
# bare_metal 告警每次都被攔回人工,形成循環。
# 修法:先執行 YAML match_rule 對齊,若 YAML 有可落地 command 則覆蓋。
_alertname_for_yaml = incident.signals[0].labels.get("alertname", "") if incident.signals else ""
if _alertname_for_yaml:
try:
from src.services.alert_rule_engine import match_rule as _yaml_match
_yaml_r = _yaml_match({
"labels": _alert_labels,
"alert_type": _alertname_for_yaml,
"message": "",
"target_resource": incident.affected_services[0] if incident.affected_services else "unknown",
"namespace": "awoooi-prod",
})
if _yaml_r:
if _yaml_r.get("suggested_action") == "NO_ACTION":
logger.info(
"auto_execute_yaml_no_action",
incident_id=incident.incident_id,
alertname=_alertname_for_yaml,
reason="YAML 規則明確標記 NO_ACTION,不執行自動修復",
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blocked_reason"] = f"YAML: NO_ACTION for {_alertname_for_yaml}"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# 2026-04-16 ogt + Claude Sonnet 4.6: INVALID_TARGET → 人工確認 TYPE-4
_yaml_blocked = _yaml_r.get("blocked_reason", "")
if "INVALID_TARGET" in _yaml_blocked:
logger.warning(
"auto_execute_yaml_invalid_target",
incident_id=incident.incident_id,
alertname=_alertname_for_yaml,
blocked_reason=_yaml_blocked,
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blocked_reason"] = _yaml_blocked
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
_yaml_cmd = (_yaml_r.get("kubectl_command") or "").strip()
if _yaml_cmd and not _yaml_cmd.startswith("kubectl"):
# YAML 給出 SSH / docker 指令 → 覆蓋 LLM 生成的 action
action = _yaml_cmd
token.proposal_data["action"] = action
token.proposal_data["kubectl_command"] = action
await self._save_token(token)
logger.info(
"auto_execute_yaml_cmd_override",
incident_id=incident.incident_id,
alertname=_alertname_for_yaml,
yaml_cmd=action[:80],
)
except Exception as _yaml_err:
logger.debug("auto_execute_yaml_check_error", error=str(_yaml_err))
# 2026-05-02 ogt + Claude Sonnet 4.6: bare_metal × kubectl 拒絕守衛
# 根因:HostHighCpuLoad / HostOutOfMemory 等主機層告警 fire 在實體機(如 110,
# 上面跑 Sentry/ClickHouse/Snuba 等第三方),但 LLM 看到 instance 後容易
# 亂提「kubectl rollout restart awoooi-api」這種對症錯誤的 K8s action,
# 重啟 awoooi 服務根本解不了第三方 CPU 燒爆,只是拖累自己。
# 修法:偵測到 alert host_type=bare_metal 且 action 是 kubectl 類,立即降級人工,
# Telegram 明示「跨 domain 動作被攔下」。auto_repair 走 SSH 診斷或人工。
_host_type = (_alert_labels.get("host_type") or "").lower()
_action_stripped = action.lstrip().lower()
if _host_type == "bare_metal" and _action_stripped.startswith("kubectl"):
logger.warning(
"auto_execute_blocked_bare_metal_kubectl",
incident_id=incident.incident_id,
alertname=_alert_labels.get("alertname", ""),
instance=_alert_labels.get("instance", ""),
proposed_action=action[:120],
reason="bare_metal host alert + kubectl action = wrong domain",
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blocked_reason"] = (
f"host_type=bare_metal 但 LLM 提案 kubectl 動作 ({action[:60]})。"
" 主機層告警的根因常在第三方服務(如 Sentry / ClickHouse),"
" 重啟 K8s deployment 解不了,已降級人工。"
)
await self._save_token(token)
_fire_and_forget(_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason="bare_metal alert routed to kubectl action (wrong domain)",
attempted_actions=f"action={action[:120]}",
))
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# Phase 6 ADR-087: 自我降級守衛(AIOPS_P6_SELF_DEMOTION 控制)
# SLO 違反 → 全域信心閾值調高;連續違反 → 保守模式,所有自動執行降為人工
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
try:
from src.core.feature_flags import aiops_flags as _p6_flags
if _p6_flags.is_sub_flag_enabled("AIOPS_P6_SELF_DEMOTION"):
from src.db.base import get_session_factory as _p6_sf
from src.db.models import AiGovernanceEvent as _GovernanceEvent
from sqlalchemy import select as _p6_select, func as _p6_func
from datetime import timedelta as _p6_td
_now = __import__("src.utils.timezone", fromlist=["now_taipei"]).now_taipei()
async with _p6_sf()() as _p6_sess:
# 過去 7 天有幾筆未解決的 slo_violation?
_viol_7d_q = await _p6_sess.execute(
_p6_select(_p6_func.count()).where(
_GovernanceEvent.event_type == "slo_violation",
_GovernanceEvent.resolved.is_(False),
_GovernanceEvent.triggered_at >= _now - _p6_td(days=7),
)
)
_viol_7d: int = _viol_7d_q.scalar() or 0
# 過去 14 天有幾筆未解決的 slo_violation?
_viol_14d_q = await _p6_sess.execute(
_p6_select(_p6_func.count()).where(
_GovernanceEvent.event_type == "slo_violation",
_GovernanceEvent.resolved.is_(False),
_GovernanceEvent.triggered_at >= _now - _p6_td(days=14),
)
)
_viol_14d: int = _viol_14d_q.scalar() or 0
if _viol_14d >= 2:
# 連續 2 週違反 → 保守模式:全部降為人工
logger.warning(
"auto_execute_conservative_mode",
incident_id=incident.incident_id,
viol_14d=_viol_14d,
reason="Phase 6 保守模式:連續 SLO 違反,所有自動執行暫停",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["p6_conservative_mode"] = True
token.proposal_data["p6_reason"] = f"SLO 連續違反 {_viol_14d}d,系統進入保守模式"
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
# 記錄保守模式事件
try:
from src.db.base import get_session_factory as _p6_sf2
async with _p6_sf2()() as _s2:
_s2.add(_GovernanceEvent(
event_type="conservative_mode",
details={
"incident_id": incident.incident_id,
"viol_14d": _viol_14d,
"triggered_at": _now.isoformat(),
},
resolved=False,
))
await _s2.commit()
except Exception:
pass
return
elif _viol_7d >= 1:
# 近 7 天有違反 → 自我降級:信心閾值提高,記錄 demotion 事件
_confidence = float(token.proposal_data.get("confidence", 0.0))
_raised_threshold = 0.75 # 原 0.70 → 調高 0.05
if _confidence < _raised_threshold:
logger.warning(
"auto_execute_self_demoted",
incident_id=incident.incident_id,
confidence=_confidence,
raised_threshold=_raised_threshold,
reason="Phase 6 自我降級:近 7d SLO 違反,信心閾值提高",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["p6_self_demoted"] = True
token.proposal_data["p6_reason"] = (
f"Phase 6 自我降級:近 7d SLO 違反,"
f"信心 {_confidence:.2f} < {_raised_threshold},升為人工"
)
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
try:
from src.db.base import get_session_factory as _p6_sf3
async with _p6_sf3()() as _s3:
_s3.add(_GovernanceEvent(
event_type="self_demotion",
details={
"incident_id": incident.incident_id,
"confidence": _confidence,
"raised_threshold": _raised_threshold,
"viol_7d": _viol_7d,
"triggered_at": _now.isoformat(),
},
resolved=False,
))
await _s3.commit()
except Exception:
pass
return
# confidence >= raised_threshold → 允許繼續自動執行
except Exception as _p6_err:
logger.warning("p6_self_demotion_check_error", error=str(_p6_err))
# 保守:P6 check 出錯 → 不阻擋(避免因 P6 bug 把所有修復都堵住)
# ADR-073 Phase 3-5: action | parse fix (2026-04-12 ogt)
# LLM 有時輸出 "kubectl rollout restart X | kubectl get pods -n Y"
# | 後面是查詢指令,取第一個才是真正的修復操作
if action and "|" in action:
action = action.split("|")[0].strip()
logger.debug("action_pipe_stripped", incident_id=incident.incident_id, action=action)
# NO_ACTION 規則(備份失敗/E2E smoke test 等)— kubectl_command 為空,不執行,直接返回
# 2026-04-11 Claude Sonnet 4.6: 防止空 action 或 NO_ACTION 字串進入自動執行流程
_suggested_action = token.proposal_data.get("suggested_action", "")
if not action or _suggested_action == "NO_ACTION":
logger.info(
"auto_execute_skipped_no_action",
incident_id=incident.incident_id,
suggested_action=_suggested_action,
reason="規則標記 NO_ACTION 或 kubectl_command 為空,不執行自動修復",
)
token.state = DecisionState.READY
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return
# 替換所有 placeholder — {target}/{namespace}/ 等
_target = incident.affected_services[0] if incident.affected_services else "unknown"
_ns = "awoooi-prod"
if incident.signals:
_ns = incident.signals[0].labels.get("namespace", "awoooi-prod")
import re as _re
# BUG-002 修復 2026-04-11: 主機層告警(HostHighCpuLoad 等)無 component/job/pod label
# → affected_services=[] → target="unknown" → safety guard 攔截
# 補救:用 K8s MCP 依 alertname/host label 動態查詢受影響 Pod
if _target == "unknown":
_target = await _resolve_target_from_k8s(incident, _ns) or "unknown"
action = action.replace("{target}", _target).replace("{namespace}", _ns)
# 格式佔位符 → 用 target 替換
action = _re.sub(r"", _target, action)
action = _re.sub(r"<[^>]+>", _target, action)
# GAP-A4 Phase 2 (2026-04-14 Claude Sonnet 4.6): LLM 路徑 target 救援
# 真兇:LLM 直接產出 `kubectl scale deployment HostHighCpuLoad` (target=alertname)
# GAP-A4 Phase 1 只修了 rule_engine._extract_vars,LLM 路徑沒檢查
# 結果:12 次 auto_execute_blocked_unresolved_placeholder 直接攔下 → 飛輪 0
try:
from src.services.alert_rule_engine import (
_extract_vars as _rule_extract_vars,
_is_bad_target as _rule_is_bad_target,
)
_alertname_for_rescue = (
incident.signals[0].labels.get("alertname", "") if incident.signals else ""
)
# 從 action 提取 deployment 名稱(kubectl scale/restart deployment XXX 或 deployment/XXX)
_kubectl_target_match = _re.search(
r"deployment[/\s]+([\w.\-]+)", action
)
if _kubectl_target_match:
_llm_target = _kubectl_target_match.group(1)
if _rule_is_bad_target(_llm_target, _alertname_for_rescue):
# LLM 把垃圾當 deployment 名(alertname/unknown/IP)→ 重推
_alert_ctx = {
"labels": incident.signals[0].labels if incident.signals else {},
"target_resource": _target,
"namespace": _ns,
"alert_type": _alertname_for_rescue,
}
_good_target = _rule_extract_vars(_alert_ctx).get("target", "")
if _good_target and _good_target != "unknown" and not _rule_is_bad_target(_good_target, _alertname_for_rescue):
_old_action = action
action = action.replace(_llm_target, _good_target)
logger.info(
"auto_execute_target_rescued",
incident_id=incident.incident_id,
llm_target=_llm_target,
rescued_target=_good_target,
old_action=_old_action[:120],
new_action=action[:120],
reason="LLM 產出垃圾 target,從 labels 重推 deployment 名",
)
else:
logger.warning(
"auto_execute_target_rescue_failed",
incident_id=incident.incident_id,
llm_target=_llm_target,
alertname=_alertname_for_rescue,
reason="labels 也找不到合法 deployment,將進 safety guard 攔截 → 人工",
)
except Exception as _rescue_err:
logger.debug("target_rescue_skipped", error=str(_rescue_err))
# ADR-073 Phase 3-2: infrastructure/host/backup 告警 → SSH MCP routing.
# alert_category = "backup_failure" uses the same host-layer path as AutoRepairService.
# P1-1 fix 2026-04-12: 必須在 kubectl safety guard 之前 routing,否則 docker 指令被 _action_safe=False 攔截
_alert_category = getattr(incident, "alert_category", None) or ""
if _is_host_layer_ssh_category(_alert_category) and action and not action.startswith("kubectl"):
await self._ssh_execute(incident, token, action, _target)
return
# 2026-04-15 ogt: host_resource/backup_failure 告警不是 K8s workload 問題
# 不得執行 kubectl 操作,改降級人工審核
# 根因:原本只擋了 infrastructure,忘記 host_resource 也不走 K8s
if _is_non_k8s_host_category(_alert_category) and action and action.startswith("kubectl"):
logger.warning(
"auto_execute_blocked_host_layer_no_k8s",
incident_id=incident.incident_id,
alert_category=_alert_category,
action=action[:80],
reason="host/backup 告警不應執行 K8s kubectl 操作,降級人工審核",
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
token.proposal_data["blocked_reason"] = f"{_alert_category} 告警禁止 K8s kubectl,請人工排查主機/備份"
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason=token.proposal_data["blocked_reason"],
attempted_actions=f"auto_execute -> {_alert_category}_k8s_block -> emergency_intervention",
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# 安全守衛: 替換後仍含 "unknown" 或未替換的 <...>/{...} → 拒絕執行
# 另外:若 target 等於 alertname,代表 LLM 把告警名稱填入 deployment_name,也拒絕
_alertname = incident.signals[0].labels.get("alertname", "") if incident.signals else ""
_target_is_alertname = bool(_alertname and _target == _alertname)
# SPF-2: structured kubectl parser replaces the old single-regex guard.
_action_parse = parse_kubectl_action(action.strip())
_action_safe = _action_parse.ok
if "unknown" in action or _re.search(r"[<{][^>}]+[>}]", action) or _target_is_alertname or not _action_safe:
logger.warning(
"auto_execute_blocked_unresolved_placeholder",
incident_id=incident.incident_id,
action=action,
target=_target,
parser_reason=_action_parse.reason,
reason="action 含未解析的 placeholder、unknown、target==alertname、或未通過 kubectl action parser,拒絕執行",
)
# Safety guard 攔截 → 降級為人工審核,而非「修復失敗」
# 2026-04-11 Claude Sonnet 4.6: 不發 ❌ 失敗訊息,改發人工審核卡片
# 讓統帥看到告警並決定如何處理,而不是重複收到「無法確認 deployment 名稱」
token.state = DecisionState.READY # 回到 READY,讓人工審核
token.error = f"Auto-execute blocked (safety guard): {action[:80]}"
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True # 標記讓 classify_notification → TYPE-4
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason=(
"Auto-execute safety guard blocked action: "
f"{_action_parse.reason}"
),
attempted_actions="auto_execute -> action_parser_guard -> emergency_intervention",
)
)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return
# BUG-003 修復 2026-04-11: 加入 K8s deployment 存在性驗證,
# 避免 LLM 產生的無效 deployment name(/alertname/unknown)通過 safety guard
# 但仍對 K8s 發出錯誤指令
if _target and _target != "unknown":
_k8s_verified = await _verify_k8s_deployment_exists(_target, _ns, alertname=_alertname)
if not _k8s_verified:
logger.warning(
"auto_execute_blocked_deployment_not_found",
incident_id=incident.incident_id,
target=_target,
namespace=_ns,
reason="K8s 中找不到此 deployment/pod,拒絕執行",
)
# K8s 找不到 target → 降級為人工審核,同 safety guard 策略
# 2026-04-11 Claude Sonnet 4.6: 不發 ❌ 失敗,改發 TYPE-4 人工審核卡片
token.state = DecisionState.READY
token.error = f"Auto-execute blocked: deployment '{_target}' not found in K8s"
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason=(
f"K8s target '{_target}' not found in namespace '{_ns}'"
),
attempted_actions="auto_execute -> k8s_existence_check -> emergency_intervention",
)
)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
return
# Phase 5 ADR-086: Blast Radius 分級守衛(AIOPS_P5_BLAST_RADIUS_CHECK 控制)
# 評估修復動作的爆炸半徑,決定是否可自動執行或需升級審核
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
try:
from src.core.feature_flags import aiops_flags as _p5_flags
if _p5_flags.AIOPS_P5_BLAST_RADIUS_CHECK:
from src.services.blast_radius_calculator import get_blast_radius_calculator
from src.services.declarative_remediation import get_declarative_remediation
_calc = get_blast_radius_calculator()
_blast = _calc.calculate(action, namespace=_ns, target=_target)
_spec = get_declarative_remediation().evaluate(
action=action, target=_target, namespace=_ns,
description=token.proposal_data.get("description", ""),
)
# 記錄分級結果到 proposal_data(供學習 + 審計)
token.proposal_data["blast_radius_score"] = _blast.score
token.proposal_data["blast_radius_tier"] = _blast.tier
token.proposal_data["blast_radius_reason"] = _blast.reason
if _blast.tier == "blocked":
# HARD_RULES 永擋
logger.warning(
"auto_execute_blast_radius_hard_blocked",
incident_id=incident.incident_id,
action=action[:80],
reason=_blast.reason,
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
token.proposal_data["blocked_reason"] = f"HARD_RULES 永擋:{_blast.reason}"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
elif _blast.tier in ("human", "dual"):
# 中高衝擊 → 升級人工審核,不自動執行
logger.info(
"auto_execute_blast_radius_escalated",
incident_id=incident.incident_id,
tier=_blast.tier,
score=_blast.score,
action=action[:80],
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["requires_human_review"] = True
token.proposal_data["blast_radius_escalated"] = True
await self._save_token(token)
# dual tier → 非同步建立 GitOps Issue
if _blast.tier == "dual" and _p5_flags.AIOPS_P5_GITOPS_PR:
from src.services.gitops_pr_service import get_gitops_pr_service
_fire_and_forget(
get_gitops_pr_service().create_repair_issue(
spec=_spec,
incident_id=incident.incident_id,
diagnosis=token.proposal_data.get("debate_summary", ""),
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# tier == "auto" → 繼續自動執行流程
except Exception as _blast_err:
# Blast Radius 計算失敗 → 保守:視為 human tier,升級人工審核
logger.warning(
"blast_radius_check_failed_conservative_escalate",
incident_id=incident.incident_id,
error=str(_blast_err),
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blast_radius_tier"] = "unknown_conservative"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# 2026-04-15 ogt: 同一 target 5 分鐘內最多執行 2 次,防止修復風暴
# 根因:多個 incident 共享同一 target 時,各自獨立自動執行 → 重複重啟
try:
from src.core.redis_client import get_redis as _get_redis_dm
_redis_dm = _get_redis_dm()
_dm_cooldown_key = f"awoooi:auto_execute_cooldown:{_ns}:{_target}"
_dm_exec_count = await _redis_dm.get(_dm_cooldown_key)
if _dm_exec_count and int(_dm_exec_count) >= 2:
logger.warning(
"auto_execute_cooldown_blocked",
incident_id=incident.incident_id,
target=_target,
namespace=_ns,
exec_count=int(_dm_exec_count),
reason="同一 target 5 分鐘內已自動執行 2 次,冷卻中",
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["cooldown_blocked"] = True
await self._save_token(token)
return
await _redis_dm.incr(_dm_cooldown_key)
await _redis_dm.expire(_dm_cooldown_key, 300) # 5 分鐘
except Exception as _cd_err:
logger.debug("auto_execute_cooldown_check_error", error=str(_cd_err))
# P1.1 fix 2026-04-27 ogt + Claude Sonnet 4.6: 失敗路徑 KM 寫入哨兵
# 在 try 外宣告,確保 except 區塊能存取(即使建構失敗)
_km_executor = None
_km_approval = None
try:
# 延遲導入避免循環依賴
from src.models.approval import ApprovalRequest, ApprovalStatus
from src.services.approval_execution import ApprovalExecutionService
# 建立虛擬 ApprovalRequest (auto_execute — 不需人工審核)
_risk = token.proposal_data.get("risk_level", "low")
# ADR-083 Phase 3: 傳遞 matched_playbook_id,學習迴路 EWMA 才能觸發
# Playbook RAG 命中時 proposal_data["playbook_id"] 會有值(見 _try_playbook_match)
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 matched_playbook_id 修復
_matched_playbook_id = token.proposal_data.get("playbook_id")
approval = ApprovalRequest(
incident_id=incident.incident_id,
action=action,
description=token.proposal_data.get("description", action[:100]),
requested_by="auto_approve",
required_signatures=0,
status=ApprovalStatus.APPROVED,
risk_level=_risk,
matched_playbook_id=_matched_playbook_id,
)
_km_approval = approval # P1.1: 供 except 失敗路徑使用
# ADR-071-I: 執行前抓 metrics_before 快照 (2026-04-11 Claude Sonnet 4.6)
_metrics_before = await _fetch_metrics_snapshot(incident)
if _metrics_before:
try:
from src.db.base import get_db_context
from src.db.models import Incident as IncidentORM
from sqlalchemy import update as _sa_update
async with get_db_context() as _db:
await _db.execute(
_sa_update(IncidentORM)
.where(IncidentORM.incident_id == incident.incident_id)
.values(metrics_before=_metrics_before)
)
await _db.commit()
except Exception as _mb_err:
logger.debug("metrics_before_save_failed",
incident_id=incident.incident_id, error=str(_mb_err))
# 執行
# 2026-04-17 ogt + Claude Sonnet 4.6 (Checkpoint-1 假成功修復):
# 舊 bug: execute_approved_action 返回 None → 此處永遠傳 success=True 給
# _push_auto_repair_result → Telegram 顯示 ✅ 自動修復完成,即使 K8s 拒絕了指令
# 修復: execute_approved_action 現在返回 bool,正確透傳給通知函數
executor = ApprovalExecutionService()
_km_executor = executor # P1.1: 供 except 失敗路徑使用
_exec_success = await executor.execute_approved_action(approval)
# 更新狀態
token.state = DecisionState.COMPLETED
token.proposal_data["auto_executed"] = True
token.proposal_data["exec_success"] = _exec_success
await self._save_token(token)
logger.info(
"auto_execute_completed",
incident_id=incident.incident_id,
action=approval.action,
exec_success=_exec_success,
)
# 2026-04-09 Claude Sonnet 4.6: 執行完成 → 發 Telegram 結果通知(成功或失敗皆發)
_fire_and_forget(
_push_auto_repair_result(incident, action, success=_exec_success)
)
# M1 修復 2026-04-28 ogt + Claude Sonnet 4.6:
# 飛輪閉環 — auto_execute 路徑 KM 寫入改為 await asyncio.shield()
# 原為 _fire_and_forget(裸 create_task)→ 整個 coroutine 繞過 KMWriter 統一契約
# asyncio.shield 確保上層 cancel 時 KM 寫入仍能完成(不被中止)
# KM_WRITE_AWAIT=false 時,write_execution_result_to_km 內部走 km_write_with_flag 降級
try:
await asyncio.shield(
executor.write_execution_result_to_km(approval, _exec_success, None)
)
except BaseException as _km_err:
# BaseException 涵蓋 CancelledError(Python 3.8+ 繼承自 BaseException)
logger.warning(
"auto_execute_km_write_failed",
incident_id=incident.incident_id,
error=str(_km_err),
)
except Exception as e:
logger.error(
"auto_execute_failed",
incident_id=incident.incident_id,
error=str(e),
)
token.state = DecisionState.ERROR
token.error = f"Auto-execute failed: {e}"
await self._save_token(token)
# 2026-04-09 Claude Sonnet 4.6: 執行失敗 → 發 Telegram 失敗通知 + fallback 人工
_fire_and_forget(
_push_auto_repair_result(incident, action, success=False, error=str(e))
)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
# M1 修復 2026-04-28 ogt + Claude Sonnet 4.6: 失敗路徑 KM 寫入改為 await asyncio.shield()
if _km_executor is not None and _km_approval is not None:
try:
await asyncio.shield(
_km_executor.write_execution_result_to_km(
_km_approval, False, str(e)
)
)
except BaseException as _km_err:
# BaseException 涵蓋 CancelledError(Python 3.8+ 繼承自 BaseException)
logger.warning(
"auto_execute_km_write_failed_path",
incident_id=incident.incident_id,
error=str(_km_err),
)
async def _query_kb_context_inner(self, incident: Incident) -> str:
"""KB RAG 實際查詢邏輯,由 _query_kb_context 包裝 timeout 後呼叫"""
query_parts = list(incident.affected_services)
if incident.signals:
query_parts.insert(0, getattr(incident.signals[0], "alert_name", ""))
query = " ".join(filter(None, query_parts))
results = await self._knowledge_svc.semantic_search(query, limit=3, threshold=0.4)
if not results:
return ""
lines = ["## Knowledge Base Related Entries (KB RAG)"]
for entry, score in results:
lines.append(
f"\n### [{entry.entry_type}] {entry.title} (similarity={score:.2f})"
)
lines.append(entry.content[:500])
if len(entry.content) > 500:
lines.append("... (truncated)")
logger.info(
"kb_rag_context_injected",
incident_id=incident.incident_id,
kb_hits=len(results),
)
return "\n".join(lines)
async def _query_kb_context(self, incident: Incident) -> str:
"""
KB Phase 2: 語意搜尋相關 KB 條目,組裝為 LLM context 字串
2026-04-04 Claude Code: KB RAG 整合
C1 修復 (首席架構師審查): 5 秒 hard timeout,防止 Ollama 慢響應威脅 30s SLA
失敗/timeout 時靜默降級,不影響主分析流程
"""
try:
return await asyncio.wait_for(
self._query_kb_context_inner(incident),
timeout=5.0,
)
except asyncio.TimeoutError:
logger.warning("kb_rag_timeout", incident_id=incident.incident_id)
return ""
except (ConnectionError, OSError) as e:
# Ollama 連線問題,預期可降級
logger.warning("kb_rag_connection_error", incident_id=incident.incident_id, error=str(e))
return ""
except Exception as e:
# 非預期錯誤,用 error 級別方便監控
logger.error("kb_rag_unexpected_error", incident_id=incident.incident_id, error=str(e))
return ""
async def _collect_mcp_context(self, incident: Incident) -> str:
"""
ADR-070 全自動 AIOps: 分析前用 MCP 收集真實環境狀態
讓 LLM 拿到真實資訊做決策,而非只憑 alert labels
策略:
- K8s 告警 → K8s MCP 查 Pod 狀態/事件
- 主機/Docker 告警 → SSH MCP 查容器狀態/資源
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
"""
if not incident.signals:
return ""
labels = incident.signals[0].labels
alertname = labels.get("alertname", "")
host = labels.get("instance", "").split(":")[0] or labels.get("host", "")
# C1 修復 2026-04-11 (Code Review): 修正 Python ternary 優先度問題
# 原: `A or B or C[0] if list else ""` → ternary 控制全式,非僅 C[0]
container = (
labels.get("name")
or labels.get("container")
or (incident.affected_services[0] if incident.affected_services else "")
)
ns = labels.get("namespace", "awoooi-prod")
ctx_parts: list[str] = []
# C2 修復 2026-04-11 (Code Review): 所有 MCP 呼叫加 5s timeout,防止阻塞決策主路徑
_MCP_TIMEOUT = 5.0
# 主機/Docker 告警 → SSH MCP 診斷
_HOST_ALERT_PREFIXES = ("Host", "Docker", "Sentry", "Harbor", "Ollama", "Backup")
if alertname.startswith(_HOST_ALERT_PREFIXES) and host:
try:
ssh = self._ssh
# C4: 未知主機記錄 warning(不靜默跳過)
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: 補入 120/121,與 ssh_provider default 對齊
_KNOWN_HOSTS = ("192.168.0.188", "192.168.0.110", "192.168.0.120", "192.168.0.121")
if ssh.enabled and host not in _KNOWN_HOSTS:
logger.warning("mcp_context_unknown_host", host=host, known=_KNOWN_HOSTS)
if ssh.enabled and host in _KNOWN_HOSTS:
# 查容器狀態
if container and container != alertname:
status_result = await asyncio.wait_for(
ssh.execute(
tool_name="ssh_get_container_status",
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: params= → parameters=(符合 MCPToolProvider.execute 簽名)
parameters={"host": host, "container_name": container},
),
timeout=_MCP_TIMEOUT,
)
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: MCPToolResult 是 dataclass,用 .success/.output 而非 .get()
if status_result.success:
ctx_parts.append(f"[SSH] 容器 {container} 狀態: {(status_result.output or '')[:300]}")
# 查主機資源
if "CpuLoad" in alertname or "Memory" in alertname:
top_result = await asyncio.wait_for(
ssh.execute(
tool_name="ssh_get_top_processes",
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: params= → parameters=
parameters={"host": host, "top_n": 5},
),
timeout=_MCP_TIMEOUT,
)
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: MCPToolResult dataclass 用 .success/.output
if top_result.success:
ctx_parts.append(f"[SSH] 主機 {host} Top processes: {(top_result.output or '')[:300]}")
except asyncio.TimeoutError:
logger.warning("mcp_context_ssh_timeout", alertname=alertname, host=host, timeout=_MCP_TIMEOUT)
except Exception as e:
logger.debug("mcp_context_ssh_failed", alertname=alertname, error=str(e))
# K8s 告警 → K8s MCP 查 Pod 狀態
if alertname.startswith(("Kube", "K3s")) or labels.get("pod"):
try:
k8s = self._k8s
if k8s.enabled:
pod = labels.get("pod", "")
if pod:
events_result = await asyncio.wait_for(
k8s.execute(
tool_name="k8s_get_events",
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: params= → parameters=
parameters={"namespace": ns, "field_selector": f"involvedObject.name={pod}"},
),
timeout=_MCP_TIMEOUT,
)
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: MCPToolResult 是 dataclass,用 .success/.output
if events_result.success:
ctx_parts.append(f"[K8s] Pod {pod} 事件: {(events_result.output or '')[:300]}")
except asyncio.TimeoutError:
logger.warning("mcp_context_k8s_timeout", alertname=alertname, timeout=_MCP_TIMEOUT)
except Exception as e:
logger.debug("mcp_context_k8s_failed", alertname=alertname, error=str(e))
return "\n".join(ctx_parts)
async def _dual_engine_analyze(
self,
incident: Incident,
) -> dict[str, Any]:
"""
三軌決策分析 (Phase 7.5 升級 + KB Phase 2 RAG 整合 + ADR-070 MCP 前置收集)
策略:
1. MCP 前置收集真實環境狀態(ADR-070)
2. 先檢查 Playbook 是否有高度匹配 (similarity >= 85%)
3. Playbook 命中則直接使用 (最快、經驗驗證)
4. 否則 LLM + Expert System 雙軌 + KB RAG context + MCP context 注入
優先順序: Playbook > LLM > Expert System
"""
# ADR-081 Phase 1: PreDecisionInvestigator — 8D 感官蒐集(feature flag 守衛)
# AIOPS_P1_ENABLED=False → 退回舊 _collect_mcp_context() 路徑
# 2026-04-15 ogt + Claude Sonnet 4.6
evidence_snapshot = None
from src.core.feature_flags import aiops_flags
if aiops_flags.is_sub_flag_enabled("AIOPS_P1_PRE_DECISION_INVESTIGATOR"):
from src.services.pre_decision_investigator import get_pre_decision_investigator
investigator = get_pre_decision_investigator()
evidence_snapshot = await investigator.investigate(incident)
mcp_context = evidence_snapshot.evidence_summary or ""
else:
# ADR-070: 原有 MCP 收集路徑(Phase 0 保留)
mcp_context = await self._collect_mcp_context(incident)
# ADR-082 Phase 2: 5 Agent 辯證(feature flag 守衛)
# AIOPS_P2_ENABLED=True → 走 AgentOrchestrator 路徑,跳過 Playbook / LLM
# 需要 EvidenceSnapshot;若 P1 未開啟則自行收集
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太)
if aiops_flags.is_phase_enabled(2): # Gate 2: 用 is_phase_enabled 統一父 Phase 守衛
# 2026-04-24 ogt + Claude Sonnet 4.6: YAML NO_ACTION 優先門
# 根因:Phase 2 五 agent 對主機層/外部服務告警(HostDiskUsageHigh /
# SentryClickHouseMemoryPressure)做 kubectl 分析 → Solver 永遠降級
# (無 K8s target) → confidence=20% + kubectl get pods(無意義)
# 修復:YAML 匹配到 NO_ACTION 且 kubectl_command="" → 跳過 Agent Debate,
# 直接返回 YAML 規則響應,讓 Telegram 推送人工排查通知
try:
from src.services.alert_rule_engine import match_rule as _p2_match_rule
_p2_labels = incident.signals[0].labels if incident.signals else {}
_p2_alertname = _p2_labels.get("alertname", "")
_p2_yaml = _p2_match_rule({
"labels": _p2_labels,
"alert_type": _p2_alertname,
"message": (
incident.signals[0].annotations.get("summary", "")
if incident.signals else ""
),
"target_resource": incident.affected_services[0] if incident.affected_services else "unknown",
"namespace": _p2_labels.get("namespace", "awoooi-prod"),
"severity": incident.severity.value if hasattr(incident.severity, "value") else "medium",
})
_is_no_action_yaml = (
_p2_yaml is not None
and _p2_yaml.get("suggested_action") == "NO_ACTION"
and not _p2_yaml.get("kubectl_command", "").strip()
and _p2_yaml.get("rule_id", "") not in ("generic_fallback", "")
)
if _is_no_action_yaml:
logger.info(
"p2_yaml_no_action_bypass",
incident_id=incident.incident_id,
alertname=_p2_alertname,
rule_id=_p2_yaml.get("rule_id", ""),
reason="YAML NO_ACTION 規則命中,跳過 Agent Debate",
)
return _p2_yaml
except Exception as _p2_yaml_err:
logger.debug("p2_yaml_precheck_error", error=str(_p2_yaml_err))
p2_snapshot = evidence_snapshot
if p2_snapshot is None:
try:
from src.services.pre_decision_investigator import get_pre_decision_investigator
p2_snapshot = await get_pre_decision_investigator().investigate(incident)
except Exception:
logger.warning(
"p2_snapshot_collect_failed",
incident_id=incident.incident_id,
)
if p2_snapshot is not None:
from src.services.agent_orchestrator import run_agent_debate
package = await run_agent_debate(
snapshot=p2_snapshot,
incident_id=incident.incident_id,
)
# 2026-04-27 Wave8-B1 by Claude — fusion 三斷鏈修復:
# evidence_snapshot 攜帶進 proposal_data,避免 singleton 並發污染
_p2_result = _package_to_proposal_data(package)
_p2_result["_evidence_snapshot_ref"] = p2_snapshot
_p2_fallback = _phase2_fallback_reason(package)
if not _p2_fallback:
return _p2_result
logger.warning(
"p2_degraded_fallback_to_playbook_llm",
incident_id=incident.incident_id,
reason=_p2_fallback,
confidence=_p2_result.get("confidence"),
blocked_reason=_p2_result.get("blocked_reason", ""),
)
# snapshot 仍為 None → 降級繼續走原路徑(不阻塞)
if p2_snapshot is None:
logger.warning("p2_no_snapshot_fallback", incident_id=incident.incident_id)
# Phase 7.5: 先嘗試 Playbook 匹配
playbook_result = await self._try_playbook_match(incident)
if playbook_result:
if evidence_snapshot is not None:
playbook_result["_evidence_snapshot_ref"] = evidence_snapshot
return playbook_result
# MCP Phase 4c: Playbook 無命中 → 非同步產生 AI 草稿 Playbook (2026-04-11 Claude Sonnet 4.6)
_fire_and_forget(_generate_playbook_draft_if_new(incident))
# Expert System 同步執行 (立即可用)
expert_result = expert_analyze(incident)
# KB Phase 2: 語意搜尋相關知識條目 (失敗時靜默降級)
# 2026-04-04 Claude Code: KB RAG 整合,提升 LLM 決策品質
kb_context = await self._query_kb_context(incident)
# LLM 非同步執行 (Phase 22: OpenClaw + Nemotron 協作)
# 2026-03-31 Claude Code: 使用 _with_tools 方法啟用雙軌協作
try:
signals_dict = [s.model_dump() for s in incident.signals]
# 將 KB context + MCP 實時狀態 注入 expert_context 傳給 LLM
# ADR-070: MCP context 優先放最前面,讓 LLM 看到真實環境狀態再做決策
llm_expert_context: dict[str, Any] = {**expert_result} if expert_result else {}
existing = str(llm_expert_context.get("diagnosis_context", ""))
context_parts = []
if mcp_context:
context_parts.append(f"## 當前環境狀態 (MCP 實時查詢)\n{mcp_context}")
if kb_context:
context_parts.append(f"## 相關歷史知識\n{kb_context}")
if existing:
context_parts.append(existing)
if context_parts:
llm_expert_context["diagnosis_context"] = "\n\n".join(context_parts)
# 2026-05-06 Codex: The alert goal is resolution quality, not a
# fast-but-paid card. The outer guard is configurable and must allow
# the GCP-A → GCP-B → 111 Ollama lane to finish before cloud backup.
llm_timeout_seconds = _incident_llm_timeout_seconds()
llm_result, provider, success = await asyncio.wait_for(
self._openclaw.generate_incident_proposal_with_tools(
incident_id=incident.incident_id,
severity=incident.severity.value,
signals=signals_dict,
affected_services=incident.affected_services,
expert_context=llm_expert_context if llm_expert_context else None,
),
timeout=llm_timeout_seconds,
)
if success and llm_result:
logger.info(
"dual_engine_llm_win",
incident_id=incident.incident_id,
provider=provider,
kb_rag=bool(kb_context),
)
result = {**llm_result, "source": f"llm_{provider}"}
# ADR-073 Phase 3-6: YAML rule risk_level 優先於 LLM 輸出 (2026-04-12 ogt)
# LLM 有時把 critical 告警估為 medium,YAML 規則是由人工審閱過的,優先採用
try:
from src.services.alert_rule_engine import get_risk_for_alertname
_alertname_for_risk = (
incident.signals[0].labels.get("alertname", "")
if incident.signals else ""
)
if _alertname_for_risk:
_yaml_risk = get_risk_for_alertname(_alertname_for_risk)
if _yaml_risk and _yaml_risk != result.get("risk_level"):
logger.info(
"risk_level_yaml_override",
incident_id=incident.incident_id,
llm_risk=result.get("risk_level"),
yaml_risk=_yaml_risk,
)
result["risk_level"] = _yaml_risk
except Exception as _re:
logger.debug("risk_level_yaml_override_failed", error=str(_re))
# MCP Phase 4a: 信心 < 0.7 → NemoClaw second opinion (2026-04-11 Claude Sonnet 4.6)
_conf = float(result.get("confidence", 1.0))
if _conf < 0.7:
try:
# GAP-B4 (2026-04-14 Claude Sonnet 4.6): NemoClaw 是 advisory,
# 3s timeout 保護主決策流程不被拖累
_advisory = await asyncio.wait_for(
_nemoclaw_second_opinion(incident, result),
timeout=3.0,
)
if _advisory:
result["advisory_note"] = _advisory
logger.info(
"nemoclaw_second_opinion_added",
incident_id=incident.incident_id,
confidence=_conf,
)
except asyncio.TimeoutError:
logger.warning("nemoclaw_second_opinion_timeout",
incident_id=incident.incident_id)
except Exception as _soe:
logger.warning("nemoclaw_second_opinion_failed",
incident_id=incident.incident_id, error=str(_soe))
# 2026-04-27 Wave8-B1 by Claude — evidence_snapshot 攜帶進 result(P1 LLM 路徑)
if evidence_snapshot is not None:
result["_evidence_snapshot_ref"] = evidence_snapshot
return result
except asyncio.TimeoutError:
# GAP-B4: LLM 超時 → 明確標記,降級 Expert System
logger.warning(
"llm_timeout_fallback",
incident_id=incident.incident_id,
timeout_sec=llm_timeout_seconds,
action="降級 Expert System",
)
except Exception as e:
logger.warning(
"dual_engine_llm_failed",
incident_id=incident.incident_id,
error=str(e),
)
# LLM 失敗/超時,使用 Expert System
logger.info(
"dual_engine_expert_fallback",
incident_id=incident.incident_id,
)
return expert_result
async def _try_playbook_match(
self,
incident: Incident,
) -> dict[str, Any] | None:
"""
Phase 7.5: 嘗試 Playbook 匹配
條件:
- 相似度 >= PLAYBOOK_SIMILARITY_THRESHOLD (85%)
- Playbook 狀態為 APPROVED
- 成功率 >= 80% (如果有執行紀錄)
Returns:
匹配成功返回 proposal_data,否則 None
"""
try:
playbook_service = get_playbook_service()
# 建構症狀模式
alert_names = [s.alert_name for s in incident.signals] if incident.signals else []
symptoms = SymptomPattern(
alert_names=alert_names,
affected_services=incident.affected_services or [],
severity_range=[incident.severity.value] if incident.severity else ["P2"],
)
# 取得推薦 (只取 Top 1)
recommendations = await playbook_service.get_recommendations(
symptoms=symptoms,
top_k=1,
)
if not recommendations:
logger.debug(
"playbook_no_match",
incident_id=incident.incident_id,
)
return None
best_match = recommendations[0]
playbook = best_match.playbook
# 檢查相似度閾值
if best_match.similarity_score < PLAYBOOK_SIMILARITY_THRESHOLD:
logger.debug(
"playbook_similarity_below_threshold",
incident_id=incident.incident_id,
playbook_id=playbook.playbook_id,
similarity=best_match.similarity_score,
threshold=PLAYBOOK_SIMILARITY_THRESHOLD,
)
return None
# 檢查成功率 (如果有執行紀錄)
if playbook.total_executions > 0 and playbook.success_rate < 0.8:
logger.debug(
"playbook_low_success_rate",
incident_id=incident.incident_id,
playbook_id=playbook.playbook_id,
success_rate=playbook.success_rate,
)
return None
# Playbook 命中!
# 取得第一個修復步驟的指令
kubectl_command = ""
if playbook.repair_steps:
# 將 target 替換為實際服務名稱
target = incident.affected_services[0] if incident.affected_services else "unknown"
kubectl_command = playbook.repair_steps[0].command.format(target=target)
logger.info(
"playbook_match_success",
incident_id=incident.incident_id,
playbook_id=playbook.playbook_id,
playbook_name=playbook.name,
similarity=best_match.similarity_score,
success_rate=playbook.success_rate,
)
return {
"source": "playbook",
"playbook_id": playbook.playbook_id,
"playbook_name": playbook.name,
"action": kubectl_command,
"kubectl_command": kubectl_command,
"description": playbook.description,
"risk_level": playbook.repair_steps[0].risk_level.value.lower() if playbook.repair_steps else "medium",
"reasoning": f"Playbook 匹配 ({best_match.similarity_score:.0%} 相似度, {playbook.success_rate:.0%} 成功率): {best_match.reason}",
"confidence": 0.0, # 🔴 Playbook RAG 匹配不是 AI 分析,信心度設 0
"matched_symptoms": best_match.matched_symptoms,
"from_cache": False,
}
except Exception as e:
logger.warning(
"playbook_match_error",
incident_id=incident.incident_id,
error=str(e),
)
return None
async def _find_existing_token(
self,
incident_id: str,
) -> DecisionToken | None:
"""查找現有的決策令牌"""
redis_client = get_redis()
# 掃描 decision:* 找到匹配的 incident_id
cursor = 0
while True:
cursor, keys = await redis_client.scan(
cursor=cursor,
match=f"{DECISION_TOKEN_PREFIX}*",
count=100,
)
for key in keys:
try:
import json
data = await redis_client.get(key)
if data:
token_data = json.loads(data)
if token_data.get("incident_id") == incident_id:
return DecisionToken.from_dict(token_data)
except Exception:
continue
if cursor == 0:
break
return None
async def _persist_decision_to_db(
self, incident_id: str, proposal_data: dict
) -> None:
"""
2026-04-16 Claude Sonnet 4.6: 將 AI 分析結果寫入 incidents.decision_chain (PostgreSQL)
修復 Gap: decision token 只有 Redis TTL,AI 分析歷史永久丟失
寫入格式: JSON array,每次分析追加一條記錄
欄位: ts / confidence / risk_level / provider / source / diagnosis (前 200 字)
"""
try:
from src.db.base import get_db_context
from sqlalchemy import text as _sa_text
import json as _json
from src.utils.timezone import now_taipei
# 2026-04-16 ogt + Claude Sonnet 4.6: 補入 playbook_id / alert_category (ADR-076)
entry = {
"ts": now_taipei().isoformat(),
"confidence": proposal_data.get("confidence", 0.0),
"risk_level": proposal_data.get("risk_level", "unknown"),
"provider": proposal_data.get("provider", proposal_data.get("source", "")),
"source": proposal_data.get("source", ""),
"diagnosis": str(proposal_data.get("diagnosis", proposal_data.get("description", "")))[:200],
"playbook_id": proposal_data.get("playbook_id", ""),
"playbook_name": proposal_data.get("playbook_name", ""),
"alert_category": proposal_data.get("alert_category", ""),
}
async with get_db_context() as db:
# 讀取現有 decision_chain (可能為 null 或 json array)
r = await db.execute(
_sa_text(
"SELECT decision_chain FROM incidents WHERE incident_id = :iid"
),
{"iid": incident_id},
)
row = r.fetchone()
if row is None:
return
existing = row[0] or []
if isinstance(existing, str):
try:
existing = _json.loads(existing)
except Exception:
existing = []
if not isinstance(existing, list):
existing = []
existing.append(entry)
# asyncpg 不支援 :param::type 語法,改用 CAST(:param AS jsonb)
# 2026-04-16 Claude Sonnet 4.6: fix syntax error at or near ":"
await db.execute(
_sa_text(
"UPDATE incidents SET decision_chain = CAST(:dc AS jsonb) WHERE incident_id = :iid"
),
{"dc": _json.dumps(existing), "iid": incident_id},
)
await db.commit()
logger.debug(
"decision_chain_persisted",
incident_id=incident_id,
confidence=entry["confidence"],
provider=entry["provider"],
)
except Exception as e:
logger.warning("decision_chain_persist_failed", incident_id=incident_id, error=str(e))
async def _save_token(self, token: DecisionToken, ttl: int = DECISION_TOKEN_TTL) -> None:
"""儲存決策令牌到 Redis
ttl: 過期秒數,預設 DECISION_TOKEN_TTL (3600s)
TYPE-1 純資訊通知使用 86400s (24h) 防重複洗版
"""
import json
redis_client = get_redis()
key = f"{DECISION_TOKEN_PREFIX}{token.token}"
await redis_client.set(
key,
json.dumps(token.to_dict()),
ex=ttl,
)
async def get_token(self, token_id: str) -> DecisionToken | None:
"""取得決策令牌"""
import json
redis_client = get_redis()
key = f"{DECISION_TOKEN_PREFIX}{token_id}"
data = await redis_client.get(key)
if data:
return DecisionToken.from_dict(json.loads(data))
return None
async def update_token_state(
self,
token_id: str,
new_state: DecisionState,
proposal_id: str | None = None,
) -> DecisionToken | None:
"""更新決策狀態"""
token = await self.get_token(token_id)
if not token:
return None
token.state = new_state
token.updated_at = datetime.now(UTC)
if proposal_id:
token.proposal_id = proposal_id
await self._save_token(token)
return token
async def get_or_create_decision_with_consensus(
self,
incident: Incident,
timeout_sec: float = 30.0,
use_consensus: bool = True,
) -> DecisionToken:
"""
取得或建立決策令牌 (含 Agent Teams 共識)
Phase 9.4 升級版本:
- 對於 P0/P1 事件,自動啟用 ConsensusEngine
- 整合多專家意見
- 共識分數影響風險評估
Args:
incident: 事件
timeout_sec: 超時秒數
use_consensus: 是否使用共識引擎 (預設 True)
Returns:
DecisionToken
"""
# 2026-04-26 P2.4 by Claude — 12-Agent Consensus 整合
# ENABLE_12AGENT_CONSENSUS=False(預設)→ 跳過 consensus,走原有雙軌決策
# ENABLE_12AGENT_CONSENSUS=True + P0/P1 + use_consensus=True → 走 consensus 路徑
if not settings.ENABLE_12AGENT_CONSENSUS:
return await self.get_or_create_decision(incident, timeout_sec)
# 判斷是否需要共識 (P0/P1 或明確要求)
should_use_consensus = use_consensus and incident.severity.value in ["P0", "P1"]
if not should_use_consensus:
# 使用原有的雙軌決策
return await self.get_or_create_decision(incident, timeout_sec)
# Phase 9.4: 使用 ConsensusEngine
from src.services.consensus_engine import get_consensus_engine
consensus_engine = get_consensus_engine()
# 檢查現有 token
existing_token = await self._find_existing_token(incident.incident_id)
if existing_token:
# READY 或 EXECUTING 狀態: 直接返回
if existing_token.state in (DecisionState.READY, DecisionState.EXECUTING):
return existing_token
# COMPLETED 狀態: 直接返回,避免重複建立 decision 導致 Telegram 轟炸
if existing_token.state == DecisionState.COMPLETED:
return existing_token
# 建立新 token
token = DecisionToken(
token=f"DEC-{uuid4().hex[:12].upper()}",
incident_id=incident.incident_id,
state=DecisionState.ANALYZING,
)
await self._save_token(token)
logger.info(
"decision_analyzing_with_consensus",
token=token.token,
incident_id=incident.incident_id,
)
try:
# 執行共識分析
consensus_result = await asyncio.wait_for(
consensus_engine.run_consensus(incident, timeout_sec),
timeout=timeout_sec,
)
# 轉換為 proposal_data 格式
proposal_data = {
"source": "consensus_engine",
"consensus_id": consensus_result.consensus_id,
"consensus_score": consensus_result.consensus_score,
"action": consensus_result.recommended_action,
"description": consensus_result.final_reasoning,
"risk_level": consensus_result.risk_level,
"kubectl_command": consensus_result.recommended_kubectl,
"reasoning": consensus_result.final_reasoning,
# 2026-04-27 Wave8-B5 by Claude — Consensus auto_approve confidence 修復:
# 原設 0.0 → auto_approve 永遠因 confidence<0.5 拒絕 → 所有 consensus 送人工
# 改為真實共識分數,配合 _is_rule_based 的 consensus_engine+score>=0.6 分支
"confidence": consensus_result.consensus_score,
"agent_count": len(consensus_result.opinions),
"dissenting_opinions": consensus_result.dissenting_opinions,
"from_cache": False,
}
token.state = DecisionState.READY
token.proposal_data = proposal_data
token.updated_at = datetime.now(UTC)
logger.info(
"decision_ready_with_consensus",
token=token.token,
consensus_id=consensus_result.consensus_id,
consensus_score=consensus_result.consensus_score,
)
except TimeoutError:
logger.warning(
"consensus_timeout_using_expert",
token=token.token,
timeout_sec=timeout_sec,
)
# Fallback 到 Expert System
expert_result = expert_analyze(incident)
token.state = DecisionState.READY
token.proposal_data = expert_result
token.updated_at = datetime.now(UTC)
except Exception as e:
logger.exception(
"consensus_error_using_expert",
token=token.token,
error=str(e),
)
expert_result = expert_analyze(incident)
token.state = DecisionState.READY
token.proposal_data = expert_result
token.error = str(e)
token.updated_at = datetime.now(UTC)
await self._save_token(token)
return token
async def resend_stale_ready_tokens(self) -> int:
"""
BUG-005 修復 2026-04-11: 掃描 Redis 中所有 state=ready 且 dedup_key 不存在的 token,
重新推送 Telegram 審核卡片。
觸發時機:API 啟動(lifespan startup)+ 管理 API 手動呼叫。
2026-04-14 Claude Sonnet 4.6 修復: 加並發限制防止 Pod 啟動時壓爆 Ollama
- 原 fire_and_forget 同時啟動 N 個 task → N=108 時 Ollama embedding 全部 timeout
- 改 Semaphore 限 5 並發 + 每批 sleep 1s,總體 throughput 降低但系統穩定
Returns:
重新推送的 token 數量
"""
import asyncio as _asyncio
import json as _json
from src.core.redis_client import get_redis
from src.db.base import get_db_context
from src.repositories.incident_repository import IncidentDBRepository
redis = get_redis()
resent = 0
# GAP-A4 後續修復:限制並發 5,避免壓爆 Ollama
_sem = _asyncio.Semaphore(5)
async def _bounded_push(incident_obj, proposal_data_obj, _id, _token):
async with _sem:
try:
await _push_decision_to_telegram(incident_obj, proposal_data_obj)
logger.info(
"stale_ready_token_resent",
incident_id=_id,
token=_token,
)
except Exception as _e:
logger.warning(
"stale_ready_token_resend_failed",
incident_id=_id,
error=str(_e),
)
# 每次完成後喘 200ms,給 Ollama embedding 恢復空間
await _asyncio.sleep(0.2)
try:
# 掃描所有 decision:* key
cursor = 0
tasks: list[_asyncio.Task] = []
while True:
cursor, keys = await redis.scan(cursor, match="decision:*", count=200)
for key in keys:
try:
raw = await redis.get(key)
if not raw:
continue
data = _json.loads(raw)
if data.get("state") != DecisionState.READY.value:
continue
incident_id = data.get("incident_id", "")
# 取 Incident 資料(確認未 resolved)
async with get_db_context() as _db:
incident = await IncidentDBRepository(_db).get_by_id(incident_id)
if not incident:
continue
if str(getattr(incident, "status", "")).lower() in ("resolved", "closed"):
continue
# 2026-05-02 Claude Opus 4.7 + 統帥 ogt:dedup key 改 fingerprint,
# 與 line 217-218 同邏輯,避免 pod restart resend 路徑繞過 fingerprint dedup。
# 原本 telegram_sent:{incident_id} TTL 600s 早就過期 → 重啟必重發;
# 改 fingerprint + 24h TTL → 同症狀 24h 內任何 INC ID 都不會重推。
_alertname_fp = _incident_alertname_for_dedup(incident).strip().lower().replace(" ", "_")[:60]
_affected = getattr(incident, "affected_services", None) or []
_target_fp = (_affected[0] if _affected else "unknown").lower()[:40]
dedup_key = f"telegram_sent:fp:{_alertname_fp}:{_target_fp}"
if await redis.exists(dedup_key):
continue # dedup 還在,跳過
# 2026-04-15 Claude Sonnet 4.6 (節點 3 修復):
# 跳過 > 3 天的 stale incident — labels 已過時,重 process 無意義
# 只會壓爆 Ollama + 污染 Telegram 卡片(截圖:4/11 的卡片今天還在彈)
_STALE_DAYS = 3
_created_at = getattr(incident, "created_at", None)
if _created_at:
from datetime import datetime as _dt, timedelta as _td, timezone as _tz
_now = _dt.now(_tz.utc)
_cutoff = _now - _td(days=_STALE_DAYS)
# 確保 _created_at 有時區
if _created_at.tzinfo is None:
_created_at = _created_at.replace(tzinfo=_tz.utc)
if _created_at < _cutoff:
logger.debug(
"stale_ready_token_skipped_too_old",
incident_id=incident_id,
age_days=(_now - _created_at).days,
cutoff_days=_STALE_DAYS,
)
continue
proposal_data = data.get("proposal_data") or {}
if not proposal_data:
continue
# 用 Semaphore 限制並發,task 自帶 throttle
_task = _asyncio.create_task(
_bounded_push(incident, proposal_data, incident_id, data.get("token", ""))
)
tasks.append(_task)
resent += 1
except Exception as _te:
logger.debug("stale_ready_token_scan_error", error=str(_te))
if cursor == 0:
break
# 不等所有 task 完成(fire-and-forget 語義保留),但 await 一下讓並發限制生效
if tasks:
logger.info("stale_ready_tokens_throttled_dispatch",
total=len(tasks), max_concurrent=5)
except Exception as e:
logger.warning("resend_stale_ready_tokens_failed", error=str(e))
logger.info("stale_ready_tokens_scan_done", resent=resent)
return resent
async def _ssh_execute(
self,
incident: "Incident",
token: "DecisionToken",
action: str,
target: str,
) -> None:
"""
ADR-073 Phase 3-2: infrastructure 告警 SSH MCP routing
Docker/Host 告警走 SSH MCP Provider,不走 K8s executor
2026-04-12 ogt
支援指令:
- docker restart
- systemctl restart
- docker rm -f (含 docker start)
"""
import os as _os
# 取得主機 — 從 instance label 或 SSH_MCP_ALLOWED_HOSTS 第一台
_instance = incident.signals[0].labels.get("instance", "") if incident.signals else ""
_host = _instance.split(":")[0] if ":" in _instance else _instance
_allowed = [h.strip() for h in _os.environ.get("SSH_MCP_ALLOWED_HOSTS", "").split(",") if h.strip()]
if not _host or _host not in _allowed:
_host = _allowed[0] if _allowed else ""
if not _host:
logger.warning(
"ssh_execute_no_host",
incident_id=incident.incident_id,
reason="SSH_MCP_ALLOWED_HOSTS 未設定或 instance label 不在白名單",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason="SSH MCP host unavailable or not allowed",
attempted_actions="decision_manager._ssh_execute -> host_resolution_failed",
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# 解析 SSH tool + params
# 2026-04-27 Claude Sonnet 4.6: 加入主機診斷路徑
# 根因:只支援 docker/systemctl restart,主機告警 ssh {host} '...' 格式全降級人工
# 修復:識別 ssh_diagnose 模式,路由到 ssh_get_top_processes / ssh_get_disk_usage
_labels = incident.signals[0].labels if incident.signals else {}
if not target or target == "unknown":
target = (
_labels.get("container")
or _labels.get("name")
or _labels.get("component")
or _labels.get("service")
or _labels.get("job")
or "unknown"
)
_action_lower = action.lower().strip()
if _action_lower.startswith("docker restart"):
_tool = "ssh_docker_restart"
_container = target
elif _action_lower.startswith("systemctl restart"):
_tool = "ssh_systemctl_restart"
_service = target
elif _action_lower.startswith("ssh ") and "systemctl restart" in _action_lower:
import re as _ssh_re
_tool = "ssh_systemctl_restart"
_svc_match = _ssh_re.search(r"systemctl\s+restart\s+([a-z0-9_.-]+)", _action_lower)
_service = _svc_match.group(1) if _svc_match else target
elif _action_lower.startswith("ssh ") and "docker restart" in _action_lower and target != "unknown":
_tool = "ssh_docker_restart"
_container = target
elif _action_lower.startswith("ssh ") and "docker" in _action_lower and "prune" in _action_lower:
# 主機磁碟告警自動修復:docker image/volume/builder prune
# 2026-05-02 ogt + Claude Sonnet 4.6: ADR-068 飛輪 — disk full SOP
# ssh_docker_prune 內含 ≥75% 磁碟使用率守衛,低於閾值會 no-op
_tool = "ssh_docker_prune"
elif _action_lower.startswith("ssh ") and ("ps aux" in _action_lower or "top" in _action_lower or "free" in _action_lower or "df -h" in _action_lower or "uptime" in _action_lower):
# 主機診斷指令:自動收集 CPU/記憶體/磁碟,不修改任何狀態
_tool = "ssh_diagnose"
else:
logger.info(
"ssh_execute_unknown_action",
incident_id=incident.incident_id,
action=action,
reason="不支援的 SSH action 格式,降級為人工審核",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason="Unsupported SSH remediation action",
attempted_actions=f"decision_manager._ssh_execute -> unsupported_action: {action[:160]}",
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
params: dict = {"host": _host}
if _tool == "ssh_docker_restart":
params["container_name"] = _container
elif _tool == "ssh_systemctl_restart":
params["service"] = _service
if _tool in {"ssh_docker_restart", "ssh_systemctl_restart"}:
# Codex: 用 token 的 trust_score / confidence 餵給 Group B 守衛
try:
params["trust_score"] = float(
token.proposal_data.get("trust_score")
or token.proposal_data.get("confidence")
or 0.0
)
except Exception:
params["trust_score"] = 0.0
elif _tool == "ssh_docker_prune":
# 2026-05-02 ogt + Claude Sonnet 4.6: docker prune 飛輪
# Group B 工具需要 trust_score;自動修復鏈用 0.85(高於 0.8 門檻)
params["trust_score"] = 0.85
# ssh_diagnose: read-only, host only.
try:
result = await self._ssh.execute(tool_name=_tool, parameters=params)
success = result.success
output_preview = str(result.output or "")[:200]
error_text = result.error or ""
logger.info(
"ssh_execute_result",
incident_id=incident.incident_id,
tool=_tool,
host=_host,
success=success,
output=output_preview,
error=error_text[:200],
)
if not success:
token.state = DecisionState.READY
token.error = error_text or "SSH MCP execution failed"
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason=token.error,
attempted_actions=f"decision_manager._ssh_execute -> {_tool}",
)
)
_fire_and_forget(
_push_auto_repair_result(
incident,
action,
success=False,
error=token.error,
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
if _tool == "ssh_diagnose":
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["ssh_diagnosis_collected"] = True
token.proposal_data["ssh_diagnosis_preview"] = output_preview
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason="SSH diagnosis collected; no safe automatic repair action was produced",
attempted_actions=f"decision_manager._ssh_execute -> {_tool}",
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
token.state = DecisionState.COMPLETED
token.proposal_data["auto_executed"] = True
token.proposal_data["decision_state"] = DecisionState.COMPLETED.value
await self._save_token(token)
_fire_and_forget(
_push_auto_repair_result(incident, action, success=success)
)
except Exception as e:
logger.error(
"ssh_execute_failed",
incident_id=incident.incident_id,
error=str(e),
)
token.state = DecisionState.READY
token.error = str(e)
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
await self._save_token(token)
_fire_and_forget(
_escalate_decision_auto_repair_unavailable(
incident=incident,
token=token,
failure_reason=str(e),
attempted_actions="decision_manager._ssh_execute -> exception",
)
)
_fire_and_forget(
_push_auto_repair_result(incident, action, success=False, error=str(e))
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
# =============================================================================
# Singleton
# =============================================================================
_decision_manager: DecisionManager | None = None
def get_decision_manager() -> DecisionManager:
"""取得 DecisionManager 實例 (Singleton)"""
global _decision_manager
if _decision_manager is None:
_decision_manager = DecisionManager()
return _decision_manager