""" Dependency drift check plan snapshot. Loads the latest committed, read-only dependency drift and external source watch design. The plan never activates schedules, writes workflows, queries external sources, installs SDKs, calls paid APIs, installs or upgrades packages, writes lockfiles, builds or pulls images, pushes registries, creates shadow/canary traffic, or changes production routing. """ from __future__ import annotations import json from pathlib import Path from typing import Any from src.services.snapshot_paths import default_evaluations_dir _DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__)) _SNAPSHOT_PATTERN = "dependency_drift_check_plan_*.json" _SCHEMA_VERSION = "dependency_drift_check_plan_v1" def load_latest_dependency_drift_check_plan( evaluations_dir: Path | None = None, ) -> dict[str, Any]: """Load the newest committed dependency drift check plan snapshot.""" directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR candidates = sorted(directory.glob(_SNAPSHOT_PATTERN)) if not candidates: raise FileNotFoundError(f"no dependency drift check plan snapshots found in {directory}") latest = candidates[-1] with latest.open(encoding="utf-8") as handle: payload = json.load(handle) if not isinstance(payload, dict): raise ValueError(f"{latest}: expected JSON object") _require_schema(payload, _SCHEMA_VERSION, str(latest)) _require_read_only_boundaries(payload, str(latest)) _require_operation_boundaries(payload, str(latest)) _require_rollup_consistency(payload, str(latest)) return payload def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None: actual = payload.get("schema_version") if actual != expected: raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}") def _require_read_only_boundaries(payload: dict[str, Any], label: str) -> None: program_status = payload.get("program_status") or {} if program_status.get("read_only_mode") is not True: raise ValueError(f"{label}: program_status.read_only_mode must be true") boundaries = payload.get("approval_boundaries") or {} blocked_flags = { "sdk_installation_allowed", "paid_api_call_allowed", "shadow_or_canary_allowed", "production_routing_allowed", "destructive_operation_allowed", } allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False) if allowed: raise ValueError(f"{label}: approval boundaries must remain false: {allowed}") def _require_operation_boundaries(payload: dict[str, Any], label: str) -> None: boundaries = payload.get("operation_boundaries") or {} if boundaries.get("read_only_plan_allowed") is not True: raise ValueError(f"{label}: read_only_plan_allowed must be true") blocked_flags = { "schedule_activation_allowed", "workflow_write_allowed", "external_cve_lookup_allowed", "external_license_lookup_allowed", "registry_lookup_allowed", "agent_market_external_lookup_allowed", "sdk_installation_allowed", "paid_api_call_allowed", "package_installation_allowed", "package_upgrade_allowed", "lockfile_write_allowed", "docker_build_allowed", "image_pull_allowed", "image_rebuild_allowed", "registry_push_allowed", "shadow_or_canary_allowed", "production_routing_allowed", } allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False) if allowed: raise ValueError(f"{label}: operation boundaries must remain false: {allowed}") def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None: cadence_items = ((payload.get("cadence_policy") or {}).get("items")) or [] local_checks = payload.get("local_check_plan") or [] external_sources = payload.get("external_source_candidates") or [] rollups = payload.get("rollups") or {} if rollups.get("total_cadence_items") != len(cadence_items): raise ValueError(f"{label}: rollups.total_cadence_items must match cadence items") if rollups.get("total_local_checks") != len(local_checks): raise ValueError(f"{label}: rollups.total_local_checks must match local_check_plan") if rollups.get("total_external_source_candidates") != len(external_sources): raise ValueError( f"{label}: rollups.total_external_source_candidates must match external_source_candidates" ) local_ids = {check.get("check_id") for check in local_checks if check.get("status") == "read_only_design"} if set(rollups.get("read_only_local_check_ids") or []) != local_ids: raise ValueError(f"{label}: rollups.read_only_local_check_ids must match local checks") source_ids = { source.get("source_id") for source in external_sources if source.get("approval_status") in {"approval_required", "blocked_until_approval"} } if set(rollups.get("approval_required_source_ids") or []) != source_ids: raise ValueError(f"{label}: rollups.approval_required_source_ids must match external sources") cadence_ids = { item.get("cadence_id") for item in cadence_items if item.get("activation_status") in {"design_only", "blocked_until_approval"} } if set(rollups.get("design_only_cadence_ids") or []) != cadence_ids: raise ValueError(f"{label}: rollups.design_only_cadence_ids must match cadence items")