Files
ewoooc/run_scheduler.py
ogt 0099543c05
Some checks failed
CD Pipeline / deploy (push) Failing after 5m18s
fix(security): 全域健檢 — 40 項安全/Bug/品質修復
🔴 Critical
- auto_heal_service: 補 import re + sqlalchemy.text + 修正 orchestrator 變數名
  + autoheal_playbook→playbooks 表名 + _alert_and_store cooldown 修復
- aider_heal_executor: shell injection 改 shell=False + list 參數
- docker-compose: DISABLE_LOGIN 改 env var + 移除密碼 fallback + POSTGRES_HOST 修正
- app.py: /api/backup /api/run_task 等 6 個管理 API 加 @login_required
- config.py + pg_sync + e2e_test: 移除 wooo_pg_2026 hardcoded 密碼 fallback
- pg_backup.sh: 移除 TELEGRAM_TOKEN= 中間變數,直接用 $TELEGRAM_BOT_TOKEN
- migration 014: trigger_pattern→match_pattern + 補 error_type NOT NULL 欄位

🟡 High
- telegram_bot_service: str(e) 改通用訊息 + session try/finally + 移除 pa:/pr: 舊 callback
- run_scheduler: ElephantAlpha thread 死亡監控 + 自動重啟 + Telegram 告警
  + agent_context 03:30 TTL 定時清理任務
- openclaw_learning_service: build_rag_context 兩路徑加 .limit(200)
- hooks: commit-quality + momo-prod-guard 空 catch 改 stderr+exit(1)
- scripts/code_review: auto_yes 預設改 false
- db_backup_service: PGPASSWORD 透過 env dict 傳遞

📦 Migrations
- 013_autoheal: 修正建表順序 playbooks→incidents(外鍵前向引用)
- 018_add_missing_indexes: heal_logs/incidents 外鍵索引 + cleanup_expired_agent_context()

🟢 Infrastructure
- requirements.txt: 加版本下界 Flask>=2.3 SQLAlchemy>=1.4 等
- cd.yaml: 新增 run_scheduler.py + run_telegram_bot.py 監聽路徑
- .gitignore: insert_playbook_local.py 加入忽略

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 01:12:23 +08:00

187 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
run_scheduler.py — momo-scheduler 容器入口點
排程任務清單(對齊 app.py init_scheduler + scheduler.py 全任務):
每 30 分鐘auto_import、whitepage_check
每 1 小時momo、edm、festival
每 4 小時competitor_price_feeder、icaim_analysis
每 6 小時openclaw_meta_analysis、quality_rescore
每 12 小時dedup_batch
每 1 天 db_backup03:00、cleanup_agent_context03:30、backup_monitor04:00、daily_report09:00
每 1 週 weekly_strategy週一 06:00
每 1 月 monthly_report每月1日 07:00
"""
import asyncio
import logging
import threading
import time
import schedule
# 匯入全部排程任務函式
from scheduler import (
run_momo_task,
run_edm_task,
run_festival_task,
run_auto_import_task,
run_whitepage_check,
run_competitor_price_feeder_task,
run_icaim_analysis_task,
run_weekly_strategy_task,
run_db_backup_task,
run_backup_monitor_task,
run_openclaw_meta_analysis_task,
run_dedup_batch_task,
run_quality_rescore_task,
run_daily_report_task,
run_monthly_report_task,
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
def _register_schedules():
schedule.every(30).minutes.do(run_auto_import_task)
logger.info("📅 每 30 分鐘auto_import")
schedule.every(30).minutes.do(run_whitepage_check)
logger.info("📅 每 30 分鐘whitepage_check")
schedule.every(1).hours.do(run_momo_task)
logger.info("📅 每 1 小時momo_task")
schedule.every(1).hours.do(run_edm_task)
logger.info("📅 每 1 小時edm_task")
schedule.every(1).hours.do(run_festival_task)
logger.info("📅 每 1 小時festival_task")
schedule.every(4).hours.do(run_competitor_price_feeder_task)
logger.info("📅 每 4 小時competitor_price_feeder")
schedule.every(4).hours.do(run_icaim_analysis_task)
logger.info("📅 每 4 小時icaim_analysis")
schedule.every(6).hours.do(run_openclaw_meta_analysis_task)
logger.info("📅 每 6 小時openclaw_meta_analysis")
schedule.every(6).hours.do(run_quality_rescore_task)
logger.info("📅 每 6 小時quality_rescore")
schedule.every(12).hours.do(run_dedup_batch_task)
logger.info("📅 每 12 小時dedup_batch")
schedule.every().day.at("03:00").do(run_db_backup_task)
logger.info("📅 每日 03:00db_backup")
schedule.every().day.at("03:30").do(run_cleanup_agent_context)
logger.info("📅 每日 03:30cleanup_agent_context")
schedule.every().day.at("04:00").do(run_backup_monitor_task)
logger.info("📅 每日 04:00backup_monitor")
schedule.every().monday.at("06:00").do(run_weekly_strategy_task)
logger.info("📅 每週一 06:00weekly_strategy")
schedule.every().day.at("09:00").do(run_daily_report_task)
logger.info("📅 每日 09:00daily_report")
# 每月1日 07:00 月報schedule 不支援 every().month用每日 07:00 + 日期判斷)
def _monthly_report_gate():
from datetime import datetime as _dt
if _dt.now().day == 1:
run_monthly_report_task()
schedule.every().day.at("07:00").do(_monthly_report_gate)
logger.info("📅 每月1日 07:00monthly_report")
def run_cleanup_agent_context():
"""每日 03:30 — 清理 agent_context 表中已過期的 TTL 記錄migration 018 定義)"""
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
session.execute(text("SELECT cleanup_expired_agent_context()"))
session.commit()
logger.info("[Cleanup] agent_context TTL 清理完成")
except Exception as e:
logger.error(f"[Cleanup] agent_context 清理失敗: {e}")
finally:
session.close()
def _run_elephant_alpha_engine():
"""Daemon thread: ElephantAlpha 自主監控引擎(獨立 asyncio loop"""
try:
from services.elephant_alpha_autonomous_engine import autonomous_engine
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info("🐘 [ElephantAlpha] Autonomous engine thread started")
loop.run_until_complete(autonomous_engine.start_autonomous_monitoring())
except Exception as e:
logger.error(f"🐘 [ElephantAlpha] Engine crashed: {e}")
finally:
loop.close()
if __name__ == "__main__":
logger.info("🚀 momo-scheduler 啟動中...")
_register_schedules()
logger.info("✅ 全部排程任務已註冊")
_ea_thread = threading.Thread(
target=_run_elephant_alpha_engine,
daemon=True,
name="elephant-alpha-engine",
)
_ea_thread.start()
logger.info("🐘 [ElephantAlpha] Autonomous engine thread launched")
logger.info("⏰ 排程主迴圈啟動,等待任務觸發...")
_ea_watchdog_counter = 0 # 每 60 秒60 次 sleep(1))做一次存活檢查
while True:
try:
schedule.run_pending()
time.sleep(1)
# 每 60 秒檢查 ElephantAlpha 執行緒是否還活著
_ea_watchdog_counter += 1
if _ea_watchdog_counter >= 60:
_ea_watchdog_counter = 0
if not _ea_thread.is_alive():
logger.error("[ElephantAlpha] 監控執行緒已死亡,嘗試重啟")
try:
from services.event_router import dispatch as _dispatch
_dispatch({
"source": "Scheduler.ElephantAlpha",
"event_type": "thread_crashed",
"severity": "alert",
"title": "ElephantAlpha 執行緒死亡",
"status": "自動重啟中",
"impact": "P2 - 自主監控引擎暫停",
"summary": "ElephantAlphaEngine daemon thread 意外終止,排程主迴圈已偵測並觸發重啟",
})
except Exception as _alert_err:
logger.error(f"[ElephantAlpha] 無法發送告警: {_alert_err}")
_ea_thread = threading.Thread(
target=_run_elephant_alpha_engine,
daemon=True,
name="ElephantAlphaEngine",
)
_ea_thread.start()
logger.info("[ElephantAlpha] 執行緒已重啟")
except KeyboardInterrupt:
logger.info("⛔ Scheduler stopped.")
break
except Exception as e:
logger.error(f"Scheduler error: {e}")
time.sleep(5)