feat(governance): 新增 release authorization readback gate
This commit is contained in:
@@ -148,6 +148,9 @@ from src.services.ai_agent_result_capture_final_release_candidate_readback impor
|
||||
from src.services.ai_agent_result_capture_release_authorization_hold import (
|
||||
load_latest_ai_agent_result_capture_release_authorization_hold,
|
||||
)
|
||||
from src.services.ai_agent_result_capture_release_authorization_readback_gate import (
|
||||
load_latest_ai_agent_result_capture_release_authorization_readback_gate,
|
||||
)
|
||||
from src.services.ai_agent_result_capture_post_release_verifier_rollback_gate import (
|
||||
load_latest_ai_agent_result_capture_post_release_verifier_rollback_gate,
|
||||
)
|
||||
@@ -2153,6 +2156,37 @@ async def get_agent_result_capture_release_authorization_hold() -> dict[str, Any
|
||||
) from exc
|
||||
|
||||
|
||||
@router.get(
|
||||
"/agent-result-capture-release-authorization-readback-gate",
|
||||
response_model=dict[str, Any],
|
||||
summary="取得 AI Agent result capture release authorization readback gate",
|
||||
description=(
|
||||
"讀取最新已提交的 P2-135 release authorization readback gate;"
|
||||
"此端點只回傳 release authorization readback、rollback release readback、maintenance window readback hold、"
|
||||
"live apply release readback hold、blocked release readback transition 與 operator handoff,"
|
||||
"不授權 owner release、不批准 maintenance window、不確認 rollback owner、不通過 release authorization、"
|
||||
"不釋放 rollback release 或 live apply、不套用 writer、不寫 receipt、不寫 result capture、learning、PlayBook trust、"
|
||||
"reviewer queue、Gateway queue,不送 Telegram、不呼叫 Bot API、不讀 secret。"
|
||||
),
|
||||
)
|
||||
async def get_agent_result_capture_release_authorization_readback_gate() -> dict[str, Any]:
|
||||
"""Return the latest read-only release authorization readback gate."""
|
||||
try:
|
||||
payload = await asyncio.to_thread(load_latest_ai_agent_result_capture_release_authorization_readback_gate)
|
||||
return redact_public_lan_topology(payload)
|
||||
except FileNotFoundError as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=str(exc),
|
||||
) from exc
|
||||
except (json.JSONDecodeError, ValueError) as exc:
|
||||
logger.error("ai_agent_result_capture_release_authorization_readback_gate_invalid", error=str(exc))
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="AI Agent result capture release authorization readback gate 無效",
|
||||
) from exc
|
||||
|
||||
|
||||
@router.get(
|
||||
"/agent-owner-approved-fixture-dry-run",
|
||||
response_model=dict[str, Any],
|
||||
|
||||
@@ -0,0 +1,438 @@
|
||||
"""
|
||||
AI Agent result capture release authorization readback gate snapshot.
|
||||
|
||||
Loads the latest committed P2-135 release authorization readback gate.
|
||||
This module validates committed evidence only; it never authorizes owner
|
||||
release, approves maintenance windows, confirms rollback owners, passes
|
||||
release authorization, releases live apply, applies writers, writes receipts,
|
||||
writes result captures, writes learning records, updates PlayBook trust, writes
|
||||
reviewer / Gateway queues, sends Telegram messages, reads secrets, or performs
|
||||
destructive operations.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from src.services.snapshot_paths import default_evaluations_dir
|
||||
|
||||
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
|
||||
_SNAPSHOT_PATTERN = "ai_agent_result_capture_release_authorization_readback_gate_*.json"
|
||||
_SCHEMA_VERSION = "ai_agent_result_capture_release_authorization_readback_gate_v1"
|
||||
_RUNTIME_AUTHORITY = "result_capture_release_authorization_readback_gate_only_no_live_write"
|
||||
|
||||
|
||||
def load_latest_ai_agent_result_capture_release_authorization_readback_gate(
|
||||
evaluations_dir: Path | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Load the newest committed release authorization readback gate."""
|
||||
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
|
||||
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
|
||||
if not candidates:
|
||||
raise FileNotFoundError(f"no AI Agent result capture release authorization readback gate snapshots found in {directory}")
|
||||
|
||||
latest = candidates[-1]
|
||||
with latest.open(encoding="utf-8") as handle:
|
||||
payload = json.load(handle)
|
||||
|
||||
if not isinstance(payload, dict):
|
||||
raise ValueError(f"{latest}: expected JSON object")
|
||||
|
||||
label = str(latest)
|
||||
_require_schema(payload, label)
|
||||
_require_prior(payload, label)
|
||||
_require_truth(payload, label)
|
||||
_require_authorization_readbacks(payload, label)
|
||||
_require_rollback_readbacks(payload, label)
|
||||
_require_maintenance_holds(payload, label)
|
||||
_require_live_apply_holds(payload, label)
|
||||
_require_blocked_transitions(payload, label)
|
||||
_require_actions(payload, label)
|
||||
_require_display_redaction(payload, label)
|
||||
_require_no_forbidden_display_terms(payload, label)
|
||||
_require_rollup_consistency(payload, label)
|
||||
return payload
|
||||
|
||||
|
||||
def _require_schema(payload: dict[str, Any], label: str) -> None:
|
||||
if payload.get("schema_version") != _SCHEMA_VERSION:
|
||||
raise ValueError(f"{label}: expected schema_version={_SCHEMA_VERSION}")
|
||||
status = payload.get("program_status") or {}
|
||||
expected = {
|
||||
"current_priority": "P2",
|
||||
"current_task_id": "P2-135",
|
||||
"next_task_id": "P2-136",
|
||||
"read_only_mode": True,
|
||||
"runtime_authority": _RUNTIME_AUTHORITY,
|
||||
"overall_completion_percent": 100,
|
||||
}
|
||||
mismatches = _mismatches(status, expected)
|
||||
if mismatches:
|
||||
raise ValueError(f"{label}: program_status mismatch: {mismatches}")
|
||||
if not status.get("status_note"):
|
||||
raise ValueError(f"{label}: program_status.status_note is required")
|
||||
|
||||
|
||||
def _require_prior(payload: dict[str, Any], label: str) -> None:
|
||||
prior = payload.get("prior_release_authorization_hold") or {}
|
||||
expected = {
|
||||
"schema_version": "ai_agent_result_capture_release_authorization_hold_v1",
|
||||
"release_authorization_hold_count": 5,
|
||||
"rollback_authorization_hold_count": 5,
|
||||
"release_window_hold_count": 5,
|
||||
"live_apply_authorization_hold_count": 5,
|
||||
"blocked_authorization_transition_count": 6,
|
||||
"operator_action_count": 5,
|
||||
"approval_required_total": 8,
|
||||
"blocked_total": 9,
|
||||
"owner_release_authorized_count": 0,
|
||||
"owner_release_approved_count": 0,
|
||||
"maintenance_window_approved_count": 0,
|
||||
"rollback_owner_confirmed_count": 0,
|
||||
"post_release_verifier_ready_count": 0,
|
||||
"final_release_candidate_approved_count": 0,
|
||||
"final_release_candidate_pass_count": 0,
|
||||
"release_authorization_granted_count": 0,
|
||||
"release_authorization_pass_count": 0,
|
||||
"rollback_release_pass_count": 0,
|
||||
"live_apply_release_pass_count": 0,
|
||||
"writer_apply_count": 0,
|
||||
"execution_apply_count": 0,
|
||||
"receipt_write_count": 0,
|
||||
"result_capture_write_count": 0,
|
||||
"learning_write_count": 0,
|
||||
"playbook_trust_write_count": 0,
|
||||
"reviewer_queue_write_count": 0,
|
||||
"gateway_queue_write_count": 0,
|
||||
"telegram_send_count": 0,
|
||||
"bot_api_call_count": 0,
|
||||
"report_receipt_write_count": 0,
|
||||
}
|
||||
mismatches = _mismatches(prior, expected)
|
||||
if mismatches:
|
||||
raise ValueError(f"{label}: prior_release_authorization_hold mismatch: {mismatches}")
|
||||
if not prior.get("readiness_note"):
|
||||
raise ValueError(f"{label}: prior_release_authorization_hold.readiness_note is required")
|
||||
|
||||
|
||||
def _require_truth(payload: dict[str, Any], label: str) -> None:
|
||||
truth = payload.get("release_authorization_readback_truth") or {}
|
||||
required_true = {
|
||||
"p2_134_release_authorization_hold_loaded",
|
||||
"release_authorization_readback_ready",
|
||||
"rollback_release_readback_ready",
|
||||
"maintenance_window_readback_hold_active",
|
||||
"live_apply_release_readback_hold_active",
|
||||
"owner_authorization_review_still_required",
|
||||
"release_verifier_review_still_required",
|
||||
"rollback_owner_review_required",
|
||||
"redaction_review_required",
|
||||
"release_authorization_readback_only",
|
||||
}
|
||||
missing = sorted(field for field in required_true if truth.get(field) is not True)
|
||||
if missing:
|
||||
raise ValueError(f"{label}: release authorization readback flags must remain true: {missing}")
|
||||
|
||||
required_false = {
|
||||
"owner_release_authorized",
|
||||
"owner_release_approved",
|
||||
"maintenance_window_approved",
|
||||
"rollback_owner_confirmed",
|
||||
"post_release_verifier_ready",
|
||||
"final_release_candidate_approved",
|
||||
"final_release_candidate_passed",
|
||||
"release_authorization_granted",
|
||||
"release_authorization_passed",
|
||||
"rollback_release_passed",
|
||||
"live_apply_release_passed",
|
||||
"writer_apply_enabled",
|
||||
"execution_apply_enabled",
|
||||
"receipt_write_enabled",
|
||||
"reviewer_queue_write_enabled",
|
||||
"gateway_queue_write_enabled",
|
||||
"telegram_send_enabled",
|
||||
"bot_api_call_enabled",
|
||||
"report_receipt_write_enabled",
|
||||
"result_capture_write_enabled",
|
||||
"learning_write_enabled",
|
||||
"playbook_trust_write_enabled",
|
||||
"production_write_enabled",
|
||||
"secret_read_enabled",
|
||||
"destructive_operation_enabled",
|
||||
}
|
||||
unsafe = sorted(field for field in required_false if truth.get(field) is not False)
|
||||
if unsafe:
|
||||
raise ValueError(f"{label}: release authorization readback live/send/write flags must remain false: {unsafe}")
|
||||
|
||||
zero_counts = {
|
||||
"owner_release_authorized_count",
|
||||
"owner_release_approved_count",
|
||||
"maintenance_window_approved_count",
|
||||
"rollback_owner_confirmed_count",
|
||||
"post_release_verifier_ready_count",
|
||||
"final_release_candidate_approved_count",
|
||||
"final_release_candidate_pass_count",
|
||||
"release_authorization_granted_count",
|
||||
"release_authorization_pass_count",
|
||||
"rollback_release_pass_count",
|
||||
"live_apply_release_pass_count",
|
||||
"writer_apply_count",
|
||||
"execution_apply_count",
|
||||
"receipt_write_count",
|
||||
"reviewer_queue_write_count",
|
||||
"gateway_queue_write_count",
|
||||
"telegram_send_count",
|
||||
"bot_api_call_count",
|
||||
"report_receipt_write_count",
|
||||
"result_capture_write_count",
|
||||
"learning_write_count",
|
||||
"playbook_trust_write_count",
|
||||
"production_write_count",
|
||||
"secret_read_count",
|
||||
"destructive_operation_count",
|
||||
}
|
||||
non_zero = sorted(field for field in zero_counts if truth.get(field) != 0)
|
||||
if non_zero:
|
||||
raise ValueError(f"{label}: release authorization readback live counters must remain zero: {non_zero}")
|
||||
if not truth.get("truth_note"):
|
||||
raise ValueError(f"{label}: release_authorization_readback_truth.truth_note is required")
|
||||
|
||||
|
||||
def _require_authorization_readbacks(payload: dict[str, Any], label: str) -> None:
|
||||
items = payload.get("release_authorization_readbacks") or []
|
||||
_require_ids(
|
||||
items,
|
||||
"readback_id",
|
||||
{
|
||||
"release_authorization_readback_result_capture",
|
||||
"release_authorization_readback_learning",
|
||||
"release_authorization_readback_playbook_trust",
|
||||
"release_authorization_readback_reviewer_queue",
|
||||
"release_authorization_readback_gateway_queue",
|
||||
},
|
||||
label,
|
||||
"release authorization readbacks",
|
||||
)
|
||||
for item in items:
|
||||
item_id = item.get("readback_id")
|
||||
if item.get("readback_mode") != "release_authorization_readback_gate":
|
||||
raise ValueError(f"{label}: release authorization readback {item_id} mode is invalid")
|
||||
if item.get("owner_release_authorized") is not False:
|
||||
raise ValueError(f"{label}: release authorization readback {item_id} must stay unauthorized")
|
||||
if item.get("release_authorization_granted") is not False or item.get("release_authorization_passed") is not False:
|
||||
raise ValueError(f"{label}: release authorization readback {item_id} must stay ungranted and unpassed")
|
||||
_require_valid_status(item, label, f"release authorization readback {item_id}")
|
||||
if not item.get("readback_summary") or not _is_redacted_sha256(item.get("evidence_hash")):
|
||||
raise ValueError(f"{label}: release authorization readback {item_id} must include summary and redacted evidence_hash")
|
||||
|
||||
|
||||
def _require_rollback_readbacks(payload: dict[str, Any], label: str) -> None:
|
||||
items = payload.get("rollback_release_readbacks") or []
|
||||
_require_count(items, 5, label, "rollback release readbacks")
|
||||
for item in items:
|
||||
item_id = item.get("readback_id")
|
||||
if item.get("readback_mode") != "rollback_release_readback_gate":
|
||||
raise ValueError(f"{label}: rollback release readback {item_id} mode is invalid")
|
||||
if item.get("rollback_owner_required") is not True:
|
||||
raise ValueError(f"{label}: rollback release readback {item_id} must require rollback owner")
|
||||
if item.get("rollback_owner_confirmed") is not False or item.get("rollback_release_passed") is not False or item.get("rollback_release_enabled") is not False:
|
||||
raise ValueError(f"{label}: rollback release readback {item_id} must stay unconfirmed, unpassed, and disabled")
|
||||
_require_valid_status(item, label, f"rollback release readback {item_id}")
|
||||
if not item.get("readback_summary"):
|
||||
raise ValueError(f"{label}: rollback release readback {item_id}.readback_summary is required")
|
||||
|
||||
|
||||
def _require_maintenance_holds(payload: dict[str, Any], label: str) -> None:
|
||||
items = payload.get("maintenance_window_readback_holds") or []
|
||||
_require_count(items, 5, label, "maintenance window readback holds")
|
||||
for item in items:
|
||||
item_id = item.get("hold_id")
|
||||
if item.get("hold_mode") != "maintenance_window_readback_hold":
|
||||
raise ValueError(f"{label}: maintenance window readback hold {item_id} mode is invalid")
|
||||
if item.get("owner_release_authorized") is not False or item.get("maintenance_window_approved") is not False:
|
||||
raise ValueError(f"{label}: maintenance window readback hold {item_id} must keep owner release and maintenance unapproved")
|
||||
if item.get("final_release_candidate_passed") is not False:
|
||||
raise ValueError(f"{label}: maintenance window readback hold {item_id} must keep final candidate unpassed")
|
||||
_require_valid_status(item, label, f"maintenance window readback hold {item_id}")
|
||||
if not item.get("hold_reason"):
|
||||
raise ValueError(f"{label}: maintenance window readback hold {item_id}.hold_reason is required")
|
||||
|
||||
|
||||
def _require_live_apply_holds(payload: dict[str, Any], label: str) -> None:
|
||||
items = payload.get("live_apply_release_readback_holds") or []
|
||||
_require_count(items, 5, label, "live apply release readback holds")
|
||||
for item in items:
|
||||
item_id = item.get("hold_id")
|
||||
if item.get("hold_mode") != "live_apply_release_readback_hold":
|
||||
raise ValueError(f"{label}: live apply release readback hold {item_id} mode is invalid")
|
||||
if item.get("release_authorization_granted") is not False or item.get("release_authorization_passed") is not False:
|
||||
raise ValueError(f"{label}: live apply release readback hold {item_id} must stay ungranted and unpassed")
|
||||
if item.get("live_apply_release_passed") is not False or item.get("live_apply_release_enabled") is not False:
|
||||
raise ValueError(f"{label}: live apply release readback hold {item_id} must stay unpassed and disabled")
|
||||
_require_valid_status(item, label, f"live apply release readback hold {item_id}")
|
||||
if not item.get("release_condition"):
|
||||
raise ValueError(f"{label}: live apply release readback hold {item_id}.release_condition is required")
|
||||
|
||||
|
||||
def _require_blocked_transitions(payload: dict[str, Any], label: str) -> None:
|
||||
items = payload.get("blocked_release_readback_transitions") or []
|
||||
_require_count(items, 6, label, "blocked release readback transitions")
|
||||
critical_count = 0
|
||||
for item in items:
|
||||
item_id = item.get("blocker_id")
|
||||
if item.get("severity") == "critical":
|
||||
critical_count += 1
|
||||
if item.get("status") not in {"approval_required", "blocked_by_policy"}:
|
||||
raise ValueError(f"{label}: blocked release readback transition {item_id} status is invalid")
|
||||
if not item.get("blocked_action") or not item.get("blocked_until") or not _is_redacted_sha256(item.get("evidence_hash")):
|
||||
raise ValueError(f"{label}: blocked release readback transition {item_id} must include blocked action, until, and evidence hash")
|
||||
if critical_count != 5:
|
||||
raise ValueError(f"{label}: expected 5 critical release readback blockers, got {critical_count}")
|
||||
|
||||
|
||||
def _require_actions(payload: dict[str, Any], label: str) -> None:
|
||||
items = payload.get("operator_actions") or []
|
||||
_require_count(items, 5, label, "operator actions")
|
||||
for item in items:
|
||||
item_id = item.get("action_id")
|
||||
if item.get("runtime_write_allowed") is not False:
|
||||
raise ValueError(f"{label}: operator action {item_id} must not allow runtime writes")
|
||||
if item.get("status") not in {"ready_for_operator_review", "approval_required"}:
|
||||
raise ValueError(f"{label}: operator action {item_id} status is invalid")
|
||||
if not item.get("operator_instruction"):
|
||||
raise ValueError(f"{label}: operator action {item_id}.operator_instruction is required")
|
||||
|
||||
|
||||
def _require_display_redaction(payload: dict[str, Any], label: str) -> None:
|
||||
contract = payload.get("display_redaction_contract") or {}
|
||||
expected = {
|
||||
"redaction_required": True,
|
||||
"raw_prompt_display_allowed": False,
|
||||
"private_reasoning_display_allowed": False,
|
||||
"secret_value_display_allowed": False,
|
||||
"raw_runtime_payload_display_allowed": False,
|
||||
"internal_collaboration_content_display_allowed": False,
|
||||
}
|
||||
mismatches = _mismatches(contract, expected)
|
||||
if mismatches:
|
||||
raise ValueError(f"{label}: display_redaction_contract mismatch: {mismatches}")
|
||||
if not contract.get("frontend_display_policy"):
|
||||
raise ValueError(f"{label}: display_redaction_contract.frontend_display_policy is required")
|
||||
|
||||
|
||||
def _require_no_forbidden_display_terms(payload: dict[str, Any], label: str) -> None:
|
||||
forbidden_terms = {
|
||||
"work_window_transcript",
|
||||
"session_id",
|
||||
"browser_context",
|
||||
"authorization_header",
|
||||
"raw Telegram payload",
|
||||
"private reasoning",
|
||||
"raw prompt",
|
||||
"chain-of-thought",
|
||||
"批准!繼續",
|
||||
"My request for Codex",
|
||||
"In app browser",
|
||||
}
|
||||
display_blob = json.dumps(
|
||||
{
|
||||
"program_status": payload.get("program_status"),
|
||||
"release_authorization_readbacks": payload.get("release_authorization_readbacks"),
|
||||
"rollback_release_readbacks": payload.get("rollback_release_readbacks"),
|
||||
"maintenance_window_readback_holds": payload.get("maintenance_window_readback_holds"),
|
||||
"live_apply_release_readback_holds": payload.get("live_apply_release_readback_holds"),
|
||||
"operator_actions": payload.get("operator_actions"),
|
||||
"display_redaction_contract": payload.get("display_redaction_contract"),
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
leaked = sorted(term for term in forbidden_terms if term in display_blob)
|
||||
if leaked:
|
||||
raise ValueError(f"{label}: forbidden display terms leaked: {leaked}")
|
||||
|
||||
|
||||
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
|
||||
rollups = payload.get("rollups") or {}
|
||||
expected = {
|
||||
"release_authorization_readback_count": 5,
|
||||
"rollback_release_readback_count": 5,
|
||||
"maintenance_window_readback_hold_count": 5,
|
||||
"live_apply_release_readback_hold_count": 5,
|
||||
"blocked_release_readback_transition_count": 6,
|
||||
"operator_action_count": 5,
|
||||
"approval_required_authorization_readback_count": 2,
|
||||
"blocked_authorization_readback_count": 1,
|
||||
"approval_required_rollback_readback_count": 2,
|
||||
"blocked_rollback_readback_count": 1,
|
||||
"approval_required_maintenance_window_count": 2,
|
||||
"blocked_maintenance_window_count": 1,
|
||||
"approval_required_live_apply_readback_count": 2,
|
||||
"blocked_live_apply_readback_count": 1,
|
||||
"critical_blocker_count": 5,
|
||||
"owner_release_authorized_count": 0,
|
||||
"owner_release_approved_count": 0,
|
||||
"maintenance_window_approved_count": 0,
|
||||
"rollback_owner_confirmed_count": 0,
|
||||
"post_release_verifier_ready_count": 0,
|
||||
"final_release_candidate_approved_count": 0,
|
||||
"final_release_candidate_pass_count": 0,
|
||||
"release_authorization_granted_count": 0,
|
||||
"release_authorization_pass_count": 0,
|
||||
"rollback_release_pass_count": 0,
|
||||
"live_apply_release_pass_count": 0,
|
||||
"writer_apply_count": 0,
|
||||
"execution_apply_count": 0,
|
||||
"receipt_write_count": 0,
|
||||
"reviewer_queue_write_count": 0,
|
||||
"gateway_queue_write_count": 0,
|
||||
"telegram_send_count": 0,
|
||||
"bot_api_call_count": 0,
|
||||
"report_receipt_write_count": 0,
|
||||
"result_capture_write_count": 0,
|
||||
"learning_write_count": 0,
|
||||
"playbook_trust_write_count": 0,
|
||||
"production_write_count": 0,
|
||||
"secret_read_count": 0,
|
||||
"destructive_operation_count": 0,
|
||||
}
|
||||
mismatches = _mismatches(rollups, expected)
|
||||
if mismatches:
|
||||
raise ValueError(f"{label}: rollups mismatch: {mismatches}")
|
||||
|
||||
|
||||
def _require_valid_status(item: dict[str, Any], label: str, item_label: str) -> None:
|
||||
if item.get("status") not in {"ready_for_owner_review", "approval_required", "blocked_by_policy"}:
|
||||
raise ValueError(f"{label}: {item_label} status is invalid")
|
||||
|
||||
|
||||
def _require_ids(
|
||||
items: list[dict[str, Any]],
|
||||
id_field: str,
|
||||
expected_ids: set[str],
|
||||
label: str,
|
||||
item_label: str,
|
||||
) -> None:
|
||||
actual_ids = {str(item.get(id_field)) for item in items}
|
||||
if actual_ids != expected_ids:
|
||||
raise ValueError(f"{label}: {item_label} ids mismatch: expected={sorted(expected_ids)} actual={sorted(actual_ids)}")
|
||||
|
||||
|
||||
def _require_count(items: list[dict[str, Any]], expected_count: int, label: str, item_label: str) -> None:
|
||||
if len(items) != expected_count:
|
||||
raise ValueError(f"{label}: expected {expected_count} {item_label}, got {len(items)}")
|
||||
|
||||
|
||||
def _mismatches(data: dict[str, Any], expected: dict[str, Any]) -> dict[str, dict[str, Any]]:
|
||||
mismatches: dict[str, dict[str, Any]] = {}
|
||||
for key, expected_value in expected.items():
|
||||
actual_value = data.get(key)
|
||||
if actual_value != expected_value:
|
||||
mismatches[key] = {"expected": expected_value, "actual": actual_value}
|
||||
return mismatches
|
||||
|
||||
|
||||
def _is_redacted_sha256(value: object) -> bool:
|
||||
return isinstance(value, str) and re.fullmatch(r"sha256:[0-9a-f]{64}", value) is not None
|
||||
Reference in New Issue
Block a user