#!/usr/bin/env python3
"""Guard the public frontend against leaking internal details.

Scans the frontend source / message catalogues and the committed env example
for internal network topology (RFC 1918 addresses, retired topology env keys),
raw owner namespaces, and work-window transcript content. The scan is
source-only: it reads files under the repository root and shells out to
``git rev-parse`` for the commit id; it does not deploy or read secrets.
"""

from __future__ import annotations

import argparse
import json
import re
import subprocess
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

# Matches dotted quads in the 10/8, 192.168/16 and 172.16/12 private ranges.
# Octets are only checked for digit count (1-3), not for being <= 255.
PRIVATE_IP_PATTERN = re.compile(
    r"\b(?:10(?:\.\d{1,3}){3}|192\.168(?:\.\d{1,3}){2}|172\.(?:1[6-9]|2\d|3[01])(?:\.\d{1,3}){2})\b"
)
# Fixed UTC+8 offset used for the default report timestamp.
TAIPEI = timezone(timedelta(hours=8))

# Env keys that must no longer appear as active assignments in env examples.
RETIRED_PUBLIC_TOPOLOGY_KEYS = {
    "NEXT_PUBLIC_HOST_IPS",
    "NEXT_PUBLIC_K8S_VIP_INFO",
}

# Env example files (relative to the repository root) that are checked.
ENV_EXAMPLE_PATHS = (
    Path("apps/web/.env.example"),
)

# Directories (relative to the repository root) scanned recursively.
PUBLIC_SURFACE_ROOTS = (
    Path("apps/web/src"),
    Path("apps/web/messages"),
)

# Only files with these suffixes are scanned.
PUBLIC_SURFACE_SUFFIXES = {
    ".json",
    ".mdx",
    ".ts",
    ".tsx",
}

# (pattern_id, compiled regex) pairs; every match on a scanned line is recorded.
PUBLIC_SURFACE_FORBIDDEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
    ("raw_personal_owner_namespace", re.compile(r"\bowenhytsai\b", re.IGNORECASE)),
    ("raw_external_owner_namespace", re.compile(r"\bnexu-io\b", re.IGNORECASE)),
    ("raw_blocked_waiting_state", re.compile(r"\bblocked_waiting_[A-Za-z0-9_]+\b")),
    ("raw_blockers_counter", re.compile(r"\bblockers=\d+\b")),
    ("codex_delegation_payload", re.compile(r"\bcodex_delegation\b", re.IGNORECASE)),
    ("codex_source_thread_id", re.compile(r"\bsource_thread_id\b", re.IGNORECASE)),
    ("approval_chat_phrase", re.compile(r"批准!")),
    ("work_window_plaintext", re.compile(r"工作視窗")),
    ("in_app_browser_transcript", re.compile(r"\bIn app browser\b", re.IGNORECASE)),
    ("codex_request_transcript", re.compile(r"\bMy request for Codex\b", re.IGNORECASE)),
    ("work_window_transcript", re.compile(r"\bwork window transcript\b", re.IGNORECASE)),
    ("internal_rfc1918_ip", PRIVATE_IP_PATTERN),
)

# (posix relative path, pattern_id) pairs whose matches are counted but not
# treated as violations.
ALLOWED_PUBLIC_SURFACE_MATCHES: set[tuple[str, str]] = {
    ("apps/web/src/lib/api-client.ts", "work_window_transcript"),
    ("apps/web/src/app/[locale]/governance/tabs/automation-inventory-tab.tsx", "work_window_transcript"),
}


def _active_assignment(line: str) -> tuple[str, str] | None:
    """Parse one env-file line into ``(key, value)``.

    Returns ``None`` for blank lines, lines starting with ``#`` and lines
    without ``=``. The value is everything after the first ``=``, stripped of
    surrounding whitespace; quotes are not interpreted.
    """
    stripped = line.strip()
    if not stripped or stripped.startswith("#") or "=" not in stripped:
        return None
    key, value = stripped.split("=", 1)
    return key.strip(), value.strip()


def _git_short_sha(root: Path) -> str:
    """Return ``git rev-parse --short HEAD`` run in *root*, or ``"unknown"``.

    Best-effort: a missing git binary, a directory that is not a repository,
    or an unusable working directory all yield ``"unknown"`` instead of
    aborting the scan.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: git not installed / cwd unusable; SubprocessError: non-zero
        # exit (check=True); ValueError: invalid arguments or undecodable output.
        return "unknown"
    return result.stdout.strip()


def _iter_public_surface_files(root: Path) -> list[Path]:
    """Return the sorted files under the public surface roots that get scanned.

    Roots that do not exist under *root* are skipped; only regular files whose
    suffix is in ``PUBLIC_SURFACE_SUFFIXES`` are included.
    """
    files: list[Path] = []
    for surface_root in PUBLIC_SURFACE_ROOTS:
        absolute_root = root / surface_root
        if not absolute_root.exists():
            continue
        files.extend(
            path
            for path in absolute_root.rglob("*")
            if path.is_file() and path.suffix in PUBLIC_SURFACE_SUFFIXES
        )
    return sorted(files)


def _line_excerpt(line: str, start: int, end: int) -> str:
    """Return the match at ``line[start:end]`` with up to 36 chars of context.

    The excerpt is whitespace-stripped and capped at 120 characters; longer
    excerpts are cut to 117 characters plus ``"..."``.
    """
    prefix_start = max(start - 36, 0)
    suffix_end = min(end + 36, len(line))
    excerpt = line[prefix_start:suffix_end].strip()
    return excerpt if len(excerpt) <= 120 else excerpt[:117] + "..."


def _scan_env_examples(root: Path) -> list[str]:
    """Return one message per env-example violation found under *root*.

    Missing env example files are skipped. A single assignment can produce
    several messages (retired key, private IP in the value, empty
    ``NEXT_PUBLIC_*`` value).
    """
    messages: list[str] = []
    for relative_path in ENV_EXAMPLE_PATHS:
        path = root / relative_path
        if not path.exists():
            continue
        # errors="replace" matches the public-surface scan: an undecodable byte
        # must not crash the guard.
        text = path.read_text(encoding="utf-8", errors="replace")
        for line_number, line in enumerate(text.splitlines(), start=1):
            assignment = _active_assignment(line)
            if assignment is None:
                continue
            key, value = assignment
            if key in RETIRED_PUBLIC_TOPOLOGY_KEYS:
                messages.append(f"{relative_path}:{line_number}: 已停用公開前端拓樸 env key:{key}")
            if PRIVATE_IP_PATTERN.search(value):
                messages.append(f"{relative_path}:{line_number}: env 範例不得包含內網 IP:{key}")
            if key.startswith("NEXT_PUBLIC_") and not value:
                messages.append(
                    f"{relative_path}:{line_number}: NEXT_PUBLIC_* 範例需明確使用公網入口或安全預設值:{key}"
                )
    return messages


def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
    """Scan *root* and return the guard report as a JSON-serialisable dict.

    Args:
        root: Repository root; guarded paths are resolved relative to it.
        generated_at: Timestamp string stored verbatim in the report. When
            omitted (or empty), the current time at UTC+8 is used, formatted
            with second precision.

    Returns:
        The report dict. ``status`` is ``"pass"`` only when there are no env
        example violations and no non-allowlisted public surface matches;
        otherwise it is ``"blocked"``.
    """
    generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
    env_errors = _scan_env_examples(root)
    public_surface_files = _iter_public_surface_files(root)
    public_surface_matches: list[dict[str, Any]] = []
    public_surface_violations: list[dict[str, Any]] = []
    allowlisted_match_count = 0

    for path in public_surface_files:
        relative_path = path.relative_to(root).as_posix()
        text = path.read_text(encoding="utf-8", errors="replace")
        for line_number, line in enumerate(text.splitlines(), start=1):
            for pattern_id, pattern in PUBLIC_SURFACE_FORBIDDEN_PATTERNS:
                for match in pattern.finditer(line):
                    item = {
                        "path": relative_path,
                        "line": line_number,
                        "pattern_id": pattern_id,
                        "excerpt": _line_excerpt(line, match.start(), match.end()),
                    }
                    # Every hit is recorded; allowlisted ones are only counted.
                    public_surface_matches.append(item)
                    if (relative_path, pattern_id) in ALLOWED_PUBLIC_SURFACE_MATCHES:
                        allowlisted_match_count += 1
                    else:
                        public_surface_violations.append(item)

    blocked = bool(env_errors) or bool(public_surface_violations)
    return {
        "schema_version": "public_frontend_sensitive_surface_guard_v1",
        "generated_at": generated,
        "git_commit": _git_short_sha(root),
        "status": "blocked" if blocked else "pass",
        "mode": "repo_source_scan_no_runtime_no_secret_collection",
        "guarded_paths": [path.as_posix() for path in PUBLIC_SURFACE_ROOTS],
        "env_example_paths": [path.as_posix() for path in ENV_EXAMPLE_PATHS],
        "forbidden_patterns": [pattern_id for pattern_id, _ in PUBLIC_SURFACE_FORBIDDEN_PATTERNS],
        "allowed_matches": [
            {"path": path, "pattern_id": pattern_id}
            for path, pattern_id in sorted(ALLOWED_PUBLIC_SURFACE_MATCHES)
        ],
        "summary": {
            "env_example_file_count": sum(1 for path in ENV_EXAMPLE_PATHS if (root / path).exists()),
            "public_surface_file_count": len(public_surface_files),
            "forbidden_pattern_count": len(PUBLIC_SURFACE_FORBIDDEN_PATTERNS),
            "raw_match_count": len(public_surface_matches),
            "allowlisted_match_count": allowlisted_match_count,
            "violation_count": len(public_surface_violations),
            "env_violation_count": len(env_errors),
            "runtime_gate_count": 0,
            "action_button_count": 0,
        },
        "execution_boundaries": {
            "runtime_execution_authorized": False,
            "frontend_deploy_authorized": False,
            "production_deploy_authorized": False,
            "secret_value_collection_allowed": False,
            "raw_payload_storage_allowed": False,
            "internal_namespace_public_display_allowed": False,
            "work_window_transcript_public_display_allowed": False,
            "internal_ip_public_display_allowed": False,
            "action_buttons_allowed": False,
            "not_authorization": True,
        },
        "public_surface_matches": public_surface_matches,
        "public_surface_violations": public_surface_violations,
        "operator_interpretation": [
            "此 guard 只掃描 repo 內前端 source / messages 與 env example,不讀 production bundle、不部署、不收 secret。",
            "遮罩器中的 banned phrase 測試 pattern 允許列在 allowlist;產品文案、表格、API payload 與 i18n 不允許顯示 raw namespace、工作視窗逐字內容、raw blocker 狀態或內網 IP。",
            "violation_count 維持 0 才能視為 source-control 防洩漏檢查通過;仍不代表 production smoke、runtime approval 或 owner response accepted。",
        ],
    }


def validate(root: Path, report: dict[str, Any] | None = None) -> None:
    """Raise ``SystemExit`` listing every violation; return ``None`` when clean.

    Args:
        root: Repository root to validate.
        report: Optional report already produced by ``build_report(root)``.
            Passing it avoids scanning the repository a second time; when
            omitted, a fresh report is built.

    Raises:
        SystemExit: With a ``BLOCKED ...`` message when the report contains
            public surface violations or env example violations.
    """
    if report is None:
        report = build_report(root)
    errors = [
        f"{item['path']}:{item['line']}: 前台敏感資訊命中 {item['pattern_id']},需改成脫敏名稱或遮罩器例外"
        for item in report["public_surface_violations"]
    ]
    env_violation_count = report["summary"]["env_violation_count"]
    if env_violation_count:
        errors.append(f"env_violation_count={env_violation_count}")
        # The report stores only the count; rescan the (small) env examples so
        # the operator sees which file, line and key tripped the guard.
        errors.extend(_scan_env_examples(root))
    if errors:
        raise SystemExit(
            "BLOCKED public frontend sensitive surface guard:\n"
            + "\n".join(f"- {error}" for error in errors)
        )


def main() -> None:
    """CLI entry point: build the report, optionally write it, then validate.

    The JSON report is written (when ``--output`` is given) before validation,
    so a blocked run still leaves the report on disk.
    """
    parser = argparse.ArgumentParser(description="檢查公開前端不可洩漏內網拓樸、原始 namespace 或工作視窗內容")
    parser.add_argument("--root", default=".", help="repository root")
    parser.add_argument("--output", help="寫出 JSON 報告")
    parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    args = parser.parse_args()

    root = Path(args.root).resolve()
    report = build_report(root, args.generated_at)
    if args.output:
        output = Path(args.output)
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text(
            json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
            encoding="utf-8",
        )
    # Reuse the report built above instead of scanning the repository again.
    validate(root, report)
    summary = report["summary"]
    print(
        "OK public frontend sensitive surface guard "
        f"files={summary['public_surface_file_count']} "
        f"patterns={summary['forbidden_pattern_count']} "
        f"allowlisted={summary['allowlisted_match_count']} "
        f"violations={summary['violation_count']} "
        f"runtime_gate={summary['runtime_gate_count']}"
    )


if __name__ == "__main__":
    main()