195 lines
6.0 KiB
Python
195 lines
6.0 KiB
Python
def test_enqueue_insight_embedding_builds_queue_payload(monkeypatch):
|
|
import services.openclaw_learning_service as learning
|
|
|
|
calls = []
|
|
monkeypatch.setattr(
|
|
learning,
|
|
"_enqueue_embedding",
|
|
lambda table, target_id, text: calls.append((table, target_id, text)) or True,
|
|
)
|
|
|
|
assert learning.enqueue_insight_embedding(42, "agent_action", "hello", "2026-04-29") is True
|
|
assert calls == [("ai_insights", 42, "agent_action (2026-04-29): hello")]
|
|
|
|
|
|
def test_enqueue_insight_embedding_rejects_missing_content(monkeypatch):
|
|
import services.openclaw_learning_service as learning
|
|
|
|
monkeypatch.setattr(
|
|
learning,
|
|
"_enqueue_embedding",
|
|
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not enqueue")),
|
|
)
|
|
|
|
assert learning.enqueue_insight_embedding(42, "agent_action", "") is False
|
|
assert learning.enqueue_insight_embedding(None, "agent_action", "hello") is False
|
|
|
|
|
|
def test_enqueue_missing_insight_embeddings_queues_rows(monkeypatch):
|
|
from types import SimpleNamespace
|
|
import services.openclaw_learning_service as learning
|
|
|
|
rows = [
|
|
SimpleNamespace(id=1, insight_type="mcp_cache", period=None, content="市場資料"),
|
|
SimpleNamespace(id=2, insight_type="human_review", period="2026-04-29", content="人工審核"),
|
|
]
|
|
|
|
class Result:
|
|
def fetchall(self):
|
|
return rows
|
|
|
|
class Session:
|
|
def execute(self, *args, **kwargs):
|
|
return Result()
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
calls = []
|
|
monkeypatch.setattr(learning, "get_session", lambda: Session())
|
|
monkeypatch.setattr(
|
|
learning,
|
|
"enqueue_insight_embedding",
|
|
lambda insight_id, insight_type, content, period=None: calls.append((insight_id, insight_type, content, period)) or True,
|
|
)
|
|
|
|
result = learning.enqueue_missing_insight_embeddings(limit=10)
|
|
|
|
assert result == {"scanned": 2, "enqueued": 2, "status": "ok"}
|
|
assert calls[0] == (1, "mcp_cache", "市場資料", None)
|
|
|
|
|
|
def test_process_one_embedding_writes_signature(monkeypatch):
|
|
import services.openclaw_learning_service as learning
|
|
from services.rag_service import get_embedding_signature
|
|
|
|
executed = []
|
|
|
|
class Session:
|
|
def execute(self, stmt, params=None):
|
|
executed.append((str(stmt), params or {}))
|
|
|
|
def commit(self):
|
|
pass
|
|
|
|
def rollback(self):
|
|
pass
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
monkeypatch.setattr(learning, "get_session", lambda: Session())
|
|
monkeypatch.setattr(
|
|
learning.ollama_service,
|
|
"generate_embedding",
|
|
lambda text, model="bge-m3:latest", **_kwargs: [0.1] * 1024,
|
|
)
|
|
|
|
ok = learning._process_one_embedding(
|
|
row_id=7,
|
|
target_table="learning_episodes",
|
|
target_id=42,
|
|
text_content="測試內容",
|
|
model="bge-m3:latest",
|
|
)
|
|
|
|
assert ok is True
|
|
target_updates = [item for item in executed if "UPDATE learning_episodes" in item[0]]
|
|
assert target_updates
|
|
assert "embedding_signature" in target_updates[0][0]
|
|
assert target_updates[0][1]["sig"] == get_embedding_signature(model="bge-m3:latest", dim=1024)
|
|
|
|
|
|
def test_process_one_embedding_defers_without_attempt_when_gcp_circuit_open(monkeypatch):
|
|
import services.openclaw_learning_service as learning
|
|
|
|
executed = []
|
|
|
|
class Session:
|
|
def execute(self, stmt, params=None):
|
|
executed.append((str(stmt), params or {}))
|
|
|
|
def commit(self):
|
|
pass
|
|
|
|
def rollback(self):
|
|
pass
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
monkeypatch.setattr(learning, "get_session", lambda: Session())
|
|
monkeypatch.setattr(
|
|
learning.ollama_service,
|
|
"generate_embedding",
|
|
lambda text, model="bge-m3:latest", **_kwargs: [],
|
|
)
|
|
monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True)
|
|
monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 42.0)
|
|
|
|
ok = learning._process_one_embedding(
|
|
row_id=7,
|
|
target_table="learning_episodes",
|
|
target_id=42,
|
|
text_content="測試內容",
|
|
model="bge-m3:latest",
|
|
)
|
|
|
|
assert ok is False
|
|
assert not any("attempts = attempts + 1" in stmt for stmt, _ in executed)
|
|
defer_updates = [item for item in executed if "SET status='pending'" in item[0]]
|
|
assert defer_updates
|
|
assert "不扣 attempts" in defer_updates[0][1]["err"]
|
|
|
|
|
|
def test_claim_pending_embeddings_pauses_when_gcp_circuit_open(monkeypatch):
|
|
import services.openclaw_learning_service as learning
|
|
|
|
monkeypatch.setattr(learning, "is_embedding_gcp_circuit_open", lambda: True)
|
|
monkeypatch.setattr(learning, "embedding_gcp_circuit_remaining_seconds", lambda: 30.0)
|
|
monkeypatch.setattr(
|
|
learning,
|
|
"get_session",
|
|
lambda: (_ for _ in ()).throw(AssertionError("熔斷中不應 claim DB rows")),
|
|
)
|
|
|
|
assert learning._claim_pending_embeddings(limit=3, max_attempts=5) == []
|
|
|
|
|
|
def test_claim_pending_embeddings_uses_skip_locked(monkeypatch):
|
|
import services.openclaw_learning_service as learning
|
|
|
|
executed = []
|
|
claimed_rows = [(7, "ai_insights", 42, "測試內容", "bge-m3:latest")]
|
|
|
|
class Result:
|
|
def fetchall(self):
|
|
return claimed_rows
|
|
|
|
class Session:
|
|
def execute(self, stmt, params=None):
|
|
executed.append((str(stmt), params or {}))
|
|
if "RETURNING q.id" in str(stmt):
|
|
return Result()
|
|
return Result()
|
|
|
|
def commit(self):
|
|
pass
|
|
|
|
def rollback(self):
|
|
pass
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
monkeypatch.setattr(learning, "get_session", lambda: Session())
|
|
|
|
rows = learning._claim_pending_embeddings(limit=3, max_attempts=5)
|
|
|
|
assert rows == claimed_rows
|
|
claim_sql = [stmt for stmt, _ in executed if "RETURNING q.id" in stmt][0]
|
|
assert "FOR UPDATE SKIP LOCKED" in claim_sql
|
|
assert "UPDATE embedding_retry_queue q" in claim_sql
|
|
assert executed[-1][1]["lim"] == 3
|
|
assert executed[-1][1]["max"] == 5
|