""" Runtime surface inventory snapshot. Loads the latest committed, read-only API / Web / Worker / K8s runtime surface inventory. This module never calls the live cluster, reads Secret payloads, executes rollouts, changes routing, or enables active scans. """ from __future__ import annotations import json from pathlib import Path from typing import Any from src.services.snapshot_paths import default_evaluations_dir _DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__)) _SNAPSHOT_PATTERN = "runtime_surface_inventory_*.json" _SCHEMA_VERSION = "runtime_surface_inventory_v1" def load_latest_runtime_surface_inventory( evaluations_dir: Path | None = None, ) -> dict[str, Any]: """Load the newest committed runtime surface inventory snapshot.""" directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR candidates = sorted(directory.glob(_SNAPSHOT_PATTERN)) if not candidates: raise FileNotFoundError(f"no runtime surface inventory snapshots found in {directory}") latest = candidates[-1] with latest.open(encoding="utf-8") as handle: payload = json.load(handle) if not isinstance(payload, dict): raise ValueError(f"{latest}: expected JSON object") _require_schema(payload, _SCHEMA_VERSION, str(latest)) _require_read_only_boundaries(payload, str(latest)) _require_operation_boundaries(payload, str(latest)) _require_rollup_consistency(payload, str(latest)) _require_operator_denials(payload, str(latest)) _require_surface_evidence(payload, str(latest)) return payload def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None: actual = payload.get("schema_version") if actual != expected: raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}") def _require_read_only_boundaries(payload: dict[str, Any], label: str) -> None: program_status = payload.get("program_status") or {} if program_status.get("read_only_mode") is not True: raise ValueError(f"{label}: program_status.read_only_mode must be true") approval_boundaries = payload.get("approval_boundaries") or {} allowed = sorted(key for key, value in approval_boundaries.items() if value is not False) if allowed: raise ValueError(f"{label}: approval boundaries must remain false: {allowed}") def _require_operation_boundaries(payload: dict[str, Any], label: str) -> None: boundaries = payload.get("operation_boundaries") or {} if boundaries.get("read_only_api_allowed") is not True: raise ValueError(f"{label}: read_only_api_allowed must be true") blocked_flags = { "live_k8s_query_allowed", "kubectl_allowed", "rollout_allowed", "restart_allowed", "scale_allowed", "delete_allowed", "secret_read_allowed", "secret_plaintext_allowed", "active_scan_allowed", "production_route_change_allowed", } allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False) if allowed: raise ValueError(f"{label}: operation boundaries must remain false: {allowed}") def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None: surfaces = payload.get("runtime_surfaces") or [] components = payload.get("source_runtime_components") or [] rollups = payload.get("rollups") or {} if rollups.get("total_surfaces") != len(surfaces): raise ValueError(f"{label}: rollups.total_surfaces must match runtime_surfaces") if rollups.get("by_kind") != _count_by(surfaces, "kind"): raise ValueError(f"{label}: rollups.by_kind must match runtime_surfaces") if rollups.get("by_status") != _count_by(surfaces, "status"): raise ValueError(f"{label}: rollups.by_status must match runtime_surfaces") if rollups.get("by_evidence_level") != _count_by(surfaces, "evidence_level"): raise ValueError(f"{label}: rollups.by_evidence_level must match runtime_surfaces") action_required = sorted( surface.get("surface_id") for surface in surfaces if surface.get("status") != "manifest_mapped" ) if sorted(rollups.get("action_required_surface_ids") or []) != action_required: raise ValueError(f"{label}: rollups.action_required_surface_ids must match surfaces") secret_surfaces = sorted( surface.get("surface_id") for surface in surfaces if surface.get("kind") == "secret" or surface.get("secret_exposure") != "none" ) if sorted(rollups.get("secret_surface_ids") or []) != secret_surfaces: raise ValueError(f"{label}: rollups.secret_surface_ids must match secret surfaces") live_check_required = sorted( surface.get("surface_id") for surface in surfaces if surface.get("live_check_status") == "required" ) if sorted(rollups.get("live_check_missing_surface_ids") or []) != live_check_required: raise ValueError(f"{label}: rollups.live_check_missing_surface_ids must match surfaces") if rollups.get("total_source_components") != len(components): raise ValueError(f"{label}: rollups.total_source_components must match source_runtime_components") bound_components = sum(1 for component in components if component.get("status") == "bound") if rollups.get("source_components_with_runtime_binding") != bound_components: raise ValueError( f"{label}: rollups.source_components_with_runtime_binding must match source_runtime_components" ) def _require_operator_denials(payload: dict[str, Any], label: str) -> None: contract = payload.get("operator_contract") or {} must_not_interpret_as = set(contract.get("must_not_interpret_as") or []) required_denials = { "runtime 執行授權", "rollout / restart / scale / delete 批准", "Secret 已驗證或可讀取", "Ingress / DNS 可修改", } if not required_denials.issubset(must_not_interpret_as): raise ValueError(f"{label}: operator_contract.must_not_interpret_as is missing required denials") def _require_surface_evidence(payload: dict[str, Any], label: str) -> None: surfaces = payload.get("runtime_surfaces") or [] missing_evidence = sorted( surface.get("surface_id") for surface in surfaces if not surface.get("manifest_ref") or not surface.get("evidence_refs") ) if missing_evidence: raise ValueError(f"{label}: runtime surfaces must include manifest_ref and evidence_refs: {missing_evidence}") plaintext_secret = sorted( surface.get("surface_id") for surface in surfaces if surface.get("kind") == "secret" and surface.get("secret_exposure") not in {"template_only", "name_only", "payload_redacted"} ) if plaintext_secret: raise ValueError(f"{label}: secret surfaces must stay redacted: {plaintext_secret}") def _count_by(items: list[dict[str, Any]], key: str) -> dict[str, int]: counts: dict[str, int] = {} for item in items: value = item.get(key) counts[value] = counts.get(value, 0) + 1 return counts