feat(awooop): route sense mcp through gateway
Some checks failed
Code Review / ai-code-review (push) Successful in 10s
run-migration / migrate (push) Failing after 8s
CD Pipeline / tests (push) Successful in 1m14s
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled

This commit is contained in:
Your Name
2026-05-13 09:46:12 +08:00
parent 5ecd21e664
commit 57ed07d1d0
6 changed files with 492 additions and 7 deletions

View File

@@ -0,0 +1,209 @@
-- T7: awoooi read-only MCP Gateway seed
-- Purpose: let pre-decision sensing MCP calls pass AwoooP Gateway Gate 2/3 and produce a first-class audit.
-- Boundary: only the read scope is granted; write/admin tools such as restart/delete/scale/apply/rollback are not granted.

-- Statement 1: publish a v1.0 agent contract for each seeded agent and point the
-- active-revision table at it.
WITH seed_agents (agent_id, display_name) AS (
    VALUES
        ('pre_decision_investigator', 'Pre-decision Investigator'),
        ('post_execution_verifier', 'Post-execution Verifier')
),

-- One contract document per seeded agent.
contract_docs AS (
    SELECT
        sa.agent_id,
        jsonb_build_object(
            'schema_version', 'awooop_agent_contract_v1',
            'agent_id', sa.agent_id,
            'display_name', sa.display_name,
            'project_id', 'awoooi',
            'purpose', 'Read-only MCP sensing through AwoooP Gateway',
            'allowed_scopes', '["read"]'::jsonb,
            'forbidden_scopes', '["write", "admin"]'::jsonb,
            'stage', 't7_mcp_gateway_read_sense'
        ) AS body_json
    FROM seed_agents AS sa
),

-- Insert the revision rows; a revision that already exists under the same
-- (project, family, contract, major, minor) key is left untouched.
new_revisions AS (
    INSERT INTO awooop_contract_revisions (
        project_id,
        contract_family,
        contract_id,
        version_major,
        version_minor,
        lifecycle_status,
        body_json,
        body_hash,
        body_schema_version,
        publisher_id,
        published_at
    )
    SELECT
        'awoooi',
        'agent',
        cd.agent_id,
        1,
        0,
        'active',
        cd.body_json,
        -- hex SHA-256 of the textual form of the contract body
        encode(digest(CAST(cd.body_json AS text), 'sha256'), 'hex'),
        'v1.0',
        'migration:t7_mcp_gateway_read_seed',
        NOW()
    FROM contract_docs AS cd
    ON CONFLICT (project_id, contract_family, contract_id, version_major, version_minor)
    DO NOTHING
    RETURNING revision_id, project_id, contract_family, contract_id
),

-- Revisions to activate: the rows just inserted, plus matching v1.0 rows that
-- were already stored with lifecycle_status = 'active'.
target_revisions AS (
    SELECT
        nr.revision_id,
        nr.project_id,
        nr.contract_family,
        nr.contract_id
    FROM new_revisions AS nr

    UNION ALL

    SELECT
        rev.revision_id,
        rev.project_id,
        rev.contract_family,
        rev.contract_id
    FROM awooop_contract_revisions AS rev
    WHERE rev.project_id = 'awoooi'
      AND rev.contract_family = 'agent'
      AND rev.version_major = 1
      AND rev.version_minor = 0
      AND rev.lifecycle_status = 'active'
      AND EXISTS (
          SELECT 1
          FROM seed_agents AS sa
          WHERE sa.agent_id = rev.contract_id
      )
),

-- Create or move the active pointer; DISTINCT ON keeps exactly one candidate
-- revision (the lowest revision_id) per contract.
pointer_upsert AS (
    INSERT INTO awooop_active_revisions (
        project_id,
        contract_family,
        contract_id,
        active_revision_id,
        updated_at
    )
    SELECT DISTINCT ON (tr.project_id, tr.contract_family, tr.contract_id)
        tr.project_id,
        tr.contract_family,
        tr.contract_id,
        tr.revision_id,
        NOW()
    FROM target_revisions AS tr
    ORDER BY tr.project_id, tr.contract_family, tr.contract_id, tr.revision_id
    ON CONFLICT (project_id, contract_family, contract_id)
    DO UPDATE SET
        active_revision_id = EXCLUDED.active_revision_id,
        updated_at = NOW()
    RETURNING contract_id
)

-- Report how many active pointers were written.
SELECT 'active_agent_contracts', count(*) FROM pointer_upsert;
-- Statement 2: register the read-only MCP tools for project 'awoooi' and grant
-- them (read scope only) to the two seeded agents.
WITH tool_catalog (tool_name, description) AS (
    VALUES
        ('k8s_get_pod_logs', 'Kubernetes pod logs read'),
        ('k8s_get_events', 'Kubernetes events read'),
        ('k8s_describe_pod', 'Kubernetes pod describe read'),
        ('k8s_get_hpa_status', 'Kubernetes HPA status read'),
        ('k8s_get_node_conditions', 'Kubernetes node conditions read'),
        ('ssh_diagnose', 'SSH host diagnosis read'),
        ('ssh_get_top_processes', 'SSH top processes read'),
        ('ssh_get_disk_usage', 'SSH disk usage read'),
        ('ssh_get_memory_info', 'SSH memory info read'),
        ('ssh_get_container_logs', 'SSH container logs read'),
        ('ssh_get_container_status', 'SSH container status read'),
        ('ssh_get_service_status', 'SSH service status read'),
        ('ssh_check_port', 'SSH port check read'),
        ('ssh_get_nginx_error_log', 'SSH nginx error log read'),
        ('ssh_get_swap_info', 'SSH swap info read'),
        ('prometheus_query', 'Prometheus instant query read'),
        ('prometheus_query_range', 'Prometheus range query read'),
        ('prometheus_get_alert_history', 'Prometheus alert history read'),
        ('gold_metrics', 'SigNoz gold metrics read'),
        ('trace_url', 'SigNoz trace URL read'),
        ('system_metrics', 'SigNoz system metrics read'),
        ('query_logs', 'SigNoz logs read'),
        ('error_logs_summary', 'SigNoz error logs summary read'),
        ('list_approvals', 'Approval records read'),
        ('get_approval', 'Approval detail read'),
        ('list_incidents', 'Incident records read'),
        ('list_timeline', 'Timeline records read'),
        ('read_file', 'Filesystem allowlisted file read'),
        ('list_directory', 'Filesystem allowlisted directory read'),
        ('search_in_file', 'Filesystem allowlisted file search'),
        ('list_dashboards', 'Grafana dashboards read'),
        ('get_dashboard', 'Grafana dashboard read'),
        ('get_panel_data', 'Grafana panel data read'),
        ('generate_dashboard_url', 'Grafana dashboard URL read'),
        ('search_runbook', 'Runbook semantic search read'),
        ('get_index_stats', 'Runbook index stats read'),
        ('argocd_list_apps', 'ArgoCD apps read'),
        ('argocd_get_app_status', 'ArgoCD app status read'),
        ('argocd_get_sync_history', 'ArgoCD sync history read'),
        ('sentry_list_issues', 'Sentry issues read'),
        ('sentry_get_issue', 'Sentry issue detail read'),
        ('sentry_search_issues', 'Sentry issue search read')
),

-- Insert each tool, or refresh and re-activate the row if the
-- (project_id, tool_name) pair is already registered.
registered_tools AS (
    INSERT INTO awooop_mcp_tool_registry (
        project_id,
        tool_name,
        tool_type,
        description,
        allowed_scopes,
        environment_tags,
        is_active,
        updated_at
    )
    SELECT
        'awoooi',
        tc.tool_name,
        'mcp_server',
        tc.description,
        jsonb_build_array('read'),
        jsonb_build_object('env', 'prod'),
        TRUE,
        NOW()
    FROM tool_catalog AS tc
    ON CONFLICT (project_id, tool_name)
    DO UPDATE SET
        description = EXCLUDED.description,
        allowed_scopes = EXCLUDED.allowed_scopes,
        environment_tags = EXCLUDED.environment_tags,
        is_active = TRUE,
        updated_at = NOW()
    RETURNING tool_id
),

-- Agents that receive a grant for every registered tool.
grantee_agents (agent_id) AS (
    VALUES
        ('pre_decision_investigator'),
        ('post_execution_verifier')
),

-- One grant per (agent, tool). An existing grant is reset to a non-revoked,
-- non-expiring read grant; its granted_by value is not overwritten.
applied_grants AS (
    INSERT INTO awooop_mcp_grants (
        project_id,
        agent_id,
        tool_id,
        granted_by,
        granted_scopes,
        expires_at,
        is_revoked,
        revoked_at,
        revoked_by
    )
    SELECT
        'awoooi',
        ga.agent_id,
        rt.tool_id,
        'migration:t7_mcp_gateway_read_seed',
        jsonb_build_array('read'),
        NULL,
        FALSE,
        NULL,
        NULL
    FROM registered_tools AS rt
    CROSS JOIN grantee_agents AS ga
    ON CONFLICT (project_id, agent_id, tool_id)
    DO UPDATE SET
        granted_scopes = EXCLUDED.granted_scopes,
        expires_at = NULL,
        is_revoked = FALSE,
        revoked_at = NULL,
        revoked_by = NULL
    RETURNING grant_id
)

-- Report how many tool rows and grant rows were written.
SELECT
    'awoooi_read_tools',
    (SELECT count(*) FROM registered_tools) AS tool_rows,
    (SELECT count(*) FROM applied_grants) AS grant_rows;

View File

@@ -0,0 +1,75 @@
-- Rollback for T7 awoooi read-only MCP Gateway seed.
-- Contract revisions are append-only; rollback revokes grants and deactivates the seeded read tools.
-- Revision rows are never deleted: the active pointers are removed and the
-- seeded revisions are marked 'revoked'.

-- 1. Revoke the still-active grants that the forward migration created.
UPDATE awooop_mcp_grants
SET
    is_revoked = TRUE,
    revoked_at = NOW(),
    revoked_by = 'rollback:t7_mcp_gateway_read_seed'
WHERE project_id = 'awoooi'
  AND agent_id IN ('pre_decision_investigator', 'post_execution_verifier')
  AND granted_by = 'migration:t7_mcp_gateway_read_seed'
  AND is_revoked = FALSE;

-- 2. Deactivate the seeded read tools. Rows are matched by project and tool
--    name only, so a tool registered before the forward migration is
--    deactivated as well.
UPDATE awooop_mcp_tool_registry
SET
    is_active = FALSE,
    updated_at = NOW()
WHERE project_id = 'awoooi'
  AND tool_name IN (
      'k8s_get_pod_logs',
      'k8s_get_events',
      'k8s_describe_pod',
      'k8s_get_hpa_status',
      'k8s_get_node_conditions',
      'ssh_diagnose',
      'ssh_get_top_processes',
      'ssh_get_disk_usage',
      'ssh_get_memory_info',
      'ssh_get_container_logs',
      'ssh_get_container_status',
      'ssh_get_service_status',
      'ssh_check_port',
      'ssh_get_nginx_error_log',
      'ssh_get_swap_info',
      'prometheus_query',
      'prometheus_query_range',
      'prometheus_get_alert_history',
      'gold_metrics',
      'trace_url',
      'system_metrics',
      'query_logs',
      'error_logs_summary',
      'list_approvals',
      'get_approval',
      'list_incidents',
      'list_timeline',
      'read_file',
      'list_directory',
      'search_in_file',
      'list_dashboards',
      'get_dashboard',
      'get_panel_data',
      'generate_dashboard_url',
      'search_runbook',
      'get_index_stats',
      'argocd_list_apps',
      'argocd_get_app_status',
      'argocd_get_sync_history',
      'sentry_list_issues',
      'sentry_get_issue',
      'sentry_search_issues'
  );

-- 3. Remove the active pointers, but only those that still reference a
--    revision published by this migration. A pointer that has since moved to
--    a revision from another publisher is left in place, matching the
--    provenance scoping of steps 1 and 4.
DELETE FROM awooop_active_revisions AS ptr
WHERE ptr.project_id = 'awoooi'
  AND ptr.contract_family = 'agent'
  AND ptr.contract_id IN ('pre_decision_investigator', 'post_execution_verifier')
  AND EXISTS (
      SELECT 1
      FROM awooop_contract_revisions AS rev
      WHERE rev.revision_id = ptr.active_revision_id
        AND rev.publisher_id = 'migration:t7_mcp_gateway_read_seed'
  );

-- 4. Mark the revisions published by this migration as revoked.
UPDATE awooop_contract_revisions
SET lifecycle_status = 'revoked'
WHERE project_id = 'awoooi'
  AND contract_family = 'agent'
  AND contract_id IN ('pre_decision_investigator', 'post_execution_verifier')
  AND publisher_id = 'migration:t7_mcp_gateway_read_seed'
  AND lifecycle_status = 'active';

View File

@@ -388,10 +388,7 @@ class McpGateway:
parameters: dict[str, Any],
) -> MCPToolResult:
"""呼叫底層 MCP provider 執行工具"""
registry = get_provider_registry()
provider = registry.get(ctx.tool_name) or registry.get(
tool_row.tool_name if tool_row else ctx.tool_name
)
provider = await self._resolve_provider(ctx, tool_row)
# 找不到 provider → 回傳 shadow no-op
if provider is None:
@@ -407,15 +404,57 @@ class McpGateway:
)
audit_params = dict(parameters)
existing_audit = (
parameters.get("_mcp_audit")
if isinstance(parameters, dict) and isinstance(parameters.get("_mcp_audit"), dict)
else {}
)
audit_params["_mcp_audit"] = {
"project_id": ctx.project_id,
"agent_id": ctx.agent_id,
"run_id": str(ctx.run_id) if ctx.run_id else None,
"trace_id": ctx.trace_id,
"incident_id": existing_audit.get("incident_id") or ctx.trace_id,
"session_id": existing_audit.get("session_id"),
"flywheel_node": existing_audit.get("flywheel_node"),
"agent_role": existing_audit.get("agent_role") or ctx.agent_id,
"gateway_path": "awooop_mcp_gateway",
}
return await provider.execute(ctx.tool_name, audit_params)
async def _resolve_provider(
    self,
    ctx: GatewayContext,
    tool_row: AwoooPMcpToolRegistry | None,
):
    """Return the provider that owns the requested tool, or None.

    ProviderRegistry is keyed by provider name (`kubernetes`, `ssh_host`, ...),
    whereas GatewayContext carries the governed tool name (`kubectl_get`,
    `ssh_diagnose`, ...). A direct registry lookup by ``ctx.tool_name`` is
    tried first; if that misses, each provider's tool manifest is scanned as
    a compatibility bridge until the registry exposes a first-class tool
    index.

    Args:
        ctx: Gateway call context; ``ctx.tool_name`` is the direct lookup key.
        tool_row: Registry row for the tool, if any. When truthy, its
            ``tool_name`` is the name searched for in provider manifests;
            otherwise ``ctx.tool_name`` is used.

    Returns:
        The matching provider, or None when no provider lists the tool.
    """
    providers = get_provider_registry()

    by_key = providers.get(ctx.tool_name)
    if by_key is not None:
        return by_key

    wanted = ctx.tool_name if not tool_row else tool_row.tool_name
    for candidate in providers.all():
        try:
            manifest = await candidate.list_tools()
        except Exception as exc:
            # A provider whose manifest cannot be listed is skipped rather
            # than failing the whole resolution.
            logger.debug(
                "mcp_gateway_provider_manifest_skipped",
                provider=getattr(candidate, "name", None),
                tool_name=wanted,
                error=str(exc),
            )
            continue
        for entry in manifest:
            if entry.name == wanted:
                return candidate
    return None
# ── Audit log ─────────────────────────────────────────────────────────────
async def _write_audit(
@@ -443,6 +482,15 @@ class McpGateway:
json.dumps(result.output, sort_keys=True, default=str).encode()
).hexdigest()
gate_payload = {
**gate_result.as_dict(),
"schema_version": "awooop_mcp_gateway_audit_v1",
"gateway_path": "awooop_mcp_gateway",
"policy_enforced": True,
"is_shadow": ctx.is_shadow,
"required_scope": ctx.required_scope,
}
audit = AwoooPMcpGatewayAudit(
project_id=ctx.project_id,
run_id=ctx.run_id,
@@ -452,7 +500,7 @@ class McpGateway:
tool_name=ctx.tool_name,
input_hash=input_hash,
output_hash=output_hash,
gate_result=gate_result.as_dict(),
gate_result=gate_payload,
result_status=result_status,
block_gate=block_gate,
block_reason=block_reason,

View File

@@ -32,6 +32,9 @@ from typing import TYPE_CHECKING, Any
import structlog
from src.db.base import get_db_context
from src.plugins.mcp.gateway import GatewayContext, McpGateway
from src.plugins.mcp.registry import AuditedMCPToolProvider
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.mcp_audit_context import with_mcp_audit_context
from src.services.mcp_tool_registry import RegisteredTool, SensorDimension, get_mcp_tool_registry
@@ -332,7 +335,7 @@ class PreDecisionInvestigator:
agent_role="pre_decision_investigator",
)
result = await asyncio.wait_for(
reg.provider.execute(tool_name, audited_params),
self._execute_tool(reg, tool_name, audited_params, snapshot.incident_id),
timeout=MCP_TOOL_TIMEOUT_SEC,
)
@@ -376,6 +379,34 @@ class PreDecisionInvestigator:
except Exception:
pass
async def _execute_tool(
    self,
    reg: RegisteredTool,
    tool_name: str,
    audited_params: dict[str, Any],
    incident_id: str,
):
    """Execute a sensing tool, going through the AwoooP MCP Gateway when audited.

    Registry entries whose provider is an `AuditedMCPToolProvider` are sent
    through `McpGateway.call` so that a first-class gateway audit row is
    written instead of only a legacy bridge row. Any other provider (tests,
    manual injections) is executed directly.

    Args:
        reg: Registered tool whose ``provider`` decides the route.
        tool_name: Name of the tool to execute.
        audited_params: Tool parameters, already carrying audit context.
        incident_id: Incident identifier, used as the gateway ``trace_id``.

    Returns:
        Whatever the provider or the gateway call returns.
    """
    provider = reg.provider
    if isinstance(provider, AuditedMCPToolProvider):
        async with get_db_context("awoooi") as db:
            gateway_ctx = GatewayContext(
                project_id="awoooi",
                agent_id="pre_decision_investigator",
                tool_name=tool_name,
                trace_id=incident_id,
                is_shadow=True,
                environment={"env": "prod"},
                required_scope="read",
            )
            gateway = McpGateway(db)
            return await gateway.call(gateway_ctx, audited_params)

    # Raw provider: call it directly, bypassing the gateway.
    return await provider.execute(tool_name, audited_params)
async def _log_mcp_call_to_timeline(
snapshot_incident_id: str | None,

View File

@@ -1,10 +1,13 @@
from __future__ import annotations
import uuid
from types import SimpleNamespace
import pytest
from src.plugins.mcp import gateway as gateway_module
from src.plugins.mcp.gateway import GateCheckResult, GatewayContext, McpGateway
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
class FakeDb:
@@ -51,3 +54,72 @@ async def test_write_audit_persists_blocked_gate_without_tool_row() -> None:
assert audit.tool_name == "missing_tool"
assert audit.result_status == "blocked"
assert audit.block_gate == 1
assert audit.gate_result["schema_version"] == "awooop_mcp_gateway_audit_v1"
assert audit.gate_result["gateway_path"] == "awooop_mcp_gateway"
assert audit.gate_result["policy_enforced"] is True
class FakeProvider(MCPToolProvider):
    """Test double: a provider named ``kubernetes`` exposing one tool, ``kubectl_get``."""

    def __init__(self) -> None:
        # Every execute() invocation is recorded as (tool_name, parameters).
        self.calls: list[tuple[str, dict]] = []

    @property
    def name(self) -> str:
        return "kubernetes"

    async def list_tools(self) -> list[MCPTool]:
        """Return the single-entry tool manifest."""
        manifest_entry = MCPTool(name="kubectl_get", description="", input_schema={})
        return [manifest_entry]

    async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult:
        """Record the call and return a canned successful result."""
        self.calls.append((tool_name, parameters))
        canned = MCPToolResult(success=True, execution_id="provider-ok", output={"ok": True})
        return canned
class FakeProviderRegistry:
    """Test double for the provider registry.

    Lookups by key always miss, so the only way to reach the wrapped provider
    is through ``all()``.
    """

    def __init__(self, provider: FakeProvider) -> None:
        self.provider = provider

    def get(self, _name: str):
        """Always report that no provider is registered under the given key."""
        return None

    def all(self) -> list[FakeProvider]:
        """Return a fresh list holding the single wrapped provider."""
        providers = [self.provider]
        return providers
@pytest.mark.asyncio
async def test_execute_tool_resolves_provider_by_tool_manifest(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The gateway finds the provider through its tool manifest and forwards audit context."""
    fake_provider = FakeProvider()

    def _registry_factory() -> FakeProviderRegistry:
        return FakeProviderRegistry(fake_provider)

    monkeypatch.setattr(gateway_module, "get_provider_registry", _registry_factory)

    gateway_ctx = GatewayContext(
        project_id="awoooi",
        agent_id="pre_decision_investigator",
        tool_name="kubectl_get",
        trace_id="INC-GW",
    )
    tool_row = SimpleNamespace(tool_name="kubectl_get")
    call_parameters = {
        "namespace": "awoooi-prod",
        "_mcp_audit": {
            "incident_id": "INC-GW",
            "session_id": "incident:INC-GW:pre_decision",
            "flywheel_node": "sense",
            "agent_role": "pre_decision_investigator",
        },
    }

    result = await McpGateway(FakeDb())._execute_tool(gateway_ctx, tool_row, call_parameters)

    assert result.success is True
    assert fake_provider.calls

    # The provider is invoked with the governed tool name and audit metadata.
    seen_tool_name, seen_parameters = fake_provider.calls[0]
    forwarded_audit = seen_parameters["_mcp_audit"]
    assert seen_tool_name == "kubectl_get"
    assert forwarded_audit["gateway_path"] == "awooop_mcp_gateway"
    assert forwarded_audit["incident_id"] == "INC-GW"
    assert forwarded_audit["flywheel_node"] == "sense"

View File

@@ -20,12 +20,14 @@ ADR-081: Phase 1 決策前情報調查員
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
from src.plugins.mcp.registry import AuditedMCPToolProvider
from src.services import pre_decision_investigator as pdi_module
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.mcp_tool_registry import (
MCPToolRegistry,
RegisteredTool,
SensorDimension,
)
@@ -103,6 +105,14 @@ class _CaptureProvider(_SuccessProvider):
return await super().execute(tool_name, parameters)
class _DbContext:
async def __aenter__(self) -> object:
return object()
async def __aexit__(self, *_args: object) -> None:
return None
def _stub_incident(
alertname: str = "KubePodCrashLooping",
namespace: str = "awoooi-prod",
@@ -296,6 +306,46 @@ class TestCollectOne:
assert audit_context["flywheel_node"] == "sense"
assert audit_context["agent_role"] == "pre_decision_investigator"
@pytest.mark.asyncio
async def test_collect_one_routes_audited_provider_through_gateway(
    self,
    monkeypatch: pytest.MonkeyPatch,
):
    """An AuditedMCPToolProvider is called via McpGateway, not executed directly."""
    gateway_calls: list[dict[str, Any]] = []

    class FakeGateway:
        """Records each call() and returns a canned successful result."""

        def __init__(self, db: object) -> None:
            self.db = db

        async def call(self, ctx, parameters: dict[str, Any]) -> MCPToolResult:
            gateway_calls.append({"ctx": ctx, "parameters": parameters, "db": self.db})
            return MCPToolResult(success=True, execution_id="gw", output={"status": "Running"})

    investigator = PreDecisionInvestigator()
    snap = EvidenceSnapshot(incident_id="INC-GATEWAY")
    inner_provider = _CaptureProvider()
    registered = _reg(
        "kubectl_describe",
        AuditedMCPToolProvider(inner_provider),
        SensorDimension.D1_K8S_STATE,
    )

    monkeypatch.setattr(pdi_module, "get_db_context", lambda _project_id: _DbContext())
    monkeypatch.setattr(pdi_module, "McpGateway", FakeGateway)

    await investigator._collect_one(snap, registered, {"namespace": "prod"})

    assert snap.mcp_health["kubectl_describe"] is True
    assert snap.k8s_state is not None
    # The wrapped provider must not have been executed directly.
    assert inner_provider.seen_parameters is None
    assert gateway_calls

    first_call = gateway_calls[0]
    gateway_ctx = first_call["ctx"]
    assert gateway_ctx.project_id == "awoooi"
    assert gateway_ctx.agent_id == "pre_decision_investigator"
    assert gateway_ctx.tool_name == "kubectl_describe"
    assert gateway_ctx.trace_id == "INC-GATEWAY"
    assert gateway_ctx.required_scope == "read"
    assert first_call["parameters"]["_mcp_audit"]["incident_id"] == "INC-GATEWAY"
@pytest.mark.asyncio
async def test_failed_tool_marks_health_false(self):
investigator = PreDecisionInvestigator()