Files
awoooi/apps/api/src/services/decision_manager.py
OG T 2cef2098d3
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
feat(adr-075): 修復 Telegram 動態按鈕 4 個斷點 + 新增 7 種告警分類
斷點 A: decision_manager 提取 alert_category/notification_type 傳入 send_approval_card
斷點 B: send_approval_card 新增參數並傳遞至 _build_inline_keyboard
斷點 C: 互動型通知 (TYPE-3/4/4D/8M) 禁止發 SRE 群組,防 nonce 洩漏
斷點 D: _CATEGORY_BUTTONS k8s_workload → kubernetes + 新增 6 類按鈕組

classify_alert_early 新增: alertchain_health, flywheel_health, storage,
devops_tool, external_site, ssl_cert, host_resource (從 infrastructure 分離)
Test: 52 classify + 664 total passed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 18:35:56 +08:00

2135 lines
88 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Decision Manager - Phase 6.5 非同步決策狀態機
# 2026-04-10 Claude Sonnet 4.6: auto_execute ApprovalRequest 修復 + placeholder 替換
=============================================
實作「雙軌決策」(Dual-Engine Decision):
1. OpenClaw LLM (主要) - 智能提案
2. Expert System (備援) - 規則引擎
狀態機:
- INIT: 事件剛建立
- ANALYZING: 正在分析中 (LLM + Expert 並行)
- READY: 決策就緒,等待統帥親核
- EXECUTING: 已授權,正在執行
- COMPLETED: 執行完成
統帥鐵律:
- 永遠不能讓 UI 鎖死
- 30 秒內必須有 decision_token
- LLM 失敗時 Expert System 保底
"""
import asyncio
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Protocol, runtime_checkable
from uuid import uuid4
import structlog
from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.auto_approve import get_auto_approve_policy
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
logger = structlog.get_logger(__name__)
# Phase 7.5: Playbook-first threshold.
# A similarity of 0.85 (85%) or higher means the Playbook is used directly.
PLAYBOOK_SIMILARITY_THRESHOLD = 0.85
# P1 fix 2026-04-11: background-task GC guard.
# Strong references to fire-and-forget tasks are kept here until they finish.
_background_tasks: set[asyncio.Task] = set()


def _fire_and_forget(coro) -> asyncio.Task:
    """Schedule ``coro`` as a background task that is protected from GC.

    The task is held in ``_background_tasks`` while it runs; a done-callback
    drops it from the set once it has finished.

    Args:
        coro: Coroutine object to run in the background.

    Returns:
        The created task.
    """
    pending = asyncio.create_task(coro)
    # Done-callbacks are scheduled through the event loop, never run inline,
    # so registering the discard before the add is safe.
    pending.add_done_callback(_background_tasks.discard)
    _background_tasks.add(pending)
    return pending
# P1 fix 2026-04-11: allow-list for kubectl actions that rejects dangerous characters.
# Accepted shape: "kubectl <verb> <one target token> [-n|--namespace <ns>]".
# NOTE(review): commands that carry any other flag — e.g. the "--replicas=N"
# scale actions defined in EXPERT_RULES — do not match this pattern; confirm
# that rejecting them is intended.
import re as _re_module
_ALLOWED_KUBECTL_PATTERN = _re_module.compile(
    r"^kubectl\s+(rollout restart|rollout undo|scale|delete pod|get|describe|logs)"
    r"\s+[a-zA-Z0-9_./-]+(\s+(-n|--namespace)\s+[a-zA-Z0-9_-]+)?$"
)
# =============================================================================
# Phase 31 (ADR-067 2026-04-10): Log 異常摘要 — NemoTron deepseek-r1:14b
# =============================================================================
async def _send_log_summary(incident: "Incident") -> None:
    """Send an abnormal-log digest for the incident's first affected service.

    Phase 31 (ADR-067): meant to run right after ``_push_decision_to_telegram``
    has delivered the approval card.  The digest is requested from the
    log-summary service and posted through the NemoTron bot identity.

    Nothing is sent when the incident has no affected service or when the
    summary comes back empty.  Any failure is logged as a warning and swallowed.

    Args:
        incident: Incident whose first affected service is summarised.
    """
    try:
        services = incident.affected_services
        target = services[0] if services else None
        if not target:
            return

        # The namespace comes from the first signal's labels when available.
        first_labels = incident.signals[0].labels if incident.signals else {}
        namespace = first_labels.get("namespace", "awoooi-prod")

        from src.services.log_summary_service import get_log_summary_service

        summary = await get_log_summary_service().summarize_with_soft_timeout(
            pod_name=target,
            namespace=namespace,
        )
        if not summary:
            return

        from src.services.telegram_gateway import get_telegram_gateway

        message = f"📋 <b>Log 異常摘要</b> — <code>{target}</code>\n{summary}"
        await get_telegram_gateway().send_as_nemotron(message)
        logger.info("log_summary_sent", target=target, incident_id=incident.incident_id)
    except Exception as exc:
        logger.warning("log_summary_failed", error=str(exc))
# =============================================================================
# Telegram 推送 (Phase 6.5: 決策就緒通知)
# =============================================================================
async def _push_decision_to_telegram(
    incident: Incident,
    proposal_data: dict[str, Any],
) -> None:
    """Push a ready decision for ``incident`` to Telegram.

    Phase 6.5: connects the Signal Worker flow to Telegram notifications.
    2026-03-27 ogt: added Redis de-duplication (10-minute TTL).

    Flow:
        1. Return early when the incident was already pushed within the
           de-duplication window, is resolved/closed, targets a silenced
           resource, or when no bot token is configured.
        2. Read the card fields from ``proposal_data``.  An ``action`` without
           any command marker (legacy enum strings such as
           ``RESTART_DEPLOYMENT``) is re-resolved through the alert rule engine.
        3. Let ``classify_notification`` pick the card: TYPE_1 sends a
           button-less info notification, TYPE_4_DRIFT sends the drift card,
           everything else sends the approval card.
        4. Set the de-duplication key, then store the Telegram message id in
           Redis (24 h) and in the database so later status updates can
           continue on the original message.
        5. Start the background log summary and, when ``advisory_note`` is
           present, the NemoClaw second-opinion message.

    Args:
        incident: Incident whose decision is ready.
        proposal_data: Proposal fields produced by the decision engines.

    Every exception is caught and logged as a warning: a Telegram failure must
    not affect the main flow.
    """
    try:
        # Lazy imports to avoid circular dependencies.
        import html as _html
        import re as _re

        from src.core.redis_client import get_redis
        from src.services.telegram_gateway import (
            classify_notification,
            get_telegram_gateway,
            NotificationType,
        )

        # De-duplication: the same incident is pushed at most once per 10 minutes.
        redis = get_redis()
        dedup_key = f"telegram_sent:{incident.incident_id}"
        if await redis.exists(dedup_key):
            logger.debug(
                "telegram_push_skipped",
                reason="Already sent within 10 minutes",
                incident_id=incident.incident_id,
            )
            return

        # 2026-04-09 Claude Code: never re-send a resolved incident.
        # Scenario: the dedup TTL expired and an already resolved incident was
        # being pushed again.
        if incident.status and str(incident.status).lower() in ("resolved", "closed"):
            logger.info(
                "telegram_push_skipped",
                reason="Incident already resolved",
                incident_id=incident.incident_id,
            )
            return

        # Silence check: is this resource muted? (2026-03-27 P1 optimisation)
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        silence_key = f"telegram_silence:{target}"
        if await redis.exists(silence_key):
            logger.info(
                "telegram_push_silenced",
                reason="Resource is silenced",
                incident_id=incident.incident_id,
                resource=target,
            )
            return

        # Nothing can be sent without a bot token.
        if not settings.OPENCLAW_TG_BOT_TOKEN:
            logger.debug(
                "telegram_push_skipped",
                reason="Bot token not configured",
                incident_id=incident.incident_id,
            )
            return

        gateway = get_telegram_gateway()

        def _strip_placeholders(s: str) -> str:
            """Remove ``<placeholder>`` tokens so Telegram's HTML parser does not fail."""
            return _re.sub(r'<[^>]+>', '', s).strip()

        # --- Extract the card fields from proposal_data -----------------------
        risk_level = proposal_data.get("risk_level", "medium")
        # 2026-04-09 Claude Code: action is NOT passed through _strip_placeholders,
        # which would cut the deployment name; placeholders are expected to have
        # been filled with real values by the nemotron correction step.
        action = proposal_data.get("action", proposal_data.get("kubectl_command", ""))

        # 2026-04-09 Claude Code: older incidents stored an enum string
        # (e.g. action="RESTART_DEPLOYMENT") instead of a kubectl command, which
        # left the suggestion blank.  Detection: no kubectl/ssh/docker marker
        # present -> look the command up again through the rule engine.
        _KUBECTL_MARKERS = ("kubectl", "ssh", "docker", "systemctl", "/")
        if action and not any(m in action for m in _KUBECTL_MARKERS):
            try:
                from src.services.alert_rule_engine import match_rule as _match_rule

                _labels = incident.signals[0].labels if incident.signals else {}
                _rule_resp = _match_rule({
                    "labels": _labels,
                    "alert_type": _labels.get("alertname", target),
                    "message": incident.title or "",
                    "target_resource": target,
                    "namespace": _labels.get("namespace", "awoooi-prod"),
                    "severity": risk_level,
                })
                if _rule_resp and _rule_resp.get("kubectl_command", "").strip():
                    action = _rule_resp["kubectl_command"]
            except Exception as _rule_err:
                # A rule-engine failure must not block the notification; the
                # original action is kept, but the failure is no longer silent.
                logger.debug(
                    "telegram_action_rule_lookup_failed",
                    incident_id=incident.incident_id,
                    error=str(_rule_err),
                )

        description = proposal_data.get("description", "")
        reasoning = _strip_placeholders(proposal_data.get("reasoning", ""))
        # Default 0.0 means "not analysed by AI".
        confidence = proposal_data.get("confidence", 0.0)
        source = proposal_data.get("source", "unknown")
        ai_provider = proposal_data.get("provider", "")  # 2026-03-29 ogt: AI model source
        ai_model = proposal_data.get("model", "")  # 2026-04-04 ogt: underlying model name
        # 2026-04-02 ogt: Phase 22 Nemotron collaboration data
        nemotron_enabled = proposal_data.get("nemotron_enabled", False)
        nemotron_tools = proposal_data.get("nemotron_tools")
        nemotron_validation = proposal_data.get("nemotron_validation", "")
        nemotron_latency_ms = proposal_data.get("nemotron_latency_ms", 0.0)
        # 2026-04-09 Claude Sonnet 4.6: tool-calling model / backend
        nemotron_tool_model = proposal_data.get("nemotron_tool_model", "")
        nemotron_tool_backend = proposal_data.get("nemotron_tool_backend", "")

        # approval_id is the incident id itself (already in INC-xxx form).
        # 2026-03-27 ogt: fixes the repeated "INC-INC-INC-" prefix bug.
        approval_id = incident.incident_id

        # ADR-071: the notification classifier picks the card type from the
        # alert type/state.  2026-04-11 Claude Sonnet 4.6: wired up
        # classify_notification(), which used to be dead code.
        _auto_executed = proposal_data.get("auto_executed", False)
        _decision_state = proposal_data.get("decision_state", "")
        _mcp_all_failed = proposal_data.get("mcp_all_failed", False)
        _notif_type = classify_notification(
            incident=incident,
            confidence=confidence,
            auto_executed=_auto_executed,
            mcp_all_failed=_mcp_all_failed,
            decision_state=_decision_state,
        )

        if _notif_type == NotificationType.TYPE_1:
            # Pure information notice — no buttons.
            tg_result = await gateway.send_info_notification(
                incident_id=incident.incident_id,
                title=incident.title or "告警通知",
                message=reasoning[:200] if reasoning else description[:200],
                alertname=incident.signals[0].labels.get("alertname", "") if incident.signals else "",
                severity="info",
            )
        elif _notif_type == NotificationType.TYPE_4_DRIFT:
            # Dedicated config-drift card.
            tg_result = await gateway.send_drift_card(
                incident_id=incident.incident_id,
                approval_id=approval_id,
                resource_name=target[:50],
                diff_summary=description[:500],
            )
        else:
            # TYPE-2 / TYPE-3 / TYPE-4 all use send_approval_card; the button
            # set is decided by alert_category.
            # 2026-04-12 ogt: ADR-075 breakpoint A — take alert_category and
            # notification_type from the incident.
            _alert_category = getattr(incident, "alert_category", "") or ""
            # Parenthesised on purpose: without the parentheses the expression
            # parses as "(incident value or classifier value) if classifier
            # else ''", which would discard the incident's own value whenever
            # the classifier result is falsy.
            _notification_type = (
                getattr(incident, "notification_type", "")
                or (_notif_type.value if _notif_type else "")
            )
            tg_result = await gateway.send_approval_card(
                approval_id=approval_id,
                risk_level=risk_level,
                resource_name=target[:50],
                root_cause=reasoning[:150] if reasoning else description[:150],
                suggested_action=action[:80] if action else "待分析",
                estimated_downtime="5-15 min",
                primary_responsibility="INFRA",
                confidence=confidence,
                namespace=incident.signals[0].labels.get("namespace", "default") if incident.signals else "default",
                ai_provider=ai_provider,
                ai_model=ai_model,
                nemotron_enabled=nemotron_enabled,
                nemotron_tools=nemotron_tools,
                nemotron_validation=nemotron_validation,
                nemotron_latency_ms=nemotron_latency_ms,
                nemotron_tool_model=nemotron_tool_model,
                nemotron_tool_backend=nemotron_tool_backend,
                incident_id=incident.incident_id,
                alert_category=_alert_category,
                notification_type=_notification_type,
            )

        # Set the dedup key (10-minute TTL) as soon as the send has returned.
        # Doing it before the follow-up steps guarantees that a failure in any
        # of them cannot lead to the same card being sent again.
        await redis.setex(dedup_key, 600, "1")

        # 2026-04-09 Claude Sonnet 4.6: remember the message id so later status
        # updates continue on the original message.  Written to Redis (fast
        # lookup) and to the DB (persistent, not bound to a TTL).
        tg_message_id = tg_result.get("result", {}).get("message_id") if isinstance(tg_result, dict) else None
        tg_chat_id = tg_result.get("result", {}).get("chat", {}).get("id") if isinstance(tg_result, dict) else None
        if tg_message_id:
            await redis.setex(f"tg_msg:{incident.incident_id}", 86400, str(tg_message_id))
            # Persist to the DB.
            try:
                from src.services.approval_db import get_approval_service as _get_approval_svc

                _approval_svc = _get_approval_svc()
                await _approval_svc.update_telegram_message(
                    incident_id=incident.incident_id,
                    telegram_message_id=tg_message_id,
                    telegram_chat_id=tg_chat_id,
                )
            except Exception as _e:
                logger.warning("telegram_message_id_db_save_failed", incident_id=incident.incident_id, error=str(_e))

        # Phase 31 (ADR-067 2026-04-10): abnormal-log summary, run in the
        # background so the main flow is not blocked.
        _fire_and_forget(_send_log_summary(incident))

        # MCP Phase 4a: NemoClaw second opinion (2026-04-11 Claude Sonnet 4.6).
        # When proposal_data carries an advisory_note, post it as an extra
        # message under the NemoClaw bot identity.
        _advisory_note = proposal_data.get("advisory_note", "")
        if _advisory_note:
            # The note is free-form model output; escape it so stray "<", ">"
            # or "&" cannot break the HTML-formatted message.
            _safe_note = _html.escape(str(_advisory_note), quote=False)
            _fire_and_forget(
                gateway.send_as_nemotron(
                    f"🤔 <b>NemoClaw 第二意見</b> (信心={confidence:.2f})\n"
                    f"<i>{_safe_note}</i>"
                )
            )

        logger.info(
            "telegram_decision_pushed",
            incident_id=incident.incident_id,
            source=source,
            risk_level=risk_level,
        )
    except Exception as e:
        # A Telegram failure must not affect the main flow.
        logger.warning(
            "telegram_decision_push_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
async def _nemoclaw_second_opinion(incident: "Incident", primary_result: dict) -> str | None:
    """Ask NemoClaw for an independent, advisory-only second opinion.

    MCP Phase 4a: according to the original design note this is triggered when
    the primary confidence is below 0.7 (the threshold is not checked here).
    The model registered as ``ollama``/``nemoclaw`` is prompted through the
    Ollama ``/api/generate`` endpoint with the alert summary and the primary
    proposal.  Only text is produced; nothing is executed.

    Args:
        incident: Incident under analysis.
        primary_result: Proposal of the primary AI (``action``, ``confidence``
            and ``reasoning`` are read).

    Returns:
        The reply with everything up to the first ``</think>`` marker removed
        and capped at 300 characters, or ``None`` when the reply is empty or
        any error occurs.

    2026-04-11 Claude Sonnet 4.6 Asia/Taipei
    """
    try:
        from src.core.config import settings
        import httpx as _httpx
        from src.services.model_registry import get_model as _get_model

        base_url = getattr(settings, "OLLAMA_URL", "http://192.168.0.188:11434")
        # D1 centralisation 2026-04-11: the model name is read from models.json
        # (providers.ollama.models.nemoclaw).
        model_name = _get_model("ollama", "nemoclaw")

        alert_digest = ""
        if incident.signals:
            first = incident.signals[0].labels
            alert_digest = " ".join((
                f"alertname={first.get('alertname','?')}",
                f"severity={first.get('severity','?')}",
                f"instance={first.get('instance','?')}",
            ))

        affected = ", ".join(incident.affected_services or [])
        proposed_action = primary_result.get("action", "?")
        confidence_text = f"{primary_result.get('confidence',0):.2f}"
        proposed_reasoning = primary_result.get("reasoning", "")[:200]
        prompt = (
            f"你是資深 SRE請對以下告警做獨立分析並提出建議。\n"
            f"告警摘要: {alert_digest}\n"
            f"受影響服務: {affected}\n"
            f"主 AI 已提出: {proposed_action} "
            f"(信心={confidence_text})\n"
            f"主 AI 根因: {proposed_reasoning}\n\n"
            f"請用繁體中文100 字以內,給出你的獨立判斷與建議(若同意主 AI 則說明原因)。"
        )

        request_body = {"model": model_name, "prompt": prompt, "stream": False}
        async with _httpx.AsyncClient(timeout=30.0) as client:
            reply = await client.post(f"{base_url}/api/generate", json=request_body)
            reply.raise_for_status()
            payload = reply.json()

        advisory = payload.get("response", "").strip()
        # Keep only the text after <think>...</think> (deepseek-r1 CoT format).
        if "</think>" in advisory:
            advisory = advisory.partition("</think>")[2].strip()
        if not advisory:
            return None
        return advisory[:300]
    except Exception as exc:
        logger.debug("nemoclaw_second_opinion_error", error=str(exc))
        return None
async def _generate_playbook_draft_if_new(incident: "Incident") -> None:
    """Generate an AI draft Playbook and store it in the knowledge base.

    MCP Phase 4c, used when no Playbook matched:
      - does nothing when the first signal has no ``alertname`` or when a
        semantic search for the alertname (threshold 0.92) already finds an
        entry, to avoid duplicates;
      - prompts the model registered as ``ollama``/``playbook_draft`` for a
        structured draft and ignores replies shorter than 50 characters;
      - stores the draft as a knowledge entry with status DRAFT (per the
        original note it needs manual review before being approved);
      - appends a ``PLAYBOOK_DRAFT_CREATED`` event to the alert operation log.

    Any failure is logged at debug level and swallowed.

    Args:
        incident: Incident for which a draft may be created.

    2026-04-11 Claude Sonnet 4.6 Asia/Taipei
    """
    try:
        import httpx as _httpx
        from src.core.config import settings
        from src.models.knowledge import (
            EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate,
        )
        from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository
        from src.services.knowledge_service import get_knowledge_service

        first_labels = incident.signals[0].labels if incident.signals else {}
        alertname = first_labels.get("alertname", "")
        if not alertname:
            return

        # Skip when the knowledge base already has an entry for this alertname.
        knowledge_svc = get_knowledge_service()
        if await knowledge_svc.semantic_search(alertname, limit=1, threshold=0.92):
            return

        # Build the prompt for the draft.
        severity = first_labels.get("severity", "warning")
        services = ", ".join(incident.affected_services or ["unknown"])
        prompt = (
            f"你是資深 SRE請為以下告警生成一份結構化 Playbook 草稿(繁體中文)。\n"
            f"告警名稱: {alertname}\n"
            f"嚴重度: {severity}\n"
            f"受影響服務: {services}\n\n"
            f"請按以下格式輸出(不超過 300 字):\n"
            f"## 症狀\n(描述此告警代表什麼)\n"
            f"## 根因假設\n(最常見的 2-3 個原因)\n"
            f"## 診斷步驟\nkubectl 或 shell 指令)\n"
            f"## 修復動作\n(具體修復指令,含 kubectl rollout restart 等)\n"
            f"## 驗收條件\n(如何確認修復成功)"
        )

        from src.services.model_registry import get_model as _get_model

        base_url = getattr(settings, "OLLAMA_URL", "http://192.168.0.188:11434")
        # D1 centralisation 2026-04-11: the model name is read from models.json
        # (providers.ollama.models.playbook_draft).
        draft_model = _get_model("ollama", "playbook_draft")
        request_body = {"model": draft_model, "prompt": prompt, "stream": False}
        async with _httpx.AsyncClient(timeout=45.0) as client:
            reply = await client.post(f"{base_url}/api/generate", json=request_body)
            reply.raise_for_status()
            content = reply.json().get("response", "").strip()

        # Empty or very short replies are not worth storing.
        if len(content) < 50:
            return

        # Store in the knowledge base with status DRAFT.
        draft = KnowledgeEntryCreate(
            title=f"[AI草稿] {alertname} Playbook",
            content=content,
            entry_type=EntryType.PLAYBOOK,
            category="auto_generated",
            tags=[alertname, severity, "ai_draft", "mcp_phase4c"],
            source=EntrySource.AI_EXTRACTED,
            status=EntryStatus.DRAFT,
            related_incident_id=incident.incident_id,
        )
        entry = await knowledge_svc.create_entry(draft)

        # Record the event in the operation log.
        await get_alert_operation_log_repository().append(
            event_type="PLAYBOOK_DRAFT_CREATED",
            incident_id=incident.incident_id,
            actor="mcp_phase4c",
            action_detail=f"AI 草稿 Playbook: {entry.entry_id}",
            success=True,
            context={"alertname": alertname, "km_entry_id": entry.entry_id},
        )
        logger.info(
            "playbook_draft_created",
            incident_id=incident.incident_id,
            alertname=alertname,
            entry_id=entry.entry_id,
        )
    except Exception as exc:
        logger.debug("playbook_draft_failed", error=str(exc))
# Pods whose name contains one of these markers are monitoring infrastructure
# and are never returned by the fallback target lookup.
_INFRA_POD_MARKERS = ("prometheus", "alertmanager", "grafana")


def _deployment_name_from_pod(pod: str) -> str:
    """Strip the trailing hash segments from a pod name to get its owner's name.

    ``<name>-<9 or 10 chars>-<5 chars>`` yields ``<name>``; any other
    dash-separated name only loses its last segment; a name without a dash is
    returned unchanged.
    """
    parts = pod.rsplit("-", 2)
    if len(parts) >= 3 and len(parts[-1]) == 5 and len(parts[-2]) in (9, 10):
        return parts[0]
    if len(parts) >= 2:
        return "-".join(parts[:-1])
    return pod


async def _resolve_target_from_k8s(incident: "Incident", namespace: str) -> str | None:
    """Guess the affected deployment for a host-level alert by listing pods.

    BUG-002 mitigation: host-level alerts carry no component/job/pod label, so
    the pods of ``namespace`` are listed through the K8s MCP provider and the
    target is derived from the alertname.

    Selection order:
        1. If the alertname maps to keywords, the first pod whose name contains
           one of them.
        2. Otherwise — or when no pod matched — the first pod that is not
           monitoring infrastructure (prometheus/alertmanager/grafana).

    Args:
        incident: Incident whose first signal provides the alertname.
        namespace: Namespace whose pods are listed.

    Returns:
        The deployment name (pod name without hash suffix), or ``None`` when the
        provider is disabled, the lookup fails or no suitable pod exists.

    2026-04-11 Claude Sonnet 4.6 Asia/Taipei
    """
    try:
        from src.plugins.mcp.providers.k8s_provider import K8sProvider

        k8s = K8sProvider()
        if not k8s.enabled:
            return None

        alertname = ""
        if incident.signals:
            alertname = incident.signals[0].labels.get("alertname", "")

        # List every pod, then infer the affected deployment from the alertname.
        result = await k8s.execute(
            tool_name="kubectl_get",
            params={"resource": "pods", "namespace": namespace, "output": "name"},
        )
        if not result.get("success"):
            return None
        pods = [
            name
            for line in (result.get("output", "") or "").splitlines()
            if (name := line.removeprefix("pod/").strip())
        ]
        if not pods:
            return None

        # alertname -> keyword mapping for common host-level alerts.
        # I2 fix 2026-04-11: HostHighDiskUsage -> HostOutOfDiskSpace, in line
        # with alerts-unified.yml.  Entries with an empty keyword list go
        # straight to the non-infra fallback.
        _ALERTNAME_KEYWORDS: dict[str, list[str]] = {
            "HostHighCpuLoad": ["api", "web"],
            "HostOutOfMemory": ["api", "web"],
            "DockerContainerUnhealthy": [],
            "DockerContainerExited": [],
            "HostOutOfDiskSpace": [],
        }
        keywords = _ALERTNAME_KEYWORDS.get(alertname, [])

        if keywords:
            # Prefer a pod whose name contains one of the keywords.
            for pod in pods:
                if any(kw in pod for kw in keywords):
                    return _deployment_name_from_pod(pod)
        elif alertname in _ALERTNAME_KEYWORDS:
            # Logged so the fallback path can be traced.
            logger.debug(
                "resolve_target_k8s_fallback_to_first_pod",
                alertname=alertname,
                reason="alertname 有對應但 keywords=[],走 fallback 取第一個非 infra pod",
            )

        # Fallback: first pod that is not monitoring infrastructure.  The
        # keyword loop is skipped for an empty keyword list; otherwise it would
        # return the very first pod — infra pods included — before this
        # fallback could run.
        for pod in pods:
            if not any(marker in pod for marker in _INFRA_POD_MARKERS):
                return _deployment_name_from_pod(pod)
    except Exception as e:
        logger.debug("resolve_target_from_k8s_failed", error=str(e))
    return None
async def _verify_k8s_deployment_exists(target: str, namespace: str, alertname: str = "") -> bool:
    """Check through the K8s MCP provider that ``target`` really exists.

    BUG-003 mitigation.  The deployment named ``target`` is looked up first;
    if that fails, pods selected by ``app=<target>`` are tried, because some
    alerts refer to a pod rather than a deployment.

    When the provider is disabled or the lookup raises:
      - host-level alerts (alertname starting with Host/Docker/Backup/Velero/
        SSH) return ``False``, so no K8s operation is run for a host problem;
      - all other alerts return ``True`` (conservative pass-through; kubectl
        is left to report the error itself).

    Args:
        target: Deployment name, also used as the ``app`` label value.
        namespace: Namespace to search.
        alertname: Alert name used to recognise host-level alerts.

    Returns:
        ``True`` when the target exists or the check is passed through.

    2026-04-11 Claude Sonnet 4.6 Asia/Taipei
    """
    # Host-level alert prefixes: the repair target of these alerts is a Docker
    # container or the host, not a K8s deployment.
    host_prefixes = ("Host", "Docker", "Backup", "Velero", "SSH")
    is_host_alert = bool(alertname) and alertname.startswith(host_prefixes)
    verdict_when_unverifiable = not is_host_alert

    try:
        from src.plugins.mcp.providers.k8s_provider import K8sProvider

        provider = K8sProvider()
        if not provider.enabled:
            return verdict_when_unverifiable

        deployment_lookup = await provider.execute(
            tool_name="kubectl_get",
            params={"resource": "deployment", "name": target, "namespace": namespace},
        )
        if deployment_lookup.get("success"):
            return True

        # Fall back to pods carrying the matching app label.
        pod_lookup = await provider.execute(
            tool_name="kubectl_get",
            params={"resource": "pod", "namespace": namespace, "selector": f"app={target}"},
        )
        if not pod_lookup.get("success"):
            return False
        return bool(pod_lookup.get("output", "").strip())
    except Exception as exc:
        logger.debug("verify_k8s_deployment_exists_failed", target=target, error=str(exc))
        return verdict_when_unverifiable
async def _fetch_metrics_snapshot(incident: Incident) -> dict:
    """Fetch a Prometheus metrics snapshot relevant to the incident.

    ADR-071-I.  The first signal's ``alertname`` selects the queries:
      - HostHighCpuLoad / HostOutOfMemory: ``cpu_pct`` (only when an
        ``instance`` label exists) and ``mem_pct``;
      - HostOutOfDiskSpace: ``disk_pct``;
      - PodRestartingTooMuch / PodCrashLoopBackOff: ``restart_count`` (only
        when a ``pod`` or ``component`` label exists).

    Args:
        incident: Incident whose first signal's labels drive the queries.

    Returns:
        A dict holding the keys that produced a sample; an empty dict when the
        provider is disabled or anything fails (the main flow is never blocked).

    2026-04-11 Claude Sonnet 4.6 Asia/Taipei
    """
    try:
        from src.plugins.mcp.providers.prometheus_provider import PrometheusProvider

        prom = PrometheusProvider()
        if not prom.enabled:
            return {}

        labels = incident.signals[0].labels if incident.signals else {}
        alertname = labels.get("alertname", "")
        instance = labels.get("instance", "")
        snapshots: dict = {}

        async def _record(key: str, promql: str, convert) -> None:
            # P0 fix 2026-04-11: _instant_query takes a dict and returns
            # {"result": [...]}.  Every sample overwrites the key, so the last
            # one wins.
            reply = await prom._instant_query({"query": promql})
            for sample in reply.get("result", []):
                snapshots[key] = convert(float(sample["value"][1]))

        def _one_decimal(value: float) -> float:
            return round(value, 1)

        # Pick the most relevant metrics for the alertname.
        if alertname in ("HostHighCpuLoad", "HostOutOfMemory"):
            if instance:
                host = instance.split(":")[0]
                await _record(
                    "cpu_pct",
                    f'100 - (avg by(instance) (irate(node_cpu_seconds_total{{mode="idle",instance=~"{host}.*"}}[5m])) * 100)',
                    _one_decimal,
                )
            if instance:
                mem_query = f'(1 - (node_memory_MemAvailable_bytes{{instance=~"{instance}"}} / node_memory_MemTotal_bytes{{instance=~"{instance}"}})) * 100'
            else:
                mem_query = "100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)"
            await _record("mem_pct", mem_query, _one_decimal)
        elif alertname == "HostOutOfDiskSpace":
            await _record(
                "disk_pct",
                'max(100 - ((node_filesystem_avail_bytes{fstype!="tmpfs"} / node_filesystem_size_bytes{fstype!="tmpfs"}) * 100))',
                _one_decimal,
            )
        elif alertname in ("PodRestartingTooMuch", "PodCrashLoopBackOff"):
            pod = labels.get("pod", labels.get("component", ""))
            if pod:
                await _record(
                    "restart_count",
                    f'sum(kube_pod_container_status_restarts_total{{namespace="awoooi-prod",pod=~"{pod}.*"}})',
                    int,
                )
        return snapshots
    except Exception as exc:
        logger.debug("metrics_snapshot_failed", incident_id=incident.incident_id, error=str(exc))
        return {}
def _format_metrics_delta(before: dict, after: dict) -> str:
    """Format a before/after comparison of metric snapshots.

    ADR-071-I.  Example output: ``CPU 92%→23% | Mem 78%→45%``.

    Percentage metrics (``cpu_pct``, ``mem_pct``, ``disk_pct``) are shown when
    at least one side has a value; a one-sided value is tagged ``(before)`` or
    ``(after)``.  ``restart_count`` is shown only when both sides have a value.

    Args:
        before: Snapshot taken before the repair (``None`` is treated as empty).
        after: Snapshot taken after the repair (``None`` is treated as empty).

    Returns:
        The segments joined by ``" | "``, or ``""`` when there is nothing to show.
    """
    before = before or {}
    after = after or {}
    if not before and not after:
        return ""

    parts: list[str] = []
    for key, label in (("cpu_pct", "CPU"), ("mem_pct", "Mem"), ("disk_pct", "Disk")):
        b = before.get(key)
        a = after.get(key)
        if b is not None and a is not None:
            parts.append(f"{label} {b}%→{a}%")
        elif b is not None:
            parts.append(f"{label} {b}% (before)")
        elif a is not None:
            parts.append(f"{label} {a}% (after)")

    b = before.get("restart_count")
    a = after.get("restart_count")
    if b is not None and a is not None:
        # The arrow keeps the two counters apart; without a separator
        # 12 and 3 would be rendered as the meaningless "123".
        parts.append(f"Restarts {b}→{a}")
    return " | ".join(parts)
async def _push_auto_repair_result(
    incident: Incident,
    action: str,
    success: bool,
    error: str = "",
) -> None:
    """Report the outcome of an automatic repair on the original alert message.

    Requirement: every status change continues on the original alert message
    instead of producing a new one.
      - ``append_incident_update()`` replies to the original message and swaps
        the buttons;
      - when the original message cannot be found, a new notification is sent
        as a degraded fallback.

    ADR-071-I: on success a ``metrics_after`` snapshot is fetched, written to
    the incidents table and shown as a before/after comparison.
    MCP Phase 4b: on success the pod state is stored in ``k8s_state_after``.
    BUG-006: ``outcome`` and ``verification_result`` are written to the
    incidents table for both outcomes.

    Args:
        incident: Incident that was repaired.
        action: Executed action; shown truncated.
        success: Whether the repair succeeded.
        error: Error text for the failure case.

    Every exception is caught and logged as a warning.

    2026-04-09 Claude Sonnet 4.6 Asia/Taipei
    2026-04-11 Claude Sonnet 4.6: +ADR-071-I metrics snapshot
    """
    try:
        import html as _html

        from src.services.telegram_gateway import get_telegram_gateway

        def _esc(text: str) -> str:
            """Escape free text for Telegram's HTML formatting."""
            return _html.escape(str(text), quote=False)

        def _result_field(result: Any, *names: str) -> Any:
            """Return the first truthy field of a dict- or object-shaped result."""
            for name in names:
                value = result.get(name) if isinstance(result, dict) else getattr(result, name, None)
                if value:
                    return value
            return None

        gateway = get_telegram_gateway()
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        inc_id = incident.incident_id

        # ADR-071-I: fetch metrics_after (success only).
        metrics_delta_text = ""
        if success:
            metrics_after = await _fetch_metrics_snapshot(incident)
            metrics_before = getattr(incident, "metrics_before", None) or {}
            # Persist to the DB; a failure here does not block the main flow.
            if metrics_after:
                try:
                    from src.db.base import get_db_context
                    from src.db.models import Incident as IncidentORM
                    from sqlalchemy import update as _update

                    async with get_db_context() as db:
                        await db.execute(
                            _update(IncidentORM)
                            .where(IncidentORM.incident_id == inc_id)
                            .values(metrics_after=metrics_after)
                        )
                        await db.commit()
                    logger.info("metrics_after_saved", incident_id=inc_id, metrics=metrics_after)
                except Exception as _e:
                    logger.warning("metrics_after_save_failed", incident_id=inc_id, error=str(_e))
            metrics_delta_text = _format_metrics_delta(metrics_before, metrics_after)

            # MCP Phase 4b: store the K8s pod state in k8s_state_after
            # (2026-04-11 Claude Sonnet 4.6).
            try:
                from src.plugins.mcp.providers.k8s_provider import K8sProvider

                _k8s = K8sProvider()
                if _k8s.enabled:
                    _service = (incident.affected_services or [""])[0]
                    if _service:
                        # NOTE(review): other callers in this module pass
                        # "resource"/"selector" and read the result as a dict
                        # ("success"/"output"); confirm which parameter names
                        # the provider expects.  Both result shapes are
                        # accepted below.
                        _k8s_result = await _k8s.execute(
                            "kubectl_get",
                            {"resource_type": "pods", "namespace": "awoooi-prod",
                             "label_selector": f"app={_service}"},
                        )
                        _k8s_ok = _result_field(_k8s_result, "success")
                        _k8s_data = _result_field(_k8s_result, "data", "output")
                        if _k8s_ok and _k8s_data:
                            _k8s_state = str(_k8s_data)[:500]
                            from src.db.base import get_db_context
                            from src.db.models import Incident as IncidentORM
                            from sqlalchemy import update as _upd2

                            async with get_db_context() as _db2:
                                await _db2.execute(
                                    _upd2(IncidentORM)
                                    .where(IncidentORM.incident_id == inc_id)
                                    .values(k8s_state_after=_k8s_state)
                                )
                                await _db2.commit()
            except Exception as _k8s_err:
                logger.debug("k8s_state_after_failed", incident_id=inc_id, error=str(_k8s_err))

        # action/error are free text (commands, exception messages); they are
        # truncated first and escaped afterwards so no entity is cut in half.
        if success:
            delta_line = f"\n├ 指標: <code>{_esc(metrics_delta_text)}</code>" if metrics_delta_text else ""
            status_line = (
                f"✅ <b>自動修復完成</b>\n"
                f"├ <code>{_esc(action[:100]) if action else '已執行'}</code>"
                f"{delta_line}"
            )
        else:
            status_line = (
                f"❌ <b>自動修復失敗,請人工介入</b>\n"
                f"├ 動作: <code>{_esc(action[:80]) if action else '未知'}</code>\n"
                f"└ 錯誤: {_esc(error[:100]) if error else '未知錯誤'}"
            )

        # BUG-006 fix 2026-04-11: outcome and verification_result were always
        # null because this function only sent Telegram messages.  Both fields
        # are now written to the incidents table.
        # NOTE(review): this write uses IncidentRecord while the writes above
        # use src.db.models.Incident — confirm both map to the incidents table.
        try:
            from src.db.base import get_db_context
            from src.db.models import IncidentRecord
            from sqlalchemy import update as _upd_outcome

            _outcome = "auto_repaired" if success else "auto_repair_failed"
            _verification = (
                f"自動修復{'成功' if success else '失敗'}:{action[:120] if action else '未知'}"
                + (f" | 錯誤:{error[:80]}" if error else "")
            )
            async with get_db_context() as _odb:
                await _odb.execute(
                    _upd_outcome(IncidentRecord)
                    .where(IncidentRecord.incident_id == inc_id)
                    .values(outcome=_outcome, verification_result=_verification)
                )
                await _odb.commit()
            logger.info("outcome_written", incident_id=inc_id, outcome=_outcome)
        except Exception as _oe:
            logger.warning("outcome_write_failed", incident_id=inc_id, error=str(_oe))

        # Preferred path: reply to the original alert message and swap buttons.
        appended = await gateway.append_incident_update(
            incident_id=inc_id,
            status_line=status_line,
            keep_info_buttons=True,  # keep details/re-diagnose/history, drop approve/reject
        )

        # Fallback: the original message id is unknown (old alert or expired
        # Redis key) -> send a new message.
        if not appended:
            fallback_text = (
                f"{'✅' if success else '❌'} <b>[自動修復{'完成' if success else '失敗'}]</b> "
                f"<code>{inc_id}</code>\n"
                f"對象: <code>{_esc(target[:50])}</code>\n"
                f"{status_line}"
            )
            await gateway.send_notification(fallback_text)

        logger.info("auto_repair_result_sent", incident_id=inc_id, success=success, appended=appended)
    except Exception as e:
        logger.warning("auto_repair_result_push_failed", incident_id=incident.incident_id, error=str(e))
# =============================================================================
# Decision States
# =============================================================================
class DecisionState(str, Enum):
    """States of the decision state machine."""

    # The incident has just been created.
    INIT = "init"
    # Analysis is in progress.
    ANALYZING = "analyzing"
    # The decision is ready.
    READY = "ready"
    # The decision is being executed.
    EXECUTING = "executing"
    # Execution has finished.
    COMPLETED = "completed"
    # Something went wrong.
    ERROR = "error"
# =============================================================================
# Expert System - 規則引擎 (Local Fallback)
# =============================================================================
# Rule table of the Expert System.  Rules are tried in definition order and the
# first rule with a pattern contained in the lower-cased alert names wins;
# "default" is used when nothing matches.  "{target}" in an action is replaced
# with the affected service.
EXPERT_RULES: dict[str, dict[str, Any]] = {
    # Pod crash -> restart
    "pod_crash": {
        "patterns": ["crash", "restart", "oom", "killed", "failed"],
        "action": "kubectl rollout restart deployment/{target}",
        "description": "Expert System: 偵測到 Pod 異常,建議重啟部署",
        "risk_level": "medium",
        "reasoning": "根據歷史數據,重啟可解決 85% 的 Pod 崩潰問題",
    },
    # High latency -> scale out
    "high_latency": {
        "patterns": ["latency", "slow", "timeout", "p99"],
        "action": "kubectl scale deployment/{target} --replicas=3",
        "description": "Expert System: 偵測到高延遲,建議擴容至 3 副本",
        "risk_level": "low",
        "reasoning": "擴容可分散負載,降低單一 Pod 壓力",
    },
    # High error rate -> roll back
    "high_error_rate": {
        "patterns": ["error", "5xx", "fail", "exception"],
        "action": "kubectl rollout undo deployment/{target}",
        "description": "Expert System: 偵測到高錯誤率,建議回滾至上一版",
        "risk_level": "critical",
        "reasoning": "錯誤率突增通常源自最近部署,回滾是最快修復方式",
    },
    # Resource exhaustion -> scale out
    "resource_exhaustion": {
        "patterns": ["cpu", "memory", "resource", "quota"],
        "action": "kubectl scale deployment/{target} --replicas=2",
        "description": "Expert System: 偵測到資源耗盡,建議擴容",
        "risk_level": "medium",
        "reasoning": "增加副本可分散資源壓力",
    },
    # Default -> restart (most conservative)
    "default": {
        "patterns": [],
        "action": "kubectl rollout restart deployment/{target}",
        "description": "Expert System: 無法確定具體問題,建議安全重啟",
        "risk_level": "medium",
        "reasoning": "重啟是最安全的通用修復動作",
    },
}


def expert_analyze(incident: "Incident") -> dict[str, Any]:
    """Analyse an incident with the local rule engine (Expert System).

    This is the local fallback that has to produce a proposal even when the
    LLM path fails, so missing data is tolerated: an incident without
    affected services targets ``unknown-service``, and a missing signal list
    or a signal without an alert name simply contributes no text to match on.

    Args:
        incident: Object exposing ``affected_services`` and ``signals`` (each
            signal exposing ``alert_name``).

    Returns:
        A proposal dict with ``source``, ``action``, ``description``,
        ``risk_level``, ``reasoning``, ``confidence``, ``kubectl_command``,
        ``matched_rule``, ``from_cache`` and ``is_rule_based``.
    """
    target = incident.affected_services[0] if incident.affected_services else "unknown-service"
    alert_names = " ".join(
        (signal.alert_name or "").lower() for signal in (incident.signals or [])
    )

    # First non-default rule with a matching pattern wins.
    matched_rule = next(
        (
            rule_name
            for rule_name, rule in EXPERT_RULES.items()
            if rule_name != "default"
            and any(pattern in alert_names for pattern in rule["patterns"])
        ),
        "default",
    )
    rule = EXPERT_RULES[matched_rule]
    command = rule["action"].format(target=target)

    # 2026-03-29 ogt: the Expert System must not pretend to have a high
    # confidence score; 0.0 marks the result as a rule match rather than an AI
    # arbitration.
    return {
        "source": "expert_system",
        "action": command,
        "description": rule["description"],
        "risk_level": rule["risk_level"],
        "reasoning": f"[規則匹配] {rule['reasoning']}",  # states the origin explicitly
        "confidence": 0.0,
        "kubectl_command": command,
        "matched_rule": matched_rule,
        "from_cache": False,
        "is_rule_based": True,
    }
# =============================================================================
# Decision Token (Redis)
# =============================================================================
class DecisionToken:
"""
決策令牌 - 前端持有此 token 即可操作
Redis Key: decision:{token}
TTL: 1 小時
"""
def __init__(
self,
token: str,
incident_id: str,
state: DecisionState,
proposal_data: dict[str, Any] | None = None,
proposal_id: str | None = None,
created_at: datetime | None = None,
updated_at: datetime | None = None,
error: str | None = None,
):
self.token = token
self.incident_id = incident_id
self.state = state
self.proposal_data = proposal_data
self.proposal_id = proposal_id
self.created_at = created_at or datetime.now(UTC)
self.updated_at = updated_at or datetime.now(UTC)
self.error = error
def to_dict(self) -> dict[str, Any]:
return {
"token": self.token,
"incident_id": self.incident_id,
"state": self.state.value,
"proposal_data": self.proposal_data,
"proposal_id": self.proposal_id,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"error": self.error,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DecisionToken":
return cls(
token=data["token"],
incident_id=data["incident_id"],
state=DecisionState(data["state"]),
proposal_data=data.get("proposal_data"),
proposal_id=data.get("proposal_id"),
created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None,
updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else None,
error=data.get("error"),
)
# =============================================================================
# Protocol Interface (Phase 17 P1 - 紅區治理)
# =============================================================================
@runtime_checkable
class IDecisionManager(Protocol):
    """
    Interface definition for DecisionManager.

    Purpose:
    - Type constraint for dependency injection (DI)
    - Type checking for mocks in tests
    - Conforms to the leWOOOgo building-block (modular) convention

    Tier 3 red-zone service: changes require chief-architect sign-off.

    NOTE(review): ``mark_executing`` / ``mark_completed`` are declared here;
    confirm the concrete DecisionManager actually provides them, otherwise a
    runtime ``isinstance`` check against this protocol will fail.

    @see feedback_lewooogo_modular_enforcement.md
    @see docs/RED_ZONES.md
    """

    async def get_or_create_decision(
        self,
        incident: "Incident",
        timeout_sec: float = 30.0,
    ) -> "DecisionToken":
        """Get or create the decision token for an incident."""
        ...

    async def mark_executing(self, token: str) -> "DecisionToken | None":
        """Mark the decision as executing."""
        ...

    async def mark_completed(self, token: str, result: dict[str, Any] | None = None) -> "DecisionToken | None":
        """Mark the decision as completed."""
        ...
# =============================================================================
# Decision Manager
# =============================================================================
# Redis key prefix: a token is stored under f"{DECISION_TOKEN_PREFIX}{token}".
DECISION_TOKEN_PREFIX = "decision:"
# Expiry in seconds applied each time a token is written to Redis.
DECISION_TOKEN_TTL = 3600  # 1 hour
class DecisionManager:
"""
決策管理器 - Phase 6.5 核心
職責:
1. 為每個 Incident 簽發 decision_token
2. 並行執行 LLM + Expert System
3. First-Win 或 Fallback 策略
4. 確保 UI 永遠有決策可操作
"""
def __init__(self):
    """
    Wire up the collaborators used by the decision pipeline.

    The knowledge service and the MCP providers are imported inside the
    constructor and stored on the instance, so the analysis methods use
    injected attributes instead of importing and instantiating them inline.
    """
    self._openclaw = get_openclaw()

    # Function-local imports, resolved at construction time.
    from src.services.knowledge_service import get_knowledge_service
    from src.plugins.mcp.providers.ssh_provider import SSHProvider
    from src.plugins.mcp.providers.k8s_provider import K8sProvider

    # KB RAG backend used by _query_kb_context_inner.
    self._knowledge_svc = get_knowledge_service()
    # MCP providers used for context collection and SSH execution.
    self._ssh = SSHProvider()
    self._k8s = K8sProvider()
async def get_or_create_decision(
    self,
    incident: Incident,
    timeout_sec: float = 30.0,
) -> DecisionToken:
    """
    Get or create the decision token for an incident.

    Flow:
    1. TYPE-1 (purely informational) incidents skip analysis: a COMPLETED
       token is stored, a Telegram push is scheduled, and the token is returned.
    2. An existing token in READY / EXECUTING / COMPLETED state is returned
       unchanged (returning COMPLETED avoids re-creating the decision and
       re-sending Telegram messages). An existing token in any other state
       does not short-circuit; a new token is created.
    3. A new token is stored as ANALYZING and the dual-engine analysis runs
       under ``timeout_sec``. On timeout or any exception the Expert System
       result is used instead, so the token always ends up READY.
    4. The auto-approve policy then chooses between scheduling
       ``_auto_execute`` (token becomes EXECUTING) and pushing a
       manual-review card to Telegram.

    Args:
        incident: Incident to decide on.
        timeout_sec: Upper bound in seconds for the analysis step only;
            Redis lookups and saves are not covered by it.

    Returns:
        The stored DecisionToken.
    """
    # ADR-073 Phase 3-1: TYPE-1 triage guard — informational alerts skip LLM analysis.
    # notification_type is expected to be set upstream (read defensively via getattr).
    if getattr(incident, "notification_type", None) == "TYPE-1":
        _info_token = DecisionToken(
            token=f"DEC-{uuid4().hex[:12].upper()}",
            incident_id=incident.incident_id,
            state=DecisionState.COMPLETED,
            proposal_data={
                "source": "triage_guard",
                "notification_type": "TYPE-1",
                "decision_state": "COMPLETED",
                "auto_executed": False,
                "confidence": 1.0,
                "risk_level": "low",
                "description": "純資訊通知,無需操作",
            },
        )
        await self._save_token(_info_token)
        _fire_and_forget(_push_decision_to_telegram(incident, _info_token.proposal_data))
        logger.info(
            "decision_type1_bypass",
            incident_id=incident.incident_id,
            notification_type="TYPE-1",
        )
        return _info_token

    # 1. Reuse an existing token when it is already actionable or finished.
    existing_token = await self._find_existing_token(incident.incident_id)
    if existing_token and existing_token.state in (
        DecisionState.READY,
        DecisionState.EXECUTING,
        DecisionState.COMPLETED,
    ):
        return existing_token

    # 2. Create a new token.
    token = DecisionToken(
        token=f"DEC-{uuid4().hex[:12].upper()}",
        incident_id=incident.incident_id,
        state=DecisionState.ANALYZING,
    )
    await self._save_token(token)

    logger.info(
        "decision_analyzing",
        token=token.token,
        incident_id=incident.incident_id,
    )

    # 3. Run the dual-engine analysis, bounded by timeout_sec.
    try:
        proposal_data = await asyncio.wait_for(
            self._dual_engine_analyze(incident),
            timeout=timeout_sec,
        )

        token.state = DecisionState.READY
        token.proposal_data = proposal_data
        token.updated_at = datetime.now(UTC)

        logger.info(
            "decision_ready",
            token=token.token,
            source=proposal_data.get("source", "unknown"),
        )

    except TimeoutError:
        # Timeout: fall back to the Expert System.
        logger.warning(
            "decision_timeout_using_expert",
            token=token.token,
            timeout_sec=timeout_sec,
        )
        expert_result = expert_analyze(incident)
        token.state = DecisionState.READY
        token.proposal_data = expert_result
        token.updated_at = datetime.now(UTC)

    except Exception as e:
        # Any other error: fall back to the Expert System and record the error.
        logger.exception(
            "decision_error_using_expert",
            token=token.token,
            error=str(e),
        )
        expert_result = expert_analyze(incident)
        token.state = DecisionState.READY
        token.proposal_data = expert_result
        token.error = str(e)
        token.updated_at = datetime.now(UTC)

    # 4. Persist the analysis outcome.
    await self._save_token(token)

    # 5. ADR-030 Phase 4: decide between auto-execution and manual review.
    if token.state == DecisionState.READY and token.proposal_data:
        auto_policy = get_auto_approve_policy()
        auto_decision = auto_policy.evaluate(
            proposal_data=token.proposal_data,
            playbook=token.proposal_data.get("_matched_playbook"),  # present only if provided
        )

        if auto_decision.should_auto_approve:
            # Auto-approved: skip manual review.
            logger.info(
                "auto_approve_triggered",
                incident_id=incident.incident_id,
                reason=auto_decision.reason.value,
                detail=auto_decision.reason_detail,
            )
            token.state = DecisionState.EXECUTING
            token.proposal_data["auto_approved"] = True
            token.proposal_data["auto_approve_reason"] = auto_decision.reason_detail
            await self._save_token(token)

            # Kick off execution without blocking the caller.
            _fire_and_forget(
                self._auto_execute(incident, token)
            )
        else:
            # Manual review: push the approval card to Telegram.
            # ADR-071: decision_state + auto_executed are injected for the notification classifier.
            token.proposal_data["decision_state"] = token.state.value if token.state else ""
            token.proposal_data["auto_executed"] = False
            _fire_and_forget(
                _push_decision_to_telegram(incident, token.proposal_data)
            )

    return token
async def _auto_execute(self, incident: Incident, token: "DecisionToken") -> None:
    """
    ADR-030 Phase 4: execute an auto-approved action.

    Called only after the auto-approve policy accepted the proposal. Steps:
    1. Take ``kubectl_command`` from the proposal; keep only the part before
       the first ``|``.
    2. Empty command or ``suggested_action == "NO_ACTION"``: set the token
       back to READY and push the decision card instead of executing.
    3. Substitute ``{target}`` / ``{namespace}`` / ``<...>`` placeholders.
    4. ``infrastructure`` alerts with a non-kubectl command are routed to
       ``_ssh_execute``.
    5. Safety guard (unresolved placeholder, "unknown", target equal to the
       alert name, or command not matching the kubectl allow-pattern) and the
       K8s existence check both downgrade to manual review (READY plus card).
    6. Otherwise run the action through ApprovalExecutionService; the result
       (success or failure) is pushed to Telegram.

    Args:
        incident: Incident being repaired.
        token: Decision token whose ``proposal_data`` carries the action.
    """
    import re as _re

    action = token.proposal_data.get("kubectl_command", "")

    # ADR-073 Phase 3-5: the LLM sometimes emits "repair | query"; only the
    # first segment is the repair operation.
    if action and "|" in action:
        action = action.split("|")[0].strip()
        logger.debug("action_pipe_stripped", incident_id=incident.incident_id, action=action)

    # NO_ACTION rules / empty command: do not execute, hand back to manual flow.
    _suggested_action = token.proposal_data.get("suggested_action", "")
    if not action or _suggested_action == "NO_ACTION":
        logger.info(
            "auto_execute_skipped_no_action",
            incident_id=incident.incident_id,
            suggested_action=_suggested_action,
            reason="規則標記 NO_ACTION 或 kubectl_command 為空,不執行自動修復",
        )
        token.state = DecisionState.READY
        await self._save_token(token)
        _fire_and_forget(
            _push_decision_to_telegram(incident, token.proposal_data)
        )
        return

    # Resolve the placeholder values.
    _target = incident.affected_services[0] if incident.affected_services else "unknown"
    _ns = "awoooi-prod"
    if incident.signals:
        _ns = incident.signals[0].labels.get("namespace", "awoooi-prod")

    # BUG-002: with no affected service the target is "unknown"; try to
    # resolve it through the K8s lookup helper before giving up.
    if _target == "unknown":
        _target = await _resolve_target_from_k8s(incident, _ns) or "unknown"

    action = action.replace("{target}", _target).replace("{namespace}", _ns)
    # <xxx> placeholders (including <deployment_name>) -> target.
    # A callable replacement is used so that backslashes or group references
    # in the label-derived target are inserted literally; as a plain
    # replacement string they could raise re.error here, outside any
    # try-block, and leave the token stuck in EXECUTING.
    action = _re.sub(r"<[^>]+>", lambda _m: _target, action)

    # ADR-073 Phase 3-2: infrastructure (Docker/Host) alerts go through SSH MCP.
    # This must run before the kubectl safety guard, which would reject
    # docker/systemctl commands.
    _alert_category = getattr(incident, "alert_category", None) or ""
    if _alert_category == "infrastructure" and action and not action.startswith("kubectl"):
        await self._ssh_execute(incident, token, action, _target)
        return

    # Safety guard: refuse anything still containing "unknown" or an
    # unresolved <...>/{...}; also refuse when the target equals the alert
    # name (the LLM filled the alert name in as a deployment name).
    _alertname = incident.signals[0].labels.get("alertname", "") if incident.signals else ""
    _target_is_alertname = bool(_alertname and _target == _alertname)
    # The action must match the kubectl allow-pattern (guards against && || ; > | injection).
    _action_safe = bool(_ALLOWED_KUBECTL_PATTERN.match(action.strip()))
    if "unknown" in action or _re.search(r"[<{][^>}]+[>}]", action) or _target_is_alertname or not _action_safe:
        logger.warning(
            "auto_execute_blocked_unresolved_placeholder",
            incident_id=incident.incident_id,
            action=action,
            target=_target,
            reason="action 含未解析的 placeholder、unknown、target==alertname、或危險字元拒絕執行",
        )
        # Blocked by the guard: downgrade to manual review, not "repair failed".
        token.state = DecisionState.READY
        token.error = f"Auto-execute blocked (safety guard): {action[:80]}"
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        token.proposal_data["mcp_all_failed"] = True  # flag read by the notification classifier
        await self._save_token(token)
        _fire_and_forget(
            _push_decision_to_telegram(incident, token.proposal_data)
        )
        return

    # BUG-003: verify the deployment exists before issuing the command.
    if _target and _target != "unknown":
        _k8s_verified = await _verify_k8s_deployment_exists(_target, _ns, alertname=_alertname)
        if not _k8s_verified:
            logger.warning(
                "auto_execute_blocked_deployment_not_found",
                incident_id=incident.incident_id,
                target=_target,
                namespace=_ns,
                reason="K8s 中找不到此 deployment/pod拒絕執行",
            )
            # Target not found: same downgrade-to-manual strategy as the guard.
            token.state = DecisionState.READY
            token.error = f"Auto-execute blocked: deployment '{_target}' not found in K8s"
            token.proposal_data["decision_state"] = DecisionState.READY.value
            token.proposal_data["auto_executed"] = False
            token.proposal_data["mcp_all_failed"] = True
            await self._save_token(token)
            _fire_and_forget(
                _push_decision_to_telegram(incident, token.proposal_data)
            )
            return

    try:
        # Deferred imports to avoid a circular dependency.
        from src.models.approval import ApprovalRequest, ApprovalStatus
        from src.services.approval_execution import ApprovalExecutionService

        # Synthetic, pre-approved request (no human signatures needed).
        _risk = token.proposal_data.get("risk_level", "low")
        approval = ApprovalRequest(
            incident_id=incident.incident_id,
            action=action,
            description=token.proposal_data.get("description", action[:100]),
            requested_by="auto_approve",
            required_signatures=0,
            status=ApprovalStatus.APPROVED,
            risk_level=_risk,
        )

        # ADR-071-I: capture a metrics_before snapshot ahead of execution.
        # Persisting it is best-effort; a failure is logged at debug only.
        _metrics_before = await _fetch_metrics_snapshot(incident)
        if _metrics_before:
            try:
                from src.db.base import get_db_context
                from src.db.models import Incident as IncidentORM
                from sqlalchemy import update as _sa_update
                async with get_db_context() as _db:
                    await _db.execute(
                        _sa_update(IncidentORM)
                        .where(IncidentORM.incident_id == incident.incident_id)
                        .values(metrics_before=_metrics_before)
                    )
                    await _db.commit()
            except Exception as _mb_err:
                logger.debug("metrics_before_save_failed",
                             incident_id=incident.incident_id, error=str(_mb_err))

        # Execute.
        executor = ApprovalExecutionService()
        await executor.execute_approved_action(approval)

        # Record completion.
        token.state = DecisionState.COMPLETED
        token.proposal_data["auto_executed"] = True
        await self._save_token(token)

        logger.info(
            "auto_execute_completed",
            incident_id=incident.incident_id,
            action=approval.action,
        )

        # Success: push the result notification.
        _fire_and_forget(
            _push_auto_repair_result(incident, action, success=True)
        )

    except Exception as e:
        logger.error(
            "auto_execute_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
        token.state = DecisionState.ERROR
        token.error = f"Auto-execute failed: {e}"
        await self._save_token(token)

        # Failure: push the failure notice, then fall back to the manual card.
        _fire_and_forget(
            _push_auto_repair_result(incident, action, success=False, error=str(e))
        )
        _fire_and_forget(
            _push_decision_to_telegram(incident, token.proposal_data)
        )
async def _query_kb_context_inner(self, incident: Incident) -> str:
    """
    Run the KB semantic search and format the hits as a context string.

    The query is the first signal's ``alert_name`` (when there are signals)
    followed by the affected services, space-joined with empty parts dropped.
    Returns "" when the search yields nothing. Timeout handling lives in the
    ``_query_kb_context`` wrapper.
    """
    services = list(incident.affected_services)
    leading = [getattr(incident.signals[0], "alert_name", "")] if incident.signals else []
    query = " ".join(term for term in leading + services if term)

    hits = await self._knowledge_svc.semantic_search(query, limit=3, threshold=0.4)
    if not hits:
        return ""

    sections = ["## Knowledge Base Related Entries (KB RAG)"]
    for entry, score in hits:
        header = f"\n### [{entry.entry_type}] {entry.title} (similarity={score:.2f})"
        # Each entry contributes at most its first 500 characters.
        sections.extend([header, entry.content[:500]])
        if len(entry.content) > 500:
            sections.append("... (truncated)")

    logger.info(
        "kb_rag_context_injected",
        incident_id=incident.incident_id,
        kb_hits=len(hits),
    )
    return "\n".join(sections)
async def _query_kb_context(self, incident: Incident) -> str:
    """
    KB Phase 2: fetch related KB entries as an LLM context string.

    Wraps ``_query_kb_context_inner`` in a 5-second hard timeout so a slow
    backend cannot eat into the decision time budget. Every failure mode
    degrades to an empty string; only the log level differs:
    timeout / connection problems -> warning, anything else -> error.
    """
    kb_text = ""
    try:
        kb_text = await asyncio.wait_for(
            self._query_kb_context_inner(incident),
            timeout=5.0,
        )
    except asyncio.TimeoutError:
        logger.warning("kb_rag_timeout", incident_id=incident.incident_id)
    except (ConnectionError, OSError) as conn_err:
        # Connectivity problem: expected, safe to degrade.
        logger.warning("kb_rag_connection_error", incident_id=incident.incident_id, error=str(conn_err))
    except Exception as unexpected:
        # Unexpected failure: logged at error level so monitoring can pick it up.
        logger.error("kb_rag_unexpected_error", incident_id=incident.incident_id, error=str(unexpected))
    return kb_text
async def _collect_mcp_context(self, incident: Incident) -> str:
    """
    ADR-070: gather live environment state through MCP before analysis.

    Uses the first signal's labels to decide what to query:
    - alert names starting with a host/Docker-style prefix, with a known
      host -> SSH provider (container status; top processes for
      CpuLoad/Memory alerts)
    - alert names starting with Kube/K3s, or a ``pod`` label -> K8s provider
      (events for that pod)

    Every provider call has a 5-second timeout. Timeouts and errors are
    logged and skipped, so the method returns whatever was collected
    (possibly "") as newline-joined lines.

    NOTE(review): results are read here as dicts (``.get("success")``) and
    the call uses ``params=``; confirm this matches the provider's
    ``execute`` signature and return type.
    """
    if not incident.signals:
        return ""

    labels = incident.signals[0].labels
    alertname = labels.get("alertname", "")
    host = labels.get("instance", "").split(":")[0] or labels.get("host", "")
    # Container name: "name" label, then "container" label, then first affected service.
    container = labels.get("name") or labels.get("container")
    if not container:
        container = incident.affected_services[0] if incident.affected_services else ""
    ns = labels.get("namespace", "awoooi-prod")

    call_timeout = 5.0  # per MCP call, keeps the decision path from blocking
    host_alert_prefixes = ("Host", "Docker", "Sentry", "Harbor", "Ollama", "Backup")
    known_hosts = ("192.168.0.188", "192.168.0.110")
    collected: list[str] = []

    # Host / Docker alerts -> SSH diagnostics.
    if alertname.startswith(host_alert_prefixes) and host:
        try:
            ssh = self._ssh
            if ssh.enabled:
                if host not in known_hosts:
                    # Unknown hosts are reported rather than silently skipped.
                    logger.warning("mcp_context_unknown_host", host=host, known=known_hosts)
                else:
                    # Container status (skipped when the "container" is just the alert name).
                    if container and container != alertname:
                        status_result = await asyncio.wait_for(
                            ssh.execute(
                                tool_name="ssh_get_container_status",
                                params={"host": host, "container_name": container},
                            ),
                            timeout=call_timeout,
                        )
                        if status_result.get("success"):
                            collected.append(f"[SSH] 容器 {container} 狀態: {status_result.get('output', '')[:300]}")
                    # Host resources for CPU / memory alerts.
                    if "CpuLoad" in alertname or "Memory" in alertname:
                        top_result = await asyncio.wait_for(
                            ssh.execute(
                                tool_name="ssh_get_top_processes",
                                params={"host": host, "top_n": 5},
                            ),
                            timeout=call_timeout,
                        )
                        if top_result.get("success"):
                            collected.append(f"[SSH] 主機 {host} Top processes: {top_result.get('output', '')[:300]}")
        except asyncio.TimeoutError:
            logger.warning("mcp_context_ssh_timeout", alertname=alertname, host=host, timeout=call_timeout)
        except Exception as ssh_err:
            logger.debug("mcp_context_ssh_failed", alertname=alertname, error=str(ssh_err))

    # K8s alerts -> pod events.
    if alertname.startswith(("Kube", "K3s")) or labels.get("pod"):
        try:
            k8s = self._k8s
            if k8s.enabled:
                pod = labels.get("pod", "")
                if pod:
                    events_result = await asyncio.wait_for(
                        k8s.execute(
                            tool_name="k8s_get_events",
                            params={"namespace": ns, "field_selector": f"involvedObject.name={pod}"},
                        ),
                        timeout=call_timeout,
                    )
                    if events_result.get("success"):
                        collected.append(f"[K8s] Pod {pod} 事件: {events_result.get('output', '')[:300]}")
        except asyncio.TimeoutError:
            logger.warning("mcp_context_k8s_timeout", alertname=alertname, timeout=call_timeout)
        except Exception as k8s_err:
            logger.debug("mcp_context_k8s_failed", alertname=alertname, error=str(k8s_err))

    return "\n".join(collected)
async def _dual_engine_analyze(
    self,
    incident: Incident,
) -> dict[str, Any]:
    """
    Three-track decision analysis (Playbook / LLM / Expert System) with
    KB RAG and MCP context injection.

    Strategy:
    1. Collect live environment state via MCP (ADR-070).
    2. Try a Playbook match first; a hit is returned immediately.
    3. Otherwise compute the Expert System result, fetch KB context, and ask
       the LLM with both contexts injected into ``diagnosis_context``.
    4. If the LLM call fails, reports no success, or anything inside the LLM
       branch raises, the Expert System result is returned.

    Priority: Playbook > LLM > Expert System

    Returns:
        The proposal dict from whichever track won. LLM results carry
        ``source = "llm_<provider>"``.
    """
    # ADR-070: collect live environment state via MCP before analysing.
    # Note: this runs before the Playbook check, so its result is unused on a Playbook hit.
    mcp_context = await self._collect_mcp_context(incident)

    # Phase 7.5: try a Playbook match first.
    playbook_result = await self._try_playbook_match(incident)
    if playbook_result:
        return playbook_result

    # MCP Phase 4c: no Playbook hit -> schedule AI draft-Playbook generation in the background.
    _fire_and_forget(_generate_playbook_draft_if_new(incident))

    # The Expert System runs synchronously, so a fallback result is always available.
    expert_result = expert_analyze(incident)

    # KB Phase 2: semantic search for related knowledge entries (degrades to "" on failure).
    kb_context = await self._query_kb_context(incident)

    # LLM track (Phase 22: OpenClaw + Nemotron via the *_with_tools method).
    # Everything inside this try, including post-processing of a successful
    # LLM result, falls back to the Expert System when it raises.
    try:
        signals_dict = [s.model_dump() for s in incident.signals]
        # Inject MCP live state + KB context into the expert_context handed to the LLM.
        # ADR-070: MCP context goes first so the LLM sees the real environment state.
        llm_expert_context: dict[str, Any] = {**expert_result} if expert_result else {}
        existing = str(llm_expert_context.get("diagnosis_context", ""))
        context_parts = []
        if mcp_context:
            context_parts.append(f"## 當前環境狀態 (MCP 實時查詢)\n{mcp_context}")
        if kb_context:
            context_parts.append(f"## 相關歷史知識\n{kb_context}")
        if existing:
            context_parts.append(existing)
        if context_parts:
            llm_expert_context["diagnosis_context"] = "\n\n".join(context_parts)
        llm_result, provider, success = await self._openclaw.generate_incident_proposal_with_tools(
            incident_id=incident.incident_id,
            severity=incident.severity.value,
            signals=signals_dict,
            affected_services=incident.affected_services,
            expert_context=llm_expert_context if llm_expert_context else None,
        )

        if success and llm_result:
            logger.info(
                "dual_engine_llm_win",
                incident_id=incident.incident_id,
                provider=provider,
                kb_rag=bool(kb_context),
            )
            result = {**llm_result, "source": f"llm_{provider}"}

            # ADR-073 Phase 3-6: a risk_level from the YAML rules overrides the LLM's value,
            # since the rules are human-reviewed. Lookup failures are logged and ignored.
            try:
                from src.services.alert_rule_engine import get_risk_for_alertname
                _alertname_for_risk = (
                    incident.signals[0].labels.get("alertname", "")
                    if incident.signals else ""
                )
                if _alertname_for_risk:
                    _yaml_risk = get_risk_for_alertname(_alertname_for_risk)
                    if _yaml_risk and _yaml_risk != result.get("risk_level"):
                        logger.info(
                            "risk_level_yaml_override",
                            incident_id=incident.incident_id,
                            llm_risk=result.get("risk_level"),
                            yaml_risk=_yaml_risk,
                        )
                        result["risk_level"] = _yaml_risk
            except Exception as _re:
                logger.debug("risk_level_yaml_override_failed", error=str(_re))

            # MCP Phase 4a: confidence < 0.7 -> ask NemoClaw for a second opinion.
            # A missing confidence defaults to 1.0 (no second opinion requested).
            _conf = float(result.get("confidence", 1.0))
            if _conf < 0.7:
                try:
                    _advisory = await _nemoclaw_second_opinion(incident, result)
                    if _advisory:
                        result["advisory_note"] = _advisory
                        logger.info(
                            "nemoclaw_second_opinion_added",
                            incident_id=incident.incident_id,
                            confidence=_conf,
                        )
                except Exception as _soe:
                    logger.warning("nemoclaw_second_opinion_failed",
                                   incident_id=incident.incident_id, error=str(_soe))

            return result

    except Exception as e:
        logger.warning(
            "dual_engine_llm_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )

    # LLM failed or returned nothing usable: use the Expert System result.
    logger.info(
        "dual_engine_expert_fallback",
        incident_id=incident.incident_id,
    )
    return expert_result
async def _try_playbook_match(
    self,
    incident: Incident,
) -> dict[str, Any] | None:
    """
    Phase 7.5: try to satisfy the incident from an existing Playbook.

    A Playbook is used only when:
    - the top recommendation's similarity is >= PLAYBOOK_SIMILARITY_THRESHOLD
    - its success rate is >= 80% (checked only if it has execution records)

    NOTE(review): approval status is not checked here; presumably
    ``get_recommendations`` returns approved playbooks only — confirm.

    Args:
        incident: Incident whose alert names, affected services and severity
            form the symptom pattern.

    Returns:
        A proposal dict (``source = "playbook"``) on a match, otherwise None.
        Any exception during matching is logged and also yields None.
    """
    try:
        playbook_service = get_playbook_service()

        # Build the symptom pattern.
        alert_names = [s.alert_name for s in incident.signals] if incident.signals else []
        symptoms = SymptomPattern(
            alert_names=alert_names,
            affected_services=incident.affected_services or [],
            severity_range=[incident.severity.value] if incident.severity else ["P2"],
        )

        # Only the top recommendation is considered.
        recommendations = await playbook_service.get_recommendations(
            symptoms=symptoms,
            top_k=1,
        )

        if not recommendations:
            logger.debug(
                "playbook_no_match",
                incident_id=incident.incident_id,
            )
            return None

        best_match = recommendations[0]
        playbook = best_match.playbook

        # Similarity threshold.
        if best_match.similarity_score < PLAYBOOK_SIMILARITY_THRESHOLD:
            logger.debug(
                "playbook_similarity_below_threshold",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                similarity=best_match.similarity_score,
                threshold=PLAYBOOK_SIMILARITY_THRESHOLD,
            )
            return None

        # Success rate (only when the playbook has been executed before).
        if playbook.total_executions > 0 and playbook.success_rate < 0.8:
            logger.debug(
                "playbook_low_success_rate",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                success_rate=playbook.success_rate,
            )
            return None

        # Playbook hit: take the command of the first repair step.
        kubectl_command = ""
        if playbook.repair_steps:
            target = incident.affected_services[0] if incident.affected_services else "unknown"
            # Literal substitution instead of str.format: format() raises on
            # any other brace in the command (e.g. "{namespace}" or a JSON
            # patch body), which the broad except below would turn into a
            # silently lost match. Other placeholders are left intact for
            # later substitution. Unlike format(), "{{"/"}}" are not collapsed.
            kubectl_command = playbook.repair_steps[0].command.replace("{target}", target)

        logger.info(
            "playbook_match_success",
            incident_id=incident.incident_id,
            playbook_id=playbook.playbook_id,
            playbook_name=playbook.name,
            similarity=best_match.similarity_score,
            success_rate=playbook.success_rate,
        )

        return {
            "source": "playbook",
            "playbook_id": playbook.playbook_id,
            "playbook_name": playbook.name,
            "action": kubectl_command,
            "kubectl_command": kubectl_command,
            "description": playbook.description,
            "risk_level": playbook.repair_steps[0].risk_level.value.lower() if playbook.repair_steps else "medium",
            "reasoning": f"Playbook 匹配 ({best_match.similarity_score:.0%} 相似度, {playbook.success_rate:.0%} 成功率): {best_match.reason}",
            "confidence": 0.0,  # a Playbook RAG match is not an AI analysis, so confidence is 0
            "matched_symptoms": best_match.matched_symptoms,
            "from_cache": False,
        }

    except Exception as e:
        logger.warning(
            "playbook_match_error",
            incident_id=incident.incident_id,
            error=str(e),
        )
        return None
async def _find_existing_token(
    self,
    incident_id: str,
) -> DecisionToken | None:
    """
    Find an existing decision token for an incident.

    Scans every ``decision:*`` key in Redis and returns the first stored
    token whose ``incident_id`` matches. Cost grows with the number of
    stored tokens, since each key is fetched and decoded.

    Args:
        incident_id: Incident to look up.

    Returns:
        The first matching DecisionToken, or None when none matches.
    """
    import json

    redis_client = get_redis()

    cursor = 0
    while True:
        cursor, keys = await redis_client.scan(
            cursor=cursor,
            match=f"{DECISION_TOKEN_PREFIX}*",
            count=100,
        )

        for key in keys:
            try:
                data = await redis_client.get(key)
                if not data:
                    # Key vanished or holds an empty value: nothing to decode.
                    continue
                token_data = json.loads(data)
                if token_data.get("incident_id") == incident_id:
                    return DecisionToken.from_dict(token_data)
            except Exception as e:
                # Best-effort scan: one unreadable or corrupt entry must not
                # abort the lookup, but it is logged instead of swallowed.
                logger.debug(
                    "decision_token_scan_entry_skipped",
                    key=str(key),
                    error=str(e),
                )
                continue

        # SCAN signals completion by returning cursor 0.
        if cursor == 0:
            break

    return None
async def _save_token(self, token: DecisionToken) -> None:
    """Write the token to Redis as JSON under its prefixed key, with the token TTL."""
    import json

    store = get_redis()
    redis_key = f"{DECISION_TOKEN_PREFIX}{token.token}"
    serialized = json.dumps(token.to_dict())
    await store.set(redis_key, serialized, ex=DECISION_TOKEN_TTL)
async def get_token(self, token_id: str) -> DecisionToken | None:
    """Load a token by id from Redis; returns None when the key is missing or empty."""
    import json

    store = get_redis()
    raw = await store.get(f"{DECISION_TOKEN_PREFIX}{token_id}")
    if not raw:
        return None
    return DecisionToken.from_dict(json.loads(raw))
async def update_token_state(
    self,
    token_id: str,
    new_state: DecisionState,
    proposal_id: str | None = None,
) -> DecisionToken | None:
    """
    Move a stored token to ``new_state`` and persist it.

    ``updated_at`` is refreshed; ``proposal_id`` is overwritten only when a
    non-empty value is given. Returns the updated token, or None when the
    token id is not found.
    """
    token = await self.get_token(token_id)
    if token:
        token.state = new_state
        token.updated_at = datetime.now(UTC)
        if proposal_id:
            token.proposal_id = proposal_id
        await self._save_token(token)
        return token
    return None
async def get_or_create_decision_with_consensus(
    self,
    incident: Incident,
    timeout_sec: float = 30.0,
    use_consensus: bool = True,
) -> DecisionToken:
    """
    Get or create a decision token, using Agent Teams consensus (Phase 9.4).

    The consensus engine is used only when ``use_consensus`` is true and the
    incident severity is P0 or P1; every other case delegates to
    ``get_or_create_decision``. On the consensus path an existing
    READY / EXECUTING / COMPLETED token is returned as-is; otherwise a new
    token is analysed by the consensus engine, with the Expert System as
    fallback on timeout or error.

    Args:
        incident: Incident to decide on.
        timeout_sec: Timeout in seconds, passed to the engine and used as the outer bound.
        use_consensus: Whether the consensus engine may be used (default True).

    Returns:
        DecisionToken
    """
    # Not a consensus case -> regular dual-engine decision.
    if not use_consensus or incident.severity.value not in ["P0", "P1"]:
        return await self.get_or_create_decision(incident, timeout_sec)

    # Phase 9.4: consensus path.
    from src.services.consensus_engine import get_consensus_engine

    engine = get_consensus_engine()

    # Reuse a token that is already actionable or finished (returning
    # COMPLETED avoids re-creating the decision and re-sending Telegram messages).
    reusable_states = (DecisionState.READY, DecisionState.EXECUTING, DecisionState.COMPLETED)
    previous = await self._find_existing_token(incident.incident_id)
    if previous and previous.state in reusable_states:
        return previous

    # Fresh token.
    token = DecisionToken(
        token=f"DEC-{uuid4().hex[:12].upper()}",
        incident_id=incident.incident_id,
        state=DecisionState.ANALYZING,
    )
    await self._save_token(token)

    logger.info(
        "decision_analyzing_with_consensus",
        token=token.token,
        incident_id=incident.incident_id,
    )

    try:
        # Run the consensus analysis.
        outcome = await asyncio.wait_for(
            engine.run_consensus(incident, timeout_sec),
            timeout=timeout_sec,
        )

        # Map the consensus result onto the proposal_data shape.
        consensus_proposal = {
            "source": "consensus_engine",
            "consensus_id": outcome.consensus_id,
            "consensus_score": outcome.consensus_score,
            "action": outcome.recommended_action,
            "description": outcome.final_reasoning,
            "risk_level": outcome.risk_level,
            "kubectl_command": outcome.recommended_kubectl,
            "reasoning": outcome.final_reasoning,
            "confidence": 0.0,  # the consensus score is not an AI confidence, so this stays 0
            "agent_count": len(outcome.opinions),
            "dissenting_opinions": outcome.dissenting_opinions,
            "from_cache": False,
        }

        token.state = DecisionState.READY
        token.proposal_data = consensus_proposal
        token.updated_at = datetime.now(UTC)

        logger.info(
            "decision_ready_with_consensus",
            token=token.token,
            consensus_id=outcome.consensus_id,
            consensus_score=outcome.consensus_score,
        )

    except TimeoutError:
        logger.warning(
            "consensus_timeout_using_expert",
            token=token.token,
            timeout_sec=timeout_sec,
        )
        # Fall back to the Expert System.
        fallback = expert_analyze(incident)
        token.state = DecisionState.READY
        token.proposal_data = fallback
        token.updated_at = datetime.now(UTC)

    except Exception as err:
        logger.exception(
            "consensus_error_using_expert",
            token=token.token,
            error=str(err),
        )
        fallback = expert_analyze(incident)
        token.state = DecisionState.READY
        token.proposal_data = fallback
        token.error = str(err)
        token.updated_at = datetime.now(UTC)

    await self._save_token(token)
    return token
async def resend_stale_ready_tokens(self) -> int:
    """
    BUG-005: re-push Telegram approval cards for stale READY tokens.

    Scans every ``decision:*`` key. A token is re-pushed when its state is
    READY, no ``telegram_sent:{incident_id}`` dedup key exists, the incident
    is found in the DB with a status other than resolved/closed, and the
    token has non-empty proposal data. Per-token errors are logged at debug
    and skipped; a failure of the scan itself is logged at warning.

    Returns:
        Number of tokens for which a push was scheduled.
    """
    import json as _json

    from src.core.redis_client import get_redis
    from src.db.base import get_db_context
    from src.repositories.incident_repository import IncidentDBRepository

    store = get_redis()
    resent_count = 0

    try:
        position = 0
        while True:
            position, batch = await store.scan(position, match="decision:*", count=200)
            for redis_key in batch:
                try:
                    payload = await store.get(redis_key)
                    if not payload:
                        continue
                    record = _json.loads(payload)
                    if record.get("state") != DecisionState.READY.value:
                        continue

                    incident_id = record.get("incident_id", "")
                    # Dedup key still present -> the card was sent recently.
                    if await store.exists(f"telegram_sent:{incident_id}"):
                        continue

                    # Load the incident to make sure it is still open.
                    async with get_db_context() as session:
                        incident = await IncidentDBRepository(session).get_by_id(incident_id)
                    if not incident:
                        continue
                    status_text = str(getattr(incident, "status", "")).lower()
                    if status_text in ("resolved", "closed"):
                        continue

                    proposal_data = record.get("proposal_data")
                    if not proposal_data:
                        continue

                    _fire_and_forget(
                        _push_decision_to_telegram(incident, proposal_data)
                    )
                    resent_count += 1
                    logger.info(
                        "stale_ready_token_resent",
                        incident_id=incident_id,
                        token=record.get("token", ""),
                    )
                except Exception as scan_err:
                    logger.debug("stale_ready_token_scan_error", error=str(scan_err))

            if position == 0:
                break
    except Exception as outer_err:
        logger.warning("resend_stale_ready_tokens_failed", error=str(outer_err))

    logger.info("stale_ready_tokens_scan_done", resent=resent_count)
    return resent_count
async def _ssh_execute(
    self,
    incident: "Incident",
    token: "DecisionToken",
    action: str,
    target: str,
) -> None:
    """
    ADR-073 Phase 3-2: run an infrastructure (Docker/Host) repair through the SSH MCP provider.

    Handled action prefixes (case-insensitive):
    - ``docker restart``    -> tool ``docker_restart`` with ``container=target``
    - ``systemctl restart`` -> tool ``service_restart`` with ``service=target``

    Any other action, a missing host, an exception, or a tool result that
    reports failure downgrades the decision to manual review: the token is
    set to READY and the approval card is pushed to Telegram. Only a
    successful tool result marks the token COMPLETED / auto_executed.

    Args:
        incident: Incident being repaired; its first signal's ``instance``
            label selects the host.
        token: Decision token to update.
        action: Command text, used only to pick the tool and for notifications.
        target: Container or service name passed to the tool.
    """
    import os as _os

    # Host selection: the instance label (port stripped) if it is in
    # SSH_MCP_ALLOWED_HOSTS, otherwise the first allowed host.
    # NOTE(review): when the label names a host outside the allow-list, the
    # command runs on the first allowed host, i.e. possibly not the machine
    # that raised the alert — confirm this fallback is intended.
    _instance = incident.signals[0].labels.get("instance", "") if incident.signals else ""
    _host = _instance.split(":")[0] if ":" in _instance else _instance
    _allowed = [h.strip() for h in _os.environ.get("SSH_MCP_ALLOWED_HOSTS", "").split(",") if h.strip()]
    if not _host or _host not in _allowed:
        _host = _allowed[0] if _allowed else ""

    if not _host:
        logger.warning(
            "ssh_execute_no_host",
            incident_id=incident.incident_id,
            reason="SSH_MCP_ALLOWED_HOSTS 未設定或 instance label 不在白名單",
        )
        token.state = DecisionState.READY
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        token.proposal_data["mcp_all_failed"] = True
        await self._save_token(token)
        _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
        return

    # Map the action onto an SSH tool and its parameters.
    _action_lower = action.lower().strip()
    params: dict = {"host": _host}
    if _action_lower.startswith("docker restart"):
        _tool = "docker_restart"
        params["container"] = target
    elif _action_lower.startswith("systemctl restart"):
        _tool = "service_restart"
        params["service"] = target
    else:
        logger.info(
            "ssh_execute_unknown_action",
            incident_id=incident.incident_id,
            action=action,
            reason="不支援的 SSH action 格式,降級為人工審核",
        )
        token.state = DecisionState.READY
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        await self._save_token(token)
        _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
        return

    try:
        # NOTE(review): another call site in this class invokes the provider
        # with ``params=`` and reads the result as a dict; here it is
        # ``parameters=`` and attribute access — confirm which matches the
        # provider API.
        result = await self._ssh.execute(tool_name=_tool, parameters=params)
        success = result.success
        logger.info(
            "ssh_execute_result",
            incident_id=incident.incident_id,
            tool=_tool,
            host=_host,
            success=success,
            output=result.output[:200] if result.output else "",
        )

        if not success:
            # The tool ran but reported failure. Do not record the decision
            # as COMPLETED: send the failure notice and hand the incident
            # back for manual review, like the exception path below.
            token.state = DecisionState.READY
            token.error = f"SSH auto-execute reported failure: {_tool} on {_host}"
            token.proposal_data["decision_state"] = DecisionState.READY.value
            token.proposal_data["auto_executed"] = False
            await self._save_token(token)
            _fire_and_forget(
                _push_auto_repair_result(incident, action, success=False)
            )
            _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
            return

        token.state = DecisionState.COMPLETED
        token.proposal_data["auto_executed"] = True
        await self._save_token(token)
        _fire_and_forget(
            _push_auto_repair_result(incident, action, success=True)
        )
    except Exception as e:
        logger.error(
            "ssh_execute_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
        token.state = DecisionState.READY
        token.error = str(e)
        token.proposal_data["decision_state"] = DecisionState.READY.value
        token.proposal_data["auto_executed"] = False
        await self._save_token(token)
        _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
# =============================================================================
# Singleton
# =============================================================================
_decision_manager: DecisionManager | None = None


def get_decision_manager() -> DecisionManager:
    """Return the module-wide DecisionManager, constructing it on first call (singleton)."""
    global _decision_manager
    if _decision_manager is not None:
        return _decision_manager
    _decision_manager = DecisionManager()
    return _decision_manager