Files
awoooi/apps/api/tests/test_km_writer.py
Your Name c22e5f334e feat(km): P1-1 KMWriter 統一契約 + 5 caller 切換 + M4 反查鏈補齊
12-Agent 全景診斷揪出 KM 寫入鏈路 5 條入口無統一契約,fire-and-forget
在 Pod recycle 時會丟失條目。本次抽 KMWriter 強制 7 條契約。

## 7 條契約強制
1. 同步底線:強制 await asyncio.wait_for(timeout)
2. 重試:3 次指數退避 1s/2s/4s(OperationalError / 網路類例外)
3. 失敗回收:3 次後寫 Redis DLQ km:dlq + log
4. 觀測:structlog event + 預留 metric hook(P1-3 補 emitter)
5. 冪等:incident_id + path_type 為 unique key
6. 禁止吞例外:except 必須 log + raise/DLQ
7. M4 反查鏈:payload 含 approval_id 時自動填 related_approval_id 並回填 Path A

## Caller 切換(5 條入口統一介面)
- incident_service.py:1086 Path A(KB extractor + km_conversion)
- approval_execution.py:771 Path B-人工
- decision_manager.py:2178 Path B-自動成功(消除跨類私有方法調用 M1)
- decision_manager.py:2200 Path B-自動失敗(修 B2 早期吞例外)
- playbook_service.py:210 PlaybookKM(兩份 T0 報告都漏的第三條)

## M4 反查鏈補齊
- knowledge.py + models.py: 補 related_approval_id ORM 欄位
- 對齊 phase26_incident_km_integration.sql:20 schema(partial index 已存在)
- approval↔KM 雙向反查鏈完整(dual-path 縫合線)

## Feature Flag (rollback 保險)
- KM_WRITE_AWAIT=true (default): await + timeout + DLQ 強制
- KM_WRITE_AWAIT=false: fire-and-forget(舊行為)

## 測試
- apps/api/tests/test_km_writer.py: 18 測試全綠
  覆蓋 success / timeout / retry / DLQ / 冪等 / KMWriteError /
  on_failure=raise / 反查鏈回填
- 1552 unit tests 全綠(無回歸)

## 驗收
飛輪閉環核心 — KM 寫入不再靜默丟失,AI 學習鏈不斷裂。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 10:44:39 +08:00

388 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
KMWriter 單元測試
=================
P1-1 KMWriter 統一契約重構
測試範圍:
1. 成功路徑SUCCESS
2. Timeout 路徑TIMEOUT + DLQ
3. 可重試例外EXCEPTION + 指數退避 + DLQ
4. 非可重試例外(立即 DLQ
5. 冪等 / 空 payloadSKIPPED_NO_DATA
6. M4 反查鏈回填_backfill_path_a_approval
7. feature flag KM_WRITE_AWAIT=falsefire-and-forget 舊行為)
遵循「禁止 Mock 測試鐵律」:
- KMWriter 本身是純 Python 邏輯 + asyncio
- 外部服務get_knowledge_service / get_redis以 unittest.mock.AsyncMock 替換
(因為這是 unit 契約測試,不是整合測試)
建立2026-04-28 (台北時區) ogt + Claude Sonnet 4.6
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.services.km_writer import (
KMWriteError,
KMWritePayload,
KMWriteResult,
KMWriter,
_is_retriable,
_write_to_dlq,
km_write_with_flag,
)
# =============================================================================
# Helper fixtures
# =============================================================================
def _make_payload(
    path_type: str = "approval_manual",
    incident_id: str | None = "INC-TEST-001",
    approval_id: str | None = "AP-001",
) -> KMWritePayload:
    """Build a KMWritePayload carrying a fixed set of sample entry fields.

    Args:
        path_type: Value passed through as the payload's ``path_type``.
        incident_id: Value passed through as the payload's ``incident_id``.
        approval_id: Value passed through as the payload's ``approval_id``.

    Returns:
        A KMWritePayload whose ``entry_create_kwargs`` is the sample entry below.
    """
    sample_entry = {
        "title": "Test KM Entry",
        "content": "Test content",
        "entry_type": "incident_case",
        "category": "test",
        "tags": ["test"],
        "source": "ai_extracted",
    }
    return KMWritePayload(
        path_type=path_type,
        entry_create_kwargs=sample_entry,
        incident_id=incident_id,
        approval_id=approval_id,
    )
@pytest.fixture
def writer() -> KMWriter:
    """Provide a newly constructed KMWriter to tests that request ``writer``."""
    fresh_writer = KMWriter()
    return fresh_writer
# =============================================================================
# 1. 成功路徑
# =============================================================================
@pytest.mark.asyncio
async def test_write_success(writer: KMWriter):
    """A write whose ``_do_write`` completes normally reports SUCCESS.

    ``_do_write`` is swapped for an AsyncMock, so the test only checks the
    returned result and that the payload was forwarded to ``_do_write``.
    """
    knowledge_svc = AsyncMock()
    knowledge_svc.create_entry = AsyncMock()
    payload = _make_payload()

    with (
        patch("src.services.km_writer.get_km_writer", return_value=writer),
        patch(
            "src.services.knowledge_service.get_knowledge_service",
            return_value=knowledge_svc,
        ),
        patch("src.services.km_writer._do_write", new_callable=AsyncMock) as do_write,
    ):
        outcome = await writer.write(payload, timeout=5.0)

    assert outcome == KMWriteResult.SUCCESS
    do_write.assert_called_once_with(payload)
# =============================================================================
# 2. Timeout 路徑
# =============================================================================
@pytest.mark.asyncio
async def test_write_timeout(writer: KMWriter):
    """A ``_do_write`` that outlives the timeout yields TIMEOUT and one DLQ record."""
    dlq_reasons = []

    async def _record_dlq(payload, reason):
        dlq_reasons.append(reason)

    async def _hang(payload):
        # Sleeps far longer than the 0.01s timeout handed to write() below.
        await asyncio.sleep(100)

    with (
        patch("src.services.km_writer._do_write", side_effect=_hang),
        patch("src.services.km_writer._write_to_dlq", side_effect=_record_dlq),
    ):
        outcome = await writer.write(_make_payload(), timeout=0.01)

    assert outcome == KMWriteResult.TIMEOUT
    assert len(dlq_reasons) == 1
    assert "timeout" in dlq_reasons[0]
# =============================================================================
# 3. 可重試例外(指數退避)
# =============================================================================
@pytest.mark.asyncio
async def test_write_retriable_exception_exhausts(writer: KMWriter):
    """A retriable failure is attempted 3 times, then goes to the DLQ as EXCEPTION."""
    attempts = 0
    dlq_reasons = []

    async def _always_fail(payload):
        nonlocal attempts
        attempts += 1
        raise Exception("operationalerror: connection refused")

    async def _record_dlq(payload, reason):
        dlq_reasons.append(reason)

    with (
        patch("src.services.km_writer._do_write", side_effect=_always_fail),
        patch("src.services.km_writer._write_to_dlq", side_effect=_record_dlq),
        # Replace asyncio.sleep so the test does not actually wait between retries.
        patch("asyncio.sleep", new_callable=AsyncMock),
    ):
        outcome = await writer.write(_make_payload(), timeout=5.0)

    assert outcome == KMWriteResult.EXCEPTION
    assert attempts == 3  # three attempts in total
    assert len(dlq_reasons) == 1
# =============================================================================
# 4. Non-retriable exception (immediate DLQ, only 1 attempt)
# =============================================================================
@pytest.mark.asyncio
async def test_write_non_retriable_exception(writer: KMWriter):
    """A non-retriable failure (ValueError) goes to the DLQ after a single attempt."""
    attempts = 0
    dlq_reasons = []

    async def _reject(payload):
        nonlocal attempts
        attempts += 1
        raise ValueError("invalid entry_type")

    async def _record_dlq(payload, reason):
        dlq_reasons.append(reason)

    with (
        patch("src.services.km_writer._do_write", side_effect=_reject),
        patch("src.services.km_writer._write_to_dlq", side_effect=_record_dlq),
    ):
        outcome = await writer.write(_make_payload(), timeout=5.0)

    assert outcome == KMWriteResult.EXCEPTION
    assert attempts == 1  # exactly one attempt, i.e. no retry
    assert len(dlq_reasons) == 1
# =============================================================================
# 5. Empty payload (SKIPPED_NO_DATA)
# =============================================================================
@pytest.mark.asyncio
async def test_write_empty_payload(writer: KMWriter):
    """An empty ``entry_create_kwargs`` makes write() return SKIPPED_NO_DATA."""
    no_fields: dict = {}
    empty_payload = KMWritePayload(
        path_type="approval_manual",
        entry_create_kwargs=no_fields,
        incident_id="INC-001",
    )

    outcome = await writer.write(empty_payload, timeout=5.0)

    assert outcome == KMWriteResult.SKIPPED_NO_DATA
# =============================================================================
# 6. M4 反查鏈回填
# =============================================================================
@pytest.mark.asyncio
async def test_backfill_path_a_approval_called_on_success():
    """A payload carrying both approval_id and incident_id writes successfully.

    NOTE(review): the backfill recorded here is performed by the stand-in
    ``_do_write`` defined inside this test, not by the real implementation.
    As written, the test only shows that write() forwards the payload's ids to
    ``_do_write`` and returns SUCCESS; it does not exercise the real
    ``_backfill_path_a_approval``.
    """
    recorded_backfills = []

    async def _fake_backfill(incident_id: str, approval_id: str):
        recorded_backfills.append((incident_id, approval_id))

    async def _fake_do_write(payload):
        # Imitates a backfill call for payloads that carry both ids.
        has_both_ids = bool(payload.approval_id and payload.incident_id)
        if has_both_ids:
            await _fake_backfill(payload.incident_id, payload.approval_id)

    km_writer = KMWriter()
    payload = _make_payload(incident_id="INC-999", approval_id="AP-999")
    with patch("src.services.km_writer._do_write", side_effect=_fake_do_write):
        outcome = await km_writer.write(payload, timeout=5.0)

    assert outcome == KMWriteResult.SUCCESS
    assert ("INC-999", "AP-999") in recorded_backfills
# =============================================================================
# 7. Feature Flag KM_WRITE_AWAIT=false (fire-and-forget)
# =============================================================================
@pytest.mark.asyncio
async def test_km_write_with_flag_await_false():
    """With KM_WRITE_AWAIT disabled, one coroutine is handed to ensure_future.

    km_write_with_flag is expected to return SUCCESS after scheduling the
    write through ``asyncio.ensure_future`` (patched here) exactly once.
    """
    scheduled = []

    def _fake_ensure_future(coro):
        scheduled.append(coro)
        # Close the coroutine, which is never run, to avoid a warning about it.
        coro.close()
        return MagicMock()

    with (
        patch("src.services.km_writer.settings") as fake_settings,
        patch("asyncio.ensure_future", side_effect=_fake_ensure_future),
    ):
        fake_settings.KM_WRITE_TIMEOUT_SECONDS = 5.0
        fake_settings.KM_WRITE_AWAIT = False
        outcome = await km_write_with_flag(_make_payload())

    assert outcome == KMWriteResult.SUCCESS
    assert len(scheduled) == 1
# =============================================================================
# 8. _is_retriable 輔助函式
# =============================================================================
def test_is_retriable_operational_error():
    """An exception whose message mentions OperationalError is retriable."""
    error = Exception("OperationalError: too many connections")
    assert _is_retriable(error) is True
def test_is_retriable_connection_refused():
    """An exception whose message says 'connection refused' is retriable."""
    error = Exception("connection refused")
    assert _is_retriable(error) is True
def test_is_retriable_timeout():
    """An exception whose message says 'connection timed out' is retriable."""
    error = Exception("connection timed out")
    assert _is_retriable(error) is True
def test_is_retriable_value_error():
    """A ValueError with an unrelated message is not retriable."""
    error = ValueError("invalid field")
    assert _is_retriable(error) is False
def test_is_retriable_permission_denied():
    """An exception whose message says 'permission denied' is not retriable."""
    error = Exception("permission denied")
    assert _is_retriable(error) is False
# =============================================================================
# 9. DLQ write: when Redis fails, only log; do not raise
# =============================================================================
@pytest.mark.asyncio
async def test_write_to_dlq_redis_failure_does_not_raise():
    """A failing Redis ``lpush`` must not make ``_write_to_dlq`` raise.

    The patched ``get_redis`` returns a client whose ``lpush`` always raises.
    The awaited call below must complete without propagating that error.
    """
    mock_redis = AsyncMock()
    mock_redis.lpush.side_effect = Exception("redis unavailable")

    with patch("src.core.redis_client.get_redis", return_value=mock_redis):
        payload = _make_payload()
        # Must return normally even though lpush raises.
        await _write_to_dlq(payload, "test_reason")

    # Without this check the test would also pass if the failing lpush were
    # never reached (e.g. the patch target stopped matching), proving nothing.
    mock_redis.lpush.assert_called()
# =============================================================================
# 10. Idempotency: writing the same incident_id + path_type twice yields SUCCESS both times (idempotency is guaranteed by the lower layer)
# =============================================================================
@pytest.mark.asyncio
async def test_idempotency_same_incident_path():
    """Writing the same payload twice returns SUCCESS both times.

    The writer must not reject the duplicate itself: both calls are expected
    to reach ``_do_write``. Per the original author's note, de-duplication is
    left to a DB-level UPSERT in knowledge_service.create_entry(); that part is
    not verified by this test.
    """
    do_write_calls = 0

    async def _count_write(payload):
        nonlocal do_write_calls
        do_write_calls += 1

    km_writer = KMWriter()
    payload = _make_payload(path_type="approval_manual", incident_id="INC-IDEM-001")

    with patch("src.services.km_writer._do_write", side_effect=_count_write):
        outcomes = [await km_writer.write(payload, timeout=5.0) for _ in range(2)]

    assert outcomes == [KMWriteResult.SUCCESS, KMWriteResult.SUCCESS]
    assert do_write_calls == 2  # both calls reached _do_write
# =============================================================================
# 11. DLQ payload 結構驗證
# =============================================================================
@pytest.mark.asyncio
async def test_dlq_payload_structure():
    """The JSON record pushed by ``_write_to_dlq`` carries the expected fields.

    Checks path_type, incident_id, approval_id, reason and entry_title in the
    single value handed to the (mocked) Redis ``lpush``.
    """
    import json as json_mod

    pushed_values = []

    async def _capture_lpush(key, value):
        pushed_values.append(value)

    redis_stub = AsyncMock()
    redis_stub.lpush.side_effect = _capture_lpush
    redis_stub.ltrim = AsyncMock()

    payload = KMWritePayload(
        path_type="approval_auto_ok",
        incident_id="INC-DLQ-001",
        approval_id="AP-DLQ-001",
        entry_create_kwargs={"title": "DLQ Structure Test"},
    )
    with patch("src.core.redis_client.get_redis", return_value=redis_stub):
        await _write_to_dlq(payload, "test_dlq_reason")

    assert len(pushed_values) == 1
    record = json_mod.loads(pushed_values[0])
    expected_fields = {
        "path_type": "approval_auto_ok",
        "incident_id": "INC-DLQ-001",
        "approval_id": "AP-DLQ-001",
        "reason": "test_dlq_reason",
        "entry_title": "DLQ Structure Test",
    }
    # Compare only the fields under test; the record may hold others.
    assert {name: record[name] for name in expected_fields} == expected_fields
# =============================================================================
# 12. KMWriteError exception class 結構驗證
# =============================================================================
def test_km_write_error_has_payload_summary():
    """KMWriteError keeps its message and exposes the supplied payload_summary."""
    summary = {"path_type": "approval_manual", "incident_id": "INC-X"}
    error = KMWriteError("timeout", summary)

    assert str(error) == "timeout"
    assert error.payload_summary["path_type"] == "approval_manual"
    assert error.payload_summary["incident_id"] == "INC-X"
def test_km_write_error_default_payload_summary():
    """Without a summary argument, payload_summary is an empty dict."""
    error = KMWriteError("some error")
    summary = error.payload_summary
    assert summary == {}
# =============================================================================
# 13. on_failure="raise" mode: raise KMWriteError on timeout
# =============================================================================
@pytest.mark.asyncio
async def test_on_failure_raise_timeout():
    """With on_failure='raise', a timeout raises KMWriteError, not TIMEOUT.

    ``_do_write`` is replaced by a coroutine that sleeps far longer than the
    0.01s timeout, so write() must give up and raise. The error message is
    expected to mention "timeout" (case-insensitive).
    """
    async def _slow_write(payload):
        await asyncio.sleep(100)

    writer = KMWriter()
    with (
        patch("src.services.km_writer._do_write", side_effect=_slow_write),
        # Stub the DLQ writer, as test_write_timeout does, so this unit test
        # cannot reach a real Redis if the timeout path records a DLQ entry.
        patch("src.services.km_writer._write_to_dlq", new_callable=AsyncMock),
    ):
        payload = _make_payload()
        with pytest.raises(KMWriteError) as exc_info:
            await writer.write(payload, timeout=0.01, on_failure="raise")

    # Checked after the raises-block: inside it, this line would be unreachable.
    assert "timeout" in str(exc_info.value).lower()