## P0-05 Callback Nonce 防偽造(ADR-116)
- security_interceptor.py:generate_callback_nonce() 新增 HMAC-SHA256[:16] 附加
- 新 5-part 格式:{action}:{short_id}:{ts}:{rand}:{hmac16}
- CALLBACK_HMAC_SECRET 未設定時降級 warning(向後相容)
- security_interceptor.py:parse_callback_data() 新增 5-part 分支 + HMAC 驗證
- config.py:新增 CALLBACK_HMAC_SECRET: str = Field(default="")
## P0-06 Webhook HMAC Replay 防護(ADR-116)
- security_interceptor.py:新增 check_webhook_nonce()(Service 層,get_redis 在此層合法)
- webhooks.py:verify_webhook_signature() 新增兩個可選 Header
- X-Webhook-Timestamp:±300s 窗口驗證(若提供)
- X-Webhook-Nonce:呼叫 check_webhook_nonce()(Redis NX dedup,fail open)
- 移除直接 get_redis import(leWOOOgo 積木化修正)
## P0-11 ollama:current_primary Redis key 遷移 Phase A(ADR-110)
- ollama_auto_recovery.py:_REDIS_PRIMARY_KEY = "platform:ollama:current_primary"
- 雙寫舊 key "ollama:current_primary"(Phase A 30 天)
- 讀取以新 key 為主,fallback 舊 key
## P0-12 consensus Redis key 加 project namespace Phase A
- consensus_engine.py:新增 _consensus_key() / _consensus_legacy_key() helper
- 新 key:{project_id}:consensus:{consensus_id}
- project_id=None 時 fallback __platform__:consensus:{consensus_id}
- Phase A 雙寫 + fallback 讀取,現有呼叫方零修改
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
784 lines
29 KiB
Python
784 lines
29 KiB
Python
"""
|
||
Consensus Engine - Phase 9.4 多專家共識引擎
|
||
============================================
|
||
|
||
實作 Agent Teams 的共識機制,整合多個專家 Agent 的意見。
|
||
|
||
Features:
|
||
- 收集多個專家 Agent 的意見 (SRE, Security, Cost, Performance)
|
||
- 計算加權共識分數
|
||
- 產生最終整合決策
|
||
- 支援 Redis Working Memory 儲存
|
||
|
||
統帥鐵律:
|
||
- 所有專家意見必須被記錄 (CISO 可稽核性要求)
|
||
- 信心度低於 0.6 的意見權重降低
|
||
- 最終決策必須包含所有專家的推理過程
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
from datetime import UTC, datetime
|
||
from enum import Enum
|
||
from typing import Any
|
||
from uuid import uuid4
|
||
|
||
import structlog
|
||
from pydantic import BaseModel, Field, field_validator
|
||
|
||
from src.core.redis_client import get_redis
|
||
from src.models.incident import Incident
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
|
||
# =============================================================================
|
||
# Agent Types (專家類型)
|
||
# =============================================================================
|
||
|
||
class AgentType(str, Enum):
    """Kinds of expert agent that can contribute an opinion."""

    # Site Reliability Engineer - system stability
    SRE = "sre"

    # Security expert - security risk
    SECURITY = "security"

    # FinOps expert - cost effectiveness
    COST = "cost"

    # Performance expert - performance optimisation
    PERFORMANCE = "performance"
|
||
|
||
|
||
# =============================================================================
|
||
# Agent Opinion (專家意見)
|
||
# =============================================================================
|
||
|
||
class AgentOpinion(BaseModel):
    """
    Opinion produced by a single expert agent.

    Every expert analyses the same Incident and contributes its own
    recommendation, reasoning and risk assessment.
    """

    agent_type: AgentType
    action: str
    reasoning: str
    confidence: float = Field(ge=0.0, le=1.0, description="信心度 0-1")
    risk_assessment: str
    kubectl_command: str | None = None
    priority: int = Field(default=5, ge=1, le=10, description="優先度 1-10, 10 最高")
    estimated_impact: dict[str, Any] = Field(default_factory=dict)
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = {"use_enum_values": False}

    @field_validator("confidence", mode="before")
    @classmethod
    def clamp_confidence(cls, v: float) -> float:
        """Clamp confidence to 0-1 range"""
        # Runs before field validation, so out-of-range numbers are pulled
        # into [0, 1] instead of being rejected by the ge/le constraints.
        return min(max(v, 0.0), 1.0)

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the opinion into a JSON-compatible dict.

        Returns:
            A dict with the enum flattened to its string value and
            ``created_at`` rendered as an ISO-8601 string.
        """
        return {
            "agent_type": self.agent_type.value,
            "action": self.action,
            "reasoning": self.reasoning,
            "confidence": self.confidence,
            "risk_assessment": self.risk_assessment,
            "kubectl_command": self.kubectl_command,
            "priority": self.priority,
            "estimated_impact": self.estimated_impact,
            "created_at": self.created_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AgentOpinion":
        """
        Rebuild an opinion from a dict produced by ``to_dict()``.

        Args:
            data: Mapping with at least ``agent_type``, ``action``,
                ``reasoning``, ``confidence`` and ``risk_assessment``.

        Returns:
            The reconstructed AgentOpinion.

        Raises:
            KeyError: if a required key is missing from ``data``.
        """
        fields: dict[str, Any] = {
            "agent_type": AgentType(data["agent_type"]),
            "action": data["action"],
            "reasoning": data["reasoning"],
            "confidence": data["confidence"],
            "risk_assessment": data["risk_assessment"],
            "kubectl_command": data.get("kubectl_command"),
            "priority": data.get("priority", 5),
            "estimated_impact": data.get("estimated_impact", {}),
        }
        # Keep the original timestamp when the payload carries one; without
        # this every deserialization stamped the opinion with "now" and the
        # audit record lost the time the opinion was actually produced.
        # The ISO-8601 string written by to_dict() is passed through as-is and
        # relies on pydantic's datetime parsing.
        created_at = data.get("created_at")
        if created_at is not None:
            fields["created_at"] = created_at
        return cls(**fields)
|
||
|
||
|
||
# =============================================================================
|
||
# Consensus Result (共識結果)
|
||
# =============================================================================
|
||
|
||
class ConsensusResult(BaseModel):
    """
    Final decision produced by the consensus engine.

    Contains:
    - Every expert opinion (CISO auditability)
    - The weighted consensus score
    - The final recommended action
    - The reasoning behind the decision
    """

    consensus_id: str
    incident_id: str
    opinions: list[AgentOpinion]
    consensus_score: float = Field(ge=0.0, le=1.0, description="共識分數 0-1")
    recommended_action: str
    recommended_kubectl: str | None = None
    final_reasoning: str
    risk_level: str
    dissenting_opinions: list[str] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = {"use_enum_values": False}

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the result into a JSON-compatible dict.

        Returns:
            A dict with every opinion serialized through
            ``AgentOpinion.to_dict()``, ``created_at`` rendered as an
            ISO-8601 string and a derived ``agent_count`` entry.
        """
        return {
            "consensus_id": self.consensus_id,
            "incident_id": self.incident_id,
            "opinions": [op.to_dict() for op in self.opinions],
            "consensus_score": self.consensus_score,
            "recommended_action": self.recommended_action,
            "recommended_kubectl": self.recommended_kubectl,
            "final_reasoning": self.final_reasoning,
            "risk_level": self.risk_level,
            "dissenting_opinions": self.dissenting_opinions,
            "created_at": self.created_at.isoformat(),
            "agent_count": len(self.opinions),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ConsensusResult":
        """
        Rebuild a result from a dict produced by ``to_dict()``.

        The derived ``agent_count`` entry is ignored; it is recomputed from
        the opinions on the next ``to_dict()`` call.

        Args:
            data: Mapping with at least ``consensus_id``, ``incident_id``,
                ``opinions``, ``consensus_score``, ``recommended_action``,
                ``final_reasoning`` and ``risk_level``.

        Returns:
            The reconstructed ConsensusResult.

        Raises:
            KeyError: if a required key is missing from ``data``.
        """
        fields: dict[str, Any] = {
            "consensus_id": data["consensus_id"],
            "incident_id": data["incident_id"],
            "opinions": [AgentOpinion.from_dict(op) for op in data["opinions"]],
            "consensus_score": data["consensus_score"],
            "recommended_action": data["recommended_action"],
            "recommended_kubectl": data.get("recommended_kubectl"),
            "final_reasoning": data["final_reasoning"],
            "risk_level": data["risk_level"],
            "dissenting_opinions": data.get("dissenting_opinions", []),
        }
        # Keep the stored decision time; previously it was dropped here, so a
        # result read back from storage appeared to have been created at read
        # time. The ISO-8601 string written by to_dict() is passed through
        # as-is and relies on pydantic's datetime parsing.
        created_at = data.get("created_at")
        if created_at is not None:
            fields["created_at"] = created_at
        return cls(**fields)
|
||
|
||
|
||
# =============================================================================
|
||
# Expert Agent Base (專家 Agent 基類)
|
||
# =============================================================================
|
||
|
||
class ExpertAgent:
    """
    Base class for expert agents.

    Each expert analyses an Incident from its own point of view;
    subclasses implement the analyze() method.
    """

    # Expert category of the concrete agent; only declared here, subclasses
    # assign the actual value.
    agent_type: AgentType

    async def analyze(self, incident: Incident) -> AgentOpinion:
        """
        Analyse an Incident and produce an opinion.

        Subclasses must implement this method.

        Args:
            incident: The incident to analyse.

        Returns:
            The opinion of this expert (in subclasses).

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
|
||
|
||
|
||
class SREAgent(ExpertAgent):
    """SRE expert - focuses on system stability and availability."""

    agent_type = AgentType.SRE

    async def analyze(self, incident: Incident) -> AgentOpinion:
        """
        Analyse the incident from the SRE point of view.

        The lower-cased alert names of all signals are joined into one string
        and matched against keyword groups; the first matching group decides
        the recommended action, kubectl command, confidence and risk.

        Args:
            incident: The incident to analyse. Its ``signals``,
                ``affected_services``, and ``severity`` are read.

        Returns:
            The SRE opinion, including a kubectl command that targets the
            first affected service (or "unknown" when none is listed).
        """
        # Join all alert names so keyword matching is a plain substring test.
        alert_names = " ".join(s.alert_name.lower() for s in incident.signals)
        target = incident.affected_services[0] if incident.affected_services else "unknown"

        # SRE rule engine - confidence depends on how explicit the keyword is.
        if any(kw in alert_names for kw in ("crash", "restart", "oom", "killed")):
            action = "重新啟動服務以恢復穩定性"
            kubectl = f"kubectl rollout restart deployment/{target} -n awoooi-prod"
            confidence = 0.72  # explicit crash signal, rule is highly reliable
            risk = "medium"
            restarts_service = True
        elif any(kw in alert_names for kw in ("latency", "slow", "timeout")):
            action = "擴展副本數以分散負載"
            kubectl = f"kubectl scale deployment/{target} --replicas=3 -n awoooi-prod"
            confidence = 0.65  # performance issue, possibly multi-cause, medium reliability
            risk = "low"
            restarts_service = False
        elif any(kw in alert_names for kw in ("cpu", "memory", "resource")):
            action = "調整資源限制或擴展副本"
            kubectl = f"kubectl scale deployment/{target} --replicas=2 -n awoooi-prod"
            confidence = 0.68  # resource alert, metric is explicit
            risk = "medium"
            restarts_service = False
        else:
            action = "進行安全重啟以排除未知問題"
            kubectl = f"kubectl rollout restart deployment/{target} -n awoooi-prod"
            confidence = 0.45  # no clear signal, low confidence, conservative handling
            risk = "medium"
            restarts_service = True

        return AgentOpinion(
            agent_type=self.agent_type,
            action=action,
            reasoning=f"SRE 分析: 根據告警 {alert_names[:50]} 判斷服務 {target} 需要 {action}",
            confidence=confidence,
            risk_assessment=f"SRE 評估風險等級: {risk},預計恢復時間 < 5 分鐘",
            kubectl_command=kubectl,
            priority=8 if incident.severity.value in ["P0", "P1"] else 5,
            estimated_impact={
                # The previous check (`"restart" in action`) could never be
                # true: the action texts are Chinese and do not contain the
                # ASCII word "restart", so restarts were reported with zero
                # downtime. The flag is now set by the branches that issue
                # `kubectl rollout restart`.
                "downtime_seconds": 30 if restarts_service else 0,
                "affected_users": "minimal",
            },
        )
|
||
|
||
|
||
class SecurityAgent(ExpertAgent):
    """Security expert - evaluates an incident for security risk."""

    agent_type = AgentType.SECURITY

    async def analyze(self, incident: Incident) -> AgentOpinion:
        """
        Analyse the incident from the security point of view.

        The lower-cased alert names are joined into one string and scanned
        against three keyword groups; every group that matches contributes
        one concern. Any concern leads to an isolation recommendation, and
        no kubectl command is ever proposed.

        Args:
            incident: The incident to analyse. Only its ``signals`` are read.

        Returns:
            The security opinion.
        """
        joined_alerts = " ".join([sig.alert_name.lower() for sig in incident.signals])

        # Security scan: (keywords, concern) pairs, evaluated in this order.
        scan_rules = (
            (("auth", "login", "401", "403"), "可能存在認證問題"),
            (("injection", "xss", "csrf"), "可能存在注入攻擊"),
            (("rate", "ddos", "flood"), "可能存在 DoS 攻擊"),
        )
        security_concerns = [
            concern
            for keywords, concern in scan_rules
            if any(keyword in joined_alerts for keyword in keywords)
        ]
        threat_found = bool(security_concerns)

        if not threat_found:
            action = "無明顯資安風險,建議 SRE 處理"
            confidence = 0.60  # threat ruled out, medium reliability
            risk = "low"
            detail = "未發現資安威脅"
        else:
            action = "建議先隔離受影響服務,啟用 NetworkPolicy 限制"
            confidence = 0.80  # strong security keyword hit, rule is highly reliable
            risk = "critical"
            detail = "; ".join(security_concerns)

        impact = {
            "security_risk": "high" if threat_found else "none",
            "requires_audit": threat_found,
        }

        return AgentOpinion(
            agent_type=self.agent_type,
            action=action,
            reasoning=f"Security 分析: {detail}",
            confidence=confidence,
            risk_assessment=f"資安風險等級: {risk}",
            kubectl_command=None,  # security advice usually needs manual review
            priority=9 if threat_found else 3,
            estimated_impact=impact,
        )
|
||
|
||
|
||
class CostAgent(ExpertAgent):
    """Cost expert - looks at the incident in terms of resource efficiency."""

    agent_type = AgentType.COST

    async def analyze(self, incident: Incident) -> AgentOpinion:
        """
        Analyse the incident from the cost point of view.

        The recommendation is always the same HPA suggestion; only the
        deployment name in the kubectl command depends on the incident.

        Args:
            incident: The incident to analyse. Only ``affected_services``
                is read.

        Returns:
            The cost opinion with a fixed confidence of 0.55 and priority 4.
        """
        services = incident.affected_services
        target = services[0] if services else "unknown"

        # Cost assessment (assumes $0.05 per replica per hour)
        hpa_command = (
            f"kubectl autoscale deployment/{target} --cpu-percent=70 --min=2 --max=5 -n awoooi-prod"
        )
        impact = {
            "monthly_cost_change": "+$15 to +$50",
            "cost_optimization": "HPA 自動縮減",
        }

        return AgentOpinion(
            agent_type=self.agent_type,
            action="建議使用 HPA 自動擴展而非固定擴容,以優化成本",
            reasoning="FinOps 分析: 使用 HPA 可在負載降低後自動縮減,相比固定擴容可節省約 40% 成本",
            confidence=0.55,  # generic advice, not symptom driven, conservative
            risk_assessment="成本風險: low,使用 HPA 可自動調節",
            kubectl_command=hpa_command,
            priority=4,
            estimated_impact=impact,
        )
|
||
|
||
|
||
class PerformanceAgent(ExpertAgent):
    """Performance expert - looks at the incident in terms of performance tuning."""

    agent_type = AgentType.PERFORMANCE

    async def analyze(self, incident: Incident) -> AgentOpinion:
        """
        Analyse the incident from the performance point of view.

        When a latency keyword appears in the joined alert names, a resource
        limit patch is recommended; otherwise the advice is to keep observing
        and no kubectl command is proposed.

        Args:
            incident: The incident to analyse. Its ``affected_services`` and
                ``signals`` are read.

        Returns:
            The performance opinion (priority is always 6).
        """
        services = incident.affected_services
        target = services[0] if services else "unknown"
        joined_alerts = " ".join([sig.alert_name.lower() for sig in incident.signals])

        # Start from the "nothing unusual" outcome and override on a hit.
        action = "當前效能指標正常,建議觀察"
        kubectl = None
        confidence = 0.50  # no performance anomaly, uncertain, low weight

        latency_hit = any(keyword in joined_alerts for keyword in ("latency", "p99", "slow"))
        if latency_hit:
            action = "建議增加資源限制並啟用 PodDisruptionBudget"
            # Strategic-merge patch; the container is addressed by the
            # deployment name.
            kubectl = f"kubectl patch deployment/{target} -n awoooi-prod -p '{{\"spec\":{{\"template\":{{\"spec\":{{\"containers\":[{{\"name\":\"{target}\",\"resources\":{{\"limits\":{{\"cpu\":\"2\",\"memory\":\"2Gi\"}}}}}}]}}}}}}}}'"
            confidence = 0.70  # explicit performance keyword hit

        impact = {
            "latency_improvement": "預計 P99 降低 30%",
            "resource_increase": "+1 CPU, +1Gi Memory",
        }

        return AgentOpinion(
            agent_type=self.agent_type,
            action=action,
            reasoning=f"Performance 分析: 根據 P99 latency 指標,{action}",
            confidence=confidence,
            risk_assessment="效能風險: medium,資源調整可能影響其他 Pod",
            kubectl_command=kubectl,
            priority=6,
            estimated_impact=impact,
        )
|
||
|
||
|
||
# =============================================================================
|
||
# Consensus Engine
|
||
# =============================================================================
|
||
|
||
# P0-12 fix 2026-05-04 ogt + Claude Sonnet 4.6:
# Legacy format (no project prefix): consensus:{consensus_id}
# New format (with project prefix): {project_id}:consensus:{consensus_id}
# Migration strategy: Phase A dual-write + fallback read of the legacy key;
# remove the fallback once everything has been migrated
CONSENSUS_PREFIX = "consensus:"  # legacy-format prefix (used for the read fallback)
PLATFORM_INTERNAL = "__platform__"  # sentinel namespace used when project_id is unavailable
CONSENSUS_TTL = 3600  # 1 hour
|
||
|
||
|
||
def _consensus_key(consensus_id: str, project_id: str | None) -> str:
    """
    Build the project-namespaced Redis key for a consensus result.

    Args:
        consensus_id: Consensus ID (e.g. CON-20260504-ABCD1234).
        project_id: Tenant project ID; when None or empty the
            ``__platform__`` sentinel namespace is used instead.

    Returns:
        ``{project_id}:consensus:{consensus_id}``, or
        ``__platform__:consensus:{consensus_id}`` as the fallback.
    """
    namespace = project_id or PLATFORM_INTERNAL
    namespaced_key = f"{namespace}:consensus:{consensus_id}"
    return namespaced_key
|
||
|
||
|
||
def _consensus_legacy_key(consensus_id: str) -> str:
    """Build the legacy (un-namespaced) key, used for the Phase A fallback."""
    legacy_key = f"{CONSENSUS_PREFIX}{consensus_id}"
    return legacy_key
|
||
|
||
|
||
class ConsensusEngine:
    """
    Consensus engine - core of Phase 9.4.

    Responsibilities:
    1. Collect the opinions of every expert agent
    2. Compute a weighted consensus score
    3. Produce the final integrated decision
    4. Store the result in Redis (Working Memory)

    Consensus rules:
    - Opinions with higher confidence carry more weight
    - Recommendations of the same category reinforce the consensus
    - Diverging opinions lower the consensus score
    """

    # ADR-095 2026-04-25 ogt + Claude Sonnet 4.6: 12-Agent Claude Code weights
    # Only take part in voting when ENABLE_12AGENT_CONSENSUS=True (default False)
    # security=0.4 is always the highest (ADR-009 iron rule)
    CONSENSUS_WEIGHTS: dict[str, float] = {
        # ADR-009 original three core agents
        "SecurityAgent":      0.4,   # security is always the highest, must not be lowered
        "BlastRadiusAgent":   0.15,  # was 0.3 -> 0.15
        "ActionPlannerAgent": 0.15,  # was 0.3 -> 0.15
        # ADR-095 adds 9 Claude Code agents (vote on demand)
        "critic":             0.06,
        "debugger":           0.06,
        "db-expert":          0.04,
        "vuln-verifier":      0.04,
        "planner":            0.02,
        "fullstack-engineer": 0.02,
        "refactor-specialist":0.02,
        "migration-engineer": 0.02,
        "tool-expert":        0.02,
        # onboarder / frontend-designer / web-researcher do not vote (advisory)
        # sum = 1.0
    }

    def __init__(self):
        """Create the engine with the four built-in expert agents."""
        self._agents: list[ExpertAgent] = [
            SREAgent(),
            SecurityAgent(),
            CostAgent(),
            PerformanceAgent(),
        ]

    async def gather_opinions(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
    ) -> list[AgentOpinion]:
        """
        Collect the opinions of all experts.

        All expert analyses run concurrently; a timeout keeps a single
        expert from blocking the others. Experts that time out or raise are
        logged and left out of the result.

        Args:
            incident: The incident to analyse.
            timeout_sec: Overall time budget in seconds.

        Returns:
            The opinions of the experts that completed successfully.
        """
        async def safe_analyze(agent: ExpertAgent) -> AgentOpinion | None:
            try:
                # NOTE(review): the agents run concurrently, so each one could
                # be given the full timeout_sec; the existing per-agent budget
                # of timeout_sec / N is kept to preserve behavior.
                return await asyncio.wait_for(
                    agent.analyze(incident),
                    timeout=timeout_sec / len(self._agents),
                )
            except TimeoutError:
                logger.warning(
                    "agent_analyze_timeout",
                    agent_type=agent.agent_type.value,
                    incident_id=incident.incident_id,
                )
                return None
            except Exception as e:
                # Best effort: one failing expert must not abort the others.
                logger.exception(
                    "agent_analyze_error",
                    agent_type=agent.agent_type.value,
                    error=str(e),
                )
                return None

        # Run all expert analyses concurrently
        results = await asyncio.gather(
            *[safe_analyze(agent) for agent in self._agents],
            return_exceptions=False,
        )

        opinions = [r for r in results if r is not None]

        logger.info(
            "opinions_gathered",
            incident_id=incident.incident_id,
            total_agents=len(self._agents),
            successful_opinions=len(opinions),
        )

        return opinions

    def calculate_consensus(
        self,
        opinions: list[AgentOpinion],
    ) -> tuple[float, str, list[str]]:
        """
        Compute the consensus score.

        Algorithm:
        1. Group the opinions by action category
        2. Compute weighted votes (confidence * priority, halved when the
           confidence is below 0.6)
        3. The category with the most votes is the recommendation
        4. Consensus score = winning votes / total votes

        Args:
            opinions: The opinions to aggregate.

        Returns:
            (consensus_score, recommended_action, dissenting_opinions);
            ``(0.0, "NO_ACTION", [])`` when there are no opinions or no votes.
        """
        if not opinions:
            return 0.0, "NO_ACTION", []

        # Weighted votes and contributing opinions per action category
        action_votes: dict[str, float] = {}
        action_details: dict[str, list[AgentOpinion]] = {}

        for opinion in opinions:
            # Low-confidence opinions get a reduced weight
            weight_multiplier = 1.0 if opinion.confidence >= 0.6 else 0.5
            vote_weight = opinion.confidence * opinion.priority * weight_multiplier

            # Reduce the free-text action to a category
            action_key = self._normalize_action(opinion.action)

            action_votes[action_key] = action_votes.get(action_key, 0.0) + vote_weight
            action_details.setdefault(action_key, []).append(opinion)

        # Find the winner
        total_votes = sum(action_votes.values())
        if total_votes == 0:
            return 0.0, "NO_ACTION", []

        winner_action = max(action_votes, key=lambda k: action_votes[k])
        consensus_score = action_votes[winner_action] / total_votes

        # Collect the dissenting (non-winning) opinions
        dissenting = [
            f"{op.agent_type.value}: {op.action} (信心度: {op.confidence:.0%})"
            for action_key, ops in action_details.items()
            if action_key != winner_action
            for op in ops
        ]

        logger.info(
            "consensus_calculated",
            winner_action=winner_action,
            consensus_score=consensus_score,
            total_votes=total_votes,
            dissenting_count=len(dissenting),
        )

        return consensus_score, winner_action, dissenting

    def _normalize_action(self, action: str) -> str:
        """
        Map a free-text action to an action category.

        The keyword groups are tested in order and the first match wins.

        Args:
            action: The action text of an opinion.

        Returns:
            One of "RESTART", "HPA", "SCALE", "ISOLATE", "TUNE_RESOURCES",
            "OBSERVE" or "OTHER".
        """
        action_lower = action.lower()

        if any(kw in action_lower for kw in ["重啟", "重新啟動", "restart"]):
            return "RESTART"
        # HPA must be tested before SCALE: "autoscale" contains "scale" and
        # HPA recommendations typically also mention "擴展", so with the
        # previous order the HPA category could not be reached by them.
        elif any(kw in action_lower for kw in ["hpa", "autoscale"]):
            return "HPA"
        elif any(kw in action_lower for kw in ["擴展", "scale", "副本"]):
            return "SCALE"
        elif any(kw in action_lower for kw in ["隔離", "isolate", "network"]):
            return "ISOLATE"
        elif any(kw in action_lower for kw in ["資源", "resource", "limit"]):
            return "TUNE_RESOURCES"
        elif any(kw in action_lower for kw in ["觀察", "observe", "正常"]):
            return "OBSERVE"
        else:
            return "OTHER"

    async def generate_final_decision(
        self,
        incident: Incident,
        opinions: list[AgentOpinion],
        consensus_score: float,
        recommended_action_type: str,
        dissenting: list[str],
        project_id: str | None = None,
    ) -> ConsensusResult:
        """
        Produce the final decision.

        Integrates all expert opinions into a structured ConsensusResult,
        stores it through ``_save_consensus()`` and returns it.

        Args:
            incident: The incident the opinions refer to.
            opinions: All collected opinions.
            consensus_score: Score returned by ``calculate_consensus()``.
            recommended_action_type: Winning action category.
            dissenting: Descriptions of the non-winning opinions.
            project_id: Tenant project ID used for the Redis key namespace.

        Returns:
            The stored ConsensusResult.
        """
        consensus_id = f"CON-{datetime.now(UTC).strftime('%Y%m%d')}-{uuid4().hex[:8].upper()}"

        # Pick the kubectl command of the opinion with the highest
        # confidence * priority among those in the winning category.
        best_kubectl = None
        best_score = 0.0
        best_action_detail = ""

        for op in opinions:
            if self._normalize_action(op.action) == recommended_action_type:
                score = op.confidence * op.priority
                if score > best_score and op.kubectl_command:
                    best_score = score
                    best_kubectl = op.kubectl_command
                    best_action_detail = op.action

        # Derive the risk level from the degree of agreement
        if consensus_score >= 0.8:
            risk_level = "low"
        elif consensus_score >= 0.6:
            risk_level = "medium"
        else:
            risk_level = "critical"  # insufficient consensus, needs manual review

        # Assemble the final reasoning from every expert's reasoning
        reasoning_parts = [
            f"[{op.agent_type.value.upper()}] {op.reasoning}" for op in opinions
        ]

        final_reasoning = (
            f"共識引擎整合 {len(opinions)} 位專家意見:\n"
            + "\n".join(reasoning_parts)
            + f"\n\n最終共識: {recommended_action_type} (共識度: {consensus_score:.0%})"
        )

        result = ConsensusResult(
            consensus_id=consensus_id,
            incident_id=incident.incident_id,
            opinions=opinions,
            consensus_score=consensus_score,
            # Fall back to the bare category when no opinion in the winning
            # category carried a kubectl command.
            recommended_action=best_action_detail or recommended_action_type,
            recommended_kubectl=best_kubectl,
            final_reasoning=final_reasoning,
            risk_level=risk_level,
            dissenting_opinions=dissenting,
        )

        # Store in Redis (with the project_id namespace)
        await self._save_consensus(result, project_id=project_id)

        logger.info(
            "consensus_generated",
            consensus_id=consensus_id,
            incident_id=incident.incident_id,
            consensus_score=consensus_score,
            risk_level=risk_level,
        )

        return result

    async def run_consensus(
        self,
        incident: Incident,
        timeout_sec: float = 30.0,
        project_id: str | None = None,
    ) -> ConsensusResult:
        """
        Run the complete consensus flow.

        This is the main public API:
        1. Collect opinions
        2. Compute the consensus
        3. Produce the decision

        Args:
            incident: The incident to analyse
            timeout_sec: Timeout in seconds
            project_id: Tenant project ID, used to isolate the Redis key namespace (P0-12)

        Returns:
            The stored ConsensusResult.
        """
        # Step 1: collect opinions
        opinions = await self.gather_opinions(incident, timeout_sec)

        # Step 2: compute the consensus
        consensus_score, recommended_action, dissenting = self.calculate_consensus(opinions)

        # Step 3: produce the decision (project_id is passed for the Redis key namespace)
        result = await self.generate_final_decision(
            incident=incident,
            opinions=opinions,
            consensus_score=consensus_score,
            recommended_action_type=recommended_action,
            dissenting=dissenting,
            project_id=project_id,
        )

        return result

    async def _save_consensus(
        self,
        result: ConsensusResult,
        project_id: str | None = None,
    ) -> None:
        """Store the consensus result in Redis (hot cache) + PG (permanent record)

        2026-04-26 P2-DB-Fix by Claude — db-expert P0 three fixes (P0.2):
          Adds the PG write to agent_sessions, per the ADR-085 iron rule
          Redis TTL expiry no longer makes the consensus memory disappear

        P0-12 fix 2026-05-04 ogt + Claude Sonnet 4.6:
          Phase A dual-write: new key (with project_id prefix) + legacy key (backward compatible)
          Remove the legacy-key write once the migration is complete

        Args:
            result: The consensus result to store.
            project_id: Tenant project ID used for the new-format key.
        """
        redis_client = get_redis()
        payload = json.dumps(result.to_dict())

        # 1a. New key (with project namespace) - primary key in Phase A
        new_key = _consensus_key(result.consensus_id, project_id)
        await redis_client.set(new_key, payload, ex=CONSENSUS_TTL)

        # 1b. Legacy key (no project prefix) - Phase A fallback, backward compatible
        legacy_key = _consensus_legacy_key(result.consensus_id)
        await redis_client.set(legacy_key, payload, ex=CONSENSUS_TTL)

        # 2. Permanent PG write (ADR-085 iron rule - failure must not block the main flow)
        try:
            from src.db.base import get_db_context
            from src.db.models import AgentSession
            from sqlalchemy import insert as _sa_insert
            from hashlib import sha256 as _sha256

            rows = []
            # One row per AgentOpinion (CISO auditability requirement)
            for opinion in result.opinions:
                _input_hash = _sha256(
                    json.dumps(opinion.to_dict(), sort_keys=True).encode()
                ).hexdigest()[:16]
                rows.append({
                    "session_id": result.consensus_id,
                    "incident_id": result.incident_id,
                    "agent_role": opinion.agent_type.value,  # sre/security/cost/performance, <= 20 chars
                    "vote": "abstain",  # AgentOpinion has no standard vote field; the coordinator row carries the verdict
                    "output_json": opinion.to_dict(),
                    "latency_ms": 0,  # Phase 9.4 AgentOpinion does not measure latency
                    "degraded": False,
                    "input_hash": _input_hash,
                })
            # Coordinator row: the integrated decision
            _coord_hash = _sha256(
                json.dumps({"consensus_id": result.consensus_id}, sort_keys=True).encode()
            ).hexdigest()[:16]
            rows.append({
                "session_id": result.consensus_id,
                "incident_id": result.incident_id,
                "agent_role": "coordinator",
                "vote": "approve" if result.consensus_score >= 0.6 else "abstain",
                "output_json": result.to_dict(),
                "latency_ms": 0,
                "degraded": False,
                "input_hash": _coord_hash,
            })

            async with get_db_context() as db:
                await db.execute(_sa_insert(AgentSession), rows)
                await db.commit()

            logger.info(
                "consensus_pg_write_ok",
                consensus_id=result.consensus_id,
                rows=len(rows),
            )
        except Exception as _pg_err:
            # Deliberate best effort: the Redis copy already exists, so a PG
            # failure is only logged.
            logger.warning(
                "consensus_pg_write_failed",
                error=str(_pg_err),
                consensus_id=result.consensus_id,
            )

    async def get_consensus(
        self,
        consensus_id: str,
        project_id: str | None = None,
    ) -> ConsensusResult | None:
        """Fetch a consensus result

        P0-12 fix 2026-05-04 ogt + Claude Sonnet 4.6:
          Phase A dual-read: read the new key (with project prefix) first,
          fall back to the legacy key on a miss

        Args:
            consensus_id: ID of the consensus result to load.
            project_id: Tenant project ID used for the new-format key.

        Returns:
            The stored ConsensusResult, or None when neither key exists.
        """
        redis_client = get_redis()

        # Try the new-format key first (with project namespace)
        new_key = _consensus_key(consensus_id, project_id)
        data = await redis_client.get(new_key)

        if not data:
            # Phase A fallback: read the legacy-format key (no project prefix)
            legacy_key = _consensus_legacy_key(consensus_id)
            data = await redis_client.get(legacy_key)
            if data:
                logger.info(
                    "consensus_legacy_key_hit",
                    consensus_id=consensus_id,
                    project_id=project_id,
                    note="Phase A fallback 命中,建議觸發資料遷移",
                )

        if data:
            return ConsensusResult.from_dict(json.loads(data))
        return None
|
||
|
||
|
||
# =============================================================================
|
||
# Singleton
|
||
# =============================================================================
|
||
|
||
# Module-level cache for the shared engine; filled on first use.
_consensus_engine: ConsensusEngine | None = None


def get_consensus_engine() -> ConsensusEngine:
    """Return the shared ConsensusEngine, creating it on the first call."""
    global _consensus_engine
    engine = _consensus_engine
    if engine is None:
        engine = ConsensusEngine()
        _consensus_engine = engine
    return engine
|