Files
awoooi/scripts/security/source-control-owner-response-guard.py
2026-05-18 09:42:54 +08:00

272 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""Validate source-control owner response packets stay read-only.
This is a repo-snapshot-only guard. It reads committed JSON snapshots and does
not call GitHub, Gitea, AwoooP, Kali, or any runtime API.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
LANES = [
{
"lane_id": "s4_9_gitea_inventory_owner_attestation_response",
"path": "gitea-inventory-owner-attestation-response.snapshot.json",
"expected_templates": 5,
"false_flags": [
"token_value_collection_allowed",
"raw_secret_allowed",
"repo_write_allowed",
"refs_sync_allowed",
"github_primary_switch_authorized",
"action_buttons_allowed",
],
},
{
"lane_id": "s4_10_github_target_owner_decision_response",
"path": "github-target-owner-decision-response.snapshot.json",
"expected_templates": 7,
"false_flags": [
"repo_creation_authorized",
"visibility_change_authorized",
"refs_sync_authorized",
"github_primary_switch_authorized",
"secret_value_collection_allowed",
"action_buttons_allowed",
],
},
{
"lane_id": "s4_11_ref_truth_owner_response",
"path": "source-control-ref-truth-owner-response.snapshot.json",
"expected_templates": 5,
"false_flags": [
"refs_sync_authorized",
"refs_delete_authorized",
"force_push_authorized",
"github_primary_switch_authorized",
"secret_value_collection_allowed",
"action_buttons_allowed",
],
},
{
"lane_id": "s4_12_workflow_secret_name_owner_response",
"path": "source-control-workflow-secret-name-owner-response.snapshot.json",
"expected_templates": 5,
"false_flags": [
"secret_value_collection_allowed",
"write_token_allowed",
"workflow_modification_authorized",
"webhook_modification_authorized",
"runner_change_authorized",
"deploy_key_change_authorized",
"branch_protection_change_authorized",
"repo_secret_change_authorized",
"github_hosted_runner_enable_authorized",
"refs_sync_authorized",
"github_primary_switch_authorized",
"action_buttons_allowed",
],
},
]
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def fail(label: str, actual: Any, expected: Any) -> None:
raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}")
def assert_equal(label: str, actual: Any, expected: Any) -> None:
if actual != expected:
fail(label, actual, expected)
def assert_false(label: str, actual: Any) -> None:
assert_equal(label, actual, False)
def assert_true(label: str, actual: Any) -> None:
assert_equal(label, actual, True)
def validate(root: Path) -> None:
security_dir = root / "docs" / "security"
rollup = load_json(security_dir / "source-control-owner-response-validation-rollup.snapshot.json")
rollup_summary = rollup["summary"]
assert_equal("rollup.status", rollup["status"], "draft_waiting_owner_responses")
assert_false("rollup.runtime_execution_authorized", rollup["runtime_execution_authorized"])
assert_equal("rollup.response_packet_count", rollup_summary["response_packet_count"], len(LANES))
assert_equal("rollup.validation_lane_count", rollup_summary["validation_lane_count"], len(LANES))
assert_equal("rollup.total_response_template_count", rollup_summary["total_response_template_count"], 22)
assert_equal("rollup.total_received_response_count", rollup_summary["total_received_response_count"], 0)
assert_equal("rollup.total_accepted_response_count", rollup_summary["total_accepted_response_count"], 0)
assert_equal("rollup.total_rejected_response_count", rollup_summary["total_rejected_response_count"], 0)
assert_equal("rollup.total_acceptance_check_count", rollup_summary["total_acceptance_check_count"], 32)
assert_equal("rollup.total_rejection_rule_count", rollup_summary["total_rejection_rule_count"], 40)
assert_true("rollup.quarantine_required", rollup_summary["quarantine_required"])
assert_equal("rollup.primary_ready_count", rollup_summary["primary_ready_count"], 0)
for flag in [
"runtime_execution_authorized",
"token_value_collection_allowed",
"secret_value_collection_allowed",
"write_token_allowed",
"repo_creation_authorized",
"visibility_change_authorized",
"gitea_repo_write_authorized",
"refs_sync_authorized",
"refs_delete_authorized",
"force_push_authorized",
"workflow_modification_authorized",
"runner_enablement_authorized",
"github_hosted_runner_enable_authorized",
"github_primary_switch_authorized",
"action_buttons_allowed",
]:
assert_false(f"rollup.{flag}", rollup_summary[flag])
lane_by_id = {lane["lane_id"]: lane for lane in rollup["validation_lanes"]}
missing_lane_by_id = {lane["lane_id"]: lane for lane in rollup["missing_response_lanes"]}
collection_order_by_id = {item["lane_id"]: item for item in rollup["owner_response_collection_order"]}
next_collection_candidate = rollup["next_collection_candidate"]
total_templates = 0
total_acceptance_checks = 0
total_rejection_rules = 0
for index, lane in enumerate(LANES, start=1):
snapshot = load_json(security_dir / lane["path"])
summary = snapshot["summary"]
rollup_lane = lane_by_id[lane["lane_id"]]
missing_lane = missing_lane_by_id[lane["lane_id"]]
collection_item = collection_order_by_id[lane["lane_id"]]
assert_equal(f"{lane['lane_id']}.status", summary["owner_response_status"], "waiting_owner_response")
assert_equal(f"{lane['lane_id']}.response_template_count", summary["response_template_count"], lane["expected_templates"])
assert_equal(f"{lane['lane_id']}.received_response_count", summary["received_response_count"], 0)
assert_equal(f"{lane['lane_id']}.accepted_response_count", summary["accepted_response_count"], 0)
assert_equal(f"{lane['lane_id']}.rejected_response_count", summary["rejected_response_count"], 0)
assert_equal(f"{lane['lane_id']}.acceptance_check_count", summary["acceptance_check_count"], 8)
assert_equal(f"{lane['lane_id']}.rejection_rule_count", summary["rejection_rule_count"], 10)
assert_false(f"{lane['lane_id']}.runtime_execution_authorized", snapshot["runtime_execution_authorized"])
assert_false(f"{lane['lane_id']}.rollup_execution_authorized", rollup_lane["execution_authorized"])
assert_equal(
f"{lane['lane_id']}.rollup_response_template_count",
rollup_lane["response_template_count"],
lane["expected_templates"],
)
assert_equal(f"{lane['lane_id']}.rollup_received_response_count", rollup_lane["received_response_count"], 0)
assert_equal(f"{lane['lane_id']}.rollup_accepted_response_count", rollup_lane["accepted_response_count"], 0)
assert_equal(f"{lane['lane_id']}.rollup_rejected_response_count", rollup_lane["rejected_response_count"], 0)
assert_equal(f"{lane['lane_id']}.missing_current_status", missing_lane["current_status"], "waiting_owner_response")
assert_equal(
f"{lane['lane_id']}.missing_response_template_count",
missing_lane["response_template_count"],
lane["expected_templates"],
)
assert_equal(f"{lane['lane_id']}.missing_received_response_count", missing_lane["received_response_count"], 0)
assert_equal(f"{lane['lane_id']}.missing_accepted_response_count", missing_lane["accepted_response_count"], 0)
assert_equal(f"{lane['lane_id']}.missing_awooop_display_mode", missing_lane["awooop_display_mode"], "observe_missing_response")
assert_equal(f"{lane['lane_id']}.collection_order", collection_item["order"], index)
assert_equal(
f"{lane['lane_id']}.collection_awooop_action",
collection_item["awooop_action"],
"display_next_collection_item",
)
assert_true(f"{lane['lane_id']}.collection_blocked_until_received", collection_item["blocked_until_received"])
assert_false(f"{lane['lane_id']}.collection_execution_authorized", collection_item["execution_authorized"])
for flag in lane["false_flags"]:
assert_false(f"{lane['lane_id']}.{flag}", summary[flag])
total_templates += summary["response_template_count"]
total_acceptance_checks += summary["acceptance_check_count"]
total_rejection_rules += summary["rejection_rule_count"]
assert_equal("source_packets.total_templates", total_templates, rollup_summary["total_response_template_count"])
assert_equal("source_packets.total_acceptance_checks", total_acceptance_checks, rollup_summary["total_acceptance_check_count"])
assert_equal("source_packets.total_rejection_rules", total_rejection_rules, rollup_summary["total_rejection_rule_count"])
assert_equal("missing_response_lanes.count", len(missing_lane_by_id), len(LANES))
assert_equal("owner_response_collection_order.count", len(collection_order_by_id), len(LANES))
first_lane = LANES[0]
first_collection_item = collection_order_by_id[first_lane["lane_id"]]
first_missing_lane = missing_lane_by_id[first_lane["lane_id"]]
assert_equal("next_collection_candidate.order", next_collection_candidate["order"], 1)
assert_equal("next_collection_candidate.lane_id", next_collection_candidate["lane_id"], first_lane["lane_id"])
assert_equal(
"next_collection_candidate.display_status",
next_collection_candidate["display_status"],
"next_owner_response_required",
)
assert_equal(
"next_collection_candidate.source_contract",
next_collection_candidate["source_contract"],
first_missing_lane["source_contract"],
)
assert_equal(
"next_collection_candidate.required_packet",
next_collection_candidate["required_packet"],
first_collection_item["required_packet"],
)
assert_equal(
"next_collection_candidate.required_response_template_count",
next_collection_candidate["required_response_template_count"],
first_lane["expected_templates"],
)
assert_equal("next_collection_candidate.received_response_count", next_collection_candidate["received_response_count"], 0)
assert_equal("next_collection_candidate.accepted_response_count", next_collection_candidate["accepted_response_count"], 0)
assert_equal(
"next_collection_candidate.minimum_response",
next_collection_candidate["minimum_response"],
first_collection_item["minimum_response"],
)
assert_equal(
"next_collection_candidate.awooop_display_mode",
next_collection_candidate["awooop_display_mode"],
"display_next_collection_item_only",
)
assert_true("next_collection_candidate.blocked_until_received", next_collection_candidate["blocked_until_received"])
assert_false("next_collection_candidate.execution_authorized", next_collection_candidate["execution_authorized"])
assert_true("next_collection_candidate.not_approval", next_collection_candidate["not_approval"])
assert_equal(
"next_collection_candidate.still_forbidden",
next_collection_candidate["still_forbidden"],
first_collection_item["still_forbidden"],
)
local_validation = rollup["latest_local_validation"]
assert_equal("rollup.latest_local_validation.status", local_validation["status"], "repo_snapshot_guard_pass")
assert_equal("rollup.latest_local_validation.scope", local_validation["scope"], "repo_snapshot_only")
assert_equal("rollup.latest_local_validation.result", local_validation["result"], "SOURCE_CONTROL_OWNER_RESPONSE_GUARD_OK")
assert_equal("rollup.latest_local_validation.received_response_count", local_validation["received_response_count"], 0)
assert_equal("rollup.latest_local_validation.accepted_response_count", local_validation["accepted_response_count"], 0)
assert_false("rollup.latest_local_validation.runtime_actions_authorized", local_validation["runtime_actions_authorized"])
assert_false("rollup.latest_local_validation.repo_or_refs_actions_authorized", local_validation["repo_or_refs_actions_authorized"])
assert_false("rollup.latest_local_validation.workflow_or_secret_actions_authorized", local_validation["workflow_or_secret_actions_authorized"])
assert_true("rollup.latest_local_validation.not_authorization", local_validation["not_authorization"])
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--root",
default=Path(__file__).resolve().parents[2],
type=Path,
help="Repository root. Defaults to the current script's repository.",
)
args = parser.parse_args()
validate(args.root.resolve())
print("SOURCE_CONTROL_OWNER_RESPONSE_GUARD_OK")
if __name__ == "__main__":
main()