feat(governance): 新增 owner-approved fixture promotion gate
All checks were successful
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / tests (push) Successful in 1m23s
CD Pipeline / build-and-deploy (push) Successful in 4m48s
CD Pipeline / post-deploy-checks (push) Successful in 1m29s

This commit is contained in:
Your Name
2026-06-13 18:36:21 +08:00
parent 79a3e1bd18
commit 8fcf767aad
10 changed files with 2060 additions and 1 deletions

View File

@@ -82,6 +82,9 @@ from src.services.ai_agent_matched_playbook_learning_gap import (
from src.services.ai_agent_owner_approved_fixture_dry_run import (
load_latest_ai_agent_owner_approved_fixture_dry_run,
)
from src.services.ai_agent_owner_approved_fixture_promotion_gate import (
load_latest_ai_agent_owner_approved_fixture_promotion_gate,
)
from src.services.ai_agent_owner_approved_learning_dry_run import (
load_latest_ai_agent_owner_approved_learning_dry_run,
)
@@ -1458,6 +1461,36 @@ async def get_agent_runtime_readback_promotion_gate() -> dict[str, Any]:
) from exc
@router.get(
"/agent-owner-approved-fixture-promotion-gate",
response_model=dict[str, Any],
summary="取得 AI Agent owner-approved fixture promotion gate",
description=(
"讀取最新已提交的 P2-114 owner-approved fixture promotion gate"
"此端點只回傳 owner approval packet、acceptance template、fixture review、"
"no-write verifier 與 blocked promotion不讀 canonical runtime target、不做 live query、"
"不寫 reviewer queue、不寫 Gateway queue、不送 Telegram、不呼叫 Bot API、"
"不寫 report receipt、不寫 result capture、不寫 learning / PlayBook trust、不讀 secret。"
),
)
async def get_agent_owner_approved_fixture_promotion_gate() -> dict[str, Any]:
"""Return the latest read-only owner-approved fixture promotion gate."""
try:
payload = await asyncio.to_thread(load_latest_ai_agent_owner_approved_fixture_promotion_gate)
return redact_public_lan_topology(payload)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("ai_agent_owner_approved_fixture_promotion_gate_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AI Agent owner-approved fixture promotion gate 無效",
) from exc
@router.get(
"/agent-owner-approved-fixture-dry-run",
response_model=dict[str, Any],

View File

@@ -0,0 +1,399 @@
"""
AI Agent owner-approved fixture promotion gate snapshot.
Loads the latest committed P2-114 owner approval package. This module validates
committed evidence only; it never reads canonical runtime targets, performs live
queries, writes reviewer queues, writes result captures, writes Gateway queues,
sends Telegram messages, calls Bot API, reads secrets, or performs destructive
operations.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_owner_approved_fixture_promotion_gate_*.json"
_SCHEMA_VERSION = "ai_agent_owner_approved_fixture_promotion_gate_v1"
_RUNTIME_AUTHORITY = "owner_approved_fixture_promotion_gate_only_no_live_read_or_write"
def load_latest_ai_agent_owner_approved_fixture_promotion_gate(
evaluations_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the newest committed owner-approved fixture promotion gate."""
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
if not candidates:
raise FileNotFoundError(f"no AI Agent owner-approved fixture promotion gate snapshots found in {directory}")
latest = candidates[-1]
with latest.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{latest}: expected JSON object")
label = str(latest)
_require_schema(payload, label)
_require_prior(payload, label)
_require_truth(payload, label)
_require_packets(payload, label)
_require_acceptance_templates(payload, label)
_require_fixture_reviews(payload, label)
_require_verifier_plans(payload, label)
_require_blocked_promotions(payload, label)
_require_actions(payload, label)
_require_display_redaction(payload, label)
_require_no_forbidden_display_terms(payload, label)
_require_rollup_consistency(payload, label)
return payload
def _require_schema(payload: dict[str, Any], label: str) -> None:
if payload.get("schema_version") != _SCHEMA_VERSION:
raise ValueError(f"{label}: expected schema_version={_SCHEMA_VERSION}")
status = payload.get("program_status") or {}
expected = {
"current_priority": "P2",
"current_task_id": "P2-114",
"next_task_id": "P2-115",
"read_only_mode": True,
"runtime_authority": _RUNTIME_AUTHORITY,
"overall_completion_percent": 100,
}
mismatches = _mismatches(status, expected)
if mismatches:
raise ValueError(f"{label}: program_status mismatch: {mismatches}")
if not status.get("status_note"):
raise ValueError(f"{label}: program_status.status_note is required")
def _require_prior(payload: dict[str, Any], label: str) -> None:
prior = payload.get("prior_promotion_gate") or {}
expected = {
"schema_version": "ai_agent_runtime_readback_promotion_gate_v1",
"promotion_lane_count": 5,
"receipt_contract_count": 4,
"reviewer_queue_preview_count": 4,
"result_capture_preview_count": 4,
"no_write_verifier_check_count": 5,
"blocker_mapping_count": 5,
"operator_action_count": 5,
"owner_approval_received_count": 0,
"promotion_execution_count": 0,
"canonical_runtime_target_read_count": 0,
"live_query_count": 0,
"production_write_count": 0,
}
mismatches = _mismatches(prior, expected)
if mismatches:
raise ValueError(f"{label}: prior_promotion_gate mismatch: {mismatches}")
if not prior.get("readiness_note"):
raise ValueError(f"{label}: prior_promotion_gate.readiness_note is required")
def _require_truth(payload: dict[str, Any], label: str) -> None:
truth = payload.get("owner_gate_truth") or {}
required_true = {
"p2_113_promotion_gate_loaded",
"owner_promotion_package_ready",
"acceptance_record_template_ready",
"reviewer_queue_fixture_ready",
"result_capture_fixture_ready",
"rollback_owner_required",
"verifier_plan_required",
}
missing = sorted(field for field in required_true if truth.get(field) is not True)
if missing:
raise ValueError(f"{label}: owner gate ready flags must remain true: {missing}")
if truth.get("owner_approval_received") is not False:
raise ValueError(f"{label}: owner approval must remain false before acceptance")
required_false = {
"canonical_runtime_target_read_enabled",
"live_query_enabled",
"failure_receipt_send_enabled",
"reviewer_queue_write_enabled",
"gateway_queue_write_enabled",
"telegram_send_enabled",
"bot_api_call_enabled",
"report_receipt_write_enabled",
"result_capture_write_enabled",
"learning_write_enabled",
"playbook_trust_write_enabled",
"production_write_enabled",
"secret_read_enabled",
"destructive_operation_enabled",
}
unsafe = sorted(field for field in required_false if truth.get(field) is not False)
if unsafe:
raise ValueError(f"{label}: live read/send/write flags must remain false: {unsafe}")
zero_counts = {
"owner_approval_received_count",
"owner_acceptance_record_write_count",
"promotion_execution_count",
"canonical_runtime_target_read_count",
"live_query_count",
"failure_receipt_send_count",
"reviewer_queue_write_count",
"gateway_queue_write_count",
"telegram_send_count",
"bot_api_call_count",
"report_receipt_write_count",
"result_capture_write_count",
"learning_write_count",
"playbook_trust_write_count",
"production_write_count",
}
non_zero = sorted(field for field in zero_counts if truth.get(field) != 0)
if non_zero:
raise ValueError(f"{label}: owner promotion live counters must remain zero: {non_zero}")
if not truth.get("truth_note"):
raise ValueError(f"{label}: owner_gate_truth.truth_note is required")
def _require_packets(payload: dict[str, Any], label: str) -> None:
packets = payload.get("owner_approval_packets") or []
required = {
"failure_receipt_owner_packet",
"reviewer_queue_owner_packet",
"result_capture_owner_packet",
"report_receipt_owner_packet",
"p2_115_scope_owner_packet",
}
packet_ids = {packet.get("packet_id") for packet in packets}
if packet_ids != required:
raise ValueError(f"{label}: owner approval packets must match {sorted(required)}")
for packet in packets:
packet_id = packet.get("packet_id")
if packet.get("owner_acceptance_required") is not True:
raise ValueError(f"{label}: packet {packet_id} must require owner acceptance")
if packet.get("status") not in {"ready_for_owner_review", "approval_required", "blocked_by_policy"}:
raise ValueError(f"{label}: packet {packet_id} status is invalid")
if packet.get("risk_tier") not in {"high", "critical"}:
raise ValueError(f"{label}: packet {packet_id} risk_tier is invalid")
if not packet.get("required_owner_fields") or not packet.get("blocked_runtime_actions"):
raise ValueError(f"{label}: packet {packet_id} must list owner fields and blocked actions")
if not _is_redacted_sha256(packet.get("evidence_hash")):
raise ValueError(f"{label}: packet {packet_id} must expose redacted evidence_hash")
def _require_acceptance_templates(payload: dict[str, Any], label: str) -> None:
templates = payload.get("acceptance_record_templates") or []
if len(templates) != 4:
raise ValueError(f"{label}: acceptance_record_templates must contain 4 items")
for template in templates:
template_id = template.get("template_id")
if template.get("accepted") is not False or template.get("record_write_enabled") is not False:
raise ValueError(f"{label}: template {template_id} must not be accepted or write-enabled")
if not template.get("required_fields"):
raise ValueError(f"{label}: template {template_id} required_fields is required")
if not _is_redacted_sha256(template.get("evidence_hash")):
raise ValueError(f"{label}: template {template_id} must expose redacted evidence_hash")
def _require_fixture_reviews(payload: dict[str, Any], label: str) -> None:
reviews = payload.get("fixture_promotion_reviews") or []
if len(reviews) != 4:
raise ValueError(f"{label}: fixture_promotion_reviews must contain 4 items")
for review in reviews:
review_id = review.get("review_id")
if review.get("runtime_write_enabled") is not False:
raise ValueError(f"{label}: review {review_id} must not enable runtime write")
if not review.get("source_packet_id") or not review.get("review_outcome"):
raise ValueError(f"{label}: review {review_id} source/outcome is required")
if not _is_redacted_sha256(review.get("evidence_hash")):
raise ValueError(f"{label}: review {review_id} must expose redacted evidence_hash")
def _require_verifier_plans(payload: dict[str, Any], label: str) -> None:
plans = payload.get("no_write_verifier_plans") or []
required = {
"no_telegram_send_verifier",
"no_reviewer_queue_write_verifier",
"no_result_capture_write_verifier",
"no_live_readback_verifier",
"no_secret_payload_verifier",
}
plan_ids = {plan.get("plan_id") for plan in plans}
if plan_ids != required:
raise ValueError(f"{label}: no-write verifier plans must match {sorted(required)}")
for plan in plans:
plan_id = plan.get("plan_id")
if plan.get("live_verifier_enabled") is not False:
raise ValueError(f"{label}: verifier plan {plan_id} must not enable live verifier")
if not plan.get("required_fixture") or not plan.get("failure_if_missing"):
raise ValueError(f"{label}: verifier plan {plan_id} must include fixture and failure text")
if not _is_redacted_sha256(plan.get("evidence_hash")):
raise ValueError(f"{label}: verifier plan {plan_id} must expose redacted evidence_hash")
def _require_blocked_promotions(payload: dict[str, Any], label: str) -> None:
blockers = payload.get("blocked_promotions") or []
required = {
"owner_acceptance_not_received",
"rollback_owner_missing",
"maintenance_window_missing",
"canonical_readback_scope_missing",
"secret_boundary_not_verified",
}
blocker_ids = {blocker.get("blocker_id") for blocker in blockers}
if blocker_ids != required:
raise ValueError(f"{label}: blocked promotions must match {sorted(required)}")
for blocker in blockers:
blocker_id = blocker.get("blocker_id")
if blocker.get("severity") not in {"high", "critical"}:
raise ValueError(f"{label}: blocker {blocker_id} severity is invalid")
if blocker.get("status") not in {"approval_required", "blocked_by_policy"}:
raise ValueError(f"{label}: blocker {blocker_id} status is invalid")
if not blocker.get("blocked_action") or not blocker.get("blocked_until"):
raise ValueError(f"{label}: blocker {blocker_id} blocked action/until is required")
if not _is_redacted_sha256(blocker.get("evidence_hash")):
raise ValueError(f"{label}: blocker {blocker_id} must expose redacted evidence_hash")
def _require_actions(payload: dict[str, Any], label: str) -> None:
actions = payload.get("operator_actions") or []
required = {
"review_owner_packets",
"verify_acceptance_templates",
"confirm_verifier_plans",
"lock_blocked_promotions",
"promote_to_p2_115",
}
action_ids = {action.get("action_id") for action in actions}
if action_ids != required:
raise ValueError(f"{label}: operator actions must match {sorted(required)}")
for action in actions:
action_id = action.get("action_id")
if action.get("runtime_promotion_allowed") is not False:
raise ValueError(f"{label}: action {action_id} must not allow runtime promotion")
if not action.get("operator_instruction"):
raise ValueError(f"{label}: action {action_id} operator_instruction is required")
def _require_display_redaction(payload: dict[str, Any], label: str) -> None:
contract = payload.get("display_redaction_contract") or {}
if contract.get("redaction_required") is not True:
raise ValueError(f"{label}: display redaction must be required")
false_fields = {
"raw_prompt_display_allowed",
"private_reasoning_display_allowed",
"secret_value_display_allowed",
"raw_runtime_payload_display_allowed",
"internal_collaboration_content_display_allowed",
}
unsafe = sorted(field for field in false_fields if contract.get(field) is not False)
if unsafe:
raise ValueError(f"{label}: display redaction flags must remain false: {unsafe}")
if not contract.get("frontend_display_policy"):
raise ValueError(f"{label}: frontend_display_policy is required")
def _require_no_forbidden_display_terms(payload: dict[str, Any], label: str) -> None:
serialized = json.dumps(payload, ensure_ascii=False).lower()
forbidden = {
"work_window_transcript",
"session_id",
"browser_context",
"authorization_header",
"raw telegram payload",
"private reasoning",
"raw prompt",
"chain-of-thought",
}
hits = sorted(term for term in forbidden if term in serialized)
if hits:
raise ValueError(f"{label}: forbidden display terms leaked: {hits}")
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
rollups = payload.get("rollups") or {}
expected_counts = {
"owner_approval_packet_count": len(payload.get("owner_approval_packets") or []),
"acceptance_record_template_count": len(payload.get("acceptance_record_templates") or []),
"fixture_promotion_review_count": len(payload.get("fixture_promotion_reviews") or []),
"no_write_verifier_plan_count": len(payload.get("no_write_verifier_plans") or []),
"blocked_promotion_count": len(payload.get("blocked_promotions") or []),
"operator_action_count": len(payload.get("operator_actions") or []),
"approval_required_packet_count": sum(
1 for packet in payload.get("owner_approval_packets") or [] if packet.get("status") == "approval_required"
),
"blocked_packet_count": sum(
1 for packet in payload.get("owner_approval_packets") or [] if packet.get("status") == "blocked_by_policy"
),
"approval_required_template_count": sum(
1
for template in payload.get("acceptance_record_templates") or []
if template.get("status") == "approval_required"
),
"blocked_template_count": sum(
1
for template in payload.get("acceptance_record_templates") or []
if template.get("status") == "blocked_by_policy"
),
"approval_required_review_count": sum(
1 for review in payload.get("fixture_promotion_reviews") or [] if review.get("status") == "approval_required"
),
"blocked_review_count": sum(
1 for review in payload.get("fixture_promotion_reviews") or [] if review.get("status") == "blocked_by_policy"
),
"approval_required_verifier_count": sum(
1 for plan in payload.get("no_write_verifier_plans") or [] if plan.get("status") == "approval_required"
),
"blocked_verifier_count": sum(
1 for plan in payload.get("no_write_verifier_plans") or [] if plan.get("status") == "blocked_by_policy"
),
"critical_blocker_count": sum(
1 for blocker in payload.get("blocked_promotions") or [] if blocker.get("severity") == "critical"
),
}
mismatches = _mismatches(rollups, expected_counts)
if mismatches:
raise ValueError(f"{label}: rollup counts mismatch: {mismatches}")
zero_rollups = {
"owner_approval_received_count",
"owner_acceptance_record_write_count",
"promotion_execution_count",
"canonical_runtime_target_read_count",
"live_query_count",
"failure_receipt_send_count",
"reviewer_queue_write_count",
"gateway_queue_write_count",
"telegram_send_count",
"bot_api_call_count",
"report_receipt_write_count",
"result_capture_write_count",
"learning_write_count",
"playbook_trust_write_count",
"production_write_count",
"secret_read_count",
"destructive_operation_count",
}
non_zero = sorted(field for field in zero_rollups if rollups.get(field) != 0)
if non_zero:
raise ValueError(f"{label}: live/send/write rollups must remain zero: {non_zero}")
def _mismatches(actual: dict[str, Any], expected: dict[str, Any]) -> dict[str, dict[str, Any]]:
return {
key: {"expected": expected_value, "actual": actual.get(key)}
for key, expected_value in expected.items()
if actual.get(key) != expected_value
}
def _is_redacted_sha256(value: Any) -> bool:
if not isinstance(value, str):
return False
if not value.startswith("sha256:") or len(value) != len("sha256:") + 64:
return False
digest = value.split(":", 1)[1]
return all(char in "0123456789abcdef" for char in digest)