fix(governance): dedupe km degradation owner review
All checks were successful
CD Pipeline / tests (push) Successful in 5m4s
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / build-and-deploy (push) Successful in 4m29s
CD Pipeline / post-deploy-checks (push) Successful in 1m38s

This commit is contained in:
Your Name
2026-05-24 16:14:51 +08:00
parent 7fd52d26b5
commit de68514283
3 changed files with 275 additions and 12 deletions

View File

@@ -20,12 +20,13 @@ from datetime import timedelta
from typing import Any
import structlog
from sqlalchemy import func, select
from sqlalchemy import func, select, update
from src.db.base import get_db_context
from src.db.models import (
AiGovernanceEvent,
AutoRepairExecution,
GovernanceRemediationDispatch,
IncidentEvidence,
KnowledgeEntryRecord,
PlaybookRecord,
@@ -53,6 +54,7 @@ KM_STALE_RATIO = 0.20 # 陳舊比例超過此值 → 告警
HALLUCINATION_RATE_THRESHOLD = 0.10 # LLM verification failed 比例超過此值 → 告警
EXECUTION_FAIL_RATE_THRESHOLD = 0.15 # 執行失敗比例超過此值 → 告警
RECENT_LIMIT = 100 # 最近幾筆做統計
GOVERNANCE_SELF_CHECK_LEASE_KEY = "governance:self_check:cycle_lease"
# =============================================================================
@@ -211,6 +213,21 @@ class GovernanceAgent:
ratio = stale / total if total > 0 else 0.0
if total > 0 and ratio > KM_STALE_RATIO:
if await _has_open_knowledge_degradation_review():
logger.info(
"governance_knowledge_degradation_alert_suppressed",
reason="open_owner_review_exists",
total=total,
stale=stale,
ratio=round(ratio, 3),
)
return {
"total": total,
"stale": stale,
"ratio": round(ratio, 3),
"alert_suppressed": True,
"suppress_reason": "open_owner_review_exists",
}
await self._alert(
"knowledge_degradation",
{
@@ -259,7 +276,10 @@ class GovernanceAgent:
stale=stale,
ratio=round(ratio, 3),
)
return {"total": total, "stale": stale, "ratio": round(ratio, 3)}
result = {"total": total, "stale": stale, "ratio": round(ratio, 3)}
if total > 0 and ratio <= KM_STALE_RATIO:
result["resolved_open_events"] = await _resolve_open_knowledge_degradation_events()
return result
# =========================================================================
# 3. LLM 幻覺率
@@ -413,9 +433,10 @@ class GovernanceAgent:
2026-04-27 P3.4 by Claude — AI SLOADR-100
"""
import httpx
import math
import httpx
from src.core.config import settings
prom_url = getattr(settings, "PROMETHEUS_URL", "http://prometheus.observability.svc:9090")
@@ -757,6 +778,70 @@ class GovernanceAgent:
logger.warning("governance_telegram_alert_failed", error=str(e))
async def _has_open_knowledge_degradation_review() -> bool:
    """Tell whether a Hermes owner-review dispatch is already open for KM staleness.

    Several API pods start the governance loop at the same time. Once a stale
    ratio has already reached a Hermes review draft it is treated as the same
    unfinished governance work, so that Telegram / Work Items do not receive a
    new governance event and REVIEW draft on every cycle.

    Returns:
        True when at least one ``hermes_kb_growth_healthcheck`` dispatch in
        status pending / dispatched / executing / succeeded is attached to an
        unresolved ``knowledge_degradation`` event. False when none is found,
        and also False (fail-open) when the lookup itself raises.
    """
    try:
        async with get_db_context() as db:
            # Only the existence of a row matters, hence the single-id select
            # capped at one row.
            lookup = (
                select(GovernanceRemediationDispatch.id)
                .join(
                    AiGovernanceEvent,
                    GovernanceRemediationDispatch.governance_event_id == AiGovernanceEvent.id,
                )
                .where(AiGovernanceEvent.event_type == "knowledge_degradation")
                .where(AiGovernanceEvent.resolved.is_(False))
                .where(GovernanceRemediationDispatch.event_type == "knowledge_degradation")
                .where(
                    GovernanceRemediationDispatch.executor_type
                    == "hermes_kb_growth_healthcheck"
                )
                .where(
                    GovernanceRemediationDispatch.dispatch_status.in_(
                        ["pending", "dispatched", "executing", "succeeded"]
                    )
                )
                .order_by(GovernanceRemediationDispatch.dispatched_at.desc())
                .limit(1)
            )
            rows = await db.execute(lookup)
            dispatch_id = rows.scalar_one_or_none()
            return dispatch_id is not None
    except Exception as exc:
        # Fail-open: a broken lookup must not silence the alert path.
        logger.warning(
            "governance_knowledge_degradation_review_lookup_failed_fail_open",
            error=str(exc),
        )
        return False
async def _resolve_open_knowledge_degradation_events() -> int:
    """Close unresolved ``knowledge_degradation`` events once the stale ratio recovers.

    Issues a single bulk UPDATE that sets ``resolved=True`` and stamps
    ``resolved_at`` on every unresolved ``knowledge_degradation`` event.

    Returns:
        The row count reported by the UPDATE, or 0 when the statement raises.
    """
    try:
        async with get_db_context() as db:
            close_stmt = (
                update(AiGovernanceEvent)
                .where(AiGovernanceEvent.event_type == "knowledge_degradation")
                .where(AiGovernanceEvent.resolved.is_(False))
                .values(resolved=True, resolved_at=now_taipei())
                .execution_options(synchronize_session=False)
            )
            outcome = await db.execute(close_stmt)
            resolved_count = int(outcome.rowcount or 0)
            if not resolved_count:
                # Nothing was updated, so there is nothing to commit or report.
                return resolved_count
            await db.commit()
            logger.info(
                "governance_knowledge_degradation_resolved",
                resolved_count=resolved_count,
                reason="stale_ratio_recovered",
            )
            return resolved_count
    except Exception as exc:
        # Best-effort cleanup: log and report zero instead of failing the check.
        logger.warning(
            "governance_knowledge_degradation_resolve_failed",
            error=str(exc),
        )
        return 0
async def _maybe_create_intake_dispatch(
event_id: str,
event_type: str,
@@ -891,7 +976,40 @@ async def run_governance_loop(interval_seconds: int = 3600) -> None:
agent = get_governance_agent()
while True:
try:
await agent.run_self_check()
if await _try_acquire_governance_self_check_lease(interval_seconds):
await agent.run_self_check()
else:
logger.debug(
"governance_self_check_cycle_skipped",
reason="cycle_lease_held",
)
except Exception as e:
logger.warning("governance_loop_error", error=str(e))
await asyncio.sleep(interval_seconds)
async def _try_acquire_governance_self_check_lease(interval_seconds: int) -> bool:
    """Try to take the cross-pod lease for one self-check cycle.

    This is a per-cycle cooldown, not a critical-section lock: the lease is
    never released explicitly after it is taken. Until the TTL expires the
    other replicas simply skip the cycle, so one governance state is not
    written as several events and several Hermes KM drafts by different pods.

    Args:
        interval_seconds: Loop interval; used as the lease TTL with a floor of
            60 seconds.

    Returns:
        True when this caller set the lease key, False when the key already
        existed. Also True (fail-open) when Redis cannot be reached, so the
        self-check keeps running.
    """
    lease_ttl = max(int(interval_seconds), 60)
    try:
        from src.core.redis_client import get_redis

        client = get_redis()
        # NX makes the SET succeed only for the first caller within the TTL.
        was_set = await client.set(
            GOVERNANCE_SELF_CHECK_LEASE_KEY,
            "1",
            nx=True,
            ex=lease_ttl,
        )
        return bool(was_set)
    except Exception as exc:
        logger.warning(
            "governance_self_check_lease_unavailable_fail_open",
            error=str(exc),
        )
        return True

View File

@@ -25,7 +25,6 @@ from src.services.governance_agent import (
GovernanceAgent,
)
# =============================================================================
# Helpers
# =============================================================================
@@ -78,9 +77,9 @@ class TestCheckTrustDrift:
@pytest.mark.asyncio
async def test_drifted_playbooks_trigger_alert(self):
"""有 playbook trust_score < 0.2 + 最近用過 → 觸發告警,不 auto-deprecate"""
from datetime import datetime, timezone
from datetime import UTC, datetime
recent = datetime.now(timezone.utc)
recent = datetime.now(UTC)
low_record = MagicMock()
low_record.trust_score = 0.05
low_record.playbook_id = "PB-LOW"
@@ -125,10 +124,10 @@ class TestCheckTrustDrift:
2026-05-02 ogt + Claude Sonnet 4.6: 飛輪自治新路徑
"""
from datetime import datetime, timedelta, timezone
from datetime import UTC, datetime, timedelta
old = datetime.now(timezone.utc) - timedelta(days=45)
recent = datetime.now(timezone.utc)
old = datetime.now(UTC) - timedelta(days=45)
recent = datetime.now(UTC)
stale_low = MagicMock()
stale_low.trust_score = 0.1
@@ -214,13 +213,20 @@ class TestCheckKnowledgeDegradation:
alerter.alert_governance = AsyncMock()
agent = _make_agent(alerter=alerter)
with patch("src.services.governance_agent.get_db_context") as mock_ctx:
with (
patch("src.services.governance_agent.get_db_context") as mock_ctx,
patch(
"src.services.governance_agent._resolve_open_knowledge_degradation_events",
new=AsyncMock(return_value=0),
) as mock_resolve,
):
mock_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
mock_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
result = await agent.check_knowledge_degradation()
alerter.alert_governance.assert_not_called()
mock_resolve.assert_awaited_once()
assert result["stale"] == 1
assert result["total"] == 10
assert result["ratio"] == 0.1
@@ -249,12 +255,17 @@ class TestCheckKnowledgeDegradation:
"src.services.governance_agent.create_dispatch",
new=AsyncMock(),
) as mock_create_dispatch,
patch(
"src.services.governance_agent._has_open_knowledge_degradation_review",
new=AsyncMock(return_value=False),
) as mock_has_open_review,
):
mock_ctx.return_value.__aenter__ = AsyncMock(return_value=mock_db)
mock_ctx.return_value.__aexit__ = AsyncMock(return_value=False)
result = await agent.check_knowledge_degradation()
mock_has_open_review.assert_awaited_once()
alerter.alert_governance.assert_called_once()
call_args = alerter.alert_governance.call_args
assert call_args[0][0] == "knowledge_degradation"
@@ -274,9 +285,49 @@ class TestCheckKnowledgeDegradation:
assert result["stale"] == 3
assert result["ratio"] == 0.3
@pytest.mark.asyncio
async def test_stale_ratio_above_threshold_suppresses_when_owner_review_open(self):
    """With a Hermes owner-review open, the same KM stale state sends no new Telegram alert or draft."""
    # The first execute() yields the total count, the second the stale count.
    total_result = MagicMock()
    total_result.scalar.return_value = 10
    stale_result = MagicMock()
    stale_result.scalar.return_value = 3

    db_session = AsyncMock()
    db_session.execute = AsyncMock(side_effect=[total_result, stale_result])

    alerter = AsyncMock()
    alerter.alert_governance = AsyncMock()
    agent = _make_agent(alerter=alerter)

    with (
        patch("src.services.governance_agent.get_db_context") as db_ctx,
        patch(
            "src.services.governance_agent.create_dispatch",
            new=AsyncMock(),
        ) as create_dispatch_mock,
        patch(
            "src.services.governance_agent._has_open_knowledge_degradation_review",
            new=AsyncMock(return_value=True),
        ) as open_review_mock,
    ):
        db_ctx.return_value.__aenter__ = AsyncMock(return_value=db_session)
        db_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

        result = await agent.check_knowledge_degradation()

    # 3 / 10 exceeds the threshold, so the open-review lookup must run once
    # and then suppress both the alert and the dispatch.
    open_review_mock.assert_awaited_once()
    alerter.alert_governance.assert_not_called()
    create_dispatch_mock.assert_not_awaited()
    assert result["alert_suppressed"] is True
    assert result["suppress_reason"] == "open_owner_review_exists"
def test_knowledge_degradation_dispatch_context(self):
"""intake dispatch context 必須能被 Work Items 直接讀出 owner / stage / next_action."""
from src.services.governance_agent import _build_knowledge_degradation_dispatch_context
from src.services.governance_agent import (
_build_knowledge_degradation_dispatch_context,
)
ctx = _build_knowledge_degradation_dispatch_context(
"evt-km-001",
@@ -542,6 +593,60 @@ class TestRunSelfCheck:
assert "error" in results[key]
# =============================================================================
# Governance self-check cycle lease
# =============================================================================
class TestGovernanceSelfCheckLease:
    """Across several API pods, only one may write governance events per cycle."""

    @pytest.mark.asyncio
    async def test_cycle_lease_acquired(self):
        """A truthy SET reply yields True, and SET is issued with NX and the interval as TTL."""
        from src.services.governance_agent import (
            GOVERNANCE_SELF_CHECK_LEASE_KEY,
            _try_acquire_governance_self_check_lease,
        )

        fake_redis = AsyncMock()
        fake_redis.set = AsyncMock(return_value=True)

        with patch("src.core.redis_client.get_redis", return_value=fake_redis):
            got_lease = await _try_acquire_governance_self_check_lease(3600)

        assert got_lease is True
        fake_redis.set.assert_awaited_once_with(
            GOVERNANCE_SELF_CHECK_LEASE_KEY,
            "1",
            ex=3600,
            nx=True,
        )

    @pytest.mark.asyncio
    async def test_cycle_lease_blocks_second_pod(self):
        """A None reply from SET (key already present) yields False."""
        from src.services.governance_agent import (
            _try_acquire_governance_self_check_lease,
        )

        fake_redis = AsyncMock()
        fake_redis.set = AsyncMock(return_value=None)

        with patch("src.core.redis_client.get_redis", return_value=fake_redis):
            got_lease = await _try_acquire_governance_self_check_lease(3600)

        assert got_lease is False

    @pytest.mark.asyncio
    async def test_cycle_lease_fail_open_when_redis_unavailable(self):
        """When obtaining the Redis client raises, the lease helper fails open with True."""
        from src.services.governance_agent import (
            _try_acquire_governance_self_check_lease,
        )

        redis_down = RuntimeError("redis down")
        with patch("src.core.redis_client.get_redis", side_effect=redis_down):
            got_lease = await _try_acquire_governance_self_check_lease(3600)

        assert got_lease is True
# =============================================================================
# FailoverAlerter.alert_governance — dedup 邏輯
# =============================================================================

View File

@@ -1,3 +1,43 @@
## 2026-05-24T153 KM degradation governance dedupe / owner-review lifecycle
**觸發**
- Telegram `AI 治理警報KM 需要更新(影響 AI 判斷)` 顯示 `1490 / 3016` stale KM、`stale_ratio=49.4%`,使用者詢問這類告警應如何接續處理,以及陳舊資料要如何收斂。
- live API 查核確認 Hermes 其實有接手:最新 `knowledge_degradation` event 已有 `hermes_kb_growth_healthcheck` dispatch,狀態為 `succeeded / waiting_owner_review`,並產生 `kb_draft_entry_id`。
- 真正噪音來源是 production `awoooi-api` 有 2 個 replicas,而每個 API Pod 都會啟動 `governance_agent` loop;同一個 KM stale 狀態會被多個 Pod 寫成多筆治理事件,再各自產生 KM review draft。
**修正**
- `GovernanceAgent.check_knowledge_degradation()` 在 stale ratio 超標時,若已有 unresolved `knowledge_degradation` 且存在 `hermes_kb_growth_healthcheck` 的 pending / dispatched / executing / succeeded dispatch,就不再新增 Telegram 告警、治理事件與 KM review draft。
- `run_governance_loop()` 新增 Redis cycle lease:同一個 self-check 週期只允許一個 API Pod 執行,避免多 replica 同步寫入重複治理事件;Redis 不可用時 fail-open,維持治理自檢不中斷。
- stale ratio 回到門檻內時,會把 unresolved `knowledge_degradation` 事件標為 resolved,讓「治理品質恢復」能在 AwoooP 裡收斂,而不是永遠留在未解清單。
- 補測試覆蓋:
- stale ratio 超標且已有 owner-review 時不重複送告警 / 建草稿。
- governance self-check cycle lease acquire / second pod blocked / Redis unavailable fail-open。
**local verification**
```text
python3 -m py_compile apps/api/src/services/governance_agent.py apps/api/tests/test_governance_agent.py -> OK
DATABASE_URL='postgresql+asyncpg://test:test@localhost/test' REDIS_URL='redis://localhost:6379/0' /Users/ogt/.pyenv/versions/3.11.7/bin/python -m pytest apps/api/tests/test_governance_agent.py apps/api/tests/test_hermes_kb_growth_worker.py apps/api/tests/test_governance_dispatcher.py apps/api/tests/test_ai_governance_endpoints.py -q
-> 90 passed
DATABASE_URL='postgresql+asyncpg://test:test@localhost/test' REDIS_URL='redis://localhost:6379/0' /Users/ogt/.pyenv/versions/3.11.7/bin/python -m ruff check apps/api/src/services/governance_agent.py apps/api/tests/test_governance_agent.py -> OK
```
**處置判讀**
- 這類告警不是服務故障,不應重啟 API / Redis / K8s workload。
- 正確接續流程是:治理事件偵測 → Hermes 建立 KM healthcheck review draft → OpenClaw 提供 Incident / 規則 / PlayBook 脈絡 → ElephantAlpha read-only 稽核 → KM/SRE owner 審核高影響草稿 → 審核後才 writeback / archive / recheck stale ratio。
- 陳舊 KM 不等於錯誤 KM,不得只改 `updated_at` 來壓低 stale ratio。應分三類處理:仍有效但需補證據的更新、被新條目取代的 archive/supersede、最近被 Incident / Sentry / SigNoz / PlayBook 引用的高優先級 owner review。
**目前整體進度**
- AwoooP 告警可觀測鏈:約 97.5%。
- 治理告警可讀性 / 可處置性:約 93%。
- KM stale governance 自動化:約 78%。
- 前端 AI 自動化管理介面同步:約 92.5%。
- 完整 AI 自動化管理產品化:約 90%。
## 2026-05-24T152 Ansible runtime readiness surfaced
**觸發**