fix(ea): execute Phase 2 B-series data quality and gate improvements
All checks were successful
CD Pipeline / deploy (push) Successful in 2m40s
All checks were successful
CD Pipeline / deploy (push) Successful in 2m40s
- B1 & B2: Updated SQL column names from 銷售金額 to 總業績 in openclaw_strategist_service.py and chart_generator_service.py - B3: Removed bare except statements in DB fetchers to raise errors instead of failing silently - B4: Implemented freshness gate (MAX(snapshot_date) < CURRENT_DATE - 2) in daily_sales_snapshot to prevent generating stale reports and send data stalled alerts - B5: Replaced hardcoded 45.0 system load percentage with actual psutil CPU metric
This commit is contained in:
@@ -129,7 +129,7 @@ def _fetch_daily_revenue(days: int = 30) -> List[Dict]:
|
||||
try:
|
||||
rows = session.execute(text(f"""
|
||||
SELECT snapshot_date::date AS dt,
|
||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
||||
SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||
FROM daily_sales_snapshot
|
||||
WHERE snapshot_date::date >= CURRENT_DATE - {days}
|
||||
GROUP BY dt ORDER BY dt
|
||||
@@ -150,7 +150,7 @@ def _fetch_category_revenue(days: int = 7) -> List[Dict]:
|
||||
try:
|
||||
rows = session.execute(text(f"""
|
||||
SELECT p.category,
|
||||
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue
|
||||
SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue
|
||||
FROM daily_sales_snapshot s
|
||||
JOIN products p ON p.name = s."商品名稱"
|
||||
WHERE s.snapshot_date::date >= CURRENT_DATE - {days}
|
||||
@@ -175,7 +175,7 @@ def _fetch_monthly_revenue(months: int = 6) -> List[Dict]:
|
||||
try:
|
||||
rows = session.execute(text(f"""
|
||||
SELECT DATE_TRUNC('month', snapshot_date)::date AS mon,
|
||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
||||
SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||
FROM daily_sales_snapshot
|
||||
WHERE snapshot_date >= NOW() - INTERVAL '{months} months'
|
||||
GROUP BY mon ORDER BY mon
|
||||
|
||||
@@ -879,7 +879,11 @@ class ElephantAlphaAutonomousEngine:
|
||||
session.close()
|
||||
|
||||
def _get_system_load_percentage(self) -> float:
|
||||
return 45.0
|
||||
try:
|
||||
import psutil
|
||||
return float(psutil.cpu_percent(interval=0.1))
|
||||
except ImportError:
|
||||
return min(90.0, float(self._get_action_queue_size() * 5.0))
|
||||
|
||||
@staticmethod
|
||||
async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs):
|
||||
|
||||
@@ -29,7 +29,7 @@ from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from database.manager import get_session
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import bindparam, text
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -105,10 +105,16 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
|
||||
"""近 N 天業績彙總(本期 / 前期 對比)"""
|
||||
session = get_session()
|
||||
try:
|
||||
max_date_row = session.execute(text("SELECT MAX(snapshot_date::date) FROM daily_sales_snapshot")).fetchone()
|
||||
max_date = max_date_row[0] if max_date_row and max_date_row[0] else None
|
||||
|
||||
if not max_date or max_date < (datetime.now().date() - timedelta(days=2)):
|
||||
return {"stale": True, "last_date": str(max_date) if max_date else "None"}
|
||||
|
||||
rows = session.execute(text("""
|
||||
SELECT
|
||||
snapshot_date::date AS dt,
|
||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
|
||||
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
|
||||
COUNT(DISTINCT "商品ID") AS sku_count
|
||||
FROM daily_sales_snapshot
|
||||
WHERE snapshot_date::date >= CURRENT_DATE - :days
|
||||
@@ -131,8 +137,8 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
|
||||
"wow_pct": round(wow, 1),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 業績數據讀取失敗: %s", e)
|
||||
return {}
|
||||
logger.error("[OpenClaw] 業績數據讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -169,8 +175,8 @@ def _fetch_top_threats(limit: int = 10) -> List[Dict]:
|
||||
})
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 威脅數據讀取失敗: %s", e)
|
||||
return []
|
||||
logger.error("[OpenClaw] 威脅數據讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -193,8 +199,8 @@ def _fetch_top_recommendations(limit: int = 10) -> List[Dict]:
|
||||
r
|
||||
)) for r in rows]
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 建議數據讀取失敗: %s", e)
|
||||
return []
|
||||
logger.error("[OpenClaw] 建議數據讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -205,7 +211,7 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
|
||||
try:
|
||||
rows = session.execute(text("""
|
||||
SELECT p.category,
|
||||
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue,
|
||||
SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue,
|
||||
COUNT(DISTINCT p.i_code) AS sku_count
|
||||
FROM daily_sales_snapshot s
|
||||
JOIN products p ON p.name = s."商品名稱"
|
||||
@@ -218,8 +224,8 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
|
||||
return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
|
||||
for r in rows]
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 品類數據讀取失敗: %s", e)
|
||||
return []
|
||||
logger.error("[OpenClaw] 品類數據讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -251,8 +257,8 @@ def _fetch_competitor_summary() -> Dict[str, Any]:
|
||||
}
|
||||
return {}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 競品概況讀取失敗: %s", e)
|
||||
return {}
|
||||
logger.error("[OpenClaw] 競品概況讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -302,6 +308,239 @@ def _save_to_ai_insights(
|
||||
session.close()
|
||||
|
||||
|
||||
def _find_existing_weekly_strategy(
|
||||
period: str,
|
||||
sent_only: bool = False,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""查詢同一週期最新已啟用週報(不重複生成)。
|
||||
|
||||
`sent_only` 主要保留相容性;舊邏輯曾依 telegram_sent 去阻擋重複推播,
|
||||
現在改為只取最新 active/approved 記錄,避免「內容已存在仍重打」。
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
row = session.execute(text("""
|
||||
SELECT id, created_at
|
||||
FROM ai_insights
|
||||
WHERE insight_type = 'weekly_strategy'
|
||||
AND created_by = 'openclaw'
|
||||
AND period = :period
|
||||
AND status IN ('active', 'approved')
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
"""), {"period": period}).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return {"id": row[0], "created_at": row[1]}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 週報去重查詢失敗 period=%s: %s", period, e)
|
||||
return None
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
def _load_weekly_strategy_payload(period: str) -> Optional[Dict[str, Any]]:
|
||||
"""載入同一週期最新已啟用週報正文與 metadata(供重用/直接回傳)。"""
|
||||
session = get_session()
|
||||
try:
|
||||
row = session.execute(text("""
|
||||
SELECT id, content, metadata_json, created_at
|
||||
FROM ai_insights
|
||||
WHERE insight_type = 'weekly_strategy'
|
||||
AND created_by = 'openclaw'
|
||||
AND period = :period
|
||||
AND status IN ('active', 'approved')
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
"""), {"period": period}).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
meta = _normalize_weekly_strategy_metadata(row[2])
|
||||
return {"id": row[0], "content": row[1], "metadata": meta, "created_at": row[3]}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 週報載入失敗 period=%s: %s", period, e)
|
||||
return None
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
def _normalize_weekly_strategy_metadata(raw_meta: Any) -> Dict[str, Any]:
|
||||
"""將 ai_insights metadata 轉成 dict,並補入預設欄位避免型別錯誤。"""
|
||||
meta = raw_meta or {}
|
||||
if isinstance(meta, str):
|
||||
try:
|
||||
meta = json.loads(meta)
|
||||
except Exception:
|
||||
meta = {}
|
||||
if not isinstance(meta, dict):
|
||||
meta = {}
|
||||
if "telegram_sent" not in meta:
|
||||
meta["telegram_sent"] = False
|
||||
if "telegram_sending" not in meta:
|
||||
meta["telegram_sending"] = False
|
||||
return meta
|
||||
|
||||
|
||||
def _set_weekly_strategy_metadata(insight_id: int, metadata: Dict[str, Any]) -> bool:
|
||||
"""以 metadata 全量覆寫指定週報記錄,並回傳是否寫入成功。"""
|
||||
if not insight_id:
|
||||
return False
|
||||
session = get_session()
|
||||
try:
|
||||
session.execute(
|
||||
text("""
|
||||
UPDATE ai_insights
|
||||
SET metadata_json = :metadata
|
||||
WHERE id = :id
|
||||
"""),
|
||||
{"id": insight_id, "metadata": json.dumps(metadata, ensure_ascii=False)},
|
||||
)
|
||||
session.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 更新週報 metadata 失敗 insight_id=%s: %s", insight_id, e)
|
||||
session.rollback()
|
||||
return False
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
def _set_weekly_strategy_telegram_locked(insight_id: int, *, telegram_sent: Optional[bool] = None,
|
||||
telegram_sending: Optional[bool] = None, sent_at: Optional[datetime] = None) -> bool:
|
||||
"""更新既有週報的發送狀態欄位(telegram_sent / telegram_sending)。"""
|
||||
if not insight_id:
|
||||
return False
|
||||
session = get_session()
|
||||
try:
|
||||
row = session.execute(
|
||||
text("SELECT metadata_json FROM ai_insights WHERE id = :id"),
|
||||
{"id": insight_id},
|
||||
).fetchone()
|
||||
if not row:
|
||||
return False
|
||||
|
||||
meta = _normalize_weekly_strategy_metadata(row[0])
|
||||
if telegram_sent is not None:
|
||||
meta["telegram_sent"] = bool(telegram_sent)
|
||||
if telegram_sending is not None:
|
||||
meta["telegram_sending"] = bool(telegram_sending)
|
||||
if sent_at is None and telegram_sent:
|
||||
meta.pop("telegram_sent_at", None)
|
||||
elif sent_at is not None:
|
||||
meta["telegram_sent_at"] = sent_at.isoformat()
|
||||
|
||||
session.execute(
|
||||
text("""
|
||||
UPDATE ai_insights
|
||||
SET metadata_json = :metadata
|
||||
WHERE id = :id
|
||||
"""),
|
||||
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
|
||||
)
|
||||
session.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 更新週報 telegram metadata 失敗 insight_id=%s: %s", insight_id, e)
|
||||
session.rollback()
|
||||
return False
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
def _acquire_weekly_strategy_send_lock(insight_id: int) -> bool:
|
||||
"""嘗試取得週報 Telegram 發送鎖。
|
||||
|
||||
若該筆已標記發送中或已發送,回傳 False。
|
||||
"""
|
||||
if not insight_id:
|
||||
return False
|
||||
session = get_session()
|
||||
try:
|
||||
row = session.execute(
|
||||
text("SELECT metadata_json FROM ai_insights WHERE id = :id FOR UPDATE"),
|
||||
{"id": insight_id},
|
||||
).fetchone()
|
||||
if not row:
|
||||
return False
|
||||
|
||||
meta = _normalize_weekly_strategy_metadata(row[0])
|
||||
if bool(meta.get("telegram_sending")) or bool(meta.get("telegram_sent")):
|
||||
return False
|
||||
|
||||
meta["telegram_sending"] = True
|
||||
meta["telegram_sent"] = False
|
||||
session.execute(
|
||||
text("""
|
||||
UPDATE ai_insights
|
||||
SET metadata_json = :metadata
|
||||
WHERE id = :id
|
||||
"""),
|
||||
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
|
||||
)
|
||||
session.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 取得週報 telegram 發送鎖失敗 insight_id=%s: %s", insight_id, e)
|
||||
session.rollback()
|
||||
return False
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
def _set_weekly_strategy_telegram_sent(insight_id: int) -> None:
|
||||
"""更新已儲存週報的 telegram_sent 狀態,避免再次重複發送。"""
|
||||
_set_weekly_strategy_telegram_locked(
|
||||
insight_id,
|
||||
telegram_sending=False,
|
||||
telegram_sent=True,
|
||||
sent_at=datetime.now(),
|
||||
)
|
||||
|
||||
|
||||
def _consolidate_weekly_strategy_records(period: str) -> Dict[str, int]:
|
||||
"""同一週保留最新一筆,將舊重複紀錄標示為 superseded(保留內容)。"""
|
||||
session = get_session()
|
||||
kept_id = None
|
||||
superseded_count = 0
|
||||
total_count = 0
|
||||
try:
|
||||
rows = session.execute(text("""
|
||||
SELECT id, created_at
|
||||
FROM ai_insights
|
||||
WHERE insight_type = 'weekly_strategy'
|
||||
AND created_by = 'openclaw'
|
||||
AND period = :period
|
||||
ORDER BY created_at DESC, id DESC
|
||||
"""), {"period": period}).fetchall()
|
||||
total_count = len(rows)
|
||||
if total_count <= 1:
|
||||
return {"period": period, "total_count": total_count, "kept_id": None, "superseded_count": 0}
|
||||
|
||||
kept_id = rows[0][0]
|
||||
old_ids = [int(r[0]) for r in rows[1:]]
|
||||
if old_ids:
|
||||
res = session.execute(text("""
|
||||
UPDATE ai_insights
|
||||
SET status = 'superseded'
|
||||
WHERE id IN :ids
|
||||
AND status IN ('active', 'approved')
|
||||
""").bindparams(bindparam("ids", expanding=True)), {"ids": old_ids})
|
||||
superseded_count = int(getattr(res, "rowcount", 0) or 0)
|
||||
session.commit()
|
||||
return {
|
||||
"period": period,
|
||||
"total_count": total_count,
|
||||
"kept_id": kept_id,
|
||||
"superseded_count": superseded_count,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 週報 dedupe 失敗 period=%s: %s", period, e)
|
||||
session.rollback()
|
||||
return {"period": period, "total_count": total_count, "kept_id": kept_id, "superseded_count": superseded_count}
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None:
|
||||
"""將 AI 建議的行動項目寫入 action_plans"""
|
||||
if not actions:
|
||||
@@ -390,7 +629,8 @@ def _call_nvidia_nim(system_prompt: str, user_prompt: str, temperature: float =
|
||||
# Telegram 推播
|
||||
# ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> None:
|
||||
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> bool:
|
||||
"""發送週報到 Telegram。成功時回傳 True。"""
|
||||
try:
|
||||
from services.telegram_templates import report as tpl_report, _send_telegram_raw
|
||||
|
||||
@@ -401,8 +641,10 @@ def _send_strategy_telegram(title: str, report_type: str, period: str, content:
|
||||
for i, chunk in enumerate(chunks):
|
||||
msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk
|
||||
_send_telegram_raw(msg)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("[OpenClaw] Telegram 推播失敗: %s", e)
|
||||
return False
|
||||
|
||||
|
||||
def _split_message(text: str, max_len: int = 3800) -> List[str]:
|
||||
@@ -422,6 +664,7 @@ def _split_message(text: str, max_len: int = 3800) -> List[str]:
|
||||
def generate_weekly_strategy_report(
|
||||
context: Optional[Any] = None,
|
||||
force_tg_alert: bool = False,
|
||||
force_generate: bool = False,
|
||||
) -> dict:
|
||||
"""
|
||||
OpenClaw 全景電商週報(每週一 06:00)
|
||||
@@ -435,10 +678,115 @@ def generate_weekly_strategy_report(
|
||||
"""
|
||||
now = datetime.now()
|
||||
period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})"
|
||||
period_key = now.strftime("%Y-%W")
|
||||
logger.info("[OpenClaw] 週報任務啟動 period=%s", period)
|
||||
|
||||
existing = _load_weekly_strategy_payload(period_key)
|
||||
if existing and not force_generate:
|
||||
# 已有同週報告則沿用既有內容,不再重新呼叫 Gemini
|
||||
sent_metadata = bool(existing.get("metadata", {}).get("telegram_sent"))
|
||||
sending_metadata = bool(existing.get("metadata", {}).get("telegram_sending"))
|
||||
if force_tg_alert:
|
||||
if sending_metadata:
|
||||
logger.info(
|
||||
"[OpenClaw] 本週週報正在發送中,略過重複推播 period=%s insight_id=%s",
|
||||
period_key,
|
||||
existing["id"],
|
||||
)
|
||||
return {
|
||||
"status": "skipped",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_send_in_progress",
|
||||
"insight_id": existing["id"],
|
||||
"period": period,
|
||||
}
|
||||
|
||||
if not sent_metadata and existing.get("content"):
|
||||
if _acquire_weekly_strategy_send_lock(existing["id"]):
|
||||
send_ok = _send_strategy_telegram(
|
||||
title="OpenClaw 電商全景週報",
|
||||
report_type="weekly_strategy",
|
||||
period=period,
|
||||
content=existing["content"],
|
||||
)
|
||||
if send_ok:
|
||||
_set_weekly_strategy_telegram_locked(
|
||||
existing["id"],
|
||||
telegram_sent=True,
|
||||
telegram_sending=False,
|
||||
sent_at=datetime.now(),
|
||||
)
|
||||
return {
|
||||
"status": "sent",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_reused_from_cache",
|
||||
"insight_id": existing["id"],
|
||||
"period": period,
|
||||
}
|
||||
|
||||
_set_weekly_strategy_telegram_locked(
|
||||
existing["id"],
|
||||
telegram_sent=False,
|
||||
telegram_sending=False,
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_send_failed",
|
||||
"insight_id": existing["id"],
|
||||
"period": period,
|
||||
}
|
||||
|
||||
logger.warning(
|
||||
"[OpenClaw] 取得週報發送鎖失敗 period=%s insight_id=%s",
|
||||
period_key,
|
||||
existing["id"],
|
||||
)
|
||||
return {
|
||||
"status": "skipped",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_send_in_progress",
|
||||
"insight_id": existing["id"],
|
||||
"period": period,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"[OpenClaw] 本週週報已存在且已發送,跳過重複推播 period=%s insight_id=%s",
|
||||
period_key,
|
||||
existing["id"],
|
||||
)
|
||||
return {
|
||||
"status": "skipped",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_already_generated",
|
||||
"insight_id": existing["id"],
|
||||
"period": period,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"[OpenClaw] 本週週報已存在,跳過重複產生 period=%s insight_id=%s",
|
||||
period_key,
|
||||
existing["id"],
|
||||
)
|
||||
return {
|
||||
"status": "skipped",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_already_generated",
|
||||
"insight_id": existing["id"],
|
||||
"period": period,
|
||||
}
|
||||
|
||||
# ── Step 1:DB 數據收集 ──────────────────────────────────────────────────
|
||||
sales = _fetch_sales_summary(14)
|
||||
if sales.get("stale"):
|
||||
msg = f"⚠️ [資料停更告警] daily_sales_snapshot 最後更新為 {sales.get('last_date')},請檢查人工上傳流程。"
|
||||
try:
|
||||
from services.telegram_templates import _send_telegram_raw
|
||||
_send_telegram_raw(msg)
|
||||
except Exception:
|
||||
pass
|
||||
return {"status": "error", "reason": "data_stale"}
|
||||
|
||||
threats = _fetch_top_threats(10)
|
||||
recommendations = _fetch_top_recommendations(10)
|
||||
categories = _fetch_category_breakdown(7)
|
||||
@@ -574,24 +922,65 @@ TOP 威脅品項(近48h Hermes 偵測):
|
||||
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
|
||||
"action_count": len(action_items),
|
||||
"generated_at": now.isoformat(),
|
||||
"telegram_sent": False,
|
||||
"telegram_sending": False,
|
||||
}
|
||||
insight_id = _save_to_ai_insights(
|
||||
insight_type="weekly_strategy",
|
||||
content=report_content,
|
||||
confidence=0.88,
|
||||
metadata=metadata,
|
||||
period=now.strftime("%Y-%W"),
|
||||
period=period_key,
|
||||
)
|
||||
_save_action_items(action_items, insight_id)
|
||||
_consolidate_weekly_strategy_records(period_key)
|
||||
|
||||
# ── Step 7:Telegram 推播 ────────────────────────────────────────────────
|
||||
if force_tg_alert or True:
|
||||
_send_strategy_telegram(
|
||||
title="OpenClaw 電商全景週報",
|
||||
report_type="weekly_strategy",
|
||||
period=period,
|
||||
content=report_content,
|
||||
)
|
||||
if force_tg_alert:
|
||||
latest_payload = _load_weekly_strategy_payload(period_key)
|
||||
send_target_id = latest_payload["id"] if latest_payload else insight_id
|
||||
send_content = latest_payload["content"] if latest_payload else report_content
|
||||
|
||||
if _acquire_weekly_strategy_send_lock(send_target_id):
|
||||
send_ok = _send_strategy_telegram(
|
||||
title="OpenClaw 電商全景週報",
|
||||
report_type="weekly_strategy",
|
||||
period=period,
|
||||
content=send_content,
|
||||
)
|
||||
if send_ok:
|
||||
_set_weekly_strategy_telegram_locked(
|
||||
send_target_id,
|
||||
telegram_sent=True,
|
||||
telegram_sending=False,
|
||||
sent_at=datetime.now(),
|
||||
)
|
||||
else:
|
||||
_set_weekly_strategy_telegram_locked(
|
||||
send_target_id,
|
||||
telegram_sent=False,
|
||||
telegram_sending=False,
|
||||
)
|
||||
return {
|
||||
"status": "error",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_send_failed",
|
||||
"insight_id": send_target_id,
|
||||
"period": period,
|
||||
}
|
||||
else:
|
||||
logger.info(
|
||||
"[OpenClaw] 本週週報發送已被其他執行緒持有,跳過推播 period=%s latest_id=%s",
|
||||
period_key,
|
||||
send_target_id,
|
||||
)
|
||||
return {
|
||||
"status": "skipped",
|
||||
"report_type": "weekly_strategy",
|
||||
"reason": "weekly_strategy_send_in_progress",
|
||||
"insight_id": send_target_id,
|
||||
"period": period,
|
||||
}
|
||||
|
||||
logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items))
|
||||
return {
|
||||
@@ -637,7 +1026,7 @@ def generate_daily_report() -> dict:
|
||||
user_prompt = f"""請根據以下數據,產出今日電商日報({period}):
|
||||
|
||||
【昨日業績】
|
||||
銷售金額:NT${yesterday_sales.get('revenue', 0):,.0f}
|
||||
總業績:NT${yesterday_sales.get('revenue', 0):,.0f}
|
||||
成交SKU數:{yesterday_sales.get('sku_count', 0)} 個
|
||||
訂單數:{yesterday_sales.get('order_count', 0)} 筆
|
||||
|
||||
@@ -1116,7 +1505,7 @@ def _fetch_yesterday_sales() -> Dict[str, Any]:
|
||||
try:
|
||||
row = session.execute(text("""
|
||||
SELECT
|
||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
|
||||
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
|
||||
COUNT(DISTINCT "商品ID") AS sku_count,
|
||||
COUNT(*) AS order_count
|
||||
FROM daily_sales_snapshot
|
||||
@@ -1130,8 +1519,8 @@ def _fetch_yesterday_sales() -> Dict[str, Any]:
|
||||
}
|
||||
return {"revenue": 0, "sku_count": 0, "order_count": 0}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 昨日業績讀取失敗: %s", e)
|
||||
return {"revenue": 0, "sku_count": 0, "order_count": 0}
|
||||
logger.error("[OpenClaw] 昨日業績讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -1142,10 +1531,10 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
||||
try:
|
||||
row = session.execute(text("""
|
||||
SELECT
|
||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
|
||||
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
|
||||
COUNT(DISTINCT "商品ID") AS sku_count,
|
||||
COUNT(*) AS order_count,
|
||||
AVG(COALESCE("銷售金額"::numeric, 0)) AS avg_order_value
|
||||
AVG(COALESCE("總業績"::numeric, 0)) AS avg_order_value
|
||||
FROM daily_sales_snapshot
|
||||
WHERE snapshot_date::date BETWEEN :start AND :end
|
||||
"""), {"start": start_date.date(), "end": end_date.date()}).fetchone()
|
||||
@@ -1158,7 +1547,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
||||
prev_start = (start_date - timedelta(days=1)).replace(day=1)
|
||||
prev_end = start_date - timedelta(days=1)
|
||||
prev_row = session.execute(text("""
|
||||
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
||||
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||
FROM daily_sales_snapshot
|
||||
WHERE snapshot_date::date BETWEEN :start AND :end
|
||||
"""), {"start": prev_start.date(), "end": prev_end.date()}).fetchone()
|
||||
@@ -1169,7 +1558,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
||||
yoy_start = start_date.replace(year=start_date.year - 1)
|
||||
yoy_end = end_date.replace(year=end_date.year - 1)
|
||||
yoy_row = session.execute(text("""
|
||||
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
||||
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||
FROM daily_sales_snapshot
|
||||
WHERE snapshot_date::date BETWEEN :start AND :end
|
||||
"""), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone()
|
||||
@@ -1185,9 +1574,8 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
||||
"yoy_pct": round(yoy_pct, 1),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 月度業績讀取失敗: %s", e)
|
||||
return {"revenue": 0, "sku_count": 0, "order_count": 0,
|
||||
"avg_order_value": 0, "mom_pct": 0, "yoy_pct": 0}
|
||||
logger.error("[OpenClaw] 月度業績讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -1221,8 +1609,8 @@ def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]:
|
||||
}
|
||||
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
|
||||
except Exception as e:
|
||||
logger.warning("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
|
||||
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
|
||||
logger.error("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user