Files
awoooi/apps/api/src/services/mcp_tool_registry.py
OG T f1cbf6db7d feat(adr-081): Phase 1 感官縱深 — 8D 情報蒐集 + 執行後驗證
成品:
- IncidentEvidence DB model(8D 感官 + pre/post 執行狀態)
- EvidenceSnapshot dataclass(build_summary → LLM 上下文)
- SanitizationService(Prompt Injection 0-tolerance,12 pattern)
- MCPToolRegistry(動態工具登記,suggest_tools 不寫死告警類型)
- PreDecisionInvestigator(8D 並行感官,P99 < 8s,Redis 30s 快取)
- PostExecutionVerifier(warmup 10s → 後狀態評估 success/degraded/failed)
- decision_manager + approval_execution 接線(feature flag 守衛)

Gate 1 修復:D4/D5/D7/D8 補 sanitize_dict_values;移除裸 "error" failure
signal 防 error_rate key 誤判;evidence_snapshot rowcount 零行警告。

測試:130 passed(+111 新增)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 13:08:38 +08:00

370 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AWOOOI AIOps Phase 1 — MCP 工具動態登記冊
==========================================
禁止寫死工具清單。PreDecisionInvestigator 透過此 Registry
動態查詢「目前有哪些 MCP 工具可用」,並由 AI 自選要呼叫哪幾個。
設計原則:
1. 工具登記(Register)— 系統啟動時各 Provider 自我登記
2. 動態查詢(Suggest)— 依告警類型 / Incident 特徵建議相關工具
3. 健康快取(Health Cache)— 避免每次都打所有 Provider 測試連線
4. 感官分組(Sensor Groups)— 8D 感官各有對應工具組
絕對禁止:
❌ hardcode 在 pre_decision_investigator.py 裡寫死 "if kubernetes: call kubectl_get"
✅ 改為 registry.suggest_tools(incident) 回傳動態清單
ADR-081: PreDecisionInvestigator + EvidenceSnapshot
MASTER §3.1.3 (B) AI 自主工具選擇
2026-04-15 ogt + Claude Sonnet 4.6 (亞太): Phase 1 初始建立
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import structlog
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider
logger = structlog.get_logger(__name__)
class SensorDimension(str, Enum):
    """Enumerate the eight sensing dimensions ("8D") a tool can be tagged with.

    The enum mixes in ``str``, so every member is also a string equal to its
    raw value.
    """

    # D1: Kubernetes object state
    D1_K8S_STATE = "d1_k8s_state"
    # D2: logs
    D2_LOGS = "d2_logs"
    # D3: metrics
    D3_METRICS = "d3_metrics"
    # D4: deployment / configuration changes
    D4_CHANGES = "d4_changes"
    # D5: business indicators
    D5_BUSINESS = "d5_business"
    # D6: historical context
    D6_HISTORY = "d6_history"
    # D7: peer replicas
    D7_PEERS = "d7_peers"
    # D8: dependency topology
    D8_TOPOLOGY = "d8_topology"
@dataclass
class RegisteredTool:
    """A tool entry held by the registry, tagged with its sensing dimensions."""

    # The tool definition and the provider it was registered with.
    tool: MCPTool
    provider: MCPToolProvider
    # Sensing dimensions this tool is tagged with.
    dimensions: list[SensorDimension]
    # Alert-name prefix allow-list; an empty list means "applies to every alert".
    incident_type_hints: list[str] = field(default_factory=list)
    # 1 = highest priority (always call) ... 10 = lowest (special cases only).
    priority: int = 5
class MCPToolRegistry:
    """Dynamic registry of MCP tools.

    Providers are added through ``register_provider()`` (or single tools via
    ``register_tool_manually()``); callers then ask ``suggest_tools()`` for
    the tools that should be invoked for a given alert.

    Usage::

        registry = get_mcp_tool_registry()
        # At startup (typically in lifespan or a provider's __init__)
        await registry.register_provider(k8s_provider)
        # Before making a decision
        tools = registry.suggest_tools(
            alertname="KubePodCrashLooping",
            incident_labels={"namespace": "awoooi-prod"},
        )
        for reg_tool in tools:
            result = await reg_tool.provider.execute(
                reg_tool.tool.name, params
            )
    """

    def __init__(self) -> None:
        self._tools: list[RegisteredTool] = []
        self._provider_names: set[str] = set()

    async def register_provider(self, provider: MCPToolProvider) -> int:
        """Register every tool exposed by one MCP provider.

        Args:
            provider: The provider whose ``list_tools()`` result is registered.

        Returns:
            The number of tools registered. ``0`` when the provider is
            disabled, was already registered under the same name, or its
            ``list_tools()`` call raised (the error is logged, not re-raised).
        """
        if not provider.enabled:
            logger.info("mcp_registry_provider_disabled", provider=provider.name)
            return 0
        if provider.name in self._provider_names:
            logger.warning("mcp_registry_duplicate_provider", provider=provider.name)
            return 0

        try:
            tools = await provider.list_tools()
        except Exception:
            logger.exception("mcp_registry_list_tools_error", provider=provider.name)
            return 0

        # Classify everything before mutating registry state, so a failure
        # part-way through cannot leave a half-registered provider whose
        # tools would be duplicated by a later retry.
        classified = [_classify_tool(tool, provider) for tool in tools]
        self._tools.extend(classified)
        self._provider_names.add(provider.name)

        count = len(classified)
        logger.info(
            "mcp_registry_provider_registered",
            provider=provider.name,
            tool_count=count,
        )
        return count

    def register_tool_manually(
        self,
        tool: MCPTool,
        provider: MCPToolProvider,
        dimensions: list[SensorDimension],
        incident_type_hints: list[str] | None = None,
        priority: int = 5,
    ) -> None:
        """Register a single tool directly (for tests or special-case injection).

        Args:
            tool: The tool definition.
            provider: The provider that executes the tool.
            dimensions: Sensing dimensions the tool is tagged with.
            incident_type_hints: Alert-name prefixes the tool applies to;
                ``None`` or empty means every alert.
            priority: Sort key used by ``suggest_tools()`` (lower sorts first).
        """
        self._tools.append(RegisteredTool(
            tool=tool,
            provider=provider,
            dimensions=dimensions,
            incident_type_hints=incident_type_hints or [],
            priority=priority,
        ))

    def suggest_tools(
        self,
        alertname: str = "",
        incident_labels: dict[str, Any] | None = None,  # noqa: ARG002 — Phase 4 used for namespace filter
        max_tools: int = 8,
    ) -> list[RegisteredTool]:
        """Recommend the tools to call for an alert, ordered by priority.

        Selection rules:
            1. Tools whose provider is disabled are skipped.
            2. A tool with empty ``incident_type_hints`` applies to every alert.
            3. Otherwise ``alertname`` must start with one of the hints.
            4. Results are ordered by ascending ``priority`` (1 = highest);
               ties keep registration order.
            5. At most ``max_tools`` tools are returned.

        Several tools covering the same dimension may all be returned; no
        per-dimension de-duplication is performed.

        Args:
            alertname: Alert name, e.g. ``"KubePodCrashLooping"``. ``None`` is
                treated like an empty name.
            incident_labels: Alert labels. Accepted but currently unused.
            max_tools: Upper bound on the number of tools returned. Zero or a
                negative value yields an empty list.

        Returns:
            The recommended tools, already sorted.
        """
        result = self._select_tools(alertname, max_tools)
        logger.debug(
            "mcp_registry_suggest_tools",
            alertname=alertname,
            suggested_count=len(result),
            dims=[d.value for reg in result for d in reg.dimensions],
        )
        return result

    def _select_tools(self, alertname: str, max_tools: int) -> list[RegisteredTool]:
        """Filter, order and cap the registered tools (no logging)."""
        # A negative cap must not fall through to slice semantics, where
        # ``[:-1]`` would silently mean "all but the last".
        if max_tools <= 0:
            return []
        prefix_target = alertname or ""

        selected: list[RegisteredTool] = []
        # ``sorted`` is stable, so equal priorities keep registration order.
        for reg in sorted(self._tools, key=lambda t: t.priority):
            if not reg.provider.enabled:
                continue
            hints = reg.incident_type_hints
            if hints and not prefix_target.startswith(tuple(hints)):
                continue
            selected.append(reg)
            if len(selected) >= max_tools:
                break
        return selected

    def get_all_tools(self) -> list[RegisteredTool]:
        """Return a shallow copy of every registered tool entry."""
        return list(self._tools)

    @property
    def provider_count(self) -> int:
        """Number of providers registered through ``register_provider()``."""
        return len(self._provider_names)

    @property
    def tool_count(self) -> int:
        """Number of registered tool entries."""
        return len(self._tools)
# ─────────────────────────────────────────────────────────────────────────────
# 工具自動分類(根據 tool name 推斷感官維度)
# ─────────────────────────────────────────────────────────────────────────────
def _classify_tool(tool: MCPTool, provider: MCPToolProvider) -> RegisteredTool:
    """Infer sensing dimensions, alert-prefix hints and priority from a tool name.

    The lower-cased tool name is checked against an ordered list of keyword
    rules; the first rule that matches decides the classification. A name
    matching no rule is filed under D1 with no hints and priority 5.

    Args:
        tool: The tool to classify; only ``tool.name`` is inspected.
        provider: The provider stored on the resulting entry.

    Returns:
        A new ``RegisteredTool`` wrapping ``tool`` and ``provider``.
    """
    lowered = tool.name.lower()

    def mentions(*needles: str) -> bool:
        # Plain substring test against the lower-cased tool name.
        return any(needle in lowered for needle in needles)

    # Log detection is intentionally stricter than a bare "log" substring so
    # that names such as "topology" are not mistaken for log tools.
    is_log_tool = (
        mentions("logs", "stderr", "journal")
        or "_log" in lowered
        or lowered.startswith("log")
    )

    # (matched, dimensions, alert-prefix hints, priority) — order matters:
    # only the first matching rule is applied.
    # NOTE(review): because matching is by substring and first match wins, an
    # SSH tool whose name also contains an earlier keyword (e.g. "pod", "logs",
    # "disk") never reaches the SSH rule — confirm this is intended.
    ordered_rules = (
        # D1: Kubernetes state
        (
            mentions("describe", "pod", "deployment", "node", "hpa", "event", "k8s_get"),
            [SensorDimension.D1_K8S_STATE],
            ["Kube", "Pod", "Deploy", "Node", "Velero", "ArgoCD"],
            2,
        ),
        # D2: logs
        (is_log_tool, [SensorDimension.D2_LOGS], [], 2),
        # D3: metrics
        (
            mentions("metric", "prometheus", "query", "range", "cpu", "memory", "disk"),
            [SensorDimension.D3_METRICS],
            [],
            3,
        ),
        # D4: deployment changes
        (
            mentions("deploy", "diff", "argocd", "gitea", "git", "revision"),
            [SensorDimension.D4_CHANGES],
            [],
            3,
        ),
        # D5: business indicators
        (
            mentions("sli", "slo", "order", "revenue", "business", "grafana"),
            [SensorDimension.D5_BUSINESS],
            [],
            4,
        ),
        # D6: historical context
        (
            mentions("rag", "knowledge", "history", "similar", "past"),
            [SensorDimension.D6_HISTORY],
            [],
            4,
        ),
        # D7: peer replicas
        (
            mentions("peer", "replica", "scale", "replicaset"),
            [SensorDimension.D7_PEERS],
            [],
            5,
        ),
        # D8: dependency topology
        (
            mentions("topology", "istio", "mesh", "upstream", "downstream", "trace"),
            [SensorDimension.D8_TOPOLOGY],
            [],
            6,
        ),
        # SSH tools span several dimensions
        (
            "ssh" in lowered,
            [SensorDimension.D1_K8S_STATE, SensorDimension.D2_LOGS, SensorDimension.D3_METRICS],
            ["Host", "Docker", "Sentry", "Harbor", "Ollama", "Backup"],
            2,
        ),
    )
    # Anything unrecognised defaults to D1.
    fallback = (True, [SensorDimension.D1_K8S_STATE], [], 5)

    _, dimensions, alert_hints, rank = next(
        (rule for rule in ordered_rules if rule[0]),
        fallback,
    )
    return RegisteredTool(
        tool=tool,
        provider=provider,
        dimensions=dimensions,
        incident_type_hints=alert_hints,
        priority=rank,
    )
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_registry: MCPToolRegistry | None = None


def get_mcp_tool_registry() -> MCPToolRegistry:
    """Return the module-wide registry instance, creating it on first use.

    The instance is created empty; providers are registered separately by
    ``init_mcp_tool_registry()``.
    """
    global _registry
    if _registry is not None:
        return _registry
    _registry = MCPToolRegistry()
    return _registry
async def init_mcp_tool_registry() -> MCPToolRegistry:
    """Populate the shared registry with every provider that can be built.

    When phase 1 is disabled via ``aiops_flags.is_phase_enabled(1)``, nothing
    is registered and the shared registry is returned as-is.

    Returns:
        The shared ``MCPToolRegistry`` instance.
    """
    # Imported lazily inside the function, as in the rest of this module's
    # startup helpers.
    from src.core.feature_flags import aiops_flags

    registry = get_mcp_tool_registry()

    if aiops_flags.is_phase_enabled(1):
        total = 0
        for provider in _build_providers():
            total += await registry.register_provider(provider)
        logger.info(
            "mcp_registry_initialized",
            providers=registry.provider_count,
            tools=registry.tool_count,
            total_registered=total,
        )
    else:
        logger.info("mcp_registry_skip_p1_disabled")

    return registry
def _build_providers() -> list[MCPToolProvider]:
    """Instantiate every MCP provider that can be imported and constructed.

    Each provider is imported and constructed inside its own ``try`` block,
    so a provider whose module cannot be imported or whose constructor raises
    is logged (with traceback) and skipped without affecting the others.

    Returns:
        The successfully constructed providers, in K8s, SSH, Prometheus order.
    """
    providers: list[MCPToolProvider] = []

    # K8s provider
    try:
        from src.plugins.mcp.providers.k8s_provider import K8sProvider

        providers.append(K8sProvider())
    except Exception:
        # Best-effort: keep going, but record why this provider is missing.
        logger.warning("mcp_registry_k8s_provider_init_failed", exc_info=True)

    # SSH provider
    try:
        from src.plugins.mcp.providers.ssh_provider import SSHProvider

        providers.append(SSHProvider())
    except Exception:
        logger.warning("mcp_registry_ssh_provider_init_failed", exc_info=True)

    # Prometheus provider
    try:
        from src.plugins.mcp.providers.prometheus_provider import PrometheusProvider

        providers.append(PrometheusProvider())
    except Exception:
        logger.warning("mcp_registry_prometheus_provider_init_failed", exc_info=True)

    return providers