#!/usr/bin/env python3
"""
run_scheduler.py — momo-scheduler 容器入口點
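排程任務清單(對齊 app.py init_scheduler + scheduler.py 全任務;以 _register_schedules 實際註冊為準):
每 5 分鐘:promotion_gate_worker
每 10 分鐘:ppt_auto_generation_catchup(補跑被長任務卡過的定期簡報)
每 15 分鐘:host_health_probe、ollama_111_usage_guard_check
每 30 分鐘:auto_import、whitepage_check、awaiting_review_push、ai_calls_error_spike_check
每 1 小時:momo、cost_throttle_evaluate;edm、festival(需 MOMO_ENABLE_LEGACY_EDM_SCHEDULE=true)
每 N 小時:季節性促銷活動爬蟲(需 MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE=true 且已配置 lpn_code;預設 4 小時)
每 4 小時:competitor_price_feeder、external_offer_sync、icaim_analysis、expire_stale_reviews
每 6 小時:quality_rescore、action_plan_hygiene
每 12 小時:dedup_batch
每 1 天 :cost_throttle_reset_if_new_month(00:05)、db_backup(03:00)、host_health_probe_cleanup(03:00)、cleanup_agent_context(03:30)、backup_monitor(04:00)、daily_report(09:00)、roi_monthly_report gate(09:05)、ai_smoke_summary(09:10)、observability_daily_summary(09:30)、pchome_match_backfill(10:30)、pchome_growth_momo_backfill(10:45)、openclaw_meta_analysis(12:00, Phase 4 降頻)、ppt_auto_generation_daily(20:30)、ppt_vision_audit(22:00)、daily_token_report(23:55)
每 1 週 :weekly_strategy(週一 06:00)、embed_consistency_check(週日 04:30)、ppt_auto_generation_weekly(週一 20:40)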
每 1 月 :monthly_report(每月1日 07:00)、ppt_auto_generation_monthly(每月1日 20:50)
每 1 季 :ppt_auto_generation_quarterly(1/4/7/10 月 1 日 21:00)
每半年 :ppt_auto_generation_half_yearly(1/7 月 1 日 21:10)
每 1 年 :ppt_auto_generation_annual(1 月 1 日 21:20)
"""
import asyncio
import logging
import os
import threading
import time
from typing import Optional, Tuple
import schedule
# 匯入全部排程任務函式
from scheduler import (
run_momo_task,
run_edm_task,
run_festival_task,
run_promo_event_task,
run_auto_import_task,
run_whitepage_check,
run_competitor_price_feeder_task,
run_external_offer_sync_task,
run_pchome_match_backfill_task,
run_pchome_growth_momo_backfill_task,
run_icaim_analysis_task,
run_weekly_strategy_task,
run_db_backup_task,
run_backup_monitor_task,
run_openclaw_meta_analysis_task,
run_dedup_batch_task,
run_quality_rescore_task,
run_daily_report_task,
run_ai_smoke_daily_summary_task,
run_monthly_report_task,
run_ppt_auto_generation_task,
)
# Root logging: INFO and above, formatted as "<time> - <logger> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# time.time() of the most recent Telegram alert pushed by each guard; 0.0 means
# "never pushed in this process". Compared against the guards' dedup windows.
_AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS = 0.0
_OLLAMA_111_USAGE_LAST_PUSH_TS = 0.0
def _env_flag(name: str, default: bool = False) -> bool:
    """Interpret environment variable *name* as a boolean flag.

    An unset variable yields *default*. A set value is True only when, after
    trimming surrounding whitespace and lower-casing, it is one of
    "1", "true", "yes" or "on"; any other set value (including "") is False.
    """
    value = os.getenv(name)
    if value is not None:
        return value.strip().lower() in ("1", "true", "yes", "on")
    return default
def _host_health_model_probe_enabled(label: str) -> bool:
    """Return whether the embedding-model probe should run for host *label*.

    Thin delegate to ``services.ollama_health_probe.host_health_model_probe_enabled``;
    the service module is imported at call time rather than at module import.
    """
    import services.ollama_health_probe as probe_service

    return probe_service.host_health_model_probe_enabled(label)
def _probe_ollama_embedding_runtime(requests_module, host: str) -> Tuple[bool, Optional[str]]:
    """Run the embedding-runtime probe against Ollama *host*.

    Delegates to ``services.ollama_health_probe.probe_ollama_embedding_runtime``
    (imported at call time) and returns its ``(ok, error_message)`` pair.
    *requests_module* is passed through unchanged so the caller controls the
    HTTP client used.
    """
    import services.ollama_health_probe as probe_service

    return probe_service.probe_ollama_embedding_runtime(requests_module, host)
def _legacy_edm_schedule_enabled() -> bool:
    """Return whether the legacy fixed-LPN EDM/Festival crawler jobs should be scheduled.

    Opt-in through MOMO_ENABLE_LEGACY_EDM_SCHEDULE (off by default) so stale
    campaign pages are not reopened in a browser loop.
    """
    flag_name = "MOMO_ENABLE_LEGACY_EDM_SCHEDULE"
    return _env_flag(flag_name, default=False)
def _seasonal_promo_schedule_enabled() -> bool:
    """Return whether seasonal promo-event crawler jobs should be scheduled.

    Opt-in through MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE (off by default) so
    expired LPN pages are not repeatedly opened on MOMO.
    """
    flag_name = "MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE"
    return _env_flag(flag_name, default=False)
def _notify_scheduler_failure(
    task_name: str,
    error: Exception,
    *,
    source: str,
    event_type: str,
    priority: str = "P2",
    title: Optional[str] = None,
    dedup_ttl_sec: int = 3600,
) -> None:
    """Send a best-effort EventRouter failure notification for a scheduler task.

    Args:
        task_name: Name of the failing task; also used in the default title.
        error: The exception that made the task fail.
        source: EventRouter source tag (e.g. "Scheduler.HostHealth").
        event_type: EventRouter event type for this failure.
        priority: Priority label forwarded to ``notify_failure`` (default "P2").
        title: Notification title; defaults to ``f"{task_name} 失敗"``.
        dedup_ttl_sec: Dedup TTL forwarded to ``notify_failure`` (default 3600).

    The trace is built from *error*'s own ``__traceback__`` rather than
    ``traceback.format_exc()``. The latter reports whatever exception is being
    handled at call time (or "NoneType: None" outside an ``except``), which is
    not necessarily *error*. Inside the matching ``except`` both produce the
    same text.

    Never raises. Any failure, including ``services.event_router`` being
    unavailable, is logged and swallowed so the caller's own error handling
    is not disrupted.
    """
    try:
        import traceback
        from services.event_router import notify_failure

        trace = ""
        if isinstance(error, BaseException):
            trace = "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            )
        if not trace.strip():
            # Fallback for values that are not exceptions: at least type and message.
            trace = f"{type(error).__name__}: {error}"
        notify_failure(
            task_name=task_name,
            error=error,
            source=source,
            event_type=event_type,
            priority=priority,
            title=title or f"{task_name} 失敗",
            trace=trace,
            dedup_ttl_sec=dedup_ttl_sec,
        )
    except Exception as router_error:
        logger.error("[%s] event_router notify failed: %s", task_name, router_error)
def _register_schedules() -> None:
    """Register every periodic job on the module-level ``schedule`` scheduler.

    Each ``schedule.every(...)...do(...)`` call is followed by an INFO log line
    describing the cadence. The registration order, and therefore the order of
    those startup log lines, is kept exactly as written.

    Conditional registrations:
    - EDM/festival crawlers: only when ``_legacy_edm_schedule_enabled()``.
    - Seasonal promo crawlers: only when ``_seasonal_promo_schedule_enabled()``
      and the crawler's config entry (from ``get_enabled_crawlers()``) has a
      non-empty ``lpn_code``. The cadence is its ``schedule_hours`` (default 4).

    Monthly, quarterly, half-yearly and annual jobs are registered as daily jobs.
    A nested gate function checks ``datetime.now()`` when it fires, because
    ``schedule`` has no ``every().month``.

    Several referenced ``run_*`` callables are defined further down in this
    module. That works because the names are only resolved when this function
    runs, so it must be called after the module has finished importing.
    """
    schedule.every(30).minutes.do(run_auto_import_task)
    logger.info("📅 每 30 分鐘:auto_import")
    schedule.every(30).minutes.do(run_whitepage_check)
    logger.info("📅 每 30 分鐘:whitepage_check")
    schedule.every(1).hours.do(run_momo_task)
    logger.info("📅 每 1 小時:momo_task")
    # Legacy fixed-LPN EDM/festival crawlers are opt-in (MOMO_ENABLE_LEGACY_EDM_SCHEDULE).
    if _legacy_edm_schedule_enabled():
        schedule.every(1).hours.do(run_edm_task)
        logger.info("📅 每 1 小時:edm_task")
        schedule.every(1).hours.do(run_festival_task)
        logger.info("📅 每 1 小時:festival_task")
    else:
        logger.info(
            "⏸️ legacy EDM/festival crawler schedules disabled "
            "(set MOMO_ENABLE_LEGACY_EDM_SCHEDULE=true to enable)"
        )
    # Dynamically register seasonal promo-event crawlers (driven by crawler config).
    from services.crawler_config_loader import get_enabled_crawlers
    # NOTE(review): used below as a mapping crawler_key -> config dict
    # (`.items()`, `config.get(...)`); confirm against crawler_config_loader.
    enabled_crawlers = get_enabled_crawlers()
    # Known seasonal events. Only 'page_type' and 'name' are read below; the
    # 'lpn' values here are unused because the LPN comes from config['lpn_code'].
    promo_event_configs = {
        'mothers_day_2026': {'lpn': '', 'page_type': 'mothers_day', 'name': '母親節超值限時購'},
        'valentine_520_2026': {'lpn': '', 'page_type': 'valentine_520', 'name': '520情人節限定購物'},
        'labor_day_2026': {'lpn': '', 'page_type': 'labor_day', 'name': '勞動節購物優惠'}
    }
    if not _seasonal_promo_schedule_enabled():
        # Disabled: only log, and only when a promo crawler is actually enabled in config.
        if any(crawler_key in promo_event_configs for crawler_key in enabled_crawlers):
            logger.info(
                "⏸️ seasonal promo crawler schedules disabled "
                "(set MOMO_ENABLE_SEASONAL_PROMO_SCHEDULE=true to enable)"
            )
    else:
        for crawler_key, config in enabled_crawlers.items():
            if crawler_key in promo_event_configs:
                event_config = promo_event_configs[crawler_key]
                lpn_code = config.get('lpn_code', '')
                if lpn_code:
                    schedule_hours = config.get('schedule_hours', 4)
                    # Loop values are bound as lambda defaults so each job keeps its
                    # own event (avoids Python's late-binding closure pitfall).
                    schedule.every(schedule_hours).hours.do(
                        lambda lpn=lpn_code, pt=event_config['page_type'], an=event_config['name']:
                            run_promo_event_task(lpn, pt, an)
                    )
                    logger.info(f"📅 每 {schedule_hours} 小時:{event_config['name']} ({event_config['page_type']})")
                else:
                    logger.warning(f"⚠️ {event_config['name']} 未配置 LPN 代碼,跳過排程")
    schedule.every(4).hours.do(run_competitor_price_feeder_task)
    logger.info("📅 每 4 小時:competitor_price_feeder")
    schedule.every(4).hours.do(run_external_offer_sync_task)
    logger.info("📅 每 4 小時:external_offer_sync(自動同步 MOMO 外部價格參考)")
    schedule.every(4).hours.do(run_icaim_analysis_task)
    logger.info("📅 每 4 小時:icaim_analysis")
    # Operation Ollama-First v5.0 Phase 4: meta self-review reduced from every 6h
    # to daily at 12:00 (saves ~1.875M Gemini tokens per month).
    # The extra triggers formerly inside icaim_analysis (old lines 2233/2253) were
    # removed at the same time to avoid duplicate calls.
    schedule.every().day.at("12:00").do(run_openclaw_meta_analysis_task)
    logger.info("📅 每日 12:00:openclaw_meta_analysis(Phase 4 降頻:原 6h)")
    schedule.every(6).hours.do(run_quality_rescore_task)
    logger.info("📅 每 6 小時:quality_rescore")
    schedule.every(6).hours.do(run_action_plan_hygiene_task)
    logger.info("📅 每 6 小時:action_plan_hygiene(關閉過期 advisory action_plans)")
    schedule.every(12).hours.do(run_dedup_batch_task)
    logger.info("📅 每 12 小時:dedup_batch")
    # Operation Ollama-First v5.0 Phase 11+ — RAG learning-loop workers (finalized in Phase 12).
    # With the default RAG_ENABLED=false, learning_episodes has no rows and the
    # workers just run an empty loop (harmless).
    schedule.every(5).minutes.do(run_promotion_gate_worker)
    logger.info("📅 每 5 分鐘:promotion_gate_worker(pending → promote/reject/await)")
    schedule.every(30).minutes.do(run_awaiting_review_push)
    logger.info("📅 每 30 分鐘:awaiting_review_push(推 Telegram 等 👍/👎)")
    schedule.every(4).hours.do(run_expire_stale_reviews)
    logger.info("📅 每 4 小時:expire_stale_reviews(24h 無回應降權 0.5)")
    # Phase 11.0 guardrail #3: BGE-M3 consistency check across the GCP Ollama hosts (ADR-033).
    # Weekly is enough: it only verifies the model version has not drifted, so
    # there is no need to run it on every startup.
    # NOTE(review): run_embed_consistency_check is neither imported nor defined in
    # the visible part of this module; presumably defined later, so confirm.
    schedule.every().sunday.at("04:30").do(run_embed_consistency_check)
    logger.info("📅 每週日 04:30:bge-m3 GCP-A/GCP-B 一致性驗證(111 opt-in)")
    # Phase 42: three-host Ollama health probe (keeps accumulating history even
    # when nobody has the observability page open).
    schedule.every(15).minutes.do(run_host_health_probe)
    logger.info("📅 每 15 分鐘:host_health_probe(三主機 → host_health_probes 表)")
    # Shares the 03:00 slot with db_backup, which is registered further below.
    schedule.every().day.at("03:00").do(run_host_health_probe_cleanup)
    logger.info("📅 每日 03:00:host_health_probe_cleanup(清 30d 前舊資料)")
    # Phase 44: alert on AI-call error-rate spikes.
    schedule.every(30).minutes.do(run_ai_calls_error_spike_check)
    logger.info("📅 每 30 分鐘:ai_calls_error_spike_check(錯誤率 ≥ 30% 推 Telegram)")
    # Phase 57: usage guard for the 111 Ollama host, so the final fallback does
    # not silently absorb heavy load.
    schedule.every(15).minutes.do(run_ollama_111_usage_guard_check)
    logger.info("📅 每 15 分鐘:ollama_111_usage_guard_check(111 fallback 使用率告警)")
    # Phase 44: daily 09:30 observability health-summary push.
    schedule.every().day.at("09:30").do(run_observability_daily_summary)
    logger.info("📅 每日 09:30:observability_daily_summary(早晨報三主機/AI/Cost/PPT)")
    # Phase 20: automatic cost throttling (COST_THROTTLE_ENABLED defaults to OFF).
    schedule.every(1).hours.do(run_cost_throttle_evaluate)
    logger.info("📅 每 1 小時:cost_throttle_evaluate")
    schedule.every().day.at("00:05").do(run_cost_throttle_reset_if_new_month)
    logger.info("📅 每日 00:05:cost_throttle_reset_if_new_month")
    # Phase 24: ROI monthly report. It runs daily at 09:05 and decides internally
    # whether today is the 1st of the month; 09:05 avoids the 09:00 daily_report.
    # NOTE(review): run_roi_monthly_report_if_new_month is neither imported nor
    # defined in the visible part of this module; presumably defined later, so confirm.
    schedule.every().day.at("09:05").do(run_roi_monthly_report_if_new_month)
    logger.info("📅 每日 09:05:roi_monthly_report(月初第 1 日才送)")
    # Automatic PPT generation: produce the defined reports first, then hand them
    # to the 22:00 vision audit.
    schedule.every().day.at("20:30").do(lambda: run_ppt_auto_generation_task("daily"))
    logger.info("📅 每日 20:30:ppt_auto_generation_daily(日報)")
    schedule.every().monday.at("20:40").do(lambda: run_ppt_auto_generation_task("weekly"))
    logger.info("📅 每週一 20:40:ppt_auto_generation_weekly(週報 / 市場情報)")

    # Calendar gates: schedule has no every().month, so the period-based PPT jobs
    # run daily and only generate on the matching calendar date.
    def _ppt_monthly_gate() -> None:
        """Generate the monthly PPT set on the 1st of each month."""
        from datetime import datetime as _dt
        if _dt.now().day == 1:
            run_ppt_auto_generation_task("monthly")

    def _ppt_quarterly_gate() -> None:
        """Generate the quarterly PPT set on Jan 1, Apr 1, Jul 1 and Oct 1."""
        from datetime import datetime as _dt
        now = _dt.now()
        if now.day == 1 and now.month in (1, 4, 7, 10):
            run_ppt_auto_generation_task("quarterly")

    def _ppt_half_yearly_gate() -> None:
        """Generate the half-yearly PPT set on Jan 1 and Jul 1."""
        from datetime import datetime as _dt
        now = _dt.now()
        if now.day == 1 and now.month in (1, 7):
            run_ppt_auto_generation_task("half_yearly")

    def _ppt_annual_gate() -> None:
        """Generate the annual PPT set on Jan 1."""
        from datetime import datetime as _dt
        now = _dt.now()
        if now.day == 1 and now.month == 1:
            run_ppt_auto_generation_task("annual")

    schedule.every().day.at("20:50").do(_ppt_monthly_gate)
    logger.info("📅 每月1日 20:50:ppt_auto_generation_monthly(月報與管理型簡報)")
    schedule.every().day.at("21:00").do(_ppt_quarterly_gate)
    logger.info("📅 每季首月1日 21:00:ppt_auto_generation_quarterly(季報)")
    schedule.every().day.at("21:10").do(_ppt_half_yearly_gate)
    logger.info("📅 每半年首月1日 21:10:ppt_auto_generation_half_yearly(半年報)")
    schedule.every().day.at("21:20").do(_ppt_annual_gate)
    logger.info("📅 每年1月1日 21:20:ppt_auto_generation_annual(年報)")
    schedule.every(10).minutes.do(run_ppt_auto_generation_catchup_task)
    logger.info("📅 每 10 分鐘:ppt_auto_generation_catchup(補跑錯過的定期簡報)")
    # Phase 26: PPT vision audit. Daily at 22:00 it scans .pptx files created that
    # day and pushes to Telegram only when issues are found.
    schedule.every().day.at("22:00").do(run_ppt_vision_audit)
    logger.info("📅 每日 22:00:ppt_vision_audit(PPT_VISION_ENABLED=true 才生效)")
    schedule.every().day.at("03:00").do(run_db_backup_task)
    logger.info("📅 每日 03:00:db_backup")
    # NOTE(review): run_cleanup_agent_context is neither imported nor defined in the
    # visible part of this module; presumably defined later, so confirm.
    schedule.every().day.at("03:30").do(run_cleanup_agent_context)
    logger.info("📅 每日 03:30:cleanup_agent_context")
    schedule.every().day.at("04:00").do(run_backup_monitor_task)
    logger.info("📅 每日 04:00:backup_monitor")
    schedule.every().monday.at("06:00").do(run_weekly_strategy_task)
    logger.info("📅 每週一 06:00:weekly_strategy")
    schedule.every().day.at("09:00").do(run_daily_report_task)
    logger.info("📅 每日 09:00:daily_report")
    schedule.every().day.at("09:10").do(run_ai_smoke_daily_summary_task)
    logger.info("📅 每日 09:10:ai_smoke_daily_summary")
    schedule.every().day.at("10:30").do(run_pchome_match_backfill_task)
    logger.info("📅 每日 10:30:pchome_match_backfill")
    schedule.every().day.at("10:45").do(run_pchome_growth_momo_backfill_task)
    logger.info("📅 每日 10:45:pchome_growth_momo_backfill(高業績商品補 MOMO 對應)")
    # Operation Ollama-First v5.0 — Phase 1 wrap-up: daily LLM token report at 23:55.
    schedule.every().day.at("23:55").do(run_daily_token_report_task)
    logger.info("📅 每日 23:55:daily_token_report")

    # Monthly report on the 1st at 07:00. schedule has no every().month, so the
    # job runs daily at 07:00 and checks the date.
    def _monthly_report_gate() -> None:
        """Run the monthly report only on the 1st of the month."""
        from datetime import datetime as _dt
        if _dt.now().day == 1:
            run_monthly_report_task()

    schedule.every().day.at("07:00").do(_monthly_report_gate)
    logger.info("📅 每月1日 07:00:monthly_report")
def run_daily_token_report_task():
    """Daily 23:55 job (Operation Ollama-First v5.0, Phase 1 wrap-up): LLM token report.

    Delegates to ``services.token_report_service.send_daily_report`` and logs the
    ``sent`` / ``failed`` / ``chars`` / ``ok`` fields of the dict it returns.

    Never raises, so the scheduler loop and the other jobs are unaffected. Any
    exception is logged with its traceback and forwarded to EventRouter through
    ``_notify_scheduler_failure``.
    """
    try:
        from services.token_report_service import send_daily_report

        outcome = send_daily_report()
        logger.info(
            "[TokenReport] sent=%s failed=%s chars=%s ok=%s",
            *(outcome.get(field) for field in ('sent', 'failed', 'chars', 'ok')),
        )
    except Exception as exc:
        logger.exception(f"[TokenReport] task failed: {exc}")
        _notify_scheduler_failure(
            "run_daily_token_report_task",
            exc,
            source="Scheduler.TokenReport",
            event_type="daily_token_report_failure",
            title="LLM Token 日報失敗",
        )
# ─────────────────────────────────────────────────────────────────────────────
# Operation Ollama-First v5.0 Phase 11+ — RAG learning-loop workers (finalized in Phase 12)
# ─────────────────────────────────────────────────────────────────────────────
def run_promotion_gate_worker():
    """Every 5 minutes: push pending learning_episodes through the PromotionGate.

    Calls ``services.learning_pipeline.process_pending_episodes``. Per the
    original design notes (ADR-032), it moves episodes pending -> can_promote ->
    promote/reject/await using rule-based logic only. The counters are logged
    only when at least one pending episode was seen. With RAG_ENABLED=false the
    episode table is expected to be empty, so this is a harmless no-op.

    Never raises. Errors are logged and reported via ``_notify_scheduler_failure``.
    """
    try:
        from services.learning_pipeline import process_pending_episodes

        stats = process_pending_episodes()
        seen = stats.get('pending_seen', 0)
        if seen > 0:
            logger.info(
                "[PromotionWorker] pending=%d promoted=%d rejected=%d awaiting=%d errors=%d",
                seen,
                *(stats[key] for key in ('promoted', 'rejected', 'awaiting', 'errors')),
            )
    except Exception as exc:
        logger.exception(f"[PromotionWorker] task failed: {exc}")
        _notify_scheduler_failure(
            "run_promotion_gate_worker",
            exc,
            source="Scheduler.PromotionGate",
            event_type="promotion_gate_worker_failure",
            title="PromotionGate worker 失敗",
        )
def run_awaiting_review_push():
    """Every 30 minutes: push awaiting_review episodes to Telegram for a 👍/👎 verdict.

    Delegates to ``services.learning_pipeline.push_awaiting_reviews_to_telegram``
    and logs the returned count when it is positive. Per the original notes, the
    service skips the push when TELEGRAM_ADMIN_CHAT_ID is unset; that check lives
    in the service, not here.

    Never raises. Errors are logged and reported via ``_notify_scheduler_failure``.
    """
    try:
        from services.learning_pipeline import push_awaiting_reviews_to_telegram

        pushed_count = push_awaiting_reviews_to_telegram()
        if not (pushed_count > 0):
            return
        logger.info("[AwaitingReviewPush] pushed=%d episodes", pushed_count)
    except Exception as exc:
        logger.exception(f"[AwaitingReviewPush] task failed: {exc}")
        _notify_scheduler_failure(
            "run_awaiting_review_push",
            exc,
            source="Scheduler.PromotionGate",
            event_type="awaiting_review_push_failure",
            title="PromotionGate 審查推播失敗",
        )
def run_expire_stale_reviews():
    """Every 4 hours: expire awaiting_review episodes that got no reply.

    Delegates to ``services.learning_pipeline.expire_stale_reviews`` and logs the
    returned count when positive. Per the original notes (ADR-033 guardrail #1,
    Stage 4), episodes unanswered for 24h become expired with weight 0.5; that
    rule lives in the service.

    Never raises. Errors are logged and reported via ``_notify_scheduler_failure``.
    """
    try:
        from services.learning_pipeline import expire_stale_reviews

        expired_count = expire_stale_reviews()
        if not (expired_count > 0):
            return
        logger.info("[ExpireStale] expired %d awaiting_review episodes (24h timeout)", expired_count)
    except Exception as exc:
        logger.exception(f"[ExpireStale] task failed: {exc}")
        _notify_scheduler_failure(
            "run_expire_stale_reviews",
            exc,
            source="Scheduler.PromotionGate",
            event_type="expire_stale_reviews_failure",
            title="PromotionGate 過期審查清理失敗",
        )
def run_action_plan_hygiene_task():
    """Every 6 hours: close stale advisory action_plans so the queue does not balloon.

    Delegates to ``services.action_plan_hygiene.run_action_plan_hygiene``. Its
    ``updated_count`` (missing/None treated as 0) and ``by_source`` fields are
    logged at INFO when something was closed, otherwise a DEBUG line is emitted.

    Never raises. Errors are logged and reported via ``_notify_scheduler_failure``.
    """
    try:
        from services.action_plan_hygiene import run_action_plan_hygiene

        summary = run_action_plan_hygiene()
        closed = int(summary.get("updated_count") or 0)
        if closed <= 0:
            logger.debug("[ActionPlanHygiene] no stale advisory action plans")
        else:
            logger.info(
                "[ActionPlanHygiene] updated=%d by_source=%s",
                closed,
                summary.get("by_source"),
            )
    except Exception as exc:
        logger.exception(f"[ActionPlanHygiene] task failed: {exc}")
        _notify_scheduler_failure(
            "run_action_plan_hygiene_task",
            exc,
            source="Scheduler.ActionPlanHygiene",
            event_type="action_plan_hygiene_failure",
            title="Action Plan 過期清理失敗",
        )
def run_cost_throttle_evaluate():
    """Hourly: Phase 20 automatic cost-throttle evaluation.

    Returns immediately, without touching the DB, when
    ``is_cost_throttle_enabled()`` is false (COST_THROTTLE_ENABLED defaults to
    off). Otherwise it calls ``evaluate_throttle_status()`` and logs the
    providers whose state has ``throttled`` set.

    Per the original design notes, the service marks a provider throttled when
    its projected month-end spend exceeds 110% of budget and pushes Telegram on
    state changes. That logic lives in ``cost_throttle_service``.

    Never raises. Errors are logged and reported via ``_notify_scheduler_failure``.
    """
    try:
        from services.cost_throttle_service import (
            evaluate_throttle_status,
            is_cost_throttle_enabled,
        )

        if not is_cost_throttle_enabled():
            return  # flag OFF: skip without hitting the DB
        state = evaluate_throttle_status()
        throttled_providers = [
            provider for provider, info in state.items() if info.get('throttled')
        ]
        if not throttled_providers:
            logger.debug("[CostThrottle] no provider throttled")
            return
        logger.warning("[CostThrottle] currently throttled: %s", throttled_providers)
    except Exception as exc:
        logger.exception(f"[CostThrottle] task failed: {exc}")
        _notify_scheduler_failure(
            "run_cost_throttle_evaluate",
            exc,
            source="Scheduler.CostThrottle",
            event_type="cost_throttle_evaluate_failure",
            title="成本節流評估失敗",
        )
def run_host_health_probe():
    """Phase 42 job (every 15 min): probe the three Ollama hosts and record results.

    For each host (Primary GCP, Secondary GCP, Fallback 111) this:
    1. GETs ``<host>/api/tags`` with a 3 s timeout. HTTP 200 counts as reachable,
       and the number of listed models is recorded.
    2. When ``_host_health_model_probe_enabled(label)`` is true, runs the
       embedding-runtime probe. A failing probe marks the host unhealthy.
    3. Records ``_is_unhealthy(host)`` (the Ollama service's own mark) and the
       elapsed time in ms.
    Any exception while probing marks the host unhealthy and stores the error.

    Phase 43: before the new rows are inserted, each host's previous probe is
    read. A healthy<->unhealthy flip triggers ``_push_host_transition_alert``
    unless a flip into the same state was already recorded in roughly the last
    hour (anti-flapping dedup).

    Never raises. Errors are logged and reported via ``_notify_scheduler_failure``.
    """
    try:
        import time as _time
        import requests as _r
        from sqlalchemy import text as _sa
        from database.manager import DatabaseManager
        from services.ollama_service import (
            OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OLLAMA_HOST_FALLBACK,
            _is_unhealthy,
        )
        records = []
        for label, host in [
            ('Primary (GCP)', OLLAMA_HOST_PRIMARY),
            ('Secondary (GCP)', OLLAMA_HOST_SECONDARY),
            ('Fallback (111)', OLLAMA_HOST_FALLBACK),
        ]:
            t0 = _time.monotonic()
            healthy = False
            err = None
            models_count = 0
            try:
                resp = _r.get(f"{host.rstrip('/')}/api/tags", timeout=3)
                if resp.status_code == 200:
                    healthy = True
                    models_count = len(resp.json().get('models', []) or [])
                    if _host_health_model_probe_enabled(label):
                        model_ok, model_err = _probe_ollama_embedding_runtime(_r, host)
                        if not model_ok:
                            healthy = False
                            err = model_err
                else:
                    err = f"HTTP {resp.status_code}"
            except Exception as e:
                # `healthy` may already be True here, e.g. HTTP 200 with a
                # non-JSON body or a model probe that raised. A probe that
                # errored must never be recorded as healthy.
                healthy = False
                err = f"{type(e).__name__}: {str(e)[:200]}"
            records.append({
                'host_label': label, 'host_url': host, 'healthy': healthy,
                'unhealthy_mark': _is_unhealthy(host),
                'models_count': models_count,
                'response_ms': int((_time.monotonic() - t0) * 1000),
                'error_msg': err,
            })
        # Phase 43: state-transition detection. Read each host's previous probe
        # *before* inserting the new rows.
        transitions = []
        try:
            session = DatabaseManager().get_session()
            try:
                for rec in records:
                    last = session.execute(
                        _sa("""
                            SELECT healthy, probed_at FROM host_health_probes
                            WHERE host_label = :label
                            ORDER BY probed_at DESC LIMIT 1
                        """),
                        {'label': rec['host_label']},
                    ).fetchone()
                    if last is None:
                        continue  # first probe for this host: nothing to compare against
                    prev_healthy = bool(last[0])
                    if prev_healthy != rec['healthy']:
                        # Dedup: skip when a flip into the same state already happened
                        # recently. That means a probe in the current state within the
                        # last hour, preceded (within 90 minutes) by a probe in the
                        # opposite state.
                        recent_transition = session.execute(
                            _sa("""
                                SELECT 1 FROM host_health_probes p1
                                WHERE p1.host_label = :label
                                  AND p1.probed_at >= NOW() - INTERVAL '1 hour'
                                  AND p1.healthy = :curr_healthy
                                  AND EXISTS (
                                      SELECT 1 FROM host_health_probes p2
                                      WHERE p2.host_label = :label
                                        AND p2.probed_at < p1.probed_at
                                        AND p2.probed_at >= NOW() - INTERVAL '90 minutes'
                                        AND p2.healthy != :curr_healthy
                                  )
                                LIMIT 1
                            """),
                            {'label': rec['host_label'], 'curr_healthy': rec['healthy']},
                        ).fetchone()
                        if recent_transition is None:
                            transitions.append({
                                'host_label': rec['host_label'],
                                'host_url': rec['host_url'],
                                'prev_healthy': prev_healthy,
                                'curr_healthy': rec['healthy'],
                                'error_msg': rec.get('error_msg'),
                                'response_ms': rec.get('response_ms'),
                            })
            finally:
                session.close()
        except Exception as e:
            # Transition detection is best-effort; the probe rows are still written.
            logger.warning(f"[HostHealthProbe] transition detect failed: {e}")
        # Batch-insert the probe rows.
        session = DatabaseManager().get_session()
        try:
            for rec in records:
                session.execute(
                    _sa("""
                        INSERT INTO host_health_probes
                            (host_label, host_url, healthy, unhealthy_mark,
                             models_count, response_ms, error_msg)
                        VALUES
                            (:host_label, :host_url, :healthy, :unhealthy_mark,
                             :models_count, :response_ms, :error_msg)
                    """),
                    rec,
                )
            session.commit()
        finally:
            session.close()
        # Phase 43: push Telegram failure alerts / recovery notices.
        for tr in transitions:
            try:
                _push_host_transition_alert(tr)
            except Exception as e:
                logger.error(f"[HostHealthProbe] push alert failed for {tr['host_label']}: {e}")
        unhealthy = [r['host_label'] for r in records if not r['healthy']]
        if unhealthy:
            logger.warning(f"[HostHealthProbe] unhealthy hosts: {unhealthy}")
        else:
            logger.debug("[HostHealthProbe] all 3 hosts healthy")
    except Exception as e:
        logger.error(f"[HostHealthProbe] failed: {e}", exc_info=True)
        _notify_scheduler_failure(
            "run_host_health_probe",
            e,
            source="Scheduler.HostHealth",
            event_type="host_health_probe_failure",
            title="三主機健康探針失敗",
        )
def _push_host_transition_alert(tr):
    """Phase 43: push a Telegram message for one host health-state transition.

    *tr* is a transition dict as built by ``run_host_health_probe``. The keys
    read here are ``host_label``, ``host_url``, ``curr_healthy``,
    ``response_ms`` and ``error_msg``.

    - healthy -> unhealthy: alert text plus inline buttons. One is the AutoHeal
      button (``cmd:obs_heal:<GCP-A|GCP-B|111>``) and one is a 24h
      health-stats button.
    - unhealthy -> healthy: recovery text without buttons.

    The message is sent with ``parse_mode='HTML'``, so every dynamic value is
    HTML-escaped. requests/urllib3 error strings routinely contain
    ``<urllib3.connection.HTTPConnection object at 0x...>``, which Telegram would
    otherwise reject as an unsupported tag. That would drop exactly the outage
    alert that matters.

    Returns None. If ``services.telegram_templates`` cannot be imported, nothing
    is sent.
    """
    try:
        from services.telegram_templates import send_telegram_with_result
    except Exception:
        return
    import html

    label = tr['host_label']
    short_label = (
        'GCP-A' if 'Primary' in label else
        'GCP-B' if 'Secondary' in label else
        '111'
    )
    # Telegram HTML mode only requires &, < and > to be escaped.
    safe_label = html.escape(str(label), quote=False)
    safe_url = html.escape(str(tr['host_url']), quote=False)
    if tr['curr_healthy']:
        # Recovery notice (no buttons).
        text = (
            f"✅ Ollama 主機已恢復\n\n"
            f"主機:{safe_label}\n"
            f"網址:{safe_url}\n"
            f"回應:{tr['response_ms']} ms\n\n"
            f"scheduler 每 15 分鐘自動探針偵測"
        )
        send_telegram_with_result(text, parse_mode='HTML')
    else:
        # Failure alert with inline AutoHeal button. Truncate *before* escaping so
        # an HTML entity is never cut in half.
        err_short = html.escape((tr.get('error_msg') or '無錯誤訊息')[:200], quote=False)
        text = (
            f"🚨 Ollama 主機異常\n\n"
            f"主機:{safe_label}\n"
            f"網址:{safe_url}\n"
            f"錯誤:{err_short}\n\n"
            f"💡 點下方按鈕一鍵觸發 ADR-013 AutoHeal Playbook,"
            f"或至 觀測台 詳查。"
        )
        reply_markup = {
            "inline_keyboard": [
                [{"text": f"🩹 立即 AutoHeal {short_label}", "callback_data": f"cmd:obs_heal:{short_label}"}],
                [{"text": "📊 查 24h 健康統計", "callback_data": "cmd:obs_health"}],
            ],
        }
        send_telegram_with_result(text, reply_markup=reply_markup, parse_mode='HTML')
def run_ai_calls_error_spike_check():
    """Phase 44 job (every 30 min): alert on a spike in the AI-call error rate.

    Trigger: over the last hour ``ai_calls`` has at least 20 rows and at least
    30% of them have a status other than 'ok' / 'cache_only'.

    Dedup: the in-process timestamp ``_AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS``
    suppresses repeat alerts within 3600 s. It resets when the process restarts.

    The Telegram message (``parse_mode='HTML'``) lists up to three callers with
    at least 3 errors in the window. Caller names come from the DB and are
    HTML-escaped, so values containing '<', '>' or '&' cannot break parsing.

    Never raises. Errors are logged and reported via ``_notify_scheduler_failure``.
    """
    global _AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS
    try:
        import html
        from sqlalchemy import text as _sa
        from database.manager import DatabaseManager
        session = DatabaseManager().get_session()
        try:
            row = session.execute(
                _sa("""
                    SELECT COUNT(*) AS total,
                           COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors,
                           COUNT(DISTINCT caller) AS distinct_callers
                    FROM ai_calls
                    WHERE called_at >= NOW() - INTERVAL '1 hour'
                """),
            ).fetchone()
            total = int(row[0] or 0)
            errors = int(row[1] or 0)
            if total < 20:
                return  # too few samples to judge
            error_rate = errors / total
            if error_rate < 0.30:
                return  # below threshold: normal
            # Top 3 problematic callers.
            top_callers = session.execute(
                _sa("""
                    SELECT caller, COUNT(*) AS calls,
                           COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errs
                    FROM ai_calls
                    WHERE called_at >= NOW() - INTERVAL '1 hour'
                    GROUP BY caller
                    HAVING COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) >= 3
                    ORDER BY errs DESC
                    LIMIT 3
                """),
            ).fetchall()
        finally:
            session.close()
        # Push at most one spike alert per hour to avoid flooding every 30 minutes.
        now_ts = time.time()
        if now_ts - _AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS < 3600:
            logger.info("[AICallsErrorSpike] skip duplicate alert within 1h window")
            return
        from services.telegram_templates import send_telegram_with_result
        lines = [
            "🚨 AI 呼叫錯誤率異常", "",
            f"過去 1 小時:{errors}/{total} 失敗({error_rate*100:.1f}%)",
            "",
        ]
        if top_callers:
            lines.append("問題 caller Top 3:")
            for c in top_callers:
                safe_caller = html.escape(str(c[0]), quote=False)
                lines.append(f"• {safe_caller}:{c[2]}/{c[1]} 失敗")
            lines.append("")
        lines.append("💡 點按鈕觸發 Code Review Pipeline 自動審查最新 commit。")
        reply_markup = {
            "inline_keyboard": [
                [{"text": "🔬 觸發 Code Review", "callback_data": "cmd:obs_trigger_review"}],
                [{"text": "📊 查 24h AI 呼叫", "callback_data": "cmd:obs_ai_calls"}],
            ],
        }
        send_telegram_with_result('\n'.join(lines), reply_markup=reply_markup, parse_mode='HTML')
        _AI_CALLS_ERROR_SPIKE_LAST_PUSH_TS = now_ts
        logger.warning(
            f"[AICallsErrorSpike] alert pushed: total={total} errors={errors} "
            f"rate={error_rate*100:.1f}%"
        )
    except Exception as e:
        logger.error(f"[AICallsErrorSpike] failed: {e}", exc_info=True)
        _notify_scheduler_failure(
            "run_ai_calls_error_spike_check",
            e,
            source="Scheduler.Observability",
            event_type="ai_calls_error_spike_check_failure",
            title="AI 呼叫錯誤率檢查失敗",
        )
def run_ollama_111_usage_guard_check():
    """Phase 57: alert when the final-fallback 111 Ollama host carries too much traffic.

    The 111 host is the last line of defence. This guard only reads
    ``ai_calls`` and never changes routing. It is skipped entirely when
    OLLAMA_111_USAGE_ALERT_ENABLED is falsy (default: enabled).

    Tunables (environment variables, default in brackets):
    - OLLAMA_111_USAGE_ALERT_WINDOW_MINUTES [60]: look-back window.
    - OLLAMA_111_USAGE_ALERT_MIN_TOTAL [20]: minimum Ollama calls in the window.
    - OLLAMA_111_USAGE_ALERT_MIN_111 [3]: minimum calls served by 111.
    - OLLAMA_111_USAGE_ALERT_PCT [5]: minimum 111 share, in percent.
    - OLLAMA_111_USAGE_ALERT_DEDUP_SEC [3600]: in-process dedup window.

    An alert is sent only when all thresholds are met and nothing was pushed
    within the dedup window in this process. The Telegram message shows the
    GCP-A / GCP-B / 111 split and the top 5 caller/model pairs on 111. Caller
    and model come from the DB and are HTML-escaped because the message uses
    ``parse_mode="HTML"``.

    Never raises. Errors, including malformed env values, are logged and
    reported via ``_notify_scheduler_failure``.
    """
    global _OLLAMA_111_USAGE_LAST_PUSH_TS
    if not _env_flag("OLLAMA_111_USAGE_ALERT_ENABLED", True):
        return
    try:
        import html
        from sqlalchemy import text as _sa
        from database.manager import DatabaseManager
        window_minutes = int(os.getenv("OLLAMA_111_USAGE_ALERT_WINDOW_MINUTES", "60"))
        threshold_pct = float(os.getenv("OLLAMA_111_USAGE_ALERT_PCT", "5"))
        min_total = int(os.getenv("OLLAMA_111_USAGE_ALERT_MIN_TOTAL", "20"))
        min_111 = int(os.getenv("OLLAMA_111_USAGE_ALERT_MIN_111", "3"))
        dedup_sec = int(os.getenv("OLLAMA_111_USAGE_ALERT_DEDUP_SEC", "3600"))
        session = DatabaseManager().get_session()
        try:
            row = session.execute(
                _sa("""
                    SELECT
                        COUNT(*) FILTER (
                            WHERE provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                        ) AS total_ollama,
                        COUNT(*) FILTER (WHERE provider = 'gcp_ollama') AS gcp_a,
                        COUNT(*) FILTER (WHERE provider = 'ollama_secondary') AS gcp_b,
                        COUNT(*) FILTER (WHERE provider = 'ollama_111') AS host_111
                    FROM ai_calls
                    WHERE called_at >= NOW() - (:window_minutes || ' minutes')::interval
                """),
                {"window_minutes": window_minutes},
            ).fetchone()
            total_ollama = int(row[0] or 0)
            gcp_a = int(row[1] or 0)
            gcp_b = int(row[2] or 0)
            host_111 = int(row[3] or 0)
            if total_ollama < min_total or host_111 < min_111:
                return
            rate_pct = (host_111 / total_ollama * 100.0) if total_ollama else 0.0
            if rate_pct < threshold_pct:
                return
            top_callers = session.execute(
                _sa("""
                    SELECT caller,
                           COALESCE(model, '') AS model,
                           COUNT(*) AS calls,
                           COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens,
                           COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors
                    FROM ai_calls
                    WHERE called_at >= NOW() - (:window_minutes || ' minutes')::interval
                      AND provider = 'ollama_111'
                    GROUP BY caller, model
                    ORDER BY calls DESC, tokens DESC
                    LIMIT 5
                """),
                {"window_minutes": window_minutes},
            ).fetchall()
        finally:
            session.close()
        now_ts = time.time()
        if now_ts - _OLLAMA_111_USAGE_LAST_PUSH_TS < dedup_sec:
            logger.info("[Ollama111Guard] skip duplicate alert within %ss window", dedup_sec)
            return
        from services.telegram_templates import send_telegram_with_result
        lines = [
            "⚠️ 111 Ollama 使用率偏高",
            "",
            f"過去 {window_minutes} 分鐘 Ollama 呼叫:{total_ollama} 次",
            f"111 fallback:{host_111} 次({rate_pct:.1f}%)",
            f"GCP-A:{gcp_a} 次 · GCP-B:{gcp_b} 次",
            "",
        ]
        if top_callers:
            lines.append("111 caller Top 5:")
            for caller, model, calls, tokens, errors in top_callers:
                safe_caller = html.escape(str(caller), quote=False)
                model_part = f" / {html.escape(str(model), quote=False)}" if model else ""
                err_part = f" · err {errors}" if int(errors or 0) else ""
                lines.append(
                    f"• {safe_caller}{model_part}:{calls} 次 · {int(tokens or 0):,} tokens{err_part}"
                )
            lines.append("")
        lines.extend([
            "建議先看 GCP-A/GCP-B health probe 與近期 unhealthy mark;",
            "若 GCP 正常,檢查是否有 fallback flag 或重任務意外打到 111。",
        ])
        reply_markup = {
            "inline_keyboard": [
                [{"text": "🏥 主機健康", "callback_data": "cmd:obs_health"},
                 {"text": "📊 AI 呼叫", "callback_data": "cmd:obs_ai_calls"}],
            ],
        }
        send_telegram_with_result("\n".join(lines), reply_markup=reply_markup, parse_mode="HTML")
        _OLLAMA_111_USAGE_LAST_PUSH_TS = now_ts
        logger.warning(
            "[Ollama111Guard] alert pushed: total=%s gcp_a=%s gcp_b=%s host_111=%s rate=%.1f%%",
            total_ollama, gcp_a, gcp_b, host_111, rate_pct,
        )
    except Exception as e:
        logger.error(f"[Ollama111Guard] failed: {e}", exc_info=True)
        _notify_scheduler_failure(
            "run_ollama_111_usage_guard_check",
            e,
            source="Scheduler.Ollama111Guard",
            event_type="ollama_111_usage_guard_failure",
            title="111 Ollama 使用率護欄失敗",
        )
def run_observability_daily_summary():
    """Phase 44 daily 09:30 job: push a one-page observability summary to Telegram.

    Collected from the DB:
    - per-host uptime over the last 24h (``host_health_probes``);
    - 24h AI call count, tokens, cost, error count and RAG hits (``ai_calls``);
    - month-to-date cost, from the 1st of the current month in local time;
    - number of ``learning_episodes`` awaiting review;
    - 7-day PPT audit totals (``ppt_audit_results``);
    - Phase 49: price recommendations from the last 7 days with confidence
      >= 0.7 and no ``action_plans`` row for the same SKU within 7 days.

    The Phase 49 metric is optional. If its query fails, the cause is logged
    and the count stays 0, so the rest of the summary is still sent. Host labels
    are HTML-escaped because the message uses ``parse_mode='HTML'``.

    Never raises. Other failures are logged and reported via
    ``_notify_scheduler_failure``.
    """
    try:
        import html
        from sqlalchemy import text as _sa
        from database.manager import DatabaseManager
        from datetime import datetime as _dt
        session = DatabaseManager().get_session()
        try:
            # Per-host uptime over the last 24h.
            host_rows = session.execute(
                _sa("""
                    SELECT host_label,
                           COUNT(*) AS total,
                           COUNT(*) FILTER (WHERE healthy) AS up
                    FROM host_health_probes
                    WHERE probed_at >= NOW() - INTERVAL '24 hours'
                    GROUP BY host_label ORDER BY host_label
                """),
            ).fetchall()
            # AI calls over the last 24h.
            ai_row = session.execute(
                _sa("""
                    SELECT COUNT(*) AS total,
                           COALESCE(SUM(input_tokens + output_tokens), 0) AS tokens,
                           COALESCE(SUM(cost_usd), 0) AS cost,
                           COUNT(*) FILTER (WHERE status NOT IN ('ok','cache_only')) AS errors,
                           COUNT(*) FILTER (WHERE rag_hit) AS rag_hits
                    FROM ai_calls
                    WHERE called_at >= NOW() - INTERVAL '24 hours'
                """),
            ).fetchone()
            # Month-to-date cost.
            today = _dt.now()
            month_start = _dt(today.year, today.month, 1)
            month_cost = session.execute(
                _sa("SELECT COALESCE(SUM(cost_usd), 0) FROM ai_calls WHERE called_at >= :ms"),
                {'ms': month_start},
            ).fetchone()[0] or 0
            # Episodes awaiting human review.
            ep_pending = session.execute(
                _sa("SELECT COUNT(*) FROM learning_episodes WHERE promotion_status = 'awaiting_review' AND reviewed_at IS NULL"),
            ).fetchone()[0] or 0
            # PPT vision audit results over the last 7 days.
            ppt_row = session.execute(
                _sa("""
                    SELECT COUNT(*) AS total,
                           COUNT(*) FILTER (WHERE audit_status = 'passed') AS passed,
                           COUNT(*) FILTER (WHERE audit_status = 'failed') AS failed
                    FROM ppt_audit_results
                    WHERE audited_at >= NOW() - INTERVAL '7 days'
                """),
            ).fetchone()
            # Phase 49: business-side opportunities not followed up
            # (high-confidence recommendation without a matching action_plan).
            unfollowed_count = 0
            try:
                unfollowed_count = session.execute(
                    _sa("""
                        SELECT COUNT(*) FROM ai_price_recommendations r
                        WHERE r.created_at >= NOW() - INTERVAL '7 days'
                          AND r.confidence >= 0.7
                          AND NOT EXISTS (
                              SELECT 1 FROM action_plans p
                              WHERE p.sku = r.sku
                                AND p.created_at >= r.created_at
                                AND p.created_at < r.created_at + INTERVAL '7 days'
                          )
                    """),
                ).fetchone()[0] or 0
            except Exception as exc:
                # Optional metric: keep the summary going, but make the failure
                # visible instead of swallowing it. This is the last statement on
                # the session, so an aborted transaction affects nothing else.
                logger.warning(
                    "[ObservabilityDaily] unfollowed-opportunity query failed: %s", exc
                )
        finally:
            session.close()
        ai_total = int(ai_row[0] or 0)
        ai_errors = int(ai_row[3] or 0)
        ai_rag = int(ai_row[4] or 0)
        ai_cost = float(ai_row[2] or 0)
        ai_tokens = int(ai_row[1] or 0)
        ppt_total = int(ppt_row[0] or 0)
        ppt_passed = int(ppt_row[1] or 0)
        ppt_failed = int(ppt_row[2] or 0)
        lines = [
            f"🛰 觀測台日報 — {today.strftime('%Y-%m-%d')}",
            "",
            "三主機 24h 在線率:",
        ]
        if host_rows:
            for label, total, up in host_rows:
                pct = (float(up) / float(total) * 100) if total else 0
                emoji = "✅" if pct >= 99 else "⚠️" if pct >= 90 else "🚨"
                safe_label = html.escape(str(label), quote=False)
                lines.append(f"{emoji} {safe_label}:{pct:.1f}%({up}/{total})")
        else:
            lines.append("(無資料 — scheduler probe 尚未啟動?)")
        lines.append("")
        lines.append("AI 呼叫 24h:")
        err_rate = (ai_errors / ai_total * 100) if ai_total else 0
        rag_rate = (ai_rag / ai_total * 100) if ai_total else 0
        err_emoji = "✅" if err_rate < 5 else "⚠️" if err_rate < 15 else "🚨"
        lines.append(f"• 呼叫:{ai_total:,} 次 · Token:{ai_tokens:,}")
        lines.append(f"• 24h 成本:${ai_cost:.2f} · 當月累計:${month_cost:.2f}")
        lines.append(f"{err_emoji} 錯誤率:{err_rate:.1f}%({ai_errors})")
        lines.append(f"💡 RAG 命中率:{rag_rate:.1f}%({ai_rag})")
        if ep_pending:
            lines.append("")
            lines.append(f"📋 待審 episodes:{ep_pending} 筆")
        if ppt_total:
            lines.append("")
            ppt_pass_rate = ppt_passed / ppt_total * 100  # ppt_total > 0 here
            ppt_emoji = "✅" if ppt_pass_rate >= 80 else "⚠️"
            lines.append(f"{ppt_emoji} PPT 視覺審核 7d:{ppt_passed}/{ppt_total} 通過({ppt_pass_rate:.0f}%)")
            if ppt_failed:
                lines.append(f" 失敗:{ppt_failed} 筆")
        if unfollowed_count > 0:
            lines.append("")
            lines.append(f"⚠️ 商業面未跟進:{unfollowed_count} 筆"
                         f"(high-confidence AI 建議未轉化為 action_plan)")
        lines.append("")
        lines.append('→ 開觀測台總覽')
        from services.telegram_templates import send_telegram_with_result
        reply_markup = {
            "inline_keyboard": [
                [{"text": "🛰 觀測台總覽", "callback_data": "cmd:obs_overview"},
                 {"text": "🌐 Agent 編排", "callback_data": "cmd:obs_orchestration"}],
                [{"text": "💼 商業面 AI", "callback_data": "cmd:obs_business"},
                 {"text": "🏥 主機健康", "callback_data": "cmd:obs_health"}],
                [{"text": "📊 AI 呼叫", "callback_data": "cmd:obs_ai_calls"},
                 {"text": "💰 預算", "callback_data": "cmd:obs_budget"}],
            ],
        }
        send_telegram_with_result('\n'.join(lines), reply_markup=reply_markup, parse_mode='HTML')
        logger.info("[ObservabilityDaily] summary pushed to Telegram")
    except Exception as e:
        logger.error(f"[ObservabilityDaily] failed: {e}", exc_info=True)
        _notify_scheduler_failure(
            "run_observability_daily_summary",
            e,
            source="Scheduler.Observability",
            event_type="observability_daily_summary_failure",
            title="觀測台每日摘要失敗",
        )
def run_host_health_probe_cleanup():
    """Daily 03:00 job (Phase 42): delete host_health_probes rows older than 30 days.

    Commits the DELETE and logs the affected row count. The session is always
    closed. Never raises: errors are logged and reported via
    ``_notify_scheduler_failure``.
    """
    try:
        from sqlalchemy import text as _sa
        from database.manager import DatabaseManager

        db_session = DatabaseManager().get_session()
        purge_stmt = _sa("DELETE FROM host_health_probes WHERE probed_at < NOW() - INTERVAL '30 days'")
        try:
            result = db_session.execute(purge_stmt)
            db_session.commit()
            logger.info(f"[HostHealthProbe] cleanup: {result.rowcount} rows deleted (>30d)")
        finally:
            db_session.close()
    except Exception as exc:
        logger.exception(f"[HostHealthProbe] cleanup failed: {exc}")
        _notify_scheduler_failure(
            "run_host_health_probe_cleanup",
            exc,
            source="Scheduler.HostHealth",
            event_type="host_health_probe_cleanup_failure",
            title="主機健康探針清理失敗",
        )
def run_cost_throttle_reset_if_new_month():
    """Reset the cost-throttle state when a new calendar month begins.

    Documented schedule: daily at 00:05. It acts only when the current local
    date (``datetime.now()``) is the 1st of the month, in which case it calls
    reset_throttle_state(). On every other day it does nothing. Failures are
    logged with a traceback and reported through _notify_scheduler_failure
    rather than re-raised.
    """
    try:
        from datetime import datetime
        from services.cost_throttle_service import reset_throttle_state

        if datetime.now().day != 1:
            # Not the first day of the month: nothing to reset.
            return
        reset_throttle_state()
        logger.info("[CostThrottle] new month detected, state reset")
    except Exception as err:
        logger.error(f"[CostThrottle] reset failed: {err}", exc_info=True)
        _notify_scheduler_failure(
            "run_cost_throttle_reset_if_new_month",
            err,
            source="Scheduler.CostThrottle",
            event_type="cost_throttle_reset_failure",
            title="成本節流跨月重置失敗",
        )
def run_ppt_vision_audit():
    """Phase 26 PPT visual audit (documented schedule: daily at 22:00).

    Calls audit_recent_ppts() over REPORTS_DIR (default ``/app/data/reports``)
    for .pptx files from the last 24 hours, at most 10 files. When the summary
    reports more than zero issues, it is pushed to Telegram via
    push_ppt_audit_to_telegram(). Otherwise only a DEBUG line is logged.

    Per the original notes: with PPT_VISION_ENABLED=false, audit_recent_ppts
    skips internally without calling the LLM. LibreOffice headless must be on
    PATH to convert .pptx to PNG; without it the audit is a fail-safe skip.
    Failures are logged and reported through _notify_scheduler_failure.
    """
    try:
        from services.ppt_vision_service import audit_recent_ppts, push_ppt_audit_to_telegram

        audit_summary = audit_recent_ppts(
            reports_dir=os.getenv('REPORTS_DIR', '/app/data/reports'),
            hours=24,
            max_files=10,
        )
        issue_count = audit_summary['total_issues']
        if not (issue_count > 0):
            logger.debug("[PPTVisionAudit] no issues found")
            return
        telegram_pushed = push_ppt_audit_to_telegram(audit_summary)
        logger.info(
            "[PPTVisionAudit] %d files, %d issues, telegram=%s",
            len(audit_summary['audited_files']),
            issue_count,
            telegram_pushed,
        )
    except Exception as err:
        logger.error(f"[PPTVisionAudit] task failed: {err}", exc_info=True)
        _notify_scheduler_failure(
            "run_ppt_vision_audit",
            err,
            source="Scheduler.PPTVision",
            event_type="ppt_vision_audit_failure",
            title="PPT 視覺審核失敗",
        )
def run_ppt_auto_generation_catchup_task():
    """Catch up on scheduled PPT generations that were missed.

    Per the module header this runs every 10 minutes. It asks
    start_scheduled_ppt_catchup_background() to queue any periodic PPT runs
    that long synchronous jobs caused to be skipped, then logs the outcome by
    status:
      - "queued": INFO line listing the queued schedule kinds.
      - "already_running": DEBUG line.
      - anything else: DEBUG line with the raw status.
    Failures are logged and reported through _notify_scheduler_failure.
    """
    try:
        from services.ppt_auto_generation_service import start_scheduled_ppt_catchup_background

        outcome = start_scheduled_ppt_catchup_background()
        status = outcome.get("status")
        if status == "queued":
            queued_kinds = outcome.get("schedule_kinds") or []
            logger.info(
                "[PPTAutoGenerationCatchup] queued kinds=%s",
                ",".join(queued_kinds),
            )
        elif status == "already_running":
            logger.debug("[PPTAutoGenerationCatchup] skipped; generation already running")
        else:
            logger.debug("[PPTAutoGenerationCatchup] status=%s", status)
    except Exception as err:
        logger.error(f"[PPTAutoGenerationCatchup] task failed: {err}", exc_info=True)
        _notify_scheduler_failure(
            "run_ppt_auto_generation_catchup_task",
            err,
            source="Scheduler.PPTAutoGeneration",
            event_type="ppt_auto_generation_catchup_failure",
            title="PPT 定期簡報補跑失敗",
        )
def run_roi_monthly_report_if_new_month():
    """Phase 24 ROI monthly report, gated to the 1st day of the month.

    Intended to be triggered once a day. On any day other than the 1st it
    returns immediately without importing or calling the report service. On
    the 1st it calls generate_and_send_roi_report() and logs the returned
    'sent' and 'period' values.

    Per the original notes, the report compares last month's ai_calls
    statistics against the pre-optimization baseline, pushes a Telegram
    "saved X tokens / $Y" message, and stores an ai_insights row with
    type='roi_monthly_report'.

    NOTE(review): the original docstring gave 09:00, but the module header
    lists this gate at 09:05. Confirm against _register_schedules.
    """
    try:
        from datetime import datetime

        if datetime.now().day == 1:
            from services.roi_report_service import generate_and_send_roi_report

            report = generate_and_send_roi_report()
            sent_flag = report.get('sent')
            period_label = report.get('period', '?')
            logger.info("[ROIReport] sent=%s period=%s", sent_flag, period_label)
    except Exception as err:
        logger.error(f"[ROIReport] task failed: {err}", exc_info=True)
        _notify_scheduler_failure(
            "run_roi_monthly_report_if_new_month",
            err,
            source="Scheduler.ROIReport",
            event_type="roi_monthly_report_failure",
            title="ROI 月報失敗",
        )
def run_embed_consistency_check():
    """BGE-M3 embedding consistency check across GCP Ollama hosts (ADR-033 guardrail #3).

    Documented schedule: weekly, Sunday 04:30. Once a week is enough to catch
    model-version drift; running it more often only adds load on Ollama. Per
    the original notes, the 111 Mac fallback does not take part by default,
    so this background check does not load bge-m3 onto that 16 GB host.

    Behavior:
      - Calls verify_embedding_consistency() and logs ok/reachable/max_diff/
        signature at INFO.
      - If ``result['ok']`` is falsy, logs an ERROR and sends an
        ``embed_consistency_mismatch`` alert, deduplicated for 24 h.
      - If the check itself raises (including a missing 'ok' key), logs the
        traceback and sends an ``embed_consistency_check_failure`` alert.
    Nothing is re-raised.
    """
    try:
        from services.rag_service import verify_embedding_consistency
        result = verify_embedding_consistency()
        # max_diff can be missing/None (the alert path always guarded it with
        # `or 0.0`). Normalize it once so the INFO line's %.2e never receives
        # None. Otherwise logging discards the record and prints a
        # "--- Logging error ---" traceback instead.
        max_diff = float(result.get('max_diff') or 0.0)
        reachable = result.get('reachable')
        signature = result.get('signature')
        logger.info(
            "[EmbedConsistency] ok=%s reachable=%s max_diff=%.2e signature=%s",
            result['ok'], reachable, max_diff, signature,
        )
        if not result['ok']:
            detail = (
                "BGE-M3 embedding consistency mismatch "
                f"reachable={reachable} "
                f"max_diff={max_diff:.2e} "
                f"signature={signature}"
            )
            # Include the numbers in the ERROR line so it is self-contained.
            logger.error(
                "[EmbedConsistency] ⚠️ INCONSISTENT — RAG 召回率將下降;"
                "檢查 GCP-A/GCP-B bge-m3 模型版本是否同步(ollama list) | %s",
                detail,
            )
            _notify_scheduler_failure(
                "run_embed_consistency_check",
                RuntimeError(detail),
                source="Scheduler.RAG",
                event_type="embed_consistency_mismatch",
                title="BGE-M3 一致性異常",
                dedup_ttl_sec=86400,
            )
    except Exception as e:
        logger.error(f"[EmbedConsistency] task failed: {e}", exc_info=True)
        _notify_scheduler_failure(
            "run_embed_consistency_check",
            e,
            source="Scheduler.RAG",
            event_type="embed_consistency_check_failure",
            title="BGE-M3 一致性檢查失敗",
        )
def run_cleanup_agent_context():
    """Daily 03:30 job: remove expired TTL rows from agent_context.

    Executes the SQL function ``cleanup_expired_agent_context()`` (defined by
    migration 018, per the original note) and commits.

    On any failure, including a failed import or session acquisition:
      - the error is logged with its traceback;
      - a P2 ``agent_context_cleanup_failure`` event is sent through
        services.event_router.notify_failure, deduplicated for one hour.
    The exception is never re-raised.
    """
    session = None
    try:
        # The imports and session acquisition must be inside the try. The
        # `schedule` library does not catch job exceptions, so an error here
        # would escape into schedule.run_pending(). That aborts the remaining
        # due jobs for the tick and skips rescheduling this job, so it would be
        # retried on every subsequent loop pass.
        from database.manager import get_session
        from sqlalchemy import text
        session = get_session()
        session.execute(text("SELECT cleanup_expired_agent_context()"))
        session.commit()
        logger.info("[Cleanup] agent_context TTL 清理完成")
    except Exception as e:
        logger.error(f"[Cleanup] agent_context 清理失敗: {e}", exc_info=True)
        try:
            from services.event_router import notify_failure
            notify_failure(
                task_name="run_cleanup_agent_context",
                error=e,
                source="Scheduler.Cleanup",
                event_type="agent_context_cleanup_failure",
                priority="P2",
                title="agent_context TTL 清理失敗",
                dedup_ttl_sec=3600,
            )
        except Exception as _router_e:
            logger.error(f"[Cleanup] event_router 失敗: {_router_e}")
    finally:
        # session stays None if get_session() (or an import) failed.
        if session is not None:
            session.close()
def _run_elephant_alpha_engine():
    """Daemon-thread target that runs the ElephantAlpha autonomous engine.

    Creates a dedicated asyncio event loop for this thread, installs it as the
    thread's current loop, and blocks in
    ``autonomous_engine.start_autonomous_monitoring()`` until that coroutine
    returns or raises. Nothing is re-raised. The scheduler main loop checks
    ``is_alive()`` on this thread and starts a replacement when it has exited.
    """
    loop = None
    try:
        from services.elephant_alpha_autonomous_engine import autonomous_engine
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        logger.info("🐘 [ElephantAlpha] Autonomous engine thread started")
        loop.run_until_complete(autonomous_engine.start_autonomous_monitoring())
        # A normal return still ends the thread, and the watchdog will treat it
        # as dead, so make that visible instead of exiting silently.
        logger.warning("🐘 [ElephantAlpha] Autonomous monitoring returned; engine thread exiting")
    except Exception as e:
        # Keep the traceback: a daemon-thread crash otherwise leaves only the
        # one-line message behind.
        logger.error(f"🐘 [ElephantAlpha] Engine crashed: {e}", exc_info=True)
    finally:
        if loop is not None:
            # Detach before closing so the thread does not keep a closed loop
            # registered as its current event loop.
            asyncio.set_event_loop(None)
            loop.close()
def _start_elephant_alpha_thread():
    """Start _run_elephant_alpha_engine in a new daemon thread and return the thread.

    Used for both the initial launch and watchdog restarts, so every
    incarnation carries the same thread name in logs and thread dumps.
    """
    thread = threading.Thread(
        target=_run_elephant_alpha_engine,
        daemon=True,
        name="elephant-alpha-engine",
    )
    thread.start()
    return thread


if __name__ == "__main__":
    logger.info("🚀 momo-scheduler 啟動中...")
    _register_schedules()
    logger.info("✅ 全部排程任務已註冊")
    _ea_thread = _start_elephant_alpha_thread()
    logger.info("🐘 [ElephantAlpha] Autonomous engine thread launched")
    logger.info("⏰ 排程主迴圈啟動,等待任務觸發...")
    # Run the ElephantAlpha liveness check on elapsed (monotonic) time instead
    # of counting loop iterations. A long job inside run_pending() stretches one
    # iteration far beyond 1 s, so an iteration count drifts from real time.
    _EA_WATCHDOG_INTERVAL_SEC = 60
    _ea_last_check = time.monotonic()
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
            _now = time.monotonic()
            if _now - _ea_last_check >= _EA_WATCHDOG_INTERVAL_SEC:
                _ea_last_check = _now
                if not _ea_thread.is_alive():
                    logger.error("[ElephantAlpha] 監控執行緒已死亡,嘗試重啟")
                    try:
                        from services.event_router import dispatch_sync as _dispatch_sync
                        _dispatch_sync({
                            "source": "Scheduler.ElephantAlpha",
                            "event_type": "thread_crashed",
                            "severity": "alert",
                            "title": "ElephantAlpha 執行緒死亡",
                            "status": "自動重啟中",
                            "impact": "P2 - 自主監控引擎暫停",
                            "summary": "ElephantAlphaEngine daemon thread 意外終止,排程主迴圈已偵測並觸發重啟",
                        })
                    except Exception as _alert_err:
                        logger.error(f"[ElephantAlpha] 無法發送告警: {_alert_err}")
                    # Same helper as the initial launch, so the thread name stays consistent.
                    _ea_thread = _start_elephant_alpha_thread()
                    logger.info("[ElephantAlpha] 執行緒已重啟")
        except KeyboardInterrupt:
            logger.info("⛔ Scheduler stopped.")
            break
        except Exception as e:
            # Job exceptions bubble out of schedule.run_pending(); keep the traceback.
            logger.error(f"Scheduler error: {e}", exc_info=True)
            time.sleep(5)