fix(governance): drain km healthcheck backlog
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 6m2s
CD Pipeline / build-and-deploy (push) Successful in 4m26s
CD Pipeline / post-deploy-checks (push) Successful in 1m37s

This commit is contained in:
Your Name
2026-05-19 21:43:19 +08:00
parent 271aadcefe
commit 2f68b3f472
2 changed files with 63 additions and 12 deletions

View File

@@ -28,11 +28,12 @@ from datetime import datetime, timezone
from typing import Any
import structlog
from sqlalchemy import select, update
from sqlalchemy import exists, select, update
from src.db.base import get_db_context
from src.db.models import AiGovernanceEvent
from src.db.models import AiGovernanceEvent, GovernanceRemediationDispatch
from src.repositories.governance_remediation_dispatch_repo import (
ACTIVE_STATUSES,
DispatchAlreadyActive,
create_dispatch,
get_active_for_event,
@@ -136,16 +137,9 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None:
event_id = event.id
event_type = event.event_type
# Step 0: Redis skip 冷卻檢查(防止 skip 事件每 30s 重新做 LLM 呼叫)
if await _is_skip_cooldown(event_id):
logger.debug(
"governance_dispatch_skip_cooldown",
event_id=event_id,
event_type=event_type,
)
return None
# Step 1: 檢查是否已有活躍 dispatch冪等保護
# Step 0: 檢查是否已有活躍 dispatch冪等保護
# 先查 DB 再看 skip cooldown,避免曾經被 skip 的 knowledge_degradation
# 無法補建 Hermes KB healthcheck intake work item。
existing = await get_active_for_event(event_id)
if existing is not None:
logger.debug(
@@ -162,6 +156,15 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None:
if _is_kb_growth_healthcheck_event(event):
return await _record_kb_growth_healthcheck_dispatch(event)
# Step 1: Redis skip 冷卻檢查(防止一般 skip 事件每 30s 重新做 LLM 呼叫)。
if await _is_skip_cooldown(event_id):
logger.debug(
"governance_dispatch_skip_cooldown",
event_id=event_id,
event_type=event_type,
)
return None
# Step 2: 決策融合三維LLM × Playbook × MCP
adapter = get_decision_fusion_adapter()
try:
@@ -346,10 +349,15 @@ async def _poll_unresolved_events() -> list[AiGovernanceEvent]:
最多 _MAX_EVENTS_PER_CYCLE 筆 AiGovernanceEvent ORM 物件列表
"""
async with get_db_context() as db:
active_dispatch_exists = exists().where(
GovernanceRemediationDispatch.governance_event_id == AiGovernanceEvent.id,
GovernanceRemediationDispatch.dispatch_status.in_(list(ACTIVE_STATUSES)),
)
result = await db.execute(
select(AiGovernanceEvent)
.where(AiGovernanceEvent.resolved.is_(False))
.where(AiGovernanceEvent.event_type.in_(list(_DISPATCHABLE_EVENT_TYPES)))
.where(~active_dispatch_exists)
.order_by(AiGovernanceEvent.triggered_at.asc())
.limit(_MAX_EVENTS_PER_CYCLE)
)

View File

@@ -253,6 +253,38 @@ class TestDispatchGovernanceEvent:
assert dispatch_kwargs["decision_context"]["next_action"] == "run_kb_growth_healthcheck"
assert dispatch_kwargs["decision_context"]["workflow"]["current_stage"] == "queued_kb_healthcheck"
@pytest.mark.asyncio
async def test_knowledge_degradation_intake_ignores_legacy_skip_cooldown(self):
"""舊 skip cooldown 不可阻止 KM healthcheck 補建 work item。"""
event = _make_governance_event(event_type="knowledge_degradation")
event.details = {
"remediation": {"next_action": "run_kb_growth_healthcheck"},
"ownership": {"lead_agent": "Hermes"},
}
mock_dispatch_row = MagicMock()
mock_dispatch_row.id = "dispatch-kb-cooldown"
with (
patch(
"src.services.governance_dispatcher.get_active_for_event",
new=AsyncMock(return_value=None),
),
patch(
"src.services.governance_dispatcher._is_skip_cooldown",
new=AsyncMock(return_value=True),
) as mock_skip_cooldown,
patch(
"src.services.governance_dispatcher.create_dispatch",
new=AsyncMock(return_value=mock_dispatch_row),
) as mock_create,
):
from src.services.governance_dispatcher import dispatch_governance_event
result = await dispatch_governance_event(event)
assert result == "dispatch-kb-cooldown"
mock_skip_cooldown.assert_not_awaited()
mock_create.assert_awaited_once()
@pytest.mark.asyncio
async def test_llm_failure_fallback_to_skip(self):
"""fusion adapter 拋 Exception → fallback skip不寫 dispatch返回 None。"""
@@ -467,6 +499,17 @@ class TestDecisionFusionAdapterHelpers:
class TestRunGovernanceDispatcherLoop:
"""run_governance_dispatcher_loop 排程迴圈行為測試。"""
def test_poll_unresolved_events_excludes_active_dispatch_rows(self):
"""poll SQL 必須跳過已有 active dispatch 的事件,避免 backlog 餓死。"""
import inspect
from src.services import governance_dispatcher
source = inspect.getsource(governance_dispatcher._poll_unresolved_events)
assert "GovernanceRemediationDispatch" in source
assert "ACTIVE_STATUSES" in source
assert ".where(~active_dispatch_exists)" in source
@pytest.mark.asyncio
async def test_loop_processes_events_and_sleeps(self):
"""loop 一次 cycle 應處理 events 並 sleep。"""