Files
ewoooc/services/mcp_collector_service.py

299 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/mcp_collector_service.py
MCP 外部情報收集層
透過 Gemini Google Search Grounding 收集外部市場情報,供 OpenClaw 戰略分析使用:
- 台灣電商市場趨勢
- 節日 / 促銷行事曆
- 季節性消費洞察
- 競品動態(蝦皮/PChome/momo/Yahoo
- 消費者情緒與熱銷品類
結果快取至 ai_insightstype='mcp_cache'24h TTL 避免重複呼叫。
"""
import json
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
logger = logging.getLogger(__name__)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
MCP_CACHE_TTL_HOURS = int(os.getenv("MCP_CACHE_TTL_HOURS", "24"))
MCP_MODEL = os.getenv("MCP_GEMINI_MODEL", "gemini-2.0-flash")
# ── 查詢主題定義 ────────────────────────────────────────────────────────────
_SEARCH_TOPICS = {
"market_trends": (
"台灣電商 momo購物網 2026年熱銷商品趨勢 消費者行為 美妝保養 家電 生活用品"
),
"holiday_calendar": (
"2026年台灣重要節日促銷行事曆 母親節 618購物節 雙11 雙12 中秋 跨年 電商大促"
),
"seasonal_insights": (
"台灣電商季節性銷售趨勢 換季商品 夏季防曬 冬季保暖 Q3 Q4 消費高峰"
),
"competitor_intel": (
"momo購物網 PChome 蝦皮 Yahoo購物 2026年競爭策略 促銷活動 物流比較"
),
"consumer_sentiment": (
"台灣消費者 2026 購物偏好 低價高CP 品牌忠誠度 直播購物 社群電商 KOL影響"
),
"pricing_strategy": (
"台灣電商定價策略 動態定價 競品比價 心理定價 促銷折扣最佳時機"
),
}
class MCPCollectorService:
"""
外部情報收集服務MCP 節點)
使用 Gemini Search Grounding 抓取即時市場資訊
"""
def __init__(self):
self._initialized = False
self._genai = None
def _ensure_init(self) -> bool:
if self._initialized:
return True
if not GEMINI_API_KEY:
logger.warning("[MCP] GEMINI_API_KEY 未設定,跳過外部情報收集")
return False
try:
import google.generativeai as genai
genai.configure(api_key=GEMINI_API_KEY)
self._genai = genai
self._initialized = True
return True
except ImportError:
logger.error("[MCP] google-generativeai 未安裝")
return False
except Exception as e:
logger.error("[MCP] Gemini 初始化失敗: %s", e)
return False
# ── 快取讀寫 ────────────────────────────────────────────────────────────
def _read_cache(self, topic: str) -> Optional[str]:
session = get_session()
try:
row = session.execute(
text(f"""
SELECT content FROM ai_insights
WHERE insight_type = 'mcp_cache'
AND created_by = 'mcp_collector'
AND metadata_json::jsonb ->> 'topic' = :topic
AND created_at >= NOW() - INTERVAL '{MCP_CACHE_TTL_HOURS} hours'
ORDER BY created_at DESC LIMIT 1
"""),
{"topic": topic},
).fetchone()
if row:
logger.debug("[MCP] 快取命中: %s", topic)
content = row[0]
if self._looks_unreliable(content):
logger.warning("[MCP] 快取內容含占位文字,略過 topic=%s", topic)
return None
return content
return None
except Exception:
return None
finally:
session.close()
def _write_cache(self, topic: str, content: str) -> None:
session = get_session()
try:
row = session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status, metadata_json)
VALUES ('mcp_cache', :content, 0.9, 'mcp_collector', 'active', :meta)
RETURNING id
"""), {
"content": content[:4000],
"meta": json.dumps({"topic": topic, "model": MCP_MODEL, "cached_at": datetime.now().isoformat()})
}).fetchone()
session.commit()
if row:
try:
from services.openclaw_learning_service import enqueue_insight_embedding
enqueue_insight_embedding(row[0], "mcp_cache", content[:4000])
except Exception as embed_err:
logger.warning("[MCP] embedding queue enqueue failed: %s", embed_err)
except Exception as e:
logger.warning("[MCP] 快取寫入失敗: %s", e)
session.rollback()
finally:
session.close()
# ── 單主題搜尋 ──────────────────────────────────────────────────────────
def _search_topic(self, topic: str, query: str) -> str:
cached = self._read_cache(topic)
if cached:
return cached
if not self._ensure_init():
return self._fallback_topic_content(topic, "GEMINI_API_KEY 未設定,使用本地行銷情報。")
try:
prompt = f"請用繁體中文整理以下主題的最新資訊提供具體數據與洞察500字以內\n{query}"
response = None
last_error = None
for tools in (["google_search"], ["google_search_retrieval"], None):
try:
kwargs = {"model_name": MCP_MODEL}
if tools:
kwargs["tools"] = tools
model = self._genai.GenerativeModel(**kwargs)
response = model.generate_content(prompt)
break
except Exception as tool_err:
last_error = tool_err
continue
if response is None:
raise last_error or RuntimeError("Gemini response empty")
content = response.text or ""
if self._looks_unreliable(content):
return self._fallback_topic_content(topic, "即時搜尋內容含占位數字或待更新文字,已改用本地行銷情報。")
if content:
self._write_cache(topic, content)
return content
return self._fallback_topic_content(topic, "Gemini 回傳空內容,使用本地行銷情報。")
except Exception as e:
logger.warning("[MCP] 搜尋失敗 topic=%s: %s", topic, e)
return self._fallback_topic_content(topic, f"即時外部搜尋暫不可用:{type(e).__name__}")
@staticmethod
def _looks_unreliable(content: str) -> bool:
"""避免將模型產生的占位數字或待補文字當成真實情報。"""
if not content:
return False
markers = (
"XX",
"請自行更新",
"待補",
"資料待查",
"自行查詢",
"示例數據",
"範例數據",
)
return any(marker in content for marker in markers)
def _fallback_topic_content(self, topic: str, reason: str = "") -> str:
"""外部搜尋失敗時的穩定回覆,避免 Telegram 按鈕空白或像壞掉。"""
holiday = self.get_holiday_context()
seasonal = self.get_seasonal_context()
fallback_map = {
"market_trends": [
"台灣電商營運觀察:美妝保養、保健食品、母嬰與個人清潔仍適合用週期性促銷與組合包拉升轉換。",
"建議優先檢查近期高業績品類、毛利率與庫存週轉,將活動資源集中在高轉換商品。",
],
"competitor_intel": [
"競品情報 fallback請優先比較 momo / PChome / 蝦皮同款商品的售價、庫存、到貨速度與組合優惠。",
"若 PChome 價格優勢明顯,可強化文案中的即時到貨、價格透明與組合折扣。",
],
"consumer_sentiment": [
"消費者聲量 fallback高 CP 值、到貨速度、真實評價與成分/規格透明度通常會影響購買意願。",
"建議把負評來源拆成價格、物流、規格不符、售後服務四類追蹤。",
],
"pricing_strategy": [
"定價策略 fallback先鎖定高流量高轉換商品做競品價差監控再用加價購、滿額折與組合包保護毛利。",
],
"holiday_calendar": [holiday],
"seasonal_insights": [seasonal],
}
lines = fallback_map.get(topic, [holiday, seasonal])
if reason:
lines.append(f"資料狀態:{reason}")
return "\n".join(line for line in lines if line)
# ── 公開介面 ────────────────────────────────────────────────────────────
def collect_all(self) -> Dict[str, str]:
"""
收集所有外部情報主題,回傳 {topic: content} 字典。
各主題獨立失敗不影響整體。
"""
results = {}
for topic, query in _SEARCH_TOPICS.items():
try:
results[topic] = self._search_topic(topic, query)
time.sleep(0.5) # 避免 Gemini rate limit
except Exception as e:
logger.error("[MCP] topic=%s 收集失敗: %s", topic, e)
results[topic] = ""
logger.info("[MCP] 收集完成,有效主題=%d/%d", sum(1 for v in results.values() if v), len(results))
return results
def collect_topic(self, topic: str) -> str:
"""收集單一主題"""
query = _SEARCH_TOPICS.get(topic, topic)
return self._search_topic(topic, query)
def get_holiday_context(self) -> str:
"""取得節日行事曆(供 Prompt 注入)"""
now = datetime.now()
month = now.month
day = now.day
# 靜態台灣電商節日知識庫
static_calendar = {
1: "元旦促銷、農曆新年備貨期",
2: "農曆新年、情人節2/14",
3: "婦女節3/8、開學季",
4: "清明連假4月初、春季大促、換季服飾高峰",
5: "母親節5月第2週年度大促、520情人節、勞動節5/1",
6: "618購物節年中最大促銷、端午節",
7: "暑假開端、父親節前哨站",
8: "父親節8/8、七夕情人節",
9: "中秋節、開學季、百貨週年慶預熱",
10: "雙10國慶、百貨週年慶高峰期",
11: "雙11光棍節全年最強電商季、黑五大促",
12: "雙12年終慶、聖誕節、跨年備貨",
}
current_focus = static_calendar.get(month, "")
next_month = (month % 12) + 1
next_focus = static_calendar.get(next_month, "")
# 月底優化:若超過 20 號自動將焦點轉向「下月預告」避免產生如「4月底還在過清明節」的幻覺
if day > 20:
header = f"當前日期:{now.strftime('%Y/%m/%d')} (月底轉場期)"
body = f"本月即將結束,目前重點已轉向:{next_focus}"
footer = f"下月詳細預告:{next_focus}"
else:
header = f"當前日期:{now.strftime('%Y/%m/%d')}"
body = f"本月電商重點:{current_focus}"
footer = f"下月預告:{next_focus}"
return f"{header}\n{body}\n{footer}"
def get_seasonal_context(self) -> str:
"""季節性消費情境"""
month = datetime.now().month
seasons = {
(3, 4, 5): "春季:換季保養、外出服飾、春遊裝備",
(6, 7, 8): "夏季:防曬/美白、涼感寢具、戶外運動、冷氣清潔",
(9, 10, 11): "秋季:保濕修護、秋冬服飾、保健養生、熱飲週邊",
(12, 1, 2): "冬季:保暖寢具、暖身家電、年節禮品、養生補品",
}
for months, desc in seasons.items():
if month in months:
return desc
return ""
# 模組單例
mcp_collector = MCPCollectorService()