fix(api): scope AI SLO reads by project
All checks were successful
Code Review / ai-code-review (push) Successful in 18s
CD Pipeline / tests (push) Successful in 1m41s
CD Pipeline / build-and-deploy (push) Successful in 4m46s
CD Pipeline / post-deploy-checks (push) Successful in 1m36s

This commit is contained in:
Your Name
2026-06-27 15:17:02 +08:00
parent 12c8df05d2
commit 9afc89a461
5 changed files with 78 additions and 16 deletions

View File

@@ -58,6 +58,12 @@ class RemediationApprovalRequest(BaseModel):
@router.get("/ai/slo")
async def get_ai_slo(
force_refresh: bool = Query(False, description="忽略快取,強制重算"),
project_id: str = Query(
"awoooi",
min_length=1,
max_length=64,
description="租戶 / 專案 ID預設 AWOOOI 產品線",
),
) -> dict:
"""
取得 AI 決策品質 SLO 最新結果。
@@ -71,20 +77,24 @@ async def get_ai_slo(
cache_hit 是否命中快取
metrics[] 三大 SLO 指標明細
"""
calc = AiSloCalculator()
normalized_project_id = project_id.strip() or "awoooi"
calc = AiSloCalculator(project_id=normalized_project_id)
adr100_service = get_adr100_slo_status_service(normalized_project_id)
if not force_refresh:
cached = await calc.get_cached_report()
if cached:
data = cached.to_dict()
data["cache_hit"] = True
data["adr100"] = await get_adr100_slo_status_service().fetch_report()
data["project_id"] = normalized_project_id
data["adr100"] = await adr100_service.fetch_report()
return data
report = await calc.run()
data = report.to_dict()
data["cache_hit"] = False
data["adr100"] = await get_adr100_slo_status_service().fetch_report()
data["project_id"] = normalized_project_id
data["adr100"] = await adr100_service.fetch_report()
return data

View File

@@ -96,6 +96,10 @@ ADR100_SLO_DEFINITIONS: tuple[Adr100SloDefinition, ...] = (
class Adr100SloStatusService:
"""Fetch ADR-100 SLO status from Prometheus without writing governance events."""
def __init__(self, project_id: str = "awoooi") -> None:
normalized = str(project_id or "awoooi").strip()
self.project_id = normalized or "awoooi"
async def fetch_report(self) -> dict[str, Any]:
prom_url = getattr(
settings,
@@ -117,6 +121,7 @@ class Adr100SloStatusService:
return {
"schema_version": "adr100_slo_status_v1",
"source": "prometheus+postgresql",
"project_id": self.project_id,
"evaluated_at": now_taipei_iso(),
"overall_status": overall_status,
"overall_compliance": overall_compliance,
@@ -193,7 +198,7 @@ class Adr100SloStatusService:
async def _fetch_verification_coverage(self) -> dict[str, Any]:
"""Summarize whether recent auto-repair executions have verifier evidence."""
try:
async with get_db_context() as db:
async with get_db_context(self.project_id) as db:
summary_row = (
await db.execute(text(_VERIFICATION_COVERAGE_SQL))
).mappings().one()
@@ -754,11 +759,11 @@ def _overall_status(
return "skipped_low_volume"
_adr100_slo_status_service: Adr100SloStatusService | None = None
_adr100_slo_status_services: dict[str, Adr100SloStatusService] = {}
def get_adr100_slo_status_service() -> Adr100SloStatusService:
global _adr100_slo_status_service
if _adr100_slo_status_service is None:
_adr100_slo_status_service = Adr100SloStatusService()
return _adr100_slo_status_service
# Upper bound on cached per-project services. ``project_id`` can originate
# from a request query parameter, so the cache must not grow without limit.
_ADR100_SLO_STATUS_SERVICE_CACHE_MAX = 256


def get_adr100_slo_status_service(project_id: str = "awoooi") -> Adr100SloStatusService:
    """Return the cached ADR-100 SLO status service for ``project_id``.

    Args:
        project_id: Project identifier. Surrounding whitespace is stripped;
            a falsy or whitespace-only value falls back to ``"awoooi"``.

    Returns:
        The service instance bound to the normalized project id. Instances
        are reused across calls while they remain in the cache; once the
        cache holds ``_ADR100_SLO_STATUS_SERVICE_CACHE_MAX`` entries, the
        oldest entry is evicted to make room for a new project.
    """
    normalized = str(project_id or "awoooi").strip() or "awoooi"
    service = _adr100_slo_status_services.get(normalized)
    if service is None:
        # Dicts preserve insertion order, so the first key is the oldest.
        # NOTE(review): eviction assumes the service carries no state beyond
        # its project id (all that the visible __init__ sets) — confirm.
        while len(_adr100_slo_status_services) >= _ADR100_SLO_STATUS_SERVICE_CACHE_MAX:
            oldest = next(iter(_adr100_slo_status_services))
            _adr100_slo_status_services.pop(oldest, None)
        service = Adr100SloStatusService(normalized)
        _adr100_slo_status_services[normalized] = service
    return service

View File

@@ -47,7 +47,8 @@ SLO_FALSE_NEG_MAX: float = 0.05 # verifier false negative 上限
SLO_WINDOW_DAYS: int = 7 # 滾動視窗(天)
SLO_MIN_SAMPLES: int = 5 # 最少樣本數,低於此不計算(資料不足)
REDIS_KEY = "ai:slo:latest"
DEFAULT_AI_SLO_PROJECT_ID = "awoooi"
REDIS_KEY_PREFIX = "ai:slo:latest"
REDIS_TTL_SEC = 300 # 5 分鐘快取
@@ -121,6 +122,14 @@ class AiSloCalculator:
await calc.save_violation_event(report)
"""
def __init__(self, project_id: str = DEFAULT_AI_SLO_PROJECT_ID) -> None:
    """Bind the calculator to a single project.

    Args:
        project_id: Project identifier. Surrounding whitespace is
            stripped; a falsy or whitespace-only value falls back to
            ``DEFAULT_AI_SLO_PROJECT_ID``.
    """
    raw = project_id if project_id else DEFAULT_AI_SLO_PROJECT_ID
    stripped = str(raw).strip()
    self.project_id = stripped if stripped else DEFAULT_AI_SLO_PROJECT_ID
@property
def redis_key(self) -> str:
    """Redis key under which this project's latest SLO report is cached.

    Built as ``<REDIS_KEY_PREFIX>:<project_id>`` so each project has its
    own cache entry.
    """
    prefix = REDIS_KEY_PREFIX
    project = self.project_id
    return "{}:{}".format(prefix, project)
async def calculate(self) -> SloReport:
"""
計算三大 SLO 指標(7d 滾動視窗)。
@@ -131,7 +140,7 @@ class AiSloCalculator:
try:
since = now_taipei() - timedelta(days=SLO_WINDOW_DAYS)
async with get_db_context() as session:
async with get_db_context(self.project_id) as session:
slo1 = await self._calc_auto_success_rate(session, since)
slo2 = await self._calc_human_override_rate(session, since)
slo3 = await self._calc_false_neg_rate(session, since)
@@ -152,6 +161,7 @@ class AiSloCalculator:
logger.info(
"slo_calculated",
project_id=self.project_id,
any_violated=any_violated,
slo1=slo1.value,
slo2=slo2.value,
@@ -160,7 +170,7 @@ class AiSloCalculator:
return report
except Exception as e:
logger.error("slo_calculation_error", error=str(e))
logger.error("slo_calculation_error", project_id=self.project_id, error=str(e))
# 保守:計算失敗 → 假設違反
violated_metric = SloMetric(
name="calculation_error",
@@ -180,7 +190,7 @@ class AiSloCalculator:
try:
from src.core.redis_client import get_redis
redis = get_redis()
raw = await redis.get(REDIS_KEY)
raw = await redis.get(self.redis_key)
if raw:
data = json.loads(raw)
metrics = [
@@ -210,7 +220,7 @@ class AiSloCalculator:
try:
from src.core.redis_client import get_redis
redis = get_redis()
await redis.set(REDIS_KEY, json.dumps(report.to_dict()), ex=REDIS_TTL_SEC)
await redis.set(self.redis_key, json.dumps(report.to_dict()), ex=REDIS_TTL_SEC)
except Exception as e:
logger.warning("slo_cache_write_error", error=str(e))
@@ -221,7 +231,7 @@ class AiSloCalculator:
只在 any_violated=True 時呼叫。不管舊違反是否解決。
"""
try:
async with get_db_context() as session:
async with get_db_context(self.project_id) as session:
event = AiGovernanceEvent(
event_type="slo_violation",
details=report.to_dict(),
@@ -231,6 +241,7 @@ class AiSloCalculator:
await session.commit()
logger.warning(
"slo_violation_recorded",
project_id=self.project_id,
violated_metrics=[m.name for m in report.metrics if m.violated],
)
except Exception as e:

View File

@@ -88,6 +88,35 @@ async def _low_volume_coverage(self): # noqa: ANN001
}
@pytest.mark.asyncio
async def test_verification_coverage_uses_project_scoped_db(monkeypatch):
    """The coverage query must open the DB context with the service's project id.

    The fake context records the ``project_id`` it was opened with and then
    fails on ``execute``, so the test also checks that the service reports
    the failure as an ``"error"`` status.
    """
    captured: dict[str, str | None] = {}

    class _ExplodingSession:
        """Async context manager whose ``execute`` always raises."""

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            # Returning False lets the exception propagate out of the
            # ``async with`` block.
            return False

        async def execute(self, *_args, **_kwargs):
            raise RuntimeError("stop_after_project_id_capture")

    def _recording_db_context(project_id=None):  # noqa: ANN001
        captured["project_id"] = project_id
        return _ExplodingSession()

    monkeypatch.setattr(
        "src.services.adr100_slo_status_service.get_db_context",
        _recording_db_context,
    )

    service = Adr100SloStatusService(project_id="demo")
    payload = await service._fetch_verification_coverage()

    assert captured["project_id"] == "demo"
    assert payload["status"] == "error"
@pytest.mark.asyncio
async def test_fetch_report_marks_ratio_slos_low_volume(monkeypatch):
values = {

View File

@@ -8,6 +8,7 @@ from src.jobs.ai_slo_watchdog_job import (
_is_observation_only_slo_violation,
)
from src.services.ai_slo_calculator import (
AiSloCalculator,
SLO_AUTO_SUCCESS_MIN,
SloMetric,
SloReport,
@@ -18,6 +19,12 @@ from src.services.ai_slo_calculator import (
TZ = ZoneInfo("Asia/Taipei")
def test_ai_slo_calculator_uses_project_scoped_cache_key():
    """Cache keys are namespaced per project; the default project is ``awoooi``."""
    default_calc = AiSloCalculator()
    demo_calc = AiSloCalculator(project_id="demo")

    assert default_calc.project_id == "awoooi"
    assert default_calc.redis_key == "ai:slo:latest:awoooi"
    assert demo_calc.redis_key == "ai:slo:latest:demo"
def _row(
*,
created_at: datetime,