import importlib import inspect import ast from pathlib import Path def _load_run_scheduler(monkeypatch): monkeypatch.setenv("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true") monkeypatch.setenv("USE_POSTGRESQL", "false") return importlib.import_module("run_scheduler") def _function_source(path, function_name): source = Path(path).read_text() tree = ast.parse(source) for node in tree.body: if isinstance(node, ast.FunctionDef) and node.name == function_name: return ast.get_source_segment(source, node) or "" raise AssertionError(f"{function_name} not found in {path}") def test_embed_consistency_mismatch_notifies_event_router(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) import services.rag_service as rag_service monkeypatch.setattr( rag_service, "verify_embedding_consistency", lambda: { "ok": False, "reachable": ["gcp_a", "gcp_b", "fallback_111"], "max_diff": 0.125, "signature": "abc123", }, ) notifications = [] def fake_notify(task_name, error, **kwargs): notifications.append((task_name, str(error), kwargs)) monkeypatch.setattr(run_scheduler, "_notify_scheduler_failure", fake_notify) run_scheduler.run_embed_consistency_check() assert notifications == [ ( "run_embed_consistency_check", "BGE-M3 embedding consistency mismatch " "reachable=['gcp_a', 'gcp_b', 'fallback_111'] " "max_diff=1.25e-01 signature=abc123", { "source": "Scheduler.RAG", "event_type": "embed_consistency_mismatch", "title": "BGE-M3 一致性異常", "dedup_ttl_sec": 86400, }, ) ] def test_embed_consistency_ok_does_not_notify(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) import services.rag_service as rag_service monkeypatch.setattr( rag_service, "verify_embedding_consistency", lambda: { "ok": True, "reachable": ["gcp_a", "gcp_b", "fallback_111"], "max_diff": 0.0, "signature": "abc123", }, ) notifications = [] monkeypatch.setattr( run_scheduler, "_notify_scheduler_failure", lambda *args, **kwargs: notifications.append((args, kwargs)), ) run_scheduler.run_embed_consistency_check() assert notifications == [] def test_notify_scheduler_failure_without_active_exception_uses_error_trace(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) import services.event_router as event_router calls = [] monkeypatch.setattr( event_router, "notify_failure", lambda **kwargs: calls.append(kwargs), ) run_scheduler._notify_scheduler_failure( "synthetic_check", RuntimeError("mismatch"), source="Scheduler.Test", event_type="synthetic_check_failed", title="Synthetic check failed", ) assert calls[0]["trace"] == "RuntimeError: mismatch" def test_scheduler_observability_wrappers_notify_on_exception(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) for fn_name in [ "run_host_health_probe", "run_ai_calls_error_spike_check", "run_observability_daily_summary", "run_host_health_probe_cleanup", ]: source = inspect.getsource(getattr(run_scheduler, fn_name)) assert "_notify_scheduler_failure(" in source def test_host_health_transition_alert_keeps_db_dedup_window(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) source = inspect.getsource(run_scheduler.run_host_health_probe) assert "prev_healthy != rec['healthy']" in source assert "recent_transition" in source assert "INTERVAL '1 hour'" in source assert "INTERVAL '90 minutes'" in source assert "if recent_transition is None:" in source assert "_push_host_transition_alert(tr)" in source def test_host_health_probe_verifies_gcp_embedding_runtime(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) class Resp: status_code = 200 def json(self): return {"embeddings": [[0.1, 0.2, 0.3]]} ok, err = run_scheduler._probe_ollama_embedding_runtime( type("Requests", (), {"post": staticmethod(lambda *args, **kwargs: Resp())}), "http://34.21.145.224:11434", ) assert ok is True assert err is None assert run_scheduler._host_health_model_probe_enabled("Primary (GCP)") is True assert run_scheduler._host_health_model_probe_enabled("Secondary (GCP)") is True assert run_scheduler._host_health_model_probe_enabled("Fallback (111)") is False def test_host_health_embedding_probe_uses_30s_default_timeout(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) monkeypatch.delenv("OLLAMA_HOST_HEALTH_EMBED_TIMEOUT", raising=False) seen = {} class Resp: status_code = 200 def json(self): return {"embeddings": [[0.1, 0.2, 0.3]]} class Requests: @staticmethod def post(*_args, **kwargs): seen["timeout"] = kwargs["timeout"] return Resp() ok, err = run_scheduler._probe_ollama_embedding_runtime( Requests, "http://34.21.145.224:11434", ) assert ok is True assert err is None assert seen["timeout"] == 30.0 def test_host_health_probe_reports_embedding_runtime_failure(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) class Requests: @staticmethod def post(*_args, **_kwargs): raise TimeoutError("embed timeout") ok, err = run_scheduler._probe_ollama_embedding_runtime( Requests, "http://34.21.145.224:11434", ) assert ok is False assert "EmbedProbe TimeoutError" in err def test_v2_cron_blind_spot_list_has_failure_notifications(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) for fn_name in [ "run_daily_token_report_task", "run_promotion_gate_worker", "run_awaiting_review_push", "run_expire_stale_reviews", "run_action_plan_hygiene_task", "run_cost_throttle_evaluate", "run_cost_throttle_reset_if_new_month", "run_ppt_vision_audit", "run_embed_consistency_check", "run_ollama_111_usage_guard_check", ]: source = inspect.getsource(getattr(run_scheduler, fn_name)) assert "_notify_scheduler_failure(" in source promo_source = _function_source("scheduler.py", "run_promo_event_task") assert "notify_failure(" in promo_source def test_roi_ai_smoke_and_daily_report_schedules_stay_staggered(): source = Path("run_scheduler.py").read_text() assert 'schedule.every().day.at("09:00").do(run_daily_report_task)' in source assert 'schedule.every().day.at("09:05").do(run_roi_monthly_report_if_new_month)' in source assert 'schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task)' in source assert 'schedule.every().day.at("10:30").do(run_pchome_match_backfill_task)' in source assert 'schedule.every().day.at("10:45").do(run_pchome_growth_momo_backfill_task)' in source assert "schedule.every(6).hours.do(run_action_plan_hygiene_task)" in source assert "schedule.every(15).minutes.do(run_ollama_111_usage_guard_check)" in source def test_ollama_111_usage_guard_stays_observational(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) source = inspect.getsource(run_scheduler.run_ollama_111_usage_guard_check) assert "OLLAMA_111_USAGE_ALERT_ENABLED" in source assert "provider = 'ollama_111'" in source assert "send_telegram_with_result" in source assert "_notify_scheduler_failure(" in source assert "只觀測 ai_calls,不改路由" in source assert "UPDATE" not in source assert "DELETE" not in source def test_legacy_edm_and_seasonal_promo_schedules_are_opt_in(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) source = inspect.getsource(run_scheduler._register_schedules) monkeypatch.delenv("MOMO_ENABLE_LEGACY_EDM_SCHEDULE", raising=False) monkeypatch.delenv("MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE", raising=False) assert run_scheduler._legacy_edm_schedule_enabled() is False assert run_scheduler._seasonal_promo_schedule_enabled() is False monkeypatch.setenv("MOMO_ENABLE_LEGACY_EDM_SCHEDULE", "true") monkeypatch.setenv("MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE", "1") assert run_scheduler._legacy_edm_schedule_enabled() is True assert run_scheduler._seasonal_promo_schedule_enabled() is True assert "if _legacy_edm_schedule_enabled():" in source assert "if not _seasonal_promo_schedule_enabled():" in source assert "MOMO_ENABLE_LEGACY_EDM_SCHEDULE" in Path("run_scheduler.py").read_text() assert "MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE" in Path("run_scheduler.py").read_text() def test_ai_smoke_daily_summary_refreshes_smoke_before_push(monkeypatch): run_scheduler = _load_run_scheduler(monkeypatch) source = inspect.getsource(run_scheduler.run_ai_smoke_daily_summary_task) assert "collect_ai_automation_smoke(record_history=True" in source assert "send_smoke_daily_summary()" in source