Files
awoooi/scripts/agents/ai-technology-watch.py
Your Name 210577de28
Some checks failed
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / tests (push) Successful in 1m39s
CD Pipeline / build-and-deploy (push) Successful in 4m35s
CD Pipeline / post-deploy-checks (push) Successful in 1m51s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
feat(governance): 新增 AI 技術雷達滾動監控
2026-06-25 11:57:38 +08:00

83 lines
2.6 KiB
Python

#!/usr/bin/env python3
"""Run the AWOOOI read-only AI technology watch."""
from __future__ import annotations
import argparse
import importlib.util
import json
import sys
from pathlib import Path
from typing import Any
# Directory two levels above the one containing this script.
ROOT = Path(__file__).resolve().parents[2]
# Directory put on sys.path before the service module is executed.
API_ROOT = ROOT.joinpath("apps", "api")
# Source file that provides the ``run_ai_technology_watch`` callable.
SERVICE_PATH = API_ROOT.joinpath("src", "services", "ai_technology_watch.py")
# Service entry point, bound on first use by main() via _load_service().
# NOTE(review): presumably left as a module global so callers/tests can
# pre-assign a replacement before calling main() -- confirm.
run_ai_technology_watch = None
def main(argv: list[str] | None = None) -> int:
    """Parse the command line, run the watch service and write its report.

    The registry JSON (and the optional previous report) are read from disk,
    handed to ``run_ai_technology_watch`` together with the selected mode and
    timeout, and the mapping it returns is written to ``--output`` as sorted,
    indented UTF-8 JSON. The report's ``"summary"`` entry is also printed to
    stdout as a single JSON line.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            fall back to ``sys.argv[1:]``, which keeps the plain ``main()``
            call working.

    Returns:
        ``0`` once the report has been written and the summary printed.

    Raises:
        SystemExit: on invalid command-line arguments (raised by argparse).
        KeyError: if the report returned by the service has no ``"summary"``.
    """
    global run_ai_technology_watch

    parser = argparse.ArgumentParser(description="Run AWOOOI AI technology watch.")
    parser.add_argument(
        "--registry",
        default="docs/ai/ai-technology-watch-sources.v1.json",
        help="AI technology watch registry JSON",
    )
    parser.add_argument("--output", required=True, help="report output JSON")
    parser.add_argument(
        "--mode",
        choices=("offline", "live"),
        default="live",
        help="offline validates registry only; live fetches primary sources",
    )
    parser.add_argument("--previous-report")
    parser.add_argument("--timeout-seconds", type=int, default=12)
    args = parser.parse_args(argv)

    # Bind the service only after the arguments are known to be valid, so that
    # ``--help`` and usage errors neither pay for nor depend on importing it.
    if run_ai_technology_watch is None:
        run_ai_technology_watch = _load_service()

    registry = _read_json(Path(args.registry))
    previous = _read_json(Path(args.previous_report)) if args.previous_report else None
    report = run_ai_technology_watch(
        registry,
        registry_path=args.registry,
        mode=args.mode,
        previous_report=previous,
        timeout_seconds=args.timeout_seconds,
    )

    output_path = Path(args.output)
    # Create missing parent directories so a finished run is not thrown away
    # by a FileNotFoundError at the very last step.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    print(json.dumps(report["summary"], ensure_ascii=False, sort_keys=True))
    return 0
def _read_json(path: Path) -> dict[str, Any]:
    """Load *path* as UTF-8 encoded JSON and return its top-level object.

    Every failure is reported the same way, as a ``<path>: <reason>`` exit
    message, so the command line never shows a raw traceback for bad input.

    Args:
        path: Location of the JSON document to read.

    Returns:
        The decoded top-level JSON object.

    Raises:
        SystemExit: if the file cannot be opened or read, is not valid UTF-8
            JSON, or its top-level value is not a JSON object.
    """
    try:
        with path.open(encoding="utf-8") as handle:
            payload = json.load(handle)
    except OSError as exc:
        raise SystemExit(f"{path}: cannot read JSON file: {exc}") from exc
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise SystemExit(f"{path}: invalid JSON: {exc}") from exc
    if not isinstance(payload, dict):
        raise SystemExit(f"{path}: expected JSON object")
    return payload
def _load_service() -> Any:
    """Import the service module from ``SERVICE_PATH`` and return its entry point.

    ``API_ROOT`` is put at the front of ``sys.path`` first.
    NOTE(review): presumably so the service module's own absolute imports
    resolve -- confirm against the service source.

    Returns:
        The ``run_ai_technology_watch`` attribute of the loaded module.

    Raises:
        SystemExit: if the service file does not exist or no import spec or
            loader can be built for it.
        Exception: whatever executing the service module raises. In that case
            the partially initialised module is removed from ``sys.modules``
            before the error propagates.
    """
    # spec_from_file_location() does not check that the file exists, so test
    # for it explicitly and fail with the same message as the spec check below.
    if not SERVICE_PATH.is_file():
        raise SystemExit(f"cannot load AI technology watch service from {SERVICE_PATH}")

    api_root = str(API_ROOT)
    if api_root not in sys.path:
        sys.path.insert(0, api_root)

    module_name = "awoooi_ai_technology_watch_service"
    spec = importlib.util.spec_from_file_location(module_name, SERVICE_PATH)
    if spec is None or spec.loader is None:
        raise SystemExit(f"cannot load AI technology watch service from {SERVICE_PATH}")

    module = importlib.util.module_from_spec(spec)
    # Register before executing so code that looks its own module up in
    # sys.modules while it is being imported can find it.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Mirror the regular import machinery: never leave a half-initialised
        # module behind for later lookups to pick up.
        sys.modules.pop(module_name, None)
        raise
    return module.run_ai_technology_watch
if __name__ == "__main__":
    # Run the CLI and use main()'s return value as the process exit status.
    sys.exit(main())