Files
awoooi/scripts/security/gitea-authenticated-inventory-payload-validator.py
Your Name 0f71f7fd4f
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
feat(security): validate gitea inventory payloads
2026-06-29 15:50:55 +08:00

272 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""Validate a redacted Gitea authenticated/admin inventory payload.
This is a preflight only. It never calls Gitea, never stores token values, and
never writes repos, refs, secrets, or runtime state.
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlsplit
# Schema identifier stamped on the validation report produced by this script.
SCHEMA_VERSION = "gitea_authenticated_inventory_payload_validation_v1"
# Schema identifier the incoming payload must declare in "schema_version".
PAYLOAD_SCHEMA_VERSION = "gitea_repo_inventory_v1"
# Accepted values for the payload's "visibility_scope" field.
ACCEPTED_VISIBILITY_SCOPES = {"authenticated", "admin_export"}
# Keys that must each be exactly True inside the payload's
# "redaction_attestation" object.
REQUIRED_ATTESTATIONS = {
    "no_token_value",
    "no_write_token",
    "no_webhook_secret",
    "no_deploy_key_private_key",
    "no_runner_registration_token",
    "no_cookie_or_session",
    "no_gitea_db_dump",
    "no_git_object_pack",
}
# Keys that must not be set to True at any nesting depth of the payload;
# a hit causes the payload to be rejected as an execution request.
FORBIDDEN_TRUE_FIELDS = {
    "repo_write_allowed",
    "refs_sync_allowed",
    "github_primary_switch_authorized",
    "runtime_execution_authorized",
    "write_to_gitea",
    "create_gitea_repo",
    "delete_or_archive_gitea_repo",
    "sync_git_refs",
    "force_push",
}
# Named regexes searched in every string value of the payload; the dict key
# becomes the suffix of the reported hit ("<path>:<name>").
SECRET_PATTERNS = {
    "authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
    "bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._~+/=-]{12,}", re.IGNORECASE),
    "cookie_header": re.compile(r"\bCookie\s*:", re.IGNORECASE),
    "password_assignment": re.compile(r"\bpassword\s*[:=]\s*[^,\s]+", re.IGNORECASE),
    "private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
    "token_assignment": re.compile(r"\btoken\s*[:=]\s*[^,\s]+", re.IGNORECASE),
}
# URL query parameter names (compared lower-cased) that mark a URL as
# carrying secret material.
SECRET_QUERY_KEYS = {"access_token", "auth", "key", "password", "secret", "token"}
def parse_args() -> argparse.Namespace:
    """Parse the validator's command-line options.

    Returns:
        A namespace with ``input`` (path of the payload JSON, defaulting to
        the inventory snapshot under ``docs/security``) and ``output``
        (optional path for the validation report; ``None`` when omitted).
    """
    default_payload = Path("docs/security/gitea-repo-inventory.snapshot.json")
    cli = argparse.ArgumentParser(
        description="Validate redacted Gitea authenticated/admin inventory payload.",
    )
    cli.add_argument(
        "--input",
        type=Path,
        default=default_payload,
        help="Payload JSON to validate.",
    )
    cli.add_argument(
        "--output",
        type=Path,
        help="Write validation JSON here.",
    )
    return cli.parse_args()
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Args:
        path: File containing the JSON document.

    Returns:
        The decoded document, which is guaranteed to be a ``dict``.

    Raises:
        SystemExit: With the message ``json_not_object=<path>`` when the
            document's top-level value is anything other than a JSON object.
    """
    raw_text = path.read_text(encoding="utf-8")
    document = json.loads(raw_text)
    if isinstance(document, dict):
        return document
    raise SystemExit(f"json_not_object={path}")
def validate_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate an inventory payload and build the validation report.

    The payload is scanned for secret-looking strings and for forbidden
    fields set to ``True``, then checked for schema version, status,
    visibility scope, repo count consistency, per-repo fields, a coverage
    gap explanation and the redaction attestation.

    The report status is chosen with this precedence: forbidden true fields,
    then sensitive hits, then ordinary blockers, otherwise accepted.

    Args:
        payload: Decoded payload object.

    Returns:
        A JSON-serialisable report dict.
    """
    accepted_status = "accepted_for_private_inventory_review_only"

    # Whole-payload scans run first, independent of the structural checks.
    secret_findings = find_sensitive_strings(payload)
    execution_flags = find_forbidden_true_fields(payload)

    problems: list[str] = []

    if payload.get("schema_version") != PAYLOAD_SCHEMA_VERSION:
        problems.append(f"schema_version_not_{PAYLOAD_SCHEMA_VERSION}")

    if payload.get("status") != "ok":
        problems.append("status_not_ok")

    scope = str(payload.get("visibility_scope") or "")
    if scope not in ACCEPTED_VISIBILITY_SCOPES:
        problems.append("visibility_scope_not_authenticated_or_admin_export")

    # Only object entries count as repos; anything else is ignored here and
    # shows up as a count mismatch if the declared count included it.
    repo_entries = [
        entry for entry in as_list(payload.get("repos")) if isinstance(entry, dict)
    ]
    declared_count = as_int(payload.get("repo_count"))
    if declared_count != len(repo_entries):
        problems.append("repo_count_mismatch")
    if declared_count < 4:
        problems.append("repo_count_below_current_public_floor")

    problems += validate_repos(repo_entries)

    if is_placeholder(payload.get("coverage_gap_explanation")):
        problems.append("coverage_gap_explanation_missing")

    problems += validate_redaction_attestation(payload.get("redaction_attestation"))

    # First non-empty finding list wins; no findings at all means accepted.
    verdicts = (
        (execution_flags, "rejected_execution_request"),
        (secret_findings, "quarantined_sensitive_payload"),
        (problems, "needs_supplement"),
    )
    status = next(
        (label for findings, label in verdicts if findings),
        accepted_status,
    )
    is_accepted = status == accepted_status

    if is_accepted:
        next_step = "review_redacted_inventory_payload_then_update_gitea_inventory_snapshot"
    else:
        next_step = "supplement_authenticated_or_admin_export_redacted_inventory_payload"

    summary = {
        "accepted_payload_count": 1 if is_accepted else 0,
        "repo_count": declared_count,
        "visible_repo_count": len(repo_entries),
        "blocker_count": len(problems),
        "sensitive_payload_hit_count": len(secret_findings),
        "forbidden_true_field_count": len(execution_flags),
        "token_value_collection_allowed": False,
        "repo_write_allowed": False,
        "refs_sync_allowed": False,
        "github_primary_switch_authorized": False,
        "runtime_gate_count": 0,
    }
    boundaries = {
        "payload_persisted": False,
        "gitea_api_called": False,
        "gitea_write_performed": False,
        "repo_write_performed": False,
        "refs_sync_performed": False,
        "github_api_used": False,
        "secret_plaintext_read": False,
        "token_value_collection_allowed": False,
        "runtime_action_performed": False,
        "raw_session_or_sqlite_read_performed": False,
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "status": status,
        "priority": "P0-003",
        "scope": "gitea_authenticated_inventory_payload_validation",
        "result": summary,
        "blockers": problems,
        "sensitive_payload_hits": secret_findings,
        "forbidden_true_fields": execution_flags,
        "operation_boundaries": boundaries,
        "safe_next_step": next_step,
    }
def validate_repos(repos: list[dict[str, Any]]) -> list[str]:
    """Check each repo entry and return blocker codes for what is missing.

    For every repo, in order, the following are checked:

    * an identity (``full_name`` or ``gitea_repo``) is present and not
      repeated by an earlier entry;
    * ``name``, ``default_branch``, ``clone_url_redacted`` and
      ``ssh_url_redacted`` are not placeholders;
    * an owner is present, either as a non-placeholder value or as an
      object whose ``login`` is not a placeholder;
    * ``private``, ``archived`` and ``empty`` are real booleans;
    * the two URL fields carry no credentials or secret query parameters.

    Args:
        repos: Repo objects taken from the payload.

    Returns:
        Blocker codes of the form ``repos[<index>].<reason>``; empty when
        every entry passes.
    """
    blockers: list[str] = []
    seen: set[str] = set()
    for index, repo in enumerate(repos):
        prefix = f"repos[{index}]"

        identity = str(repo.get("full_name") or repo.get("gitea_repo") or "")
        if not identity:
            blockers.append(f"{prefix}.identity_missing")
        elif identity in seen:
            blockers.append(f"{prefix}.identity_duplicate")
        seen.add(identity)

        for key in ("name", "default_branch", "clone_url_redacted", "ssh_url_redacted"):
            if is_placeholder(repo.get(key)):
                blockers.append(f"{prefix}.{key}_missing")

        # When the owner is an object, its "login" is what must be filled in.
        # Testing the object itself would always pass, because a dict is
        # never a placeholder, and would let an owner of {} or
        # {"login": ""} slip through.
        owner = repo.get("owner")
        owner_name = owner.get("login") if isinstance(owner, dict) else owner
        if is_placeholder(owner_name):
            blockers.append(f"{prefix}.owner_missing")

        for key in ("private", "archived", "empty"):
            if not isinstance(repo.get(key), bool):
                blockers.append(f"{prefix}.{key}_not_boolean")

        for key in ("clone_url_redacted", "ssh_url_redacted"):
            value = str(repo.get(key) or "")
            if url_has_secret(value):
                blockers.append(f"{prefix}.{key}_not_redacted")
    return blockers
def validate_redaction_attestation(value: Any) -> list[str]:
    """Check that every required redaction attestation is exactly ``True``.

    Args:
        value: The payload's ``redaction_attestation`` entry; anything that
            is not a non-empty object counts as missing.

    Returns:
        ``["redaction_attestation_missing"]`` when no attestation object is
        present, otherwise one ``redaction_attestation.<key>_not_true`` code
        per required key that is not ``True``, in sorted key order.
    """
    declared = as_dict(value)
    if declared:
        return [
            f"redaction_attestation.{name}_not_true"
            for name in sorted(REQUIRED_ATTESTATIONS)
            if declared.get(name) is not True
        ]
    return ["redaction_attestation_missing"]
def find_sensitive_strings(value: Any) -> list[str]:
    """Locate string values that look like they contain secret material.

    Dicts and lists are traversed recursively. Each string value is searched
    with every regex in ``SECRET_PATTERNS`` and also tested with
    ``url_has_secret``.

    Args:
        value: Decoded JSON value to scan.

    Returns:
        Sorted, de-duplicated hits of the form ``<path>:<pattern name>`` or
        ``<path>:url_contains_secret_material``, where the path uses
        ``a.b[0].c`` notation.
    """

    def string_leaves(node: Any, location: str):
        # Yield (path, text) for every string value reachable from node.
        if isinstance(node, dict):
            for key, child in node.items():
                child_location = f"{location}.{key}" if location else str(key)
                yield from string_leaves(child, child_location)
        elif isinstance(node, list):
            for position, child in enumerate(node):
                yield from string_leaves(child, f"{location}[{position}]")
        elif isinstance(node, str):
            yield location, node

    findings: set[str] = set()
    for location, text in string_leaves(value, ""):
        for label, regex in SECRET_PATTERNS.items():
            if regex.search(text):
                findings.add(f"{location}:{label}")
        if url_has_secret(text):
            findings.add(f"{location}:url_contains_secret_material")
    return sorted(findings)
def find_forbidden_true_fields(value: Any) -> list[str]:
    """Locate forbidden keys that are set to exactly ``True``.

    Dicts and lists are traversed recursively. A key is reported when it is
    a member of ``FORBIDDEN_TRUE_FIELDS`` and its value is the boolean
    ``True``; other truthy values are not reported.

    Args:
        value: Decoded JSON value to scan.

    Returns:
        Sorted paths (``a.b[0].c`` notation) of every offending key.
    """

    def flagged(node: Any, location: str):
        # Yield the path of each forbidden key whose value is True.
        if isinstance(node, dict):
            for key, child in node.items():
                child_location = f"{location}.{key}" if location else str(key)
                if key in FORBIDDEN_TRUE_FIELDS and child is True:
                    yield child_location
                yield from flagged(child, child_location)
        elif isinstance(node, list):
            for position, child in enumerate(node):
                yield from flagged(child, f"{location}[{position}]")

    return sorted(flagged(value, ""))
def url_has_secret(value: str) -> bool:
    """Report whether *value* is a URL that carries credential material.

    A string is only treated as a URL when it contains ``://``. It is
    flagged when it has a username or password in its authority part, or
    when any query parameter name (compared lower-cased) is listed in
    ``SECRET_QUERY_KEYS``.

    A URL that ``urlsplit`` refuses to parse (for example one with an
    unbalanced ``[`` in the host) is also flagged. It cannot be shown to be
    redacted, and letting the ``ValueError`` escape would crash the
    validator on a single malformed string in an untrusted payload.

    Args:
        value: Candidate string.

    Returns:
        ``True`` when the URL contains, or may contain, secret material;
        ``False`` for non-URLs and clean URLs.
    """
    if "://" not in value:
        return False
    try:
        parsed = urlsplit(value)
    except ValueError:
        # Fail closed: an unparseable URL is treated as not redacted.
        return True
    if parsed.username or parsed.password:
        return True
    return any(key.lower() in SECRET_QUERY_KEYS for key, _ in parse_qsl(parsed.query))
def is_placeholder(value: Any) -> bool:
    """Report whether *value* is absent or a stand-in rather than real data.

    ``None`` is a placeholder, as is any string that, once stripped and
    lower-cased, is empty or one of ``pending``, ``todo``, ``tbd``, ``n/a``
    or ``na``. Values of any other type are never placeholders.
    """
    if isinstance(value, str):
        normalized = value.strip().lower()
        return normalized in ("", "pending", "todo", "tbd", "n/a", "na")
    return value is None
def as_list(value: Any) -> list[Any]:
    """Return *value* unchanged when it is a list, otherwise a new empty list."""
    if isinstance(value, list):
        return value
    return []
def as_dict(value: Any) -> dict[str, Any]:
    """Return *value* unchanged when it is a dict, otherwise a new empty dict."""
    if isinstance(value, dict):
        return value
    return {}
def as_int(value: Any) -> int:
    """Convert *value* with ``int()``, returning 0 when conversion fails.

    Args:
        value: Any decoded JSON value.

    Returns:
        ``int(value)``, or ``0`` when the value cannot be converted. This
        covers ``None``, containers, non-numeric strings, NaN and infinite
        floats.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: json.loads accepts "Infinity", and int(inf) raises
        # it; treat that like any other unusable count instead of crashing.
        return 0
def main() -> int:
    """Run the validator from the command line.

    Loads the payload named by ``--input``, validates it, and emits the
    report as indented JSON followed by a newline: to the ``--output`` file
    (creating parent directories as needed) when given, otherwise to
    standard output.

    Returns:
        Always ``0``; the validation outcome is carried in the report's
        ``status`` field, not in the exit code.
    """
    options = parse_args()
    report = validate_payload(load_json(options.input))
    rendered = json.dumps(report, ensure_ascii=False, indent=2) + "\n"

    destination = options.output
    if not destination:
        print(rendered, end="")
        return 0

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(rendered, encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())