All checks were successful
Code Review / ai-code-review (push) Successful in 11s
Deploy Alert Rules / Deploy Prometheus Alert Rules (push) Successful in 22s
CD Pipeline / tests (push) Successful in 1m6s
CD Pipeline / build-and-deploy (push) Successful in 5m17s
CD Pipeline / post-deploy-checks (push) Successful in 1m38s
701 lines
27 KiB
Python
701 lines
27 KiB
Python
# apps/api/tests/test_governance_agent.py | 2026-04-26 @ Asia/Taipei
# 2026-04-26 P2.2 by Claude — GovernanceAgent unit tests
"""
GovernanceAgent unit tests — P2.2
=================================
Test coverage:
- check_trust_drift            : alert fires / does not fire
- check_knowledge_degradation  : alert fires / does not fire
- check_llm_hallucination      : alert fires / does not fire / empty data
- check_execution_blast_radius : alert fires / does not fire / empty data
- run_self_check               : runs every check + exception isolation
                                 (one check raising does not affect the others)
- alert_governance             : FailoverAlerter dedup logic

Test category: unit (DB and alerter are fully mocked; no real PostgreSQL dependency)
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from typing import Any
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
from src.services.governance_agent import (
|
||
GovernanceAgent,
|
||
)
|
||
|
||
|
||
# =============================================================================
|
||
# Helpers
|
||
# =============================================================================
|
||
|
||
def _make_agent(alerter=None) -> GovernanceAgent:
    """Build a GovernanceAgent with a mock alerter injected.

    When the caller supplies ``alerter`` it is passed through unchanged;
    otherwise an ``AsyncMock`` with an ``alert_governance`` coroutine mock
    is created and used instead.
    """
    if alerter is not None:
        return GovernanceAgent(alerter=alerter)

    stub_alerter = AsyncMock()
    stub_alerter.alert_governance = AsyncMock()
    return GovernanceAgent(alerter=stub_alerter)
|
||
|
||
|
||
# =============================================================================
|
||
# check_trust_drift
|
||
# =============================================================================
|
||
|
||
class TestCheckTrustDrift:
    """check_trust_drift — playbook trust-score drift."""

    @staticmethod
    def _playbook(playbook_id, trust_score, last_used_at, created_at, **extra):
        """Return a MagicMock playbook row carrying the given attributes.

        Attributes passed through ``extra`` (e.g. ``status``) are only set
        when supplied, so rows that omit them keep MagicMock's auto-attribute.
        """
        row = MagicMock()
        row.trust_score = trust_score
        row.playbook_id = playbook_id
        for attr_name, attr_value in extra.items():
            setattr(row, attr_name, attr_value)
        row.last_used_at = last_used_at
        row.created_at = created_at
        return row

    @staticmethod
    def _session_returning(rows):
        """Return a mock DB session whose execute() yields ``rows`` via scalars().all()."""
        query_result = MagicMock()
        query_result.scalars.return_value.all.return_value = rows

        session = AsyncMock()
        session.execute = AsyncMock(return_value=query_result)
        session.commit = AsyncMock()
        return session

    @staticmethod
    def _alerter():
        """Return a mock alerter exposing an awaitable ``alert_governance``."""
        alerter = AsyncMock()
        alerter.alert_governance = AsyncMock()
        return alerter

    @staticmethod
    async def _run_check(agent, session):
        """Await agent.check_trust_drift() with get_db_context patched to yield ``session``."""
        with patch("src.services.governance_agent.get_db_context") as db_ctx:
            db_ctx.return_value.__aenter__ = AsyncMock(return_value=session)
            db_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

            return await agent.check_trust_drift()

    @pytest.mark.asyncio
    async def test_no_drifted_playbooks_no_alert(self):
        """Every playbook has trust_score >= 0.2 → no alert is raised."""
        healthy = self._playbook("PB-001", 0.8, None, None)
        session = self._session_returning([healthy])

        alerter = self._alerter()
        agent = _make_agent(alerter=alerter)

        outcome = await self._run_check(agent, session)

        alerter.alert_governance.assert_not_called()
        assert outcome["drifted"] == 0
        assert outcome["checked"] == 1

    @pytest.mark.asyncio
    async def test_drifted_playbooks_trigger_alert(self):
        """A playbook with trust_score < 0.2 that was used recently → alert, no auto-deprecate."""
        from datetime import datetime, timezone

        now = datetime.now(timezone.utc)
        # Recently used, so it is expected to be kept rather than deprecated.
        low = self._playbook("PB-LOW", 0.05, now, now)
        fine = self._playbook("PB-OK", 0.9, now, now)
        session = self._session_returning([low, fine])

        alerter = self._alerter()
        agent = _make_agent(alerter=alerter)

        outcome = await self._run_check(agent, session)

        alerter.alert_governance.assert_called_once()
        positional = alerter.alert_governance.call_args[0]
        assert positional[0] == "trust_drift"
        assert positional[1]["drifted_count"] == 1
        assert positional[1]["auto_deprecated_count"] == 0
        assert outcome["drifted"] == 1
        assert outcome["auto_deprecated"] == 0
        assert outcome["checked"] == 2

    @pytest.mark.asyncio
    async def test_low_trust_unused_30d_auto_deprecates(self):
        """trust < 0.2 and last used more than 30 days ago → status becomes 'deprecated'.

        2026-05-02 ogt + Claude Sonnet 4.6: new flywheel self-governance path.
        """
        from datetime import datetime, timedelta, timezone

        long_ago = datetime.now(timezone.utc) - timedelta(days=45)
        now = datetime.now(timezone.utc)

        stale_low = self._playbook(
            "PB-STALE", 0.1, long_ago, long_ago, status="approved",
        )
        # Used just now — still inside the 7-day trial window.
        fresh_low = self._playbook(
            "PB-FRESH", 0.1, now, now, status="approved",
        )
        # Never used, but created more than 30 days ago → should be deprecated.
        never_used_old = self._playbook(
            "PB-NEVER-USED-OLD", 0.05, None, long_ago, status="approved",
        )

        session = self._session_returning([stale_low, fresh_low, never_used_old])

        alerter = self._alerter()
        agent = _make_agent(alerter=alerter)

        outcome = await self._run_check(agent, session)

        # Both old playbooks are deprecated.
        assert stale_low.status == "deprecated"
        assert never_used_old.status == "deprecated"
        # The fresh one is left alone.
        assert fresh_low.status == "approved"

        # commit must have been awaited.
        session.commit.assert_awaited()

        # The alert payload reflects the self-governance outcome.
        payload = alerter.alert_governance.call_args[0][1]
        assert payload["drifted_count"] == 3
        assert payload["auto_deprecated_count"] == 2
        assert set(payload["auto_deprecated_ids"]) == {"PB-STALE", "PB-NEVER-USED-OLD"}
        assert payload["playbook_ids"] == ["PB-FRESH"]

        assert outcome["auto_deprecated"] == 2
        assert outcome["kept"] == 1
|
||
|
||
|
||
# =============================================================================
|
||
# check_knowledge_degradation
|
||
# =============================================================================
|
||
|
||
class TestCheckKnowledgeDegradation:
    """check_knowledge_degradation — knowledge-base staleness."""

    @staticmethod
    async def _check_with_counts(total, stale):
        """Run check_knowledge_degradation against a mock DB.

        The mock session's execute() returns, in order, a result whose
        scalar() is ``total`` and then one whose scalar() is ``stale``.
        Returns ``(result, alerter)``.
        """
        total_result = MagicMock()
        total_result.scalar.return_value = total
        stale_result = MagicMock()
        stale_result.scalar.return_value = stale

        session = AsyncMock()
        session.execute = AsyncMock(side_effect=[total_result, stale_result])

        alerter = AsyncMock()
        alerter.alert_governance = AsyncMock()
        agent = _make_agent(alerter=alerter)

        with patch("src.services.governance_agent.get_db_context") as db_ctx:
            db_ctx.return_value.__aenter__ = AsyncMock(return_value=session)
            db_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

            outcome = await agent.check_knowledge_degradation()

        return outcome, alerter

    @pytest.mark.asyncio
    async def test_stale_ratio_below_threshold_no_alert(self):
        """Stale ratio < 20% → no alert."""
        # total=10, stale=1 → ratio=0.1 < 0.2
        outcome, alerter = await self._check_with_counts(total=10, stale=1)

        alerter.alert_governance.assert_not_called()
        assert outcome["stale"] == 1
        assert outcome["total"] == 10
        assert outcome["ratio"] == 0.1

    @pytest.mark.asyncio
    async def test_stale_ratio_above_threshold_triggers_alert(self):
        """Stale ratio > 20% → alert fires."""
        # total=10, stale=3 → ratio=0.3 > 0.2
        outcome, alerter = await self._check_with_counts(total=10, stale=3)

        alerter.alert_governance.assert_called_once()
        event_type = alerter.alert_governance.call_args[0][0]
        assert event_type == "knowledge_degradation"
        assert outcome["stale"] == 3
        assert outcome["ratio"] == 0.3
|
||
|
||
|
||
# =============================================================================
|
||
# check_llm_hallucination
|
||
# =============================================================================
|
||
|
||
class TestCheckLlmHallucination:
    """check_llm_hallucination — LLM hallucination rate."""

    @staticmethod
    async def _check_with_rows(rows):
        """Run check_llm_hallucination against a mock DB returning ``rows``.

        ``rows`` is what the mock query result yields from scalars().all().
        Returns ``(result, alerter)``.
        """
        query_result = MagicMock()
        query_result.scalars.return_value.all.return_value = rows

        session = AsyncMock()
        session.execute = AsyncMock(return_value=query_result)

        alerter = AsyncMock()
        alerter.alert_governance = AsyncMock()
        agent = _make_agent(alerter=alerter)

        with patch("src.services.governance_agent.get_db_context") as db_ctx:
            db_ctx.return_value.__aenter__ = AsyncMock(return_value=session)
            db_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

            outcome = await agent.check_llm_hallucination()

        return outcome, alerter

    @pytest.mark.asyncio
    async def test_empty_evidence_no_alert(self):
        """No evidence records → no alert, rate=0."""
        outcome, alerter = await self._check_with_rows([])

        alerter.alert_governance.assert_not_called()
        assert outcome["rate"] == 0.0
        assert outcome["total"] == 0

    @pytest.mark.asyncio
    async def test_hallucination_below_threshold_no_alert(self):
        """Failed ratio < 10% → no alert."""
        # 8 failed out of 100 → 8% < 10%
        statuses = ["success"] * 92 + ["failed"] * 8

        outcome, alerter = await self._check_with_rows(statuses)

        alerter.alert_governance.assert_not_called()
        assert outcome["failed"] == 8
        assert outcome["rate"] == 0.08

    @pytest.mark.asyncio
    async def test_hallucination_above_threshold_triggers_alert(self):
        """Failed ratio > 10% → alert fires."""
        # 15 failed out of 100 → 15% > 10%
        statuses = ["success"] * 85 + ["failed"] * 15

        outcome, alerter = await self._check_with_rows(statuses)

        alerter.alert_governance.assert_called_once()
        event_type = alerter.alert_governance.call_args[0][0]
        assert event_type == "llm_hallucination"
        assert outcome["failed"] == 15
        assert outcome["rate"] == 0.15
|
||
|
||
|
||
# =============================================================================
|
||
# check_execution_blast_radius
|
||
# =============================================================================
|
||
|
||
class TestCheckExecutionBlastRadius:
    """check_execution_blast_radius — execution failure rate."""

    @staticmethod
    async def _check_with_rows(rows):
        """Run check_execution_blast_radius against a mock DB returning ``rows``.

        ``rows`` is what the mock query result yields from scalars().all().
        Returns ``(result, alerter)``.
        """
        query_result = MagicMock()
        query_result.scalars.return_value.all.return_value = rows

        session = AsyncMock()
        session.execute = AsyncMock(return_value=query_result)

        alerter = AsyncMock()
        alerter.alert_governance = AsyncMock()
        agent = _make_agent(alerter=alerter)

        with patch("src.services.governance_agent.get_db_context") as db_ctx:
            db_ctx.return_value.__aenter__ = AsyncMock(return_value=session)
            db_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

            outcome = await agent.check_execution_blast_radius()

        return outcome, alerter

    @pytest.mark.asyncio
    async def test_empty_executions_no_alert(self):
        """No execution records → no alert."""
        outcome, alerter = await self._check_with_rows([])

        alerter.alert_governance.assert_not_called()
        assert outcome["total"] == 0
        assert outcome["rate"] == 0.0

    @pytest.mark.asyncio
    async def test_failure_rate_below_threshold_no_alert(self):
        """Failure ratio < 15% → no alert."""
        # 100 rows, 10 of them False → 10% < 15%
        flags = [True] * 90 + [False] * 10

        outcome, alerter = await self._check_with_rows(flags)

        alerter.alert_governance.assert_not_called()
        assert outcome["failed"] == 10
        assert outcome["rate"] == 0.1

    @pytest.mark.asyncio
    async def test_failure_rate_above_threshold_triggers_alert(self):
        """Failure ratio > 15% → alert fires."""
        # 100 rows, 20 of them False → 20% > 15%
        flags = [True] * 80 + [False] * 20

        outcome, alerter = await self._check_with_rows(flags)

        alerter.alert_governance.assert_called_once()
        event_type = alerter.alert_governance.call_args[0][0]
        assert event_type == "execution_blast_radius"
        assert outcome["failed"] == 20
        assert outcome["rate"] == 0.2
|
||
|
||
|
||
# =============================================================================
|
||
# run_self_check — exception 隔離
|
||
# =============================================================================
|
||
|
||
class TestRunSelfCheck:
    """run_self_check — runs every check and isolates per-check exceptions."""

    @staticmethod
    def _slo_stub():
        """Return an AsyncMock standing in for check_slo_compliance.

        run_self_check also runs check_slo_compliance (the all-fail test
        below expects a ``slo_compliance`` key). Leaving it unstubbed would
        let the real check run, which performs HTTP queries through
        ``httpx.AsyncClient`` — not acceptable in a unit test. The payload
        mirrors the "no data" shape asserted in TestCheckSloCompliance.
        """
        skipped = {"status": "skipped", "reason": "prometheus_nan_or_inf"}
        return AsyncMock(return_value={
            "autonomy_rate": dict(skipped),
            "decision_accuracy": dict(skipped),
            "confidence_calibration": dict(skipped),
            "km_growth_rate": dict(skipped),
            "_meta": {"status": "no_data"},
        })

    @pytest.mark.asyncio
    async def test_all_checks_run_successfully(self):
        """All checks succeed → each check has a key in results, with no error field."""
        agent = _make_agent()

        # Make every check return canned data.
        agent.check_trust_drift = AsyncMock(return_value={"checked": 5, "drifted": 0})
        agent.check_knowledge_degradation = AsyncMock(return_value={"total": 10, "stale": 1, "ratio": 0.1})
        agent.check_llm_hallucination = AsyncMock(return_value={"total": 100, "failed": 5, "rate": 0.05})
        agent.check_execution_blast_radius = AsyncMock(return_value={"total": 100, "failed": 8, "rate": 0.08})
        # Keep the test hermetic: never let the real SLO check reach the network.
        agent.check_slo_compliance = self._slo_stub()

        results = await agent.run_self_check()

        assert "trust_drift" in results
        assert "knowledge_degradation" in results
        assert "llm_hallucination" in results
        assert "execution_blast_radius" in results
        assert "error" not in results["trust_drift"]

    @pytest.mark.asyncio
    async def test_one_check_fails_others_still_run(self):
        """One check raises → the others still run; the failed one carries an error key."""
        agent = _make_agent()

        agent.check_trust_drift = AsyncMock(side_effect=RuntimeError("DB connection failed"))
        agent.check_knowledge_degradation = AsyncMock(return_value={"total": 5, "stale": 0, "ratio": 0.0})
        agent.check_llm_hallucination = AsyncMock(return_value={"total": 50, "failed": 2, "rate": 0.04})
        agent.check_execution_blast_radius = AsyncMock(return_value={"total": 50, "failed": 3, "rate": 0.06})
        # Keep the test hermetic: never let the real SLO check reach the network.
        agent.check_slo_compliance = self._slo_stub()

        results = await agent.run_self_check()

        # The failed check reports its error.
        assert "error" in results["trust_drift"]
        assert "DB connection failed" in results["trust_drift"]["error"]

        # The other three are unaffected.
        assert results["knowledge_degradation"]["total"] == 5
        assert results["llm_hallucination"]["total"] == 50
        assert results["execution_blast_radius"]["total"] == 50

    @pytest.mark.asyncio
    async def test_all_checks_fail_returns_all_errors(self):
        """Every check fails → all 5 keys carry an error (slo_compliance added 2026-04-27, P3.4)."""
        agent = _make_agent()

        for attr in ["check_trust_drift", "check_knowledge_degradation",
                     "check_llm_hallucination", "check_execution_blast_radius",
                     "check_slo_compliance"]:
            setattr(agent, attr, AsyncMock(side_effect=Exception("mock failure")))

        results = await agent.run_self_check()

        assert len(results) == 5
        for key in ["trust_drift", "knowledge_degradation", "llm_hallucination",
                    "execution_blast_radius", "slo_compliance"]:
            assert "error" in results[key]
|
||
|
||
|
||
# =============================================================================
|
||
# FailoverAlerter.alert_governance — dedup 邏輯
|
||
# =============================================================================
|
||
|
||
class TestAlertGovernance:
    """FailoverAlerter.alert_governance — dedup logic."""

    @staticmethod
    def _alerter_with_redis_set(set_impl):
        """Return a FailoverAlerter whose mock Redis client uses ``set_impl`` as ``set``."""
        from src.services.failover_alerter import FailoverAlerter

        redis_client = AsyncMock()
        redis_client.set = set_impl
        return FailoverAlerter(redis_client=redis_client)

    @pytest.mark.asyncio
    async def test_first_call_sends_message(self):
        """Redis dedup miss (first occurrence) → the alert is sent."""
        # SET NX → OK (first time)
        alerter = self._alerter_with_redis_set(AsyncMock(return_value=True))

        with patch.object(alerter, "_send", new_callable=AsyncMock) as send_spy:
            await alerter.alert_governance("trust_drift", {"drifted_count": 2})
            send_spy.assert_called_once()

    @pytest.mark.asyncio
    async def test_dedup_blocks_second_call(self):
        """Redis dedup hit (already sent) → no duplicate send."""
        # SET NX → None (key already exists)
        alerter = self._alerter_with_redis_set(AsyncMock(return_value=None))

        with patch.object(alerter, "_send", new_callable=AsyncMock) as send_spy:
            await alerter.alert_governance("trust_drift", {"drifted_count": 2})
            send_spy.assert_not_called()

    @pytest.mark.asyncio
    async def test_different_event_types_independent_dedup(self):
        """Dedup keys for different event_type values are independent of each other."""
        recorded_keys = []

        async def recording_set(key, value, ex, nx):
            # Record every dedup key and always report "first time".
            recorded_keys.append(key)
            return True

        alerter = self._alerter_with_redis_set(recording_set)

        with patch.object(alerter, "_send", new_callable=AsyncMock):
            await alerter.alert_governance("trust_drift", {})
            await alerter.alert_governance("llm_hallucination", {})

        assert len(recorded_keys) == 2
        assert any("trust_drift" in k for k in recorded_keys)
        assert any("llm_hallucination" in k for k in recorded_keys)
|
||
|
||
|
||
# =============================================================================
|
||
# B8 — run_self_check 全失敗聚合告警
|
||
# 2026-04-27 Wave8-X3 by Claude — governance silent failure alert
|
||
# =============================================================================
|
||
|
||
class TestRunSelfCheckGlobalFailureAlert:
    """When 3 or more checks fail, a governance_self_failure alert must be sent."""

    @staticmethod
    def _slo_stub():
        """Return an AsyncMock standing in for a non-failing check_slo_compliance.

        run_self_check also runs check_slo_compliance. If it is left
        unstubbed the real check runs (HTTP via ``httpx.AsyncClient``), and
        should it ever raise, the failure count in the threshold tests below
        would be off by one. The payload mirrors the "no data" shape asserted
        in TestCheckSloCompliance.
        """
        skipped = {"status": "skipped", "reason": "prometheus_nan_or_inf"}
        return AsyncMock(return_value={
            "autonomy_rate": dict(skipped),
            "decision_accuracy": dict(skipped),
            "confidence_calibration": dict(skipped),
            "km_growth_rate": dict(skipped),
            "_meta": {"status": "no_data"},
        })

    @pytest.mark.asyncio
    async def test_three_checks_fail_triggers_governance_self_failure_alert(self):
        """3 checks fail → governance_self_failure alert is triggered."""
        alerter = AsyncMock()
        alerter.alert_governance = AsyncMock()
        agent = _make_agent(alerter=alerter)

        agent.check_trust_drift = AsyncMock(side_effect=Exception("db error 1"))
        agent.check_knowledge_degradation = AsyncMock(side_effect=Exception("db error 2"))
        agent.check_llm_hallucination = AsyncMock(side_effect=Exception("db error 3"))
        agent.check_execution_blast_radius = AsyncMock(return_value={"total": 10, "failed": 0, "rate": 0.0})
        # Pin the failure count at exactly 3 and keep the test off the network.
        agent.check_slo_compliance = self._slo_stub()

        with patch("src.services.governance_agent.get_db_context") as mock_ctx:
            mock_ctx.return_value.__aenter__ = AsyncMock(return_value=AsyncMock())
            mock_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

            results = await agent.run_self_check()

        # The alert is delivered through alerter.alert_governance;
        # verify governance_self_failure was among the event types sent.
        calls = [call[0][0] for call in alerter.alert_governance.call_args_list]
        assert "governance_self_failure" in calls

        # All 3 failed checks carry an error.
        for key in ["trust_drift", "knowledge_degradation", "llm_hallucination"]:
            assert "error" in results[key]
        # The successful check has no error.
        assert "error" not in results["execution_blast_radius"]

    @pytest.mark.asyncio
    async def test_all_four_checks_fail_triggers_alert_with_four_failed(self):
        """All 5 checks fail → the alert's failed_checks lists all 5.

        slo_compliance was added on 2026-04-27 (P3.4); the method name still
        says "four" for historical reasons.
        """
        alerter = AsyncMock()
        alerter.alert_governance = AsyncMock()
        agent = _make_agent(alerter=alerter)

        for attr in ["check_trust_drift", "check_knowledge_degradation",
                     "check_llm_hallucination", "check_execution_blast_radius",
                     "check_slo_compliance"]:
            setattr(agent, attr, AsyncMock(side_effect=Exception("all down")))

        with patch("src.services.governance_agent.get_db_context") as mock_ctx:
            mock_ctx.return_value.__aenter__ = AsyncMock(return_value=AsyncMock())
            mock_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

            await agent.run_self_check()

        calls = alerter.alert_governance.call_args_list
        governance_failure_calls = [c for c in calls if c[0][0] == "governance_self_failure"]
        assert len(governance_failure_calls) >= 1

        # 2026-05-03 Claude Opus 4.7 + ogt: aligned with the nested payload structure
        # at governance_agent.py:604-624 (critic M6 fix: {status, impact, remediation,
        # actionable}); reading payload["total_checks"] directly would raise KeyError.
        payload = governance_failure_calls[0][0][1]
        assert payload["impact"]["total_checks"] == 5
        assert len(payload["impact"]["failed_checks"]) == 5

    @pytest.mark.asyncio
    async def test_two_checks_fail_does_not_trigger_governance_self_failure(self):
        """Only 2 checks fail → no governance_self_failure (below the 3-failure threshold)."""
        alerter = AsyncMock()
        alerter.alert_governance = AsyncMock()
        agent = _make_agent(alerter=alerter)

        agent.check_trust_drift = AsyncMock(side_effect=Exception("err"))
        agent.check_knowledge_degradation = AsyncMock(side_effect=Exception("err"))
        agent.check_llm_hallucination = AsyncMock(return_value={"total": 10, "failed": 0, "rate": 0.0})
        agent.check_execution_blast_radius = AsyncMock(return_value={"total": 10, "failed": 0, "rate": 0.0})
        # Without this stub a raising real SLO check would be a 3rd failure
        # and push the run over the threshold this test is guarding.
        agent.check_slo_compliance = self._slo_stub()

        with patch("src.services.governance_agent.get_db_context") as mock_ctx:
            mock_ctx.return_value.__aenter__ = AsyncMock(return_value=AsyncMock())
            mock_ctx.return_value.__aexit__ = AsyncMock(return_value=False)

            await agent.run_self_check()

        calls = [c[0][0] for c in alerter.alert_governance.call_args_list]
        assert "governance_self_failure" not in calls
|
||
|
||
|
||
class _FakePrometheusResponse:
    """Test double for an HTTP response carrying a single Prometheus sample."""

    def __init__(self, value: str) -> None:
        # Raw sample value, kept as the string it was constructed with.
        self._value = value

    def json(self) -> dict[str, Any]:
        """Return a "success" query payload whose only result holds the stored value."""
        sample = {"value": [1778756604, self._value]}
        return {"status": "success", "data": {"result": [sample]}}
|
||
|
||
|
||
class _FakePrometheusClient:
    """Async-context-manager test double patched in for ``httpx.AsyncClient``.

    Every ``get`` call records the ``query`` entry of its ``params`` keyword
    argument in ``queries`` and answers with a fake response carrying the
    value given at construction.
    """

    def __init__(self, value: str) -> None:
        self._value = value
        # Query strings seen by get(), in call order.
        self.queries: list[str] = []

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # False → exceptions raised inside the ``async with`` body propagate.
        return False

    async def get(self, *args, **kwargs):  # noqa: ANN002, ANN003
        """Record the requested query and return a canned response."""
        request_params = kwargs.get("params", {})
        query_text = str(request_params.get("query", ""))
        self.queries.append(query_text)
        return _FakePrometheusResponse(self._value)
|
||
|
||
|
||
class TestCheckSloCompliance:
    """check_slo_compliance — SLO evaluation from Prometheus query results."""

    @pytest.mark.asyncio
    async def test_non_finite_prometheus_value_is_skipped_not_ok(self):
        """A Prometheus NaN means the denominator has no valid events yet; it must not be judged ok."""
        slo_names = (
            "autonomy_rate",
            "decision_accuracy",
            "confidence_calibration",
            "km_growth_rate",
        )
        agent = _make_agent()
        fake_client = _FakePrometheusClient("NaN")

        with patch("httpx.AsyncClient", return_value=fake_client):
            result = await agent.check_slo_compliance()

        for slo in slo_names:
            entry = result[slo]
            assert entry["status"] == "skipped"
            assert entry["reason"] == "prometheus_nan_or_inf"
        assert result["_meta"]["status"] == "no_data"

    @pytest.mark.asyncio
    async def test_km_growth_prefers_db_derived_24h_gauge(self):
        """The KM SLO must prefer the DB-derived 24h gauge so a warming-up counter is not misreported as 0."""
        expected_query = "max(knowledge_entries_created_24h) or max(sli:km_growth_rate:24h)"
        agent = _make_agent()
        fake_client = _FakePrometheusClient("25")

        with patch("httpx.AsyncClient", return_value=fake_client):
            result = await agent.check_slo_compliance()

        assert expected_query in fake_client.queries
        km_entry = result["km_growth_rate"]
        assert km_entry["status"] == "ok"
        assert km_entry["value"] == 25
|