Files
awoooi/apps/api/src/services/agent_replay_input.py
Your Name cfb866d055
Some checks failed
Ansible Lint / lint (push) Successful in 35s
CD Pipeline / tests (push) Failing after 13s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Failing after 11s
feat(governance): add agent market automation surfaces
2026-06-04 21:50:55 +08:00

105 lines
3.4 KiB
Python

"""
Agent Replay Candidate Input Builder
====================================
Builds candidate-visible replay inputs from sanitized AWOOOI fixtures.
Candidate Agents must never receive evaluation_labels. This module strips the
answer-key section and emits only incident_context plus minimal source metadata.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class AgentReplayCandidateInput:
    """Immutable record describing a single replay input shown to a candidate.

    Only ``run_id`` and ``incident_id`` are required; the remaining fields
    fall back to a fixed schema tag and fresh empty dictionaries.
    """

    run_id: str
    incident_id: str
    # Tag identifying the layout of the serialized payload.
    schema_version: str = "agent_replay_candidate_input_v1"
    # default_factory gives each instance its own dict instead of a shared one.
    incident_context: dict[str, Any] = field(default_factory=dict)
    source_metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize this input to a plain dictionary.

        The two mapping fields are shallow-copied, so adding or removing
        top-level keys on the result does not touch the instance's own dicts.
        Nested values are still shared with the instance.
        """
        serialized: dict[str, Any] = {}
        serialized["schema_version"] = self.schema_version
        serialized["run_id"] = self.run_id
        serialized["incident_id"] = self.incident_id
        serialized["incident_context"] = dict(self.incident_context)
        serialized["source_metadata"] = dict(self.source_metadata)
        return serialized
def build_candidate_input_from_fixture(
    fixture: dict[str, Any],
) -> AgentReplayCandidateInput:
    """Convert one replay fixture into its candidate-visible form.

    Only ``run_id``, ``incident_id``, ``incident_context`` and an allowlisted
    subset of ``source_metadata`` are carried over; every other top-level
    fixture key is dropped.

    Raises:
        ValueError: if any of ``run_id``, ``incident_id`` or
            ``incident_context`` is absent or falsy in *fixture*.
    """
    # A field counts as missing when it is absent OR falsy (e.g. "" or {}).
    absent = []
    for name in ("run_id", "incident_id", "incident_context"):
        if not fixture.get(name):
            absent.append(name)
    if absent:
        raise ValueError(f"missing required fixture field(s): {absent}")

    run_id = str(fixture["run_id"])
    incident_id = str(fixture["incident_id"])
    # Shallow copy so the result does not alias the fixture's top-level dict.
    context = dict(fixture["incident_context"])
    # A missing or falsy source_metadata is treated as an empty mapping.
    metadata = _safe_source_metadata(fixture.get("source_metadata") or {})
    return AgentReplayCandidateInput(
        run_id=run_id,
        incident_id=incident_id,
        incident_context=context,
        source_metadata=metadata,
    )
def build_candidate_inputs_from_fixtures(
    fixtures: list[dict[str, Any]],
) -> list[AgentReplayCandidateInput]:
    """Convert a batch of replay fixtures, preserving their order.

    Each fixture is passed through ``build_candidate_input_from_fixture``;
    the first fixture that fails its validation aborts the whole batch with
    that function's ``ValueError``.
    """
    candidate_inputs: list[AgentReplayCandidateInput] = []
    for entry in fixtures:
        candidate_inputs.append(build_candidate_input_from_fixture(entry))
    return candidate_inputs
def assert_no_evaluation_label_leak(payload: dict[str, Any]) -> None:
    """Fail loudly when *payload* contains any answer-key field name.

    The payload is scanned recursively via ``_find_forbidden_keys``. Returns
    ``None`` when nothing is found.

    Raises:
        ValueError: listing, in sorted order, the path of every forbidden
            key that was found.
    """
    answer_key_fields = {
        "evaluation_labels",
        "verification_result",
        "execution_success",
        "execution_error",
        "self_healing_score",
        "repair_success",
    }
    offending_paths = _find_forbidden_keys(payload, answer_key_fields)
    if not offending_paths:
        return
    # Sorted so the error message is deterministic regardless of set order.
    leaks = sorted(offending_paths)
    raise ValueError(f"candidate input leaks evaluation label field(s): {leaks}")
def _safe_source_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
allowed = {
"created_at",
"updated_at",
"agent_turn_count",
"source",
}
return {key: value for key, value in metadata.items() if key in allowed}
def _find_forbidden_keys(
value: Any,
forbidden: set[str],
*,
prefix: str = "",
) -> set[str]:
found: set[str] = set()
if isinstance(value, dict):
for key, nested in value.items():
key_text = str(key)
path = f"{prefix}.{key_text}" if prefix else key_text
if key_text in forbidden:
found.add(path)
found.update(_find_forbidden_keys(nested, forbidden, prefix=path))
elif isinstance(value, list):
for index, nested in enumerate(value):
path = f"{prefix}[{index}]"
found.update(_find_forbidden_keys(nested, forbidden, prefix=path))
return found