Files
ewoooc/run_scheduler.py
OoO 3ea7004a6f refactor(p4)+docs(p5+p6): Meta 降頻 + LOCKED-GEMINI + ADR-028/029
Phase 4 A10 — OpenClaw 雙塔重劃
- run_scheduler.py: Meta 自審 cron 6h → 每日 12:00(月省 2.25M Gemini, +20% 達標)
- scheduler.py: 移除 icaim 內 2 處 inline meta 觸發
- openclaw_strategist 抽 _push_report_with_charts (call×3) + _collect_mcp_intel (call×2)
- 行數目標 -25% 未達(4 報告函數結構差異大,A10 採保守抽出避險)
- 主戰果:Meta 降頻月呼叫 300 → 30(-90%)

Phase 5 — 5 處 LOCKED-GEMINI 註解(涵蓋鎖定 7 場景)
- services/mcp_collector_service.py:32 (場景 #1: Google Search Grounding)
- services/openclaw_strategist_service.py:40 (場景 #2/3/4: 週/月/年報)
- services/code_review_pipeline_service.py:46 (場景 #5: 100K+ token diff)
- services/elephant_alpha_orchestrator.py:88 (場景 #6: EA HITL)
- routes/openclaw_bot_routes.py:98 (場景 #7: PPT 簡報)

Phase 6 A12 — 憲法級 ADR 三份
- ADR-028「LLM 路由統一準則」(269 行)
  - 5 大支柱:三主機級聯 / Ollama 優先 / 雙塔分工 / Gemini 鎖 7 場景 / 可觀測性
  - 8 個 provider 白名單(DB CHECK 對齊)
  - 30+ caller 名單分「已實作 / 規劃中」
- ADR-029「Hermes-First 雙塔分工」(222 行)
  - 12 項職責重劃表 + A7/A8/A10 落地對照
  - Gemini 月支出 -23.5%(critic 第 3 輪 B5 算術修正)
- ADR-027 附錄(+69 行)
  - 三主機架構(Primary/Secondary/Fallback)
  - 4 條獨立 fallback 鏈
  - 廢止「188 Ollama」概念
- README 索引更新

A11 critic 第 3 輪修補:5 BLOCKER 全清
- B1: 行數 1831 → 2677 (含 baseline 對照)
- B2: 場景 #4 行號 759/1267 → 1102/1628 + annual 不存在註明
- B3: 虛構 caller 改實存(ea_hitl_prefetch → ea_engine 等)
- B4: 白名單三層對齊(DB 8 = ADR 8 = token_report 補 ollama_secondary)
- B5: KPI 算術 50→38 = -23.5% 重核

services/telegram_templates.py: A5 daily_token_report() 函數
services/mcp_collector_service.py: 加 LOCKED-GEMINI 註解
services/elephant_alpha_orchestrator.py: 加 LOCKED-GEMINI 註解

103/103 unit test 全綠(zero regression)

Operation Ollama-First v5.0 / Phase 4 A10 + Phase 5 + Phase 6 A12

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 23:06:08 +08:00

266 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
run_scheduler.py — momo-scheduler 容器入口點
排程任務清單(對齊 app.py init_scheduler + scheduler.py 全任務):
每 30 分鐘auto_import、whitepage_check
每 1 小時momo、edm、festival
每 4 小時competitor_price_feeder、icaim_analysis
每 6 小時quality_rescore
每 12 小時dedup_batch
每 1 天 db_backup03:00、cleanup_agent_context03:30、backup_monitor04:00、daily_report09:00、ai_smoke_summary09:10、pchome_match_backfill10:30、openclaw_meta_analysis12:00, Phase 4 降頻、daily_token_report23:55
每 1 週 weekly_strategy週一 06:00
每 1 月 monthly_report每月1日 07:00
"""
import asyncio
import logging
import threading
import time
import schedule
# 匯入全部排程任務函式
from scheduler import (
run_momo_task,
run_edm_task,
run_festival_task,
run_promo_event_task,
run_auto_import_task,
run_whitepage_check,
run_competitor_price_feeder_task,
run_pchome_match_backfill_task,
run_icaim_analysis_task,
run_weekly_strategy_task,
run_db_backup_task,
run_backup_monitor_task,
run_openclaw_meta_analysis_task,
run_dedup_batch_task,
run_quality_rescore_task,
run_daily_report_task,
run_ai_smoke_daily_summary_task,
run_monthly_report_task,
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
def _register_schedules():
schedule.every(30).minutes.do(run_auto_import_task)
logger.info("📅 每 30 分鐘auto_import")
schedule.every(30).minutes.do(run_whitepage_check)
logger.info("📅 每 30 分鐘whitepage_check")
schedule.every(1).hours.do(run_momo_task)
logger.info("📅 每 1 小時momo_task")
schedule.every(1).hours.do(run_edm_task)
logger.info("📅 每 1 小時edm_task")
schedule.every(1).hours.do(run_festival_task)
logger.info("📅 每 1 小時festival_task")
# 動態註冊促銷活動爬蟲(根據配置)
from services.crawler_config_loader import get_enabled_crawlers
enabled_crawlers = get_enabled_crawlers()
promo_event_configs = {
'mothers_day_2026': {'lpn': '', 'page_type': 'mothers_day', 'name': '母親節超值限時購'},
'valentine_520_2026': {'lpn': '', 'page_type': 'valentine_520', 'name': '520情人節限定購物'},
'labor_day_2026': {'lpn': '', 'page_type': 'labor_day', 'name': '勞動節購物優惠'}
}
for crawler_key, config in enabled_crawlers.items():
if crawler_key in promo_event_configs:
event_config = promo_event_configs[crawler_key]
lpn_code = config.get('lpn_code', '')
if lpn_code:
schedule_hours = config.get('schedule_hours', 4)
schedule.every(schedule_hours).hours.do(
lambda lpn=lpn_code, pt=event_config['page_type'], an=event_config['name']:
run_promo_event_task(lpn, pt, an)
)
logger.info(f"📅 每 {schedule_hours} 小時:{event_config['name']} ({event_config['page_type']})")
else:
logger.warning(f"⚠️ {event_config['name']} 未配置 LPN 代碼,跳過排程")
schedule.every(4).hours.do(run_competitor_price_feeder_task)
logger.info("📅 每 4 小時competitor_price_feeder")
schedule.every(4).hours.do(run_icaim_analysis_task)
logger.info("📅 每 4 小時icaim_analysis")
# Operation Ollama-First v5.0 Phase 4Meta 自審降頻 6h → 每日 12:00月省 ~1.875M Gemini tokens
# icaim_analysis 內原本 line 2233/2253 的額外觸發已同步移除(避免重複呼叫)
schedule.every().day.at("12:00").do(run_openclaw_meta_analysis_task)
logger.info("📅 每日 12:00openclaw_meta_analysisPhase 4 降頻:原 6h")
schedule.every(6).hours.do(run_quality_rescore_task)
logger.info("📅 每 6 小時quality_rescore")
schedule.every(12).hours.do(run_dedup_batch_task)
logger.info("📅 每 12 小時dedup_batch")
schedule.every().day.at("03:00").do(run_db_backup_task)
logger.info("📅 每日 03:00db_backup")
schedule.every().day.at("03:30").do(run_cleanup_agent_context)
logger.info("📅 每日 03:30cleanup_agent_context")
schedule.every().day.at("04:00").do(run_backup_monitor_task)
logger.info("📅 每日 04:00backup_monitor")
schedule.every().monday.at("06:00").do(run_weekly_strategy_task)
logger.info("📅 每週一 06:00weekly_strategy")
schedule.every().day.at("09:00").do(run_daily_report_task)
logger.info("📅 每日 09:00daily_report")
schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task)
logger.info("📅 每日 09:10ai_smoke_daily_summary")
schedule.every().day.at("10:30").do(run_pchome_match_backfill_task)
logger.info("📅 每日 10:30pchome_match_backfill")
# Operation Ollama-First v5.0 — Phase 1 收尾:每日 23:55 LLM Token 日報
schedule.every().day.at("23:55").do(run_daily_token_report_task)
logger.info("📅 每日 23:55daily_token_report")
# 每月1日 07:00 月報schedule 不支援 every().month用每日 07:00 + 日期判斷)
def _monthly_report_gate():
from datetime import datetime as _dt
if _dt.now().day == 1:
run_monthly_report_task()
schedule.every().day.at("07:00").do(_monthly_report_gate)
logger.info("📅 每月1日 07:00monthly_report")
def run_daily_token_report_task():
"""每日 23:55 — Operation Ollama-First v5.0 Phase 1 收尾LLM Token 日報。
任務:
1. 查 ai_calls 過去 24h 統計(總覽 / 供應商 / TOP caller / 成本 / 趨勢 / 告警)
2. 推 Telegram + 寫 ai_insightstype='daily_token_report'
紀律:
- 失敗安全DB 查不到資料 → 推「⚠️ 報表生成失敗」訊息但不爆 scheduler
- 不影響其他排程:例外完全吞掉,僅 log error
"""
try:
from services.token_report_service import send_daily_report
result = send_daily_report()
logger.info(
"[TokenReport] sent=%s failed=%s chars=%s ok=%s",
result.get('sent'), result.get('failed'),
result.get('chars'), result.get('ok'),
)
except Exception as e:
logger.error(f"[TokenReport] task failed: {e}", exc_info=True)
# 不再嘗試 event_router避免循環依賴純 log 即可
# 統帥可從 scheduler logs 觀察失敗
def run_cleanup_agent_context():
"""每日 03:30 — 清理 agent_context 表中已過期的 TTL 記錄migration 018 定義)"""
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
session.execute(text("SELECT cleanup_expired_agent_context()"))
session.commit()
logger.info("[Cleanup] agent_context TTL 清理完成")
except Exception as e:
logger.error(f"[Cleanup] agent_context 清理失敗: {e}")
try:
from services.event_router import notify_failure
notify_failure(
task_name="run_cleanup_agent_context",
error=e,
source="Scheduler.Cleanup",
event_type="agent_context_cleanup_failure",
priority="P2",
title="agent_context TTL 清理失敗",
dedup_ttl_sec=3600,
)
except Exception as _router_e:
logger.error(f"[Cleanup] event_router 失敗: {_router_e}")
finally:
session.close()
def _run_elephant_alpha_engine():
"""Daemon thread: ElephantAlpha 自主監控引擎(獨立 asyncio loop"""
loop = None
try:
from services.elephant_alpha_autonomous_engine import autonomous_engine
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info("🐘 [ElephantAlpha] Autonomous engine thread started")
loop.run_until_complete(autonomous_engine.start_autonomous_monitoring())
except Exception as e:
logger.error(f"🐘 [ElephantAlpha] Engine crashed: {e}")
finally:
if loop is not None:
loop.close()
if __name__ == "__main__":
logger.info("🚀 momo-scheduler 啟動中...")
_register_schedules()
logger.info("✅ 全部排程任務已註冊")
_ea_thread = threading.Thread(
target=_run_elephant_alpha_engine,
daemon=True,
name="elephant-alpha-engine",
)
_ea_thread.start()
logger.info("🐘 [ElephantAlpha] Autonomous engine thread launched")
logger.info("⏰ 排程主迴圈啟動,等待任務觸發...")
_ea_watchdog_counter = 0 # 每 60 秒60 次 sleep(1))做一次存活檢查
while True:
try:
schedule.run_pending()
time.sleep(1)
# 每 60 秒檢查 ElephantAlpha 執行緒是否還活著
_ea_watchdog_counter += 1
if _ea_watchdog_counter >= 60:
_ea_watchdog_counter = 0
if not _ea_thread.is_alive():
logger.error("[ElephantAlpha] 監控執行緒已死亡,嘗試重啟")
try:
from services.event_router import dispatch_sync as _dispatch_sync
_dispatch_sync({
"source": "Scheduler.ElephantAlpha",
"event_type": "thread_crashed",
"severity": "alert",
"title": "ElephantAlpha 執行緒死亡",
"status": "自動重啟中",
"impact": "P2 - 自主監控引擎暫停",
"summary": "ElephantAlphaEngine daemon thread 意外終止,排程主迴圈已偵測並觸發重啟",
})
except Exception as _alert_err:
logger.error(f"[ElephantAlpha] 無法發送告警: {_alert_err}")
_ea_thread = threading.Thread(
target=_run_elephant_alpha_engine,
daemon=True,
name="ElephantAlphaEngine",
)
_ea_thread.start()
logger.info("[ElephantAlpha] 執行緒已重啟")
except KeyboardInterrupt:
logger.info("⛔ Scheduler stopped.")
break
except Exception as e:
logger.error(f"Scheduler error: {e}")
time.sleep(5)