feat(governance): process hermes km healthchecks
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 2m13s
CD Pipeline / build-and-deploy (push) Successful in 5m14s
CD Pipeline / post-deploy-checks (push) Successful in 1m55s

This commit is contained in:
Your Name
2026-05-19 22:32:55 +08:00
parent bda857a8f3
commit edf97ad8ca
4 changed files with 503 additions and 0 deletions

View File

@@ -0,0 +1,304 @@
"""
Hermes KB Growth Worker
=======================
消費 governance_remediation_dispatch 中的 hermes_kb_growth_healthcheck work item
把 knowledge_degradation 告警推進成可審核的 KM 草稿。
邊界:
- 可以建立 REVIEW 狀態的 auto_runbook 草稿,讓 owner 在前端審核。
- 不可以直接把 KM 標成 APPROVED / PUBLISHED。
- 不修改 immutable ai_governance_events流程進度寫回 dispatch.decision_context。
2026-05-19 ogt + Codex: T90 Hermes KB growth healthcheck worker。
"""
from __future__ import annotations
import asyncio
from copy import deepcopy
from typing import Any
import structlog
from src.db.base import get_db_context
from src.db.models import GovernanceRemediationDispatch
from src.models.knowledge import (
EntrySource,
EntryStatus,
EntryType,
KnowledgeEntry,
KnowledgeEntryCreate,
)
from src.repositories.governance_remediation_dispatch_repo import (
InvalidStatusTransition,
list_pending_by_executor,
transition_status,
update_decision_context,
)
from src.repositories.knowledge_repository import KnowledgeDBRepository
logger = structlog.get_logger(__name__)
EXECUTOR_TYPE = "hermes_kb_growth_healthcheck"
DEFAULT_INTERVAL_SECONDS = 300
DEFAULT_LIMIT = 20
async def run_hermes_kb_growth_once(limit: int = DEFAULT_LIMIT) -> dict[str, int]:
"""執行一輪 Hermes KB growth healthcheck。
Returns:
統計資訊,供 log / smoke test 判讀。
"""
rows = await list_pending_by_executor(EXECUTOR_TYPE, limit=limit)
result = {
"scanned": len(rows),
"processed": 0,
"skipped": 0,
"failed": 0,
}
for row in rows:
try:
await _process_dispatch(row)
result["processed"] += 1
except InvalidStatusTransition as exc:
result["skipped"] += 1
logger.info(
"hermes_kb_growth_dispatch_skipped",
dispatch_id=row.id,
event_id=row.governance_event_id,
reason=str(exc),
)
except Exception as exc:
result["failed"] += 1
logger.exception(
"hermes_kb_growth_dispatch_failed",
dispatch_id=row.id,
event_id=row.governance_event_id,
error=str(exc),
)
await _mark_failed_if_started(row.id, str(exc))
if any(result.values()):
logger.info("hermes_kb_growth_once_completed", **result)
return result
async def run_hermes_kb_growth_loop(
interval_seconds: int = DEFAULT_INTERVAL_SECONDS,
limit: int = DEFAULT_LIMIT,
) -> None:
"""背景 loop定期消費 Hermes KB growth dispatch。"""
logger.info(
"hermes_kb_growth_loop_started",
interval_seconds=interval_seconds,
limit=limit,
)
while True:
try:
await run_hermes_kb_growth_once(limit=limit)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.exception("hermes_kb_growth_loop_error", error=str(exc))
await asyncio.sleep(interval_seconds)
async def _process_dispatch(row: GovernanceRemediationDispatch) -> None:
"""處理單筆 pending dispatch最後停在 waiting_owner_review。"""
dispatched = await transition_status(row.id, "pending", "dispatched")
executing = await transition_status(dispatched.id, "dispatched", "executing")
km_entry = await _create_or_get_km_review_draft(executing)
updated_context = _build_review_context(
executing.decision_context or {},
dispatch_id=executing.id,
governance_event_id=executing.governance_event_id,
km_entry_id=km_entry.id,
)
await update_decision_context(executing.id, updated_context)
await transition_status(executing.id, "executing", "succeeded")
logger.info(
"hermes_kb_growth_review_draft_ready",
dispatch_id=executing.id,
event_id=executing.governance_event_id,
km_entry_id=km_entry.id,
workflow_stage="waiting_owner_review",
)
async def _create_or_get_km_review_draft(
dispatch: GovernanceRemediationDispatch,
) -> KnowledgeEntry:
"""以 dispatch tag 做冪等,建立或取得 REVIEW 狀態 KM 草稿。"""
dispatch_tag = f"dispatch:{dispatch.id}"
payload = _build_km_review_entry_payload(dispatch)
async with get_db_context() as db:
repo = KnowledgeDBRepository(db)
existing, _ = await repo.list_entries(tags=[dispatch_tag], limit=1)
if existing:
return existing[0]
return await repo.create(payload)
def _build_km_review_entry_payload(
dispatch: GovernanceRemediationDispatch,
) -> KnowledgeEntryCreate:
"""把 governance dispatch 轉成待審核的 KM 草稿 payload。"""
context = dispatch.decision_context or {}
workflow = context.get("workflow") if isinstance(context.get("workflow"), dict) else {}
impact = workflow.get("impact") if isinstance(workflow.get("impact"), dict) else {}
extra = context.get("extra") if isinstance(context.get("extra"), dict) else {}
ownership = context.get("ownership") if isinstance(context.get("ownership"), dict) else {}
if not ownership and isinstance(extra.get("ownership"), dict):
ownership = extra["ownership"]
stale_count = _pick_first(impact, extra, key="stale_count")
total_count = _pick_first(impact, extra, key="total_count")
stale_ratio = _pick_first(impact, context, key="stale_ratio")
threshold = _pick_first(impact, context, key="threshold")
stale_days = _pick_first(impact, extra, key="stale_days")
lead_agent = ownership.get("lead_agent") or "Hermes"
human_owner = ownership.get("human_owner") or "KM owner / SRE owner"
content = "\n".join([
"# KM 健康檢查草稿",
"",
"## 來源",
f"- governance_event_id: {dispatch.governance_event_id}",
f"- dispatch_id: {dispatch.id}",
f"- executor_type: {dispatch.executor_type}",
"",
"## 影響摘要",
f"- stale_count: {_format_unknown(stale_count)}",
f"- total_count: {_format_unknown(total_count)}",
f"- stale_ratio: {_format_ratio(stale_ratio)}",
f"- threshold: {_format_ratio(threshold)}",
f"- stale_days: {_format_unknown(stale_days)}",
"",
"## AI 已完成",
"- Hermes 已接手 knowledge_degradation dispatch。",
"- 已產生 KM 更新草稿與 owner review work item。",
"- 尚未把任何條目標成 approved / published。",
"",
"## Owner 審核重點",
"- 優先反查最近被 Incident、Sentry、SigNoz、PlayBook 引用的 KM。",
"- 確認草稿內容沒有把過期處置方式寫回正式知識庫。",
"- 審核通過後再進入 km_writeback_after_approval。",
"",
"## 安全邊界",
"- writes_km_without_approval=false",
f"- lead_agent={lead_agent}",
f"- human_owner={human_owner}",
])
return KnowledgeEntryCreate(
title=f"KM healthcheck review draft - {dispatch.governance_event_id[:8]}",
content=content,
entry_type=EntryType.AUTO_RUNBOOK,
category="AI治理",
tags=[
"governance:knowledge_degradation",
"workflow:kb_growth_healthcheck",
"stage:waiting_owner_review",
"agent:Hermes",
"needs_owner_review",
f"dispatch:{dispatch.id}",
f"governance_event:{dispatch.governance_event_id}",
],
source=EntrySource.AI_EXTRACTED,
status=EntryStatus.REVIEW,
path_type="hermes_kb_growth_healthcheck",
created_by="hermes_kb_growth_worker",
)
def _build_review_context(
context: dict[str, Any],
*,
dispatch_id: str,
governance_event_id: str,
km_entry_id: str,
) -> dict[str, Any]:
"""更新 dispatch read model讓 Work Items/Telegram 可見目前停在 owner review。"""
updated = deepcopy(context)
workflow = updated.setdefault("workflow", {})
if not isinstance(workflow, dict):
workflow = {}
updated["workflow"] = workflow
stages = workflow.setdefault("stage_by_dispatch_status", {})
if not isinstance(stages, dict):
stages = {}
workflow["stage_by_dispatch_status"] = stages
stages.update({
"executing": "draft_km_updates",
"succeeded": "waiting_owner_review",
"failed": "needs_manual_km_triage",
})
workflow["current_stage"] = "waiting_owner_review"
workflow["next_action"] = "owner_review_km_draft"
workflow["needs_human_review"] = True
workflow["writes_km_without_approval"] = False
workflow["kb_draft_entry_id"] = km_entry_id
updated["next_action"] = "owner_review_km_draft"
updated["decision_path"] = "draft_created_waiting_owner_review"
updated["proposed_action"] = "Hermes 已建立 KM 更新草稿,等待 owner 審核"
updated["worker_result"] = {
"worker": "Hermes",
"executor_type": EXECUTOR_TYPE,
"dispatch_id": dispatch_id,
"governance_event_id": governance_event_id,
"km_draft_entry_id": km_entry_id,
"stage": "waiting_owner_review",
"status": "draft_created",
"writes_km_without_approval": False,
}
return updated
async def _mark_failed_if_started(dispatch_id: str, error: str) -> None:
"""若 worker 已取得 dispatch將它收斂到 failed保留錯誤。"""
for from_status in ("executing", "dispatched"):
try:
await transition_status(
dispatch_id,
from_status,
"failed",
last_error=error[:500],
)
return
except InvalidStatusTransition:
continue
except Exception as exc:
logger.warning(
"hermes_kb_growth_mark_failed_failed",
dispatch_id=dispatch_id,
from_status=from_status,
error=str(exc),
)
return
def _pick_first(*sources: dict[str, Any], key: str) -> Any:
for source in sources:
if key in source:
return source[key]
return None
def _format_unknown(value: Any) -> str:
return "unknown" if value is None else str(value)
def _format_ratio(value: Any) -> str:
try:
return f"{float(value) * 100:.1f}%"
except (TypeError, ValueError):
return "unknown"

View File

@@ -668,6 +668,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("governance_dispatcher_schedule_failed", error=str(e))
# T90 2026-05-19 ogt + Codex: Hermes KB growth worker每 5 分鐘)
# 消費 knowledge_degradation 的 hermes_kb_growth_healthcheck dispatch
# 只產生 REVIEW 草稿並停在 owner review不直接批准或發布 KM。
try:
from src.jobs.hermes_kb_growth_worker import run_hermes_kb_growth_loop
asyncio.create_task(run_hermes_kb_growth_loop())
logger.info("hermes_kb_growth_worker_scheduled", interval_sec=300)
except Exception as e:
logger.warning("hermes_kb_growth_worker_schedule_failed", error=str(e))
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
# OllamaFailoverManager + OllamaAutoRecoveryService 飛輪接線:
# failover 切換時 → recovery_callback → set_current_primary → Redis 持久化

View File

@@ -356,6 +356,75 @@ async def list_pending(
return list(result.scalars().all())
async def list_pending_by_executor(
executor_type: str,
*,
limit: int = 50,
) -> list[GovernanceRemediationDispatch]:
"""列出指定 executor 的 pending dispatch按 dispatched_at ASC
用於 Hermes / 其他 worker 消費自己的 work item。由 repository 層集中查詢,
避免 job 直接散落表名與狀態條件。
Args:
executor_type: dispatch.executor_type例如 hermes_kb_growth_healthcheck
limit: 本輪最多取幾筆,避免 backlog 一次拖垮 worker
Returns:
最舊優先的 pending dispatch 列表。
"""
async with get_db_context() as db:
result = await db.execute(
select(GovernanceRemediationDispatch)
.where(GovernanceRemediationDispatch.dispatch_status == "pending")
.where(GovernanceRemediationDispatch.executor_type == executor_type)
.order_by(GovernanceRemediationDispatch.dispatched_at.asc())
.limit(limit)
)
return list(result.scalars().all())
async def update_decision_context(
dispatch_id: str,
decision_context: dict[str, Any],
) -> GovernanceRemediationDispatch:
"""更新 dispatch 的 decision_context保留同一 row 的 audit trail。
這只更新 dispatch work item 的讀模型上下文,不修改 immutable
ai_governance_events也不代表治理事件已被解決。
Args:
dispatch_id: governance_remediation_dispatch.id
decision_context: 新的 JSONB context
Returns:
更新後的 GovernanceRemediationDispatch ORM 物件
Raises:
DispatchNotFound: 找不到 dispatch_id
"""
async with get_db_context() as db:
result = await db.execute(
select(GovernanceRemediationDispatch)
.where(GovernanceRemediationDispatch.id == dispatch_id)
)
row = result.scalar_one_or_none()
if row is None:
raise DispatchNotFound(f"dispatch_id={dispatch_id!r} 不存在")
row.decision_context = decision_context
await db.flush()
await db.refresh(row)
logger.info(
"dispatch_decision_context_updated",
dispatch_id=dispatch_id,
event_id=row.governance_event_id,
executor_type=row.executor_type,
)
return row
async def list_by_event(
event_id: str,
) -> list[GovernanceRemediationDispatch]:

View File

@@ -0,0 +1,120 @@
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, call, patch
import pytest
def _make_dispatch(**overrides):
defaults = {
"id": "dispatch-001",
"governance_event_id": "event-001",
"event_type": "knowledge_degradation",
"executor_type": "hermes_kb_growth_healthcheck",
"decision_context": {
"workflow": {
"impact": {
"stale_count": 1450,
"total_count": 1867,
"stale_ratio": 0.777,
"threshold": 0.2,
"stale_days": 7,
},
"stage_by_dispatch_status": {
"pending": "queued_kb_healthcheck",
"executing": "draft_km_updates",
"succeeded": "stale_ratio_recheck",
},
},
"ownership": {
"lead_agent": "Hermes",
"human_owner": "KM owner / SRE owner",
},
},
}
defaults.update(overrides)
return SimpleNamespace(**defaults)
def test_km_review_payload_is_review_only():
"""Hermes worker 只能產生 REVIEW 草稿,不可直接 approve/publish KM。"""
from src.jobs.hermes_kb_growth_worker import _build_km_review_entry_payload
from src.models.knowledge import EntrySource, EntryStatus, EntryType
payload = _build_km_review_entry_payload(_make_dispatch())
assert payload.entry_type == EntryType.AUTO_RUNBOOK
assert payload.source == EntrySource.AI_EXTRACTED
assert payload.status == EntryStatus.REVIEW
assert "agent:Hermes" in payload.tags
assert "needs_owner_review" in payload.tags
assert "dispatch:dispatch-001" in payload.tags
assert "writes_km_without_approval=false" in payload.content
def test_review_context_keeps_succeeded_at_owner_review_stage():
"""dispatch succeeded 代表 worker 完成草稿,不代表 KM 劣化已解決。"""
from src.jobs.hermes_kb_growth_worker import _build_review_context
context = _make_dispatch().decision_context
updated = _build_review_context(
context,
dispatch_id="dispatch-001",
governance_event_id="event-001",
km_entry_id="km-001",
)
workflow = updated["workflow"]
assert workflow["current_stage"] == "waiting_owner_review"
assert workflow["stage_by_dispatch_status"]["succeeded"] == "waiting_owner_review"
assert workflow["kb_draft_entry_id"] == "km-001"
assert workflow["writes_km_without_approval"] is False
assert updated["next_action"] == "owner_review_km_draft"
assert updated["worker_result"]["status"] == "draft_created"
@pytest.mark.asyncio
async def test_run_once_advances_pending_dispatch_to_review_draft():
"""pending dispatch 應推進到 succeeded + waiting_owner_review read model。"""
from src.jobs.hermes_kb_growth_worker import run_hermes_kb_growth_once
pending = _make_dispatch()
dispatched = _make_dispatch()
executing = _make_dispatch()
km_entry = SimpleNamespace(id="km-001")
transition_mock = AsyncMock(side_effect=[dispatched, executing, _make_dispatch()])
with (
patch(
"src.jobs.hermes_kb_growth_worker.list_pending_by_executor",
new=AsyncMock(return_value=[pending]),
) as list_pending,
patch(
"src.jobs.hermes_kb_growth_worker.transition_status",
new=transition_mock,
),
patch(
"src.jobs.hermes_kb_growth_worker._create_or_get_km_review_draft",
new=AsyncMock(return_value=km_entry),
) as create_draft,
patch(
"src.jobs.hermes_kb_growth_worker.update_decision_context",
new=AsyncMock(),
) as update_context,
):
result = await run_hermes_kb_growth_once(limit=5)
assert result == {"scanned": 1, "processed": 1, "skipped": 0, "failed": 0}
list_pending.assert_awaited_once_with("hermes_kb_growth_healthcheck", limit=5)
create_draft.assert_awaited_once_with(executing)
transition_mock.assert_has_awaits([
call("dispatch-001", "pending", "dispatched"),
call("dispatch-001", "dispatched", "executing"),
call("dispatch-001", "executing", "succeeded"),
])
update_context.assert_awaited_once()
updated_context = update_context.call_args.args[1]
assert updated_context["workflow"]["current_stage"] == "waiting_owner_review"
assert updated_context["worker_result"]["km_draft_entry_id"] == "km-001"