feat(governance): 新增 report live delivery 批准包
All checks were successful
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / tests (push) Successful in 1m25s
CD Pipeline / build-and-deploy (push) Successful in 4m46s
CD Pipeline / post-deploy-checks (push) Successful in 1m32s

This commit is contained in:
Your Name
2026-06-13 16:15:18 +08:00
parent 060bcc0cb9
commit ab242018b1
14 changed files with 2543 additions and 10 deletions

View File

@@ -109,6 +109,9 @@ from src.services.ai_agent_runtime_readback_approval_package import (
from src.services.ai_agent_runtime_readback_implementation_review import (
load_latest_ai_agent_runtime_readback_implementation_review,
)
from src.services.ai_agent_report_live_delivery_approval_package import (
load_latest_ai_agent_report_live_delivery_approval_package,
)
from src.services.ai_agent_report_automation_review import (
load_latest_ai_agent_report_automation_review,
)
@@ -1359,6 +1362,36 @@ async def get_agent_runtime_readback_implementation_review() -> dict[str, Any]:
) from exc
@router.get(
"/agent-report-live-delivery-approval-package",
response_model=dict[str, Any],
summary="取得 AI Agent report live delivery 批准包",
description=(
"讀取最新已提交的 P2-111 report live delivery approval package"
"此端點只回傳日報、週報、月報、失敗限定摘要與讀報回執的實發批准包、"
"route lock、payload redaction、no-send receipt 與 operator action"
"不排程、不寫 Gateway queue、不送 Telegram、不呼叫 Bot API、不寫 report receipt、"
"不啟動 AI analysis、不做中低風險自動優化、不寫 production target、不讀 secret。"
),
)
async def get_agent_report_live_delivery_approval_package() -> dict[str, Any]:
"""Return the latest read-only report live delivery approval package."""
try:
payload = await asyncio.to_thread(load_latest_ai_agent_report_live_delivery_approval_package)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("ai_agent_report_live_delivery_approval_package_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AI Agent report live delivery approval package 無效",
) from exc
@router.get(
"/agent-owner-approved-fixture-dry-run",
response_model=dict[str, Any],

View File

@@ -0,0 +1,418 @@
"""
AI Agent report live delivery approval package snapshot.
Loads the latest committed P2-111 report delivery approval package. This module
validates committed evidence only; it never schedules report delivery, writes
Gateway queues, sends Telegram messages, calls Bot API, writes read receipts,
starts AI analysis workers, writes production optimization results, reads
secrets, or runs destructive operations.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_report_live_delivery_approval_package_*.json"
_SCHEMA_VERSION = "ai_agent_report_live_delivery_approval_package_v1"
_RUNTIME_AUTHORITY = "report_live_delivery_approval_package_only_no_live_send_or_write"
def load_latest_ai_agent_report_live_delivery_approval_package(
evaluations_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the newest committed report live delivery approval package."""
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
if not candidates:
raise FileNotFoundError(f"no AI Agent report live delivery approval package snapshots found in {directory}")
latest = candidates[-1]
with latest.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{latest}: expected JSON object")
_require_schema(payload, str(latest))
_require_prior_report_status_board(payload, str(latest))
_require_prior_runtime_review(payload, str(latest))
_require_delivery_truth(payload, str(latest))
_require_delivery_packets(payload, str(latest))
_require_route_lock_gates(payload, str(latest))
_require_payload_redaction_checks(payload, str(latest))
_require_dry_run_receipts(payload, str(latest))
_require_operator_actions(payload, str(latest))
_require_display_redaction(payload, str(latest))
_require_no_forbidden_display_terms(payload, str(latest))
_require_rollup_consistency(payload, str(latest))
return payload
def _require_schema(payload: dict[str, Any], label: str) -> None:
if payload.get("schema_version") != _SCHEMA_VERSION:
raise ValueError(f"{label}: expected schema_version={_SCHEMA_VERSION}")
status = payload.get("program_status") or {}
if status.get("read_only_mode") is not True:
raise ValueError(f"{label}: program_status.read_only_mode must be true")
if status.get("runtime_authority") != _RUNTIME_AUTHORITY:
raise ValueError(f"{label}: runtime_authority must remain {_RUNTIME_AUTHORITY}")
if status.get("current_task_id") != "P2-111":
raise ValueError(f"{label}: current_task_id must be P2-111")
if status.get("next_task_id") != "P2-112":
raise ValueError(f"{label}: next_task_id must be P2-112")
if status.get("overall_completion_percent") != 100:
raise ValueError(f"{label}: P2-111 approval package must be 100 percent complete")
def _require_prior_report_status_board(payload: dict[str, Any], label: str) -> None:
prior = payload.get("prior_report_status_board") or {}
expected = {
"source_schema_version": "ai_agent_report_status_board_v1",
"report_card_count": 3,
"agent_status_report_count": 3,
"visible_chart_count": 3,
"operator_answer_count": 4,
"work_units_total": 91,
"work_units_done": 79,
"work_units_waiting_approval": 12,
"live_delivery_count": 0,
"live_telegram_send_count_24h": 0,
"live_auto_optimization_count_24h": 0,
}
mismatches = _mismatches(prior, expected)
if mismatches:
raise ValueError(f"{label}: P2-108 prior report status counts mismatch: {mismatches}")
if not prior.get("readiness_note"):
raise ValueError(f"{label}: prior_report_status_board.readiness_note is required")
def _require_prior_runtime_review(payload: dict[str, Any], label: str) -> None:
prior = payload.get("prior_runtime_review") or {}
expected = {
"approval_package_schema_version": "ai_agent_runtime_readback_approval_package_v1",
"implementation_review_schema_version": "ai_agent_runtime_readback_implementation_review_v1",
"telegram_failure_receipt_gate_count": 4,
"implementation_blocker_count": 5,
"no_write_verifier_check_count": 5,
"owner_approval_received_count": 0,
"runtime_readback_execution_count": 0,
"live_query_count": 0,
"telegram_failure_receipt_send_count": 0,
"bot_api_call_count": 0,
"gateway_queue_write_count": 0,
"production_write_count": 0,
}
mismatches = _mismatches(prior, expected)
if mismatches:
raise ValueError(f"{label}: P2-109/P2-110 prior runtime counts mismatch: {mismatches}")
if not prior.get("readiness_note"):
raise ValueError(f"{label}: prior_runtime_review.readiness_note is required")
def _require_delivery_truth(payload: dict[str, Any], label: str) -> None:
truth = payload.get("delivery_approval_truth") or {}
required_true = {
"p2_108_report_status_loaded",
"p2_109_failure_receipt_gate_loaded",
"p2_110_implementation_review_loaded",
"delivery_approval_package_ready",
"daily_delivery_package_ready",
"weekly_delivery_package_ready",
"monthly_delivery_package_ready",
"sre_war_room_route_locked",
"payload_redaction_ready",
"dry_run_receipt_ready",
"owner_review_required_before_delivery",
}
missing = sorted(field for field in required_true if truth.get(field) is not True)
if missing:
raise ValueError(f"{label}: delivery approval ready flags must remain true: {missing}")
required_false = {
"scheduler_enabled",
"gateway_queue_write_enabled",
"telegram_send_enabled",
"bot_api_call_enabled",
"report_receipt_write_enabled",
"ai_analysis_run_enabled",
"medium_low_auto_optimization_enabled",
"production_write_enabled",
"secret_read_enabled",
"destructive_operation_enabled",
}
unsafe = sorted(field for field in required_false if truth.get(field) is not False)
if unsafe:
raise ValueError(f"{label}: live send/write flags must remain false: {unsafe}")
zero_counts = {
"owner_approval_received_count",
"scheduled_delivery_count_24h",
"gateway_queue_write_count_24h",
"telegram_send_count_24h",
"bot_api_call_count_24h",
"report_receipt_write_count_24h",
"ai_analysis_run_count_24h",
"auto_optimization_count_24h",
"production_write_count_24h",
"secret_read_count_24h",
"destructive_operation_count_24h",
}
non_zero = sorted(field for field in zero_counts if truth.get(field) != 0)
if non_zero:
raise ValueError(f"{label}: report delivery live counters must remain zero: {non_zero}")
if not truth.get("truth_note"):
raise ValueError(f"{label}: delivery_approval_truth.truth_note is required")
def _require_delivery_packets(payload: dict[str, Any], label: str) -> None:
packets = payload.get("delivery_approval_packets") or []
packet_ids = {packet.get("packet_id") for packet in packets}
required = {
"daily_report_delivery_approval",
"weekly_report_delivery_approval",
"monthly_report_delivery_approval",
"failure_only_digest_approval",
"report_receipt_readback_approval",
}
if packet_ids != required:
raise ValueError(f"{label}: delivery approval packets must match {sorted(required)}")
valid_statuses = {"approval_required", "ready_for_owner_review", "blocked_by_policy"}
valid_risks = {"medium", "high", "critical"}
for packet in packets:
packet_id = packet.get("packet_id")
if packet.get("status") not in valid_statuses:
raise ValueError(f"{label}: packet {packet_id} status is invalid")
if packet.get("risk_tier") not in valid_risks:
raise ValueError(f"{label}: packet {packet_id} risk_tier is invalid")
if packet.get("approval_required") is not True:
raise ValueError(f"{label}: packet {packet_id} approval_required must remain true")
if packet.get("no_send_mode") is not True:
raise ValueError(f"{label}: packet {packet_id} no_send_mode must remain true")
if not packet.get("required_approval_fields") or not packet.get("blocked_runtime_actions"):
raise ValueError(f"{label}: packet {packet_id} must list approval fields and blocked actions")
if not packet.get("operator_guidance"):
raise ValueError(f"{label}: packet {packet_id} must include operator guidance")
if not _is_redacted_sha256(packet.get("evidence_hash")):
raise ValueError(f"{label}: packet {packet_id} must expose evidence_hash")
def _require_route_lock_gates(payload: dict[str, Any], label: str) -> None:
gates = payload.get("route_lock_gates") or []
gate_ids = {gate.get("gate_id") for gate in gates}
required = {
"sre_war_room_route_lock",
"legacy_bot_suppression",
"dedupe_fingerprint_lock",
"delivery_window_gate",
}
if gate_ids != required:
raise ValueError(f"{label}: route lock gates must match {sorted(required)}")
for gate in gates:
gate_id = gate.get("gate_id")
if gate.get("status") not in {"ready_for_owner_review", "approval_required", "blocked_by_policy"}:
raise ValueError(f"{label}: route gate {gate_id} status is invalid")
if not gate.get("required_evidence") or not gate.get("blocked_routes"):
raise ValueError(f"{label}: route gate {gate_id} must list evidence and blocked routes")
for field in ("telegram_send_enabled", "bot_api_call_enabled", "gateway_queue_write_enabled"):
if gate.get(field) is not False:
raise ValueError(f"{label}: route gate {gate_id} {field} must remain false")
if not _is_redacted_sha256(gate.get("evidence_hash")):
raise ValueError(f"{label}: route gate {gate_id} must expose evidence_hash")
def _require_payload_redaction_checks(payload: dict[str, Any], label: str) -> None:
checks = payload.get("payload_redaction_checks") or []
check_ids = {check.get("check_id") for check in checks}
required = {
"no_raw_prompt",
"no_private_reasoning",
"no_secret_values",
"no_raw_telegram_payload",
"no_internal_collaboration_content",
}
if check_ids != required:
raise ValueError(f"{label}: payload redaction checks must match {sorted(required)}")
for check in checks:
check_id = check.get("check_id")
if check.get("status") not in {"ready", "blocked_by_policy"}:
raise ValueError(f"{label}: redaction check {check_id} status is invalid")
if check.get("display_allowed") is not False:
raise ValueError(f"{label}: redaction check {check_id} display_allowed must remain false")
if not check.get("required_rule") or not check.get("failure_if_missing"):
raise ValueError(f"{label}: redaction check {check_id} must include rule and failure explanation")
if not _is_redacted_sha256(check.get("evidence_hash")):
raise ValueError(f"{label}: redaction check {check_id} must expose evidence_hash")
def _require_dry_run_receipts(payload: dict[str, Any], label: str) -> None:
receipts = payload.get("dry_run_delivery_receipts") or []
receipt_ids = {receipt.get("receipt_id") for receipt in receipts}
required = {
"daily_digest_no_send_receipt",
"weekly_digest_no_send_receipt",
"monthly_digest_no_send_receipt",
"failure_only_no_send_receipt",
}
if receipt_ids != required:
raise ValueError(f"{label}: dry-run receipts must match {sorted(required)}")
for receipt in receipts:
receipt_id = receipt.get("receipt_id")
if receipt.get("status") not in {"ready_for_owner_review", "blocked_by_policy"}:
raise ValueError(f"{label}: dry-run receipt {receipt_id} status is invalid")
if receipt.get("live_send_count") != 0:
raise ValueError(f"{label}: dry-run receipt {receipt_id} live_send_count must remain zero")
if receipt.get("receipt_write_allowed") is not False:
raise ValueError(f"{label}: dry-run receipt {receipt_id} receipt_write_allowed must remain false")
if not receipt.get("required_fields"):
raise ValueError(f"{label}: dry-run receipt {receipt_id} must list required fields")
if not _is_redacted_sha256(receipt.get("evidence_hash")):
raise ValueError(f"{label}: dry-run receipt {receipt_id} must expose evidence_hash")
def _require_operator_actions(payload: dict[str, Any], label: str) -> None:
actions = payload.get("operator_actions") or []
action_types = {action.get("action_type") for action in actions}
required = {
"review_delivery_packet",
"validate_sre_route",
"validate_payload_redaction",
"validate_zero_send_counters",
"reject_or_promote",
}
if action_types != required:
raise ValueError(f"{label}: operator actions must match {sorted(required)}")
for action in actions:
if action.get("live_send_allowed") is not False:
raise ValueError(f"{label}: operator action {action.get('action_id')} must not allow live send")
if not action.get("operator_instruction"):
raise ValueError(f"{label}: operator action {action.get('action_id')} must include instruction")
def _require_display_redaction(payload: dict[str, Any], label: str) -> None:
contract = payload.get("display_redaction_contract") or {}
if contract.get("redaction_required") is not True:
raise ValueError(f"{label}: display redaction must remain required")
required_false = {
"raw_prompt_display_allowed",
"private_reasoning_display_allowed",
"secret_value_display_allowed",
"raw_telegram_payload_display_allowed",
"internal_collaboration_content_display_allowed",
}
unsafe = sorted(field for field in required_false if contract.get(field) is not False)
if unsafe:
raise ValueError(f"{label}: display redaction fields must remain false: {unsafe}")
if not contract.get("frontend_display_policy"):
raise ValueError(f"{label}: frontend_display_policy is required")
def _require_no_forbidden_display_terms(payload: dict[str, Any], label: str) -> None:
forbidden_terms = {
"工作視窗",
"對話內容",
"批准!繼續",
"In app browser",
"My request for Codex",
"browser_context",
"codex_user_message",
"prompt_text",
"raw prompt",
"private reasoning",
"chain of thought",
"private_reasoning",
"chain_of_thought",
"authorization_header",
"authorization header",
"secret value",
"raw payload",
"raw Telegram payload",
"work window transcript",
"internal collaboration transcript",
}
technical_identifier_fields = {
"action_id",
"action_type",
"check_id",
"gate_id",
"packet_id",
"receipt_id",
"required_rule",
}
hits: list[str] = []
def walk(value: Any, path: str) -> None:
if isinstance(value, dict):
for key, nested in value.items():
walk(nested, f"{path}.{key}" if path else str(key))
return
if isinstance(value, list):
for index, nested in enumerate(value):
walk(nested, f"{path}[{index}]")
return
if isinstance(value, str):
field_name = path.rsplit(".", 1)[-1]
if field_name in technical_identifier_fields:
return
matched = sorted(term for term in forbidden_terms if term in value)
if matched:
hits.append(f"{path}: {', '.join(matched)}")
walk(payload, "")
if hits:
raise ValueError(f"{label}: forbidden display terms found: {hits}")
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
rollups = payload.get("rollups") or {}
truth = payload.get("delivery_approval_truth") or {}
packets = payload.get("delivery_approval_packets") or []
gates = payload.get("route_lock_gates") or []
checks = payload.get("payload_redaction_checks") or []
receipts = payload.get("dry_run_delivery_receipts") or []
actions = payload.get("operator_actions") or []
expected = {
"delivery_approval_packet_count": len(packets),
"route_lock_gate_count": len(gates),
"payload_redaction_check_count": len(checks),
"dry_run_delivery_receipt_count": len(receipts),
"operator_action_count": len(actions),
"approval_required_packet_count": sum(1 for packet in packets if packet.get("status") == "approval_required"),
"blocked_packet_count": sum(1 for packet in packets if packet.get("status") == "blocked_by_policy"),
"blocked_route_gate_count": sum(1 for gate in gates if gate.get("status") == "blocked_by_policy"),
"blocked_receipt_count": sum(1 for receipt in receipts if receipt.get("status") == "blocked_by_policy"),
"owner_approval_received_count": truth.get("owner_approval_received_count"),
"scheduled_delivery_count": truth.get("scheduled_delivery_count_24h"),
"gateway_queue_write_count": truth.get("gateway_queue_write_count_24h"),
"telegram_send_count": truth.get("telegram_send_count_24h"),
"bot_api_call_count": truth.get("bot_api_call_count_24h"),
"report_receipt_write_count": truth.get("report_receipt_write_count_24h"),
"ai_analysis_run_count": truth.get("ai_analysis_run_count_24h"),
"auto_optimization_count": truth.get("auto_optimization_count_24h"),
"production_write_count": truth.get("production_write_count_24h"),
"secret_read_count": truth.get("secret_read_count_24h"),
"destructive_operation_count": truth.get("destructive_operation_count_24h"),
}
mismatches = _mismatches(rollups, expected)
if mismatches:
raise ValueError(f"{label}: rollup counts mismatch: {mismatches}")
def _mismatches(payload: dict[str, Any], expected: dict[str, Any]) -> dict[str, dict[str, Any]]:
return {
key: {"expected": expected_value, "actual": payload.get(key)}
for key, expected_value in expected.items()
if payload.get(key) != expected_value
}
def _is_redacted_sha256(value: Any) -> bool:
if not isinstance(value, str):
return False
if not value.startswith("sha256:") or len(value) != 71:
return False
return all(char in "0123456789abcdef" for char in value.removeprefix("sha256:"))