1189 lines
42 KiB
Python
1189 lines
42 KiB
Python
"""
|
||
PPT auto-generation orchestration.
|
||
|
||
The observability page audits generated decks, but the scheduler previously
|
||
only ran the vision audit. This service fills that gap by materializing the
|
||
defined deck set before the audit window.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import calendar
|
||
import json
|
||
import os
|
||
import threading
|
||
from dataclasses import asdict, dataclass
|
||
from datetime import date, datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from typing import Iterable, Sequence
|
||
|
||
from sqlalchemy import text as sa_text
|
||
|
||
from database.manager import get_session
|
||
|
||
|
||
# Fixed UTC+8 offset; this module passes it to datetime.now() whenever it
# needs the current wall-clock time.
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
# Every report type this service can plan, in canonical order. The order is
# significant: it is the order used when "all" (or nothing valid) is requested.
DEFINED_REPORT_TYPES = (
    # Period-based decks.
    "daily", "weekly", "monthly", "quarterly", "half_yearly", "annual", "ttm",
    # Topical decks.
    "strategy", "competitor", "competitor_v4",
    "promo", "promo_compare", "forecast_pre_event",
    "vendor", "category", "customer", "new_product",
    "market_intel", "price_elasticity",
)
|
||
|
||
# Display label for each report type key (user-facing text, kept verbatim).
REPORT_TYPE_LABELS = {
    # Period-based decks.
    "daily": "每日日報",
    "weekly": "週報",
    "monthly": "月報",
    "quarterly": "季報",
    "half_yearly": "半年報",
    "annual": "年報",
    "ttm": "TTM 滾動 12 月",
    # Topical decks.
    "strategy": "策略",
    "competitor": "競品",
    "competitor_v4": "競業五力",
    "promo": "促銷",
    "promo_compare": "多活動比較",
    "forecast_pre_event": "檔期前瞻",
    "vendor": "廠商",
    "category": "品類",
    "customer": "客戶",
    "new_product": "新品追蹤",
    "market_intel": "市場情報",
    "price_elasticity": "價格甜蜜點",
}
|
||
|
||
# File-name prefix per report type ("ocbot_<type>_"). Note that some prefixes
# are themselves prefixes of others (e.g. "ocbot_promo_" / "ocbot_promo_compare_").
REPORT_PREFIXES = {key: f"ocbot_{key}_" for key in DEFINED_REPORT_TYPES}
|
||
|
||
# Which report types each schedule cadence is responsible for producing.
SCHEDULE_PROFILES = {
    "daily": ("daily",),
    "weekly": ("weekly", "market_intel"),
    "monthly": (
        "monthly", "strategy",
        "competitor", "competitor_v4",
        "promo", "promo_compare", "forecast_pre_event",
        "vendor", "category", "customer", "new_product",
        "price_elasticity", "ttm",
    ),
    "quarterly": ("quarterly",),
    "half_yearly": ("half_yearly",),
    "annual": ("annual",),
}
|
||
|
||
# Scheduler contract per cadence: display label, human-readable schedule,
# gating rule, "HH:MM" run time and a description. Built from a row table so
# each cadence reads as one record; the resulting dict (keys, key order and
# values) is the same as a hand-written nested literal.
SCHEDULE_CADENCES = {
    kind: {
        "label": label,
        "schedule_text": schedule_text,
        "gate": gate,
        "time": run_time,
        "description": description,
    }
    for kind, label, schedule_text, gate, run_time, description in (
        (
            "daily", "每日", "每日 20:30", "每日固定產出", "20:30",
            "每日補齊營運日報,供 22:00 視覺 QA 接續審核。",
        ),
        (
            "weekly", "每週", "每週一 20:40", "週一固定產出", "20:40",
            "每週產出週報與市場情報,整理近 7 日變化。",
        ),
        (
            "monthly", "每月", "每月 1 日 20:50", "每月 1 日產出", "20:50",
            "每月產出月報、策略、競品、促銷、品類與管理型簡報。",
        ),
        (
            "quarterly", "每季", "每季首日 21:00", "1/4/7/10 月 1 日產出", "21:00",
            "每季首日產出季報,承接季度營運檢討。",
        ),
        (
            "half_yearly", "每半年", "每半年首日 21:10", "1/7 月 1 日產出", "21:10",
            "每半年首日產出半年報,彙整 H1 / H2 成果。",
        ),
        (
            "annual", "每年", "每年 1/1 21:20", "每年 1 月 1 日產出", "21:20",
            "每年首日產出年度總結,保留完整年度資料快照。",
        ),
    )
}
|
||
|
||
# Process-wide lock: at most one generation run executes at a time.
_RUN_LOCK = threading.Lock()
|
||
# Result dict of the most recent run in this process (None until one finishes
# or is rejected as disabled); in-memory only.
_LAST_RUN: dict | None = None
|
||
|
||
|
||
@dataclass(frozen=True)
class PPTAutoJob:
    """Immutable description of one deck the auto-generation run should build."""

    # Report type key (one of DEFINED_REPORT_TYPES).
    report_type: str
    # Display label for the job.
    label: str
    # Sub-command and its argument, forwarded to the deck generator.
    sub_type: str
    sub_arg: str
    # Target date in "YYYY/MM/DD" form, forwarded to the deck generator.
    target_date: str
    # Human-readable description of the period/target the deck covers.
    target_label: str
    # Parameters a cached/persisted deck must carry to count as this job's output.
    expected_params: dict
|
||
|
||
|
||
def _truthy(value: str | None, default: bool = False) -> bool:
|
||
if value is None:
|
||
return default
|
||
return value.strip().lower() in {"1", "true", "yes", "on"}
|
||
|
||
|
||
def is_ppt_auto_generation_enabled() -> bool:
    """Return whether auto-generation is switched on.

    Controlled by the PPT_AUTO_GENERATION_ENABLED environment variable;
    enabled when the variable is unset.
    """
    flag = os.getenv("PPT_AUTO_GENERATION_ENABLED")
    return _truthy(flag, default=True)
|
||
|
||
|
||
def _parse_report_types(report_types: Iterable[str] | str | None) -> list[str]:
    """Resolve a report-type selection into a de-duplicated list of known keys.

    Args:
        report_types: A comma-separated string, an iterable of keys, or None to
            read PPT_AUTO_REPORT_TYPES from the environment (defaulting to
            every defined type).

    Returns:
        Known keys in first-seen order. "all" anywhere in the input selects
        every defined type, and so does a selection with no known key at all.
    """
    if isinstance(report_types, str):
        candidates = report_types.split(",")
    elif report_types is None:
        configured = os.getenv("PPT_AUTO_REPORT_TYPES", ",".join(DEFINED_REPORT_TYPES))
        candidates = configured.split(",")
    else:
        candidates = list(report_types)

    selected: list[str] = []
    for candidate in candidates:
        name = str(candidate or "").strip().lower()
        if name == "all":
            return list(DEFINED_REPORT_TYPES)
        # Unknown names and repeats are silently dropped.
        if name not in DEFINED_REPORT_TYPES or name in selected:
            continue
        selected.append(name)
    return selected if selected else list(DEFINED_REPORT_TYPES)
|
||
|
||
|
||
def _latest_sales_date() -> str | None:
|
||
try:
|
||
from routes.openclaw_bot_routes import latest_date
|
||
|
||
return latest_date()
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _normalise_date(value: str | None) -> str:
|
||
if value:
|
||
cleaned = value.strip().replace("-", "/")
|
||
try:
|
||
dt = datetime.strptime(cleaned, "%Y/%m/%d")
|
||
return dt.strftime("%Y/%m/%d")
|
||
except ValueError:
|
||
pass
|
||
return (datetime.now(TAIPEI_TZ) - timedelta(days=1)).strftime("%Y/%m/%d")
|
||
|
||
|
||
def _target_datetime(latest_date: str | None = None) -> datetime:
    """Return the target day as a naive datetime at midnight.

    Uses ``latest_date`` when given, otherwise the latest sales date; either
    way the value goes through ``_normalise_date`` (which has its own fallback).
    """
    source = latest_date if latest_date else _latest_sales_date()
    return datetime.strptime(_normalise_date(source), "%Y/%m/%d")
|
||
|
||
|
||
def _month_bounds(target_dt: datetime) -> tuple[str, str, str]:
|
||
start = f"{target_dt.year}/{target_dt.month:02d}/01"
|
||
last_day = calendar.monthrange(target_dt.year, target_dt.month)[1]
|
||
end = f"{target_dt.year}/{target_dt.month:02d}/{last_day:02d}"
|
||
label = f"{target_dt.year}/{target_dt.month:02d}"
|
||
return start, end, label
|
||
|
||
|
||
def _quarter_label(target_dt: datetime) -> str:
|
||
quarter = ((target_dt.month - 1) // 3) + 1
|
||
return f"{target_dt.year} Q{quarter}"
|
||
|
||
|
||
def _half_year_label(target_dt: datetime) -> str:
|
||
half = 1 if target_dt.month <= 6 else 2
|
||
return f"{target_dt.year} H{half}"
|
||
|
||
|
||
def _ttm_label(target_dt: datetime) -> str:
|
||
ttm_start = target_dt.date().replace(day=1) - timedelta(days=365)
|
||
ttm_start = ttm_start.replace(day=1)
|
||
return f"TTM {ttm_start.strftime('%Y/%m/%d')[:7]}~{target_dt.strftime('%Y/%m/%d')[:7]}"
|
||
|
||
|
||
def _week_label(target_dt: datetime) -> str:
|
||
week_start = target_dt.date() - timedelta(days=target_dt.weekday())
|
||
return f"{week_start.strftime('%Y/%m/%d')} 起一週"
|
||
|
||
|
||
def _default_category() -> str:
|
||
return os.getenv("PPT_AUTO_DEFAULT_CATEGORY", "美妝保養").strip() or "美妝保養"
|
||
|
||
|
||
def _default_forecast_event(target_dt: datetime) -> tuple[str, str]:
|
||
events = [
|
||
("618", f"{target_dt.year}/06/18"),
|
||
("七夕", f"{target_dt.year}/08/19"),
|
||
("雙11", f"{target_dt.year}/11/11"),
|
||
("雙12", f"{target_dt.year}/12/12"),
|
||
("母親節", f"{target_dt.year + 1}/05/10"),
|
||
]
|
||
today = target_dt.date()
|
||
for name, date_str in events:
|
||
try:
|
||
if datetime.strptime(date_str, "%Y/%m/%d").date() >= today:
|
||
return name, date_str
|
||
except ValueError:
|
||
continue
|
||
return "雙11", f"{target_dt.year}/11/11"
|
||
|
||
|
||
def get_report_type_options() -> list[dict]:
    """List the selectable report types, ending with a catch-all "all" entry.

    Each entry carries ``key``, ``label`` and ``prefix``.
    """
    options = []
    for key in DEFINED_REPORT_TYPES:
        options.append({
            "key": key,
            "label": REPORT_TYPE_LABELS[key],
            "prefix": REPORT_PREFIXES[key],
        })
    options.append({"key": "all", "label": "全部", "prefix": "all"})
    return options
|
||
|
||
|
||
def get_schedule_cadence_status(coverage_items: Sequence[dict] | None = None) -> list[dict]:
    """Describe every schedule cadence together with its coverage status.

    Args:
        coverage_items: Optional per-report-type coverage dicts; each is looked
            up by its ``key`` and considered done when ``ready`` is truthy.

    Returns:
        One dict per cadence, in SCHEDULE_CADENCES order, holding the static
        schedule metadata plus ready/missing counts and a status of "ready",
        "partial" or "missing".
    """
    coverage_by_key: dict = {}
    for entry in coverage_items or []:
        if entry.get("key"):
            coverage_by_key[str(entry.get("key"))] = entry

    def _label_of(report_type: str) -> str:
        return REPORT_TYPE_LABELS.get(report_type, report_type)

    summary: list[dict] = []
    for kind, meta in SCHEDULE_CADENCES.items():
        report_types = SCHEDULE_PROFILES.get(kind, ())
        total = len(report_types)
        # A type with no coverage entry counts as missing.
        missing_types = [
            report_type
            for report_type in report_types
            if not coverage_by_key.get(report_type, {}).get("ready")
        ]
        ready_count = sum(
            1
            for report_type in report_types
            if report_type in coverage_by_key and coverage_by_key[report_type].get("ready")
        )

        if ready_count == 0:
            status = "missing"
            status_label = "待產出"
            status_hint = "當期尚未看到符合定義的簡報。"
        elif missing_types:
            status = "partial"
            status_label = f"已完成 {ready_count}/{total}"
            status_hint = "仍有部分簡報尚未補齊,需等排程或手動回補。"
        else:
            status = "ready"
            status_label = "當期完整"
            status_hint = "排程定義內的簡報都已找到目標版本。"

        progress_pct = round((ready_count / total * 100), 1) if total else 0
        summary.append({
            "key": kind,
            "label": meta["label"],
            "schedule_text": meta["schedule_text"],
            "gate": meta["gate"],
            "time": meta["time"],
            "description": meta["description"],
            "report_types": list(report_types),
            "report_labels": [_label_of(report_type) for report_type in report_types],
            "ready_count": ready_count,
            "missing_count": len(missing_types),
            "missing_report_types": missing_types,
            "missing_report_labels": [_label_of(report_type) for report_type in missing_types],
            "total": total,
            "progress_pct": progress_pct,
            "status": status,
            "status_label": status_label,
            "status_hint": status_hint,
            "coverage_text": f"{ready_count}/{total}",
        })
    return summary
|
||
|
||
|
||
def build_defined_ppt_jobs(
    *,
    latest_date: str | None = None,
    report_types: Iterable[str] | str | None = None,
) -> list[PPTAutoJob]:
    """Build the job definitions for the selected report types.

    Args:
        latest_date: Target day ("YYYY/MM/DD" or "YYYY-MM-DD"); resolved via
            ``_target_datetime`` when omitted.
        report_types: Selection understood by ``_parse_report_types``.

    Returns:
        One PPTAutoJob per selected type, in the order the selection resolves to.
    """
    target_dt = _target_datetime(latest_date)
    target = target_dt.strftime("%Y/%m/%d")
    year_text = str(target_dt.year)

    # Period descriptors derived from the target day.
    month_arg = target_dt.strftime("%Y/%m")
    month_start, _, month_label = _month_bounds(target_dt)
    quarter_label = _quarter_label(target_dt)
    half_label = _half_year_label(target_dt)
    ttm_label = _ttm_label(target_dt)
    week_label = _week_label(target_dt)
    category = _default_category()
    forecast_event, forecast_date = _default_forecast_event(target_dt)

    # Rolling 7-day promo window and the 7 days before it.
    promo_start = (target_dt - timedelta(days=6)).strftime("%Y/%m/%d")
    promo_prev_start = (target_dt - timedelta(days=13)).strftime("%Y/%m/%d")
    promo_prev_end = (target_dt - timedelta(days=7)).strftime("%Y/%m/%d")
    promo_arg = f"{promo_start}-{target}"
    promo_label = f"{promo_start}~{target}"
    promo_compare_arg = f"近7日:{promo_start}-{target}|前7日:{promo_prev_start}-{promo_prev_end}"

    strategy_label = f"{month_label} 月策略(截至 {target_dt.strftime('%m/%d')})"
    strategy_arg = f"{month_start}-{target}"
    competitor_label = f"{month_label} 月比較"
    forecast_text = f"{forecast_event} {forecast_date}"

    def _job(report_type: str, label: str, sub_arg: str, target_label: str, extra: dict) -> PPTAutoJob:
        # Every job uses its report type as the sub-command, targets the same
        # day, and leads its expected parameters with "report_type".
        return PPTAutoJob(
            report_type,
            label,
            report_type,
            sub_arg,
            target,
            target_label,
            {"report_type": report_type, **extra},
        )

    catalog = [
        _job("daily", "每日日報", target, target, {"date": target}),
        _job("weekly", "週報", "", "最新 7 日", {}),
        _job("monthly", "月報", month_arg, month_label, {"month": month_arg}),
        _job("quarterly", "季報", quarter_label.replace(" ", "/"), quarter_label, {"period": quarter_label}),
        _job("half_yearly", "半年報", half_label.replace(" ", "/"), half_label, {"period": half_label}),
        _job("annual", "年報", year_text, year_text, {"period": year_text}),
        _job("ttm", "TTM 滾動 12 月", "", ttm_label, {"period": ttm_label}),
        _job("strategy", "策略(月)", strategy_arg, strategy_label, {
            "start": month_start, "end": target, "label": strategy_label,
        }),
        _job("competitor", "競品(月)", "monthly", competitor_label, {
            "start": month_start, "end": target, "label": competitor_label,
        }),
        _job("competitor_v4", "競業五力", "PChome", "PChome 近 30 天", {"competitor": "PChome"}),
        _job("promo", "促銷(近 7 日)", promo_arg, promo_label, {
            "start": promo_start, "end": target, "label": promo_label,
        }),
        _job("promo_compare", "多活動比較", promo_compare_arg, "近7日 vs 前7日", {"promos": promo_compare_arg}),
        _job("forecast_pre_event", "檔期前瞻", forecast_text, forecast_text, {
            "event": forecast_event, "date": forecast_date,
        }),
        _job("vendor", "廠商", month_arg, month_label, {"period": month_label}),
        _job("category", f"品類({category})", f"{category} 90", f"{category} 近 90 天", {
            "category": category, "days": 90,
        }),
        _job("customer", "客戶", month_arg, month_label, {"period": month_label}),
        _job("new_product", "新品追蹤", "30", "近 30 天", {"days": 30}),
        _job("market_intel", "市場情報", week_label, week_label, {"week": week_label}),
        _job("price_elasticity", "價格甜蜜點", "90", "全平台近 90 天", {"category": "all", "days": 90}),
    ]
    jobs_by_type = {job.report_type: job for job in catalog}
    return [jobs_by_type[key] for key in _parse_report_types(report_types)]
|
||
|
||
|
||
def _parse_cache_params(raw: str | None) -> dict:
|
||
if not raw:
|
||
return {}
|
||
try:
|
||
data = json.loads(raw)
|
||
return data if isinstance(data, dict) else {}
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _params_match(actual: dict, expected: dict) -> bool:
|
||
return all(actual.get(key) == value for key, value in expected.items())
|
||
|
||
|
||
def _ensure_generation_log_table() -> None:
    """Create the ``ppt_generation_runs`` table and its indexes if missing.

    Best-effort: any failure rolls the session back and is swallowed.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS ppt_generation_runs (
            id SERIAL PRIMARY KEY,
            schedule_kind VARCHAR(40) NOT NULL,
            report_type VARCHAR(50) NOT NULL,
            target_label VARCHAR(160),
            status VARCHAR(30) NOT NULL,
            parameters_json TEXT,
            file_path VARCHAR(500),
            file_size INTEGER,
            error_msg TEXT,
            result_payload TEXT,
            started_at TIMESTAMP WITHOUT TIME ZONE,
            finished_at TIMESTAMP WITHOUT TIME ZONE
        )
    """
    # One single-column index per frequently filtered/sorted column.
    indexed_columns = ("report_type", "started_at", "schedule_kind")

    session = get_session()
    try:
        session.execute(sa_text(create_table_sql))
        for column in indexed_columns:
            session.execute(
                sa_text(
                    f"CREATE INDEX IF NOT EXISTS ix_ppt_generation_runs_{column} "
                    f"ON ppt_generation_runs ({column})"
                )
            )
        session.commit()
    except Exception:
        session.rollback()
    finally:
        session.close()
|
||
|
||
|
||
def _log_generation_run(
    job: PPTAutoJob,
    *,
    schedule_kind: str,
    status: str,
    started_at: datetime,
    finished_at: datetime,
    path: str | None = None,
    error: str | None = None,
    result_payload: dict | None = None,
) -> None:
    """Persist one generation attempt to ``ppt_generation_runs``.

    Args:
        job: The job that was attempted.
        schedule_kind: Cadence (or "manual") that triggered the run.
        status: Outcome string stored with the row.
        started_at / finished_at: Attempt timestamps; stored without tzinfo.
        path: Generated file path, if any; its size is recorded when it exists.
        error: Error text, if the attempt failed.
        result_payload: Extra result data, stored as JSON.

    Never raises for ordinary errors: logging is best-effort.
    """
    insert_sql = """
        INSERT INTO ppt_generation_runs (
            schedule_kind, report_type, target_label, status,
            parameters_json, file_path, file_size, error_msg,
            result_payload, started_at, finished_at
        )
        VALUES (
            :schedule_kind, :report_type, :target_label, :status,
            :parameters_json, :file_path, :file_size, :error_msg,
            :result_payload, :started_at, :finished_at
        )
    """
    try:
        _ensure_generation_log_table()
        size_on_disk = None
        if path and os.path.exists(path):
            size_on_disk = os.path.getsize(path)
        session = get_session()
        try:
            row = {
                "schedule_kind": schedule_kind,
                "report_type": job.report_type,
                "target_label": job.target_label,
                "status": status,
                "parameters_json": json.dumps(
                    job.expected_params, ensure_ascii=False, sort_keys=True, default=str
                ),
                "file_path": path,
                "file_size": size_on_disk,
                "error_msg": error,
                "result_payload": json.dumps(result_payload or {}, ensure_ascii=False, default=str),
                "started_at": started_at.replace(tzinfo=None),
                "finished_at": finished_at.replace(tzinfo=None),
            }
            session.execute(sa_text(insert_sql), row)
            session.commit()
        finally:
            session.close()
    except Exception:
        # Run logging must never block the main deck-generation flow.
        pass
|
||
|
||
|
||
def _normalize_ppt_cache_parameters(parameters: dict) -> str:
|
||
"""與 OpenClaw PPT cache key 使用同一套 tpl_ver 參數正規化。"""
|
||
try:
|
||
from routes.openclaw_bot_routes import _normalize_ppt_parameters
|
||
|
||
return _normalize_ppt_parameters(parameters)
|
||
except Exception:
|
||
return json.dumps(parameters or {}, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
|
||
|
||
|
||
def _expire_matching_ppt_cache(job: PPTAutoJob) -> int:
    """Expire only the active cache rows for this job's exact target.

    This keeps a forced rerun from being served the previously cached deck.

    Returns:
        The number of rows updated, or 0 when the update fails.
    """
    normalized_params = _normalize_ppt_cache_parameters(job.expected_params)
    current = datetime.now(TAIPEI_TZ).replace(tzinfo=None)
    expire_sql = """
        UPDATE ppt_reports
        SET expires_at = :expired_at
        WHERE report_type = :report_type
          AND parameters = :parameters
          AND (expires_at IS NULL OR expires_at > :now)
    """
    bind = {
        # Back-date the expiry by a minute so the rows are already expired.
        "expired_at": current - timedelta(minutes=1),
        "report_type": job.report_type,
        "parameters": normalized_params,
        "now": current,
    }
    try:
        session = get_session()
        try:
            outcome = session.execute(sa_text(expire_sql), bind)
            session.commit()
            return int(getattr(outcome, "rowcount", 0) or 0)
        finally:
            session.close()
    except Exception:
        return 0
|
||
|
||
|
||
def get_defined_report_coverage(
    *,
    month_start: datetime,
    month_end: datetime,
    reports_dir: str | os.PathLike[str] | None = None,
    report_types: Iterable[str] | str | None = None,
) -> dict:
    """Summarise which defined decks exist for the window [month_start, month_end).

    Evidence is gathered from two places: rows in ``ppt_reports`` whose
    ``generated_at`` falls in the window, and ``.pptx`` files in the reports
    directory whose modification time falls in the window.

    Args:
        month_start: Inclusive lower bound of the window.
        month_end: Exclusive upper bound of the window.
        reports_dir: Directory to scan; defaults to the REPORTS_DIR environment
            variable, then "/app/data/reports".
        report_types: Selection understood by ``build_defined_ppt_jobs``.

    Returns:
        A dict with per-type ``items``, the cadence roll-up, missing/ready
        totals, the enabled flag and the last in-process run result. A type is
        "ready" only when a database row matches the job's expected
        parameters; other evidence makes it "partial".
    """
    jobs = build_defined_ppt_jobs(report_types=report_types)
    selected_types = [job.report_type for job in jobs]
    counts = {key: 0 for key in selected_types}
    exact_counts = {key: 0 for key in selected_types}
    sources = {key: set() for key in selected_types}
    latest_generated_at = {key: None for key in selected_types}
    latest_file_path = {key: None for key in selected_types}
    expected_params = {job.report_type: job.expected_params for job in jobs}

    # Database evidence is best-effort: a failure leaves the counts gathered so far.
    try:
        session = get_session()
        try:
            rows = session.execute(
                sa_text(
                    """
                    SELECT report_type, parameters, file_path, generated_at
                    FROM ppt_reports
                    WHERE generated_at >= :month_start
                      AND generated_at < :month_end
                    """
                ),
                {"month_start": month_start, "month_end": month_end},
            ).fetchall()
            for report_type, parameters, file_path, generated_at in rows:
                if report_type in counts:
                    counts[report_type] += 1
                    sources[report_type].add("database")
                    if (
                        latest_generated_at[report_type] is None
                        or (generated_at and generated_at > latest_generated_at[report_type])
                    ):
                        latest_generated_at[report_type] = generated_at
                        latest_file_path[report_type] = file_path
                    if _params_match(_parse_cache_params(parameters), expected_params[report_type]):
                        exact_counts[report_type] += 1
        finally:
            session.close()
    except Exception:
        pass

    root = Path(reports_dir or os.getenv("REPORTS_DIR", "/app/data/reports"))
    if root.is_dir():
        month_start_ts = month_start.timestamp()
        month_end_ts = month_end.timestamp()
        # Some prefixes are prefixes of others ("ocbot_promo_" vs
        # "ocbot_promo_compare_", "ocbot_competitor_" vs "ocbot_competitor_v4_").
        # Checking the longest prefixes first attributes each file to exactly
        # one report type instead of also counting it for the shorter key.
        prefixes_longest_first = sorted(
            REPORT_PREFIXES.items(),
            key=lambda pair: len(pair[1]),
            reverse=True,
        )
        for path in root.iterdir():
            if not path.is_file() or path.is_symlink() or path.suffix.lower() != ".pptx":
                continue
            try:
                mtime = path.stat().st_mtime
            except OSError:
                continue
            if not (month_start_ts <= mtime < month_end_ts):
                continue
            owner = next(
                (
                    report_type
                    for report_type, prefix in prefixes_longest_first
                    if path.name.startswith(prefix)
                ),
                None,
            )
            # Skip files that belong to no defined type or to an unselected one.
            if owner not in counts:
                continue
            counts[owner] += 1
            sources[owner].add("filesystem")
            if latest_generated_at[owner] is None or mtime > latest_generated_at[owner].timestamp():
                latest_generated_at[owner] = datetime.fromtimestamp(mtime)
                latest_file_path[owner] = str(path)

    items = []
    for job in jobs:
        count = counts[job.report_type]
        exact_count = exact_counts[job.report_type]
        if exact_count > 0:
            status = "ready"
            status_label = "已產出"
            status_hint = "檔案參數與本期定義相符。"
        elif count > 0:
            status = "partial"
            status_label = "有其他版本"
            status_hint = "找到同類簡報,但參數或目標期別不完全相符。"
        else:
            status = "missing"
            status_label = "待排程補齊"
            status_hint = "尚未找到符合定義的檔案或 DB 紀錄。"
        items.append({
            "key": job.report_type,
            "label": job.label,
            "target_label": job.target_label,
            "count": count,
            "exact_count": exact_count,
            "ready": status == "ready",
            "has_other_versions": status == "partial",
            "status": status,
            "status_label": status_label,
            "status_hint": status_hint,
            "sources": sorted(sources[job.report_type]),
            "latest_generated_at": (
                latest_generated_at[job.report_type].strftime("%Y-%m-%d %H:%M")
                if latest_generated_at[job.report_type] else None
            ),
            "latest_file_path": latest_file_path[job.report_type],
            "latest_file_name": (
                os.path.basename(latest_file_path[job.report_type])
                if latest_file_path[job.report_type] else ""
            ),
            "expected_params": job.expected_params,
        })
    missing = [item for item in items if not item["ready"]]
    cadences = get_schedule_cadence_status(items)
    return {
        "enabled": is_ppt_auto_generation_enabled(),
        "items": items,
        "cadences": cadences,
        "cadence_summary": "、".join(cadence["schedule_text"] for cadence in cadences),
        "missing_report_types": [item["key"] for item in missing],
        "missing_count": len(missing),
        "ready_count": len(items) - len(missing),
        "total": len(items),
        "last_run": _LAST_RUN,
    }
|
||
|
||
|
||
def get_generation_run_history(
    *,
    month_start: datetime | None = None,
    month_end: datetime | None = None,
    limit: int = 24,
) -> list[dict]:
    """Read persisted generation runs, newest first, for display.

    Args:
        month_start: Optional inclusive lower bound on ``started_at``.
        month_end: Optional exclusive upper bound on ``started_at``.
        limit: Maximum rows; clamped to 1..100 (falsy values mean 24).

    Returns:
        A list of display-ready dicts; an empty list if the query fails.
    """
    bind: dict = {"limit": max(1, min(int(limit or 24), 100))}
    conditions = []
    # Only fixed column/operator text is interpolated; values are bound.
    for name, operator, bound in (
        ("month_start", ">=", month_start),
        ("month_end", "<", month_end),
    ):
        if bound is not None:
            conditions.append(f"started_at {operator} :{name}")
            bind[name] = bound
    where_sql = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    try:
        session = get_session()
        try:
            rows = session.execute(
                sa_text(
                    f"""
                    SELECT schedule_kind, report_type, target_label, status,
                           file_path, file_size, error_msg, started_at, finished_at
                    FROM ppt_generation_runs
                    {where_sql}
                    ORDER BY started_at DESC NULLS LAST
                    LIMIT :limit
                    """
                ),
                bind,
            ).fetchall()
        finally:
            session.close()
    except Exception:
        return []

    status_labels = {
        "ready": "已產出",
        "missing_file": "未落盤",
        "error": "失敗",
        "planned": "已規劃",
    }

    def _format_dt(value) -> str:
        # Datetime-like values are formatted; anything else is stringified.
        if hasattr(value, "strftime"):
            return value.strftime("%Y-%m-%d %H:%M")
        return str(value or "")

    history = []
    for row in rows:
        (
            raw_kind, raw_type, target_label, raw_status,
            raw_path, file_size, error_msg, started_at, finished_at,
        ) = row
        schedule_kind = raw_kind or "manual"
        report_type = raw_type or ""
        file_path = raw_path or ""
        status = raw_status or ""
        history.append({
            "schedule_kind": schedule_kind,
            "schedule_label": SCHEDULE_CADENCES.get(schedule_kind, {}).get("label", "手動"),
            "report_type": report_type,
            "report_label": REPORT_TYPE_LABELS.get(report_type, report_type or "未知"),
            "target_label": target_label or "",
            "status": status,
            "status_label": status_labels.get(status, status or "未知"),
            "file_name": os.path.basename(file_path) if file_path else "",
            "file_size_kb": round(float(file_size) / 1024, 1) if file_size else None,
            "error_msg": error_msg or "",
            "started_at": _format_dt(started_at),
            "finished_at": _format_dt(finished_at),
        })
    return history
|
||
|
||
|
||
def _generate_job(job: PPTAutoJob, *, force: bool = False, schedule_kind: str = "manual") -> tuple[str | None, int]:
    """Generate one deck through the bot routes with chat replies silenced.

    Args:
        job: The deck to generate.
        force: When True, matching active cache rows are expired first.
        schedule_kind: Any value other than "manual" turns on the two
            fast-fallback environment flags for the duration of the call.

    Returns:
        (path returned by the generator, number of cache rows expired).
    """
    from routes import openclaw_bot_routes as bot_routes

    real_send_message = getattr(bot_routes, "send_message", None)
    expired_rows = _expire_matching_ppt_cache(job) if force else 0
    use_fast_fallback = schedule_kind != "manual"
    fallback_flags = ("PPT_SCHEDULED_FAST_FALLBACK", "MCP_FAST_STATIC_FALLBACK")
    # Remember the prior values so they can be restored afterwards.
    saved_flags = {name: os.environ.get(name) for name in fallback_flags}

    def _silenced_send_message(*_args, **_kwargs):
        return None

    if real_send_message is not None:
        bot_routes.send_message = _silenced_send_message
    try:
        if use_fast_fallback:
            for name in fallback_flags:
                os.environ[name] = "true"
        deck_path = bot_routes._generate_ppt_cmd(
            job.sub_type,
            job.sub_arg,
            0,
            job.target_date,
            _reply_to=None,
        )
        return deck_path, expired_rows
    finally:
        if use_fast_fallback:
            for name, previous in saved_flags.items():
                if previous is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = previous
        if real_send_message is not None:
            bot_routes.send_message = real_send_message
|
||
|
||
|
||
def generate_defined_ppt_reports(
    *,
    report_types: Iterable[str] | str | None = None,
    schedule_kind: str = "manual",
    force: bool = False,
    dry_run: bool = False,
    max_jobs: int | None = None,
) -> dict:
    """Generate the selected decks sequentially and report per-job results.

    Args:
        report_types: Selection understood by ``build_defined_ppt_jobs``.
        schedule_kind: Cadence that triggered the run, or "manual".
        force: Run even when auto-generation is disabled, and expire matching
            cache rows before each job.
        dry_run: Only return the planned jobs; nothing is generated.
        max_jobs: Optional cap on the number of jobs (negative means zero).

    Returns:
        A result dict whose ``status`` is "disabled", "planned",
        "already_running" or "completed". Disabled and completed results are
        also remembered as the last run.
    """
    global _LAST_RUN

    if not force and not is_ppt_auto_generation_enabled():
        disabled = {
            "ok": False,
            "status": "disabled",
            "message": "PPT_AUTO_GENERATION_ENABLED=false",
            "jobs": [],
        }
        _LAST_RUN = disabled
        return disabled

    jobs = build_defined_ppt_jobs(report_types=report_types)
    if max_jobs is not None:
        job_cap = max(0, int(max_jobs))
        jobs = jobs[:job_cap]

    if dry_run:
        return {
            "ok": True,
            "status": "planned",
            "jobs": [asdict(job) for job in jobs],
        }

    # Non-blocking: a second caller is told a run is in progress instead of waiting.
    if not _RUN_LOCK.acquire(blocking=False):
        return {
            "ok": True,
            "status": "already_running",
            "message": "PPT auto-generation is already running.",
            "jobs": [],
            "last_run": _LAST_RUN,
        }

    started_at = datetime.now(TAIPEI_TZ)
    results = []
    try:
        for job in jobs:
            item = asdict(job)
            job_started_at = datetime.now(TAIPEI_TZ)
            try:
                path, invalidated_count = _generate_job(job, force=force, schedule_kind=schedule_kind)
                item["path"] = path
                item["cache_invalidated"] = invalidated_count
                item["exists"] = bool(path and os.path.exists(path))
                item["status"] = "ready" if item["exists"] else "missing_file"
                outcome = {"path": path}
            except Exception as exc:
                # One failing job must not stop the remaining jobs.
                item["status"] = "error"
                item["error"] = f"{type(exc).__name__}: {str(exc)[:220]}"
                outcome = {"error": item["error"]}
            _log_generation_run(
                job,
                schedule_kind=schedule_kind,
                status=item["status"],
                started_at=job_started_at,
                finished_at=datetime.now(TAIPEI_TZ),
                result_payload=item,
                **outcome,
            )
            results.append(item)

        finished_at = datetime.now(TAIPEI_TZ)
        ready_total = sum(1 for item in results if item.get("status") == "ready")
        error_total = sum(1 for item in results if item.get("status") == "error")
        completed = {
            "ok": True,
            "status": "completed",
            "started_at": started_at.strftime("%Y-%m-%d %H:%M:%S"),
            "finished_at": finished_at.strftime("%Y-%m-%d %H:%M:%S"),
            "duration_sec": round((finished_at - started_at).total_seconds(), 1),
            "jobs": results,
            "ready": ready_total,
            "errors": error_total,
        }
        _LAST_RUN = completed
        return completed
    finally:
        _RUN_LOCK.release()
|
||
|
||
|
||
def start_defined_ppt_generation_background(
    *,
    report_types: Sequence[str] | str | None = None,
    schedule_kind: str = "manual",
    force: bool = False,
) -> dict:
    """Start a generation run on a daemon thread and return immediately.

    Returns:
        An "already_running" dict when the run lock is currently held,
        otherwise a "queued" dict echoing the resolved report types and the
        schedule kind.
    """
    if _RUN_LOCK.locked():
        return {
            "ok": True,
            "status": "already_running",
            "message": "PPT auto-generation is already running.",
            "last_run": _LAST_RUN,
        }

    worker = threading.Thread(
        target=generate_defined_ppt_reports,
        kwargs={
            "report_types": report_types,
            "schedule_kind": schedule_kind,
            "force": force,
        },
        name="ppt-auto-generation",
        daemon=True,
    )
    worker.start()
    return {
        "ok": True,
        "status": "queued",
        "message": "PPT auto-generation queued.",
        "report_types": _parse_report_types(report_types),
        "schedule_kind": schedule_kind,
    }
|
||
|
||
|
||
def get_last_generation_status() -> dict | None:
    """Return the result of the most recent run in this process, or None."""
    return _LAST_RUN
|
||
|
||
|
||
def get_due_schedule_kinds(now: datetime | None = None) -> list[str]:
    """List the schedule cadences whose calendar gate is open on ``now``'s day.

    Args:
        now: Moment to evaluate; defaults to the current Taipei time. A
            timezone-aware value is converted to Taipei time first, so the
            weekday / day-of-month checks use the same local calendar as
            ``get_latest_schedule_occurrence``. A naive value is used as-is.

    Returns:
        Always "daily"; plus "weekly" on Mondays, "monthly" on day 1,
        "quarterly" on day 1 of Jan/Apr/Jul/Oct, "half_yearly" on day 1 of
        Jan/Jul, and "annual" on Jan 1.
    """
    current = now or datetime.now(TAIPEI_TZ)
    if current.tzinfo is not None:
        # e.g. 16:30 UTC on the 31st is already the 1st in Taipei.
        current = current.astimezone(TAIPEI_TZ)

    kinds = ["daily"]
    if current.weekday() == 0:
        kinds.append("weekly")
    if current.day == 1:
        kinds.append("monthly")
        if current.month in (1, 4, 7, 10):
            kinds.append("quarterly")
        if current.month in (1, 7):
            kinds.append("half_yearly")
        if current.month == 1:
            kinds.append("annual")
    return kinds
|
||
|
||
|
||
def _parse_cadence_time(kind: str) -> tuple[int, int]:
    """Return the (hour, minute) a cadence is scheduled for.

    Unknown cadences, or ones without a "time" entry, resolve to (0, 0).
    """
    cadence = SCHEDULE_CADENCES.get(kind) or {}
    hour_text, _, minute_text = cadence.get("time", "00:00").partition(":")
    return int(hour_text), int(minute_text)
|
||
|
||
|
||
def _combine_local(run_date: date, kind: str) -> datetime:
    """Return a naive datetime for ``run_date`` at the cadence's scheduled time."""
    hour, minute = _parse_cadence_time(kind)
    midnight = datetime.combine(run_date, datetime.min.time())
    return midnight.replace(hour=hour, minute=minute)
|
||
|
||
|
||
def _previous_month(year: int, month: int) -> tuple[int, int]:
|
||
if month == 1:
|
||
return year - 1, 12
|
||
return year, month - 1
|
||
|
||
|
||
def _latest_month_occurrence(current: datetime, kind: str, allowed_months: set[int] | None = None) -> datetime:
    """Find the most recent first-of-month slot at or before ``current``.

    Args:
        current: Naive reference time.
        kind: Cadence whose scheduled time of day is used.
        allowed_months: Months that qualify; None allows every month.

    Returns:
        The latest qualifying slot within the last 24 months, or the current
        month's slot if none is found.
    """
    cursor_year, cursor_month = current.year, current.month
    remaining = 24
    while remaining:
        eligible = allowed_months is None or cursor_month in allowed_months
        if eligible:
            slot = _combine_local(date(cursor_year, cursor_month, 1), kind)
            if slot <= current:
                return slot
        cursor_year, cursor_month = _previous_month(cursor_year, cursor_month)
        remaining -= 1
    return _combine_local(date(current.year, current.month, 1), kind)
|
||
|
||
|
||
def get_latest_schedule_occurrence(kind: str, now: datetime | None = None) -> datetime | None:
    """Return the most recent planned slot for a PPT schedule kind.

    The Python `schedule` package intentionally does not replay missed jobs.
    This helper lets the PPT pipeline detect a missed daily/weekly/monthly
    slot after long crawler or AI jobs release the scheduler loop.

    Args:
        kind: Schedule kind; must be a key of ``SCHEDULE_PROFILES``.
        now: Reference moment, defaulting to the current Taipei time. Aware
            values are converted to Taipei time and made naive; naive values
            are used unchanged.

    Returns:
        A naive datetime for the latest slot at or before *now*, or ``None``
        when *kind* is unknown or has no occurrence rule here.
    """
    if kind not in SCHEDULE_PROFILES:
        return None

    current = now or datetime.now(TAIPEI_TZ)
    if current.tzinfo is not None:
        current = current.astimezone(TAIPEI_TZ).replace(tzinfo=None)

    if kind in ("daily", "weekly"):
        if kind == "daily":
            anchor = current.date()
            step = timedelta(days=1)
        else:
            # Weekly slots are anchored on the Monday of the current week.
            anchor = current.date() - timedelta(days=current.weekday())
            step = timedelta(days=7)
        slot = _combine_local(anchor, kind)
        # If the slot in the current period is still ahead, use the previous one.
        return slot if slot <= current else slot - step

    # Month-based kinds fire on the first day of the listed months
    # (None means every month).
    month_filters = {
        "monthly": None,
        "quarterly": {1, 4, 7, 10},
        "half_yearly": {1, 7},
        "annual": {1},
    }
    if kind in month_filters:
        return _latest_month_occurrence(current, kind, month_filters[kind])
    return None
||
|
||
|
||
def _ready_report_types_since(jobs: Sequence[PPTAutoJob], since: datetime) -> set[str]:
|
||
"""Return report types that already have matching DB rows and files."""
|
||
if not jobs:
|
||
return set()
|
||
expected_params = {job.report_type: job.expected_params for job in jobs}
|
||
ready: set[str] = set()
|
||
try:
|
||
session = get_session()
|
||
try:
|
||
rows = session.execute(
|
||
sa_text(
|
||
"""
|
||
SELECT report_type, parameters, file_path
|
||
FROM ppt_reports
|
||
WHERE generated_at >= :since
|
||
"""
|
||
),
|
||
{"since": since},
|
||
).fetchall()
|
||
finally:
|
||
session.close()
|
||
except Exception:
|
||
return ready
|
||
|
||
for report_type, parameters, file_path in rows:
|
||
report_type = str(report_type or "")
|
||
if report_type not in expected_params or report_type in ready:
|
||
continue
|
||
if file_path and not os.path.exists(str(file_path)):
|
||
continue
|
||
if _params_match(_parse_cache_params(parameters), expected_params[report_type]):
|
||
ready.add(report_type)
|
||
return ready
|
||
|
||
|
||
def get_scheduled_ppt_catchup_plan(
    *,
    now: datetime | None = None,
    schedule_kinds: Iterable[str] | None = None,
) -> list[dict]:
    """Build a catch-up plan for missed periodic PPT generations.

    For each requested schedule kind (all keys of ``SCHEDULE_PROFILES`` when
    *schedule_kinds* is empty or ``None``) the latest planned slot is looked
    up and the kind's report types are split into those already ready since
    that slot and those still missing. Kinds with no profile or no slot are
    left out of the plan.

    Returns:
        One dict per planned kind with the kind, its label, the slot
        formatted as ``YYYY-MM-DD HH:MM``, the ready and missing report
        types, the labels of the missing ones, and a ``ready`` flag that is
        true when nothing is missing.
    """
    current = now or datetime.now(TAIPEI_TZ)
    plan: list[dict] = []

    for kind in list(schedule_kinds or SCHEDULE_PROFILES.keys()):
        profile = SCHEDULE_PROFILES.get(kind)
        occurrence = get_latest_schedule_occurrence(kind, current)
        if occurrence is None or not profile:
            continue

        jobs = build_defined_ppt_jobs(report_types=profile)
        ready_types = _ready_report_types_since(jobs, occurrence)
        missing_types = [
            job.report_type for job in jobs if job.report_type not in ready_types
        ]

        entry = {
            "schedule_kind": kind,
            "schedule_label": SCHEDULE_CADENCES.get(kind, {}).get("label", kind),
            "scheduled_at": occurrence.strftime("%Y-%m-%d %H:%M"),
            "ready_report_types": sorted(ready_types),
            "missing_report_types": missing_types,
            "missing_report_labels": [
                REPORT_TYPE_LABELS.get(name, name) for name in missing_types
            ],
            "ready": not missing_types,
        }
        plan.append(entry)

    return plan
||
|
||
|
||
def catch_up_scheduled_ppt_reports(
    *,
    now: datetime | None = None,
    force: bool = False,
    schedule_kinds: Iterable[str] | None = None,
) -> dict:
    """Generate missing scheduled PPT decks that were skipped by a blocked loop.

    Unless *force* is set, nothing runs while auto-generation is disabled and
    a ``"disabled"`` result is returned. Otherwise the catch-up plan is built
    and ``generate_defined_ppt_reports`` is called once per schedule kind that
    still has missing report types. The returned summary is also stored in
    the module-level ``_LAST_RUN``.

    Returns:
        A summary dict. Its status is ``"completed"`` when at least one kind
        was generated and ``"skipped"`` when nothing was missing.
    """
    global _LAST_RUN

    if not (force or is_ppt_auto_generation_enabled()):
        _LAST_RUN = {
            "ok": False,
            "status": "disabled",
            "message": "PPT_AUTO_GENERATION_ENABLED=false",
            "runs": [],
        }
        return _LAST_RUN

    plan = get_scheduled_ppt_catchup_plan(now=now, schedule_kinds=schedule_kinds)

    runs = []
    for item in plan:
        missing = item.get("missing_report_types") or []
        if missing:
            kind = item["schedule_kind"]
            run = generate_defined_ppt_reports(
                report_types=missing,
                schedule_kind=kind,
                force=force,
            )
            # Annotate the run with the catch-up context it was started for.
            run["schedule_kind"] = kind
            run["catchup_scheduled_at"] = item.get("scheduled_at")
            run["catchup_missing_report_types"] = missing
            runs.append(run)

    generated_kinds = [
        run.get("schedule_kind") for run in runs if run.get("schedule_kind")
    ]
    ready_total = sum(int(run.get("ready") or 0) for run in runs)
    error_total = sum(int(run.get("errors") or 0) for run in runs)

    result = {
        # all() over an empty sequence is True, so "no runs" counts as ok.
        "ok": all(run.get("ok", False) for run in runs),
        "status": "completed" if runs else "skipped",
        "plan": plan,
        "runs": runs,
        "generated_kinds": generated_kinds,
        "ready": ready_total,
        "errors": error_total,
    }
    _LAST_RUN = result
    return result
||
|
||
|
||
def start_scheduled_ppt_catchup_background(
    *,
    now: datetime | None = None,
    force: bool = False,
    schedule_kinds: Iterable[str] | None = None,
) -> dict:
    """Queue scheduled catch-up without blocking the scheduler main loop.

    If ``_RUN_LOCK`` is currently held, no thread is started and an
    ``"already_running"`` response carrying the last run record is returned.
    Otherwise ``catch_up_scheduled_ppt_reports`` is started on a daemon
    thread and a ``"queued"`` response is returned immediately.
    """
    if _RUN_LOCK.locked():
        return {
            "ok": True,
            "status": "already_running",
            "message": "PPT auto-generation is already running.",
            "last_run": _LAST_RUN,
        }

    # Materialise once so the worker and the response share the same list,
    # even when the caller passed a one-shot iterable.
    planned_kinds = list(schedule_kinds or SCHEDULE_PROFILES.keys())

    worker = threading.Thread(
        target=catch_up_scheduled_ppt_reports,
        kwargs={
            "now": now,
            "force": force,
            "schedule_kinds": planned_kinds,
        },
        name="ppt-auto-generation-catchup",
        daemon=True,
    )
    worker.start()

    return {
        "ok": True,
        "status": "queued",
        "message": "PPT scheduled catch-up queued.",
        "schedule_kinds": planned_kinds,
    }
||
|
||
|
||
def generate_scheduled_ppt_reports(
    *,
    schedule_kind: str | None = None,
    force: bool = False,
) -> dict:
    """Generate the decks for one schedule kind, or for every kind due now.

    When *schedule_kind* is given only that kind runs; otherwise the kinds
    come from ``get_due_schedule_kinds()``. A kind with no entry in
    ``SCHEDULE_PROFILES`` yields an ``"unknown_schedule_kind"`` run instead
    of a generation call.

    Returns:
        A summary dict with the per-kind runs, their combined ``ready`` and
        ``errors`` counts, and the jobs of all runs flattened into one list.
    """
    kinds = [schedule_kind] if schedule_kind else get_due_schedule_kinds()

    runs = []
    for kind in kinds:
        report_types = SCHEDULE_PROFILES.get(kind)
        if report_types:
            run = generate_defined_ppt_reports(
                report_types=report_types,
                schedule_kind=kind,
                force=force,
            )
        else:
            run = {
                "ok": False,
                "status": "unknown_schedule_kind",
                "schedule_kind": kind,
                "jobs": [],
            }
        runs.append(run)

    all_jobs = []
    for run in runs:
        all_jobs.extend(run.get("jobs", []))

    return {
        # all() over an empty sequence is True, so "no runs" counts as ok.
        "ok": all(run.get("ok", False) for run in runs),
        "status": "completed",
        "schedule_kinds": kinds,
        "runs": runs,
        "ready": sum(int(run.get("ready") or 0) for run in runs),
        "errors": sum(int(run.get("errors") or 0) for run in runs),
        "jobs": all_jobs,
    }