feat(governance): 新增操作類別權限模型
All checks were successful
Code Review / ai-code-review (push) Successful in 16s
CD Pipeline / tests (push) Successful in 1m24s
CD Pipeline / build-and-deploy (push) Successful in 4m45s
CD Pipeline / post-deploy-checks (push) Successful in 1m46s

This commit is contained in:
Your Name
2026-06-12 15:04:51 +08:00
parent b5112ccf65
commit 7c8bb3645b
14 changed files with 1994 additions and 10 deletions

View File

@@ -76,6 +76,9 @@ from src.services.ai_agent_owner_approved_fixture_dry_run import (
from src.services.ai_agent_owner_approved_learning_dry_run import (
load_latest_ai_agent_owner_approved_learning_dry_run,
)
from src.services.ai_agent_operation_permission_model import (
load_latest_ai_agent_operation_permission_model,
)
from src.services.ai_agent_post_write_verifier_package import (
load_latest_ai_agent_post_write_verifier_package,
)
@@ -1035,6 +1038,34 @@ async def get_agent_runtime_worker_shadow_gate() -> dict[str, Any]:
) from exc
@router.get(
"/agent-operation-permission-model",
response_model=dict[str, Any],
summary="取得 AI Agent 操作類別權限模型",
description=(
"讀取最新已提交的 P2-101 操作類別權限模型;此端點只回傳 permission lane、"
"operation category、Agent responsibility、gate transition 與 operator template"
"不啟動 runtime worker、不寫 Gateway queue、不送 Telegram、不呼叫 Bot API、"
"不寫讀報回執、不執行 verifier live readback、不寫 production target、不讀 secret。"
),
)
async def get_agent_operation_permission_model() -> dict[str, Any]:
"""Return the latest read-only AI Agent operation permission model."""
try:
return await asyncio.to_thread(load_latest_ai_agent_operation_permission_model)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except (json.JSONDecodeError, ValueError) as exc:
logger.error("ai_agent_operation_permission_model_invalid", error=str(exc))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AI Agent 操作類別權限模型無效",
) from exc
@router.get(
"/agent-owner-approved-fixture-dry-run",
response_model=dict[str, Any],

View File

@@ -0,0 +1,313 @@
"""
AI Agent operation permission model snapshot.
Loads the latest committed P2-101 operation category permission model.
This module validates repo-committed evidence only; it never enables runtime
workers, writes Gateway queues, sends Telegram messages, reads secrets, or
writes production targets.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_operation_permission_model_*.json"
_SCHEMA_VERSION = "ai_agent_operation_permission_model_v1"
_RUNTIME_AUTHORITY = "operation_permission_model_only_no_live_execution_or_send"
def load_latest_ai_agent_operation_permission_model(
evaluations_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the newest committed AI Agent operation permission model."""
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
if not candidates:
raise FileNotFoundError(f"no AI Agent operation permission model snapshots found in {directory}")
latest = candidates[-1]
with latest.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{latest}: expected JSON object")
_require_schema(payload, str(latest))
_require_no_live_boundaries(payload, str(latest))
_require_permission_lanes(payload, str(latest))
_require_operation_categories(payload, str(latest))
_require_agent_roles(payload, str(latest))
_require_gate_transitions(payload, str(latest))
_require_operator_templates(payload, str(latest))
_require_redaction_contract(payload, str(latest))
_require_rollup_consistency(payload, str(latest))
return payload
def _require_schema(payload: dict[str, Any], label: str) -> None:
if payload.get("schema_version") != _SCHEMA_VERSION:
raise ValueError(f"{label}: expected schema_version={_SCHEMA_VERSION}")
status = payload.get("program_status") or {}
if status.get("read_only_mode") is not True:
raise ValueError(f"{label}: program_status.read_only_mode must be true")
if status.get("runtime_authority") != _RUNTIME_AUTHORITY:
raise ValueError(f"{label}: runtime_authority must remain {_RUNTIME_AUTHORITY}")
if status.get("current_task_id") != "P2-101":
raise ValueError(f"{label}: current_task_id must be P2-101")
if status.get("next_task_id") != "P2-102":
raise ValueError(f"{label}: next_task_id must be P2-102")
def _require_no_live_boundaries(payload: dict[str, Any], label: str) -> None:
truth = payload.get("operation_permission_truth") or {}
required_true = {
"permission_model_ready",
"operation_category_matrix_ready",
"risk_tier_mapping_ready",
"agent_responsibility_mapping_ready",
"approval_gate_mapping_ready",
"manual_sop_lane_ready",
"p2_404_shadow_gate_handoff_ready",
}
missing = sorted(field for field in required_true if truth.get(field) is not True)
if missing:
raise ValueError(f"{label}: permission readiness flags must remain true: {missing}")
required_false = {
"runtime_execution_enabled",
"gateway_queue_write_enabled",
"telegram_send_enabled",
"telegram_bot_api_call_enabled",
"delivery_receipt_write_enabled",
"ai_runtime_worker_enabled",
"medium_low_auto_worker_enabled",
"post_action_verifier_live_readback_enabled",
"production_write_enabled",
"secret_value_read_enabled",
"paid_provider_call_enabled",
"host_or_cluster_command_enabled",
"destructive_operation_enabled",
"work_window_transcript_display_allowed",
}
unsafe = sorted(field for field in required_false if truth.get(field) is not False)
if unsafe:
raise ValueError(f"{label}: live execution/send/write flags must remain false: {unsafe}")
zero_counts = {
"runtime_execution_count_24h",
"gateway_queue_write_count_24h",
"telegram_send_count_24h",
"telegram_bot_api_call_count_24h",
"delivery_receipt_write_count_24h",
"ai_runtime_worker_run_count_24h",
"medium_low_auto_execution_count_24h",
"post_action_verifier_live_readback_count_24h",
"production_write_count_24h",
"secret_value_read_count_24h",
"paid_provider_call_count_24h",
"host_or_cluster_command_count_24h",
"destructive_operation_count_24h",
}
non_zero = sorted(field for field in zero_counts if truth.get(field) != 0)
if non_zero:
raise ValueError(f"{label}: live execution/send/write counts must remain zero: {non_zero}")
def _require_permission_lanes(payload: dict[str, Any], label: str) -> None:
lanes = payload.get("permission_lanes") or []
lane_ids = {lane.get("lane_id") for lane in lanes}
required = {
"observe_only",
"no_write_replay_allowed",
"proposal_only",
"human_approval_required",
"explicitly_blocked",
}
if lane_ids != required:
raise ValueError(f"{label}: permission lanes must match {sorted(required)}")
for lane in lanes:
lane_id = lane.get("lane_id")
if lane.get("live_execution_allowed") is not False:
raise ValueError(f"{label}: lane {lane_id} live_execution_allowed must remain false")
if lane.get("production_write_allowed") is not False:
raise ValueError(f"{label}: lane {lane_id} production_write_allowed must remain false")
def _require_operation_categories(payload: dict[str, Any], label: str) -> None:
categories = payload.get("operation_categories") or []
category_ids = {category.get("category_id") for category in categories}
required = {
"observe_inventory_read",
"diagnose_correlate_evidence",
"report_digest_queue_candidate",
"shadow_no_write_replay",
"manual_sop_draft",
"repair_candidate_proposal",
"low_risk_noop_execution",
"medium_risk_repair_execution",
"post_action_verifier_live_readback",
"telegram_gateway_queue_write",
"production_config_or_data_write",
"secret_or_paid_provider_access",
"destructive_host_or_cluster_action",
}
if category_ids != required:
raise ValueError(f"{label}: operation categories must match {sorted(required)}")
for category in categories:
category_id = category.get("category_id")
if category.get("queue_write_allowed") is not False:
raise ValueError(f"{label}: category {category_id} queue_write_allowed must remain false")
if category.get("telegram_send_allowed") is not False:
raise ValueError(f"{label}: category {category_id} telegram_send_allowed must remain false")
if category.get("production_write_allowed") is not False:
raise ValueError(f"{label}: category {category_id} production_write_allowed must remain false")
if category.get("secret_value_read_allowed") is not False:
raise ValueError(f"{label}: category {category_id} secret_value_read_allowed must remain false")
if category.get("destructive_action_allowed") is not False:
raise ValueError(f"{label}: category {category_id} destructive_action_allowed must remain false")
if category.get("live_execution_allowed") is not False:
raise ValueError(f"{label}: category {category_id} live_execution_allowed must remain false")
if not _is_redacted_sha256(category.get("evidence_hash")):
raise ValueError(f"{label}: category {category_id} must expose a redacted sha256 evidence_hash")
def _require_agent_roles(payload: dict[str, Any], label: str) -> None:
roles = payload.get("agent_permission_roles") or []
agents = {role.get("agent_id") for role in roles}
if agents != {"openclaw", "hermes", "nemotron"}:
raise ValueError(f"{label}: permission roles must include OpenClaw, Hermes, and NemoTron")
for role in roles:
if role.get("live_action_count_24h") != 0:
raise ValueError(f"{label}: agent {role.get('agent_id')} live_action_count_24h must remain zero")
if role.get("self_approval_allowed") is not False:
raise ValueError(f"{label}: agent {role.get('agent_id')} self_approval_allowed must remain false")
def _require_gate_transitions(payload: dict[str, Any], label: str) -> None:
gates = payload.get("gate_transitions") or []
gate_ids = {gate.get("gate_id") for gate in gates}
required = {
"p2_101_permission_review_gate",
"p2_102_dry_run_evidence_gate",
"gateway_queue_write_permission_gate",
"telegram_send_permission_gate",
"medium_low_auto_worker_permission_gate",
"post_action_verifier_live_gate",
"production_write_permission_gate",
"secret_or_paid_provider_gate",
}
if gate_ids != required:
raise ValueError(f"{label}: gate transitions must match {sorted(required)}")
for gate in gates:
gate_id = gate.get("gate_id")
if gate.get("opens_live_execution") is not False:
raise ValueError(f"{label}: gate {gate_id} opens_live_execution must remain false")
if gate.get("current_status") not in {"ready_for_review", "blocked_until_evidence", "blocked_by_policy"}:
raise ValueError(f"{label}: gate {gate_id} current_status is invalid")
def _require_operator_templates(payload: dict[str, Any], label: str) -> None:
templates = payload.get("operator_decision_templates") or []
template_ids = {template.get("template_id") for template in templates}
required = {
"evidence_collect_next_step",
"manual_sop_next_step",
"repair_proposal_next_step",
"queue_candidate_next_step",
"rollback_or_fix_next_step",
}
if template_ids != required:
raise ValueError(f"{label}: operator templates must match {sorted(required)}")
for template in templates:
if template.get("creates_runtime_action") is not False:
raise ValueError(f"{label}: template {template.get('template_id')} creates_runtime_action must remain false")
if template.get("requires_human_review") is not True:
raise ValueError(f"{label}: template {template.get('template_id')} requires_human_review must remain true")
def _require_redaction_contract(payload: dict[str, Any], label: str) -> None:
contract = payload.get("display_redaction_contract") or {}
required_false = {
"raw_prompt_display_allowed",
"private_reasoning_display_allowed",
"secret_value_display_allowed",
"raw_telegram_payload_display_allowed",
"work_window_transcript_display_allowed",
}
if contract.get("redaction_required") is not True:
raise ValueError(f"{label}: display redaction must remain required")
unsafe = sorted(field for field in required_false if contract.get(field) is not False)
if unsafe:
raise ValueError(f"{label}: display redaction fields must remain false: {unsafe}")
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
rollups = payload.get("rollups") or {}
truth = payload.get("operation_permission_truth") or {}
lanes = payload.get("permission_lanes") or []
categories = payload.get("operation_categories") or []
roles = payload.get("agent_permission_roles") or []
gates = payload.get("gate_transitions") or []
templates = payload.get("operator_decision_templates") or []
expected = {
"permission_lane_count": len(lanes),
"operation_category_count": len(categories),
"observe_only_category_count": sum(1 for item in categories if item.get("permission_lane") == "observe_only"),
"no_write_replay_allowed_category_count": sum(1 for item in categories if item.get("permission_lane") == "no_write_replay_allowed"),
"proposal_only_category_count": sum(1 for item in categories if item.get("permission_lane") == "proposal_only"),
"human_approval_required_category_count": sum(1 for item in categories if item.get("permission_lane") == "human_approval_required"),
"explicitly_blocked_category_count": sum(1 for item in categories if item.get("permission_lane") == "explicitly_blocked"),
"agent_role_count": len(roles),
"gate_transition_count": len(gates),
"operator_decision_template_count": len(templates),
}
mismatches = sorted(field for field, value in expected.items() if rollups.get(field) != value)
if mismatches:
raise ValueError(f"{label}: rollup counts must match source arrays: {mismatches}")
approval_category_ids = sorted(
item.get("category_id") for item in categories if item.get("permission_lane") == "human_approval_required"
)
if sorted(rollups.get("human_approval_required_category_ids") or []) != approval_category_ids:
raise ValueError(f"{label}: human_approval_required_category_ids must match categories")
blocked_category_ids = sorted(
item.get("category_id") for item in categories if item.get("permission_lane") == "explicitly_blocked"
)
if sorted(rollups.get("explicitly_blocked_category_ids") or []) != blocked_category_ids:
raise ValueError(f"{label}: explicitly_blocked_category_ids must match categories")
zero_pairs = {
"runtime_execution_count": truth.get("runtime_execution_count_24h"),
"gateway_queue_write_count": truth.get("gateway_queue_write_count_24h"),
"telegram_send_count": truth.get("telegram_send_count_24h"),
"telegram_bot_api_call_count": truth.get("telegram_bot_api_call_count_24h"),
"delivery_receipt_write_count": truth.get("delivery_receipt_write_count_24h"),
"ai_runtime_worker_run_count": truth.get("ai_runtime_worker_run_count_24h"),
"medium_low_auto_execution_count": truth.get("medium_low_auto_execution_count_24h"),
"post_action_verifier_live_readback_count": truth.get("post_action_verifier_live_readback_count_24h"),
"production_write_count": truth.get("production_write_count_24h"),
"secret_value_read_count": truth.get("secret_value_read_count_24h"),
"paid_provider_call_count": truth.get("paid_provider_call_count_24h"),
"host_or_cluster_command_count": truth.get("host_or_cluster_command_count_24h"),
"destructive_operation_count": truth.get("destructive_operation_count_24h"),
}
non_zero = sorted(field for field, value in zero_pairs.items() if rollups.get(field) != 0 or value != 0)
if non_zero:
raise ValueError(f"{label}: rollup live counts must remain zero: {non_zero}")
def _is_redacted_sha256(value: Any) -> bool:
if not isinstance(value, str):
return False
prefix = "sha256:"
if not value.startswith(prefix):
return False
digest = value[len(prefix) :]
return len(digest) == 64 and all(char in "0123456789abcdef" for char in digest)