Files
ewoooc/services/ollama_service.py
OoO 353e565e52
All checks were successful
CD Pipeline / deploy (push) Successful in 1m4s
V10.417 protect embedding fallback routing
2026-05-24 14:53:43 +08:00

1023 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Ollama LLM 服務模組
負責與 Ollama API 互動,提供文案生成、關鍵字提取等功能
"""
import os
import requests
import json
import logging
import fnmatch
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)

# host:port fragments identifying the only Ollama endpoints this module accepts.
APPROVED_OLLAMA_HOST_SUBSTRINGS = (
    '34.143.170.20:11434',  # GCP-A / primary
    '34.21.145.224:11434',  # GCP-B / secondary
    '192.168.0.111:11434',  # 111 / final fallback
    '192.168.0.110:11435',  # 110 proxy to GCP-A
    '192.168.0.110:11436',  # 110 proxy to GCP-B
)
def is_approved_ollama_host(host: str) -> bool:
    """Return True only when *host* addresses an approved Ollama endpoint.

    The value is parsed as a URL and its ``hostname:port`` pair must equal one
    of the entries in ``APPROVED_OLLAMA_HOST_SUBSTRINGS``. A plain substring
    test is not enough: ``134.143.170.20:11434`` and
    ``http://other.example/34.143.170.20:11434`` both *contain* an approved
    fragment while pointing at a different machine.

    Args:
        host: A URL such as ``http://34.143.170.20:11434`` or a bare
            ``ip:port`` value; a path or trailing slash is tolerated.

    Returns:
        True when the parsed endpoint is on the allow-list; False for empty,
        unparsable, port-less or unapproved values.
    """
    from urllib.parse import urlsplit

    if not host:
        return False
    candidate = host.strip()
    # urlsplit only fills in the network location when "//" precedes it, so a
    # bare "ip:port" value needs the prefix added before parsing.
    if '://' not in candidate:
        candidate = f'//{candidate}'
    try:
        parts = urlsplit(candidate)
        # Accessing .port raises ValueError for a non-numeric or out-of-range port.
        endpoint = f'{(parts.hostname or "").lower()}:{parts.port}'
    except ValueError:
        return False
    return endpoint in APPROVED_OLLAMA_HOST_SUBSTRINGS
def approved_ollama_env(name: str, default: str = '') -> str:
    """Read an Ollama host from the environment, ignoring unapproved values.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset, blank, or names a
            host that ``is_approved_ollama_host`` rejects.

    Returns:
        The stripped environment value when it is approved, else *default*.
        A rejected non-empty value is logged as a warning.
    """
    raw = os.getenv(name, '').strip()
    if raw and is_approved_ollama_host(raw):
        return raw
    if raw:
        # Something was configured but it is not on the allow-list.
        logger.warning(
            "[OllamaHost] 忽略未核准的 %s=%sLLM 只能走 GCP-A/GCP-B/111",
            name,
            raw,
        )
    return default
# Ollama endpoints: only the three-host chain GCP-A -> GCP-B -> 111 is allowed.
OLLAMA_HOST_PRIMARY = approved_ollama_env(
    'OLLAMA_HOST_PRIMARY', 'http://34.143.170.20:11434'
)
OLLAMA_HOST_SECONDARY = approved_ollama_env(
    'OLLAMA_HOST_SECONDARY', 'http://34.21.145.224:11434'
)
OLLAMA_HOST_FALLBACK = approved_ollama_env(
    'OLLAMA_HOST_FALLBACK', 'http://192.168.0.111:11434'
)
# The legacy OLLAMA_HOST variable is honoured only for approved hosts; any
# other value falls back to the primary and resolve_ollama_host() drives the
# cascade from there.
OLLAMA_HOST = approved_ollama_env('OLLAMA_HOST', OLLAMA_HOST_PRIMARY)

DEFAULT_MODEL = os.getenv('OLLAMA_MODEL', 'llama3.1:8b')  # a comparatively fast model

# Request timeouts, in seconds.
TIMEOUT = int(os.getenv('OLLAMA_TIMEOUT', '120'))  # general generation: 2 minutes
COPY_TIMEOUT = int(os.getenv('OLLAMA_COPY_TIMEOUT', '180'))  # sales-copy generation: 3 minutes

# Embedding settings. EMBED_MAX_TIMEOUT is the upper bound applied to every
# embedding request timeout; EMBED_MAX_CHARS is the input length limit.
EMBED_TIMEOUT = int(
    os.getenv('OLLAMA_EMBED_TIMEOUT', os.getenv('EMBEDDING_TIMEOUT', '45'))
)
EMBED_MAX_TIMEOUT = int(os.getenv('OLLAMA_EMBED_MAX_TIMEOUT', '15'))
EMBED_KEEP_ALIVE = os.getenv('OLLAMA_EMBED_KEEP_ALIVE', '1m')
EMBED_MAX_CHARS = int(os.getenv('OLLAMA_EMBED_MAX_CHARS', '4000'))

# Limits applied only when a request lands on the 111 final-fallback host.
FALLBACK_111_KEEP_ALIVE = os.getenv('OLLAMA_111_KEEP_ALIVE', '5m')
FALLBACK_111_MAX_TIMEOUT = int(os.getenv('OLLAMA_111_MAX_TIMEOUT', '20'))
FALLBACK_111_NUM_CTX = int(os.getenv('OLLAMA_111_NUM_CTX', '4096'))
FALLBACK_111_NUM_PREDICT = int(os.getenv('OLLAMA_111_NUM_PREDICT', '512'))
FALLBACK_111_MODEL = os.getenv('OLLAMA_111_MODEL_FALLBACK', 'llama3.2:latest')

# Lower-cased fnmatch patterns (comma-separated in the environment) naming the
# models that are swapped for FALLBACK_111_MODEL on the 111 host.
FALLBACK_111_MODEL_PATTERNS = tuple(
    normalized
    for normalized in (
        raw.strip().lower()
        for raw in os.getenv(
            'OLLAMA_111_MODEL_DOWNGRADE_PATTERNS',
            'qwen3:*,deepseek-r1:*,hermes3:*,llama3.1:*,'
            'qwen2.5:*,qwen2.5-coder:*,gemma3:*,minicpm-v:*,llava:*,'
            '*:7b*,*:8b*,*:14b*,*:32b*,*:70b*',
        ).split(',')
    )
    if normalized
)
# ── GCP first, 111 as backup: state used to resolve the Ollama host in use ──
# ADR-027 Phase 2 hardening:
#   B3 - _is_reachable moved from a bare TCP connect to an HTTP probe of
#        /api/version, so a hung process whose port still listens is not
#        mistaken for a live server.
#   B4 - mark_unhealthy(host) was added; generate/embedding failures call it
#        and the next resolve skips that host for 30 seconds.
_resolved_host_cache: Dict[str, Any] = {'host': None, 'ts': 0}
_RESOLVE_TTL = 120  # seconds a resolved host stays cached
_unhealthy_marks: Dict[str, float] = {}  # host URL -> time.time() when it was marked
_UNHEALTHY_TTL = 30  # seconds a marked host is skipped by the resolver
def mark_unhealthy(host: str) -> None:
    """Flag *host* as recently failed so the resolver skips it for a while.

    Intended to be called whenever an Ollama HTTP interaction (generate,
    embedding, ...) fails. The mark is stored under the host with trailing
    slashes removed and is honoured for ``_UNHEALTHY_TTL`` seconds. The
    resolved-host cache is cleared as well, which forces the next
    ``resolve_ollama_host`` call to re-evaluate the hosts.

    Args:
        host: Host URL to mark; a falsy value is ignored.
    """
    import time

    if host:
        _unhealthy_marks[host.rstrip('/')] = time.time()
        # Invalidate the cached resolution so the next resolve re-probes.
        _resolved_host_cache.update(host=None, ts=0)
        logger.warning(f"[OllamaHost] 主機標記為 unhealthy30s 跳過):{host}")
def _mark_unhealthy_best_effort(host: str) -> None:
    """Call ``mark_unhealthy(host)`` without ever propagating an exception.

    Any failure while marking is logged at debug level (with traceback) and
    otherwise ignored, so a retry loop is never interrupted by bookkeeping.
    """
    try:
        mark_unhealthy(host)
    except Exception:
        logger.debug(
            "[OllamaHost] mark_unhealthy failed for host=%s",
            host,
            exc_info=True,
        )
def _normalize_host(host: str) -> str:
    """Return *host* without trailing slashes; falsy input becomes ``''``."""
    return host.rstrip('/') if host else ''
def _is_111_fallback_host(host: str) -> bool:
    """Return True when *host* contains the 111 fallback ``ip:port`` fragment."""
    if not host:
        return False
    return '192.168.0.111:11434' in host
def _effective_model_for_host(model: str, host: str) -> str:
    """Pick the model that will actually be sent to *host*.

    Requests to any host other than the 111 final fallback keep the caller's
    model. On the 111 host, a model whose lower-cased name matches one of
    ``FALLBACK_111_MODEL_PATTERNS`` (fnmatch syntax) is replaced by
    ``FALLBACK_111_MODEL`` and a warning is logged. The original note explains
    the motivation: 111 is a 16GB Mac/HDD box that should not carry 7B+,
    vision or long-context models.

    Args:
        model: Model requested by the caller.
        host: Host the request is about to be sent to.

    Returns:
        *model* unchanged, or ``FALLBACK_111_MODEL`` when it is downgraded.
    """
    if not _is_111_fallback_host(host):
        return model

    lowered = (model or '').lower()
    for pattern in FALLBACK_111_MODEL_PATTERNS:
        if fnmatch.fnmatch(lowered, pattern):
            logger.warning(
                "[Ollama] 111 fallback 不承接重模型 model=%s,改用 %s",
                model,
                FALLBACK_111_MODEL,
            )
            return FALLBACK_111_MODEL
    return model
def _effective_timeout_for_host(timeout_s: int, host: str) -> int:
    """Return the timeout to use for *host*.

    Requests to the 111 final fallback are limited to
    ``FALLBACK_111_MAX_TIMEOUT`` so a single fallback call cannot hold that
    machine for minutes; every other host keeps *timeout_s* as given.
    """
    on_fallback = _is_111_fallback_host(host)
    return min(timeout_s, FALLBACK_111_MAX_TIMEOUT) if on_fallback else timeout_s
def _cap_111_options(options: Dict[str, Any]) -> None:
    """Clamp ``num_ctx`` and ``num_predict`` in *options* for the 111 fallback.

    The dict is modified in place. Each key ends up as a positive integer no
    larger than its ceiling (``FALLBACK_111_NUM_CTX`` /
    ``FALLBACK_111_NUM_PREDICT``). A value that is missing, zero, negative,
    non-numeric or infinite is replaced by the ceiling itself.

    Args:
        options: Ollama ``options`` payload about to be sent to the 111 host.
    """
    def _clamp(key: str, ceiling: int) -> int:
        """Return the requested value for *key* bounded to ``1..ceiling``."""
        try:
            requested = int(options.get(key) or ceiling)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers float('inf'), which int() cannot convert.
            return ceiling
        # Ollama treats non-positive num_predict values as sentinels (-1 is
        # documented as "infinite generation"). min() alone would pass them
        # through and lift the limit this function exists to enforce.
        if requested <= 0:
            return ceiling
        return min(requested, ceiling)

    options["num_ctx"] = _clamp("num_ctx", FALLBACK_111_NUM_CTX)
    options["num_predict"] = _clamp("num_predict", FALLBACK_111_NUM_PREDICT)
def _canonical_host_chain() -> List[str]:
    """Return the approved static fallback chain, normalized and de-duplicated.

    The order is primary, secondary, fallback. Empty entries are dropped and
    the first occurrence of a repeated host wins.
    """
    cleaned = (
        _normalize_host(candidate)
        for candidate in (OLLAMA_HOST_PRIMARY, OLLAMA_HOST_SECONDARY, OLLAMA_HOST_FALLBACK)
    )
    # dict.fromkeys keeps first-seen order while discarding duplicates.
    return list(dict.fromkeys(url for url in cleaned if url))
def _is_unhealthy(host: str) -> bool:
    """Return True while *host* carries an unexpired unhealthy mark.

    A mark older than ``_UNHEALTHY_TTL`` seconds is removed as a side effect
    and the host is reported as healthy again. Falsy hosts are never unhealthy.
    """
    import time

    if not host:
        return False
    key = _normalize_host(host)
    marked_at = _unhealthy_marks.get(key)
    if marked_at is None:
        return False
    still_marked = time.time() - marked_at < _UNHEALTHY_TTL
    if not still_marked:
        # The TTL has elapsed: forget the mark.
        _unhealthy_marks.pop(key, None)
    return still_marked
def resolve_ollama_host(primary: str = OLLAMA_HOST_PRIMARY,
                        secondary: str = OLLAMA_HOST_SECONDARY,
                        fallback: str = OLLAMA_HOST_FALLBACK) -> str:
    """Choose the Ollama host to use, preferring primary over secondary.

    A host is selected when it is not currently marked unhealthy and answers
    an HTTP probe of ``/api/version`` with status 200. The primary is tried
    first, then the secondary. When neither qualifies, *fallback* is returned
    without being probed. The choice is cached for ``_RESOLVE_TTL`` seconds so
    most calls trigger no network traffic.

    ADR-027 Phase 2:
      - The HTTP probe replaced a bare TCP connect, so a hung process whose
        port still listens is no longer treated as alive.
      - Hosts flagged by ``mark_unhealthy`` are skipped while the mark lasts.

    Args:
        primary: First-choice host URL.
        secondary: Second-choice host URL.
        fallback: Host returned when neither of the others is usable.

    Returns:
        The selected host URL, exactly as it was passed in.
    """
    import time

    started = time.time()
    cached = _resolved_host_cache['host']
    if cached is not None and started - _resolved_host_cache['ts'] < _RESOLVE_TTL:
        return cached

    def _is_reachable(url: str, timeout: float = 2.0) -> bool:
        """Probe ``/api/version``; only an HTTP 200 counts as reachable.

        Any exception (connection error, timeout, bad URL) means unreachable.
        """
        try:
            probe = requests.get(f"{url.rstrip('/')}/api/version", timeout=timeout)
        except Exception:
            return False
        return probe.status_code == 200

    def _is_tcp_reachable(url: str, timeout: float = 1.5) -> bool:
        """Secondary diagnostic probe using a plain TCP connect.

        Not used for host selection: Ollama is an HTTP service, so an open
        port alone does not make the host usable. Kept as a diagnostic aid
        for the "network up but HTTP down" situation.
        """
        try:
            import socket
            from urllib.parse import urlparse

            target = urlparse(url)
            port = target.port or 11434
            with socket.create_connection((target.hostname, port), timeout=timeout):
                pass
            return True
        except Exception:
            return False

    def _usable(url: str) -> bool:
        """True when *url* is not marked unhealthy and passes the HTTP probe."""
        return not _is_unhealthy(url) and _is_reachable(url)

    # B4: a primary marked unhealthy is skipped in favour of the secondary.
    if _usable(primary):
        selected = primary
        logger.info(f"[OllamaHost] Primary 主機可用: {primary}")
    elif _usable(secondary):
        selected = secondary
        logger.info(f"[OllamaHost] Primary 不可用,使用 Secondary: {secondary}")
    else:
        selected = fallback
        logger.warning(f"[OllamaHost] Primary 與 Secondary 皆無法連線,切換 Fallback: {fallback}")

    _resolved_host_cache['host'] = selected
    _resolved_host_cache['ts'] = started
    return selected
def get_host_label(host: str) -> str:
    """Translate a host IP/URL into a human-readable label.

    Phase 53 added the 110 Nginx proxy ports (11435/11436, forwarding to GCP).
    Fragments are checked in this order and the first match wins: direct GCP
    IPs, Nginx proxy ports, intranet IP, local machine. A host matching none
    of them yields the text between ``//`` and the first ``:``.

    Args:
        host: Host URL or address; a falsy value yields the "unknown" label.
    """
    if not host:
        return "未知"

    labelled_fragments = (
        # Direct GCP connections (docker-compose environment).
        ("34.143.170.20", "GCP-SSD"),
        ("34.21.145.224", "GCP-SSD-2"),
        # Nginx proxy on 110 forwarding to GCP.
        ("192.168.0.110:11435", "GCP-SSDvia Nginx 110"),
        ("192.168.0.110:11436", "GCP-SSD-2via Nginx 110"),
        # Intranet / local machine.
        ("192.168.0.111", "111 備援"),
        ("192.168.0.188", "188 本地"),
        ("localhost", "188 本地"),
    )
    for fragment, label in labelled_fragments:
        if fragment in host:
            return label
    return host.split('//')[-1].split(':')[0]
def get_provider_tag(host: str) -> str:
    """Map a host URL to its ``ai_calls.provider`` tag.

    Added in Phase 53 so the provider decision lives in one place instead of
    being re-implemented by each service. The possible tags are
    ``gcp_ollama``, ``ollama_secondary``, ``ollama_111`` and ``ollama_other``.
    A GCP host yields the same tag whether it is reached directly or through
    its Nginx proxy port on 110.

    Args:
        host: Host URL or address; a falsy value yields ``ollama_other``.
    """
    if not host:
        return 'ollama_other'

    routes = (
        (("34.143.170.20", "192.168.0.110:11435"), 'gcp_ollama'),
        (("34.21.145.224", "192.168.0.110:11436"), 'ollama_secondary'),
        (("192.168.0.111",), 'ollama_111'),
    )
    for fragments, tag in routes:
        if any(fragment in host for fragment in fragments):
            return tag
    return 'ollama_other'
@dataclass
class OllamaResponse:
    """Outcome of a single Ollama generation call.

    Phase 13 (fix for known limitation A4) added ``input_tokens`` and
    ``output_tokens``, parsed from the ``prompt_eval_count`` and
    ``eval_count`` fields of the ``/api/generate`` response. Before that,
    ai_call_logger recorded token=0 for openclaw_bot_main, which skewed the
    daily token statistics.
    """

    success: bool  # True when a host produced a usable reply
    content: str  # generated text; empty on failure
    model: str  # model name associated with the call
    error: Optional[str] = None  # failure description, if any
    total_duration: Optional[float] = None  # duration reported by the server
    host: Optional[str] = None  # host that served (or last attempted) the call
    input_tokens: int = 0  # prompt_eval_count
    output_tokens: int = 0  # eval_count
class OllamaService:
    """Client for the Ollama HTTP API with automatic host failover.

    Wraps ``/api/tags``, ``/api/generate`` and ``/api/embed`` (with the legacy
    ``/api/embeddings`` as a fallback) and builds the prompts used for sales
    copy, keyword extraction, trend matching and market insights.
    """

    # Result of the most recent connection check, shared by all instances so
    # repeated checks do not hit the server every time.
    # NOTE(review): the cache is not keyed by host, so instances pinned to
    # different hosts share one status - confirm this is intended.
    _connection_cache = {'status': None, 'timestamp': 0}
    _CACHE_TTL = 60  # seconds a cached connection status stays valid

    def __init__(self, host: Optional[str] = None, model: str = DEFAULT_MODEL):
        """Create a service using *model* and, optionally, a pinned host.

        Args:
            host: Host URL to use for every call. When omitted, the host is
                resolved lazily on each access of the ``host`` property.
            model: Default model for calls that do not name one.
        """
        # HOTFIX 2026-05-04 (root cause of "111 powered off -> GCP also down"):
        # the host used to be resolved once here and frozen. If the container
        # started while GCP was cold and the resolver fell back to 111, the
        # instance stayed on 111 forever, even after the resolver cache had
        # expired. Now only an explicitly supplied host is frozen; otherwise
        # ``host`` is a property that goes through resolve_ollama_host() on
        # every access (its internal 120s cache keeps that cheap).
        self._explicit_host = host  # None means "resolve lazily"
        self.model = model
        self.available_models = []

    @property
    def host(self) -> str:
        """Host for the next call: the pinned host if one was given, else resolved."""
        if self._explicit_host:
            return self._explicit_host
        return resolve_ollama_host()

    def check_connection(self) -> bool:
        """Return whether ``/api/tags`` on the current host answers HTTP 200.

        The result is cached at class level for ``_CACHE_TTL`` seconds. On a
        fresh successful check ``available_models`` is refreshed from the
        response; a cached result leaves ``available_models`` untouched.
        """
        import time

        now = time.time()
        cache = OllamaService._connection_cache
        if (cache['status'] is not None and
                now - cache['timestamp'] < OllamaService._CACHE_TTL):
            return cache['status']
        try:
            # A generous timeout avoids reporting a busy server as offline.
            response = requests.get(f"{self.host}/api/tags", timeout=10)
            if response.status_code == 200:
                data = response.json()
                self.available_models = [m['name'] for m in data.get('models', [])]
                logger.info(f"Ollama 連線成功,可用模型: {self.available_models}")
                OllamaService._connection_cache = {'status': True, 'timestamp': now}
                return True
            OllamaService._connection_cache = {'status': False, 'timestamp': now}
            return False
        except Exception as e:
            logger.error(f"Ollama 連線失敗: {e}")
            OllamaService._connection_cache = {'status': False, 'timestamp': now}
            return False

    def list_models(self) -> List[str]:
        """Return the known model names, running a connection check if none are loaded."""
        if not self.available_models:
            self.check_connection()
        return self.available_models

    def generate(self, prompt: str, model: Optional[str] = None,
                 system_prompt: Optional[str] = None, temperature: float = 0.7,
                 timeout: Optional[int] = None, keep_alive: Optional[str] = None,
                 options: Optional[Dict[str, Any]] = None,
                 images: Optional[List[str]] = None,
                 allow_111_fallback: bool = True) -> OllamaResponse:
        """Generate text via ``/api/generate`` with host failover (HOTFIX 2026-05-04).

        Each failed attempt marks its host unhealthy, which invalidates the
        resolver cache, and the next attempt moves to another host. With the
        standard chain that gives at most one attempt each on primary,
        secondary and 111. A pinned (explicit) host is tried only once.

        Args:
            prompt: User prompt.
            model: Model name; defaults to the instance model.
            system_prompt: Optional system prompt.
            temperature: Sampling temperature placed in ``options``.
            timeout: Request timeout in seconds; defaults to ``TIMEOUT``.
            keep_alive: ``keep_alive`` value for non-111 hosts.
            options: Extra Ollama options merged over the temperature.
            images: Optional images forwarded in the payload.
            allow_111_fallback: When False, the 111 host is never contacted.

        Returns:
            An ``OllamaResponse``. On failure ``success`` is False and
            ``error`` lists the hosts tried and the last error.
        """
        model = model or self.model
        request_timeout = timeout or TIMEOUT
        base_payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": temperature},
        }
        if options:
            base_payload["options"].update(options)
        if system_prompt:
            base_payload["system"] = system_prompt
        if images:
            base_payload["images"] = images

        # HOTFIX: three-host retry chain.
        attempted_hosts: List[str] = []
        last_error: Optional[str] = None
        canonical_hosts = _canonical_host_chain()
        allowed_hosts = [
            host for host in canonical_hosts
            if allow_111_fallback or not _is_111_fallback_host(host)
        ]
        max_attempts = len(canonical_hosts) if allow_111_fallback else max(1, len(allowed_hosts))

        for attempt in range(max_attempts):
            current_host = _normalize_host(self.host)  # property: resolved on every access
            if not allow_111_fallback and _is_111_fallback_host(current_host):
                last_error = "111 fallback disabled; no approved GCP Ollama host available"
                logger.warning("[Ollama] %s", last_error)
                break
            if current_host in attempted_hosts:
                # The resolver handed back a host that already failed. For the
                # standard chain with no pinned host, force the next untried
                # host instead. Otherwise a request timeout longer than the
                # unhealthy TTL lets the primary's mark expire mid-loop, the
                # resolver returns the primary again, and the 111 final
                # fallback is never reached.
                next_host = None
                if self._explicit_host is None and current_host in allowed_hosts:
                    next_host = next((host for host in allowed_hosts if host not in attempted_hosts), None)
                if not next_host:
                    # Non-standard or pinned host: stop rather than loop forever.
                    break
                logger.info(
                    "[Ollama] resolver returned previously attempted host=%s; forcing next fallback host=%s",
                    current_host,
                    next_host,
                )
                current_host = next_host
            attempted_hosts.append(current_host)

            effective_model = _effective_model_for_host(model, current_host)
            effective_timeout = _effective_timeout_for_host(request_timeout, current_host)
            # Copy the payload (and its options dict) so per-host adjustments
            # never leak into the next attempt.
            payload = dict(base_payload)
            payload["options"] = dict(base_payload["options"])
            payload["model"] = effective_model
            if _is_111_fallback_host(current_host):
                payload["keep_alive"] = FALLBACK_111_KEEP_ALIVE
                _cap_111_options(payload["options"])
            elif keep_alive:
                payload["keep_alive"] = keep_alive

            logger.info(
                "[Ollama] 嘗試 #%s/%s host=%s model=%s timeout=%ss keep_alive=%s",
                attempt + 1,
                max_attempts,
                current_host,
                effective_model,
                effective_timeout,
                payload.get("keep_alive", ""),
            )
            try:
                response = requests.post(
                    f"{current_host}/api/generate",
                    json=payload,
                    timeout=effective_timeout,
                )
                if response.status_code == 200:
                    data = response.json()
                    return OllamaResponse(
                        success=True,
                        content=data.get('response', ''),
                        model=effective_model,
                        # "or 0" guards a null total_duration. Without it the
                        # division raises TypeError, the except below treats a
                        # successful reply as a failure and the healthy host
                        # gets marked unhealthy.
                        total_duration=(data.get('total_duration') or 0) / 1e9,
                        host=current_host,
                        # Phase 13: real token counts instead of token=0 (limitation A4).
                        input_tokens=int(data.get('prompt_eval_count', 0) or 0),
                        output_tokens=int(data.get('eval_count', 0) or 0),
                    )
                # Non-200 reply: mark the host unhealthy and try the next one.
                last_error = f"HTTP {response.status_code}: {response.text[:200]}"
                logger.warning(f"[Ollama] {current_host} HTTP 失敗 → mark_unhealthy + retry: {last_error}")
                _mark_unhealthy_best_effort(current_host)
            except requests.Timeout:
                last_error = f"timeout ({effective_timeout}s)"
                logger.warning(f"[Ollama] {current_host} timeout → mark_unhealthy + retry")
                _mark_unhealthy_best_effort(current_host)
            except Exception as e:
                last_error = f"{type(e).__name__}: {str(e)[:200]}"
                logger.error(f"[Ollama] {current_host} error → mark_unhealthy + retry: {last_error}")
                _mark_unhealthy_best_effort(current_host)

        # Every attempted host failed (or no host was eligible).
        return OllamaResponse(
            success=False,
            content='',
            model=model,
            error=f"all {len(attempted_hosts)} hosts failed; last={last_error}; tried={attempted_hosts}",
            host=attempted_hosts[-1] if attempted_hosts else 'unknown',
        )

    def generate_sales_copy(self, product_name: str, trend_keywords: List[str] = None,
                            style: str = "吸睛", upcoming_holidays: List[Dict] = None,
                            bestseller_products: List[Dict] = None) -> OllamaResponse:
        """Generate a full sales-copy set for a product.

        Args:
            product_name: Product name.
            trend_keywords: Related trend keywords to weave into the copy.
            style: Copy style key (one of the keys of ``style_prompts``); an
                unknown style falls back to the first style.
            upcoming_holidays: Upcoming holidays, e.g.
                ``[{"name": "春節", "date": "2026-01-29", "days_until": 8}]``;
                only the first three are used.
            bestseller_products: Competitor bestsellers, e.g.
                ``[{"name": "xxx", "price": 999}]``; only the first three are used.

        Returns:
            The ``OllamaResponse`` from ``generate``.
        """
        style_prompts = {
            "吸睛": "使用吸引眼球的標題和表情符號",
            "專業": "使用專業術語,強調成分和功效",
            "溫馨": "使用溫暖的語氣,強調呵護和關愛",
            "急迫": "使用限時優惠的語氣,創造緊迫感"
        }

        # Trend keywords.
        trend_context = ""
        if trend_keywords:
            trend_context = f"\n目前的熱門趨勢關鍵字:{', '.join(trend_keywords)}。請嘗試將這些趨勢融入文案中。"

        # Upcoming holidays.
        holiday_context = ""
        if upcoming_holidays:
            holidays_text = []
            for h in upcoming_holidays[:3]:  # at most three
                name = h.get('name', '')
                days = h.get('days_until', 0)
                if days == 0:
                    holidays_text.append(f"{name}(今天)")
                elif days == 1:
                    holidays_text.append(f"{name}(明天)")
                else:
                    holidays_text.append(f"{name}{days}天後)")
            if holidays_text:
                holiday_context = f"\n即將到來的假期:{', '.join(holidays_text)}。可以考慮結合節慶氛圍或送禮情境。"

        # Competitor bestseller reference.
        bestseller_context = ""
        if bestseller_products:
            products_text = [f"{p.get('name', '')}${p.get('price', '')}" for p in bestseller_products[:3]]
            if products_text:
                bestseller_context = f"\n市場熱銷參考:{', '.join(products_text)}。可參考熱銷趨勢但要突出自家商品特色。"

        system_prompt = """你是一位專業的電商銷售文案寫手和行銷策略專家,專門為台灣電商平台撰寫商品文案。
你的文案特點:
- 使用繁體中文
- 善用表情符號增加吸引力
- 強調商品賣點和消費者利益
- 適時使用行動呼籲 (CTA)
- 若有即將到來的節日,可適度融入節慶元素
- 提供完整的行銷建議"""
        prompt = f"""請為以下商品撰寫完整的銷售文案套組:
商品名稱:{product_name}
文案風格:{style_prompts.get(style, style_prompts['吸睛'])}
{trend_context}{holiday_context}{bestseller_context}
請按照以下格式生成完整的銷售文案套組:
【大標題】
15字以內的主打標語吸引眼球適合用於廣告Banner
【中標題】
30字以內的副標題補充說明賣點
【小標題】
20字以內的精簡標語適合用於社群貼文
【詳細文案】
100-150字的完整銷售文案包含商品特色、使用情境、行動呼籲
【推廣建議】
• 社群推廣Facebook/Instagram/LINE 等社群平台的建議策略)
• 影音內容:(短影音/直播/開箱影片等建議)
• 其他建議EDM、部落格、KOL合作等專業建議
請確保所有內容使用繁體中文,風格一致,並突出商品價值:"""

        # Phase 22.1 (2026-05-04): caller x context dynamic model routing.
        # Per the original note the router picks a lighter model for short
        # output and the default for long output, and returns the default
        # when MODEL_ROUTER_ENABLED=false (behaviour lives in llm_model_router).
        try:
            from services.llm_model_router import select_model
            expected_length = len(product_name) * 5  # rough output-length estimate from the name length
            chosen_model = select_model(
                caller='sales_copy',
                context={'expected_length': expected_length},
                default=self.model,
            )
        except Exception:
            chosen_model = self.model  # a router failure must not break the main flow

        # Copy generation uses the longer COPY_TIMEOUT.
        return self.generate(prompt, model=chosen_model, system_prompt=system_prompt,
                             temperature=0.8, timeout=COPY_TIMEOUT)

    def extract_keywords(self, text: str, max_keywords: int = 10) -> OllamaResponse:
        """Ask the model to extract keywords from *text*.

        Args:
            text: Text to analyse.
            max_keywords: Maximum number of keywords requested in the prompt.

        Returns:
            ``OllamaResponse`` whose content is requested as a
            comma-separated keyword list.
        """
        system_prompt = "你是一位關鍵字提取專家。請從給定的文字中提取最重要的關鍵字。"
        prompt = f"""請從以下文字中提取最多 {max_keywords} 個關鍵字,這些關鍵字應該能代表文章的主題和重點。
文字內容:
{text}
請只輸出關鍵字,用逗號分隔,不要輸出其他內容:"""
        return self.generate(prompt, system_prompt=system_prompt, temperature=0.3)

    def match_products_to_trend(self, trend_topic: str, trend_description: str,
                                products: List[Dict[str, Any]]) -> OllamaResponse:
        """Ask the model which products best fit a trending topic.

        Args:
            trend_topic: Trending topic.
            trend_description: Description of the trend.
            products: Product dicts; ``name`` and ``category`` are read.

        Returns:
            ``OllamaResponse`` whose content is requested in JSON format.
        """
        # Only the first 50 products are listed to keep the prompt short.
        products_text = "\n".join([
            f"- {p.get('name', '')} (分類: {p.get('category', '未分類')})"
            for p in products[:50]
        ])
        system_prompt = """你是一位電商行銷專家,擅長將熱門話題與商品進行關聯。
你的任務是從商品列表中找出最適合搭配當前趨勢話題進行行銷的商品。"""
        prompt = f"""當前熱門話題:{trend_topic}
話題描述:{trend_description}
商品列表:
{products_text}
請從上述商品中選出最適合搭配這個話題進行行銷的前 5 個商品。
對於每個推薦的商品,請說明:
1. 為什麼這個商品適合這個話題
2. 建議的行銷角度
請用以下 JSON 格式回覆:
{{
"recommendations": [
{{"product_name": "商品名稱", "reason": "推薦原因", "marketing_angle": "行銷角度"}},
...
]
}}"""
        return self.generate(prompt, system_prompt=system_prompt, temperature=0.5)

    def analyze_trend_relevance(self, trend_info: str, product_categories: List[str]) -> OllamaResponse:
        """Ask the model how relevant a trend is to each product category.

        Args:
            trend_info: Trend information.
            product_categories: Product category names.

        Returns:
            The ``OllamaResponse`` from ``generate``.
        """
        categories_text = ", ".join(product_categories)
        system_prompt = "你是一位市場分析師,擅長分析消費趨勢與商品之間的關聯。"
        prompt = f"""趨勢資訊:
{trend_info}
可用的商品分類:
{categories_text}
請分析這個趨勢與哪些商品分類最相關並給出相關性評分1-10分
請用 JSON 格式回覆:
{{
"analysis": "簡短的分析說明",
"relevant_categories": [
{{"category": "分類名稱", "score": 8, "reason": "相關原因"}},
...
]
}}"""
        return self.generate(prompt, system_prompt=system_prompt, temperature=0.4)

    def web_search(self, query: str, num_results: int = 5,
                   search_type: str = "general") -> OllamaResponse:
        """Ask the model for a structured, search-style summary of a topic.

        No web request is made here: the prompt asks the model to answer from
        what it already knows. The original note says real searching needs
        tool calling or a search-capable model.

        Args:
            query: Search keywords.
            num_results: Number of results requested in the prompt.
            search_type: ``general``, ``news``, ``shopping`` or ``trends``;
                anything else is treated as ``general``.

        Returns:
            The ``OllamaResponse`` from ``generate``.
        """
        search_prompts = {
            "general": "請搜尋並整理關於此主題的最新資訊",
            "news": "請搜尋並整理此主題的最新新聞和報導",
            "shopping": "請搜尋並整理此商品的市場資訊、價格和評價",
            "trends": "請搜尋並分析此主題的市場趨勢和熱門程度"
        }
        system_prompt = """你是一位專業的市場研究分析師。
你的任務是根據使用者的搜尋需求,整理出結構化的資訊。
請用以下 JSON 格式回覆:
{
"query": "原始搜尋關鍵字",
"summary": "搜尋結果摘要50字以內",
"results": [
{
"title": "結果標題",
"description": "簡短描述",
"relevance": "與搜尋的相關性說明",
"keywords": ["相關關鍵字1", "關鍵字2"]
}
],
"insights": ["洞察1", "洞察2"],
"recommended_actions": ["建議行動1", "建議行動2"]
}"""
        search_context = search_prompts.get(search_type, search_prompts["general"])
        prompt = f"""搜尋需求:{query}
搜尋類型:{search_type}
期望結果數:{num_results}
{search_context}
請根據你對這個主題的了解,提供結構化的分析結果。
包含主要的市場趨勢、相關關鍵字、以及對電商銷售的建議。"""
        return self.generate(prompt, system_prompt=system_prompt, temperature=0.5, timeout=120)

    def search_product_insights(self, product_name: str,
                                include_competitors: bool = True,
                                include_trends: bool = True,
                                web_context: str = "") -> OllamaResponse:
        """Ask the model for market insights about a product.

        Args:
            product_name: Product name.
            include_competitors: Whether to request competitor analysis.
            include_trends: Whether to request trend analysis.
            web_context: Web search results to include as up-to-date
                reference material; blank text is ignored.

        Returns:
            The ``OllamaResponse`` from ``generate``.
        """
        system_prompt = """你是一位資深的電商市場分析師,專精於台灣市場。
你擅長分析商品的市場定位、競爭對手、以及銷售趨勢。
請提供全面但簡潔的市場洞察,使用繁體中文。
若有提供網路搜尋結果,請優先參考這些最新資訊進行分析。"""
        analysis_parts = ["市場定位分析"]
        if include_competitors:
            analysis_parts.append("主要競爭對手分析")
        if include_trends:
            analysis_parts.append("市場趨勢分析")

        # The optional JSON fragments are built outside the f-string so that
        # no backslash has to appear inside it.
        competitors_json = '"competitors": [{"name": "競品名稱", "strength": "優勢", "weakness": "劣勢"}],' if include_competitors else ""
        trends_json = '"trends": {"current": "當前趨勢", "forecast": "趨勢預測", "seasonality": "季節性因素"},' if include_trends else ""
        analysis_list = chr(10).join(
            f'{idx}. {part}' for idx, part in enumerate(analysis_parts, start=1)
        )

        # Include the web search results when there are any.
        web_context_section = ""
        if web_context and web_context.strip():
            web_context_section = f"""
【參考資料 - 網路搜尋最新結果】
{web_context.strip()}
請根據以上網路搜尋結果,結合你的知識,提供更精準的市場分析。
"""
        prompt = f"""請為以下商品提供市場洞察分析:
商品名稱:{product_name}
{web_context_section}
請分析以下面向:
{analysis_list}
請用以下 JSON 格式回覆(務必輸出有效的 JSON
{{
"product_name": "{product_name}",
"market_position": {{
"target_audience": "目標客群描述",
"price_range": "價格區間建議",
"positioning": "市場定位建議"
}},
{competitors_json}
{trends_json}
"recommendations": ["銷售建議1", "銷售建議2", "銷售建議3"],
"keywords": ["行銷關鍵字1", "關鍵字2", "關鍵字3"]
}}"""
        return self.generate(prompt, system_prompt=system_prompt, temperature=0.6, timeout=180)

    def search_trend_keywords(self, category: str, time_range: str = "week") -> OllamaResponse:
        """Ask the model for hot keywords and trends in a product category.

        Args:
            category: Product category.
            time_range: ``day``, ``week`` or ``month``; any other value is
                worded as "recently" in the prompt.

        Returns:
            The ``OllamaResponse`` from ``generate``.
        """
        time_desc = {
            "day": "今天",
            "week": "本週",
            "month": "本月"
        }
        system_prompt = """你是一位社群媒體和搜尋趨勢分析專家,專注於台灣電商市場。
你熟悉各大平台的熱門話題、關鍵字趨勢、以及消費者行為。"""
        prompt = f"""請分析「{category}」這個商品分類在{time_desc.get(time_range, '近期')}的熱門關鍵字和趨勢。
請提供:
1. 熱門搜尋關鍵字5-10個
2. 社群討論熱點3-5個話題
3. 消費者關注點
4. 行銷建議
請用以下 JSON 格式回覆:
{{
"category": "{category}",
"time_range": "{time_range}",
"hot_keywords": [
{{"keyword": "關鍵字", "trend": "上升/穩定/下降", "relevance": "高/中/低"}}
],
"social_topics": [
{{"topic": "話題", "platform": "平台", "engagement": "互動度描述"}}
],
"consumer_concerns": ["關注點1", "關注點2"],
"marketing_suggestions": ["建議1", "建議2"]
}}"""
        return self.generate(prompt, system_prompt=system_prompt, temperature=0.5, timeout=120)

    @staticmethod
    def _extract_embedding(payload: Dict[str, Any]) -> List[float]:
        """Normalize Ollama /api/embed and legacy /api/embeddings response shapes.

        Accepts ``{"embeddings": [[...]]}`` (first vector is returned), a flat
        ``{"embeddings": [...]}`` of numbers, or ``{"embedding": [...]}``.
        Anything else yields an empty list.
        """
        embeddings = payload.get("embeddings")
        if isinstance(embeddings, list) and embeddings:
            first = embeddings[0]
            if isinstance(first, list):
                return first
            if all(isinstance(value, (int, float)) for value in embeddings):
                return embeddings
        embedding = payload.get("embedding")
        if isinstance(embedding, list):
            return embedding
        return []

    def generate_embedding(self, text: str, model: str = "bge-m3:latest",
                           host: Optional[str] = None, timeout: Optional[int] = None,
                           allow_111_fallback: bool = True) -> List[float]:
        """[ADR-007] Embed *text*, with host failover (HOTFIX 2026-05-04).

        Each failed attempt marks its host unhealthy and the next attempt
        moves to another host of the standard chain. When the caller passes
        *host* explicitly, only that host is tried, once.

        Args:
            text: Text to embed; blank text yields ``[]`` with no request.
                Input longer than ``EMBED_MAX_CHARS`` is truncated.
            model: Embedding model name.
            host: Optional host to use exclusively (no retry).
            timeout: Request timeout in seconds, capped at ``EMBED_MAX_TIMEOUT``.
            allow_111_fallback: When False, the 111 host is never contacted.

        Returns:
            The embedding vector, or ``[]`` when every attempt failed.
        """
        clean_text = (text or "").strip()
        if not clean_text:
            return []
        if len(clean_text) > EMBED_MAX_CHARS:
            logger.info(
                "[Embed] input clipped from %s to %s chars for model=%s",
                len(clean_text),
                EMBED_MAX_CHARS,
                model,
            )
            clean_text = clean_text[:EMBED_MAX_CHARS]
        request_timeout = min(timeout or EMBED_TIMEOUT, EMBED_MAX_TIMEOUT)

        def _embed_one(target_host: str) -> List[float]:
            """One embedding attempt: the vector on success, ``[]`` on failure.

            Failures mark the host unhealthy through the best-effort wrapper
            (the same one ``generate`` uses), so a problem while marking can
            never escape and break the "return [] on failure" contract.
            """
            try:
                # Main path: /api/embed.
                response = requests.post(
                    f"{target_host}/api/embed",
                    json={"model": model, "input": clean_text, "keep_alive": EMBED_KEEP_ALIVE},
                    timeout=request_timeout,
                )
                if response.status_code == 200:
                    vec = self._extract_embedding(response.json())
                    if vec:
                        return vec
                    logger.warning(f"[Embed] empty response @ {target_host}/api/embed")
                elif response.status_code not in (404, 405):
                    logger.warning(f"[Embed] HTTP {response.status_code} @ {target_host}/api/embed: {response.text[:200]}")
                    _mark_unhealthy_best_effort(target_host)
                    return []
                # Reached on an empty 200 or on 404/405: try the legacy
                # /api/embeddings endpoint.
                legacy = requests.post(
                    f"{target_host}/api/embeddings",
                    json={"model": model, "prompt": clean_text},
                    timeout=request_timeout,
                )
                if legacy.status_code == 200:
                    return self._extract_embedding(legacy.json())
                logger.warning(f"[Embed] both endpoints failed @ {target_host}: {legacy.status_code}")
                _mark_unhealthy_best_effort(target_host)
                return []
            except Exception as e:
                logger.warning(f"[Embed] exception @ {target_host}: {e}")
                _mark_unhealthy_best_effort(target_host)
                return []

        # An explicit host is used as-is, with no retry.
        if host:
            if not allow_111_fallback and _is_111_fallback_host(host):
                logger.warning("[Embed] 111 fallback disabled; explicit host skipped: %s", host)
                return []
            return _embed_one(host.rstrip("/"))

        # HOTFIX: three-host retry chain (same pattern as generate()).
        attempted_hosts: List[str] = []
        canonical_hosts = _canonical_host_chain()
        allowed_hosts = [
            candidate for candidate in canonical_hosts
            if allow_111_fallback or not _is_111_fallback_host(candidate)
        ]
        max_attempts = len(canonical_hosts) if allow_111_fallback else max(1, len(allowed_hosts))

        for attempt in range(max_attempts):
            # An approved EMBEDDING_HOST takes precedence over the resolver,
            # unless it is the 111 host and 111 is disallowed.
            configured_host = (approved_ollama_env("EMBEDDING_HOST") or "").rstrip("/")
            if configured_host and (allow_111_fallback or not _is_111_fallback_host(configured_host)):
                target_host = configured_host
            else:
                if configured_host and _is_111_fallback_host(configured_host):
                    logger.warning("[Embed] 111 fallback disabled; ignoring EMBEDDING_HOST=%s", configured_host)
                target_host = resolve_ollama_host().rstrip("/")
            if not allow_111_fallback and _is_111_fallback_host(target_host):
                logger.warning("[Embed] 111 fallback disabled; no approved GCP embedding host available")
                break
            if target_host in attempted_hosts:
                next_host = None
                if target_host in allowed_hosts:
                    next_host = next((candidate for candidate in allowed_hosts if candidate not in attempted_hosts), None)
                if not next_host:
                    break  # nothing untried is left: stop rather than loop forever
                logger.info(
                    "[Embed] resolver returned attempted host=%s; forcing next host=%s",
                    target_host,
                    next_host,
                )
                target_host = next_host
            attempted_hosts.append(target_host)
            vec = _embed_one(target_host)
            if vec:
                return vec
            logger.info(f"[Embed] retry #{attempt+1}/{max_attempts}{target_host} failed, mark_unhealthy + 取新主機")

        logger.error(f"[Embed] all {len(attempted_hosts)} hosts failed; tried={attempted_hosts}")
        return []
# Module-wide shared service instance.
ollama_service = OllamaService()

if __name__ == "__main__":
    # Manual smoke test: check the connection, then generate one sales copy.
    logging.basicConfig(level=logging.INFO)
    service = OllamaService()

    print("測試 Ollama 連線...")
    if not service.check_connection():
        print("連線失敗")
    else:
        print(f"連線成功!可用模型: {service.available_models}")

        # Exercise sales-copy generation.
        print("\n測試文案生成...")
        result = service.generate_sales_copy(
            "玻尿酸保濕面膜",
            trend_keywords=["換季保養", "敏感肌"],
            style="吸睛"
        )
        if not result.success:
            print(f"生成失敗: {result.error}")
        else:
            print(f"生成結果: {result.content}")
            print(f"耗時: {result.total_duration:.2f}")