Files
awoooi/apps/api/src/agents/solver_agent.py
Your Name 7f200aff5f
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 10m45s
fix(solver): 注入告警 labels 讓 params 模板填充真實值
根因:Solver LLM 不知道 namespace/pod/deployment/instance 真實值,
      recommended_actions.params 模板({labels.namespace} 等)填不出來
      → Telegram 顯示 kubectl scale deployment  --replicas=(空白)

修復:
- solver.run() 加 incident_labels 參數
- _build_prompt() 把 labels 顯式列出給 LLM 參考
- orchestrator 從 snapshot.alert_info.labels 取出後傳入

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 15:05:06 +08:00

1146 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AWOOOI AIOps Phase 2 — Solver Agent軍師
===========================================
職責:對每個根因假設產修復方案
輸入DiagnosisReport來自 Diagnostician
輸出ActionPlan候選動作含 blast_radius + rollback_cost + confidence
設計原則:
1. 每個 Hypothesis 至少產 1 個 CandidateAction
2. blast_radius 評分影響 Reviewer 的審查嚴格度
3. blast_radius > 50 → Reviewer 必須 request_revision
4. 熔斷降級LLM 失敗 → graceful degradedcandidates=[], recommended_actions=[], degraded=True
禁止 rule-based mock / hardcode RESTART 兜底(北極星 §1.1 修復多樣性 ≥ 40%,禁止寫死規則)
5. Solver 不直接觸碰執行層Coordinator 的工作)
6. recommended_actions結構化 MCP 動作清單,供 B3 Telegram 按鈕動態生成
安全原則B1 Fix Round, 2026-04-27
- F1: mcp_tool registry 白名單 — LLM 推薦的 name/mcp_tool 必須對應 registry 存在的 entry
- F2: risk 由 registry 蓋寫 — 信代碼不信 LLMLLM 自報 risk 一律被 registry 值取代
- F3: prompt 注入點 sanitize — hypothesis/category/evidence_summary 全部過 sanitize()
- F4: critical risk 永遠 reject — critical 動作必須走人工審批,絕不成為 Telegram 按鈕
ADR-082: Phase 2 多 Agent 協作
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 2 初始建立
2026-04-27 Claude Sonnet 4.6: B1 — Solver 結構化動作 (北極星 §1.1 修復多樣性 ≥ 40%)
2026-04-27 Claude Sonnet 4.6: B1 Fix Round — F1/F2/F3/F4/H1/H2/H3 安全強化
"""
from __future__ import annotations
import asyncio
import hashlib
import os
import re
import time
from pathlib import Path
from typing import Any
import structlog
import yaml
from src.agents.base import BaseAgent
from src.agents.protocol import (
ActionPlan,
AgentRole,
AgentVote,
CandidateAction,
DiagnosisReport,
RecommendedAction,
RecommendedActionsStatus,
)
from src.observability.agent_step_metrics import observe_agent_step
from src.services.sanitization_service import sanitize
logger = structlog.get_logger(__name__)
# 2026-04-27 Claude Sonnet 4.6: A1 — per-agent timeout split + step metric
# (North Star §1.2 "Observable by Default").
# Background: for incidents INC-20260425-8D17BB / 3B6C39 the AI confidence dropped to 20%.
# OpenClaw NIM (192.168.0.188:8088) was measured at 2-27s while every agent shared
# PHASE2_STEP_TIMEOUT_SEC=20.0. The Solver prompt is medium-sized (K8s inventory +
# hypothesis), so it is allotted 20s by default.
# Env override: the value can be tuned at deploy time through a K8s ConfigMap without
# rebuilding the image.
AGENT_SOLVER_TIMEOUT_SEC: float = float(os.getenv("AGENT_SOLVER_TIMEOUT_SEC", "20.0"))
# Backward-compatible alias, marked as deprecated.
# DEPRECATED (2026-04-27): use AGENT_SOLVER_TIMEOUT_SEC; this alias is slated for removal
# in the next sprint.
PHASE2_STEP_TIMEOUT_SEC = AGENT_SOLVER_TIMEOUT_SEC
# 2026-04-24 ogt + Claude Sonnet 4.6: kubectl whitelist regex (C1/C3 security-fix version).
# C1: the previous regex used \s, which also matches \n\r\t\f\v, so a newline-injected
#     command could slip through (PoC: "kubectl get pods\nrm -rf /" was accepted).
# C3: a variable-length \s+ combined with a character class that also contained \s
#     formed a catastrophic-backtracking (ReDoS) vector.
#     PoC: 40000 spaces → 7.18s of blocking.
# Fix strategy:
# 1. The separator is an explicit [ ] (ASCII space), which excludes \n\r\t\f\v.
# 2. The character class is [A-Za-z0-9 \-=./:_,@] (a literal space, not \s).
# 3. A bounded quantifier {1,500} prevents unbounded backtracking.
# 4. The re.ASCII flag is set so that Unicode whitespace (invisible characters) is not matched.
# Scope: three layers of protection (C2) — the Nemo path, the action_title path and the
# standard candidates path.
_KUBECTL_COMMAND_PATTERN = re.compile(
r"^kubectl[ ][A-Za-z0-9 \-=./:_,@]{1,500}$",
re.ASCII,
)
# Command length cap, checked in O(1) before the regex runs.
# NOTE(review): the regex itself admits up to 508 characters ("kubectl " + 500), so this
# cap of 500 is the tighter of the two bounds.
_KUBECTL_MAX_LEN = 500
# 2026-04-27 Claude Sonnet 4.6: B1 — the MCP tool list is loaded dynamically from
# callback_action_spec.yaml.
# Failure is harmless: a failed load yields an empty dict, and the prompt then tells the
# LLM that the MCP list is currently unavailable.
_CALLBACK_SPEC_PATH = Path(__file__).parent.parent / "services" / "callback_action_spec.yaml"
# Accepted mcp_provider values (per the original note, these mirror the RecommendedAction schema).
_VALID_MCP_PROVIDERS = frozenset(
{"k8s", "ssh", "prometheus", "signoz", "database", "internal"}
)
# Accepted risk levels (per the original note, these mirror the RecommendedAction schema).
_VALID_RISK_LEVELS = frozenset({"low", "medium", "high", "critical"})
# 2026-04-27 Claude Sonnet 4.6: H2+F4+vuln-V3/V4 — module-level cache with mtime invalidation.
# North Star "Blast Radius": when the registry fails to load, metric=error is set
# immediately; the failure is never silent.
# The cache is held in the two module globals below (registry dict + YAML mtime).
_REGISTRY_CACHE: dict[str, dict[str, Any]] = {} # cached registry (after filtering)
_REGISTRY_MTIME: float = 0.0 # YAML mtime at the last load (0.0 = not loaded yet)
def _load_mcp_tool_registry() -> dict[str, dict[str, Any]]:
    """
    Load the MCP action registry from callback_action_spec.yaml (module-level cache).

    History:
        2026-04-27 Claude Sonnet 4.6: B1 — dynamic load instead of a hard-coded list.
        2026-04-27 Claude Sonnet 4.6: H2+F4+vuln-V3/V4 — cache + mtime invalidation +
        health metric + safety filtering.

    Filtering (North Star "Blast Radius"):
        - requires_multi_sig=true → excluded, so the LLM never sees the action and is
          not nudged into inventing a similar name.
        - risk=critical → excluded (F4: critical actions need manual approval and never
          become a button).
        - An entry whose body, or whose ``mcp`` section, is not a mapping is skipped
          with a warning instead of aborting the whole load; one malformed entry must
          not disable every other action.

    Caching:
        - The first successful load stores the registry and the YAML mtime in
          ``_REGISTRY_CACHE`` / ``_REGISTRY_MTIME``.
        - Later calls return the cached dict when the mtime is unchanged and the cache
          is non-empty.

    Returns:
        ``{action_name: {provider, tool, risk, label, emoji, params, description}}``.
        On any load failure an empty dict is returned, the ``error`` gauge is set to 1
        and the error is logged (never silent).
    """
    global _REGISTRY_CACHE, _REGISTRY_MTIME
    from src.core.metrics import SOLVER_MCP_REGISTRY_LOADED

    def _publish_status(active: str) -> None:
        # H2: exactly one of the three status series is 1 after every load attempt.
        for status in ("ok", "empty", "error"):
            SOLVER_MCP_REGISTRY_LOADED.labels(status=status).set(1 if status == active else 0)

    # An unreadable mtime is treated as "must reload".
    try:
        current_mtime = _CALLBACK_SPEC_PATH.stat().st_mtime
    except OSError:
        current_mtime = 0.0

    # Cache hit: mtime unchanged and the cached registry is non-empty.
    if current_mtime > 0.0 and current_mtime == _REGISTRY_MTIME and _REGISTRY_CACHE:
        return _REGISTRY_CACHE

    try:
        with _CALLBACK_SPEC_PATH.open(encoding="utf-8") as f:
            spec = yaml.safe_load(f)

        registry: dict[str, dict[str, Any]] = {}
        for name, action in (spec.get("actions") or {}).items():
            if not isinstance(action, dict):
                logger.warning(
                    "mcp_registry_action_malformed",
                    name=name,
                    reason="action entry is not a mapping; skipped",
                )
                continue
            mcp = action.get("mcp") or {}
            if not isinstance(mcp, dict):
                logger.warning(
                    "mcp_registry_action_malformed",
                    name=name,
                    reason="mcp section is not a mapping; skipped",
                )
                continue

            risk_val = str(action.get("risk", "medium")).strip().lower()
            requires_multi_sig = bool(action.get("requires_multi_sig", False))
            # F4+vuln-V3: critical / multi-sig actions never become buttons and are
            # never shown to the LLM as something to imitate.
            if risk_val == "critical" or requires_multi_sig:
                logger.debug(
                    "mcp_registry_action_filtered",
                    name=name,
                    risk=risk_val,
                    requires_multi_sig=requires_multi_sig,
                    reason="critical 或 requires_multi_sig=true不放入 LLM 可見 registry",
                )
                continue

            registry[name] = {
                "provider": mcp.get("provider", ""),
                "tool": mcp.get("tool", ""),
                "risk": risk_val,
                "label": action.get("label", name),
                "emoji": action.get("emoji", ""),
                "params": mcp.get("params") or {},
                "description": action.get("description", ""),
            }

        # Refresh the cache even when the result is empty; an empty cache never
        # satisfies the cache-hit test above, so the next call reloads.
        _REGISTRY_CACHE = registry
        _REGISTRY_MTIME = current_mtime

        if registry:
            _publish_status("ok")
        else:
            _publish_status("empty")
            logger.warning("mcp_registry_empty", path=str(_CALLBACK_SPEC_PATH))
        return registry
    except Exception as e:
        # H2: a failed load is surfaced through the error gauge and an error log.
        _publish_status("error")
        logger.error("mcp_registry_load_failed", path=str(_CALLBACK_SPEC_PATH), error=str(e))
        return {}
def _is_safe_kubectl_command(cmd: str) -> bool:
    """Whitelist validation for a kubectl command string.

    A command is accepted only when, after stripping surrounding whitespace, it starts
    with ``kubectl`` and fully matches ``_KUBECTL_COMMAND_PATTERN`` (alphanumerics,
    ASCII space and ``- = . / : _ , @``). Shell metacharacters therefore fail the match.

    C1 defence: newline, carriage return, tab and NUL are rejected explicitly.
    C3 defence: the length is checked against ``_KUBECTL_MAX_LEN`` before the regex runs.

    Args:
        cmd: the command string to validate.

    Returns:
        True when the command passes the whitelist; False for non-strings, over-long
        input, forbidden control characters, or anything not matching the pattern.
    """
    # Type guard: anything that is not already a str is rejected outright.
    if not isinstance(cmd, str):
        return False

    trimmed = cmd.strip()

    # C3: cheap length check first so the regex never sees over-long input.
    if len(trimmed) > _KUBECTL_MAX_LEN:
        return False

    # C1: newline / carriage return / tab / NUL are never allowed.
    forbidden_chars = {"\n", "\r", "\t", "\x00"}
    if forbidden_chars.intersection(trimmed):
        return False

    return (
        trimmed.startswith("kubectl")
        and _KUBECTL_COMMAND_PATTERN.fullmatch(trimmed) is not None
    )
def _validate_recommended_action(
    raw: Any,
    registry_actions: dict[str, dict[str, Any]] | None = None,
) -> RecommendedAction | None:
    """
    Validate one LLM-produced recommended_action against the RecommendedAction schema.

    History:
        2026-04-27 Claude Sonnet 4.6: B1 — schema validation; invalid items are skipped.
        2026-04-27 Claude Opus 4.7: F1+F2+F4+vuln-V3/V4 — registry cross-check, risk
        overwrite, critical rejection.

    Rules:
        - F1: ``name`` must exist in ``registry_actions``; when the registry entry
          declares a tool / provider, the LLM values must match them.
        - F2: the final risk is taken from the registry entry when that value is a
          valid risk level; the LLM's self-reported risk is not trusted.
        - F4: risk ``critical`` (from the LLM or from the registry) is always rejected.

    Args:
        raw: a single action as emitted by the LLM (expected to be a dict).
        registry_actions: the registry returned by ``_load_mcp_tool_registry``.
            ``None`` disables the registry cross-check (kept for unit tests);
            production callers pass the registry.

    Returns:
        A ``RecommendedAction`` (with the risk already overwritten by the registry) when
        validation passes, otherwise ``None`` so the caller can skip the item.
    """
    if not isinstance(raw, dict):
        return None

    def _field(key: str) -> str:
        # Every textual field is coerced to str and stripped.
        return str(raw.get(key, "")).strip()

    def _reject(event: str, **fields: Any) -> None:
        # Log the rejection reason and yield None for the caller to return.
        logger.warning(event, **fields)
        return None

    name = _field("name")
    label = _field("label")
    emoji = _field("emoji")
    mcp_provider = _field("mcp_provider").lower()
    mcp_tool = _field("mcp_tool")
    llm_risk = _field("risk").lower()
    reasoning = _field("reasoning")
    params_raw = raw.get("params", {})

    # Required fields must all be non-empty.
    if not all((name, label, mcp_provider, mcp_tool, llm_risk)):
        return None

    # Provider whitelist.
    if mcp_provider not in _VALID_MCP_PROVIDERS:
        return _reject(
            "solver_recommended_action_invalid_provider",
            name=name,
            mcp_provider=mcp_provider,
            valid=sorted(_VALID_MCP_PROVIDERS),
        )

    # Risk whitelist (basic format check).
    if llm_risk not in _VALID_RISK_LEVELS:
        return _reject(
            "solver_recommended_action_invalid_risk",
            name=name,
            risk=llm_risk,
            valid=sorted(_VALID_RISK_LEVELS),
        )

    # F4+vuln-V3: critical is rejected no matter who reports it.
    if llm_risk == "critical":
        return _reject(
            "solver_recommended_action_critical_rejected",
            name=name,
            mcp_tool=mcp_tool,
            reason="F4: critical 動作必須走人工審批,不可變按鈕",
        )

    # F1+vuln-V4: cross-check against the registry so the LLM cannot invent tools.
    final_risk = llm_risk
    if registry_actions is not None:
        if name not in registry_actions:
            return _reject(
                "solver_recommended_action_not_in_registry",
                name=name,
                mcp_tool=mcp_tool,
                reason="F1: name 不在 callback_action_spec.yaml registryreject",
            )

        entry = registry_actions[name]
        expected_tool = str(entry.get("tool", "")).strip()
        expected_provider = str(entry.get("provider", "")).strip().lower()
        expected_risk = str(entry.get("risk", "")).strip().lower()

        # F1: the tool must agree with the registry (no mix-and-match injection).
        if expected_tool and mcp_tool != expected_tool:
            return _reject(
                "solver_recommended_action_tool_mismatch",
                name=name,
                llm_tool=mcp_tool,
                registry_tool=expected_tool,
                reason="F1: mcp_tool 與 registry 不一致reject",
            )

        # F1: the provider must agree with the registry as well.
        if expected_provider and mcp_provider != expected_provider:
            return _reject(
                "solver_recommended_action_provider_mismatch",
                name=name,
                llm_provider=mcp_provider,
                registry_provider=expected_provider,
                reason="F1: mcp_provider 與 registry 不一致reject",
            )

        # F2+vuln-V3: a valid registry risk replaces whatever the LLM reported.
        if expected_risk in _VALID_RISK_LEVELS:
            if expected_risk == "critical":
                return _reject(
                    "solver_recommended_action_registry_critical_rejected",
                    name=name,
                    reason="F4: registry 標 critical理論上 _load_mcp_tool_registry 已過濾,雙重保險)",
                )
            final_risk = expected_risk
            if final_risk != llm_risk:
                logger.info(
                    "solver_recommended_action_risk_overridden",
                    name=name,
                    llm_risk=llm_risk,
                    registry_risk=final_risk,
                    reason="F2: 信代碼不信 LLMrisk 由 registry 蓋寫",
                )

    # params must end up as dict[str, str]; anything that is not a dict becomes {}.
    params: dict[str, str] = (
        {str(key): str(value) for key, value in params_raw.items()}
        if isinstance(params_raw, dict)
        else {}
    )

    return RecommendedAction(
        name=name[:64],
        label=label[:80],
        emoji=emoji[:8],
        mcp_provider=mcp_provider,  # type: ignore[arg-type]
        mcp_tool=mcp_tool[:80],
        params=params,
        risk=final_risk,  # type: ignore[arg-type]
        reasoning=reasoning[:400],
    )
def _extract_recommended_actions(
    parsed: dict[str, Any],
    registry_actions: dict[str, dict[str, Any]] | None = None,
) -> tuple[list[RecommendedAction], str]:
    """
    Pull ``recommended_actions`` out of the parsed LLM reply and validate each item.

    History:
        2026-04-27 Claude Sonnet 4.6: B1 — schema validation + graceful degrade.
        2026-04-27 Claude Opus 4.7: F1+H1 — registry passed through, status returned.

    Args:
        parsed: the LLM's JSON output.
        registry_actions: the registry loaded from callback_action_spec.yaml.
            ``None`` disables the registry check; an empty dict means the registry
            is unavailable.

    Returns:
        ``(actions, status)`` where ``actions`` holds at most three validated items
        (only the first five proposals are examined) and ``status`` is one of:
            - ``"ok"``: at least one proposal passed validation
            - ``"empty"``: the LLM proposed nothing
            - ``"schema_failed"``: proposals existed but every one was rejected
            - ``"registry_unavailable"``: the registry is an empty dict
    """
    proposals = parsed.get("recommended_actions", [])
    registry_down = registry_actions is not None and not registry_actions

    if not isinstance(proposals, list) or not proposals:
        # An unavailable registry is the more serious condition, so it wins.
        return [], ("registry_unavailable" if registry_down else "empty")

    if registry_down:
        # Every proposal would be rejected by F1 anyway; flag it early.
        logger.warning(
            "solver_recommended_actions_registry_unavailable",
            llm_proposed=len(proposals),
            reason="registry 為空_load_mcp_tool_registry 失敗),所有動作將被 F1 reject",
        )
        return [], "registry_unavailable"

    accepted: list[RecommendedAction] = []
    # Look at no more than five proposals and keep the first three that validate.
    for position, proposal in enumerate(proposals[:5]):
        validated = _validate_recommended_action(proposal, registry_actions=registry_actions)
        if validated is not None:
            accepted.append(validated)
            if len(accepted) >= 3:
                break
            continue
        logger.warning(
            "solver_recommended_action_schema_invalid",
            index=position,
            raw=str(proposal)[:200],
            reason="欄位缺失/registry 不符/criticalskip不假造",
        )

    if accepted:
        return accepted, "ok"
    # The LLM proposed something but nothing survived validation.
    return [], "schema_failed"
class SolverAgent(BaseAgent):
    """
    Solver Agent — the remediation strategist.

    Turns a DiagnosisReport into an ActionPlan: candidate actions scored with
    blast_radius / rollback_cost / confidence, plus registry-validated
    recommended_actions.

    Usage:
        agent = SolverAgent()
        plan = await agent.run(diagnosis_report)
    """

    AGENT_NAME = AgentRole.SOLVER.value
    AGENT_DESCRIPTION = "Remediation plan specialist. Produces candidate actions with blast radius scoring."

    async def run(
        self,
        diagnosis: DiagnosisReport,
        timeout_sec: float = 0.0,  # noqa: ARG002 — deprecated, kept for signature compatibility
        incident_labels: dict | None = None,  # 2026-04-28: alert labels injected into the prompt
    ) -> ActionPlan:
        """
        Produce a remediation plan from a diagnosis report.

        Args:
            diagnosis: the Diagnostician's output.
            timeout_sec: deprecated (2026-04-16 ogt) and ignored; the LLM step uses
                ``AGENT_SOLVER_TIMEOUT_SEC``.
            incident_labels: the original alert labels (namespace/pod/instance, ...)
                so the LLM can fill action params with real values.

        Returns:
            An ActionPlan. On any exception a degraded plan is returned
            (degraded=True, recommended_actions=[]); nothing is fabricated.
        """
        labels = incident_labels or {}
        # Still stored on the instance for backward compatibility, but the labels are
        # handed to _solve() explicitly: _solve() awaits before it builds the prompt,
        # so reading shared instance state there could pick up the labels of another
        # run() executing concurrently on the same agent instance.
        self._incident_labels = labels
        start_ms = int(time.monotonic() * 1000)

        # When the Diagnostician abstained the Solver abstains too, even if degraded
        # hypotheses exist. Gate 2: the earlier condition `and not diagnosis.hypotheses`
        # wrongly let degraded confidence=0.2 hypotheses through.
        if diagnosis.vote == AgentVote.ABSTAIN:
            return ActionPlan(
                candidates=[],
                diagnosis_report=diagnosis,
                latency_ms=0,
                vote=AgentVote.ABSTAIN,
                degraded=diagnosis.degraded,
                recommended_actions=[],
            )

        try:
            plan = await self._solve(diagnosis, incident_labels=labels)
            plan.latency_ms = int(time.monotonic() * 1000) - start_ms
            logger.info(
                "solver_done",
                candidates=len(plan.candidates),
                recommended_actions=len(plan.recommended_actions),
                vote=plan.vote,
                latency_ms=plan.latency_ms,
            )
            return plan
        except Exception:
            latency = int(time.monotonic() * 1000) - start_ms
            logger.exception("solver_error")
            return self._degraded_plan(diagnosis, latency, "error")

    async def _solve(
        self,
        diagnosis: DiagnosisReport,
        incident_labels: dict | None = None,
    ) -> ActionPlan:
        """
        Core LLM reasoning step.

        Args:
            diagnosis: the Diagnostician's output.
            incident_labels: alert labels for the prompt. When omitted, the labels
                stored on the instance by ``run()`` are used (legacy call style).

        Returns:
            An ActionPlan; a degraded plan on timeout, LLM failure or when no
            candidate survives filtering.
        """
        top = diagnosis.top_hypothesis
        if not top:
            return ActionPlan(
                candidates=[],
                diagnosis_report=diagnosis,
                latency_ms=0,
                vote=AgentVote.ABSTAIN,
                recommended_actions=[],
            )

        # Resolve the labels before the first await so a concurrent run() cannot
        # replace them underneath this call.
        if incident_labels is None:
            incident_labels = getattr(self, "_incident_labels", {})

        # 2026-04-17 ogt + Claude Sonnet 4.6 (Checkpoint-2, environment awareness):
        # without cluster context the LLM guessed resource names (e.g. "awooiii-api"
        # with three i's → K8s not found). The real Deployment list is fetched first
        # and injected into the prompt. Failure is harmless: an empty inventory only
        # removes the name-locking effect.
        _k8s_inventory = await _fetch_k8s_inventory(namespace="awoooi-prod")

        # 2026-04-27 Claude Sonnet 4.6: B1 — MCP tool list loaded dynamically for the prompt.
        mcp_registry = _load_mcp_tool_registry()

        prompt = self._build_prompt({
            "hypothesis": top.description,
            "category": top.category,
            "confidence": top.confidence,
            "k8s_inventory": _k8s_inventory,
            "mcp_registry": mcp_registry,
            "incident_labels": incident_labels,
        })

        # 2026-04-16 ogt + Claude Sonnet 4.6: pass the hypothesis as structured context
        # to OPENCLAW_NEMO. Previously call(prompt) had no context, so the nemo fallback
        # used prompt[:500] (the system instructions) as the signal description and the
        # LLM answered with garbage. The top hypothesis now goes into
        # alert_context.signals so nemo sees the real diagnosis.
        _hypothesis_text = (top.description or "(待診斷)")[:800]
        alert_context = {
            "incident_id": diagnosis.evidence_snapshot_id or "UNKNOWN",
            "severity": "P3",
            "signals": [{"alert_name": "diagnosis_hypothesis", "description": _hypothesis_text}],
            "affected_services": [],
            "intent_hint": "diagnose",
        }

        from src.services.openclaw import get_openclaw

        openclaw = get_openclaw()
        _step_start = time.monotonic()
        try:
            response_text, _provider, success = await asyncio.wait_for(
                openclaw.call(prompt, alert_context=alert_context),
                timeout=AGENT_SOLVER_TIMEOUT_SEC,
            )
            # 2026-04-27 Claude Sonnet 4.6: A1 — metric for the completed call.
            observe_agent_step("solver", "success", time.monotonic() - _step_start)
        except asyncio.TimeoutError:
            # 2026-04-27 Claude Sonnet 4.6: A1 — metric for the timeout path.
            observe_agent_step("solver", "timeout", time.monotonic() - _step_start)
            logger.warning(
                "solver_step_timeout",
                snapshot_id=diagnosis.evidence_snapshot_id,
                timeout_sec=AGENT_SOLVER_TIMEOUT_SEC,
            )
            return self._degraded_plan(diagnosis, 0, "step_timeout")

        if not success or not response_text:
            return self._degraded_plan(diagnosis, 0, "llm_failed")

        parsed = self._parse_response(sanitize(response_text, "solver_output"))
        candidates = _extract_candidates(parsed)

        # 2026-04-27: B1 / F1+H1 — extract recommended_actions with schema + registry
        # validation. Items that fail are skipped (warning logged), never fabricated.
        recommended_actions, ra_status = _extract_recommended_actions(
            parsed, registry_actions=mcp_registry,
        )
        if ra_status != "ok":
            logger.info(
                "solver_recommended_actions_not_ok",
                status=ra_status,
                snapshot_id=diagnosis.evidence_snapshot_id,
            )

        # 2026-04-25 ogt + Claude Sonnet 4.6: post-filter for non-K8s targets (P0 fix).
        # A GiteaMemoryPressure alert made the LLM emit "kubectl scale deployment gitea",
        # but Gitea runs under docker-compose on the host, not in awoooi-prod, so the
        # command was bound to fail. Candidates are checked against the inventory.
        if _k8s_inventory and candidates:
            candidates = _filter_non_k8s_targets(candidates, _k8s_inventory)
            logger.debug(
                "solver_k8s_target_filter_applied",
                remaining_candidates=len(candidates),
                inventory_preview=_k8s_inventory[:100],
            )

        if not candidates:
            return self._degraded_plan(diagnosis, 0, "no_candidates")

        return ActionPlan(
            candidates=candidates,
            diagnosis_report=diagnosis,
            latency_ms=0,
            vote=AgentVote.APPROVE,
            recommended_actions=recommended_actions,
            recommended_actions_status=ra_status,
        )

    def _build_prompt(self, context: dict[str, Any]) -> str:
        """
        Build the Solver prompt.

        Args:
            context: dict with the keys ``hypothesis``, ``category``, ``confidence``,
                ``k8s_inventory``, ``mcp_registry`` and ``incident_labels``.

        Returns:
            The full prompt text. Hypothesis, category and every incident label are
            passed through ``sanitize()`` before being embedded (F3).
        """
        # 2026-04-17 ogt + Claude Sonnet 4.6: the old prompt's action example used a
        # custom "restart_service:awoooi-api" format, so the LLM produced natural
        # language instead of kubectl commands; auto_approve Condition 1c then rejected
        # them and blast_radius_calculator was never invoked. The prompt now demands
        # real commands and shows correct examples.
        # 2026-04-17 (Checkpoint-2): the real Deployment list is injected and the LLM
        # must pick resource names from it.
        # 2026-04-27: B1 — MCP tool list + recommended_actions requirements injected.
        _inventory = context.get("k8s_inventory", "")
        _inventory_section = (
            f"\n🔒 叢集實際 Deployment 清單awoooi-prod— 必須從此清單選擇資源名稱:\n{_inventory}\n"
            if _inventory
            else "\n⚠️ 無法取得叢集清單,請謹慎填寫資源名稱。\n"
        )

        # 2026-04-25 ogt + Claude Sonnet 4.6: stop non-K8s services such as Gitea from
        # being targeted by kubectl scale. They run under docker-compose on the host, so
        # the prompt explicitly forbids kubectl scale/restart/delete for resources that
        # are not in the inventory.
        _non_k8s_warning = (
            "\n🚫 禁令:若 inventory 清單中無此服務(如 gitea、sentry、harbor、postgres、signoz"
            "→ 禁止使用 kubectl scale/restart/delete必須輸出空 candidates 陣列。\n"
            "這些服務運行在主機 docker-compose 環境,不在 K8s 叢集內kubectl 無法操作它們。\n"
            if _inventory
            else ""
        )

        # 2026-04-27 Claude Sonnet 4.6: B1 — dynamic MCP tool list. Only
        # provider/tool/risk/label are shown so params templates do not confuse the LLM.
        mcp_registry: dict[str, dict[str, Any]] = context.get("mcp_registry") or {}
        if mcp_registry:
            _mcp_lines = [
                f" - name: {action_name}"
                f" | provider: {info['provider']}"
                f" | tool: {info['tool']}"
                f" | risk: {info['risk']}"
                f" | label: {info['label']}"
                for action_name, info in mcp_registry.items()
            ]
            _mcp_section = (
                "\n可用 MCP actions請從此清單選擇 recommended_actions\n"
                + "\n".join(_mcp_lines)
                + "\n"
            )
        else:
            _mcp_section = "\nMCP action 清單暫不可用recommended_actions 可留空陣列)\n"

        # 2026-04-27 Claude Opus 4.7: F3+vuln-V1/V2 — every prompt injection point is
        # sanitised. hypothesis/category come from the Diagnostician and ultimately from
        # alert labels, which an attacker can control to smuggle in jailbreak phrases.
        _safe_hypothesis = sanitize(str(context.get("hypothesis", ""))[:800], "solver_hypothesis")
        _safe_category = sanitize(str(context.get("category", ""))[:120], "solver_category")
        _confidence = context.get("confidence", 0.0)
        try:
            _confidence_pct = f"{float(_confidence):.0%}"
        except (TypeError, ValueError):
            _confidence_pct = "0%"

        # 2026-04-28 Claude Sonnet 4.6: alert labels are listed in the prompt so the LLM
        # can fill params with the real namespace/pod/deployment/instance values.
        # The labels are raw alert data, i.e. exactly the attacker-controllable input
        # that F3 is about, so each key and value is collapsed to a single line (a label
        # must not be able to start a new prompt section), truncated and sanitised.
        # The second sanitize() argument is assumed to be a free-form source tag, as at
        # the other call sites in this file.
        _raw_labels = context.get("incident_labels") or {}
        _label_rows: list[str] = []
        if isinstance(_raw_labels, dict):
            for _key, _value in sorted(_raw_labels.items(), key=lambda kv: str(kv[0])):
                if not _value:
                    continue
                _safe_key = sanitize(" ".join(str(_key).split())[:120], "solver_incident_label")
                _safe_value = sanitize(" ".join(str(_value).split())[:400], "solver_incident_label")
                _label_rows.append(f" {_safe_key}: {_safe_value}")
        if _label_rows:
            _labels_lines = "\n".join(_label_rows)
            _labels_section = f"\n告警 Labelsparams 模板可直接引用):\n{_labels_lines}\n"
        else:
            # No usable label: omit the section instead of printing an empty header.
            _labels_section = ""

        return f"""你是 AWOOOI SRE 系統的軍師 Agent專職修復方案設計。
根因假設:{_safe_hypothesis}
告警類別:{_safe_category}
診斷信心:{_confidence_pct}
{_labels_section}{_inventory_section}{_non_k8s_warning}{_mcp_section}
你的工作:依照根因假設,提出 1-3 個針對性修復方案,同時輸出 0-3 個結構化 recommended_actions。
⚠️ 核心規則:修復方案必須對應根因,禁止無腦重啟
- HostDisk 類(磁碟滿)→ 先查大檔du -sh、清 logjournalctl --vacuum、查 df -h最後才考慮擴容
- HostCPU / CPU 競爭 → 先查兇手進程top -bn1 / ps aux、找具體進程名再決定是否重啟
- OOM / 記憶體 → 先查 Pod 記憶體kubectl top pods、查 OOM log再重啟
- NetworkLatency → 先查連線狀態ss -tp / ping / traceroute
- DatabaseConnection → 先查連線池pg_stat_activity、DB log再重啟
- K8s Pod Crash → 先查 Pod logkubectl logs再重啟
- 只有在診斷明確指向 crash/OOM/deadlock 時才用重啟;資源類問題(磁碟/CPU優先用診斷+清理命令
candidates 格式規則:
- action 欄位K8s 問題用 kubectl 命令主機層問題HostDisk/HostCPU 等)用 ssh 診斷命令
- 主機 IP192.168.0.188AI+Web/ 192.168.0.110(主服務)/ 192.168.0.111Ollama
- 每個方案必須評估 blast_radius0-100 影響範圍)和 rollback_cost0-100 回滾難度)
blast_radius 參考:
- 診斷命令df/du/top/ps/kubectl top= 0
- SSH 清理 logjournalctl --vacuum / find -delete= 5
- kubectl rollout restart deployment = 10
- kubectl scale deployment --replicas=N = 15
- kubectl rollout undo deployment = 25
- kubectl delete deployment = 75
recommended_actions 規則(北極星 §1.1 修復多樣性):
- 第一個動作必須是診斷/查看類(低風險),讓 SRE 先確認情況
- 不要全部是 restart 類動作
- mcp_provider 必須是以下之一k8s | ssh | prometheus | signoz | database | internal
- risk 必須是以下之一low | medium | high | critical
- params 中可使用模板:{{labels.namespace}} / {{labels.pod}} / {{incident_id}}
以 JSON 回覆(範例為 HostDisk 場景,根據根因假設替換):
{{
"candidates": [
{{
"action": "ssh user@192.168.0.110 'df -h && du -sh /var/log/* 2>/dev/null | sort -rh | head -20'",
"blast_radius": 0,
"rollback_cost": 0,
"confidence": 0.95,
"rationale": "先確認磁碟使用情況和最大目錄,找出根因後再決定清理方式"
}},
{{
"action": "ssh user@192.168.0.110 'journalctl --vacuum-time=7d && find /tmp -mtime +7 -delete'",
"blast_radius": 5,
"rollback_cost": 5,
"confidence": 0.8,
"rationale": "清理 7 天前 journal log 和 /tmp 舊檔,釋放磁碟空間"
}}
],
"recommended_actions": [
{{
"name": "check_disk_usage",
"label": "查磁碟用量",
"emoji": "💾",
"mcp_provider": "ssh",
"mcp_tool": "ssh_exec",
"params": {{"host": "192.168.0.110", "command": "df -h && du -sh /var/log/* 2>/dev/null | sort -rh | head -10"}},
"risk": "low",
"reasoning": "診斷磁碟使用分布,找出佔用大戶"
}},
{{
"name": "clean_old_logs",
"label": "清舊 Log",
"emoji": "🗑️",
"mcp_provider": "ssh",
"mcp_tool": "ssh_exec",
"params": {{"host": "192.168.0.110", "command": "journalctl --vacuum-time=7d"}},
"risk": "low",
"reasoning": "清理 7 天前 journal log安全可回滾"
}}
]
}}"""

    def _parse_response(self, response: str) -> dict[str, Any]:
        """Parse the LLM reply by delegating to ``self._extract_json``."""
        return self._extract_json(response)

    def analyze(self, context: dict[str, Any]) -> Any:
        """Not supported for Phase 2 agents; always raises NotImplementedError."""
        raise NotImplementedError("Use run() for Phase 2 agents")

    def _degraded_plan(
        self,
        diagnosis: DiagnosisReport,
        latency_ms: int,
        reason: str = "unknown",
    ) -> ActionPlan:
        """
        Circuit-breaker degradation: an honest, empty plan.

        2026-04-27 Claude Sonnet 4.6: B1 — the rule-based mock RESTART fallback was
        removed. Previously an LLM failure produced a category-based hard-coded RESTART
        (violating North Star §1.1). Now the plan carries candidates=[],
        recommended_actions=[] and degraded=True, so nothing is fabricated and,
        per the original design note, the Coordinator routes it to manual review.

        Args:
            diagnosis: the diagnosis the plan refers to (may be falsy).
            latency_ms: latency to record on the plan.
            reason: short tag logged with the ``solver_degraded`` event.

        Returns:
            An ActionPlan with vote=DEGRADED and no actions.
        """
        logger.warning(
            "solver_degraded",
            reason=reason,
            snapshot_id=diagnosis.evidence_snapshot_id if diagnosis else None,
        )
        return ActionPlan(
            candidates=[],
            diagnosis_report=diagnosis,
            latency_ms=latency_ms,
            vote=AgentVote.DEGRADED,
            degraded=True,
            recommended_actions=[],
        )
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
# 2026-04-25 ogt + Claude Sonnet 4.6: set of high-risk kubectl verbs (used by the P0
# non-K8s target filter).
# Read-only verbs (get/top/describe/logs) need no filtering — a failed run causes no damage.
# Only the mutating verbs listed below require the target to be verified against the
# K8s inventory.
# Note: for `kubectl rollout restart/undo` the second token is "rollout" (not
# restart/undo), so "rollout" is included here and its sub-verb is checked separately.
_KUBECTL_MUTATING_VERBS: frozenset[str] = frozenset(
{"scale", "rollout", "delete", "apply", "set", "patch", "exec"}
)
# Rollout sub-verbs that are subject to target validation (restart/undo); per the
# original note, history/status are read-only.
_KUBECTL_ROLLOUT_MUTATING_SUBVERBS: frozenset[str] = frozenset({"restart", "undo"})
def _filter_non_k8s_targets(
    candidates: list["CandidateAction"],
    inventory: str,
) -> list["CandidateAction"]:
    """
    Post-filter: drop mutating kubectl candidates whose target is not in the K8s inventory.

    2026-04-25 ogt + Claude Sonnet 4.6 (P0 non-K8s target filter): a GiteaMemoryPressure
    alert made the LLM emit ``kubectl scale deployment gitea --replicas=3`` although
    Gitea runs under docker-compose on the host, so execution was bound to fail.

    Rules:
        1. Non-kubectl actions and verbs outside ``_KUBECTL_MUTATING_VERBS`` pass through.
        2. For ``kubectl rollout`` only the sub-verbs in
           ``_KUBECTL_ROLLOUT_MUTATING_SUBVERBS`` are validated.
        3. The workload name is taken from either ``kind/name`` (anywhere in the action)
           or, failing that, the space-separated ``kind name`` form, where kind is a
           deployment/statefulset spelling. Previously only ``kind/name`` was recognised,
           so the motivating ``kubectl scale deployment gitea`` example was never filtered.
        4. When no target can be extracted the candidate passes (conservative).
        5. A target that is not in the inventory is dropped and a warning is logged.

    Args:
        candidates: the candidate actions produced by the LLM.
        inventory: comma/whitespace separated workload names, as returned by
            ``_fetch_k8s_inventory``. An empty inventory disables filtering.

    Returns:
        The filtered list (possibly empty; the caller handles degradation).
    """
    if not inventory:
        return candidates

    import re as _re

    # Inventory names, lower-cased; both commas and whitespace act as separators.
    inventory_names: set[str] = {
        n.strip().lower() for n in _re.split(r"[,\s]+", inventory) if n.strip()
    }
    # kind/name form, e.g. deployment/awoooi-api or sts/postgres.
    _target_pattern = _re.compile(
        r"(?:deployment|statefulset|deploy|sts)/([A-Za-z0-9][\w.-]{0,62})",
        _re.IGNORECASE,
    )
    # Same name shape as above, used for the space-separated form.
    _name_pattern = _re.compile(r"[A-Za-z0-9][\w.-]{0,62}")
    _workload_kinds = frozenset({
        "deployment", "deployments", "deploy", "deployment.apps", "deployments.apps",
        "statefulset", "statefulsets", "sts", "statefulset.apps", "statefulsets.apps",
    })

    def _resolve_target(action: str, tokens: list[str]) -> str | None:
        # Return the lower-cased workload name targeted by the action, or None.
        slash_match = _target_pattern.search(action)
        if slash_match:
            return slash_match.group(1).lower()
        # Space form: a kind token directly followed by a name token. Scanning starts
        # after the verb and stops at "--" so container arguments are never inspected.
        for idx, token in enumerate(tokens[2:-1], start=2):
            if token == "--":
                break
            if token.lower() in _workload_kinds:
                name = tokens[idx + 1]
                # A following flag (e.g. "-n") or otherwise odd token means the name
                # cannot be determined reliably; stay conservative.
                return name.lower() if _name_pattern.fullmatch(name) else None
        return None

    result: list[CandidateAction] = []
    for candidate in candidates:
        action = candidate.action.strip()
        # The verb is the second token: kubectl <verb> ...
        parts = action.split()
        if len(parts) < 2 or parts[0].lower() != "kubectl":
            result.append(candidate)
            continue

        verb = parts[1].lower()
        if verb not in _KUBECTL_MUTATING_VERBS:
            result.append(candidate)
            continue

        # kubectl rollout <subverb>: only restart/undo are validated here; every other
        # sub-verb is passed through without a target check.
        if verb == "rollout":
            subverb = parts[2].lower() if len(parts) > 2 else ""
            if subverb not in _KUBECTL_ROLLOUT_MUTATING_SUBVERBS:
                result.append(candidate)
                continue

        target_name = _resolve_target(action, parts)
        if target_name is None:
            # No recognisable workload target → pass, to avoid dropping legitimate
            # commands that address other resource kinds.
            result.append(candidate)
            continue

        if target_name in inventory_names:
            result.append(candidate)
        else:
            logger.warning(
                "solver_non_k8s_target_rejected",
                action=action[:120],
                target=target_name,
                reason="target 不在 K8s awoooi-prod inventory可能是 docker-compose 服務(如 gitea",
                inventory_preview=inventory[:100],
            )
    return result
async def _fetch_k8s_inventory(namespace: str = "awoooi-prod", timeout_sec: float = 5.0) -> str:
    """
    Fetch the Deployment/StatefulSet names of a namespace for injection into the Solver prompt.

    2026-04-17 ogt + Claude Sonnet 4.6 (Checkpoint-2, environment awareness): the real
    workload names are queried before kubectl commands are generated so the LLM does
    not hallucinate resource names (e.g. "awooiii-api").

    kubectl is started with an argument vector (no shell), so ``namespace`` is always
    passed as a single argument and cannot inject shell syntax. Only a read-only
    ``get`` is executed.

    Args:
        namespace: namespace to list.
        timeout_sec: maximum time to wait for kubectl; on expiry the process is killed
            and reaped.

    Returns:
        Names joined as "awoooi-api, awoooi-web, postgres, ..."; "" on timeout, on any
        failure, or when kubectl prints nothing.
    """
    import asyncio as _asyncio
    import contextlib as _contextlib

    try:
        proc = await _asyncio.create_subprocess_exec(
            "kubectl",
            "get",
            "deployments,statefulsets",
            "-n",
            namespace,
            "-o",
            "jsonpath={.items[*].metadata.name}",
            stdout=_asyncio.subprocess.PIPE,
            # stderr was discarded by the former "2>/dev/null"; keep discarding it.
            stderr=_asyncio.subprocess.DEVNULL,
        )
        try:
            stdout, _ = await _asyncio.wait_for(proc.communicate(), timeout=timeout_sec)
        except _asyncio.TimeoutError:
            # The process may already have exited between the timeout and the kill.
            with _contextlib.suppress(ProcessLookupError):
                proc.kill()
            # Reap the child so it does not linger as a zombie.
            await proc.wait()
            logger.warning("k8s_inventory_timeout", namespace=namespace, timeout_sec=timeout_sec)
            return ""

        raw = (stdout or b"").decode("utf-8", errors="replace").strip()
        if not raw:
            return ""

        # The jsonpath output is whitespace separated; turn it into a readable comma list.
        names = [n.strip() for n in raw.split() if n.strip()]
        inventory = ", ".join(names)
        logger.debug("k8s_inventory_fetched", namespace=namespace, count=len(names))
        return inventory
    except Exception as _e:
        # Best effort: any failure (kubectl missing, permission error, ...) degrades to
        # an empty inventory rather than interrupting the Solver.
        logger.warning("k8s_inventory_failed", namespace=namespace, error=str(_e))
        return ""
def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]:
"""從 LLM 解析結果提取候選方案(按信心降序)。
支援兩種格式:
1. 標準格式:{"candidates": [{action, blast_radius, rollback_cost, confidence, rationale}]}
2. OpenClaw Nemo 格式:{"action_title": "...", "risk_level": "...", "confidence": 0.85}
2026-04-16 ogt + Claude Sonnet 4.6: 與 diagnostician 同步,修復 openclaw_nemo 格式不相容
"""
# OpenClaw Nemo 格式轉換
# 2026-04-17 ogt + Claude Sonnet 4.6: Nemo path kubectl 驗證
# 根因Nemo 回傳 {"action_title": "重啟 Crash Looping Pod"} 自然語言
# 直接用 action_title → 無 kubectl → auto_approve 誤通過 → 死迴圈
# 2026-04-24 ogt + Claude Sonnet 4.6: 修復靜默丟棄 → 語意合成兜底
# 舊action_title 無 kubectl → return [] → _degraded_plan confidence=0.2
# 新:先嘗試語意合成 kubectl 指令;真的無從映射才 return []
if "action_title" in parsed and "candidates" not in parsed:
action_title = str(parsed.get("action_title", ""))
# 2026-04-24 ogt + Claude Sonnet 4.6: confidence try/except + clampMajor #3/#4
# 根因LLM 可能回傳非數字字串(如 "high")或超界值(如 1.5 / -0.1
# float() 直接呼叫會 ValueError超界值會破壞 auto_approve 門檻判斷
# 修復try/except 捕捉型別錯誤 → 預設 0.5;再 clamp 到 [0.0, 1.0]
try:
confidence = float(parsed.get("confidence", 0.5))
except (TypeError, ValueError):
logger.warning(
"solver_nemo_confidence_type_error",
raw_confidence=parsed.get("confidence"),
)
confidence = 0.5
confidence = max(0.0, min(1.0, confidence))
risk_level = str(parsed.get("risk_level", "medium"))
risk_to_blast = {"critical": 60, "high": 40, "medium": 25, "low": 10}
blast = risk_to_blast.get(risk_level.lower(), 30)
# 2026-04-24 ogt + Claude Sonnet 4.6: 修復信心度被語意合成硬壓問題
# 根因OpenClaw NIM 回傳 action_title自然語言+ kubectl_command完整指令
# 舊邏輯action_title 無 kubectl → 語意合成 → min(confidence, 0.5) 壓到 0.5
# 修法 A優先採用 kubectl_command 欄位,保留原始 confidence如 0.9
kubectl_cmd = str(parsed.get("kubectl_command", "")).strip()
if kubectl_cmd and kubectl_cmd.startswith("kubectl"):
# 2026-04-24 ogt + Claude Sonnet 4.6: 白名單正則防注入Major #1 改版)
# 根因黑名單枚舉不完整改用白名單正則_is_safe_kubectl_command
# 修復:不通過白名單 → log warning → fall-through 到 action_title 路徑(不 return
if not _is_safe_kubectl_command(kubectl_cmd):
logger.warning(
"solver_kubectl_invalid_syntax",
cmd=kubectl_cmd[:80],
reason="未通過白名單正則檢驗",
)
# fall-through不採用此 kubectl_command繼續往下走 action_title 路徑
else:
logger.debug(
"solver_nemo_kubectl_command_used",
action_title=action_title[:80],
kubectl_command=kubectl_cmd[:80],
confidence=confidence,
)
return [CandidateAction(
action=kubectl_cmd[:200],
blast_radius=blast,
rollback_cost=20,
confidence=confidence,
rationale=f"OpenClaw Nemo: {action_title[:80]}",
)]
if "kubectl" in action_title.lower():
# C2 防禦action_title 含 kubectl 字串,但仍需白名單檢驗
# 根因action_title 可能是自然語言描述("kubectl get pods; rm -rf /"
# 未檢驗直接 action_title[:200] 會將惡意命令注入 CandidateAction
# 修復:通過 _is_safe_kubectl_command 才採用;不通過 → fall-through 到語意合成
# 2026-04-24 ogt + Claude Sonnet 4.6 (C1/C2 安全修復)
if action_title and confidence > 0 and _is_safe_kubectl_command(action_title):
return [CandidateAction(
action=action_title[:200],
blast_radius=blast,
rollback_cost=20,
confidence=confidence,
rationale=f"OpenClaw Nemo 建議: {action_title}",
)]
# 不安全或信心為 0 → fall-through 到語意合成
if action_title and not _is_safe_kubectl_command(action_title):
logger.warning(
"solver_kubernetes_command_unsafe",
action=action_title[:80],
reason="action_title 含 kubectl 但未通過白名單fall-through 至語意合成",
)
# action_title 無 kubectl → 嘗試語意合成 kubectl 指令
_at_lower = action_title.lower()
_synthesized: str | None = None
# 2026-04-25 修復 L3語意合成不能生成不完整的 kubectl 指令
# 根本原因LLM action_title 如「重啟服務」缺乏具體 deployment 名稱
# 舊邏輯:硬造 "kubectl rollout restart deployment -n awoooi-prod"(缺名)
# 下游 operation_parser 無法解析regex 要求 deployment/<name>
# → parse 失敗 → 執行失敗分支 → Telegram 被 L2 吞掉(無 provider
# 修法:優先從 parsed 提取具體資源名稱;無名則 return [] 降級到 _degraded_plan
_target: str | None = None
import re as regex_module
for _key in ("target", "resource", "deployment", "service", "pod"):
_v = str(parsed.get(_key, "")).strip().lower()
if _v and regex_module.match(r"^[a-z0-9][\w.-]{0,62}$", _v):
_target = _v
logger.debug(
"solver_synthesis_target_found",
key=_key,
target=_target,
)
break
if any(w in _at_lower for w in ("rollback", "undo", "回滾", "還原")):
if _target:
_synthesized = f"kubectl rollout undo deployment/{_target} -n awoooi-prod"
elif any(w in _at_lower for w in ("restart", "重啟", "重新啟動")):
if _target:
_synthesized = f"kubectl rollout restart deployment/{_target} -n awoooi-prod"
elif any(w in _at_lower for w in ("scale", "擴容", "縮容", "replicas")):
# scale 需要 --replicas=NLLM 無法提供時不合成
pass
elif any(w in _at_lower for w in ("logs", "日誌", "log")):
_synthesized = "kubectl logs -n awoooi-prod --tail=100 --selector=app=awoooi-api"
elif any(w in _at_lower for w in ("describe", "診斷", "diagnos")):
_synthesized = "kubectl describe pods -n awoooi-prod"
if _synthesized and _is_safe_kubectl_command(_synthesized):
logger.debug(
"solver_nemo_action_synthesized",
action_title=action_title[:80],
synthesized=_synthesized,
target=_target,
)
return [CandidateAction(
action=_synthesized,
blast_radius=blast,
rollback_cost=20,
confidence=min(confidence, 0.5), # 合成指令最高 0.5,避免誤入自動執行
rationale=f"[語意合成] Nemo 建議「{action_title[:80]}」→ 轉為 kubectl 指令",
)]
# 缺乏資源名稱或無法合成 → return [](交由 _degraded_plan 輸出 empty actions
if not _target and any(w in _at_lower for w in ("rollback", "undo", "restart", "重啟", "回滾", "還原", "重新啟動")):
logger.warning(
"solver_synthesis_insufficient_context",
action_title=action_title[:80],
reason="Deployment 名稱未被 LLM 提供,無法合成完整 kubectl 指令",
)
else:
logger.debug(
"solver_nemo_no_kubectl_fallback",
action_title=action_title[:80],
reason="action_title 無 kubectl 且語意合成失敗,降級至 _degraded_plan",
)
return []
raw = parsed.get("candidates", [])
candidates = []
for item in raw:
if not isinstance(item, dict):
continue
# 2026-04-24 ogt + Claude Sonnet 4.6: 標準 candidates 路徑白名單防護Major #2 / C2
# 根因:標準路徑未驗證 action 欄位LLM 可注入含 shell 元字符的惡意命令
# 修復:先驗原始字串(截斷前),失敗則 skip通過才截斷進 CandidateAction
# 注意:驗證必須在 [:200] 截斷前執行,否則截斷恰好移除危險字符會誤放行
action_raw = str(item.get("action", ""))
if not _is_safe_kubectl_command(action_raw):
logger.warning(
"solver_standard_action_unsafe",
action=action_raw[:80],
reason="未通過白名單檢驗",
)
continue
action = action_raw[:200]
c = CandidateAction(
action=action,
blast_radius=max(0, min(100, int(item.get("blast_radius", 50)))),
rollback_cost=max(0, min(100, int(item.get("rollback_cost", 50)))),
confidence=float(item.get("confidence", 0.0)),
rationale=str(item.get("rationale", ""))[:500],
)
candidates.append(c)
candidates.sort(key=lambda c: c.confidence, reverse=True)
return candidates
def compute_input_hash(diagnosis: DiagnosisReport) -> str:
    """Return a short fingerprint of the Solver's input.

    The fingerprint is the first 16 hex characters of the SHA-256 digest of
    the evidence snapshot id directly followed by the top hypothesis
    description. When the report has no top hypothesis, only the snapshot id
    is hashed.
    """
    snapshot_id = diagnosis.evidence_snapshot_id
    top = diagnosis.top_hypothesis
    # A missing (falsy) top hypothesis contributes nothing to the hashed text.
    description = top.description if top else ""
    digest = hashlib.sha256((snapshot_id + description).encode())
    return digest.hexdigest()[:16]
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
# Module-level cache for the shared SolverAgent; stays None until first requested.
_agent: SolverAgent | None = None


def get_solver_agent() -> SolverAgent:
    """Return the shared SolverAgent, constructing it on the first call."""
    global _agent
    if _agent is not None:
        return _agent
    _agent = SolverAgent()
    return _agent