Files
ewoooc/services/trend_crawler_service.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

628 lines
23 KiB
Python

"""
趨勢爬蟲服務
負責:
- 爬取各來源趨勢資料 (Google News, PTT, Dcard, YouTube)
- 儲存到資料庫
- 萃取關鍵字
- 呼叫 Ollama 進行 AI 分析
"""
import hashlib
import json
import logging
from collections import defaultdict
from datetime import datetime, date, timedelta
from typing import List, Dict, Optional, Tuple
from database.trend_models import (
TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache,
get_category_for_board, PTT_BOARDS, DCARD_BOARDS
)
from database.manager import get_session
from services.ollama_service import OllamaService
from services.trend_crawler import TrendCrawler, TrendData, NewsItem, SocialPost, YouTubeVideo
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TrendCrawlerService:
    """Trend crawler service that also persists what it crawls to the database."""

    def __init__(self):
        """Create the crawler and Ollama helpers and attach the module logger."""
        self.crawler = TrendCrawler()
        self.ollama = OllamaService()
        self.logger = logger
def crawl_and_save_all(self,
                       sources: Optional[List[str]] = None,
                       categories: Optional[List[str]] = None,
                       analyze: bool = True) -> dict:
    """
    Crawl every trend source and persist the results to the database.

    Args:
        sources: Sources to store, any of 'google_news', 'ptt', 'dcard',
            'youtube'. ``None`` or an empty list stores every source.
        categories: Categories forwarded to the crawler.
        analyze: Whether to run the AI analysis after the commit (it only
            runs when at least one new record was stored).

    Returns:
        dict: Statistics with the keys 'total_records', 'new_records',
        'keywords_extracted', 'sources' (per-step results) and 'errors'
        (messages of the steps that failed without aborting the run).

    Raises:
        Exception: Anything raised outside the per-step handlers (for
            example by the crawl itself or by the commit) is re-raised
            after the session has been rolled back.
    """
    results = {
        'total_records': 0,
        'new_records': 0,
        'keywords_extracted': 0,
        'sources': {},
        'errors': []
    }
    session = get_session()

    def wanted(*names):
        # No explicit selection means "everything".
        return not sources or any(name in sources for name in names)

    def requested_social_posts():
        posts = trend_data.social_posts
        if not sources:
            return posts
        # Drop posts of a known social platform that was not requested, so
        # that sources=['ptt'] no longer stores Dcard posts (and vice versa).
        # Posts from any other platform are kept, as before.
        # NOTE(review): assumes post.platform lower-cases to 'ptt' / 'dcard'.
        skipped = {name for name in ('ptt', 'dcard') if name not in sources}
        return [
            post for post in posts
            if (post.platform or '').lower() not in skipped
        ]

    def run_save_step(result_key, failure_message, saver, load_items):
        # One failing source must not abort the others: log, remember the
        # error and carry on.
        try:
            outcome = saver(session, load_items())
            results['sources'][result_key] = outcome
            results['total_records'] += outcome['total']
            results['new_records'] += outcome['new']
        except Exception as e:
            self.logger.error(f"{failure_message}: {e}")
            results['errors'].append(f"{result_key}: {str(e)}")

    try:
        # 1. Crawl all sources.
        self.logger.info("開始爬取趨勢資料...")
        trend_data = self.crawler.get_all_trends(
            categories=categories,
            include_social=True
        )

        # 2. News records.
        if wanted('google_news'):
            run_save_step('google_news', "儲存新聞資料失敗",
                          self._save_news_records,
                          lambda: trend_data.news_items)

        # 3. Social records (PTT / Dcard).
        if wanted('ptt', 'dcard'):
            run_save_step('social', "儲存社群資料失敗",
                          self._save_social_records,
                          requested_social_posts)

        # 4. YouTube records.
        if wanted('youtube'):
            run_save_step('youtube', "儲存 YouTube 資料失敗",
                          self._save_youtube_records,
                          lambda: trend_data.youtube_videos)

        # 5. Keyword extraction.
        try:
            keywords_count = self._extract_and_save_keywords(session)
            results['keywords_extracted'] = keywords_count
        except Exception as e:
            self.logger.error(f"萃取關鍵字失敗: {e}")
            results['errors'].append(f"keywords: {str(e)}")

        session.commit()

        # 6. AI analysis, run after the commit.
        if analyze and results['new_records'] > 0:
            try:
                self._generate_ai_analysis(session)
            except Exception as e:
                self.logger.error(f"AI 分析失敗: {e}")
                results['errors'].append(f"ai_analysis: {str(e)}")

        self.logger.info(f"趨勢爬取完成: {results}")
        return results
    except Exception as e:
        session.rollback()
        self.logger.error(f"趨勢爬取失敗: {e}")
        raise
    finally:
        session.close()
def _save_news_records(self, session, news_items: List[NewsItem]) -> dict:
    """
    Add a TrendRecord for every news item that is not stored yet.

    Items without a link are skipped. An item is identified by the MD5
    digest of its link within the 'google_news' source. A failure on one
    item is logged as a warning and does not stop the loop.

    Returns:
        dict: {'total': number of items received, 'new': records added}.
    """
    total = len(news_items)
    inserted = 0
    for item in news_items:
        try:
            if not item.link:
                # Nothing to derive a stable identifier from.
                continue
            link_digest = hashlib.md5(
                item.link.encode(), usedforsecurity=False
            ).hexdigest()

            already_stored = session.query(TrendRecord).filter(
                TrendRecord.source == 'google_news',
                TrendRecord.source_id == link_digest
            ).first()
            if already_stored:
                continue

            if item.published:
                trend_day = item.published.date()
            else:
                trend_day = date.today()

            session.add(TrendRecord(
                source='google_news',
                source_url=item.link,
                source_id=link_digest,
                title=item.title,
                content=item.summary,
                author=item.source,
                category=item.category,
                published_at=item.published,
                trend_date=trend_day,
            ))
            inserted += 1
        except Exception as e:
            self.logger.warning(f"儲存新聞記錄失敗: {e}")
    return {'total': total, 'new': inserted}
def _save_social_records(self, session, social_posts: List[SocialPost]) -> dict:
    """
    Add a TrendRecord for every social post (PTT / Dcard) not stored yet.

    The record source is the lower-cased platform name, or 'unknown' when
    the post has none. The identifier is the post id, falling back to an
    MD5 digest of "source:title:created_at". A failure on one post is
    logged as a warning and does not stop the loop.

    Returns:
        dict: {'total': number of posts received, 'new': records added}.
    """
    total = len(social_posts)
    inserted = 0
    for post in social_posts:
        try:
            platform = 'unknown'
            if post.platform:
                platform = post.platform.lower()

            post_key = post.post_id
            if not post_key:
                fingerprint = f"{platform}:{post.title}:{post.created_at}"
                post_key = hashlib.md5(
                    fingerprint.encode(), usedforsecurity=False
                ).hexdigest()

            duplicate = session.query(TrendRecord).filter(
                TrendRecord.source == platform,
                TrendRecord.source_id == post_key
            ).first()
            if duplicate:
                continue

            if post.board:
                board_category = get_category_for_board(post.board)
            else:
                board_category = '其他'
            if post.created_at:
                trend_day = post.created_at.date()
            else:
                trend_day = date.today()

            session.add(TrendRecord(
                source=platform,
                source_board=post.board,
                source_url=post.url,
                source_id=post_key,
                title=post.title,
                content=post.content,
                author=post.author,
                # First truthy value of push count, like count, else 0.
                popularity_score=post.push_count or post.like_count or 0,
                comment_count=post.comment_count or 0,
                category=board_category,
                published_at=post.created_at,
                trend_date=trend_day,
            ))
            inserted += 1
        except Exception as e:
            self.logger.warning(f"儲存社群記錄失敗: {e}")
    return {'total': total, 'new': inserted}
def _save_youtube_records(self, session, youtube_videos: List[YouTubeVideo]) -> dict:
    """
    Add a TrendRecord for every YouTube video that is not stored yet.

    The identifier is the video id, falling back to the MD5 digest of the
    URL. Videos with neither are skipped. A failure on one video is logged
    as a warning and does not stop the loop.

    Returns:
        dict: {'total': number of videos received, 'new': records added}.
    """
    total = len(youtube_videos)
    new_count = 0
    for video in youtube_videos:
        try:
            # The previous one-liner `a or b if c else None` parsed as
            # `(a or b) if c else None`, which discarded videos that had a
            # video_id but no URL. Resolve the two fallbacks explicitly.
            source_id = video.video_id
            if not source_id and video.url:
                source_id = hashlib.md5(
                    video.url.encode(), usedforsecurity=False
                ).hexdigest()
            if not source_id:
                continue

            exists = session.query(TrendRecord).filter(
                TrendRecord.source == 'youtube',
                TrendRecord.source_id == source_id
            ).first()
            if not exists:
                record = TrendRecord(
                    source='youtube',
                    source_url=video.url,
                    source_id=source_id,
                    title=video.title,
                    content=video.description,
                    author=video.channel,
                    popularity_score=video.view_count or 0,
                    category=video.category,
                    published_at=video.published_at,
                    trend_date=video.published_at.date() if video.published_at else date.today(),
                )
                session.add(record)
                new_count += 1
        except Exception as e:
            self.logger.warning(f"儲存 YouTube 記錄失敗: {e}")
    return {'total': total, 'new': new_count}
def _extract_and_save_keywords(self, session) -> int:
    """
    Extract keywords from today's records and upsert them per source.

    Only keywords mentioned at least twice are stored. For each source a
    keyword appeared in, the existing TrendKeyword row for today is updated
    or a new one is added; every row carries the keyword's total mention
    count across all sources.

    Returns:
        int: Number of newly added TrendKeyword rows (updates of existing
        rows are not counted).
    """
    today = date.today()

    # All records dated today.
    records = session.query(TrendRecord).filter(
        TrendRecord.trend_date == today
    ).all()
    if not records:
        return 0

    # Simple keyword extraction (could later be replaced with jieba).
    keyword_counts = defaultdict(lambda: {'count': 0, 'sources': set(), 'categories': set()})
    for record in records:
        # Guard the title as well as the content: formatting None would put
        # the literal text "None" into the input and produce a bogus keyword.
        text = f"{record.title or ''} {record.content or ''}"
        for kw in self._extract_keywords_simple(text):
            entry = keyword_counts[kw]
            entry['count'] += 1
            entry['sources'].add(record.source)
            if record.category:
                entry['categories'].add(record.category)

    # Store keywords (only those seen at least twice).
    saved_count = 0
    for keyword, data in keyword_counts.items():
        if data['count'] < 2 or len(keyword) < 2:
            continue
        # Pick the category deterministically; taking the first element of
        # a set depends on hash order and can differ from run to run.
        category = min(data['categories']) if data['categories'] else None
        for source in data['sources']:
            try:
                existing = session.query(TrendKeyword).filter(
                    TrendKeyword.keyword == keyword,
                    TrendKeyword.source == source,
                    TrendKeyword.trend_date == today
                ).first()
                if existing:
                    existing.mention_count = data['count']
                    existing.category = category
                else:
                    session.add(TrendKeyword(
                        keyword=keyword,
                        source=source,
                        mention_count=data['count'],
                        trend_date=today,
                        category=category
                    ))
                    saved_count += 1
            except Exception as e:
                self.logger.warning(f"儲存關鍵字失敗: {e}")
    return saved_count
def _extract_keywords_simple(self, text: str) -> List[str]:
"""簡單的關鍵字萃取 (可以後續用 jieba 替換)"""
import re
# 移除特殊字元
text = re.sub(r'[^\w\s\u4e00-\u9fff]', ' ', text)
# 定義常見的停用詞
stop_words = {
'', '', '', '', '', '', '', '', '', '', '', '',
'一個', '', '', '', '', '', '', '', '', '', '',
'沒有', '', '', '自己', '', '', '什麼', '如果', '因為', '所以',
'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
}
# 分割並過濾
words = text.split()
keywords = []
for word in words:
word = word.strip()
if len(word) >= 2 and word.lower() not in stop_words:
# 檢查是否是中文詞彙或有意義的英文
if re.match(r'^[\u4e00-\u9fff]+$', word) and len(word) >= 2:
keywords.append(word)
elif re.match(r'^[a-zA-Z]+$', word) and len(word) >= 3:
keywords.append(word.lower())
return keywords
def _generate_ai_analysis(self, session) -> None:
    """
    Generate today's all-category daily summary with Ollama.

    Does nothing when a 'daily_summary' analysis without a category already
    exists for today, or when there are no records dated today. Otherwise
    the 50 records with the highest popularity score are sent to Ollama and
    the reply is stored as a TrendAnalysis row and committed. When the
    reply is not parseable JSON, the whole reply is stored as the summary.
    """
    today = date.today()

    # Skip when today's report already exists.
    already_done = session.query(TrendAnalysis).filter(
        TrendAnalysis.analysis_date == today,
        TrendAnalysis.analysis_type == 'daily_summary',
        TrendAnalysis.category.is_(None)
    ).first()
    if already_done:
        self.logger.info("今日已有分析報告,跳過生成")
        return

    # Today's most popular records.
    todays = session.query(TrendRecord).filter(TrendRecord.trend_date == today)
    ranked = todays.order_by(TrendRecord.popularity_score.desc())
    records = ranked.limit(50).all()
    if not records:
        return

    # One bullet line per record.
    bullet_lines = []
    for r in records:
        bullet_lines.append(f"- [{r.source}] {r.title} (熱度:{r.popularity_score})")
    content_for_analysis = "\n".join(bullet_lines)

    prompt = f"""請分析以下今日趨勢資料,提供行銷洞察:
{content_for_analysis}
請以 JSON 格式回覆,包含:
1. summary: 整體趨勢摘要 (100字內)
2. hot_keywords: 熱門關鍵字列表 (最多10個)
3. hot_topics: 熱門話題列表 (最多5個)
4. consumer_insights: 消費者洞察 (3-5點)
5. marketing_suggestions: 行銷建議 (3-5點)
6. copywriting_hints: 文案撰寫提示 (3-5個)"""

    response = self.ollama.generate(prompt, temperature=0.6, timeout=180)
    if not response.success:
        self.logger.error(f"AI 分析失敗: {response.error}")
        return

    # Take the text between the first '{' and the last '}' as JSON.
    try:
        reply = response.content
        opening = reply.find('{')
        closing = reply.rfind('}') + 1
        if opening >= 0 and closing > opening:
            analysis_data = json.loads(reply[opening:closing])
        else:
            analysis_data = {'summary': reply}
    except json.JSONDecodeError:
        analysis_data = {'summary': response.content}

    list_fields = (
        'hot_keywords',
        'hot_topics',
        'consumer_insights',
        'marketing_suggestions',
        'copywriting_hints',
    )
    encoded_lists = {
        name: json.dumps(analysis_data.get(name, []), ensure_ascii=False)
        for name in list_fields
    }

    session.add(TrendAnalysis(
        analysis_date=today,
        category=None,  # all categories
        analysis_type='daily_summary',
        summary=analysis_data.get('summary', ''),
        record_count=len(records),
        model_used=response.model,
        generation_time=response.total_duration,
        **encoded_lists
    ))
    session.commit()
    self.logger.info("AI 趨勢分析報告已生成")
def web_search_with_cache(self, query: str, search_type: str = 'general',
                          cache_hours: int = 24) -> dict:
    """
    Run a web search through Ollama, caching the result in the database.

    Args:
        query: Search query.
        search_type: Search type, passed on to the Ollama service.
        cache_hours: How long a stored result stays valid, in hours.

    Returns:
        dict: On success {'success': True, 'cached': bool, 'data': {...}}
        where 'data' holds 'query', 'search_type', 'result', 'summary',
        'model' and 'duration'. On failure {'success': False, 'error': ...};
        exceptions are logged and reported this way rather than raised.
    """
    # Hash identifying this query / search type pair.
    query_hash = WebSearchCache.generate_hash(query, search_type)
    session = get_session()
    try:
        # Look for an unexpired cache entry.
        cache = session.query(WebSearchCache).filter(
            WebSearchCache.query_hash == query_hash,
            WebSearchCache.expires_at > datetime.now()
        ).first()
        if cache:
            self.logger.info(f"Web Search 快取命中: {query}")
            return {
                'success': True,
                'cached': True,
                'data': {
                    'query': cache.query,
                    'search_type': cache.search_type,
                    'result': json.loads(cache.result_json) if cache.result_json else None,
                    'summary': cache.summary,
                    'model': cache.model_used,
                    'duration': 0.0
                }
            }

        # Cache miss: run the search.
        response = self.ollama.web_search(query, search_type=search_type)
        if not response.success:
            return {
                'success': False,
                'error': response.error
            }

        # Take the text between the first '{' and the last '}' as JSON;
        # fall back to the raw reply.
        try:
            content = response.content
            json_start = content.find('{')
            json_end = content.rfind('}') + 1
            if json_start >= 0 and json_end > json_start:
                parsed = json.loads(content[json_start:json_end])
            else:
                parsed = {'raw': content}
        except json.JSONDecodeError:
            parsed = {'raw': response.content}

        cache_fields = {
            'query': query,
            'search_type': search_type,
            'result_json': json.dumps(parsed, ensure_ascii=False),
            'summary': parsed.get('summary', ''),
            'result_count': len(parsed.get('results', [])),
            'model_used': response.model,
            'generation_time': response.total_duration,
            'expires_at': datetime.now() + timedelta(hours=cache_hours),
        }

        # Refresh an expired entry in place instead of inserting a second
        # row with the same hash: that would pile up stale rows, and would
        # fail outright if query_hash is unique.
        # NOTE(review): uniqueness of query_hash is not visible here — confirm.
        stale = session.query(WebSearchCache).filter(
            WebSearchCache.query_hash == query_hash
        ).first()
        if stale is None:
            session.add(WebSearchCache(query_hash=query_hash, **cache_fields))
        else:
            for field_name, value in cache_fields.items():
                setattr(stale, field_name, value)
        session.commit()

        return {
            'success': True,
            'cached': False,
            'data': {
                'query': query,
                'search_type': search_type,
                'result': parsed,
                'summary': parsed.get('summary', ''),
                'model': response.model,
                'duration': response.total_duration
            }
        }
    except Exception as e:
        self.logger.error(f"Web Search 失敗: {e}")
        return {
            'success': False,
            'error': str(e)
        }
    finally:
        session.close()
def search_trends_for_category(self, category: str) -> dict:
    """
    Search trend information for one product category.

    Each category has a fixed list of queries; an unknown category falls
    back to the single query "<category>趨勢". Every query goes through
    ``web_search_with_cache`` with search_type 'trends', and only
    successful searches are included in the result.

    Args:
        category: Product category.

    Returns:
        dict: {'category': ..., 'search_results': [{'query', 'data'}, ...],
        'timestamp': ISO-formatted current time}.
    """
    # Use the current year rather than a hard-coded one, so the
    # year-specific queries do not go stale.
    year = date.today().year
    queries = {
        '美妝': [f'{year}美妝趨勢', 'PTT美妝推薦', '韓系彩妝新品'],
        '3C': [f'{year} 3C新品', 'PTT 3C開箱', '科技產品評測'],
        '服飾': [f'{year}服飾流行', '穿搭趨勢', 'Dcard穿搭版'],
        '居家': ['居家好物推薦', '收納神器', '家電開箱'],
        '電商': ['電商優惠活動', '購物節', '限時特賣'],
    }
    category_queries = queries.get(category, [f'{category}趨勢'])

    results = []
    for q in category_queries:
        result = self.web_search_with_cache(q, search_type='trends')
        if result['success']:
            results.append({
                'query': q,
                'data': result['data']
            })

    return {
        'category': category,
        'search_results': results,
        'timestamp': datetime.now().isoformat()
    }
def get_trend_stats(self, days: int = 7) -> dict:
    """
    Aggregate trend record statistics for the recent period.

    Args:
        days: How many days back from today the period starts. Records
            with trend_date on or after that day are included.

    Returns:
        dict: On success {'success': True, 'date_range', 'source_stats',
        'category_stats', 'daily_counts'}. On failure
        {'success': False, 'error': ...}; exceptions are logged and
        reported this way rather than raised.
    """
    from sqlalchemy import func

    # Read the date once so 'from' and 'to' cannot straddle midnight.
    today = date.today()
    date_from = today - timedelta(days=days)
    session = get_session()
    try:
        # Record count and average popularity per source.
        source_stats = session.query(
            TrendRecord.source,
            func.count(TrendRecord.id).label('count'),
            func.avg(TrendRecord.popularity_score).label('avg_popularity')
        ).filter(
            TrendRecord.trend_date >= date_from
        ).group_by(TrendRecord.source).all()

        # Record count per category (records without a category excluded).
        category_stats = session.query(
            TrendRecord.category,
            func.count(TrendRecord.id).label('count')
        ).filter(
            TrendRecord.trend_date >= date_from,
            TrendRecord.category.isnot(None)
        ).group_by(TrendRecord.category).all()

        # Record count per day, oldest first.
        daily_counts = session.query(
            TrendRecord.trend_date,
            func.count(TrendRecord.id).label('count')
        ).filter(
            TrendRecord.trend_date >= date_from
        ).group_by(TrendRecord.trend_date).order_by(TrendRecord.trend_date).all()

        return {
            'success': True,
            'date_range': {
                'from': date_from.isoformat(),
                'to': today.isoformat()
            },
            'source_stats': [
                {
                    'source': s.source,
                    'count': s.count,
                    # Some database drivers return AVG() as Decimal; coerce
                    # to float so the value is a plain number for callers
                    # that serialize the result.
                    'avg_popularity': round(float(s.avg_popularity or 0), 1)
                }
                for s in source_stats
            ],
            'category_stats': [
                {'category': c.category, 'count': c.count}
                for c in category_stats
            ],
            'daily_counts': [
                {'date': d.trend_date.isoformat(), 'count': d.count}
                for d in daily_counts
            ]
        }
    except Exception as e:
        self.logger.error(f"取得趨勢統計失敗: {e}")
        return {
            'success': False,
            'error': str(e)
        }
    finally:
        session.close()
# Process-wide service instance, created on first use.
_service_instance = None


def get_trend_crawler_service() -> TrendCrawlerService:
    """Return the shared TrendCrawlerService, creating it on the first call."""
    global _service_instance
    if _service_instance is not None:
        return _service_instance
    _service_instance = TrendCrawlerService()
    return _service_instance