493 lines
17 KiB
Python
493 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Google Gemini AI 服務模組
|
||
負責與 Google Gemini API 互動,提供文案生成、關鍵字提取等功能
|
||
支援 gemini-1.5-flash, gemini-2.5-flash, gemini-2.5-pro 模型
|
||
"""
|
||
|
||
import os
|
||
import time
|
||
import logging
|
||
from typing import Optional, Dict, Any, List
|
||
from dataclasses import dataclass, field
|
||
from datetime import date
|
||
from services.gemini_guard import (
|
||
gemini_disabled_message,
|
||
get_gemini_api_key,
|
||
is_gemini_fallback_enabled,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Gemini 設定 - 支援環境變數覆蓋;API key 一律透過 gemini_guard 取得
|
||
DEFAULT_GEMINI_MODEL = os.getenv('GEMINI_MODEL', 'gemini-1.5-flash')
|
||
GEMINI_TIMEOUT = int(os.getenv('GEMINI_TIMEOUT', '60')) # 秒
|
||
|
||
# Gemini 定價 (USD per 1M tokens) - 2024/2025 定價
|
||
GEMINI_PRICING = {
|
||
'gemini-1.5-flash': {'input': 0.075, 'output': 0.30},
|
||
'gemini-1.5-flash-latest': {'input': 0.075, 'output': 0.30},
|
||
'gemini-1.5-pro': {'input': 1.25, 'output': 5.00},
|
||
'gemini-1.5-pro-latest': {'input': 1.25, 'output': 5.00},
|
||
'gemini-2.5-flash': {'input': 0.10, 'output': 0.40},
|
||
'gemini-2.5-flash-preview-05-20': {'input': 0.10, 'output': 0.40},
|
||
'gemini-2.5-pro': {'input': 1.25, 'output': 10.00},
|
||
'gemini-2.5-pro-preview-05-06': {'input': 1.25, 'output': 10.00},
|
||
}
|
||
|
||
# 可選模型清單(供 UI 選擇)
|
||
AVAILABLE_GEMINI_MODELS = [
|
||
{'id': 'gemini-1.5-flash', 'name': 'Gemini 1.5 Flash', 'description': '最便宜、速度快'},
|
||
{'id': 'gemini-2.5-flash', 'name': 'Gemini 2.5 Flash', 'description': '性價比最佳'},
|
||
{'id': 'gemini-2.5-pro', 'name': 'Gemini 2.5 Pro', 'description': '最強大、高品質'},
|
||
]
|
||
|
||
|
||
@dataclass
|
||
class GeminiResponse:
|
||
"""Gemini 回應結構"""
|
||
success: bool
|
||
content: str
|
||
model: str
|
||
error: Optional[str] = None
|
||
total_duration: Optional[float] = None
|
||
input_tokens: int = 0
|
||
output_tokens: int = 0
|
||
total_tokens: int = 0
|
||
input_cost: float = 0.0
|
||
output_cost: float = 0.0
|
||
total_cost: float = 0.0
|
||
|
||
|
||
class GeminiService:
|
||
"""Google Gemini AI 服務"""
|
||
|
||
# 連線狀態快取
|
||
_connection_cache = {'status': None, 'timestamp': 0, 'models': []}
|
||
_CACHE_TTL = 300 # 快取 5 分鐘
|
||
|
||
def __init__(self, api_key: str = None, model: str = None):
|
||
"""
|
||
初始化 Gemini 服務
|
||
|
||
Args:
|
||
api_key: Google API Key(預設從環境變數取得)
|
||
model: 預設模型(預設 gemini-1.5-flash)
|
||
"""
|
||
self.api_key = api_key or get_gemini_api_key("gemini_service")
|
||
self.model = model or DEFAULT_GEMINI_MODEL
|
||
self._client = None
|
||
self._initialized = False
|
||
|
||
def _ensure_initialized(self) -> bool:
|
||
"""確保 SDK 已初始化"""
|
||
if self._initialized:
|
||
return True
|
||
|
||
if not is_gemini_fallback_enabled("gemini_service"):
|
||
logger.info(gemini_disabled_message("gemini_service"))
|
||
return False
|
||
|
||
if not self.api_key:
|
||
self.api_key = get_gemini_api_key("gemini_service")
|
||
|
||
if not self.api_key:
|
||
logger.warning("Gemini API Key 未設定")
|
||
return False
|
||
|
||
try:
|
||
import google.generativeai as genai
|
||
genai.configure(api_key=self.api_key)
|
||
self._initialized = True
|
||
logger.info("Gemini SDK 初始化成功")
|
||
return True
|
||
except ImportError:
|
||
logger.error("請安裝 google-generativeai: pip install google-generativeai")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"Gemini SDK 初始化失敗: {e}")
|
||
return False
|
||
|
||
def check_connection(self) -> bool:
|
||
"""檢查 Gemini 服務是否可用(含快取)"""
|
||
now = time.time()
|
||
|
||
# 使用快取避免頻繁檢查
|
||
if (GeminiService._connection_cache['status'] is not None and
|
||
now - GeminiService._connection_cache['timestamp'] < GeminiService._CACHE_TTL):
|
||
return GeminiService._connection_cache['status']
|
||
|
||
if not self._ensure_initialized():
|
||
GeminiService._connection_cache = {'status': False, 'timestamp': now, 'models': []}
|
||
return False
|
||
|
||
try:
|
||
import google.generativeai as genai
|
||
|
||
# 嘗試列出模型驗證連線
|
||
models = list(genai.list_models())
|
||
available_models = [m.name for m in models if 'generateContent' in m.supported_generation_methods]
|
||
|
||
logger.info(f"Gemini 連線成功,可用模型數: {len(available_models)}")
|
||
GeminiService._connection_cache = {
|
||
'status': True,
|
||
'timestamp': now,
|
||
'models': available_models
|
||
}
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Gemini 連線失敗: {e}")
|
||
GeminiService._connection_cache = {'status': False, 'timestamp': now, 'models': []}
|
||
return False
|
||
|
||
def list_models(self) -> List[Dict]:
|
||
"""列出可用模型(供 UI 選擇)"""
|
||
return AVAILABLE_GEMINI_MODELS
|
||
|
||
@staticmethod
|
||
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> Dict[str, float]:
|
||
"""
|
||
計算 API 費用 (USD)
|
||
|
||
Args:
|
||
model: 模型名稱
|
||
input_tokens: 輸入 token 數量
|
||
output_tokens: 輸出 token 數量
|
||
|
||
Returns:
|
||
dict: {'input_cost': float, 'output_cost': float, 'total_cost': float}
|
||
"""
|
||
# 查找定價(支援完整模型名稱和簡短名稱)
|
||
pricing = None
|
||
for model_key, price in GEMINI_PRICING.items():
|
||
if model_key in model or model in model_key:
|
||
pricing = price
|
||
break
|
||
|
||
if not pricing:
|
||
# 使用預設定價(最便宜的)
|
||
pricing = GEMINI_PRICING.get('gemini-1.5-flash', {'input': 0.075, 'output': 0.30})
|
||
|
||
# 計算費用(per 1M tokens)
|
||
input_cost = (input_tokens / 1_000_000) * pricing['input']
|
||
output_cost = (output_tokens / 1_000_000) * pricing['output']
|
||
total_cost = input_cost + output_cost
|
||
|
||
return {
|
||
'input_cost': round(input_cost, 6),
|
||
'output_cost': round(output_cost, 6),
|
||
'total_cost': round(total_cost, 6)
|
||
}
|
||
|
||
def generate(self, prompt: str, model: str = None,
|
||
system_prompt: str = None, temperature: float = 0.7,
|
||
timeout: int = None) -> GeminiResponse:
|
||
"""
|
||
生成文字
|
||
|
||
Args:
|
||
prompt: 使用者提示
|
||
model: 模型名稱(預設使用 self.model)
|
||
system_prompt: 系統提示
|
||
temperature: 創意度 (0-2)
|
||
timeout: 自訂超時時間(秒)
|
||
|
||
Returns:
|
||
GeminiResponse
|
||
"""
|
||
model_name = model or self.model
|
||
request_timeout = timeout or GEMINI_TIMEOUT
|
||
|
||
if not is_gemini_fallback_enabled("gemini_service.generate"):
|
||
return GeminiResponse(
|
||
success=False,
|
||
content='',
|
||
model=model_name,
|
||
error=gemini_disabled_message("gemini_service.generate")
|
||
)
|
||
|
||
if not self._ensure_initialized():
|
||
return GeminiResponse(
|
||
success=False,
|
||
content='',
|
||
model=model_name,
|
||
error="Gemini SDK 未初始化,請檢查 API Key"
|
||
)
|
||
|
||
try:
|
||
import google.generativeai as genai
|
||
|
||
start_time = time.time()
|
||
|
||
# 建立模型
|
||
generation_config = genai.types.GenerationConfig(
|
||
temperature=temperature,
|
||
max_output_tokens=2048,
|
||
)
|
||
|
||
gen_model = genai.GenerativeModel(
|
||
model_name=model_name,
|
||
generation_config=generation_config,
|
||
system_instruction=system_prompt if system_prompt else None
|
||
)
|
||
|
||
logger.info(f"[Gemini] 開始生成,模型: {model_name},超時: {request_timeout}秒")
|
||
|
||
# 生成內容
|
||
response = gen_model.generate_content(
|
||
prompt,
|
||
request_options={'timeout': request_timeout}
|
||
)
|
||
|
||
end_time = time.time()
|
||
duration = end_time - start_time
|
||
|
||
# 解析回應
|
||
content = response.text if response.text else ''
|
||
|
||
# 取得 token 用量
|
||
input_tokens = 0
|
||
output_tokens = 0
|
||
if hasattr(response, 'usage_metadata'):
|
||
input_tokens = getattr(response.usage_metadata, 'prompt_token_count', 0)
|
||
output_tokens = getattr(response.usage_metadata, 'candidates_token_count', 0)
|
||
|
||
# 計算費用
|
||
costs = self.calculate_cost(model_name, input_tokens, output_tokens)
|
||
|
||
logger.info(f"[Gemini] 生成完成,耗時: {duration:.2f}秒,"
|
||
f"tokens: {input_tokens}+{output_tokens}={input_tokens+output_tokens},"
|
||
f"費用: ${costs['total_cost']:.6f}")
|
||
|
||
return GeminiResponse(
|
||
success=True,
|
||
content=content,
|
||
model=model_name,
|
||
total_duration=duration,
|
||
input_tokens=input_tokens,
|
||
output_tokens=output_tokens,
|
||
total_tokens=input_tokens + output_tokens,
|
||
input_cost=costs['input_cost'],
|
||
output_cost=costs['output_cost'],
|
||
total_cost=costs['total_cost']
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Gemini 生成錯誤: {e}")
|
||
return GeminiResponse(
|
||
success=False,
|
||
content='',
|
||
model=model_name,
|
||
error=str(e)
|
||
)
|
||
|
||
def generate_sales_copy(self, product_name: str, trend_keywords: List[str] = None,
|
||
style: str = "吸睛", upcoming_holidays: List[Dict] = None,
|
||
bestseller_products: List[Dict] = None,
|
||
model: str = None) -> GeminiResponse:
|
||
"""
|
||
生成銷售文案
|
||
|
||
Args:
|
||
product_name: 商品名稱
|
||
trend_keywords: 相關趨勢關鍵字
|
||
style: 文案風格 (吸睛/專業/溫馨/急迫)
|
||
upcoming_holidays: 即將到來的假期
|
||
bestseller_products: 競品熱銷商品
|
||
model: 指定模型(預設使用 self.model)
|
||
|
||
Returns:
|
||
GeminiResponse
|
||
"""
|
||
style_prompts = {
|
||
"吸睛": "使用吸引眼球的標題和表情符號",
|
||
"專業": "使用專業術語,強調成分和功效",
|
||
"溫馨": "使用溫暖的語氣,強調呵護和關愛",
|
||
"急迫": "使用限時優惠的語氣,創造緊迫感"
|
||
}
|
||
|
||
# 趨勢關鍵字
|
||
trend_context = ""
|
||
if trend_keywords:
|
||
trend_context = f"\n目前的熱門趨勢關鍵字:{', '.join(trend_keywords)}。請嘗試將這些趨勢融入文案中。"
|
||
|
||
# 即將到來的假期
|
||
holiday_context = ""
|
||
if upcoming_holidays:
|
||
holidays_text = []
|
||
for h in upcoming_holidays[:3]:
|
||
name = h.get('name', '')
|
||
days = h.get('days_until', 0)
|
||
if days == 0:
|
||
holidays_text.append(f"{name}(今天)")
|
||
elif days == 1:
|
||
holidays_text.append(f"{name}(明天)")
|
||
else:
|
||
holidays_text.append(f"{name}({days}天後)")
|
||
if holidays_text:
|
||
holiday_context = f"\n即將到來的假期:{', '.join(holidays_text)}。可以考慮結合節慶氛圍或送禮情境。"
|
||
|
||
# 競品熱銷參考
|
||
bestseller_context = ""
|
||
if bestseller_products:
|
||
products_text = [f"{p.get('name', '')}(${p.get('price', '')})" for p in bestseller_products[:3]]
|
||
if products_text:
|
||
bestseller_context = f"\n市場熱銷參考:{', '.join(products_text)}。可參考熱銷趨勢但要突出自家商品特色。"
|
||
|
||
system_prompt = """你是一位專業的電商銷售文案寫手和行銷策略專家,專門為台灣電商平台撰寫商品文案。
|
||
你的文案特點:
|
||
- 使用繁體中文
|
||
- 善用表情符號增加吸引力
|
||
- 強調商品賣點和消費者利益
|
||
- 適時使用行動呼籲 (CTA)
|
||
- 若有即將到來的節日,可適度融入節慶元素
|
||
- 提供完整的行銷建議"""
|
||
|
||
prompt = f"""請為以下商品撰寫完整的銷售文案套組:
|
||
|
||
商品名稱:{product_name}
|
||
|
||
文案風格:{style_prompts.get(style, style_prompts['吸睛'])}
|
||
{trend_context}{holiday_context}{bestseller_context}
|
||
|
||
請按照以下格式生成完整的銷售文案套組:
|
||
|
||
【大標題】
|
||
(15字以內的主打標語,吸引眼球,適合用於廣告Banner)
|
||
|
||
【中標題】
|
||
(30字以內的副標題,補充說明賣點)
|
||
|
||
【小標題】
|
||
(20字以內的精簡標語,適合用於社群貼文)
|
||
|
||
【詳細文案】
|
||
(100-150字的完整銷售文案,包含商品特色、使用情境、行動呼籲)
|
||
|
||
【推廣建議】
|
||
• 社群推廣:(Facebook/Instagram/LINE 等社群平台的建議策略)
|
||
• 影音內容:(短影音/直播/開箱影片等建議)
|
||
• 其他建議:(EDM、部落格、KOL合作等專業建議)
|
||
|
||
請確保所有內容使用繁體中文,風格一致,並突出商品價值:"""
|
||
|
||
return self.generate(
|
||
prompt,
|
||
model=model,
|
||
system_prompt=system_prompt,
|
||
temperature=0.8,
|
||
timeout=90
|
||
)
|
||
|
||
def extract_keywords(self, text: str, max_keywords: int = 10,
|
||
model: str = None) -> GeminiResponse:
|
||
"""
|
||
從文字中提取關鍵字
|
||
|
||
Args:
|
||
text: 要分析的文字
|
||
max_keywords: 最大關鍵字數量
|
||
model: 指定模型
|
||
|
||
Returns:
|
||
GeminiResponse(content 為逗號分隔的關鍵字)
|
||
"""
|
||
system_prompt = "你是一位關鍵字提取專家。請從給定的文字中提取最重要的關鍵字。"
|
||
|
||
prompt = f"""請從以下文字中提取最多 {max_keywords} 個關鍵字,這些關鍵字應該能代表文章的主題和重點。
|
||
|
||
文字內容:
|
||
{text}
|
||
|
||
請只輸出關鍵字,用逗號分隔,不要輸出其他內容:"""
|
||
|
||
return self.generate(prompt, model=model, system_prompt=system_prompt, temperature=0.3)
|
||
|
||
def search_product_insights(self, product_name: str,
|
||
include_competitors: bool = True,
|
||
include_trends: bool = True,
|
||
model: str = None) -> GeminiResponse:
|
||
"""
|
||
搜尋商品相關的市場洞察
|
||
|
||
Args:
|
||
product_name: 商品名稱
|
||
include_competitors: 是否包含競品分析
|
||
include_trends: 是否包含趨勢分析
|
||
model: 指定模型
|
||
|
||
Returns:
|
||
GeminiResponse
|
||
"""
|
||
system_prompt = """你是一位資深的電商市場分析師,專精於台灣市場。
|
||
你擅長分析商品的市場定位、競爭對手、以及銷售趨勢。
|
||
請提供全面但簡潔的市場洞察,使用繁體中文。"""
|
||
|
||
analysis_parts = ["市場定位分析"]
|
||
if include_competitors:
|
||
analysis_parts.append("主要競爭對手分析")
|
||
if include_trends:
|
||
analysis_parts.append("市場趨勢分析")
|
||
|
||
competitors_json = '"competitors": [{"name": "競品名稱", "strength": "優勢", "weakness": "劣勢"}],' if include_competitors else ""
|
||
trends_json = '"trends": {"current": "當前趨勢", "forecast": "趨勢預測", "seasonality": "季節性因素"},' if include_trends else ""
|
||
analysis_list = '\n'.join([f'{i+1}. {part}' for i, part in enumerate(analysis_parts)])
|
||
|
||
prompt = f"""請為以下商品提供市場洞察分析:
|
||
|
||
商品名稱:{product_name}
|
||
|
||
請分析以下面向:
|
||
{analysis_list}
|
||
|
||
請用以下 JSON 格式回覆(務必輸出有效的 JSON):
|
||
{{
|
||
"product_name": "{product_name}",
|
||
"market_position": {{
|
||
"target_audience": "目標客群描述",
|
||
"price_range": "價格區間建議",
|
||
"positioning": "市場定位建議"
|
||
}},
|
||
{competitors_json}
|
||
{trends_json}
|
||
"recommendations": ["銷售建議1", "銷售建議2", "銷售建議3"],
|
||
"keywords": ["行銷關鍵字1", "關鍵字2", "關鍵字3"]
|
||
}}"""
|
||
|
||
return self.generate(prompt, model=model, system_prompt=system_prompt, temperature=0.6, timeout=90)
|
||
|
||
|
||
# 建立全域服務實例
|
||
gemini_service = GeminiService()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 測試程式碼
|
||
logging.basicConfig(level=logging.INFO)
|
||
|
||
service = GeminiService()
|
||
|
||
# 測試連線
|
||
print("測試 Gemini 連線...")
|
||
if service.check_connection():
|
||
print(f"連線成功!")
|
||
|
||
# 測試文案生成
|
||
print("\n測試文案生成...")
|
||
result = service.generate_sales_copy(
|
||
"玻尿酸保濕面膜",
|
||
trend_keywords=["換季保養", "敏感肌"],
|
||
style="吸睛"
|
||
)
|
||
if result.success:
|
||
print(f"生成結果:\n{result.content}")
|
||
print(f"\n耗時: {result.total_duration:.2f} 秒")
|
||
print(f"Token 用量: 輸入 {result.input_tokens}, 輸出 {result.output_tokens}")
|
||
print(f"費用: ${result.total_cost:.6f} USD")
|
||
else:
|
||
print(f"生成失敗: {result.error}")
|
||
else:
|
||
print("連線失敗(請確認 GEMINI_API_KEY 環境變數)")
|