"""
Trend crawler service.

Responsibilities:
- Crawl trend data from each source (Google News, PTT, Dcard, YouTube)
- Persist the crawled records to the database
- Extract keywords
- Call Ollama to produce an AI analysis
"""

import hashlib
import json
import logging
import re
from collections import defaultdict
from datetime import datetime, date, timedelta
from typing import List, Dict, Optional, Tuple

from database.trend_models import (
    TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache,
    get_category_for_board, PTT_BOARDS, DCARD_BOARDS
)
from database.manager import get_session
from services.ollama_service import OllamaService
from services.trend_crawler import TrendCrawler, TrendData, NewsItem, SocialPost, YouTubeVideo

# Module-level logger
logger = logging.getLogger(__name__)

# Patterns used by the simple keyword extractor; compiled once at import time.
_NON_WORD_RE = re.compile(r'[^\w\s\u4e00-\u9fff]')
_CJK_WORD_RE = re.compile(r'^[\u4e00-\u9fff]+$')
_LATIN_WORD_RE = re.compile(r'^[a-zA-Z]+$')

# Common stop words that are never reported as keywords.
_STOP_WORDS = frozenset({
    '的', '了', '是', '在', '我', '有', '和', '就', '不', '人', '都', '一', '一個',
    '上', '也', '很', '到', '說', '要', '去', '你', '會', '著', '沒有', '看', '好',
    '自己', '這', '那', '什麼', '如果', '因為', '所以',
    'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
})


def _md5_hex(text: str) -> str:
    """Return the MD5 hex digest of *text* (used as a dedup key, not for security)."""
    return hashlib.md5(text.encode(), usedforsecurity=False).hexdigest()


class TrendCrawlerService:
    """Trend crawler service that also persists its results to the database."""

    def __init__(self):
        self.crawler = TrendCrawler()
        self.ollama = OllamaService()
        self.logger = logger

    def crawl_and_save_all(self, sources: Optional[List[str]] = None,
                           categories: Optional[List[str]] = None,
                           analyze: bool = True) -> dict:
        """
        Crawl every source and store the results in the database.

        Args:
            sources: Sources to save, e.g. ['google_news', 'ptt', 'dcard', 'youtube'].
                A falsy value (None or empty) means "all sources".
            categories: Categories passed through to the crawler.
            analyze: Whether to run the AI analysis after the commit.

        Returns:
            dict: Statistics with the keys 'total_records', 'new_records',
            'keywords_extracted', 'sources' and 'errors'.

        Raises:
            Exception: Anything raised outside the per-step handlers (the crawl
            itself or the commit) is re-raised after the session is rolled back.
        """
        results = {
            'total_records': 0,
            'new_records': 0,
            'keywords_extracted': 0,
            'sources': {},
            'errors': []
        }

        session = get_session()
        try:
            # 1. Crawl every source. The crawler is always asked for everything;
            #    `sources` only controls which parts get saved below.
            self.logger.info("開始爬取趨勢資料...")
            trend_data = self.crawler.get_all_trends(
                categories=categories,
                include_social=True
            )

            # 2. Save news records
            if not sources or 'google_news' in sources:
                self._save_source(
                    results, 'google_news', "儲存新聞資料失敗",
                    self._save_news_records, session, trend_data.news_items
                )

            # 3. Save social records (PTT/Dcard)
            # NOTE(review): asking for only 'ptt' or only 'dcard' still saves the
            # whole social batch; filtering per platform would need the exact
            # values of SocialPost.platform, which are not visible here.
            if not sources or 'ptt' in sources or 'dcard' in sources:
                self._save_source(
                    results, 'social', "儲存社群資料失敗",
                    self._save_social_records, session, trend_data.social_posts
                )

            # 4. Save YouTube records
            if not sources or 'youtube' in sources:
                self._save_source(
                    results, 'youtube', "儲存 YouTube 資料失敗",
                    self._save_youtube_records, session, trend_data.youtube_videos
                )

            # 5. Extract keywords
            try:
                results['keywords_extracted'] = self._extract_and_save_keywords(session)
            except Exception as e:
                self.logger.error(f"萃取關鍵字失敗: {e}")
                results['errors'].append(f"keywords: {str(e)}")

            session.commit()

            # 6. AI analysis (runs after the commit, only when something new arrived)
            if analyze and results['new_records'] > 0:
                try:
                    self._generate_ai_analysis(session)
                except Exception as e:
                    self.logger.error(f"AI 分析失敗: {e}")
                    results['errors'].append(f"ai_analysis: {str(e)}")

            self.logger.info(f"趨勢爬取完成: {results}")
            return results

        except Exception as e:
            session.rollback()
            self.logger.error(f"趨勢爬取失敗: {e}")
            raise
        finally:
            session.close()

    def _save_source(self, results: dict, key: str, log_prefix: str,
                     saver, session, items) -> None:
        """Run one save step and fold its outcome (or its error) into *results*."""
        try:
            outcome = saver(session, items)
            results['sources'][key] = outcome
            results['total_records'] += outcome['total']
            results['new_records'] += outcome['new']
        except Exception as e:
            self.logger.error(f"{log_prefix}: {e}")
            results['errors'].append(f"{key}: {str(e)}")

    def _save_news_records(self, session, news_items: List[NewsItem]) -> dict:
        """
        Save news records, skipping items without a link and items already stored.

        Returns:
            dict: {'total': number of items received, 'new': number of rows added}.
        """
        total = len(news_items)
        new_count = 0

        for news in news_items:
            try:
                # The link is the only identity a news item has; no link, no record.
                if not news.link:
                    continue
                source_id = _md5_hex(news.link)

                # Skip records that are already stored
                exists = session.query(TrendRecord).filter(
                    TrendRecord.source == 'google_news',
                    TrendRecord.source_id == source_id
                ).first()
                if exists:
                    continue

                session.add(TrendRecord(
                    source='google_news',
                    source_url=news.link,
                    source_id=source_id,
                    title=news.title,
                    content=news.summary,
                    author=news.source,
                    category=news.category,
                    published_at=news.published,
                    trend_date=news.published.date() if news.published else date.today(),
                ))
                new_count += 1
            except Exception as e:
                self.logger.warning(f"儲存新聞記錄失敗: {e}")

        return {'total': total, 'new': new_count}

    def _save_social_records(self, session, social_posts: List[SocialPost]) -> dict:
        """
        Save social posts (PTT/Dcard), skipping posts already stored.

        Returns:
            dict: {'total': number of posts received, 'new': number of rows added}.
        """
        total = len(social_posts)
        new_count = 0

        for post in social_posts:
            try:
                source = post.platform.lower() if post.platform else 'unknown'
                # Fall back to a digest of platform/title/time when there is no post id.
                source_id = post.post_id or _md5_hex(
                    f"{source}:{post.title}:{post.created_at}"
                )

                exists = session.query(TrendRecord).filter(
                    TrendRecord.source == source,
                    TrendRecord.source_id == source_id
                ).first()
                if exists:
                    continue

                session.add(TrendRecord(
                    source=source,
                    source_board=post.board,
                    source_url=post.url,
                    source_id=source_id,
                    title=post.title,
                    content=post.content,
                    author=post.author,
                    popularity_score=post.push_count or post.like_count or 0,
                    comment_count=post.comment_count or 0,
                    category=get_category_for_board(post.board) if post.board else '其他',
                    published_at=post.created_at,
                    trend_date=post.created_at.date() if post.created_at else date.today(),
                ))
                new_count += 1
            except Exception as e:
                self.logger.warning(f"儲存社群記錄失敗: {e}")

        return {'total': total, 'new': new_count}

    def _save_youtube_records(self, session, youtube_videos: List[YouTubeVideo]) -> dict:
        """
        Save YouTube videos, skipping videos already stored.

        A video is identified by its video id, or by a digest of its URL when it
        has no id. Videos with neither are skipped.

        Returns:
            dict: {'total': number of videos received, 'new': number of rows added}.
        """
        total = len(youtube_videos)
        new_count = 0

        for video in youtube_videos:
            try:
                # Explicit branches: the previous one-liner
                # `id or md5(url) if url else None` bound as
                # `(id or md5(url)) if url else None` and therefore dropped
                # videos that had an id but no URL.
                if video.video_id:
                    source_id = video.video_id
                elif video.url:
                    source_id = _md5_hex(video.url)
                else:
                    continue

                exists = session.query(TrendRecord).filter(
                    TrendRecord.source == 'youtube',
                    TrendRecord.source_id == source_id
                ).first()
                if exists:
                    continue

                session.add(TrendRecord(
                    source='youtube',
                    source_url=video.url,
                    source_id=source_id,
                    title=video.title,
                    content=video.description,
                    author=video.channel,
                    popularity_score=video.view_count or 0,
                    category=video.category,
                    published_at=video.published_at,
                    trend_date=video.published_at.date() if video.published_at else date.today(),
                ))
                new_count += 1
            except Exception as e:
                self.logger.warning(f"儲存 YouTube 記錄失敗: {e}")

        return {'total': total, 'new': new_count}

    def _extract_and_save_keywords(self, session) -> int:
        """
        Extract keywords from today's records and upsert them per source.

        Only keywords seen at least twice (across all of today's records) and at
        least two characters long are kept.

        Returns:
            int: Number of keyword rows inserted or updated.
        """
        today = date.today()

        # All records dated today
        records = session.query(TrendRecord).filter(
            TrendRecord.trend_date == today
        ).all()

        if not records:
            return 0

        # Simple keyword extraction (jieba could be plugged in later)
        keyword_counts = defaultdict(lambda: {'count': 0, 'sources': set(), 'categories': set()})

        for record in records:
            # `or ''` keeps a missing title from contributing the literal word "None".
            text = f"{record.title or ''} {record.content or ''}"
            for kw in self._extract_keywords_simple(text):
                entry = keyword_counts[kw]
                entry['count'] += 1
                entry['sources'].add(record.source)
                if record.category:
                    entry['categories'].add(record.category)

        # Persist keywords (only those that appear at least twice)
        saved_count = 0
        for keyword, data in keyword_counts.items():
            if data['count'] < 2 or len(keyword) < 2:
                continue

            # min() instead of "first element of a set": set order changes between
            # interpreter runs, which made the stored category flip arbitrarily.
            category = min(data['categories']) if data['categories'] else None

            for source in data['sources']:
                try:
                    existing = session.query(TrendKeyword).filter(
                        TrendKeyword.keyword == keyword,
                        TrendKeyword.source == source,
                        TrendKeyword.trend_date == today
                    ).first()

                    if existing:
                        existing.mention_count = data['count']
                        existing.category = category
                    else:
                        session.add(TrendKeyword(
                            keyword=keyword,
                            source=source,
                            mention_count=data['count'],
                            trend_date=today,
                            category=category
                        ))

                    saved_count += 1
                except Exception as e:
                    self.logger.warning(f"儲存關鍵字失敗: {e}")

        return saved_count

    def _extract_keywords_simple(self, text: str) -> List[str]:
        """
        Extract keywords with a whitespace tokenizer (jieba could replace this later).

        Punctuation is replaced by spaces, the text is split on whitespace, and a
        token is kept when it is not a stop word and is either pure CJK (two or
        more characters) or pure ASCII letters (three or more, lower-cased).

        Returns:
            List[str]: Keywords in order of appearance; duplicates are kept.
        """
        # Replace special characters with spaces
        cleaned = _NON_WORD_RE.sub(' ', text)

        keywords = []
        for word in cleaned.split():
            if len(word) < 2 or word.lower() in _STOP_WORDS:
                continue
            if _CJK_WORD_RE.match(word):
                keywords.append(word)
            elif len(word) >= 3 and _LATIN_WORD_RE.match(word):
                keywords.append(word.lower())

        return keywords

    @staticmethod
    def _parse_json_object(content: str, fallback_key: str) -> dict:
        """
        Pull the outermost {...} JSON object out of free-form model output.

        Returns the parsed object, or {fallback_key: content} when no braces are
        found or the enclosed text is not valid JSON.
        """
        start = content.find('{')
        end = content.rfind('}') + 1
        if start >= 0 and end > start:
            try:
                return json.loads(content[start:end])
            except json.JSONDecodeError:
                pass
        return {fallback_key: content}

    def _generate_ai_analysis(self, session) -> None:
        """
        Generate today's trend analysis with Ollama and store it.

        Does nothing when a 'daily_summary' analysis without category already
        exists for today, or when there are no records dated today. Commits the
        session after adding the analysis.
        """
        today = date.today()

        # Skip when today's analysis already exists
        existing = session.query(TrendAnalysis).filter(
            TrendAnalysis.analysis_date == today,
            TrendAnalysis.analysis_type == 'daily_summary',
            TrendAnalysis.category.is_(None)
        ).first()

        if existing:
            self.logger.info("今日已有分析報告,跳過生成")
            return

        # Top 50 records of today by popularity
        records = session.query(TrendRecord).filter(
            TrendRecord.trend_date == today
        ).order_by(TrendRecord.popularity_score.desc()).limit(50).all()

        if not records:
            return

        # Build the text that is handed to the model
        content_for_analysis = "\n".join(
            f"- [{r.source}] {r.title} (熱度:{r.popularity_score})"
            for r in records
        )

        # Ask Ollama for the analysis
        prompt = f"""請分析以下今日趨勢資料,提供行銷洞察:

{content_for_analysis}

請以 JSON 格式回覆,包含:
1. summary: 整體趨勢摘要 (100字內)
2. hot_keywords: 熱門關鍵字列表 (最多10個)
3. hot_topics: 熱門話題列表 (最多5個)
4. consumer_insights: 消費者洞察 (3-5點)
5. marketing_suggestions: 行銷建議 (3-5點)
6. copywriting_hints: 文案撰寫提示 (3-5個)"""

        response = self.ollama.generate(prompt, temperature=0.6, timeout=180)

        if not response.success:
            self.logger.error(f"AI 分析失敗: {response.error}")
            return

        # Fall back to storing the raw reply as the summary when it is not JSON
        analysis_data = self._parse_json_object(response.content, 'summary')

        def dump_list(key: str) -> str:
            return json.dumps(analysis_data.get(key, []), ensure_ascii=False)

        analysis = TrendAnalysis(
            analysis_date=today,
            category=None,  # all categories
            analysis_type='daily_summary',
            summary=analysis_data.get('summary', ''),
            hot_keywords=dump_list('hot_keywords'),
            hot_topics=dump_list('hot_topics'),
            consumer_insights=dump_list('consumer_insights'),
            marketing_suggestions=dump_list('marketing_suggestions'),
            copywriting_hints=dump_list('copywriting_hints'),
            record_count=len(records),
            model_used=response.model,
            generation_time=response.total_duration
        )
        session.add(analysis)
        session.commit()

        self.logger.info("AI 趨勢分析報告已生成")

    def web_search_with_cache(self, query: str, search_type: str = 'general',
                              cache_hours: int = 24) -> dict:
        """
        Run a web search through Ollama, backed by a database cache.

        Args:
            query: Search query.
            search_type: Search type.
            cache_hours: Cache lifetime in hours.

        Returns:
            dict: {'success': True, 'cached': bool, 'data': {...}} on success,
            or {'success': False, 'error': ...} on failure. Exceptions are
            logged and reported through the failure form, not raised.
        """
        # Hash that identifies this query / search type pair
        query_hash = WebSearchCache.generate_hash(query, search_type)

        session = get_session()
        try:
            # Look for a cache entry that has not expired yet
            cache = session.query(WebSearchCache).filter(
                WebSearchCache.query_hash == query_hash,
                WebSearchCache.expires_at > datetime.now()
            ).first()

            if cache:
                # Cache hit
                self.logger.info(f"Web Search 快取命中: {query}")
                return {
                    'success': True,
                    'cached': True,
                    'data': {
                        'query': cache.query,
                        'search_type': cache.search_type,
                        'result': json.loads(cache.result_json) if cache.result_json else None,
                        'summary': cache.summary,
                        'model': cache.model_used,
                        'duration': 0.0
                    }
                }

            # Cache miss: run the search
            response = self.ollama.web_search(query, search_type=search_type)

            if not response.success:
                return {
                    'success': False,
                    'error': response.error
                }

            # Parse the reply; non-JSON replies are kept under 'raw'
            parsed = self._parse_json_object(response.content, 'raw')

            # The model may return something other than a list under 'results';
            # count only real lists so a successful search is not turned into an error.
            found = parsed.get('results', [])
            result_count = len(found) if isinstance(found, list) else 0

            # Store the cache entry
            session.add(WebSearchCache(
                query_hash=query_hash,
                query=query,
                search_type=search_type,
                result_json=json.dumps(parsed, ensure_ascii=False),
                summary=parsed.get('summary', ''),
                result_count=result_count,
                model_used=response.model,
                generation_time=response.total_duration,
                expires_at=datetime.now() + timedelta(hours=cache_hours)
            ))
            session.commit()

            return {
                'success': True,
                'cached': False,
                'data': {
                    'query': query,
                    'search_type': search_type,
                    'result': parsed,
                    'summary': parsed.get('summary', ''),
                    'model': response.model,
                    'duration': response.total_duration
                }
            }

        except Exception as e:
            self.logger.error(f"Web Search 失敗: {e}")
            return {
                'success': False,
                'error': str(e)
            }
        finally:
            session.close()

    def search_trends_for_category(self, category: str) -> dict:
        """
        Search trend information for one product category.

        Args:
            category: Product category. Unknown categories fall back to a single
                "<category>趨勢" query.

        Returns:
            dict: {'category', 'search_results', 'timestamp'}; only successful
            searches appear in 'search_results'.
        """
        queries = {
            '美妝': ['2026美妝趨勢', 'PTT美妝推薦', '韓系彩妝新品'],
            '3C': ['2026 3C新品', 'PTT 3C開箱', '科技產品評測'],
            '服飾': ['2026服飾流行', '穿搭趨勢', 'Dcard穿搭版'],
            '居家': ['居家好物推薦', '收納神器', '家電開箱'],
            '電商': ['電商優惠活動', '購物節', '限時特賣'],
        }

        category_queries = queries.get(category, [f'{category}趨勢'])
        results = []

        for q in category_queries:
            result = self.web_search_with_cache(q, search_type='trends')
            if result['success']:
                results.append({
                    'query': q,
                    'data': result['data']
                })

        return {
            'category': category,
            'search_results': results,
            'timestamp': datetime.now().isoformat()
        }

    def get_trend_stats(self, days: int = 7) -> dict:
        """
        Return trend statistics for records dated within the last *days* days.

        Args:
            days: How many days back from today the window starts.

        Returns:
            dict: On success {'success': True, 'date_range', 'source_stats',
            'category_stats', 'daily_counts'}; on failure
            {'success': False, 'error': ...}.
        """
        from sqlalchemy import func

        date_from = date.today() - timedelta(days=days)

        session = get_session()
        try:
            # Per-source statistics
            source_stats = session.query(
                TrendRecord.source,
                func.count(TrendRecord.id).label('count'),
                func.avg(TrendRecord.popularity_score).label('avg_popularity')
            ).filter(
                TrendRecord.trend_date >= date_from
            ).group_by(TrendRecord.source).all()

            # Per-category statistics
            category_stats = session.query(
                TrendRecord.category,
                func.count(TrendRecord.id).label('count')
            ).filter(
                TrendRecord.trend_date >= date_from,
                TrendRecord.category.isnot(None)
            ).group_by(TrendRecord.category).all()

            # Number of records per day
            daily_counts = session.query(
                TrendRecord.trend_date,
                func.count(TrendRecord.id).label('count')
            ).filter(
                TrendRecord.trend_date >= date_from
            ).group_by(TrendRecord.trend_date).order_by(TrendRecord.trend_date).all()

            return {
                'success': True,
                'date_range': {
                    'from': date_from.isoformat(),
                    'to': date.today().isoformat()
                },
                'source_stats': [
                    {
                        'source': s.source,
                        'count': s.count,
                        # float() first: some drivers return Decimal for AVG, and
                        # round(Decimal, 1) stays a Decimal, which json cannot encode.
                        'avg_popularity': round(float(s.avg_popularity or 0), 1)
                    }
                    for s in source_stats
                ],
                'category_stats': [
                    {'category': c.category, 'count': c.count}
                    for c in category_stats
                ],
                'daily_counts': [
                    {'date': d.trend_date.isoformat(), 'count': d.count}
                    for d in daily_counts
                ]
            }

        except Exception as e:
            self.logger.error(f"取得趨勢統計失敗: {e}")
            return {
                'success': False,
                'error': str(e)
            }
        finally:
            session.close()


# Module-wide service instance, created on first use
_service_instance = None


def get_trend_crawler_service() -> TrendCrawlerService:
    """Return the shared TrendCrawlerService instance, creating it on first call."""
    global _service_instance
    if _service_instance is None:
        _service_instance = TrendCrawlerService()
    return _service_instance