fix(governance): stop km healthcheck requeue
This commit is contained in:
@@ -133,12 +133,16 @@ async def _process_dispatch(row: GovernanceRemediationDispatch) -> None:
|
||||
async def _create_or_get_km_review_draft(
|
||||
dispatch: GovernanceRemediationDispatch,
|
||||
) -> KnowledgeEntry:
|
||||
"""以 dispatch tag 做冪等,建立或取得 REVIEW 狀態 KM 草稿。"""
|
||||
"""以 governance event tag 做冪等,建立或取得 REVIEW 狀態 KM 草稿。"""
|
||||
dispatch_tag = f"dispatch:{dispatch.id}"
|
||||
event_tag = f"governance_event:{dispatch.governance_event_id}"
|
||||
payload = _build_km_review_entry_payload(dispatch)
|
||||
|
||||
async with get_db_context() as db:
|
||||
repo = KnowledgeDBRepository(db)
|
||||
existing, _ = await repo.list_entries(tags=[event_tag], limit=1)
|
||||
if existing:
|
||||
return existing[0]
|
||||
existing, _ = await repo.list_entries(tags=[dispatch_tag], limit=1)
|
||||
if existing:
|
||||
return existing[0]
|
||||
|
||||
@@ -37,6 +37,7 @@ from src.repositories.governance_remediation_dispatch_repo import (
|
||||
DispatchAlreadyActive,
|
||||
create_dispatch,
|
||||
get_active_for_event,
|
||||
list_by_event,
|
||||
transition_status,
|
||||
)
|
||||
from src.services.decision_fusion_adapter import FusedDecision, get_decision_fusion_adapter
|
||||
@@ -154,6 +155,13 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None:
|
||||
# knowledge_degradation 的 run_kb_growth_healthcheck 是治理工作項 intake,
|
||||
# 不是自動修復執行;先落 pending dispatch,讓既有 unresolved 事件也能在 AwoooP 追蹤。
|
||||
if _is_kb_growth_healthcheck_event(event):
|
||||
if await _has_kb_growth_review_draft(event_id):
|
||||
logger.debug(
|
||||
"governance_kb_healthcheck_dispatch_skipped_waiting_review",
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
)
|
||||
return None
|
||||
return await _record_kb_growth_healthcheck_dispatch(event)
|
||||
|
||||
# Step 1: Redis skip 冷卻檢查(防止一般 skip 事件每 30s 重新做 LLM 呼叫)。
|
||||
@@ -431,6 +439,37 @@ def _is_kb_growth_healthcheck_event(event: AiGovernanceEvent) -> bool:
|
||||
return details.get("next_action") == "run_kb_growth_healthcheck"
|
||||
|
||||
|
||||
async def _has_kb_growth_review_draft(event_id: str) -> bool:
|
||||
"""同一治理事件已有 Hermes review draft 時,不再重複建立 pending dispatch。"""
|
||||
try:
|
||||
rows = await list_by_event(event_id)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"governance_kb_healthcheck_history_lookup_failed",
|
||||
event_id=event_id,
|
||||
error=str(exc),
|
||||
)
|
||||
return False
|
||||
|
||||
for row in rows:
|
||||
if row.executor_type != "hermes_kb_growth_healthcheck":
|
||||
continue
|
||||
if row.dispatch_status != "succeeded":
|
||||
continue
|
||||
context = row.decision_context if isinstance(row.decision_context, dict) else {}
|
||||
workflow = context.get("workflow") if isinstance(context.get("workflow"), dict) else {}
|
||||
worker_result = (
|
||||
context.get("worker_result")
|
||||
if isinstance(context.get("worker_result"), dict)
|
||||
else {}
|
||||
)
|
||||
if workflow.get("current_stage") == "waiting_owner_review":
|
||||
return True
|
||||
if worker_result.get("status") == "draft_created":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _extract_event_next_action(details: dict[str, Any], fallback: str) -> str:
|
||||
remediation = details.get("remediation")
|
||||
if isinstance(remediation, dict):
|
||||
|
||||
@@ -285,6 +285,43 @@ class TestDispatchGovernanceEvent:
|
||||
mock_skip_cooldown.assert_not_awaited()
|
||||
mock_create.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_knowledge_degradation_does_not_requeue_after_review_draft(self):
|
||||
"""同一 event 已有 Hermes review draft 後,不可反覆補 pending dispatch。"""
|
||||
event = _make_governance_event(event_type="knowledge_degradation")
|
||||
event.details = {
|
||||
"remediation": {"next_action": "run_kb_growth_healthcheck"},
|
||||
"ownership": {"lead_agent": "Hermes"},
|
||||
}
|
||||
succeeded_dispatch = MagicMock()
|
||||
succeeded_dispatch.executor_type = "hermes_kb_growth_healthcheck"
|
||||
succeeded_dispatch.dispatch_status = "succeeded"
|
||||
succeeded_dispatch.decision_context = {
|
||||
"workflow": {"current_stage": "waiting_owner_review"},
|
||||
"worker_result": {"status": "draft_created"},
|
||||
}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"src.services.governance_dispatcher.get_active_for_event",
|
||||
new=AsyncMock(return_value=None),
|
||||
),
|
||||
patch(
|
||||
"src.services.governance_dispatcher.list_by_event",
|
||||
new=AsyncMock(return_value=[succeeded_dispatch]),
|
||||
) as mock_history,
|
||||
patch(
|
||||
"src.services.governance_dispatcher.create_dispatch",
|
||||
new=AsyncMock(),
|
||||
) as mock_create,
|
||||
):
|
||||
from src.services.governance_dispatcher import dispatch_governance_event
|
||||
result = await dispatch_governance_event(event)
|
||||
|
||||
assert result is None
|
||||
mock_history.assert_awaited_once_with("evt-001")
|
||||
mock_create.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_llm_failure_fallback_to_skip(self):
|
||||
"""fusion adapter 拋 Exception → fallback skip,不寫 dispatch,返回 None。"""
|
||||
|
||||
@@ -50,6 +50,7 @@ def test_km_review_payload_is_review_only():
|
||||
assert "agent:Hermes" in payload.tags
|
||||
assert "needs_owner_review" in payload.tags
|
||||
assert "dispatch:dispatch-001" in payload.tags
|
||||
assert "governance_event:event-001" in payload.tags
|
||||
assert "writes_km_without_approval=false" in payload.content
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user