fix(governance): drain km healthcheck backlog
This commit is contained in:
@@ -28,11 +28,12 @@ from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy import exists, select, update
|
||||
|
||||
from src.db.base import get_db_context
|
||||
from src.db.models import AiGovernanceEvent
|
||||
from src.db.models import AiGovernanceEvent, GovernanceRemediationDispatch
|
||||
from src.repositories.governance_remediation_dispatch_repo import (
|
||||
ACTIVE_STATUSES,
|
||||
DispatchAlreadyActive,
|
||||
create_dispatch,
|
||||
get_active_for_event,
|
||||
@@ -136,16 +137,9 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None:
|
||||
event_id = event.id
|
||||
event_type = event.event_type
|
||||
|
||||
# Step 0: Redis skip 冷卻檢查(防止 skip 事件每 30s 重新做 LLM 呼叫)
|
||||
if await _is_skip_cooldown(event_id):
|
||||
logger.debug(
|
||||
"governance_dispatch_skip_cooldown",
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
)
|
||||
return None
|
||||
|
||||
# Step 1: 檢查是否已有活躍 dispatch(冪等保護)
|
||||
# Step 0: 檢查是否已有活躍 dispatch(冪等保護)。
|
||||
# 先查 DB 再看 skip cooldown,避免曾經被 skip 的 knowledge_degradation
|
||||
# 無法補建 Hermes KB healthcheck intake work item。
|
||||
existing = await get_active_for_event(event_id)
|
||||
if existing is not None:
|
||||
logger.debug(
|
||||
@@ -162,6 +156,15 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None:
|
||||
if _is_kb_growth_healthcheck_event(event):
|
||||
return await _record_kb_growth_healthcheck_dispatch(event)
|
||||
|
||||
# Step 1: Redis skip 冷卻檢查(防止一般 skip 事件每 30s 重新做 LLM 呼叫)。
|
||||
if await _is_skip_cooldown(event_id):
|
||||
logger.debug(
|
||||
"governance_dispatch_skip_cooldown",
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
)
|
||||
return None
|
||||
|
||||
# Step 2: 決策融合(三維:LLM × Playbook × MCP)
|
||||
adapter = get_decision_fusion_adapter()
|
||||
try:
|
||||
@@ -346,10 +349,15 @@ async def _poll_unresolved_events() -> list[AiGovernanceEvent]:
|
||||
最多 _MAX_EVENTS_PER_CYCLE 筆 AiGovernanceEvent ORM 物件列表
|
||||
"""
|
||||
async with get_db_context() as db:
|
||||
active_dispatch_exists = exists().where(
|
||||
GovernanceRemediationDispatch.governance_event_id == AiGovernanceEvent.id,
|
||||
GovernanceRemediationDispatch.dispatch_status.in_(list(ACTIVE_STATUSES)),
|
||||
)
|
||||
result = await db.execute(
|
||||
select(AiGovernanceEvent)
|
||||
.where(AiGovernanceEvent.resolved.is_(False))
|
||||
.where(AiGovernanceEvent.event_type.in_(list(_DISPATCHABLE_EVENT_TYPES)))
|
||||
.where(~active_dispatch_exists)
|
||||
.order_by(AiGovernanceEvent.triggered_at.asc())
|
||||
.limit(_MAX_EVENTS_PER_CYCLE)
|
||||
)
|
||||
|
||||
@@ -253,6 +253,38 @@ class TestDispatchGovernanceEvent:
|
||||
assert dispatch_kwargs["decision_context"]["next_action"] == "run_kb_growth_healthcheck"
|
||||
assert dispatch_kwargs["decision_context"]["workflow"]["current_stage"] == "queued_kb_healthcheck"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_knowledge_degradation_intake_ignores_legacy_skip_cooldown(self):
|
||||
"""舊 skip cooldown 不可阻止 KM healthcheck 補建 work item。"""
|
||||
event = _make_governance_event(event_type="knowledge_degradation")
|
||||
event.details = {
|
||||
"remediation": {"next_action": "run_kb_growth_healthcheck"},
|
||||
"ownership": {"lead_agent": "Hermes"},
|
||||
}
|
||||
mock_dispatch_row = MagicMock()
|
||||
mock_dispatch_row.id = "dispatch-kb-cooldown"
|
||||
|
||||
with (
|
||||
patch(
|
||||
"src.services.governance_dispatcher.get_active_for_event",
|
||||
new=AsyncMock(return_value=None),
|
||||
),
|
||||
patch(
|
||||
"src.services.governance_dispatcher._is_skip_cooldown",
|
||||
new=AsyncMock(return_value=True),
|
||||
) as mock_skip_cooldown,
|
||||
patch(
|
||||
"src.services.governance_dispatcher.create_dispatch",
|
||||
new=AsyncMock(return_value=mock_dispatch_row),
|
||||
) as mock_create,
|
||||
):
|
||||
from src.services.governance_dispatcher import dispatch_governance_event
|
||||
result = await dispatch_governance_event(event)
|
||||
|
||||
assert result == "dispatch-kb-cooldown"
|
||||
mock_skip_cooldown.assert_not_awaited()
|
||||
mock_create.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_llm_failure_fallback_to_skip(self):
|
||||
"""fusion adapter 拋 Exception → fallback skip,不寫 dispatch,返回 None。"""
|
||||
@@ -467,6 +499,17 @@ class TestDecisionFusionAdapterHelpers:
|
||||
class TestRunGovernanceDispatcherLoop:
|
||||
"""run_governance_dispatcher_loop 排程迴圈行為測試。"""
|
||||
|
||||
def test_poll_unresolved_events_excludes_active_dispatch_rows(self):
|
||||
"""poll SQL 必須跳過已有 active dispatch 的事件,避免 backlog 餓死。"""
|
||||
import inspect
|
||||
from src.services import governance_dispatcher
|
||||
|
||||
source = inspect.getsource(governance_dispatcher._poll_unresolved_events)
|
||||
|
||||
assert "GovernanceRemediationDispatch" in source
|
||||
assert "ACTIVE_STATUSES" in source
|
||||
assert ".where(~active_dispatch_exists)" in source
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_loop_processes_events_and_sleeps(self):
|
||||
"""loop 一次 cycle 應處理 events 並 sleep。"""
|
||||
|
||||
Reference in New Issue
Block a user