Files
ewoooc/tests/test_ai_insight_embedding_bridge.py
OoO 78eebfbcfc
All checks were successful
CD Pipeline / deploy (push) Successful in 1m19s
加入告警去重與洞察向量回補
2026-04-29 23:10:27 +08:00

60 lines
2.0 KiB
Python

def test_enqueue_insight_embedding_builds_queue_payload(monkeypatch):
    """Check the payload ``enqueue_insight_embedding`` hands to the queue.

    ``_enqueue_embedding`` is swapped for a recorder so the test can inspect
    the (table, target id, text) triple that would have been queued.
    """
    import services.openclaw_learning_service as learning

    recorded = []

    def fake_enqueue(table, target_id, text):
        # Capture the arguments and report success, like the real helper
        # is expected to do on a successful enqueue.
        recorded.append((table, target_id, text))
        return True

    monkeypatch.setattr(learning, "_enqueue_embedding", fake_enqueue)

    outcome = learning.enqueue_insight_embedding(
        42, "agent_action", "hello", "2026-04-29"
    )

    assert outcome is True
    assert recorded == [("ai_insights", 42, "agent_action (2026-04-29): hello")]
def test_enqueue_insight_embedding_rejects_missing_content(monkeypatch):
    """Empty content or a ``None`` id must return ``False`` and queue nothing.

    ``_enqueue_embedding`` is replaced with a stub that records every call
    and then raises ``AssertionError``.  The recorded calls are also checked
    explicitly at the end, so the test still fails if the code under test
    were to catch the stub's exception and return ``False`` anyway.
    """
    import services.openclaw_learning_service as learning

    unexpected_calls = []

    def forbidden_enqueue(*args, **kwargs):
        # Record first: the raise alone is not a reliable signal if the
        # caller wraps the enqueue in a broad ``except``.
        unexpected_calls.append((args, kwargs))
        raise AssertionError("should not enqueue")

    monkeypatch.setattr(learning, "_enqueue_embedding", forbidden_enqueue)

    assert learning.enqueue_insight_embedding(42, "agent_action", "") is False
    assert learning.enqueue_insight_embedding(None, "agent_action", "hello") is False
    assert unexpected_calls == []
def test_enqueue_missing_insight_embeddings_queues_rows(monkeypatch):
    """Check that rows returned by the session are passed on for enqueueing.

    ``get_session`` is replaced with a fake whose ``execute(...).fetchall()``
    yields two rows, and ``enqueue_insight_embedding`` is replaced with a
    recorder.  The test asserts the returned summary dict and the arguments
    of the first recorded call.
    """
    from types import SimpleNamespace

    import services.openclaw_learning_service as learning

    pending_rows = [
        SimpleNamespace(
            id=1, insight_type="mcp_cache", period=None, content="市場資料"
        ),
        SimpleNamespace(
            id=2,
            insight_type="human_review",
            period="2026-04-29",
            content="人工審核",
        ),
    ]

    class FakeResult:
        """Query result stub exposing only ``fetchall``."""

        def fetchall(self):
            return pending_rows

    class FakeSession:
        """Session stub: any ``execute`` call returns the canned result."""

        def execute(self, *args, **kwargs):
            return FakeResult()

        def close(self):
            pass

    recorded = []

    def open_fake_session():
        return FakeSession()

    def fake_enqueue(insight_id, insight_type, content, period=None):
        # Record the arguments and always report a successful enqueue.
        recorded.append((insight_id, insight_type, content, period))
        return True

    monkeypatch.setattr(learning, "get_session", open_fake_session)
    monkeypatch.setattr(learning, "enqueue_insight_embedding", fake_enqueue)

    summary = learning.enqueue_missing_insight_embeddings(limit=10)

    assert summary == {"scanned": 2, "enqueued": 2, "status": "ok"}
    assert recorded[0] == (1, "mcp_cache", "市場資料", None)