from __future__ import annotations import json from src.services.agent_market_watch import FetchedSource from src.services.ai_technology_watch import run_ai_technology_watch def test_ai_technology_watch_groups_areas_and_blocks_integration(): registry = { "schema_version": "ai_technology_watch_sources_v1", "updated_at": "2026-06-25", "cadence": { "near_real_time_watch": "每 6 小時", "daily_triage": "每日", "weekly_scorecard": "每週", "monthly_strategy_review": "每月", }, "policy": { "read_only": True, "sdk_installation_approved": False, "paid_api_calls_approved": False, "production_routing_approved": False, "telegram_send_approved": False, "model_provider_switch_approved": False, "host_write_approved": False, }, "candidates": [ { "candidate_id": "langgraph_runtime", "display_name": "LangGraph", "technology_area": "agent_frameworks", "integration_surface": "durable_workflow_human_in_loop", "awoooi_role": "事件工作流候選", "evaluation_priority": "p0", "requires_cost_approval": False, "requires_dependency_approval": True, "requires_security_review": True, "sources": [ { "source_id": "langgraph_pypi", "type": "pypi", "url": "https://pypi.org/pypi/langgraph/json", "reference_version": "1.0.0", } ], }, { "candidate_id": "ragas_eval", "display_name": "Ragas", "technology_area": "evaluation_and_observability", "integration_surface": "rag_evaluation", "awoooi_role": "RAG 品質評測候選", "evaluation_priority": "p1", "requires_cost_approval": False, "requires_dependency_approval": True, "requires_security_review": True, "sources": [ { "source_id": "ragas_pypi", "type": "pypi", "url": "https://pypi.org/pypi/ragas/json", "reference_version": "0.1.0", } ], }, ], } def fetcher(url: str, _timeout: int) -> FetchedSource: version = "1.1.0" if "langgraph" in url else "0.1.0" payload = { "info": {"version": version}, "releases": { version: [{"upload_time_iso_8601": "2026-06-25T01:02:03Z"}] }, } return FetchedSource(status="ok", http_status=200, body=json.dumps(payload).encode()) report = run_ai_technology_watch( registry, registry_path="docs/ai/ai-technology-watch-sources.v1.json", mode="live", fetcher=fetcher, generated_at="2026-06-25T00:00:00+00:00", ) assert report["schema_version"] == "ai_technology_watch_report_v1" assert report["summary"]["technology_count"] == 2 assert report["summary"]["technology_area_count"] == 2 assert report["summary"]["changed_technologies"] == 1 assert report["summary"]["review_queue_count"] == 1 assert report["summary"]["high_priority_count"] == 2 assert report["technology_area_counts"]["agent_frameworks"] == 1 assert report["technology_area_counts"]["evaluation_and_observability"] == 1 assert report["policy"]["read_only"] is True assert report["policy"]["sdk_installation_approved"] is False assert report["policy"]["paid_api_calls_approved"] is False assert report["policy"]["production_routing_approved"] is False assert report["policy"]["telegram_send_approved"] is False assert report["review_queue"][0]["technology_id"] == "langgraph_runtime" def test_ai_technology_watch_reuses_previous_report_for_change_detection(): registry = { "schema_version": "ai_technology_watch_sources_v1", "updated_at": "2026-06-25", "policy": {"read_only": True}, "candidates": [ { "candidate_id": "mcp_sdk", "display_name": "MCP SDK", "technology_area": "mcp_and_a2a", "evaluation_priority": "p0", "sources": [ { "source_id": "mcp_npm", "type": "npm", "url": "https://registry.npmjs.org/%40modelcontextprotocol%2Fsdk", } ], } ], } def fetcher(_url: str, _timeout: int) -> FetchedSource: payload = {"dist-tags": {"latest": "1.2.3"}, "time": {"1.2.3": "2026-06-25"}} return FetchedSource(status="ok", http_status=200, body=json.dumps(payload).encode()) first_report = run_ai_technology_watch( registry, registry_path="registry.json", mode="live", fetcher=fetcher, generated_at="2026-06-25T00:00:00+00:00", ) second_report = run_ai_technology_watch( registry, registry_path="registry.json", mode="live", previous_report=first_report, fetcher=fetcher, generated_at="2026-06-25T01:00:00+00:00", ) assert first_report["summary"]["changed_technologies"] == 0 assert second_report["summary"]["changed_technologies"] == 0 assert second_report["technologies"][0]["sources"][0]["version"] == "1.2.3"