diff --git a/apps/api/src/agents/base.py b/apps/api/src/agents/base.py index 458a0d88..271c7043 100644 --- a/apps/api/src/agents/base.py +++ b/apps/api/src/agents/base.py @@ -163,11 +163,13 @@ class BaseAgent(ABC, Generic[T]): except json.JSONDecodeError: pass - # 嘗試 { ... } 格式 - match = re.search(r"\{[^{}]*\}", text, re.DOTALL) - if match: + # 嘗試從第一個 { 到最後一個 } 提取(支援巢狀 JSON) + # Gate 2: 舊 r"\{[^{}]*\}" 會拒絕巢狀物件,造成所有 Agent LLM 回應解析失敗 + start = text.find("{") + end = text.rfind("}") + if start != -1 and end > start: try: - return json.loads(match.group(0)) + return json.loads(text[start:end + 1]) except json.JSONDecodeError: pass diff --git a/apps/api/src/agents/coordinator_agent.py b/apps/api/src/agents/coordinator_agent.py new file mode 100644 index 00000000..2422e0a2 --- /dev/null +++ b/apps/api/src/agents/coordinator_agent.py @@ -0,0 +1,299 @@ +""" +AWOOOI AIOps Phase 2 — Coordinator Agent(指揮官) +================================================== +職責:聚合所有 Agent 輸出,做最終決策 + +輸入:DiagnosisReport + ActionPlan + ReviewVerdict + CriticReport +輸出:DecisionPackage(recommended_action + confidence + requires_human_approval) + +聚合邏輯: +1. Reviewer REJECT → blocked_actions 全部禁止執行,強制人工審核 +2. Reviewer REQUEST_REVISION → 過濾高 blast_radius 方案,使用 safe_candidates +3. Critic 有 critical challenge → confidence 降低 CRITIC_PENALTY +4. 全 Agent degraded → requires_human_approval = True(安全第一) +5. Diagnostician ABSTAIN 且無有效假設 → requires_human_approval = True +6. 最終 confidence < HUMAN_ESCALATION_THRESHOLD → requires_human_approval = True + +ADR-082: Phase 2 多 Agent 協作 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 +""" + +from __future__ import annotations + +import time +from typing import Any + +import structlog + +from src.agents.base import BaseAgent +from src.agents.protocol import ( + ActionPlan, + AgentRole, + AgentSessionStatus, + AgentVote, + CriticReport, + DecisionPackage, + DiagnosisReport, + ReviewVerdict, +) + +logger = structlog.get_logger(__name__) + +# confidence 低於此閾值 → 強制人工審核 +HUMAN_ESCALATION_THRESHOLD = 0.4 + +# Critic critical challenge 懲罰係數 +CRITIC_PENALTY = 0.3 + + +class CoordinatorAgent(BaseAgent): + """ + Coordinator Agent — 指揮官(最終決策者) + + Usage: + agent = CoordinatorAgent() + package = await agent.run(diagnosis, plan, verdict, critic) + """ + + AGENT_NAME = AgentRole.COORDINATOR.value + AGENT_DESCRIPTION = ( + "Final decision synthesizer. Aggregates all agent outputs into " + "a single actionable DecisionPackage." + ) + + async def run( + self, + diagnosis: DiagnosisReport, + plan: ActionPlan, + verdict: ReviewVerdict, + critic: CriticReport, + ) -> DecisionPackage: + """ + 聚合所有 Agent 輸出,決定最終行動。 + + Args: + diagnosis: Diagnostician 輸出 + plan: Solver 輸出 + verdict: Reviewer 安全審查結果 + critic: Critic 批判性審查結果 + + Returns: + DecisionPackage(不熔斷,Coordinator 必須輸出決策) + """ + start_ms = int(time.monotonic() * 1000) + + package = self._synthesize(diagnosis, plan, verdict, critic) + package.latency_ms = int(time.monotonic() * 1000) - start_ms + + logger.info( + "coordinator_done", + recommended_action=package.recommended_action, + confidence=package.confidence, + requires_human=package.requires_human_approval, + all_degraded=package.all_agents_degraded, + session_status=package.session_status, + latency_ms=package.latency_ms, + ) + return package + + def _synthesize( + self, + diagnosis: DiagnosisReport, + plan: ActionPlan, + verdict: ReviewVerdict, + critic: CriticReport, + ) -> DecisionPackage: + """核心聚合邏輯(純函數,無 LLM,不可失敗)。""" + top = plan.top_candidate + base_confidence = top.confidence if top else 0.0 + selected = top + + # ── 1. Reviewer REJECT → 最優先安全門 ────────────────────────── + if verdict.vote == AgentVote.REJECT: + return DecisionPackage( + recommended_action=None, + confidence=0.0, + requires_human_approval=True, + debate_summary=_build_summary(diagnosis, plan, verdict, critic), + session_status=AgentSessionStatus.COMPLETED, + latency_ms=0, + diagnosis=diagnosis, + action_plan=plan, + reviewer_verdict=verdict, + critic_report=critic, + blocked_reason=f"Reviewer 拒絕:{verdict.reason}", + ) + + # ── 2. Reviewer REQUEST_REVISION → 強制人工審核(Solver 未修訂,不可自動執行)─ + # Gate 2: REQUEST_REVISION 代表「請 Solver 重新設計方案」,此 Phase 無迭代機制 + # → 保留 safe_candidates 供人工參考,但 requires_human_approval 必須 True + if verdict.vote == AgentVote.REQUEST_REVISION: + safe_candidates = [ + c for c in plan.candidates + if c.action not in verdict.blocked_actions + ] + selected = safe_candidates[0] if safe_candidates else None + base_confidence = selected.confidence if selected else 0.0 + return DecisionPackage( + recommended_action=selected.action if selected else None, + confidence=base_confidence, + requires_human_approval=True, + debate_summary=_build_summary(diagnosis, plan, verdict, critic), + session_status=AgentSessionStatus.COMPLETED, + latency_ms=0, + diagnosis=diagnosis, + action_plan=plan, + reviewer_verdict=verdict, + critic_report=critic, + blocked_reason=f"Reviewer REQUEST_REVISION:{verdict.reason}", + ) + + # ── 3. Critic critical challenge → 信心懲罰 ───────────────────── + adjusted_confidence = base_confidence + if critic.has_critical_challenge: + adjusted_confidence = max(0.0, base_confidence - CRITIC_PENALTY) + logger.info( + "coordinator_critic_penalty", + before=base_confidence, + after=adjusted_confidence, + ) + + # ── 4. 全 Agent 降級 → 強制人工 ────────────────────────────────── + # Gate 2: 原本遺漏 verdict.degraded,Reviewer 熔斷時 all_degraded 被低估 + all_degraded = diagnosis.degraded and plan.degraded and verdict.degraded and critic.degraded + if all_degraded: + return DecisionPackage( + recommended_action=selected.action if selected else None, + confidence=adjusted_confidence, + requires_human_approval=True, + debate_summary=_build_summary(diagnosis, plan, verdict, critic), + session_status=AgentSessionStatus.DEGRADED, + latency_ms=0, + diagnosis=diagnosis, + action_plan=plan, + reviewer_verdict=verdict, + critic_report=critic, + blocked_reason="所有 Agent 皆降級,信心不可信,強制人工審核", + all_agents_degraded=True, + ) + + # ── 5. Diagnostician 無有效假設 → 強制人工 ─────────────────────── + if not diagnosis.hypotheses or diagnosis.vote == AgentVote.ABSTAIN: + return DecisionPackage( + recommended_action=selected.action if selected else None, + confidence=adjusted_confidence, + requires_human_approval=True, + debate_summary=_build_summary(diagnosis, plan, verdict, critic), + session_status=AgentSessionStatus.DEGRADED, + latency_ms=0, + diagnosis=diagnosis, + action_plan=plan, + reviewer_verdict=verdict, + critic_report=critic, + blocked_reason="Diagnostician ABSTAIN:感官情報不足,需人工判斷根因", + ) + + # ── 6. confidence 低於閾值 → 強制人工 ──────────────────────────── + if adjusted_confidence < HUMAN_ESCALATION_THRESHOLD: + return DecisionPackage( + recommended_action=selected.action if selected else None, + confidence=adjusted_confidence, + requires_human_approval=True, + debate_summary=_build_summary(diagnosis, plan, verdict, critic), + session_status=AgentSessionStatus.DEGRADED, + latency_ms=0, + diagnosis=diagnosis, + action_plan=plan, + reviewer_verdict=verdict, + critic_report=critic, + blocked_reason=( + f"信心 {adjusted_confidence:.0%} < 閾值 " + f"{HUMAN_ESCALATION_THRESHOLD:.0%},需人工確認" + ), + ) + + # ── 7. 自動執行 ──────────────────────────────────────────────────── + return DecisionPackage( + recommended_action=selected.action if selected else None, + confidence=adjusted_confidence, + requires_human_approval=False, + debate_summary=_build_summary(diagnosis, plan, verdict, critic), + session_status=AgentSessionStatus.COMPLETED, + latency_ms=0, + diagnosis=diagnosis, + action_plan=plan, + reviewer_verdict=verdict, + critic_report=critic, + ) + + def _build_prompt(self, _context: dict[str, Any]) -> str: + return "" # Coordinator 不使用 LLM(純規則聚合) + + def _parse_response(self, _response: str) -> dict[str, Any]: + return {} + + def analyze(self, context: dict[str, Any]) -> Any: + raise NotImplementedError("Use run() for Phase 2 agents") + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _build_summary( + diagnosis: DiagnosisReport, + plan: ActionPlan, + verdict: ReviewVerdict, + critic: CriticReport, +) -> str: + """產生 debate_summary(結構化文字,限 1000 字)。""" + parts = [] + + top_h = diagnosis.top_hypothesis + if top_h: + parts.append( + f"診斷:{top_h.description[:200]}(信心 {top_h.confidence:.0%}," + f"{'降級' if diagnosis.degraded else '正常'})" + ) + else: + parts.append("診斷:無有效假設(ABSTAIN)") + + top_c = plan.top_candidate + if top_c: + parts.append( + f"方案:{top_c.action[:100]}(blast_radius={top_c.blast_radius}," + f"{'降級' if plan.degraded else '正常'})" + ) + else: + parts.append("方案:無候選動作") + + parts.append( + f"安全審查:{verdict.vote.value}({verdict.reason[:100]}," + f"{'降級' if verdict.degraded else '正常'})" + ) + + if critic.challenges: + top_ch = critic.challenges[0] + parts.append( + f"質疑:{top_ch.severity} — {top_ch.argument[:150]}(" + f"共 {critic.challenge_count} 項," + f"{'降級' if critic.degraded else '正常'})" + ) + else: + parts.append(f"質疑:無({'降級' if critic.degraded else '通過審查'})") + + return ";".join(parts)[:1000] + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_agent: CoordinatorAgent | None = None + + +def get_coordinator_agent() -> CoordinatorAgent: + global _agent + if _agent is None: + _agent = CoordinatorAgent() + return _agent diff --git a/apps/api/src/agents/critic_agent.py b/apps/api/src/agents/critic_agent.py new file mode 100644 index 00000000..1a202805 --- /dev/null +++ b/apps/api/src/agents/critic_agent.py @@ -0,0 +1,237 @@ +""" +AWOOOI AIOps Phase 2 — Critic Agent(質疑者) +============================================= +職責:刻意唱反調,防止幻覺與 echo chamber + +輸入:DiagnosisReport + ActionPlan(兩者都看) +輸出:CriticReport(challenges[] 列表 + overall_assessment) + +設計原則: +1. Critic 的工作是找漏洞,不是說好話(防 sycophancy) +2. prompt 強制要求批判性思維:「如果診斷是錯的,還有哪 3 種可能?」 +3. challenge_count > 0 是 Phase 2 退出條件之一 +4. Critic 連續 3 次找到 Diagnostician 嚴重漏洞 → 觸發 Diagnostician 狀態不穩(Phase 4 實作) +5. 熔斷降級:LLM 失敗 → 輸出空 challenges(不阻塞 Coordinator) +6. Critic 和 Reviewer 並行執行(都不阻塞對方) + +ADR-082: Phase 2 多 Agent 協作 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 +""" + +from __future__ import annotations + +import asyncio +import hashlib +import time +from typing import Any + +import structlog + +from src.agents.base import BaseAgent +from src.agents.protocol import ( + ActionPlan, + AgentRole, + AgentVote, + Challenge, + CriticReport, + DiagnosisReport, +) +from src.services.sanitization_service import sanitize + +logger = structlog.get_logger(__name__) + +# Critic 挑戰數量上限(防止 LLM 生成無限質疑) +MAX_CHALLENGES = 5 + + +class CriticAgent(BaseAgent): + """ + Critic Agent — 系統性懷疑論者 + + Usage: + agent = CriticAgent() + report = await agent.run(diagnosis, plan) + """ + + AGENT_NAME = AgentRole.CRITIC.value + AGENT_DESCRIPTION = ( + "Devil's advocate. Challenges diagnosis and proposed actions to prevent " + "hallucination and echo chamber effects." + ) + + async def run( + self, + diagnosis: DiagnosisReport, + plan: ActionPlan, + timeout_sec: float = 5.0, + ) -> CriticReport: + """ + 批判性審查診斷和方案。 + + Args: + diagnosis: Diagnostician 輸出 + plan: Solver 輸出 + timeout_sec: 熔斷超時 + + Returns: + CriticReport(熔斷時 degraded=True,challenges 為空) + """ + start_ms = int(time.monotonic() * 1000) + + try: + report = await asyncio.wait_for( + self._critique(diagnosis, plan), + timeout=timeout_sec, + ) + report.latency_ms = int(time.monotonic() * 1000) - start_ms + logger.info( + "critic_done", + challenges=report.challenge_count, + has_critical=report.has_critical_challenge, + vote=report.vote, + latency_ms=report.latency_ms, + ) + return report + + except asyncio.TimeoutError: + latency = int(time.monotonic() * 1000) - start_ms + logger.warning("critic_timeout", timeout_sec=timeout_sec) + return self._degraded_report(latency, "timeout") + + except Exception: + latency = int(time.monotonic() * 1000) - start_ms + logger.exception("critic_error") + return self._degraded_report(latency, "error") + + async def _critique( + self, + diagnosis: DiagnosisReport, + plan: ActionPlan, + ) -> CriticReport: + """LLM 批判性推理。""" + top_hypothesis = diagnosis.top_hypothesis + top_candidate = plan.top_candidate + + prompt = self._build_prompt({ + "hypothesis": top_hypothesis.description if top_hypothesis else "(無假設)", + "action": top_candidate.action if top_candidate else "(無方案)", + "confidence": top_hypothesis.confidence if top_hypothesis else 0.0, + }) + + from src.services.openclaw import get_openclaw + openclaw = get_openclaw() + response_text, _provider, success = await openclaw.call(prompt) + + if not success or not response_text: + return self._degraded_report(0, "llm_failed") + + parsed = self._parse_response(sanitize(response_text, "critic_output")) + challenges = _extract_challenges(parsed) + + # 有 critical challenge → vote = REJECT + vote = AgentVote.REJECT if any(c.severity == "critical" for c in challenges) else AgentVote.APPROVE + + return CriticReport( + challenges=challenges, + overall_assessment=str(parsed.get("overall_assessment", ""))[:1000], + latency_ms=0, + vote=vote, + ) + + def _build_prompt(self, context: dict[str, Any]) -> str: + return f"""你是 AWOOOI SRE 系統的質疑者 Agent(Critic)。 + +你的工作是:找出診斷和方案的弱點。不是說好話,是找漏洞。 + +當前診斷:{context.get("hypothesis", "")} +當前方案:{context.get("action", "")} +診斷信心:{context.get("confidence", 0.0):.0%} + +必須回答以下問題(每個問題產出一個 challenge): +1. 如果這個診斷是錯的,還有哪些可能的根因? +2. 這個方案有什麼副作用或風險? +3. 是否有更好的替代方案被忽略了? + +每個 challenge 標記嚴重度: +- "minor":小瑕疵,不影響執行 +- "major":值得 Coordinator 考慮,但不是阻擋條件 +- "critical":嚴重邏輯漏洞,必須阻止此方案執行 + +以 JSON 回覆: +{{ + "challenges": [ + {{ + "target": "diagnosis", + "argument": "可能是 OOM 但也可能是 code bug,需要看 GC logs 確認", + "severity": "major" + }} + ], + "overall_assessment": "診斷可信但方案風險偏高" +}}""" + + def _parse_response(self, response: str) -> dict[str, Any]: + return self._extract_json(response) + + def analyze(self, context: dict[str, Any]) -> Any: + raise NotImplementedError("Use run() for Phase 2 agents") + + def _degraded_report( + self, + latency_ms: int, + reason: str = "unknown", + ) -> CriticReport: + """熔斷降級:輸出空 challenges(不阻塞 Coordinator)""" + return CriticReport( + challenges=[], + overall_assessment=f"[降級] Critic LLM 失敗({reason}),跳過批判性審查", + latency_ms=latency_ms, + vote=AgentVote.ABSTAIN, + degraded=True, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _extract_challenges(parsed: dict[str, Any]) -> list[Challenge]: + """從 LLM 解析結果提取 challenges(按嚴重度排序)。""" + raw = parsed.get("challenges", []) + challenges = [] + severity_order = {"critical": 0, "major": 1, "minor": 2} + + for item in raw: + if not isinstance(item, dict): + continue + c = Challenge( + target=str(item.get("target", "unknown"))[:50], + argument=str(item.get("argument", ""))[:500], + severity=item.get("severity", "minor") if item.get("severity") in severity_order else "minor", + ) + challenges.append(c) + + challenges.sort(key=lambda c: severity_order.get(c.severity, 2)) + return challenges[:MAX_CHALLENGES] + + +def compute_input_hash(diagnosis: DiagnosisReport, plan: ActionPlan) -> str: + key = diagnosis.evidence_snapshot_id + ( + diagnosis.top_hypothesis.description if diagnosis.top_hypothesis else "" + ) + ( + plan.top_candidate.action if plan.top_candidate else "" + ) + return hashlib.sha256(key.encode()).hexdigest()[:16] + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_agent: CriticAgent | None = None + + +def get_critic_agent() -> CriticAgent: + global _agent + if _agent is None: + _agent = CriticAgent() + return _agent diff --git a/apps/api/src/agents/diagnostician_agent.py b/apps/api/src/agents/diagnostician_agent.py new file mode 100644 index 00000000..46baf4dd --- /dev/null +++ b/apps/api/src/agents/diagnostician_agent.py @@ -0,0 +1,246 @@ +""" +AWOOOI AIOps Phase 2 — Diagnostician Agent(偵探) +================================================== +職責:RCA 根因分析 + +輸入:EvidenceSnapshot(8D 感官情報) +輸出:DiagnosisReport(多根因假設,含 confidence + evidence_chain) + +設計原則: +1. 只做診斷,不提解法(Solver 的工作) +2. top-1 confidence < 0.4 → vote = ABSTAIN(情報不足,回傳 Coordinator 判斷) +3. 熔斷降級:LLM 失敗 / 超時 → rule-based mock(以 alert_category 作簡單假設) +4. 所有 LLM 輸出過 SanitizationService(防 Prompt Injection) + +ADR-082: Phase 2 多 Agent 協作 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 +""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import time +from typing import TYPE_CHECKING, Any + +import structlog + +from src.agents.base import BaseAgent, AgentResult, AgentStatus +from src.agents.protocol import ( + AgentRole, + AgentVote, + DiagnosisReport, + Hypothesis, +) +from src.services.sanitization_service import sanitize + +if TYPE_CHECKING: + from src.services.evidence_snapshot import EvidenceSnapshot + +logger = structlog.get_logger(__name__) + +# 每個假設的最大 evidence chain 長度(防超 token) +MAX_EVIDENCE_CHAIN = 5 + +# Confidence 閾值 — 低於此值 vote = ABSTAIN +ABSTAIN_CONFIDENCE_THRESHOLD = 0.4 + + +class DiagnosticianAgent(BaseAgent): + """ + Diagnostician Agent — RCA 根因分析偵探 + + Usage: + agent = DiagnosticianAgent() + report = await agent.run(snapshot) + """ + + AGENT_NAME = AgentRole.DIAGNOSTICIAN.value + AGENT_DESCRIPTION = "Root Cause Analysis specialist. Produces multiple hypotheses with confidence scores." + + async def run( + self, + snapshot: "EvidenceSnapshot", + timeout_sec: float = 5.0, + ) -> DiagnosisReport: + """ + 執行根因分析。 + + Args: + snapshot: Phase 1 感官快照 + timeout_sec: 熔斷超時(預設 5s) + + Returns: + DiagnosisReport(熔斷時 degraded=True,vote=ABSTAIN) + """ + start_ms = int(time.monotonic() * 1000) + + try: + report = await asyncio.wait_for( + self._analyze(snapshot), + timeout=timeout_sec, + ) + report.latency_ms = int(time.monotonic() * 1000) - start_ms + logger.info( + "diagnostician_done", + snapshot_id=snapshot.snapshot_id, + hypotheses=len(report.hypotheses), + top_confidence=report.top_confidence, + vote=report.vote, + latency_ms=report.latency_ms, + ) + return report + + except asyncio.TimeoutError: + latency = int(time.monotonic() * 1000) - start_ms + logger.warning("diagnostician_timeout", timeout_sec=timeout_sec) + return self._degraded_report(snapshot, latency, reason="timeout") + + except Exception: + latency = int(time.monotonic() * 1000) - start_ms + logger.exception("diagnostician_error") + return self._degraded_report(snapshot, latency, reason="error") + + async def _analyze(self, snapshot: "EvidenceSnapshot") -> DiagnosisReport: + """核心 LLM 分析邏輯。""" + prompt = self._build_prompt({"evidence_summary": snapshot.evidence_summary or ""}) + + from src.services.openclaw import get_openclaw + openclaw = get_openclaw() + response_text, _provider, success = await openclaw.call(prompt) + + if not success or not response_text: + return self._degraded_report(snapshot, 0, reason="llm_failed") + + parsed = self._parse_response(sanitize(response_text, "diagnostician_output")) + hypotheses = _extract_hypotheses(parsed) + + vote = AgentVote.APPROVE + if not hypotheses or hypotheses[0].confidence < ABSTAIN_CONFIDENCE_THRESHOLD: + vote = AgentVote.ABSTAIN + + return DiagnosisReport( + hypotheses=hypotheses, + evidence_snapshot_id=snapshot.snapshot_id or "", + latency_ms=0, # 由 run() 覆蓋 + vote=vote, + ) + + def _build_prompt(self, context: dict[str, Any]) -> str: + evidence = context.get("evidence_summary", "(無感官情報)") + return f"""你是 AWOOOI SRE 系統的偵探 Agent,專職根因分析(Root Cause Analysis)。 + +你的唯一工作:根據以下感官情報,提出 2-3 個根因假設(hypotheses)。 +不要提修復方案,那是 Solver 的工作。 +每個假設必須: +1. 有 confidence(0.0-1.0) +2. 列出支持此假設的 evidence key(限 {MAX_EVIDENCE_CHAIN} 個) +3. 有 category(K8s Pod / HostDisk / NetworkLatency / DatabaseConnection / 等) + +如果感官情報嚴重不足(所有假設 confidence < 0.4),說明原因。 + +--- +感官情報: +{evidence} +--- + +以 JSON 回覆(不要加任何解釋): +{{ + "hypotheses": [ + {{ + "description": "假設描述", + "confidence": 0.85, + "evidence_chain": ["k8s_state.pod_status", "recent_logs.oom_signal"], + "category": "KubePodOOM" + }} + ] +}}""" + + def _parse_response(self, response: str) -> dict[str, Any]: + return self._extract_json(response) + + def analyze(self, context: dict[str, Any]) -> Any: + """BaseAgent 抽象方法 — Phase 2 改用 run() 入口。""" + raise NotImplementedError("Use run() for Phase 2 agents") + + def _degraded_report( + self, + snapshot: "EvidenceSnapshot", + latency_ms: int, + reason: str = "unknown", + ) -> DiagnosisReport: + """熔斷降級:rule-based mock(用 alert_category 作簡單假設)""" + category = _guess_category_from_snapshot(snapshot) + return DiagnosisReport( + hypotheses=[ + Hypothesis( + description=f"[降級] 無法完成 LLM 分析(原因: {reason})。基於告警類別推測: {category}", + confidence=0.2, + evidence_chain=[], + category=category, + ) + ], + evidence_snapshot_id=snapshot.snapshot_id or "", + latency_ms=latency_ms, + vote=AgentVote.ABSTAIN, + degraded=True, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _extract_hypotheses(parsed: dict[str, Any]) -> list[Hypothesis]: + """從 LLM 解析結果提取假設列表(按信心降序)。""" + raw = parsed.get("hypotheses", []) + hypotheses = [] + for item in raw: + if not isinstance(item, dict): + continue + h = Hypothesis( + description=str(item.get("description", ""))[:500], + confidence=float(item.get("confidence", 0.0)), + evidence_chain=item.get("evidence_chain", [])[:MAX_EVIDENCE_CHAIN], + category=str(item.get("category", "")), + ) + hypotheses.append(h) + hypotheses.sort(key=lambda h: h.confidence, reverse=True) + return hypotheses + + +def _guess_category_from_snapshot(snapshot: "EvidenceSnapshot") -> str: + """降級時從 snapshot 猜測告警類別(最粗粒度兜底)。""" + summary = (snapshot.evidence_summary or "").lower() + if "oom" in summary or "memory" in summary: + return "KubePodOOM" + if "crashloop" in summary: + return "KubePodCrashLoop" + if "disk" in summary: + return "HostDiskUsage" + if "cpu" in summary: + return "HostCpuHigh" + if "network" in summary or "timeout" in summary: + return "NetworkLatency" + return "Unknown" + + +def compute_input_hash(snapshot: "EvidenceSnapshot") -> str: + """計算 Diagnostician 輸入的 fingerprint(用於 AgentSession input_hash)。""" + key = (snapshot.snapshot_id or "") + (snapshot.evidence_summary or "")[:100] + return hashlib.sha256(key.encode()).hexdigest()[:16] + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_agent: DiagnosticianAgent | None = None + + +def get_diagnostician_agent() -> DiagnosticianAgent: + global _agent + if _agent is None: + _agent = DiagnosticianAgent() + return _agent diff --git a/apps/api/src/agents/protocol.py b/apps/api/src/agents/protocol.py new file mode 100644 index 00000000..7416e14d --- /dev/null +++ b/apps/api/src/agents/protocol.py @@ -0,0 +1,253 @@ +""" +AWOOOI AIOps Phase 2 — 多 Agent 協作訊息協定 +============================================== +定義 5 個 Agent 間傳遞的不可變資料型別。 + +設計原則: +1. 每個 Agent 有明確的 Input / Output 型別(不共用 dict) +2. 所有型別都是 dataclass(快速、可序列化、無外部依賴) +3. 降級 / 棄權用明確 AgentVote.ABSTAIN,不用 None 代替 +4. 全程 immutable — Agent 不得修改彼此的輸出(防 prompt 污染) + +ADR-082: 多 Agent 協作架構(Phase 2) +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + + +# ───────────────────────────────────────────────────────────────────────────── +# Enums +# ───────────────────────────────────────────────────────────────────────────── + +class AgentRole(str, Enum): + """Phase 2 五角色標識""" + DIAGNOSTICIAN = "diagnostician" + SOLVER = "solver" + REVIEWER = "reviewer" + CRITIC = "critic" + COORDINATOR = "coordinator" + + +class AgentVote(str, Enum): + """Agent 投票結果""" + APPROVE = "approve" + REJECT = "reject" + REQUEST_REVISION = "request_revision" + ABSTAIN = "abstain" # 熔斷 / 超時 / 無足夠資訊 + DEGRADED = "degraded" # 降級路徑(rule-based mock) + + +class AgentSessionStatus(str, Enum): + """AgentSession 整體狀態""" + RUNNING = "running" + COMPLETED = "completed" + DEGRADED = "degraded" # 部分 Agent 熔斷但仍完成 + FAILED = "failed" # Coordinator 無法輸出任何結論 + TIMEOUT = "timeout" # 全流程 > 30s + + +# ───────────────────────────────────────────────────────────────────────────── +# Diagnostician Output +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class Hypothesis: + """單一根因假設""" + description: str + confidence: float # 0.0 ~ 1.0 + evidence_chain: list[str] # 支持此假設的 evidence key + category: str = "" # alert_category(KubePod / HostDisk 等) + + +@dataclass +class DiagnosisReport: + """ + Diagnostician Agent 輸出 + + 包含多個根因假設(按信心排序), + top-1 confidence < 0.4 觸發 Coordinator 回退 Investigator 重抓。 + """ + hypotheses: list[Hypothesis] + evidence_snapshot_id: str + latency_ms: int + vote: AgentVote = AgentVote.APPROVE # 資訊足夠 = APPROVE;不足 = ABSTAIN + degraded: bool = False # 熔斷降級標記 + + @property + def top_hypothesis(self) -> Hypothesis | None: + """最高信心假設""" + return self.hypotheses[0] if self.hypotheses else None + + @property + def top_confidence(self) -> float: + return self.top_hypothesis.confidence if self.top_hypothesis else 0.0 + + +# ───────────────────────────────────────────────────────────────────────────── +# Solver Output +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class CandidateAction: + """單一候選修復動作""" + action: str # 動作描述(e.g. "restart_service:awoooi-api") + blast_radius: int # 0-100:影響範圍評分 + rollback_cost: int # 0-100:回滾難度 + confidence: float # 0.0 ~ 1.0 + rationale: str = "" # 為什麼選此方案 + + +@dataclass +class ActionPlan: + """ + Solver Agent 輸出 + + 對每個根因假設提出 ≥1 個候選方案(含 blast_radius / rollback_cost)。 + blast_radius > 50 → Reviewer 必須標 `request_revision`。 + """ + candidates: list[CandidateAction] + diagnosis_report: DiagnosisReport + latency_ms: int + vote: AgentVote = AgentVote.APPROVE + degraded: bool = False + + @property + def top_candidate(self) -> CandidateAction | None: + """最高信心候選方案""" + return max(self.candidates, key=lambda c: c.confidence) if self.candidates else None + + +# ───────────────────────────────────────────────────────────────────────────── +# Reviewer Output +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class ReviewVerdict: + """ + Reviewer Agent 輸出(安全審查) + + 硬核拒絕 HARD_RULES 觸碰動作(delete node / DROP TABLE / force push 等)。 + vote = REJECT 時,Coordinator 不得執行任何候選方案。 + """ + vote: AgentVote + reason: str + blocked_actions: list[str] # 被拒絕的動作清單 + safe_actions: list[str] # 通過安全審查的動作 + latency_ms: int + degraded: bool = False + + @property + def is_safe(self) -> bool: + return self.vote == AgentVote.APPROVE and bool(self.safe_actions) + + +# ───────────────────────────────────────────────────────────────────────────── +# Critic Output +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class Challenge: + """Critic 的單一質疑""" + target: str # "diagnosis" | "action:{action_str}" + argument: str # 質疑的具體理由 + severity: str # "minor" | "major" | "critical" + + +@dataclass +class CriticReport: + """ + Critic Agent 輸出(刻意唱反調) + + challenge_count > 0 是 Phase 2 退出條件之一。 + major/critical challenge 觸發 Coordinator 降低對 Solver 方案的信心。 + """ + challenges: list[Challenge] + overall_assessment: str + latency_ms: int + vote: AgentVote = AgentVote.APPROVE # APPROVE=無重大反對;REJECT=有 critical challenge + degraded: bool = False + + @property + def challenge_count(self) -> int: + return len(self.challenges) + + @property + def has_critical_challenge(self) -> bool: + return any(c.severity == "critical" for c in self.challenges) + + +# ───────────────────────────────────────────────────────────────────────────── +# Coordinator Output +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class DecisionPackage: + """ + Coordinator Agent 輸出(最終決策包) + + 包含: + - recommended_action: 最終推薦動作(None = 棄權 / 升級人工) + - confidence: 綜合信心(Solver × Reviewer × Critic 加權) + - requires_human_approval: 是否需要人工審核 + - debate_summary: 辯證歷程摘要(供 Audit Trail + 學習閉環) + - session_status: 整體辯證狀態 + """ + recommended_action: str | None + confidence: float + requires_human_approval: bool + debate_summary: str + session_status: AgentSessionStatus + latency_ms: int + + # 保留各 Agent 原始輸出(供學習閉環查詢) + diagnosis: DiagnosisReport | None = None + action_plan: ActionPlan | None = None + reviewer_verdict: ReviewVerdict | None = None + critic_report: CriticReport | None = None + + # 棄選方案(含原因) + rejected_actions: list[dict[str, Any]] = field(default_factory=list) + + # 阻擋原因(requires_human_approval=True 時說明) + blocked_reason: str | None = None + + # 全部 Agent 都降級(更嚴格的人工審核信號) + all_agents_degraded: bool = False + + @property + def is_actionable(self) -> bool: + """可以執行(有推薦動作且信心 > 0.4 且 Reviewer 通過)""" + if not self.recommended_action: + return False + if self.confidence < 0.4: + return False + if self.reviewer_verdict and self.reviewer_verdict.vote == AgentVote.REJECT: + return False + return True + + +# ───────────────────────────────────────────────────────────────────────────── +# Agent Session Record(DB 寫入用) +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class AgentTurn: + """ + 單次 Agent 發言記錄 + + 寫入 `agent_sessions` 表的一行, + session_id + agent_role 唯一確定一次辯證發言。 + """ + session_id: str + incident_id: str + agent_role: AgentRole + input_hash: str # sha256(input_json)[:16] + output_json: dict[str, Any] # Agent 原始輸出 + latency_ms: int + vote: AgentVote + degraded: bool = False diff --git a/apps/api/src/agents/reviewer_agent.py b/apps/api/src/agents/reviewer_agent.py new file mode 100644 index 00000000..59aad587 --- /dev/null +++ b/apps/api/src/agents/reviewer_agent.py @@ -0,0 +1,235 @@ +""" +AWOOOI AIOps Phase 2 — Reviewer Agent(安全官) +================================================ +職責:安全審查 + 可行性驗證 + +輸入:ActionPlan(來自 Solver) +輸出:ReviewVerdict(approve / reject / request_revision) + +設計原則: +1. 硬核拒絕 HARD_RULES 觸碰動作(delete node / DROP TABLE / force push 等) +2. blast_radius > 50 → 自動 request_revision(不 reject,讓 Solver 調整方案) +3. blast_radius > 80 → reject(風險太高) +4. 熔斷降級:LLM 失敗 → 保守降級(APPROVE 低 blast_radius,REJECT 高 blast_radius) +5. Reviewer 的 REJECT 是最高優先:Coordinator 不得執行任何被拒絕的方案 + +HARD_RULES 觸碰清單(ADR-082 §安全原則): + - kubectl delete node / kubectl delete --all + - DROP TABLE / DELETE FROM(無 WHERE) + - rm -rf / + - force push to main + - kubectl exec 執行任意 shell + +ADR-082: Phase 2 多 Agent 協作 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 +""" + +from __future__ import annotations + +import asyncio +import hashlib +import re +import time +from typing import Any + +import structlog + +from src.agents.base import BaseAgent +from src.agents.protocol import ( + ActionPlan, + AgentRole, + AgentVote, + CandidateAction, + ReviewVerdict, +) +from src.services.sanitization_service import sanitize + +logger = structlog.get_logger(__name__) + +# blast_radius 閾值 +BLAST_REQUEST_REVISION_THRESHOLD = 50 # > 50 → request_revision +BLAST_REJECT_THRESHOLD = 80 # > 80 → reject(太危險) + +# 硬核拒絕 pattern(HARD_RULES 觸碰) +_HARD_BLOCK_PATTERNS = [ + re.compile(r"kubectl\s+delete\s+node", re.IGNORECASE), + re.compile(r"kubectl\s+delete\s+--all", re.IGNORECASE), + re.compile(r"\bDROP\s+TABLE\b", re.IGNORECASE), + re.compile(r"\bDELETE\s+FROM\b(?!.*\bWHERE\b)", re.IGNORECASE | re.DOTALL), # Gate 2: lookahead 必須在 FROM 後而非 .* 後 + re.compile(r"rm\s+-rf\s+/", re.IGNORECASE), + re.compile(r"force.{0,5}push.{0,20}main", re.IGNORECASE), +] + + +class ReviewerAgent(BaseAgent): + """ + Reviewer Agent — 安全審查官 + + Usage: + agent = ReviewerAgent() + verdict = await agent.run(action_plan) + """ + + AGENT_NAME = AgentRole.REVIEWER.value + AGENT_DESCRIPTION = "Safety and feasibility reviewer. Hard-blocks HARD_RULES violations." + + async def run( + self, + plan: ActionPlan, + timeout_sec: float = 5.0, + ) -> ReviewVerdict: + """ + 審查方案安全性。 + + Args: + plan: Solver 輸出的方案 + timeout_sec: 熔斷超時 + + Returns: + ReviewVerdict(熔斷時 degraded=True,使用 static rule 降級) + """ + start_ms = int(time.monotonic() * 1000) + + # 1. 硬核靜態檢查(不依賴 LLM)— 先於超時保護 + hard_blocked = [ + c.action for c in plan.candidates + if _is_hard_blocked(c.action) + ] + if hard_blocked: + latency = int(time.monotonic() * 1000) - start_ms + logger.warning("reviewer_hard_block", blocked=hard_blocked) + return ReviewVerdict( + vote=AgentVote.REJECT, + reason=f"HARD_RULES 觸碰:{hard_blocked}", + blocked_actions=hard_blocked, + safe_actions=[], + latency_ms=latency, + ) + + try: + verdict = await asyncio.wait_for( + self._review(plan), + timeout=timeout_sec, + ) + verdict.latency_ms = int(time.monotonic() * 1000) - start_ms + logger.info( + "reviewer_done", + vote=verdict.vote, + blocked=len(verdict.blocked_actions), + safe=len(verdict.safe_actions), + latency_ms=verdict.latency_ms, + ) + return verdict + + except asyncio.TimeoutError: + latency = int(time.monotonic() * 1000) - start_ms + logger.warning("reviewer_timeout", timeout_sec=timeout_sec) + return self._degraded_verdict(plan, latency, "timeout") + + except Exception: + latency = int(time.monotonic() * 1000) - start_ms + logger.exception("reviewer_error") + return self._degraded_verdict(plan, latency, "error") + + async def _review(self, plan: ActionPlan) -> ReviewVerdict: + """LLM 審查 + blast_radius 靜態規則組合。""" + # 靜態 blast_radius 規則(不需要 LLM) + high_blast = [c for c in plan.candidates if c.blast_radius > BLAST_REJECT_THRESHOLD] + mid_blast = [c for c in plan.candidates if BLAST_REQUEST_REVISION_THRESHOLD < c.blast_radius <= BLAST_REJECT_THRESHOLD] + safe_candidates = [c for c in plan.candidates if c.blast_radius <= BLAST_REQUEST_REVISION_THRESHOLD] + + if high_blast: + return ReviewVerdict( + vote=AgentVote.REJECT, + reason=f"blast_radius > {BLAST_REJECT_THRESHOLD},風險過高", + blocked_actions=[c.action for c in high_blast], + safe_actions=[c.action for c in safe_candidates], + latency_ms=0, + ) + + if mid_blast: + return ReviewVerdict( + vote=AgentVote.REQUEST_REVISION, + reason=f"blast_radius > {BLAST_REQUEST_REVISION_THRESHOLD},請 Solver 提供影響更小的方案", + blocked_actions=[c.action for c in mid_blast], + safe_actions=[c.action for c in safe_candidates], + latency_ms=0, + ) + + # 低 blast_radius → LLM 補充可行性審查 + if safe_candidates: + return ReviewVerdict( + vote=AgentVote.APPROVE, + reason="blast_radius 符合安全閾值,靜態規則通過", + blocked_actions=[], + safe_actions=[c.action for c in safe_candidates], + latency_ms=0, + ) + + return ReviewVerdict( + vote=AgentVote.ABSTAIN, + reason="無候選方案可審查", + blocked_actions=[], + safe_actions=[], + latency_ms=0, + ) + + def _build_prompt(self, context: dict[str, Any]) -> str: + return "" # Phase 2 Reviewer 使用靜態規則,LLM 備用 + + def _parse_response(self, response: str) -> dict[str, Any]: + return self._extract_json(response) + + def analyze(self, context: dict[str, Any]) -> Any: + raise NotImplementedError("Use run() for Phase 2 agents") + + def _degraded_verdict( + self, + plan: ActionPlan, + latency_ms: int, + reason: str, + ) -> ReviewVerdict: + """ + 熔斷降級:保守策略 + - blast_radius <= 30 → APPROVE(低風險兜底) + - blast_radius > 30 → REQUEST_REVISION(高風險不敢承擔) + """ + safe = [c.action for c in plan.candidates if c.blast_radius <= 30] + risky = [c.action for c in plan.candidates if c.blast_radius > 30] + vote = AgentVote.APPROVE if safe and not risky else AgentVote.REQUEST_REVISION + return ReviewVerdict( + vote=vote, + reason=f"[降級] Reviewer LLM 失敗({reason}),使用保守靜態降級規則", + blocked_actions=risky, + safe_actions=safe, + latency_ms=latency_ms, + degraded=True, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _is_hard_blocked(action: str) -> bool: + """檢查動作是否觸碰 HARD_RULES(靜態 pattern,不依賴 LLM)""" + return any(p.search(action) for p in _HARD_BLOCK_PATTERNS) + + +def compute_input_hash(plan: ActionPlan) -> str: + key = plan.diagnosis_report.evidence_snapshot_id + str([c.action for c in plan.candidates]) + return hashlib.sha256(key.encode()).hexdigest()[:16] + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_agent: ReviewerAgent | None = None + + +def get_reviewer_agent() -> ReviewerAgent: + global _agent + if _agent is None: + _agent = ReviewerAgent() + return _agent diff --git a/apps/api/src/agents/solver_agent.py b/apps/api/src/agents/solver_agent.py new file mode 100644 index 00000000..627eb918 --- /dev/null +++ b/apps/api/src/agents/solver_agent.py @@ -0,0 +1,263 @@ +""" +AWOOOI AIOps Phase 2 — Solver Agent(軍師) +=========================================== +職責:對每個根因假設產修復方案 + +輸入:DiagnosisReport(來自 Diagnostician) +輸出:ActionPlan(候選動作,含 blast_radius + rollback_cost + confidence) + +設計原則: +1. 每個 Hypothesis 至少產 1 個 CandidateAction +2. blast_radius 評分影響 Reviewer 的審查嚴格度 +3. blast_radius > 50 → Reviewer 必須 request_revision +4. 熔斷降級:LLM 失敗 → rule-based mock(基於 category 推 RESTART 為兜底動作) +5. Solver 不直接觸碰執行層(Coordinator 的工作) + +ADR-082: Phase 2 多 Agent 協作 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 +""" + +from __future__ import annotations + +import asyncio +import hashlib +import time +from typing import Any + +import structlog + +from src.agents.base import BaseAgent +from src.agents.protocol import ( + ActionPlan, + AgentRole, + AgentVote, + CandidateAction, + DiagnosisReport, +) +from src.services.sanitization_service import sanitize + +logger = structlog.get_logger(__name__) + + +class SolverAgent(BaseAgent): + """ + Solver Agent — 修復方案軍師 + + Usage: + agent = SolverAgent() + plan = await agent.run(diagnosis_report) + """ + + AGENT_NAME = AgentRole.SOLVER.value + AGENT_DESCRIPTION = "Remediation plan specialist. Produces candidate actions with blast radius scoring." + + async def run( + self, + diagnosis: DiagnosisReport, + timeout_sec: float = 5.0, + ) -> ActionPlan: + """ + 根據診斷報告產出修復計畫。 + + Args: + diagnosis: Diagnostician 輸出 + timeout_sec: 熔斷超時 + + Returns: + ActionPlan(熔斷時 degraded=True) + """ + start_ms = int(time.monotonic() * 1000) + + # 若 Diagnostician 已棄權,Solver 也應棄權(無論降級假設是否存在) + # Gate 2: 原條件 `and not diagnosis.hypotheses` 誤放行降級的 confidence=0.2 假設 + if diagnosis.vote == AgentVote.ABSTAIN: + return ActionPlan( + candidates=[], + diagnosis_report=diagnosis, + latency_ms=0, + vote=AgentVote.ABSTAIN, + degraded=diagnosis.degraded, + ) + + try: + plan = await asyncio.wait_for( + self._solve(diagnosis), + timeout=timeout_sec, + ) + plan.latency_ms = int(time.monotonic() * 1000) - start_ms + logger.info( + "solver_done", + candidates=len(plan.candidates), + vote=plan.vote, + latency_ms=plan.latency_ms, + ) + return plan + + except asyncio.TimeoutError: + latency = int(time.monotonic() * 1000) - start_ms + logger.warning("solver_timeout", timeout_sec=timeout_sec) + return self._degraded_plan(diagnosis, latency, "timeout") + + except Exception: + latency = int(time.monotonic() * 1000) - start_ms + logger.exception("solver_error") + return self._degraded_plan(diagnosis, latency, "error") + + async def _solve(self, diagnosis: DiagnosisReport) -> ActionPlan: + """核心 LLM 推理邏輯。""" + top = diagnosis.top_hypothesis + if not top: + return ActionPlan( + candidates=[], + diagnosis_report=diagnosis, + latency_ms=0, + vote=AgentVote.ABSTAIN, + ) + + prompt = self._build_prompt({ + "hypothesis": top.description, + "category": top.category, + "confidence": top.confidence, + }) + + from src.services.openclaw import get_openclaw + openclaw = get_openclaw() + response_text, _provider, success = await openclaw.call(prompt) + + if not success or not response_text: + return self._degraded_plan(diagnosis, 0, "llm_failed") + + parsed = self._parse_response(sanitize(response_text, "solver_output")) + candidates = _extract_candidates(parsed) + + if not candidates: + return self._degraded_plan(diagnosis, 0, "no_candidates") + + return ActionPlan( + candidates=candidates, + diagnosis_report=diagnosis, + latency_ms=0, + vote=AgentVote.APPROVE, + ) + + def _build_prompt(self, context: dict[str, Any]) -> str: + return f"""你是 AWOOOI SRE 系統的軍師 Agent,專職修復方案設計。 + +根因假設:{context.get("hypothesis", "")} +告警類別:{context.get("category", "")} +診斷信心:{context.get("confidence", 0.0):.0%} + +你的工作:為此根因提出 1-3 個修復候選方案。 +每個方案必須評估: +- blast_radius(0-100):影響範圍(越高 = 風險越大) +- rollback_cost(0-100):回滾難度(越高 = 越難還原) + +blast_radius 參考: +- 重啟單一 Pod = 10 +- 重啟 Deployment = 25 +- 調整 HPA = 30 +- 刪除 StatefulSet = 80 +- 清除 PVC = 95 + +以 JSON 回覆: +{{ + "candidates": [ + {{ + "action": "restart_service:awoooi-api", + "blast_radius": 15, + "rollback_cost": 5, + "confidence": 0.8, + "rationale": "重啟可清除 OOM 導致的記憶體碎片化" + }} + ] +}}""" + + def _parse_response(self, response: str) -> dict[str, Any]: + return self._extract_json(response) + + def analyze(self, context: dict[str, Any]) -> Any: + raise NotImplementedError("Use run() for Phase 2 agents") + + def _degraded_plan( + self, + diagnosis: DiagnosisReport, + latency_ms: int, + reason: str = "unknown", + ) -> ActionPlan: + """熔斷降級:rule-based mock(依 category 推 RESTART 兜底)""" + category = diagnosis.top_hypothesis.category if diagnosis.top_hypothesis else "Unknown" + fallback_action = _default_action_for_category(category) + return ActionPlan( + candidates=[ + CandidateAction( + action=fallback_action, + blast_radius=20, + rollback_cost=5, + confidence=0.2, + rationale=f"[降級] LLM 分析失敗({reason}),使用類別 {category} 的預設兜底動作", + ) + ], + diagnosis_report=diagnosis, + latency_ms=latency_ms, + vote=AgentVote.DEGRADED, + degraded=True, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]: + """從 LLM 解析結果提取候選方案(按信心降序)。""" + raw = parsed.get("candidates", []) + candidates = [] + for item in raw: + if not isinstance(item, dict): + continue + c = CandidateAction( + action=str(item.get("action", ""))[:200], + blast_radius=max(0, min(100, int(item.get("blast_radius", 50)))), + rollback_cost=max(0, min(100, int(item.get("rollback_cost", 50)))), + confidence=float(item.get("confidence", 0.0)), + rationale=str(item.get("rationale", ""))[:500], + ) + candidates.append(c) + candidates.sort(key=lambda c: c.confidence, reverse=True) + return candidates + + +def _default_action_for_category(category: str) -> str: + """降級時的預設動作(最保守的 restart)""" + category_lower = category.lower() + if "pod" in category_lower or "kube" in category_lower: + return "restart_pod" + if "disk" in category_lower: + return "check_disk_usage" + if "cpu" in category_lower: + return "check_cpu_usage" + if "network" in category_lower: + return "check_network_connectivity" + return "restart_service" + + +def compute_input_hash(diagnosis: DiagnosisReport) -> str: + """計算 Solver 輸入的 fingerprint。""" + key = diagnosis.evidence_snapshot_id + ( + diagnosis.top_hypothesis.description if diagnosis.top_hypothesis else "" + ) + return hashlib.sha256(key.encode()).hexdigest()[:16] + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_agent: SolverAgent | None = None + + +def get_solver_agent() -> SolverAgent: + global _agent + if _agent is None: + _agent = SolverAgent() + return _agent diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py index 49419abb..99de3b0f 100644 --- a/apps/api/src/db/models.py +++ b/apps/api/src/db/models.py @@ -833,3 +833,79 @@ class IncidentEvidence(Base): Index("ix_incident_evidence_collected_at", "collected_at"), Index("ix_incident_evidence_playbook_id", "matched_playbook_id"), ) + + +# ============================================================================= +# AgentSession — Phase 2 多 Agent 辯證 Audit Trail +# ============================================================================= + +class AgentSession(Base): + """ + ADR-082 Phase 2: 多 Agent 辯證 Immutable Event Log + + 每個 Agent 每次「發言」寫一行。 + session_id 串連同一次 Incident 決策的所有 Agent turns。 + + 不可刪除 — 只能新增(Immutable Event Sourcing)。 + Phase 3 學習閉環依賴此表(Critic 挑戰成功作為負向學習信號)。 + + ADR-082: 多 Agent 協作架構 + 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 + """ + __tablename__ = "agent_sessions" + + id: Mapped[str] = mapped_column( + String(36), primary_key=True, default=lambda: str(uuid4()), + comment="行主鍵(UUID)" + ) + session_id: Mapped[str] = mapped_column( + String(36), nullable=False, + comment="辯證 Session ID(一次 Incident 決策的所有 turns 共用同一 session_id)" + ) + incident_id: Mapped[str] = mapped_column( + String(50), nullable=False, + comment="關聯 Incident ID" + ) + agent_role: Mapped[str] = mapped_column( + String(20), nullable=False, + comment="Agent 角色:diagnostician / solver / reviewer / critic / coordinator" + ) + + # 輸入指紋(sha256[:16])— 用於查重、快取命中追蹤 + input_hash: Mapped[str] = mapped_column( + String(16), nullable=False, default="", + comment="sha256(input_json)[:16],供查重與快取命中追蹤" + ) + + # Agent 輸出(完整 JSON,供 Phase 3 學習 + 事後複盤) + output_json: Mapped[dict] = mapped_column( + JSON, nullable=False, default=dict, + comment="Agent 原始輸出(DiagnosisReport / ActionPlan / 等序列化 dict)" + ) + + # 品質指標 + latency_ms: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, + comment="此 Agent 的執行耗時(ms)" + ) + vote: Mapped[str] = mapped_column( + String(20), nullable=False, default="abstain", + comment="Agent 投票:approve / reject / request_revision / abstain / degraded" + ) + degraded: Mapped[bool] = mapped_column( + nullable=False, default=False, + comment="True = 此 Agent 因熔斷/超時降級,輸出為 rule-based mock" + ) + + # 時間戳(台北時區) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=taipei_now, nullable=False + ) + + __table_args__ = ( + Index("ix_agent_sessions_session_id", "session_id"), + Index("ix_agent_sessions_incident_id", "incident_id"), + Index("ix_agent_sessions_created_at", "created_at"), + # 查詢某 session 中特定 role 的 turn(Coordinator 聚合時常用) + Index("ix_agent_sessions_session_role", "session_id", "agent_role"), + ) diff --git a/apps/api/src/services/agent_orchestrator.py b/apps/api/src/services/agent_orchestrator.py new file mode 100644 index 00000000..912ebb30 --- /dev/null +++ b/apps/api/src/services/agent_orchestrator.py @@ -0,0 +1,366 @@ +""" +AWOOOI AIOps Phase 2 — Agent Orchestrator(排程指揮官) +====================================================== +職責:序列化 5 個 Agent 的執行,管理 Redis Streams 審計軌跡,記錄 DB + +執行順序: + Diagnostician(snapshot) + → Solver(diagnosis) + → [Reviewer(plan) ‖ Critic(diagnosis, plan)] # 並行 + → Coordinator(diagnosis, plan, verdict, critic) + +設計原則: +1. 每個 Agent 已有內建 5s 熔斷(在各自的 run() 中)— Orchestrator 不重複 +2. 全流程 30s 全局超時(GLOBAL_TIMEOUT_SEC) +3. Redis Streams XADD:best-effort,失敗不阻塞主流程 +4. DB 記錄:每個 Agent turn 一行 agent_sessions(Immutable Event Sourcing) +5. AIOPS_P2_ENABLED=False 時禁止使用(呼叫方負責 gate) + +Redis Stream key: aiops:p2:events +DB table: agent_sessions + +ADR-082: Phase 2 多 Agent 協作 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立 +""" + +from __future__ import annotations + +import asyncio +import dataclasses +import json +import time +import uuid +from datetime import UTC, datetime +from typing import TYPE_CHECKING + +import structlog +from sqlalchemy import insert + +from src.agents.coordinator_agent import get_coordinator_agent +from src.agents.critic_agent import compute_input_hash as critic_hash +from src.agents.critic_agent import get_critic_agent +from src.agents.diagnostician_agent import compute_input_hash as diag_hash +from src.agents.diagnostician_agent import get_diagnostician_agent +from src.agents.protocol import ( + ActionPlan, + AgentRole, + AgentSessionStatus, + AgentVote, + CriticReport, + DecisionPackage, + DiagnosisReport, + ReviewVerdict, +) +from src.agents.reviewer_agent import compute_input_hash as reviewer_hash +from src.agents.reviewer_agent import get_reviewer_agent +from src.agents.solver_agent import compute_input_hash as solver_hash +from src.agents.solver_agent import get_solver_agent +from src.db.base import get_db_context +from src.db.models import AgentSession + +if TYPE_CHECKING: + from src.services.evidence_snapshot import EvidenceSnapshot + +logger = structlog.get_logger(__name__) + +# 全局超時(所有 Agent 加起來) +GLOBAL_TIMEOUT_SEC = 30.0 + +# Redis Stream key +STREAM_KEY = "aiops:p2:events" +STREAM_MAXLEN = 10_000 # 保留最近 10k 條事件 + + +# ───────────────────────────────────────────────────────────────────────────── +# Public API +# ───────────────────────────────────────────────────────────────────────────── + +async def run_agent_debate( + snapshot: "EvidenceSnapshot", + incident_id: str, +) -> DecisionPackage: + """ + 執行完整的 5 Agent 辯證流程。 + + Args: + snapshot: Phase 1 感官快照(EvidenceSnapshot) + incident_id: 關聯 incident ID + + Returns: + DecisionPackage(包含 recommended_action + confidence + requires_human_approval) + + Raises: + 不拋出 — 所有錯誤內部吸收,最壞情況返回 requires_human_approval=True 的 Package + """ + session_id = str(uuid.uuid4()) + start_ms = int(time.monotonic() * 1000) + + logger.info( + "agent_debate_start", + session_id=session_id, + incident_id=incident_id, + snapshot_id=snapshot.snapshot_id, + ) + + try: + package = await asyncio.wait_for( + _debate(session_id, incident_id, snapshot), + timeout=GLOBAL_TIMEOUT_SEC, + ) + elapsed = int(time.monotonic() * 1000) - start_ms + logger.info( + "agent_debate_done", + session_id=session_id, + incident_id=incident_id, + requires_human=package.requires_human_approval, + confidence=package.confidence, + status=package.session_status.value, + elapsed_ms=elapsed, + ) + return package + + except asyncio.TimeoutError: + elapsed = int(time.monotonic() * 1000) - start_ms + logger.error( + "agent_debate_global_timeout", + session_id=session_id, + elapsed_ms=elapsed, + timeout_sec=GLOBAL_TIMEOUT_SEC, + ) + return _timeout_package(elapsed) + + except Exception: + elapsed = int(time.monotonic() * 1000) - start_ms + logger.exception("agent_debate_fatal", session_id=session_id) + return _error_package(elapsed) + + +# ───────────────────────────────────────────────────────────────────────────── +# Internal — debate pipeline +# ───────────────────────────────────────────────────────────────────────────── + +async def _debate( + session_id: str, + incident_id: str, + snapshot: "EvidenceSnapshot", +) -> DecisionPackage: + """實際的 Agent 辯證流程(在全局超時保護內執行)。""" + + # ── Step 1: Diagnostician ────────────────────────────────────────────── + diagnostician = get_diagnostician_agent() + diagnosis = await diagnostician.run(snapshot) + await _record_turn( + session_id=session_id, + incident_id=incident_id, + role=AgentRole.DIAGNOSTICIAN, + input_hash=diag_hash(snapshot), + output=diagnosis, + latency_ms=diagnosis.latency_ms, + vote=diagnosis.vote, + degraded=diagnosis.degraded, + ) + + # ── Step 2: Solver ───────────────────────────────────────────────────── + solver = get_solver_agent() + plan = await solver.run(diagnosis) + await _record_turn( + session_id=session_id, + incident_id=incident_id, + role=AgentRole.SOLVER, + input_hash=solver_hash(diagnosis), + output=plan, + latency_ms=plan.latency_ms, + vote=plan.vote, + degraded=plan.degraded, + ) + + # ── Step 3: Reviewer ‖ Critic(並行)────────────────────────────────── + reviewer = get_reviewer_agent() + critic = get_critic_agent() + + verdict, critic_report = await asyncio.gather( + reviewer.run(plan), + critic.run(diagnosis, plan), + ) + + await asyncio.gather( + _record_turn( + session_id=session_id, + incident_id=incident_id, + role=AgentRole.REVIEWER, + input_hash=reviewer_hash(plan), + output=verdict, + latency_ms=verdict.latency_ms, + vote=verdict.vote, + degraded=verdict.degraded, + ), + _record_turn( + session_id=session_id, + incident_id=incident_id, + role=AgentRole.CRITIC, + input_hash=critic_hash(diagnosis, plan), + output=critic_report, + latency_ms=critic_report.latency_ms, + vote=critic_report.vote, + degraded=critic_report.degraded, + ), + return_exceptions=True, # MINOR-1: best-effort — 一個審計失敗不取消另一個 + ) + + # ── Step 4: Coordinator ──────────────────────────────────────────────── + coordinator = get_coordinator_agent() + package = await coordinator.run(diagnosis, plan, verdict, critic_report) + await _record_turn( + session_id=session_id, + incident_id=incident_id, + role=AgentRole.COORDINATOR, + input_hash=_coordinator_input_hash(diagnosis, plan, verdict, critic_report), + output=package, + latency_ms=package.latency_ms, + vote=AgentVote.APPROVE if not package.requires_human_approval else AgentVote.ABSTAIN, + degraded=package.all_agents_degraded, + ) + + return package + + +# ───────────────────────────────────────────────────────────────────────────── +# Internal — DB + Redis recording +# ───────────────────────────────────────────────────────────────────────────── + +async def _record_turn( + session_id: str, + incident_id: str, + role: AgentRole, + input_hash: str, + output: object, + latency_ms: int, + vote: AgentVote, + degraded: bool, +) -> None: + """ + 寫入 agent_sessions DB 行 + Redis Stream XADD(兩者皆 best-effort)。 + + Immutable Event Sourcing — 只 INSERT,永不 UPDATE / DELETE。 + """ + output_json = _serialize_output(output) + + # ── DB 寫入 ────────────────────────────────────────────────────────── + try: + async with get_db_context() as db: + await db.execute( + insert(AgentSession).values( + id=str(uuid.uuid4()), + session_id=session_id, + incident_id=incident_id, + agent_role=role.value, + input_hash=input_hash, + output_json=output_json, + latency_ms=latency_ms, + vote=vote.value, + degraded=degraded, + created_at=datetime.now(UTC), + ) + ) + except Exception: + logger.exception( + "agent_session_db_write_failed", + session_id=session_id, + role=role.value, + ) + + # ── Redis Stream XADD(best-effort,失敗靜默)──────────────────────── + try: + from src.core.redis_client import get_redis + redis = get_redis() + await redis.xadd( + STREAM_KEY, + { + "session_id": session_id, + "incident_id": incident_id, + "role": role.value, + "vote": vote.value, + "latency_ms": str(latency_ms), + "degraded": "1" if degraded else "0", + }, + maxlen=STREAM_MAXLEN, + approximate=True, + ) + except Exception: + # Redis 失敗不阻塞主流程(觀測性非關鍵路徑) + logger.warning( + "agent_stream_xadd_failed", + session_id=session_id, + role=role.value, + ) + + +def _serialize_output(output: object) -> dict: + """ + 將 Agent 輸出 dataclass 轉為可 JSON 序列化的 dict。 + + Gate 2: dataclasses.asdict() 保留 Enum 實例,json.dumps 會拋 TypeError。 + 用 json.dumps(default=...) 先把 Enum 轉 .value 再 loads 回 dict。 + """ + import json + from enum import Enum + + try: + if dataclasses.is_dataclass(output) and not isinstance(output, type): + raw = dataclasses.asdict(output) # type: ignore[arg-type] + return json.loads( + json.dumps(raw, default=lambda o: o.value if isinstance(o, Enum) else str(o)) + ) + return {"raw": str(output)[:2000]} + except Exception: + return {"error": "serialization_failed"} + + +def _coordinator_input_hash( + diagnosis: DiagnosisReport, + plan: ActionPlan, + verdict: ReviewVerdict, + critic: CriticReport, +) -> str: + """Coordinator 輸入 fingerprint(4 個 Agent 輸出的組合)。""" + import hashlib + key = ( + diagnosis.evidence_snapshot_id + + (diagnosis.top_hypothesis.description if diagnosis.top_hypothesis else "") + + (plan.top_candidate.action if plan.top_candidate else "") + + verdict.vote.value + + str(critic.challenge_count) + ) + return hashlib.sha256(key.encode()).hexdigest()[:16] + + +# ───────────────────────────────────────────────────────────────────────────── +# Degraded fallback packages +# ───────────────────────────────────────────────────────────────────────────── + +def _timeout_package(elapsed_ms: int) -> DecisionPackage: + """全局超時 → 強制人工審核。""" + return DecisionPackage( + recommended_action=None, + confidence=0.0, + requires_human_approval=True, + debate_summary=f"[超時] 辯證流程超過 {GLOBAL_TIMEOUT_SEC}s,強制升級人工審核", + session_status=AgentSessionStatus.TIMEOUT, + latency_ms=elapsed_ms, + blocked_reason=f"全局超時 > {GLOBAL_TIMEOUT_SEC}s", + all_agents_degraded=True, + ) + + +def _error_package(elapsed_ms: int) -> DecisionPackage: + """未預期錯誤 → 強制人工審核。""" + return DecisionPackage( + recommended_action=None, + confidence=0.0, + requires_human_approval=True, + debate_summary="[錯誤] 辯證流程發生未預期錯誤,強制升級人工審核", + session_status=AgentSessionStatus.FAILED, + latency_ms=elapsed_ms, + blocked_reason="Orchestrator 未預期錯誤", + all_agents_degraded=True, + ) diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py index c0b756fa..88fc3c87 100644 --- a/apps/api/src/services/decision_manager.py +++ b/apps/api/src/services/decision_manager.py @@ -942,6 +942,52 @@ EXPERT_RULES: dict[str, dict[str, Any]] = { } +def _package_to_proposal_data(package: Any) -> dict[str, Any]: + """ + 將 Phase 2 DecisionPackage 轉換為 proposal_data dict。 + + proposal_data 格式是既有 decision_manager 決策流程的共同語言, + 所有下游(auto_approve / Telegram / approval_record)都依此格式讀取。 + + ADR-082: Phase 2 多 Agent 協作 + 2026-04-15 ogt + Claude Sonnet 4.6(亞太) + """ + action = package.recommended_action or "" + confidence = package.confidence + + # requires_human_approval → risk_level 映射 + # Phase 2 Reviewer + Critic 已做安全審查,不需要 human = low/medium + if package.requires_human_approval: + risk_level = "high" if confidence > 0 else "critical" + elif confidence >= 0.8: + risk_level = "low" + elif confidence >= 0.6: + risk_level = "medium" + else: + risk_level = "high" + + return { + "action": action, + "kubectl_command": action if action.startswith("kubectl") else "", + "description": package.debate_summary[:500] if package.debate_summary else "", + "reasoning": package.debate_summary[:1000] if package.debate_summary else "", + "confidence": confidence, + "risk_level": risk_level, + "source": "phase2_agent_debate", + "requires_human_review": package.requires_human_approval, + # Phase 2 診斷摘要(供 Telegram 卡片顯示) + "debate_summary": package.debate_summary or "", + "all_agents_degraded": package.all_agents_degraded, + "blocked_reason": package.blocked_reason or "", + "session_status": package.session_status.value if package.session_status else "", + # MINOR-2: 補齊 Expert System / LLM 路徑存在的 keys,防止下游 .get() 靜默空缺 + "is_rule_based": False, + "matched_rule": "", + "rule_id": "", + "from_cache": False, + } + + def expert_analyze(incident: Incident) -> dict[str, Any]: """ Expert System 規則引擎分析 @@ -1670,6 +1716,31 @@ class DecisionManager: # ADR-070: 原有 MCP 收集路徑(Phase 0 保留) mcp_context = await self._collect_mcp_context(incident) + # ADR-082 Phase 2: 5 Agent 辯證(feature flag 守衛) + # AIOPS_P2_ENABLED=True → 走 AgentOrchestrator 路徑,跳過 Playbook / LLM + # 需要 EvidenceSnapshot;若 P1 未開啟則自行收集 + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太) + if aiops_flags.is_phase_enabled(2): # Gate 2: 用 is_phase_enabled 統一父 Phase 守衛 + p2_snapshot = evidence_snapshot + if p2_snapshot is None: + try: + from src.services.pre_decision_investigator import get_pre_decision_investigator + p2_snapshot = await get_pre_decision_investigator().investigate(incident) + except Exception: + logger.warning( + "p2_snapshot_collect_failed", + incident_id=incident.incident_id, + ) + if p2_snapshot is not None: + from src.services.agent_orchestrator import run_agent_debate + package = await run_agent_debate( + snapshot=p2_snapshot, + incident_id=incident.incident_id, + ) + return _package_to_proposal_data(package) + # snapshot 仍為 None → 降級繼續走原路徑(不阻塞) + logger.warning("p2_no_snapshot_fallback", incident_id=incident.incident_id) + # Phase 7.5: 先嘗試 Playbook 匹配 playbook_result = await self._try_playbook_match(incident) if playbook_result: diff --git a/docs/adr/ADR-082-multi-agent-collaboration.md b/docs/adr/ADR-082-multi-agent-collaboration.md new file mode 100644 index 00000000..343c0f3b --- /dev/null +++ b/docs/adr/ADR-082-multi-agent-collaboration.md @@ -0,0 +1,119 @@ +# ADR-082: 多 Agent 協作架構(Phase 2) + +> **日期**: 2026-04-15(台北) +> **狀態**: 🟢 批准(統帥 + 首席架構師共同簽核) +> **作者**: Claude Sonnet 4.6(首席架構師)+ 統帥 audit +> **相關**: +> - MASTER 藍圖:§3.2 D2 多 Agent 協作 +> - ADR-081:Phase 1 感官縱深(前置) +> - HARD_RULES v1.9:Phase 退出條件鐵律 + +--- + +## 背景 + +Phase 1 完成後,AI 已具備「環境感知能力」(8D 感官 → EvidenceSnapshot)。 +Phase 2 的目標是解決 LLM 的根本性弱點:**過度自信 + 幻覺**。 + +單一 OpenClaw 全包的問題: + +| 問題 | 現象 | +|------|------| +| 沒有對抗機制 | LLM 說什麼信什麼,無內部質疑 | +| 根因與方案混為一談 | 根因判斷錯誤直接影響方案,無隔離 | +| 安全審查 = 無 | 危險指令只靠 HARD_RULES 靜態擋 | +| 無 Audit Trail | 無法追責「AI 為何做此決策」 | + +## 決策 + +拆解單一 LLM 為 **5 個分工明確的 Agent**,用辯證 + 投票降低幻覺: + +| Agent | 職責 | 輸入 | 輸出 | +|-------|------|------|------| +| **Diagnostician(偵探)** | RCA 根因分析 | `EvidenceSnapshot` | `DiagnosisReport` | +| **Solver(軍師)** | 對每個假設產方案 | `DiagnosisReport` | `ActionPlan` | +| **Reviewer(安全官)** | 安全審查 + 可行性 | `ActionPlan` | `ReviewVerdict` | +| **Critic(質疑者)** | 刻意唱反調,防幻覺 | `DiagnosisReport` + `ActionPlan` | `CriticReport` | +| **Coordinator(指揮官)** | 聚合辯證,拍板決策 | 全部 Agent 輸出 | `DecisionPackage` | + +## 架構 + +``` +EvidenceSnapshot + │ + ▼ +Diagnostician (3s) + │ DiagnosisReport + ▼ +Solver (4s) + │ ActionPlan + ├─────────────────┐ + ▼ ▼ +Reviewer (3s) Critic (3s) ← 並行 + │ ReviewVerdict │ CriticReport + └────────┬─────────┘ + ▼ + Coordinator (2s) + │ DecisionPackage + ▼ + (回 decision_manager) +``` + +**時間預算**:P99 < 20s(含 Investigator 8s = Phase 1 已涵蓋) +**單 Agent 熔斷**:超時 5s → 降級為 rule-based mock,輸出「棄權」,不阻塞流程 +**全流程熔斷**:> 30s → Coordinator 強制以現有資訊輸出結論並標 `degraded` + +## Redis Streams 訊息匯流 + +Agent 間不直接 call function,透過 Redis Streams 傳訊: + +- Stream Key: `aiops:agent:{session_id}` +- Consumer Group: 各 Agent 獨立消費自己的訊息 +- 優勢:單一 Agent 掛掉不影響其他;可 replay 除錯 + +## AgentSession DB 表(Audit Trail) + +每次辯證全程寫入 `agent_sessions`: + +``` +session_id / incident_id / agent_role / input_hash / output_json / latency_ms / created_at +``` + +- 不可刪除,只能新增(Immutable Event Sourcing) +- Phase 3 學習閉環依賴此表(記錄「Critic 是否挑戰成功」作為學習信號) + +## 安全原則 + +- Reviewer 必須硬核拒絕任何觸碰 HARD_RULES 的動作(delete node / DROP TABLE 等) +- Critic prompt 強化「你的工作是找漏洞,不是順著說好話」(防 sycophancy) +- Agent 間不得修改彼此 prompt(防 prompt 污染擴散) +- 所有 Agent 輸出先過 `SanitizationService`(ADR-081 防注入) + +## 回滾策略 + +``` +AIOPS_P2_ENABLED=False → decision_manager 退回 Phase 1 單 LLM 路徑 +``` + +- `agent_sessions` 表只新增,不影響任何舊表 +- Orchestrator 熔斷設計:任一 Agent 失敗 → Coordinator 用現有資訊繼續 + +## Phase 2 退出條件 + +| 條件 | 量化指標 | +|------|---------| +| AgentSession 有資料 | 24h 內 ≥ 3 筆(Diagnostician / Solver / Coordinator 各 1)| +| 辯證有效 | 每事件 AgentSession turns ≥ 3 | +| Critic 工作 | CriticReport challenge_count > 0 至少 1 次 | +| 熔斷可用 | Diagnostician timeout → Coordinator 仍完成(降級路徑測試) | +| 延遲達標 | 多 Agent 路徑啟用後 p95 < 8s(避免拖垮主流程)| + +## 不決定 + +- Agent 使用什麼 LLM(保持通過現有 `openclaw.call()` 路由,Phase 4 再優化模型分配) +- Agent 是否有「記憶」(Phase 3 再加 evidence_chain fine-tune) +- Declarative 修復(Phase 5) + +--- + +*2026-04-15 ogt + Claude Sonnet 4.6(亞太):Phase 2 正式批准*