121 lines
4.9 KiB
Python
121 lines
4.9 KiB
Python
"""Docker build surface inventory snapshot.

Only reads committed JSON snapshots; it does not run docker build, pull images,
push to a registry, query external CVE sources, install packages, or change
production routing.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Ancestor at index 4 of this file's resolved path. Assumes the module sits
# five directory levels below the repository root -- TODO confirm if it moves.
_REPO_ROOT = Path(__file__).resolve().parents[4]
# Directory searched for snapshots when the loader is given no directory.
_DEFAULT_EVALUATIONS_DIR = _REPO_ROOT.joinpath("docs", "evaluations")
# Glob pattern that snapshot file names must match.
_SNAPSHOT_PATTERN = "docker_build_surface_inventory_*.json"
# Value a snapshot's "schema_version" field must equal to be accepted.
_SCHEMA_VERSION = "docker_build_surface_inventory_v1"
|
|
|
|
|
|
def load_latest_docker_build_surface_inventory(
    evaluations_dir: Path | None = None,
) -> dict[str, Any]:
    """Load and validate the latest committed Docker build surface inventory.

    Files matching ``_SNAPSHOT_PATTERN`` are compared by path and the greatest
    path is treated as the latest snapshot. The parsed content is returned
    only after every validation step has accepted it.

    Args:
        evaluations_dir: Directory to search. The default evaluations
            directory is used when this is omitted.

    Returns:
        The parsed snapshot object.

    Raises:
        FileNotFoundError: No file in the directory matches the pattern.
        ValueError: The top-level JSON value is not an object, or one of the
            schema / boundary / rollup checks rejects the snapshot.
    """
    directory = evaluations_dir if evaluations_dir else _DEFAULT_EVALUATIONS_DIR
    snapshot_paths = list(directory.glob(_SNAPSHOT_PATTERN))
    if not snapshot_paths:
        raise FileNotFoundError(f"no Docker build surface inventory snapshots found in {directory}")

    # Path ordering decides which snapshot counts as "latest".
    latest = max(snapshot_paths)
    inventory = json.loads(latest.read_text(encoding="utf-8"))
    if not isinstance(inventory, dict):
        raise ValueError(f"{latest}: expected JSON object")

    source_label = str(latest)
    _require_schema(inventory, _SCHEMA_VERSION, source_label)
    for validate in (
        _require_read_only_boundaries,
        _require_operation_boundaries,
        _require_rollup_consistency,
    ):
        validate(inventory, source_label)
    return inventory
|
|
|
|
|
|
def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
    """Reject a snapshot whose ``schema_version`` differs from ``expected``.

    Args:
        payload: Parsed snapshot object.
        expected: Schema version the snapshot must declare.
        label: Prefix used in the error message.

    Raises:
        ValueError: ``schema_version`` is missing or not equal to ``expected``.
    """
    actual = payload.get("schema_version")
    if actual == expected:
        return
    raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")
|
|
|
|
|
|
def _require_read_only_boundaries(payload: dict[str, Any], label: str) -> None:
    """Ensure the snapshot declares read-only mode and keeps approval flags off.

    Args:
        payload: Parsed snapshot object.
        label: Prefix used in error messages.

    Raises:
        ValueError: ``program_status`` or ``approval_boundaries`` is present
            but not a JSON object, ``program_status.read_only_mode`` is not
            exactly ``True``, or any blocked approval flag is not exactly
            ``False`` (a missing flag counts as not false).
    """
    # A missing or empty section falls through to the flag checks below, which
    # reject it with their own message.
    program_status = payload.get("program_status") or {}
    if not isinstance(program_status, dict):
        # Without this guard a list/string section would crash on `.get`
        # with AttributeError instead of reporting a validation error.
        raise ValueError(f"{label}: program_status must be a JSON object")
    if program_status.get("read_only_mode") is not True:
        raise ValueError(f"{label}: program_status.read_only_mode must be true")

    boundaries = payload.get("approval_boundaries") or {}
    if not isinstance(boundaries, dict):
        raise ValueError(f"{label}: approval_boundaries must be a JSON object")

    blocked_flags = (
        "sdk_installation_allowed",
        "paid_api_call_allowed",
        "shadow_or_canary_allowed",
        "production_routing_allowed",
        "destructive_operation_allowed",
    )
    # Identity check on purpose: only a literal `false` passes, so 0, null
    # and absent flags are all reported.
    allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
    if allowed:
        raise ValueError(f"{label}: approval boundaries must remain false: {allowed}")
|
|
|
|
|
|
def _require_operation_boundaries(payload: dict[str, Any], label: str) -> None:
    """Ensure only the read-only API is enabled in ``operation_boundaries``.

    Args:
        payload: Parsed snapshot object.
        label: Prefix used in error messages.

    Raises:
        ValueError: ``operation_boundaries`` is present but not a JSON object,
            ``read_only_api_allowed`` is not exactly ``True``, or any blocked
            operation flag is not exactly ``False`` (a missing flag counts as
            not false).
    """
    boundaries = payload.get("operation_boundaries") or {}
    if not isinstance(boundaries, dict):
        # Without this guard a list/string section would crash on `.get`
        # with AttributeError instead of reporting a validation error.
        raise ValueError(f"{label}: operation_boundaries must be a JSON object")
    if boundaries.get("read_only_api_allowed") is not True:
        raise ValueError(f"{label}: read_only_api_allowed must be true")

    blocked_flags = (
        "docker_build_allowed",
        "image_pull_allowed",
        "image_rebuild_allowed",
        "registry_push_allowed",
        "external_cve_lookup_allowed",
        "package_installation_allowed",
        "production_routing_allowed",
    )
    # Identity check on purpose: only a literal `false` passes, so 0, null
    # and absent flags are all reported.
    allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
    if allowed:
        raise ValueError(f"{label}: operation boundaries must remain false: {allowed}")
|
|
|
|
|
|
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
    """Check that every value in ``rollups`` agrees with the ``surfaces`` list.

    The checks cover the total surface count, the id lists for the
    ``action_required`` and ``planned_next`` statuses, the summed length of
    ``build_time_network_fetches``, and the counts of surfaces whose
    ``non_root_runtime`` / ``healthcheck_present`` is exactly ``True``.

    Args:
        payload: Parsed snapshot object.
        label: Prefix used in error messages.

    Raises:
        ValueError: ``surfaces`` is not a list of JSON objects, ``rollups`` is
            not a JSON object, an id or fetch list has an unusable type, or
            any rollup value disagrees with the surfaces.
    """
    surfaces = payload.get("surfaces") or []
    # Guard the shapes up front: every check below calls `.get` on each
    # surface and on rollups, which would otherwise fail with AttributeError.
    if not isinstance(surfaces, list) or not all(isinstance(s, dict) for s in surfaces):
        raise ValueError(f"{label}: surfaces must be a list of JSON objects")
    rollups = payload.get("rollups") or {}
    if not isinstance(rollups, dict):
        raise ValueError(f"{label}: rollups must be a JSON object")

    if rollups.get("total_surfaces") != len(surfaces):
        raise ValueError(f"{label}: rollups.total_surfaces must equal surfaces length")

    # Both status rollups follow the same rule: the declared id list must
    # equal (as a set) the ids of the surfaces carrying that status.
    for status in ("action_required", "planned_next"):
        rollup_key = f"{status}_surface_ids"
        try:
            declared_ids = set(rollups.get(rollup_key) or [])
            actual_ids = {
                surface.get("surface_id") for surface in surfaces if surface.get("status") == status
            }
        except TypeError as exc:
            # Non-iterable id list or unhashable id (e.g. nested list/object).
            raise ValueError(
                f"{label}: rollups.{rollup_key} and surface_id values must be hashable scalars"
            ) from exc
        if declared_ids != actual_ids:
            raise ValueError(f"{label}: rollups.{rollup_key} must match {status} surfaces")

    try:
        network_fetches = sum(
            len(surface.get("build_time_network_fetches") or []) for surface in surfaces
        )
    except TypeError as exc:
        # A number or boolean where a list was expected has no length.
        raise ValueError(f"{label}: build_time_network_fetches must be a list") from exc
    if rollups.get("build_time_network_fetch_count") != network_fetches:
        raise ValueError(
            f"{label}: rollups.build_time_network_fetch_count must equal build_time_network_fetches length"
        )

    # Only a literal `true` counts towards the two flag rollups.
    non_root_count = sum(1 for surface in surfaces if surface.get("non_root_runtime") is True)
    if rollups.get("non_root_runtime_count") != non_root_count:
        raise ValueError(f"{label}: rollups.non_root_runtime_count must match non-root surfaces")

    healthcheck_count = sum(1 for surface in surfaces if surface.get("healthcheck_present") is True)
    if rollups.get("healthcheck_count") != healthcheck_count:
        raise ValueError(f"{label}: rollups.healthcheck_count must match healthcheck surfaces")
|