Files
ewoooc/services/ppt_auto_generation_service.py
OoO c420d48263
Some checks failed
CD Pipeline / deploy (push) Failing after 26s
Fix PPT auto generation and analytics fallbacks
2026-05-18 11:52:31 +08:00

327 lines
9.9 KiB
Python

"""
PPT auto-generation orchestration.
The observability page audits generated decks, but the scheduler previously
only ran the vision audit. This service fills that gap by materializing the
defined deck set before the audit window.
"""
from __future__ import annotations
import os
import threading
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, Sequence
from sqlalchemy import text as sa_text
from database.manager import get_session
# Fixed UTC+8 zone used for run timestamps and for the fallback target date.
TAIPEI_TZ = timezone(timedelta(hours=8))

# Report types this service knows about; also the default selection order.
DEFINED_REPORT_TYPES = (
    "daily",
    "weekly",
    "monthly",
    "strategy",
    "competitor",
    "promo",
)

# Display label per report type, used in the coverage summary items.
REPORT_TYPE_LABELS = {
    "daily": "每日日報",
    "weekly": "週報",
    "monthly": "月報",
    "strategy": "策略",
    "competitor": "競品",
    "promo": "促銷",
}

# File-name prefix that attributes a .pptx found on disk to a report type.
REPORT_PREFIXES = {
    "daily": "ocbot_daily_",
    "weekly": "ocbot_weekly_",
    "monthly": "ocbot_monthly_",
    "strategy": "ocbot_strategy_",
    "competitor": "ocbot_competitor_",
    "promo": "ocbot_promo_",
}

# Held while a generation run is in progress; callers acquire it non-blocking
# so an overlapping request is reported as "already_running" instead of waiting.
_RUN_LOCK = threading.Lock()

# Result dict of the latest completed (or disabled) run; None until one is set.
_LAST_RUN: dict | None = None
@dataclass(frozen=True)
class PPTAutoJob:
    """Immutable description of one deck the auto-generation run should build."""

    # Key of the report type (one of the defined report types).
    report_type: str
    # Human-readable label for the job.
    label: str
    # First argument passed to the bot's PPT command.
    sub_type: str
    # Second argument passed to the bot's PPT command (may be empty).
    sub_arg: str
    # Date the job is generated for, formatted as YYYY/MM/DD.
    target_date: str
def _truthy(value: str | None, default: bool = False) -> bool:
    """Interpret a flag string as a boolean.

    Args:
        value: Raw text (typically an environment variable) or ``None``.
        default: Result to use when ``value`` is ``None``.

    Returns:
        ``True`` when the trimmed, lower-cased text is one of ``1``, ``true``,
        ``yes`` or ``on``; ``False`` for any other text; ``default`` for ``None``.
    """
    if value is not None:
        normalised = value.strip().lower()
        return normalised in {"1", "true", "yes", "on"}
    return default
def is_ppt_auto_generation_enabled() -> bool:
    """Tell whether automatic deck generation is switched on.

    Reads the ``PPT_AUTO_GENERATION_ENABLED`` environment variable; the feature
    counts as enabled when the variable is unset.
    """
    flag = os.getenv("PPT_AUTO_GENERATION_ENABLED")
    return _truthy(flag, default=True)
def _parse_report_types(report_types: Iterable[str] | str | None) -> list[str]:
    """Resolve a report-type selection into an ordered, de-duplicated list.

    Args:
        report_types: ``None`` to read the comma-separated
            ``PPT_AUTO_REPORT_TYPES`` environment variable (defaulting to every
            defined type), a comma-separated string, or an iterable of names.

    Returns:
        The recognised names in first-seen order. The token ``all`` selects
        every defined type immediately, and a selection with no recognised
        name also falls back to every defined type.
    """
    if report_types is None:
        everything = ",".join(DEFINED_REPORT_TYPES)
        candidates = os.getenv("PPT_AUTO_REPORT_TYPES", everything).split(",")
    elif isinstance(report_types, str):
        candidates = report_types.split(",")
    else:
        candidates = list(report_types)

    chosen: list[str] = []
    for candidate in candidates:
        name = str(candidate or "").strip().lower()
        if name == "all":
            return list(DEFINED_REPORT_TYPES)
        # Skip unknown names and repeats; order of first appearance is kept.
        if name not in DEFINED_REPORT_TYPES or name in chosen:
            continue
        chosen.append(name)

    return chosen if chosen else list(DEFINED_REPORT_TYPES)
def _latest_sales_date() -> str | None:
    """Ask the bot routes module for its latest date, best-effort.

    The import happens lazily inside the function. Any failure, whether while
    importing or while calling ``latest_date()``, yields ``None`` so callers
    can fall back to their own default.
    """
    try:
        from routes.openclaw_bot_routes import latest_date

        found = latest_date()
    except Exception:
        found = None
    return found
def _normalise_date(value: str | None) -> str:
    """Return ``value`` as a zero-padded ``YYYY/MM/DD`` string.

    Dashes are accepted as separators. When ``value`` is empty or cannot be
    parsed, the result is yesterday's date in the Taipei time zone.
    """
    if value:
        candidate = value.strip().replace("-", "/")
        try:
            return datetime.strptime(candidate, "%Y/%m/%d").strftime("%Y/%m/%d")
        except ValueError:
            # Unparseable input: fall through to the default below.
            pass

    yesterday = datetime.now(TAIPEI_TZ) - timedelta(days=1)
    return yesterday.strftime("%Y/%m/%d")
def build_defined_ppt_jobs(
    *,
    latest_date: str | None = None,
    report_types: Iterable[str] | str | None = None,
) -> list[PPTAutoJob]:
    """Build the generation jobs for the selected report types.

    Args:
        latest_date: Target date (``YYYY/MM/DD`` or ``YYYY-MM-DD``). When
            omitted, the latest sales date is looked up, and if that is
            unavailable too the normaliser's default is used.
        report_types: Selection passed through the report-type parser.

    Returns:
        One ``PPTAutoJob`` per selected type, in selection order. Monthly-style
        jobs carry the target's ``YYYY/MM``; the promo job carries a 7-day
        range ending on the target date.
    """
    target = _normalise_date(latest_date or _latest_sales_date())
    anchor = datetime.strptime(target, "%Y/%m/%d")
    month_arg = anchor.strftime("%Y/%m")
    window_start = (anchor - timedelta(days=6)).strftime("%Y/%m/%d")

    # (label, sub_arg) per report type; the report type doubles as sub_type.
    specs = {
        "daily": ("每日日報", target),
        "weekly": ("週報", ""),
        "monthly": ("月報", month_arg),
        "strategy": ("策略(月)", f"monthly {month_arg}"),
        "competitor": ("競品(月)", "monthly"),
        "promo": ("促銷(近 7 日)", f"{window_start}-{target}"),
    }

    jobs: list[PPTAutoJob] = []
    for key in _parse_report_types(report_types):
        label, sub_arg = specs[key]
        jobs.append(PPTAutoJob(key, label, key, sub_arg, target))
    return jobs
def _count_reports_in_database(
    selected_types: list[str],
    month_start: datetime,
    month_end: datetime,
) -> dict[str, int]:
    """Count ``ppt_reports`` rows per selected type in ``[month_start, month_end)``.

    Best-effort: any failure is logged and whatever was counted so far
    (possibly nothing) is returned.
    """
    import logging

    # Same text the query has always used, spelled as joined literals so the
    # statement does not depend on source indentation.
    query = (
        "\n"
        "SELECT report_type, COUNT(*)\n"
        "FROM ppt_reports\n"
        "WHERE generated_at >= :month_start\n"
        "AND generated_at < :month_end\n"
        "GROUP BY report_type\n"
    )
    counts: dict[str, int] = {}
    try:
        session = get_session()
        try:
            rows = session.execute(
                sa_text(query),
                {"month_start": month_start, "month_end": month_end},
            ).fetchall()
        finally:
            session.close()
        for report_type, count in rows:
            if report_type in selected_types:
                counts[report_type] = max(counts.get(report_type, 0), int(count or 0))
    except Exception:
        # The database is only one of two sources; keep going without it but
        # leave a trace instead of failing silently.
        logging.getLogger(__name__).warning(
            "PPT coverage: database lookup failed", exc_info=True
        )
    return counts


def _count_reports_on_disk(
    selected_types: list[str],
    root: Path,
    month_start: datetime,
    month_end: datetime,
) -> dict[str, int]:
    """Count ``.pptx`` files per selected type with mtime in the window.

    Only regular, non-symlink files directly inside ``root`` are considered,
    and a file is attributed to a type by its file-name prefix.
    """
    import logging

    counts: dict[str, int] = {}
    if not root.is_dir():
        return counts

    window_start = month_start.timestamp()
    window_end = month_end.timestamp()
    try:
        entries = list(root.iterdir())
    except OSError:
        # Unreadable directory: report nothing found rather than crash.
        logging.getLogger(__name__).warning(
            "PPT coverage: cannot list %s", root, exc_info=True
        )
        return counts

    for path in entries:
        if not path.is_file() or path.is_symlink() or path.suffix.lower() != ".pptx":
            continue
        try:
            mtime = path.stat().st_mtime
        except OSError:
            # File vanished or is unreadable between listing and stat.
            continue
        if not (window_start <= mtime < window_end):
            continue
        for report_type in selected_types:
            if path.name.startswith(REPORT_PREFIXES[report_type]):
                counts[report_type] = counts.get(report_type, 0) + 1
    return counts


def get_defined_report_coverage(
    *,
    month_start: datetime,
    month_end: datetime,
    reports_dir: str | os.PathLike[str] | None = None,
    report_types: Iterable[str] | str | None = None,
) -> dict:
    """Summarise which selected report types have a deck in a time window.

    Two sources are consulted: rows of the ``ppt_reports`` table whose
    ``generated_at`` lies in ``[month_start, month_end)``, and ``.pptx`` files
    in the reports directory whose mtime lies in the same half-open window.
    Both lookups are best-effort; a failing source is logged and contributes
    nothing.

    The per-type ``count`` is the larger of the two source counts, not their
    sum, so decks visible in both sources are not counted twice.
    NOTE(review): this assumes database rows and files on disk describe the
    same decks - confirm against the code that writes ``ppt_reports``.

    Args:
        month_start: Inclusive start of the window.
        month_end: Exclusive end of the window.
        reports_dir: Directory to scan; defaults to the ``REPORTS_DIR``
            environment variable, then ``/app/data/reports``.
        report_types: Selection passed through the report-type parser.

    Returns:
        A dict with ``enabled``, ``items`` (one entry per selected type with
        ``key``, ``label``, ``count``, ``ready`` and sorted ``sources``),
        ``missing_report_types``, ``missing_count``, ``ready_count``,
        ``total`` and ``last_run``.
    """
    selected_types = _parse_report_types(report_types)

    db_counts = _count_reports_in_database(selected_types, month_start, month_end)
    root = Path(reports_dir or os.getenv("REPORTS_DIR", "/app/data/reports"))
    disk_counts = _count_reports_on_disk(selected_types, root, month_start, month_end)

    items = []
    for key in selected_types:
        in_db = db_counts.get(key, 0)
        on_disk = disk_counts.get(key, 0)
        sources = []
        if in_db:
            sources.append("database")
        if on_disk:
            sources.append("filesystem")
        count = max(in_db, on_disk)
        items.append(
            {
                "key": key,
                "label": REPORT_TYPE_LABELS[key],
                "count": count,
                "ready": count > 0,
                "sources": sources,
            }
        )

    missing = [item for item in items if not item["ready"]]
    return {
        "enabled": is_ppt_auto_generation_enabled(),
        "items": items,
        "missing_report_types": [item["key"] for item in missing],
        "missing_count": len(missing),
        "ready_count": len(items) - len(missing),
        "total": len(items),
        "last_run": _LAST_RUN,
    }
def _generate_job(job: PPTAutoJob) -> str | None:
    """Run the bot's PPT command for ``job`` with outgoing messages muted.

    While the command runs, ``send_message`` on the bot routes module (when the
    module has one) is swapped for a no-op and restored afterwards, even if the
    command raises.

    Returns:
        Whatever ``_generate_ppt_cmd`` returns; the caller treats it as the
        path of the generated file.
    """
    from routes import openclaw_bot_routes as bot_routes

    def _muted(*_args, **_kwargs):
        return None

    saved_sender = getattr(bot_routes, "send_message", None)
    patched = saved_sender is not None
    if patched:
        bot_routes.send_message = _muted
    try:
        # The literal 0 is the third positional argument of the command;
        # presumably a chat/user id placeholder - confirm against the callee.
        return bot_routes._generate_ppt_cmd(
            job.sub_type,
            job.sub_arg,
            0,
            job.target_date,
            _reply_to=None,
        )
    finally:
        if patched:
            bot_routes.send_message = saved_sender
def generate_defined_ppt_reports(
    *,
    report_types: Iterable[str] | str | None = None,
    force: bool = False,
    dry_run: bool = False,
    max_jobs: int | None = None,
) -> dict:
    """Generate the selected decks one after another and record the outcome.

    Args:
        report_types: Selection passed through to the job builder.
        force: Run even when the feature is disabled by environment.
        dry_run: Only return the planned jobs; nothing is generated and the
            last-run record is left untouched.
        max_jobs: Optional cap on the number of jobs (negative values mean 0).

    Returns:
        A status dict. ``status`` is ``disabled``, ``planned``,
        ``already_running`` or ``completed``; a completed run lists per-job
        results together with ``ready`` and ``errors`` totals. Disabled and
        completed results are also stored as the module's last-run record.
    """
    global _LAST_RUN

    # ``force`` short-circuits the environment check.
    if not (force or is_ppt_auto_generation_enabled()):
        disabled = {
            "ok": False,
            "status": "disabled",
            "message": "PPT_AUTO_GENERATION_ENABLED=false",
            "jobs": [],
        }
        _LAST_RUN = disabled
        return disabled

    planned = build_defined_ppt_jobs(report_types=report_types)
    if max_jobs is not None:
        limit = max(0, int(max_jobs))
        planned = planned[:limit]

    if dry_run:
        return {
            "ok": True,
            "status": "planned",
            "jobs": [asdict(job) for job in planned],
        }

    # Non-blocking: an overlapping request is answered instead of queued.
    if not _RUN_LOCK.acquire(blocking=False):
        return {
            "ok": True,
            "status": "already_running",
            "message": "PPT auto-generation is already running.",
            "jobs": [],
            "last_run": _LAST_RUN,
        }

    started_at = datetime.now(TAIPEI_TZ)
    outcomes = []
    try:
        for job in planned:
            entry = asdict(job)
            try:
                produced = _generate_job(job)
                entry["path"] = produced
                entry["exists"] = bool(produced and os.path.exists(produced))
                entry["status"] = "ready" if entry["exists"] else "missing_file"
            except Exception as exc:
                # One failing deck must not stop the remaining ones.
                entry["status"] = "error"
                entry["error"] = f"{type(exc).__name__}: {str(exc)[:220]}"
            outcomes.append(entry)

        finished_at = datetime.now(TAIPEI_TZ)
        elapsed = (finished_at - started_at).total_seconds()
        statuses = [entry.get("status") for entry in outcomes]
        summary = {
            "ok": True,
            "status": "completed",
            "started_at": started_at.strftime("%Y-%m-%d %H:%M:%S"),
            "finished_at": finished_at.strftime("%Y-%m-%d %H:%M:%S"),
            "duration_sec": round(elapsed, 1),
            "jobs": outcomes,
            "ready": statuses.count("ready"),
            "errors": statuses.count("error"),
        }
        _LAST_RUN = summary
        return summary
    finally:
        _RUN_LOCK.release()
def start_defined_ppt_generation_background(
    *,
    report_types: Sequence[str] | str | None = None,
    force: bool = False,
) -> dict:
    """Kick off deck generation on a daemon thread and return immediately.

    Args:
        report_types: Selection handed to the generation run.
        force: Forwarded to the generation run.

    Returns:
        ``already_running`` (with the last-run record) when the run lock is
        currently held, otherwise ``queued`` together with the resolved list
        of report types.
    """
    if _RUN_LOCK.locked():
        return {
            "ok": True,
            "status": "already_running",
            "message": "PPT auto-generation is already running.",
            "last_run": _LAST_RUN,
        }

    worker = threading.Thread(
        target=generate_defined_ppt_reports,
        kwargs={"report_types": report_types, "force": force},
        name="ppt-auto-generation",
        daemon=True,
    )
    worker.start()

    resolved_types = _parse_report_types(report_types)
    return {
        "ok": True,
        "status": "queued",
        "message": "PPT auto-generation queued.",
        "report_types": resolved_types,
    }
def get_last_generation_status() -> dict | None:
    """Return the record of the most recent generation attempt.

    The module-level record itself is returned (not a copy); it is ``None``
    until a run has completed or been rejected as disabled.
    """
    latest = _LAST_RUN
    return latest