#!/usr/bin/env python3 """ run_scheduler.py — momo-scheduler 容器入口點 排程任務清單(對齊 app.py init_scheduler + scheduler.py 全任務): 每 30 分鐘:auto_import、whitepage_check 每 1 小時:momo、edm、festival 每 4 小時:competitor_price_feeder、external_offer_sync、icaim_analysis 每 6 小時:quality_rescore、action_plan_hygiene 每 12 小時:dedup_batch 每 10 分鐘:ppt_auto_generation_catchup(補跑被長任務卡過的定期簡報) 每 1 天 :db_backup(03:00)、cleanup_agent_context(03:30)、backup_monitor(04:00)、daily_report(09:00)、roi_monthly_report gate(09:05)、ai_smoke_summary(09:10)、observability_daily_summary(09:30)、pchome_match_backfill(10:30)、pchome_growth_momo_backfill(10:45)、openclaw_meta_analysis(12:00, Phase 4 降頻)、ppt_auto_generation_daily(20:30)、ppt_vision_audit(22:00)、daily_token_report(23:55) 每 1 週 :weekly_strategy(週一 06:00)、ppt_auto_generation_weekly(週一 20:40) 每 1 月 :monthly_report(每月1日 07:00)、ppt_auto_generation_monthly(每月1日 20:50) 每 1 季 :ppt_auto_generation_quarterly(1/4/7/10 月 1 日 21:00) 每半年 :ppt_auto_generation_half_yearly(1/7 月 1 日 21:10) 每 1 年 :ppt_auto_generation_annual(1 月 1 日 21:20) """ import asyncio import logging import os import threading import time from typing import Optional, Tuple import schedule # 匯入全部排程任務函式 from scheduler import ( run_momo_task, run_edm_task, run_festival_task, run_promo_event_task, run_auto_import_task, run_whitepage_check, run_competitor_price_feeder_task, run_external_offer_sync_task, run_pchome_match_backfill_task, run_pchome_growth_momo_backfill_task, run_icaim_analysis_task, run_weekly_strategy_task, run_db_backup_task, run_backup_monitor_task, run_openclaw_meta_analysis_task, run_dedup_batch_task, run_quality_rescore_task, run_daily_report_task, run_ai_smoke_daily_summary_task, run_monthly_report_task, run_ppt_auto_generation_task, ) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', ) logger = logging.getLogger(__name__) _AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS = 0.0 _OLLAMA_111_USAGE_LAST_PUSH_TS = 0.0 def _env_flag(name: str, default: bool = False) -> bool: raw = os.getenv(name) if raw is None: return default return str(raw).strip().lower() in {"1", "true", "yes", "on"} def _host_health_model_probe_enabled(label: str) -> bool: from services.ollama_health_probe import host_health_model_probe_enabled return host_health_model_probe_enabled(label) def _probe_ollama_embedding_runtime(requests_module, host: str) -> Tuple[bool, Optional[str]]: from services.ollama_health_probe import probe_ollama_embedding_runtime return probe_ollama_embedding_runtime(requests_module, host) def _legacy_edm_schedule_enabled() -> bool: """Legacy fixed-LPN EDM/Festival crawlers are opt-in to avoid stale campaign browser loops.""" return _env_flag("MOMO_ENABLE_LEGACY_EDM_SCHEDULE", False) def _seasonal_promo_schedule_enabled() -> bool: """Seasonal promo crawlers are opt-in; expired LPNs should not keep opening MOMO pages.""" return _env_flag("MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE", False) def _notify_scheduler_failure( task_name: str, error: Exception, *, source: str, event_type: str, priority: str = "P2", title: str = None, dedup_ttl_sec: int = 3600, ) -> None: """Best-effort EventRouter failure notification for scheduler wrapper tasks.""" try: import traceback from services.event_router import notify_failure trace = traceback.format_exc() if trace.strip() == "NoneType: None": trace = f"{type(error).__name__}: {error}" notify_failure( task_name=task_name, error=error, source=source, event_type=event_type, priority=priority, title=title or f"{task_name} 失敗", trace=trace, dedup_ttl_sec=dedup_ttl_sec, ) except Exception as router_error: logger.error("[%s] event_router notify failed: %s", task_name, router_error) def _register_schedules(): schedule.every(30).minutes.do(run_auto_import_task) logger.info("📅 每 30 分鐘:auto_import") schedule.every(30).minutes.do(run_whitepage_check) logger.info("📅 每 30 分鐘:whitepage_check") schedule.every(1).hours.do(run_momo_task) logger.info("📅 每 1 小時:momo_task") if _legacy_edm_schedule_enabled(): schedule.every(1).hours.do(run_edm_task) logger.info("📅 每 1 小時:edm_task") schedule.every(1).hours.do(run_festival_task) logger.info("📅 每 1 小時:festival_task") else: logger.info( "⏸️ legacy EDM/festival crawler schedules disabled " "(set MOMO_ENABLE_LEGACY_EDM_SCHEDULE=true to enable)" ) # 動態註冊促銷活動爬蟲(根據配置) from services.crawler_config_loader import get_enabled_crawlers enabled_crawlers = get_enabled_crawlers() promo_event_configs = { 'mothers_day_2026': {'lpn': '', 'page_type': 'mothers_day', 'name': '母親節超值限時購'}, 'valentine_520_2026': {'lpn': '', 'page_type': 'valentine_520', 'name': '520情人節限定購物'}, 'labor_day_2026': {'lpn': '', 'page_type': 'labor_day', 'name': '勞動節購物優惠'} } if not _seasonal_promo_schedule_enabled(): if any(crawler_key in promo_event_configs for crawler_key in enabled_crawlers): logger.info( "⏸️ seasonal promo crawler schedules disabled " "(set MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE=true to enable)" ) else: for crawler_key, config in enabled_crawlers.items(): if crawler_key in promo_event_configs: event_config = promo_event_configs[crawler_key] lpn_code = config.get('lpn_code', '') if lpn_code: schedule_hours = config.get('schedule_hours', 4) schedule.every(schedule_hours).hours.do( lambda lpn=lpn_code, pt=event_config['page_type'], an=event_config['name']: run_promo_event_task(lpn, pt, an) ) logger.info(f"📅 每 {schedule_hours} 小時:{event_config['name']} ({event_config['page_type']})") else: logger.warning(f"⚠️ {event_config['name']} 未配置 LPN 代碼,跳過排程") schedule.every(4).hours.do(run_competitor_price_feeder_task) logger.info("📅 每 4 小時:competitor_price_feeder") schedule.every(4).hours.do(run_external_offer_sync_task) logger.info("📅 每 4 小時:external_offer_sync(自動同步 MOMO 外部價格參考)") schedule.every(4).hours.do(run_icaim_analysis_task) logger.info("📅 每 4 小時:icaim_analysis") # Operation Ollama-First v5.0 Phase 4:Meta 自審降頻 6h → 每日 12:00(月省 ~1.875M Gemini tokens) # icaim_analysis 內原本 line 2233/2253 的額外觸發已同步移除(避免重複呼叫) schedule.every().day.at("12:00").do(run_openclaw_meta_analysis_task) logger.info("📅 每日 12:00:openclaw_meta_analysis(Phase 4 降頻:原 6h)") schedule.every(6).hours.do(run_quality_rescore_task) logger.info("📅 每 6 小時:quality_rescore") schedule.every(6).hours.do(run_action_plan_hygiene_task) logger.info("📅 每 6 小時:action_plan_hygiene(關閉過期 advisory action_plans)") schedule.every(12).hours.do(run_dedup_batch_task) logger.info("📅 每 12 小時:dedup_batch") # Operation Ollama-First v5.0 Phase 11+ — RAG 學習迴圈 worker(Phase 12 收尾) # 預設 RAG_ENABLED=false 時,learning_episodes 不會有資料,worker 跑空 loop(無害) schedule.every(5).minutes.do(run_promotion_gate_worker) logger.info("📅 每 5 分鐘:promotion_gate_worker(pending → promote/reject/await)") schedule.every(30).minutes.do(run_awaiting_review_push) logger.info("📅 每 30 分鐘:awaiting_review_push(推 Telegram 等 👍/👎)") schedule.every(4).hours.do(run_expire_stale_reviews) logger.info("📅 每 4 小時:expire_stale_reviews(24h 無回應降權 0.5)") # Phase 11.0 護欄 #3:BGE-M3 跨 GCP Ollama 一致性驗證(ADR-033) # 每週一次足夠(驗證模型版本未漂移;不需每次啟動) schedule.every().sunday.at("04:30").do(run_embed_consistency_check) logger.info("📅 每週日 04:30:bge-m3 GCP-A/GCP-B 一致性驗證(111 opt-in)") # Phase 42: 三主機 Ollama 健康探針(即使無人開觀測台頁面也持續累積歷史) schedule.every(15).minutes.do(run_host_health_probe) logger.info("📅 每 15 分鐘:host_health_probe(三主機 → host_health_probes 表)") schedule.every().day.at("03:00").do(run_host_health_probe_cleanup) logger.info("📅 每日 03:00:host_health_probe_cleanup(清 30d 前舊資料)") # Phase 44: AI 呼叫錯誤率突增告警 schedule.every(30).minutes.do(run_ai_calls_error_spike_check) logger.info("📅 每 30 分鐘:ai_calls_error_spike_check(錯誤率 ≥ 30% 推 Telegram)") # Phase 57: 111 Ollama 使用率護欄,避免 final fallback 默默承接高負載 schedule.every(15).minutes.do(run_ollama_111_usage_guard_check) logger.info("📅 每 15 分鐘:ollama_111_usage_guard_check(111 fallback 使用率告警)") # Phase 44: 觀測台每日 09:30 健康摘要推送 schedule.every().day.at("09:30").do(run_observability_daily_summary) logger.info("📅 每日 09:30:observability_daily_summary(早晨報三主機/AI/Cost/PPT)") # Phase 20: 成本自動節流(COST_THROTTLE_ENABLED 預設 OFF) schedule.every(1).hours.do(run_cost_throttle_evaluate) logger.info("📅 每 1 小時:cost_throttle_evaluate") schedule.every().day.at("00:05").do(run_cost_throttle_reset_if_new_month) logger.info("📅 每日 00:05:cost_throttle_reset_if_new_month") # Phase 24: ROI 月報(每日 09:05 跑,內部判斷是否月初第 1 日;避開 09:00 daily_report) schedule.every().day.at("09:05").do(run_roi_monthly_report_if_new_month) logger.info("📅 每日 09:05:roi_monthly_report(月初第 1 日才送)") # PPT 自動簡報補齊(先產出定義中的報表,再交給 22:00 vision audit) schedule.every().day.at("20:30").do(lambda: run_ppt_auto_generation_task("daily")) logger.info("📅 每日 20:30:ppt_auto_generation_daily(日報)") schedule.every().monday.at("20:40").do(lambda: run_ppt_auto_generation_task("weekly")) logger.info("📅 每週一 20:40:ppt_auto_generation_weekly(週報 / 市場情報)") def _ppt_monthly_gate(): from datetime import datetime as _dt if _dt.now().day == 1: run_ppt_auto_generation_task("monthly") def _ppt_quarterly_gate(): from datetime import datetime as _dt now = _dt.now() if now.day == 1 and now.month in (1, 4, 7, 10): run_ppt_auto_generation_task("quarterly") def _ppt_half_yearly_gate(): from datetime import datetime as _dt now = _dt.now() if now.day == 1 and now.month in (1, 7): run_ppt_auto_generation_task("half_yearly") def _ppt_annual_gate(): from datetime import datetime as _dt now = _dt.now() if now.day == 1 and now.month == 1: run_ppt_auto_generation_task("annual") schedule.every().day.at("20:50").do(_ppt_monthly_gate) logger.info("📅 每月1日 20:50:ppt_auto_generation_monthly(月報與管理型簡報)") schedule.every().day.at("21:00").do(_ppt_quarterly_gate) logger.info("📅 每季首月1日 21:00:ppt_auto_generation_quarterly(季報)") schedule.every().day.at("21:10").do(_ppt_half_yearly_gate) logger.info("📅 每半年首月1日 21:10:ppt_auto_generation_half_yearly(半年報)") schedule.every().day.at("21:20").do(_ppt_annual_gate) logger.info("📅 每年1月1日 21:20:ppt_auto_generation_annual(年報)") schedule.every(10).minutes.do(run_ppt_auto_generation_catchup_task) logger.info("📅 每 10 分鐘:ppt_auto_generation_catchup(補跑錯過的定期簡報)") # Phase 26: PPT 視覺審核(每日 22:00 掃當天新生 .pptx,有 issues 才推 Telegram) schedule.every().day.at("22:00").do(run_ppt_vision_audit) logger.info("📅 每日 22:00:ppt_vision_audit(PPT_VISION_ENABLED=true 才生效)") schedule.every().day.at("03:00").do(run_db_backup_task) logger.info("📅 每日 03:00:db_backup") schedule.every().day.at("03:30").do(run_cleanup_agent_context) logger.info("📅 每日 03:30:cleanup_agent_context") schedule.every().day.at("04:00").do(run_backup_monitor_task) logger.info("📅 每日 04:00:backup_monitor") schedule.every().monday.at("06:00").do(run_weekly_strategy_task) logger.info("📅 每週一 06:00:weekly_strategy") schedule.every().day.at("09:00").do(run_daily_report_task) logger.info("📅 每日 09:00:daily_report") schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task) logger.info("📅 每日 09:10:ai_smoke_daily_summary") schedule.every().day.at("10:30").do(run_pchome_match_backfill_task) logger.info("📅 每日 10:30:pchome_match_backfill") schedule.every().day.at("10:45").do(run_pchome_growth_momo_backfill_task) logger.info("📅 每日 10:45:pchome_growth_momo_backfill(高業績商品補 MOMO 對應)") # Operation Ollama-First v5.0 — Phase 1 收尾:每日 23:55 LLM Token 日報 schedule.every().day.at("23:55").do(run_daily_token_report_task) logger.info("📅 每日 23:55:daily_token_report") # 每月1日 07:00 月報(schedule 不支援 every().month,用每日 07:00 + 日期判斷) def _monthly_report_gate(): from datetime import datetime as _dt if _dt.now().day == 1: run_monthly_report_task() schedule.every().day.at("07:00").do(_monthly_report_gate) logger.info("📅 每月1日 07:00:monthly_report") def run_daily_token_report_task(): """每日 23:55 — Operation Ollama-First v5.0 Phase 1 收尾:LLM Token 日報。 任務: 1. 查 ai_calls 過去 24h 統計(總覽 / 供應商 / TOP caller / 成本 / 趨勢 / 告警) 2. 推 Telegram + 寫 ai_insights(type='daily_token_report') 紀律: - 失敗安全:DB 查不到資料 → 推「⚠️ 報表生成失敗」訊息但不爆 scheduler - 不影響其他排程:例外完全吞掉,僅 log error """ try: from services.token_report_service import send_daily_report result = send_daily_report() logger.info( "[TokenReport] sent=%s failed=%s chars=%s ok=%s", result.get('sent'), result.get('failed'), result.get('chars'), result.get('ok'), ) except Exception as e: logger.error(f"[TokenReport] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_daily_token_report_task", e, source="Scheduler.TokenReport", event_type="daily_token_report_failure", title="LLM Token 日報失敗", ) # ───────────────────────────────────────────────────────────────────────────── # Operation Ollama-First v5.0 Phase 11+ — RAG 學習迴圈 worker(Phase 12 收尾) # ───────────────────────────────────────────────────────────────────────────── def run_promotion_gate_worker(): """每 5 分鐘 — 批次處理 learning_episodes pending → can_promote → promote/reject/await。 依 ADR-032 PromotionGate 4 階段,不主動跑 LLM(Distiller 純規則引擎)。 RAG_ENABLED=false 時 learning_episodes 為空,worker 跑空 loop(無害)。 """ try: from services.learning_pipeline import process_pending_episodes stats = process_pending_episodes() if stats.get('pending_seen', 0) > 0: logger.info( "[PromotionWorker] pending=%d promoted=%d rejected=%d awaiting=%d errors=%d", stats['pending_seen'], stats['promoted'], stats['rejected'], stats['awaiting'], stats['errors'], ) except Exception as e: logger.error(f"[PromotionWorker] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_promotion_gate_worker", e, source="Scheduler.PromotionGate", event_type="promotion_gate_worker_failure", title="PromotionGate worker 失敗", ) def run_awaiting_review_push(): """每 30 分鐘 — 推 awaiting_review episode 到 Telegram 等 👍/👎。 限制:TELEGRAM_ADMIN_CHAT_ID 未設則跳過(fail-safe)。 """ try: from services.learning_pipeline import push_awaiting_reviews_to_telegram pushed = push_awaiting_reviews_to_telegram() if pushed > 0: logger.info("[AwaitingReviewPush] pushed=%d episodes", pushed) except Exception as e: logger.error(f"[AwaitingReviewPush] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_awaiting_review_push", e, source="Scheduler.PromotionGate", event_type="awaiting_review_push_failure", title="PromotionGate 審查推播失敗", ) def run_expire_stale_reviews(): """每 4 小時 — 24h 無回應 awaiting_review → expired(weight=0.5)。 依 ADR-033 護欄 #1 Stage 4 規則。 """ try: from services.learning_pipeline import expire_stale_reviews n = expire_stale_reviews() if n > 0: logger.info("[ExpireStale] expired %d awaiting_review episodes (24h timeout)", n) except Exception as e: logger.error(f"[ExpireStale] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_expire_stale_reviews", e, source="Scheduler.PromotionGate", event_type="expire_stale_reviews_failure", title="PromotionGate 過期審查清理失敗", ) def run_action_plan_hygiene_task(): """每 6 小時 — 關閉過期 advisory action_plans,避免 queue 再次膨脹。""" try: from services.action_plan_hygiene import run_action_plan_hygiene result = run_action_plan_hygiene() updated = int(result.get("updated_count") or 0) if updated > 0: logger.info( "[ActionPlanHygiene] updated=%d by_source=%s", updated, result.get("by_source"), ) else: logger.debug("[ActionPlanHygiene] no stale advisory action plans") except Exception as e: logger.error(f"[ActionPlanHygiene] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_action_plan_hygiene_task", e, source="Scheduler.ActionPlanHygiene", event_type="action_plan_hygiene_failure", title="Action Plan 過期清理失敗", ) def run_cost_throttle_evaluate(): """每小時 — Phase 20 成本自動節流評估(COST_THROTTLE_ENABLED 預設 OFF)。 跑 evaluate_throttle_status;若有 provider 月底推估超預算 110% → 標 throttled。 狀態變化(throttle/unthrottle)會自動推 Telegram。 """ try: from services.cost_throttle_service import ( evaluate_throttle_status, is_cost_throttle_enabled, ) if not is_cost_throttle_enabled(): return # flag OFF 直接 skip(不打 DB) state = evaluate_throttle_status() throttled = [p for p, info in state.items() if info.get('throttled')] if throttled: logger.warning("[CostThrottle] currently throttled: %s", throttled) else: logger.debug("[CostThrottle] no provider throttled") except Exception as e: logger.error(f"[CostThrottle] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_cost_throttle_evaluate", e, source="Scheduler.CostThrottle", event_type="cost_throttle_evaluate_failure", title="成本節流評估失敗", ) def run_host_health_probe(): """Phase 42 — 每 15 分鐘自動 probe 三主機 Ollama,寫入 host_health_probes。 Phase 43 增強:偵測 state transition (healthy→unhealthy / unhealthy→healthy) → 主動推 Telegram 告警 + inline AutoHeal 按鈕,完整「監控→告警→修復」閉環。 Dedup: 1 小時內同 host 同方向 transition 只推一次(防 flapping 洗版)。 """ try: import time as _time import requests as _r from sqlalchemy import text as _sa from database.manager import DatabaseManager from services.ollama_service import ( OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OLLAMA_HOST_FALLBACK, _is_unhealthy, ) records = [] for label, host in [ ('Primary (GCP)', OLLAMA_HOST_PRIMARY), ('Secondary (GCP)', OLLAMA_HOST_SECONDARY), ('Fallback (111)', OLLAMA_HOST_FALLBACK), ]: t0 = _time.monotonic() healthy = False err = None models_count = 0 try: resp = _r.get(f"{host.rstrip('/')}/api/tags", timeout=3) if resp.status_code == 200: healthy = True models_count = len(resp.json().get('models', []) or []) if _host_health_model_probe_enabled(label): model_ok, model_err = _probe_ollama_embedding_runtime(_r, host) if not model_ok: healthy = False err = model_err else: err = f"HTTP {resp.status_code}" except Exception as e: err = f"{type(e).__name__}: {str(e)[:200]}" records.append({ 'host_label': label, 'host_url': host, 'healthy': healthy, 'unhealthy_mark': _is_unhealthy(host), 'models_count': models_count, 'response_ms': int((_time.monotonic() - t0) * 1000), 'error_msg': err, }) # Phase 43: state transition 偵測(在寫入新筆 *之前* 查上一筆狀態) transitions = [] try: session = DatabaseManager().get_session() try: for rec in records: last = session.execute( _sa(""" SELECT healthy, probed_at FROM host_health_probes WHERE host_label = :label ORDER BY probed_at DESC LIMIT 1 """), {'label': rec['host_label']}, ).fetchone() if last is None: continue # 第一筆探針,無對比基準 prev_healthy = bool(last[0]) if prev_healthy != rec['healthy']: # 同方向 1h 內 dedup recent_transition = session.execute( _sa(""" SELECT 1 FROM host_health_probes p1 WHERE p1.host_label = :label AND p1.probed_at >= NOW() - INTERVAL '1 hour' AND p1.healthy = :curr_healthy AND EXISTS ( SELECT 1 FROM host_health_probes p2 WHERE p2.host_label = :label AND p2.probed_at < p1.probed_at AND p2.probed_at >= NOW() - INTERVAL '90 minutes' AND p2.healthy != :curr_healthy ) LIMIT 1 """), {'label': rec['host_label'], 'curr_healthy': rec['healthy']}, ).fetchone() if recent_transition is None: transitions.append({ 'host_label': rec['host_label'], 'host_url': rec['host_url'], 'prev_healthy': prev_healthy, 'curr_healthy': rec['healthy'], 'error_msg': rec.get('error_msg'), 'response_ms': rec.get('response_ms'), }) finally: session.close() except Exception as e: logger.warning(f"[HostHealthProbe] transition detect failed: {e}") # 批次寫 DB session = DatabaseManager().get_session() try: for rec in records: session.execute( _sa(""" INSERT INTO host_health_probes (host_label, host_url, healthy, unhealthy_mark, models_count, response_ms, error_msg) VALUES (:host_label, :host_url, :healthy, :unhealthy_mark, :models_count, :response_ms, :error_msg) """), rec, ) session.commit() finally: session.close() # Phase 43: 推 Telegram 告警 / 恢復通知 for tr in transitions: try: _push_host_transition_alert(tr) except Exception as e: logger.error(f"[HostHealthProbe] push alert failed for {tr['host_label']}: {e}") unhealthy = [r['host_label'] for r in records if not r['healthy']] if unhealthy: logger.warning(f"[HostHealthProbe] unhealthy hosts: {unhealthy}") else: logger.debug("[HostHealthProbe] all 3 hosts healthy") except Exception as e: logger.error(f"[HostHealthProbe] failed: {e}", exc_info=True) _notify_scheduler_failure( "run_host_health_probe", e, source="Scheduler.HostHealth", event_type="host_health_probe_failure", title="三主機健康探針失敗", ) def _push_host_transition_alert(tr): """Phase 43: 主機 state transition → 推 Telegram。 healthy → unhealthy:P1 告警 + inline 「🩹 修 {label}」按鈕 unhealthy → healthy:P3 簡訊「已恢復」(不附按鈕) """ try: from services.telegram_templates import send_telegram_with_result except Exception: return label = tr['host_label'] short_label = ( 'GCP-A' if 'Primary' in label else 'GCP-B' if 'Secondary' in label else '111' ) if tr['curr_healthy']: # 恢復通知(無按鈕) text = ( f"✅ Ollama 主機已恢復\n\n" f"主機:{label}\n" f"網址:{tr['host_url']}\n" f"回應:{tr['response_ms']} ms\n\n" f"scheduler 每 15 分鐘自動探針偵測" ) send_telegram_with_result(text, parse_mode='HTML') else: # 故障告警 + inline AutoHeal 按鈕 err_short = (tr.get('error_msg') or '無錯誤訊息')[:200] text = ( f"🚨 Ollama 主機異常\n\n" f"主機:{label}\n" f"網址:{tr['host_url']}\n" f"錯誤:{err_short}\n\n" f"💡 點下方按鈕一鍵觸發 ADR-013 AutoHeal Playbook," f"或至 觀測台 詳查。" ) reply_markup = { "inline_keyboard": [ [{"text": f"🩹 立即 AutoHeal {short_label}", "callback_data": f"cmd:obs_heal:{short_label}"}], [{"text": "📊 查 24h 健康統計", "callback_data": "cmd:obs_health"}], ], } send_telegram_with_result(text, reply_markup=reply_markup, parse_mode='HTML') def run_ai_calls_error_spike_check(): """Phase 44 — 每 30 分鐘檢查 AI 呼叫錯誤率,異常時主動告警。 觸發條件:過去 1 小時 ai_calls 錯誤率 ≥ 30% 且總呼叫 ≥ 20 Dedup:1 小時內同性質告警只推一次(看 push log)— 簡化用 60min 視窗 + 已告警 flag。 """ try: from sqlalchemy import text as _sa from database.manager import DatabaseManager session = DatabaseManager().get_session() try: row = session.execute( _sa(""" SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors, COUNT(DISTINCT caller) AS distinct_callers FROM ai_calls WHERE called_at >= NOW() - INTERVAL '1 hour' """), ).fetchone() total = int(row[0] or 0) errors = int(row[1] or 0) if total < 20: return # 樣本太少不告警 error_rate = errors / total if error_rate < 0.30: return # 低於閾值,正常 # 查 top 3 problematic caller top_callers = session.execute( _sa(""" SELECT caller, COUNT(*) AS calls, COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errs FROM ai_calls WHERE called_at >= NOW() - INTERVAL '1 hour' GROUP BY caller HAVING COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) >= 3 ORDER BY errs DESC LIMIT 3 """), ).fetchall() finally: session.close() # 推 Telegram 告警 + inline Code Review 按鈕。 # 1 小時內同類 spike 只推一次,避免每 30 分鐘洗版。 global _AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS now_ts = time.time() if now_ts - _AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS < 3600: logger.info("[AICallsErrorSpike] skip duplicate alert within 1h window") return from services.telegram_templates import send_telegram_with_result lines = [ "🚨 AI 呼叫錯誤率異常", "", f"過去 1 小時:{errors}/{total} 失敗({error_rate*100:.1f}%)", "", ] if top_callers: lines.append("問題 caller Top 3:") for c in top_callers: lines.append(f"• {c[0]}:{c[2]}/{c[1]} 失敗") lines.append("") lines.append("💡 點按鈕觸發 Code Review Pipeline 自動審查最新 commit。") reply_markup = { "inline_keyboard": [ [{"text": "🔬 觸發 Code Review", "callback_data": "cmd:obs_trigger_review"}], [{"text": "📊 查 24h AI 呼叫", "callback_data": "cmd:obs_ai_calls"}], ], } send_telegram_with_result('\n'.join(lines), reply_markup=reply_markup, parse_mode='HTML') _AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS = now_ts logger.warning( f"[AICallsErrorSpike] alert pushed: total={total} errors={errors} " f"rate={error_rate*100:.1f}%" ) except Exception as e: logger.error(f"[AICallsErrorSpike] failed: {e}", exc_info=True) _notify_scheduler_failure( "run_ai_calls_error_spike_check", e, source="Scheduler.Observability", event_type="ai_calls_error_spike_check_failure", title="AI 呼叫錯誤率檢查失敗", ) def run_ollama_111_usage_guard_check(): """Phase 57 — final fallback 111 使用率告警。 111 是最後防線;這個 guard 只觀測 ai_calls,不改路由。 預設條件:最近 60 分鐘 Ollama 呼叫 >= 20、111 呼叫 >= 3、111 占比 >= 5%。 """ if not _env_flag("OLLAMA_111_USAGE_ALERT_ENABLED", True): return try: from sqlalchemy import text as _sa from database.manager import DatabaseManager window_minutes = int(os.getenv("OLLAMA_111_USAGE_ALERT_WINDOW_MINUTES", "60")) threshold_pct = float(os.getenv("OLLAMA_111_USAGE_ALERT_PCT", "5")) min_total = int(os.getenv("OLLAMA_111_USAGE_ALERT_MIN_TOTAL", "20")) min_111 = int(os.getenv("OLLAMA_111_USAGE_ALERT_MIN_111", "3")) dedup_sec = int(os.getenv("OLLAMA_111_USAGE_ALERT_DEDUP_SEC", "3600")) session = DatabaseManager().get_session() try: row = session.execute( _sa(""" SELECT COUNT(*) FILTER ( WHERE provider IN ('gcp_ollama','ollama_secondary','ollama_111') ) AS total_ollama, COUNT(*) FILTER (WHERE provider = 'gcp_ollama') AS gcp_a, COUNT(*) FILTER (WHERE provider = 'ollama_secondary') AS gcp_b, COUNT(*) FILTER (WHERE provider = 'ollama_111') AS host_111 FROM ai_calls WHERE called_at >= NOW() - (:window_minutes || ' minutes')::interval """), {"window_minutes": window_minutes}, ).fetchone() total_ollama = int(row[0] or 0) gcp_a = int(row[1] or 0) gcp_b = int(row[2] or 0) host_111 = int(row[3] or 0) if total_ollama < min_total or host_111 < min_111: return rate_pct = (host_111 / total_ollama * 100.0) if total_ollama else 0.0 if rate_pct < threshold_pct: return top_callers = session.execute( _sa(""" SELECT caller, COALESCE(model, '') AS model, COUNT(*) AS calls, COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens, COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors FROM ai_calls WHERE called_at >= NOW() - (:window_minutes || ' minutes')::interval AND provider = 'ollama_111' GROUP BY caller, model ORDER BY calls DESC, tokens DESC LIMIT 5 """), {"window_minutes": window_minutes}, ).fetchall() finally: session.close() global _OLLAMA_111_USAGE_LAST_PUSH_TS now_ts = time.time() if now_ts - _OLLAMA_111_USAGE_LAST_PUSH_TS < dedup_sec: logger.info("[Ollama111Guard] skip duplicate alert within %ss window", dedup_sec) return from services.telegram_templates import send_telegram_with_result lines = [ "⚠️ 111 Ollama 使用率偏高", "", f"過去 {window_minutes} 分鐘 Ollama 呼叫:{total_ollama} 次", f"111 fallback:{host_111} 次({rate_pct:.1f}%)", f"GCP-A:{gcp_a} 次 · GCP-B:{gcp_b} 次", "", ] if top_callers: lines.append("111 caller Top 5:") for caller, model, calls, tokens, errors in top_callers: model_part = f" / {model}" if model else "" err_part = f" · err {errors}" if int(errors or 0) else "" lines.append( f"• {caller}{model_part}:{calls} 次 · {int(tokens or 0):,} tokens{err_part}" ) lines.append("") lines.extend([ "建議先看 GCP-A/GCP-B health probe 與近期 unhealthy mark;", "若 GCP 正常,檢查是否有 fallback flag 或重任務意外打到 111。", ]) reply_markup = { "inline_keyboard": [ [{"text": "🏥 主機健康", "callback_data": "cmd:obs_health"}, {"text": "📊 AI 呼叫", "callback_data": "cmd:obs_ai_calls"}], ], } send_telegram_with_result("\n".join(lines), reply_markup=reply_markup, parse_mode="HTML") _OLLAMA_111_USAGE_LAST_PUSH_TS = now_ts logger.warning( "[Ollama111Guard] alert pushed: total=%s gcp_a=%s gcp_b=%s host_111=%s rate=%.1f%%", total_ollama, gcp_a, gcp_b, host_111, rate_pct, ) except Exception as e: logger.error(f"[Ollama111Guard] failed: {e}", exc_info=True) _notify_scheduler_failure( "run_ollama_111_usage_guard_check", e, source="Scheduler.Ollama111Guard", event_type="ollama_111_usage_guard_failure", title="111 Ollama 使用率護欄失敗", ) def run_observability_daily_summary(): """Phase 44 — 每日 09:30 推送觀測台健康摘要(早晨報)。 一頁式涵蓋:三主機 24h uptime / AI 呼叫量 / 當月 cost / 待審 episode 數 / PPT audit 通過率。 讓統帥早上滑手機就能看到全景,不用主動開觀測台。 """ try: from sqlalchemy import text as _sa from database.manager import DatabaseManager from datetime import datetime as _dt session = DatabaseManager().get_session() try: # 三主機 24h uptime host_rows = session.execute( _sa(""" SELECT host_label, COUNT(*) AS total, COUNT(*) FILTER (WHERE healthy) AS up FROM host_health_probes WHERE probed_at >= NOW() - INTERVAL '24 hours' GROUP BY host_label ORDER BY host_label """), ).fetchall() # AI 呼叫 24h ai_row = session.execute( _sa(""" SELECT COUNT(*) AS total, COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens, COALESCE(SUM(cost_usd), 0) AS cost, COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors, COUNT(*) FILTER (WHERE rag_hit) AS rag_hits FROM ai_calls WHERE called_at >= NOW() - INTERVAL '24 hours' """), ).fetchone() # 當月 cost today = _dt.now() month_start = _dt(today.year, today.month, 1) month_cost = session.execute( _sa("SELECT COALESCE(SUM(cost_usd), 0) FROM ai_calls WHERE called_at >= :ms"), {'ms': month_start}, ).fetchone()[0] or 0 # 待審 episodes ep_pending = session.execute( _sa("SELECT COUNT(*) FROM learning_episodes WHERE promotion_status = 'awaiting_review' AND reviewed_at IS NULL"), ).fetchone()[0] or 0 # PPT audit 7d ppt_row = session.execute( _sa(""" SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE audit_status = 'passed') AS passed, COUNT(*) FILTER (WHERE audit_status = 'failed') AS failed FROM ppt_audit_results WHERE audited_at >= NOW() - INTERVAL '7 days' """), ).fetchone() # Phase 49: 商業面未跟進機會(high-confidence 卻無 action_plan) unfollowed_count = 0 try: unfollowed_count = session.execute( _sa(""" SELECT COUNT(*) FROM ai_price_recommendations r WHERE r.created_at >= NOW() - INTERVAL '7 days' AND r.confidence >= 0.7 AND NOT EXISTS ( SELECT 1 FROM action_plans p WHERE p.sku = r.sku AND p.created_at >= r.created_at AND p.created_at < r.created_at + INTERVAL '7 days' ) """), ).fetchone()[0] or 0 except Exception: pass finally: session.close() ai_total = int(ai_row[0] or 0) ai_errors = int(ai_row[3] or 0) ai_rag = int(ai_row[4] or 0) ai_cost = float(ai_row[2] or 0) ai_tokens = int(ai_row[1] or 0) ppt_total = int(ppt_row[0] or 0) ppt_passed = int(ppt_row[1] or 0) ppt_failed = int(ppt_row[2] or 0) lines = [ f"🛰 觀測台日報 — {today.strftime('%Y-%m-%d')}", "", "三主機 24h 在線率:", ] if host_rows: for label, total, up in host_rows: pct = (float(up) / float(total) * 100) if total else 0 emoji = "✅" if pct >= 99 else "⚠️" if pct >= 90 else "🚨" lines.append(f"{emoji} {label}:{pct:.1f}%({up}/{total})") else: lines.append("(無資料 — scheduler probe 尚未啟動?)") lines.append("") lines.append("AI 呼叫 24h:") err_rate = (ai_errors / ai_total * 100) if ai_total else 0 rag_rate = (ai_rag / ai_total * 100) if ai_total else 0 err_emoji = "✅" if err_rate < 5 else "⚠️" if err_rate < 15 else "🚨" lines.append(f"• 呼叫:{ai_total:,} 次 · Token:{ai_tokens:,}") lines.append(f"• 24h 成本:${ai_cost:.2f} · 當月累計:${month_cost:.2f}") lines.append(f"{err_emoji} 錯誤率:{err_rate:.1f}%({ai_errors})") lines.append(f"💡 RAG 命中率:{rag_rate:.1f}%({ai_rag})") if ep_pending: lines.append("") lines.append(f"📋 待審 episodes:{ep_pending} 筆") if ppt_total: lines.append("") ppt_pass_rate = (ppt_passed / ppt_total * 100) if ppt_total else 0 ppt_emoji = "✅" if ppt_pass_rate >= 80 else "⚠️" lines.append(f"{ppt_emoji} PPT 視覺審核 7d:{ppt_passed}/{ppt_total} 通過({ppt_pass_rate:.0f}%)") if ppt_failed: lines.append(f" 失敗:{ppt_failed} 筆") if unfollowed_count > 0: lines.append("") lines.append(f"⚠️ 商業面未跟進:{unfollowed_count} 筆" f"(high-confidence AI 建議未轉化為 action_plan)") lines.append("") lines.append('→ 開觀測台總覽') from services.telegram_templates import send_telegram_with_result reply_markup = { "inline_keyboard": [ [{"text": "🛰 觀測台總覽", "callback_data": "cmd:obs_overview"}, {"text": "🌐 Agent 編排", "callback_data": "cmd:obs_orchestration"}], [{"text": "💼 商業面 AI", "callback_data": "cmd:obs_business"}, {"text": "🏥 主機健康", "callback_data": "cmd:obs_health"}], [{"text": "📊 AI 呼叫", "callback_data": "cmd:obs_ai_calls"}, {"text": "💰 預算", "callback_data": "cmd:obs_budget"}], ], } send_telegram_with_result('\n'.join(lines), reply_markup=reply_markup, parse_mode='HTML') logger.info("[ObservabilityDaily] summary pushed to Telegram") except Exception as e: logger.error(f"[ObservabilityDaily] failed: {e}", exc_info=True) _notify_scheduler_failure( "run_observability_daily_summary", e, source="Scheduler.Observability", event_type="observability_daily_summary_failure", title="觀測台每日摘要失敗", ) def run_host_health_probe_cleanup(): """Phase 42 — 每日 03:00 清 host_health_probes 30 天前資料。""" try: from sqlalchemy import text as _sa from database.manager import DatabaseManager session = DatabaseManager().get_session() try: result = session.execute( _sa("DELETE FROM host_health_probes WHERE probed_at < NOW() - INTERVAL '30 days'"), ) session.commit() logger.info(f"[HostHealthProbe] cleanup: {result.rowcount} rows deleted (>30d)") finally: session.close() except Exception as e: logger.error(f"[HostHealthProbe] cleanup failed: {e}", exc_info=True) _notify_scheduler_failure( "run_host_health_probe_cleanup", e, source="Scheduler.HostHealth", event_type="host_health_probe_cleanup_failure", title="主機健康探針清理失敗", ) def run_cost_throttle_reset_if_new_month(): """每日 00:05 — 若當天是月份第 1 天,清 throttle state(跨月重置)。""" try: from datetime import datetime from services.cost_throttle_service import reset_throttle_state if datetime.now().day == 1: reset_throttle_state() logger.info("[CostThrottle] new month detected, state reset") except Exception as e: logger.error(f"[CostThrottle] reset failed: {e}", exc_info=True) _notify_scheduler_failure( "run_cost_throttle_reset_if_new_month", e, source="Scheduler.CostThrottle", event_type="cost_throttle_reset_failure", title="成本節流跨月重置失敗", ) def run_ppt_vision_audit(): """每日 22:00 — Phase 26 PPT 視覺審核 掃 reports/ 目錄當天新生 .pptx,跑 minicpm-v 視覺檢查,有 issues 推 Telegram。 PPT_VISION_ENABLED=false 時 audit_recent_ppts 內部直接 skip(不打 LLM)。 需 LibreOffice headless 在 PATH(轉 .pptx → png);不在則 fail-safe skip。 """ try: from services.ppt_vision_service import audit_recent_ppts, push_ppt_audit_to_telegram summary = audit_recent_ppts( reports_dir=os.getenv('REPORTS_DIR', '/app/data/reports'), hours=24, max_files=10, ) if summary['total_issues'] > 0: pushed = push_ppt_audit_to_telegram(summary) logger.info( "[PPTVisionAudit] %d files, %d issues, telegram=%s", len(summary['audited_files']), summary['total_issues'], pushed, ) else: logger.debug("[PPTVisionAudit] no issues found") except Exception as e: logger.error(f"[PPTVisionAudit] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_ppt_vision_audit", e, source="Scheduler.PPTVision", event_type="ppt_vision_audit_failure", title="PPT 視覺審核失敗", ) def run_ppt_auto_generation_catchup_task(): """每 10 分鐘補跑被同步長任務錯過的定期 PPT 產出。""" try: from services.ppt_auto_generation_service import start_scheduled_ppt_catchup_background result = start_scheduled_ppt_catchup_background() if result.get("status") == "queued": logger.info( "[PPTAutoGenerationCatchup] queued kinds=%s", ",".join(result.get("schedule_kinds") or []), ) elif result.get("status") == "already_running": logger.debug("[PPTAutoGenerationCatchup] skipped; generation already running") else: logger.debug("[PPTAutoGenerationCatchup] status=%s", result.get("status")) except Exception as e: logger.error(f"[PPTAutoGenerationCatchup] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_ppt_auto_generation_catchup_task", e, source="Scheduler.PPTAutoGeneration", event_type="ppt_auto_generation_catchup_failure", title="PPT 定期簡報補跑失敗", ) def run_roi_monthly_report_if_new_month(): """每日 09:00 — Phase 24 ROI 月報(內部判斷月初第 1 日才送) 對比上月 ai_calls 統計 vs 戰前 baseline,推 Telegram「節省 X tokens / $Y」。 寫入 ai_insights (type='roi_monthly_report') 作長期記錄。 """ try: from datetime import datetime if datetime.now().day != 1: return # 非月初第 1 日 skip from services.roi_report_service import generate_and_send_roi_report result = generate_and_send_roi_report() logger.info("[ROIReport] sent=%s period=%s", result.get('sent'), result.get('period', '?')) except Exception as e: logger.error(f"[ROIReport] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_roi_monthly_report_if_new_month", e, source="Scheduler.ROIReport", event_type="roi_monthly_report_failure", title="ROI 月報失敗", ) def run_embed_consistency_check(): """每週日 04:30 — BGE-M3 跨 GCP Ollama 一致性驗證(ADR-033 護欄 #3)。 跑 verify_embedding_consistency,不一致時 logger.error;ok 時 logger.info。 每週一次足夠(驗證模型版本未漂移;過頻會打 Ollama 浪費)。 111 Mac fallback 預設不參與,避免背景檢查載入 bge-m3 壓住 16GB 主機。 """ try: from services.rag_service import verify_embedding_consistency result = verify_embedding_consistency() logger.info( "[EmbedConsistency] ok=%s reachable=%s max_diff=%.2e signature=%s", result['ok'], result['reachable'], result['max_diff'], result['signature'], ) if not result['ok']: max_diff = float(result.get('max_diff') or 0.0) detail = ( "BGE-M3 embedding consistency mismatch " f"reachable={result.get('reachable')} " f"max_diff={max_diff:.2e} " f"signature={result.get('signature')}" ) logger.error( "[EmbedConsistency] ⚠️ INCONSISTENT — RAG 召回率將下降;" "檢查 GCP-A/GCP-B bge-m3 模型版本是否同步(ollama list)" ) _notify_scheduler_failure( "run_embed_consistency_check", RuntimeError(detail), source="Scheduler.RAG", event_type="embed_consistency_mismatch", title="BGE-M3 一致性異常", dedup_ttl_sec=86400, ) except Exception as e: logger.error(f"[EmbedConsistency] task failed: {e}", exc_info=True) _notify_scheduler_failure( "run_embed_consistency_check", e, source="Scheduler.RAG", event_type="embed_consistency_check_failure", title="BGE-M3 一致性檢查失敗", ) def run_cleanup_agent_context(): """每日 03:30 — 清理 agent_context 表中已過期的 TTL 記錄(migration 018 定義)""" from database.manager import get_session from sqlalchemy import text session = get_session() try: session.execute(text("SELECT cleanup_expired_agent_context()")) session.commit() logger.info("[Cleanup] agent_context TTL 清理完成") except Exception as e: logger.error(f"[Cleanup] agent_context 清理失敗: {e}") try: from services.event_router import notify_failure notify_failure( task_name="run_cleanup_agent_context", error=e, source="Scheduler.Cleanup", event_type="agent_context_cleanup_failure", priority="P2", title="agent_context TTL 清理失敗", dedup_ttl_sec=3600, ) except Exception as _router_e: logger.error(f"[Cleanup] event_router 失敗: {_router_e}") finally: session.close() def _run_elephant_alpha_engine(): """Daemon thread: ElephantAlpha 自主監控引擎(獨立 asyncio loop)""" loop = None try: from services.elephant_alpha_autonomous_engine import autonomous_engine loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) logger.info("🐘 [ElephantAlpha] Autonomous engine thread started") loop.run_until_complete(autonomous_engine.start_autonomous_monitoring()) except Exception as e: logger.error(f"🐘 [ElephantAlpha] Engine crashed: {e}") finally: if loop is not None: loop.close() if __name__ == "__main__": logger.info("🚀 momo-scheduler 啟動中...") _register_schedules() logger.info("✅ 全部排程任務已註冊") _ea_thread = threading.Thread( target=_run_elephant_alpha_engine, daemon=True, name="elephant-alpha-engine", ) _ea_thread.start() logger.info("🐘 [ElephantAlpha] Autonomous engine thread launched") logger.info("⏰ 排程主迴圈啟動,等待任務觸發...") _ea_watchdog_counter = 0 # 每 60 秒(60 次 sleep(1))做一次存活檢查 while True: try: schedule.run_pending() time.sleep(1) # 每 60 秒檢查 ElephantAlpha 執行緒是否還活著 _ea_watchdog_counter += 1 if _ea_watchdog_counter >= 60: _ea_watchdog_counter = 0 if not _ea_thread.is_alive(): logger.error("[ElephantAlpha] 監控執行緒已死亡,嘗試重啟") try: from services.event_router import dispatch_sync as _dispatch_sync _dispatch_sync({ "source": "Scheduler.ElephantAlpha", "event_type": "thread_crashed", "severity": "alert", "title": "ElephantAlpha 執行緒死亡", "status": "自動重啟中", "impact": "P2 - 自主監控引擎暫停", "summary": "ElephantAlphaEngine daemon thread 意外終止,排程主迴圈已偵測並觸發重啟", }) except Exception as _alert_err: logger.error(f"[ElephantAlpha] 無法發送告警: {_alert_err}") _ea_thread = threading.Thread( target=_run_elephant_alpha_engine, daemon=True, name="ElephantAlphaEngine", ) _ea_thread.start() logger.info("[ElephantAlpha] 執行緒已重啟") except KeyboardInterrupt: logger.info("⛔ Scheduler stopped.") break except Exception as e: logger.error(f"Scheduler error: {e}") time.sleep(5)