"""AI Agent communication and learning contract snapshot.

Loads the latest committed, read-only contract for OpenClaw, Hermes, and
NemoTron proactive communication, learning, recording, MCP, RAG, and
intelligence service boundaries.

This module never starts workers, writes database migrations, sends Telegram
messages, installs SDKs, calls paid providers, or changes production routes.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

from src.services.snapshot_paths import default_evaluations_dir

_DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__))
_SNAPSHOT_PATTERN = "ai_agent_communication_learning_contract_*.json"
_SCHEMA_VERSION = "ai_agent_communication_learning_contract_v1"

# Flags under ``approval_boundaries`` that must each be the literal ``false``.
_BLOCKED_APPROVAL_FLAGS = (
    "runtime_worker_allowed",
    "db_migration_allowed",
    "telegram_direct_send_allowed",
    "paid_external_service_allowed",
    "secret_plaintext_allowed",
    "autonomous_host_mutation_allowed",
    "production_route_change_allowed",
    "sdk_installation_allowed",
)

# ``rollups`` counter name -> payload section whose length it must equal.
# Insertion order is kept because it determines the order of the mismatch
# report embedded in the error message.
_ROLLUP_COUNT_SECTIONS = {
    "agent_lane_count": "agent_lanes",
    "mcp_stack_count": "mcp_stack",
    "rag_layer_count": "rag_memory_stack",
    "learning_loop_count": "learning_loops",
    "intelligence_service_count": "intelligence_services",
    "rollout_task_count": "rollout_tasks",
}


def load_latest_ai_agent_communication_learning_contract(
    evaluations_dir: Path | None = None,
) -> dict[str, Any]:
    """Load and validate the newest AI Agent communication learning contract.

    "Newest" is the last path, in sorted order, that matches
    ``ai_agent_communication_learning_contract_*.json`` inside the directory.

    Args:
        evaluations_dir: Directory to search. Falls back to the module-level
            default evaluations directory when omitted.

    Returns:
        The parsed JSON object, after every contract check has passed.

    Raises:
        FileNotFoundError: No snapshot matches the pattern in the directory.
        ValueError: The snapshot is not a JSON object, fails one of the
            contract checks, or has sections of an unexpected JSON shape.
    """
    directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR
    candidates = sorted(directory.glob(_SNAPSHOT_PATTERN))
    if not candidates:
        raise FileNotFoundError(
            f"no AI Agent communication learning contract snapshots found in {directory}"
        )

    latest = candidates[-1]
    with latest.open(encoding="utf-8") as handle:
        payload = json.load(handle)
    if not isinstance(payload, dict):
        raise ValueError(f"{latest}: expected JSON object")

    label = str(latest)
    _require_schema(payload, _SCHEMA_VERSION, label)
    try:
        _require_read_only_contract(payload, label)
        _require_rollup_consistency(payload, label)
        _require_agent_boundaries(payload, label)
        _require_frontend_redaction(payload, label)
    except (AttributeError, TypeError) as err:
        # The validators call ``.get``/``len``/``sorted`` on nested sections
        # without checking their JSON type first, so a section of the wrong
        # shape (e.g. a string where an object is expected) surfaces as one of
        # these two exceptions. Report it like every other invalid snapshot;
        # the original error stays attached as ``__cause__`` for debugging.
        raise ValueError(f"{label}: malformed contract structure: {err}") from err
    return payload


def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None:
    """Raise ``ValueError`` unless ``payload["schema_version"]`` equals *expected*."""
    actual = payload.get("schema_version")
    if actual != expected:
        raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}")


def _require_read_only_contract(payload: dict[str, Any], label: str) -> None:
    """Check that the snapshot declares itself read-only and grants no approvals.

    Raises:
        ValueError: ``program_status.read_only_mode`` is not ``True``,
            ``program_status.runtime_authority`` is not
            ``"contract_only_no_runtime_worker"``, or any flag listed in
            ``_BLOCKED_APPROVAL_FLAGS`` is anything other than ``False``
            (a missing flag counts as a violation).
    """
    program_status = payload.get("program_status") or {}
    if program_status.get("read_only_mode") is not True:
        raise ValueError(f"{label}: program_status.read_only_mode must be true")
    if program_status.get("runtime_authority") != "contract_only_no_runtime_worker":
        raise ValueError(f"{label}: runtime_authority must stay contract_only_no_runtime_worker")

    boundaries = payload.get("approval_boundaries") or {}
    # Identity check on purpose: ``None``, ``0`` and absent keys are violations.
    allowed = sorted(
        flag for flag in _BLOCKED_APPROVAL_FLAGS if boundaries.get(flag) is not False
    )
    if allowed:
        raise ValueError(f"{label}: approval boundaries must remain false: {allowed}")


def _is_gated_task(task: dict[str, Any]) -> bool:
    """Return whether a rollout task is planned/blocked behind an approval or gate."""
    if task.get("status") not in {"planned", "blocked"}:
        return False
    next_gate = str(task.get("next_gate", "")).lower()
    return "approval" in next_gate or "gate" in next_gate


def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None:
    """Check that ``rollups`` agrees with the sections it summarises.

    Three things are compared: each ``*_count`` against the length of its
    section, ``blocked_task_ids`` against the ids of gated rollout tasks, and
    ``optional_service_ids`` against the ids of intelligence services whose
    status is ``optional_candidate`` or ``deferred_candidate``. Id lists are
    compared after sorting, so their order in the snapshot does not matter.

    Raises:
        ValueError: Any of the three comparisons fails.
    """
    rollups = payload.get("rollups") or {}

    expected_counts = {
        rollup_key: len(payload.get(section) or [])
        for rollup_key, section in _ROLLUP_COUNT_SECTIONS.items()
    }
    mismatched = {
        key: {"expected": expected, "actual": rollups.get(key)}
        for key, expected in expected_counts.items()
        if rollups.get(key) != expected
    }
    if mismatched:
        raise ValueError(f"{label}: rollup counts must match payload sections: {mismatched}")

    gated_task_ids = sorted(
        task.get("task_id")
        for task in payload.get("rollout_tasks") or []
        if _is_gated_task(task)
    )
    if sorted(rollups.get("blocked_task_ids") or []) != gated_task_ids:
        raise ValueError(f"{label}: rollups.blocked_task_ids must match gated rollout tasks")

    optional_service_ids = sorted(
        service.get("id")
        for service in payload.get("intelligence_services") or []
        if service.get("status") in {"optional_candidate", "deferred_candidate"}
    )
    if sorted(rollups.get("optional_service_ids") or []) != optional_service_ids:
        raise ValueError(f"{label}: rollups.optional_service_ids must match optional services")


def _require_agent_boundaries(payload: dict[str, Any], label: str) -> None:
    """Check the per-agent lanes and their blocked actions.

    Raises:
        ValueError: One of the ``openclaw``/``hermes``/``nemotron`` lanes is
            missing, any lane does not list ``secret_plaintext_read`` among
            its ``blocked_actions``, or the ``nemotron`` lane does not list
            ``production_route_change``.
    """
    lanes = payload.get("agent_lanes") or []

    lane_ids = {lane.get("agent_id") for lane in lanes}
    required_lanes = {"openclaw", "hermes", "nemotron"}
    if not required_lanes.issubset(lane_ids):
        raise ValueError(
            f"{label}: missing required agent lanes: {sorted(required_lanes - lane_ids)}"
        )

    # A lane with missing or empty ``blocked_actions`` yields an empty set and
    # is therefore reported as unsafe as well.
    unsafe_lanes = [
        lane.get("agent_id")
        for lane in lanes
        if "secret_plaintext_read" not in set(lane.get("blocked_actions") or [])
    ]
    if unsafe_lanes:
        raise ValueError(f"{label}: agent lanes must block secret plaintext read: {unsafe_lanes}")

    # Only the first lane whose agent_id is "nemotron" is inspected.
    nemotron = next((lane for lane in lanes if lane.get("agent_id") == "nemotron"), {})
    nemotron_blocked = set(nemotron.get("blocked_actions") or [])
    if "production_route_change" not in nemotron_blocked:
        raise ValueError(f"{label}: Nemotron must remain blocked from production route changes")


def _require_frontend_redaction(payload: dict[str, Any], label: str) -> None:
    """Check that the frontend may show neither conversations nor reasoning.

    Both flags under ``communication_plane.frontend_redaction`` must be the
    literal ``False``; a missing flag is rejected too.

    Raises:
        ValueError: Either display flag is anything other than ``False``.
    """
    communication_plane = payload.get("communication_plane") or {}
    redaction = communication_plane.get("frontend_redaction") or {}
    if redaction.get("operator_conversation_display_allowed") is not False:
        raise ValueError(f"{label}: operator conversation display must stay false")
    if redaction.get("agent_private_reasoning_display_allowed") is not False:
        raise ValueError(f"{label}: agent private reasoning display must stay false")