Files
awoooi/apps/api/src/services/approval_db.py
OG T 658337ec18
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m29s
Type Sync Check / check-type-sync (push) Failing after 52s
fix(phase26): 打通 Incident→DB→KM 完整鏈路 + namespace 修正
問題根因:
1. create_incident_for_approval 只存 Redis,不存 PostgreSQL
   → TTL 7天後消失,Playbook 萃取永遠找不到 Incident
2. ApprovalRecord 無 incident_id 欄位
   → _trigger_playbook_extraction 靠 regex 掃中文文字找 INC-,永遠失敗
3. operation_parser namespace fallback 是 "default"
   → 所有 deployment 在 awoooi-prod,203 次執行全失敗

修復:
- Incident 同時寫入 Redis + PostgreSQL (save_to_episodic_memory)
- ApprovalRecord 加入 incident_id 欄位 (model + ORM + migration)
- alertmanager_webhook 建立 Approval 後回寫 incident_id
- _trigger_playbook_extraction 直接用 approval.incident_id
- operation_parser DEFAULT_NAMESPACE = "awoooi-prod"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 11:46:05 +08:00

804 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Database-based Approval Service
================================
Phase 5: 永久記憶植入
將 TrustEngine 的 in-memory 邏輯轉換為資料庫 CRUD 操作。
重啟後資料完好無缺。
Features:
- SQLAlchemy async CRUD
- ApprovalRecord 持久化
- TimelineEvent 持久化
- 與原有 API 契約相容
"""
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID
import structlog
from sqlalchemy import and_, or_, select, update
from src.core.trust_engine import classify_risk, get_required_signatures
from src.db.base import get_db_context
from src.db.models import ApprovalRecord, TimelineEvent
from src.models.approval import (
ApprovalRequest,
ApprovalRequestCreate,
ApprovalStatus,
BlastRadius,
DataImpact,
DryRunCheck,
RiskLevel,
Signature,
)
# Module-level structured logger used by every service in this file.
logger = structlog.get_logger(__name__)
# =============================================================================
# Conversion Helpers
# =============================================================================
def approval_record_to_request(record: ApprovalRecord) -> ApprovalRequest:
    """
    Build the Pydantic ``ApprovalRequest`` API model from a DB ``ApprovalRecord``.

    Keeps the API contract stable: the JSON columns (``blast_radius``,
    ``dry_run_checks``, ``signatures``) are re-hydrated into their typed
    models, and enum columns are accepted either as enum members or as raw
    values.

    Args:
        record: ORM row to convert.

    Returns:
        The equivalent ``ApprovalRequest``.

    NOTE(review): the ``incident_id`` column written by
    ``ApprovalDBService.update_incident_id`` is not copied onto the returned
    model here - confirm whether ``ApprovalRequest`` is expected to expose it.
    """

    def _enum_value(column_value):
        # Columns may come back as Enum members or plain values; unwrap both.
        return getattr(column_value, "value", column_value)

    # blast_radius: stored as a JSON object; an empty/missing value maps to None.
    radius_json = record.blast_radius
    if radius_json:
        impact_raw = radius_json.get("data_impact")
        impact = DataImpact(impact_raw.lower()) if impact_raw else DataImpact.NONE
        blast_radius = BlastRadius(
            affected_pods=radius_json.get("affected_pods", 0),
            estimated_downtime=radius_json.get("estimated_downtime", "0"),
            related_services=radius_json.get("related_services", []),
            data_impact=impact,
        )
    else:
        blast_radius = None

    # dry_run_checks: stored as a JSON list of objects.
    dry_run_checks = [
        DryRunCheck(
            name=item.get("name", ""),
            passed=item.get("passed", True),
            message=item.get("message"),
        )
        for item in (record.dry_run_checks or [])
    ]

    # signatures: stored as a JSON list; a missing timestamp falls back to "now".
    signatures = [
        Signature(
            signer_id=entry.get("signer_id", ""),
            signer_name=entry.get("signer_name", ""),
            timestamp=(
                datetime.fromisoformat(entry["timestamp"])
                if entry.get("timestamp")
                else datetime.now(UTC)
            ),
            comment=entry.get("comment"),
        )
        for entry in (record.signatures or [])
    ]

    return ApprovalRequest(
        id=UUID(record.id),
        action=record.action,
        description=record.description,
        status=ApprovalStatus(_enum_value(record.status)),
        risk_level=RiskLevel(_enum_value(record.risk_level)),
        blast_radius=blast_radius,
        dry_run_checks=dry_run_checks,
        required_signatures=record.required_signatures,
        current_signatures=record.current_signatures,
        signatures=signatures,
        requested_by=record.requested_by,
        created_at=record.created_at,
        expires_at=record.expires_at,
        resolved_at=record.resolved_at,
        rejection_reason=record.rejection_reason,
        metadata=record.extra_metadata,
        # Strategy B: alert-storm convergence fields
        fingerprint=record.fingerprint,
        hit_count=record.hit_count,
        last_seen_at=record.last_seen_at,
    )
def approval_request_to_record_data(
    request: ApprovalRequestCreate,
    risk_level: RiskLevel,
    required_sigs: int,
    fingerprint: str | None = None,  # Strategy B: alert fingerprint
) -> dict[str, Any]:
    """
    Turn an ``ApprovalRequestCreate`` into keyword data for ``ApprovalRecord``.

    LOW-risk requests are stored as already APPROVED with ``resolved_at`` set
    to the creation time; every other risk level starts as PENDING.

    Args:
        request: Incoming creation payload.
        risk_level: Risk level already classified by the caller.
        required_sigs: Number of signatures required for approval.
        fingerprint: Optional alert fingerprint used for aggregation.

    Returns:
        A dict suitable for ``ApprovalRecord(**data)``.
    """
    created_at = datetime.now(UTC)
    auto_approved = risk_level == RiskLevel.LOW

    # Serialize the blast radius; an absent one is persisted as an empty object.
    radius_payload: dict[str, Any] = {}
    radius = request.blast_radius
    if radius:
        impact = radius.data_impact
        radius_payload = {
            "affected_pods": radius.affected_pods,
            "estimated_downtime": radius.estimated_downtime,
            "related_services": radius.related_services,
            "data_impact": impact.value.lower() if impact else "none",
        }

    # Serialize dry-run checks into plain JSON-compatible dicts.
    checks_payload = [
        {
            "name": item.name,
            "passed": item.passed,
            "message": item.message,
        }
        for item in (request.dry_run_checks or [])
    ]

    if auto_approved:
        initial_status = ApprovalStatus.APPROVED
        resolved_at = created_at
    else:
        initial_status = ApprovalStatus.PENDING
        resolved_at = None

    return {
        "action": request.action,
        "description": request.description,
        "status": initial_status,
        "risk_level": risk_level,
        "required_signatures": required_sigs,
        "current_signatures": 0,
        "signatures": [],
        "blast_radius": radius_payload,
        "dry_run_checks": checks_payload,
        "requested_by": request.requested_by,
        "expires_at": request.expires_at,
        "extra_metadata": request.metadata,
        "resolved_at": resolved_at,
        # Strategy B: alert-storm convergence fields
        "fingerprint": fingerprint,
        "hit_count": 1,
        "last_seen_at": created_at,
    }
# =============================================================================
# Database Approval Service
# =============================================================================
class ApprovalDBService:
    """
    Database-backed approval service that replaces the in-memory TrustEngine.

    Every operation is a database CRUD call, so approval state is kept across
    restarts.

    Phase 16 R3.4: optional repository injection.
    - With an injected repository: ``get_approval`` and
      ``get_pending_approvals`` delegate to it.
    - Without one: the embedded DB operations are used (backward compatible).
    """

    def __init__(self, repository=None):
        """
        Initialise the service.

        Args:
            repository: Optional IApprovalRepository instance (Phase 16 DI).
        """
        self._repository = repository

    async def create_approval(
        self,
        request: ApprovalRequestCreate,
    ) -> ApprovalRequest:
        """
        Create a new approval request and persist it.

        Args:
            request: Creation payload.

        Returns:
            The stored approval as an ``ApprovalRequest``.
        """
        # Classify the risk, then derive how many signatures it needs.
        risk_level = classify_risk(
            action=request.action,
            blast_radius=request.blast_radius,
            explicit_level=request.risk_level,
        )
        required_sigs = get_required_signatures(risk_level)
        data = approval_request_to_record_data(request, risk_level, required_sigs)

        async with get_db_context() as db:
            record = ApprovalRecord(**data)
            db.add(record)
            await db.flush()
            await db.refresh(record)
            logger.info(
                "approval_created_db",
                id=record.id,
                risk_level=risk_level.value,
                status=record.status.value if hasattr(record.status, 'value') else record.status,
            )
            return approval_record_to_request(record)

    # =========================================================================
    # Strategy B: alert-storm convergence
    # =========================================================================
    async def create_approval_with_fingerprint(
        self,
        request: ApprovalRequestCreate,
        fingerprint: str,
    ) -> ApprovalRequest:
        """
        Create an approval request tagged with an alert fingerprint (Strategy B).

        Used for alert convergence: alerts sharing a fingerprint are aggregated
        onto one record.

        Args:
            request: Creation payload.
            fingerprint: Alert fingerprint stored on the record.

        Returns:
            The stored approval as an ``ApprovalRequest``.
        """
        risk_level = classify_risk(
            action=request.action,
            blast_radius=request.blast_radius,
            explicit_level=request.risk_level,
        )
        required_sigs = get_required_signatures(risk_level)
        data = approval_request_to_record_data(
            request, risk_level, required_sigs, fingerprint=fingerprint
        )

        async with get_db_context() as db:
            record = ApprovalRecord(**data)
            db.add(record)
            await db.flush()
            await db.refresh(record)
            logger.info(
                "approval_created_with_fingerprint",
                id=record.id,
                fingerprint=fingerprint,
                risk_level=risk_level.value,
            )
            return approval_record_to_request(record)

    async def find_by_fingerprint(
        self,
        fingerprint: str,
        debounce_minutes: int = 5,
    ) -> ApprovalRequest | None:
        """
        Look up the most recent approval carrying a fingerprint (Strategy B).

        A record matches when it has the same fingerprint AND it is either
        still PENDING or was created within the last ``debounce_minutes``
        minutes. The newest match (by ``created_at``) is returned.

        NOTE(review): the previous docstring listed "PENDING" and "created
        within the debounce window" as two required conditions, but the query
        combines them with OR. The query is left as-is; confirm which is
        intended.

        Args:
            fingerprint: Alert fingerprint to search for.
            debounce_minutes: Size of the recent-creation window in minutes.

        Returns:
            ApprovalRequest if found, None otherwise.
        """
        now = datetime.now(UTC)
        cutoff_time = now - timedelta(minutes=debounce_minutes)

        async with get_db_context() as db:
            result = await db.execute(
                select(ApprovalRecord)
                .where(ApprovalRecord.fingerprint == fingerprint)
                .where(
                    or_(
                        ApprovalRecord.status == ApprovalStatus.PENDING,
                        ApprovalRecord.created_at >= cutoff_time,
                    )
                )
                .order_by(ApprovalRecord.created_at.desc())
                .limit(1)
            )
            record = result.scalar_one_or_none()
            if record:
                logger.info(
                    "fingerprint_match_found",
                    fingerprint=fingerprint,
                    approval_id=record.id,
                    hit_count=record.hit_count,
                    status=record.status.value if hasattr(record.status, 'value') else record.status,
                )
                return approval_record_to_request(record)
            return None

    async def increment_hit_count(
        self,
        approval_id: UUID,
    ) -> ApprovalRequest | None:
        """
        Bump the aggregation counter of an approval (Strategy B).

        When an alert with the same fingerprint fires again:
        1. ``hit_count`` is incremented by one (atomically, in SQL).
        2. ``last_seen_at`` is set to now.
        This lets the caller skip LLM analysis and save API cost.

        Args:
            approval_id: ID of the approval to update.

        Returns:
            The updated approval, or None when no such record exists.
        """
        now = datetime.now(UTC)

        async with get_db_context() as db:
            # Update hit_count and last_seen_at in one statement.
            result = await db.execute(
                update(ApprovalRecord)
                .where(ApprovalRecord.id == str(approval_id))
                .values(
                    hit_count=ApprovalRecord.hit_count + 1,
                    last_seen_at=now,
                )
                .returning(ApprovalRecord.hit_count)
            )
            new_count = result.scalar_one_or_none()
            if new_count is None:
                return None

            # Re-read the full record for the response.
            result = await db.execute(
                select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
            )
            record = result.scalar_one_or_none()
            if record:
                logger.info(
                    "hit_count_incremented",
                    approval_id=str(approval_id),
                    new_hit_count=new_count,
                    last_seen_at=now.isoformat(),
                )
                return approval_record_to_request(record)
            return None

    async def get_approval(self, approval_id: UUID) -> ApprovalRequest | None:
        """
        Fetch a single approval request.

        Phase 16 R3.4: delegates to the injected repository when present.

        Args:
            approval_id: ID of the approval.

        Returns:
            The approval, or None when it does not exist.
        """
        # Phase 16: use the repository if one was injected.
        if self._repository:
            return await self._repository.get_by_id(approval_id)

        # Legacy: embedded DB operation.
        async with get_db_context() as db:
            result = await db.execute(
                select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
            )
            record = result.scalar_one_or_none()
            if record is None:
                return None
            return approval_record_to_request(record)

    async def get_pending_approvals(self) -> list[ApprovalRequest]:
        """
        List every approval that is still waiting for signatures.

        Phase 16 R3.4: delegates to the injected repository when present.
        In the legacy path, PENDING records whose ``expires_at`` is in the
        past are first marked EXPIRED, then the remaining PENDING records are
        returned newest first.
        """
        # Phase 16: use the repository if one was injected.
        if self._repository:
            return await self._repository.get_pending()

        # Legacy: embedded DB operation.
        now = datetime.now(UTC)
        async with get_db_context() as db:
            # Expire overdue requests first so they are not listed.
            await db.execute(
                update(ApprovalRecord)
                .where(ApprovalRecord.status == ApprovalStatus.PENDING)
                .where(ApprovalRecord.expires_at < now)
                .values(status=ApprovalStatus.EXPIRED, resolved_at=now)
            )
            # Fetch everything that is still PENDING.
            result = await db.execute(
                select(ApprovalRecord)
                .where(ApprovalRecord.status == ApprovalStatus.PENDING)
                .order_by(ApprovalRecord.created_at.desc())
            )
            records = result.scalars().all()
            return [approval_record_to_request(r) for r in records]

    async def sign_approval(
        self,
        approval_id: UUID,
        signer_id: str,
        signer_name: str,
        comment: str | None = None,
    ) -> tuple[ApprovalRequest | None, str, bool]:
        """
        Add a signature to an approval request.

        Phase 5: the row is selected ``FOR UPDATE`` to guard against
        concurrent signers, and the UPDATE additionally carries an
        optimistic-lock condition on ``current_signatures``.

        Args:
            approval_id: ID of the approval to sign.
            signer_id: Unique ID of the signer; each signer may sign once.
            signer_name: Display name used in messages and logs.
            comment: Optional free-text comment stored with the signature.

        Returns:
            ``(approval, message, execution_triggered)``. ``approval`` is None
            when the record does not exist. ``execution_triggered`` is True
            only when this signature completed the required count.
        """
        async with get_db_context() as db:
            # Phase 5: row-level lock to prevent concurrent signing races.
            # Per the original note, SQLite does not support FOR UPDATE while
            # PostgreSQL does.
            result = await db.execute(
                select(ApprovalRecord)
                .where(ApprovalRecord.id == str(approval_id))
                .with_for_update()  # Row-Level Lock
            )
            record = result.scalar_one_or_none()
            logger.info(
                "sign_approval_lock_acquired",
                approval_id=str(approval_id),
                signer_id=signer_id,
            )
            if record is None:
                return None, "Approval not found", False

            # Only PENDING approvals can be signed.
            status_value = record.status.value if hasattr(record.status, 'value') else record.status
            if status_value != "pending":
                return (
                    approval_record_to_request(record),
                    f"Cannot sign: status is {status_value}",
                    False,
                )

            # Work on a copy: appending to record.signatures in place would
            # leak the not-yet-persisted signature into the response returned
            # on the optimistic-lock conflict path below.
            signatures = list(record.signatures or [])

            # Reject a second signature from the same signer.
            if any(sig.get("signer_id") == signer_id for sig in signatures):
                return (
                    approval_record_to_request(record),
                    f"User {signer_name} has already signed this approval",
                    False,
                )

            # Phase 5: optimistic lock - remember the pre-update signature count.
            old_sig_count = record.current_signatures

            # Append the new signature.
            signatures.append({
                "signer_id": signer_id,
                "signer_name": signer_name,
                "timestamp": datetime.now(UTC).isoformat(),
                "comment": comment,
            })
            new_sig_count = len(signatures)

            # Work out the resulting status.
            execution_triggered = False
            new_status = record.status
            resolved_at = None
            if new_sig_count >= record.required_signatures:
                new_status = ApprovalStatus.APPROVED
                resolved_at = datetime.now(UTC)
                execution_triggered = True

            # Phase 5: optimistic-lock update. If someone else already changed
            # current_signatures, this UPDATE matches zero rows.
            result = await db.execute(
                update(ApprovalRecord)
                .where(and_(
                    ApprovalRecord.id == str(approval_id),
                    ApprovalRecord.current_signatures == old_sig_count,  # optimistic lock
                ))
                .values(
                    signatures=signatures,
                    current_signatures=new_sig_count,
                    status=new_status,
                    resolved_at=resolved_at,
                )
            )

            # Zero rows updated means a concurrent modification won the race.
            if result.rowcount == 0:
                logger.warning(
                    "sign_approval_optimistic_lock_conflict",
                    approval_id=str(approval_id),
                    signer_id=signer_id,
                    old_sig_count=old_sig_count,
                )
                return (
                    approval_record_to_request(record),
                    "Concurrent modification detected. Please retry.",
                    False,
                )

            # Reload the row so the response reflects what was persisted.
            await db.refresh(record)

            if execution_triggered:
                message = f"Approval complete! ({new_sig_count}/{record.required_signatures} signatures)"
            else:
                message = f"Signature added ({new_sig_count}/{record.required_signatures})"

            logger.info(
                "approval_signed_db",
                id=record.id,
                signer=signer_name,
                current=record.current_signatures,
                required=record.required_signatures,
                execution_triggered=execution_triggered,
            )
            return approval_record_to_request(record), message, execution_triggered

    async def reject_approval(
        self,
        approval_id: UUID,
        rejector_id: str,
        rejector_name: str,
        reason: str,
    ) -> tuple[ApprovalRequest | None, str]:
        """
        Reject an approval request.

        Args:
            approval_id: ID of the approval to reject.
            rejector_id: ID of the rejecting user (accepted but not stored by
                this method).
            rejector_name: Display name, stored as part of the rejection reason.
            reason: Free-text rejection reason.

        Returns:
            ``(approval, message)``. ``approval`` is None when the record does
            not exist.
        """
        async with get_db_context() as db:
            result = await db.execute(
                select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
            )
            record = result.scalar_one_or_none()
            if record is None:
                return None, "Approval not found"

            # Only PENDING approvals can be rejected.
            status_value = record.status.value if hasattr(record.status, 'value') else record.status
            if status_value != "pending":
                return (
                    approval_record_to_request(record),
                    f"Cannot reject: status is {status_value}",
                )

            record.status = ApprovalStatus.REJECTED
            record.rejection_reason = f"{rejector_name}: {reason}"
            record.resolved_at = datetime.now(UTC)
            await db.flush()
            await db.refresh(record)
            logger.info(
                "approval_rejected_db",
                id=record.id,
                rejector=rejector_name,
                reason=reason,
            )
            return approval_record_to_request(record), "Approval rejected"

    async def update_execution_status(
        self,
        approval_id: UUID,
        success: bool,
    ) -> None:
        """
        Record the outcome of executing an approved action.

        Args:
            approval_id: ID of the approval that was executed.
            success: True stores EXECUTION_SUCCESS, False EXECUTION_FAILED.
        """
        async with get_db_context() as db:
            status = ApprovalStatus.EXECUTION_SUCCESS if success else ApprovalStatus.EXECUTION_FAILED
            await db.execute(
                update(ApprovalRecord)
                .where(ApprovalRecord.id == str(approval_id))
                .values(status=status)
            )
            logger.info(
                "approval_execution_status_updated",
                id=str(approval_id),
                success=success,
            )

    async def update_incident_id(self, approval_id: UUID, incident_id: str) -> None:
        """
        2026-04-06 ogt: Phase 26 - write ``incident_id`` back to approval_records.

        Lets Playbook extraction and KM writes find the matching Incident.

        Args:
            approval_id: ID of the approval to update.
            incident_id: Incident identifier to store on the record.
        """
        async with get_db_context() as db:
            await db.execute(
                update(ApprovalRecord)
                .where(ApprovalRecord.id == str(approval_id))
                .values(incident_id=incident_id)
            )

    # =========================================================================
    # Phase 6.4h: Proposals API support methods
    # =========================================================================
    async def get_approval_by_id(self, approval_id: UUID) -> ApprovalRequest | None:
        """
        Fetch a single approval request by ID (Phase 6.4h).

        Unlike ``get_approval``, this always uses the embedded DB path.

        Args:
            approval_id: UUID of the approval request.

        Returns:
            ApprovalRequest if found, None otherwise.
        """
        async with get_db_context() as db:
            result = await db.execute(
                select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
            )
            record = result.scalar_one_or_none()
            if record is None:
                return None
            return approval_record_to_request(record)

    async def get_all_approvals(
        self,
        status: ApprovalStatus | None = None,
        incident_id: str | None = None,
        limit: int = 50,
        offset: int = 0,
    ) -> list[ApprovalRequest]:
        """
        List approval requests, newest first (Phase 6.4h).

        Args:
            status: Optional status filter.
            incident_id: Optional filter on ``extra_metadata["incident_id"]``.
            limit: Page size.
            offset: Number of matching rows to skip.

        Returns:
            List of ApprovalRequest for the requested page.
        """
        async with get_db_context() as db:
            query = select(ApprovalRecord)

            # Status filter.
            if status is not None:
                query = query.where(ApprovalRecord.status == status)

            query = query.order_by(ApprovalRecord.created_at.desc())

            if not incident_id:
                # No incident filter: paginate in SQL.
                result = await db.execute(query.offset(offset).limit(limit))
                page = result.scalars().all()
            else:
                # The incident filter reads the extra_metadata JSON in the
                # application layer, so it must run BEFORE pagination;
                # paginating first would return short or empty pages while
                # matches exist further down the result set.
                # NOTE(review): this loads every row matching ``status``. Now
                # that update_incident_id writes an ``incident_id`` column,
                # consider filtering on it in SQL once callers populate it
                # consistently.
                result = await db.execute(query)
                matching = [
                    r for r in result.scalars().all()
                    if isinstance(r.extra_metadata, dict)
                    and r.extra_metadata.get("incident_id") == incident_id
                ]
                start = max(offset, 0)
                page = matching[start:start + max(limit, 0)]

            return [approval_record_to_request(r) for r in page]
# =============================================================================
# Timeline Event Service
# =============================================================================
class TimelineDBService:
    """
    Timeline event service - persistence for the Phase 4 Action Timeline.
    """

    async def add_event(
        self,
        event_type: str,
        status: str,
        title: str,
        description: str | None = None,
        actor: str | None = None,
        actor_role: str | None = None,
        risk_level: str | None = None,
        approval_id: str | None = None,
    ) -> dict[str, Any]:
        """
        Persist a new timeline event.

        Returns:
            A summary dict with the keys ``id``, ``type``, ``status``,
            ``title`` and ``created_at`` (ISO-8601 string).
        """
        fields = {
            "event_type": event_type,
            "status": status,
            "title": title,
            "description": description,
            "actor": actor,
            "actor_role": actor_role,
            "risk_level": risk_level,
            "approval_id": approval_id,
        }
        async with get_db_context() as db:
            event = TimelineEvent(**fields)
            db.add(event)
            # Flush + refresh so DB-generated values (id, created_at) are loaded.
            await db.flush()
            await db.refresh(event)
            logger.info(
                "timeline_event_added",
                id=event.id,
                type=event_type,
                title=title,
            )
            summary: dict[str, Any] = {}
            summary["id"] = event.id
            summary["type"] = event.event_type
            summary["status"] = event.status
            summary["title"] = event.title
            summary["created_at"] = event.created_at.isoformat()
            return summary

    async def get_events(self, limit: int = 50) -> list[dict[str, Any]]:
        """
        Return the most recent timeline events, newest first.

        Args:
            limit: Maximum number of events to return.
        """
        async with get_db_context() as db:
            newest_first = (
                select(TimelineEvent)
                .order_by(TimelineEvent.created_at.desc())
                .limit(limit)
            )
            rows = (await db.execute(newest_first)).scalars().all()

            serialized: list[dict[str, Any]] = []
            for row in rows:
                serialized.append(
                    {
                        "id": row.id,
                        "type": row.event_type,
                        "status": row.status,
                        "title": row.title,
                        "description": row.description,
                        "actor": row.actor,
                        "actor_role": row.actor_role,
                        "risk_level": row.risk_level,
                        "approval_id": row.approval_id,
                        "created_at": row.created_at.isoformat(),
                    }
                )
            return serialized
# =============================================================================
# Singleton Instances
# =============================================================================
# Lazily created process-wide instances; populated on first call to
# get_approval_service() / get_timeline_service() respectively.
_approval_service: ApprovalDBService | None = None
_timeline_service: TimelineDBService | None = None
def get_approval_service(use_repository: bool = False) -> ApprovalDBService:
    """
    Return the shared ``ApprovalDBService`` instance, creating it on first use.

    Args:
        use_repository: Whether to back the service with the Repository layer
            (Phase 16 R3.4).

    Phase 16: strangler pattern
    - use_repository=False: embedded DB operations (default, backward compatible)
    - use_repository=True: use ApprovalDBRepository

    Note: the flag is only consulted when the instance is first created;
    later calls return the existing instance whatever value is passed.
    """
    global _approval_service

    # Fast path: already built.
    if _approval_service is not None:
        return _approval_service

    if not use_repository:
        _approval_service = ApprovalDBService()
        return _approval_service

    # Imported here rather than at module top, as in the original.
    from src.repositories import get_approval_repository

    _approval_service = ApprovalDBService(repository=get_approval_repository())
    logger.info("approval_service_with_repository")
    return _approval_service
def get_timeline_service() -> TimelineDBService:
    """Return the shared ``TimelineDBService`` instance, creating it on first use."""
    global _timeline_service

    if _timeline_service is not None:
        return _timeline_service

    _timeline_service = TimelineDBService()
    return _timeline_service