diff --git a/services/chart_generator_service.py b/services/chart_generator_service.py index cdc8d34..adc4f38 100644 --- a/services/chart_generator_service.py +++ b/services/chart_generator_service.py @@ -129,7 +129,7 @@ def _fetch_daily_revenue(days: int = 30) -> List[Dict]: try: rows = session.execute(text(f""" SELECT snapshot_date::date AS dt, - SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + SUM(COALESCE("總業績"::numeric, 0)) AS revenue FROM daily_sales_snapshot WHERE snapshot_date::date >= CURRENT_DATE - {days} GROUP BY dt ORDER BY dt @@ -150,7 +150,7 @@ def _fetch_category_revenue(days: int = 7) -> List[Dict]: try: rows = session.execute(text(f""" SELECT p.category, - SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue + SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue FROM daily_sales_snapshot s JOIN products p ON p.name = s."商品名稱" WHERE s.snapshot_date::date >= CURRENT_DATE - {days} @@ -175,7 +175,7 @@ def _fetch_monthly_revenue(months: int = 6) -> List[Dict]: try: rows = session.execute(text(f""" SELECT DATE_TRUNC('month', snapshot_date)::date AS mon, - SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + SUM(COALESCE("總業績"::numeric, 0)) AS revenue FROM daily_sales_snapshot WHERE snapshot_date >= NOW() - INTERVAL '{months} months' GROUP BY mon ORDER BY mon diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index ed650a0..7e2f8b1 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -879,7 +879,11 @@ class ElephantAlphaAutonomousEngine: session.close() def _get_system_load_percentage(self) -> float: - return 45.0 + try: + import psutil + return float(psutil.cpu_percent(interval=0.1)) + except ImportError: + return min(90.0, float(self._get_action_queue_size() * 5.0)) @staticmethod async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs): diff --git a/services/openclaw_strategist_service.py b/services/openclaw_strategist_service.py index 16e272d..2417a0f 100644 --- a/services/openclaw_strategist_service.py +++ b/services/openclaw_strategist_service.py @@ -29,7 +29,7 @@ from datetime import datetime, timedelta from typing import Any, Dict, List, Optional from database.manager import get_session -from sqlalchemy import text +from sqlalchemy import bindparam, text logger = logging.getLogger(__name__) @@ -105,10 +105,16 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]: """近 N 天業績彙總(本期 / 前期 對比)""" session = get_session() try: + max_date_row = session.execute(text("SELECT MAX(snapshot_date::date) FROM daily_sales_snapshot")).fetchone() + max_date = max_date_row[0] if max_date_row and max_date_row[0] else None + + if not max_date or max_date < (datetime.now().date() - timedelta(days=2)): + return {"stale": True, "last_date": str(max_date) if max_date else "None"} + rows = session.execute(text(""" SELECT snapshot_date::date AS dt, - SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue, + SUM(COALESCE("總業績"::numeric, 0)) AS revenue, COUNT(DISTINCT "商品ID") AS sku_count FROM daily_sales_snapshot WHERE snapshot_date::date >= CURRENT_DATE - :days @@ -131,8 +137,8 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]: "wow_pct": round(wow, 1), } except Exception as e: - logger.warning("[OpenClaw] 業績數據讀取失敗: %s", e) - return {} + logger.error("[OpenClaw] 業績數據讀取失敗: %s", e) + raise finally: session.close() @@ -169,8 +175,8 @@ def _fetch_top_threats(limit: int = 10) -> List[Dict]: }) return result except Exception as e: - logger.warning("[OpenClaw] 威脅數據讀取失敗: %s", e) - return [] + logger.error("[OpenClaw] 威脅數據讀取失敗: %s", e) + raise finally: session.close() @@ -193,8 +199,8 @@ def _fetch_top_recommendations(limit: int = 10) -> List[Dict]: r )) for r in rows] except Exception as e: - logger.warning("[OpenClaw] 建議數據讀取失敗: %s", e) - return [] + logger.error("[OpenClaw] 建議數據讀取失敗: %s", e) + raise finally: session.close() @@ -205,7 +211,7 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]: try: rows = session.execute(text(""" SELECT p.category, - SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue, + SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue, COUNT(DISTINCT p.i_code) AS sku_count FROM daily_sales_snapshot s JOIN products p ON p.name = s."商品名稱" @@ -218,8 +224,8 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]: return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)} for r in rows] except Exception as e: - logger.warning("[OpenClaw] 品類數據讀取失敗: %s", e) - return [] + logger.error("[OpenClaw] 品類數據讀取失敗: %s", e) + raise finally: session.close() @@ -251,8 +257,8 @@ def _fetch_competitor_summary() -> Dict[str, Any]: } return {} except Exception as e: - logger.warning("[OpenClaw] 競品概況讀取失敗: %s", e) - return {} + logger.error("[OpenClaw] 競品概況讀取失敗: %s", e) + raise finally: session.close() @@ -302,6 +308,239 @@ def _save_to_ai_insights( session.close() +def _find_existing_weekly_strategy( + period: str, + sent_only: bool = False, +) -> Optional[Dict[str, Any]]: + """查詢同一週期最新已啟用週報(不重複生成)。 + + `sent_only` 主要保留相容性;舊邏輯曾依 telegram_sent 去阻擋重複推播, + 現在改為只取最新 active/approved 記錄,避免「內容已存在仍重打」。 + """ + session = get_session() + try: + row = session.execute(text(""" + SELECT id, created_at + FROM ai_insights + WHERE insight_type = 'weekly_strategy' + AND created_by = 'openclaw' + AND period = :period + AND status IN ('active', 'approved') + ORDER BY created_at DESC + LIMIT 1 + """), {"period": period}).fetchone() + if not row: + return None + return {"id": row[0], "created_at": row[1]} + except Exception as e: + logger.warning("[OpenClaw] 週報去重查詢失敗 period=%s: %s", period, e) + return None + finally: + session.close() + + +def _load_weekly_strategy_payload(period: str) -> Optional[Dict[str, Any]]: + """載入同一週期最新已啟用週報正文與 metadata(供重用/直接回傳)。""" + session = get_session() + try: + row = session.execute(text(""" + SELECT id, content, metadata_json, created_at + FROM ai_insights + WHERE insight_type = 'weekly_strategy' + AND created_by = 'openclaw' + AND period = :period + AND status IN ('active', 'approved') + ORDER BY created_at DESC + LIMIT 1 + """), {"period": period}).fetchone() + if not row: + return None + meta = _normalize_weekly_strategy_metadata(row[2]) + return {"id": row[0], "content": row[1], "metadata": meta, "created_at": row[3]} + except Exception as e: + logger.warning("[OpenClaw] 週報載入失敗 period=%s: %s", period, e) + return None + finally: + session.close() + + +def _normalize_weekly_strategy_metadata(raw_meta: Any) -> Dict[str, Any]: + """將 ai_insights metadata 轉成 dict,並補入預設欄位避免型別錯誤。""" + meta = raw_meta or {} + if isinstance(meta, str): + try: + meta = json.loads(meta) + except Exception: + meta = {} + if not isinstance(meta, dict): + meta = {} + if "telegram_sent" not in meta: + meta["telegram_sent"] = False + if "telegram_sending" not in meta: + meta["telegram_sending"] = False + return meta + + +def _set_weekly_strategy_metadata(insight_id: int, metadata: Dict[str, Any]) -> bool: + """以 metadata 全量覆寫指定週報記錄,並回傳是否寫入成功。""" + if not insight_id: + return False + session = get_session() + try: + session.execute( + text(""" + UPDATE ai_insights + SET metadata_json = :metadata + WHERE id = :id + """), + {"id": insight_id, "metadata": json.dumps(metadata, ensure_ascii=False)}, + ) + session.commit() + return True + except Exception as e: + logger.warning("[OpenClaw] 更新週報 metadata 失敗 insight_id=%s: %s", insight_id, e) + session.rollback() + return False + finally: + session.close() + + +def _set_weekly_strategy_telegram_locked(insight_id: int, *, telegram_sent: Optional[bool] = None, + telegram_sending: Optional[bool] = None, sent_at: Optional[datetime] = None) -> bool: + """更新既有週報的發送狀態欄位(telegram_sent / telegram_sending)。""" + if not insight_id: + return False + session = get_session() + try: + row = session.execute( + text("SELECT metadata_json FROM ai_insights WHERE id = :id"), + {"id": insight_id}, + ).fetchone() + if not row: + return False + + meta = _normalize_weekly_strategy_metadata(row[0]) + if telegram_sent is not None: + meta["telegram_sent"] = bool(telegram_sent) + if telegram_sending is not None: + meta["telegram_sending"] = bool(telegram_sending) + if sent_at is None and telegram_sent: + meta.pop("telegram_sent_at", None) + elif sent_at is not None: + meta["telegram_sent_at"] = sent_at.isoformat() + + session.execute( + text(""" + UPDATE ai_insights + SET metadata_json = :metadata + WHERE id = :id + """), + {"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)}, + ) + session.commit() + return True + except Exception as e: + logger.warning("[OpenClaw] 更新週報 telegram metadata 失敗 insight_id=%s: %s", insight_id, e) + session.rollback() + return False + finally: + session.close() + + +def _acquire_weekly_strategy_send_lock(insight_id: int) -> bool: + """嘗試取得週報 Telegram 發送鎖。 + + 若該筆已標記發送中或已發送,回傳 False。 + """ + if not insight_id: + return False + session = get_session() + try: + row = session.execute( + text("SELECT metadata_json FROM ai_insights WHERE id = :id FOR UPDATE"), + {"id": insight_id}, + ).fetchone() + if not row: + return False + + meta = _normalize_weekly_strategy_metadata(row[0]) + if bool(meta.get("telegram_sending")) or bool(meta.get("telegram_sent")): + return False + + meta["telegram_sending"] = True + meta["telegram_sent"] = False + session.execute( + text(""" + UPDATE ai_insights + SET metadata_json = :metadata + WHERE id = :id + """), + {"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)}, + ) + session.commit() + return True + except Exception as e: + logger.warning("[OpenClaw] 取得週報 telegram 發送鎖失敗 insight_id=%s: %s", insight_id, e) + session.rollback() + return False + finally: + session.close() + + +def _set_weekly_strategy_telegram_sent(insight_id: int) -> None: + """更新已儲存週報的 telegram_sent 狀態,避免再次重複發送。""" + _set_weekly_strategy_telegram_locked( + insight_id, + telegram_sending=False, + telegram_sent=True, + sent_at=datetime.now(), + ) + + +def _consolidate_weekly_strategy_records(period: str) -> Dict[str, int]: + """同一週保留最新一筆,將舊重複紀錄標示為 superseded(保留內容)。""" + session = get_session() + kept_id = None + superseded_count = 0 + total_count = 0 + try: + rows = session.execute(text(""" + SELECT id, created_at + FROM ai_insights + WHERE insight_type = 'weekly_strategy' + AND created_by = 'openclaw' + AND period = :period + ORDER BY created_at DESC, id DESC + """), {"period": period}).fetchall() + total_count = len(rows) + if total_count <= 1: + return {"period": period, "total_count": total_count, "kept_id": None, "superseded_count": 0} + + kept_id = rows[0][0] + old_ids = [int(r[0]) for r in rows[1:]] + if old_ids: + res = session.execute(text(""" + UPDATE ai_insights + SET status = 'superseded' + WHERE id IN :ids + AND status IN ('active', 'approved') + """).bindparams(bindparam("ids", expanding=True)), {"ids": old_ids}) + superseded_count = int(getattr(res, "rowcount", 0) or 0) + session.commit() + return { + "period": period, + "total_count": total_count, + "kept_id": kept_id, + "superseded_count": superseded_count, + } + except Exception as e: + logger.warning("[OpenClaw] 週報 dedupe 失敗 period=%s: %s", period, e) + session.rollback() + return {"period": period, "total_count": total_count, "kept_id": kept_id, "superseded_count": superseded_count} + finally: + session.close() + + def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None: """將 AI 建議的行動項目寫入 action_plans""" if not actions: @@ -390,7 +629,8 @@ def _call_nvidia_nim(system_prompt: str, user_prompt: str, temperature: float = # Telegram 推播 # ═══════════════════════════════════════════════════════════════════════════════ -def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> None: +def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> bool: + """發送週報到 Telegram。成功時回傳 True。""" try: from services.telegram_templates import report as tpl_report, _send_telegram_raw @@ -401,8 +641,10 @@ def _send_strategy_telegram(title: str, report_type: str, period: str, content: for i, chunk in enumerate(chunks): msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk _send_telegram_raw(msg) + return True except Exception as e: logger.error("[OpenClaw] Telegram 推播失敗: %s", e) + return False def _split_message(text: str, max_len: int = 3800) -> List[str]: @@ -422,6 +664,7 @@ def _split_message(text: str, max_len: int = 3800) -> List[str]: def generate_weekly_strategy_report( context: Optional[Any] = None, force_tg_alert: bool = False, + force_generate: bool = False, ) -> dict: """ OpenClaw 全景電商週報(每週一 06:00) @@ -435,10 +678,115 @@ def generate_weekly_strategy_report( """ now = datetime.now() period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})" + period_key = now.strftime("%Y-%W") logger.info("[OpenClaw] 週報任務啟動 period=%s", period) + existing = _load_weekly_strategy_payload(period_key) + if existing and not force_generate: + # 已有同週報告則沿用既有內容,不再重新呼叫 Gemini + sent_metadata = bool(existing.get("metadata", {}).get("telegram_sent")) + sending_metadata = bool(existing.get("metadata", {}).get("telegram_sending")) + if force_tg_alert: + if sending_metadata: + logger.info( + "[OpenClaw] 本週週報正在發送中,略過重複推播 period=%s insight_id=%s", + period_key, + existing["id"], + ) + return { + "status": "skipped", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_send_in_progress", + "insight_id": existing["id"], + "period": period, + } + + if not sent_metadata and existing.get("content"): + if _acquire_weekly_strategy_send_lock(existing["id"]): + send_ok = _send_strategy_telegram( + title="OpenClaw 電商全景週報", + report_type="weekly_strategy", + period=period, + content=existing["content"], + ) + if send_ok: + _set_weekly_strategy_telegram_locked( + existing["id"], + telegram_sent=True, + telegram_sending=False, + sent_at=datetime.now(), + ) + return { + "status": "sent", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_reused_from_cache", + "insight_id": existing["id"], + "period": period, + } + + _set_weekly_strategy_telegram_locked( + existing["id"], + telegram_sent=False, + telegram_sending=False, + ) + return { + "status": "error", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_send_failed", + "insight_id": existing["id"], + "period": period, + } + + logger.warning( + "[OpenClaw] 取得週報發送鎖失敗 period=%s insight_id=%s", + period_key, + existing["id"], + ) + return { + "status": "skipped", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_send_in_progress", + "insight_id": existing["id"], + "period": period, + } + + logger.info( + "[OpenClaw] 本週週報已存在且已發送,跳過重複推播 period=%s insight_id=%s", + period_key, + existing["id"], + ) + return { + "status": "skipped", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_already_generated", + "insight_id": existing["id"], + "period": period, + } + + logger.info( + "[OpenClaw] 本週週報已存在,跳過重複產生 period=%s insight_id=%s", + period_key, + existing["id"], + ) + return { + "status": "skipped", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_already_generated", + "insight_id": existing["id"], + "period": period, + } + # ── Step 1:DB 數據收集 ────────────────────────────────────────────────── sales = _fetch_sales_summary(14) + if sales.get("stale"): + msg = f"⚠️ [資料停更告警] daily_sales_snapshot 最後更新為 {sales.get('last_date')},請檢查人工上傳流程。" + try: + from services.telegram_templates import _send_telegram_raw + _send_telegram_raw(msg) + except Exception: + pass + return {"status": "error", "reason": "data_stale"} + threats = _fetch_top_threats(10) recommendations = _fetch_top_recommendations(10) categories = _fetch_category_breakdown(7) @@ -574,24 +922,65 @@ TOP 威脅品項(近48h Hermes 偵測): "mcp_topics_collected": sum(1 for v in mcp_data.values() if v), "action_count": len(action_items), "generated_at": now.isoformat(), + "telegram_sent": False, + "telegram_sending": False, } insight_id = _save_to_ai_insights( insight_type="weekly_strategy", content=report_content, confidence=0.88, metadata=metadata, - period=now.strftime("%Y-%W"), + period=period_key, ) _save_action_items(action_items, insight_id) + _consolidate_weekly_strategy_records(period_key) # ── Step 7:Telegram 推播 ──────────────────────────────────────────────── - if force_tg_alert or True: - _send_strategy_telegram( - title="OpenClaw 電商全景週報", - report_type="weekly_strategy", - period=period, - content=report_content, - ) + if force_tg_alert: + latest_payload = _load_weekly_strategy_payload(period_key) + send_target_id = latest_payload["id"] if latest_payload else insight_id + send_content = latest_payload["content"] if latest_payload else report_content + + if _acquire_weekly_strategy_send_lock(send_target_id): + send_ok = _send_strategy_telegram( + title="OpenClaw 電商全景週報", + report_type="weekly_strategy", + period=period, + content=send_content, + ) + if send_ok: + _set_weekly_strategy_telegram_locked( + send_target_id, + telegram_sent=True, + telegram_sending=False, + sent_at=datetime.now(), + ) + else: + _set_weekly_strategy_telegram_locked( + send_target_id, + telegram_sent=False, + telegram_sending=False, + ) + return { + "status": "error", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_send_failed", + "insight_id": send_target_id, + "period": period, + } + else: + logger.info( + "[OpenClaw] 本週週報發送已被其他執行緒持有,跳過推播 period=%s latest_id=%s", + period_key, + send_target_id, + ) + return { + "status": "skipped", + "report_type": "weekly_strategy", + "reason": "weekly_strategy_send_in_progress", + "insight_id": send_target_id, + "period": period, + } logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items)) return { @@ -637,7 +1026,7 @@ def generate_daily_report() -> dict: user_prompt = f"""請根據以下數據,產出今日電商日報({period}): 【昨日業績】 - 銷售金額:NT${yesterday_sales.get('revenue', 0):,.0f} + 總業績:NT${yesterday_sales.get('revenue', 0):,.0f} 成交SKU數:{yesterday_sales.get('sku_count', 0)} 個 訂單數:{yesterday_sales.get('order_count', 0)} 筆 @@ -1116,7 +1505,7 @@ def _fetch_yesterday_sales() -> Dict[str, Any]: try: row = session.execute(text(""" SELECT - SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue, + SUM(COALESCE("總業績"::numeric, 0)) AS revenue, COUNT(DISTINCT "商品ID") AS sku_count, COUNT(*) AS order_count FROM daily_sales_snapshot @@ -1130,8 +1519,8 @@ def _fetch_yesterday_sales() -> Dict[str, Any]: } return {"revenue": 0, "sku_count": 0, "order_count": 0} except Exception as e: - logger.warning("[OpenClaw] 昨日業績讀取失敗: %s", e) - return {"revenue": 0, "sku_count": 0, "order_count": 0} + logger.error("[OpenClaw] 昨日業績讀取失敗: %s", e) + raise finally: session.close() @@ -1142,10 +1531,10 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di try: row = session.execute(text(""" SELECT - SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue, + SUM(COALESCE("總業績"::numeric, 0)) AS revenue, COUNT(DISTINCT "商品ID") AS sku_count, COUNT(*) AS order_count, - AVG(COALESCE("銷售金額"::numeric, 0)) AS avg_order_value + AVG(COALESCE("總業績"::numeric, 0)) AS avg_order_value FROM daily_sales_snapshot WHERE snapshot_date::date BETWEEN :start AND :end """), {"start": start_date.date(), "end": end_date.date()}).fetchone() @@ -1158,7 +1547,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di prev_start = (start_date - timedelta(days=1)).replace(day=1) prev_end = start_date - timedelta(days=1) prev_row = session.execute(text(""" - SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue FROM daily_sales_snapshot WHERE snapshot_date::date BETWEEN :start AND :end """), {"start": prev_start.date(), "end": prev_end.date()}).fetchone() @@ -1169,7 +1558,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di yoy_start = start_date.replace(year=start_date.year - 1) yoy_end = end_date.replace(year=end_date.year - 1) yoy_row = session.execute(text(""" - SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue FROM daily_sales_snapshot WHERE snapshot_date::date BETWEEN :start AND :end """), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone() @@ -1185,9 +1574,8 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di "yoy_pct": round(yoy_pct, 1), } except Exception as e: - logger.warning("[OpenClaw] 月度業績讀取失敗: %s", e) - return {"revenue": 0, "sku_count": 0, "order_count": 0, - "avg_order_value": 0, "mom_pct": 0, "yoy_pct": 0} + logger.error("[OpenClaw] 月度業績讀取失敗: %s", e) + raise finally: session.close() @@ -1221,8 +1609,8 @@ def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]: } return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0} except Exception as e: - logger.warning("[OpenClaw] 價格趨勢統計讀取失敗: %s", e) - return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0} + logger.error("[OpenClaw] 價格趨勢統計讀取失敗: %s", e) + raise finally: session.close()