Files
awoooi/apps/api/src/services/agent_replay_label_grader.py
Your Name cfb866d055
Some checks failed
Ansible Lint / lint (push) Successful in 35s
CD Pipeline / tests (push) Failing after 13s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Failing after 11s
feat(governance): add agent market automation surfaces
2026-06-04 21:50:55 +08:00

203 lines
6.5 KiB
Python

"""
Agent Replay Label Grader
=========================
Applies AWOOOI-owned fixture labels to normalized candidate replay records.
Candidate adapters must not provide RCA / dry-run / repair success grades. This
module joins internal fixtures with normalized candidate outputs after replay and
fills scorecard fields only when AWOOOI has enough label evidence.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field, replace
from typing import Any
from src.services.agent_replacement_evaluator import AgentReplayRecord
@dataclass(frozen=True)
class AgentReplayGradingReport:
    """Immutable coverage summary for one label-grading pass.

    The counters and incident-id lists are filled in by
    ``grade_replay_records_with_fixtures``; ``to_dict`` renders them as a
    plain dict tagged with a schema version.
    """

    # Number of replay records that went through the grader.
    records: int
    # Number of records that received label-based grades.
    graded_records: int
    # Incident ids for which no fixture was found.
    missing_fixtures: list[str] = field(default_factory=list)
    # Incident ids whose fixture declared no expected action markers.
    missing_expected_markers: list[str] = field(default_factory=list)
    # Graded records whose action matched every expected marker.
    action_match_true: int = 0
    # Graded records whose action missed at least one expected marker.
    action_match_false: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Return the report as a plain dict; list fields are shallow-copied."""
        payload: dict[str, Any] = {
            "schema_version": "agent_replay_grading_report_v1",
        }
        payload["records"] = self.records
        payload["graded_records"] = self.graded_records
        # Copy the lists so callers mutating the payload cannot reach the
        # lists held by this frozen instance.
        payload["missing_fixtures"] = [*self.missing_fixtures]
        payload["missing_expected_markers"] = [*self.missing_expected_markers]
        payload["action_match_true"] = self.action_match_true
        payload["action_match_false"] = self.action_match_false
        return payload
def grade_replay_records_with_fixtures(
    *,
    fixtures: list[dict[str, Any]],
    replay_records: list[AgentReplayRecord | dict[str, Any]],
) -> tuple[list[AgentReplayRecord], AgentReplayGradingReport]:
    """Grade normalized replay records against fixture evaluation labels.

    Every input record yields exactly one output record, in input order.
    Records without a fixture, or whose fixture declares no expected action
    markers, have their grade fields cleared instead of graded.

    Args:
        fixtures: Fixture dicts, looked up by ``incident_id``; grading reads
            each fixture's ``evaluation_labels`` entry.
        replay_records: ``AgentReplayRecord`` instances or dicts accepted by
            ``AgentReplayRecord.from_dict``.

    Returns:
        A ``(graded_records, report)`` tuple.
    """
    fixtures_by_incident = _index_fixtures(fixtures)

    # Promote dict payloads to AgentReplayRecord before any grading starts.
    candidates: list[AgentReplayRecord] = []
    for item in replay_records:
        if not isinstance(item, AgentReplayRecord):
            item = AgentReplayRecord.from_dict(item)
        candidates.append(item)

    results: list[AgentReplayRecord] = []
    without_fixture: list[str] = []
    without_markers: list[str] = []
    matched_count = 0
    unmatched_count = 0

    for candidate in candidates:
        incident = candidate.incident_id
        fixture = fixtures_by_incident.get(incident)
        if fixture is None:
            without_fixture.append(incident)
            results.append(
                _clear_candidate_self_grades(candidate, reason="missing_fixture")
            )
            continue

        labels = dict(fixture.get("evaluation_labels") or {})
        expected = _expected_action_markers(labels)
        if expected:
            is_match = _action_matches(candidate, expected)
            if is_match:
                matched_count += 1
            else:
                unmatched_count += 1
            results.append(
                _grade_record(candidate, labels=labels, action_match=is_match)
            )
        else:
            # No markers means nothing to compare the action against.
            without_markers.append(incident)
            results.append(
                _clear_candidate_self_grades(
                    candidate,
                    reason="missing_expected_action_markers",
                    labels=labels,
                )
            )

    summary = AgentReplayGradingReport(
        records=len(candidates),
        graded_records=matched_count + unmatched_count,
        missing_fixtures=without_fixture,
        missing_expected_markers=without_markers,
        action_match_true=matched_count,
        action_match_false=unmatched_count,
    )
    return results, summary
def _grade_record(
    record: AgentReplayRecord,
    *,
    labels: dict[str, Any],
    action_match: bool,
) -> AgentReplayRecord:
    """Return a copy of ``record`` with grade fields set from fixture labels.

    When the action did not match, ``rca_correct``, ``repair_success`` and
    ``tool_dry_run_pass`` are all ``False``. When it matched, the first two
    take the verification outcome and the third takes the execution outcome
    (either may be ``None`` if the label is absent). ``false_repair`` is set
    only for a matched action that executed successfully but failed
    verification.

    Args:
        record: The replay record to grade; it is not mutated.
        labels: The fixture's evaluation labels.
        action_match: Whether the record's action matched the expected markers.

    Returns:
        A new record with grade fields and grader metadata filled in.
    """
    verified = _verification_success(labels)
    executed = _optional_bool(labels.get("execution_success"))

    if action_match:
        rca_grade = verified
        repair_grade = verified
        dry_run_grade = executed
        flagged_false_repair = executed is True and verified is False
    else:
        rca_grade = repair_grade = dry_run_grade = False
        flagged_false_repair = False

    # Start from the record's own metadata; grader keys overwrite any
    # same-named keys already present.
    annotated = {**record.metadata}
    annotated.update(
        {
            "candidate_self_grading_ignored": True,
            "label_grader": "agent_replay_label_grader_v1",
            "label_grader_action_match": action_match,
            "label_grader_expected_markers": _expected_action_markers(labels),
            "label_grader_verification_result": labels.get("verification_result"),
            "label_grader_execution_success": executed,
        }
    )

    return replace(
        record,
        rca_correct=rca_grade,
        tool_dry_run_pass=dry_run_grade,
        repair_success=repair_grade,
        false_repair=flagged_false_repair,
        metadata=annotated,
    )
def _clear_candidate_self_grades(
    record: AgentReplayRecord,
    *,
    reason: str,
    labels: dict[str, Any] | None = None,
) -> AgentReplayRecord:
    """Return a copy of ``record`` with every grade field reset.

    ``rca_correct``, ``tool_dry_run_pass`` and ``repair_success`` become
    ``None`` and ``false_repair`` becomes ``False``. The metadata records why
    the grades were cleared.

    Args:
        record: The replay record to reset; it is not mutated.
        reason: Stored under the ``label_grader_reason`` metadata key.
        labels: Optional fixture labels; only ``verification_result`` is read.

    Returns:
        A new record with cleared grades and grader metadata.
    """
    # Grader keys overwrite any same-named keys the record already carries.
    annotated = {**record.metadata}
    annotated["candidate_self_grading_ignored"] = True
    annotated["label_grader"] = "agent_replay_label_grader_v1"
    annotated["label_grader_reason"] = reason
    annotated["label_grader_verification_result"] = (
        labels.get("verification_result") if labels else None
    )

    return replace(
        record,
        rca_correct=None,
        tool_dry_run_pass=None,
        repair_success=None,
        false_repair=False,
        metadata=annotated,
    )
def _index_fixtures(fixtures: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Build an incident-id -> fixture lookup table.

    Each fixture's ``incident_id`` is coerced to ``str`` and stripped of
    surrounding whitespace before it is used as the key. Fixtures whose id is
    missing, ``None`` or blank are left out. When several fixtures share an
    id, the last one in ``fixtures`` wins.

    Args:
        fixtures: Fixture dicts; only the ``incident_id`` key is read here.

    Returns:
        Mapping from normalized incident id to the fixture dict itself
        (the fixtures are not copied).
    """
    indexed: dict[str, dict[str, Any]] = {}
    for fixture in fixtures:
        raw_id = fixture.get("incident_id")
        if raw_id is None:
            # str(None) is the truthy string "None"; without this guard an
            # id-less fixture would be indexed under a bogus incident id.
            continue
        incident_id = str(raw_id).strip()
        if incident_id:
            indexed[incident_id] = fixture
    return indexed
def _expected_action_markers(labels: dict[str, Any]) -> list[str]:
    """Return the normalized action markers declared in ``labels``.

    Reads ``labels["expected_action_markers"]``. A single string is treated as
    a one-element list; a list or tuple is used as-is; any other type yields
    an empty list. Each entry is stringified, stripped and lower-cased, and
    entries that are ``None`` or blank are dropped.

    Args:
        labels: The fixture's evaluation labels.

    Returns:
        Lower-case, non-empty marker strings in their declared order.
    """
    raw = labels.get("expected_action_markers") or []
    if isinstance(raw, str):
        raw = [raw]
    if not isinstance(raw, (list, tuple)):
        return []
    # Skip None before stringifying: str(None) would otherwise become the
    # literal marker "none".
    normalized = (str(item).strip().lower() for item in raw if item is not None)
    return [marker for marker in normalized if marker]
def _action_matches(record: AgentReplayRecord, markers: list[str]) -> bool:
    """Report whether the record's proposed action contains every marker.

    The ``proposed_action`` and ``action_plan`` entries of ``record.metadata``
    are each rendered as lower-cased JSON text and searched for every marker
    as a substring. Markers are compared as given, so they should already be
    lower-case (as ``_expected_action_markers`` produces them).

    Only the values are searched. The metadata key names and absent (``None``)
    values are left out of the text, so a marker such as ``"action"`` or
    ``"null"`` cannot match a record that proposed nothing.

    Args:
        record: Replay record whose ``metadata`` holds the candidate's action.
        markers: Lower-case substrings that must all be present.

    Returns:
        ``True`` only when ``markers`` is non-empty and every marker occurs in
        the action text.
    """
    if not markers:
        # all() over an empty list is vacuously True; with nothing to compare
        # against there is no evidence of a match.
        return False

    fragments: list[str] = []
    for key in ("proposed_action", "action_plan"):
        value = record.metadata.get(key)
        if value is None:
            continue
        try:
            # default=str is deliberate: the text is only used for substring
            # matching, so non-JSON values are searched by their str() form
            # instead of aborting the grading run.
            text = json.dumps(value, ensure_ascii=False, sort_keys=True, default=str)
        except (TypeError, ValueError):
            # Unsortable or unsupported dict keys, or circular references.
            text = str(value)
        fragments.append(text.lower())

    action_bundle = "\n".join(fragments)
    return all(marker in action_bundle for marker in markers)
def _verification_success(labels: dict[str, Any]) -> bool | None:
    """Interpret the ``verification_result`` label as a tri-state outcome.

    Args:
        labels: The fixture's evaluation labels.

    Returns:
        ``None`` when the label is absent or ``None``; the value itself when
        it is already a ``bool``; otherwise ``True`` only if its string form,
        ignoring case and surrounding whitespace, equals ``"success"``.
    """
    value = labels.get("verification_result")
    if value is None:
        return None
    if isinstance(value, bool):
        # str(True).lower() is "true", which would otherwise be read as a
        # failed verification.
        return value
    return str(value).strip().lower() == "success"
def _optional_bool(value: Any) -> bool | None:
    """Coerce a label value to ``bool`` while keeping ``None`` as unknown.

    Strings that spell a negative value (``"false"``, ``"f"``, ``"0"``,
    ``"no"``, ``"n"``, ``"off"``, or blank; case and surrounding whitespace
    are ignored) return ``False``. Every other value follows normal Python
    truthiness.

    Args:
        value: Raw label value, or ``None`` when the label is absent.

    Returns:
        ``None`` for ``None``, otherwise the coerced boolean.
    """
    if value is None:
        return None
    if isinstance(value, str) and value.strip().lower() in {
        "",
        "0",
        "false",
        "f",
        "no",
        "n",
        "off",
    }:
        # Any non-empty string is truthy, so bool("false") would be True and
        # a string-encoded failure would be graded as a success.
        return False
    return bool(value)