117 lines
3.5 KiB
Python
117 lines
3.5 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncGenerator
|
|
from contextlib import asynccontextmanager
|
|
|
|
import pytest
|
|
|
|
from src.services import stats_service as stats_service_module
|
|
from src.services.stats_service import StatsService
|
|
|
|
|
|
class _NoopRedis:
    """Redis stand-in that never holds any data.

    Every ``get`` is a miss and every ``set`` is silently discarded.
    """

    async def get(self, key: str) -> None:
        """Report a miss for *key*: the result is always ``None``."""

    async def set(self, key: str, value: str, ex: int) -> None:
        """Accept a write and drop it; nothing is stored and ``None`` is returned."""
|
|
|
|
|
|
class _EmptyResult:
    """Query-result stand-in that never contains any rows."""

    def all(self) -> list:
        """Return a brand-new empty list on every call."""
        return list()
|
|
|
|
|
|
class _RowsResult:
    """Query-result stand-in that hands back a fixed list of rows."""

    def __init__(self, rows: list[tuple]) -> None:
        """Keep a reference to *rows*; no copy is made."""
        self._rows = rows

    def all(self) -> list[tuple]:
        """Return the very list object that was given to the constructor."""
        stored_rows = self._rows
        return stored_rows
|
|
|
|
|
|
class _FakeDb:
    """Database-session stand-in whose queries never yield rows."""

    async def execute(self, statement) -> _EmptyResult:
        """Ignore *statement* and return a fresh ``_EmptyResult``."""
        no_rows = _EmptyResult()
        return no_rows
|
|
|
|
|
|
class _QueuedDb:
    """Database-session stand-in that replays pre-scripted results in order.

    Each ``execute`` call returns the next scripted result, first in, first out.

    Args:
        results: Results to hand out, one per ``execute`` call.
    """

    def __init__(self, results: list[_RowsResult]) -> None:
        # Snapshot the script so that replaying it never mutates the caller's list.
        self._results = list(results)
        # Index of the next result to hand out.
        self._cursor = 0

    async def execute(self, statement) -> _RowsResult:
        """Return the next scripted result; *statement* is not inspected.

        Raises:
            IndexError: If more queries are executed than results were scripted.
        """
        if self._cursor >= len(self._results):
            # Same exception type as popping from an empty list, but the message
            # says which query overran the script.
            raise IndexError(
                f"_QueuedDb received query #{self._cursor + 1} but only "
                f"{len(self._results)} result(s) were queued"
            )
        next_result = self._results[self._cursor]
        self._cursor += 1
        return next_result
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_stats_service_uses_default_project_for_db_context(monkeypatch) -> None:
    """Resolution stats requested without a project open the DB context for "awoooi"."""
    requested_projects: list[str | None] = []

    @asynccontextmanager
    async def recording_db_context(
        project_id: str | None = None,
    ) -> AsyncGenerator[_FakeDb, None]:
        # Note which project the service asked for, then serve a row-less database.
        requested_projects.append(project_id)
        yield _FakeDb()

    def make_redis() -> _NoopRedis:
        return _NoopRedis()

    # Swap the service module's cache and DB factories for the in-memory fakes.
    monkeypatch.setattr(stats_service_module, "get_redis", make_redis)
    monkeypatch.setattr(stats_service_module, "get_db_context", recording_db_context)

    service = StatsService()
    stats = await service.get_resolution_stats(days=7)

    assert stats["sample_size"] == 0
    assert requested_projects == ["awoooi"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_stats_service_allows_explicit_project_for_db_context(monkeypatch) -> None:
    """An explicit ``project_id`` reaches both the DB context and the cache key."""
    requested_projects: list[str | None] = []
    cache_lookups: list[str] = []

    class RecordingRedis(_NoopRedis):
        """No-op cache that remembers every key it is asked to read."""

        async def get(self, key: str) -> None:
            cache_lookups.append(key)
            return None

    @asynccontextmanager
    async def recording_db_context(
        project_id: str | None = None,
    ) -> AsyncGenerator[_FakeDb, None]:
        # Note which project the service asked for, then serve a row-less database.
        requested_projects.append(project_id)
        yield _FakeDb()

    def make_redis() -> RecordingRedis:
        return RecordingRedis()

    # Swap the service module's cache and DB factories for the in-memory fakes.
    monkeypatch.setattr(stats_service_module, "get_redis", make_redis)
    monkeypatch.setattr(stats_service_module, "get_db_context", recording_db_context)

    service = StatsService()
    performance = await service.get_ai_performance(days=7, project_id="vibework")

    assert performance["total_proposals"] == 0
    assert requested_projects == ["vibework"]
    assert cache_lookups == ["stats:vibework:ai_performance:7"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ai_performance_uses_auto_repair_execution_truth(monkeypatch) -> None:
    """AI-performance figures are derived from the scripted execution rows."""
    requested_projects: list[str | None] = []

    # Scripted query results, served in order: first an empty result, then three
    # two-element rows.
    # NOTE(review): rows look like (succeeded, duration) pairs — confirm against
    # the query issued by the service.
    first_result = _RowsResult([])
    second_result = _RowsResult([(True, 1200), (False, 800), (True, 500)])
    scripted_db = _QueuedDb([first_result, second_result])

    @asynccontextmanager
    async def recording_db_context(
        project_id: str | None = None,
    ) -> AsyncGenerator[_QueuedDb, None]:
        # Every context entry shares the same scripted database instance.
        requested_projects.append(project_id)
        yield scripted_db

    def make_redis() -> _NoopRedis:
        return _NoopRedis()

    # Swap the service module's cache and DB factories for the in-memory fakes.
    monkeypatch.setattr(stats_service_module, "get_redis", make_redis)
    monkeypatch.setattr(stats_service_module, "get_db_context", recording_db_context)

    performance = await StatsService().get_ai_performance(days=7)

    assert requested_projects == ["awoooi"]

    expected = {
        "total_proposals": 3,
        "executed_count": 3,
        "success_count": 2,
        "success_rate": 66.67,
        "auto_repair_total": 3,
        "auto_repair_success": 2,
        "source": "auto_repair_executions+incident_outcome",
    }
    # Compare only the fields under test; a missing key still raises KeyError.
    observed = {field: performance[field] for field in expected}
    assert observed == expected
|