ops(reboot): add post-reboot owner packet JSON [skip ci]

This commit is contained in:
ogt
2026-06-26 08:32:18 +08:00
parent 229e7fc8cd
commit 02bcf0a31e
5 changed files with 269 additions and 5 deletions

View File

@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""Build machine-readable owner packets from post-reboot next-gate dispatch.
Read-only by design. This script may run the read-only dispatch checklist, but
it never sends owner requests, reads secrets, writes credential markers, or
modifies host/runtime state.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[2]
DISPATCH_SCRIPT = ROOT / "scripts" / "reboot-recovery" / "post-reboot-next-gate-dispatch.sh"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Convert post-reboot gate dispatch output into owner packet JSON.",
)
parser.add_argument(
"--dispatch-file",
type=Path,
help="Use an existing post-reboot-next-gate-dispatch output file.",
)
parser.add_argument(
"--output",
type=Path,
help="Write JSON to this path instead of stdout.",
)
parser.add_argument(
"--no-color",
action="store_true",
help="Pass --no-color to the delegated dispatch script.",
)
return parser.parse_args()
def run_dispatch(no_color: bool) -> str:
cmd = [str(DISPATCH_SCRIPT)]
if no_color:
cmd.append("--no-color")
completed = subprocess.run(
cmd,
cwd=ROOT,
check=False,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
if completed.returncode not in (0,):
raise SystemExit(
f"dispatch_failed rc={completed.returncode}\n{completed.stdout}"
)
return completed.stdout
def load_dispatch(args: argparse.Namespace) -> str:
if args.dispatch_file:
return args.dispatch_file.read_text(encoding="utf-8")
return run_dispatch(no_color=args.no_color)
def split_csv(value: str) -> list[str]:
if not value or value == "none":
return []
return [item.strip() for item in value.split(",") if item.strip()]
def parse_dispatch(text: str) -> dict[str, Any]:
summary: dict[str, str] = {}
gates: list[dict[str, Any]] = []
current_gate: dict[str, Any] | None = None
for raw_line in text.splitlines():
line = raw_line.strip()
if not line or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip()
if key == "GATE_ID":
if current_gate:
gates.append(current_gate)
current_gate = {"gate_id": value}
continue
target = current_gate if current_gate is not None else summary
target[key.lower()] = value
if current_gate:
gates.append(current_gate)
for gate in gates:
for key in (
"owner_group",
"required_items",
"required_evidence",
"required_decisions",
"required_export",
"forbidden_payloads",
"forbidden_action",
"forbidden_actions",
"done_criteria",
):
if key in gate:
gate[key] = split_csv(str(gate[key]))
return {"summary": summary, "gates": gates}
def build_packet(parsed: dict[str, Any]) -> dict[str, Any]:
summary = parsed["summary"]
gates = parsed["gates"]
next_required = split_csv(summary.get("next_required_gates", ""))
return {
"schema_version": "awoooi_post_reboot_next_gate_owner_packets_v1",
"generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
"source": {
"dispatch_script": str(DISPATCH_SCRIPT.relative_to(ROOT)),
"summary_file": summary.get("summary_file", "unknown"),
"summary_artifact_dir": summary.get("summary_artifact_dir", "unknown"),
"overall_declaration": summary.get("overall_declaration", "unknown"),
"next_required_gates": next_required,
},
"status": {
"service_green": summary.get("service_green", "unknown"),
"runtime_action_authorized": 0,
"dispatch_authorized": 0,
"request_sent_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"host_write_authorized": 0,
"secret_value_collection_allowed": 0,
"runtime_gate_count": 0,
},
"owner_packets": [
{
"packet_id": gate.get("gate_id", "unknown"),
"title": gate.get("gate_title", "unknown"),
"priority": gate.get("gate_priority", "unknown"),
"status": gate.get("gate_status", "unknown"),
"current_evidence": gate.get("current_evidence", "unknown"),
"owner_group": gate.get("owner_group", []),
"required_items": gate.get("required_items", []),
"required_evidence": gate.get("required_evidence", []),
"required_decisions": gate.get("required_decisions", []),
"required_export": gate.get("required_export", []),
"allowed_action": gate.get("allowed_action", "unknown"),
"forbidden_payloads": gate.get("forbidden_payloads", []),
"forbidden_actions": gate.get("forbidden_actions")
or gate.get("forbidden_action", []),
"done_criteria": gate.get("done_criteria", []),
"request_sent": False,
"response_received": False,
"response_accepted": False,
"runtime_action_authorized": False,
}
for gate in gates
],
"counts": {
"next_gate_count": len(gates),
"p0_gate_count": sum(1 for gate in gates if gate.get("gate_priority") == "P0"),
"request_sent_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"runtime_action_authorized_count": 0,
},
"no_false_green_rules": [
"service_green_does_not_equal_dr_complete",
"backup_fresh_does_not_equal_credential_escrow_complete",
"host_188_service_green_does_not_equal_host_hygiene_green",
"wazuh_route_or_transport_does_not_equal_manager_registry_accepted",
],
"forbidden_global_actions": [
"send_owner_request_without_review",
"write_credential_marker_without_non_secret_evidence",
"collect_secret_value_hash_prefix_suffix_or_raw_payload",
"pg_resetwal_or_db_restore_without_maintenance_window",
"nginx_reload_or_certbot_renew_without_owner_gate",
"wazuh_active_response_reenroll_restart_or_secret_patch",
"host_write_or_kali_active_scan_without_explicit_approval",
],
}
def main() -> int:
args = parse_args()
parsed = parse_dispatch(load_dispatch(args))
packet = build_packet(parsed)
payload = json.dumps(packet, ensure_ascii=False, indent=2, sort_keys=True)
if args.output:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(payload + "\n", encoding="utf-8")
else:
print(payload)
return 0
if __name__ == "__main__":
sys.exit(main())