diff --git a/config.py b/config.py index c4e974a..9bcde7f 100644 --- a/config.py +++ b/config.py @@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.601" +SYSTEM_VERSION = "V10.604" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/memory/current_execution_queue_20260524.md b/docs/memory/current_execution_queue_20260524.md index 3aa5fd2..97eb66c 100644 --- a/docs/memory/current_execution_queue_20260524.md +++ b/docs/memory/current_execution_queue_20260524.md @@ -156,6 +156,7 @@ - 每次產出與視覺 QA 結果必須完整寫入 DB。 - `/observability/ppt_audit_history` 必須清楚顯示 runtime 狀態、產出狀態、視覺 QA、問題追蹤與可預覽檔案。 - PPTX / PDF 預覽需可站內直接開啟,不能只下載。 +- 2026-06-06 起,`V10.604` 修正定期簡報長期漏產:根因為 `schedule.run_pending()` 同步執行,20:30/20:40/20:50 精準時段會被 feeder / AI 長任務卡過且不自動 replay。新增每 10 分鐘 missed-run catch-up、scheduler 背景化補跑、排程型市場情報與價格甜蜜點快速 fallback;production 已補齊 `daily`、`market_intel`、`price_elasticity`,catch-up plan 顯示 daily/weekly/monthly/quarterly/half_yearly/annual 全數 ready,`/observability/ppt_audit_history` 可看到新檔與預覽入口。 ## 6. 外部 BI / 協作入口 diff --git a/routes/admin_observability_routes.py b/routes/admin_observability_routes.py index fa15210..c227b56 100644 --- a/routes/admin_observability_routes.py +++ b/routes/admin_observability_routes.py @@ -18,6 +18,7 @@ Operation Ollama-First v5.0 / Phase 27 — Admin Observability Dashboard """ import logging +import re import threading import time from datetime import datetime, timedelta @@ -2511,6 +2512,22 @@ def _guess_ppt_report_type_from_filename(filename: str) -> str: return "" +def _ppt_filename_matches_month(filename: str, *, year: int, month: int) -> bool: + """判斷檔名是否明確帶有指定月份,用於補足歷史檔案 mtime 漂移。""" + name = str(filename or "") + for match in re.finditer(r"(? str: return text.strip() +def _scheduled_ppt_fast_fallback_enabled() -> bool: + return os.getenv("PPT_SCHEDULED_FAST_FALLBACK", "").strip().lower() in {"1", "true", "yes", "on"} + + def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str: """ 用 NIM DeepSeek 生成簡報 AI 分析文字 @@ -1919,6 +1923,9 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str: is_price_elast = '價格彈性' in report_type or 'price_elasticity' in report_type is_5forces = '五力' in report_type or 'competitor_v4' in report_type or '競業五力' in report_type + if _scheduled_ppt_fast_fallback_enabled() and (is_market_intel or is_price_elast): + return _ppt_fallback_insight(report_type or '簡報', prompt_data, '') + # ── 格式鐵律(所有 prompt 共用後綴)──────────────────────── FORMAT_RULES = ( "\n\n【輸出格式鐵律 — 絕對遵守】\n" diff --git a/run_scheduler.py b/run_scheduler.py index b4f8995..c3c1339 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -8,6 +8,7 @@ run_scheduler.py — momo-scheduler 容器入口點 每 4 小時:competitor_price_feeder、icaim_analysis 每 6 小時:quality_rescore、action_plan_hygiene 每 12 小時:dedup_batch + 每 10 分鐘:ppt_auto_generation_catchup(補跑被長任務卡過的定期簡報) 每 1 天 :db_backup(03:00)、cleanup_agent_context(03:30)、backup_monitor(04:00)、daily_report(09:00)、roi_monthly_report gate(09:05)、ai_smoke_summary(09:10)、observability_daily_summary(09:30)、pchome_match_backfill(10:30)、openclaw_meta_analysis(12:00, Phase 4 降頻)、ppt_auto_generation_daily(20:30)、ppt_vision_audit(22:00)、daily_token_report(23:55) 每 1 週 :weekly_strategy(週一 06:00)、ppt_auto_generation_weekly(週一 20:40) 每 1 月 :monthly_report(每月1日 07:00)、ppt_auto_generation_monthly(每月1日 20:50) @@ -274,6 +275,9 @@ def _register_schedules(): schedule.every().day.at("21:20").do(_ppt_annual_gate) logger.info("📅 每年1月1日 21:20:ppt_auto_generation_annual(年報)") + schedule.every(10).minutes.do(run_ppt_auto_generation_catchup_task) + logger.info("📅 每 10 分鐘:ppt_auto_generation_catchup(補跑錯過的定期簡報)") + # Phase 26: PPT 視覺審核(每日 22:00 掃當天新生 .pptx,有 issues 才推 Telegram) schedule.every().day.at("22:00").do(run_ppt_vision_audit) logger.info("📅 每日 22:00:ppt_vision_audit(PPT_VISION_ENABLED=true 才生效)") @@ -1096,6 +1100,32 @@ def run_ppt_vision_audit(): ) +def run_ppt_auto_generation_catchup_task(): + """每 10 分鐘補跑被同步長任務錯過的定期 PPT 產出。""" + try: + from services.ppt_auto_generation_service import start_scheduled_ppt_catchup_background + + result = start_scheduled_ppt_catchup_background() + if result.get("status") == "queued": + logger.info( + "[PPTAutoGenerationCatchup] queued kinds=%s", + ",".join(result.get("schedule_kinds") or []), + ) + elif result.get("status") == "already_running": + logger.debug("[PPTAutoGenerationCatchup] skipped; generation already running") + else: + logger.debug("[PPTAutoGenerationCatchup] status=%s", result.get("status")) + except Exception as e: + logger.error(f"[PPTAutoGenerationCatchup] task failed: {e}", exc_info=True) + _notify_scheduler_failure( + "run_ppt_auto_generation_catchup_task", + e, + source="Scheduler.PPTAutoGeneration", + event_type="ppt_auto_generation_catchup_failure", + title="PPT 定期簡報補跑失敗", + ) + + def run_roi_monthly_report_if_new_month(): """每日 09:00 — Phase 24 ROI 月報(內部判斷月初第 1 日才送) diff --git a/services/mcp_collector_service.py b/services/mcp_collector_service.py index f903224..511f8a8 100644 --- a/services/mcp_collector_service.py +++ b/services/mcp_collector_service.py @@ -47,6 +47,11 @@ try: except ImportError: _OLLAMA_AVAILABLE = False + +def _fast_static_fallback_enabled() -> bool: + return os.getenv("MCP_FAST_STATIC_FALLBACK", "").strip().lower() in {"1", "true", "yes", "on"} + + # ── 查詢主題定義 ──────────────────────────────────────────────────────────── _SEARCH_TOPICS = { "market_trends": ( @@ -164,6 +169,9 @@ class MCPCollectorService: if cached: return cached + if _fast_static_fallback_enabled(): + return self._fallback_topic_content(topic, "定期簡報快速補跑:外部模型暫停,使用穩定行銷情報。") + # ─── Phase 10.5(2026-05-04):MCP omnisearch L0 路徑 ─── # MCP_ROUTER_ENABLED=true 且 docker-compose.mcp.yml 已 deploy 時, # 優先走 self-hosted Tavily/Exa(取代 Gemini Grounding 主路徑)。 diff --git a/services/ppt_auto_generation_service.py b/services/ppt_auto_generation_service.py index 948dbaf..e30e73c 100644 --- a/services/ppt_auto_generation_service.py +++ b/services/ppt_auto_generation_service.py @@ -13,7 +13,7 @@ import json import os import threading from dataclasses import asdict, dataclass -from datetime import datetime, timedelta, timezone +from datetime import date, datetime, timedelta, timezone from pathlib import Path from typing import Iterable, Sequence @@ -757,11 +757,14 @@ def get_generation_run_history( return items -def _generate_job(job: PPTAutoJob, *, force: bool = False) -> tuple[str | None, int]: +def _generate_job(job: PPTAutoJob, *, force: bool = False, schedule_kind: str = "manual") -> tuple[str | None, int]: from routes import openclaw_bot_routes as bot_routes original_send_message = getattr(bot_routes, "send_message", None) invalidated_count = _expire_matching_ppt_cache(job) if force else 0 + scheduled_fast_fallback = schedule_kind != "manual" + previous_fast_fallback = os.environ.get("PPT_SCHEDULED_FAST_FALLBACK") + previous_mcp_fast_fallback = os.environ.get("MCP_FAST_STATIC_FALLBACK") def _noop_send_message(*_args, **_kwargs): return None @@ -769,6 +772,9 @@ def _generate_job(job: PPTAutoJob, *, force: bool = False) -> tuple[str | None, if original_send_message is not None: bot_routes.send_message = _noop_send_message try: + if scheduled_fast_fallback: + os.environ["PPT_SCHEDULED_FAST_FALLBACK"] = "true" + os.environ["MCP_FAST_STATIC_FALLBACK"] = "true" path = bot_routes._generate_ppt_cmd( job.sub_type, job.sub_arg, @@ -778,6 +784,15 @@ def _generate_job(job: PPTAutoJob, *, force: bool = False) -> tuple[str | None, ) return path, invalidated_count finally: + if scheduled_fast_fallback: + if previous_fast_fallback is None: + os.environ.pop("PPT_SCHEDULED_FAST_FALLBACK", None) + else: + os.environ["PPT_SCHEDULED_FAST_FALLBACK"] = previous_fast_fallback + if previous_mcp_fast_fallback is None: + os.environ.pop("MCP_FAST_STATIC_FALLBACK", None) + else: + os.environ["MCP_FAST_STATIC_FALLBACK"] = previous_mcp_fast_fallback if original_send_message is not None: bot_routes.send_message = original_send_message @@ -829,7 +844,7 @@ def generate_defined_ppt_reports( item = asdict(job) job_started_at = datetime.now(TAIPEI_TZ) try: - path, invalidated_count = _generate_job(job, force=force) + path, invalidated_count = _generate_job(job, force=force, schedule_kind=schedule_kind) item["path"] = path item["cache_invalidated"] = invalidated_count item["exists"] = bool(path and os.path.exists(path)) @@ -926,6 +941,217 @@ def get_due_schedule_kinds(now: datetime | None = None) -> list[str]: return kinds +def _parse_cadence_time(kind: str) -> tuple[int, int]: + raw = (SCHEDULE_CADENCES.get(kind) or {}).get("time", "00:00") + hour, minute = raw.split(":", 1) + return int(hour), int(minute) + + +def _combine_local(run_date: date, kind: str) -> datetime: + hour, minute = _parse_cadence_time(kind) + return datetime(run_date.year, run_date.month, run_date.day, hour, minute) + + +def _previous_month(year: int, month: int) -> tuple[int, int]: + if month == 1: + return year - 1, 12 + return year, month - 1 + + +def _latest_month_occurrence(current: datetime, kind: str, allowed_months: set[int] | None = None) -> datetime: + year, month = current.year, current.month + for _ in range(24): + if allowed_months is None or month in allowed_months: + candidate = _combine_local(date(year, month, 1), kind) + if current >= candidate: + return candidate + year, month = _previous_month(year, month) + return _combine_local(date(current.year, current.month, 1), kind) + + +def get_latest_schedule_occurrence(kind: str, now: datetime | None = None) -> datetime | None: + """Return the most recent planned occurrence for a PPT schedule kind. + + The Python `schedule` package intentionally does not replay missed jobs. + This helper lets the PPT pipeline detect a missed daily/weekly/monthly slot + after long crawler or AI jobs release the scheduler loop. + """ + if kind not in SCHEDULE_PROFILES: + return None + current = now or datetime.now(TAIPEI_TZ) + if current.tzinfo is not None: + current = current.astimezone(TAIPEI_TZ).replace(tzinfo=None) + + if kind == "daily": + candidate = _combine_local(current.date(), kind) + return candidate if current >= candidate else candidate - timedelta(days=1) + + if kind == "weekly": + monday = current.date() - timedelta(days=current.weekday()) + candidate = _combine_local(monday, kind) + return candidate if current >= candidate else candidate - timedelta(days=7) + + if kind == "monthly": + return _latest_month_occurrence(current, kind) + if kind == "quarterly": + return _latest_month_occurrence(current, kind, {1, 4, 7, 10}) + if kind == "half_yearly": + return _latest_month_occurrence(current, kind, {1, 7}) + if kind == "annual": + return _latest_month_occurrence(current, kind, {1}) + return None + + +def _ready_report_types_since(jobs: Sequence[PPTAutoJob], since: datetime) -> set[str]: + """Return report types that already have matching DB rows and files.""" + if not jobs: + return set() + expected_params = {job.report_type: job.expected_params for job in jobs} + ready: set[str] = set() + try: + session = get_session() + try: + rows = session.execute( + sa_text( + """ + SELECT report_type, parameters, file_path + FROM ppt_reports + WHERE generated_at >= :since + """ + ), + {"since": since}, + ).fetchall() + finally: + session.close() + except Exception: + return ready + + for report_type, parameters, file_path in rows: + report_type = str(report_type or "") + if report_type not in expected_params or report_type in ready: + continue + if file_path and not os.path.exists(str(file_path)): + continue + if _params_match(_parse_cache_params(parameters), expected_params[report_type]): + ready.add(report_type) + return ready + + +def get_scheduled_ppt_catchup_plan( + *, + now: datetime | None = None, + schedule_kinds: Iterable[str] | None = None, +) -> list[dict]: + """Build a catch-up plan for missed periodic PPT generations.""" + current = now or datetime.now(TAIPEI_TZ) + kinds = list(schedule_kinds or SCHEDULE_PROFILES.keys()) + plan: list[dict] = [] + for kind in kinds: + report_types = SCHEDULE_PROFILES.get(kind) + scheduled_at = get_latest_schedule_occurrence(kind, current) + if not report_types or scheduled_at is None: + continue + jobs = build_defined_ppt_jobs(report_types=report_types) + ready_types = _ready_report_types_since(jobs, scheduled_at) + missing_jobs = [job for job in jobs if job.report_type not in ready_types] + plan.append({ + "schedule_kind": kind, + "schedule_label": SCHEDULE_CADENCES.get(kind, {}).get("label", kind), + "scheduled_at": scheduled_at.strftime("%Y-%m-%d %H:%M"), + "ready_report_types": sorted(ready_types), + "missing_report_types": [job.report_type for job in missing_jobs], + "missing_report_labels": [REPORT_TYPE_LABELS.get(job.report_type, job.report_type) for job in missing_jobs], + "ready": not missing_jobs, + }) + return plan + + +def catch_up_scheduled_ppt_reports( + *, + now: datetime | None = None, + force: bool = False, + schedule_kinds: Iterable[str] | None = None, +) -> dict: + """Generate missing scheduled PPT decks that were skipped by a blocked loop.""" + global _LAST_RUN + + if not force and not is_ppt_auto_generation_enabled(): + result = { + "ok": False, + "status": "disabled", + "message": "PPT_AUTO_GENERATION_ENABLED=false", + "runs": [], + } + _LAST_RUN = result + return result + + plan = get_scheduled_ppt_catchup_plan(now=now, schedule_kinds=schedule_kinds) + runs = [] + for item in plan: + missing = item.get("missing_report_types") or [] + if not missing: + continue + run = generate_defined_ppt_reports( + report_types=missing, + schedule_kind=item["schedule_kind"], + force=force, + ) + run["schedule_kind"] = item["schedule_kind"] + run["catchup_scheduled_at"] = item.get("scheduled_at") + run["catchup_missing_report_types"] = missing + runs.append(run) + + result = { + "ok": all(run.get("ok", False) for run in runs) if runs else True, + "status": "completed" if runs else "skipped", + "plan": plan, + "runs": runs, + "generated_kinds": [ + run.get("schedule_kind") + for run in runs + if run.get("schedule_kind") + ], + "ready": sum(int(run.get("ready") or 0) for run in runs), + "errors": sum(int(run.get("errors") or 0) for run in runs), + } + _LAST_RUN = result + return result + + +def start_scheduled_ppt_catchup_background( + *, + now: datetime | None = None, + force: bool = False, + schedule_kinds: Iterable[str] | None = None, +) -> dict: + """Queue scheduled catch-up without blocking the scheduler main loop.""" + if _RUN_LOCK.locked(): + return { + "ok": True, + "status": "already_running", + "message": "PPT auto-generation is already running.", + "last_run": _LAST_RUN, + } + + planned_kinds = list(schedule_kinds or SCHEDULE_PROFILES.keys()) + + def _run(): + catch_up_scheduled_ppt_reports( + now=now, + force=force, + schedule_kinds=planned_kinds, + ) + + thread = threading.Thread(target=_run, name="ppt-auto-generation-catchup", daemon=True) + thread.start() + return { + "ok": True, + "status": "queued", + "message": "PPT scheduled catch-up queued.", + "schedule_kinds": planned_kinds, + } + + def generate_scheduled_ppt_reports( *, schedule_kind: str | None = None, diff --git a/tests/test_ppt_auto_generation_service.py b/tests/test_ppt_auto_generation_service.py index b7f5ce2..1538300 100644 --- a/tests/test_ppt_auto_generation_service.py +++ b/tests/test_ppt_auto_generation_service.py @@ -1,5 +1,6 @@ from datetime import datetime import json +import os def test_build_defined_ppt_jobs_uses_latest_date(): @@ -128,6 +129,182 @@ def test_due_schedule_kinds_include_periodic_boundaries(): ] +def test_latest_schedule_occurrence_replays_missed_slots(): + from services.ppt_auto_generation_service import get_latest_schedule_occurrence + + assert get_latest_schedule_occurrence("daily", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 6, 5, 20, 30) + assert get_latest_schedule_occurrence("daily", datetime(2026, 6, 6, 21, 0)) == datetime(2026, 6, 6, 20, 30) + assert get_latest_schedule_occurrence("weekly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 6, 1, 20, 40) + assert get_latest_schedule_occurrence("monthly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 6, 1, 20, 50) + assert get_latest_schedule_occurrence("quarterly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 4, 1, 21, 0) + assert get_latest_schedule_occurrence("half_yearly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 1, 1, 21, 10) + assert get_latest_schedule_occurrence("annual", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 1, 1, 21, 20) + + +def test_ppt_catchup_plan_marks_missing_after_missed_slot(monkeypatch): + from services import ppt_auto_generation_service as svc + + class _Rows: + def fetchall(self): + return [] + + class _Session: + def execute(self, *_args, **_kwargs): + return _Rows() + + def close(self): + return None + + monkeypatch.setattr(svc, "get_session", lambda: _Session()) + monkeypatch.setattr(svc, "_latest_sales_date", lambda: "2026-06-04") + + plan = svc.get_scheduled_ppt_catchup_plan( + now=datetime(2026, 6, 6, 14, 0), + schedule_kinds=["daily"], + ) + + assert plan[0]["scheduled_at"] == "2026-06-05 20:30" + assert plan[0]["missing_report_types"] == ["daily"] + assert plan[0]["ready"] is False + + +def test_ppt_catchup_plan_uses_existing_exact_report(monkeypatch, tmp_path): + from services import ppt_auto_generation_service as svc + + pptx = tmp_path / "ocbot_daily_ok.pptx" + pptx.write_bytes(b"pptx") + + class _Rows: + def fetchall(self): + return [ + ( + "daily", + json.dumps({"report_type": "daily", "date": "2026/06/04"}), + str(pptx), + ) + ] + + class _Session: + def execute(self, *_args, **_kwargs): + return _Rows() + + def close(self): + return None + + monkeypatch.setattr(svc, "get_session", lambda: _Session()) + monkeypatch.setattr(svc, "_latest_sales_date", lambda: "2026-06-04") + + plan = svc.get_scheduled_ppt_catchup_plan( + now=datetime(2026, 6, 6, 14, 0), + schedule_kinds=["daily"], + ) + + assert plan[0]["ready_report_types"] == ["daily"] + assert plan[0]["missing_report_types"] == [] + assert plan[0]["ready"] is True + + +def test_ppt_catchup_generates_only_missing_types(monkeypatch): + from services import ppt_auto_generation_service as svc + + calls = [] + + monkeypatch.setattr( + svc, + "get_scheduled_ppt_catchup_plan", + lambda **_kwargs: [{ + "schedule_kind": "weekly", + "scheduled_at": "2026-06-01 20:40", + "missing_report_types": ["market_intel"], + }], + ) + + def fake_generate_defined_ppt_reports(**kwargs): + calls.append(kwargs) + return {"ok": True, "ready": 1, "errors": 0, "jobs": [{"report_type": "market_intel"}]} + + monkeypatch.setattr(svc, "generate_defined_ppt_reports", fake_generate_defined_ppt_reports) + + result = svc.catch_up_scheduled_ppt_reports() + + assert result["status"] == "completed" + assert result["generated_kinds"] == ["weekly"] + assert calls == [{ + "report_types": ["market_intel"], + "schedule_kind": "weekly", + "force": False, + }] + + +def test_ppt_catchup_background_queues_without_main_loop_block(monkeypatch): + from services import ppt_auto_generation_service as svc + + calls = [] + threads = [] + + class _Thread: + def __init__(self, *, target, name, daemon): + self.target = target + self.name = name + self.daemon = daemon + threads.append(self) + + def start(self): + self.target() + + def fake_catchup(**kwargs): + calls.append(kwargs) + return {"ok": True, "status": "skipped", "runs": []} + + monkeypatch.setattr(svc.threading, "Thread", _Thread) + monkeypatch.setattr(svc, "catch_up_scheduled_ppt_reports", fake_catchup) + + result = svc.start_scheduled_ppt_catchup_background(schedule_kinds=["daily", "weekly"]) + + assert result["status"] == "queued" + assert result["schedule_kinds"] == ["daily", "weekly"] + assert threads[0].name == "ppt-auto-generation-catchup" + assert threads[0].daemon is True + assert calls == [{ + "now": None, + "force": False, + "schedule_kinds": ["daily", "weekly"], + }] + + +def test_scheduled_generation_sets_fast_fallback_env(monkeypatch, tmp_path): + from routes import openclaw_bot_routes as bot_routes + from services import ppt_auto_generation_service as svc + + output = tmp_path / "ocbot_market_intel_ok.pptx" + output.write_bytes(b"pptx") + observed = {} + job = svc.build_defined_ppt_jobs( + latest_date="2026-05-11", + report_types=["market_intel"], + )[0] + + monkeypatch.delenv("PPT_SCHEDULED_FAST_FALLBACK", raising=False) + monkeypatch.delenv("MCP_FAST_STATIC_FALLBACK", raising=False) + monkeypatch.setattr(svc, "_expire_matching_ppt_cache", lambda _job: 0) + monkeypatch.setattr(bot_routes, "send_message", lambda *_args, **_kwargs: None, raising=False) + + def fake_generate_ppt_cmd(*_args, **_kwargs): + observed["ppt_fast"] = os.getenv("PPT_SCHEDULED_FAST_FALLBACK") + observed["mcp_fast"] = os.getenv("MCP_FAST_STATIC_FALLBACK") + return str(output) + + monkeypatch.setattr(bot_routes, "_generate_ppt_cmd", fake_generate_ppt_cmd) + + path, invalidated = svc._generate_job(job, schedule_kind="weekly") + + assert path == str(output) + assert invalidated == 0 + assert observed == {"ppt_fast": "true", "mcp_fast": "true"} + assert os.getenv("PPT_SCHEDULED_FAST_FALLBACK") is None + assert os.getenv("MCP_FAST_STATIC_FALLBACK") is None + + def test_schedule_cadence_status_exposes_all_periodic_contracts(): from services.ppt_auto_generation_service import get_schedule_cadence_status