Files
awoooi/apps/api/src/services/decision_manager.py
OG T 765ee39a90 feat(api): Phase 6.5 Statistics API + Y/n 按鈕修復
新增:
- /stats/incidents/summary - 事件總覽統計
- /stats/incidents/resolution - 解決時間 P50/P95
- /stats/ai-performance - AI 提案效能
- /stats/services/affected - 受影響服務排名

修復:
- Y/n 按鈕永久禁用問題 (decision.state=completed 但 incident 未解決)
- decision_manager.py: 只有當 incident 也已解決才返回已完成的 decision

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-24 09:50:03 +08:00

658 lines
22 KiB
Python

"""
Decision Manager - Phase 6.5 非同步決策狀態機
=============================================
實作「雙軌決策」(Dual-Engine Decision):
1. OpenClaw LLM (主要) - 智能提案
2. Expert System (備援) - 規則引擎
狀態機:
- INIT: 事件剛建立
- ANALYZING: 正在分析中 (LLM + Expert 並行)
- READY: 決策就緒,等待統帥親核
- EXECUTING: 已授權,正在執行
- COMPLETED: 執行完成
- ERROR: 錯誤
統帥鐵律:
- 永遠不能讓 UI 鎖死
- 30 秒內必須有 decision_token
- LLM 失敗時 Expert System 保底
"""
import asyncio
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from uuid import uuid4
import structlog
from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.services.openclaw import get_openclaw
logger = structlog.get_logger(__name__)
# =============================================================================
# Telegram 推送 (Phase 6.5: 決策就緒通知)
# =============================================================================
async def _push_decision_to_telegram(
    incident: Incident,
    proposal_data: dict[str, Any],
) -> None:
    """Send an approval card to Telegram once a decision is ready.

    This is best-effort: any exception is logged as a warning and swallowed
    so that a Telegram failure never affects the decision flow. Nothing is
    sent when ``settings.OPENCLAW_TG_BOT_TOKEN`` is not set.

    Args:
        incident: Incident the decision belongs to. Its first affected
            service is used as the resource name and the ``namespace`` label
            of its first signal as the namespace.
        proposal_data: Proposal dict produced by one of the decision engines.
            Missing, ``None`` or empty text fields are treated as absent.
    """
    try:
        # Imported lazily to avoid a circular dependency.
        from src.services.telegram_gateway import (
            get_telegram_gateway,
        )

        # Skip quietly when no bot token is configured.
        if not settings.OPENCLAW_TG_BOT_TOKEN:
            logger.debug(
                "telegram_push_skipped",
                reason="Bot token not configured",
                incident_id=incident.incident_id,
            )
            return

        gateway = get_telegram_gateway()

        # Extract the card fields. ``dict.get(key, default)`` only falls back
        # when the key is missing, so ``or`` is used to also cover keys that
        # are present but None/empty; otherwise slicing or ``.upper()`` below
        # would raise and the notification would be lost.
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        risk_level = proposal_data.get("risk_level", "medium")
        action = proposal_data.get("action") or proposal_data.get("kubectl_command") or ""
        description = proposal_data.get("description") or ""
        reasoning = proposal_data.get("reasoning") or ""
        confidence = proposal_data.get("confidence", 0.75)
        source = proposal_data.get("source") or "unknown"

        # The approval id is derived from the incident id for tracking.
        approval_id = f"INC-{incident.incident_id}"

        await gateway.send_approval_card(
            approval_id=approval_id,
            risk_level=risk_level,
            resource_name=target[:50],
            root_cause=f"[{source.upper()}] {reasoning[:80]}" if reasoning else description[:100],
            suggested_action=action[:50] if action else "待分析",
            estimated_downtime="5-15 min",
            primary_responsibility="INFRA",
            confidence=confidence,
            namespace=incident.signals[0].labels.get("namespace", "default") if incident.signals else "default",
        )
        logger.info(
            "telegram_decision_pushed",
            incident_id=incident.incident_id,
            source=source,
            risk_level=risk_level,
        )
    except Exception as e:
        # A Telegram failure must not affect the main flow.
        logger.warning(
            "telegram_decision_push_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
# =============================================================================
# Decision States
# =============================================================================
class DecisionState(str, Enum):
    """States of the decision state machine.

    Members are also ``str`` instances; ``.value`` is the lowercase name that
    is written into the serialized token.
    """

    INIT = "init"  # incident has just been created
    ANALYZING = "analyzing"  # analysis in progress
    READY = "ready"  # decision is ready
    EXECUTING = "executing"  # decision is being executed
    COMPLETED = "completed"  # execution finished
    ERROR = "error"  # error
# =============================================================================
# Expert System - 規則引擎 (Local Fallback)
# =============================================================================
# Rule table for the local Expert System fallback.
#
# Each rule carries:
#   - "patterns": substrings looked for in the lowercased, space-joined alert
#     names of an incident (see expert_analyze);
#   - "action": a kubectl command template whose "{target}" placeholder is
#     filled in with str.format;
#   - "description", "risk_level", "reasoning": copied verbatim into the
#     resulting proposal.
#
# Insertion order matters: expert_analyze stops at the first rule with a
# matching pattern. The "default" rule is skipped during matching and used
# only when nothing else matches.
EXPERT_RULES: dict[str, dict[str, Any]] = {
    # Pod crash -> restart
    "pod_crash": {
        "patterns": ["crash", "restart", "oom", "killed", "failed"],
        "action": "kubectl rollout restart deployment/{target}",
        "description": "Expert System: 偵測到 Pod 異常,建議重啟部署",
        "risk_level": "medium",
        "reasoning": "根據歷史數據,重啟可解決 85% 的 Pod 崩潰問題",
    },
    # High latency -> scale out
    "high_latency": {
        "patterns": ["latency", "slow", "timeout", "p99"],
        "action": "kubectl scale deployment/{target} --replicas=3",
        "description": "Expert System: 偵測到高延遲,建議擴容至 3 副本",
        "risk_level": "low",
        "reasoning": "擴容可分散負載,降低單一 Pod 壓力",
    },
    # High error rate -> roll back
    "high_error_rate": {
        "patterns": ["error", "5xx", "fail", "exception"],
        "action": "kubectl rollout undo deployment/{target}",
        "description": "Expert System: 偵測到高錯誤率,建議回滾至上一版",
        "risk_level": "critical",
        "reasoning": "錯誤率突增通常源自最近部署,回滾是最快修復方式",
    },
    # Resource exhaustion -> scale out
    "resource_exhaustion": {
        "patterns": ["cpu", "memory", "resource", "quota"],
        "action": "kubectl scale deployment/{target} --replicas=2",
        "description": "Expert System: 偵測到資源耗盡,建議擴容",
        "risk_level": "medium",
        "reasoning": "增加副本可分散資源壓力",
    },
    # Default -> restart (most conservative)
    "default": {
        "patterns": [],
        "action": "kubectl rollout restart deployment/{target}",
        "description": "Expert System: 無法確定具體問題,建議安全重啟",
        "risk_level": "medium",
        "reasoning": "重啟是最安全的通用修復動作",
    },
}
def expert_analyze(incident: Incident) -> dict[str, Any]:
    """Build a proposal for *incident* from the local rule table.

    The lowercased alert names of all signals are joined with spaces and
    searched for each rule's patterns in table order; the first rule with a
    hit is used, otherwise the ``"default"`` rule. Runs entirely locally,
    without any network call.

    Returns:
        A proposal dict whose ``source`` is ``"expert_system"``. ``action``
        and ``kubectl_command`` both hold the rule's command with the first
        affected service (or ``"unknown-service"``) substituted in.
    """
    services = incident.affected_services
    target = services[0] if services else "unknown-service"
    haystack = " ".join(signal.alert_name.lower() for signal in incident.signals)

    # First non-default rule with any pattern present in the alert names.
    matched_rule = next(
        (
            name
            for name, spec in EXPERT_RULES.items()
            if name != "default" and any(needle in haystack for needle in spec["patterns"])
        ),
        "default",
    )

    chosen = EXPERT_RULES[matched_rule]
    command = chosen["action"].format(target=target)
    return {
        "source": "expert_system",
        "action": command,
        "description": chosen["description"],
        "risk_level": chosen["risk_level"],
        "reasoning": chosen["reasoning"],
        "confidence": 0.75,  # fixed confidence score for the Expert System
        "kubectl_command": command,
        "matched_rule": matched_rule,
        "from_cache": False,
    }
# =============================================================================
# Decision Token (Redis)
# =============================================================================
class DecisionToken:
"""
決策令牌 - 前端持有此 token 即可操作
Redis Key: decision:{token}
TTL: 1 小時
"""
def __init__(
self,
token: str,
incident_id: str,
state: DecisionState,
proposal_data: dict[str, Any] | None = None,
proposal_id: str | None = None,
created_at: datetime | None = None,
updated_at: datetime | None = None,
error: str | None = None,
):
self.token = token
self.incident_id = incident_id
self.state = state
self.proposal_data = proposal_data
self.proposal_id = proposal_id
self.created_at = created_at or datetime.now(UTC)
self.updated_at = updated_at or datetime.now(UTC)
self.error = error
def to_dict(self) -> dict[str, Any]:
return {
"token": self.token,
"incident_id": self.incident_id,
"state": self.state.value,
"proposal_data": self.proposal_data,
"proposal_id": self.proposal_id,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"error": self.error,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DecisionToken":
return cls(
token=data["token"],
incident_id=data["incident_id"],
state=DecisionState(data["state"]),
proposal_data=data.get("proposal_data"),
proposal_id=data.get("proposal_id"),
created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None,
updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else None,
error=data.get("error"),
)
# =============================================================================
# Decision Manager
# =============================================================================
# Redis key prefix for decision tokens; the full key is this prefix followed
# by the token id.
DECISION_TOKEN_PREFIX = "decision:"
# Expiry in seconds applied every time a token is written to Redis.
DECISION_TOKEN_TTL = 3600  # 1 hour
class DecisionManager:
    """Issue and maintain decision tokens for incidents (Phase 6.5 core).

    Responsibilities:
    1. Issue a decision token for each incident and persist it in Redis.
    2. Obtain a proposal from the LLM, falling back to the Expert System.
    3. Use the consensus engine for P0/P1 incidents when requested.
    4. Always finish with a READY token so the UI has a decision to act on.
    """

    # Strong references to fire-and-forget tasks. The event loop only keeps
    # weak references to tasks, so a task whose handle is dropped may be
    # garbage-collected before it completes. Shared across instances on
    # purpose; entries remove themselves when the task is done.
    _background_tasks: set[asyncio.Task[None]] = set()

    def __init__(self):
        self._openclaw = get_openclaw()

    # ------------------------------------------------------------------
    # Private helpers shared by both entry points
    # ------------------------------------------------------------------
    def _spawn_background(self, coro: Any) -> None:
        """Schedule *coro* without awaiting it, keeping it alive until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _reusable_token(
        self,
        incident: Incident,
        reset_event: str,
    ) -> DecisionToken | None:
        """Return the incident's current token if it can be handed back as-is.

        READY and EXECUTING tokens are reused. A COMPLETED token is reused
        only when the incident itself is RESOLVED or CLOSED; otherwise
        *reset_event* is logged and ``None`` is returned so that the caller
        issues a fresh decision (this avoids the Y/n buttons staying disabled
        for an incident that is still active). Any other state yields ``None``.
        """
        existing = await self._find_existing_token(incident.incident_id)
        if existing is None:
            return None
        if existing.state in (DecisionState.READY, DecisionState.EXECUTING):
            return existing
        if existing.state == DecisionState.COMPLETED:
            from src.models.incident import IncidentStatus

            if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED):
                return existing
            # Incident is still being handled: a new decision is needed.
            logger.info(
                reset_event,
                token=existing.token,
                incident_id=incident.incident_id,
                incident_status=incident.status.value,
            )
        return None

    async def _issue_analyzing_token(
        self,
        incident: Incident,
        event: str,
    ) -> DecisionToken:
        """Create, persist and log a new token in the ANALYZING state."""
        token = DecisionToken(
            token=f"DEC-{uuid4().hex[:12].upper()}",
            incident_id=incident.incident_id,
            state=DecisionState.ANALYZING,
        )
        await self._save_token(token)
        logger.info(
            event,
            token=token.token,
            incident_id=incident.incident_id,
        )
        return token

    @staticmethod
    def _mark_ready(
        token: DecisionToken,
        proposal_data: dict[str, Any],
        error: str | None = None,
    ) -> None:
        """Move *token* to READY with *proposal_data*; record *error* if given."""
        token.state = DecisionState.READY
        token.proposal_data = proposal_data
        if error is not None:
            token.error = error
        token.updated_at = datetime.now(UTC)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def get_or_create_decision(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
    ) -> DecisionToken:
        """Return the incident's decision token, creating one if necessary.

        Flow:
        1. Reuse an existing token when possible (see ``_reusable_token``).
        2. Otherwise save a new token in the ANALYZING state.
        3. Run the dual-engine analysis, bounded by *timeout_sec*.
        4. On timeout or any error, use the Expert System result instead.
        5. Save the READY token and schedule a Telegram push without
           waiting for it.

        Only the analysis step is bounded by *timeout_sec*; the Redis reads
        and writes around it are not.

        Args:
            incident: Incident to decide on.
            timeout_sec: Maximum time in seconds allowed for the analysis.

        Returns:
            The reused token, or a new token in the READY state.
        """
        # 1. Reuse an existing token where allowed.
        reusable = await self._reusable_token(
            incident, "decision_reset_for_active_incident"
        )
        if reusable is not None:
            return reusable

        # 2. Issue a new token.
        token = await self._issue_analyzing_token(incident, "decision_analyzing")

        # 3. Run the dual-engine analysis.
        try:
            proposal_data = await asyncio.wait_for(
                self._dual_engine_analyze(incident),
                timeout=timeout_sec,
            )
            self._mark_ready(token, proposal_data)
            logger.info(
                "decision_ready",
                token=token.token,
                source=proposal_data.get("source", "unknown"),
            )
        except TimeoutError:
            # Timeout: fall back to the Expert System.
            logger.warning(
                "decision_timeout_using_expert",
                token=token.token,
                timeout_sec=timeout_sec,
            )
            self._mark_ready(token, expert_analyze(incident))
        except Exception as e:
            # Any other error: fall back to the Expert System as well.
            logger.exception(
                "decision_error_using_expert",
                token=token.token,
                error=str(e),
            )
            self._mark_ready(token, expert_analyze(incident), error=str(e))

        # 4. Persist the final result.
        await self._save_token(token)

        # 5. Push to Telegram without blocking the caller.
        if token.state == DecisionState.READY and token.proposal_data:
            self._spawn_background(
                _push_decision_to_telegram(incident, token.proposal_data)
            )
        return token

    async def _dual_engine_analyze(
        self,
        incident: Incident,
    ) -> dict[str, Any]:
        """Return the LLM proposal if available, else the Expert System one.

        The Expert System result is computed first (synchronously, locally),
        then the LLM is awaited. The LLM result is used when the call reports
        success with a non-empty result; if it reports failure or raises, the
        Expert System result is returned instead.
        """
        # Expert System first: it is available immediately.
        expert_result = expert_analyze(incident)

        try:
            signals_dict = [s.model_dump() for s in incident.signals]
            llm_result, provider, success = await self._openclaw.generate_incident_proposal(
                incident_id=incident.incident_id,
                severity=incident.severity.value,
                signals=signals_dict,
                affected_services=incident.affected_services,
            )
            if success and llm_result:
                logger.info(
                    "dual_engine_llm_win",
                    incident_id=incident.incident_id,
                    provider=provider,
                )
                # "source" is always overwritten to identify the provider.
                return {
                    **llm_result,
                    "source": f"llm_{provider}",
                }
        except Exception as e:
            logger.warning(
                "dual_engine_llm_failed",
                incident_id=incident.incident_id,
                error=str(e),
            )

        # LLM failed or returned nothing usable.
        logger.info(
            "dual_engine_expert_fallback",
            incident_id=incident.incident_id,
        )
        return expert_result

    async def _find_existing_token(
        self,
        incident_id: str,
    ) -> DecisionToken | None:
        """Return the most recently created token for *incident_id*, if any.

        All ``decision:*`` keys are scanned. Superseded tokens are not deleted
        when a new decision is issued, so several tokens can exist for one
        incident until they expire; the newest one (by ``created_at``) is the
        authoritative one. Entries that cannot be read or parsed are logged at
        debug level and skipped.
        """
        import json

        redis_client = get_redis()
        newest: DecisionToken | None = None
        cursor = 0
        while True:
            cursor, keys = await redis_client.scan(
                cursor=cursor,
                match=f"{DECISION_TOKEN_PREFIX}*",
                count=100,
            )
            for key in keys:
                try:
                    raw = await redis_client.get(key)
                    if not raw:
                        continue
                    token_data = json.loads(raw)
                    if token_data.get("incident_id") != incident_id:
                        continue
                    candidate = DecisionToken.from_dict(token_data)
                    if newest is None or candidate.created_at > newest.created_at:
                        newest = candidate
                except Exception as exc:
                    # Best-effort lookup: one bad entry must not abort it.
                    logger.debug(
                        "decision_token_unreadable",
                        key=repr(key),
                        error=str(exc),
                    )
                    continue
            # SCAN signals the end of the iteration with a zero cursor.
            if cursor == 0:
                break
        return newest

    async def _save_token(self, token: DecisionToken) -> None:
        """Write *token* to Redis as JSON, (re)setting its TTL."""
        import json

        redis_client = get_redis()
        key = f"{DECISION_TOKEN_PREFIX}{token.token}"
        await redis_client.set(
            key,
            json.dumps(token.to_dict()),
            ex=DECISION_TOKEN_TTL,
        )

    async def get_token(self, token_id: str) -> DecisionToken | None:
        """Load the token with id *token_id*, or ``None`` if it is not stored."""
        import json

        redis_client = get_redis()
        key = f"{DECISION_TOKEN_PREFIX}{token_id}"
        data = await redis_client.get(key)
        if data:
            return DecisionToken.from_dict(json.loads(data))
        return None

    async def update_token_state(
        self,
        token_id: str,
        new_state: DecisionState,
        proposal_id: str | None = None,
    ) -> DecisionToken | None:
        """Set a stored token's state and save it back.

        Args:
            token_id: Id of the token to update.
            new_state: State to assign.
            proposal_id: Stored on the token only when non-empty.

        Returns:
            The updated token, or ``None`` when no such token is stored.
        """
        token = await self.get_token(token_id)
        if not token:
            return None
        token.state = new_state
        token.updated_at = datetime.now(UTC)
        if proposal_id:
            token.proposal_id = proposal_id
        await self._save_token(token)
        return token

    async def get_or_create_decision_with_consensus(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
        use_consensus: bool = True,
    ) -> DecisionToken:
        """Return the incident's decision token, using the consensus engine.

        Phase 9.4 variant. The consensus engine is used only when
        *use_consensus* is true AND the incident severity is P0 or P1; in
        every other case this delegates to ``get_or_create_decision``. On
        timeout or any error the Expert System result is used instead.

        Args:
            incident: Incident to decide on.
            timeout_sec: Maximum time in seconds allowed for the analysis.
            use_consensus: Allow the consensus engine (default True).

        Returns:
            The reused token, or a new token in the READY state.
        """
        should_use_consensus = use_consensus and incident.severity.value in ("P0", "P1")
        if not should_use_consensus:
            # Use the original dual-engine decision.
            return await self.get_or_create_decision(incident, timeout_sec)

        # Phase 9.4: use the ConsensusEngine.
        from src.services.consensus_engine import get_consensus_engine

        consensus_engine = get_consensus_engine()

        reusable = await self._reusable_token(
            incident, "decision_reset_for_active_incident_consensus"
        )
        if reusable is not None:
            return reusable

        token = await self._issue_analyzing_token(
            incident, "decision_analyzing_with_consensus"
        )

        try:
            consensus_result = await asyncio.wait_for(
                consensus_engine.run_consensus(incident, timeout_sec),
                timeout=timeout_sec,
            )
            # Convert the consensus result to the proposal_data shape.
            proposal_data = {
                "source": "consensus_engine",
                "consensus_id": consensus_result.consensus_id,
                "consensus_score": consensus_result.consensus_score,
                "action": consensus_result.recommended_action,
                "description": consensus_result.final_reasoning,
                "risk_level": consensus_result.risk_level,
                "kubectl_command": consensus_result.recommended_kubectl,
                "reasoning": consensus_result.final_reasoning,
                "confidence": consensus_result.consensus_score,
                "agent_count": len(consensus_result.opinions),
                "dissenting_opinions": consensus_result.dissenting_opinions,
                "from_cache": False,
            }
            self._mark_ready(token, proposal_data)
            logger.info(
                "decision_ready_with_consensus",
                token=token.token,
                consensus_id=consensus_result.consensus_id,
                consensus_score=consensus_result.consensus_score,
            )
        except TimeoutError:
            logger.warning(
                "consensus_timeout_using_expert",
                token=token.token,
                timeout_sec=timeout_sec,
            )
            # Fall back to the Expert System.
            self._mark_ready(token, expert_analyze(incident))
        except Exception as e:
            logger.exception(
                "consensus_error_using_expert",
                token=token.token,
                error=str(e),
            )
            self._mark_ready(token, expert_analyze(incident), error=str(e))

        # NOTE(review): unlike get_or_create_decision, this path does not push
        # the decision to Telegram - confirm whether that is intentional.
        await self._save_token(token)
        return token
# =============================================================================
# Singleton
# =============================================================================
# Lazily created, module-wide DecisionManager instance.
_decision_manager: DecisionManager | None = None


def get_decision_manager() -> DecisionManager:
    """Return the shared DecisionManager, constructing it on first call."""
    global _decision_manager
    instance = _decision_manager
    if instance is None:
        instance = DecisionManager()
        _decision_manager = instance
    return instance