feat(aiops): 完整 MCP + OpenClaw 全景電商分析管線
All checks were successful
CD Pipeline / deploy (push) Successful in 1m14s

- 新增 services/mcp_collector_service.py:Gemini Search Grounding 外部情報收集
- 重寫 services/openclaw_strategist_service.py:真實 Gemini 2.5 Flash 分析,DB 持久化
- scheduler.py:修復 generate_meta_analysis_report ImportError,串接 Meta-Analysis
- elephant_alpha_autonomous_engine.py:新增 weekly_insight 觸發器路由 OpenClaw

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-21 12:50:35 +08:00
parent 31dfbcdd4d
commit a62b83f488
4 changed files with 1443 additions and 650 deletions

View File

@@ -1674,6 +1674,12 @@ def run_icaim_analysis_task():
if not result.threats:
logging.info("[Scheduler] [ICAIM] 無威脅商品,跳過 NemoTron dispatch")
# 仍觸發 OpenClaw Meta-Analysis 更新系統效能快照
try:
from services.openclaw_strategist_service import generate_meta_analysis_report
generate_meta_analysis_report()
except Exception as _meta_e:
logging.warning(f"[Scheduler] [ICAIM] Meta-Analysis 非阻塞失敗: {_meta_e}")
return
# Step 2NemoTron 派發器 → Telegram
@@ -1689,6 +1695,13 @@ def run_icaim_analysis_task():
)
_save_stats('icaim_dispatch', {**dispatch_result, "status": "Success"})
# Step 3派發完成後觸發 OpenClaw Meta-Analysis非阻塞
try:
from services.openclaw_strategist_service import generate_meta_analysis_report
generate_meta_analysis_report()
except Exception as _meta_e:
logging.warning(f"[Scheduler] [ICAIM] Meta-Analysis 非阻塞失敗: {_meta_e}")
except Exception as e:
import traceback as _tb
logging.error(f"[Scheduler] [ICAIM] 🚨 任務異常 | Error: {e}")
@@ -1885,15 +1898,25 @@ def run_backup_monitor_task():
def run_openclaw_meta_analysis_task():
"""週日 02:00 — OpenClaw 週報 Meta-AnalysisAI 系統學習效能自我審視)"""
""" 6 小時 — OpenClaw Meta-AnalysisAI 系統效能自我審視 + 電商洞察快照"""
try:
from services.openclaw_strategist_service import generate_meta_analysis_report
report = generate_meta_analysis_report()
logging.info(f"[Scheduler] [MetaAnalysis] 完成 | 長度={len(report)} 字元")
logging.info(f"[Scheduler] [MetaAnalysis] 完成 | 長度={len(report)} 字元")
_save_stats('meta_analysis', {"status": "OK", "length": len(report)})
except Exception as e:
logging.error(f"[Scheduler] [MetaAnalysis] Meta-Analysis 任務異常: {e}")
import traceback as _tb
logging.error(f"[Scheduler] [MetaAnalysis] 🚨 Meta-Analysis 任務異常: {e}")
_save_stats('meta_analysis', {"status": "Error", "error": str(e)})
try:
from services.auto_heal_service import auto_heal_service
auto_heal_service.handle_exception(
task_name="run_openclaw_meta_analysis_task",
exception=e,
traceback_str=_tb.format_exc(),
)
except Exception as _heal_e:
logging.error(f"[Scheduler] [MetaAnalysis] auto_heal_service 失敗: {_heal_e}")
def run_dedup_batch_task():

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,225 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/mcp_collector_service.py
MCP 外部情報收集層
透過 Gemini Google Search Grounding 收集外部市場情報,供 OpenClaw 戰略分析使用:
- 台灣電商市場趨勢
- 節日 / 促銷行事曆
- 季節性消費洞察
- 競品動態(蝦皮/PChome/momo/Yahoo
- 消費者情緒與熱銷品類
結果快取至 ai_insightstype='mcp_cache'24h TTL 避免重複呼叫。
"""
import json
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
logger = logging.getLogger(__name__)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
MCP_CACHE_TTL_HOURS = int(os.getenv("MCP_CACHE_TTL_HOURS", "24"))
MCP_MODEL = os.getenv("MCP_GEMINI_MODEL", "gemini-2.5-flash-preview-05-20")
# ── 查詢主題定義 ────────────────────────────────────────────────────────────
_SEARCH_TOPICS = {
"market_trends": (
"台灣電商 momo購物網 2025年熱銷商品趨勢 消費者行為 美妝保養 家電 生活用品"
),
"holiday_calendar": (
"2025年台灣重要節日促銷行事曆 母親節 618購物節 雙11 雙12 中秋 跨年 電商大促"
),
"seasonal_insights": (
"台灣電商季節性銷售趨勢 換季商品 夏季防曬 冬季保暖 Q3 Q4 消費高峰"
),
"competitor_intel": (
"momo購物網 PChome 蝦皮 Yahoo購物 2025年競爭策略 促銷活動 物流比較"
),
"consumer_sentiment": (
"台灣消費者 2025 購物偏好 低價高CP 品牌忠誠度 直播購物 社群電商 KOL影響"
),
"pricing_strategy": (
"台灣電商定價策略 動態定價 競品比價 心理定價 促銷折扣最佳時機"
),
}
class MCPCollectorService:
"""
外部情報收集服務MCP 節點)
使用 Gemini Search Grounding 抓取即時市場資訊
"""
def __init__(self):
self._initialized = False
self._genai = None
def _ensure_init(self) -> bool:
if self._initialized:
return True
if not GEMINI_API_KEY:
logger.warning("[MCP] GEMINI_API_KEY 未設定,跳過外部情報收集")
return False
try:
import google.generativeai as genai
genai.configure(api_key=GEMINI_API_KEY)
self._genai = genai
self._initialized = True
return True
except ImportError:
logger.error("[MCP] google-generativeai 未安裝")
return False
except Exception as e:
logger.error("[MCP] Gemini 初始化失敗: %s", e)
return False
# ── 快取讀寫 ────────────────────────────────────────────────────────────
def _read_cache(self, topic: str) -> Optional[str]:
session = get_session()
try:
row = session.execute(
text(f"""
SELECT content FROM ai_insights
WHERE insight_type = 'mcp_cache'
AND created_by = 'mcp_collector'
AND metadata_json::jsonb ->> 'topic' = :topic
AND created_at >= NOW() - INTERVAL '{MCP_CACHE_TTL_HOURS} hours'
ORDER BY created_at DESC LIMIT 1
"""),
{"topic": topic},
).fetchone()
if row:
logger.debug("[MCP] 快取命中: %s", topic)
return row[0]
return None
except Exception:
return None
finally:
session.close()
def _write_cache(self, topic: str, content: str) -> None:
session = get_session()
try:
session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status, metadata_json)
VALUES ('mcp_cache', :content, 0.9, 'mcp_collector', 'active', :meta)
"""), {
"content": content[:4000],
"meta": json.dumps({"topic": topic, "model": MCP_MODEL, "cached_at": datetime.now().isoformat()})
})
session.commit()
except Exception as e:
logger.warning("[MCP] 快取寫入失敗: %s", e)
session.rollback()
finally:
session.close()
# ── 單主題搜尋 ──────────────────────────────────────────────────────────
def _search_topic(self, topic: str, query: str) -> str:
if not self._ensure_init():
return ""
cached = self._read_cache(topic)
if cached:
return cached
try:
model = self._genai.GenerativeModel(
model_name=MCP_MODEL,
tools=["google_search_retrieval"],
)
response = model.generate_content(
f"請用繁體中文整理以下主題的最新資訊提供具體數據與洞察500字以內\n{query}"
)
content = response.text or ""
if content:
self._write_cache(topic, content)
return content
except Exception as e:
logger.warning("[MCP] 搜尋失敗 topic=%s: %s", topic, e)
return ""
# ── 公開介面 ────────────────────────────────────────────────────────────
def collect_all(self) -> Dict[str, str]:
"""
收集所有外部情報主題,回傳 {topic: content} 字典。
各主題獨立失敗不影響整體。
"""
results = {}
for topic, query in _SEARCH_TOPICS.items():
try:
results[topic] = self._search_topic(topic, query)
time.sleep(0.5) # 避免 Gemini rate limit
except Exception as e:
logger.error("[MCP] topic=%s 收集失敗: %s", topic, e)
results[topic] = ""
logger.info("[MCP] 收集完成,有效主題=%d/%d", sum(1 for v in results.values() if v), len(results))
return results
def collect_topic(self, topic: str) -> str:
"""收集單一主題"""
query = _SEARCH_TOPICS.get(topic, topic)
return self._search_topic(topic, query)
def get_holiday_context(self) -> str:
"""取得節日行事曆(供 Prompt 注入)"""
now = datetime.now()
month = now.month
# 靜態台灣電商節日知識庫(無需 API 呼叫)
static_calendar = {
1: "元旦促銷、農曆新年備貨期1/20前後開始",
2: "農曆新年年貨、禮盒熱賣、情人節2/14保養/禮品衝量)",
3: "婦女節3/8、春季換季保養、開學季",
4: "清明連假、春季大促、換季服飾高峰",
5: "母親節5/2週前後美妝/保健/家電最高峰、520情人節",
6: "618購物節最大中年促銷全平台衝量、父親節備檔",
7: "父親節7/4週前後、暑假家電/3C/旅遊用品高峰",
8: "七夕情人節8/10前後、暑假尾聲出清",
9: "中秋節(禮盒/食品衝量、開學季3C/文具",
10: "雙10國慶、品牌週年慶百貨、電商 10月旺季",
11: "雙11光棍節全年最大促銷、品牌大促備貨",
12: "雙12年終慶、聖誕節12/25、跨年元旦備貨",
}
base = static_calendar.get(month, "")
# 加入下個月預告
next_month = (month % 12) + 1
next_base = static_calendar.get(next_month, "")
return (
f"當前月份:{now.strftime('%Y年%m月')}\n"
f"本月電商重點:{base}\n"
f"下月預告:{next_base}"
)
def get_seasonal_context(self) -> str:
"""季節性消費情境"""
month = datetime.now().month
seasons = {
(3, 4, 5): "春季:換季保養、外出服飾、春遊裝備",
(6, 7, 8): "夏季:防曬/美白、涼感寢具、戶外運動、冷氣清潔",
(9, 10, 11): "秋季:保濕修護、秋冬服飾、保健養生、熱飲週邊",
(12, 1, 2): "冬季:保暖寢具、暖身家電、年節禮品、養生補品",
}
for months, desc in seasons.items():
if month in months:
return desc
return ""
# 模組單例
mcp_collector = MCPCollectorService()

View File

@@ -1,31 +1,715 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
openclaw_strategist_service.py
OpenClaw 戰略分析師服務。
services/openclaw_strategist_service.py
OpenClaw 戰略分析師Gemini 2.5 Flash
re-export SSHJumpExecutor from auto_heal_service唯一來源
以及 OpenClaw 策略分析功能。
完整電商情報分析管線:
DB 爬蟲數據 + MCP 外部情報 → Gemini 深度分析 → ai_insights 持久化 → Telegram 推播
提供:
generate_weekly_strategy_report() — 週報(每週一 06:00
generate_meta_analysis_report() — AI 系統效能自我審視(每 6 小時)
分析維度:
1. 業績趨勢MoM / WoW
2. 競品價格比對
3. 定價策略建議
4. 行銷活動洞察
5. 季節性 / 節日機會
6. TOP 威脅 / 機會品項
7. 具體行動清單48h 優先事項)
"""
import json
import logging
from typing import Optional, Any
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
# re-exportSSHJumpExecutor 維護於 auto_heal_service向後相容
from services.auto_heal_service import SSHJumpExecutor # noqa: F401
logger = logging.getLogger(__name__)
# SSHJumpExecutor 統一維護於 auto_heal_service此處 re-export 向後相容
from services.auto_heal_service import SSHJumpExecutor # noqa: F401
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
STRATEGY_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash-preview-05-20")
TAIPEI_TZ_OFFSET = 8 # UTC+8
__all__ = ["SSHJumpExecutor", "generate_weekly_strategy_report"]
__all__ = [
"SSHJumpExecutor",
"generate_weekly_strategy_report",
"generate_meta_analysis_report",
]
def generate_weekly_strategy_report(context: Optional[Any] = None) -> dict:
# ═══════════════════════════════════════════════════════════════════════════════
# DB 數據讀取層
# ═══════════════════════════════════════════════════════════════════════════════
def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
"""近 N 天業績彙總(本期 / 前期 對比)"""
session = get_session()
try:
rows = session.execute(text(f"""
SELECT
snapshot_date::date AS dt,
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count
FROM daily_sales_snapshot
WHERE snapshot_date::date >= CURRENT_DATE - {days}
GROUP BY dt
ORDER BY dt DESC
""")).fetchall()
data = [{"date": str(r[0]), "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
for r in rows]
mid = len(data) // 2
curr_rev = sum(d["revenue"] for d in data[:mid]) if mid else 0
prev_rev = sum(d["revenue"] for d in data[mid:]) if mid else 0
wow = ((curr_rev - prev_rev) / prev_rev * 100) if prev_rev else 0
return {
"daily": data[:7],
"current_7d_revenue": curr_rev,
"prev_7d_revenue": prev_rev,
"wow_pct": round(wow, 1),
}
except Exception as e:
logger.warning("[OpenClaw] 業績數據讀取失敗: %s", e)
return {}
finally:
session.close()
def _fetch_top_threats(limit: int = 10) -> List[Dict]:
"""最新 TOP N 競價威脅(來自 Hermes 分析)"""
session = get_session()
try:
rows = session.execute(text("""
SELECT product_sku, content, confidence, metadata_json, created_at
FROM ai_insights
WHERE insight_type = 'price_alert'
AND status = 'active'
AND created_at >= NOW() - INTERVAL '48 hours'
ORDER BY confidence DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
result = []
for r in rows:
meta = {}
try:
meta = json.loads(r[3]) if r[3] else {}
except Exception:
pass
result.append({
"sku": r[0],
"summary": (r[1] or "")[:200],
"confidence": float(r[2] or 0),
"gap_pct": meta.get("gap_pct", 0),
"sales_delta": meta.get("sales_7d_delta_pct", 0),
"momo_price": meta.get("momo_price"),
"pchome_price": meta.get("pchome_price"),
})
return result
except Exception as e:
logger.warning("[OpenClaw] 威脅數據讀取失敗: %s", e)
return []
finally:
session.close()
def _fetch_top_recommendations(limit: int = 10) -> List[Dict]:
"""最新定價建議"""
session = get_session()
try:
rows = session.execute(text("""
SELECT sku, name, reason, strategy, confidence,
momo_price, pchome_price, gap_pct, sales_7d_delta
FROM ai_price_recommendations
WHERE status = 'pending'
AND created_at >= NOW() - INTERVAL '48 hours'
ORDER BY confidence DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
return [dict(zip(
["sku","name","reason","strategy","confidence","momo_price","pchome_price","gap_pct","sales_delta"],
r
)) for r in rows]
except Exception as e:
logger.warning("[OpenClaw] 建議數據讀取失敗: %s", e)
return []
finally:
session.close()
def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
"""品類業績分佈"""
session = get_session()
try:
rows = session.execute(text(f"""
SELECT p.category,
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT p.i_code) AS sku_count
FROM daily_sales_snapshot s
JOIN products p ON p.name = s."商品名稱"
WHERE s.snapshot_date::date >= CURRENT_DATE - {days}
AND p.status = 'ACTIVE'
GROUP BY p.category
ORDER BY revenue DESC
LIMIT 10
""")).fetchall()
return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
for r in rows]
except Exception as e:
logger.warning("[OpenClaw] 品類數據讀取失敗: %s", e)
return []
finally:
session.close()
def _fetch_competitor_summary() -> Dict[str, Any]:
"""競品價格整體概況"""
session = get_session()
try:
row = session.execute(text("""
SELECT
COUNT(*) AS total,
AVG((cp.price - pr.price) / pr.price * 100) AS avg_gap_pct,
SUM(CASE WHEN cp.price < pr.price * 0.9 THEN 1 ELSE 0 END) AS undercut_count,
SUM(CASE WHEN cp.price > pr.price * 1.1 THEN 1 ELSE 0 END) AS premium_count
FROM competitor_prices cp
JOIN products p ON p.i_code = cp.sku
JOIN (
SELECT DISTINCT ON (product_id) product_id, price
FROM price_records ORDER BY product_id, timestamp DESC
) pr ON pr.product_id = p.id
WHERE cp.expires_at > NOW()
""")).fetchone()
if row and row[0]:
return {
"total_skus": int(row[0]),
"avg_gap_pct": round(float(row[1] or 0), 1),
"undercut_count": int(row[2] or 0),
"premium_count": int(row[3] or 0),
}
return {}
except Exception as e:
logger.warning("[OpenClaw] 競品概況讀取失敗: %s", e)
return {}
finally:
session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# DB 寫入層
# ═══════════════════════════════════════════════════════════════════════════════
def _save_to_ai_insights(
insight_type: str,
content: str,
confidence: float,
metadata: Dict[str, Any],
period: Optional[str] = None,
) -> Optional[int]:
"""將分析結果持久化到 ai_insights"""
session = get_session()
try:
row = session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status,
metadata_json, period, created_at)
VALUES (:type, :content, :conf, 'openclaw', 'active', :meta, :period, NOW())
RETURNING id
"""), {
"type": insight_type,
"content": content[:8000],
"conf": confidence,
"meta": json.dumps(metadata, ensure_ascii=False),
"period": period or datetime.now().strftime("%Y-%m-%d"),
}).fetchone()
session.commit()
insight_id = row[0] if row else None
logger.info("[OpenClaw] ai_insights 寫入成功 id=%s type=%s", insight_id, insight_type)
return insight_id
except Exception as e:
logger.error("[OpenClaw] ai_insights 寫入失敗: %s", e)
session.rollback()
return None
finally:
session.close()
def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None:
"""將 AI 建議的行動項目寫入 action_plans"""
if not actions:
return
session = get_session()
try:
for i, action in enumerate(actions[:10]):
session.execute(text("""
INSERT INTO action_plans
(action_type, description, status, priority, metadata_json, created_at)
VALUES ('openclaw_recommendation', :desc, 'pending', :priority, :meta, NOW())
"""), {
"desc": action[:500],
"priority": i + 1,
"meta": json.dumps({"source_insight_id": source_insight_id, "created_by": "openclaw"}),
})
session.commit()
logger.info("[OpenClaw] action_plans 寫入 %d", len(actions[:10]))
except Exception as e:
logger.warning("[OpenClaw] action_plans 寫入失敗: %s", e)
session.rollback()
finally:
session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# Gemini 呼叫層
# ═══════════════════════════════════════════════════════════════════════════════
def _call_gemini(system_prompt: str, user_prompt: str, temperature: float = 0.4) -> Optional[str]:
"""呼叫 Gemini回傳文字失敗回傳 None"""
if not GEMINI_API_KEY:
logger.warning("[OpenClaw] GEMINI_API_KEY 未設定")
return None
try:
import google.generativeai as genai
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel(
model_name=STRATEGY_MODEL,
generation_config=genai.types.GenerationConfig(
temperature=temperature,
max_output_tokens=4096,
),
system_instruction=system_prompt,
)
response = model.generate_content(
user_prompt,
request_options={"timeout": 180},
)
return response.text or ""
except Exception as e:
logger.error("[OpenClaw] Gemini 呼叫失敗: %s", e)
return None
# ═══════════════════════════════════════════════════════════════════════════════
# Telegram 推播
# ═══════════════════════════════════════════════════════════════════════════════
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> None:
try:
from services.telegram_templates import report as tpl_report, _send_telegram_raw
# Telegram 訊息長度限制 4096分段發送
header = tpl_report(title, report_type, period, "")
chunks = _split_message(content, max_len=3800 - len(header))
for i, chunk in enumerate(chunks):
msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk
_send_telegram_raw(msg)
except Exception as e:
logger.error("[OpenClaw] Telegram 推播失敗: %s", e)
def _split_message(text: str, max_len: int = 3800) -> List[str]:
if len(text) <= max_len:
return [text]
chunks = []
while text:
chunks.append(text[:max_len])
text = text[max_len:]
return chunks
# ═══════════════════════════════════════════════════════════════════════════════
# 主要公開函式
# ═══════════════════════════════════════════════════════════════════════════════
def generate_weekly_strategy_report(
context: Optional[Any] = None,
force_tg_alert: bool = False,
) -> dict:
"""
OpenClaw 週報生成(戰略分析)。
當 ElephantAlpha orchestrator 分派 openclaw generate_market_analysis 時呼叫。
OpenClaw 全景電商週報(每週一 06:00
流程:
1. 讀取 DB業績 / 競品 / 威脅 / 建議 / 品類
2. MCP 收集:外部市場趨勢 / 節日 / 競品動態
3. Gemini 2.5 Flash 深度分析
4. 持久化 → ai_insights + action_plans
5. Telegram 推播
"""
logger.info("[OpenClaw] generate_weekly_strategy_report called")
# TODO: 接入 OpenClaw LLM 生成真實週報
now = datetime.now()
period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})"
logger.info("[OpenClaw] 週報任務啟動 period=%s", period)
# ── Step 1DB 數據收集 ──────────────────────────────────────────────────
sales = _fetch_sales_summary(14)
threats = _fetch_top_threats(10)
recommendations = _fetch_top_recommendations(10)
categories = _fetch_category_breakdown(7)
competitor_summary = _fetch_competitor_summary()
# ── Step 2MCP 外部情報 ─────────────────────────────────────────────────
mcp_data: Dict[str, str] = {}
try:
from services.mcp_collector_service import mcp_collector
mcp_data = mcp_collector.collect_all()
holiday_ctx = mcp_collector.get_holiday_context()
seasonal_ctx = mcp_collector.get_seasonal_context()
except Exception as e:
logger.warning("[OpenClaw] MCP 收集失敗(非阻塞): %s", e)
holiday_ctx = ""
seasonal_ctx = ""
# ── Step 3組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw一位台灣頂尖電商戰略分析師專精於 momo 購物平台。
你的任務是根據真實業績數據、競品情報、外部市場趨勢,產出一份具體可執行的週報。
語言規定:
- 所有輸出必須使用繁體中文(台灣用語)
- 數字格式:金額用 NT$ 標示百分比保留1位小數
- 語氣:專業但不失親切,適合匯報給電商運營主管
分析原則:
- 每個洞察必須有數據支撐,禁止憑空推測
- 建議必須具體(時間、對象、行動、預期效益)
- 優先關注「可在 48 小時內執行」的行動項目"""
db_section = f"""
【DB 即時數據】
業績概況:
本週營收NT${sales.get('current_7d_revenue', 0):,.0f}
前週營收NT${sales.get('prev_7d_revenue', 0):,.0f}
週成長率:{sales.get('wow_pct', 0):+.1f}%
競品比對概況:
監控SKU總數{competitor_summary.get('total_skus', 0)}
平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
被競品削價數:{competitor_summary.get('undercut_count', 0)}
我方具優勢數:{competitor_summary.get('premium_count', 0)}
TOP 威脅品項近48h Hermes 偵測):
{_format_threats(threats)}
待處理定價建議:
{_format_recommendations(recommendations)}
品類業績分佈(本週):
{_format_categories(categories)}
"""
mcp_section = f"""
【MCP 外部情報】
市場趨勢:
{mcp_data.get('market_trends', '(未取得)')[:600]}
競品動態:
{mcp_data.get('competitor_intel', '(未取得)')[:500]}
消費者情緒:
{mcp_data.get('consumer_sentiment', '(未取得)')[:400]}
定價策略參考:
{mcp_data.get('pricing_strategy', '(未取得)')[:400]}
節日行事曆:
{holiday_ctx}
{mcp_data.get('holiday_calendar', '')[:300]}
季節性洞察:
{seasonal_ctx}
{mcp_data.get('seasonal_insights', '')[:300]}
"""
user_prompt = f"""請根據以下數據,產出本週電商全景戰略週報:
{db_section}
{mcp_section}
請按以下結構輸出(每節使用 HTML <b> 標題,內容精簡扼要):
<b>📊 本週業績總結</b>
(關鍵指標 + WoW 變化 + 異常警示)
<b>🏆 TOP 機會品項</b>
具備提價或強推空間的品項3-5個含具體建議
<b>⚠️ TOP 威脅品項</b>
最需緊急處理的競品削價風險3-5個含建議行動
<b>💰 本週定價策略建議</b>
(整體定價方向 + 品類重點調整 + 心理定價應用)
<b>📢 行銷活動洞察</b>
(節日/季節機會 + 推薦活動形式 + 投放時機)
<b>📦 品類熱度分析</b>
(成長品類 vs 衰退品類 + 庫存備貨建議)
<b>🔮 市場競爭洞察</b>
(競品最新動態 + 平台策略差異 + 我方應對)
<b>🎯 48小時優先行動清單</b>
5-8條具體可執行任務格式[優先度] 行動說明 → 預期效益)
<b>📈 下週展望</b>
(風險提示 + 機會預告 + 需人工決策事項)
重要:語言必須是繁體中文,數據必須引用上方提供的實際數字。
"""
# ── Step 4Gemini 生成 ───────────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Gemini %s 生成週報...", STRATEGY_MODEL)
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.35)
if not report_content:
logger.error("[OpenClaw] Gemini 週報生成失敗")
return {"status": "error", "report_type": "weekly_strategy", "error": "Gemini 呼叫失敗"}
# ── Step 5解析行動清單 ─────────────────────────────────────────────────
action_items = _extract_action_items(report_content)
# ── Step 6持久化 DB ────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"wow_pct": sales.get("wow_pct", 0),
"threat_count": len(threats),
"recommendation_count": len(recommendations),
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
"action_count": len(action_items),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="weekly_strategy",
content=report_content,
confidence=0.88,
metadata=metadata,
period=now.strftime("%Y-%W"),
)
_save_action_items(action_items, insight_id)
# ── Step 7Telegram 推播 ────────────────────────────────────────────────
if force_tg_alert or True:
_send_strategy_telegram(
title="OpenClaw 電商全景週報",
report_type="weekly_strategy",
period=period,
content=report_content,
)
logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items))
return {
"status": "ok",
"report_type": "weekly_strategy",
"summary": "OpenClaw strategy report placeholder — LLM integration pending",
"context": context,
"insight_id": insight_id,
"period": period,
"action_count": len(action_items),
"summary": report_content[:300],
}
def generate_meta_analysis_report() -> str:
"""
AI 系統效能自我審視(每 6 小時 run_openclaw_meta_analysis_task 呼叫)
分析 ai_insights 近期累積資料,評估:
- 各 Agent 預測準確率
- 價格建議執行率
- 告警品質與誤報率
- 系統盲區與改進方向
結果持久化至 ai_insightstype='meta_analysis'),並推播 Telegram。
"""
now = datetime.now()
period = now.strftime("%Y-%m-%d %H:00")
logger.info("[OpenClaw] Meta-Analysis 任務啟動 %s", period)
# ── 讀取近期 ai_insights 摘要 ────────────────────────────────────────────
session = get_session()
try:
stats = session.execute(text("""
SELECT
insight_type,
created_by,
COUNT(*) AS total,
AVG(confidence) AS avg_conf,
SUM(CASE WHEN status='active' THEN 1 ELSE 0 END) AS active_cnt,
SUM(CASE WHEN status='relearn' THEN 1 ELSE 0 END) AS relearn_cnt,
MAX(created_at) AS latest
FROM ai_insights
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY insight_type, created_by
ORDER BY total DESC
""")).fetchall()
action_stats = session.execute(text("""
SELECT status, COUNT(*) AS cnt
FROM action_plans
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY status
""")).fetchall()
reco_stats = session.execute(text("""
SELECT status, COUNT(*) AS cnt, AVG(confidence) AS avg_conf
FROM ai_price_recommendations
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY status
""")).fetchall()
except Exception as e:
logger.warning("[OpenClaw] Meta 數據讀取失敗: %s", e)
stats, action_stats, reco_stats = [], [], []
finally:
session.close()
# ── 組建 Prompt ───────────────────────────────────────────────────────────
system_prompt = """你是 OpenClaw 自我審視模組,負責分析 AI 多智能體系統的近期表現。
請用繁體中文,以電商 AI 系統架構師的視角撰寫分析報告,語氣客觀、聚焦問題與改進。"""
stats_text = "\n".join([
f" {r[0]} ({r[1]}): 共{r[2]}筆, 平均信心{r[3]:.2f}, 活躍{r[4]}, 重學{r[5]}"
for r in stats
]) or " (無近期數據)"
action_text = "\n".join([
f" {r[0]}: {r[1]}" for r in action_stats
]) or " (無近期數據)"
reco_text = "\n".join([
f" {r[0]}: {r[1]} 筆, 平均信心{r[2]:.2f}" for r in reco_stats
]) or " (無近期數據)"
user_prompt = f"""請分析以下 AI 系統近 24 小時運作數據,產出自我審視報告:
【ai_insights 產出統計】
{stats_text}
【action_plans 執行狀況】
{action_text}
【ai_price_recommendations 建議狀況】
{reco_text}
【分析時間】{period}
請按以下結構輸出:
<b>🤖 AI 系統效能自我審視報告</b>
時間:{period}
<b>📊 各 Agent 產出統計</b>
(逐一評估 Hermes/NemoTron/OpenClaw/ElephantAlpha 的輸出品質)
<b>⚠️ 偵測到的系統問題</b>
(誤報、漏報、重學事件分析)
<b>💡 盲區與改進建議</b>
(哪些場景 AI 表現不足?建議優化方向)
<b>✅ 本週期亮點</b>
(表現良好的分析案例)
<b>🔧 技術債與優化優先順序</b>
1-3 項具體技術改進建議)
語言繁體中文200字以內精簡扼要。
"""
# ── Gemini 生成 ──────────────────────────────────────────────────────────
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.3)
if not report_content:
logger.error("[OpenClaw] Meta-Analysis Gemini 呼叫失敗")
return "Meta-Analysis 生成失敗)"
# ── 持久化 DB ─────────────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"insight_types_analyzed": len(stats),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="meta_analysis",
content=report_content,
confidence=0.85,
metadata=metadata,
period=now.strftime("%Y-%m-%d"),
)
# ── Telegram 推播 ────────────────────────────────────────────────────────
try:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(report_content)
except Exception as e:
logger.error("[OpenClaw] Meta-Analysis Telegram 推播失敗: %s", e)
logger.info("[OpenClaw] Meta-Analysis 完成 insight_id=%s", insight_id)
return report_content
# ═══════════════════════════════════════════════════════════════════════════════
# 輔助格式化函式
# ═══════════════════════════════════════════════════════════════════════════════
def _format_threats(threats: List[Dict]) -> str:
if not threats:
return " (無近期競價威脅)"
lines = []
for t in threats[:5]:
lines.append(
f" • SKU {t['sku']}:價差 {t.get('gap_pct', 0):+.1f}%"
f"業績週變化 {t.get('sales_delta', 0):+.1f}%"
f"信心 {t.get('confidence', 0):.2f}"
)
return "\n".join(lines)
def _format_recommendations(recs: List[Dict]) -> str:
if not recs:
return " (無待處理定價建議)"
lines = []
for r in recs[:5]:
lines.append(
f"{r.get('name', r.get('sku', ''))[:30]}{r.get('strategy', '')}"
f"信心 {r.get('confidence', 0):.2f}"
)
return "\n".join(lines)
def _format_categories(cats: List[Dict]) -> str:
if not cats:
return " (無品類數據)"
lines = []
for c in cats[:5]:
lines.append(
f"{c.get('category', '未分類')}"
f"NT${c.get('revenue', 0):,.0f}{c.get('sku_count', 0)} 個 SKU"
)
return "\n".join(lines)
def _extract_action_items(report_text: str) -> List[str]:
"""從報告文字中解析行動清單48小時優先行動"""
lines = report_text.split("\n")
items = []
in_action_section = False
for line in lines:
if "48小時" in line or "優先行動" in line:
in_action_section = True
continue
if in_action_section:
stripped = line.strip()
if stripped.startswith("") or stripped.startswith("-") or stripped.startswith("["):
items.append(stripped.lstrip("•-").strip())
elif stripped.startswith("<b>") and items:
break
return items[:8]