Files
awoooi/scripts/security/k8s-argocd-owner-response-acceptance.py

359 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
IwoooS K8s / ArgoCD owner response acceptance 只讀帳本產生器。
本工具讀取 K8s / ArgoCD manifest inventory 與 owner request draft建立未來
owner response 如何收件、補件、隔離、拒收或進 rendered manifest review 的
metadata-only acceptance ledger。它不讀 live cluster、不呼叫 ArgoCD API、不
sync、不執行 kubectl、不套用 manifest、不讀 secret value。
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
ACCEPTANCE_FIELDS = [
"acceptance_candidate_id",
"request_id",
"group_id",
"root",
"control_tier",
"owner_response_ref",
"owner_role_or_team",
"decision",
"decision_reason",
"affected_scope",
"redacted_evidence_refs",
"argocd_health_readback_ref",
"argocd_sync_revision_ref",
"rollback_revision",
"rendered_manifest_diff_ref",
"maintenance_window",
"validation_plan",
"reviewer_outcome",
"followup_owner",
"not_approval",
]
REVIEWER_CHECKS = [
{
"check_id": "owner_identity_present",
"instruction": "owner role / team 必須可追溯,不能只寫聊天同意或模糊職稱。",
},
{
"check_id": "decision_reason_present",
"instruction": "decision 與 decision reason 必須同時存在,且不得包含機敏值。",
},
{
"check_id": "affected_scope_matches_group",
"instruction": "affected scope 必須能對回 committed scan group 與 root path。",
},
{
"check_id": "redacted_refs_only",
"instruction": "evidence 只能是脫敏 ref、hash、ticket、commit 或 artifact pointer。",
},
{
"check_id": "secret_value_absent",
"instruction": "不得出現 Secret value、token、cookie、private key、完整 kubeconfig 或 credential derivative。",
},
{
"check_id": "argocd_health_readback_ref_shape",
"instruction": "ArgoCD health readback 只能是 owner-provided redacted ref不得觸發 live API read。",
},
{
"check_id": "argocd_sync_revision_ref_not_execution",
"instruction": "sync revision ref 只做 rollback / diff 判讀,不得自動 sync。",
},
{
"check_id": "rollback_revision_present",
"instruction": "C0 / C1 回覆都必須有 rollback revision 或明確禁止窗口。",
},
{
"check_id": "rendered_manifest_diff_ref_not_payload",
"instruction": "rendered manifest diff 只能是 ref不得把 manifest payload 貼入回覆。",
},
{
"check_id": "maintenance_window_present",
"instruction": "任何未來 ArgoCD sync / kubectl action 都必須另有維護窗口。",
},
{
"check_id": "validation_plan_present",
"instruction": "validation plan 必須列 health、rollout、metric、alert 與 rollback post-check。",
},
{
"check_id": "counts_transition_safe",
"instruction": "只有 reviewer record 可更新 received / accepted / rejected不得同時開 runtime gate。",
},
]
OUTCOME_LANES = [
{
"lane_id": "waiting_owner_response",
"meaning": "尚未收到 owner response所有 accepted / runtime count 維持 0。",
},
{
"lane_id": "quarantine_raw_payload",
"meaning": "收到 raw manifest、kubeconfig、Secret value 或不可保存內容時只能隔離。",
},
{
"lane_id": "reject_secret_or_unredacted_manifest",
"meaning": "出現 secret value、未脫敏 manifest payload 或 credential derivative 時直接拒收。",
},
{
"lane_id": "request_supplement",
"meaning": "欄位不足、scope 不清、rollback 缺失或 evidence ref 不可追溯時要求補件。",
},
{
"lane_id": "ready_for_rendered_manifest_review",
"meaning": "metadata 合格後,只能進 rendered manifest diff reviewer review。",
},
{
"lane_id": "owner_review_only_update",
"meaning": "只允許更新只讀 owner review ledger不得改 K8s、ArgoCD、Helm 或 secret。",
},
{
"lane_id": "waiting_runtime_gate",
"meaning": "即使 owner response acceptedruntime gate 仍等待獨立人工批准。",
},
]
BLOCKED_ACTIONS = [
"argocd_api_read_without_approval",
"live_cluster_read_without_approval",
"argocd_sync",
"kubectl_apply",
"kubectl_patch",
"kubectl_delete",
"helm_upgrade",
"secret_value_collection",
"live_cluster_write",
"manual_pod_restart",
"scale_workload",
"change_network_policy",
"change_rbac",
"restore_backup",
"render_manifest_diff_from_untrusted_payload",
"mark_owner_response_accepted_without_reviewer_record",
"open_runtime_gate",
"add_action_button",
]
def git_short_sha(root: Path) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=root,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except Exception:
return "unknown"
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def acceptance_candidate(request: dict[str, Any]) -> dict[str, Any]:
group_id = request["group_id"]
return {
"acceptance_candidate_id": f"k8s_argocd_owner_response_acceptance:{group_id}",
"status": "waiting_owner_response",
"request_id": request["request_id"],
"group_id": group_id,
"root": request["root"],
"control_tier": request["control_tier"],
"owner_response_ref": None,
"owner_role_or_team": "pending_owner_response",
"decision": "pending_owner_response",
"decision_reason": "pending_owner_response",
"affected_scope": "pending_owner_response",
"redacted_evidence_refs": [],
"argocd_health_readback_ref": None,
"argocd_sync_revision_ref": None,
"rollback_revision": "pending_owner_response",
"rendered_manifest_diff_ref": None,
"maintenance_window": "pending_owner_response",
"validation_plan": "pending_owner_response",
"reviewer_outcome": "waiting_owner_response",
"followup_owner": "pending_owner_response",
"acceptance_fields": ACCEPTANCE_FIELDS,
"required_owner_fields": request["required_owner_fields"],
"reviewer_checks": [item["check_id"] for item in REVIEWER_CHECKS],
"outcome_lanes": [item["lane_id"] for item in OUTCOME_LANES],
"blocked_actions": BLOCKED_ACTIONS,
"not_approval": True,
"request_sent": False,
"recipient_confirmed": False,
"owner_response_received": False,
"owner_response_accepted": False,
"owner_response_rejected": False,
"owner_response_quarantined": False,
"supplement_requested": False,
"rendered_manifest_diff_candidate": False,
"rendered_manifest_diff_ready": False,
"argocd_health_readback_received": False,
"argocd_api_read_authorized": False,
"argocd_api_read_executed": False,
"argocd_sync_authorized": False,
"argocd_sync_executed": False,
"kubectl_action_authorized": False,
"kubectl_action_executed": False,
"live_cluster_read_authorized": False,
"live_cluster_read_executed": False,
"secret_value_collection_allowed": False,
"maintenance_window_accepted": False,
"rollback_revision_accepted": False,
"production_write_authorized": False,
"runtime_gate": False,
"action_buttons_allowed": False,
}
def build_report(
root: Path,
inventory: dict[str, Any],
request_draft_report: dict[str, Any],
generated_at: str | None,
) -> dict[str, Any]:
report_time = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
requests = request_draft_report.get("request_drafts", [])
acceptance_candidates = [acceptance_candidate(item) for item in requests]
c0_candidates = [item for item in acceptance_candidates if item["control_tier"] == "C0"]
c1_candidates = [item for item in acceptance_candidates if item["control_tier"] == "C1"]
return {
"schema_version": "k8s_argocd_owner_response_acceptance_v1",
"generated_at": report_time,
"git_commit": git_short_sha(root),
"source_inventory_schema_version": inventory.get("schema_version"),
"source_inventory_status": inventory.get("status"),
"source_owner_request_schema_version": request_draft_report.get("schema_version"),
"source_owner_request_status": request_draft_report.get("status"),
"status": "owner_response_acceptance_ledger_ready_no_runtime_action",
"summary": {
"source_scan_group_count": inventory.get("summary", {}).get("scan_group_count", 0),
"source_request_draft_count": request_draft_report.get("summary", {}).get("request_draft_count", 0),
"acceptance_candidate_count": len(acceptance_candidates),
"c0_acceptance_candidate_count": len(c0_candidates),
"c1_acceptance_candidate_count": len(c1_candidates),
"acceptance_field_count": len(ACCEPTANCE_FIELDS),
"required_owner_field_count": len(inventory["required_owner_fields"]),
"reviewer_check_count": len(REVIEWER_CHECKS),
"outcome_lane_count": len(OUTCOME_LANES),
"blocked_action_count": len(BLOCKED_ACTIONS),
"request_sent_count": 0,
"recipient_confirmed_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"owner_response_rejected_count": 0,
"owner_response_quarantined_count": 0,
"supplement_requested_count": 0,
"rendered_manifest_diff_candidate_count": 0,
"rendered_manifest_diff_ready_count": 0,
"argocd_health_readback_received_count": 0,
"argocd_api_read_authorized_count": 0,
"argocd_api_read_executed_count": 0,
"argocd_sync_authorized_count": 0,
"argocd_sync_executed_count": 0,
"kubectl_action_authorized_count": 0,
"kubectl_action_executed_count": 0,
"live_cluster_read_authorized_count": 0,
"live_cluster_read_executed_count": 0,
"secret_value_collection_allowed_count": 0,
"maintenance_window_accepted_count": 0,
"rollback_revision_accepted_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": {
"request_dispatch_authorized": False,
"owner_response_accepted": False,
"argocd_api_read_authorized": False,
"argocd_api_read_executed": False,
"argocd_sync_authorized": False,
"argocd_sync_executed": False,
"kubectl_action_authorized": False,
"kubectl_action_executed": False,
"live_cluster_read_authorized": False,
"live_cluster_read_executed": False,
"secret_value_collection_allowed": False,
"rendered_manifest_diff_ready": False,
"production_write_authorized": False,
"runtime_execution_authorized": False,
"action_buttons_allowed": False,
"not_authorization": True,
},
"acceptance_fields": ACCEPTANCE_FIELDS,
"required_owner_fields": inventory["required_owner_fields"],
"reviewer_checks": REVIEWER_CHECKS,
"outcome_lanes": OUTCOME_LANES,
"blocked_actions": BLOCKED_ACTIONS,
"acceptance_candidates": acceptance_candidates,
"next_steps": [
"等待 owner response未收到前不得更新 accepted count。",
"收到回覆後先走 raw payload / secret / scope / rollback / evidence ref 檢查,不合格即隔離、拒收或補件。",
"metadata 合格也只能進 rendered manifest diff reviewer reviewArgoCD API read、sync、kubectl action、secret collection 與 production write 仍需獨立人工批准。",
],
}
def main() -> int:
parser = argparse.ArgumentParser(description="IwoooS K8s / ArgoCD owner response acceptance 只讀帳本產生器")
parser.add_argument("--root", default=".", help="repo root")
parser.add_argument(
"--inventory-report",
default="docs/security/k8s-argocd-manifest-inventory.snapshot.json",
help="k8s-argocd-manifest-inventory.py 輸出的 JSON",
)
parser.add_argument(
"--owner-request-report",
default="docs/security/k8s-argocd-owner-request-draft.snapshot.json",
help="k8s-argocd-owner-request-draft.py 輸出的 JSON",
)
parser.add_argument("--output", help="寫出 JSON 報告")
parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
args = parser.parse_args()
root = Path(args.root).resolve()
inventory = load_json(root / args.inventory_report)
request_draft_report = load_json(root / args.owner_request_report)
report = build_report(root, inventory, request_draft_report, args.generated_at)
payload = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
if args.output:
output = Path(args.output)
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(payload + "\n", encoding="utf-8")
else:
print(payload)
summary = report["summary"]
print(
"K8S_ARGOCD_OWNER_RESPONSE_ACCEPTANCE_OK "
f"candidates={summary['acceptance_candidate_count']} "
f"c0={summary['c0_acceptance_candidate_count']} "
f"checks={summary['reviewer_check_count']} "
f"lanes={summary['outcome_lane_count']} "
f"accepted={summary['owner_response_accepted_count']} "
f"runtime_gate={summary['runtime_gate_count']}",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())