Files
awoooi/apps/api/src/plugins/mcp/mcp_bridge.py
Your Name 14bf86a462 fix(awooop): Phase 2 初批 P0 修正 + Phase 1 Task 1.7 integration tests
## P0 安全 / 架構修正

### P0-08 telemetry.py — 移除硬碼 IP assert(ADR-121)
- config.py:新增 OTEL_ALLOWED_ENDPOINTS(預設 192.168.0.188)+ OTEL_FORBIDDEN_ENDPOINTS
- telemetry.py:_validate_endpoint() 改為 config-driven allowlist/forbidlist
- EwoooC 可用 env 覆寫 OTEL_ALLOWED_ENDPOINTS 指向自己的 SigNoz host

### P0-13 mcp_bridge.py — K8s namespace 由 settings 提供
- config.py:新增 AWOOOI_K8S_NAMESPACE(預設 "awoooi-prod")
- mcp_bridge.py:5 處 parameters.get("namespace", "awoooi-prod") → settings.AWOOOI_K8S_NAMESPACE
- EwoooC/Tsenyang 可設自己的 namespace

### P1-24 decision_manager.py — silence key 常數統一
- 新增 from src.services.telegram_gateway import SILENCE_KEY_PREFIX
- f"telegram_silence:{target}" → f"{SILENCE_KEY_PREFIX}{target}"
- 消除跨兩處重複定義(ADR-118 No Island Coding 原則)

## Phase 1 Task 1.7 Integration Tests
- tests/integration/test_awooop_phase1_schema.py:31 個測試案例
  - awooop_projects CHECK 約束(4 cases)
  - revision 不可變性 trigger(5 cases:draft 可改、published 鎖住、身份欄不可改、非法流轉、DELETE 禁止)
  - awooop_published_revisions VIEW draft/published 隔離(2 cases)
  - active_pointer_guard(3 cases:不可指向 draft、可指向 active、跨租戶 mismatch)
  - RLS fail-closed(3 cases:未設/錯設/正確設 project_id)
  - outbox FK + dedup(2 cases)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 13:46:19 +08:00

937 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCP Bridge - AI 與外部工具橋樑
Phase 3: 企業功能 - ADR-001 MCP 協議採用
核心功能:
1. list_tools(server_name) - 動態獲取 MCP Server 工具清單
2. call_tool(server_name, tool_name, parameters) - 執行工具
資安機制:
- Rehydration: 執行前將 [IP_1] 還原為真實值
- 符合 leWOOOgo ActionExecutor 介面
MCP Protocol Spec: https://modelcontextprotocol.io/
"""
import logging
import re
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
import httpx
from src.core.config import settings # P0-13: K8s namespace 由 settings.AWOOOI_K8S_NAMESPACE 提供
from src.utils.timezone import now_taipei
logger = logging.getLogger(__name__)
# ==================== Types ====================
class MCPTransport(str, Enum):
"""MCP 傳輸方式"""
STDIO = "stdio" # 標準輸入輸出 (本地程式)
HTTP = "http" # HTTP/SSE (遠端服務)
WEBSOCKET = "ws" # WebSocket (即時雙向)
@dataclass
class MCPTool:
"""MCP 工具定義"""
name: str
description: str
input_schema: dict[str, Any]
server_name: str
@dataclass
class MCPToolResult:
"""工具執行結果 (符合 ActionResult 介面)"""
success: bool
execution_id: str
output: Any | None = None
error: str | None = None
duration: float = 0.0
timestamp: datetime = field(default_factory=now_taipei)
def to_dict(self) -> dict:
return {
"success": self.success,
"executionId": self.execution_id,
"output": self.output,
"error": self.error,
"duration": self.duration,
"timestamp": self.timestamp.isoformat(),
}
@dataclass
class MCPServer:
"""MCP Server 配置"""
name: str
transport: MCPTransport
endpoint: str # 執行檔路徑 (stdio) 或 URL (http/ws)
args: list[str] = field(default_factory=list)
env: dict[str, str] = field(default_factory=dict)
enabled: bool = True
# ==================== Rehydration Engine ====================
class RehydrationEngine:
"""
資安標籤還原器
將 Privacy Shield 產生的 [IP_1], [EMAIL_1], [SECRET_1] 等標籤
還原為真實值,以便 MCP Tool 執行
"""
# 標籤格式: [TYPE_N]
LABEL_PATTERN = re.compile(r'\[(IP|EMAIL|SECRET|CC|PHONE|ID)_(\d+)\]')
def unredact(
self,
data: Any,
mapping: dict[str, str],
) -> Any:
"""
還原脫敏資料
Args:
data: 可能包含脫敏標籤的資料 (str, dict, list)
mapping: 原始值 → 標籤 的映射表 (來自 Privacy Shield)
Returns:
還原後的資料
"""
# 反轉映射: 標籤 → 原始值
reverse_mapping = {v: k for k, v in mapping.items()}
return self._recursive_unredact(data, reverse_mapping)
def _recursive_unredact(
self,
data: Any,
reverse_mapping: dict[str, str],
) -> Any:
"""遞迴還原各種資料結構"""
if isinstance(data, str):
return self._unredact_string(data, reverse_mapping)
elif isinstance(data, dict):
return {
k: self._recursive_unredact(v, reverse_mapping)
for k, v in data.items()
}
elif isinstance(data, list):
return [
self._recursive_unredact(item, reverse_mapping)
for item in data
]
else:
return data
def _unredact_string(
self,
text: str,
reverse_mapping: dict[str, str],
) -> str:
"""
還原字串中的標籤
⚠️ 重要: 按標籤長度從長到短排序替換
避免 [IP_1] 被先替換而污染 [IP_10] → 結果變成 "192.168.1.1000"
"""
result = text
# 按標籤長度降序排序,確保 [IP_10] 先於 [IP_1] 處理
sorted_labels = sorted(
reverse_mapping.items(),
key=lambda x: len(x[0]),
reverse=True,
)
for label, original in sorted_labels:
# 使用精準邊界匹配,避免部分替換
result = result.replace(label, original)
return result
def validate_no_labels(self, data: Any) -> tuple[bool, list[str]]:
"""
驗證資料中是否還有未還原的標籤
Returns:
(is_clean, remaining_labels)
"""
remaining = []
self._find_labels(data, remaining)
return len(remaining) == 0, remaining
def _find_labels(self, data: Any, found: list[str]) -> None:
"""遞迴搜尋標籤"""
if isinstance(data, str):
matches = self.LABEL_PATTERN.findall(data)
for match in matches:
label = f"[{match[0]}_{match[1]}]"
if label not in found:
found.append(label)
elif isinstance(data, dict):
for v in data.values():
self._find_labels(v, found)
elif isinstance(data, list):
for item in data:
self._find_labels(item, found)
# ==================== MCP Bridge ====================
class MCPBridge:
"""
MCP 協議橋樑
連接 AI 與外部 MCP Server實現動態工具調用
符合 leWOOOgo ActionExecutor 介面設計
"""
def __init__(self):
self.rehydrator = RehydrationEngine()
self._servers: dict[str, MCPServer] = {}
self._tool_cache: dict[str, list[MCPTool]] = {}
self._http_client = httpx.AsyncClient(timeout=30.0)
# 註冊 MCP Servers (Phase 13.2: 整合真實服務)
self._register_servers()
def _register_servers(self) -> None:
"""註冊 MCP Servers"""
self._servers["kubernetes"] = MCPServer(
name="kubernetes",
transport=MCPTransport.HTTP,
endpoint="http://localhost:8081/mcp",
)
self._servers["signoz"] = MCPServer(
name="signoz",
transport=MCPTransport.HTTP,
endpoint="http://192.168.0.188:3301", # SignOz Query Service
)
# Phase 13.2 #82: Filesystem Provider (安全受限)
self._servers["filesystem"] = MCPServer(
name="filesystem",
transport=MCPTransport.HTTP, # 改用 HTTP由 Provider 處理
endpoint="internal://filesystem-provider",
)
self._servers["database"] = MCPServer(
name="database",
transport=MCPTransport.HTTP,
endpoint="http://localhost:8082/mcp",
)
def register_server(self, server: MCPServer) -> None:
"""註冊 MCP Server"""
self._servers[server.name] = server
logger.info(f"MCP Server registered: {server.name} ({server.transport.value})")
async def list_tools(self, server_name: str) -> list[MCPTool]:
"""
動態獲取 MCP Server 工具清單
Args:
server_name: MCP Server 名稱
Returns:
可用工具列表
"""
if server_name not in self._servers:
raise ValueError(f"Unknown MCP Server: {server_name}")
# 快取檢查
if server_name in self._tool_cache:
return self._tool_cache[server_name]
server = self._servers[server_name]
tools = await self._fetch_tools(server)
self._tool_cache[server_name] = tools
return tools
async def _fetch_tools(self, server: MCPServer) -> list[MCPTool]:
"""從 MCP Server 獲取工具清單"""
if server.transport == MCPTransport.HTTP:
return await self._fetch_tools_http(server)
elif server.transport == MCPTransport.STDIO:
return await self._fetch_tools_stdio(server)
else:
raise NotImplementedError(f"Transport not supported: {server.transport}")
async def _fetch_tools_http(self, server: MCPServer) -> list[MCPTool]:
"""HTTP 方式獲取工具 (Mock 實作)"""
# Phase 3: Mock 回傳,實際連接待 MCP Server 部署
mock_tools = {
"kubernetes": [
MCPTool(
name="kubectl_get",
description="Get Kubernetes resources",
input_schema={
"type": "object",
"properties": {
"resource": {"type": "string"},
"namespace": {"type": "string"},
"name": {"type": "string"},
},
"required": ["resource"],
},
server_name=server.name,
),
MCPTool(
name="kubectl_delete",
description="Delete Kubernetes resources",
input_schema={
"type": "object",
"properties": {
"resource": {"type": "string"},
"namespace": {"type": "string"},
"name": {"type": "string"},
},
"required": ["resource", "name"],
},
server_name=server.name,
),
MCPTool(
name="kubectl_scale",
description="Scale Kubernetes deployment",
input_schema={
"type": "object",
"properties": {
"deployment": {"type": "string"},
"namespace": {"type": "string"},
"replicas": {"type": "integer"},
},
"required": ["deployment", "replicas"],
},
server_name=server.name,
),
MCPTool(
name="kubectl_restart",
description="Restart Kubernetes deployment (rollout restart)",
input_schema={
"type": "object",
"properties": {
"deployment": {"type": "string"},
"namespace": {"type": "string"},
},
"required": ["deployment"],
},
server_name=server.name,
),
],
"signoz": [
MCPTool(
name="gold_metrics",
description="Get Gold Metrics (RPS, Error Rate, P99 Latency) for a service",
input_schema={
"type": "object",
"properties": {
"service_name": {"type": "string", "description": "Service name (e.g., api-gateway)"},
"namespace": {"type": "string", "description": "K8s namespace (default: awoooi-prod)"},
"time_window_minutes": {"type": "integer", "description": "Time window in minutes (default: 10)"},
},
"required": ["service_name"],
},
server_name=server.name,
),
MCPTool(
name="trace_url",
description="Generate SignOz trace URL for debugging a service",
input_schema={
"type": "object",
"properties": {
"service_name": {"type": "string", "description": "Service name"},
"window_minutes": {"type": "integer", "description": "Time window ± minutes (default: 5)"},
},
"required": ["service_name"],
},
server_name=server.name,
),
MCPTool(
name="system_metrics",
description="Get system metrics (CPU, Disk I/O) from SignOz",
input_schema={
"type": "object",
"properties": {
"host": {"type": "string", "description": "Host IP (default: 192.168.0.188)"},
"time_window_minutes": {"type": "integer", "description": "Time window in minutes (default: 5)"},
},
},
server_name=server.name,
),
],
"database": [
MCPTool(
name="list_approvals",
description="List approval requests with optional status filter",
input_schema={
"type": "object",
"properties": {
"status": {"type": "string", "description": "Filter by status: pending, approved, rejected, expired"},
"limit": {"type": "integer", "description": "Max results (default: 20)"},
},
},
server_name=server.name,
),
MCPTool(
name="get_approval",
description="Get a single approval by ID",
input_schema={
"type": "object",
"properties": {
"approval_id": {"type": "string", "description": "Approval UUID"},
},
"required": ["approval_id"],
},
server_name=server.name,
),
MCPTool(
name="list_incidents",
description="List incidents with optional status filter",
input_schema={
"type": "object",
"properties": {
"status": {"type": "string", "description": "Filter by status: active, resolved, escalated"},
"limit": {"type": "integer", "description": "Max results (default: 20)"},
},
},
server_name=server.name,
),
MCPTool(
name="list_timeline",
description="List recent timeline events",
input_schema={
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Max results (default: 50)"},
},
},
server_name=server.name,
),
],
}
return mock_tools.get(server.name, [])
async def _fetch_tools_stdio(self, server: MCPServer) -> list[MCPTool]:
"""STDIO 方式獲取工具 (Mock 實作)"""
# Phase 3: Mock 回傳
return [
MCPTool(
name="read_file",
description="Read file contents",
input_schema={
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
server_name=server.name,
),
MCPTool(
name="write_file",
description="Write file contents",
input_schema={
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"},
},
"required": ["path", "content"],
},
server_name=server.name,
),
]
# ╔════════════════════════════════════════════════════════════════╗
# ║ ⚠️ SECURITY CRITICAL - DO NOT LOG REHYDRATED PARAMETERS ⚠️ ║
# ║ ║
# ║ After rehydration, `parameters` contains REAL sensitive ║
# ║ data (IPs, emails, secrets). Logging them defeats the ║
# ║ entire purpose of Privacy Shield. ║
# ║ ║
# ║ ALLOWED: logger.info(f"Calling {tool_name}") ║
# ║ FORBIDDEN: logger.info(f"Params: {parameters}") ║
# ╚════════════════════════════════════════════════════════════════╝
async def call_tool(
self,
server_name: str,
tool_name: str,
parameters: dict[str, Any],
redaction_mapping: dict[str, str] | None = None,
) -> MCPToolResult:
"""
執行 MCP 工具
⚠️ 資安關鍵路徑:
1. Rehydration - 還原脫敏標籤為真實值
2. 驗證 - 確保無殘留標籤
3. 執行 - 調用 MCP Server
4. 結果 - 返回 ActionResult 格式
⛔ 禁止 logging 任何已 rehydrate 的 parameters
Args:
server_name: MCP Server 名稱
tool_name: 工具名稱
parameters: 工具參數 (可能包含脫敏標籤)
redaction_mapping: Privacy Shield 映射表 (原始值 → 標籤)
Returns:
MCPToolResult (符合 ActionResult 介面)
"""
execution_id = str(uuid.uuid4())
start_time = now_taipei()
try:
# ========================================
# 1. Rehydration: 還原脫敏標籤
# ========================================
if redaction_mapping:
logger.info(f"[{execution_id}] Rehydrating {len(redaction_mapping)} labels")
parameters = self.rehydrator.unredact(parameters, redaction_mapping)
# ========================================
# 2. 驗證: 確保無殘留標籤
# ========================================
is_clean, remaining = self.rehydrator.validate_no_labels(parameters)
if not is_clean:
logger.error(f"[{execution_id}] Unrehydrated labels found: {remaining}")
return MCPToolResult(
success=False,
execution_id=execution_id,
error=f"Security violation: Unrehydrated labels found: {remaining}",
duration=self._calc_duration(start_time),
)
# ========================================
# 3. 執行: 調用 MCP Server
# ========================================
logger.info(f"[{execution_id}] Calling {server_name}.{tool_name}")
if server_name not in self._servers:
raise ValueError(f"Unknown MCP Server: {server_name}")
server = self._servers[server_name]
result = await self._execute_tool(server, tool_name, parameters)
# ========================================
# 4. 結果: 返回 ActionResult 格式
# ========================================
return MCPToolResult(
success=True,
execution_id=execution_id,
output=result,
duration=self._calc_duration(start_time),
)
except Exception as e:
logger.error(f"[{execution_id}] Tool execution failed: {e}")
return MCPToolResult(
success=False,
execution_id=execution_id,
error=str(e),
duration=self._calc_duration(start_time),
)
async def _execute_tool(
self,
server: MCPServer,
tool_name: str,
parameters: dict[str, Any],
) -> Any:
"""執行 MCP 工具 (實際調用)"""
if server.transport == MCPTransport.HTTP:
return await self._execute_http(server, tool_name, parameters)
elif server.transport == MCPTransport.STDIO:
return await self._execute_stdio(server, tool_name, parameters)
else:
raise NotImplementedError(f"Transport not supported: {server.transport}")
async def _execute_http(
self,
server: MCPServer,
tool_name: str,
parameters: dict[str, Any],
) -> Any:
"""
HTTP 方式執行工具
ADR-015 重構: 透過 ProviderRegistry 委派執行
不再直接 import services符合 leWOOOgo 積木化原則
"""
# =============================================
# ADR-015: 使用 Provider Registry (DI 模式)
# =============================================
from src.plugins.mcp.registry import get_provider
provider = get_provider(server.name)
if provider:
result = await provider.execute(tool_name, parameters)
if result.success:
return result.output
else:
return {"error": result.error}
# =============================================
# Fallback: 舊邏輯 (逐步遷移後刪除)
# =============================================
# Kubernetes: 使用真實 ActionExecutor
# =============================================
if server.name == "kubernetes":
from src.services.executor import get_executor
executor = get_executor()
if tool_name == "kubectl_get":
# 使用 kubectl 指令查詢
namespace = parameters.get("namespace", settings.AWOOOI_K8S_NAMESPACE)
resource = parameters.get("resource", "pods")
name = parameters.get("name", "")
cmd = f"kubectl get {resource} {name} -n {namespace} -o json".strip()
result = await executor.execute_kubectl_command(cmd)
if result.success and result.k8s_response:
return result.k8s_response.get("stdout", "")
return {"error": result.error}
elif tool_name == "kubectl_delete":
namespace = parameters.get("namespace", settings.AWOOOI_K8S_NAMESPACE)
resource = parameters.get("resource", "pod")
name = parameters.get("name", "")
if not name:
return {"error": "Missing 'name' parameter"}
# Dry-run 驗證
if resource == "pod":
dry_run = await executor.validate_pod_exists(name, namespace)
else:
dry_run = await executor.validate_deployment_exists(name, namespace)
if not dry_run.passed:
return {"error": dry_run.message, "dry_run": False}
# 執行刪除
if resource == "pod":
result = await executor.delete_pod(name, namespace)
else:
# deployment 不支援直接刪除,改用 restart
return {"error": "Direct deployment deletion not supported, use restart"}
return {
"success": result.success,
"message": result.message,
"duration_ms": result.duration_ms,
}
elif tool_name == "kubectl_scale":
namespace = parameters.get("namespace", settings.AWOOOI_K8S_NAMESPACE)
deployment = parameters.get("deployment", "")
replicas = parameters.get("replicas", 1)
if not deployment:
return {"error": "Missing 'deployment' parameter"}
cmd = f"kubectl scale deployment/{deployment} --replicas={replicas} -n {namespace}"
result = await executor.execute_kubectl_command(cmd)
return {
"success": result.success,
"scaled": result.success,
"replicas": replicas,
"message": result.message,
}
elif tool_name == "kubectl_restart":
namespace = parameters.get("namespace", settings.AWOOOI_K8S_NAMESPACE)
deployment = parameters.get("deployment", "")
if not deployment:
return {"error": "Missing 'deployment' parameter"}
dry_run = await executor.validate_deployment_exists(deployment, namespace)
if not dry_run.passed:
return {"error": dry_run.message, "dry_run": False}
result = await executor.restart_deployment(deployment, namespace)
return {
"success": result.success,
"restarted": result.success,
"message": result.message,
"duration_ms": result.duration_ms,
}
else:
return {"error": f"Unknown kubernetes tool: {tool_name}"}
# =============================================
# SignOz: 監控指標查詢 (Phase 13.2 #79)
# =============================================
elif server.name == "signoz":
from src.services.signoz_client import get_signoz_client
signoz = get_signoz_client()
if tool_name == "gold_metrics":
# 取得 Gold Metrics (RPS, Error Rate, P99)
service_name = parameters.get("service_name", "")
if not service_name:
return {"error": "Missing 'service_name' parameter"}
namespace = parameters.get("namespace", settings.AWOOOI_K8S_NAMESPACE)
time_window = parameters.get("time_window_minutes", 10)
metrics = await signoz.get_gold_metrics(
service_name=service_name,
namespace=namespace,
time_window_minutes=time_window,
)
return {
"service_name": metrics.service_name,
"namespace": metrics.namespace,
"rps": round(metrics.rps, 2),
"rps_trend": metrics.rps_trend,
"error_rate": round(metrics.error_rate, 2),
"error_count": metrics.error_count,
"total_requests": metrics.total_requests,
"p50_latency_ms": round(metrics.p50_latency_ms, 1),
"p95_latency_ms": round(metrics.p95_latency_ms, 1),
"p99_latency_ms": round(metrics.p99_latency_ms, 1),
"latency_trend": metrics.latency_trend,
"summary": metrics.to_summary(),
}
elif tool_name == "trace_url":
# 生成 SignOz Trace URL
service_name = parameters.get("service_name", "")
if not service_name:
return {"error": "Missing 'service_name' parameter"}
window_minutes = parameters.get("window_minutes", 5)
url = signoz.generate_trace_url(
service_name=service_name,
window_minutes=window_minutes,
)
return {
"service_name": service_name,
"trace_url": url,
"window_minutes": window_minutes,
}
elif tool_name == "system_metrics":
# 取得系統指標 (CPU/Disk)
host = parameters.get("host", "192.168.0.188")
time_window = parameters.get("time_window_minutes", 5)
metrics = await signoz.get_system_metrics(
_host=host,
time_window_minutes=time_window,
)
return {
"host": host,
"cpu": metrics.get("cpu", {}),
"disk": metrics.get("disk", {}),
"time_range": metrics.get("time_range", {}),
}
else:
return {"error": f"Unknown signoz tool: {tool_name}"}
# =============================================
# Database: 查詢 incident/approval 歷史 (Phase 13.2 #81)
# =============================================
elif server.name == "database":
from uuid import UUID
from src.models.approval import ApprovalStatus
from src.services.approval_db import (
get_approval_service,
get_timeline_service,
)
from src.services.incident_service import get_incident_service
if tool_name == "list_approvals":
# 列出 Approval 請求
approval_svc = get_approval_service()
status_str = parameters.get("status")
limit = parameters.get("limit", 20)
status_filter = None
if status_str:
try:
status_filter = ApprovalStatus(status_str.lower())
except ValueError:
return {"error": f"Invalid status: {status_str}. Valid: pending, approved, rejected, expired"}
approvals = await approval_svc.get_all_approvals(
status=status_filter,
limit=limit,
)
return {
"count": len(approvals),
"approvals": [
{
"id": str(a.id),
"action": a.action[:80] if a.action else "",
"status": a.status.value if hasattr(a.status, 'value') else str(a.status),
"risk_level": a.risk_level.value if hasattr(a.risk_level, 'value') else str(a.risk_level),
"signatures": f"{a.current_signatures}/{a.required_signatures}",
"created_at": a.created_at.isoformat() if a.created_at else None,
}
for a in approvals
],
}
elif tool_name == "get_approval":
# 取得單一 Approval
approval_id = parameters.get("approval_id")
if not approval_id:
return {"error": "Missing 'approval_id' parameter"}
approval_svc = get_approval_service()
try:
approval = await approval_svc.get_approval_by_id(UUID(approval_id))
except ValueError:
return {"error": f"Invalid UUID format: {approval_id}"}
if not approval:
return {"error": f"Approval not found: {approval_id}"}
return {
"id": str(approval.id),
"action": approval.action,
"description": approval.description,
"status": approval.status.value if hasattr(approval.status, 'value') else str(approval.status),
"risk_level": approval.risk_level.value if hasattr(approval.risk_level, 'value') else str(approval.risk_level),
"required_signatures": approval.required_signatures,
"current_signatures": approval.current_signatures,
"signatures": [
{"signer": s.signer_name, "timestamp": s.timestamp.isoformat()}
for s in (approval.signatures or [])
],
"created_at": approval.created_at.isoformat() if approval.created_at else None,
"resolved_at": approval.resolved_at.isoformat() if approval.resolved_at else None,
}
elif tool_name == "list_incidents":
# 列出 Incidents
incident_svc = get_incident_service()
status_filter = parameters.get("status")
limit = parameters.get("limit", 20)
incidents = await incident_svc.get_active_incidents()
# 狀態過濾
if status_filter:
incidents = [i for i in incidents if i.status.value == status_filter.lower()]
incidents = incidents[:limit]
return {
"count": len(incidents),
"incidents": [
{
"id": str(i.id),
"title": i.title[:80] if i.title else "",
"severity": i.severity.value if hasattr(i.severity, 'value') else str(i.severity),
"status": i.status.value if hasattr(i.status, 'value') else str(i.status),
"source": i.source,
"created_at": i.created_at.isoformat() if i.created_at else None,
}
for i in incidents
],
}
elif tool_name == "list_timeline":
# 列出 Timeline 事件
timeline_svc = get_timeline_service()
limit = parameters.get("limit", 50)
events = await timeline_svc.get_events(limit=limit)
return {
"count": len(events),
"events": events,
}
else:
return {"error": f"Unknown database tool: {tool_name}"}
# =============================================
# Mock fallback
# =============================================
logger.info(f"[MOCK] HTTP call to {server.endpoint}: {tool_name}")
return {"status": "ok", "mock": True}
async def _execute_stdio(
self,
server: MCPServer,
tool_name: str,
parameters: dict[str, Any],
) -> Any:
"""STDIO 方式執行工具 (Mock 實作)"""
# Phase 3: Mock 執行
# ⛔ 禁止 logging parameters(ADR-015 Code Review 修復)
logger.info(f"[MOCK] STDIO call to {server.endpoint}: {tool_name}")
mock_responses = {
"read_file": f"[Mock] Contents of {parameters.get('path')}",
"write_file": {"written": True, "path": parameters.get("path")},
}
return mock_responses.get(tool_name, {"status": "ok"})
def _calc_duration(self, start_time: datetime) -> float:
"""計算執行時間 (毫秒)"""
return (now_taipei() - start_time).total_seconds() * 1000
# ==================== ActionExecutor 介面對齊 ====================
def get_supported_operations(self) -> list[str]:
"""取得支援的操作列表 (符合 ActionExecutor 介面)"""
operations = []
for server_name, tools in self._tool_cache.items():
for tool in tools:
operations.append(f"{server_name}.{tool.name}")
return operations
async def execute(
self,
operation: str,
parameters: dict[str, Any],
redaction_mapping: dict[str, str] | None = None,
) -> MCPToolResult:
"""
執行操作 (符合 ActionExecutor.execute 介面)
Args:
operation: 格式為 "server_name.tool_name"
parameters: 工具參數
redaction_mapping: Privacy Shield 映射表
Returns:
MCPToolResult
"""
parts = operation.split(".", 1)
if len(parts) != 2:
return MCPToolResult(
success=False,
execution_id=str(uuid.uuid4()),
error=f"Invalid operation format: {operation}. Expected: server.tool",
)
server_name, tool_name = parts
return await self.call_tool(server_name, tool_name, parameters, redaction_mapping)
async def close(self) -> None:
"""關閉連線"""
await self._http_client.aclose()
# 全域實例
mcp_bridge = MCPBridge()