feat(api): Phase 16 R3.3 Repository 實作 + CI 修復

新增:
- ApprovalDBRepository: Approval CRUD 操作
- IncidentDBRepository: Incident CRUD 操作
- get_approval_repository/get_incident_repository 函數

修復:
- .gitignore 新增 .claude/worktrees/ (防止 CI 失敗)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-25 21:22:02 +08:00
parent fe76d0b108
commit 75ef8fee0c
4 changed files with 483 additions and 0 deletions

3
.gitignore vendored
View File

@@ -65,3 +65,6 @@ Thumbs.db
tmp/
temp/
*.tmp
# Claude Code worktrees (本地使用,不需提交)
.claude/worktrees/

View File

@@ -18,9 +18,24 @@ from src.repositories.interfaces import (
IIncidentRepository,
ITimelineRepository,
)
from src.repositories.approval_repository import (
ApprovalDBRepository,
get_approval_repository,
)
from src.repositories.incident_repository import (
IncidentDBRepository,
get_incident_repository,
)
__all__ = [
# Interfaces
"IApprovalRepository",
"IIncidentRepository",
"ITimelineRepository",
# Implementations
"ApprovalDBRepository",
"IncidentDBRepository",
# Getters
"get_approval_repository",
"get_incident_repository",
]

View File

@@ -0,0 +1,288 @@
"""
Approval Repository - PostgreSQL 實作
======================================
Phase 16 R3.3: Repository 層實作
職責: ApprovalRecord 的 CRUD 操作
設計: 實作 IApprovalRepository Protocol
版本: v1.0
建立: 2026-03-26 (台北時區)
建立者: Claude Code (Phase 16 架構重構)
"""
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
import structlog
from sqlalchemy import select, update
from src.db.base import get_db_context
from src.db.models import ApprovalRecord
from src.models.approval import (
ApprovalRequest,
ApprovalRequestCreate,
ApprovalStatus,
BlastRadius,
DataImpact,
DryRunCheck,
RiskLevel,
Signature,
)
from src.repositories.interfaces import IApprovalRepository
logger = structlog.get_logger(__name__)
# =============================================================================
# Conversion Helpers
# =============================================================================
def _record_to_request(record: ApprovalRecord) -> ApprovalRequest:
"""Convert DB ApprovalRecord to Pydantic ApprovalRequest"""
blast_radius = None
if record.blast_radius:
br = record.blast_radius
blast_radius = BlastRadius(
affected_pods=br.get("affected_pods", 0),
estimated_downtime=br.get("estimated_downtime", "0"),
related_services=br.get("related_services", []),
data_impact=DataImpact(br.get("data_impact", "none").lower())
if br.get("data_impact")
else DataImpact.NONE,
)
dry_run_checks = []
if record.dry_run_checks:
for check in record.dry_run_checks:
dry_run_checks.append(
DryRunCheck(
name=check.get("name", ""),
passed=check.get("passed", True),
message=check.get("message"),
)
)
signatures = []
if record.signatures:
for sig in record.signatures:
signatures.append(
Signature(
signer_id=sig.get("signer_id", ""),
signer_name=sig.get("signer_name", ""),
timestamp=datetime.fromisoformat(sig["timestamp"])
if sig.get("timestamp")
else datetime.now(UTC),
comment=sig.get("comment"),
)
)
return ApprovalRequest(
id=UUID(record.id),
action=record.action,
description=record.description,
status=ApprovalStatus(
record.status.value if hasattr(record.status, "value") else record.status
),
risk_level=RiskLevel(
record.risk_level.value
if hasattr(record.risk_level, "value")
else record.risk_level
),
blast_radius=blast_radius,
dry_run_checks=dry_run_checks,
required_signatures=record.required_signatures,
current_signatures=record.current_signatures,
signatures=signatures,
requested_by=record.requested_by,
created_at=record.created_at,
expires_at=record.expires_at,
resolved_at=record.resolved_at,
rejection_reason=record.rejection_reason,
metadata=record.extra_metadata,
fingerprint=record.fingerprint,
hit_count=record.hit_count,
last_seen_at=record.last_seen_at,
)
# =============================================================================
# ApprovalDBRepository
# =============================================================================
class ApprovalDBRepository(IApprovalRepository):
"""
Approval Repository - PostgreSQL 實作
純 CRUD 操作,不含業務邏輯
業務邏輯請放在 Service 層
"""
async def create(
self,
data: dict[str, Any],
) -> ApprovalRequest:
"""建立新的 Approval Record"""
async with get_db_context() as db:
record = ApprovalRecord(**data)
db.add(record)
await db.flush()
await db.refresh(record)
logger.debug(
"approval_created",
id=record.id,
status=record.status,
)
return _record_to_request(record)
async def get_by_id(self, approval_id: UUID) -> ApprovalRequest | None:
"""根據 ID 取得 Approval"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
return _record_to_request(record) if record else None
async def get_pending(self) -> list[ApprovalRequest]:
"""取得所有待審核的 Approval"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord)
.where(ApprovalRecord.status == ApprovalStatus.PENDING)
.order_by(ApprovalRecord.created_at.desc())
)
records = result.scalars().all()
return [_record_to_request(r) for r in records]
async def update_status(
self,
approval_id: UUID,
status: ApprovalStatus,
actor: str | None = None,
) -> ApprovalRequest | None:
"""更新 Approval 狀態"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
if not record:
return None
record.status = status
if status in (ApprovalStatus.APPROVED, ApprovalStatus.REJECTED):
record.resolved_at = datetime.now(UTC)
await db.flush()
await db.refresh(record)
logger.info(
"approval_status_updated",
id=str(approval_id),
status=status.value,
actor=actor,
)
return _record_to_request(record)
async def add_signature(
self,
approval_id: UUID,
signer: str,
decision: str,
comment: str | None = None,
) -> ApprovalRequest | None:
"""新增簽核"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
if not record:
return None
# 新增簽名
new_signature = {
"signer_id": signer,
"signer_name": signer,
"timestamp": datetime.now(UTC).isoformat(),
"decision": decision,
"comment": comment,
}
signatures = record.signatures or []
signatures.append(new_signature)
record.signatures = signatures
record.current_signatures = len(signatures)
await db.flush()
await db.refresh(record)
logger.info(
"signature_added",
id=str(approval_id),
signer=signer,
decision=decision,
total_signatures=record.current_signatures,
)
return _record_to_request(record)
async def find_by_fingerprint(
self,
fingerprint: str,
) -> ApprovalRequest | None:
"""根據指紋查找 Approval (告警收斂用)"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord)
.where(ApprovalRecord.fingerprint == fingerprint)
.where(ApprovalRecord.status == ApprovalStatus.PENDING)
.order_by(ApprovalRecord.created_at.desc())
.limit(1)
)
record = result.scalar_one_or_none()
return _record_to_request(record) if record else None
async def increment_hit_count(
self,
approval_id: UUID,
) -> ApprovalRequest | None:
"""增加 hit_count (告警收斂用)"""
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord).where(ApprovalRecord.id == str(approval_id))
)
record = result.scalar_one_or_none()
if not record:
return None
record.hit_count = (record.hit_count or 0) + 1
record.last_seen_at = datetime.now(UTC)
await db.flush()
await db.refresh(record)
return _record_to_request(record)
# =============================================================================
# Singleton
# =============================================================================
_approval_repository: ApprovalDBRepository | None = None
def get_approval_repository() -> ApprovalDBRepository:
"""取得 ApprovalDBRepository 實例 (Singleton)"""
global _approval_repository
if _approval_repository is None:
_approval_repository = ApprovalDBRepository()
return _approval_repository

View File

@@ -0,0 +1,177 @@
"""
Incident Repository - PostgreSQL 實作
======================================
Phase 16 R3.3: Repository 層實作
職責: IncidentRecord 的 CRUD 操作
設計: 實作 IIncidentRepository Protocol
版本: v1.0
建立: 2026-03-26 (台北時區)
建立者: Claude Code (Phase 16 架構重構)
"""
from datetime import UTC, datetime
from typing import Any
import structlog
from sqlalchemy import select
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from src.models.incident import Incident, IncidentStatus, Severity
from src.repositories.interfaces import IIncidentRepository
logger = structlog.get_logger(__name__)
# =============================================================================
# Conversion Helpers
# =============================================================================
def _record_to_incident(record: IncidentRecord) -> Incident:
"""Convert DB IncidentRecord to Pydantic Incident"""
return Incident(
incident_id=record.incident_id,
status=IncidentStatus(record.status),
severity=Severity(record.severity),
signals=record.signals or [],
affected_services=record.affected_services or [],
proposal_ids=record.proposal_ids or [],
created_at=record.created_at,
updated_at=record.updated_at,
resolved_at=record.resolved_at,
closed_at=record.closed_at,
)
def _incident_to_record_data(incident: Incident) -> dict[str, Any]:
"""Convert Pydantic Incident to dict for DB record"""
return {
"incident_id": incident.incident_id,
"status": incident.status.value,
"severity": incident.severity.value,
"signals": [s.model_dump() for s in incident.signals],
"affected_services": incident.affected_services,
"proposal_ids": incident.proposal_ids,
"created_at": incident.created_at,
"updated_at": incident.updated_at,
"resolved_at": incident.resolved_at,
"closed_at": incident.closed_at,
}
# =============================================================================
# IncidentDBRepository
# =============================================================================
class IncidentDBRepository(IIncidentRepository):
"""
Incident Repository - PostgreSQL 實作
純 CRUD 操作,不含業務邏輯
業務邏輯請放在 Service 層或 Engine
"""
async def create(self, incident: Incident) -> Incident:
"""建立新的 Incident Record"""
async with get_db_context() as db:
data = _incident_to_record_data(incident)
record = IncidentRecord(**data)
db.add(record)
await db.flush()
await db.refresh(record)
logger.debug(
"incident_created_db",
incident_id=record.incident_id,
status=record.status,
)
return _record_to_incident(record)
async def get_by_id(self, incident_id: str) -> Incident | None:
"""根據 ID 取得 Incident"""
async with get_db_context() as db:
result = await db.execute(
select(IncidentRecord).where(
IncidentRecord.incident_id == incident_id
)
)
record = result.scalar_one_or_none()
return _record_to_incident(record) if record else None
async def get_active(self) -> list[Incident]:
"""取得所有活躍的 Incident"""
active_statuses = [
IncidentStatus.INVESTIGATING.value,
IncidentStatus.MITIGATING.value,
]
async with get_db_context() as db:
result = await db.execute(
select(IncidentRecord)
.where(IncidentRecord.status.in_(active_statuses))
.order_by(IncidentRecord.created_at.desc())
)
records = result.scalars().all()
return [_record_to_incident(r) for r in records]
async def update(self, incident: Incident) -> Incident | None:
"""更新 Incident"""
async with get_db_context() as db:
result = await db.execute(
select(IncidentRecord).where(
IncidentRecord.incident_id == incident.incident_id
)
)
record = result.scalar_one_or_none()
if not record:
return None
# 更新欄位
record.status = incident.status.value
record.severity = incident.severity.value
record.signals = [s.model_dump() for s in incident.signals]
record.affected_services = incident.affected_services
record.proposal_ids = incident.proposal_ids
record.updated_at = datetime.now(UTC)
record.resolved_at = incident.resolved_at
record.closed_at = incident.closed_at
await db.flush()
await db.refresh(record)
logger.debug(
"incident_updated_db",
incident_id=record.incident_id,
status=record.status,
)
return _record_to_incident(record)
async def upsert(self, incident: Incident) -> bool:
"""Upsert Incident (存在則更新,不存在則建立)"""
existing = await self.get_by_id(incident.incident_id)
if existing:
result = await self.update(incident)
return result is not None
else:
result = await self.create(incident)
return result is not None
# =============================================================================
# Singleton
# =============================================================================
_incident_repository: IncidentDBRepository | None = None
def get_incident_repository() -> IncidentDBRepository:
"""取得 IncidentDBRepository 實例 (Singleton)"""
global _incident_repository
if _incident_repository is None:
_incident_repository = IncidentDBRepository()
return _incident_repository