diff --git a/apps/api/src/services/governance_dispatcher.py b/apps/api/src/services/governance_dispatcher.py index acca3d8c..9edbb108 100644 --- a/apps/api/src/services/governance_dispatcher.py +++ b/apps/api/src/services/governance_dispatcher.py @@ -28,11 +28,12 @@ from datetime import datetime, timezone from typing import Any import structlog -from sqlalchemy import select, update +from sqlalchemy import exists, select, update from src.db.base import get_db_context -from src.db.models import AiGovernanceEvent +from src.db.models import AiGovernanceEvent, GovernanceRemediationDispatch from src.repositories.governance_remediation_dispatch_repo import ( + ACTIVE_STATUSES, DispatchAlreadyActive, create_dispatch, get_active_for_event, @@ -136,16 +137,9 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None: event_id = event.id event_type = event.event_type - # Step 0: Redis skip 冷卻檢查(防止 skip 事件每 30s 重新做 LLM 呼叫) - if await _is_skip_cooldown(event_id): - logger.debug( - "governance_dispatch_skip_cooldown", - event_id=event_id, - event_type=event_type, - ) - return None - - # Step 1: 檢查是否已有活躍 dispatch(冪等保護) + # Step 0: 檢查是否已有活躍 dispatch(冪等保護)。 + # 先查 DB 再看 skip cooldown,避免曾經被 skip 的 knowledge_degradation + # 無法補建 Hermes KB healthcheck intake work item。 existing = await get_active_for_event(event_id) if existing is not None: logger.debug( @@ -162,6 +156,15 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None: if _is_kb_growth_healthcheck_event(event): return await _record_kb_growth_healthcheck_dispatch(event) + # Step 1: Redis skip 冷卻檢查(防止一般 skip 事件每 30s 重新做 LLM 呼叫)。 + if await _is_skip_cooldown(event_id): + logger.debug( + "governance_dispatch_skip_cooldown", + event_id=event_id, + event_type=event_type, + ) + return None + # Step 2: 決策融合(三維:LLM × Playbook × MCP) adapter = get_decision_fusion_adapter() try: @@ -346,10 +349,15 @@ async def _poll_unresolved_events() -> list[AiGovernanceEvent]: 最多 _MAX_EVENTS_PER_CYCLE 筆 AiGovernanceEvent ORM 物件列表 """ async with get_db_context() as db: + active_dispatch_exists = exists().where( + GovernanceRemediationDispatch.governance_event_id == AiGovernanceEvent.id, + GovernanceRemediationDispatch.dispatch_status.in_(list(ACTIVE_STATUSES)), + ) result = await db.execute( select(AiGovernanceEvent) .where(AiGovernanceEvent.resolved.is_(False)) .where(AiGovernanceEvent.event_type.in_(list(_DISPATCHABLE_EVENT_TYPES))) + .where(~active_dispatch_exists) .order_by(AiGovernanceEvent.triggered_at.asc()) .limit(_MAX_EVENTS_PER_CYCLE) ) diff --git a/apps/api/tests/test_governance_dispatcher.py b/apps/api/tests/test_governance_dispatcher.py index 3964793d..b992f0d2 100644 --- a/apps/api/tests/test_governance_dispatcher.py +++ b/apps/api/tests/test_governance_dispatcher.py @@ -253,6 +253,38 @@ class TestDispatchGovernanceEvent: assert dispatch_kwargs["decision_context"]["next_action"] == "run_kb_growth_healthcheck" assert dispatch_kwargs["decision_context"]["workflow"]["current_stage"] == "queued_kb_healthcheck" + @pytest.mark.asyncio + async def test_knowledge_degradation_intake_ignores_legacy_skip_cooldown(self): + """舊 skip cooldown 不可阻止 KM healthcheck 補建 work item。""" + event = _make_governance_event(event_type="knowledge_degradation") + event.details = { + "remediation": {"next_action": "run_kb_growth_healthcheck"}, + "ownership": {"lead_agent": "Hermes"}, + } + mock_dispatch_row = MagicMock() + mock_dispatch_row.id = "dispatch-kb-cooldown" + + with ( + patch( + "src.services.governance_dispatcher.get_active_for_event", + new=AsyncMock(return_value=None), + ), + patch( + "src.services.governance_dispatcher._is_skip_cooldown", + new=AsyncMock(return_value=True), + ) as mock_skip_cooldown, + patch( + "src.services.governance_dispatcher.create_dispatch", + new=AsyncMock(return_value=mock_dispatch_row), + ) as mock_create, + ): + from src.services.governance_dispatcher import dispatch_governance_event + result = await dispatch_governance_event(event) + + assert result == "dispatch-kb-cooldown" + mock_skip_cooldown.assert_not_awaited() + mock_create.assert_awaited_once() + @pytest.mark.asyncio async def test_llm_failure_fallback_to_skip(self): """fusion adapter 拋 Exception → fallback skip,不寫 dispatch,返回 None。""" @@ -467,6 +499,17 @@ class TestDecisionFusionAdapterHelpers: class TestRunGovernanceDispatcherLoop: """run_governance_dispatcher_loop 排程迴圈行為測試。""" + def test_poll_unresolved_events_excludes_active_dispatch_rows(self): + """poll SQL 必須跳過已有 active dispatch 的事件,避免 backlog 餓死。""" + import inspect + from src.services import governance_dispatcher + + source = inspect.getsource(governance_dispatcher._poll_unresolved_events) + + assert "GovernanceRemediationDispatch" in source + assert "ACTIVE_STATUSES" in source + assert ".where(~active_dispatch_exists)" in source + @pytest.mark.asyncio async def test_loop_processes_events_and_sleeps(self): """loop 一次 cycle 應處理 events 並 sleep。"""