Files
ewoooc/services/mcp_collector_service.py
OoO d90f96fab3
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
V10.567 收斂 MCP 市場洞察 fallback
2026-06-02 10:09:36 +08:00

424 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/mcp_collector_service.py
MCP 外部情報收集層
透過 MCP omnisearch / Ollama-first fallback / Gemini final fallback 收集外部市場情報,供 OpenClaw 戰略分析使用:
- 台灣電商市場趨勢
- 節日 / 促銷行事曆
- 季節性消費洞察
- 競品動態(蝦皮/PChome/momo/Yahoo)
- 消費者情緒與熱銷品類
結果快取至 ai_insights(insight_type='mcp_cache',預設 24h TTL,可由 MCP_CACHE_TTL_HOURS 調整),避免重複呼叫。
"""
import json
import logging
import os
import time
import requests
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
from services.gemini_guard import (
gemini_disabled_message,
get_gemini_api_key,
is_gemini_fallback_enabled,
)
logger = logging.getLogger(__name__)


def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Returns *default* when the variable is unset or blank, and also (with a
    warning) when it is not a valid integer, so a typo in deployment config
    degrades to the default instead of breaking the module import.
    """
    raw = os.getenv(name, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning("[MCP] invalid %s=%r, using default %d", name, raw, default)
        return default


MCP_CACHE_TTL_HOURS = _env_int("MCP_CACHE_TTL_HOURS", 24)
# The MCP router is the primary real-time intelligence path. When the router is
# unavailable, offline insight comes from the GCP Ollama hosts first. Market
# insight is non-urgent batch work, so long analyses are not pushed onto the
# "111" fallback host (see allow_111_fallback=False at the Ollama call site).
# Gemini search grounding is only the last-resort backup, so the service never
# goes back to being Gemini-first.
MCP_MODEL = os.getenv("MCP_GEMINI_MODEL", "gemini-2.0-flash")
# Overridable so a retired Gemini model can be replaced without a code change.
MCP_FALLBACK_MODEL = os.getenv("MCP_GEMINI_FALLBACK_MODEL", "gemini-1.5-flash")
MCP_OLLAMA_TIMEOUT = _env_int("MCP_OLLAMA_TIMEOUT", 25)
MCP_OLLAMA_NUM_PREDICT = _env_int("MCP_OLLAMA_NUM_PREDICT", 500)
try:
    from services.ollama_service import OllamaService
    _OLLAMA_AVAILABLE = True
except ImportError:
    _OLLAMA_AVAILABLE = False
# ── Search topic definitions ────────────────────────────────────────────────
# Topic key -> search query sent to omnisearch / Ollama / Gemini. The key is
# also the "topic" stored in (and looked up from) the mcp_cache metadata.
_SEARCH_TOPICS = dict(
    market_trends="台灣電商 momo購物網 2026年熱銷商品趨勢 消費者行為 美妝保養 家電 生活用品",
    holiday_calendar="2026年台灣重要節日促銷行事曆 母親節 618購物節 雙11 雙12 中秋 跨年 電商大促",
    seasonal_insights="台灣電商季節性銷售趨勢 換季商品 夏季防曬 冬季保暖 Q3 Q4 消費高峰",
    competitor_intel="momo購物網 PChome 蝦皮 Yahoo購物 2026年競爭策略 促銷活動 物流比較",
    consumer_sentiment="台灣消費者 2026 購物偏好 低價高CP 品牌忠誠度 直播購物 社群電商 KOL影響",
    pricing_strategy="台灣電商定價策略 動態定價 競品比價 心理定價 促銷折扣最佳時機",
)
class MCPCollectorService:
    """External market-intelligence collector (the "MCP" node).

    For each topic, ``_search_topic`` tries, in order:

    1. a fresh ``ai_insights`` cache row (``insight_type='mcp_cache'``);
    2. MCP router ``omnisearch`` (``tavily_search``, then ``exa_search``)
       when ``is_mcp_router_enabled()`` is true;
    3. an Ollama-generated predictive summary (never cached);
    4. Gemini with search grounding, only when the ``mcp_collector`` Gemini
       fallback is explicitly enabled and an API key is configured;
    5. static local text from ``_fallback_topic_content``, so the caller
       (e.g. a Telegram button) never shows an empty answer.
    """

    # Substrings that mark model output as placeholder/template text rather
    # than real intelligence. NOTE: plain substring match, so e.g. a size
    # label such as "XXL" also trips the "XX" marker.
    _UNRELIABLE_MARKERS = (
        "XX",
        "請自行更新",
        "待補",
        "資料待查",
        "自行查詢",
        "示例數據",
        "範例數據",
    )

    # Static Taiwan e-commerce calendar: month number -> promotional focus.
    _HOLIDAY_CALENDAR: Dict[int, str] = {
        1: "元旦促銷、農曆新年備貨期",
        2: "農曆新年、情人節2/14",
        3: "婦女節3/8、開學季",
        4: "清明連假4月初、春季大促、換季服飾高峰",
        5: "母親節5月第2週年度大促、520情人節、勞動節5/1",
        6: "618購物節年中最大促銷、端午節",
        7: "暑假開端、父親節前哨站",
        8: "父親節8/8、七夕情人節",
        9: "中秋節、開學季、百貨週年慶預熱",
        10: "雙10國慶、百貨週年慶高峰期",
        11: "雙11光棍節全年最強電商季、黑五大促",
        12: "雙12年終慶、聖誕節、跨年備貨",
    }

    # Month groups -> seasonal consumption themes.
    _SEASONS = (
        ((3, 4, 5), "春季:換季保養、外出服飾、春遊裝備"),
        ((6, 7, 8), "夏季:防曬/美白、涼感寢具、戶外運動、冷氣清潔"),
        ((9, 10, 11), "秋季:保濕修護、秋冬服飾、保健養生、熱飲週邊"),
        ((12, 1, 2), "冬季:保暖寢具、暖身家電、年節禮品、養生補品"),
    )

    def __init__(self):
        self._initialized = False
        self._genai = None
        # Reason text from the last failed _ensure_init(); shown as the
        # "資料狀態" line of the local fallback content.
        self._init_failure_reason: Optional[str] = None

    def _ensure_init(self) -> bool:
        """Lazily configure the google-generativeai SDK for the Gemini fallback.

        Returns:
            True once ``self._genai`` is configured. False when the Gemini
            fallback is disabled for ``mcp_collector``, no API key is set, the
            SDK is not installed, or ``genai.configure`` raised; in every such
            case ``self._init_failure_reason`` says which one.
        """
        if self._initialized:
            return True
        if not is_gemini_fallback_enabled("mcp_collector"):
            reason = gemini_disabled_message("mcp_collector")
            logger.info("[MCP] %s", reason)
            self._init_failure_reason = reason
            return False
        gemini_api_key = get_gemini_api_key("mcp_collector")
        if not gemini_api_key:
            logger.warning("[MCP] GEMINI_API_KEY 未設定,跳過外部情報收集")
            self._init_failure_reason = "GEMINI_API_KEY 未設定,使用本地行銷情報。"
            return False
        try:
            import google.generativeai as genai
        except ImportError:
            logger.error("[MCP] google-generativeai 未安裝")
            self._init_failure_reason = "google-generativeai 未安裝,使用本地行銷情報。"
            return False
        try:
            genai.configure(api_key=gemini_api_key)
        except Exception as e:
            logger.error("[MCP] Gemini 初始化失敗: %s", e)
            self._init_failure_reason = f"Gemini 初始化失敗:{type(e).__name__},使用本地行銷情報。"
            return False
        self._genai = genai
        self._initialized = True
        self._init_failure_reason = None
        return True

    # ── Cache read/write ────────────────────────────────────────────────────
    def _read_cache(self, topic: str) -> Optional[str]:
        """Return the newest cached content for *topic* that is still usable.

        Looks at ``ai_insights`` rows written by ``_write_cache`` whose
        ``metadata_json`` topic matches and whose ``created_at`` is within the
        last ``MCP_CACHE_TTL_HOURS`` hours. Content that trips
        ``_looks_unreliable`` is ignored. Database errors are logged and
        treated as a cache miss so collection can continue.
        """
        session = get_session()
        try:
            row = session.execute(
                text("""
                    SELECT content FROM ai_insights
                    WHERE insight_type = 'mcp_cache'
                      AND created_by = 'mcp_collector'
                      AND metadata_json::jsonb ->> 'topic' = :topic
                      AND created_at >= NOW() - (CAST(:ttl_hours AS INTEGER) * INTERVAL '1 hour')
                    ORDER BY created_at DESC LIMIT 1
                """),
                # TTL is bound rather than interpolated into the SQL text.
                {"topic": topic, "ttl_hours": MCP_CACHE_TTL_HOURS},
            ).fetchone()
        except Exception as exc:
            logger.warning("[MCP] 快取讀取失敗 topic=%s: %s", topic, exc)
            return None
        finally:
            session.close()
        if not row:
            return None
        content = row[0]
        if self._looks_unreliable(content):
            logger.warning("[MCP] 快取內容含占位文字,略過 topic=%s", topic)
            return None
        logger.debug("[MCP] 快取命中: %s", topic)
        return content

    def _write_cache(self, topic: str, content: str, model: Optional[str] = None) -> None:
        """Store *content* as an ``mcp_cache`` insight for *topic* (best effort).

        Args:
            topic: Topic key saved in ``metadata_json["topic"]``; this is what
                ``_read_cache`` filters on.
            content: Text to cache; truncated to 4000 characters.
            model: Source label saved in ``metadata_json["model"]``. Defaults
                to ``MCP_MODEL``, the label used before this parameter existed.

        On success the new row id is handed to ``enqueue_insight_embedding``.
        Insert and enqueue failures are logged and not raised.
        """
        stored = content[:4000]
        meta = {
            "topic": topic,
            "model": model or MCP_MODEL,
            "cached_at": datetime.now().isoformat(),
        }
        session = get_session()
        try:
            row = session.execute(text("""
                INSERT INTO ai_insights
                    (insight_type, content, confidence, created_by, status, metadata_json)
                VALUES ('mcp_cache', :content, 0.9, 'mcp_collector', 'active', :meta)
                RETURNING id
            """), {
                "content": stored,
                "meta": json.dumps(meta),
            }).fetchone()
            session.commit()
        except Exception as e:
            logger.warning("[MCP] 快取寫入失敗: %s", e)
            session.rollback()
            return
        finally:
            session.close()
        if row:
            try:
                from services.openclaw_learning_service import enqueue_insight_embedding
                enqueue_insight_embedding(row[0], "mcp_cache", stored)
            except Exception as embed_err:
                logger.warning("[MCP] embedding queue enqueue failed: %s", embed_err)

    # ── Single-topic search ────────────────────────────────────────────────
    def _search_topic(self, topic: str, query: str) -> str:
        """Return intelligence text for *topic*, trying each source in turn.

        Order: fresh cache → MCP omnisearch (tavily, then exa) → Ollama
        predictive summary → Gemini grounding (only when enabled) → static
        local fallback. Always returns a non-empty string.
        """
        cached = self._read_cache(topic)
        if cached:
            return cached
        router_content = self._search_via_mcp_router(topic, query)
        if router_content:
            return router_content
        # Offline Ollama insight before Gemini; Gemini is the last resort only.
        ollama_content = self._ollama_topic_fallback(topic, query)
        if ollama_content:
            return ollama_content
        if not self._ensure_init():
            reason = (
                getattr(self, "_init_failure_reason", None)
                or "GEMINI_API_KEY 未設定,使用本地行銷情報。"
            )
            return self._fallback_topic_content(topic, reason)
        return self._search_via_gemini(topic, query)

    def _search_via_mcp_router(self, topic: str, query: str) -> Optional[str]:
        """Try MCP omnisearch and cache/return the first reliable hit.

        Phase 10.5 (2026-05-04) L0 path: when ``MCP_ROUTER_ENABLED`` is on and
        the MCP stack is deployed, self-hosted Tavily (then Exa as backup)
        replaces Gemini grounding as the primary live source. Returns None when
        the router is disabled or unavailable, raises, or neither tool yields
        reliable content.
        """
        try:
            from services.mcp_router import mcp_router, is_mcp_router_enabled
            if not is_mcp_router_enabled():
                return None
            attempts = (
                (
                    "tavily_search",
                    {"query": query, "max_results": 5},
                    self._format_tavily_results,
                    "[MCP] omnisearch tavily 命中 topic=%s 取代 Gemini Grounding",
                ),
                (
                    "exa_search",
                    {"query": query, "num_results": 5},
                    self._format_exa_results,
                    "[MCP] omnisearch exa 命中 topic=%s (tavily 失敗備援)",
                ),
            )
            for tool, args, formatter, hit_message in attempts:
                call_result = mcp_router.call(
                    server="omnisearch",
                    tool=tool,
                    args=args,
                    caller="mcp_collector",
                )
                content = formatter(self._router_results(call_result))
                if content and not self._looks_unreliable(content):
                    self._write_cache(topic, content, model=f"omnisearch/{tool}")
                    logger.info(hit_message, topic)
                    return content
            logger.info("[MCP] omnisearch 全失敗, fallback Ollama static insight")
        except Exception as router_err:
            # Expected before the MCP stack is deployed (import or call failure).
            logger.debug("[MCP] mcp_router 不可用 (預期 deploy 前): %s", router_err)
        return None

    @staticmethod
    def _router_results(call_result: Any) -> List[Dict[str, Any]]:
        """Return up to 5 dict entries from ``call_result.data['results']``.

        A failed call, missing/non-dict ``data``, non-list ``results`` or
        non-dict entries all yield ``[]`` instead of raising.
        """
        if not getattr(call_result, "success", False):
            return []
        data = getattr(call_result, "data", None)
        if not isinstance(data, dict):
            return []
        results = data.get("results")
        if not isinstance(results, list):
            return []
        return [item for item in results[:5] if isinstance(item, dict)]

    @staticmethod
    def _format_tavily_results(results: List[Dict[str, Any]]) -> str:
        """Join tavily hits as "title: snippet" paragraphs; both parts required."""
        lines = []
        for item in results:
            title = str(item.get("title") or "").strip()
            snippet = str(item.get("content") or item.get("text") or "").strip()[:300]
            if title and snippet:
                lines.append(f"{title}: {snippet}")
        return "\n\n".join(lines)

    @staticmethod
    def _format_exa_results(results: List[Dict[str, Any]]) -> str:
        """Join exa hits that have a title as "title: text" paragraphs."""
        return "\n\n".join(
            f"{item['title']}: {str(item.get('text') or '')[:300]}"
            for item in results
            if item.get("title")
        )

    def _search_via_gemini(self, topic: str, query: str) -> str:
        """Last-resort Gemini lookup; requires ``_ensure_init()`` to be True.

        Tries ``MCP_MODEL`` (see ``_generate_grounded``). If that raises, makes
        one grounded attempt with ``MCP_FALLBACK_MODEL``. Reliable non-empty
        answers are cached; anything else degrades to the local fallback text
        with a reason line.
        """
        prompt = f"請用繁體中文整理以下主題的最新資訊提供具體數據與洞察500字以內\n{query}"
        try:
            response = self._generate_grounded(prompt)
            content = response.text or ""
        except Exception as primary_err:
            logger.warning(
                "[MCP] Gemini %s grounding failed topic=%s: %s, trying %s",
                MCP_MODEL, topic, primary_err, MCP_FALLBACK_MODEL,
            )
            backup = self._generate_with_fallback_model(prompt)
            if backup and not self._looks_unreliable(backup):
                self._write_cache(topic, backup, model=MCP_FALLBACK_MODEL)
                return backup
            return self._fallback_topic_content(
                topic, f"即時外部搜尋暫不可用:{type(primary_err).__name__}"
            )
        if self._looks_unreliable(content):
            return self._fallback_topic_content(topic, "即時搜尋內容含占位數字或待更新文字,已改用本地行銷情報。")
        if not content:
            return self._fallback_topic_content(topic, "Gemini 回傳空內容,使用本地行銷情報。")
        self._write_cache(topic, content, model=MCP_MODEL)
        return content

    def _generate_grounded(self, prompt: str) -> Any:
        """Call ``MCP_MODEL``, preferring whichever search tool the SDK accepts.

        Tries ``tools=["google_search"]``, then ``["google_search_retrieval"]``,
        then no tools at all (an ungrounded answer). Raises the last error
        seen, or RuntimeError, when no attempt produced a response.
        """
        last_error: Optional[Exception] = None
        for tools in (["google_search"], ["google_search_retrieval"], None):
            kwargs: Dict[str, Any] = {"model_name": MCP_MODEL}
            if tools:
                kwargs["tools"] = tools
            try:
                response = self._genai.GenerativeModel(**kwargs).generate_content(prompt)
            except Exception as tool_err:
                last_error = tool_err
                continue
            if response is not None:
                return response
            break
        raise last_error or RuntimeError("Gemini response empty")

    def _generate_with_fallback_model(self, prompt: str) -> str:
        """One grounded attempt with ``MCP_FALLBACK_MODEL``; "" on any failure."""
        try:
            model = self._genai.GenerativeModel(model_name=MCP_FALLBACK_MODEL, tools=["google_search"])
            return model.generate_content(prompt).text or ""
        except Exception as e2:
            logger.warning("[MCP] Gemini %s also failed: %s", MCP_FALLBACK_MODEL, e2)
            return ""

    def _ollama_topic_fallback(self, topic: str, query: str) -> Optional[str]:
        """Ask the GCP Ollama hosts for a predictive (non-live) summary of *query*.

        Used when MCP search is unavailable. ``allow_111_fallback=False`` keeps
        this long task off the 111 host. The result is deliberately NOT cached
        because it is a prediction, not live intelligence. Returns None when
        Ollama is not importable, the call fails, or the text is empty or
        unreliable.
        """
        if not _OLLAMA_AVAILABLE:
            return None
        try:
            logger.info("[MCP] Using GCP-A/GCP-B Ollama for market insight fallback topic=%s", topic)
            ollama_model = os.getenv('OPENCLAW_OLLAMA_MODEL', 'qwen2.5-coder:7b')
            ollama_prompt = (
                "你是一位精通台灣電商市場的分析師。目前無法取得即時搜尋結果,"
                "請根據你的知識儲備,針對以下主題提供 2026 年可能的市場動態或洞察繁體中文300字以內\n"
                f"主題:{query}\n\n"
                "請註明:『(此為基於歷史趨勢的預測性洞察)』"
            )
            resp = OllamaService(model=ollama_model).generate(
                prompt=ollama_prompt,
                model=ollama_model,
                temperature=0.4,
                timeout=MCP_OLLAMA_TIMEOUT,
                options={'num_predict': MCP_OLLAMA_NUM_PREDICT},
                allow_111_fallback=False,
            )
            if not resp.success:
                logger.warning("[MCP] GCP Ollama fallback failed: %s", resp.error)
                return None
            content = (resp.content or '').strip()
            if not content or self._looks_unreliable(content):
                logger.info("[MCP] Ollama fallback content empty or unreliable topic=%s", topic)
                return None
            return content
        except Exception as exc:
            logger.warning("[MCP] Ollama fallback failed: %s", exc)
            return None

    @staticmethod
    def _looks_unreliable(content: str) -> bool:
        """Return True if *content* contains a placeholder/template marker.

        Empty or None content is not "unreliable"; callers check emptiness
        separately.
        """
        if not content:
            return False
        return any(marker in content for marker in MCPCollectorService._UNRELIABLE_MARKERS)

    def _fallback_topic_content(self, topic: str, reason: str = "") -> str:
        """Build stable local text for *topic* when external search fails.

        Keeps the Telegram button from looking blank or broken. Unknown topics
        get the holiday and seasonal context. A non-empty *reason* is appended
        as a final "資料狀態" line.
        """
        holiday = self.get_holiday_context()
        seasonal = self.get_seasonal_context()
        fallback_map = {
            "market_trends": [
                "台灣電商營運觀察:美妝保養、保健食品、母嬰與個人清潔仍適合用週期性促銷與組合包拉升轉換。",
                "建議優先檢查近期高業績品類、毛利率與庫存週轉,將活動資源集中在高轉換商品。",
            ],
            "competitor_intel": [
                "競品情報 fallback請優先比較 momo / PChome / 蝦皮同款商品的售價、庫存、到貨速度與組合優惠。",
                "若 PChome 價格優勢明顯,可強化文案中的即時到貨、價格透明與組合折扣。",
            ],
            "consumer_sentiment": [
                "消費者聲量 fallback高 CP 值、到貨速度、真實評價與成分/規格透明度通常會影響購買意願。",
                "建議把負評來源拆成價格、物流、規格不符、售後服務四類追蹤。",
            ],
            "pricing_strategy": [
                "定價策略 fallback先鎖定高流量高轉換商品做競品價差監控再用加價購、滿額折與組合包保護毛利。",
            ],
            "holiday_calendar": [holiday],
            "seasonal_insights": [seasonal],
        }
        lines = list(fallback_map.get(topic, [holiday, seasonal]))
        if reason:
            lines.append(f"資料狀態:{reason}")
        return "\n".join(line for line in lines if line)

    # ── Public interface ───────────────────────────────────────────────────
    def collect_all(self) -> Dict[str, str]:
        """Collect every topic in ``_SEARCH_TOPICS`` as ``{topic: content}``.

        Each topic fails independently: an exception is logged and recorded as
        an empty string without stopping the remaining topics.
        """
        results: Dict[str, str] = {}
        for topic, query in _SEARCH_TOPICS.items():
            try:
                results[topic] = self._search_topic(topic, query)
                time.sleep(0.5)  # spacing between topics to avoid Gemini rate limits
            except Exception as e:
                logger.error("[MCP] topic=%s 收集失敗: %s", topic, e)
                results[topic] = ""
        logger.info("[MCP] 收集完成,有效主題=%d/%d", sum(1 for v in results.values() if v), len(results))
        return results

    def collect_topic(self, topic: str) -> str:
        """Collect one topic; an unknown key is used verbatim as the query."""
        query = _SEARCH_TOPICS.get(topic, topic)
        return self._search_topic(topic, query)

    def get_holiday_context(self, now: Optional[datetime] = None) -> str:
        """Build the holiday-calendar snippet injected into prompts.

        Args:
            now: Reference time; defaults to ``datetime.now()``.

        Returns:
            Three lines: a date header, the current focus, and a next-month
            preview. After the 20th the focus line already switches to next
            month (avoiding e.g. "late April but still Qingming" prompts).
        """
        if now is None:
            now = datetime.now()
        date_label = now.strftime('%Y/%m/%d')
        current_focus = self._HOLIDAY_CALENDAR.get(now.month, "")
        next_focus = self._HOLIDAY_CALENDAR.get(now.month % 12 + 1, "")
        if now.day > 20:
            header = f"當前日期:{date_label} (月底轉場期)"
            body = f"本月即將結束,目前重點已轉向:{next_focus}"
            footer = f"下月詳細預告:{next_focus}"
        else:
            header = f"當前日期:{date_label}"
            body = f"本月電商重點:{current_focus}"
            footer = f"下月預告:{next_focus}"
        return f"{header}\n{body}\n{footer}"

    def get_seasonal_context(self, now: Optional[datetime] = None) -> str:
        """Return the seasonal consumption theme for *now*'s month (default: today)."""
        month = (now or datetime.now()).month
        for months, desc in self._SEASONS:
            if month in months:
                return desc
        return ""
# Module-level singleton: every importer of this module shares this instance.
mcp_collector: MCPCollectorService = MCPCollectorService()