83 KiB
83 KiB
趨勢資料庫系統設計文件
文件資訊
- 建立日期: 2026-01-22
- 目標: 自動爬取社群趨勢資料,儲存至資料庫,供 AI 文案生成參考
- Ollama 伺服器: 192.168.0.188 (內部網路)
- 新增功能: Ollama Web Search 整合、Telegram Bot 整合
一、系統架構概覽
┌─────────────────────────────────────────────────────────────────────────────┐
│ 趨勢資料庫系統架構 (含 Web Search & Telegram) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 資料來源層 │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌───────────────┐ │ │
│ │ │Google News │ │ PTT │ │ Dcard │ │ Ollama Web │ │ │
│ │ │ RSS Feed │ │ 熱門看板 │ │ 熱門文章 │ │ Search │ │ │
│ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └───────┬───────┘ │ │
│ │ │ │ │ │ │ │
│ │ └───────────────┴───────────────┴─────────────────┘ │ │
│ └────────────────────────────────┬────────────────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Trend Crawler Service │ │
│ │ + Ollama Web Search Service │ │
│ └──────────────┬───────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌──────────────────────┐ ┌────────────────┐ │
│ │ 原始資料 │ │ Ollama AI 分析 │ │ 關鍵字萃取 │ │
│ │ 儲存 │ │ (192.168.0.188) │ │ TF-IDF │ │
│ └─────┬──────┘ │ - 生成分析 │ └───────┬────────┘ │
│ │ │ - Web Search │ │ │
│ │ │ - 情緒分析 │ │ │
│ │ └──────────┬───────────┘ │ │
│ │ │ │ │
│ └──────────────────────┼────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ SQLite Database │ │
│ │ ┌────────────────────────┐ │ │
│ │ │ trend_records │ │ │
│ │ │ trend_keywords │ │ │
│ │ │ trend_analysis │ │ │
│ │ │ web_search_cache │ │ ← 新增:Web Search 快取 │
│ │ └────────────────────────┘ │ │
│ └──────────────┬───────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Web API │ │ Telegram Bot │ │ AI 推薦頁面 │ │
│ │ Endpoints │ │ Service │ │ Frontend │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ Telegram Bot 互動流程 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ User Telegram Bot Backend │
│ │ │ │ │
│ │ /trend 美妝 │ │ │
│ │─────────────────────────>│ │ │
│ │ │ GET /api/trends/keywords │ │
│ │ │────────────────────────────>│ │
│ │ │<────────────────────────────│ │
│ │ 📊 美妝熱門關鍵字: │ │ │
│ │ 1. 保濕 (58次) │ │ │
│ │ 2. 防曬 (45次) │ │ │
│ │<─────────────────────────│ │ │
│ │ │ │ │
│ │ /search 夏季防曬推薦 │ │ │
│ │─────────────────────────>│ │ │
│ │ │ POST /api/ai/web_search │ │
│ │ │────────────────────────────>│ │
│ │ │ (Ollama Web Search) │ │
│ │ │<────────────────────────────│ │
│ │ 🔍 搜尋結果: │ │ │
│ │ - 2026防曬評比... │ │ │
│ │ - PTT推薦清單... │ │ │
│ │<─────────────────────────│ │ │
│ │ │ │ │
│ │ /copy 防曬乳 │ │ │
│ │─────────────────────────>│ │ │
│ │ │ POST /api/ai/generate │ │
│ │ │────────────────────────────>│ │
│ │ │<────────────────────────────│ │
│ │ ✨ AI 生成文案: │ │ │
│ │ 「夏日必備防曬神器...」 │ │ │
│ │<─────────────────────────│ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
1.5 Ollama Web Search 整合架構
Web Search 工作流程
┌─────────────────────────────────────────────────────────────────┐
│ Ollama Web Search 流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 使用者輸入查詢 │
│ ↓ │
│ 2. 系統呼叫 Ollama Web Search API │
│ POST http://192.168.0.188:11434/api/chat │
│ { │
│ "model": "gemma3:4b", │
│ "messages": [...], │
│ "tools": [{"type": "web_search", ...}] ← 啟用 Web Search │
│ } │
│ ↓ │
│ 3. Ollama 執行網路搜尋並分析結果 │
│ ↓ │
│ 4. 回傳結構化結果 (JSON) │
│ - 搜尋結果摘要 │
│ - 來源連結 │
│ - AI 分析洞察 │
│ ↓ │
│ 5. 儲存至 web_search_cache 表 (可選) │
│ ↓ │
│ 6. 回傳給前端/Telegram │
│ │
└─────────────────────────────────────────────────────────────────┘
已實作的 Web Search API
系統中 services/ollama_service.py 已包含以下 Web Search 方法:
| 方法 | 說明 |
|---|---|
web_search(query, search_type) |
通用網路搜尋 |
search_product_insights(product_name) |
商品市場洞察 |
search_trend_keywords(category) |
趨勢關鍵字搜尋 |
整合至趨勢爬取
Web Search 可作為額外的趨勢資料來源,補充傳統爬蟲無法取得的即時資訊:
# 在 TrendCrawlerService 中整合 Web Search
def crawl_with_web_search(self, query: str, category: str = None):
"""使用 Ollama Web Search 爬取趨勢資料"""
# 呼叫 Web Search
response = self.ollama.web_search(
query=query,
search_type='trends',
num_results=10
)
if response.success:
# 解析結果並儲存
results = json.loads(response.content)
for item in results.get('results', []):
record = TrendRecord(
source='ollama_web_search',
source_url=item.get('url'),
title=item.get('title'),
content=item.get('snippet'),
category=category,
trend_date=date.today(),
ai_summary=results.get('summary')
)
session.add(record)
二、資料來源規劃
2.1 支援的資料來源
| 來源 | 類型 | 爬取方式 | 更新頻率 | 說明 |
|---|---|---|---|---|
| Google News | 新聞 | RSS Feed | 每小時 | 台灣新聞、科技、生活類別 |
| PTT | 社群 | Web Scraping | 每 2 小時 | Gossiping, Lifeismoney, e-shopping |
| Dcard | 社群 | Web Scraping | 每 2 小時 | 熱門看板、電商相關 |
| YouTube Trending | 影音 | YouTube API | 每 4 小時 | 台灣熱門影片 |
| 氣象資料 | 天氣 | 中央氣象署 API | 每 6 小時 | 搭配季節性行銷 |
| Ollama Web Search | AI 搜尋 | Ollama API | 即時/排程 | AI 智慧網路搜尋,補充即時資訊 |
2.2 PTT 目標看板
PTT_BOARDS = [
'Gossiping', # 八卦板 - 熱門話題
'Lifeismoney', # 省錢板 - 優惠情報
'e-shopping', # 網購板 - 電商趨勢
'Beauty', # 美妝板 - 美妝趨勢
'MakeUp', # 化妝板 - 彩妝趨勢
'WomenTalk', # 女板 - 女性消費趨勢
'home-sale', # 房屋板 - 居家用品參考
'BabyMother', # 媽寶板 - 母嬰市場
'Tech_Job', # 科技業 - 3C 消費力
]
2.3 Dcard 目標看板
DCARD_BOARDS = [
'網路購物', # 電商討論
'美妝', # 美妝趨勢
'穿搭', # 服飾趨勢
'3C', # 科技產品
'省錢', # 優惠情報
'生活', # 生活趨勢
'美食', # 餐飲趨勢
]
三、資料庫設計
3.1 trend_records (趨勢記錄表)
class TrendRecord(Base):
"""趨勢資料記錄 - 儲存爬取的原始內容"""
__tablename__ = 'trend_records'
id = Column(Integer, primary_key=True)
# 來源識別
source = Column(String(50), nullable=False, index=True)
# 可選值: 'google_news', 'ptt', 'dcard', 'youtube', 'weather'
source_board = Column(String(100))
# PTT/Dcard 看板名稱,如 'Gossiping', '網路購物'
source_url = Column(String(500))
# 原始連結
source_id = Column(String(100))
# 來源平台的唯一識別碼 (用於去重)
# 內容
title = Column(String(500), nullable=False)
content = Column(Text)
# 全文內容或摘要
author = Column(String(100))
# 作者/媒體名稱
# 互動指標
popularity_score = Column(Integer, default=0)
# 熱門度分數 (推數、讚數、觀看數等)
comment_count = Column(Integer, default=0)
# 留言數
# 分類標籤
category = Column(String(100), index=True)
# 商品分類對應: '美妝', '3C', '家電', '服飾' 等
tags = Column(Text)
# JSON 格式的標籤列表
# 時間資訊
published_at = Column(DateTime)
# 原始發布時間
trend_date = Column(Date, nullable=False, index=True)
# 趨勢所屬日期 (用於聚合查詢)
created_at = Column(DateTime, default=datetime.now)
# 爬取時間
# AI 分析結果
sentiment = Column(String(20))
# 情緒分析: 'positive', 'negative', 'neutral'
ai_summary = Column(Text)
# Ollama 生成的摘要
relevance_score = Column(Float, default=0.0)
# 與商品銷售的相關性分數 (0-1)
# 索引優化
__table_args__ = (
Index('idx_trend_source_date', 'source', 'trend_date'),
Index('idx_trend_category_date', 'category', 'trend_date'),
Index('idx_trend_popularity', 'popularity_score', 'trend_date'),
UniqueConstraint('source', 'source_id', name='uq_source_record'),
)
3.2 trend_keywords (趨勢關鍵字表)
class TrendKeyword(Base):
"""趨勢關鍵字 - 從文章中萃取的熱門詞彙"""
__tablename__ = 'trend_keywords'
id = Column(Integer, primary_key=True)
keyword = Column(String(100), nullable=False, index=True)
# 關鍵字
keyword_type = Column(String(50), default='general')
# 類型: 'product' (商品), 'brand' (品牌), 'event' (事件), 'general'
source = Column(String(50), nullable=False)
# 來源平台
category = Column(String(100), index=True)
# 商品分類
mention_count = Column(Integer, default=1)
# 提及次數
trend_date = Column(Date, nullable=False, index=True)
# 趨勢日期
sentiment_avg = Column(Float, default=0.0)
# 平均情緒分數 (-1 到 1)
related_keywords = Column(Text)
# JSON 格式的相關關鍵字
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
__table_args__ = (
Index('idx_keyword_date_count', 'trend_date', 'mention_count'),
UniqueConstraint('keyword', 'source', 'trend_date', name='uq_keyword_source_date'),
)
3.3 trend_analysis (趨勢分析報告表)
class TrendAnalysis(Base):
"""趨勢分析報告 - Ollama AI 生成的分析結果"""
__tablename__ = 'trend_analysis'
id = Column(Integer, primary_key=True)
analysis_date = Column(Date, nullable=False, index=True)
# 分析日期
category = Column(String(100), index=True)
# 分析的商品分類 (null 表示全品類)
analysis_type = Column(String(50), nullable=False)
# 分析類型: 'daily_summary', 'weekly_trend', 'hot_topic', 'marketing_insight'
# AI 分析內容
summary = Column(Text, nullable=False)
# 摘要說明
hot_keywords = Column(Text)
# JSON: 熱門關鍵字列表
hot_topics = Column(Text)
# JSON: 熱門話題列表
consumer_insights = Column(Text)
# JSON: 消費者洞察
marketing_suggestions = Column(Text)
# JSON: 行銷建議
copywriting_hints = Column(Text)
# JSON: 文案撰寫提示
# 來源統計
source_stats = Column(Text)
# JSON: 各來源資料統計
record_count = Column(Integer, default=0)
# 分析涵蓋的記錄數
# Ollama 資訊
model_used = Column(String(50))
# 使用的模型
generation_time = Column(Float)
# 生成耗時 (秒)
created_at = Column(DateTime, default=datetime.now)
__table_args__ = (
UniqueConstraint('analysis_date', 'category', 'analysis_type', name='uq_analysis'),
)
四、服務層設計
4.1 更新 trend_crawler.py
在現有 services/trend_crawler.py 基礎上新增資料庫儲存功能:
# services/trend_crawler.py 擴展
from database.trend_models import TrendRecord, TrendKeyword, TrendAnalysis
from database.manager import get_session
from services.ollama_service import OllamaService
class TrendCrawlerService:
"""趨勢爬蟲服務 - 含資料庫儲存"""
def __init__(self):
self.crawler = TrendCrawler()
self.ollama = OllamaService()
self.logger = SystemLogger("TrendCrawlerService")
def crawl_and_save_all(self,
sources: List[str] = None,
categories: List[str] = None,
analyze: bool = True) -> dict:
"""
爬取所有來源並儲存到資料庫
Args:
sources: 指定來源 ['google_news', 'ptt', 'dcard', 'youtube']
categories: 指定分類
analyze: 是否進行 AI 分析
Returns:
dict: 爬取結果統計
"""
results = {
'total_records': 0,
'new_records': 0,
'keywords_extracted': 0,
'sources': {}
}
session = get_session()
try:
# 1. 爬取各來源資料
trend_data = self.crawler.get_all_trends(
categories=categories,
include_social=True
)
# 2. 儲存新聞資料
if not sources or 'google_news' in sources:
news_result = self._save_news_records(session, trend_data.news_items)
results['sources']['google_news'] = news_result
results['total_records'] += news_result['total']
results['new_records'] += news_result['new']
# 3. 儲存社群資料 (PTT/Dcard)
if not sources or 'ptt' in sources or 'dcard' in sources:
social_result = self._save_social_records(session, trend_data.social_posts)
results['sources']['social'] = social_result
results['total_records'] += social_result['total']
results['new_records'] += social_result['new']
# 4. 儲存 YouTube 資料
if not sources or 'youtube' in sources:
youtube_result = self._save_youtube_records(session, trend_data.youtube_videos)
results['sources']['youtube'] = youtube_result
results['total_records'] += youtube_result['total']
results['new_records'] += youtube_result['new']
# 5. 萃取關鍵字
keywords_count = self._extract_and_save_keywords(session)
results['keywords_extracted'] = keywords_count
session.commit()
# 6. AI 分析 (commit 後執行)
if analyze and results['new_records'] > 0:
self._generate_ai_analysis(session)
self.logger.info(f"趨勢爬取完成: {results}")
return results
except Exception as e:
session.rollback()
self.logger.error(f"趨勢爬取失敗: {e}")
raise
finally:
session.close()
def _save_news_records(self, session, news_items: List[NewsItem]) -> dict:
"""儲存新聞記錄"""
total = len(news_items)
new_count = 0
for news in news_items:
# 檢查是否已存在
exists = session.query(TrendRecord).filter(
TrendRecord.source == 'google_news',
TrendRecord.source_url == news.link
).first()
if not exists:
record = TrendRecord(
source='google_news',
source_url=news.link,
source_id=hashlib.md5(news.link.encode()).hexdigest(),
title=news.title,
content=news.summary,
author=news.source,
category=news.category,
published_at=news.published,
trend_date=news.published.date() if news.published else date.today(),
)
session.add(record)
new_count += 1
return {'total': total, 'new': new_count}
def _save_social_records(self, session, social_posts: List[SocialPost]) -> dict:
"""儲存社群記錄 (PTT/Dcard)"""
total = len(social_posts)
new_count = 0
for post in social_posts:
exists = session.query(TrendRecord).filter(
TrendRecord.source == post.platform.lower(),
TrendRecord.source_id == post.post_id
).first()
if not exists:
record = TrendRecord(
source=post.platform.lower(),
source_board=post.board,
source_url=post.url,
source_id=post.post_id,
title=post.title,
content=post.content,
author=post.author,
popularity_score=post.push_count or post.like_count or 0,
comment_count=post.comment_count or 0,
category=self._map_board_to_category(post.board),
published_at=post.created_at,
trend_date=post.created_at.date() if post.created_at else date.today(),
)
session.add(record)
new_count += 1
return {'total': total, 'new': new_count}
def _map_board_to_category(self, board: str) -> str:
"""將看板名稱對應到商品分類"""
mapping = {
# PTT
'Beauty': '美妝',
'MakeUp': '美妝',
'e-shopping': '電商',
'Lifeismoney': '優惠',
'home-sale': '居家',
'BabyMother': '母嬰',
# Dcard
'美妝': '美妝',
'穿搭': '服飾',
'3C': '3C',
'網路購物': '電商',
'省錢': '優惠',
}
return mapping.get(board, '其他')
def _extract_and_save_keywords(self, session) -> int:
"""從今日記錄中萃取關鍵字"""
today = date.today()
# 取得今日所有記錄
records = session.query(TrendRecord).filter(
TrendRecord.trend_date == today
).all()
# 使用 jieba 分詞萃取關鍵字
keyword_counts = defaultdict(lambda: {'count': 0, 'sources': set()})
for record in records:
text = f"{record.title} {record.content or ''}"
keywords = self._extract_keywords_jieba(text)
for kw in keywords:
keyword_counts[kw]['count'] += 1
keyword_counts[kw]['sources'].add(record.source)
# 儲存關鍵字 (只保留出現 2 次以上的)
saved_count = 0
for keyword, data in keyword_counts.items():
if data['count'] >= 2:
for source in data['sources']:
existing = session.query(TrendKeyword).filter(
TrendKeyword.keyword == keyword,
TrendKeyword.source == source,
TrendKeyword.trend_date == today
).first()
if existing:
existing.mention_count = data['count']
else:
kw_record = TrendKeyword(
keyword=keyword,
source=source,
mention_count=data['count'],
trend_date=today
)
session.add(kw_record)
saved_count += 1
return saved_count
def _generate_ai_analysis(self, session) -> None:
"""使用 Ollama 生成趨勢分析"""
today = date.today()
# 取得今日資料摘要
records = session.query(TrendRecord).filter(
TrendRecord.trend_date == today
).order_by(TrendRecord.popularity_score.desc()).limit(50).all()
if not records:
return
# 準備分析內容
content_for_analysis = "\n".join([
f"- [{r.source}] {r.title} (熱度:{r.popularity_score})"
for r in records
])
# 呼叫 Ollama 分析
prompt = f"""
請分析以下今日趨勢資料,提供行銷洞察:
{content_for_analysis}
請以 JSON 格式回覆,包含:
1. summary: 整體趨勢摘要 (100字內)
2. hot_keywords: 熱門關鍵字列表 (最多10個)
3. hot_topics: 熱門話題列表 (最多5個)
4. consumer_insights: 消費者洞察 (3-5點)
5. marketing_suggestions: 行銷建議 (3-5點)
6. copywriting_hints: 文案撰寫提示 (3-5個)
"""
response = self.ollama.generate(prompt)
if response.success:
try:
analysis_data = json.loads(response.content)
except json.JSONDecodeError:
analysis_data = {'summary': response.content}
analysis = TrendAnalysis(
analysis_date=today,
category=None, # 全品類
analysis_type='daily_summary',
summary=analysis_data.get('summary', ''),
hot_keywords=json.dumps(analysis_data.get('hot_keywords', []), ensure_ascii=False),
hot_topics=json.dumps(analysis_data.get('hot_topics', []), ensure_ascii=False),
consumer_insights=json.dumps(analysis_data.get('consumer_insights', []), ensure_ascii=False),
marketing_suggestions=json.dumps(analysis_data.get('marketing_suggestions', []), ensure_ascii=False),
copywriting_hints=json.dumps(analysis_data.get('copywriting_hints', []), ensure_ascii=False),
record_count=len(records),
model_used=response.model,
generation_time=response.duration
)
session.add(analysis)
session.commit()
4.2 Ollama 整合 (192.168.0.188)
確認 services/ollama_service.py 設定:
# config.py 新增
OLLAMA_CONFIG = {
'base_url': 'http://192.168.0.188:11434', # 內部 Ollama 伺服器
'model': 'gemma3:4b',
'timeout': 120,
'api_key': '<OLLAMA_API_KEY>'
}
五、API 端點設計
5.1 新增 routes/trend_routes.py
from flask import Blueprint, request, jsonify
from database.trend_models import TrendRecord, TrendKeyword, TrendAnalysis
from database.manager import get_session
from services.trend_crawler_service import TrendCrawlerService
from auth import login_required, permission_required
from datetime import date, timedelta
trend_bp = Blueprint('trend', __name__)
@trend_bp.route('/api/trends/records', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_records():
"""
取得趨勢記錄
Query params:
source: 來源篩選 (google_news, ptt, dcard, youtube)
category: 分類篩選
date_from: 起始日期 (YYYY-MM-DD)
date_to: 結束日期 (YYYY-MM-DD)
limit: 筆數限制 (預設 50)
offset: 分頁偏移
"""
source = request.args.get('source')
category = request.args.get('category')
date_from = request.args.get('date_from', (date.today() - timedelta(days=7)).isoformat())
date_to = request.args.get('date_to', date.today().isoformat())
limit = min(int(request.args.get('limit', 50)), 200)
offset = int(request.args.get('offset', 0))
session = get_session()
try:
query = session.query(TrendRecord).filter(
TrendRecord.trend_date >= date_from,
TrendRecord.trend_date <= date_to
)
if source:
query = query.filter(TrendRecord.source == source)
if category:
query = query.filter(TrendRecord.category == category)
total = query.count()
records = query.order_by(
TrendRecord.popularity_score.desc(),
TrendRecord.created_at.desc()
).offset(offset).limit(limit).all()
return jsonify({
'success': True,
'total': total,
'data': [
{
'id': r.id,
'source': r.source,
'source_board': r.source_board,
'title': r.title,
'content': r.content[:200] if r.content else None,
'author': r.author,
'popularity_score': r.popularity_score,
'category': r.category,
'published_at': r.published_at.isoformat() if r.published_at else None,
'source_url': r.source_url,
'sentiment': r.sentiment,
}
for r in records
]
})
finally:
session.close()
@trend_bp.route('/api/trends/keywords', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_keywords():
"""
取得熱門關鍵字
Query params:
source: 來源篩選
category: 分類篩選
days: 天數範圍 (預設 7)
limit: 筆數限制 (預設 30)
"""
source = request.args.get('source')
category = request.args.get('category')
days = int(request.args.get('days', 7))
limit = min(int(request.args.get('limit', 30)), 100)
date_from = date.today() - timedelta(days=days)
session = get_session()
try:
query = session.query(
TrendKeyword.keyword,
func.sum(TrendKeyword.mention_count).label('total_mentions'),
func.count(TrendKeyword.source.distinct()).label('source_count')
).filter(
TrendKeyword.trend_date >= date_from
).group_by(TrendKeyword.keyword)
if source:
query = query.filter(TrendKeyword.source == source)
if category:
query = query.filter(TrendKeyword.category == category)
keywords = query.order_by(
text('total_mentions DESC')
).limit(limit).all()
return jsonify({
'success': True,
'date_range': {
'from': date_from.isoformat(),
'to': date.today().isoformat()
},
'data': [
{
'keyword': kw.keyword,
'total_mentions': kw.total_mentions,
'source_count': kw.source_count
}
for kw in keywords
]
})
finally:
session.close()
@trend_bp.route('/api/trends/analysis', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_analysis():
"""
取得 AI 趨勢分析報告
Query params:
date: 分析日期 (預設今日)
category: 分類篩選
type: 分析類型 (daily_summary, weekly_trend, hot_topic)
"""
analysis_date = request.args.get('date', date.today().isoformat())
category = request.args.get('category')
analysis_type = request.args.get('type', 'daily_summary')
session = get_session()
try:
query = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == analysis_date,
TrendAnalysis.analysis_type == analysis_type
)
if category:
query = query.filter(TrendAnalysis.category == category)
else:
query = query.filter(TrendAnalysis.category.is_(None))
analysis = query.first()
if not analysis:
return jsonify({
'success': False,
'message': '找不到分析報告'
}), 404
return jsonify({
'success': True,
'data': {
'analysis_date': analysis.analysis_date.isoformat(),
'category': analysis.category,
'analysis_type': analysis.analysis_type,
'summary': analysis.summary,
'hot_keywords': json.loads(analysis.hot_keywords or '[]'),
'hot_topics': json.loads(analysis.hot_topics or '[]'),
'consumer_insights': json.loads(analysis.consumer_insights or '[]'),
'marketing_suggestions': json.loads(analysis.marketing_suggestions or '[]'),
'copywriting_hints': json.loads(analysis.copywriting_hints or '[]'),
'record_count': analysis.record_count,
'model_used': analysis.model_used,
'created_at': analysis.created_at.isoformat()
}
})
finally:
session.close()
@trend_bp.route('/api/trends/crawl', methods=['POST'])
@login_required
@permission_required('system.crawler.manage')
def trigger_trend_crawl():
"""手動觸發趨勢爬取"""
data = request.get_json() or {}
sources = data.get('sources') # 可選,指定來源
categories = data.get('categories') # 可選,指定分類
analyze = data.get('analyze', True) # 是否進行 AI 分析
try:
service = TrendCrawlerService()
result = service.crawl_and_save_all(
sources=sources,
categories=categories,
analyze=analyze
)
return jsonify({
'success': True,
'message': f"爬取完成: 新增 {result['new_records']} 筆記錄",
'result': result
})
except Exception as e:
return jsonify({
'success': False,
'message': f"爬取失敗: {str(e)}"
}), 500
@trend_bp.route('/api/trends/stats', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_stats():
"""取得趨勢資料統計"""
days = int(request.args.get('days', 7))
date_from = date.today() - timedelta(days=days)
session = get_session()
try:
# 各來源統計
source_stats = session.query(
TrendRecord.source,
func.count(TrendRecord.id).label('count'),
func.avg(TrendRecord.popularity_score).label('avg_popularity')
).filter(
TrendRecord.trend_date >= date_from
).group_by(TrendRecord.source).all()
# 各分類統計
category_stats = session.query(
TrendRecord.category,
func.count(TrendRecord.id).label('count')
).filter(
TrendRecord.trend_date >= date_from,
TrendRecord.category.isnot(None)
).group_by(TrendRecord.category).all()
# 每日趨勢量
daily_counts = session.query(
TrendRecord.trend_date,
func.count(TrendRecord.id).label('count')
).filter(
TrendRecord.trend_date >= date_from
).group_by(TrendRecord.trend_date).order_by(TrendRecord.trend_date).all()
return jsonify({
'success': True,
'date_range': {
'from': date_from.isoformat(),
'to': date.today().isoformat()
},
'source_stats': [
{
'source': s.source,
'count': s.count,
'avg_popularity': round(s.avg_popularity or 0, 1)
}
for s in source_stats
],
'category_stats': [
{'category': c.category, 'count': c.count}
for c in category_stats
],
'daily_counts': [
{'date': d.trend_date.isoformat(), 'count': d.count}
for d in daily_counts
]
})
finally:
session.close()
六、排程設定
6.1 app.py 新增排程任務
# app.py 中新增
def run_trend_crawler_task():
"""趨勢爬蟲排程任務"""
from services.trend_crawler_service import TrendCrawlerService
logger.info("[TrendCrawler] 開始執行趨勢爬取任務")
start_time = time.time()
try:
service = TrendCrawlerService()
result = service.crawl_and_save_all(analyze=True)
duration = time.time() - start_time
logger.info(f"[TrendCrawler] 完成: 新增 {result['new_records']} 筆, 耗時 {duration:.1f}s")
# 儲存統計
_save_stats('trend_crawler_task', {
'status': 'success',
'new_records': result['new_records'],
'keywords_extracted': result['keywords_extracted'],
'duration': round(duration, 2),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
logger.error(f"[TrendCrawler] 失敗: {e}")
_save_stats('trend_crawler_task', {
'status': 'failed',
'error': str(e),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
def init_scheduler():
"""初始化排程"""
# ... 現有排程 ...
# 趨勢爬蟲: 每 2 小時執行一次
schedule.every(2).hours.do(run_trend_crawler_task)
# 或指定時間執行 (上班時間)
# schedule.every().day.at("09:00").do(run_trend_crawler_task)
# schedule.every().day.at("12:00").do(run_trend_crawler_task)
# schedule.every().day.at("15:00").do(run_trend_crawler_task)
# schedule.every().day.at("18:00").do(run_trend_crawler_task)
七、前端介面設計
7.1 在 ai_recommend.html 新增趨勢區塊
<!-- 趨勢資料區塊 -->
<div class="card mb-4">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="mb-0"><i class="fas fa-chart-line me-2"></i>即時趨勢洞察</h5>
<div>
<button class="btn btn-sm btn-outline-primary" onclick="refreshTrends()">
<i class="fas fa-sync-alt"></i> 更新
</button>
<button class="btn btn-sm btn-primary ms-2" onclick="triggerTrendCrawl()">
<i class="fas fa-spider"></i> 爬取最新
</button>
</div>
</div>
<div class="card-body">
<!-- 快速篩選 -->
<div class="row mb-3">
<div class="col-md-3">
<select id="trendSource" class="form-select" onchange="refreshTrends()">
<option value="">全部來源</option>
<option value="google_news">Google 新聞</option>
<option value="ptt">PTT</option>
<option value="dcard">Dcard</option>
<option value="youtube">YouTube</option>
</select>
</div>
<div class="col-md-3">
<select id="trendCategory" class="form-select" onchange="refreshTrends()">
<option value="">全部分類</option>
<option value="美妝">美妝</option>
<option value="3C">3C</option>
<option value="服飾">服飾</option>
<option value="居家">居家</option>
<option value="電商">電商</option>
</select>
</div>
<div class="col-md-3">
<select id="trendDays" class="form-select" onchange="refreshTrends()">
<option value="1">今日</option>
<option value="3">近 3 天</option>
<option value="7" selected>近 7 天</option>
<option value="14">近 14 天</option>
</select>
</div>
</div>
<!-- 統計概覽 -->
<div class="row mb-4" id="trendStats">
<!-- 由 JS 動態填充 -->
</div>
<!-- AI 分析摘要 -->
<div class="card bg-light mb-4" id="trendAnalysisCard" style="display:none;">
<div class="card-body">
<h6 class="card-title"><i class="fas fa-robot text-primary me-2"></i>AI 趨勢分析</h6>
<p id="trendAnalysisSummary" class="mb-3"></p>
<div class="row">
<div class="col-md-6">
<strong>熱門關鍵字:</strong>
<div id="trendHotKeywords" class="mt-2"></div>
</div>
<div class="col-md-6">
<strong>文案提示:</strong>
<ul id="trendCopywritingHints" class="mt-2 ps-3"></ul>
</div>
</div>
</div>
</div>
<!-- 熱門關鍵字標籤雲 -->
<div class="mb-4">
<h6><i class="fas fa-tags me-2"></i>熱門關鍵字</h6>
<div id="keywordCloud" class="d-flex flex-wrap gap-2">
<!-- 由 JS 動態填充 -->
</div>
</div>
<!-- 趨勢列表 -->
<div class="table-responsive">
<table class="table table-hover" id="trendTable">
<thead>
<tr>
<th>來源</th>
<th>標題</th>
<th>分類</th>
<th>熱度</th>
<th>時間</th>
<th>操作</th>
</tr>
</thead>
<tbody id="trendTableBody">
<!-- 由 JS 動態填充 -->
</tbody>
</table>
</div>
</div>
</div>
7.2 JavaScript 函數
// 趨勢相關函數
async function refreshTrends() {
const source = document.getElementById('trendSource').value;
const category = document.getElementById('trendCategory').value;
const days = document.getElementById('trendDays').value;
try {
// 並行載入資料
const [recordsRes, keywordsRes, statsRes, analysisRes] = await Promise.all([
fetch(`/api/trends/records?source=${source}&category=${category}&days=${days}&limit=20`),
fetch(`/api/trends/keywords?source=${source}&category=${category}&days=${days}&limit=30`),
fetch(`/api/trends/stats?days=${days}`),
fetch(`/api/trends/analysis`)
]);
const records = await recordsRes.json();
const keywords = await keywordsRes.json();
const stats = await statsRes.json();
const analysis = await analysisRes.json();
// 更新統計
if (stats.success) {
renderTrendStats(stats);
}
// 更新關鍵字雲
if (keywords.success) {
renderKeywordCloud(keywords.data);
}
// 更新趨勢列表
if (records.success) {
renderTrendTable(records.data);
}
// 更新 AI 分析
if (analysis.success) {
renderTrendAnalysis(analysis.data);
}
} catch (error) {
console.error('載入趨勢資料失敗:', error);
}
}
function renderTrendStats(stats) {
const html = `
<div class="col-md-3">
<div class="border rounded p-3 text-center">
<h4 class="mb-0">${stats.source_stats.reduce((a, b) => a + b.count, 0)}</h4>
<small class="text-muted">總趨勢數</small>
</div>
</div>
${stats.source_stats.slice(0, 3).map(s => `
<div class="col-md-3">
<div class="border rounded p-3 text-center">
<h4 class="mb-0">${s.count}</h4>
<small class="text-muted">${getSourceName(s.source)}</small>
</div>
</div>
`).join('')}
`;
document.getElementById('trendStats').innerHTML = html;
}
function renderKeywordCloud(keywords) {
const html = keywords.map(kw => {
const size = Math.min(Math.max(kw.total_mentions * 2, 12), 24);
return `
<span class="badge bg-primary"
style="font-size: ${size}px; cursor: pointer;"
onclick="addKeywordToCopy('${kw.keyword}')">
${kw.keyword} (${kw.total_mentions})
</span>
`;
}).join('');
document.getElementById('keywordCloud').innerHTML = html;
}
function renderTrendTable(records) {
const tbody = document.getElementById('trendTableBody');
tbody.innerHTML = records.map(r => `
<tr>
<td><span class="badge ${getSourceBadgeClass(r.source)}">${getSourceName(r.source)}</span></td>
<td>
<a href="${r.source_url}" target="_blank" class="text-decoration-none">
${r.title.substring(0, 50)}${r.title.length > 50 ? '...' : ''}
</a>
</td>
<td>${r.category || '-'}</td>
<td><span class="badge bg-warning text-dark">${r.popularity_score}</span></td>
<td><small>${formatDate(r.published_at)}</small></td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="useTrendForCopy('${encodeURIComponent(r.title)}')">
<i class="fas fa-copy"></i>
</button>
</td>
</tr>
`).join('');
}
function renderTrendAnalysis(data) {
const card = document.getElementById('trendAnalysisCard');
card.style.display = 'block';
document.getElementById('trendAnalysisSummary').textContent = data.summary;
// 熱門關鍵字
const keywordsHtml = (data.hot_keywords || []).map(kw =>
`<span class="badge bg-info me-1 mb-1" style="cursor:pointer" onclick="addKeywordToCopy('${kw}')">${kw}</span>`
).join('');
document.getElementById('trendHotKeywords').innerHTML = keywordsHtml;
// 文案提示
const hintsHtml = (data.copywriting_hints || []).map(hint =>
`<li>${hint}</li>`
).join('');
document.getElementById('trendCopywritingHints').innerHTML = hintsHtml;
}
function getSourceName(source) {
const names = {
'google_news': 'Google 新聞',
'ptt': 'PTT',
'dcard': 'Dcard',
'youtube': 'YouTube'
};
return names[source] || source;
}
function getSourceBadgeClass(source) {
const classes = {
'google_news': 'bg-danger',
'ptt': 'bg-primary',
'dcard': 'bg-success',
'youtube': 'bg-warning text-dark'
};
return classes[source] || 'bg-secondary';
}
function addKeywordToCopy(keyword) {
// 將關鍵字加入文案生成輸入框
const input = document.getElementById('aiKeywordInput');
if (input) {
const current = input.value.trim();
input.value = current ? `${current}, ${keyword}` : keyword;
}
}
function useTrendForCopy(encodedTitle) {
const title = decodeURIComponent(encodedTitle);
const input = document.getElementById('aiProductInput');
if (input) {
input.value = title;
}
}
async function triggerTrendCrawl() {
if (!confirm('確定要立即爬取最新趨勢資料?')) return;
try {
const response = await fetch('/api/trends/crawl', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': getCSRFToken()
},
body: JSON.stringify({ analyze: true })
});
const result = await response.json();
if (result.success) {
alert(`爬取完成!新增 ${result.result.new_records} 筆記錄`);
refreshTrends();
} else {
alert('爬取失敗: ' + result.message);
}
} catch (error) {
alert('爬取失敗: ' + error.message);
}
}
// 頁面載入時初始化
document.addEventListener('DOMContentLoaded', function() {
refreshTrends();
});
八、新增檔案清單
| 檔案 | 類型 | 說明 |
|---|---|---|
database/trend_models.py |
新增 | 趨勢資料表模型 |
services/trend_crawler_service.py |
新增 | 趨勢爬蟲服務 (含 DB 儲存) |
routes/trend_routes.py |
新增 | 趨勢 API 端點 |
九、修改檔案清單
| 檔案 | 修改內容 |
|---|---|
database/manager.py |
新增 trend 表初始化 |
app.py |
註冊 trend_bp、新增排程任務 |
templates/ai_recommend.html |
新增趨勢資料區塊 |
config.py |
確認 Ollama 伺服器設定 (192.168.0.188) |
十、權限設定
在 services/permission_service.py 新增:
# 趨勢相關權限
'report.trends.view': {
'name': '查看趨勢資料',
'category': '報表',
'description': '訪問趨勢資料頁面和 API'
},
'report.trends.crawl': {
'name': '觸發趨勢爬取',
'category': '系統',
'description': '手動觸發趨勢爬蟲'
},
十一、實作步驟
Phase 1: 資料庫模型
- 建立
database/trend_models.py - 更新
database/manager.py初始化表結構
Phase 2: 服務層
- 建立
services/trend_crawler_service.py - 整合 Ollama 分析功能
Phase 3: API 端點
- 建立
routes/trend_routes.py - 在
app.py註冊 Blueprint
Phase 4: 排程任務
- 在
app.py新增run_trend_crawler_task - 設定排程 (每 2 小時)
Phase 5: 前端介面
- 更新
ai_recommend.html新增趨勢區塊 - 新增 JavaScript 函數
Phase 6: 測試
- 手動測試 API 端點
- 驗證排程任務
- 確認前端顯示正常
十二、Ollama 伺服器整合
伺服器資訊
- IP: 192.168.0.188
- Port: 11434
- Model: gemma3:4b
- API Key:
<OLLAMA_API_KEY>
網路設定確認
# 測試連線
curl http://192.168.0.188:11434/api/tags
# 測試生成
curl http://192.168.0.188:11434/api/generate \
-d '{"model": "gemma3:4b", "prompt": "Hello", "stream": false}'
config.py 設定
# Ollama 伺服器設定
OLLAMA_BASE_URL = os.getenv('OLLAMA_BASE_URL', 'http://192.168.0.188:11434')
OLLAMA_MODEL = os.getenv('OLLAMA_MODEL', 'gemma3:4b')
OLLAMA_API_KEY = os.getenv('OLLAMA_API_KEY', '<OLLAMA_API_KEY>')
OLLAMA_TIMEOUT = int(os.getenv('OLLAMA_TIMEOUT', '120'))
十三、Ollama Web Search 整合
13.1 Web Search 快取資料表
新增 web_search_cache 表以減少重複查詢:
class WebSearchCache(Base):
"""Web Search 結果快取 - 避免重複查詢"""
__tablename__ = 'web_search_cache'
id = Column(Integer, primary_key=True)
# 查詢識別
query_hash = Column(String(64), nullable=False, unique=True, index=True)
# MD5(query + search_type)
query = Column(String(500), nullable=False)
# 原始查詢字串
search_type = Column(String(50), default='general')
# 搜尋類型: general, news, shopping, trends
# 結果
result_json = Column(Text, nullable=False)
# JSON 格式的完整結果
summary = Column(Text)
# AI 生成的摘要
result_count = Column(Integer, default=0)
# 結果數量
# 元資料
model_used = Column(String(50))
generation_time = Column(Float)
# 時間
created_at = Column(DateTime, default=datetime.now, index=True)
expires_at = Column(DateTime)
# 快取過期時間 (預設 24 小時)
__table_args__ = (
Index('idx_cache_query_type', 'query', 'search_type'),
Index('idx_cache_expires', 'expires_at'),
)
13.2 Web Search 服務擴展
# services/ollama_service.py 擴展
class OllamaService:
# ... 現有程式碼 ...
def web_search_with_cache(self, query: str, search_type: str = 'general',
cache_hours: int = 24) -> OllamaResponse:
"""
帶快取的 Web Search
Args:
query: 搜尋查詢
search_type: 搜尋類型
cache_hours: 快取時間 (小時)
Returns:
OllamaResponse
"""
from database.trend_models import WebSearchCache
from database.manager import get_session
# 計算查詢雜湊
query_hash = hashlib.md5(f"{query}:{search_type}".encode()).hexdigest()
session = get_session()
try:
# 檢查快取
cache = session.query(WebSearchCache).filter(
WebSearchCache.query_hash == query_hash,
WebSearchCache.expires_at > datetime.now()
).first()
if cache:
# 命中快取
return OllamaResponse(
success=True,
content=cache.result_json,
model=cache.model_used,
duration=0.0 # 快取無延遲
)
# 執行搜尋
response = self.web_search(query, search_type=search_type)
if response.success:
# 儲存快取
new_cache = WebSearchCache(
query_hash=query_hash,
query=query,
search_type=search_type,
result_json=response.content,
summary=self._extract_summary(response.content),
model_used=response.model,
generation_time=response.duration,
expires_at=datetime.now() + timedelta(hours=cache_hours)
)
session.add(new_cache)
session.commit()
return response
finally:
session.close()
def search_trends_for_category(self, category: str) -> dict:
"""
搜尋特定分類的趨勢資訊
用於趨勢爬蟲的補充資料來源
"""
queries = {
'美妝': ['2026美妝趨勢', 'PTT美妝推薦', '韓系彩妝新品'],
'3C': ['2026 3C新品', 'PTT 3C開箱', '科技產品評測'],
'服飾': ['2026服飾流行', '穿搭趨勢', 'Dcard穿搭版'],
'居家': ['居家好物推薦', '收納神器', '家電開箱'],
'電商': ['電商優惠活動', '購物節', '限時特賣'],
}
category_queries = queries.get(category, [f'{category}趨勢'])
results = []
for q in category_queries:
response = self.web_search_with_cache(q, search_type='trends')
if response.success:
try:
data = json.loads(response.content)
results.append({
'query': q,
'results': data.get('results', []),
'keywords': data.get('keywords', [])
})
except:
pass
return {
'category': category,
'search_results': results,
'timestamp': datetime.now().isoformat()
}
13.3 趨勢 API 新增 Web Search 端點
# routes/trend_routes.py 新增
@trend_bp.route('/api/trends/web_search', methods=['POST'])
@login_required
@permission_required('report.trends.view')
def trend_web_search():
"""
使用 Ollama Web Search 搜尋趨勢
Request body:
query: 搜尋查詢
search_type: 搜尋類型 (general, news, shopping, trends)
use_cache: 是否使用快取 (預設 true)
"""
data = request.get_json() or {}
query = data.get('query', '').strip()
search_type = data.get('search_type', 'trends')
use_cache = data.get('use_cache', True)
if not query:
return jsonify({'success': False, 'message': '請輸入搜尋關鍵字'}), 400
try:
ollama = OllamaService()
if use_cache:
response = ollama.web_search_with_cache(query, search_type)
else:
response = ollama.web_search(query, search_type=search_type)
if response.success:
try:
parsed = json.loads(response.content)
except:
parsed = {'raw': response.content}
return jsonify({
'success': True,
'data': {
'query': query,
'search_type': search_type,
'results': parsed,
'model': response.model,
'duration': response.duration
}
})
else:
return jsonify({
'success': False,
'message': response.error or '搜尋失敗'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'搜尋失敗: {str(e)}'
}), 500
@trend_bp.route('/api/trends/category_insights', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_category_insights():
"""
取得分類趨勢洞察 (結合 Web Search)
Query params:
category: 商品分類
"""
category = request.args.get('category', '美妝')
try:
ollama = OllamaService()
insights = ollama.search_trends_for_category(category)
return jsonify({
'success': True,
'data': insights
})
except Exception as e:
return jsonify({
'success': False,
'message': str(e)
}), 500
十四、Telegram Bot 整合
14.1 設計目標
將趨勢資料庫系統整合至 Telegram,讓團隊成員可以透過手機隨時:
- 查詢熱門趨勢關鍵字
- 執行 AI 網路搜尋
- 生成行銷文案
- 接收趨勢推播通知
14.2 Bot 資訊設定
# config.py 新增
TELEGRAM_BOT_CONFIG = {
'token': os.getenv('TELEGRAM_BOT_TOKEN'), # 從 BotFather 取得
'webhook_url': os.getenv('TELEGRAM_WEBHOOK_URL'), # 公開可存取的 URL
'allowed_users': os.getenv('TELEGRAM_ALLOWED_USERS', '').split(','), # 允許的用戶 ID
'admin_chat_id': os.getenv('TELEGRAM_ADMIN_CHAT_ID'), # 管理員聊天室 ID
}
14.3 新增資料表: telegram_users
class TelegramUser(Base):
"""Telegram 用戶綁定表"""
__tablename__ = 'telegram_users'
id = Column(Integer, primary_key=True)
telegram_id = Column(BigInteger, unique=True, nullable=False, index=True)
# Telegram 用戶 ID
telegram_username = Column(String(100))
# Telegram 用戶名稱
user_id = Column(Integer, ForeignKey('users.id'))
# 綁定的系統用戶 ID (可選)
display_name = Column(String(100))
# 顯示名稱
is_active = Column(Boolean, default=True)
# 是否啟用
is_admin = Column(Boolean, default=False)
# 是否為管理員
# 偏好設定
notify_trends = Column(Boolean, default=True)
# 是否接收趨勢通知
notify_daily_summary = Column(Boolean, default=True)
# 是否接收每日摘要
preferred_categories = Column(Text)
# JSON: 偏好的分類列表
created_at = Column(DateTime, default=datetime.now)
last_active_at = Column(DateTime, default=datetime.now)
# 關聯
user = relationship("User", backref="telegram_binding")
14.4 Telegram Bot 服務
# services/telegram_bot_service.py
import telegram
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters
import asyncio
from config import TELEGRAM_BOT_CONFIG
from services.ollama_service import OllamaService
from services.logger_manager import SystemLogger
class TrendTelegramBot:
"""趨勢資料庫 Telegram Bot 服務"""
def __init__(self):
self.token = TELEGRAM_BOT_CONFIG['token']
self.logger = SystemLogger("TelegramBot")
self.ollama = OllamaService()
self.application = None
async def start(self):
"""啟動 Bot"""
self.application = Application.builder().token(self.token).build()
# 註冊指令處理器
self.application.add_handler(CommandHandler("start", self.cmd_start))
self.application.add_handler(CommandHandler("help", self.cmd_help))
self.application.add_handler(CommandHandler("trend", self.cmd_trend))
self.application.add_handler(CommandHandler("search", self.cmd_search))
self.application.add_handler(CommandHandler("copy", self.cmd_copy))
self.application.add_handler(CommandHandler("keywords", self.cmd_keywords))
self.application.add_handler(CommandHandler("daily", self.cmd_daily))
self.application.add_handler(CommandHandler("settings", self.cmd_settings))
# 回調查詢處理器 (按鈕點擊)
self.application.add_handler(CallbackQueryHandler(self.handle_callback))
# 一般訊息處理器
self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
# 啟動
await self.application.initialize()
await self.application.start()
await self.application.updater.start_polling()
self.logger.info("Telegram Bot 已啟動")
async def stop(self):
"""停止 Bot"""
if self.application:
await self.application.updater.stop()
await self.application.stop()
await self.application.shutdown()
# ========== 指令處理器 ==========
async def cmd_start(self, update: Update, context):
"""開始指令"""
user = update.effective_user
await update.message.reply_text(
f"👋 嗨 {user.first_name}!\n\n"
f"我是 MOMO 趨勢助手 Bot,可以幫你:\n"
f"📊 /trend [分類] - 查看熱門趨勢\n"
f"🔍 /search [關鍵字] - AI 網路搜尋\n"
f"✍️ /copy [商品名] - AI 生成文案\n"
f"🏷️ /keywords - 查看熱門關鍵字\n"
f"📰 /daily - 每日趨勢摘要\n"
f"⚙️ /settings - 通知設定\n\n"
f"輸入 /help 查看詳細說明"
)
async def cmd_help(self, update: Update, context):
"""說明指令"""
help_text = """
📖 *指令說明*
*趨勢查詢*
/trend - 查看所有分類熱門趨勢
/trend 美妝 - 查看美妝分類趨勢
/trend 3C - 查看 3C 分類趨勢
*AI 搜尋*
/search 夏季防曬推薦 - AI 搜尋並分析
/search 母親節禮物 - 搜尋熱門話題
*文案生成*
/copy 防曬乳 - 為商品生成行銷文案
/copy 氣炸鍋 活動檔期 - 加入情境參數
*關鍵字*
/keywords - 近 7 天熱門關鍵字
/keywords 美妝 - 指定分類關鍵字
*每日摘要*
/daily - 查看今日趨勢摘要
*設定*
/settings - 管理通知偏好
"""
await update.message.reply_text(help_text, parse_mode='Markdown')
async def cmd_trend(self, update: Update, context):
"""趨勢查詢指令"""
category = ' '.join(context.args) if context.args else None
await update.message.reply_text("🔄 正在查詢趨勢資料...")
try:
from database.trend_models import TrendRecord, TrendKeyword
from database.manager import get_session
from datetime import date, timedelta
session = get_session()
date_from = date.today() - timedelta(days=7)
# 查詢熱門趨勢
query = session.query(TrendRecord).filter(
TrendRecord.trend_date >= date_from
)
if category:
query = query.filter(TrendRecord.category == category)
records = query.order_by(
TrendRecord.popularity_score.desc()
).limit(10).all()
session.close()
if not records:
await update.message.reply_text(
f"📭 {f'{category}分類' if category else ''}近 7 天沒有趨勢資料"
)
return
# 格式化回覆
title = f"📊 {category if category else '所有分類'}熱門趨勢 (近7天)\n\n"
lines = []
for i, r in enumerate(records, 1):
source_emoji = {'ptt': '💬', 'dcard': '📱', 'google_news': '📰', 'youtube': '🎬'}.get(r.source, '📄')
lines.append(f"{i}. {source_emoji} {r.title[:30]}... (熱度:{r.popularity_score})")
await update.message.reply_text(title + '\n'.join(lines))
except Exception as e:
await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")
async def cmd_search(self, update: Update, context):
"""AI 搜尋指令"""
query = ' '.join(context.args)
if not query:
await update.message.reply_text("❓ 請輸入搜尋關鍵字\n範例: /search 夏季防曬推薦")
return
await update.message.reply_text(f"🔍 正在搜尋「{query}」...")
try:
response = self.ollama.web_search_with_cache(query, search_type='trends')
if response.success:
try:
data = json.loads(response.content)
summary = data.get('summary', '無摘要')
results = data.get('results', [])[:5]
reply = f"🔍 *搜尋結果: {query}*\n\n"
reply += f"📝 *摘要:*\n{summary}\n\n"
if results:
reply += "*相關資訊:*\n"
for i, r in enumerate(results, 1):
title = r.get('title', '無標題')[:40]
reply += f"{i}. {title}\n"
await update.message.reply_text(reply, parse_mode='Markdown')
except:
await update.message.reply_text(f"🔍 搜尋結果:\n{response.content[:1000]}")
else:
await update.message.reply_text(f"❌ 搜尋失敗: {response.error}")
except Exception as e:
await update.message.reply_text(f"❌ 搜尋失敗: {str(e)}")
async def cmd_copy(self, update: Update, context):
"""文案生成指令"""
if not context.args:
await update.message.reply_text("❓ 請輸入商品名稱\n範例: /copy 防曬乳")
return
product_name = context.args[0]
context_hint = ' '.join(context.args[1:]) if len(context.args) > 1 else None
await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")
try:
prompt = f"""
請為以下商品生成 3 種風格的行銷文案:
商品: {product_name}
{f'情境: {context_hint}' if context_hint else ''}
請分別生成:
1. 標準版 (100字內)
2. 活潑版 (含表情符號,100字內)
3. 限時版 (強調緊迫感,100字內)
以繁體中文回覆。
"""
response = self.ollama.generate(prompt)
if response.success:
await update.message.reply_text(
f"✨ *{product_name} 行銷文案*\n\n{response.content}",
parse_mode='Markdown'
)
else:
await update.message.reply_text(f"❌ 生成失敗: {response.error}")
except Exception as e:
await update.message.reply_text(f"❌ 生成失敗: {str(e)}")
async def cmd_keywords(self, update: Update, context):
"""熱門關鍵字指令"""
category = ' '.join(context.args) if context.args else None
try:
from database.trend_models import TrendKeyword
from database.manager import get_session
from sqlalchemy import func
from datetime import date, timedelta
session = get_session()
date_from = date.today() - timedelta(days=7)
query = session.query(
TrendKeyword.keyword,
func.sum(TrendKeyword.mention_count).label('total')
).filter(
TrendKeyword.trend_date >= date_from
).group_by(TrendKeyword.keyword)
if category:
query = query.filter(TrendKeyword.category == category)
keywords = query.order_by(func.sum(TrendKeyword.mention_count).desc()).limit(20).all()
session.close()
if not keywords:
await update.message.reply_text("📭 近 7 天沒有熱門關鍵字資料")
return
title = f"🏷️ {category if category else '所有分類'}熱門關鍵字 (近7天)\n\n"
lines = [f"{i}. {kw.keyword} ({kw.total}次)" for i, kw in enumerate(keywords, 1)]
await update.message.reply_text(title + '\n'.join(lines))
except Exception as e:
await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")
async def cmd_daily(self, update: Update, context):
"""每日趨勢摘要"""
try:
from database.trend_models import TrendAnalysis
from database.manager import get_session
from datetime import date
session = get_session()
analysis = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == date.today(),
TrendAnalysis.analysis_type == 'daily_summary'
).first()
session.close()
if not analysis:
await update.message.reply_text("📭 今日尚無趨勢分析報告")
return
hot_keywords = json.loads(analysis.hot_keywords or '[]')
marketing = json.loads(analysis.marketing_suggestions or '[]')
reply = f"📰 *今日趨勢摘要*\n\n"
reply += f"📝 *概況:*\n{analysis.summary}\n\n"
if hot_keywords:
reply += f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:10])}\n\n"
if marketing:
reply += f"💡 *行銷建議:*\n"
for i, m in enumerate(marketing[:5], 1):
reply += f"{i}. {m}\n"
await update.message.reply_text(reply, parse_mode='Markdown')
except Exception as e:
await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")
async def cmd_settings(self, update: Update, context):
"""設定指令"""
keyboard = [
[InlineKeyboardButton("📊 趨勢通知 ON/OFF", callback_data="toggle_trends")],
[InlineKeyboardButton("📰 每日摘要 ON/OFF", callback_data="toggle_daily")],
[InlineKeyboardButton("🏷️ 設定偏好分類", callback_data="set_categories")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⚙️ *通知設定*\n\n點擊下方按鈕調整設定",
reply_markup=reply_markup,
parse_mode='Markdown'
)
async def handle_callback(self, update: Update, context):
"""處理按鈕回調"""
query = update.callback_query
await query.answer()
if query.data == "toggle_trends":
await query.edit_message_text("✅ 趨勢通知已切換")
elif query.data == "toggle_daily":
await query.edit_message_text("✅ 每日摘要通知已切換")
elif query.data == "set_categories":
keyboard = [
[InlineKeyboardButton("美妝", callback_data="cat_美妝"),
InlineKeyboardButton("3C", callback_data="cat_3C")],
[InlineKeyboardButton("服飾", callback_data="cat_服飾"),
InlineKeyboardButton("居家", callback_data="cat_居家")],
[InlineKeyboardButton("電商", callback_data="cat_電商"),
InlineKeyboardButton("全部", callback_data="cat_all")],
]
await query.edit_message_text(
"🏷️ 選擇偏好分類 (可多選)",
reply_markup=InlineKeyboardMarkup(keyboard)
)
async def handle_message(self, update: Update, context):
"""處理一般訊息"""
text = update.message.text
# 簡單的自然語言處理
if '趨勢' in text or '熱門' in text:
await update.message.reply_text("💡 輸入 /trend 查看熱門趨勢")
elif '搜尋' in text or '查詢' in text:
await update.message.reply_text("💡 輸入 /search [關鍵字] 進行 AI 搜尋")
elif '文案' in text or '生成' in text:
await update.message.reply_text("💡 輸入 /copy [商品名] 生成文案")
else:
await update.message.reply_text("❓ 不確定您的需求,請輸入 /help 查看可用指令")
# ========== 推播功能 ==========
async def broadcast_trend_alert(self, message: str, category: str = None):
"""推播趨勢警報"""
from database.trend_models import TelegramUser
from database.manager import get_session
session = get_session()
query = session.query(TelegramUser).filter(
TelegramUser.is_active == True,
TelegramUser.notify_trends == True
)
if category:
query = query.filter(
TelegramUser.preferred_categories.like(f'%{category}%')
)
users = query.all()
session.close()
for user in users:
try:
await self.application.bot.send_message(
chat_id=user.telegram_id,
text=message,
parse_mode='Markdown'
)
except Exception as e:
self.logger.error(f"推播失敗 {user.telegram_id}: {e}")
async def send_daily_summary(self):
"""發送每日摘要"""
from database.trend_models import TrendAnalysis, TelegramUser
from database.manager import get_session
from datetime import date
session = get_session()
# 取得今日分析
analysis = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == date.today(),
TrendAnalysis.analysis_type == 'daily_summary'
).first()
if not analysis:
session.close()
return
# 取得要通知的用戶
users = session.query(TelegramUser).filter(
TelegramUser.is_active == True,
TelegramUser.notify_daily_summary == True
).all()
session.close()
# 格式化訊息
hot_keywords = json.loads(analysis.hot_keywords or '[]')
message = f"📰 *{date.today()} 趨勢日報*\n\n"
message += f"{analysis.summary}\n\n"
message += f"🏷️ *熱門關鍵字:* {', '.join(hot_keywords[:5])}"
# 推播
for user in users:
try:
await self.application.bot.send_message(
chat_id=user.telegram_id,
text=message,
parse_mode='Markdown'
)
except Exception as e:
self.logger.error(f"每日摘要推播失敗 {user.telegram_id}: {e}")
# 全域實例
_bot_instance = None
def get_telegram_bot() -> TrendTelegramBot:
global _bot_instance
if _bot_instance is None:
_bot_instance = TrendTelegramBot()
return _bot_instance
14.5 整合至主應用程式
# app.py 新增
import asyncio
from services.telegram_bot_service import get_telegram_bot
# 在應用程式啟動時初始化 Bot
def init_telegram_bot():
"""初始化 Telegram Bot"""
if not TELEGRAM_BOT_CONFIG.get('token'):
logger.warning("Telegram Bot Token 未設定,跳過初始化")
return
bot = get_telegram_bot()
# 在背景執行 Bot
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def run_bot():
loop.run_until_complete(bot.start())
loop.run_forever()
bot_thread = threading.Thread(target=run_bot, daemon=True)
bot_thread.start()
logger.info("Telegram Bot 已在背景啟動")
# 在排程中加入每日摘要推播
def run_daily_summary_broadcast():
"""每日摘要推播任務"""
bot = get_telegram_bot()
if bot.application:
asyncio.run(bot.send_daily_summary())
def init_scheduler():
# ... 現有排程 ...
# 每日早上 9 點推播摘要
schedule.every().day.at("09:00").do(run_daily_summary_broadcast)
14.6 環境變數設定
# .env 新增
TELEGRAM_BOT_TOKEN=your_bot_token_from_botfather
TELEGRAM_WEBHOOK_URL=https://your-domain.com/webhook/telegram
TELEGRAM_ALLOWED_USERS=123456789,987654321
TELEGRAM_ADMIN_CHAT_ID=123456789
14.7 建立 Telegram Bot 步驟
- 在 Telegram 搜尋 @BotFather
- 發送
/newbot建立新 Bot - 設定 Bot 名稱和 username
- 取得 API Token
- 設定環境變數
- 重啟應用程式
十五、新增檔案清單 (更新版)
| 檔案 | 類型 | 說明 |
|---|---|---|
database/trend_models.py |
新增 | 趨勢資料表模型 (含 WebSearchCache, TelegramUser) |
services/trend_crawler_service.py |
新增 | 趨勢爬蟲服務 (含 DB 儲存) |
services/telegram_bot_service.py |
新增 | Telegram Bot 服務 |
routes/trend_routes.py |
新增 | 趨勢 API 端點 (含 Web Search API) |
十六、修改檔案清單 (更新版)
| 檔案 | 修改內容 |
|---|---|
database/manager.py |
新增 trend 表、web_search_cache 表、telegram_users 表初始化 |
services/ollama_service.py |
新增 web_search_with_cache、search_trends_for_category 方法 |
app.py |
註冊 trend_bp、新增排程任務、初始化 Telegram Bot |
templates/ai_recommend.html |
新增趨勢資料區塊、Web Search UI |
config.py |
新增 Telegram Bot 設定 |
.env.example |
新增 Telegram 相關環境變數 |
十七、權限設定 (更新版)
# 趨勢相關權限
'report.trends.view': {
'name': '查看趨勢資料',
'category': '報表',
'description': '訪問趨勢資料頁面和 API'
},
'report.trends.crawl': {
'name': '觸發趨勢爬取',
'category': '系統',
'description': '手動觸發趨勢爬蟲'
},
'report.trends.web_search': {
'name': '使用 AI 搜尋',
'category': '報表',
'description': '使用 Ollama Web Search 功能'
},
'system.telegram.manage': {
'name': '管理 Telegram Bot',
'category': '系統',
'description': '管理 Telegram Bot 設定和用戶'
},
十八、實作步驟 (更新版)
Phase 1: 資料庫模型
- 建立
database/trend_models.py(含 WebSearchCache, TelegramUser) - 更新
database/manager.py初始化表結構
Phase 2: 服務層
- 建立
services/trend_crawler_service.py - 更新
services/ollama_service.py新增 Web Search 快取方法 - 建立
services/telegram_bot_service.py
Phase 3: API 端點
- 建立
routes/trend_routes.py(含 Web Search API) - 在
app.py註冊 Blueprint
Phase 4: 排程任務
- 在
app.py新增run_trend_crawler_task - 新增
run_daily_summary_broadcast - 設定排程
Phase 5: Telegram Bot
- 向 BotFather 註冊 Bot
- 設定環境變數
- 初始化 Bot 服務
Phase 6: 前端介面
- 更新
ai_recommend.html新增趨勢區塊 - 新增 Web Search UI 組件
- 新增 JavaScript 函數
Phase 7: 測試
- 手動測試 API 端點
- 驗證 Web Search 快取
- 測試 Telegram Bot 指令
- 確認排程任務和推播
文件結束