fix(api): reconcile completed stuck incidents
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m2s
CD Pipeline / build-and-deploy (push) Successful in 3m34s
CD Pipeline / post-deploy-checks (push) Successful in 1m35s

This commit is contained in:
Your Name
2026-05-19 11:45:15 +08:00
parent 50833a0efb
commit d0835a7be1
7 changed files with 393 additions and 61 deletions

View File

@@ -0,0 +1,164 @@
"""
Incident Lifecycle Reconciler
=============================
把已有強證據的舊 stuck incident 收斂回 RESOLVED。
範圍刻意保守:
- auto_repair_executions.success = true
- approval_records.status = EXECUTION_SUCCESS
- approval_records.status = EXPIRED
不處理單純 APPROVED / NO_ACTION / manual_required，避免把仍需人工的事件
誤當作自動修復完成。
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import structlog
from sqlalchemy import text
from src.db.base import get_db_context
# Module-level structlog logger used by every function in this job.
logger = structlog.get_logger(__name__)
# Default number of candidates handled per pass: it is the default ``limit``
# of ``reconcile_stuck_incidents`` and is reported in the loop's summary log.
BATCH_LIMIT = 25
# Seconds slept between two passes of the reconciler loop (1800 s = 30 min).
INTERVAL_SECONDS = 1800
@dataclass(frozen=True)
class LifecycleCandidate:
    """Immutable description of one stuck incident selected for reconciliation.

    Instances are built from the rows of the candidate query and consumed by
    ``reconcile_stuck_incidents``.
    """

    # Identifier of the incident to resolve.
    incident_id: str
    # Value forwarded as ``resolution_type`` when the incident is resolved.
    resolution_type: str
    # Label naming the evidence that selected the incident; used in log events.
    reason: str
async def run_incident_lifecycle_reconciler_loop() -> None:
    """Run the reconciler forever, sleeping ``INTERVAL_SECONDS`` between passes.

    Each pass calls ``reconcile_stuck_incidents`` with its default batch size.
    A summary event is logged only when the pass resolved at least one
    incident or hit at least one error. Any exception raised by a pass is
    logged as a warning so the loop keeps running.
    """
    while True:
        try:
            counts = await reconcile_stuck_incidents()
            resolved_count, error_count = counts
            # Stay quiet on passes that had nothing to do.
            if any(count > 0 for count in counts):
                logger.info(
                    "incident_lifecycle_reconciler_done",
                    resolved=resolved_count,
                    errors=error_count,
                    batch_limit=BATCH_LIMIT,
                )
        except Exception as loop_error:
            logger.warning(
                "incident_lifecycle_reconciler_loop_failed",
                error=str(loop_error),
            )
        await asyncio.sleep(INTERVAL_SECONDS)
async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]:
    """Resolve a batch of incidents that are stuck despite completion evidence.

    Candidates come from ``_fetch_candidates``. Each one is closed through the
    incident service's ``resolve_incident`` so reconciliation follows the same
    path as any other resolution.

    Args:
        limit: Maximum number of candidates to process in this pass. A
            non-positive value processes nothing and does not query the
            database.

    Returns:
        ``(resolved_count, error_count)``. A candidate for which
        ``resolve_incident`` returns ``None`` is counted in neither total; it
        is logged as skipped instead.
    """
    # PostgreSQL rejects a negative LIMIT and LIMIT 0 can never return rows,
    # so skip the database round trip entirely.
    if limit <= 0:
        return 0, 0

    candidates = await _fetch_candidates(limit)
    if not candidates:
        return 0, 0

    # Imported at call time rather than at module import time.
    from src.services.incident_service import get_incident_service

    incident_service = get_incident_service()
    resolved = 0
    errors = 0
    for candidate in candidates:
        try:
            result = await incident_service.resolve_incident(
                candidate.incident_id,
                resolution_type=candidate.resolution_type,
            )
            if result is not None:
                resolved += 1
                logger.info(
                    "incident_lifecycle_reconciled",
                    incident_id=candidate.incident_id,
                    reason=candidate.reason,
                    resolution_type=candidate.resolution_type,
                )
            else:
                # Previously invisible in this job's logs: the candidate was
                # selected but the service reported nothing to resolve.
                logger.warning(
                    "incident_lifecycle_reconcile_skipped",
                    incident_id=candidate.incident_id,
                    reason=candidate.reason,
                    resolution_type=candidate.resolution_type,
                )
        except Exception as exc:
            # One failing incident must not abort the rest of the batch.
            errors += 1
            logger.warning(
                "incident_lifecycle_reconcile_failed",
                incident_id=candidate.incident_id,
                reason=candidate.reason,
                error=str(exc),
            )
    return resolved, errors
async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]:
    """Query up to ``limit`` stuck incidents that carry completion evidence.

    The query looks at incidents whose status is ``INVESTIGATING`` and that
    were created at least 24 hours ago. It keeps those with at least one of:
    a successful auto-repair execution, an approval record in
    ``EXECUTION_SUCCESS``, or an approval record in ``EXPIRED``. The first two
    map to resolution type ``auto_repair``; an expired approval alone maps to
    ``timeout``. Rows are returned newest first.

    Args:
        limit: Bound passed to the query's ``LIMIT`` clause.

    Returns:
        One ``LifecycleCandidate`` per row, with every column coerced to ``str``.
    """
    # The SQL text is kept exactly as-is; only the surrounding Python changed.
    query = text(
        """
WITH stale AS (
SELECT
i.incident_id,
i.created_at,
EXISTS (
SELECT 1
FROM auto_repair_executions are
WHERE are.incident_id = i.incident_id
AND are.success IS TRUE
) AS has_success_auto_repair,
EXISTS (
SELECT 1
FROM approval_records ar
WHERE ar.incident_id = i.incident_id
AND ar.status::text = 'EXECUTION_SUCCESS'
) AS has_execution_success,
EXISTS (
SELECT 1
FROM approval_records ar
WHERE ar.incident_id = i.incident_id
AND ar.status::text = 'EXPIRED'
) AS has_expired_approval
FROM incidents i
WHERE i.status = 'INVESTIGATING'
AND i.created_at <= now() - interval '24 hours'
)
SELECT
incident_id,
CASE
WHEN has_success_auto_repair THEN 'auto_repair'
WHEN has_execution_success THEN 'auto_repair'
ELSE 'timeout'
END AS resolution_type,
CASE
WHEN has_success_auto_repair THEN 'auto_repair_execution_success'
WHEN has_execution_success THEN 'approval_execution_success'
ELSE 'approval_expired'
END AS reason
FROM stale
WHERE has_success_auto_repair
OR has_execution_success
OR has_expired_approval
ORDER BY created_at DESC
LIMIT :limit
"""
    )
    bind_params = {"limit": limit}
    columns = ("incident_id", "resolution_type", "reason")

    async with get_db_context() as db:
        outcome = await db.execute(query, bind_params)
        records = outcome.mappings().all()
        return [
            LifecycleCandidate(**{column: str(record[column]) for column in columns})
            for record in records
        ]

View File

@@ -34,17 +34,22 @@ from sentry_sdk.integrations.starlette import StarletteIntegration
from src.api.v1 import agents as agents_v1 # Phase 9.5: Agent Teams API
from src.api.v1 import ai as ai_v1
from src.api.v1 import aider_events as aider_events_v1 # aider-watch v2 ADR-091
from src.api.v1 import ai_governance as ai_governance_v1 # 2026-05-02: /governance 頁面 3 endpoints
from src.api.v1 import (
ai_governance as ai_governance_v1, # 2026-05-02: /governance 頁面 3 endpoints
)
from src.api.v1 import ai_slo as ai_slo_v1 # Phase 6 ADR-087: AI SLO 自我治理
from src.api.v1 import aider_events as aider_events_v1 # aider-watch v2 ADR-091
from src.api.v1 import aiops_kpi as aiops_kpi_v1 # ADR-090 § Phase 7 KPI Dashboard
from src.api.v1 import aiops_timeline as aiops_timeline_v1 # 2026-04-27 Wave8-X3 B4 timeline endpoint
from src.api.v1 import approvals as approvals_v1
from src.api.v1 import (
aiops_timeline as aiops_timeline_v1, # 2026-04-27 Wave8-X3 B4 timeline endpoint
)
from src.api.v1 import alert_operation_logs as alert_operation_logs_v1
from src.api.v1 import approvals as approvals_v1
from src.api.v1 import audit_logs as audit_logs_v1
from src.api.v1 import auto_repair as auto_repair_v1 # #8: 自動升級決策
from src.api.v1 import csrf as csrf_v1 # Phase 20: CSRF Protection
from src.api.v1 import dashboard as dashboard_v1
from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection
from src.api.v1 import errors as errors_v1 # #40: Sentry 錯誤 BFF API
from src.api.v1 import (
gitea_webhook as gitea_webhook_v1, # ADR-059: Gitea → OpenClaw (GitHub → Gitea 遷移)
@@ -56,19 +61,20 @@ from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal
from src.api.v1 import knowledge as knowledge_v1 # KB Phase 1: Knowledge Base
from src.api.v1 import learning as learning_v1 # Phase D-G P0: Learning API
from src.api.v1 import metrics as metrics_v1 # Phase 7: Gold Metrics (真實血脈)
from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態
from src.api.v1 import notifications as notifications_v1 # 2026-04-10: 通知頻道狀態
from src.api.v1 import (
platform as platform_v1, # AwoooP Phase 4: Platform ShellShadow Mode
)
from src.api.v1 import playbooks as playbooks_v1 # #7: Playbook 萃取
from src.api.v1 import proposals as proposals_v1 # Phase 6.4h: Proposals CRUD API
from src.api.v1 import rag as rag_v1 # Phase 33 ADR-067: RAG 知識庫
from src.api.v1 import (
sentry_webhook as sentry_webhook_v1, # Phase 10.2.1: Sentry → Telegram
)
from src.api.v1 import (
signoz_webhook as signoz_webhook_v1, # Phase 21: SignOz → Telegram (ADR-037)
)
from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection
from src.api.v1 import platform as platform_v1 # AwoooP Phase 4: Platform ShellShadow Mode
from src.api.v1 import rag as rag_v1 # Phase 33 ADR-067: RAG 知識庫
from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態
from src.api.v1 import notifications as notifications_v1 # 2026-04-10: 通知頻道狀態
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
from src.api.v1 import telegram_webhook as telegram_webhook_v1 # ADR-094: Webhook入口
@@ -81,8 +87,6 @@ from src.core.logging import get_logger, setup_logging
from src.core.redis_client import close_redis_pool, init_redis_pool
from src.core.sse import get_publisher
from src.core.telemetry import setup_telemetry, shutdown_telemetry
from src.services.adr100_slo_metrics_service import get_adr100_slo_metrics_service
from src.services.flywheel_stats_service import get_flywheel_stats_service
# CTO-201: Database & Executor
from src.db.base import close_db, init_db
@@ -92,7 +96,9 @@ from src.routers import proposals as proposals_router
# Legacy route imports (to be migrated)
from src.routes import agent, notifications, pipelines, plugins
from src.services.adr100_slo_metrics_service import get_adr100_slo_metrics_service
from src.services.executor import close_executor
from src.services.flywheel_stats_service import get_flywheel_stats_service
# Phase 5: OpenClaw AI Engine
from src.services.openclaw import close_openclaw
@@ -267,16 +273,21 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
# 2026-04-05 ogt: 重開機後 Redis 清空,從 DB restore 未解決的 incidents
# 統帥批准: 數據必須長久記錄,重開機後自動恢復 Working Memory
try:
from src.services.incident_service import get_incident_service
from sqlalchemy import select
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from sqlalchemy import select
from src.models.incident import IncidentStatus
from src.services.incident_service import get_incident_service
incident_service = get_incident_service()
async with get_db_context() as db:
result = await db.execute(
select(IncidentRecord).where(
IncidentRecord.status.in_(["investigating", "mitigating"])
IncidentRecord.status.in_([
IncidentStatus.INVESTIGATING,
IncidentStatus.MITIGATING,
])
)
)
records = result.scalars().all()
@@ -284,31 +295,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
restored = 0
for record in records:
try:
from src.models.incident import Incident
incident = Incident(
incident_id=record.incident_id,
status=record.status,
severity=record.severity,
signals=record.signals or [],
affected_services=record.affected_services or [],
decision_chain=record.decision_chain,
proposal_ids=record.proposal_ids or [],
outcome=record.outcome,
created_at=record.created_at,
updated_at=record.updated_at,
resolved_at=record.resolved_at,
closed_at=record.closed_at,
ttl_days=record.ttl_days,
vectorized=record.vectorized,
# ADR-073: 分類欄位必須還原,否則 KM 寫入時全為 "unknown"
notification_type=record.notification_type,
alert_category=record.alert_category,
)
incident = incident_service._record_to_incident(record)
if await incident_service.save_to_working_memory(incident):
restored += 1
except Exception:
except Exception as record_error:
# 舊資料 source 值不合法node-exporter 等)→ 跳過
pass
logger.warning(
"working_memory_warmup_record_skipped",
incident_id=getattr(record, "incident_id", None),
error=str(record_error),
)
logger.info("working_memory_warmed_up", restored=restored, total=len(records))
except Exception as e:
@@ -351,7 +347,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
logger.warning("playbook_pg_backfill_schedule_failed", error=str(e))
try:
from src.services.playbook_embedding_service import ensure_playbook_embeddings_indexed
from src.services.playbook_embedding_service import (
ensure_playbook_embeddings_indexed,
)
asyncio.create_task(ensure_playbook_embeddings_indexed())
logger.info("playbook_embedding_indexing_scheduled")
except Exception as e:
@@ -499,6 +497,24 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("approval_timeout_resolver_schedule_failed", error=str(e))
# T73: 已有完成證據但仍卡在 INVESTIGATING 的舊 incident 小批次收斂。
# 僅處理 auto-repair success / approval EXECUTION_SUCCESS / approval EXPIRED
# 不自動關閉 manual_required 或單純 APPROVED 事件。
try:
from src.jobs.incident_lifecycle_reconciler import (
INTERVAL_SECONDS as INCIDENT_LIFECYCLE_RECONCILER_INTERVAL,
)
from src.jobs.incident_lifecycle_reconciler import (
run_incident_lifecycle_reconciler_loop,
)
asyncio.create_task(run_incident_lifecycle_reconciler_loop())
logger.info(
"incident_lifecycle_reconciler_scheduled",
interval_sec=INCIDENT_LIFECYCLE_RECONCILER_INTERVAL,
)
except Exception as e:
logger.warning("incident_lifecycle_reconciler_schedule_failed", error=str(e))
# ADR-083 Phase 3: Evolver Agent每日— Playbook 自動合併 + 低信任封存
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 初始建立
try:
@@ -510,7 +526,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
# ADR-104 T2: LLM Playbook DRAFT governance每小時
try:
from src.jobs.playbook_generation_governance_job import run_playbook_generation_governance_loop
from src.jobs.playbook_generation_governance_job import (
run_playbook_generation_governance_loop,
)
asyncio.create_task(run_playbook_generation_governance_loop())
logger.info(
"playbook_generation_governance_loop_scheduled",
@@ -556,8 +574,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
from src.utils.timezone import now_taipei
async def _run_kb_rot_cleaner_loop() -> None:
from src.jobs.kb_rot_cleaner import get_kb_rot_cleaner
import asyncio as _asyncio
from src.jobs.kb_rot_cleaner import get_kb_rot_cleaner
while True:
try:
now = now_taipei()
@@ -654,8 +673,8 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
# recovery service 每 30s 檢查 → 111 連續 3 次 HEALTHY → 自動切回 → clear_cache
# 順序:先取 singleton → wire callback → 啟動 recovery service才能接收 callback
try:
from src.services.ollama_failover_manager import get_ollama_failover_manager
from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service
from src.services.ollama_failover_manager import get_ollama_failover_manager
_failover_mgr = get_ollama_failover_manager()
_recovery_svc = get_ollama_auto_recovery_service()
@@ -668,8 +687,8 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
# alerter 還沒注入 Redis → dedup fail-open告警會送出且無 dedup 保護(重複告警風險)
# 修法configure_alerter() 提前到 start() 之前Redis pool 在 lifespan 早期已就緒
try:
from src.services.failover_alerter import configure_alerter
from src.core.redis_client import get_redis
from src.services.failover_alerter import configure_alerter
configure_alerter(get_redis())
logger.info("failover_alerter_configured")
except Exception as _alerter_err:
@@ -810,6 +829,7 @@ else:
# 原因: FastAPI 不知道自己在 HTTPS 後面redirect 回 http://
# 效果: 有了此中間件307 Location 會是 https://
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
# CORS - Strict Whitelist (Iron Law #2)

View File

@@ -19,7 +19,12 @@ from sqlalchemy import select
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from src.models.incident import Incident, IncidentFrequencyStats, IncidentStatus, Severity
from src.models.incident import (
Incident,
IncidentFrequencyStats,
IncidentStatus,
Severity,
)
from src.repositories.interfaces import IIncidentRepository
logger = structlog.get_logger(__name__)
@@ -41,8 +46,8 @@ def _record_to_incident(record: IncidentRecord) -> Incident:
return Incident(
incident_id=record.incident_id,
status=IncidentStatus(record.status),
severity=Severity(record.severity),
status=IncidentStatus(_normalize_status(record.status)),
severity=Severity(_normalize_severity(record.severity)),
signals=record.signals or [],
affected_services=record.affected_services or [],
proposal_ids=record.proposal_ids or [],
@@ -93,6 +98,36 @@ def _incident_to_record_data(incident: Incident) -> dict[str, Any]:
}
def _normalize_status(value: str | IncidentStatus) -> str:
    """Convert a stored status into the string expected by ``IncidentStatus(...)``.

    Accepts an ``IncidentStatus`` member, an exact enum member name, or free
    text. Free text is stripped and lower-cased, and the legacy value
    ``"open"`` maps to the ``INVESTIGATING`` value.
    """
    if isinstance(value, IncidentStatus):
        return value.value

    text_value = str(value)
    # Exact enum *name* match (e.g. "INVESTIGATING") takes precedence.
    member = IncidentStatus.__members__.get(text_value)
    if member is not None:
        return member.value

    lowered = text_value.strip().lower()
    return IncidentStatus.INVESTIGATING.value if lowered == "open" else lowered
def _normalize_severity(value: str | Severity) -> str:
    """Convert a stored severity into the string expected by ``Severity(...)``.

    Accepts a ``Severity`` member, an exact enum member name, or a legacy
    label (matched after stripping and lower-casing). Anything else is
    returned unchanged as its ``str()`` form.
    """
    if isinstance(value, Severity):
        return value.value

    text_value = str(value)
    if text_value in Severity.__members__:
        return Severity[text_value].value

    # Legacy labels grouped by the severity they map to.
    legacy_groups = (
        (("critical",), Severity.P0),
        (("high",), Severity.P1),
        (("warning", "medium"), Severity.P2),
        (("info", "low", "none"), Severity.P3),
    )
    lookup_key = text_value.strip().lower()
    for labels, severity in legacy_groups:
        if lookup_key in labels:
            return severity.value
    return text_value
# =============================================================================
# IncidentDBRepository
# =============================================================================
@@ -136,8 +171,8 @@ class IncidentDBRepository(IIncidentRepository):
async def get_active(self) -> list[Incident]:
"""取得所有活躍的 Incident"""
active_statuses = [
IncidentStatus.INVESTIGATING.value,
IncidentStatus.MITIGATING.value,
IncidentStatus.INVESTIGATING,
IncidentStatus.MITIGATING,
]
async with get_db_context() as db:
result = await db.execute(

View File

@@ -329,7 +329,7 @@ class FlywheelStatsService:
# 卡住的 IncidentINVESTIGATING > 24h
stuck_q = await db.execute(
select(func.count()).where(
IncidentRecord.status == IncidentStatus.INVESTIGATING.value,
IncidentRecord.status == IncidentStatus.INVESTIGATING,
IncidentRecord.created_at <= stuck_threshold,
)
)
@@ -340,7 +340,7 @@ class FlywheelStatsService:
type4_q = await db.execute(
select(func.count()).where(
IncidentRecord.notification_type == "TYPE-4",
IncidentRecord.status == IncidentStatus.INVESTIGATING.value,
IncidentRecord.status == IncidentStatus.INVESTIGATING,
)
)
type4_count = type4_q.scalar_one_or_none() or 0

View File

@@ -445,20 +445,31 @@ async def create_incident_for_approval(
# 回滾: git revert (秒級恢復)
# =============================================================================
def normalize_status(value: str) -> str:
def normalize_status(value: str | IncidentStatus) -> str:
"""
正規化 IncidentStatus 舊格式值
舊值 → 新值:
- 'open''investigating'
"""
if isinstance(value, IncidentStatus):
return value.value
raw = str(value)
if raw in IncidentStatus.__members__:
return IncidentStatus[raw].value
normalized = raw.strip().lower()
legacy_map = {
"open": "investigating",
}
return legacy_map.get(value, value)
valid_values = {status.value for status in IncidentStatus}
if normalized in valid_values:
return normalized
return legacy_map.get(normalized, raw)
def normalize_severity(value: str) -> str:
def normalize_severity(value: str | Severity) -> str:
"""
正規化 Severity 舊格式值
@@ -471,6 +482,14 @@ def normalize_severity(value: str) -> str:
- 'low''P3'
- 'none''P3'
"""
if isinstance(value, Severity):
return value.value
raw = str(value)
if raw in Severity.__members__:
return Severity[raw].value
normalized = raw.strip().lower()
legacy_map = {
"critical": "P0",
"high": "P1",
@@ -480,7 +499,7 @@ def normalize_severity(value: str) -> str:
"low": "P3",
"none": "P3",
}
return legacy_map.get(value, value)
return legacy_map.get(normalized, raw)
# =============================================================================
@@ -1097,17 +1116,24 @@ class IncidentService:
from src.repositories.incident_repository import get_incident_repository
from src.utils.timezone import now_taipei
# 1. 從 Working Memory 讀取
# 1. 從 Working Memory 讀取Redis TTL 過期時退回 PostgreSQL。
incident = await self.get_from_working_memory(incident_id)
if incident is None:
logger.warning("incident_not_found_for_resolve", incident_id=incident_id)
return None
incident = await self.get_from_episodic_memory(incident_id)
if incident is None:
logger.warning("incident_not_found_for_resolve", incident_id=incident_id)
return None
logger.info(
"incident_resolve_hydrated_from_episodic_memory",
incident_id=incident_id,
resolution_type=resolution_type,
)
# 1.5 F2 (2026-05-07 ogt + Codex + Claude Sonnet 4.6) — 冪等保護:
# 已經 RESOLVED 的 incident 直接 return existing避免後續所有副作用
# 已經 RESOLVED/CLOSED 的 incident 直接 return existing避免後續所有副作用
# 重複觸發postmortem / KB extract / KM convert / disposition / Telegram
# F2 NO_ACTION 路徑會頻繁呼叫 resolve_incident必須擋在 status mutation 之前。
if incident.status == IncidentStatus.RESOLVED:
if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED):
logger.info(
"incident_resolve_skipped_already_resolved",
incident_id=incident_id,
@@ -1146,6 +1172,7 @@ class IncidentService:
# KB Phase 2-A: 自動萃取 KB 草稿 (fire-and-forget, 2026-04-03 ogt)
try:
import asyncio
from src.services.knowledge_extractor_service import get_knowledge_extractor
asyncio.create_task(
get_knowledge_extractor().extract_from_incident(incident)
@@ -1160,13 +1187,13 @@ class IncidentService:
# 改為在呼叫點加統一契約保護(指數退避 3 次 + DLQ 失敗回收)。
try:
import asyncio
from src.core.config import settings
from src.services.km_conversion_service import get_km_conversion_service
from src.services.km_writer import (
KMWritePayload,
KMWriteResult,
_RETRY_BASE_DELAY,
_RETRY_MAX,
KMWritePayload,
_is_retriable,
_write_to_dlq,
)
@@ -1253,7 +1280,10 @@ class IncidentService:
# 孤兒 report_generation_service.trigger_postmortem 本次接上 resolve 路徑
try:
import asyncio
from src.services.report_generation_service import get_report_generation_service
from src.services.report_generation_service import (
get_report_generation_service,
)
alertname = (
incident.signals[0].labels.get("alertname", "UnknownAlert")

View File

@@ -0,0 +1,50 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from src.jobs.incident_lifecycle_reconciler import (
LifecycleCandidate,
reconcile_stuck_incidents,
)
@pytest.mark.asyncio
async def test_reconcile_stuck_incidents_resolves_strong_evidence(monkeypatch):
    """Every strong-evidence candidate is resolved through the incident service."""
    # (incident_id, resolution_type, reason) for each candidate, in call order.
    scenarios = [
        ("INC-EXEC-SUCCESS", "auto_repair", "approval_execution_success"),
        ("INC-TIMEOUT", "timeout", "approval_expired"),
    ]
    candidates = [
        LifecycleCandidate(
            incident_id=incident_id,
            resolution_type=resolution_type,
            reason=reason,
        )
        for incident_id, resolution_type, reason in scenarios
    ]
    service = SimpleNamespace(resolve_incident=AsyncMock(return_value=object()))
    monkeypatch.setattr(
        "src.jobs.incident_lifecycle_reconciler._fetch_candidates",
        AsyncMock(return_value=candidates),
    )
    monkeypatch.setattr(
        "src.services.incident_service.get_incident_service",
        lambda: service,
    )

    resolved, errors = await reconcile_stuck_incidents(limit=2)

    assert (resolved, errors) == (2, 0)
    recorded_calls = service.resolve_incident.await_args_list
    for position, (incident_id, resolution_type, _reason) in enumerate(scenarios):
        assert recorded_calls[position].args == (incident_id,)
        assert recorded_calls[position].kwargs == {
            "resolution_type": resolution_type,
        }

View File

@@ -18,7 +18,7 @@ from unittest.mock import AsyncMock
import pytest
from src.models.incident import IncidentStatus
from src.services.incident_service import IncidentService
from src.services.incident_service import IncidentService, normalize_status
@pytest.mark.asyncio
@@ -55,6 +55,9 @@ async def test_resolve_incident_returns_none_when_not_found(monkeypatch):
monkeypatch.setattr(
svc, "get_from_working_memory", AsyncMock(return_value=None)
)
monkeypatch.setattr(
svc, "get_from_episodic_memory", AsyncMock(return_value=None)
)
save_mock = AsyncMock(return_value=True)
monkeypatch.setattr(svc, "save_to_working_memory", save_mock)
@@ -62,3 +65,33 @@ async def test_resolve_incident_returns_none_when_not_found(monkeypatch):
assert result is None
save_mock.assert_not_called()
@pytest.mark.asyncio
async def test_resolve_incident_uses_episodic_memory_for_idempotent_fallback(monkeypatch):
    """With the Redis TTL expired and the DB copy already RESOLVED, resolve falls back to the DB and stays idempotent."""
    incident_id = "INC-DB-FALLBACK-001"
    stored_incident = SimpleNamespace(
        incident_id=incident_id,
        status=IncidentStatus.RESOLVED,
    )
    service = IncidentService()
    working_lookup = AsyncMock(return_value=None)
    episodic_lookup = AsyncMock(return_value=stored_incident)
    working_save = AsyncMock(return_value=True)
    replacements = (
        ("get_from_working_memory", working_lookup),
        ("get_from_episodic_memory", episodic_lookup),
        ("save_to_working_memory", working_save),
    )
    for attribute, replacement in replacements:
        monkeypatch.setattr(service, attribute, replacement)

    outcome = await service.resolve_incident(incident_id)

    assert outcome is stored_incident
    episodic_lookup.assert_awaited_once_with(incident_id)
    working_save.assert_not_called()
def test_normalize_status_accepts_db_enum_name() -> None:
    """PostgreSQL SQLEnum stores the Enum name; reads must normalize it to the Pydantic value."""
    cases = (
        ("INVESTIGATING", "investigating"),
        (IncidentStatus.CLOSED, "closed"),
    )
    for stored, expected in cases:
        assert normalize_status(stored) == expected