Files
ewoooc/routes/ai_routes.py
ogt 06418878e0
All checks were successful
CD Pipeline / deploy (push) Successful in 1m11s
feat: add momo review candidate queue
2026-06-24 13:09:56 +08:00

2899 lines
110 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
"""
AI 推薦路由模組
提供時事熱點商品推薦與文案生成功能
支援 Ollama-first AI 生成Gemini 僅作 Ollama 失敗備援
"""
from flask import Blueprint, render_template, request, jsonify, session
from auth import login_required
from services.ollama_service import OllamaService
from services.gemini_service import GeminiService, AVAILABLE_GEMINI_MODELS, GEMINI_PRICING
from services.ai_provider import AIProviderService, ai_provider_service, get_ai_status, set_ai_provider
from services.trend_crawler import TrendCrawler
from services.ai_history_service import AIHistoryService, AITemplateService
from database.manager import DatabaseManager
from database.ai_models import AIUsageTracking
from services.pchome_growth_cache_state import get_pchome_growth_cache_epoch
import pandas as pd
import logging
import json
import os
import time
from datetime import datetime, date, timedelta
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Path of the categories JSON file: <parent of this file's directory>/data/categories.json
CATEGORIES_JSON_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'categories.json')
def load_product_categories():
    """Load the product category names from the categories JSON file.

    The loader is best-effort: any problem with the file yields an empty
    list instead of an exception, so the page that calls it still renders.

    Returns:
        list: the ``name`` value of every well-formed entry found in
        ``CATEGORIES_JSON_PATH``. Empty when the file is missing, unreadable,
        not valid JSON, or does not contain a JSON array.
    """
    try:
        with open(CATEGORIES_JSON_PATH, 'r', encoding='utf-8') as f:
            categories = json.load(f)
    except FileNotFoundError:
        # A missing file was already handled silently by the original code.
        return []
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(f"載入商品分類失敗: {e}")
        return []
    if not isinstance(categories, list):
        logger.warning("載入商品分類失敗: categories.json 不是陣列")
        return []
    # Skip malformed entries rather than failing the whole list on one KeyError.
    return [c['name'] for c in categories if isinstance(c, dict) and 'name' in c]
# Flask blueprint on which the routes of this module are registered.
ai_bp = Blueprint('ai', __name__)
# Module-level cache slots. Each holds an expiry timestamp and a payload; the
# code that reads/writes them is not visible in this part of the file.
# NOTE(review): the *_TTL_SECONDS values are presumably lifetimes in seconds — confirm at the use sites.
_PCHOME_REVALIDATION_PREVIEW_CACHE = {
    "expires_at": 0.0,
    "payload": None,
}
_PCHOME_REVALIDATION_PREVIEW_TTL_SECONDS = 60
_PCHOME_STALE_RECOVERY_PREVIEW_CACHE = {
    "expires_at": 0.0,
    "payload": None,
}
_PCHOME_STALE_RECOVERY_PREVIEW_TTL_SECONDS = 60
_ICAIM_DASHBOARD_CACHE = {
    "expires_at": 0.0,
    "cached_at": 0.0,
    "payload": None,
}
_ICAIM_DASHBOARD_TTL_SECONDS = 120
_ICAIM_DASHBOARD_STALE_TTL_SECONDS = 900
# NOTE(review): presumably a minimum match score and a DB statement timeout
# in milliseconds, judging by the names — confirm at the use sites.
_ICAIM_MATCH_SCORE_FLOOR = 0.76
_ICAIM_DB_STATEMENT_TIMEOUT_MS = 5000
_PCHOME_GROWTH_CACHE = {
    "expires_at": 0.0,
    "epoch": 0.0,
    "payload": None,
}
_PCHOME_GROWTH_TTL_SECONDS = 120
# Service instances shared by the route handlers below (created at import time).
ollama_service = OllamaService()
gemini_service = GeminiService()
trend_crawler = TrendCrawler()
ai_history_service = AIHistoryService()
ai_template_service = AITemplateService()
def _safe_primary_provider(provider: str | None) -> str:
normalized = (provider or 'ollama').strip().lower()
if normalized == 'gemini':
return 'ollama'
if normalized in ('ollama', 'elephant'):
return normalized
return 'ollama'
def _safe_recommended_provider(provider: str | None) -> str:
    """Return the provider name to show as "recommended" on first render.

    Gemini must never be presented as the primary provider on the first
    screen, so the value is passed through ``_safe_primary_provider`` first.
    ``'none'`` is returned if the normalized value is not an allowed primary.
    """
    primary = _safe_primary_provider(provider)
    if primary not in ('ollama', 'elephant'):
        return 'none'
    return primary
def _get_ai_status_for_initial_render():
    """Build the AI status snapshot used for the first page render.

    No synchronous network health check is performed here. If the provider
    service holds a cached status under ``_status_cache['data']``, a shallow
    copy of it is returned with the default/recommended providers sanitized.
    Otherwise a placeholder structure is returned in which every provider's
    ``connected`` flag is ``None``.
    """
    cache = getattr(ai_provider_service, '_status_cache', {}) or {}
    snapshot = cache.get('data')
    if snapshot:
        # Copy so the sanitized values never leak back into the shared cache.
        snapshot = dict(snapshot)
        safe_default = _safe_primary_provider(snapshot.get('default_provider'))
        snapshot['default_provider'] = safe_default
        snapshot['recommended_provider'] = _safe_recommended_provider(
            snapshot.get('recommended_provider') or safe_default
        )
        return snapshot

    def _provider_entry(model, models, kind, cost):
        # Placeholder entry: connectivity is unknown until the client refreshes.
        return {
            'connected': None,
            'model': model,
            'available_models': models,
            'type': kind,
            'cost': cost,
        }

    fallback_model = getattr(ollama_service, 'model', None) or 'gemma3:4b'
    fallback_provider = _safe_primary_provider(
        getattr(ai_provider_service, 'default_provider', 'ollama')
    )
    return {
        'default_provider': fallback_provider,
        'ollama': _provider_entry(fallback_model, [fallback_model], 'local', 'free'),
        'gemini': _provider_entry(None, AVAILABLE_GEMINI_MODELS, 'cloud', 'paid'),
        'elephant': _provider_entry(None, [], 'cloud', 'efficient'),
        'recommended_provider': _safe_recommended_provider(fallback_provider),
        'timestamp': None,
    }
@ai_bp.route('/ai_recommend')
@login_required
def ai_recommend():
    """Render the AI recommendation page.

    The first render must not be blocked by an AI health check, so only the
    status snapshot is used here; the front end refreshes the real status
    after the page has loaded.
    """
    status = _get_ai_status_for_initial_render()
    # Product categories come from the product board configuration file.
    categories = load_product_categories()
    context = {
        'active_page': 'ai_recommend',
        'ai_status': status,
        'ollama_status': status['ollama']['connected'],
        'gemini_status': status['gemini']['connected'],
        'available_models': status['ollama']['available_models'],
        'gemini_models': AVAILABLE_GEMINI_MODELS,
        'default_provider': status['default_provider'],
        'product_categories': categories,
    }
    return render_template('ai_recommend.html', **context)
@ai_bp.route('/api/ai/status')
@login_required
def api_ai_status():
    """Return the AI provider status.

    Query args:
        refresh: the string ``true`` (case-insensitive) forces a refresh.
    """
    refresh_flag = request.args.get('refresh', 'false')
    status = get_ai_status(force_refresh=(refresh_flag.lower() == 'true'))
    return jsonify({'success': True, 'data': status})
@ai_bp.route('/api/ai/set_provider', methods=['POST'])
@login_required
def api_set_provider():
    """Switch the default AI provider.

    Request body (JSON object):
        provider: provider name; defaults to ``ollama`` when missing or empty.

    Only ``ollama`` is accepted. ``gemini`` is rejected with a dedicated
    message because it may only serve as a fallback. Responds 400 for an
    invalid body or provider and 500 when the switch itself fails.
    """
    try:
        # silent=True: a missing or malformed body yields None instead of raising,
        # so it can be reported as a client error rather than a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return jsonify({'success': False, 'error': '請提供 JSON 格式的請求內容'}), 400
        # str() guards against non-string values such as numbers.
        provider = str(payload.get('provider') or 'ollama').strip().lower()
        if provider == 'gemini':
            return jsonify({
                'success': False,
                'error': 'Gemini 僅可作為 Ollama 失敗備援,不可設為預設提供者'
            }), 400
        if provider not in ('ollama',):
            return jsonify({'success': False, 'error': '無效的提供者,請使用 Ollama 主路徑'}), 400
        if not set_ai_provider(provider):
            return jsonify({'success': False, 'error': '切換失敗'}), 500
        logger.info("AI 提供者已切換至 Ollama 主路徑")
        return jsonify({
            'success': True,
            'message': '已切換至 Ollama 主路徑',
            'provider': provider
        })
    except Exception as e:
        logger.error(f"切換 AI 提供者失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/gemini_usage')
@login_required
def api_gemini_usage():
    """Return Gemini API usage and cost statistics.

    Query args:
        days: size of the look-back window in days (default 30, capped at 365).

    The response contains the period, overall totals, a per-model breakdown,
    a per-day cost/request trend and the ``GEMINI_PRICING`` table.
    """
    try:
        days = min(request.args.get('days', 30, type=int), 365)
        db_session = DatabaseManager().get_session()
        try:
            # Date window, inclusive on both ends.
            end_date = date.today()
            start_date = end_date - timedelta(days=days)
            from sqlalchemy import func
            # Same filter for both queries: Gemini rows inside the window.
            gemini_in_window = (
                AIUsageTracking.provider == 'gemini',
                AIUsageTracking.request_date >= start_date,
                AIUsageTracking.request_date <= end_date,
            )
            per_model_rows = db_session.query(
                AIUsageTracking.model_name,
                func.sum(AIUsageTracking.input_tokens).label('total_input_tokens'),
                func.sum(AIUsageTracking.output_tokens).label('total_output_tokens'),
                func.sum(AIUsageTracking.total_tokens).label('total_tokens'),
                func.sum(AIUsageTracking.total_cost).label('total_cost'),
                func.count(AIUsageTracking.id).label('request_count')
            ).filter(*gemini_in_window).group_by(AIUsageTracking.model_name).all()
            # Per-model breakdown plus running grand totals.
            total_cost = 0.0
            total_tokens = 0
            total_requests = 0
            by_model = []
            for row in per_model_rows:
                entry = {
                    'model': row.model_name,
                    'input_tokens': row.total_input_tokens or 0,
                    'output_tokens': row.total_output_tokens or 0,
                    'total_tokens': row.total_tokens or 0,
                    'total_cost': float(row.total_cost or 0),
                    'request_count': row.request_count or 0
                }
                by_model.append(entry)
                total_cost += entry['total_cost']
                total_tokens += entry['total_tokens']
                total_requests += entry['request_count']
            # Daily usage trend, oldest day first.
            per_day_rows = db_session.query(
                AIUsageTracking.request_date,
                func.sum(AIUsageTracking.total_cost).label('daily_cost'),
                func.count(AIUsageTracking.id).label('daily_requests')
            ).filter(*gemini_in_window).group_by(
                AIUsageTracking.request_date
            ).order_by(AIUsageTracking.request_date).all()
            daily_trend = []
            for row in per_day_rows:
                daily_trend.append({
                    'date': row.request_date.isoformat(),
                    'cost': float(row.daily_cost or 0),
                    'requests': row.daily_requests or 0
                })
            return jsonify({
                'success': True,
                'data': {
                    'period': {
                        'start': start_date.isoformat(),
                        'end': end_date.isoformat(),
                        'days': days
                    },
                    'summary': {
                        'total_cost_usd': round(total_cost, 6),
                        'total_tokens': total_tokens,
                        'total_requests': total_requests
                    },
                    'by_model': by_model,
                    'daily_trend': daily_trend,
                    'pricing': GEMINI_PRICING
                }
            })
        finally:
            db_session.close()
    except Exception as e:
        logger.error(f"取得 Gemini 使用量失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/trends')
@login_required
def api_get_trends():
    """Return aggregated trend data in a JSON-serializable form.

    Query args:
        categories: repeatable; defaults to three built-in categories.
        location: weather location (default ``臺北市``).
        time_range: ``day``, ``week`` or ``month``; anything else becomes ``week``.
        include_social: the string ``true`` (case-insensitive) enables social posts.
    """
    try:
        args = request.args
        categories = args.getlist('categories') or ['時尚美妝', '生活居家', '健康保健']
        location = args.get('location', '臺北市')
        time_range = args.get('time_range', 'week')
        include_social = args.get('include_social', 'true').lower() == 'true'
        # Fall back to the default window on an unknown value.
        if time_range not in ('day', 'week', 'month'):
            time_range = 'week'
        trends = trend_crawler.get_all_trends(
            categories=categories,
            weather_location=location,
            time_range=time_range,
            include_social=include_social
        )

        def _news_entry(item):
            return {
                'title': item.title,
                'link': item.link,
                'source': item.source,
                'category': item.category,
                'published': item.published.isoformat()
            }

        def _video_entry(video):
            return {
                'title': video.title,
                'video_id': video.video_id,
                'url': video.url,
                'channel': video.channel_title,
                'category': video.category,
                'thumbnail': video.thumbnail_url,
                'published': video.published.isoformat()
            }

        def _post_entry(post):
            return {
                'title': post.title,
                'link': post.link,
                'source': post.source,
                'board': post.board,
                'category': post.category,
                'likes': post.likes,
                'comments': post.comments,
                'published': post.published.isoformat()
            }

        # Convert to a serializable structure; list sizes are capped.
        result = {
            'timestamp': trends.timestamp.isoformat(),
            'time_range': time_range,
            'news': [_news_entry(n) for n in trends.news_items[:30]],
            'youtube': [_video_entry(v) for v in trends.youtube_videos[:20]],
            'social': [_post_entry(p) for p in trends.social_posts[:30]],
            'weather': None,
            'keywords': trends.keywords[:15],
            'category_trends': {
                name: titles[:5]
                for name, titles in trends.category_trends.items()
            }
        }
        forecast = trends.weather
        if forecast:
            result['weather'] = {
                'location': forecast.location,
                'date': forecast.date,
                'description': forecast.weather_description,
                'min_temp': forecast.min_temp,
                'max_temp': forecast.max_temp,
                'rain_probability': forecast.rain_probability,
                'humidity': forecast.humidity,
                'marketing_suggestions': forecast.marketing_suggestions
            }
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        logger.error(f"獲取趨勢資料失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/weather')
@login_required
def api_get_weather():
    """Return the weather information for one location.

    Query args:
        location: location name (default ``臺北市``).

    Responds 500 when no weather data could be fetched.
    """
    try:
        forecast = trend_crawler.fetch_weather(request.args.get('location', '臺北市'))
        if not forecast:
            return jsonify({'success': False, 'error': '無法獲取天氣資訊'}), 500
        payload = {
            'location': forecast.location,
            'date': forecast.date,
            'description': forecast.weather_description,
            'min_temp': forecast.min_temp,
            'max_temp': forecast.max_temp,
            'rain_probability': forecast.rain_probability,
            'humidity': forecast.humidity,
            'marketing_suggestions': forecast.marketing_suggestions
        }
        return jsonify({'success': True, 'data': payload})
    except Exception as e:
        logger.error(f"獲取天氣失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/generate_copy', methods=['POST'])
@login_required
def api_generate_copy():
    """Generate sales copy for one product through the unified provider service.

    Request body (JSON object):
        product_name: required.
        trend_keywords, style, model, provider: optional generation inputs.
        upcoming_holidays, bestseller_products: optional extra context.
        save_to_history: store the result in the AI history (default true).

    Responds 400 when ``product_name`` is missing, which now also covers a
    missing or malformed JSON body, and 500 when generation fails.
    """
    try:
        # silent=True + dict check: a missing, malformed, or non-object body
        # becomes {} so it hits the 400 validation below instead of a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        product_name = payload.get('product_name', '')
        trend_keywords = payload.get('trend_keywords', [])
        style = payload.get('style', '吸睛')
        model = payload.get('model', None)
        # Per the original note: 'gemini' is treated as fallback-only and is
        # not called directly as the primary provider.
        provider = payload.get('provider', None)
        save_to_history = payload.get('save_to_history', True)
        # Additional context information.
        upcoming_holidays = payload.get('upcoming_holidays', [])
        bestseller_products = payload.get('bestseller_products', [])
        if not product_name:
            return jsonify({'success': False, 'error': '請提供商品名稱'}), 400
        result = ai_provider_service.generate_sales_copy(
            product_name=product_name,
            provider=provider,
            model=model,
            trend_keywords=trend_keywords,
            style=style,
            upcoming_holidays=upcoming_holidays,
            bestseller_products=bestseller_products
        )
        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500
        history_id = None
        user_id = session.get('user_id')
        if save_to_history:
            history_id = ai_history_service.save_generation(
                generation_type='copy',
                output_content=result.content,
                product_name=product_name,
                input_keywords=trend_keywords,
                input_style=style,
                model_name=result.model,
                generation_duration=result.total_duration,
                created_by=user_id,
                ai_provider=result.provider,
                input_tokens=result.input_tokens,
                output_tokens=result.output_tokens
            )
        used_gemini = result.provider == 'gemini'
        # Only Gemini calls are tracked in the usage table.
        if used_gemini and result.total_tokens > 0:
            _save_gemini_usage(
                model_name=result.model,
                usage_type='copy',
                input_tokens=result.input_tokens,
                output_tokens=result.output_tokens,
                total_cost=result.total_cost,
                duration=result.total_duration,
                user_id=user_id,
                history_id=history_id
            )
        return jsonify({
            'success': True,
            'data': {
                'copy': result.content,
                'model': result.model,
                'provider': result.provider,
                'duration': round(result.total_duration, 2) if result.total_duration else None,
                'history_id': history_id,
                'tokens': {
                    'input': result.input_tokens,
                    'output': result.output_tokens,
                    'total': result.total_tokens
                },
                'cost': {
                    'input': result.input_cost,
                    'output': result.output_cost,
                    'total': result.total_cost
                } if used_gemini else None
            }
        })
    except Exception as e:
        logger.error(f"生成文案失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
def _save_gemini_usage(model_name: str, usage_type: str, input_tokens: int,
                       output_tokens: int, total_cost: float, duration: float,
                       user_id: int = None, history_id: int = None):
    """Persist one Gemini API usage record.

    Input and output cost are derived via ``GeminiService.calculate_cost``;
    ``total_cost`` is stored as passed in. Any failure is logged and
    swallowed, so this never raises to the caller.
    """
    try:
        db_session = DatabaseManager().get_session()
        try:
            input_cost = GeminiService.calculate_cost(model_name, input_tokens, 0)['input_cost']
            output_cost = GeminiService.calculate_cost(model_name, 0, output_tokens)['output_cost']
            record = AIUsageTracking(
                provider='gemini',
                model_name=model_name,
                usage_type=usage_type,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                total_tokens=input_tokens + output_tokens,
                input_cost=input_cost,
                output_cost=output_cost,
                total_cost=total_cost,
                duration=duration,
                request_date=date.today(),
                created_by=user_id,
                history_id=history_id
            )
            db_session.add(record)
            db_session.commit()
            logger.info(f"Gemini 使用記錄已儲存: {model_name}, ${total_cost:.6f}")
        finally:
            db_session.close()
    except Exception as e:
        logger.error(f"儲存 Gemini 使用記錄失敗: {e}")
@ai_bp.route('/api/ai/recommend_products', methods=['POST'])
@login_required
def api_recommend_products():
    """Recommend products that match a trend topic.

    Request body (JSON object):
        trend_topic: required.
        trend_description: optional.

    Up to 100 products are read from the ``products`` table, falling back to
    ``daily_sales_snapshot`` when that query fails. They are then matched
    against the trend by the Ollama service. When the model output cannot be
    parsed as JSON the raw text is returned under ``raw_response``.
    """
    try:
        # A missing, malformed, or non-object body becomes {} so the request
        # is rejected by the 400 validation below instead of a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        trend_topic = payload.get('trend_topic', '')
        trend_description = payload.get('trend_description', '')
        if not trend_topic:
            return jsonify({'success': False, 'error': '請提供趨勢話題'}), 400
        engine = DatabaseManager().engine
        # Table names are fixed constants below, never user input.
        product_query = """
            SELECT DISTINCT
                商品名稱 as name,
                商品分類L1 as category_l1,
                商品分類L2 as category_l2,
                品牌 as brand
            FROM {table}
            WHERE 商品名稱 IS NOT NULL
            LIMIT 100
        """
        try:
            products = pd.read_sql(product_query.format(table='products'), engine).to_dict('records')
        except Exception as e:
            # If the products table is unavailable, try daily_sales_snapshot.
            logger.warning(f"products 表查詢失敗: {e}, 嘗試 daily_sales_snapshot")
            try:
                products = pd.read_sql(
                    product_query.format(table='daily_sales_snapshot'), engine
                ).to_dict('records')
            except Exception as e2:
                logger.error(f"查詢商品失敗: {e2}")
                products = []
        if not products:
            return jsonify({'success': False, 'error': '沒有可用的商品資料'}), 400
        # Let the LLM match products to the trend.
        result = ollama_service.match_products_to_trend(
            trend_topic=trend_topic,
            trend_description=trend_description,
            products=[
                {
                    'name': p.get('name', ''),
                    'category': f"{p.get('category_l1', '')} > {p.get('category_l2', '')}".strip(' > '),
                    'brand': p.get('brand', '')
                }
                for p in products
            ]
        )
        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500
        content = result.content
        try:
            # Extract the outermost {...} span from the model output.
            recommendations = json.loads(content[content.find('{'):content.rfind('}') + 1])
        except json.JSONDecodeError:
            # Unparseable output: hand back the raw content.
            return jsonify({
                'success': True,
                'data': {
                    'raw_response': result.content,
                    'model': result.model
                }
            })
        return jsonify({
            'success': True,
            'data': {
                'recommendations': recommendations.get('recommendations', []),
                'model': result.model,
                'duration': round(result.total_duration, 2) if result.total_duration else None
            }
        })
    except Exception as e:
        logger.error(f"推薦商品失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/analyze_weather_products', methods=['POST'])
@login_required
def api_analyze_weather_products():
    """Analyze which product types suit the current weather.

    Request body (JSON object, optional):
        location: weather location (default ``臺北市``).

    The weather is fetched, then the Ollama service is asked for a JSON
    analysis. When the model output cannot be parsed as JSON the raw text is
    returned under ``raw_response`` next to the weather data.
    """
    try:
        # Every field is optional, so a missing or malformed body falls back
        # to the defaults instead of failing with a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        location = payload.get('location', '臺北市')
        weather = trend_crawler.fetch_weather(location)
        if not weather:
            return jsonify({'success': False, 'error': '無法獲取天氣資訊'}), 500
        # Prompt context; the closing parenthesis after the location was
        # missing in the original text and is restored here.
        weather_context = f"""
今日天氣資訊({weather.location})
- 天氣狀況:{weather.weather_description}
- 溫度:{weather.min_temp}°C ~ {weather.max_temp}°C
- 降雨機率:{weather.rain_probability}%
- 濕度:{weather.humidity}%
"""
        system_prompt = """你是一位電商行銷專家,擅長根據天氣狀況推薦適合的商品類型。
請根據天氣資訊,分析今天適合推薦哪些類型的商品,並說明原因。"""
        prompt = f"""{weather_context}
請分析今天的天氣狀況,推薦適合銷售的商品類型。
請用以下 JSON 格式回覆:
{{
"weather_summary": "天氣簡述",
"recommended_categories": [
{{"category": "商品類型", "reason": "推薦原因", "keywords": ["關鍵字1", "關鍵字2"]}}
],
"marketing_tips": ["行銷建議1", "行銷建議2"]
}}"""
        result = ollama_service.generate(prompt, system_prompt=system_prompt, temperature=0.5)
        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500
        # The weather section is identical for parsed and raw responses.
        weather_payload = {
            'location': weather.location,
            'description': weather.weather_description,
            'temp_range': f"{weather.min_temp}°C ~ {weather.max_temp}°C",
            'rain_probability': weather.rain_probability,
            'humidity': weather.humidity,
            'builtin_suggestions': weather.marketing_suggestions
        }
        content = result.content
        try:
            analysis = json.loads(content[content.find('{'):content.rfind('}') + 1])
        except json.JSONDecodeError:
            return jsonify({
                'success': True,
                'data': {
                    'weather': weather_payload,
                    'raw_response': result.content
                }
            })
        return jsonify({
            'success': True,
            'data': {
                'weather': weather_payload,
                'analysis': analysis,
                'model': result.model
            }
        })
    except Exception as e:
        logger.error(f"天氣商品分析失敗:{e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/batch_generate_copy', methods=['POST'])
@login_required
def api_batch_generate_copy():
    """Generate sales copy for several products in one request.

    Request body (JSON object):
        products: list of ``{"name": ..., "keywords": [...]}``, 1 to 10 items.
        style: copy style (default ``吸睛``).

    Each product gets its own result entry; a failing product does not abort
    the batch. Responds 400 for an empty, missing, or oversized product list.
    """
    try:
        # A missing, malformed, or non-object body becomes {} so it is
        # rejected by the 400 validation below instead of a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        products = payload.get('products', [])
        style = payload.get('style', '吸睛')
        if not products or not isinstance(products, list):
            return jsonify({'success': False, 'error': '請提供商品列表'}), 400
        if len(products) > 10:
            return jsonify({'success': False, 'error': '一次最多處理 10 個商品'}), 400
        results = []
        for product in products:
            # Treat a non-object entry as an empty product instead of crashing.
            if not isinstance(product, dict):
                product = {}
            name = product.get('name', '')
            result = ollama_service.generate_sales_copy(
                product_name=name,
                trend_keywords=product.get('keywords', []),
                style=style
            )
            results.append({
                'product_name': name,
                'success': result.success,
                'copy': result.content if result.success else None,
                'error': result.error if not result.success else None
            })
        success_count = sum(1 for r in results if r['success'])
        return jsonify({
            'success': True,
            'data': {
                'results': results,
                'total': len(results),
                'success_count': success_count
            }
        })
    except Exception as e:
        logger.error(f"批次生成文案失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# ===== AI 歷史記錄 CRUD API =====
@ai_bp.route('/ai_history')
@login_required
def ai_history_page():
    """Render the AI generation history page."""
    template_name = 'ai_history.html'
    return render_template(template_name, active_page='ai_history')
@ai_bp.route('/api/ai/history')
@login_required
def api_get_history():
    """Return a page of the AI generation history.

    Query args:
        page, per_page: pagination (defaults 1 and 20).
        type: generation type filter.
        search: search text.
        favorite: the string ``true`` (case-insensitive) means favorites only;
            any other supplied value means non-favorites.
        start_date, end_date: ISO-format dates bounding the result.

    Responds 400 when a date cannot be parsed.
    """
    try:
        args = request.args
        page = args.get('page', 1, type=int)
        per_page = args.get('per_page', 20, type=int)
        generation_type = args.get('type')
        search = args.get('search')
        is_favorite = args.get('favorite')
        start_date = args.get('start_date')
        end_date = args.get('end_date')
        # Convert the flag to a boolean only when it was supplied.
        if is_favorite is not None:
            is_favorite = is_favorite.lower() == 'true'
        # A malformed date is a client error, not a server failure.
        # `datetime` is the class imported at module level.
        try:
            if start_date:
                start_date = datetime.fromisoformat(start_date)
            if end_date:
                end_date = datetime.fromisoformat(end_date)
        except ValueError:
            return jsonify({'success': False, 'error': '日期格式錯誤,請使用 ISO 格式(例如 2024-01-31)'}), 400
        result = ai_history_service.get_history_list(
            page=page,
            per_page=per_page,
            generation_type=generation_type,
            search=search,
            is_favorite=is_favorite,
            start_date=start_date,
            end_date=end_date
        )
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        logger.error(f"獲取 AI 歷史記錄失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/history/<int:history_id>')
@login_required
def api_get_history_detail(history_id):
    """Return a single history record, or 404 when it does not exist."""
    try:
        record = ai_history_service.get_by_id(history_id)
        if not record:
            return jsonify({'success': False, 'error': '記錄不存在'}), 404
        return jsonify({'success': True, 'data': record})
    except Exception as e:
        logger.error(f"獲取 AI 歷史記錄詳情失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/history/<int:history_id>', methods=['PUT'])
@login_required
def api_update_history(history_id):
    """Update a history record.

    Request body (JSON object, all keys optional):
        output_content, rating, is_favorite, is_used, notes.

    Responds 400 when the body is not a JSON object and 404 when the service
    reports that the update did not succeed.
    """
    try:
        # silent=True: a missing or malformed body yields None instead of raising,
        # so it can be reported as a client error rather than a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return jsonify({'success': False, 'error': '請提供 JSON 格式的更新資料'}), 400
        updated = ai_history_service.update(
            history_id=history_id,
            output_content=payload.get('output_content'),
            rating=payload.get('rating'),
            is_favorite=payload.get('is_favorite'),
            is_used=payload.get('is_used'),
            notes=payload.get('notes')
        )
        if not updated:
            return jsonify({'success': False, 'error': '記錄不存在或更新失敗'}), 404
        return jsonify({'success': True, 'message': '更新成功'})
    except Exception as e:
        logger.error(f"更新 AI 歷史記錄失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/history/<int:history_id>', methods=['DELETE'])
@login_required
def api_delete_history(history_id):
    """Delete a history record; 404 when the service reports no deletion."""
    try:
        deleted = ai_history_service.delete(history_id)
        if not deleted:
            return jsonify({'success': False, 'error': '記錄不存在或刪除失敗'}), 404
        return jsonify({'success': True, 'message': '刪除成功'})
    except Exception as e:
        logger.error(f"刪除 AI 歷史記錄失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/history/batch', methods=['DELETE'])
@login_required
def api_batch_delete_history():
    """Delete several history records at once.

    Request body (JSON object):
        ids: non-empty list of history ids.

    Responds 400 when ``ids`` is missing or empty, which now also covers a
    missing or malformed JSON body.
    """
    try:
        # A missing, malformed, or non-object body becomes {} so it is
        # rejected by the 400 validation below instead of a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        ids = payload.get('ids', [])
        if not ids:
            return jsonify({'success': False, 'error': '請提供要刪除的 ID 列表'}), 400
        result = ai_history_service.batch_delete(ids)
        return jsonify({
            'success': True,
            'data': result
        })
    except Exception as e:
        logger.error(f"批次刪除 AI 歷史記錄失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/history/<int:history_id>/favorite', methods=['POST'])
@login_required
def api_toggle_favorite(history_id):
    """Toggle the favorite flag of a history record.

    Returns the new flag value, or 404 when the service returns ``None``.
    """
    try:
        favorite_now = ai_history_service.toggle_favorite(history_id)
        if favorite_now is None:
            return jsonify({'success': False, 'error': '記錄不存在'}), 404
        return jsonify({
            'success': True,
            'data': {'is_favorite': favorite_now}
        })
    except Exception as e:
        logger.error(f"切換收藏狀態失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/statistics')
@login_required
def api_get_statistics():
    """Return AI usage statistics.

    Query args:
        days: look-back window passed to the history service (default 30).
    """
    try:
        window_days = request.args.get('days', 30, type=int)
        statistics = ai_history_service.get_statistics(days=window_days)
        return jsonify({'success': True, 'data': statistics})
    except Exception as e:
        logger.error(f"獲取 AI 統計資料失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# ===== AI 模板 API =====
@ai_bp.route('/api/ai/templates')
@login_required
def api_get_templates():
    """Return the list of AI prompt templates.

    Query args:
        type: optional template type filter.
    """
    try:
        wanted_type = request.args.get('type')
        template_list = ai_template_service.get_all_templates(template_type=wanted_type)
        return jsonify({'success': True, 'data': template_list})
    except Exception as e:
        logger.error(f"獲取 AI 模板列表失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/templates/<int:template_id>')
@login_required
def api_get_template(template_id):
    """Return a single prompt template, or 404 when it does not exist."""
    try:
        found = ai_template_service.get_by_id(template_id)
        if not found:
            return jsonify({'success': False, 'error': '模板不存在'}), 404
        return jsonify({'success': True, 'data': found})
    except Exception as e:
        logger.error(f"獲取 AI 模板失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# ===== 熱銷商品 API =====
@ai_bp.route('/api/ai/bestsellers')
@login_required
def api_get_bestsellers():
    """Return best-selling products from PChome or MOMO.

    Query args:
        category: product category (default ``面膜``).
        limit: number of products (default 5, capped at 10).
        platform: ``momo`` selects MOMO; any other value uses PChome.
    """
    try:
        category = request.args.get('category', '面膜')
        # At most 10 products per request.
        limit = min(request.args.get('limit', 5, type=int), 10)
        platform = request.args.get('platform', 'pchome').lower()
        # Crawlers are imported lazily, only for the selected platform.
        if platform == 'momo':
            from services.momo_crawler import get_momo_bestsellers as fetch_bestsellers
            source = 'MOMO購物網'
        else:
            from services.pchome_crawler import get_pchome_bestsellers as fetch_bestsellers
            source = 'PChome 24h'
        ok, message, products = fetch_bestsellers(category, limit=limit)
        if not ok:
            return jsonify({'success': False, 'error': message}), 500
        return jsonify({
            'success': True,
            'data': {
                'category': category,
                'products': products,
                'source': source,
                'platform': platform
            }
        })
    except Exception as e:
        logger.error(f"取得熱銷商品失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/sales_suggestion', methods=['POST'])
@login_required
def api_generate_sales_suggestion():
    """Generate a sales strategy suggestion for one product.

    Request body (JSON object):
        product_name: required.
        trend_keywords: optional list of keyword strings.
        upcoming_holidays: optional list of ``{"name", "days_until"}``; the
            first three are used.
        bestseller_products: optional list of ``{"name", "price"}``; the first
            five are used.
        model: optional Ollama model to use for this request only.

    Responds 400 when ``product_name`` is missing, which now also covers a
    missing or malformed JSON body, and 500 when generation fails.
    """
    try:
        # A missing, malformed, or non-object body becomes {} so it is
        # rejected by the 400 validation below instead of a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        product_name = payload.get('product_name', '')
        trend_keywords = payload.get('trend_keywords', [])
        upcoming_holidays = payload.get('upcoming_holidays', [])
        bestseller_products = payload.get('bestseller_products', [])
        model = payload.get('model', None)
        if not product_name:
            return jsonify({'success': False, 'error': '請提供商品名稱'}), 400
        # Build the market context shown to the model.
        context_parts = []
        if trend_keywords:
            context_parts.append(f"熱門趨勢關鍵字:{', '.join(trend_keywords)}")
        if upcoming_holidays:
            holidays_text = []
            for holiday in upcoming_holidays[:3]:
                name = holiday.get('name', '')
                days = holiday.get('days_until', 0)
                if days == 0:
                    holidays_text.append(f"{name}(今天)")
                elif days == 1:
                    holidays_text.append(f"{name}(明天)")
                else:
                    # The opening parenthesis was missing here; restored so
                    # this branch matches the two branches above.
                    holidays_text.append(f"{name}({days}天後)")
            if holidays_text:
                context_parts.append(f"即將到來的節日:{', '.join(holidays_text)}")
        if bestseller_products:
            products_text = [f"{p.get('name', '')}${p.get('price', '')}" for p in bestseller_products[:5]]
            if products_text:
                context_parts.append(f"市場競品參考:{', '.join(products_text)}")
        context = '\n'.join(context_parts) if context_parts else '無額外市場資訊'
        system_prompt = """你是一位資深電商銷售策略顧問,專精於台灣市場。
你的建議特點:
- 精準、可執行的銷售策略
- 考慮節日、季節、市場趨勢
- 針對競品進行差異化定位
- 提供具體的促銷方案建議"""
        prompt = f"""請為以下商品提供銷售策略建議:
商品名稱:{product_name}
市場資訊:
{context}
請提供:
1. 【定位建議】這個商品應該如何定位1-2句
2. 【目標客群】主要目標客群是誰?
3. 【促銷策略】針對近期節日或趨勢的促銷建議2-3點
4. 【差異化賣點】相較競品的差異化賣點建議
請簡潔回答,每點不超過 30 字。"""
        # Temporarily switch the shared service to the requested model.
        original_model = ollama_service.model
        if model:
            ollama_service.model = model
        try:
            result = ollama_service.generate(prompt, system_prompt=system_prompt, temperature=0.6)
        finally:
            # Restore even when generate() raises, so a failed request cannot
            # leave the shared service pinned to this request's model.
            if model:
                ollama_service.model = original_model
        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500
        return jsonify({
            'success': True,
            'data': {
                'suggestion': result.content,
                'model': result.model,
                'duration': round(result.total_duration, 2) if result.total_duration else None
            }
        })
    except Exception as e:
        logger.error(f"生成銷售建議失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/cosme_rankings')
@login_required
def api_get_cosme_rankings():
    """Return the COSME ranking for one category.

    Query args:
        category: ranking category (default ``mask``).
        limit: number of products (default 10, capped at 20).
    """
    try:
        category = request.args.get('category', 'mask')
        limit = min(request.args.get('limit', 10, type=int), 20)
        from services.cosme_crawler import get_cosme_rankings, get_cosme_categories
        ok, message, products = get_cosme_rankings(category, limit=limit)
        if not ok:
            return jsonify({'success': False, 'error': message}), 500
        return jsonify({
            'success': True,
            'data': {
                'category': category,
                'products': products,
                'source': 'COSME 台灣',
                'categories': get_cosme_categories()
            }
        })
    except Exception as e:
        logger.error(f"取得 COSME 排行榜失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/mybest_articles')
@login_required
def api_get_mybest_articles():
    """Return mybest recommendation articles for one category.

    Query args:
        category: article category (default ``skincare``).
        limit: number of articles (default 10, capped at 20).
    """
    try:
        category = request.args.get('category', 'skincare')
        limit = min(request.args.get('limit', 10, type=int), 20)
        from services.mybest_crawler import get_mybest_articles, get_mybest_categories
        ok, message, articles = get_mybest_articles(category, limit=limit)
        if not ok:
            return jsonify({'success': False, 'error': message}), 500
        return jsonify({
            'success': True,
            'data': {
                'category': category,
                'articles': articles,
                'source': 'mybest 台灣',
                'categories': get_mybest_categories()
            }
        })
    except Exception as e:
        logger.error(f"取得 mybest 文章失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/mybest_latest')
@login_required
def api_get_mybest_latest():
    """Return the latest mybest articles.

    Query args:
        limit: number of articles (default 20, capped at 30).
    """
    try:
        limit = min(request.args.get('limit', 20, type=int), 30)
        from services.mybest_crawler import get_mybest_latest
        ok, message, articles = get_mybest_latest(limit=limit)
        if not ok:
            return jsonify({'success': False, 'error': message}), 500
        return jsonify({
            'success': True,
            'data': {
                'articles': articles,
                'source': 'mybest 台灣'
            }
        })
    except Exception as e:
        logger.error(f"取得 mybest 最新文章失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# ===== 網路搜尋 API =====
@ai_bp.route('/api/ai/web_search', methods=['POST'])
@login_required
def api_web_search():
    """
    AI web search, performed through the Ollama service.

    Request body:
    {
        "query": "search keywords",
        "search_type": "general|news|shopping|trends",
        "num_results": 5,
        "model": "gemma3:4b" (optional),
        "save_to_history": true (optional)
    }

    An unknown ``search_type`` falls back to ``general``. ``num_results`` is
    coerced to an integer (default 5 when not numeric) and clamped to 1-10.
    Responds 400 when ``query`` is missing, which now also covers a missing
    or malformed JSON body, and 500 when the search fails.
    """
    try:
        # A missing, malformed, or non-object body becomes {} so it is
        # rejected by the 400 validation below instead of a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        query = payload.get('query', '')
        search_type = payload.get('search_type', 'general')
        model = payload.get('model', None)
        save_to_history = payload.get('save_to_history', True)
        if not query:
            return jsonify({'success': False, 'error': '請提供搜尋關鍵字'}), 400
        if search_type not in ['general', 'news', 'shopping', 'trends']:
            search_type = 'general'
        # JSON may deliver a string or null here; min()/max() would raise on it.
        try:
            num_results = int(payload.get('num_results', 5))
        except (TypeError, ValueError):
            num_results = 5
        num_results = min(max(num_results, 1), 10)
        # Temporarily switch the shared service to the requested model.
        original_model = ollama_service.model
        if model:
            ollama_service.model = model
        try:
            result = ollama_service.web_search(
                query=query,
                num_results=num_results,
                search_type=search_type
            )
        finally:
            # Restore even when web_search() raises, so a failed request
            # cannot leave the shared service pinned to this request's model.
            if model:
                ollama_service.model = original_model
        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500
        response_data = {
            'raw_content': result.content,
            'model': result.model,
            'duration': round(result.total_duration, 2) if result.total_duration else None,
            'search_type': search_type,
            'query': query
        }
        # Best-effort: attach the parsed JSON when the output contains a {...} span.
        try:
            content = result.content
            json_start = content.find('{')
            json_end = content.rfind('}') + 1
            if json_start >= 0 and json_end > json_start:
                response_data['parsed'] = json.loads(content[json_start:json_end])
        except json.JSONDecodeError:
            pass  # keep only the raw content
        history_id = None
        if save_to_history:
            history_id = ai_history_service.save_generation(
                generation_type='web_search',
                output_content=result.content,
                product_name=query,
                input_keywords=[search_type],
                input_style=search_type,
                model_name=result.model,
                generation_duration=result.total_duration,
                created_by=session.get('user_id')
            )
        response_data['history_id'] = history_id
        return jsonify({
            'success': True,
            'data': response_data
        })
    except Exception as e:
        logger.error(f"AI 網路搜尋失敗:{e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/product_insights', methods=['POST'])
@login_required
def api_product_insights():
    """
    Product market-insight analysis.

    Request body:
    {
        "product_name": "product name",
        "include_competitors": true,
        "include_trends": true,
        "web_context": "web search results (optional)",
        "model": "gemma3:4b" (optional),
        "save_to_history": true (optional, defaults to true)
    }

    Responds with 400 when ``product_name`` is missing and with 500 when the
    generation reports failure or an unexpected error is raised. On success
    the response carries the raw model output and, when a JSON object can be
    extracted from it, the parsed object under ``insights``.
    """
    try:
        # A missing, malformed or non-object JSON body is treated as empty so
        # the request ends in the 400 validation branch instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        product_name = data.get('product_name', '')
        include_competitors = data.get('include_competitors', True)
        include_trends = data.get('include_trends', True)
        web_context = data.get('web_context', '')  # web search results
        model = data.get('model', None)
        save_to_history = data.get('save_to_history', True)
        if not product_name:
            return jsonify({'success': False, 'error': '請提供商品名稱'}), 400

        # Temporarily switch the shared service to the requested model.
        original_model = ollama_service.model
        if model:
            ollama_service.model = model
        try:
            result = ollama_service.search_product_insights(
                product_name=product_name,
                include_competitors=include_competitors,
                include_trends=include_trends,
                web_context=web_context,  # pass the web search results through
            )
        finally:
            # Restore the original model even when the call raises, so a
            # failed request cannot leave the shared service on the override.
            if model:
                ollama_service.model = original_model

        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500

        response_data = {
            'raw_content': result.content,
            'model': result.model,
            'duration': round(result.total_duration, 2) if result.total_duration else None,
            'product_name': product_name,
        }
        try:
            # Best effort: take the outermost {...} span of the model output.
            content = result.content
            json_start = content.find('{')
            json_end = content.rfind('}') + 1
            if json_start >= 0 and json_end > json_start:
                parsed = json.loads(content[json_start:json_end])
                response_data['insights'] = parsed
        except json.JSONDecodeError:
            pass  # keep only the raw content

        # Save to the generation history.
        history_id = None
        if save_to_history:
            user_id = session.get('user_id')
            history_id = ai_history_service.save_generation(
                generation_type='product_insights',
                output_content=result.content,
                product_name=product_name,
                input_keywords=['competitors' if include_competitors else '', 'trends' if include_trends else ''],
                model_name=result.model,
                generation_duration=result.total_duration,
                created_by=user_id,
            )
        response_data['history_id'] = history_id
        return jsonify({
            'success': True,
            'data': response_data,
        })
    except Exception as e:
        logger.error(f"商品洞察分析失敗:{e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/trend_keywords', methods=['POST'])
@login_required
def api_trend_keywords():
    """
    Search trending keywords and trends for a product category.

    Request body:
    {
        "category": "product category",
        "time_range": "day|week|month",
        "model": "gemma3:4b" (optional),
        "save_to_history": true (optional, defaults to true)
    }

    Responds with 400 when ``category`` is missing and with 500 when the
    generation reports failure or an unexpected error is raised. An
    unrecognised ``time_range`` falls back to ``week``.
    """
    try:
        # A missing, malformed or non-object JSON body is treated as empty so
        # the request ends in the 400 validation branch instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        category = data.get('category', '')
        time_range = data.get('time_range', 'week')
        model = data.get('model', None)
        save_to_history = data.get('save_to_history', True)
        if not category:
            return jsonify({'success': False, 'error': '請提供商品分類'}), 400

        # Validate the time range.
        if time_range not in ['day', 'week', 'month']:
            time_range = 'week'

        # Temporarily switch the shared service to the requested model.
        original_model = ollama_service.model
        if model:
            ollama_service.model = model
        try:
            result = ollama_service.search_trend_keywords(
                category=category,
                time_range=time_range,
            )
        finally:
            # Restore the original model even when the call raises, so a
            # failed request cannot leave the shared service on the override.
            if model:
                ollama_service.model = original_model

        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500

        response_data = {
            'raw_content': result.content,
            'model': result.model,
            'duration': round(result.total_duration, 2) if result.total_duration else None,
            'category': category,
            'time_range': time_range,
        }
        try:
            # Best effort: take the outermost {...} span of the model output.
            content = result.content
            json_start = content.find('{')
            json_end = content.rfind('}') + 1
            if json_start >= 0 and json_end > json_start:
                parsed = json.loads(content[json_start:json_end])
                response_data['trends'] = parsed
        except json.JSONDecodeError:
            pass  # keep only the raw content

        # Save to the generation history.
        history_id = None
        if save_to_history:
            user_id = session.get('user_id')
            history_id = ai_history_service.save_generation(
                generation_type='trend_keywords',
                output_content=result.content,
                product_name=category,
                input_keywords=[time_range],
                model_name=result.model,
                generation_duration=result.total_duration,
                created_by=user_id,
            )
        response_data['history_id'] = history_id
        return jsonify({
            'success': True,
            'data': response_data,
        })
    except Exception as e:
        logger.error(f"趨勢關鍵字搜尋失敗:{e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@ai_bp.route('/api/ai/data_source_suggestion', methods=['POST'])
@login_required
def api_suggest_data_sources():
    """
    Suggest crawlable data sources for a product or category.

    Request body fields: ``product_name``, ``product_category`` (at least one
    is required; ``product_name`` wins when both are given) and an optional
    ``model`` override.

    Responds with 400 when neither field is provided and with 500 when the
    generation reports failure or an unexpected error is raised.
    """
    try:
        # A missing, malformed or non-object JSON body is treated as empty so
        # the request ends in the 400 validation branch instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        product_name = data.get('product_name', '')
        product_category = data.get('product_category', '')
        model = data.get('model', None)
        if not product_name and not product_category:
            return jsonify({'success': False, 'error': '請提供商品名稱或分類'}), 400
        context = product_name if product_name else product_category
        system_prompt = """你是一位資深電商數據分析師,專精於台灣市場資料蒐集。
你熟悉各種可以爬取的公開資料來源,包括:
- 電商平台momo、PChome、蝦皮、Yahoo購物等
- 社群媒體PTT、Dcard、Facebook 公開社團)
- 新聞媒體Google News、各大新聞網站
- 部落格與評測網站
- 價格比較網站
- 官方統計資料
請根據商品類型推薦適合的資料來源。"""
        prompt = f"""針對「{context}」這類商品/分類,請建議可以爬取的資料來源:
請提供 5-8 個建議,每個建議包含:
1. 網站名稱
2. 網站 URL實際可訪問的網址
3. 可獲取的資訊類型(價格/評價/趨勢/討論等)
4. 爬取難度(簡單/中等/困難)
5. 建議優先度(高/中/低)
請用以下格式回答:
【建議 1】
- 網站XXX
- URLhttps://...
- 資訊XXX
- 難度:簡單
- 優先:高
請只回答實際存在且可訪問的台灣網站。"""

        # Temporarily switch the shared service to the requested model.
        original_model = ollama_service.model
        if model:
            ollama_service.model = model
        try:
            result = ollama_service.generate(prompt, system_prompt=system_prompt, temperature=0.5)
        finally:
            # Restore the original model even when the call raises, so a
            # failed request cannot leave the shared service on the override.
            if model:
                ollama_service.model = original_model

        if not result.success:
            return jsonify({'success': False, 'error': result.error}), 500
        return jsonify({
            'success': True,
            'data': {
                'suggestions': result.content,
                'model': result.model,
                'duration': round(result.total_duration, 2) if result.total_duration else None,
            },
        })
    except Exception as e:
        logger.error(f"生成資料來源建議失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# ══════════════════════════════════════════════════════════════
# ICAIM — AI 競情中樞 (Intelligent Competitive Analysis & Intelligence Module)
# 2026-04-17 新增
# ══════════════════════════════════════════════════════════════
@ai_bp.route('/ai_intelligence')
@login_required
def ai_intelligence():
    """Render the AI competitive-intelligence hub page."""
    template_context = {'active_page': 'ai_intelligence'}
    return render_template('ai_intelligence.html', **template_context)
def _clone_icaim_dashboard_payload(payload):
    """Return a copy of a dashboard payload that is safe to hand to a caller.

    The top-level dict, the ``stats`` dict and every row dict inside
    ``competitors`` / ``ai_recs`` are copied one level deep; values nested
    deeper than that are still shared with the input. Missing or falsy
    sections become an empty dict / list.
    """
    snapshot = dict(payload or {})
    snapshot['stats'] = dict(snapshot.get('stats') or {})
    for rows_key in ('competitors', 'ai_recs'):
        source_rows = snapshot.get(rows_key) or []
        snapshot[rows_key] = [dict(entry) for entry in source_rows]
    return snapshot
def _get_cached_icaim_dashboard_payload(allow_stale=False):
    """Return a copy of the cached dashboard payload, or None when unusable.

    A payload that has not passed its ``expires_at`` is returned with
    ``cache_state='fresh'``. When ``allow_stale`` is true, an expired payload
    that was cached within ``_ICAIM_DASHBOARD_STALE_TTL_SECONDS`` is returned
    with ``cache_state='stale'`` plus a user-facing notice. Otherwise None.
    """
    snapshot = _ICAIM_DASHBOARD_CACHE.get('payload')
    if not snapshot:
        return None

    current_ts = time.time()
    fresh_until = float(_ICAIM_DASHBOARD_CACHE.get('expires_at') or 0)
    stored_at = float(_ICAIM_DASHBOARD_CACHE.get('cached_at') or 0)

    if current_ts <= fresh_until:
        state, notice = 'fresh', None
    elif allow_stale and stored_at and current_ts - stored_at <= _ICAIM_DASHBOARD_STALE_TTL_SECONDS:
        state, notice = 'stale', '目前資料更新較慢,先顯示最近一次可用快照。'
    else:
        return None

    result = _clone_icaim_dashboard_payload(snapshot)
    result['cache_state'] = state
    if notice is not None:
        result['notice'] = notice
    return result
def _set_icaim_dashboard_cache(payload):
    """Store a copy of ``payload`` in the dashboard cache, marked as fresh.

    Records both the expiry time (now + ``_ICAIM_DASHBOARD_TTL_SECONDS``) and
    the time of caching.
    """
    stored_at = time.time()
    snapshot = _clone_icaim_dashboard_payload(payload)
    snapshot['cache_state'] = 'fresh'
    _ICAIM_DASHBOARD_CACHE.update(
        expires_at=stored_at + _ICAIM_DASHBOARD_TTL_SECONDS,
        cached_at=stored_at,
        payload=snapshot,
    )
def _icaim_dashboard_empty_payload():
    """Build the placeholder dashboard payload used when no data is available.

    All counters are zero, both row lists are empty, and ``cache_state`` is
    ``'empty'``. A new dict is built on every call.
    """
    zero_counters = (
        'total_skus',
        'valid_competitor_prices',
        'high_risk_count',
        'total_ai_recs',
        'product_pick_count',
        'match_rate',
    )
    stats = dict.fromkeys(zero_counters, 0)
    stats['last_feeder_run'] = '資料整理中'

    placeholder = {'success': True, 'stats': stats}
    placeholder['competitors'] = []
    placeholder['ai_recs'] = []
    placeholder['cache_state'] = 'empty'
    placeholder['notice'] = '競品資料正在整理,請稍後重新整理。'
    return placeholder
def _create_icaim_dashboard_engine(database_path):
    """Create a SQLAlchemy engine for the dashboard queries.

    For PostgreSQL URLs the connection is opened with a ``statement_timeout``
    of ``_ICAIM_DB_STATEMENT_TIMEOUT_MS``; any other URL gets a plain engine.
    The caller owns the returned engine.
    """
    from sqlalchemy import create_engine

    postgres_prefixes = ('postgresql://', 'postgresql+psycopg2://', 'postgres://')
    if not str(database_path).startswith(postgres_prefixes):
        return create_engine(database_path)

    timeout_option = f'-c statement_timeout={_ICAIM_DB_STATEMENT_TIMEOUT_MS}'
    return create_engine(database_path, connect_args={'options': timeout_option})
def _get_cached_pchome_growth_payload():
    """Return a deep copy of the cached PChome growth payload, or None.

    The cache is unusable when the current cache epoch is newer than the one
    recorded with the payload, when nothing is cached, or when the entry has
    expired. The copy is made through a JSON round trip.
    """
    checked_at = time.time()
    snapshot = _PCHOME_GROWTH_CACHE.get("payload")
    stored_epoch = float(_PCHOME_GROWTH_CACHE.get("epoch") or 0)

    # A newer epoch means the cached snapshot was invalidated after caching.
    if get_pchome_growth_cache_epoch() > stored_epoch:
        return None
    if snapshot is None:
        return None
    if not checked_at < float(_PCHOME_GROWTH_CACHE.get("expires_at") or 0):
        return None

    serialized = json.dumps(snapshot, ensure_ascii=False, default=str)
    result = json.loads(serialized)
    result["cache_state"] = "fresh"
    return result
def _set_pchome_growth_cache(payload):
    """Cache a JSON-round-tripped copy of ``payload``, marked as fresh.

    The entry records its expiry (now + ``_PCHOME_GROWTH_TTL_SECONDS``) and
    the cache epoch current at the time of caching.
    """
    serialized = json.dumps(payload, ensure_ascii=False, default=str)
    snapshot = json.loads(serialized)
    snapshot["cache_state"] = "fresh"
    _PCHOME_GROWTH_CACHE.update(
        expires_at=time.time() + _PCHOME_GROWTH_TTL_SECONDS,
        epoch=get_pchome_growth_cache_epoch(),
        payload=snapshot,
    )
@ai_bp.route('/api/ai/pchome-growth/opportunities')
@login_required
def api_pchome_growth_opportunities():
    """PChome revenue-growth action list: read-only, no LLM call, no DB write.

    Query parameters: ``refresh`` (``1``/``true``/``yes`` bypasses the cache)
    and ``limit`` (defaults to 20, clamped to 5..50).
    """
    try:
        from config import DATABASE_PATH
        from services.pchome_revenue_growth_service import build_pchome_growth_opportunities

        refresh_flag = str(request.args.get('refresh') or '').strip().lower()
        bypass_cache = refresh_flag in {'1', 'true', 'yes'}
        requested_limit = request.args.get('limit', 20, type=int)
        limit = max(5, min(requested_limit, 50))

        # NOTE(review): the cached payload is not keyed by `limit`, so a cache
        # hit may return a list built for a different limit — confirm intended.
        if not bypass_cache:
            cached_payload = _get_cached_pchome_growth_payload()
            if cached_payload:
                return jsonify(cached_payload)

        engine = _create_icaim_dashboard_engine(DATABASE_PATH)
        try:
            fresh_payload = build_pchome_growth_opportunities(engine, limit=limit)
        finally:
            engine.dispose()

        fresh_payload["cache_state"] = "fresh"
        _set_pchome_growth_cache(fresh_payload)
        return jsonify(fresh_payload)
    except Exception as exc:
        logger.error("[PChomeGrowth] 作戰清單讀取失敗: %s", exc, exc_info=True)
        failure = {
            "success": False,
            "error": "PChome 成長作戰清單暫時無法讀取,請稍後再試。",
        }
        return jsonify(failure), 500
def _run_pchome_growth_momo_backfill(engine, limit):
    """Run the MOMO candidate backfill service and return its result.

    The service is imported at call time rather than at module import.
    """
    from services import pchome_growth_momo_backfill_service as backfill_service

    return backfill_service.run_pchome_growth_momo_backfill(engine, limit=limit)
@ai_bp.route('/api/ai/pchome-growth/backfill-momo-candidates', methods=['POST'])
@login_required
def api_pchome_growth_backfill_momo_candidates():
    """Look up MOMO candidates for high-revenue PChome products; no LLM call.

    JSON body field ``limit`` defaults to 12 and is clamped to 1..20; an
    unparsable value falls back to 12. The growth cache is reset once the
    backfill returns.
    """
    body = request.get_json(silent=True) or {}
    try:
        requested_limit = int(body.get('limit', 12))
        limit = max(1, min(requested_limit, 20))
    except (TypeError, ValueError):
        limit = 12

    engine = None
    try:
        from config import DATABASE_PATH

        engine = _create_icaim_dashboard_engine(DATABASE_PATH)
        result = _run_pchome_growth_momo_backfill(engine, limit)
        # Drop the cached action list so the next read is rebuilt.
        _PCHOME_GROWTH_CACHE.update(expires_at=0.0, epoch=0.0, payload=None)
        return jsonify(result)
    except Exception as exc:
        logger.error("[PChomeGrowth] MOMO 對應補抓失敗: %s", exc, exc_info=True)
        failure = {
            "success": False,
            "error": "MOMO 對應補抓暫時無法執行,請稍後再試。",
        }
        return jsonify(failure), 500
    finally:
        if engine is not None:
            engine.dispose()
@ai_bp.route('/api/ai/pchome-growth/source-contract')
@login_required
def api_pchome_growth_source_contract():
    """External market sources and offer field specification; read-only."""
    try:
        from config import DATABASE_PATH
        from services.external_market_offer_service import build_external_source_readiness

        engine = _create_icaim_dashboard_engine(DATABASE_PATH)
        try:
            readiness = build_external_source_readiness(engine)
        finally:
            engine.dispose()
        return jsonify(readiness)
    except Exception as exc:
        logger.error("[PChomeGrowth] 外部來源規格讀取失敗: %s", exc, exc_info=True)
        failure = {
            "success": False,
            "error": "外部資料來源狀態暫時無法讀取,請稍後再試。",
        }
        return jsonify(failure), 500
@ai_bp.route('/api/ai/pchome-growth/review-candidates')
@login_required
def api_pchome_growth_review_candidates():
    """List MOMO candidates awaiting review; read-only, no LLM call.

    Query parameter ``limit`` defaults to 20 and is clamped to 1..50. The
    response status is 200 when the service payload reports success, else 400.
    """
    try:
        from config import DATABASE_PATH
        from services.external_market_offer_service import list_momo_review_candidates

        requested_limit = request.args.get('limit', 20, type=int)
        limit = max(1, min(requested_limit, 50))

        engine = _create_icaim_dashboard_engine(DATABASE_PATH)
        try:
            candidates = list_momo_review_candidates(engine, limit=limit)
        finally:
            engine.dispose()

        if candidates.get("success"):
            return jsonify(candidates), 200
        return jsonify(candidates), 400
    except Exception as exc:
        logger.error("[PChomeGrowth] MOMO 待確認候選讀取失敗: %s", exc, exc_info=True)
        failure = {
            "success": False,
            "error": "MOMO 待確認候選暫時無法讀取,請稍後再試。",
        }
        return jsonify(failure), 500
@ai_bp.route('/api/ai/pchome-growth/review-candidates/<int:offer_id>', methods=['POST'])
@login_required
def api_pchome_growth_update_review_candidate(offer_id):
    """Confirm or reject a MOMO candidate awaiting review; no LLM call.

    JSON body fields: ``action`` (trimmed and lower-cased) and ``note``
    (trimmed). The growth cache is reset only when the update succeeds. The
    response status is 200 on success, 400 when the service reports failure.
    """
    body = request.get_json(silent=True) or {}
    action = str(body.get("action") or "").strip().lower()
    note = str(body.get("note") or "").strip()

    engine = None
    try:
        from config import DATABASE_PATH
        from services.external_market_offer_service import update_momo_review_candidate

        engine = _create_icaim_dashboard_engine(DATABASE_PATH)
        outcome = update_momo_review_candidate(engine, offer_id, action, note=note)
        if outcome.get("success"):
            # Drop the cached action list so the next read is rebuilt.
            _PCHOME_GROWTH_CACHE.update(expires_at=0.0, epoch=0.0, payload=None)
        http_status = 200 if outcome.get("success") else 400
        return jsonify(outcome), http_status
    except Exception as exc:
        logger.error("[PChomeGrowth] MOMO 待確認候選更新失敗: %s", exc, exc_info=True)
        failure = {
            "success": False,
            "error": "MOMO 待確認候選暫時無法更新,請稍後再試。",
        }
        return jsonify(failure), 500
    finally:
        if engine is not None:
            engine.dispose()
def _decode_external_offer_csv_upload(raw_bytes):
    """Decode uploaded CSV bytes to text.

    Tries utf-8-sig, utf-8, big5 and cp950 in that order and returns the
    first successful decode. If none succeeds, decodes as UTF-8 with
    undecodable bytes replaced.
    """
    candidate_codecs = ("utf-8-sig", "utf-8", "big5", "cp950")
    for codec in candidate_codecs:
        try:
            decoded = raw_bytes.decode(codec)
        except UnicodeDecodeError:
            pass
        else:
            return decoded
    return raw_bytes.decode("utf-8", errors="replace")
@ai_bp.route('/api/ai/pchome-growth/external-offers/csv-dry-run', methods=['POST'])
@login_required
def api_pchome_growth_external_offer_csv_dry_run():
    """Dry-run check of a manually supplied external-offer CSV; no DB write.

    The CSV text is taken from, in order of precedence: an uploaded ``file``
    (limited to 1 MB), the ``csv_text`` field of a JSON body, or the
    ``csv_text`` form field. The response status is 200 when the dry run
    reports success, 400 otherwise (including an oversized upload), and 500
    on an unexpected error.
    """
    max_upload_bytes = 1024 * 1024
    try:
        from services.external_market_offer_service import dry_run_external_offer_csv

        csv_text = ""
        upload = request.files.get("file")
        if upload and upload.filename:
            # Read one byte past the cap: enough to detect an oversized file
            # without loading the whole upload into memory first.
            raw_bytes = upload.read(max_upload_bytes + 1)
            if len(raw_bytes) > max_upload_bytes:
                return jsonify({
                    "success": False,
                    "error": "CSV 檔案太大,請先切成 1 MB 以下再預檢。",
                }), 400
            csv_text = _decode_external_offer_csv_upload(raw_bytes)
        elif request.is_json:
            csv_text = str((request.get_json(silent=True) or {}).get("csv_text") or "")
        else:
            csv_text = str(request.form.get("csv_text") or "")

        payload = dry_run_external_offer_csv(csv_text, limit=200)
        status_code = 200 if payload.get("success") else 400
        return jsonify(payload), status_code
    except Exception as exc:
        logger.error("[PChomeGrowth] 外部報價 CSV 預檢失敗: %s", exc, exc_info=True)
        return jsonify({
            "success": False,
            "error": "CSV 預檢暫時無法執行,請稍後再試。",
        }), 500
@ai_bp.route('/api/ai/icaim/dashboard')
@login_required
def api_icaim_dashboard():
    """
    Competitive-intelligence dashboard data API.

    Returns a statistics summary, the competitor price comparison list and
    the AI decision log. A fresh cached payload is served unless the
    ``refresh`` query parameter is ``1``/``true``/``yes``. If building the
    payload fails, the last cached snapshot (stale allowed) is returned, or
    an empty placeholder payload when nothing is cached; the route itself
    never responds with an error status.
    """
    engine = None
    try:
        from config import DATABASE_PATH
        from sqlalchemy import inspect, text as sa_text

        force_refresh = str(request.args.get('refresh') or '').strip().lower() in {'1', 'true', 'yes'}
        if not force_refresh:
            cached_payload = _get_cached_icaim_dashboard_payload()
            if cached_payload:
                return jsonify(cached_payload)

        engine = _create_icaim_dashboard_engine(DATABASE_PATH)
        with engine.connect() as conn:
            inspector = inspect(conn)
            has_external_offers = inspector.has_table("external_offers")

            # When external_offers does not exist, substitute an empty CTE
            # with the same column shape so the UNION ALL below stays valid.
            normalized_cte = """
                valid_normalized AS (
                    SELECT NULL::text AS sku,
                           NULL::numeric AS pchome_price,
                           NULL::numeric AS momo_price,
                           NULL::numeric AS match_score,
                           '[]'::jsonb AS tags,
                           NULL::timestamp AS crawled_at,
                           'external_offers'::text AS data_source,
                           '自動同步資料層'::text AS data_source_label,
                           1 AS source_priority
                    WHERE FALSE
                ),
            """
            if has_external_offers:
                normalized_cte = """
                normalized_raw AS (
                    SELECT DISTINCT ON (COALESCE(NULLIF(eo.momo_sku, ''), NULLIF(eo.source_product_id, '')))
                        COALESCE(NULLIF(eo.momo_sku, ''), NULLIF(eo.source_product_id, '')) AS sku,
                        NULLIF(
                            regexp_replace(
                                COALESCE(eo.raw_payload_json::jsonb ->> 'pchome_public_price', ''),
                                '[^0-9.-]',
                                '',
                                'g'
                            ),
                            ''
                        )::numeric AS pchome_price,
                        eo.price AS momo_price,
                        CASE
                            WHEN COALESCE(eo.quality_score, 0) > 1 THEN eo.quality_score / 100.0
                            ELSE COALESCE(eo.quality_score, 0)
                        END AS match_score,
                        to_jsonb(ARRAY['identity_v2', 'external_offers']) AS tags,
                        eo.observed_at AS crawled_at,
                        'external_offers'::text AS data_source,
                        '自動同步資料層'::text AS data_source_label,
                        1 AS source_priority
                    FROM external_offers eo
                    WHERE eo.source_code = 'momo_reference'
                      AND COALESCE(NULLIF(eo.momo_sku, ''), NULLIF(eo.source_product_id, '')) IS NOT NULL
                      AND eo.price IS NOT NULL
                      AND eo.price > 0
                      AND COALESCE(eo.quality_score, 0) >= 76
                      AND eo.match_status IN ('verified', 'usable', 'reviewed', 'exact', 'confirmed')
                      AND eo.data_quality_status IN ('verified', 'usable', 'reviewed')
                      AND (eo.expires_at IS NULL OR eo.expires_at > CURRENT_TIMESTAMP)
                    ORDER BY
                        COALESCE(NULLIF(eo.momo_sku, ''), NULLIF(eo.source_product_id, '')),
                        eo.observed_at DESC NULLS LAST,
                        eo.id DESC
                ),
                valid_normalized AS (
                    SELECT *
                    FROM normalized_raw
                    WHERE pchome_price IS NOT NULL
                      AND pchome_price > 0
                      AND momo_price IS NOT NULL
                      AND momo_price > 0
                ),
                """

            # -- Statistics summary --
            # high_risk_count is computed from the PChome point of view:
            # PChome priced more than 15% above the MOMO reference price.
            compared_cte = f"""
                WITH latest_momo AS (
                    SELECT
                        p.i_code AS sku,
                        p.name,
                        p.category,
                        latest_price.price AS momo_price
                    FROM products p
                    JOIN LATERAL (
                        SELECT pr.price
                        FROM price_records pr
                        WHERE pr.product_id = p.id
                        ORDER BY pr.timestamp DESC, pr.id DESC
                        LIMIT 1
                    ) latest_price ON TRUE
                    WHERE p.status = 'ACTIVE'
                ),
                {normalized_cte}
                valid_legacy AS (
                    SELECT DISTINCT ON (cp.sku)
                        cp.sku,
                        cp.price AS pchome_price,
                        NULL::numeric AS momo_price,
                        cp.match_score,
                        COALESCE(cp.tags, '[]'::jsonb) || '["legacy_competitor_cache"]'::jsonb AS tags,
                        cp.crawled_at,
                        'competitor_prices'::text AS data_source,
                        '舊比價快取'::text AS data_source_label,
                        2 AS source_priority
                    FROM competitor_prices cp
                    WHERE cp.source = 'pchome'
                      AND cp.expires_at > CURRENT_TIMESTAMP
                      AND cp.price IS NOT NULL
                      AND cp.price > 0
                      AND COALESCE(cp.match_score, 0) >= {_ICAIM_MATCH_SCORE_FLOOR}
                      AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
                    ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
                ),
                valid_competitor AS (
                    SELECT DISTINCT ON (sku)
                        sku,
                        pchome_price,
                        momo_price,
                        match_score,
                        tags,
                        crawled_at,
                        data_source,
                        data_source_label
                    FROM (
                        SELECT * FROM valid_normalized
                        UNION ALL
                        SELECT * FROM valid_legacy
                    ) src
                    WHERE sku IS NOT NULL
                      AND pchome_price IS NOT NULL
                      AND pchome_price > 0
                    ORDER BY sku, source_priority ASC, crawled_at DESC NULLS LAST
                ),
                compared AS (
                    SELECT
                        lm.sku,
                        lm.name,
                        lm.category,
                        COALESCE(vc.momo_price, lm.momo_price) AS momo_price,
                        vc.pchome_price,
                        ROUND(((vc.pchome_price - COALESCE(vc.momo_price, lm.momo_price)) / COALESCE(vc.momo_price, lm.momo_price) * 100)::numeric, 1) AS gap_pct,
                        vc.match_score,
                        vc.tags,
                        vc.crawled_at,
                        vc.data_source,
                        vc.data_source_label
                    FROM latest_momo lm
                    JOIN valid_competitor vc ON vc.sku = lm.sku
                    WHERE COALESCE(vc.momo_price, lm.momo_price) IS NOT NULL
                      AND COALESCE(vc.momo_price, lm.momo_price) > 0
                )
            """
            stats_sql = sa_text(compared_cte + """
                SELECT
                    (SELECT COUNT(*) FROM products WHERE status = 'ACTIVE') AS total_skus,
                    (SELECT COUNT(*) FROM compared) AS valid_competitor_prices,
                    (SELECT COUNT(*) FROM compared WHERE gap_pct > 15) AS high_risk_count,
                    (SELECT COUNT(*) FROM ai_price_recommendations) AS total_ai_recs,
                    (SELECT COUNT(*) FROM ai_price_recommendations
                     WHERE strategy = 'product_pick' AND status = 'pending') AS product_pick_count,
                    (SELECT MAX(crawled_at) FROM valid_competitor) AS last_feeder_run,
                    (SELECT COALESCE(jsonb_object_agg(data_source_label, source_count), '{}'::jsonb)
                     FROM (
                        SELECT data_source_label, COUNT(*) AS source_count
                        FROM compared
                        GROUP BY data_source_label
                     ) data_sources) AS competitor_data_source_counts
            """)

            # -- Competitor comparison: one row per SKU, the latest that
            # passed identity matching --
            competitor_sql = sa_text(compared_cte + """
                SELECT *
                FROM compared
                ORDER BY gap_pct DESC NULLS LAST, crawled_at DESC NULLS LAST
                LIMIT 200
            """)

            # -- AI decision log --
            ai_sql = sa_text("""
                SELECT id, sku, name, strategy, confidence,
                       momo_price, pchome_price, gap_pct, sales_7d_delta,
                       reason, status, model_footprint, created_at
                FROM ai_price_recommendations
                ORDER BY created_at DESC
                LIMIT 50
            """)
            stats_row = conn.execute(stats_sql).fetchone()
            comp_rows = conn.execute(competitor_sql).fetchall()
            ai_rows = conn.execute(ai_sql).fetchall()

        # Format the competitor rows.
        competitors = []
        for r in comp_rows:
            tags = r.tags or []
            if isinstance(tags, str):
                try:
                    tags = json.loads(tags)
                except Exception:
                    tags = []
            gap = float(r.gap_pct) if r.gap_pct is not None else 0.0
            competitors.append({
                'sku': r.sku,
                'name': r.name[:45] if r.name else '',
                'category': r.category or '',
                'momo_price': float(r.momo_price),
                'pchome_price': float(r.pchome_price),
                'gap_pct': gap,
                'match_score': float(r.match_score),
                'tags': tags,
                'crawled_at': r.crawled_at.strftime('%m/%d %H:%M') if r.crawled_at else '',
                'risk': 'HIGH' if gap > 15 else ('MED' if gap > 5 else 'LOW'),
                'data_source': r.data_source or 'competitor_prices',
                'data_source_label': r.data_source_label or '舊比價快取',
            })

        # Format the AI decision records.
        ai_recs = []
        for r in ai_rows:
            footprint = r.model_footprint or {}
            # Same guard as for tags: the column may come back as a JSON
            # string; one unreadable footprint must not fail the dashboard.
            if isinstance(footprint, str):
                try:
                    footprint = json.loads(footprint)
                except (TypeError, ValueError):
                    footprint = {}
            if not isinstance(footprint, dict):
                footprint = {}
            analyst_fp = footprint.get('analyst') or {}
            dispatch_fp = footprint.get('dispatcher') or {}
            ai_recs.append({
                'id': r.id,
                'sku': r.sku,
                'name': r.name[:35] if r.name else '',
                'strategy': r.strategy or 'promote',
                'confidence': float(r.confidence) if r.confidence else 0.0,
                'momo_price': float(r.momo_price) if r.momo_price else 0.0,
                'pchome_price': float(r.pchome_price) if r.pchome_price else 0.0,
                'gap_pct': float(r.gap_pct) if r.gap_pct else 0.0,
                'sales_7d_delta': float(r.sales_7d_delta) if r.sales_7d_delta else 0.0,
                'reason': r.reason or '',
                'status': r.status,
                # AI model footprint (lets the dashboard show inference cost)
                'analyst': analyst_fp.get('model', 'hermes3'),
                'hermes_duration': analyst_fp.get('duration_sec', 0),
                'hermes_tokens': analyst_fp.get('tokens', 0),
                'dispatcher': dispatch_fp.get('model', 'NIM'),
                'nim_tokens': dispatch_fp.get('total_tokens', 0),
                'created_at': r.created_at.strftime('%m/%d %H:%M') if r.created_at else '',
            })

        stats = dict(stats_row._mapping) if stats_row else {}
        source_counts = stats.get('competitor_data_source_counts') or {}
        if isinstance(source_counts, str):
            try:
                source_counts = json.loads(source_counts)
            except Exception:
                source_counts = {}
        last_feeder_run = stats.get('last_feeder_run')
        last_feeder = (
            last_feeder_run.strftime('%Y-%m-%d %H:%M')
            if last_feeder_run else '尚未執行'
        )
        total_skus = int(stats.get('total_skus') or 0)
        valid_competitor_prices = int(stats.get('valid_competitor_prices') or 0)
        payload = {
            'success': True,
            'stats': {
                'total_skus': total_skus,
                'valid_competitor_prices': valid_competitor_prices,
                'high_risk_count': int(stats.get('high_risk_count') or 0),
                'total_ai_recs': int(stats.get('total_ai_recs') or 0),
                'product_pick_count': int(stats.get('product_pick_count') or 0),
                # max(..., 1) avoids dividing by zero when there are no SKUs.
                'match_rate': round(valid_competitor_prices / max(total_skus, 1) * 100, 1),
                'last_feeder_run': last_feeder,
                'competitor_data_source_counts': source_counts,
            },
            'competitors': competitors,
            'ai_recs': ai_recs,
            'cache_state': 'fresh',
        }
        _set_icaim_dashboard_cache(payload)
        return jsonify(payload)
    except Exception as e:
        logger.warning(f"[ICAIM] dashboard API 讀取降級: {e}")
        cached_payload = _get_cached_icaim_dashboard_payload(allow_stale=True)
        if cached_payload:
            return jsonify(cached_payload)
        return jsonify(_icaim_dashboard_empty_payload())
    finally:
        # A new engine is created for every uncached request, so it must be
        # disposed here or its connection pool is leaked.
        if engine is not None:
            engine.dispose()
@ai_bp.route('/api/ai/product-picks/generate', methods=['POST'])
@login_required
def api_generate_product_picks():
    """Manually generate the AI product-pick list for PChome sales; writes to the DB.

    JSON body field ``limit`` defaults to 50 and is clamped to 5..100; an
    unparsable value falls back to 50. After generation the dashboard cache
    is cleared and a best-effort re-warm is attempted. Responds with 500 and
    the error text when generation fails.
    """
    engine = None
    try:
        from config import DATABASE_PATH
        from sqlalchemy import create_engine
        from services.ai_product_pick_agent import generate_product_pick_list
        from services.cache_manager import clear_dashboard_cache
        from routes.dashboard_routes import get_full_dashboard_data

        payload = request.get_json(silent=True) or {}
        # Fall back to the default on a non-numeric limit, matching the other
        # POST endpoints in this module, instead of failing with a 500.
        try:
            limit = max(5, min(int(payload.get('limit', 50)), 100))
        except (TypeError, ValueError):
            limit = 50

        engine = create_engine(DATABASE_PATH)
        result = generate_product_pick_list(engine, limit=limit)
        clear_dashboard_cache()

        # Cache warming is best effort; a failure only gets logged.
        dashboard_cache_warmed = False
        try:
            dashboard_cache_warmed = bool(get_full_dashboard_data())
        except Exception as warm_err:
            logger.warning(f"[ProductPickAgent] 商品看板快取預熱失敗: {warm_err}")

        return jsonify({
            'success': True,
            'message': f'AI 挑品清單已產生:寫入 {result.written} 筆,候選 {result.candidates}',
            'data': {
                'candidates': result.candidates,
                'written': result.written,
                'generated_at': result.generated_at,
                'dashboard_cache_warmed': dashboard_cache_warmed,
                'picks': result.picks[:50],
            },
        })
    except Exception as e:
        logger.error(f"[ProductPickAgent] 產生挑品清單失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # The engine is created per request; dispose it so its connection
        # pool is not leaked.
        if engine is not None:
            engine.dispose()
def _feeder_result_payload(result):
    """Convert a feeder result object into a plain dict of numbers.

    Each counter attribute is read with a default of 0 and coerced to int;
    ``duration_sec`` is coerced to float and rounded to two decimals.
    Missing or falsy attributes become 0.
    """
    counter_fields = (
        'total_skus',
        'matched',
        'skipped_no_result',
        'skipped_low_score',
        'errors',
        'history_written',
        'attempts_written',
    )
    summary = {}
    for field_name in counter_fields:
        summary[field_name] = int(getattr(result, field_name, 0) or 0)
    elapsed = getattr(result, 'duration_sec', 0) or 0
    summary['duration_sec'] = round(float(elapsed), 2)
    return summary
def _empty_feeder_result_payload():
    """Return a feeder result dict with every counter at zero.

    ``duration_sec`` is the float ``0.0``; all other values are the int ``0``.
    """
    counter_fields = (
        'total_skus',
        'matched',
        'skipped_no_result',
        'skipped_low_score',
        'errors',
        'history_written',
        'attempts_written',
    )
    zeroed = dict.fromkeys(counter_fields, 0)
    zeroed['duration_sec'] = 0.0
    return zeroed
def _pick_result_payload(result):
    """Convert a product-pick result object into a plain dict.

    ``candidates`` and ``written`` are coerced to int (missing or falsy
    becomes 0); ``generated_at`` is passed through, defaulting to None.
    """
    summary = {}
    for count_field in ('candidates', 'written'):
        summary[count_field] = int(getattr(result, count_field, 0) or 0)
    summary['generated_at'] = getattr(result, 'generated_at', None)
    return summary
def _combined_feeder_payload(revalidation_result, feeder_result, stale_refresh_result=None):
    """Merge the per-phase feeder results into one summary dict.

    The top-level counters are the sums over the three phases (stale
    refresh, revalidation, unmatched backfill); ``duration_sec`` is the sum
    rounded to two decimals. The per-phase dicts are attached under
    ``stale_identity_refresh``, ``retryable_candidate_revalidation`` and
    ``unmatched_priority_backfill``. A missing stale-refresh result counts
    as all zeros.
    """
    if stale_refresh_result is None:
        stale_part = _empty_feeder_result_payload()
    else:
        stale_part = _feeder_result_payload(stale_refresh_result)
    revalidation_part = _feeder_result_payload(revalidation_result)
    backfill_part = _feeder_result_payload(feeder_result)

    phases = (stale_part, revalidation_part, backfill_part)
    counter_fields = (
        'total_skus',
        'matched',
        'skipped_no_result',
        'skipped_low_score',
        'errors',
        'history_written',
        'attempts_written',
    )
    combined = {
        field_name: sum(phase[field_name] for phase in phases)
        for field_name in counter_fields
    }
    # Durations are floats: add them explicitly, in phase order.
    combined['duration_sec'] = round(
        stale_part['duration_sec']
        + revalidation_part['duration_sec']
        + backfill_part['duration_sec'],
        2,
    )
    combined['stale_identity_refresh'] = stale_part
    combined['retryable_candidate_revalidation'] = revalidation_part
    combined['unmatched_priority_backfill'] = backfill_part
    return combined
@ai_bp.route('/api/ai/pchome-match/backfill', methods=['POST'])
@login_required
def api_pchome_match_backfill():
    """Start a background backfill of PChome matches to raise price-comparison coverage.

    JSON body field ``limit`` defaults to 60 and is clamped to 5..160. The
    work runs in a daemon thread in three phases (stale identity refresh,
    near-threshold candidate revalidation, unmatched priority backfill),
    then regenerates the AI pick list and clears the caches. Responds with
    202 once the thread is started, or 409 when a run is already active.
    """
    import threading

    body = request.get_json(silent=True) or {}
    try:
        limit = max(5, min(int(body.get('limit', 60)), 160))
    except (TypeError, ValueError):
        limit = 60

    from services.pchome_backfill_status import (
        PchomeBackfillAlreadyRunning,
        fail_pchome_backfill_run,
        finish_pchome_backfill_run,
        start_pchome_backfill_run,
        update_pchome_backfill_run,
    )

    operator_name = session.get('username') or 'web'
    try:
        run = start_pchome_backfill_run(limit=limit, operator=operator_name)
    except PchomeBackfillAlreadyRunning:
        conflict = {
            'success': False,
            'message': 'PChome 補抓已在執行中,請稍後查看進度',
            'data': _get_pchome_backfill_status_payload(),
        }
        return jsonify(conflict), 409
    run_id = run['run_id']

    def _run_backfill():
        engine = None

        def report(stage, message, **extra):
            # Publish a progress update for this run.
            update_pchome_backfill_run(run_id, stage=stage, message=message, **extra)

        try:
            from config import DATABASE_PATH
            from sqlalchemy import create_engine
            from services.ai_product_pick_agent import generate_product_pick_list
            from services.competitor_intel_repository import clear_competitor_intel_cache
            from services.competitor_price_feeder import CompetitorPriceFeeder

            engine = create_engine(DATABASE_PATH)
            feeder = CompetitorPriceFeeder(engine=engine)

            # Phase 1: refresh known identities, a third of the limit (5..40).
            stale_quota = min(40, max(5, limit // 3))
            report(
                'refreshing_stale',
                f'正在先刷新 {stale_quota} 筆已知 PChome identity補回可用比價新鮮度',
            )
            stale_outcome = feeder.run_expired_identity_refresh(limit=stale_quota)

            # Phase 2: re-score near-threshold candidates (at most 80).
            revalidation_quota = min(limit, 80)
            report(
                'revalidating',
                f'正在重新評分 {revalidation_quota} 筆近門檻 PChome 候選',
            )
            revalidation_outcome = feeder.run_retryable_candidate_revalidation(
                limit=revalidation_quota,
                min_score=0.70,
            )

            # Phase 3: spend what is left of the limit (never below 5) on
            # products that have not been searched yet.
            revalidated_count = int(getattr(revalidation_outcome, 'total_skus', 0) or 0)
            unmatched_quota = max(5, min(limit, limit - revalidated_count))
            report(
                'matching',
                f'正在補抓 {unmatched_quota} 筆高優先尚未搜尋商品',
            )
            unmatched_outcome = feeder.run_unmatched_priority(limit=unmatched_quota)

            totals = _combined_feeder_payload(
                revalidation_outcome,
                unmatched_outcome,
                stale_refresh_result=stale_outcome,
            )
            report(
                'generating_picks',
                'PChome 補抓完成,正在重算 AI 挑品清單',
                result=totals,
            )
            pick_result = generate_product_pick_list(engine, limit=50)
            pick_summary = _pick_result_payload(pick_result)
            report(
                'clearing_cache',
                'AI 挑品已重算,正在清除看板快取',
                result=totals,
                pick_result=pick_summary,
            )

            from services.cache_manager import clear_dashboard_cache
            clear_dashboard_cache()
            clear_competitor_intel_cache()

            completion_message = (
                f"PChome 比價補強完成:刷新/重評/補抓 {totals['total_skus']} 筆、"
                f"新增/更新 {totals['matched']} 筆、"
                f"AI 挑品寫入 {pick_summary['written']}"
            )
            finish_pchome_backfill_run(
                run_id,
                result=totals,
                pick_result=pick_summary,
                message=completion_message,
            )
            logged_fields = (
                'total_skus',
                'matched',
                'skipped_no_result',
                'skipped_low_score',
                'errors',
                'history_written',
                'duration_sec',
            )
            logger.info(
                "[PChomeBackfill] done total=%s matched=%s no=%s low=%s errors=%s history=%s duration=%ss pick_written=%s",
                *(totals[field_name] for field_name in logged_fields),
                pick_result.written,
            )
        except Exception as exc:
            fail_pchome_backfill_run(run_id, str(exc))
            logger.error(f"[PChomeBackfill] 背景補抓失敗: {exc}", exc_info=True)
        finally:
            if engine is not None:
                engine.dispose()

    worker = threading.Thread(target=_run_backfill, daemon=True)
    worker.start()
    accepted = {
        'success': True,
        'message': f'已啟動 PChome 比價補強,會先刷新舊 identity再重評近門檻與補抓 {limit} 筆高價未配對商品;完成後會重算 AI 挑品清單',
        'limit': limit,
        'data': _get_pchome_backfill_status_payload(),
    }
    return jsonify(accepted), 202
@ai_bp.route('/api/ai/pchome-match/refresh-stale', methods=['POST'])
@login_required
def api_pchome_match_refresh_stale():
    """Start a background refresh of expired-price PChome products that already have an identity_v2 match.

    JSON body field ``limit`` defaults to 120 and is clamped to 5..300. The
    work runs in a daemon thread: refresh expired identities, regenerate the
    AI pick list, then clear the caches. Responds with 202 once the thread
    is started, or 409 when a run is already active.
    """
    import threading

    body = request.get_json(silent=True) or {}
    try:
        limit = max(5, min(int(body.get('limit', 120)), 300))
    except (TypeError, ValueError):
        limit = 120

    from services.pchome_backfill_status import (
        PchomeBackfillAlreadyRunning,
        fail_pchome_backfill_run,
        finish_pchome_backfill_run,
        start_pchome_backfill_run,
        update_pchome_backfill_run,
    )

    operator_name = session.get('username') or 'web'
    try:
        run = start_pchome_backfill_run(limit=limit, operator=operator_name)
    except PchomeBackfillAlreadyRunning:
        conflict = {
            'success': False,
            'message': 'PChome 產線已有任務執行中,請稍後查看進度',
            'data': _get_pchome_backfill_status_payload(),
        }
        return jsonify(conflict), 409
    run_id = run['run_id']

    def _run_refresh_stale():
        engine = None

        def report(stage, message, **extra):
            # Publish a progress update for this run.
            update_pchome_backfill_run(run_id, stage=stage, message=message, **extra)

        try:
            from config import DATABASE_PATH
            from sqlalchemy import create_engine
            from services.ai_product_pick_agent import generate_product_pick_list
            from services.cache_manager import clear_dashboard_cache
            from services.competitor_intel_repository import clear_competitor_intel_cache
            from services.competitor_price_feeder import CompetitorPriceFeeder

            report(
                'refreshing_stale',
                f'正在刷新 {limit} 筆過期 identity_v2 PChome 價格',
            )
            engine = create_engine(DATABASE_PATH)
            feeder = CompetitorPriceFeeder(engine=engine)
            refresh_outcome = feeder.run_expired_identity_refresh(limit=limit)
            totals = _feeder_result_payload(refresh_outcome)

            report(
                'generating_picks',
                '過期價格刷新完成,正在重算 AI 挑品清單',
                result=totals,
            )
            pick_result = generate_product_pick_list(engine, limit=50)
            pick_summary = _pick_result_payload(pick_result)

            report(
                'clearing_cache',
                'AI 挑品已重算,正在清除看板快取',
                result=totals,
                pick_result=pick_summary,
            )
            clear_dashboard_cache()
            clear_competitor_intel_cache()

            completion_message = (
                f"PChome 過期價格刷新完成:檢查 {totals['total_skus']} 筆、"
                f"更新 {totals['matched']} 筆、"
                f"AI 挑品寫入 {pick_summary['written']}"
            )
            finish_pchome_backfill_run(
                run_id,
                result=totals,
                pick_result=pick_summary,
                message=completion_message,
            )
            logged_fields = (
                'total_skus',
                'matched',
                'skipped_no_result',
                'skipped_low_score',
                'errors',
                'history_written',
                'duration_sec',
            )
            logger.info(
                "[PChomeRefreshStale] done total=%s matched=%s no=%s low=%s errors=%s history=%s duration=%ss pick_written=%s",
                *(totals[field_name] for field_name in logged_fields),
                pick_result.written,
            )
        except Exception as exc:
            fail_pchome_backfill_run(run_id, str(exc))
            logger.error(f"[PChomeRefreshStale] 背景刷新失敗: {exc}", exc_info=True)
        finally:
            if engine is not None:
                engine.dispose()

    worker = threading.Thread(target=_run_refresh_stale, daemon=True)
    worker.start()
    accepted = {
        'success': True,
        'message': f'已啟動 PChome 過期價格刷新,優先處理 {limit} 筆已建立 identity_v2 的舊價格',
        'limit': limit,
        'data': _get_pchome_backfill_status_payload(),
    }
    return jsonify(accepted), 202
@ai_bp.route('/api/ai/pchome-match/recover-stale', methods=['POST'])
@login_required
def api_pchome_match_recover_stale():
    """Start a background search-recovery run for expired identity_v2 matches.

    The endpoint is gated by the ``PCHOME_STALE_RECOVERY_ENABLED`` environment
    variable; unless it is one of ``1/true/yes/on`` (case-insensitive) the
    request is rejected with ``409``.

    Request body (JSON, optional): ``{"limit": <int>}``. The value is clamped
    to the range 5..80 and falls back to 40 when it is missing or cannot be
    converted to an integer.

    Returns:
        ``202`` with the current backfill status payload once the worker
        thread has been started, or ``409`` when the feature flag is off or
        ``start_pchome_backfill_run`` reports that a run is already active.
    """
    import threading

    if os.getenv("PCHOME_STALE_RECOVERY_ENABLED", "false").lower() not in {"1", "true", "yes", "on"}:
        return jsonify({
            'success': False,
            'message': 'PChome 過期 identity 搜尋救援目前僅開放只讀預覽;正式 smoke 顯示成功率不足,需開啟 PCHOME_STALE_RECOVERY_ENABLED 才能執行',
            'data': _get_pchome_backfill_status_payload(),
        }), 409

    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, string, number) has no
        # ``.get``; treat it as an empty body instead of failing with a 500.
        payload = {}
    try:
        limit = max(5, min(int(payload.get('limit', 40)), 80))
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers e.g. int(float('inf')).
        limit = 40

    from services.pchome_backfill_status import (
        PchomeBackfillAlreadyRunning,
        fail_pchome_backfill_run,
        finish_pchome_backfill_run,
        start_pchome_backfill_run,
        update_pchome_backfill_run,
    )

    try:
        run = start_pchome_backfill_run(
            limit=limit,
            operator=session.get('username') or 'web',
        )
    except PchomeBackfillAlreadyRunning:
        return jsonify({
            'success': False,
            'message': 'PChome 產線已有任務執行中,請稍後查看進度',
            'data': _get_pchome_backfill_status_payload(),
        }), 409
    run_id = run['run_id']

    def _run_recover_stale():
        """Worker: run search recovery, regenerate AI picks, clear caches."""
        engine = None
        try:
            from config import DATABASE_PATH
            from sqlalchemy import create_engine
            from services.ai_product_pick_agent import generate_product_pick_list
            from services.cache_manager import clear_dashboard_cache
            from services.competitor_intel_repository import clear_competitor_intel_cache
            from services.competitor_price_feeder import CompetitorPriceFeeder

            # Stage 1: search recovery for expired identities.
            update_pchome_backfill_run(
                run_id,
                stage='recovering_stale',
                message=f'正在搜尋救援 {limit} 筆過期 identity_v2舊 ID 失效或低分才改找新候選',
            )
            engine = create_engine(DATABASE_PATH)
            feeder = CompetitorPriceFeeder(engine=engine)
            result = feeder.run_expired_identity_search_recovery(limit=limit)
            result_payload = _feeder_result_payload(result)

            # Stage 2: regenerate the AI product pick list.
            update_pchome_backfill_run(
                run_id,
                stage='generating_picks',
                message='搜尋救援完成,正在重算 AI 挑品清單',
                result=result_payload,
            )
            pick_result = generate_product_pick_list(engine, limit=50)
            pick_payload = _pick_result_payload(pick_result)

            # Stage 3: clear the dashboard and competitor-intel caches.
            update_pchome_backfill_run(
                run_id,
                stage='clearing_cache',
                message='AI 挑品已重算,正在清除看板快取',
                result=result_payload,
                pick_result=pick_payload,
            )
            clear_dashboard_cache()
            clear_competitor_intel_cache()

            finish_pchome_backfill_run(
                run_id,
                result=result_payload,
                pick_result=pick_payload,
                message=(
                    f"PChome 過期 identity 搜尋救援完成:檢查 {result_payload['total_skus']} 筆、"
                    f"更新 {result_payload['matched']} 筆、"
                    f"AI 挑品寫入 {pick_payload['written']}"
                ),
            )
            logger.info(
                "[PChomeRecoverStale] done total=%s matched=%s no=%s low=%s errors=%s history=%s duration=%ss pick_written=%s",
                result_payload['total_skus'],
                result_payload['matched'],
                result_payload['skipped_no_result'],
                result_payload['skipped_low_score'],
                result_payload['errors'],
                result_payload['history_written'],
                result_payload['duration_sec'],
                pick_result.written,
            )
        except Exception as exc:
            # Top-level boundary of the worker thread: record the failure on
            # the run and log it with the traceback.
            fail_pchome_backfill_run(run_id, str(exc))
            logger.error(f"[PChomeRecoverStale] 背景搜尋救援失敗: {exc}", exc_info=True)
        finally:
            if engine is not None:
                engine.dispose()

    thread = threading.Thread(target=_run_recover_stale, daemon=True)
    thread.start()
    return jsonify({
        'success': True,
        'message': f'已啟動 PChome 過期 identity 搜尋救援,優先處理 {limit} 筆舊價格',
        'limit': limit,
        'data': _get_pchome_backfill_status_payload(),
    }), 202
def _build_pchome_backfill_coverage_payload():
    """Build the coverage / freshness snapshot shown on the backfill status card.

    Reads competitor coverage through a short-lived engine, attaches the two
    read-only previews and the derived operation backlog, and returns a flat
    dict with ``available: True``. Any exception is logged as a warning and
    reported as ``{'available': False, 'error': ...}``. The engine is always
    disposed before returning.
    """
    engine = None
    try:
        from config import DATABASE_PATH
        from sqlalchemy import create_engine
        from services.competitor_intel_repository import fetch_competitor_coverage

        engine = create_engine(DATABASE_PATH)
        coverage = fetch_competitor_coverage(engine) or {}
        reval_preview = _build_pchome_revalidation_preview_payload(engine)
        recovery_preview = _build_pchome_stale_recovery_preview_payload(engine)
        backlog = _build_pchome_operation_backlog(
            coverage,
            reval_preview,
            recovery_preview,
        )

        def _count(field):
            # Missing / falsy coverage values count as 0.
            return int(coverage.get(field) or 0)

        def _rate(field):
            # Missing / falsy coverage values count as 0.0.
            return float(coverage.get(field) or 0)

        snapshot = {
            'available': True,
            'active_with_price': _count('active_with_price'),
            'valid_matches': _count('valid_matches'),
            'match_rate': _rate('match_rate'),
            'fresh_matches': _count('fresh_matches'),
            'fresh_match_rate': _rate('fresh_match_rate'),
            # Falls back to fresh_matches when decision_ready_matches is falsy.
            'decision_ready_matches': int(
                coverage.get('decision_ready_matches') or coverage.get('fresh_matches') or 0
            ),
            'decision_ready_rate': _rate('decision_ready_rate'),
            'decision_support_count': _count('decision_support_count'),
            'decision_support_rate': _rate('decision_support_rate'),
            'decision_support_non_exact_count': _count('decision_support_non_exact_count'),
            'catalog_comparable_count': _count('catalog_comparable_count'),
            'catalog_comparable_rate': _rate('catalog_comparable_rate'),
            'catalog_variant_review_count': _count('catalog_variant_review_count'),
            'catalog_unit_review_count': _count('catalog_unit_review_count'),
            'catalog_identity_review_count': _count('catalog_identity_review_count'),
            'catalog_review_plan': coverage.get('catalog_review_plan') or {},
            'stale_matches': _count('stale_matches'),
            'pending': _count('pending'),
            'actionable_review_count': _count('actionable_review_count'),
            'unit_comparable_count': _count('unit_comparable_count'),
            'rescore_accepted_count': _count('rescore_accepted_count'),
            'manual_accept_count': _count('manual_accept_count'),
            'manual_reject_count': _count('manual_reject_count'),
            'manual_unit_price_count': _count('manual_unit_price_count'),
            'revalidation_preview': reval_preview,
            'retryable_candidate_preview_count': int(reval_preview.get('candidate_count') or 0),
            'retryable_candidate_preview_has_more': bool(reval_preview.get('has_more')),
            'review_gated_exact_preview_count': int(reval_preview.get('review_gated_count') or 0),
            'stale_recovery_preview': recovery_preview,
            'stale_recovery_preview_count': int(recovery_preview.get('candidate_count') or 0),
            'stale_recovery_preview_has_more': bool(recovery_preview.get('has_more')),
            'operation_backlog': backlog,
            'recommended_next_action': _pick_pchome_recommended_next_action(backlog),
        }
        return snapshot
    except Exception as exc:
        logger.warning(f"[PChomeBackfill] coverage snapshot unavailable: {exc}")
        return {
            'available': False,
            'error': str(exc),
        }
    finally:
        if engine is not None:
            engine.dispose()
def _build_pchome_operation_backlog(coverage, revalidation_preview, stale_recovery_preview):
"""把低覆蓋率拆成可操作 backlog避免只回傳一個模糊百分比。"""
retryable_count = int((revalidation_preview or {}).get('candidate_count') or 0)
return {
'refresh_stale': {
'label': '刷新舊 identity',
'count': int((coverage or {}).get('stale_matches') or 0),
'endpoint': '/api/ai/pchome-match/refresh-stale',
},
'retryable_revalidation': {
'label': '重評近門檻',
'count': retryable_count,
'endpoint': '/api/ai/pchome-match/backfill',
},
'unmatched_priority': {
'label': '補抓未配對',
'count': int((coverage or {}).get('pending') or 0),
'endpoint': '/api/ai/pchome-match/backfill',
},
'manual_review': {
'label': '人工覆核',
'count': int((coverage or {}).get('actionable_review_count') or 0),
'endpoint': '/vendor-stockout/list',
},
'unit_price_review': {
'label': '單位價覆核',
'count': int((coverage or {}).get('unit_comparable_count') or 0),
'endpoint': '/vendor-stockout/list?review_status=unit_comparable',
},
'catalog_variant_review': {
'label': '型錄選項待核',
'count': int((coverage or {}).get('catalog_variant_review_count') or 0),
'endpoint': '/vendor-stockout/list?review_status=catalog_variant_review',
},
'catalog_unit_review': {
'label': '型錄入數待核',
'count': int((coverage or {}).get('catalog_unit_review_count') or 0),
'endpoint': '/vendor-stockout/list?review_status=catalog_unit_review',
},
'catalog_identity_review': {
'label': '型錄身份待核',
'count': int((coverage or {}).get('catalog_identity_review_count') or 0),
'endpoint': '/vendor-stockout/list?review_status=catalog_identity_review',
},
'stale_search_recovery_preview': {
'label': '過期 identity 搜尋救援預覽',
'count': int((stale_recovery_preview or {}).get('candidate_count') or 0),
'endpoint': '/api/ai/pchome-match/backfill/status',
'read_only': True,
},
}
def _pick_pchome_recommended_next_action(operation_backlog):
"""根據 backlog 選一個清楚的下一步;不在這裡自動執行任何寫入。"""
retryable = int(operation_backlog.get('retryable_revalidation', {}).get('count') or 0)
pending = int(operation_backlog.get('unmatched_priority', {}).get('count') or 0)
stale = int(operation_backlog.get('refresh_stale', {}).get('count') or 0)
manual = int(operation_backlog.get('manual_review', {}).get('count') or 0)
unit_price = int(operation_backlog.get('unit_price_review', {}).get('count') or 0)
if retryable > 0 or pending > 0:
return {
'key': 'run_backfill',
'label': '執行比價補強',
'reason': '同時刷新少量舊 identity、重評近門檻候選並補抓高優先未配對商品。',
'endpoint': '/api/ai/pchome-match/backfill',
}
if stale > 0:
return {
'key': 'refresh_stale',
'label': '刷新過期 identity',
'reason': '已有身份配對但價格過期,先補回可決策的新鮮價格。',
'endpoint': '/api/ai/pchome-match/refresh-stale',
}
if unit_price > 0:
return {
'key': 'review_unit_price',
'label': '處理單位價覆核',
'reason': '商品可比較但需要人工確認單位價換算,避免錯誤總價決策。',
'endpoint': '/vendor-stockout/list?review_status=unit_comparable',
}
if manual > 0:
return {
'key': 'manual_review',
'label': '處理人工覆核',
'reason': '剩餘項目需要人工判斷款式、色號、件數或既有候選衝突。',
'endpoint': '/vendor-stockout/list',
}
return {
'key': 'observe',
'label': '維持觀測',
'reason': '目前沒有明確的自動補強 backlog。',
'endpoint': '/api/ai/pchome-match/backfill/status',
}
def _build_pchome_revalidation_preview_payload(engine):
    """Return the read-only preview of near-threshold revalidation candidates.

    The result is memoized in ``_PCHOME_REVALIDATION_PREVIEW_CACHE`` for
    ``_PCHOME_REVALIDATION_PREVIEW_TTL_SECONDS``. When the preview call
    raises, a fallback payload with ``available: False`` is produced and
    cached the same way.
    """
    cache = _PCHOME_REVALIDATION_PREVIEW_CACHE
    now_ts = datetime.now().timestamp()

    cached = cache.get("payload")
    if cached is not None:
        deadline = float(cache.get("expires_at") or 0)
        if now_ts < deadline:
            return cached

    try:
        from services.competitor_price_feeder import CompetitorPriceFeeder

        feeder = CompetitorPriceFeeder(engine=engine)
        payload = feeder.preview_retryable_candidate_revalidation(
            limit=80,
            min_score=0.70,
        )
    except Exception as exc:
        logger.warning(f"[PChomeBackfill] revalidation preview unavailable: {exc}")
        payload = dict(
            available=False,
            error=str(exc),
            candidate_count=0,
            has_more=False,
            review_gated_count=0,
            boundary='read_only_no_crawl_no_llm_no_db_write',
        )

    # Single update call so expiry and payload are swapped together.
    cache.update(
        expires_at=now_ts + _PCHOME_REVALIDATION_PREVIEW_TTL_SECONDS,
        payload=payload,
    )
    return payload
def _build_pchome_stale_recovery_preview_payload(engine):
    """Return the read-only preview of expired identity_v2 search-recovery candidates.

    The result is memoized in ``_PCHOME_STALE_RECOVERY_PREVIEW_CACHE`` for
    ``_PCHOME_STALE_RECOVERY_PREVIEW_TTL_SECONDS``. When the preview call
    raises, a fallback payload with ``available: False`` is produced and
    cached the same way.
    """
    cache = _PCHOME_STALE_RECOVERY_PREVIEW_CACHE
    now_ts = datetime.now().timestamp()

    cached = cache.get("payload")
    if cached is not None:
        deadline = float(cache.get("expires_at") or 0)
        if now_ts < deadline:
            return cached

    try:
        from services.competitor_price_feeder import CompetitorPriceFeeder

        feeder = CompetitorPriceFeeder(engine=engine)
        payload = feeder.preview_expired_identity_recovery(
            limit=40,
        )
    except Exception as exc:
        logger.warning(f"[PChomeBackfill] stale recovery preview unavailable: {exc}")
        payload = dict(
            available=False,
            error=str(exc),
            candidate_count=0,
            has_more=False,
            boundary='read_only_no_crawl_no_llm_no_db_write',
        )

    # Single update call so expiry and payload are swapped together.
    cache.update(
        expires_at=now_ts + _PCHOME_STALE_RECOVERY_PREVIEW_TTL_SECONDS,
        payload=payload,
    )
    return payload
def _get_pchome_backfill_status_payload():
    """Return the backfill run status with a ``coverage`` snapshot attached."""
    from services.pchome_backfill_status import get_pchome_backfill_status

    snapshot = get_pchome_backfill_status()
    coverage = _build_pchome_backfill_coverage_payload()
    snapshot['coverage'] = coverage
    return snapshot
@ai_bp.route('/api/ai/pchome-match/backfill/status', methods=['GET'])
@login_required
def api_pchome_match_backfill_status():
    """Return the background-run status of the PChome backfill as JSON."""
    status_payload = _get_pchome_backfill_status_payload()
    body = {
        'success': True,
        'data': status_payload,
    }
    return jsonify(body)
@ai_bp.route('/api/ai/icaim/trigger', methods=['POST'])
@login_required
def api_icaim_trigger():
    """
    Manually trigger one full ICAIM analysis (asynchronous, background).

    Responds with 202 immediately. A daemon thread runs the Hermes analyst
    and, when threats are found, hands them to the Nemotron dispatcher
    (per the original note, the dispatcher ends in a Telegram notification;
    that is not visible from this function).
    """
    import threading

    def _run_pipeline():
        """Worker: Hermes analysis followed by an optional Nemotron dispatch."""
        engine = None
        try:
            from config import DATABASE_PATH
            from sqlalchemy import create_engine
            from services.hermes_analyst_service import HermesAnalystService
            from services.nemoton_dispatcher_service import NemotronDispatcher as NemotronDispatcherService
            from services.notification_manager import NotificationManager

            engine = create_engine(DATABASE_PATH)
            hermes_svc = HermesAnalystService(engine=engine)
            t0 = datetime.now()
            result = hermes_svc.run()
            hermes_duration = round((datetime.now() - t0).total_seconds(), 1)
            logger.info(f"[ICAIM][手動觸發] Hermes 完成 threats={len(result.threats)} 耗時={hermes_duration}s")
            if result.threats:
                hermes_stats = {
                    'model': 'Hermes 3',  # display-only identifier; downstream _build_footprint does not read this key
                    'duration_sec': hermes_duration,
                    'tokens': result.hermes_tokens,
                }
                notifier = NotificationManager()
                dispatcher = NemotronDispatcherService(
                    notification_manager=notifier, engine=engine
                )
                dispatch_result = dispatcher.dispatch(result.threats, hermes_stats=hermes_stats)
                logger.info(f"[ICAIM][手動觸發] NIM dispatch 完成: {dispatch_result}")
            else:
                logger.info("[ICAIM][手動觸發] 無威脅商品,跳過 NIM dispatch")
        except Exception as ex:
            # Top-level boundary of the worker thread: log with traceback so
            # the failure can be diagnosed.
            logger.error(f"[ICAIM][手動觸發] 背景執行失敗: {ex}", exc_info=True)
        finally:
            # Release pooled connections of the per-run engine, matching the
            # other background workers in this module.
            if engine is not None:
                engine.dispose()

    thread = threading.Thread(target=_run_pipeline, daemon=True)
    thread.start()
    return jsonify({
        'success': True,
        'message': '分析已在背景啟動,約 30~60 秒後完成,請重新整理儀表板查看結果',
    }), 202