274 lines
9.2 KiB
Python
274 lines
9.2 KiB
Python
import importlib
|
||
import inspect
|
||
import ast
|
||
from pathlib import Path
|
||
|
||
|
||
def _load_run_scheduler(monkeypatch):
    """Import and return the ``run_scheduler`` module for a test.

    Before importing, two environment variables are set through
    ``monkeypatch.setenv``: ``MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS=true`` and
    ``USE_POSTGRESQL=false``.
    """
    test_env = (
        ("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true"),
        ("USE_POSTGRESQL", "false"),
    )
    for env_name, env_value in test_env:
        monkeypatch.setenv(env_name, env_value)
    module = importlib.import_module("run_scheduler")
    return module
|
||
|
||
|
||
def _function_source(path, function_name):
    """Return the source text of a top-level function defined in ``path``.

    Args:
        path: File path (``str`` or ``Path``) of the Python file to parse.
        function_name: Name of a module-level ``def`` or ``async def``.

    Returns:
        The exact source segment of the function, or ``""`` when
        ``ast.get_source_segment`` cannot recover it.

    Raises:
        AssertionError: If no top-level function with that name exists.
    """
    # Decode explicitly as UTF-8: the locale default may differ, and the
    # scanned files contain non-ASCII text.
    source = Path(path).read_text(encoding="utf-8")
    tree = ast.parse(source)
    function_nodes = (ast.FunctionDef, ast.AsyncFunctionDef)
    # Only module-level definitions are searched; nested functions and
    # methods are intentionally not visited.
    for node in tree.body:
        if isinstance(node, function_nodes) and node.name == function_name:
            return ast.get_source_segment(source, node) or ""
    raise AssertionError(f"{function_name} not found in {path}")
|
||
|
||
|
||
def test_embed_consistency_mismatch_notifies_event_router(monkeypatch):
    """A failing embedding-consistency report yields one failure notification.

    ``rag_service.verify_embedding_consistency`` is stubbed to report
    ``ok=False`` and ``run_scheduler._notify_scheduler_failure`` is replaced
    by a recorder. The test pins the exact task name, message text and
    keyword arguments recorded by ``run_embed_consistency_check``.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    import services.rag_service as rag_service

    def fake_verify():
        return {
            "ok": False,
            "reachable": ["gcp_a", "gcp_b", "fallback_111"],
            "max_diff": 0.125,
            "signature": "abc123",
        }

    monkeypatch.setattr(rag_service, "verify_embedding_consistency", fake_verify)

    recorded = []
    monkeypatch.setattr(
        run_scheduler,
        "_notify_scheduler_failure",
        lambda task_name, error, **kwargs: recorded.append(
            (task_name, str(error), kwargs)
        ),
    )

    run_scheduler.run_embed_consistency_check()

    expected_message = (
        "BGE-M3 embedding consistency mismatch "
        "reachable=['gcp_a', 'gcp_b', 'fallback_111'] "
        "max_diff=1.25e-01 signature=abc123"
    )
    expected_kwargs = {
        "source": "Scheduler.RAG",
        "event_type": "embed_consistency_mismatch",
        "title": "BGE-M3 一致性異常",
        "dedup_ttl_sec": 86400,
    }
    assert recorded == [
        ("run_embed_consistency_check", expected_message, expected_kwargs),
    ]
|
||
|
||
|
||
def test_embed_consistency_ok_does_not_notify(monkeypatch):
    """A passing embedding-consistency report must not send any notification.

    ``rag_service.verify_embedding_consistency`` is stubbed to report
    ``ok=True``; the recorder installed over ``_notify_scheduler_failure``
    must stay empty after ``run_embed_consistency_check`` runs.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    import services.rag_service as rag_service

    def fake_verify():
        return {
            "ok": True,
            "reachable": ["gcp_a", "gcp_b", "fallback_111"],
            "max_diff": 0.0,
            "signature": "abc123",
        }

    monkeypatch.setattr(rag_service, "verify_embedding_consistency", fake_verify)

    recorded = []

    def record_notification(*args, **kwargs):
        recorded.append((args, kwargs))

    monkeypatch.setattr(run_scheduler, "_notify_scheduler_failure", record_notification)

    run_scheduler.run_embed_consistency_check()

    assert recorded == []
|
||
|
||
|
||
def test_notify_scheduler_failure_without_active_exception_uses_error_trace(monkeypatch):
    """The ``trace`` sent to the event router is derived from the error itself.

    ``_notify_scheduler_failure`` is called outside any ``except`` block with
    a ``RuntimeError("mismatch")``; the first recorded ``notify_failure`` call
    must carry ``trace == "RuntimeError: mismatch"``.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    import services.event_router as event_router

    router_calls = []

    def capture_call(**kwargs):
        router_calls.append(kwargs)

    monkeypatch.setattr(event_router, "notify_failure", capture_call)

    synthetic_error = RuntimeError("mismatch")
    run_scheduler._notify_scheduler_failure(
        "synthetic_check",
        synthetic_error,
        source="Scheduler.Test",
        event_type="synthetic_check_failed",
        title="Synthetic check failed",
    )

    first_call = router_calls[0]
    assert first_call["trace"] == "RuntimeError: mismatch"
|
||
|
||
|
||
def test_scheduler_observability_wrappers_notify_on_exception(monkeypatch):
    """Each listed observability task's source contains a failure-notify call.

    This is a source-text check: it looks for the literal
    ``_notify_scheduler_failure(`` in every wrapper's source.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)

    wrapper_names = (
        "run_host_health_probe",
        "run_ai_calls_error_spike_check",
        "run_observability_daily_summary",
        "run_host_health_probe_cleanup",
    )
    for fn_name in wrapper_names:
        wrapper = getattr(run_scheduler, fn_name)
        source = inspect.getsource(wrapper)
        assert "_notify_scheduler_failure(" in source
|
||
|
||
|
||
def test_host_health_transition_alert_keeps_db_dedup_window(monkeypatch):
    """``run_host_health_probe`` source still contains the transition-dedup code.

    This is a source-text check: every fragment below must appear verbatim
    in the function's source.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    source = inspect.getsource(run_scheduler.run_host_health_probe)

    required_fragments = (
        "prev_healthy != rec['healthy']",
        "recent_transition",
        "INTERVAL '1 hour'",
        "INTERVAL '90 minutes'",
        "if recent_transition is None:",
        "_push_host_transition_alert(tr)",
    )
    for fragment in required_fragments:
        assert fragment in source
|
||
|
||
|
||
def test_host_health_probe_verifies_gcp_embedding_runtime(monkeypatch):
    """A 200 response with embeddings makes the embedding probe succeed.

    Also pins which host labels have the model probe enabled: the two
    ``(GCP)`` labels return ``True`` and ``Fallback (111)`` returns ``False``.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)

    class Resp:
        status_code = 200

        def json(self):
            return {"embeddings": [[0.1, 0.2, 0.3]]}

    class Requests:
        @staticmethod
        def post(*args, **kwargs):
            return Resp()

    ok, err = run_scheduler._probe_ollama_embedding_runtime(
        Requests,
        "http://34.21.145.224:11434",
    )

    assert ok is True
    assert err is None

    expected_probe_flags = {
        "Primary (GCP)": True,
        "Secondary (GCP)": True,
        "Fallback (111)": False,
    }
    for host_label, expected_flag in expected_probe_flags.items():
        assert run_scheduler._host_health_model_probe_enabled(host_label) is expected_flag
|
||
|
||
|
||
def test_host_health_embedding_probe_uses_30s_default_timeout(monkeypatch):
    """With no timeout env override, the probe posts with ``timeout=30.0``.

    ``OLLAMA_HOST_HEALTH_EMBED_TIMEOUT`` is removed from the environment and
    the fake ``post`` records the ``timeout`` keyword it receives.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    monkeypatch.delenv("OLLAMA_HOST_HEALTH_EMBED_TIMEOUT", raising=False)
    captured = {}

    class Resp:
        status_code = 200

        def json(self):
            return {"embeddings": [[0.1, 0.2, 0.3]]}

    def recording_post(*_args, **kwargs):
        captured["timeout"] = kwargs["timeout"]
        return Resp()

    fake_requests = type("Requests", (), {"post": staticmethod(recording_post)})

    ok, err = run_scheduler._probe_ollama_embedding_runtime(
        fake_requests,
        "http://34.21.145.224:11434",
    )

    assert ok is True
    assert err is None
    assert captured["timeout"] == 30.0
|
||
|
||
|
||
def test_host_health_probe_reports_embedding_runtime_failure(monkeypatch):
    """A ``TimeoutError`` from ``post`` is reported as a failed probe.

    The probe must return ``ok=False`` and an error text containing
    ``"EmbedProbe TimeoutError"``.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)

    def raise_timeout(*_args, **_kwargs):
        raise TimeoutError("embed timeout")

    fake_requests = type("Requests", (), {"post": staticmethod(raise_timeout)})

    probe_result = run_scheduler._probe_ollama_embedding_runtime(
        fake_requests,
        "http://34.21.145.224:11434",
    )
    ok, err = probe_result

    assert ok is False
    assert "EmbedProbe TimeoutError" in err
|
||
|
||
|
||
def test_v2_cron_blind_spot_list_has_failure_notifications(monkeypatch):
    """Each listed cron task's source contains a failure-notification call.

    Tasks in ``run_scheduler`` are checked for the literal
    ``_notify_scheduler_failure(``; ``run_promo_event_task`` in
    ``scheduler.py`` is checked for ``notify_failure(``.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)

    cron_task_names = (
        "run_daily_token_report_task",
        "run_promotion_gate_worker",
        "run_awaiting_review_push",
        "run_expire_stale_reviews",
        "run_action_plan_hygiene_task",
        "run_cost_throttle_evaluate",
        "run_cost_throttle_reset_if_new_month",
        "run_ppt_vision_audit",
        "run_embed_consistency_check",
        "run_ollama_111_usage_guard_check",
    )
    for fn_name in cron_task_names:
        task = getattr(run_scheduler, fn_name)
        source = inspect.getsource(task)
        assert "_notify_scheduler_failure(" in source

    promo_source = _function_source("scheduler.py", "run_promo_event_task")
    assert "notify_failure(" in promo_source
|
||
|
||
|
||
def test_roi_ai_smoke_and_daily_report_schedules_stay_staggered():
    """``run_scheduler.py`` still registers the listed schedules verbatim.

    This is a source-text check against the file in the current working
    directory; each registration line below must appear exactly as written.
    """
    # Decode explicitly as UTF-8: the locale default may differ, and the
    # scheduler source contains non-ASCII text.
    source = Path("run_scheduler.py").read_text(encoding="utf-8")

    expected_registrations = (
        'schedule.every().day.at("09:00").do(run_daily_report_task)',
        'schedule.every().day.at("09:05").do(run_roi_monthly_report_if_new_month)',
        'schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task)',
        'schedule.every().day.at("10:30").do(run_pchome_match_backfill_task)',
        'schedule.every().day.at("10:45").do(run_pchome_growth_momo_backfill_task)',
        "schedule.every(6).hours.do(run_action_plan_hygiene_task)",
        "schedule.every(15).minutes.do(run_ollama_111_usage_guard_check)",
    )
    for registration in expected_registrations:
        # Name the missing line so a failure points at the exact schedule.
        assert registration in source, f"missing schedule registration: {registration}"
|
||
|
||
|
||
def test_ollama_111_usage_guard_stays_observational(monkeypatch):
    """``run_ollama_111_usage_guard_check`` source has no UPDATE or DELETE.

    This is a source-text check: the required fragments must appear verbatim
    in the function's source and the forbidden ones must not appear.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    source = inspect.getsource(run_scheduler.run_ollama_111_usage_guard_check)

    required_fragments = (
        "OLLAMA_111_USAGE_ALERT_ENABLED",
        "provider = 'ollama_111'",
        "send_telegram_with_result",
        "_notify_scheduler_failure(",
        "只觀測 ai_calls,不改路由",
    )
    for fragment in required_fragments:
        assert fragment in source

    forbidden_fragments = ("UPDATE", "DELETE")
    for fragment in forbidden_fragments:
        assert fragment not in source
|
||
|
||
|
||
def test_legacy_edm_and_seasonal_promo_schedules_are_opt_in(monkeypatch):
    """Legacy EDM and seasonal promo schedules are off unless env flags are set.

    Checks that both ``_*_schedule_enabled`` helpers return ``False`` with
    the flags unset and ``True`` with them set to ``"true"`` / ``"1"``. Also
    checks that ``_register_schedules`` source contains both guards and that
    ``run_scheduler.py`` mentions both flag names.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    source = inspect.getsource(run_scheduler._register_schedules)

    monkeypatch.delenv("MOMO_ENABLE_LEGACY_EDM_SCHEDULE", raising=False)
    monkeypatch.delenv("MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE", raising=False)
    assert run_scheduler._legacy_edm_schedule_enabled() is False
    assert run_scheduler._seasonal_promo_schedule_enabled() is False

    monkeypatch.setenv("MOMO_ENABLE_LEGACY_EDM_SCHEDULE", "true")
    monkeypatch.setenv("MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE", "1")
    assert run_scheduler._legacy_edm_schedule_enabled() is True
    assert run_scheduler._seasonal_promo_schedule_enabled() is True

    assert "if _legacy_edm_schedule_enabled():" in source
    assert "if not _seasonal_promo_schedule_enabled():" in source

    # Read the file once, explicitly as UTF-8: the locale default may differ,
    # and the scheduler source contains non-ASCII text.
    module_text = Path("run_scheduler.py").read_text(encoding="utf-8")
    assert "MOMO_ENABLE_LEGACY_EDM_SCHEDULE" in module_text
    assert "MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE" in module_text
|
||
|
||
|
||
def test_ai_smoke_daily_summary_refreshes_smoke_before_push(monkeypatch):
    """``run_ai_smoke_daily_summary_task`` source has the collect and send calls.

    This is a source-text check: both call fragments must appear verbatim in
    the function's source (their relative order is not verified here).
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    task = run_scheduler.run_ai_smoke_daily_summary_task
    source = inspect.getsource(task)

    required_fragments = (
        "collect_ai_automation_smoke(record_history=True",
        "send_smoke_daily_summary()",
    )
    for fragment in required_fragments:
        assert fragment in source
|