修復 AI embedding queue 原子取件
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s

This commit is contained in:
OoO
2026-05-19 13:57:41 +08:00
parent 6af0e3c77f
commit 80bb654dbc
3 changed files with 93 additions and 32 deletions

View File

@@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.256"
SYSTEM_VERSION = "V10.257"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -174,42 +174,65 @@ def _process_one_embedding(row_id: int, target_table: str, target_id: int,
session.close()
def _claim_pending_embeddings(limit: int = EMBED_BATCH_SIZE,
max_attempts: int = EMBED_MAX_ATTEMPTS):
"""Atomically claim pending embedding jobs across multiple workers."""
session = get_session()
try:
session.execute(
text("""
UPDATE embedding_retry_queue
SET status = 'pending',
updated_at = :now,
last_error = COALESCE(last_error, '') || ' | reset stale processing'
WHERE status = 'processing'
AND updated_at < :cutoff
"""),
{
"now": datetime.now(),
"cutoff": datetime.fromtimestamp(time.time() - 15 * 60),
},
)
session.commit()
rows = session.execute(
text("""
WITH picked AS (
SELECT id
FROM embedding_retry_queue
WHERE status = 'pending'
AND attempts < :max
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT :lim
)
UPDATE embedding_retry_queue q
SET status = 'processing',
updated_at = :now
FROM picked
WHERE q.id = picked.id
RETURNING q.id, q.target_table, q.target_id, q.text_content, q.model
"""),
{
"now": datetime.now(),
"max": max_attempts,
"lim": limit,
},
).fetchall()
session.commit()
return rows
except Exception:
session.rollback()
raise
finally:
session.close()
def _embedding_worker_loop():
"""背景執行緒:輪詢 embedding_retry_queue批次處理 pending 項目"""
sys_log.info("[OCLearn] Hermes Embedding Worker (DB-backed) 啟動")
while True:
try:
session = get_session()
try:
session.execute(
text("""
UPDATE embedding_retry_queue
SET status = 'pending',
updated_at = :now,
last_error = COALESCE(last_error, '') || ' | reset stale processing'
WHERE status = 'processing'
AND updated_at < :cutoff
"""),
{
"now": datetime.now(),
"cutoff": datetime.fromtimestamp(time.time() - 15 * 60),
},
)
session.commit()
rows = session.execute(
text("""
SELECT id, target_table, target_id, text_content, model
FROM embedding_retry_queue
WHERE status = 'pending'
AND attempts < :max
ORDER BY created_at
LIMIT :lim
"""),
{"max": EMBED_MAX_ATTEMPTS, "lim": EMBED_BATCH_SIZE},
).fetchall()
finally:
session.close()
rows = _claim_pending_embeddings()
for row in rows:
_process_one_embedding(
row[0], row[1], row[2], row[3], row[4] or "bge-m3:latest"

View File

@@ -98,3 +98,41 @@ def test_process_one_embedding_writes_signature(monkeypatch):
assert target_updates
assert "embedding_signature" in target_updates[0][0]
assert target_updates[0][1]["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024)
def test_claim_pending_embeddings_uses_skip_locked(monkeypatch):
import services.openclaw_learning_service as learning
executed = []
claimed_rows = [(7, "ai_insights", 42, "測試內容", "bge-m3:latest")]
class Result:
def fetchall(self):
return claimed_rows
class Session:
def execute(self, stmt, params=None):
executed.append((str(stmt), params or {}))
if "RETURNING q.id" in str(stmt):
return Result()
return Result()
def commit(self):
pass
def rollback(self):
pass
def close(self):
pass
monkeypatch.setattr(learning, "get_session", lambda: Session())
rows = learning._claim_pending_embeddings(limit=3, max_attempts=5)
assert rows == claimed_rows
claim_sql = [stmt for stmt, _ in executed if "RETURNING q.id" in stmt][0]
assert "FOR UPDATE SKIP LOCKED" in claim_sql
assert "UPDATE embedding_retry_queue q" in claim_sql
assert executed[-1][1]["lim"] == 3
assert executed[-1][1]["max"] == 5