#!/usr/bin/env python3
"""Validate a redacted Gitea authenticated/admin inventory payload.

This is a preflight only. It never calls Gitea, never stores token values, and
never writes repos, refs, secrets, or runtime state.
"""

from __future__ import annotations

import argparse
import json
import re
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlsplit

SCHEMA_VERSION = "gitea_authenticated_inventory_payload_validation_v1"
PAYLOAD_SCHEMA_VERSION = "gitea_repo_inventory_v1"
ACCEPTED_VISIBILITY_SCOPES = {"authenticated", "admin_export"}

# Every one of these keys must be literally ``true`` in the payload's
# ``redaction_attestation`` object.
REQUIRED_ATTESTATIONS = {
    "no_token_value",
    "no_write_token",
    "no_webhook_secret",
    "no_deploy_key_private_key",
    "no_runner_registration_token",
    "no_cookie_or_session",
    "no_gitea_db_dump",
    "no_git_object_pack",
}

# A payload that sets any of these keys to ``true`` (at any nesting depth) is
# rejected outright as an execution request.
FORBIDDEN_TRUE_FIELDS = {
    "repo_write_allowed",
    "refs_sync_allowed",
    "github_primary_switch_authorized",
    "runtime_execution_authorized",
    "write_to_gitea",
    "create_gitea_repo",
    "delete_or_archive_gitea_repo",
    "sync_git_refs",
    "force_push",
}

# Patterns searched for in every string value of the payload.
SECRET_PATTERNS = {
    "authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
    "bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._~+/=-]{12,}", re.IGNORECASE),
    "cookie_header": re.compile(r"\bCookie\s*:", re.IGNORECASE),
    "password_assignment": re.compile(r"\bpassword\s*[:=]\s*[^,\s]+", re.IGNORECASE),
    "private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
    "token_assignment": re.compile(r"\btoken\s*[:=]\s*[^,\s]+", re.IGNORECASE),
}

# Query-string parameter names (compared lower-cased) that mark a URL as
# carrying secret material.
SECRET_QUERY_KEYS = {"access_token", "auth", "key", "password", "secret", "token"}

# Minimum ``repo_count`` a payload must report to avoid the
# ``repo_count_below_current_public_floor`` blocker.
PUBLIC_REPO_FLOOR = 4

# String values (after strip + lower) that count as "not filled in".
_PLACEHOLDER_STRINGS = frozenset({"", "pending", "todo", "tbd", "n/a", "na"})


def _resolve_root() -> Path:
    """Return the directory two levels above this script.

    Falls back to the script's own directory when the file sits too close to
    the filesystem root for ``parents[2]`` to exist, so importing the module
    never fails just because of where it was copied.
    """
    here = Path(__file__).resolve()
    try:
        return here.parents[2]
    except IndexError:
        return here.parent


ROOT = _resolve_root()
DEFAULT_INPUT = ROOT / "docs" / "security" / "gitea-repo-inventory.snapshot.json"


def parse_args() -> argparse.Namespace:
    """Parse the command line (``--input`` and optional ``--output``)."""
    parser = argparse.ArgumentParser(
        description="Validate redacted Gitea authenticated/admin inventory payload.",
    )
    parser.add_argument(
        "--input",
        type=Path,
        default=DEFAULT_INPUT,
        help="Payload JSON to validate.",
    )
    parser.add_argument("--output", type=Path, help="Write validation JSON here.")
    return parser.parse_args()


def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it.

    Raises:
        SystemExit: if the top-level JSON value is not an object.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        raise SystemExit(f"json_not_object={path}")
    return payload


def validate_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate an inventory payload and return the validation report.

    The report's ``status`` is decided in priority order:

    1. ``rejected_execution_request`` if any forbidden field is ``true``;
    2. ``quarantined_sensitive_payload`` if any string looks like a secret;
    3. ``needs_supplement`` if any structural blocker was recorded;
    4. ``accepted_for_private_inventory_review_only`` otherwise.
    """
    blockers: list[str] = []
    sensitive_hits = find_sensitive_strings(payload)
    forbidden_true_fields = find_forbidden_true_fields(payload)

    if payload.get("schema_version") != PAYLOAD_SCHEMA_VERSION:
        blockers.append(f"schema_version_not_{PAYLOAD_SCHEMA_VERSION}")
    if payload.get("status") != "ok":
        blockers.append("status_not_ok")

    visibility_scope = str(payload.get("visibility_scope") or "")
    if visibility_scope not in ACCEPTED_VISIBILITY_SCOPES:
        blockers.append("visibility_scope_not_authenticated_or_admin_export")

    # Non-object entries are dropped here; if the payload counted them, the
    # repo_count comparison below reports the mismatch.
    repos = [repo for repo in as_list(payload.get("repos")) if isinstance(repo, dict)]
    repo_count = as_int(payload.get("repo_count"))
    if repo_count != len(repos):
        blockers.append("repo_count_mismatch")
    if repo_count < PUBLIC_REPO_FLOOR:
        blockers.append("repo_count_below_current_public_floor")
    blockers.extend(validate_repos(repos))

    if is_placeholder(payload.get("coverage_gap_explanation")):
        blockers.append("coverage_gap_explanation_missing")
    blockers.extend(validate_redaction_attestation(payload.get("redaction_attestation")))

    if forbidden_true_fields:
        status = "rejected_execution_request"
    elif sensitive_hits:
        status = "quarantined_sensitive_payload"
    elif blockers:
        status = "needs_supplement"
    else:
        status = "accepted_for_private_inventory_review_only"
    accepted = status == "accepted_for_private_inventory_review_only"

    return {
        "schema_version": SCHEMA_VERSION,
        "status": status,
        "priority": "P0-003",
        "scope": "gitea_authenticated_inventory_payload_validation",
        "result": {
            "accepted_payload_count": 1 if accepted else 0,
            "repo_count": repo_count,
            "visible_repo_count": len(repos),
            "blocker_count": len(blockers),
            "sensitive_payload_hit_count": len(sensitive_hits),
            "forbidden_true_field_count": len(forbidden_true_fields),
            "token_value_collection_allowed": False,
            "repo_write_allowed": False,
            "refs_sync_allowed": False,
            "github_primary_switch_authorized": False,
            "runtime_gate_count": 0,
        },
        "blockers": blockers,
        "sensitive_payload_hits": sensitive_hits,
        "forbidden_true_fields": forbidden_true_fields,
        "operation_boundaries": {
            "payload_persisted": False,
            "gitea_api_called": False,
            "gitea_write_performed": False,
            "repo_write_performed": False,
            "refs_sync_performed": False,
            "github_api_used": False,
            "secret_plaintext_read": False,
            "token_value_collection_allowed": False,
            "runtime_action_performed": False,
            "raw_session_or_sqlite_read_performed": False,
        },
        "safe_next_step": (
            "review_redacted_inventory_payload_then_update_gitea_inventory_snapshot"
            if accepted
            else "supplement_authenticated_or_admin_export_redacted_inventory_payload"
        ),
    }


def validate_repos(repos: list[dict[str, Any]]) -> list[str]:
    """Return blocker strings for each repo entry that fails a check.

    Checks per repo: a unique identity (``full_name`` or ``gitea_repo``),
    non-placeholder ``name``/``default_branch``/URL fields, a usable owner,
    boolean ``private``/``archived``/``empty`` flags, and URLs free of
    embedded credentials.
    """
    blockers: list[str] = []
    seen: set[str] = set()
    for index, repo in enumerate(repos):
        identity = str(repo.get("full_name") or repo.get("gitea_repo") or "")
        if not identity:
            blockers.append(f"repos[{index}].identity_missing")
        elif identity in seen:
            blockers.append(f"repos[{index}].identity_duplicate")
        seen.add(identity)

        for key in ("name", "default_branch", "clone_url_redacted", "ssh_url_redacted"):
            if is_placeholder(repo.get(key)):
                blockers.append(f"repos[{index}].{key}_missing")

        # The owner may be a plain string or an object carrying ``login``.
        # ``is_placeholder`` is False for any dict, so an owner object needs
        # its ``login`` inspected explicitly; otherwise ``{}`` or
        # ``{"login": ""}`` would slip through as a valid owner.
        owner = repo.get("owner")
        owner_missing = is_placeholder(owner) or (
            isinstance(owner, dict) and is_placeholder(owner.get("login"))
        )
        if owner_missing:
            blockers.append(f"repos[{index}].owner_missing")

        for key in ("private", "archived", "empty"):
            if not isinstance(repo.get(key), bool):
                blockers.append(f"repos[{index}].{key}_not_boolean")

        for key in ("clone_url_redacted", "ssh_url_redacted"):
            if url_has_secret(str(repo.get(key) or "")):
                blockers.append(f"repos[{index}].{key}_not_redacted")
    return blockers


def validate_redaction_attestation(value: Any) -> list[str]:
    """Return blockers for a missing attestation or any key not exactly ``True``."""
    attestation = as_dict(value)
    if not attestation:
        return ["redaction_attestation_missing"]
    return [
        f"redaction_attestation.{key}_not_true"
        for key in sorted(REQUIRED_ATTESTATIONS)
        if attestation.get(key) is not True
    ]


def find_sensitive_strings(value: Any) -> list[str]:
    """Return sorted, de-duplicated ``"<path>:<reason>"`` hits for secret-like strings.

    Every string value reachable through dicts and lists is tested against
    ``SECRET_PATTERNS`` and ``url_has_secret``. Dict keys are not scanned.
    """
    hits: list[str] = []

    def walk(node: Any, path: str) -> None:
        if isinstance(node, dict):
            for key, item in node.items():
                walk(item, f"{path}.{key}" if path else str(key))
        elif isinstance(node, list):
            for index, item in enumerate(node):
                walk(item, f"{path}[{index}]")
        elif isinstance(node, str):
            for name, pattern in SECRET_PATTERNS.items():
                if pattern.search(node):
                    hits.append(f"{path}:{name}")
            if url_has_secret(node):
                hits.append(f"{path}:url_contains_secret_material")

    walk(value, "")
    return sorted(set(hits))


def find_forbidden_true_fields(value: Any) -> list[str]:
    """Return sorted paths of every ``FORBIDDEN_TRUE_FIELDS`` key whose value is ``True``."""
    hits: list[str] = []

    def walk(node: Any, path: str) -> None:
        if isinstance(node, dict):
            for key, item in node.items():
                next_path = f"{path}.{key}" if path else str(key)
                if key in FORBIDDEN_TRUE_FIELDS and item is True:
                    hits.append(next_path)
                walk(item, next_path)
        elif isinstance(node, list):
            for index, item in enumerate(node):
                walk(item, f"{path}[{index}]")

    walk(value, "")
    return sorted(hits)


def url_has_secret(value: str) -> bool:
    """Return True if *value* is a URL carrying credentials or secret query keys.

    Strings without ``://`` are not treated as URLs. A string that contains
    ``://`` but cannot be parsed is reported as secret-bearing: the payload is
    untrusted, so an unreadable URL is treated as unredacted rather than
    crashing the validator or being waved through.
    """
    if "://" not in value:
        return False
    try:
        parsed = urlsplit(value)
        has_userinfo = bool(parsed.username or parsed.password)
    except ValueError:
        # urlsplit rejects malformed authorities such as "http://[bad".
        return True
    if has_userinfo:
        return True
    return any(key.lower() in SECRET_QUERY_KEYS for key, _ in parse_qsl(parsed.query))


def is_placeholder(value: Any) -> bool:
    """Return True for ``None`` or a string that is blank or a known filler word.

    Non-string, non-``None`` values (including dicts and lists) are never
    placeholders.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return value.strip().lower() in _PLACEHOLDER_STRINGS
    return False


def as_list(value: Any) -> list[Any]:
    """Return *value* if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def as_dict(value: Any) -> dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def as_int(value: Any) -> int:
    """Coerce *value* with ``int()``, returning 0 when that is impossible.

    ``OverflowError`` is handled as well as ``TypeError``/``ValueError``
    because ``json.loads`` accepts ``Infinity`` (and literals like ``1e999``),
    and ``int(float("inf"))`` raises ``OverflowError``.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0


def main() -> int:
    """Validate the input payload and emit the report to ``--output`` or stdout.

    Raises:
        SystemExit: if ``--output`` resolves to the same file as ``--input``.
    """
    args = parse_args()
    validation = validate_payload(load_json(args.input))
    text = json.dumps(validation, ensure_ascii=False, indent=2) + "\n"
    if args.output:
        # Never let the report overwrite the payload it was computed from.
        if args.output.resolve() == args.input.resolve():
            raise SystemExit("output_must_not_equal_input")
        args.output.parent.mkdir(parents=True, exist_ok=True)
        args.output.write_text(text, encoding="utf-8")
    else:
        print(text, end="")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())