Files
awoooi/apps/api/src/services/dependency_risk_policy.py
Your Name c28212027c
Some checks failed
CD Pipeline / tests (push) Successful in 1m23s
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / build-and-deploy (push) Failing after 3m52s
CD Pipeline / post-deploy-checks (push) Has been skipped
fix(api): resolve snapshot paths in production image
2026-06-04 22:26:44 +08:00

123 lines
4.9 KiB
Python

"""
Dependency risk policy snapshot.
Loads the latest committed, read-only CVE / license / drift severity policy.
The policy never queries external CVE or license services, installs packages,
upgrades dependencies, writes lockfiles, builds images, pulls images, pushes
to registries, calls paid APIs, creates shadow/canary traffic, or changes
production routing.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
# Directory searched when the caller does not pass one. Computed once at
# import time by the project helper from this module's own path; presumably it
# locates the committed evaluations directory -- confirm in snapshot_paths.
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
# Glob pattern used to select policy snapshot files inside the directory.
_SNAPSHOT_PATTERN = "dependency_risk_policy_*.json"
# Value a snapshot's "schema_version" field must equal to be accepted.
_SCHEMA_VERSION = "dependency_risk_policy_v1"
def load_latest_dependency_risk_policy(
    evaluations_dir: Path | None = None,
) -> dict[str, Any]:
    """Read, validate and return the newest dependency risk policy snapshot.

    "Newest" means the matching file whose path sorts last, so snapshot file
    names are expected to sort chronologically.

    Args:
        evaluations_dir: Directory to search. The module default is used when
            this is omitted (or otherwise falsy).

    Returns:
        The parsed snapshot object after all validators have accepted it.

    Raises:
        FileNotFoundError: If no file in the directory matches the pattern.
        ValueError: If the file is not a JSON object or a validator rejects it.
    """
    search_root = evaluations_dir if evaluations_dir else _DEFAULT_EVALUATIONS_DIR

    # The greatest path is the same element a full sort would place last.
    newest = max(search_root.glob(_SNAPSHOT_PATTERN), default=None)
    if newest is None:
        raise FileNotFoundError(f"no dependency risk policy snapshots found in {search_root}")

    source = str(newest)
    document = json.loads(newest.read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        raise ValueError(f"{source}: expected JSON object")

    # Validators run in a fixed order; the first failure aborts the load.
    _require_schema(document, _SCHEMA_VERSION, source)
    _require_read_only_boundaries(document, source)
    _require_operation_boundaries(document, source)
    _require_rollup_consistency(document, source)
    return document
def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
actual = payload.get("schema_version")
if actual != expected:
raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")
def _require_read_only_boundaries(payload: dict[str, Any], label: str) -> None:
program_status = payload.get("program_status") or {}
if program_status.get("read_only_mode") is not True:
raise ValueError(f"{label}: program_status.read_only_mode must be true")
boundaries = payload.get("approval_boundaries") or {}
blocked_flags = {
"sdk_installation_allowed",
"paid_api_call_allowed",
"shadow_or_canary_allowed",
"production_routing_allowed",
"destructive_operation_allowed",
}
allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
if allowed:
raise ValueError(f"{label}: approval boundaries must remain false: {allowed}")
def _require_operation_boundaries(payload: dict[str, Any], label: str) -> None:
boundaries = payload.get("operation_boundaries") or {}
if boundaries.get("read_only_policy_allowed") is not True:
raise ValueError(f"{label}: read_only_policy_allowed must be true")
blocked_flags = {
"external_cve_lookup_allowed",
"external_license_lookup_allowed",
"package_installation_allowed",
"package_upgrade_allowed",
"lockfile_write_allowed",
"docker_build_allowed",
"image_pull_allowed",
"image_rebuild_allowed",
"registry_push_allowed",
"paid_api_call_allowed",
"shadow_or_canary_allowed",
"production_routing_allowed",
}
allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
if allowed:
raise ValueError(f"{label}: operation boundaries must remain false: {allowed}")
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
rules = payload.get("severity_rules") or []
rollups = payload.get("rollups") or {}
total = rollups.get("total_rules")
if total != len(rules):
raise ValueError(f"{label}: rollups.total_rules must equal severity_rules length")
by_severity = rollups.get("by_severity") or {}
for severity in ("critical", "high", "medium", "low"):
actual = sum(1 for rule in rules if rule.get("severity") == severity)
if by_severity.get(severity) != actual:
raise ValueError(f"{label}: rollups.by_severity.{severity} must match rules")
by_status = rollups.get("by_status") or {}
for status in ("accepted", "action_required", "planned_next", "blocked"):
actual = sum(1 for rule in rules if rule.get("status") == status)
expected = by_status.get(status, 0)
if expected != actual:
raise ValueError(f"{label}: rollups.by_status.{status} must match rules")
expected_by_status = {
"action_required": set(rollups.get("action_required_rule_ids") or []),
"planned_next": set(rollups.get("planned_next_rule_ids") or []),
"accepted": set(rollups.get("accepted_rule_ids") or []),
}
for status, expected_ids in expected_by_status.items():
actual_ids = {rule.get("rule_id") for rule in rules if rule.get("status") == status}
if expected_ids != actual_ids:
raise ValueError(f"{label}: rollups.{status}_rule_ids must match rules")