Files
awoooi/scripts/security/source-control-owner-response-guard.py

1173 lines
55 KiB
Python
Executable File

#!/usr/bin/env python3
"""Validate source-control owner response packets stay read-only.
This is a repo-snapshot-only guard. It reads committed JSON snapshots and does
not call GitHub, Gitea, AwoooP, Kali, or any runtime API.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
LANES = [
{
"lane_id": "s4_9_gitea_inventory_owner_attestation_response",
"path": "gitea-inventory-owner-attestation-response.snapshot.json",
"expected_templates": 5,
"false_flags": [
"token_value_collection_allowed",
"raw_secret_allowed",
"repo_write_allowed",
"refs_sync_allowed",
"github_primary_switch_authorized",
"action_buttons_allowed",
],
"expected_preflight_checks": [
"preflight-known-attestation-item",
"preflight-required-owner-fields",
"preflight-allowed-decision",
"preflight-redacted-evidence-only",
"preflight-no-execution-request",
"preflight-all-five-items-before-accepted",
],
"expected_outcome_lanes": [
"ready_for_owner_review",
"request_more_evidence",
"quarantine_sensitive_payload",
"reject_execution_request",
"keep_waiting_owner_response",
],
"expected_request_packet_id": "s4_9_gitea_owner_attestation_response_request",
"expected_request_template_ids": [
"response-public-only-vs-local-gitea-gap",
"response-org-user-endpoint-identity",
"response-internal-110-adjacent-scope",
"response-repo-owner-canonical-scope",
"response-legacy-or-inaccessible-disposition",
],
"expected_collection_checks": [
"collection-request-packet-displayed",
"collection-read-only-submission-mode",
"collection-five-template-tracking",
"collection-redacted-evidence-only",
"collection-no-approval-language",
"collection-audit-metadata-only",
],
"expected_template_statuses": [
"response-public-only-vs-local-gitea-gap",
"response-org-user-endpoint-identity",
"response-internal-110-adjacent-scope",
"response-repo-owner-canonical-scope",
"response-legacy-or-inaccessible-disposition",
],
"expected_audit_event_templates": [
"audit-owner-response-request-shown",
"audit-owner-response-received-metadata",
"audit-owner-response-outcome-classified",
],
"expected_redaction_examples": [
"redaction-existing-doc-ref",
"redaction-owner-decision-metadata",
"redaction-private-url-metadata",
"redaction-api-export-summary",
"redaction-quarantine-pointer",
],
"expected_display_sections": [
"display-owner-response-summary",
"display-owner-response-request-packet",
"display-template-status-ledger",
"display-audit-event-templates",
"display-redaction-examples",
"display-collection-checks",
"display-preflight-and-outcome-lanes",
"display-acceptance-and-rejection-rules",
],
},
{
"lane_id": "s4_10_github_target_owner_decision_response",
"path": "github-target-owner-decision-response.snapshot.json",
"expected_templates": 7,
"false_flags": [
"repo_creation_authorized",
"visibility_change_authorized",
"refs_sync_authorized",
"github_primary_switch_authorized",
"secret_value_collection_allowed",
"action_buttons_allowed",
],
"expected_request_packet_id": "s4_10_github_target_owner_decision_response_request",
"expected_request_template_ids": [
"target-awoooi-refs-blocked",
"target-clawbot-v5-refs-blocked",
"target-wooo-aiops-refs-blocked",
"target-wooo-infra-config-internal-remote",
"target-ewoooc-private-or-new",
"target-bitan-pharmacy-private-or-new",
"target-tsenyang-website-private-or-new",
],
"expected_template_statuses": [
"target-awoooi-refs-blocked",
"target-clawbot-v5-refs-blocked",
"target-wooo-aiops-refs-blocked",
"target-wooo-infra-config-internal-remote",
"target-ewoooc-private-or-new",
"target-bitan-pharmacy-private-or-new",
"target-tsenyang-website-private-or-new",
],
"expected_audit_event_templates": [
"audit-github-target-response-request-shown",
"audit-github-target-response-received-metadata",
"audit-github-target-response-outcome-classified",
],
"expected_redaction_examples": [
"redaction-github-target-doc-ref",
"redaction-owner-visibility-canonical-metadata",
"redaction-private-target-access-metadata",
"redaction-refs-truth-dependency-summary",
"redaction-github-target-quarantine-pointer",
],
"expected_collection_checks": [
"collection-github-target-request-packet-displayed",
"collection-github-target-read-only-submission-mode",
"collection-seven-target-template-tracking",
"collection-github-target-redacted-evidence-only",
"collection-github-target-no-approval-language",
"collection-github-target-audit-metadata-only",
],
"expected_preflight_checks": [
"preflight-known-github-target",
"preflight-required-github-target-owner-fields",
"preflight-allowed-github-target-decision",
"preflight-github-target-redacted-evidence-only",
"preflight-no-source-control-execution-request",
"preflight-all-seven-targets-before-accepted",
],
},
{
"lane_id": "s4_11_ref_truth_owner_response",
"path": "source-control-ref-truth-owner-response.snapshot.json",
"expected_templates": 5,
"expected_request_packet_id": "s4_11_ref_truth_owner_response_request",
"expected_request_template_ids": [
"response-main-branch-truth-source",
"response-active-dev-branch-truth-source",
"response-drift-deprecated-candidate-batch",
"response-release-tag-retention",
"response-github-only-ref-review",
],
"expected_template_statuses": [
"response-main-branch-truth-source",
"response-active-dev-branch-truth-source",
"response-drift-deprecated-candidate-batch",
"response-release-tag-retention",
"response-github-only-ref-review",
],
"expected_audit_event_templates": [
"audit-ref-truth-response-request-shown",
"audit-ref-truth-response-received-metadata",
"audit-ref-truth-response-outcome-classified",
],
"expected_redaction_examples": [
"redaction-ref-truth-existing-doc-ref",
"redaction-main-branch-truth-metadata",
"redaction-deprecated-batch-disposition",
"redaction-release-tag-retention-metadata",
"redaction-ref-truth-quarantine-pointer",
],
"expected_collection_checks": [
"collection-ref-truth-request-packet-displayed",
"collection-ref-truth-read-only-submission-mode",
"collection-five-ref-truth-template-tracking",
"collection-ref-truth-redacted-evidence-only",
"collection-ref-truth-no-approval-language",
"collection-ref-truth-audit-metadata-only",
],
"expected_preflight_checks": [
"preflight-known-ref-truth-lane",
"preflight-required-ref-truth-owner-fields",
"preflight-allowed-ref-truth-decision",
"preflight-ref-truth-redacted-evidence-only",
"preflight-no-refs-execution-request",
"preflight-all-five-ref-truth-lanes-before-accepted",
],
"false_flags": [
"refs_sync_authorized",
"refs_delete_authorized",
"force_push_authorized",
"github_primary_switch_authorized",
"secret_value_collection_allowed",
"action_buttons_allowed",
],
},
{
"lane_id": "s4_12_workflow_secret_name_owner_response",
"path": "source-control-workflow-secret-name-owner-response.snapshot.json",
"expected_templates": 5,
"false_flags": [
"secret_value_collection_allowed",
"write_token_allowed",
"workflow_modification_authorized",
"webhook_modification_authorized",
"runner_change_authorized",
"deploy_key_change_authorized",
"branch_protection_change_authorized",
"repo_secret_change_authorized",
"github_hosted_runner_enable_authorized",
"refs_sync_authorized",
"github_primary_switch_authorized",
"action_buttons_allowed",
],
"expected_request_packet_id": "s4_12_workflow_secret_name_owner_response_request",
"expected_request_template_ids": [
"response-webhook-redacted-export",
"response-runner-label-owner",
"response-deploy-key-redacted-export",
"response-branch-protection-codeowners",
"response-repository-secret-name-parity",
],
"expected_template_statuses": [
"response-webhook-redacted-export",
"response-runner-label-owner",
"response-deploy-key-redacted-export",
"response-branch-protection-codeowners",
"response-repository-secret-name-parity",
],
"expected_audit_event_templates": [
"audit-workflow-secret-response-request-shown",
"audit-workflow-secret-response-received-metadata",
"audit-workflow-secret-response-outcome-classified",
],
"expected_redaction_examples": [
"redaction-webhook-redacted-host-metadata",
"redaction-runner-label-owner-metadata",
"redaction-deploy-key-name-scope-metadata",
"redaction-branch-protection-codeowners-metadata",
"redaction-secret-name-parity-quarantine-pointer",
],
"expected_collection_checks": [
"collection-workflow-secret-request-packet-displayed",
"collection-workflow-secret-read-only-submission-mode",
"collection-five-workflow-secret-template-tracking",
"collection-workflow-secret-redacted-evidence-only",
"collection-workflow-secret-no-approval-language",
"collection-workflow-secret-audit-metadata-only",
],
"expected_preflight_checks": [
"preflight-known-workflow-secret-lane",
"preflight-required-workflow-secret-owner-fields",
"preflight-allowed-workflow-secret-decision",
"preflight-workflow-secret-redacted-evidence-only",
"preflight-no-workflow-secret-execution-request",
"preflight-all-five-workflow-secret-lanes-before-accepted",
],
},
]
EXPECTED_ROLLUP_EVIDENCE_ROUTING_RULES = [
"evidence-routing-known-lane",
"evidence-routing-required-fields",
"evidence-routing-sensitive-payload",
"evidence-routing-execution-request",
"evidence-routing-cross-packet-conflict",
"evidence-routing-accepted-metadata",
]
EXPECTED_ROLLUP_DISPLAY_SECTIONS = [
"display-validation-summary",
"display-missing-response-lanes",
"display-owner-response-collection-order",
"display-next-collection-candidate",
"display-cross-packet-acceptance-checks",
"display-evidence-routing-rules",
"display-quarantine-and-forbidden-actions",
"display-latest-local-validation",
]
EXPECTED_ROLLUP_STATE_TRANSITION_RULES = [
"transition-waiting-to-received-pending-validation",
"transition-missing-required-fields-to-request-more-evidence",
"transition-sensitive-payload-to-mirror-quarantine",
"transition-execution-request-to-hard-rejected",
"transition-cross-packet-conflict-to-owner-review",
"transition-validation-pass-to-read-only-update",
"transition-post-update-stays-waiting-runtime-gate",
]
EXPECTED_ROLLUP_REVIEWER_CHECKLIST = [
"checklist-confirm-lane-and-template",
"checklist-confirm-required-owner-fields",
"checklist-confirm-redacted-evidence-refs",
"checklist-confirm-source-packet-preflight",
"checklist-confirm-cross-packet-consistency",
"checklist-confirm-no-sensitive-payload",
"checklist-confirm-no-execution-intent",
"checklist-confirm-read-only-update-scope",
"checklist-confirm-followup-runtime-gate-still-required",
]
EXPECTED_ROLLUP_REVIEWER_OUTCOME_LANES = [
"outcome-keep-waiting-owner-response",
"outcome-request-more-evidence",
"outcome-mirror-quarantine-sensitive-payload",
"outcome-hard-reject-execution-request",
"outcome-cross-packet-owner-review",
"outcome-read-only-update-candidate",
"outcome-waiting-followup-runtime-gate",
]
EXPECTED_ROLLUP_REVIEWER_AUDIT_EVENT_TEMPLATES = [
"audit-reviewer-outcome-review-opened",
"audit-reviewer-outcome-classified",
"audit-reviewer-quarantine-or-reject-recorded",
"audit-reviewer-readonly-update-noted",
]
EXPECTED_ROLLUP_REVIEWER_AUDIT_DISPLAY_SECTIONS = [
"display-reviewer-audit-template-summary",
"display-reviewer-audit-metadata-fields",
"display-reviewer-audit-forbidden-payloads",
"display-reviewer-audit-emission-status",
"display-reviewer-audit-non-authorization-boundary",
]
EXPECTED_ROLLUP_REVIEWER_AUDIT_COLLECTION_CHECKS = [
"check-reviewer-audit-template-visible",
"check-reviewer-audit-metadata-only",
"check-reviewer-audit-forbidden-payloads-blocked",
"check-reviewer-audit-emitted-remains-zero",
"check-reviewer-audit-no-runtime-side-effect",
"check-reviewer-audit-owner-response-counts-unchanged",
]
EXPECTED_ROLLUP_REVIEWER_AUDIT_REDACTION_EXAMPLES = [
"redaction-reviewer-role-lane-template-metadata",
"redaction-classification-reason-summary",
"redaction-quarantine-pointer",
"redaction-readonly-update-targets",
"redaction-runtime-gate-counter-summary",
]
EXPECTED_ROLLUP_REVIEWER_AUDIT_RETENTION_RULES = [
"retention-reviewer-start-metadata-only",
"retention-classification-summary-only",
"retention-quarantine-pointer-only",
"retention-readonly-update-targets-only",
"retention-counter-snapshot-only",
]
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def fail(label: str, actual: Any, expected: Any) -> None:
raise SystemExit(f"BLOCKED {label}: expected {expected!r}, got {actual!r}")
def assert_equal(label: str, actual: Any, expected: Any) -> None:
if actual != expected:
fail(label, actual, expected)
def assert_false(label: str, actual: Any) -> None:
assert_equal(label, actual, False)
def assert_true(label: str, actual: Any) -> None:
assert_equal(label, actual, True)
def validate(root: Path) -> None:
security_dir = root / "docs" / "security"
rollup = load_json(security_dir / "source-control-owner-response-validation-rollup.snapshot.json")
rollup_summary = rollup["summary"]
assert_equal("rollup.status", rollup["status"], "draft_waiting_owner_responses")
assert_false("rollup.runtime_execution_authorized", rollup["runtime_execution_authorized"])
assert_equal("rollup.response_packet_count", rollup_summary["response_packet_count"], len(LANES))
assert_equal("rollup.validation_lane_count", rollup_summary["validation_lane_count"], len(LANES))
assert_equal("rollup.total_response_template_count", rollup_summary["total_response_template_count"], 22)
assert_equal("rollup.total_received_response_count", rollup_summary["total_received_response_count"], 0)
assert_equal("rollup.total_accepted_response_count", rollup_summary["total_accepted_response_count"], 0)
assert_equal("rollup.total_rejected_response_count", rollup_summary["total_rejected_response_count"], 0)
assert_equal("rollup.total_acceptance_check_count", rollup_summary["total_acceptance_check_count"], 32)
assert_equal("rollup.total_rejection_rule_count", rollup_summary["total_rejection_rule_count"], 40)
assert_equal(
"rollup.owner_response_evidence_routing_rule_count",
rollup_summary["owner_response_evidence_routing_rule_count"],
len(EXPECTED_ROLLUP_EVIDENCE_ROUTING_RULES),
)
assert_equal(
"rollup.owner_response_validation_display_section_count",
rollup_summary["owner_response_validation_display_section_count"],
len(EXPECTED_ROLLUP_DISPLAY_SECTIONS),
)
assert_equal(
"rollup.owner_response_validation_state_transition_rule_count",
rollup_summary["owner_response_validation_state_transition_rule_count"],
len(EXPECTED_ROLLUP_STATE_TRANSITION_RULES),
)
assert_equal(
"rollup.owner_response_validation_reviewer_checklist_count",
rollup_summary["owner_response_validation_reviewer_checklist_count"],
len(EXPECTED_ROLLUP_REVIEWER_CHECKLIST),
)
assert_equal(
"rollup.owner_response_validation_reviewer_outcome_lane_count",
rollup_summary["owner_response_validation_reviewer_outcome_lane_count"],
len(EXPECTED_ROLLUP_REVIEWER_OUTCOME_LANES),
)
assert_equal(
"rollup.owner_response_validation_reviewer_audit_event_template_count",
rollup_summary["owner_response_validation_reviewer_audit_event_template_count"],
len(EXPECTED_ROLLUP_REVIEWER_AUDIT_EVENT_TEMPLATES),
)
assert_equal(
"rollup.owner_response_validation_reviewer_audit_display_section_count",
rollup_summary["owner_response_validation_reviewer_audit_display_section_count"],
len(EXPECTED_ROLLUP_REVIEWER_AUDIT_DISPLAY_SECTIONS),
)
assert_equal(
"rollup.owner_response_validation_reviewer_audit_collection_check_count",
rollup_summary["owner_response_validation_reviewer_audit_collection_check_count"],
len(EXPECTED_ROLLUP_REVIEWER_AUDIT_COLLECTION_CHECKS),
)
assert_equal(
"rollup.owner_response_validation_reviewer_audit_redaction_example_count",
rollup_summary["owner_response_validation_reviewer_audit_redaction_example_count"],
len(EXPECTED_ROLLUP_REVIEWER_AUDIT_REDACTION_EXAMPLES),
)
assert_equal(
"rollup.owner_response_validation_reviewer_audit_retention_rule_count",
rollup_summary["owner_response_validation_reviewer_audit_retention_rule_count"],
len(EXPECTED_ROLLUP_REVIEWER_AUDIT_RETENTION_RULES),
)
assert_true("rollup.quarantine_required", rollup_summary["quarantine_required"])
assert_equal("rollup.primary_ready_count", rollup_summary["primary_ready_count"], 0)
for flag in [
"runtime_execution_authorized",
"token_value_collection_allowed",
"secret_value_collection_allowed",
"write_token_allowed",
"repo_creation_authorized",
"visibility_change_authorized",
"gitea_repo_write_authorized",
"refs_sync_authorized",
"refs_delete_authorized",
"force_push_authorized",
"workflow_modification_authorized",
"runner_enablement_authorized",
"github_hosted_runner_enable_authorized",
"github_primary_switch_authorized",
"action_buttons_allowed",
]:
assert_false(f"rollup.{flag}", rollup_summary[flag])
lane_by_id = {lane["lane_id"]: lane for lane in rollup["validation_lanes"]}
missing_lane_by_id = {lane["lane_id"]: lane for lane in rollup["missing_response_lanes"]}
collection_order_by_id = {item["lane_id"]: item for item in rollup["owner_response_collection_order"]}
next_collection_candidate = rollup["next_collection_candidate"]
total_templates = 0
total_acceptance_checks = 0
total_rejection_rules = 0
for index, lane in enumerate(LANES, start=1):
snapshot = load_json(security_dir / lane["path"])
summary = snapshot["summary"]
rollup_lane = lane_by_id[lane["lane_id"]]
missing_lane = missing_lane_by_id[lane["lane_id"]]
collection_item = collection_order_by_id[lane["lane_id"]]
assert_equal(f"{lane['lane_id']}.status", summary["owner_response_status"], "waiting_owner_response")
assert_equal(f"{lane['lane_id']}.response_template_count", summary["response_template_count"], lane["expected_templates"])
assert_equal(f"{lane['lane_id']}.received_response_count", summary["received_response_count"], 0)
assert_equal(f"{lane['lane_id']}.accepted_response_count", summary["accepted_response_count"], 0)
assert_equal(f"{lane['lane_id']}.rejected_response_count", summary["rejected_response_count"], 0)
assert_equal(f"{lane['lane_id']}.acceptance_check_count", summary["acceptance_check_count"], 8)
assert_equal(f"{lane['lane_id']}.rejection_rule_count", summary["rejection_rule_count"], 10)
expected_template_statuses = lane.get("expected_template_statuses")
if expected_template_statuses is not None:
template_statuses = snapshot["owner_response_template_statuses"]
assert_equal(
f"{lane['lane_id']}.owner_response_template_status_count",
summary["owner_response_template_status_count"],
len(expected_template_statuses),
)
assert_equal(
f"{lane['lane_id']}.owner_response_template_status_ids",
[item["template_id"] for item in template_statuses],
expected_template_statuses,
)
assert_equal(
f"{lane['lane_id']}.owner_response_template_status_display_order",
[item["display_order"] for item in template_statuses],
list(range(1, len(expected_template_statuses) + 1)),
)
for item in template_statuses:
assert_equal(
f"{lane['lane_id']}.{item['template_id']}.collection_status",
item["collection_status"],
"waiting_owner_response",
)
assert_equal(
f"{lane['lane_id']}.{item['template_id']}.request_status",
item["request_status"],
"request_ready_not_sent",
)
assert_equal(f"{lane['lane_id']}.{item['template_id']}.received_response_count", item["received_response_count"], 0)
assert_equal(f"{lane['lane_id']}.{item['template_id']}.accepted_response_count", item["accepted_response_count"], 0)
assert_equal(f"{lane['lane_id']}.{item['template_id']}.rejected_response_count", item["rejected_response_count"], 0)
assert_equal(
f"{lane['lane_id']}.{item['template_id']}.latest_outcome_lane",
item["latest_outcome_lane"],
"keep_waiting_owner_response",
)
assert_equal(
f"{lane['lane_id']}.{item['template_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_template_status_only",
)
assert_false(
f"{lane['lane_id']}.{item['template_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"{lane['lane_id']}.{item['template_id']}.not_approval", item["not_approval"])
expected_audit_event_templates = lane.get("expected_audit_event_templates")
if expected_audit_event_templates is not None:
audit_event_templates = snapshot["owner_response_audit_event_templates"]
assert_equal(
f"{lane['lane_id']}.owner_response_audit_event_template_count",
summary["owner_response_audit_event_template_count"],
len(expected_audit_event_templates),
)
assert_equal(
f"{lane['lane_id']}.owner_response_audit_event_template_ids",
[item["event_template_id"] for item in audit_event_templates],
expected_audit_event_templates,
)
assert_equal(
f"{lane['lane_id']}.owner_response_audit_event_display_order",
[item["display_order"] for item in audit_event_templates],
list(range(1, len(expected_audit_event_templates) + 1)),
)
for item in audit_event_templates:
assert_equal(
f"{lane['lane_id']}.{item['event_template_id']}.event_status",
item["event_status"],
"template_only_not_emitted",
)
assert_equal(f"{lane['lane_id']}.{item['event_template_id']}.emitted_event_count", item["emitted_event_count"], 0)
assert_false(
f"{lane['lane_id']}.{item['event_template_id']}.stored_raw_payload_allowed",
item["stored_raw_payload_allowed"],
)
assert_equal(
f"{lane['lane_id']}.{item['event_template_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_audit_template_only",
)
assert_false(
f"{lane['lane_id']}.{item['event_template_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"{lane['lane_id']}.{item['event_template_id']}.not_approval", item["not_approval"])
expected_redaction_examples = lane.get("expected_redaction_examples")
if expected_redaction_examples is not None:
redaction_examples = snapshot["owner_response_redaction_examples"]
assert_equal(
f"{lane['lane_id']}.owner_response_redaction_example_count",
summary["owner_response_redaction_example_count"],
len(expected_redaction_examples),
)
assert_equal(
f"{lane['lane_id']}.owner_response_redaction_example_ids",
[item["example_id"] for item in redaction_examples],
expected_redaction_examples,
)
assert_equal(
f"{lane['lane_id']}.owner_response_redaction_example_display_order",
[item["display_order"] for item in redaction_examples],
list(range(1, len(expected_redaction_examples) + 1)),
)
for item in redaction_examples:
assert_equal(
f"{lane['lane_id']}.{item['example_id']}.example_status",
item["example_status"],
"template_example_only",
)
assert_false(
f"{lane['lane_id']}.{item['example_id']}.stored_raw_payload_allowed",
item["stored_raw_payload_allowed"],
)
assert_equal(
f"{lane['lane_id']}.{item['example_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_redaction_example_only",
)
assert_false(
f"{lane['lane_id']}.{item['example_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"{lane['lane_id']}.{item['example_id']}.not_approval", item["not_approval"])
expected_display_sections = lane.get("expected_display_sections")
if expected_display_sections is not None:
display_sections = snapshot["owner_response_display_sections"]
assert_equal(
f"{lane['lane_id']}.owner_response_display_section_count",
summary["owner_response_display_section_count"],
len(expected_display_sections),
)
assert_equal(
f"{lane['lane_id']}.owner_response_display_section_ids",
[item["section_id"] for item in display_sections],
expected_display_sections,
)
assert_equal(
f"{lane['lane_id']}.owner_response_display_section_order",
[item["display_order"] for item in display_sections],
list(range(1, len(expected_display_sections) + 1)),
)
for item in display_sections:
assert_equal(
f"{lane['lane_id']}.{item['section_id']}.section_status",
item["section_status"],
"display_contract_only",
)
assert_false(
f"{lane['lane_id']}.{item['section_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"{lane['lane_id']}.{item['section_id']}.not_approval", item["not_approval"])
expected_request_packet_id = lane.get("expected_request_packet_id")
if expected_request_packet_id is not None:
request_packet = snapshot["owner_response_request_packet"]
expected_request_template_ids = lane["expected_request_template_ids"]
assert_equal(
f"{lane['lane_id']}.owner_response_request_packet_count",
summary["owner_response_request_packet_count"],
1,
)
assert_equal(
f"{lane['lane_id']}.owner_response_request_packet_id",
request_packet["request_id"],
expected_request_packet_id,
)
assert_equal(
f"{lane['lane_id']}.owner_response_request_display_status",
request_packet["display_status"],
"ready_to_request_owner_response",
)
assert_equal(
f"{lane['lane_id']}.owner_response_request_template_ids",
request_packet["requested_template_ids"],
expected_request_template_ids,
)
assert_equal(
f"{lane['lane_id']}.owner_response_request_awooop_display_mode",
request_packet["awooop_display_mode"],
"display_owner_response_request_only",
)
assert_false(
f"{lane['lane_id']}.owner_response_request_execution_authorized",
request_packet["execution_authorized"],
)
assert_true(f"{lane['lane_id']}.owner_response_request_not_approval", request_packet["not_approval"])
expected_collection_checks = lane.get("expected_collection_checks")
if expected_collection_checks is not None:
collection_checks = snapshot["owner_response_collection_checks"]
assert_equal(
f"{lane['lane_id']}.owner_response_collection_check_count",
summary["owner_response_collection_check_count"],
len(expected_collection_checks),
)
assert_equal(
f"{lane['lane_id']}.owner_response_collection_check_ids",
[item["check_id"] for item in collection_checks],
expected_collection_checks,
)
assert_equal(
f"{lane['lane_id']}.owner_response_collection_display_order",
[item["display_order"] for item in collection_checks],
list(range(1, len(expected_collection_checks) + 1)),
)
for item in collection_checks:
assert_true(f"{lane['lane_id']}.{item['check_id']}.required", item["required"])
assert_false(
f"{lane['lane_id']}.{item['check_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"{lane['lane_id']}.{item['check_id']}.not_approval", item["not_approval"])
expected_preflight_checks = lane.get("expected_preflight_checks")
if expected_preflight_checks is not None:
intake_preflight_checks = snapshot["intake_preflight_checks"]
assert_equal(
f"{lane['lane_id']}.intake_preflight_check_count",
summary["intake_preflight_check_count"],
len(expected_preflight_checks),
)
assert_equal(
f"{lane['lane_id']}.intake_preflight_check_ids",
[item["check_id"] for item in intake_preflight_checks],
expected_preflight_checks,
)
assert_equal(
f"{lane['lane_id']}.intake_preflight_display_order",
[item["display_order"] for item in intake_preflight_checks],
list(range(1, len(expected_preflight_checks) + 1)),
)
for item in intake_preflight_checks:
assert_true(f"{lane['lane_id']}.{item['check_id']}.required", item["required"])
assert_false(
f"{lane['lane_id']}.{item['check_id']}.execution_authorized",
item["execution_authorized"],
)
expected_outcome_lanes = lane.get("expected_outcome_lanes")
if expected_outcome_lanes is not None:
intake_outcome_lanes = snapshot["intake_outcome_lanes"]
assert_equal(
f"{lane['lane_id']}.intake_outcome_lane_count",
summary["intake_outcome_lane_count"],
len(expected_outcome_lanes),
)
assert_equal(
f"{lane['lane_id']}.intake_outcome_lane_ids",
[item["lane_id"] for item in intake_outcome_lanes],
expected_outcome_lanes,
)
assert_equal(
f"{lane['lane_id']}.intake_outcome_display_order",
[item["display_order"] for item in intake_outcome_lanes],
list(range(1, len(expected_outcome_lanes) + 1)),
)
for item in intake_outcome_lanes:
assert_false(
f"{lane['lane_id']}.{item['lane_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"{lane['lane_id']}.{item['lane_id']}.not_approval", item["not_approval"])
assert_false(f"{lane['lane_id']}.runtime_execution_authorized", snapshot["runtime_execution_authorized"])
assert_false(f"{lane['lane_id']}.rollup_execution_authorized", rollup_lane["execution_authorized"])
assert_equal(
f"{lane['lane_id']}.rollup_response_template_count",
rollup_lane["response_template_count"],
lane["expected_templates"],
)
assert_equal(f"{lane['lane_id']}.rollup_received_response_count", rollup_lane["received_response_count"], 0)
assert_equal(f"{lane['lane_id']}.rollup_accepted_response_count", rollup_lane["accepted_response_count"], 0)
assert_equal(f"{lane['lane_id']}.rollup_rejected_response_count", rollup_lane["rejected_response_count"], 0)
assert_equal(f"{lane['lane_id']}.missing_current_status", missing_lane["current_status"], "waiting_owner_response")
assert_equal(
f"{lane['lane_id']}.missing_response_template_count",
missing_lane["response_template_count"],
lane["expected_templates"],
)
assert_equal(f"{lane['lane_id']}.missing_received_response_count", missing_lane["received_response_count"], 0)
assert_equal(f"{lane['lane_id']}.missing_accepted_response_count", missing_lane["accepted_response_count"], 0)
assert_equal(f"{lane['lane_id']}.missing_awooop_display_mode", missing_lane["awooop_display_mode"], "observe_missing_response")
assert_equal(f"{lane['lane_id']}.collection_order", collection_item["order"], index)
assert_equal(
f"{lane['lane_id']}.collection_awooop_action",
collection_item["awooop_action"],
"display_next_collection_item",
)
assert_true(f"{lane['lane_id']}.collection_blocked_until_received", collection_item["blocked_until_received"])
assert_false(f"{lane['lane_id']}.collection_execution_authorized", collection_item["execution_authorized"])
for flag in lane["false_flags"]:
assert_false(f"{lane['lane_id']}.{flag}", summary[flag])
total_templates += summary["response_template_count"]
total_acceptance_checks += summary["acceptance_check_count"]
total_rejection_rules += summary["rejection_rule_count"]
assert_equal("source_packets.total_templates", total_templates, rollup_summary["total_response_template_count"])
assert_equal("source_packets.total_acceptance_checks", total_acceptance_checks, rollup_summary["total_acceptance_check_count"])
assert_equal("source_packets.total_rejection_rules", total_rejection_rules, rollup_summary["total_rejection_rule_count"])
assert_equal("missing_response_lanes.count", len(missing_lane_by_id), len(LANES))
assert_equal("owner_response_collection_order.count", len(collection_order_by_id), len(LANES))
evidence_routing_rules = rollup["owner_response_evidence_routing_rules"]
assert_equal(
"owner_response_evidence_routing_rules.ids",
[item["rule_id"] for item in evidence_routing_rules],
EXPECTED_ROLLUP_EVIDENCE_ROUTING_RULES,
)
assert_equal(
"owner_response_evidence_routing_rules.display_order",
[item["display_order"] for item in evidence_routing_rules],
list(range(1, len(EXPECTED_ROLLUP_EVIDENCE_ROUTING_RULES) + 1)),
)
for item in evidence_routing_rules:
assert_equal(
f"owner_response_evidence_routing_rules.{item['rule_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_evidence_route_only",
)
assert_false(
f"owner_response_evidence_routing_rules.{item['rule_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"owner_response_evidence_routing_rules.{item['rule_id']}.not_approval", item["not_approval"])
display_sections = rollup["owner_response_validation_display_sections"]
assert_equal(
"owner_response_validation_display_sections.ids",
[item["section_id"] for item in display_sections],
EXPECTED_ROLLUP_DISPLAY_SECTIONS,
)
assert_equal(
"owner_response_validation_display_sections.display_order",
[item["display_order"] for item in display_sections],
list(range(1, len(EXPECTED_ROLLUP_DISPLAY_SECTIONS) + 1)),
)
for item in display_sections:
assert_equal(
f"owner_response_validation_display_sections.{item['section_id']}.section_status",
item["section_status"],
"display_contract_only",
)
assert_equal(
f"owner_response_validation_display_sections.{item['section_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_validation_section_only",
)
assert_false(
f"owner_response_validation_display_sections.{item['section_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"owner_response_validation_display_sections.{item['section_id']}.not_approval", item["not_approval"])
state_transition_rules = rollup["owner_response_validation_state_transition_rules"]
assert_equal(
"owner_response_validation_state_transition_rules.ids",
[item["rule_id"] for item in state_transition_rules],
EXPECTED_ROLLUP_STATE_TRANSITION_RULES,
)
assert_equal(
"owner_response_validation_state_transition_rules.display_order",
[item["display_order"] for item in state_transition_rules],
list(range(1, len(EXPECTED_ROLLUP_STATE_TRANSITION_RULES) + 1)),
)
for item in state_transition_rules:
assert_equal(
f"owner_response_validation_state_transition_rules.{item['rule_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_state_transition_rule_only",
)
assert_false(
f"owner_response_validation_state_transition_rules.{item['rule_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"owner_response_validation_state_transition_rules.{item['rule_id']}.not_approval", item["not_approval"])
for blocked_update in item["blocked_updates"]:
if blocked_update in {"create_runtime_gate", "enqueue_execution", "add_action_button"}:
assert_false(
f"owner_response_validation_state_transition_rules.{item['rule_id']}.runtime_execution_authorized",
item["execution_authorized"],
)
reviewer_checklist = rollup["owner_response_validation_reviewer_checklist"]
assert_equal(
"owner_response_validation_reviewer_checklist.ids",
[item["checklist_id"] for item in reviewer_checklist],
EXPECTED_ROLLUP_REVIEWER_CHECKLIST,
)
assert_equal(
"owner_response_validation_reviewer_checklist.display_order",
[item["display_order"] for item in reviewer_checklist],
list(range(1, len(EXPECTED_ROLLUP_REVIEWER_CHECKLIST) + 1)),
)
for item in reviewer_checklist:
assert_equal(
f"owner_response_validation_reviewer_checklist.{item['checklist_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_reviewer_checklist_only",
)
assert_false(
f"owner_response_validation_reviewer_checklist.{item['checklist_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(f"owner_response_validation_reviewer_checklist.{item['checklist_id']}.not_approval", item["not_approval"])
reviewer_outcome_lanes = rollup["owner_response_validation_reviewer_outcome_lanes"]
assert_equal(
"owner_response_validation_reviewer_outcome_lanes.ids",
[item["outcome_lane_id"] for item in reviewer_outcome_lanes],
EXPECTED_ROLLUP_REVIEWER_OUTCOME_LANES,
)
assert_equal(
"owner_response_validation_reviewer_outcome_lanes.display_order",
[item["display_order"] for item in reviewer_outcome_lanes],
list(range(1, len(EXPECTED_ROLLUP_REVIEWER_OUTCOME_LANES) + 1)),
)
for item in reviewer_outcome_lanes:
assert_equal(
f"owner_response_validation_reviewer_outcome_lanes.{item['outcome_lane_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_reviewer_outcome_lane_only",
)
assert_false(
f"owner_response_validation_reviewer_outcome_lanes.{item['outcome_lane_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(
f"owner_response_validation_reviewer_outcome_lanes.{item['outcome_lane_id']}.not_approval",
item["not_approval"],
)
for blocked_update in item["blocked_updates"]:
if blocked_update in {"create_runtime_gate", "enqueue_execution", "add_action_button"}:
assert_false(
f"owner_response_validation_reviewer_outcome_lanes.{item['outcome_lane_id']}.runtime_execution_authorized",
item["execution_authorized"],
)
reviewer_audit_event_templates = rollup["owner_response_validation_reviewer_audit_event_templates"]
assert_equal(
"owner_response_validation_reviewer_audit_event_templates.ids",
[item["event_template_id"] for item in reviewer_audit_event_templates],
EXPECTED_ROLLUP_REVIEWER_AUDIT_EVENT_TEMPLATES,
)
assert_equal(
"owner_response_validation_reviewer_audit_event_templates.display_order",
[item["display_order"] for item in reviewer_audit_event_templates],
list(range(1, len(EXPECTED_ROLLUP_REVIEWER_AUDIT_EVENT_TEMPLATES) + 1)),
)
for item in reviewer_audit_event_templates:
assert_equal(
f"owner_response_validation_reviewer_audit_event_templates.{item['event_template_id']}.event_status",
item["event_status"],
"template_only_not_emitted",
)
assert_equal(
f"owner_response_validation_reviewer_audit_event_templates.{item['event_template_id']}.emitted_event_count",
item["emitted_event_count"],
0,
)
assert_false(
f"owner_response_validation_reviewer_audit_event_templates.{item['event_template_id']}.stored_raw_payload_allowed",
item["stored_raw_payload_allowed"],
)
assert_equal(
f"owner_response_validation_reviewer_audit_event_templates.{item['event_template_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_reviewer_audit_template_only",
)
assert_false(
f"owner_response_validation_reviewer_audit_event_templates.{item['event_template_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(
f"owner_response_validation_reviewer_audit_event_templates.{item['event_template_id']}.not_approval",
item["not_approval"],
)
reviewer_audit_display_sections = rollup["owner_response_validation_reviewer_audit_display_sections"]
assert_equal(
"owner_response_validation_reviewer_audit_display_sections.ids",
[item["section_id"] for item in reviewer_audit_display_sections],
EXPECTED_ROLLUP_REVIEWER_AUDIT_DISPLAY_SECTIONS,
)
assert_equal(
"owner_response_validation_reviewer_audit_display_sections.display_order",
[item["display_order"] for item in reviewer_audit_display_sections],
list(range(1, len(EXPECTED_ROLLUP_REVIEWER_AUDIT_DISPLAY_SECTIONS) + 1)),
)
for item in reviewer_audit_display_sections:
assert_equal(
f"owner_response_validation_reviewer_audit_display_sections.{item['section_id']}.section_status",
item["section_status"],
"display_contract_only",
)
assert_equal(
f"owner_response_validation_reviewer_audit_display_sections.{item['section_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_reviewer_audit_section_only",
)
assert_false(
f"owner_response_validation_reviewer_audit_display_sections.{item['section_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(
f"owner_response_validation_reviewer_audit_display_sections.{item['section_id']}.not_approval",
item["not_approval"],
)
reviewer_audit_collection_checks = rollup["owner_response_validation_reviewer_audit_collection_checks"]
assert_equal(
"owner_response_validation_reviewer_audit_collection_checks.ids",
[item["check_id"] for item in reviewer_audit_collection_checks],
EXPECTED_ROLLUP_REVIEWER_AUDIT_COLLECTION_CHECKS,
)
assert_equal(
"owner_response_validation_reviewer_audit_collection_checks.display_order",
[item["display_order"] for item in reviewer_audit_collection_checks],
list(range(1, len(EXPECTED_ROLLUP_REVIEWER_AUDIT_COLLECTION_CHECKS) + 1)),
)
for item in reviewer_audit_collection_checks:
assert_equal(
f"owner_response_validation_reviewer_audit_collection_checks.{item['check_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_reviewer_audit_collection_check_only",
)
assert_false(
f"owner_response_validation_reviewer_audit_collection_checks.{item['check_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(
f"owner_response_validation_reviewer_audit_collection_checks.{item['check_id']}.not_approval",
item["not_approval"],
)
for blocked_interpretation in item["blocked_interpretations"]:
if blocked_interpretation in {
"create_runtime_gate",
"enqueue_execution",
"add_action_button",
"start_kali_scan",
"modify_repo_or_workflow",
}:
assert_false(
f"owner_response_validation_reviewer_audit_collection_checks.{item['check_id']}.runtime_execution_authorized",
item["execution_authorized"],
)
reviewer_audit_redaction_examples = rollup["owner_response_validation_reviewer_audit_redaction_examples"]
assert_equal(
"owner_response_validation_reviewer_audit_redaction_examples.ids",
[item["example_id"] for item in reviewer_audit_redaction_examples],
EXPECTED_ROLLUP_REVIEWER_AUDIT_REDACTION_EXAMPLES,
)
assert_equal(
"owner_response_validation_reviewer_audit_redaction_examples.display_order",
[item["display_order"] for item in reviewer_audit_redaction_examples],
list(range(1, len(EXPECTED_ROLLUP_REVIEWER_AUDIT_REDACTION_EXAMPLES) + 1)),
)
for item in reviewer_audit_redaction_examples:
assert_equal(
f"owner_response_validation_reviewer_audit_redaction_examples.{item['example_id']}.redaction_status",
item["redaction_status"],
"example_only_not_response",
)
assert_equal(
f"owner_response_validation_reviewer_audit_redaction_examples.{item['example_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_reviewer_audit_redaction_example_only",
)
assert_false(
f"owner_response_validation_reviewer_audit_redaction_examples.{item['example_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(
f"owner_response_validation_reviewer_audit_redaction_examples.{item['example_id']}.not_approval",
item["not_approval"],
)
reviewer_audit_retention_rules = rollup["owner_response_validation_reviewer_audit_retention_rules"]
assert_equal(
"owner_response_validation_reviewer_audit_retention_rules.ids",
[item["rule_id"] for item in reviewer_audit_retention_rules],
EXPECTED_ROLLUP_REVIEWER_AUDIT_RETENTION_RULES,
)
assert_equal(
"owner_response_validation_reviewer_audit_retention_rules.display_order",
[item["display_order"] for item in reviewer_audit_retention_rules],
list(range(1, len(EXPECTED_ROLLUP_REVIEWER_AUDIT_RETENTION_RULES) + 1)),
)
for item in reviewer_audit_retention_rules:
assert_equal(
f"owner_response_validation_reviewer_audit_retention_rules.{item['rule_id']}.retention_status",
item["retention_status"],
"metadata_retention_rule_only",
)
assert_equal(
f"owner_response_validation_reviewer_audit_retention_rules.{item['rule_id']}.awooop_display_mode",
item["awooop_display_mode"],
"display_reviewer_audit_retention_rule_only",
)
assert_false(
f"owner_response_validation_reviewer_audit_retention_rules.{item['rule_id']}.execution_authorized",
item["execution_authorized"],
)
assert_true(
f"owner_response_validation_reviewer_audit_retention_rules.{item['rule_id']}.not_approval",
item["not_approval"],
)
first_lane = LANES[0]
first_collection_item = collection_order_by_id[first_lane["lane_id"]]
first_missing_lane = missing_lane_by_id[first_lane["lane_id"]]
assert_equal("next_collection_candidate.order", next_collection_candidate["order"], 1)
assert_equal("next_collection_candidate.lane_id", next_collection_candidate["lane_id"], first_lane["lane_id"])
assert_equal(
"next_collection_candidate.display_status",
next_collection_candidate["display_status"],
"next_owner_response_required",
)
assert_equal(
"next_collection_candidate.source_contract",
next_collection_candidate["source_contract"],
first_missing_lane["source_contract"],
)
assert_equal(
"next_collection_candidate.required_packet",
next_collection_candidate["required_packet"],
first_collection_item["required_packet"],
)
assert_equal(
"next_collection_candidate.required_response_template_count",
next_collection_candidate["required_response_template_count"],
first_lane["expected_templates"],
)
assert_equal("next_collection_candidate.received_response_count", next_collection_candidate["received_response_count"], 0)
assert_equal("next_collection_candidate.accepted_response_count", next_collection_candidate["accepted_response_count"], 0)
assert_equal(
"next_collection_candidate.minimum_response",
next_collection_candidate["minimum_response"],
first_collection_item["minimum_response"],
)
assert_equal(
"next_collection_candidate.awooop_display_mode",
next_collection_candidate["awooop_display_mode"],
"display_next_collection_item_only",
)
assert_true("next_collection_candidate.blocked_until_received", next_collection_candidate["blocked_until_received"])
assert_false("next_collection_candidate.execution_authorized", next_collection_candidate["execution_authorized"])
assert_true("next_collection_candidate.not_approval", next_collection_candidate["not_approval"])
assert_equal(
"next_collection_candidate.still_forbidden",
next_collection_candidate["still_forbidden"],
first_collection_item["still_forbidden"],
)
local_validation = rollup["latest_local_validation"]
assert_equal("rollup.latest_local_validation.status", local_validation["status"], "repo_snapshot_guard_pass")
assert_equal("rollup.latest_local_validation.scope", local_validation["scope"], "repo_snapshot_only")
assert_equal("rollup.latest_local_validation.result", local_validation["result"], "SOURCE_CONTROL_OWNER_RESPONSE_GUARD_OK")
assert_equal("rollup.latest_local_validation.received_response_count", local_validation["received_response_count"], 0)
assert_equal("rollup.latest_local_validation.accepted_response_count", local_validation["accepted_response_count"], 0)
assert_false("rollup.latest_local_validation.runtime_actions_authorized", local_validation["runtime_actions_authorized"])
assert_false("rollup.latest_local_validation.repo_or_refs_actions_authorized", local_validation["repo_or_refs_actions_authorized"])
assert_false("rollup.latest_local_validation.workflow_or_secret_actions_authorized", local_validation["workflow_or_secret_actions_authorized"])
assert_true("rollup.latest_local_validation.not_authorization", local_validation["not_authorization"])
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--root",
default=Path(__file__).resolve().parents[2],
type=Path,
help="Repository root. Defaults to the current script's repository.",
)
args = parser.parse_args()
validate(args.root.resolve())
print("SOURCE_CONTROL_OWNER_RESPONSE_GUARD_OK")
if __name__ == "__main__":
main()