- Import sorting (I001) - Unused imports (F401) - f-string without placeholders (F541) - Loop variable unused (B007) - zip() strict parameter (B905) - Exception chaining (B904) - collections.abc imports (UP035) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
371 lines
13 KiB
Python
371 lines
13 KiB
Python
"""
|
|
Playbook Service Tests - #7 Playbook 萃取
|
|
==========================================
|
|
測試 Playbook 服務層功能
|
|
|
|
版本: v1.0
|
|
建立: 2026-03-26 (台北時區)
|
|
建立者: Claude Code (Phase 7.5-7.6)
|
|
"""
|
|
|
|
import pytest
|
|
|
|
from src.models.incident import (
|
|
Incident,
|
|
IncidentOutcome,
|
|
IncidentStatus,
|
|
Severity,
|
|
Signal,
|
|
)
|
|
from src.models.playbook import (
|
|
ActionType,
|
|
Playbook,
|
|
PlaybookStatus,
|
|
RepairStep,
|
|
RiskLevel,
|
|
SymptomPattern,
|
|
)
|
|
from src.services.playbook_service import PlaybookService
|
|
from src.utils.timezone import now_taipei
|
|
|
|
|
|
class MockPlaybookRepository:
|
|
"""Mock repository for testing"""
|
|
|
|
def __init__(self):
|
|
self._playbooks: dict[str, Playbook] = {}
|
|
|
|
async def create(self, playbook: Playbook) -> Playbook:
|
|
self._playbooks[playbook.playbook_id] = playbook
|
|
return playbook
|
|
|
|
async def get_by_id(self, playbook_id: str) -> Playbook | None:
|
|
return self._playbooks.get(playbook_id)
|
|
|
|
async def update(self, playbook: Playbook) -> Playbook | None:
|
|
if playbook.playbook_id in self._playbooks:
|
|
self._playbooks[playbook.playbook_id] = playbook
|
|
return playbook
|
|
return None
|
|
|
|
async def delete(self, playbook_id: str) -> bool:
|
|
if playbook_id in self._playbooks:
|
|
self._playbooks[playbook_id].status = PlaybookStatus.DEPRECATED
|
|
return True
|
|
return False
|
|
|
|
async def list_playbooks(
|
|
self,
|
|
status: PlaybookStatus | None = None,
|
|
tags: list[str] | None = None,
|
|
limit: int = 20,
|
|
offset: int = 0,
|
|
) -> tuple[list[Playbook], int]:
|
|
items = list(self._playbooks.values())
|
|
if status:
|
|
items = [p for p in items if p.status == status]
|
|
if tags:
|
|
items = [p for p in items if any(t in p.tags for t in tags)]
|
|
total = len(items)
|
|
return items[offset : offset + limit], total
|
|
|
|
async def find_by_symptoms(
|
|
self,
|
|
symptoms: SymptomPattern,
|
|
top_k: int = 3,
|
|
min_similarity: float = 0.4,
|
|
) -> list[tuple[Playbook, float]]:
|
|
results = []
|
|
for playbook in self._playbooks.values():
|
|
if playbook.status != PlaybookStatus.APPROVED:
|
|
continue
|
|
# Simple similarity calculation for testing
|
|
similarity = self._calculate_similarity(symptoms, playbook.symptom_pattern)
|
|
if similarity >= min_similarity:
|
|
results.append((playbook, similarity))
|
|
results.sort(key=lambda x: x[1], reverse=True)
|
|
return results[:top_k]
|
|
|
|
async def update_stats(self, playbook_id: str, success: bool) -> bool:
|
|
playbook = self._playbooks.get(playbook_id)
|
|
if not playbook:
|
|
return False
|
|
if success:
|
|
playbook.success_count += 1
|
|
else:
|
|
playbook.failure_count += 1
|
|
return True
|
|
|
|
def _calculate_similarity(self, query: SymptomPattern, target: SymptomPattern) -> float:
|
|
"""Simple Jaccard-like similarity for testing"""
|
|
alert_match = len(set(query.alert_names) & set(target.alert_names))
|
|
alert_union = len(set(query.alert_names) | set(target.alert_names))
|
|
service_match = len(set(query.affected_services) & set(target.affected_services))
|
|
service_union = len(set(query.affected_services) | set(target.affected_services))
|
|
|
|
if alert_union == 0 and service_union == 0:
|
|
return 0.0
|
|
|
|
score = 0.0
|
|
if alert_union > 0:
|
|
score += 0.5 * (alert_match / alert_union)
|
|
if service_union > 0:
|
|
score += 0.5 * (service_match / service_union)
|
|
return score
|
|
|
|
|
|
def create_test_incident(
|
|
incident_id: str = "INC-TEST-001",
|
|
status: IncidentStatus = IncidentStatus.RESOLVED,
|
|
effectiveness_score: int = 5,
|
|
) -> Incident:
|
|
"""Create a test incident for extraction"""
|
|
from src.models.incident import AIDecisionChain
|
|
|
|
now = now_taipei()
|
|
return Incident(
|
|
incident_id=incident_id,
|
|
status=status,
|
|
severity=Severity.P1,
|
|
affected_services=["test-service", "api-gateway"],
|
|
signals=[
|
|
Signal(
|
|
alert_name="HighCPU",
|
|
severity=Severity.P1,
|
|
source="prometheus",
|
|
fired_at=now,
|
|
labels={"namespace": "prod"},
|
|
annotations={"description": "CPU usage above 90%"},
|
|
),
|
|
],
|
|
decision_chain=AIDecisionChain(
|
|
model_used="ollama/llama3.2:latest",
|
|
hypothesis="High CPU usage detected, likely due to resource leak",
|
|
confidence=0.85,
|
|
reasoning_steps=[
|
|
"Detected HighCPU alert from prometheus",
|
|
"Service test-service affected",
|
|
"Recommended action: kubectl rollout restart deployment/test-service",
|
|
],
|
|
inference_started_at=now,
|
|
inference_completed_at=now,
|
|
latency_ms=150,
|
|
),
|
|
outcome=IncidentOutcome(
|
|
proposal_executed=True,
|
|
execution_success=True,
|
|
effectiveness_score=effectiveness_score,
|
|
learning_notes="kubectl rollout restart deployment/test-service",
|
|
),
|
|
)
|
|
|
|
|
|
def create_test_playbook(
|
|
playbook_id: str = "PB-TEST-001",
|
|
status: PlaybookStatus = PlaybookStatus.APPROVED,
|
|
success_count: int = 10,
|
|
failure_count: int = 1,
|
|
) -> Playbook:
|
|
"""Create a test playbook"""
|
|
return Playbook(
|
|
playbook_id=playbook_id,
|
|
name="HighCPU - test-service 修復劇本",
|
|
description="觸發告警: HighCPU. 影響服務: test-service",
|
|
status=status,
|
|
symptom_pattern=SymptomPattern(
|
|
alert_names=["HighCPU"],
|
|
affected_services=["test-service"],
|
|
severity_range=["P1"],
|
|
),
|
|
repair_steps=[
|
|
RepairStep(
|
|
step_number=1,
|
|
action_type=ActionType.KUBECTL,
|
|
command="kubectl rollout restart deployment/{target}",
|
|
expected_result="Deployment restarted",
|
|
risk_level=RiskLevel.MEDIUM,
|
|
),
|
|
],
|
|
success_count=success_count,
|
|
failure_count=failure_count,
|
|
ai_confidence=0.85,
|
|
tags=["cpu", "kubernetes", "test-service"],
|
|
)
|
|
|
|
|
|
class TestPlaybookService:
|
|
"""Playbook Service unit tests"""
|
|
|
|
@pytest.fixture
|
|
def mock_repo(self):
|
|
return MockPlaybookRepository()
|
|
|
|
@pytest.fixture
|
|
def service(self, mock_repo):
|
|
return PlaybookService(repository=mock_repo)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_from_incident_success(self, service):
|
|
"""Test successful playbook extraction from incident"""
|
|
incident = create_test_incident()
|
|
playbook = await service.extract_from_incident(incident)
|
|
|
|
assert playbook is not None
|
|
assert "HighCPU" in playbook.name
|
|
assert playbook.status == PlaybookStatus.DRAFT
|
|
assert len(playbook.repair_steps) > 0
|
|
assert playbook.ai_confidence > 0.5
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_from_incident_auto_approve(self, service):
|
|
"""Test auto-approve for high confidence extraction"""
|
|
incident = create_test_incident(effectiveness_score=5)
|
|
playbook = await service.extract_from_incident(incident, auto_approve=True)
|
|
|
|
assert playbook is not None
|
|
# Auto-approve only if confidence >= 0.9
|
|
# With effectiveness=5, confidence should be ~0.9
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_from_incident_invalid_status(self, service):
|
|
"""Test extraction fails for non-resolved incidents"""
|
|
incident = create_test_incident(status=IncidentStatus.INVESTIGATING)
|
|
playbook = await service.extract_from_incident(incident)
|
|
|
|
assert playbook is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_from_incident_low_effectiveness(self, service):
|
|
"""Test extraction fails for low effectiveness score"""
|
|
incident = create_test_incident(effectiveness_score=3)
|
|
playbook = await service.extract_from_incident(incident)
|
|
|
|
assert playbook is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_recommendations_with_match(self, service, mock_repo):
|
|
"""Test getting recommendations with matching playbook"""
|
|
# Add a matching playbook
|
|
playbook = create_test_playbook()
|
|
await mock_repo.create(playbook)
|
|
|
|
# Query with matching symptoms
|
|
symptoms = SymptomPattern(
|
|
alert_names=["HighCPU"],
|
|
affected_services=["test-service"],
|
|
severity_range=["P1"],
|
|
)
|
|
|
|
recommendations = await service.get_recommendations(symptoms, top_k=3)
|
|
|
|
assert len(recommendations) == 1
|
|
assert recommendations[0].playbook.playbook_id == playbook.playbook_id
|
|
assert recommendations[0].similarity_score > 0.5
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_recommendations_no_match(self, service, mock_repo):
|
|
"""Test getting recommendations with no matching playbook"""
|
|
# Add a playbook with different symptoms
|
|
playbook = create_test_playbook()
|
|
playbook.symptom_pattern.alert_names = ["HighMemory"]
|
|
playbook.symptom_pattern.affected_services = ["other-service"]
|
|
await mock_repo.create(playbook)
|
|
|
|
# Query with non-matching symptoms
|
|
symptoms = SymptomPattern(
|
|
alert_names=["NetworkLatency"],
|
|
affected_services=["api-gateway"],
|
|
)
|
|
|
|
recommendations = await service.get_recommendations(symptoms, top_k=3)
|
|
|
|
# Should be empty or have very low similarity
|
|
assert len(recommendations) == 0 or recommendations[0].similarity_score < 0.4
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_approve_playbook(self, service, mock_repo):
|
|
"""Test approving a draft playbook"""
|
|
playbook = create_test_playbook(status=PlaybookStatus.DRAFT)
|
|
await mock_repo.create(playbook)
|
|
|
|
approved = await service.approve(
|
|
playbook_id=playbook.playbook_id,
|
|
approved_by="test-user",
|
|
notes="Verified and approved",
|
|
)
|
|
|
|
assert approved is not None
|
|
assert approved.status == PlaybookStatus.APPROVED
|
|
assert approved.approved_by == "test-user"
|
|
assert approved.notes == "Verified and approved"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_approve_non_draft_playbook_fails(self, service, mock_repo):
|
|
"""Test that approving non-draft playbook fails"""
|
|
playbook = create_test_playbook(status=PlaybookStatus.APPROVED)
|
|
await mock_repo.create(playbook)
|
|
|
|
result = await service.approve(
|
|
playbook_id=playbook.playbook_id,
|
|
approved_by="test-user",
|
|
)
|
|
|
|
assert result is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_record_execution(self, service, mock_repo):
|
|
"""Test recording execution results"""
|
|
playbook = create_test_playbook(success_count=10, failure_count=1)
|
|
await mock_repo.create(playbook)
|
|
|
|
# Record success
|
|
result = await service.record_execution(playbook.playbook_id, success=True)
|
|
assert result is True
|
|
|
|
# Check updated stats
|
|
updated = await service.get_by_id(playbook.playbook_id)
|
|
assert updated.success_count == 11
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_playbook_success_rate(self):
|
|
"""Test success rate calculation"""
|
|
playbook = create_test_playbook(success_count=9, failure_count=1)
|
|
|
|
assert playbook.success_rate == 0.9
|
|
assert playbook.total_executions == 10
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_playbook_is_high_quality(self):
|
|
"""Test high quality playbook detection"""
|
|
# High quality: APPROVED, >= 95% success rate, >= 10 successes
|
|
playbook = create_test_playbook(
|
|
status=PlaybookStatus.APPROVED,
|
|
success_count=20,
|
|
failure_count=1,
|
|
)
|
|
|
|
assert playbook.is_high_quality is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_playbook_not_high_quality_draft(self):
|
|
"""Test draft playbook is not high quality"""
|
|
playbook = create_test_playbook(
|
|
status=PlaybookStatus.DRAFT,
|
|
success_count=20,
|
|
failure_count=0,
|
|
)
|
|
|
|
assert playbook.is_high_quality is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_delete_playbook_soft_delete(self, service, mock_repo):
|
|
"""Test soft delete (deprecation)"""
|
|
playbook = create_test_playbook()
|
|
await mock_repo.create(playbook)
|
|
|
|
result = await service.delete(playbook.playbook_id)
|
|
|
|
assert result is True
|
|
deleted = await service.get_by_id(playbook.playbook_id)
|
|
assert deleted.status == PlaybookStatus.DEPRECATED
|