Files
awoooi/apps/api/tests/test_mcp_credential_isolation.py
Your Name 8629ac709b
Some checks failed
run-migration / migrate (push) Failing after 59s
Code Review / ai-code-review (push) Successful in 1m8s
Type Sync Check / check-type-sync (push) Successful in 2m27s
feat(awooop): Phase 1-8 完整實作 — AwoooP Agent Platform 六平面架構
## Phase 1-3: Control Plane + Contract System
- awooop_phase1_control_plane_2026-05-04.sql: 12 張核心表 + RLS
- awooop_phase1_batch1_rls_2026-05-04.sql: 全部 FORCE RLS + GRANT
- packages/awooop-contracts/: 六合約 JSON Schema + golden fixtures
- src/models/awooop_contracts.py: Pydantic v2 contract models(extra=forbid)
- src/repositories/contract_repository.py: contract lifecycle(draft→published→active)
- src/services/contract_service.py: HMAC publish sig + Redis multi-sig activate
- src/services/schema_validator.py: LLM output validator(retry×3, E-SCHEMA-001)

## Phase 2: Tenant Isolation
- awooop_phase2_budget_ledger_2026-05-04.sql: budget_ledger + RLS
- src/services/budget_service.py: Token Budget Hard Kill 三層防線
- src/core/context.py: PROJECT_ID ContextVar(31 background loop 自動繼承)
- src/db/base.py + models.py: project_id 欄位 + RLS set_config 注入
- src/hermes/nl_gateway.py: project_id Redis key 前綴(Phase A 雙寫)
- src/services/anomaly_counter.py: per-project 改造(Phase A fallback)

## Phase 4: Platform Shell in Shadow Mode
- awooop_phase4_run_state_2026-05-04.sql: run_state + step_journal + idempotency
- src/services/run_state_machine.py: 8-state FSM + SKIP LOCKED + stale reaper
- src/services/platform_runtime.py: UUID v7 + W3C trace_id + shadow_execute
- src/services/audit_sink.py: PII/secret redaction 9 patterns
- src/api/v1/platform/runs.py: POST/GET /v1/platform/runs(Router→Service 架構)
- src/workers/platform_worker.py: SKIP LOCKED worker + heartbeat + reaper loop
- src/main.py: platform router + lifespan worker start/stop

## Phase 5: MCP Gateway 五閘門
- awooop_phase5_mcp_gateway_2026-05-04.sql: 4 表 + RLS
- src/plugins/mcp/gateway.py: McpGateway(Gate 1~5, E-MCP-GATE-001~009)
- src/plugins/mcp/redaction_middleware.py: 雙層 redaction + 16K 截斷
- src/plugins/mcp/registry.py: __provider name mangling(ADR-116)
- src/plugins/mcp/credential_resolver.py: k8s secret ref 解析
- tests/test_mcp_credential_isolation.py: 10 個迴歸測試(secret leak 防再現)

## Phase 6-8: EwoooC + Channel Hub + Approval Token
- awooop_phase6_ewoooc_onboarding_2026-05-04.sql: ewoooc tenant + 4 read-only MCP tools
- awooop_phase7_channel_hub_2026-05-04.sql: conversation_event + outbound_message
- src/services/provider_proxy.py: ProviderProxy + PlatformEnvelope(ADR-115)
- src/services/channel_hub.py: Telegram inbound mirror + Progressive Feedback(30s)
- src/services/awooop_approval_token.py: HS256 + jti NX replay 防護 + suggest mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 19:31:53 +08:00

192 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCP Credential Isolation 迴歸測試
==================================
AwoooP Phase 5.5:防止 2026-04-18 Secret Leak 事故再現
2026-05-04 ogt + Claude Sonnet 4.6
覆蓋:
1. credential_resolver 格式驗證(bad ref 拒絕)
2. dev fallback 正確返回 (value, masked, sha256)
3. secret value 不洩漏到 redaction_middleware output
4. _mcp_audit key 在 redact_mcp_input 中被移除(不送 provider)
5. AuditedMCPToolProvider.__provider 不可從外部直接存取(name mangling)
"""
from __future__ import annotations
import hashlib
import json
import os
import pytest
class TestCredentialResolverFormat:
    """Secret-ref format validation and dev-fallback resolution of ``credential_resolver``."""

    def test_bad_ref_raises(self):
        """Every malformed secret ref must raise ``CredentialResolutionError``."""
        import asyncio
        import sys

        # Put the current directory first on sys.path before importing ``src``.
        # NOTE(review): presumably pytest is launched from the directory that
        # contains ``src/`` -- confirm, or move this into conftest/pytest config.
        sys.path.insert(0, ".")
        from src.plugins.mcp.credential_resolver import (
            CredentialResolutionError,
            resolve_k8s_secret,
        )

        bad_refs = [
            "no-slash",
            "namespace/secret",  # missing "#key"
            "NAMESPACE/secret#key",  # upper-case namespace does not match the format
            "ns/secret#",  # empty key
            "",
        ]

        # Collect every ref that was wrongly accepted so a failure names the
        # offending input(s) instead of a bare "DID NOT RAISE".
        accepted = []
        for ref in bad_refs:
            try:
                asyncio.run(resolve_k8s_secret(ref))
            except CredentialResolutionError:
                continue
            accepted.append(ref)
        assert not accepted, f"malformed refs were accepted: {accepted!r}"

    def test_dev_fallback_resolves(self, monkeypatch):
        """With ``AWOOOP_DEV_SECRETS_JSON`` set, a ref resolves to (value, masked, sha256)."""
        import asyncio
        import sys

        sys.path.insert(0, ".")
        ref = "awoooi/telegram-bot#TELEGRAM_BOT_TOKEN"
        secret = "test-token-value-1234"
        # The env var is set before the resolver is imported; keep this order in
        # case the module reads it at import time (not verifiable from here).
        monkeypatch.setenv("AWOOOP_DEV_SECRETS_JSON", json.dumps({ref: secret}))
        from src.plugins.mcp.credential_resolver import resolve_k8s_secret

        value, masked, sha256 = asyncio.run(resolve_k8s_secret(ref))

        assert value == secret
        assert "test" in masked  # first 4 characters of the secret
        assert "***" in masked
        # A masked representation that still embeds the whole secret is a leak.
        assert secret not in masked, "masked value must not contain the full secret"
        assert sha256 == hashlib.sha256(secret.encode()).hexdigest()

    def test_dev_fallback_missing_key_raises(self, monkeypatch):
        """A ref that is absent from the dev secrets must raise ``CredentialResolutionError``."""
        import asyncio
        import sys

        sys.path.insert(0, ".")
        # Empty dev-secrets mapping: the ref looked up below is not in it.
        monkeypatch.setenv("AWOOOP_DEV_SECRETS_JSON", json.dumps({}))
        from src.plugins.mcp.credential_resolver import (
            CredentialResolutionError,
            resolve_k8s_secret,
        )

        with pytest.raises(CredentialResolutionError):
            asyncio.run(resolve_k8s_secret("awoooi/some-secret#key"))
class TestRedactionMiddlewareSecretLeak:
    """Regression for the 2026-04-18 secret leak: secret values must not reach output."""

    def test_pg_dsn_redacted_from_output(self):
        """The password of a PostgreSQL DSN is not visible after output redaction."""
        import sys

        sys.path.insert(0, ".")
        from src.plugins.mcp.redaction_middleware import redact_mcp_output

        redacted = redact_mcp_output(
            {
                "connection": "postgresql+asyncpg://admin:supersecret@10.0.1.5/prod",
                "status": "connected",
            }
        )
        serialized = json.dumps(redacted)

        assert "supersecret" not in serialized, "PG DSN 密碼不得出現在 output"
        assert "[REDACTED:PG_DSN]" in redacted["connection"]

    def test_telegram_token_redacted_from_output(self):
        """A Telegram bot token is not visible after output redaction."""
        import sys

        sys.path.insert(0, ".")
        from src.plugins.mcp.redaction_middleware import redact_mcp_output

        raw_text = "Bot token: 1234567890:ABCDEFGHIJKLMNOPabcdefghijklmno12345678"
        redacted = redact_mcp_output(raw_text)

        assert "ABCDEFGHIJKLMNO" not in redacted, "Telegram token 不得出現在 output"
        assert "[REDACTED:TELEGRAM_TOKEN]" in redacted

    def test_internal_ip_redacted(self):
        """An internal (GCP private network) IP is not visible after output redaction."""
        import sys

        sys.path.insert(0, ".")
        from src.plugins.mcp.redaction_middleware import redact_mcp_output

        redacted = redact_mcp_output({"host": "10.0.1.5", "port": 5432})
        serialized = json.dumps(redacted)

        assert "10.0.1.5" not in serialized, "內網 IP 不得出現在 output"
        assert "[REDACTED:INTERNAL_IP]" in redacted["host"]

    def test_mcp_audit_key_removed_from_input(self):
        """The ``_mcp_audit`` key is absent after ``redact_mcp_input``; other keys survive."""
        import sys

        sys.path.insert(0, ".")
        from src.plugins.mcp.redaction_middleware import redact_mcp_input

        request_params = {
            "_mcp_audit": {"session_id": "abc123", "run_id": "xyz"},
            "namespace": "default",
        }
        sanitized = redact_mcp_input(request_params)

        assert "_mcp_audit" not in sanitized, "_mcp_audit 應在送 provider 前移除"
        assert sanitized["namespace"] == "default"

    def test_k8s_value_credential_isolation(self):
        """A ``k8s_value`` field is replaced by the credential-isolation marker."""
        import sys

        sys.path.insert(0, ".")
        from src.plugins.mcp.redaction_middleware import redact_mcp_input

        request_params = {
            "k8s_value": "actual-secret-credential",
            "tool": "some-tool",
        }
        sanitized = redact_mcp_input(request_params)
        serialized = json.dumps(sanitized)

        assert "actual-secret-credential" not in serialized
        assert sanitized["k8s_value"] == "[REDACTED:CREDENTIAL_ISOLATION]"
class TestNameManglingEncapsulation:
    """Encapsulation checks for the name-mangled ``AuditedMCPToolProvider.__provider``."""

    @staticmethod
    def _wrap_dummy_provider():
        """Return an ``AuditedMCPToolProvider`` wrapping a minimal stub provider.

        Shared by both tests so the stub is defined once. The leading
        underscore keeps pytest from collecting this helper as a test.
        """
        import sys

        # Put the current directory first on sys.path before importing ``src``.
        # NOTE(review): presumably pytest runs from the directory containing
        # ``src/`` -- confirm.
        sys.path.insert(0, ".")
        from src.plugins.mcp.interfaces import MCPToolProvider, MCPToolResult
        from src.plugins.mcp.registry import AuditedMCPToolProvider

        class DummyProvider(MCPToolProvider):
            """Stub provider named "dummy" with no tools."""

            @property
            def name(self):
                return "dummy"

            async def list_tools(self):
                return []

            async def execute(self, tool_name, parameters):
                return MCPToolResult(success=True, execution_id="t")

        return AuditedMCPToolProvider(DummyProvider())

    def test_single_underscore_not_accessible(self):
        """The wrapper instance must not expose a single-underscore ``_provider``."""
        wrapped = self._wrap_dummy_provider()
        assert not hasattr(wrapped, "_provider"), (
            "_provider 不應可直接存取name mangling 防止直接存取 inner provider"
        )

    def test_double_underscore_mangled(self):
        """``__provider`` is stored under its mangled name and ``name`` still reads "dummy"."""
        wrapped = self._wrap_dummy_provider()
        # Python name mangling: __provider -> _AuditedMCPToolProvider__provider
        assert hasattr(wrapped, "_AuditedMCPToolProvider__provider"), (
            "name mangling 後的屬性應可被內部存取"
        )
        assert wrapped.name == "dummy"