awoooi/apps/api/src/repositories/knowledge_repository.py
Your Name c5753e1c57 fix(critic-review): make KMWriter live up to its name + fix Alertmanager inhibition + move drift checker to AST
The critic PR review found 7 blockers in already-pushed commits; this commit fixes all of them.

## C1 + C2 + M1 + M2 + M3: KMWriter's contract actually unified (the critic's 5 most severe findings)

### C1 km_writer.py:194 - fix the self-contradicting backfill
- Bare asyncio.create_task(_backfill_path_a_approval) → await _backfill_path_a_approval_safe()
- Awaited inline, with a dedicated DLQ (km:backfill:dlq) and try/except so a backfill failure never blocks the main write
- Add km_backfill_reconciler_job.py (scans the DLQ every 5 minutes), gated by the ENABLE_KM_BACKFILL_RECONCILER flag
- Prevents the race where Path B finishes before Path A and related_approval_id stays NULL forever
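
The await-inline-then-DLQ flow above can be sketched as follows. This is a minimal illustration, not the km_writer code: `backfill_safe` and `reconcile_dlq` are hypothetical names, and a module-level list stands in for the km:backfill:dlq queue (assumed here to live in Redis).

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical in-memory stand-in for the km:backfill:dlq queue.
DLQ: list[str] = []


async def backfill_safe(backfill: Callable[[str], Awaitable[None]], incident_id: str) -> bool:
    """Await the backfill inline; on failure, park the job in the DLQ instead of raising."""
    try:
        await backfill(incident_id)
        return True
    except Exception:
        # The main KM write already succeeded; a backfill error must not propagate.
        DLQ.append(incident_id)
        return False


async def reconcile_dlq(backfill: Callable[[str], Awaitable[None]]) -> int:
    """One reconciler pass (the real job runs every 5 minutes): retry each parked job once."""
    pending = DLQ.copy()
    DLQ.clear()
    recovered = 0
    for incident_id in pending:
        # A job that fails again is re-appended to the DLQ by backfill_safe().
        if await backfill_safe(backfill, incident_id):
            recovered += 1
    return recovered


async def demo() -> tuple[bool, int, int]:
    attempts: dict[str, int] = {}

    async def flaky_backfill(incident_id: str) -> None:
        # First try fails as if Path A's approval row did not exist yet.
        attempts[incident_id] = attempts.get(incident_id, 0) + 1
        if attempts[incident_id] == 1:
            raise RuntimeError("approval not found yet")

    ok = await backfill_safe(flaky_backfill, "inc-1")
    recovered = await reconcile_dlq(flaky_backfill)
    return ok, recovered, len(DLQ)


print(asyncio.run(demo()))  # (False, 1, 0)
```

The first attempt lands in the DLQ instead of being lost, and the next reconciler pass recovers it, which is what keeps related_approval_id from staying NULL.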

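### C2 km_writer.py:391 - tighten the KM_WRITE_AWAIT=false path
- Was: ensure_future (fire-and-forget, which is worse than the old synchronous write)
- Now: await writer.write(retry=1, timeout=2.0) (still awaited, but a single attempt with a short timeout)
- The docstring states explicitly: "for emergency rollback only; reliability is not guaranteed"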
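
A sketch of the C2 semantics (one awaited attempt bounded by a timeout). `write_once` is a hypothetical wrapper that assumes the write is any coroutine function; it is not the real writer.write(retry=1, timeout=2.0) signature.

```python
import asyncio
from typing import Awaitable, Callable


async def write_once(write: Callable[[], Awaitable[None]], timeout: float = 2.0) -> str:
    """Emergency-rollback mode: one awaited attempt, abandoned after `timeout` seconds.

    Unlike ensure_future(), the caller still sees the outcome, but there is no retry
    and no DLQ, so in this sketch a failed write is simply dropped.
    """
    try:
        await asyncio.wait_for(write(), timeout=timeout)
        return "ok"
    except asyncio.TimeoutError:
        return "timeout"
    except Exception:
        return "error"


async def slow_write() -> None:
    await asyncio.sleep(1)  # simulates a stalled database


print(asyncio.run(write_once(slow_write, timeout=0.05)))  # timeout
```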

### M1 decision_manager.py:2178/2203 - remove the _fire_and_forget bypass
- Two call sites used _fire_and_forget(executor.write_execution_result_to_km(...))
- Changed to await asyncio.shield(...) with BaseException handling (so a cancel from the caller cannot interrupt the write)
- With KM_WRITE_AWAIT=true, this path now actually awaits
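
A self-contained sketch of the shield-plus-BaseException pattern, assuming the KM write is a coroutine. `km_write`, `handler`, and the asyncio.Event that holds the write open are hypothetical stand-ins for executor.write_execution_result_to_km(...) and its caller.

```python
import asyncio


async def km_write(entry_id: str, gate: asyncio.Event, completed: list[str]) -> None:
    # Stand-in for the KM write; blocks until `gate` opens to simulate a slow DB.
    await gate.wait()
    completed.append(entry_id)


async def handler(entry_id: str, gate: asyncio.Event, completed: list[str], events: list[str]) -> None:
    try:
        # shield(): cancelling handler() cancels only this await, not the inner write task.
        await asyncio.shield(km_write(entry_id, gate, completed))
    except BaseException as exc:
        # CancelledError derives from BaseException (Python 3.8+), so `except Exception`
        # would miss it. Record it, then re-raise so the cancellation still propagates.
        events.append(type(exc).__name__)
        raise


async def demo() -> tuple[list[str], list[str]]:
    gate, completed, events = asyncio.Event(), [], []
    task = asyncio.create_task(handler("exec-1", gate, completed, events))
    await asyncio.sleep(0)  # let handler() start the shielded write
    task.cancel()  # an upstream cancel arrives mid-write
    try:
        await task
    except asyncio.CancelledError:
        pass
    gate.set()  # the write still finishes after its caller was cancelled
    await asyncio.sleep(0.01)
    return events, completed


print(asyncio.run(demo()))  # (['CancelledError'], ['exec-1'])
```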

### M2 incident_service.py:1099 - add retry + DLQ to the hand-rolled path
- Was: if settings.KM_WRITE_AWAIT: await asyncio.wait_for, else create_task
- Now: 3 retries with exponential backoff plus DLQ protection (via a km_writer private helper)
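
The retry-then-DLQ policy can be sketched like this, assuming "3 retries" means three attempts in total and a base delay of 0.5 s; `write_with_retry` is a hypothetical name, not the km_writer private helper.

```python
import asyncio
from typing import Awaitable, Callable


async def write_with_retry(
    write: Callable[[], Awaitable[None]],
    dlq: list[str],
    key: str,
    attempts: int = 3,
    base_delay: float = 0.5,
) -> bool:
    """Try `write` up to `attempts` times with exponential backoff; DLQ the key on final failure."""
    for attempt in range(attempts):
        try:
            await write()
            return True
        except Exception:
            if attempt < attempts - 1:
                # With the defaults: 0.5 s after the 1st failure, 1 s after the 2nd.
                await asyncio.sleep(base_delay * 2**attempt)
    dlq.append(key)  # every attempt failed: leave it for a reconciler
    return False


async def demo() -> tuple[bool, bool, list[str], int]:
    calls = 0

    async def flaky() -> None:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("db busy")

    async def always_down() -> None:
        raise ConnectionError("db down")

    dlq: list[str] = []
    first = await write_with_retry(flaky, dlq, "inc-1", base_delay=0.01)
    second = await write_with_retry(always_down, dlq, "inc-2", base_delay=0.01)
    return first, second, dlq, calls


print(asyncio.run(demo()))  # (True, False, ['inc-2'], 3)
```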

### M3 km_writer.py:166 - make the idempotency claim match the implementation
- knowledge_repository.create() gains an UPSERT path (pg_insert ... ON CONFLICT DO UPDATE)
- Add a path_type field to KnowledgeEntryCreate / KnowledgeEntryRecord
- Migration: ADD COLUMN path_type plus the partial unique index uix_knowledge_incident_path

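## M4 alertmanager.yml - tighten equal: [] (critic: prevent runaway inhibition)
- Add an equal: ['cluster'] constraint to the OllamaInstanceDown / KMConverterDown inhibition rules
- Prevents one Ollama outage in a multi-cluster setup from wrongly suppressing every AI/SLO alert in all clusters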
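
A minimal Python model of why `equal: []` is dangerous. It assumes the source and target matchers have already matched and only checks the `equal` labels, treating a missing label as an empty value; the AISLOBurnRate alert name and the cluster values are hypothetical.

```python
Labels = dict[str, str]


def inhibits(source: Labels, target: Labels, equal: list[str]) -> bool:
    """Simplified inhibition check for an already-matched source/target pair.

    Every label listed in `equal` must have the same value on both alerts. With
    equal=[] there is nothing to compare, so any firing source mutes every target.
    """
    return all(source.get(name, "") == target.get(name, "") for name in equal)


ollama_down = {"alertname": "OllamaInstanceDown", "cluster": "tw-1"}
slo_other_cluster = {"alertname": "AISLOBurnRate", "cluster": "jp-1"}
slo_same_cluster = {"alertname": "AISLOBurnRate", "cluster": "tw-1"}

print(inhibits(ollama_down, slo_other_cluster, equal=[]))           # True: cross-cluster mute
print(inhibits(ollama_down, slo_other_cluster, equal=["cluster"]))  # False
print(inhibits(ollama_down, slo_same_cluster, equal=["cluster"]))   # True
```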

## M5 Alertmanager version check (confirmed v0.31.1, well above the required v0.22+)

## M6 governance_agent.py - health score distinguishes skipped vs ok vs violated
- check_slo_compliance adds _meta {violated_count, skipped_count, ok_count, all_skipped, status}
- run_self_check: when every SLO is skipped, emit a separate governance_slo_data_gap alert
  (it does not count toward self_failure, because no_data means an emitter is not implemented yet, not that the governance mechanism failed)
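
A sketch of the skipped / ok / violated split, assuming each SLO check yields one of those three strings. The _meta field names come from this commit; the `status` values, the function names, and the `slo_violation` alert are hypothetical.

```python
from typing import Literal

SloStatus = Literal["ok", "violated", "skipped"]


def summarize_slo_results(results: dict[str, SloStatus]) -> dict[str, object]:
    """Build a _meta block with the fields listed for check_slo_compliance."""
    counts = {s: sum(1 for v in results.values() if v == s) for s in ("ok", "violated", "skipped")}
    all_skipped = bool(results) and counts["skipped"] == len(results)
    if counts["violated"]:
        status = "violated"
    elif all_skipped:
        status = "no_data"
    else:
        status = "ok"
    return {
        "violated_count": counts["violated"],
        "skipped_count": counts["skipped"],
        "ok_count": counts["ok"],
        "all_skipped": all_skipped,
        "status": status,
    }


def self_check_alerts(meta: dict[str, object]) -> list[str]:
    # The data gap gets its own alert and stays out of self_failure: no_data means
    # an emitter is missing, not that the governance mechanism itself broke.
    alerts = []
    if meta["all_skipped"]:
        alerts.append("governance_slo_data_gap")
    if meta["violated_count"]:
        alerts.append("slo_violation")
    return alerts


meta = summarize_slo_results({"api_latency": "skipped", "ai_success_rate": "skipped"})
print(meta["status"], self_check_alerts(meta))  # no_data ['governance_slo_data_gap']
```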

## M7 scripts/check_config_drift.py - switch to AST parsing
- Replace the regex with ast.parse: find the Settings ClassDef, then its AnnAssign nodes using Field(default=...)
- Avoids false negatives on multi-line lists, default_factory=, and strings that span lines
- All 4 fields (AI_FALLBACK_ORDER / ARGOCD_URL / PROMETHEUS_URL / OLLAMA_URL) are in sync
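
The drift checker's code is not part of this file; the sketch below only illustrates the AST approach, assuming defaults are literals passed as `Field(default=...)`. `settings_defaults` and the sample settings values are hypothetical.

```python
import ast


def settings_defaults(source: str, class_name: str = "Settings") -> dict[str, object]:
    """Collect `NAME: type = Field(default=<literal>)` defaults from one class via the AST."""
    defaults: dict[str, object] = {}
    for node in ast.walk(ast.parse(source)):
        if not (isinstance(node, ast.ClassDef) and node.name == class_name):
            continue
        for stmt in node.body:
            if not (isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name)):
                continue
            value = stmt.value
            if isinstance(value, ast.Call) and isinstance(value.func, ast.Name) and value.func.id == "Field":
                for kw in value.keywords:
                    # default_factory= is skipped: there is no static literal to compare.
                    if kw.arg == "default":
                        try:
                            defaults[stmt.target.id] = ast.literal_eval(kw.value)
                        except ValueError:
                            pass  # non-literal default (e.g. a function call)
    return defaults


SAMPLE = '''
from pydantic import Field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    AI_FALLBACK_ORDER: list[str] = Field(
        default=[
            "ollama",
            "claude",
        ],
    )
    OLLAMA_URL: str = Field(default="http://ollama:11434")
    EXTRA_HOSTS: list[str] = Field(default_factory=list)
'''

print(settings_defaults(SAMPLE))
# {'AI_FALLBACK_ORDER': ['ollama', 'claude'], 'OLLAMA_URL': 'http://ollama:11434'}
```

A line-oriented regex would see `default=[` on one line and the list items on others; walking the AST sees the whole list literal at once.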

## New tests
- test_km_writer_backfill_reconciler.py: 7 cases (C1 reconciler + safe helper)
- test_km_writer_idempotent.py: 5 cases (M3 path_type injection + UPSERT branch)

## Verification
- All 1585 unit tests pass (up 13 from 1572)
- amtool check-config SUCCESS (8 inhibit_rules / 2 receivers)
- AST-based drift checker: all 4 fields in sync
- Confirmed that Alertmanager v0.31.1 supports the new syntax

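## Expected impact
- KMWriter behaves as its name claims: writes on the flywheel's closed-loop KM path are awaited instead of fire-and-forget (the KM_WRITE_AWAIT=false emergency-rollback path excepted)
- The M4 runaway-inhibition risk is removed
- The governance layer no longer stays silent on SLO no_data
- The drift checker's false-negative risk is removed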

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 10:44:39 +08:00

"""
Knowledge Repository - PostgreSQL 實作
=======================================
Knowledge Base Phase 1: CRUD + 搜尋
建立時間: 2026-04-02 (台北時區)
建立者: Claude Code (Knowledge Base Phase 1)
遵循 leWOOOgo 積木化原則:
- 實作 IKnowledgeRepository Protocol
- 只做資料存取,業務邏輯在 Service 層
"""

import structlog
from sqlalchemy import String, func, or_, select, text as sa_text, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.models import KnowledgeEntryRecord
from src.models.knowledge import (
    EntryStatus,
    EntryType,
    KnowledgeEntry,
    KnowledgeEntryCreate,
)

logger = structlog.get_logger(__name__)


class KnowledgeDBRepository:
    """
    Knowledge Repository - PostgreSQL implementation.

    Implements the IKnowledgeRepository Protocol.
    """

    def __init__(self, db: AsyncSession):
        self.db = db

    async def create(self, data: KnowledgeEntryCreate) -> KnowledgeEntry:
        """
        Create a knowledge entry.

        P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6
        - If both data.path_type and data.related_incident_id are set, use
          INSERT ... ON CONFLICT DO UPDATE (UPSERT) to avoid duplicate entries,
          implementing the (related_incident_id, path_type) idempotency key
          that KMWriter promises.
        - Every other path (no path_type or no incident_id) still uses a plain INSERT.
        """
        # --- UPSERT path (idempotency key present) ---
        if data.path_type and data.related_incident_id:
            values = dict(
                title=data.title,
                content=data.content,
                entry_type=data.entry_type.value if hasattr(data.entry_type, "value") else data.entry_type,
                category=data.category,
                tags=data.tags,
                source=data.source.value if hasattr(data.source, "value") else data.source,
                status=data.status.value if hasattr(data.status, "value") else data.status,
                related_incident_id=data.related_incident_id,
                related_playbook_id=data.related_playbook_id,
                related_approval_id=data.related_approval_id,
                path_type=data.path_type,
                symptoms_hash=data.symptoms_hash,
                created_by=data.created_by,
            )
            # RETURNING id: fetch only the scalar id (safest; does not depend on ORM RETURNING behavior)
            stmt = (
                pg_insert(KnowledgeEntryRecord)
                .values(**values)
                .on_conflict_do_update(
                    index_elements=["related_incident_id", "path_type"],
                    # The ON CONFLICT predicate must match the partial index (WHERE both NOT NULL)
                    index_where=sa_text(
                        "related_incident_id IS NOT NULL AND path_type IS NOT NULL"
                    ),
                    set_=dict(
                        title=data.title,
                        content=data.content,
                        tags=data.tags,
                        status=values["status"],
                        related_approval_id=data.related_approval_id,
                        # ON CONFLICT DO UPDATE does not apply Python-side Column.onupdate
                        # defaults, so bump the timestamp explicitly.
                        updated_at=func.now(),
                    ),
                )
                .returning(KnowledgeEntryRecord.id)
            )
            result = await self.db.execute(stmt)
            entry_id: str = result.scalar_one()
            # Re-SELECT by id after the UPSERT to get the full ORM object.
            # populate_existing refreshes a copy already in the session's identity map,
            # which would otherwise be returned with its pre-UPSERT attribute values.
            fetch_result = await self.db.execute(
                select(KnowledgeEntryRecord)
                .where(KnowledgeEntryRecord.id == entry_id)
                .execution_options(populate_existing=True)
            )
            record = fetch_result.scalar_one()
            logger.info(
                "knowledge_entry_upserted",
                entry_id=record.id,
                title=record.title,
                path_type=data.path_type,
                incident_id=data.related_incident_id,
            )
            return self._to_model(record)
        # --- Plain INSERT path (no idempotency key) ---
        record = KnowledgeEntryRecord(
            title=data.title,
            content=data.content,
            entry_type=data.entry_type,
            category=data.category,
            tags=data.tags,
            source=data.source,
            # 2026-04-04 ogt: Phase 25 P1 - allow an explicit status (ANTI_PATTERN entries go straight to PUBLISHED)
            status=data.status,
            related_incident_id=data.related_incident_id,
            related_playbook_id=data.related_playbook_id,
            related_approval_id=data.related_approval_id,
            # P1-1 M3: path_type or related_incident_id is missing on this path,
            # so the idempotency key does not apply
            path_type=data.path_type,
            # 2026-04-04 ogt: Phase 25 P1 - symptom hash for the Anti-Pattern closed loop
            symptoms_hash=data.symptoms_hash,
            created_by=data.created_by,
        )
        self.db.add(record)
        await self.db.flush()
        logger.info("knowledge_entry_created", entry_id=record.id, title=record.title)
        return self._to_model(record)

    async def get_by_id(self, entry_id: str) -> KnowledgeEntry | None:
        """Get a knowledge entry by ID (archived entries are excluded)."""
        result = await self.db.execute(
            select(KnowledgeEntryRecord).where(
                KnowledgeEntryRecord.id == entry_id,
                KnowledgeEntryRecord.status != EntryStatus.ARCHIVED,
            )
        )
        record = result.scalar_one_or_none()
        return self._to_model(record) if record else None

    async def update(self, entry_id: str, data: dict) -> KnowledgeEntry | None:
        """Update a knowledge entry.

        Keys that are not attributes of the record, and keys whose value is None,
        are skipped, so this method cannot clear a field to NULL.
        """
        result = await self.db.execute(
            select(KnowledgeEntryRecord).where(KnowledgeEntryRecord.id == entry_id)
        )
        record = result.scalar_one_or_none()
        if not record:
            return None
        for key, value in data.items():
            if value is not None and hasattr(record, key):
                setattr(record, key, value)
        await self.db.flush()
        logger.info("knowledge_entry_updated", entry_id=entry_id)
        return self._to_model(record)

    async def delete(self, entry_id: str) -> bool:
        """Soft delete: set status to ARCHIVED."""
        result = await self.db.execute(
            update(KnowledgeEntryRecord)
            .where(KnowledgeEntryRecord.id == entry_id)
            .values(status=EntryStatus.ARCHIVED)
        )
        return result.rowcount > 0

    async def list_entries(
        self,
        category: str | None = None,
        entry_type: EntryType | None = None,
        status: EntryStatus | None = None,
        tags: list[str] | None = None,
        q: str | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> tuple[list[KnowledgeEntry], int]:
        """List knowledge entries with optional filters.

        Returns the requested page and the total number of matching entries.
        """
        query = select(KnowledgeEntryRecord).where(
            KnowledgeEntryRecord.status != EntryStatus.ARCHIVED
        )
        count_query = select(func.count()).select_from(KnowledgeEntryRecord).where(
            KnowledgeEntryRecord.status != EntryStatus.ARCHIVED
        )
        if category:
            query = query.where(KnowledgeEntryRecord.category == category)
            count_query = count_query.where(KnowledgeEntryRecord.category == category)
        if entry_type:
            query = query.where(KnowledgeEntryRecord.entry_type == entry_type)
            count_query = count_query.where(KnowledgeEntryRecord.entry_type == entry_type)
        if status:
            query = query.where(KnowledgeEntryRecord.status == status)
            count_query = count_query.where(KnowledgeEntryRecord.status == status)
        if tags:
            for tag in tags:
                tag_filter = KnowledgeEntryRecord.tags.op('@>')(f'["{tag}"]')
                query = query.where(tag_filter)
                count_query = count_query.where(tag_filter)
        if q:
            like_q = f"%{q}%"
            filter_cond = or_(
                KnowledgeEntryRecord.title.ilike(like_q),
                KnowledgeEntryRecord.content.ilike(like_q),
            )
            query = query.where(filter_cond)
            count_query = count_query.where(filter_cond)
        total_result = await self.db.execute(count_query)
        total = total_result.scalar() or 0
        query = query.order_by(KnowledgeEntryRecord.updated_at.desc())
        query = query.limit(limit).offset(offset)
        result = await self.db.execute(query)
        records = result.scalars().all()
        return [self._to_model(r) for r in records], total

    async def get_categories(self) -> list[tuple[str, int]]:
        """Return (category, count) pairs for non-archived entries, most common first."""
        result = await self.db.execute(
            select(
                KnowledgeEntryRecord.category,
                func.count().label("cnt"),
            )
            .where(KnowledgeEntryRecord.status != EntryStatus.ARCHIVED)
            .group_by(KnowledgeEntryRecord.category)
            .order_by(func.count().desc())
        )
        return [(row.category, row.cnt) for row in result.all()]

    async def search(self, query: str, limit: int = 20) -> list[KnowledgeEntry]:
        """Keyword search over title, content, and tags (case-insensitive), most viewed first."""
        like_q = f"%{query}%"
        result = await self.db.execute(
            select(KnowledgeEntryRecord)
            .where(
                KnowledgeEntryRecord.status != EntryStatus.ARCHIVED,
                or_(
                    KnowledgeEntryRecord.title.ilike(like_q),
                    KnowledgeEntryRecord.content.ilike(like_q),
                    KnowledgeEntryRecord.tags.cast(String).ilike(like_q),
                ),
            )
            .order_by(KnowledgeEntryRecord.view_count.desc())
            .limit(limit)
        )
        records = result.scalars().all()
        return [self._to_model(r) for r in records]

    async def increment_view_count(self, entry_id: str) -> bool:
        """view_count +1"""
        result = await self.db.execute(
            update(KnowledgeEntryRecord)
            .where(KnowledgeEntryRecord.id == entry_id)
            .values(view_count=KnowledgeEntryRecord.view_count + 1)
        )
        return result.rowcount > 0

    async def list_unembedded_entries(self) -> list[tuple[str, str, str]]:
        """List entries that have no embedding yet, as [(id, title, content)]."""
        result = await self.db.execute(
            sa_text(
                "SELECT id, title, content FROM knowledge_entries "
                "WHERE embedding IS NULL AND status != 'ARCHIVED'"
            )
        )
        return [(row.id, row.title, row.content) for row in result.fetchall()]

    async def save_embedding(self, entry_id: str, embedding: list[float]) -> bool:
        """Store a vector embedding (768 dimensions).

        Note: asyncpg does not support the :param::type cast syntax, so use
        CAST(:param AS vector) instead.
        """
        result = await self.db.execute(
            sa_text(
                "UPDATE knowledge_entries SET embedding = CAST(:emb AS vector) WHERE id = :id"
            ),
            {"emb": str(embedding), "id": entry_id},
        )
        return result.rowcount > 0

    async def semantic_search(
        self,
        query_embedding: list[float],
        limit: int = 10,
        threshold: float = 0.5,
    ) -> list[tuple[KnowledgeEntry, float]]:
        """
        Semantic search by cosine similarity (pgvector).

        pgvector's <=> operator returns cosine distance, so similarity = 1 - distance.

        Returns:
            List of (entry, similarity_score), sorted by score in descending order.
        """
        sql = sa_text("""
            SELECT id, 1 - (embedding <=> CAST(:emb AS vector)) AS score
            FROM knowledge_entries
            WHERE status != 'ARCHIVED'
              AND embedding IS NOT NULL
              AND 1 - (embedding <=> CAST(:emb AS vector)) >= :threshold
            ORDER BY embedding <=> CAST(:emb AS vector)
            LIMIT :limit
        """)
        score_result = await self.db.execute(
            sql,
            {"emb": str(query_embedding), "threshold": threshold, "limit": limit},
        )
        rows = score_result.fetchall()
        if not rows:
            return []
        # Fetch the full entries in one batch, then restore the similarity order
        ids = [r[0] for r in rows]
        scores = {r[0]: float(r[1]) for r in rows}
        result = await self.db.execute(
            select(KnowledgeEntryRecord).where(KnowledgeEntryRecord.id.in_(ids))
        )
        records = {r.id: r for r in result.scalars().all()}
        return [
            (self._to_model(records[entry_id]), scores[entry_id])
            for entry_id in ids
            if entry_id in records
        ]

    def _to_model(self, record: KnowledgeEntryRecord) -> KnowledgeEntry:
        """ORM Record → Pydantic Model"""
        return KnowledgeEntry(
            id=record.id,
            title=record.title,
            content=record.content,
            entry_type=record.entry_type,
            category=record.category,
            tags=record.tags or [],
            source=record.source,
            status=record.status,
            related_incident_id=record.related_incident_id,
            related_playbook_id=record.related_playbook_id,
            related_approval_id=getattr(record, "related_approval_id", None),
            # P1-1 M3 2026-04-28 ogt + Claude Sonnet 4.6: idempotency key
            path_type=getattr(record, "path_type", None),
            symptoms_hash=getattr(record, "symptoms_hash", None),
            view_count=record.view_count,
            created_by=record.created_by,
            created_at=record.created_at,
            updated_at=record.updated_at,
        )