class _RecordingSession:
    """DB-session test double that records every executed statement.

    Each ``execute`` call appends ``(str(stmt), params or {})`` to the shared
    ``executed`` list and returns ``result``. ``commit``, ``rollback`` and
    ``close`` are no-ops.
    """

    def __init__(self, executed, result=None):
        self._executed = executed
        self._result = result

    def execute(self, stmt, params=None):
        self._executed.append((str(stmt), params or {}))
        return self._result

    def commit(self):
        pass

    def rollback(self):
        pass

    def close(self):
        pass


def test_enqueue_insight_embedding_builds_queue_payload(monkeypatch):
    """A valid insight is queued for ``ai_insights`` with a formatted text payload."""
    import services.openclaw_learning_service as learning

    calls = []

    def fake_enqueue(table, target_id, text):
        calls.append((table, target_id, text))
        return True

    monkeypatch.setattr(learning, "_enqueue_embedding", fake_enqueue)

    assert learning.enqueue_insight_embedding(42, "agent_action", "hello", "2026-04-29") is True
    assert calls == [("ai_insights", 42, "agent_action (2026-04-29): hello")]


def test_enqueue_insight_embedding_rejects_missing_content(monkeypatch):
    """Empty content or a missing insight id returns False without enqueueing."""
    import services.openclaw_learning_service as learning

    def fail_if_called(*args, **kwargs):
        raise AssertionError("should not enqueue")

    monkeypatch.setattr(learning, "_enqueue_embedding", fail_if_called)

    assert learning.enqueue_insight_embedding(42, "agent_action", "") is False
    assert learning.enqueue_insight_embedding(None, "agent_action", "hello") is False


def test_enqueue_missing_insight_embeddings_queues_rows(monkeypatch):
    """Every row returned by the scan query is handed to ``enqueue_insight_embedding``."""
    from types import SimpleNamespace

    import services.openclaw_learning_service as learning

    rows = [
        SimpleNamespace(id=1, insight_type="mcp_cache", period=None, content="市場資料"),
        SimpleNamespace(
            id=2, insight_type="human_review", period="2026-04-29", content="人工審核"
        ),
    ]

    class Result:
        def fetchall(self):
            return rows

    class Session:
        # Accepts any call shape: this test only cares about the rows returned.
        def execute(self, *args, **kwargs):
            return Result()

        def close(self):
            pass

    calls = []

    def fake_enqueue(insight_id, insight_type, content, period=None):
        calls.append((insight_id, insight_type, content, period))
        return True

    monkeypatch.setattr(learning, "get_session", lambda: Session())
    monkeypatch.setattr(learning, "enqueue_insight_embedding", fake_enqueue)

    result = learning.enqueue_missing_insight_embeddings(limit=10)

    assert result == {"scanned": 2, "enqueued": 2, "status": "ok"}
    assert calls[0] == (1, "mcp_cache", "市場資料", None)


def test_process_one_embedding_writes_signature(monkeypatch):
    """A successful embedding updates the target row with its embedding signature."""
    import services.openclaw_learning_service as learning
    from services.rag_service import get_embedding_signature

    executed = []
    monkeypatch.setattr(learning, "get_session", lambda: _RecordingSession(executed))
    monkeypatch.setattr(
        learning.ollama_service,
        "generate_embedding",
        lambda text, model="bge-m3:latest", **_kwargs: [0.1] * 1024,
    )

    ok = learning._process_one_embedding(
        row_id=7,
        target_table="learning_episodes",
        target_id=42,
        text_content="測試內容",
        model="bge-m3:latest",
    )

    assert ok is True
    target_updates = [item for item in executed if "UPDATE learning_episodes" in item[0]]
    assert target_updates
    first_sql, first_params = target_updates[0]
    assert "embedding_signature" in first_sql
    assert first_params["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024)


def test_process_one_embedding_defers_without_attempt_when_gcp_circuit_open(monkeypatch):
    """With the GCP circuit open, a failed embedding is re-queued without counting an attempt."""
    import services.openclaw_learning_service as learning

    executed = []
    monkeypatch.setattr(learning, "get_session", lambda: _RecordingSession(executed))
    # An empty vector stands in for a failed embedding call.
    monkeypatch.setattr(
        learning.ollama_service,
        "generate_embedding",
        lambda text, model="bge-m3:latest", **_kwargs: [],
    )
    monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True)
    monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 42.0)

    ok = learning._process_one_embedding(
        row_id=7,
        target_table="learning_episodes",
        target_id=42,
        text_content="測試內容",
        model="bge-m3:latest",
    )

    assert ok is False
    assert not any("attempts = attempts + 1" in stmt for stmt, _ in executed)
    defer_updates = [item for item in executed if "SET status='pending'" in item[0]]
    assert defer_updates
    # The recorded reason must state that attempts were not charged.
    assert "不扣 attempts" in defer_updates[0][1]["err"]


def test_claim_pending_embeddings_pauses_when_gcp_circuit_open(monkeypatch):
    """While the GCP circuit is open, claiming returns nothing and never opens a session."""
    import services.openclaw_learning_service as learning

    def fail_if_session_requested():
        raise AssertionError("熔斷中不應 claim DB rows")

    monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True)
    monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 30.0)
    monkeypatch.setattr(learning, "get_session", fail_if_session_requested)

    assert learning._claim_pending_embeddings(limit=3, max_attempts=5) == []


def test_claim_pending_embeddings_uses_skip_locked(monkeypatch):
    """Claiming issues an ``UPDATE ... FOR UPDATE SKIP LOCKED ... RETURNING`` statement."""
    import services.openclaw_learning_service as learning

    executed = []
    claimed_rows = [(7, "ai_insights", 42, "測試內容", "bge-m3:latest")]

    class Result:
        def fetchall(self):
            return claimed_rows

    # Pin the circuit closed: the claim path short-circuits to [] when it is
    # open, so this test must not depend on ambient circuit-breaker state.
    monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: False)
    monkeypatch.setattr(
        learning, "get_session", lambda: _RecordingSession(executed, result=Result())
    )

    rows = learning._claim_pending_embeddings(limit=3, max_attempts=5)

    assert rows == claimed_rows
    claim_sql = [stmt for stmt, _ in executed if "RETURNING q.id" in stmt][0]
    assert "FOR UPDATE SKIP LOCKED" in claim_sql
    assert "UPDATE embedding_retry_queue q" in claim_sql
    last_params = executed[-1][1]
    assert last_params["lim"] == 3
    assert last_params["max"] == 5