Files
ewoooc/services/mcp_router.py
OoO 07b76870c9
All checks were successful
CD Pipeline / deploy (push) Successful in 1m1s
feat(market-intel): add internal mcp contract
2026-05-18 14:42:25 +08:00

365 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/mcp_router.py
Operation Ollama-First v5.0 / Phase 10.5 — MCP 統一路由
設計原則(ADR-031):
1. 統一 HTTP client 對 self-hosted MCP stack(postgres / omnisearch / firecrawl / filesystem)
2. 所有 MCP call 雙寫 mcp_calls 表(含 cost_usd / cache_hit / status)
3. fail-safe:MCP server 不可達 → 回 MCPResult(success=False),caller 自決 fallback
4. feature flag MCP_ROUTER_ENABLED 預設 OFF
啟用條件:docker-compose.mcp.yml 已 deploy + 4 個 health endpoint 回 200
部署位置(與 docker-compose.mcp.yml 對齊):
postgres-mcp: 127.0.0.1:3001
firecrawl-self: 127.0.0.1:3002
mcp-omnisearch: 127.0.0.1:3003
filesystem-mcp: 127.0.0.1:3004
caller 介面(範例):
from services.mcp_router import mcp_router
result = mcp_router.call(
server='omnisearch',
tool='tavily_search',
args={'query': 'momo 母親節促銷'},
caller='mcp_collector',
)
"""
from __future__ import annotations

import copy
import hashlib
import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import requests
logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# Feature flag + configuration
# ─────────────────────────────────────────────────────────────────────────────
_TRUTHY_ENV_VALUES = frozenset({'true', '1', 'yes', 'on'})


def _env_flag(name: str, default: str = 'false') -> bool:
    """Parse a boolean environment variable (case- and whitespace-insensitive)."""
    return os.getenv(name, default).strip().lower() in _TRUTHY_ENV_VALUES


def _env_int(name: str, default: int) -> int:
    """Read an integer environment variable, falling back to *default* when unset or malformed.

    A typo in deployment config must not make this module unimportable (the
    router is meant to be fail-safe), so bad values are logged and ignored.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning("[MCPRouter] invalid %s=%r; using default %d", name, raw, default)
        return default


# Import-time snapshot only; use is_mcp_router_enabled() for runtime decisions.
MCP_ROUTER_ENABLED = _env_flag('MCP_ROUTER_ENABLED')
MCP_BASE_HOSTS = {
    'postgres': os.getenv('MCP_POSTGRES_URL', 'http://127.0.0.1:3001'),
    'firecrawl': os.getenv('MCP_FIRECRAWL_URL', 'http://127.0.0.1:3002'),
    'omnisearch': os.getenv('MCP_OMNISEARCH_URL', 'http://127.0.0.1:3003'),
    'filesystem': os.getenv('MCP_FILESYSTEM_URL', 'http://127.0.0.1:3004'),
}
MCP_DEFAULT_TIMEOUT = _env_int('MCP_TIMEOUT_SEC', 30)
MCP_CACHE_TTL_SEC = _env_int('MCP_CACHE_TTL_SEC', 3600)  # 1h
MCP_MAX_RESULT_BYTES = _env_int('MCP_MAX_RESULT_BYTES', 65536)  # 64KB


def is_mcp_router_enabled() -> bool:
    """Re-read MCP_ROUTER_ENABLED on every call, so the flag is not frozen at import time."""
    return _env_flag('MCP_ROUTER_ENABLED')
# ─────────────────────────────────────────────────────────────────────────────
# Tool 白名單(caller × server × tool)— 限制 LLM 不能亂打 MCP
# ─────────────────────────────────────────────────────────────────────────────
# Read-only filesystem-mcp tool names. This same list object is referenced
# (not copied) by TOOL_REGISTRY['ops_diagnostics']['filesystem'] below.
READONLY_FILESYSTEM_TOOLS: List[str] = [
    'list_allowed_directories',
    'list_directory',
    'directory_tree',
    'read_file',
    'read_multiple_files',
    'search_files',
    'get_file_info',
]
# Allowlist consulted by _is_tool_allowed: caller -> MCP server -> permitted
# tool names. Any (caller, server, tool) triple not listed here is rejected,
# which keeps LLM-driven callers from invoking arbitrary MCP tools.
TOOL_REGISTRY: Dict[str, Dict[str, List[str]]] = {
    # mcp_collector: replaces Gemini Grounding.
    'mcp_collector': {
        'omnisearch': ['tavily_search', 'exa_search'],
        'firecrawl': ['scrape_url'],
    },
    # Hermes competitor analysis: may query the DB and scrape web pages.
    'hermes_analyst': {
        'postgres': ['query'],
        'omnisearch': ['tavily_search'],
        'firecrawl': ['scrape_url'],
    },
    # OpenClaw strategic analysis.
    'openclaw_strategist': {
        'postgres': ['query'],
        'omnisearch': ['tavily_search', 'exa_search'],
    },
    # Market-intel internal MCP contract: only public search, scraping of
    # manually approved pages, and read-only queries.
    # NOTE(review): this table only gates tool *names*; page approval and the
    # read-only nature of postgres 'query' must be enforced elsewhere — confirm.
    'market_intel': {
        'postgres': ['query'],
        'omnisearch': ['tavily_search', 'exa_search'],
        'firecrawl': ['scrape_url'],
    },
    # filesystem-mcp mounts only /data and /logs, read-only (a deployment
    # detail not visible in this file). Reserved for diagnostic tools that read
    # files; no write tools are exposed.
    'ops_diagnostics': {
        'filesystem': READONLY_FILESYSTEM_TOOLS,
    },
}
def _is_tool_allowed(caller: str, server: str, tool: str) -> bool:
    """Return True only if TOOL_REGISTRY grants *caller* access to *tool* on *server*.

    Unknown callers and unknown servers fall through to an empty allowlist,
    so anything not explicitly registered is denied.
    """
    per_server = TOOL_REGISTRY.get(caller, {})
    permitted = per_server.get(server, [])
    return tool in permitted
# ─────────────────────────────────────────────────────────────────────────────
# Cache(記憶體 + DB hash 指紋)
# ─────────────────────────────────────────────────────────────────────────────
# In-process result cache: key -> {'data': private deep copy, 'ts': time.monotonic() at write}.
_memory_cache: Dict[str, Dict[str, Any]] = {}
_cache_lock = threading.Lock()
# Upper bound on cached entries; past it, the oldest-written entry is evicted.
_CACHE_MAX_ENTRIES = 200


def _cache_key(server: str, tool: str, args: Dict[str, Any]) -> str:
    """Return a stable 16-hex-char SHA-256 fingerprint of (server, tool, args).

    ``sort_keys=True`` makes the key independent of dict insertion order.
    Raises TypeError/ValueError if *args* is not JSON-serializable.
    """
    payload = json.dumps({'s': server, 't': tool, 'a': args}, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()[:16]


def _cache_get(key: str, ttl_sec: Optional[float] = None) -> Optional[Dict[str, Any]]:
    """Return a deep copy of the cached payload for *key*, or None if absent/expired.

    Args:
        key: value produced by _cache_key.
        ttl_sec: max entry age in seconds; defaults to MCP_CACHE_TTL_SEC.

    Expired entries are removed on lookup. A copy is returned so callers that
    mutate ``MCPResult.data`` cannot corrupt what later cache hits see.
    """
    ttl = MCP_CACHE_TTL_SEC if ttl_sec is None else ttl_sec
    with _cache_lock:
        entry = _memory_cache.get(key)
        if not entry:
            return None
        # Monotonic clock: immune to wall-clock adjustments (NTP, manual changes).
        if time.monotonic() - entry['ts'] > ttl:
            _memory_cache.pop(key, None)
            return None
        data = entry['data']
    # Stored objects are private copies that are never mutated, so copying
    # outside the lock is safe.
    return copy.deepcopy(data)


def _cache_set(key: str, data: Dict[str, Any]) -> None:
    """Store a private deep copy of *data* under *key*, evicting the oldest write past the cap."""
    # Copy so the caller keeps ownership of the object it passed in.
    snapshot = copy.deepcopy(data)
    with _cache_lock:
        _memory_cache[key] = {'data': snapshot, 'ts': time.monotonic()}
        # Oldest-write eviction (reads do not refresh 'ts', so this is not LRU).
        if len(_memory_cache) > _CACHE_MAX_ENTRIES:
            oldest_key = min(_memory_cache, key=lambda k: _memory_cache[k]['ts'])
            _memory_cache.pop(oldest_key, None)
# ─────────────────────────────────────────────────────────────────────────────
# 結果容器
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class MCPResult:
    """Outcome of one MCPRouter.call; failed calls come back with success=False and an error message."""

    success: bool  # True for a live HTTP 200 result or a cache hit
    server: str  # MCP server key as passed to MCPRouter.call, e.g. 'omnisearch'
    tool: str  # tool name invoked on that server
    # Parsed JSON payload; when oversized, replaced by a
    # {'_truncated', '_original_bytes', 'preview'} wrapper dict.
    data: Dict[str, Any] = field(default_factory=dict)
    cache_hit: bool = False  # True when served from the in-process cache
    # Elapsed time of the HTTP attempt (time.monotonic based); 0 for cache hits
    # and for requests rejected before any HTTP call.
    duration_ms: int = 0
    cost_usd: float = 0.0  # written to mcp_calls; not set by MCPRouter in this module
    error: Optional[str] = None  # human-readable failure reason; None on success
    output_size: int = 0  # size of the JSON-serialized payload, as measured by MCPRouter.call
    # Values used in this module: 'ok', 'cache_only', 'error', 'timeout', 'rate_limited'.
    status: Optional[str] = None
# ─────────────────────────────────────────────────────────────────────────────
# DB 寫入(fire-and-forget,與 ai_call_logger 同模式)
# ─────────────────────────────────────────────────────────────────────────────
_MCP_CALLS_INSERT_SQL = """
INSERT INTO mcp_calls (
caller, server, tool, input_args, output_size,
duration_ms, status, error, cost_usd, cache_hit, request_id
) VALUES (
:caller, :server, :tool, CAST(:args AS JSONB), :osz,
:dur, :status, :err, :cost, :cache, :req
)
"""


def _redact_args(args: Dict[str, Any]) -> Dict[str, Any]:
    """Return a PII-safe summary of *args*: a 12-hex-char SHA-1 fingerprint plus up to 10 key names.

    The raw argument values are never persisted.
    """
    fingerprint = hashlib.sha1(
        json.dumps(args, sort_keys=True, ensure_ascii=False).encode('utf-8')
    ).hexdigest()[:12]
    return {'hash': fingerprint, 'keys': list(args.keys())[:10]}


def _async_write_mcp_call(
    caller: str,
    server: str,
    tool: str,
    args: Dict[str, Any],
    result: MCPResult,
    request_id: Optional[str] = None,
) -> None:
    """Persist one row to the ``mcp_calls`` table on a daemon thread (fire-and-forget).

    All row values are captured synchronously, in the caller's thread, before
    the writer thread starts. ``args`` and ``result`` belong to the caller and
    may be mutated or reused after ``MCPRouter.call`` returns. Serializing a dict
    while another thread mutates it can fail or record the wrong fingerprint.

    Designed not to raise: bad args, DB errors and thread-start failures are
    logged at DEBUG and the row is dropped, so audit logging never breaks the
    MCP call path.
    """
    try:
        params = {
            'caller': caller,
            'server': server,
            'tool': tool,
            'args': json.dumps(_redact_args(args)),
            'osz': result.output_size,
            'dur': result.duration_ms,
            'status': result.status or ('ok' if result.success else 'error'),
            'err': result.error[:4000] if result.error else None,
            'cost': result.cost_usd,
            'cache': result.cache_hit,
            'req': request_id,
        }
    except Exception:
        logger.debug("[MCPRouter] could not build mcp_calls row", exc_info=True)
        return

    def _writer() -> None:
        try:
            # Imported lazily so this module stays importable without the DB layer.
            from sqlalchemy import text as sa_text
            from database.manager import get_session
            session = get_session()
            try:
                session.execute(sa_text(_MCP_CALLS_INSERT_SQL), params)
                session.commit()
            except Exception as exc:
                session.rollback()
                logger.debug("[MCPRouter] DB write failed: %s", exc)
            finally:
                session.close()
        except Exception:
            logger.debug("[MCPRouter] async DB writer bootstrap failed", exc_info=True)

    try:
        threading.Thread(target=_writer, daemon=True).start()
    except RuntimeError:
        # e.g. "can't start new thread" under resource exhaustion: drop the audit row.
        logger.debug("[MCPRouter] could not start DB writer thread", exc_info=True)
# ─────────────────────────────────────────────────────────────────────────────
# MCPRouter 主類
# ─────────────────────────────────────────────────────────────────────────────
class MCPRouter:
    """Unified MCP router: allowlist check, in-memory cache, HTTP call and mcp_calls audit log.

    Failures are returned as ``MCPResult(success=False, ...)`` rather than
    raised, so each caller chooses its own fallback.
    """

    def call(
        self,
        server: str,
        tool: str,
        args: Dict[str, Any],
        caller: str = 'unknown',
        timeout: Optional[int] = None,
        request_id: Optional[str] = None,
    ) -> MCPResult:
        """Invoke *tool* on MCP *server* on behalf of *caller*.

        Args:
            server: key into MCP_BASE_HOSTS (e.g. 'omnisearch').
            tool: tool name; POSTed to ``{base_url}/tools/{tool}``.
            args: JSON-serializable tool arguments, sent as the JSON request body.
            caller: identity checked against TOOL_REGISTRY.
            timeout: per-request timeout in seconds; falsy means MCP_DEFAULT_TIMEOUT.
            request_id: optional correlation id stored in mcp_calls.

        Returns:
            MCPResult. ``success=False`` when the flag is off, the tool is not
            allowlisted, the server is unknown, args are not JSON-serializable,
            or the HTTP call fails. Pre-flight rejections are not written to
            mcp_calls; cache hits and HTTP attempts are.
        """
        if not is_mcp_router_enabled():
            return MCPResult(
                success=False, server=server, tool=tool,
                error='MCP_ROUTER_ENABLED=false (戰役 v5.0 預設 OFF待 docker-compose.mcp.yml deploy 後翻 ON)',
                status='error',
            )
        if not _is_tool_allowed(caller, server, tool):
            return MCPResult(
                success=False, server=server, tool=tool,
                error=f'tool not in registry: caller={caller} server={server} tool={tool}',
                status='error',
            )
        base_url = MCP_BASE_HOSTS.get(server)
        if not base_url:
            return MCPResult(
                success=False, server=server, tool=tool,
                error=f'unknown server: {server}',
                status='error',
            )

        # Serializing args for the cache key would otherwise raise straight out
        # of this fail-safe entry point (e.g. a datetime or a circular dict).
        try:
            ckey = _cache_key(server, tool, args)
        except (TypeError, ValueError) as exc:
            return MCPResult(
                success=False, server=server, tool=tool,
                error=f'args not JSON-serializable: {type(exc).__name__}: {str(exc)[:300]}',
                status='error',
            )

        cached = _cache_get(ckey)
        if cached is not None:
            result = MCPResult(
                success=True, server=server, tool=tool,
                data=cached, cache_hit=True, duration_ms=0,
                # UTF-8 bytes: the same unit the live path records in mcp_calls.
                output_size=self._json_size_bytes(cached),
                status='cache_only',
            )
        else:
            result = self._fetch(server, tool, args, base_url, timeout or MCP_DEFAULT_TIMEOUT, ckey)

        # Audit logging is best-effort: it must never turn a result into an exception.
        try:
            _async_write_mcp_call(caller, server, tool, args, result, request_id)
        except Exception:
            logger.debug("[MCPRouter] mcp_calls audit write failed", exc_info=True)
        return result

    def _fetch(
        self,
        server: str,
        tool: str,
        args: Dict[str, Any],
        base_url: str,
        request_timeout: float,
        ckey: str,
    ) -> MCPResult:
        """POST to ``{base_url}/tools/{tool}``, apply the size guard, and cache successes.

        Never raises: timeouts and any other exception become a failed MCPResult.
        """
        url = f"{base_url.rstrip('/')}/tools/{tool}"
        t0 = time.monotonic()

        def elapsed_ms() -> int:
            return int((time.monotonic() - t0) * 1000)

        try:
            resp = requests.post(url, json=args, timeout=request_timeout)
            duration_ms = elapsed_ms()
            if resp.status_code != 200:
                return MCPResult(
                    success=False, server=server, tool=tool,
                    duration_ms=duration_ms,
                    error=f'HTTP {resp.status_code}: {resp.text[:200]}',
                    status='rate_limited' if resp.status_code == 429 else 'error',
                )

            data = resp.json()
            output_bytes = json.dumps(data, ensure_ascii=False).encode('utf-8')
            output_size = len(output_bytes)
            # Size guard: replace oversized payloads with a bounded preview.
            if output_size > MCP_MAX_RESULT_BYTES:
                logger.warning(
                    "[MCPRouter] %s/%s output %d > %d bytes; 截斷",
                    server, tool, output_size, MCP_MAX_RESULT_BYTES,
                )
                data = {
                    '_truncated': True,
                    '_original_bytes': output_size,
                    # Slice the encoded bytes, not the str: a character slice of
                    # CJK text can be ~3x the byte budget. errors='ignore' drops a
                    # multi-byte character cut in half at the boundary.
                    'preview': output_bytes[:MCP_MAX_RESULT_BYTES].decode('utf-8', errors='ignore'),
                }

            # Only successful responses are cached.
            _cache_set(ckey, data)
            return MCPResult(
                success=True, server=server, tool=tool,
                data=data, cache_hit=False,
                duration_ms=duration_ms, output_size=output_size,
                status='ok',
            )
        except requests.Timeout:
            return MCPResult(
                success=False, server=server, tool=tool,
                duration_ms=elapsed_ms(), error=f'timeout ({request_timeout}s)',
                status='timeout',
            )
        except Exception as exc:
            return MCPResult(
                success=False, server=server, tool=tool,
                duration_ms=elapsed_ms(),
                error=f'{type(exc).__name__}: {str(exc)[:300]}',
                status='error',
            )

    @staticmethod
    def _json_size_bytes(data: Any) -> int:
        """UTF-8 byte length of *data* serialized with ``json.dumps(ensure_ascii=False)``."""
        return len(json.dumps(data, ensure_ascii=False).encode('utf-8'))

    def health_check(self) -> Dict[str, bool]:
        """Probe ``GET {base_url}/health`` for every server in MCP_BASE_HOSTS (3 s timeout each).

        Returns a mapping of server name -> True iff the endpoint answered HTTP 200.
        Connection errors and timeouts count as unhealthy. Intended for
        schedulers or admin endpoints.
        """
        results: Dict[str, bool] = {}
        for server, base_url in MCP_BASE_HOSTS.items():
            try:
                resp = requests.get(f"{base_url.rstrip('/')}/health", timeout=3)
                results[server] = resp.status_code == 200
            except Exception:
                results[server] = False
        return results
# 全域單例
#: Shared module-level router instance; callers import this rather than building their own.
mcp_router = MCPRouter()

# Public API of this module (governs ``from services.mcp_router import *``).
__all__ = [
    'MCPRouter', 'MCPResult', 'mcp_router',
    'is_mcp_router_enabled', 'TOOL_REGISTRY', 'MCP_BASE_HOSTS',
]