Files
awoooi/apps/api/src/services/approval_execution.py
OG T df3ef9006c
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 7m2s
fix(auto-repair): 首席架構師 Review — 4 Critical/Important 修復
Critical #1: KM write task 移出 try/except
- _trigger_learning 的 KM 寫入原在 try 內,learning 失敗時不寫 KM
- 移至 except 後確保成功/失敗都寫入
- 移除冗餘 import asyncio(已在頂層 import)
- Minor: approval.incident_id or None 防空字串

Important #2: migration 加 PRIMARY KEY
- playbook_id 從 UNIQUE 升為 PRIMARY KEY
- prod DB 已執行 ALTER TABLE ADD PRIMARY KEY

Important #3: s.sequence→s.step_number, s.description→s.command
- embed_playbook() 使用不存在的欄位名,RAG 向量索引靜默失敗
- RepairStep 正確欄位: step_number, command

Important #1: PlaybookService._get_rag_service 不再 Service 層快取
- 改為每次呼叫工廠 get_playbook_rag_service()
- 避免舊實例繞過工廠的 is_closed 重建邏輯

冷啟動修復 (首席架構師建議B+C):
- _trigger_playbook_extraction 執行成功後自動設定
  execution_success=True, effectiveness_score=4, status=RESOLVED
- skip 路徑 logger.debug → logger.info 提升可觀測性

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 12:02:03 +08:00

527 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
============================================================
從 approvals.py 抽取執行編排邏輯,整合:
- OperationParser: 解析操作類型
- K8s Executor: 執行 K8s 操作
- ApprovalDBService: 更新狀態
- TimelineService: 記錄事件
- NotificationManager: 發送通知
- Phase 7.6: Playbook 自動萃取
版本: v1.1
建立: 2026-03-25 (台北時區)
更新: 2026-03-26 (Phase 7.6 自動萃取)
建立者: Claude Code (Phase 16 R4.2)
"""
import asyncio
from typing import TYPE_CHECKING
import structlog
from src.core.config import settings
from src.models.approval import ApprovalRequest
from src.services.approval_db import get_approval_service, get_timeline_service
from src.services.executor import get_executor
from src.services.operation_parser import parse_operation_from_action
if TYPE_CHECKING:
from src.services.notifications import ExecutionStatus
logger = structlog.get_logger(__name__)
class ApprovalExecutionService:
    """
    Approval execution service - orchestrates the whole execution flow.

    Responsibilities:
    1. Parse the operation type
    2. Invoke the K8s executor
    3. Update the database status
    4. Record Timeline events
    5. Send notifications
    """

    def __init__(self) -> None:
        # Strong references to in-flight fire-and-forget tasks. The event
        # loop itself only keeps weak references to tasks, so a task nobody
        # else references may be garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    def _spawn_background(self, coro) -> asyncio.Task:
        """
        Schedule ``coro`` as a fire-and-forget task and keep it alive.

        Args:
            coro: Coroutine object to run in the background.

        Returns:
            The created task (callers may ignore it).
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task: asyncio.Task) -> None:
        """Release a finished background task and log any escaped error."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            # Background work must never affect the main flow; just record it.
            logger.warning(
                "background_task_failed",
                task=task.get_name(),
                error=str(error),
            )

    async def execute_approved_action(self, approval: ApprovalRequest) -> None:
        """
        Execute an approved operation in the background.

        Per the original notes this is invoked through BackgroundTasks so it
        does not block the API response.

        Phase 5: update the database status after execution.
        Phase 6: send a notification after execution (Post-Execution Hook).

        Args:
            approval: The approved approval request.
        """
        from src.services.notifications import ExecutionStatus

        approval_id = str(approval.id)
        logger.info(
            "background_execution_start",
            approval_id=approval_id,
            action=approval.action,
        )
        service = get_approval_service()
        timeline = get_timeline_service()

        # Parse operation details
        parsed = parse_operation_from_action(approval.action)
        operation_type = parsed.operation_type
        resource_name = parsed.resource_name
        namespace = parsed.namespace

        if operation_type is None or resource_name is None:
            logger.warning(
                "background_execution_skip",
                approval_id=approval_id,
                reason="Could not parse operation type from action",
                action=approval.action,
            )
            # Phase 5: update the database status
            await service.update_execution_status(approval.id, success=False)
            await timeline.add_event(
                event_type="exec",
                status="error",
                title="執行失敗: 無法解析操作類型",
                description=f"Action: {approval.action}",
                actor="leWOOOgo",
                actor_role="executor",
                approval_id=approval_id,
            )
            # Phase 6: failure notification (fire-and-forget)
            self._spawn_background(
                self._send_execution_notification(
                    approval=approval,
                    execution_status=ExecutionStatus.FAILED,
                    operation_type="unknown",
                    namespace=namespace,
                    error_message="Could not parse operation type",
                )
            )
            return

        # Execute with audit
        executor = get_executor()
        result = await executor.execute_with_audit(
            approval=approval,
            operation_type=operation_type,
            resource_name=resource_name,
            namespace=namespace,
        )

        # Phase 5: update the database status
        await service.update_execution_status(approval.id, success=result.success)

        # A missing or zero duration is reported to the learning service as 0.
        duration_seconds = result.duration_ms / 1000 if result.duration_ms else 0

        if result.success:
            logger.info(
                "background_execution_success",
                approval_id=approval_id,
                operation=operation_type.value,
                target=resource_name,
                namespace=namespace,
                duration_ms=result.duration_ms,
            )
            await timeline.add_event(
                event_type="exec",
                status="success",
                title=f"✅ K8s 執行成功: {operation_type.value}",
                description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)",
                actor="leWOOOgo",
                actor_role="executor",
                approval_id=approval_id,
            )
            # Phase 6: success notification (fire-and-forget)
            self._spawn_background(
                self._send_execution_notification(
                    approval=approval,
                    execution_status=ExecutionStatus.SUCCESS,
                    operation_type=operation_type.value,
                    namespace=namespace,
                    duration_ms=result.duration_ms,
                )
            )
            # Phase 7.6: automatic Playbook extraction (fire-and-forget)
            self._spawn_background(self._trigger_playbook_extraction(approval))
            # ADR-030 Phase 5: learning service (fire-and-forget)
            self._spawn_background(
                self._trigger_learning(
                    approval=approval,
                    success=True,
                    duration_seconds=duration_seconds,
                )
            )
            return

        logger.error(
            "background_execution_failed",
            approval_id=approval_id,
            operation=operation_type.value,
            target=resource_name,
            namespace=namespace,
            error=result.error,
        )
        await timeline.add_event(
            event_type="exec",
            status="error",
            title=f"❌ K8s 執行失敗: {operation_type.value}",
            description=f"Error: {result.error}",
            actor="leWOOOgo",
            actor_role="executor",
            approval_id=approval_id,
        )
        # Phase 6: failure notification (fire-and-forget). An error text
        # containing "not found" is reported as a dry-run block.
        exec_status = (
            ExecutionStatus.DRY_RUN_BLOCKED
            if "not found" in (result.error or "")
            else ExecutionStatus.FAILED
        )
        self._spawn_background(
            self._send_execution_notification(
                approval=approval,
                execution_status=exec_status,
                operation_type=operation_type.value,
                namespace=namespace,
                error_message=result.error,
                duration_ms=result.duration_ms,
            )
        )
        # ADR-030 Phase 5: learning service (failure case)
        self._spawn_background(
            self._trigger_learning(
                approval=approval,
                success=False,
                error_message=result.error,
                duration_seconds=duration_seconds,
            )
        )

    async def _trigger_learning(
        self,
        approval: ApprovalRequest,
        success: bool,
        duration_seconds: float = 0,
        error_message: str | None = None,
    ) -> None:
        """
        ADR-030 Phase 5: hand the execution result to the learning service.

        Afterwards the result is also written to KM, whether or not the
        learning step succeeded.

        Args:
            approval: The approval request that was executed.
            success: Whether the execution succeeded.
            duration_seconds: Execution duration in seconds.
            error_message: Error text for failed executions, if any.
        """
        try:
            from src.services.learning_service import (
                ExecutionResult,
                get_learning_service,
            )

            learning = get_learning_service()
            result = ExecutionResult(
                approval_id=str(approval.id),
                incident_id=approval.incident_id or "",
                action=approval.action,
                success=success,
                error_message=error_message,
                duration_seconds=duration_seconds,
            )
            await learning.process_execution_result(
                approval=approval,
                result=result,
            )
        except Exception as e:
            # A learning failure must not affect the main flow.
            logger.warning(
                "learning_trigger_failed",
                approval_id=str(approval.id),
                error=str(e),
            )

        # 2026-04-04 ogt: persist the execution result to KM. Kept outside
        # the try/except so the write happens even when learning fails.
        self._spawn_background(
            self._write_execution_result_to_km(approval, success, error_message)
        )

    async def _write_execution_result_to_km(
        self,
        approval: "ApprovalRequest",
        success: bool,
        error_message: str | None,
    ) -> None:
        """
        Persist the execution result to KM (Knowledge Base).

        2026-04-04 ogt: both successful and failed executions are recorded.
        Any error raised while writing is logged and swallowed.

        Args:
            approval: The approval request that was executed.
            success: Whether the execution succeeded.
            error_message: Error text for failed executions, if any.
        """
        try:
            from src.models.knowledge import EntrySource, EntryType, KnowledgeEntryCreate
            from src.services.knowledge_service import get_knowledge_service

            # NOTE(review): both branches are empty strings here; the icons
            # may have been lost upstream - confirm the intended glyphs.
            status_icon = "" if success else ""
            status_text = "成功" if success else f"失敗: {error_message or '未知原因'}"
            content = (
                f"# {status_icon} 執行記錄: {approval.action[:80]}\n\n"
                f"**Approval ID**: {approval.id}\n"
                f"**Incident ID**: {approval.incident_id or '未關聯'}\n"
                f"**執行結果**: {status_text}\n"
                f"**風險等級**: {approval.risk_level.value if approval.risk_level else '未知'}\n\n"
                f"## 操作內容\n{approval.description or '無描述'}\n"
            )
            entry_data = KnowledgeEntryCreate(
                title=f"[執行記錄] {status_icon} {approval.action[:60]}",
                content=content,
                entry_type=EntryType.INCIDENT_CASE,
                category="execution_result",
                tags=["execution", "auto_repair" if success else "execution_failed"],
                source=EntrySource.AI_EXTRACTED,
                # An empty incident id is stored as None, not as "".
                related_incident_id=approval.incident_id or None,
                created_by="approval_execution",
            )
            await get_knowledge_service().create_entry(entry_data)
            logger.debug(
                "execution_result_written_to_km",
                approval_id=str(approval.id),
                success=success,
            )
        except Exception as e:
            logger.warning(
                "execution_result_km_write_failed",
                approval_id=str(approval.id),
                error=str(e),
            )

    async def _send_execution_notification(
        self,
        approval: ApprovalRequest,
        execution_status: "ExecutionStatus",
        operation_type: str,
        namespace: str,
        duration_ms: int | None = None,
        error_message: str | None = None,
    ) -> None:
        """
        Phase 6: send the execution notification (Post-Execution Hook).

        Builds a NotificationMessage from the approval and passes it to the
        notification manager's ``send_all``. Does nothing when
        ``settings.NOTIFICATION_ENABLED`` is falsy. Errors while building or
        sending are logged and swallowed.

        Args:
            approval: The approval request that was executed.
            execution_status: Outcome to report.
            operation_type: Operation name, or "unknown" when parsing failed.
            namespace: Target namespace as parsed from the action.
            duration_ms: Execution duration in milliseconds, if known.
            error_message: Error text for failed executions, if any.
        """
        from src.services.notifications import (
            NotificationMessage,
            get_notification_manager,
        )

        if not settings.NOTIFICATION_ENABLED:
            logger.info("notification_disabled", approval_id=str(approval.id))
            return

        try:
            # Build the signer list
            signers = [
                {"name": sig.signer_name, "comment": sig.comment or ""}
                for sig in approval.signatures
            ]

            # Blast-radius fields fall back to neutral defaults when absent.
            blast = approval.blast_radius
            if blast:
                affected_pods = blast.affected_pods
                estimated_downtime = blast.estimated_downtime
                related_services = blast.related_services
                data_impact = blast.data_impact.value
            else:
                affected_pods = 0
                estimated_downtime = "N/A"
                related_services = []
                data_impact = "none"

            # Build the notification message
            # NOTE(review): risk_level is dereferenced without a None guard
            # here, unlike in the KM write path - confirm it is always set.
            message = NotificationMessage(
                execution_status=execution_status,
                action_title=approval.action[:100],
                action_description=approval.description[:200] if approval.description else "",
                approval_id=str(approval.id),
                signers=signers,
                required_signatures=approval.required_signatures,
                affected_pods=affected_pods,
                estimated_downtime=estimated_downtime,
                related_services=related_services,
                data_impact=data_impact,
                namespace=namespace,
                operation_type=operation_type,
                duration_ms=duration_ms,
                error_message=error_message,
                risk_level=approval.risk_level.value,
                ai_provider=approval.requested_by,
            )

            # Send the notification
            manager = get_notification_manager()
            results = await manager.send_all(message)
            for result in results:
                logger.info(
                    "notification_result",
                    approval_id=str(approval.id),
                    provider=result.provider,
                    status=result.status.value,
                    message=result.message,
                )
        except Exception as e:
            logger.exception(
                "notification_failed",
                approval_id=str(approval.id),
                error=str(e),
            )

    async def _trigger_playbook_extraction(
        self,
        approval: ApprovalRequest,
    ) -> None:
        """
        Phase 7.6: trigger automatic Playbook extraction.

        Called after a successful execution. The linked incident is updated
        so that execution_success is True, effectiveness_score is at least 4
        and the status is RESOLVED (unless already RESOLVED or CLOSED), and
        is then handed to the playbook service.

        This is fire-and-forget; a failure does not affect the main flow.

        Args:
            approval: The approval request that was executed successfully.
        """
        try:
            # 1. Resolve the incident linked to this approval
            incident_id = self._extract_incident_id_from_approval(approval)
            if not incident_id:
                logger.info(
                    "playbook_extraction_skipped",
                    approval_id=str(approval.id),
                    reason="No incident_id found",
                )
                return

            # 2. Load the incident
            from src.services.incident_service import get_incident_service

            incident_service = get_incident_service()
            incident = await incident_service.get_incident(incident_id)
            if not incident:
                logger.info(
                    "playbook_extraction_skipped",
                    approval_id=str(approval.id),
                    incident_id=incident_id,
                    reason="Incident not found",
                )
                return

            # 3. Set the outcome automatically after a successful execution
            #    (cold-start fix), so the extraction preconditions hold
            #    without waiting for a manual score.
            from src.models.incident import IncidentOutcome, IncidentStatus
            from src.utils.timezone import now_taipei

            if incident.outcome is None:
                incident.outcome = IncidentOutcome()
            if not incident.outcome.execution_success:
                incident.outcome.execution_success = True
            if incident.outcome.effectiveness_score is None or incident.outcome.effectiveness_score < 4:
                # System judgement: successful K8s execution = effective
                incident.outcome.effectiveness_score = 4
            if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
                incident.status = IncidentStatus.RESOLVED
                incident.resolved_at = now_taipei()

            # Save the incident back; an error here is caught below.
            await incident_service.save_to_working_memory(incident)
            logger.info(
                "playbook_extraction_incident_updated",
                approval_id=str(approval.id),
                incident_id=incident_id,
                effectiveness_score=incident.outcome.effectiveness_score,
                status=incident.status.value,
            )

            # 4. Trigger extraction (effectiveness is now at least 4)
            from src.services.playbook_service import get_playbook_service

            playbook_service = get_playbook_service()
            effectiveness = incident.outcome.effectiveness_score or 4
            playbook = await playbook_service.extract_from_incident(
                incident=incident,
                auto_approve=effectiveness >= 5,  # auto-approve on a full score
            )
            if playbook:
                logger.info(
                    "playbook_auto_extracted",
                    approval_id=str(approval.id),
                    incident_id=incident_id,
                    playbook_id=playbook.playbook_id,
                    playbook_name=playbook.name,
                    auto_approved=playbook.status.value == "approved",
                )
            else:
                logger.debug(
                    "playbook_extraction_no_result",
                    approval_id=str(approval.id),
                    incident_id=incident_id,
                )
        except Exception as e:
            # An extraction failure must not affect the main flow.
            logger.warning(
                "playbook_extraction_error",
                approval_id=str(approval.id),
                error=str(e),
            )

    def _extract_incident_id_from_approval(
        self,
        approval: ApprovalRequest,
    ) -> str | None:
        """
        Resolve the incident id linked to an approval.

        Sources, in priority order:
        1. ``approval.incident_id`` when present and non-empty
        2. The first ``INC-...`` token in ``approval.description``
        3. The first ``INC-...`` token in ``approval.action``
        4. The first ``INC-...`` token in ``approval.requested_by``

        Args:
            approval: The approval request to inspect.

        Returns:
            The incident id, or None when no source yields one.
        """
        import re

        # The explicit link is authoritative; the rest of this class already
        # relies on approval.incident_id for the same purpose.
        explicit_id = getattr(approval, "incident_id", None)
        if explicit_id:
            return str(explicit_id)

        # Hyphen-separated uppercase/digit segments. Each hyphen must be
        # followed by a segment, so a trailing "-" is never captured
        # (e.g. "INC-123-retry" yields "INC-123").
        pattern = r"INC-[A-Z0-9]+(?:-[A-Z0-9]+)*"
        for text in (approval.description, approval.action, approval.requested_by):
            if not text:
                continue
            match = re.search(pattern, text)
            if match:
                return match.group(0)
        return None
# =============================================================================
# Singleton Instance
# =============================================================================
_execution_service: ApprovalExecutionService | None = None


def get_execution_service() -> ApprovalExecutionService:
    """
    Return the shared ApprovalExecutionService, creating it on first use.

    Returns:
        ApprovalExecutionService: The module-wide service instance.
    """
    global _execution_service
    # Fast path: the instance was already created by an earlier call.
    if _execution_service is not None:
        return _execution_service
    _execution_service = ApprovalExecutionService()
    return _execution_service