diff --git a/apps/api/src/services/governance_query_service.py b/apps/api/src/services/governance_query_service.py index 320081fc..1d4a89e6 100644 --- a/apps/api/src/services/governance_query_service.py +++ b/apps/api/src/services/governance_query_service.py @@ -116,7 +116,11 @@ def _extract_remediation(details: dict) -> str | None: return str(remediation)[:160] -def _to_governance_event(row: AiGovernanceEvent) -> GovernanceEvent: +def _to_governance_event( + row: AiGovernanceEvent, + *, + dispatch_ids: list[str] | None = None, +) -> GovernanceEvent: details = row.details if isinstance(row.details, dict) else {} return GovernanceEvent( id=row.id, @@ -128,10 +132,60 @@ def _to_governance_event(row: AiGovernanceEvent) -> GovernanceEvent: impact=_extract_impact(details), details=details, remediation=_extract_remediation(details), - dispatch_ids=details.get("dispatch_ids", []), + dispatch_ids=_merge_dispatch_ids(dispatch_ids or [], details.get("dispatch_ids")), ) +def _merge_dispatch_ids( + db_dispatch_ids: list[str], + legacy_dispatch_ids: Any, +) -> list[str]: + """合併 DB dispatch trail 與 legacy payload ids,DB truth-first。""" + merged: list[str] = [] + for raw in [*db_dispatch_ids, *(legacy_dispatch_ids if isinstance(legacy_dispatch_ids, list) else [])]: + if raw is None: + continue + value = str(raw) + if value and value not in merged: + merged.append(value) + return merged + + +async def _load_dispatch_ids_for_events(event_ids: list[str]) -> dict[str, list[str]]: + """從 governance_remediation_dispatch 讀取事件對應 dispatch ids。 + + events endpoint 必須能在 dispatch 表尚未建立的環境 graceful fallback, + 因此這裡捕捉 ProgrammingError 並回空 dict。 + """ + if not event_ids: + return {} + + sql = text(""" + SELECT + d.governance_event_id, + d.id + FROM governance_remediation_dispatch d + WHERE d.governance_event_id IN :event_ids + ORDER BY d.dispatched_at DESC + """).bindparams(bindparam("event_ids", expanding=True)) + + try: + async with get_db_context() as db: + result = await db.execute(sql, {"event_ids": event_ids}) + rows = result.fetchall() + except ProgrammingError as exc: + logger.warning( + "governance_dispatch_ids_table_not_ready", + error=str(exc), + ) + return {} + + dispatch_ids_by_event: dict[str, list[str]] = {} + for row in rows: + dispatch_ids_by_event.setdefault(str(row.governance_event_id), []).append(str(row.id)) + return dispatch_ids_by_event + + # ============================================================================= # Endpoint 1: events # ============================================================================= @@ -175,6 +229,7 @@ async def query_governance_events( result = await db.execute(stmt) all_rows = result.scalars().all() + event_rows_by_id = {str(r.id): r for r in all_rows} events = [_to_governance_event(r) for r in all_rows] # severity 過濾(Python 層) @@ -194,6 +249,15 @@ async def query_governance_events( total = len(events) offset = (page - 1) * size page_items = events[offset: offset + size] + dispatch_ids_by_event = await _load_dispatch_ids_for_events([e.id for e in page_items]) + if dispatch_ids_by_event: + page_items = [ + _to_governance_event( + event_rows_by_id[item.id], + dispatch_ids=dispatch_ids_by_event.get(item.id, []), + ) + for item in page_items + ] return GovernanceEventsResponse( items=page_items, diff --git a/apps/api/tests/test_ai_governance_endpoints.py b/apps/api/tests/test_ai_governance_endpoints.py index 33a8e7c9..bb4afff1 100644 --- a/apps/api/tests/test_ai_governance_endpoints.py +++ b/apps/api/tests/test_ai_governance_endpoints.py @@ -33,6 +33,7 @@ from src.models.governance import ( ) from src.services.governance_query_service import ( _extract_remediation, + _merge_dispatch_ids, _query_dispatch_table, _to_governance_event, ) @@ -221,6 +222,42 @@ class TestEventsReadSideNormalization: assert event.remediation == "補齊 SLO emitter" assert event.impact == "SLO metrics missing" + def test_governance_event_uses_db_dispatch_ids_first(self): + """events read model 應以 dispatch table ids 補齊 detail/history 鏈路。""" + row = type("Row", (), { + "id": "evt-001", + "event_type": "knowledge_degradation", + "triggered_at": NOW, + "resolved": False, + "resolved_at": None, + "details": { + "message": "KM stale", + "dispatch_ids": ["legacy-dispatch"], + }, + })() + + event = _to_governance_event(row, dispatch_ids=["db-dispatch", "legacy-dispatch"]) + + assert event.dispatch_ids == ["db-dispatch", "legacy-dispatch"] + + def test_merge_dispatch_ids_deduplicates_and_preserves_db_priority(self): + """DB truth-first,legacy payload 只作 fallback。""" + assert _merge_dispatch_ids( + ["db-2", "db-1"], + ["db-1", "legacy-1", None], + ) == ["db-2", "db-1", "legacy-1"] + + def test_events_query_joins_dispatch_table_for_history_buttons(self): + """events endpoint 不可只讀 details.dispatch_ids,必須查 dispatch table。""" + import inspect + from src.services import governance_query_service + + source = inspect.getsource(governance_query_service._load_dispatch_ids_for_events) + + assert "FROM governance_remediation_dispatch d" in source + assert "d.governance_event_id IN :event_ids" in source + assert "ORDER BY d.dispatched_at DESC" in source + # ============================================================================= # 3. queue endpoint graceful fallback