diff --git a/apps/api/src/services/approval_execution.py b/apps/api/src/services/approval_execution.py index 56bbcdd8..e08b1ce6 100644 --- a/apps/api/src/services/approval_execution.py +++ b/apps/api/src/services/approval_execution.py @@ -8,9 +8,11 @@ Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取 - ApprovalDBService: 更新狀態 - TimelineService: 記錄事件 - NotificationManager: 發送通知 +- Phase 7.6: Playbook 自動萃取 -版本: v1.0 +版本: v1.1 建立: 2026-03-25 (台北時區) +更新: 2026-03-26 (Phase 7.6 自動萃取) 建立者: Claude Code (Phase 16 R4.2) """ @@ -144,6 +146,11 @@ class ApprovalExecutionService: duration_ms=result.duration_ms, ) ) + + # Phase 7.6: 觸發 Playbook 自動萃取 (fire-and-forget) + asyncio.create_task( + self._trigger_playbook_extraction(approval) + ) else: logger.error( "background_execution_failed", @@ -250,6 +257,134 @@ class ApprovalExecutionService: error=str(e), ) + async def _trigger_playbook_extraction( + self, + approval: ApprovalRequest, + ) -> None: + """ + Phase 7.6: 觸發 Playbook 自動萃取 + + 條件: + - 執行成功 + - 關聯的 Incident 狀態為 RESOLVED 或 CLOSED + - effectiveness_score >= 4 + + 此函數為 fire-and-forget,失敗不影響主流程 + """ + try: + # 1. 從 approval 取得關聯的 incident_id + # approval.requested_by 可能包含 incident 資訊,或從 metadata 取得 + # 暫時從 description 或 action 解析 + incident_id = self._extract_incident_id_from_approval(approval) + if not incident_id: + logger.debug( + "playbook_extraction_skip", + approval_id=str(approval.id), + reason="No incident_id found", + ) + return + + # 2. 取得 Incident + from src.services.incident_service import get_incident_service + + incident_service = get_incident_service() + incident = await incident_service.get_incident(incident_id) + + if not incident: + logger.debug( + "playbook_extraction_skip", + approval_id=str(approval.id), + incident_id=incident_id, + reason="Incident not found", + ) + return + + # 3. 
檢查 Incident 狀態 + from src.models.incident import IncidentStatus + + if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]: + logger.debug( + "playbook_extraction_skip", + approval_id=str(approval.id), + incident_id=incident_id, + incident_status=incident.status.value, + reason="Incident not resolved", + ) + return + + # 4. 檢查 effectiveness_score + effectiveness = incident.outcome.effectiveness_score if incident.outcome else 0 + if effectiveness < 4: + logger.debug( + "playbook_extraction_skip", + approval_id=str(approval.id), + incident_id=incident_id, + effectiveness=effectiveness, + reason="Low effectiveness score", + ) + return + + # 5. 觸發萃取 + from src.services.playbook_service import get_playbook_service + + playbook_service = get_playbook_service() + playbook = await playbook_service.extract_from_incident( + incident=incident, + auto_approve=effectiveness >= 5, # 滿分自動核准 + ) + + if playbook: + logger.info( + "playbook_auto_extracted", + approval_id=str(approval.id), + incident_id=incident_id, + playbook_id=playbook.playbook_id, + playbook_name=playbook.name, + auto_approved=playbook.status.value == "approved", + ) + else: + logger.debug( + "playbook_extraction_no_result", + approval_id=str(approval.id), + incident_id=incident_id, + ) + + except Exception as e: + # 萃取失敗不影響主流程 + logger.warning( + "playbook_extraction_error", + approval_id=str(approval.id), + error=str(e), + ) + + def _extract_incident_id_from_approval( + self, + approval: ApprovalRequest, + ) -> str | None: + """ + 從 approval 提取關聯的 incident_id + + 嘗試以下來源: + 1. approval.metadata (如果有) + 2. approval.description 中的 INC- 模式 + 3. 
approval.requested_by 中的 incident 資訊 + """ + import re + + # 從 description 或 action 中尋找 INC-XXXXXX 模式 + text = f"{approval.description or ''} {approval.action or ''}" + match = re.search(r"INC-([A-Z0-9-]+)", text) + if match: + return match.group(0) # 返回完整的 INC-XXXXX + + # 從 requested_by 尋找 + if approval.requested_by and "INC-" in approval.requested_by: + match = re.search(r"INC-([A-Z0-9-]+)", approval.requested_by) + if match: + return match.group(0) + + return None + # ============================================================================= # Singleton Instance diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py index 1f8d2244..1cafed55 100644 --- a/apps/api/src/services/decision_manager.py +++ b/apps/api/src/services/decision_manager.py @@ -30,10 +30,15 @@ import structlog from src.core.config import settings from src.core.redis_client import get_redis from src.models.incident import Incident +from src.models.playbook import SymptomPattern from src.services.openclaw import get_openclaw +from src.services.playbook_service import get_playbook_service logger = structlog.get_logger(__name__) +# Phase 7.5: Playbook 優先閾值 +PLAYBOOK_SIMILARITY_THRESHOLD = 0.85 # 相似度 >= 85% 直接使用 Playbook + # ============================================================================= # Telegram 推送 (Phase 6.5: 決策就緒通知) @@ -394,13 +399,20 @@ class DecisionManager: incident: Incident, ) -> dict[str, Any]: """ - 雙軌決策分析 + 三軌決策分析 (Phase 7.5 升級) 策略: - - 同時啟動 LLM 和 Expert System - - LLM 成功則用 LLM (更智能) - - LLM 失敗則用 Expert System (保底) + 1. 先檢查 Playbook 是否有高度匹配 (similarity >= 85%) + 2. Playbook 命中則直接使用 (最快、經驗驗證) + 3. 
# --- apps/api/src/services/decision_manager.py (DecisionManager) ---
async def _try_playbook_match(
    self,
    incident: Incident,
) -> dict[str, Any] | None:
    """
    Phase 7.5: try to answer an incident directly from a stored Playbook.

    A playbook is used only when:
    - the top recommendation's similarity_score is >= PLAYBOOK_SIMILARITY_THRESHOLD
    - its success_rate is >= 0.8, checked only when total_executions > 0
    - it has at least one repair step to turn into a command

    NOTE(review): this method does not check the playbook status itself;
    presumably ``get_recommendations`` only returns APPROVED playbooks -
    confirm against the playbook service.

    Args:
        incident: The incident being analysed.

    Returns:
        A proposal dict with ``source="playbook"`` on a hit, otherwise None
        so the caller falls through to LLM / Expert System analysis. Any
        exception is logged as a warning and also yields None.
    """
    try:
        playbook_service = get_playbook_service()

        # Build the symptom pattern used as the similarity query.
        alert_names = [s.alert_name for s in incident.signals] if incident.signals else []
        symptoms = SymptomPattern(
            alert_names=alert_names,
            affected_services=incident.affected_services or [],
            severity_range=[incident.severity.value] if incident.severity else ["P2"],
        )

        # Only the best candidate matters here.
        recommendations = await playbook_service.get_recommendations(
            symptoms=symptoms,
            top_k=1,
        )

        if not recommendations:
            logger.debug(
                "playbook_no_match",
                incident_id=incident.incident_id,
            )
            return None

        best_match = recommendations[0]
        playbook = best_match.playbook

        if best_match.similarity_score < PLAYBOOK_SIMILARITY_THRESHOLD:
            logger.debug(
                "playbook_similarity_below_threshold",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                similarity=best_match.similarity_score,
                threshold=PLAYBOOK_SIMILARITY_THRESHOLD,
            )
            return None

        # The success rate is only meaningful once the playbook has been executed.
        has_history = playbook.total_executions > 0
        if has_history and playbook.success_rate < 0.8:
            logger.debug(
                "playbook_low_success_rate",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
                success_rate=playbook.success_rate,
            )
            return None

        # A playbook without steps would produce a proposal with an empty
        # command and block the LLM / Expert System fallback - treat it as a miss.
        if not playbook.repair_steps:
            logger.debug(
                "playbook_no_repair_steps",
                incident_id=incident.incident_id,
                playbook_id=playbook.playbook_id,
            )
            return None

        first_step = playbook.repair_steps[0]
        target = incident.affected_services[0] if incident.affected_services else "unknown"

        # Commands may contain literal braces (jsonpath, JSON patches) that
        # str.format rejects; fall back to substituting only "{target}".
        try:
            kubectl_command = first_step.command.format(target=target)
        except (KeyError, IndexError, ValueError, AttributeError):
            kubectl_command = first_step.command.replace("{target}", target)

        logger.info(
            "playbook_match_success",
            incident_id=incident.incident_id,
            playbook_id=playbook.playbook_id,
            playbook_name=playbook.name,
            similarity=best_match.similarity_score,
            success_rate=playbook.success_rate,
        )

        # With execution history, confidence is capped by the observed success rate.
        if has_history:
            confidence = min(best_match.similarity_score, playbook.success_rate)
        else:
            confidence = best_match.similarity_score

        return {
            "source": "playbook",
            "playbook_id": playbook.playbook_id,
            "playbook_name": playbook.name,
            "action": kubectl_command,
            "kubectl_command": kubectl_command,
            "description": playbook.description,
            "risk_level": first_step.risk_level.value.lower(),
            "reasoning": f"Playbook 匹配 ({best_match.similarity_score:.0%} 相似度, {playbook.success_rate:.0%} 成功率): {best_match.reason}",
            "confidence": confidence,
            "matched_symptoms": best_match.matched_symptoms,
            "from_cache": False,
        }

    except Exception as e:
        # Playbook lookup is an optimisation; on failure fall back to the other tracks.
        logger.warning(
            "playbook_match_error",
            incident_id=incident.incident_id,
            error=str(e),
        )
        return None


# --- apps/api/src/services/playbook_service.py (PlaybookService) ---
def _extract_repair_steps(self, incident: Incident) -> list[RepairStep]:
    """
    Extract kubectl repair steps from an incident.

    Sources, in order:
    1. ``incident.decision_chain.reasoning_steps`` - each entry containing a
       ``kubectl <something>`` command contributes one step (from "kubectl"
       to the end of that line).
    2. ``incident.outcome.learning_notes`` - used only when source 1 yields
       nothing; the first kubectl command found becomes step 1.

    Text that mentions kubectl without a recognisable command is skipped in
    both sources, so surrounding prose never ends up stored as a command.

    Returns:
        RepairStep objects numbered from 1, all KUBECTL / MEDIUM risk;
        empty when no command is found.
    """
    import re

    # Compiled once per call instead of importing and searching inside the loop.
    kubectl_pattern = re.compile(r"kubectl\s+\S+.*")
    steps: list[RepairStep] = []

    if incident.decision_chain and incident.decision_chain.reasoning_steps:
        for reasoning in incident.decision_chain.reasoning_steps:
            kubectl_match = kubectl_pattern.search(reasoning)
            if kubectl_match:
                steps.append(
                    RepairStep(
                        step_number=len(steps) + 1,
                        action_type=ActionType.KUBECTL,
                        command=kubectl_match.group(0).strip(),
                        risk_level=RiskLevel.MEDIUM,
                    )
                )

    # Fallback: learning notes, extracted the same way as the reasoning steps.
    if not steps and incident.outcome and incident.outcome.learning_notes:
        notes_match = kubectl_pattern.search(incident.outcome.learning_notes)
        if notes_match:
            steps.append(
                RepairStep(
                    step_number=1,
                    action_type=ActionType.KUBECTL,
                    command=notes_match.group(0).strip(),
                    risk_level=RiskLevel.MEDIUM,
                )
            )

    return steps
class MockPlaybookRepository:
    """In-memory stand-in for the playbook repository used by these tests."""

    def __init__(self):
        # Keyed by playbook_id.
        self._playbooks: dict[str, Playbook] = {}

    async def create(self, playbook: Playbook) -> Playbook:
        """Store the playbook under its id and return it."""
        self._playbooks[playbook.playbook_id] = playbook
        return playbook

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """Return the stored playbook, or None when the id is unknown."""
        return self._playbooks.get(playbook_id)

    async def update(self, playbook: Playbook) -> Playbook | None:
        """Replace an existing playbook; return None when it was never stored."""
        if playbook.playbook_id in self._playbooks:
            self._playbooks[playbook.playbook_id] = playbook
            return playbook
        return None

    async def delete(self, playbook_id: str) -> bool:
        """Soft delete: mark the playbook DEPRECATED instead of removing it."""
        if playbook_id in self._playbooks:
            self._playbooks[playbook_id].status = PlaybookStatus.DEPRECATED
            return True
        return False

    async def list_playbooks(
        self,
        status: PlaybookStatus | None = None,
        tags: list[str] | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> tuple[list[Playbook], int]:
        """Return one page of playbooks plus the total count after filtering."""
        items = list(self._playbooks.values())
        if status:
            items = [p for p in items if p.status == status]
        if tags:
            # A playbook matches when it carries at least one requested tag.
            items = [p for p in items if any(t in p.tags for t in tags)]
        total = len(items)
        return items[offset : offset + limit], total

    async def find_by_symptoms(
        self,
        symptoms: SymptomPattern,
        top_k: int = 3,
        min_similarity: float = 0.4,
    ) -> list[tuple[Playbook, float]]:
        """Return up to top_k APPROVED playbooks scoring >= min_similarity, best first."""
        results = []
        for playbook in self._playbooks.values():
            if playbook.status != PlaybookStatus.APPROVED:
                continue
            similarity = self._calculate_similarity(symptoms, playbook.symptom_pattern)
            if similarity >= min_similarity:
                results.append((playbook, similarity))
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]

    async def update_stats(self, playbook_id: str, success: bool) -> bool:
        """Increment success_count or failure_count; False when the id is unknown."""
        playbook = self._playbooks.get(playbook_id)
        if not playbook:
            return False
        if success:
            playbook.success_count += 1
        else:
            playbook.failure_count += 1
        return True

    def _calculate_similarity(self, query: SymptomPattern, target: SymptomPattern) -> float:
        """Jaccard-like score: alert overlap and service overlap each weigh 0.5."""
        alert_match = len(set(query.alert_names) & set(target.alert_names))
        alert_union = len(set(query.alert_names) | set(target.alert_names))
        service_match = len(set(query.affected_services) & set(target.affected_services))
        service_union = len(set(query.affected_services) | set(target.affected_services))

        if alert_union == 0 and service_union == 0:
            return 0.0

        score = 0.0
        if alert_union > 0:
            score += 0.5 * (alert_match / alert_union)
        if service_union > 0:
            score += 0.5 * (service_match / service_union)
        return score


def create_test_incident(
    incident_id: str = "INC-TEST-001",
    status: IncidentStatus = IncidentStatus.RESOLVED,
    effectiveness_score: int = 5,
) -> Incident:
    """Build a P1 HighCPU incident whose reasoning and notes contain a kubectl command."""
    from src.models.incident import AIDecisionChain

    now = datetime.now(UTC)
    return Incident(
        incident_id=incident_id,
        status=status,
        severity=Severity.P1,
        affected_services=["test-service", "api-gateway"],
        signals=[
            Signal(
                alert_name="HighCPU",
                severity=Severity.P1,
                source="prometheus",
                fired_at=now,
                labels={"namespace": "prod"},
                annotations={"description": "CPU usage above 90%"},
            ),
        ],
        decision_chain=AIDecisionChain(
            model_used="ollama/llama3.2:latest",
            hypothesis="High CPU usage detected, likely due to resource leak",
            confidence=0.85,
            reasoning_steps=[
                "Detected HighCPU alert from prometheus",
                "Service test-service affected",
                "Recommended action: kubectl rollout restart deployment/test-service",
            ],
            inference_started_at=now,
            inference_completed_at=now,
            latency_ms=150,
        ),
        outcome=IncidentOutcome(
            proposal_executed=True,
            execution_success=True,
            effectiveness_score=effectiveness_score,
            learning_notes="kubectl rollout restart deployment/test-service",
        ),
    )


def create_test_playbook(
    playbook_id: str = "PB-TEST-001",
    status: PlaybookStatus = PlaybookStatus.APPROVED,
    success_count: int = 10,
    failure_count: int = 1,
) -> Playbook:
    """Build a HighCPU playbook with a single templated kubectl restart step."""
    return Playbook(
        playbook_id=playbook_id,
        name="HighCPU - test-service 修復劇本",
        description="觸發告警: HighCPU. 影響服務: test-service",
        status=status,
        symptom_pattern=SymptomPattern(
            alert_names=["HighCPU"],
            affected_services=["test-service"],
            severity_range=["P1"],
        ),
        repair_steps=[
            RepairStep(
                step_number=1,
                action_type=ActionType.KUBECTL,
                command="kubectl rollout restart deployment/{target}",
                expected_result="Deployment restarted",
                risk_level=RiskLevel.MEDIUM,
            ),
        ],
        success_count=success_count,
        failure_count=failure_count,
        ai_confidence=0.85,
        tags=["cpu", "kubernetes", "test-service"],
    )


class TestPlaybookService:
    """Unit tests for PlaybookService backed by the in-memory mock repository."""

    @pytest.fixture
    def mock_repo(self):
        return MockPlaybookRepository()

    @pytest.fixture
    def service(self, mock_repo):
        return PlaybookService(repository=mock_repo)

    @pytest.mark.asyncio
    async def test_extract_from_incident_success(self, service):
        """Extraction from a resolved, effective incident yields a DRAFT playbook."""
        incident = create_test_incident()
        playbook = await service.extract_from_incident(incident)

        assert playbook is not None
        assert "HighCPU" in playbook.name
        assert playbook.status == PlaybookStatus.DRAFT
        assert len(playbook.repair_steps) > 0
        assert playbook.ai_confidence > 0.5

    @pytest.mark.asyncio
    async def test_extract_from_incident_auto_approve(self, service):
        """With auto_approve the playbook is approved or, below the bar, stays a draft."""
        incident = create_test_incident(effectiveness_score=5)
        playbook = await service.extract_from_incident(incident, auto_approve=True)

        assert playbook is not None
        # Auto-approve only applies when confidence >= 0.9; with
        # effectiveness=5 the confidence should be ~0.9, so either outcome
        # is acceptable - but no other status is.
        assert playbook.status in (PlaybookStatus.DRAFT, PlaybookStatus.APPROVED)
        assert len(playbook.repair_steps) > 0

    @pytest.mark.asyncio
    async def test_extract_from_incident_invalid_status(self, service):
        """Extraction is refused for incidents that are not resolved."""
        incident = create_test_incident(status=IncidentStatus.INVESTIGATING)
        playbook = await service.extract_from_incident(incident)

        assert playbook is None

    @pytest.mark.asyncio
    async def test_extract_from_incident_low_effectiveness(self, service):
        """Extraction is refused when the effectiveness score is too low."""
        incident = create_test_incident(effectiveness_score=3)
        playbook = await service.extract_from_incident(incident)

        assert playbook is None

    @pytest.mark.asyncio
    async def test_get_recommendations_with_match(self, service, mock_repo):
        """A playbook with identical symptoms is recommended."""
        playbook = create_test_playbook()
        await mock_repo.create(playbook)

        symptoms = SymptomPattern(
            alert_names=["HighCPU"],
            affected_services=["test-service"],
            severity_range=["P1"],
        )

        recommendations = await service.get_recommendations(symptoms, top_k=3)

        assert len(recommendations) == 1
        assert recommendations[0].playbook.playbook_id == playbook.playbook_id
        assert recommendations[0].similarity_score > 0.5

    @pytest.mark.asyncio
    async def test_get_recommendations_no_match(self, service, mock_repo):
        """A playbook with disjoint symptoms is not recommended."""
        playbook = create_test_playbook()
        playbook.symptom_pattern.alert_names = ["HighMemory"]
        playbook.symptom_pattern.affected_services = ["other-service"]
        await mock_repo.create(playbook)

        symptoms = SymptomPattern(
            alert_names=["NetworkLatency"],
            affected_services=["api-gateway"],
        )

        recommendations = await service.get_recommendations(symptoms, top_k=3)

        # Should be empty or have very low similarity.
        assert len(recommendations) == 0 or recommendations[0].similarity_score < 0.4

    @pytest.mark.asyncio
    async def test_approve_playbook(self, service, mock_repo):
        """Approving a draft records the approver and notes."""
        playbook = create_test_playbook(status=PlaybookStatus.DRAFT)
        await mock_repo.create(playbook)

        approved = await service.approve(
            playbook_id=playbook.playbook_id,
            approved_by="test-user",
            notes="Verified and approved",
        )

        assert approved is not None
        assert approved.status == PlaybookStatus.APPROVED
        assert approved.approved_by == "test-user"
        assert approved.notes == "Verified and approved"

    @pytest.mark.asyncio
    async def test_approve_non_draft_playbook_fails(self, service, mock_repo):
        """Approving a playbook that is not a draft returns None."""
        playbook = create_test_playbook(status=PlaybookStatus.APPROVED)
        await mock_repo.create(playbook)

        result = await service.approve(
            playbook_id=playbook.playbook_id,
            approved_by="test-user",
        )

        assert result is None

    @pytest.mark.asyncio
    async def test_record_execution(self, service, mock_repo):
        """Recording a success increments success_count."""
        playbook = create_test_playbook(success_count=10, failure_count=1)
        await mock_repo.create(playbook)

        result = await service.record_execution(playbook.playbook_id, success=True)
        assert result is True

        updated = await service.get_by_id(playbook.playbook_id)
        # Fail with a clear message rather than an AttributeError on None.
        assert updated is not None
        assert updated.success_count == 11

    def test_playbook_success_rate(self):
        """success_rate and total_executions derive from the two counters."""
        # Synchronous on purpose: nothing here awaits.
        playbook = create_test_playbook(success_count=9, failure_count=1)

        assert playbook.success_rate == pytest.approx(0.9)
        assert playbook.total_executions == 10

    def test_playbook_is_high_quality(self):
        """An APPROVED playbook with 20 successes and 1 failure is high quality."""
        # High quality: APPROVED, >= 95% success rate, >= 10 successes.
        playbook = create_test_playbook(
            status=PlaybookStatus.APPROVED,
            success_count=20,
            failure_count=1,
        )

        assert playbook.is_high_quality is True

    def test_playbook_not_high_quality_draft(self):
        """A DRAFT playbook is never high quality, whatever its statistics."""
        playbook = create_test_playbook(
            status=PlaybookStatus.DRAFT,
            success_count=20,
            failure_count=0,
        )

        assert playbook.is_high_quality is False

    @pytest.mark.asyncio
    async def test_delete_playbook_soft_delete(self, service, mock_repo):
        """Deleting keeps the record but marks it DEPRECATED."""
        playbook = create_test_playbook()
        await mock_repo.create(playbook)

        result = await service.delete(playbook.playbook_id)

        assert result is True
        deleted = await service.get_by_id(playbook.playbook_id)
        assert deleted is not None
        assert deleted.status == PlaybookStatus.DEPRECATED