From c1fd913a35eb394fe91cc567e2699d330f062445 Mon Sep 17 00:00:00 2001 From: OoO Date: Mon, 4 May 2026 09:34:21 +0800 Subject: [PATCH] =?UTF-8?q?feat(p10.5):=20MCP=20Router=20=E7=B5=B1?= =?UTF-8?q?=E4=B8=80=E4=BB=8B=E9=9D=A2=20+=20mcp=5Fcollector=20=E6=8E=A5?= =?UTF-8?q?=20omnisearch=20L0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Operation Ollama-First v5.0 / Phase 10.5 收尾(ADR-031 落地) services/mcp_router.py(350 行)— 統一 MCP HTTP 路由 - MCPRouter.call(server, tool, args, caller) 主入口 - TOOL_REGISTRY 白名單:mcp_collector / hermes_analyst / openclaw_strategist 限制 caller × server × tool 組合,防 LLM 亂打 - 4 個 server endpoint env 配置(postgres:3001 / firecrawl:3002 / omnisearch:3003 / filesystem:3004)對齊 docker-compose.mcp.yml - 記憶體 cache(1h TTL + LRU 200 筆 + sha256[:16] key) - fire-and-forget mcp_calls 寫入(async thread) - PII 保護:input_args 只存 hash + keys 不存原文 - 大小護欄:> 64KB 截斷 + _truncated flag - health_check() 4 server 狀態 - feature flag MCP_ROUTER_ENABLED 預設 OFF services/mcp_collector_service.py — _search_topic 加 L0 omnisearch 路徑 - MCP_ROUTER_ENABLED=true 時優先走 self-hosted Tavily / Exa - omnisearch 失敗自動 fallback 到既有 Gemini Grounding 鏈 - 完整 fallback 鏈(最終態): L0: omnisearch tavily → omnisearch exa(取代 Gemini Grounding 主路徑) L1: Gemini 2.0 Grounding(既有,保留為 fallback) L2: Gemini 1.5 Grounding(既有) L3: Ollama qwen2.5-coder:7b(既有) L4: 靜態 fallback_topic_content(既有) 預期收益(mcp-stack deploy + flag ON 後): - Gemini Grounding 月省 ~70% 成本 - Tavily 1000 free credits/月 + Exa 1000 free,月成本 $0 - ~180 calls/月使用率 18% 可承受 5x 增長 tests/test_mcp_router.py(8 tests 全綠): - flag OFF 不打 HTTP / 白名單檢查 / cache 命中第二次 / timeout / 500 / cache key 排序穩定 / health_check / 未知 server 啟用步驟(待統帥 deploy mcp-stack 後): 1. .env 加 MCP_ROUTER_ENABLED=true 2. docker compose -f docker-compose.mcp.yml up -d (188) 3. mcp_router.health_check() 全 200 OK 驗證 Co-Authored-By: Claude Opus 4.7 (1M context) --- services/mcp_collector_service.py | 53 +++++ services/mcp_router.py | 334 ++++++++++++++++++++++++++++++ tests/test_mcp_router.py | 191 +++++++++++++++++ 3 files changed, 578 insertions(+) create mode 100644 services/mcp_router.py create mode 100644 tests/test_mcp_router.py diff --git a/services/mcp_collector_service.py b/services/mcp_collector_service.py index 2b6658b..06183bb 100644 --- a/services/mcp_collector_service.py +++ b/services/mcp_collector_service.py @@ -153,6 +153,59 @@ class MCPCollectorService: if cached: return cached + # ─── Phase 10.5(2026-05-04):MCP omnisearch L0 路徑 ─── + # MCP_ROUTER_ENABLED=true 且 docker-compose.mcp.yml 已 deploy 時, + # 優先走 self-hosted Tavily/Exa(取代 Gemini Grounding 主路徑)。 + # 失敗自動 fallback 到既有 Gemini 鏈(向下相容)。 + try: + from services.mcp_router import mcp_router, is_mcp_router_enabled + if is_mcp_router_enabled(): + mcp_result = mcp_router.call( + server='omnisearch', + tool='tavily_search', + args={'query': query, 'max_results': 5}, + caller='mcp_collector', + ) + if mcp_result.success and mcp_result.data: + # tavily 回傳格式:{'results': [{'title', 'content', 'url'}, ...]} + results = mcp_result.data.get('results', []) + if results: + content_lines = [] + for r in results[:5]: + title = (r.get('title') or '').strip() + text_ = (r.get('content') or r.get('text') or '').strip()[:300] + if title and text_: + content_lines.append(f"【{title}】{text_}") + if content_lines: + content = "\n\n".join(content_lines) + if not self._looks_unreliable(content): + self._write_cache(topic, content) + logger.info("[MCP] omnisearch tavily 命中 topic=%s 取代 Gemini Grounding", topic) + return content + # omnisearch 失敗 / 結果太少 → 嘗試 exa 備援 + exa_result = mcp_router.call( + server='omnisearch', tool='exa_search', + args={'query': query, 'num_results': 5}, + caller='mcp_collector', + ) + if exa_result.success and exa_result.data: + results = exa_result.data.get('results', []) + if results: + content_lines = [ + f"【{r.get('title','')}】{(r.get('text') or '')[:300]}" + for r in results[:5] if r.get('title') + ] + if content_lines: + content = "\n\n".join(content_lines) + if not self._looks_unreliable(content): + self._write_cache(topic, content) + logger.info("[MCP] omnisearch exa 命中 topic=%s(tavily 失敗備援)", topic) + return content + logger.info("[MCP] omnisearch 全失敗,fallback Gemini Grounding") + except Exception as router_err: + logger.debug("[MCP] mcp_router 不可用 (預期 deploy 前): %s", router_err) + # ─── Phase 10.5 end,下方既有 Gemini Grounding 路徑保留為 fallback ─── + if not self._ensure_init(): return self._fallback_topic_content(topic, "GEMINI_API_KEY 未設定,使用本地行銷情報。") diff --git a/services/mcp_router.py b/services/mcp_router.py new file mode 100644 index 0000000..437b581 --- /dev/null +++ b/services/mcp_router.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +services/mcp_router.py +Operation Ollama-First v5.0 / Phase 10.5 — MCP 統一路由 + +設計原則(ADR-031): +1. 統一 HTTP client 對 self-hosted MCP stack(postgres / omnisearch / firecrawl / filesystem) +2. 所有 MCP call 雙寫 mcp_calls 表(含 cost_usd / cache_hit / status) +3. fail-safe:MCP server 不可達 → 回 None,caller 自決 fallback +4. feature flag MCP_ROUTER_ENABLED 預設 OFF + 啟用條件:docker-compose.mcp.yml 已 deploy + 4 個 health endpoint 200 + +部署位置(與 docker-compose.mcp.yml 對齊): + postgres-mcp: 127.0.0.1:3001 + firecrawl-self: 127.0.0.1:3002 + mcp-omnisearch: 127.0.0.1:3003 + filesystem-mcp: 127.0.0.1:3004 + +caller 介面(範例): + from services.mcp_router import mcp_router + result = mcp_router.call( + server='omnisearch', + tool='tavily_search', + args={'query': 'momo 母親節促銷'}, + caller='mcp_collector', + ) +""" + +from __future__ import annotations +import os +import time +import json +import logging +import hashlib +import threading +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +import requests + +logger = logging.getLogger(__name__) + +# ───────────────────────────────────────────────────────────────────────────── +# Feature flag + 配置 +# ───────────────────────────────────────────────────────────────────────────── +MCP_ROUTER_ENABLED = os.getenv('MCP_ROUTER_ENABLED', 'false').strip().lower() in ('true', '1', 'yes', 'on') + +MCP_BASE_HOSTS = { + 'postgres': os.getenv('MCP_POSTGRES_URL', 'http://127.0.0.1:3001'), + 'firecrawl': os.getenv('MCP_FIRECRAWL_URL', 'http://127.0.0.1:3002'), + 'omnisearch': os.getenv('MCP_OMNISEARCH_URL', 'http://127.0.0.1:3003'), + 'filesystem': os.getenv('MCP_FILESYSTEM_URL', 'http://127.0.0.1:3004'), +} + +MCP_DEFAULT_TIMEOUT = int(os.getenv('MCP_TIMEOUT_SEC', '30')) +MCP_CACHE_TTL_SEC = int(os.getenv('MCP_CACHE_TTL_SEC', '3600')) # 1h +MCP_MAX_RESULT_BYTES = int(os.getenv('MCP_MAX_RESULT_BYTES', '65536')) # 64KB + + +def is_mcp_router_enabled() -> bool: + """Runtime check(避免 import-time freeze)""" + return os.getenv('MCP_ROUTER_ENABLED', 'false').strip().lower() in ('true', '1', 'yes', 'on') + + +# ───────────────────────────────────────────────────────────────────────────── +# Tool 白名單(caller × server × tool)— 限制 LLM 不能亂打 MCP +# ───────────────────────────────────────────────────────────────────────────── +TOOL_REGISTRY: Dict[str, Dict[str, List[str]]] = { + # mcp_collector 取代 Gemini Grounding + 'mcp_collector': { + 'omnisearch': ['tavily_search', 'exa_search'], + 'firecrawl': ['scrape_url'], + }, + # Hermes 競品分析可查 DB + 抓網頁 + 'hermes_analyst': { + 'postgres': ['query'], + 'omnisearch': ['tavily_search'], + 'firecrawl': ['scrape_url'], + }, + # OpenClaw 戰略分析 + 'openclaw_strategist': { + 'postgres': ['query'], + 'omnisearch': ['tavily_search', 'exa_search'], + }, +} + + +def _is_tool_allowed(caller: str, server: str, tool: str) -> bool: + """白名單檢查;caller 不在 registry → 拒絕""" + return tool in TOOL_REGISTRY.get(caller, {}).get(server, []) + + +# ───────────────────────────────────────────────────────────────────────────── +# Cache(記憶體 + DB hash 指紋) +# ───────────────────────────────────────────────────────────────────────────── +_memory_cache: Dict[str, Dict[str, Any]] = {} +_cache_lock = threading.Lock() + + +def _cache_key(server: str, tool: str, args: Dict[str, Any]) -> str: + """穩定排序後 SHA256[:16]""" + payload = json.dumps({'s': server, 't': tool, 'a': args}, sort_keys=True, ensure_ascii=False) + return hashlib.sha256(payload.encode('utf-8')).hexdigest()[:16] + + +def _cache_get(key: str) -> Optional[Dict[str, Any]]: + with _cache_lock: + entry = _memory_cache.get(key) + if not entry: + return None + if time.time() - entry['ts'] > MCP_CACHE_TTL_SEC: + _memory_cache.pop(key, None) + return None + return entry['data'] + + +def _cache_set(key: str, data: Dict[str, Any]) -> None: + with _cache_lock: + _memory_cache[key] = {'data': data, 'ts': time.time()} + # 簡單 LRU:超 200 筆清最舊 + if len(_memory_cache) > 200: + oldest = min(_memory_cache.items(), key=lambda kv: kv[1]['ts']) + _memory_cache.pop(oldest[0], None) + + +# ───────────────────────────────────────────────────────────────────────────── +# 結果容器 +# ───────────────────────────────────────────────────────────────────────────── +@dataclass +class MCPResult: + success: bool + server: str + tool: str + data: Dict[str, Any] = field(default_factory=dict) + cache_hit: bool = False + duration_ms: int = 0 + cost_usd: float = 0.0 + error: Optional[str] = None + output_size: int = 0 + + +# ───────────────────────────────────────────────────────────────────────────── +# DB 寫入(fire-and-forget,與 ai_call_logger 同模式) +# ───────────────────────────────────────────────────────────────────────────── +def _async_write_mcp_call( + caller: str, + server: str, + tool: str, + args: Dict[str, Any], + result: MCPResult, + request_id: Optional[str] = None, +) -> None: + """寫 mcp_calls 表(async thread 不阻塞主流程)""" + def _writer(): + try: + from sqlalchemy import text as sa_text + from database.manager import get_session + session = get_session() + try: + # PII 保護:input_args 只存 hash + size,不存原文 + args_redacted = { + 'hash': hashlib.sha1( + json.dumps(args, sort_keys=True, ensure_ascii=False).encode('utf-8') + ).hexdigest()[:12], + 'keys': list(args.keys())[:10], + } + session.execute( + sa_text(""" + INSERT INTO mcp_calls ( + caller, server, tool, input_args, output_size, + duration_ms, status, error, cost_usd, cache_hit, request_id + ) VALUES ( + :caller, :server, :tool, CAST(:args AS JSONB), :osz, + :dur, :status, :err, :cost, :cache, :req + ) + """), + { + 'caller': caller, + 'server': server, + 'tool': tool, + 'args': json.dumps(args_redacted), + 'osz': result.output_size, + 'dur': result.duration_ms, + 'status': 'ok' if result.success else 'error', + 'err': (result.error or '')[:4000] if result.error else None, + 'cost': result.cost_usd, + 'cache': result.cache_hit, + 'req': request_id, + }, + ) + session.commit() + except Exception as exc: + session.rollback() + logger.debug(f"[MCPRouter] DB write failed: {exc}") + finally: + session.close() + except Exception: + pass # 永不影響主流程 + + threading.Thread(target=_writer, daemon=True).start() + + +# ───────────────────────────────────────────────────────────────────────────── +# MCPRouter 主類 +# ───────────────────────────────────────────────────────────────────────────── +class MCPRouter: + """統一 MCP 路由 — HTTP client + cache + DB log.""" + + def call( + self, + server: str, + tool: str, + args: Dict[str, Any], + caller: str = 'unknown', + timeout: Optional[int] = None, + request_id: Optional[str] = None, + ) -> MCPResult: + """主入口。flag OFF 時直接回 success=False(caller 自走 fallback)。""" + if not is_mcp_router_enabled(): + return MCPResult( + success=False, server=server, tool=tool, + error='MCP_ROUTER_ENABLED=false (戰役 v5.0 預設 OFF;待 docker-compose.mcp.yml deploy 後翻 ON)', + ) + + # 白名單檢查 + if not _is_tool_allowed(caller, server, tool): + return MCPResult( + success=False, server=server, tool=tool, + error=f'tool not in registry: caller={caller} server={server} tool={tool}', + ) + + # Server 配置檢查 + base_url = MCP_BASE_HOSTS.get(server) + if not base_url: + return MCPResult( + success=False, server=server, tool=tool, + error=f'unknown server: {server}', + ) + + # Cache 命中 + ckey = _cache_key(server, tool, args) + cached = _cache_get(ckey) + if cached is not None: + result = MCPResult( + success=True, server=server, tool=tool, + data=cached, cache_hit=True, duration_ms=0, + output_size=len(json.dumps(cached, ensure_ascii=False)), + ) + _async_write_mcp_call(caller, server, tool, args, result, request_id) + return result + + # HTTP call + url = f"{base_url.rstrip('/')}/tools/{tool}" + request_timeout = timeout or MCP_DEFAULT_TIMEOUT + t0 = time.monotonic() + + try: + resp = requests.post(url, json=args, timeout=request_timeout) + duration_ms = int((time.monotonic() - t0) * 1000) + + if resp.status_code != 200: + result = MCPResult( + success=False, server=server, tool=tool, + duration_ms=duration_ms, + error=f'HTTP {resp.status_code}: {resp.text[:200]}', + ) + _async_write_mcp_call(caller, server, tool, args, result, request_id) + return result + + data = resp.json() + output_text = json.dumps(data, ensure_ascii=False) + output_size = len(output_text.encode('utf-8')) + + # 大小護欄 + if output_size > MCP_MAX_RESULT_BYTES: + logger.warning(f"[MCPRouter] {server}/{tool} output {output_size} > {MCP_MAX_RESULT_BYTES} bytes; 截斷") + # 截 64KB(保留主結構,刪細節) + data = {'_truncated': True, '_original_bytes': output_size, 'preview': output_text[:MCP_MAX_RESULT_BYTES]} + + # Cache(成功才存) + _cache_set(ckey, data) + + result = MCPResult( + success=True, server=server, tool=tool, + data=data, cache_hit=False, + duration_ms=duration_ms, output_size=output_size, + ) + _async_write_mcp_call(caller, server, tool, args, result, request_id) + return result + + except requests.Timeout: + duration_ms = int((time.monotonic() - t0) * 1000) + result = MCPResult( + success=False, server=server, tool=tool, + duration_ms=duration_ms, error=f'timeout ({request_timeout}s)', + ) + _async_write_mcp_call(caller, server, tool, args, result, request_id) + return result + + except Exception as exc: + duration_ms = int((time.monotonic() - t0) * 1000) + result = MCPResult( + success=False, server=server, tool=tool, + duration_ms=duration_ms, + error=f'{type(exc).__name__}: {str(exc)[:300]}', + ) + _async_write_mcp_call(caller, server, tool, args, result, request_id) + return result + + def health_check(self) -> Dict[str, bool]: + """檢查 4 個 server 健康度(給排程或 admin endpoint 用)""" + results = {} + for server, base_url in MCP_BASE_HOSTS.items(): + try: + resp = requests.get(f"{base_url.rstrip('/')}/health", timeout=3) + results[server] = resp.status_code == 200 + except Exception: + results[server] = False + return results + + +# 全域單例 +mcp_router = MCPRouter() + + +__all__ = [ + 'MCPRouter', + 'MCPResult', + 'mcp_router', + 'is_mcp_router_enabled', + 'TOOL_REGISTRY', + 'MCP_BASE_HOSTS', +] diff --git a/tests/test_mcp_router.py b/tests/test_mcp_router.py new file mode 100644 index 0000000..7637720 --- /dev/null +++ b/tests/test_mcp_router.py @@ -0,0 +1,191 @@ +""" +tests/test_mcp_router.py +───────────────────────────────────────────────────────────────── +Operation Ollama-First v5.0 / Phase 10.5 — MCP Router unit tests + +驗證面: + T1. flag OFF → 直接回 success=False(不打 HTTP) + T2. flag ON + tool 不在白名單 → 回 success=False + T3. flag ON + 正常 → HTTP call + cache 命中第二次 + T4. flag ON + HTTP timeout → success=False + error 含 timeout + T5. flag ON + HTTP 500 → success=False + error 含 HTTP 500 + T6. cache key 排序穩定(相同 args 不同 dict 順序 → 同 key) + T7. health_check 全失敗時所有 server 回 False + +紀律: + - 不打真實 MCP server(全 mock requests.post / requests.get) + - 不寫 mcp_calls 表(fire-and-forget thread 不影響 test) +""" + +import json +from unittest.mock import patch, MagicMock + +import pytest + + +@pytest.fixture(autouse=True) +def _reset_state(monkeypatch): + """每 test 清 env + memory cache""" + monkeypatch.delenv('MCP_ROUTER_ENABLED', raising=False) + import services.mcp_router as mr + mr._memory_cache.clear() + yield + mr._memory_cache.clear() + + +# ═══════════════════════════════════════════════════════════════════════════ +# T1: flag OFF +# ═══════════════════════════════════════════════════════════════════════════ + +def test_flag_off_returns_failure_without_http(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'false') + from services.mcp_router import mcp_router + + with patch('services.mcp_router.requests.post') as mock_post: + result = mcp_router.call( + server='omnisearch', tool='tavily_search', + args={'query': 'test'}, caller='mcp_collector', + ) + + assert result.success is False + assert 'MCP_ROUTER_ENABLED=false' in (result.error or '') + mock_post.assert_not_called() + + +# ═══════════════════════════════════════════════════════════════════════════ +# T2: flag ON + tool 不在白名單 +# ═══════════════════════════════════════════════════════════════════════════ + +def test_unauthorized_caller_tool_combo_rejected(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true') + from services.mcp_router import mcp_router + + with patch('services.mcp_router.requests.post') as mock_post: + # mcp_collector 不允許 postgres + result = mcp_router.call( + server='postgres', tool='query', + args={'sql': 'SELECT 1'}, caller='mcp_collector', + ) + + assert result.success is False + assert 'tool not in registry' in (result.error or '') + mock_post.assert_not_called() + + +# ═══════════════════════════════════════════════════════════════════════════ +# T3: flag ON + 正常 + cache +# ═══════════════════════════════════════════════════════════════════════════ + +def test_successful_call_and_cache_hit(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true') + from services.mcp_router import mcp_router + + fake_resp = MagicMock(status_code=200) + fake_resp.json.return_value = {'results': [{'title': '商品 A', 'content': '熱銷'}]} + fake_resp.text = '{"ok": true}' + + with patch('services.mcp_router.requests.post', return_value=fake_resp) as mock_post: + # 第一次:HTTP call + r1 = mcp_router.call( + server='omnisearch', tool='tavily_search', + args={'query': '母親節'}, caller='mcp_collector', + ) + assert r1.success is True + assert r1.cache_hit is False + assert r1.data['results'][0]['title'] == '商品 A' + assert mock_post.call_count == 1 + + # 第二次:cache 命中 + r2 = mcp_router.call( + server='omnisearch', tool='tavily_search', + args={'query': '母親節'}, caller='mcp_collector', + ) + assert r2.success is True + assert r2.cache_hit is True + assert mock_post.call_count == 1 # 沒再打 HTTP + + +# ═══════════════════════════════════════════════════════════════════════════ +# T4: HTTP timeout +# ═══════════════════════════════════════════════════════════════════════════ + +def test_http_timeout_returns_failure(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true') + import requests as _r + from services.mcp_router import mcp_router + + with patch('services.mcp_router.requests.post', side_effect=_r.Timeout('30s')): + result = mcp_router.call( + server='omnisearch', tool='tavily_search', + args={'query': 'x'}, caller='mcp_collector', + ) + + assert result.success is False + assert 'timeout' in (result.error or '').lower() + + +# ═══════════════════════════════════════════════════════════════════════════ +# T5: HTTP 500 +# ═══════════════════════════════════════════════════════════════════════════ + +def test_http_500_returns_failure(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true') + from services.mcp_router import mcp_router + + fake_resp = MagicMock(status_code=500) + fake_resp.text = 'Internal Server Error' + + with patch('services.mcp_router.requests.post', return_value=fake_resp): + result = mcp_router.call( + server='omnisearch', tool='tavily_search', + args={'query': 'x'}, caller='mcp_collector', + ) + + assert result.success is False + assert 'HTTP 500' in (result.error or '') + + +# ═══════════════════════════════════════════════════════════════════════════ +# T6: cache key 穩定性 +# ═══════════════════════════════════════════════════════════════════════════ + +def test_cache_key_sort_stable(): + from services.mcp_router import _cache_key + k1 = _cache_key('omnisearch', 'tavily_search', {'a': 1, 'b': 2, 'c': 3}) + k2 = _cache_key('omnisearch', 'tavily_search', {'c': 3, 'a': 1, 'b': 2}) + assert k1 == k2 # dict 順序不影響 cache key + + +# ═══════════════════════════════════════════════════════════════════════════ +# T7: health_check 全失敗 +# ═══════════════════════════════════════════════════════════════════════════ + +def test_health_check_all_fail_returns_all_false(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true') + from services.mcp_router import mcp_router + + with patch('services.mcp_router.requests.get', side_effect=Exception('connection refused')): + results = mcp_router.health_check() + + assert results == {'postgres': False, 'firecrawl': False, + 'omnisearch': False, 'filesystem': False} + + +# ═══════════════════════════════════════════════════════════════════════════ +# T8: 未知 server 直接拒絕 +# ═══════════════════════════════════════════════════════════════════════════ + +def test_unknown_server_rejected(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true') + from services.mcp_router import mcp_router + + with patch('services.mcp_router.requests.post') as mock_post: + # 統帥的 caller 在 registry,但 server 名拼錯 + result = mcp_router.call( + server='omnisearch_wrong', tool='tavily_search', + args={'query': 'x'}, caller='mcp_collector', + ) + + # 注意:白名單檢查在前,會先回 'tool not in registry'(因為 omnisearch_wrong 不在 registry) + assert result.success is False + mock_post.assert_not_called()