feat(api): Phase 7.5-7.6 Playbook 整合決策與自動萃取

Phase 7.5: DecisionManager 三軌決策
- 新增 Playbook 優先匹配 (similarity >= 85%)
- 三軌決策順序: Playbook > LLM > Expert System
- 整合 PlaybookService 推薦引擎

Phase 7.6: 自動萃取機制
- approval_execution.py 成功執行後觸發萃取
- 條件: RESOLVED/CLOSED + effectiveness >= 4
- 滿分 (5) 自動核准 Playbook

測試:
- 13 個 Playbook 單元測試全部通過
- 修復 Incident 模型欄位對應 (reasoning_steps)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-26 11:09:25 +08:00
parent 6f99113888
commit 2e75a20150
4 changed files with 658 additions and 29 deletions

View File

@@ -8,9 +8,11 @@ Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
- ApprovalDBService: 更新狀態
- TimelineService: 記錄事件
- NotificationManager: 發送通知
- Phase 7.6: Playbook 自動萃取
版本: v1.0
版本: v1.1
建立: 2026-03-25 (台北時區)
更新: 2026-03-26 (Phase 7.6 自動萃取)
建立者: Claude Code (Phase 16 R4.2)
"""
@@ -144,6 +146,11 @@ class ApprovalExecutionService:
duration_ms=result.duration_ms,
)
)
# Phase 7.6: 觸發 Playbook 自動萃取 (fire-and-forget)
asyncio.create_task(
self._trigger_playbook_extraction(approval)
)
else:
logger.error(
"background_execution_failed",
@@ -250,6 +257,134 @@ class ApprovalExecutionService:
error=str(e),
)
async def _trigger_playbook_extraction(
    self,
    approval: ApprovalRequest,
) -> None:
    """
    Phase 7.6: trigger automatic Playbook extraction after a successful execution.

    Extraction is attempted only when all of the following hold:
    - an incident ID can be derived from the approval,
    - that incident exists and its status is RESOLVED or CLOSED,
    - the incident outcome has an effectiveness_score >= 4.

    A score of 5 or more asks the Playbook service to auto-approve the
    extracted Playbook.

    This coroutine is intended to run fire-and-forget: every exception is
    caught and logged as a warning so a failed extraction never affects
    the main execution flow.

    Args:
        approval: The approval request whose execution just succeeded.
    """
    try:
        # 1. Resolve the incident linked to this approval.
        incident_id = self._extract_incident_id_from_approval(approval)
        if not incident_id:
            logger.debug(
                "playbook_extraction_skip",
                approval_id=str(approval.id),
                reason="No incident_id found",
            )
            return

        # 2. Load the incident. Project imports stay function-local, as in
        #    the original (presumably to avoid an import cycle - TODO confirm).
        from src.services.incident_service import get_incident_service

        incident_service = get_incident_service()
        incident = await incident_service.get_incident(incident_id)
        if not incident:
            logger.debug(
                "playbook_extraction_skip",
                approval_id=str(approval.id),
                incident_id=incident_id,
                reason="Incident not found",
            )
            return

        # 3. Only resolved/closed incidents are candidates for extraction.
        from src.models.incident import IncidentStatus

        if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
            logger.debug(
                "playbook_extraction_skip",
                approval_id=str(approval.id),
                incident_id=incident_id,
                incident_status=incident.status.value,
                reason="Incident not resolved",
            )
            return

        # 4. Require a sufficiently effective outcome. A missing outcome or
        #    an unset (None) score counts as 0, so it is skipped here instead
        #    of raising TypeError on the comparison below.
        outcome = incident.outcome
        effectiveness = (outcome.effectiveness_score if outcome else 0) or 0
        if effectiveness < 4:
            logger.debug(
                "playbook_extraction_skip",
                approval_id=str(approval.id),
                incident_id=incident_id,
                effectiveness=effectiveness,
                reason="Low effectiveness score",
            )
            return

        # 5. Run the extraction; a perfect score requests auto-approval.
        from src.services.playbook_service import get_playbook_service

        playbook_service = get_playbook_service()
        playbook = await playbook_service.extract_from_incident(
            incident=incident,
            auto_approve=effectiveness >= 5,
        )

        if playbook:
            logger.info(
                "playbook_auto_extracted",
                approval_id=str(approval.id),
                incident_id=incident_id,
                playbook_id=playbook.playbook_id,
                playbook_name=playbook.name,
                auto_approved=playbook.status.value == "approved",
            )
        else:
            logger.debug(
                "playbook_extraction_no_result",
                approval_id=str(approval.id),
                incident_id=incident_id,
            )
    except Exception as e:
        # Best effort by design: an extraction failure must not propagate.
        logger.warning(
            "playbook_extraction_error",
            approval_id=str(approval.id),
            error=str(e),
        )
def _extract_incident_id_from_approval(
    self,
    approval: ApprovalRequest,
) -> str | None:
    """
    Extract the incident ID associated with an approval request.

    Sources are searched in this order and the first match wins:
    1. ``approval.description`` and ``approval.action`` (joined by a space)
    2. ``approval.requested_by``

    An ID is ``INC-`` followed by one or more hyphen-separated groups of
    upper-case letters and digits. A trailing hyphen is never part of the
    ID, so "INC-2024-001-restart" yields "INC-2024-001".

    Args:
        approval: The approval request to inspect.

    Returns:
        The full ID including the ``INC-`` prefix, or None when no source
        contains one.
    """
    import re

    # Each group must contain at least one character, which keeps a dangling
    # "-" (e.g. before a lower-case suffix or punctuation) out of the ID.
    incident_id_pattern = r"INC-[A-Z0-9]+(?:-[A-Z0-9]+)*"

    candidates = (
        f"{approval.description or ''} {approval.action or ''}",
        approval.requested_by or "",
    )
    for text in candidates:
        match = re.search(incident_id_pattern, text)
        if match:
            return match.group(0)

    return None
# =============================================================================
# Singleton Instance

View File

@@ -30,10 +30,15 @@ import structlog
from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
logger = structlog.get_logger(__name__)
# Phase 7.5: Playbook 優先閾值
PLAYBOOK_SIMILARITY_THRESHOLD = 0.85 # 相似度 >= 85% 直接使用 Playbook
# =============================================================================
# Telegram 推送 (Phase 6.5: 決策就緒通知)
@@ -394,13 +399,20 @@ class DecisionManager:
incident: Incident,
) -> dict[str, Any]:
"""
雙軌決策分析
三軌決策分析 (Phase 7.5 升級)
策略:
- 同時啟動 LLM 和 Expert System
- LLM 成功則用 LLM (更智能)
- LLM 失敗則用 Expert System (保底)
1. 先檢查 Playbook 是否有高度匹配 (similarity >= 85%)
2. Playbook 命中則直接使用 (最快、經驗驗證)
3. 否則 LLM + Expert System 雙軌
優先順序: Playbook > LLM > Expert System
"""
# Phase 7.5: 先嘗試 Playbook 匹配
playbook_result = await self._try_playbook_match(incident)
if playbook_result:
return playbook_result
# Expert System 同步執行 (立即可用)
expert_result = expert_analyze(incident)
@@ -440,6 +452,108 @@ class DecisionManager:
)
return expert_result
async def _try_playbook_match(
    self,
    incident: Incident,
) -> dict[str, Any] | None:
    """
    Phase 7.5: try to answer the incident from a stored Playbook.

    The top recommendation is used only when:
    - its similarity is >= PLAYBOOK_SIMILARITY_THRESHOLD,
    - its success rate is >= 80% (checked only when it has execution history),
    - it has at least one repair step to turn into a command.

    NOTE(review): approval status is not checked here; presumably
    get_recommendations() only returns APPROVED playbooks - confirm.

    Args:
        incident: The incident being analysed.

    Returns:
        A proposal dict with ``source="playbook"`` on a usable match,
        otherwise None. None is also returned when the lookup raises, so
        the caller can fall back to the LLM / Expert System path.
    """
    try:
        playbook_service = get_playbook_service()

        # Build the symptom pattern describing this incident.
        signals = incident.signals or []
        symptoms = SymptomPattern(
            alert_names=[signal.alert_name for signal in signals],
            affected_services=incident.affected_services or [],
            severity_range=[incident.severity.value] if incident.severity else ["P2"],
        )

        # Only the single best recommendation is considered.
        recommendations = await playbook_service.get_recommendations(
            symptoms=symptoms,
            top_k=1,
        )
        if not recommendations:
            logger.debug(
                "playbook_no_match",
                incident_id=incident.incident_id,
            )
            return None

        best_match = recommendations[0]
        playbook = best_match.playbook

        if best_match.similarity_score < PLAYBOOK_SIMILARITY_THRESHOLD:
            logger.debug(
                "playbook_similarity_below_threshold",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                similarity=best_match.similarity_score,
                threshold=PLAYBOOK_SIMILARITY_THRESHOLD,
            )
            return None

        # The success rate only means something once the playbook has run.
        has_history = playbook.total_executions > 0
        if has_history and playbook.success_rate < 0.8:
            logger.debug(
                "playbook_low_success_rate",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                success_rate=playbook.success_rate,
            )
            return None

        # Without a repair step there is no command to propose; returning a
        # proposal with an empty action would pre-empt the LLM / Expert
        # System fallback, so treat this as "no match".
        if not playbook.repair_steps:
            logger.debug(
                "playbook_no_repair_steps",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
            )
            return None

        first_step = playbook.repair_steps[0]
        target = incident.affected_services[0] if incident.affected_services else "unknown"
        # Substitute only the literal "{target}" placeholder. str.format()
        # raises on any other brace in the command (JSON patches, jsonpath
        # expressions), which previously discarded an otherwise valid match.
        kubectl_command = first_step.command.replace("{target}", target)

        logger.info(
            "playbook_match_success",
            incident_id=incident.incident_id,
            playbook_id=playbook.playbook_id,
            playbook_name=playbook.name,
            similarity=best_match.similarity_score,
            success_rate=playbook.success_rate,
        )

        # With history, confidence is capped by the observed success rate.
        if has_history:
            confidence = min(best_match.similarity_score, playbook.success_rate)
        else:
            confidence = best_match.similarity_score

        return {
            "source": "playbook",
            "playbook_id": playbook.playbook_id,
            "playbook_name": playbook.name,
            "action": kubectl_command,
            "kubectl_command": kubectl_command,
            "description": playbook.description,
            "risk_level": first_step.risk_level.value.lower(),
            "reasoning": f"Playbook 匹配 ({best_match.similarity_score:.0%} 相似度, {playbook.success_rate:.0%} 成功率): {best_match.reason}",
            "confidence": confidence,
            "matched_symptoms": best_match.matched_symptoms,
            "from_cache": False,
        }
    except Exception as e:
        # A playbook lookup failure must never block the decision pipeline.
        logger.warning(
            "playbook_match_error",
            incident_id=incident.incident_id,
            error=str(e),
        )
        return None
async def _find_existing_token(
self,
incident_id: str,

View File

@@ -322,31 +322,39 @@ class PlaybookService:
def _extract_repair_steps(self, incident: Incident) -> list[RepairStep]:
"""從 Incident 萃取修復步驟"""
steps: list[RepairStep] = []
step_number = 1
# 從 decision_chain 提取
if incident.decision_chain:
for i, step in enumerate(incident.decision_chain.steps, 1):
if step.executed_action:
steps.append(
RepairStep(
step_number=i,
action_type=ActionType.KUBECTL,
command=step.executed_action,
expected_result=step.result or None,
risk_level=RiskLevel.MEDIUM,
# 從 decision_chain.reasoning_steps 提取 kubectl 命令
if incident.decision_chain and incident.decision_chain.reasoning_steps:
for reasoning in incident.decision_chain.reasoning_steps:
# 尋找包含 kubectl 的步驟
if "kubectl" in reasoning.lower():
# 嘗試提取 kubectl 命令
import re
kubectl_match = re.search(r"kubectl\s+\S+.*", reasoning)
if kubectl_match:
steps.append(
RepairStep(
step_number=step_number,
action_type=ActionType.KUBECTL,
command=kubectl_match.group(0).strip(),
risk_level=RiskLevel.MEDIUM,
)
)
)
step_number += 1
# 如果沒有從 decision_chain 取得,嘗試從 outcome 取得
if not steps and incident.outcome and incident.outcome.repair_action:
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.KUBECTL,
command=incident.outcome.repair_action,
risk_level=RiskLevel.MEDIUM,
# 如果沒有從 reasoning_steps 取得,嘗試從 learning_notes 取得
if not steps and incident.outcome and incident.outcome.learning_notes:
notes = incident.outcome.learning_notes
if "kubectl" in notes.lower():
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.KUBECTL,
command=notes,
risk_level=RiskLevel.MEDIUM,
)
)
)
return steps
@@ -358,7 +366,7 @@ class PlaybookService:
effectiveness_bonus = (effectiveness - 3) * 0.2
# 有 decision_chain 加分
if incident.decision_chain and incident.decision_chain.steps:
if incident.decision_chain and incident.decision_chain.reasoning_steps:
base_score += 0.1
# 有多個 signals 加分 (更多資料)
@@ -385,8 +393,9 @@ class PlaybookService:
if incident.affected_services:
parts.append(f"影響服務: {', '.join(incident.affected_services)}")
if incident.outcome and incident.outcome.repair_action:
parts.append(f"修復動作: {incident.outcome.repair_action[:100]}")
# 從 decision_chain.hypothesis 取得 AI 分析結果
if incident.decision_chain and incident.decision_chain.hypothesis:
parts.append(f"AI 分析: {incident.decision_chain.hypothesis[:100]}")
return ". ".join(parts) if parts else "從成功案例自動萃取的修復劇本"

View File

@@ -0,0 +1,371 @@
"""
Playbook Service Tests - #7 Playbook 萃取
==========================================
測試 Playbook 服務層功能
版本: v1.0
建立: 2026-03-26 (台北時區)
建立者: Claude Code (Phase 7.5-7.6)
"""
from datetime import UTC, datetime
import pytest
from src.models.incident import (
Incident,
IncidentOutcome,
IncidentStatus,
Severity,
Signal,
)
from src.models.playbook import (
ActionType,
Playbook,
PlaybookStatus,
RepairStep,
RiskLevel,
SymptomPattern,
)
from src.services.playbook_service import PlaybookService
class MockPlaybookRepository:
    """In-memory stand-in for the playbook repository used by these tests."""

    def __init__(self):
        # Maps playbook_id -> Playbook.
        self._playbooks: dict[str, Playbook] = {}

    async def create(self, playbook: Playbook) -> Playbook:
        """Store the playbook under its ID and hand it back."""
        self._playbooks[playbook.playbook_id] = playbook
        return playbook

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """Return the stored playbook, or None for an unknown ID."""
        return self._playbooks.get(playbook_id)

    async def update(self, playbook: Playbook) -> Playbook | None:
        """Replace an existing playbook; return None if it was never stored."""
        key = playbook.playbook_id
        if key not in self._playbooks:
            return None
        self._playbooks[key] = playbook
        return playbook

    async def delete(self, playbook_id: str) -> bool:
        """Soft delete: mark the playbook DEPRECATED instead of removing it."""
        if playbook_id not in self._playbooks:
            return False
        self._playbooks[playbook_id].status = PlaybookStatus.DEPRECATED
        return True

    async def list_playbooks(
        self,
        status: PlaybookStatus | None = None,
        tags: list[str] | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> tuple[list[Playbook], int]:
        """Return one page of playbooks plus the total count after filtering."""
        matching = [
            candidate
            for candidate in self._playbooks.values()
            if (not status or candidate.status == status)
            and (not tags or any(tag in candidate.tags for tag in tags))
        ]
        page = matching[offset : offset + limit]
        return page, len(matching)

    async def find_by_symptoms(
        self,
        symptoms: SymptomPattern,
        top_k: int = 3,
        min_similarity: float = 0.4,
    ) -> list[tuple[Playbook, float]]:
        """Return up to top_k APPROVED playbooks, most similar first."""
        scored = [
            (candidate, score)
            for candidate in self._playbooks.values()
            if candidate.status == PlaybookStatus.APPROVED
            and (score := self._calculate_similarity(symptoms, candidate.symptom_pattern))
            >= min_similarity
        ]
        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
        return ranked[:top_k]

    async def update_stats(self, playbook_id: str, success: bool) -> bool:
        """Bump the success or failure counter; False for an unknown ID."""
        entry = self._playbooks.get(playbook_id)
        if not entry:
            return False
        if success:
            entry.success_count += 1
        else:
            entry.failure_count += 1
        return True

    def _calculate_similarity(self, query: SymptomPattern, target: SymptomPattern) -> float:
        """Simplified Jaccard-style score: alerts and services weigh 0.5 each."""
        query_alerts, target_alerts = set(query.alert_names), set(target.alert_names)
        query_services = set(query.affected_services)
        target_services = set(target.affected_services)

        alert_union = query_alerts | target_alerts
        service_union = query_services | target_services
        if not alert_union and not service_union:
            return 0.0

        score = 0.0
        if alert_union:
            score += 0.5 * (len(query_alerts & target_alerts) / len(alert_union))
        if service_union:
            score += 0.5 * (len(query_services & target_services) / len(service_union))
        return score
def create_test_incident(
    incident_id: str = "INC-TEST-001",
    status: IncidentStatus = IncidentStatus.RESOLVED,
    effectiveness_score: int = 5,
) -> Incident:
    """Build an Incident fixture with one signal, a decision chain and an outcome.

    By default the incident is RESOLVED with an effectiveness score of 5;
    both values can be overridden per test.
    """
    from src.models.incident import AIDecisionChain

    timestamp = datetime.now(UTC)

    high_cpu_signal = Signal(
        alert_name="HighCPU",
        severity=Severity.P1,
        source="prometheus",
        fired_at=timestamp,
        labels={"namespace": "prod"},
        annotations={"description": "CPU usage above 90%"},
    )

    # The last reasoning step carries the kubectl command.
    decision_chain = AIDecisionChain(
        model_used="ollama/llama3.2:latest",
        hypothesis="High CPU usage detected, likely due to resource leak",
        confidence=0.85,
        reasoning_steps=[
            "Detected HighCPU alert from prometheus",
            "Service test-service affected",
            "Recommended action: kubectl rollout restart deployment/test-service",
        ],
        inference_started_at=timestamp,
        inference_completed_at=timestamp,
        latency_ms=150,
    )

    outcome = IncidentOutcome(
        proposal_executed=True,
        execution_success=True,
        effectiveness_score=effectiveness_score,
        learning_notes="kubectl rollout restart deployment/test-service",
    )

    return Incident(
        incident_id=incident_id,
        status=status,
        severity=Severity.P1,
        affected_services=["test-service", "api-gateway"],
        signals=[high_cpu_signal],
        decision_chain=decision_chain,
        outcome=outcome,
    )
def create_test_playbook(
    playbook_id: str = "PB-TEST-001",
    status: PlaybookStatus = PlaybookStatus.APPROVED,
    success_count: int = 10,
    failure_count: int = 1,
) -> Playbook:
    """Build a Playbook fixture for the HighCPU / test-service scenario.

    The single repair step uses a ``{target}`` placeholder in its command.
    Status and execution counters can be overridden per test.
    """
    pattern = SymptomPattern(
        alert_names=["HighCPU"],
        affected_services=["test-service"],
        severity_range=["P1"],
    )

    restart_step = RepairStep(
        step_number=1,
        action_type=ActionType.KUBECTL,
        command="kubectl rollout restart deployment/{target}",
        expected_result="Deployment restarted",
        risk_level=RiskLevel.MEDIUM,
    )

    return Playbook(
        playbook_id=playbook_id,
        name="HighCPU - test-service 修復劇本",
        description="觸發告警: HighCPU. 影響服務: test-service",
        status=status,
        symptom_pattern=pattern,
        repair_steps=[restart_step],
        success_count=success_count,
        failure_count=failure_count,
        ai_confidence=0.85,
        tags=["cpu", "kubernetes", "test-service"],
    )
class TestPlaybookService:
    """Unit tests for PlaybookService, backed by MockPlaybookRepository."""

    @pytest.fixture
    def mock_repo(self):
        """Fresh in-memory repository for each test."""
        return MockPlaybookRepository()

    @pytest.fixture
    def service(self, mock_repo):
        """PlaybookService wired to the in-memory repository."""
        return PlaybookService(repository=mock_repo)

    @pytest.mark.asyncio
    async def test_extract_from_incident_success(self, service):
        """Test successful playbook extraction from incident"""
        incident = create_test_incident()

        playbook = await service.extract_from_incident(incident)

        assert playbook is not None
        assert "HighCPU" in playbook.name
        assert playbook.status == PlaybookStatus.DRAFT
        assert len(playbook.repair_steps) > 0
        assert playbook.ai_confidence > 0.5

    @pytest.mark.asyncio
    async def test_extract_from_incident_auto_approve(self, service):
        """Test extraction with auto_approve requested"""
        incident = create_test_incident(effectiveness_score=5)

        playbook = await service.extract_from_incident(incident, auto_approve=True)

        assert playbook is not None
        assert len(playbook.repair_steps) > 0
        # NOTE(review): the original test's comment says auto-approval only
        # happens when confidence >= 0.9, so either outcome may be valid
        # here. Tighten this once the exact confidence for this fixture is
        # pinned down.
        assert playbook.status in (PlaybookStatus.DRAFT, PlaybookStatus.APPROVED)

    @pytest.mark.asyncio
    async def test_extract_from_incident_invalid_status(self, service):
        """Test extraction fails for non-resolved incidents"""
        incident = create_test_incident(status=IncidentStatus.INVESTIGATING)

        playbook = await service.extract_from_incident(incident)

        assert playbook is None

    @pytest.mark.asyncio
    async def test_extract_from_incident_low_effectiveness(self, service):
        """Test extraction fails for low effectiveness score"""
        incident = create_test_incident(effectiveness_score=3)

        playbook = await service.extract_from_incident(incident)

        assert playbook is None

    @pytest.mark.asyncio
    async def test_get_recommendations_with_match(self, service, mock_repo):
        """Test getting recommendations with matching playbook"""
        # Add a matching playbook
        playbook = create_test_playbook()
        await mock_repo.create(playbook)

        # Query with matching symptoms
        symptoms = SymptomPattern(
            alert_names=["HighCPU"],
            affected_services=["test-service"],
            severity_range=["P1"],
        )

        recommendations = await service.get_recommendations(symptoms, top_k=3)

        assert len(recommendations) == 1
        assert recommendations[0].playbook.playbook_id == playbook.playbook_id
        assert recommendations[0].similarity_score > 0.5

    @pytest.mark.asyncio
    async def test_get_recommendations_no_match(self, service, mock_repo):
        """Test getting recommendations with no matching playbook"""
        # Add a playbook with different symptoms
        playbook = create_test_playbook()
        playbook.symptom_pattern.alert_names = ["HighMemory"]
        playbook.symptom_pattern.affected_services = ["other-service"]
        await mock_repo.create(playbook)

        # Query with non-matching symptoms
        symptoms = SymptomPattern(
            alert_names=["NetworkLatency"],
            affected_services=["api-gateway"],
        )

        recommendations = await service.get_recommendations(symptoms, top_k=3)

        # Should be empty or have very low similarity
        assert len(recommendations) == 0 or recommendations[0].similarity_score < 0.4

    @pytest.mark.asyncio
    async def test_approve_playbook(self, service, mock_repo):
        """Test approving a draft playbook"""
        playbook = create_test_playbook(status=PlaybookStatus.DRAFT)
        await mock_repo.create(playbook)

        approved = await service.approve(
            playbook_id=playbook.playbook_id,
            approved_by="test-user",
            notes="Verified and approved",
        )

        assert approved is not None
        assert approved.status == PlaybookStatus.APPROVED
        assert approved.approved_by == "test-user"
        assert approved.notes == "Verified and approved"

    @pytest.mark.asyncio
    async def test_approve_non_draft_playbook_fails(self, service, mock_repo):
        """Test that approving non-draft playbook fails"""
        playbook = create_test_playbook(status=PlaybookStatus.APPROVED)
        await mock_repo.create(playbook)

        result = await service.approve(
            playbook_id=playbook.playbook_id,
            approved_by="test-user",
        )

        assert result is None

    @pytest.mark.asyncio
    async def test_record_execution(self, service, mock_repo):
        """Test recording execution results"""
        playbook = create_test_playbook(success_count=10, failure_count=1)
        await mock_repo.create(playbook)

        # Record success
        result = await service.record_execution(playbook.playbook_id, success=True)
        assert result is True

        # Check updated stats
        updated = await service.get_by_id(playbook.playbook_id)
        # Fail with a clear assertion rather than an AttributeError on None.
        assert updated is not None
        assert updated.success_count == 11

    def test_playbook_success_rate(self):
        """Test success rate calculation"""
        playbook = create_test_playbook(success_count=9, failure_count=1)

        # Compare the float ratio with a tolerance instead of exact equality.
        assert playbook.success_rate == pytest.approx(0.9)
        assert playbook.total_executions == 10

    def test_playbook_is_high_quality(self):
        """Test high quality playbook detection"""
        # High quality: APPROVED, >= 95% success rate, >= 10 successes
        playbook = create_test_playbook(
            status=PlaybookStatus.APPROVED,
            success_count=20,
            failure_count=1,
        )

        assert playbook.is_high_quality is True

    def test_playbook_not_high_quality_draft(self):
        """Test draft playbook is not high quality"""
        playbook = create_test_playbook(
            status=PlaybookStatus.DRAFT,
            success_count=20,
            failure_count=0,
        )

        assert playbook.is_high_quality is False

    @pytest.mark.asyncio
    async def test_delete_playbook_soft_delete(self, service, mock_repo):
        """Test soft delete (deprecation)"""
        playbook = create_test_playbook()
        await mock_repo.create(playbook)

        result = await service.delete(playbook.playbook_id)
        assert result is True

        # A soft delete keeps the record and only changes its status.
        deleted = await service.get_by_id(playbook.playbook_id)
        assert deleted is not None
        assert deleted.status == PlaybookStatus.DEPRECATED