fix(governance): link events to dispatch history
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 5m55s
CD Pipeline / build-and-deploy (push) Successful in 3m38s
CD Pipeline / post-deploy-checks (push) Successful in 1m46s

This commit is contained in:
Your Name
2026-05-19 22:04:31 +08:00
parent 955dbce670
commit e2a2e03c79
2 changed files with 103 additions and 2 deletions

View File

@@ -116,7 +116,11 @@ def _extract_remediation(details: dict) -> str | None:
return str(remediation)[:160]
def _to_governance_event(row: AiGovernanceEvent) -> GovernanceEvent:
def _to_governance_event(
row: AiGovernanceEvent,
*,
dispatch_ids: list[str] | None = None,
) -> GovernanceEvent:
details = row.details if isinstance(row.details, dict) else {}
return GovernanceEvent(
id=row.id,
@@ -128,10 +132,60 @@ def _to_governance_event(row: AiGovernanceEvent) -> GovernanceEvent:
impact=_extract_impact(details),
details=details,
remediation=_extract_remediation(details),
dispatch_ids=details.get("dispatch_ids", []),
dispatch_ids=_merge_dispatch_ids(dispatch_ids or [], details.get("dispatch_ids")),
)
def _merge_dispatch_ids(
db_dispatch_ids: list[str],
legacy_dispatch_ids: Any,
) -> list[str]:
"""合併 DB dispatch trail 與 legacy payload idsDB truth-first。"""
merged: list[str] = []
for raw in [*db_dispatch_ids, *(legacy_dispatch_ids if isinstance(legacy_dispatch_ids, list) else [])]:
if raw is None:
continue
value = str(raw)
if value and value not in merged:
merged.append(value)
return merged
async def _load_dispatch_ids_for_events(event_ids: list[str]) -> dict[str, list[str]]:
"""從 governance_remediation_dispatch 讀取事件對應 dispatch ids。
events endpoint 必須能在 dispatch 表尚未建立的環境 graceful fallback
因此這裡捕捉 ProgrammingError 並回空 dict。
"""
if not event_ids:
return {}
sql = text("""
SELECT
d.governance_event_id,
d.id
FROM governance_remediation_dispatch d
WHERE d.governance_event_id IN :event_ids
ORDER BY d.dispatched_at DESC
""").bindparams(bindparam("event_ids", expanding=True))
try:
async with get_db_context() as db:
result = await db.execute(sql, {"event_ids": event_ids})
rows = result.fetchall()
except ProgrammingError as exc:
logger.warning(
"governance_dispatch_ids_table_not_ready",
error=str(exc),
)
return {}
dispatch_ids_by_event: dict[str, list[str]] = {}
for row in rows:
dispatch_ids_by_event.setdefault(str(row.governance_event_id), []).append(str(row.id))
return dispatch_ids_by_event
# =============================================================================
# Endpoint 1: events
# =============================================================================
@@ -175,6 +229,7 @@ async def query_governance_events(
result = await db.execute(stmt)
all_rows = result.scalars().all()
event_rows_by_id = {str(r.id): r for r in all_rows}
events = [_to_governance_event(r) for r in all_rows]
# severity 過濾Python 層)
@@ -194,6 +249,15 @@ async def query_governance_events(
total = len(events)
offset = (page - 1) * size
page_items = events[offset: offset + size]
dispatch_ids_by_event = await _load_dispatch_ids_for_events([e.id for e in page_items])
if dispatch_ids_by_event:
page_items = [
_to_governance_event(
event_rows_by_id[item.id],
dispatch_ids=dispatch_ids_by_event.get(item.id, []),
)
for item in page_items
]
return GovernanceEventsResponse(
items=page_items,

View File

@@ -33,6 +33,7 @@ from src.models.governance import (
)
from src.services.governance_query_service import (
_extract_remediation,
_merge_dispatch_ids,
_query_dispatch_table,
_to_governance_event,
)
@@ -221,6 +222,42 @@ class TestEventsReadSideNormalization:
assert event.remediation == "補齊 SLO emitter"
assert event.impact == "SLO metrics missing"
def test_governance_event_uses_db_dispatch_ids_first(self):
"""events read model 應以 dispatch table ids 補齊 detail/history 鏈路。"""
row = type("Row", (), {
"id": "evt-001",
"event_type": "knowledge_degradation",
"triggered_at": NOW,
"resolved": False,
"resolved_at": None,
"details": {
"message": "KM stale",
"dispatch_ids": ["legacy-dispatch"],
},
})()
event = _to_governance_event(row, dispatch_ids=["db-dispatch", "legacy-dispatch"])
assert event.dispatch_ids == ["db-dispatch", "legacy-dispatch"]
def test_merge_dispatch_ids_deduplicates_and_preserves_db_priority(self):
"""DB truth-firstlegacy payload 只作 fallback。"""
assert _merge_dispatch_ids(
["db-2", "db-1"],
["db-1", "legacy-1", None],
) == ["db-2", "db-1", "legacy-1"]
def test_events_query_joins_dispatch_table_for_history_buttons(self):
"""events endpoint 不可只讀 details.dispatch_ids必須查 dispatch table。"""
import inspect
from src.services import governance_query_service
source = inspect.getsource(governance_query_service._load_dispatch_ids_for_events)
assert "FROM governance_remediation_dispatch d" in source
assert "d.governance_event_id IN :event_ids" in source
assert "ORDER BY d.dispatched_at DESC" in source
# =============================================================================
# 3. queue endpoint graceful fallback