From d0835a7be19ea29bf213e4a36b3c369f45c42f49 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 19 May 2026 11:45:15 +0800 Subject: [PATCH] fix(api): reconcile completed stuck incidents --- .../src/jobs/incident_lifecycle_reconciler.py | 164 ++++++++++++++++++ apps/api/src/main.py | 102 ++++++----- .../src/repositories/incident_repository.py | 45 ++++- .../src/services/flywheel_stats_service.py | 4 +- apps/api/src/services/incident_service.py | 54 ++++-- .../test_incident_lifecycle_reconciler.py | 50 ++++++ ...st_incident_service_resolve_idempotency.py | 35 +++- 7 files changed, 393 insertions(+), 61 deletions(-) create mode 100644 apps/api/src/jobs/incident_lifecycle_reconciler.py create mode 100644 apps/api/tests/test_incident_lifecycle_reconciler.py diff --git a/apps/api/src/jobs/incident_lifecycle_reconciler.py b/apps/api/src/jobs/incident_lifecycle_reconciler.py new file mode 100644 index 00000000..19c15da3 --- /dev/null +++ b/apps/api/src/jobs/incident_lifecycle_reconciler.py @@ -0,0 +1,164 @@ +""" +Incident Lifecycle Reconciler +============================= + +把已有強證據的舊 stuck incident 收斂回 RESOLVED。 + +範圍刻意保守: +- auto_repair_executions.success = true +- approval_records.status = EXECUTION_SUCCESS +- approval_records.status = EXPIRED + +不處理單純 APPROVED / NO_ACTION / manual_required,避免把仍需人工的事件 +誤當作自動修復完成。 +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass + +import structlog +from sqlalchemy import text + +from src.db.base import get_db_context + +logger = structlog.get_logger(__name__) + +BATCH_LIMIT = 25 +INTERVAL_SECONDS = 1800 + + +@dataclass(frozen=True) +class LifecycleCandidate: + incident_id: str + resolution_type: str + reason: str + + +async def run_incident_lifecycle_reconciler_loop() -> None: + """每 30 分鐘收斂一小批已有完成證據的 stuck incident。""" + while True: + try: + resolved, errors = await reconcile_stuck_incidents() + if resolved > 0 or errors > 0: + logger.info( + "incident_lifecycle_reconciler_done", + resolved=resolved, + errors=errors, + batch_limit=BATCH_LIMIT, + ) + except Exception as exc: + logger.warning("incident_lifecycle_reconciler_loop_failed", error=str(exc)) + + await asyncio.sleep(INTERVAL_SECONDS) + + +async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]: + """ + 找出已完成但仍卡在 INVESTIGATING 的 incident,透過 IncidentService 統一路徑結案。 + + Returns: + (resolved_count, error_count) + """ + candidates = await _fetch_candidates(limit) + if not candidates: + return 0, 0 + + from src.services.incident_service import get_incident_service + + incident_service = get_incident_service() + resolved = 0 + errors = 0 + + for candidate in candidates: + try: + result = await incident_service.resolve_incident( + candidate.incident_id, + resolution_type=candidate.resolution_type, + ) + if result is not None: + resolved += 1 + logger.info( + "incident_lifecycle_reconciled", + incident_id=candidate.incident_id, + reason=candidate.reason, + resolution_type=candidate.resolution_type, + ) + except Exception as exc: + errors += 1 + logger.warning( + "incident_lifecycle_reconcile_failed", + incident_id=candidate.incident_id, + reason=candidate.reason, + error=str(exc), + ) + + return resolved, errors + + +async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]: + async with get_db_context() as db: + result = await db.execute( + text( + """ + WITH stale AS ( + SELECT + i.incident_id, + i.created_at, + EXISTS ( + SELECT 1 + FROM auto_repair_executions are + WHERE are.incident_id = i.incident_id + AND are.success IS TRUE + ) AS has_success_auto_repair, + EXISTS ( + SELECT 1 + FROM approval_records ar + WHERE ar.incident_id = i.incident_id + AND ar.status::text = 'EXECUTION_SUCCESS' + ) AS has_execution_success, + EXISTS ( + SELECT 1 + FROM approval_records ar + WHERE ar.incident_id = i.incident_id + AND ar.status::text = 'EXPIRED' + ) AS has_expired_approval + FROM incidents i + WHERE i.status = 'INVESTIGATING' + AND i.created_at <= now() - interval '24 hours' + ) + SELECT + incident_id, + CASE + WHEN has_success_auto_repair THEN 'auto_repair' + WHEN has_execution_success THEN 'auto_repair' + ELSE 'timeout' + END AS resolution_type, + CASE + WHEN has_success_auto_repair THEN 'auto_repair_execution_success' + WHEN has_execution_success THEN 'approval_execution_success' + ELSE 'approval_expired' + END AS reason + FROM stale + WHERE has_success_auto_repair + OR has_execution_success + OR has_expired_approval + ORDER BY created_at DESC + LIMIT :limit + """ + ), + { + "limit": limit, + }, + ) + rows = result.mappings().all() + + return [ + LifecycleCandidate( + incident_id=str(row["incident_id"]), + resolution_type=str(row["resolution_type"]), + reason=str(row["reason"]), + ) + for row in rows + ] diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 02210dc3..6b0fa59f 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -34,17 +34,22 @@ from sentry_sdk.integrations.starlette import StarletteIntegration from src.api.v1 import agents as agents_v1 # Phase 9.5: Agent Teams API from src.api.v1 import ai as ai_v1 -from src.api.v1 import aider_events as aider_events_v1 # aider-watch v2 ADR-091 -from src.api.v1 import ai_governance as ai_governance_v1 # 2026-05-02: /governance 頁面 3 endpoints +from src.api.v1 import ( + ai_governance as ai_governance_v1, # 2026-05-02: /governance 頁面 3 endpoints +) from src.api.v1 import ai_slo as ai_slo_v1 # Phase 6 ADR-087: AI SLO 自我治理 +from src.api.v1 import aider_events as aider_events_v1 # aider-watch v2 ADR-091 from src.api.v1 import aiops_kpi as aiops_kpi_v1 # ADR-090 § Phase 7 KPI Dashboard -from src.api.v1 import aiops_timeline as aiops_timeline_v1 # 2026-04-27 Wave8-X3 B4 timeline endpoint -from src.api.v1 import approvals as approvals_v1 +from src.api.v1 import ( + aiops_timeline as aiops_timeline_v1, # 2026-04-27 Wave8-X3 B4 timeline endpoint +) from src.api.v1 import alert_operation_logs as alert_operation_logs_v1 +from src.api.v1 import approvals as approvals_v1 from src.api.v1 import audit_logs as audit_logs_v1 from src.api.v1 import auto_repair as auto_repair_v1 # #8: 自動升級決策 from src.api.v1 import csrf as csrf_v1 # Phase 20: CSRF Protection from src.api.v1 import dashboard as dashboard_v1 +from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection from src.api.v1 import errors as errors_v1 # #40: Sentry 錯誤 BFF API from src.api.v1 import ( gitea_webhook as gitea_webhook_v1, # ADR-059: Gitea → OpenClaw (GitHub → Gitea 遷移) @@ -56,19 +61,20 @@ from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal from src.api.v1 import knowledge as knowledge_v1 # KB Phase 1: Knowledge Base from src.api.v1 import learning as learning_v1 # Phase D-G P0: Learning API from src.api.v1 import metrics as metrics_v1 # Phase 7: Gold Metrics (真實血脈) +from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態 +from src.api.v1 import notifications as notifications_v1 # 2026-04-10: 通知頻道狀態 +from src.api.v1 import ( + platform as platform_v1, # AwoooP Phase 4: Platform Shell(Shadow Mode) +) from src.api.v1 import playbooks as playbooks_v1 # #7: Playbook 萃取 from src.api.v1 import proposals as proposals_v1 # Phase 6.4h: Proposals CRUD API +from src.api.v1 import rag as rag_v1 # Phase 33 ADR-067: RAG 知識庫 from src.api.v1 import ( sentry_webhook as sentry_webhook_v1, # Phase 10.2.1: Sentry → Telegram ) from src.api.v1 import ( signoz_webhook as signoz_webhook_v1, # Phase 21: SignOz → Telegram (ADR-037) ) -from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection -from src.api.v1 import platform as platform_v1 # AwoooP Phase 4: Platform Shell(Shadow Mode) -from src.api.v1 import rag as rag_v1 # Phase 33 ADR-067: RAG 知識庫 -from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態 -from src.api.v1 import notifications as notifications_v1 # 2026-04-10: 通知頻道狀態 from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway from src.api.v1 import telegram_webhook as telegram_webhook_v1 # ADR-094: Webhook入口 @@ -81,8 +87,6 @@ from src.core.logging import get_logger, setup_logging from src.core.redis_client import close_redis_pool, init_redis_pool from src.core.sse import get_publisher from src.core.telemetry import setup_telemetry, shutdown_telemetry -from src.services.adr100_slo_metrics_service import get_adr100_slo_metrics_service -from src.services.flywheel_stats_service import get_flywheel_stats_service # CTO-201: Database & Executor from src.db.base import close_db, init_db @@ -92,7 +96,9 @@ from src.routers import proposals as proposals_router # Legacy route imports (to be migrated) from src.routes import agent, notifications, pipelines, plugins +from src.services.adr100_slo_metrics_service import get_adr100_slo_metrics_service from src.services.executor import close_executor +from src.services.flywheel_stats_service import get_flywheel_stats_service # Phase 5: OpenClaw AI Engine from src.services.openclaw import close_openclaw @@ -267,16 +273,21 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # 2026-04-05 ogt: 重開機後 Redis 清空,從 DB restore 未解決的 incidents # 統帥批准: 數據必須長久記錄,重開機後自動恢復 Working Memory try: - from src.services.incident_service import get_incident_service + from sqlalchemy import select + from src.db.base import get_db_context from src.db.models import IncidentRecord - from sqlalchemy import select + from src.models.incident import IncidentStatus + from src.services.incident_service import get_incident_service incident_service = get_incident_service() async with get_db_context() as db: result = await db.execute( select(IncidentRecord).where( - IncidentRecord.status.in_(["investigating", "mitigating"]) + IncidentRecord.status.in_([ + IncidentStatus.INVESTIGATING, + IncidentStatus.MITIGATING, + ]) ) ) records = result.scalars().all() @@ -284,31 +295,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: restored = 0 for record in records: try: - from src.models.incident import Incident - incident = Incident( - incident_id=record.incident_id, - status=record.status, - severity=record.severity, - signals=record.signals or [], - affected_services=record.affected_services or [], - decision_chain=record.decision_chain, - proposal_ids=record.proposal_ids or [], - outcome=record.outcome, - created_at=record.created_at, - updated_at=record.updated_at, - resolved_at=record.resolved_at, - closed_at=record.closed_at, - ttl_days=record.ttl_days, - vectorized=record.vectorized, - # ADR-073: 分類欄位必須還原,否則 KM 寫入時全為 "unknown" - notification_type=record.notification_type, - alert_category=record.alert_category, - ) + incident = incident_service._record_to_incident(record) if await incident_service.save_to_working_memory(incident): restored += 1 - except Exception: + except Exception as record_error: # 舊資料 source 值不合法(node-exporter 等)→ 跳過 - pass + logger.warning( + "working_memory_warmup_record_skipped", + incident_id=getattr(record, "incident_id", None), + error=str(record_error), + ) logger.info("working_memory_warmed_up", restored=restored, total=len(records)) except Exception as e: @@ -351,7 +347,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: logger.warning("playbook_pg_backfill_schedule_failed", error=str(e)) try: - from src.services.playbook_embedding_service import ensure_playbook_embeddings_indexed + from src.services.playbook_embedding_service import ( + ensure_playbook_embeddings_indexed, + ) asyncio.create_task(ensure_playbook_embeddings_indexed()) logger.info("playbook_embedding_indexing_scheduled") except Exception as e: @@ -499,6 +497,24 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("approval_timeout_resolver_schedule_failed", error=str(e)) + # T73: 已有完成證據但仍卡在 INVESTIGATING 的舊 incident 小批次收斂。 + # 僅處理 auto-repair success / approval EXECUTION_SUCCESS / approval EXPIRED, + # 不自動關閉 manual_required 或單純 APPROVED 事件。 + try: + from src.jobs.incident_lifecycle_reconciler import ( + INTERVAL_SECONDS as INCIDENT_LIFECYCLE_RECONCILER_INTERVAL, + ) + from src.jobs.incident_lifecycle_reconciler import ( + run_incident_lifecycle_reconciler_loop, + ) + asyncio.create_task(run_incident_lifecycle_reconciler_loop()) + logger.info( + "incident_lifecycle_reconciler_scheduled", + interval_sec=INCIDENT_LIFECYCLE_RECONCILER_INTERVAL, + ) + except Exception as e: + logger.warning("incident_lifecycle_reconciler_schedule_failed", error=str(e)) + # ADR-083 Phase 3: Evolver Agent(每日)— Playbook 自動合併 + 低信任封存 # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 初始建立 try: @@ -510,7 +526,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # ADR-104 T2: LLM Playbook DRAFT governance(每小時) try: - from src.jobs.playbook_generation_governance_job import run_playbook_generation_governance_loop + from src.jobs.playbook_generation_governance_job import ( + run_playbook_generation_governance_loop, + ) asyncio.create_task(run_playbook_generation_governance_loop()) logger.info( "playbook_generation_governance_loop_scheduled", @@ -556,8 +574,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: from src.utils.timezone import now_taipei async def _run_kb_rot_cleaner_loop() -> None: - from src.jobs.kb_rot_cleaner import get_kb_rot_cleaner import asyncio as _asyncio + + from src.jobs.kb_rot_cleaner import get_kb_rot_cleaner while True: try: now = now_taipei() @@ -654,8 +673,8 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # recovery service 每 30s 檢查 → 111 連續 3 次 HEALTHY → 自動切回 → clear_cache # 順序:先取 singleton → wire callback → 啟動 recovery service(才能接收 callback) try: - from src.services.ollama_failover_manager import get_ollama_failover_manager from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service + from src.services.ollama_failover_manager import get_ollama_failover_manager _failover_mgr = get_ollama_failover_manager() _recovery_svc = get_ollama_auto_recovery_service() @@ -668,8 +687,8 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # alerter 還沒注入 Redis → dedup fail-open,告警會送出且無 dedup 保護(重複告警風險) # 修法:configure_alerter() 提前到 start() 之前;Redis pool 在 lifespan 早期已就緒 try: - from src.services.failover_alerter import configure_alerter from src.core.redis_client import get_redis + from src.services.failover_alerter import configure_alerter configure_alerter(get_redis()) logger.info("failover_alerter_configured") except Exception as _alerter_err: @@ -810,6 +829,7 @@ else: # 原因: FastAPI 不知道自己在 HTTPS 後面,redirect 回 http:// # 效果: 有了此中間件,307 Location 會是 https:// from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware + app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*") # CORS - Strict Whitelist (Iron Law #2) diff --git a/apps/api/src/repositories/incident_repository.py b/apps/api/src/repositories/incident_repository.py index b6f18723..66a5aceb 100644 --- a/apps/api/src/repositories/incident_repository.py +++ b/apps/api/src/repositories/incident_repository.py @@ -19,7 +19,12 @@ from sqlalchemy import select from src.db.base import get_db_context from src.db.models import IncidentRecord -from src.models.incident import Incident, IncidentFrequencyStats, IncidentStatus, Severity +from src.models.incident import ( + Incident, + IncidentFrequencyStats, + IncidentStatus, + Severity, +) from src.repositories.interfaces import IIncidentRepository logger = structlog.get_logger(__name__) @@ -41,8 +46,8 @@ def _record_to_incident(record: IncidentRecord) -> Incident: return Incident( incident_id=record.incident_id, - status=IncidentStatus(record.status), - severity=Severity(record.severity), + status=IncidentStatus(_normalize_status(record.status)), + severity=Severity(_normalize_severity(record.severity)), signals=record.signals or [], affected_services=record.affected_services or [], proposal_ids=record.proposal_ids or [], @@ -93,6 +98,36 @@ def _incident_to_record_data(incident: Incident) -> dict[str, Any]: } +def _normalize_status(value: str | IncidentStatus) -> str: + if isinstance(value, IncidentStatus): + return value.value + raw = str(value) + if raw in IncidentStatus.__members__: + return IncidentStatus[raw].value + normalized = raw.strip().lower() + if normalized == "open": + return IncidentStatus.INVESTIGATING.value + return normalized + + +def _normalize_severity(value: str | Severity) -> str: + if isinstance(value, Severity): + return value.value + raw = str(value) + if raw in Severity.__members__: + return Severity[raw].value + legacy_map = { + "critical": Severity.P0.value, + "high": Severity.P1.value, + "warning": Severity.P2.value, + "medium": Severity.P2.value, + "info": Severity.P3.value, + "low": Severity.P3.value, + "none": Severity.P3.value, + } + return legacy_map.get(raw.strip().lower(), raw) + + # ============================================================================= # IncidentDBRepository # ============================================================================= @@ -136,8 +171,8 @@ class IncidentDBRepository(IIncidentRepository): async def get_active(self) -> list[Incident]: """取得所有活躍的 Incident""" active_statuses = [ - IncidentStatus.INVESTIGATING.value, - IncidentStatus.MITIGATING.value, + IncidentStatus.INVESTIGATING, + IncidentStatus.MITIGATING, ] async with get_db_context() as db: result = await db.execute( diff --git a/apps/api/src/services/flywheel_stats_service.py b/apps/api/src/services/flywheel_stats_service.py index 1d51d471..628b9f3a 100644 --- a/apps/api/src/services/flywheel_stats_service.py +++ b/apps/api/src/services/flywheel_stats_service.py @@ -329,7 +329,7 @@ class FlywheelStatsService: # 卡住的 Incident(INVESTIGATING > 24h) stuck_q = await db.execute( select(func.count()).where( - IncidentRecord.status == IncidentStatus.INVESTIGATING.value, + IncidentRecord.status == IncidentStatus.INVESTIGATING, IncidentRecord.created_at <= stuck_threshold, ) ) @@ -340,7 +340,7 @@ class FlywheelStatsService: type4_q = await db.execute( select(func.count()).where( IncidentRecord.notification_type == "TYPE-4", - IncidentRecord.status == IncidentStatus.INVESTIGATING.value, + IncidentRecord.status == IncidentStatus.INVESTIGATING, ) ) type4_count = type4_q.scalar_one_or_none() or 0 diff --git a/apps/api/src/services/incident_service.py b/apps/api/src/services/incident_service.py index 8f4df164..46a3d5a9 100644 --- a/apps/api/src/services/incident_service.py +++ b/apps/api/src/services/incident_service.py @@ -445,20 +445,31 @@ async def create_incident_for_approval( # 回滾: git revert (秒級恢復) # ============================================================================= -def normalize_status(value: str) -> str: +def normalize_status(value: str | IncidentStatus) -> str: """ 正規化 IncidentStatus 舊格式值 舊值 → 新值: - 'open' → 'investigating' """ + if isinstance(value, IncidentStatus): + return value.value + + raw = str(value) + if raw in IncidentStatus.__members__: + return IncidentStatus[raw].value + + normalized = raw.strip().lower() legacy_map = { "open": "investigating", } - return legacy_map.get(value, value) + valid_values = {status.value for status in IncidentStatus} + if normalized in valid_values: + return normalized + return legacy_map.get(normalized, raw) -def normalize_severity(value: str) -> str: +def normalize_severity(value: str | Severity) -> str: """ 正規化 Severity 舊格式值 @@ -471,6 +482,14 @@ def normalize_severity(value: str) -> str: - 'low' → 'P3' - 'none' → 'P3' """ + if isinstance(value, Severity): + return value.value + + raw = str(value) + if raw in Severity.__members__: + return Severity[raw].value + + normalized = raw.strip().lower() legacy_map = { "critical": "P0", "high": "P1", @@ -480,7 +499,7 @@ def normalize_severity(value: str) -> str: "low": "P3", "none": "P3", } - return legacy_map.get(value, value) + return legacy_map.get(normalized, raw) # ============================================================================= @@ -1097,17 +1116,24 @@ class IncidentService: from src.repositories.incident_repository import get_incident_repository from src.utils.timezone import now_taipei - # 1. 從 Working Memory 讀取 + # 1. 從 Working Memory 讀取;Redis TTL 過期時退回 PostgreSQL。 incident = await self.get_from_working_memory(incident_id) if incident is None: - logger.warning("incident_not_found_for_resolve", incident_id=incident_id) - return None + incident = await self.get_from_episodic_memory(incident_id) + if incident is None: + logger.warning("incident_not_found_for_resolve", incident_id=incident_id) + return None + logger.info( + "incident_resolve_hydrated_from_episodic_memory", + incident_id=incident_id, + resolution_type=resolution_type, + ) # 1.5 F2 (2026-05-07 ogt + Codex + Claude Sonnet 4.6) — 冪等保護: - # 已經 RESOLVED 的 incident 直接 return existing,避免後續所有副作用 + # 已經 RESOLVED/CLOSED 的 incident 直接 return existing,避免後續所有副作用 # 重複觸發(postmortem / KB extract / KM convert / disposition / Telegram)。 # F2 NO_ACTION 路徑會頻繁呼叫 resolve_incident,必須擋在 status mutation 之前。 - if incident.status == IncidentStatus.RESOLVED: + if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED): logger.info( "incident_resolve_skipped_already_resolved", incident_id=incident_id, @@ -1146,6 +1172,7 @@ class IncidentService: # KB Phase 2-A: 自動萃取 KB 草稿 (fire-and-forget, 2026-04-03 ogt) try: import asyncio + from src.services.knowledge_extractor_service import get_knowledge_extractor asyncio.create_task( get_knowledge_extractor().extract_from_incident(incident) @@ -1160,13 +1187,13 @@ class IncidentService: # 改為在呼叫點加統一契約保護(指數退避 3 次 + DLQ 失敗回收)。 try: import asyncio + from src.core.config import settings from src.services.km_conversion_service import get_km_conversion_service from src.services.km_writer import ( - KMWritePayload, - KMWriteResult, _RETRY_BASE_DELAY, _RETRY_MAX, + KMWritePayload, _is_retriable, _write_to_dlq, ) @@ -1253,7 +1280,10 @@ class IncidentService: # 孤兒 report_generation_service.trigger_postmortem 本次接上 resolve 路徑 try: import asyncio - from src.services.report_generation_service import get_report_generation_service + + from src.services.report_generation_service import ( + get_report_generation_service, + ) alertname = ( incident.signals[0].labels.get("alertname", "UnknownAlert") diff --git a/apps/api/tests/test_incident_lifecycle_reconciler.py b/apps/api/tests/test_incident_lifecycle_reconciler.py new file mode 100644 index 00000000..638df6eb --- /dev/null +++ b/apps/api/tests/test_incident_lifecycle_reconciler.py @@ -0,0 +1,50 @@ +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from src.jobs.incident_lifecycle_reconciler import ( + LifecycleCandidate, + reconcile_stuck_incidents, +) + + +@pytest.mark.asyncio +async def test_reconcile_stuck_incidents_resolves_strong_evidence(monkeypatch): + service = SimpleNamespace(resolve_incident=AsyncMock(return_value=object())) + + monkeypatch.setattr( + "src.jobs.incident_lifecycle_reconciler._fetch_candidates", + AsyncMock( + return_value=[ + LifecycleCandidate( + incident_id="INC-EXEC-SUCCESS", + resolution_type="auto_repair", + reason="approval_execution_success", + ), + LifecycleCandidate( + incident_id="INC-TIMEOUT", + resolution_type="timeout", + reason="approval_expired", + ), + ] + ), + ) + monkeypatch.setattr( + "src.services.incident_service.get_incident_service", + lambda: service, + ) + + resolved, errors = await reconcile_stuck_incidents(limit=2) + + assert (resolved, errors) == (2, 0) + assert service.resolve_incident.await_args_list[0].args == ( + "INC-EXEC-SUCCESS", + ) + assert service.resolve_incident.await_args_list[0].kwargs == { + "resolution_type": "auto_repair", + } + assert service.resolve_incident.await_args_list[1].args == ("INC-TIMEOUT",) + assert service.resolve_incident.await_args_list[1].kwargs == { + "resolution_type": "timeout", + } diff --git a/apps/api/tests/test_incident_service_resolve_idempotency.py b/apps/api/tests/test_incident_service_resolve_idempotency.py index 5a0cc473..f514f93d 100644 --- a/apps/api/tests/test_incident_service_resolve_idempotency.py +++ b/apps/api/tests/test_incident_service_resolve_idempotency.py @@ -18,7 +18,7 @@ from unittest.mock import AsyncMock import pytest from src.models.incident import IncidentStatus -from src.services.incident_service import IncidentService +from src.services.incident_service import IncidentService, normalize_status @pytest.mark.asyncio @@ -55,6 +55,9 @@ async def test_resolve_incident_returns_none_when_not_found(monkeypatch): monkeypatch.setattr( svc, "get_from_working_memory", AsyncMock(return_value=None) ) + monkeypatch.setattr( + svc, "get_from_episodic_memory", AsyncMock(return_value=None) + ) save_mock = AsyncMock(return_value=True) monkeypatch.setattr(svc, "save_to_working_memory", save_mock) @@ -62,3 +65,33 @@ async def test_resolve_incident_returns_none_when_not_found(monkeypatch): assert result is None save_mock.assert_not_called() + + +@pytest.mark.asyncio +async def test_resolve_incident_uses_episodic_memory_for_idempotent_fallback(monkeypatch): + """Redis TTL 過期但 DB 已 RESOLVED 時,resolve 應從 DB fallback 並保持冪等。""" + fake_incident = SimpleNamespace( + incident_id="INC-DB-FALLBACK-001", + status=IncidentStatus.RESOLVED, + ) + + svc = IncidentService() + monkeypatch.setattr( + svc, "get_from_working_memory", AsyncMock(return_value=None) + ) + episodic_mock = AsyncMock(return_value=fake_incident) + monkeypatch.setattr(svc, "get_from_episodic_memory", episodic_mock) + save_mock = AsyncMock(return_value=True) + monkeypatch.setattr(svc, "save_to_working_memory", save_mock) + + result = await svc.resolve_incident("INC-DB-FALLBACK-001") + + assert result is fake_incident + episodic_mock.assert_awaited_once_with("INC-DB-FALLBACK-001") + save_mock.assert_not_called() + + +def test_normalize_status_accepts_db_enum_name() -> None: + """PostgreSQL SQLEnum 會存 Enum name;讀回時必須正規化成 Pydantic value。""" + assert normalize_status("INVESTIGATING") == "investigating" + assert normalize_status(IncidentStatus.CLOSED) == "closed"