diff --git a/.agents/skills/07-tool-integration-expert.md b/.agents/skills/07-tool-integration-expert.md index b28bb2af..fcb057d0 100644 --- a/.agents/skills/07-tool-integration-expert.md +++ b/.agents/skills/07-tool-integration-expert.md @@ -10,16 +10,17 @@ | 欄位 | 值 | |------|-----| -| **版本** | v1.3 | +| **版本** | v1.4 | | **建立日期** | 2026-03-25 23:30 (台北) | | **建立者** | Claude Code | -| **最後修改** | 2026-03-26 18:00 (台北) | -| **修改者** | Claude Code | +| **最後修改** | 2026-05-01 15:45 (台北) | +| **修改者** | Codex | ### 變更紀錄 | 版本 | 日期 | 執行者 | 變更內容 | |------|------|--------|----------| +| v1.4 | 2026-05-01 | Codex | MCP Agent Loop governance、audit schema、Agent role tool permissions | | v1.3 | 2026-03-26 18:00 | Claude Code | 新增 Grafana MCP (#83) + SignOz query_logs | | v1.2 | 2026-03-26 23:30 | Claude Code | 新增 Filesystem MCP Tool (#82 已完成) | | v1.1 | 2026-03-26 14:20 | Claude Code | 更新 MCP Tool 狀態 (#79/#80/#81 已完成) | @@ -48,6 +49,15 @@ Phase 13.2 Tool 實作 (P0 最優先): | **Grafana** | ✅ 真實 | `providers/grafana_provider.py` | #83 ✅ | | 維運手冊 RAG | 📋 設計完成 | - | #84 (待實作) | +## Agent Loop MCP 鐵律 (ADR-105) + +- MCP Provider 已存在時,不要重複安裝外部 MCP server;先接入 `ProviderRegistry` / `MCPToolRegistry`,再補 audit 與權限。 +- 所有 provider `execute()` 必須經過 audited wrapper,寫入 `mcp_audit_log` 與 `mcp_daily_stats`。 +- Agent Loop 工具 schema 必須由 `ai_providers/tool_schema.py` 產生,禁止 provider 各自手刻不同命名規則。 +- OpenClaw / NemoTron / Hermes / ElephantAlpha 的工具白名單必須由 `ai_providers/permissions.py` 控制。 +- Internal RAG/MCP 知識層沿用 PostgreSQL + pgvector + Redis hot cache;不得為「MCP RAG」另建孤立資料庫,除非已有量級、隔離或延遲證據。 +- `incident_id` 在 MCP audit schema 中使用 `VARCHAR(64)`,因為 AWOOOI incident 是 `INC-*` 字串,不是 UUID。 + ### 已完成 Tool 功能 **SignOz MCP (#79)**: diff --git a/apps/api/migrations/adr105_mcp_audit_snapshots.sql b/apps/api/migrations/adr105_mcp_audit_snapshots.sql new file mode 100644 index 00000000..b37f6e13 --- /dev/null +++ b/apps/api/migrations/adr105_mcp_audit_snapshots.sql @@ -0,0 +1,77 @@ +-- ADR-105 MCP audit and snapshot foundation +-- 2026-05-01 +-- Notes: +-- AWOOOI incident ids are string values such as INC-20260429-xxxx, not UUIDs. +-- Keep incident_id as VARCHAR(64) so MCP audit can join existing incident records. + +CREATE TABLE IF NOT EXISTS mcp_audit_log ( + id BIGSERIAL PRIMARY KEY, + session_id VARCHAR(36) NOT NULL, + flywheel_node VARCHAR(20), + mcp_server VARCHAR(80) NOT NULL, + tool_name VARCHAR(120) NOT NULL, + input_params JSONB, + output_result JSONB, + duration_ms INTEGER, + success BOOLEAN, + error_message TEXT, + incident_id VARCHAR(64), + agent_role VARCHAR(40), + created_at TIMESTAMPTZ DEFAULT NOW() +); + +ALTER TABLE mcp_audit_log + ADD COLUMN IF NOT EXISTS agent_role VARCHAR(40); + +CREATE INDEX IF NOT EXISTS idx_mcp_audit_session + ON mcp_audit_log(session_id); +CREATE INDEX IF NOT EXISTS idx_mcp_audit_incident + ON mcp_audit_log(incident_id); +CREATE INDEX IF NOT EXISTS idx_mcp_audit_node + ON mcp_audit_log(flywheel_node, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_mcp_audit_server_tool + ON mcp_audit_log(mcp_server, tool_name, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_mcp_audit_agent_role + ON mcp_audit_log(agent_role, created_at DESC); + +CREATE TABLE IF NOT EXISTS mcp_daily_stats ( + date DATE NOT NULL, + mcp_server VARCHAR(80) NOT NULL, + tool_name VARCHAR(120) NOT NULL, + call_count INTEGER DEFAULT 0 NOT NULL, + success_count INTEGER DEFAULT 0 NOT NULL, + avg_duration_ms FLOAT, + PRIMARY KEY (date, mcp_server, tool_name) +); + +CREATE TABLE IF NOT EXISTS k8s_state_snapshots ( + id BIGSERIAL PRIMARY KEY, + incident_id VARCHAR(64), + snapshot_type VARCHAR(40) NOT NULL, + namespace VARCHAR(63), + resource_type VARCHAR(80), + resource_name VARCHAR(253), + state_json JSONB, + captured_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_k8s_snapshot_incident + ON k8s_state_snapshots(incident_id); +CREATE INDEX IF NOT EXISTS idx_k8s_snapshot_resource + ON k8s_state_snapshots(namespace, resource_type, resource_name); +CREATE INDEX IF NOT EXISTS idx_k8s_snapshot_captured + ON k8s_state_snapshots(captured_at DESC); + +CREATE TABLE IF NOT EXISTS prometheus_snapshots ( + id BIGSERIAL PRIMARY KEY, + incident_id VARCHAR(64), + query TEXT NOT NULL, + result_json JSONB, + snapshot_type VARCHAR(40), + captured_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_prom_snapshot_incident + ON prometheus_snapshots(incident_id); +CREATE INDEX IF NOT EXISTS idx_prom_snapshot_type + ON prometheus_snapshots(snapshot_type, captured_at DESC); diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py index 396c82a6..1b341084 100644 --- a/apps/api/src/db/models.py +++ b/apps/api/src/db/models.py @@ -19,6 +19,7 @@ from sqlalchemy import ( BigInteger, Boolean, CheckConstraint, + Date, DateTime, Float, Index, @@ -456,6 +457,90 @@ class AuditLog(Base): ) +# ============================================================================= +# MCP Audit / Snapshots — 2026-05-01 ghost-loop + MCP governance +# ============================================================================= + +class MCPAuditLog(Base): + """Durable audit trail for every MCP tool call.""" + + __tablename__ = "mcp_audit_log" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + session_id: Mapped[str] = mapped_column(String(36), nullable=False) + flywheel_node: Mapped[str | None] = mapped_column(String(20), nullable=True) + mcp_server: Mapped[str] = mapped_column(String(80), nullable=False) + tool_name: Mapped[str] = mapped_column(String(120), nullable=False) + input_params: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + output_result: Mapped[dict | list | str | None] = mapped_column(JSONB, nullable=True) + duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True) + success: Mapped[bool | None] = mapped_column(Boolean, nullable=True) + error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + incident_id: Mapped[str | None] = mapped_column(String(64), nullable=True) + agent_role: Mapped[str | None] = mapped_column(String(40), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now) + + __table_args__ = ( + Index("idx_mcp_audit_session", "session_id"), + Index("idx_mcp_audit_incident", "incident_id"), + Index("idx_mcp_audit_node", "flywheel_node", "created_at"), + Index("idx_mcp_audit_server_tool", "mcp_server", "tool_name", "created_at"), + Index("idx_mcp_audit_agent_role", "agent_role", "created_at"), + ) + + +class MCPDailyStats(Base): + """Daily aggregate for MCP provider/tool success rate and latency.""" + + __tablename__ = "mcp_daily_stats" + + date: Mapped[datetime] = mapped_column(Date, primary_key=True) + mcp_server: Mapped[str] = mapped_column(String(80), primary_key=True) + tool_name: Mapped[str] = mapped_column(String(120), primary_key=True) + call_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) + success_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) + avg_duration_ms: Mapped[float | None] = mapped_column(Float, nullable=True) + + +class K8sStateSnapshot(Base): + """Pre/post Kubernetes resource state snapshots for remediation verification.""" + + __tablename__ = "k8s_state_snapshots" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + incident_id: Mapped[str | None] = mapped_column(String(64), nullable=True) + snapshot_type: Mapped[str] = mapped_column(String(40), nullable=False) + namespace: Mapped[str | None] = mapped_column(String(63), nullable=True) + resource_type: Mapped[str | None] = mapped_column(String(80), nullable=True) + resource_name: Mapped[str | None] = mapped_column(String(253), nullable=True) + state_json: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + captured_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now) + + __table_args__ = ( + Index("idx_k8s_snapshot_incident", "incident_id"), + Index("idx_k8s_snapshot_resource", "namespace", "resource_type", "resource_name"), + Index("idx_k8s_snapshot_captured", "captured_at"), + ) + + +class PrometheusSnapshot(Base): + """Prometheus query snapshots for detect/verify flywheel stages.""" + + __tablename__ = "prometheus_snapshots" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + incident_id: Mapped[str | None] = mapped_column(String(64), nullable=True) + query: Mapped[str] = mapped_column(Text, nullable=False) + result_json: Mapped[dict | list | str | None] = mapped_column(JSONB, nullable=True) + snapshot_type: Mapped[str | None] = mapped_column(String(40), nullable=True) + captured_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now) + + __table_args__ = ( + Index("idx_prom_snapshot_incident", "incident_id"), + Index("idx_prom_snapshot_type", "snapshot_type", "captured_at"), + ) + + # ============================================================================= # AutoRepairExecution - Phase 10 操作記錄 # 2026-04-08 Claude Code: 統帥指令「所有操作都必須被記錄,寫入資料庫」 diff --git a/apps/api/src/plugins/mcp/registry.py b/apps/api/src/plugins/mcp/registry.py index 43196e9a..4451b881 100644 --- a/apps/api/src/plugins/mcp/registry.py +++ b/apps/api/src/plugins/mcp/registry.py @@ -12,11 +12,65 @@ Provider 註冊中心,實現依賴注入 (DI) 模式: import structlog -from src.plugins.mcp.interfaces import MCPToolProvider +from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult logger = structlog.get_logger(__name__) +class AuditedMCPToolProvider(MCPToolProvider): + """Provider wrapper that writes every MCP tool call to the audit subsystem.""" + + def __init__(self, provider: MCPToolProvider) -> None: + self._provider = provider + + @property + def name(self) -> str: + return self._provider.name + + @property + def enabled(self) -> bool: + return self._provider.enabled + + async def list_tools(self) -> list[MCPTool]: + return await self._provider.list_tools() + + async def execute( + self, + tool_name: str, + parameters: dict, + ) -> MCPToolResult: + from src.services.mcp_audit_service import monotonic_ms, record_mcp_call + + audit_context = parameters.get("_mcp_audit") if isinstance(parameters, dict) else None + provider_parameters = { + key: value for key, value in parameters.items() + if key != "_mcp_audit" + } + started = monotonic_ms() + result: MCPToolResult | None = None + try: + result = await self._provider.execute(tool_name, provider_parameters) + return result + finally: + duration_ms = monotonic_ms() - started + await record_mcp_call( + mcp_server=self.name, + tool_name=tool_name, + input_params=parameters, + output_result=result.output if result else None, + duration_ms=duration_ms, + success=bool(result.success) if result else False, + error_message=result.error if result else "provider_exception", + session_id=audit_context.get("session_id") if isinstance(audit_context, dict) else None, + flywheel_node=audit_context.get("flywheel_node") if isinstance(audit_context, dict) else None, + incident_id=audit_context.get("incident_id") if isinstance(audit_context, dict) else None, + agent_role=audit_context.get("agent_role") if isinstance(audit_context, dict) else None, + ) + + async def health_check(self) -> bool: + return await self._provider.health_check() + + class ProviderRegistry: """ MCP Tool Provider 註冊中心 @@ -52,7 +106,7 @@ class ProviderRegistry: if provider.name in self._providers: raise ValueError(f"Provider '{provider.name}' already registered") - self._providers[provider.name] = provider + self._providers[provider.name] = AuditedMCPToolProvider(provider) logger.info( "provider_registered", name=provider.name, diff --git a/apps/api/src/services/ai_providers/agent_loop.py b/apps/api/src/services/ai_providers/agent_loop.py new file mode 100644 index 00000000..29b5fd09 --- /dev/null +++ b/apps/api/src/services/ai_providers/agent_loop.py @@ -0,0 +1,62 @@ +"""Shared Agent Loop helpers for AI providers with tool_use support.""" + +from __future__ import annotations + +import uuid +from typing import Any + +from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult +from src.services.ai_providers.permissions import filter_tools_for_agent +from src.services.ai_providers.tool_schema import tool_by_provider_name + + +class AgentToolExecutor: + """Execute provider-safe tool names against MCP providers with audit context.""" + + def __init__( + self, + *, + available_tools: list[MCPTool], + providers: dict[str, MCPToolProvider], + agent_role: str, + incident_id: str | None = None, + session_id: str | None = None, + flywheel_node: str | None = None, + ) -> None: + self.available_tools = filter_tools_for_agent(available_tools, agent_role) + self.providers = providers + self.agent_role = agent_role + self.incident_id = incident_id + self.session_id = session_id or str(uuid.uuid4()) + self.flywheel_node = flywheel_node + + async def execute(self, provider_tool_name: str, parameters: dict[str, Any]) -> MCPToolResult: + """Resolve and execute one tool call.""" + + tool = tool_by_provider_name(self.available_tools, provider_tool_name) + if tool is None: + return MCPToolResult( + success=False, + execution_id=str(uuid.uuid4()), + error=f"Tool not allowed or unknown: {provider_tool_name}", + ) + + provider = self.providers.get(tool.server_name) + if provider is None: + return MCPToolResult( + success=False, + execution_id=str(uuid.uuid4()), + error=f"Provider not available: {tool.server_name}", + ) + + audit_context = { + "agent_role": self.agent_role, + "session_id": self.session_id, + "incident_id": self.incident_id, + "flywheel_node": self.flywheel_node, + } + tool_parameters = dict(parameters) + tool_parameters["_mcp_audit"] = { + key: value for key, value in audit_context.items() if value + } + return await provider.execute(tool.name, tool_parameters) diff --git a/apps/api/src/services/ai_providers/claude.py b/apps/api/src/services/ai_providers/claude.py index b09e1546..8665e0c4 100644 --- a/apps/api/src/services/ai_providers/claude.py +++ b/apps/api/src/services/ai_providers/claude.py @@ -18,7 +18,9 @@ import httpx import structlog from src.core.config import get_settings +from src.plugins.mcp.interfaces import MCPTool from src.services.ai_providers.interfaces import AIResult, is_provider_enabled_by_env +from src.services.ai_providers.tool_schema import anthropic_tools_for_agent from src.services.model_registry import get_model_registry logger = structlog.get_logger(__name__) @@ -164,6 +166,143 @@ class ClaudeProvider: logger.warning("claude_provider_failed", error=str(e), latency_ms=round(latency, 1)) return AIResult(raw_response="", success=False, provider=self.name, latency_ms=latency, error=str(e)) + async def analyze_with_tools( + self, + prompt: str, + available_tools: list[MCPTool], + tool_executor, + max_iterations: int = 5, + agent_role: str = "openclaw", + context: dict | None = None, + ) -> AIResult: + """Run Claude in Agent Loop mode with MCP tool_use.""" + + if not available_tools: + return await self.analyze(prompt, context=context) + if not settings.CLAUDE_API_KEY: + return AIResult(raw_response="", success=False, provider=self.name, error="CLAUDE_API_KEY not configured") + + tools_schema = anthropic_tools_for_agent(available_tools, agent_role) + if not tools_schema: + return AIResult( + raw_response="", + success=False, + provider=self.name, + error=f"No MCP tools allowed for agent_role={agent_role}", + ) + + start = time.perf_counter() + total_tokens = 0 + total_cost = 0.0 + messages: list[dict] = [{"role": "user", "content": prompt}] + + try: + client = await self._get_client() + last_text = "" + + for iteration in range(max_iterations): + response = await client.post( + "https://api.anthropic.com/v1/messages", + headers={ + "x-api-key": settings.CLAUDE_API_KEY, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": get_model_registry().get_model("claude", "rca"), + "max_tokens": 2048, + "messages": messages, + "tools": tools_schema, + }, + timeout=30.0, + ) + response.raise_for_status() + data = response.json() + + usage = data.get("usage", {}) + input_tokens = usage.get("input_tokens", 0) + output_tokens = usage.get("output_tokens", 0) + total_tokens += input_tokens + output_tokens + total_cost += (input_tokens * 0.000001) + (output_tokens * 0.000005) + + content_blocks = data.get("content", []) + text_blocks = [ + block.get("text", "") + for block in content_blocks + if block.get("type") == "text" + ] + if text_blocks: + last_text = "\n".join(text_blocks) + + tool_uses = [ + block for block in content_blocks + if block.get("type") == "tool_use" + ] + if not tool_uses: + latency = (time.perf_counter() - start) * 1000 + logger.info( + "claude_agent_loop_success", + agent_role=agent_role, + iterations=iteration + 1, + tokens=total_tokens, + latency_ms=round(latency, 1), + ) + return AIResult( + raw_response=last_text or json.dumps(data, ensure_ascii=False), + success=True, + provider=f"{self.name}_agent_loop", + tokens=total_tokens, + cost_usd=total_cost, + latency_ms=latency, + ) + + tool_results = [] + for block in tool_uses: + tool_name = str(block.get("name", "")) + tool_input = block.get("input") or {} + result = await tool_executor(tool_name, tool_input) + tool_results.append({ + "type": "tool_result", + "tool_use_id": block.get("id"), + "content": json.dumps( + result.to_dict() if hasattr(result, "to_dict") else result, + ensure_ascii=False, + default=str, + ), + }) + + messages.append({"role": "assistant", "content": content_blocks}) + messages.append({"role": "user", "content": tool_results}) + + latency = (time.perf_counter() - start) * 1000 + return AIResult( + raw_response=last_text, + success=False, + provider=f"{self.name}_agent_loop", + tokens=total_tokens, + cost_usd=total_cost, + latency_ms=latency, + error=f"Agent loop exceeded max_iterations={max_iterations}", + ) + + except Exception as e: + latency = (time.perf_counter() - start) * 1000 + logger.warning( + "claude_agent_loop_failed", + agent_role=agent_role, + error=str(e), + latency_ms=round(latency, 1), + ) + return AIResult( + raw_response="", + success=False, + provider=f"{self.name}_agent_loop", + tokens=total_tokens, + cost_usd=total_cost, + latency_ms=latency, + error=str(e), + ) + async def health_check(self) -> bool: return bool(settings.CLAUDE_API_KEY) diff --git a/apps/api/src/services/ai_providers/interfaces.py b/apps/api/src/services/ai_providers/interfaces.py index ddd0acb0..deef3c51 100644 --- a/apps/api/src/services/ai_providers/interfaces.py +++ b/apps/api/src/services/ai_providers/interfaces.py @@ -15,7 +15,9 @@ from __future__ import annotations import os from dataclasses import dataclass, field -from typing import Any, Protocol, runtime_checkable +from typing import Any, Callable, Protocol, runtime_checkable + +from src.plugins.mcp.interfaces import MCPTool, MCPToolResult # ============================================================================= @@ -54,6 +56,26 @@ class AIResult: } +@dataclass +class ToolCallResult: + """One tool_use result returned to an Agent Loop capable model.""" + + tool_use_id: str + tool_name: str + output: Any + success: bool = True + error: str | None = None + + def to_dict(self) -> dict: + return { + "tool_use_id": self.tool_use_id, + "tool_name": self.tool_name, + "output": self.output, + "success": self.success, + "error": self.error, + } + + # ============================================================================= # AIProvider Protocol — 所有 LLM 引擎必須實作 # ============================================================================= @@ -125,6 +147,25 @@ class AIProvider(Protocol): """ ... + async def analyze_with_tools( + self, + prompt: str, + available_tools: list[MCPTool], + tool_executor: Callable[[str, dict[str, Any]], Any], + max_iterations: int = 5, + agent_role: str = "openclaw", + context: dict[str, Any] | None = None, + ) -> AIResult: + """ + Agent Loop mode: let the model request MCP tools, receive tool results, + and decide when it has enough evidence. + + Providers may implement native tool_use. Providers without native support + should fail closed or fall back to analyze(); routing code decides whether + fallback is acceptable for the current incident. + """ + ... + async def health_check(self) -> bool: """健康檢查 (供 /health endpoint 和 AIRouter 動態路由)""" ... diff --git a/apps/api/src/services/ai_providers/ollama.py b/apps/api/src/services/ai_providers/ollama.py index 49a7bdb6..66a8442e 100644 --- a/apps/api/src/services/ai_providers/ollama.py +++ b/apps/api/src/services/ai_providers/ollama.py @@ -11,13 +11,16 @@ Ollama Provider - Phase 24 ADR-052 from __future__ import annotations +import json import time import httpx import structlog from src.core.config import get_settings +from src.plugins.mcp.interfaces import MCPTool from src.services.ai_providers.interfaces import AIProvider, AIResult, is_provider_enabled_by_env +from src.services.ai_providers.tool_schema import openai_tools_for_agent from src.services.model_registry import get_model_registry logger = structlog.get_logger(__name__) @@ -42,6 +45,9 @@ class OllamaProvider: ) return self._http_client + def _endpoint_url(self) -> str: + return settings.OLLAMA_URL + @property def name(self) -> str: return "ollama" @@ -121,6 +127,129 @@ class OllamaProvider: logger.warning("ollama_provider_failed", error=str(e), latency_ms=round(latency, 1)) return AIResult(raw_response="", success=False, provider=self.name, latency_ms=latency, error=str(e)) + async def analyze_with_tools( + self, + prompt: str, + available_tools: list[MCPTool], + tool_executor, + max_iterations: int = 5, + agent_role: str = "openclaw", + context: dict | None = None, + ) -> AIResult: + """Run Ollama chat tool calling loop when the local model supports tools.""" + + if not available_tools: + return await self.analyze(prompt, context=context) + + tools_schema = openai_tools_for_agent(available_tools, agent_role) + if not tools_schema: + return AIResult( + raw_response="", + success=False, + provider=self.name, + error=f"No MCP tools allowed for agent_role={agent_role}", + ) + + start = time.perf_counter() + total_tokens = 0 + messages: list[dict] = [{"role": "user", "content": prompt}] + registry = get_model_registry() + model_name = registry.get_model("ollama", "rca") + options = registry.get_provider_options("ollama") + task_type = (context or {}).get("task_type", "") + if task_type in ("diagnose", "force_local"): + read_timeout = float(getattr(settings, "OLLAMA_DIAGNOSE_TIMEOUT_SECONDS", 200)) + else: + read_timeout = float(settings.OPENCLAW_TIMEOUT) + + try: + client = await self._get_client() + last_content = "" + for iteration in range(max_iterations): + response = await client.post( + f"{self._endpoint_url()}/api/chat", + json={ + "model": model_name, + "messages": messages, + "stream": False, + "tools": tools_schema, + "options": { + "num_predict": options.get("num_predict", 1024), + "temperature": options.get("temperature", 0.1), + "top_p": options.get("top_p", 0.9), + }, + }, + timeout=httpx.Timeout(read_timeout, connect=10.0), + ) + response.raise_for_status() + data = response.json() + total_tokens += int(data.get("eval_count", 0) or 0) + total_tokens += int(data.get("prompt_eval_count", 0) or 0) + + message = data.get("message") or {} + last_content = message.get("content") or last_content + tool_calls = message.get("tool_calls") or [] + if not tool_calls: + latency = (time.perf_counter() - start) * 1000 + return AIResult( + raw_response=last_content or json.dumps(data, ensure_ascii=False), + success=True, + provider=f"{self.name}_agent_loop", + tokens=total_tokens, + latency_ms=latency, + ) + + messages.append(message) + for call in tool_calls: + function = call.get("function") or {} + tool_name = function.get("name", "") + arguments = function.get("arguments") or {} + result = await tool_executor(tool_name, arguments) + messages.append({ + "role": "tool", + "content": json.dumps( + result.to_dict() if hasattr(result, "to_dict") else result, + ensure_ascii=False, + default=str, + ), + }) + + logger.debug( + "ollama_agent_loop_iteration", + provider=self.name, + agent_role=agent_role, + iteration=iteration + 1, + tool_calls=len(tool_calls), + ) + + latency = (time.perf_counter() - start) * 1000 + return AIResult( + raw_response=last_content, + success=False, + provider=f"{self.name}_agent_loop", + tokens=total_tokens, + latency_ms=latency, + error=f"Agent loop exceeded max_iterations={max_iterations}", + ) + + except Exception as e: + latency = (time.perf_counter() - start) * 1000 + logger.warning( + "ollama_agent_loop_failed", + provider=self.name, + agent_role=agent_role, + error=str(e), + latency_ms=round(latency, 1), + ) + return AIResult( + raw_response="", + success=False, + provider=f"{self.name}_agent_loop", + tokens=total_tokens, + latency_ms=latency, + error=str(e), + ) + async def health_check(self) -> bool: try: client = await self._get_client() @@ -164,6 +293,9 @@ class Ollama188Provider(OllamaProvider): # OLLAMA_FALLBACK_URL 空字串 → 未設定 188 節點 → 停用 return bool(getattr(settings, "OLLAMA_FALLBACK_URL", "")) + def _endpoint_url(self) -> str: + return getattr(settings, "OLLAMA_FALLBACK_URL", "") + async def analyze( self, prompt: str, diff --git a/apps/api/src/services/ai_providers/permissions.py b/apps/api/src/services/ai_providers/permissions.py new file mode 100644 index 00000000..cd24fea1 --- /dev/null +++ b/apps/api/src/services/ai_providers/permissions.py @@ -0,0 +1,91 @@ +"""Agent-role MCP tool permissions for Agent Loop mode.""" + +from __future__ import annotations + +from enum import StrEnum + +from src.plugins.mcp.interfaces import MCPTool + + +class AgentRole(StrEnum): + OPENCLAW = "openclaw" + NEMOTRON = "nemotron" + HERMES = "hermes" + ELEPHANT_ALPHA = "elephant_alpha" + + +_WRITE_TOOL_MARKERS = ( + "apply", + "create", + "delete", + "patch", + "restart", + "rollout", + "scale", + "undo", + "write", + "commit", + "notify", + "send", +) + +_K8S_READ_TOOL_MARKERS = ( + "describe", + "events", + "get", + "hpa", + "logs", + "node", + "status", + "watch", +) + + +def is_read_only_tool(tool: MCPTool) -> bool: + """Return whether a tool is considered side-effect free.""" + + name = tool.name.lower() + if any(marker in name for marker in _WRITE_TOOL_MARKERS): + return False + return True + + +def is_tool_allowed(tool: MCPTool, agent_role: str) -> bool: + """Check whether an agent role may receive a specific MCP tool.""" + + role = _normalize_role(agent_role) + server = tool.server_name.lower() + name = tool.name.lower() + + if role == AgentRole.OPENCLAW: + return True + + if role == AgentRole.NEMOTRON: + if server in {"prometheus", "grafana", "signoz"}: + return True + if server == "kubernetes": + return is_read_only_tool(tool) and any(marker in name for marker in _K8S_READ_TOOL_MARKERS) + return False + + if role == AgentRole.HERMES: + if server in {"database", "prometheus", "filesystem", "rag"}: + return is_read_only_tool(tool) or server == "database" + return False + + if role == AgentRole.ELEPHANT_ALPHA: + return is_read_only_tool(tool) + + return False + + +def filter_tools_for_agent(tools: list[MCPTool], agent_role: str) -> list[MCPTool]: + """Return only the tools allowed for an agent role.""" + + return [tool for tool in tools if is_tool_allowed(tool, agent_role)] + + +def _normalize_role(agent_role: str) -> AgentRole: + try: + return AgentRole(agent_role) + except ValueError: + return AgentRole.ELEPHANT_ALPHA diff --git a/apps/api/src/services/ai_providers/tool_schema.py b/apps/api/src/services/ai_providers/tool_schema.py new file mode 100644 index 00000000..0dfcbe6d --- /dev/null +++ b/apps/api/src/services/ai_providers/tool_schema.py @@ -0,0 +1,61 @@ +"""Convert MCP tools into LLM provider tool schemas.""" + +from __future__ import annotations + +import re + +from src.plugins.mcp.interfaces import MCPTool +from src.services.ai_providers.permissions import filter_tools_for_agent + +_SAFE_TOOL_NAME = re.compile(r"[^a-zA-Z0-9_-]+") + + +def to_provider_tool_name(tool: MCPTool) -> str: + """Return a provider-safe unique tool name.""" + + raw_name = f"{tool.server_name}__{tool.name}" + return _SAFE_TOOL_NAME.sub("_", raw_name)[:96] + + +def anthropic_tool_schema(tool: MCPTool) -> dict: + """Convert one MCPTool to Anthropic Messages API tool schema.""" + + return { + "name": to_provider_tool_name(tool), + "description": f"[{tool.server_name}] {tool.description}", + "input_schema": tool.input_schema or {"type": "object", "properties": {}}, + } + + +def openai_tool_schema(tool: MCPTool) -> dict: + """Convert one MCPTool to OpenAI/Ollama-compatible tool schema.""" + + return { + "type": "function", + "function": { + "name": to_provider_tool_name(tool), + "description": f"[{tool.server_name}] {tool.description}", + "parameters": tool.input_schema or {"type": "object", "properties": {}}, + }, + } + + +def anthropic_tools_for_agent(tools: list[MCPTool], agent_role: str) -> list[dict]: + """Return Anthropic tool schemas filtered by agent role.""" + + return [anthropic_tool_schema(tool) for tool in filter_tools_for_agent(tools, agent_role)] + + +def openai_tools_for_agent(tools: list[MCPTool], agent_role: str) -> list[dict]: + """Return OpenAI/Ollama tool schemas filtered by agent role.""" + + return [openai_tool_schema(tool) for tool in filter_tools_for_agent(tools, agent_role)] + + +def tool_by_provider_name(tools: list[MCPTool], provider_tool_name: str) -> MCPTool | None: + """Resolve provider-safe tool name back to the original MCPTool.""" + + for tool in tools: + if to_provider_tool_name(tool) == provider_tool_name: + return tool + return None diff --git a/apps/api/src/services/mcp_audit_service.py b/apps/api/src/services/mcp_audit_service.py new file mode 100644 index 00000000..bee78f06 --- /dev/null +++ b/apps/api/src/services/mcp_audit_service.py @@ -0,0 +1,175 @@ +"""MCP audit and daily usage statistics. + +Every MCP provider call should leave a durable trace that can be joined back to +incidents, flywheel nodes, and provider health. This service intentionally uses +raw SQL because the audit schema is additive and may be created by migration +before every runtime model has been refreshed. +""" + +from __future__ import annotations + +import json +import time +import uuid +from typing import Any + +import structlog +from sqlalchemy import text + +from src.db.base import get_db_context + +logger = structlog.get_logger(__name__) + +_REDACT_KEYS = {"token", "password", "secret", "api_key", "authorization", "key"} + + +def infer_flywheel_node(mcp_server: str, tool_name: str) -> str: + """Infer the flywheel node for a provider/tool pair.""" + + name = f"{mcp_server}:{tool_name}".lower() + if any(k in name for k in ("prometheus", "alert", "metric", "query_range")): + return "detect" + if any(k in name for k in ("logs", "describe", "events", "node", "hpa", "ssh_get")): + return "sense" + if any(k in name for k in ("database", "rag", "knowledge", "history")): + return "reason" + if any(k in name for k in ("blast", "risk", "approval")): + return "decide" + if any(k in name for k in ("restart", "delete", "scale", "rollout", "ssh_restart")): + return "execute" + if any(k in name for k in ("watch", "status", "health", "grafana")): + return "verify" + if any(k in name for k in ("playbook", "km", "embedding")): + return "learn" + return "govern" + + +def _redact(value: Any) -> Any: + if isinstance(value, dict): + redacted = {} + for key, item in value.items(): + if any(marker in str(key).lower() for marker in _REDACT_KEYS): + redacted[key] = "" + else: + redacted[key] = _redact(item) + return redacted + if isinstance(value, list): + return [_redact(item) for item in value] + return value + + +def _json_dumps(value: Any) -> str: + return json.dumps(_redact(value), ensure_ascii=False, default=str) + + +def _extract_incident_id(parameters: dict[str, Any]) -> str | None: + audit_context = parameters.get("_mcp_audit") + if isinstance(audit_context, dict) and audit_context.get("incident_id"): + return str(audit_context["incident_id"]) + for key in ("incident_id", "incidentId"): + value = parameters.get(key) + if value: + return str(value) + return None + + +def _extract_audit_context(parameters: dict[str, Any]) -> dict[str, Any]: + audit_context = parameters.get("_mcp_audit") + return audit_context if isinstance(audit_context, dict) else {} + + +async def record_mcp_call( + *, + mcp_server: str, + tool_name: str, + input_params: dict[str, Any], + output_result: Any | None, + duration_ms: int, + success: bool, + error_message: str | None, + session_id: str | None = None, + flywheel_node: str | None = None, + incident_id: str | None = None, + agent_role: str | None = None, +) -> None: + """Persist one MCP tool call and update daily aggregate stats.""" + + audit_context = _extract_audit_context(input_params) + session_id = session_id or audit_context.get("session_id") or str(uuid.uuid4()) + flywheel_node = flywheel_node or infer_flywheel_node(mcp_server, tool_name) + incident_id = incident_id or _extract_incident_id(input_params) + agent_role = agent_role or audit_context.get("agent_role") + + try: + async with get_db_context() as db: + await db.execute( + text( + """ + INSERT INTO mcp_audit_log ( + session_id, flywheel_node, mcp_server, tool_name, + input_params, output_result, duration_ms, success, + error_message, incident_id, agent_role + ) + VALUES ( + :session_id, :flywheel_node, :mcp_server, :tool_name, + CAST(:input_params AS jsonb), CAST(:output_result AS jsonb), + :duration_ms, :success, :error_message, :incident_id, + :agent_role + ) + """ + ), + { + "session_id": session_id, + "flywheel_node": flywheel_node, + "mcp_server": mcp_server, + "tool_name": tool_name, + "input_params": _json_dumps(input_params), + "output_result": _json_dumps(output_result), + "duration_ms": duration_ms, + "success": success, + "error_message": error_message, + "incident_id": incident_id, + "agent_role": agent_role, + }, + ) + await db.execute( + text( + """ + INSERT INTO mcp_daily_stats ( + date, mcp_server, tool_name, call_count, success_count, + avg_duration_ms + ) + VALUES ( + CURRENT_DATE, :mcp_server, :tool_name, 1, + CASE WHEN :success THEN 1 ELSE 0 END, + :duration_ms + ) + ON CONFLICT (date, mcp_server, tool_name) + DO UPDATE SET + call_count = mcp_daily_stats.call_count + 1, + success_count = mcp_daily_stats.success_count + + CASE WHEN EXCLUDED.success_count > 0 THEN 1 ELSE 0 END, + avg_duration_ms = ( + (mcp_daily_stats.avg_duration_ms * mcp_daily_stats.call_count) + + EXCLUDED.avg_duration_ms + ) / (mcp_daily_stats.call_count + 1) + """ + ), + { + "mcp_server": mcp_server, + "tool_name": tool_name, + "success": success, + "duration_ms": duration_ms, + }, + ) + except Exception as exc: + logger.warning( + "mcp_audit_write_failed", + mcp_server=mcp_server, + tool_name=tool_name, + error=str(exc), + ) + + +def monotonic_ms() -> int: + return int(time.monotonic() * 1000) diff --git a/apps/api/src/services/mcp_tool_registry.py b/apps/api/src/services/mcp_tool_registry.py index bc3e323b..5d8e0b34 100644 --- a/apps/api/src/services/mcp_tool_registry.py +++ b/apps/api/src/services/mcp_tool_registry.py @@ -28,6 +28,7 @@ from typing import Any import structlog from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider +from src.plugins.mcp.registry import AuditedMCPToolProvider logger = structlog.get_logger(__name__) @@ -110,7 +111,8 @@ class MCPToolRegistry: count = 0 for tool in tools: - reg = _classify_tool(tool, provider) + audited_provider = AuditedMCPToolProvider(provider) + reg = _classify_tool(tool, audited_provider) self._tools.append(reg) count += 1 @@ -342,28 +344,25 @@ def _build_providers() -> list[MCPToolProvider]: 安全原則:各 Provider 的 enabled 屬性由環境變數控制, 不可用的 Provider 在 register_provider() 中會被靜默跳過。 """ - from src.plugins.mcp.providers.k8s_provider import K8sProvider - from src.plugins.mcp.providers.prometheus_provider import PrometheusProvider - from src.plugins.mcp.providers.ssh_provider import SSHProvider - providers: list[MCPToolProvider] = [] + provider_specs = [ + ("k8s", "src.plugins.mcp.providers.k8s_provider", "K8sProvider"), + ("ssh", "src.plugins.mcp.providers.ssh_provider", "SSHProvider"), + ("prometheus", "src.plugins.mcp.providers.prometheus_provider", "PrometheusProvider"), + ("signoz", "src.plugins.mcp.providers.signoz_provider", "SignOzProvider"), + ("database", "src.plugins.mcp.providers.database_provider", "DatabaseProvider"), + ("filesystem", "src.plugins.mcp.providers.filesystem_provider", "FilesystemProvider"), + ("grafana", "src.plugins.mcp.providers.grafana_provider", "GrafanaProvider"), + ("rag", "src.plugins.mcp.providers.rag_provider", "RAGProvider"), + ("argocd", "src.plugins.mcp.providers.argocd_provider", "ArgoCDProvider"), + ("sentry", "src.plugins.mcp.providers.sentry_provider", "SentryProvider"), + ] - # K8s Provider (D1: Pod 狀態/事件/日誌) - try: - providers.append(K8sProvider()) - except Exception: - logger.warning("mcp_registry_k8s_provider_init_failed") - - # SSH Provider (D1/D2/D3: 主機層感官) - try: - providers.append(SSHProvider()) - except Exception: - logger.warning("mcp_registry_ssh_provider_init_failed") - - # Prometheus Provider (D3: 時序指標) - try: - providers.append(PrometheusProvider()) - except Exception: - logger.warning("mcp_registry_prometheus_provider_init_failed") + for provider_name, module_name, class_name in provider_specs: + try: + module = __import__(module_name, fromlist=[class_name]) + providers.append(getattr(module, class_name)()) + except Exception: + logger.warning("mcp_registry_provider_init_failed", provider=provider_name) return providers diff --git a/apps/api/tests/test_agent_loop_foundation.py b/apps/api/tests/test_agent_loop_foundation.py new file mode 100644 index 00000000..a5b94396 --- /dev/null +++ b/apps/api/tests/test_agent_loop_foundation.py @@ -0,0 +1,119 @@ +import pytest + +from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult +from src.plugins.mcp.registry import AuditedMCPToolProvider +from src.services.ai_providers.agent_loop import AgentToolExecutor +from src.services.ai_providers.permissions import filter_tools_for_agent, is_tool_allowed +from src.services.ai_providers.tool_schema import ( + anthropic_tool_schema, + openai_tool_schema, + to_provider_tool_name, + tool_by_provider_name, +) + + +class FakeProvider(MCPToolProvider): + def __init__(self, name="kubernetes"): + self.calls = [] + self._name = name + + @property + def name(self): + return self._name + + async def list_tools(self): + return [] + + async def execute(self, tool_name, parameters): + self.calls.append((tool_name, parameters)) + return MCPToolResult(success=True, execution_id="exec-1", output={"ok": True}) + + +def _tool(server: str, name: str) -> MCPTool: + return MCPTool( + name=name, + description=f"{server}.{name}", + input_schema={"type": "object", "properties": {}}, + server_name=server, + ) + + +def test_agent_tool_permissions_are_role_scoped(): + k8s_get = _tool("kubernetes", "kubectl_get") + k8s_restart = _tool("kubernetes", "kubectl_restart") + prom_query = _tool("prometheus", "prometheus_query") + db_tool = _tool("database", "write_km_entry") + + assert is_tool_allowed(k8s_restart, "openclaw") is True + assert is_tool_allowed(k8s_get, "nemotron") is True + assert is_tool_allowed(k8s_restart, "nemotron") is False + assert is_tool_allowed(prom_query, "hermes") is True + assert is_tool_allowed(db_tool, "hermes") is True + assert is_tool_allowed(k8s_restart, "elephant_alpha") is False + + filtered = filter_tools_for_agent([k8s_get, k8s_restart, prom_query], "nemotron") + assert [tool.name for tool in filtered] == ["kubectl_get", "prometheus_query"] + + +def test_tool_schema_round_trips_provider_safe_names(): + tool = _tool("kubernetes", "kubectl_get") + + safe_name = to_provider_tool_name(tool) + + assert safe_name == "kubernetes__kubectl_get" + assert anthropic_tool_schema(tool)["name"] == safe_name + assert openai_tool_schema(tool)["function"]["name"] == safe_name + assert tool_by_provider_name([tool], safe_name) is tool + + +@pytest.mark.asyncio +async def test_audited_provider_strips_internal_audit_context(monkeypatch): + audit_calls = [] + + async def fake_record_mcp_call(**kwargs): + audit_calls.append(kwargs) + + monkeypatch.setattr( + "src.services.mcp_audit_service.record_mcp_call", + fake_record_mcp_call, + ) + + provider = FakeProvider() + audited = AuditedMCPToolProvider(provider) + + result = await audited.execute( + "kubectl_get", + { + "resource": "pods", + "_mcp_audit": { + "agent_role": "openclaw", + "session_id": "session-1", + "incident_id": "INC-1", + "flywheel_node": "sense", + }, + }, + ) + + assert result.success is True + assert provider.calls == [("kubectl_get", {"resource": "pods"})] + assert audit_calls[0]["agent_role"] == "openclaw" + assert audit_calls[0]["session_id"] == "session-1" + assert audit_calls[0]["incident_id"] == "INC-1" + + +@pytest.mark.asyncio +async def test_agent_tool_executor_blocks_disallowed_tool(): + restart_tool = _tool("kubernetes", "kubectl_restart") + provider = FakeProvider() + executor = AgentToolExecutor( + available_tools=[restart_tool], + providers={"kubernetes": provider}, + agent_role="nemotron", + incident_id="INC-1", + ) + + result = await executor.execute("kubernetes__kubectl_restart", {"deployment": "api"}) + + assert result.success is False + assert "not allowed" in (result.error or "") + assert provider.calls == [] diff --git a/docs/LOGBOOK.md b/docs/LOGBOOK.md index e155d206..00a93cc1 100644 --- a/docs/LOGBOOK.md +++ b/docs/LOGBOOK.md @@ -21,6 +21,23 @@ Claude Code 成本評估指出真正瓶頸不是外部 AI 費用,而是同一 - `python3 -m py_compile apps/api/src/api/v1/webhooks.py apps/api/src/services/openclaw.py` 通過。 - `cd apps/api && pytest tests/test_alertmanager_rule_bypass.py tests/test_openclaw_cache_key.py tests/test_callback_dispatcher.py tests/test_telegram_button_consistency.py -q` → 60 passed。 +## 2026-05-01 | MCP Agent Loop 地基 — audit + 權限矩陣 + +承接 Claude Code MCP/Agent Loop 盤點。Repo 實際已有 K8s/SSH/Prometheus/Database/Grafana/RAG/Filesystem/ArgoCD/Sentry MCP provider 與 PreDecisionInvestigator;缺口是統一 audit、Agent tool_use loop、角色權限邊界,而不是從零安裝 Kubernetes MCP。 + +### 完成 +- 新增 ADR-105,定義 OpenClaw/NemoTron/Hermes/ElephantAlpha 的 MCP 工具權限與 Agent Loop 漸進接線策略。 +- 新增 `mcp_audit_log`、`mcp_daily_stats`、`k8s_state_snapshots`、`prometheus_snapshots` additive migration;`incident_id` 採 `VARCHAR(64)` 對齊現有 `INC-*` ID。 +- MCP provider registry 與 PreDecisionInvestigator tool registry 皆包上 audited provider,MCP tool call 會寫 server/tool/latency/success/error/incident/session/agent_role。 +- 新增 `AIProvider.analyze_with_tools()` 介面、`ToolCallResult`、四 Agent 權限矩陣、Anthropic/OpenAI/Ollama tool schema 轉換、`AgentToolExecutor`。 +- ClaudeProvider 實作 Anthropic tool_use loop;OllamaProvider 實作 `/api/chat` tools loop。現階段先作 capability foundation,DecisionManager 主路徑仍維持 pre-gathering fallback。 +- `MCPToolRegistry._build_providers()` 從 K8s/SSH/Prometheus 擴為現有 10 個 provider,使 PDI 能真正看見 DB/RAG/Grafana/Filesystem/SignOz/ArgoCD/Sentry。 +- 內部 MCP RAG 評估:不另建獨立 DB,沿用 PostgreSQL + pgvector + Redis hot cache;只有量級/隔離需求逼近瓶頸時再拆專用 vector DB。 + +### 驗證 +- `python3 -m py_compile` 針對 MCP registry/audit、AI provider interface、Agent Loop、Claude/Ollama provider、DB models 通過。 +- `cd apps/api && pytest tests/test_agent_loop_foundation.py tests/test_mcp_tool_registry.py tests/test_callback_dispatcher.py tests/test_openclaw_cache_key.py -q` → 64 passed。 + ## 2026-05-01 | HostBackupFailed rule-first e2e 補洞 Live e2e 用 `HostBackupFailed` 打 Alertmanager 後發現 aged backup 告警會被分類成 `backup_failure`,未命中原本只允許 `host_resource` 的 rule-first gate,導致又進 OpenClaw LLM。 diff --git a/docs/adr/ADR-105-mcp-agent-loop-governance.md b/docs/adr/ADR-105-mcp-agent-loop-governance.md new file mode 100644 index 00000000..04763fe7 --- /dev/null +++ b/docs/adr/ADR-105-mcp-agent-loop-governance.md @@ -0,0 +1,72 @@ +# ADR-105: MCP Agent Loop Governance + +**狀態**: Accepted +**日期**: 2026-05-01 +**範圍**: MCP audit, Agent Loop, OpenClaw/NemoTron/Hermes/ElephantAlpha tool permissions + +## 背景 + +AWOOOI 已有 MCP provider registry、K8s/SSH/Prometheus/Database/Grafana/RAG/Filesystem 等 provider、PreDecisionInvestigator、EvidenceSnapshot、PostExecutionVerifier。缺口不是「從零安裝 MCP」,而是: + +- MCP 呼叫沒有統一 audit trail 與每日統計。 +- PreDecisionInvestigator 是 Python pre-gathering 模式,LLM 看到結果但看不到 tool_use 過程。 +- 不同 AI Agent 沒有可驗證的工具權限邊界。 +- LLM retry storm 修掉後,需要讓真正的新 incident 能用 Agent Loop 深查,而不是回到靜態 prompt。 + +## 決策 + +### D1 — 保留 pre-gathering,新增 Agent Loop + +PreDecisionInvestigator 仍是穩定 fallback。Agent Loop 作為 provider capability 漸進接入: + +```text +Alert -> PreDecisionInvestigator -> EvidenceSnapshot -> LLM text decision +Alert -> Agent Loop -> LLM tool_use -> MCP tool_result -> Final Decision +``` + +任何 Agent Loop 超過 `max_iterations`、provider 不支援 tool_use、或工具權限不足時,必須降級回既有 pre-gathering / rule-first / Playbook RAG 路徑。 + +### D2 — 四 Agent 工具權限矩陣 + +| Agent | 允許 | 禁止 | +|-------|------|------| +| OpenClaw | 所有已註冊 MCP tool,由既有 safety gates 控制執行 | 無額外禁用 | +| NemoTron | Prometheus/Grafana/SignOz,K8s read-only | K8s restart/delete/scale/apply/patch | +| Hermes | Database、RAG、Filesystem、Prometheus | K8s/SSH 執行類操作 | +| ElephantAlpha | 全部 read-only | 任何寫入/通知/執行操作 | + +權限定義在 `apps/api/src/services/ai_providers/permissions.py`,provider schema 轉換前必須先過濾。 + +### D3 — 統一 MCP audit + +所有 registry provider 與 PDI tool registry provider 都包成 audited provider,寫入: + +- `mcp_audit_log` +- `mcp_daily_stats` + +`incident_id` 使用 `VARCHAR(64)`,因為 AWOOOI incident id 是 `INC-*` 字串,不是 UUID。Audit 必須包含 `agent_role`、`session_id`、`flywheel_node`、provider/tool、input/output、latency、success/error。 + +### D4 — MCP RAG 不獨立建 DB + +內部 MCP RAG 沿用現有 PostgreSQL + pgvector + Redis hot cache: + +- `knowledge_entries` +- `rag_chunks` +- `playbook_embeddings` +- existing KMWriter / KnowledgeRAG / PlaybookRAG + +不另建獨立資料庫。原因:一致性、備份、incident/playbook join、權限治理都在同一 Postgres 邊界內已經存在。只有當向量資料量、查詢延遲、或多租戶隔離需要獨立擴展時,才評估外移專用 vector DB。 + +## 落地順序 + +1. P0: audit schema、agent_role、權限矩陣、tool schema、AgentToolExecutor。 +2. P1: Claude/Ollama provider 接 `analyze_with_tools()`,但 DecisionManager 先 feature flag,不直接切主路徑。 +3. P2: OpenClaw/NemoTron/Hermes/ElephantAlpha 逐一接線,先 read-only / diagnose,再開執行類工具。 +4. P3: Langfuse spans、Grafana MCP dashboard、audit replay。 + +## 驗收 + +- `mcp_audit_log` 24h call count > 0。 +- 每筆 Agent Loop tool_use 有 `agent_role`。 +- NemoTron / ElephantAlpha 無法取得 K8s 寫入工具。 +- LLM ghost-loop 不因 Agent Loop 重啟:仍受 Alertmanager in-flight lock、stable cache、provider circuit breaker、max_iterations 控制。