Files
awoooi/apps/api/src/services/decision_manager.py
OG T 1483218bab
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 13m9s
feat(approval): 批准/拒絕後立即回應 Telegram + 持久化 message_id 到 DB
問題:按下 TG 批准/拒絕按鈕後完全沒有任何回應,使用者不知道是否成功。
      Telegram message_id 只存 Redis 24h TTL,過期後無法追蹤。

修正:
- approval_records 加 telegram_message_id / telegram_chat_id 欄位(已 ALTER TABLE)
- approval_db.update_telegram_message() — 持久化 message_id 到 DB
- decision_manager: 發送告警卡片後同時寫 Redis + DB
- telegram_gateway._notify_approval_result() — 批准/拒絕後:
    1. editMessageReplyMarkup 移除批准/拒絕按鈕,保留資訊按鈕
    2. sendMessage reply_to 在原訊息下回覆狀態行
    3. fallback: send_notification 發新訊息
- _handle_group_command: chat_id 改為 _chat_id 消除 IDE lint

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 18:19:31 +08:00

1115 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Decision Manager - Phase 6.5 非同步決策狀態機
=============================================
實作「雙軌決策」(Dual-Engine Decision):
1. OpenClaw LLM (主要) - 智能提案
2. Expert System (備援) - 規則引擎
狀態機:
- INIT: 事件剛建立
- ANALYZING: 正在分析中 (LLM + Expert 並行)
- READY: 決策就緒,等待統帥親核
- EXECUTING: 已授權,正在執行
- COMPLETED: 執行完成
統帥鐵律:
- 永遠不能讓 UI 鎖死
- 30 秒內必須有 decision_token
- LLM 失敗時 Expert System 保底
"""
import asyncio
import html
import json
import re
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Protocol, runtime_checkable
from uuid import uuid4

import structlog

from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.auto_approve import get_auto_approve_policy
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
logger = structlog.get_logger(__name__)
# Phase 7.5: playbook-first threshold.
# A playbook recommendation whose similarity score is >= this value (85%) is
# used directly, skipping the LLM / Expert System tracks.
PLAYBOOK_SIMILARITY_THRESHOLD = 0.85
# =============================================================================
# Telegram 推送 (Phase 6.5: 決策就緒通知)
# =============================================================================
# Matches "<placeholder>"-style tokens that would break Telegram's HTML parse mode.
_PLACEHOLDER_RE = re.compile(r"<[^>]+>")
# Substrings that identify an executable command, as opposed to a bare enum
# action name such as "RESTART_DEPLOYMENT" stored by older code.
_COMMAND_MARKERS = ("kubectl", "ssh", "docker", "systemctl", "/")
# Incident statuses (compared case-insensitively) that must not be pushed again.
_CLOSED_INCIDENT_STATUSES = ("resolved", "closed")


def _strip_placeholders(s: str) -> str:
    """Remove ``<placeholder>`` tokens and surrounding whitespace from *s*."""
    return _PLACEHOLDER_RE.sub("", s).strip()


def _is_incident_closed(incident: Incident) -> bool:
    """Return True when the incident status is resolved or closed.

    Accepts a plain string status or an Enum member. For ``(str, Enum)``
    members, ``str()`` yields ``"ClassName.MEMBER"`` rather than the value,
    so the member's ``.value`` is compared instead.
    """
    status = incident.status
    if not status:
        return False
    return str(getattr(status, "value", status)).lower() in _CLOSED_INCIDENT_STATUSES


def _first_signal_labels(incident: Incident) -> dict[str, Any]:
    """Return the first signal's labels, or ``{}`` when there are no signals or labels."""
    if not incident.signals:
        return {}
    return incident.signals[0].labels or {}


def _recover_command_from_rules(
    incident: Incident,
    action: Any,
    target: str,
    risk_level: str,
    labels: dict[str, Any],
) -> Any:
    """Replace a legacy enum-style action with a command from the alert rule engine.

    Older proposal_data stored e.g. ``action="RESTART_DEPLOYMENT"`` instead of
    a command. If *action* is non-empty and contains none of
    ``_COMMAND_MARKERS``, the rule engine is asked for a ``kubectl_command``.
    Rule-engine failures are logged and the original *action* is kept, so the
    notification still goes out.
    """
    if not action or any(marker in action for marker in _COMMAND_MARKERS):
        return action
    try:
        from src.services.alert_rule_engine import match_rule

        rule_resp = match_rule({
            "labels": labels,
            "alert_type": labels.get("alertname", target),
            "message": incident.title or "",
            "target_resource": target,
            "namespace": labels.get("namespace", "awoooi-prod"),
            "severity": risk_level,
        })
    except Exception as e:
        logger.debug(
            "telegram_push_rule_recovery_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
        return action
    if rule_resp and (rule_resp.get("kubectl_command") or "").strip():
        return rule_resp["kubectl_command"]
    return action


def _extract_tg_message_ref(tg_result: Any) -> tuple[Any, Any]:
    """Return ``(message_id, chat_id)`` from a Telegram API response, or ``(None, None)``.

    Tolerates non-dict responses and missing or ``None`` ``result``/``chat`` fields.
    """
    if not isinstance(tg_result, dict):
        return None, None
    result = tg_result.get("result")
    if not isinstance(result, dict):
        return None, None
    chat = result.get("chat")
    chat_id = chat.get("id") if isinstance(chat, dict) else None
    return result.get("message_id"), chat_id


async def _push_decision_to_telegram(
    incident: Incident,
    proposal_data: dict[str, Any],
) -> None:
    """
    Push a ready decision to Telegram as an approval card.

    Phase 6.5: ties the Signal Worker flow to Telegram notifications.

    The push is skipped when:
    - the incident was already pushed within 10 minutes (``telegram_sent:{id}``),
    - the incident is resolved or closed,
    - the target resource is silenced (``telegram_silence:{target}``),
    - no bot token is configured.

    After a successful send, the Telegram message_id is stored in Redis
    (``tg_msg:{id}``, 24 h) and persisted to the approval DB, so later status
    updates can continue in the original message. All errors are logged and
    swallowed, so a Telegram failure never affects the main flow.
    """
    try:
        # Deferred import to avoid a circular dependency.
        from src.services.telegram_gateway import get_telegram_gateway

        redis = get_redis()

        # Dedup: one push per incident within 10 minutes.
        dedup_key = f"telegram_sent:{incident.incident_id}"
        if await redis.exists(dedup_key):
            logger.debug(
                "telegram_push_skipped",
                reason="Already sent within 10 minutes",
                incident_id=incident.incident_id,
            )
            return

        # Resolved incidents must not be re-pushed after the dedup TTL expires.
        if _is_incident_closed(incident):
            logger.info(
                "telegram_push_skipped",
                reason="Incident already resolved",
                incident_id=incident.incident_id,
            )
            return

        # Per-resource silence.
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        if await redis.exists(f"telegram_silence:{target}"):
            logger.info(
                "telegram_push_silenced",
                reason="Resource is silenced",
                incident_id=incident.incident_id,
                resource=target,
            )
            return

        if not settings.OPENCLAW_TG_BOT_TOKEN:
            logger.debug(
                "telegram_push_skipped",
                reason="Bot token not configured",
                incident_id=incident.incident_id,
            )
            return

        gateway = get_telegram_gateway()
        labels = _first_signal_labels(incident)

        risk_level = proposal_data.get("risk_level", "medium")
        # The action is NOT placeholder-stripped: stripping could cut the
        # deployment name. Placeholders should already have been filled in by
        # nemotron post-processing.
        action = proposal_data.get("action", proposal_data.get("kubectl_command", ""))
        action = _recover_command_from_rules(incident, action, target, risk_level, labels)

        description = proposal_data.get("description") or ""
        reasoning = _strip_placeholders(proposal_data.get("reasoning") or "")
        confidence = proposal_data.get("confidence", 0.0)  # 0.0 = not AI-analysed
        source = proposal_data.get("source", "unknown")

        # The incident_id is already in INC-xxx form; do not prefix it again.
        approval_id = incident.incident_id

        tg_result = await gateway.send_approval_card(
            approval_id=approval_id,
            risk_level=risk_level,
            resource_name=target[:50],
            root_cause=reasoning[:150] if reasoning else description[:150],
            suggested_action=action[:80] if action else "待分析",
            estimated_downtime="5-15 min",
            primary_responsibility="INFRA",
            confidence=confidence,
            namespace=labels.get("namespace", "default"),
            ai_provider=proposal_data.get("provider", ""),
            ai_model=proposal_data.get("model", ""),
            # Phase 22 Nemotron collaboration (ADR-044)
            nemotron_enabled=proposal_data.get("nemotron_enabled", False),
            nemotron_tools=proposal_data.get("nemotron_tools"),
            nemotron_validation=proposal_data.get("nemotron_validation", ""),
            nemotron_latency_ms=proposal_data.get("nemotron_latency_ms", 0.0),
            nemotron_tool_model=proposal_data.get("nemotron_tool_model", ""),
            nemotron_tool_backend=proposal_data.get("nemotron_tool_backend", ""),
            # Enables the detail / reanalyze / history buttons.
            incident_id=incident.incident_id,
        )

        # Remember the message so later status updates continue in it:
        # Redis for fast lookup, the DB for persistence beyond the TTL.
        tg_message_id, tg_chat_id = _extract_tg_message_ref(tg_result)
        if tg_message_id:
            await redis.setex(f"tg_msg:{incident.incident_id}", 86400, str(tg_message_id))
            try:
                from src.services.approval_db import get_approval_service as _get_approval_svc

                await _get_approval_svc().update_telegram_message(
                    incident_id=incident.incident_id,
                    telegram_message_id=tg_message_id,
                    telegram_chat_id=tg_chat_id,
                )
            except Exception as _e:
                logger.warning(
                    "telegram_message_id_db_save_failed",
                    incident_id=incident.incident_id,
                    error=str(_e),
                )

        # Mark as sent only after the card went out (TTL 10 minutes).
        await redis.setex(dedup_key, 600, "1")
        logger.info(
            "telegram_decision_pushed",
            incident_id=incident.incident_id,
            source=source,
            risk_level=risk_level,
        )
    except Exception as e:
        # Telegram failures must not affect the main flow.
        logger.warning(
            "telegram_decision_push_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
async def _push_auto_repair_result(
    incident: Incident,
    action: str,
    success: bool,
    error: str = "",
) -> None:
    """
    Report an auto-repair outcome as a follow-up to the original alert message.

    Requirement: every status change continues in the original alert message
    rather than starting a new one.
    - ``gateway.append_incident_update()`` replies to the original message and
      swaps its buttons (keeps detail/reanalyze/history, drops approve/reject).
    - If no original message can be found (``append_incident_update`` returns
      falsy), a standalone message is sent via ``send_notification``.

    *action* and *error* are free text (error is typically an exception
    message), so they are HTML-escaped before being embedded in the
    HTML-formatted status line. Errors are logged and swallowed.

    2026-04-09 Claude Sonnet 4.6 Asia/Taipei
    """
    try:
        from src.services.telegram_gateway import get_telegram_gateway

        gateway = get_telegram_gateway()
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        inc_id = incident.incident_id

        # Truncate first, then escape, so an escape sequence is never cut in half.
        if success:
            status_line = (
                f"✅ <b>自動修復完成</b>\n"
                f"└ <code>{html.escape(action[:100]) if action else '已執行'}</code>"
            )
        else:
            status_line = (
                f"❌ <b>自動修復失敗,請人工介入</b>\n"
                f"├ 動作: <code>{html.escape(action[:80]) if action else '未知'}</code>\n"
                f"└ 錯誤: {html.escape(error[:100]) if error else '未知錯誤'}"
            )

        # Preferred: reply to the original alert and replace its buttons.
        appended = await gateway.append_incident_update(
            incident_id=inc_id,
            status_line=status_line,
            keep_info_buttons=True,  # keep detail/reanalyze/history, drop approve/reject
        )

        # Fallback: no original message id (old alert or expired Redis key) -> new message.
        if not appended:
            marker = "✅" if success else "❌"
            fallback_text = (
                f"{marker} <b>[自動修復{'完成' if success else '失敗'}]</b> "
                f"<code>{html.escape(inc_id)}</code>\n"
                f"對象: <code>{html.escape(target[:50])}</code>\n"
                f"{status_line}"
            )
            await gateway.send_notification(fallback_text)

        logger.info("auto_repair_result_sent", incident_id=inc_id, success=success, appended=appended)
    except Exception as e:
        logger.warning("auto_repair_result_push_failed", incident_id=incident.incident_id, error=str(e))
# =============================================================================
# Decision States
# =============================================================================
class DecisionState(str, Enum):
    """
    Lifecycle of a decision token.

    Members compare equal to their lowercase string values (str mixin), which
    is also what ``DecisionToken.to_dict`` stores in Redis.
    """

    INIT = "init"            # incident just created
    ANALYZING = "analyzing"  # analysis in progress
    READY = "ready"          # decision ready for review
    EXECUTING = "executing"  # authorised, currently executing
    COMPLETED = "completed"  # execution finished
    ERROR = "error"          # something went wrong
# =============================================================================
# Expert System - 規則引擎 (Local Fallback)
# =============================================================================
# Expert System rule table, consumed by expert_analyze().
# NOTE: order matters. expert_analyze() tries the rules in insertion order
# (skipping "default") and the first rule whose pattern occurs as a substring
# of the lower-cased alert names wins. For example, "failed" (pod_crash) is
# therefore matched before "fail" (high_error_rate). "default" is used when
# nothing matches. "{target}" in "action" is filled via str.format(target=...).
EXPERT_RULES: dict[str, dict[str, Any]] = {
    # Pod crash -> restart
    "pod_crash": {
        "patterns": ["crash", "restart", "oom", "killed", "failed"],
        "action": "kubectl rollout restart deployment/{target}",
        "description": "Expert System: 偵測到 Pod 異常,建議重啟部署",
        "risk_level": "medium",
        "reasoning": "根據歷史數據,重啟可解決 85% 的 Pod 崩潰問題",
    },
    # High latency -> scale out
    "high_latency": {
        "patterns": ["latency", "slow", "timeout", "p99"],
        "action": "kubectl scale deployment/{target} --replicas=3",
        "description": "Expert System: 偵測到高延遲,建議擴容至 3 副本",
        "risk_level": "low",
        "reasoning": "擴容可分散負載,降低單一 Pod 壓力",
    },
    # High error rate -> roll back
    "high_error_rate": {
        "patterns": ["error", "5xx", "fail", "exception"],
        "action": "kubectl rollout undo deployment/{target}",
        "description": "Expert System: 偵測到高錯誤率,建議回滾至上一版",
        "risk_level": "critical",
        "reasoning": "錯誤率突增通常源自最近部署,回滾是最快修復方式",
    },
    # Resource exhaustion -> scale out
    "resource_exhaustion": {
        "patterns": ["cpu", "memory", "resource", "quota"],
        "action": "kubectl scale deployment/{target} --replicas=2",
        "description": "Expert System: 偵測到資源耗盡,建議擴容",
        "risk_level": "medium",
        "reasoning": "增加副本可分散資源壓力",
    },
    # Default -> restart (the most conservative action); patterns unused
    "default": {
        "patterns": [],
        "action": "kubectl rollout restart deployment/{target}",
        "description": "Expert System: 無法確定具體問題,建議安全重啟",
        "risk_level": "medium",
        "reasoning": "重啟是最安全的通用修復動作",
    },
}
def expert_analyze(incident: Incident) -> dict[str, Any]:
    """
    Expert System rule-engine analysis (local fallback).

    Runs entirely in-process and is used as the last-resort fallback when the
    LLM track times out or errors, so it must not raise. Missing signals and
    missing alert names are therefore treated as empty instead of crashing.

    Rules in EXPERT_RULES are tried in definition order (skipping "default").
    The first rule with any pattern occurring in the lower-cased,
    space-joined alert names wins. With no match, "default" is used.

    Returns:
        proposal_data dict with ``source="expert_system"``. ``confidence`` is
        0.0 and ``is_rule_based`` is True, because a rule match is not an AI
        judgement.
    """
    target = incident.affected_services[0] if incident.affected_services else "unknown-service"
    # Tolerate a None signal list / None alert_name: this path must never fail.
    alert_names = " ".join((s.alert_name or "").lower() for s in (incident.signals or []))

    matched_rule = next(
        (
            rule_name
            for rule_name, rule in EXPERT_RULES.items()
            if rule_name != "default" and any(pattern in alert_names for pattern in rule["patterns"])
        ),
        "default",
    )
    rule = EXPERT_RULES[matched_rule]
    command = rule["action"].format(target=target)

    return {
        "source": "expert_system",
        "action": command,
        "description": rule["description"],
        "risk_level": rule["risk_level"],
        "reasoning": f"[規則匹配] {rule['reasoning']}",  # explicitly tag the origin
        "confidence": 0.0,  # rule matching is not AI arbitration
        "kubectl_command": command,
        "matched_rule": matched_rule,
        "from_cache": False,
        "is_rule_based": True,
    }
# =============================================================================
# Decision Token (Redis)
# =============================================================================
class DecisionToken:
    """
    Decision token: holding it lets a client (e.g. the frontend) act on a decision.

    Persisted in Redis as JSON under ``decision:{token}`` with a 1 hour TTL.
    Timestamps are stored as ISO-8601 strings.
    """

    def __init__(
        self,
        token: str,
        incident_id: str,
        state: DecisionState,
        proposal_data: dict[str, Any] | None = None,
        proposal_id: str | None = None,
        created_at: datetime | None = None,
        updated_at: datetime | None = None,
        error: str | None = None,
    ):
        # identity
        self.token = token
        self.incident_id = incident_id
        # lifecycle / payload
        self.state = state
        self.proposal_data = proposal_data
        self.proposal_id = proposal_id
        # timestamps fall back to "now" in UTC when not supplied
        self.created_at = created_at or datetime.now(UTC)
        self.updated_at = updated_at or datetime.now(UTC)
        self.error = error

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-friendly dict (state as its value, timestamps as ISO strings)."""
        serialised: dict[str, Any] = {
            "token": self.token,
            "incident_id": self.incident_id,
            "state": self.state.value,
            "proposal_data": self.proposal_data,
            "proposal_id": self.proposal_id,
        }
        for field_name in ("created_at", "updated_at"):
            serialised[field_name] = getattr(self, field_name).isoformat()
        serialised["error"] = self.error
        return serialised

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DecisionToken":
        """Rebuild a token from ``to_dict()`` output; empty timestamps become "now"."""

        def parse_timestamp(field_name: str) -> datetime | None:
            raw = data.get(field_name)
            return datetime.fromisoformat(raw) if raw else None

        token_id = data["token"]
        incident_id = data["incident_id"]
        state = DecisionState(data["state"])
        return cls(
            token=token_id,
            incident_id=incident_id,
            state=state,
            proposal_data=data.get("proposal_data"),
            proposal_id=data.get("proposal_id"),
            created_at=parse_timestamp("created_at"),
            updated_at=parse_timestamp("updated_at"),
            error=data.get("error"),
        )
# =============================================================================
# Protocol Interface (Phase 17 P1 - 紅區治理)
# =============================================================================
@runtime_checkable
class IDecisionManager(Protocol):
"""
DecisionManager 介面定義
用途:
- 依賴注入 (DI) 時的型別約束
- 測試時 Mock 的型別檢查
- 符合 leWOOOgo 積木化規範
Tier 3 紅區服務: 修改需首席架構師簽核
@see feedback_lewooogo_modular_enforcement.md
@see docs/RED_ZONES.md
"""
async def get_or_create_decision(
self,
incident: "Incident",
timeout_sec: float = 30.0,
) -> "DecisionToken":
"""取得或建立決策令牌"""
...
async def mark_executing(self, token: str) -> "DecisionToken | None":
"""標記決策為執行中"""
...
async def mark_completed(self, token: str, result: dict[str, Any] | None = None) -> "DecisionToken | None":
"""標記決策為已完成"""
...
# =============================================================================
# Decision Manager
# =============================================================================
# Redis key prefix for decision tokens: "decision:{token}".
DECISION_TOKEN_PREFIX = "decision:"
# Decision token lifetime in seconds (one hour).
DECISION_TOKEN_TTL = 60 * 60
class DecisionManager:
    """
    Decision manager - Phase 6.5 core.

    Responsibilities:
    1. Issue a decision_token for every Incident.
    2. Run the LLM track and the Expert System track.
    3. Apply a first-win / fallback strategy.
    4. Make sure the UI always has a decision it can act on.

    Implements the IDecisionManager protocol (get_or_create_decision,
    mark_executing, mark_completed).

    Redis layout:
    - ``decision:{token}``                -> JSON-serialised DecisionToken
    - ``decision_incident:{incident_id}`` -> id of the most recently saved token
      for that incident (same TTL). Lookups by incident therefore avoid a full
      keyspace SCAN and deterministically return the latest token.
    """

    # Secondary index prefix. It must NOT start with DECISION_TOKEN_PREFIX
    # ("decision:"), otherwise the fallback SCAN would match index keys.
    _INCIDENT_INDEX_PREFIX = "decision_incident:"
    # Existing tokens in these states are returned as-is instead of being
    # re-analysed, which avoids duplicate decisions and duplicate Telegram pushes.
    _REUSABLE_STATES = (DecisionState.READY, DecisionState.EXECUTING, DecisionState.COMPLETED)

    def __init__(self):
        self._openclaw = get_openclaw()
        # I2 fix (chief architect review): inject KnowledgeService here to avoid
        # function-level import coupling. 2026-04-04 Claude Code
        from src.services.knowledge_service import get_knowledge_service

        self._knowledge_svc = get_knowledge_service()
        # Strong references to fire-and-forget tasks (see _spawn).
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _spawn(self, coro: Any) -> asyncio.Task:
        """Schedule *coro* in the background and keep a reference until it finishes.

        The event loop only keeps weak references to tasks, so a task whose
        ``create_task`` result is dropped can be garbage-collected mid-run.
        """
        task = asyncio.create_task(coro)
        tasks = self.__dict__.setdefault("_background_tasks", set())
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    @staticmethod
    def _new_token(incident_id: str) -> DecisionToken:
        """Create a fresh ANALYZING token with a random ``DEC-XXXXXXXXXXXX`` id."""
        return DecisionToken(
            token=f"DEC-{uuid4().hex[:12].upper()}",
            incident_id=incident_id,
            state=DecisionState.ANALYZING,
        )

    @staticmethod
    def _mark_ready(
        token: DecisionToken,
        proposal_data: dict[str, Any],
        error: str | None = None,
    ) -> None:
        """Put *token* into READY with *proposal_data* (and *error*, if given)."""
        token.state = DecisionState.READY
        token.proposal_data = proposal_data
        if error is not None:
            token.error = error
        token.updated_at = datetime.now(UTC)

    @staticmethod
    def _render_command_template(template: str, target: str) -> str:
        """Fill ``{target}`` in a playbook command template.

        ``str.format`` is tried first so existing templates (including ones
        using ``{{``/``}}`` escapes) behave as before. Commands containing
        literal braces, e.g. ``-o jsonpath='{.items[*].metadata.name}'``, make
        ``format`` raise. In that case only ``{target}`` is substituted.
        """
        try:
            return template.format(target=target)
        except (KeyError, IndexError, ValueError):
            return template.replace("{target}", target)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_or_create_decision(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
    ) -> DecisionToken:
        """
        Return the decision token for *incident*, creating and analysing one if needed.

        Flow:
        1. Reuse an existing READY / EXECUTING / COMPLETED token.
        2. Otherwise create a new ANALYZING token.
        3. Run the dual-engine analysis, bounded by *timeout_sec*. On timeout
           or error, fall back to the Expert System.
        4. Save the READY token, then either auto-execute (AutoApprovePolicy)
           or push the card to Telegram for human approval. Both run in the
           background.
        """
        existing_token = await self._find_existing_token(incident.incident_id)
        if existing_token and existing_token.state in self._REUSABLE_STATES:
            # COMPLETED included: re-creating would spam Telegram.
            return existing_token

        token = self._new_token(incident.incident_id)
        await self._save_token(token)
        logger.info(
            "decision_analyzing",
            token=token.token,
            incident_id=incident.incident_id,
        )

        try:
            proposal_data = await asyncio.wait_for(
                self._dual_engine_analyze(incident),
                timeout=timeout_sec,
            )
            self._mark_ready(token, proposal_data)
            logger.info(
                "decision_ready",
                token=token.token,
                source=proposal_data.get("source", "unknown"),
            )
        except TimeoutError:
            logger.warning(
                "decision_timeout_using_expert",
                token=token.token,
                timeout_sec=timeout_sec,
            )
            self._mark_ready(token, expert_analyze(incident))
        except Exception as e:
            logger.exception(
                "decision_error_using_expert",
                token=token.token,
                error=str(e),
            )
            self._mark_ready(token, expert_analyze(incident), error=str(e))

        await self._save_token(token)

        if token.state == DecisionState.READY and token.proposal_data:
            await self._dispatch_ready_decision(incident, token)
        return token

    async def _dispatch_ready_decision(self, incident: Incident, token: DecisionToken) -> None:
        """ADR-030 Phase 4: auto-execute when the policy allows it, else push to Telegram."""
        auto_policy = get_auto_approve_policy()
        auto_decision = auto_policy.evaluate(
            proposal_data=token.proposal_data,
            playbook=token.proposal_data.get("_matched_playbook"),
        )

        if not auto_decision.should_auto_approve:
            # Human review needed: send the approval card.
            self._spawn(_push_decision_to_telegram(incident, token.proposal_data))
            return

        logger.info(
            "auto_approve_triggered",
            incident_id=incident.incident_id,
            reason=auto_decision.reason.value,
            detail=auto_decision.reason_detail,
        )
        token.state = DecisionState.EXECUTING
        token.proposal_data["auto_approved"] = True
        token.proposal_data["auto_approve_reason"] = auto_decision.reason_detail
        token.updated_at = datetime.now(UTC)
        await self._save_token(token)
        self._spawn(self._auto_execute(incident, token))

    async def _auto_execute(self, incident: Incident, token: "DecisionToken") -> None:
        """
        ADR-030 Phase 4: execute an auto-approved action and report the outcome.

        Only called when AutoApprovePolicy allows auto-execution. The result is
        reported on the original Telegram alert. On execution failure, the
        token becomes ERROR and a manual approval card is pushed.

        Only the execution itself counts as "failure". A Redis error while
        recording a successful run is logged but does NOT trigger the
        failure/manual-approval path, because that could lead to the same
        action being executed twice.

        2026-04-09 Claude Sonnet 4.6 Asia/Taipei
        """
        action = token.proposal_data.get("kubectl_command", "")
        try:
            # Deferred imports to avoid circular dependencies.
            from src.models.approval import ApprovalRequest, ApprovalStatus
            from src.services.approval_execution import ApprovalExecutionService

            # Synthetic, already-approved request.
            approval = ApprovalRequest(
                incident_id=incident.incident_id,
                action=action,
                status=ApprovalStatus.APPROVED,
                risk_level=token.proposal_data.get("risk_level", "low"),
            )
            executor = ApprovalExecutionService()
            await executor.execute_approved_action(approval)
        except Exception as e:
            logger.error(
                "auto_execute_failed",
                incident_id=incident.incident_id,
                error=str(e),
            )
            token.state = DecisionState.ERROR
            token.error = f"Auto-execute failed: {e}"
            token.updated_at = datetime.now(UTC)
            try:
                await self._save_token(token)
            except Exception as save_err:
                logger.warning(
                    "auto_execute_token_save_failed",
                    incident_id=incident.incident_id,
                    error=str(save_err),
                )
            # Report the failure and fall back to manual approval.
            self._spawn(_push_auto_repair_result(incident, action, success=False, error=str(e)))
            self._spawn(_push_decision_to_telegram(incident, token.proposal_data))
            return

        token.state = DecisionState.COMPLETED
        token.proposal_data["auto_executed"] = True
        token.updated_at = datetime.now(UTC)
        try:
            await self._save_token(token)
        except Exception as save_err:
            logger.warning(
                "auto_execute_token_save_failed",
                incident_id=incident.incident_id,
                error=str(save_err),
            )
        logger.info(
            "auto_execute_completed",
            incident_id=incident.incident_id,
            action=approval.action,
        )
        self._spawn(_push_auto_repair_result(incident, action, success=True))

    async def mark_executing(self, token: str) -> DecisionToken | None:
        """Move the decision identified by *token* to EXECUTING (IDecisionManager).

        Returns the updated token, or None if the token does not exist.
        """
        return await self.update_token_state(token, DecisionState.EXECUTING)

    async def mark_completed(
        self,
        token: str,
        result: dict[str, Any] | None = None,
    ) -> DecisionToken | None:
        """Move the decision identified by *token* to COMPLETED (IDecisionManager).

        When *result* is given, it is stored as ``proposal_data["execution_result"]``.
        It must be JSON-serialisable, because tokens are saved with ``json.dumps``.
        Returns the updated token, or None if the token does not exist.
        """
        decision = await self.get_token(token)
        if decision is None:
            return None
        decision.state = DecisionState.COMPLETED
        decision.updated_at = datetime.now(UTC)
        if result is not None:
            decision.proposal_data = {**(decision.proposal_data or {}), "execution_result": result}
        await self._save_token(decision)
        return decision

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------

    async def _query_kb_context_inner(self, incident: Incident) -> str:
        """Run the KB semantic search and format the hits as an LLM context block.

        Returns "" when there is nothing to search for or no hits.
        """
        query_parts = list(incident.affected_services or [])
        if incident.signals:
            query_parts.insert(0, getattr(incident.signals[0], "alert_name", ""))
        query = " ".join(filter(None, query_parts))
        if not query:
            # Nothing meaningful to search for; skip the remote call.
            return ""

        results = await self._knowledge_svc.semantic_search(query, limit=3, threshold=0.4)
        if not results:
            return ""

        lines = ["## Knowledge Base Related Entries (KB RAG)"]
        for entry, score in results:
            content = entry.content or ""
            lines.append(
                f"\n### [{entry.entry_type}] {entry.title} (similarity={score:.2f})"
            )
            lines.append(content[:500])
            if len(content) > 500:
                lines.append("... (truncated)")
        logger.info(
            "kb_rag_context_injected",
            incident_id=incident.incident_id,
            kb_hits=len(results),
        )
        return "\n".join(lines)

    async def _query_kb_context(self, incident: Incident) -> str:
        """
        KB Phase 2: semantic KB search formatted as LLM context.

        C1 fix (chief architect review): a 5 s hard timeout keeps slow Ollama
        responses from threatening the 30 s SLA. Any failure degrades silently
        to "" so the main analysis continues.
        """
        try:
            return await asyncio.wait_for(
                self._query_kb_context_inner(incident),
                timeout=5.0,
            )
        except TimeoutError:
            logger.warning("kb_rag_timeout", incident_id=incident.incident_id)
            return ""
        except (ConnectionError, OSError) as e:
            # Expected connectivity problems (e.g. Ollama down): degrade quietly.
            logger.warning("kb_rag_connection_error", incident_id=incident.incident_id, error=str(e))
            return ""
        except Exception as e:
            # Unexpected: log at error level so monitoring picks it up.
            logger.error("kb_rag_unexpected_error", incident_id=incident.incident_id, error=str(e))
            return ""

    async def _dual_engine_analyze(
        self,
        incident: Incident,
    ) -> dict[str, Any]:
        """
        Three-track decision analysis (Phase 7.5 + KB Phase 2 RAG).

        1. A playbook with similarity >= PLAYBOOK_SIMILARITY_THRESHOLD wins outright.
        2. Otherwise the LLM is asked, with the Expert System result and any KB
           context passed as ``expert_context``.
        3. If the LLM fails or raises, the Expert System result is returned.

        Priority: Playbook > LLM > Expert System.
        """
        playbook_result = await self._try_playbook_match(incident)
        if playbook_result:
            return playbook_result

        # Always available, computed synchronously.
        expert_result = expert_analyze(incident)

        # KB RAG context (degrades to "" on failure).
        kb_context = await self._query_kb_context(incident)

        # LLM track (Phase 22: OpenClaw + Nemotron tool calling).
        try:
            signals_dict = [s.model_dump() for s in incident.signals]

            llm_expert_context: dict[str, Any] = {**expert_result} if expert_result else {}
            if kb_context:
                existing = str(llm_expert_context.get("diagnosis_context", ""))
                llm_expert_context["diagnosis_context"] = (
                    f"{kb_context}\n\n{existing}" if existing else kb_context
                )

            llm_result, provider, success = await self._openclaw.generate_incident_proposal_with_tools(
                incident_id=incident.incident_id,
                severity=incident.severity.value,
                signals=signals_dict,
                affected_services=incident.affected_services,
                expert_context=llm_expert_context if llm_expert_context else None,
            )
            if success and llm_result:
                logger.info(
                    "dual_engine_llm_win",
                    incident_id=incident.incident_id,
                    provider=provider,
                    kb_rag=bool(kb_context),
                )
                return {
                    **llm_result,
                    "source": f"llm_{provider}",
                }
        except Exception as e:
            logger.warning(
                "dual_engine_llm_failed",
                incident_id=incident.incident_id,
                error=str(e),
            )

        logger.info(
            "dual_engine_expert_fallback",
            incident_id=incident.incident_id,
        )
        return expert_result

    async def _try_playbook_match(
        self,
        incident: Incident,
    ) -> dict[str, Any] | None:
        """
        Phase 7.5: try to answer the incident from an approved playbook.

        A playbook is used only if:
        - its similarity is >= PLAYBOOK_SIMILARITY_THRESHOLD (85%), and
        - it has no executions yet, or its success rate is >= 80%.

        Returns:
            proposal_data for the top recommendation, or None (including on error).
        """
        try:
            playbook_service = get_playbook_service()

            alert_names = [s.alert_name for s in incident.signals] if incident.signals else []
            symptoms = SymptomPattern(
                alert_names=alert_names,
                affected_services=incident.affected_services or [],
                severity_range=[incident.severity.value] if incident.severity else ["P2"],
            )

            recommendations = await playbook_service.get_recommendations(
                symptoms=symptoms,
                top_k=1,
            )
            if not recommendations:
                logger.debug(
                    "playbook_no_match",
                    incident_id=incident.incident_id,
                )
                return None

            best_match = recommendations[0]
            playbook = best_match.playbook

            if best_match.similarity_score < PLAYBOOK_SIMILARITY_THRESHOLD:
                logger.debug(
                    "playbook_similarity_below_threshold",
                    incident_id=incident.incident_id,
                    playbook_id=playbook.playbook_id,
                    similarity=best_match.similarity_score,
                    threshold=PLAYBOOK_SIMILARITY_THRESHOLD,
                )
                return None

            if playbook.total_executions > 0 and playbook.success_rate < 0.8:
                logger.debug(
                    "playbook_low_success_rate",
                    incident_id=incident.incident_id,
                    playbook_id=playbook.playbook_id,
                    success_rate=playbook.success_rate,
                )
                return None

            # Playbook hit: use the first repair step's command.
            kubectl_command = ""
            if playbook.repair_steps:
                target = incident.affected_services[0] if incident.affected_services else "unknown"
                kubectl_command = self._render_command_template(
                    playbook.repair_steps[0].command, target
                )

            logger.info(
                "playbook_match_success",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                playbook_name=playbook.name,
                similarity=best_match.similarity_score,
                success_rate=playbook.success_rate,
            )
            return {
                "source": "playbook",
                "playbook_id": playbook.playbook_id,
                "playbook_name": playbook.name,
                "action": kubectl_command,
                "kubectl_command": kubectl_command,
                "description": playbook.description,
                "risk_level": playbook.repair_steps[0].risk_level.value.lower() if playbook.repair_steps else "medium",
                "reasoning": f"Playbook 匹配 ({best_match.similarity_score:.0%} 相似度, {playbook.success_rate:.0%} 成功率): {best_match.reason}",
                "confidence": 0.0,  # playbook RAG match is not AI analysis
                "matched_symptoms": best_match.matched_symptoms,
                "from_cache": False,
            }
        except Exception as e:
            logger.warning(
                "playbook_match_error",
                incident_id=incident.incident_id,
                error=str(e),
            )
            return None

    # ------------------------------------------------------------------
    # Token persistence (Redis)
    # ------------------------------------------------------------------

    async def _find_existing_token(
        self,
        incident_id: str,
    ) -> DecisionToken | None:
        """
        Return the most recent decision token for *incident_id*, or None.

        Fast path: the ``decision_incident:{id}`` index written by _save_token.
        Fallback: SCAN ``decision:*`` (covers tokens saved before the index
        existed) and pick the match with the newest ``updated_at``. Picking
        the newest avoids returning an arbitrary stale token when an incident
        has several.
        """
        redis_client = get_redis()

        try:
            indexed_id = await redis_client.get(f"{self._INCIDENT_INDEX_PREFIX}{incident_id}")
            if indexed_id:
                if isinstance(indexed_id, bytes):
                    indexed_id = indexed_id.decode()
                indexed_token = await self.get_token(indexed_id)
                if indexed_token and indexed_token.incident_id == incident_id:
                    return indexed_token
        except Exception as e:
            logger.debug("decision_index_lookup_failed", incident_id=incident_id, error=str(e))

        newest: DecisionToken | None = None
        cursor = 0
        while True:
            cursor, keys = await redis_client.scan(
                cursor=cursor,
                match=f"{DECISION_TOKEN_PREFIX}*",
                count=100,
            )
            for key in keys:
                try:
                    data = await redis_client.get(key)
                    if not data:
                        continue
                    token_data = json.loads(data)
                    if token_data.get("incident_id") != incident_id:
                        continue
                    candidate = DecisionToken.from_dict(token_data)
                    if newest is None or candidate.updated_at > newest.updated_at:
                        newest = candidate
                except Exception:
                    # Malformed / concurrently-expired entries are skipped.
                    continue
            if cursor == 0:
                break
        return newest

    async def _save_token(self, token: DecisionToken) -> None:
        """Persist *token* and point the incident index at it (both with DECISION_TOKEN_TTL)."""
        redis_client = get_redis()
        await redis_client.set(
            f"{DECISION_TOKEN_PREFIX}{token.token}",
            json.dumps(token.to_dict()),
            ex=DECISION_TOKEN_TTL,
        )
        await redis_client.set(
            f"{self._INCIDENT_INDEX_PREFIX}{token.incident_id}",
            token.token,
            ex=DECISION_TOKEN_TTL,
        )

    async def get_token(self, token_id: str) -> DecisionToken | None:
        """Load a decision token by id, or None if it does not exist (or expired)."""
        redis_client = get_redis()
        data = await redis_client.get(f"{DECISION_TOKEN_PREFIX}{token_id}")
        if data:
            return DecisionToken.from_dict(json.loads(data))
        return None

    async def update_token_state(
        self,
        token_id: str,
        new_state: DecisionState,
        proposal_id: str | None = None,
    ) -> DecisionToken | None:
        """Set the state (and optionally the proposal_id) of a stored token.

        Returns the updated token, or None if it does not exist.
        """
        token = await self.get_token(token_id)
        if not token:
            return None
        token.state = new_state
        token.updated_at = datetime.now(UTC)
        if proposal_id:
            token.proposal_id = proposal_id
        await self._save_token(token)
        return token

    async def get_or_create_decision_with_consensus(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
        use_consensus: bool = True,
    ) -> DecisionToken:
        """
        Like get_or_create_decision, but uses the Agent Teams ConsensusEngine for P0/P1.

        Phase 9.4:
        - P0/P1 incidents (with *use_consensus*) go through ConsensusEngine.
        - Everything else delegates to get_or_create_decision.
        - On consensus timeout or error, the Expert System result is used.

        NOTE(review): unlike get_or_create_decision, this path neither
        evaluates AutoApprovePolicy nor pushes a Telegram approval card. Confirm
        that callers handle notification for consensus decisions.

        Args:
            incident: the incident to decide on.
            timeout_sec: timeout for the consensus run.
            use_consensus: whether to allow the consensus engine (default True).

        Returns:
            DecisionToken
        """
        should_use_consensus = use_consensus and incident.severity.value in ["P0", "P1"]
        if not should_use_consensus:
            return await self.get_or_create_decision(incident, timeout_sec)

        from src.services.consensus_engine import get_consensus_engine

        consensus_engine = get_consensus_engine()

        existing_token = await self._find_existing_token(incident.incident_id)
        if existing_token and existing_token.state in self._REUSABLE_STATES:
            return existing_token

        token = self._new_token(incident.incident_id)
        await self._save_token(token)
        logger.info(
            "decision_analyzing_with_consensus",
            token=token.token,
            incident_id=incident.incident_id,
        )

        try:
            consensus_result = await asyncio.wait_for(
                consensus_engine.run_consensus(incident, timeout_sec),
                timeout=timeout_sec,
            )
            proposal_data = {
                "source": "consensus_engine",
                "consensus_id": consensus_result.consensus_id,
                "consensus_score": consensus_result.consensus_score,
                "action": consensus_result.recommended_action,
                "description": consensus_result.final_reasoning,
                "risk_level": consensus_result.risk_level,
                "kubectl_command": consensus_result.recommended_kubectl,
                "reasoning": consensus_result.final_reasoning,
                "confidence": 0.0,  # consensus score is not an AI confidence value
                "agent_count": len(consensus_result.opinions),
                "dissenting_opinions": consensus_result.dissenting_opinions,
                "from_cache": False,
            }
            self._mark_ready(token, proposal_data)
            logger.info(
                "decision_ready_with_consensus",
                token=token.token,
                consensus_id=consensus_result.consensus_id,
                consensus_score=consensus_result.consensus_score,
            )
        except TimeoutError:
            logger.warning(
                "consensus_timeout_using_expert",
                token=token.token,
                timeout_sec=timeout_sec,
            )
            self._mark_ready(token, expert_analyze(incident))
        except Exception as e:
            logger.exception(
                "consensus_error_using_expert",
                token=token.token,
                error=str(e),
            )
            self._mark_ready(token, expert_analyze(incident), error=str(e))

        await self._save_token(token)
        return token
# =============================================================================
# Singleton
# =============================================================================
# Lazily created module-level DecisionManager shared by get_decision_manager().
_decision_manager: DecisionManager | None = None


def get_decision_manager() -> DecisionManager:
    """Return the shared DecisionManager, constructing it on the first call."""
    global _decision_manager
    if _decision_manager is not None:
        return _decision_manager
    _decision_manager = DecisionManager()
    return _decision_manager