""" Docker build surface 盤點快照。 只讀取已提交的 JSON 快照;不執行 docker build、不 pull image、 不推 registry、不查外部 CVE、不安裝套件、不改生產路由。 """ from __future__ import annotations import json from pathlib import Path from typing import Any from src.services.snapshot_paths import default_evaluations_dir _DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__)) _SNAPSHOT_PATTERN = "docker_build_surface_inventory_*.json" _SCHEMA_VERSION = "docker_build_surface_inventory_v1" def load_latest_docker_build_surface_inventory( evaluations_dir: Path | None = None, ) -> dict[str, Any]: """載入最新已提交的 Docker build surface 盤點快照。""" directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR candidates = sorted(directory.glob(_SNAPSHOT_PATTERN)) if not candidates: raise FileNotFoundError(f"no Docker build surface inventory snapshots found in {directory}") latest = candidates[-1] with latest.open(encoding="utf-8") as handle: payload = json.load(handle) if not isinstance(payload, dict): raise ValueError(f"{latest}: expected JSON object") _require_schema(payload, _SCHEMA_VERSION, str(latest)) _require_read_only_boundaries(payload, str(latest)) _require_operation_boundaries(payload, str(latest)) _require_rollup_consistency(payload, str(latest)) return payload def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None: actual = payload.get("schema_version") if actual != expected: raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}") def _require_read_only_boundaries(payload: dict[str, Any], label: str) -> None: program_status = payload.get("program_status") or {} if program_status.get("read_only_mode") is not True: raise ValueError(f"{label}: program_status.read_only_mode must be true") boundaries = payload.get("approval_boundaries") or {} blocked_flags = { "sdk_installation_allowed", "paid_api_call_allowed", "shadow_or_canary_allowed", "production_routing_allowed", "destructive_operation_allowed", } allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False) if allowed: raise ValueError(f"{label}: approval boundaries must remain false: {allowed}") def _require_operation_boundaries(payload: dict[str, Any], label: str) -> None: boundaries = payload.get("operation_boundaries") or {} if boundaries.get("read_only_api_allowed") is not True: raise ValueError(f"{label}: read_only_api_allowed must be true") blocked_flags = { "docker_build_allowed", "image_pull_allowed", "image_rebuild_allowed", "registry_push_allowed", "external_cve_lookup_allowed", "package_installation_allowed", "production_routing_allowed", } allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False) if allowed: raise ValueError(f"{label}: operation boundaries must remain false: {allowed}") def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None: surfaces = payload.get("surfaces") or [] rollups = payload.get("rollups") or {} total = rollups.get("total_surfaces") if total != len(surfaces): raise ValueError(f"{label}: rollups.total_surfaces must equal surfaces length") action_required = set(rollups.get("action_required_surface_ids") or []) actual_action_required = { surface.get("surface_id") for surface in surfaces if surface.get("status") == "action_required" } if action_required != actual_action_required: raise ValueError( f"{label}: rollups.action_required_surface_ids must match action_required surfaces" ) planned_next = set(rollups.get("planned_next_surface_ids") or []) actual_planned_next = { surface.get("surface_id") for surface in surfaces if surface.get("status") == "planned_next" } if planned_next != actual_planned_next: raise ValueError(f"{label}: rollups.planned_next_surface_ids must match planned_next surfaces") network_fetches = sum(len(surface.get("build_time_network_fetches") or []) for surface in surfaces) if rollups.get("build_time_network_fetch_count") != network_fetches: raise ValueError( f"{label}: rollups.build_time_network_fetch_count must equal build_time_network_fetches length" ) non_root_count = sum(1 for surface in surfaces if surface.get("non_root_runtime") is True) if rollups.get("non_root_runtime_count") != non_root_count: raise ValueError(f"{label}: rollups.non_root_runtime_count must match non-root surfaces") healthcheck_count = sum(1 for surface in surfaces if surface.get("healthcheck_present") is True) if rollups.get("healthcheck_count") != healthcheck_count: raise ValueError(f"{label}: rollups.healthcheck_count must match healthcheck surfaces")