Files
awoooi/apps/api/src/services/approval_execution.py
OG T 2e75a20150 feat(api): Phase 7.5-7.6 Playbook 整合決策與自動萃取
Phase 7.5: DecisionManager 三軌決策
- 新增 Playbook 優先匹配 (similarity >= 85%)
- 三軌決策順序: Playbook > LLM > Expert System
- 整合 PlaybookService 推薦引擎

Phase 7.6: 自動萃取機制
- approval_execution.py 成功執行後觸發萃取
- 條件: RESOLVED/CLOSED + effectiveness >= 4
- 滿分 (5) 自動核准 Playbook

測試:
- 13 個 Playbook 單元測試全部通過
- 修復 Incident 模型欄位對應 (reasoning_steps)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-26 11:09:25 +08:00

407 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
============================================================
從 approvals.py 抽取執行編排邏輯,整合:
- OperationParser: 解析操作類型
- K8s Executor: 執行 K8s 操作
- ApprovalDBService: 更新狀態
- TimelineService: 記錄事件
- NotificationManager: 發送通知
- Phase 7.6: Playbook 自動萃取
版本: v1.1
建立: 2026-03-25 (台北時區)
更新: 2026-03-26 (Phase 7.6 自動萃取)
建立者: Claude Code (Phase 16 R4.2)
"""
import asyncio
from typing import TYPE_CHECKING
import structlog
from src.core.config import settings
from src.models.approval import ApprovalRequest
from src.services.approval_db import get_approval_service, get_timeline_service
from src.services.executor import get_executor
from src.services.operation_parser import parse_operation_from_action
if TYPE_CHECKING:
from src.services.notifications import ExecutionStatus
logger = structlog.get_logger(__name__)
class ApprovalExecutionService:
"""
授權執行服務 - 編排整個執行流程
職責:
1. 解析操作類型
2. 呼叫 K8s Executor 執行
3. 更新資料庫狀態
4. 記錄 Timeline 事件
5. 發送通知
"""
async def execute_approved_action(self, approval: ApprovalRequest) -> None:
"""
背景執行已批准的操作
此函數由 BackgroundTasks 呼叫,不阻塞 API 回應
Phase 5: 執行後更新資料庫狀態
Phase 6: 執行後發送通知 (Post-Execution Hook)
Args:
approval: 已批准的授權請求
"""
from src.services.notifications import ExecutionStatus
logger.info(
"background_execution_start",
approval_id=str(approval.id),
action=approval.action,
)
service = get_approval_service()
timeline = get_timeline_service()
# Parse operation details
parsed = parse_operation_from_action(approval.action)
operation_type = parsed.operation_type
resource_name = parsed.resource_name
namespace = parsed.namespace
if operation_type is None or resource_name is None:
logger.warning(
"background_execution_skip",
approval_id=str(approval.id),
reason="Could not parse operation type from action",
action=approval.action,
)
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=False)
await timeline.add_event(
event_type="exec",
status="error",
title="執行失敗: 無法解析操作類型",
description=f"Action: {approval.action}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.FAILED,
operation_type="unknown",
namespace=namespace,
error_message="Could not parse operation type",
)
)
return
# Execute with audit
executor = get_executor()
result = await executor.execute_with_audit(
approval=approval,
operation_type=operation_type,
resource_name=resource_name,
namespace=namespace,
)
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=result.success)
# Update approval status based on result
if result.success:
logger.info(
"background_execution_success",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
duration_ms=result.duration_ms,
)
await timeline.add_event(
event_type="exec",
status="success",
title=f"✅ K8s 執行成功: {operation_type.value}",
description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送成功通知 (fire-and-forget)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.SUCCESS,
operation_type=operation_type.value,
namespace=namespace,
duration_ms=result.duration_ms,
)
)
# Phase 7.6: 觸發 Playbook 自動萃取 (fire-and-forget)
asyncio.create_task(
self._trigger_playbook_extraction(approval)
)
else:
logger.error(
"background_execution_failed",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
error=result.error,
)
await timeline.add_event(
event_type="exec",
status="error",
title=f"❌ K8s 執行失敗: {operation_type.value}",
description=f"Error: {result.error}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget, 包含 Dry-Run 攔截)
exec_status = (
ExecutionStatus.DRY_RUN_BLOCKED
if "not found" in (result.error or "")
else ExecutionStatus.FAILED
)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=exec_status,
operation_type=operation_type.value,
namespace=namespace,
error_message=result.error,
duration_ms=result.duration_ms,
)
)
async def _send_execution_notification(
self,
approval: ApprovalRequest,
execution_status: "ExecutionStatus",
operation_type: str,
namespace: str,
duration_ms: int | None = None,
error_message: str | None = None,
) -> None:
"""
Phase 6: 發送執行通知 (Post-Execution Hook)
將執行結果發送至所有已配置的通知頻道 (Discord, Slack, etc.)
"""
from src.services.notifications import (
NotificationMessage,
get_notification_manager,
)
if not settings.NOTIFICATION_ENABLED:
logger.info("notification_disabled", approval_id=str(approval.id))
return
try:
# 建構簽核者列表
signers = [
{"name": sig.signer_name, "comment": sig.comment or ""}
for sig in approval.signatures
]
# 建構通知訊息
message = NotificationMessage(
execution_status=execution_status,
action_title=approval.action[:100],
action_description=approval.description[:200] if approval.description else "",
approval_id=str(approval.id),
signers=signers,
required_signatures=approval.required_signatures,
affected_pods=approval.blast_radius.affected_pods if approval.blast_radius else 0,
estimated_downtime=approval.blast_radius.estimated_downtime if approval.blast_radius else "N/A",
related_services=approval.blast_radius.related_services if approval.blast_radius else [],
data_impact=approval.blast_radius.data_impact.value if approval.blast_radius else "none",
namespace=namespace,
operation_type=operation_type,
duration_ms=duration_ms,
error_message=error_message,
risk_level=approval.risk_level.value,
ai_provider=approval.requested_by,
)
# 發送通知
manager = get_notification_manager()
results = await manager.send_all(message)
for result in results:
logger.info(
"notification_result",
approval_id=str(approval.id),
provider=result.provider,
status=result.status.value,
message=result.message,
)
except Exception as e:
logger.exception(
"notification_failed",
approval_id=str(approval.id),
error=str(e),
)
async def _trigger_playbook_extraction(
self,
approval: ApprovalRequest,
) -> None:
"""
Phase 7.6: 觸發 Playbook 自動萃取
條件:
- 執行成功
- 關聯的 Incident 狀態為 RESOLVED 或 CLOSED
- effectiveness_score >= 4
此函數為 fire-and-forget失敗不影響主流程
"""
try:
# 1. 從 approval 取得關聯的 incident_id
# approval.requested_by 可能包含 incident 資訊,或從 metadata 取得
# 暫時從 description 或 action 解析
incident_id = self._extract_incident_id_from_approval(approval)
if not incident_id:
logger.debug(
"playbook_extraction_skip",
approval_id=str(approval.id),
reason="No incident_id found",
)
return
# 2. 取得 Incident
from src.services.incident_service import get_incident_service
incident_service = get_incident_service()
incident = await incident_service.get_incident(incident_id)
if not incident:
logger.debug(
"playbook_extraction_skip",
approval_id=str(approval.id),
incident_id=incident_id,
reason="Incident not found",
)
return
# 3. 檢查 Incident 狀態
from src.models.incident import IncidentStatus
if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
logger.debug(
"playbook_extraction_skip",
approval_id=str(approval.id),
incident_id=incident_id,
incident_status=incident.status.value,
reason="Incident not resolved",
)
return
# 4. 檢查 effectiveness_score
effectiveness = incident.outcome.effectiveness_score if incident.outcome else 0
if effectiveness < 4:
logger.debug(
"playbook_extraction_skip",
approval_id=str(approval.id),
incident_id=incident_id,
effectiveness=effectiveness,
reason="Low effectiveness score",
)
return
# 5. 觸發萃取
from src.services.playbook_service import get_playbook_service
playbook_service = get_playbook_service()
playbook = await playbook_service.extract_from_incident(
incident=incident,
auto_approve=effectiveness >= 5, # 滿分自動核准
)
if playbook:
logger.info(
"playbook_auto_extracted",
approval_id=str(approval.id),
incident_id=incident_id,
playbook_id=playbook.playbook_id,
playbook_name=playbook.name,
auto_approved=playbook.status.value == "approved",
)
else:
logger.debug(
"playbook_extraction_no_result",
approval_id=str(approval.id),
incident_id=incident_id,
)
except Exception as e:
# 萃取失敗不影響主流程
logger.warning(
"playbook_extraction_error",
approval_id=str(approval.id),
error=str(e),
)
def _extract_incident_id_from_approval(
self,
approval: ApprovalRequest,
) -> str | None:
"""
從 approval 提取關聯的 incident_id
嘗試以下來源:
1. approval.metadata (如果有)
2. approval.description 中的 INC- 模式
3. approval.requested_by 中的 incident 資訊
"""
import re
# 從 description 或 action 中尋找 INC-XXXXXX 模式
text = f"{approval.description or ''} {approval.action or ''}"
match = re.search(r"INC-([A-Z0-9-]+)", text)
if match:
return match.group(0) # 返回完整的 INC-XXXXX
# 從 requested_by 尋找
if approval.requested_by and "INC-" in approval.requested_by:
match = re.search(r"INC-([A-Z0-9-]+)", approval.requested_by)
if match:
return match.group(0)
return None
# =============================================================================
# Singleton Instance
# =============================================================================
_execution_service: ApprovalExecutionService | None = None
def get_execution_service() -> ApprovalExecutionService:
"""
取得 ApprovalExecutionService 單例
Returns:
ApprovalExecutionService: 執行服務實例
"""
global _execution_service
if _execution_service is None:
_execution_service = ApprovalExecutionService()
return _execution_service