Files
awoooi/scripts/security/backup-restore-owner-response-acceptance.py
Your Name 0359020212
All checks were successful
Code Review / ai-code-review (push) Successful in 14s
CD Pipeline / tests (push) Successful in 1m37s
CD Pipeline / build-and-deploy (push) Successful in 4m47s
CD Pipeline / post-deploy-checks (push) Successful in 1m30s
feat(iwooos): 強化備份復原金庫回補 gate
2026-06-15 15:22:30 +08:00

418 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
IwoooS Backup / Restore / Escrow owner response acceptance 只讀帳本產生器。
本工具讀取 Backup / Restore / Escrow inventory 與 owner request draft，建立
未來 owner response 如何收件、補件、隔離、拒收或進 restore / retention
review 的 metadata-only acceptance ledger。它不執行 backup、不 restore、不
rclone sync、不寫 escrow marker、不 restic prune、不 kubectl、不 SSH、不收
secret value。
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Fixed UTC+08:00 offset; build_report uses it for the default generated_at timestamp.
TAIPEI = timezone(offset=timedelta(hours=8))
# Recovery-readiness evidence refs. acceptance_candidate appends these to each
# request draft's required_owner_fields. Defined first so ACCEPTANCE_FIELDS can
# splice in the same sequence instead of repeating it.
BACKUP_RECOVERY_OWNER_RESPONSE_FIELDS = [
    "freshness_slo_ref",
    "restore_target_isolation_ref",
    "backup_dependency_map_ref",
    "data_classification_ref",
    "remote_delete_guard_ref",
    "retention_runway_ref",
    "restore_observer_stop_condition_ref",
    "credential_recovery_drill_ref",
    "backup_health_no_false_green_ref",
]

# Ordered field names published under "acceptance_fields" in every candidate
# and in the report.
ACCEPTANCE_FIELDS = [
    "acceptance_candidate_id",
    "request_id",
    "surface_id",
    "config_kind",
    "backup_scope",
    "control_tier",
    "write_capable_surface",
    "owner_response_ref",
    "owner_role_or_team",
    "decision",
    "decision_reason",
    "affected_scope",
    "redacted_evidence_refs",
    "latest_backup_status_ref",
    "restore_drill_plan_ref",
    "offsite_sync_evidence_ref",
    "credential_escrow_evidence_ref",
    *BACKUP_RECOVERY_OWNER_RESPONSE_FIELDS,
    "retention_owner",
    "maintenance_window",
    "rollback_owner",
    "validation_plan",
    "reviewer_outcome",
    "followup_owner",
    "not_approval",
]
# One row per reviewer check: (check_id, instruction text shown to the reviewer).
_REVIEWER_CHECK_ROWS = (
    ("owner_identity_present", "owner role / team 必須可追溯。"),
    ("decision_reason_present", "decision 與 decision reason 必須同時存在。"),
    ("affected_scope_matches_surface", "affected scope 必須能對回 committed surface_id。"),
    ("redacted_refs_only", "evidence 只能是脫敏 ref、hash、ticket、commit 或 artifact pointer。"),
    ("secret_value_absent", "不得出現 token、private key、seed、rclone config、kubeconfig 或 credential derivative。"),
    ("backup_status_ref_shape", "latest backup status 只能是 owner-provided redacted ref不得讀 live backup store。"),
    ("restore_drill_plan_present", "restore drill 必須是 plan / approval package不得是執行請求。"),
    ("offsite_sync_ref_not_payload", "offsite sync evidence 只能是 ref不得貼 raw listing 或 secret path。"),
    ("credential_escrow_metadata_only", "credential escrow 只能是 metadata / marker ref不得包含 value。"),
    ("retention_owner_present", "retention owner 與 retention decision 必須可追溯。"),
    ("maintenance_window_present", "任何未來 backup / restore / prune / sync 都必須另有維護窗口。"),
    ("rollback_owner_present", "rollback owner 與 rollback ref 必須存在。"),
    ("counts_transition_safe", "只有 reviewer record 可更新 received / accepted / rejected不得同時開 runtime gate。"),
    ("freshness_slo_present", "必須有備份 freshness SLO / RPO ref不得只用 latest 字樣取代。"),
    ("restore_target_isolation_present", "restore drill 必須有隔離目標或明確 no-production-write 邊界。"),
    ("backup_dependency_map_present", "必須列出資料庫、物件儲存、repo、配置、憑證與告警的復原依賴圖 ref。"),
    ("data_classification_present", "必須標示備份集資料分級;不得要求 raw customer data、payload 或 unredacted listing。"),
    ("remote_delete_guard_present", "offsite sync / latest-only policy 必須有 remote delete guard 與 owner ref。"),
    ("retention_runway_present", "retention / prune 必須有可恢復窗口、runway 與撤回條件。"),
    ("restore_observer_stop_condition_present", "restore drill 必須有 observer、stop condition 與 rollback owner。"),
    ("credential_recovery_drill_metadata_only", "credential recovery 只能收 non-secret proof / evidence id不得收 value、hash、seed 或 recovery code。"),
    ("backup_health_no_false_green_reviewed", "backup health / textfile / alert evidence 必須防止 false-green不能只看單一 healthy。"),
)

# Reviewer checklist emitted in the report; each candidate lists only the check ids.
REVIEWER_CHECKS = [
    {"check_id": check_id, "instruction": instruction}
    for check_id, instruction in _REVIEWER_CHECK_ROWS
]
# One row per outcome lane: (lane_id, meaning text).
_OUTCOME_LANE_ROWS = (
    ("waiting_owner_response", "尚未收到 owner response所有 accepted / runtime count 維持 0。"),
    ("quarantine_raw_payload", "收到 raw backup listing、secret、rclone config 或不可保存內容時只能隔離。"),
    ("reject_secret_or_credential_value", "出現 secret value、credential derivative 或未脫敏 payload 時直接拒收。"),
    ("request_supplement", "欄位不足、scope 不清、restore / retention owner 缺失時要求補件。"),
    ("ready_for_restore_review", "metadata 合格後,只能進 restore / retention reviewer review。"),
    ("owner_review_only_update", "只允許更新只讀 owner review ledger不得執行 backup / restore / offsite / retention。"),
    ("restore_recovery_backfill_required", "restore / cold-start / incident recovery 資料不足時只要求補件,不得演練或寫 production。"),
    ("remote_delete_retention_review_required", "offsite remote delete、latest-only 與 restic prune 必須進 retention reviewer review。"),
    ("waiting_runtime_gate", "即使 owner response acceptedruntime gate 仍等待獨立人工批准。"),
)

# Outcome lanes emitted in the report; each candidate lists only the lane ids.
OUTCOME_LANES = [
    {"lane_id": lane_id, "meaning": meaning}
    for lane_id, meaning in _OUTCOME_LANE_ROWS
]
# Action identifiers published under "blocked_actions" in every candidate and
# in the report. The order is part of the emitted JSON; keep it stable.
BLOCKED_ACTIONS = [
    # Backup / restore / offsite / retention operations.
    "backup_run",
    "restore_run",
    "restore_drill",
    "offsite_sync",
    "offsite_remote_delete",
    "credential_escrow_marker_write",
    "retention_change",
    "restic_prune",
    "rclone_config",
    "velero_restore",
    "velero_backup",
    # Cluster / host access and runtime gate.
    "kubectl_action",
    "ssh_read",
    "ssh_write",
    "secret_value_collection",
    "host_write",
    "active_scan",
    "runtime_gate_open",
    # Evidence-handling and acceptance shortcuts.
    "raw_backup_payload_storage",
    "accept_secret_value_evidence",
    "mark_owner_response_accepted_without_reviewer_record",
    "accept_backup_without_freshness_slo",
    "accept_restore_without_isolated_target",
    "accept_offsite_without_remote_delete_guard",
    "accept_retention_without_runway",
    "accept_credential_recovery_without_non_secret_proof",
    "accept_backup_health_false_green",
    "skip_dependency_map_review",
    "skip_data_classification_review",
    "store_raw_restore_payload",
    "add_action_button",
]
def git_short_sha(root: Path) -> str:
    """Return the abbreviated HEAD commit hash of the repository at *root*.

    Runs ``git rev-parse --short HEAD`` with *root* as the working directory.
    The lookup is best-effort and never raises for an unusable repository:
    the literal ``"unknown"`` is returned when git cannot be started, exits
    with a non-zero status, produces undecodable output, or prints nothing.

    Args:
        root: Directory in which the git command is executed.

    Returns:
        The stripped command output, or ``"unknown"`` as described above.
    """
    try:
        completed = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: git is not installed or root cannot be used as cwd.
        # SubprocessError: non-zero exit (check=True), e.g. not a repository.
        # ValueError: invalid arguments or output that cannot be decoded.
        return "unknown"
    # An empty answer is not a usable commit id, so report it as unknown too.
    return completed.stdout.strip() or "unknown"
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Args:
        path: File to read.

    Returns:
        The decoded JSON object as a dict.

    Raises:
        OSError: The file cannot be read.
        ValueError: The content is not valid UTF-8 / JSON (the decoder errors
            are ``ValueError`` subclasses), or the top-level JSON value is not
            an object. Callers use ``.get`` on the result, so anything other
            than a dict is rejected here with the offending path in the
            message instead of failing later with an ``AttributeError``.
    """
    document = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        raise ValueError(
            f"{path}: expected a JSON object at top level, got {type(document).__name__}"
        )
    return document
# Evidence-reference fields of a ledger entry; each one starts out as None.
_UNSET_EVIDENCE_REF_KEYS = (
    "latest_backup_status_ref",
    "restore_drill_plan_ref",
    "offsite_sync_evidence_ref",
    "credential_escrow_evidence_ref",
    "freshness_slo_ref",
    "restore_target_isolation_ref",
    "backup_dependency_map_ref",
    "data_classification_ref",
    "remote_delete_guard_ref",
    "retention_runway_ref",
    "restore_observer_stop_condition_ref",
    "credential_recovery_drill_ref",
    "backup_health_no_false_green_ref",
)

# State, acceptance and authorization flags of a ledger entry; each one starts
# out as False.
_INITIALLY_FALSE_FLAGS = (
    "request_sent",
    "recipient_confirmed",
    "owner_response_received",
    "owner_response_accepted",
    "owner_response_rejected",
    "owner_response_quarantined",
    "supplement_requested",
    "live_evidence_received",
    "restore_drill_accepted",
    "offsite_sync_accepted",
    "credential_escrow_accepted",
    "retention_change_accepted",
    "freshness_slo_accepted",
    "restore_target_isolation_accepted",
    "backup_dependency_map_accepted",
    "data_classification_accepted",
    "remote_delete_guard_accepted",
    "retention_runway_accepted",
    "restore_observer_stop_condition_accepted",
    "credential_recovery_drill_accepted",
    "backup_health_no_false_green_accepted",
    "maintenance_window_accepted",
    "rollback_owner_accepted",
    "validation_plan_accepted",
    "backup_run_authorized",
    "restore_run_authorized",
    "offsite_sync_authorized",
    "offsite_remote_delete_authorized",
    "credential_escrow_marker_write_authorized",
    "retention_change_authorized",
    "restic_prune_authorized",
    "rclone_config_authorized",
    "velero_restore_authorized",
    "velero_backup_authorized",
    "kubectl_action_authorized",
    "ssh_read_authorized",
    "ssh_write_authorized",
    "secret_value_collection_allowed",
    "host_write_authorized",
    "active_scan_authorized",
    "runtime_gate",
    "action_buttons_allowed",
)


def acceptance_candidate(request: dict[str, Any]) -> dict[str, Any]:
    """Build the initial acceptance-ledger entry for one owner request draft.

    Identity fields are copied from *request*; every owner-supplied field is
    set to a pending placeholder, every evidence ref to None and every state /
    acceptance / authorization flag to False.

    Args:
        request: One request draft. It is subscripted for ``surface_id``,
            ``required_owner_fields``, ``request_id``, ``config_kind``,
            ``backup_scope``, ``control_tier`` and ``write_capable_surface``.

    Returns:
        A new dict describing the candidate. Its ``acceptance_fields`` and
        ``blocked_actions`` values are the module-level lists themselves,
        not copies.

    Raises:
        KeyError: *request* lacks one of the keys listed above.
    """
    surface = request["surface_id"]
    combined_fields = request["required_owner_fields"] + BACKUP_RECOVERY_OWNER_RESPONSE_FIELDS
    # dict.fromkeys drops duplicates while keeping first-seen order.
    owner_fields = list(dict.fromkeys(combined_fields))

    pending = "pending_owner_response"
    waiting = "waiting_owner_response"

    return {
        "acceptance_candidate_id": f"backup_restore_owner_response_acceptance:{surface}",
        "status": waiting,
        "request_id": request["request_id"],
        "surface_id": surface,
        "config_kind": request["config_kind"],
        "backup_scope": request["backup_scope"],
        "control_tier": request["control_tier"],
        "write_capable_surface": request["write_capable_surface"],
        "owner_response_ref": None,
        "owner_role_or_team": pending,
        "decision": pending,
        "decision_reason": pending,
        "affected_scope": pending,
        "redacted_evidence_refs": [],
        **dict.fromkeys(_UNSET_EVIDENCE_REF_KEYS),
        "retention_owner": pending,
        "maintenance_window": pending,
        "rollback_owner": pending,
        "validation_plan": pending,
        "reviewer_outcome": waiting,
        "followup_owner": pending,
        "acceptance_fields": ACCEPTANCE_FIELDS,
        "required_owner_fields": owner_fields,
        "reviewer_checks": [check["check_id"] for check in REVIEWER_CHECKS],
        "outcome_lanes": [lane["lane_id"] for lane in OUTCOME_LANES],
        "blocked_actions": BLOCKED_ACTIONS,
        "not_approval": True,
        **dict.fromkeys(_INITIALLY_FALSE_FLAGS, False),
    }
# Summary counters that this generator always reports as 0.
_ZERO_SUMMARY_COUNTERS = (
    "request_sent_count",
    "recipient_confirmed_count",
    "owner_response_received_count",
    "owner_response_accepted_count",
    "owner_response_rejected_count",
    "owner_response_quarantined_count",
    "supplement_requested_count",
    "live_evidence_received_count",
    "restore_drill_accepted_count",
    "offsite_sync_accepted_count",
    "credential_escrow_accepted_count",
    "retention_change_accepted_count",
    "freshness_slo_accepted_count",
    "restore_target_isolation_accepted_count",
    "backup_dependency_map_accepted_count",
    "data_classification_accepted_count",
    "remote_delete_guard_accepted_count",
    "retention_runway_accepted_count",
    "restore_observer_stop_condition_accepted_count",
    "credential_recovery_drill_accepted_count",
    "backup_health_no_false_green_accepted_count",
    "maintenance_window_accepted_count",
    "rollback_owner_accepted_count",
    "validation_plan_accepted_count",
    "backup_run_authorized_count",
    "restore_run_authorized_count",
    "offsite_sync_authorized_count",
    "offsite_remote_delete_authorized_count",
    "credential_escrow_marker_write_authorized_count",
    "retention_change_authorized_count",
    "restic_prune_authorized_count",
    "rclone_config_authorized_count",
    "velero_restore_authorized_count",
    "velero_backup_authorized_count",
    "kubectl_action_authorized_count",
    "ssh_read_authorized_count",
    "ssh_write_authorized_count",
    "secret_value_collection_allowed_count",
    "host_write_authorized_count",
    "active_scan_authorized_count",
    "runtime_gate_count",
    "action_button_count",
)

# Execution-boundary switches that this generator always reports as False.
_CLOSED_EXECUTION_BOUNDARIES = (
    "request_dispatch_authorized",
    "owner_response_accepted",
    "live_evidence_received",
    "backup_run_authorized",
    "restore_run_authorized",
    "restore_drill_authorized",
    "offsite_sync_authorized",
    "offsite_remote_delete_authorized",
    "credential_escrow_marker_write_authorized",
    "retention_change_authorized",
    "restic_prune_authorized",
    "rclone_config_authorized",
    "velero_restore_authorized",
    "velero_backup_authorized",
    "kubectl_action_authorized",
    "ssh_read_authorized",
    "ssh_write_authorized",
    "secret_value_collection_allowed",
    "host_write_authorized",
    "active_scan_authorized",
    "runtime_execution_authorized",
    "action_buttons_allowed",
)


def build_report(
    root: Path,
    inventory: dict[str, Any],
    request_draft_report: dict[str, Any],
    generated_at: str | None,
) -> dict[str, Any]:
    """Assemble the owner-response acceptance ledger report.

    Args:
        root: Directory passed to git_short_sha to look up the commit id.
        inventory: Inventory report; only its ``schema_version``, ``status``
            and ``summary.surface_count`` are read.
        request_draft_report: Owner request draft report; each item of its
            ``request_drafts`` list becomes one acceptance candidate.
        generated_at: Timestamp string to embed as-is. When it is None or
            empty, the current time in the TAIPEI zone is used, rendered as
            ISO 8601 with second precision.

    Returns:
        The report dict: source metadata, summary counters, execution
        boundaries, the shared field / check / lane / action tables, the
        acceptance candidates and the next-step notes.
    """
    if generated_at:
        stamp = generated_at
    else:
        stamp = datetime.now(TAIPEI).isoformat(timespec="seconds")

    drafts = request_draft_report.get("request_drafts", [])
    candidates = [acceptance_candidate(draft) for draft in drafts]
    write_capable_total = sum(1 for entry in candidates if entry["write_capable_surface"])
    # Counts candidates whose live evidence has not been received yet.
    awaiting_live_evidence_total = sum(
        1 for entry in candidates if entry["live_evidence_received"] is False
    )

    # The report-level field list is taken from the first candidate only.
    # NOTE(review): presumably all drafts share one required_owner_fields
    # list - confirm against the request draft generator.
    if candidates:
        owner_fields = candidates[0]["required_owner_fields"]
    else:
        owner_fields = request_draft_report.get("required_owner_fields", [])

    # Built step by step so values are evaluated in the same order as the keys
    # appear in the report (the git lookup runs before the inventory is read).
    report: dict[str, Any] = {
        "schema_version": "backup_restore_owner_response_acceptance_v1",
        "generated_at": stamp,
        "git_commit": git_short_sha(root),
        "source_inventory_schema_version": inventory.get("schema_version"),
        "source_inventory_status": inventory.get("status"),
        "source_owner_request_schema_version": request_draft_report.get("schema_version"),
        "source_owner_request_status": request_draft_report.get("status"),
        "status": "owner_response_acceptance_ledger_ready_no_runtime_action",
    }

    report["summary"] = {
        "source_surface_count": inventory.get("summary", {}).get("surface_count", 0),
        "source_request_draft_count": request_draft_report.get("summary", {}).get("request_draft_count", 0),
        "acceptance_candidate_count": len(candidates),
        "write_capable_acceptance_candidate_count": write_capable_total,
        "live_evidence_required_candidate_count": awaiting_live_evidence_total,
        "acceptance_field_count": len(ACCEPTANCE_FIELDS),
        "required_owner_field_count": len(owner_fields),
        "reviewer_check_count": len(REVIEWER_CHECKS),
        "outcome_lane_count": len(OUTCOME_LANES),
        "blocked_action_count": len(BLOCKED_ACTIONS),
        **dict.fromkeys(_ZERO_SUMMARY_COUNTERS, 0),
    }

    report["execution_boundaries"] = {
        **dict.fromkeys(_CLOSED_EXECUTION_BOUNDARIES, False),
        "not_authorization": True,
    }

    report["acceptance_fields"] = ACCEPTANCE_FIELDS
    report["required_owner_fields"] = owner_fields
    report["reviewer_checks"] = REVIEWER_CHECKS
    report["outcome_lanes"] = OUTCOME_LANES
    report["blocked_actions"] = BLOCKED_ACTIONS
    report["acceptance_candidates"] = candidates
    report["next_steps"] = [
        "等待 owner response未收到前不得更新 accepted count。",
        "收到回覆後先走 raw payload / secret / scope / restore / retention / freshness / isolation / remote delete guard / evidence ref 檢查,不合格即隔離、拒收或補件。",
        "metadata 合格也只能進 restore / retention reviewer reviewbackup、restore、offsite sync、escrow marker、retention change 與 production write 仍需獨立人工批准。",
    ]
    return report
def main() -> int:
    """Command-line entry point: load both source reports and emit the ledger.

    The ``--inventory-report`` and ``--owner-request-report`` paths are joined
    onto the resolved ``--root``. The report is serialized as indented JSON
    with sorted keys and either written to ``--output`` (used as given, parent
    directories created) or printed to stdout. A one-line status summary is
    always printed to stderr.

    Returns:
        0 once the report has been emitted.
    """
    cli = argparse.ArgumentParser(description="IwoooS Backup / Restore / Escrow owner response acceptance 只讀帳本產生器")
    cli.add_argument("--root", default=".", help="repo root")
    cli.add_argument(
        "--inventory-report",
        default="docs/security/backup-restore-escrow-inventory.snapshot.json",
        help="backup-restore-escrow-inventory.py 輸出的 JSON",
    )
    cli.add_argument(
        "--owner-request-report",
        default="docs/security/backup-restore-owner-request-draft.snapshot.json",
        help="backup-restore-owner-request-draft.py 輸出的 JSON",
    )
    cli.add_argument("--output", help="寫出 JSON 報告")
    cli.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    options = cli.parse_args()

    repo_root = Path(options.root).resolve()
    report = build_report(
        repo_root,
        load_json(repo_root / options.inventory_report),
        load_json(repo_root / options.owner_request_report),
        options.generated_at,
    )
    rendered = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)

    if not options.output:
        print(rendered)
    else:
        destination = Path(options.output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")

    # The status line goes to stderr so it never mixes with JSON on stdout.
    counts = report["summary"]
    status_line = " ".join(
        [
            "BACKUP_RESTORE_OWNER_RESPONSE_ACCEPTANCE_OK",
            f"candidates={counts['acceptance_candidate_count']}",
            f"write_capable={counts['write_capable_acceptance_candidate_count']}",
            f"checks={counts['reviewer_check_count']}",
            f"lanes={counts['outcome_lane_count']}",
            f"accepted={counts['owner_response_accepted_count']}",
            f"runtime_gate={counts['runtime_gate_count']}",
        ]
    )
    print(status_line, file=sys.stderr)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())