Files
awoooi/scripts/agents/run-agent-replacement-replay.py
Your Name cfb866d055
Some checks failed
Ansible Lint / lint (push) Successful in 35s
CD Pipeline / tests (push) Failing after 13s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Failing after 11s
feat(governance): add agent market automation surfaces
2026-06-04 21:50:55 +08:00

199 lines
7.0 KiB
Python

#!/usr/bin/env python3
"""
Run the AWOOOI Agent replacement replay pipeline for one candidate.
Pipeline:
candidate input JSONL + candidate raw result JSONL
-> contract validation
-> normalized candidate replay JSONL
-> OpenClaw baseline + candidate scorecard
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
# Directory two levels above the folder that contains this script
# (parents[0] is the script's own folder).
ROOT = Path(__file__).resolve().parents[2]
# Location of the API sources, relative to ROOT.
API_SRC = ROOT / "apps" / "api"
# Put the API sources first on sys.path so the `src.services.*` imports
# below resolve when this file is executed directly as a script.
sys.path.insert(0, str(API_SRC))
from src.services.agent_replay_contract import ( # noqa: E402
validate_candidate_replay_contract,
)
from src.services.agent_replay_label_grader import ( # noqa: E402
grade_replay_records_with_fixtures,
)
from src.services.agent_replay_normalizer import ( # noqa: E402
CandidateReplayResult,
normalize_candidate_result,
)
from src.services.agent_replacement_evaluator import ( # noqa: E402
BASELINE_CANDIDATE_ID,
MIN_INCIDENTS_FOR_CANARY,
AgentReplayRecord,
score_replay_records,
)
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the replay pipeline."""
    parser = argparse.ArgumentParser(
        description="Validate, normalize, and score one Agent replacement candidate."
    )
    parser.add_argument("--inputs", required=True, help="candidate input JSONL")
    parser.add_argument("--results", required=True, help="candidate raw result JSONL")
    parser.add_argument("--baseline", required=True, help="OpenClaw baseline replay JSONL")
    parser.add_argument("--candidate-id", required=True, help="Expected candidate_id")
    parser.add_argument("--normalized-output", required=True, help="Normalized candidate JSONL")
    parser.add_argument("--fixtures", help="Optional internal fixture JSONL for local grading")
    parser.add_argument("--graded-output", help="Graded candidate replay JSONL")
    parser.add_argument("--grading-report", help="Local grading report JSON")
    parser.add_argument("--contract-report", required=True, help="Contract report JSON")
    parser.add_argument("--scorecard", required=True, help="Scorecard JSON")
    parser.add_argument("--summary", help="Pipeline summary JSON")
    parser.add_argument(
        "--baseline-id",
        default=BASELINE_CANDIDATE_ID,
        help=f"Baseline candidate id (default: {BASELINE_CANDIDATE_ID})",
    )
    parser.add_argument(
        "--min-incidents",
        type=int,
        default=MIN_INCIDENTS_FOR_CANARY,
        help=f"Minimum incidents required for canary (default: {MIN_INCIDENTS_FOR_CANARY})",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """Run contract validation, normalization, optional grading, and scoring.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which keeps the script entry
            point unchanged while allowing programmatic invocation.

    Returns:
        ``0`` when the scorecard was written, ``2`` when the candidate
        results failed contract validation (no normalization or scoring is
        attempted in that case).

    Raises:
        SystemExit: From argparse on invalid arguments, or from
            ``_read_jsonl`` when an input file contains an unparsable line.
    """
    args = _build_parser().parse_args(argv)

    candidate_inputs = _read_jsonl(Path(args.inputs))
    candidate_results = _read_jsonl(Path(args.results))
    contract_report = validate_candidate_replay_contract(
        candidate_inputs=candidate_inputs,
        candidate_results=candidate_results,
        expected_candidate_id=args.candidate_id,
    ).to_dict()
    # The contract report is written unconditionally, valid or not.
    _write_json(Path(args.contract_report), contract_report)

    if not contract_report["valid"]:
        summary = _summary(
            args=args,
            contract_report=contract_report,
            normalized_records=0,
            scorecard_written=False,
        )
        if args.summary:
            _write_json(Path(args.summary), summary)
        print(json.dumps(summary, ensure_ascii=False, sort_keys=True))
        return 2

    normalized_records = [
        normalize_candidate_result(CandidateReplayResult.from_dict(payload))
        for payload in candidate_results
    ]
    _write_replay_jsonl(Path(args.normalized_output), normalized_records)

    # Without fixtures the normalized records are scored as-is.
    score_records = normalized_records
    grading_report: dict[str, Any] | None = None
    graded_records = 0
    if args.fixtures:
        score_records, grading = grade_replay_records_with_fixtures(
            fixtures=_read_jsonl(Path(args.fixtures)),
            replay_records=normalized_records,
        )
        grading_report = grading.to_dict()
        graded_records = len(score_records)
        # Grading artefacts are only produced when grading actually ran.
        if args.graded_output:
            _write_replay_jsonl(Path(args.graded_output), score_records)
        if args.grading_report:
            _write_json(Path(args.grading_report), grading_report)

    baseline_records = _read_replay_jsonl(Path(args.baseline))
    scorecard = score_replay_records(
        baseline_records + score_records,
        baseline_candidate_id=args.baseline_id,
        min_incidents_for_canary=args.min_incidents,
    ).to_dict()
    _write_json(Path(args.scorecard), scorecard)

    summary = _summary(
        args=args,
        contract_report=contract_report,
        normalized_records=len(normalized_records),
        graded_records=graded_records,
        grading_report=grading_report,
        scorecard_written=True,
    )
    if args.summary:
        _write_json(Path(args.summary), summary)
    # The summary always goes to stdout as a single JSON line.
    print(json.dumps(summary, ensure_ascii=False, sort_keys=True))
    return 0
def _summary(
*,
args,
contract_report: dict[str, Any],
normalized_records: int,
scorecard_written: bool,
graded_records: int = 0,
grading_report: dict[str, Any] | None = None,
) -> dict[str, Any]:
return {
"schema_version": "agent_replay_pipeline_report_v1",
"candidate_id": args.candidate_id,
"inputs": args.inputs,
"results": args.results,
"baseline": args.baseline,
"contract_report": args.contract_report,
"normalized_output": args.normalized_output,
"fixtures": args.fixtures,
"graded_output": args.graded_output,
"grading_report": args.grading_report,
"scorecard": args.scorecard,
"contract_valid": bool(contract_report.get("valid")),
"input_records": int(contract_report.get("inputs", 0)),
"result_records": int(contract_report.get("results", 0)),
"normalized_records": normalized_records,
"graded_records": graded_records,
"label_grading_applied": bool(grading_report),
"scorecard_written": scorecard_written,
}
def _read_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse a UTF-8 JSONL file into a list of decoded values.

    Lines that are blank, or whose first non-whitespace character is ``#``,
    are skipped.

    Args:
        path: File to read.

    Returns:
        One decoded JSON value per remaining line, in file order.

    Raises:
        SystemExit: When a line cannot be decoded; the message carries the
            file path and the 1-based line number.
    """
    parsed: list[dict[str, Any]] = []
    with path.open(encoding="utf-8") as stream:
        lineno = 0
        for raw in stream:
            lineno += 1
            text = raw.strip()
            if text == "" or text[0] == "#":
                continue
            try:
                payload = json.loads(text)
            except Exception as err:
                raise SystemExit(f"{path}:{lineno}: invalid JSONL: {err}") from err
            parsed.append(payload)
    return parsed
def _read_replay_jsonl(path: Path) -> list[AgentReplayRecord]:
    """Read a JSONL file and convert every entry with ``AgentReplayRecord.from_dict``.

    Args:
        path: Replay JSONL file to read.

    Returns:
        The converted records, in file order.
    """
    raw_entries = _read_jsonl(path)
    return list(map(AgentReplayRecord.from_dict, raw_entries))
def _write_replay_jsonl(path: Path, records: list[AgentReplayRecord]) -> None:
    """Write replay records to *path* as UTF-8 JSONL, one object per line.

    Each record is serialized from its instance attribute dictionary with
    sorted keys and non-ASCII characters preserved. Missing parent
    directories of *path* are created first so that a fresh output folder
    does not abort the pipeline.

    Args:
        path: Destination file; overwritten if it already exists.
        records: Records to serialize; an empty list yields an empty file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(vars(record), ensure_ascii=False, sort_keys=True) + "\n"
            for record in records
        )
def _write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as indented, key-sorted UTF-8 JSON.

    Missing parent directories of *path* are created first so that a report
    path inside a not-yet-existing output folder does not abort the
    pipeline. The file ends with a trailing newline.

    Args:
        path: Destination file; overwritten if it already exists.
        payload: JSON-serializable dictionary to write.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)
    path.write_text(rendered + "\n", encoding="utf-8")
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())