Files
awoooi/apps/api/src/services/playbook_service.py
Your Name a0a0731cd6
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 5m46s
CD Pipeline / build-and-deploy (push) Successful in 4m6s
CD Pipeline / post-deploy-checks (push) Successful in 1m28s
fix(auto-repair): preserve exact playbook candidates
2026-05-13 23:38:19 +08:00

889 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Playbook Service - #7 Playbook 萃取
===================================
Playbook 業務邏輯層
Phase 7.3: Service 實作
Phase 3 ADR-030: RAG 向量搜尋整合
建立時間: 2026-03-26 (台北時區)
建立者: Claude Code (#7 Playbook 萃取)
遵循 leWOOOgo 積木化原則:
- Service 層只依賴 Repository Interface
- 不直接存取 Redis/DB
- 封裝所有業務邏輯
"""
import re as _re
from typing import Protocol
import structlog
from src.models.incident import Incident, IncidentStatus
from src.models.playbook import (
ActionType,
Playbook,
PlaybookRecommendation,
PlaybookSource,
PlaybookStatus,
RepairStep,
RiskLevel,
SymptomPattern,
generate_playbook_id,
)
from src.repositories.interfaces import IPlaybookRepository
from src.repositories.playbook_repository import get_playbook_repository
from src.services.playbook_rag import PlaybookMatch, get_playbook_rag_service
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
def _parse_ssh_command(ssh_cmd: str) -> tuple[str, str]:
"""
從 SSH 指令字串中分離主機名與實際執行指令。
Task 3.3 (2026-04-14): SSH 修復 KM 萃取輔助函式
支援格式:
ssh 192.168.0.188 'docker restart minio'
ssh root@192.168.0.110 'systemctl restart ollama || docker restart ollama'
ssh {host} "cd /data/harbor && docker-compose up -d"
Returns:
(host, inner_command) — 無法解析時回傳 ("", original_cmd)
"""
m = _re.match(
r"ssh\s+(?:[a-zA-Z0-9_]+@)?([\w.\-:{}]+)\s+['\"](.+)['\"]",
ssh_cmd.strip(),
_re.DOTALL,
)
if m:
return m.group(1), m.group(2)
# fallback: 空 host保留完整命令
return "", ssh_cmd
class IPlaybookService(Protocol):
"""Playbook Service Interface"""
async def extract_from_incident(
self,
incident: Incident,
auto_approve: bool = False,
) -> Playbook | None:
"""從成功案例萃取 Playbook"""
...
async def get_recommendations(
self,
symptoms: SymptomPattern,
top_k: int = 3,
use_rag: bool = True,
) -> list[PlaybookRecommendation]:
"""取得 Playbook 推薦"""
...
async def get_by_id(self, playbook_id: str) -> Playbook | None:
"""取得 Playbook"""
...
async def create_new_version(
self,
base_playbook_id: str,
candidate: Playbook,
reason: str,
) -> Playbook | None:
"""從既有 Playbook 建立下一版"""
...
async def update_with_validation(
self,
playbook_id: str,
update_data: dict,
) -> Playbook | None:
"""驗證後更新 Playbook"""
...
async def approve(
self,
playbook_id: str,
approved_by: str,
notes: str | None = None,
) -> Playbook | None:
"""核准 Playbook"""
...
async def record_execution(
self,
playbook_id: str,
success: bool,
) -> bool:
"""記錄 Playbook 執行結果"""
...
class PlaybookService:
"""
Playbook Service 實作
職責:
- 從 Incident 萃取 Playbook
- 提供 Playbook 推薦 (混合搜尋: Jaccard + RAG)
- 管理 Playbook 生命週期
- 維護向量索引
"""
def __init__(self, repository: IPlaybookRepository | None = None):
self._repository = repository or get_playbook_repository()
async def _get_rag_service(self):
"""
取得 RAG Service — 每次走工廠,不在 Service 層快取
2026-04-04 ogt: 首席架構師 Review — 移除 Service 層快取
原因: PlaybookService 快取舊實例會繞過工廠的 is_closed 重建邏輯
由 get_playbook_rag_service() 工廠統一管理生命週期
"""
return await get_playbook_rag_service()
# === Core Operations ===
async def extract_from_incident(
self,
incident: Incident,
auto_approve: bool = False,
) -> Playbook | None:
"""
從成功案例萃取 Playbook
前置條件:
- Incident 狀態為 RESOLVED 或 CLOSED
- outcome.execution_success == True
- outcome.effectiveness_score >= 4
Args:
incident: 來源 Incident
auto_approve: 是否自動核准 (僅限高信心度)
Returns:
Playbook | None
"""
# 1. 驗證前置條件
if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
logger.warning(
"playbook_extract_invalid_status",
incident_id=incident.incident_id,
status=incident.status,
)
return None
if not incident.outcome or not incident.outcome.execution_success:
logger.warning(
"playbook_extract_no_successful_outcome",
incident_id=incident.incident_id,
)
return None
effectiveness = incident.outcome.effectiveness_score or 0
if effectiveness < 4:
logger.info(
"playbook_extract_low_effectiveness",
incident_id=incident.incident_id,
effectiveness=effectiveness,
)
return None
# 2. 萃取症狀模式
symptom_pattern = self._extract_symptom_pattern(incident)
# 3. 萃取修復步驟
repair_steps = self._extract_repair_steps(incident)
# 4. 計算信心度
confidence = self._calculate_confidence(incident, effectiveness)
# 5. 生成名稱和描述
name = self._generate_name(incident)
description = self._generate_description(incident)
# 6. 建立 Playbook
playbook = Playbook(
name=name,
description=description,
status=PlaybookStatus.APPROVED if auto_approve and confidence >= 0.9 else PlaybookStatus.DRAFT,
source=PlaybookSource.EXTRACTED,
symptom_pattern=symptom_pattern,
repair_steps=repair_steps,
source_incident_ids=[incident.incident_id],
ai_confidence=confidence,
tags=self._extract_tags(incident),
)
# 7. 儲存
playbook = await self._repository.create(playbook)
# 8. ADR-030 Phase 3: 建立向量索引 (非阻塞,失敗不影響主流程)
import asyncio
asyncio.create_task(self._index_playbook_async(playbook))
# 9. 2026-04-04 ogt: 沉澱到 KM (Knowledge Base)
# 統帥鐵律: 所有異常與自動修復紀錄必須回寫 KM
asyncio.create_task(self._write_to_km(playbook, incident))
logger.info(
"playbook_extracted",
playbook_id=playbook.playbook_id,
incident_id=incident.incident_id,
confidence=confidence,
auto_approved=playbook.status == PlaybookStatus.APPROVED,
)
return playbook
async def _write_to_km(self, playbook: Playbook, incident: Incident) -> None:
"""
Playbook 萃取後沉澱到 KM (Knowledge Base)
2026-04-04 ogt: 統帥鐵律 — 異常+自動修復記錄必須回寫 KM
P1-1 2026-04-28 ogt + Claude Sonnet 4.6: 委派 KMWriter 統一契約
"""
from src.models.knowledge import EntrySource, EntryType
from src.services.km_writer import KMWritePayload, km_write_with_flag
# 組 Playbook 修復步驟摘要
steps_text = "\n".join(
f"{i+1}. [{s.action_type}] {s.command}"
for i, s in enumerate(playbook.repair_steps)
) or "(無明確修復步驟)"
alert_names = ", ".join(playbook.symptom_pattern.alert_names) or "未知"
services = ", ".join(playbook.symptom_pattern.affected_services) or "未知"
body = (
f"# Playbook: {playbook.name}\n\n"
f"**來源 Incident**: {', '.join(playbook.source_incident_ids)}\n"
f"**AI 信心度**: {playbook.ai_confidence:.0%}\n"
f"**狀態**: {playbook.status.value}\n\n"
f"## 症狀模式\n"
f"- 告警: {alert_names}\n"
f"- 受影響服務: {services}\n\n"
f"## 修復步驟\n{steps_text}\n\n"
f"## 描述\n{playbook.description}"
)
payload = KMWritePayload(
path_type="playbook_extract",
entry_create_kwargs={
"title": f"[Playbook] {playbook.name}",
"content": body,
"entry_type": EntryType.INCIDENT_CASE,
"category": "auto_repair",
"tags": [*playbook.tags, "playbook", "auto_extracted", playbook.status.value],
"source": EntrySource.AI_EXTRACTED,
"related_incident_id": incident.incident_id,
"created_by": "playbook_service",
},
incident_id=incident.incident_id,
)
result = await km_write_with_flag(payload)
logger.info(
"playbook_written_to_km",
playbook_id=playbook.playbook_id,
incident_id=incident.incident_id,
km_result=result.value,
)
async def _index_playbook_async(self, playbook: Playbook) -> None:
"""非同步建立 Playbook 向量索引 (ADR-030 Phase 3)"""
try:
rag_service = await self._get_rag_service()
success = await rag_service.index_playbook(playbook)
if success:
logger.debug(
"playbook_indexed",
playbook_id=playbook.playbook_id,
)
except Exception as e:
logger.warning(
"playbook_index_failed",
playbook_id=playbook.playbook_id,
error=str(e),
)
async def get_recommendations(
self,
symptoms: SymptomPattern,
top_k: int = 3,
use_rag: bool = True,
) -> list[PlaybookRecommendation]:
"""
取得 Playbook 推薦
ADR-030 Phase 3 策略:
1. Jaccard 精確匹配 (Repository)
2. RAG 向量語意搜尋 (可選)
3. 混合排序 (Jaccard 40% + Vector 60%)
4. 按 similarity_score * success_rate 排序
"""
# Step 1: Jaccard 精確匹配
similar_playbooks = await self._repository.find_by_symptoms(
symptoms=symptoms,
top_k=top_k * 2, # 多取一些用於後續過濾
min_similarity=0.4,
)
jaccard_results = [(pb.playbook_id, sim) for pb, sim in similar_playbooks]
playbook_map = {pb.playbook_id: pb for pb, _ in similar_playbooks}
# Step 2: RAG 混合搜尋 (如果啟用)
if use_rag and symptoms.alert_names:
try:
rag_service = await self._get_rag_service()
hybrid_matches = await rag_service.hybrid_search(
symptoms=symptoms,
jaccard_results=jaccard_results,
top_k=top_k * 2,
vector_weight=0.6,
jaccard_weight=0.4,
)
hybrid_by_id = {match.playbook_id: match for match in hybrid_matches}
for playbook_id, jaccard_score in jaccard_results:
if playbook_id in hybrid_by_id:
continue
hybrid_matches.append(
PlaybookMatch(
playbook_id=playbook_id,
similarity_score=jaccard_score,
match_type="jaccard",
)
)
# 補充 playbook_map (RAG 可能找到 Jaccard 沒找到的)
for match in hybrid_matches:
if match.playbook_id not in playbook_map:
pb = await self._repository.get_by_id(match.playbook_id)
if pb:
playbook_map[match.playbook_id] = pb
# 使用混合結果
final_results = [
(playbook_map[m.playbook_id], m.similarity_score)
for m in hybrid_matches
if m.playbook_id in playbook_map
]
logger.info(
"playbook_recommendation_hybrid",
jaccard_count=len(jaccard_results),
hybrid_count=len(final_results),
)
except Exception as e:
# RAG 失敗時 fallback 到純 Jaccard
logger.warning(
"playbook_rag_fallback",
error=str(e),
)
final_results = similar_playbooks
else:
final_results = similar_playbooks
if not final_results:
return []
# Step 3: 建立推薦列表
recommendations: list[PlaybookRecommendation] = []
for playbook, similarity in final_results:
# 找出匹配的症狀
matched_symptoms = self._find_matched_symptoms(symptoms, playbook.symptom_pattern)
# 生成推薦原因
reason = self._generate_recommendation_reason(
playbook,
similarity,
matched_symptoms,
)
recommendations.append(
PlaybookRecommendation(
playbook=playbook,
similarity_score=similarity,
matched_symptoms=matched_symptoms,
reason=reason,
)
)
# Step 4: 先保住 exact signal避免精準 Playbook 被語意近似項擠掉。
recommendations.sort(
key=lambda r: self._recommendation_priority(r, symptoms),
reverse=True,
)
return recommendations[:top_k]
async def approve(
self,
playbook_id: str,
approved_by: str,
notes: str | None = None,
) -> Playbook | None:
"""核准 Playbook"""
playbook = await self._repository.get_by_id(playbook_id)
if not playbook:
return None
if playbook.status != PlaybookStatus.DRAFT:
logger.warning(
"playbook_approve_invalid_status",
playbook_id=playbook_id,
current_status=playbook.status,
)
return None
playbook.status = PlaybookStatus.APPROVED
playbook.approved_by = approved_by
playbook.approved_at = now_taipei()
if notes:
playbook.notes = notes
updated = await self._repository.update(playbook)
if updated:
logger.info(
"playbook_approved",
playbook_id=playbook_id,
approved_by=approved_by,
)
return updated
async def record_execution(
self,
playbook_id: str,
success: bool,
) -> bool:
"""記錄 Playbook 執行結果"""
return await self._repository.update_stats(playbook_id, success)
# === CRUD Proxies ===
# 2026-04-05 Claude Code: C2 修正 — 提供 create() proxyRouter 不再直接呼叫 _repository
async def create(self, playbook: Playbook) -> Playbook:
"""直接建立 Playbook管理/seed 用途)"""
return await self._repository.create(playbook)
async def get_by_id(self, playbook_id: str) -> Playbook | None:
"""取得 Playbook"""
return await self._repository.get_by_id(playbook_id)
async def list_playbooks(
self,
status: PlaybookStatus | None = None,
tags: list[str] | None = None,
limit: int = 20,
offset: int = 0,
) -> tuple[list[Playbook], int]:
"""列出 Playbooks"""
return await self._repository.list_playbooks(
status=status,
tags=tags,
limit=limit,
offset=offset,
)
async def update(self, playbook: Playbook) -> Playbook | None:
"""更新 Playbook"""
return await self._repository.update(playbook)
async def create_new_version(
self,
base_playbook_id: str,
candidate: Playbook,
reason: str,
) -> Playbook | None:
"""
從既有 Playbook 建立下一版。
ADR-104 T4: LLM 生成的改良方案不覆蓋舊 Playbook而是建立 lineage
root(parent_playbook_id) -> v2 -> v3。舊版在新版 APPROVED 前仍可用。
"""
base = await self._repository.get_by_id(base_playbook_id)
if not base:
logger.warning("playbook_version_base_missing", base_playbook_id=base_playbook_id)
return None
root_id = base.parent_playbook_id or base.playbook_id
candidate.playbook_id = generate_playbook_id()
candidate.version = base.version + 1
candidate.parent_playbook_id = root_id
candidate.supersedes_playbook_id = base.playbook_id
candidate.version_reason = reason[:500]
candidate.success_count = 0
candidate.failure_count = 0
candidate.last_used_at = None
candidate.approved_by = None
candidate.approved_at = None
if base.playbook_id not in candidate.source_incident_ids:
candidate.notes = (
(candidate.notes or "")
+ f"\n[Version lineage: v{candidate.version} supersedes {base.playbook_id}]"
).strip()
created = await self._repository.create(candidate)
logger.info(
"playbook_version_created",
playbook_id=created.playbook_id,
base_playbook_id=base.playbook_id,
root_playbook_id=root_id,
version=created.version,
reason=reason,
)
return created
async def update_with_validation(
self,
playbook_id: str,
update_data: dict,
) -> Playbook | None:
"""
更新 Playbook (含驗證)
Phase 8 P1 修復: 從 Router 層移至 Service 層進行驗證
驗證規則:
- 禁止直接修改 playbook_id
- 禁止反向狀態轉換 (APPROVED → DRAFT)
- 統計欄位 (success_count, failure_count) 只能透過 record_execution 更新
Args:
playbook_id: Playbook ID
update_data: 要更新的欄位 (dict)
Returns:
更新後的 Playbook 或 None
"""
playbook = await self._repository.get_by_id(playbook_id)
if not playbook:
return None
# 禁止修改的欄位
forbidden_fields = {
"playbook_id",
"created_at",
"success_count",
"failure_count",
"last_used_at",
}
for field in forbidden_fields:
if field in update_data:
logger.warning(
"playbook_update_forbidden_field",
playbook_id=playbook_id,
field=field,
)
del update_data[field]
# 狀態轉換驗證
if "status" in update_data:
new_status = update_data["status"]
current_status = playbook.status
# 允許的轉換: DRAFT → APPROVED, APPROVED → DEPRECATED
# 禁止: APPROVED → DRAFT, DEPRECATED → 任何
if current_status == PlaybookStatus.DEPRECATED:
logger.warning(
"playbook_update_deprecated_status",
playbook_id=playbook_id,
)
return None
if (
current_status == PlaybookStatus.APPROVED
and new_status == PlaybookStatus.DRAFT
):
logger.warning(
"playbook_update_invalid_status_transition",
playbook_id=playbook_id,
from_status=current_status.value,
to_status=new_status,
)
return None
# 應用更新
# 2026-04-20 ogt + Claude Opus 4.7: setattr 不觸發 Pydantic validation
# Evolver 傳入 PlaybookStatus.DEPRECATED.valuestr "deprecated"
# → _pg_upsert playbook.status.value 炸:'str' has no attribute 'value'
# 修Enum 欄位強制轉型,防止 str 混入 Playbook 物件
for field, value in update_data.items():
if value is not None and hasattr(playbook, field):
if field == "status" and isinstance(value, str) and not isinstance(value, PlaybookStatus):
try:
value = PlaybookStatus(value)
except ValueError:
pass
elif field == "source" and isinstance(value, str) and not isinstance(value, PlaybookSource):
try:
value = PlaybookSource(value)
except ValueError:
pass
setattr(playbook, field, value)
return await self._repository.update(playbook)
async def delete(self, playbook_id: str) -> bool:
"""刪除 Playbook (軟刪除)"""
return await self._repository.delete(playbook_id)
# === Private Helpers ===
def _extract_symptom_pattern(self, incident: Incident) -> SymptomPattern:
"""從 Incident 萃取症狀模式"""
alert_names = [s.alert_name for s in incident.signals] if incident.signals else []
keywords = []
# 從 annotations 提取關鍵字
for signal in incident.signals or []:
if signal.annotations:
for value in signal.annotations.values():
if isinstance(value, str) and len(value) < 50:
keywords.append(value)
return SymptomPattern(
alert_names=alert_names,
affected_services=incident.affected_services or [],
severity_range=[incident.severity.value] if incident.severity else ["P2"],
keywords=keywords[:10], # 最多 10 個關鍵字
)
def _extract_repair_steps(self, incident: Incident) -> list[RepairStep]:
"""
從 Incident 萃取修復步驟
Task 3.3 (2026-04-14): 補齊 SSH 修復路徑。原本只處理 kubectl
新增 last_repair_action 作為第三優先來源,支援 SSH_COMMAND 類型。
優先順序:
1. decision_chain.reasoning_steps — kubectl 命令AI 推論步驟)
2. outcome.learning_notes — kubectl 命令(人工補充)
3. outcome.last_repair_action — SSH 或 kubectl實際執行動作Task 3.3 新增)
"""
steps: list[RepairStep] = []
step_number = 1
# 1. 從 decision_chain.reasoning_steps 提取 kubectl 命令
if incident.decision_chain and incident.decision_chain.reasoning_steps:
for reasoning in incident.decision_chain.reasoning_steps:
if "kubectl" in reasoning.lower():
kubectl_match = _re.search(r"kubectl\s+\S+.*", reasoning)
if kubectl_match:
steps.append(
RepairStep(
step_number=step_number,
action_type=ActionType.KUBECTL,
command=kubectl_match.group(0).strip(),
risk_level=RiskLevel.MEDIUM,
)
)
step_number += 1
# 2. Task 3.3: 從 learning_notes 萃取 kubectl 或 SSH 命令
# learning_notes 由兩個來源寫入:
# a. 人工補充筆記(既有邏輯)
# b. approval_execution._trigger_playbook_extraction 寫入 approval.actionTask 3.3 新增)
if not steps and incident.outcome and incident.outcome.learning_notes:
notes = incident.outcome.learning_notes.strip()
if notes.startswith("ssh "):
# SSH 修復路徑Task 3.3 新增)
host, inner_cmd = _parse_ssh_command(notes)
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.SSH_COMMAND,
command=inner_cmd or notes,
risk_level=RiskLevel.MEDIUM,
)
)
logger.info(
"playbook_ssh_step_extracted",
host=host or "unknown",
inner_cmd_preview=(inner_cmd or notes)[:60],
)
elif "kubectl" in notes.lower():
# kubectl 路徑(原有邏輯,移入此區塊統一處理)
kubectl_match = _re.search(r"kubectl\s+\S+.*", notes)
if kubectl_match:
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.KUBECTL,
command=kubectl_match.group(0).strip(),
risk_level=RiskLevel.MEDIUM,
)
)
else:
steps.append(
RepairStep(
step_number=1,
action_type=ActionType.KUBECTL,
command=notes,
risk_level=RiskLevel.MEDIUM,
)
)
return steps
def _calculate_confidence(self, incident: Incident, effectiveness: int) -> float:
"""計算 AI 萃取信心度"""
base_score = 0.5
# effectiveness 貢獻 (4-5 → 0.2-0.4)
effectiveness_bonus = (effectiveness - 3) * 0.2
# 有 decision_chain 加分
if incident.decision_chain and incident.decision_chain.reasoning_steps:
base_score += 0.1
# 有多個 signals 加分 (更多資料)
if incident.signals and len(incident.signals) >= 2:
base_score += 0.05
return min(base_score + effectiveness_bonus, 1.0)
def _generate_name(self, incident: Incident) -> str:
"""生成 Playbook 名稱Task 3.3: SSH 修復加 [SSH] 前綴)"""
alert_name = incident.signals[0].alert_name if incident.signals else "Unknown"
services = incident.affected_services[:2] if incident.affected_services else []
service_str = "/".join(services) if services else "system"
# 偵測 SSH 修復路徑 — 加前綴以利搜尋與過濾Task 3.3
notes = (incident.outcome.learning_notes or "") if incident.outcome else ""
prefix = "[SSH] " if notes.strip().startswith("ssh ") else ""
return f"{prefix}{alert_name} - {service_str} 修復劇本"
def _generate_description(self, incident: Incident) -> str:
"""生成 Playbook 描述"""
parts = []
if incident.signals:
parts.append(f"觸發告警: {incident.signals[0].alert_name}")
if incident.affected_services:
parts.append(f"影響服務: {', '.join(incident.affected_services)}")
# 從 decision_chain.hypothesis 取得 AI 分析結果
if incident.decision_chain and incident.decision_chain.hypothesis:
parts.append(f"AI 分析: {incident.decision_chain.hypothesis[:100]}")
return ". ".join(parts) if parts else "從成功案例自動萃取的修復劇本"
def _extract_tags(self, incident: Incident) -> list[str]:
"""萃取標籤Task 3.3: SSH 修復自動加 ssh 標籤)"""
tags: set[str] = set()
# 從服務名稱提取
for service in incident.affected_services or []:
tags.add(service.lower())
# 從告警名稱提取類型
if incident.signals:
for signal in incident.signals:
if "cpu" in signal.alert_name.lower():
tags.add("cpu")
if "memory" in signal.alert_name.lower():
tags.add("memory")
if "pod" in signal.alert_name.lower():
tags.add("kubernetes")
if "network" in signal.alert_name.lower():
tags.add("network")
# Task 3.3: SSH 修復加標籤learning_notes 以 ssh 開頭 → 主機層修復)
notes = (incident.outcome.learning_notes or "") if incident.outcome else ""
if notes.strip().startswith("ssh "):
tags.add("ssh")
tags.add("host_layer")
return list(tags)[:10]
def _find_matched_symptoms(
self,
query: SymptomPattern,
playbook_pattern: SymptomPattern,
) -> list[str]:
"""找出匹配的症狀"""
matched = []
# 匹配的告警
alert_matches = set(query.alert_names) & set(playbook_pattern.alert_names)
for alert in alert_matches:
matched.append(f"Alert: {alert}")
# 匹配的服務
service_matches = set(query.affected_services) & set(playbook_pattern.affected_services)
for service in service_matches:
matched.append(f"Service: {service}")
# 匹配的嚴重度
if set(query.severity_range) & set(playbook_pattern.severity_range):
matched.append(f"Severity: {query.severity_range[0]}")
return matched
@staticmethod
def _normalized_overlap(left: list[str], right: list[str]) -> bool:
left_values = {value.casefold() for value in left if value}
right_values = {value.casefold() for value in right if value}
return bool(left_values & right_values)
def _recommendation_priority(
self,
recommendation: PlaybookRecommendation,
symptoms: SymptomPattern,
) -> tuple[bool, bool, float]:
pattern = recommendation.playbook.symptom_pattern
alert_exact = self._normalized_overlap(symptoms.alert_names, pattern.alert_names)
service_exact = self._normalized_overlap(symptoms.affected_services, pattern.affected_services)
quality_score = recommendation.similarity_score * (
0.5 + 0.5 * recommendation.playbook.success_rate
)
return (alert_exact, service_exact, quality_score)
def _generate_recommendation_reason(
self,
playbook: Playbook,
similarity: float,
matched_symptoms: list[str],
) -> str:
"""生成推薦原因"""
parts = []
parts.append(f"相似度 {similarity:.0%}")
if playbook.success_rate > 0:
parts.append(f"成功率 {playbook.success_rate:.0%}")
if playbook.total_executions > 0:
parts.append(f"已執行 {playbook.total_executions}")
if matched_symptoms:
parts.append(f"匹配: {', '.join(matched_symptoms[:3])}")
return ". ".join(parts)
# =============================================================================
# Singleton
# =============================================================================
_service: PlaybookService | None = None
def get_playbook_service() -> IPlaybookService:
"""取得 PlaybookService 單例"""
global _service
if _service is None:
_service = PlaybookService()
return _service