修復定期簡報漏產與補跑可靠性

This commit is contained in:
OoO
2026-06-06 15:25:20 +08:00
parent d6d8777e41
commit 8fa95b94a9
8 changed files with 475 additions and 5 deletions

View File

@@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.601"
SYSTEM_VERSION = "V10.604"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -156,6 +156,7 @@
- 每次產出與視覺 QA 結果必須完整寫入 DB。
- `/observability/ppt_audit_history` 必須清楚顯示 runtime 狀態、產出狀態、視覺 QA、問題追蹤與可預覽檔案。
- PPTX / PDF 預覽需可站內直接開啟,不能只下載。
- 2026-06-06 起,`V10.604` 修正定期簡報長期漏產:根因為 `schedule.run_pending()` 同步執行20:30/20:40/20:50 精準時段會被 feeder / AI 長任務卡過且不自動 replay。新增每 10 分鐘 missed-run catch-up、scheduler 背景化補跑、排程型市場情報與價格甜蜜點快速 fallbackproduction 已補齊 `daily``market_intel``price_elasticity`catch-up plan 顯示 daily/weekly/monthly/quarterly/half_yearly/annual 全數 ready`/observability/ppt_audit_history` 可看到新檔與預覽入口。
## 6. 外部 BI / 協作入口

View File

@@ -18,6 +18,7 @@ Operation Ollama-First v5.0 / Phase 27 — Admin Observability Dashboard
"""
import logging
import re
import threading
import time
from datetime import datetime, timedelta
@@ -2511,6 +2512,22 @@ def _guess_ppt_report_type_from_filename(filename: str) -> str:
return ""
def _ppt_filename_matches_month(filename: str, *, year: int, month: int) -> bool:
"""判斷檔名是否明確帶有指定月份,用於補足歷史檔案 mtime 漂移。"""
name = str(filename or "")
for match in re.finditer(r"(?<!\d)(20\d{2})(0[1-9]|1[0-2])([0-3]\d)(?!\d)", name):
try:
parsed = datetime(int(match.group(1)), int(match.group(2)), int(match.group(3)))
except ValueError:
continue
if parsed.year == year and parsed.month == month:
return True
for match in re.finditer(r"(?<!\d)(20\d{2})(0[1-9]|1[0-2])(?!\d)", name):
if int(match.group(1)) == year and int(match.group(2)) == month:
return True
return False
def _build_ppt_pipeline_view(files, auto_generation, audit_stats, generation_runs, vision_status, audit_records=None):
"""Compose page-level PPT pipeline health so the template stays declarative."""
files = files or []
@@ -3147,7 +3164,11 @@ def ppt_audit_history():
continue
try:
mtime = os.path.getmtime(full)
if month_start_ts <= mtime < month_end_ts:
matches_selected_month = (
month_start_ts <= mtime < month_end_ts
or _ppt_filename_matches_month(f, year=target_year, month=target_month)
)
if matches_selected_month:
is_valid, check_msg = _inspect_ppt_file(full)
files_by_name[f] = {
'source': 'filesystem',

View File

@@ -1899,6 +1899,10 @@ def _clean_ai_text(text: str) -> str:
return text.strip()
def _scheduled_ppt_fast_fallback_enabled() -> bool:
return os.getenv("PPT_SCHEDULED_FAST_FALLBACK", "").strip().lower() in {"1", "true", "yes", "on"}
def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str:
"""
用 NIM DeepSeek 生成簡報 AI 分析文字
@@ -1919,6 +1923,9 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str:
is_price_elast = '價格彈性' in report_type or 'price_elasticity' in report_type
is_5forces = '五力' in report_type or 'competitor_v4' in report_type or '競業五力' in report_type
if _scheduled_ppt_fast_fallback_enabled() and (is_market_intel or is_price_elast):
return _ppt_fallback_insight(report_type or '簡報', prompt_data, '')
# ── 格式鐵律(所有 prompt 共用後綴)────────────────────────
FORMAT_RULES = (
"\n\n【輸出格式鐵律 — 絕對遵守】\n"

View File

@@ -8,6 +8,7 @@ run_scheduler.py — momo-scheduler 容器入口點
每 4 小時competitor_price_feeder、icaim_analysis
每 6 小時quality_rescore、action_plan_hygiene
每 12 小時dedup_batch
每 10 分鐘ppt_auto_generation_catchup補跑被長任務卡過的定期簡報
每 1 天 db_backup03:00、cleanup_agent_context03:30、backup_monitor04:00、daily_report09:00、roi_monthly_report gate09:05、ai_smoke_summary09:10、observability_daily_summary09:30、pchome_match_backfill10:30、openclaw_meta_analysis12:00, Phase 4 降頻、ppt_auto_generation_daily20:30、ppt_vision_audit22:00、daily_token_report23:55
每 1 週 weekly_strategy週一 06:00、ppt_auto_generation_weekly週一 20:40
每 1 月 monthly_report每月1日 07:00、ppt_auto_generation_monthly每月1日 20:50
@@ -274,6 +275,9 @@ def _register_schedules():
schedule.every().day.at("21:20").do(_ppt_annual_gate)
logger.info("📅 每年1月1日 21:20ppt_auto_generation_annual年報")
schedule.every(10).minutes.do(run_ppt_auto_generation_catchup_task)
logger.info("📅 每 10 分鐘ppt_auto_generation_catchup補跑錯過的定期簡報")
# Phase 26: PPT 視覺審核(每日 22:00 掃當天新生 .pptx有 issues 才推 Telegram
schedule.every().day.at("22:00").do(run_ppt_vision_audit)
logger.info("📅 每日 22:00ppt_vision_auditPPT_VISION_ENABLED=true 才生效)")
@@ -1096,6 +1100,32 @@ def run_ppt_vision_audit():
)
def run_ppt_auto_generation_catchup_task():
"""每 10 分鐘補跑被同步長任務錯過的定期 PPT 產出。"""
try:
from services.ppt_auto_generation_service import start_scheduled_ppt_catchup_background
result = start_scheduled_ppt_catchup_background()
if result.get("status") == "queued":
logger.info(
"[PPTAutoGenerationCatchup] queued kinds=%s",
",".join(result.get("schedule_kinds") or []),
)
elif result.get("status") == "already_running":
logger.debug("[PPTAutoGenerationCatchup] skipped; generation already running")
else:
logger.debug("[PPTAutoGenerationCatchup] status=%s", result.get("status"))
except Exception as e:
logger.error(f"[PPTAutoGenerationCatchup] task failed: {e}", exc_info=True)
_notify_scheduler_failure(
"run_ppt_auto_generation_catchup_task",
e,
source="Scheduler.PPTAutoGeneration",
event_type="ppt_auto_generation_catchup_failure",
title="PPT 定期簡報補跑失敗",
)
def run_roi_monthly_report_if_new_month():
"""每日 09:00 — Phase 24 ROI 月報(內部判斷月初第 1 日才送)

View File

@@ -47,6 +47,11 @@ try:
except ImportError:
_OLLAMA_AVAILABLE = False
def _fast_static_fallback_enabled() -> bool:
return os.getenv("MCP_FAST_STATIC_FALLBACK", "").strip().lower() in {"1", "true", "yes", "on"}
# ── 查詢主題定義 ────────────────────────────────────────────────────────────
_SEARCH_TOPICS = {
"market_trends": (
@@ -164,6 +169,9 @@ class MCPCollectorService:
if cached:
return cached
if _fast_static_fallback_enabled():
return self._fallback_topic_content(topic, "定期簡報快速補跑:外部模型暫停,使用穩定行銷情報。")
# ─── Phase 10.52026-05-04MCP omnisearch L0 路徑 ───
# MCP_ROUTER_ENABLED=true 且 docker-compose.mcp.yml 已 deploy 時,
# 優先走 self-hosted Tavily/Exa取代 Gemini Grounding 主路徑)。

View File

@@ -13,7 +13,7 @@ import json
import os
import threading
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, Sequence
@@ -757,11 +757,14 @@ def get_generation_run_history(
return items
def _generate_job(job: PPTAutoJob, *, force: bool = False) -> tuple[str | None, int]:
def _generate_job(job: PPTAutoJob, *, force: bool = False, schedule_kind: str = "manual") -> tuple[str | None, int]:
from routes import openclaw_bot_routes as bot_routes
original_send_message = getattr(bot_routes, "send_message", None)
invalidated_count = _expire_matching_ppt_cache(job) if force else 0
scheduled_fast_fallback = schedule_kind != "manual"
previous_fast_fallback = os.environ.get("PPT_SCHEDULED_FAST_FALLBACK")
previous_mcp_fast_fallback = os.environ.get("MCP_FAST_STATIC_FALLBACK")
def _noop_send_message(*_args, **_kwargs):
return None
@@ -769,6 +772,9 @@ def _generate_job(job: PPTAutoJob, *, force: bool = False) -> tuple[str | None,
if original_send_message is not None:
bot_routes.send_message = _noop_send_message
try:
if scheduled_fast_fallback:
os.environ["PPT_SCHEDULED_FAST_FALLBACK"] = "true"
os.environ["MCP_FAST_STATIC_FALLBACK"] = "true"
path = bot_routes._generate_ppt_cmd(
job.sub_type,
job.sub_arg,
@@ -778,6 +784,15 @@ def _generate_job(job: PPTAutoJob, *, force: bool = False) -> tuple[str | None,
)
return path, invalidated_count
finally:
if scheduled_fast_fallback:
if previous_fast_fallback is None:
os.environ.pop("PPT_SCHEDULED_FAST_FALLBACK", None)
else:
os.environ["PPT_SCHEDULED_FAST_FALLBACK"] = previous_fast_fallback
if previous_mcp_fast_fallback is None:
os.environ.pop("MCP_FAST_STATIC_FALLBACK", None)
else:
os.environ["MCP_FAST_STATIC_FALLBACK"] = previous_mcp_fast_fallback
if original_send_message is not None:
bot_routes.send_message = original_send_message
@@ -829,7 +844,7 @@ def generate_defined_ppt_reports(
item = asdict(job)
job_started_at = datetime.now(TAIPEI_TZ)
try:
path, invalidated_count = _generate_job(job, force=force)
path, invalidated_count = _generate_job(job, force=force, schedule_kind=schedule_kind)
item["path"] = path
item["cache_invalidated"] = invalidated_count
item["exists"] = bool(path and os.path.exists(path))
@@ -926,6 +941,217 @@ def get_due_schedule_kinds(now: datetime | None = None) -> list[str]:
return kinds
def _parse_cadence_time(kind: str) -> tuple[int, int]:
raw = (SCHEDULE_CADENCES.get(kind) or {}).get("time", "00:00")
hour, minute = raw.split(":", 1)
return int(hour), int(minute)
def _combine_local(run_date: date, kind: str) -> datetime:
hour, minute = _parse_cadence_time(kind)
return datetime(run_date.year, run_date.month, run_date.day, hour, minute)
def _previous_month(year: int, month: int) -> tuple[int, int]:
if month == 1:
return year - 1, 12
return year, month - 1
def _latest_month_occurrence(current: datetime, kind: str, allowed_months: set[int] | None = None) -> datetime:
year, month = current.year, current.month
for _ in range(24):
if allowed_months is None or month in allowed_months:
candidate = _combine_local(date(year, month, 1), kind)
if current >= candidate:
return candidate
year, month = _previous_month(year, month)
return _combine_local(date(current.year, current.month, 1), kind)
def get_latest_schedule_occurrence(kind: str, now: datetime | None = None) -> datetime | None:
"""Return the most recent planned occurrence for a PPT schedule kind.
The Python `schedule` package intentionally does not replay missed jobs.
This helper lets the PPT pipeline detect a missed daily/weekly/monthly slot
after long crawler or AI jobs release the scheduler loop.
"""
if kind not in SCHEDULE_PROFILES:
return None
current = now or datetime.now(TAIPEI_TZ)
if current.tzinfo is not None:
current = current.astimezone(TAIPEI_TZ).replace(tzinfo=None)
if kind == "daily":
candidate = _combine_local(current.date(), kind)
return candidate if current >= candidate else candidate - timedelta(days=1)
if kind == "weekly":
monday = current.date() - timedelta(days=current.weekday())
candidate = _combine_local(monday, kind)
return candidate if current >= candidate else candidate - timedelta(days=7)
if kind == "monthly":
return _latest_month_occurrence(current, kind)
if kind == "quarterly":
return _latest_month_occurrence(current, kind, {1, 4, 7, 10})
if kind == "half_yearly":
return _latest_month_occurrence(current, kind, {1, 7})
if kind == "annual":
return _latest_month_occurrence(current, kind, {1})
return None
def _ready_report_types_since(jobs: Sequence[PPTAutoJob], since: datetime) -> set[str]:
"""Return report types that already have matching DB rows and files."""
if not jobs:
return set()
expected_params = {job.report_type: job.expected_params for job in jobs}
ready: set[str] = set()
try:
session = get_session()
try:
rows = session.execute(
sa_text(
"""
SELECT report_type, parameters, file_path
FROM ppt_reports
WHERE generated_at >= :since
"""
),
{"since": since},
).fetchall()
finally:
session.close()
except Exception:
return ready
for report_type, parameters, file_path in rows:
report_type = str(report_type or "")
if report_type not in expected_params or report_type in ready:
continue
if file_path and not os.path.exists(str(file_path)):
continue
if _params_match(_parse_cache_params(parameters), expected_params[report_type]):
ready.add(report_type)
return ready
def get_scheduled_ppt_catchup_plan(
*,
now: datetime | None = None,
schedule_kinds: Iterable[str] | None = None,
) -> list[dict]:
"""Build a catch-up plan for missed periodic PPT generations."""
current = now or datetime.now(TAIPEI_TZ)
kinds = list(schedule_kinds or SCHEDULE_PROFILES.keys())
plan: list[dict] = []
for kind in kinds:
report_types = SCHEDULE_PROFILES.get(kind)
scheduled_at = get_latest_schedule_occurrence(kind, current)
if not report_types or scheduled_at is None:
continue
jobs = build_defined_ppt_jobs(report_types=report_types)
ready_types = _ready_report_types_since(jobs, scheduled_at)
missing_jobs = [job for job in jobs if job.report_type not in ready_types]
plan.append({
"schedule_kind": kind,
"schedule_label": SCHEDULE_CADENCES.get(kind, {}).get("label", kind),
"scheduled_at": scheduled_at.strftime("%Y-%m-%d %H:%M"),
"ready_report_types": sorted(ready_types),
"missing_report_types": [job.report_type for job in missing_jobs],
"missing_report_labels": [REPORT_TYPE_LABELS.get(job.report_type, job.report_type) for job in missing_jobs],
"ready": not missing_jobs,
})
return plan
def catch_up_scheduled_ppt_reports(
*,
now: datetime | None = None,
force: bool = False,
schedule_kinds: Iterable[str] | None = None,
) -> dict:
"""Generate missing scheduled PPT decks that were skipped by a blocked loop."""
global _LAST_RUN
if not force and not is_ppt_auto_generation_enabled():
result = {
"ok": False,
"status": "disabled",
"message": "PPT_AUTO_GENERATION_ENABLED=false",
"runs": [],
}
_LAST_RUN = result
return result
plan = get_scheduled_ppt_catchup_plan(now=now, schedule_kinds=schedule_kinds)
runs = []
for item in plan:
missing = item.get("missing_report_types") or []
if not missing:
continue
run = generate_defined_ppt_reports(
report_types=missing,
schedule_kind=item["schedule_kind"],
force=force,
)
run["schedule_kind"] = item["schedule_kind"]
run["catchup_scheduled_at"] = item.get("scheduled_at")
run["catchup_missing_report_types"] = missing
runs.append(run)
result = {
"ok": all(run.get("ok", False) for run in runs) if runs else True,
"status": "completed" if runs else "skipped",
"plan": plan,
"runs": runs,
"generated_kinds": [
run.get("schedule_kind")
for run in runs
if run.get("schedule_kind")
],
"ready": sum(int(run.get("ready") or 0) for run in runs),
"errors": sum(int(run.get("errors") or 0) for run in runs),
}
_LAST_RUN = result
return result
def start_scheduled_ppt_catchup_background(
*,
now: datetime | None = None,
force: bool = False,
schedule_kinds: Iterable[str] | None = None,
) -> dict:
"""Queue scheduled catch-up without blocking the scheduler main loop."""
if _RUN_LOCK.locked():
return {
"ok": True,
"status": "already_running",
"message": "PPT auto-generation is already running.",
"last_run": _LAST_RUN,
}
planned_kinds = list(schedule_kinds or SCHEDULE_PROFILES.keys())
def _run():
catch_up_scheduled_ppt_reports(
now=now,
force=force,
schedule_kinds=planned_kinds,
)
thread = threading.Thread(target=_run, name="ppt-auto-generation-catchup", daemon=True)
thread.start()
return {
"ok": True,
"status": "queued",
"message": "PPT scheduled catch-up queued.",
"schedule_kinds": planned_kinds,
}
def generate_scheduled_ppt_reports(
*,
schedule_kind: str | None = None,

View File

@@ -1,5 +1,6 @@
from datetime import datetime
import json
import os
def test_build_defined_ppt_jobs_uses_latest_date():
@@ -128,6 +129,182 @@ def test_due_schedule_kinds_include_periodic_boundaries():
]
def test_latest_schedule_occurrence_replays_missed_slots():
from services.ppt_auto_generation_service import get_latest_schedule_occurrence
assert get_latest_schedule_occurrence("daily", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 6, 5, 20, 30)
assert get_latest_schedule_occurrence("daily", datetime(2026, 6, 6, 21, 0)) == datetime(2026, 6, 6, 20, 30)
assert get_latest_schedule_occurrence("weekly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 6, 1, 20, 40)
assert get_latest_schedule_occurrence("monthly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 6, 1, 20, 50)
assert get_latest_schedule_occurrence("quarterly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 4, 1, 21, 0)
assert get_latest_schedule_occurrence("half_yearly", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 1, 1, 21, 10)
assert get_latest_schedule_occurrence("annual", datetime(2026, 6, 6, 14, 0)) == datetime(2026, 1, 1, 21, 20)
def test_ppt_catchup_plan_marks_missing_after_missed_slot(monkeypatch):
from services import ppt_auto_generation_service as svc
class _Rows:
def fetchall(self):
return []
class _Session:
def execute(self, *_args, **_kwargs):
return _Rows()
def close(self):
return None
monkeypatch.setattr(svc, "get_session", lambda: _Session())
monkeypatch.setattr(svc, "_latest_sales_date", lambda: "2026-06-04")
plan = svc.get_scheduled_ppt_catchup_plan(
now=datetime(2026, 6, 6, 14, 0),
schedule_kinds=["daily"],
)
assert plan[0]["scheduled_at"] == "2026-06-05 20:30"
assert plan[0]["missing_report_types"] == ["daily"]
assert plan[0]["ready"] is False
def test_ppt_catchup_plan_uses_existing_exact_report(monkeypatch, tmp_path):
from services import ppt_auto_generation_service as svc
pptx = tmp_path / "ocbot_daily_ok.pptx"
pptx.write_bytes(b"pptx")
class _Rows:
def fetchall(self):
return [
(
"daily",
json.dumps({"report_type": "daily", "date": "2026/06/04"}),
str(pptx),
)
]
class _Session:
def execute(self, *_args, **_kwargs):
return _Rows()
def close(self):
return None
monkeypatch.setattr(svc, "get_session", lambda: _Session())
monkeypatch.setattr(svc, "_latest_sales_date", lambda: "2026-06-04")
plan = svc.get_scheduled_ppt_catchup_plan(
now=datetime(2026, 6, 6, 14, 0),
schedule_kinds=["daily"],
)
assert plan[0]["ready_report_types"] == ["daily"]
assert plan[0]["missing_report_types"] == []
assert plan[0]["ready"] is True
def test_ppt_catchup_generates_only_missing_types(monkeypatch):
from services import ppt_auto_generation_service as svc
calls = []
monkeypatch.setattr(
svc,
"get_scheduled_ppt_catchup_plan",
lambda **_kwargs: [{
"schedule_kind": "weekly",
"scheduled_at": "2026-06-01 20:40",
"missing_report_types": ["market_intel"],
}],
)
def fake_generate_defined_ppt_reports(**kwargs):
calls.append(kwargs)
return {"ok": True, "ready": 1, "errors": 0, "jobs": [{"report_type": "market_intel"}]}
monkeypatch.setattr(svc, "generate_defined_ppt_reports", fake_generate_defined_ppt_reports)
result = svc.catch_up_scheduled_ppt_reports()
assert result["status"] == "completed"
assert result["generated_kinds"] == ["weekly"]
assert calls == [{
"report_types": ["market_intel"],
"schedule_kind": "weekly",
"force": False,
}]
def test_ppt_catchup_background_queues_without_main_loop_block(monkeypatch):
from services import ppt_auto_generation_service as svc
calls = []
threads = []
class _Thread:
def __init__(self, *, target, name, daemon):
self.target = target
self.name = name
self.daemon = daemon
threads.append(self)
def start(self):
self.target()
def fake_catchup(**kwargs):
calls.append(kwargs)
return {"ok": True, "status": "skipped", "runs": []}
monkeypatch.setattr(svc.threading, "Thread", _Thread)
monkeypatch.setattr(svc, "catch_up_scheduled_ppt_reports", fake_catchup)
result = svc.start_scheduled_ppt_catchup_background(schedule_kinds=["daily", "weekly"])
assert result["status"] == "queued"
assert result["schedule_kinds"] == ["daily", "weekly"]
assert threads[0].name == "ppt-auto-generation-catchup"
assert threads[0].daemon is True
assert calls == [{
"now": None,
"force": False,
"schedule_kinds": ["daily", "weekly"],
}]
def test_scheduled_generation_sets_fast_fallback_env(monkeypatch, tmp_path):
from routes import openclaw_bot_routes as bot_routes
from services import ppt_auto_generation_service as svc
output = tmp_path / "ocbot_market_intel_ok.pptx"
output.write_bytes(b"pptx")
observed = {}
job = svc.build_defined_ppt_jobs(
latest_date="2026-05-11",
report_types=["market_intel"],
)[0]
monkeypatch.delenv("PPT_SCHEDULED_FAST_FALLBACK", raising=False)
monkeypatch.delenv("MCP_FAST_STATIC_FALLBACK", raising=False)
monkeypatch.setattr(svc, "_expire_matching_ppt_cache", lambda _job: 0)
monkeypatch.setattr(bot_routes, "send_message", lambda *_args, **_kwargs: None, raising=False)
def fake_generate_ppt_cmd(*_args, **_kwargs):
observed["ppt_fast"] = os.getenv("PPT_SCHEDULED_FAST_FALLBACK")
observed["mcp_fast"] = os.getenv("MCP_FAST_STATIC_FALLBACK")
return str(output)
monkeypatch.setattr(bot_routes, "_generate_ppt_cmd", fake_generate_ppt_cmd)
path, invalidated = svc._generate_job(job, schedule_kind="weekly")
assert path == str(output)
assert invalidated == 0
assert observed == {"ppt_fast": "true", "mcp_fast": "true"}
assert os.getenv("PPT_SCHEDULED_FAST_FALLBACK") is None
assert os.getenv("MCP_FAST_STATIC_FALLBACK") is None
def test_schedule_cadence_status_exposes_all_periodic_contracts():
from services.ppt_auto_generation_service import get_schedule_cadence_status