Files
awoooi/scripts/agents/replay-reference-candidate.py
Your Name cfb866d055
Some checks failed
Ansible Lint / lint (push) Successful in 35s
CD Pipeline / tests (push) Failing after 13s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Failing after 11s
feat(governance): add agent market automation surfaces
2026-06-04 21:50:55 +08:00

87 lines
2.4 KiB
Python

#!/usr/bin/env python3
"""
Deterministic no-LLM reference adapter for Agent replacement replay smoke tests.
This adapter is smoke-only. It is not a market candidate and must not be used as
replacement evidence.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
# ROOT is the third ancestor of this script's resolved location
# (parents[0] is the directory that contains the script itself).
ROOT = Path(__file__).resolve().parents[2]
# Directory that is put on the import path below: <ROOT>/apps/api.
API_SRC = ROOT / "apps" / "api"
# Prepend (index 0) so this directory is searched first when the
# `src.services...` import that follows is resolved.
sys.path.insert(0, str(API_SRC))
from src.services.agent_reference_adapter import ( # noqa: E402
build_reference_candidate_results,
)
def main() -> int:
    """Run the reference replay adapter from the command line.

    Parses ``--inputs``, ``--output``, ``--candidate-id`` and
    ``--candidate-role``, loads the input JSONL file, hands the records to
    ``build_reference_candidate_results`` and writes each result's
    ``to_dict()`` as one sorted-key JSON line to the output file. A one-line
    JSON summary of the run is then printed to stdout.

    Returns:
        ``0`` after the output file and the summary have been written.
    """
    cli = argparse.ArgumentParser(
        description="Run the deterministic reference replay adapter."
    )
    cli.add_argument("--inputs", required=True, help="candidate input JSONL")
    cli.add_argument("--output", required=True, help="candidate raw result JSONL")
    cli.add_argument(
        "--candidate-id",
        default="reference_deterministic_adapter",
        help="candidate_id to emit",
    )
    cli.add_argument(
        "--candidate-role",
        default="contract_smoke_adapter",
        help="candidate_role to emit",
    )
    options = cli.parse_args()

    # Inputs are read in full before the adapter is invoked.
    input_records = _read_jsonl(Path(options.inputs))
    results = build_reference_candidate_results(
        input_records,
        candidate_id=options.candidate_id,
        candidate_role=options.candidate_role,
    )

    # One JSON document per line; sorted keys keep the output byte-stable.
    with Path(options.output).open("w", encoding="utf-8") as sink:
        for item in results:
            serialized = json.dumps(
                item.to_dict(), ensure_ascii=False, sort_keys=True
            )
            sink.write(serialized + "\n")

    summary = {
        "inputs": options.inputs,
        "output": options.output,
        "candidate_id": options.candidate_id,
        "records": len(results),
        "smoke_only": True,
    }
    print(json.dumps(summary, ensure_ascii=False, sort_keys=True))
    return 0
def _read_jsonl(path: Path) -> list[dict[str, Any]]:
    """Load the JSON objects stored one per line in *path*.

    Each line is stripped of surrounding whitespace first. Lines that are
    then empty, or that start with ``#``, are skipped; they still count
    towards the line numbers reported in error messages.

    Args:
        path: JSONL file to read, decoded as UTF-8.

    Returns:
        The decoded records, in file order.

    Raises:
        SystemExit: If a line is not valid JSON, or decodes to something
            other than a JSON object. The message is prefixed with
            ``<path>:<line_number>``.
    """
    records: list[dict[str, Any]] = []
    with path.open(encoding="utf-8") as handle:
        for line_number, raw_line in enumerate(handle, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # Only the decode call sits in the try body, and only the
            # decoder's own error is translated into a SystemExit.
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                raise SystemExit(
                    f"{path}:{line_number}: invalid JSONL: {exc}"
                ) from exc
            # The declared return type promises dicts; reject lists, numbers,
            # strings and null here rather than passing them on to callers.
            if not isinstance(record, dict):
                raise SystemExit(
                    f"{path}:{line_number}: invalid JSONL: expected a JSON "
                    f"object, got {type(record).__name__}"
                )
            records.append(record)
    return records
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())