Files
awoooi/apps/api/src/services/approval_db.py
OG T 6ad73b4834
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 11m6s
fix(flywheel): 三修 L5/L6 斷鏈 — RBAC 擴權 + 失敗原因入庫 + verifier 失敗時也跑
2026-04-18 晚(台北時區) — ogt + Claude Opus 4.7 (1M)

全景飛輪診斷暴露 3 個真斷鏈:
  - L5 執行 30d: EXECUTION_FAILED 216 / EXECUTION_SUCCESS 2 (失敗率 99%)
  - L6 驗證 7d: verification_result 全 NULL (988 筆 evidence 都沒驗)
  - 所有 rejection_reason / error_message 欄位全空(無法診斷)

根因: awoooi-executor ServiceAccount RBAC 不足,executor.py 每次
kubectl get nodes/HPA 都 Forbidden,連 evidence 都抓不到,後面 repair
全炸,verifier 因為 execution 沒 success 永遠不 trigger,evidence
驗證結果永遠 NULL。修一個 RBAC 解 3 個節點。

## P0.1 RBAC 擴權 (k8s/awoooi-prod/07-rbac.yaml)

新增 cluster-scope 讀權(僅 list/get/watch,零寫入):
  - nodes + nodes/status (evidence gathering 必需)
  - horizontalpodautoscalers (HPA 狀態)
  - metrics.k8s.io: nodes + pods (resource metrics)
  - statefulsets + daemonsets (完整 workload 視圖)

已 kubectl apply + 煙霧測試: kubectl get nodes 可跑。

## P0.2 失敗時必寫 rejection_reason (approval_db.py)

update_execution_status() 新增 error_message 參數,失敗時寫入
rejection_reason (截 2000 字) → 之後診斷有依據。

approval_execution.py 呼叫端同步更新,result.error 一路傳進 DB。

## P0.3 Verifier 失敗時也跑 (approval_execution.py)

原邏輯: verifier 只在 result.success=True 時呼叫 → 99% 失敗下
永遠不跑。

新邏輯: 失敗 path 也 create_task 跑 verifier,action_taken 後綴
加 ":FAILED" 標記。verifier 抓 post_state 寫
verification_result='failed' 回 incident_evidence。

L7 learning 從此有失敗樣本可學,playbook trust 負向 2x 衰減才
真正生效。

預期效果:
  - EXECUTION_FAILED 率 30d 內應從 99% 降到 <30%
  - incident_evidence.verification_result NULL 率應從 100% 降到 <10%
  - approval_records.rejection_reason 補齊率從 0% 到 100%

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 20:12:57 +08:00

929 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Database-based Approval Service
================================
Phase 5: 永久記憶植入
將 TrustEngine 的 in-memory 邏輯轉換為資料庫 CRUD 操作。
重啟後資料完好無缺。
Features:
- SQLAlchemy async CRUD
- ApprovalRecord 持久化
- TimelineEvent 持久化
- 與原有 API 契約相容
"""
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID
import structlog
from sqlalchemy import and_, or_, select, update
from src.core.trust_engine import classify_risk, get_required_signatures
from src.db.base import get_db_context
from src.db.models import ApprovalRecord, TimelineEvent
from src.models.approval import (
ApprovalRequest,
ApprovalRequestCreate,
ApprovalStatus,
BlastRadius,
DataImpact,
DryRunCheck,
RiskLevel,
Signature,
)
logger = structlog.get_logger(__name__)
# =============================================================================
# Conversion Helpers
# =============================================================================
def approval_record_to_request(record: ApprovalRecord) -> ApprovalRequest:
"""
Convert DB ApprovalRecord to Pydantic ApprovalRequest
保持 API 契約相容性
"""
# Parse blast_radius from JSON
blast_radius = None
if record.blast_radius:
br = record.blast_radius
blast_radius = BlastRadius(
affected_pods=br.get("affected_pods", 0),
estimated_downtime=br.get("estimated_downtime", "0"),
related_services=br.get("related_services", []),
data_impact=DataImpact(br.get("data_impact", "none").lower())
if br.get("data_impact")
else DataImpact.NONE,
)
# Parse dry_run_checks from JSON
dry_run_checks = []
if record.dry_run_checks:
for check in record.dry_run_checks:
dry_run_checks.append(
DryRunCheck(
name=check.get("name", ""),
passed=check.get("passed", True),
message=check.get("message"),
)
)
# Parse signatures from JSON
signatures = []
if record.signatures:
for sig in record.signatures:
signatures.append(
Signature(
signer_id=sig.get("signer_id", ""),
signer_name=sig.get("signer_name", ""),
timestamp=datetime.fromisoformat(sig["timestamp"])
if sig.get("timestamp")
else datetime.now(UTC),
comment=sig.get("comment"),
)
)
return ApprovalRequest(
id=UUID(record.id),
action=record.action,
description=record.description,
status=ApprovalStatus(record.status.value if hasattr(record.status, 'value') else record.status),
risk_level=RiskLevel(record.risk_level.value if hasattr(record.risk_level, 'value') else record.risk_level),
blast_radius=blast_radius,
dry_run_checks=dry_run_checks,
required_signatures=record.required_signatures,
current_signatures=record.current_signatures,
signatures=signatures,
requested_by=record.requested_by,
created_at=record.created_at,
expires_at=record.expires_at,
resolved_at=record.resolved_at,
rejection_reason=record.rejection_reason,
metadata=record.extra_metadata,
# 戰略 B: 告警風暴收斂
fingerprint=record.fingerprint,
hit_count=record.hit_count,
last_seen_at=record.last_seen_at,
)
def approval_request_to_record_data(
request: ApprovalRequestCreate,
risk_level: RiskLevel,
required_sigs: int,
fingerprint: str | None = None, # 戰略 B: 告警指紋
) -> dict[str, Any]:
"""
Convert ApprovalRequestCreate to dict for ApprovalRecord creation
"""
blast_radius_dict = None
if request.blast_radius:
blast_radius_dict = {
"affected_pods": request.blast_radius.affected_pods,
"estimated_downtime": request.blast_radius.estimated_downtime,
"related_services": request.blast_radius.related_services,
"data_impact": request.blast_radius.data_impact.value.lower()
if request.blast_radius.data_impact
else "none",
}
dry_run_checks_list = []
if request.dry_run_checks:
for check in request.dry_run_checks:
dry_run_checks_list.append({
"name": check.name,
"passed": check.passed,
"message": check.message,
})
now = datetime.now(UTC)
return {
"action": request.action,
"description": request.description,
"status": ApprovalStatus.APPROVED if risk_level == RiskLevel.LOW else ApprovalStatus.PENDING,
"risk_level": risk_level,
"required_signatures": required_sigs,
"current_signatures": 0,
"signatures": [],
"blast_radius": blast_radius_dict or {},
"dry_run_checks": dry_run_checks_list,
"requested_by": request.requested_by,
"expires_at": request.expires_at,
"extra_metadata": request.metadata,
"resolved_at": now if risk_level == RiskLevel.LOW else None,
# 戰略 B: 告警風暴收斂
"fingerprint": fingerprint,
"hit_count": 1,
"last_seen_at": now,
# 2026-04-14 Claude Sonnet 4.6: 補漏 — 原本 incident_id/telegram_message_id
# 不在 dict 裡導致 DB 欄位永遠 NULLTelegram 卡片顯示 INC 號是空白
# 用戶在 Telegram 根本認不出對應的告警,審核閉環名存實亡
"incident_id": request.incident_id,
}
# =============================================================================
# Database Approval Service
# =============================================================================
class ApprovalDBService:
"""
資料庫授權服務 - 替代 in-memory TrustEngine
所有操作皆為資料庫 CRUD重啟後資料保持
Phase 16 R3.4: 支援 Repository 注入
- 有注入 repository: 使用 Repository 層
- 無注入: 使用內嵌 DB 操作 (向下相容)
"""
def __init__(self, repository=None):
"""
初始化 ApprovalDBService
Args:
repository: IApprovalRepository 實例 (可選Phase 16 DI)
"""
self._repository = repository
async def create_approval(
self,
request: ApprovalRequestCreate,
) -> ApprovalRequest:
"""
建立新授權請求 (寫入資料庫)
"""
# 分類風險
risk_level = classify_risk(
action=request.action,
blast_radius=request.blast_radius,
explicit_level=request.risk_level,
)
# 取得所需簽核數
required_sigs = get_required_signatures(risk_level)
# 準備資料
data = approval_request_to_record_data(request, risk_level, required_sigs)
async with get_db_context() as db:
record = ApprovalRecord(**data)
db.add(record)
await db.flush()
await db.refresh(record)
logger.info(
"approval_created_db",
id=record.id,
risk_level=risk_level.value,
status=record.status.value if hasattr(record.status, 'value') else record.status,
)
return approval_record_to_request(record)
# =========================================================================
# 戰略 B: 告警風暴收斂
# =========================================================================
async def create_approval_with_fingerprint(
self,
request: ApprovalRequestCreate,
fingerprint: str,
) -> ApprovalRequest:
"""
建立帶指紋的授權請求 (戰略 B)
用於告警收斂:相同指紋的告警會被聚合。
ADR-073 補丁 2026-04-15 ogt + Claude Sonnet 4.6:
所有 webhook 路徑都未傳 expires_at導致 DB 欄位為 NULL
get_pending_approvals() 的自動過期邏輯 (WHERE expires_at < now)
永遠不觸發,殭屍 PENDING 記錄無限堆積。
修正:凡未傳 expires_at自動注入 48h 預設值。
"""
DEFAULT_APPROVAL_TTL_HOURS = 48 # 給人類 48h 決定視窗
if not request.expires_at:
now = datetime.now(UTC)
request = request.model_copy(
update={"expires_at": now + timedelta(hours=DEFAULT_APPROVAL_TTL_HOURS)}
)
risk_level = classify_risk(
action=request.action,
blast_radius=request.blast_radius,
explicit_level=request.risk_level,
)
required_sigs = get_required_signatures(risk_level)
data = approval_request_to_record_data(request, risk_level, required_sigs, fingerprint=fingerprint)
async with get_db_context() as db:
record = ApprovalRecord(**data)
db.add(record)
await db.flush()
await db.refresh(record)
logger.info(
"approval_created_with_fingerprint",
id=record.id,
fingerprint=fingerprint,
risk_level=risk_level.value,
)
return approval_record_to_request(record)
async def find_by_fingerprint(
self,
fingerprint: str,
debounce_minutes: int = 5,
) -> ApprovalRequest | None:
"""
根據指紋查詢現有的告警記錄 (戰略 B)
查詢條件:
1. 相同指紋
2. 狀態為 PENDING 且在 24 小時內建立(超過 24h 的 PENDING 視為過期,不再收斂)
3. 或在 debounce_minutes 分鐘內建立(不論狀態)
ADR-073 補丁 2026-04-15 ogt + Claude Sonnet 4.6:
原邏輯 PENDING 無 TTL → 3 天前 PENDING 記錄永久封鎖同指紋告警。
修正PENDING 收斂窗口上限 PENDING_TTL_HOURS24h
Returns:
ApprovalRequest if found, None otherwise
"""
PENDING_TTL_HOURS = 24 # PENDING 記錄最長收斂時效(超過則視為已過期)
now = datetime.now(UTC)
cutoff_time = now - timedelta(minutes=debounce_minutes)
pending_cutoff = now - timedelta(hours=PENDING_TTL_HOURS)
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord)
.where(ApprovalRecord.fingerprint == fingerprint)
.where(
or_(
# PENDING 狀態但必須在 24h 內,防止老 PENDING 永久封鎖
and_(
ApprovalRecord.status == ApprovalStatus.PENDING,
ApprovalRecord.created_at >= pending_cutoff,
),
# 最近 debounce_minutes 分鐘內建立的任何記錄
ApprovalRecord.created_at >= cutoff_time,
)
)
.order_by(ApprovalRecord.created_at.desc())
.limit(1)
)
record = result.scalar_one_or_none()
if record:
logger.info(
"fingerprint_match_found",
fingerprint=fingerprint,
approval_id=record.id,
hit_count=record.hit_count,
status=record.status.value if hasattr(record.status, 'value') else record.status,
)
return approval_record_to_request(record)
return None
async def increment_hit_count(
self,
approval_id: UUID,
) -> ApprovalRequest | None:
"""
增加告警聚合次數 (戰略 B)
當相同指紋的告警再次觸發時:
1. hit_count += 1
2. last_seen_at = now
這樣可以跳過 LLM 分析,節省 API 成本!
"""
now = datetime.now(UTC)
async with get_db_context() as db:
# 更新 hit_count 和 last_seen_at
result = await db.execute(
update(ApprovalRecord)
.where(ApprovalRecord.id == str(approval_id))
.values(
hit_count=ApprovalRecord.hit_count + 1,
last_seen_at=now,
)
.returning(ApprovalRecord.hit_count)
)
new_count = result.scalar_one_or_none()
if new_count is None:
return None
# 重新讀取完整記錄
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
if record:
logger.info(
"hit_count_incremented",
approval_id=str(approval_id),
new_hit_count=new_count,
last_seen_at=now.isoformat(),
)
return approval_record_to_request(record)
return None
async def get_approval(self, approval_id: UUID) -> ApprovalRequest | None:
"""
取得單一授權請求
Phase 16 R3.4: 支援 Repository 注入
"""
# Phase 16: 使用 Repository (如果有注入)
if self._repository:
return await self._repository.get_by_id(approval_id)
# Legacy: 內嵌 DB 操作
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
if record is None:
return None
return approval_record_to_request(record)
async def get_pending_approvals(self) -> list[ApprovalRequest]:
"""
取得所有待簽核請求
Phase 16 R3.4: 支援 Repository 注入
"""
# Phase 16: 使用 Repository (如果有注入)
if self._repository:
return await self._repository.get_pending()
# Legacy: 內嵌 DB 操作
now = datetime.now(UTC)
async with get_db_context() as db:
# 先更新過期的請求
await db.execute(
update(ApprovalRecord)
.where(ApprovalRecord.status == ApprovalStatus.PENDING)
.where(ApprovalRecord.expires_at < now)
.values(status=ApprovalStatus.EXPIRED, resolved_at=now)
)
# 取得所有 PENDING
result = await db.execute(
select(ApprovalRecord)
.where(ApprovalRecord.status == ApprovalStatus.PENDING)
.order_by(ApprovalRecord.created_at.desc())
)
records = result.scalars().all()
return [approval_record_to_request(r) for r in records]
async def sign_approval(
self,
approval_id: UUID,
signer_id: str,
signer_name: str,
comment: str | None = None,
) -> tuple[ApprovalRequest | None, str, bool]:
"""
簽核授權請求
Phase 5: 使用 FOR UPDATE 行鎖防止 Race Condition
當多人同時簽核時,確保只有一人能成功取得鎖並更新
Returns:
(approval, message, execution_triggered)
"""
async with get_db_context() as db:
# Phase 5: FOR UPDATE 行級鎖 - 防止併發簽核競爭
# SQLite 不支援 FOR UPDATE但 PostgreSQL 完整支援
result = await db.execute(
select(ApprovalRecord)
.where(ApprovalRecord.id == str(approval_id))
.with_for_update() # Row-Level Lock
)
record = result.scalar_one_or_none()
logger.info(
"sign_approval_lock_acquired",
approval_id=str(approval_id),
signer_id=signer_id,
)
if record is None:
return None, "Approval not found", False
# 檢查狀態
status_value = record.status.value if hasattr(record.status, 'value') else record.status
if status_value != "pending":
return (
approval_record_to_request(record),
f"Cannot sign: status is {status_value}",
False,
)
# 檢查是否已簽核
signatures = record.signatures or []
for sig in signatures:
if sig.get("signer_id") == signer_id:
return (
approval_record_to_request(record),
f"User {signer_name} has already signed this approval",
False,
)
# Phase 5: 樂觀鎖 - 記錄更新前的簽名數
old_sig_count = record.current_signatures
# 新增簽章
new_signature = {
"signer_id": signer_id,
"signer_name": signer_name,
"timestamp": datetime.now(UTC).isoformat(),
"comment": comment,
}
signatures.append(new_signature)
new_sig_count = len(signatures)
# 計算新狀態
execution_triggered = False
new_status = record.status
resolved_at = None
if new_sig_count >= record.required_signatures:
new_status = ApprovalStatus.APPROVED
resolved_at = datetime.now(UTC)
execution_triggered = True
# Phase 5: 樂觀鎖更新 - 使用 WHERE current_signatures = old_value
# 如果其他人已更新,這個 UPDATE 會更新 0 行
result = await db.execute(
update(ApprovalRecord)
.where(and_(
ApprovalRecord.id == str(approval_id),
ApprovalRecord.current_signatures == old_sig_count, # 樂觀鎖條件
))
.values(
signatures=signatures,
current_signatures=new_sig_count,
status=new_status,
resolved_at=resolved_at,
)
)
# 檢查是否更新成功
if result.rowcount == 0:
logger.warning(
"sign_approval_optimistic_lock_conflict",
approval_id=str(approval_id),
signer_id=signer_id,
old_sig_count=old_sig_count,
)
return (
approval_record_to_request(record),
"Concurrent modification detected. Please retry.",
False,
)
# 重新讀取更新後的記錄
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one()
if execution_triggered:
message = f"Approval complete! ({new_sig_count}/{record.required_signatures} signatures)"
else:
message = f"Signature added ({new_sig_count}/{record.required_signatures})"
logger.info(
"approval_signed_db",
id=record.id,
signer=signer_name,
current=record.current_signatures,
required=record.required_signatures,
execution_triggered=execution_triggered,
)
return approval_record_to_request(record), message, execution_triggered
async def reject_approval(
self,
approval_id: UUID,
rejector_id: str,
rejector_name: str,
reason: str,
) -> tuple[ApprovalRequest | None, str]:
"""
拒絕授權請求
"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
if record is None:
return None, "Approval not found"
status_value = record.status.value if hasattr(record.status, 'value') else record.status
if status_value != "pending":
return (
approval_record_to_request(record),
f"Cannot reject: status is {status_value}",
)
record.status = ApprovalStatus.REJECTED
record.rejection_reason = f"{rejector_name}: {reason}"
record.resolved_at = datetime.now(UTC)
await db.flush()
await db.refresh(record)
logger.info(
"approval_rejected_db",
id=record.id,
rejector=rejector_name,
reason=reason,
)
return approval_record_to_request(record), "Approval rejected"
async def update_execution_status(
self,
approval_id: UUID,
success: bool,
error_message: str | None = None,
) -> None:
"""
更新執行狀態
2026-04-18 ogt + Claude Opus 4.7: ADR-090 L5 斷鏈修復 — P0.2
失敗時必寫 rejection_reason,讓診斷不再黑盒
(之前 EXECUTION_FAILED 216 筆 reason 全空)
"""
async with get_db_context() as db:
status = ApprovalStatus.EXECUTION_SUCCESS if success else ApprovalStatus.EXECUTION_FAILED
values: dict = {"status": status}
if not success and error_message:
# 截斷至合理長度,避免爆欄位
values["rejection_reason"] = str(error_message)[:2000]
await db.execute(
update(ApprovalRecord)
.where(ApprovalRecord.id == str(approval_id))
.values(**values)
)
logger.info(
"approval_execution_status_updated",
id=str(approval_id),
success=success,
has_error=bool(error_message),
)
async def update_incident_id(self, approval_id: UUID, incident_id: str) -> None:
"""
2026-04-06 ogt: Phase 26 — 回寫 incident_id 到 approval_records
讓 Playbook 萃取和 KM 寫入能找到對應的 Incident
2026-04-14 Claude Sonnet 4.6 診斷: Live-fire #7 發現 approval.incident_id 仍 NULL
加 rowcount 與 pre/post 值檢查,若 0 rows affected 則 log warning
"""
async with get_db_context() as db:
result = await db.execute(
update(ApprovalRecord)
.where(ApprovalRecord.id == str(approval_id))
.values(incident_id=incident_id)
)
rowcount = result.rowcount if hasattr(result, "rowcount") else -1
if rowcount == 0:
# 找不到對應 approval — 可能 id 型別或 session 不同步
logger.warning(
"update_incident_id_zero_rows",
approval_id=str(approval_id),
approval_id_type=type(approval_id).__name__,
incident_id=incident_id,
reason="UPDATE 0 rows affected — approval 不存在或 id mismatch",
)
else:
logger.info(
"update_incident_id_success",
approval_id=str(approval_id),
incident_id=incident_id,
rowcount=rowcount,
)
async def update_telegram_message(
self, incident_id: str, telegram_message_id: int, telegram_chat_id: int | None = None
) -> None:
"""
2026-04-09 Claude Sonnet 4.6: 持久化 Telegram message_id 到 DB
讓告警訊息 ID 不再只存 Redis24h TTL支援長期狀態追蹤和訊息更新。
以 incident_id 查找最新 PENDING approval record 並回填。
"""
async with get_db_context() as db:
# 2026-04-10 Claude Sonnet 4.6: 用 raw SQL 避免 SQLAlchemy 推斷 INTEGER
# telegram_chat_id 為 BIGINTORM update() 會誤用 $N::INTEGER 導致 int32 overflow
from sqlalchemy import text as _text
params: dict = {
"incident_id": incident_id,
"telegram_message_id": telegram_message_id,
"status": "PENDING",
}
chat_clause = ""
if telegram_chat_id is not None:
params["telegram_chat_id"] = telegram_chat_id
chat_clause = ", telegram_chat_id = CAST(:telegram_chat_id AS BIGINT)"
await db.execute(
_text(f"""
UPDATE approval_records
SET telegram_message_id = :telegram_message_id{chat_clause}
WHERE incident_id = :incident_id
AND status = :status
"""),
params,
)
async def update_action_by_incident_id(self, incident_id: str, new_action: str) -> int:
"""
Agent Orchestrator 分析完成後覆寫 ApprovalRecord.action。
設計動機 (2026-04-16 ogt + Claude Sonnet 4.6):
- Webhook inline LLM 寫入垃圾 action如 kubectl rollout restart for postgres disk
- Agent 分析正確但只發新 Telegram 卡,未覆寫 ApprovalRecord
- 用戶批准 Agent 卡 → 系統查 incident_id → 執行舊 webhook 垃圾 action
- 修復Agent 完成後呼叫此方法,讓用戶批准時執行正確 action
Args:
incident_id: INC-xxx 格式 Incident ID
new_action: Agent 決定的 action空字串 → 不覆寫)
Returns:
int: rowcount0 表示找不到對應 PENDING approval
"""
if not new_action:
return 0
async with get_db_context() as db:
from sqlalchemy import text as _text
result = await db.execute(
_text("""
UPDATE approval_records
SET action = :new_action
WHERE incident_id = :incident_id
AND status = 'PENDING'
"""),
{"incident_id": incident_id, "new_action": new_action},
)
rowcount = result.rowcount if hasattr(result, "rowcount") else -1
logger.info(
"approval_action_updated_by_agent",
incident_id=incident_id,
new_action=new_action[:80],
rowcount=rowcount,
)
return rowcount
# =========================================================================
# Phase 6.4h: Proposals API 支援方法
# =========================================================================
async def get_approval_by_id(self, approval_id: UUID) -> ApprovalRequest | None:
"""
根據 ID 取得單一授權請求 (Phase 6.4h)
Args:
approval_id: 授權請求 UUID
Returns:
ApprovalRequest if found, None otherwise
"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
if record is None:
return None
return approval_record_to_request(record)
async def get_all_approvals(
self,
status: ApprovalStatus | None = None,
incident_id: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[ApprovalRequest]:
"""
取得所有授權請求 (Phase 6.4h)
Args:
status: 狀態篩選 (可選)
incident_id: Incident ID 篩選 (可選)
limit: 每頁數量
offset: 偏移量
Returns:
ApprovalRequest 清單
"""
async with get_db_context() as db:
query = select(ApprovalRecord)
# 狀態篩選
if status is not None:
query = query.where(ApprovalRecord.status == status)
# 2026-04-09 Claude Sonnet 4.6: 修復 incident_id 篩選 — 直接用 DB 欄位
# 舊版在應用層查 a.metadata.get("incident_id") 但 ApprovalRecord.incident_id
# 是直接欄位,不在 extra_metadata JSON 裡,導致 telegram_approval_not_found_by_incident
if incident_id:
query = query.where(ApprovalRecord.incident_id == incident_id)
query = query.order_by(ApprovalRecord.created_at.desc())
query = query.offset(offset).limit(limit)
result = await db.execute(query)
records = result.scalars().all()
approvals = [approval_record_to_request(r) for r in records]
return approvals
# =============================================================================
# Timeline Event Service
# =============================================================================
class TimelineDBService:
"""
時間軸事件服務 - Phase 4 Action Timeline 持久化
"""
async def add_event(
self,
event_type: str,
status: str,
title: str,
description: str | None = None,
actor: str | None = None,
actor_role: str | None = None,
risk_level: str | None = None,
approval_id: str | None = None,
) -> dict[str, Any]:
"""
新增時間軸事件
"""
async with get_db_context() as db:
event = TimelineEvent(
event_type=event_type,
status=status,
title=title,
description=description,
actor=actor,
actor_role=actor_role,
risk_level=risk_level,
approval_id=approval_id,
)
db.add(event)
await db.flush()
await db.refresh(event)
logger.info(
"timeline_event_added",
id=event.id,
type=event_type,
title=title,
)
return {
"id": event.id,
"type": event.event_type,
"status": event.status,
"title": event.title,
"created_at": event.created_at.isoformat(),
}
async def get_events(self, limit: int = 50) -> list[dict[str, Any]]:
"""
取得最近的時間軸事件
"""
async with get_db_context() as db:
result = await db.execute(
select(TimelineEvent)
.order_by(TimelineEvent.created_at.desc())
.limit(limit)
)
events = result.scalars().all()
return [
{
"id": e.id,
"type": e.event_type,
"status": e.status,
"title": e.title,
"description": e.description,
"actor": e.actor,
"actor_role": e.actor_role,
"risk_level": e.risk_level,
"approval_id": e.approval_id,
"created_at": e.created_at.isoformat(),
}
for e in events
]
# =============================================================================
# Singleton Instances
# =============================================================================
_approval_service: ApprovalDBService | None = None
_timeline_service: TimelineDBService | None = None
def get_approval_service(use_repository: bool = False) -> ApprovalDBService:
"""
取得授權服務實例
Args:
use_repository: 是否使用 Repository 層 (Phase 16 R3.4)
Phase 16: 絞殺者模式
- use_repository=False: 使用內嵌 DB 操作 (預設,向下相容)
- use_repository=True: 使用 ApprovalDBRepository
"""
global _approval_service
if _approval_service is None:
if use_repository:
from src.repositories import get_approval_repository
_approval_service = ApprovalDBService(repository=get_approval_repository())
logger.info("approval_service_with_repository")
else:
_approval_service = ApprovalDBService()
return _approval_service
def get_timeline_service() -> TimelineDBService:
"""取得時間軸服務實例"""
global _timeline_service
if _timeline_service is None:
_timeline_service = TimelineDBService()
return _timeline_service