Files
awoooi/apps/api/tests/test_ai_governance_endpoints.py
Your Name 739a8e0f78
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 3m35s
CD Pipeline / build-and-deploy (push) Successful in 3m50s
CD Pipeline / post-deploy-checks (push) Successful in 1m42s
feat(governance): link work items to event history
2026-05-20 11:03:52 +08:00

894 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# apps/api/tests/test_ai_governance_endpoints.py | 2026-05-02 @ Asia/Taipei
"""
Unit Tests — AI Governance Endpoints (PR 1)
覆蓋範圍:
1. events endpoint 分頁邏輯正確
2. events endpoint severity 映射正確critical / warning / info
3. queue endpoint graceful fallbackmock ProgrammingError
4. summary endpoint compliance_rate 計算(含 total=0 邊界)
5. summary endpoint compliance_rate 計算(有 unresolved 的正常情況)
測試策略mock service 層函式,不依賴 DB確保 Router 邏輯正確。
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.ai_governance import router
from src.models.governance import (
DailyCount,
DispatchItem,
GovernanceEvent,
GovernanceEventsResponse,
GovernanceQueueResponse,
GovernanceSummaryResponse,
KnowledgeReviewDraftArchiveRequest,
KnowledgeReviewDraftArchiveResponse,
KnowledgeReviewDraftDedupeGroup,
KnowledgeReviewDraftDedupeResponse,
KnowledgeReviewDraftStaleRatioSnapshot,
map_severity,
)
from src.services.governance_km_review_service import (
KmReviewDraftArchiveError,
_build_dry_run_plan_fingerprint,
_build_stale_ratio_recheck_context,
_validate_archive_request_against_plan,
_validate_dry_run_plan_fingerprint,
)
from src.services.governance_query_service import (
_build_km_review_draft_dedupe_groups,
_extract_archived_count,
_extract_dry_run_plan_fingerprint,
_extract_governance_event_id_from_tags,
_extract_kb_draft_entry_id,
_extract_remediation,
_extract_stale_ratio_snapshot,
_extract_worker_status,
_merge_dispatch_ids,
_query_dispatch_table,
_to_governance_event,
)
TAIPEI = timezone(timedelta(hours=8))
NOW = datetime(2026, 5, 2, 12, 0, tzinfo=TAIPEI)
# =============================================================================
# Fixture
# =============================================================================
@pytest.fixture
def client():
app = FastAPI()
app.include_router(router, prefix="/api/v1")
return TestClient(app)
def _make_event(
event_id: str = "evt-001",
event_type: str = "slo_violation",
resolved: bool = False,
) -> GovernanceEvent:
return GovernanceEvent(
id=event_id,
event_type=event_type,
severity=map_severity(event_type),
triggered_at=NOW,
resolved=resolved,
resolved_at=None,
impact="SLO violated",
details={"message": "test"},
remediation=None,
dispatch_ids=[],
)
# =============================================================================
# 1. severity 映射單元測試
# =============================================================================
class TestSeverityMapping:
def test_critical_types(self):
for et in ("slo_violation", "conservative_mode", "governance_slo_data_gap"):
assert map_severity(et) == "critical", f"{et} should be critical"
def test_warning_types(self):
for et in ("trust_drift", "kb_stale", "knowledge_degradation", "execution_blast_radius"):
assert map_severity(et) == "warning", f"{et} should be warning"
def test_info_types(self):
for et in ("replay_degraded", "self_demotion", "llm_hallucination", "unknown_event"):
assert map_severity(et) == "info", f"{et} should be info"
# =============================================================================
# 2. events endpoint 分頁
# =============================================================================
class TestEventsEndpoint:
def test_pagination_default(self, client):
"""page=1 size=20 預設分頁正確."""
fake_response = GovernanceEventsResponse(
items=[_make_event(str(i)) for i in range(5)],
total=5,
page=1,
size=20,
)
with patch(
"src.api.v1.ai_governance.query_governance_events",
new_callable=lambda: lambda **kw: None,
):
with patch(
"src.api.v1.ai_governance.query_governance_events",
new=AsyncMock(return_value=fake_response),
):
r = client.get("/api/v1/ai/governance/events")
assert r.status_code == 200
data = r.json()
assert data["total"] == 5
assert data["page"] == 1
assert data["size"] == 20
assert len(data["items"]) == 5
def test_pagination_custom(self, client):
"""自訂分頁參數傳入 service."""
fake_response = GovernanceEventsResponse(
items=[_make_event()],
total=50,
page=3,
size=10,
)
captured: dict = {}
async def mock_query(**kwargs):
captured.update(kwargs)
return fake_response
with patch("src.api.v1.ai_governance.query_governance_events", new=mock_query):
r = client.get("/api/v1/ai/governance/events?page=3&size=10")
assert r.status_code == 200
assert captured["page"] == 3
assert captured["size"] == 10
data = r.json()
assert data["total"] == 50
def test_severity_filter_passed(self, client):
"""severity query param 正確傳入 service."""
fake_response = GovernanceEventsResponse(items=[], total=0, page=1, size=20)
captured: dict = {}
async def mock_query(**kwargs):
captured.update(kwargs)
return fake_response
with patch("src.api.v1.ai_governance.query_governance_events", new=mock_query):
r = client.get("/api/v1/ai/governance/events?severity=critical")
assert r.status_code == 200
assert captured["severity"] == "critical"
def test_event_id_filter_passed(self, client):
"""event_id query param 供 Telegram 詳情 / 歷史精準回看."""
fake_response = GovernanceEventsResponse(items=[], total=0, page=1, size=20)
captured: dict = {}
async def mock_query(**kwargs):
captured.update(kwargs)
return fake_response
with patch("src.api.v1.ai_governance.query_governance_events", new=mock_query):
r = client.get(
"/api/v1/ai/governance/events"
"?event_id=evt-001&event_id=evt-002&status=unresolved"
)
assert r.status_code == 200
assert captured["event_ids"] == ["evt-001", "evt-002"]
assert captured["status"] == "unresolved"
def test_invalid_severity_rejected(self, client):
"""非法 severity 值應被拒絕422."""
r = client.get("/api/v1/ai/governance/events?severity=bad_value")
assert r.status_code == 422
def test_invalid_status_rejected(self, client):
"""非法 status 值應被拒絕422."""
r = client.get("/api/v1/ai/governance/events?status=invalid")
assert r.status_code == 422
def test_severity_in_response(self, client):
"""回傳的事件 severity 欄位對應 event_type 映射."""
events = [
_make_event("e1", "slo_violation"), # critical
_make_event("e2", "trust_drift"), # warning
_make_event("e3", "self_demotion"), # info
]
fake_response = GovernanceEventsResponse(items=events, total=3, page=1, size=20)
with patch(
"src.api.v1.ai_governance.query_governance_events",
new=AsyncMock(return_value=fake_response),
):
r = client.get("/api/v1/ai/governance/events")
assert r.status_code == 200
items = r.json()["items"]
assert items[0]["severity"] == "critical"
assert items[1]["severity"] == "warning"
assert items[2]["severity"] == "info"
class TestEventsReadSideNormalization:
def test_remediation_dict_is_normalized_to_string(self):
"""production details.remediation 可能是 dictresponse schema 必須仍回字串."""
remediation = _extract_remediation({
"remediation": {
"items": [
"補齊 ADR-100 SLO emitter",
"設置 PROMETHEUS_MULTIPROC_DIR",
]
}
})
assert remediation == "補齊 ADR-100 SLO emitter設置 PROMETHEUS_MULTIPROC_DIR"
def test_governance_event_accepts_dict_remediation(self):
"""dict remediation 不應讓 GovernanceEvent Pydantic validation 變成 500."""
row = type("Row", (), {
"id": "evt-001",
"event_type": "governance_slo_data_gap",
"triggered_at": NOW,
"resolved": False,
"resolved_at": None,
"details": {
"message": "SLO metrics missing",
"remediation": {"items": ["補齊 SLO emitter"]},
},
})()
event = _to_governance_event(row)
assert event.remediation == "補齊 SLO emitter"
assert event.impact == "SLO metrics missing"
def test_governance_event_uses_db_dispatch_ids_first(self):
"""events read model 應以 dispatch table ids 補齊 detail/history 鏈路。"""
row = type("Row", (), {
"id": "evt-001",
"event_type": "knowledge_degradation",
"triggered_at": NOW,
"resolved": False,
"resolved_at": None,
"details": {
"message": "KM stale",
"dispatch_ids": ["legacy-dispatch"],
},
})()
event = _to_governance_event(row, dispatch_ids=["db-dispatch", "legacy-dispatch"])
assert event.dispatch_ids == ["db-dispatch", "legacy-dispatch"]
def test_merge_dispatch_ids_deduplicates_and_preserves_db_priority(self):
"""DB truth-firstlegacy payload 只作 fallback。"""
assert _merge_dispatch_ids(
["db-2", "db-1"],
["db-1", "legacy-1", None],
) == ["db-2", "db-1", "legacy-1"]
def test_events_query_joins_dispatch_table_for_history_buttons(self):
"""events endpoint 不可只讀 details.dispatch_ids必須查 dispatch table。"""
import inspect
from src.services import governance_query_service
source = inspect.getsource(governance_query_service._load_dispatch_ids_for_events)
assert "FROM governance_remediation_dispatch d" in source
assert "d.governance_event_id IN :event_ids" in source
assert "ORDER BY d.dispatched_at DESC" in source
# =============================================================================
# 3. queue endpoint graceful fallback
# =============================================================================
class TestQueueEndpoint:
def test_graceful_fallback_on_programming_error(self, client):
"""dispatch 表不存在時回 table_pending=true不拋 500."""
fallback = GovernanceQueueResponse(
items=[], total=0, page=1, size=10, table_pending=True,
)
with patch(
"src.api.v1.ai_governance.query_governance_queue",
new=AsyncMock(return_value=fallback),
):
r = client.get("/api/v1/ai/governance/queue")
assert r.status_code == 200
data = r.json()
assert data["table_pending"] is True
assert data["items"] == []
assert data["total"] == 0
def test_normal_response_when_table_ready(self, client):
"""表就緒時正常回傳 items."""
dispatch_item = DispatchItem(
id="d-001",
governance_event_id="evt-001",
event_type="slo_violation",
dispatch_status="pending",
proposed_action="restart deployment",
playbook_id=None,
playbook_trust=None,
created_at=NOW,
dispatched_at=None,
completed_at=None,
operator_note=None,
)
normal = GovernanceQueueResponse(
items=[dispatch_item], total=1, page=1, size=10, table_pending=False,
)
with patch(
"src.api.v1.ai_governance.query_governance_queue",
new=AsyncMock(return_value=normal),
):
r = client.get("/api/v1/ai/governance/queue")
assert r.status_code == 200
data = r.json()
assert data["table_pending"] is False
assert len(data["items"]) == 1
assert data["items"][0]["dispatch_status"] == "pending"
def test_invalid_dispatch_status_rejected(self, client):
"""非法 dispatch_status 應被拒絕422."""
r = client.get("/api/v1/ai/governance/queue?dispatch_status=unknown")
assert r.status_code == 422
def test_all_status_and_event_type_filter_passed(self, client):
"""Work Items 需要 all + event_type 查 KM healthcheck 全狀態鏈。"""
normal = GovernanceQueueResponse(
items=[], total=0, page=1, size=20, table_pending=False,
)
captured: dict = {}
async def mock_query(**kwargs):
captured.update(kwargs)
return normal
with patch("src.api.v1.ai_governance.query_governance_queue", new=mock_query):
r = client.get(
"/api/v1/ai/governance/queue?dispatch_status=all"
"&event_type=knowledge_degradation&size=20"
)
assert r.status_code == 200
assert captured["dispatch_status"] == "all"
assert captured["event_types"] == ["knowledge_degradation"]
assert captured["size"] == 20
def test_queue_item_exposes_kb_draft_review_context(self, client):
"""Work Items 需要看到 Hermes 草稿 ID 與 worker 狀態,而不只 dispatch succeeded。"""
item = DispatchItem(
id="dispatch-001",
governance_event_id="event-001",
event_type="knowledge_degradation",
dispatch_status="succeeded",
executor_type="hermes_kb_growth_healthcheck",
proposed_action="Hermes 已建立 KM 更新草稿,等待 owner 審核",
created_at=NOW,
workflow_stage="waiting_owner_review",
next_action="owner_review_km_draft",
lead_agent="Hermes",
human_owner="KM owner / SRE owner",
kb_draft_entry_id="km-001",
worker_status="draft_created",
)
normal = GovernanceQueueResponse(
items=[item], total=1, page=1, size=20, table_pending=False,
)
with patch(
"src.api.v1.ai_governance.query_governance_queue",
new=AsyncMock(return_value=normal),
):
r = client.get(
"/api/v1/ai/governance/queue?dispatch_status=all"
"&event_type=knowledge_degradation&size=20"
)
assert r.status_code == 200
data = r.json()
assert data["items"][0]["kb_draft_entry_id"] == "km-001"
assert data["items"][0]["worker_status"] == "draft_created"
def test_extract_kb_draft_review_context_from_decision_context(self):
"""queue read model 從 workflow / worker_result 補出 KM 草稿追蹤欄位。"""
context = {
"workflow": {"kb_draft_entry_id": "km-workflow"},
"worker_result": {
"km_draft_entry_id": "km-worker",
"status": "draft_created",
},
}
assert _extract_kb_draft_entry_id(context) == "km-workflow"
assert _extract_worker_status(context) == "draft_created"
def test_extract_km_archive_audit_context_from_decision_context(self):
"""queue read model 應把 archive audit / stale ratio recheck 證據交給 Work Items。"""
context = {
"archived_count": 5,
"dry_run_plan_fingerprint": "sha256:" + "a" * 64,
"stale_ratio_snapshot": {
"stale_count": 120,
"total_count": 200,
"stale_ratio": 0.6,
"threshold": 0.2,
"stale_days": 7,
"extra": "ignored",
},
"workflow": {
"archived_entry_ids": ["fallback"],
"dry_run_plan_fingerprint": "sha256:" + "b" * 64,
},
}
assert _extract_dry_run_plan_fingerprint(context) == "sha256:" + "a" * 64
assert _extract_archived_count(context) == 5
snapshot = _extract_stale_ratio_snapshot(context)
assert snapshot == {
"stale_count": 120,
"total_count": 200,
"stale_ratio": 0.6,
"threshold": 0.2,
"stale_days": 7,
}
def test_queue_query_uses_production_dispatch_schema(self):
"""queue 查詢必須對齊 migration schema使用 dispatched_at不讀不存在的 created_at/operator_note."""
import inspect
source = inspect.getsource(_query_dispatch_table)
assert "d.dispatched_at AS created_at" in source
assert "ORDER BY d.dispatched_at DESC" in source
assert "NULL::text AS operator_note" in source
assert "CAST(:dispatch_status AS governance_dispatch_status)" in source
assert "e.event_type::text IN :event_types" in source
assert "d.created_at" not in source
assert "d.operator_note" not in source
class TestKmReviewDraftDedupe:
def test_endpoint_passes_limit_and_returns_owner_review_plan(self, client):
"""Work Items 需要 read-only canonical/duplicate plan不直接封存 KM。"""
fake = KnowledgeReviewDraftDedupeResponse(
total_review_drafts=3,
event_group_total=1,
duplicate_draft_total=2,
generated_at=NOW,
groups=[
KnowledgeReviewDraftDedupeGroup(
governance_event_id="event-001",
canonical_entry_id="km-canonical",
canonical_title="KM healthcheck review draft - event",
preferred_source="dispatch_context",
duplicate_entry_ids=["km-dup-1", "km-dup-2"],
duplicate_count=2,
total_entries=3,
suggested_action="owner_review_canonical_then_archive_duplicates",
owner_action="review_canonical_and_archive_duplicate_drafts",
writes_on_read=False,
can_archive_without_owner_approval=False,
archive_history=[
DispatchItem(
id="dispatch-archive-001",
governance_event_id="event-001",
event_type="knowledge_degradation",
dispatch_status="succeeded",
executor_type="hermes_km_review_dedupe_owner_archive",
proposed_action="Owner archived duplicate KM drafts",
created_at=NOW,
workflow_stage="km_duplicate_archive_after_owner_approval",
dry_run_plan_fingerprint="sha256:" + "a" * 64,
archived_count=2,
)
],
)
],
)
captured: dict = {}
async def mock_query(**kwargs):
captured.update(kwargs)
return fake
with patch("src.api.v1.ai_governance.query_km_review_draft_dedupe", new=mock_query):
r = client.get("/api/v1/ai/governance/km-review-drafts/dedupe?limit=50")
assert r.status_code == 200
assert captured["limit"] == 50
data = r.json()
assert data["duplicate_draft_total"] == 2
assert data["groups"][0]["canonical_entry_id"] == "km-canonical"
assert data["groups"][0]["writes_on_read"] is False
assert data["groups"][0]["can_archive_without_owner_approval"] is False
assert data["groups"][0]["archive_history"][0]["executor_type"] == (
"hermes_km_review_dedupe_owner_archive"
)
assert data["groups"][0]["archive_history"][0]["archived_count"] == 2
def test_governance_event_tag_extraction(self):
assert _extract_governance_event_id_from_tags([
"agent:Hermes",
"governance_event:event-001",
]) == "event-001"
assert _extract_governance_event_id_from_tags(["agent:Hermes"]) is None
def test_build_dedupe_groups_prefers_dispatch_context_draft(self):
"""若 dispatch context 指出 canonical draftdedupe plan 應以它為準。"""
rows = [
{
"id": "km-latest",
"title": "latest draft",
"tags": ["governance_event:event-001"],
"updated_at": datetime(2026, 5, 2, 14, 0, tzinfo=TAIPEI),
},
{
"id": "km-canonical",
"title": "canonical draft",
"tags": ["governance_event:event-001"],
"updated_at": datetime(2026, 5, 2, 13, 0, tzinfo=TAIPEI),
},
{
"id": "km-other",
"title": "other event draft",
"tags": ["governance_event:event-002"],
"updated_at": datetime(2026, 5, 2, 12, 0, tzinfo=TAIPEI),
},
]
groups = _build_km_review_draft_dedupe_groups(
rows,
{"event-001": "km-canonical"},
{
"event-001": [
DispatchItem(
id="dispatch-recheck-001",
governance_event_id="event-001",
event_type="knowledge_degradation",
dispatch_status="succeeded",
executor_type="hermes_km_stale_ratio_recheck",
proposed_action="Rechecked stale ratio",
created_at=NOW,
workflow_stage="stale_ratio_recheck",
stale_ratio_snapshot={
"stale_count": 10,
"total_count": 100,
"stale_ratio": 0.1,
"threshold": 0.2,
"stale_days": 7,
},
)
]
},
)
first = groups[0]
assert first.governance_event_id == "event-001"
assert first.canonical_entry_id == "km-canonical"
assert first.preferred_source == "dispatch_context"
assert first.duplicate_entry_ids == ["km-latest"]
assert first.writes_on_read is False
assert first.archive_history[0].executor_type == "hermes_km_stale_ratio_recheck"
assert first.archive_history[0].stale_ratio_snapshot["stale_ratio"] == pytest.approx(0.1)
def test_archive_endpoint_requires_owner_shape_and_returns_audit_result(self, client):
"""Owner 批准後的 archive endpoint 應回傳 KM write 與 audit write 結果。"""
fake = KnowledgeReviewDraftArchiveResponse(
governance_event_id="event-001",
canonical_entry_id="km-canonical",
requested_duplicate_entry_ids=["km-dup-1", "km-dup-2"],
archived_entry_ids=["km-dup-1", "km-dup-2"],
status="archived",
owner="operator_console",
owner_approved=True,
dry_run=False,
writes_km=True,
writes_governance_audit=True,
audit_dispatch_id="dispatch-audit-001",
stale_ratio_snapshot=KnowledgeReviewDraftStaleRatioSnapshot(
stale_count=120,
total_count=200,
stale_ratio=0.6,
threshold=0.2,
stale_days=7,
),
stale_ratio_recheck_status="completed",
stale_ratio_recheck_dispatch_id="dispatch-recheck-001",
generated_at=NOW,
)
captured: dict = {}
async def mock_archive(**kwargs):
captured.update(kwargs)
return fake
with patch(
"src.api.v1.ai_governance.archive_km_review_draft_duplicates",
new=mock_archive,
):
r = client.post(
"/api/v1/ai/governance/km-review-drafts/dedupe/event-001/archive-duplicates",
json={
"canonical_entry_id": "km-canonical",
"duplicate_entry_ids": ["km-dup-1", "km-dup-2"],
"owner": "operator_console",
"owner_approved": True,
"dry_run": False,
"dry_run_plan_fingerprint": "sha256:" + "a" * 64,
},
)
assert r.status_code == 200
assert captured["governance_event_id"] == "event-001"
assert captured["request"].owner_approved is True
assert captured["request"].dry_run_plan_fingerprint == "sha256:" + "a" * 64
data = r.json()
assert data["status"] == "archived"
assert data["writes_km"] is True
assert data["writes_governance_audit"] is True
assert data["audit_dispatch_id"] == "dispatch-audit-001"
assert data["stale_ratio_recheck_status"] == "completed"
assert data["stale_ratio_recheck_dispatch_id"] == "dispatch-recheck-001"
assert data["stale_ratio_snapshot"]["stale_ratio"] == pytest.approx(0.6)
def test_archive_endpoint_maps_validation_error_to_http(self, client):
async def mock_archive(**kwargs):
raise KmReviewDraftArchiveError(409, "stale plan")
with patch(
"src.api.v1.ai_governance.archive_km_review_draft_duplicates",
new=mock_archive,
):
r = client.post(
"/api/v1/ai/governance/km-review-drafts/dedupe/event-001/archive-duplicates",
json={
"canonical_entry_id": "km-canonical",
"duplicate_entry_ids": ["km-dup-1"],
"owner_approved": True,
},
)
assert r.status_code == 409
assert r.json()["detail"] == "stale plan"
def test_archive_plan_validation_rejects_stale_duplicates(self):
group = KnowledgeReviewDraftDedupeGroup(
governance_event_id="event-001",
canonical_entry_id="km-canonical",
canonical_title="canonical",
preferred_source="dispatch_context",
duplicate_entry_ids=["km-dup-1", "km-dup-2"],
duplicate_count=2,
total_entries=3,
suggested_action="owner_review_canonical_then_archive_duplicates",
owner_action="review_canonical_and_archive_duplicate_drafts",
writes_on_read=False,
can_archive_without_owner_approval=False,
)
request = KnowledgeReviewDraftArchiveRequest(
canonical_entry_id="km-canonical",
duplicate_entry_ids=["km-dup-1"],
owner_approved=True,
)
with pytest.raises(KmReviewDraftArchiveError) as exc:
_validate_archive_request_against_plan(group, request, ["km-dup-1"])
assert exc.value.status_code == 409
assert "latest dedupe plan" in exc.value.detail
def test_archive_plan_fingerprint_is_required_before_write(self):
group = KnowledgeReviewDraftDedupeGroup(
governance_event_id="event-001",
canonical_entry_id="km-canonical",
canonical_title="canonical",
preferred_source="dispatch_context",
duplicate_entry_ids=["km-dup-2", "km-dup-1"],
duplicate_count=2,
total_entries=3,
suggested_action="owner_review_canonical_then_archive_duplicates",
owner_action="review_canonical_and_archive_duplicate_drafts",
writes_on_read=False,
can_archive_without_owner_approval=False,
)
fingerprint = _build_dry_run_plan_fingerprint("event-001", group)
assert fingerprint.startswith("sha256:")
assert fingerprint == _build_dry_run_plan_fingerprint("event-001", group)
_validate_dry_run_plan_fingerprint(fingerprint, fingerprint)
with pytest.raises(KmReviewDraftArchiveError) as missing:
_validate_dry_run_plan_fingerprint(None, fingerprint)
assert missing.value.status_code == 403
assert "dry_run_plan_fingerprint" in missing.value.detail
with pytest.raises(KmReviewDraftArchiveError) as mismatch:
_validate_dry_run_plan_fingerprint("sha256:" + "0" * 64, fingerprint)
assert mismatch.value.status_code == 409
assert "latest dedupe plan" in mismatch.value.detail
def test_stale_ratio_recheck_context_is_operator_visible(self):
snapshot = KnowledgeReviewDraftStaleRatioSnapshot(
stale_count=120,
total_count=200,
stale_ratio=0.6,
threshold=0.2,
stale_days=7,
)
request = KnowledgeReviewDraftArchiveRequest(
canonical_entry_id="km-canonical",
duplicate_entry_ids=["km-dup-1", "km-dup-2"],
owner="operator_console",
owner_approved=True,
dry_run_plan_fingerprint="sha256:" + "a" * 64,
)
ctx = _build_stale_ratio_recheck_context(
governance_event_id="event-001",
request=request,
archived_ids=["km-dup-1", "km-dup-2"],
stale_ratio_snapshot=snapshot,
)
assert ctx["next_action"] == "run_stale_ratio_recheck"
assert ctx["decision_path"] == "owner_approved_recheck_after_archive"
assert ctx["workflow"]["work_kind"] == "km_stale_ratio_recheck"
assert ctx["workflow"]["current_stage"] == "stale_ratio_recheck"
assert ctx["workflow"]["stage_by_dispatch_status"]["pending"] == "stale_ratio_recheck"
assert ctx["workflow"]["writes_km"] is False
assert ctx["workflow"]["dry_run_plan_fingerprint"] == "sha256:" + "a" * 64
assert ctx["workflow"]["stale_ratio_snapshot"]["stale_ratio"] == pytest.approx(0.6)
assert ctx["worker_result"]["status"] == "stale_ratio_rechecked"
assert ctx["worker_result"]["above_threshold"] is True
# =============================================================================
# 4. summary endpoint compliance_rate
# =============================================================================
class TestSummaryEndpoint:
def test_compliance_rate_normal(self, client):
"""有 unresolved 時計算 1 - unresolved/total."""
fake = GovernanceSummaryResponse(
compliance_rate=0.8,
total_events=10,
unresolved_count=2,
daily_counts=[],
)
with patch(
"src.api.v1.ai_governance.query_governance_summary",
new=AsyncMock(return_value=fake),
):
r = client.get("/api/v1/ai/governance/summary")
assert r.status_code == 200
data = r.json()
assert data["compliance_rate"] == pytest.approx(0.8)
assert data["total_events"] == 10
assert data["unresolved_count"] == 2
def test_compliance_rate_all_resolved(self, client):
"""全部已解決時 compliance_rate = 1.0."""
fake = GovernanceSummaryResponse(
compliance_rate=1.0,
total_events=5,
unresolved_count=0,
daily_counts=[],
)
with patch(
"src.api.v1.ai_governance.query_governance_summary",
new=AsyncMock(return_value=fake),
):
r = client.get("/api/v1/ai/governance/summary?days=7")
assert r.status_code == 200
assert r.json()["compliance_rate"] == pytest.approx(1.0)
def test_compliance_rate_total_zero(self, client):
"""total_events=0 時 compliance_rate = 1.0(邊界測試)."""
fake = GovernanceSummaryResponse(
compliance_rate=1.0,
total_events=0,
unresolved_count=0,
daily_counts=[],
)
with patch(
"src.api.v1.ai_governance.query_governance_summary",
new=AsyncMock(return_value=fake),
):
r = client.get("/api/v1/ai/governance/summary")
assert r.status_code == 200
data = r.json()
assert data["compliance_rate"] == pytest.approx(1.0)
assert data["total_events"] == 0
def test_days_max_boundary(self, client):
"""days=90 邊界值應被接受."""
fake = GovernanceSummaryResponse(
compliance_rate=1.0, total_events=0, unresolved_count=0, daily_counts=[],
)
with patch(
"src.api.v1.ai_governance.query_governance_summary",
new=AsyncMock(return_value=fake),
):
r = client.get("/api/v1/ai/governance/summary?days=90")
assert r.status_code == 200
def test_days_over_max_rejected(self, client):
"""days=91 應被拒絕422."""
r = client.get("/api/v1/ai/governance/summary?days=91")
assert r.status_code == 422
def test_daily_counts_structure(self, client):
"""daily_counts 結構正確."""
fake = GovernanceSummaryResponse(
compliance_rate=0.9,
total_events=10,
unresolved_count=1,
daily_counts=[
DailyCount(date="2026-05-01", total=3, by_type={"slo_violation": 2, "trust_drift": 1}),
DailyCount(date="2026-05-02", total=7, by_type={"slo_violation": 7}),
],
)
with patch(
"src.api.v1.ai_governance.query_governance_summary",
new=AsyncMock(return_value=fake),
):
r = client.get("/api/v1/ai/governance/summary")
assert r.status_code == 200
counts = r.json()["daily_counts"]
assert len(counts) == 2
assert counts[0]["date"] == "2026-05-01"
assert counts[0]["by_type"]["slo_violation"] == 2
# =============================================================================
# 5. service 層 compliance_rate 純函式測試(不經 HTTP
# =============================================================================
class TestComplianceRateCalculation:
"""直接測試 service 邏輯,不經 Router。"""
def test_formula_normal(self):
"""1 - 2/10 = 0.8"""
rate = round(1.0 - 2 / 10, 4)
assert rate == pytest.approx(0.8)
def test_formula_zero_total(self):
"""total=0 → 1.0"""
total = 0
rate = 1.0 if total == 0 else round(1.0 - 0 / total, 4)
assert rate == pytest.approx(1.0)
def test_formula_all_unresolved(self):
"""1 - 5/5 = 0.0"""
rate = round(1.0 - 5 / 5, 4)
assert rate == pytest.approx(0.0)