Files
ewoooc/run_scheduler.py
OoO c13dc22639
All checks were successful
CD Pipeline / deploy (push) Successful in 2m44s
feat(p20)+docs: cost auto-throttle + LLM 模型完整評估
Operation Ollama-First v5.0 / Phase 20 + LLM 模型治理

services/cost_throttle_service.py (新檔, 200+ 行)
- evaluate_throttle_status() 每小時 cron 跑
- 查 ai_call_budgets monthly × 累計 spent → 月底線性外推
- 推估 > 預算 110% → 標 throttled(hysteresis:降到 95% 才解除)
- _push_throttle_alerts: 狀態變化推 Telegram
- is_provider_throttled(provider) public API(給 anthropic/gemini caller 啟動 check)
- COST_THROTTLE_ENABLED 預設 OFF(避免戰時誤節流)

run_scheduler.py 加 2 cron + task wrapper
- 每 1 小時:cost_throttle_evaluate
- 每日 00:05:cost_throttle_reset_if_new_month

docs/llm_model_full_evaluation_20260504.md (260+ 行)
- 場景 × 模型對應矩陣(4 大層次)
  戰術層 / 戰略層 / 多模態 / 雲端 API
- 本次啟動的追加 4 模型(qwen2.5-coder:32b / deepseek-r1:14b /
  llava / gemma3:4b)— Primary + Secondary 並行拉
- Phase 21 路由優化建議(context size + complexity 動態選 model)
- Phase 22 多供應商編排 + cost throttle 整合
- 儲存 / RAM / 延遲評估
- 模型治理 SOP(新增 / 替換 / 淘汰)
- COST_TABLE 對齊(含 deepseek 直連價格)

啟用前置(待統帥):
1. Primary + Secondary 4 模型拉完(背景進行中)
2. .env: COST_THROTTLE_ENABLED=true(觀察 1 週後)
3. ANTHROPIC_API_KEY 設後 Code Review 自動切 Claude Opus 4.7

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:36:56 +08:00

394 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
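"""
run_scheduler.py — entry point for the momo-scheduler container.

Scheduled tasks (aligned with app.py init_scheduler + all scheduler.py tasks):
  Every 5 minutes:  promotion_gate_worker
  Every 30 minutes: auto_import, whitepage_check, awaiting_review_push
  Every 1 hour:     momo, edm, festival, cost_throttle_evaluate
  Every 4 hours:    competitor_price_feeder, icaim_analysis, expire_stale_reviews,
                    promo event crawlers (interval configurable, default 4 hours)
  Every 6 hours:    quality_rescore
  Every 12 hours:   dedup_batch
  Daily:            cost_throttle_reset_if_new_month (00:05), db_backup (03:00),
                    cleanup_agent_context (03:30), backup_monitor (04:00),
                    daily_report (09:00), ai_smoke_summary (09:10),
                    pchome_match_backfill (10:30),
                    openclaw_meta_analysis (12:00, reduced frequency since Phase 4),
                    daily_token_report (23:55)
  Weekly:           weekly_strategy (Monday 06:00),
                    embed_consistency_check (Sunday 04:30)
  Monthly:          monthly_report (1st of the month, 07:00)
"""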
import asyncio
import logging
import threading
import time
import schedule
# 匯入全部排程任務函式
from scheduler import (
run_momo_task,
run_edm_task,
run_festival_task,
run_promo_event_task,
run_auto_import_task,
run_whitepage_check,
run_competitor_price_feeder_task,
run_pchome_match_backfill_task,
run_icaim_analysis_task,
run_weekly_strategy_task,
run_db_backup_task,
run_backup_monitor_task,
run_openclaw_meta_analysis_task,
run_dedup_batch_task,
run_quality_rescore_task,
run_daily_report_task,
run_ai_smoke_daily_summary_task,
run_monthly_report_task,
)
# Root logging setup for the scheduler process: INFO level, with each record
# rendered as "timestamp - logger name - level - message".
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Module-level logger used by every task wrapper in this file.
logger = logging.getLogger(__name__)
def _register_promo_event_crawlers():
    """Register one periodic job per enabled promo-event crawler.

    Reads the enabled crawlers from ``services.crawler_config_loader`` and,
    for every crawler key that matches a known promo event, schedules
    ``run_promo_event_task(lpn_code, page_type, name)`` every
    ``schedule_hours`` hours (default 4). Crawlers without an ``lpn_code``
    are skipped with a warning.

    Raises:
        Exception: anything raised by the config loader or by ``schedule``
            while registering; the caller decides how to handle it.
    """
    # Imported lazily so a broken config loader cannot break module import.
    from services.crawler_config_loader import get_enabled_crawlers

    # Static metadata per promo event. Only 'page_type' and 'name' are read
    # here; the effective LPN code comes from the crawler config, the 'lpn'
    # entries are empty placeholders.
    promo_event_configs = {
        'mothers_day_2026': {'lpn': '', 'page_type': 'mothers_day', 'name': '母親節超值限時購'},
        'valentine_520_2026': {'lpn': '', 'page_type': 'valentine_520', 'name': '520情人節限定購物'},
        'labor_day_2026': {'lpn': '', 'page_type': 'labor_day', 'name': '勞動節購物優惠'},
    }

    for crawler_key, config in get_enabled_crawlers().items():
        event_config = promo_event_configs.get(crawler_key)
        if event_config is None:
            # Enabled crawler that is not one of the promo events handled here.
            continue

        lpn_code = config.get('lpn_code', '')
        if not lpn_code:
            logger.warning(f"⚠️ {event_config['name']} 未配置 LPN 代碼,跳過排程")
            continue

        schedule_hours = config.get('schedule_hours', 4)
        # Bind the per-crawler values as lambda defaults so each job keeps its
        # own arguments instead of sharing the last loop iteration's values.
        schedule.every(schedule_hours).hours.do(
            lambda lpn=lpn_code, pt=event_config['page_type'], an=event_config['name']:
                run_promo_event_task(lpn, pt, an)
        )
        logger.info(f"📅 每 {schedule_hours} 小時:{event_config['name']} ({event_config['page_type']})")


def _register_schedules():
    """Register every scheduled job with the global ``schedule`` registry.

    Jobs are registered in a fixed order (interval jobs first, then the
    fixed-time daily/weekly jobs) and each registration is logged. The
    function only registers jobs; nothing is executed until the main loop
    calls ``schedule.run_pending()``.

    The dynamic promo-event crawler registration is isolated: if the crawler
    config cannot be loaded, the error is logged and the remaining core jobs
    (backups, reports, cost throttle, ...) are still registered, instead of
    the whole scheduler failing at startup.
    """
    schedule.every(30).minutes.do(run_auto_import_task)
    logger.info("📅 每 30 分鐘auto_import")
    schedule.every(30).minutes.do(run_whitepage_check)
    logger.info("📅 每 30 分鐘whitepage_check")

    schedule.every(1).hours.do(run_momo_task)
    logger.info("📅 每 1 小時momo_task")
    schedule.every(1).hours.do(run_edm_task)
    logger.info("📅 每 1 小時edm_task")
    schedule.every(1).hours.do(run_festival_task)
    logger.info("📅 每 1 小時festival_task")

    # Dynamically register promo-event crawlers (driven by configuration).
    # A failure here must not prevent the core jobs below from registering.
    try:
        _register_promo_event_crawlers()
    except Exception as e:
        logger.error(
            f"[Scheduler] promo event crawler registration failed: {e}",
            exc_info=True,
        )

    schedule.every(4).hours.do(run_competitor_price_feeder_task)
    logger.info("📅 每 4 小時competitor_price_feeder")
    schedule.every(4).hours.do(run_icaim_analysis_task)
    logger.info("📅 每 4 小時icaim_analysis")

    # Operation Ollama-First v5.0 Phase 4: the meta self-review was reduced
    # from every 6 hours to once a day at 12:00 (original note: saves roughly
    # 1.875M Gemini tokens per month). The extra triggers that used to live
    # inside icaim_analysis were removed at the same time to avoid duplicate
    # calls.
    schedule.every().day.at("12:00").do(run_openclaw_meta_analysis_task)
    logger.info("📅 每日 12:00openclaw_meta_analysisPhase 4 降頻:原 6h")

    schedule.every(6).hours.do(run_quality_rescore_task)
    logger.info("📅 每 6 小時quality_rescore")
    schedule.every(12).hours.do(run_dedup_batch_task)
    logger.info("📅 每 12 小時dedup_batch")

    # Operation Ollama-First v5.0 Phase 11+ — RAG learning-loop workers.
    # Original note: with RAG_ENABLED=false there are no learning_episodes,
    # so these workers loop over nothing and are harmless.
    schedule.every(5).minutes.do(run_promotion_gate_worker)
    logger.info("📅 每 5 分鐘promotion_gate_workerpending → promote/reject/await")
    schedule.every(30).minutes.do(run_awaiting_review_push)
    logger.info("📅 每 30 分鐘awaiting_review_push推 Telegram 等 👍/👎)")
    schedule.every(4).hours.do(run_expire_stale_reviews)
    logger.info("📅 每 4 小時expire_stale_reviews24h 無回應降權 0.5")

    # Phase 11.0 guardrail #3: BGE-M3 cross-host consistency check (ADR-033).
    # Once a week is enough to detect model-version drift.
    schedule.every().sunday.at("04:30").do(run_embed_consistency_check)
    logger.info("📅 每週日 04:30bge-m3 跨主機一致性驗證")

    # Phase 20: automatic cost throttling (COST_THROTTLE_ENABLED defaults to
    # OFF per the original note; the task itself checks the flag).
    schedule.every(1).hours.do(run_cost_throttle_evaluate)
    logger.info("📅 每 1 小時cost_throttle_evaluate")
    schedule.every().day.at("00:05").do(run_cost_throttle_reset_if_new_month)
    logger.info("📅 每日 00:05cost_throttle_reset_if_new_month")

    schedule.every().day.at("03:00").do(run_db_backup_task)
    logger.info("📅 每日 03:00db_backup")
    schedule.every().day.at("03:30").do(run_cleanup_agent_context)
    logger.info("📅 每日 03:30cleanup_agent_context")
    schedule.every().day.at("04:00").do(run_backup_monitor_task)
    logger.info("📅 每日 04:00backup_monitor")
    schedule.every().monday.at("06:00").do(run_weekly_strategy_task)
    logger.info("📅 每週一 06:00weekly_strategy")
    schedule.every().day.at("09:00").do(run_daily_report_task)
    logger.info("📅 每日 09:00daily_report")
    schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task)
    logger.info("📅 每日 09:10ai_smoke_daily_summary")
    schedule.every().day.at("10:30").do(run_pchome_match_backfill_task)
    logger.info("📅 每日 10:30pchome_match_backfill")

    # Operation Ollama-First v5.0 Phase 1 wrap-up: daily LLM token report.
    schedule.every().day.at("23:55").do(run_daily_token_report_task)
    logger.info("📅 每日 23:55daily_token_report")

    # Monthly report on the 1st at 07:00. `schedule` has no monthly interval,
    # so a daily 07:00 job is registered and gated on the day of the month.
    def _monthly_report_gate():
        from datetime import datetime as _dt
        if _dt.now().day == 1:
            run_monthly_report_task()

    schedule.every().day.at("07:00").do(_monthly_report_gate)
    logger.info("📅 每月1日 07:00monthly_report")
def run_daily_token_report_task():
    """Send the daily LLM token report and log a one-line summary.

    Registered to run every day at 23:55 (Operation Ollama-First v5.0,
    Phase 1 wrap-up). The work is delegated to
    ``services.token_report_service.send_daily_report``; according to the
    original notes that call gathers the last 24h of ai_calls statistics,
    pushes them to Telegram and stores an insight row — none of that is
    visible from this wrapper.

    Fail-safe: any exception (including a failed import) is logged with its
    traceback and swallowed so the other scheduled jobs are not affected.
    """
    try:
        from services.token_report_service import send_daily_report

        report = send_daily_report()
        # Pull the summary fields in the same order they appear in the log line.
        sent, failed, chars, ok = (
            report.get(field) for field in ('sent', 'failed', 'chars', 'ok')
        )
        logger.info(
            "[TokenReport] sent=%s failed=%s chars=%s ok=%s",
            sent, failed, chars, ok,
        )
    except Exception as e:
        logger.error(f"[TokenReport] task failed: {e}", exc_info=True)
# ─────────────────────────────────────────────────────────────────────────────
# Operation Ollama-First v5.0 Phase 11+ — RAG learning-loop workers (Phase 12 wrap-up)
# ─────────────────────────────────────────────────────────────────────────────
def run_promotion_gate_worker():
    """Process pending learning episodes and log the batch counters.

    Registered to run every 5 minutes. Delegates to
    ``services.learning_pipeline.process_pending_episodes`` and logs its
    counters only when at least one pending episode was seen. Per the
    original notes the pipeline follows the ADR-032 PromotionGate stages and
    is a pure rule engine (no LLM call); that is not verifiable from here.

    Fail-safe: any exception is logged with its traceback and swallowed.
    """
    try:
        from services.learning_pipeline import process_pending_episodes

        counters = process_pending_episodes()
        # Stay quiet when nothing was pending (e.g. the loop ran empty).
        if not counters.get('pending_seen', 0) > 0:
            return

        reported_keys = ('pending_seen', 'promoted', 'rejected', 'awaiting', 'errors')
        logger.info(
            "[PromotionWorker] pending=%d promoted=%d rejected=%d awaiting=%d errors=%d",
            *(counters[key] for key in reported_keys),
        )
    except Exception as e:
        logger.error(f"[PromotionWorker] task failed: {e}", exc_info=True)
def run_awaiting_review_push():
    """Push episodes awaiting review to Telegram and log how many were sent.

    Registered to run every 30 minutes. Delegates to
    ``services.learning_pipeline.push_awaiting_reviews_to_telegram`` and logs
    only when the returned count is positive. The original note says the
    push is skipped when TELEGRAM_ADMIN_CHAT_ID is unset; that check is not
    in this wrapper, so presumably it lives in the pipeline — verify there.

    Fail-safe: any exception is logged with its traceback and swallowed.
    """
    try:
        from services.learning_pipeline import push_awaiting_reviews_to_telegram

        pushed_count = push_awaiting_reviews_to_telegram()
        if not pushed_count > 0:
            return
        logger.info("[AwaitingReviewPush] pushed=%d episodes", pushed_count)
    except Exception as e:
        logger.error(f"[AwaitingReviewPush] task failed: {e}", exc_info=True)
def run_expire_stale_reviews():
    """Expire stale awaiting-review episodes and log how many were expired.

    Registered to run every 4 hours. Delegates to
    ``services.learning_pipeline.expire_stale_reviews`` and logs only when
    the returned count is positive. Per the original notes (ADR-033
    guardrail #1, stage 4) episodes unanswered for 24h become expired with
    weight 0.5; the timeout and weight are applied by the pipeline, not here.

    Fail-safe: any exception is logged with its traceback and swallowed.
    """
    try:
        from services.learning_pipeline import expire_stale_reviews

        expired_count = expire_stale_reviews()
        if not expired_count > 0:
            return
        logger.info(
            "[ExpireStale] expired %d awaiting_review episodes (24h timeout)",
            expired_count,
        )
    except Exception as e:
        logger.error(f"[ExpireStale] task failed: {e}", exc_info=True)
def run_cost_throttle_evaluate():
    """Evaluate the cost-throttle state and log which providers are throttled.

    Registered to run every hour (Phase 20). When
    ``is_cost_throttle_enabled()`` is false the task returns immediately
    without evaluating anything. Otherwise it calls
    ``evaluate_throttle_status()`` and logs a warning listing every provider
    whose entry has a truthy ``'throttled'`` value, or a debug line when
    there is none. Per the original notes the service flags a provider when
    its month-end projection exceeds 110% of budget and pushes Telegram
    alerts on state changes; that logic lives in the service.

    Fail-safe: any exception is logged with its traceback and swallowed.
    """
    try:
        from services.cost_throttle_service import (
            evaluate_throttle_status, is_cost_throttle_enabled,
        )

        if not is_cost_throttle_enabled():
            # Feature flag is off: skip the evaluation entirely.
            return

        status_by_provider = evaluate_throttle_status()
        throttled = []
        for provider, info in status_by_provider.items():
            if info.get('throttled'):
                throttled.append(provider)

        if not throttled:
            logger.debug("[CostThrottle] no provider throttled")
        else:
            logger.warning("[CostThrottle] currently throttled: %s", throttled)
    except Exception as e:
        logger.error(f"[CostThrottle] task failed: {e}", exc_info=True)
def run_cost_throttle_reset_if_new_month():
    """Reset the cost-throttle state on the first day of the month.

    Registered to run every day at 00:05. The reset only happens when the
    local date's day-of-month is 1; on any other day the task does nothing.

    Fail-safe: any exception is logged with its traceback and swallowed.
    """
    try:
        from datetime import datetime
        from services.cost_throttle_service import reset_throttle_state

        today = datetime.now()
        if today.day != 1:
            return
        reset_throttle_state()
        logger.info("[CostThrottle] new month detected, state reset")
    except Exception as e:
        logger.error(f"[CostThrottle] reset failed: {e}", exc_info=True)
def run_embed_consistency_check():
    """Run the BGE-M3 embedding consistency check and log the result.

    Registered to run every Sunday at 04:30 (ADR-033 guardrail #3).
    Delegates to ``services.rag_service.verify_embedding_consistency`` and
    always logs the ``ok`` / ``reachable`` / ``max_diff`` / ``signature``
    fields of its result; when ``ok`` is falsy an additional error is
    logged. Per the original notes a weekly run is enough to detect model
    drift and a higher frequency would needlessly load the Ollama hosts.

    Fail-safe: any exception is logged with its traceback and swallowed.
    """
    try:
        from services.rag_service import verify_embedding_consistency

        outcome = verify_embedding_consistency()
        logger.info(
            "[EmbedConsistency] ok=%s reachable=%s max_diff=%.2e signature=%s",
            outcome['ok'], outcome['reachable'],
            outcome['max_diff'], outcome['signature'],
        )
        if outcome['ok']:
            return

        inconsistency_message = (
            "[EmbedConsistency] ⚠️ INCONSISTENT — RAG 召回率將下降;"
            "檢查三主機 bge-m3 模型版本是否同步ollama list"
        )
        logger.error(inconsistency_message)
    except Exception as e:
        logger.error(f"[EmbedConsistency] task failed: {e}", exc_info=True)
def run_cleanup_agent_context():
    """Purge expired agent_context rows via the database cleanup function.

    Registered to run every day at 03:30. Executes
    ``SELECT cleanup_expired_agent_context()`` (defined by migration 018
    according to the original note) and commits. On failure the error is
    logged and a failure notification is sent through
    ``services.event_router.notify_failure``; a failure of the notification
    itself is only logged. The session is always closed.

    Note that the two imports and ``get_session()`` run outside the
    ``try`` block, so errors raised there propagate to the caller.
    """
    from database.manager import get_session
    from sqlalchemy import text

    db_session = get_session()
    try:
        cleanup_statement = text("SELECT cleanup_expired_agent_context()")
        db_session.execute(cleanup_statement)
        db_session.commit()
        logger.info("[Cleanup] agent_context TTL 清理完成")
    except Exception as e:
        logger.error(f"[Cleanup] agent_context 清理失敗: {e}")
        # Best-effort alert; never let the notification mask the cleanup error.
        try:
            from services.event_router import notify_failure

            failure_details = dict(
                task_name="run_cleanup_agent_context",
                error=e,
                source="Scheduler.Cleanup",
                event_type="agent_context_cleanup_failure",
                priority="P2",
                title="agent_context TTL 清理失敗",
                dedup_ttl_sec=3600,
            )
            notify_failure(**failure_details)
        except Exception as _router_e:
            logger.error(f"[Cleanup] event_router 失敗: {_router_e}")
    finally:
        db_session.close()
def _run_elephant_alpha_engine():
    """Thread target: run the ElephantAlpha autonomous engine on its own loop.

    Creates a fresh asyncio event loop, installs it as the current loop for
    the calling thread and blocks on
    ``autonomous_engine.start_autonomous_monitoring()`` until it finishes.
    Any exception (including a failed import) is logged and the function
    returns, which ends the thread. The loop is closed on the way out if it
    was created.
    """
    engine_loop = None
    try:
        from services.elephant_alpha_autonomous_engine import autonomous_engine

        engine_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(engine_loop)
        logger.info("🐘 [ElephantAlpha] Autonomous engine thread started")
        monitoring = autonomous_engine.start_autonomous_monitoring()
        engine_loop.run_until_complete(monitoring)
    except Exception as e:
        logger.error(f"🐘 [ElephantAlpha] Engine crashed: {e}")
    finally:
        # The loop only exists if the import and loop creation succeeded.
        if engine_loop is not None:
            engine_loop.close()
def _start_elephant_alpha_thread():
    """Create, start and return the ElephantAlpha daemon thread.

    Used for both the initial launch and watchdog restarts so the thread
    always carries the same name ("elephant-alpha-engine").
    """
    engine_thread = threading.Thread(
        target=_run_elephant_alpha_engine,
        daemon=True,
        name="elephant-alpha-engine",
    )
    engine_thread.start()
    return engine_thread


if __name__ == "__main__":
    logger.info("🚀 momo-scheduler 啟動中...")
    _register_schedules()
    logger.info("✅ 全部排程任務已註冊")

    _ea_thread = _start_elephant_alpha_thread()
    logger.info("🐘 [ElephantAlpha] Autonomous engine thread launched")

    logger.info("⏰ 排程主迴圈啟動,等待任務觸發...")

    # Watchdog interval is measured in wall-clock time with a monotonic clock.
    # Jobs run synchronously inside schedule.run_pending(), so counting loop
    # iterations would stretch the interval by however long the jobs take.
    _EA_WATCHDOG_INTERVAL_SEC = 60
    _ea_last_check = time.monotonic()

    while True:
        try:
            schedule.run_pending()
            time.sleep(1)

            # Every 60 seconds, check that the ElephantAlpha thread is alive.
            if time.monotonic() - _ea_last_check >= _EA_WATCHDOG_INTERVAL_SEC:
                _ea_last_check = time.monotonic()
                if not _ea_thread.is_alive():
                    logger.error("[ElephantAlpha] 監控執行緒已死亡,嘗試重啟")
                    # Best-effort alert; the restart happens even if it fails.
                    try:
                        from services.event_router import dispatch_sync as _dispatch_sync
                        _dispatch_sync({
                            "source": "Scheduler.ElephantAlpha",
                            "event_type": "thread_crashed",
                            "severity": "alert",
                            "title": "ElephantAlpha 執行緒死亡",
                            "status": "自動重啟中",
                            "impact": "P2 - 自主監控引擎暫停",
                            "summary": "ElephantAlphaEngine daemon thread 意外終止,排程主迴圈已偵測並觸發重啟",
                        })
                    except Exception as _alert_err:
                        logger.error(f"[ElephantAlpha] 無法發送告警: {_alert_err}")
                    _ea_thread = _start_elephant_alpha_thread()
                    logger.info("[ElephantAlpha] 執行緒已重啟")
        except KeyboardInterrupt:
            logger.info("⛔ Scheduler stopped.")
            break
        except Exception as e:
            # A job raised out of run_pending(): log with traceback, back off
            # briefly, then keep the scheduler alive.
            logger.error(f"Scheduler error: {e}", exc_info=True)
            time.sleep(5)