fix(embed): generate_embedding 三主機 retry — 修同類「111 死則全死」bug
All checks were successful
CD Pipeline / deploy (push) Successful in 2m42s

承前 commit e862a90(generate retry)的同類修補:
generate_embedding 之前邏輯:
  target_host = host or env or resolve  # 一次解析
  try: post → mark_unhealthy + return []  # 失敗無 retry

修補後:
  caller 顯式 host → 凍結不 retry(向下相容)
  caller 走 lazy → 三主機 retry 鏈:
    每次迴圈重新解析 target_host(非 self.host);未設 EMBEDDING_HOST 時走 resolve_ollama_host()
    失敗 mark_unhealthy + cache 失效 + 取新主機
    最多 3 次(避免同主機無限迴圈)

影響範圍:KM embedding worker / RAG query embedding / openclaw_learning

regression: 57 unit tests 全綠

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-05-04 08:56:38 +08:00
parent e862a9040c
commit 6572d521ba

View File

@@ -663,55 +663,69 @@ class OllamaService:
def generate_embedding(self, text: str, model: str = "bge-m3:latest",
                       host: str = None, timeout: int = None) -> List[float]:
    """
    [ADR-007] Turn ``text`` into an embedding vector via the Ollama HTTP API,
    with automatic multi-host retry (HOTFIX 2026-05-04).

    Host selection:
      * ``host`` passed explicitly: that host is tried exactly once and the
        result is returned as-is (no retry, backward compatible).
      * ``host`` omitted: up to 3 attempts are made. The first attempt uses
        the ``EMBEDDING_HOST`` environment variable when it is set, otherwise
        ``resolve_ollama_host()``. Every later attempt always asks
        ``resolve_ollama_host()`` for a host, so a dead pinned
        ``EMBEDDING_HOST`` no longer disables the retry chain.

    Each failed attempt calls ``mark_unhealthy(host)``; the original notes
    state that this invalidates the resolve cache so the next resolve picks a
    different host (behaviour of those helpers is not visible in this block).

    Args:
        text: Text to embed.
        model: Ollama embedding model name.
        host: Optional base URL override; a trailing "/" is stripped.
        timeout: Per-request timeout; falls back to ``EMBED_TIMEOUT`` when
            falsy (so ``0`` also selects the default).

    Returns:
        The embedding vector, or ``[]`` when every attempted host failed.
    """
    request_timeout = timeout or EMBED_TIMEOUT

    def _embed_one(target_host: str) -> List[float]:
        """Single embedding attempt against one host.

        Returns the vector on success. On failure returns ``[]`` and marks the
        host unhealthy, except for an empty-but-HTTP-200 legacy response,
        which is returned unchanged without marking the host.
        """
        try:
            # Primary endpoint: /api/embed
            response = requests.post(
                f"{target_host}/api/embed",
                json={"model": model, "input": text},
                timeout=request_timeout,
            )
            if response.status_code == 200:
                vec = self._extract_embedding(response.json())
                if vec:
                    return vec
                # Empty vector: fall through and try the legacy endpoint.
                logger.warning(f"[Embed] empty response @ {target_host}/api/embed")
            elif response.status_code not in (404, 405):
                # Any status other than 404/405 is treated as a host failure;
                # 404/405 fall through to the legacy endpoint below.
                logger.warning(f"[Embed] HTTP {response.status_code} @ {target_host}/api/embed: {response.text[:200]}")
                mark_unhealthy(target_host)
                return []
            # Legacy fallback: /api/embeddings takes "prompt" instead of "input".
            legacy = requests.post(
                f"{target_host}/api/embeddings",
                json={"model": model, "prompt": text},
                timeout=request_timeout,
            )
            if legacy.status_code == 200:
                return self._extract_embedding(legacy.json())
            logger.warning(f"[Embed] both endpoints failed @ {target_host}: {legacy.status_code}")
            mark_unhealthy(target_host)
            return []
        except Exception as e:
            # Deliberate best-effort: connection errors, timeouts and bad JSON
            # all count as a failed attempt rather than propagating.
            logger.warning(f"[Embed] exception @ {target_host}: {e}")
            mark_unhealthy(target_host)
            return []

    # Caller pinned a host explicitly: single attempt, no retry.
    if host:
        return _embed_one(host.rstrip("/"))

    # HOTFIX retry chain (noted in the original as the same pattern as generate()).
    env_host = os.getenv("EMBEDDING_HOST")
    attempted_hosts: List[str] = []
    for attempt in range(3):
        # EMBEDDING_HOST is only the *first* candidate. Re-reading it on every
        # iteration would yield the same host each time and trip the
        # duplicate guard below, silently disabling the retry.
        if attempt == 0 and env_host:
            candidate = env_host
        else:
            candidate = resolve_ollama_host()
        target_host = candidate.rstrip("/")
        if target_host in attempted_hosts:
            break  # resolver returned an already-tried host; stop instead of looping on it
        attempted_hosts.append(target_host)
        vec = _embed_one(target_host)
        if vec:
            return vec
        logger.info(f"[Embed] retry #{attempt+1}/3 — {target_host} failed, mark_unhealthy + 取新主機")
    logger.error(f"[Embed] all {len(attempted_hosts)} hosts failed; tried={attempted_hosts}")
    return []
# 建立全域服務實例