fix: defer embedding work during gcp backoff
All checks were successful
CD Pipeline / deploy (push) Successful in 1m10s
All checks were successful
CD Pipeline / deploy (push) Successful in 1m10s
This commit is contained in:
@@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
|
||||
# ==========================================
|
||||
# 系統版本與路徑
|
||||
# ==========================================
|
||||
SYSTEM_VERSION = "V10.624"
|
||||
SYSTEM_VERSION = "V10.625"
|
||||
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
|
||||
public_url = PUBLIC_URL # 用於模板顯示
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
# PChome 業績成長自動化作戰系統 — AI 競價情報模組 Single Source of Truth
|
||||
|
||||
> **最後更新**: 2026-06-16 (台北時間)
|
||||
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步、作戰清單與價格參考表優先讀取、CSV 備援預檢、前台操作入口、高可見頁面繁中化守門與比價/作戰 UI 工作台化已建立
|
||||
> **適用版本**: V10.624
|
||||
> **最後更新**: 2026-06-18 (台北時間)
|
||||
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步、作戰清單與價格參考表優先讀取、CSV 備援預檢、前台操作入口、高可見頁面繁中化守門、比價/作戰 UI 工作台化與 GCP embedding 熔斷延後處理已建立
|
||||
> **適用版本**: V10.625
|
||||
|
||||
---
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
- `allow_111_fallback=False` 時,若 resolver 因 unhealthy cache 回傳 111,不得直接結束 embedding;必須強制改試尚未嘗試的 GCP-A / GCP-B,避免正式 log 出現 `tried=[]` 或只試單台 GCP-B。
|
||||
- `allow_111_fallback=False` 且 GCP-A / GCP-B 皆失敗時,背景 embedding 會開啟短暫 GCP failure circuit(預設 60 秒),期間不重複打兩台 GCP、不落 111,避免 worker 與 log 被連續失敗拖慢;GCP 恢復後會自然再試。
|
||||
- 背景 embedding 的 GCP-only 熔斷屬於可降級背景能力,應記錄為明確 WARNING 與 circuit 狀態,不應每次污染 ERROR 通道;真正允許三主機 fallback 的同步 embedding 全失敗仍保留 ERROR。
|
||||
- OpenClaw embedding worker 遇到 GCP-only failure circuit 時,必須把已 claim 任務退回 `pending` 並延後處理,不得扣 `attempts`、不得把同批任務刷成 `failed`;熔斷期間也不得繼續 claim 新任務。
|
||||
- Scheduler host health probe 不只看 `/api/tags`;GCP-A / GCP-B 節點必須再通過 `bge-m3` `/api/embed` 實作探針,才算 healthy。探針 timeout 預設 30s,111 預設不納入這個背景 embedding 探針,避免監測任務把 fallback Mac 載入 `bge-m3`。
|
||||
- 背景 embedding 會讀取最近 `host_health_probes` runtime 結果;若 GCP-A / GCP-B 在 `OLLAMA_EMBED_HOST_HEALTH_SKIP_WINDOW_MINUTES=20` 視窗內已被標為 unhealthy,`OllamaService.generate_embedding(..., allow_111_fallback=False)` 會先跳過該節點並開啟短暫 GCP circuit,不再等待 30 秒 timeout,也仍不落 111。此功能由 `OLLAMA_EMBED_HOST_HEALTH_SKIP_ENABLED=true` 控制,DB 讀取失敗時 fail-open 回到原本網路 retry。
|
||||
- BGE-M3 一致性檢查是監測任務,不是 fallback 壓測;預設只比對 GCP-A / GCP-B。111 Mac fallback 只有 `EMBED_CONSISTENCY_INCLUDE_111=true` 時才納入,避免每週背景檢查把 `bge-m3` 載入 111。
|
||||
|
||||
@@ -310,3 +310,10 @@
|
||||
- V10.624 將價格類 trigger 的高信心路徑改為「有實證就發 L3 HITL 價格覆核通知」,不再執行 orchestrator `execution_plan`,避免長任務 timeout 與自動調價誤解。
|
||||
- 新增 `price_decision_review` 決策信封,固定標示 `can_auto_execute=false`、`requires_hitl=true`、`execution_plan skipped`;通知只呈現 DB/Hermes 具體價差實證。
|
||||
- 測試新增高信心價格決策不執行長任務 step 的守門,避免未來又把價格告警回退成自主執行。
|
||||
|
||||
## 28. 2026-06-18 V10.625 背景 embedding 熔斷不扣 retry
|
||||
|
||||
- 部署 V10.624 後 scheduler log 顯示 GCP-A 最近 host health 不健康,GCP-B `bge-m3` 仍可能 30 秒 timeout;embedding worker 會在同批任務中連續把多筆 queue 標成失敗,造成 attempts 被白白消耗。
|
||||
- V10.625 將 GCP embedding failure circuit 狀態公開為 `is_embedding_gcp_circuit_open()` / `embedding_gcp_circuit_remaining_seconds()`,讓 worker 可用明確狀態判斷,不再猜測空向量原因。
|
||||
- `OpenClawLearningService` worker 在熔斷中不 claim 新任務;若處理中開啟熔斷,當筆與同批剩餘任務會退回 `pending` 並寫入延後原因,不扣 `attempts`、不刷成 `failed`。
|
||||
- 背景 embedding 仍維持 GCP-A → GCP-B,不落 111;111 不承接 `bge-m3` 背景批次的治理規則不變。
|
||||
|
||||
@@ -151,25 +151,37 @@ def _clear_resolved_host_cache() -> None:
|
||||
|
||||
def _embedding_gcp_circuit_active() -> bool:
    """Tell whether the background GCP embedding failure circuit is open.

    Background embedding does not fall back to host 111, so while the
    circuit is open callers skip the GCP hosts instead of hitting both of
    them again for every task. While the circuit stays open, a WARNING with
    the remaining time and the hosts recorded under ``tried`` is emitted at
    most once per ``EMBED_GCP_FAILURE_NOTICE_SEC`` seconds.

    Returns:
        True while the circuit is open; False when the cooldown is disabled
        (``EMBED_GCP_FAILURE_COOLDOWN_SEC <= 0``) or has already elapsed.
    """
    if EMBED_GCP_FAILURE_COOLDOWN_SEC <= 0:
        return False

    # Single source of truth for the remaining time, shared with the public
    # helpers, instead of recomputing ``blocked_until - now`` locally.
    remaining_seconds = embedding_gcp_circuit_remaining_seconds()
    if remaining_seconds <= 0:
        return False

    import time

    now = time.time()
    notice_ts = float(_embedding_gcp_failure_circuit.get('notice_ts') or 0)
    # Throttle the notice so an open circuit does not flood the log.
    if now - notice_ts >= EMBED_GCP_FAILURE_NOTICE_SEC:
        # Exactly two arguments for the two placeholders in the message.
        logger.warning(
            "[Embed] GCP embedding circuit open for %.1fs; tried=%s",
            remaining_seconds,
            list(_embedding_gcp_failure_circuit.get('tried') or ()),
        )
        _embedding_gcp_failure_circuit['notice_ts'] = now
    return True
|
||||
|
||||
|
||||
def embedding_gcp_circuit_remaining_seconds() -> float:
    """Return how long the background GCP embedding circuit stays open.

    Returns:
        Seconds left until the recorded ``blocked_until`` timestamp, or
        ``0.0`` when the cooldown is disabled (non-positive) or the
        timestamp has already passed. ``0.0`` means the circuit is closed.
    """
    import time

    if EMBED_GCP_FAILURE_COOLDOWN_SEC <= 0:
        return 0.0

    # A missing or falsy ``blocked_until`` is treated as timestamp 0.
    deadline = float(_embedding_gcp_failure_circuit.get('blocked_until') or 0)
    seconds_left = deadline - time.time()
    return seconds_left if seconds_left > 0.0 else 0.0
|
||||
|
||||
|
||||
def is_embedding_gcp_circuit_open() -> bool:
    """Let workers check whether embedding jobs should be postponed.

    Returns:
        True when ``embedding_gcp_circuit_remaining_seconds()`` reports a
        positive number of seconds, i.e. the circuit is still open.
    """
    seconds_left = embedding_gcp_circuit_remaining_seconds()
    return seconds_left > 0
|
||||
|
||||
|
||||
def _open_embedding_gcp_circuit(attempted_hosts: List[str]) -> None:
|
||||
if EMBED_GCP_FAILURE_COOLDOWN_SEC <= 0 or not attempted_hosts:
|
||||
return
|
||||
|
||||
@@ -7,7 +7,11 @@ from sqlalchemy import text
|
||||
from services.logger_manager import SystemLogger
|
||||
from database.manager import get_session
|
||||
from database.ai_models import AIInsight
|
||||
from services.ollama_service import ollama_service
|
||||
from services.ollama_service import (
|
||||
embedding_gcp_circuit_remaining_seconds,
|
||||
is_embedding_gcp_circuit_open,
|
||||
ollama_service,
|
||||
)
|
||||
|
||||
sys_log = SystemLogger("OCLearn").get_logger()
|
||||
|
||||
@@ -64,6 +68,34 @@ def _enqueue_embedding(target_table: str, target_id: int, text_content: str,
|
||||
session.close()
|
||||
|
||||
|
||||
def _defer_embedding_row(row_id: int, reason: str) -> bool:
    """Put an already-claimed embedding job back into the ``pending`` state.

    The UPDATE only writes ``status``, ``last_error`` and ``updated_at``;
    the ``attempts`` column is not part of the statement.

    Args:
        row_id: ``id`` of the ``embedding_retry_queue`` row to defer.
        reason: Explanation stored in ``last_error``, cut to 500 characters.

    Returns:
        True when the update was committed; False when it raised, in which
        case the session is rolled back and a warning is logged.
    """
    session = get_session()
    try:
        statement = text("""
            UPDATE embedding_retry_queue
            SET status='pending',
                last_error=:err,
                updated_at=:now
            WHERE id=:id
        """)
        bind_values = {
            "err": reason[:500],
            "now": _now_taipei_naive(),
            "id": row_id,
        }
        session.execute(statement, bind_values)
        session.commit()
    except Exception as exc:
        session.rollback()
        sys_log.warning(f"[OCLearn] embedding 延後處理狀態更新失敗: {exc}")
        return False
    else:
        return True
    finally:
        session.close()
|
||||
|
||||
|
||||
def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str,
|
||||
period: str = None) -> bool:
|
||||
"""Public bridge for legacy raw ai_insights inserts to join ADR-007 embedding queue."""
|
||||
@@ -125,6 +157,12 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int,
|
||||
allow_111_fallback=False,
|
||||
)
|
||||
if not vec:
|
||||
if is_embedding_gcp_circuit_open():
|
||||
remaining = embedding_gcp_circuit_remaining_seconds()
|
||||
reason = f"GCP embedding 熔斷中,延後處理,不扣 attempts(約 {remaining:.0f}s 後重試)"
|
||||
_defer_embedding_row(row_id, reason)
|
||||
sys_log.info(f"[OCLearn] embedding 延後處理 row_id={row_id}: {reason}")
|
||||
return False
|
||||
raise RuntimeError("embedding 回傳空值")
|
||||
|
||||
if target_table not in _EMBEDDING_TARGET_TABLES:
|
||||
@@ -187,6 +225,11 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int,
|
||||
def _claim_pending_embeddings(limit: int = EMBED_BATCH_SIZE,
|
||||
max_attempts: int = EMBED_MAX_ATTEMPTS):
|
||||
"""Atomically claim pending embedding jobs across multiple workers."""
|
||||
if is_embedding_gcp_circuit_open():
|
||||
remaining = embedding_gcp_circuit_remaining_seconds()
|
||||
sys_log.info(f"[OCLearn] embedding GCP 熔斷中,暫停 claim pending(約 {remaining:.0f}s)")
|
||||
return []
|
||||
|
||||
session = get_session()
|
||||
try:
|
||||
session.execute(
|
||||
@@ -243,10 +286,21 @@ def _embedding_worker_loop():
|
||||
while True:
|
||||
try:
|
||||
rows = _claim_pending_embeddings()
|
||||
for row in rows:
|
||||
for idx, row in enumerate(rows):
|
||||
_process_one_embedding(
|
||||
row[0], row[1], row[2], row[3], row[4] or "bge-m3:latest"
|
||||
)
|
||||
if is_embedding_gcp_circuit_open():
|
||||
remaining_rows = rows[idx + 1:]
|
||||
remaining = embedding_gcp_circuit_remaining_seconds()
|
||||
reason = f"GCP embedding 熔斷中,同批任務退回 pending,不扣 attempts(約 {remaining:.0f}s 後重試)"
|
||||
for pending_row in remaining_rows:
|
||||
_defer_embedding_row(pending_row[0], reason)
|
||||
if remaining_rows:
|
||||
sys_log.info(
|
||||
f"[OCLearn] embedding 同批任務已延後 {len(remaining_rows)} 筆: {reason}"
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
sys_log.warning(f"[OCLearn] worker 輪詢略過 (表可能未建立): {e}")
|
||||
|
||||
|
||||
@@ -100,6 +100,62 @@ def test_process_one_embedding_writes_signature(monkeypatch):
|
||||
assert target_updates[0][1]["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024)
|
||||
|
||||
|
||||
def test_process_one_embedding_defers_without_attempt_when_gcp_circuit_open(monkeypatch):
    """An empty embedding during an open GCP circuit defers the row.

    The row must be set back to ``pending`` with an explanatory error text,
    and no statement may increment ``attempts``.
    """
    import services.openclaw_learning_service as learning

    statements = []

    class RecordingSession:
        """Session stand-in that records every executed statement."""

        def execute(self, stmt, params=None):
            statements.append((str(stmt), params or {}))

        def commit(self):
            pass

        def rollback(self):
            pass

        def close(self):
            pass

    def empty_embedding(text, model="bge-m3:latest", **_kwargs):
        # Simulates an embedding call that yields no vector.
        return []

    monkeypatch.setattr(learning, "get_session", RecordingSession)
    monkeypatch.setattr(learning.ollama_service, "generate_embedding", empty_embedding)
    monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True)
    monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 42.0)

    ok = learning._process_one_embedding(
        row_id=7,
        target_table="learning_episodes",
        target_id=42,
        text_content="測試內容",
        model="bge-m3:latest",
    )

    assert ok is False
    assert all("attempts = attempts + 1" not in sql for sql, _ in statements)

    defer_updates = [entry for entry in statements if "SET status='pending'" in entry[0]]
    assert defer_updates
    first_params = defer_updates[0][1]
    assert "不扣 attempts" in first_params["err"]
|
||||
|
||||
|
||||
def test_claim_pending_embeddings_pauses_when_gcp_circuit_open(monkeypatch):
    """While the GCP circuit is open, claiming returns ``[]`` without a session."""
    import services.openclaw_learning_service as learning

    def forbid_session():
        # Any attempt to open a DB session fails the test.
        raise AssertionError("熔斷中不應 claim DB rows")

    monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True)
    monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 30.0)
    monkeypatch.setattr(learning, "get_session", forbid_session)

    claimed = learning._claim_pending_embeddings(limit=3, max_attempts=5)
    assert claimed == []
|
||||
|
||||
|
||||
def test_claim_pending_embeddings_uses_skip_locked(monkeypatch):
|
||||
import services.openclaw_learning_service as learning
|
||||
|
||||
|
||||
@@ -482,6 +482,8 @@ def test_embedding_fallback_disabled_opens_short_gcp_failure_circuit():
|
||||
assert second == []
|
||||
assert posted_hosts == [oss.OLLAMA_HOST_PRIMARY, oss.OLLAMA_HOST_SECONDARY]
|
||||
assert oss._embedding_gcp_failure_circuit['blocked_until'] > 0
|
||||
assert oss.is_embedding_gcp_circuit_open() is True
|
||||
assert oss.embedding_gcp_circuit_remaining_seconds() > 0
|
||||
|
||||
|
||||
def test_embedding_health_label_maps_direct_and_proxy_gcp_hosts():
|
||||
|
||||
Reference in New Issue
Block a user