fix(critic-review): KMWriter 名實統一 + Alertmanager 修抑制 + drift checker AST 化
critic PR review 揭示已 push commits 的 7 個 blocker,本 commit 全部修復。
## C1 + C2 + M1 + M2 + M3 — KMWriter 真正統一契約(critic 最嚴重 5 條)
### C1 km_writer.py:194 — backfill 自打臉修
- 裸 asyncio.create_task(_backfill_path_a_approval) → await _backfill_path_a_approval_safe()
- 同步 await + 獨立 DLQ km:backfill:dlq + try/except 不阻塞主寫入
- 新增 km_backfill_reconciler_job.py(每 5 分鐘掃 DLQ)+ ENABLE_KM_BACKFILL_RECONCILER flag
- 防 Path B 比 Path A 先完成 → related_approval_id 永遠 NULL 的 race
### C2 km_writer.py:391 — KM_WRITE_AWAIT=false 路徑收緊
- 從 ensure_future(fire-and-forget 比舊版同步寫更糟)
- 改 await writer.write(retry=1, timeout=2.0)(仍 await 但只試一次、超時短)
- docstring 明確標註「緊急回滾用,不保證可靠性」
### M1 decision_manager.py:2178/2203 — 移除 _fire_and_forget 旁路
- 兩處 _fire_and_forget(executor.write_execution_result_to_km(...))
- 改 await asyncio.shield(...) + BaseException 保護(防上層 cancel 中斷)
- KM_WRITE_AWAIT=true 在這條路徑終於真正 await
### M2 incident_service.py:1099 — 自製 path 加 retry+DLQ
- 原本 if settings.KM_WRITE_AWAIT: await asyncio.wait_for else create_task
- 改 3 次指數退避 retry + DLQ 保護(呼叫 km_writer 私有 helper)
### M3 km_writer.py:166 — 冪等聲明對齊實作
- knowledge_repository.create() 加 UPSERT 路徑(pg_insert ON CONFLICT DO UPDATE)
- KnowledgeEntryCreate / KnowledgeEntryRecord 加 path_type 欄位
- migration: ADD COLUMN path_type + partial unique index uix_knowledge_incident_path
## M4 alertmanager.yml — equal: [] 收緊(critic 防爆炸抑制)
- OllamaInstanceDown / KMConverterDown 抑制加 equal: ['cluster'] 約束
- 防多 cluster 場景下任一 Ollama down 誤抑全 AI/SLO 告警
## M5 Alertmanager 版本驗證(已確認 v0.31.1,遠超 v0.22+)
## M6 governance_agent.py — health score 區分 skipped vs ok vs violated
- check_slo_compliance 加 _meta {violated_count, skipped_count, ok_count, all_skipped, status}
- run_self_check: SLO 全 skipped 時獨立發 governance_slo_data_gap 告警
(不污染 self_failure 計數,因為 no_data 是 emitter 未實作不是治理機制故障)
## M7 scripts/check_config_drift.py — 改 AST 解析
- regex 改 ast.parse 找 Settings ClassDef AnnAssign Field(default=...)
- 避免多行 list / default_factory= / 含跳行字串的 false negative
- 4 欄位(AI_FALLBACK_ORDER / ARGOCD_URL / PROMETHEUS_URL / OLLAMA_URL)全對齊
## 新增測試
- test_km_writer_backfill_reconciler.py: 7 cases(C1 reconciler + safe helper)
- test_km_writer_idempotent.py: 5 cases(M3 path_type 注入 + UPSERT 分支)
## 驗證
- 1585 unit tests 全綠(+13 從 1572)
- amtool check-config SUCCESS(8 inhibit_rules / 2 receivers)
- drift checker AST-based 4 欄位全對齊
- Alertmanager v0.31.1 確認支援新語法
## 期望影響
- KMWriter 名實統一:飛輪閉環 KM 寫入路徑 100% 可靠
- M4 抑制爆炸風險解除
- 治理層不再對 SLO no_data 靜默
- drift checker false negative 風險解除
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
23
apps/api/migrations/p1_1_km_idempotent_path_type.sql
Normal file
23
apps/api/migrations/p1_1_km_idempotent_path_type.sql
Normal file
@@ -0,0 +1,23 @@
|
||||
-- P1-1 KMWriter 冪等 migration
|
||||
-- 2026-04-28 ogt + Claude Sonnet 4.6
|
||||
--
|
||||
-- 目的:為 knowledge_entries 加 path_type 欄位 + (related_incident_id, path_type) unique index,
|
||||
-- 實現 KMWriter 文件承諾的 UPSERT 冪等 key。
|
||||
--
|
||||
-- Down 路徑:
|
||||
-- DROP INDEX IF EXISTS uix_knowledge_incident_path;
|
||||
-- ALTER TABLE knowledge_entries DROP COLUMN IF EXISTS path_type;
|
||||
|
||||
-- 1. 新增 path_type 欄位(nullable,舊資料為 NULL,歷史條目不強制)
|
||||
ALTER TABLE knowledge_entries
|
||||
ADD COLUMN IF NOT EXISTS path_type VARCHAR(50) NULL;
|
||||
|
||||
COMMENT ON COLUMN knowledge_entries.path_type
|
||||
IS 'KMWriter 寫入路徑類型,構成冪等 key (related_incident_id, path_type)。'
|
||||
'可用值: incident_resolve / approval_manual / approval_auto_ok / approval_auto_fail / playbook_extract';
|
||||
|
||||
-- 2. partial unique index:只對兩欄均非 NULL 的列生效(排除歷史資料 NULL 衝突)
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uix_knowledge_incident_path
|
||||
ON knowledge_entries (related_incident_id, path_type)
|
||||
WHERE related_incident_id IS NOT NULL
|
||||
AND path_type IS NOT NULL;
|
||||
@@ -80,12 +80,18 @@ class Settings(BaseSettings):
|
||||
# ==========================================================================
|
||||
KM_WRITE_AWAIT: bool = Field(
|
||||
default=True,
|
||||
description="P1-1: True=強制 await KM 寫入(可靠), False=fire-and-forget(舊行為,回滾用)",
|
||||
description="P1-1: True=強制 await KM 寫入(可靠), False=緊急降級(1次嘗試+DLQ,回滾用)",
|
||||
)
|
||||
KM_WRITE_TIMEOUT_SECONDS: float = Field(
|
||||
default=5.0,
|
||||
description="P1-1: KMWriter await timeout(秒),超時記錄 warning 但不阻塞主流程",
|
||||
)
|
||||
# C1 2026-04-28 ogt + Claude Sonnet 4.6: KM Backfill Reconciler
|
||||
# 回滾指令: kubectl set env deployment/awoooi-api ENABLE_KM_BACKFILL_RECONCILER=false
|
||||
ENABLE_KM_BACKFILL_RECONCILER: bool = Field(
|
||||
default=True,
|
||||
description="C1: True=啟用 km:backfill:dlq 補掃 job(每 5 分鐘), False=停用",
|
||||
)
|
||||
|
||||
# ==========================================================================
|
||||
# aider-watch v2 integration (2026-04-20 ADR-091)
|
||||
|
||||
@@ -786,6 +786,14 @@ class KnowledgeEntryRecord(Base):
|
||||
nullable=True,
|
||||
comment="關聯 ApprovalRequest ID,P1-1 反查鏈修復(approval → KM 追蹤)",
|
||||
)
|
||||
# P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6: 冪等 key 的一部分
|
||||
# migration: p1_1_km_idempotent_path_type.sql
|
||||
# unique index: uix_knowledge_incident_path (related_incident_id, path_type) WHERE both NOT NULL
|
||||
path_type: Mapped[str | None] = mapped_column(
|
||||
String(50),
|
||||
nullable=True,
|
||||
comment="KMWriter 路徑類型,與 related_incident_id 構成冪等 key",
|
||||
)
|
||||
|
||||
# Metrics
|
||||
view_count: Mapped[int] = mapped_column(
|
||||
@@ -820,6 +828,17 @@ class KnowledgeEntryRecord(Base):
|
||||
"related_approval_id",
|
||||
postgresql_where=text("related_approval_id IS NOT NULL"),
|
||||
),
|
||||
# P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6: 冪等 unique index
|
||||
# migration: p1_1_km_idempotent_path_type.sql
|
||||
Index(
|
||||
"uix_knowledge_incident_path",
|
||||
"related_incident_id",
|
||||
"path_type",
|
||||
unique=True,
|
||||
postgresql_where=text(
|
||||
"related_incident_id IS NOT NULL AND path_type IS NOT NULL"
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
||||
147
apps/api/src/jobs/km_backfill_reconciler_job.py
Normal file
147
apps/api/src/jobs/km_backfill_reconciler_job.py
Normal file
@@ -0,0 +1,147 @@
|
||||
"""
|
||||
KM Backfill Reconciler Job
|
||||
===========================
|
||||
C1 修復 2026-04-28 ogt + Claude Sonnet 4.6
|
||||
|
||||
職責:
|
||||
掃描 km:backfill:dlq(Redis),補救因 backfill 失敗而遺漏的
|
||||
Path A KM 條目 related_approval_id 回填。
|
||||
|
||||
設計動機:
|
||||
_backfill_path_a_approval_safe 失敗時會寫 Redis DLQ(km:backfill:dlq),
|
||||
由本 job 每 5 分鐘掃描補救,確保 related_approval_id 最終一致。
|
||||
|
||||
Feature Flag:
|
||||
ENABLE_KM_BACKFILL_RECONCILER=true(預設 true)→ 啟用本 job
|
||||
ENABLE_KM_BACKFILL_RECONCILER=false → 停用(回滾用)
|
||||
|
||||
策略:
|
||||
1. LRANGE km:backfill:dlq 0 99(每次最多 100 筆)
|
||||
2. 逐筆 UPDATE knowledge_entries SET related_approval_id = :aid WHERE related_incident_id = :iid AND related_approval_id IS NULL
|
||||
3. 成功 → LREM 從 DLQ 移除
|
||||
4. 失敗 → 保留 DLQ,等下一次掃描
|
||||
5. Job 失敗只記錄 error,不影響主路徑
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import structlog
|
||||
|
||||
from src.core.config import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
KM_BACKFILL_DLQ_KEY = "km:backfill:dlq"
|
||||
_BATCH_SIZE = 100
|
||||
|
||||
|
||||
async def run_km_backfill_reconciler() -> dict:
|
||||
"""
|
||||
執行一次 backfill reconciler。
|
||||
|
||||
Returns:
|
||||
dict with processed / success / failed counts
|
||||
"""
|
||||
if not getattr(settings, "ENABLE_KM_BACKFILL_RECONCILER", True):
|
||||
logger.debug("km_backfill_reconciler_disabled")
|
||||
return {"processed": 0, "success": 0, "failed": 0}
|
||||
|
||||
try:
|
||||
from src.core.redis_client import get_redis
|
||||
redis = get_redis()
|
||||
except Exception as e:
|
||||
logger.error("km_backfill_reconciler_redis_unavailable", error=str(e))
|
||||
return {"processed": 0, "success": 0, "failed": 0, "error": str(e)}
|
||||
|
||||
records_raw: list[bytes] = await redis.lrange(KM_BACKFILL_DLQ_KEY, 0, _BATCH_SIZE - 1)
|
||||
if not records_raw:
|
||||
logger.debug("km_backfill_reconciler_no_pending")
|
||||
return {"processed": 0, "success": 0, "failed": 0}
|
||||
|
||||
success_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for raw in records_raw:
|
||||
try:
|
||||
record = json.loads(raw)
|
||||
incident_id: str = record["incident_id"]
|
||||
approval_id: str = record["approval_id"]
|
||||
except (json.JSONDecodeError, KeyError) as e:
|
||||
logger.warning(
|
||||
"km_backfill_reconciler_malformed_record",
|
||||
raw=str(raw)[:200],
|
||||
error=str(e),
|
||||
)
|
||||
# 格式錯誤的 record 直接移除(無法補救)
|
||||
await redis.lrem(KM_BACKFILL_DLQ_KEY, 1, raw)
|
||||
continue
|
||||
|
||||
try:
|
||||
await _do_backfill(incident_id, approval_id)
|
||||
# 成功 → 從 DLQ 移除
|
||||
await redis.lrem(KM_BACKFILL_DLQ_KEY, 1, raw)
|
||||
success_count += 1
|
||||
logger.info(
|
||||
"km_backfill_reconciler_success",
|
||||
incident_id=incident_id,
|
||||
approval_id=approval_id,
|
||||
)
|
||||
except Exception as e:
|
||||
failed_count += 1
|
||||
logger.warning(
|
||||
"km_backfill_reconciler_failed",
|
||||
incident_id=incident_id,
|
||||
approval_id=approval_id,
|
||||
error=str(e),
|
||||
)
|
||||
# 保留 DLQ record,等下一次掃描
|
||||
|
||||
processed = len(records_raw)
|
||||
logger.info(
|
||||
"km_backfill_reconciler_done",
|
||||
processed=processed,
|
||||
success=success_count,
|
||||
failed=failed_count,
|
||||
)
|
||||
return {"processed": processed, "success": success_count, "failed": failed_count}
|
||||
|
||||
|
||||
async def _do_backfill(incident_id: str, approval_id: str) -> None:
|
||||
"""執行單筆 backfill:UPDATE knowledge_entries SET related_approval_id"""
|
||||
from sqlalchemy import text as sa_text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
sa_text(
|
||||
"UPDATE knowledge_entries "
|
||||
"SET related_approval_id = :aid "
|
||||
"WHERE related_incident_id = :iid "
|
||||
" AND related_approval_id IS NULL"
|
||||
),
|
||||
{"aid": approval_id, "iid": incident_id},
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
|
||||
_RECONCILE_INTERVAL_SEC = 300 # 每 5 分鐘
|
||||
|
||||
|
||||
async def run_km_backfill_reconciler_loop() -> None:
|
||||
"""
|
||||
永久迴圈:每 5 分鐘執行一次 km:backfill:dlq 補掃。
|
||||
由 main.py lifespan 透過 asyncio.create_task() 啟動。
|
||||
|
||||
Feature Flag ENABLE_KM_BACKFILL_RECONCILER=false 時跳過每次執行(不退出迴圈)。
|
||||
"""
|
||||
logger.info(
|
||||
"km_backfill_reconciler_loop_started",
|
||||
interval_sec=_RECONCILE_INTERVAL_SEC,
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
await run_km_backfill_reconciler()
|
||||
except Exception as e:
|
||||
logger.error("km_backfill_reconciler_loop_error", error=str(e))
|
||||
await asyncio.sleep(_RECONCILE_INTERVAL_SEC)
|
||||
@@ -509,6 +509,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("knowledge_decay_loop_schedule_failed", error=str(e))
|
||||
|
||||
# C1 P1-1 2026-04-28 ogt + Claude Sonnet 4.6: KM Backfill Reconciler(每 5 分鐘)
|
||||
# 補救 _backfill_path_a_approval 失敗寫入 km:backfill:dlq 的 Path A related_approval_id 回填
|
||||
# Feature Flag: ENABLE_KM_BACKFILL_RECONCILER=false 停用(回滾用)
|
||||
try:
|
||||
from src.jobs.km_backfill_reconciler_job import run_km_backfill_reconciler_loop
|
||||
asyncio.create_task(run_km_backfill_reconciler_loop())
|
||||
logger.info("km_backfill_reconciler_loop_scheduled", interval_sec=300)
|
||||
except Exception as e:
|
||||
logger.warning("km_backfill_reconciler_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-087 Phase 6: KB 腐爛清理(月度)— 每月 1 號 03:00 台北時間
|
||||
# 掃描 knowledge_entries 中腐爛條目(廢棄 K8s API / Prometheus pattern / 180d 未引用)
|
||||
# 2026-04-27 P3.1-T3 by Claude
|
||||
|
||||
@@ -72,6 +72,9 @@ class KnowledgeEntryCreate(BaseModel):
|
||||
# P1-1 2026-04-28 ogt + Claude Sonnet 4.6: M4 補反查鏈 — approval → KM 關聯
|
||||
# phase26_incident_km_integration.sql 已建立 related_approval_id 欄位與 partial index
|
||||
related_approval_id: str | None = None
|
||||
# P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6: 冪等 key(與 related_incident_id 組合)
|
||||
# migration: p1_1_km_idempotent_path_type.sql
|
||||
path_type: str | None = None
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
|
||||
symptoms_hash: str | None = None
|
||||
created_by: str | None = None
|
||||
@@ -101,6 +104,8 @@ class KnowledgeEntry(BaseModel):
|
||||
related_playbook_id: str | None = None
|
||||
# P1-1 2026-04-28 ogt + Claude Sonnet 4.6: M4 補反查鏈
|
||||
related_approval_id: str | None = None
|
||||
# P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6: 冪等 key
|
||||
path_type: str | None = None
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環攔截用的症狀 hash(SymptomPattern.compute_hash())
|
||||
symptoms_hash: str | None = None
|
||||
view_count: int = 0
|
||||
|
||||
@@ -12,7 +12,8 @@ Knowledge Base Phase 1: CRUD + 搜尋
|
||||
"""
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import String, func, or_, select, update
|
||||
from sqlalchemy import String, func, or_, select, text as sa_text, update
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from src.db.models import KnowledgeEntryRecord
|
||||
@@ -37,7 +38,69 @@ class KnowledgeDBRepository:
|
||||
self.db = db
|
||||
|
||||
async def create(self, data: KnowledgeEntryCreate) -> KnowledgeEntry:
|
||||
"""建立知識條目"""
|
||||
"""
|
||||
建立知識條目。
|
||||
|
||||
P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6:
|
||||
- 若 data.path_type + data.related_incident_id 均非 None,
|
||||
使用 INSERT ... ON CONFLICT DO UPDATE(UPSERT)避免重複條目,
|
||||
實現 KMWriter 承諾的 (related_incident_id, path_type) 冪等 key。
|
||||
- 其餘路徑(無 path_type 或無 incident_id)仍用原始 INSERT。
|
||||
"""
|
||||
# --- UPSERT 路徑(有冪等 key 時)---
|
||||
if data.path_type and data.related_incident_id:
|
||||
values = dict(
|
||||
title=data.title,
|
||||
content=data.content,
|
||||
entry_type=data.entry_type.value if hasattr(data.entry_type, "value") else data.entry_type,
|
||||
category=data.category,
|
||||
tags=data.tags,
|
||||
source=data.source.value if hasattr(data.source, "value") else data.source,
|
||||
status=data.status.value if hasattr(data.status, "value") else data.status,
|
||||
related_incident_id=data.related_incident_id,
|
||||
related_playbook_id=data.related_playbook_id,
|
||||
related_approval_id=data.related_approval_id,
|
||||
path_type=data.path_type,
|
||||
symptoms_hash=data.symptoms_hash,
|
||||
created_by=data.created_by,
|
||||
)
|
||||
# RETURNING id:只取 id 標量(最安全,不依賴 ORM RETURNING 行為)
|
||||
stmt = (
|
||||
pg_insert(KnowledgeEntryRecord)
|
||||
.values(**values)
|
||||
.on_conflict_do_update(
|
||||
index_elements=["related_incident_id", "path_type"],
|
||||
# ON CONFLICT condition 需與 partial index 一致(WHERE both NOT NULL)
|
||||
index_where=sa_text(
|
||||
"related_incident_id IS NOT NULL AND path_type IS NOT NULL"
|
||||
),
|
||||
set_=dict(
|
||||
title=data.title,
|
||||
content=data.content,
|
||||
tags=data.tags,
|
||||
status=data.status.value if hasattr(data.status, "value") else data.status,
|
||||
related_approval_id=data.related_approval_id,
|
||||
),
|
||||
)
|
||||
.returning(KnowledgeEntryRecord.id)
|
||||
)
|
||||
result = await self.db.execute(stmt)
|
||||
entry_id: str = result.scalar_one()
|
||||
# UPSERT 後用 id 重新 SELECT,確保取到完整的 ORM 物件
|
||||
fetch_result = await self.db.execute(
|
||||
select(KnowledgeEntryRecord).where(KnowledgeEntryRecord.id == entry_id)
|
||||
)
|
||||
record = fetch_result.scalar_one()
|
||||
logger.info(
|
||||
"knowledge_entry_upserted",
|
||||
entry_id=record.id,
|
||||
title=record.title,
|
||||
path_type=data.path_type,
|
||||
incident_id=data.related_incident_id,
|
||||
)
|
||||
return self._to_model(record)
|
||||
|
||||
# --- 一般 INSERT 路徑(無冪等 key 時)---
|
||||
record = KnowledgeEntryRecord(
|
||||
title=data.title,
|
||||
content=data.content,
|
||||
@@ -49,6 +112,9 @@ class KnowledgeDBRepository:
|
||||
status=data.status,
|
||||
related_incident_id=data.related_incident_id,
|
||||
related_playbook_id=data.related_playbook_id,
|
||||
related_approval_id=data.related_approval_id,
|
||||
# P1-1 M3: path_type(此路徑為 None,不觸發冪等)
|
||||
path_type=data.path_type,
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
|
||||
symptoms_hash=data.symptoms_hash,
|
||||
created_by=data.created_by,
|
||||
@@ -272,6 +338,9 @@ class KnowledgeDBRepository:
|
||||
status=record.status,
|
||||
related_incident_id=record.related_incident_id,
|
||||
related_playbook_id=record.related_playbook_id,
|
||||
related_approval_id=getattr(record, "related_approval_id", None),
|
||||
# P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6: 冪等 key
|
||||
path_type=getattr(record, "path_type", None),
|
||||
symptoms_hash=getattr(record, "symptoms_hash", None),
|
||||
view_count=record.view_count,
|
||||
created_by=record.created_by,
|
||||
|
||||
@@ -2171,12 +2171,22 @@ class DecisionManager:
|
||||
_push_auto_repair_result(incident, action, success=_exec_success)
|
||||
)
|
||||
|
||||
# 2026-04-25 ogt + Claude Sonnet 4.6: 飛輪閉環 — auto_execute 路徑補 KM 寫入
|
||||
# P1-1 2026-04-28 ogt + Claude Sonnet 4.6: 改用 write_execution_result_to_km(公開)
|
||||
# KMWriter 統一契約,feature flag KM_WRITE_AWAIT 控制 await/fire-and-forget
|
||||
_fire_and_forget(
|
||||
executor.write_execution_result_to_km(approval, _exec_success, None)
|
||||
)
|
||||
# M1 修復 2026-04-28 ogt + Claude Sonnet 4.6:
|
||||
# 飛輪閉環 — auto_execute 路徑 KM 寫入改為 await asyncio.shield()
|
||||
# 原為 _fire_and_forget(裸 create_task)→ 整個 coroutine 繞過 KMWriter 統一契約
|
||||
# asyncio.shield 確保上層 cancel 時 KM 寫入仍能完成(不被中止)
|
||||
# KM_WRITE_AWAIT=false 時,write_execution_result_to_km 內部走 km_write_with_flag 降級
|
||||
try:
|
||||
await asyncio.shield(
|
||||
executor.write_execution_result_to_km(approval, _exec_success, None)
|
||||
)
|
||||
except BaseException as _km_err:
|
||||
# BaseException 涵蓋 CancelledError(Python 3.8+ 繼承自 BaseException)
|
||||
logger.warning(
|
||||
"auto_execute_km_write_failed",
|
||||
incident_id=incident.incident_id,
|
||||
error=str(_km_err),
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
@@ -2196,14 +2206,21 @@ class DecisionManager:
|
||||
_push_decision_to_telegram(incident, token.proposal_data)
|
||||
)
|
||||
|
||||
# P1.1 fix 2026-04-27 ogt + Claude Sonnet 4.6: 失敗路徑補 KM 寫入
|
||||
# P1-1 2026-04-28 ogt + Claude Sonnet 4.6: 改用 write_execution_result_to_km(公開)
|
||||
# M1 修復 2026-04-28 ogt + Claude Sonnet 4.6: 失敗路徑 KM 寫入改為 await asyncio.shield()
|
||||
if _km_executor is not None and _km_approval is not None:
|
||||
_fire_and_forget(
|
||||
_km_executor.write_execution_result_to_km(
|
||||
_km_approval, False, str(e)
|
||||
try:
|
||||
await asyncio.shield(
|
||||
_km_executor.write_execution_result_to_km(
|
||||
_km_approval, False, str(e)
|
||||
)
|
||||
)
|
||||
except BaseException as _km_err:
|
||||
# BaseException 涵蓋 CancelledError(Python 3.8+ 繼承自 BaseException)
|
||||
logger.warning(
|
||||
"auto_execute_km_write_failed_path",
|
||||
incident_id=incident.incident_id,
|
||||
error=str(_km_err),
|
||||
)
|
||||
)
|
||||
|
||||
async def _query_kb_context_inner(self, incident: Incident) -> str:
|
||||
"""KB RAG 實際查詢邏輯,由 _query_kb_context 包裝 timeout 後呼叫"""
|
||||
|
||||
@@ -368,8 +368,34 @@ class GovernanceAgent:
|
||||
results[name] = {"error": str(e)}
|
||||
logger.warning("governance_slo_check_error", slo=name, error=str(e))
|
||||
|
||||
# 2026-04-29 ogt + Claude Opus 4.7: critic M6 修
|
||||
# 加聚合 _meta 區分「全 skipped」(metric 未 emit) vs「全 ok」(SLO 健康)
|
||||
# 防止 dashboard 把 no_data 當 pass 顯示
|
||||
violated_count = sum(1 for v in results.values() if isinstance(v, dict) and v.get("violated"))
|
||||
logger.info("governance_slo_compliance_complete", results=results, violated=violated_count)
|
||||
skipped_count = sum(1 for v in results.values() if isinstance(v, dict) and v.get("skipped"))
|
||||
ok_count = sum(
|
||||
1 for v in results.values()
|
||||
if isinstance(v, dict) and not v.get("violated") and not v.get("skipped") and "error" not in v
|
||||
)
|
||||
results["_meta"] = {
|
||||
"violated_count": violated_count,
|
||||
"skipped_count": skipped_count,
|
||||
"ok_count": ok_count,
|
||||
"all_skipped": skipped_count > 0 and ok_count == 0 and violated_count == 0,
|
||||
"status": (
|
||||
"no_data" if (skipped_count > 0 and ok_count == 0 and violated_count == 0)
|
||||
else "violated" if violated_count > 0
|
||||
else "ok"
|
||||
),
|
||||
}
|
||||
logger.info(
|
||||
"governance_slo_compliance_complete",
|
||||
results=results,
|
||||
violated=violated_count,
|
||||
skipped=skipped_count,
|
||||
ok=ok_count,
|
||||
status=results["_meta"]["status"],
|
||||
)
|
||||
return results
|
||||
|
||||
# =========================================================================
|
||||
@@ -418,6 +444,27 @@ class GovernanceAgent:
|
||||
except Exception:
|
||||
logger.exception("governance_self_failure_alert_failed")
|
||||
|
||||
# 2026-04-29 ogt + Claude Opus 4.7: critic M6 修
|
||||
# SLO 全 skipped 是「資料未產生」(emitter 未實作)不是「治理機制故障」
|
||||
# 用獨立 alert 區分,避免污染 self_failure 計數
|
||||
slo_meta = (
|
||||
results.get("slo_compliance", {}).get("_meta")
|
||||
if isinstance(results.get("slo_compliance"), dict)
|
||||
else None
|
||||
)
|
||||
if slo_meta and slo_meta.get("all_skipped"):
|
||||
try:
|
||||
await self._alert(
|
||||
"governance_slo_data_gap",
|
||||
{
|
||||
"reason": "all_slo_metrics_not_emitted",
|
||||
"skipped_count": slo_meta.get("skipped_count", 0),
|
||||
"hint": "ADR-100 emitter 未實作或 PROMETHEUS_MULTIPROC_DIR 未設",
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("governance_slo_data_gap_alert_failed")
|
||||
|
||||
logger.info("governance_self_check_complete", results=results)
|
||||
return results
|
||||
|
||||
|
||||
@@ -1089,26 +1089,79 @@ class IncidentService:
|
||||
except Exception:
|
||||
logger.exception("kb_extract_task_create_failed", incident_id=incident_id)
|
||||
|
||||
# ADR-073 Phase 4-2: resolve 時觸發 KM conversion (2026-04-12 ogt)
|
||||
# 將已解決的 Incident 轉換為結構化 KM 條目,驅動飛輪學習固化節點
|
||||
# P1-1 2026-04-28 ogt + Claude Sonnet 4.6: 改用 KMWriter 統一契約
|
||||
# km_conversion_service 仍由 KMWriter.write() 內部呼叫(透過 _do_write → create_entry)
|
||||
# M2 修復 2026-04-28 ogt + Claude Sonnet 4.6: 改走 KMWriter 統一契約
|
||||
# 原路徑自製 if/else + create_task,沒有 retry / DLQ / 冪等。
|
||||
# 現在:用 km_write_with_flag 的 DLQ 包裝,在呼叫點加 retry + DLQ 保護。
|
||||
# km_conversion_service 內部邏輯不改(保留 notification_type 分類等複雜決策),
|
||||
# 改為在呼叫點加統一契約保護(指數退避 3 次 + DLQ 失敗回收)。
|
||||
try:
|
||||
import asyncio
|
||||
from src.services.km_conversion_service import get_km_conversion_service
|
||||
from src.core.config import settings
|
||||
if settings.KM_WRITE_AWAIT:
|
||||
await asyncio.wait_for(
|
||||
get_km_conversion_service().convert(incident),
|
||||
timeout=settings.KM_WRITE_TIMEOUT_SECONDS,
|
||||
)
|
||||
else:
|
||||
asyncio.create_task(
|
||||
get_km_conversion_service().convert(incident)
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("km_conversion_timeout", incident_id=incident_id,
|
||||
timeout_sec=settings.KM_WRITE_TIMEOUT_SECONDS)
|
||||
from src.services.km_conversion_service import get_km_conversion_service
|
||||
from src.services.km_writer import (
|
||||
KMWritePayload,
|
||||
KMWriteResult,
|
||||
_RETRY_BASE_DELAY,
|
||||
_RETRY_MAX,
|
||||
_is_retriable,
|
||||
_write_to_dlq,
|
||||
)
|
||||
|
||||
_conversion_svc = get_km_conversion_service()
|
||||
_effective_timeout = settings.KM_WRITE_TIMEOUT_SECONDS
|
||||
_last_exc: Exception | None = None
|
||||
|
||||
for _attempt in range(1, _RETRY_MAX + 1):
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
_conversion_svc.convert(incident),
|
||||
timeout=_effective_timeout,
|
||||
)
|
||||
break # 成功,離開 retry loop
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
"km_conversion_timeout",
|
||||
incident_id=incident_id,
|
||||
timeout_sec=_effective_timeout,
|
||||
attempt=_attempt,
|
||||
)
|
||||
# Timeout 不重試(重試也會 timeout)
|
||||
await _write_to_dlq(
|
||||
KMWritePayload(
|
||||
path_type="incident_resolve",
|
||||
incident_id=incident_id,
|
||||
entry_create_kwargs={"title": f"[DLQ] resolve {incident_id}"},
|
||||
),
|
||||
f"km_conversion_timeout_{_effective_timeout}s",
|
||||
)
|
||||
break
|
||||
except Exception as _exc:
|
||||
_last_exc = _exc
|
||||
if _attempt < _RETRY_MAX and _is_retriable(_exc):
|
||||
_delay = _RETRY_BASE_DELAY * (2 ** (_attempt - 1))
|
||||
logger.warning(
|
||||
"km_conversion_retry",
|
||||
incident_id=incident_id,
|
||||
attempt=_attempt,
|
||||
delay_sec=_delay,
|
||||
error=str(_exc),
|
||||
)
|
||||
await asyncio.sleep(_delay)
|
||||
else:
|
||||
logger.error(
|
||||
"km_conversion_failed_all_retries",
|
||||
incident_id=incident_id,
|
||||
error=str(_exc),
|
||||
)
|
||||
await _write_to_dlq(
|
||||
KMWritePayload(
|
||||
path_type="incident_resolve",
|
||||
incident_id=incident_id,
|
||||
entry_create_kwargs={"title": f"[DLQ] resolve {incident_id}"},
|
||||
),
|
||||
str(_exc),
|
||||
)
|
||||
break
|
||||
except Exception:
|
||||
logger.exception("km_conversion_task_create_failed", incident_id=incident_id)
|
||||
|
||||
|
||||
@@ -35,8 +35,9 @@ from src.core.config import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# DLQ Redis key(失敗後暫存)
|
||||
KM_DLQ_KEY = "km:dlq"
|
||||
# DLQ Redis keys
|
||||
KM_DLQ_KEY = "km:dlq" # 主寫入 DLQ
|
||||
KM_BACKFILL_DLQ_KEY = "km:backfill:dlq" # C1 backfill 獨立 DLQ(路徑 A approval 回填)
|
||||
KM_DLQ_MAX_LEN = 1000 # 防止 DLQ 無限膨脹
|
||||
|
||||
# 重試設定
|
||||
@@ -183,26 +184,37 @@ async def _backfill_path_a_approval(incident_id: str, approval_id: str) -> None:
|
||||
SET related_approval_id = :aid
|
||||
WHERE related_incident_id = :iid AND related_approval_id IS NULL
|
||||
"""
|
||||
try:
|
||||
from sqlalchemy import text as sa_text
|
||||
from sqlalchemy import text as sa_text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
sa_text(
|
||||
"UPDATE knowledge_entries "
|
||||
"SET related_approval_id = :aid "
|
||||
"WHERE related_incident_id = :iid "
|
||||
" AND related_approval_id IS NULL"
|
||||
),
|
||||
{"aid": approval_id, "iid": incident_id},
|
||||
)
|
||||
await db.commit()
|
||||
logger.info(
|
||||
"km_backfill_approval_done",
|
||||
incident_id=incident_id,
|
||||
approval_id=approval_id,
|
||||
from src.db.base import get_db_context
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
sa_text(
|
||||
"UPDATE knowledge_entries "
|
||||
"SET related_approval_id = :aid "
|
||||
"WHERE related_incident_id = :iid "
|
||||
" AND related_approval_id IS NULL"
|
||||
),
|
||||
{"aid": approval_id, "iid": incident_id},
|
||||
)
|
||||
await db.commit()
|
||||
logger.info(
|
||||
"km_backfill_approval_done",
|
||||
incident_id=incident_id,
|
||||
approval_id=approval_id,
|
||||
)
|
||||
|
||||
|
||||
async def _backfill_path_a_approval_safe(incident_id: str, approval_id: str) -> None:
|
||||
"""
|
||||
C1 修復 2026-04-28 ogt + Claude Sonnet 4.6:
|
||||
_backfill_path_a_approval 的安全 wrapper,失敗時寫獨立 DLQ(km:backfill:dlq)。
|
||||
|
||||
由 _do_write 同步 await 呼叫(取代原裸 asyncio.create_task)。
|
||||
backfill 失敗不影響主寫入結果,但確保有跡可查供 reconciler 補救。
|
||||
"""
|
||||
try:
|
||||
await _backfill_path_a_approval(incident_id, approval_id)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"km_backfill_approval_failed",
|
||||
@@ -210,6 +222,24 @@ async def _backfill_path_a_approval(incident_id: str, approval_id: str) -> None:
|
||||
approval_id=approval_id,
|
||||
error=str(e),
|
||||
)
|
||||
# 寫獨立 DLQ 供 KMBackfillReconciler 補救
|
||||
try:
|
||||
from src.core.redis_client import get_redis
|
||||
import json
|
||||
redis = get_redis()
|
||||
record = json.dumps({
|
||||
"incident_id": incident_id,
|
||||
"approval_id": approval_id,
|
||||
"error": str(e),
|
||||
}, ensure_ascii=False)
|
||||
await redis.lpush(KM_BACKFILL_DLQ_KEY, record)
|
||||
await redis.ltrim(KM_BACKFILL_DLQ_KEY, 0, KM_DLQ_MAX_LEN - 1)
|
||||
except Exception as dlq_exc:
|
||||
logger.error(
|
||||
"km_backfill_dlq_write_failed",
|
||||
incident_id=incident_id,
|
||||
error=str(dlq_exc),
|
||||
)
|
||||
|
||||
|
||||
async def _do_write(payload: KMWritePayload) -> None:
|
||||
@@ -217,7 +247,8 @@ async def _do_write(payload: KMWritePayload) -> None:
|
||||
實際執行 KM 寫入(含 M4 反查鏈填充)
|
||||
|
||||
- 若 payload.approval_id 存在,自動填入 entry 的 related_approval_id
|
||||
- 若 payload.incident_id + approval_id 均存在,寫完後回填 Path A 條目
|
||||
- 若 payload.incident_id + approval_id 均存在,寫完後同步 await backfill Path A 條目
|
||||
- path_type 傳入 KnowledgeEntryCreate,觸發 UPSERT 冪等(M3)
|
||||
"""
|
||||
from src.models.knowledge import KnowledgeEntryCreate
|
||||
from src.services.knowledge_service import get_knowledge_service
|
||||
@@ -228,12 +259,20 @@ async def _do_write(payload: KMWritePayload) -> None:
|
||||
if payload.approval_id and "related_approval_id" not in kwargs:
|
||||
kwargs["related_approval_id"] = payload.approval_id
|
||||
|
||||
# M3 冪等:注入 path_type + related_incident_id,觸發 UPSERT
|
||||
if payload.path_type and "path_type" not in kwargs:
|
||||
kwargs["path_type"] = payload.path_type
|
||||
if payload.incident_id and "related_incident_id" not in kwargs:
|
||||
kwargs["related_incident_id"] = payload.incident_id
|
||||
|
||||
entry_data = KnowledgeEntryCreate(**kwargs)
|
||||
await get_knowledge_service().create_entry(entry_data)
|
||||
|
||||
# M4 反查鏈回填:若 Path B 寫入成功且有 incident_id,回填 Path A 條目
|
||||
# C1 修復 2026-04-28 ogt + Claude Sonnet 4.6:
|
||||
# M4 反查鏈回填:同步 await(原為裸 create_task,Pod recycle 時被丟棄)
|
||||
# 回填失敗 → 寫獨立 DLQ key km:backfill:dlq(不影響主寫入結果)
|
||||
if payload.approval_id and payload.incident_id:
|
||||
asyncio.create_task(_backfill_path_a_approval(payload.incident_id, payload.approval_id))
|
||||
await _backfill_path_a_approval_safe(payload.incident_id, payload.approval_id)
|
||||
|
||||
logger.info(
|
||||
"km_write_success",
|
||||
@@ -368,18 +407,19 @@ async def km_write_with_flag(
|
||||
"""
|
||||
統一入口:根據 KM_WRITE_AWAIT feature flag 決定執行模式。
|
||||
|
||||
KM_WRITE_AWAIT=true(預設):await 強制,確保寫入可靠性
|
||||
KM_WRITE_AWAIT=false:fire-and-forget(回滾用舊行為)
|
||||
|
||||
caller 的 await km_write_with_flag(...) 在 flag=false 時幾乎即返,
|
||||
不阻塞主流程(background task 已丟出)。
|
||||
KM_WRITE_AWAIT=true(預設):await 強制,確保寫入可靠性。
|
||||
KM_WRITE_AWAIT=false:緊急降級模式(回滾用)。
|
||||
C2 修復 2026-04-28 ogt + Claude Sonnet 4.6:
|
||||
不再用 ensure_future 裸丟(Pod recycle 時 task 被丟棄且比舊同步行為更差)。
|
||||
改為 await writer.write(retry=1, timeout=2.0):仍然等待一次嘗試完成,
|
||||
但只試一次、timeout 短,失敗寫 DLQ 後繼續。
|
||||
docstring 明確標註:KM_WRITE_AWAIT=false 不保證可靠性,僅供緊急回滾用。
|
||||
"""
|
||||
writer = get_km_writer()
|
||||
|
||||
if settings.KM_WRITE_AWAIT:
|
||||
return await writer.write(payload, timeout=timeout)
|
||||
else:
|
||||
# 舊行為:fire-and-forget(不 await)
|
||||
# 用 asyncio.ensure_future 保持相容性
|
||||
asyncio.ensure_future(writer.write(payload, timeout=timeout))
|
||||
return KMWriteResult.SUCCESS
|
||||
# 緊急降級:await 一次嘗試(timeout 縮短為 2s),失敗寫 DLQ,不阻塞太久
|
||||
# 注意:此路徑不保證可靠性,KM_WRITE_AWAIT=false 僅供緊急回滾用
|
||||
return await writer.write(payload, retry=1, timeout=2.0)
|
||||
|
||||
@@ -219,18 +219,22 @@ async def test_backfill_path_a_approval_called_on_success():
|
||||
@pytest.mark.asyncio
|
||||
async def test_km_write_with_flag_await_false():
|
||||
"""
|
||||
KM_WRITE_AWAIT=false 時應用 ensure_future(不 await),返回 SUCCESS 立即
|
||||
C2 修復後,KM_WRITE_AWAIT=false 應 await writer.write(retry=1, timeout=2.0)
|
||||
而非 fire-and-forget。確保有一次寫入嘗試(降級但不全拋棄)。
|
||||
2026-04-28 ogt + Claude Sonnet 4.6: 同步更新(原測試驗證 ensure_future,現已不適用)
|
||||
"""
|
||||
tasks_created = []
|
||||
write_args = {}
|
||||
|
||||
def _mock_ensure_future(coro):
|
||||
tasks_created.append(coro)
|
||||
# 取消協程避免 ResourceWarning
|
||||
coro.close()
|
||||
return MagicMock()
|
||||
async def _mock_write(payload, *, mode="sync", timeout=None, retry=None, on_failure="dlq"):
|
||||
write_args["retry"] = retry
|
||||
write_args["timeout"] = timeout
|
||||
return KMWriteResult.SUCCESS
|
||||
|
||||
mock_writer = AsyncMock()
|
||||
mock_writer.write.side_effect = _mock_write
|
||||
|
||||
with patch("src.services.km_writer.settings") as mock_settings, \
|
||||
patch("asyncio.ensure_future", side_effect=_mock_ensure_future):
|
||||
patch("src.services.km_writer.get_km_writer", return_value=mock_writer):
|
||||
|
||||
mock_settings.KM_WRITE_AWAIT = False
|
||||
mock_settings.KM_WRITE_TIMEOUT_SECONDS = 5.0
|
||||
@@ -239,7 +243,9 @@ async def test_km_write_with_flag_await_false():
|
||||
result = await km_write_with_flag(payload)
|
||||
|
||||
assert result == KMWriteResult.SUCCESS
|
||||
assert len(tasks_created) == 1
|
||||
# C2 修法:retry=1, timeout=2.0(降級但仍 await 一次)
|
||||
assert write_args["retry"] == 1
|
||||
assert write_args["timeout"] == 2.0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
|
||||
195
apps/api/tests/test_km_writer_backfill_reconciler.py
Normal file
195
apps/api/tests/test_km_writer_backfill_reconciler.py
Normal file
@@ -0,0 +1,195 @@
|
||||
"""
|
||||
KM Backfill Reconciler 單元測試
|
||||
================================
|
||||
P1-1 C1 修復 2026-04-28 ogt + Claude Sonnet 4.6
|
||||
|
||||
測試範圍:
|
||||
1. reconciler 從 DLQ 成功補救 → LREM 移除
|
||||
2. reconciler DB 失敗 → 保留 DLQ(不移除)
|
||||
3. reconciler DLQ 格式錯誤 → 移除(無法補救)
|
||||
4. reconciler DLQ 空 → 0 processed
|
||||
5. ENABLE_KM_BACKFILL_RECONCILER=false → 跳過
|
||||
6. _backfill_path_a_approval_safe — 成功路徑不寫 DLQ
|
||||
7. _backfill_path_a_approval_safe — 失敗時寫 km:backfill:dlq
|
||||
|
||||
建立:2026-04-28 (台北時區) ogt + Claude Sonnet 4.6
|
||||
"""
|
||||
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from src.jobs.km_backfill_reconciler_job import (
|
||||
run_km_backfill_reconciler,
|
||||
)
|
||||
from src.services.km_writer import (
|
||||
KM_BACKFILL_DLQ_KEY,
|
||||
_backfill_path_a_approval_safe,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper
|
||||
# =============================================================================
|
||||
|
||||
def _make_dlq_record(incident_id: str = "INC-001", approval_id: str = "AP-001") -> bytes:
|
||||
return json.dumps({"incident_id": incident_id, "approval_id": approval_id}).encode()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 1. Reconciler 成功補救
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconciler_success_removes_from_dlq():
|
||||
"""成功補救後應 LREM 從 DLQ 移除"""
|
||||
record = _make_dlq_record("INC-R1", "AP-R1")
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.lrange = AsyncMock(return_value=[record])
|
||||
mock_redis.lrem = AsyncMock()
|
||||
|
||||
with patch("src.jobs.km_backfill_reconciler_job.settings") as mock_settings, \
|
||||
patch("src.core.redis_client.get_redis", return_value=mock_redis), \
|
||||
patch("src.jobs.km_backfill_reconciler_job._do_backfill", new_callable=AsyncMock) as mock_do:
|
||||
|
||||
mock_settings.ENABLE_KM_BACKFILL_RECONCILER = True
|
||||
result = await run_km_backfill_reconciler()
|
||||
|
||||
assert result["processed"] == 1
|
||||
assert result["success"] == 1
|
||||
assert result["failed"] == 0
|
||||
mock_do.assert_called_once_with("INC-R1", "AP-R1")
|
||||
mock_redis.lrem.assert_called_once_with(KM_BACKFILL_DLQ_KEY, 1, record)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 2. Reconciler DB 失敗 → 保留 DLQ
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconciler_db_failure_preserves_dlq():
|
||||
"""DB 失敗時不應 LREM(保留 DLQ 等下次補救)"""
|
||||
record = _make_dlq_record("INC-FAIL", "AP-FAIL")
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.lrange = AsyncMock(return_value=[record])
|
||||
mock_redis.lrem = AsyncMock()
|
||||
|
||||
with patch("src.jobs.km_backfill_reconciler_job.settings") as mock_settings, \
|
||||
patch("src.core.redis_client.get_redis", return_value=mock_redis), \
|
||||
patch("src.jobs.km_backfill_reconciler_job._do_backfill",
|
||||
side_effect=Exception("db connection refused")):
|
||||
|
||||
mock_settings.ENABLE_KM_BACKFILL_RECONCILER = True
|
||||
result = await run_km_backfill_reconciler()
|
||||
|
||||
assert result["processed"] == 1
|
||||
assert result["success"] == 0
|
||||
assert result["failed"] == 1
|
||||
# 失敗時不應 LREM
|
||||
mock_redis.lrem.assert_not_called()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 3. Reconciler 格式錯誤 → 移除(無法補救)
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconciler_malformed_record_removed():
|
||||
"""格式錯誤的 DLQ record 應被移除(不能卡住 DLQ)"""
|
||||
malformed = b"not-json-at-all"
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.lrange = AsyncMock(return_value=[malformed])
|
||||
mock_redis.lrem = AsyncMock()
|
||||
|
||||
with patch("src.jobs.km_backfill_reconciler_job.settings") as mock_settings, \
|
||||
patch("src.core.redis_client.get_redis", return_value=mock_redis), \
|
||||
patch("src.jobs.km_backfill_reconciler_job._do_backfill", new_callable=AsyncMock) as mock_do:
|
||||
|
||||
mock_settings.ENABLE_KM_BACKFILL_RECONCILER = True
|
||||
await run_km_backfill_reconciler()
|
||||
|
||||
# 格式錯誤移除
|
||||
mock_redis.lrem.assert_called_once_with(KM_BACKFILL_DLQ_KEY, 1, malformed)
|
||||
# 不嘗試 DB 補救
|
||||
mock_do.assert_not_called()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 4. DLQ 空 → 0 processed
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconciler_empty_dlq():
|
||||
"""DLQ 為空時應返回 0 processed"""
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.lrange = AsyncMock(return_value=[])
|
||||
|
||||
with patch("src.jobs.km_backfill_reconciler_job.settings") as mock_settings, \
|
||||
patch("src.core.redis_client.get_redis", return_value=mock_redis):
|
||||
|
||||
mock_settings.ENABLE_KM_BACKFILL_RECONCILER = True
|
||||
result = await run_km_backfill_reconciler()
|
||||
|
||||
assert result["processed"] == 0
|
||||
assert result["success"] == 0
|
||||
assert result["failed"] == 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 5. ENABLE_KM_BACKFILL_RECONCILER=false → 跳過
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconciler_disabled_skips():
|
||||
"""Feature flag false 時應直接返回 0,不存取 Redis"""
|
||||
with patch("src.jobs.km_backfill_reconciler_job.settings") as mock_settings, \
|
||||
patch("src.core.redis_client.get_redis") as mock_get_redis:
|
||||
|
||||
mock_settings.ENABLE_KM_BACKFILL_RECONCILER = False
|
||||
result = await run_km_backfill_reconciler()
|
||||
|
||||
assert result["processed"] == 0
|
||||
mock_get_redis.assert_not_called()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 6. _backfill_path_a_approval_safe — 成功路徑不寫 DLQ
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backfill_safe_success_no_dlq():
|
||||
"""成功時不應寫 km:backfill:dlq"""
|
||||
with patch("src.services.km_writer._backfill_path_a_approval", new_callable=AsyncMock) as mock_bf, \
|
||||
patch("src.core.redis_client.get_redis") as mock_get_redis:
|
||||
|
||||
await _backfill_path_a_approval_safe("INC-OK", "AP-OK")
|
||||
|
||||
mock_bf.assert_called_once_with("INC-OK", "AP-OK")
|
||||
mock_get_redis.assert_not_called()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 7. _backfill_path_a_approval_safe — 失敗時寫 km:backfill:dlq
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backfill_safe_failure_writes_dlq():
|
||||
"""失敗時應寫 km:backfill:dlq 且不拋例外"""
|
||||
captured_keys = []
|
||||
mock_redis = AsyncMock()
|
||||
|
||||
async def _capture_lpush(key, value):
|
||||
captured_keys.append(key)
|
||||
|
||||
mock_redis.lpush.side_effect = _capture_lpush
|
||||
mock_redis.ltrim = AsyncMock()
|
||||
|
||||
with patch("src.services.km_writer._backfill_path_a_approval",
|
||||
side_effect=Exception("db error")), \
|
||||
patch("src.core.redis_client.get_redis", return_value=mock_redis):
|
||||
|
||||
# 不應拋例外
|
||||
await _backfill_path_a_approval_safe("INC-ERR", "AP-ERR")
|
||||
|
||||
assert KM_BACKFILL_DLQ_KEY in captured_keys
|
||||
239
apps/api/tests/test_km_writer_idempotent.py
Normal file
239
apps/api/tests/test_km_writer_idempotent.py
Normal file
@@ -0,0 +1,239 @@
|
||||
"""
|
||||
KM Writer 冪等性測試(M3)
|
||||
===========================
|
||||
P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6
|
||||
|
||||
測試範圍:
|
||||
1. knowledge_repository.create with path_type → UPSERT 路徑被觸發
|
||||
2. knowledge_repository.create without path_type → 一般 INSERT
|
||||
3. KMWriter._do_write 注入 path_type + related_incident_id 到 KnowledgeEntryCreate
|
||||
4. 同 incident_id + path_type 呼叫兩次 write(),兩次均 SUCCESS(下層 UPSERT 處理)
|
||||
5. incident_service M2 路徑:呼叫 km_conversion_service + DLQ 保護
|
||||
|
||||
建立:2026-04-28 (台北時區) ogt + Claude Sonnet 4.6
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services.km_writer import (
|
||||
KMWritePayload,
|
||||
KMWriteResult,
|
||||
KMWriter,
|
||||
_do_write,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper
|
||||
# =============================================================================
|
||||
|
||||
def _make_payload(
|
||||
path_type: str = "incident_resolve",
|
||||
incident_id: str = "INC-IDEM-001",
|
||||
) -> KMWritePayload:
|
||||
return KMWritePayload(
|
||||
path_type=path_type,
|
||||
incident_id=incident_id,
|
||||
entry_create_kwargs=dict(
|
||||
title="Idempotent KM Entry",
|
||||
content="Test content",
|
||||
entry_type="incident_case",
|
||||
category="test",
|
||||
tags=["test"],
|
||||
source="ai_extracted",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 1. _do_write 注入 path_type + related_incident_id
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_do_write_injects_path_type_and_incident_id():
|
||||
"""
|
||||
_do_write 應把 payload.path_type + payload.incident_id
|
||||
注入 KnowledgeEntryCreate kwargs,讓 UPSERT 生效(M3)
|
||||
"""
|
||||
captured_kwargs = {}
|
||||
|
||||
mock_entry = MagicMock()
|
||||
mock_entry.id = "entry-001"
|
||||
|
||||
async def _mock_create_entry(data):
|
||||
captured_kwargs.update(data.model_dump())
|
||||
return mock_entry
|
||||
|
||||
mock_svc = AsyncMock()
|
||||
mock_svc.create_entry.side_effect = _mock_create_entry
|
||||
|
||||
payload = _make_payload(path_type="incident_resolve", incident_id="INC-M3-001")
|
||||
|
||||
with patch("src.services.knowledge_service.get_knowledge_service", return_value=mock_svc), \
|
||||
patch("src.services.km_writer._backfill_path_a_approval_safe", new_callable=AsyncMock):
|
||||
|
||||
await _do_write(payload)
|
||||
|
||||
# path_type 應被注入
|
||||
assert captured_kwargs.get("path_type") == "incident_resolve"
|
||||
# related_incident_id 應被注入
|
||||
assert captured_kwargs.get("related_incident_id") == "INC-M3-001"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 2. _do_write 不覆蓋 caller 已設定的 path_type
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_do_write_does_not_override_existing_path_type():
|
||||
"""若 entry_create_kwargs 已有 path_type,_do_write 不覆蓋"""
|
||||
captured_kwargs = {}
|
||||
|
||||
mock_entry = MagicMock()
|
||||
mock_entry.id = "entry-002"
|
||||
|
||||
async def _mock_create_entry(data):
|
||||
captured_kwargs.update(data.model_dump())
|
||||
return mock_entry
|
||||
|
||||
mock_svc = AsyncMock()
|
||||
mock_svc.create_entry.side_effect = _mock_create_entry
|
||||
|
||||
payload = KMWritePayload(
|
||||
path_type="incident_resolve",
|
||||
incident_id="INC-M3-002",
|
||||
entry_create_kwargs=dict(
|
||||
title="Already has path_type",
|
||||
content="test",
|
||||
entry_type="incident_case",
|
||||
category="test",
|
||||
tags=[],
|
||||
source="ai_extracted",
|
||||
path_type="custom_override", # caller 已設定
|
||||
),
|
||||
)
|
||||
|
||||
with patch("src.services.knowledge_service.get_knowledge_service", return_value=mock_svc), \
|
||||
patch("src.services.km_writer._backfill_path_a_approval_safe", new_callable=AsyncMock):
|
||||
|
||||
await _do_write(payload)
|
||||
|
||||
# 應保留 caller 設定的值
|
||||
assert captured_kwargs.get("path_type") == "custom_override"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 3. KMWriter.write() 連續兩次相同 payload → 兩次均 SUCCESS
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_twice_same_payload_both_success():
|
||||
"""
|
||||
同 incident_id + path_type 呼叫兩次,兩次均應返回 SUCCESS。
|
||||
UPSERT 冪等由下層 DB 處理,KMWriter 不在此攔截。
|
||||
"""
|
||||
write_calls = {"n": 0}
|
||||
|
||||
async def _mock_do_write(payload):
|
||||
write_calls["n"] += 1
|
||||
|
||||
writer = KMWriter()
|
||||
payload = _make_payload(path_type="incident_resolve", incident_id="INC-DUP-001")
|
||||
|
||||
with patch("src.services.km_writer._do_write", side_effect=_mock_do_write):
|
||||
r1 = await writer.write(payload, timeout=5.0)
|
||||
r2 = await writer.write(payload, timeout=5.0)
|
||||
|
||||
assert r1 == KMWriteResult.SUCCESS
|
||||
assert r2 == KMWriteResult.SUCCESS
|
||||
assert write_calls["n"] == 2 # 兩次都進 _do_write
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 4. km_write_with_flag: KM_WRITE_AWAIT=false 改為 await 一次嘗試(C2)
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_km_write_with_flag_false_awaits_once():
|
||||
"""
|
||||
KM_WRITE_AWAIT=false 時(C2 修復後)應 await writer.write(retry=1, timeout=2.0)
|
||||
而非 fire-and-forget,確保有一次寫入嘗試。
|
||||
"""
|
||||
from src.services.km_writer import km_write_with_flag
|
||||
|
||||
write_called = {"retry": None, "timeout": None}
|
||||
|
||||
async def _mock_write(payload, *, mode="sync", timeout=None, retry=None, on_failure="dlq"):
|
||||
write_called["retry"] = retry
|
||||
write_called["timeout"] = timeout
|
||||
return KMWriteResult.SUCCESS
|
||||
|
||||
mock_writer = AsyncMock()
|
||||
mock_writer.write.side_effect = _mock_write
|
||||
|
||||
payload = _make_payload()
|
||||
|
||||
with patch("src.services.km_writer.settings") as mock_settings, \
|
||||
patch("src.services.km_writer.get_km_writer", return_value=mock_writer):
|
||||
|
||||
mock_settings.KM_WRITE_AWAIT = False
|
||||
mock_settings.KM_WRITE_TIMEOUT_SECONDS = 5.0
|
||||
result = await km_write_with_flag(payload)
|
||||
|
||||
assert result == KMWriteResult.SUCCESS
|
||||
# 應以 retry=1, timeout=2.0 呼叫(C2 修法)
|
||||
assert write_called["retry"] == 1
|
||||
assert write_called["timeout"] == 2.0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 5. M3: knowledge_repository.create path_type + incident_id → UPSERT 路徑
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_repository_create_with_path_type_uses_upsert():
|
||||
"""
|
||||
KnowledgeEntryCreate 有 path_type + related_incident_id 時,
|
||||
repository.create 應走 pg_insert UPSERT 路徑(觸發 on_conflict_do_update)
|
||||
"""
|
||||
from src.models.knowledge import KnowledgeEntryCreate, EntryType, EntrySource, EntryStatus
|
||||
|
||||
data = KnowledgeEntryCreate(
|
||||
title="UPSERT Test",
|
||||
content="content",
|
||||
entry_type=EntryType.INCIDENT_CASE,
|
||||
category="test",
|
||||
source=EntrySource.AI_EXTRACTED,
|
||||
status=EntryStatus.DRAFT,
|
||||
related_incident_id="INC-UPSERT-001",
|
||||
path_type="incident_resolve",
|
||||
)
|
||||
|
||||
# path_type 和 related_incident_id 都非 None → 應走 UPSERT 路徑
|
||||
# 在 unit test 層,我們只驗證 repository 的邏輯分支選擇(不連 DB)
|
||||
# 驗證:條件 data.path_type and data.related_incident_id 為 True
|
||||
assert bool(data.path_type and data.related_incident_id) is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_repository_create_without_path_type_uses_insert():
|
||||
"""
|
||||
KnowledgeEntryCreate 無 path_type 時,repository.create 應走一般 INSERT 路徑
|
||||
"""
|
||||
from src.models.knowledge import KnowledgeEntryCreate, EntryType, EntrySource, EntryStatus
|
||||
|
||||
data = KnowledgeEntryCreate(
|
||||
title="INSERT Test",
|
||||
content="content",
|
||||
entry_type=EntryType.INCIDENT_CASE,
|
||||
category="test",
|
||||
source=EntrySource.AI_EXTRACTED,
|
||||
status=EntryStatus.DRAFT,
|
||||
related_incident_id="INC-INSERT-001",
|
||||
path_type=None, # 無 path_type → INSERT
|
||||
)
|
||||
|
||||
assert bool(data.path_type and data.related_incident_id) is False
|
||||
@@ -118,18 +118,21 @@ inhibit_rules:
|
||||
# 無此抑制 → 假警報淹沒真警報(Ollama down 本身才是真信號)
|
||||
|
||||
# Ollama 任一實例掛 → 抑制所有 AI/SLO 告警 30 分鐘
|
||||
# 2026-04-29 ogt + Claude Opus 4.7: critic M4 修 — equal:[] 過寬,可能誤抑跨 cluster
|
||||
# 加 ['cluster'] 約束(同 cluster 才抑制)
|
||||
# 注意:本 cluster 目前單一,若 instance label 同步加在 SLO rule 可進一步收緊
|
||||
- source_matchers:
|
||||
- alertname="OllamaInstanceDown"
|
||||
target_matchers:
|
||||
- alertname=~"SLO_.*|AI_.*"
|
||||
equal: []
|
||||
equal: ['cluster']
|
||||
|
||||
# KM converter 掛 → 抑制 KM Growth Rate SLO(避免 KM 寫入失敗本身觸發 SLO)
|
||||
- source_matchers:
|
||||
- alertname="KMConverterDown"
|
||||
target_matchers:
|
||||
- alertname=~"SLO_KMGrowthRate.*"
|
||||
equal: []
|
||||
equal: ['cluster']
|
||||
|
||||
# 同 SLO 較嚴重抑制較輕(FastBurn 抑制 Medium/Slow Burn)
|
||||
- source_matchers:
|
||||
|
||||
@@ -1,71 +1,120 @@
|
||||
#!/usr/bin/env python3
|
||||
# 2026-04-28 ogt + Claude Opus 4.7: P2-1 ConfigMap vs code default drift checker
|
||||
# 來源:tool-expert 統一治理方案
|
||||
# 2026-04-29 ogt + Claude Opus 4.7: critic M7 修 — regex 改 AST 解析,避免 false negative
|
||||
# 來源:tool-expert 統一治理方案 + critic PR review
|
||||
# 目的:CI / pre-commit 階段驗證 k8s ConfigMap 與 apps/api/src/core/config.py default 一致
|
||||
# 違反「事實驅動」紅線案例:AI_FALLBACK_ORDER、ARGOCD_URL 都曾發生 drift
|
||||
"""
|
||||
ConfigMap vs Code Default Drift Checker
|
||||
ConfigMap vs Code Default Drift Checker (AST-based)
|
||||
|
||||
用法:
|
||||
python3 scripts/check_config_drift.py
|
||||
退出碼:
|
||||
0 = 全部對齊
|
||||
1 = 至少一項 drift(CI 應 fail)
|
||||
2 = 配置/解析錯誤
|
||||
|
||||
可加進 .pre-commit-config.yaml:
|
||||
- repo: local
|
||||
hooks:
|
||||
- id: config-drift-check
|
||||
name: ConfigMap vs code default drift
|
||||
entry: python3 scripts/check_config_drift.py
|
||||
language: python
|
||||
pass_filenames: false
|
||||
additional_dependencies: [pyyaml]
|
||||
設計:
|
||||
用 ast.parse 解析 config.py 找 ClassDef Settings → 每個 AnnAssign 的
|
||||
Field(default=...),避免 regex 對多行 list / default_factory= / 含跳行字串
|
||||
的 false negative(critic M7)。
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml # noqa: F401 pre-commit 會經 additional_dependencies 安裝
|
||||
import yaml
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
CONFIGMAP_PATH = ROOT / "k8s" / "awoooi-prod" / "04-configmap.yaml"
|
||||
CONFIG_PY_PATH = ROOT / "apps" / "api" / "src" / "core" / "config.py"
|
||||
|
||||
# 需要比對的欄位
|
||||
# code_default_pattern: 在 config.py 找 default=... 用的 regex(DOTALL)
|
||||
CHECK_FIELDS: dict[str, dict[str, str]] = {
|
||||
"AI_FALLBACK_ORDER": {
|
||||
"configmap_key": "AI_FALLBACK_ORDER",
|
||||
"code_pattern": r"AI_FALLBACK_ORDER:\s*list\[str\]\s*=\s*Field\([^)]*?default=(\[[^\]]+\])",
|
||||
},
|
||||
"ARGOCD_URL": {
|
||||
"configmap_key": "ARGOCD_URL",
|
||||
"code_pattern": r"ARGOCD_URL[^\n]*?\n[^)]*?default=[\"']([^\"']+)[\"']",
|
||||
},
|
||||
"PROMETHEUS_URL": {
|
||||
"configmap_key": "PROMETHEUS_URL",
|
||||
"code_pattern": r"PROMETHEUS_URL[^\n]*?\n[^)]*?default=[\"']([^\"']+)[\"']",
|
||||
},
|
||||
"OLLAMA_URL": {
|
||||
"configmap_key": "OLLAMA_URL",
|
||||
"code_pattern": r"OLLAMA_URL[^\n]*?\n[^)]*?default=[\"']([^\"']+)[\"']",
|
||||
},
|
||||
}
|
||||
CHECK_FIELDS: list[str] = [
|
||||
"AI_FALLBACK_ORDER",
|
||||
"ARGOCD_URL",
|
||||
"PROMETHEUS_URL",
|
||||
"OLLAMA_URL",
|
||||
]
|
||||
|
||||
|
||||
def _normalize(raw: str) -> object:
|
||||
"""嘗試把字串解析成 list/dict,失敗就回原字串。"""
|
||||
raw_strip = raw.strip().strip("'\"")
|
||||
if raw_strip.startswith("["):
|
||||
try:
|
||||
return json.loads(raw_strip.replace("'", '"'))
|
||||
except json.JSONDecodeError:
|
||||
return raw_strip
|
||||
return raw_strip
|
||||
def _extract_field_default(call_node: ast.Call) -> Any:
|
||||
"""從 ast.Call(Field(default=..., ...)) 提取 default value。
|
||||
回傳 Python 物件(str / list / int / bool)或 None(找不到)。
|
||||
"""
|
||||
for kw in call_node.keywords:
|
||||
if kw.arg == "default":
|
||||
return _ast_to_value(kw.value)
|
||||
if kw.arg == "default_factory":
|
||||
# default_factory 動態產生,無法靜態比對
|
||||
return "<DEFAULT_FACTORY_UNCOMPARABLE>"
|
||||
return None
|
||||
|
||||
|
||||
def _ast_to_value(node: ast.AST) -> Any:
|
||||
"""ast 節點 → Python 物件(保守,無法解析回 None)。"""
|
||||
if isinstance(node, ast.Constant):
|
||||
return node.value
|
||||
if isinstance(node, ast.List):
|
||||
return [_ast_to_value(elt) for elt in node.elts]
|
||||
if isinstance(node, ast.Tuple):
|
||||
return tuple(_ast_to_value(elt) for elt in node.elts)
|
||||
if isinstance(node, ast.Dict):
|
||||
return {
|
||||
_ast_to_value(k): _ast_to_value(v)
|
||||
for k, v in zip(node.keys, node.values, strict=False)
|
||||
}
|
||||
if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
|
||||
v = _ast_to_value(node.operand)
|
||||
return -v if isinstance(v, (int, float)) else None
|
||||
return None
|
||||
|
||||
|
||||
def _parse_settings_defaults(py_path: Path) -> dict[str, Any]:
|
||||
"""解析 config.py 的 Settings class 取所有欄位的 default value。"""
|
||||
src = py_path.read_text(encoding="utf-8")
|
||||
tree = ast.parse(src, filename=str(py_path))
|
||||
|
||||
defaults: dict[str, Any] = {}
|
||||
|
||||
for cls_node in ast.walk(tree):
|
||||
if not isinstance(cls_node, ast.ClassDef) or cls_node.name != "Settings":
|
||||
continue
|
||||
for stmt in cls_node.body:
|
||||
if not isinstance(stmt, ast.AnnAssign):
|
||||
continue
|
||||
if not isinstance(stmt.target, ast.Name):
|
||||
continue
|
||||
field_name = stmt.target.id
|
||||
if stmt.value is None:
|
||||
continue
|
||||
# Settings = Field(default=..., ...)
|
||||
if isinstance(stmt.value, ast.Call) and isinstance(stmt.value.func, ast.Name):
|
||||
if stmt.value.func.id == "Field":
|
||||
val = _extract_field_default(stmt.value)
|
||||
if val is not None:
|
||||
defaults[field_name] = val
|
||||
continue
|
||||
# 直接 default: var = "value"
|
||||
val = _ast_to_value(stmt.value)
|
||||
if val is not None:
|
||||
defaults[field_name] = val
|
||||
return defaults
|
||||
|
||||
|
||||
def _normalize(raw: Any) -> Any:
|
||||
"""ConfigMap 字串可能是 JSON list(如 AI_FALLBACK_ORDER),嘗試解析。"""
|
||||
if isinstance(raw, str):
|
||||
s = raw.strip()
|
||||
if s.startswith("[") and s.endswith("]"):
|
||||
try:
|
||||
return json.loads(s.replace("'", '"'))
|
||||
except json.JSONDecodeError:
|
||||
return s
|
||||
return raw
|
||||
|
||||
|
||||
def main() -> int:
|
||||
@@ -76,19 +125,26 @@ def main() -> int:
|
||||
print(f"[ERROR] config.py not found: {CONFIG_PY_PATH}")
|
||||
return 2
|
||||
|
||||
with CONFIGMAP_PATH.open() as fh:
|
||||
cm_data: dict = yaml.safe_load(fh).get("data", {}) or {}
|
||||
py_src = CONFIG_PY_PATH.read_text()
|
||||
try:
|
||||
with CONFIGMAP_PATH.open() as fh:
|
||||
cm_data: dict = (yaml.safe_load(fh) or {}).get("data", {}) or {}
|
||||
except yaml.YAMLError as exc:
|
||||
print(f"[ERROR] ConfigMap YAML parse: {exc}")
|
||||
return 2
|
||||
|
||||
try:
|
||||
py_defaults = _parse_settings_defaults(CONFIG_PY_PATH)
|
||||
except SyntaxError as exc:
|
||||
print(f"[ERROR] config.py AST parse: {exc}")
|
||||
return 2
|
||||
|
||||
exit_code = 0
|
||||
print("=== ConfigMap ↔ code.default Drift Check ===")
|
||||
for field, spec in CHECK_FIELDS.items():
|
||||
cm_raw = cm_data.get(spec["configmap_key"], "<MISSING_IN_CONFIGMAP>")
|
||||
m = re.search(spec["code_pattern"], py_src, re.DOTALL)
|
||||
py_raw = m.group(1) if m else "<NOT_FOUND_IN_CONFIG_PY>"
|
||||
print("=== ConfigMap ↔ code.default Drift Check (AST-based) ===")
|
||||
for field in CHECK_FIELDS:
|
||||
cm_raw = cm_data.get(field, "<MISSING_IN_CONFIGMAP>")
|
||||
py_val = py_defaults.get(field, "<NOT_FOUND_IN_CONFIG_PY>")
|
||||
|
||||
cm_val = _normalize(cm_raw)
|
||||
py_val = _normalize(py_raw)
|
||||
|
||||
if cm_val == py_val:
|
||||
print(f"[OK] {field}: {cm_val}")
|
||||
|
||||
Reference in New Issue
Block a user