feat(api): #7 Playbook 萃取功能 (Phase 7.1-7.4)

實作內容:
- models/playbook.py: Playbook 資料模型 + Request/Response
- repositories/playbook_repository.py: Redis 儲存層 (Working Memory, 7 天 TTL;PostgreSQL 持久層待 #7.5 整合)
- repositories/interfaces.py: IPlaybookRepository Protocol
- services/playbook_service.py: 業務邏輯 (萃取/推薦/核准)
- api/v1/playbooks.py: REST API 端點

API 端點:
- POST /playbooks/extract/{incident_id} - 從成功案例萃取
- POST /playbooks/recommend - 症狀匹配推薦
- POST /playbooks/{id}/approve - 人工核准
- GET/PATCH/DELETE /playbooks/{id} - CRUD

遵循 leWOOOgo 積木化: Router → Service → Repository

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-26 10:54:13 +08:00
parent 8a163609bf
commit 698687f092
6 changed files with 1569 additions and 0 deletions

View File

@@ -0,0 +1,267 @@
"""
Playbook API Router - #7 Playbook 萃取
======================================
Playbook CRUD API 端點
Phase 7.4: API Router 實作
建立時間: 2026-03-26 (台北時區)
建立者: Claude Code (#7 Playbook 萃取)
遵循 leWOOOgo 積木化原則:
- Router 層只做 HTTP 轉發
- 不直接存取 Redis/DB
- 業務邏輯委託給 Service 層
"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from src.models.playbook import (
Playbook,
PlaybookApproveRequest,
PlaybookListResponse,
PlaybookRecommendation,
PlaybookResponse,
PlaybookStatus,
PlaybookUpdateRequest,
SymptomPatternRequest,
)
from src.services.playbook_service import get_playbook_service
router = APIRouter(prefix="/playbooks", tags=["Playbooks"])
# =============================================================================
# Response Models
# =============================================================================
class ExtractPlaybookResponse(BaseModel):
    """Response body of the playbook extraction endpoint."""

    # True when a playbook was produced from the incident.
    success: bool
    # The extracted playbook; stays None when extraction was refused.
    playbook: Playbook | None = None
    # Human-readable outcome description.
    message: str
class DeletePlaybookResponse(BaseModel):
    """Response body of the playbook delete (deprecate) endpoint."""

    # True when the playbook was deprecated.
    success: bool
    # Human-readable outcome description.
    message: str
# =============================================================================
# API Endpoints
# =============================================================================
@router.post("/extract/{incident_id}", response_model=ExtractPlaybookResponse)
async def extract_playbook(
    incident_id: str,
    auto_approve: bool = Query(False, description="自動核准 (僅限高信心度)"),
) -> ExtractPlaybookResponse:
    """
    Extract a playbook from a successfully handled incident.

    Preconditions (enforced by the service layer):
    - the incident status is RESOLVED or CLOSED
    - the execution succeeded and effectiveness_score >= 4

    Raises:
        HTTPException: 404 when the incident does not exist.

    Returns:
        A response with ``success=True`` and the new playbook, or
        ``success=False`` when the service declined to extract one.
    """
    # The incident service is imported inside the handler, as in the original.
    # NOTE(review): presumably this avoids an import cycle — confirm.
    from src.services.incident_service import get_incident_service

    incident = await get_incident_service().get_incident(incident_id)
    if not incident:
        raise HTTPException(
            status_code=404,
            detail=f"Incident {incident_id} not found",
        )

    extracted = await get_playbook_service().extract_from_incident(
        incident=incident,
        auto_approve=auto_approve,
    )
    if not extracted:
        return ExtractPlaybookResponse(
            success=False,
            playbook=None,
            message="Cannot extract playbook: incident does not meet requirements",
        )
    return ExtractPlaybookResponse(
        success=True,
        playbook=extracted,
        message=f"Playbook {extracted.playbook_id} extracted successfully",
    )
@router.post("/recommend", response_model=list[PlaybookRecommendation])
async def get_recommendations(
    symptoms: SymptomPatternRequest,
    top_k: int = Query(3, ge=1, le=10, description="返回數量"),
) -> list[PlaybookRecommendation]:
    """
    Recommend playbooks for a set of symptoms.

    The request body is converted to a SymptomPattern and passed to the
    service together with ``top_k``; the service result is returned as is.
    """
    playbook_service = get_playbook_service()
    pattern = symptoms.to_symptom_pattern()
    return await playbook_service.get_recommendations(
        symptoms=pattern,
        top_k=top_k,
    )
@router.post("/{playbook_id}/approve", response_model=PlaybookResponse)
async def approve_playbook(
    playbook_id: str,
    request: PlaybookApproveRequest,
) -> PlaybookResponse:
    """
    Manually approve a playbook (DRAFT -> APPROVED).

    Raises:
        HTTPException: 404 when the service returns nothing, i.e. the
            playbook is missing or is not in DRAFT status.
    """
    approved = await get_playbook_service().approve(
        playbook_id=playbook_id,
        approved_by=request.approved_by,
        notes=request.notes,
    )
    if approved:
        return PlaybookResponse.from_playbook(approved)
    raise HTTPException(
        status_code=404,
        detail=f"Playbook {playbook_id} not found or not in DRAFT status",
    )
@router.get("/", response_model=PlaybookListResponse)
async def list_playbooks(
    status: PlaybookStatus | None = Query(default=None, description="狀態過濾"),  # noqa: B008
    tags: list[str] | None = Query(default=None, description="標籤過濾"),  # noqa: B008
    limit: int = Query(default=20, ge=1, le=100, description="每頁數量"),
    offset: int = Query(default=0, ge=0, description="偏移量"),
) -> PlaybookListResponse:
    """
    List playbooks with optional status and tag filters.

    ``limit`` and ``offset`` are forwarded to the service and echoed back in
    the response next to the total number of matches.
    """
    page, total_count = await get_playbook_service().list_playbooks(
        status=status,
        tags=tags,
        limit=limit,
        offset=offset,
    )
    response_items = [PlaybookResponse.from_playbook(entry) for entry in page]
    return PlaybookListResponse(
        items=response_items,
        total=total_count,
        limit=limit,
        offset=offset,
    )
@router.get("/{playbook_id}", response_model=PlaybookResponse)
async def get_playbook(playbook_id: str) -> PlaybookResponse:
    """
    Return a single playbook.

    Raises:
        HTTPException: 404 when the service finds no playbook with this ID.
    """
    found = await get_playbook_service().get_by_id(playbook_id)
    if found:
        return PlaybookResponse.from_playbook(found)
    raise HTTPException(
        status_code=404,
        detail=f"Playbook {playbook_id} not found",
    )
@router.patch("/{playbook_id}", response_model=PlaybookResponse)
async def update_playbook(
    playbook_id: str,
    request: PlaybookUpdateRequest,
) -> PlaybookResponse:
    """
    Update a playbook (manual edit).

    Only fields that were explicitly present in the request body and are not
    null are copied onto the stored playbook (name, description, steps,
    tags, ...).

    Raises:
        HTTPException: 404 when the playbook does not exist, 500 when the
            service fails to persist the change.
    """
    service = get_playbook_service()
    playbook = await service.get_by_id(playbook_id)
    if not playbook:
        raise HTTPException(
            status_code=404,
            detail=f"Playbook {playbook_id} not found",
        )

    # Read the values from the request model itself instead of model_dump():
    # model_dump() converts nested models (symptom_pattern, repair_steps)
    # into plain dicts, and assigning those dicts to the Playbook breaks any
    # later attribute access such as playbook.symptom_pattern.alert_names.
    # NOTE(review): `status` is also writable here, which lets a client set
    # APPROVED without going through the approve endpoint — confirm intended.
    for field in request.model_fields_set:
        value = getattr(request, field)
        if value is not None:
            setattr(playbook, field, value)

    updated = await service.update(playbook)
    if not updated:
        raise HTTPException(
            status_code=500,
            detail="Failed to update playbook",
        )
    return PlaybookResponse.from_playbook(updated)
@router.delete("/{playbook_id}", response_model=DeletePlaybookResponse)
async def delete_playbook(playbook_id: str) -> DeletePlaybookResponse:
    """
    Delete a playbook (soft delete).

    The playbook is not removed; its status is changed to DEPRECATED.

    Raises:
        HTTPException: 404 when the service reports failure.
    """
    deprecated = await get_playbook_service().delete(playbook_id)
    if deprecated:
        return DeletePlaybookResponse(
            success=True,
            message=f"Playbook {playbook_id} deprecated successfully",
        )
    raise HTTPException(
        status_code=404,
        detail=f"Playbook {playbook_id} not found",
    )
@router.post("/{playbook_id}/record-execution")
async def record_execution(
    playbook_id: str,
    success: bool = Query(..., description="執行是否成功"),
) -> dict:
    """
    Record the outcome of one playbook execution.

    The outcome feeds the success-rate statistics.

    Raises:
        HTTPException: 404 when the service reports failure.
    """
    recorded = await get_playbook_service().record_execution(playbook_id, success)
    if recorded:
        return {"success": True, "message": "Execution recorded"}
    raise HTTPException(
        status_code=404,
        detail=f"Playbook {playbook_id} not found",
    )

View File

@@ -35,11 +35,15 @@ from src.api.v1 import ai as ai_v1
from src.api.v1 import approvals as approvals_v1
from src.api.v1 import audit_logs as audit_logs_v1
from src.api.v1 import dashboard as dashboard_v1
from src.api.v1 import (
github_webhook as github_webhook_v1, # Phase 13.1: GitHub → OpenClaw
)
# Import API routers
from src.api.v1 import health as health_v1
from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal
from src.api.v1 import metrics as metrics_v1 # Phase 7: Gold Metrics (真實血脈)
from src.api.v1 import playbooks as playbooks_v1 # #7: Playbook 萃取
from src.api.v1 import proposals as proposals_v1 # Phase 6.4h: Proposals CRUD API
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
@@ -384,6 +388,12 @@ app.include_router(
app.include_router(
stats_v1.router, prefix="/api/v1", tags=["Statistics"]
) # Phase 6.5: Statistics Analytics
app.include_router(
github_webhook_v1.router, prefix="/api/v1", tags=["GitHub Webhook"]
) # Phase 13.1: GitHub → OpenClaw
app.include_router(
playbooks_v1.router, prefix="/api/v1", tags=["Playbooks"]
) # #7: Playbook 萃取
app.include_router(
proposals_router.router, tags=["Proposals (Legacy)"]
) # Phase 6.4g: lewooogo-brain (舊版)

View File

@@ -0,0 +1,337 @@
"""
Playbook Models - #7 Playbook 萃取
==================================
從成功案例萃取的修復劇本資料模型
Phase 7.1: 資料模型定義
建立時間: 2026-03-26 (台北時區)
作者: Claude Code (Phase 7)
遵循 leWOOOgo 積木化原則:
- Pydantic BaseModel 定義
- 支援 PostgreSQL + Redis 雙層儲存
"""
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from uuid import uuid4
from pydantic import BaseModel, ConfigDict, Field
# =============================================================================
# Enums
# =============================================================================
class PlaybookStatus(str, Enum):
    """Lifecycle status of a playbook."""

    # Extracted by AI, waiting for human review.
    DRAFT = "draft"
    # Approved by a human; eligible for recommendation.
    APPROVED = "approved"
    # Retired (a better solution exists).
    DEPRECATED = "deprecated"
class PlaybookSource(str, Enum):
    """Origin of a playbook."""

    # Extracted automatically from an incident.
    EXTRACTED = "extracted"
    # Created by a human.
    MANUAL = "manual"
class ActionType(str, Enum):
    """How a repair step is carried out."""

    # Kubernetes command.
    KUBECTL = "kubectl"
    # Script execution.
    SCRIPT = "script"
    # Requires a human operator.
    MANUAL = "manual"
class RiskLevel(str, Enum):
    """Risk level of a repair step; values are the upper-case level names."""

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"
# =============================================================================
# Sub-Models
# =============================================================================
class SymptomPattern(BaseModel):
    """
    Symptom pattern used for similarity matching.

    A multi-dimensional feature description:
    - alert_names: alert name patterns
    - affected_services: affected service patterns
    - severity_range: severities the pattern applies to
    - label_patterns: label matchers (described as regex)
    - keywords: keywords taken from annotations

    Unknown input fields are ignored (``extra="ignore"``).
    """

    model_config = ConfigDict(extra="ignore")

    alert_names: list[str] = Field(
        default_factory=list,
        description="告警名稱模式 (如 HighCPU*, PodCrash*)",
    )
    affected_services: list[str] = Field(
        default_factory=list,
        description="受影響服務模式",
    )
    severity_range: list[str] = Field(
        default=["P1", "P2"],
        description="適用嚴重度範圍",
    )
    label_patterns: dict[str, str] = Field(
        default_factory=dict,
        description="標籤匹配 (regex)",
    )
    keywords: list[str] = Field(
        default_factory=list,
        description="關鍵字 (從 annotations 提取)",
    )
class RepairStep(BaseModel):
    """
    One step of a repair procedure.

    Supported execution types (see ActionType):
    - kubectl: Kubernetes command
    - script: script execution
    - manual: requires a human operator

    Unknown input fields are ignored (``extra="ignore"``).
    """

    model_config = ConfigDict(extra="ignore")

    step_number: int = Field(ge=1, description="步驟序號")
    action_type: ActionType = Field(description="執行類型")
    command: str = Field(description="執行命令或操作描述")
    expected_result: str | None = Field(default=None, description="預期結果")
    rollback_command: str | None = Field(default=None, description="回滾命令")
    requires_approval: bool = Field(default=False, description="是否需要人工審核")
    risk_level: RiskLevel = Field(default=RiskLevel.MEDIUM, description="風險等級")
# =============================================================================
# Core Model
# =============================================================================
def generate_playbook_id() -> str:
"""生成 Playbook ID"""
return f"PB-{datetime.now(UTC).strftime('%Y%m%d')}-{uuid4().hex[:6].upper()}"
class Playbook(BaseModel):
    """
    Repair playbook.

    Memory tiers named by the original design notes:
    - Working Memory (Redis): ``playbook:{playbook_id}``, 7-day TTL
    - Episodic Memory (PostgreSQL): ``playbooks`` table
    - Semantic Memory (Vector DB): vectorized symptom features (Phase 8+)

    Design references: ADR-003 (leWOOOgo modular architecture) and ADR-007
    (data retention policy).

    Unknown input fields are ignored (``extra="ignore"``).
    """

    model_config = ConfigDict(extra="ignore")

    # === Identity ===
    playbook_id: str = Field(
        default_factory=generate_playbook_id,
        description="Playbook 唯一識別碼",
    )

    # === Metadata ===
    name: str = Field(description="Playbook 名稱 (人類可讀)")
    description: str = Field(description="問題描述與修復策略摘要")
    status: PlaybookStatus = Field(default=PlaybookStatus.DRAFT)
    source: PlaybookSource = Field(default=PlaybookSource.EXTRACTED)

    # === Symptom pattern ===
    symptom_pattern: SymptomPattern = Field(
        default_factory=SymptomPattern,
        description="觸發此 Playbook 的症狀模式",
    )

    # === Repair steps ===
    repair_steps: list[RepairStep] = Field(
        default_factory=list,
        description="修復步驟列表",
    )
    estimated_duration_minutes: int = Field(
        default=5,
        ge=1,
        le=480,
        description="預估修復時間 (分鐘)",
    )

    # === Provenance ===
    source_incident_ids: list[str] = Field(
        default_factory=list,
        description="萃取來源的 Incident ID",
    )
    ai_confidence: float = Field(
        default=0.0,
        ge=0.0,
        le=1.0,
        description="AI 萃取信心度",
    )

    # === Execution statistics ===
    success_count: int = Field(default=0, ge=0, description="成功執行次數")
    failure_count: int = Field(default=0, ge=0, description="失敗執行次數")
    last_used_at: datetime | None = Field(default=None, description="最後使用時間")

    # === Human annotations ===
    approved_by: str | None = Field(default=None, description="核准者")
    approved_at: datetime | None = Field(default=None, description="核准時間")
    tags: list[str] = Field(default_factory=list, description="標籤")
    notes: str | None = Field(default=None, description="人工補充說明")

    # === Timeline ===
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

    @property
    def total_executions(self) -> int:
        """Total number of recorded executions (successes plus failures)."""
        return self.success_count + self.failure_count

    @property
    def success_rate(self) -> float:
        """Share of successful executions; 0.0 when nothing was recorded yet."""
        executions = self.total_executions
        if executions > 0:
            return self.success_count / executions
        return 0.0

    @property
    def is_high_quality(self) -> bool:
        """
        Whether this is a high-quality playbook (reference for #8 auto-upgrade).

        All of the following must hold:
        - status is APPROVED
        - success rate >= 95%
        - at least 10 successful executions
        """
        if self.status != PlaybookStatus.APPROVED:
            return False
        return self.success_rate >= 0.95 and self.success_count >= 10

    def to_redis_dict(self) -> dict[str, Any]:
        """Dump the model in JSON mode for storage in Redis."""
        return self.model_dump(mode="json")

    @classmethod
    def from_redis_dict(cls, data: dict[str, Any]) -> "Playbook":
        """Rebuild (and validate) a playbook from data read back from Redis."""
        return cls.model_validate(data)
# =============================================================================
# Response Models
# =============================================================================
class PlaybookRecommendation(BaseModel):
    """A recommended playbook with its match details."""

    model_config = ConfigDict(extra="ignore")

    playbook: Playbook
    similarity_score: float = Field(ge=0.0, le=1.0, description="相似度分數")
    matched_symptoms: list[str] = Field(
        default_factory=list,
        description="匹配的症狀",
    )
    reason: str = Field(description="推薦原因")
class PlaybookResponse(BaseModel):
    """Single-playbook response: the playbook plus its derived statistics."""

    playbook: Playbook
    success_rate: float = Field(ge=0.0, le=1.0)
    is_high_quality: bool

    @classmethod
    def from_playbook(cls, playbook: Playbook) -> "PlaybookResponse":
        """Build a response, copying the playbook's computed properties."""
        derived = {
            "success_rate": playbook.success_rate,
            "is_high_quality": playbook.is_high_quality,
        }
        return cls(playbook=playbook, **derived)
class PlaybookListResponse(BaseModel):
    """Paginated playbook list response."""

    # The playbooks on this page.
    items: list[PlaybookResponse]
    # Number of playbooks matching the filters, across all pages.
    total: int
    # Page size that was requested.
    limit: int
    # Offset that was requested.
    offset: int
# =============================================================================
# Request Models
# =============================================================================
class PlaybookCreateRequest(BaseModel):
    """Request to create a playbook manually; at least one repair step is required."""

    name: str = Field(min_length=1, max_length=256)
    description: str = Field(min_length=1, max_length=2000)
    symptom_pattern: SymptomPattern
    repair_steps: list[RepairStep] = Field(min_length=1)
    estimated_duration_minutes: int = Field(default=5, ge=1, le=480)
    tags: list[str] = Field(default_factory=list)
    notes: str | None = None
class PlaybookUpdateRequest(BaseModel):
    """Request to update a playbook; every field is optional and defaults to None."""

    name: str | None = Field(default=None, min_length=1, max_length=256)
    description: str | None = Field(default=None, min_length=1, max_length=2000)
    symptom_pattern: SymptomPattern | None = None
    repair_steps: list[RepairStep] | None = None
    estimated_duration_minutes: int | None = Field(default=None, ge=1, le=480)
    tags: list[str] | None = None
    notes: str | None = None
    status: PlaybookStatus | None = None
class PlaybookApproveRequest(BaseModel):
    """Request to approve a playbook."""

    # Identifier of the approver (1-128 characters).
    approved_by: str = Field(min_length=1, max_length=128)
    # Optional approval note (up to 1000 characters).
    notes: str | None = Field(default=None, max_length=1000)
class SymptomPatternRequest(BaseModel):
    """Symptom query request used by the recommendation endpoint."""

    alert_names: list[str] = Field(default_factory=list)
    affected_services: list[str] = Field(default_factory=list)
    severity: str | None = None
    keywords: list[str] = Field(default_factory=list)

    def to_symptom_pattern(self) -> SymptomPattern:
        """
        Convert the request into a SymptomPattern.

        A given severity becomes a one-element severity range; without one the
        range falls back to ``["P1", "P2"]``.
        """
        if self.severity:
            severities = [self.severity]
        else:
            severities = ["P1", "P2"]
        return SymptomPattern(
            alert_names=self.alert_names,
            affected_services=self.affected_services,
            severity_range=severities,
            keywords=self.keywords,
        )

View File

@@ -18,6 +18,11 @@ from uuid import UUID
from src.models.approval import ApprovalRequest, ApprovalRequestCreate, ApprovalStatus
from src.models.incident import Incident
from src.models.playbook import (
Playbook,
PlaybookStatus,
SymptomPattern,
)
@runtime_checkable
@@ -171,3 +176,70 @@ class IMetricsRepository(Protocol):
list[float]: 每小時成功率列表 (由舊到新)
"""
...
@runtime_checkable
class IPlaybookRepository(Protocol):
    """
    Playbook repository protocol.

    Responsibility: playbook CRUD operations (described as a PostgreSQL +
    Redis dual-layer store).
    Implementation: PlaybookRepository
    Version: v1.0
    Created: 2026-03-26 (Taipei time zone)
    Author: Claude Code (#7 Playbook extraction)
    """

    async def create(self, playbook: Playbook) -> Playbook:
        """Create a new playbook."""
        ...

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """Fetch a playbook by its ID."""
        ...

    async def update(self, playbook: Playbook) -> Playbook | None:
        """Update a playbook."""
        ...

    async def delete(self, playbook_id: str) -> bool:
        """Delete a playbook (soft delete: status becomes DEPRECATED)."""
        ...

    async def list_playbooks(
        self,
        status: PlaybookStatus | None = None,
        tags: list[str] | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> tuple[list[Playbook], int]:
        """
        List playbooks.

        Returns:
            (items, total_count)
        """
        ...

    async def find_by_symptoms(
        self,
        symptoms: SymptomPattern,
        top_k: int = 5,
        min_similarity: float = 0.5,
    ) -> list[tuple[Playbook, float]]:
        """
        Find playbooks whose symptom pattern is similar to ``symptoms``.

        Returns:
            list[(Playbook, similarity_score)]
        """
        ...

    async def update_stats(
        self,
        playbook_id: str,
        success: bool,
    ) -> bool:
        """Update the execution statistics."""
        ...

View File

@@ -0,0 +1,409 @@
"""
Playbook Repository - #7 Playbook 萃取
======================================
Playbook CRUD 操作 (Redis + PostgreSQL)
Phase 7.2: Repository 實作
建立時間: 2026-03-26 (台北時區)
建立者: Claude Code (#7 Playbook 萃取)
遵循 leWOOOgo 積木化原則:
- 實作 IPlaybookRepository Protocol
- Redis 為 Working Memory (7天 TTL)
- PostgreSQL 為 Episodic Memory
"""
import json
from datetime import UTC, datetime
import structlog
from src.core.redis_client import get_redis
from src.models.playbook import (
Playbook,
PlaybookStatus,
SymptomPattern,
)
from src.repositories.interfaces import IPlaybookRepository
logger = structlog.get_logger(__name__)
# Redis TTL: 7 days, expressed in seconds
PLAYBOOK_TTL_SECONDS = 7 * 24 * 60 * 60
# Redis key prefixes: the playbook record itself, then the index keys that are
# suffixed with an alert name or a service name respectively
PLAYBOOK_KEY_PREFIX = "playbook:"
PLAYBOOK_INDEX_ALERT_PREFIX = "playbook:index:alert:"
PLAYBOOK_INDEX_SERVICE_PREFIX = "playbook:index:service:"
def _calculate_jaccard_similarity(set_a: set, set_b: set) -> float:
"""計算 Jaccard 相似度"""
if not set_a and not set_b:
return 1.0
intersection = len(set_a & set_b)
union = len(set_a | set_b)
return intersection / union if union > 0 else 0.0
def calculate_symptom_similarity(
    pattern_a: SymptomPattern,
    pattern_b: SymptomPattern,
) -> float:
    """
    Compute the similarity of two symptom patterns.

    Algorithm: weighted sum of per-dimension scores.
    - alert_names: Jaccard similarity, weight 0.35 (most important)
    - affected_services: Jaccard similarity, weight 0.30
    - severity: 1.0 when the severity ranges share a value, else 0.0; weight 0.15
    - keywords: Jaccard similarity, weight 0.20

    Returns:
        float: similarity score between 0.0 and 1.0
    """
    severities_overlap = bool(
        set(pattern_a.severity_range) & set(pattern_b.severity_range)
    )
    # (weight, score) pairs, kept in the original summation order.
    weighted_scores = (
        (
            0.35,
            _calculate_jaccard_similarity(
                set(pattern_a.alert_names),
                set(pattern_b.alert_names),
            ),
        ),
        (
            0.30,
            _calculate_jaccard_similarity(
                set(pattern_a.affected_services),
                set(pattern_b.affected_services),
            ),
        ),
        (0.15, 1.0 if severities_overlap else 0.0),
        (
            0.20,
            _calculate_jaccard_similarity(
                set(pattern_a.keywords),
                set(pattern_b.keywords),
            ),
        ),
    )
    return sum(weight * score for weight, score in weighted_scores)
class PlaybookRepository:
    """
    Redis-backed playbook repository.

    Storage strategy:
    - Redis: Working Memory (fast reads, 7-day TTL refreshed on every write)
    - PostgreSQL: Episodic Memory (persistence; not implemented here — the
      original notes defer it to #7.5)

    Keys written by this class:
    - ``playbook:{playbook_id}``: the playbook serialized as JSON
    - ``playbook:index:alert:{alert_name}``: set of playbook IDs
    - ``playbook:index:service:{service}``: set of playbook IDs
    """

    # === CRUD Operations ===

    async def create(self, playbook: Playbook) -> Playbook:
        """
        Store a new playbook and register it in the lookup indexes.

        Raises:
            Exception: any error raised while writing the record is logged
                and re-raised. Index failures are only logged.
        """
        try:
            redis_client = get_redis()
            # Make sure the timestamps are populated.
            if not playbook.created_at:
                playbook.created_at = datetime.now(UTC)
            playbook.updated_at = datetime.now(UTC)
            await self._write(redis_client, playbook)
            await self._update_indexes(playbook)
            logger.info(
                "playbook_created",
                playbook_id=playbook.playbook_id,
                name=playbook.name,
            )
            return playbook
        except Exception as e:
            logger.error("playbook_create_failed", error=str(e))
            raise

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """
        Fetch a playbook by ID.

        Returns None when the key is absent and also when reading or parsing
        fails (the error is logged).
        """
        try:
            redis_client = get_redis()
            data = await redis_client.get(f"{PLAYBOOK_KEY_PREFIX}{playbook_id}")
            if data:
                return Playbook.from_redis_dict(json.loads(data))
            return None
        except Exception as e:
            logger.error("playbook_get_failed", playbook_id=playbook_id, error=str(e))
            return None

    async def update(self, playbook: Playbook) -> Playbook | None:
        """
        Overwrite an existing playbook and refresh its indexes.

        Returns:
            The stored playbook, or None when it does not exist or the write
            fails (the error is logged).
        """
        try:
            existing = await self.get_by_id(playbook.playbook_id)
            if not existing:
                return None
            playbook.updated_at = datetime.now(UTC)
            redis_client = get_redis()
            await self._write(redis_client, playbook)
            # Drop index entries for alert names / services that the new
            # pattern no longer contains, then register the current ones.
            await self._remove_stale_indexes(existing, playbook)
            await self._update_indexes(playbook)
            logger.info("playbook_updated", playbook_id=playbook.playbook_id)
            return playbook
        except Exception as e:
            logger.error(
                "playbook_update_failed",
                playbook_id=playbook.playbook_id,
                error=str(e),
            )
            return None

    async def delete(self, playbook_id: str) -> bool:
        """
        Soft-delete a playbook by changing its status to DEPRECATED.

        Returns:
            True when the deprecated playbook was persisted; False when the
            playbook does not exist or the write failed.
        """
        try:
            playbook = await self.get_by_id(playbook_id)
            if not playbook:
                return False
            playbook.status = PlaybookStatus.DEPRECATED
            playbook.updated_at = datetime.now(UTC)
            # update() reports failure by returning None; do not claim success
            # when the new status never reached Redis.
            if await self.update(playbook) is None:
                return False
            logger.info("playbook_deprecated", playbook_id=playbook_id)
            return True
        except Exception as e:
            logger.error(
                "playbook_delete_failed",
                playbook_id=playbook_id,
                error=str(e),
            )
            return False

    # === Query Operations ===

    async def list_playbooks(
        self,
        status: PlaybookStatus | None = None,
        tags: list[str] | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> tuple[list[Playbook], int]:
        """
        List playbooks, newest ``updated_at`` first.

        Note: this scans and loads every playbook key, which is inefficient;
        the original notes plan a later migration to PostgreSQL.

        Args:
            status: keep only playbooks with this status.
            tags: keep only playbooks sharing at least one of these tags.
            limit: page size.
            offset: number of matches to skip.

        Returns:
            (items, total_count); ``([], 0)`` when the scan itself fails.
        """
        try:
            redis_client = get_redis()
            pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*"
            # SCAN may return the same key more than once, so collect the
            # keys into a set to avoid duplicate items and an inflated total.
            keys = {
                key async for key in redis_client.scan_iter(match=pattern, count=100)
            }
            wanted_tags = set(tags) if tags else None

            matches: list[Playbook] = []
            for key in keys:
                data = await redis_client.get(key)
                if not data:
                    # The key expired between the scan and the read.
                    continue
                try:
                    playbook = Playbook.from_redis_dict(json.loads(data))
                except ValueError as e:
                    # One unreadable record must not hide all the others.
                    logger.warning(
                        "playbook_list_invalid_record",
                        key=str(key),
                        error=str(e),
                    )
                    continue
                if status and playbook.status != status:
                    continue
                if wanted_tags and not wanted_tags.intersection(playbook.tags):
                    continue
                matches.append(playbook)

            matches.sort(key=lambda p: p.updated_at, reverse=True)
            return matches[offset : offset + limit], len(matches)
        except Exception as e:
            logger.error("playbook_list_failed", error=str(e))
            return [], 0

    async def find_by_symptoms(
        self,
        symptoms: SymptomPattern,
        top_k: int = 5,
        min_similarity: float = 0.5,
    ) -> list[tuple[Playbook, float]]:
        """
        Find APPROVED playbooks whose symptom pattern resembles ``symptoms``.

        Strategy:
        1. Collect candidate IDs from the alert-name and service indexes;
           when nothing is indexed, fall back to up to 100 APPROVED playbooks.
        2. Score every candidate with calculate_symptom_similarity.
        3. Return the ``top_k`` candidates scoring at least ``min_similarity``.

        Returns:
            list[(Playbook, similarity_score)], best first; ``[]`` on error.
        """
        try:
            redis_client = get_redis()

            # 1. Candidate IDs from the indexes.
            index_keys = [
                f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
                for alert_name in symptoms.alert_names
            ] + [
                f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
                for service in symptoms.affected_services
            ]
            candidate_ids: set[str] = set()
            for index_key in index_keys:
                members = await redis_client.smembers(index_key)
                candidate_ids.update(
                    m.decode() if isinstance(m, bytes) else m for m in members
                )

            if candidate_ids:
                candidates: list[Playbook] = []
                for playbook_id in candidate_ids:
                    playbook = await self.get_by_id(playbook_id)
                    if playbook:
                        candidates.append(playbook)
            else:
                # No index hit: use the playbooks loaded by the scan directly
                # instead of fetching each one a second time by ID.
                candidates, _ = await self.list_playbooks(
                    status=PlaybookStatus.APPROVED,
                    limit=100,
                )

            # 2. Score the APPROVED candidates.
            results: list[tuple[Playbook, float]] = []
            for playbook in candidates:
                if playbook.status != PlaybookStatus.APPROVED:
                    continue
                similarity = calculate_symptom_similarity(
                    symptoms,
                    playbook.symptom_pattern,
                )
                if similarity >= min_similarity:
                    results.append((playbook, similarity))

            # 3. Best matches first.
            results.sort(key=lambda x: x[1], reverse=True)
            return results[:top_k]
        except Exception as e:
            logger.error("playbook_find_symptoms_failed", error=str(e))
            return []

    async def update_stats(
        self,
        playbook_id: str,
        success: bool,
    ) -> bool:
        """
        Record one execution outcome and the time of use.

        NOTE(review): this is a read-modify-write sequence without locking,
        so concurrent calls can lose an increment.

        Returns:
            True when the updated counters were persisted; False when the
            playbook does not exist or the write failed.
        """
        try:
            playbook = await self.get_by_id(playbook_id)
            if not playbook:
                return False
            if success:
                playbook.success_count += 1
            else:
                playbook.failure_count += 1
            playbook.last_used_at = datetime.now(UTC)
            # update() reports failure by returning None; surface that to the
            # caller instead of reporting a successful recording.
            if await self.update(playbook) is None:
                return False
            logger.info(
                "playbook_stats_updated",
                playbook_id=playbook_id,
                success=success,
                success_rate=playbook.success_rate,
            )
            return True
        except Exception as e:
            logger.error(
                "playbook_stats_update_failed",
                playbook_id=playbook_id,
                error=str(e),
            )
            return False

    # === Storage Helpers ===

    async def _write(self, redis_client, playbook: Playbook) -> None:
        """Serialize the playbook to JSON and store it with the standard TTL."""
        await redis_client.set(
            f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}",
            json.dumps(playbook.to_redis_dict(), ensure_ascii=False),
            ex=PLAYBOOK_TTL_SECONDS,
        )

    # === Index Management ===

    async def _update_indexes(self, playbook: Playbook) -> None:
        """
        Add the playbook ID to the index set of each of its alert names and
        services, refreshing each set's TTL. Best effort: failures are logged
        as warnings and not raised.
        """
        try:
            redis_client = get_redis()
            pattern = playbook.symptom_pattern
            index_keys = [
                f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
                for alert_name in pattern.alert_names
            ] + [
                f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
                for service in pattern.affected_services
            ]
            for index_key in index_keys:
                await redis_client.sadd(index_key, playbook.playbook_id)
                await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS)
        except Exception as e:
            logger.warning("playbook_index_update_failed", error=str(e))

    async def _remove_stale_indexes(self, previous: Playbook, current: Playbook) -> None:
        """
        Remove the playbook ID from index sets of alert names and services
        that ``previous`` had but ``current`` no longer has. Best effort:
        failures are logged as warnings and not raised.
        """
        try:
            redis_client = get_redis()
            old_pattern = previous.symptom_pattern
            new_pattern = current.symptom_pattern
            stale_keys = [
                f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
                for alert_name in set(old_pattern.alert_names)
                - set(new_pattern.alert_names)
            ] + [
                f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
                for service in set(old_pattern.affected_services)
                - set(new_pattern.affected_services)
            ]
            for index_key in stale_keys:
                await redis_client.srem(index_key, current.playbook_id)
        except Exception as e:
            logger.warning("playbook_index_cleanup_failed", error=str(e))
# =============================================================================
# Singleton
# =============================================================================
# Module-level cache for the shared repository instance; filled on first use.
_repository: PlaybookRepository | None = None


def get_playbook_repository() -> IPlaybookRepository:
    """Return the shared PlaybookRepository, creating it on the first call."""
    global _repository
    if _repository is not None:
        return _repository
    _repository = PlaybookRepository()
    return _repository

View File

@@ -0,0 +1,474 @@
"""
Playbook Service - #7 Playbook 萃取
===================================
Playbook 業務邏輯層
Phase 7.3: Service 實作
建立時間: 2026-03-26 (台北時區)
建立者: Claude Code (#7 Playbook 萃取)
遵循 leWOOOgo 積木化原則:
- Service 層只依賴 Repository Interface
- 不直接存取 Redis/DB
- 封裝所有業務邏輯
"""
from datetime import UTC, datetime
from typing import Protocol
import structlog
from src.models.incident import Incident, IncidentStatus
from src.models.playbook import (
ActionType,
Playbook,
PlaybookRecommendation,
PlaybookSource,
PlaybookStatus,
RepairStep,
RiskLevel,
SymptomPattern,
)
from src.repositories.interfaces import IPlaybookRepository
from src.repositories.playbook_repository import get_playbook_repository
logger = structlog.get_logger(__name__)
class IPlaybookService(Protocol):
    """Playbook service interface."""

    async def extract_from_incident(
        self,
        incident: Incident,
        auto_approve: bool = False,
    ) -> Playbook | None:
        """Extract a playbook from a successfully handled incident."""
        ...

    async def get_recommendations(
        self,
        symptoms: SymptomPattern,
        top_k: int = 3,
    ) -> list[PlaybookRecommendation]:
        """Return playbook recommendations for the given symptoms."""
        ...

    async def approve(
        self,
        playbook_id: str,
        approved_by: str,
        notes: str | None = None,
    ) -> Playbook | None:
        """Approve a playbook."""
        ...

    async def record_execution(
        self,
        playbook_id: str,
        success: bool,
    ) -> bool:
        """Record the outcome of a playbook execution."""
        ...
class PlaybookService:
"""
Playbook Service 實作
職責:
- 從 Incident 萃取 Playbook
- 提供 Playbook 推薦
- 管理 Playbook 生命週期
"""
def __init__(self, repository: IPlaybookRepository | None = None):
    """
    Create the service.

    Args:
        repository: repository to use; when omitted, the shared instance
            from get_playbook_repository() is used.
    """
    # Compare against None explicitly: `repository or ...` would silently
    # replace an injected repository that evaluates as falsy (for example an
    # empty in-memory fake that defines __len__).
    if repository is None:
        repository = get_playbook_repository()
    self._repository = repository
# === Core Operations ===
async def extract_from_incident(
    self,
    incident: Incident,
    auto_approve: bool = False,
) -> Playbook | None:
    """
    Extract a playbook from a successfully handled incident.

    Preconditions (None is returned when any of them fails):
    - the incident status is RESOLVED or CLOSED
    - outcome.execution_success is truthy
    - outcome.effectiveness_score >= 4

    Args:
        incident: source incident.
        auto_approve: store the playbook as APPROVED instead of DRAFT, but
            only when the confidence is at least 0.9 and at least one repair
            step was extracted.

    Returns:
        The stored playbook, or None when a precondition is not met.
    """
    # 1. Validate the preconditions.
    if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
        logger.warning(
            "playbook_extract_invalid_status",
            incident_id=incident.incident_id,
            status=incident.status,
        )
        return None

    if not incident.outcome or not incident.outcome.execution_success:
        logger.warning(
            "playbook_extract_no_successful_outcome",
            incident_id=incident.incident_id,
        )
        return None

    effectiveness = incident.outcome.effectiveness_score or 0
    if effectiveness < 4:
        logger.info(
            "playbook_extract_low_effectiveness",
            incident_id=incident.incident_id,
            effectiveness=effectiveness,
        )
        return None

    # 2-5. Derive the playbook content from the incident.
    symptom_pattern = self._extract_symptom_pattern(incident)
    repair_steps = self._extract_repair_steps(incident)
    confidence = self._calculate_confidence(incident, effectiveness)
    name = self._generate_name(incident)
    description = self._generate_description(incident)

    # A playbook without any repair step has nothing to execute, so it must
    # never skip human review: keep it as DRAFT even when auto-approval was
    # requested and the confidence is high.
    auto_approved = auto_approve and confidence >= 0.9 and bool(repair_steps)

    # 6. Build the playbook.
    playbook = Playbook(
        name=name,
        description=description,
        status=PlaybookStatus.APPROVED if auto_approved else PlaybookStatus.DRAFT,
        source=PlaybookSource.EXTRACTED,
        symptom_pattern=symptom_pattern,
        repair_steps=repair_steps,
        source_incident_ids=[incident.incident_id],
        ai_confidence=confidence,
        tags=self._extract_tags(incident),
    )

    # 7. Persist it.
    playbook = await self._repository.create(playbook)
    logger.info(
        "playbook_extracted",
        playbook_id=playbook.playbook_id,
        incident_id=incident.incident_id,
        confidence=confidence,
        auto_approved=playbook.status == PlaybookStatus.APPROVED,
    )
    return playbook
async def get_recommendations(
    self,
    symptoms: SymptomPattern,
    top_k: int = 3,
) -> list[PlaybookRecommendation]:
    """
    Return playbook recommendations for the given symptoms.

    Strategy:
    1. Ask the repository for up to ``top_k * 2`` playbooks with a
       similarity of at least 0.4.
    2. Rank them by ``similarity_score * (0.5 + 0.5 * success_rate)``.
    3. Return the best ``top_k``.
    """
    # Fetch more than needed so the re-ranking below has room to work with.
    candidates = await self._repository.find_by_symptoms(
        symptoms=symptoms,
        top_k=top_k * 2,
        min_similarity=0.4,
    )
    if not candidates:
        return []

    def build(playbook: Playbook, similarity: float) -> PlaybookRecommendation:
        # Determine which symptoms matched, then explain the recommendation.
        matched = self._find_matched_symptoms(symptoms, playbook.symptom_pattern)
        explanation = self._generate_recommendation_reason(
            playbook,
            similarity,
            matched,
        )
        return PlaybookRecommendation(
            playbook=playbook,
            similarity_score=similarity,
            matched_symptoms=matched,
            reason=explanation,
        )

    def combined_score(item: PlaybookRecommendation) -> float:
        return item.similarity_score * (0.5 + 0.5 * item.playbook.success_rate)

    ranked = sorted(
        (build(playbook, similarity) for playbook, similarity in candidates),
        key=combined_score,
        reverse=True,
    )
    return ranked[:top_k]
async def approve(
    self,
    playbook_id: str,
    approved_by: str,
    notes: str | None = None,
) -> Playbook | None:
    """
    Approve a DRAFT playbook.

    Sets the status to APPROVED, records the approver and the approval time,
    and replaces the notes when non-empty notes are given.

    Returns:
        The repository's update result, or None when the playbook does not
        exist or is not in DRAFT status.
    """
    target = await self._repository.get_by_id(playbook_id)
    if not target:
        return None

    if target.status != PlaybookStatus.DRAFT:
        logger.warning(
            "playbook_approve_invalid_status",
            playbook_id=playbook_id,
            current_status=target.status,
        )
        return None

    target.status = PlaybookStatus.APPROVED
    target.approved_by = approved_by
    target.approved_at = datetime.now(UTC)
    if notes:
        target.notes = notes

    saved = await self._repository.update(target)
    if not saved:
        return saved
    logger.info(
        "playbook_approved",
        playbook_id=playbook_id,
        approved_by=approved_by,
    )
    return saved
async def record_execution(
    self,
    playbook_id: str,
    success: bool,
) -> bool:
    """Record a playbook execution outcome via the repository's update_stats."""
    recorded = await self._repository.update_stats(playbook_id, success)
    return recorded
# === CRUD Proxies ===
async def get_by_id(self, playbook_id: str) -> Playbook | None:
    """Fetch a playbook by ID from the repository."""
    found = await self._repository.get_by_id(playbook_id)
    return found
async def list_playbooks(
    self,
    status: PlaybookStatus | None = None,
    tags: list[str] | None = None,
    limit: int = 20,
    offset: int = 0,
) -> tuple[list[Playbook], int]:
    """List playbooks; all arguments are forwarded to the repository unchanged."""
    filters = {"status": status, "tags": tags}
    paging = {"limit": limit, "offset": offset}
    return await self._repository.list_playbooks(**filters, **paging)
async def update(self, playbook: Playbook) -> Playbook | None:
    """Persist a modified playbook through the repository."""
    saved = await self._repository.update(playbook)
    return saved
async def delete(self, playbook_id: str) -> bool:
    """Delete a playbook by delegating to the repository.

    NOTE(review): the original docstring describes this as a soft delete;
    that behavior lives in the repository and is not visible here - confirm.
    """
    removed = await self._repository.delete(playbook_id)
    return removed
# === Private Helpers ===
def _extract_symptom_pattern(self, incident: Incident) -> SymptomPattern:
    """Build a symptom pattern from an incident.

    The pattern contains every signal's alert name, the incident's
    affected services, a one-element severity range (falling back to
    ``"P2"`` when the incident has no severity) and up to 10 keywords
    taken from string annotation values shorter than 50 characters.
    """
    signals = incident.signals or []
    names = [sig.alert_name for sig in signals]

    # Short annotation strings are used as keywords; other values are ignored.
    short_texts = []
    for sig in signals:
        if not sig.annotations:
            continue
        short_texts.extend(
            text
            for text in sig.annotations.values()
            if isinstance(text, str) and len(text) < 50
        )

    if incident.severity:
        severities = [incident.severity.value]
    else:
        severities = ["P2"]

    return SymptomPattern(
        alert_names=names,
        affected_services=incident.affected_services or [],
        severity_range=severities,
        keywords=short_texts[:10],  # keep at most 10 keywords
    )
def _extract_repair_steps(self, incident: Incident) -> list[RepairStep]:
    """Derive repair steps from an incident.

    Decision-chain entries with an executed action are turned into steps,
    numbered by their 1-based position in the chain (entries without an
    action are skipped, so numbers may have gaps). When that yields
    nothing, the outcome's repair action, if any, becomes a single step.
    Every step is created with ``ActionType.KUBECTL`` and
    ``RiskLevel.MEDIUM``.
    """
    chain = incident.decision_chain
    extracted: list[RepairStep] = []
    if chain:
        extracted = [
            RepairStep(
                step_number=position,
                action_type=ActionType.KUBECTL,
                command=entry.executed_action,
                expected_result=entry.result or None,
                risk_level=RiskLevel.MEDIUM,
            )
            for position, entry in enumerate(chain.steps, 1)
            if entry.executed_action
        ]
    if extracted:
        return extracted

    # Fallback: nothing usable in the decision chain, try the outcome.
    outcome = incident.outcome
    if outcome and outcome.repair_action:
        extracted.append(
            RepairStep(
                step_number=1,
                action_type=ActionType.KUBECTL,
                command=outcome.repair_action,
                risk_level=RiskLevel.MEDIUM,
            )
        )
    return extracted
def _calculate_confidence(self, incident: Incident, effectiveness: int) -> float:
    """Compute the extraction confidence score, clamped to [0.0, 1.0].

    Args:
        incident: Incident the playbook is extracted from; only its
            ``decision_chain`` and ``signals`` are inspected.
        effectiveness: Effectiveness rating of the fix. Each point above 3
            adds 0.2 and each point below 3 subtracts 0.2.

    Returns:
        A score starting at 0.5, plus 0.1 when the decision chain has
        steps, plus 0.05 when there are at least two signals, plus the
        effectiveness contribution, limited to the range 0.0-1.0.
    """
    base_score = 0.5
    # Effectiveness contribution (ratings 4-5 give +0.2 to +0.4).
    effectiveness_bonus = (effectiveness - 3) * 0.2
    # A recorded decision chain adds confidence.
    if incident.decision_chain and incident.decision_chain.steps:
        base_score += 0.1
    # Two or more signals means more supporting data.
    if incident.signals and len(incident.signals) >= 2:
        base_score += 0.05
    # Clamp on both sides: very low ratings would otherwise go negative.
    return max(0.0, min(base_score + effectiveness_bonus, 1.0))
def _generate_name(self, incident: Incident) -> str:
    """Build a playbook name from the first alert and up to two services.

    Falls back to ``"Unknown"`` when the incident has no signals and to
    ``"system"`` when it has no affected services.
    """
    if incident.signals:
        headline = incident.signals[0].alert_name
    else:
        headline = "Unknown"

    leading_services = (incident.affected_services or [])[:2]
    scope = "/".join(leading_services) if leading_services else "system"
    return f"{headline} - {scope} 修復劇本"
def _generate_description(self, incident: Incident) -> str:
    """Build a playbook description from the incident.

    Joins up to three fragments with ``". "``: the first alert name, the
    affected services, and the first 100 characters of the outcome's
    repair action. Returns a fixed default text when none is available.
    """
    trigger = None
    if incident.signals:
        trigger = f"觸發告警: {incident.signals[0].alert_name}"

    impact = None
    if incident.affected_services:
        impact = f"影響服務: {', '.join(incident.affected_services)}"

    repair = None
    if incident.outcome and incident.outcome.repair_action:
        repair = f"修復動作: {incident.outcome.repair_action[:100]}"

    fragments = [text for text in (trigger, impact, repair) if text is not None]
    if not fragments:
        return "從成功案例自動萃取的修復劇本"
    return ". ".join(fragments)
def _extract_tags(self, incident: Incident) -> list[str]:
    """Derive up to 10 unique tags from an incident, in a stable order.

    Tags are the lower-cased affected service names followed by category
    tags inferred from alert names (``cpu``, ``memory``, ``kubernetes``
    for alerts containing "pod", and ``network``). Duplicates are dropped
    and first-seen order is kept, so the result and the 10-item
    truncation do not depend on set iteration order.

    Args:
        incident: Incident to inspect; ``affected_services`` and
            ``signals`` may be empty or ``None``.

    Returns:
        At most 10 distinct tags in first-seen order.
    """
    # (substring looked for in the lower-cased alert name, tag to emit)
    alert_keywords = (
        ("cpu", "cpu"),
        ("memory", "memory"),
        ("pod", "kubernetes"),
        ("network", "network"),
    )

    # A dict serves as an insertion-ordered set.
    ordered: dict[str, None] = {}

    for service in incident.affected_services or []:
        ordered[service.lower()] = None

    for signal in incident.signals or []:
        alert = signal.alert_name.lower()  # lower-case once per signal
        for needle, tag in alert_keywords:
            if needle in alert:
                ordered[tag] = None

    return list(ordered)[:10]
def _find_matched_symptoms(
    self,
    query: SymptomPattern,
    playbook_pattern: SymptomPattern,
) -> list[str]:
    """List the symptoms shared by a query and a playbook's pattern.

    Args:
        query: Symptom pattern being searched for.
        playbook_pattern: Symptom pattern stored on a playbook.

    Returns:
        Human-readable entries in a deterministic order: one
        ``"Alert: ..."`` per shared alert name and one ``"Service: ..."``
        per shared service (both in query order, duplicates removed),
        then at most one ``"Severity: ..."`` naming the first query
        severity that also appears in the playbook's severity range.
    """
    known_alerts = set(playbook_pattern.alert_names)
    known_services = set(playbook_pattern.affected_services)
    known_severities = set(playbook_pattern.severity_range)

    # dict.fromkeys de-duplicates while keeping the query's order.
    matched = [
        f"Alert: {alert}"
        for alert in dict.fromkeys(query.alert_names)
        if alert in known_alerts
    ]
    matched.extend(
        f"Service: {service}"
        for service in dict.fromkeys(query.affected_services)
        if service in known_services
    )

    # Report the severity that actually overlaps, not merely the first
    # severity of the query (which may not be part of the overlap).
    for severity in query.severity_range:
        if severity in known_severities:
            matched.append(f"Severity: {severity}")
            break

    return matched
def _generate_recommendation_reason(
    self,
    playbook: Playbook,
    similarity: float,
    matched_symptoms: list[str],
) -> str:
    """Compose the text explaining why a playbook is recommended.

    Always states the similarity as a percentage; adds the success rate
    and the execution count when each is greater than zero, and the first
    three matched symptoms when any exist. Segments are joined with
    ``". "``.
    """
    segments = [f"相似度 {similarity:.0%}"]

    rate = playbook.success_rate
    if rate > 0:
        segments.append(f"成功率 {rate:.0%}")

    runs = playbook.total_executions
    if runs > 0:
        segments.append(f"已執行 {runs}")

    if matched_symptoms:
        top_matches = ", ".join(matched_symptoms[:3])
        segments.append(f"匹配: {top_matches}")

    return ". ".join(segments)
# =============================================================================
# Singleton
# =============================================================================
# Lazily created module-level PlaybookService instance.
_service: PlaybookService | None = None


def get_playbook_service() -> IPlaybookService:
    """Return the shared PlaybookService, creating it on first use."""
    global _service
    if _service is not None:
        return _service
    _service = PlaybookService()
    return _service