Files
ewoooc/services/ppt_auto_generation_service.py
OoO cb02cd350f
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
feat: schedule full ppt auto generation cadence
2026-05-18 14:22:09 +08:00

724 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PPT auto-generation orchestration.
The observability page audits generated decks, but the scheduler previously
only ran the vision audit. This service fills that gap by materializing the
defined deck set before the audit window.
"""
from __future__ import annotations
import calendar
import json
import os
import threading
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, Sequence
from sqlalchemy import text as sa_text
from database.manager import get_session
# Fixed UTC+8 offset used for every "now" timestamp taken in this module.
TAIPEI_TZ = timezone(timedelta(hours=8))

# Every deck type this service knows how to request, in canonical order.
DEFINED_REPORT_TYPES = (
    "daily",
    "weekly",
    "monthly",
    "quarterly",
    "half_yearly",
    "annual",
    "ttm",
    "strategy",
    "competitor",
    "competitor_v4",
    "promo",
    "promo_compare",
    "forecast_pre_event",
    "vendor",
    "category",
    "customer",
    "new_product",
    "market_intel",
    "price_elasticity",
)

# Display label for each report type key.
REPORT_TYPE_LABELS = {
    "daily": "每日日報",
    "weekly": "週報",
    "monthly": "月報",
    "quarterly": "季報",
    "half_yearly": "半年報",
    "annual": "年報",
    "ttm": "TTM 滾動 12 月",
    "strategy": "策略",
    "competitor": "競品",
    "competitor_v4": "競業五力",
    "promo": "促銷",
    "promo_compare": "多活動比較",
    "forecast_pre_event": "檔期前瞻",
    "vendor": "廠商",
    "category": "品類",
    "customer": "客戶",
    "new_product": "新品追蹤",
    "market_intel": "市場情報",
    "price_elasticity": "價格甜蜜點",
}

# File-name prefix used to recognise a deck of each type on disk.
REPORT_PREFIXES = {
    report_type: f"ocbot_{report_type}_" for report_type in DEFINED_REPORT_TYPES
}

# Which report types are produced by each schedule cadence.
SCHEDULE_PROFILES = {
    "daily": ("daily",),
    "weekly": ("weekly", "market_intel"),
    "monthly": (
        "monthly",
        "strategy",
        "competitor",
        "competitor_v4",
        "promo",
        "promo_compare",
        "forecast_pre_event",
        "vendor",
        "category",
        "customer",
        "new_product",
        "price_elasticity",
        "ttm",
    ),
    "quarterly": ("quarterly",),
    "half_yearly": ("half_yearly",),
    "annual": ("annual",),
}

# Held for the duration of a generation run so only one run executes at a time.
_RUN_LOCK = threading.Lock()
# Summary dict of the most recent run (or "disabled" result); None until one happens.
_LAST_RUN: dict | None = None
@dataclass(frozen=True)
class PPTAutoJob:
    """Immutable description of one deck to generate.

    ``sub_type``, ``sub_arg`` and ``target_date`` are the arguments handed to
    the deck generator; ``expected_params`` is the parameter dict a stored
    report must contain to count as an exact match for this job.
    """

    # Key from DEFINED_REPORT_TYPES.
    report_type: str
    # Human-readable name of the deck.
    label: str
    # Generator sub-command.
    sub_type: str
    # Free-form argument string for the sub-command (may be empty).
    sub_arg: str
    # Reference date as "YYYY/MM/DD".
    target_date: str
    # Human-readable description of the period the deck covers.
    target_label: str
    # Parameters used to recognise an already generated deck.
    expected_params: dict
def _truthy(value: str | None, default: bool = False) -> bool:
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def is_ppt_auto_generation_enabled() -> bool:
    """Return whether auto-generation is switched on.

    Reads ``PPT_AUTO_GENERATION_ENABLED``; the feature is enabled when the
    variable is unset.
    """
    flag = os.getenv("PPT_AUTO_GENERATION_ENABLED")
    return _truthy(flag, default=True)
def _parse_report_types(report_types: Iterable[str] | str | None) -> list[str]:
    """Normalise a report-type selection into an ordered, de-duplicated list.

    ``None`` falls back to the comma-separated ``PPT_AUTO_REPORT_TYPES``
    environment variable (default: every defined type). A string is split on
    commas; any other iterable is used as-is. Unknown keys are ignored, the
    keyword "all" selects every defined type, and an empty result also
    expands to every defined type.
    """
    if report_types is None:
        env_value = os.getenv("PPT_AUTO_REPORT_TYPES", ",".join(DEFINED_REPORT_TYPES))
        candidates = env_value.split(",")
    elif isinstance(report_types, str):
        candidates = report_types.split(",")
    else:
        candidates = list(report_types)

    selected: list[str] = []
    for candidate in candidates:
        key = str(candidate or "").strip().lower()
        if key == "all":
            return list(DEFINED_REPORT_TYPES)
        if key not in DEFINED_REPORT_TYPES or key in selected:
            continue
        selected.append(key)

    if selected:
        return selected
    return list(DEFINED_REPORT_TYPES)
def _latest_sales_date() -> str | None:
try:
from routes.openclaw_bot_routes import latest_date
return latest_date()
except Exception:
return None
def _normalise_date(value: str | None) -> str:
if value:
cleaned = value.strip().replace("-", "/")
try:
dt = datetime.strptime(cleaned, "%Y/%m/%d")
return dt.strftime("%Y/%m/%d")
except ValueError:
pass
return (datetime.now(TAIPEI_TZ) - timedelta(days=1)).strftime("%Y/%m/%d")
def _target_datetime(latest_date: str | None = None) -> datetime:
    """Resolve the reference date for a run as a naive ``datetime``.

    Uses ``latest_date`` when given, otherwise the latest sales date; the
    value is normalised (with its yesterday fallback) before parsing.
    """
    source = latest_date or _latest_sales_date()
    return datetime.strptime(_normalise_date(source), "%Y/%m/%d")
def _month_bounds(target_dt: datetime) -> tuple[str, str, str]:
start = f"{target_dt.year}/{target_dt.month:02d}/01"
last_day = calendar.monthrange(target_dt.year, target_dt.month)[1]
end = f"{target_dt.year}/{target_dt.month:02d}/{last_day:02d}"
label = f"{target_dt.year}/{target_dt.month:02d}"
return start, end, label
def _quarter_label(target_dt: datetime) -> str:
quarter = ((target_dt.month - 1) // 3) + 1
return f"{target_dt.year} Q{quarter}"
def _half_year_label(target_dt: datetime) -> str:
half = 1 if target_dt.month <= 6 else 2
return f"{target_dt.year} H{half}"
def _ttm_label(target_dt: datetime) -> str:
ttm_start = target_dt.date().replace(day=1) - timedelta(days=365)
ttm_start = ttm_start.replace(day=1)
return f"TTM {ttm_start.strftime('%Y/%m/%d')[:7]}~{target_dt.strftime('%Y/%m/%d')[:7]}"
def _week_label(target_dt: datetime) -> str:
week_start = target_dt.date() - timedelta(days=target_dt.weekday())
return f"{week_start.strftime('%Y/%m/%d')} 起一週"
def _default_category() -> str:
return os.getenv("PPT_AUTO_DEFAULT_CATEGORY", "美妝保養").strip() or "美妝保養"
def _default_forecast_event(target_dt: datetime) -> tuple[str, str]:
events = [
("618", f"{target_dt.year}/06/18"),
("七夕", f"{target_dt.year}/08/19"),
("雙11", f"{target_dt.year}/11/11"),
("雙12", f"{target_dt.year}/12/12"),
("母親節", f"{target_dt.year + 1}/05/10"),
]
today = target_dt.date()
for name, date_str in events:
try:
if datetime.strptime(date_str, "%Y/%m/%d").date() >= today:
return name, date_str
except ValueError:
continue
return "雙11", f"{target_dt.year}/11/11"
def get_report_type_options() -> list[dict]:
    """List every defined report type as ``{"key", "label", "prefix"}`` dicts.

    A final pseudo-entry with key "all" is appended after the real types.
    """
    options = []
    for report_type in DEFINED_REPORT_TYPES:
        options.append(
            {
                "key": report_type,
                "label": REPORT_TYPE_LABELS[report_type],
                "prefix": REPORT_PREFIXES[report_type],
            }
        )
    options.append({"key": "all", "label": "全部", "prefix": "all"})
    return options
def build_defined_ppt_jobs(
    *,
    latest_date: str | None = None,
    report_types: Iterable[str] | str | None = None,
) -> list[PPTAutoJob]:
    """Build the job description for each selected report type.

    Args:
        latest_date: Reference date ("YYYY/MM/DD" or "YYYY-MM-DD"). When
            omitted, the latest sales date (or yesterday) is used.
        report_types: Selection understood by ``_parse_report_types``;
            ``None`` means the configured default set.

    Returns:
        One ``PPTAutoJob`` per selected type, in selection order. Every job
        uses the report type as its generator sub-command, the reference
        date as ``target_date``, and an ``expected_params`` dict that starts
        with ``"report_type"``.
    """
    target_dt = _target_datetime(latest_date)
    target = target_dt.strftime("%Y/%m/%d")
    year_text = str(target_dt.year)
    month_arg = target_dt.strftime("%Y/%m")
    month_start, month_end, month_label = _month_bounds(target_dt)
    quarter_label = _quarter_label(target_dt)
    half_label = _half_year_label(target_dt)
    ttm_label = _ttm_label(target_dt)
    week_label = _week_label(target_dt)
    category = _default_category()
    forecast_event, forecast_date = _default_forecast_event(target_dt)
    forecast_arg = f"{forecast_event} {forecast_date}"

    # "Promo" decks compare the trailing 7 days with the 7 days before them.
    promo_start = (target_dt - timedelta(days=6)).strftime("%Y/%m/%d")
    promo_prev_start = (target_dt - timedelta(days=13)).strftime("%Y/%m/%d")
    promo_prev_end = (target_dt - timedelta(days=7)).strftime("%Y/%m/%d")
    promo_arg = f"{promo_start}-{target}"
    promo_compare_arg = f"近7日:{promo_start}-{target}|前7日:{promo_prev_start}-{promo_prev_end}"

    strategy_label = f"{month_label} 月策略"
    competitor_label = f"{month_label} 月比較"

    def _job(report_type: str, label: str, sub_arg: str, target_label: str, extra_params: dict) -> PPTAutoJob:
        # All jobs share sub_type == report_type and the same target date.
        return PPTAutoJob(
            report_type=report_type,
            label=label,
            sub_type=report_type,
            sub_arg=sub_arg,
            target_date=target,
            target_label=target_label,
            expected_params={"report_type": report_type, **extra_params},
        )

    job_map = {
        "daily": _job("daily", "每日日報", target, target, {"date": target}),
        "weekly": _job("weekly", "週報", "", "最新 7 日", {}),
        "monthly": _job("monthly", "月報", month_arg, month_label, {"month": month_arg}),
        "quarterly": _job(
            "quarterly", "季報", quarter_label.replace(" ", "/"), quarter_label,
            {"period": quarter_label},
        ),
        "half_yearly": _job(
            "half_yearly", "半年報", half_label.replace(" ", "/"), half_label,
            {"period": half_label},
        ),
        "annual": _job("annual", "年報", year_text, year_text, {"period": year_text}),
        "ttm": _job("ttm", "TTM 滾動 12 月", "", ttm_label, {"period": ttm_label}),
        "strategy": _job(
            "strategy", "策略(月)", month_arg, strategy_label,
            {"start": month_start, "end": month_end, "label": strategy_label},
        ),
        "competitor": _job(
            "competitor", "競品(月)", "monthly", competitor_label,
            {"start": month_start, "end": target, "label": competitor_label},
        ),
        "competitor_v4": _job(
            "competitor_v4", "競業五力", "PChome", "PChome 近 30 天",
            {"competitor": "PChome"},
        ),
        "promo": _job(
            "promo", "促銷(近 7 日)", promo_arg, promo_arg,
            {"start": promo_start, "end": target, "label": promo_arg},
        ),
        "promo_compare": _job(
            "promo_compare", "多活動比較", promo_compare_arg, "近7日 vs 前7日",
            {"promos": promo_compare_arg},
        ),
        "forecast_pre_event": _job(
            "forecast_pre_event", "檔期前瞻", forecast_arg, forecast_arg,
            {"event": forecast_event, "date": forecast_date},
        ),
        "vendor": _job("vendor", "廠商", month_arg, month_label, {"period": month_label}),
        # Fix: the label previously lacked its closing parenthesis.
        "category": _job(
            "category", f"品類({category})", f"{category} 90", f"{category} 近 90 天",
            {"category": category, "days": 90},
        ),
        "customer": _job("customer", "客戶", month_arg, month_label, {"period": month_label}),
        "new_product": _job("new_product", "新品追蹤", "30", "近 30 天", {"days": 30}),
        "market_intel": _job("market_intel", "市場情報", "", week_label, {"week": week_label}),
        "price_elasticity": _job(
            "price_elasticity", "價格甜蜜點", "90", "全平台近 90 天",
            {"category": "all", "days": 90},
        ),
    }
    return [job_map[key] for key in _parse_report_types(report_types)]
def _parse_cache_params(raw: str | None) -> dict:
if not raw:
return {}
try:
data = json.loads(raw)
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _params_match(actual: dict, expected: dict) -> bool:
return all(actual.get(key) == value for key, value in expected.items())
def _ensure_generation_log_table() -> None:
    """Create the ``ppt_generation_runs`` table and its indexes if missing.

    All statements run in one session and are committed together. Any error
    rolls the session back and is otherwise ignored; the session is always
    closed.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS ppt_generation_runs (
            id SERIAL PRIMARY KEY,
            schedule_kind VARCHAR(40) NOT NULL,
            report_type VARCHAR(50) NOT NULL,
            target_label VARCHAR(160),
            status VARCHAR(30) NOT NULL,
            parameters_json TEXT,
            file_path VARCHAR(500),
            file_size INTEGER,
            error_msg TEXT,
            result_payload TEXT,
            started_at TIMESTAMP WITHOUT TIME ZONE,
            finished_at TIMESTAMP WITHOUT TIME ZONE
        )
        """
    # One single-column index per frequently filtered column, created in this order.
    indexed_columns = ("report_type", "started_at", "schedule_kind")
    statements = [create_table_sql]
    statements.extend(
        f"CREATE INDEX IF NOT EXISTS ix_ppt_generation_runs_{column} "
        f"ON ppt_generation_runs ({column})"
        for column in indexed_columns
    )

    session = get_session()
    try:
        for statement in statements:
            session.execute(sa_text(statement))
        session.commit()
    except Exception:
        session.rollback()
    finally:
        session.close()
def _log_generation_run(
    job: PPTAutoJob,
    *,
    schedule_kind: str,
    status: str,
    started_at: datetime,
    finished_at: datetime,
    path: str | None = None,
    error: str | None = None,
    result_payload: dict | None = None,
) -> None:
    """Insert one row describing a job attempt into ``ppt_generation_runs``.

    The table is created on demand first. Timestamps are stored without
    their tzinfo. ``file_size`` is recorded only when ``path`` exists on
    disk. This function never raises: every failure is swallowed.
    """
    insert_sql = """
        INSERT INTO ppt_generation_runs (
            schedule_kind, report_type, target_label, status,
            parameters_json, file_path, file_size, error_msg,
            result_payload, started_at, finished_at
        )
        VALUES (
            :schedule_kind, :report_type, :target_label, :status,
            :parameters_json, :file_path, :file_size, :error_msg,
            :result_payload, :started_at, :finished_at
        )
        """
    try:
        _ensure_generation_log_table()
        if path and os.path.exists(path):
            size_on_disk = os.path.getsize(path)
        else:
            size_on_disk = None
        session = get_session()
        try:
            row = {
                "schedule_kind": schedule_kind,
                "report_type": job.report_type,
                "target_label": job.target_label,
                "status": status,
                "parameters_json": json.dumps(
                    job.expected_params, ensure_ascii=False, sort_keys=True, default=str
                ),
                "file_path": path,
                "file_size": size_on_disk,
                "error_msg": error,
                "result_payload": json.dumps(
                    result_payload or {}, ensure_ascii=False, default=str
                ),
                # Columns are TIMESTAMP WITHOUT TIME ZONE, so drop the tzinfo.
                "started_at": started_at.replace(tzinfo=None),
                "finished_at": finished_at.replace(tzinfo=None),
            }
            session.execute(sa_text(insert_sql), row)
            session.commit()
        finally:
            session.close()
    except Exception:
        # Run logging must never block the main deck-generation flow.
        pass
def get_defined_report_coverage(
    *,
    month_start: datetime,
    month_end: datetime,
    reports_dir: str | os.PathLike[str] | None = None,
    report_types: Iterable[str] | str | None = None,
) -> dict:
    """Summarise which defined decks exist for the window ``[month_start, month_end)``.

    Two sources are combined:

    * rows of ``ppt_reports`` whose ``generated_at`` falls in the window; a
      row whose stored parameters contain the job's ``expected_params``
      counts as an *exact* match;
    * ``.pptx`` files directly inside ``reports_dir`` (default: the
      ``REPORTS_DIR`` environment variable, then ``/app/data/reports``)
      whose modification time falls in the window.

    A report type is "ready" only when it has at least one exact database
    match. Database errors are ignored so that the filesystem scan still
    contributes.

    Each file is attributed to exactly one report type: the one with the
    longest matching file-name prefix. Without this, a file such as
    ``ocbot_competitor_v4_*.pptx`` also matched the shorter
    ``ocbot_competitor_`` prefix and was counted for both types (likewise
    ``promo_compare`` vs ``promo``).

    Returns:
        Dict with ``enabled``, per-type ``items``, ``missing_report_types``,
        ``missing_count``, ``ready_count``, ``total`` and ``last_run``.
    """
    jobs = build_defined_ppt_jobs(report_types=report_types)
    selected_types = [job.report_type for job in jobs]
    counts = dict.fromkeys(selected_types, 0)
    exact_counts = dict.fromkeys(selected_types, 0)
    sources = {key: set() for key in selected_types}
    latest_generated_at = dict.fromkeys(selected_types)
    latest_file_path = dict.fromkeys(selected_types)
    expected_params = {job.report_type: job.expected_params for job in jobs}

    try:
        session = get_session()
        try:
            rows = session.execute(
                sa_text(
                    """
                    SELECT report_type, parameters, file_path, generated_at
                    FROM ppt_reports
                    WHERE generated_at >= :month_start
                    AND generated_at < :month_end
                    """
                ),
                {"month_start": month_start, "month_end": month_end},
            ).fetchall()
            for report_type, parameters, file_path, generated_at in rows:
                if report_type not in counts:
                    continue
                counts[report_type] += 1
                sources[report_type].add("database")
                newest = latest_generated_at[report_type]
                if newest is None or (generated_at and generated_at > newest):
                    latest_generated_at[report_type] = generated_at
                    latest_file_path[report_type] = file_path
                if _params_match(_parse_cache_params(parameters), expected_params[report_type]):
                    exact_counts[report_type] += 1
        finally:
            session.close()
    except Exception:
        # Best effort: coverage can still be derived from the filesystem.
        pass

    root = Path(reports_dir or os.getenv("REPORTS_DIR", "/app/data/reports"))
    if root.is_dir():
        month_start_ts = month_start.timestamp()
        month_end_ts = month_end.timestamp()
        # Longest prefix first, over ALL defined types, so a more specific
        # type always claims its own files even when it is not selected.
        prefixes_by_specificity = sorted(
            REPORT_PREFIXES.items(), key=lambda pair: len(pair[1]), reverse=True
        )
        for path in root.iterdir():
            if not path.is_file() or path.is_symlink() or path.suffix.lower() != ".pptx":
                continue
            try:
                mtime = path.stat().st_mtime
            except OSError:
                continue
            if not (month_start_ts <= mtime < month_end_ts):
                continue
            report_type = next(
                (key for key, prefix in prefixes_by_specificity if path.name.startswith(prefix)),
                None,
            )
            if report_type not in counts:
                continue
            counts[report_type] += 1
            sources[report_type].add("filesystem")
            newest = latest_generated_at[report_type]
            if newest is None or mtime > newest.timestamp():
                latest_generated_at[report_type] = datetime.fromtimestamp(mtime)
                latest_file_path[report_type] = str(path)

    items = [
        {
            "key": job.report_type,
            "label": job.label,
            "target_label": job.target_label,
            "count": counts[job.report_type],
            "exact_count": exact_counts[job.report_type],
            "ready": exact_counts[job.report_type] > 0,
            "has_other_versions": counts[job.report_type] > 0 and exact_counts[job.report_type] == 0,
            "sources": sorted(sources[job.report_type]),
            "latest_generated_at": (
                latest_generated_at[job.report_type].strftime("%Y-%m-%d %H:%M")
                if latest_generated_at[job.report_type] else None
            ),
            "latest_file_path": latest_file_path[job.report_type],
            "expected_params": job.expected_params,
        }
        for job in jobs
    ]
    missing = [item for item in items if not item["ready"]]
    return {
        "enabled": is_ppt_auto_generation_enabled(),
        "items": items,
        "missing_report_types": [item["key"] for item in missing],
        "missing_count": len(missing),
        "ready_count": len(items) - len(missing),
        "total": len(items),
        "last_run": _LAST_RUN,
    }
def _generate_job(job: PPTAutoJob) -> str | None:
    """Run the bot's deck generator for ``job`` and return its result.

    While the generator runs, ``send_message`` on the bot routes module (if
    it exists) is swapped for a no-op and restored afterwards, even when the
    generator raises.

    NOTE(review): the swap mutates a module attribute, so it is visible to
    any other code using that module at the same time.
    """
    from routes import openclaw_bot_routes as bot_routes

    saved_sender = getattr(bot_routes, "send_message", None)
    should_patch = saved_sender is not None

    def _silenced(*_args, **_kwargs):
        return None

    if should_patch:
        bot_routes.send_message = _silenced
    try:
        # The literal 0 is passed through as-is; presumably a chat/user id
        # placeholder - confirm against _generate_ppt_cmd's signature.
        return bot_routes._generate_ppt_cmd(
            job.sub_type,
            job.sub_arg,
            0,
            job.target_date,
            _reply_to=None,
        )
    finally:
        if should_patch:
            bot_routes.send_message = saved_sender
def generate_defined_ppt_reports(
    *,
    report_types: Iterable[str] | str | None = None,
    schedule_kind: str = "manual",
    force: bool = False,
    dry_run: bool = False,
    max_jobs: int | None = None,
) -> dict:
    """Generate the selected decks one after another and report the outcome.

    Args:
        report_types: Selection passed to ``build_defined_ppt_jobs``.
        schedule_kind: Tag stored with each logged job attempt.
        force: Run even when auto-generation is disabled.
        dry_run: Only return the planned jobs; nothing is generated.
        max_jobs: Keep at most this many jobs (negative values mean zero).

    Returns:
        A summary dict whose ``status`` is "disabled", "planned",
        "already_running" or "completed". Only the "disabled" and
        "completed" results are remembered as the last run.

    A single job failure is recorded on that job and does not stop the run.
    """
    global _LAST_RUN

    if not (force or is_ppt_auto_generation_enabled()):
        disabled = {
            "ok": False,
            "status": "disabled",
            "message": "PPT_AUTO_GENERATION_ENABLED=false",
            "jobs": [],
        }
        _LAST_RUN = disabled
        return disabled

    planned_jobs = build_defined_ppt_jobs(report_types=report_types)
    if max_jobs is not None:
        limit = max(0, int(max_jobs))
        planned_jobs = planned_jobs[:limit]

    if dry_run:
        return {
            "ok": True,
            "status": "planned",
            "jobs": [asdict(job) for job in planned_jobs],
        }

    # Non-blocking: a concurrent caller is told a run is in progress.
    if not _RUN_LOCK.acquire(blocking=False):
        return {
            "ok": True,
            "status": "already_running",
            "message": "PPT auto-generation is already running.",
            "jobs": [],
            "last_run": _LAST_RUN,
        }

    run_started = datetime.now(TAIPEI_TZ)
    outcomes = []
    try:
        for job in planned_jobs:
            record = asdict(job)
            job_started = datetime.now(TAIPEI_TZ)
            try:
                generated_path = _generate_job(job)
                record["path"] = generated_path
                record["exists"] = bool(generated_path and os.path.exists(generated_path))
                record["status"] = "ready" if record["exists"] else "missing_file"
                _log_generation_run(
                    job,
                    schedule_kind=schedule_kind,
                    status=record["status"],
                    path=generated_path,
                    started_at=job_started,
                    finished_at=datetime.now(TAIPEI_TZ),
                    result_payload=record,
                )
            except Exception as exc:
                record["status"] = "error"
                # Keep the stored message short.
                record["error"] = f"{type(exc).__name__}: {str(exc)[:220]}"
                _log_generation_run(
                    job,
                    schedule_kind=schedule_kind,
                    status="error",
                    error=record["error"],
                    started_at=job_started,
                    finished_at=datetime.now(TAIPEI_TZ),
                    result_payload=record,
                )
            outcomes.append(record)

        run_finished = datetime.now(TAIPEI_TZ)
        statuses = [record.get("status") for record in outcomes]
        summary = {
            "ok": True,
            "status": "completed",
            "started_at": run_started.strftime("%Y-%m-%d %H:%M:%S"),
            "finished_at": run_finished.strftime("%Y-%m-%d %H:%M:%S"),
            "duration_sec": round((run_finished - run_started).total_seconds(), 1),
            "jobs": outcomes,
            "ready": statuses.count("ready"),
            "errors": statuses.count("error"),
        }
        _LAST_RUN = summary
        return summary
    finally:
        _RUN_LOCK.release()
def start_defined_ppt_generation_background(
    *,
    report_types: Sequence[str] | str | None = None,
    schedule_kind: str = "manual",
    force: bool = False,
) -> dict:
    """Start a generation run on a daemon thread and return immediately.

    Returns an "already_running" result without starting a thread when the
    run lock is currently held; otherwise a "queued" result listing the
    resolved report types.
    """
    if _RUN_LOCK.locked():
        return {
            "ok": True,
            "status": "already_running",
            "message": "PPT auto-generation is already running.",
            "last_run": _LAST_RUN,
        }

    def _worker():
        generate_defined_ppt_reports(
            report_types=report_types,
            schedule_kind=schedule_kind,
            force=force,
        )

    worker = threading.Thread(target=_worker, name="ppt-auto-generation", daemon=True)
    worker.start()

    queued = {
        "ok": True,
        "status": "queued",
        "message": "PPT auto-generation queued.",
        "report_types": _parse_report_types(report_types),
        "schedule_kind": schedule_kind,
    }
    return queued
def get_last_generation_status() -> dict | None:
    """Return the remembered result of the most recent run, or ``None`` if there is none."""
    last_run = _LAST_RUN
    return last_run
def get_due_schedule_kinds(now: datetime | None = None) -> list[str]:
    """List the schedule cadences that are due at ``now`` (default: Taipei time now).

    "daily" is always due; "weekly" on Mondays; "monthly" on the 1st of a
    month; on the 1st of January/April/July/October also "quarterly"; on the
    1st of January/July also "half_yearly"; on January 1st also "annual".
    """
    current = now or datetime.now(TAIPEI_TZ)
    due = ["daily"]
    if current.weekday() == 0:
        due.append("weekly")
    # Every remaining cadence only fires on the first day of a month.
    if current.day != 1:
        return due
    due.append("monthly")
    month = current.month
    if month in (1, 4, 7, 10):
        due.append("quarterly")
    if month in (1, 7):
        due.append("half_yearly")
    if month == 1:
        due.append("annual")
    return due
def generate_scheduled_ppt_reports(
    *,
    schedule_kind: str | None = None,
    force: bool = False,
) -> dict:
    """Run generation for one schedule cadence, or for every cadence due now.

    Args:
        schedule_kind: A key of ``SCHEDULE_PROFILES``. When omitted, the
            cadences returned by ``get_due_schedule_kinds()`` are run in order.
        force: Forwarded to ``generate_defined_ppt_reports``.

    Returns:
        An aggregate dict with the per-cadence ``runs``, summed ``ready`` and
        ``errors`` counts and the flattened ``jobs`` list. An unknown cadence
        produces an "unknown_schedule_kind" run entry and makes ``ok`` false.
    """
    if schedule_kind:
        kinds = [schedule_kind]
    else:
        kinds = get_due_schedule_kinds()

    runs = []
    for kind in kinds:
        profile = SCHEDULE_PROFILES.get(kind)
        if profile:
            run = generate_defined_ppt_reports(
                report_types=profile,
                schedule_kind=kind,
                force=force,
            )
        else:
            run = {
                "ok": False,
                "status": "unknown_schedule_kind",
                "schedule_kind": kind,
                "jobs": [],
            }
        runs.append(run)

    all_ok = all(run.get("ok", False) for run in runs) if runs else True
    ready_total = sum(int(run.get("ready") or 0) for run in runs)
    error_total = sum(int(run.get("errors") or 0) for run in runs)
    flattened_jobs = [job for run in runs for job in run.get("jobs", [])]
    return {
        "ok": all_ok,
        "status": "completed",
        "schedule_kinds": kinds,
        "runs": runs,
        "ready": ready_total,
        "errors": error_total,
        "jobs": flattened_jobs,
    }