239 lines
10 KiB
Python
239 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
"""檢查公開前端不可洩漏內網拓樸、原始 namespace 或工作視窗內容。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import subprocess
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
# RFC 1918 private IPv4 ranges. The pieces below concatenate into one pattern.
PRIVATE_IP_PATTERN = re.compile(
    r"\b(?:"
    r"10(?:\.\d{1,3}){3}"  # 10.0.0.0/8
    r"|192\.168(?:\.\d{1,3}){2}"  # 192.168.0.0/16
    r"|172\.(?:1[6-9]|2\d|3[01])(?:\.\d{1,3}){2}"  # 172.16.0.0/12
    r")\b"
)

# Fixed UTC+8 offset used for the default report timestamp.
TAIPEI = timezone(timedelta(hours=8))

# Env keys that must no longer appear as active assignments in env examples.
RETIRED_PUBLIC_TOPOLOGY_KEYS = {"NEXT_PUBLIC_HOST_IPS", "NEXT_PUBLIC_K8S_VIP_INFO"}

# Env example files to inspect, relative to the repository root.
ENV_EXAMPLE_PATHS = (Path("apps/web/.env.example"),)

# Directories (relative to the repository root) scanned recursively.
PUBLIC_SURFACE_ROOTS = (Path("apps/web/src"), Path("apps/web/messages"))

# Only files with one of these suffixes are scanned.
PUBLIC_SURFACE_SUFFIXES = {".json", ".mdx", ".ts", ".tsx"}

# (pattern_id, compiled regex) pairs; the order is echoed in the report's
# "forbidden_patterns" list, so keep it stable.
PUBLIC_SURFACE_FORBIDDEN_PATTERNS: tuple[tuple[str, re.Pattern[str]], ...] = (
    # Raw owner namespaces.
    ("raw_personal_owner_namespace", re.compile(r"\bowenhytsai\b", re.IGNORECASE)),
    ("raw_external_owner_namespace", re.compile(r"\bnexu-io\b", re.IGNORECASE)),
    # Raw blocker state strings and counters.
    ("raw_blocked_waiting_state", re.compile(r"\bblocked_waiting_[A-Za-z0-9_]+\b")),
    ("raw_blockers_counter", re.compile(r"\bblockers=\d+\b")),
    # Delegation payload field names.
    ("codex_delegation_payload", re.compile(r"\bcodex_delegation\b", re.IGNORECASE)),
    ("codex_source_thread_id", re.compile(r"\bsource_thread_id\b", re.IGNORECASE)),
    # Chat / work-window phrases.
    ("approval_chat_phrase", re.compile(r"批准!")),
    ("work_window_plaintext", re.compile(r"工作視窗")),
    ("in_app_browser_transcript", re.compile(r"\bIn app browser\b", re.IGNORECASE)),
    ("codex_request_transcript", re.compile(r"\bMy request for Codex\b", re.IGNORECASE)),
    ("work_window_transcript", re.compile(r"\bwork window transcript\b", re.IGNORECASE)),
    # Private network addresses.
    ("internal_rfc1918_ip", PRIVATE_IP_PATTERN),
)

# (posix relative path, pattern_id) pairs whose matches are counted as
# allowlisted instead of being reported as violations.
ALLOWED_PUBLIC_SURFACE_MATCHES: set[tuple[str, str]] = {
    ("apps/web/src/lib/api-client.ts", "work_window_transcript"),
    (
        "apps/web/src/app/[locale]/governance/tabs/automation-inventory-tab.tsx",
        "work_window_transcript",
    ),
}
|
||
|
||
|
||
def _active_assignment(line: str) -> tuple[str, str] | None:
    """Parse one dotenv-style line into a ``(key, value)`` pair.

    Blank lines, lines starting with ``#`` and lines without ``=`` yield
    ``None``. A leading ``export`` keyword is dropped from the key and one
    pair of matching surrounding quotes is removed from the value, so
    ``export KEY="x"`` is inspected the same way as ``KEY=x``.

    Args:
        line: Raw line read from an env example file.

    Returns:
        The stripped key and value, or ``None`` when the line is not an
        active assignment.
    """
    stripped = line.strip()
    if not stripped or stripped.startswith("#"):
        return None

    key, separator, value = stripped.partition("=")
    if not separator:
        return None
    key = key.strip()
    value = value.strip()

    # `export NEXT_PUBLIC_X=...` would otherwise produce the key
    # "export NEXT_PUBLIC_X" and slip past every key-based check.
    words = key.split(None, 1)
    if len(words) == 2 and words[0] == "export":
        key = words[1]

    # `KEY=""` is an empty value; keeping the quotes would make it look set.
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        value = value[1:-1]

    return key, value
|
||
|
||
|
||
def _git_short_sha(root: Path) -> str:
    """Return the abbreviated HEAD commit hash of the repository at *root*.

    This is best-effort metadata for the report: any failure to run git
    yields the placeholder ``"unknown"`` instead of raising.

    Args:
        root: Directory in which ``git rev-parse --short HEAD`` is executed.

    Returns:
        The stripped command output, or ``"unknown"`` when git cannot be run,
        exits non-zero, produces undecodable output, or prints nothing.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: git binary or cwd missing / not accessible.
        # SubprocessError: non-zero exit (check=True), e.g. not a git repo.
        # ValueError: undecodable output or invalid arguments.
        # Anything else is a programming error and should surface.
        return "unknown"
    return result.stdout.strip() or "unknown"
|
||
|
||
|
||
def _iter_public_surface_files(root: Path) -> list[Path]:
    """Collect every scannable file under the configured public surface roots.

    Args:
        root: Repository root that each ``PUBLIC_SURFACE_ROOTS`` entry is
            joined to.

    Returns:
        Sorted list of regular files whose suffix is in
        ``PUBLIC_SURFACE_SUFFIXES``. Roots that do not exist are skipped.
    """
    existing_roots = (
        base
        for base in (root / surface_root for surface_root in PUBLIC_SURFACE_ROOTS)
        if base.exists()
    )
    return sorted(
        entry
        for base in existing_roots
        for entry in base.rglob("*")
        if entry.is_file() and entry.suffix in PUBLIC_SURFACE_SUFFIXES
    )
|
||
|
||
|
||
def _line_excerpt(line: str, start: int, end: int) -> str:
    """Return a short, whitespace-trimmed excerpt of *line* around a match.

    Args:
        line: Full source line containing the match.
        start: Start offset of the match within *line*.
        end: End offset of the match within *line*.

    Returns:
        Up to 36 characters of context on each side of the match, stripped.
        Excerpts longer than 120 characters are cut to 117 characters
        followed by ``"..."``.
    """
    context = 36
    limit = 120
    window = slice(max(0, start - context), min(len(line), end + context))
    snippet = line[window].strip()
    if len(snippet) > limit:
        return snippet[: limit - 3] + "..."
    return snippet
|
||
|
||
|
||
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
    """Scan env examples and public front-end files, then assemble the report.

    Two passes are made: active assignments in ``ENV_EXAMPLE_PATHS`` are
    checked for retired keys, private IPs and empty ``NEXT_PUBLIC_*`` values;
    then every file from ``_iter_public_surface_files`` is matched line by
    line against ``PUBLIC_SURFACE_FORBIDDEN_PATTERNS``.

    Args:
        root: Repository root that all configured relative paths are joined to.
        generated_at: Timestamp string recorded verbatim. When falsy, the
            current time in the ``TAIPEI`` zone (second precision) is used.

    Returns:
        The report dictionary. ``status`` is ``"pass"`` when neither pass
        produced a finding, otherwise ``"blocked"``.
    """
    timestamp = generated_at if generated_at else datetime.now(TAIPEI).isoformat(timespec="seconds")
    findings: list[str] = []  # only its emptiness is used, to derive "status"
    env_violations = 0
    surface_files = _iter_public_surface_files(root)
    matches: list[dict[str, Any]] = []
    violations: list[dict[str, Any]] = []
    allowlisted = 0

    # Pass 1: env example files (missing files are skipped).
    for relative_path in ENV_EXAMPLE_PATHS:
        env_file = root / relative_path
        if not env_file.exists():
            continue

        env_lines = env_file.read_text(encoding="utf-8").splitlines()
        for line_number, raw_line in enumerate(env_lines, start=1):
            parsed = _active_assignment(raw_line)
            if parsed is None:
                continue

            key, value = parsed
            # The three checks are independent: one line may count more than once.
            if key in RETIRED_PUBLIC_TOPOLOGY_KEYS:
                findings.append(f"{relative_path}:{line_number}: 已停用公開前端拓樸 env key:{key}")
                env_violations += 1
            if PRIVATE_IP_PATTERN.search(value):
                findings.append(f"{relative_path}:{line_number}: env 範例不得包含內網 IP:{key}")
                env_violations += 1
            if key.startswith("NEXT_PUBLIC_") and not value:
                findings.append(f"{relative_path}:{line_number}: NEXT_PUBLIC_* 範例需明確使用公網入口或安全預設值:{key}")
                env_violations += 1

    # Pass 2: public surface sources; undecodable bytes are replaced, not fatal.
    for source_file in surface_files:
        relative_path = source_file.relative_to(root).as_posix()
        content = source_file.read_text(encoding="utf-8", errors="replace")
        for line_number, line in enumerate(content.splitlines(), start=1):
            for pattern_id, pattern in PUBLIC_SURFACE_FORBIDDEN_PATTERNS:
                # The allowlist is keyed per (file, pattern), not per match.
                is_allowlisted = (relative_path, pattern_id) in ALLOWED_PUBLIC_SURFACE_MATCHES
                for hit in pattern.finditer(line):
                    record = {
                        "path": relative_path,
                        "line": line_number,
                        "pattern_id": pattern_id,
                        "excerpt": _line_excerpt(line, hit.start(), hit.end()),
                    }
                    matches.append(record)
                    if is_allowlisted:
                        allowlisted += 1
                    else:
                        violations.append(record)
                        findings.append(
                            f"{relative_path}:{line_number}: 前台敏感資訊命中 {pattern_id},需改成脫敏名稱或遮罩器例外"
                        )

    summary = {
        "env_example_file_count": sum(1 for path in ENV_EXAMPLE_PATHS if (root / path).exists()),
        "public_surface_file_count": len(surface_files),
        "forbidden_pattern_count": len(PUBLIC_SURFACE_FORBIDDEN_PATTERNS),
        "raw_match_count": len(matches),
        "allowlisted_match_count": allowlisted,
        "violation_count": len(violations),
        "env_violation_count": env_violations,
        "runtime_gate_count": 0,
        "action_button_count": 0,
    }
    execution_boundaries = {
        "runtime_execution_authorized": False,
        "frontend_deploy_authorized": False,
        "production_deploy_authorized": False,
        "secret_value_collection_allowed": False,
        "raw_payload_storage_allowed": False,
        "internal_namespace_public_display_allowed": False,
        "work_window_transcript_public_display_allowed": False,
        "internal_ip_public_display_allowed": False,
        "action_buttons_allowed": False,
        "not_authorization": True,
    }

    return {
        "schema_version": "public_frontend_sensitive_surface_guard_v1",
        "generated_at": timestamp,
        "git_commit": _git_short_sha(root),
        "status": "blocked" if findings else "pass",
        "mode": "repo_source_scan_no_runtime_no_secret_collection",
        "guarded_paths": [path.as_posix() for path in PUBLIC_SURFACE_ROOTS],
        "env_example_paths": [path.as_posix() for path in ENV_EXAMPLE_PATHS],
        "forbidden_patterns": [pattern_id for pattern_id, _ in PUBLIC_SURFACE_FORBIDDEN_PATTERNS],
        "allowed_matches": [
            {"path": path, "pattern_id": pattern_id}
            for path, pattern_id in sorted(ALLOWED_PUBLIC_SURFACE_MATCHES)
        ],
        "summary": summary,
        "execution_boundaries": execution_boundaries,
        "public_surface_matches": matches,
        "public_surface_violations": violations,
        "operator_interpretation": [
            "此 guard 只掃描 repo 內前端 source / messages 與 env example,不讀 production bundle、不部署、不收 secret。",
            "遮罩器中的 banned phrase 測試 pattern 允許列在 allowlist;產品文案、表格、API payload 與 i18n 不允許顯示 raw namespace、工作視窗逐字內容、raw blocker 狀態或內網 IP。",
            "violation_count 維持 0 才能視為 source-control 防洩漏檢查通過;仍不代表 production smoke、runtime approval 或 owner response accepted。",
        ],
    }
|
||
|
||
|
||
def validate(root: Path) -> None:
    """Build a fresh report for *root* and abort when it contains violations.

    Args:
        root: Repository root passed to ``build_report``.

    Raises:
        SystemExit: With a bulleted message listing each public surface
            violation, plus the env violation count when it is non-zero.
    """
    report = build_report(root)
    errors = [
        f"{item['path']}:{item['line']}: 前台敏感資訊命中 {item['pattern_id']},需改成脫敏名稱或遮罩器例外"
        for item in report["public_surface_violations"]
    ]

    # Env findings are reported only as a count here.
    has_env_violations = bool(report["summary"]["env_violation_count"])
    if has_env_violations:
        errors.append(f"env_violation_count={report['summary']['env_violation_count']}")

    if not errors:
        return

    bullet_list = "\n".join(f"- {error}" for error in errors)
    raise SystemExit("BLOCKED public frontend sensitive surface guard:\n" + bullet_list)
|
||
|
||
|
||
def main() -> None:
    """Command-line entry point: build the report, optionally save it, validate.

    The JSON report is written (when ``--output`` is given) before
    validation, so it exists on disk even when validation exits with an error.
    """
    parser = argparse.ArgumentParser(description="檢查公開前端不可洩漏內網拓樸、原始 namespace 或工作視窗內容")
    parser.add_argument("--root", default=".", help="repository root")
    parser.add_argument("--output", help="寫出 JSON 報告")
    parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    args = parser.parse_args()

    root = Path(args.root).resolve()
    report = build_report(root, args.generated_at)

    if args.output:
        destination = Path(args.output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n"
        destination.write_text(serialized, encoding="utf-8")

    # NOTE(review): validate() builds its own report, so the tree is scanned
    # a second time here.
    validate(root)

    summary = report["summary"]
    message = (
        "OK public frontend sensitive surface guard "
        f"files={summary['public_surface_file_count']} "
        f"patterns={summary['forbidden_pattern_count']} "
        f"allowlisted={summary['allowlisted_match_count']} "
        f"violations={summary['violation_count']} "
        f"runtime_gate={summary['runtime_gate_count']}"
    )
    print(message)


# Run the guard only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|