"""
MCP Redaction Middleware — two-layer PII/secret redaction
=====================================================
AwoooP Phase 5.3: ADR-116 P1-04 + P1-09
2026-05-04 ogt + Claude Sonnet 4.6

MCP tool call input/output must pass through two layers of redaction:
  Layer 1 (audit_sink) — sanitization before writing to the audit log
                         (field-name blacklist + pattern interception)
  Layer 2 (this layer) — specific to MCP tool call input/output:
    - strip known secret fields (the context injected under _mcp_audit)
    - apply audit_sink's redaction patterns to the output
    - limit output size (prevents prompt stuffing)

Design principles (extension of ADR-118 credential isolation):
  - MCP tool output may contain k8s secret values, so it must be redacted
    before the output enters the LLM context
  - only "safe" output may be used by platform_runtime.shadow_execute
  - input credential fields (e.g. k8s_value) are cleared before being sent
    to the provider (credential isolation)

Why two layers are necessary:
  - audit_sink protects the audit log DB
  - this middleware protects the LLM context + gateway audit hash
  - the two protect different targets and cannot replace each other
"""

from __future__ import annotations

import hashlib
import json
import re
from typing import Any

import structlog

from src.services.audit_sink import _BLOCKED_FIELD_NAMES, _REDACTION_PATTERNS, _redact_string

logger = structlog.get_logger(__name__)

# Maximum number of characters of MCP output allowed into the LLM context
# (prevents prompt stuffing).
_MCP_OUTPUT_MAX_CHARS = 16_000

# Audit-context key injected by the MCP gateway (dropped before the
# parameters are sent to the provider).
_MCP_AUDIT_KEY = "_mcp_audit"

# MCP credential field names (Gate 5 credential isolation — cleared from input).
_MCP_CREDENTIAL_FIELDS = frozenset({
    "k8s_value",
    "secret_value",
    "credential",
    "credential_value",
    "token_value",
    "api_key_value",
    "private_key_value",
})


def _redact_input_value(value: Any) -> Any:
    """Redact one input value, recursing through nested dicts and lists.

    Strings go through ``_redact_string``; dicts go through
    ``redact_mcp_input``; lists are rebuilt element by element so that
    strings inside lists-of-lists are redacted as well. Any other type is
    returned unchanged.
    """
    if isinstance(value, str):
        return _redact_string(value)
    if isinstance(value, dict):
        return redact_mcp_input(value)
    if isinstance(value, list):
        return [_redact_input_value(item) for item in value]
    return value


def redact_mcp_input(parameters: dict[str, Any]) -> dict[str, Any]:
    """
    Layer 2 input redaction: clean the input parameters of an MCP tool call.

    1. Drop the ``_mcp_audit`` entry (audit context, must not reach the provider).
    2. Replace the value of credential fields with a placeholder
       (credential isolation).
    3. Replace the value of fields whose name is in audit_sink's
       ``_BLOCKED_FIELD_NAMES`` with a placeholder.
    4. Run every remaining string value through ``_redact_string``,
       recursing into nested dicts and lists (at any nesting level).

    Args:
        parameters: Raw tool-call parameters. Not mutated.

    Returns:
        A new dict holding the cleaned parameters.
    """
    cleaned: dict[str, Any] = {}
    for key, value in parameters.items():
        # Drop the audit context injected by the gateway.
        if key == _MCP_AUDIT_KEY:
            continue

        # Name-based rules only make sense for string keys; a non-string key
        # skips them (instead of crashing on .lower()) but its value is still
        # redacted below.
        if isinstance(key, str):
            lowered = key.lower()

            # Credential isolation — never let a plaintext credential flow
            # to the provider.
            if lowered in _MCP_CREDENTIAL_FIELDS:
                cleaned[key] = "[REDACTED:CREDENTIAL_ISOLATION]"
                continue

            # Field-name blacklist (kept aligned with audit_sink).
            if lowered in _BLOCKED_FIELD_NAMES:
                cleaned[key] = "[REDACTED:BLOCKED_FIELD]"
                continue

        cleaned[key] = _redact_input_value(value)

    return cleaned


def redact_mcp_output(output: Any) -> Any:
    """
    Layer 2 output redaction: clean the output of an MCP tool call.

    Behaviour by type:
      - ``None`` is returned as ``None``.
      - ``str`` is passed through ``_redact_string`` and, when the redacted
        text is longer than ``_MCP_OUTPUT_MAX_CHARS``, cut to that length
        with a ``[TRUNCATED:...]`` marker appended (the marker reports the
        length of the original, un-redacted string).
      - ``dict`` is redacted recursively by ``_redact_output_dict``.
        NOTE(review): dict outputs are not size-limited here.
      - ``list`` items are redacted one by one; once the accumulated
        JSON-serialized size of the items already emitted exceeds
        ``_MCP_OUTPUT_MAX_CHARS``, a truncation marker is appended and the
        remaining items are dropped.
      - anything else is returned unchanged.

    Returns:
        The cleaned output (intended for use in the LLM context).
    """
    if output is None:
        return None

    if isinstance(output, str):
        redacted = _redact_string(output)
        if len(redacted) > _MCP_OUTPUT_MAX_CHARS:
            redacted = redacted[:_MCP_OUTPUT_MAX_CHARS] + f"\n[TRUNCATED:{len(output)} chars]"
        return redacted

    if isinstance(output, dict):
        return _redact_output_dict(output)

    if isinstance(output, list):
        result = []
        total = 0
        for item in output:
            # The budget is checked before each item, so the item that
            # crosses the limit is still included in full.
            if total > _MCP_OUTPUT_MAX_CHARS:
                result.append(f"[TRUNCATED:{len(output)} items total]")
                break
            cleaned = redact_mcp_output(item)
            serialized = json.dumps(cleaned, ensure_ascii=False, default=str)
            total += len(serialized)
            result.append(cleaned)
        return result

    return output


def _redact_output_value(value: Any, depth: int) -> Any:
    """Redact one output value found inside a dict at nesting level ``depth``.

    Strings go through ``_redact_string``; dicts go through
    ``_redact_output_dict`` at ``depth``; lists are rebuilt element by
    element at the same ``depth`` so that lists nested inside lists are
    redacted too. Any other type is returned unchanged.
    """
    if isinstance(value, str):
        return _redact_string(value)
    if isinstance(value, dict):
        return _redact_output_dict(value, depth)
    if isinstance(value, list):
        return [_redact_output_value(item, depth) for item in value]
    return value


def _redact_output_dict(d: dict[str, Any], depth: int = 0) -> dict[str, Any]:
    """Recursively redact an output dict.

    Args:
        d: The dict to redact. Not mutated.
        depth: Current dict nesting level; once it exceeds 8 the subtree is
            replaced by ``{"[MAX_DEPTH]": True}`` instead of being walked.

    Returns:
        A new dict in which values of blacklisted field names are replaced
        by a placeholder and every other value is redacted recursively.
    """
    if depth > 8:
        return {"[MAX_DEPTH]": True}

    result: dict[str, Any] = {}
    for key, value in d.items():
        # Field-name blacklist; non-string keys cannot match a name and
        # fall through to value redaction.
        if isinstance(key, str) and key.lower() in _BLOCKED_FIELD_NAMES:
            result[key] = "[REDACTED:BLOCKED_FIELD]"
            continue
        # Dicts reached from here (directly or through lists) are one level deeper.
        result[key] = _redact_output_value(value, depth + 1)
    return result


def compute_safe_hash(data: Any) -> str:
    """Return the hex SHA-256 of ``data`` serialized as key-sorted JSON.

    Intended to be called on already-redacted data (for the gateway audit).
    Values that are not JSON-serializable are stringified via ``default=str``.
    """
    serialized = json.dumps(data, sort_keys=True, ensure_ascii=False, default=str)
    return hashlib.sha256(serialized.encode()).hexdigest()