Files
awoooi/scripts/security/k8s-argocd-owner-request-draft.py

229 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
IwoooS K8s / ArgoCD owner request draft 產生器。
本工具讀取 K8s / ArgoCD repo-only manifest inventory,將 scan groups 轉成
人工送件前 request draft。它不讀 live cluster、不呼叫 ArgoCD API、不 sync、
不執行 kubectl、不套用 manifest、不讀 secret value。
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Fixed UTC+08:00 offset; used to timestamp a report when no explicit
# generation time is supplied.
TAIPEI = timezone(offset=timedelta(hours=8))

# Field names published under "request_fields" in every request draft and in
# the top-level report. The order is part of the emitted JSON.
REQUEST_FIELDS = [
    # Identity of the request and the scan group it refers to.
    "request_id",
    "group_id",
    "root",
    "control_tier",
    # Counters copied or derived from the manifest inventory.
    "file_count",
    "yaml_manifest_file_count",
    "supporting_source_file_count",
    "top_level_kind_marker_count",
    # Values the owner is expected to fill in.
    "owner_role_or_team",
    "decision",
    "decision_reason",
    "affected_scope",
    "redacted_evidence_refs",
    "argocd_health_readback_ref",
    "argocd_sync_revision_ref",
    "rollback_revision",
    "followup_owner",
    "maintenance_window",
    "validation_plan",
    # Explicit marker that a draft is not an approval.
    "not_approval",
]
def git_short_sha(root: Path) -> str:
    """Return the abbreviated hash of ``HEAD`` for the repository at *root*.

    Runs ``git rev-parse --short HEAD`` with *root* as the working directory
    and returns its stripped standard output.

    The lookup is best-effort: any exception raised while running git (git
    not installed, *root* missing or not a repository, non-zero exit status,
    ...) is swallowed and the literal string ``"unknown"`` is returned.
    """
    command = ["git", "rev-parse", "--short", "HEAD"]
    try:
        completed = subprocess.run(
            command,
            cwd=root,
            check=True,  # a non-zero exit raises and falls through to "unknown"
            capture_output=True,
            text=True,
        )
    except Exception:
        # Deliberately broad: a missing commit id must never abort the caller.
        return "unknown"
    return completed.stdout.strip()
def load_json(path: Path) -> dict[str, Any]:
    """Read the file at *path* as UTF-8 text and return the parsed JSON value.

    Errors from opening the file or from JSON decoding propagate to the
    caller unchanged.
    """
    with path.open(encoding="utf-8") as handle:
        return json.load(handle)
def request_for(group: dict[str, Any], inventory: dict[str, Any]) -> dict[str, Any]:
    """Build the owner request draft for one inventory group.

    Args:
        group: One entry of ``inventory["group_summaries"]``. Its
            ``group_id``, ``root``, ``label``, ``control_tier`` and the three
            file counters are read directly (a missing key raises
            ``KeyError``).
        inventory: The whole inventory document. ``manifest_rows`` is
            filtered by ``group_id``; ``required_owner_fields``,
            ``evidence_gaps`` and ``blocked_actions`` are attached to the
            draft by reference, not copied.

    Returns:
        A dict with status ``"draft_not_dispatched"``: group identity and
        counters, the sorted union of the group's gate tags and top-level
        kinds, ``pending_*`` placeholders for every owner-supplied value, and
        a set of dispatch / authorization / execution flags that are all
        ``False`` (``not_approval`` is ``True``).
    """
    group_id = group["group_id"]

    # Manifest rows that belong to this group.
    members = []
    for entry in inventory["manifest_rows"]:
        if entry["group_id"] == group_id:
            members.append(entry)

    # De-duplicated, sorted gate tags across the group's rows.
    tag_pool = set()
    for entry in members:
        tag_pool.update(entry.get("gate_tags", []))
    gate_tags = sorted(tag_pool)

    # De-duplicated, sorted top-level kinds across the group's rows.
    kind_pool = set()
    for entry in members:
        kind_pool.update(entry.get("top_level_kinds", []))
    top_level_kinds = sorted(kind_pool)

    draft = {
        "request_id": f"k8s_argocd_owner_request:{group_id}",
        "status": "draft_not_dispatched",
        "group_id": group_id,
        "root": group["root"],
        "label": group["label"],
        "control_tier": group["control_tier"],
        "file_count": group["file_count"],
        "yaml_manifest_file_count": group["yaml_manifest_file_count"],
        "supporting_source_file_count": group["supporting_source_file_count"],
        "top_level_kind_marker_count": sum(
            entry["top_level_kind_count"] for entry in members
        ),
        "gate_tags": gate_tags,
        "top_level_kinds": top_level_kinds,
        "source_snapshot_ref": "docs/security/k8s-argocd-manifest-inventory.snapshot.json",
        "request_fields": REQUEST_FIELDS,
        "required_owner_fields": inventory["required_owner_fields"],
        "evidence_gaps": inventory["evidence_gaps"],
        "blocked_actions": inventory["blocked_actions"],
        # Placeholders for values only the owner can provide.
        "owner_role_or_team": "pending_owner_role_or_team",
        "decision": "pending_owner_decision",
        "decision_reason": "pending_decision_reason",
        "affected_scope": "pending_affected_scope",
        "redacted_evidence_refs": [],
        "argocd_health_readback_ref": None,
        "argocd_sync_revision_ref": None,
        "rollback_revision": "pending_rollback_revision",
        "followup_owner": "pending_followup_owner",
        "maintenance_window": "pending_maintenance_window",
        "validation_plan": "pending_validation_plan",
        "not_approval": True,
    }

    # A fresh draft has been neither sent, answered, authorized nor executed,
    # so every one of these flags is False. Appended in this exact order.
    unset_flags = (
        "request_sent",
        "recipient_confirmed",
        "owner_response_received",
        "owner_response_accepted",
        "owner_response_rejected",
        "rendered_manifest_diff_ready",
        "argocd_health_readback_received",
        "argocd_sync_authorized",
        "argocd_sync_executed",
        "kubectl_action_authorized",
        "kubectl_action_executed",
        "live_cluster_read_authorized",
        "live_cluster_read_executed",
        "secret_value_collection_allowed",
        "runtime_gate",
        "action_buttons_allowed",
        "production_write_authorized",
    )
    for flag in unset_flags:
        draft[flag] = False
    return draft
def build_report(root: Path, inventory: dict[str, Any], generated_at: str | None) -> dict[str, Any]:
    """Assemble the full owner-request-draft report from an inventory.

    Args:
        root: Repository root, passed to ``git_short_sha`` to record the
            current commit.
        inventory: Parsed inventory document. ``group_summaries``,
            ``required_owner_fields``, ``evidence_gaps`` and
            ``blocked_actions`` are required keys; ``schema_version`` and
            ``status`` are optional and copied as-is (``None`` when absent).
        generated_at: Timestamp string to embed verbatim. When falsy, the
            current time in the ``TAIPEI`` zone is used, formatted as ISO 8601
            with second precision.

    Returns:
        The report dict: metadata, a ``summary`` of counts, the
        ``execution_boundaries`` flags, the shared field / gap / blocked-action
        lists, one request draft per group, and the ``next_steps`` notes.
        Every dispatch / authorization / execution counter is the literal
        ``0`` and every boundary flag is ``False``; only ``not_authorization``
        is ``True``.
    """
    if generated_at:
        report_time = generated_at
    else:
        report_time = datetime.now(TAIPEI).isoformat(timespec="seconds")

    drafts = []
    for group in inventory["group_summaries"]:
        drafts.append(request_for(group, inventory))

    c0_total = sum(1 for draft in drafts if draft["control_tier"] == "C0")
    c1_total = sum(1 for draft in drafts if draft["control_tier"] == "C1")

    commit = git_short_sha(root)
    inventory_schema = inventory.get("schema_version")
    inventory_status = inventory.get("status")

    # Counters for things this tool never does; always reported as zero.
    zero_counters = (
        "request_sent_count",
        "recipient_confirmed_count",
        "owner_response_received_count",
        "owner_response_accepted_count",
        "owner_response_rejected_count",
        "rendered_manifest_diff_ready_count",
        "argocd_health_readback_received_count",
        "argocd_sync_authorized_count",
        "argocd_sync_executed_count",
        "kubectl_action_authorized_count",
        "kubectl_action_executed_count",
        "live_cluster_read_authorized_count",
        "live_cluster_read_executed_count",
        "secret_value_collection_allowed_count",
        "runtime_gate_count",
        "action_button_count",
    )
    summary = {
        "request_draft_count": len(drafts),
        "c0_request_draft_count": c0_total,
        "c1_request_draft_count": c1_total,
        "request_field_count": len(REQUEST_FIELDS),
        "required_owner_field_count": len(inventory["required_owner_fields"]),
        "evidence_gap_count": len(inventory["evidence_gaps"]),
        "blocked_action_count": len(inventory["blocked_actions"]),
        **{name: 0 for name in zero_counters},
    }

    # Report-level boundary flags; all False, followed by the explicit
    # "this is not an authorization" marker.
    closed_boundaries = (
        "request_sent",
        "recipient_confirmed",
        "owner_response_received",
        "owner_response_accepted",
        "rendered_manifest_diff_ready",
        "argocd_api_read_authorized",
        "argocd_sync_authorized",
        "argocd_sync_executed",
        "kubectl_action_authorized",
        "kubectl_action_executed",
        "live_cluster_read_authorized",
        "live_cluster_read_executed",
        "secret_value_collection_allowed",
        "production_write_authorized",
        "runtime_execution_authorized",
        "action_buttons_allowed",
    )
    execution_boundaries = {name: False for name in closed_boundaries}
    execution_boundaries["not_authorization"] = True

    next_steps = [
        "人工送件前確認 recipient role / team、snapshot 版本與 affected scope。",
        "收到 owner response 後先做欄位完整性與敏感 payload 隔離,不得直接 sync 或 apply。",
        "若未來要 live readback、ArgoCD sync 或 kubectl action必須另開維護窗口、rollback revision 與 post-check gate。",
    ]

    return {
        "schema_version": "k8s_argocd_owner_request_draft_v1",
        "generated_at": report_time,
        "git_commit": commit,
        "source_inventory_schema_version": inventory_schema,
        "source_inventory_status": inventory_status,
        "status": "owner_request_draft_ready_not_dispatched",
        "summary": summary,
        "execution_boundaries": execution_boundaries,
        "request_fields": REQUEST_FIELDS,
        "required_owner_fields": inventory["required_owner_fields"],
        "evidence_gaps": inventory["evidence_gaps"],
        "blocked_actions": inventory["blocked_actions"],
        "request_drafts": drafts,
        "next_steps": next_steps,
    }
def main() -> int:
    """Command-line entry point: build the report and emit it as JSON.

    Options:
        --root: repository root (default ``"."``); resolved to an absolute
            path and used both for the git lookup and as the base of
            ``--inventory-report``.
        --inventory-report: inventory JSON path, joined onto the root.
        --output: file to write the report to; parent directories are
            created as needed. Without it the JSON goes to standard output.
        --generated-at: fixed timestamp to embed instead of the current time.

    A one-line ``K8S_ARGOCD_OWNER_REQUEST_DRAFT_OK`` status with key counters
    is always written to standard error.

    Returns:
        ``0`` on success. Failures surface as uncaught exceptions.
    """
    cli = argparse.ArgumentParser(description="IwoooS K8s / ArgoCD owner request draft 產生器")
    cli.add_argument("--root", default=".", help="repo root")
    cli.add_argument(
        "--inventory-report",
        default="docs/security/k8s-argocd-manifest-inventory.snapshot.json",
        help="k8s-argocd-manifest-inventory.py 輸出的 JSON",
    )
    cli.add_argument("--output", help="寫出 JSON 報告")
    cli.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    options = cli.parse_args()

    repo_root = Path(options.root).resolve()
    inventory_path = repo_root / options.inventory_report
    report = build_report(repo_root, load_json(inventory_path), options.generated_at)

    # sort_keys keeps the serialized snapshot stable between runs.
    rendered = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    if not options.output:
        print(rendered)
    else:
        destination = Path(options.output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")

    counts = report["summary"]
    status_line = " ".join(
        [
            "K8S_ARGOCD_OWNER_REQUEST_DRAFT_OK",
            f"drafts={counts['request_draft_count']}",
            f"c0={counts['c0_request_draft_count']}",
            f"fields={counts['required_owner_field_count']}",
            f"sent={counts['request_sent_count']}",
            f"runtime_gate={counts['runtime_gate_count']}",
        ]
    )
    # Status goes to stderr so it never mixes with JSON printed on stdout.
    print(status_line, file=sys.stderr)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())