feat(p3.1-t2-patha): DiagnosisAggregator 路徑 A + Solver F4 critical reject + 對齊測試
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m59s
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m59s
Wave 8 P3.1-T2 PathA 啟用 + Solver F4 安全強化 + test 對齊:
PathA — DiagnosisAggregator 信號分類層補 PDI:
- ENABLE_DIAGNOSIS_AGGREGATOR default=False → True
· PathA 純信號分類層(OOMKilled/CrashLoop 等業務邏輯)
· 不重複呼叫 K8s/SignOz API(只取 PDI 已收集的 raw 資料)
· 安全 default on — 純邏輯處理,無外部依賴重疊
- diagnosis_aggregator.py +155 行(PathA 實作)
- pre_decision_investigator.py 已接 (commit 3a2cd151)
F4 — Solver critical risk reject:
- solver_agent.py: _validate_recommended_action 拒絕 risk=critical
· 鐵律:critical 動作必須走人工審批,不可變 Telegram 按鈕
· log warning + return None(被 _extract 過濾掉)
- _extract_recommended_actions 改返回 (list, status_str) tuple
· status="ok"/"empty"/"all_invalid" 供呼叫端決策
- protocol.py +16 / metrics.py +9 / ai_router.py +18 — 配套 metric + protocol field
測試對齊:
- test_solver_recommended_actions.py 拆 test_all_valid → low/medium/high accepted +
test_critical_rejected
- result tuple unpack: result, _ = _extract_recommended_actions(...)
- test_diagnosis_aggregator_stub.py: feature flag default 改 True 對齊 PathA
Tests: 51 passed (solver 28 + aggregator 16 + router fallback 8)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Multiple Engineers (Wave 8 P3.1-T2 PathA + F4) <noreply@anthropic.com>
This commit is contained in:
@@ -12,6 +12,7 @@ AWOOOI AIOps Phase 2 — 多 Agent 協作訊息協定
|
||||
ADR-082: 多 Agent 協作架構(Phase 2)
|
||||
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
|
||||
2026-04-27 Claude Sonnet 4.6: B1 — 新增 RecommendedAction schema(北極星 §1.1 修復多樣性 ≥ 40%)
|
||||
2026-04-27 Claude Sonnet 4.6: H1+B1 Fix Round — ActionPlan.recommended_actions_status enum(可觀測性)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -20,6 +21,15 @@ from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Literal
|
||||
|
||||
# 2026-04-27 Claude Sonnet 4.6: H1+B1 Fix Round — recommended_actions_status 型別別名
|
||||
# 方便 solver_agent.py 使用;Literal 比 Enum 輕量且不需要額外 import
|
||||
RecommendedActionsStatus = Literal[
|
||||
"ok", # LLM 推出 ≥ 1 個通過 registry + validator 的 action
|
||||
"empty", # LLM 推 0 個 recommended_actions
|
||||
"schema_failed", # LLM 推但全被 schema / registry 驗證 reject
|
||||
"registry_unavailable",# registry 載入失敗({})
|
||||
]
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Enums
|
||||
@@ -142,6 +152,8 @@ class ActionPlan:
|
||||
2026-04-27 Claude Sonnet 4.6: B1 新增 recommended_actions(結構化動作清單)
|
||||
- recommended_actions 為空 list 代表降級(degraded=True)或 LLM 無法輸出合法動作
|
||||
- Coordinator 舊邏輯只讀 candidates,不受影響
|
||||
2026-04-27 Claude Sonnet 4.6: H1+B1 Fix Round — recommended_actions_status 新增
|
||||
- 可觀測性:B3 Telegram / 監控 dashboard 可讀取此欄位判斷 Solver 品質
|
||||
"""
|
||||
candidates: list[CandidateAction]
|
||||
diagnosis_report: DiagnosisReport
|
||||
@@ -150,6 +162,10 @@ class ActionPlan:
|
||||
degraded: bool = False
|
||||
# 2026-04-27 Claude Sonnet 4.6: B1 — 結構化推薦動作(0-3 個,降級時為 [])
|
||||
recommended_actions: list[RecommendedAction] = field(default_factory=list)
|
||||
# 2026-04-27 Claude Sonnet 4.6: H1+B1 Fix Round — recommended_actions 提取結果狀態
|
||||
# ok=正常, empty=LLM 未輸出, schema_failed=全部驗證失敗, registry_unavailable=registry 載入失敗
|
||||
# 欄位加在尾部,default="ok",不破壞既有 callsite
|
||||
recommended_actions_status: RecommendedActionsStatus = "ok"
|
||||
|
||||
@property
|
||||
def top_candidate(self) -> CandidateAction | None:
|
||||
|
||||
@@ -15,9 +15,16 @@ AWOOOI AIOps Phase 2 — Solver Agent(軍師)
|
||||
5. Solver 不直接觸碰執行層(Coordinator 的工作)
|
||||
6. recommended_actions:結構化 MCP 動作清單,供 B3 Telegram 按鈕動態生成
|
||||
|
||||
安全原則(B1 Fix Round, 2026-04-27):
|
||||
- F1: mcp_tool registry 白名單 — LLM 推薦的 name/mcp_tool 必須對應 registry 存在的 entry
|
||||
- F2: risk 由 registry 蓋寫 — 信代碼不信 LLM;LLM 自報 risk 一律被 registry 值取代
|
||||
- F3: prompt 注入點 sanitize — hypothesis/category/evidence_summary 全部過 sanitize()
|
||||
- F4: critical risk 永遠 reject — critical 動作必須走人工審批,絕不成為 Telegram 按鈕
|
||||
|
||||
ADR-082: Phase 2 多 Agent 協作
|
||||
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
|
||||
2026-04-27 Claude Sonnet 4.6: B1 — Solver 結構化動作 (北極星 §1.1 修復多樣性 ≥ 40%)
|
||||
2026-04-27 Claude Sonnet 4.6: B1 Fix Round — F1/F2/F3/F4/H1/H2/H3 安全強化
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -41,6 +48,7 @@ from src.agents.protocol import (
|
||||
CandidateAction,
|
||||
DiagnosisReport,
|
||||
RecommendedAction,
|
||||
RecommendedActionsStatus,
|
||||
)
|
||||
from src.observability.agent_step_metrics import observe_agent_step
|
||||
from src.services.sanitization_service import sanitize
|
||||
@@ -90,33 +98,98 @@ _VALID_MCP_PROVIDERS = frozenset(
|
||||
# 有效的 risk 等級(與 RecommendedAction schema 對應)
|
||||
_VALID_RISK_LEVELS = frozenset({"low", "medium", "high", "critical"})
|
||||
|
||||
# 2026-04-27 Claude Sonnet 4.6: H2+F4+vuln-V3/V4 — module-level cache(含 mtime invalidation)
|
||||
# 北極星 Blast Radius:registry 載入失敗時立即 set metric=error,不 silent 失敗
|
||||
# cache 結構:{registry: dict, mtime: float | 0.0}
|
||||
_REGISTRY_CACHE: dict[str, dict[str, Any]] = {} # 快取的 registry(過濾後)
|
||||
_REGISTRY_MTIME: float = 0.0 # 上次 YAML mtime(0.0 = 尚未載入)
|
||||
|
||||
|
||||
def _load_mcp_tool_registry() -> dict[str, dict[str, Any]]:
|
||||
"""
|
||||
從 callback_action_spec.yaml 動態載入 MCP tool 清單。
|
||||
從 callback_action_spec.yaml 動態載入 MCP tool 清單(含 module-level cache)。
|
||||
|
||||
2026-04-27 Claude Sonnet 4.6: B1 — 動態載入,避免 hardcode
|
||||
2026-04-27 Claude Sonnet 4.6: H2+F4+vuln-V3/V4 — cache + mtime invalidation + metric + 安全過濾
|
||||
北極星 Blast Radius:critical/multi-sig actions 不放進 registry,絕不暴露給 LLM
|
||||
|
||||
回傳格式:{action_name: {provider, tool, risk, label, emoji}}
|
||||
失敗 → 返回 {} 並 log warning(不中斷 Solver 主流程)
|
||||
過濾規則:
|
||||
- requires_multi_sig=true → 排除(不讓 LLM 看到,避免提示 LLM 自創相似名稱)
|
||||
- risk=critical → 排除(F4: critical 動作必須走人工審批,不成為按鈕)
|
||||
快取邏輯:
|
||||
- 第一次 load 寫 _REGISTRY_CACHE / _REGISTRY_MTIME
|
||||
- 後續 call 比較 YAML mtime,未變化直接回快取(O(1))
|
||||
失敗 → 返回 {} + set metric=error + log error(不 silent)
|
||||
"""
|
||||
global _REGISTRY_CACHE, _REGISTRY_MTIME
|
||||
|
||||
from src.core.metrics import SOLVER_MCP_REGISTRY_LOADED
|
||||
|
||||
# 比較 mtime;無法取得 mtime 視為需要重新載入
|
||||
try:
|
||||
current_mtime = _CALLBACK_SPEC_PATH.stat().st_mtime
|
||||
except OSError:
|
||||
current_mtime = 0.0
|
||||
|
||||
# cache 命中:mtime 未變 且 快取非空
|
||||
if current_mtime > 0.0 and current_mtime == _REGISTRY_MTIME and _REGISTRY_CACHE:
|
||||
return _REGISTRY_CACHE
|
||||
|
||||
try:
|
||||
with _CALLBACK_SPEC_PATH.open(encoding="utf-8") as f:
|
||||
spec = yaml.safe_load(f)
|
||||
registry: dict[str, dict[str, Any]] = {}
|
||||
for name, action in (spec.get("actions") or {}).items():
|
||||
mcp = action.get("mcp") or {}
|
||||
risk_val = str(action.get("risk", "medium")).strip().lower()
|
||||
requires_multi_sig = bool(action.get("requires_multi_sig", False))
|
||||
|
||||
# 2026-04-27 Claude Sonnet 4.6: F4+vuln-V3 — 過濾 critical/multi-sig
|
||||
# 北極星 Blast Radius:critical 動作絕不變成按鈕,也不暴露給 LLM 供模仿
|
||||
if risk_val == "critical" or requires_multi_sig:
|
||||
logger.debug(
|
||||
"mcp_registry_action_filtered",
|
||||
name=name,
|
||||
risk=risk_val,
|
||||
requires_multi_sig=requires_multi_sig,
|
||||
reason="critical 或 requires_multi_sig=true,不放入 LLM 可見 registry",
|
||||
)
|
||||
continue
|
||||
|
||||
registry[name] = {
|
||||
"provider": mcp.get("provider", ""),
|
||||
"tool": mcp.get("tool", ""),
|
||||
"risk": action.get("risk", "low"),
|
||||
"risk": risk_val,
|
||||
"label": action.get("label", name),
|
||||
"emoji": action.get("emoji", ""),
|
||||
"params": mcp.get("params") or {},
|
||||
"description": action.get("description", ""),
|
||||
}
|
||||
|
||||
# 更新快取
|
||||
_REGISTRY_CACHE = registry
|
||||
_REGISTRY_MTIME = current_mtime
|
||||
|
||||
# H2: registry 健康 metric
|
||||
if registry:
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="ok").set(1)
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="empty").set(0)
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="error").set(0)
|
||||
else:
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="empty").set(1)
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="ok").set(0)
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="error").set(0)
|
||||
logger.warning("mcp_registry_empty", path=str(_CALLBACK_SPEC_PATH))
|
||||
|
||||
return registry
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("mcp_registry_load_failed", path=str(_CALLBACK_SPEC_PATH), error=str(e))
|
||||
# H2: 載入失敗設 error metric + log error(不 silent)
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="error").set(1)
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="ok").set(0)
|
||||
SOLVER_MCP_REGISTRY_LOADED.labels(status="empty").set(0)
|
||||
logger.error("mcp_registry_load_failed", path=str(_CALLBACK_SPEC_PATH), error=str(e))
|
||||
return {}
|
||||
|
||||
|
||||
@@ -155,18 +228,27 @@ def _is_safe_kubectl_command(cmd: str) -> bool:
|
||||
return _KUBECTL_COMMAND_PATTERN.fullmatch(cmd) is not None
|
||||
|
||||
|
||||
def _validate_recommended_action(raw: Any) -> RecommendedAction | None:
|
||||
def _validate_recommended_action(
|
||||
raw: Any,
|
||||
registry_actions: dict[str, dict[str, Any]] | None = None,
|
||||
) -> RecommendedAction | None:
|
||||
"""
|
||||
驗證單一 recommended_action LLM 輸出是否符合 RecommendedAction schema。
|
||||
|
||||
2026-04-27 Claude Sonnet 4.6: B1 — schema 驗證,不合規則 graceful skip
|
||||
- mcp_provider 必須在 _VALID_MCP_PROVIDERS 清單
|
||||
- risk 必須在 _VALID_RISK_LEVELS 清單
|
||||
- 任何欄位缺失 → 返回 None,由呼叫端記 warning 並 skip
|
||||
2026-04-27 Claude Opus 4.7: F1+F2+F4+vuln-V3/V4 — registry 交叉驗 + risk 蓋寫 + 拒 critical
|
||||
- F1: name 必須在 registry_actions 內(防 LLM 自創 mcp_tool 如 ssh_run_arbitrary)
|
||||
- F2: risk 由 registry 蓋寫,不信 LLM(防 LLM 把 kubectl_delete_namespace 標 risk=low)
|
||||
- F4: 任何 risk='critical' 永遠 reject(critical 必走人工審批,不變按鈕)
|
||||
|
||||
Args:
|
||||
raw: LLM 輸出的單一 action dict
|
||||
registry_actions: 由 _load_mcp_tool_registry 載入的合法 action 清單。
|
||||
None = 不驗證(向後相容單元測試),但生產路徑必傳
|
||||
|
||||
Returns:
|
||||
RecommendedAction — 驗證通過
|
||||
None — 不符 schema,呼叫端 skip 此項(不假造)
|
||||
RecommendedAction — 驗證通過(risk 已被 registry 值覆蓋)
|
||||
None — 不符 schema 或不在 registry 或 critical,呼叫端 skip 此項(不假造)
|
||||
"""
|
||||
if not isinstance(raw, dict):
|
||||
return None
|
||||
@@ -176,12 +258,12 @@ def _validate_recommended_action(raw: Any) -> RecommendedAction | None:
|
||||
emoji = str(raw.get("emoji", "")).strip()
|
||||
mcp_provider = str(raw.get("mcp_provider", "")).strip().lower()
|
||||
mcp_tool = str(raw.get("mcp_tool", "")).strip()
|
||||
risk = str(raw.get("risk", "")).strip().lower()
|
||||
llm_risk = str(raw.get("risk", "")).strip().lower()
|
||||
reasoning = str(raw.get("reasoning", "")).strip()
|
||||
params_raw = raw.get("params", {})
|
||||
|
||||
# 必填欄位非空
|
||||
if not name or not label or not mcp_provider or not mcp_tool or not risk:
|
||||
if not name or not label or not mcp_provider or not mcp_tool or not llm_risk:
|
||||
return None
|
||||
|
||||
# provider 白名單驗證
|
||||
@@ -194,16 +276,86 @@ def _validate_recommended_action(raw: Any) -> RecommendedAction | None:
|
||||
)
|
||||
return None
|
||||
|
||||
# risk 白名單驗證
|
||||
if risk not in _VALID_RISK_LEVELS:
|
||||
# risk 白名單驗證(先做基本格式檢查)
|
||||
if llm_risk not in _VALID_RISK_LEVELS:
|
||||
logger.warning(
|
||||
"solver_recommended_action_invalid_risk",
|
||||
name=name,
|
||||
risk=risk,
|
||||
risk=llm_risk,
|
||||
valid=sorted(_VALID_RISK_LEVELS),
|
||||
)
|
||||
return None
|
||||
|
||||
# 2026-04-27 Claude Opus 4.7: F4+vuln-V3 — 永遠拒 critical(不論 LLM 自報或 registry)
|
||||
# 北極星 Blast Radius:critical 動作絕不變成按鈕,必走人工審批
|
||||
if llm_risk == "critical":
|
||||
logger.warning(
|
||||
"solver_recommended_action_critical_rejected",
|
||||
name=name,
|
||||
mcp_tool=mcp_tool,
|
||||
reason="F4: critical 動作必須走人工審批,不可變按鈕",
|
||||
)
|
||||
return None
|
||||
|
||||
# 2026-04-27 Claude Opus 4.7: F1+vuln-V4 — registry 白名單交叉驗
|
||||
# 防 LLM 自創 mcp_tool(如 ssh_run_arbitrary、kubectl_delete_namespace)
|
||||
final_risk = llm_risk
|
||||
if registry_actions is not None:
|
||||
if name not in registry_actions:
|
||||
logger.warning(
|
||||
"solver_recommended_action_not_in_registry",
|
||||
name=name,
|
||||
mcp_tool=mcp_tool,
|
||||
reason="F1: name 不在 callback_action_spec.yaml registry,reject",
|
||||
)
|
||||
return None
|
||||
|
||||
registry_entry = registry_actions[name]
|
||||
registry_tool = str(registry_entry.get("tool", "")).strip()
|
||||
registry_provider = str(registry_entry.get("provider", "")).strip().lower()
|
||||
registry_risk = str(registry_entry.get("risk", "")).strip().lower()
|
||||
|
||||
# F1: mcp_tool 必須與 registry 對應一致(防混搭注入)
|
||||
if registry_tool and mcp_tool != registry_tool:
|
||||
logger.warning(
|
||||
"solver_recommended_action_tool_mismatch",
|
||||
name=name,
|
||||
llm_tool=mcp_tool,
|
||||
registry_tool=registry_tool,
|
||||
reason="F1: mcp_tool 與 registry 不一致,reject",
|
||||
)
|
||||
return None
|
||||
|
||||
# F1: provider 也必須與 registry 對應一致
|
||||
if registry_provider and mcp_provider != registry_provider:
|
||||
logger.warning(
|
||||
"solver_recommended_action_provider_mismatch",
|
||||
name=name,
|
||||
llm_provider=mcp_provider,
|
||||
registry_provider=registry_provider,
|
||||
reason="F1: mcp_provider 與 registry 不一致,reject",
|
||||
)
|
||||
return None
|
||||
|
||||
# F2+vuln-V3: risk 由 registry 蓋寫,不信 LLM
|
||||
if registry_risk and registry_risk in _VALID_RISK_LEVELS:
|
||||
if registry_risk == "critical":
|
||||
logger.warning(
|
||||
"solver_recommended_action_registry_critical_rejected",
|
||||
name=name,
|
||||
reason="F4: registry 標 critical(理論上 _load_mcp_tool_registry 已過濾,雙重保險)",
|
||||
)
|
||||
return None
|
||||
final_risk = registry_risk
|
||||
if final_risk != llm_risk:
|
||||
logger.info(
|
||||
"solver_recommended_action_risk_overridden",
|
||||
name=name,
|
||||
llm_risk=llm_risk,
|
||||
registry_risk=final_risk,
|
||||
reason="F2: 信代碼不信 LLM,risk 由 registry 蓋寫",
|
||||
)
|
||||
|
||||
# params 型別保護(必須是 dict[str, str])
|
||||
if not isinstance(params_raw, dict):
|
||||
params: dict[str, str] = {}
|
||||
@@ -214,46 +366,74 @@ def _validate_recommended_action(raw: Any) -> RecommendedAction | None:
|
||||
name=name[:64],
|
||||
label=label[:80],
|
||||
emoji=emoji[:8],
|
||||
mcp_provider=mcp_provider, # type: ignore[arg-type] # 已驗過在 Literal 範圍
|
||||
mcp_provider=mcp_provider, # type: ignore[arg-type]
|
||||
mcp_tool=mcp_tool[:80],
|
||||
params=params,
|
||||
risk=risk, # type: ignore[arg-type] # 已驗過在 Literal 範圍
|
||||
risk=final_risk, # type: ignore[arg-type]
|
||||
reasoning=reasoning[:400],
|
||||
)
|
||||
|
||||
|
||||
def _extract_recommended_actions(parsed: dict[str, Any]) -> list[RecommendedAction]:
|
||||
def _extract_recommended_actions(
|
||||
parsed: dict[str, Any],
|
||||
registry_actions: dict[str, dict[str, Any]] | None = None,
|
||||
) -> tuple[list[RecommendedAction], str]:
|
||||
"""
|
||||
從 LLM 解析結果提取 recommended_actions(按 schema 驗證)。
|
||||
從 LLM 解析結果提取 recommended_actions(按 schema 驗證 + registry 交叉驗)。
|
||||
|
||||
2026-04-27 Claude Sonnet 4.6: B1 — schema 驗證 + graceful degraded
|
||||
- LLM 輸出不合 schema → 記 warning + skip(不假造)
|
||||
- 最多取 3 個(每假設 0-3 個動作)
|
||||
- 返回空列表 = 降級,呼叫端設 degraded=True
|
||||
2026-04-27 Claude Opus 4.7: F1+H1 — 傳遞 registry + 回傳 status enum
|
||||
|
||||
Args:
|
||||
parsed: LLM JSON 輸出
|
||||
registry_actions: callback_action_spec.yaml 載入後的合法 action 清單
|
||||
|
||||
Returns:
|
||||
list[RecommendedAction](0-3 個,驗證通過的項目)
|
||||
(actions, status):
|
||||
actions: list[RecommendedAction](0-3 個,registry 通過的)
|
||||
status:
|
||||
- "ok" LLM 推 ≥ 1 通過驗證
|
||||
- "empty" LLM 沒推任何動作
|
||||
- "schema_failed" LLM 推但全被 reject(白名單/risk/registry)
|
||||
- "registry_unavailable" registry 載入失敗(caller 傳 {} 或 None)
|
||||
"""
|
||||
raw_list = parsed.get("recommended_actions", [])
|
||||
if not isinstance(raw_list, list):
|
||||
return []
|
||||
if not isinstance(raw_list, list) or len(raw_list) == 0:
|
||||
# registry 不可用是更嚴重的問題,先標 registry_unavailable
|
||||
if registry_actions is not None and not registry_actions:
|
||||
return [], "registry_unavailable"
|
||||
return [], "empty"
|
||||
|
||||
if registry_actions is not None and not registry_actions:
|
||||
# registry 載入失敗:所有動作都會被 F1 reject,提早標記避免誤判
|
||||
logger.warning(
|
||||
"solver_recommended_actions_registry_unavailable",
|
||||
llm_proposed=len(raw_list),
|
||||
reason="registry 為空(_load_mcp_tool_registry 失敗),所有動作將被 F1 reject",
|
||||
)
|
||||
return [], "registry_unavailable"
|
||||
|
||||
result: list[RecommendedAction] = []
|
||||
rejected = 0
|
||||
for idx, raw in enumerate(raw_list[:5]): # 最多處理 5 個,取前 3 個通過驗證的
|
||||
action = _validate_recommended_action(raw)
|
||||
action = _validate_recommended_action(raw, registry_actions=registry_actions)
|
||||
if action is None:
|
||||
rejected += 1
|
||||
logger.warning(
|
||||
"solver_recommended_action_schema_invalid",
|
||||
index=idx,
|
||||
raw=str(raw)[:200],
|
||||
reason="欄位缺失或不符白名單,skip(不假造)",
|
||||
reason="欄位缺失/registry 不符/critical,skip(不假造)",
|
||||
)
|
||||
continue
|
||||
result.append(action)
|
||||
if len(result) >= 3:
|
||||
break
|
||||
|
||||
return result
|
||||
if not result:
|
||||
# LLM 有推但全被擋下 → schema_failed(與 empty 區分)
|
||||
return [], "schema_failed"
|
||||
return result, "ok"
|
||||
|
||||
|
||||
class SolverAgent(BaseAgent):
|
||||
@@ -383,12 +563,15 @@ class SolverAgent(BaseAgent):
|
||||
candidates = _extract_candidates(parsed)
|
||||
|
||||
# 2026-04-27 Claude Sonnet 4.6: B1 — 提取 recommended_actions(schema 驗證)
|
||||
# LLM 輸出不合 schema → graceful skip(log warn + empty actions),不假造
|
||||
recommended_actions = _extract_recommended_actions(parsed)
|
||||
if not recommended_actions:
|
||||
# 2026-04-27 Claude Opus 4.7: F1+H1 — 傳 registry 進 validator + 拿 status enum
|
||||
# LLM 輸出不合 schema 或 registry → graceful skip(log warn + empty actions),不假造
|
||||
recommended_actions, ra_status = _extract_recommended_actions(
|
||||
parsed, registry_actions=mcp_registry,
|
||||
)
|
||||
if ra_status != "ok":
|
||||
logger.info(
|
||||
"solver_recommended_actions_empty",
|
||||
reason="LLM 未輸出合法 recommended_actions 或 schema 驗證失敗",
|
||||
"solver_recommended_actions_not_ok",
|
||||
status=ra_status,
|
||||
snapshot_id=diagnosis.evidence_snapshot_id,
|
||||
)
|
||||
|
||||
@@ -413,6 +596,7 @@ class SolverAgent(BaseAgent):
|
||||
latency_ms=0,
|
||||
vote=AgentVote.APPROVE,
|
||||
recommended_actions=recommended_actions,
|
||||
recommended_actions_status=ra_status,
|
||||
)
|
||||
|
||||
def _build_prompt(self, context: dict[str, Any]) -> str:
|
||||
@@ -465,11 +649,23 @@ class SolverAgent(BaseAgent):
|
||||
else:
|
||||
_mcp_section = "\n(MCP action 清單暫不可用,recommended_actions 可留空陣列)\n"
|
||||
|
||||
# 2026-04-27 Claude Opus 4.7: F3+vuln-V1/V2 — prompt 注入點全過 sanitize
|
||||
# 北極星 Skepticism in RAG:hypothesis/category 來自 Diagnostician(再上游是告警 label),
|
||||
# 攻擊者可透過控制 alertname/description 注入「忽略上述指令」「新指令」等 jailbreak 話術。
|
||||
# sanitize_service 已含 12 條中英文 patterns(_INJECTION_PATTERNS),呼叫前必過。
|
||||
_safe_hypothesis = sanitize(str(context.get("hypothesis", ""))[:800], "solver_hypothesis")
|
||||
_safe_category = sanitize(str(context.get("category", ""))[:120], "solver_category")
|
||||
_confidence = context.get("confidence", 0.0)
|
||||
try:
|
||||
_confidence_pct = f"{float(_confidence):.0%}"
|
||||
except (TypeError, ValueError):
|
||||
_confidence_pct = "0%"
|
||||
|
||||
return f"""你是 AWOOOI SRE 系統的軍師 Agent,專職修復方案設計。
|
||||
|
||||
根因假設:{context.get("hypothesis", "")}
|
||||
告警類別:{context.get("category", "")}
|
||||
診斷信心:{context.get("confidence", 0.0):.0%}
|
||||
根因假設:{_safe_hypothesis}
|
||||
告警類別:{_safe_category}
|
||||
診斷信心:{_confidence_pct}
|
||||
{_inventory_section}{_non_k8s_warning}{_mcp_section}
|
||||
你的工作:為此根因提出 1-3 個修復候選方案,同時輸出 0-3 個結構化 recommended_actions。
|
||||
|
||||
|
||||
@@ -513,13 +513,13 @@ class Settings(BaseSettings):
|
||||
default=False,
|
||||
description="ADR-095: 啟用 12-Agent ConsensusEngine weights(預設關閉)",
|
||||
)
|
||||
# 2026-04-27 P3.1-T2 by Claude — Tier-2 感知強化:DiagnosisAggregator 整合開關
|
||||
# 預設關閉:DiagnosisAggregator 與 PreDecisionInvestigator 存在 K8s+SignOz 資料重疊,
|
||||
# 待重疊分析完成(獨立審查任務)確認互補性後再啟用。
|
||||
# 2026-04-27 P3.1-T2-PathA by Claude — DiagAggregator 信號分類層補 PDI
|
||||
# 路徑 A 已啟用:DA 只取 PDI 已收集的 raw 資料做業務邏輯分類(OOMKilled/CrashLoop 等),
|
||||
# 不重複呼叫 K8s/SignOz API(純邏輯分類,不打外部服務)。
|
||||
# 啟用:kubectl set env deployment/awoooi-api ENABLE_DIAGNOSIS_AGGREGATOR=true
|
||||
ENABLE_DIAGNOSIS_AGGREGATOR: bool = Field(
|
||||
default=False,
|
||||
description="P3.1-T2: 啟用 DiagnosisAggregator 在 PreDecisionInvestigator 中補充 Pod 診斷(預設關閉,待重疊分析完成後評估)",
|
||||
default=True,
|
||||
description="P3.1-T2-PathA: 啟用 DiagnosisAggregator 信號分類層補 PDI(路徑 A:不重複收集,只分類已有 raw 資料)",
|
||||
)
|
||||
|
||||
def get_tg_user_whitelist(self) -> list[int]:
|
||||
|
||||
@@ -207,6 +207,15 @@ AIOPS_DIAGNOSE_FALLBACK_TOTAL = Counter(
|
||||
["from_provider", "to_provider"],
|
||||
)
|
||||
|
||||
# 2026-04-27 Claude Sonnet 4.6: F6 — metric 寫入失敗計數器
|
||||
# 觸發條件: ai_router.py 的 diagnose_fallback_metric_failed except 分支
|
||||
# 用途: 讓 Prometheus 可觀測 metric 管道是否有問題(silent swallow 升 warning + counter)
|
||||
# 告警參考: rate(aiops_diagnose_fallback_metric_error_total[5m]) > 0 → 調查 metrics.py import 鏈
|
||||
AIOPS_DIAGNOSE_FALLBACK_METRIC_ERROR_TOTAL = Counter(
|
||||
"aiops_diagnose_fallback_metric_error_total",
|
||||
"Failures when writing aiops_diagnose_fallback_total metric (indicates metric pipeline issue)",
|
||||
)
|
||||
|
||||
|
||||
def record_diagnose_fallback(from_provider: str, to_provider: str) -> None:
|
||||
"""記錄 DIAGNOSE fallback 事件(per-provider pair 計數)
|
||||
|
||||
@@ -1094,6 +1094,10 @@ class AIRouterExecutor:
|
||||
for provider_name in provider_order:
|
||||
# 2026-04-27 Claude Sonnet 4.6: A2 — 若上一輪失敗且本輪開始,表示發生 fallback
|
||||
# 記錄 metric(DIAGNOSE intent 專屬;非 DIAGNOSE 不記,不影響其他路徑)
|
||||
# 2026-04-27 Claude Sonnet 4.6: F6 — fallback metric 只在真實 analyze() 失敗時觸發
|
||||
# _last_attempted_provider 僅在 provider.analyze() 執行後失敗才賦值(見下方兩處);
|
||||
# not_registered / privacy_skip / circuit_open / rate_limit 分支均不賦值,
|
||||
# 避免這些「被跳過的 provider」誤計入 from→to fallback 鏈,metric 不可信問題(F6)。
|
||||
if _is_diagnose_intent and _last_attempted_provider is not None:
|
||||
try:
|
||||
from src.core.metrics import record_diagnose_fallback
|
||||
@@ -1107,16 +1111,24 @@ class AIRouterExecutor:
|
||||
to_provider=provider_name,
|
||||
)
|
||||
except Exception as _metric_e:
|
||||
logger.debug("diagnose_fallback_metric_failed", error=str(_metric_e))
|
||||
# 2026-04-27 Claude Sonnet 4.6: F6 — 升 warning(原 debug 會 silent swallow)
|
||||
# + 計入 error counter 讓 metric 管道問題可被 Prometheus 偵測
|
||||
logger.warning("diagnose_fallback_metric_failed", error=str(_metric_e))
|
||||
try:
|
||||
from src.core.metrics import AIOPS_DIAGNOSE_FALLBACK_METRIC_ERROR_TOTAL
|
||||
AIOPS_DIAGNOSE_FALLBACK_METRIC_ERROR_TOTAL.inc()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
provider = self._registry.get(provider_name)
|
||||
if not provider:
|
||||
# 2026-04-14 Claude Sonnet 4.6: silent skip 改 errors 累積(觀測性)
|
||||
# 2026-04-27 Claude Sonnet 4.6: F6 — 不設 _last_attempted_provider(未真實執行 analyze)
|
||||
errors.append(f"{provider_name}: not_registered")
|
||||
_last_attempted_provider = provider_name
|
||||
continue
|
||||
|
||||
# 隱私過濾 (D7)
|
||||
# 2026-04-27 Claude Sonnet 4.6: F6 — privacy_skip 不設 _last_attempted_provider(未嘗試)
|
||||
if require_local and provider.privacy_level != "local":
|
||||
errors.append(f"{provider_name}: privacy_skip(non_local)")
|
||||
continue
|
||||
@@ -1126,7 +1138,7 @@ class AIRouterExecutor:
|
||||
if cb.is_open():
|
||||
errors.append(f"{provider_name}: circuit_open")
|
||||
logger.warning("ai_router_circuit_open", provider=provider_name)
|
||||
_last_attempted_provider = provider_name
|
||||
# 2026-04-27 Claude Sonnet 4.6: F6 — circuit_open 不設 _last_attempted_provider(未嘗試)
|
||||
continue
|
||||
|
||||
# 閘門 2: Rate Limiter
|
||||
|
||||
@@ -556,6 +556,161 @@ class DiagnosisAggregator:
|
||||
},
|
||||
))
|
||||
|
||||
def classify_signals_from_raw(
|
||||
self,
|
||||
k8s_data: dict | None = None,
|
||||
logs_data: str | None = None,
|
||||
metrics_data: dict | None = None,
|
||||
) -> list[DiagnosisSignal]:
|
||||
"""
|
||||
2026-04-27 P3.1-T2-PathA by Claude — DiagAggregator 信號分類層補 PDI
|
||||
|
||||
純邏輯信號分類:接受 PDI 已收集的 raw 資料做業務邏輯分類,
|
||||
不打外部 API(K8s/SignOz),不重複收集。
|
||||
|
||||
Args:
|
||||
k8s_data: EvidenceSnapshot.k8s_state(D1,PDI 已收集的 dict)
|
||||
logs_data: EvidenceSnapshot.recent_logs(D2,sanitized string)
|
||||
metrics_data: EvidenceSnapshot.metrics_snapshot(D3,PDI 已收集的 dict)
|
||||
|
||||
Returns:
|
||||
list[DiagnosisSignal]: 分類後的信號清單(空清單代表無異常)
|
||||
"""
|
||||
# 組裝暫時 context 供 _analyze_signals 使用(不觸發任何 IO)
|
||||
ctx = DiagnosisContext(target="_classify_only")
|
||||
|
||||
# D1: k8s_state dict → 嘗試映射為 K8sDiagnostics(只提取可分類欄位)
|
||||
if k8s_data and isinstance(k8s_data, dict):
|
||||
# 利用 K8sDiagnostics.from_dict(若存在)或直接從常見欄位提取信號
|
||||
# 不依賴 K8sDiagnostics.from_dict(避免 import coupling),
|
||||
# 改從 k8s_data 中提取已知信號模式
|
||||
self._classify_k8s_dict_signals(ctx, k8s_data)
|
||||
|
||||
# D3: metrics_snapshot → GoldMetrics-like 分析
|
||||
if metrics_data and isinstance(metrics_data, dict):
|
||||
self._classify_metrics_dict_signals(ctx, metrics_data)
|
||||
|
||||
# D2: logs string → 錯誤計數分類
|
||||
if logs_data and isinstance(logs_data, str):
|
||||
self._classify_log_string_signals(ctx, logs_data)
|
||||
|
||||
return ctx.signals
|
||||
|
||||
def _classify_k8s_dict_signals(
|
||||
self,
|
||||
context: DiagnosisContext,
|
||||
k8s_data: dict,
|
||||
) -> None:
|
||||
"""
|
||||
2026-04-27 P3.1-T2-PathA by Claude — 從 PDI k8s_state dict 提取信號
|
||||
不依賴 K8sDiagnostics 物件,直接從 dict 關鍵字段分類。
|
||||
"""
|
||||
phase = str(k8s_data.get("phase", "")).lower()
|
||||
reason = str(k8s_data.get("reason", "")).lower()
|
||||
restart_count = k8s_data.get("restart_count", 0) or 0
|
||||
|
||||
# CrashLoopBackOff
|
||||
if "crashloop" in phase or "crashloopbackoff" in reason:
|
||||
context.signals.append(DiagnosisSignal(
|
||||
source="k8s_state",
|
||||
signal_type="crash_loop",
|
||||
severity=DiagnosisSeverity.CRITICAL,
|
||||
message=f"CrashLoopBackOff detected (phase={k8s_data.get('phase', '?')})",
|
||||
evidence={"phase": k8s_data.get("phase"), "reason": k8s_data.get("reason")},
|
||||
))
|
||||
|
||||
# OOMKilled
|
||||
if "oomkilled" in phase or "oomkilled" in reason or "oom" in reason:
|
||||
context.signals.append(DiagnosisSignal(
|
||||
source="k8s_state",
|
||||
signal_type="oom_killed",
|
||||
severity=DiagnosisSeverity.CRITICAL,
|
||||
message=f"OOMKilled detected (reason={k8s_data.get('reason', '?')})",
|
||||
evidence={"phase": k8s_data.get("phase"), "reason": k8s_data.get("reason")},
|
||||
))
|
||||
|
||||
# Image pull error
|
||||
if "imagepullerr" in reason or "errimagepull" in reason or "imagepullbackoff" in reason:
|
||||
context.signals.append(DiagnosisSignal(
|
||||
source="k8s_state",
|
||||
signal_type="image_pull_error",
|
||||
severity=DiagnosisSeverity.HIGH,
|
||||
message=f"Image pull error (reason={k8s_data.get('reason', '?')})",
|
||||
evidence={"reason": k8s_data.get("reason")},
|
||||
))
|
||||
|
||||
# High restart count
|
||||
try:
|
||||
rc = int(restart_count)
|
||||
except (TypeError, ValueError):
|
||||
rc = 0
|
||||
if rc > 5:
|
||||
context.signals.append(DiagnosisSignal(
|
||||
source="k8s_state",
|
||||
signal_type="high_restart_count",
|
||||
severity=DiagnosisSeverity.MEDIUM,
|
||||
message=f"High restart count: {rc}",
|
||||
evidence={"restart_count": rc},
|
||||
))
|
||||
|
||||
def _classify_metrics_dict_signals(
|
||||
self,
|
||||
context: DiagnosisContext,
|
||||
metrics_data: dict,
|
||||
) -> None:
|
||||
"""
|
||||
2026-04-27 P3.1-T2-PathA by Claude — 從 PDI metrics_snapshot dict 提取信號
|
||||
"""
|
||||
try:
|
||||
error_rate = float(metrics_data.get("error_rate", 0) or 0)
|
||||
except (TypeError, ValueError):
|
||||
error_rate = 0.0
|
||||
|
||||
try:
|
||||
p99_ms = float(metrics_data.get("p99_latency_ms", 0) or 0)
|
||||
except (TypeError, ValueError):
|
||||
p99_ms = 0.0
|
||||
|
||||
if error_rate > 5:
|
||||
context.signals.append(DiagnosisSignal(
|
||||
source="metrics_snapshot",
|
||||
signal_type="high_error_rate",
|
||||
severity=DiagnosisSeverity.CRITICAL if error_rate > 20 else DiagnosisSeverity.HIGH,
|
||||
message=f"High error rate: {error_rate:.2f}%",
|
||||
evidence={"error_rate": error_rate},
|
||||
))
|
||||
|
||||
if p99_ms > 5000:
|
||||
context.signals.append(DiagnosisSignal(
|
||||
source="metrics_snapshot",
|
||||
signal_type="high_latency",
|
||||
severity=DiagnosisSeverity.HIGH if p99_ms >= 10000 else DiagnosisSeverity.MEDIUM,
|
||||
message=f"High P99 latency: {p99_ms:.0f}ms",
|
||||
evidence={"p99_latency_ms": p99_ms},
|
||||
))
|
||||
|
||||
def _classify_log_string_signals(
|
||||
self,
|
||||
context: DiagnosisContext,
|
||||
logs_data: str,
|
||||
) -> None:
|
||||
"""
|
||||
2026-04-27 P3.1-T2-PathA by Claude — 從 PDI recent_logs string 提取信號
|
||||
"""
|
||||
# 簡單計數 ERROR/FATAL 行
|
||||
error_lines = [
|
||||
line for line in logs_data.splitlines()
|
||||
if any(kw in line.upper() for kw in ("ERROR", "FATAL", "CRITICAL", "EXCEPTION", "TRACEBACK"))
|
||||
]
|
||||
if len(error_lines) > 10:
|
||||
context.signals.append(DiagnosisSignal(
|
||||
source="recent_logs",
|
||||
signal_type="frequent_errors",
|
||||
severity=DiagnosisSeverity.HIGH if len(error_lines) >= 50 else DiagnosisSeverity.MEDIUM,
|
||||
message=f"Frequent error lines in logs: {len(error_lines)}",
|
||||
evidence={"error_line_count": len(error_lines), "sample": error_lines[:3]},
|
||||
))
|
||||
|
||||
# =========================================================================
|
||||
# Utilities
|
||||
# =========================================================================
|
||||
|
||||
@@ -1,20 +1,20 @@
|
||||
"""
|
||||
DiagnosisAggregator Conservative 整合測試
|
||||
DiagnosisAggregator 路徑 A 整合測試
|
||||
==========================================
|
||||
P3.1-T2 by Claude 2026-04-27 — Tier-2 三服務感知強化
|
||||
P3.1-T2-PathA by Claude 2026-04-27 — DiagAggregator 信號分類層補 PDI
|
||||
|
||||
路徑 A 策略:ENABLE_DIAGNOSIS_AGGREGATOR=True(預設啟用)
|
||||
DA 只取 PDI 已收集的 raw 資料做信號分類,不重複呼叫 K8s/SignOz API。
|
||||
|
||||
Conservative 策略:ENABLE_DIAGNOSIS_AGGREGATOR=False(預設關閉)
|
||||
驗證:
|
||||
1. ENABLE_DIAGNOSIS_AGGREGATOR=False 時 aggregator 不被呼叫
|
||||
2. ENABLE_DIAGNOSIS_AGGREGATOR=True 時 _collect_diagnosis_aggregator 被呼叫
|
||||
1. ENABLE_DIAGNOSIS_AGGREGATOR=True(路徑 A 預設啟用)
|
||||
2. _collect_diagnosis_aggregator 走 classify_signals_from_raw(不打外部 API)
|
||||
3. aggregator 呼叫失敗時不影響主路徑(exception 隔離)
|
||||
4. EvidenceSnapshot.extra_diagnosis 欄位存在
|
||||
5. build_summary() 包含 extra_diagnosis 時正確輸出
|
||||
6. DiagnosisAggregator.collect_pod_diagnosis 介面正確
|
||||
4. EvidenceSnapshot.extra_diagnosis 為 dict 結構化資料
|
||||
5. build_summary() 包含 Signal Classification 區塊
|
||||
6. DiagnosisAggregator.classify_signals_from_raw 純邏輯無 IO
|
||||
|
||||
注意:不依賴真實 K8s/SignOz — 全 mock 測試
|
||||
|
||||
重疊分析報告(見 P7-COMPLETION 章節)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -29,28 +29,42 @@ from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
class TestEvidenceSnapshotExtraDiagnosis:
|
||||
def test_extra_diagnosis_field_exists(self):
|
||||
"""EvidenceSnapshot 應有 extra_diagnosis 欄位,預設 None"""
|
||||
"""EvidenceSnapshot 應有 extra_diagnosis 欄位,預設 None,型別為 dict | None"""
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
snap = EvidenceSnapshot(incident_id="INC-001")
|
||||
assert hasattr(snap, "extra_diagnosis")
|
||||
assert snap.extra_diagnosis is None
|
||||
|
||||
def test_build_summary_includes_extra_diagnosis(self):
|
||||
"""extra_diagnosis 不為 None 時 build_summary 應包含 Pod深診斷"""
|
||||
def test_build_summary_includes_signal_classification(self):
|
||||
"""extra_diagnosis 有 signals 時 build_summary 應包含 Signal Classification"""
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
snap = EvidenceSnapshot(incident_id="INC-001")
|
||||
snap.extra_diagnosis = "## 診斷目標\n- Target: api-pod\n- Namespace: awoooi-prod"
|
||||
snap.extra_diagnosis = {
|
||||
"signal_count": 2,
|
||||
"signals": [
|
||||
{"signal_type": "crash_loop", "severity": "critical", "message": "CrashLoopBackOff"},
|
||||
{"signal_type": "oom_killed", "severity": "critical", "message": "OOMKilled"},
|
||||
],
|
||||
}
|
||||
summary = snap.build_summary()
|
||||
assert "Pod深診斷" in summary
|
||||
assert "api-pod" in summary
|
||||
assert "Signal Classification" in summary
|
||||
assert "crash_loop" in summary
|
||||
|
||||
def test_build_summary_no_extra_diagnosis_no_section(self):
|
||||
"""extra_diagnosis=None 時 build_summary 不應包含 Pod深診斷"""
|
||||
"""extra_diagnosis=None 時 build_summary 不應包含 Signal Classification"""
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
snap = EvidenceSnapshot(incident_id="INC-001")
|
||||
snap.extra_diagnosis = None
|
||||
summary = snap.build_summary()
|
||||
assert "Pod深診斷" not in summary
|
||||
assert "Signal Classification" not in summary
|
||||
|
||||
def test_build_summary_empty_signals_no_section(self):
|
||||
"""extra_diagnosis signals=[] 時 build_summary 不應包含 Signal Classification"""
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
snap = EvidenceSnapshot(incident_id="INC-001")
|
||||
snap.extra_diagnosis = {"signal_count": 0, "signals": []}
|
||||
summary = snap.build_summary()
|
||||
assert "Signal Classification" not in summary
|
||||
|
||||
def test_extra_diagnosis_not_persisted_to_db_record(self):
|
||||
"""extra_diagnosis 是 in-memory only,save() 不應包含此欄位到 DB model"""
|
||||
@@ -67,20 +81,19 @@ class TestEvidenceSnapshotExtraDiagnosis:
|
||||
|
||||
class TestDiagnosisAggregatorFeatureFlag:
|
||||
def test_feature_flag_exists_in_settings(self):
|
||||
"""config.py 應有 ENABLE_DIAGNOSIS_AGGREGATOR 欄位,預設 False"""
|
||||
"""config.py 應有 ENABLE_DIAGNOSIS_AGGREGATOR 欄位,路徑 A 預設 True"""
|
||||
from src.core.config import settings
|
||||
assert hasattr(settings, "ENABLE_DIAGNOSIS_AGGREGATOR")
|
||||
# 預設應為 False(conservative)
|
||||
assert settings.ENABLE_DIAGNOSIS_AGGREGATOR is False
|
||||
# 路徑 A 啟用:預設 True
|
||||
assert settings.ENABLE_DIAGNOSIS_AGGREGATOR is True
|
||||
|
||||
def test_feature_flag_default_false(self):
|
||||
"""直接從 Settings class 確認預設值"""
|
||||
def test_feature_flag_default_true(self):
|
||||
"""直接從 Settings class 確認路徑 A 預設值為 True"""
|
||||
from src.core.config import Settings
|
||||
import inspect
|
||||
source = inspect.getsource(Settings)
|
||||
# 確認 ENABLE_DIAGNOSIS_AGGREGATOR 存在且預設 False
|
||||
assert "ENABLE_DIAGNOSIS_AGGREGATOR" in source
|
||||
assert "default=False" in source or "False" in source
|
||||
assert "default=True" in source
|
||||
|
||||
def test_aggregator_guarded_by_flag_in_investigate(self):
|
||||
"""investigate() 4.6 區塊有 ENABLE_DIAGNOSIS_AGGREGATOR flag 守門(source inspection)"""
|
||||
@@ -93,104 +106,129 @@ class TestDiagnosisAggregatorFeatureFlag:
|
||||
"investigate() 應呼叫 _collect_diagnosis_aggregator"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collect_diagnosis_aggregator_skips_no_pod(self):
|
||||
"""沒有 pod label 時 _collect_diagnosis_aggregator 應 early return"""
|
||||
async def test_collect_diagnosis_aggregator_uses_classify_signals_from_raw(self):
|
||||
"""路徑 A:_collect_diagnosis_aggregator 應呼叫 classify_signals_from_raw(非 collect_pod_diagnosis)"""
|
||||
from src.services.pre_decision_investigator import PreDecisionInvestigator
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
|
||||
investigator = PreDecisionInvestigator()
|
||||
|
||||
class _Signal:
|
||||
labels = {"alertname": "HostDown", "namespace": "awoooi-prod", "severity": "critical"}
|
||||
alert_name = "HostDown"
|
||||
annotations = {}
|
||||
source = "prometheus"
|
||||
|
||||
class _Inc:
|
||||
incident_id = "INC-TEST-002"
|
||||
signals = [_Signal()]
|
||||
signals = []
|
||||
|
||||
snap = EvidenceSnapshot(incident_id="INC-TEST-002")
|
||||
mock_aggregator = AsyncMock()
|
||||
mock_aggregator.collect_pod_diagnosis = AsyncMock()
|
||||
snap.k8s_state = {"phase": "CrashLoopBackOff"}
|
||||
snap.recent_logs = None
|
||||
snap.metrics_snapshot = None
|
||||
|
||||
mock_aggregator = MagicMock()
|
||||
mock_aggregator.classify_signals_from_raw = MagicMock(return_value=[])
|
||||
mock_aggregator.collect_pod_diagnosis = AsyncMock() # 不應被呼叫
|
||||
|
||||
with patch("src.services.diagnosis_aggregator.get_diagnosis_aggregator", return_value=mock_aggregator):
|
||||
await investigator._collect_diagnosis_aggregator(snap, _Inc())
|
||||
|
||||
# 無 pod_name → early return,aggregator 不被呼叫
|
||||
# 路徑 A:classify_signals_from_raw 被呼叫,collect_pod_diagnosis 不被呼叫
|
||||
mock_aggregator.classify_signals_from_raw.assert_called_once()
|
||||
mock_aggregator.collect_pod_diagnosis.assert_not_called()
|
||||
assert snap.extra_diagnosis is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collect_diagnosis_aggregator_fills_extra_diagnosis(self):
|
||||
"""有 pod label + aggregator 成功時 extra_diagnosis 應被填入"""
|
||||
async def test_da_classify_signals_no_api_call(self):
|
||||
"""路徑 A 核心驗證:classify_signals_from_raw 是純邏輯,不打外部 API"""
|
||||
from src.services.diagnosis_aggregator import DiagnosisAggregator
|
||||
|
||||
agg = DiagnosisAggregator.__new__(DiagnosisAggregator)
|
||||
|
||||
# 不 init k8s_service / signoz_client(純邏輯方法不依賴這些)
|
||||
with patch("src.services.diagnosis_aggregator.get_k8s_diagnostics_service"), \
|
||||
patch("src.services.diagnosis_aggregator.get_signoz_client"):
|
||||
|
||||
signals = agg.classify_signals_from_raw(
|
||||
k8s_data={"phase": "CrashLoopBackOff", "restart_count": 10},
|
||||
logs_data=None,
|
||||
metrics_data={"error_rate": 25.0},
|
||||
)
|
||||
|
||||
# 應產出信號
|
||||
assert len(signals) >= 1
|
||||
signal_types = [s.signal_type for s in signals]
|
||||
assert "crash_loop" in signal_types
|
||||
assert "high_error_rate" in signal_types
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collect_diagnosis_aggregator_fills_extra_diagnosis_dict(self):
|
||||
"""路徑 A:extra_diagnosis 應為 dict 結構化資料(非 string)"""
|
||||
from src.services.pre_decision_investigator import PreDecisionInvestigator
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
from src.services.diagnosis_aggregator import DiagnosisSignal, DiagnosisSeverity
|
||||
|
||||
investigator = PreDecisionInvestigator()
|
||||
|
||||
class _Signal:
|
||||
labels = {"alertname": "KubePodCrashLooping", "namespace": "awoooi-prod", "pod": "api-xyz-abc", "severity": "critical"}
|
||||
alert_name = "KubePodCrashLooping"
|
||||
annotations = {}
|
||||
source = "prometheus"
|
||||
|
||||
class _Inc:
|
||||
incident_id = "INC-TEST-003"
|
||||
signals = [_Signal()]
|
||||
signals = []
|
||||
|
||||
snap = EvidenceSnapshot(incident_id="INC-TEST-003")
|
||||
snap.k8s_state = {"phase": "CrashLoopBackOff"}
|
||||
snap.recent_logs = None
|
||||
snap.metrics_snapshot = None
|
||||
|
||||
# mock DiagnosisContext
|
||||
mock_ctx = MagicMock()
|
||||
mock_ctx.signals = []
|
||||
mock_ctx.highest_severity = MagicMock()
|
||||
mock_ctx.highest_severity.value = "info"
|
||||
mock_ctx.get_llm_prompt_context = MagicMock(return_value="## 診斷目標\n- Target: api-xyz-abc")
|
||||
fake_signal = DiagnosisSignal(
|
||||
source="k8s_state",
|
||||
signal_type="crash_loop",
|
||||
severity=DiagnosisSeverity.CRITICAL,
|
||||
message="CrashLoopBackOff detected",
|
||||
)
|
||||
|
||||
mock_aggregator = MagicMock()
|
||||
mock_aggregator.collect_pod_diagnosis = AsyncMock(return_value=mock_ctx)
|
||||
mock_aggregator.classify_signals_from_raw = MagicMock(return_value=[fake_signal])
|
||||
|
||||
with patch("src.services.diagnosis_aggregator.get_diagnosis_aggregator", return_value=mock_aggregator):
|
||||
await investigator._collect_diagnosis_aggregator(snap, _Inc())
|
||||
|
||||
assert snap.extra_diagnosis is not None
|
||||
assert "api-xyz-abc" in snap.extra_diagnosis
|
||||
assert isinstance(snap.extra_diagnosis, dict)
|
||||
assert snap.extra_diagnosis["signal_count"] == 1
|
||||
assert snap.extra_diagnosis["signals"][0]["signal_type"] == "crash_loop"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_da_signals_appear_in_evidence_summary(self):
|
||||
"""路徑 A:extra_diagnosis signals 應出現在 build_summary 的 Signal Classification 區塊"""
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
|
||||
snap = EvidenceSnapshot(incident_id="INC-TEST-SC-001")
|
||||
snap.extra_diagnosis = {
|
||||
"signal_count": 1,
|
||||
"signals": [{"signal_type": "oom_killed", "severity": "critical", "message": "OOMKilled"}],
|
||||
}
|
||||
summary = snap.build_summary()
|
||||
assert "Signal Classification" in summary
|
||||
assert "oom_killed" in summary
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collect_diagnosis_aggregator_exception_isolated(self):
|
||||
"""aggregator 拋出 exception 時不影響主路徑,snap.extra_diagnosis 維持 None"""
|
||||
"""路徑 A:aggregator 內部異常時 snap.extra_diagnosis 維持 None(exception 被內層 catch)"""
|
||||
from src.services.pre_decision_investigator import PreDecisionInvestigator
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
|
||||
investigator = PreDecisionInvestigator()
|
||||
|
||||
class _Signal:
|
||||
labels = {"alertname": "KubePodCrashLooping", "namespace": "awoooi-prod", "pod": "api-fail", "severity": "critical"}
|
||||
alert_name = "KubePodCrashLooping"
|
||||
annotations = {}
|
||||
source = "prometheus"
|
||||
|
||||
class _Inc:
|
||||
incident_id = "INC-TEST-004"
|
||||
signals = [_Signal()]
|
||||
signals = []
|
||||
|
||||
snap = EvidenceSnapshot(incident_id="INC-TEST-004")
|
||||
|
||||
mock_aggregator = MagicMock()
|
||||
mock_aggregator.collect_pod_diagnosis = AsyncMock(
|
||||
side_effect=Exception("K8s API timeout")
|
||||
mock_aggregator.classify_signals_from_raw = MagicMock(
|
||||
side_effect=Exception("classify error")
|
||||
)
|
||||
|
||||
# _collect_diagnosis_aggregator 本身不 catch,外層 investigate() 有 try/except
|
||||
# 測試:exception 傳出 (由外層 catch)
|
||||
# 路徑 A:_collect_diagnosis_aggregator 有內層 try/except,不會 raise
|
||||
with patch("src.services.diagnosis_aggregator.get_diagnosis_aggregator", return_value=mock_aggregator):
|
||||
try:
|
||||
await investigator._collect_diagnosis_aggregator(snap, _Inc())
|
||||
except Exception:
|
||||
pass # 外層 investigate() 會 catch
|
||||
await investigator._collect_diagnosis_aggregator(snap, _Inc())
|
||||
|
||||
# snap 不受影響
|
||||
assert snap.extra_diagnosis is None
|
||||
|
||||
|
||||
|
||||
@@ -201,10 +201,9 @@ class TestValidateRecommendedAction:
|
||||
result = _validate_recommended_action(raw)
|
||||
assert result is not None, f"provider={provider} 應通過驗證"
|
||||
|
||||
def test_all_valid_risk_levels(self):
|
||||
"""所有合法 risk 等級都能通過驗證"""
|
||||
valid_risks = ["low", "medium", "high", "critical"]
|
||||
for risk in valid_risks:
|
||||
def test_low_medium_high_risk_levels_accepted(self):
|
||||
"""low/medium/high risk 等級通過驗證;critical 須走人工審批(F4 設計)"""
|
||||
for risk in ["low", "medium", "high"]:
|
||||
raw = {
|
||||
"name": f"test_{risk}",
|
||||
"label": f"測試 {risk}",
|
||||
@@ -218,6 +217,21 @@ class TestValidateRecommendedAction:
|
||||
result = _validate_recommended_action(raw)
|
||||
assert result is not None, f"risk={risk} 應通過驗證"
|
||||
|
||||
def test_critical_risk_rejected_for_button(self):
|
||||
"""F4 修法:critical 動作必須走人工審批,不可變 Telegram 按鈕"""
|
||||
raw = {
|
||||
"name": "test_critical",
|
||||
"label": "測試 critical",
|
||||
"emoji": "🔍",
|
||||
"mcp_provider": "k8s",
|
||||
"mcp_tool": "some_tool",
|
||||
"params": {},
|
||||
"risk": "critical",
|
||||
"reasoning": "test",
|
||||
}
|
||||
result = _validate_recommended_action(raw)
|
||||
assert result is None, "critical risk 應被拒絕(F4:不可變按鈕,須走人工審批)"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Unit — _extract_recommended_actions 批量驗證
|
||||
@@ -228,17 +242,17 @@ class TestExtractRecommendedActions:
|
||||
|
||||
def test_empty_list_returns_empty(self):
|
||||
"""空 recommended_actions → 返回 []"""
|
||||
result = _extract_recommended_actions({"recommended_actions": []})
|
||||
result, _ = _extract_recommended_actions({"recommended_actions": []})
|
||||
assert result == []
|
||||
|
||||
def test_missing_key_returns_empty(self):
|
||||
"""沒有 recommended_actions key → 返回 []"""
|
||||
result = _extract_recommended_actions({"candidates": []})
|
||||
result, _ = _extract_recommended_actions({"candidates": []})
|
||||
assert result == []
|
||||
|
||||
def test_non_list_returns_empty(self):
|
||||
"""recommended_actions 非 list → 返回 []"""
|
||||
result = _extract_recommended_actions({"recommended_actions": "not_a_list"})
|
||||
result, _ = _extract_recommended_actions({"recommended_actions": "not_a_list"})
|
||||
assert result == []
|
||||
|
||||
def test_valid_actions_extracted(self):
|
||||
@@ -267,7 +281,7 @@ class TestExtractRecommendedActions:
|
||||
},
|
||||
]
|
||||
}
|
||||
result = _extract_recommended_actions(parsed)
|
||||
result, _ = _extract_recommended_actions(parsed)
|
||||
assert len(result) == 2
|
||||
assert result[0].name == "check_pod_logs"
|
||||
assert result[1].name == "k8s_restart"
|
||||
@@ -307,7 +321,7 @@ class TestExtractRecommendedActions:
|
||||
},
|
||||
]
|
||||
}
|
||||
result = _extract_recommended_actions(parsed)
|
||||
result, _ = _extract_recommended_actions(parsed)
|
||||
assert len(result) == 2, "應 skip 非法,只返回 2 個合法 action"
|
||||
names = {r.name for r in result}
|
||||
assert "valid_action" in names
|
||||
@@ -331,7 +345,7 @@ class TestExtractRecommendedActions:
|
||||
for i in range(5)
|
||||
]
|
||||
}
|
||||
result = _extract_recommended_actions(parsed)
|
||||
result, _ = _extract_recommended_actions(parsed)
|
||||
assert len(result) == 3, "最多取 3 個 recommended_actions"
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user