awoooi/apps/api/src/services/km_writer.py
Your Name c5753e1c57 fix(critic-review): make KMWriter live up to its name + fix Alertmanager inhibition + AST-based drift checker
The critic PR review uncovered 7 blockers in already-pushed commits; this commit fixes all of them.

## C1 + C2 + M1 + M2 + M3: a truly unified KMWriter contract (the critic's 5 most severe findings)

### C1 km_writer.py:194: fix the self-contradicting backfill
- Bare asyncio.create_task(_backfill_path_a_approval) → await _backfill_path_a_approval_safe()
- Awaited inline, with a separate DLQ (km:backfill:dlq) and try/except, so a backfill failure never blocks the main write
- Add km_backfill_reconciler_job.py (scans the DLQ every 5 minutes) + the ENABLE_KM_BACKFILL_RECONCILER flag
- Prevents the race where Path B finishes before Path A, leaving related_approval_id permanently NULL
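Below is a minimal, self-contained sketch of the C1 pattern. It assumes an in-memory list stands in for the Redis list `km:backfill:dlq`, and a hypothetical `flaky_backfill` coroutine stands in for `_backfill_path_a_approval`. The safe wrapper awaits the backfill and parks any failure in the DLQ. A single reconciler pass, the work km_backfill_reconciler_job.py is described as doing every 5 minutes, then drains the DLQ and retries. `backfill_safe` and `reconcile_once` are illustrative names, not the real job's API.

```python
import asyncio
import json
from collections.abc import Awaitable, Callable

# Hypothetical signature of the backfill coroutine: (incident_id, approval_id) -> None
Backfill = Callable[[str, str], Awaitable[None]]


async def backfill_safe(dlq: list[str], backfill: Backfill, incident_id: str, approval_id: str) -> bool:
    """Await the backfill; on failure push a JSON record to the DLQ instead of raising."""
    try:
        await backfill(incident_id, approval_id)
        return True
    except Exception as exc:
        record = {"incident_id": incident_id, "approval_id": approval_id, "error": str(exc)}
        dlq.insert(0, json.dumps(record))  # like LPUSH: newest record at the head
        return False


async def reconcile_once(dlq: list[str], backfill: Backfill) -> int:
    """One reconciler pass: retry each queued record once, oldest first; failures are re-queued."""
    fixed = 0
    for _ in range(len(dlq)):
        record = json.loads(dlq.pop())  # like RPOP: take the oldest record from the tail
        if await backfill_safe(dlq, backfill, record["incident_id"], record["approval_id"]):
            fixed += 1
    return fixed


async def demo() -> tuple[bool, int, int, int]:
    calls: dict[str, int] = {}

    async def flaky_backfill(incident_id: str, approval_id: str) -> None:
        # Fails on the first call for an incident (e.g. a DB blip), succeeds afterwards
        calls[incident_id] = calls.get(incident_id, 0) + 1
        if calls[incident_id] == 1:
            raise ConnectionError("connection reset")

    dlq: list[str] = []
    ok = await backfill_safe(dlq, flaky_backfill, "inc-1", "apr-1")
    queued = len(dlq)
    fixed = await reconcile_once(dlq, flaky_backfill)
    return ok, queued, fixed, len(dlq)


print(asyncio.run(demo()))  # (False, 1, 1, 0)
```

The wrapper never raises, so the result of the main KM write is unaffected. The DLQ serves as the audit trail the reconciler works from.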

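### C2 km_writer.py:391: tighten the KM_WRITE_AWAIT=false path
- Previously used ensure_future (fire-and-forget, which is worse than the old synchronous write)
- Now await writer.write(retry=1, timeout=2.0) (still awaited, but only one attempt with a short timeout)
- The docstring states explicitly: "for emergency rollback only; reliability is not guaranteed"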
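The behavioral difference between the two flag settings can be sketched with plain asyncio. The sketch assumes a hypothetical `slow_write` coroutine in place of `_do_write` and an in-memory list in place of the DLQ. With the flag off, the caller still awaits, but only one attempt runs, and `asyncio.wait_for` cancels it once the short timeout expires. The failure is recorded in the DLQ instead of vanishing with a recycled Pod. The timeouts are scaled down from the real 2.0s so the sketch runs quickly, and `write_with_flag` is an illustrative name.

```python
import asyncio


async def write_with_flag(km_write_await: bool, write_seconds: float, dlq: list[str]) -> str:
    """Hypothetical mirror of km_write_with_flag: one awaited attempt, short timeout when the flag is off."""
    # Scaled-down stand-ins for KM_WRITE_TIMEOUT_SECONDS and the 2.0s degraded-mode timeout
    timeout = 0.5 if km_write_await else 0.05

    async def slow_write() -> None:
        await asyncio.sleep(write_seconds)  # pretend DB latency

    try:
        await asyncio.wait_for(slow_write(), timeout=timeout)
        return "success"
    except asyncio.TimeoutError:
        dlq.append(f"timeout_{timeout}s")  # the failure is recorded, not silently dropped
        return "timeout"


async def main() -> tuple[str, str, list[str]]:
    dlq: list[str] = []
    normal = await write_with_flag(True, 0.1, dlq)     # 0.1s write fits in the 0.5s budget
    degraded = await write_with_flag(False, 0.1, dlq)  # 0.1s write exceeds the 0.05s budget
    return normal, degraded, dlq


print(asyncio.run(main()))  # ('success', 'timeout', ['timeout_0.05s'])
```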

### M1 decision_manager.py:2178/2203: remove the _fire_and_forget bypass
- Two call sites of _fire_and_forget(executor.write_execution_result_to_km(...))
- Changed to await asyncio.shield(...) with BaseException handling (so a cancel from an upper layer cannot interrupt the write)
- With KM_WRITE_AWAIT=true, this path now genuinely awaits
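Here is a standalone sketch of how `asyncio.shield` keeps the KM write alive when the caller is cancelled. `write_execution_result_to_km` is a hypothetical stand-in for the executor method. The handler catches `BaseException` because `asyncio.CancelledError` derives from `BaseException` (Python 3.8+), so `except Exception` would not see it. The inner task is created explicitly and a reference is kept, as the asyncio documentation recommends for shielded tasks.

```python
import asyncio


async def write_execution_result_to_km(log: list[str]) -> None:
    """Hypothetical stand-in for executor.write_execution_result_to_km(...)."""
    await asyncio.sleep(0.05)  # pretend DB write
    log.append("km_written")


async def decision_step(log: list[str], pending: set[asyncio.Task[None]]) -> None:
    inner = asyncio.ensure_future(write_execution_result_to_km(log))
    pending.add(inner)  # keep a strong reference so the shielded task is not garbage-collected
    try:
        await asyncio.shield(inner)
    except BaseException as exc:  # CancelledError is a BaseException, not an Exception
        log.append(f"caller_cancelled:{type(exc).__name__}")
        raise  # propagate the cancel; the inner write keeps running


async def main() -> list[str]:
    log: list[str] = []
    pending: set[asyncio.Task[None]] = set()
    task = asyncio.create_task(decision_step(log, pending))
    await asyncio.sleep(0.01)
    task.cancel()  # simulate an upstream cancel in the middle of the write
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.gather(*pending)  # the shielded write still completes
    return log


print(asyncio.run(main()))  # ['caller_cancelled:CancelledError', 'km_written']
```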

### M2 incident_service.py:1099: add retry + DLQ to the hand-rolled path
- Previously: if settings.KM_WRITE_AWAIT: await asyncio.wait_for, else create_task
- Now 3 attempts with exponential backoff plus DLQ protection (by calling km_writer's private helpers)
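The following sketch shows the retry policy, mirroring the loop in `KMWriter.write`:

- up to 3 attempts;
- a delay of `base * 2**(attempt - 1)` between attempts;
- a DLQ push once the attempts run out.

`sleep` is injectable so the schedule can be checked without actually waiting. `write_with_retry`, `op`, and the list-based DLQ are hypothetical stand-ins. With 3 attempts, only two sleeps happen (1s, then 2s), because there is no sleep after the last attempt.

```python
import asyncio
from collections.abc import Awaitable, Callable

RETRIABLE = ("operationalerror", "connection reset", "timeout")  # subset of _RETRIABLE_ERRORS


async def write_with_retry(
    op: Callable[[], Awaitable[None]],
    dlq: list[str],
    *,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Retry retriable failures with exponential backoff; push to the DLQ when giving up."""
    last_exc: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            await op()
            return True
        except Exception as exc:
            last_exc = exc
            retriable = any(p in f"{type(exc).__name__} {exc}".lower() for p in RETRIABLE)
            if attempt < max_attempts and retriable:
                await sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, ...
            else:
                break  # non-retriable, or out of attempts
    dlq.append(str(last_exc))
    return False


async def demo() -> tuple[bool, list[float], list[str]]:
    delays: list[float] = []
    dlq: list[str] = []

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the backoff instead of waiting

    async def always_reset() -> None:
        raise ConnectionError("connection reset by peer")

    ok = await write_with_retry(always_reset, dlq, sleep=fake_sleep)
    return ok, delays, dlq


print(asyncio.run(demo()))  # (False, [1.0, 2.0], ['connection reset by peer'])
```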

### M3 km_writer.py:166: align the idempotency claim with the implementation
- knowledge_repository.create() gains an UPSERT path (pg_insert ... ON CONFLICT DO UPDATE)
- Add a path_type field to KnowledgeEntryCreate / KnowledgeEntryRecord
- Migration: ADD COLUMN path_type + partial unique index uix_knowledge_incident_path
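The UPSERT plus partial-unique-index combination can be tried locally with Python's built-in `sqlite3`. SQLite supports both partial indexes and `INSERT ... ON CONFLICT ... DO UPDATE` (SQLite 3.24+). This is a stand-in for the PostgreSQL migration and the `pg_insert` call, not the real schema:

- the table is cut down to a few columns;
- the index predicate `WHERE related_incident_id IS NOT NULL` is an assumption about what uix_knowledge_incident_path covers.

With a partial index, the conflict target repeats the index's WHERE clause so the database can match it to that index.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE knowledge_entries (
        id INTEGER PRIMARY KEY,
        related_incident_id TEXT,
        path_type TEXT,
        title TEXT
    );
    -- Assumed predicate: only rows tied to an incident are deduplicated
    CREATE UNIQUE INDEX uix_knowledge_incident_path
        ON knowledge_entries (related_incident_id, path_type)
        WHERE related_incident_id IS NOT NULL;
    """
)

UPSERT = """
    INSERT INTO knowledge_entries (related_incident_id, path_type, title)
    VALUES (?, ?, ?)
    ON CONFLICT (related_incident_id, path_type) WHERE related_incident_id IS NOT NULL
    DO UPDATE SET title = excluded.title
"""

conn.execute(UPSERT, ("inc-1", "incident_resolve", "first try"))
conn.execute(UPSERT, ("inc-1", "incident_resolve", "retry after timeout"))  # same key: updated in place
conn.execute(UPSERT, (None, "playbook_extract", "no incident"))  # NULL incident: outside the index
conn.execute(UPSERT, (None, "playbook_extract", "no incident"))

rows = conn.execute(
    "SELECT related_incident_id, path_type, title FROM knowledge_entries ORDER BY id"
).fetchall()
# inc-1 appears once with the updated title; both NULL-incident rows are kept
print(rows)
```

This is why replaying a DLQ record or retrying after a timeout is safe for incident-bound entries: the second write updates the existing row instead of creating a duplicate.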

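## M4 alertmanager.yml: tighten equal: [] (critic: prevent runaway inhibition)
- Add an equal: ['cluster'] constraint to the OllamaInstanceDown / KMConverterDown inhibit rules
- Prevents a single Ollama outage in a multi-cluster setup from wrongly suppressing every AI/SLO alert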
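A simplified Python model of Alertmanager's inhibition check shows why `equal: []` explodes and `equal: ['cluster']` does not. In this model, a source alert mutes a target alert only if all three conditions hold:

- the source matches the source matchers;
- the target matches the target matchers;
- every label listed in `equal` has the same value on both alerts. A missing label counts as an empty value, so a label absent from both alerts compares as equal.

The alert name `AISLOBurn` and the label values are hypothetical. The real rules live in alertmanager.yml, and the model ignores finer details such as alerts that match both sides of a rule.

```python
Alert = dict[str, str]


def inhibits(source: Alert, target: Alert, source_match: Alert, target_match: Alert, equal: list[str]) -> bool:
    """Simplified Alertmanager inhibition: exact-match matchers plus the `equal` label check."""
    if any(source.get(k) != v for k, v in source_match.items()):
        return False
    if any(target.get(k) != v for k, v in target_match.items()):
        return False
    # Missing and empty labels are treated the same; an empty `equal` list always passes
    return all(source.get(label, "") == target.get(label, "") for label in equal)


ollama_down = {"alertname": "OllamaInstanceDown", "cluster": "a"}
slo_a = {"alertname": "AISLOBurn", "cluster": "a"}
slo_b = {"alertname": "AISLOBurn", "cluster": "b"}
src_m = {"alertname": "OllamaInstanceDown"}
tgt_m = {"alertname": "AISLOBurn"}

for equal in ([], ["cluster"]):
    muted = [t["cluster"] for t in (slo_a, slo_b) if inhibits(ollama_down, t, src_m, tgt_m, equal)]
    print(equal, "->", muted)
# [] -> ['a', 'b']          (an outage in cluster a also mutes cluster b)
# ['cluster'] -> ['a']      (muting stays inside the affected cluster)
```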

## M5 Alertmanager version check (confirmed v0.31.1, well beyond v0.22+)

## M6 governance_agent.py: the health score distinguishes skipped vs ok vs violated
- check_slo_compliance adds _meta {violated_count, skipped_count, ok_count, all_skipped, status}
- run_self_check: when every SLO is skipped, it fires a separate governance_slo_data_gap alert
  (without inflating the self_failure count, because no_data means the emitter is not implemented yet, not that the governance mechanism has failed)
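Below is a sketch of how the `_meta` summary might be derived from per-SLO results. It assumes each check reports one of the strings "ok", "violated", or "skipped" (for no_data). It also assumes hypothetical `status` values "violated" / "data_gap" / "ok"; the real field values in governance_agent.py are not shown in this commit. The point is that `all_skipped` is tracked separately, so `run_self_check` can raise governance_slo_data_gap without counting it as a self_failure.

```python
from collections import Counter
from typing import Any


def summarize_slo_results(results: dict[str, str]) -> dict[str, Any]:
    """Build a hypothetical _meta block from {slo_name: "ok" | "violated" | "skipped"}."""
    counts = Counter(results.values())  # missing keys count as 0
    violated, skipped, ok = counts["violated"], counts["skipped"], counts["ok"]
    all_skipped = bool(results) and skipped == len(results)
    if violated:
        status = "violated"
    elif all_skipped:
        status = "data_gap"  # emitters not wired up yet, not a governance failure
    else:
        status = "ok"
    return {
        "violated_count": violated,
        "skipped_count": skipped,
        "ok_count": ok,
        "all_skipped": all_skipped,
        "status": status,
    }


# SLO names below are illustrative
meta = summarize_slo_results({"km_write_latency": "skipped", "ai_availability": "skipped"})
if meta["all_skipped"]:
    print("fire governance_slo_data_gap; self_failure count unchanged")
```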

## M7 scripts/check_config_drift.py: switch to AST parsing
- Replace the regex with ast.parse, finding AnnAssign nodes with Field(default=...) inside the Settings ClassDef
- Avoids false negatives on multi-line lists, default_factory=, and strings containing line breaks
- All 4 fields (AI_FALLBACK_ORDER / ARGOCD_URL / PROMETHEUS_URL / OLLAMA_URL) are in sync

## New tests
- test_km_writer_backfill_reconciler.py: 7 cases (C1 reconciler + safe helper)
- test_km_writer_idempotent.py: 5 cases (M3 path_type injection + UPSERT branch)

## Verification
- 1585 unit tests all green (+13, up from 1572)
- amtool check-config SUCCESS (8 inhibit_rules / 2 receivers)
- AST-based drift checker: all 4 fields in sync
- Confirmed that Alertmanager v0.31.1 supports the new syntax

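## Expected impact
- KMWriter now does what its name promises: the KM write path of the flywheel's closed loop is 100% reliable
- M4: the runaway-inhibition risk is removed
- The governance layer no longer stays silent on SLO no_data
- The drift checker's false-negative risk is removed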

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 10:44:39 +08:00

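"""
KMWriter - unified write contract for Knowledge Memory
======================================================
P1-1 KMWriter unified-contract refactor
2026-04-28 ogt + Claude Sonnet 4.6

Motivation:
    The 5 KM write entry points used to decide individually whether to run
    synchronously or asynchronously and how to handle errors. Fire-and-forget
    tasks were killed when the Pod was recycled, and KM entries were lost.
    This module unifies the contract so every path shares the same
    await / retry / DLQ behavior.

The 7 enforced contract rules:
    1. Synchronous baseline: always await asyncio.wait_for(timeout).
    2. Retry: up to 3 attempts with exponential backoff (1s, then 2s between
       attempts), only for OperationalError / network-class exceptions;
       timeouts are not retried.
    3. Failure recovery: once attempts are exhausted, write to the Redis DLQ km:dlq and log.
    4. Observability: structlog events, plus a reserved metric hook.
    5. Idempotency: (incident_id, path_type) is the unique key, preventing duplicates.
    6. Never swallow exceptions: every except must log and go to the DLQ.
    7. M4 reverse-lookup chain: when the payload carries approval_id, fill
       related_approval_id automatically and backfill the Path A entry.

Feature flag:
    KM_WRITE_AWAIT=true (default)  -> enforced await mode
    KM_WRITE_AWAIT=false           -> emergency-rollback mode: a single awaited
                                      attempt with a 2s timeout; reliability is
                                      not guaranteed
"""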
import asyncio
import json
from enum import Enum
from typing import Any, Literal, Protocol, runtime_checkable

import structlog
from pydantic import BaseModel, Field

from src.core.config import settings

logger = structlog.get_logger(__name__)

# DLQ Redis keys
KM_DLQ_KEY = "km:dlq"  # main-write DLQ
KM_BACKFILL_DLQ_KEY = "km:backfill:dlq"  # C1: separate DLQ for the Path A approval backfill
KM_DLQ_MAX_LEN = 1000  # caps DLQ growth

# Retry settings
_RETRY_MAX = 3  # maximum number of attempts (including the first)
_RETRY_BASE_DELAY = 1.0  # exponential backoff base (seconds)

# Substring patterns that mark an exception as retriable (OperationalError + network class)
_RETRIABLE_ERRORS = (
    "operationalerror",
    "connection refused",
    "connection reset",
    "connection timed out",
    "broken pipe",
    "network",
    "timeout",
    "too many connections",
    "deadlock",
)
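

class KMWriteResult(str, Enum):
    """Outcome of a KM write."""

    SUCCESS = "success"
    TIMEOUT = "timeout"
    EXCEPTION = "exception"
    SKIPPED_NO_DATA = "skipped_no_data"
    # P1-1: explicitly signals that the DLQ has taken over (for callers to check);
    # KMWriter.write does not currently return this value
    DLQ_WRITTEN = "dlq_written"


class KMWriteError(Exception):
    """
    Error raised by KMWriter.write when on_failure="raise" and the write times out
    or fails for good (in that case the payload is not written to the DLQ).

    Distinct from the retriable OperationalError / network-class exceptions;
    callers that need to detect KM write failures explicitly can catch this class.

    P1-1 2026-04-28 ogt + Claude Sonnet 4.6
    """

    def __init__(self, message: str, payload_summary: dict[str, Any] | None = None) -> None:
        super().__init__(message)
        self.payload_summary = payload_summary or {}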


@runtime_checkable
class KMWriterProtocol(Protocol):
    """
    Protocol interface for the KM write service (for DI / mocking).

    P1-1 extended parameters (2026-04-28 ogt + Claude Sonnet 4.6):
        mode: "sync" (default, enforced await); "async" is reserved for a future asynchronous pipeline
        timeout: overrides settings.KM_WRITE_TIMEOUT_SECONDS (None = use the setting)
        retry: maximum number of attempts (None = module constant _RETRY_MAX=3)
        on_failure: "dlq" (default, write to the DLQ) or "raise" (raise KMWriteError to the caller)
    """

    async def write(
        self,
        payload: "KMWritePayload",
        *,
        mode: Literal["sync", "async"] = "sync",
        timeout: float | None = None,
        retry: int | None = None,
        on_failure: Literal["dlq", "raise"] = "dlq",
    ) -> KMWriteResult:
        ...


class KMWritePayload(BaseModel):
    """
    KM write payload (one data format shared by every path).

    P1-1 2026-04-28 ogt + Claude Sonnet 4.6
    Upgraded from a __slots__ class to a Pydantic BaseModel: every field is kept
    for backward compatibility, and type validation plus a metadata field are added.

    path_type (part of the idempotency key; also used as a log tag):
        - "incident_resolve"    <- Path A
        - "approval_manual"     <- Path B, manual
        - "approval_auto_ok"    <- Path B, automatic success
        - "approval_auto_fail"  <- Path B, automatic failure
        - "playbook_extract"    <- Path C

    Idempotency key: (incident_id, path_type). Repeated writes with the same
    combination are handled by the UPSERT behind knowledge_service.create_entry().
    """

    model_config = {"arbitrary_types_allowed": True}

    path_type: str = Field(..., description="Write-path type; part of the idempotency key")
    incident_id: str | None = Field(None, description="Related incident (alert) ID; may be set by Path A/B/C")
    approval_id: str | None = Field(
        None, description="Related approval ID; set by Path B and triggers the M4 reverse-lookup chain"
    )
    entry_create_kwargs: dict[str, Any] = Field(
        default_factory=dict,
        description="All fields passed to KnowledgeEntryCreate (title / content / entry_type / ...)",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Extra observability metadata (not written to the DB; only for structlog / DLQ records)",
    )
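

def _is_retriable(exc: Exception) -> bool:
    """Return True if the exception looks retriable (OperationalError / network class)."""
    # Match against the class name as well as the message, so a pattern such as
    # "operationalerror" still matches when the message does not repeat the class name
    msg = f"{type(exc).__name__} {exc}".lower()
    return any(pattern in msg for pattern in _RETRIABLE_ERRORS)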


async def _write_to_dlq(payload: KMWritePayload, reason: str) -> None:
    """Push a summary of a failed payload (path_type, IDs, reason, truncated title) to the Redis DLQ."""
    try:
        from src.core.redis_client import get_redis

        redis = get_redis()
        record = json.dumps(
            {
                "path_type": payload.path_type,
                "incident_id": payload.incident_id,
                "approval_id": payload.approval_id,
                "reason": reason,
                # `or ""` guards against an explicit title=None
                "entry_title": str(payload.entry_create_kwargs.get("title") or "")[:100],
            },
            ensure_ascii=False,
        )
        # LPUSH + LTRIM keep the DLQ bounded at KM_DLQ_MAX_LEN entries
        await redis.lpush(KM_DLQ_KEY, record)
        await redis.ltrim(KM_DLQ_KEY, 0, KM_DLQ_MAX_LEN - 1)
        logger.warning(
            "km_write_dlq",
            path_type=payload.path_type,
            incident_id=payload.incident_id,
            approval_id=payload.approval_id,
            reason=reason,
        )
    except Exception as dlq_exc:
        # If the DLQ write itself fails, only log it; do not recurse
        logger.error(
            "km_dlq_write_failed",
            path_type=payload.path_type,
            error=str(dlq_exc),
        )


async def _backfill_path_a_approval(incident_id: str, approval_id: str) -> None:
    """
    M4 reverse-lookup backfill: once approval_id is known, backfill the Path A
    (incident_resolve) KM entry:

        UPDATE knowledge_entries
        SET related_approval_id = :aid
        WHERE related_incident_id = :iid AND related_approval_id IS NULL
    """
    from sqlalchemy import text as sa_text

    from src.db.base import get_db_context

    async with get_db_context() as db:
        await db.execute(
            sa_text(
                "UPDATE knowledge_entries "
                "SET related_approval_id = :aid "
                "WHERE related_incident_id = :iid "
                "  AND related_approval_id IS NULL"
            ),
            {"aid": approval_id, "iid": incident_id},
        )
        await db.commit()
    logger.info(
        "km_backfill_approval_done",
        incident_id=incident_id,
        approval_id=approval_id,
    )


async def _backfill_path_a_approval_safe(incident_id: str, approval_id: str) -> None:
    """
    C1 fix 2026-04-28 ogt + Claude Sonnet 4.6

    Safe wrapper around _backfill_path_a_approval: on failure it writes to a separate
    DLQ (km:backfill:dlq). _do_write awaits it inline (replacing the old bare
    asyncio.create_task). A backfill failure does not change the result of the main
    write, but it leaves a trace that the reconciler can use to repair it.
    """
    try:
        await _backfill_path_a_approval(incident_id, approval_id)
    except Exception as e:
        logger.warning(
            "km_backfill_approval_failed",
            incident_id=incident_id,
            approval_id=approval_id,
            error=str(e),
        )
        # Write to the separate DLQ so KMBackfillReconciler can repair it later
        try:
            from src.core.redis_client import get_redis

            redis = get_redis()
            record = json.dumps(
                {
                    "incident_id": incident_id,
                    "approval_id": approval_id,
                    "error": str(e),
                },
                ensure_ascii=False,
            )
            await redis.lpush(KM_BACKFILL_DLQ_KEY, record)
            await redis.ltrim(KM_BACKFILL_DLQ_KEY, 0, KM_DLQ_MAX_LEN - 1)
        except Exception as dlq_exc:
            logger.error(
                "km_backfill_dlq_write_failed",
                incident_id=incident_id,
                error=str(dlq_exc),
            )


async def _do_write(payload: KMWritePayload) -> None:
    """
    Perform the actual KM write (including the M4 reverse-lookup fill).

    - If payload.approval_id is set, fill the entry's related_approval_id automatically.
    - If both payload.incident_id and approval_id are set, await the Path A backfill
      after the write.
    - Pass path_type into KnowledgeEntryCreate to trigger the idempotent UPSERT (M3).
    """
    from src.models.knowledge import KnowledgeEntryCreate
    from src.services.knowledge_service import get_knowledge_service

    kwargs = dict(payload.entry_create_kwargs)
    # M4 reverse-lookup chain: fill related_approval_id automatically
    if payload.approval_id and "related_approval_id" not in kwargs:
        kwargs["related_approval_id"] = payload.approval_id
    # M3 idempotency: inject path_type + related_incident_id to trigger the UPSERT
    if payload.path_type and "path_type" not in kwargs:
        kwargs["path_type"] = payload.path_type
    if payload.incident_id and "related_incident_id" not in kwargs:
        kwargs["related_incident_id"] = payload.incident_id

    entry_data = KnowledgeEntryCreate(**kwargs)
    await get_knowledge_service().create_entry(entry_data)

    # C1 fix 2026-04-28 ogt + Claude Sonnet 4.6:
    # The M4 reverse-lookup backfill is now awaited inline (it used to be a bare
    # create_task, which was dropped when the Pod was recycled).
    # A backfill failure goes to the separate DLQ key km:backfill:dlq and does not
    # affect the result of the main write. Note that this still runs inside the
    # caller's asyncio.wait_for window, so a slow backfill can turn the whole
    # write into a TIMEOUT.
    if payload.approval_id and payload.incident_id:
        await _backfill_path_a_approval_safe(payload.incident_id, payload.approval_id)

    logger.info(
        "km_write_success",
        path_type=payload.path_type,
        incident_id=payload.incident_id,
        approval_id=payload.approval_id,
        title=str(kwargs.get("title") or "")[:80],
    )


class KMWriter:
    """
    Unified KM write service (default implementation).

    Every path writes through this class, which guarantees:
        - enforced await (KM_WRITE_AWAIT=true)
        - exponential-backoff retries
        - DLQ recovery on failure
        - the M4 reverse-lookup chain
    """

    async def write(
        self,
        payload: KMWritePayload,
        *,
        mode: Literal["sync", "async"] = "sync",
        timeout: float | None = None,
        retry: int | None = None,
        on_failure: Literal["dlq", "raise"] = "dlq",
    ) -> KMWriteResult:
        """
        Write a KM entry.

        Args:
            payload: KM write payload.
            mode: "sync" (default, enforced await); "async" is reserved for a future
                pipeline and currently behaves like "sync".
            timeout: per-attempt timeout overriding KM_WRITE_TIMEOUT_SECONDS
                (None = use settings).
            retry: maximum number of attempts (None = module constant _RETRY_MAX=3);
                retry=1 means a single attempt with no retry.
            on_failure: "dlq" (default, write to the DLQ) or "raise"
                (raise KMWriteError to the caller).

        Returns:
            KMWriteResult: SUCCESS; SKIPPED_NO_DATA when entry_create_kwargs is empty;
            TIMEOUT or EXCEPTION when the write failed (with on_failure="dlq", a DLQ
            write has been attempted).
        """
        if not payload.entry_create_kwargs:
            return KMWriteResult.SKIPPED_NO_DATA

        effective_timeout = timeout if timeout is not None else settings.KM_WRITE_TIMEOUT_SECONDS
        effective_retry = retry if retry is not None else _RETRY_MAX
        last_exc: Exception | None = None

        for attempt in range(1, effective_retry + 1):
            try:
                await asyncio.wait_for(_do_write(payload), timeout=effective_timeout)
                return KMWriteResult.SUCCESS
            except asyncio.TimeoutError:
                logger.warning(
                    "km_write_timeout",
                    path_type=payload.path_type,
                    incident_id=payload.incident_id,
                    attempt=attempt,
                    timeout_sec=effective_timeout,
                )
                # Timeouts are not retried (a retry would likely time out too and block even longer)
                if on_failure == "raise":
                    raise KMWriteError(
                        f"KM write timeout after {effective_timeout}s",
                        {"path_type": payload.path_type, "incident_id": payload.incident_id},
                    )
                await _write_to_dlq(payload, f"timeout_{effective_timeout}s")
                return KMWriteResult.TIMEOUT
            except Exception as exc:
                last_exc = exc
                if attempt < effective_retry and _is_retriable(exc):
                    # 1s, 2s, ...; there is no sleep after the final attempt
                    delay = _RETRY_BASE_DELAY * (2 ** (attempt - 1))
                    logger.warning(
                        "km_write_retry",
                        path_type=payload.path_type,
                        incident_id=payload.incident_id,
                        attempt=attempt,
                        delay_sec=delay,
                        error=str(exc),
                    )
                    await asyncio.sleep(delay)
                else:
                    # Non-retriable error, or the maximum number of attempts was reached
                    break

        # All attempts exhausted, or a non-retriable error occurred
        logger.error(
            "km_write_failed_all_retries",
            path_type=payload.path_type,
            incident_id=payload.incident_id,
            approval_id=payload.approval_id,
            error=str(last_exc),
        )
        if on_failure == "raise":
            raise KMWriteError(
                str(last_exc),
                {"path_type": payload.path_type, "incident_id": payload.incident_id},
            )
        await _write_to_dlq(payload, str(last_exc))
        return KMWriteResult.EXCEPTION


# ---------------------------------------------------------------------------
# Module-level singleton (same pattern as knowledge_service / decision_manager)
# ---------------------------------------------------------------------------
_km_writer: KMWriter | None = None


def get_km_writer() -> KMWriter:
    """Return the KMWriter singleton."""
    global _km_writer
    if _km_writer is None:
        _km_writer = KMWriter()
    return _km_writer
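

# ---------------------------------------------------------------------------
# Convenience function: replaces the old fire-and-forget calls with the unified contract
# Used by incident_service / decision_manager / playbook_service
# ---------------------------------------------------------------------------
async def km_write_with_flag(
    payload: KMWritePayload,
    *,
    timeout: float | None = None,
) -> KMWriteResult:
    """
    Unified entry point: choose the execution mode from the KM_WRITE_AWAIT feature flag.

    KM_WRITE_AWAIT=true (default): enforced await, ensuring reliable writes.
    KM_WRITE_AWAIT=false: emergency degraded mode (for rollback).

    C2 fix 2026-04-28 ogt + Claude Sonnet 4.6:
    The write is no longer handed to a bare ensure_future (that task was dropped on
    Pod recycle, which was worse than the old synchronous behavior).
    Instead it calls await writer.write(retry=1, timeout=2.0): it still waits for
    one attempt to finish, but makes only one attempt with a short timeout, and on
    failure writes to the DLQ and moves on.
    Note: KM_WRITE_AWAIT=false does not guarantee reliability and is meant only for
    emergency rollback; the caller's `timeout` argument is ignored in this mode.
    """
    writer = get_km_writer()
    if settings.KM_WRITE_AWAIT:
        return await writer.write(payload, timeout=timeout)
    else:
        # Emergency degraded mode: await a single attempt with the timeout cut to 2s;
        # on failure write to the DLQ rather than blocking for long.
        # Note: this path does not guarantee reliability; KM_WRITE_AWAIT=false is for emergency rollback only.
        return await writer.write(payload, retry=1, timeout=2.0)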