Files
awoooi/apps/api/src/services/approval_execution.py
OG T b7ea362efc
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 12m13s
fix(api): Review #2 技術債清理 — I1/S1/S2/S3 全數修正
I1: error_type 欄位補全
- AnomalyCounter.derive_key_from_incident() 新增
  從 signal.labels 提取 reason/error_type,確保四欄位完整

S1: 三處 signature 建構邏輯統一
- auto_repair_service._derive_anomaly_key() → 委託 derive_key_from_incident()
- approval_execution._get_anomaly_key_from_approval() → 同上
- incident_service.resolve_incident() B4 → 同上
- 消除 3 處重複的 signature 建構程式碼

S2: Redis Pipeline 批次查詢
- get_all_disposition_stats() 從 N+1 hgetall 改為 2 次 Pipeline
- Pipeline 1: 批次 hgetall 所有 disposition key
- Pipeline 2: 批次 hget metadata (alert_name)
- 效能從 O(2N) Redis round-trip 降至 O(2)

S3: auto_repair.py get_incident AttributeError 修復
- get_incident() → get_from_working_memory() (pre-existing bug)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-04-07 13:13:42 +08:00

559 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
============================================================
從 approvals.py 抽取執行編排邏輯,整合:
- OperationParser: 解析操作類型
- K8s Executor: 執行 K8s 操作
- ApprovalDBService: 更新狀態
- TimelineService: 記錄事件
- NotificationManager: 發送通知
- Phase 7.6: Playbook 自動萃取
版本: v1.1
建立: 2026-03-25 (台北時區)
更新: 2026-03-26 (Phase 7.6 自動萃取)
建立者: Claude Code (Phase 16 R4.2)
"""
import asyncio
from typing import TYPE_CHECKING
import structlog
from src.core.config import settings
from src.models.approval import ApprovalRequest
from src.services.approval_db import get_approval_service, get_timeline_service
from src.services.executor import get_executor
from src.services.operation_parser import parse_operation_from_action
if TYPE_CHECKING:
from src.services.notifications import ExecutionStatus
logger = structlog.get_logger(__name__)
class ApprovalExecutionService:
    """
    Approval execution service - orchestrates the whole execution flow.

    Responsibilities:
    1. Parse the operation type from the approved action
    2. Call the K8s executor to run the operation
    3. Update the execution status in the database
    4. Record timeline events
    5. Send notifications
    """
async def execute_approved_action(self, approval: ApprovalRequest) -> None:
    """
    Execute an approved operation in the background and record its outcome.

    Meant to be scheduled through BackgroundTasks so the API response is
    not blocked.

    Flow:
    1. Parse operation type / resource name / namespace from
       ``approval.action``.
    2. If parsing fails: mark the execution as failed in the database, add
       an error timeline event, schedule a FAILED notification and return.
    3. Otherwise run the executor with audit, persist the result, add a
       timeline event and schedule a notification.
    4. On success additionally schedule playbook extraction and learning,
       and record a "human_approved" disposition; on failure schedule
       learning with the error message.

    Notifications, playbook extraction and learning are fire-and-forget.
    They are started through ``_spawn_background`` so a strong reference
    to each task is kept until it completes.

    Args:
        approval: The approved request to execute.
    """
    from src.services.notifications import ExecutionStatus

    logger.info(
        "background_execution_start",
        approval_id=str(approval.id),
        action=approval.action,
    )
    service = get_approval_service()
    timeline = get_timeline_service()

    # Parse operation details
    parsed = parse_operation_from_action(approval.action)
    operation_type = parsed.operation_type
    resource_name = parsed.resource_name
    namespace = parsed.namespace

    if operation_type is None or resource_name is None:
        logger.warning(
            "background_execution_skip",
            approval_id=str(approval.id),
            reason="Could not parse operation type from action",
            action=approval.action,
        )
        # Phase 5: persist the failed execution status
        await service.update_execution_status(approval.id, success=False)
        await timeline.add_event(
            event_type="exec",
            status="error",
            title="執行失敗: 無法解析操作類型",
            description=f"Action: {approval.action}",
            actor="leWOOOgo",
            actor_role="executor",
            approval_id=str(approval.id),
        )
        # Phase 6: failure notification (fire-and-forget)
        self._spawn_background(
            self._send_execution_notification(
                approval=approval,
                execution_status=ExecutionStatus.FAILED,
                operation_type="unknown",
                namespace=namespace,
                error_message="Could not parse operation type",
            )
        )
        return

    # Execute with audit
    executor = get_executor()
    result = await executor.execute_with_audit(
        approval=approval,
        operation_type=operation_type,
        resource_name=resource_name,
        namespace=namespace,
    )

    # Phase 5: persist the execution status
    await service.update_execution_status(approval.id, success=result.success)

    # Same conversion is used for both the success and the failure path.
    duration_seconds = result.duration_ms / 1000 if result.duration_ms else 0

    if result.success:
        logger.info(
            "background_execution_success",
            approval_id=str(approval.id),
            operation=operation_type.value,
            target=resource_name,
            namespace=namespace,
            duration_ms=result.duration_ms,
        )
        await timeline.add_event(
            event_type="exec",
            status="success",
            title=f"✅ K8s 執行成功: {operation_type.value}",
            description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)",
            actor="leWOOOgo",
            actor_role="executor",
            approval_id=str(approval.id),
        )
        # Phase 6: success notification (fire-and-forget)
        self._spawn_background(
            self._send_execution_notification(
                approval=approval,
                execution_status=ExecutionStatus.SUCCESS,
                operation_type=operation_type.value,
                namespace=namespace,
                duration_ms=result.duration_ms,
            )
        )
        # Phase 7.6: automatic playbook extraction (fire-and-forget)
        self._spawn_background(self._trigger_playbook_extraction(approval))
        # ADR-030 Phase 5: learning service (fire-and-forget)
        self._spawn_background(
            self._trigger_learning(
                approval=approval,
                success=True,
                duration_seconds=duration_seconds,
            )
        )
        # Sprint 4 B3: record that this anomaly was handled via human approval.
        # Best effort: a failure here is logged and must not fail the execution.
        try:
            anomaly_key = await self._get_anomaly_key_from_approval(approval)
            if anomaly_key:
                from src.services.anomaly_counter import get_anomaly_counter

                counter = get_anomaly_counter()
                await counter.record_disposition(anomaly_key, "human_approved")
        except Exception as _disp_e:
            logger.warning("disposition_record_failed", error=str(_disp_e))
    else:
        logger.error(
            "background_execution_failed",
            approval_id=str(approval.id),
            operation=operation_type.value,
            target=resource_name,
            namespace=namespace,
            error=result.error,
        )
        await timeline.add_event(
            event_type="exec",
            status="error",
            title=f"❌ K8s 執行失敗: {operation_type.value}",
            description=f"Error: {result.error}",
            actor="leWOOOgo",
            actor_role="executor",
            approval_id=str(approval.id),
        )
        # Phase 6: failure notification (fire-and-forget). An error text
        # containing "not found" is reported as a dry-run block.
        exec_status = (
            ExecutionStatus.DRY_RUN_BLOCKED
            if "not found" in (result.error or "")
            else ExecutionStatus.FAILED
        )
        self._spawn_background(
            self._send_execution_notification(
                approval=approval,
                execution_status=exec_status,
                operation_type=operation_type.value,
                namespace=namespace,
                error_message=result.error,
                duration_ms=result.duration_ms,
            )
        )
        # ADR-030 Phase 5: learning service (failure case)
        self._spawn_background(
            self._trigger_learning(
                approval=approval,
                success=False,
                error_message=result.error,
                duration_seconds=duration_seconds,
            )
        )

def _spawn_background(self, coro) -> "asyncio.Task":
    """
    Schedule ``coro`` as a fire-and-forget task and keep it alive.

    The event loop only holds weak references to tasks, so a task whose
    handle is dropped may be garbage-collected before it finishes. The
    task is therefore stored in a per-instance set and removed again by a
    done-callback.

    Args:
        coro: The coroutine object to run in the background.

    Returns:
        The created task (callers may ignore it).
    """
    pending = getattr(self, "_background_tasks", None)
    if pending is None:
        # Created lazily because the class defines no __init__.
        pending = set()
        self._background_tasks = pending
    task = asyncio.create_task(coro)
    pending.add(task)
    task.add_done_callback(pending.discard)
    return task
async def _get_anomaly_key_from_approval(self, approval: ApprovalRequest) -> str | None:
"""
從 approval → incident → anomaly_key。
2026-04-07 Claude Code: I1+S1 Fix — 委託 AnomalyCounter.derive_key_from_incident()
"""
try:
if not approval.incident_id:
return None
from src.services.incident_service import get_incident_service
incident_service = get_incident_service()
incident = await incident_service.get_from_working_memory(approval.incident_id)
if not incident:
return None
from src.services.anomaly_counter import AnomalyCounter
return AnomalyCounter.derive_key_from_incident(incident)
except Exception as e:
logger.warning("get_anomaly_key_from_approval_failed", error=str(e))
return None
async def _trigger_learning(
    self,
    approval: ApprovalRequest,
    success: bool,
    duration_seconds: float = 0,
    error_message: str | None = None,
) -> None:
    """
    ADR-030 Phase 5: feed an execution result to the learning service.

    Builds an ``ExecutionResult`` and passes it to
    ``process_execution_result``; afterwards the result is written to the
    knowledge base (KM) regardless of whether learning succeeded.

    Args:
        approval: The executed approval request.
        success: Whether the execution succeeded.
        duration_seconds: Execution duration in seconds.
        error_message: Error text for failed executions, if any.
    """
    try:
        from src.services.learning_service import (
            ExecutionResult,
            get_learning_service,
        )

        learning = get_learning_service()
        result = ExecutionResult(
            approval_id=str(approval.id),
            incident_id=approval.incident_id or "",
            action=approval.action,
            success=success,
            error_message=error_message,
            duration_seconds=duration_seconds,
        )
        await learning.process_execution_result(
            approval=approval,
            result=result,
        )
    except Exception as e:
        # A learning failure must not affect the main flow.
        logger.warning(
            "learning_trigger_failed",
            approval_id=str(approval.id),
            error=str(e),
        )
    # Persist the execution result to KM. Kept outside the try/except so it
    # also happens when learning fails. Awaited directly instead of spawning
    # an unreferenced task (which could be garbage-collected before it
    # finishes); the writer handles and logs its own errors.
    await self._write_execution_result_to_km(approval, success, error_message)
async def _write_execution_result_to_km(
    self,
    approval: "ApprovalRequest",
    success: bool,
    error_message: str | None,
) -> None:
    """
    Persist an execution result as a knowledge-base (KM) entry.

    Both successful and failed executions are written. Any error while
    building or storing the entry is logged as a warning and swallowed.

    Args:
        approval: The executed approval request.
        success: Whether the execution succeeded.
        error_message: Error text used in the entry for failed executions.
    """
    try:
        from src.models.knowledge import EntrySource, EntryType, KnowledgeEntryCreate
        from src.services.knowledge_service import get_knowledge_service

        # Same success/failure icons as the execution timeline titles; the
        # previous expression yielded an empty string in both branches.
        status_icon = "✅" if success else "❌"
        status_text = "成功" if success else f"失敗: {error_message or '未知原因'}"
        content = (
            f"# {status_icon} 執行記錄: {approval.action[:80]}\n\n"
            f"**Approval ID**: {approval.id}\n"
            f"**Incident ID**: {approval.incident_id or '未關聯'}\n"
            f"**執行結果**: {status_text}\n"
            f"**風險等級**: {approval.risk_level.value if approval.risk_level else '未知'}\n\n"
            f"## 操作內容\n{approval.description or '無描述'}\n"
        )
        entry_data = KnowledgeEntryCreate(
            title=f"[執行記錄] {status_icon} {approval.action[:60]}",
            content=content,
            entry_type=EntryType.INCIDENT_CASE,
            category="execution_result",
            tags=["execution", "auto_repair" if success else "execution_failed"],
            source=EntrySource.AI_EXTRACTED,
            related_incident_id=approval.incident_id or None,
            created_by="approval_execution",
        )
        await get_knowledge_service().create_entry(entry_data)
        logger.debug(
            "execution_result_written_to_km",
            approval_id=str(approval.id),
            success=success,
        )
    except Exception as e:
        logger.warning(
            "execution_result_km_write_failed",
            approval_id=str(approval.id),
            error=str(e),
        )
async def _send_execution_notification(
    self,
    approval: ApprovalRequest,
    execution_status: "ExecutionStatus",
    operation_type: str,
    namespace: str,
    duration_ms: int | None = None,
    error_message: str | None = None,
) -> None:
    """
    Phase 6: post-execution notification hook.

    Builds a ``NotificationMessage`` from the approval and the execution
    result and sends it through the notification manager. Does nothing
    (apart from an info log) when ``settings.NOTIFICATION_ENABLED`` is
    false. Failures while building or sending are logged and swallowed.

    Args:
        approval: The executed approval request.
        execution_status: Outcome status to report.
        operation_type: Operation type label for the message.
        namespace: Namespace the operation targeted.
        duration_ms: Execution duration in milliseconds, if known.
        error_message: Error text for failed executions, if any.
    """
    from src.services.notifications import (
        NotificationMessage,
        get_notification_manager,
    )

    approval_id = str(approval.id)
    if not settings.NOTIFICATION_ENABLED:
        logger.info("notification_disabled", approval_id=approval_id)
        return

    try:
        # Signer list
        signers = []
        for signature in approval.signatures:
            signers.append(
                {"name": signature.signer_name, "comment": signature.comment or ""}
            )

        # Blast-radius fields fall back to neutral defaults when absent.
        blast = approval.blast_radius
        if blast:
            affected_pods = blast.affected_pods
            estimated_downtime = blast.estimated_downtime
            related_services = blast.related_services
            data_impact = blast.data_impact.value
        else:
            affected_pods = 0
            estimated_downtime = "N/A"
            related_services = []
            data_impact = "none"

        # Notification payload
        message = NotificationMessage(
            execution_status=execution_status,
            action_title=approval.action[:100],
            action_description=(approval.description or "")[:200],
            approval_id=approval_id,
            signers=signers,
            required_signatures=approval.required_signatures,
            affected_pods=affected_pods,
            estimated_downtime=estimated_downtime,
            related_services=related_services,
            data_impact=data_impact,
            namespace=namespace,
            operation_type=operation_type,
            duration_ms=duration_ms,
            error_message=error_message,
            risk_level=approval.risk_level.value,
            ai_provider=approval.requested_by,
        )

        # Send and log one line per provider result
        for outcome in await get_notification_manager().send_all(message):
            logger.info(
                "notification_result",
                approval_id=approval_id,
                provider=outcome.provider,
                status=outcome.status.value,
                message=outcome.message,
            )
    except Exception as exc:
        logger.exception(
            "notification_failed",
            approval_id=approval_id,
            error=str(exc),
        )
async def _trigger_playbook_extraction(
    self,
    approval: ApprovalRequest,
) -> None:
    """
    Phase 7.6: trigger automatic playbook extraction after a successful run.

    Steps:
    1. Resolve the incident id from ``approval.incident_id``, falling back
       to text parsing for older records.
    2. Load the incident from working memory.
    3. Make the extraction preconditions hold: set
       ``outcome.execution_success``, raise ``effectiveness_score`` to at
       least 4, mark the incident RESOLVED if it is neither RESOLVED nor
       CLOSED, and save it back to working memory.
    4. Ask the playbook service to extract a playbook (auto-approved only
       when the effectiveness score is >= 5).

    This is a fire-and-forget path: every failure is logged as a warning
    and never propagates to the main flow.

    Args:
        approval: The successfully executed approval request.
    """
    try:
        # 1. Take the id straight from approval.incident_id (Phase 26 fix).
        #    Scanning text for an "INC-" prefix alone missed it for
        #    Chinese action strings.
        incident_id = getattr(approval, "incident_id", None)
        if not incident_id:
            # Fallback: text parsing (backward compatibility with old data)
            incident_id = self._extract_incident_id_from_approval(approval)
        if not incident_id:
            logger.info(
                "playbook_extraction_skipped",
                approval_id=str(approval.id),
                reason="No incident_id found in approval.incident_id or text",
            )
            return

        # 2. Load the incident. Uses get_from_working_memory(), the same
        #    accessor as _get_anomaly_key_from_approval and the counterpart
        #    of save_to_working_memory() below; the former get_incident()
        #    call is the accessor reported as raising AttributeError.
        from src.services.incident_service import get_incident_service

        incident_service = get_incident_service()
        incident = await incident_service.get_from_working_memory(incident_id)
        if not incident:
            logger.info(
                "playbook_extraction_skipped",
                approval_id=str(approval.id),
                incident_id=incident_id,
                reason="Incident not found",
            )
            return

        # 3. Set the outcome automatically after a successful execution
        #    (cold-start): extraction preconditions then hold without a
        #    manually entered score.
        from src.models.incident import IncidentOutcome, IncidentStatus
        from src.utils.timezone import now_taipei

        if incident.outcome is None:
            incident.outcome = IncidentOutcome()
        if not incident.outcome.execution_success:
            incident.outcome.execution_success = True
        if incident.outcome.effectiveness_score is None or incident.outcome.effectiveness_score < 4:
            # System judgement: a successful K8s execution counts as effective.
            incident.outcome.effectiveness_score = 4
        if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
            incident.status = IncidentStatus.RESOLVED
            incident.resolved_at = now_taipei()

        # Save the incident back (fire-and-forget path; failure is handled below)
        await incident_service.save_to_working_memory(incident)
        logger.info(
            "playbook_extraction_incident_updated",
            approval_id=str(approval.id),
            incident_id=incident_id,
            effectiveness_score=incident.outcome.effectiveness_score,
            status=incident.status.value,
        )

        # 4. Trigger extraction; effectiveness is guaranteed to be >= 4 here.
        from src.services.playbook_service import get_playbook_service

        playbook_service = get_playbook_service()
        effectiveness = incident.outcome.effectiveness_score or 4
        playbook = await playbook_service.extract_from_incident(
            incident=incident,
            auto_approve=effectiveness >= 5,  # full score => auto-approve
        )
        if playbook:
            logger.info(
                "playbook_auto_extracted",
                approval_id=str(approval.id),
                incident_id=incident_id,
                playbook_id=playbook.playbook_id,
                playbook_name=playbook.name,
                auto_approved=playbook.status.value == "approved",
            )
        else:
            logger.debug(
                "playbook_extraction_no_result",
                approval_id=str(approval.id),
                incident_id=incident_id,
            )
    except Exception as e:
        # An extraction failure must not affect the main flow.
        logger.warning(
            "playbook_extraction_error",
            approval_id=str(approval.id),
            error=str(e),
        )
def _extract_incident_id_from_approval(
self,
approval: ApprovalRequest,
) -> str | None:
"""
從 approval 提取關聯的 incident_id
嘗試以下來源:
1. approval.metadata (如果有)
2. approval.description 中的 INC- 模式
3. approval.requested_by 中的 incident 資訊
"""
import re
# 從 description 或 action 中尋找 INC-XXXXXX 模式
text = f"{approval.description or ''} {approval.action or ''}"
match = re.search(r"INC-([A-Z0-9-]+)", text)
if match:
return match.group(0) # 返回完整的 INC-XXXXX
# 從 requested_by 尋找
if approval.requested_by and "INC-" in approval.requested_by:
match = re.search(r"INC-([A-Z0-9-]+)", approval.requested_by)
if match:
return match.group(0)
return None
# =============================================================================
# Singleton Instance
# =============================================================================
_execution_service: ApprovalExecutionService | None = None
def get_execution_service() -> ApprovalExecutionService:
    """
    Return the process-wide ApprovalExecutionService singleton.

    The instance is created on first use and reused afterwards.

    Returns:
        ApprovalExecutionService: The shared execution service instance.
    """
    global _execution_service
    if _execution_service is not None:
        return _execution_service
    _execution_service = ApprovalExecutionService()
    return _execution_service