ops(reboot): guard post-reboot owner packets [skip ci]

This commit is contained in:
ogt
2026-06-26 08:44:37 +08:00
parent 450d733304
commit c45f274d5e
5 changed files with 317 additions and 5 deletions

View File

@@ -0,0 +1,264 @@
#!/usr/bin/env python3
"""Validate post-reboot owner packet JSON stays fail-closed.
Read-only by design. The guard validates an owner packet artifact produced by
post-reboot-next-gate-owner-packets.py, or runs that generator when no packet
file is provided. It never sends requests, reads secrets, writes markers, or
modifies host/runtime state.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[2]
OWNER_PACKET_GENERATOR = (
ROOT / "scripts" / "reboot-recovery" / "post-reboot-next-gate-owner-packets.py"
)
EXPECTED_SCHEMA = "awoooi_post_reboot_next_gate_owner_packets_v1"
EXPECTED_GATES = {
"credential_escrow_evidence",
"host_188_hygiene_maintenance_window",
"wazuh_manager_registry_export",
}
EXPECTED_NO_FALSE_GREEN_RULES = {
"service_green_does_not_equal_dr_complete",
"backup_fresh_does_not_equal_credential_escrow_complete",
"host_188_service_green_does_not_equal_host_hygiene_green",
"wazuh_route_or_transport_does_not_equal_manager_registry_accepted",
}
GLOBAL_ZERO_FIELDS = {
("status", "runtime_action_authorized"): 0,
("status", "dispatch_authorized"): 0,
("status", "request_sent_count"): 0,
("status", "owner_response_received_count"): 0,
("status", "owner_response_accepted_count"): 0,
("status", "host_write_authorized"): 0,
("status", "secret_value_collection_allowed"): 0,
("status", "runtime_gate_count"): 0,
("counts", "request_sent_count"): 0,
("counts", "owner_response_received_count"): 0,
("counts", "owner_response_accepted_count"): 0,
("counts", "runtime_action_authorized_count"): 0,
}
GATE_REQUIRED_FORBIDDEN_PAYLOADS = {
"credential_escrow_evidence": {
"password",
"token",
"secret_value",
"hash",
"prefix",
"suffix",
"raw_credential",
},
"wazuh_manager_registry_export": {
"agent_real_name",
"internal_ip",
"client_keys",
"raw_wazuh_payload",
"token",
"password",
"authorization_header",
},
}
GATE_REQUIRED_FORBIDDEN_ACTIONS = {
"credential_escrow_evidence": {
"mark_placeholder",
"write_fake_marker",
"store_secret",
"disable_alert",
},
"host_188_hygiene_maintenance_window": {
"pg_resetwal",
"certbot_renew",
"nginx_reload",
"db_restore",
"docker_restart",
"host_file_write",
},
"wazuh_manager_registry_export": {
"active_response",
"agent_reenroll",
"wazuh_restart",
"secret_patch",
"host_write",
"kali_active_scan",
},
}
PACKET_FALSE_FIELDS = (
"request_sent",
"response_received",
"response_accepted",
"runtime_action_authorized",
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Validate post-reboot owner packet JSON contract.",
)
parser.add_argument(
"--packet-file",
type=Path,
help="Validate an existing owner packet JSON file.",
)
parser.add_argument(
"--no-color",
action="store_true",
help="Pass --no-color when running the owner packet generator.",
)
return parser.parse_args()
def run_owner_packet_generator(no_color: bool) -> dict[str, Any]:
cmd = [str(OWNER_PACKET_GENERATOR)]
if no_color:
cmd.append("--no-color")
completed = subprocess.run(
cmd,
cwd=ROOT,
check=False,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
if completed.returncode != 0:
raise SystemExit(
"POST_REBOOT_OWNER_PACKET_CONTRACT_GUARD_FAILED "
f"generator_rc={completed.returncode}\n{completed.stdout}"
)
try:
return json.loads(completed.stdout)
except json.JSONDecodeError as exc:
raise SystemExit(
"POST_REBOOT_OWNER_PACKET_CONTRACT_GUARD_FAILED "
f"generator_json_invalid={exc}"
) from exc
def load_packet(args: argparse.Namespace) -> dict[str, Any]:
if args.packet_file:
try:
return json.loads(args.packet_file.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
raise SystemExit(
"POST_REBOOT_OWNER_PACKET_CONTRACT_GUARD_FAILED "
f"packet_json_invalid={exc}"
) from exc
return run_owner_packet_generator(no_color=args.no_color)
def as_list(value: Any) -> list[Any]:
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def get_nested(packet: dict[str, Any], path: tuple[str, str]) -> Any:
parent = packet.get(path[0], {})
if not isinstance(parent, dict):
return None
return parent.get(path[1])
def validate_packet(packet: dict[str, Any]) -> list[str]:
failures: list[str] = []
if packet.get("schema_version") != EXPECTED_SCHEMA:
failures.append(f"schema_version={packet.get('schema_version')!r}")
owner_packets = as_list(packet.get("owner_packets"))
counts = packet.get("counts", {})
if not isinstance(counts, dict):
failures.append("counts_not_object")
counts = {}
gate_ids = {str(item.get("packet_id", "")) for item in owner_packets if isinstance(item, dict)}
if gate_ids != EXPECTED_GATES:
failures.append(f"gate_ids={sorted(gate_ids)}")
expected_counts = {
"next_gate_count": 3,
"p0_gate_count": 3,
}
for key, expected in expected_counts.items():
if counts.get(key) != expected:
failures.append(f"{key}={counts.get(key)!r}")
for path, expected in GLOBAL_ZERO_FIELDS.items():
actual = get_nested(packet, path)
if actual != expected:
failures.append(f"{'.'.join(path)}={actual!r}")
no_false_green_rules = set(
str(item) for item in as_list(packet.get("no_false_green_rules"))
)
missing_rules = sorted(EXPECTED_NO_FALSE_GREEN_RULES - no_false_green_rules)
if missing_rules:
failures.append(f"missing_no_false_green_rules={missing_rules}")
for raw_packet in owner_packets:
if not isinstance(raw_packet, dict):
failures.append("owner_packet_not_object")
continue
packet_id = str(raw_packet.get("packet_id", ""))
if raw_packet.get("priority") != "P0":
failures.append(f"{packet_id}.priority={raw_packet.get('priority')!r}")
for key in PACKET_FALSE_FIELDS:
if raw_packet.get(key) is not False:
failures.append(f"{packet_id}.{key}={raw_packet.get(key)!r}")
required_payloads = GATE_REQUIRED_FORBIDDEN_PAYLOADS.get(packet_id, set())
actual_payloads = set(str(item) for item in as_list(raw_packet.get("forbidden_payloads")))
missing_payloads = sorted(required_payloads - actual_payloads)
if missing_payloads:
failures.append(f"{packet_id}.missing_forbidden_payloads={missing_payloads}")
required_actions = GATE_REQUIRED_FORBIDDEN_ACTIONS.get(packet_id, set())
actual_actions = set(str(item) for item in as_list(raw_packet.get("forbidden_actions")))
missing_actions = sorted(required_actions - actual_actions)
if missing_actions:
failures.append(f"{packet_id}.missing_forbidden_actions={missing_actions}")
return failures
def main() -> int:
args = parse_args()
packet = load_packet(args)
failures = validate_packet(packet)
if failures:
print(
"POST_REBOOT_OWNER_PACKET_CONTRACT_GUARD_FAILED "
+ " ".join(failures)
)
return 1
counts = packet.get("counts", {})
status = packet.get("status", {})
print(
"POST_REBOOT_OWNER_PACKET_CONTRACT_GUARD_OK "
f"gates={counts.get('next_gate_count')} "
f"request_sent={counts.get('request_sent_count')} "
f"accepted={counts.get('owner_response_accepted_count')} "
f"runtime_gate={status.get('runtime_gate_count')}"
)
return 0
if __name__ == "__main__":
sys.exit(main())