"""
Playbook API Router - #7 Playbook extraction
============================================
HTTP endpoints for Playbook extraction, recommendation, approval and CRUD.

Phase 7.4: API router implementation
Created: 2026-03-26 (Taipei time zone)
Author: Claude Code (#7 Playbook extraction)

Follows the leWOOOgo building-block principles:
- The router layer only translates HTTP requests and responses
- It never accesses Redis or the database directly
- Business logic is delegated to the service layer

The router carries the ``/playbooks`` prefix and is mounted by
``apps/api/src/main.py`` with ``prefix="/api/v1"``, so the effective paths
are ``/api/v1/playbooks/...``.
"""

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from src.models.playbook import (
    Playbook,
    PlaybookApproveRequest,
    PlaybookListResponse,
    PlaybookRecommendation,
    PlaybookResponse,
    PlaybookStatus,
    PlaybookUpdateRequest,
    SymptomPatternRequest,
)
from src.services.playbook_service import get_playbook_service

router = APIRouter(prefix="/playbooks", tags=["Playbooks"])


# =============================================================================
# Response Models
# =============================================================================


class ExtractPlaybookResponse(BaseModel):
    """Response body of the playbook extraction endpoint."""

    success: bool
    playbook: Playbook | None = None
    message: str


class DeletePlaybookResponse(BaseModel):
    """Response body of the playbook delete (deprecate) endpoint."""

    success: bool
    message: str


# =============================================================================
# API Endpoints
# =============================================================================


@router.post("/extract/{incident_id}", response_model=ExtractPlaybookResponse)
async def extract_playbook(
    incident_id: str,
    auto_approve: bool = Query(False, description="自動核准 (僅限高信心度)"),
) -> ExtractPlaybookResponse:
    """
    Extract a Playbook from a successfully handled incident.

    Preconditions (enforced by the service layer):
    - The incident status is RESOLVED or CLOSED
    - Execution succeeded and effectiveness_score >= 4

    Args:
        incident_id: ID of the source incident.
        auto_approve: Ask the service to approve the playbook automatically
            (only honoured for high-confidence extractions).

    Returns:
        ``success=True`` with the new playbook, or ``success=False`` with an
        explanatory message when the incident does not qualify.

    Raises:
        HTTPException: 404 when the incident does not exist.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle - confirm.
    from src.services.incident_service import get_incident_service

    incident_service = get_incident_service()
    incident = await incident_service.get_incident(incident_id)

    if not incident:
        raise HTTPException(
            status_code=404,
            detail=f"Incident {incident_id} not found",
        )

    service = get_playbook_service()
    playbook = await service.extract_from_incident(
        incident=incident,
        auto_approve=auto_approve,
    )

    if playbook:
        return ExtractPlaybookResponse(
            success=True,
            playbook=playbook,
            message=f"Playbook {playbook.playbook_id} extracted successfully",
        )
    else:
        return ExtractPlaybookResponse(
            success=False,
            playbook=None,
            message="Cannot extract playbook: incident does not meet requirements",
        )


@router.post("/recommend", response_model=list[PlaybookRecommendation])
async def get_recommendations(
    symptoms: SymptomPatternRequest,
    top_k: int = Query(3, ge=1, le=10, description="返回數量"),
) -> list[PlaybookRecommendation]:
    """
    Recommend Playbooks for a set of symptoms.

    Args:
        symptoms: Symptom description sent in the request body.
        top_k: Maximum number of recommendations to return (1-10).

    Returns:
        The recommendations produced by the service for the given symptoms.
    """
    service = get_playbook_service()
    recommendations = await service.get_recommendations(
        symptoms=symptoms.to_symptom_pattern(),
        top_k=top_k,
    )
    return recommendations


@router.post("/{playbook_id}/approve", response_model=PlaybookResponse)
async def approve_playbook(
    playbook_id: str,
    request: PlaybookApproveRequest,
) -> PlaybookResponse:
    """
    Manually approve a Playbook (DRAFT -> APPROVED).

    Args:
        playbook_id: ID of the playbook to approve.
        request: Approver identity and optional notes.

    Returns:
        The approved playbook with its derived statistics.

    Raises:
        HTTPException: 404 when the service returns no playbook. The service
            result does not distinguish "unknown ID" from "not in DRAFT
            status", so both cases share this response.
    """
    service = get_playbook_service()
    playbook = await service.approve(
        playbook_id=playbook_id,
        approved_by=request.approved_by,
        notes=request.notes,
    )

    if not playbook:
        raise HTTPException(
            status_code=404,
            detail=f"Playbook {playbook_id} not found or not in DRAFT status",
        )

    return PlaybookResponse.from_playbook(playbook)


@router.get("/", response_model=PlaybookListResponse)
async def list_playbooks(
    status: PlaybookStatus | None = Query(default=None, description="狀態過濾"),  # noqa: B008
    tags: list[str] | None = Query(default=None, description="標籤過濾"),  # noqa: B008
    limit: int = Query(default=20, ge=1, le=100, description="每頁數量"),
    offset: int = Query(default=0, ge=0, description="偏移量"),
) -> PlaybookListResponse:
    """
    List Playbooks with optional status and tag filters.

    Args:
        status: Only return playbooks in this status.
        tags: Tag filter passed through to the service.
        limit: Page size (1-100).
        offset: Number of items to skip.

    Returns:
        One page of playbooks plus the total count reported by the service.
    """
    service = get_playbook_service()
    items, total = await service.list_playbooks(
        status=status,
        tags=tags,
        limit=limit,
        offset=offset,
    )

    return PlaybookListResponse(
        items=[PlaybookResponse.from_playbook(p) for p in items],
        total=total,
        limit=limit,
        offset=offset,
    )


@router.get("/{playbook_id}", response_model=PlaybookResponse)
async def get_playbook(playbook_id: str) -> PlaybookResponse:
    """
    Fetch a single Playbook.

    Raises:
        HTTPException: 404 when the service returns no playbook for the ID.
    """
    service = get_playbook_service()
    playbook = await service.get_by_id(playbook_id)

    if not playbook:
        raise HTTPException(
            status_code=404,
            detail=f"Playbook {playbook_id} not found",
        )

    return PlaybookResponse.from_playbook(playbook)


@router.patch("/{playbook_id}", response_model=PlaybookResponse)
async def update_playbook(
    playbook_id: str,
    request: PlaybookUpdateRequest,
) -> PlaybookResponse:
    """
    Update a Playbook (manual edit).

    Only fields that were explicitly sent and are not null are applied;
    name, description, steps, tags, notes and status can be changed.

    Args:
        playbook_id: ID of the playbook to edit.
        request: Partial update; unset and null fields are ignored.

    Returns:
        The updated playbook with its derived statistics.

    Raises:
        HTTPException: 404 when the playbook does not exist, 500 when the
            service fails to persist the change.
    """
    service = get_playbook_service()
    playbook = await service.get_by_id(playbook_id)

    if not playbook:
        raise HTTPException(
            status_code=404,
            detail=f"Playbook {playbook_id} not found",
        )

    # Copy the validated attribute objects from the request instead of the
    # output of model_dump(): model_dump() converts nested models
    # (symptom_pattern, repair_steps) into plain dicts, and plain attribute
    # assignment is not re-validated, so the playbook would end up holding
    # dicts where model instances are expected.
    # NOTE(review): `status` can be changed here without going through the
    # approve endpoint - confirm this is intended.
    for field in request.model_fields_set:
        value = getattr(request, field)
        if value is not None:
            setattr(playbook, field, value)

    updated = await service.update(playbook)

    if not updated:
        raise HTTPException(
            status_code=500,
            detail="Failed to update playbook",
        )

    return PlaybookResponse.from_playbook(updated)


@router.delete("/{playbook_id}", response_model=DeletePlaybookResponse)
async def delete_playbook(playbook_id: str) -> DeletePlaybookResponse:
    """
    Delete a Playbook (soft delete).

    The playbook is not removed; its status is changed to DEPRECATED.

    Raises:
        HTTPException: 404 when the service reports failure.
    """
    service = get_playbook_service()
    success = await service.delete(playbook_id)

    if not success:
        raise HTTPException(
            status_code=404,
            detail=f"Playbook {playbook_id} not found",
        )

    return DeletePlaybookResponse(
        success=True,
        message=f"Playbook {playbook_id} deprecated successfully",
    )


@router.post("/{playbook_id}/record-execution")
async def record_execution(
    playbook_id: str,
    success: bool = Query(..., description="執行是否成功"),
) -> dict:
    """
    Record the outcome of a Playbook execution.

    Used to keep the success-rate statistics up to date.

    Args:
        playbook_id: ID of the executed playbook.
        success: Whether the execution succeeded.

    Returns:
        A small confirmation payload.

    Raises:
        HTTPException: 404 when the service reports failure.
    """
    service = get_playbook_service()
    result = await service.record_execution(playbook_id, success)

    if not result:
        raise HTTPException(
            status_code=404,
            detail=f"Playbook {playbook_id} not found",
        )

    return {"success": True, "message": "Execution recorded"}


# -----------------------------------------------------------------------------
# Registration (from the apps/api/src/main.py hunk of the same patch)
# -----------------------------------------------------------------------------
# main.py gains two imports:
#   from src.api.v1 import github_webhook as github_webhook_v1  (Phase 13.1: GitHub -> OpenClaw)
#   from src.api.v1 import playbooks as playbooks_v1            (#7: Playbook extraction)
# and two registrations, placed after the Statistics router:
#   app.include_router(github_webhook_v1.router, prefix="/api/v1", tags=["GitHub Webhook"])
#   app.include_router(playbooks_v1.router, prefix="/api/v1", tags=["Playbooks"])
# NOTE: this span opens with the tail of the apps/api/src/main.py hunk of the
# patch: the closing `tags=["Playbooks"])` of the playbooks router
# registration, followed by the unchanged legacy registration
# `app.include_router(proposals_router.router, tags=["Proposals (Legacy)"])`.
# Everything below is apps/api/src/models/playbook.py.
"""
Playbook Models - #7 Playbook extraction
========================================
Data models for repair playbooks extracted from successfully handled incidents.

Phase 7.1: Data model definitions
Created: 2026-03-26 (Taipei time zone)
Author: Claude Code (Phase 7)

Follows the leWOOOgo building-block principles:
- Defined as Pydantic BaseModel classes
- Supports two-tier storage (PostgreSQL + Redis)
"""

from datetime import UTC, datetime
from enum import Enum
from typing import Any
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field

# =============================================================================
# Enums
# =============================================================================


class PlaybookStatus(str, Enum):
    """Lifecycle status of a Playbook."""

    DRAFT = "draft"  # extracted by AI, awaiting human review
    APPROVED = "approved"  # approved by a human, eligible for recommendation
    DEPRECATED = "deprecated"  # deprecated (a better solution exists)


class PlaybookSource(str, Enum):
    """Origin of a Playbook."""

    EXTRACTED = "extracted"  # extracted automatically from an Incident
    MANUAL = "manual"  # created by a human


class ActionType(str, Enum):
    """How a repair step is carried out."""

    KUBECTL = "kubectl"  # Kubernetes command
    SCRIPT = "script"  # script execution
    MANUAL = "manual"  # requires manual operation


class RiskLevel(str, Enum):
    """Risk level of a repair step."""

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"


# =============================================================================
# Sub-Models
# =============================================================================


class SymptomPattern(BaseModel):
    """
    Symptom pattern used for similarity matching.

    Design: a multi-dimensional feature set
    - alert_names: set of alert names
    - affected_services: set of affected services
    - severity_range: severities the pattern applies to
    - label_patterns: Prometheus labels (k8s namespace, deployment, etc.)
    - keywords: free-form keywords

    Unknown input fields are ignored (``extra="ignore"``).
    """

    alert_names: list[str] = Field(
        default_factory=list,
        description="告警名稱模式 (如 HighCPU*, PodCrash*)",
    )
    affected_services: list[str] = Field(
        default_factory=list,
        description="受影響服務模式",
    )
    severity_range: list[str] = Field(
        default=["P1", "P2"],
        description="適用嚴重度範圍",
    )
    label_patterns: dict[str, str] = Field(
        default_factory=dict,
        description="標籤匹配 (regex)",
    )
    keywords: list[str] = Field(
        default_factory=list,
        description="關鍵字 (從 annotations 提取)",
    )

    model_config = ConfigDict(extra="ignore")


class RepairStep(BaseModel):
    """
    A single repair step.

    Design: supports several execution types
    - kubectl: Kubernetes command
    - script: script execution
    - manual: requires manual operation

    Unknown input fields are ignored (``extra="ignore"``).
    """

    step_number: int = Field(ge=1, description="步驟序號")
    action_type: ActionType = Field(description="執行類型")
    command: str = Field(description="執行命令或操作描述")
    expected_result: str | None = Field(None, description="預期結果")
    rollback_command: str | None = Field(None, description="回滾命令")
    requires_approval: bool = Field(default=False, description="是否需要人工審核")
    risk_level: RiskLevel = Field(default=RiskLevel.MEDIUM, description="風險等級")

    model_config = ConfigDict(extra="ignore")


# =============================================================================
# Core Model
# =============================================================================


def generate_playbook_id() -> str:
    """
    Generate a Playbook ID.

    Returns:
        ``PB-<UTC date as YYYYMMDD>-<6 upper-case hex characters>``, where the
        hex part is the first six characters of a random UUID4.
    """
    return f"PB-{datetime.now(UTC).strftime('%Y%m%d')}-{uuid4().hex[:6].upper()}"


class Playbook(BaseModel):
    """
    Playbook - a repair script.

    Three memory tiers:
    - Working Memory (Redis): playbook:{playbook_id}, TTL 7 days
    - Episodic Memory (PostgreSQL): playbooks table
    - Semantic Memory (Vector DB): vectorised symptom features (Phase 8+)

    Design follows:
    - ADR-003 leWOOOgo modular architecture
    - ADR-007 data retention policy

    Unknown input fields are ignored (``extra="ignore"``).
    """

    # === Identity ===
    playbook_id: str = Field(
        default_factory=generate_playbook_id,
        description="Playbook 唯一識別碼",
    )

    # === Metadata ===
    name: str = Field(description="Playbook 名稱 (人類可讀)")
    description: str = Field(description="問題描述與修復策略摘要")
    status: PlaybookStatus = Field(default=PlaybookStatus.DRAFT)
    source: PlaybookSource = Field(default=PlaybookSource.EXTRACTED)

    # === Symptom pattern ===
    symptom_pattern: SymptomPattern = Field(
        default_factory=SymptomPattern,
        description="觸發此 Playbook 的症狀模式",
    )

    # === Repair steps ===
    repair_steps: list[RepairStep] = Field(
        default_factory=list,
        description="修復步驟列表",
    )
    estimated_duration_minutes: int = Field(
        default=5,
        ge=1,
        le=480,
        description="預估修復時間 (分鐘)",
    )

    # === Provenance ===
    source_incident_ids: list[str] = Field(
        default_factory=list,
        description="萃取來源的 Incident ID",
    )
    ai_confidence: float = Field(
        default=0.0,
        ge=0.0,
        le=1.0,
        description="AI 萃取信心度",
    )

    # === Statistics ===
    success_count: int = Field(default=0, ge=0, description="成功執行次數")
    failure_count: int = Field(default=0, ge=0, description="失敗執行次數")
    last_used_at: datetime | None = Field(None, description="最後使用時間")

    # === Human annotations ===
    approved_by: str | None = Field(None, description="核准者")
    approved_at: datetime | None = Field(None, description="核准時間")
    tags: list[str] = Field(default_factory=list, description="標籤")
    notes: str | None = Field(None, description="人工補充說明")

    # === Timeline ===
    # Both default to "now" in UTC; the two factories run independently.
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    model_config = ConfigDict(extra="ignore")

    @property
    def success_rate(self) -> float:
        """Fraction of recorded executions that succeeded; 0.0 when none are recorded."""
        total = self.success_count + self.failure_count
        return self.success_count / total if total > 0 else 0.0

    @property
    def is_high_quality(self) -> bool:
        """
        Whether this is a high-quality Playbook (reference for #8 auto-promotion).

        Conditions:
        - Status is APPROVED
        - Success rate >= 95%
        - At least 10 successful executions
        """
        return (
            self.status == PlaybookStatus.APPROVED
            and self.success_rate >= 0.95
            and self.success_count >= 10
        )

    @property
    def total_executions(self) -> int:
        """Total number of recorded executions (successes plus failures)."""
        return self.success_count + self.failure_count

    def to_redis_dict(self) -> dict[str, Any]:
        """Serialize to the Redis storage format (a JSON-mode ``model_dump``)."""
        return self.model_dump(mode="json")

    @classmethod
    def from_redis_dict(cls, data: dict[str, Any]) -> "Playbook":
        """Rebuild a Playbook from Redis data by validating the given dict."""
        return cls.model_validate(data)


# =============================================================================
# Response Models
# =============================================================================


class PlaybookRecommendation(BaseModel):
    """A recommended Playbook together with its match details."""

    playbook: Playbook
    similarity_score: float = Field(ge=0.0, le=1.0, description="相似度分數")
    matched_symptoms: list[str] = Field(
        default_factory=list,
        description="匹配的症狀",
    )
    reason: str = Field(description="推薦原因")

    model_config = ConfigDict(extra="ignore")


class PlaybookResponse(BaseModel):
    """Response for a single Playbook, including its derived statistics."""

    playbook: Playbook
    success_rate: float = Field(ge=0.0, le=1.0)
    is_high_quality: bool

    @classmethod
    def from_playbook(cls, playbook: Playbook) -> "PlaybookResponse":
        """Build a response from a Playbook, copying its computed properties."""
        return cls(
            playbook=playbook,
            success_rate=playbook.success_rate,
            is_high_quality=playbook.is_high_quality,
        )


class PlaybookListResponse(BaseModel):
    """Paginated list of Playbooks."""

    items: list[PlaybookResponse]
    total: int
    limit: int
    offset: int


# =============================================================================
# Request Models
# =============================================================================


class PlaybookCreateRequest(BaseModel):
    """Request to create a Playbook manually."""

    name: str = Field(min_length=1, max_length=256)
    description: str = Field(min_length=1, max_length=2000)
    symptom_pattern: SymptomPattern
    repair_steps: list[RepairStep] = Field(min_length=1)
    estimated_duration_minutes: int = Field(default=5, ge=1, le=480)
    tags: list[str] = Field(default_factory=list)
    notes: str | None = None


class PlaybookUpdateRequest(BaseModel):
    """Request to update a Playbook; every field is optional."""

    name: str | None = Field(None, min_length=1, max_length=256)
    description: str | None = Field(None, min_length=1, max_length=2000)
    symptom_pattern: SymptomPattern | None = None
    # NOTE(review): unlike PlaybookCreateRequest, an empty step list is accepted here.
    repair_steps: list[RepairStep] | None = None
    estimated_duration_minutes: int | None = Field(None, ge=1, le=480)
    tags: list[str] | None = None
    notes: str | None = None
    status: PlaybookStatus | None = None


class PlaybookApproveRequest(BaseModel):
    """Request to approve a Playbook."""

    approved_by: str = Field(min_length=1, max_length=128)
    notes: str | None = Field(None, max_length=1000)


class SymptomPatternRequest(BaseModel):
    """Symptom query sent by a client asking for recommendations."""

    alert_names: list[str] = Field(default_factory=list)
    affected_services: list[str] = Field(default_factory=list)
    severity: str | None = None
    keywords: list[str] = Field(default_factory=list)

    def to_symptom_pattern(self) -> SymptomPattern:
        """
        Convert to a SymptomPattern.

        A given ``severity`` becomes a one-element ``severity_range``;
        otherwise the range falls back to ``["P1", "P2"]``.
        """
        return SymptomPattern(
            alert_names=self.alert_names,
            affected_services=self.affected_services,
            severity_range=[self.severity] if self.severity else ["P1", "P2"],
            keywords=self.keywords,
        )


# NOTE: this span closes with the head of the apps/api/src/repositories/interfaces.py
# hunk of the patch, which adds the import
#   from src.models.playbook import Playbook, PlaybookStatus, SymptomPattern
# and then appends a new protocol after the existing IMetricsRepository.
@runtime_checkable
class IPlaybookRepository(Protocol):
    """
    Playbook repository protocol.

    Responsibility: Playbook CRUD operations (PostgreSQL + Redis, two tiers)
    Implementation: PlaybookRepository

    Version: v1.0
    Created: 2026-03-26 (Taipei time zone)
    Author: Claude Code (#7 Playbook extraction)
    """

    async def create(self, playbook: Playbook) -> Playbook:
        """Create a new Playbook and return it."""
        ...

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """Fetch a Playbook by ID; the signature allows ``None`` as the result."""
        ...

    async def update(self, playbook: Playbook) -> Playbook | None:
        """Update a Playbook; the signature allows ``None`` as the result."""
        ...

    async def delete(self, playbook_id: str) -> bool:
        """Delete a Playbook (soft delete -> DEPRECATED)."""
        ...

    async def list_playbooks(
        self,
        status: PlaybookStatus | None = None,
        tags: list[str] | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> tuple[list[Playbook], int]:
        """
        List Playbooks.

        Args:
            status: Optional status filter.
            tags: Optional tag filter.
            limit: Page size.
            offset: Number of items to skip.

        Returns:
            (items, total_count)
        """
        ...

    async def find_by_symptoms(
        self,
        symptoms: SymptomPattern,
        top_k: int = 5,
        min_similarity: float = 0.5,
    ) -> list[tuple[Playbook, float]]:
        """
        Find Playbooks whose symptom pattern is similar to the given one.

        Args:
            symptoms: Symptom pattern to match against.
            top_k: Maximum number of results.
            min_similarity: Minimum similarity score for a result.

        Returns:
            list[(Playbook, similarity_score)]
        """
        ...

    async def update_stats(
        self,
        playbook_id: str,
        success: bool,
    ) -> bool:
        """Update the execution statistics of a Playbook."""
        ...
"""
Playbook Repository - #7 Playbook extraction
============================================
Playbook CRUD operations (Redis + PostgreSQL)

Phase 7.2: Repository implementation
Created: 2026-03-26 (Taipei time zone)
Author: Claude Code (#7 Playbook extraction)

Follows the leWOOOgo building-block principles:
- Implements the IPlaybookRepository protocol
- Redis is the Working Memory (7-day TTL)
- PostgreSQL is the Episodic Memory
"""

import json
from datetime import UTC, datetime

import structlog

from src.core.redis_client import get_redis
from src.models.playbook import (
    Playbook,
    PlaybookStatus,
    SymptomPattern,
)
from src.repositories.interfaces import IPlaybookRepository

logger = structlog.get_logger(__name__)

# Redis TTL: 7 days
PLAYBOOK_TTL_SECONDS = 7 * 24 * 60 * 60

# Redis key prefixes
PLAYBOOK_KEY_PREFIX = "playbook:"
PLAYBOOK_INDEX_ALERT_PREFIX = "playbook:index:alert:"
PLAYBOOK_INDEX_SERVICE_PREFIX = "playbook:index:service:"


def _calculate_jaccard_similarity(set_a: set, set_b: set) -> float:
    """
    Compute the Jaccard similarity |A & B| / |A | B| of two sets.

    Two empty sets are treated as identical and score 1.0.
    """
    if not set_a and not set_b:
        return 1.0
    # At least one set is non-empty here, so the union is never empty.
    return len(set_a & set_b) / len(set_a | set_b)


def _decode_member(member: bytes | str) -> str:
    """Return a Redis set member as ``str`` whether the client yields bytes or text."""
    return member.decode() if isinstance(member, bytes) else member


def calculate_symptom_similarity(
    pattern_a: SymptomPattern,
    pattern_b: SymptomPattern,
) -> float:
    """
    Compute the similarity of two symptom patterns.

    Algorithm: weighted sum of per-dimension scores.

    Dimension weights:
    - alert_names: 0.35 (most important), Jaccard similarity
    - affected_services: 0.30, Jaccard similarity
    - severity: 0.15, 1.0 when the severity ranges overlap, else 0.0
    - keywords: 0.20, Jaccard similarity

    Returns:
        float: similarity score between 0.0 and 1.0
    """
    weights = {
        "alert_names": 0.35,
        "affected_services": 0.30,
        "severity": 0.15,
        "keywords": 0.20,
    }

    scores = {
        "alert_names": _calculate_jaccard_similarity(
            set(pattern_a.alert_names),
            set(pattern_b.alert_names),
        ),
        "affected_services": _calculate_jaccard_similarity(
            set(pattern_a.affected_services),
            set(pattern_b.affected_services),
        ),
        "severity": (
            1.0
            if set(pattern_a.severity_range) & set(pattern_b.severity_range)
            else 0.0
        ),
        "keywords": _calculate_jaccard_similarity(
            set(pattern_a.keywords),
            set(pattern_b.keywords),
        ),
    }

    return sum(weights[k] * scores[k] for k in weights)


class PlaybookRepository:
    """
    Playbook repository implementation.

    Storage strategy:
    - Redis: Working Memory (fast reads, 7-day TTL)
    - PostgreSQL: Episodic Memory (persistent, not implemented yet)

    Phase 7.2 implements the Redis tier first; PostgreSQL is planned for #7.5.
    """

    # === CRUD Operations ===

    async def create(self, playbook: Playbook) -> Playbook:
        """
        Create a new Playbook.

        1. Store it in Redis as JSON with the 7-day TTL
        2. Add it to the lookup indexes (alert names, services)

        Raises:
            Exception: any storage error is logged and re-raised.
        """
        try:
            redis_client = get_redis()

            # Make sure the timestamps are populated.
            if not playbook.created_at:
                playbook.created_at = datetime.now(UTC)
            playbook.updated_at = datetime.now(UTC)

            key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}"
            await redis_client.set(
                key,
                json.dumps(playbook.to_redis_dict(), ensure_ascii=False),
                ex=PLAYBOOK_TTL_SECONDS,
            )

            await self._update_indexes(playbook)

            logger.info(
                "playbook_created",
                playbook_id=playbook.playbook_id,
                name=playbook.name,
            )
            return playbook

        except Exception as e:
            logger.error("playbook_create_failed", error=str(e))
            raise

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """
        Fetch a Playbook by ID.

        Returns:
            The playbook, or None when the key is missing or when reading or
            decoding fails (the error is logged, not raised).
        """
        try:
            redis_client = get_redis()
            key = f"{PLAYBOOK_KEY_PREFIX}{playbook_id}"
            data = await redis_client.get(key)

            if data:
                return Playbook.from_redis_dict(json.loads(data))
            return None

        except Exception as e:
            logger.error("playbook_get_failed", playbook_id=playbook_id, error=str(e))
            return None

    async def update(self, playbook: Playbook) -> Playbook | None:
        """
        Overwrite an existing Playbook.

        Refreshes ``updated_at``, rewrites the Redis entry (resetting the TTL)
        and re-adds the playbook to the indexes.

        Returns:
            The stored playbook, or None when it does not exist or the write
            fails (the error is logged, not raised).
        """
        try:
            existing = await self.get_by_id(playbook.playbook_id)
            if not existing:
                return None

            playbook.updated_at = datetime.now(UTC)

            redis_client = get_redis()
            key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}"
            await redis_client.set(
                key,
                json.dumps(playbook.to_redis_dict(), ensure_ascii=False),
                ex=PLAYBOOK_TTL_SECONDS,
            )

            # NOTE(review): index entries for alert names or services that
            # were removed from the pattern are not cleaned up here.
            await self._update_indexes(playbook)

            logger.info("playbook_updated", playbook_id=playbook.playbook_id)
            return playbook

        except Exception as e:
            logger.error(
                "playbook_update_failed",
                playbook_id=playbook.playbook_id,
                error=str(e),
            )
            return None

    async def delete(self, playbook_id: str) -> bool:
        """
        Delete a Playbook (soft delete -> DEPRECATED).

        The entry is not removed; its status is changed to DEPRECATED.

        Returns:
            True when the status change was stored; False when the playbook
            does not exist or the write failed.
        """
        try:
            playbook = await self.get_by_id(playbook_id)
            if not playbook:
                return False

            playbook.status = PlaybookStatus.DEPRECATED
            # update() refreshes updated_at and reports failure with None;
            # success must not be claimed unless the write went through.
            if await self.update(playbook) is None:
                return False

            logger.info("playbook_deprecated", playbook_id=playbook_id)
            return True

        except Exception as e:
            logger.error(
                "playbook_delete_failed",
                playbook_id=playbook_id,
                error=str(e),
            )
            return False

    # === Query Operations ===

    async def list_playbooks(
        self,
        status: PlaybookStatus | None = None,
        tags: list[str] | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> tuple[list[Playbook], int]:
        """
        List Playbooks, newest ``updated_at`` first.

        Note: the Redis implementation scans every playbook key and is
        inefficient; it should move to PostgreSQL later.

        Args:
            status: Only keep playbooks in this status.
            tags: Only keep playbooks sharing at least one of these tags.
            limit: Page size.
            offset: Number of matching items to skip.

        Returns:
            (items, total_count) where total_count counts all matching
            playbooks before pagination; ``([], 0)`` when the scan fails.
        """
        try:
            redis_client = get_redis()

            # The "PB-" part keeps the index keys (playbook:index:...) out of the scan.
            pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*"
            keys = [key async for key in redis_client.scan_iter(match=pattern, count=100)]

            wanted_tags = set(tags) if tags else None
            all_playbooks: list[Playbook] = []
            for key in keys:
                data = await redis_client.get(key)
                if not data:
                    # The key expired between SCAN and GET.
                    continue

                try:
                    playbook = Playbook.from_redis_dict(json.loads(data))
                except ValueError as e:
                    # Covers invalid JSON and model validation errors: one
                    # corrupt entry must not hide every other playbook.
                    logger.warning(
                        "playbook_list_entry_invalid",
                        key=_decode_member(key),
                        error=str(e),
                    )
                    continue

                if status and playbook.status != status:
                    continue

                if wanted_tags and not wanted_tags.intersection(playbook.tags):
                    continue

                all_playbooks.append(playbook)

            all_playbooks.sort(key=lambda p: p.updated_at, reverse=True)

            total = len(all_playbooks)
            items = all_playbooks[offset : offset + limit]

            return items, total

        except Exception as e:
            logger.error("playbook_list_failed", error=str(e))
            return [], 0

    async def find_by_symptoms(
        self,
        symptoms: SymptomPattern,
        top_k: int = 5,
        min_similarity: float = 0.5,
    ) -> list[tuple[Playbook, float]]:
        """
        Find APPROVED Playbooks with a similar symptom pattern.

        Strategy:
        1. Collect candidates from the alert-name and service indexes; when
           nothing is indexed, fall back to up to 100 APPROVED playbooks
        2. Score every candidate with ``calculate_symptom_similarity``
        3. Return the top K with a score of at least ``min_similarity``

        Returns:
            list[(Playbook, similarity_score)], best first; ``[]`` on error.
        """
        try:
            redis_client = get_redis()

            # 1. Candidate IDs from the indexes
            index_keys = [
                f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
                for alert_name in symptoms.alert_names
            ]
            index_keys += [
                f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
                for service in symptoms.affected_services
            ]

            candidate_ids: set[str] = set()
            for index_key in index_keys:
                members = await redis_client.smembers(index_key)
                candidate_ids.update(_decode_member(m) for m in members)

            candidates: list[Playbook] = []
            if candidate_ids:
                # Sorted so that equal scores come back in a stable order.
                for playbook_id in sorted(candidate_ids):
                    playbook = await self.get_by_id(playbook_id)
                    if playbook:
                        candidates.append(playbook)
            else:
                # No index hit: reuse the objects the scan already loaded
                # instead of fetching each one a second time.
                candidates, _ = await self.list_playbooks(
                    status=PlaybookStatus.APPROVED,
                    limit=100,
                )

            # 2. Score the candidates
            results: list[tuple[Playbook, float]] = []
            for playbook in candidates:
                # Only APPROVED playbooks may be recommended.
                if playbook.status != PlaybookStatus.APPROVED:
                    continue

                similarity = calculate_symptom_similarity(
                    symptoms,
                    playbook.symptom_pattern,
                )

                if similarity >= min_similarity:
                    results.append((playbook, similarity))

            # 3. Best first, top K
            results.sort(key=lambda item: item[1], reverse=True)
            return results[:top_k]

        except Exception as e:
            logger.error("playbook_find_symptoms_failed", error=str(e))
            return []

    async def update_stats(
        self,
        playbook_id: str,
        success: bool,
    ) -> bool:
        """
        Update the execution statistics of a Playbook.

        Increments ``success_count`` or ``failure_count`` and sets
        ``last_used_at`` to now.

        Returns:
            True when the new counters were stored; False when the playbook
            does not exist or the write failed.
        """
        try:
            playbook = await self.get_by_id(playbook_id)
            if not playbook:
                return False

            if success:
                playbook.success_count += 1
            else:
                playbook.failure_count += 1

            playbook.last_used_at = datetime.now(UTC)
            # NOTE(review): this is a read-modify-write; concurrent calls can
            # lose an increment.
            if await self.update(playbook) is None:
                return False

            logger.info(
                "playbook_stats_updated",
                playbook_id=playbook_id,
                success=success,
                success_rate=playbook.success_rate,
            )
            return True

        except Exception as e:
            logger.error(
                "playbook_stats_update_failed",
                playbook_id=playbook_id,
                error=str(e),
            )
            return False

    # === Index Management ===

    async def _update_indexes(self, playbook: Playbook) -> None:
        """
        Add the playbook ID to the alert-name and service index sets.

        Each touched index set gets its TTL reset to 7 days. Indexing is
        best-effort: failures are logged as warnings and never raised.
        """
        try:
            redis_client = get_redis()

            # Alert-name index
            for alert_name in playbook.symptom_pattern.alert_names:
                index_key = f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
                await redis_client.sadd(index_key, playbook.playbook_id)
                await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS)

            # Service index
            for service in playbook.symptom_pattern.affected_services:
                index_key = f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
                await redis_client.sadd(index_key, playbook.playbook_id)
                await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS)

        except Exception as e:
            logger.warning("playbook_index_update_failed", error=str(e))


# =============================================================================
# Singleton
# =============================================================================

_repository: PlaybookRepository | None = None


def get_playbook_repository() -> IPlaybookRepository:
    """Return the process-wide PlaybookRepository, creating it on first use."""
    global _repository
    if _repository is None:
        _repository = PlaybookRepository()
    return _repository
"""
Playbook Service - #7 Playbook extraction
=========================================
Playbook business-logic layer

Phase 7.3: Service implementation
Created: 2026-03-26 (Taipei time zone)
Author: Claude Code (#7 Playbook extraction)

Follows the leWOOOgo building-block principles:
- The service layer depends only on the repository interface
- It never accesses Redis or the database directly
- It encapsulates all business logic
"""

from datetime import UTC, datetime
from typing import Protocol

import structlog

from src.models.incident import Incident, IncidentStatus
from src.models.playbook import (
    ActionType,
    Playbook,
    PlaybookRecommendation,
    PlaybookSource,
    PlaybookStatus,
    RepairStep,
    RiskLevel,
    SymptomPattern,
)
from src.repositories.interfaces import IPlaybookRepository
from src.repositories.playbook_repository import get_playbook_repository

logger = structlog.get_logger(__name__)


class IPlaybookService(Protocol):
    """Playbook service interface."""

    async def extract_from_incident(
        self,
        incident: Incident,
        auto_approve: bool = False,
    ) -> Playbook | None:
        """Extract a Playbook from a successfully handled incident."""
        ...

    async def get_recommendations(
        self,
        symptoms: SymptomPattern,
        top_k: int = 3,
    ) -> list[PlaybookRecommendation]:
        """Return Playbook recommendations for the given symptoms."""
        ...

    async def approve(
        self,
        playbook_id: str,
        approved_by: str,
        notes: str | None = None,
    ) -> Playbook | None:
        """Approve a Playbook."""
        ...

    async def record_execution(
        self,
        playbook_id: str,
        success: bool,
    ) -> bool:
        """Record the outcome of a Playbook execution."""
        ...
+ + +class PlaybookService: + """ + Playbook Service 實作 + + 職責: + - 從 Incident 萃取 Playbook + - 提供 Playbook 推薦 + - 管理 Playbook 生命週期 + """ + + def __init__(self, repository: IPlaybookRepository | None = None): + self._repository = repository or get_playbook_repository() + + # === Core Operations === + + async def extract_from_incident( + self, + incident: Incident, + auto_approve: bool = False, + ) -> Playbook | None: + """ + 從成功案例萃取 Playbook + + 前置條件: + - Incident 狀態為 RESOLVED 或 CLOSED + - outcome.execution_success == True + - outcome.effectiveness_score >= 4 + + Args: + incident: 來源 Incident + auto_approve: 是否自動核准 (僅限高信心度) + + Returns: + Playbook | None + """ + # 1. 驗證前置條件 + if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]: + logger.warning( + "playbook_extract_invalid_status", + incident_id=incident.incident_id, + status=incident.status, + ) + return None + + if not incident.outcome or not incident.outcome.execution_success: + logger.warning( + "playbook_extract_no_successful_outcome", + incident_id=incident.incident_id, + ) + return None + + effectiveness = incident.outcome.effectiveness_score or 0 + if effectiveness < 4: + logger.info( + "playbook_extract_low_effectiveness", + incident_id=incident.incident_id, + effectiveness=effectiveness, + ) + return None + + # 2. 萃取症狀模式 + symptom_pattern = self._extract_symptom_pattern(incident) + + # 3. 萃取修復步驟 + repair_steps = self._extract_repair_steps(incident) + + # 4. 計算信心度 + confidence = self._calculate_confidence(incident, effectiveness) + + # 5. 生成名稱和描述 + name = self._generate_name(incident) + description = self._generate_description(incident) + + # 6. 
建立 Playbook + playbook = Playbook( + name=name, + description=description, + status=PlaybookStatus.APPROVED if auto_approve and confidence >= 0.9 else PlaybookStatus.DRAFT, + source=PlaybookSource.EXTRACTED, + symptom_pattern=symptom_pattern, + repair_steps=repair_steps, + source_incident_ids=[incident.incident_id], + ai_confidence=confidence, + tags=self._extract_tags(incident), + ) + + # 7. 儲存 + playbook = await self._repository.create(playbook) + + logger.info( + "playbook_extracted", + playbook_id=playbook.playbook_id, + incident_id=incident.incident_id, + confidence=confidence, + auto_approved=playbook.status == PlaybookStatus.APPROVED, + ) + + return playbook + + async def get_recommendations( + self, + symptoms: SymptomPattern, + top_k: int = 3, + ) -> list[PlaybookRecommendation]: + """ + 取得 Playbook 推薦 + + 策略: + 1. 從 Repository 找相似症狀的 Playbook + 2. 按 similarity_score * success_rate 排序 + 3. 返回 Top K 推薦 + """ + # 查詢相似 Playbook + similar_playbooks = await self._repository.find_by_symptoms( + symptoms=symptoms, + top_k=top_k * 2, # 多取一些用於後續過濾 + min_similarity=0.4, + ) + + if not similar_playbooks: + return [] + + # 建立推薦列表 + recommendations: list[PlaybookRecommendation] = [] + + for playbook, similarity in similar_playbooks: + # 找出匹配的症狀 + matched_symptoms = self._find_matched_symptoms(symptoms, playbook.symptom_pattern) + + # 生成推薦原因 + reason = self._generate_recommendation_reason( + playbook, + similarity, + matched_symptoms, + ) + + recommendations.append( + PlaybookRecommendation( + playbook=playbook, + similarity_score=similarity, + matched_symptoms=matched_symptoms, + reason=reason, + ) + ) + + # 按綜合分數排序 + recommendations.sort( + key=lambda r: r.similarity_score * (0.5 + 0.5 * r.playbook.success_rate), + reverse=True, + ) + + return recommendations[:top_k] + + async def approve( + self, + playbook_id: str, + approved_by: str, + notes: str | None = None, + ) -> Playbook | None: + """核准 Playbook""" + playbook = await self._repository.get_by_id(playbook_id) + 
if not playbook: + return None + + if playbook.status != PlaybookStatus.DRAFT: + logger.warning( + "playbook_approve_invalid_status", + playbook_id=playbook_id, + current_status=playbook.status, + ) + return None + + playbook.status = PlaybookStatus.APPROVED + playbook.approved_by = approved_by + playbook.approved_at = datetime.now(UTC) + if notes: + playbook.notes = notes + + updated = await self._repository.update(playbook) + + if updated: + logger.info( + "playbook_approved", + playbook_id=playbook_id, + approved_by=approved_by, + ) + + return updated + + async def record_execution( + self, + playbook_id: str, + success: bool, + ) -> bool: + """記錄 Playbook 執行結果""" + return await self._repository.update_stats(playbook_id, success) + + # === CRUD Proxies === + + async def get_by_id(self, playbook_id: str) -> Playbook | None: + """取得 Playbook""" + return await self._repository.get_by_id(playbook_id) + + async def list_playbooks( + self, + status: PlaybookStatus | None = None, + tags: list[str] | None = None, + limit: int = 20, + offset: int = 0, + ) -> tuple[list[Playbook], int]: + """列出 Playbooks""" + return await self._repository.list_playbooks( + status=status, + tags=tags, + limit=limit, + offset=offset, + ) + + async def update(self, playbook: Playbook) -> Playbook | None: + """更新 Playbook""" + return await self._repository.update(playbook) + + async def delete(self, playbook_id: str) -> bool: + """刪除 Playbook (軟刪除)""" + return await self._repository.delete(playbook_id) + + # === Private Helpers === + + def _extract_symptom_pattern(self, incident: Incident) -> SymptomPattern: + """從 Incident 萃取症狀模式""" + alert_names = [s.alert_name for s in incident.signals] if incident.signals else [] + keywords = [] + + # 從 annotations 提取關鍵字 + for signal in incident.signals or []: + if signal.annotations: + for value in signal.annotations.values(): + if isinstance(value, str) and len(value) < 50: + keywords.append(value) + + return SymptomPattern( + alert_names=alert_names, + 
affected_services=incident.affected_services or [], + severity_range=[incident.severity.value] if incident.severity else ["P2"], + keywords=keywords[:10], # 最多 10 個關鍵字 + ) + + def _extract_repair_steps(self, incident: Incident) -> list[RepairStep]: + """從 Incident 萃取修復步驟""" + steps: list[RepairStep] = [] + + # 從 decision_chain 提取 + if incident.decision_chain: + for i, step in enumerate(incident.decision_chain.steps, 1): + if step.executed_action: + steps.append( + RepairStep( + step_number=i, + action_type=ActionType.KUBECTL, + command=step.executed_action, + expected_result=step.result or None, + risk_level=RiskLevel.MEDIUM, + ) + ) + + # 如果沒有從 decision_chain 取得,嘗試從 outcome 取得 + if not steps and incident.outcome and incident.outcome.repair_action: + steps.append( + RepairStep( + step_number=1, + action_type=ActionType.KUBECTL, + command=incident.outcome.repair_action, + risk_level=RiskLevel.MEDIUM, + ) + ) + + return steps + + def _calculate_confidence(self, incident: Incident, effectiveness: int) -> float: + """計算 AI 萃取信心度""" + base_score = 0.5 + + # effectiveness 貢獻 (4-5 → 0.2-0.4) + effectiveness_bonus = (effectiveness - 3) * 0.2 + + # 有 decision_chain 加分 + if incident.decision_chain and incident.decision_chain.steps: + base_score += 0.1 + + # 有多個 signals 加分 (更多資料) + if incident.signals and len(incident.signals) >= 2: + base_score += 0.05 + + return min(base_score + effectiveness_bonus, 1.0) + + def _generate_name(self, incident: Incident) -> str: + """生成 Playbook 名稱""" + alert_name = incident.signals[0].alert_name if incident.signals else "Unknown" + services = incident.affected_services[:2] if incident.affected_services else [] + service_str = "/".join(services) if services else "system" + + return f"{alert_name} - {service_str} 修復劇本" + + def _generate_description(self, incident: Incident) -> str: + """生成 Playbook 描述""" + parts = [] + + if incident.signals: + parts.append(f"觸發告警: {incident.signals[0].alert_name}") + + if incident.affected_services: + 
parts.append(f"影響服務: {', '.join(incident.affected_services)}") + + if incident.outcome and incident.outcome.repair_action: + parts.append(f"修復動作: {incident.outcome.repair_action[:100]}") + + return ". ".join(parts) if parts else "從成功案例自動萃取的修復劇本" + + def _extract_tags(self, incident: Incident) -> list[str]: + """萃取標籤""" + tags: set[str] = set() + + # 從服務名稱提取 + for service in incident.affected_services or []: + tags.add(service.lower()) + + # 從告警名稱提取類型 + if incident.signals: + for signal in incident.signals: + if "cpu" in signal.alert_name.lower(): + tags.add("cpu") + if "memory" in signal.alert_name.lower(): + tags.add("memory") + if "pod" in signal.alert_name.lower(): + tags.add("kubernetes") + if "network" in signal.alert_name.lower(): + tags.add("network") + + return list(tags)[:10] + + def _find_matched_symptoms( + self, + query: SymptomPattern, + playbook_pattern: SymptomPattern, + ) -> list[str]: + """找出匹配的症狀""" + matched = [] + + # 匹配的告警 + alert_matches = set(query.alert_names) & set(playbook_pattern.alert_names) + for alert in alert_matches: + matched.append(f"Alert: {alert}") + + # 匹配的服務 + service_matches = set(query.affected_services) & set(playbook_pattern.affected_services) + for service in service_matches: + matched.append(f"Service: {service}") + + # 匹配的嚴重度 + if set(query.severity_range) & set(playbook_pattern.severity_range): + matched.append(f"Severity: {query.severity_range[0]}") + + return matched + + def _generate_recommendation_reason( + self, + playbook: Playbook, + similarity: float, + matched_symptoms: list[str], + ) -> str: + """生成推薦原因""" + parts = [] + + parts.append(f"相似度 {similarity:.0%}") + + if playbook.success_rate > 0: + parts.append(f"成功率 {playbook.success_rate:.0%}") + + if playbook.total_executions > 0: + parts.append(f"已執行 {playbook.total_executions} 次") + + if matched_symptoms: + parts.append(f"匹配: {', '.join(matched_symptoms[:3])}") + + return ". 
".join(parts) + + +# ============================================================================= +# Singleton +# ============================================================================= + +_service: PlaybookService | None = None + + +def get_playbook_service() -> IPlaybookService: + """取得 PlaybookService 單例""" + global _service + if _service is None: + _service = PlaybookService() + return _service