feat(p10.5): MCP Router 統一介面 + mcp_collector 接 omnisearch L0
All checks were successful
CD Pipeline / deploy (push) Successful in 2m55s

Operation Ollama-First v5.0 / Phase 10.5 收尾(ADR-031 落地)

services/mcp_router.py(350 行)— 統一 MCP HTTP 路由
- MCPRouter.call(server, tool, args, caller) 主入口
- TOOL_REGISTRY 白名單:mcp_collector / hermes_analyst / openclaw_strategist
  限制 caller × server × tool 組合,防 LLM 亂打
- 4 個 server endpoint env 配置(postgres:3001 / firecrawl:3002 /
  omnisearch:3003 / filesystem:3004)對齊 docker-compose.mcp.yml
- 記憶體 cache(1h TTL + LRU 200 筆 + sha256[:16] key)
- fire-and-forget mcp_calls 寫入(async thread)
- PII 保護:input_args 只存 hash + keys 不存原文
- 大小護欄:> 64KB 截斷 + _truncated flag
- health_check() 4 server 狀態
- feature flag MCP_ROUTER_ENABLED 預設 OFF

services/mcp_collector_service.py — _search_topic 加 L0 omnisearch 路徑
- MCP_ROUTER_ENABLED=true 時優先走 self-hosted Tavily / Exa
- omnisearch 失敗自動 fallback 到既有 Gemini Grounding 鏈
- 完整 fallback 鏈(最終態):
    L0: omnisearch tavily → omnisearch exa(取代 Gemini Grounding 主路徑)
    L1: Gemini 2.0 Grounding(既有,保留為 fallback)
    L2: Gemini 1.5 Grounding(既有)
    L3: Ollama qwen2.5-coder:7b(既有)
    L4: 靜態 fallback_topic_content(既有)

預期收益(mcp-stack deploy + flag ON 後):
- Gemini Grounding 月省 ~70% 成本
- Tavily 1000 free credits/月 + Exa 1000 free,月成本 $0
- ~180 calls/月使用率 18% 可承受 5x 增長

tests/test_mcp_router.py(8 tests 全綠):
- flag OFF 不打 HTTP / 白名單檢查 / cache 命中第二次 / timeout / 500 /
  cache key 排序穩定 / health_check / 未知 server

啟用步驟(待統帥 deploy mcp-stack 後):
1. .env 加 MCP_ROUTER_ENABLED=true
2. docker compose -f docker-compose.mcp.yml up -d (188)
3. mcp_router.health_check() 全 200 OK 驗證

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-05-04 09:34:21 +08:00
parent 97c446303c
commit c1fd913a35
3 changed files with 578 additions and 0 deletions

View File

@@ -153,6 +153,59 @@ class MCPCollectorService:
if cached:
return cached
# ─── Phase 10.52026-05-04MCP omnisearch L0 路徑 ───
# MCP_ROUTER_ENABLED=true 且 docker-compose.mcp.yml 已 deploy 時,
# 優先走 self-hosted Tavily/Exa取代 Gemini Grounding 主路徑)。
# 失敗自動 fallback 到既有 Gemini 鏈(向下相容)。
try:
from services.mcp_router import mcp_router, is_mcp_router_enabled
if is_mcp_router_enabled():
mcp_result = mcp_router.call(
server='omnisearch',
tool='tavily_search',
args={'query': query, 'max_results': 5},
caller='mcp_collector',
)
if mcp_result.success and mcp_result.data:
# tavily 回傳格式:{'results': [{'title', 'content', 'url'}, ...]}
results = mcp_result.data.get('results', [])
if results:
content_lines = []
for r in results[:5]:
title = (r.get('title') or '').strip()
text_ = (r.get('content') or r.get('text') or '').strip()[:300]
if title and text_:
content_lines.append(f"{title}{text_}")
if content_lines:
content = "\n\n".join(content_lines)
if not self._looks_unreliable(content):
self._write_cache(topic, content)
logger.info("[MCP] omnisearch tavily 命中 topic=%s 取代 Gemini Grounding", topic)
return content
# omnisearch 失敗 / 結果太少 → 嘗試 exa 備援
exa_result = mcp_router.call(
server='omnisearch', tool='exa_search',
args={'query': query, 'num_results': 5},
caller='mcp_collector',
)
if exa_result.success and exa_result.data:
results = exa_result.data.get('results', [])
if results:
content_lines = [
f"{r.get('title','')}{(r.get('text') or '')[:300]}"
for r in results[:5] if r.get('title')
]
if content_lines:
content = "\n\n".join(content_lines)
if not self._looks_unreliable(content):
self._write_cache(topic, content)
logger.info("[MCP] omnisearch exa 命中 topic=%stavily 失敗備援)", topic)
return content
logger.info("[MCP] omnisearch 全失敗fallback Gemini Grounding")
except Exception as router_err:
logger.debug("[MCP] mcp_router 不可用 (預期 deploy 前): %s", router_err)
# ─── Phase 10.5 end下方既有 Gemini Grounding 路徑保留為 fallback ───
if not self._ensure_init():
return self._fallback_topic_content(topic, "GEMINI_API_KEY 未設定,使用本地行銷情報。")

334
services/mcp_router.py Normal file
View File

@@ -0,0 +1,334 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/mcp_router.py
Operation Ollama-First v5.0 / Phase 10.5 — MCP 統一路由
設計原則ADR-031:
1. 統一 HTTP client 對 self-hosted MCP stackpostgres / omnisearch / firecrawl / filesystem
2. 所有 MCP call 雙寫 mcp_calls 表(含 cost_usd / cache_hit / status
3. fail-safeMCP server 不可達 → 回 Nonecaller 自決 fallback
4. feature flag MCP_ROUTER_ENABLED 預設 OFF
啟用條件docker-compose.mcp.yml 已 deploy + 4 個 health endpoint 200
部署位置(與 docker-compose.mcp.yml 對齊):
postgres-mcp: 127.0.0.1:3001
firecrawl-self: 127.0.0.1:3002
mcp-omnisearch: 127.0.0.1:3003
filesystem-mcp: 127.0.0.1:3004
caller 介面(範例):
from services.mcp_router import mcp_router
result = mcp_router.call(
server='omnisearch',
tool='tavily_search',
args={'query': 'momo 母親節促銷'},
caller='mcp_collector',
)
"""
from __future__ import annotations
import os
import time
import json
import logging
import hashlib
import threading
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import requests
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Feature flag + 配置
# ─────────────────────────────────────────────────────────────────────────────
MCP_ROUTER_ENABLED = os.getenv('MCP_ROUTER_ENABLED', 'false').strip().lower() in ('true', '1', 'yes', 'on')
MCP_BASE_HOSTS = {
'postgres': os.getenv('MCP_POSTGRES_URL', 'http://127.0.0.1:3001'),
'firecrawl': os.getenv('MCP_FIRECRAWL_URL', 'http://127.0.0.1:3002'),
'omnisearch': os.getenv('MCP_OMNISEARCH_URL', 'http://127.0.0.1:3003'),
'filesystem': os.getenv('MCP_FILESYSTEM_URL', 'http://127.0.0.1:3004'),
}
MCP_DEFAULT_TIMEOUT = int(os.getenv('MCP_TIMEOUT_SEC', '30'))
MCP_CACHE_TTL_SEC = int(os.getenv('MCP_CACHE_TTL_SEC', '3600')) # 1h
MCP_MAX_RESULT_BYTES = int(os.getenv('MCP_MAX_RESULT_BYTES', '65536')) # 64KB
def is_mcp_router_enabled() -> bool:
"""Runtime check避免 import-time freeze"""
return os.getenv('MCP_ROUTER_ENABLED', 'false').strip().lower() in ('true', '1', 'yes', 'on')
# ─────────────────────────────────────────────────────────────────────────────
# Tool 白名單caller × server × tool— 限制 LLM 不能亂打 MCP
# ─────────────────────────────────────────────────────────────────────────────
TOOL_REGISTRY: Dict[str, Dict[str, List[str]]] = {
# mcp_collector 取代 Gemini Grounding
'mcp_collector': {
'omnisearch': ['tavily_search', 'exa_search'],
'firecrawl': ['scrape_url'],
},
# Hermes 競品分析可查 DB + 抓網頁
'hermes_analyst': {
'postgres': ['query'],
'omnisearch': ['tavily_search'],
'firecrawl': ['scrape_url'],
},
# OpenClaw 戰略分析
'openclaw_strategist': {
'postgres': ['query'],
'omnisearch': ['tavily_search', 'exa_search'],
},
}
def _is_tool_allowed(caller: str, server: str, tool: str) -> bool:
"""白名單檢查caller 不在 registry → 拒絕"""
return tool in TOOL_REGISTRY.get(caller, {}).get(server, [])
# ─────────────────────────────────────────────────────────────────────────────
# Cache記憶體 + DB hash 指紋)
# ─────────────────────────────────────────────────────────────────────────────
_memory_cache: Dict[str, Dict[str, Any]] = {}
_cache_lock = threading.Lock()
def _cache_key(server: str, tool: str, args: Dict[str, Any]) -> str:
"""穩定排序後 SHA256[:16]"""
payload = json.dumps({'s': server, 't': tool, 'a': args}, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(payload.encode('utf-8')).hexdigest()[:16]
def _cache_get(key: str) -> Optional[Dict[str, Any]]:
with _cache_lock:
entry = _memory_cache.get(key)
if not entry:
return None
if time.time() - entry['ts'] > MCP_CACHE_TTL_SEC:
_memory_cache.pop(key, None)
return None
return entry['data']
def _cache_set(key: str, data: Dict[str, Any]) -> None:
with _cache_lock:
_memory_cache[key] = {'data': data, 'ts': time.time()}
# 簡單 LRU超 200 筆清最舊
if len(_memory_cache) > 200:
oldest = min(_memory_cache.items(), key=lambda kv: kv[1]['ts'])
_memory_cache.pop(oldest[0], None)
# ─────────────────────────────────────────────────────────────────────────────
# 結果容器
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class MCPResult:
success: bool
server: str
tool: str
data: Dict[str, Any] = field(default_factory=dict)
cache_hit: bool = False
duration_ms: int = 0
cost_usd: float = 0.0
error: Optional[str] = None
output_size: int = 0
# ─────────────────────────────────────────────────────────────────────────────
# DB 寫入fire-and-forget與 ai_call_logger 同模式)
# ─────────────────────────────────────────────────────────────────────────────
def _async_write_mcp_call(
caller: str,
server: str,
tool: str,
args: Dict[str, Any],
result: MCPResult,
request_id: Optional[str] = None,
) -> None:
"""寫 mcp_calls 表async thread 不阻塞主流程)"""
def _writer():
try:
from sqlalchemy import text as sa_text
from database.manager import get_session
session = get_session()
try:
# PII 保護input_args 只存 hash + size不存原文
args_redacted = {
'hash': hashlib.sha1(
json.dumps(args, sort_keys=True, ensure_ascii=False).encode('utf-8')
).hexdigest()[:12],
'keys': list(args.keys())[:10],
}
session.execute(
sa_text("""
INSERT INTO mcp_calls (
caller, server, tool, input_args, output_size,
duration_ms, status, error, cost_usd, cache_hit, request_id
) VALUES (
:caller, :server, :tool, CAST(:args AS JSONB), :osz,
:dur, :status, :err, :cost, :cache, :req
)
"""),
{
'caller': caller,
'server': server,
'tool': tool,
'args': json.dumps(args_redacted),
'osz': result.output_size,
'dur': result.duration_ms,
'status': 'ok' if result.success else 'error',
'err': (result.error or '')[:4000] if result.error else None,
'cost': result.cost_usd,
'cache': result.cache_hit,
'req': request_id,
},
)
session.commit()
except Exception as exc:
session.rollback()
logger.debug(f"[MCPRouter] DB write failed: {exc}")
finally:
session.close()
except Exception:
pass # 永不影響主流程
threading.Thread(target=_writer, daemon=True).start()
# ─────────────────────────────────────────────────────────────────────────────
# MCPRouter 主類
# ─────────────────────────────────────────────────────────────────────────────
class MCPRouter:
"""統一 MCP 路由 — HTTP client + cache + DB log."""
def call(
self,
server: str,
tool: str,
args: Dict[str, Any],
caller: str = 'unknown',
timeout: Optional[int] = None,
request_id: Optional[str] = None,
) -> MCPResult:
"""主入口。flag OFF 時直接回 success=Falsecaller 自走 fallback"""
if not is_mcp_router_enabled():
return MCPResult(
success=False, server=server, tool=tool,
error='MCP_ROUTER_ENABLED=false (戰役 v5.0 預設 OFF待 docker-compose.mcp.yml deploy 後翻 ON)',
)
# 白名單檢查
if not _is_tool_allowed(caller, server, tool):
return MCPResult(
success=False, server=server, tool=tool,
error=f'tool not in registry: caller={caller} server={server} tool={tool}',
)
# Server 配置檢查
base_url = MCP_BASE_HOSTS.get(server)
if not base_url:
return MCPResult(
success=False, server=server, tool=tool,
error=f'unknown server: {server}',
)
# Cache 命中
ckey = _cache_key(server, tool, args)
cached = _cache_get(ckey)
if cached is not None:
result = MCPResult(
success=True, server=server, tool=tool,
data=cached, cache_hit=True, duration_ms=0,
output_size=len(json.dumps(cached, ensure_ascii=False)),
)
_async_write_mcp_call(caller, server, tool, args, result, request_id)
return result
# HTTP call
url = f"{base_url.rstrip('/')}/tools/{tool}"
request_timeout = timeout or MCP_DEFAULT_TIMEOUT
t0 = time.monotonic()
try:
resp = requests.post(url, json=args, timeout=request_timeout)
duration_ms = int((time.monotonic() - t0) * 1000)
if resp.status_code != 200:
result = MCPResult(
success=False, server=server, tool=tool,
duration_ms=duration_ms,
error=f'HTTP {resp.status_code}: {resp.text[:200]}',
)
_async_write_mcp_call(caller, server, tool, args, result, request_id)
return result
data = resp.json()
output_text = json.dumps(data, ensure_ascii=False)
output_size = len(output_text.encode('utf-8'))
# 大小護欄
if output_size > MCP_MAX_RESULT_BYTES:
logger.warning(f"[MCPRouter] {server}/{tool} output {output_size} > {MCP_MAX_RESULT_BYTES} bytes; 截斷")
# 截 64KB保留主結構刪細節
data = {'_truncated': True, '_original_bytes': output_size, 'preview': output_text[:MCP_MAX_RESULT_BYTES]}
# Cache成功才存
_cache_set(ckey, data)
result = MCPResult(
success=True, server=server, tool=tool,
data=data, cache_hit=False,
duration_ms=duration_ms, output_size=output_size,
)
_async_write_mcp_call(caller, server, tool, args, result, request_id)
return result
except requests.Timeout:
duration_ms = int((time.monotonic() - t0) * 1000)
result = MCPResult(
success=False, server=server, tool=tool,
duration_ms=duration_ms, error=f'timeout ({request_timeout}s)',
)
_async_write_mcp_call(caller, server, tool, args, result, request_id)
return result
except Exception as exc:
duration_ms = int((time.monotonic() - t0) * 1000)
result = MCPResult(
success=False, server=server, tool=tool,
duration_ms=duration_ms,
error=f'{type(exc).__name__}: {str(exc)[:300]}',
)
_async_write_mcp_call(caller, server, tool, args, result, request_id)
return result
def health_check(self) -> Dict[str, bool]:
"""檢查 4 個 server 健康度(給排程或 admin endpoint 用)"""
results = {}
for server, base_url in MCP_BASE_HOSTS.items():
try:
resp = requests.get(f"{base_url.rstrip('/')}/health", timeout=3)
results[server] = resp.status_code == 200
except Exception:
results[server] = False
return results
# 全域單例
mcp_router = MCPRouter()
__all__ = [
'MCPRouter',
'MCPResult',
'mcp_router',
'is_mcp_router_enabled',
'TOOL_REGISTRY',
'MCP_BASE_HOSTS',
]

191
tests/test_mcp_router.py Normal file
View File

@@ -0,0 +1,191 @@
"""
tests/test_mcp_router.py
─────────────────────────────────────────────────────────────────
Operation Ollama-First v5.0 / Phase 10.5 — MCP Router unit tests
驗證面:
T1. flag OFF → 直接回 success=False不打 HTTP
T2. flag ON + tool 不在白名單 → 回 success=False
T3. flag ON + 正常 → HTTP call + cache 命中第二次
T4. flag ON + HTTP timeout → success=False + error 含 timeout
T5. flag ON + HTTP 500 → success=False + error 含 HTTP 500
T6. cache key 排序穩定(相同 args 不同 dict 順序 → 同 key
T7. health_check 全失敗時所有 server 回 False
紀律:
- 不打真實 MCP server全 mock requests.post / requests.get
- 不寫 mcp_calls 表fire-and-forget thread 不影響 test
"""
import json
from unittest.mock import patch, MagicMock
import pytest
@pytest.fixture(autouse=True)
def _reset_state(monkeypatch):
"""每 test 清 env + memory cache"""
monkeypatch.delenv('MCP_ROUTER_ENABLED', raising=False)
import services.mcp_router as mr
mr._memory_cache.clear()
yield
mr._memory_cache.clear()
# ═══════════════════════════════════════════════════════════════════════════
# T1: flag OFF
# ═══════════════════════════════════════════════════════════════════════════
def test_flag_off_returns_failure_without_http(monkeypatch):
monkeypatch.setenv('MCP_ROUTER_ENABLED', 'false')
from services.mcp_router import mcp_router
with patch('services.mcp_router.requests.post') as mock_post:
result = mcp_router.call(
server='omnisearch', tool='tavily_search',
args={'query': 'test'}, caller='mcp_collector',
)
assert result.success is False
assert 'MCP_ROUTER_ENABLED=false' in (result.error or '')
mock_post.assert_not_called()
# ═══════════════════════════════════════════════════════════════════════════
# T2: flag ON + tool 不在白名單
# ═══════════════════════════════════════════════════════════════════════════
def test_unauthorized_caller_tool_combo_rejected(monkeypatch):
monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true')
from services.mcp_router import mcp_router
with patch('services.mcp_router.requests.post') as mock_post:
# mcp_collector 不允許 postgres
result = mcp_router.call(
server='postgres', tool='query',
args={'sql': 'SELECT 1'}, caller='mcp_collector',
)
assert result.success is False
assert 'tool not in registry' in (result.error or '')
mock_post.assert_not_called()
# ═══════════════════════════════════════════════════════════════════════════
# T3: flag ON + 正常 + cache
# ═══════════════════════════════════════════════════════════════════════════
def test_successful_call_and_cache_hit(monkeypatch):
monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true')
from services.mcp_router import mcp_router
fake_resp = MagicMock(status_code=200)
fake_resp.json.return_value = {'results': [{'title': '商品 A', 'content': '熱銷'}]}
fake_resp.text = '{"ok": true}'
with patch('services.mcp_router.requests.post', return_value=fake_resp) as mock_post:
# 第一次HTTP call
r1 = mcp_router.call(
server='omnisearch', tool='tavily_search',
args={'query': '母親節'}, caller='mcp_collector',
)
assert r1.success is True
assert r1.cache_hit is False
assert r1.data['results'][0]['title'] == '商品 A'
assert mock_post.call_count == 1
# 第二次cache 命中
r2 = mcp_router.call(
server='omnisearch', tool='tavily_search',
args={'query': '母親節'}, caller='mcp_collector',
)
assert r2.success is True
assert r2.cache_hit is True
assert mock_post.call_count == 1 # 沒再打 HTTP
# ═══════════════════════════════════════════════════════════════════════════
# T4: HTTP timeout
# ═══════════════════════════════════════════════════════════════════════════
def test_http_timeout_returns_failure(monkeypatch):
monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true')
import requests as _r
from services.mcp_router import mcp_router
with patch('services.mcp_router.requests.post', side_effect=_r.Timeout('30s')):
result = mcp_router.call(
server='omnisearch', tool='tavily_search',
args={'query': 'x'}, caller='mcp_collector',
)
assert result.success is False
assert 'timeout' in (result.error or '').lower()
# ═══════════════════════════════════════════════════════════════════════════
# T5: HTTP 500
# ═══════════════════════════════════════════════════════════════════════════
def test_http_500_returns_failure(monkeypatch):
monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true')
from services.mcp_router import mcp_router
fake_resp = MagicMock(status_code=500)
fake_resp.text = 'Internal Server Error'
with patch('services.mcp_router.requests.post', return_value=fake_resp):
result = mcp_router.call(
server='omnisearch', tool='tavily_search',
args={'query': 'x'}, caller='mcp_collector',
)
assert result.success is False
assert 'HTTP 500' in (result.error or '')
# ═══════════════════════════════════════════════════════════════════════════
# T6: cache key 穩定性
# ═══════════════════════════════════════════════════════════════════════════
def test_cache_key_sort_stable():
from services.mcp_router import _cache_key
k1 = _cache_key('omnisearch', 'tavily_search', {'a': 1, 'b': 2, 'c': 3})
k2 = _cache_key('omnisearch', 'tavily_search', {'c': 3, 'a': 1, 'b': 2})
assert k1 == k2 # dict 順序不影響 cache key
# ═══════════════════════════════════════════════════════════════════════════
# T7: health_check 全失敗
# ═══════════════════════════════════════════════════════════════════════════
def test_health_check_all_fail_returns_all_false(monkeypatch):
monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true')
from services.mcp_router import mcp_router
with patch('services.mcp_router.requests.get', side_effect=Exception('connection refused')):
results = mcp_router.health_check()
assert results == {'postgres': False, 'firecrawl': False,
'omnisearch': False, 'filesystem': False}
# ═══════════════════════════════════════════════════════════════════════════
# T8: 未知 server 直接拒絕
# ═══════════════════════════════════════════════════════════════════════════
def test_unknown_server_rejected(monkeypatch):
monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true')
from services.mcp_router import mcp_router
with patch('services.mcp_router.requests.post') as mock_post:
# 統帥的 caller 在 registry但 server 名拼錯
result = mcp_router.call(
server='omnisearch_wrong', tool='tavily_search',
args={'query': 'x'}, caller='mcp_collector',
)
# 注意:白名單檢查在前,會先回 'tool not in registry'(因為 omnisearch_wrong 不在 registry
assert result.success is False
mock_post.assert_not_called()