Files
awoooi/scripts/agents/nemotron-external-runner-readiness.py
Your Name cfb866d055
Some checks failed
Ansible Lint / lint (push) Successful in 35s
CD Pipeline / tests (push) Failing after 13s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Failing after 11s
feat(governance): add agent market automation surfaces
2026-06-04 21:50:55 +08:00

74 lines
2.3 KiB
Python

#!/usr/bin/env python3
"""
Evaluate the final local readiness gate before an external NeMo runner is used.
This command is read-only and local. It does not call NIM, NVIDIA APIs,
production tools, or LLMs.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
# parents[2] is the third ancestor of this file's resolved path
# (script directory -> its parent -> ROOT).
ROOT = Path(__file__).resolve().parents[2]
# Directory expected to hold the ``src`` package that is imported just below.
API_SRC = ROOT / "apps" / "api"
# Prepended (index 0) so the ``src.services...`` import resolves from API_SRC
# ahead of any other ``src`` package on the path when this script runs directly.
sys.path.insert(0, str(API_SRC))
from src.services.agent_nemotron_external_runner_readiness import ( # noqa: E402
DEFAULT_MINIMUM_RECORDS,
evaluate_nemotron_external_runner_readiness,
)
def main(argv: list[str] | None = None) -> int:
    """Run the readiness-gate CLI and return the process exit status.

    Parses the command line, loads the three JSON inputs, evaluates
    readiness, and emits the report as pretty-printed, key-sorted JSON,
    either into the ``--output`` file or onto stdout.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the original behaviour of reading the
            process arguments, so existing ``main()`` callers are unaffected.

    Returns:
        ``0`` when the report's ``"ready"`` entry is truthy, ``2`` otherwise.

    Raises:
        SystemExit: On argument errors (raised by argparse), when an input
            file cannot be loaded as a JSON object (raised by
            ``_read_json``), or when the report cannot be written to
            ``--output``.
    """
    parser = argparse.ArgumentParser(
        description="Evaluate NeMo/Nemotron external runner readiness."
    )
    parser.add_argument("--manifest", required=True, help="external runner manifest JSON")
    parser.add_argument("--sanitize-report", required=True, help="sanitize report JSON")
    parser.add_argument(
        "--sanitized-preflight",
        required=True,
        help="sanitized external runner preflight report JSON",
    )
    parser.add_argument(
        "--minimum-records",
        type=int,
        default=DEFAULT_MINIMUM_RECORDS,
        help="minimum request records required before readiness can pass",
    )
    parser.add_argument("--output", help="readiness report JSON")
    # parse_args(None) falls back to sys.argv[1:], matching the old call.
    args = parser.parse_args(argv)

    report = evaluate_nemotron_external_runner_readiness(
        manifest=_read_json(Path(args.manifest)),
        sanitize_report=_read_json(Path(args.sanitize_report)),
        sanitized_preflight=_read_json(Path(args.sanitized_preflight)),
        minimum_records=args.minimum_records,
    ).to_dict()

    rendered = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    if args.output:
        try:
            Path(args.output).write_text(rendered + "\n", encoding="utf-8")
        except OSError as exc:
            # Fail with a one-line message, like the input-side errors do,
            # instead of dumping a traceback for an unwritable path.
            raise SystemExit(f"{args.output}: cannot write report: {exc}") from exc
    else:
        print(rendered)

    # Non-zero status lets callers gate on readiness without parsing the JSON.
    return 0 if report["ready"] else 2
def _read_json(path: Path) -> dict[str, Any]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception as exc:
raise SystemExit(f"{path}: invalid JSON: {exc}") from exc
if not isinstance(payload, dict):
raise SystemExit(f"{path}: expected JSON object")
return payload
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())