Files
awoooi/apps/api/src/services/decision_manager.py
OG T d148756b67 feat(api): LLM 整合 Expert System 診斷上下文
長期方案實作: Expert 診斷 + LLM 智能分析

變更:
1. decision_manager._dual_engine_analyze():
   - 測試資源跳過 LLM (省錢)
   - 傳遞 Expert 診斷上下文給 LLM
   - LLM 失敗時根據診斷調整回應

2. openclaw.generate_incident_proposal():
   - 新增 expert_context 參數
   - Prompt 包含 Expert 診斷結果
   - 引導 LLM 基於診斷做決策

流程:
Playbook → Expert診斷 → LLM(with context) → 智能建議

這是「先診斷根因,再決定行動」的正確實作

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-26 21:41:26 +08:00

992 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Decision Manager - Phase 6.5 非同步決策狀態機
=============================================
實作「雙軌決策」(Dual-Engine Decision):
1. OpenClaw LLM (主要) - 智能提案
2. Expert System (備援) - 規則引擎
狀態機:
- INIT: 事件剛建立
- ANALYZING: 正在分析中 (LLM + Expert 並行)
- READY: 決策就緒,等待統帥親核
- EXECUTING: 已授權,正在執行
- COMPLETED: 執行完成
統帥鐵律:
- 永遠不能讓 UI 鎖死
- 30 秒內必須有 decision_token
- LLM 失敗時 Expert System 保底
"""
import asyncio
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Protocol, runtime_checkable
from uuid import uuid4
import structlog
from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Phase 7.5: playbook-first threshold.
# A playbook recommendation whose similarity score is below this value is
# rejected by DecisionManager._try_playbook_match; at or above it (85%) the
# playbook is eligible to be used directly instead of the Expert/LLM path.
PLAYBOOK_SIMILARITY_THRESHOLD = 0.85
# =============================================================================
# Telegram 推送 (Phase 6.5: 決策就緒通知)
# =============================================================================
async def _push_decision_to_telegram(
    incident: Incident,
    proposal_data: dict[str, Any],
) -> None:
    """
    Push an approval card to Telegram once a decision is ready.

    Phase 6.5: connects the Signal Worker flow with Telegram notifications.

    The push is best effort: any exception is logged as a warning and
    swallowed so that the main decision flow is never affected.

    Args:
        incident: The incident the decision belongs to.
        proposal_data: The proposal dict stored on the decision token.
    """
    try:
        # Imported lazily to avoid a circular dependency.
        from src.services.telegram_gateway import (
            get_telegram_gateway,
        )

        # 2026-03-26 fix: de-duplicate pushes (one card per incident per
        # 10 minutes) using a Redis marker key.
        redis_client = get_redis()
        dedup_key = f"telegram_sent:{incident.incident_id}"
        if await redis_client.get(dedup_key):
            logger.debug(
                "telegram_push_skipped_dedup",
                incident_id=incident.incident_id,
                reason="Already sent within 10 minutes",
            )
            return

        # Nothing to do when no bot token is configured.
        if not settings.OPENCLAW_TG_BOT_TOKEN:
            logger.debug(
                "telegram_push_skipped",
                reason="Bot token not configured",
                incident_id=incident.incident_id,
            )
            return

        gateway = get_telegram_gateway()

        # Pull the fields shown on the card out of proposal_data.
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        risk_level = proposal_data.get("risk_level", "medium")
        action = proposal_data.get("action", proposal_data.get("kubectl_command", ""))
        description = proposal_data.get("description", "")
        reasoning = proposal_data.get("reasoning", "")
        confidence = proposal_data.get("confidence", 0.75)
        source = proposal_data.get("source", "unknown")

        # 2026-03-26 fix: incident_id already carries the "INC-" prefix, so it
        # is used as the approval id unchanged.
        # The dict is built in the same order the keyword arguments were
        # originally evaluated in.
        card = {
            "approval_id": incident.incident_id,
            "risk_level": risk_level,
            "resource_name": target[:50],
            "root_cause": f"[{source.upper()}] {reasoning[:80]}" if reasoning else description[:100],
            "suggested_action": action[:50] if action else "待分析",
            "estimated_downtime": "5-15 min",
            "primary_responsibility": "INFRA",
            "confidence": confidence,
            "namespace": (
                incident.signals[0].labels.get("namespace", "default")
                if incident.signals
                else "default"
            ),
        }
        await gateway.send_approval_card(**card)

        # 2026-03-26 fix: mark as sent; the marker expires after 600 seconds.
        await redis_client.set(dedup_key, "1", ex=600)
        logger.info(
            "telegram_decision_pushed",
            incident_id=incident.incident_id,
            source=source,
            risk_level=risk_level,
        )
    except Exception as e:
        # A Telegram failure must not affect the main flow.
        logger.warning(
            "telegram_decision_push_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
# =============================================================================
# Decision States
# =============================================================================
class DecisionState(str, Enum):
    """States of the decision state machine (string-valued enum)."""

    # The incident has just been created.
    INIT = "init"
    # Analysis is in progress.
    ANALYZING = "analyzing"
    # A decision is ready.
    READY = "ready"
    # The decision is being executed.
    EXECUTING = "executing"
    # Execution has finished.
    COMPLETED = "completed"
    # An error occurred.
    ERROR = "error"
# =============================================================================
# Expert System - 規則引擎 (Local Fallback)
# =============================================================================
# 2026-03-27 重構: 分層診斷 + 根因優先 + 避免盲目重啟
#
# 設計原則:
# 1. 診斷優先於修復 - 先了解問題再行動
# 2. 測試資源忽略 - 避免處理臨時測試告警
# 3. 根因導向 - 提供診斷指令而非直接重啟
# 4. 人工判斷 - 未知問題建議人工介入
# =============================================================================
# 測試資源黑名單 (自動忽略)
# Substrings that mark a target as a test resource (matched against the
# lower-cased target name; such resources are not auto-remediated).
TEST_RESOURCE_PATTERNS = [
    "test", "demo", "tmp", "temp", "debug", "dev-",
    "sandbox", "experiment", "trial", "mock",
]

# Rule table for the Expert System.
#
# Each rule has:
#   patterns            - substrings looked up in "<alert names> <target>"
#   action              - command template ({target} / {target_app} placeholders)
#   description         - human readable summary
#   risk_level          - "low" | "medium" | "high"
#   reasoning           - why this action is suggested
#   diagnosis_commands  - optional extra command templates / hints
#   human_review_required - optional flag asking for manual confirmation
#
# Rules are tried in insertion order; the first rule with a matching pattern
# wins, and "default" is used when nothing matches.
EXPERT_RULES: dict[str, dict[str, Any]] = {
    # ========== Category 1: clear root cause ==========
    # OOM kill -> suggest reviewing memory limits (not a restart)
    "oom_killed": {
        "patterns": ["oomkill", "oom", "out of memory", "memory limit"],
        "action": "kubectl describe pod {target} -n awoooi-prod | grep -A5 'Last State'",
        "description": "偵測到 OOM Kill建議檢查記憶體用量後調整 limits",
        "risk_level": "medium",
        "reasoning": "OOM 通常是記憶體 limits 不足或記憶體洩漏,重啟無法解決根因",
        "diagnosis_commands": [
            "kubectl top pod {target} -n awoooi-prod",
            "kubectl logs {target} -n awoooi-prod --tail=100 | grep -i memory",
        ],
    },
    # CrashLoopBackOff -> read the crash logs (not a restart)
    "crash_loop": {
        "patterns": ["crashloop", "backoff", "crash loop"],
        "action": "kubectl logs {target} -n awoooi-prod --previous --tail=50",
        "description": "偵測到 CrashLoopBackOff需查看崩潰日誌找根因",
        "risk_level": "high",
        "reasoning": "CrashLoop 表示容器持續崩潰,重啟無效,需從日誌找根因",
        "diagnosis_commands": [
            "kubectl describe pod {target} -n awoooi-prod | grep -A10 'Events'",
            "kubectl logs {target} -n awoooi-prod --previous",
        ],
    },
    # ImagePullBackOff -> check the image name (not a restart)
    "image_pull_error": {
        "patterns": ["imagepull", "pull error", "image not found", "errimagepull"],
        "action": "kubectl describe pod {target} -n awoooi-prod | grep -A5 'Events'",
        "description": "偵測到映像拉取失敗,需檢查映像名稱或 Registry 連線",
        "risk_level": "high",
        "reasoning": "映像問題需修正配置或檢查 Harbor 連線,重啟無法解決",
        "diagnosis_commands": [
            # The jsonpath braces are literal text, not a placeholder.
            "kubectl get pod {target} -n awoooi-prod -o jsonpath='{.spec.containers[*].image}'",
        ],
    },
    # ========== Category 2: may need scaling ==========
    # High CPU -> first check whether this is normal load
    "high_cpu": {
        "patterns": ["cpu", "high cpu", "cpu throttl"],
        "action": "kubectl top pod -n awoooi-prod -l app={target_app}",
        "description": "偵測到高 CPU建議先確認是否為正常負載高峰",
        "risk_level": "low",
        "reasoning": "CPU 高可能是正常負載,需先診斷再決定是否擴容",
        "diagnosis_commands": [
            "kubectl top pod -n awoooi-prod",
            "kubectl get hpa -n awoooi-prod",
        ],
    },
    # High latency -> first locate the bottleneck
    "high_latency": {
        "patterns": ["latency", "slow", "p99", "p95"],
        "action": "kubectl logs -n awoooi-prod -l app={target_app} --tail=50 | grep -E 'latency|slow|timeout'",
        "description": "偵測到高延遲,建議先診斷瓶頸位置",
        "risk_level": "medium",
        "reasoning": "延遲可能來自 DB、外部 API 或代碼,需診斷後對症下藥",
        "diagnosis_commands": [
            "查看 SignOz Trace: http://192.168.0.188:3301/traces",
        ],
    },
    # ========== Category 3: high-risk, handle with care ==========
    # High error rate -> read logs; a rollback needs human confirmation
    "high_error_rate": {
        "patterns": ["error rate", "5xx", "500 error", "exception rate"],
        "action": "kubectl logs -n awoooi-prod -l app={target_app} --tail=100 | grep -i error",
        "description": "偵測到高錯誤率,建議先查日誌確認錯誤類型",
        "risk_level": "high",
        "reasoning": "錯誤原因多樣,需先診斷是代碼問題還是依賴服務問題",
        "diagnosis_commands": [
            "查看 Sentry: http://192.168.0.110:9000",
            "kubectl logs -n awoooi-prod -l app={target_app} | grep -i exception",
        ],
        "human_review_required": True,
    },
    # ========== Category 4: restart is considered safe ==========
    # Pod health-check failure (not a CrashLoop)
    "pod_unhealthy": {
        "patterns": ["unhealthy", "not ready", "readiness", "liveness"],
        "action": "kubectl rollout restart deployment/{target_app} -n awoooi-prod",
        "description": "Pod 健康檢查失敗,重啟可能解決",
        "risk_level": "medium",
        "reasoning": "健康檢查失敗且非 CrashLoop重啟通常有效",
    },
    # ========== Default: no blind restart, ask for manual diagnosis ==========
    "default": {
        "patterns": [],
        "action": "kubectl describe pod {target} -n awoooi-prod",
        "description": "無法自動判斷問題類型,建議人工查看詳情後決定",
        "risk_level": "low",
        "reasoning": "未知問題不應盲目重啟,需人工判斷根因",
        "diagnosis_commands": [
            "kubectl get events -n awoooi-prod --sort-by='.lastTimestamp' | tail -20",
            "kubectl logs -n awoooi-prod {target} --tail=50",
        ],
        "human_review_required": True,
    },
}


def _fill_placeholders(template: str, values: dict[str, str]) -> str:
    """Replace each ``{name}`` placeholder in *template* by plain text substitution.

    ``str.format`` is deliberately not used: rule templates may contain
    literal braces (for example a kubectl jsonpath expression), which
    ``str.format`` would try to parse as replacement fields and raise on.
    Braces that are not one of the known placeholders are left untouched.
    """
    for name, value in values.items():
        template = template.replace("{" + name + "}", value)
    return template


def expert_analyze(incident: Incident) -> dict[str, Any]:
    """
    Analyze an incident with the local rule engine (Expert System).

    2026-03-27 refactor:
    - Layered diagnosis (test-resource filter -> rule matching -> diagnosis commands)
    - Root cause first (suggest diagnosis commands rather than a blind restart)
    - Human-review flag (unknown problems are marked for manual handling)

    This runs entirely locally and serves as the fallback when the LLM path
    fails or times out, so it must not raise on any rule template.

    Args:
        incident: Object exposing ``affected_services`` (list of names) and
            ``signals`` (items with an ``alert_name`` string).

    Returns:
        A proposal dict with at least ``source``, ``action``, ``description``,
        ``risk_level``, ``reasoning``, ``confidence``, ``kubectl_command``,
        ``matched_rule`` and ``from_cache``; optionally
        ``diagnosis_commands``, ``human_review_required`` and
        ``is_test_resource``.
    """
    target = incident.affected_services[0] if incident.affected_services else "unknown-service"
    target_lower = target.lower()
    # Derive the app name by keeping the first two dash-separated parts,
    # e.g. "awoooi-api-649986569-2sgch" -> "awoooi-api".
    target_app = "-".join(target.split("-")[:2]) if "-" in target else target
    alert_names = " ".join(s.alert_name.lower() for s in incident.signals)
    all_text = f"{alert_names} {target_lower}"

    # ========== Layer 1: test-resource filter ==========
    if any(pattern in target_lower for pattern in TEST_RESOURCE_PATTERNS):
        return {
            "source": "expert_system",
            "action": "# 測試資源,建議忽略或手動清理",
            "description": f"偵測到測試資源 ({target}),建議確認是否需要清理",
            "risk_level": "low",
            "reasoning": "測試資源告警通常是臨時性的,不需要自動修復",
            "confidence": 0.9,
            "kubectl_command": f"kubectl delete pod {target} -n awoooi-prod --grace-period=0",
            "matched_rule": "test_resource_filter",
            "from_cache": False,
            "human_review_required": True,
            "is_test_resource": True,
        }

    # ========== Layer 2: rule matching (first match wins) ==========
    matched_rule = "default"
    for rule_name, candidate in EXPERT_RULES.items():
        if rule_name == "default":
            continue
        if any(pattern in all_text for pattern in candidate["patterns"]):
            matched_rule = rule_name
            break
    rule = EXPERT_RULES[matched_rule]

    # Expand the {target} / {target_app} placeholders. Plain replacement keeps
    # literal braces (e.g. jsonpath='{.spec...}') intact instead of raising.
    placeholders = {"target": target, "target_app": target_app}
    action = _fill_placeholders(rule["action"], placeholders)
    diagnosis_commands = [
        _fill_placeholders(cmd, placeholders)
        for cmd in rule.get("diagnosis_commands", [])
    ]

    # ========== Layer 3: build the response ==========
    result = {
        "source": "expert_system",
        "action": action,
        "description": rule["description"],
        "risk_level": rule["risk_level"],
        "reasoning": rule["reasoning"],
        # The catch-all rule is reported with lower confidence.
        "confidence": 0.75 if matched_rule != "default" else 0.5,
        "kubectl_command": action,
        "matched_rule": matched_rule,
        "from_cache": False,
    }
    # Attach diagnosis commands only when the rule defines some.
    if diagnosis_commands:
        result["diagnosis_commands"] = diagnosis_commands
    # Propagate the human-review flag and say so in the description.
    if rule.get("human_review_required"):
        result["human_review_required"] = True
        result["description"] += " (建議人工確認)"
    return result
# =============================================================================
# Decision Token (Redis)
# =============================================================================
class DecisionToken:
"""
決策令牌 - 前端持有此 token 即可操作
Redis Key: decision:{token}
TTL: 1 小時
"""
def __init__(
self,
token: str,
incident_id: str,
state: DecisionState,
proposal_data: dict[str, Any] | None = None,
proposal_id: str | None = None,
created_at: datetime | None = None,
updated_at: datetime | None = None,
error: str | None = None,
):
self.token = token
self.incident_id = incident_id
self.state = state
self.proposal_data = proposal_data
self.proposal_id = proposal_id
self.created_at = created_at or datetime.now(UTC)
self.updated_at = updated_at or datetime.now(UTC)
self.error = error
def to_dict(self) -> dict[str, Any]:
return {
"token": self.token,
"incident_id": self.incident_id,
"state": self.state.value,
"proposal_data": self.proposal_data,
"proposal_id": self.proposal_id,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"error": self.error,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DecisionToken":
return cls(
token=data["token"],
incident_id=data["incident_id"],
state=DecisionState(data["state"]),
proposal_data=data.get("proposal_data"),
proposal_id=data.get("proposal_id"),
created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None,
updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else None,
error=data.get("error"),
)
# =============================================================================
# Protocol Interface (Phase 17 P1 - 紅區治理)
# =============================================================================
@runtime_checkable
class IDecisionManager(Protocol):
"""
DecisionManager 介面定義
用途:
- 依賴注入 (DI) 時的型別約束
- 測試時 Mock 的型別檢查
- 符合 leWOOOgo 積木化規範
Tier 3 紅區服務: 修改需首席架構師簽核
@see feedback_lewooogo_modular_enforcement.md
@see docs/RED_ZONES.md
"""
async def get_or_create_decision(
self,
incident: "Incident",
timeout_sec: float = 30.0,
) -> "DecisionToken":
"""取得或建立決策令牌"""
...
async def mark_executing(self, token: str) -> "DecisionToken | None":
"""標記決策為執行中"""
...
async def mark_completed(self, token: str, result: dict[str, Any] | None = None) -> "DecisionToken | None":
"""標記決策為已完成"""
...
# =============================================================================
# Decision Manager
# =============================================================================
# Redis key prefix for decision tokens: "decision:{token}".
DECISION_TOKEN_PREFIX = "decision:"
# Lifetime of a stored decision token, in seconds (1 hour).
DECISION_TOKEN_TTL = 3600


class DecisionManager:
    """
    Decision manager - the core of Phase 6.5.

    Responsibilities:
    1. Issue a decision token for every incident.
    2. Produce a proposal via Playbook match, Expert System and LLM
       (or the ConsensusEngine for P0/P1 incidents).
    3. Fall back to the Expert System when the primary path fails or times out.
    4. Make sure the UI always has a decision it can act on.

    NOTE(review): IDecisionManager also declares mark_executing /
    mark_completed, which are not defined on this class in this file.
    """

    def __init__(self):
        self._openclaw = get_openclaw()
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task[None]] = set()

    # ------------------------------------------------------------------
    # Private helpers shared by the two get_or_create_* entry points
    # ------------------------------------------------------------------
    def _spawn_background(self, coro: Any) -> None:
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _reusable_token(
        self,
        incident: Incident,
        *,
        reset_event: str,
    ) -> DecisionToken | None:
        """Return the incident's existing token if it can be reused, else None.

        READY / EXECUTING tokens are always reused. A COMPLETED token is
        reused only when the incident itself is RESOLVED or CLOSED; otherwise
        *reset_event* is logged and None is returned so a new decision is
        created (this avoids a permanently disabled Y/n button when the
        incident is still active). Tokens in any other state are not reused.
        """
        existing = await self._find_existing_token(incident.incident_id)
        if existing is None:
            return None
        if existing.state in (DecisionState.READY, DecisionState.EXECUTING):
            return existing
        if existing.state == DecisionState.COMPLETED:
            from src.models.incident import IncidentStatus

            if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED):
                return existing
            # The incident is still being handled: a new decision is needed.
            logger.info(
                reset_event,
                token=existing.token,
                incident_id=incident.incident_id,
                incident_status=incident.status.value,
            )
        return None

    async def _start_analysis(self, incident: Incident, *, log_event: str) -> DecisionToken:
        """Create, persist and log a fresh token in the ANALYZING state."""
        token = DecisionToken(
            token=f"DEC-{uuid4().hex[:12].upper()}",
            incident_id=incident.incident_id,
            state=DecisionState.ANALYZING,
        )
        await self._save_token(token)
        logger.info(
            log_event,
            token=token.token,
            incident_id=incident.incident_id,
        )
        return token

    @staticmethod
    def _apply_expert_fallback(
        token: DecisionToken,
        incident: Incident,
        error: str | None = None,
    ) -> None:
        """Fill *token* with the Expert System result and mark it READY.

        *error*, when given, is recorded on the token.
        """
        expert_result = expert_analyze(incident)
        token.state = DecisionState.READY
        token.proposal_data = expert_result
        if error is not None:
            token.error = error
        token.updated_at = datetime.now(UTC)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def get_or_create_decision(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
    ) -> DecisionToken:
        """
        Return the decision token for an incident, creating one if needed.

        Flow:
        1. Reuse an existing token when possible.
        2. Otherwise create a new token in the ANALYZING state.
        3. Run the analysis, bounded by ``timeout_sec``.
        4. On timeout or any error, use the Expert System result instead.
        5. Persist the token and push it to Telegram without blocking.

        Args:
            incident: The incident to decide on.
            timeout_sec: Upper bound, in seconds, for the analysis step.

        Returns:
            The reused or newly created token.
        """
        # 1. Reuse an existing token when it is still valid.
        reusable = await self._reusable_token(
            incident,
            reset_event="decision_reset_for_active_incident",
        )
        if reusable is not None:
            return reusable

        # 2. Create a new token.
        token = await self._start_analysis(incident, log_event="decision_analyzing")

        # 3. Run the analysis with a time limit.
        try:
            proposal_data = await asyncio.wait_for(
                self._dual_engine_analyze(incident),
                timeout=timeout_sec,
            )
            token.state = DecisionState.READY
            token.proposal_data = proposal_data
            token.updated_at = datetime.now(UTC)
            logger.info(
                "decision_ready",
                token=token.token,
                source=proposal_data.get("source", "unknown"),
            )
        except TimeoutError:
            # Timeout: fall back to the Expert System.
            logger.warning(
                "decision_timeout_using_expert",
                token=token.token,
                timeout_sec=timeout_sec,
            )
            self._apply_expert_fallback(token, incident)
        except Exception as e:
            # Any other error: fall back to the Expert System as well.
            logger.exception(
                "decision_error_using_expert",
                token=token.token,
                error=str(e),
            )
            self._apply_expert_fallback(token, incident, error=str(e))

        # 4. Persist the final result.
        await self._save_token(token)

        # 5. Phase 6.5: push to Telegram without blocking the caller.
        if token.state == DecisionState.READY and token.proposal_data:
            self._spawn_background(
                _push_decision_to_telegram(incident, token.proposal_data)
            )
        return token

    async def _dual_engine_analyze(
        self,
        incident: Incident,
    ) -> dict[str, Any]:
        """
        Three-track analysis (Phase 7.5 upgrade + 2026-03-27 diagnosis refactor).

        Strategy:
        1. Try a Playbook match first; a hit is returned directly.
        2. The Expert System provides an initial diagnosis.
        3. Test resources return the Expert result without calling the LLM.
        4. The LLM is called with the Expert diagnosis as context.
        5. If the LLM fails or reports no success, the Expert result is
           returned (with reduced confidence when human review is required).

        Priority: Playbook > LLM (with Expert context) > Expert System.
        """
        # Phase 7.5: try the Playbook first.
        playbook_result = await self._try_playbook_match(incident)
        if playbook_result:
            return playbook_result

        # Step 1: initial diagnosis from the Expert System.
        expert_result = expert_analyze(incident)

        # Step 2: test resources return immediately (no LLM call is spent).
        if expert_result.get("is_test_resource"):
            logger.info(
                "dual_engine_test_resource_skip",
                incident_id=incident.incident_id,
                target=incident.affected_services[0] if incident.affected_services else "unknown",
            )
            return expert_result

        # Step 3: prepare the LLM context, including the Expert diagnosis.
        signals_dict = [s.model_dump() for s in incident.signals]
        expert_context = {
            "initial_diagnosis": expert_result.get("matched_rule"),
            "diagnosis_description": expert_result.get("description"),
            "suggested_diagnosis_commands": expert_result.get("diagnosis_commands", []),
            "expert_confidence": expert_result.get("confidence"),
            "requires_human_review": expert_result.get("human_review_required", False),
        }

        # Step 4: LLM analysis with the Expert context.
        try:
            llm_result, provider, success = await self._openclaw.generate_incident_proposal(
                incident_id=incident.incident_id,
                severity=incident.severity.value,
                signals=signals_dict,
                affected_services=incident.affected_services,
                expert_context=expert_context,
            )
            if success and llm_result:
                logger.info(
                    "dual_engine_llm_win",
                    incident_id=incident.incident_id,
                    provider=provider,
                    expert_rule=expert_result.get("matched_rule"),
                )
                return {
                    **llm_result,
                    "source": f"llm_{provider}",
                    "expert_diagnosis": expert_result.get("matched_rule"),
                }
        except Exception as e:
            logger.warning(
                "dual_engine_llm_failed",
                incident_id=incident.incident_id,
                error=str(e),
                expert_rule=expert_result.get("matched_rule"),
            )

        # Step 5: the LLM did not deliver; use the Expert System result,
        # adjusted according to the diagnosis.
        logger.info(
            "dual_engine_expert_fallback",
            incident_id=incident.incident_id,
            expert_rule=expert_result.get("matched_rule"),
            human_review=expert_result.get("human_review_required", False),
        )
        # When the Expert result asks for human review, cap the confidence
        # at 0.5 and say so in the description.
        if expert_result.get("human_review_required"):
            expert_result["confidence"] = min(expert_result.get("confidence", 0.5), 0.5)
            expert_result["description"] += " [LLM 分析失敗,建議人工確認]"
        return expert_result

    async def _try_playbook_match(
        self,
        incident: Incident,
    ) -> dict[str, Any] | None:
        """
        Phase 7.5: try to answer the incident from a Playbook.

        Conditions checked here:
        - similarity >= PLAYBOOK_SIMILARITY_THRESHOLD (85%)
        - success rate >= 80% when the playbook has execution records

        NOTE(review): the original notes also require the playbook to be
        APPROVED; that is not checked here - presumably the playbook service
        filters on it, confirm.

        Returns:
            The proposal dict on a match, otherwise None. Any exception is
            logged as a warning and treated as "no match".
        """
        try:
            playbook_service = get_playbook_service()

            # Build the symptom pattern.
            alert_names = [s.alert_name for s in incident.signals] if incident.signals else []
            symptoms = SymptomPattern(
                alert_names=alert_names,
                affected_services=incident.affected_services or [],
                severity_range=[incident.severity.value] if incident.severity else ["P2"],
            )

            # Ask for the single best recommendation.
            recommendations = await playbook_service.get_recommendations(
                symptoms=symptoms,
                top_k=1,
            )
            if not recommendations:
                logger.debug(
                    "playbook_no_match",
                    incident_id=incident.incident_id,
                )
                return None

            best_match = recommendations[0]
            playbook = best_match.playbook

            # Similarity threshold.
            if best_match.similarity_score < PLAYBOOK_SIMILARITY_THRESHOLD:
                logger.debug(
                    "playbook_similarity_below_threshold",
                    incident_id=incident.incident_id,
                    playbook_id=playbook.playbook_id,
                    similarity=best_match.similarity_score,
                    threshold=PLAYBOOK_SIMILARITY_THRESHOLD,
                )
                return None

            # Success rate (only when there are execution records).
            if playbook.total_executions > 0 and playbook.success_rate < 0.8:
                logger.debug(
                    "playbook_low_success_rate",
                    incident_id=incident.incident_id,
                    playbook_id=playbook.playbook_id,
                    success_rate=playbook.success_rate,
                )
                return None

            # Playbook hit: use the command of the first repair step, with
            # {target} replaced by the actual service name.
            kubectl_command = ""
            if playbook.repair_steps:
                target = incident.affected_services[0] if incident.affected_services else "unknown"
                kubectl_command = playbook.repair_steps[0].command.format(target=target)

            logger.info(
                "playbook_match_success",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                playbook_name=playbook.name,
                similarity=best_match.similarity_score,
                success_rate=playbook.success_rate,
            )
            return {
                "source": "playbook",
                "playbook_id": playbook.playbook_id,
                "playbook_name": playbook.name,
                "action": kubectl_command,
                "kubectl_command": kubectl_command,
                "description": playbook.description,
                "risk_level": playbook.repair_steps[0].risk_level.value.lower() if playbook.repair_steps else "medium",
                "reasoning": f"Playbook 匹配 ({best_match.similarity_score:.0%} 相似度, {playbook.success_rate:.0%} 成功率): {best_match.reason}",
                "confidence": min(best_match.similarity_score, playbook.success_rate) if playbook.total_executions > 0 else best_match.similarity_score,
                "matched_symptoms": best_match.matched_symptoms,
                "from_cache": False,
            }
        except Exception as e:
            logger.warning(
                "playbook_match_error",
                incident_id=incident.incident_id,
                error=str(e),
            )
            return None

    async def _find_existing_token(
        self,
        incident_id: str,
    ) -> DecisionToken | None:
        """Find the most recently created decision token for an incident.

        Scans all ``decision:*`` keys. Several tokens can exist for one
        incident (a new one is created when a COMPLETED decision is reset for
        a still-active incident) and SCAN order is arbitrary, so the newest
        token by ``created_at`` is returned rather than the first hit.
        Entries that cannot be read or parsed are logged and skipped.

        Returns:
            The newest matching token, or None when there is none.
        """
        import json

        redis_client = get_redis()
        newest: DecisionToken | None = None
        cursor = 0
        while True:
            cursor, keys = await redis_client.scan(
                cursor=cursor,
                match=f"{DECISION_TOKEN_PREFIX}*",
                count=100,
            )
            for key in keys:
                try:
                    data = await redis_client.get(key)
                    if not data:
                        continue
                    token_data = json.loads(data)
                    if token_data.get("incident_id") != incident_id:
                        continue
                    candidate = DecisionToken.from_dict(token_data)
                    if newest is None or candidate.created_at > newest.created_at:
                        newest = candidate
                except Exception as exc:
                    # Best effort: one bad entry must not break the lookup.
                    logger.debug(
                        "decision_token_unreadable",
                        key=key,
                        error=str(exc),
                    )
                    continue
            if cursor == 0:
                break
        return newest

    async def _save_token(self, token: DecisionToken) -> None:
        """Store the token in Redis as JSON with a TTL of DECISION_TOKEN_TTL."""
        import json

        redis_client = get_redis()
        key = f"{DECISION_TOKEN_PREFIX}{token.token}"
        await redis_client.set(
            key,
            json.dumps(token.to_dict()),
            ex=DECISION_TOKEN_TTL,
        )

    async def get_token(self, token_id: str) -> DecisionToken | None:
        """Load a decision token by id; returns None when the key has no value."""
        import json

        redis_client = get_redis()
        key = f"{DECISION_TOKEN_PREFIX}{token_id}"
        data = await redis_client.get(key)
        if data:
            return DecisionToken.from_dict(json.loads(data))
        return None

    async def update_token_state(
        self,
        token_id: str,
        new_state: DecisionState,
        proposal_id: str | None = None,
    ) -> DecisionToken | None:
        """Set a token's state (and optionally its proposal id) and save it.

        Returns:
            The updated token, or None when the token does not exist.
        """
        token = await self.get_token(token_id)
        if not token:
            return None
        token.state = new_state
        token.updated_at = datetime.now(UTC)
        if proposal_id:
            token.proposal_id = proposal_id
        await self._save_token(token)
        return token

    async def get_or_create_decision_with_consensus(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
        use_consensus: bool = True,
    ) -> DecisionToken:
        """
        Return or create a decision token, using Agent Teams consensus.

        Phase 9.4 upgrade:
        - The ConsensusEngine is used for P0/P1 incidents when
          ``use_consensus`` is true; everything else goes through
          :meth:`get_or_create_decision`.
        - The consensus score is reported as the proposal's confidence.
        - On timeout or error the Expert System result is used instead.

        NOTE(review): unlike get_or_create_decision, this path does not push
        the decision to Telegram - confirm whether that is intended.

        Args:
            incident: The incident to decide on.
            timeout_sec: Timeout in seconds for the consensus run.
            use_consensus: Whether the consensus engine may be used
                (default True).

        Returns:
            DecisionToken
        """
        # Consensus is only used for P0/P1 incidents, and only when allowed.
        should_use_consensus = use_consensus and incident.severity.value in ["P0", "P1"]
        if not should_use_consensus:
            # Use the regular decision flow.
            return await self.get_or_create_decision(incident, timeout_sec)

        # Phase 9.4: use the ConsensusEngine (imported lazily).
        from src.services.consensus_engine import get_consensus_engine

        consensus_engine = get_consensus_engine()

        # Reuse an existing token when it is still valid.
        reusable = await self._reusable_token(
            incident,
            reset_event="decision_reset_for_active_incident_consensus",
        )
        if reusable is not None:
            return reusable

        # Create a new token.
        token = await self._start_analysis(
            incident,
            log_event="decision_analyzing_with_consensus",
        )

        try:
            # Run the consensus analysis.
            consensus_result = await asyncio.wait_for(
                consensus_engine.run_consensus(incident, timeout_sec),
                timeout=timeout_sec,
            )
            # Convert to the proposal_data format.
            proposal_data = {
                "source": "consensus_engine",
                "consensus_id": consensus_result.consensus_id,
                "consensus_score": consensus_result.consensus_score,
                "action": consensus_result.recommended_action,
                "description": consensus_result.final_reasoning,
                "risk_level": consensus_result.risk_level,
                "kubectl_command": consensus_result.recommended_kubectl,
                "reasoning": consensus_result.final_reasoning,
                "confidence": consensus_result.consensus_score,
                "agent_count": len(consensus_result.opinions),
                "dissenting_opinions": consensus_result.dissenting_opinions,
                "from_cache": False,
            }
            token.state = DecisionState.READY
            token.proposal_data = proposal_data
            token.updated_at = datetime.now(UTC)
            logger.info(
                "decision_ready_with_consensus",
                token=token.token,
                consensus_id=consensus_result.consensus_id,
                consensus_score=consensus_result.consensus_score,
            )
        except TimeoutError:
            logger.warning(
                "consensus_timeout_using_expert",
                token=token.token,
                timeout_sec=timeout_sec,
            )
            # Fall back to the Expert System.
            self._apply_expert_fallback(token, incident)
        except Exception as e:
            logger.exception(
                "consensus_error_using_expert",
                token=token.token,
                error=str(e),
            )
            self._apply_expert_fallback(token, incident, error=str(e))

        await self._save_token(token)
        return token
# =============================================================================
# Singleton
# =============================================================================
# Lazily created module-level instance shared by all callers.
_decision_manager: DecisionManager | None = None


def get_decision_manager() -> DecisionManager:
    """Return the shared DecisionManager, creating it on first use."""
    global _decision_manager
    if _decision_manager is not None:
        return _decision_manager
    _decision_manager = DecisionManager()
    return _decision_manager