#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Market intelligence candidate queue writer CLI. This script prints a JSON gate status by default. Real queue writes require --execute, --apply-real-write, --read-only-preflight, backup/smoke confirmations, and a one-time approval token. """ import argparse import contextlib import json import os import sys from pathlib import Path REPO_ROOT = Path(__file__).resolve().parents[1] if str(REPO_ROOT) not in sys.path: sys.path.insert(0, str(REPO_ROOT)) with contextlib.redirect_stdout(sys.stderr): from services.market_intel import MarketIntelService # noqa: E402 from services.market_intel.candidate_queue_writer_cli import ( # noqa: E402 APPROVAL_ENV_VAR, build_candidate_queue_writer_cli_plan, ) from services.market_intel.candidate_queue_writer_preflight import ( # noqa: E402 build_candidate_queue_writer_preflight, ) def parse_args(argv=None): parser = argparse.ArgumentParser( description="Preview candidate queue writer execution gates." ) parser.add_argument( "--sample-json", default=None, help="Path to one manual sample result JSON file. Default: empty payload.", ) parser.add_argument( "--execute", action="store_true", help="Request real execution.", ) parser.add_argument( "--apply-real-write", action="store_true", help="Request the guarded queue write transaction.", ) parser.add_argument( "--approval-token", default=None, help=f"One-time approval token checked against {APPROVAL_ENV_VAR}.", ) parser.add_argument( "--read-only-preflight", action="store_true", help="Run read-only schema / payload preflight before returning the gate.", ) parser.add_argument( "--backup-verified", action="store_true", help="Confirm a fresh operator backup before allowing a real write.", ) parser.add_argument( "--migration-live-smoke-passed", action="store_true", help="Confirm live schema smoke passed before allowing a real write.", ) return parser.parse_args(argv) def _load_sample_result(path): if not path: return {} with open(path, encoding="utf-8") as handle: payload = json.load(handle) return payload.get("sample_result", payload) if isinstance(payload, dict) else payload def main(argv=None): args = parse_args(argv) sample_result = _load_sample_result(args.sample_json) if not isinstance(sample_result, dict): sample_result = None payload_error = "invalid_json_object" else: payload_error = None service = MarketIntelService() transaction_preview = service.build_manual_sample_candidate_queue_transaction( sample_result=sample_result, payload_error=payload_error, ) writer_preflight = build_candidate_queue_writer_preflight( transaction_preview=transaction_preview, execute_requested=args.read_only_preflight, ) plan = build_candidate_queue_writer_cli_plan( transaction_preview=transaction_preview, writer_preflight=writer_preflight, execute_requested=args.execute, apply_real_write=args.apply_real_write, approval_token=args.approval_token, approval_token_secret=os.getenv(APPROVAL_ENV_VAR), backup_verified=args.backup_verified, migration_live_smoke_passed=args.migration_live_smoke_passed, ) plan["phase"] = service.phase print(json.dumps(plan, ensure_ascii=False, indent=2, sort_keys=True)) return int(plan.get("exit_code", 2)) if __name__ == "__main__": raise SystemExit(main())