Files
awoooi/apps/api/tests/test_km_writer.py
Your Name c5753e1c57 fix(critic-review): KMWriter 名實統一 + Alertmanager 修抑制 + drift checker AST 化
critic PR review 揭示已 push commits 的 7 個 blocker,本 commit 全部修復。

## C1 + C2 + M1 + M2 + M3 — KMWriter 真正統一契約(critic 最嚴重 5 條)

### C1 km_writer.py:194 — backfill 自打臉修
- 裸 asyncio.create_task(_backfill_path_a_approval) → await _backfill_path_a_approval_safe()
- 同步 await + 獨立 DLQ km:backfill:dlq + try/except 不阻塞主寫入
- 新增 km_backfill_reconciler_job.py(每 5 分鐘掃 DLQ)+ ENABLE_KM_BACKFILL_RECONCILER flag
- 防 Path B 比 Path A 先完成 → related_approval_id 永遠 NULL 的 race

### C2 km_writer.py:391 — KM_WRITE_AWAIT=false 路徑收緊
- 從 ensure_future(fire-and-forget 比舊版同步寫更糟)
- 改 await writer.write(retry=1, timeout=2.0)(仍 await 但只試一次、超時短)
- docstring 明確標註「緊急回滾用,不保證可靠性」

### M1 decision_manager.py:2178/2203 — 移除 _fire_and_forget 旁路
- 兩處 _fire_and_forget(executor.write_execution_result_to_km(...))
- 改 await asyncio.shield(...) + BaseException 保護(防上層 cancel 中斷)
- KM_WRITE_AWAIT=true 在這條路徑終於真正 await

### M2 incident_service.py:1099 — 自製 path 加 retry+DLQ
- 原本 if settings.KM_WRITE_AWAIT: await asyncio.wait_for else create_task
- 改 3 次指數退避 retry + DLQ 保護(呼叫 km_writer 私有 helper)

### M3 km_writer.py:166 — 冪等聲明對齊實作
- knowledge_repository.create() 加 UPSERT 路徑(pg_insert ON CONFLICT DO UPDATE)
- KnowledgeEntryCreate / KnowledgeEntryRecord 加 path_type 欄位
- migration: ADD COLUMN path_type + partial unique index uix_knowledge_incident_path

## M4 alertmanager.yml — equal: [] 收緊(critic 防爆炸抑制)
- OllamaInstanceDown / KMConverterDown 抑制加 equal: ['cluster'] 約束
- 防多 cluster 場景下任一 Ollama down 誤抑全 AI/SLO 告警

## M5 Alertmanager 版本驗證(已確認 v0.31.1,遠超 v0.22+)

## M6 governance_agent.py — health score 區分 skipped vs ok vs violated
- check_slo_compliance 加 _meta {violated_count, skipped_count, ok_count, all_skipped, status}
- run_self_check: SLO 全 skipped 時獨立發 governance_slo_data_gap 告警
  (不污染 self_failure 計數,因為 no_data 是 emitter 未實作不是治理機制故障)

## M7 scripts/check_config_drift.py — 改 AST 解析
- regex 改 ast.parse 找 Settings ClassDef AnnAssign Field(default=...)
- 避免多行 list / default_factory= / 含跳行字串的 false negative
- 4 欄位(AI_FALLBACK_ORDER / ARGOCD_URL / PROMETHEUS_URL / OLLAMA_URL)全對齊

## 新增測試
- test_km_writer_backfill_reconciler.py: 7 cases(C1 reconciler + safe helper)
- test_km_writer_idempotent.py: 5 cases(M3 path_type 注入 + UPSERT 分支)

## 驗證
- 1585 unit tests 全綠(+13 從 1572)
- amtool check-config SUCCESS(8 inhibit_rules / 2 receivers)
- drift checker AST-based 4 欄位全對齊
- Alertmanager v0.31.1 確認支援新語法

## 期望影響
- KMWriter 名實統一:飛輪閉環 KM 寫入路徑 100% 可靠
- M4 抑制爆炸風險解除
- 治理層不再對 SLO no_data 靜默
- drift checker false negative 風險解除

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 10:44:39 +08:00

394 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
KMWriter 單元測試
=================
P1-1 KMWriter 統一契約重構
測試範圍:
1. 成功路徑SUCCESS
2. Timeout 路徑TIMEOUT + DLQ
3. 可重試例外EXCEPTION + 指數退避 + DLQ
4. 非可重試例外(立即 DLQ
5. 冪等 / 空 payloadSKIPPED_NO_DATA
6. M4 反查鏈回填_backfill_path_a_approval
7. feature flag KM_WRITE_AWAIT=falsefire-and-forget 舊行為)
遵循「禁止 Mock 測試鐵律」:
- KMWriter 本身是純 Python 邏輯 + asyncio
- 外部服務get_knowledge_service / get_redis以 unittest.mock.AsyncMock 替換
(因為這是 unit 契約測試,不是整合測試)
建立2026-04-28 (台北時區) ogt + Claude Sonnet 4.6
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.services.km_writer import (
KMWriteError,
KMWritePayload,
KMWriteResult,
KMWriter,
_is_retriable,
_write_to_dlq,
km_write_with_flag,
)
# =============================================================================
# Helper fixtures
# =============================================================================
def _make_payload(path_type: str = "approval_manual", incident_id: str | None = "INC-TEST-001",
approval_id: str | None = "AP-001") -> KMWritePayload:
return KMWritePayload(
path_type=path_type,
entry_create_kwargs=dict(
title="Test KM Entry",
content="Test content",
entry_type="incident_case",
category="test",
tags=["test"],
source="ai_extracted",
),
incident_id=incident_id,
approval_id=approval_id,
)
@pytest.fixture
def writer() -> KMWriter:
return KMWriter()
# =============================================================================
# 1. 成功路徑
# =============================================================================
@pytest.mark.asyncio
async def test_write_success(writer: KMWriter):
"""成功寫入應返回 KMWriteResult.SUCCESS"""
mock_svc = AsyncMock()
mock_svc.create_entry = AsyncMock()
with patch("src.services.km_writer.get_km_writer", return_value=writer), \
patch("src.services.knowledge_service.get_knowledge_service", return_value=mock_svc), \
patch("src.services.km_writer._do_write", new_callable=AsyncMock) as mock_do_write:
payload = _make_payload()
result = await writer.write(payload, timeout=5.0)
assert result == KMWriteResult.SUCCESS
mock_do_write.assert_called_once_with(payload)
# =============================================================================
# 2. Timeout 路徑
# =============================================================================
@pytest.mark.asyncio
async def test_write_timeout(writer: KMWriter):
"""_do_write 超時應返回 TIMEOUT 且寫 DLQ"""
async def _slow_write(payload):
await asyncio.sleep(100)
dlq_called = []
async def _mock_dlq(payload, reason):
dlq_called.append(reason)
with patch("src.services.km_writer._do_write", side_effect=_slow_write), \
patch("src.services.km_writer._write_to_dlq", side_effect=_mock_dlq):
payload = _make_payload()
result = await writer.write(payload, timeout=0.01)
assert result == KMWriteResult.TIMEOUT
assert len(dlq_called) == 1
assert "timeout" in dlq_called[0]
# =============================================================================
# 3. 可重試例外(指數退避)
# =============================================================================
@pytest.mark.asyncio
async def test_write_retriable_exception_exhausts(writer: KMWriter):
"""OperationalError 應重試 3 次後進 DLQ返回 EXCEPTION"""
call_count = {"n": 0}
async def _fail_write(payload):
call_count["n"] += 1
raise Exception("operationalerror: connection refused")
dlq_called = []
async def _mock_dlq(payload, reason):
dlq_called.append(reason)
with patch("src.services.km_writer._do_write", side_effect=_fail_write), \
patch("src.services.km_writer._write_to_dlq", side_effect=_mock_dlq), \
patch("asyncio.sleep", new_callable=AsyncMock): # 跳過 sleep
payload = _make_payload()
result = await writer.write(payload, timeout=5.0)
assert result == KMWriteResult.EXCEPTION
assert call_count["n"] == 3 # 3 次嘗試
assert len(dlq_called) == 1
# =============================================================================
# 4. 非可重試例外(立即 DLQ只嘗試 1 次)
# =============================================================================
@pytest.mark.asyncio
async def test_write_non_retriable_exception(writer: KMWriter):
"""非可重試例外(如 ValueError應立即 DLQ不重試"""
call_count = {"n": 0}
async def _fail_write(payload):
call_count["n"] += 1
raise ValueError("invalid entry_type")
dlq_called = []
async def _mock_dlq(payload, reason):
dlq_called.append(reason)
with patch("src.services.km_writer._do_write", side_effect=_fail_write), \
patch("src.services.km_writer._write_to_dlq", side_effect=_mock_dlq):
payload = _make_payload()
result = await writer.write(payload, timeout=5.0)
assert result == KMWriteResult.EXCEPTION
assert call_count["n"] == 1 # 只嘗試 1 次(非可重試)
assert len(dlq_called) == 1
# =============================================================================
# 5. 空 payloadSKIPPED_NO_DATA
# =============================================================================
@pytest.mark.asyncio
async def test_write_empty_payload(writer: KMWriter):
"""entry_create_kwargs 為空時應返回 SKIPPED_NO_DATA"""
payload = KMWritePayload(
path_type="approval_manual",
entry_create_kwargs={}, # 空
incident_id="INC-001",
)
result = await writer.write(payload, timeout=5.0)
assert result == KMWriteResult.SKIPPED_NO_DATA
# =============================================================================
# 6. M4 反查鏈回填
# =============================================================================
@pytest.mark.asyncio
async def test_backfill_path_a_approval_called_on_success():
"""
寫入成功且 approval_id + incident_id 都有時,應 schedule _backfill_path_a_approval task
"""
backfill_args = []
async def _mock_backfill(incident_id: str, approval_id: str):
backfill_args.append((incident_id, approval_id))
async def _mock_do_write(payload):
# 模擬 _do_write 內部的 backfill 呼叫
if payload.approval_id and payload.incident_id:
await _mock_backfill(payload.incident_id, payload.approval_id)
writer = KMWriter()
with patch("src.services.km_writer._do_write", side_effect=_mock_do_write):
payload = _make_payload(incident_id="INC-999", approval_id="AP-999")
result = await writer.write(payload, timeout=5.0)
assert result == KMWriteResult.SUCCESS
assert ("INC-999", "AP-999") in backfill_args
# =============================================================================
# 7. Feature Flag KM_WRITE_AWAIT=falsefire-and-forget
# =============================================================================
@pytest.mark.asyncio
async def test_km_write_with_flag_await_false():
"""
C2 修復後KM_WRITE_AWAIT=false 應 await writer.write(retry=1, timeout=2.0)
而非 fire-and-forget。確保有一次寫入嘗試降級但不全拋棄
2026-04-28 ogt + Claude Sonnet 4.6: 同步更新(原測試驗證 ensure_future現已不適用
"""
write_args = {}
async def _mock_write(payload, *, mode="sync", timeout=None, retry=None, on_failure="dlq"):
write_args["retry"] = retry
write_args["timeout"] = timeout
return KMWriteResult.SUCCESS
mock_writer = AsyncMock()
mock_writer.write.side_effect = _mock_write
with patch("src.services.km_writer.settings") as mock_settings, \
patch("src.services.km_writer.get_km_writer", return_value=mock_writer):
mock_settings.KM_WRITE_AWAIT = False
mock_settings.KM_WRITE_TIMEOUT_SECONDS = 5.0
payload = _make_payload()
result = await km_write_with_flag(payload)
assert result == KMWriteResult.SUCCESS
# C2 修法retry=1, timeout=2.0(降級但仍 await 一次)
assert write_args["retry"] == 1
assert write_args["timeout"] == 2.0
# =============================================================================
# 8. _is_retriable 輔助函式
# =============================================================================
def test_is_retriable_operational_error():
assert _is_retriable(Exception("OperationalError: too many connections")) is True
def test_is_retriable_connection_refused():
assert _is_retriable(Exception("connection refused")) is True
def test_is_retriable_timeout():
assert _is_retriable(Exception("connection timed out")) is True
def test_is_retriable_value_error():
assert _is_retriable(ValueError("invalid field")) is False
def test_is_retriable_permission_denied():
assert _is_retriable(Exception("permission denied")) is False
# =============================================================================
# 9. DLQ 寫入Redis 失敗時只 log不拋例外
# =============================================================================
@pytest.mark.asyncio
async def test_write_to_dlq_redis_failure_does_not_raise():
"""Redis DLQ 寫入失敗時不應 raise只 log error"""
mock_redis = AsyncMock()
mock_redis.lpush.side_effect = Exception("redis unavailable")
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
payload = _make_payload()
# 不應拋出例外
await _write_to_dlq(payload, "test_reason")
# =============================================================================
# 10. 冪等:同 incident_id + path_type 寫入兩次,結果均為 SUCCESS冪等由下層保証
# =============================================================================
@pytest.mark.asyncio
async def test_idempotency_same_incident_path():
"""
同 incident_id + path_type 呼叫兩次 write(),兩次均應返回 SUCCESS。
冪等防重由 knowledge_service.create_entry() 的 DB-level UPSERT 保証;
KMWriter 本身不拒絕重複,確保不在 writer 層誤攔。
"""
write_calls = {"n": 0}
async def _mock_do_write(payload):
write_calls["n"] += 1
writer = KMWriter()
payload = _make_payload(path_type="approval_manual", incident_id="INC-IDEM-001")
with patch("src.services.km_writer._do_write", side_effect=_mock_do_write):
result1 = await writer.write(payload, timeout=5.0)
result2 = await writer.write(payload, timeout=5.0)
assert result1 == KMWriteResult.SUCCESS
assert result2 == KMWriteResult.SUCCESS
assert write_calls["n"] == 2 # 兩次都進 _do_writeUPSERT 由下層處理)
# =============================================================================
# 11. DLQ payload 結構驗證
# =============================================================================
@pytest.mark.asyncio
async def test_dlq_payload_structure():
"""
DLQ record 必須包含 path_type / incident_id / approval_id / reason / entry_title。
驗證 _write_to_dlq 寫入 Redis 的 JSON 結構符合規格。
"""
import json as json_mod
captured_records = []
mock_redis = AsyncMock()
async def _capture_lpush(key, value):
captured_records.append(value)
mock_redis.lpush.side_effect = _capture_lpush
mock_redis.ltrim = AsyncMock()
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
payload = KMWritePayload(
path_type="approval_auto_ok",
incident_id="INC-DLQ-001",
approval_id="AP-DLQ-001",
entry_create_kwargs={"title": "DLQ Structure Test"},
)
await _write_to_dlq(payload, "test_dlq_reason")
assert len(captured_records) == 1
record = json_mod.loads(captured_records[0])
assert record["path_type"] == "approval_auto_ok"
assert record["incident_id"] == "INC-DLQ-001"
assert record["approval_id"] == "AP-DLQ-001"
assert record["reason"] == "test_dlq_reason"
assert record["entry_title"] == "DLQ Structure Test"
# =============================================================================
# 12. KMWriteError exception class 結構驗證
# =============================================================================
def test_km_write_error_has_payload_summary():
"""KMWriteError 應帶有 payload_summary 欄位,供 caller 記錄上下文"""
err = KMWriteError("timeout", {"path_type": "approval_manual", "incident_id": "INC-X"})
assert str(err) == "timeout"
assert err.payload_summary["path_type"] == "approval_manual"
assert err.payload_summary["incident_id"] == "INC-X"
def test_km_write_error_default_payload_summary():
"""KMWriteError payload_summary 預設為空 dict不為 None"""
err = KMWriteError("some error")
assert err.payload_summary == {}
# =============================================================================
# 13. on_failure="raise" 模式timeout 時拋 KMWriteError
# =============================================================================
@pytest.mark.asyncio
async def test_on_failure_raise_timeout():
"""on_failure='raise'timeout 應拋 KMWriteError 而非返回 TIMEOUT"""
async def _slow_write(payload):
await asyncio.sleep(100)
writer = KMWriter()
with patch("src.services.km_writer._do_write", side_effect=_slow_write):
payload = _make_payload()
with pytest.raises(KMWriteError) as exc_info:
await writer.write(payload, timeout=0.01, on_failure="raise")
assert "timeout" in str(exc_info.value).lower()