diff --git a/services/mcp_router.py b/services/mcp_router.py index b99315f..9e93d4d 100644 --- a/services/mcp_router.py +++ b/services/mcp_router.py @@ -66,6 +66,16 @@ def is_mcp_router_enabled() -> bool: # ───────────────────────────────────────────────────────────────────────────── # Tool 白名單(caller × server × tool)— 限制 LLM 不能亂打 MCP # ───────────────────────────────────────────────────────────────────────────── +READONLY_FILESYSTEM_TOOLS = [ + 'list_allowed_directories', + 'list_directory', + 'directory_tree', + 'read_file', + 'read_multiple_files', + 'search_files', + 'get_file_info', +] + TOOL_REGISTRY: Dict[str, Dict[str, List[str]]] = { # mcp_collector 取代 Gemini Grounding 'mcp_collector': { @@ -83,6 +93,10 @@ TOOL_REGISTRY: Dict[str, Dict[str, List[str]]] = { 'postgres': ['query'], 'omnisearch': ['tavily_search', 'exa_search'], }, + # filesystem-mcp 僅掛載 /data、/logs read-only;保留給診斷工具讀檔,不開寫入類工具。 + 'ops_diagnostics': { + 'filesystem': READONLY_FILESYSTEM_TOOLS, + }, } diff --git a/tests/test_mcp_router.py b/tests/test_mcp_router.py index 7637720..cd0b482 100644 --- a/tests/test_mcp_router.py +++ b/tests/test_mcp_router.py @@ -72,6 +72,31 @@ def test_unauthorized_caller_tool_combo_rejected(monkeypatch): mock_post.assert_not_called() +def test_filesystem_registry_is_read_only(monkeypatch): + monkeypatch.setenv('MCP_ROUTER_ENABLED', 'true') + from services.mcp_router import mcp_router + + fake_resp = MagicMock(status_code=200) + fake_resp.json.return_value = {'content': 'ok'} + fake_resp.text = '{"content": "ok"}' + + with patch('services.mcp_router.requests.post', return_value=fake_resp) as mock_post: + allowed = mcp_router.call( + server='filesystem', tool='read_file', + args={'path': '/logs/system.log'}, caller='ops_diagnostics', + ) + denied = mcp_router.call( + server='filesystem', tool='write_file', + args={'path': '/data/x', 'content': 'nope'}, caller='ops_diagnostics', + ) + + assert allowed.success is True + assert denied.success is False + assert 'tool not in registry' in (denied.error or '') + mock_post.assert_called_once() + assert mock_post.call_args.args[0].endswith('/tools/read_file') + + # ═══════════════════════════════════════════════════════════════════════════ # T3: flag ON + 正常 + cache # ═══════════════════════════════════════════════════════════════════════════