diff --git a/config.py b/config.py index ed0518f..d7ec5ea 100644 --- a/config.py +++ b/config.py @@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.256" +SYSTEM_VERSION = "V10.257" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/services/openclaw_learning_service.py b/services/openclaw_learning_service.py index 142ac45..ede4835 100644 --- a/services/openclaw_learning_service.py +++ b/services/openclaw_learning_service.py @@ -174,42 +174,65 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int, session.close() +def _claim_pending_embeddings(limit: int = EMBED_BATCH_SIZE, + max_attempts: int = EMBED_MAX_ATTEMPTS): + """Atomically claim pending embedding jobs across multiple workers.""" + session = get_session() + try: + session.execute( + text(""" + UPDATE embedding_retry_queue + SET status = 'pending', + updated_at = :now, + last_error = COALESCE(last_error, '') || ' | reset stale processing' + WHERE status = 'processing' + AND updated_at < :cutoff + """), + { + "now": datetime.now(), + "cutoff": datetime.fromtimestamp(time.time() - 15 * 60), + }, + ) + session.commit() + rows = session.execute( + text(""" + WITH picked AS ( + SELECT id + FROM embedding_retry_queue + WHERE status = 'pending' + AND attempts < :max + ORDER BY created_at + FOR UPDATE SKIP LOCKED + LIMIT :lim + ) + UPDATE embedding_retry_queue q + SET status = 'processing', + updated_at = :now + FROM picked + WHERE q.id = picked.id + RETURNING q.id, q.target_table, q.target_id, q.text_content, q.model + """), + { + "now": datetime.now(), + "max": max_attempts, + "lim": limit, + }, + ).fetchall() + session.commit() + return rows + except Exception: + session.rollback() + raise + finally: + session.close() + + def _embedding_worker_loop(): """背景執行緒:輪詢 embedding_retry_queue,批次處理 pending 項目""" sys_log.info("[OCLearn] Hermes Embedding Worker (DB-backed) 啟動") while True: try: - session = get_session() - try: - session.execute( - text(""" - UPDATE embedding_retry_queue - SET status = 'pending', - updated_at = :now, - last_error = COALESCE(last_error, '') || ' | reset stale processing' - WHERE status = 'processing' - AND updated_at < :cutoff - """), - { - "now": datetime.now(), - "cutoff": datetime.fromtimestamp(time.time() - 15 * 60), - }, - ) - session.commit() - rows = session.execute( - text(""" - SELECT id, target_table, target_id, text_content, model - FROM embedding_retry_queue - WHERE status = 'pending' - AND attempts < :max - ORDER BY created_at - LIMIT :lim - """), - {"max": EMBED_MAX_ATTEMPTS, "lim": EMBED_BATCH_SIZE}, - ).fetchall() - finally: - session.close() - + rows = _claim_pending_embeddings() for row in rows: _process_one_embedding( row[0], row[1], row[2], row[3], row[4] or "bge-m3:latest" diff --git a/tests/test_ai_insight_embedding_bridge.py b/tests/test_ai_insight_embedding_bridge.py index e82be0a..75f3eb2 100644 --- a/tests/test_ai_insight_embedding_bridge.py +++ b/tests/test_ai_insight_embedding_bridge.py @@ -98,3 +98,41 @@ def test_process_one_embedding_writes_signature(monkeypatch): assert target_updates assert "embedding_signature" in target_updates[0][0] assert target_updates[0][1]["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024) + + +def test_claim_pending_embeddings_uses_skip_locked(monkeypatch): + import services.openclaw_learning_service as learning + + executed = [] + claimed_rows = [(7, "ai_insights", 42, "測試內容", "bge-m3:latest")] + + class Result: + def fetchall(self): + return claimed_rows + + class Session: + def execute(self, stmt, params=None): + executed.append((str(stmt), params or {})) + if "RETURNING q.id" in str(stmt): + return Result() + return Result() + + def commit(self): + pass + + def rollback(self): + pass + + def close(self): + pass + + monkeypatch.setattr(learning, "get_session", lambda: Session()) + + rows = learning._claim_pending_embeddings(limit=3, max_attempts=5) + + assert rows == claimed_rows + claim_sql = [stmt for stmt, _ in executed if "RETURNING q.id" in stmt][0] + assert "FOR UPDATE SKIP LOCKED" in claim_sql + assert "UPDATE embedding_retry_queue q" in claim_sql + assert executed[-1][1]["lim"] == 3 + assert executed[-1][1]["max"] == 5