#!/usr/bin/env python3 """Validate source-control owner response packets stay read-only. This is a repo-snapshot-only guard. It reads committed JSON snapshots and does not call GitHub, Gitea, AwoooP, Kali, or any runtime API. """ from __future__ import annotations import argparse import json from pathlib import Path from typing import Any LANES = [ { "lane_id": "s4_9_gitea_inventory_owner_attestation_response", "path": "gitea-inventory-owner-attestation-response.snapshot.json", "expected_templates": 5, "false_flags": [ "token_value_collection_allowed", "raw_secret_allowed", "repo_write_allowed", "refs_sync_allowed", "github_primary_switch_authorized", "action_buttons_allowed", ], "expected_preflight_checks": [ "preflight-known-attestation-item", "preflight-required-owner-fields", "preflight-allowed-decision", "preflight-redacted-evidence-only", "preflight-no-execution-request", "preflight-all-five-items-before-accepted", ], "expected_outcome_lanes": [ "ready_for_owner_review", "request_more_evidence", "quarantine_sensitive_payload", "reject_execution_request", "keep_waiting_owner_response", ], "expected_request_packet_id": "s4_9_gitea_owner_attestation_response_request", "expected_request_template_ids": [ "response-public-only-vs-local-gitea-gap", "response-org-user-endpoint-identity", "response-internal-110-adjacent-scope", "response-repo-owner-canonical-scope", "response-legacy-or-inaccessible-disposition", ], "expected_collection_checks": [ "collection-request-packet-displayed", "collection-read-only-submission-mode", "collection-five-template-tracking", "collection-redacted-evidence-only", "collection-no-approval-language", "collection-audit-metadata-only", ], "expected_template_statuses": [ "response-public-only-vs-local-gitea-gap", "response-org-user-endpoint-identity", "response-internal-110-adjacent-scope", "response-repo-owner-canonical-scope", "response-legacy-or-inaccessible-disposition", ], "expected_audit_event_templates": [ "audit-owner-response-request-shown", "audit-owner-response-received-metadata", "audit-owner-response-outcome-classified", ], "expected_redaction_examples": [ "redaction-existing-doc-ref", "redaction-owner-decision-metadata", "redaction-private-url-metadata", "redaction-api-export-summary", "redaction-quarantine-pointer", ], "expected_display_sections": [ "display-owner-response-summary", "display-owner-response-request-packet", "display-template-status-ledger", "display-audit-event-templates", "display-redaction-examples", "display-collection-checks", "display-preflight-and-outcome-lanes", "display-acceptance-and-rejection-rules", ], }, { "lane_id": "s4_10_github_target_owner_decision_response", "path": "github-target-owner-decision-response.snapshot.json", "expected_templates": 7, "false_flags": [ "repo_creation_authorized", "visibility_change_authorized", "refs_sync_authorized", "github_primary_switch_authorized", "secret_value_collection_allowed", "action_buttons_allowed", ], "expected_request_packet_id": "s4_10_github_target_owner_decision_response_request", "expected_request_template_ids": [ "target-awoooi-refs-blocked", "target-clawbot-v5-refs-blocked", "target-wooo-aiops-refs-blocked", "target-wooo-infra-config-internal-remote", "target-ewoooc-private-or-new", "target-bitan-pharmacy-private-or-new", "target-tsenyang-website-private-or-new", ], "expected_template_statuses": [ "target-awoooi-refs-blocked", "target-clawbot-v5-refs-blocked", "target-wooo-aiops-refs-blocked", "target-wooo-infra-config-internal-remote", "target-ewoooc-private-or-new", "target-bitan-pharmacy-private-or-new", "target-tsenyang-website-private-or-new", ], }, { "lane_id": "s4_11_ref_truth_owner_response", "path": "source-control-ref-truth-owner-response.snapshot.json", "expected_templates": 5, "false_flags": [ "refs_sync_authorized", "refs_delete_authorized", "force_push_authorized", "github_primary_switch_authorized", "secret_value_collection_allowed", "action_buttons_allowed", ], }, { "lane_id": "s4_12_workflow_secret_name_owner_response", "path": "source-control-workflow-secret-name-owner-response.snapshot.json", "expected_templates": 5, "false_flags": [ "secret_value_collection_allowed", "write_token_allowed", "workflow_modification_authorized", "webhook_modification_authorized", "runner_change_authorized", "deploy_key_change_authorized", "branch_protection_change_authorized", "repo_secret_change_authorized", "github_hosted_runner_enable_authorized", "refs_sync_authorized", "github_primary_switch_authorized", "action_buttons_allowed", ], }, ] def load_json(path: Path) -> dict[str, Any]: return json.loads(path.read_text(encoding="utf-8")) def fail(label: str, actual: Any, expected: Any) -> None: raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}") def assert_equal(label: str, actual: Any, expected: Any) -> None: if actual != expected: fail(label, actual, expected) def assert_false(label: str, actual: Any) -> None: assert_equal(label, actual, False) def assert_true(label: str, actual: Any) -> None: assert_equal(label, actual, True) def validate(root: Path) -> None: security_dir = root / "docs" / "security" rollup = load_json(security_dir / "source-control-owner-response-validation-rollup.snapshot.json") rollup_summary = rollup["summary"] assert_equal("rollup.status", rollup["status"], "draft_waiting_owner_responses") assert_false("rollup.runtime_execution_authorized", rollup["runtime_execution_authorized"]) assert_equal("rollup.response_packet_count", rollup_summary["response_packet_count"], len(LANES)) assert_equal("rollup.validation_lane_count", rollup_summary["validation_lane_count"], len(LANES)) assert_equal("rollup.total_response_template_count", rollup_summary["total_response_template_count"], 22) assert_equal("rollup.total_received_response_count", rollup_summary["total_received_response_count"], 0) assert_equal("rollup.total_accepted_response_count", rollup_summary["total_accepted_response_count"], 0) assert_equal("rollup.total_rejected_response_count", rollup_summary["total_rejected_response_count"], 0) assert_equal("rollup.total_acceptance_check_count", rollup_summary["total_acceptance_check_count"], 32) assert_equal("rollup.total_rejection_rule_count", rollup_summary["total_rejection_rule_count"], 40) assert_true("rollup.quarantine_required", rollup_summary["quarantine_required"]) assert_equal("rollup.primary_ready_count", rollup_summary["primary_ready_count"], 0) for flag in [ "runtime_execution_authorized", "token_value_collection_allowed", "secret_value_collection_allowed", "write_token_allowed", "repo_creation_authorized", "visibility_change_authorized", "gitea_repo_write_authorized", "refs_sync_authorized", "refs_delete_authorized", "force_push_authorized", "workflow_modification_authorized", "runner_enablement_authorized", "github_hosted_runner_enable_authorized", "github_primary_switch_authorized", "action_buttons_allowed", ]: assert_false(f"rollup.{flag}", rollup_summary[flag]) lane_by_id = {lane["lane_id"]: lane for lane in rollup["validation_lanes"]} missing_lane_by_id = {lane["lane_id"]: lane for lane in rollup["missing_response_lanes"]} collection_order_by_id = {item["lane_id"]: item for item in rollup["owner_response_collection_order"]} next_collection_candidate = rollup["next_collection_candidate"] total_templates = 0 total_acceptance_checks = 0 total_rejection_rules = 0 for index, lane in enumerate(LANES, start=1): snapshot = load_json(security_dir / lane["path"]) summary = snapshot["summary"] rollup_lane = lane_by_id[lane["lane_id"]] missing_lane = missing_lane_by_id[lane["lane_id"]] collection_item = collection_order_by_id[lane["lane_id"]] assert_equal(f"{lane['lane_id']}.status", summary["owner_response_status"], "waiting_owner_response") assert_equal(f"{lane['lane_id']}.response_template_count", summary["response_template_count"], lane["expected_templates"]) assert_equal(f"{lane['lane_id']}.received_response_count", summary["received_response_count"], 0) assert_equal(f"{lane['lane_id']}.accepted_response_count", summary["accepted_response_count"], 0) assert_equal(f"{lane['lane_id']}.rejected_response_count", summary["rejected_response_count"], 0) assert_equal(f"{lane['lane_id']}.acceptance_check_count", summary["acceptance_check_count"], 8) assert_equal(f"{lane['lane_id']}.rejection_rule_count", summary["rejection_rule_count"], 10) expected_template_statuses = lane.get("expected_template_statuses") if expected_template_statuses is not None: template_statuses = snapshot["owner_response_template_statuses"] assert_equal( f"{lane['lane_id']}.owner_response_template_status_count", summary["owner_response_template_status_count"], len(expected_template_statuses), ) assert_equal( f"{lane['lane_id']}.owner_response_template_status_ids", [item["template_id"] for item in template_statuses], expected_template_statuses, ) assert_equal( f"{lane['lane_id']}.owner_response_template_status_display_order", [item["display_order"] for item in template_statuses], list(range(1, len(expected_template_statuses) + 1)), ) for item in template_statuses: assert_equal( f"{lane['lane_id']}.{item['template_id']}.collection_status", item["collection_status"], "waiting_owner_response", ) assert_equal( f"{lane['lane_id']}.{item['template_id']}.request_status", item["request_status"], "request_ready_not_sent", ) assert_equal(f"{lane['lane_id']}.{item['template_id']}.received_response_count", item["received_response_count"], 0) assert_equal(f"{lane['lane_id']}.{item['template_id']}.accepted_response_count", item["accepted_response_count"], 0) assert_equal(f"{lane['lane_id']}.{item['template_id']}.rejected_response_count", item["rejected_response_count"], 0) assert_equal( f"{lane['lane_id']}.{item['template_id']}.latest_outcome_lane", item["latest_outcome_lane"], "keep_waiting_owner_response", ) assert_equal( f"{lane['lane_id']}.{item['template_id']}.awooop_display_mode", item["awooop_display_mode"], "display_template_status_only", ) assert_false( f"{lane['lane_id']}.{item['template_id']}.execution_authorized", item["execution_authorized"], ) assert_true(f"{lane['lane_id']}.{item['template_id']}.not_approval", item["not_approval"]) expected_audit_event_templates = lane.get("expected_audit_event_templates") if expected_audit_event_templates is not None: audit_event_templates = snapshot["owner_response_audit_event_templates"] assert_equal( f"{lane['lane_id']}.owner_response_audit_event_template_count", summary["owner_response_audit_event_template_count"], len(expected_audit_event_templates), ) assert_equal( f"{lane['lane_id']}.owner_response_audit_event_template_ids", [item["event_template_id"] for item in audit_event_templates], expected_audit_event_templates, ) assert_equal( f"{lane['lane_id']}.owner_response_audit_event_display_order", [item["display_order"] for item in audit_event_templates], list(range(1, len(expected_audit_event_templates) + 1)), ) for item in audit_event_templates: assert_equal( f"{lane['lane_id']}.{item['event_template_id']}.event_status", item["event_status"], "template_only_not_emitted", ) assert_equal(f"{lane['lane_id']}.{item['event_template_id']}.emitted_event_count", item["emitted_event_count"], 0) assert_false( f"{lane['lane_id']}.{item['event_template_id']}.stored_raw_payload_allowed", item["stored_raw_payload_allowed"], ) assert_equal( f"{lane['lane_id']}.{item['event_template_id']}.awooop_display_mode", item["awooop_display_mode"], "display_audit_template_only", ) assert_false( f"{lane['lane_id']}.{item['event_template_id']}.execution_authorized", item["execution_authorized"], ) assert_true(f"{lane['lane_id']}.{item['event_template_id']}.not_approval", item["not_approval"]) expected_redaction_examples = lane.get("expected_redaction_examples") if expected_redaction_examples is not None: redaction_examples = snapshot["owner_response_redaction_examples"] assert_equal( f"{lane['lane_id']}.owner_response_redaction_example_count", summary["owner_response_redaction_example_count"], len(expected_redaction_examples), ) assert_equal( f"{lane['lane_id']}.owner_response_redaction_example_ids", [item["example_id"] for item in redaction_examples], expected_redaction_examples, ) assert_equal( f"{lane['lane_id']}.owner_response_redaction_example_display_order", [item["display_order"] for item in redaction_examples], list(range(1, len(expected_redaction_examples) + 1)), ) for item in redaction_examples: assert_equal( f"{lane['lane_id']}.{item['example_id']}.example_status", item["example_status"], "template_example_only", ) assert_false( f"{lane['lane_id']}.{item['example_id']}.stored_raw_payload_allowed", item["stored_raw_payload_allowed"], ) assert_equal( f"{lane['lane_id']}.{item['example_id']}.awooop_display_mode", item["awooop_display_mode"], "display_redaction_example_only", ) assert_false( f"{lane['lane_id']}.{item['example_id']}.execution_authorized", item["execution_authorized"], ) assert_true(f"{lane['lane_id']}.{item['example_id']}.not_approval", item["not_approval"]) expected_display_sections = lane.get("expected_display_sections") if expected_display_sections is not None: display_sections = snapshot["owner_response_display_sections"] assert_equal( f"{lane['lane_id']}.owner_response_display_section_count", summary["owner_response_display_section_count"], len(expected_display_sections), ) assert_equal( f"{lane['lane_id']}.owner_response_display_section_ids", [item["section_id"] for item in display_sections], expected_display_sections, ) assert_equal( f"{lane['lane_id']}.owner_response_display_section_order", [item["display_order"] for item in display_sections], list(range(1, len(expected_display_sections) + 1)), ) for item in display_sections: assert_equal( f"{lane['lane_id']}.{item['section_id']}.section_status", item["section_status"], "display_contract_only", ) assert_false( f"{lane['lane_id']}.{item['section_id']}.execution_authorized", item["execution_authorized"], ) assert_true(f"{lane['lane_id']}.{item['section_id']}.not_approval", item["not_approval"]) expected_request_packet_id = lane.get("expected_request_packet_id") if expected_request_packet_id is not None: request_packet = snapshot["owner_response_request_packet"] expected_request_template_ids = lane["expected_request_template_ids"] assert_equal( f"{lane['lane_id']}.owner_response_request_packet_count", summary["owner_response_request_packet_count"], 1, ) assert_equal( f"{lane['lane_id']}.owner_response_request_packet_id", request_packet["request_id"], expected_request_packet_id, ) assert_equal( f"{lane['lane_id']}.owner_response_request_display_status", request_packet["display_status"], "ready_to_request_owner_response", ) assert_equal( f"{lane['lane_id']}.owner_response_request_template_ids", request_packet["requested_template_ids"], expected_request_template_ids, ) assert_equal( f"{lane['lane_id']}.owner_response_request_awooop_display_mode", request_packet["awooop_display_mode"], "display_owner_response_request_only", ) assert_false( f"{lane['lane_id']}.owner_response_request_execution_authorized", request_packet["execution_authorized"], ) assert_true(f"{lane['lane_id']}.owner_response_request_not_approval", request_packet["not_approval"]) expected_collection_checks = lane.get("expected_collection_checks") if expected_collection_checks is not None: collection_checks = snapshot["owner_response_collection_checks"] assert_equal( f"{lane['lane_id']}.owner_response_collection_check_count", summary["owner_response_collection_check_count"], len(expected_collection_checks), ) assert_equal( f"{lane['lane_id']}.owner_response_collection_check_ids", [item["check_id"] for item in collection_checks], expected_collection_checks, ) assert_equal( f"{lane['lane_id']}.owner_response_collection_display_order", [item["display_order"] for item in collection_checks], list(range(1, len(expected_collection_checks) + 1)), ) for item in collection_checks: assert_true(f"{lane['lane_id']}.{item['check_id']}.required", item["required"]) assert_false( f"{lane['lane_id']}.{item['check_id']}.execution_authorized", item["execution_authorized"], ) assert_true(f"{lane['lane_id']}.{item['check_id']}.not_approval", item["not_approval"]) expected_preflight_checks = lane.get("expected_preflight_checks") if expected_preflight_checks is not None: intake_preflight_checks = snapshot["intake_preflight_checks"] assert_equal( f"{lane['lane_id']}.intake_preflight_check_count", summary["intake_preflight_check_count"], len(expected_preflight_checks), ) assert_equal( f"{lane['lane_id']}.intake_preflight_check_ids", [item["check_id"] for item in intake_preflight_checks], expected_preflight_checks, ) assert_equal( f"{lane['lane_id']}.intake_preflight_display_order", [item["display_order"] for item in intake_preflight_checks], list(range(1, len(expected_preflight_checks) + 1)), ) for item in intake_preflight_checks: assert_true(f"{lane['lane_id']}.{item['check_id']}.required", item["required"]) assert_false( f"{lane['lane_id']}.{item['check_id']}.execution_authorized", item["execution_authorized"], ) expected_outcome_lanes = lane.get("expected_outcome_lanes") if expected_outcome_lanes is not None: intake_outcome_lanes = snapshot["intake_outcome_lanes"] assert_equal( f"{lane['lane_id']}.intake_outcome_lane_count", summary["intake_outcome_lane_count"], len(expected_outcome_lanes), ) assert_equal( f"{lane['lane_id']}.intake_outcome_lane_ids", [item["lane_id"] for item in intake_outcome_lanes], expected_outcome_lanes, ) assert_equal( f"{lane['lane_id']}.intake_outcome_display_order", [item["display_order"] for item in intake_outcome_lanes], list(range(1, len(expected_outcome_lanes) + 1)), ) for item in intake_outcome_lanes: assert_false( f"{lane['lane_id']}.{item['lane_id']}.execution_authorized", item["execution_authorized"], ) assert_true(f"{lane['lane_id']}.{item['lane_id']}.not_approval", item["not_approval"]) assert_false(f"{lane['lane_id']}.runtime_execution_authorized", snapshot["runtime_execution_authorized"]) assert_false(f"{lane['lane_id']}.rollup_execution_authorized", rollup_lane["execution_authorized"]) assert_equal( f"{lane['lane_id']}.rollup_response_template_count", rollup_lane["response_template_count"], lane["expected_templates"], ) assert_equal(f"{lane['lane_id']}.rollup_received_response_count", rollup_lane["received_response_count"], 0) assert_equal(f"{lane['lane_id']}.rollup_accepted_response_count", rollup_lane["accepted_response_count"], 0) assert_equal(f"{lane['lane_id']}.rollup_rejected_response_count", rollup_lane["rejected_response_count"], 0) assert_equal(f"{lane['lane_id']}.missing_current_status", missing_lane["current_status"], "waiting_owner_response") assert_equal( f"{lane['lane_id']}.missing_response_template_count", missing_lane["response_template_count"], lane["expected_templates"], ) assert_equal(f"{lane['lane_id']}.missing_received_response_count", missing_lane["received_response_count"], 0) assert_equal(f"{lane['lane_id']}.missing_accepted_response_count", missing_lane["accepted_response_count"], 0) assert_equal(f"{lane['lane_id']}.missing_awooop_display_mode", missing_lane["awooop_display_mode"], "observe_missing_response") assert_equal(f"{lane['lane_id']}.collection_order", collection_item["order"], index) assert_equal( f"{lane['lane_id']}.collection_awooop_action", collection_item["awooop_action"], "display_next_collection_item", ) assert_true(f"{lane['lane_id']}.collection_blocked_until_received", collection_item["blocked_until_received"]) assert_false(f"{lane['lane_id']}.collection_execution_authorized", collection_item["execution_authorized"]) for flag in lane["false_flags"]: assert_false(f"{lane['lane_id']}.{flag}", summary[flag]) total_templates += summary["response_template_count"] total_acceptance_checks += summary["acceptance_check_count"] total_rejection_rules += summary["rejection_rule_count"] assert_equal("source_packets.total_templates", total_templates, rollup_summary["total_response_template_count"]) assert_equal("source_packets.total_acceptance_checks", total_acceptance_checks, rollup_summary["total_acceptance_check_count"]) assert_equal("source_packets.total_rejection_rules", total_rejection_rules, rollup_summary["total_rejection_rule_count"]) assert_equal("missing_response_lanes.count", len(missing_lane_by_id), len(LANES)) assert_equal("owner_response_collection_order.count", len(collection_order_by_id), len(LANES)) first_lane = LANES[0] first_collection_item = collection_order_by_id[first_lane["lane_id"]] first_missing_lane = missing_lane_by_id[first_lane["lane_id"]] assert_equal("next_collection_candidate.order", next_collection_candidate["order"], 1) assert_equal("next_collection_candidate.lane_id", next_collection_candidate["lane_id"], first_lane["lane_id"]) assert_equal( "next_collection_candidate.display_status", next_collection_candidate["display_status"], "next_owner_response_required", ) assert_equal( "next_collection_candidate.source_contract", next_collection_candidate["source_contract"], first_missing_lane["source_contract"], ) assert_equal( "next_collection_candidate.required_packet", next_collection_candidate["required_packet"], first_collection_item["required_packet"], ) assert_equal( "next_collection_candidate.required_response_template_count", next_collection_candidate["required_response_template_count"], first_lane["expected_templates"], ) assert_equal("next_collection_candidate.received_response_count", next_collection_candidate["received_response_count"], 0) assert_equal("next_collection_candidate.accepted_response_count", next_collection_candidate["accepted_response_count"], 0) assert_equal( "next_collection_candidate.minimum_response", next_collection_candidate["minimum_response"], first_collection_item["minimum_response"], ) assert_equal( "next_collection_candidate.awooop_display_mode", next_collection_candidate["awooop_display_mode"], "display_next_collection_item_only", ) assert_true("next_collection_candidate.blocked_until_received", next_collection_candidate["blocked_until_received"]) assert_false("next_collection_candidate.execution_authorized", next_collection_candidate["execution_authorized"]) assert_true("next_collection_candidate.not_approval", next_collection_candidate["not_approval"]) assert_equal( "next_collection_candidate.still_forbidden", next_collection_candidate["still_forbidden"], first_collection_item["still_forbidden"], ) local_validation = rollup["latest_local_validation"] assert_equal("rollup.latest_local_validation.status", local_validation["status"], "repo_snapshot_guard_pass") assert_equal("rollup.latest_local_validation.scope", local_validation["scope"], "repo_snapshot_only") assert_equal("rollup.latest_local_validation.result", local_validation["result"], "SOURCE_CONTROL_OWNER_RESPONSE_GUARD_OK") assert_equal("rollup.latest_local_validation.received_response_count", local_validation["received_response_count"], 0) assert_equal("rollup.latest_local_validation.accepted_response_count", local_validation["accepted_response_count"], 0) assert_false("rollup.latest_local_validation.runtime_actions_authorized", local_validation["runtime_actions_authorized"]) assert_false("rollup.latest_local_validation.repo_or_refs_actions_authorized", local_validation["repo_or_refs_actions_authorized"]) assert_false("rollup.latest_local_validation.workflow_or_secret_actions_authorized", local_validation["workflow_or_secret_actions_authorized"]) assert_true("rollup.latest_local_validation.not_authorization", local_validation["not_authorization"]) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--root", default=Path(__file__).resolve().parents[2], type=Path, help="Repository root. Defaults to the current script's repository.", ) args = parser.parse_args() validate(args.root.resolve()) print("SOURCE_CONTROL_OWNER_RESPONSE_GUARD_OK") if __name__ == "__main__": main()