Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 17s
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
275 lines
9.7 KiB
Python
275 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Validate a redacted Gitea authenticated/admin inventory payload.
|
|
|
|
This is a preflight only. It never calls Gitea, never stores token values, and
|
|
never writes repos, refs, secrets, or runtime state.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import parse_qsl, urlsplit
|
|
|
|
|
|
# Schema identifier stamped on the validation report produced by this script.
SCHEMA_VERSION = "gitea_authenticated_inventory_payload_validation_v1"

# Schema identifier an incoming inventory payload must declare.
PAYLOAD_SCHEMA_VERSION = "gitea_repo_inventory_v1"

# The only `visibility_scope` values validate_payload accepts.
ACCEPTED_VISIBILITY_SCOPES = {"admin_export", "authenticated"}

# Keys that must each be literally `True` inside `redaction_attestation`.
REQUIRED_ATTESTATIONS = {
    "no_cookie_or_session",
    "no_deploy_key_private_key",
    "no_git_object_pack",
    "no_gitea_db_dump",
    "no_runner_registration_token",
    "no_token_value",
    "no_webhook_secret",
    "no_write_token",
}

# Keys that must never be `True` anywhere in the payload; a single hit makes
# validate_payload report "rejected_execution_request".
FORBIDDEN_TRUE_FIELDS = {
    "create_gitea_repo",
    "delete_or_archive_gitea_repo",
    "force_push",
    "github_primary_switch_authorized",
    "refs_sync_allowed",
    "repo_write_allowed",
    "runtime_execution_authorized",
    "sync_git_refs",
    "write_to_gitea",
}

# Named regexes searched against every string value in the payload. The name
# becomes the suffix of the reported hit ("<path>:<name>").
SECRET_PATTERNS = {
    "authorization_header": re.compile(r"Authorization\s*:", re.IGNORECASE),
    "bearer_token": re.compile(r"Bearer\s+[A-Za-z0-9._~+/=-]{12,}", re.IGNORECASE),
    "cookie_header": re.compile(r"\bCookie\s*:", re.IGNORECASE),
    "password_assignment": re.compile(r"\bpassword\s*[:=]\s*[^,\s]+", re.IGNORECASE),
    "private_key": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
    "token_assignment": re.compile(r"\btoken\s*[:=]\s*[^,\s]+", re.IGNORECASE),
}

# Query-string parameter names (compared lower-cased) that mark a URL as
# carrying secret material.
SECRET_QUERY_KEYS = {"access_token", "auth", "key", "password", "secret", "token"}

# Third ancestor of this file; used as the base of the default --input path.
# NOTE(review): presumably the repository root -- confirm against the layout.
ROOT = Path(__file__).resolve().parents[2]
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the validator's command-line options.

    Returns:
        A namespace with ``input`` (payload path, defaulting to the inventory
        snapshot under ``ROOT``) and ``output`` (report path, ``None`` when
        the flag is omitted).
    """
    default_payload = ROOT / "docs/security/gitea-repo-inventory.snapshot.json"
    cli = argparse.ArgumentParser(
        description="Validate redacted Gitea authenticated/admin inventory payload.",
    )
    cli.add_argument(
        "--input",
        type=Path,
        default=default_payload,
        help="Payload JSON to validate.",
    )
    cli.add_argument("--output", type=Path, help="Write validation JSON here.")
    namespace = cli.parse_args()
    return namespace
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
    """Read ``path`` as UTF-8 JSON and return its top-level object.

    Raises:
        SystemExit: with ``json_not_object=<path>`` when the document parses
            but its top level is not a JSON object.
    """
    raw_text = path.read_text(encoding="utf-8")
    document = json.loads(raw_text)
    if isinstance(document, dict):
        return document
    raise SystemExit(f"json_not_object={path}")
|
|
|
|
|
|
def validate_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Build the validation report for one inventory payload.

    The report status is chosen by precedence: any forbidden flag set to
    ``True`` rejects the payload, otherwise any secret-looking string
    quarantines it, otherwise any blocker asks for a supplement, and only a
    clean payload is accepted for review.

    Args:
        payload: Parsed top-level JSON object of the inventory payload.

    Returns:
        A JSON-serialisable report with the status, counters, the individual
        findings and a fixed set of ``False`` operation-boundary flags.
    """
    accepted_status = "accepted_for_private_inventory_review_only"

    secret_findings = find_sensitive_strings(payload)
    execution_flags = find_forbidden_true_fields(payload)

    # Non-dict entries in "repos" are dropped here, so they surface only as a
    # repo_count mismatch.
    repo_entries = [
        entry for entry in as_list(payload.get("repos")) if isinstance(entry, dict)
    ]
    declared_count = as_int(payload.get("repo_count"))
    scope = str(payload.get("visibility_scope") or "")

    # Blockers are appended in a fixed order so the report stays stable.
    problems: list[str] = []
    if payload.get("schema_version") != PAYLOAD_SCHEMA_VERSION:
        problems.append(f"schema_version_not_{PAYLOAD_SCHEMA_VERSION}")
    if payload.get("status") != "ok":
        problems.append("status_not_ok")
    if scope not in ACCEPTED_VISIBILITY_SCOPES:
        problems.append("visibility_scope_not_authenticated_or_admin_export")
    if declared_count != len(repo_entries):
        problems.append("repo_count_mismatch")
    if declared_count < 4:
        problems.append("repo_count_below_current_public_floor")
    problems += validate_repos(repo_entries)
    if is_placeholder(payload.get("coverage_gap_explanation")):
        problems.append("coverage_gap_explanation_missing")
    problems += validate_redaction_attestation(payload.get("redaction_attestation"))

    # The first non-empty finding list decides the status.
    verdict = accepted_status
    for findings, label in (
        (execution_flags, "rejected_execution_request"),
        (secret_findings, "quarantined_sensitive_payload"),
        (problems, "needs_supplement"),
    ):
        if findings:
            verdict = label
            break
    accepted = verdict == accepted_status

    if accepted:
        next_step = "review_redacted_inventory_payload_then_update_gitea_inventory_snapshot"
    else:
        next_step = "supplement_authenticated_or_admin_export_redacted_inventory_payload"

    summary = {
        "accepted_payload_count": int(accepted),
        "repo_count": declared_count,
        "visible_repo_count": len(repo_entries),
        "blocker_count": len(problems),
        "sensitive_payload_hit_count": len(secret_findings),
        "forbidden_true_field_count": len(execution_flags),
        "token_value_collection_allowed": False,
        "repo_write_allowed": False,
        "refs_sync_allowed": False,
        "github_primary_switch_authorized": False,
        "runtime_gate_count": 0,
    }
    boundaries = {
        "payload_persisted": False,
        "gitea_api_called": False,
        "gitea_write_performed": False,
        "repo_write_performed": False,
        "refs_sync_performed": False,
        "github_api_used": False,
        "secret_plaintext_read": False,
        "token_value_collection_allowed": False,
        "runtime_action_performed": False,
        "raw_session_or_sqlite_read_performed": False,
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "status": verdict,
        "priority": "P0-003",
        "scope": "gitea_authenticated_inventory_payload_validation",
        "result": summary,
        "blockers": problems,
        "sensitive_payload_hits": secret_findings,
        "forbidden_true_fields": execution_flags,
        "operation_boundaries": boundaries,
        "safe_next_step": next_step,
    }
|
|
|
|
|
|
def validate_repos(repos: list[dict[str, Any]]) -> list[str]:
    """Check every repo entry and return the blockers found.

    Each blocker is prefixed with ``repos[<index>].`` so it can be traced to
    the offending entry.

    Args:
        repos: Repo entries from the payload (dicts only).

    Returns:
        Blocker strings in entry order; empty when every entry is complete.
    """
    blockers: list[str] = []
    seen: set[str] = set()
    for index, repo in enumerate(repos):
        prefix = f"repos[{index}]"

        # Identity is "full_name", falling back to "gitea_repo"; it must be
        # present and unique across the payload.
        identity = str(repo.get("full_name") or repo.get("gitea_repo") or "")
        if not identity:
            blockers.append(f"{prefix}.identity_missing")
        elif identity in seen:
            blockers.append(f"{prefix}.identity_duplicate")
        else:
            seen.add(identity)

        for key in ("name", "default_branch", "clone_url_redacted", "ssh_url_redacted"):
            if is_placeholder(repo.get(key)):
                blockers.append(f"{prefix}.{key}_missing")

        # "owner" may be a plain value or an object carrying "login". The
        # previous combined check never looked at "login" for an object, so
        # an owner object without a usable login slipped through.
        owner = repo.get("owner")
        owner_name = owner.get("login") if isinstance(owner, dict) else owner
        if is_placeholder(owner_name):
            blockers.append(f"{prefix}.owner_missing")

        # Strict type check: truthy strings or ints are not accepted.
        for key in ("private", "archived", "empty"):
            if not isinstance(repo.get(key), bool):
                blockers.append(f"{prefix}.{key}_not_boolean")

        for key in ("clone_url_redacted", "ssh_url_redacted"):
            if url_has_secret(str(repo.get(key) or "")):
                blockers.append(f"{prefix}.{key}_not_redacted")
    return blockers
|
|
|
|
|
|
def validate_redaction_attestation(value: Any) -> list[str]:
    """Return blockers for the payload's ``redaction_attestation`` object.

    A missing, empty or non-object value yields the single blocker
    ``redaction_attestation_missing``. Otherwise one blocker is emitted, in
    sorted key order, for every required attestation that is not exactly
    ``True``.
    """
    declared = as_dict(value)
    if declared:
        return [
            f"redaction_attestation.{name}_not_true"
            for name in sorted(REQUIRED_ATTESTATIONS)
            if declared.get(name) is not True
        ]
    return ["redaction_attestation_missing"]
|
|
|
|
|
|
def find_sensitive_strings(value: Any) -> list[str]:
    """Scan every string in a JSON-like structure for secret-looking content.

    Dicts and lists are traversed; only string leaves are inspected (dict
    keys are used for the path but are not themselves scanned).

    Returns:
        Sorted, de-duplicated ``"<path>:<finding>"`` entries, where the
        finding is a ``SECRET_PATTERNS`` name or
        ``url_contains_secret_material``.
    """
    findings: set[str] = set()
    # Explicit work list of (node, path) pairs; visiting order is irrelevant
    # because the result is sorted.
    pending: list[tuple[Any, str]] = [(value, "")]
    while pending:
        current, location = pending.pop()
        if isinstance(current, dict):
            for field, child in current.items():
                child_location = f"{location}.{field}" if location else str(field)
                pending.append((child, child_location))
        elif isinstance(current, list):
            for position, child in enumerate(current):
                pending.append((child, f"{location}[{position}]"))
        elif isinstance(current, str):
            for label, regex in SECRET_PATTERNS.items():
                if regex.search(current):
                    findings.add(f"{location}:{label}")
            if url_has_secret(current):
                findings.add(f"{location}:url_contains_secret_material")
    return sorted(findings)
|
|
|
|
|
|
def find_forbidden_true_fields(value: Any) -> list[str]:
    """Locate every forbidden flag that is set to ``True``.

    Dicts and lists are traversed at any depth. A key counts only when it is
    listed in ``FORBIDDEN_TRUE_FIELDS`` and its value is the boolean ``True``
    itself (truthy non-booleans are ignored).

    Returns:
        Sorted dotted/indexed paths of the offending keys.
    """
    flagged: list[str] = []
    # Explicit work list of (node, path) pairs; visiting order is irrelevant
    # because the result is sorted.
    stack: list[tuple[Any, str]] = [(value, "")]
    while stack:
        node, where = stack.pop()
        if isinstance(node, dict):
            for name, child in node.items():
                child_path = f"{where}.{name}" if where else str(name)
                if name in FORBIDDEN_TRUE_FIELDS and child is True:
                    flagged.append(child_path)
                stack.append((child, child_path))
        elif isinstance(node, list):
            for position, child in enumerate(node):
                stack.append((child, f"{where}[{position}]"))
    flagged.sort()
    return flagged
|
|
|
|
|
|
def url_has_secret(value: str) -> bool:
    """Report whether ``value`` is a URL that carries credential material.

    Strings without ``://`` are not treated as URLs. A URL is flagged when it
    has a user name or password in its authority part, or a query parameter
    whose lower-cased name is in ``SECRET_QUERY_KEYS``.

    A URL that ``urlsplit`` cannot parse is also flagged: it cannot be shown
    to be redacted, so the check fails closed instead of crashing the run.

    Args:
        value: Candidate string taken from the payload.

    Returns:
        ``True`` when the URL contains, or may contain, secret material.
    """
    if "://" not in value:
        return False
    try:
        parsed = urlsplit(value)
        has_userinfo = bool(parsed.username or parsed.password)
    except ValueError:
        # e.g. an unbalanced "[" in the host part ("Invalid IPv6 URL").
        return True
    if has_userinfo:
        return True
    return any(key.lower() in SECRET_QUERY_KEYS for key, _ in parse_qsl(parsed.query))
|
|
|
|
|
|
def is_placeholder(value: Any) -> bool:
    """Tell whether ``value`` is absent or a not-yet-filled-in marker.

    ``None`` counts as a placeholder, as does any string that, once stripped
    and lower-cased, is empty or one of ``pending``, ``todo``, ``tbd``,
    ``n/a`` or ``na``. Every other value, including ``0``, ``False`` and
    empty containers, is treated as real content.
    """
    if isinstance(value, str):
        normalized = value.strip().lower()
        return normalized in ("", "pending", "todo", "tbd", "n/a", "na")
    return value is None
|
|
|
|
|
|
def as_list(value: Any) -> list[Any]:
    """Return ``value`` itself when it is a list, otherwise a new empty list."""
    if isinstance(value, list):
        return value
    return []
|
|
|
|
|
|
def as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` itself when it is a dict, otherwise a new empty dict."""
    if isinstance(value, dict):
        return value
    return {}
|
|
|
|
|
|
def as_int(value: Any) -> int:
    """Convert ``value`` with ``int()``, returning ``0`` when it cannot be.

    Besides wrong types and unparsable strings, this also covers infinite
    floats. ``json.loads`` produces ``inf`` for ``Infinity`` or an overflowing
    literal such as ``1e400``, and ``int(inf)`` raises ``OverflowError``,
    which previously escaped and aborted validation.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0
|
|
|
|
|
|
def main() -> int:
    """Validate the input payload and emit the report as indented JSON.

    The report goes to ``--output`` when given (parent directories are
    created) and to standard output otherwise.

    Returns:
        ``0`` once the report has been emitted, whatever its status.

    Raises:
        SystemExit: with ``output_must_not_equal_input`` when ``--output``
            resolves to the same path as ``--input``.
    """
    options = parse_args()
    report = validate_payload(load_json(options.input))
    rendered = json.dumps(report, ensure_ascii=False, indent=2) + "\n"

    destination = options.output
    if not destination:
        print(rendered, end="")
        return 0

    # Refuse to overwrite the payload that was just validated.
    if destination.resolve() == options.input.resolve():
        raise SystemExit("output_must_not_equal_input")
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(rendered, encoding="utf-8")
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|