Files
ewoooc/services/openclaw_strategist_service.py
OoO d6d8777e41
All checks were successful
CD Pipeline / deploy (push) Successful in 1m12s
V10.601 收斂 Gemini 與密鑰治理
2026-06-06 14:52:46 +08:00

2962 lines
120 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/openclaw_strategist_service.py
OpenClaw 戰略分析師Ollama-firstGemini 僅備援)
完整電商情報分析管線:
DB 爬蟲數據 + MCP 外部情報 → Ollama 三主機級聯 → Gemini 備援 → ai_insights 持久化 → Telegram 推播
提供:
generate_weekly_strategy_report() — 週報(每週一 06:00
generate_meta_analysis_report() — AI 系統效能自我審視(每日 12:00, Phase 4 降頻)
分析維度:
1. 業績趨勢MoM / WoW
2. 競品價格比對
3. 定價策略建議
4. 行銷活動洞察
5. 季節性 / 節日機會
6. TOP 威脅 / 機會品項
7. 具體行動清單48h 優先事項)
"""
import json
import logging
import os
import uuid
import requests
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import bindparam, text
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
from services.gemini_guard import (
gemini_disabled_message,
get_gemini_api_key,
is_gemini_fallback_enabled,
)
from services.action_plan_dedupe import (
active_openclaw_recommendation_exists,
openclaw_action_metadata,
)
from services.rag_service import rag_service, is_rag_enabled # Phase 11 RAG-firstQ&A 限定)
logger = logging.getLogger(__name__)
# Gemini 不可作為 OpenClaw 通用主路徑;所有週/月/meta/日報洞察都先走
# OllamaService 的 GCP-A → GCP-B → 111 級聯Gemini 僅作最後備援。
STRATEGY_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "")
NVIDIA_NIM_URL = "https://integrate.api.nvidia.com/v1/chat/completions"
NVIDIA_FALLBACK_MODEL = "meta/llama-3.3-70b-instruct"
TAIPEI_TZ_OFFSET = 8 # UTC+8
# ──────────────────────────────────────────────────────────────────────────────
# Operation Ollama-First v5.0 — Gemini 僅 fallback
# - OPENCLAW_QA_OLLAMA_FIRST: 相容舊 envfalse 只記 warning不再允許 Gemini-first
# - OPENCLAW_QA_OLLAMA_MODEL: GCP Ollama 上的模型 tagA2 推薦 qwen3:14b9.3GB
# - OPENCLAW_QA_OLLAMA_TIMEOUT: 單次 Ollama 呼叫超時(秒),低品質判定後仍會升級 Gemini
# OpenClaw Q&A 不提供單 caller host override主機必須統一走 OllamaService 的
# GCP-A → GCP-B → 111 三主機級聯,避免 Telegram Q&A 被固定在單一 GCP 節點。
# 任何 deploy 不開 flag → Ollama-first緊急時才顯式設 false 回 legacy。
# ──────────────────────────────────────────────────────────────────────────────
def _qa_ollama_first_enabled() -> bool:
"""每次呼叫即時讀環境變數,允許 runtime toggle 灰度。"""
# 統帥 2026-05-03 23:30 指令:「都是要先以免費的優先!最後才是 Gemini」
# 預設 ONOllama (qwen3:14b @ GCP Primary 已拉) 為主Gemini 為品質低時 fallback
# 緊急停用export OPENCLAW_QA_OLLAMA_FIRST=false
return os.getenv('OPENCLAW_QA_OLLAMA_FIRST', 'true').strip().lower() in ('true', '1', 'yes', 'on')
OPENCLAW_QA_OLLAMA_MODEL = os.getenv('OPENCLAW_QA_OLLAMA_MODEL', 'qwen3:14b')
OPENCLAW_QA_OLLAMA_TIMEOUT = int(os.getenv('OPENCLAW_QA_OLLAMA_TIMEOUT', '60'))
OPENCLAW_STRATEGY_OLLAMA_MODEL = os.getenv(
'OPENCLAW_STRATEGY_OLLAMA_MODEL',
os.getenv('OPENCLAW_OLLAMA_MODEL', 'qwen2.5-coder:7b'),
)
OPENCLAW_STRATEGY_OLLAMA_TIMEOUT = int(os.getenv('OPENCLAW_STRATEGY_OLLAMA_TIMEOUT', '120'))
OPENCLAW_STRATEGY_OLLAMA_NUM_PREDICT = int(os.getenv('OPENCLAW_STRATEGY_OLLAMA_NUM_PREDICT', '2048'))
OPENCLAW_STRATEGY_OLLAMA_KEEP_ALIVE = os.getenv('OPENCLAW_STRATEGY_OLLAMA_KEEP_ALIVE', '5m')
# 繁體中文強制 system promptA2 黃燈警訊「Qwen 繁中短板」緩解策略)
QWEN3_TC_SYSTEM_PROMPT = """你是 momo 電商情報分析師「OpenClaw」。
【硬性規則】
1. 必須使用繁體中文(台灣用語),絕對禁止簡體字、大陸用語(例:寫「資料」不寫「数据」、寫「軟體」不寫「软件」)
2. 商品/品牌名稱保留原文不翻譯(如 momo / PChome / 蝦皮 / 全家)
3. 數字與貨幣保留原貌NT$、%、件數、月份)
4. 若資料不足無法回答,明確說「資料不足,建議改問 ___」而非編造
【輸出風格】
- 直接回答,不要「以下是分析」開場白
- 結構化:用條列、表格、編號
- 控制在 300 字以內,除非統帥明確要求展開
"""
# 簡體字偵測樣本A2 報告警訊核心檢查項;列出商業中文情境最常被簡體污染的單字
# 注意:避免列「於」「与」這類兩岸通用字;只取明確簡繁字差
_SIMPLIFIED_HINT_CHARS = frozenset([
# 商業/科技高頻簡繁差字(每字繁體對照於註解)
'', # 設
'', # 當
'', # 點
'', # 問
'', # 獲
'', # 為
'', # 麼
'', # 資
'', # 產
'', # 業
'', # 務
'', # 說
'', # 聽
'', # 關
'', # 詞
'', # 這
'', # 過
'', # 讓
'', # 應
'亿', # 億
'', # 請
'', # 觀
'', # 戰
'', # 體
'', # 價
'', # 場
'', # 動
'', # 號
'', # 團
'', # 類
'广', # 廣
'', # 處
'', # 執
'', # 決
'', # 約
'', # 級
'', # 態
'', # 勢
'', # 運
'', # 營
])
# 拒答訊號:模型表達「無法回答」即視為低品質
_REFUSAL_PATTERNS = (
'無法回答', '無法回覆', '我不知道', '我無從', "I cannot",
"I don't know", '抱歉,我無法', '抱歉,我無法',
'需要更多資訊', '需要更多信息', '無相關資料',
)
__all__ = [
"generate_daily_report",
"generate_weekly_strategy_report",
"generate_monthly_report",
"generate_meta_analysis_report",
"generate_strategy_response",
]
# ═══════════════════════════════════════════════════════════════════════════════
# Telegram NLP 互動入口(輕量查詢,不走完整報告管線)
# ═══════════════════════════════════════════════════════════════════════════════
def generate_strategy_response(query: str, context: Optional[Dict[str, Any]] = None) -> str:
"""給 Telegram NLP 使用的輕量策略回覆。
Contract:
query: 使用者自然語言訊息(繁體中文)
context: 可選,{"intent": str, "user_id": int, ...}
Returns:
繁體中文回覆字串。所有 LLM 失敗時回降級訊息(永遠回字串、不拋例外)。
路由Operation Ollama-First v5.0 — Phase 3:
GCP-A/GCP-B/111 Ollama qwen3 → 品質檢測 → fallback Gemini → NIM。
OPENCLAW_QA_OLLAMA_FIRST=false 已不再允許 Gemini-first只保留 warning 相容舊設定。
"""
q = (query or "").strip()
if not q:
return "請輸入您的問題,例如:本週業績趨勢、競品價差分析、產出週報 PPT。"
request_id = f"qa-{uuid.uuid4().hex[:8]}"
# ── Phase 11 RAG-firstfeature flag 預設 OFF只在 Q&A 入口接,週月年報不接)──
# 高信心 RAG 命中 → 直接回 ai_insights 內容,避免 LLM 呼叫
# 低信心或 flag OFF → 走後續 Ollama-first / fallback 路徑
if is_rag_enabled():
try:
rag = rag_service.query(
text=q, caller='openclaw_qa',
threshold=0.85, request_id=request_id,
mark_saved_call=True,
)
if rag.has_high_confidence:
logger.info(
"[OpenClaw][QA] RAG hit request_id=%s top_score=%.3f → 跳過 LLM",
request_id, rag.hits[0].get('score', 0),
)
return rag.synthesize()
except Exception as exc:
logger.warning("[OpenClaw][QA] RAG query failed (%s), fallback LLM", exc)
# ── 主路徑:永遠 Ollama 優先。舊 flag=false 不再允許 Gemini-first。──
if not _qa_ollama_first_enabled():
logger.warning(
"[OpenClaw][QA] OPENCLAW_QA_OLLAMA_FIRST=false 已被忽略Gemini 僅作 Ollama 失敗備援"
)
ollama_reply = _call_qwen3_qa(q, context, request_id)
if ollama_reply and not _is_low_quality_response(ollama_reply):
return ollama_reply
# 品質守門失敗或 Ollama 離線 → 降級 Geminifallback_to 已於 _call_qwen3_qa 內標記)
logger.info(
"[OpenClaw][QA] Ollama 主路徑未通過無回應或低品質fallback Geminirequest_id=%s",
request_id,
)
# ── 備援路徑Gemini → NIMA4 已接 ai_call_logger──
return _legacy_gemini_first_qa(q, context, request_id=request_id)
def _legacy_gemini_first_qa(
q: str,
context: Optional[Dict[str, Any]],
request_id: Optional[str] = None,
) -> str:
"""Gemini 備援/緊急退路;正常情況只在 Ollama 主路徑失敗後使用。"""
system_prompt = (
"你是 MOMO Pro 電商情報策略師「OpenClaw」。以繁體中文台灣用語回覆使用者。"
"嚴禁簡體字,嚴禁空洞套話。若使用者要求的資料需即時查詢,"
"請告知使用者相關可用指令(例如 /daily、/weekly、/threats"
"回覆長度控制在 500 字內,可用 Markdown 條列。"
)
user_prompt = f"使用者問題:{q}\n上下文:{json.dumps(context or {}, ensure_ascii=False)}"
# Gemini 只在中央 guard 顯式解鎖時可用;無 key 或失敗時自動備援 NVIDIA NIM。
text_reply = None
if get_gemini_api_key("openclaw_strategy"):
try:
text_reply = _call_gemini(
system_prompt,
user_prompt,
temperature=0.5,
caller="openclaw_qa_gemini_fallback",
)
except Exception as e:
logger.warning("[OpenClaw] Gemini 呼叫失敗,備援 NVIDIA NIM%s", e)
if not text_reply and NVIDIA_API_KEY:
try:
text_reply = _call_nvidia_nim(system_prompt, user_prompt, caller="openclaw_qa")
except Exception as e:
logger.error("[OpenClaw] NVIDIA NIM 備援也失敗:%s", e)
if not text_reply:
return (
"策略師暫時無法回覆Gemini 與 NVIDIA NIM 均離線)。\n"
"請改用:/daily、/weekly、/threats 取得結構化報告。"
)
return text_reply
# ──────────────────────────────────────────────────────────────────────────────
# Phase 3 — Ollama Q&A 路徑 + 品質守門
# ──────────────────────────────────────────────────────────────────────────────
def _call_qwen3_qa(
question: str,
context: Optional[Dict[str, Any]],
request_id: str,
) -> Optional[str]:
"""呼叫 GCP Ollama 上的 qwen3:14b或環境變數指定的模型回答 Telegram QA。
回傳 None 表示「呼叫失敗或回空」,呼叫端會自動 fallback Gemini。
本函式不負責品質判定(呼叫端用 `_is_low_quality_response` 判,避免邏輯耦合)。
全程包在 `log_ai_call` context manager失敗時 set_error + fallback_to_caller。
"""
user_prompt = (
f"使用者問題:{question}\n"
f"上下文:{json.dumps(context or {}, ensure_ascii=False)}"
)
with log_ai_call(
caller='openclaw_qa',
provider='gcp_ollama',
model=OPENCLAW_QA_OLLAMA_MODEL,
request_id=request_id,
meta={
'flag': 'OPENCLAW_QA_OLLAMA_FIRST',
'route': 'ollama_first',
'temperature': 0.5,
},
) as ctx:
try:
from services.ollama_service import OllamaService, get_host_label, get_provider_tag
ctx.set_prompt_hash(user_prompt)
ollama = OllamaService(model=OPENCLAW_QA_OLLAMA_MODEL)
resp = ollama.generate(
prompt=user_prompt,
model=OPENCLAW_QA_OLLAMA_MODEL,
system_prompt=QWEN3_TC_SYSTEM_PROMPT,
temperature=0.5,
timeout=OPENCLAW_QA_OLLAMA_TIMEOUT,
)
actual_provider = get_provider_tag(resp.host or '')
ctx.set_provider(actual_provider)
ctx.set_model(resp.model or OPENCLAW_QA_OLLAMA_MODEL)
ctx.set_tokens(
input=resp.input_tokens,
output=resp.output_tokens,
)
ctx.add_meta('host', resp.host)
ctx.add_meta('host_label', get_host_label(resp.host or ''))
if resp.model and resp.model != OPENCLAW_QA_OLLAMA_MODEL:
ctx.add_meta('requested_model', OPENCLAW_QA_OLLAMA_MODEL)
if not resp.success:
ctx.set_error(resp.error or 'ollama generate failed')
ctx.fallback_to_caller('openclaw_qa_gemini_fallback')
logger.warning(
"[OpenClaw][QA] qwen3 三主機級聯失敗 request_id=%s host=%s: %s",
request_id, resp.host, resp.error,
)
return None
text_reply = (resp.content or '').strip()
if not text_reply:
ctx.set_error('empty_response')
ctx.fallback_to_caller('openclaw_qa_gemini_fallback')
return None
return text_reply
except Exception as e:
logger.warning(
"[OpenClaw][QA] qwen3 級聯呼叫例外 request_id=%s: %s",
request_id, e,
)
ctx.set_error(f"{type(e).__name__}: {str(e)[:200]}")
ctx.fallback_to_caller('openclaw_qa_gemini_fallback')
return None
# 低品質判定常數:避免 magic number 散落於規則裡
_QA_MIN_LENGTH = 50 # 規則 1長度下限
_QA_SIMPLIFIED_THRESHOLD = 3 # 規則 2簡體字數量門檻
_QA_FLOWING_TEXT_LENGTH = 200 # 規則 4「200+ 字無斷行」流水帳判定
def _is_low_quality_response(text: Optional[str]) -> bool:
"""判斷 Ollama 回應品質低,需升級 Gemini。
觸發條件(任一即視為低品質):
1. 空字串或長度 < _QA_MIN_LENGTH 字元
2. 簡體字污染:>= _QA_SIMPLIFIED_THRESHOLD 個簡體 hint 字元A2 黃燈警訊核心檢查)
3. 拒答訊號:包含「無法回答」「我不知道」等模式
4. 結構性差:> _QA_FLOWING_TEXT_LENGTH 字但完全沒有換行(流水帳)
Returns:
True → 低品質,呼叫端應 fallback Gemini
False → 可接受
"""
if not text:
return True
stripped = text.strip()
if len(stripped) < _QA_MIN_LENGTH:
return True
# 規則 2簡體字污染A2 警訊Qwen 繁中短板)
simplified_count = sum(1 for c in stripped if c in _SIMPLIFIED_HINT_CHARS)
if simplified_count >= _QA_SIMPLIFIED_THRESHOLD:
logger.info("[OpenClaw][QA] 低品質:偵測 %d 個簡體字 hint", simplified_count)
return True
# 規則 3拒答訊號
for pattern in _REFUSAL_PATTERNS:
if pattern in stripped:
logger.info("[OpenClaw][QA] 低品質:偵測拒答模式 '%s'", pattern)
return True
# 規則 4結構性 — 200+ 字無斷行 = 流水帳
if len(stripped) > _QA_FLOWING_TEXT_LENGTH and stripped.count('\n') < 1:
logger.info("[OpenClaw][QA] 低品質:%d 字無斷行(流水帳)", len(stripped))
return True
# ─── Phase 17 (2026-05-04)強化規則A2 警訊深化)───
# 規則 5純英文回應繁中問題不該用英文答Qwen 偶有此問題)
han_chars = sum(1 for c in stripped if '' <= c <= '鿿')
if len(stripped) > 80 and han_chars < len(stripped) * 0.3:
logger.info("[OpenClaw][QA] 低品質:中文字元占比 %.1f%% < 30%%(純英文回應)",
100 * han_chars / max(len(stripped), 1))
return True
# 規則 6thinking-mode 漏洞DeepSeek-R1 / Qwen3 reasoning model 偶將
# <think>...</think> 區塊洩漏到輸出,這種訊息不適合給統帥看)
if '<think>' in stripped or '</think>' in stripped:
logger.info("[OpenClaw][QA] 低品質reasoning model thinking 區塊洩漏")
return True
# 規則 7重複片段偵測LLM 卡迴圈時會重複同段話 N 次)
if len(stripped) > 200:
head = stripped[:50]
if stripped.count(head) >= 3:
logger.info("[OpenClaw][QA] 低品質:偵測重複迴圈(前 50 字出現 %d 次)",
stripped.count(head))
return True
# 規則 8佔位符未填充template render 失敗會留 {{var}} / [TODO] 等 markers
placeholder_markers = ['{{', '[todo]', '[TODO]', '{placeholder}', '<待填>', '尚未實作']
if any(m in stripped for m in placeholder_markers):
logger.info("[OpenClaw][QA] 低品質:偵測佔位符 / 未實作標記")
return True
return False
# ═══════════════════════════════════════════════════════════════════════════════
# DB 數據讀取層
# ═══════════════════════════════════════════════════════════════════════════════
def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
"""近 N 天業績彙總(本期 / 前期 對比)"""
session = get_session()
try:
max_date_row = session.execute(text("SELECT MAX(snapshot_date::date) FROM daily_sales_snapshot")).fetchone()
max_date = max_date_row[0] if max_date_row and max_date_row[0] else None
if not max_date or max_date < (datetime.now().date() - timedelta(days=2)):
# ADR-019 Phase 2 / critic post-review BLOCKER #9
# stale 分支必須 return 完整 shape避免沒套 stale gate 的上游 caller
# 拿到 0 而靜默產出「NT$0 業績」報告。
# 數值欄位用 None而非 0讓 prompt template 的 `:,.0f` 在誤用時
# raise TypeError → 比靜默 0 更明顯,迫使呼叫端必須 `if sales.get("stale")` 擋下。
return {
"stale": True,
"last_date": str(max_date) if max_date else "None",
"daily": [],
"current_7d_revenue": None,
"prev_7d_revenue": None,
"wow_pct": None,
"sku_count": None,
}
rows = session.execute(text("""
SELECT
snapshot_date::date AS dt,
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count
FROM daily_sales_snapshot
WHERE snapshot_date::date >= CURRENT_DATE - :days
GROUP BY dt
ORDER BY dt DESC
"""), {"days": days}).fetchall()
data = [{"date": str(r[0]), "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
for r in rows]
mid = len(data) // 2
curr_rev = sum(d["revenue"] for d in data[:mid]) if mid else 0
prev_rev = sum(d["revenue"] for d in data[mid:]) if mid else 0
wow = ((curr_rev - prev_rev) / prev_rev * 100) if prev_rev else 0
return {
"daily": data[:7],
"current_7d_revenue": curr_rev,
"prev_7d_revenue": prev_rev,
"wow_pct": round(wow, 1),
}
except Exception as e:
logger.error("[OpenClaw] 業績數據讀取失敗: %s", e)
raise
finally:
session.close()
def _fetch_top_threats(limit: int = 10) -> List[Dict]:
"""最新 TOP N 競價威脅(來自 Hermes 分析)"""
session = get_session()
try:
rows = session.execute(text("""
SELECT product_sku, content, confidence, metadata_json, created_at
FROM ai_insights
WHERE insight_type = 'price_alert'
AND status = 'approved'
AND created_at >= NOW() - INTERVAL '48 hours'
ORDER BY confidence DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
result = []
for r in rows:
meta = {}
try:
meta = json.loads(r[3]) if r[3] else {}
except Exception:
logger.debug("[OpenClaw] price_alert metadata_json decode failed sku=%s", r[0], exc_info=True)
result.append({
"sku": r[0],
"summary": (r[1] or "")[:200],
"confidence": float(r[2] or 0),
"gap_pct": meta.get("gap_pct", 0),
"sales_delta": meta.get("sales_7d_delta_pct", 0),
"momo_price": meta.get("momo_price"),
"pchome_price": meta.get("pchome_price"),
})
return result
except Exception as e:
logger.error("[OpenClaw] 威脅數據讀取失敗: %s", e)
raise
finally:
session.close()
def _fetch_top_recommendations(limit: int = 10) -> List[Dict]:
"""最新定價建議"""
session = get_session()
try:
rows = session.execute(text("""
SELECT sku, name, reason, strategy, confidence,
momo_price, pchome_price, gap_pct, sales_7d_delta
FROM ai_price_recommendations
WHERE status = 'pending'
AND created_at >= NOW() - INTERVAL '48 hours'
ORDER BY confidence DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
return [dict(zip(
["sku","name","reason","strategy","confidence","momo_price","pchome_price","gap_pct","sales_delta"],
r
)) for r in rows]
except Exception as e:
logger.error("[OpenClaw] 建議數據讀取失敗: %s", e)
raise
finally:
session.close()
def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
"""品類業績分佈"""
session = get_session()
try:
rows = session.execute(text("""
SELECT p.category,
SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT p.i_code) AS sku_count
FROM daily_sales_snapshot s
JOIN products p ON p.name = s."商品名稱"
WHERE s.snapshot_date::date >= CURRENT_DATE - :days
AND p.status = 'ACTIVE'
GROUP BY p.category
ORDER BY revenue DESC
LIMIT 10
"""), {"days": days}).fetchall()
return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
for r in rows]
except Exception as e:
logger.error("[OpenClaw] 品類數據讀取失敗: %s", e)
raise
finally:
session.close()
def _fetch_competitor_summary() -> Dict[str, Any]:
"""競品價格整體概況"""
session = get_session()
try:
coverage: Dict[str, Any] = {}
review_decision_brief: Dict[str, Any] = {
"text": "(目前沒有待覆核決策信封)",
"lines": [],
"items": [],
"hitl_count": 0,
"auto_execute_blocked_count": 0,
}
if session.bind is not None:
try:
from services.competitor_intel_repository import (
fetch_competitor_coverage,
fetch_competitor_review_queue,
summarize_review_decision_envelopes,
)
coverage = fetch_competitor_coverage(session.bind) or {}
review_queue = fetch_competitor_review_queue(session.bind, limit=5) or []
review_decision_brief = summarize_review_decision_envelopes(review_queue, limit=5)
except Exception as repo_exc:
logger.warning("[OpenClaw] 競品覆核信封摘要讀取失敗,降級只讀正式價差: %s", repo_exc)
row = session.execute(text("""
SELECT
COUNT(*) AS total,
AVG((cp.price - pr.price) / pr.price * 100) AS avg_gap_pct,
SUM(CASE WHEN cp.price < pr.price * 0.9 THEN 1 ELSE 0 END) AS undercut_count,
SUM(CASE WHEN cp.price > pr.price * 1.1 THEN 1 ELSE 0 END) AS premium_count
FROM competitor_prices cp
JOIN products p ON p.i_code = cp.sku
JOIN (
SELECT DISTINCT ON (product_id) product_id, price
FROM price_records ORDER BY product_id, timestamp DESC
) pr ON pr.product_id = p.id
WHERE cp.expires_at > NOW()
AND COALESCE(cp.match_score, 0) >= 0.76
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
""")).fetchone()
return {
"total_skus": int((row[0] if row else 0) or 0),
"avg_gap_pct": round(float((row[1] if row else 0) or 0), 1),
"undercut_count": int((row[2] if row else 0) or 0),
"premium_count": int((row[3] if row else 0) or 0),
"match_rate": float(coverage.get("match_rate") or 0),
"active_with_price": int(coverage.get("active_with_price") or 0),
"decision_ready_matches": int(
coverage.get("decision_ready_matches") or coverage.get("fresh_matches") or 0
),
"decision_ready_rate": float(coverage.get("decision_ready_rate") or 0),
"unit_comparable_count": int(coverage.get("unit_comparable_count") or 0),
"rescore_accepted_count": int(coverage.get("rescore_accepted_count") or 0),
"review_queue_count": int(coverage.get("actionable_review_count") or 0),
"manual_accept_count": int(coverage.get("manual_accept_count") or 0),
"manual_reject_count": int(coverage.get("manual_reject_count") or 0),
"manual_unit_price_count": int(coverage.get("manual_unit_price_count") or 0),
"manual_accept_rate": float(coverage.get("manual_accept_rate") or 0),
"review_decision_brief": review_decision_brief,
"review_decision_text": review_decision_brief.get("text") or "(目前沒有待覆核決策信封)",
}
except Exception as e:
logger.error("[OpenClaw] 競品概況讀取失敗: %s", e)
raise
finally:
session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# DB 寫入層
# ═══════════════════════════════════════════════════════════════════════════════
def _save_to_ai_insights(
insight_type: str,
content: str,
confidence: float,
metadata: Dict[str, Any],
period: Optional[str] = None,
) -> Optional[int]:
"""將分析結果持久化到 ai_insights"""
session = get_session()
try:
row = session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status,
metadata_json, period, created_at)
VALUES (:type, :content, :conf, 'openclaw', 'active', :meta, :period, NOW())
RETURNING id
"""), {
"type": insight_type,
"content": content[:8000],
"conf": confidence,
"meta": json.dumps(metadata, ensure_ascii=False),
"period": period or datetime.now().strftime("%Y-%m-%d"),
}).fetchone()
session.commit()
insight_id = row[0] if row else None
if insight_id:
try:
from services.openclaw_learning_service import enqueue_insight_embedding
enqueue_insight_embedding(insight_id, insight_type, content[:8000], period or datetime.now().strftime("%Y-%m-%d"))
except Exception as embed_err:
logger.warning("[OpenClaw] embedding queue enqueue failed id=%s: %s", insight_id, embed_err)
logger.info("[OpenClaw] ai_insights 寫入成功 id=%s type=%s", insight_id, insight_type)
return insight_id
except Exception as e:
logger.error("[OpenClaw] ai_insights 寫入失敗: %s", e)
session.rollback()
return None
finally:
session.close()
def _send_data_stale_alert(report_type: str, last_date: str, period: str) -> bool:
"""資料停更告警daily/weekly/monthly 共用24h dedupe
critic post-review BLOCKER #1 抽取daily/weekly/monthly 任一報告路徑檢查到
`_fetch_sales_summary` 回傳 `stale=True` 時呼叫此函式,避免:
1. 每天 09:00 daily 偵測到 stale → 立刻發 NT$0 報告
2. 多份報告同時偵測到 stale → 同一天送 N 次告警噪音
Dedupe 機制:以 ai_insights 表查近 24 小時是否已有同 report_type 的
`insight_type='data_stale_alert'` 紀錄;若有,跳過 telegram 發送但仍 return True
讓上游照常 return skipped。若無發送 telegram 並寫入 ai_insights 留痕。
Args:
report_type: "daily_report" / "weekly_strategy" / "monthly_report"
last_date: daily_sales_snapshot 最後一筆 snapshot_date字串
period: 該報告原本的 period 字串(用於告警訊息可讀性)
Returns:
True — 告警已送出 或 已 dedupe 跳過(上游視為「停更已通知」)
False — DB / Telegram 都失敗(上游可選擇 fallback
"""
# Step 1: dedupe 查詢
session = get_session()
try:
dedupe_row = session.execute(text("""
SELECT id FROM ai_insights
WHERE insight_type = 'data_stale_alert'
AND created_by = 'openclaw'
AND created_at >= NOW() - INTERVAL '24 hours'
AND metadata_json::jsonb->>'report_type' = :rt
ORDER BY created_at DESC
LIMIT 1
"""), {"rt": report_type}).fetchone()
except Exception as e:
# dedupe 查詢失敗不該擋告警(寧可重複也別漏報)
logger.warning("[OpenClaw] data_stale_alert dedupe 查詢失敗 rt=%s: %s", report_type, e)
dedupe_row = None
finally:
session.close()
if dedupe_row:
logger.info(
"[OpenClaw] data_stale_alert 已於 24h 內送過,跳過重複告警 rt=%s last_date=%s",
report_type, last_date,
)
return True
# Step 2: 發送 telegram
msg = (
f"⚠️ [資料停更告警] {report_type}\n"
f"daily_sales_snapshot 最後更新:{last_date}\n"
f"原訂報告期間:{period}\n"
f"請檢查人工上傳流程;本告警已自動跳過該報告產出。"
)
sent_ok = False
try:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(msg)
sent_ok = True
except Exception as e:
logger.error("[OpenClaw] data_stale_alert telegram 發送失敗 rt=%s: %s", report_type, e)
# Step 3: 寫入 ai_insights 留痕(即使 telegram 失敗也寫,下次 dedupe 才有依據)
try:
_save_to_ai_insights(
insight_type="data_stale_alert",
content=msg,
confidence=1.0,
metadata={
"report_type": report_type,
"last_date": last_date,
"period": period,
"telegram_sent": sent_ok,
},
period=datetime.now().strftime("%Y-%m-%d"),
)
except Exception as e:
logger.warning("[OpenClaw] data_stale_alert ai_insights 寫入失敗 rt=%s: %s", report_type, e)
return sent_ok
def _find_existing_weekly_strategy(
period: str,
sent_only: bool = False,
) -> Optional[Dict[str, Any]]:
"""查詢同一週期最新已啟用週報(不重複生成)。
`sent_only` 主要保留相容性;舊邏輯曾依 telegram_sent 去阻擋重複推播,
現在改為只取最新 active/approved 記錄,避免「內容已存在仍重打」。
"""
session = get_session()
try:
row = session.execute(text("""
SELECT id, created_at
FROM ai_insights
WHERE insight_type = 'weekly_strategy'
AND created_by = 'openclaw'
AND period = :period
AND status IN ('active', 'approved')
ORDER BY created_at DESC
LIMIT 1
"""), {"period": period}).fetchone()
if not row:
return None
return {"id": row[0], "created_at": row[1]}
except Exception as e:
logger.warning("[OpenClaw] 週報去重查詢失敗 period=%s: %s", period, e)
return None
finally:
session.close()
def _load_weekly_strategy_payload(period: str) -> Optional[Dict[str, Any]]:
"""載入同一週期最新已啟用週報正文與 metadata供重用/直接回傳)。"""
session = get_session()
try:
row = session.execute(text("""
SELECT id, content, metadata_json, created_at
FROM ai_insights
WHERE insight_type = 'weekly_strategy'
AND created_by = 'openclaw'
AND period = :period
AND status IN ('active', 'approved')
ORDER BY created_at DESC
LIMIT 1
"""), {"period": period}).fetchone()
if not row:
return None
meta = _normalize_weekly_strategy_metadata(row[2])
return {"id": row[0], "content": row[1], "metadata": meta, "created_at": row[3]}
except Exception as e:
logger.warning("[OpenClaw] 週報載入失敗 period=%s: %s", period, e)
return None
finally:
session.close()
def _normalize_weekly_strategy_metadata(raw_meta: Any) -> Dict[str, Any]:
"""將 ai_insights metadata 轉成 dict並補入預設欄位避免型別錯誤。"""
meta = raw_meta or {}
if isinstance(meta, str):
try:
meta = json.loads(meta)
except Exception:
meta = {}
if not isinstance(meta, dict):
meta = {}
if "telegram_sent" not in meta:
meta["telegram_sent"] = False
if "telegram_sending" not in meta:
meta["telegram_sending"] = False
return meta
def _set_weekly_strategy_metadata(insight_id: int, metadata: Dict[str, Any]) -> bool:
"""以 metadata 全量覆寫指定週報記錄,並回傳是否寫入成功。"""
if not insight_id:
return False
session = get_session()
try:
session.execute(
text("""
UPDATE ai_insights
SET metadata_json = :metadata
WHERE id = :id
"""),
{"id": insight_id, "metadata": json.dumps(metadata, ensure_ascii=False)},
)
session.commit()
return True
except Exception as e:
logger.warning("[OpenClaw] 更新週報 metadata 失敗 insight_id=%s: %s", insight_id, e)
session.rollback()
return False
finally:
session.close()
def _set_weekly_strategy_telegram_locked(insight_id: int, *, telegram_sent: Optional[bool] = None,
telegram_sending: Optional[bool] = None, sent_at: Optional[datetime] = None) -> bool:
"""更新既有週報的發送狀態欄位telegram_sent / telegram_sending"""
if not insight_id:
return False
session = get_session()
try:
row = session.execute(
text("SELECT metadata_json FROM ai_insights WHERE id = :id"),
{"id": insight_id},
).fetchone()
if not row:
return False
meta = _normalize_weekly_strategy_metadata(row[0])
if telegram_sent is not None:
meta["telegram_sent"] = bool(telegram_sent)
if telegram_sending is not None:
meta["telegram_sending"] = bool(telegram_sending)
if sent_at is None and telegram_sent:
meta.pop("telegram_sent_at", None)
elif sent_at is not None:
meta["telegram_sent_at"] = sent_at.isoformat()
session.execute(
text("""
UPDATE ai_insights
SET metadata_json = :metadata
WHERE id = :id
"""),
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
)
session.commit()
return True
except Exception as e:
logger.warning("[OpenClaw] 更新週報 telegram metadata 失敗 insight_id=%s: %s", insight_id, e)
session.rollback()
return False
finally:
session.close()
def _acquire_weekly_strategy_send_lock(insight_id: int) -> bool:
"""嘗試取得週報 Telegram 發送鎖。
若該筆已標記發送中或已發送,回傳 False。
"""
if not insight_id:
return False
session = get_session()
try:
row = session.execute(
text("SELECT metadata_json FROM ai_insights WHERE id = :id FOR UPDATE"),
{"id": insight_id},
).fetchone()
if not row:
return False
meta = _normalize_weekly_strategy_metadata(row[0])
if bool(meta.get("telegram_sending")) or bool(meta.get("telegram_sent")):
return False
meta["telegram_sending"] = True
meta["telegram_sent"] = False
session.execute(
text("""
UPDATE ai_insights
SET metadata_json = :metadata
WHERE id = :id
"""),
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
)
session.commit()
return True
except Exception as e:
logger.warning("[OpenClaw] 取得週報 telegram 發送鎖失敗 insight_id=%s: %s", insight_id, e)
session.rollback()
return False
finally:
session.close()
def _set_weekly_strategy_telegram_sent(insight_id: int) -> None:
"""更新已儲存週報的 telegram_sent 狀態,避免再次重複發送。"""
_set_weekly_strategy_telegram_locked(
insight_id,
telegram_sending=False,
telegram_sent=True,
sent_at=datetime.now(),
)
def _consolidate_weekly_strategy_records(period: str) -> Dict[str, int]:
"""同一週保留最新一筆,將舊重複紀錄標示為 superseded保留內容"""
session = get_session()
kept_id = None
superseded_count = 0
total_count = 0
try:
rows = session.execute(text("""
SELECT id, created_at
FROM ai_insights
WHERE insight_type = 'weekly_strategy'
AND created_by = 'openclaw'
AND period = :period
ORDER BY created_at DESC, id DESC
"""), {"period": period}).fetchall()
total_count = len(rows)
if total_count <= 1:
return {"period": period, "total_count": total_count, "kept_id": None, "superseded_count": 0}
kept_id = rows[0][0]
old_ids = [int(r[0]) for r in rows[1:]]
if old_ids:
res = session.execute(text("""
UPDATE ai_insights
SET status = 'superseded'
WHERE id IN :ids
AND status IN ('active', 'approved')
""").bindparams(bindparam("ids", expanding=True)), {"ids": old_ids})
superseded_count = int(getattr(res, "rowcount", 0) or 0)
session.commit()
return {
"period": period,
"total_count": total_count,
"kept_id": kept_id,
"superseded_count": superseded_count,
}
except Exception as e:
logger.warning("[OpenClaw] 週報 dedupe 失敗 period=%s: %s", period, e)
session.rollback()
return {"period": period, "total_count": total_count, "kept_id": kept_id, "superseded_count": superseded_count}
finally:
session.close()
def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None:
"""將 AI 建議的行動項目寫入 action_plans"""
if not actions:
return
session = get_session()
try:
for i, action in enumerate(actions[:10]):
desc = action[:500]
if active_openclaw_recommendation_exists(session, desc):
logger.info("[OpenClaw] action_plans skip duplicate recommendation")
continue
session.execute(text("""
INSERT INTO action_plans
(action_type, description, status, priority, metadata_json, created_at)
VALUES ('openclaw_recommendation', :desc, 'pending', :priority, :meta, NOW())
"""), {
"desc": desc,
"priority": i + 1,
"meta": openclaw_action_metadata(source_insight_id, desc),
})
session.commit()
logger.info("[OpenClaw] action_plans 寫入 %d", len(actions[:10]))
except Exception as e:
logger.warning("[OpenClaw] action_plans 寫入失敗: %s", e)
session.rollback()
finally:
session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# Gemini 呼叫層
# ═══════════════════════════════════════════════════════════════════════════════
def _gemini_nim_fallback_caller(caller: str) -> str:
"""Gemini 失敗後的 NIM caller 命名Q&A 備援要回到標準 openclaw_qa_nim。"""
if caller == "openclaw_qa_gemini_fallback":
return "openclaw_qa_nim"
return f"{caller}_nim"
def _call_gemini(
system_prompt: str,
user_prompt: str,
temperature: float = 0.4,
caller: str = "openclaw_qa",
) -> Optional[str]:
"""呼叫 Gemini回傳文字失敗回傳 None。
Args:
caller: ai_calls.caller — 由外層 generate_*_report 傳入細分 caller
(openclaw_daily / openclaw_weekly / openclaw_monthly / openclaw_meta /
openclaw_qa_gemini_fallback)
"""
if not is_gemini_fallback_enabled("openclaw_strategy"):
logger.info("[OpenClaw] %s", gemini_disabled_message("openclaw_strategy"))
return None
gemini_api_key = get_gemini_api_key("openclaw_strategy")
if not gemini_api_key:
logger.warning("[OpenClaw] GEMINI_API_KEY 未設定")
return None
with log_ai_call(
caller=caller,
provider='gemini',
model=STRATEGY_MODEL,
meta={'temperature': temperature},
) as _ctx:
try:
import google.generativeai as genai
genai.configure(api_key=gemini_api_key)
model = genai.GenerativeModel(
model_name=STRATEGY_MODEL,
generation_config=genai.types.GenerationConfig(
temperature=temperature,
max_output_tokens=4096,
),
system_instruction=system_prompt,
)
response = model.generate_content(
user_prompt,
request_options={"timeout": 180},
)
# Gemini SDKresponse.usage_metadata.{prompt_token_count, candidates_token_count}
try:
usage = getattr(response, 'usage_metadata', None)
if usage is not None:
_ctx.set_tokens(
input=getattr(usage, 'prompt_token_count', 0) or 0,
output=getattr(usage, 'candidates_token_count', 0) or 0,
)
except Exception:
logger.debug("[OpenClaw] Gemini usage metadata parse failed caller=%s", caller, exc_info=True)
return response.text or ""
except Exception as e:
logger.error("[OpenClaw] Gemini 呼叫失敗: %s", e)
_ctx.set_error(f"{type(e).__name__}: {e}")
_ctx.fallback_to_caller(_gemini_nim_fallback_caller(caller))
return None
def _call_ollama_strategy(
system_prompt: str,
user_prompt: str,
temperature: float = 0.4,
caller: str = "openclaw_strategy",
*,
model_name: Optional[str] = None,
timeout: Optional[int] = None,
num_predict: Optional[int] = None,
fallback_caller: Optional[str] = None,
) -> Optional[str]:
"""OpenClaw 敘事報告主路徑OllamaService 三主機級聯。"""
model = model_name or OPENCLAW_STRATEGY_OLLAMA_MODEL
timeout_s = timeout or OPENCLAW_STRATEGY_OLLAMA_TIMEOUT
predict = num_predict or OPENCLAW_STRATEGY_OLLAMA_NUM_PREDICT
fallback = fallback_caller or f"{caller}_gemini_fallback"
with log_ai_call(
caller=caller,
provider="gcp_ollama",
model=model,
meta={
"route": "ollama_first",
"temperature": temperature,
"num_predict": predict,
},
) as ctx:
try:
from services.ollama_service import OllamaService, get_host_label, get_provider_tag
ctx.set_prompt_hash(user_prompt)
resp = OllamaService(model=model).generate(
prompt=user_prompt,
model=model,
system_prompt=system_prompt,
temperature=temperature,
timeout=timeout_s,
keep_alive=OPENCLAW_STRATEGY_OLLAMA_KEEP_ALIVE,
options={"num_predict": predict},
allow_111_fallback=False,
)
ctx.set_provider(get_provider_tag(resp.host or ""))
ctx.set_model(resp.model or model)
ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens)
ctx.add_meta("host", resp.host)
ctx.add_meta("host_label", get_host_label(resp.host or ""))
if resp.model and resp.model != model:
ctx.add_meta("requested_model", model)
if not resp.success:
ctx.set_error(resp.error or "ollama generate failed")
ctx.fallback_to_caller(fallback)
logger.warning(
"[OpenClaw] Ollama strategy cascade failed caller=%s host=%s error=%s",
caller,
resp.host,
resp.error,
)
return None
text = (resp.content or "").strip()
if not text:
ctx.set_error("empty_response")
ctx.fallback_to_caller(fallback)
return None
return text
except Exception as exc:
ctx.set_error(f"{type(exc).__name__}: {str(exc)[:240]}")
ctx.fallback_to_caller(fallback)
logger.warning("[OpenClaw] Ollama strategy exception caller=%s: %s", caller, exc)
return None
def _call_openclaw_llm_ollama_first(
system_prompt: str,
user_prompt: str,
*,
temperature: float,
caller: str,
num_predict: Optional[int] = None,
) -> Optional[str]:
"""OpenClaw 報告統一 LLM 入口Ollama 主、Gemini 備援、NIM 最後備援。"""
gemini_caller = f"{caller}_gemini_fallback"
text_out = _call_ollama_strategy(
system_prompt,
user_prompt,
temperature=temperature,
caller=caller,
num_predict=num_predict,
fallback_caller=gemini_caller,
)
if text_out:
return text_out
if get_gemini_api_key("openclaw_strategy"):
text_out = _call_gemini(
system_prompt,
user_prompt,
temperature=temperature,
caller=gemini_caller,
)
if text_out:
return text_out
if NVIDIA_API_KEY:
return _call_nvidia_nim(system_prompt, user_prompt, temperature=temperature, caller=caller)
return None
def _call_nvidia_nim(
system_prompt: str,
user_prompt: str,
temperature: float = 0.5,
caller: str = "openclaw_qa",
) -> Optional[str]:
"""Gemini 離線時備援 NVIDIA NIM回傳文字失敗回傳 None。
Args:
caller: 由外層細分 caller最終會以 ``{caller}_nim`` 紀錄到 ai_calls。
"""
if not NVIDIA_API_KEY:
return None
nim_caller = f"{caller}_nim"
with log_ai_call(
caller=nim_caller,
provider='nim',
model=NVIDIA_FALLBACK_MODEL,
meta={'temperature': temperature},
) as _ctx:
try:
resp = requests.post(
NVIDIA_NIM_URL,
headers={
"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": NVIDIA_FALLBACK_MODEL,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": temperature,
"max_tokens": 1024,
},
timeout=60,
)
resp.raise_for_status()
body = resp.json()
usage = body.get("usage", {}) or {}
_ctx.set_tokens(
input=usage.get("prompt_tokens", 0),
output=usage.get("completion_tokens", 0),
)
return body["choices"][0]["message"]["content"]
except Exception as e:
logger.error("[OpenClaw] NVIDIA NIM 呼叫失敗: %s", e)
_ctx.set_error(f"{type(e).__name__}: {e}")
return None
# ═══════════════════════════════════════════════════════════════════════════════
# Telegram 推播
# ═══════════════════════════════════════════════════════════════════════════════
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> bool:
"""發送週報到 Telegram。成功時回傳 True。"""
try:
from services.telegram_templates import report as tpl_report, _send_telegram_raw
# Telegram 訊息長度限制 4096分段發送
header = tpl_report(title, report_type, period, "")
chunks = _split_message(content, max_len=3800 - len(header))
for i, chunk in enumerate(chunks):
msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk
_send_telegram_raw(msg)
return True
except Exception as e:
logger.error("[OpenClaw] Telegram 推播失敗: %s", e)
return False
def _split_message(text: str, max_len: int = 3800) -> List[str]:
if len(text) <= max_len:
return [text]
chunks = []
while text:
chunks.append(text[:max_len])
text = text[max_len:]
return chunks
def _push_report_with_charts(
header: str,
body: str,
charts: List[tuple],
report_label: str,
) -> None:
"""daily/monthly 圖文報告共用推播:有圖走 send_report_with_charts無圖走 raw。
Operation Ollama-First v5.0 Phase 4 抽出(純結構重構,行為與原 inline 樣板一致):
1. 三處原 inline 邏輯_legacy_full_gemini_daily_report / generate_monthly_report /
_generate_daily_report_hermes_template完全相同header + "\\n\\n" + body
有 charts 用圖文 API無則 raw。
2. 失敗只 log warning非阻塞與原行為一致。
Args:
header: telegram_templates 已渲染的 header 字串
body: 報告主文Ollama-firstGemini 僅備援)
charts: list of (filename, png_bytes, caption) tuples可為空 list
report_label: log 訊息辨識用,例如 "日報" / "月報" / "日報(模板模式)"
"""
try:
if charts:
from services.telegram_templates import (
send_report_with_charts,
_get_chat_ids,
)
full_msg = header + "\n\n" + body
send_report_with_charts(full_msg, charts, _get_chat_ids())
else:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(header + "\n\n" + body)
except Exception as e:
logger.error("[OpenClaw] %s Telegram 推播失敗: %s", report_label, e)
def _collect_mcp_intel(label: str) -> Dict[str, Any]:
"""weekly/monthly 共用 MCP 外部情報收集(純結構重構,無行為變更)。
Args:
label: log 訊息辨識用,例如 "週報" / "月報"
Returns:
dict: ``{"mcp_data": {...}, "holiday_ctx": str, "seasonal_ctx": str}``
失敗時三欄位皆回空字串/空 dict非阻塞與原 inline 行為一致)。
"""
try:
from services.mcp_collector_service import mcp_collector
return {
"mcp_data": mcp_collector.collect_all() or {},
"holiday_ctx": mcp_collector.get_holiday_context() or "",
"seasonal_ctx": mcp_collector.get_seasonal_context() or "",
}
except Exception as e:
logger.warning("[OpenClaw] %s MCP 收集失敗(非阻塞): %s", label, e)
return {"mcp_data": {}, "holiday_ctx": "", "seasonal_ctx": ""}
# ═══════════════════════════════════════════════════════════════════════════════
# 主要公開函式
# ═══════════════════════════════════════════════════════════════════════════════
def generate_weekly_strategy_report(
context: Optional[Any] = None,
force_tg_alert: bool = False,
force_generate: bool = False,
) -> dict:
"""
OpenClaw 全景電商週報(每週一 06:00
流程:
1. 讀取 DB業績 / 競品 / 威脅 / 建議 / 品類
2. MCP 收集:外部市場趨勢 / 節日 / 競品動態
3. Gemini 2.5 Flash 深度分析
4. 持久化 → ai_insights + action_plans
5. Telegram 推播
"""
now = datetime.now()
period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})"
period_key = now.strftime("%Y-%W")
logger.info("[OpenClaw] 週報任務啟動 period=%s", period)
existing = _load_weekly_strategy_payload(period_key)
if existing and not force_generate:
# 已有同週報告則沿用既有內容,不再重新呼叫 Gemini
sent_metadata = bool(existing.get("metadata", {}).get("telegram_sent"))
sending_metadata = bool(existing.get("metadata", {}).get("telegram_sending"))
if force_tg_alert:
if sending_metadata:
logger.info(
"[OpenClaw] 本週週報正在發送中,略過重複推播 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_in_progress",
"insight_id": existing["id"],
"period": period,
}
if not sent_metadata and existing.get("content"):
if _acquire_weekly_strategy_send_lock(existing["id"]):
send_ok = _send_strategy_telegram(
title="OpenClaw 電商全景週報",
report_type="weekly_strategy",
period=period,
content=existing["content"],
)
if send_ok:
_set_weekly_strategy_telegram_locked(
existing["id"],
telegram_sent=True,
telegram_sending=False,
sent_at=datetime.now(),
)
return {
"status": "sent",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_reused_from_cache",
"insight_id": existing["id"],
"period": period,
}
_set_weekly_strategy_telegram_locked(
existing["id"],
telegram_sent=False,
telegram_sending=False,
)
return {
"status": "error",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_failed",
"insight_id": existing["id"],
"period": period,
}
logger.warning(
"[OpenClaw] 取得週報發送鎖失敗 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_in_progress",
"insight_id": existing["id"],
"period": period,
}
logger.info(
"[OpenClaw] 本週週報已存在且已發送,跳過重複推播 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_already_generated",
"insight_id": existing["id"],
"period": period,
}
logger.info(
"[OpenClaw] 本週週報已存在,跳過重複產生 period=%s insight_id=%s",
period_key,
existing["id"],
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_already_generated",
"insight_id": existing["id"],
"period": period,
}
# ── Step 1DB 數據收集 ──────────────────────────────────────────────────
sales = _fetch_sales_summary(14)
if sales.get("stale"):
# critic post-review BLOCKER #1改用統一 _send_data_stale_alert24h dedupe
# weekly return shape 維持不動status="error"),避免動到 Phase 2 已建立的
# weekly dedupe/cache 機制下游語意。
_send_data_stale_alert(
report_type="weekly_strategy",
last_date=str(sales.get("last_date")),
period=period,
)
return {"status": "error", "reason": "data_stale"}
threats = _fetch_top_threats(10)
recommendations = _fetch_top_recommendations(10)
categories = _fetch_category_breakdown(7)
competitor_summary = _fetch_competitor_summary()
# ── Step 2MCP 外部情報 ─────────────────────────────────────────────────
_mcp = _collect_mcp_intel("週報")
mcp_data = _mcp["mcp_data"]
holiday_ctx = _mcp["holiday_ctx"]
seasonal_ctx = _mcp["seasonal_ctx"]
# ── Step 3組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw一位台灣頂尖電商戰略分析師專精於 momo 購物平台。
你的任務是根據真實業績數據、競品情報、外部市場趨勢,產出一份具體可執行的週報。
語言規定:
- 所有輸出必須使用繁體中文(台灣用語)
- 數字格式:金額用 NT$ 標示百分比保留1位小數
- 語氣:專業但不失親切,適合匯報給電商運營主管
分析原則:
- 每個洞察必須有數據支撐,禁止憑空推測
- 建議必須具體(時間、對象、行動、預期效益)
- 優先關注「可在 48 小時內執行」的行動項目"""
db_section = f"""
【DB 即時數據】
業績概況:
本週營收NT${sales.get('current_7d_revenue', 0):,.0f}
前週營收NT${sales.get('prev_7d_revenue', 0):,.0f}
週成長率:{sales.get('wow_pct', 0):+.1f}%
競品比對概況:
監控SKU總數{competitor_summary.get('total_skus', 0)}
平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
被競品削價數:{competitor_summary.get('undercut_count', 0)}
我方具優勢數:{competitor_summary.get('premium_count', 0)}
需單位價覆核:{competitor_summary.get('unit_comparable_count', 0)}
重算待人工覆核:{competitor_summary.get('rescore_accepted_count', 0)}
人工覆核採用率:{competitor_summary.get('manual_accept_rate', 0):.1f}%
PChome 覆核決策信封HITL不可自動寫正式價差
{competitor_summary.get('review_decision_text', '(目前沒有待覆核決策信封)')}
TOP 威脅品項近48h Hermes 偵測):
{_format_threats(threats)}
待處理定價建議:
{_format_recommendations(recommendations)}
品類業績分佈(本週):
{_format_categories(categories)}
"""
mcp_section = f"""
【MCP 外部情報】
市場趨勢:
{mcp_data.get('market_trends', '(未取得)')[:600]}
競品動態:
{mcp_data.get('competitor_intel', '(未取得)')[:500]}
消費者情緒:
{mcp_data.get('consumer_sentiment', '(未取得)')[:400]}
定價策略參考:
{mcp_data.get('pricing_strategy', '(未取得)')[:400]}
節日行事曆:
{holiday_ctx}
{mcp_data.get('holiday_calendar', '')[:300]}
季節性洞察:
{seasonal_ctx}
{mcp_data.get('seasonal_insights', '')[:300]}
"""
user_prompt = f"""請根據以下數據,產出本週電商全景戰略週報:
{db_section}
{mcp_section}
請按以下結構輸出(每節使用 HTML <b> 標題,內容精簡扼要):
<b>📊 本週業績總結</b>
(關鍵指標 + WoW 變化 + 異常警示)
<b>🏆 TOP 機會品項</b>
具備提價或強推空間的品項3-5個含具體建議
<b>⚠️ TOP 威脅品項</b>
最需緊急處理的競品削價風險3-5個含建議行動
<b>💰 本週定價策略建議</b>
(整體定價方向 + 品類重點調整 + 心理定價應用)
<b>📢 行銷活動洞察</b>
(節日/季節機會 + 推薦活動形式 + 投放時機)
<b>📦 品類熱度分析</b>
(成長品類 vs 衰退品類 + 庫存備貨建議)
<b>🔮 市場競爭洞察</b>
(競品最新動態 + 平台策略差異 + 我方應對)
<b>🎯 48小時優先行動清單</b>
5-8條具體可執行任務格式[優先度] 行動說明 → 預期效益)
<b>📈 下週展望</b>
(風險提示 + 機會預告 + 需人工決策事項)
重要:語言必須是繁體中文,數據必須引用上方提供的實際數字。
"""
# ── Step 4Ollama-first 生成 ─────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Ollama 三主機級聯生成週報...")
report_content = _call_openclaw_llm_ollama_first(
system_prompt,
user_prompt,
temperature=0.35,
caller="openclaw_weekly",
num_predict=4096,
)
if not report_content:
logger.error("[OpenClaw] 週報生成失敗Ollama/Gemini/NIM 均不可用")
return {"status": "error", "report_type": "weekly_strategy", "error": "Ollama/Gemini/NIM 呼叫失敗"}
# ── Step 5解析行動清單 ─────────────────────────────────────────────────
action_items = _extract_action_items(report_content)
# ── Step 6持久化 DB ────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"wow_pct": sales.get("wow_pct", 0),
"threat_count": len(threats),
"recommendation_count": len(recommendations),
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
"action_count": len(action_items),
"generated_at": now.isoformat(),
"telegram_sent": False,
"telegram_sending": False,
}
insight_id = _save_to_ai_insights(
insight_type="weekly_strategy",
content=report_content,
confidence=0.88,
metadata=metadata,
period=period_key,
)
_save_action_items(action_items, insight_id)
_consolidate_weekly_strategy_records(period_key)
# ── Step 7Telegram 推播 ────────────────────────────────────────────────
if force_tg_alert:
latest_payload = _load_weekly_strategy_payload(period_key)
send_target_id = latest_payload["id"] if latest_payload else insight_id
send_content = latest_payload["content"] if latest_payload else report_content
if _acquire_weekly_strategy_send_lock(send_target_id):
send_ok = _send_strategy_telegram(
title="OpenClaw 電商全景週報",
report_type="weekly_strategy",
period=period,
content=send_content,
)
if send_ok:
_set_weekly_strategy_telegram_locked(
send_target_id,
telegram_sent=True,
telegram_sending=False,
sent_at=datetime.now(),
)
else:
_set_weekly_strategy_telegram_locked(
send_target_id,
telegram_sent=False,
telegram_sending=False,
)
return {
"status": "error",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_failed",
"insight_id": send_target_id,
"period": period,
}
else:
logger.info(
"[OpenClaw] 本週週報發送已被其他執行緒持有,跳過推播 period=%s latest_id=%s",
period_key,
send_target_id,
)
return {
"status": "skipped",
"report_type": "weekly_strategy",
"reason": "weekly_strategy_send_in_progress",
"insight_id": send_target_id,
"period": period,
}
logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items))
return {
"status": "ok",
"report_type": "weekly_strategy",
"insight_id": insight_id,
"period": period,
"action_count": len(action_items),
"summary": report_content[:300],
}
def _daily_hermes_template_enabled() -> bool:
"""Operation Ollama-First v5.0 Phase 3 — Hermes 模板模式 feature flag.
每次呼叫即時讀取,允許 runtime toggle 灰度(不需重啟 scheduler
預設 true → Hermes/Ollama 模板模式false 才回 _legacy_full_gemini_daily_report。
"""
# 統帥 2026-05-03 23:30 指令:「免費優先」
# 預設 ONHermes 算 KPI + 模板填充Gemini 只寫 200 字洞察(戰前 28K → ~8K tokens, -71%
# 緊急停用(回 Gemini 全文版export OPENCLAW_DAILY_HERMES_TEMPLATE=false
return os.getenv('OPENCLAW_DAILY_HERMES_TEMPLATE', 'true').strip().lower() in ('true', '1', 'yes', 'on')
def generate_daily_report() -> dict:
"""
OpenClaw 電商日報(每日 09:00— Operation Ollama-First v5.0 Phase 3 路由層。
依 ``OPENCLAW_DAILY_HERMES_TEMPLATE`` 分流:
- true預設``_generate_daily_report_hermes_template``Hermes 算 KPI + 模板填充 +
Ollama-first 寫 200 字洞察
- false``_legacy_full_gemini_daily_report`` 相容舊名,但仍走 Ollama-first 生成全文
回傳合約兩條路徑一致:``{status, report_type, insight_id, period, ...}``
cron 不需修改ai_insights schema 不變(仍 type='daily_report')。
"""
if _daily_hermes_template_enabled():
try:
return _generate_daily_report_hermes_template()
except Exception as e:
# 模板模式異常 → 自動降級回 legacy不讓 09:00 cron 整個掛掉
logger.error(
"[OpenClaw] 日報 Hermes 模板模式異常,自動降級回 legacy: %s", e,
exc_info=True,
)
return _legacy_full_gemini_daily_report()
return _legacy_full_gemini_daily_report()
def _legacy_full_gemini_daily_report() -> dict:
"""OpenClaw 日報舊版Gemini 全文)— Phase 3 前的原始實作,保留為 baseline 對照組。
流程:
1. 讀取昨日業績快照 + TOP 競品威脅 + 定價建議
2. Gemini 快速日報分析(溫度 0.3,精簡版)
3. 生成圖表近7日營收趨勢 + 競品價差柱圖
4. 持久化 ai_insightstype='daily_report'
5. Telegram 圖文推播
"""
now = datetime.now()
yesterday = now - timedelta(days=1)
period = yesterday.strftime("%Y年%m月%d")
logger.info("[OpenClaw] 日報任務啟動 period=%s", period)
# ── Step 1DB 數據收集 ──────────────────────────────────────────────────
sales = _fetch_sales_summary(7)
# critic post-review BLOCKER #1daily 路徑必須與 weekly 對齊套 stale gate
# 否則 daily_sales_snapshot 過期時會發出 NT$0 的偽日報(每天 09:00 復發)。
if sales.get("stale"):
last_date = str(sales.get("last_date"))
logger.warning(
"[OpenClaw] 日報任務跳過daily_sales_snapshot 已停更 last_date=%s period=%s",
last_date, period,
)
_send_data_stale_alert(
report_type="daily_report",
last_date=last_date,
period=period,
)
return {
"status": "skipped",
"report_type": "daily_report",
"reason": "data_stale",
"last_date": last_date,
"period": period,
}
threats = _fetch_top_threats(5)
recommendations = _fetch_top_recommendations(5)
competitor_summary = _fetch_competitor_summary()
# 昨日單日業績
yesterday_sales = _fetch_yesterday_sales()
# ── Step 2組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw 日報分析師,負責每日電商業績快報。
語言:繁體中文(台灣用語)。風格:精簡、數字導向、可執行。
每個洞察必須有數字支撐,禁止空泛描述。"""
user_prompt = f"""請根據以下數據,產出今日電商日報({period}
【昨日業績】
總業績NT${yesterday_sales.get('revenue', 0):,.0f}
成交SKU數{yesterday_sales.get('sku_count', 0)}
訂單數:{yesterday_sales.get('order_count', 0)}
【近7日趨勢】
本週累計NT${sales.get('current_7d_revenue', 0):,.0f}
前週同期NT${sales.get('prev_7d_revenue', 0):,.0f}
WoW變化{sales.get('wow_pct', 0):+.1f}%
【競品警示近24h Hermes偵測
{_format_threats(threats)}
【待處理定價建議TOP 5
{_format_recommendations(recommendations)}
【競品整體概況】
監控SKU{competitor_summary.get('total_skus', 0)}
被削價風險:{competitor_summary.get('undercut_count', 0)}價差超過10%
平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
單位價/身份覆核隊列:{competitor_summary.get('review_queue_count', 0)}
重算待人工覆核:{competitor_summary.get('rescore_accepted_count', 0)}
【PChome 覆核決策信封HITL不可自動寫正式價差
{competitor_summary.get('review_decision_text', '(目前沒有待覆核決策信封)')}
請按以下結構輸出(使用 HTML <b> 標題):
<b>📅 {period} 電商日報</b>
<b>📊 昨日業績快報</b>
(昨日關鍵數字 + 與近期均值比較 + 異常點說明)
<b>⚠️ 今日最高優先威脅</b>
(最緊急的 1-3 個競品削價威脅,含具體行動建議)
<b>💰 今日定價行動建議</b>
1-3 條今日應執行的調價動作格式SKU → 建議行動 → 預期效果)
<b>🎯 今日 3 件事</b>
(最重要的 3 件可執行任務24h內完成
語言繁體中文全文200字以內精準扼要。
"""
# ── Step 3Ollama-first 生成 ─────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Ollama 三主機級聯生成日報...")
report_content = _call_openclaw_llm_ollama_first(
system_prompt,
user_prompt,
temperature=0.3,
caller="openclaw_daily",
num_predict=2048,
)
if not report_content:
logger.error("[OpenClaw] 日報生成失敗Ollama/Gemini/NIM 均不可用")
return {"status": "error", "report_type": "daily_report", "error": "Ollama/Gemini/NIM 呼叫失敗"}
# ── Step 4生成圖表 ─────────────────────────────────────────────────────
charts = []
try:
from services.chart_generator_service import (
revenue_trend_chart,
price_gap_bar_chart,
)
rev_chart = revenue_trend_chart(7, "近7日")
if rev_chart:
charts.append(("revenue_7d.png", rev_chart, "📈 近7日營收趨勢"))
if threats:
gap_chart = price_gap_bar_chart(threats, "競品價差警示TOP 5")
if gap_chart:
charts.append(("price_gap.png", gap_chart, "⚠️ 競品價差分析"))
except Exception as e:
logger.warning("[OpenClaw] 日報圖表生成失敗(非阻塞): %s", e)
# ── Step 5持久化 DB ────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"yesterday_revenue": yesterday_sales.get("revenue", 0),
"wow_pct": sales.get("wow_pct", 0),
"threat_count": len(threats),
"chart_count": len(charts),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="daily_report",
content=report_content,
confidence=0.85,
metadata=metadata,
period=yesterday.strftime("%Y-%m-%d"),
)
action_items = _extract_action_items_daily(report_content)
_save_action_items(action_items, insight_id)
# ── Step 6Telegram 推播(圖文)────────────────────────────────────────
try:
from services.telegram_templates import daily_report_header
header = daily_report_header(
date_str=period,
revenue=yesterday_sales.get("revenue", 0),
wow=sales.get("wow_pct", 0),
threat_count=len(threats),
opportunity_count=competitor_summary.get("premium_count", 0),
)
_push_report_with_charts(header, report_content, charts, "日報")
except Exception as e:
logger.error("[OpenClaw] 日報 header 組裝失敗: %s", e)
logger.info("[OpenClaw] 日報完成 insight_id=%s charts=%d", insight_id, len(charts))
return {
"status": "ok",
"report_type": "daily_report",
"insight_id": insight_id,
"period": period,
"chart_count": len(charts),
"action_count": len(action_items),
}
def generate_monthly_report() -> dict:
"""
OpenClaw 電商月報每月1日 07:00
流程:
1. 讀取上月完整業績 + 品類分佈 + 競品趨勢
2. MCP 收集月度外部情報
3. Gemini 深度月度分析(完整版)
4. 生成圖表:月度概覽 + 品類橫條 + 價格熱圖
5. 持久化 ai_insightstype='monthly_report'
6. Telegram 圖文推播
"""
now = datetime.now()
# 上個月
first_of_this_month = now.replace(day=1)
last_month_end = first_of_this_month - timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
period = last_month_end.strftime("%Y年%m月")
logger.info("[OpenClaw] 月報任務啟動 period=%s", period)
# ── Step 1DB 數據收集(上月完整數據)─────────────────────────────────
days_in_month = (first_of_this_month - last_month_start).days
sales = _fetch_monthly_sales_summary(last_month_start, last_month_end)
# critic post-review BLOCKER #1monthly 路徑同樣加 stale gate。
# _fetch_monthly_sales_summary 沒有內建 stale 檢查(查固定日期區間),
# 若上月 daily_sales_snapshot 完全沒匯入則 revenue=0、sku_count=0
# 會產出「NT$0 月報」誤導決策。以「revenue=0 且 sku=0」當資料缺失訊號。
if (sales.get("revenue", 0) or 0) == 0 and (sales.get("sku_count", 0) or 0) == 0:
last_date_str = last_month_end.strftime("%Y-%m-%d")
logger.warning(
"[OpenClaw] 月報任務跳過:上月 daily_sales_snapshot 無資料 period=%s",
period,
)
_send_data_stale_alert(
report_type="monthly_report",
last_date=last_date_str,
period=period,
)
return {
"status": "skipped",
"report_type": "monthly_report",
"reason": "data_stale",
"last_date": last_date_str,
"period": period,
}
categories = _fetch_category_breakdown(days_in_month)
threats = _fetch_top_threats(10)
competitor_summary = _fetch_competitor_summary()
price_trend_data = _fetch_price_trend_summary(days_in_month)
# ── Step 2MCP 外部情報(月度版)───────────────────────────────────────
_mcp = _collect_mcp_intel("月報")
mcp_data = _mcp["mcp_data"]
holiday_ctx = _mcp["holiday_ctx"]
seasonal_ctx = _mcp["seasonal_ctx"]
# ── Step 3組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw 月報首席分析師,負責 momo 平台電商月度深度報告。
語言繁體中文台灣用語。格式HTML標題 + 條列式數據。
每個洞察必須有月度數字支撐,並與上月/去年同期比較。
重點:月度趨勢、品類策略、定價最佳化、下月行動計畫。"""
db_section = f"""
{period} 業績總覽】
月營收NT${sales.get('revenue', 0):,.0f}
MoM 變化:{sales.get('mom_pct', 0):+.1f}%
YoY 變化:{sales.get('yoy_pct', 0):+.1f}%
活躍SKU數{sales.get('sku_count', 0)}
平均客單價NT${sales.get('avg_order_value', 0):,.0f}
【品類業績分佈TOP 10
{_format_categories(categories)}
【競品整體概況】
監控SKU{competitor_summary.get('total_skus', 0)}
月均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
被削價風險SKU{competitor_summary.get('undercut_count', 0)}
需單位價覆核SKU{competitor_summary.get('unit_comparable_count', 0)}
重算待人工覆核SKU{competitor_summary.get('rescore_accepted_count', 0)}
PChome 覆核決策信封HITL不可自動寫正式價差
{competitor_summary.get('review_decision_text', '(目前沒有待覆核決策信封)')}
【價格變動概況】
本月調價次數:{price_trend_data.get('price_changes', 0)}
平均調幅:{price_trend_data.get('avg_change_pct', 0):+.1f}%
主動降價SKU{price_trend_data.get('price_cuts', 0)}
主動提價SKU{price_trend_data.get('price_raises', 0)}
"""
mcp_section = f"""
【MCP 外部情報(月度)】
市場趨勢:
{mcp_data.get('market_trends', '(未取得)')[:600]}
競品動態:
{mcp_data.get('competitor_intel', '(未取得)')[:500]}
下月節日行事曆:
{holiday_ctx}
{mcp_data.get('holiday_calendar', '')[:400]}
季節性洞察:
{seasonal_ctx}
{mcp_data.get('seasonal_insights', '')[:400]}
"""
user_prompt = f"""請根據以下數據,產出 {period} 電商月度策略報告:
{db_section}
{mcp_section}
請按以下結構輸出(使用 HTML <b> 標題,詳細分析):
<b>📅 {period} 電商月度報告</b>
<b>📊 月度業績總結</b>
(月營收 + MoM/YoY 變化 + 超出/低於預期分析 + 關鍵驅動因素)
<b>🏆 本月品類贏家 vs 輸家</b>
成長最快3個品類 vs 衰退最嚴重3個品類含原因分析
<b>💰 本月定價策略回顧</b>
(調價效果評估 + 最佳定價案例 + 失誤案例 + 改進建議)
<b>⚔️ 競品月度分析</b>
(主要競爭對手動態 + 我方優劣勢 + 市場份額評估)
<b>📢 行銷效益評估</b>
(本月活動效果 + ROI 估算 + 最佳行銷時機分析)
<b>🔮 下月趨勢預測</b>
(季節性機會 + 節日活動規劃 + 風險預警 + 庫存建議)
<b>🎯 下月優先行動計畫</b>
8-10條具體可執行任務含時間節點和負責方向
格式:[週次/日期] 行動說明 → 預期效益)
<b>📈 Q{((now.month-1)//3)+1} 策略展望</b>
(季度目標設定 + 關鍵里程碑 + 需人工決策事項)
語言:繁體中文,數據必須引用上方提供的實際數字。
"""
# ── Step 4Ollama-first 生成 ─────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Ollama 三主機級聯生成月報...")
report_content = _call_openclaw_llm_ollama_first(
system_prompt,
user_prompt,
temperature=0.35,
caller="openclaw_monthly",
num_predict=4096,
)
if not report_content:
logger.error("[OpenClaw] 月報生成失敗Ollama/Gemini/NIM 均不可用")
return {"status": "error", "report_type": "monthly_report", "error": "Ollama/Gemini/NIM 呼叫失敗"}
# ── Step 5生成圖表 ─────────────────────────────────────────────────────
charts = []
try:
from services.chart_generator_service import (
monthly_overview_chart,
category_revenue_chart,
price_history_heatmap,
)
overview_chart = monthly_overview_chart(6)
if overview_chart:
charts.append(("monthly_overview.png", overview_chart, f"📊 近6個月營收趨勢"))
cat_chart = category_revenue_chart(days_in_month, period)
if cat_chart:
charts.append(("category_revenue.png", cat_chart, "🏆 品類業績分佈"))
heatmap = price_history_heatmap(days_in_month)
if heatmap:
charts.append(("price_heatmap.png", heatmap, "🔥 品類價格熱圖"))
except Exception as e:
logger.warning("[OpenClaw] 月報圖表生成失敗(非阻塞): %s", e)
# ── Step 6持久化 DB ────────────────────────────────────────────────────
action_items = _extract_action_items(report_content)
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"monthly_revenue": sales.get("revenue", 0),
"mom_pct": sales.get("mom_pct", 0),
"yoy_pct": sales.get("yoy_pct", 0),
"category_count": len(categories),
"chart_count": len(charts),
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
"action_count": len(action_items),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="monthly_report",
content=report_content,
confidence=0.90,
metadata=metadata,
period=last_month_end.strftime("%Y-%m"),
)
_save_action_items(action_items, insight_id)
# ── Step 7Telegram 推播(圖文)────────────────────────────────────────
try:
from services.telegram_templates import monthly_report_header
top3 = [c.get("category", "N/A") for c in categories[:3]] or ["N/A"]
header = monthly_report_header(
month_str=period,
revenue=sales.get("revenue", 0),
mom=sales.get("mom_pct", 0),
yoy=sales.get("yoy_pct", 0),
top3_categories=top3,
)
_push_report_with_charts(header, report_content, charts, "月報")
except Exception as e:
logger.error("[OpenClaw] 月報 header 組裝失敗: %s", e)
logger.info("[OpenClaw] 月報完成 insight_id=%s charts=%d actions=%d",
insight_id, len(charts), len(action_items))
return {
"status": "ok",
"report_type": "monthly_report",
"insight_id": insight_id,
"period": period,
"chart_count": len(charts),
"action_count": len(action_items),
}
def generate_meta_analysis_report() -> str:
"""
AI 系統效能自我審視(每日 12:00 run_openclaw_meta_analysis_task 呼叫Phase 4 降頻 6h → 24h
分析 ai_insights 近期累積資料,評估:
- 各 Agent 預測準確率
- 價格建議執行率
- 告警品質與誤報率
- 系統盲區與改進方向
結果持久化至 ai_insightstype='meta_analysis'),並推播 Telegram。
"""
now = datetime.now()
period = now.strftime("%Y-%m-%d %H:00")
logger.info("[OpenClaw] Meta-Analysis 任務啟動 %s", period)
# ── 讀取近期 ai_insights 摘要 ────────────────────────────────────────────
session = get_session()
try:
stats = session.execute(text("""
SELECT
insight_type,
created_by,
COUNT(*) AS total,
AVG(confidence) AS avg_conf,
SUM(CASE WHEN status='active' THEN 1 ELSE 0 END) AS active_cnt,
SUM(CASE WHEN status='relearn' THEN 1 ELSE 0 END) AS relearn_cnt,
MAX(created_at) AS latest
FROM ai_insights
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY insight_type, created_by
ORDER BY total DESC
""")).fetchall()
action_stats = session.execute(text("""
SELECT status, COUNT(*) AS cnt
FROM action_plans
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY status
""")).fetchall()
reco_stats = session.execute(text("""
SELECT status, COUNT(*) AS cnt, AVG(confidence) AS avg_conf
FROM ai_price_recommendations
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY status
""")).fetchall()
except Exception as e:
logger.warning("[OpenClaw] Meta 數據讀取失敗: %s", e)
stats, action_stats, reco_stats = [], [], []
finally:
session.close()
# ── 組建 Prompt ───────────────────────────────────────────────────────────
system_prompt = """你是 OpenClaw 自我審視模組,負責分析 AI 多智能體系統的近期表現。
請用繁體中文,以電商 AI 系統架構師的視角撰寫分析報告,語氣客觀、聚焦問題與改進。"""
stats_text = "\n".join([
f" {r[0]} ({r[1]}): 共{r[2]}筆, 平均信心{r[3]:.2f}, 活躍{r[4]}, 重學{r[5]}"
for r in stats
]) or " (無近期數據)"
action_text = "\n".join([
f" {r[0]}: {r[1]}" for r in action_stats
]) or " (無近期數據)"
reco_text = "\n".join([
f" {r[0]}: {r[1]} 筆, 平均信心{r[2]:.2f}" for r in reco_stats
]) or " (無近期數據)"
user_prompt = f"""請分析以下 AI 系統近 24 小時運作數據,產出自我審視報告:
【ai_insights 產出統計】
{stats_text}
【action_plans 執行狀況】
{action_text}
【ai_price_recommendations 建議狀況】
{reco_text}
【分析時間】{period}
請按以下結構輸出:
<b>🤖 AI 系統效能自我審視報告</b>
時間:{period}
<b>📊 各 Agent 產出統計</b>
(逐一評估 Hermes/NemoTron/OpenClaw/ElephantAlpha 的輸出品質)
<b>⚠️ 偵測到的系統問題</b>
(誤報、漏報、重學事件分析)
<b>💡 盲區與改進建議</b>
(哪些場景 AI 表現不足?建議優化方向)
<b>✅ 本週期亮點</b>
(表現良好的分析案例)
<b>🔧 技術債與優化優先順序</b>
1-3 項具體技術改進建議)
語言繁體中文200字以內精簡扼要。
"""
# ── Ollama-first 生成 ───────────────────────────────────────────────────
report_content = _call_openclaw_llm_ollama_first(
system_prompt,
user_prompt,
temperature=0.3,
caller="openclaw_meta",
num_predict=3072,
)
if not report_content:
logger.error("[OpenClaw] Meta-Analysis 生成失敗Ollama/Gemini/NIM 均不可用")
return "Meta-Analysis 生成失敗)"
# ── 持久化 DB ─────────────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"insight_types_analyzed": len(stats),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="meta_analysis",
content=report_content,
confidence=0.85,
metadata=metadata,
period=now.strftime("%Y-%m-%d"),
)
# ── Telegram 推播 ────────────────────────────────────────────────────────
try:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(report_content)
except Exception as e:
logger.error("[OpenClaw] Meta-Analysis Telegram 推播失敗: %s", e)
logger.info("[OpenClaw] Meta-Analysis 完成 insight_id=%s", insight_id)
return report_content
# ═══════════════════════════════════════════════════════════════════════════════
# 輔助格式化函式
# ═══════════════════════════════════════════════════════════════════════════════
def _fetch_yesterday_sales() -> Dict[str, Any]:
"""昨日單日業績"""
session = get_session()
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count
FROM daily_sales_snapshot
WHERE snapshot_date::date = CURRENT_DATE - 1
""")).fetchone()
if row:
return {
"revenue": float(row[0] or 0),
"sku_count": int(row[1] or 0),
"order_count": int(row[2] or 0),
}
return {"revenue": 0, "sku_count": 0, "order_count": 0}
except Exception as e:
logger.error("[OpenClaw] 昨日業績讀取失敗: %s", e)
raise
finally:
session.close()
def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""上月業績彙總,含 MoM / YoY 比較"""
session = get_session()
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count,
AVG(COALESCE("總業績"::numeric, 0)) AS avg_order_value
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": start_date.date(), "end": end_date.date()}).fetchone()
revenue = float(row[0] or 0) if row else 0
sku_count = int(row[1] or 0) if row else 0
avg_order_value = float(row[3] or 0) if row else 0
# 上上月MoM
prev_start = (start_date - timedelta(days=1)).replace(day=1)
prev_end = start_date - timedelta(days=1)
prev_row = session.execute(text("""
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": prev_start.date(), "end": prev_end.date()}).fetchone()
prev_revenue = float(prev_row[0] or 0) if prev_row else 0
mom_pct = ((revenue - prev_revenue) / prev_revenue * 100) if prev_revenue else 0
# 去年同月YoY
yoy_start = start_date.replace(year=start_date.year - 1)
yoy_end = end_date.replace(year=end_date.year - 1)
yoy_row = session.execute(text("""
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone()
yoy_revenue = float(yoy_row[0] or 0) if yoy_row else 0
yoy_pct = ((revenue - yoy_revenue) / yoy_revenue * 100) if yoy_revenue else 0
return {
"revenue": revenue,
"sku_count": sku_count,
"order_count": int(row[2] or 0) if row else 0,
"avg_order_value": avg_order_value,
"mom_pct": round(mom_pct, 1),
"yoy_pct": round(yoy_pct, 1),
}
except Exception as e:
logger.error("[OpenClaw] 月度業績讀取失敗: %s", e)
raise
finally:
session.close()
def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]:
"""近N天價格異動統計"""
session = get_session()
try:
row = session.execute(text("""
SELECT
COUNT(*) AS total_changes,
AVG(ABS(pr2.price - pr1.price) / pr1.price * 100) AS avg_change_pct,
SUM(CASE WHEN pr2.price < pr1.price THEN 1 ELSE 0 END) AS price_cuts,
SUM(CASE WHEN pr2.price > pr1.price THEN 1 ELSE 0 END) AS price_raises
FROM price_records pr2
JOIN price_records pr1 ON pr1.product_id = pr2.product_id
AND pr1.timestamp = (
SELECT MAX(timestamp) FROM price_records
WHERE product_id = pr2.product_id
AND timestamp < pr2.timestamp - INTERVAL '1 day'
)
WHERE pr2.timestamp >= NOW() - :days * INTERVAL '1 day'
AND ABS(pr2.price - pr1.price) / pr1.price > 0.005
"""), {"days": days}).fetchone()
if row and row[0]:
return {
"price_changes": int(row[0]),
"avg_change_pct": round(float(row[1] or 0), 1),
"price_cuts": int(row[2] or 0),
"price_raises": int(row[3] or 0),
}
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
except Exception as e:
logger.error("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
raise
finally:
session.close()
def _extract_action_items_daily(report_text: str) -> List[str]:
"""從日報文字中解析「今日3件事」行動項目"""
lines = report_text.split("\n")
items = []
in_action_section = False
for line in lines:
if "今日" in line and ("3件" in line or "行動" in line or "優先" in line):
in_action_section = True
continue
if in_action_section:
stripped = line.strip()
if stripped.startswith(("", "-", "1.", "2.", "3.", "", "", "")):
items.append(stripped.lstrip("•-①②③").strip().lstrip("123.").strip())
elif stripped.startswith("<b>") and items:
break
return items[:5]
def _format_threats(threats: List[Dict]) -> str:
if not threats:
return " (無近期競價威脅)"
lines = []
for t in threats[:5]:
lines.append(
f" • SKU {t['sku']}:價差 {t.get('gap_pct', 0):+.1f}%"
f"業績週變化 {t.get('sales_delta', 0):+.1f}%"
f"信心 {t.get('confidence', 0):.2f}"
)
return "\n".join(lines)
def _format_recommendations(recs: List[Dict]) -> str:
if not recs:
return " (無待處理定價建議)"
lines = []
for r in recs[:5]:
lines.append(
f"{r.get('name', r.get('sku', ''))[:30]}{r.get('strategy', '')}"
f"信心 {r.get('confidence', 0):.2f}"
)
return "\n".join(lines)
def _format_categories(cats: List[Dict]) -> str:
if not cats:
return " (無品類數據)"
lines = []
for c in cats[:5]:
lines.append(
f"{c.get('category', '未分類')}"
f"NT${c.get('revenue', 0):,.0f}{c.get('sku_count', 0)} 個 SKU"
)
return "\n".join(lines)
def _extract_action_items(report_text: str) -> List[str]:
"""從報告文字中解析行動清單48小時優先行動"""
lines = report_text.split("\n")
items = []
in_action_section = False
for line in lines:
if "48小時" in line or "優先行動" in line:
in_action_section = True
continue
if in_action_section:
stripped = line.strip()
if stripped.startswith("") or stripped.startswith("-") or stripped.startswith("["):
items.append(stripped.lstrip("•-").strip())
elif stripped.startswith("<b>") and items:
break
return items[:8]
# ═══════════════════════════════════════════════════════════════════════════════
# Operation Ollama-First v5.0 Phase 3 — Hermes 模板模式A8 fullstack
#
# 設計理念:日報 70% 是結構化 KPI純 SQL 算20% 是 Ollama-first 寫的洞察段落,
# 10% 是 Hermes 規則引擎產的行動清單。整體 token 從 ~28K → ~8K-71%)。
#
# Token 預算(單次日報):
# - Legacy 全文system + user prompt ~3K輸出 ~1.5K,含 raw KPI 嵌 prompt → ~28K 總用量
# - Hermes 模板KPI 已預算好prompt 僅含「精簡 KPI 摘要 + 寫 200 字」 ~600 tokens prompt
# 輸出 ~400 tokens總計 ~8K含 ai_call_logger 的 meta + retry buffer
#
# 規則:
# 1. 模板模式失敗 → 上層 generate_daily_report 自動降級回 legacy不讓 cron 整個掛掉)
# 2. ai_insights schema 不變(仍 type='daily_report'metadata_json 加 mode='hermes_template' 區分
# 3. _call_gemini caller 用 'openclaw_daily_insight',方便 ai_calls 統計區分新舊
# ═══════════════════════════════════════════════════════════════════════════════
# 上限常數 — 避免 magic number 散落
DAILY_TOP_SKU_LIMIT = 10
DAILY_PRICE_GAP_LIMIT = 5
DAILY_INSIGHT_MAX_TOKENS = 400
DAILY_INSIGHT_TIMEOUT_S = 60
def _compute_daily_kpi(target_date) -> Dict[str, Any]:
"""純 SQL + Hermes 規則計算當日所有結構化 KPI不走 LLM。
Args:
target_date: ``datetime.date``,要分析的目標日(一般為昨日)。
Returns:
dict: 給 Jinja2 模板的完整渲染上下文,含:
revenue / orders / top_skus / price_gaps /
inventory_alerts / priority_actions
"""
from datetime import date as _date_cls
if hasattr(target_date, 'date') and not isinstance(target_date, _date_cls):
target_date = target_date.date()
if not isinstance(target_date, _date_cls):
raise TypeError(f"target_date 必須是 date實得 {type(target_date)}")
return {
'revenue': _query_revenue_kpi(target_date),
'orders': _query_orders_kpi(target_date),
'top_skus': _query_top_skus(target_date, limit=DAILY_TOP_SKU_LIMIT),
'price_gaps': _query_competitor_price_alerts(target_date, limit=DAILY_PRICE_GAP_LIMIT),
'inventory_alerts': _query_inventory_anomalies(target_date),
'priority_actions': _generate_priority_actions(target_date),
}
def _query_revenue_kpi(target_date) -> Dict[str, Any]:
"""營收 KPI今日 / 昨日 / 7日均 + 變化%"""
session = get_session()
try:
row = session.execute(text("""
SELECT
SUM(CASE WHEN snapshot_date::date = :d THEN
COALESCE("總業績"::numeric, 0) ELSE 0 END) AS today,
SUM(CASE WHEN snapshot_date::date = :d - 1 THEN
COALESCE("總業績"::numeric, 0) ELSE 0 END) AS yesterday,
SUM(CASE WHEN snapshot_date::date BETWEEN :d - 7 AND :d - 1 THEN
COALESCE("總業績"::numeric, 0) ELSE 0 END) / 7.0 AS avg_7d
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :d - 7 AND :d
"""), {"d": target_date}).fetchone()
today = float((row[0] or 0) if row else 0)
yesterday = float((row[1] or 0) if row else 0)
avg_7d = float((row[2] or 0) if row else 0)
dod_pct = ((today - yesterday) / yesterday * 100.0) if yesterday else 0.0
wow_pct = ((today - avg_7d) / avg_7d * 100.0) if avg_7d else 0.0
return {
"today": today,
"yesterday": yesterday,
"avg_7d": avg_7d,
"dod_pct": round(dod_pct, 1),
"wow_pct": round(wow_pct, 1),
}
except Exception as e:
logger.error("[OpenClaw] revenue KPI 讀取失敗: %s", e)
return {"today": 0.0, "yesterday": 0.0, "avg_7d": 0.0, "dod_pct": 0.0, "wow_pct": 0.0}
finally:
session.close()
def _query_orders_kpi(target_date) -> Dict[str, Any]:
"""訂單數 / SKU 數 / 平均客單價 KPI。
Note: daily_sales_snapshot 並無 order_id 欄位,「訂單數」以 row 數近似
(與 legacy `_fetch_yesterday_sales` 同義;保留行為一致性)。
"""
session = get_session()
try:
row = session.execute(text("""
SELECT
COUNT(*) AS today_rows,
COUNT(DISTINCT "商品ID") AS today_sku,
AVG(NULLIF(COALESCE("總業績"::numeric, 0), 0)) AS avg_value_today
FROM daily_sales_snapshot
WHERE snapshot_date::date = :d
"""), {"d": target_date}).fetchone()
prev = session.execute(text("""
SELECT COUNT(*) FROM daily_sales_snapshot
WHERE snapshot_date::date = :d - 1
"""), {"d": target_date}).fetchone()
today_rows = int((row[0] or 0) if row else 0)
today_sku = int((row[1] or 0) if row else 0)
avg_value = float((row[2] or 0) if row else 0)
yesterday_rows = int((prev[0] or 0) if prev else 0)
dod_pct = ((today_rows - yesterday_rows) / yesterday_rows * 100.0) if yesterday_rows else 0.0
return {
"today": today_rows,
"yesterday": yesterday_rows,
"sku_count": today_sku,
"avg_order_value": avg_value,
"dod_pct": round(dod_pct, 1),
}
except Exception as e:
logger.error("[OpenClaw] orders KPI 讀取失敗: %s", e)
return {"today": 0, "yesterday": 0, "sku_count": 0, "avg_order_value": 0.0, "dod_pct": 0.0}
finally:
session.close()
def _query_top_skus(target_date, limit: int = 10) -> List[Dict[str, Any]]:
"""當日 TOP N 熱銷 SKU。"""
session = get_session()
try:
rows = session.execute(text("""
SELECT
"商品ID" AS sku,
"商品名稱" AS name,
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
COUNT(*) AS qty
FROM daily_sales_snapshot
WHERE snapshot_date::date = :d
GROUP BY "商品ID", "商品名稱"
ORDER BY revenue DESC
LIMIT :lim
"""), {"d": target_date, "lim": limit}).fetchall()
return [
{
"sku": r[0] or "",
"name": (r[1] or "")[:60],
"revenue": float(r[2] or 0),
"qty": int(r[3] or 0),
}
for r in rows
]
except Exception as e:
logger.error("[OpenClaw] top SKUs 讀取失敗: %s", e)
return []
finally:
session.close()
def _query_competitor_price_alerts(target_date, limit: int = 5) -> List[Dict[str, Any]]:
"""TOP N 競品價差警示(沿用 _fetch_top_threats 並補完 SKU 名稱)。"""
threats = _fetch_top_threats(limit)
if not threats:
return []
sku_codes = [t.get("sku") for t in threats if t.get("sku")]
name_map: Dict[str, str] = {}
if sku_codes:
session = get_session()
try:
rows = session.execute(
text("""
SELECT i_code, name FROM products
WHERE i_code = ANY(:skus)
""").bindparams(bindparam("skus", expanding=True)),
{"skus": sku_codes},
).fetchall()
name_map = {r[0]: (r[1] or "")[:60] for r in rows}
except Exception as e:
logger.warning("[OpenClaw] 競品 SKU 名稱補完失敗(非阻塞): %s", e)
finally:
session.close()
alerts: List[Dict[str, Any]] = []
for t in threats:
sku = t.get("sku") or ""
gap = float(t.get("gap_pct") or 0)
alerts.append({
"sku": sku,
"sku_name": name_map.get(sku, sku),
"competitor": "PChome/蝦皮",
"gap_pct": round(gap, 1),
"momo_price": t.get("momo_price"),
"competitor_price": t.get("pchome_price"),
"confidence": float(t.get("confidence") or 0),
})
return alerts
def _query_inventory_anomalies(target_date) -> List[Dict[str, Any]]:
"""庫存 / 退單異常品項Hermes/NemoTron 寫入的 ai_insights 子類)。"""
session = get_session()
try:
rows = session.execute(text("""
SELECT product_sku, content, insight_type, confidence, metadata_json
FROM ai_insights
WHERE insight_type IN ('inventory_alert', 'return_alert', 'stock_anomaly')
AND status = 'approved'
AND created_at >= NOW() - INTERVAL '24 hours'
ORDER BY confidence DESC
LIMIT 10
""")).fetchall()
out: List[Dict[str, Any]] = []
for r in rows:
meta = {}
try:
meta = json.loads(r[4]) if r[4] else {}
except Exception:
meta = {}
out.append({
"sku": r[0] or "",
"summary": (r[1] or "")[:120],
"type": r[2] or "",
"confidence": float(r[3] or 0),
"extra": meta,
})
return out
except Exception as e:
logger.warning("[OpenClaw] 庫存異常讀取失敗(非阻塞): %s", e)
return []
finally:
session.close()
def _generate_priority_actions(target_date) -> List[str]:
"""規則引擎產生 48h 優先事項(純規則,無 LLM"""
actions: List[str] = []
threats = _fetch_top_threats(limit=DAILY_PRICE_GAP_LIMIT)
for t in threats[:3]:
gap = float(t.get("gap_pct") or 0)
if abs(gap) >= 10.0:
actions.append(
f"⚠️ SKU {t.get('sku') or ''} 競品價差 {gap:+.1f}%,建議 48h 內檢視跟降"
)
recs = _fetch_top_recommendations(limit=3)
for r in recs:
actions.append(
f"💰 {(r.get('name') or r.get('sku') or '')[:40]}{r.get('strategy') or '待覆核'}"
f"(信心 {float(r.get('confidence') or 0):.2f}"
)
invs = _query_inventory_anomalies(target_date)
for inv in invs[:2]:
actions.append(f"📦 SKU {inv.get('sku')}{(inv.get('summary') or '')[:60]}")
if not actions:
actions.append("✅ 今日無高優先警示,維持現有策略執行。")
return actions[:8]
def _compute_gemini_insight(kpi: Dict[str, Any], period: str) -> str:
"""給 Gemini 純結構化 KPI已算好請其寫 150-200 字繁中策略洞察。
Token 控制:精簡 prompt~600 tokens+ ``max_output_tokens=DAILY_INSIGHT_MAX_TOKENS``。
Gemini 失敗時回降級訊息(不拋例外,模板仍可渲染)。
"""
revenue = kpi.get("revenue") or {}
orders = kpi.get("orders") or {}
top_skus = kpi.get("top_skus") or []
price_gaps = kpi.get("price_gaps") or []
inv_alerts = kpi.get("inventory_alerts") or []
top_names = [s.get("name", "") for s in top_skus[:3]]
system_prompt = (
"你是 OpenClaw 日報洞察分析師。語言:繁體中文(台灣用語)。"
"嚴禁簡體字、嚴禁套話、嚴禁重複數字。聚焦『解讀』與『明日行動』。"
)
user_prompt = (
f"根據以下今日 ({period}) KPI已計算寫 150-200 字策略洞察:\n\n"
f"營收NT${revenue.get('today', 0):,.0f}DoD {revenue.get('dod_pct', 0):+.1f}%、"
f"vs 7日均 {revenue.get('wow_pct', 0):+.1f}%\n"
f"訂單數:{orders.get('today', 0)}DoD {orders.get('dod_pct', 0):+.1f}%\n"
f"TOP 商品:{top_names}\n"
f"價差警示:{len(price_gaps)}\n"
f"庫存異常:{len(inv_alerts)}\n\n"
"請聚焦:(1) 今日最值得統帥注意的 1-2 件事;(2) 明日建議行動。"
"不要重複上面的數字本身,專注解讀。控制 200 字內。"
)
text_out = _call_openclaw_llm_ollama_first(
system_prompt,
user_prompt,
temperature=0.35,
caller="openclaw_daily_insight",
num_predict=512,
)
if text_out:
return text_out.strip()
direction = "上升" if revenue.get("dod_pct", 0) > 0 else "下滑" if revenue.get("dod_pct", 0) < 0 else "持平"
return (
f"今日營收 NT${revenue.get('today', 0):,.0f},與昨日相較{direction} "
f"{revenue.get('dod_pct', 0):+.1f}%vs 7日均 {revenue.get('wow_pct', 0):+.1f}%。"
f"TOP 商品集中在 {', '.join([n for n in top_names if n]) if top_names else ''}"
f"今有 {len(price_gaps)} 件競品價差警示與 {len(inv_alerts)} 件庫存異常待處理。"
"AI 洞察生成暫時不可用,已回退至規則性摘要。)"
)
def _render_daily_template_v2(context: Dict[str, Any]) -> str:
"""以 Jinja2 渲染 daily_report_v2.j2缺欄位優雅降級為「—」。
將 Jinja Environment 集中於此,方便 unit test 直接呼叫無需 Flask app context。
"""
from jinja2 import Environment, FileSystemLoader, Undefined
template_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'templates',
)
class _SafeUndefined(Undefined):
"""缺欄位回 '' 而非 raise UndefinedError符合「優雅降級」需求。"""
def __str__(self) -> str:
return ""
def __html__(self) -> str:
return ""
def __getattr__(self, name: str):
return _SafeUndefined()
env = Environment(
loader=FileSystemLoader(template_dir),
undefined=_SafeUndefined,
autoescape=False, # 日報純文字 + emoji無 HTML 注入面
trim_blocks=True,
lstrip_blocks=True,
)
def _fmt_currency(v: Any) -> str:
try:
# 容錯Undefined / None / 空字串 → 降級為 —
if v is None or isinstance(v, Undefined) or v == "":
return ""
return f"NT${float(v):,.0f}"
except (TypeError, ValueError):
return ""
def _fmt_pct(v: Any) -> str:
try:
if v is None or isinstance(v, Undefined) or v == "":
return ""
return f"{float(v):+.1f}%"
except (TypeError, ValueError):
return ""
env.filters['format_currency'] = _fmt_currency
env.filters['format_pct'] = _fmt_pct
template = env.get_template('daily_report_v2.j2')
return template.render(**context)
def _generate_daily_report_hermes_template() -> dict:
"""Hermes 模板模式日報 — 結構化 KPI + 200 字 Gemini 洞察 + 規則行動清單。
流程:
1. 取昨日業績 stale gate與 legacy 同邏輯)
2. _compute_daily_kpi 純 SQL 算齊所有結構化 KPI
3. _compute_gemini_insight 取 200 字洞察(精簡 prompt
4. Jinja2 渲染 daily_report_v2.j2
5. 持久化 ai_insightstype='daily_report'metadata.mode='hermes_template'
6. Telegram 推播(沿用 legacy 圖表生成)
回傳合約與 legacy 完全一致。
"""
now = datetime.now()
yesterday = now - timedelta(days=1)
period = yesterday.strftime("%Y年%m月%d")
target_date = yesterday.date()
weekday_map = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
weekday = weekday_map[target_date.weekday()]
logger.info("[OpenClaw] 日報任務啟動Hermes 模板模式period=%s", period)
# ── Step 1stale gate與 legacy 對齊)─────────────────────────────────
sales = _fetch_sales_summary(7)
if sales.get("stale"):
last_date = str(sales.get("last_date"))
logger.warning(
"[OpenClaw] 日報任務跳過模板模式daily_sales_snapshot 已停更 last_date=%s period=%s",
last_date, period,
)
_send_data_stale_alert(report_type="daily_report", last_date=last_date, period=period)
return {
"status": "skipped",
"report_type": "daily_report",
"reason": "data_stale",
"last_date": last_date,
"period": period,
}
# ── Step 2純 SQL 算 KPI ─────────────────────────────────────────────
kpi = _compute_daily_kpi(target_date)
# ── Step 3Ollama-first 寫 200 字洞察caller 細分)───────────────────
insight_text = _compute_gemini_insight(kpi, period)
# ── Step 4Jinja2 渲染 ───────────────────────────────────────────────
report_content = _render_daily_template_v2({
"today": period,
"weekday": weekday,
"revenue": kpi["revenue"],
"orders": kpi["orders"],
"top_skus": kpi["top_skus"],
"price_gaps": kpi["price_gaps"],
"inventory_alerts": kpi["inventory_alerts"],
"priority_actions": kpi["priority_actions"],
"gemini_insight": insight_text,
})
# ── Step 5圖表沿用 legacy非阻塞─────────────────────────────────
charts: List[tuple] = []
try:
from services.chart_generator_service import (
revenue_trend_chart,
price_gap_bar_chart,
)
rev_chart = revenue_trend_chart(7, "近7日")
if rev_chart:
charts.append(("revenue_7d.png", rev_chart, "📈 近7日營收趨勢"))
if kpi["price_gaps"]:
gap_chart = price_gap_bar_chart(
[{"sku": a["sku"], "gap_pct": a["gap_pct"]} for a in kpi["price_gaps"]],
"競品價差警示TOP 5",
)
if gap_chart:
charts.append(("price_gap.png", gap_chart, "⚠️ 競品價差分析"))
except Exception as e:
logger.warning("[OpenClaw] 日報圖表生成失敗(非阻塞): %s", e)
# ── Step 6持久化 ──────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"mode": "hermes_template", # ← 區分新舊模式關鍵欄位
"template_version": "daily_report_v2",
"today_revenue": kpi["revenue"].get("today", 0),
"dod_pct": kpi["revenue"].get("dod_pct", 0),
"wow_pct": kpi["revenue"].get("wow_pct", 0),
"top_sku_count": len(kpi["top_skus"]),
"price_gap_count": len(kpi["price_gaps"]),
"inventory_alert_count": len(kpi["inventory_alerts"]),
"priority_action_count": len(kpi["priority_actions"]),
"chart_count": len(charts),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="daily_report",
content=report_content,
confidence=0.85,
metadata=metadata,
period=target_date.strftime("%Y-%m-%d"),
)
action_items = list(kpi["priority_actions"])
_save_action_items(action_items, insight_id)
# ── Step 7Telegram 推播 ──────────────────────────────────────────────
try:
from services.telegram_templates import daily_report_header
header = daily_report_header(
date_str=period,
revenue=kpi["revenue"].get("today", 0),
wow=kpi["revenue"].get("wow_pct", 0),
threat_count=len(kpi["price_gaps"]),
opportunity_count=0,
)
_push_report_with_charts(header, report_content, charts, "日報(模板模式)")
except Exception as e:
logger.error("[OpenClaw] 日報 header 組裝失敗(模板模式): %s", e)
logger.info(
"[OpenClaw] 日報完成Hermes 模板模式insight_id=%s charts=%d actions=%d",
insight_id, len(charts), len(action_items),
)
return {
"status": "ok",
"report_type": "daily_report",
"insight_id": insight_id,
"period": period,
"chart_count": len(charts),
"action_count": len(action_items),
"mode": "hermes_template",
}