Files
ewoooc/tests/test_run_scheduler_embed_consistency.py
OoO 4c59b74ced
All checks were successful
CD Pipeline / deploy (push) Successful in 1m11s
feat: schedule growth momo backfill
2026-06-19 00:18:53 +08:00

274 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import importlib
import inspect
import ast
from pathlib import Path
def _load_run_scheduler(monkeypatch):
    """Set the test-only environment flags and import ``run_scheduler``.

    Args:
        monkeypatch: Object exposing ``setenv(name, value)`` (the pytest
            ``monkeypatch`` fixture in these tests).

    Returns:
        The module object returned by ``importlib.import_module``.
    """
    env_overrides = (
        ("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true"),
        ("USE_POSTGRESQL", "false"),
    )
    # The variables are set before the import call, in this order.
    for name, value in env_overrides:
        monkeypatch.setenv(name, value)
    module = importlib.import_module("run_scheduler")
    return module
def _function_source(path, function_name):
source = Path(path).read_text()
tree = ast.parse(source)
for node in tree.body:
if isinstance(node, ast.FunctionDef) and node.name == function_name:
return ast.get_source_segment(source, node) or ""
raise AssertionError(f"{function_name} not found in {path}")
def test_embed_consistency_mismatch_notifies_event_router(monkeypatch):
    """A failing consistency result must produce exactly one failure notice.

    ``verify_embedding_consistency`` is replaced with a stub reporting
    ``ok=False``; the test then checks the arguments handed to
    ``_notify_scheduler_failure``.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    import services.rag_service as rag_service

    def fake_verify():
        # Fresh dict on every call, mirroring a real result payload.
        return {
            "ok": False,
            "reachable": ["gcp_a", "gcp_b", "fallback_111"],
            "max_diff": 0.125,
            "signature": "abc123",
        }

    recorded = []

    def fake_notify(task_name, error, **kwargs):
        # The error is stringified so the comparison below is on plain text.
        recorded.append((task_name, str(error), kwargs))

    monkeypatch.setattr(rag_service, "verify_embedding_consistency", fake_verify)
    monkeypatch.setattr(run_scheduler, "_notify_scheduler_failure", fake_notify)

    run_scheduler.run_embed_consistency_check()

    expected_message = (
        "BGE-M3 embedding consistency mismatch "
        "reachable=['gcp_a', 'gcp_b', 'fallback_111'] "
        "max_diff=1.25e-01 signature=abc123"
    )
    expected_kwargs = {
        "source": "Scheduler.RAG",
        "event_type": "embed_consistency_mismatch",
        "title": "BGE-M3 一致性異常",
        "dedup_ttl_sec": 86400,
    }
    assert recorded == [
        ("run_embed_consistency_check", expected_message, expected_kwargs),
    ]
def test_embed_consistency_ok_does_not_notify(monkeypatch):
    """A passing consistency result must not trigger any failure notice."""
    run_scheduler = _load_run_scheduler(monkeypatch)
    import services.rag_service as rag_service

    def fake_verify():
        return {
            "ok": True,
            "reachable": ["gcp_a", "gcp_b", "fallback_111"],
            "max_diff": 0.0,
            "signature": "abc123",
        }

    recorded = []

    def record_call(*args, **kwargs):
        # Any invocation at all is captured so the final assert can fail.
        recorded.append((args, kwargs))

    monkeypatch.setattr(rag_service, "verify_embedding_consistency", fake_verify)
    monkeypatch.setattr(run_scheduler, "_notify_scheduler_failure", record_call)

    run_scheduler.run_embed_consistency_check()

    assert recorded == []
def test_notify_scheduler_failure_without_active_exception_uses_error_trace(monkeypatch):
    """The ``trace`` passed to ``notify_failure`` is ``"<Type>: <message>"``.

    ``_notify_scheduler_failure`` is called outside any ``except`` block, and
    the first call forwarded to ``event_router.notify_failure`` is inspected.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    import services.event_router as event_router

    forwarded = []

    def capture(**kwargs):
        forwarded.append(kwargs)

    monkeypatch.setattr(event_router, "notify_failure", capture)

    synthetic_error = RuntimeError("mismatch")
    run_scheduler._notify_scheduler_failure(
        "synthetic_check",
        synthetic_error,
        source="Scheduler.Test",
        event_type="synthetic_check_failed",
        title="Synthetic check failed",
    )

    first_call = forwarded[0]
    assert first_call["trace"] == "RuntimeError: mismatch"
def test_scheduler_observability_wrappers_notify_on_exception(monkeypatch):
    """Each listed observability task must call ``_notify_scheduler_failure``.

    This is a source-text check: it inspects the function source rather than
    executing the tasks.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    wrapper_names = (
        "run_host_health_probe",
        "run_ai_calls_error_spike_check",
        "run_observability_daily_summary",
        "run_host_health_probe_cleanup",
    )
    for fn_name in wrapper_names:
        wrapper = getattr(run_scheduler, fn_name)
        source = inspect.getsource(wrapper)
        assert "_notify_scheduler_failure(" in source
def test_host_health_transition_alert_keeps_db_dedup_window(monkeypatch):
    """The host health probe source must keep its transition-alert fragments.

    This is a source-text check against ``run_host_health_probe``; every
    fragment below has to appear verbatim.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    source = inspect.getsource(run_scheduler.run_host_health_probe)
    required_fragments = (
        "prev_healthy != rec['healthy']",
        "recent_transition",
        "INTERVAL '1 hour'",
        "INTERVAL '90 minutes'",
        "if recent_transition is None:",
        "_push_host_transition_alert(tr)",
    )
    for fragment in required_fragments:
        assert fragment in source
def test_host_health_probe_verifies_gcp_embedding_runtime(monkeypatch):
    """A 200 response with embeddings probes healthy; check per-host enablement.

    Also asserts which host labels have the model probe enabled.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)

    class Resp:
        status_code = 200

        def json(self):
            return {"embeddings": [[0.1, 0.2, 0.3]]}

    class Requests:
        # Minimal stand-in exposing only the ``post`` entry point.
        @staticmethod
        def post(*args, **kwargs):
            return Resp()

    ok, err = run_scheduler._probe_ollama_embedding_runtime(
        Requests,
        "http://34.21.145.224:11434",
    )
    assert ok is True
    assert err is None

    expected_enablement = (
        ("Primary (GCP)", True),
        ("Secondary (GCP)", True),
        ("Fallback (111)", False),
    )
    for host_label, expected in expected_enablement:
        assert run_scheduler._host_health_model_probe_enabled(host_label) is expected
def test_host_health_embedding_probe_uses_30s_default_timeout(monkeypatch):
    """With ``OLLAMA_HOST_HEALTH_EMBED_TIMEOUT`` unset, ``timeout`` is 30.0."""
    run_scheduler = _load_run_scheduler(monkeypatch)
    monkeypatch.delenv("OLLAMA_HOST_HEALTH_EMBED_TIMEOUT", raising=False)

    captured = {}

    class Resp:
        status_code = 200

        def json(self):
            return {"embeddings": [[0.1, 0.2, 0.3]]}

    class Requests:
        @staticmethod
        def post(*_args, **kwargs):
            # Remember the timeout keyword the probe passed along.
            captured["timeout"] = kwargs["timeout"]
            return Resp()

    probe_result = run_scheduler._probe_ollama_embedding_runtime(
        Requests,
        "http://34.21.145.224:11434",
    )
    ok, err = probe_result
    assert ok is True
    assert err is None
    assert captured["timeout"] == 30.0
def test_host_health_probe_reports_embedding_runtime_failure(monkeypatch):
    """A ``TimeoutError`` from ``post`` yields ``ok=False`` and a labelled error."""
    run_scheduler = _load_run_scheduler(monkeypatch)

    class Requests:
        @staticmethod
        def post(*_args, **_kwargs):
            raise TimeoutError("embed timeout")

    probe_result = run_scheduler._probe_ollama_embedding_runtime(
        Requests,
        "http://34.21.145.224:11434",
    )
    ok, err = probe_result
    assert ok is False
    assert "EmbedProbe TimeoutError" in err
def test_v2_cron_blind_spot_list_has_failure_notifications(monkeypatch):
    """Every listed cron task must contain a failure-notification call.

    The ``run_scheduler`` tasks are checked through ``inspect.getsource``;
    ``run_promo_event_task`` is checked by parsing ``scheduler.py`` directly.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    task_names = (
        "run_daily_token_report_task",
        "run_promotion_gate_worker",
        "run_awaiting_review_push",
        "run_expire_stale_reviews",
        "run_action_plan_hygiene_task",
        "run_cost_throttle_evaluate",
        "run_cost_throttle_reset_if_new_month",
        "run_ppt_vision_audit",
        "run_embed_consistency_check",
        "run_ollama_111_usage_guard_check",
    )
    for fn_name in task_names:
        task = getattr(run_scheduler, fn_name)
        source = inspect.getsource(task)
        assert "_notify_scheduler_failure(" in source

    promo_source = _function_source("scheduler.py", "run_promo_event_task")
    assert "notify_failure(" in promo_source
def test_roi_ai_smoke_and_daily_report_schedules_stay_staggered():
    """``run_scheduler.py`` must still contain the expected schedule lines.

    This is a source-text check: each registration string below has to appear
    verbatim in the file, which pins the time slot or interval of each task.
    """
    # Decode explicitly as UTF-8 (Python's default source encoding) so the
    # test does not depend on the locale of the machine running it.
    source = Path("run_scheduler.py").read_text(encoding="utf-8")
    expected_registrations = (
        'schedule.every().day.at("09:00").do(run_daily_report_task)',
        'schedule.every().day.at("09:05").do(run_roi_monthly_report_if_new_month)',
        'schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task)',
        'schedule.every().day.at("10:30").do(run_pchome_match_backfill_task)',
        'schedule.every().day.at("10:45").do(run_pchome_growth_momo_backfill_task)',
        "schedule.every(6).hours.do(run_action_plan_hygiene_task)",
        "schedule.every(15).minutes.do(run_ollama_111_usage_guard_check)",
    )
    for registration in expected_registrations:
        assert registration in source
def test_ollama_111_usage_guard_stays_observational(monkeypatch):
    """The usage guard source keeps its markers and has no UPDATE/DELETE text.

    This is a source-text check against ``run_ollama_111_usage_guard_check``.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    source = inspect.getsource(run_scheduler.run_ollama_111_usage_guard_check)

    required_fragments = (
        "OLLAMA_111_USAGE_ALERT_ENABLED",
        "provider = 'ollama_111'",
        "send_telegram_with_result",
        "_notify_scheduler_failure(",
        "只觀測 ai_calls不改路由",
    )
    for fragment in required_fragments:
        assert fragment in source

    # Neither keyword may appear anywhere in the function's source text.
    forbidden_keywords = ("UPDATE", "DELETE")
    for keyword in forbidden_keywords:
        assert keyword not in source
def test_legacy_edm_and_seasonal_promo_schedules_are_opt_in(monkeypatch):
    """Legacy EDM and seasonal promo schedules are off unless enabled by env.

    Checks the two ``*_schedule_enabled`` helpers with the variables unset and
    set, then checks that ``_register_schedules`` and ``run_scheduler.py``
    reference the guards and the variable names.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    source = inspect.getsource(run_scheduler._register_schedules)

    monkeypatch.delenv("MOMO_ENABLE_LEGACY_EDM_SCHEDULE", raising=False)
    monkeypatch.delenv("MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE", raising=False)
    assert run_scheduler._legacy_edm_schedule_enabled() is False
    assert run_scheduler._seasonal_promo_schedule_enabled() is False

    monkeypatch.setenv("MOMO_ENABLE_LEGACY_EDM_SCHEDULE", "true")
    monkeypatch.setenv("MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE", "1")
    assert run_scheduler._legacy_edm_schedule_enabled() is True
    assert run_scheduler._seasonal_promo_schedule_enabled() is True

    assert "if _legacy_edm_schedule_enabled():" in source
    assert "if not _seasonal_promo_schedule_enabled():" in source

    # Read the file once, decoded explicitly as UTF-8 (Python's default source
    # encoding) so the test does not depend on the machine's locale.
    module_text = Path("run_scheduler.py").read_text(encoding="utf-8")
    assert "MOMO_ENABLE_LEGACY_EDM_SCHEDULE" in module_text
    assert "MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE" in module_text
def test_ai_smoke_daily_summary_refreshes_smoke_before_push(monkeypatch):
    """The daily summary task source must contain both smoke-related calls.

    This is a source-text check; it verifies presence of the two fragments,
    not the order in which they appear.
    """
    run_scheduler = _load_run_scheduler(monkeypatch)
    task = run_scheduler.run_ai_smoke_daily_summary_task
    source = inspect.getsource(task)
    required_fragments = (
        "collect_ai_automation_smoke(record_history=True",
        "send_smoke_daily_summary()",
    )
    for fragment in required_fragments:
        assert fragment in source