""" AI Agent interaction and learning proof snapshot. Loads the latest committed, read-only proof surface for how operators can see OpenClaw, Hermes, and NemoTron communicating, handing off work, learning, and growing. This module is intentionally truth-gated: it never starts workers, opens Redis consumer groups, writes database migrations, sends Telegram messages, exposes transcripts, or marks live runtime as active. """ from __future__ import annotations import json from pathlib import Path from typing import Any from src.services.snapshot_paths import default_evaluations_dir _DEFAULT_EVALUATIONS_DIR = default_evaluations_dir(Path(__file__)) _SNAPSHOT_PATTERN = "ai_agent_interaction_learning_proof_*.json" _SCHEMA_VERSION = "ai_agent_interaction_learning_proof_v1" def load_latest_ai_agent_interaction_learning_proof( evaluations_dir: Path | None = None, ) -> dict[str, Any]: """Load the newest committed AI Agent interaction learning proof snapshot.""" directory = evaluations_dir or _DEFAULT_EVALUATIONS_DIR candidates = sorted(directory.glob(_SNAPSHOT_PATTERN)) if not candidates: raise FileNotFoundError( f"no AI Agent interaction learning proof snapshots found in {directory}" ) latest = candidates[-1] with latest.open(encoding="utf-8") as handle: payload = json.load(handle) if not isinstance(payload, dict): raise ValueError(f"{latest}: expected JSON object") _require_schema(payload, _SCHEMA_VERSION, str(latest)) _require_read_only_truth(payload, str(latest)) _require_rollup_consistency(payload, str(latest)) _require_agent_lanes(payload, str(latest)) _require_frontend_redaction(payload, str(latest)) return payload def _require_schema(payload: dict[str, Any], expected: str, label: str) -> None: actual = payload.get("schema_version") if actual != expected: raise ValueError(f"{label}: expected schema_version={expected}, got {actual!r}") def _require_read_only_truth(payload: dict[str, Any], label: str) -> None: program_status = payload.get("program_status") or {} if program_status.get("read_only_mode") is not True: raise ValueError(f"{label}: program_status.read_only_mode must be true") if program_status.get("runtime_authority") != "proof_surface_only_no_live_worker": raise ValueError( f"{label}: runtime_authority must stay proof_surface_only_no_live_worker" ) live_truth = payload.get("live_truth") or {} live_flags = { "runtime_loop_enabled", "live_agent_session_readback_enabled", "redis_consumer_group_enabled", "telegram_send_enabled", "learning_writeback_enabled", } enabled = sorted(flag for flag in live_flags if live_truth.get(flag) is not False) if enabled: raise ValueError(f"{label}: live truth flags must remain false: {enabled}") live_counts = { "active_live_agent_sessions", "live_agent_messages_24h", "live_handoffs_24h", "live_learning_writes_24h", "telegram_digest_receipts_24h", } non_zero = sorted(key for key in live_counts if live_truth.get(key) != 0) if non_zero: raise ValueError(f"{label}: live truth counts must remain zero: {non_zero}") boundaries = payload.get("approval_boundaries") or {} blocked_flags = { "runtime_worker_allowed", "db_migration_allowed", "redis_consumer_group_allowed", "telegram_direct_send_allowed", "conversation_transcript_display_allowed", "agent_private_reasoning_display_allowed", "secret_plaintext_allowed", "autonomous_self_modify_allowed", } allowed = sorted(flag for flag in blocked_flags if boundaries.get(flag) is not False) if allowed: raise ValueError(f"{label}: approval boundaries must remain false: {allowed}") def _require_rollup_consistency(payload: dict[str, Any], label: str) -> None: rollups = payload.get("rollups") or {} proof_ladder = payload.get("proof_ladder") or [] proof_signals = payload.get("proof_signals") or [] operator_surfaces = payload.get("operator_surfaces") or [] runtime_gates = payload.get("runtime_gates") or [] expected_counts = { "proof_level_count": len(proof_ladder), "signal_count": len(proof_signals), "operator_surface_count": len(operator_surfaces), "runtime_gate_count": len(runtime_gates), } mismatched = { key: {"expected": expected, "actual": rollups.get(key)} for key, expected in expected_counts.items() if rollups.get(key) != expected } if mismatched: raise ValueError(f"{label}: rollup counts must match payload sections: {mismatched}") contract_ready_ids = sorted( level.get("level_id") for level in proof_ladder if level.get("status") in {"contract_ready", "proof_surface_ready"} ) if rollups.get("contract_ready_level_count") != len(contract_ready_ids): raise ValueError(f"{label}: rollups.contract_ready_level_count mismatch") live_pending_ids = sorted( level.get("level_id") for level in proof_ladder if level.get("status") in {"live_pending", "blocked_by_gate"} ) if sorted(rollups.get("live_pending_level_ids") or []) != live_pending_ids: raise ValueError(f"{label}: rollups.live_pending_level_ids mismatch") live_signal_count = sum( 1 for signal in proof_signals if signal.get("current_state") == "live_verified" ) if rollups.get("live_signal_count") != live_signal_count: raise ValueError(f"{label}: rollups.live_signal_count mismatch") blocked_gate_ids = sorted( gate.get("gate_id") for gate in runtime_gates if gate.get("status") in {"blocked", "approval_required"} ) if sorted(rollups.get("blocked_gate_ids") or []) != blocked_gate_ids: raise ValueError(f"{label}: rollups.blocked_gate_ids mismatch") live_truth = payload.get("live_truth") or {} for key in ( "active_live_agent_sessions", "live_agent_messages_24h", "live_handoffs_24h", "live_learning_writes_24h", "telegram_digest_receipts_24h", ): if rollups.get(key) != live_truth.get(key): raise ValueError(f"{label}: rollups.{key} must mirror live_truth.{key}") def _require_agent_lanes(payload: dict[str, Any], label: str) -> None: lanes = payload.get("agent_lanes") or [] lane_ids = {lane.get("agent_id") for lane in lanes} required_lanes = {"openclaw", "hermes", "nemotron"} if not required_lanes.issubset(lane_ids): raise ValueError(f"{label}: missing required agent lanes: {sorted(required_lanes - lane_ids)}") missing_visible_signal = [ lane.get("agent_id") for lane in lanes if not lane.get("visible_signals") ] if missing_visible_signal: raise ValueError(f"{label}: every agent lane needs visible_signals: {missing_visible_signal}") unsafe_lanes = [ lane.get("agent_id") for lane in lanes if "conversation_transcript" in set(lane.get("visible_signals") or []) ] if unsafe_lanes: raise ValueError(f"{label}: visible signals must not expose transcripts: {unsafe_lanes}") def _require_frontend_redaction(payload: dict[str, Any], label: str) -> None: redaction = payload.get("frontend_redaction") or {} if redaction.get("operator_conversation_display_allowed") is not False: raise ValueError(f"{label}: operator conversation display must stay false") if redaction.get("agent_private_reasoning_display_allowed") is not False: raise ValueError(f"{label}: agent private reasoning display must stay false") if redaction.get("raw_prompt_display_allowed") is not False: raise ValueError(f"{label}: raw prompt display must stay false")