feat(governance): process hermes km healthchecks
This commit is contained in:
304
apps/api/src/jobs/hermes_kb_growth_worker.py
Normal file
304
apps/api/src/jobs/hermes_kb_growth_worker.py
Normal file
@@ -0,0 +1,304 @@
|
||||
"""
|
||||
Hermes KB Growth Worker
|
||||
=======================
|
||||
|
||||
消費 governance_remediation_dispatch 中的 hermes_kb_growth_healthcheck work item,
|
||||
把 knowledge_degradation 告警推進成可審核的 KM 草稿。
|
||||
|
||||
邊界:
|
||||
- 可以建立 REVIEW 狀態的 auto_runbook 草稿,讓 owner 在前端審核。
|
||||
- 不可以直接把 KM 標成 APPROVED / PUBLISHED。
|
||||
- 不修改 immutable ai_governance_events;流程進度寫回 dispatch.decision_context。
|
||||
|
||||
2026-05-19 ogt + Codex: T90 Hermes KB growth healthcheck worker。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from copy import deepcopy
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
from src.db.base import get_db_context
|
||||
from src.db.models import GovernanceRemediationDispatch
|
||||
from src.models.knowledge import (
|
||||
EntrySource,
|
||||
EntryStatus,
|
||||
EntryType,
|
||||
KnowledgeEntry,
|
||||
KnowledgeEntryCreate,
|
||||
)
|
||||
from src.repositories.governance_remediation_dispatch_repo import (
|
||||
InvalidStatusTransition,
|
||||
list_pending_by_executor,
|
||||
transition_status,
|
||||
update_decision_context,
|
||||
)
|
||||
from src.repositories.knowledge_repository import KnowledgeDBRepository
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
EXECUTOR_TYPE = "hermes_kb_growth_healthcheck"
|
||||
DEFAULT_INTERVAL_SECONDS = 300
|
||||
DEFAULT_LIMIT = 20
|
||||
|
||||
|
||||
async def run_hermes_kb_growth_once(limit: int = DEFAULT_LIMIT) -> dict[str, int]:
|
||||
"""執行一輪 Hermes KB growth healthcheck。
|
||||
|
||||
Returns:
|
||||
統計資訊,供 log / smoke test 判讀。
|
||||
"""
|
||||
rows = await list_pending_by_executor(EXECUTOR_TYPE, limit=limit)
|
||||
result = {
|
||||
"scanned": len(rows),
|
||||
"processed": 0,
|
||||
"skipped": 0,
|
||||
"failed": 0,
|
||||
}
|
||||
|
||||
for row in rows:
|
||||
try:
|
||||
await _process_dispatch(row)
|
||||
result["processed"] += 1
|
||||
except InvalidStatusTransition as exc:
|
||||
result["skipped"] += 1
|
||||
logger.info(
|
||||
"hermes_kb_growth_dispatch_skipped",
|
||||
dispatch_id=row.id,
|
||||
event_id=row.governance_event_id,
|
||||
reason=str(exc),
|
||||
)
|
||||
except Exception as exc:
|
||||
result["failed"] += 1
|
||||
logger.exception(
|
||||
"hermes_kb_growth_dispatch_failed",
|
||||
dispatch_id=row.id,
|
||||
event_id=row.governance_event_id,
|
||||
error=str(exc),
|
||||
)
|
||||
await _mark_failed_if_started(row.id, str(exc))
|
||||
|
||||
if any(result.values()):
|
||||
logger.info("hermes_kb_growth_once_completed", **result)
|
||||
return result
|
||||
|
||||
|
||||
async def run_hermes_kb_growth_loop(
|
||||
interval_seconds: int = DEFAULT_INTERVAL_SECONDS,
|
||||
limit: int = DEFAULT_LIMIT,
|
||||
) -> None:
|
||||
"""背景 loop:定期消費 Hermes KB growth dispatch。"""
|
||||
logger.info(
|
||||
"hermes_kb_growth_loop_started",
|
||||
interval_seconds=interval_seconds,
|
||||
limit=limit,
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
await run_hermes_kb_growth_once(limit=limit)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.exception("hermes_kb_growth_loop_error", error=str(exc))
|
||||
await asyncio.sleep(interval_seconds)
|
||||
|
||||
|
||||
async def _process_dispatch(row: GovernanceRemediationDispatch) -> None:
|
||||
"""處理單筆 pending dispatch,最後停在 waiting_owner_review。"""
|
||||
dispatched = await transition_status(row.id, "pending", "dispatched")
|
||||
executing = await transition_status(dispatched.id, "dispatched", "executing")
|
||||
|
||||
km_entry = await _create_or_get_km_review_draft(executing)
|
||||
updated_context = _build_review_context(
|
||||
executing.decision_context or {},
|
||||
dispatch_id=executing.id,
|
||||
governance_event_id=executing.governance_event_id,
|
||||
km_entry_id=km_entry.id,
|
||||
)
|
||||
await update_decision_context(executing.id, updated_context)
|
||||
await transition_status(executing.id, "executing", "succeeded")
|
||||
|
||||
logger.info(
|
||||
"hermes_kb_growth_review_draft_ready",
|
||||
dispatch_id=executing.id,
|
||||
event_id=executing.governance_event_id,
|
||||
km_entry_id=km_entry.id,
|
||||
workflow_stage="waiting_owner_review",
|
||||
)
|
||||
|
||||
|
||||
async def _create_or_get_km_review_draft(
|
||||
dispatch: GovernanceRemediationDispatch,
|
||||
) -> KnowledgeEntry:
|
||||
"""以 dispatch tag 做冪等,建立或取得 REVIEW 狀態 KM 草稿。"""
|
||||
dispatch_tag = f"dispatch:{dispatch.id}"
|
||||
payload = _build_km_review_entry_payload(dispatch)
|
||||
|
||||
async with get_db_context() as db:
|
||||
repo = KnowledgeDBRepository(db)
|
||||
existing, _ = await repo.list_entries(tags=[dispatch_tag], limit=1)
|
||||
if existing:
|
||||
return existing[0]
|
||||
return await repo.create(payload)
|
||||
|
||||
|
||||
def _build_km_review_entry_payload(
|
||||
dispatch: GovernanceRemediationDispatch,
|
||||
) -> KnowledgeEntryCreate:
|
||||
"""把 governance dispatch 轉成待審核的 KM 草稿 payload。"""
|
||||
context = dispatch.decision_context or {}
|
||||
workflow = context.get("workflow") if isinstance(context.get("workflow"), dict) else {}
|
||||
impact = workflow.get("impact") if isinstance(workflow.get("impact"), dict) else {}
|
||||
extra = context.get("extra") if isinstance(context.get("extra"), dict) else {}
|
||||
ownership = context.get("ownership") if isinstance(context.get("ownership"), dict) else {}
|
||||
if not ownership and isinstance(extra.get("ownership"), dict):
|
||||
ownership = extra["ownership"]
|
||||
|
||||
stale_count = _pick_first(impact, extra, key="stale_count")
|
||||
total_count = _pick_first(impact, extra, key="total_count")
|
||||
stale_ratio = _pick_first(impact, context, key="stale_ratio")
|
||||
threshold = _pick_first(impact, context, key="threshold")
|
||||
stale_days = _pick_first(impact, extra, key="stale_days")
|
||||
lead_agent = ownership.get("lead_agent") or "Hermes"
|
||||
human_owner = ownership.get("human_owner") or "KM owner / SRE owner"
|
||||
|
||||
content = "\n".join([
|
||||
"# KM 健康檢查草稿",
|
||||
"",
|
||||
"## 來源",
|
||||
f"- governance_event_id: {dispatch.governance_event_id}",
|
||||
f"- dispatch_id: {dispatch.id}",
|
||||
f"- executor_type: {dispatch.executor_type}",
|
||||
"",
|
||||
"## 影響摘要",
|
||||
f"- stale_count: {_format_unknown(stale_count)}",
|
||||
f"- total_count: {_format_unknown(total_count)}",
|
||||
f"- stale_ratio: {_format_ratio(stale_ratio)}",
|
||||
f"- threshold: {_format_ratio(threshold)}",
|
||||
f"- stale_days: {_format_unknown(stale_days)}",
|
||||
"",
|
||||
"## AI 已完成",
|
||||
"- Hermes 已接手 knowledge_degradation dispatch。",
|
||||
"- 已產生 KM 更新草稿與 owner review work item。",
|
||||
"- 尚未把任何條目標成 approved / published。",
|
||||
"",
|
||||
"## Owner 審核重點",
|
||||
"- 優先反查最近被 Incident、Sentry、SigNoz、PlayBook 引用的 KM。",
|
||||
"- 確認草稿內容沒有把過期處置方式寫回正式知識庫。",
|
||||
"- 審核通過後再進入 km_writeback_after_approval。",
|
||||
"",
|
||||
"## 安全邊界",
|
||||
"- writes_km_without_approval=false",
|
||||
f"- lead_agent={lead_agent}",
|
||||
f"- human_owner={human_owner}",
|
||||
])
|
||||
|
||||
return KnowledgeEntryCreate(
|
||||
title=f"KM healthcheck review draft - {dispatch.governance_event_id[:8]}",
|
||||
content=content,
|
||||
entry_type=EntryType.AUTO_RUNBOOK,
|
||||
category="AI治理",
|
||||
tags=[
|
||||
"governance:knowledge_degradation",
|
||||
"workflow:kb_growth_healthcheck",
|
||||
"stage:waiting_owner_review",
|
||||
"agent:Hermes",
|
||||
"needs_owner_review",
|
||||
f"dispatch:{dispatch.id}",
|
||||
f"governance_event:{dispatch.governance_event_id}",
|
||||
],
|
||||
source=EntrySource.AI_EXTRACTED,
|
||||
status=EntryStatus.REVIEW,
|
||||
path_type="hermes_kb_growth_healthcheck",
|
||||
created_by="hermes_kb_growth_worker",
|
||||
)
|
||||
|
||||
|
||||
def _build_review_context(
|
||||
context: dict[str, Any],
|
||||
*,
|
||||
dispatch_id: str,
|
||||
governance_event_id: str,
|
||||
km_entry_id: str,
|
||||
) -> dict[str, Any]:
|
||||
"""更新 dispatch read model,讓 Work Items/Telegram 可見目前停在 owner review。"""
|
||||
updated = deepcopy(context)
|
||||
workflow = updated.setdefault("workflow", {})
|
||||
if not isinstance(workflow, dict):
|
||||
workflow = {}
|
||||
updated["workflow"] = workflow
|
||||
|
||||
stages = workflow.setdefault("stage_by_dispatch_status", {})
|
||||
if not isinstance(stages, dict):
|
||||
stages = {}
|
||||
workflow["stage_by_dispatch_status"] = stages
|
||||
stages.update({
|
||||
"executing": "draft_km_updates",
|
||||
"succeeded": "waiting_owner_review",
|
||||
"failed": "needs_manual_km_triage",
|
||||
})
|
||||
|
||||
workflow["current_stage"] = "waiting_owner_review"
|
||||
workflow["next_action"] = "owner_review_km_draft"
|
||||
workflow["needs_human_review"] = True
|
||||
workflow["writes_km_without_approval"] = False
|
||||
workflow["kb_draft_entry_id"] = km_entry_id
|
||||
|
||||
updated["next_action"] = "owner_review_km_draft"
|
||||
updated["decision_path"] = "draft_created_waiting_owner_review"
|
||||
updated["proposed_action"] = "Hermes 已建立 KM 更新草稿,等待 owner 審核"
|
||||
updated["worker_result"] = {
|
||||
"worker": "Hermes",
|
||||
"executor_type": EXECUTOR_TYPE,
|
||||
"dispatch_id": dispatch_id,
|
||||
"governance_event_id": governance_event_id,
|
||||
"km_draft_entry_id": km_entry_id,
|
||||
"stage": "waiting_owner_review",
|
||||
"status": "draft_created",
|
||||
"writes_km_without_approval": False,
|
||||
}
|
||||
return updated
|
||||
|
||||
|
||||
async def _mark_failed_if_started(dispatch_id: str, error: str) -> None:
|
||||
"""若 worker 已取得 dispatch,將它收斂到 failed,保留錯誤。"""
|
||||
for from_status in ("executing", "dispatched"):
|
||||
try:
|
||||
await transition_status(
|
||||
dispatch_id,
|
||||
from_status,
|
||||
"failed",
|
||||
last_error=error[:500],
|
||||
)
|
||||
return
|
||||
except InvalidStatusTransition:
|
||||
continue
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"hermes_kb_growth_mark_failed_failed",
|
||||
dispatch_id=dispatch_id,
|
||||
from_status=from_status,
|
||||
error=str(exc),
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
def _pick_first(*sources: dict[str, Any], key: str) -> Any:
|
||||
for source in sources:
|
||||
if key in source:
|
||||
return source[key]
|
||||
return None
|
||||
|
||||
|
||||
def _format_unknown(value: Any) -> str:
|
||||
return "unknown" if value is None else str(value)
|
||||
|
||||
|
||||
def _format_ratio(value: Any) -> str:
|
||||
try:
|
||||
return f"{float(value) * 100:.1f}%"
|
||||
except (TypeError, ValueError):
|
||||
return "unknown"
|
||||
@@ -668,6 +668,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("governance_dispatcher_schedule_failed", error=str(e))
|
||||
|
||||
# T90 2026-05-19 ogt + Codex: Hermes KB growth worker(每 5 分鐘)
|
||||
# 消費 knowledge_degradation 的 hermes_kb_growth_healthcheck dispatch,
|
||||
# 只產生 REVIEW 草稿並停在 owner review,不直接批准或發布 KM。
|
||||
try:
|
||||
from src.jobs.hermes_kb_growth_worker import run_hermes_kb_growth_loop
|
||||
asyncio.create_task(run_hermes_kb_growth_loop())
|
||||
logger.info("hermes_kb_growth_worker_scheduled", interval_sec=300)
|
||||
except Exception as e:
|
||||
logger.warning("hermes_kb_growth_worker_schedule_failed", error=str(e))
|
||||
|
||||
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
|
||||
# OllamaFailoverManager + OllamaAutoRecoveryService 飛輪接線:
|
||||
# failover 切換時 → recovery_callback → set_current_primary → Redis 持久化
|
||||
|
||||
@@ -356,6 +356,75 @@ async def list_pending(
|
||||
return list(result.scalars().all())
|
||||
|
||||
|
||||
async def list_pending_by_executor(
|
||||
executor_type: str,
|
||||
*,
|
||||
limit: int = 50,
|
||||
) -> list[GovernanceRemediationDispatch]:
|
||||
"""列出指定 executor 的 pending dispatch(按 dispatched_at ASC)。
|
||||
|
||||
用於 Hermes / 其他 worker 消費自己的 work item。由 repository 層集中查詢,
|
||||
避免 job 直接散落表名與狀態條件。
|
||||
|
||||
Args:
|
||||
executor_type: dispatch.executor_type,例如 hermes_kb_growth_healthcheck
|
||||
limit: 本輪最多取幾筆,避免 backlog 一次拖垮 worker
|
||||
|
||||
Returns:
|
||||
最舊優先的 pending dispatch 列表。
|
||||
"""
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
select(GovernanceRemediationDispatch)
|
||||
.where(GovernanceRemediationDispatch.dispatch_status == "pending")
|
||||
.where(GovernanceRemediationDispatch.executor_type == executor_type)
|
||||
.order_by(GovernanceRemediationDispatch.dispatched_at.asc())
|
||||
.limit(limit)
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
|
||||
async def update_decision_context(
|
||||
dispatch_id: str,
|
||||
decision_context: dict[str, Any],
|
||||
) -> GovernanceRemediationDispatch:
|
||||
"""更新 dispatch 的 decision_context,保留同一 row 的 audit trail。
|
||||
|
||||
這只更新 dispatch work item 的讀模型上下文,不修改 immutable
|
||||
ai_governance_events,也不代表治理事件已被解決。
|
||||
|
||||
Args:
|
||||
dispatch_id: governance_remediation_dispatch.id
|
||||
decision_context: 新的 JSONB context
|
||||
|
||||
Returns:
|
||||
更新後的 GovernanceRemediationDispatch ORM 物件
|
||||
|
||||
Raises:
|
||||
DispatchNotFound: 找不到 dispatch_id
|
||||
"""
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
select(GovernanceRemediationDispatch)
|
||||
.where(GovernanceRemediationDispatch.id == dispatch_id)
|
||||
)
|
||||
row = result.scalar_one_or_none()
|
||||
if row is None:
|
||||
raise DispatchNotFound(f"dispatch_id={dispatch_id!r} 不存在")
|
||||
|
||||
row.decision_context = decision_context
|
||||
await db.flush()
|
||||
await db.refresh(row)
|
||||
|
||||
logger.info(
|
||||
"dispatch_decision_context_updated",
|
||||
dispatch_id=dispatch_id,
|
||||
event_id=row.governance_event_id,
|
||||
executor_type=row.executor_type,
|
||||
)
|
||||
return row
|
||||
|
||||
|
||||
async def list_by_event(
|
||||
event_id: str,
|
||||
) -> list[GovernanceRemediationDispatch]:
|
||||
|
||||
120
apps/api/tests/test_hermes_kb_growth_worker.py
Normal file
120
apps/api/tests/test_hermes_kb_growth_worker.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, call, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _make_dispatch(**overrides):
|
||||
defaults = {
|
||||
"id": "dispatch-001",
|
||||
"governance_event_id": "event-001",
|
||||
"event_type": "knowledge_degradation",
|
||||
"executor_type": "hermes_kb_growth_healthcheck",
|
||||
"decision_context": {
|
||||
"workflow": {
|
||||
"impact": {
|
||||
"stale_count": 1450,
|
||||
"total_count": 1867,
|
||||
"stale_ratio": 0.777,
|
||||
"threshold": 0.2,
|
||||
"stale_days": 7,
|
||||
},
|
||||
"stage_by_dispatch_status": {
|
||||
"pending": "queued_kb_healthcheck",
|
||||
"executing": "draft_km_updates",
|
||||
"succeeded": "stale_ratio_recheck",
|
||||
},
|
||||
},
|
||||
"ownership": {
|
||||
"lead_agent": "Hermes",
|
||||
"human_owner": "KM owner / SRE owner",
|
||||
},
|
||||
},
|
||||
}
|
||||
defaults.update(overrides)
|
||||
return SimpleNamespace(**defaults)
|
||||
|
||||
|
||||
def test_km_review_payload_is_review_only():
|
||||
"""Hermes worker 只能產生 REVIEW 草稿,不可直接 approve/publish KM。"""
|
||||
from src.jobs.hermes_kb_growth_worker import _build_km_review_entry_payload
|
||||
from src.models.knowledge import EntrySource, EntryStatus, EntryType
|
||||
|
||||
payload = _build_km_review_entry_payload(_make_dispatch())
|
||||
|
||||
assert payload.entry_type == EntryType.AUTO_RUNBOOK
|
||||
assert payload.source == EntrySource.AI_EXTRACTED
|
||||
assert payload.status == EntryStatus.REVIEW
|
||||
assert "agent:Hermes" in payload.tags
|
||||
assert "needs_owner_review" in payload.tags
|
||||
assert "dispatch:dispatch-001" in payload.tags
|
||||
assert "writes_km_without_approval=false" in payload.content
|
||||
|
||||
|
||||
def test_review_context_keeps_succeeded_at_owner_review_stage():
|
||||
"""dispatch succeeded 代表 worker 完成草稿,不代表 KM 劣化已解決。"""
|
||||
from src.jobs.hermes_kb_growth_worker import _build_review_context
|
||||
|
||||
context = _make_dispatch().decision_context
|
||||
updated = _build_review_context(
|
||||
context,
|
||||
dispatch_id="dispatch-001",
|
||||
governance_event_id="event-001",
|
||||
km_entry_id="km-001",
|
||||
)
|
||||
|
||||
workflow = updated["workflow"]
|
||||
assert workflow["current_stage"] == "waiting_owner_review"
|
||||
assert workflow["stage_by_dispatch_status"]["succeeded"] == "waiting_owner_review"
|
||||
assert workflow["kb_draft_entry_id"] == "km-001"
|
||||
assert workflow["writes_km_without_approval"] is False
|
||||
assert updated["next_action"] == "owner_review_km_draft"
|
||||
assert updated["worker_result"]["status"] == "draft_created"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_advances_pending_dispatch_to_review_draft():
|
||||
"""pending dispatch 應推進到 succeeded + waiting_owner_review read model。"""
|
||||
from src.jobs.hermes_kb_growth_worker import run_hermes_kb_growth_once
|
||||
|
||||
pending = _make_dispatch()
|
||||
dispatched = _make_dispatch()
|
||||
executing = _make_dispatch()
|
||||
km_entry = SimpleNamespace(id="km-001")
|
||||
|
||||
transition_mock = AsyncMock(side_effect=[dispatched, executing, _make_dispatch()])
|
||||
|
||||
with (
|
||||
patch(
|
||||
"src.jobs.hermes_kb_growth_worker.list_pending_by_executor",
|
||||
new=AsyncMock(return_value=[pending]),
|
||||
) as list_pending,
|
||||
patch(
|
||||
"src.jobs.hermes_kb_growth_worker.transition_status",
|
||||
new=transition_mock,
|
||||
),
|
||||
patch(
|
||||
"src.jobs.hermes_kb_growth_worker._create_or_get_km_review_draft",
|
||||
new=AsyncMock(return_value=km_entry),
|
||||
) as create_draft,
|
||||
patch(
|
||||
"src.jobs.hermes_kb_growth_worker.update_decision_context",
|
||||
new=AsyncMock(),
|
||||
) as update_context,
|
||||
):
|
||||
result = await run_hermes_kb_growth_once(limit=5)
|
||||
|
||||
assert result == {"scanned": 1, "processed": 1, "skipped": 0, "failed": 0}
|
||||
list_pending.assert_awaited_once_with("hermes_kb_growth_healthcheck", limit=5)
|
||||
create_draft.assert_awaited_once_with(executing)
|
||||
transition_mock.assert_has_awaits([
|
||||
call("dispatch-001", "pending", "dispatched"),
|
||||
call("dispatch-001", "dispatched", "executing"),
|
||||
call("dispatch-001", "executing", "succeeded"),
|
||||
])
|
||||
update_context.assert_awaited_once()
|
||||
updated_context = update_context.call_args.args[1]
|
||||
assert updated_context["workflow"]["current_stage"] == "waiting_owner_review"
|
||||
assert updated_context["worker_result"]["km_draft_entry_id"] == "km-001"
|
||||
Reference in New Issue
Block a user