""" Reference Agent Replay Adapter ============================== Deterministic no-LLM adapter used to smoke-test the replacement replay pipeline. This is not a market candidate and must not be used as replacement evidence. It exists so real adapters have an executable input/output example. """ from __future__ import annotations import json from dataclasses import dataclass from typing import Any @dataclass(frozen=True) class ReferenceAdapterDecision: """Candidate replay result payload produced by the reference adapter.""" payload: dict[str, Any] def to_dict(self) -> dict[str, Any]: return dict(self.payload) def build_reference_candidate_result( candidate_input: dict[str, Any], *, candidate_id: str = "reference_deterministic_adapter", candidate_role: str = "contract_smoke_adapter", ) -> ReferenceAdapterDecision: """Build one deterministic candidate replay result from candidate input.""" context = dict(candidate_input.get("incident_context") or {}) incident_id = str(candidate_input.get("incident_id", "")).strip() run_id = str(candidate_input.get("run_id", "")).strip() if not incident_id or not run_id: raise ValueError("candidate input must include incident_id and run_id") action = _proposed_action(context) risk_level = _risk_level(context, action) return ReferenceAdapterDecision( payload={ "schema_version": "agent_candidate_replay_result_v1", "run_id": run_id, "incident_id": incident_id, "candidate_id": candidate_id, "candidate_role": candidate_role, "proposed_action": action, "action_plan": _action_plan(action), "risk_level": risk_level, "requires_human_approval": risk_level in {"medium", "high", "critical"}, "blocked_by_policy": False, "fallback_used": False, "trace_complete": True, "trace_events": [ {"type": "input_loaded"}, {"type": "deterministic_policy"}, {"type": "safety_gate"}, ], "rca_correct": None, "tool_dry_run_pass": None, "repair_success": None, "false_repair": False, "latency_ms": 1, "cost_usd": 0, "metadata": { "source": "reference_deterministic_adapter", "not_market_evidence": True, }, } ) def build_reference_candidate_results( candidate_inputs: list[dict[str, Any]], *, candidate_id: str = "reference_deterministic_adapter", candidate_role: str = "contract_smoke_adapter", ) -> list[ReferenceAdapterDecision]: """Build many deterministic candidate replay results.""" return [ build_reference_candidate_result( candidate_input, candidate_id=candidate_id, candidate_role=candidate_role, ) for candidate_input in candidate_inputs ] def _proposed_action(context: dict[str, Any]) -> str: haystack = json.dumps(context, ensure_ascii=False, sort_keys=True).lower() service = _primary_service(context) namespace = _namespace(context) if any(marker in haystack for marker in ("crashloop", "restart", "podcrash")): return f"kubectl rollout restart deployment {service} -n {namespace}" if any(marker in haystack for marker in ("oom", "memory", "cpu")): return f"kubectl describe deployment {service} -n {namespace}" return f"kubectl logs deployment/{service} -n {namespace} --tail=200" def _action_plan(action: str) -> list[dict[str, Any]]: args = action.split() if "rollout restart" in action: dry_run = args + ["--dry-run=server"] else: dry_run = args return [ { "step": "dry_run", "tool": "kubectl", "args": dry_run[1:] if dry_run and dry_run[0] == "kubectl" else dry_run, }, { "step": "proposal", "tool": "kubectl", "args": args[1:] if args and args[0] == "kubectl" else args, }, ] def _risk_level(context: dict[str, Any], action: str) -> str: severity = str(context.get("severity", "")).upper() if severity == "P0": return "high" if "rollout restart" in action: return "medium" if severity in {"P1", "P2"}: return "medium" return "low" def _primary_service(context: dict[str, Any]) -> str: services = context.get("affected_services") or [] if services: return _resource_name(str(services[0])) for signal in context.get("signals") or []: labels = signal.get("labels") or {} for key in ("deployment", "service", "app", "pod"): if labels.get(key): return _resource_name(str(labels[key]).split("-")[0]) return "unknown" def _namespace(context: dict[str, Any]) -> str: for signal in context.get("signals") or []: labels = signal.get("labels") or {} if labels.get("namespace"): return _resource_name(str(labels["namespace"])) return "default" def _resource_name(value: str) -> str: cleaned = "".join( char.lower() for char in value if char.isalnum() or char in {"-", "."} ).strip("-.") return cleaned or "unknown"