Files
awoooi/apps/api/src/services/mcp_tool_registry.py
Your Name 64c7044282
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m17s
CD Pipeline / build-and-deploy (push) Successful in 3m41s
CD Pipeline / post-deploy-checks (push) Successful in 1m43s
fix(mcp): balance host alert tool suggestions
2026-05-18 12:14:21 +08:00

415 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
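AWOOOI AIOps Phase 1 — Dynamic MCP tool registry
================================================
Hard-coded tool lists are forbidden. PreDecisionInvestigator queries this
Registry at runtime for "which MCP tools are currently available", and the AI
chooses which of them to call.
Design principles:
1. Register — each Provider registers itself at system startup
2. Suggest — recommend relevant tools based on alert type / Incident characteristics
3. Health Cache — avoid probing every Provider's connection on every call
4. Sensor Groups — each of the 8D sensor dimensions has its own tool group
Strictly forbidden:
❌ hard-coding "if kubernetes: call kubectl_get" inside pre_decision_investigator.py
✅ call registry.suggest_tools(incident) and use the dynamic list it returns
ADR-081: PreDecisionInvestigator + EvidenceSnapshot
MASTER §3.1.3 (B) AI autonomous tool selection
2026-04-15 ogt + Claude Sonnet 4.6 (Asia-Pacific): Phase 1 initial creation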
"""
from __future__ import annotations
from dataclasses import dataclass, field, replace
from enum import StrEnum
from typing import Any
import structlog
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider
from src.plugins.mcp.registry import AuditedMCPToolProvider
logger = structlog.get_logger(__name__)
class SensorDimension(StrEnum):
"""8D 感官維度分類"""
D1_K8S_STATE = "d1_k8s_state"
D2_LOGS = "d2_logs"
D3_METRICS = "d3_metrics"
D4_CHANGES = "d4_changes"
D5_BUSINESS = "d5_business"
D6_HISTORY = "d6_history"
D7_PEERS = "d7_peers"
D8_TOPOLOGY = "d8_topology"
@dataclass
class RegisteredTool:
    """A tool entry held by the registry, tagged with its sensor dimensions."""

    # The tool definition itself.
    tool: MCPTool
    # The provider through which the tool is executed.
    provider: MCPToolProvider
    # Sensor dimensions this tool covers.
    dimensions: list[SensorDimension]
    # Alert-name prefix whitelist; an empty list means the tool applies to
    # every alert.
    incident_type_hints: list[str] = field(default_factory=list)
    # 1 = highest priority (always call) ... 10 = lowest (special cases only).
    priority: int = 5
class MCPToolRegistry:
    """Dynamic registry of MCP tools.

    Providers are added with ``register_provider()`` (typically at application
    startup). Callers such as PreDecisionInvestigator then use
    ``suggest_tools()`` to obtain the tools worth calling for a given alert.

    Usage:
        registry = get_mcp_tool_registry()

        # Register at startup (usually in lifespan or a Provider __init__)
        await registry.register_provider(k8s_provider)

        # Query before making a decision
        tools = registry.suggest_tools(
            alertname="KubePodCrashLooping",
            incident_labels={"namespace": "awoooi-prod"},
        )
        for reg_tool in tools:
            result = await reg_tool.provider.execute(
                reg_tool.tool.name, params
            )
    """

    def __init__(self) -> None:
        # Every registered tool, in registration order.
        self._tools: list[RegisteredTool] = []
        # Names of providers whose registration completed successfully.
        self._provider_names: set[str] = set()

    async def register_provider(self, provider: MCPToolProvider) -> int:
        """Register every tool exposed by one MCP provider.

        Args:
            provider: An MCPToolProvider implementation.

        Returns:
            int: Number of tools registered. 0 when the provider is disabled,
            already registered under the same name, or ``list_tools()`` raised.
        """
        if not provider.enabled:
            logger.info("mcp_registry_provider_disabled", provider=provider.name)
            return 0

        if provider.name in self._provider_names:
            logger.warning("mcp_registry_duplicate_provider", provider=provider.name)
            return 0

        try:
            listed_tools = await provider.list_tools()
        except Exception:
            # A provider that cannot list its tools is skipped; its name is
            # not recorded, so a later call may try again.
            logger.exception("mcp_registry_list_tools_error", provider=provider.name)
            return 0

        registered = 0
        for listed in listed_tools:
            # Tools without a server name inherit the provider's name.
            named = listed if listed.server_name else replace(listed, server_name=provider.name)
            # Each tool is stored with its own audited wrapper around the provider.
            self._tools.append(_classify_tool(named, AuditedMCPToolProvider(provider)))
            registered += 1

        self._provider_names.add(provider.name)
        logger.info(
            "mcp_registry_provider_registered",
            provider=provider.name,
            tool_count=registered,
        )
        return registered

    def register_tool_manually(
        self,
        tool: MCPTool,
        provider: MCPToolProvider,
        dimensions: list[SensorDimension],
        incident_type_hints: list[str] | None = None,
        priority: int = 5,
    ) -> None:
        """Register a single tool by hand (for tests or special tool injection).

        The provider is stored as given (no audit wrapper is added here) and
        the provider name is not recorded in the registered-provider set.
        """
        entry = RegisteredTool(
            tool=tool,
            provider=provider,
            dimensions=dimensions,
            incident_type_hints=incident_type_hints or [],
            priority=priority,
        )
        self._tools.append(entry)

    def suggest_tools(
        self,
        alertname: str = "",
        incident_labels: dict[str, Any] | None = None,
        max_tools: int = 8,
    ) -> list[RegisteredTool]:
        """Recommend the tools to call for an alert.

        Selection logic:
            1. Empty ``incident_type_hints`` -> the tool applies to every alert.
            2. Non-empty ``incident_type_hints`` -> ``alertname`` must start
               with one of them, except that tools tagged D1_K8S_STATE are
               also kept when the labels carry a concrete Kubernetes locator
               (``deployment``, ``pod``, ``node`` or ``container``).
            3. Tools whose provider is disabled are skipped.
            4. Candidates are visited in ascending ``priority`` (1 = highest).
            5. At most ``max_tools`` are returned, balanced across providers
               by ``_select_provider_balanced_tools``.

        Args:
            alertname: Alert name, e.g. "KubePodCrashLooping".
            incident_labels: Alert labels, e.g. {"namespace": "awoooi-prod"}.
            max_tools: Upper bound on returned tools (default 8, matching 8D).

        Returns:
            list[RegisteredTool]: The recommended tools, in the order produced
            by the provider-balancing step.
        """
        labels = incident_labels or {}
        # namespace alone is only routing context. Requiring a concrete
        # workload/node locator prevents host alerts such as HostErrorLogFlood
        # from being misrouted into Kubernetes tools just because an upstream
        # bridge added namespace="infra".
        locator_keys = ("deployment", "pod", "node", "container")
        has_k8s_locator = any(labels.get(key) for key in locator_keys)

        candidates: list[RegisteredTool] = []
        # sorted() is stable, so equal priorities keep registration order.
        for reg in sorted(self._tools, key=lambda entry: entry.priority):
            # Skip tools whose provider is disabled.
            if not reg.provider.enabled:
                continue

            hints = reg.incident_type_hints
            if hints and not alertname.startswith(tuple(hints)):
                # Prefix mismatch: only a K8s-state tool backed by a concrete
                # locator label is still allowed through.
                rescued_by_locator = (
                    has_k8s_locator and SensorDimension.D1_K8S_STATE in reg.dimensions
                )
                if not rescued_by_locator:
                    continue

            # No per-dimension de-duplication: several tools may cover the
            # same dimension (e.g. D1 needs kubectl_describe + kubectl_events).
            candidates.append(reg)

        # Keep at most max_tools; a single provider first gets only a soft cap
        # of the budget so side evidence (SignOz/Prometheus/Sentry) can enter
        # the investigation alongside the main diagnostic tools.
        result = _select_provider_balanced_tools(candidates, max_tools)
        logger.debug(
            "mcp_registry_suggest_tools",
            alertname=alertname,
            suggested_count=len(result),
            dims=[d.value for reg in result for d in reg.dimensions],
        )
        return result

    def get_all_tools(self) -> list[RegisteredTool]:
        """Return a shallow copy of all registered tools (health checks / API listings)."""
        return self._tools.copy()

    @property
    def provider_count(self) -> int:
        """Number of providers registered through ``register_provider()``."""
        return len(self._provider_names)

    @property
    def tool_count(self) -> int:
        """Number of registered tools."""
        return len(self._tools)
def _select_provider_balanced_tools(
    tools: list[RegisteredTool],
    max_tools: int,
) -> list[RegisteredTool]:
    """Pick up to ``max_tools`` tools while softly capping each provider.

    Tools are taken in the given order. Each provider (keyed by
    ``reg.provider.name``) may contribute at most ``max(2, max_tools // 2)``
    tools in the first pass; tools beyond that cap are set aside and only used
    afterwards to fill any remaining slots, in their original relative order.

    Args:
        tools: Candidate tools, in the caller's preferred order.
        max_tools: Maximum number of tools to return; values <= 0 yield [].

    Returns:
        list[RegisteredTool]: First-pass picks followed by any set-aside tools
        needed to fill the budget.
    """
    if max_tools <= 0:
        return []

    per_provider_cap = max(2, max_tools // 2)
    picked: list[RegisteredTool] = []
    overflow: list[RegisteredTool] = []
    usage: dict[str, int] = {}

    for candidate in tools:
        if len(picked) >= max_tools:
            break
        owner = candidate.provider.name
        used = usage.get(owner, 0)
        if used < per_provider_cap:
            picked.append(candidate)
            usage[owner] = used + 1
        else:
            # Over the soft cap: keep as a fallback for leftover slots.
            overflow.append(candidate)

    # Fill whatever budget is left with the set-aside tools.
    picked.extend(overflow[: max_tools - len(picked)])
    return picked
# ─────────────────────────────────────────────────────────────────────────────
# Automatic tool classification (infers sensor dimensions from the tool name)
# ─────────────────────────────────────────────────────────────────────────────
def _classify_tool(tool: MCPTool, provider: MCPToolProvider) -> RegisteredTool:
    """Infer sensor dimensions, alert-prefix hints and priority from a tool name.

    The lowercased tool name is matched against keyword groups as substrings.
    The first matching group wins, so the order of the branches below is
    significant: for example, the SSH branch is only reached when no earlier
    keyword (such as "logs", "disk" or "node") occurs in the name.

    Args:
        tool: The tool to classify; only ``tool.name`` is inspected.
        provider: The provider stored on the resulting entry.

    Returns:
        RegisteredTool: Entry carrying the inferred dimensions, hints and
        priority. Names matching nothing default to D1 with priority 5 and no
        hints.
    """
    lowered = tool.name.lower()

    def mentions(*keywords: str) -> bool:
        # True when any keyword occurs as a substring of the lowercased name.
        return any(keyword in lowered for keyword in keywords)

    type_hints: list[str] = []
    rank = 5

    if mentions("describe", "pod", "deployment", "node", "hpa", "event", "k8s_get", "rollout"):
        # D1: Kubernetes state
        sensed = [SensorDimension.D1_K8S_STATE]
        type_hints = ["Kube", "Pod", "Deploy", "Node", "Velero", "ArgoCD"]
        rank = 1 if "rollout" in lowered else 2
    elif mentions("logs", "stderr", "journal") or "_log" in lowered or lowered.startswith("log"):
        # D2: logs. Deliberately stricter than a bare "log" substring so that
        # names such as "topology" do not match.
        sensed = [SensorDimension.D2_LOGS]
        rank = 2
    elif mentions("metric", "prometheus", "query", "range", "cpu", "memory", "disk"):
        # D3: metrics
        sensed = [SensorDimension.D3_METRICS]
        rank = 3
    elif mentions("deploy", "diff", "argocd", "gitea", "git", "revision"):
        # D4: deployment changes
        sensed = [SensorDimension.D4_CHANGES]
        rank = 3
    elif mentions("sli", "slo", "order", "revenue", "business", "grafana"):
        # D5: business indicators (Grafana / Signoz SLI)
        sensed = [SensorDimension.D5_BUSINESS]
        rank = 4
    elif mentions("rag", "knowledge", "history", "similar", "past"):
        # D6: historical context (RAG / KM lookups)
        sensed = [SensorDimension.D6_HISTORY]
        rank = 4
    elif mentions("peer", "replica", "scale", "replicaset"):
        # D7: peer replicas
        sensed = [SensorDimension.D7_PEERS]
        rank = 5
    elif mentions("topology", "istio", "mesh", "upstream", "downstream", "trace"):
        # D8: dependency topology
        sensed = [SensorDimension.D8_TOPOLOGY]
        rank = 6
    elif "ssh" in lowered:
        # SSH tools span several dimensions.
        sensed = [
            SensorDimension.D1_K8S_STATE,
            SensorDimension.D2_LOGS,
            SensorDimension.D3_METRICS,
        ]
        type_hints = ["Host", "Docker", "Sentry", "Harbor", "Ollama", "Backup"]
        rank = 2
    else:
        # Unrecognised names default to D1.
        sensed = [SensorDimension.D1_K8S_STATE]

    return RegisteredTool(
        tool=tool,
        provider=provider,
        dimensions=sensed,
        incident_type_hints=type_hints,
        priority=rank,
    )
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_registry: MCPToolRegistry | None = None


def get_mcp_tool_registry() -> MCPToolRegistry:
    """Return the process-wide registry, creating an empty one on first use.

    Populating the registry is a separate step: ``init_mcp_tool_registry()``
    is meant to be called from the application's startup lifespan.
    """
    global _registry
    if _registry is not None:
        return _registry
    _registry = MCPToolRegistry()
    return _registry
async def init_mcp_tool_registry() -> MCPToolRegistry:
    """Initialise the registry and register every available MCP provider.

    Intended to be called from the main.py lifespan startup. When AIOps
    phase 1 is not enabled according to ``aiops_flags.is_phase_enabled(1)``,
    nothing is registered and the registry singleton is returned as-is.

    Returns:
        MCPToolRegistry: The registry singleton.
    """
    from src.core.feature_flags import aiops_flags

    registry = get_mcp_tool_registry()

    if aiops_flags.is_phase_enabled(1):
        # Register providers one at a time, in the order they were built.
        total_registered = 0
        for candidate in _build_providers():
            total_registered += await registry.register_provider(candidate)
        logger.info(
            "mcp_registry_initialized",
            providers=registry.provider_count,
            tools=registry.tool_count,
            total_registered=total_registered,
        )
    else:
        logger.info("mcp_registry_skip_p1_disabled")

    return registry
def _build_providers() -> list[MCPToolProvider]:
    """Instantiate every known MCP provider that can be imported and constructed.

    Each provider module is imported by dotted path inside its own ``try``
    block, so one missing or broken provider does not stop the others from
    loading. A failure is logged as a warning together with the exception
    type and message, and that provider is left out.

    Enabled/disabled state is not checked here; ``register_provider()`` skips
    providers whose ``enabled`` attribute is falsy. The original note states
    that ``enabled`` is driven by environment variables; that is not visible
    in this module.

    Returns:
        list[MCPToolProvider]: Successfully constructed providers, in the
        order of the spec table below.
    """
    # Local import keeps this block self-contained.
    import importlib

    # (log name, module path, class name)
    provider_specs = [
        ("k8s", "src.plugins.mcp.providers.k8s_provider", "K8sProvider"),
        ("ssh", "src.plugins.mcp.providers.ssh_provider", "SSHProvider"),
        ("prometheus", "src.plugins.mcp.providers.prometheus_provider", "PrometheusProvider"),
        ("signoz", "src.plugins.mcp.providers.signoz_provider", "SignOzProvider"),
        ("database", "src.plugins.mcp.providers.database_provider", "DatabaseProvider"),
        ("filesystem", "src.plugins.mcp.providers.filesystem_provider", "FilesystemProvider"),
        ("grafana", "src.plugins.mcp.providers.grafana_provider", "GrafanaProvider"),
        ("rag", "src.plugins.mcp.providers.rag_provider", "RAGProvider"),
        ("argocd", "src.plugins.mcp.providers.argocd_provider", "ArgoCDProvider"),
        ("sentry", "src.plugins.mcp.providers.sentry_provider", "SentryProvider"),
    ]

    providers: list[MCPToolProvider] = []
    for provider_name, module_name, class_name in provider_specs:
        try:
            module = importlib.import_module(module_name)
            providers.append(getattr(module, class_name)())
        except Exception as exc:
            # Best-effort loading: keep going, but record why this provider
            # failed so the cause is not lost.
            logger.warning(
                "mcp_registry_provider_init_failed",
                provider=provider_name,
                error_type=type(exc).__name__,
                error=str(exc),
            )
    return providers