Some checks failed
CD Pipeline / deploy (push) Failing after 59s
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml) - 部署模式: rsync Python 檔案至 188 → docker restart (volume mount) - Dockerfile/requirements 變動時自動重建 Docker image - 部署通知: Telegram (開始/成功/失敗) - 健康檢查: https://mo.wooo.work/health (最多 5 次重試) - 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
628 lines
23 KiB
Python
628 lines
23 KiB
Python
"""
|
|
趨勢爬蟲服務
|
|
|
|
負責:
|
|
- 爬取各來源趨勢資料 (Google News, PTT, Dcard, YouTube)
|
|
- 儲存到資料庫
|
|
- 萃取關鍵字
|
|
- 呼叫 Ollama 進行 AI 分析
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
from collections import defaultdict
|
|
from datetime import datetime, date, timedelta
|
|
from typing import List, Dict, Optional, Tuple
|
|
|
|
from database.trend_models import (
|
|
TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache,
|
|
get_category_for_board, PTT_BOARDS, DCARD_BOARDS
|
|
)
|
|
from database.manager import get_session
|
|
from services.ollama_service import OllamaService
|
|
from services.trend_crawler import TrendCrawler, TrendData, NewsItem, SocialPost, YouTubeVideo
|
|
|
|
# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class TrendCrawlerService:
    """Trend crawler service with database persistence.

    Fetches trend data through ``TrendCrawler``, stores it as ``TrendRecord``
    rows, derives daily ``TrendKeyword`` rows, asks ``OllamaService`` for a
    daily ``TrendAnalysis`` and caches web-search answers in
    ``WebSearchCache``. Methods that touch the database open their own
    session via ``get_session()`` and close it before returning.
    """

    # Stop words ignored by _extract_keywords_simple (compared lower-cased).
    _STOP_WORDS = frozenset({
        '的', '了', '是', '在', '我', '有', '和', '就', '不', '人', '都', '一',
        '一個', '上', '也', '很', '到', '說', '要', '去', '你', '會', '著',
        '沒有', '看', '好', '自己', '這', '那', '什麼', '如果', '因為', '所以',
        'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
    })

    def __init__(self):
        self.crawler = TrendCrawler()
        self.ollama = OllamaService()
        self.logger = logger

    # ------------------------------------------------------------------
    # Crawl + persist
    # ------------------------------------------------------------------

    def crawl_and_save_all(self,
                           sources: Optional[List[str]] = None,
                           categories: Optional[List[str]] = None,
                           analyze: bool = True) -> dict:
        """
        Crawl every source and store the results in the database.

        Args:
            sources: sources to store, any of
                ['google_news', 'ptt', 'dcard', 'youtube']; None or empty
                means all. Social posts are filtered by platform, so ['ptt']
                stores only PTT posts.
            categories: categories forwarded to ``TrendCrawler.get_all_trends``.
            analyze: whether to generate the daily AI analysis when at least
                one new record was stored.

        Returns:
            dict: 'total_records', 'new_records', 'keywords_extracted',
            per-source results under 'sources' (keys 'google_news', 'social',
            'youtube') and error strings under 'errors'.

        Raises:
            Exception: re-raised after a rollback when crawling or the main
            commit fails. Per-source failures are collected in 'errors'.
        """
        results = {
            'total_records': 0,
            'new_records': 0,
            'keywords_extracted': 0,
            'sources': {},
            'errors': []
        }
        # An empty/None selection means "everything".
        wanted = set(sources) if sources else None

        def selected(*names):
            return wanted is None or any(name in wanted for name in names)

        def store_result(key, result):
            results['sources'][key] = result
            results['total_records'] += result['total']
            results['new_records'] += result['new']

        session = get_session()
        try:
            # 1. Crawl all sources.
            self.logger.info("開始爬取趨勢資料...")
            trend_data = self.crawler.get_all_trends(
                categories=categories,
                include_social=True
            )

            # 2. News.
            if selected('google_news'):
                try:
                    store_result('google_news',
                                 self._save_news_records(session, trend_data.news_items))
                except Exception as e:
                    self.logger.error(f"儲存新聞資料失敗: {e}")
                    results['errors'].append(f"google_news: {str(e)}")

            # 3. Social posts (PTT/Dcard), restricted to the requested platforms.
            if selected('ptt', 'dcard'):
                try:
                    posts = self._filter_social_posts(trend_data.social_posts, wanted)
                    store_result('social', self._save_social_records(session, posts))
                except Exception as e:
                    self.logger.error(f"儲存社群資料失敗: {e}")
                    results['errors'].append(f"social: {str(e)}")

            # 4. YouTube.
            if selected('youtube'):
                try:
                    store_result('youtube',
                                 self._save_youtube_records(session, trend_data.youtube_videos))
                except Exception as e:
                    self.logger.error(f"儲存 YouTube 資料失敗: {e}")
                    results['errors'].append(f"youtube: {str(e)}")

            # 5. Keywords.
            try:
                results['keywords_extracted'] = self._extract_and_save_keywords(session)
            except Exception as e:
                self.logger.error(f"萃取關鍵字失敗: {e}")
                results['errors'].append(f"keywords: {str(e)}")

            session.commit()

            # 6. AI analysis runs after the commit so it sees the stored rows.
            if analyze and results['new_records'] > 0:
                try:
                    self._generate_ai_analysis(session)
                except Exception as e:
                    self.logger.error(f"AI 分析失敗: {e}")
                    results['errors'].append(f"ai_analysis: {str(e)}")

            self.logger.info(f"趨勢爬取完成: {results}")
            return results

        except Exception as e:
            session.rollback()
            self.logger.error(f"趨勢爬取失敗: {e}")
            raise
        finally:
            session.close()

    @staticmethod
    def _social_source(post) -> str:
        """Return the lower-cased platform name used as TrendRecord.source ('unknown' if unset)."""
        return post.platform.lower() if post.platform else 'unknown'

    @classmethod
    def _filter_social_posts(cls, posts, wanted: Optional[set]) -> list:
        """Return *posts* as a list, keeping only platforms in *wanted* (all when None)."""
        posts = list(posts or [])
        if wanted is None:
            return posts
        return [post for post in posts if cls._social_source(post) in wanted]

    @staticmethod
    def _youtube_source_id(video) -> Optional[str]:
        """Return the video id, else the md5 of the URL, else None."""
        if video.video_id:
            return video.video_id
        if video.url:
            return hashlib.md5(video.url.encode(), usedforsecurity=False).hexdigest()
        return None

    @staticmethod
    def _trend_date(published) -> date:
        """Return the calendar date of *published*, or today when it is missing."""
        return published.date() if published else date.today()

    @staticmethod
    def _add_record_if_new(session, seen: set, source: str, source_id: str, **fields) -> bool:
        """Add a TrendRecord for (source, source_id) unless it is already known.

        *seen* holds keys handled earlier in the same batch, so an item that
        appears twice in one crawl is inserted only once even when the
        session does not autoflush pending rows before querying.

        Returns:
            bool: True when a new record was added to the session.
        """
        key = (source, source_id)
        if key in seen:
            return False
        seen.add(key)

        exists = session.query(TrendRecord).filter(
            TrendRecord.source == source,
            TrendRecord.source_id == source_id
        ).first()
        if exists:
            return False

        session.add(TrendRecord(source=source, source_id=source_id, **fields))
        return True

    def _save_news_records(self, session, news_items: "List[NewsItem]") -> dict:
        """Queue new Google News items as TrendRecord rows.

        Items without a link are skipped; the md5 of the link is the
        de-duplication ``source_id``. Per-item failures are logged and skipped.

        Returns:
            dict: {'total': items received, 'new': rows added to the session}
        """
        items = list(news_items or [])
        seen = set()
        new_count = 0

        for news in items:
            try:
                if not news.link:
                    continue
                source_id = hashlib.md5(news.link.encode(), usedforsecurity=False).hexdigest()
                if self._add_record_if_new(
                    session, seen, 'google_news', source_id,
                    source_url=news.link,
                    title=news.title,
                    content=news.summary,
                    author=news.source,
                    category=news.category,
                    published_at=news.published,
                    trend_date=self._trend_date(news.published),
                ):
                    new_count += 1
            except Exception as e:
                self.logger.warning(f"儲存新聞記錄失敗: {e}")

        return {'total': len(items), 'new': new_count}

    def _save_social_records(self, session, social_posts: "List[SocialPost]") -> dict:
        """Queue new PTT/Dcard posts as TrendRecord rows.

        ``source_id`` is the post id, or an md5 of platform/title/created_at
        when the post has no id. Per-post failures are logged and skipped.

        Returns:
            dict: {'total': posts received, 'new': rows added to the session}
        """
        posts = list(social_posts or [])
        seen = set()
        new_count = 0

        for post in posts:
            try:
                source = self._social_source(post)
                source_id = post.post_id or hashlib.md5(
                    f"{source}:{post.title}:{post.created_at}".encode(), usedforsecurity=False
                ).hexdigest()
                if self._add_record_if_new(
                    session, seen, source, source_id,
                    source_board=post.board,
                    source_url=post.url,
                    title=post.title,
                    content=post.content,
                    author=post.author,
                    popularity_score=post.push_count or post.like_count or 0,
                    comment_count=post.comment_count or 0,
                    category=get_category_for_board(post.board) if post.board else '其他',
                    published_at=post.created_at,
                    trend_date=self._trend_date(post.created_at),
                ):
                    new_count += 1
            except Exception as e:
                self.logger.warning(f"儲存社群記錄失敗: {e}")

        return {'total': len(posts), 'new': new_count}

    def _save_youtube_records(self, session, youtube_videos: "List[YouTubeVideo]") -> dict:
        """Queue new YouTube videos as TrendRecord rows.

        ``source_id`` is the video id, falling back to an md5 of the URL;
        videos with neither are skipped. Per-video failures are logged.

        Returns:
            dict: {'total': videos received, 'new': rows added to the session}
        """
        videos = list(youtube_videos or [])
        seen = set()
        new_count = 0

        for video in videos:
            try:
                source_id = self._youtube_source_id(video)
                if not source_id:
                    continue
                if self._add_record_if_new(
                    session, seen, 'youtube', source_id,
                    source_url=video.url,
                    title=video.title,
                    content=video.description,
                    author=video.channel,
                    popularity_score=video.view_count or 0,
                    category=video.category,
                    published_at=video.published_at,
                    trend_date=self._trend_date(video.published_at),
                ):
                    new_count += 1
            except Exception as e:
                self.logger.warning(f"儲存 YouTube 記錄失敗: {e}")

        return {'total': len(videos), 'new': new_count}

    # ------------------------------------------------------------------
    # Keywords
    # ------------------------------------------------------------------

    def _extract_and_save_keywords(self, session) -> int:
        """Extract keywords from today's TrendRecord rows and upsert TrendKeyword rows.

        A keyword is kept when it has at least 2 characters and is mentioned
        at least twice across all of today's records. One row is written per
        (keyword, source, today) with that source's own mention count. The
        category is the keyword's most frequent category (ties broken
        alphabetically), so reruns are deterministic.

        Returns:
            int: number of newly added TrendKeyword rows (updates not counted).
        """
        today = date.today()

        records = session.query(TrendRecord).filter(
            TrendRecord.trend_date == today
        ).all()
        if not records:
            return 0

        saved_count = 0
        for keyword, stats in self._aggregate_keywords(records).items():
            source_counts = stats['sources']
            if sum(source_counts.values()) < 2 or len(keyword) < 2:
                continue
            category = self._most_common(stats['categories'])

            for source, count in source_counts.items():
                try:
                    existing = session.query(TrendKeyword).filter(
                        TrendKeyword.keyword == keyword,
                        TrendKeyword.source == source,
                        TrendKeyword.trend_date == today
                    ).first()

                    if existing:
                        existing.mention_count = count
                        existing.category = category
                    else:
                        session.add(TrendKeyword(
                            keyword=keyword,
                            source=source,
                            mention_count=count,
                            trend_date=today,
                            category=category
                        ))
                        saved_count += 1
                except Exception as e:
                    self.logger.warning(f"儲存關鍵字失敗: {e}")

        return saved_count

    def _aggregate_keywords(self, records) -> Dict[str, dict]:
        """Count keyword mentions per source and per category.

        Returns:
            dict: {keyword: {'sources': {source: n}, 'categories': {category: n}}}
            where every occurrence returned by _extract_keywords_simple counts once.
        """
        stats: Dict[str, dict] = {}
        for record in records:
            text = f"{record.title} {record.content or ''}"
            for kw in self._extract_keywords_simple(text):
                entry = stats.setdefault(
                    kw, {'sources': defaultdict(int), 'categories': defaultdict(int)})
                entry['sources'][record.source] += 1
                if record.category:
                    entry['categories'][record.category] += 1
        return stats

    @staticmethod
    def _most_common(counts: dict):
        """Return the key with the highest count (alphabetical tie-break), or None if empty."""
        if not counts:
            return None
        return min(counts, key=lambda name: (-counts[name], name))

    def _extract_keywords_simple(self, text: str) -> List[str]:
        """Naive keyword extraction (placeholder until jieba is integrated).

        Punctuation is replaced by spaces and the text is split on whitespace
        only. An unspaced CJK sentence therefore stays one token. Tokens that
        are pure CJK with at least 2 characters are kept as-is. Pure ASCII
        letter tokens with at least 3 characters are kept lower-cased.
        Stop words are dropped.
        """
        import re

        # Keep word characters, whitespace and CJK ideographs; blank the rest.
        text = re.sub(r'[^\w\s\u4e00-\u9fff]', ' ', text)

        keywords = []
        for word in text.split():
            if len(word) < 2 or word.lower() in self._STOP_WORDS:
                continue
            if re.match(r'^[\u4e00-\u9fff]+$', word):
                keywords.append(word)
            elif re.match(r'^[a-zA-Z]+$', word) and len(word) >= 3:
                keywords.append(word.lower())

        return keywords

    # ------------------------------------------------------------------
    # AI analysis / web search
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_json_object(content: str, fallback_key: str) -> dict:
        """Parse the span from the first '{' to the last '}' of an LLM reply as JSON.

        Returns ``{fallback_key: content}`` when there is no such span, it is
        not valid JSON, or it does not decode to an object.
        """
        start = content.find('{')
        end = content.rfind('}') + 1
        if start >= 0 and end > start:
            try:
                parsed = json.loads(content[start:end])
            except json.JSONDecodeError:
                return {fallback_key: content}
            if isinstance(parsed, dict):
                return parsed
        return {fallback_key: content}

    def _generate_ai_analysis(self, session) -> None:
        """Generate today's overall 'daily_summary' TrendAnalysis with Ollama.

        Skips when one already exists for today. Uses the 50 most popular
        records of today and commits the new analysis on success; an
        unsuccessful Ollama response is only logged.
        """
        today = date.today()

        existing = session.query(TrendAnalysis).filter(
            TrendAnalysis.analysis_date == today,
            TrendAnalysis.analysis_type == 'daily_summary',
            TrendAnalysis.category.is_(None)
        ).first()
        if existing:
            self.logger.info("今日已有分析報告,跳過生成")
            return

        records = session.query(TrendRecord).filter(
            TrendRecord.trend_date == today
        ).order_by(TrendRecord.popularity_score.desc()).limit(50).all()
        if not records:
            return

        content_for_analysis = "\n".join([
            f"- [{r.source}] {r.title} (熱度:{r.popularity_score})"
            for r in records
        ])

        prompt = f"""請分析以下今日趨勢資料,提供行銷洞察:

{content_for_analysis}

請以 JSON 格式回覆,包含:
1. summary: 整體趨勢摘要 (100字內)
2. hot_keywords: 熱門關鍵字列表 (最多10個)
3. hot_topics: 熱門話題列表 (最多5個)
4. consumer_insights: 消費者洞察 (3-5點)
5. marketing_suggestions: 行銷建議 (3-5點)
6. copywriting_hints: 文案撰寫提示 (3-5個)"""

        response = self.ollama.generate(prompt, temperature=0.6, timeout=180)

        if not response.success:
            self.logger.error(f"AI 分析失敗: {response.error}")
            return

        analysis_data = self._parse_json_object(response.content, 'summary')

        # The model may return a non-string summary; store it as JSON text.
        summary = analysis_data.get('summary', '')
        if not isinstance(summary, str):
            summary = json.dumps(summary, ensure_ascii=False)

        def as_json(key):
            return json.dumps(analysis_data.get(key, []), ensure_ascii=False)

        analysis = TrendAnalysis(
            analysis_date=today,
            category=None,  # all categories
            analysis_type='daily_summary',
            summary=summary,
            hot_keywords=as_json('hot_keywords'),
            hot_topics=as_json('hot_topics'),
            consumer_insights=as_json('consumer_insights'),
            marketing_suggestions=as_json('marketing_suggestions'),
            copywriting_hints=as_json('copywriting_hints'),
            record_count=len(records),
            model_used=response.model,
            generation_time=response.total_duration
        )
        session.add(analysis)
        session.commit()
        self.logger.info("AI 趨勢分析報告已生成")

    def web_search_with_cache(self, query: str, search_type: str = 'general',
                              cache_hours: int = 24) -> dict:
        """
        Web search through Ollama with a database cache.

        Args:
            query: search query
            search_type: search type (part of the cache key)
            cache_hours: how long a fresh result stays cached

        Returns:
            dict: {'success': True, 'cached': bool, 'data': {...}} or
            {'success': False, 'error': str}. A failure to write the cache is
            logged and does not discard the fresh result.
        """
        query_hash = WebSearchCache.generate_hash(query, search_type)

        session = get_session()
        try:
            cache = session.query(WebSearchCache).filter(
                WebSearchCache.query_hash == query_hash,
                WebSearchCache.expires_at > datetime.now()
            ).first()

            if cache:
                self.logger.info(f"Web Search 快取命中: {query}")
                return {
                    'success': True,
                    'cached': True,
                    'data': {
                        'query': cache.query,
                        'search_type': cache.search_type,
                        'result': json.loads(cache.result_json) if cache.result_json else None,
                        'summary': cache.summary,
                        'model': cache.model_used,
                        'duration': 0.0
                    }
                }

            response = self.ollama.web_search(query, search_type=search_type)
            if not response.success:
                return {
                    'success': False,
                    'error': response.error
                }

            parsed = self._parse_json_object(response.content, 'raw')
            self._store_search_cache(session, query_hash, query, search_type,
                                     parsed, response, cache_hours)

            return {
                'success': True,
                'cached': False,
                'data': {
                    'query': query,
                    'search_type': search_type,
                    'result': parsed,
                    'summary': parsed.get('summary', ''),
                    'model': response.model,
                    'duration': response.total_duration
                }
            }

        except Exception as e:
            self.logger.error(f"Web Search 失敗: {e}")
            return {
                'success': False,
                'error': str(e)
            }
        finally:
            session.close()

    def _store_search_cache(self, session, query_hash: str, query: str, search_type: str,
                            parsed: dict, response, cache_hours: int) -> None:
        """Persist a search result in WebSearchCache.

        An existing (expired) row with the same hash is updated instead of
        inserting a second one. Failures are rolled back and logged so the
        caller can still return the fresh result.
        """
        try:
            results = parsed.get('results', [])
            fields = {
                'query': query,
                'search_type': search_type,
                'result_json': json.dumps(parsed, ensure_ascii=False),
                'summary': parsed.get('summary', ''),
                'result_count': len(results) if isinstance(results, list) else 0,
                'model_used': response.model,
                'generation_time': response.total_duration,
                'expires_at': datetime.now() + timedelta(hours=cache_hours),
            }
            stale = session.query(WebSearchCache).filter(
                WebSearchCache.query_hash == query_hash
            ).first()
            if stale is None:
                session.add(WebSearchCache(query_hash=query_hash, **fields))
            else:
                for name, value in fields.items():
                    setattr(stale, name, value)
            session.commit()
        except Exception as e:
            session.rollback()
            self.logger.warning(f"Web Search 快取寫入失敗: {e}")

    def search_trends_for_category(self, category: str) -> dict:
        """
        Run cached trend web searches for one product category.

        Known categories use curated queries; year-specific queries use the
        current year. Unknown categories fall back to a single
        '<category>趨勢' query. Unsuccessful searches are omitted.

        Args:
            category: product category

        Returns:
            dict: {'category', 'search_results': [{'query', 'data'}], 'timestamp'}
        """
        year = date.today().year
        queries = {
            '美妝': [f'{year}美妝趨勢', 'PTT美妝推薦', '韓系彩妝新品'],
            '3C': [f'{year} 3C新品', 'PTT 3C開箱', '科技產品評測'],
            '服飾': [f'{year}服飾流行', '穿搭趨勢', 'Dcard穿搭版'],
            '居家': ['居家好物推薦', '收納神器', '家電開箱'],
            '電商': ['電商優惠活動', '購物節', '限時特賣'],
        }

        results = []
        for q in queries.get(category, [f'{category}趨勢']):
            result = self.web_search_with_cache(q, search_type='trends')
            if result['success']:
                results.append({
                    'query': q,
                    'data': result['data']
                })

        return {
            'category': category,
            'search_results': results,
            'timestamp': datetime.now().isoformat()
        }

    def get_trend_stats(self, days: int = 7) -> dict:
        """
        Aggregate TrendRecord statistics for the last *days* days.

        Args:
            days: number of days to include (counted back from today)

        Returns:
            dict: per-source counts/average popularity, per-category counts
            and per-day counts, or {'success': False, 'error': str}.
        """
        from sqlalchemy import func

        date_from = date.today() - timedelta(days=days)
        session = get_session()

        try:
            source_stats = session.query(
                TrendRecord.source,
                func.count(TrendRecord.id).label('count'),
                func.avg(TrendRecord.popularity_score).label('avg_popularity')
            ).filter(
                TrendRecord.trend_date >= date_from
            ).group_by(TrendRecord.source).all()

            category_stats = session.query(
                TrendRecord.category,
                func.count(TrendRecord.id).label('count')
            ).filter(
                TrendRecord.trend_date >= date_from,
                TrendRecord.category.isnot(None)
            ).group_by(TrendRecord.category).all()

            daily_counts = session.query(
                TrendRecord.trend_date,
                func.count(TrendRecord.id).label('count')
            ).filter(
                TrendRecord.trend_date >= date_from
            ).group_by(TrendRecord.trend_date).order_by(TrendRecord.trend_date).all()

            return {
                'success': True,
                'date_range': {
                    'from': date_from.isoformat(),
                    'to': date.today().isoformat()
                },
                'source_stats': [
                    {
                        'source': s.source,
                        'count': s.count,
                        # AVG may come back as Decimal on some backends; keep it JSON-friendly.
                        'avg_popularity': round(float(s.avg_popularity or 0), 1)
                    }
                    for s in source_stats
                ],
                'category_stats': [
                    {'category': c.category, 'count': c.count}
                    for c in category_stats
                ],
                'daily_counts': [
                    {'date': d.trend_date.isoformat(), 'count': d.count}
                    for d in daily_counts
                ]
            }

        except Exception as e:
            self.logger.error(f"取得趨勢統計失敗: {e}")
            return {
                'success': False,
                'error': str(e)
            }
        finally:
            session.close()
|
|
|
|
|
|
# Module-wide service instance, created lazily by get_trend_crawler_service().
_service_instance = None


def get_trend_crawler_service() -> TrendCrawlerService:
    """Return the shared TrendCrawlerService, constructing it on first call."""
    global _service_instance
    if _service_instance is not None:
        return _service_instance
    _service_instance = TrendCrawlerService()
    return _service_instance
|