feat(awooop): Phase 1-8 完整實作 — AwoooP Agent Platform 六平面架構
Some checks failed
run-migration / migrate (push) Failing after 59s
Code Review / ai-code-review (push) Successful in 1m8s
Type Sync Check / check-type-sync (push) Successful in 2m27s

## Phase 1-3: Control Plane + Contract System
- awooop_phase1_control_plane_2026-05-04.sql: 12 張核心表 + RLS
- awooop_phase1_batch1_rls_2026-05-04.sql: 全部 FORCE RLS + GRANT
- packages/awooop-contracts/: 六合約 JSON Schema + golden fixtures
- src/models/awooop_contracts.py: Pydantic v2 contract models(extra=forbid)
- src/repositories/contract_repository.py: contract lifecycle(draft→published→active)
- src/services/contract_service.py: HMAC publish sig + Redis multi-sig activate
- src/services/schema_validator.py: LLM output validator(retry×3, E-SCHEMA-001)

## Phase 2: Tenant Isolation
- awooop_phase2_budget_ledger_2026-05-04.sql: budget_ledger + RLS
- src/services/budget_service.py: Token Budget Hard Kill 三層防線
- src/core/context.py: PROJECT_ID ContextVar(31 background loop 自動繼承)
- src/db/base.py + models.py: project_id 欄位 + RLS set_config 注入
- src/hermes/nl_gateway.py: project_id Redis key 前綴(Phase A 雙寫)
- src/services/anomaly_counter.py: per-project 改造(Phase A fallback)

## Phase 4: Platform Shell in Shadow Mode
- awooop_phase4_run_state_2026-05-04.sql: run_state + step_journal + idempotency
- src/services/run_state_machine.py: 8-state FSM + SKIP LOCKED + stale reaper
- src/services/platform_runtime.py: UUID v7 + W3C trace_id + shadow_execute
- src/services/audit_sink.py: PII/secret redaction 9 patterns
- src/api/v1/platform/runs.py: POST/GET /v1/platform/runs(Router→Service 架構)
- src/workers/platform_worker.py: SKIP LOCKED worker + heartbeat + reaper loop
- src/main.py: platform router + lifespan worker start/stop

## Phase 5: MCP Gateway 五閘門
- awooop_phase5_mcp_gateway_2026-05-04.sql: 4 表 + RLS
- src/plugins/mcp/gateway.py: McpGateway(Gate 1~5, E-MCP-GATE-001~009)
- src/plugins/mcp/redaction_middleware.py: 雙層 redaction + 16K 截斷
- src/plugins/mcp/registry.py: __provider name mangling(ADR-116)
- src/plugins/mcp/credential_resolver.py: k8s secret ref 解析
- tests/test_mcp_credential_isolation.py: 10 個迴歸測試(secret leak 防再現)

## Phase 6-8: EwoooC + Channel Hub + Approval Token
- awooop_phase6_ewoooc_onboarding_2026-05-04.sql: ewoooc tenant + 4 read-only MCP tools
- awooop_phase7_channel_hub_2026-05-04.sql: conversation_event + outbound_message
- src/services/provider_proxy.py: ProviderProxy + PlatformEnvelope(ADR-115)
- src/services/channel_hub.py: Telegram inbound mirror + Progressive Feedback(30s)
- src/services/awooop_approval_token.py: HS256 + jti NX replay 防護 + suggest mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-05-04 19:31:53 +08:00
parent 0a90dab1e9
commit 8629ac709b
69 changed files with 10999 additions and 77 deletions

View File

@@ -0,0 +1,136 @@
"""
MCP Credential Resolver — k8s Secret 參照解析
=============================================
AwoooP Phase 5.5: ADR-118 Credential Isolation
2026-05-04 ogt + Claude Sonnet 4.6
設計原則2026-04-18 Secret Leak 事故教訓):
- 明文 credential 絕不進入 audit log / LLM context
- Gateway 只傳 k8s secret ref格式"namespace/secret-name#key"
- 真實 secret value 在記憶體中短暫存在,使用後立刻清除
- 回傳給 caller 時只提供「遮罩版」(前 4 字元 + *** + 後 4 字元)
- sha256(actual_value) 記入 awooop_mcp_credential_refs.value_sha256指紋不可還原
k8s secret ref 格式:
"namespace/secret-name#key"
例:"awoooi/telegram-bot#TELEGRAM_BOT_TOKEN"
解析方式(兩種,依環境):
1. k8s in-cluster使用 kubernetes asyncclientprod
2. 本機開發 fallback讀 AWOOOP_DEV_SECRETS_JSON 環境變數dev only
"""
from __future__ import annotations
import hashlib
import os
import re
import structlog
logger = structlog.get_logger(__name__)
# k8s secret ref 格式正則(與 DB CHECK 一致)
_K8S_REF_RE = re.compile(r"^([a-z0-9-]+)/([a-z0-9-]+)#([a-zA-Z0-9_-]+)$")
# dev fallbackJSON 格式 {"namespace/secret-name#key": "actual_value"}
_DEV_SECRETS_ENV = "AWOOOP_DEV_SECRETS_JSON"
class CredentialResolutionError(Exception):
error_code = "E-MCP-GATE-009"
def _mask_secret(value: str) -> str:
"""回傳遮罩版:前 4 + *** + 後 4若長度 < 8 則全遮罩)"""
if len(value) < 8:
return "***"
return f"{value[:4]}***{value[-4:]}"
def _sha256_secret(value: str) -> str:
return hashlib.sha256(value.encode()).hexdigest()
async def resolve_k8s_secret(ref: str) -> tuple[str, str, str]:
"""
解析 k8s secret ref回傳 (actual_value, masked_value, sha256)。
actual_value明文caller 必須在使用後清除(不可存入任何持久化層)
masked_value供 log / response 使用
sha256供 awooop_mcp_credential_refs.value_sha256 記錄
Raises:
CredentialResolutionError: ref 格式錯誤或 secret 不存在
"""
m = _K8S_REF_RE.match(ref)
if not m:
raise CredentialResolutionError(
f"k8s secret ref 格式錯誤(期望 'namespace/secret-name#key'{ref!r}"
)
namespace, secret_name, key = m.group(1), m.group(2), m.group(3)
# Dev fallback讀環境變數
dev_json = os.environ.get(_DEV_SECRETS_ENV)
if dev_json:
try:
import json
dev_secrets: dict[str, str] = json.loads(dev_json)
value = dev_secrets.get(ref)
if value is None:
raise CredentialResolutionError(
f"dev secrets 中找不到 ref={ref!r}"
)
logger.debug("credential_resolved_dev", ref=ref)
return value, _mask_secret(value), _sha256_secret(value)
except CredentialResolutionError:
raise
except Exception as exc:
raise CredentialResolutionError(
f"AWOOOP_DEV_SECRETS_JSON 解析失敗: {exc}"
) from exc
# Productionk8s in-cluster
try:
from kubernetes_asyncio import client, config # type: ignore[import]
from kubernetes_asyncio.client import CoreV1Api # type: ignore[import]
await config.load_incluster_config()
async with client.ApiClient() as api:
v1 = CoreV1Api(api)
secret = await v1.read_namespaced_secret(secret_name, namespace)
if secret.data is None or key not in secret.data:
raise CredentialResolutionError(
f"k8s secret '{namespace}/{secret_name}' 中找不到 key='{key}'"
)
import base64
encoded = secret.data[key]
value = base64.b64decode(encoded).decode()
logger.info(
"credential_resolved_k8s",
namespace=namespace,
secret_name=secret_name,
key=key,
masked=_mask_secret(value),
)
return value, _mask_secret(value), _sha256_secret(value)
except CredentialResolutionError:
raise
except ImportError:
raise CredentialResolutionError(
"kubernetes_asyncio 未安裝,且未設定 AWOOOP_DEV_SECRETS_JSONdev fallback"
)
except Exception as exc:
logger.exception(
"credential_resolution_k8s_failed",
ref=ref,
error=str(exc),
)
raise CredentialResolutionError(
f"k8s secret 解析失敗({namespace}/{secret_name}#{key}: {exc}"
) from exc

View File

@@ -0,0 +1,507 @@
"""
MCP Gateway — 五閘門 Enforcement Service
=========================================
AwoooP Phase 5.2: ADR-116 五閘門強制執行
2026-05-04 ogt + Claude Sonnet 4.6
五閘門定義(依序,任一失敗即阻斷):
Gate 1 — Projectproject_id 在 awooop_projects 且 migration_mode != 'legacy_awoooi_default'
Gate 2 — Agentagent_id 在 awooop_agents 且 status = 'active'
Gate 3 — Tooltool_id 在 awooop_mcp_tool_registry 且 grant 存在且未到期
Gate 4 — Environmenttool.environment_tags 與 run context 匹配shadow mode 強制放行)
Gate 5 — Approval工具 scope 需要 approval 時,檢查 multi_sig 是否已核准
錯誤碼E-MCP-GATE-XXX
E-MCP-GATE-001 Gate 1 project 不存在或 migration_mode 不符
E-MCP-GATE-002 Gate 2 agent 不存在或未啟用
E-MCP-GATE-003 Gate 3 tool 不在白名單或 grant 不存在/已到期/已撤銷
E-MCP-GATE-004 Gate 4 environment 標籤不匹配(非 shadow mode
E-MCP-GATE-005 Gate 5 approval 尚未取得
E-MCP-GATE-009 credential 解析失敗k8s secret 取不到)
使用方式:
from src.plugins.mcp.gateway import McpGateway, GatewayContext
ctx = GatewayContext(
project_id="awoooi",
agent_id="my-agent",
tool_name="kubectl_get",
run_id=run_id,
trace_id=trace_id,
is_shadow=True,
)
result = await McpGateway(db).call(ctx, parameters={"namespace": "default"})
"""
from __future__ import annotations
import hashlib
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from uuid import UUID
import structlog
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.awooop_models import (
AwoooPActiveRevision,
AwoooPMcpGatewayAudit,
AwoooPMcpGrant,
AwoooPMcpToolRegistry,
AwoooPProject,
)
from src.plugins.mcp.interfaces import MCPToolResult
from src.plugins.mcp.registry import get_provider_registry
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 錯誤定義
# ─────────────────────────────────────────────────────────────────────────────
class McpGatewayError(Exception):
"""所有 Gateway 攔截錯誤的基礎類別"""
def __init__(self, error_code: str, message: str, gate: int) -> None:
super().__init__(message)
self.error_code = error_code
self.gate = gate
class GateProjectError(McpGatewayError):
def __init__(self, msg: str = "project 不存在或 migration_mode 不符") -> None:
super().__init__("E-MCP-GATE-001", msg, gate=1)
class GateAgentError(McpGatewayError):
def __init__(self, msg: str = "agent 不存在或未啟用") -> None:
super().__init__("E-MCP-GATE-002", msg, gate=2)
class GateToolError(McpGatewayError):
def __init__(self, msg: str = "tool 不在白名單或 grant 失效") -> None:
super().__init__("E-MCP-GATE-003", msg, gate=3)
class GateEnvironmentError(McpGatewayError):
def __init__(self, msg: str = "environment 標籤不匹配") -> None:
super().__init__("E-MCP-GATE-004", msg, gate=4)
class GateApprovalError(McpGatewayError):
def __init__(self, msg: str = "approval 尚未取得") -> None:
super().__init__("E-MCP-GATE-005", msg, gate=5)
class CredentialResolutionError(McpGatewayError):
def __init__(self, msg: str = "credential 解析失敗") -> None:
super().__init__("E-MCP-GATE-009", msg, gate=0)
# ─────────────────────────────────────────────────────────────────────────────
# Gateway Context每次 call 一個)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class GatewayContext:
project_id: str
agent_id: str
tool_name: str
run_id: UUID | None = None
trace_id: str | None = None
is_shadow: bool = True # shadow modeGate 4/5 放行,不執行 destructive
environment: dict[str, str] = field(default_factory=dict) # e.g. {"env": "prod"}
required_scope: str = "read" # "read" | "write" | "admin"
@dataclass
class GateCheckResult:
gate1_project: bool = False
gate2_agent: bool = False
gate3_tool: bool = False
gate4_env: bool = False
gate5_approval: bool = False
def as_dict(self) -> dict[str, bool]:
return {
"gate1_project": self.gate1_project,
"gate2_agent": self.gate2_agent,
"gate3_tool": self.gate3_tool,
"gate4_env": self.gate4_env,
"gate5_approval": self.gate5_approval,
}
@property
def all_passed(self) -> bool:
return all([
self.gate1_project,
self.gate2_agent,
self.gate3_tool,
self.gate4_env,
self.gate5_approval,
])
# ─────────────────────────────────────────────────────────────────────────────
# McpGateway
# ─────────────────────────────────────────────────────────────────────────────
class McpGateway:
"""
MCP Gateway五閘門 enforcement + audit log + credential isolation。
每個 gateway call 都寫一筆 awooop_mcp_gateway_audit。
"""
def __init__(self, db: AsyncSession) -> None:
self._db = db
async def call(
self,
ctx: GatewayContext,
parameters: dict[str, Any],
) -> MCPToolResult:
"""
執行五閘門檢查後呼叫底層 MCP provider。
任一閘門失敗 → raise McpGatewayError + 寫 blocked audit。
"""
started = time.monotonic()
gate_result = GateCheckResult()
tool_row: AwoooPMcpToolRegistry | None = None
grant_row: AwoooPMcpGrant | None = None
try:
# Gate 1 — Project
tool_row, grant_row = await self._gate1_project(ctx, gate_result)
# Gate 2 — Agent
await self._gate2_agent(ctx, gate_result)
# Gate 3 — Tool + Grant
tool_row, grant_row = await self._gate3_tool(ctx, gate_result)
# Gate 4 — Environmentshadow mode 直接放行)
await self._gate4_environment(ctx, tool_row, gate_result)
# Gate 5 — Approvalshadow mode + scope=read 直接放行)
await self._gate5_approval(ctx, grant_row, gate_result)
except McpGatewayError as exc:
latency = int((time.monotonic() - started) * 1000)
await self._write_audit(
ctx=ctx,
tool_row=tool_row,
parameters=parameters,
result=None,
gate_result=gate_result,
result_status="blocked",
block_gate=exc.gate,
block_reason=f"{exc.error_code}: {exc}",
latency_ms=latency,
)
raise
# 五閘通過 → 執行 tool
result: MCPToolResult | None = None
result_status = "failed"
try:
result = await self._execute_tool(ctx, tool_row, parameters)
result_status = "success" if result.success else "failed"
return result
except Exception as exc:
logger.exception(
"mcp_gateway_execution_error",
project_id=ctx.project_id,
tool_name=ctx.tool_name,
error=str(exc),
)
raise
finally:
latency = int((time.monotonic() - started) * 1000)
await self._write_audit(
ctx=ctx,
tool_row=tool_row,
parameters=parameters,
result=result,
gate_result=gate_result,
result_status=result_status,
block_gate=None,
block_reason=None,
latency_ms=latency,
)
# ── 五閘門實作 ────────────────────────────────────────────────────────────
async def _gate1_project(
self, ctx: GatewayContext, gate_result: GateCheckResult
) -> tuple[AwoooPMcpToolRegistry | None, AwoooPMcpGrant | None]:
"""Gate 1project 必須存在且 migration_mode != 'legacy_awoooi_default'"""
result = await self._db.execute(
select(AwoooPProject).where(
AwoooPProject.project_id == ctx.project_id,
AwoooPProject.migration_mode != "legacy_awoooi_default",
)
)
project = result.scalar_one_or_none()
if project is None:
raise GateProjectError(
f"project '{ctx.project_id}' 不存在或 migration_mode=legacy_awoooi_default"
)
gate_result.gate1_project = True
return None, None
async def _gate2_agent(
self, ctx: GatewayContext, gate_result: GateCheckResult
) -> None:
"""Gate 2agent 必須在 awooop_active_revisions 中有 active contractfamily='agent'"""
result = await self._db.execute(
select(AwoooPActiveRevision).where(
AwoooPActiveRevision.project_id == ctx.project_id,
AwoooPActiveRevision.contract_family == "agent",
AwoooPActiveRevision.contract_id == ctx.agent_id,
)
)
active = result.scalar_one_or_none()
if active is None:
raise GateAgentError(
f"agent '{ctx.agent_id}''{ctx.project_id}' 無 active contract"
)
gate_result.gate2_agent = True
async def _gate3_tool(
self, ctx: GatewayContext, gate_result: GateCheckResult
) -> tuple[AwoooPMcpToolRegistry, AwoooPMcpGrant]:
"""Gate 3tool 在白名單 + grant 有效(未到期、未撤銷)"""
now = datetime.now(timezone.utc)
# 查 tool registry
tool_result = await self._db.execute(
select(AwoooPMcpToolRegistry).where(
AwoooPMcpToolRegistry.project_id == ctx.project_id,
AwoooPMcpToolRegistry.tool_name == ctx.tool_name,
AwoooPMcpToolRegistry.is_active.is_(True),
)
)
tool_row = tool_result.scalar_one_or_none()
if tool_row is None:
raise GateToolError(f"tool '{ctx.tool_name}' 不在白名單")
# 查 grantscope 必須包含 required_scope
grant_result = await self._db.execute(
select(AwoooPMcpGrant).where(
AwoooPMcpGrant.project_id == ctx.project_id,
AwoooPMcpGrant.agent_id == ctx.agent_id,
AwoooPMcpGrant.tool_id == tool_row.tool_id,
AwoooPMcpGrant.is_revoked.is_(False),
)
)
grant_row = grant_result.scalar_one_or_none()
if grant_row is None:
raise GateToolError(
f"agent '{ctx.agent_id}' 對 tool '{ctx.tool_name}' 無有效 grant"
)
if grant_row.expires_at is not None and grant_row.expires_at < now:
raise GateToolError(
f"agent '{ctx.agent_id}' 對 tool '{ctx.tool_name}' 的 grant 已到期"
)
# scope 檢查required_scope 必須在 granted_scopes 中
granted_scopes: list[str] = grant_row.granted_scopes or []
if ctx.required_scope not in granted_scopes:
raise GateToolError(
f"grant 未包含所需 scope '{ctx.required_scope}'(有:{granted_scopes}"
)
gate_result.gate3_tool = True
return tool_row, grant_row
async def _gate4_environment(
self,
ctx: GatewayContext,
tool_row: AwoooPMcpToolRegistry | None,
gate_result: GateCheckResult,
) -> None:
"""Gate 4environment 標籤匹配shadow mode 強制放行)"""
if ctx.is_shadow:
gate_result.gate4_env = True
return
if tool_row is None:
gate_result.gate4_env = True
return
required_tags: dict[str, str] = tool_row.environment_tags or {}
for k, v in required_tags.items():
if ctx.environment.get(k) != v:
raise GateEnvironmentError(
f"environment tag '{k}' 期望 '{v}',實際 '{ctx.environment.get(k)}'"
)
gate_result.gate4_env = True
async def _gate5_approval(
self,
ctx: GatewayContext,
grant_row: AwoooPMcpGrant | None,
gate_result: GateCheckResult,
) -> None:
"""Gate 5需要 approval 時,檢查 Redis multi_sigshadow + read scope 直接放行)"""
# shadow mode 或 read scope 不需 approval
if ctx.is_shadow or ctx.required_scope == "read":
gate_result.gate5_approval = True
return
# write/admin scope 需要檢查 approval
if ctx.run_id is None:
raise GateApprovalError("write/admin 操作需要 run_idapproval 追蹤用)")
try:
import aioredis
from src.core.config import settings
redis = aioredis.from_url(settings.REDIS_URL)
approval_key = f"mcp_approval:{ctx.project_id}:{ctx.agent_id}:{ctx.tool_name}:{ctx.run_id}"
approved = await redis.get(approval_key)
await redis.aclose()
except Exception as exc:
logger.warning(
"mcp_gate5_redis_error",
project_id=ctx.project_id,
tool_name=ctx.tool_name,
error=str(exc),
)
# Redis 失敗時 fail-closed不放行
raise GateApprovalError(f"approval Redis 查詢失敗: {exc}") from exc
if not approved:
raise GateApprovalError(
f"tool '{ctx.tool_name}' 需要 approvalkey={approval_key}"
)
gate_result.gate5_approval = True
# ── 執行層 ───────────────────────────────────────────────────────────────
async def _execute_tool(
self,
ctx: GatewayContext,
tool_row: AwoooPMcpToolRegistry | None,
parameters: dict[str, Any],
) -> MCPToolResult:
"""呼叫底層 MCP provider 執行工具"""
registry = get_provider_registry()
provider = registry.get(ctx.tool_name) or registry.get(
tool_row.tool_name if tool_row else ctx.tool_name
)
# 找不到 provider → 回傳 shadow no-op
if provider is None:
logger.warning(
"mcp_gateway_no_provider",
tool_name=ctx.tool_name,
is_shadow=ctx.is_shadow,
)
return MCPToolResult(
success=True,
execution_id=f"shadow-noop-{ctx.tool_name}",
output={"shadow": True, "message": "no provider registered, shadow no-op"},
)
audit_params = dict(parameters)
audit_params["_mcp_audit"] = {
"project_id": ctx.project_id,
"agent_id": ctx.agent_id,
"run_id": str(ctx.run_id) if ctx.run_id else None,
"trace_id": ctx.trace_id,
}
return await provider.execute(ctx.tool_name, audit_params)
# ── Audit log ─────────────────────────────────────────────────────────────
async def _write_audit(
self,
*,
ctx: GatewayContext,
tool_row: AwoooPMcpToolRegistry | None,
parameters: dict[str, Any],
result: MCPToolResult | None,
gate_result: GateCheckResult,
result_status: str,
block_gate: int | None,
block_reason: str | None,
latency_ms: int,
) -> None:
"""寫 awooop_mcp_gateway_audit — 只寫 hash不寫明文 input/output"""
try:
input_hash = hashlib.sha256(
json.dumps(parameters, sort_keys=True, default=str).encode()
).hexdigest()
output_hash: str | None = None
if result is not None:
output_hash = hashlib.sha256(
json.dumps(result.output, sort_keys=True, default=str).encode()
).hexdigest()
audit = AwoooPMcpGatewayAudit(
project_id=ctx.project_id,
run_id=ctx.run_id,
trace_id=ctx.trace_id,
agent_id=ctx.agent_id,
tool_id=tool_row.tool_id if tool_row else None, # type: ignore[arg-type]
tool_name=ctx.tool_name,
input_hash=input_hash,
output_hash=output_hash,
gate_result=gate_result.as_dict(),
result_status=result_status,
block_gate=block_gate,
block_reason=block_reason,
latency_ms=latency_ms,
)
if tool_row is not None:
self._db.add(audit)
await self._db.flush()
except Exception as exc:
logger.warning(
"mcp_gateway_audit_write_failed",
project_id=ctx.project_id,
tool_name=ctx.tool_name,
error=str(exc),
)
# ─────────────────────────────────────────────────────────────────────────────
# 便捷函數
# ─────────────────────────────────────────────────────────────────────────────
async def gateway_call(
db: AsyncSession,
*,
project_id: str,
agent_id: str,
tool_name: str,
parameters: dict[str, Any],
run_id: UUID | None = None,
trace_id: str | None = None,
is_shadow: bool = True,
required_scope: str = "read",
environment: dict[str, str] | None = None,
) -> MCPToolResult:
"""
Stateless 便捷函數:建立 GatewayContext + 執行 McpGateway.call()。
"""
ctx = GatewayContext(
project_id=project_id,
agent_id=agent_id,
tool_name=tool_name,
run_id=run_id,
trace_id=trace_id,
is_shadow=is_shadow,
required_scope=required_scope,
environment=environment or {},
)
return await McpGateway(db).call(ctx, parameters)

View File

@@ -0,0 +1,159 @@
"""
MCP Redaction Middleware — 雙層 PII/Secret Redaction
=====================================================
AwoooP Phase 5.3: ADR-116 P1-04 + P1-09
2026-05-04 ogt + Claude Sonnet 4.6
MCP tool call 的 input/output 必須經過雙層 redaction
Layer 1audit_sink— 寫入 audit log 前的 sanitization欄位黑名單 + pattern 攔截)
Layer 2本層 — MCP tool call input/output 專用:
- 移除已知 secret 欄位_mcp_audit 注入的 context
- 對 output 套用 audit_sink 的完整 redaction patterns
- 限制 output 大小(防 prompt stuffing
設計原則ADR-118 credential isolation 延伸):
- MCP tool 的 output 可能含 k8s secret 值 → 必須在 output 進入 LLM context 前 redact
- 只有「安全的」output 才能被 platform_runtime.shadow_execute 使用
- input credential 欄位(如 k8s_value在送入 provider 前清除credential isolation
雙層保障的必要性:
- audit_sink 保護的是 audit log DB
- 本 middleware 保護的是 LLM context + gateway audit hash
- 兩者防護對象不同,不可互相替代
"""
from __future__ import annotations
import hashlib
import json
import re
from typing import Any
import structlog
from src.services.audit_sink import _BLOCKED_FIELD_NAMES, _REDACTION_PATTERNS, _redact_string
logger = structlog.get_logger(__name__)
# MCP output 進入 LLM context 的最大字元數(防 prompt stuffing
_MCP_OUTPUT_MAX_CHARS = 16_000
# MCP gateway 注入的 audit context key送 provider 前移除)
_MCP_AUDIT_KEY = "_mcp_audit"
# MCP credential 欄位名稱Gate 5 credential isolation — 在 input 中清除)
_MCP_CREDENTIAL_FIELDS = frozenset({
"k8s_value", "secret_value", "credential", "credential_value",
"token_value", "api_key_value", "private_key_value",
})
def redact_mcp_input(parameters: dict[str, Any]) -> dict[str, Any]:
"""
Layer 2 Input Redaction清理 MCP tool call 的 input parameters。
1. 移除 _mcp_auditaudit context不應傳給 provider
2. 移除 credential 欄位credential isolation
3. 對剩餘的 string values 套用 audit_sink patterns
"""
cleaned: dict[str, Any] = {}
for key, value in parameters.items():
# 移除 audit context injection
if key == _MCP_AUDIT_KEY:
continue
# credential isolation — 不讓 credential 明文流向 provider
if key.lower() in _MCP_CREDENTIAL_FIELDS:
cleaned[key] = "[REDACTED:CREDENTIAL_ISOLATION]"
continue
# 欄位名稱黑名單(與 audit_sink 對齊)
if key.lower() in _BLOCKED_FIELD_NAMES:
cleaned[key] = "[REDACTED:BLOCKED_FIELD]"
continue
# string value — 套用 pattern redaction
if isinstance(value, str):
cleaned[key] = _redact_string(value)
elif isinstance(value, dict):
cleaned[key] = redact_mcp_input(value)
elif isinstance(value, list):
cleaned[key] = [
redact_mcp_input(item) if isinstance(item, dict)
else (_redact_string(item) if isinstance(item, str) else item)
for item in value
]
else:
cleaned[key] = value
return cleaned
def redact_mcp_output(output: Any) -> Any:
"""
Layer 2 Output Redaction清理 MCP tool call 的 output。
1. 對 output dict / string 套用 audit_sink patterns
2. 限制 output 大小(防 prompt stuffing
3. 回傳清理後的 output供 LLM context 使用)
"""
if output is None:
return None
if isinstance(output, str):
redacted = _redact_string(output)
if len(redacted) > _MCP_OUTPUT_MAX_CHARS:
redacted = redacted[:_MCP_OUTPUT_MAX_CHARS] + f"\n[TRUNCATED:{len(output)} chars]"
return redacted
if isinstance(output, dict):
return _redact_output_dict(output)
if isinstance(output, list):
result = []
total = 0
for item in output:
if total > _MCP_OUTPUT_MAX_CHARS:
result.append(f"[TRUNCATED:{len(output)} items total]")
break
cleaned = redact_mcp_output(item)
serialized = json.dumps(cleaned, ensure_ascii=False, default=str)
total += len(serialized)
result.append(cleaned)
return result
return output
def _redact_output_dict(d: dict[str, Any], depth: int = 0) -> dict[str, Any]:
"""遞迴 redact output dict"""
if depth > 8:
return {"[MAX_DEPTH]": True}
result: dict[str, Any] = {}
for key, value in d.items():
# 欄位名稱黑名單
if key.lower() in _BLOCKED_FIELD_NAMES:
result[key] = "[REDACTED:BLOCKED_FIELD]"
continue
if isinstance(value, str):
result[key] = _redact_string(value)
elif isinstance(value, dict):
result[key] = _redact_output_dict(value, depth + 1)
elif isinstance(value, list):
result[key] = [
_redact_output_dict(item, depth + 1) if isinstance(item, dict)
else (_redact_string(item) if isinstance(item, str) else item)
for item in value
]
else:
result[key] = value
return result
def compute_safe_hash(data: Any) -> str:
"""計算 redacted data 的 sha256供 gateway audit 使用)"""
serialized = json.dumps(data, sort_keys=True, ensure_ascii=False, default=str)
return hashlib.sha256(serialized.encode()).hexdigest()

View File

@@ -21,18 +21,20 @@ class AuditedMCPToolProvider(MCPToolProvider):
"""Provider wrapper that writes every MCP tool call to the audit subsystem."""
def __init__(self, provider: MCPToolProvider) -> None:
self._provider = provider
# __provider 使用 Python name mangling_AuditedMCPToolProvider__provider
# 防止 caller 透過 wrapper._provider 直接存取 inner providerADR-116 封裝要求)
self.__provider = provider # noqa: SLF001 — intentional name mangling
@property
def name(self) -> str:
return self._provider.name
return self.__provider.name
@property
def enabled(self) -> bool:
return self._provider.enabled
return self.__provider.enabled
async def list_tools(self) -> list[MCPTool]:
return await self._provider.list_tools()
return await self.__provider.list_tools()
async def execute(
self,
@@ -49,7 +51,7 @@ class AuditedMCPToolProvider(MCPToolProvider):
started = monotonic_ms()
result: MCPToolResult | None = None
try:
result = await self._provider.execute(tool_name, provider_parameters)
result = await self.__provider.execute(tool_name, provider_parameters)
return result
finally:
duration_ms = monotonic_ms() - started
@@ -68,7 +70,7 @@ class AuditedMCPToolProvider(MCPToolProvider):
)
async def health_check(self) -> bool:
return await self._provider.health_check()
return await self.__provider.health_check()
class ProviderRegistry: