Files
awoooi/apps/api/src/plugins/mcp/gateway.py
Your Name 8629ac709b
Some checks failed
run-migration / migrate (push) Failing after 59s
Code Review / ai-code-review (push) Successful in 1m8s
Type Sync Check / check-type-sync (push) Successful in 2m27s
feat(awooop): Phase 1-8 完整實作 — AwoooP Agent Platform 六平面架構
## Phase 1-3: Control Plane + Contract System
- awooop_phase1_control_plane_2026-05-04.sql: 12 張核心表 + RLS
- awooop_phase1_batch1_rls_2026-05-04.sql: 全部 FORCE RLS + GRANT
- packages/awooop-contracts/: 六合約 JSON Schema + golden fixtures
- src/models/awooop_contracts.py: Pydantic v2 contract models(extra=forbid)
- src/repositories/contract_repository.py: contract lifecycle(draft→published→active)
- src/services/contract_service.py: HMAC publish sig + Redis multi-sig activate
- src/services/schema_validator.py: LLM output validator(retry×3, E-SCHEMA-001)

## Phase 2: Tenant Isolation
- awooop_phase2_budget_ledger_2026-05-04.sql: budget_ledger + RLS
- src/services/budget_service.py: Token Budget Hard Kill 三層防線
- src/core/context.py: PROJECT_ID ContextVar(31 background loop 自動繼承)
- src/db/base.py + models.py: project_id 欄位 + RLS set_config 注入
- src/hermes/nl_gateway.py: project_id Redis key 前綴(Phase A 雙寫)
- src/services/anomaly_counter.py: per-project 改造(Phase A fallback)

## Phase 4: Platform Shell in Shadow Mode
- awooop_phase4_run_state_2026-05-04.sql: run_state + step_journal + idempotency
- src/services/run_state_machine.py: 8-state FSM + SKIP LOCKED + stale reaper
- src/services/platform_runtime.py: UUID v7 + W3C trace_id + shadow_execute
- src/services/audit_sink.py: PII/secret redaction 9 patterns
- src/api/v1/platform/runs.py: POST/GET /v1/platform/runs(Router→Service 架構)
- src/workers/platform_worker.py: SKIP LOCKED worker + heartbeat + reaper loop
- src/main.py: platform router + lifespan worker start/stop

## Phase 5: MCP Gateway 五閘門
- awooop_phase5_mcp_gateway_2026-05-04.sql: 4 表 + RLS
- src/plugins/mcp/gateway.py: McpGateway(Gate 1~5, E-MCP-GATE-001~009)
- src/plugins/mcp/redaction_middleware.py: 雙層 redaction + 16K 截斷
- src/plugins/mcp/registry.py: __provider name mangling(ADR-116)
- src/plugins/mcp/credential_resolver.py: k8s secret ref 解析
- tests/test_mcp_credential_isolation.py: 10 個迴歸測試(secret leak 防再現)

## Phase 6-8: EwoooC + Channel Hub + Approval Token
- awooop_phase6_ewoooc_onboarding_2026-05-04.sql: ewoooc tenant + 4 read-only MCP tools
- awooop_phase7_channel_hub_2026-05-04.sql: conversation_event + outbound_message
- src/services/provider_proxy.py: ProviderProxy + PlatformEnvelope(ADR-115)
- src/services/channel_hub.py: Telegram inbound mirror + Progressive Feedback(30s)
- src/services/awooop_approval_token.py: HS256 + jti NX replay 防護 + suggest mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 19:31:53 +08:00

508 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCP Gateway — 五閘門 Enforcement Service
=========================================
AwoooP Phase 5.2: ADR-116 五閘門強制執行
2026-05-04 ogt + Claude Sonnet 4.6
五閘門定義(依序,任一失敗即阻斷):
Gate 1 — Projectproject_id 在 awooop_projects 且 migration_mode != 'legacy_awoooi_default'
Gate 2 — Agentagent_id 在 awooop_agents 且 status = 'active'
Gate 3 — Tooltool_id 在 awooop_mcp_tool_registry 且 grant 存在且未到期
Gate 4 — Environmenttool.environment_tags 與 run context 匹配shadow mode 強制放行)
Gate 5 — Approval工具 scope 需要 approval 時,檢查 multi_sig 是否已核准
錯誤碼E-MCP-GATE-XXX
E-MCP-GATE-001 Gate 1 project 不存在或 migration_mode 不符
E-MCP-GATE-002 Gate 2 agent 不存在或未啟用
E-MCP-GATE-003 Gate 3 tool 不在白名單或 grant 不存在/已到期/已撤銷
E-MCP-GATE-004 Gate 4 environment 標籤不匹配(非 shadow mode
E-MCP-GATE-005 Gate 5 approval 尚未取得
E-MCP-GATE-009 credential 解析失敗k8s secret 取不到)
使用方式:
from src.plugins.mcp.gateway import McpGateway, GatewayContext
ctx = GatewayContext(
project_id="awoooi",
agent_id="my-agent",
tool_name="kubectl_get",
run_id=run_id,
trace_id=trace_id,
is_shadow=True,
)
result = await McpGateway(db).call(ctx, parameters={"namespace": "default"})
"""
from __future__ import annotations
import hashlib
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from uuid import UUID
import structlog
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.awooop_models import (
AwoooPActiveRevision,
AwoooPMcpGatewayAudit,
AwoooPMcpGrant,
AwoooPMcpToolRegistry,
AwoooPProject,
)
from src.plugins.mcp.interfaces import MCPToolResult
from src.plugins.mcp.registry import get_provider_registry
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 錯誤定義
# ─────────────────────────────────────────────────────────────────────────────
class McpGatewayError(Exception):
"""所有 Gateway 攔截錯誤的基礎類別"""
def __init__(self, error_code: str, message: str, gate: int) -> None:
super().__init__(message)
self.error_code = error_code
self.gate = gate
class GateProjectError(McpGatewayError):
def __init__(self, msg: str = "project 不存在或 migration_mode 不符") -> None:
super().__init__("E-MCP-GATE-001", msg, gate=1)
class GateAgentError(McpGatewayError):
def __init__(self, msg: str = "agent 不存在或未啟用") -> None:
super().__init__("E-MCP-GATE-002", msg, gate=2)
class GateToolError(McpGatewayError):
def __init__(self, msg: str = "tool 不在白名單或 grant 失效") -> None:
super().__init__("E-MCP-GATE-003", msg, gate=3)
class GateEnvironmentError(McpGatewayError):
def __init__(self, msg: str = "environment 標籤不匹配") -> None:
super().__init__("E-MCP-GATE-004", msg, gate=4)
class GateApprovalError(McpGatewayError):
def __init__(self, msg: str = "approval 尚未取得") -> None:
super().__init__("E-MCP-GATE-005", msg, gate=5)
class CredentialResolutionError(McpGatewayError):
def __init__(self, msg: str = "credential 解析失敗") -> None:
super().__init__("E-MCP-GATE-009", msg, gate=0)
# ─────────────────────────────────────────────────────────────────────────────
# Gateway Context每次 call 一個)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class GatewayContext:
project_id: str
agent_id: str
tool_name: str
run_id: UUID | None = None
trace_id: str | None = None
is_shadow: bool = True # shadow modeGate 4/5 放行,不執行 destructive
environment: dict[str, str] = field(default_factory=dict) # e.g. {"env": "prod"}
required_scope: str = "read" # "read" | "write" | "admin"
@dataclass
class GateCheckResult:
gate1_project: bool = False
gate2_agent: bool = False
gate3_tool: bool = False
gate4_env: bool = False
gate5_approval: bool = False
def as_dict(self) -> dict[str, bool]:
return {
"gate1_project": self.gate1_project,
"gate2_agent": self.gate2_agent,
"gate3_tool": self.gate3_tool,
"gate4_env": self.gate4_env,
"gate5_approval": self.gate5_approval,
}
@property
def all_passed(self) -> bool:
return all([
self.gate1_project,
self.gate2_agent,
self.gate3_tool,
self.gate4_env,
self.gate5_approval,
])
# ─────────────────────────────────────────────────────────────────────────────
# McpGateway
# ─────────────────────────────────────────────────────────────────────────────
class McpGateway:
"""
MCP Gateway五閘門 enforcement + audit log + credential isolation。
每個 gateway call 都寫一筆 awooop_mcp_gateway_audit。
"""
def __init__(self, db: AsyncSession) -> None:
self._db = db
async def call(
self,
ctx: GatewayContext,
parameters: dict[str, Any],
) -> MCPToolResult:
"""
執行五閘門檢查後呼叫底層 MCP provider。
任一閘門失敗 → raise McpGatewayError + 寫 blocked audit。
"""
started = time.monotonic()
gate_result = GateCheckResult()
tool_row: AwoooPMcpToolRegistry | None = None
grant_row: AwoooPMcpGrant | None = None
try:
# Gate 1 — Project
tool_row, grant_row = await self._gate1_project(ctx, gate_result)
# Gate 2 — Agent
await self._gate2_agent(ctx, gate_result)
# Gate 3 — Tool + Grant
tool_row, grant_row = await self._gate3_tool(ctx, gate_result)
# Gate 4 — Environmentshadow mode 直接放行)
await self._gate4_environment(ctx, tool_row, gate_result)
# Gate 5 — Approvalshadow mode + scope=read 直接放行)
await self._gate5_approval(ctx, grant_row, gate_result)
except McpGatewayError as exc:
latency = int((time.monotonic() - started) * 1000)
await self._write_audit(
ctx=ctx,
tool_row=tool_row,
parameters=parameters,
result=None,
gate_result=gate_result,
result_status="blocked",
block_gate=exc.gate,
block_reason=f"{exc.error_code}: {exc}",
latency_ms=latency,
)
raise
# 五閘通過 → 執行 tool
result: MCPToolResult | None = None
result_status = "failed"
try:
result = await self._execute_tool(ctx, tool_row, parameters)
result_status = "success" if result.success else "failed"
return result
except Exception as exc:
logger.exception(
"mcp_gateway_execution_error",
project_id=ctx.project_id,
tool_name=ctx.tool_name,
error=str(exc),
)
raise
finally:
latency = int((time.monotonic() - started) * 1000)
await self._write_audit(
ctx=ctx,
tool_row=tool_row,
parameters=parameters,
result=result,
gate_result=gate_result,
result_status=result_status,
block_gate=None,
block_reason=None,
latency_ms=latency,
)
# ── 五閘門實作 ────────────────────────────────────────────────────────────
async def _gate1_project(
self, ctx: GatewayContext, gate_result: GateCheckResult
) -> tuple[AwoooPMcpToolRegistry | None, AwoooPMcpGrant | None]:
"""Gate 1project 必須存在且 migration_mode != 'legacy_awoooi_default'"""
result = await self._db.execute(
select(AwoooPProject).where(
AwoooPProject.project_id == ctx.project_id,
AwoooPProject.migration_mode != "legacy_awoooi_default",
)
)
project = result.scalar_one_or_none()
if project is None:
raise GateProjectError(
f"project '{ctx.project_id}' 不存在或 migration_mode=legacy_awoooi_default"
)
gate_result.gate1_project = True
return None, None
async def _gate2_agent(
self, ctx: GatewayContext, gate_result: GateCheckResult
) -> None:
"""Gate 2agent 必須在 awooop_active_revisions 中有 active contractfamily='agent'"""
result = await self._db.execute(
select(AwoooPActiveRevision).where(
AwoooPActiveRevision.project_id == ctx.project_id,
AwoooPActiveRevision.contract_family == "agent",
AwoooPActiveRevision.contract_id == ctx.agent_id,
)
)
active = result.scalar_one_or_none()
if active is None:
raise GateAgentError(
f"agent '{ctx.agent_id}''{ctx.project_id}' 無 active contract"
)
gate_result.gate2_agent = True
async def _gate3_tool(
self, ctx: GatewayContext, gate_result: GateCheckResult
) -> tuple[AwoooPMcpToolRegistry, AwoooPMcpGrant]:
"""Gate 3tool 在白名單 + grant 有效(未到期、未撤銷)"""
now = datetime.now(timezone.utc)
# 查 tool registry
tool_result = await self._db.execute(
select(AwoooPMcpToolRegistry).where(
AwoooPMcpToolRegistry.project_id == ctx.project_id,
AwoooPMcpToolRegistry.tool_name == ctx.tool_name,
AwoooPMcpToolRegistry.is_active.is_(True),
)
)
tool_row = tool_result.scalar_one_or_none()
if tool_row is None:
raise GateToolError(f"tool '{ctx.tool_name}' 不在白名單")
# 查 grantscope 必須包含 required_scope
grant_result = await self._db.execute(
select(AwoooPMcpGrant).where(
AwoooPMcpGrant.project_id == ctx.project_id,
AwoooPMcpGrant.agent_id == ctx.agent_id,
AwoooPMcpGrant.tool_id == tool_row.tool_id,
AwoooPMcpGrant.is_revoked.is_(False),
)
)
grant_row = grant_result.scalar_one_or_none()
if grant_row is None:
raise GateToolError(
f"agent '{ctx.agent_id}' 對 tool '{ctx.tool_name}' 無有效 grant"
)
if grant_row.expires_at is not None and grant_row.expires_at < now:
raise GateToolError(
f"agent '{ctx.agent_id}' 對 tool '{ctx.tool_name}' 的 grant 已到期"
)
# scope 檢查required_scope 必須在 granted_scopes 中
granted_scopes: list[str] = grant_row.granted_scopes or []
if ctx.required_scope not in granted_scopes:
raise GateToolError(
f"grant 未包含所需 scope '{ctx.required_scope}'(有:{granted_scopes}"
)
gate_result.gate3_tool = True
return tool_row, grant_row
async def _gate4_environment(
self,
ctx: GatewayContext,
tool_row: AwoooPMcpToolRegistry | None,
gate_result: GateCheckResult,
) -> None:
"""Gate 4environment 標籤匹配shadow mode 強制放行)"""
if ctx.is_shadow:
gate_result.gate4_env = True
return
if tool_row is None:
gate_result.gate4_env = True
return
required_tags: dict[str, str] = tool_row.environment_tags or {}
for k, v in required_tags.items():
if ctx.environment.get(k) != v:
raise GateEnvironmentError(
f"environment tag '{k}' 期望 '{v}',實際 '{ctx.environment.get(k)}'"
)
gate_result.gate4_env = True
async def _gate5_approval(
self,
ctx: GatewayContext,
grant_row: AwoooPMcpGrant | None,
gate_result: GateCheckResult,
) -> None:
"""Gate 5需要 approval 時,檢查 Redis multi_sigshadow + read scope 直接放行)"""
# shadow mode 或 read scope 不需 approval
if ctx.is_shadow or ctx.required_scope == "read":
gate_result.gate5_approval = True
return
# write/admin scope 需要檢查 approval
if ctx.run_id is None:
raise GateApprovalError("write/admin 操作需要 run_idapproval 追蹤用)")
try:
import aioredis
from src.core.config import settings
redis = aioredis.from_url(settings.REDIS_URL)
approval_key = f"mcp_approval:{ctx.project_id}:{ctx.agent_id}:{ctx.tool_name}:{ctx.run_id}"
approved = await redis.get(approval_key)
await redis.aclose()
except Exception as exc:
logger.warning(
"mcp_gate5_redis_error",
project_id=ctx.project_id,
tool_name=ctx.tool_name,
error=str(exc),
)
# Redis 失敗時 fail-closed不放行
raise GateApprovalError(f"approval Redis 查詢失敗: {exc}") from exc
if not approved:
raise GateApprovalError(
f"tool '{ctx.tool_name}' 需要 approvalkey={approval_key}"
)
gate_result.gate5_approval = True
# ── 執行層 ───────────────────────────────────────────────────────────────
async def _execute_tool(
self,
ctx: GatewayContext,
tool_row: AwoooPMcpToolRegistry | None,
parameters: dict[str, Any],
) -> MCPToolResult:
"""呼叫底層 MCP provider 執行工具"""
registry = get_provider_registry()
provider = registry.get(ctx.tool_name) or registry.get(
tool_row.tool_name if tool_row else ctx.tool_name
)
# 找不到 provider → 回傳 shadow no-op
if provider is None:
logger.warning(
"mcp_gateway_no_provider",
tool_name=ctx.tool_name,
is_shadow=ctx.is_shadow,
)
return MCPToolResult(
success=True,
execution_id=f"shadow-noop-{ctx.tool_name}",
output={"shadow": True, "message": "no provider registered, shadow no-op"},
)
audit_params = dict(parameters)
audit_params["_mcp_audit"] = {
"project_id": ctx.project_id,
"agent_id": ctx.agent_id,
"run_id": str(ctx.run_id) if ctx.run_id else None,
"trace_id": ctx.trace_id,
}
return await provider.execute(ctx.tool_name, audit_params)
# ── Audit log ─────────────────────────────────────────────────────────────
async def _write_audit(
self,
*,
ctx: GatewayContext,
tool_row: AwoooPMcpToolRegistry | None,
parameters: dict[str, Any],
result: MCPToolResult | None,
gate_result: GateCheckResult,
result_status: str,
block_gate: int | None,
block_reason: str | None,
latency_ms: int,
) -> None:
"""寫 awooop_mcp_gateway_audit — 只寫 hash不寫明文 input/output"""
try:
input_hash = hashlib.sha256(
json.dumps(parameters, sort_keys=True, default=str).encode()
).hexdigest()
output_hash: str | None = None
if result is not None:
output_hash = hashlib.sha256(
json.dumps(result.output, sort_keys=True, default=str).encode()
).hexdigest()
audit = AwoooPMcpGatewayAudit(
project_id=ctx.project_id,
run_id=ctx.run_id,
trace_id=ctx.trace_id,
agent_id=ctx.agent_id,
tool_id=tool_row.tool_id if tool_row else None, # type: ignore[arg-type]
tool_name=ctx.tool_name,
input_hash=input_hash,
output_hash=output_hash,
gate_result=gate_result.as_dict(),
result_status=result_status,
block_gate=block_gate,
block_reason=block_reason,
latency_ms=latency_ms,
)
if tool_row is not None:
self._db.add(audit)
await self._db.flush()
except Exception as exc:
logger.warning(
"mcp_gateway_audit_write_failed",
project_id=ctx.project_id,
tool_name=ctx.tool_name,
error=str(exc),
)
# ─────────────────────────────────────────────────────────────────────────────
# 便捷函數
# ─────────────────────────────────────────────────────────────────────────────
async def gateway_call(
db: AsyncSession,
*,
project_id: str,
agent_id: str,
tool_name: str,
parameters: dict[str, Any],
run_id: UUID | None = None,
trace_id: str | None = None,
is_shadow: bool = True,
required_scope: str = "read",
environment: dict[str, str] | None = None,
) -> MCPToolResult:
"""
Stateless 便捷函數:建立 GatewayContext + 執行 McpGateway.call()。
"""
ctx = GatewayContext(
project_id=project_id,
agent_id=agent_id,
tool_name=tool_name,
run_id=run_id,
trace_id=trace_id,
is_shadow=is_shadow,
required_scope=required_scope,
environment=environment or {},
)
return await McpGateway(db).call(ctx, parameters)