Files
ewoooc/run_scheduler.py
OoO 97c446303c
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
feat(p11.0): BGE-M3 跨主機一致性驗證 + 每週日 04:30 cron
Operation Ollama-First v5.0 / Phase 11.0 收尾(ADR-033 護欄 #3 完整落地)

services/rag_service.py 新增:
- verify_embedding_consistency() — 跨三主機 BGE-M3 embedding 一致性驗證
  測試文字「momo電商競品分析測試向量一致性檢查」分別呼叫 GCP Primary /
  Secondary / 111 三主機,計算兩兩 cosine 距離。
  max_diff > 1e-4 視為不一致(模型版本漂移)→ logger.error。
- _cosine_distance() — 純 Python,不依賴 numpy
- fail-safe:< 2 主機可達也回 ok=True(戰時部分主機暫斷不算錯)

run_scheduler.py 新增:
- run_embed_consistency_check task wrapper
- schedule.every().sunday.at("04:30").do(...) — 每週一次足夠
  (不需每次啟動驗證,過頻會打三主機 Ollama 浪費)

落地 ADR-033 護欄 #3 完整版:
  簽名鎖定(migration 026 embedding_signature 欄位) 既有
  程式端簽名計算(rag_service.get_embedding_signature) 既有
  RAG 查詢時簽名比對過濾(rag_service._select_hits) 既有
  跨主機一致性驗證 cron  新增 
  既有 14k+ 筆回填  待手動跑 enqueue_missing_insight_embeddings()

regression: 47 unit tests 全綠

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 09:31:31 +08:00

354 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
run_scheduler.py — momo-scheduler 容器入口點
排程任務清單(對齊 app.py init_scheduler + scheduler.py 全任務):
每 30 分鐘auto_import、whitepage_check
每 1 小時momo、edm、festival
每 4 小時competitor_price_feeder、icaim_analysis
每 6 小時quality_rescore
每 12 小時dedup_batch
每 1 天 db_backup03:00、cleanup_agent_context03:30、backup_monitor04:00、daily_report09:00、ai_smoke_summary09:10、pchome_match_backfill10:30、openclaw_meta_analysis12:00, Phase 4 降頻、daily_token_report23:55
每 1 週 weekly_strategy週一 06:00
每 1 月 monthly_report每月1日 07:00
"""
import asyncio
import logging
import threading
import time
import schedule
# 匯入全部排程任務函式
from scheduler import (
run_momo_task,
run_edm_task,
run_festival_task,
run_promo_event_task,
run_auto_import_task,
run_whitepage_check,
run_competitor_price_feeder_task,
run_pchome_match_backfill_task,
run_icaim_analysis_task,
run_weekly_strategy_task,
run_db_backup_task,
run_backup_monitor_task,
run_openclaw_meta_analysis_task,
run_dedup_batch_task,
run_quality_rescore_task,
run_daily_report_task,
run_ai_smoke_daily_summary_task,
run_monthly_report_task,
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
def _register_schedules():
schedule.every(30).minutes.do(run_auto_import_task)
logger.info("📅 每 30 分鐘auto_import")
schedule.every(30).minutes.do(run_whitepage_check)
logger.info("📅 每 30 分鐘whitepage_check")
schedule.every(1).hours.do(run_momo_task)
logger.info("📅 每 1 小時momo_task")
schedule.every(1).hours.do(run_edm_task)
logger.info("📅 每 1 小時edm_task")
schedule.every(1).hours.do(run_festival_task)
logger.info("📅 每 1 小時festival_task")
# 動態註冊促銷活動爬蟲(根據配置)
from services.crawler_config_loader import get_enabled_crawlers
enabled_crawlers = get_enabled_crawlers()
promo_event_configs = {
'mothers_day_2026': {'lpn': '', 'page_type': 'mothers_day', 'name': '母親節超值限時購'},
'valentine_520_2026': {'lpn': '', 'page_type': 'valentine_520', 'name': '520情人節限定購物'},
'labor_day_2026': {'lpn': '', 'page_type': 'labor_day', 'name': '勞動節購物優惠'}
}
for crawler_key, config in enabled_crawlers.items():
if crawler_key in promo_event_configs:
event_config = promo_event_configs[crawler_key]
lpn_code = config.get('lpn_code', '')
if lpn_code:
schedule_hours = config.get('schedule_hours', 4)
schedule.every(schedule_hours).hours.do(
lambda lpn=lpn_code, pt=event_config['page_type'], an=event_config['name']:
run_promo_event_task(lpn, pt, an)
)
logger.info(f"📅 每 {schedule_hours} 小時:{event_config['name']} ({event_config['page_type']})")
else:
logger.warning(f"⚠️ {event_config['name']} 未配置 LPN 代碼,跳過排程")
schedule.every(4).hours.do(run_competitor_price_feeder_task)
logger.info("📅 每 4 小時competitor_price_feeder")
schedule.every(4).hours.do(run_icaim_analysis_task)
logger.info("📅 每 4 小時icaim_analysis")
# Operation Ollama-First v5.0 Phase 4Meta 自審降頻 6h → 每日 12:00月省 ~1.875M Gemini tokens
# icaim_analysis 內原本 line 2233/2253 的額外觸發已同步移除(避免重複呼叫)
schedule.every().day.at("12:00").do(run_openclaw_meta_analysis_task)
logger.info("📅 每日 12:00openclaw_meta_analysisPhase 4 降頻:原 6h")
schedule.every(6).hours.do(run_quality_rescore_task)
logger.info("📅 每 6 小時quality_rescore")
schedule.every(12).hours.do(run_dedup_batch_task)
logger.info("📅 每 12 小時dedup_batch")
# Operation Ollama-First v5.0 Phase 11+ — RAG 學習迴圈 workerPhase 12 收尾)
# 預設 RAG_ENABLED=false 時learning_episodes 不會有資料worker 跑空 loop無害
schedule.every(5).minutes.do(run_promotion_gate_worker)
logger.info("📅 每 5 分鐘promotion_gate_workerpending → promote/reject/await")
schedule.every(30).minutes.do(run_awaiting_review_push)
logger.info("📅 每 30 分鐘awaiting_review_push推 Telegram 等 👍/👎)")
schedule.every(4).hours.do(run_expire_stale_reviews)
logger.info("📅 每 4 小時expire_stale_reviews24h 無回應降權 0.5")
# Phase 11.0 護欄 #3BGE-M3 跨主機一致性驗證ADR-033
# 每週一次足夠(驗證模型版本未漂移;不需每次啟動)
schedule.every().sunday.at("04:30").do(run_embed_consistency_check)
logger.info("📅 每週日 04:30bge-m3 跨主機一致性驗證")
schedule.every().day.at("03:00").do(run_db_backup_task)
logger.info("📅 每日 03:00db_backup")
schedule.every().day.at("03:30").do(run_cleanup_agent_context)
logger.info("📅 每日 03:30cleanup_agent_context")
schedule.every().day.at("04:00").do(run_backup_monitor_task)
logger.info("📅 每日 04:00backup_monitor")
schedule.every().monday.at("06:00").do(run_weekly_strategy_task)
logger.info("📅 每週一 06:00weekly_strategy")
schedule.every().day.at("09:00").do(run_daily_report_task)
logger.info("📅 每日 09:00daily_report")
schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task)
logger.info("📅 每日 09:10ai_smoke_daily_summary")
schedule.every().day.at("10:30").do(run_pchome_match_backfill_task)
logger.info("📅 每日 10:30pchome_match_backfill")
# Operation Ollama-First v5.0 — Phase 1 收尾:每日 23:55 LLM Token 日報
schedule.every().day.at("23:55").do(run_daily_token_report_task)
logger.info("📅 每日 23:55daily_token_report")
# 每月1日 07:00 月報schedule 不支援 every().month用每日 07:00 + 日期判斷)
def _monthly_report_gate():
from datetime import datetime as _dt
if _dt.now().day == 1:
run_monthly_report_task()
schedule.every().day.at("07:00").do(_monthly_report_gate)
logger.info("📅 每月1日 07:00monthly_report")
def run_daily_token_report_task():
"""每日 23:55 — Operation Ollama-First v5.0 Phase 1 收尾LLM Token 日報。
任務:
1. 查 ai_calls 過去 24h 統計(總覽 / 供應商 / TOP caller / 成本 / 趨勢 / 告警)
2. 推 Telegram + 寫 ai_insightstype='daily_token_report'
紀律:
- 失敗安全DB 查不到資料 → 推「⚠️ 報表生成失敗」訊息但不爆 scheduler
- 不影響其他排程:例外完全吞掉,僅 log error
"""
try:
from services.token_report_service import send_daily_report
result = send_daily_report()
logger.info(
"[TokenReport] sent=%s failed=%s chars=%s ok=%s",
result.get('sent'), result.get('failed'),
result.get('chars'), result.get('ok'),
)
except Exception as e:
logger.error(f"[TokenReport] task failed: {e}", exc_info=True)
# ─────────────────────────────────────────────────────────────────────────────
# Operation Ollama-First v5.0 Phase 11+ — RAG 學習迴圈 workerPhase 12 收尾)
# ─────────────────────────────────────────────────────────────────────────────
def run_promotion_gate_worker():
"""每 5 分鐘 — 批次處理 learning_episodes pending → can_promote → promote/reject/await。
依 ADR-032 PromotionGate 4 階段,不主動跑 LLMDistiller 純規則引擎)。
RAG_ENABLED=false 時 learning_episodes 為空worker 跑空 loop無害
"""
try:
from services.learning_pipeline import process_pending_episodes
stats = process_pending_episodes()
if stats.get('pending_seen', 0) > 0:
logger.info(
"[PromotionWorker] pending=%d promoted=%d rejected=%d awaiting=%d errors=%d",
stats['pending_seen'], stats['promoted'], stats['rejected'],
stats['awaiting'], stats['errors'],
)
except Exception as e:
logger.error(f"[PromotionWorker] task failed: {e}", exc_info=True)
def run_awaiting_review_push():
"""每 30 分鐘 — 推 awaiting_review episode 到 Telegram 等 👍/👎。
限制TELEGRAM_ADMIN_CHAT_ID 未設則跳過fail-safe
"""
try:
from services.learning_pipeline import push_awaiting_reviews_to_telegram
pushed = push_awaiting_reviews_to_telegram()
if pushed > 0:
logger.info("[AwaitingReviewPush] pushed=%d episodes", pushed)
except Exception as e:
logger.error(f"[AwaitingReviewPush] task failed: {e}", exc_info=True)
def run_expire_stale_reviews():
"""每 4 小時 — 24h 無回應 awaiting_review → expiredweight=0.5)。
依 ADR-033 護欄 #1 Stage 4 規則。
"""
try:
from services.learning_pipeline import expire_stale_reviews
n = expire_stale_reviews()
if n > 0:
logger.info("[ExpireStale] expired %d awaiting_review episodes (24h timeout)", n)
except Exception as e:
logger.error(f"[ExpireStale] task failed: {e}", exc_info=True)
def run_embed_consistency_check():
"""每週日 04:30 — BGE-M3 跨主機一致性驗證ADR-033 護欄 #3
跑 verify_embedding_consistency不一致時 logger.errorok 時 logger.info。
每週一次足夠(驗證模型版本未漂移;過頻會打三主機 Ollama 浪費)。
"""
try:
from services.rag_service import verify_embedding_consistency
result = verify_embedding_consistency()
logger.info(
"[EmbedConsistency] ok=%s reachable=%s max_diff=%.2e signature=%s",
result['ok'], result['reachable'],
result['max_diff'], result['signature'],
)
if not result['ok']:
logger.error(
"[EmbedConsistency] ⚠️ INCONSISTENT — RAG 召回率將下降;"
"檢查三主機 bge-m3 模型版本是否同步ollama list"
)
except Exception as e:
logger.error(f"[EmbedConsistency] task failed: {e}", exc_info=True)
def run_cleanup_agent_context():
"""每日 03:30 — 清理 agent_context 表中已過期的 TTL 記錄migration 018 定義)"""
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
session.execute(text("SELECT cleanup_expired_agent_context()"))
session.commit()
logger.info("[Cleanup] agent_context TTL 清理完成")
except Exception as e:
logger.error(f"[Cleanup] agent_context 清理失敗: {e}")
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_cleanup_agent_context",
error=e,
source="Scheduler.Cleanup",
event_type="agent_context_cleanup_failure",
priority="P2",
title="agent_context TTL 清理失敗",
dedup_ttl_sec=3600,
)
except Exception as _router_e:
logger.error(f"[Cleanup] event_router 失敗: {_router_e}")
finally:
session.close()
def _run_elephant_alpha_engine():
"""Daemon thread: ElephantAlpha 自主監控引擎(獨立 asyncio loop"""
loop = None
try:
from services.elephant_alpha_autonomous_engine import autonomous_engine
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info("🐘 [ElephantAlpha] Autonomous engine thread started")
loop.run_until_complete(autonomous_engine.start_autonomous_monitoring())
except Exception as e:
logger.error(f"🐘 [ElephantAlpha] Engine crashed: {e}")
finally:
if loop is not None:
loop.close()
if __name__ == "__main__":
logger.info("🚀 momo-scheduler 啟動中...")
_register_schedules()
logger.info("✅ 全部排程任務已註冊")
_ea_thread = threading.Thread(
target=_run_elephant_alpha_engine,
daemon=True,
name="elephant-alpha-engine",
)
_ea_thread.start()
logger.info("🐘 [ElephantAlpha] Autonomous engine thread launched")
logger.info("⏰ 排程主迴圈啟動,等待任務觸發...")
_ea_watchdog_counter = 0 # 每 60 秒60 次 sleep(1))做一次存活檢查
while True:
try:
schedule.run_pending()
time.sleep(1)
# 每 60 秒檢查 ElephantAlpha 執行緒是否還活著
_ea_watchdog_counter += 1
if _ea_watchdog_counter >= 60:
_ea_watchdog_counter = 0
if not _ea_thread.is_alive():
logger.error("[ElephantAlpha] 監控執行緒已死亡,嘗試重啟")
try:
from services.event_router import dispatch_sync as _dispatch_sync
_dispatch_sync({
"source": "Scheduler.ElephantAlpha",
"event_type": "thread_crashed",
"severity": "alert",
"title": "ElephantAlpha 執行緒死亡",
"status": "自動重啟中",
"impact": "P2 - 自主監控引擎暫停",
"summary": "ElephantAlphaEngine daemon thread 意外終止,排程主迴圈已偵測並觸發重啟",
})
except Exception as _alert_err:
logger.error(f"[ElephantAlpha] 無法發送告警: {_alert_err}")
_ea_thread = threading.Thread(
target=_run_elephant_alpha_engine,
daemon=True,
name="ElephantAlphaEngine",
)
_ea_thread.start()
logger.info("[ElephantAlpha] 執行緒已重啟")
except KeyboardInterrupt:
logger.info("⛔ Scheduler stopped.")
break
except Exception as e:
logger.error(f"Scheduler error: {e}")
time.sleep(5)