Files
ewoooc/docs/TREND_DATABASE_DESIGN.md
OoO d6d8777e41
All checks were successful
CD Pipeline / deploy (push) Successful in 1m12s
V10.601 收斂 Gemini 與密鑰治理
2026-06-06 14:52:46 +08:00

2337 lines
83 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 趨勢資料庫系統設計文件
## 文件資訊
- **建立日期**: 2026-01-22
- **目標**: 自動爬取社群趨勢資料,儲存至資料庫,供 AI 文案生成參考
- **Ollama 伺服器**: 192.168.0.188 (內部網路)
- **新增功能**: Ollama Web Search 整合、Telegram Bot 整合
---
## 一、系統架構概覽
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ 趨勢資料庫系統架構 (含 Web Search & Telegram) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 資料來源層 │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌───────────────┐ │ │
│ │ │Google News │ │ PTT │ │ Dcard │ │ Ollama Web │ │ │
│ │ │ RSS Feed │ │ 熱門看板 │ │ 熱門文章 │ │ Search │ │ │
│ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └───────┬───────┘ │ │
│ │ │ │ │ │ │ │
│ │ └───────────────┴───────────────┴─────────────────┘ │ │
│ └────────────────────────────────┬────────────────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Trend Crawler Service │ │
│ │ + Ollama Web Search Service │ │
│ └──────────────┬───────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌──────────────────────┐ ┌────────────────┐ │
│ │ 原始資料 │ │ Ollama AI 分析 │ │ 關鍵字萃取 │ │
│ │ 儲存 │ │ (192.168.0.188) │ │ TF-IDF │ │
│ └─────┬──────┘ │ - 生成分析 │ └───────┬────────┘ │
│ │ │ - Web Search │ │ │
│ │ │ - 情緒分析 │ │ │
│ │ └──────────┬───────────┘ │ │
│ │ │ │ │
│ └──────────────────────┼────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ SQLite Database │ │
│ │ ┌────────────────────────┐ │ │
│ │ │ trend_records │ │ │
│ │ │ trend_keywords │ │ │
│ │ │ trend_analysis │ │ │
│ │ │ web_search_cache │ │ ← 新增Web Search 快取 │
│ │ └────────────────────────┘ │ │
│ └──────────────┬───────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Web API │ │ Telegram Bot │ │ AI 推薦頁面 │ │
│ │ Endpoints │ │ Service │ │ Frontend │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ Telegram Bot 互動流程 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ User Telegram Bot Backend │
│ │ │ │ │
│ │ /trend 美妝 │ │ │
│ │─────────────────────────>│ │ │
│ │ │ GET /api/trends/keywords │ │
│ │ │────────────────────────────>│ │
│ │ │<────────────────────────────│ │
│ │ 📊 美妝熱門關鍵字: │ │ │
│ │ 1. 保濕 (58次) │ │ │
│ │ 2. 防曬 (45次) │ │ │
│ │<─────────────────────────│ │ │
│ │ │ │ │
│ │ /search 夏季防曬推薦 │ │ │
│ │─────────────────────────>│ │ │
│ │ │ POST /api/ai/web_search │ │
│ │ │────────────────────────────>│ │
│ │ │ (Ollama Web Search) │ │
│ │ │<────────────────────────────│ │
│ │ 🔍 搜尋結果: │ │ │
│ │ - 2026防曬評比... │ │ │
│ │ - PTT推薦清單... │ │ │
│ │<─────────────────────────│ │ │
│ │ │ │ │
│ │ /copy 防曬乳 │ │ │
│ │─────────────────────────>│ │ │
│ │ │ POST /api/ai/generate │ │
│ │ │────────────────────────────>│ │
│ │ │<────────────────────────────│ │
│ │ ✨ AI 生成文案: │ │ │
│ │ 「夏日必備防曬神器...」 │ │ │
│ │<─────────────────────────│ │ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
---
## 1.5 Ollama Web Search 整合架構
### Web Search 工作流程
```
┌─────────────────────────────────────────────────────────────────┐
│ Ollama Web Search 流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 使用者輸入查詢 │
│ ↓ │
│ 2. 系統呼叫 Ollama Web Search API │
│ POST http://192.168.0.188:11434/api/chat │
│ { │
│ "model": "gemma3:4b", │
│ "messages": [...], │
│ "tools": [{"type": "web_search", ...}] ← 啟用 Web Search │
│ } │
│ ↓ │
│ 3. Ollama 執行網路搜尋並分析結果 │
│ ↓ │
│ 4. 回傳結構化結果 (JSON) │
│ - 搜尋結果摘要 │
│ - 來源連結 │
│ - AI 分析洞察 │
│ ↓ │
│ 5. 儲存至 web_search_cache 表 (可選) │
│ ↓ │
│ 6. 回傳給前端/Telegram │
│ │
└─────────────────────────────────────────────────────────────────┘
```
### 已實作的 Web Search API
系統中 `services/ollama_service.py` 已包含以下 Web Search 方法:
| 方法 | 說明 |
|------|------|
| `web_search(query, search_type)` | 通用網路搜尋 |
| `search_product_insights(product_name)` | 商品市場洞察 |
| `search_trend_keywords(category)` | 趨勢關鍵字搜尋 |
### 整合至趨勢爬取
Web Search 可作為額外的趨勢資料來源,補充傳統爬蟲無法取得的即時資訊:
```python
# 在 TrendCrawlerService 中整合 Web Search
def crawl_with_web_search(self, query: str, category: str = None):
"""使用 Ollama Web Search 爬取趨勢資料"""
# 呼叫 Web Search
response = self.ollama.web_search(
query=query,
search_type='trends',
num_results=10
)
if response.success:
# 解析結果並儲存
results = json.loads(response.content)
for item in results.get('results', []):
record = TrendRecord(
source='ollama_web_search',
source_url=item.get('url'),
title=item.get('title'),
content=item.get('snippet'),
category=category,
trend_date=date.today(),
ai_summary=results.get('summary')
)
session.add(record)
```
---
## 二、資料來源規劃
### 2.1 支援的資料來源
| 來源 | 類型 | 爬取方式 | 更新頻率 | 說明 |
|------|------|---------|---------|------|
| Google News | 新聞 | RSS Feed | 每小時 | 台灣新聞、科技、生活類別 |
| PTT | 社群 | Web Scraping | 每 2 小時 | Gossiping, Lifeismoney, e-shopping |
| Dcard | 社群 | Web Scraping | 每 2 小時 | 熱門看板、電商相關 |
| YouTube Trending | 影音 | YouTube API | 每 4 小時 | 台灣熱門影片 |
| 氣象資料 | 天氣 | 中央氣象署 API | 每 6 小時 | 搭配季節性行銷 |
| **Ollama Web Search** | **AI 搜尋** | **Ollama API** | **即時/排程** | **AI 智慧網路搜尋,補充即時資訊** |
### 2.2 PTT 目標看板
```python
PTT_BOARDS = [
'Gossiping', # 八卦板 - 熱門話題
'Lifeismoney', # 省錢板 - 優惠情報
'e-shopping', # 網購板 - 電商趨勢
'Beauty', # 美妝板 - 美妝趨勢
'MakeUp', # 化妝板 - 彩妝趨勢
'WomenTalk', # 女板 - 女性消費趨勢
'home-sale', # 房屋板 - 居家用品參考
'BabyMother', # 媽寶板 - 母嬰市場
'Tech_Job', # 科技業 - 3C 消費力
]
```
### 2.3 Dcard 目標看板
```python
DCARD_BOARDS = [
'網路購物', # 電商討論
'美妝', # 美妝趨勢
'穿搭', # 服飾趨勢
'3C', # 科技產品
'省錢', # 優惠情報
'生活', # 生活趨勢
'美食', # 餐飲趨勢
]
```
---
## 三、資料庫設計
### 3.1 trend_records (趨勢記錄表)
```python
class TrendRecord(Base):
"""趨勢資料記錄 - 儲存爬取的原始內容"""
__tablename__ = 'trend_records'
id = Column(Integer, primary_key=True)
# 來源識別
source = Column(String(50), nullable=False, index=True)
# 可選值: 'google_news', 'ptt', 'dcard', 'youtube', 'weather'
source_board = Column(String(100))
# PTT/Dcard 看板名稱,如 'Gossiping', '網路購物'
source_url = Column(String(500))
# 原始連結
source_id = Column(String(100))
# 來源平台的唯一識別碼 (用於去重)
# 內容
title = Column(String(500), nullable=False)
content = Column(Text)
# 全文內容或摘要
author = Column(String(100))
# 作者/媒體名稱
# 互動指標
popularity_score = Column(Integer, default=0)
# 熱門度分數 (推數、讚數、觀看數等)
comment_count = Column(Integer, default=0)
# 留言數
# 分類標籤
category = Column(String(100), index=True)
# 商品分類對應: '美妝', '3C', '家電', '服飾' 等
tags = Column(Text)
# JSON 格式的標籤列表
# 時間資訊
published_at = Column(DateTime)
# 原始發布時間
trend_date = Column(Date, nullable=False, index=True)
# 趨勢所屬日期 (用於聚合查詢)
created_at = Column(DateTime, default=datetime.now)
# 爬取時間
# AI 分析結果
sentiment = Column(String(20))
# 情緒分析: 'positive', 'negative', 'neutral'
ai_summary = Column(Text)
# Ollama 生成的摘要
relevance_score = Column(Float, default=0.0)
# 與商品銷售的相關性分數 (0-1)
# 索引優化
__table_args__ = (
Index('idx_trend_source_date', 'source', 'trend_date'),
Index('idx_trend_category_date', 'category', 'trend_date'),
Index('idx_trend_popularity', 'popularity_score', 'trend_date'),
UniqueConstraint('source', 'source_id', name='uq_source_record'),
)
```
### 3.2 trend_keywords (趨勢關鍵字表)
```python
class TrendKeyword(Base):
"""趨勢關鍵字 - 從文章中萃取的熱門詞彙"""
__tablename__ = 'trend_keywords'
id = Column(Integer, primary_key=True)
keyword = Column(String(100), nullable=False, index=True)
# 關鍵字
keyword_type = Column(String(50), default='general')
# 類型: 'product' (商品), 'brand' (品牌), 'event' (事件), 'general'
source = Column(String(50), nullable=False)
# 來源平台
category = Column(String(100), index=True)
# 商品分類
mention_count = Column(Integer, default=1)
# 提及次數
trend_date = Column(Date, nullable=False, index=True)
# 趨勢日期
sentiment_avg = Column(Float, default=0.0)
# 平均情緒分數 (-1 到 1)
related_keywords = Column(Text)
# JSON 格式的相關關鍵字
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
__table_args__ = (
Index('idx_keyword_date_count', 'trend_date', 'mention_count'),
UniqueConstraint('keyword', 'source', 'trend_date', name='uq_keyword_source_date'),
)
```
### 3.3 trend_analysis (趨勢分析報告表)
```python
class TrendAnalysis(Base):
"""趨勢分析報告 - Ollama AI 生成的分析結果"""
__tablename__ = 'trend_analysis'
id = Column(Integer, primary_key=True)
analysis_date = Column(Date, nullable=False, index=True)
# 分析日期
category = Column(String(100), index=True)
# 分析的商品分類 (null 表示全品類)
analysis_type = Column(String(50), nullable=False)
# 分析類型: 'daily_summary', 'weekly_trend', 'hot_topic', 'marketing_insight'
# AI 分析內容
summary = Column(Text, nullable=False)
# 摘要說明
hot_keywords = Column(Text)
# JSON: 熱門關鍵字列表
hot_topics = Column(Text)
# JSON: 熱門話題列表
consumer_insights = Column(Text)
# JSON: 消費者洞察
marketing_suggestions = Column(Text)
# JSON: 行銷建議
copywriting_hints = Column(Text)
# JSON: 文案撰寫提示
# 來源統計
source_stats = Column(Text)
# JSON: 各來源資料統計
record_count = Column(Integer, default=0)
# 分析涵蓋的記錄數
# Ollama 資訊
model_used = Column(String(50))
# 使用的模型
generation_time = Column(Float)
# 生成耗時 (秒)
created_at = Column(DateTime, default=datetime.now)
__table_args__ = (
UniqueConstraint('analysis_date', 'category', 'analysis_type', name='uq_analysis'),
)
```
---
## 四、服務層設計
### 4.1 更新 trend_crawler.py
在現有 `services/trend_crawler.py` 基礎上新增資料庫儲存功能:
```python
# services/trend_crawler.py 擴展
from database.trend_models import TrendRecord, TrendKeyword, TrendAnalysis
from database.manager import get_session
from services.ollama_service import OllamaService
class TrendCrawlerService:
"""趨勢爬蟲服務 - 含資料庫儲存"""
def __init__(self):
self.crawler = TrendCrawler()
self.ollama = OllamaService()
self.logger = SystemLogger("TrendCrawlerService")
def crawl_and_save_all(self,
sources: List[str] = None,
categories: List[str] = None,
analyze: bool = True) -> dict:
"""
爬取所有來源並儲存到資料庫
Args:
sources: 指定來源 ['google_news', 'ptt', 'dcard', 'youtube']
categories: 指定分類
analyze: 是否進行 AI 分析
Returns:
dict: 爬取結果統計
"""
results = {
'total_records': 0,
'new_records': 0,
'keywords_extracted': 0,
'sources': {}
}
session = get_session()
try:
# 1. 爬取各來源資料
trend_data = self.crawler.get_all_trends(
categories=categories,
include_social=True
)
# 2. 儲存新聞資料
if not sources or 'google_news' in sources:
news_result = self._save_news_records(session, trend_data.news_items)
results['sources']['google_news'] = news_result
results['total_records'] += news_result['total']
results['new_records'] += news_result['new']
# 3. 儲存社群資料 (PTT/Dcard)
if not sources or 'ptt' in sources or 'dcard' in sources:
social_result = self._save_social_records(session, trend_data.social_posts)
results['sources']['social'] = social_result
results['total_records'] += social_result['total']
results['new_records'] += social_result['new']
# 4. 儲存 YouTube 資料
if not sources or 'youtube' in sources:
youtube_result = self._save_youtube_records(session, trend_data.youtube_videos)
results['sources']['youtube'] = youtube_result
results['total_records'] += youtube_result['total']
results['new_records'] += youtube_result['new']
# 5. 萃取關鍵字
keywords_count = self._extract_and_save_keywords(session)
results['keywords_extracted'] = keywords_count
session.commit()
# 6. AI 分析 (commit 後執行)
if analyze and results['new_records'] > 0:
self._generate_ai_analysis(session)
self.logger.info(f"趨勢爬取完成: {results}")
return results
except Exception as e:
session.rollback()
self.logger.error(f"趨勢爬取失敗: {e}")
raise
finally:
session.close()
def _save_news_records(self, session, news_items: List[NewsItem]) -> dict:
"""儲存新聞記錄"""
total = len(news_items)
new_count = 0
for news in news_items:
# 檢查是否已存在
exists = session.query(TrendRecord).filter(
TrendRecord.source == 'google_news',
TrendRecord.source_url == news.link
).first()
if not exists:
record = TrendRecord(
source='google_news',
source_url=news.link,
source_id=hashlib.md5(news.link.encode()).hexdigest(),
title=news.title,
content=news.summary,
author=news.source,
category=news.category,
published_at=news.published,
trend_date=news.published.date() if news.published else date.today(),
)
session.add(record)
new_count += 1
return {'total': total, 'new': new_count}
def _save_social_records(self, session, social_posts: List[SocialPost]) -> dict:
"""儲存社群記錄 (PTT/Dcard)"""
total = len(social_posts)
new_count = 0
for post in social_posts:
exists = session.query(TrendRecord).filter(
TrendRecord.source == post.platform.lower(),
TrendRecord.source_id == post.post_id
).first()
if not exists:
record = TrendRecord(
source=post.platform.lower(),
source_board=post.board,
source_url=post.url,
source_id=post.post_id,
title=post.title,
content=post.content,
author=post.author,
popularity_score=post.push_count or post.like_count or 0,
comment_count=post.comment_count or 0,
category=self._map_board_to_category(post.board),
published_at=post.created_at,
trend_date=post.created_at.date() if post.created_at else date.today(),
)
session.add(record)
new_count += 1
return {'total': total, 'new': new_count}
def _map_board_to_category(self, board: str) -> str:
"""將看板名稱對應到商品分類"""
mapping = {
# PTT
'Beauty': '美妝',
'MakeUp': '美妝',
'e-shopping': '電商',
'Lifeismoney': '優惠',
'home-sale': '居家',
'BabyMother': '母嬰',
# Dcard
'美妝': '美妝',
'穿搭': '服飾',
'3C': '3C',
'網路購物': '電商',
'省錢': '優惠',
}
return mapping.get(board, '其他')
def _extract_and_save_keywords(self, session) -> int:
"""從今日記錄中萃取關鍵字"""
today = date.today()
# 取得今日所有記錄
records = session.query(TrendRecord).filter(
TrendRecord.trend_date == today
).all()
# 使用 jieba 分詞萃取關鍵字
keyword_counts = defaultdict(lambda: {'count': 0, 'sources': set()})
for record in records:
text = f"{record.title} {record.content or ''}"
keywords = self._extract_keywords_jieba(text)
for kw in keywords:
keyword_counts[kw]['count'] += 1
keyword_counts[kw]['sources'].add(record.source)
# 儲存關鍵字 (只保留出現 2 次以上的)
saved_count = 0
for keyword, data in keyword_counts.items():
if data['count'] >= 2:
for source in data['sources']:
existing = session.query(TrendKeyword).filter(
TrendKeyword.keyword == keyword,
TrendKeyword.source == source,
TrendKeyword.trend_date == today
).first()
if existing:
existing.mention_count = data['count']
else:
kw_record = TrendKeyword(
keyword=keyword,
source=source,
mention_count=data['count'],
trend_date=today
)
session.add(kw_record)
saved_count += 1
return saved_count
def _generate_ai_analysis(self, session) -> None:
"""使用 Ollama 生成趨勢分析"""
today = date.today()
# 取得今日資料摘要
records = session.query(TrendRecord).filter(
TrendRecord.trend_date == today
).order_by(TrendRecord.popularity_score.desc()).limit(50).all()
if not records:
return
# 準備分析內容
content_for_analysis = "\n".join([
f"- [{r.source}] {r.title} (熱度:{r.popularity_score})"
for r in records
])
# 呼叫 Ollama 分析
prompt = f"""
請分析以下今日趨勢資料,提供行銷洞察:
{content_for_analysis}
請以 JSON 格式回覆,包含:
1. summary: 整體趨勢摘要 (100字內)
2. hot_keywords: 熱門關鍵字列表 (最多10個)
3. hot_topics: 熱門話題列表 (最多5個)
4. consumer_insights: 消費者洞察 (3-5點)
5. marketing_suggestions: 行銷建議 (3-5點)
6. copywriting_hints: 文案撰寫提示 (3-5個)
"""
response = self.ollama.generate(prompt)
if response.success:
try:
analysis_data = json.loads(response.content)
except json.JSONDecodeError:
analysis_data = {'summary': response.content}
analysis = TrendAnalysis(
analysis_date=today,
category=None, # 全品類
analysis_type='daily_summary',
summary=analysis_data.get('summary', ''),
hot_keywords=json.dumps(analysis_data.get('hot_keywords', []), ensure_ascii=False),
hot_topics=json.dumps(analysis_data.get('hot_topics', []), ensure_ascii=False),
consumer_insights=json.dumps(analysis_data.get('consumer_insights', []), ensure_ascii=False),
marketing_suggestions=json.dumps(analysis_data.get('marketing_suggestions', []), ensure_ascii=False),
copywriting_hints=json.dumps(analysis_data.get('copywriting_hints', []), ensure_ascii=False),
record_count=len(records),
model_used=response.model,
generation_time=response.duration
)
session.add(analysis)
session.commit()
```
### 4.2 Ollama 整合 (192.168.0.188)
確認 `services/ollama_service.py` 設定:
```python
# config.py 新增
OLLAMA_CONFIG = {
'base_url': 'http://192.168.0.188:11434', # 內部 Ollama 伺服器
'model': 'gemma3:4b',
'timeout': 120,
'api_key': '<OLLAMA_API_KEY>'
}
```
---
## 五、API 端點設計
### 5.1 新增 routes/trend_routes.py
```python
from flask import Blueprint, request, jsonify
from database.trend_models import TrendRecord, TrendKeyword, TrendAnalysis
from database.manager import get_session
from services.trend_crawler_service import TrendCrawlerService
from auth import login_required, permission_required
from datetime import date, timedelta
trend_bp = Blueprint('trend', __name__)
@trend_bp.route('/api/trends/records', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_records():
"""
取得趨勢記錄
Query params:
source: 來源篩選 (google_news, ptt, dcard, youtube)
category: 分類篩選
date_from: 起始日期 (YYYY-MM-DD)
date_to: 結束日期 (YYYY-MM-DD)
limit: 筆數限制 (預設 50)
offset: 分頁偏移
"""
source = request.args.get('source')
category = request.args.get('category')
date_from = request.args.get('date_from', (date.today() - timedelta(days=7)).isoformat())
date_to = request.args.get('date_to', date.today().isoformat())
limit = min(int(request.args.get('limit', 50)), 200)
offset = int(request.args.get('offset', 0))
session = get_session()
try:
query = session.query(TrendRecord).filter(
TrendRecord.trend_date >= date_from,
TrendRecord.trend_date <= date_to
)
if source:
query = query.filter(TrendRecord.source == source)
if category:
query = query.filter(TrendRecord.category == category)
total = query.count()
records = query.order_by(
TrendRecord.popularity_score.desc(),
TrendRecord.created_at.desc()
).offset(offset).limit(limit).all()
return jsonify({
'success': True,
'total': total,
'data': [
{
'id': r.id,
'source': r.source,
'source_board': r.source_board,
'title': r.title,
'content': r.content[:200] if r.content else None,
'author': r.author,
'popularity_score': r.popularity_score,
'category': r.category,
'published_at': r.published_at.isoformat() if r.published_at else None,
'source_url': r.source_url,
'sentiment': r.sentiment,
}
for r in records
]
})
finally:
session.close()
@trend_bp.route('/api/trends/keywords', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_keywords():
"""
取得熱門關鍵字
Query params:
source: 來源篩選
category: 分類篩選
days: 天數範圍 (預設 7)
limit: 筆數限制 (預設 30)
"""
source = request.args.get('source')
category = request.args.get('category')
days = int(request.args.get('days', 7))
limit = min(int(request.args.get('limit', 30)), 100)
date_from = date.today() - timedelta(days=days)
session = get_session()
try:
query = session.query(
TrendKeyword.keyword,
func.sum(TrendKeyword.mention_count).label('total_mentions'),
func.count(TrendKeyword.source.distinct()).label('source_count')
).filter(
TrendKeyword.trend_date >= date_from
).group_by(TrendKeyword.keyword)
if source:
query = query.filter(TrendKeyword.source == source)
if category:
query = query.filter(TrendKeyword.category == category)
keywords = query.order_by(
text('total_mentions DESC')
).limit(limit).all()
return jsonify({
'success': True,
'date_range': {
'from': date_from.isoformat(),
'to': date.today().isoformat()
},
'data': [
{
'keyword': kw.keyword,
'total_mentions': kw.total_mentions,
'source_count': kw.source_count
}
for kw in keywords
]
})
finally:
session.close()
@trend_bp.route('/api/trends/analysis', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_analysis():
"""
取得 AI 趨勢分析報告
Query params:
date: 分析日期 (預設今日)
category: 分類篩選
type: 分析類型 (daily_summary, weekly_trend, hot_topic)
"""
analysis_date = request.args.get('date', date.today().isoformat())
category = request.args.get('category')
analysis_type = request.args.get('type', 'daily_summary')
session = get_session()
try:
query = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == analysis_date,
TrendAnalysis.analysis_type == analysis_type
)
if category:
query = query.filter(TrendAnalysis.category == category)
else:
query = query.filter(TrendAnalysis.category.is_(None))
analysis = query.first()
if not analysis:
return jsonify({
'success': False,
'message': '找不到分析報告'
}), 404
return jsonify({
'success': True,
'data': {
'analysis_date': analysis.analysis_date.isoformat(),
'category': analysis.category,
'analysis_type': analysis.analysis_type,
'summary': analysis.summary,
'hot_keywords': json.loads(analysis.hot_keywords or '[]'),
'hot_topics': json.loads(analysis.hot_topics or '[]'),
'consumer_insights': json.loads(analysis.consumer_insights or '[]'),
'marketing_suggestions': json.loads(analysis.marketing_suggestions or '[]'),
'copywriting_hints': json.loads(analysis.copywriting_hints or '[]'),
'record_count': analysis.record_count,
'model_used': analysis.model_used,
'created_at': analysis.created_at.isoformat()
}
})
finally:
session.close()
@trend_bp.route('/api/trends/crawl', methods=['POST'])
@login_required
@permission_required('system.crawler.manage')
def trigger_trend_crawl():
"""手動觸發趨勢爬取"""
data = request.get_json() or {}
sources = data.get('sources') # 可選,指定來源
categories = data.get('categories') # 可選,指定分類
analyze = data.get('analyze', True) # 是否進行 AI 分析
try:
service = TrendCrawlerService()
result = service.crawl_and_save_all(
sources=sources,
categories=categories,
analyze=analyze
)
return jsonify({
'success': True,
'message': f"爬取完成: 新增 {result['new_records']} 筆記錄",
'result': result
})
except Exception as e:
return jsonify({
'success': False,
'message': f"爬取失敗: {str(e)}"
}), 500
@trend_bp.route('/api/trends/stats', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_trend_stats():
"""取得趨勢資料統計"""
days = int(request.args.get('days', 7))
date_from = date.today() - timedelta(days=days)
session = get_session()
try:
# 各來源統計
source_stats = session.query(
TrendRecord.source,
func.count(TrendRecord.id).label('count'),
func.avg(TrendRecord.popularity_score).label('avg_popularity')
).filter(
TrendRecord.trend_date >= date_from
).group_by(TrendRecord.source).all()
# 各分類統計
category_stats = session.query(
TrendRecord.category,
func.count(TrendRecord.id).label('count')
).filter(
TrendRecord.trend_date >= date_from,
TrendRecord.category.isnot(None)
).group_by(TrendRecord.category).all()
# 每日趨勢量
daily_counts = session.query(
TrendRecord.trend_date,
func.count(TrendRecord.id).label('count')
).filter(
TrendRecord.trend_date >= date_from
).group_by(TrendRecord.trend_date).order_by(TrendRecord.trend_date).all()
return jsonify({
'success': True,
'date_range': {
'from': date_from.isoformat(),
'to': date.today().isoformat()
},
'source_stats': [
{
'source': s.source,
'count': s.count,
'avg_popularity': round(s.avg_popularity or 0, 1)
}
for s in source_stats
],
'category_stats': [
{'category': c.category, 'count': c.count}
for c in category_stats
],
'daily_counts': [
{'date': d.trend_date.isoformat(), 'count': d.count}
for d in daily_counts
]
})
finally:
session.close()
```
---
## 六、排程設定
### 6.1 app.py 新增排程任務
```python
# app.py 中新增
def run_trend_crawler_task():
"""趨勢爬蟲排程任務"""
from services.trend_crawler_service import TrendCrawlerService
logger.info("[TrendCrawler] 開始執行趨勢爬取任務")
start_time = time.time()
try:
service = TrendCrawlerService()
result = service.crawl_and_save_all(analyze=True)
duration = time.time() - start_time
logger.info(f"[TrendCrawler] 完成: 新增 {result['new_records']} 筆, 耗時 {duration:.1f}s")
# 儲存統計
_save_stats('trend_crawler_task', {
'status': 'success',
'new_records': result['new_records'],
'keywords_extracted': result['keywords_extracted'],
'duration': round(duration, 2),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
logger.error(f"[TrendCrawler] 失敗: {e}")
_save_stats('trend_crawler_task', {
'status': 'failed',
'error': str(e),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
def init_scheduler():
"""初始化排程"""
# ... 現有排程 ...
# 趨勢爬蟲: 每 2 小時執行一次
schedule.every(2).hours.do(run_trend_crawler_task)
# 或指定時間執行 (上班時間)
# schedule.every().day.at("09:00").do(run_trend_crawler_task)
# schedule.every().day.at("12:00").do(run_trend_crawler_task)
# schedule.every().day.at("15:00").do(run_trend_crawler_task)
# schedule.every().day.at("18:00").do(run_trend_crawler_task)
```
---
## 七、前端介面設計
### 7.1 在 ai_recommend.html 新增趨勢區塊
```html
<!-- 趨勢資料區塊 -->
<div class="card mb-4">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="mb-0"><i class="fas fa-chart-line me-2"></i>即時趨勢洞察</h5>
<div>
<button class="btn btn-sm btn-outline-primary" onclick="refreshTrends()">
<i class="fas fa-sync-alt"></i> 更新
</button>
<button class="btn btn-sm btn-primary ms-2" onclick="triggerTrendCrawl()">
<i class="fas fa-spider"></i> 爬取最新
</button>
</div>
</div>
<div class="card-body">
<!-- 快速篩選 -->
<div class="row mb-3">
<div class="col-md-3">
<select id="trendSource" class="form-select" onchange="refreshTrends()">
<option value="">全部來源</option>
<option value="google_news">Google 新聞</option>
<option value="ptt">PTT</option>
<option value="dcard">Dcard</option>
<option value="youtube">YouTube</option>
</select>
</div>
<div class="col-md-3">
<select id="trendCategory" class="form-select" onchange="refreshTrends()">
<option value="">全部分類</option>
<option value="美妝">美妝</option>
<option value="3C">3C</option>
<option value="服飾">服飾</option>
<option value="居家">居家</option>
<option value="電商">電商</option>
</select>
</div>
<div class="col-md-3">
<select id="trendDays" class="form-select" onchange="refreshTrends()">
<option value="1">今日</option>
<option value="3">近 3 天</option>
<option value="7" selected>近 7 天</option>
<option value="14">近 14 天</option>
</select>
</div>
</div>
<!-- 統計概覽 -->
<div class="row mb-4" id="trendStats">
<!-- 由 JS 動態填充 -->
</div>
<!-- AI 分析摘要 -->
<div class="card bg-light mb-4" id="trendAnalysisCard" style="display:none;">
<div class="card-body">
<h6 class="card-title"><i class="fas fa-robot text-primary me-2"></i>AI 趨勢分析</h6>
<p id="trendAnalysisSummary" class="mb-3"></p>
<div class="row">
<div class="col-md-6">
<strong>熱門關鍵字:</strong>
<div id="trendHotKeywords" class="mt-2"></div>
</div>
<div class="col-md-6">
<strong>文案提示:</strong>
<ul id="trendCopywritingHints" class="mt-2 ps-3"></ul>
</div>
</div>
</div>
</div>
<!-- 熱門關鍵字標籤雲 -->
<div class="mb-4">
<h6><i class="fas fa-tags me-2"></i>熱門關鍵字</h6>
<div id="keywordCloud" class="d-flex flex-wrap gap-2">
<!-- 由 JS 動態填充 -->
</div>
</div>
<!-- 趨勢列表 -->
<div class="table-responsive">
<table class="table table-hover" id="trendTable">
<thead>
<tr>
<th>來源</th>
<th>標題</th>
<th>分類</th>
<th>熱度</th>
<th>時間</th>
<th>操作</th>
</tr>
</thead>
<tbody id="trendTableBody">
<!-- 由 JS 動態填充 -->
</tbody>
</table>
</div>
</div>
</div>
```
### 7.2 JavaScript 函數
```javascript
// 趨勢相關函數
async function refreshTrends() {
const source = document.getElementById('trendSource').value;
const category = document.getElementById('trendCategory').value;
const days = document.getElementById('trendDays').value;
try {
// 並行載入資料
const [recordsRes, keywordsRes, statsRes, analysisRes] = await Promise.all([
fetch(`/api/trends/records?source=${source}&category=${category}&days=${days}&limit=20`),
fetch(`/api/trends/keywords?source=${source}&category=${category}&days=${days}&limit=30`),
fetch(`/api/trends/stats?days=${days}`),
fetch(`/api/trends/analysis`)
]);
const records = await recordsRes.json();
const keywords = await keywordsRes.json();
const stats = await statsRes.json();
const analysis = await analysisRes.json();
// 更新統計
if (stats.success) {
renderTrendStats(stats);
}
// 更新關鍵字雲
if (keywords.success) {
renderKeywordCloud(keywords.data);
}
// 更新趨勢列表
if (records.success) {
renderTrendTable(records.data);
}
// 更新 AI 分析
if (analysis.success) {
renderTrendAnalysis(analysis.data);
}
} catch (error) {
console.error('載入趨勢資料失敗:', error);
}
}
function renderTrendStats(stats) {
const html = `
<div class="col-md-3">
<div class="border rounded p-3 text-center">
<h4 class="mb-0">${stats.source_stats.reduce((a, b) => a + b.count, 0)}</h4>
<small class="text-muted">總趨勢數</small>
</div>
</div>
${stats.source_stats.slice(0, 3).map(s => `
<div class="col-md-3">
<div class="border rounded p-3 text-center">
<h4 class="mb-0">${s.count}</h4>
<small class="text-muted">${getSourceName(s.source)}</small>
</div>
</div>
`).join('')}
`;
document.getElementById('trendStats').innerHTML = html;
}
function renderKeywordCloud(keywords) {
const html = keywords.map(kw => {
const size = Math.min(Math.max(kw.total_mentions * 2, 12), 24);
return `
<span class="badge bg-primary"
style="font-size: ${size}px; cursor: pointer;"
onclick="addKeywordToCopy('${kw.keyword}')">
${kw.keyword} (${kw.total_mentions})
</span>
`;
}).join('');
document.getElementById('keywordCloud').innerHTML = html;
}
function renderTrendTable(records) {
const tbody = document.getElementById('trendTableBody');
tbody.innerHTML = records.map(r => `
<tr>
<td><span class="badge ${getSourceBadgeClass(r.source)}">${getSourceName(r.source)}</span></td>
<td>
<a href="${r.source_url}" target="_blank" class="text-decoration-none">
${r.title.substring(0, 50)}${r.title.length > 50 ? '...' : ''}
</a>
</td>
<td>${r.category || '-'}</td>
<td><span class="badge bg-warning text-dark">${r.popularity_score}</span></td>
<td><small>${formatDate(r.published_at)}</small></td>
<td>
<button class="btn btn-sm btn-outline-primary" onclick="useTrendForCopy('${encodeURIComponent(r.title)}')">
<i class="fas fa-copy"></i>
</button>
</td>
</tr>
`).join('');
}
function renderTrendAnalysis(data) {
const card = document.getElementById('trendAnalysisCard');
card.style.display = 'block';
document.getElementById('trendAnalysisSummary').textContent = data.summary;
// 熱門關鍵字
const keywordsHtml = (data.hot_keywords || []).map(kw =>
`<span class="badge bg-info me-1 mb-1" style="cursor:pointer" onclick="addKeywordToCopy('${kw}')">${kw}</span>`
).join('');
document.getElementById('trendHotKeywords').innerHTML = keywordsHtml;
// 文案提示
const hintsHtml = (data.copywriting_hints || []).map(hint =>
`<li>${hint}</li>`
).join('');
document.getElementById('trendCopywritingHints').innerHTML = hintsHtml;
}
function getSourceName(source) {
const names = {
'google_news': 'Google 新聞',
'ptt': 'PTT',
'dcard': 'Dcard',
'youtube': 'YouTube'
};
return names[source] || source;
}
function getSourceBadgeClass(source) {
const classes = {
'google_news': 'bg-danger',
'ptt': 'bg-primary',
'dcard': 'bg-success',
'youtube': 'bg-warning text-dark'
};
return classes[source] || 'bg-secondary';
}
function addKeywordToCopy(keyword) {
// 將關鍵字加入文案生成輸入框
const input = document.getElementById('aiKeywordInput');
if (input) {
const current = input.value.trim();
input.value = current ? `${current}, ${keyword}` : keyword;
}
}
function useTrendForCopy(encodedTitle) {
const title = decodeURIComponent(encodedTitle);
const input = document.getElementById('aiProductInput');
if (input) {
input.value = title;
}
}
async function triggerTrendCrawl() {
if (!confirm('確定要立即爬取最新趨勢資料?')) return;
try {
const response = await fetch('/api/trends/crawl', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': getCSRFToken()
},
body: JSON.stringify({ analyze: true })
});
const result = await response.json();
if (result.success) {
alert(`爬取完成!新增 ${result.result.new_records} 筆記錄`);
refreshTrends();
} else {
alert('爬取失敗: ' + result.message);
}
} catch (error) {
alert('爬取失敗: ' + error.message);
}
}
// 頁面載入時初始化
document.addEventListener('DOMContentLoaded', function() {
refreshTrends();
});
```
---
## 八、新增檔案清單
| 檔案 | 類型 | 說明 |
|------|------|------|
| `database/trend_models.py` | 新增 | 趨勢資料表模型 |
| `services/trend_crawler_service.py` | 新增 | 趨勢爬蟲服務 (含 DB 儲存) |
| `routes/trend_routes.py` | 新增 | 趨勢 API 端點 |
---
## 九、修改檔案清單
| 檔案 | 修改內容 |
|------|---------|
| `database/manager.py` | 新增 trend 表初始化 |
| `app.py` | 註冊 trend_bp、新增排程任務 |
| `templates/ai_recommend.html` | 新增趨勢資料區塊 |
| `config.py` | 確認 Ollama 伺服器設定 (192.168.0.188) |
---
## 十、權限設定
`services/permission_service.py` 新增:
```python
# 趨勢相關權限
'report.trends.view': {
'name': '查看趨勢資料',
'category': '報表',
'description': '訪問趨勢資料頁面和 API'
},
'report.trends.crawl': {
'name': '觸發趨勢爬取',
'category': '系統',
'description': '手動觸發趨勢爬蟲'
},
```
---
## 十一、實作步驟
### Phase 1: 資料庫模型
1. 建立 `database/trend_models.py`
2. 更新 `database/manager.py` 初始化表結構
### Phase 2: 服務層
1. 建立 `services/trend_crawler_service.py`
2. 整合 Ollama 分析功能
### Phase 3: API 端點
1. 建立 `routes/trend_routes.py`
2.`app.py` 註冊 Blueprint
### Phase 4: 排程任務
1.`app.py` 新增 `run_trend_crawler_task`
2. 設定排程 (每 2 小時)
### Phase 5: 前端介面
1. 更新 `ai_recommend.html` 新增趨勢區塊
2. 新增 JavaScript 函數
### Phase 6: 測試
1. 手動測試 API 端點
2. 驗證排程任務
3. 確認前端顯示正常
---
## 十二、Ollama 伺服器整合
### 伺服器資訊
- **IP**: 192.168.0.188
- **Port**: 11434
- **Model**: gemma3:4b
- **API Key**: `<OLLAMA_API_KEY>`
### 網路設定確認
```bash
# 測試連線
curl http://192.168.0.188:11434/api/tags
# 測試生成
curl http://192.168.0.188:11434/api/generate \
-d '{"model": "gemma3:4b", "prompt": "Hello", "stream": false}'
```
### config.py 設定
```python
# Ollama 伺服器設定
OLLAMA_BASE_URL = os.getenv('OLLAMA_BASE_URL', 'http://192.168.0.188:11434')
OLLAMA_MODEL = os.getenv('OLLAMA_MODEL', 'gemma3:4b')
OLLAMA_API_KEY = os.getenv('OLLAMA_API_KEY', '<OLLAMA_API_KEY>')
OLLAMA_TIMEOUT = int(os.getenv('OLLAMA_TIMEOUT', '120'))
```
---
## 十三、Ollama Web Search 整合
### 13.1 Web Search 快取資料表
新增 `web_search_cache` 表以減少重複查詢:
```python
class WebSearchCache(Base):
"""Web Search 結果快取 - 避免重複查詢"""
__tablename__ = 'web_search_cache'
id = Column(Integer, primary_key=True)
# 查詢識別
query_hash = Column(String(64), nullable=False, unique=True, index=True)
# MD5(query + search_type)
query = Column(String(500), nullable=False)
# 原始查詢字串
search_type = Column(String(50), default='general')
# 搜尋類型: general, news, shopping, trends
# 結果
result_json = Column(Text, nullable=False)
# JSON 格式的完整結果
summary = Column(Text)
# AI 生成的摘要
result_count = Column(Integer, default=0)
# 結果數量
# 元資料
model_used = Column(String(50))
generation_time = Column(Float)
# 時間
created_at = Column(DateTime, default=datetime.now, index=True)
expires_at = Column(DateTime)
# 快取過期時間 (預設 24 小時)
__table_args__ = (
Index('idx_cache_query_type', 'query', 'search_type'),
Index('idx_cache_expires', 'expires_at'),
)
```
### 13.2 Web Search 服務擴展
```python
# services/ollama_service.py 擴展
class OllamaService:
# ... 現有程式碼 ...
def web_search_with_cache(self, query: str, search_type: str = 'general',
cache_hours: int = 24) -> OllamaResponse:
"""
帶快取的 Web Search
Args:
query: 搜尋查詢
search_type: 搜尋類型
cache_hours: 快取時間 (小時)
Returns:
OllamaResponse
"""
from database.trend_models import WebSearchCache
from database.manager import get_session
# 計算查詢雜湊
query_hash = hashlib.md5(f"{query}:{search_type}".encode()).hexdigest()
session = get_session()
try:
# 檢查快取
cache = session.query(WebSearchCache).filter(
WebSearchCache.query_hash == query_hash,
WebSearchCache.expires_at > datetime.now()
).first()
if cache:
# 命中快取
return OllamaResponse(
success=True,
content=cache.result_json,
model=cache.model_used,
duration=0.0 # 快取無延遲
)
# 執行搜尋
response = self.web_search(query, search_type=search_type)
if response.success:
# 儲存快取
new_cache = WebSearchCache(
query_hash=query_hash,
query=query,
search_type=search_type,
result_json=response.content,
summary=self._extract_summary(response.content),
model_used=response.model,
generation_time=response.duration,
expires_at=datetime.now() + timedelta(hours=cache_hours)
)
session.add(new_cache)
session.commit()
return response
finally:
session.close()
def search_trends_for_category(self, category: str) -> dict:
"""
搜尋特定分類的趨勢資訊
用於趨勢爬蟲的補充資料來源
"""
queries = {
'美妝': ['2026美妝趨勢', 'PTT美妝推薦', '韓系彩妝新品'],
'3C': ['2026 3C新品', 'PTT 3C開箱', '科技產品評測'],
'服飾': ['2026服飾流行', '穿搭趨勢', 'Dcard穿搭版'],
'居家': ['居家好物推薦', '收納神器', '家電開箱'],
'電商': ['電商優惠活動', '購物節', '限時特賣'],
}
category_queries = queries.get(category, [f'{category}趨勢'])
results = []
for q in category_queries:
response = self.web_search_with_cache(q, search_type='trends')
if response.success:
try:
data = json.loads(response.content)
results.append({
'query': q,
'results': data.get('results', []),
'keywords': data.get('keywords', [])
})
except:
pass
return {
'category': category,
'search_results': results,
'timestamp': datetime.now().isoformat()
}
```
### 13.3 趨勢 API 新增 Web Search 端點
```python
# routes/trend_routes.py 新增
@trend_bp.route('/api/trends/web_search', methods=['POST'])
@login_required
@permission_required('report.trends.view')
def trend_web_search():
"""
使用 Ollama Web Search 搜尋趨勢
Request body:
query: 搜尋查詢
search_type: 搜尋類型 (general, news, shopping, trends)
use_cache: 是否使用快取 (預設 true)
"""
data = request.get_json() or {}
query = data.get('query', '').strip()
search_type = data.get('search_type', 'trends')
use_cache = data.get('use_cache', True)
if not query:
return jsonify({'success': False, 'message': '請輸入搜尋關鍵字'}), 400
try:
ollama = OllamaService()
if use_cache:
response = ollama.web_search_with_cache(query, search_type)
else:
response = ollama.web_search(query, search_type=search_type)
if response.success:
try:
parsed = json.loads(response.content)
except:
parsed = {'raw': response.content}
return jsonify({
'success': True,
'data': {
'query': query,
'search_type': search_type,
'results': parsed,
'model': response.model,
'duration': response.duration
}
})
else:
return jsonify({
'success': False,
'message': response.error or '搜尋失敗'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'搜尋失敗: {str(e)}'
}), 500
@trend_bp.route('/api/trends/category_insights', methods=['GET'])
@login_required
@permission_required('report.trends.view')
def get_category_insights():
"""
取得分類趨勢洞察 (結合 Web Search)
Query params:
category: 商品分類
"""
category = request.args.get('category', '美妝')
try:
ollama = OllamaService()
insights = ollama.search_trends_for_category(category)
return jsonify({
'success': True,
'data': insights
})
except Exception as e:
return jsonify({
'success': False,
'message': str(e)
}), 500
```
---
## 十四、Telegram Bot 整合
### 14.1 設計目標
將趨勢資料庫系統整合至 Telegram讓團隊成員可以透過手機隨時
- 查詢熱門趨勢關鍵字
- 執行 AI 網路搜尋
- 生成行銷文案
- 接收趨勢推播通知
### 14.2 Bot 資訊設定
```python
# config.py 新增
TELEGRAM_BOT_CONFIG = {
'token': os.getenv('TELEGRAM_BOT_TOKEN'), # 從 BotFather 取得
'webhook_url': os.getenv('TELEGRAM_WEBHOOK_URL'), # 公開可存取的 URL
'allowed_users': os.getenv('TELEGRAM_ALLOWED_USERS', '').split(','), # 允許的用戶 ID
'admin_chat_id': os.getenv('TELEGRAM_ADMIN_CHAT_ID'), # 管理員聊天室 ID
}
```
### 14.3 新增資料表: telegram_users
```python
class TelegramUser(Base):
"""Telegram 用戶綁定表"""
__tablename__ = 'telegram_users'
id = Column(Integer, primary_key=True)
telegram_id = Column(BigInteger, unique=True, nullable=False, index=True)
# Telegram 用戶 ID
telegram_username = Column(String(100))
# Telegram 用戶名稱
user_id = Column(Integer, ForeignKey('users.id'))
# 綁定的系統用戶 ID (可選)
display_name = Column(String(100))
# 顯示名稱
is_active = Column(Boolean, default=True)
# 是否啟用
is_admin = Column(Boolean, default=False)
# 是否為管理員
# 偏好設定
notify_trends = Column(Boolean, default=True)
# 是否接收趨勢通知
notify_daily_summary = Column(Boolean, default=True)
# 是否接收每日摘要
preferred_categories = Column(Text)
# JSON: 偏好的分類列表
created_at = Column(DateTime, default=datetime.now)
last_active_at = Column(DateTime, default=datetime.now)
# 關聯
user = relationship("User", backref="telegram_binding")
```
### 14.4 Telegram Bot 服務
```python
# services/telegram_bot_service.py
import telegram
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters
import asyncio
from config import TELEGRAM_BOT_CONFIG
from services.ollama_service import OllamaService
from services.logger_manager import SystemLogger
class TrendTelegramBot:
"""趨勢資料庫 Telegram Bot 服務"""
def __init__(self):
self.token = TELEGRAM_BOT_CONFIG['token']
self.logger = SystemLogger("TelegramBot")
self.ollama = OllamaService()
self.application = None
async def start(self):
"""啟動 Bot"""
self.application = Application.builder().token(self.token).build()
# 註冊指令處理器
self.application.add_handler(CommandHandler("start", self.cmd_start))
self.application.add_handler(CommandHandler("help", self.cmd_help))
self.application.add_handler(CommandHandler("trend", self.cmd_trend))
self.application.add_handler(CommandHandler("search", self.cmd_search))
self.application.add_handler(CommandHandler("copy", self.cmd_copy))
self.application.add_handler(CommandHandler("keywords", self.cmd_keywords))
self.application.add_handler(CommandHandler("daily", self.cmd_daily))
self.application.add_handler(CommandHandler("settings", self.cmd_settings))
# 回調查詢處理器 (按鈕點擊)
self.application.add_handler(CallbackQueryHandler(self.handle_callback))
# 一般訊息處理器
self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
# 啟動
await self.application.initialize()
await self.application.start()
await self.application.updater.start_polling()
self.logger.info("Telegram Bot 已啟動")
async def stop(self):
"""停止 Bot"""
if self.application:
await self.application.updater.stop()
await self.application.stop()
await self.application.shutdown()
# ========== 指令處理器 ==========
async def cmd_start(self, update: Update, context):
"""開始指令"""
user = update.effective_user
await update.message.reply_text(
f"👋 嗨 {user.first_name}\n\n"
f"我是 MOMO 趨勢助手 Bot可以幫你\n"
f"📊 /trend [分類] - 查看熱門趨勢\n"
f"🔍 /search [關鍵字] - AI 網路搜尋\n"
f"✍️ /copy [商品名] - AI 生成文案\n"
f"🏷️ /keywords - 查看熱門關鍵字\n"
f"📰 /daily - 每日趨勢摘要\n"
f"⚙️ /settings - 通知設定\n\n"
f"輸入 /help 查看詳細說明"
)
async def cmd_help(self, update: Update, context):
"""說明指令"""
help_text = """
📖 *指令說明*
*趨勢查詢*
/trend - 查看所有分類熱門趨勢
/trend 美妝 - 查看美妝分類趨勢
/trend 3C - 查看 3C 分類趨勢
*AI 搜尋*
/search 夏季防曬推薦 - AI 搜尋並分析
/search 母親節禮物 - 搜尋熱門話題
*文案生成*
/copy 防曬乳 - 為商品生成行銷文案
/copy 氣炸鍋 活動檔期 - 加入情境參數
*關鍵字*
/keywords - 近 7 天熱門關鍵字
/keywords 美妝 - 指定分類關鍵字
*每日摘要*
/daily - 查看今日趨勢摘要
*設定*
/settings - 管理通知偏好
"""
await update.message.reply_text(help_text, parse_mode='Markdown')
async def cmd_trend(self, update: Update, context):
"""趨勢查詢指令"""
category = ' '.join(context.args) if context.args else None
await update.message.reply_text("🔄 正在查詢趨勢資料...")
try:
from database.trend_models import TrendRecord, TrendKeyword
from database.manager import get_session
from datetime import date, timedelta
session = get_session()
date_from = date.today() - timedelta(days=7)
# 查詢熱門趨勢
query = session.query(TrendRecord).filter(
TrendRecord.trend_date >= date_from
)
if category:
query = query.filter(TrendRecord.category == category)
records = query.order_by(
TrendRecord.popularity_score.desc()
).limit(10).all()
session.close()
if not records:
await update.message.reply_text(
f"📭 {f'{category}分類' if category else ''}近 7 天沒有趨勢資料"
)
return
# 格式化回覆
title = f"📊 {category if category else '所有分類'}熱門趨勢 (近7天)\n\n"
lines = []
for i, r in enumerate(records, 1):
source_emoji = {'ptt': '💬', 'dcard': '📱', 'google_news': '📰', 'youtube': '🎬'}.get(r.source, '📄')
lines.append(f"{i}. {source_emoji} {r.title[:30]}... (熱度:{r.popularity_score})")
await update.message.reply_text(title + '\n'.join(lines))
except Exception as e:
await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")
async def cmd_search(self, update: Update, context):
"""AI 搜尋指令"""
query = ' '.join(context.args)
if not query:
await update.message.reply_text("❓ 請輸入搜尋關鍵字\n範例: /search 夏季防曬推薦")
return
await update.message.reply_text(f"🔍 正在搜尋「{query}」...")
try:
response = self.ollama.web_search_with_cache(query, search_type='trends')
if response.success:
try:
data = json.loads(response.content)
summary = data.get('summary', '無摘要')
results = data.get('results', [])[:5]
reply = f"🔍 *搜尋結果: {query}*\n\n"
reply += f"📝 *摘要:*\n{summary}\n\n"
if results:
reply += "*相關資訊:*\n"
for i, r in enumerate(results, 1):
title = r.get('title', '無標題')[:40]
reply += f"{i}. {title}\n"
await update.message.reply_text(reply, parse_mode='Markdown')
except:
await update.message.reply_text(f"🔍 搜尋結果:\n{response.content[:1000]}")
else:
await update.message.reply_text(f"❌ 搜尋失敗: {response.error}")
except Exception as e:
await update.message.reply_text(f"❌ 搜尋失敗: {str(e)}")
async def cmd_copy(self, update: Update, context):
"""文案生成指令"""
if not context.args:
await update.message.reply_text("❓ 請輸入商品名稱\n範例: /copy 防曬乳")
return
product_name = context.args[0]
context_hint = ' '.join(context.args[1:]) if len(context.args) > 1 else None
await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")
try:
prompt = f"""
請為以下商品生成 3 種風格的行銷文案:
商品: {product_name}
{f'情境: {context_hint}' if context_hint else ''}
請分別生成:
1. 標準版 (100字內)
2. 活潑版 (含表情符號100字內)
3. 限時版 (強調緊迫感100字內)
以繁體中文回覆。
"""
response = self.ollama.generate(prompt)
if response.success:
await update.message.reply_text(
f"✨ *{product_name} 行銷文案*\n\n{response.content}",
parse_mode='Markdown'
)
else:
await update.message.reply_text(f"❌ 生成失敗: {response.error}")
except Exception as e:
await update.message.reply_text(f"❌ 生成失敗: {str(e)}")
async def cmd_keywords(self, update: Update, context):
"""熱門關鍵字指令"""
category = ' '.join(context.args) if context.args else None
try:
from database.trend_models import TrendKeyword
from database.manager import get_session
from sqlalchemy import func
from datetime import date, timedelta
session = get_session()
date_from = date.today() - timedelta(days=7)
query = session.query(
TrendKeyword.keyword,
func.sum(TrendKeyword.mention_count).label('total')
).filter(
TrendKeyword.trend_date >= date_from
).group_by(TrendKeyword.keyword)
if category:
query = query.filter(TrendKeyword.category == category)
keywords = query.order_by(func.sum(TrendKeyword.mention_count).desc()).limit(20).all()
session.close()
if not keywords:
await update.message.reply_text("📭 近 7 天沒有熱門關鍵字資料")
return
title = f"🏷️ {category if category else '所有分類'}熱門關鍵字 (近7天)\n\n"
lines = [f"{i}. {kw.keyword} ({kw.total}次)" for i, kw in enumerate(keywords, 1)]
await update.message.reply_text(title + '\n'.join(lines))
except Exception as e:
await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")
async def cmd_daily(self, update: Update, context):
"""每日趨勢摘要"""
try:
from database.trend_models import TrendAnalysis
from database.manager import get_session
from datetime import date
session = get_session()
analysis = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == date.today(),
TrendAnalysis.analysis_type == 'daily_summary'
).first()
session.close()
if not analysis:
await update.message.reply_text("📭 今日尚無趨勢分析報告")
return
hot_keywords = json.loads(analysis.hot_keywords or '[]')
marketing = json.loads(analysis.marketing_suggestions or '[]')
reply = f"📰 *今日趨勢摘要*\n\n"
reply += f"📝 *概況:*\n{analysis.summary}\n\n"
if hot_keywords:
reply += f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:10])}\n\n"
if marketing:
reply += f"💡 *行銷建議:*\n"
for i, m in enumerate(marketing[:5], 1):
reply += f"{i}. {m}\n"
await update.message.reply_text(reply, parse_mode='Markdown')
except Exception as e:
await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")
async def cmd_settings(self, update: Update, context):
"""設定指令"""
keyboard = [
[InlineKeyboardButton("📊 趨勢通知 ON/OFF", callback_data="toggle_trends")],
[InlineKeyboardButton("📰 每日摘要 ON/OFF", callback_data="toggle_daily")],
[InlineKeyboardButton("🏷️ 設定偏好分類", callback_data="set_categories")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⚙️ *通知設定*\n\n點擊下方按鈕調整設定",
reply_markup=reply_markup,
parse_mode='Markdown'
)
async def handle_callback(self, update: Update, context):
"""處理按鈕回調"""
query = update.callback_query
await query.answer()
if query.data == "toggle_trends":
await query.edit_message_text("✅ 趨勢通知已切換")
elif query.data == "toggle_daily":
await query.edit_message_text("✅ 每日摘要通知已切換")
elif query.data == "set_categories":
keyboard = [
[InlineKeyboardButton("美妝", callback_data="cat_美妝"),
InlineKeyboardButton("3C", callback_data="cat_3C")],
[InlineKeyboardButton("服飾", callback_data="cat_服飾"),
InlineKeyboardButton("居家", callback_data="cat_居家")],
[InlineKeyboardButton("電商", callback_data="cat_電商"),
InlineKeyboardButton("全部", callback_data="cat_all")],
]
await query.edit_message_text(
"🏷️ 選擇偏好分類 (可多選)",
reply_markup=InlineKeyboardMarkup(keyboard)
)
async def handle_message(self, update: Update, context):
"""處理一般訊息"""
text = update.message.text
# 簡單的自然語言處理
if '趨勢' in text or '熱門' in text:
await update.message.reply_text("💡 輸入 /trend 查看熱門趨勢")
elif '搜尋' in text or '查詢' in text:
await update.message.reply_text("💡 輸入 /search [關鍵字] 進行 AI 搜尋")
elif '文案' in text or '生成' in text:
await update.message.reply_text("💡 輸入 /copy [商品名] 生成文案")
else:
await update.message.reply_text("❓ 不確定您的需求,請輸入 /help 查看可用指令")
# ========== 推播功能 ==========
async def broadcast_trend_alert(self, message: str, category: str = None):
"""推播趨勢警報"""
from database.trend_models import TelegramUser
from database.manager import get_session
session = get_session()
query = session.query(TelegramUser).filter(
TelegramUser.is_active == True,
TelegramUser.notify_trends == True
)
if category:
query = query.filter(
TelegramUser.preferred_categories.like(f'%{category}%')
)
users = query.all()
session.close()
for user in users:
try:
await self.application.bot.send_message(
chat_id=user.telegram_id,
text=message,
parse_mode='Markdown'
)
except Exception as e:
self.logger.error(f"推播失敗 {user.telegram_id}: {e}")
async def send_daily_summary(self):
"""發送每日摘要"""
from database.trend_models import TrendAnalysis, TelegramUser
from database.manager import get_session
from datetime import date
session = get_session()
# 取得今日分析
analysis = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == date.today(),
TrendAnalysis.analysis_type == 'daily_summary'
).first()
if not analysis:
session.close()
return
# 取得要通知的用戶
users = session.query(TelegramUser).filter(
TelegramUser.is_active == True,
TelegramUser.notify_daily_summary == True
).all()
session.close()
# 格式化訊息
hot_keywords = json.loads(analysis.hot_keywords or '[]')
message = f"📰 *{date.today()} 趨勢日報*\n\n"
message += f"{analysis.summary}\n\n"
message += f"🏷️ *熱門關鍵字:* {', '.join(hot_keywords[:5])}"
# 推播
for user in users:
try:
await self.application.bot.send_message(
chat_id=user.telegram_id,
text=message,
parse_mode='Markdown'
)
except Exception as e:
self.logger.error(f"每日摘要推播失敗 {user.telegram_id}: {e}")
# 全域實例
_bot_instance = None
def get_telegram_bot() -> TrendTelegramBot:
global _bot_instance
if _bot_instance is None:
_bot_instance = TrendTelegramBot()
return _bot_instance
```
### 14.5 整合至主應用程式
```python
# app.py 新增
import asyncio
from services.telegram_bot_service import get_telegram_bot
# 在應用程式啟動時初始化 Bot
def init_telegram_bot():
"""初始化 Telegram Bot"""
if not TELEGRAM_BOT_CONFIG.get('token'):
logger.warning("Telegram Bot Token 未設定,跳過初始化")
return
bot = get_telegram_bot()
# 在背景執行 Bot
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def run_bot():
loop.run_until_complete(bot.start())
loop.run_forever()
bot_thread = threading.Thread(target=run_bot, daemon=True)
bot_thread.start()
logger.info("Telegram Bot 已在背景啟動")
# 在排程中加入每日摘要推播
def run_daily_summary_broadcast():
"""每日摘要推播任務"""
bot = get_telegram_bot()
if bot.application:
asyncio.run(bot.send_daily_summary())
def init_scheduler():
# ... 現有排程 ...
# 每日早上 9 點推播摘要
schedule.every().day.at("09:00").do(run_daily_summary_broadcast)
```
### 14.6 環境變數設定
```bash
# .env 新增
TELEGRAM_BOT_TOKEN=your_bot_token_from_botfather
TELEGRAM_WEBHOOK_URL=https://your-domain.com/webhook/telegram
TELEGRAM_ALLOWED_USERS=123456789,987654321
TELEGRAM_ADMIN_CHAT_ID=123456789
```
### 14.7 建立 Telegram Bot 步驟
1. **在 Telegram 搜尋 @BotFather**
2. **發送 `/newbot` 建立新 Bot**
3. **設定 Bot 名稱和 username**
4. **取得 API Token**
5. **設定環境變數**
6. **重啟應用程式**
---
## 十五、新增檔案清單 (更新版)
| 檔案 | 類型 | 說明 |
|------|------|------|
| `database/trend_models.py` | 新增 | 趨勢資料表模型 (含 WebSearchCache, TelegramUser) |
| `services/trend_crawler_service.py` | 新增 | 趨勢爬蟲服務 (含 DB 儲存) |
| `services/telegram_bot_service.py` | 新增 | Telegram Bot 服務 |
| `routes/trend_routes.py` | 新增 | 趨勢 API 端點 (含 Web Search API) |
---
## 十六、修改檔案清單 (更新版)
| 檔案 | 修改內容 |
|------|---------|
| `database/manager.py` | 新增 trend 表、web_search_cache 表、telegram_users 表初始化 |
| `services/ollama_service.py` | 新增 `web_search_with_cache``search_trends_for_category` 方法 |
| `app.py` | 註冊 trend_bp、新增排程任務、初始化 Telegram Bot |
| `templates/ai_recommend.html` | 新增趨勢資料區塊、Web Search UI |
| `config.py` | 新增 Telegram Bot 設定 |
| `.env.example` | 新增 Telegram 相關環境變數 |
---
## 十七、權限設定 (更新版)
```python
# 趨勢相關權限
'report.trends.view': {
'name': '查看趨勢資料',
'category': '報表',
'description': '訪問趨勢資料頁面和 API'
},
'report.trends.crawl': {
'name': '觸發趨勢爬取',
'category': '系統',
'description': '手動觸發趨勢爬蟲'
},
'report.trends.web_search': {
'name': '使用 AI 搜尋',
'category': '報表',
'description': '使用 Ollama Web Search 功能'
},
'system.telegram.manage': {
'name': '管理 Telegram Bot',
'category': '系統',
'description': '管理 Telegram Bot 設定和用戶'
},
```
---
## 十八、實作步驟 (更新版)
### Phase 1: 資料庫模型
1. 建立 `database/trend_models.py` (含 WebSearchCache, TelegramUser)
2. 更新 `database/manager.py` 初始化表結構
### Phase 2: 服務層
1. 建立 `services/trend_crawler_service.py`
2. 更新 `services/ollama_service.py` 新增 Web Search 快取方法
3. 建立 `services/telegram_bot_service.py`
### Phase 3: API 端點
1. 建立 `routes/trend_routes.py` (含 Web Search API)
2.`app.py` 註冊 Blueprint
### Phase 4: 排程任務
1.`app.py` 新增 `run_trend_crawler_task`
2. 新增 `run_daily_summary_broadcast`
3. 設定排程
### Phase 5: Telegram Bot
1. 向 BotFather 註冊 Bot
2. 設定環境變數
3. 初始化 Bot 服務
### Phase 6: 前端介面
1. 更新 `ai_recommend.html` 新增趨勢區塊
2. 新增 Web Search UI 組件
3. 新增 JavaScript 函數
### Phase 7: 測試
1. 手動測試 API 端點
2. 驗證 Web Search 快取
3. 測試 Telegram Bot 指令
4. 確認排程任務和推播
---
*文件結束*