feat(adr-082): Phase 2 多 Agent 協作 — 5 角色辯證系統骨架上線

新增 5 個 Agent + Orchestrator + DecisionManager 接線:
- protocol.py: DiagnosisReport / ActionPlan / ReviewVerdict / CriticReport / DecisionPackage 型別系統
- DiagnosticianAgent: RCA 根因分析,confidence < 0.4 → ABSTAIN
- SolverAgent: 修復方案軍師,blast_radius 評分 + 降級 rule-based mock
- ReviewerAgent: 安全審查,HARD_RULES 靜態 pattern + blast_radius 閾值 (>50 revision, >80 reject)
- CriticAgent: 刻意唱反調,強制 3 問批判性思維,critical challenge → REJECT
- CoordinatorAgent: 純規則聚合,6 級決策閘,REQUEST_REVISION → 強制人工
- AgentOrchestrator: 30s 全局超時,Reviewer ‖ Critic 並行,DB Immutable Event Sourcing + Redis Streams
- DecisionManager: AIOPS_P2_ENABLED gate + _package_to_proposal_data 橋接既有 proposal_data 格式
- AgentSession DB table + 4 個複合 index
- ADR-082 決策記錄

Gate 2 修復(7 項):
- CRITICAL: DELETE FROM regex lookahead 位置錯誤(移至 FROM 後)
- CRITICAL: REQUEST_REVISION 可抵達 auto-execute 路徑(改回 requires_human_approval=True)
- IMPORTANT: _extract_json flat regex 不支援巢狀 JSON(改 find/rfind 邊界提取)
- IMPORTANT: all_degraded 遺漏 verdict.degraded(補全 4 個 Agent)
- IMPORTANT: Solver ABSTAIN guard 放行降級假設(改為無論 hypotheses 有無均跳過)
- IMPORTANT: dataclasses.asdict() Enum 未序列化導致 DB 寫入靜默失敗(加 json.dumps default handler)
- IMPORTANT: P2 gate 直讀屬性繞過父 Phase 守衛(改用 is_phase_enabled(2))

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-15 13:33:13 +08:00
parent d51705b4ec
commit 5ddba6d6e0
11 changed files with 2171 additions and 4 deletions

View File

@@ -163,11 +163,13 @@ class BaseAgent(ABC, Generic[T]):
except json.JSONDecodeError:
pass
# 嘗試 { ... } 格式
match = re.search(r"\{[^{}]*\}", text, re.DOTALL)
if match:
# 嘗試從第一個 { 到最後一個 } 提取(支援巢狀 JSON
# Gate 2: 舊 r"\{[^{}]*\}" 會拒絕巢狀物件,造成所有 Agent LLM 回應解析失敗
start = text.find("{")
end = text.rfind("}")
if start != -1 and end > start:
try:
return json.loads(match.group(0))
return json.loads(text[start:end + 1])
except json.JSONDecodeError:
pass

View File

@@ -0,0 +1,299 @@
"""
AWOOOI AIOps Phase 2 — Coordinator Agent指揮官
==================================================
職責:聚合所有 Agent 輸出,做最終決策
輸入DiagnosisReport + ActionPlan + ReviewVerdict + CriticReport
輸出DecisionPackagerecommended_action + confidence + requires_human_approval
聚合邏輯:
1. Reviewer REJECT → blocked_actions 全部禁止執行,強制人工審核
2. Reviewer REQUEST_REVISION → 過濾高 blast_radius 方案,使用 safe_candidates
3. Critic 有 critical challenge → confidence 降低 CRITIC_PENALTY
4. 全 Agent degraded → requires_human_approval = True安全第一
5. Diagnostician ABSTAIN 且無有效假設 → requires_human_approval = True
6. 最終 confidence < HUMAN_ESCALATION_THRESHOLD → requires_human_approval = True
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
from __future__ import annotations
import time
from typing import Any
import structlog
from src.agents.base import BaseAgent
from src.agents.protocol import (
ActionPlan,
AgentRole,
AgentSessionStatus,
AgentVote,
CriticReport,
DecisionPackage,
DiagnosisReport,
ReviewVerdict,
)
logger = structlog.get_logger(__name__)
# confidence 低於此閾值 → 強制人工審核
HUMAN_ESCALATION_THRESHOLD = 0.4
# Critic critical challenge 懲罰係數
CRITIC_PENALTY = 0.3
class CoordinatorAgent(BaseAgent):
"""
Coordinator Agent — 指揮官(最終決策者)
Usage:
agent = CoordinatorAgent()
package = await agent.run(diagnosis, plan, verdict, critic)
"""
AGENT_NAME = AgentRole.COORDINATOR.value
AGENT_DESCRIPTION = (
"Final decision synthesizer. Aggregates all agent outputs into "
"a single actionable DecisionPackage."
)
async def run(
self,
diagnosis: DiagnosisReport,
plan: ActionPlan,
verdict: ReviewVerdict,
critic: CriticReport,
) -> DecisionPackage:
"""
聚合所有 Agent 輸出,決定最終行動。
Args:
diagnosis: Diagnostician 輸出
plan: Solver 輸出
verdict: Reviewer 安全審查結果
critic: Critic 批判性審查結果
Returns:
DecisionPackage不熔斷Coordinator 必須輸出決策)
"""
start_ms = int(time.monotonic() * 1000)
package = self._synthesize(diagnosis, plan, verdict, critic)
package.latency_ms = int(time.monotonic() * 1000) - start_ms
logger.info(
"coordinator_done",
recommended_action=package.recommended_action,
confidence=package.confidence,
requires_human=package.requires_human_approval,
all_degraded=package.all_agents_degraded,
session_status=package.session_status,
latency_ms=package.latency_ms,
)
return package
def _synthesize(
self,
diagnosis: DiagnosisReport,
plan: ActionPlan,
verdict: ReviewVerdict,
critic: CriticReport,
) -> DecisionPackage:
"""核心聚合邏輯(純函數,無 LLM不可失敗"""
top = plan.top_candidate
base_confidence = top.confidence if top else 0.0
selected = top
# ── 1. Reviewer REJECT → 最優先安全門 ──────────────────────────
if verdict.vote == AgentVote.REJECT:
return DecisionPackage(
recommended_action=None,
confidence=0.0,
requires_human_approval=True,
debate_summary=_build_summary(diagnosis, plan, verdict, critic),
session_status=AgentSessionStatus.COMPLETED,
latency_ms=0,
diagnosis=diagnosis,
action_plan=plan,
reviewer_verdict=verdict,
critic_report=critic,
blocked_reason=f"Reviewer 拒絕:{verdict.reason}",
)
# ── 2. Reviewer REQUEST_REVISION → 強制人工審核Solver 未修訂,不可自動執行)─
# Gate 2: REQUEST_REVISION 代表「請 Solver 重新設計方案」,此 Phase 無迭代機制
# → 保留 safe_candidates 供人工參考,但 requires_human_approval 必須 True
if verdict.vote == AgentVote.REQUEST_REVISION:
safe_candidates = [
c for c in plan.candidates
if c.action not in verdict.blocked_actions
]
selected = safe_candidates[0] if safe_candidates else None
base_confidence = selected.confidence if selected else 0.0
return DecisionPackage(
recommended_action=selected.action if selected else None,
confidence=base_confidence,
requires_human_approval=True,
debate_summary=_build_summary(diagnosis, plan, verdict, critic),
session_status=AgentSessionStatus.COMPLETED,
latency_ms=0,
diagnosis=diagnosis,
action_plan=plan,
reviewer_verdict=verdict,
critic_report=critic,
blocked_reason=f"Reviewer REQUEST_REVISION{verdict.reason}",
)
# ── 3. Critic critical challenge → 信心懲罰 ─────────────────────
adjusted_confidence = base_confidence
if critic.has_critical_challenge:
adjusted_confidence = max(0.0, base_confidence - CRITIC_PENALTY)
logger.info(
"coordinator_critic_penalty",
before=base_confidence,
after=adjusted_confidence,
)
# ── 4. 全 Agent 降級 → 強制人工 ──────────────────────────────────
# Gate 2: 原本遺漏 verdict.degradedReviewer 熔斷時 all_degraded 被低估
all_degraded = diagnosis.degraded and plan.degraded and verdict.degraded and critic.degraded
if all_degraded:
return DecisionPackage(
recommended_action=selected.action if selected else None,
confidence=adjusted_confidence,
requires_human_approval=True,
debate_summary=_build_summary(diagnosis, plan, verdict, critic),
session_status=AgentSessionStatus.DEGRADED,
latency_ms=0,
diagnosis=diagnosis,
action_plan=plan,
reviewer_verdict=verdict,
critic_report=critic,
blocked_reason="所有 Agent 皆降級,信心不可信,強制人工審核",
all_agents_degraded=True,
)
# ── 5. Diagnostician 無有效假設 → 強制人工 ───────────────────────
if not diagnosis.hypotheses or diagnosis.vote == AgentVote.ABSTAIN:
return DecisionPackage(
recommended_action=selected.action if selected else None,
confidence=adjusted_confidence,
requires_human_approval=True,
debate_summary=_build_summary(diagnosis, plan, verdict, critic),
session_status=AgentSessionStatus.DEGRADED,
latency_ms=0,
diagnosis=diagnosis,
action_plan=plan,
reviewer_verdict=verdict,
critic_report=critic,
blocked_reason="Diagnostician ABSTAIN感官情報不足需人工判斷根因",
)
# ── 6. confidence 低於閾值 → 強制人工 ────────────────────────────
if adjusted_confidence < HUMAN_ESCALATION_THRESHOLD:
return DecisionPackage(
recommended_action=selected.action if selected else None,
confidence=adjusted_confidence,
requires_human_approval=True,
debate_summary=_build_summary(diagnosis, plan, verdict, critic),
session_status=AgentSessionStatus.DEGRADED,
latency_ms=0,
diagnosis=diagnosis,
action_plan=plan,
reviewer_verdict=verdict,
critic_report=critic,
blocked_reason=(
f"信心 {adjusted_confidence:.0%} < 閾值 "
f"{HUMAN_ESCALATION_THRESHOLD:.0%},需人工確認"
),
)
# ── 7. 自動執行 ────────────────────────────────────────────────────
return DecisionPackage(
recommended_action=selected.action if selected else None,
confidence=adjusted_confidence,
requires_human_approval=False,
debate_summary=_build_summary(diagnosis, plan, verdict, critic),
session_status=AgentSessionStatus.COMPLETED,
latency_ms=0,
diagnosis=diagnosis,
action_plan=plan,
reviewer_verdict=verdict,
critic_report=critic,
)
def _build_prompt(self, _context: dict[str, Any]) -> str:
return "" # Coordinator 不使用 LLM純規則聚合
def _parse_response(self, _response: str) -> dict[str, Any]:
return {}
def analyze(self, context: dict[str, Any]) -> Any:
raise NotImplementedError("Use run() for Phase 2 agents")
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _build_summary(
diagnosis: DiagnosisReport,
plan: ActionPlan,
verdict: ReviewVerdict,
critic: CriticReport,
) -> str:
"""產生 debate_summary結構化文字限 1000 字)。"""
parts = []
top_h = diagnosis.top_hypothesis
if top_h:
parts.append(
f"診斷:{top_h.description[:200]}(信心 {top_h.confidence:.0%}"
f"{'降級' if diagnosis.degraded else '正常'}"
)
else:
parts.append("診斷無有效假設ABSTAIN")
top_c = plan.top_candidate
if top_c:
parts.append(
f"方案:{top_c.action[:100]}blast_radius={top_c.blast_radius}"
f"{'降級' if plan.degraded else '正常'}"
)
else:
parts.append("方案:無候選動作")
parts.append(
f"安全審查:{verdict.vote.value}{verdict.reason[:100]}"
f"{'降級' if verdict.degraded else '正常'}"
)
if critic.challenges:
top_ch = critic.challenges[0]
parts.append(
f"質疑:{top_ch.severity}{top_ch.argument[:150]}"
f"{critic.challenge_count} 項,"
f"{'降級' if critic.degraded else '正常'}"
)
else:
parts.append(f"質疑:無({'降級' if critic.degraded else '通過審查'}")
return "".join(parts)[:1000]
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_agent: CoordinatorAgent | None = None
def get_coordinator_agent() -> CoordinatorAgent:
global _agent
if _agent is None:
_agent = CoordinatorAgent()
return _agent

View File

@@ -0,0 +1,237 @@
"""
AWOOOI AIOps Phase 2 — Critic Agent質疑者
=============================================
職責:刻意唱反調,防止幻覺與 echo chamber
輸入DiagnosisReport + ActionPlan兩者都看
輸出CriticReportchallenges[] 列表 + overall_assessment
設計原則:
1. Critic 的工作是找漏洞,不是說好話(防 sycophancy
2. prompt 強制要求批判性思維:「如果診斷是錯的,還有哪 3 種可能?」
3. challenge_count > 0 是 Phase 2 退出條件之一
4. Critic 連續 3 次找到 Diagnostician 嚴重漏洞 → 觸發 Diagnostician 狀態不穩Phase 4 實作)
5. 熔斷降級LLM 失敗 → 輸出空 challenges不阻塞 Coordinator
6. Critic 和 Reviewer 並行執行(都不阻塞對方)
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
from __future__ import annotations
import asyncio
import hashlib
import time
from typing import Any
import structlog
from src.agents.base import BaseAgent
from src.agents.protocol import (
ActionPlan,
AgentRole,
AgentVote,
Challenge,
CriticReport,
DiagnosisReport,
)
from src.services.sanitization_service import sanitize
logger = structlog.get_logger(__name__)
# Critic 挑戰數量上限(防止 LLM 生成無限質疑)
MAX_CHALLENGES = 5
class CriticAgent(BaseAgent):
"""
Critic Agent — 系統性懷疑論者
Usage:
agent = CriticAgent()
report = await agent.run(diagnosis, plan)
"""
AGENT_NAME = AgentRole.CRITIC.value
AGENT_DESCRIPTION = (
"Devil's advocate. Challenges diagnosis and proposed actions to prevent "
"hallucination and echo chamber effects."
)
async def run(
self,
diagnosis: DiagnosisReport,
plan: ActionPlan,
timeout_sec: float = 5.0,
) -> CriticReport:
"""
批判性審查診斷和方案。
Args:
diagnosis: Diagnostician 輸出
plan: Solver 輸出
timeout_sec: 熔斷超時
Returns:
CriticReport熔斷時 degraded=Truechallenges 為空)
"""
start_ms = int(time.monotonic() * 1000)
try:
report = await asyncio.wait_for(
self._critique(diagnosis, plan),
timeout=timeout_sec,
)
report.latency_ms = int(time.monotonic() * 1000) - start_ms
logger.info(
"critic_done",
challenges=report.challenge_count,
has_critical=report.has_critical_challenge,
vote=report.vote,
latency_ms=report.latency_ms,
)
return report
except asyncio.TimeoutError:
latency = int(time.monotonic() * 1000) - start_ms
logger.warning("critic_timeout", timeout_sec=timeout_sec)
return self._degraded_report(latency, "timeout")
except Exception:
latency = int(time.monotonic() * 1000) - start_ms
logger.exception("critic_error")
return self._degraded_report(latency, "error")
async def _critique(
self,
diagnosis: DiagnosisReport,
plan: ActionPlan,
) -> CriticReport:
"""LLM 批判性推理。"""
top_hypothesis = diagnosis.top_hypothesis
top_candidate = plan.top_candidate
prompt = self._build_prompt({
"hypothesis": top_hypothesis.description if top_hypothesis else "(無假設)",
"action": top_candidate.action if top_candidate else "(無方案)",
"confidence": top_hypothesis.confidence if top_hypothesis else 0.0,
})
from src.services.openclaw import get_openclaw
openclaw = get_openclaw()
response_text, _provider, success = await openclaw.call(prompt)
if not success or not response_text:
return self._degraded_report(0, "llm_failed")
parsed = self._parse_response(sanitize(response_text, "critic_output"))
challenges = _extract_challenges(parsed)
# 有 critical challenge → vote = REJECT
vote = AgentVote.REJECT if any(c.severity == "critical" for c in challenges) else AgentVote.APPROVE
return CriticReport(
challenges=challenges,
overall_assessment=str(parsed.get("overall_assessment", ""))[:1000],
latency_ms=0,
vote=vote,
)
def _build_prompt(self, context: dict[str, Any]) -> str:
return f"""你是 AWOOOI SRE 系統的質疑者 AgentCritic
你的工作是:找出診斷和方案的弱點。不是說好話,是找漏洞。
當前診斷:{context.get("hypothesis", "")}
當前方案:{context.get("action", "")}
診斷信心:{context.get("confidence", 0.0):.0%}
必須回答以下問題(每個問題產出一個 challenge
1. 如果這個診斷是錯的,還有哪些可能的根因?
2. 這個方案有什麼副作用或風險?
3. 是否有更好的替代方案被忽略了?
每個 challenge 標記嚴重度:
- "minor":小瑕疵,不影響執行
- "major":值得 Coordinator 考慮,但不是阻擋條件
- "critical":嚴重邏輯漏洞,必須阻止此方案執行
以 JSON 回覆:
{{
"challenges": [
{{
"target": "diagnosis",
"argument": "可能是 OOM 但也可能是 code bug需要看 GC logs 確認",
"severity": "major"
}}
],
"overall_assessment": "診斷可信但方案風險偏高"
}}"""
def _parse_response(self, response: str) -> dict[str, Any]:
return self._extract_json(response)
def analyze(self, context: dict[str, Any]) -> Any:
raise NotImplementedError("Use run() for Phase 2 agents")
def _degraded_report(
self,
latency_ms: int,
reason: str = "unknown",
) -> CriticReport:
"""熔斷降級:輸出空 challenges不阻塞 Coordinator"""
return CriticReport(
challenges=[],
overall_assessment=f"[降級] Critic LLM 失敗({reason}),跳過批判性審查",
latency_ms=latency_ms,
vote=AgentVote.ABSTAIN,
degraded=True,
)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _extract_challenges(parsed: dict[str, Any]) -> list[Challenge]:
"""從 LLM 解析結果提取 challenges按嚴重度排序"""
raw = parsed.get("challenges", [])
challenges = []
severity_order = {"critical": 0, "major": 1, "minor": 2}
for item in raw:
if not isinstance(item, dict):
continue
c = Challenge(
target=str(item.get("target", "unknown"))[:50],
argument=str(item.get("argument", ""))[:500],
severity=item.get("severity", "minor") if item.get("severity") in severity_order else "minor",
)
challenges.append(c)
challenges.sort(key=lambda c: severity_order.get(c.severity, 2))
return challenges[:MAX_CHALLENGES]
def compute_input_hash(diagnosis: DiagnosisReport, plan: ActionPlan) -> str:
key = diagnosis.evidence_snapshot_id + (
diagnosis.top_hypothesis.description if diagnosis.top_hypothesis else ""
) + (
plan.top_candidate.action if plan.top_candidate else ""
)
return hashlib.sha256(key.encode()).hexdigest()[:16]
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_agent: CriticAgent | None = None
def get_critic_agent() -> CriticAgent:
global _agent
if _agent is None:
_agent = CriticAgent()
return _agent

View File

@@ -0,0 +1,246 @@
"""
AWOOOI AIOps Phase 2 — Diagnostician Agent偵探
==================================================
職責RCA 根因分析
輸入EvidenceSnapshot8D 感官情報)
輸出DiagnosisReport多根因假設含 confidence + evidence_chain
設計原則:
1. 只做診斷不提解法Solver 的工作)
2. top-1 confidence < 0.4 → vote = ABSTAIN情報不足回傳 Coordinator 判斷)
3. 熔斷降級LLM 失敗 / 超時 → rule-based mock以 alert_category 作簡單假設)
4. 所有 LLM 輸出過 SanitizationService防 Prompt Injection
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import time
from typing import TYPE_CHECKING, Any
import structlog
from src.agents.base import BaseAgent, AgentResult, AgentStatus
from src.agents.protocol import (
AgentRole,
AgentVote,
DiagnosisReport,
Hypothesis,
)
from src.services.sanitization_service import sanitize
if TYPE_CHECKING:
from src.services.evidence_snapshot import EvidenceSnapshot
logger = structlog.get_logger(__name__)
# 每個假設的最大 evidence chain 長度(防超 token
MAX_EVIDENCE_CHAIN = 5
# Confidence 閾值 — 低於此值 vote = ABSTAIN
ABSTAIN_CONFIDENCE_THRESHOLD = 0.4
class DiagnosticianAgent(BaseAgent):
"""
Diagnostician Agent — RCA 根因分析偵探
Usage:
agent = DiagnosticianAgent()
report = await agent.run(snapshot)
"""
AGENT_NAME = AgentRole.DIAGNOSTICIAN.value
AGENT_DESCRIPTION = "Root Cause Analysis specialist. Produces multiple hypotheses with confidence scores."
async def run(
self,
snapshot: "EvidenceSnapshot",
timeout_sec: float = 5.0,
) -> DiagnosisReport:
"""
執行根因分析。
Args:
snapshot: Phase 1 感官快照
timeout_sec: 熔斷超時(預設 5s
Returns:
DiagnosisReport熔斷時 degraded=Truevote=ABSTAIN
"""
start_ms = int(time.monotonic() * 1000)
try:
report = await asyncio.wait_for(
self._analyze(snapshot),
timeout=timeout_sec,
)
report.latency_ms = int(time.monotonic() * 1000) - start_ms
logger.info(
"diagnostician_done",
snapshot_id=snapshot.snapshot_id,
hypotheses=len(report.hypotheses),
top_confidence=report.top_confidence,
vote=report.vote,
latency_ms=report.latency_ms,
)
return report
except asyncio.TimeoutError:
latency = int(time.monotonic() * 1000) - start_ms
logger.warning("diagnostician_timeout", timeout_sec=timeout_sec)
return self._degraded_report(snapshot, latency, reason="timeout")
except Exception:
latency = int(time.monotonic() * 1000) - start_ms
logger.exception("diagnostician_error")
return self._degraded_report(snapshot, latency, reason="error")
async def _analyze(self, snapshot: "EvidenceSnapshot") -> DiagnosisReport:
"""核心 LLM 分析邏輯。"""
prompt = self._build_prompt({"evidence_summary": snapshot.evidence_summary or ""})
from src.services.openclaw import get_openclaw
openclaw = get_openclaw()
response_text, _provider, success = await openclaw.call(prompt)
if not success or not response_text:
return self._degraded_report(snapshot, 0, reason="llm_failed")
parsed = self._parse_response(sanitize(response_text, "diagnostician_output"))
hypotheses = _extract_hypotheses(parsed)
vote = AgentVote.APPROVE
if not hypotheses or hypotheses[0].confidence < ABSTAIN_CONFIDENCE_THRESHOLD:
vote = AgentVote.ABSTAIN
return DiagnosisReport(
hypotheses=hypotheses,
evidence_snapshot_id=snapshot.snapshot_id or "",
latency_ms=0, # 由 run() 覆蓋
vote=vote,
)
def _build_prompt(self, context: dict[str, Any]) -> str:
evidence = context.get("evidence_summary", "(無感官情報)")
return f"""你是 AWOOOI SRE 系統的偵探 Agent專職根因分析Root Cause Analysis
你的唯一工作:根據以下感官情報,提出 2-3 個根因假設hypotheses
不要提修復方案,那是 Solver 的工作。
每個假設必須:
1. 有 confidence0.0-1.0
2. 列出支持此假設的 evidence key{MAX_EVIDENCE_CHAIN} 個)
3. 有 categoryK8s Pod / HostDisk / NetworkLatency / DatabaseConnection / 等)
如果感官情報嚴重不足(所有假設 confidence < 0.4),說明原因。
---
感官情報:
{evidence}
---
以 JSON 回覆(不要加任何解釋):
{{
"hypotheses": [
{{
"description": "假設描述",
"confidence": 0.85,
"evidence_chain": ["k8s_state.pod_status", "recent_logs.oom_signal"],
"category": "KubePodOOM"
}}
]
}}"""
def _parse_response(self, response: str) -> dict[str, Any]:
return self._extract_json(response)
def analyze(self, context: dict[str, Any]) -> Any:
"""BaseAgent 抽象方法 — Phase 2 改用 run() 入口。"""
raise NotImplementedError("Use run() for Phase 2 agents")
def _degraded_report(
self,
snapshot: "EvidenceSnapshot",
latency_ms: int,
reason: str = "unknown",
) -> DiagnosisReport:
"""熔斷降級rule-based mock用 alert_category 作簡單假設)"""
category = _guess_category_from_snapshot(snapshot)
return DiagnosisReport(
hypotheses=[
Hypothesis(
description=f"[降級] 無法完成 LLM 分析(原因: {reason})。基於告警類別推測: {category}",
confidence=0.2,
evidence_chain=[],
category=category,
)
],
evidence_snapshot_id=snapshot.snapshot_id or "",
latency_ms=latency_ms,
vote=AgentVote.ABSTAIN,
degraded=True,
)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _extract_hypotheses(parsed: dict[str, Any]) -> list[Hypothesis]:
"""從 LLM 解析結果提取假設列表(按信心降序)。"""
raw = parsed.get("hypotheses", [])
hypotheses = []
for item in raw:
if not isinstance(item, dict):
continue
h = Hypothesis(
description=str(item.get("description", ""))[:500],
confidence=float(item.get("confidence", 0.0)),
evidence_chain=item.get("evidence_chain", [])[:MAX_EVIDENCE_CHAIN],
category=str(item.get("category", "")),
)
hypotheses.append(h)
hypotheses.sort(key=lambda h: h.confidence, reverse=True)
return hypotheses
def _guess_category_from_snapshot(snapshot: "EvidenceSnapshot") -> str:
"""降級時從 snapshot 猜測告警類別(最粗粒度兜底)。"""
summary = (snapshot.evidence_summary or "").lower()
if "oom" in summary or "memory" in summary:
return "KubePodOOM"
if "crashloop" in summary:
return "KubePodCrashLoop"
if "disk" in summary:
return "HostDiskUsage"
if "cpu" in summary:
return "HostCpuHigh"
if "network" in summary or "timeout" in summary:
return "NetworkLatency"
return "Unknown"
def compute_input_hash(snapshot: "EvidenceSnapshot") -> str:
"""計算 Diagnostician 輸入的 fingerprint用於 AgentSession input_hash"""
key = (snapshot.snapshot_id or "") + (snapshot.evidence_summary or "")[:100]
return hashlib.sha256(key.encode()).hexdigest()[:16]
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_agent: DiagnosticianAgent | None = None
def get_diagnostician_agent() -> DiagnosticianAgent:
global _agent
if _agent is None:
_agent = DiagnosticianAgent()
return _agent

View File

@@ -0,0 +1,253 @@
"""
AWOOOI AIOps Phase 2 — 多 Agent 協作訊息協定
==============================================
定義 5 個 Agent 間傳遞的不可變資料型別。
設計原則:
1. 每個 Agent 有明確的 Input / Output 型別(不共用 dict
2. 所有型別都是 dataclass快速、可序列化、無外部依賴
3. 降級 / 棄權用明確 AgentVote.ABSTAIN不用 None 代替
4. 全程 immutable — Agent 不得修改彼此的輸出(防 prompt 污染)
ADR-082: 多 Agent 協作架構Phase 2
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
# ─────────────────────────────────────────────────────────────────────────────
# Enums
# ─────────────────────────────────────────────────────────────────────────────
class AgentRole(str, Enum):
"""Phase 2 五角色標識"""
DIAGNOSTICIAN = "diagnostician"
SOLVER = "solver"
REVIEWER = "reviewer"
CRITIC = "critic"
COORDINATOR = "coordinator"
class AgentVote(str, Enum):
"""Agent 投票結果"""
APPROVE = "approve"
REJECT = "reject"
REQUEST_REVISION = "request_revision"
ABSTAIN = "abstain" # 熔斷 / 超時 / 無足夠資訊
DEGRADED = "degraded" # 降級路徑rule-based mock
class AgentSessionStatus(str, Enum):
"""AgentSession 整體狀態"""
RUNNING = "running"
COMPLETED = "completed"
DEGRADED = "degraded" # 部分 Agent 熔斷但仍完成
FAILED = "failed" # Coordinator 無法輸出任何結論
TIMEOUT = "timeout" # 全流程 > 30s
# ─────────────────────────────────────────────────────────────────────────────
# Diagnostician Output
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class Hypothesis:
"""單一根因假設"""
description: str
confidence: float # 0.0 ~ 1.0
evidence_chain: list[str] # 支持此假設的 evidence key
category: str = "" # alert_categoryKubePod / HostDisk 等)
@dataclass
class DiagnosisReport:
"""
Diagnostician Agent 輸出
包含多個根因假設(按信心排序),
top-1 confidence < 0.4 觸發 Coordinator 回退 Investigator 重抓。
"""
hypotheses: list[Hypothesis]
evidence_snapshot_id: str
latency_ms: int
vote: AgentVote = AgentVote.APPROVE # 資訊足夠 = APPROVE不足 = ABSTAIN
degraded: bool = False # 熔斷降級標記
@property
def top_hypothesis(self) -> Hypothesis | None:
"""最高信心假設"""
return self.hypotheses[0] if self.hypotheses else None
@property
def top_confidence(self) -> float:
return self.top_hypothesis.confidence if self.top_hypothesis else 0.0
# ─────────────────────────────────────────────────────────────────────────────
# Solver Output
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class CandidateAction:
"""單一候選修復動作"""
action: str # 動作描述e.g. "restart_service:awoooi-api"
blast_radius: int # 0-100影響範圍評分
rollback_cost: int # 0-100回滾難度
confidence: float # 0.0 ~ 1.0
rationale: str = "" # 為什麼選此方案
@dataclass
class ActionPlan:
"""
Solver Agent 輸出
對每個根因假設提出 ≥1 個候選方案(含 blast_radius / rollback_cost
blast_radius > 50 → Reviewer 必須標 `request_revision`。
"""
candidates: list[CandidateAction]
diagnosis_report: DiagnosisReport
latency_ms: int
vote: AgentVote = AgentVote.APPROVE
degraded: bool = False
@property
def top_candidate(self) -> CandidateAction | None:
"""最高信心候選方案"""
return max(self.candidates, key=lambda c: c.confidence) if self.candidates else None
# ─────────────────────────────────────────────────────────────────────────────
# Reviewer Output
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ReviewVerdict:
"""
Reviewer Agent 輸出(安全審查)
硬核拒絕 HARD_RULES 觸碰動作delete node / DROP TABLE / force push 等)。
vote = REJECT 時Coordinator 不得執行任何候選方案。
"""
vote: AgentVote
reason: str
blocked_actions: list[str] # 被拒絕的動作清單
safe_actions: list[str] # 通過安全審查的動作
latency_ms: int
degraded: bool = False
@property
def is_safe(self) -> bool:
return self.vote == AgentVote.APPROVE and bool(self.safe_actions)
# ─────────────────────────────────────────────────────────────────────────────
# Critic Output
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class Challenge:
"""Critic 的單一質疑"""
target: str # "diagnosis" | "action:{action_str}"
argument: str # 質疑的具體理由
severity: str # "minor" | "major" | "critical"
@dataclass
class CriticReport:
"""
Critic Agent 輸出(刻意唱反調)
challenge_count > 0 是 Phase 2 退出條件之一。
major/critical challenge 觸發 Coordinator 降低對 Solver 方案的信心。
"""
challenges: list[Challenge]
overall_assessment: str
latency_ms: int
vote: AgentVote = AgentVote.APPROVE # APPROVE=無重大反對REJECT=有 critical challenge
degraded: bool = False
@property
def challenge_count(self) -> int:
return len(self.challenges)
@property
def has_critical_challenge(self) -> bool:
return any(c.severity == "critical" for c in self.challenges)
# ─────────────────────────────────────────────────────────────────────────────
# Coordinator Output
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class DecisionPackage:
"""
Coordinator Agent 輸出(最終決策包)
包含:
- recommended_action: 最終推薦動作None = 棄權 / 升級人工)
- confidence: 綜合信心Solver × Reviewer × Critic 加權)
- requires_human_approval: 是否需要人工審核
- debate_summary: 辯證歷程摘要(供 Audit Trail + 學習閉環)
- session_status: 整體辯證狀態
"""
recommended_action: str | None
confidence: float
requires_human_approval: bool
debate_summary: str
session_status: AgentSessionStatus
latency_ms: int
# 保留各 Agent 原始輸出(供學習閉環查詢)
diagnosis: DiagnosisReport | None = None
action_plan: ActionPlan | None = None
reviewer_verdict: ReviewVerdict | None = None
critic_report: CriticReport | None = None
# 棄選方案(含原因)
rejected_actions: list[dict[str, Any]] = field(default_factory=list)
# 阻擋原因requires_human_approval=True 時說明)
blocked_reason: str | None = None
# 全部 Agent 都降級(更嚴格的人工審核信號)
all_agents_degraded: bool = False
@property
def is_actionable(self) -> bool:
"""可以執行(有推薦動作且信心 > 0.4 且 Reviewer 通過)"""
if not self.recommended_action:
return False
if self.confidence < 0.4:
return False
if self.reviewer_verdict and self.reviewer_verdict.vote == AgentVote.REJECT:
return False
return True
# ─────────────────────────────────────────────────────────────────────────────
# Agent Session RecordDB 寫入用)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class AgentTurn:
"""
單次 Agent 發言記錄
寫入 `agent_sessions` 表的一行,
session_id + agent_role 唯一確定一次辯證發言。
"""
session_id: str
incident_id: str
agent_role: AgentRole
input_hash: str # sha256(input_json)[:16]
output_json: dict[str, Any] # Agent 原始輸出
latency_ms: int
vote: AgentVote
degraded: bool = False

View File

@@ -0,0 +1,235 @@
"""
AWOOOI AIOps Phase 2 — Reviewer Agent安全官
================================================
職責:安全審查 + 可行性驗證
輸入ActionPlan來自 Solver
輸出ReviewVerdictapprove / reject / request_revision
設計原則:
1. 硬核拒絕 HARD_RULES 觸碰動作delete node / DROP TABLE / force push 等)
2. blast_radius > 50 → 自動 request_revision不 reject讓 Solver 調整方案)
3. blast_radius > 80 → reject風險太高
4. 熔斷降級LLM 失敗 → 保守降級APPROVE 低 blast_radiusREJECT 高 blast_radius
5. Reviewer 的 REJECT 是最高優先Coordinator 不得執行任何被拒絕的方案
HARD_RULES 觸碰清單ADR-082 §安全原則):
- kubectl delete node / kubectl delete --all
- DROP TABLE / DELETE FROM無 WHERE
- rm -rf /
- force push to main
- kubectl exec 執行任意 shell
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
from __future__ import annotations
import asyncio
import hashlib
import re
import time
from typing import Any
import structlog
from src.agents.base import BaseAgent
from src.agents.protocol import (
ActionPlan,
AgentRole,
AgentVote,
CandidateAction,
ReviewVerdict,
)
from src.services.sanitization_service import sanitize
logger = structlog.get_logger(__name__)
# blast_radius 閾值
BLAST_REQUEST_REVISION_THRESHOLD = 50 # > 50 → request_revision
BLAST_REJECT_THRESHOLD = 80 # > 80 → reject太危險
# 硬核拒絕 patternHARD_RULES 觸碰)
_HARD_BLOCK_PATTERNS = [
re.compile(r"kubectl\s+delete\s+node", re.IGNORECASE),
re.compile(r"kubectl\s+delete\s+--all", re.IGNORECASE),
re.compile(r"\bDROP\s+TABLE\b", re.IGNORECASE),
re.compile(r"\bDELETE\s+FROM\b(?!.*\bWHERE\b)", re.IGNORECASE | re.DOTALL), # Gate 2: lookahead 必須在 FROM 後而非 .* 後
re.compile(r"rm\s+-rf\s+/", re.IGNORECASE),
re.compile(r"force.{0,5}push.{0,20}main", re.IGNORECASE),
]
class ReviewerAgent(BaseAgent):
"""
Reviewer Agent — 安全審查官
Usage:
agent = ReviewerAgent()
verdict = await agent.run(action_plan)
"""
AGENT_NAME = AgentRole.REVIEWER.value
AGENT_DESCRIPTION = "Safety and feasibility reviewer. Hard-blocks HARD_RULES violations."
async def run(
self,
plan: ActionPlan,
timeout_sec: float = 5.0,
) -> ReviewVerdict:
"""
審查方案安全性。
Args:
plan: Solver 輸出的方案
timeout_sec: 熔斷超時
Returns:
ReviewVerdict熔斷時 degraded=True使用 static rule 降級)
"""
start_ms = int(time.monotonic() * 1000)
# 1. 硬核靜態檢查(不依賴 LLM— 先於超時保護
hard_blocked = [
c.action for c in plan.candidates
if _is_hard_blocked(c.action)
]
if hard_blocked:
latency = int(time.monotonic() * 1000) - start_ms
logger.warning("reviewer_hard_block", blocked=hard_blocked)
return ReviewVerdict(
vote=AgentVote.REJECT,
reason=f"HARD_RULES 觸碰:{hard_blocked}",
blocked_actions=hard_blocked,
safe_actions=[],
latency_ms=latency,
)
try:
verdict = await asyncio.wait_for(
self._review(plan),
timeout=timeout_sec,
)
verdict.latency_ms = int(time.monotonic() * 1000) - start_ms
logger.info(
"reviewer_done",
vote=verdict.vote,
blocked=len(verdict.blocked_actions),
safe=len(verdict.safe_actions),
latency_ms=verdict.latency_ms,
)
return verdict
except asyncio.TimeoutError:
latency = int(time.monotonic() * 1000) - start_ms
logger.warning("reviewer_timeout", timeout_sec=timeout_sec)
return self._degraded_verdict(plan, latency, "timeout")
except Exception:
latency = int(time.monotonic() * 1000) - start_ms
logger.exception("reviewer_error")
return self._degraded_verdict(plan, latency, "error")
async def _review(self, plan: ActionPlan) -> ReviewVerdict:
"""LLM 審查 + blast_radius 靜態規則組合。"""
# 靜態 blast_radius 規則(不需要 LLM
high_blast = [c for c in plan.candidates if c.blast_radius > BLAST_REJECT_THRESHOLD]
mid_blast = [c for c in plan.candidates if BLAST_REQUEST_REVISION_THRESHOLD < c.blast_radius <= BLAST_REJECT_THRESHOLD]
safe_candidates = [c for c in plan.candidates if c.blast_radius <= BLAST_REQUEST_REVISION_THRESHOLD]
if high_blast:
return ReviewVerdict(
vote=AgentVote.REJECT,
reason=f"blast_radius > {BLAST_REJECT_THRESHOLD},風險過高",
blocked_actions=[c.action for c in high_blast],
safe_actions=[c.action for c in safe_candidates],
latency_ms=0,
)
if mid_blast:
return ReviewVerdict(
vote=AgentVote.REQUEST_REVISION,
reason=f"blast_radius > {BLAST_REQUEST_REVISION_THRESHOLD},請 Solver 提供影響更小的方案",
blocked_actions=[c.action for c in mid_blast],
safe_actions=[c.action for c in safe_candidates],
latency_ms=0,
)
# 低 blast_radius → LLM 補充可行性審查
if safe_candidates:
return ReviewVerdict(
vote=AgentVote.APPROVE,
reason="blast_radius 符合安全閾值,靜態規則通過",
blocked_actions=[],
safe_actions=[c.action for c in safe_candidates],
latency_ms=0,
)
return ReviewVerdict(
vote=AgentVote.ABSTAIN,
reason="無候選方案可審查",
blocked_actions=[],
safe_actions=[],
latency_ms=0,
)
def _build_prompt(self, context: dict[str, Any]) -> str:
return "" # Phase 2 Reviewer 使用靜態規則LLM 備用
def _parse_response(self, response: str) -> dict[str, Any]:
return self._extract_json(response)
def analyze(self, context: dict[str, Any]) -> Any:
raise NotImplementedError("Use run() for Phase 2 agents")
def _degraded_verdict(
self,
plan: ActionPlan,
latency_ms: int,
reason: str,
) -> ReviewVerdict:
"""
熔斷降級:保守策略
- blast_radius <= 30 → APPROVE低風險兜底
- blast_radius > 30 → REQUEST_REVISION高風險不敢承擔
"""
safe = [c.action for c in plan.candidates if c.blast_radius <= 30]
risky = [c.action for c in plan.candidates if c.blast_radius > 30]
vote = AgentVote.APPROVE if safe and not risky else AgentVote.REQUEST_REVISION
return ReviewVerdict(
vote=vote,
reason=f"[降級] Reviewer LLM 失敗({reason}),使用保守靜態降級規則",
blocked_actions=risky,
safe_actions=safe,
latency_ms=latency_ms,
degraded=True,
)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _is_hard_blocked(action: str) -> bool:
"""檢查動作是否觸碰 HARD_RULES靜態 pattern不依賴 LLM"""
return any(p.search(action) for p in _HARD_BLOCK_PATTERNS)
def compute_input_hash(plan: ActionPlan) -> str:
key = plan.diagnosis_report.evidence_snapshot_id + str([c.action for c in plan.candidates])
return hashlib.sha256(key.encode()).hexdigest()[:16]
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_agent: ReviewerAgent | None = None
def get_reviewer_agent() -> ReviewerAgent:
global _agent
if _agent is None:
_agent = ReviewerAgent()
return _agent

View File

@@ -0,0 +1,263 @@
"""
AWOOOI AIOps Phase 2 — Solver Agent軍師
===========================================
職責:對每個根因假設產修復方案
輸入DiagnosisReport來自 Diagnostician
輸出ActionPlan候選動作含 blast_radius + rollback_cost + confidence
設計原則:
1. 每個 Hypothesis 至少產 1 個 CandidateAction
2. blast_radius 評分影響 Reviewer 的審查嚴格度
3. blast_radius > 50 → Reviewer 必須 request_revision
4. 熔斷降級LLM 失敗 → rule-based mock基於 category 推 RESTART 為兜底動作)
5. Solver 不直接觸碰執行層Coordinator 的工作)
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
from __future__ import annotations
import asyncio
import hashlib
import time
from typing import Any
import structlog
from src.agents.base import BaseAgent
from src.agents.protocol import (
ActionPlan,
AgentRole,
AgentVote,
CandidateAction,
DiagnosisReport,
)
from src.services.sanitization_service import sanitize
logger = structlog.get_logger(__name__)
class SolverAgent(BaseAgent):
"""
Solver Agent — 修復方案軍師
Usage:
agent = SolverAgent()
plan = await agent.run(diagnosis_report)
"""
AGENT_NAME = AgentRole.SOLVER.value
AGENT_DESCRIPTION = "Remediation plan specialist. Produces candidate actions with blast radius scoring."
async def run(
self,
diagnosis: DiagnosisReport,
timeout_sec: float = 5.0,
) -> ActionPlan:
"""
根據診斷報告產出修復計畫。
Args:
diagnosis: Diagnostician 輸出
timeout_sec: 熔斷超時
Returns:
ActionPlan熔斷時 degraded=True
"""
start_ms = int(time.monotonic() * 1000)
# 若 Diagnostician 已棄權Solver 也應棄權(無論降級假設是否存在)
# Gate 2: 原條件 `and not diagnosis.hypotheses` 誤放行降級的 confidence=0.2 假設
if diagnosis.vote == AgentVote.ABSTAIN:
return ActionPlan(
candidates=[],
diagnosis_report=diagnosis,
latency_ms=0,
vote=AgentVote.ABSTAIN,
degraded=diagnosis.degraded,
)
try:
plan = await asyncio.wait_for(
self._solve(diagnosis),
timeout=timeout_sec,
)
plan.latency_ms = int(time.monotonic() * 1000) - start_ms
logger.info(
"solver_done",
candidates=len(plan.candidates),
vote=plan.vote,
latency_ms=plan.latency_ms,
)
return plan
except asyncio.TimeoutError:
latency = int(time.monotonic() * 1000) - start_ms
logger.warning("solver_timeout", timeout_sec=timeout_sec)
return self._degraded_plan(diagnosis, latency, "timeout")
except Exception:
latency = int(time.monotonic() * 1000) - start_ms
logger.exception("solver_error")
return self._degraded_plan(diagnosis, latency, "error")
async def _solve(self, diagnosis: DiagnosisReport) -> ActionPlan:
"""核心 LLM 推理邏輯。"""
top = diagnosis.top_hypothesis
if not top:
return ActionPlan(
candidates=[],
diagnosis_report=diagnosis,
latency_ms=0,
vote=AgentVote.ABSTAIN,
)
prompt = self._build_prompt({
"hypothesis": top.description,
"category": top.category,
"confidence": top.confidence,
})
from src.services.openclaw import get_openclaw
openclaw = get_openclaw()
response_text, _provider, success = await openclaw.call(prompt)
if not success or not response_text:
return self._degraded_plan(diagnosis, 0, "llm_failed")
parsed = self._parse_response(sanitize(response_text, "solver_output"))
candidates = _extract_candidates(parsed)
if not candidates:
return self._degraded_plan(diagnosis, 0, "no_candidates")
return ActionPlan(
candidates=candidates,
diagnosis_report=diagnosis,
latency_ms=0,
vote=AgentVote.APPROVE,
)
def _build_prompt(self, context: dict[str, Any]) -> str:
return f"""你是 AWOOOI SRE 系統的軍師 Agent專職修復方案設計。
根因假設:{context.get("hypothesis", "")}
告警類別:{context.get("category", "")}
診斷信心:{context.get("confidence", 0.0):.0%}
你的工作:為此根因提出 1-3 個修復候選方案。
每個方案必須評估:
- blast_radius0-100影響範圍越高 = 風險越大)
- rollback_cost0-100回滾難度越高 = 越難還原)
blast_radius 參考:
- 重啟單一 Pod = 10
- 重啟 Deployment = 25
- 調整 HPA = 30
- 刪除 StatefulSet = 80
- 清除 PVC = 95
以 JSON 回覆:
{{
"candidates": [
{{
"action": "restart_service:awoooi-api",
"blast_radius": 15,
"rollback_cost": 5,
"confidence": 0.8,
"rationale": "重啟可清除 OOM 導致的記憶體碎片化"
}}
]
}}"""
def _parse_response(self, response: str) -> dict[str, Any]:
return self._extract_json(response)
def analyze(self, context: dict[str, Any]) -> Any:
raise NotImplementedError("Use run() for Phase 2 agents")
def _degraded_plan(
self,
diagnosis: DiagnosisReport,
latency_ms: int,
reason: str = "unknown",
) -> ActionPlan:
"""熔斷降級rule-based mock依 category 推 RESTART 兜底)"""
category = diagnosis.top_hypothesis.category if diagnosis.top_hypothesis else "Unknown"
fallback_action = _default_action_for_category(category)
return ActionPlan(
candidates=[
CandidateAction(
action=fallback_action,
blast_radius=20,
rollback_cost=5,
confidence=0.2,
rationale=f"[降級] LLM 分析失敗({reason}),使用類別 {category} 的預設兜底動作",
)
],
diagnosis_report=diagnosis,
latency_ms=latency_ms,
vote=AgentVote.DEGRADED,
degraded=True,
)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]:
"""從 LLM 解析結果提取候選方案(按信心降序)。"""
raw = parsed.get("candidates", [])
candidates = []
for item in raw:
if not isinstance(item, dict):
continue
c = CandidateAction(
action=str(item.get("action", ""))[:200],
blast_radius=max(0, min(100, int(item.get("blast_radius", 50)))),
rollback_cost=max(0, min(100, int(item.get("rollback_cost", 50)))),
confidence=float(item.get("confidence", 0.0)),
rationale=str(item.get("rationale", ""))[:500],
)
candidates.append(c)
candidates.sort(key=lambda c: c.confidence, reverse=True)
return candidates
def _default_action_for_category(category: str) -> str:
"""降級時的預設動作(最保守的 restart"""
category_lower = category.lower()
if "pod" in category_lower or "kube" in category_lower:
return "restart_pod"
if "disk" in category_lower:
return "check_disk_usage"
if "cpu" in category_lower:
return "check_cpu_usage"
if "network" in category_lower:
return "check_network_connectivity"
return "restart_service"
def compute_input_hash(diagnosis: DiagnosisReport) -> str:
"""計算 Solver 輸入的 fingerprint。"""
key = diagnosis.evidence_snapshot_id + (
diagnosis.top_hypothesis.description if diagnosis.top_hypothesis else ""
)
return hashlib.sha256(key.encode()).hexdigest()[:16]
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_agent: SolverAgent | None = None
def get_solver_agent() -> SolverAgent:
global _agent
if _agent is None:
_agent = SolverAgent()
return _agent

View File

@@ -833,3 +833,79 @@ class IncidentEvidence(Base):
Index("ix_incident_evidence_collected_at", "collected_at"),
Index("ix_incident_evidence_playbook_id", "matched_playbook_id"),
)
# =============================================================================
# AgentSession — Phase 2 多 Agent 辯證 Audit Trail
# =============================================================================
class AgentSession(Base):
"""
ADR-082 Phase 2: 多 Agent 辯證 Immutable Event Log
每個 Agent 每次「發言」寫一行。
session_id 串連同一次 Incident 決策的所有 Agent turns。
不可刪除 — 只能新增Immutable Event Sourcing
Phase 3 學習閉環依賴此表Critic 挑戰成功作為負向學習信號)。
ADR-082: 多 Agent 協作架構
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
__tablename__ = "agent_sessions"
id: Mapped[str] = mapped_column(
String(36), primary_key=True, default=lambda: str(uuid4()),
comment="行主鍵UUID"
)
session_id: Mapped[str] = mapped_column(
String(36), nullable=False,
comment="辯證 Session ID一次 Incident 決策的所有 turns 共用同一 session_id"
)
incident_id: Mapped[str] = mapped_column(
String(50), nullable=False,
comment="關聯 Incident ID"
)
agent_role: Mapped[str] = mapped_column(
String(20), nullable=False,
comment="Agent 角色diagnostician / solver / reviewer / critic / coordinator"
)
# 輸入指紋sha256[:16])— 用於查重、快取命中追蹤
input_hash: Mapped[str] = mapped_column(
String(16), nullable=False, default="",
comment="sha256(input_json)[:16],供查重與快取命中追蹤"
)
# Agent 輸出(完整 JSON供 Phase 3 學習 + 事後複盤)
output_json: Mapped[dict] = mapped_column(
JSON, nullable=False, default=dict,
comment="Agent 原始輸出DiagnosisReport / ActionPlan / 等序列化 dict"
)
# 品質指標
latency_ms: Mapped[int] = mapped_column(
Integer, nullable=False, default=0,
comment="此 Agent 的執行耗時ms"
)
vote: Mapped[str] = mapped_column(
String(20), nullable=False, default="abstain",
comment="Agent 投票approve / reject / request_revision / abstain / degraded"
)
degraded: Mapped[bool] = mapped_column(
nullable=False, default=False,
comment="True = 此 Agent 因熔斷/超時降級,輸出為 rule-based mock"
)
# 時間戳(台北時區)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=taipei_now, nullable=False
)
__table_args__ = (
Index("ix_agent_sessions_session_id", "session_id"),
Index("ix_agent_sessions_incident_id", "incident_id"),
Index("ix_agent_sessions_created_at", "created_at"),
# 查詢某 session 中特定 role 的 turnCoordinator 聚合時常用)
Index("ix_agent_sessions_session_role", "session_id", "agent_role"),
)

View File

@@ -0,0 +1,366 @@
"""
AWOOOI AIOps Phase 2 — Agent Orchestrator排程指揮官
======================================================
職責:序列化 5 個 Agent 的執行,管理 Redis Streams 審計軌跡,記錄 DB
執行順序:
Diagnostician(snapshot)
→ Solver(diagnosis)
→ [Reviewer(plan) ‖ Critic(diagnosis, plan)] # 並行
→ Coordinator(diagnosis, plan, verdict, critic)
設計原則:
1. 每個 Agent 已有內建 5s 熔斷(在各自的 run() 中)— Orchestrator 不重複
2. 全流程 30s 全局超時GLOBAL_TIMEOUT_SEC
3. Redis Streams XADDbest-effort失敗不阻塞主流程
4. DB 記錄:每個 Agent turn 一行 agent_sessionsImmutable Event Sourcing
5. AIOPS_P2_ENABLED=False 時禁止使用(呼叫方負責 gate
Redis Stream key: aiops:p2:events
DB table: agent_sessions
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
"""
from __future__ import annotations
import asyncio
import dataclasses
import json
import time
import uuid
from datetime import UTC, datetime
from typing import TYPE_CHECKING
import structlog
from sqlalchemy import insert
from src.agents.coordinator_agent import get_coordinator_agent
from src.agents.critic_agent import compute_input_hash as critic_hash
from src.agents.critic_agent import get_critic_agent
from src.agents.diagnostician_agent import compute_input_hash as diag_hash
from src.agents.diagnostician_agent import get_diagnostician_agent
from src.agents.protocol import (
ActionPlan,
AgentRole,
AgentSessionStatus,
AgentVote,
CriticReport,
DecisionPackage,
DiagnosisReport,
ReviewVerdict,
)
from src.agents.reviewer_agent import compute_input_hash as reviewer_hash
from src.agents.reviewer_agent import get_reviewer_agent
from src.agents.solver_agent import compute_input_hash as solver_hash
from src.agents.solver_agent import get_solver_agent
from src.db.base import get_db_context
from src.db.models import AgentSession
if TYPE_CHECKING:
from src.services.evidence_snapshot import EvidenceSnapshot
logger = structlog.get_logger(__name__)
# 全局超時(所有 Agent 加起來)
GLOBAL_TIMEOUT_SEC = 30.0
# Redis Stream key
STREAM_KEY = "aiops:p2:events"
STREAM_MAXLEN = 10_000 # 保留最近 10k 條事件
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
async def run_agent_debate(
snapshot: "EvidenceSnapshot",
incident_id: str,
) -> DecisionPackage:
"""
執行完整的 5 Agent 辯證流程。
Args:
snapshot: Phase 1 感官快照EvidenceSnapshot
incident_id: 關聯 incident ID
Returns:
DecisionPackage包含 recommended_action + confidence + requires_human_approval
Raises:
不拋出 — 所有錯誤內部吸收,最壞情況返回 requires_human_approval=True 的 Package
"""
session_id = str(uuid.uuid4())
start_ms = int(time.monotonic() * 1000)
logger.info(
"agent_debate_start",
session_id=session_id,
incident_id=incident_id,
snapshot_id=snapshot.snapshot_id,
)
try:
package = await asyncio.wait_for(
_debate(session_id, incident_id, snapshot),
timeout=GLOBAL_TIMEOUT_SEC,
)
elapsed = int(time.monotonic() * 1000) - start_ms
logger.info(
"agent_debate_done",
session_id=session_id,
incident_id=incident_id,
requires_human=package.requires_human_approval,
confidence=package.confidence,
status=package.session_status.value,
elapsed_ms=elapsed,
)
return package
except asyncio.TimeoutError:
elapsed = int(time.monotonic() * 1000) - start_ms
logger.error(
"agent_debate_global_timeout",
session_id=session_id,
elapsed_ms=elapsed,
timeout_sec=GLOBAL_TIMEOUT_SEC,
)
return _timeout_package(elapsed)
except Exception:
elapsed = int(time.monotonic() * 1000) - start_ms
logger.exception("agent_debate_fatal", session_id=session_id)
return _error_package(elapsed)
# ─────────────────────────────────────────────────────────────────────────────
# Internal — debate pipeline
# ─────────────────────────────────────────────────────────────────────────────
async def _debate(
session_id: str,
incident_id: str,
snapshot: "EvidenceSnapshot",
) -> DecisionPackage:
"""實際的 Agent 辯證流程(在全局超時保護內執行)。"""
# ── Step 1: Diagnostician ──────────────────────────────────────────────
diagnostician = get_diagnostician_agent()
diagnosis = await diagnostician.run(snapshot)
await _record_turn(
session_id=session_id,
incident_id=incident_id,
role=AgentRole.DIAGNOSTICIAN,
input_hash=diag_hash(snapshot),
output=diagnosis,
latency_ms=diagnosis.latency_ms,
vote=diagnosis.vote,
degraded=diagnosis.degraded,
)
# ── Step 2: Solver ─────────────────────────────────────────────────────
solver = get_solver_agent()
plan = await solver.run(diagnosis)
await _record_turn(
session_id=session_id,
incident_id=incident_id,
role=AgentRole.SOLVER,
input_hash=solver_hash(diagnosis),
output=plan,
latency_ms=plan.latency_ms,
vote=plan.vote,
degraded=plan.degraded,
)
# ── Step 3: Reviewer ‖ Critic並行──────────────────────────────────
reviewer = get_reviewer_agent()
critic = get_critic_agent()
verdict, critic_report = await asyncio.gather(
reviewer.run(plan),
critic.run(diagnosis, plan),
)
await asyncio.gather(
_record_turn(
session_id=session_id,
incident_id=incident_id,
role=AgentRole.REVIEWER,
input_hash=reviewer_hash(plan),
output=verdict,
latency_ms=verdict.latency_ms,
vote=verdict.vote,
degraded=verdict.degraded,
),
_record_turn(
session_id=session_id,
incident_id=incident_id,
role=AgentRole.CRITIC,
input_hash=critic_hash(diagnosis, plan),
output=critic_report,
latency_ms=critic_report.latency_ms,
vote=critic_report.vote,
degraded=critic_report.degraded,
),
return_exceptions=True, # MINOR-1: best-effort — 一個審計失敗不取消另一個
)
# ── Step 4: Coordinator ────────────────────────────────────────────────
coordinator = get_coordinator_agent()
package = await coordinator.run(diagnosis, plan, verdict, critic_report)
await _record_turn(
session_id=session_id,
incident_id=incident_id,
role=AgentRole.COORDINATOR,
input_hash=_coordinator_input_hash(diagnosis, plan, verdict, critic_report),
output=package,
latency_ms=package.latency_ms,
vote=AgentVote.APPROVE if not package.requires_human_approval else AgentVote.ABSTAIN,
degraded=package.all_agents_degraded,
)
return package
# ─────────────────────────────────────────────────────────────────────────────
# Internal — DB + Redis recording
# ─────────────────────────────────────────────────────────────────────────────
async def _record_turn(
session_id: str,
incident_id: str,
role: AgentRole,
input_hash: str,
output: object,
latency_ms: int,
vote: AgentVote,
degraded: bool,
) -> None:
"""
寫入 agent_sessions DB 行 + Redis Stream XADD兩者皆 best-effort
Immutable Event Sourcing — 只 INSERT永不 UPDATE / DELETE。
"""
output_json = _serialize_output(output)
# ── DB 寫入 ──────────────────────────────────────────────────────────
try:
async with get_db_context() as db:
await db.execute(
insert(AgentSession).values(
id=str(uuid.uuid4()),
session_id=session_id,
incident_id=incident_id,
agent_role=role.value,
input_hash=input_hash,
output_json=output_json,
latency_ms=latency_ms,
vote=vote.value,
degraded=degraded,
created_at=datetime.now(UTC),
)
)
except Exception:
logger.exception(
"agent_session_db_write_failed",
session_id=session_id,
role=role.value,
)
# ── Redis Stream XADDbest-effort失敗靜默────────────────────────
try:
from src.core.redis_client import get_redis
redis = get_redis()
await redis.xadd(
STREAM_KEY,
{
"session_id": session_id,
"incident_id": incident_id,
"role": role.value,
"vote": vote.value,
"latency_ms": str(latency_ms),
"degraded": "1" if degraded else "0",
},
maxlen=STREAM_MAXLEN,
approximate=True,
)
except Exception:
# Redis 失敗不阻塞主流程(觀測性非關鍵路徑)
logger.warning(
"agent_stream_xadd_failed",
session_id=session_id,
role=role.value,
)
def _serialize_output(output: object) -> dict:
"""
將 Agent 輸出 dataclass 轉為可 JSON 序列化的 dict。
Gate 2: dataclasses.asdict() 保留 Enum 實例json.dumps 會拋 TypeError。
用 json.dumps(default=...) 先把 Enum 轉 .value 再 loads 回 dict。
"""
import json
from enum import Enum
try:
if dataclasses.is_dataclass(output) and not isinstance(output, type):
raw = dataclasses.asdict(output) # type: ignore[arg-type]
return json.loads(
json.dumps(raw, default=lambda o: o.value if isinstance(o, Enum) else str(o))
)
return {"raw": str(output)[:2000]}
except Exception:
return {"error": "serialization_failed"}
def _coordinator_input_hash(
diagnosis: DiagnosisReport,
plan: ActionPlan,
verdict: ReviewVerdict,
critic: CriticReport,
) -> str:
"""Coordinator 輸入 fingerprint4 個 Agent 輸出的組合)。"""
import hashlib
key = (
diagnosis.evidence_snapshot_id
+ (diagnosis.top_hypothesis.description if diagnosis.top_hypothesis else "")
+ (plan.top_candidate.action if plan.top_candidate else "")
+ verdict.vote.value
+ str(critic.challenge_count)
)
return hashlib.sha256(key.encode()).hexdigest()[:16]
# ─────────────────────────────────────────────────────────────────────────────
# Degraded fallback packages
# ─────────────────────────────────────────────────────────────────────────────
def _timeout_package(elapsed_ms: int) -> DecisionPackage:
"""全局超時 → 強制人工審核。"""
return DecisionPackage(
recommended_action=None,
confidence=0.0,
requires_human_approval=True,
debate_summary=f"[超時] 辯證流程超過 {GLOBAL_TIMEOUT_SEC}s強制升級人工審核",
session_status=AgentSessionStatus.TIMEOUT,
latency_ms=elapsed_ms,
blocked_reason=f"全局超時 > {GLOBAL_TIMEOUT_SEC}s",
all_agents_degraded=True,
)
def _error_package(elapsed_ms: int) -> DecisionPackage:
"""未預期錯誤 → 強制人工審核。"""
return DecisionPackage(
recommended_action=None,
confidence=0.0,
requires_human_approval=True,
debate_summary="[錯誤] 辯證流程發生未預期錯誤,強制升級人工審核",
session_status=AgentSessionStatus.FAILED,
latency_ms=elapsed_ms,
blocked_reason="Orchestrator 未預期錯誤",
all_agents_degraded=True,
)

View File

@@ -942,6 +942,52 @@ EXPERT_RULES: dict[str, dict[str, Any]] = {
}
def _package_to_proposal_data(package: Any) -> dict[str, Any]:
"""
將 Phase 2 DecisionPackage 轉換為 proposal_data dict。
proposal_data 格式是既有 decision_manager 決策流程的共同語言,
所有下游auto_approve / Telegram / approval_record都依此格式讀取。
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太)
"""
action = package.recommended_action or ""
confidence = package.confidence
# requires_human_approval → risk_level 映射
# Phase 2 Reviewer + Critic 已做安全審查,不需要 human = low/medium
if package.requires_human_approval:
risk_level = "high" if confidence > 0 else "critical"
elif confidence >= 0.8:
risk_level = "low"
elif confidence >= 0.6:
risk_level = "medium"
else:
risk_level = "high"
return {
"action": action,
"kubectl_command": action if action.startswith("kubectl") else "",
"description": package.debate_summary[:500] if package.debate_summary else "",
"reasoning": package.debate_summary[:1000] if package.debate_summary else "",
"confidence": confidence,
"risk_level": risk_level,
"source": "phase2_agent_debate",
"requires_human_review": package.requires_human_approval,
# Phase 2 診斷摘要(供 Telegram 卡片顯示)
"debate_summary": package.debate_summary or "",
"all_agents_degraded": package.all_agents_degraded,
"blocked_reason": package.blocked_reason or "",
"session_status": package.session_status.value if package.session_status else "",
# MINOR-2: 補齊 Expert System / LLM 路徑存在的 keys防止下游 .get() 靜默空缺
"is_rule_based": False,
"matched_rule": "",
"rule_id": "",
"from_cache": False,
}
def expert_analyze(incident: Incident) -> dict[str, Any]:
"""
Expert System 規則引擎分析
@@ -1670,6 +1716,31 @@ class DecisionManager:
# ADR-070: 原有 MCP 收集路徑Phase 0 保留)
mcp_context = await self._collect_mcp_context(incident)
# ADR-082 Phase 2: 5 Agent 辯證feature flag 守衛)
# AIOPS_P2_ENABLED=True → 走 AgentOrchestrator 路徑,跳過 Playbook / LLM
# 需要 EvidenceSnapshot若 P1 未開啟則自行收集
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太)
if aiops_flags.is_phase_enabled(2): # Gate 2: 用 is_phase_enabled 統一父 Phase 守衛
p2_snapshot = evidence_snapshot
if p2_snapshot is None:
try:
from src.services.pre_decision_investigator import get_pre_decision_investigator
p2_snapshot = await get_pre_decision_investigator().investigate(incident)
except Exception:
logger.warning(
"p2_snapshot_collect_failed",
incident_id=incident.incident_id,
)
if p2_snapshot is not None:
from src.services.agent_orchestrator import run_agent_debate
package = await run_agent_debate(
snapshot=p2_snapshot,
incident_id=incident.incident_id,
)
return _package_to_proposal_data(package)
# snapshot 仍為 None → 降級繼續走原路徑(不阻塞)
logger.warning("p2_no_snapshot_fallback", incident_id=incident.incident_id)
# Phase 7.5: 先嘗試 Playbook 匹配
playbook_result = await self._try_playbook_match(incident)
if playbook_result:

View File

@@ -0,0 +1,119 @@
# ADR-082: 多 Agent 協作架構Phase 2
> **日期**: 2026-04-15台北
> **狀態**: 🟢 批准(統帥 + 首席架構師共同簽核)
> **作者**: Claude Sonnet 4.6(首席架構師)+ 統帥 audit
> **相關**:
> - MASTER 藍圖§3.2 D2 多 Agent 協作
> - ADR-081Phase 1 感官縱深(前置)
> - HARD_RULES v1.9Phase 退出條件鐵律
---
## 背景
Phase 1 完成後AI 已具備「環境感知能力」8D 感官 → EvidenceSnapshot
Phase 2 的目標是解決 LLM 的根本性弱點:**過度自信 + 幻覺**。
單一 OpenClaw 全包的問題:
| 問題 | 現象 |
|------|------|
| 沒有對抗機制 | LLM 說什麼信什麼,無內部質疑 |
| 根因與方案混為一談 | 根因判斷錯誤直接影響方案,無隔離 |
| 安全審查 = 無 | 危險指令只靠 HARD_RULES 靜態擋 |
| 無 Audit Trail | 無法追責「AI 為何做此決策」 |
## 決策
拆解單一 LLM 為 **5 個分工明確的 Agent**,用辯證 + 投票降低幻覺:
| Agent | 職責 | 輸入 | 輸出 |
|-------|------|------|------|
| **Diagnostician偵探** | RCA 根因分析 | `EvidenceSnapshot` | `DiagnosisReport` |
| **Solver軍師** | 對每個假設產方案 | `DiagnosisReport` | `ActionPlan` |
| **Reviewer安全官** | 安全審查 + 可行性 | `ActionPlan` | `ReviewVerdict` |
| **Critic質疑者** | 刻意唱反調,防幻覺 | `DiagnosisReport` + `ActionPlan` | `CriticReport` |
| **Coordinator指揮官** | 聚合辯證,拍板決策 | 全部 Agent 輸出 | `DecisionPackage` |
## 架構
```
EvidenceSnapshot
Diagnostician (3s)
│ DiagnosisReport
Solver (4s)
│ ActionPlan
├─────────────────┐
▼ ▼
Reviewer (3s) Critic (3s) ← 並行
│ ReviewVerdict │ CriticReport
└────────┬─────────┘
Coordinator (2s)
│ DecisionPackage
(回 decision_manager)
```
**時間預算**P99 < 20s含 Investigator 8s = Phase 1 已涵蓋)
**單 Agent 熔斷**:超時 5s → 降級為 rule-based mock輸出「棄權」不阻塞流程
**全流程熔斷**> 30s → Coordinator 強制以現有資訊輸出結論並標 `degraded`
## Redis Streams 訊息匯流
Agent 間不直接 call function透過 Redis Streams 傳訊:
- Stream Key: `aiops:agent:{session_id}`
- Consumer Group: 各 Agent 獨立消費自己的訊息
- 優勢:單一 Agent 掛掉不影響其他;可 replay 除錯
## AgentSession DB 表Audit Trail
每次辯證全程寫入 `agent_sessions`
```
session_id / incident_id / agent_role / input_hash / output_json / latency_ms / created_at
```
- 不可刪除只能新增Immutable Event Sourcing
- Phase 3 學習閉環依賴此表記錄「Critic 是否挑戰成功」作為學習信號)
## 安全原則
- Reviewer 必須硬核拒絕任何觸碰 HARD_RULES 的動作delete node / DROP TABLE 等)
- Critic prompt 強化「你的工作是找漏洞,不是順著說好話」(防 sycophancy
- Agent 間不得修改彼此 prompt防 prompt 污染擴散)
- 所有 Agent 輸出先過 `SanitizationService`ADR-081 防注入)
## 回滾策略
```
AIOPS_P2_ENABLED=False → decision_manager 退回 Phase 1 單 LLM 路徑
```
- `agent_sessions` 表只新增,不影響任何舊表
- Orchestrator 熔斷設計:任一 Agent 失敗 → Coordinator 用現有資訊繼續
## Phase 2 退出條件
| 條件 | 量化指標 |
|------|---------|
| AgentSession 有資料 | 24h 內 ≥ 3 筆Diagnostician / Solver / Coordinator 各 1|
| 辯證有效 | 每事件 AgentSession turns ≥ 3 |
| Critic 工作 | CriticReport challenge_count > 0 至少 1 次 |
| 熔斷可用 | Diagnostician timeout → Coordinator 仍完成(降級路徑測試) |
| 延遲達標 | 多 Agent 路徑啟用後 p95 < 8s避免拖垮主流程|
## 不決定
- Agent 使用什麼 LLM保持通過現有 `openclaw.call()` 路由Phase 4 再優化模型分配)
- Agent 是否有「記憶」Phase 3 再加 evidence_chain fine-tune
- Declarative 修復Phase 5
---
*2026-04-15 ogt + Claude Sonnet 4.6亞太Phase 2 正式批准*