Files
awoooi/scripts/security/backup-restore-owner-request-draft.py

306 lines
11 KiB
Python

#!/usr/bin/env python3
"""
IwoooS Backup / Restore / Escrow owner request draft 產生器。
本工具讀取 backup-restore-escrow repo-only 清冊,將 38 個 surface 轉成人工
送件前 request draft。它不執行 backup、不 restore、不 rclone sync、不
寫 escrow marker、不 restic prune、不 kubectl、不 SSH、不收 secret value。
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
REQUEST_FIELDS = [
"request_id",
"surface_id",
"label",
"expected_scope",
"config_kind",
"backup_scope",
"control_tier",
"repo_source_path",
"repo_sha256",
"owner_role_or_team",
"decision",
"decision_reason",
"affected_scope",
"redacted_evidence_refs",
"latest_backup_status_ref",
"restore_drill_plan",
"offsite_sync_evidence_ref",
"credential_escrow_evidence_ref",
"maintenance_window",
"rollback_owner",
"validation_plan",
"retention_owner",
"followup_owner",
"not_approval",
]
REQUIRED_OWNER_FIELDS = [
"owner_role_or_team",
"decision",
"decision_reason",
"affected_scope",
"redacted_evidence_refs",
"latest_backup_status_ref",
"restore_drill_plan",
"offsite_sync_evidence_ref",
"credential_escrow_evidence_ref",
"maintenance_window",
"rollback_owner",
"validation_plan",
"retention_owner",
"followup_owner",
]
BLOCKED_ACTIONS = [
"backup_run",
"restore_run",
"restore_drill",
"offsite_sync",
"offsite_remote_delete",
"credential_escrow_marker_write",
"retention_change",
"restic_prune",
"rclone_config",
"velero_restore",
"velero_backup",
"kubectl_action",
"ssh_read",
"ssh_write",
"secret_value_collection",
"host_write",
"active_scan",
"runtime_gate_open",
]
def git_short_sha(root: Path) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=root,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except Exception:
return "unknown"
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def request_for(surface: dict[str, Any], write_capable_ids: set[str]) -> dict[str, Any]:
surface_id = surface["surface_id"]
write_capable = surface_id in write_capable_ids
return {
"request_id": f"backup_restore_owner_request:{surface_id}",
"status": "draft_not_dispatched",
"surface_id": surface_id,
"label": surface["label"],
"expected_scope": surface["expected_scope"],
"config_kind": surface["config_kind"],
"backup_scope": surface["backup_scope"],
"control_tier": surface["control_tier"],
"repo_source_path": surface["source_path"],
"repo_sha256": surface["sha256"],
"source_line_count": surface["line_count"],
"requires_live_evidence": surface["requires_live_evidence"],
"write_capable_surface": write_capable,
"source_inventory_ref": "docs/security/backup-restore-escrow-inventory.snapshot.json",
"request_fields": REQUEST_FIELDS,
"required_owner_fields": REQUIRED_OWNER_FIELDS,
"blocked_actions": BLOCKED_ACTIONS,
"owner_role_or_team": "pending_owner_role_or_team",
"decision": "pending_owner_decision",
"decision_reason": "pending_decision_reason",
"affected_scope": "pending_affected_scope",
"redacted_evidence_refs": [],
"latest_backup_status_ref": None,
"restore_drill_plan": "pending_restore_drill_plan",
"offsite_sync_evidence_ref": None,
"credential_escrow_evidence_ref": None,
"maintenance_window": "pending_maintenance_window",
"rollback_owner": "pending_rollback_owner",
"validation_plan": "pending_validation_plan",
"retention_owner": "pending_retention_owner",
"followup_owner": "pending_followup_owner",
"not_approval": True,
"request_sent": False,
"recipient_confirmed": False,
"owner_response_received": False,
"owner_response_accepted": False,
"live_evidence_received": False,
"restore_drill_accepted": False,
"offsite_sync_accepted": False,
"credential_escrow_accepted": False,
"retention_change_accepted": False,
"maintenance_window_accepted": False,
"rollback_owner_accepted": False,
"validation_plan_accepted": False,
"backup_run_authorized": False,
"restore_run_authorized": False,
"offsite_sync_authorized": False,
"offsite_remote_delete_authorized": False,
"credential_escrow_marker_write_authorized": False,
"retention_change_authorized": False,
"restic_prune_authorized": False,
"rclone_config_authorized": False,
"velero_restore_authorized": False,
"velero_backup_authorized": False,
"kubectl_action_authorized": False,
"ssh_read_authorized": False,
"ssh_write_authorized": False,
"secret_value_collection_allowed": False,
"host_write_authorized": False,
"active_scan_authorized": False,
"runtime_gate": False,
"action_buttons_allowed": False,
}
def build_report(root: Path, inventory: dict[str, Any], generated_at: str | None) -> dict[str, Any]:
report_time = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
write_capable_ids = {surface["surface_id"] for surface in inventory["write_capable_surfaces"]}
requests = [request_for(surface, write_capable_ids) for surface in inventory["backup_surfaces"]]
write_capable_requests = [item for item in requests if item["write_capable_surface"]]
live_evidence_requests = [item for item in requests if item["requires_live_evidence"]]
return {
"schema_version": "backup_restore_owner_request_draft_v1",
"generated_at": report_time,
"git_commit": git_short_sha(root),
"source_inventory_schema_version": inventory.get("schema_version"),
"source_inventory_status": inventory.get("status"),
"status": "owner_request_draft_ready_not_dispatched",
"summary": {
"request_draft_count": len(requests),
"write_capable_request_draft_count": len(write_capable_requests),
"live_evidence_required_request_count": len(live_evidence_requests),
"request_field_count": len(REQUEST_FIELDS),
"required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
"blocked_action_count": len(BLOCKED_ACTIONS),
"request_sent_count": 0,
"recipient_confirmed_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"live_evidence_received_count": 0,
"restore_drill_accepted_count": 0,
"offsite_sync_accepted_count": 0,
"credential_escrow_accepted_count": 0,
"retention_change_accepted_count": 0,
"maintenance_window_accepted_count": 0,
"rollback_owner_accepted_count": 0,
"validation_plan_accepted_count": 0,
"backup_run_authorized_count": 0,
"restore_run_authorized_count": 0,
"offsite_sync_authorized_count": 0,
"offsite_remote_delete_authorized_count": 0,
"credential_escrow_marker_write_authorized_count": 0,
"retention_change_authorized_count": 0,
"restic_prune_authorized_count": 0,
"rclone_config_authorized_count": 0,
"velero_restore_authorized_count": 0,
"velero_backup_authorized_count": 0,
"kubectl_action_authorized_count": 0,
"ssh_read_authorized_count": 0,
"ssh_write_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"host_write_authorized_count": 0,
"active_scan_authorized_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": {
"request_sent": False,
"recipient_confirmed": False,
"owner_response_received": False,
"owner_response_accepted": False,
"live_evidence_received": False,
"backup_run_authorized": False,
"restore_run_authorized": False,
"restore_drill_authorized": False,
"offsite_sync_authorized": False,
"offsite_remote_delete_authorized": False,
"credential_escrow_marker_write_authorized": False,
"retention_change_authorized": False,
"restic_prune_authorized": False,
"rclone_config_authorized": False,
"velero_restore_authorized": False,
"velero_backup_authorized": False,
"kubectl_action_authorized": False,
"ssh_read_authorized": False,
"ssh_write_authorized": False,
"secret_value_collection_allowed": False,
"host_write_authorized": False,
"active_scan_authorized": False,
"runtime_execution_authorized": False,
"action_buttons_allowed": False,
"not_authorization": True,
},
"request_fields": REQUEST_FIELDS,
"required_owner_fields": REQUIRED_OWNER_FIELDS,
"blocked_actions": BLOCKED_ACTIONS,
"request_drafts": requests,
"next_steps": [
"人工送件前確認 backup / restore / offsite / credential escrow owner role 與回覆窗口。",
"owner 只能提供非敏感 evidence id、最新備份狀態、restore drill plan、maintenance window、rollback owner 與 validation plan。",
"收到回覆後先做欄位完整性、敏感 payload 隔離、restore / offsite / retention gate 檢查,不得直接執行 backup、restore、sync、prune 或 marker write。",
],
}
def main() -> int:
parser = argparse.ArgumentParser(description="IwoooS backup / restore owner request draft 產生器")
parser.add_argument("--root", default=".", help="repo root")
parser.add_argument(
"--inventory-report",
default="docs/security/backup-restore-escrow-inventory.snapshot.json",
help="backup-restore-escrow-inventory.py 輸出的 JSON",
)
parser.add_argument("--output", help="寫出 JSON 報告")
parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
args = parser.parse_args()
root = Path(args.root).resolve()
inventory = load_json(root / args.inventory_report)
report = build_report(root, inventory, args.generated_at)
payload = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
if args.output:
output = Path(args.output)
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(payload + "\n", encoding="utf-8")
else:
print(payload)
summary = report["summary"]
print(
"BACKUP_RESTORE_OWNER_REQUEST_DRAFT_OK "
f"drafts={summary['request_draft_count']} "
f"write_capable={summary['write_capable_request_draft_count']} "
f"fields={summary['required_owner_field_count']} "
f"sent={summary['request_sent_count']} "
f"runtime_gate={summary['runtime_gate_count']}",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
sys.exit(main())