Files
awoooi/apps/api/src/services/ai_agent_communication_learning_contract.py
Your Name 8c11af7c19
All checks were successful
CD Pipeline / tests (push) Successful in 1m23s
Code Review / ai-code-review (push) Successful in 15s
CD Pipeline / build-and-deploy (push) Successful in 4m33s
CD Pipeline / post-deploy-checks (push) Successful in 1m32s
feat(governance): 定義 Agent 主動溝通學習契約
2026-06-11 11:53:42 +08:00

147 lines
6.2 KiB
Python

"""
AI Agent communication and learning contract snapshot.
Loads the latest committed, read-only contract for OpenClaw, Hermes, and
NemoTron proactive communication, learning, recording, MCP, RAG, and
intelligence service boundaries. This module never starts workers, writes
database migrations, sends Telegram messages, installs SDKs, calls paid
providers, or changes production routes.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from src.services.snapshot_paths import default_evaluations_dir
_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_communication_learning_contract_*.json"
_SCHEMA_VERSION = "ai_agent_communication_learning_contract_v1"
def load_latest_ai_agent_communication_learning_contract(
evaluations_dir: Path | None = None,
) -> dict[str, Any]:
"""Load the newest committed AI Agent communication learning contract."""
directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
if not candidates:
raise FileNotFoundError(
f"no AI Agent communication learning contract snapshots found in {directory}"
)
latest = candidates[-1]
with latest.open(encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict):
raise ValueError(f"{latest}: expected JSON object")
_require_schema(payload, _SCHEMA_VERSION, str(latest))
_require_read_only_contract(payload, str(latest))
_require_rollup_consistency(payload, str(latest))
_require_agent_boundaries(payload, str(latest))
_require_frontend_redaction(payload, str(latest))
return payload
def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
actual = payload.get("schema_version")
if actual != expected:
raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")
def _require_read_only_contract(payload: dict[str, Any], label: str) -> None:
program_status = payload.get("program_status") or {}
if program_status.get("read_only_mode") is not True:
raise ValueError(f"{label}: program_status.read_only_mode must be true")
if program_status.get("runtime_authority") != "contract_only_no_runtime_worker":
raise ValueError(f"{label}: runtime_authority must stay contract_only_no_runtime_worker")
boundaries = payload.get("approval_boundaries") or {}
blocked_flags = {
"runtime_worker_allowed",
"db_migration_allowed",
"telegram_direct_send_allowed",
"paid_external_service_allowed",
"secret_plaintext_allowed",
"autonomous_host_mutation_allowed",
"production_route_change_allowed",
"sdk_installation_allowed",
}
allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False)
if allowed:
raise ValueError(f"{label}: approval boundaries must remain false: {allowed}")
def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
rollups = payload.get("rollups") or {}
expected_counts = {
"agent_lane_count": len(payload.get("agent_lanes") or []),
"mcp_stack_count": len(payload.get("mcp_stack") or []),
"rag_layer_count": len(payload.get("rag_memory_stack") or []),
"learning_loop_count": len(payload.get("learning_loops") or []),
"intelligence_service_count": len(payload.get("intelligence_services") or []),
"rollout_task_count": len(payload.get("rollout_tasks") or []),
}
mismatched = {
key: {"expected": expected, "actual": rollups.get(key)}
for key, expected in expected_counts.items()
if rollups.get(key) != expected
}
if mismatched:
raise ValueError(f"{label}: rollup counts must match payload sections: {mismatched}")
rollout_tasks = payload.get("rollout_tasks") or []
blocked_task_ids = sorted(
task.get("task_id")
for task in rollout_tasks
if task.get("status") in {"planned", "blocked"}
and (
"approval" in str(task.get("next_gate", "")).lower()
or "gate" in str(task.get("next_gate", "")).lower()
)
)
if sorted(rollups.get("blocked_task_ids") or []) != blocked_task_ids:
raise ValueError(f"{label}: rollups.blocked_task_ids must match gated rollout tasks")
optional_service_ids = sorted(
service.get("id")
for service in payload.get("intelligence_services") or []
if service.get("status") in {"optional_candidate", "deferred_candidate"}
)
if sorted(rollups.get("optional_service_ids") or []) != optional_service_ids:
raise ValueError(f"{label}: rollups.optional_service_ids must match optional services")
def _require_agent_boundaries(payload: dict[str, Any], label: str) -> None:
lanes = payload.get("agent_lanes") or []
lane_ids = {lane.get("agent_id") for lane in lanes}
required_lanes = {"openclaw", "hermes", "nemotron"}
if not required_lanes.issubset(lane_ids):
raise ValueError(f"{label}: missing required agent lanes: {sorted(required_lanes - lane_ids)}")
unsafe_lanes = [
lane.get("agent_id")
for lane in lanes
if not lane.get("blocked_actions")
or "secret_plaintext_read" not in set(lane.get("blocked_actions") or [])
]
if unsafe_lanes:
raise ValueError(f"{label}: agent lanes must block secret plaintext read: {unsafe_lanes}")
nemotron = next((lane for lane in lanes if lane.get("agent_id") == "nemotron"), {})
nemotron_blocked = set(nemotron.get("blocked_actions") or [])
if "production_route_change" not in nemotron_blocked:
raise ValueError(f"{label}: Nemotron must remain blocked from production route changes")
def _require_frontend_redaction(payload: dict[str, Any], label: str) -> None:
redaction = ((payload.get("communication_plane") or {}).get("frontend_redaction") or {})
if redaction.get("operator_conversation_display_allowed") is not False:
raise ValueError(f"{label}: operator conversation display must stay false")
if redaction.get("agent_private_reasoning_display_allowed") is not False:
raise ValueError(f"{label}: agent private reasoning display must stay false")