diff --git a/config.py b/config.py index 2c3113d..e0065d2 100644 --- a/config.py +++ b/config.py @@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.624" +SYSTEM_VERSION = "V10.625" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 8ea5734..024109d 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -1,8 +1,8 @@ # PChome 業績成長自動化作戰系統 — AI 競價情報模組 Single Source of Truth -> **最後更新**: 2026-06-16 (台北時間) -> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步、作戰清單與價格參考表優先讀取、CSV 備援預檢、前台操作入口、高可見頁面繁中化守門與比價/作戰 UI 工作台化已建立 -> **適用版本**: V10.624 +> **最後更新**: 2026-06-18 (台北時間) +> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步、作戰清單與價格參考表優先讀取、CSV 備援預檢、前台操作入口、高可見頁面繁中化守門、比價/作戰 UI 工作台化與 GCP embedding 熔斷延後處理已建立 +> **適用版本**: V10.625 --- @@ -25,6 +25,7 @@ - `allow_111_fallback=False` 時,若 resolver 因 unhealthy cache 回傳 111,不得直接結束 embedding;必須強制改試尚未嘗試的 GCP-A / GCP-B,避免正式 log 出現 `tried=[]` 或只試單台 GCP-B。 - `allow_111_fallback=False` 且 GCP-A / GCP-B 皆失敗時,背景 embedding 會開啟短暫 GCP failure circuit(預設 60 秒),期間不重複打兩台 GCP、不落 111,避免 worker 與 log 被連續失敗拖慢;GCP 恢復後會自然再試。 - 背景 embedding 的 GCP-only 熔斷屬於可降級背景能力,應記錄為明確 WARNING 與 circuit 狀態,不應每次污染 ERROR 通道;真正允許三主機 fallback 的同步 embedding 全失敗仍保留 ERROR。 +- OpenClaw embedding worker 遇到 GCP-only failure circuit 時,必須把已 claim 任務退回 `pending` 並延後處理,不得扣 `attempts`、不得把同批任務刷成 `failed`;熔斷期間也不得繼續 claim 新任務。 - Scheduler host health probe 不只看 `/api/tags`;GCP-A / GCP-B 節點必須再通過 `bge-m3` `/api/embed` 實作探針,才算 healthy。探針 timeout 預設 30s,111 預設不納入這個背景 embedding 探針,避免監測任務把 fallback Mac 載入 `bge-m3`。 - 背景 embedding 會讀取最近 `host_health_probes` runtime 結果;若 GCP-A / GCP-B 在 `OLLAMA_EMBED_HOST_HEALTH_SKIP_WINDOW_MINUTES=20` 視窗內已被標為 unhealthy,`OllamaService.generate_embedding(..., allow_111_fallback=False)` 會先跳過該節點並開啟短暫 GCP circuit,不再等待 30 秒 timeout,也仍不落 111。此功能由 `OLLAMA_EMBED_HOST_HEALTH_SKIP_ENABLED=true` 控制,DB 讀取失敗時 fail-open 回到原本網路 retry。 - BGE-M3 一致性檢查是監測任務,不是 fallback 壓測;預設只比對 GCP-A / GCP-B。111 Mac fallback 只有 `EMBED_CONSISTENCY_INCLUDE_111=true` 時才納入,避免每週背景檢查把 `bge-m3` 載入 111。 diff --git a/docs/memory/current_execution_queue_20260524.md b/docs/memory/current_execution_queue_20260524.md index 329ac16..62c801f 100644 --- a/docs/memory/current_execution_queue_20260524.md +++ b/docs/memory/current_execution_queue_20260524.md @@ -310,3 +310,10 @@ - V10.624 將價格類 trigger 的高信心路徑改為「有實證就發 L3 HITL 價格覆核通知」,不再執行 orchestrator `execution_plan`,避免長任務 timeout 與自動調價誤解。 - 新增 `price_decision_review` 決策信封,固定標示 `can_auto_execute=false`、`requires_hitl=true`、`execution_plan skipped`;通知只呈現 DB/Hermes 具體價差實證。 - 測試新增高信心價格決策不執行長任務 step 的守門,避免未來又把價格告警回退成自主執行。 + +## 28. 2026-06-18 V10.625 背景 embedding 熔斷不扣 retry + +- 部署 V10.624 後 scheduler log 顯示 GCP-A 最近 host health 不健康,GCP-B `bge-m3` 仍可能 30 秒 timeout;embedding worker 會在同批任務中連續把多筆 queue 標成失敗,造成 attempts 被白白消耗。 +- V10.625 將 GCP embedding failure circuit 狀態公開為 `is_embedding_gcp_circuit_open()` / `embedding_gcp_circuit_remaining_seconds()`,讓 worker 可用明確狀態判斷,不再猜測空向量原因。 +- `OpenClawLearningService` worker 在熔斷中不 claim 新任務;若處理中開啟熔斷,當筆與同批剩餘任務會退回 `pending` 並寫入延後原因,不扣 `attempts`、不刷成 `failed`。 +- 背景 embedding 仍維持 GCP-A → GCP-B,不落 111;111 不承接 `bge-m3` 背景批次的治理規則不變。 diff --git a/services/ollama_service.py b/services/ollama_service.py index c4ae4de..be99aa2 100644 --- a/services/ollama_service.py +++ b/services/ollama_service.py @@ -151,25 +151,37 @@ def _clear_resolved_host_cache() -> None: def _embedding_gcp_circuit_active() -> bool: """背景 embedding 不落 111;GCP 全掛時短暫熔斷,避免每筆任務重打兩台。""" - if EMBED_GCP_FAILURE_COOLDOWN_SEC <= 0: - return False - import time - now = time.time() - blocked_until = float(_embedding_gcp_failure_circuit.get('blocked_until') or 0) - if now >= blocked_until: + remaining_seconds = embedding_gcp_circuit_remaining_seconds() + if remaining_seconds <= 0: return False + import time + now = time.time() notice_ts = float(_embedding_gcp_failure_circuit.get('notice_ts') or 0) if now - notice_ts >= EMBED_GCP_FAILURE_NOTICE_SEC: logger.warning( "[Embed] GCP embedding circuit open for %.1fs; tried=%s", - blocked_until - now, + remaining_seconds, list(_embedding_gcp_failure_circuit.get('tried') or ()), ) _embedding_gcp_failure_circuit['notice_ts'] = now return True +def embedding_gcp_circuit_remaining_seconds() -> float: + """回傳背景 GCP embedding 熔斷剩餘秒數;0 代表沒有熔斷。""" + if EMBED_GCP_FAILURE_COOLDOWN_SEC <= 0: + return 0.0 + import time + blocked_until = float(_embedding_gcp_failure_circuit.get('blocked_until') or 0) + return max(0.0, blocked_until - time.time()) + + +def is_embedding_gcp_circuit_open() -> bool: + """供 worker 判斷是否應延後 embedding 任務,避免白白扣 retry 次數。""" + return embedding_gcp_circuit_remaining_seconds() > 0 + + def _open_embedding_gcp_circuit(attempted_hosts: List[str]) -> None: if EMBED_GCP_FAILURE_COOLDOWN_SEC <= 0 or not attempted_hosts: return diff --git a/services/openclaw_learning_service.py b/services/openclaw_learning_service.py index c1ac976..39aaf53 100644 --- a/services/openclaw_learning_service.py +++ b/services/openclaw_learning_service.py @@ -7,7 +7,11 @@ from sqlalchemy import text from services.logger_manager import SystemLogger from database.manager import get_session from database.ai_models import AIInsight -from services.ollama_service import ollama_service +from services.ollama_service import ( + embedding_gcp_circuit_remaining_seconds, + is_embedding_gcp_circuit_open, + ollama_service, +) sys_log = SystemLogger("OCLearn").get_logger() @@ -64,6 +68,34 @@ def _enqueue_embedding(target_table: str, target_id: int, text_content: str, session.close() +def _defer_embedding_row(row_id: int, reason: str) -> bool: + """把已 claim 的 embedding 任務退回 pending,不扣 attempts。""" + session = get_session() + try: + session.execute( + text(""" + UPDATE embedding_retry_queue + SET status='pending', + last_error=:err, + updated_at=:now + WHERE id=:id + """), + { + "err": reason[:500], + "now": _now_taipei_naive(), + "id": row_id, + }, + ) + session.commit() + return True + except Exception as exc: + session.rollback() + sys_log.warning(f"[OCLearn] embedding 延後處理狀態更新失敗: {exc}") + return False + finally: + session.close() + + def enqueue_insight_embedding(insight_id: int, insight_type: str, content: str, period: str = None) -> bool: """Public bridge for legacy raw ai_insights inserts to join ADR-007 embedding queue.""" @@ -125,6 +157,12 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int, allow_111_fallback=False, ) if not vec: + if is_embedding_gcp_circuit_open(): + remaining = embedding_gcp_circuit_remaining_seconds() + reason = f"GCP embedding 熔斷中,延後處理,不扣 attempts(約 {remaining:.0f}s 後重試)" + _defer_embedding_row(row_id, reason) + sys_log.info(f"[OCLearn] embedding 延後處理 row_id={row_id}: {reason}") + return False raise RuntimeError("embedding 回傳空值") if target_table not in _EMBEDDING_TARGET_TABLES: @@ -187,6 +225,11 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int, def _claim_pending_embeddings(limit: int = EMBED_BATCH_SIZE, max_attempts: int = EMBED_MAX_ATTEMPTS): """Atomically claim pending embedding jobs across multiple workers.""" + if is_embedding_gcp_circuit_open(): + remaining = embedding_gcp_circuit_remaining_seconds() + sys_log.info(f"[OCLearn] embedding GCP 熔斷中,暫停 claim pending(約 {remaining:.0f}s)") + return [] + session = get_session() try: session.execute( @@ -243,10 +286,21 @@ def _embedding_worker_loop(): while True: try: rows = _claim_pending_embeddings() - for row in rows: + for idx, row in enumerate(rows): _process_one_embedding( row[0], row[1], row[2], row[3], row[4] or "bge-m3:latest" ) + if is_embedding_gcp_circuit_open(): + remaining_rows = rows[idx + 1:] + remaining = embedding_gcp_circuit_remaining_seconds() + reason = f"GCP embedding 熔斷中,同批任務退回 pending,不扣 attempts(約 {remaining:.0f}s 後重試)" + for pending_row in remaining_rows: + _defer_embedding_row(pending_row[0], reason) + if remaining_rows: + sys_log.info( + f"[OCLearn] embedding 同批任務已延後 {len(remaining_rows)} 筆: {reason}" + ) + break except Exception as e: sys_log.warning(f"[OCLearn] worker 輪詢略過 (表可能未建立): {e}") diff --git a/tests/test_ai_insight_embedding_bridge.py b/tests/test_ai_insight_embedding_bridge.py index 95127d2..6115e2f 100644 --- a/tests/test_ai_insight_embedding_bridge.py +++ b/tests/test_ai_insight_embedding_bridge.py @@ -100,6 +100,62 @@ def test_process_one_embedding_writes_signature(monkeypatch): assert target_updates[0][1]["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024) +def test_process_one_embedding_defers_without_attempt_when_gcp_circuit_open(monkeypatch): + import services.openclaw_learning_service as learning + + executed = [] + + class Session: + def execute(self, stmt, params=None): + executed.append((str(stmt), params or {})) + + def commit(self): + pass + + def rollback(self): + pass + + def close(self): + pass + + monkeypatch.setattr(learning, "get_session", lambda: Session()) + monkeypatch.setattr( + learning.ollama_service, + "generate_embedding", + lambda text, model="bge-m3:latest", **_kwargs: [], + ) + monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True) + monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 42.0) + + ok = learning._process_one_embedding( + row_id=7, + target_table="learning_episodes", + target_id=42, + text_content="測試內容", + model="bge-m3:latest", + ) + + assert ok is False + assert not any("attempts = attempts + 1" in stmt for stmt, _ in executed) + defer_updates = [item for item in executed if "SET status='pending'" in item[0]] + assert defer_updates + assert "不扣 attempts" in defer_updates[0][1]["err"] + + +def test_claim_pending_embeddings_pauses_when_gcp_circuit_open(monkeypatch): + import services.openclaw_learning_service as learning + + monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True) + monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 30.0) + monkeypatch.setattr( + learning, + "get_session", + lambda: (_ for _ in ()).throw(AssertionError("熔斷中不應 claim DB rows")), + ) + + assert learning._claim_pending_embeddings(limit=3, max_attempts=5) == [] + + def test_claim_pending_embeddings_uses_skip_locked(monkeypatch): import services.openclaw_learning_service as learning diff --git a/tests/test_ollama_retry_chain.py b/tests/test_ollama_retry_chain.py index 7d989f7..af02a26 100644 --- a/tests/test_ollama_retry_chain.py +++ b/tests/test_ollama_retry_chain.py @@ -482,6 +482,8 @@ def test_embedding_fallback_disabled_opens_short_gcp_failure_circuit(): assert second == [] assert posted_hosts == [oss.OLLAMA_HOST_PRIMARY, oss.OLLAMA_HOST_SECONDARY] assert oss._embedding_gcp_failure_circuit['blocked_until'] > 0 + assert oss.is_embedding_gcp_circuit_open() is True + assert oss.embedding_gcp_circuit_remaining_seconds() > 0 def test_embedding_health_label_maps_direct_and_proxy_gcp_hosts():