From edf97ad8cab83b22eb07f0aa69d3d66116830f7b Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 19 May 2026 22:32:55 +0800 Subject: [PATCH] feat(governance): process hermes km healthchecks --- apps/api/src/jobs/hermes_kb_growth_worker.py | 304 ++++++++++++++++++ apps/api/src/main.py | 10 + .../governance_remediation_dispatch_repo.py | 69 ++++ .../api/tests/test_hermes_kb_growth_worker.py | 120 +++++++ 4 files changed, 503 insertions(+) create mode 100644 apps/api/src/jobs/hermes_kb_growth_worker.py create mode 100644 apps/api/tests/test_hermes_kb_growth_worker.py diff --git a/apps/api/src/jobs/hermes_kb_growth_worker.py b/apps/api/src/jobs/hermes_kb_growth_worker.py new file mode 100644 index 00000000..ecf17904 --- /dev/null +++ b/apps/api/src/jobs/hermes_kb_growth_worker.py @@ -0,0 +1,304 @@ +""" +Hermes KB Growth Worker +======================= + +消費 governance_remediation_dispatch 中的 hermes_kb_growth_healthcheck work item, +把 knowledge_degradation 告警推進成可審核的 KM 草稿。 + +邊界: +- 可以建立 REVIEW 狀態的 auto_runbook 草稿,讓 owner 在前端審核。 +- 不可以直接把 KM 標成 APPROVED / PUBLISHED。 +- 不修改 immutable ai_governance_events;流程進度寫回 dispatch.decision_context。 + +2026-05-19 ogt + Codex: T90 Hermes KB growth healthcheck worker。 +""" + +from __future__ import annotations + +import asyncio +from copy import deepcopy +from typing import Any + +import structlog + +from src.db.base import get_db_context +from src.db.models import GovernanceRemediationDispatch +from src.models.knowledge import ( + EntrySource, + EntryStatus, + EntryType, + KnowledgeEntry, + KnowledgeEntryCreate, +) +from src.repositories.governance_remediation_dispatch_repo import ( + InvalidStatusTransition, + list_pending_by_executor, + transition_status, + update_decision_context, +) +from src.repositories.knowledge_repository import KnowledgeDBRepository + +logger = structlog.get_logger(__name__) + +EXECUTOR_TYPE = "hermes_kb_growth_healthcheck" +DEFAULT_INTERVAL_SECONDS = 300 +DEFAULT_LIMIT = 20 + + +async def run_hermes_kb_growth_once(limit: int = DEFAULT_LIMIT) -> dict[str, int]: + """執行一輪 Hermes KB growth healthcheck。 + + Returns: + 統計資訊,供 log / smoke test 判讀。 + """ + rows = await list_pending_by_executor(EXECUTOR_TYPE, limit=limit) + result = { + "scanned": len(rows), + "processed": 0, + "skipped": 0, + "failed": 0, + } + + for row in rows: + try: + await _process_dispatch(row) + result["processed"] += 1 + except InvalidStatusTransition as exc: + result["skipped"] += 1 + logger.info( + "hermes_kb_growth_dispatch_skipped", + dispatch_id=row.id, + event_id=row.governance_event_id, + reason=str(exc), + ) + except Exception as exc: + result["failed"] += 1 + logger.exception( + "hermes_kb_growth_dispatch_failed", + dispatch_id=row.id, + event_id=row.governance_event_id, + error=str(exc), + ) + await _mark_failed_if_started(row.id, str(exc)) + + if any(result.values()): + logger.info("hermes_kb_growth_once_completed", **result) + return result + + +async def run_hermes_kb_growth_loop( + interval_seconds: int = DEFAULT_INTERVAL_SECONDS, + limit: int = DEFAULT_LIMIT, +) -> None: + """背景 loop:定期消費 Hermes KB growth dispatch。""" + logger.info( + "hermes_kb_growth_loop_started", + interval_seconds=interval_seconds, + limit=limit, + ) + while True: + try: + await run_hermes_kb_growth_once(limit=limit) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.exception("hermes_kb_growth_loop_error", error=str(exc)) + await asyncio.sleep(interval_seconds) + + +async def _process_dispatch(row: GovernanceRemediationDispatch) -> None: + """處理單筆 pending dispatch,最後停在 waiting_owner_review。""" + dispatched = await transition_status(row.id, "pending", "dispatched") + executing = await transition_status(dispatched.id, "dispatched", "executing") + + km_entry = await _create_or_get_km_review_draft(executing) + updated_context = _build_review_context( + executing.decision_context or {}, + dispatch_id=executing.id, + governance_event_id=executing.governance_event_id, + km_entry_id=km_entry.id, + ) + await update_decision_context(executing.id, updated_context) + await transition_status(executing.id, "executing", "succeeded") + + logger.info( + "hermes_kb_growth_review_draft_ready", + dispatch_id=executing.id, + event_id=executing.governance_event_id, + km_entry_id=km_entry.id, + workflow_stage="waiting_owner_review", + ) + + +async def _create_or_get_km_review_draft( + dispatch: GovernanceRemediationDispatch, +) -> KnowledgeEntry: + """以 dispatch tag 做冪等,建立或取得 REVIEW 狀態 KM 草稿。""" + dispatch_tag = f"dispatch:{dispatch.id}" + payload = _build_km_review_entry_payload(dispatch) + + async with get_db_context() as db: + repo = KnowledgeDBRepository(db) + existing, _ = await repo.list_entries(tags=[dispatch_tag], limit=1) + if existing: + return existing[0] + return await repo.create(payload) + + +def _build_km_review_entry_payload( + dispatch: GovernanceRemediationDispatch, +) -> KnowledgeEntryCreate: + """把 governance dispatch 轉成待審核的 KM 草稿 payload。""" + context = dispatch.decision_context or {} + workflow = context.get("workflow") if isinstance(context.get("workflow"), dict) else {} + impact = workflow.get("impact") if isinstance(workflow.get("impact"), dict) else {} + extra = context.get("extra") if isinstance(context.get("extra"), dict) else {} + ownership = context.get("ownership") if isinstance(context.get("ownership"), dict) else {} + if not ownership and isinstance(extra.get("ownership"), dict): + ownership = extra["ownership"] + + stale_count = _pick_first(impact, extra, key="stale_count") + total_count = _pick_first(impact, extra, key="total_count") + stale_ratio = _pick_first(impact, context, key="stale_ratio") + threshold = _pick_first(impact, context, key="threshold") + stale_days = _pick_first(impact, extra, key="stale_days") + lead_agent = ownership.get("lead_agent") or "Hermes" + human_owner = ownership.get("human_owner") or "KM owner / SRE owner" + + content = "\n".join([ + "# KM 健康檢查草稿", + "", + "## 來源", + f"- governance_event_id: {dispatch.governance_event_id}", + f"- dispatch_id: {dispatch.id}", + f"- executor_type: {dispatch.executor_type}", + "", + "## 影響摘要", + f"- stale_count: {_format_unknown(stale_count)}", + f"- total_count: {_format_unknown(total_count)}", + f"- stale_ratio: {_format_ratio(stale_ratio)}", + f"- threshold: {_format_ratio(threshold)}", + f"- stale_days: {_format_unknown(stale_days)}", + "", + "## AI 已完成", + "- Hermes 已接手 knowledge_degradation dispatch。", + "- 已產生 KM 更新草稿與 owner review work item。", + "- 尚未把任何條目標成 approved / published。", + "", + "## Owner 審核重點", + "- 優先反查最近被 Incident、Sentry、SigNoz、PlayBook 引用的 KM。", + "- 確認草稿內容沒有把過期處置方式寫回正式知識庫。", + "- 審核通過後再進入 km_writeback_after_approval。", + "", + "## 安全邊界", + "- writes_km_without_approval=false", + f"- lead_agent={lead_agent}", + f"- human_owner={human_owner}", + ]) + + return KnowledgeEntryCreate( + title=f"KM healthcheck review draft - {dispatch.governance_event_id[:8]}", + content=content, + entry_type=EntryType.AUTO_RUNBOOK, + category="AI治理", + tags=[ + "governance:knowledge_degradation", + "workflow:kb_growth_healthcheck", + "stage:waiting_owner_review", + "agent:Hermes", + "needs_owner_review", + f"dispatch:{dispatch.id}", + f"governance_event:{dispatch.governance_event_id}", + ], + source=EntrySource.AI_EXTRACTED, + status=EntryStatus.REVIEW, + path_type="hermes_kb_growth_healthcheck", + created_by="hermes_kb_growth_worker", + ) + + +def _build_review_context( + context: dict[str, Any], + *, + dispatch_id: str, + governance_event_id: str, + km_entry_id: str, +) -> dict[str, Any]: + """更新 dispatch read model,讓 Work Items/Telegram 可見目前停在 owner review。""" + updated = deepcopy(context) + workflow = updated.setdefault("workflow", {}) + if not isinstance(workflow, dict): + workflow = {} + updated["workflow"] = workflow + + stages = workflow.setdefault("stage_by_dispatch_status", {}) + if not isinstance(stages, dict): + stages = {} + workflow["stage_by_dispatch_status"] = stages + stages.update({ + "executing": "draft_km_updates", + "succeeded": "waiting_owner_review", + "failed": "needs_manual_km_triage", + }) + + workflow["current_stage"] = "waiting_owner_review" + workflow["next_action"] = "owner_review_km_draft" + workflow["needs_human_review"] = True + workflow["writes_km_without_approval"] = False + workflow["kb_draft_entry_id"] = km_entry_id + + updated["next_action"] = "owner_review_km_draft" + updated["decision_path"] = "draft_created_waiting_owner_review" + updated["proposed_action"] = "Hermes 已建立 KM 更新草稿,等待 owner 審核" + updated["worker_result"] = { + "worker": "Hermes", + "executor_type": EXECUTOR_TYPE, + "dispatch_id": dispatch_id, + "governance_event_id": governance_event_id, + "km_draft_entry_id": km_entry_id, + "stage": "waiting_owner_review", + "status": "draft_created", + "writes_km_without_approval": False, + } + return updated + + +async def _mark_failed_if_started(dispatch_id: str, error: str) -> None: + """若 worker 已取得 dispatch,將它收斂到 failed,保留錯誤。""" + for from_status in ("executing", "dispatched"): + try: + await transition_status( + dispatch_id, + from_status, + "failed", + last_error=error[:500], + ) + return + except InvalidStatusTransition: + continue + except Exception as exc: + logger.warning( + "hermes_kb_growth_mark_failed_failed", + dispatch_id=dispatch_id, + from_status=from_status, + error=str(exc), + ) + return + + +def _pick_first(*sources: dict[str, Any], key: str) -> Any: + for source in sources: + if key in source: + return source[key] + return None + + +def _format_unknown(value: Any) -> str: + return "unknown" if value is None else str(value) + + +def _format_ratio(value: Any) -> str: + try: + return f"{float(value) * 100:.1f}%" + except (TypeError, ValueError): + return "unknown" diff --git a/apps/api/src/main.py b/apps/api/src/main.py index f911bfda..a47687df 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -668,6 +668,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("governance_dispatcher_schedule_failed", error=str(e)) + # T90 2026-05-19 ogt + Codex: Hermes KB growth worker(每 5 分鐘) + # 消費 knowledge_degradation 的 hermes_kb_growth_healthcheck dispatch, + # 只產生 REVIEW 草稿並停在 owner review,不直接批准或發布 KM。 + try: + from src.jobs.hermes_kb_growth_worker import run_hermes_kb_growth_loop + asyncio.create_task(run_hermes_kb_growth_loop()) + logger.info("hermes_kb_growth_worker_scheduled", interval_sec=300) + except Exception as e: + logger.warning("hermes_kb_growth_worker_schedule_failed", error=str(e)) + # 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan # OllamaFailoverManager + OllamaAutoRecoveryService 飛輪接線: # failover 切換時 → recovery_callback → set_current_primary → Redis 持久化 diff --git a/apps/api/src/repositories/governance_remediation_dispatch_repo.py b/apps/api/src/repositories/governance_remediation_dispatch_repo.py index 90fd5840..3318bf54 100644 --- a/apps/api/src/repositories/governance_remediation_dispatch_repo.py +++ b/apps/api/src/repositories/governance_remediation_dispatch_repo.py @@ -356,6 +356,75 @@ async def list_pending( return list(result.scalars().all()) +async def list_pending_by_executor( + executor_type: str, + *, + limit: int = 50, +) -> list[GovernanceRemediationDispatch]: + """列出指定 executor 的 pending dispatch(按 dispatched_at ASC)。 + + 用於 Hermes / 其他 worker 消費自己的 work item。由 repository 層集中查詢, + 避免 job 直接散落表名與狀態條件。 + + Args: + executor_type: dispatch.executor_type,例如 hermes_kb_growth_healthcheck + limit: 本輪最多取幾筆,避免 backlog 一次拖垮 worker + + Returns: + 最舊優先的 pending dispatch 列表。 + """ + async with get_db_context() as db: + result = await db.execute( + select(GovernanceRemediationDispatch) + .where(GovernanceRemediationDispatch.dispatch_status == "pending") + .where(GovernanceRemediationDispatch.executor_type == executor_type) + .order_by(GovernanceRemediationDispatch.dispatched_at.asc()) + .limit(limit) + ) + return list(result.scalars().all()) + + +async def update_decision_context( + dispatch_id: str, + decision_context: dict[str, Any], +) -> GovernanceRemediationDispatch: + """更新 dispatch 的 decision_context,保留同一 row 的 audit trail。 + + 這只更新 dispatch work item 的讀模型上下文,不修改 immutable + ai_governance_events,也不代表治理事件已被解決。 + + Args: + dispatch_id: governance_remediation_dispatch.id + decision_context: 新的 JSONB context + + Returns: + 更新後的 GovernanceRemediationDispatch ORM 物件 + + Raises: + DispatchNotFound: 找不到 dispatch_id + """ + async with get_db_context() as db: + result = await db.execute( + select(GovernanceRemediationDispatch) + .where(GovernanceRemediationDispatch.id == dispatch_id) + ) + row = result.scalar_one_or_none() + if row is None: + raise DispatchNotFound(f"dispatch_id={dispatch_id!r} 不存在") + + row.decision_context = decision_context + await db.flush() + await db.refresh(row) + + logger.info( + "dispatch_decision_context_updated", + dispatch_id=dispatch_id, + event_id=row.governance_event_id, + executor_type=row.executor_type, + ) + return row + + async def list_by_event( event_id: str, ) -> list[GovernanceRemediationDispatch]: diff --git a/apps/api/tests/test_hermes_kb_growth_worker.py b/apps/api/tests/test_hermes_kb_growth_worker.py new file mode 100644 index 00000000..4652d3ca --- /dev/null +++ b/apps/api/tests/test_hermes_kb_growth_worker.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, call, patch + +import pytest + + +def _make_dispatch(**overrides): + defaults = { + "id": "dispatch-001", + "governance_event_id": "event-001", + "event_type": "knowledge_degradation", + "executor_type": "hermes_kb_growth_healthcheck", + "decision_context": { + "workflow": { + "impact": { + "stale_count": 1450, + "total_count": 1867, + "stale_ratio": 0.777, + "threshold": 0.2, + "stale_days": 7, + }, + "stage_by_dispatch_status": { + "pending": "queued_kb_healthcheck", + "executing": "draft_km_updates", + "succeeded": "stale_ratio_recheck", + }, + }, + "ownership": { + "lead_agent": "Hermes", + "human_owner": "KM owner / SRE owner", + }, + }, + } + defaults.update(overrides) + return SimpleNamespace(**defaults) + + +def test_km_review_payload_is_review_only(): + """Hermes worker 只能產生 REVIEW 草稿,不可直接 approve/publish KM。""" + from src.jobs.hermes_kb_growth_worker import _build_km_review_entry_payload + from src.models.knowledge import EntrySource, EntryStatus, EntryType + + payload = _build_km_review_entry_payload(_make_dispatch()) + + assert payload.entry_type == EntryType.AUTO_RUNBOOK + assert payload.source == EntrySource.AI_EXTRACTED + assert payload.status == EntryStatus.REVIEW + assert "agent:Hermes" in payload.tags + assert "needs_owner_review" in payload.tags + assert "dispatch:dispatch-001" in payload.tags + assert "writes_km_without_approval=false" in payload.content + + +def test_review_context_keeps_succeeded_at_owner_review_stage(): + """dispatch succeeded 代表 worker 完成草稿,不代表 KM 劣化已解決。""" + from src.jobs.hermes_kb_growth_worker import _build_review_context + + context = _make_dispatch().decision_context + updated = _build_review_context( + context, + dispatch_id="dispatch-001", + governance_event_id="event-001", + km_entry_id="km-001", + ) + + workflow = updated["workflow"] + assert workflow["current_stage"] == "waiting_owner_review" + assert workflow["stage_by_dispatch_status"]["succeeded"] == "waiting_owner_review" + assert workflow["kb_draft_entry_id"] == "km-001" + assert workflow["writes_km_without_approval"] is False + assert updated["next_action"] == "owner_review_km_draft" + assert updated["worker_result"]["status"] == "draft_created" + + +@pytest.mark.asyncio +async def test_run_once_advances_pending_dispatch_to_review_draft(): + """pending dispatch 應推進到 succeeded + waiting_owner_review read model。""" + from src.jobs.hermes_kb_growth_worker import run_hermes_kb_growth_once + + pending = _make_dispatch() + dispatched = _make_dispatch() + executing = _make_dispatch() + km_entry = SimpleNamespace(id="km-001") + + transition_mock = AsyncMock(side_effect=[dispatched, executing, _make_dispatch()]) + + with ( + patch( + "src.jobs.hermes_kb_growth_worker.list_pending_by_executor", + new=AsyncMock(return_value=[pending]), + ) as list_pending, + patch( + "src.jobs.hermes_kb_growth_worker.transition_status", + new=transition_mock, + ), + patch( + "src.jobs.hermes_kb_growth_worker._create_or_get_km_review_draft", + new=AsyncMock(return_value=km_entry), + ) as create_draft, + patch( + "src.jobs.hermes_kb_growth_worker.update_decision_context", + new=AsyncMock(), + ) as update_context, + ): + result = await run_hermes_kb_growth_once(limit=5) + + assert result == {"scanned": 1, "processed": 1, "skipped": 0, "failed": 0} + list_pending.assert_awaited_once_with("hermes_kb_growth_healthcheck", limit=5) + create_draft.assert_awaited_once_with(executing) + transition_mock.assert_has_awaits([ + call("dispatch-001", "pending", "dispatched"), + call("dispatch-001", "dispatched", "executing"), + call("dispatch-001", "executing", "succeeded"), + ]) + update_context.assert_awaited_once() + updated_context = update_context.call_args.args[1] + assert updated_context["workflow"]["current_stage"] == "waiting_owner_review" + assert updated_context["worker_result"]["km_draft_entry_id"] == "km-001"