feat(api): Phase 16 R4.2 抽取 ApprovalExecutionService

Strangler Fig Pattern: 從 approvals.py 抽取執行編排邏輯

新增:
- src/services/approval_execution.py (271 行)
- ApprovalExecutionService class
- 整合 OperationParser + Executor + Timeline + Notifications

瘦身成果:
- approvals.py: 1097 → 787 行 (-310 行)
- R4 總計: 移除 310 行內嵌業務邏輯

CI/CD 修復:
- 移除危險的 rm -f ~/actions-runner-* 指令
- 改用 checkout clean: true + workspace 內清理

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-25 22:04:15 +08:00
parent aefd351e20
commit 716b94f60a
4 changed files with 281 additions and 222 deletions

View File

@@ -55,12 +55,6 @@ jobs:
runs-on: [self-hosted, harbor, k8s]
timeout-minutes: 1
steps:
# Phase 16: 清理 Runner diag logs 避免 "file already exists" 錯誤
- name: Clean runner diag logs
run: |
rm -f ~/actions-runner-awoooi/_diag/pages/*.log 2>/dev/null || true
rm -f ~/actions-runner-awoooi-2/_diag/pages/*.log 2>/dev/null || true
- name: "Check Required Secrets"
run: |
MISSING=""

View File

@@ -31,12 +31,6 @@ jobs:
runs-on: [self-hosted, harbor, k8s]
timeout-minutes: 1
steps:
# Phase 16: 清理 Runner diag logs 避免 "file already exists" 錯誤
- name: Clean runner diag logs
run: |
rm -f ~/actions-runner-awoooi/_diag/pages/*.log 2>/dev/null || true
rm -f ~/actions-runner-awoooi-2/_diag/pages/*.log 2>/dev/null || true
- name: Quick sanity check
run: |
echo "✅ Runner 可用"
@@ -50,13 +44,14 @@ jobs:
needs: pre-flight
timeout-minutes: 10
steps:
- name: Clean worktrees
run: rm -rf .claude/worktrees 2>/dev/null || true
- uses: actions/checkout@v4
with:
clean: true
# 清理 worktrees (在 checkout 後,確保在 workspace 內)
- name: Clean worktrees
run: rm -rf ${{ github.workspace }}/.claude/worktrees 2>/dev/null || true
- name: Setup pnpm
uses: pnpm/action-setup@v3
with:

View File

@@ -21,12 +21,8 @@ Endpoints:
"""
import asyncio
from typing import TYPE_CHECKING
from uuid import UUID
if TYPE_CHECKING:
from src.services.notifications import ExecutionStatus
from fastapi import (
APIRouter,
BackgroundTasks,
@@ -54,8 +50,8 @@ from src.models.approval import (
SignResponse,
)
from src.services.approval_db import get_approval_service, get_timeline_service
from src.services.approval_execution import get_execution_service
from src.services.executor import get_executor
from src.services.operation_parser import parse_operation_from_action
from src.services.proposal_service import get_proposal_service
router = APIRouter(prefix="/approvals", tags=["HITL Approvals"])
@@ -161,209 +157,10 @@ async def test_k8s_connection() -> dict:
# =============================================================================
# Background Execution Helper
# Phase 16 R4: parse_operation_from_action 已抽取至 src/services/operation_parser.py
# Phase 16 R4.2: 執行邏輯已抽取至 src/services/approval_execution.py
# =============================================================================
async def execute_approved_action(approval: ApprovalRequest) -> None:
"""
背景執行已批准的操作
此函數由 BackgroundTasks 呼叫,不阻塞 API 回應
Phase 5: 執行後更新資料庫狀態
Phase 6: 執行後發送通知 (Post-Execution Hook)
"""
from src.services.notifications import (
ExecutionStatus,
)
logger.info(
"background_execution_start",
approval_id=str(approval.id),
action=approval.action,
)
service = get_approval_service()
timeline = get_timeline_service()
# Parse operation details (Phase 16 R4: 使用新的 ParsedOperation dataclass)
parsed = parse_operation_from_action(approval.action)
operation_type = parsed.operation_type
resource_name = parsed.resource_name
namespace = parsed.namespace
if operation_type is None or resource_name is None:
logger.warning(
"background_execution_skip",
approval_id=str(approval.id),
reason="Could not parse operation type from action",
action=approval.action,
)
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=False)
await timeline.add_event(
event_type="exec",
status="error",
title="執行失敗: 無法解析操作類型",
description=f"Action: {approval.action}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget, 不阻塞執行緒)
asyncio.create_task(_send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.FAILED,
operation_type="unknown",
namespace=namespace,
error_message="Could not parse operation type",
))
return
# Execute with audit
executor = get_executor()
result = await executor.execute_with_audit(
approval=approval,
operation_type=operation_type,
resource_name=resource_name,
namespace=namespace,
)
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=result.success)
# Update approval status based on result
if result.success:
logger.info(
"background_execution_success",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
duration_ms=result.duration_ms,
)
await timeline.add_event(
event_type="exec",
status="success",
title=f"✅ K8s 執行成功: {operation_type.value}",
description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送成功通知 (fire-and-forget, 不阻塞執行緒)
asyncio.create_task(_send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.SUCCESS,
operation_type=operation_type.value,
namespace=namespace,
duration_ms=result.duration_ms,
))
else:
logger.error(
"background_execution_failed",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
error=result.error,
)
await timeline.add_event(
event_type="exec",
status="error",
title=f"❌ K8s 執行失敗: {operation_type.value}",
description=f"Error: {result.error}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget, 包含 Dry-Run 攔截)
exec_status = ExecutionStatus.DRY_RUN_BLOCKED if "not found" in (result.error or "") else ExecutionStatus.FAILED
asyncio.create_task(_send_execution_notification(
approval=approval,
execution_status=exec_status,
operation_type=operation_type.value,
namespace=namespace,
error_message=result.error,
duration_ms=result.duration_ms,
))
async def _send_execution_notification(
approval: ApprovalRequest,
execution_status: "ExecutionStatus",
operation_type: str,
namespace: str,
duration_ms: int | None = None,
error_message: str | None = None,
) -> None:
"""
Phase 6: 發送執行通知 (Post-Execution Hook)
將執行結果發送至所有已配置的通知頻道 (Discord, Slack, etc.)
"""
from src.core.config import settings
from src.services.notifications import (
NotificationMessage,
get_notification_manager,
)
if not settings.NOTIFICATION_ENABLED:
logger.info("notification_disabled", approval_id=str(approval.id))
return
try:
# 建構簽核者列表
signers = [
{"name": sig.signer_name, "comment": sig.comment or ""}
for sig in approval.signatures
]
# 建構通知訊息
message = NotificationMessage(
execution_status=execution_status,
action_title=approval.action[:100],
action_description=approval.description[:200] if approval.description else "",
approval_id=str(approval.id),
signers=signers,
required_signatures=approval.required_signatures,
affected_pods=approval.blast_radius.affected_pods if approval.blast_radius else 0,
estimated_downtime=approval.blast_radius.estimated_downtime if approval.blast_radius else "N/A",
related_services=approval.blast_radius.related_services if approval.blast_radius else [],
data_impact=approval.blast_radius.data_impact.value if approval.blast_radius else "none",
namespace=namespace,
operation_type=operation_type,
duration_ms=duration_ms,
error_message=error_message,
risk_level=approval.risk_level.value,
ai_provider=approval.requested_by,
)
# 發送通知
manager = get_notification_manager()
results = await manager.send_all(message)
for result in results:
logger.info(
"notification_result",
approval_id=str(approval.id),
provider=result.provider,
status=result.status.value,
message=result.message,
)
except Exception as e:
logger.exception(
"notification_failed",
approval_id=str(approval.id),
error=str(e),
)
# =============================================================================
# GET /api/v1/approvals/pending
# =============================================================================
@@ -557,7 +354,8 @@ async def sign_approval(
approval_id=str(approval_id),
)
background_tasks.add_task(execute_approved_action, approval)
execution_svc = get_execution_service()
background_tasks.add_task(execution_svc.execute_approved_action, approval)
# Phase 6.5: 更新關聯的 Incident 狀態為 RESOLVED
# 🔴 關鍵: 這是審核後 Incident 狀態更新的核心邏輯
@@ -813,7 +611,8 @@ async def bulk_approve(
# 如果觸發執行,加入背景任務
if execution_triggered:
background_tasks.add_task(execute_approved_action, signed_approval)
bulk_execution_svc = get_execution_service()
background_tasks.add_task(bulk_execution_svc.execute_approved_action, signed_approval)
# 更新關聯的 Incident 狀態
incident_id = signed_approval.metadata.get("incident_id") if signed_approval.metadata else None

View File

@@ -0,0 +1,271 @@
"""
Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
============================================================
從 approvals.py 抽取執行編排邏輯,整合:
- OperationParser: 解析操作類型
- K8s Executor: 執行 K8s 操作
- ApprovalDBService: 更新狀態
- TimelineService: 記錄事件
- NotificationManager: 發送通知
版本: v1.0
建立: 2026-03-25 (台北時區)
建立者: Claude Code (Phase 16 R4.2)
"""
import asyncio
from typing import TYPE_CHECKING
import structlog
from src.core.config import settings
from src.models.approval import ApprovalRequest
from src.services.approval_db import get_approval_service, get_timeline_service
from src.services.executor import get_executor
from src.services.operation_parser import parse_operation_from_action
if TYPE_CHECKING:
from src.services.notifications import ExecutionStatus
logger = structlog.get_logger(__name__)
class ApprovalExecutionService:
"""
授權執行服務 - 編排整個執行流程
職責:
1. 解析操作類型
2. 呼叫 K8s Executor 執行
3. 更新資料庫狀態
4. 記錄 Timeline 事件
5. 發送通知
"""
async def execute_approved_action(self, approval: ApprovalRequest) -> None:
"""
背景執行已批准的操作
此函數由 BackgroundTasks 呼叫,不阻塞 API 回應
Phase 5: 執行後更新資料庫狀態
Phase 6: 執行後發送通知 (Post-Execution Hook)
Args:
approval: 已批准的授權請求
"""
from src.services.notifications import ExecutionStatus
logger.info(
"background_execution_start",
approval_id=str(approval.id),
action=approval.action,
)
service = get_approval_service()
timeline = get_timeline_service()
# Parse operation details
parsed = parse_operation_from_action(approval.action)
operation_type = parsed.operation_type
resource_name = parsed.resource_name
namespace = parsed.namespace
if operation_type is None or resource_name is None:
logger.warning(
"background_execution_skip",
approval_id=str(approval.id),
reason="Could not parse operation type from action",
action=approval.action,
)
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=False)
await timeline.add_event(
event_type="exec",
status="error",
title="執行失敗: 無法解析操作類型",
description=f"Action: {approval.action}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.FAILED,
operation_type="unknown",
namespace=namespace,
error_message="Could not parse operation type",
)
)
return
# Execute with audit
executor = get_executor()
result = await executor.execute_with_audit(
approval=approval,
operation_type=operation_type,
resource_name=resource_name,
namespace=namespace,
)
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=result.success)
# Update approval status based on result
if result.success:
logger.info(
"background_execution_success",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
duration_ms=result.duration_ms,
)
await timeline.add_event(
event_type="exec",
status="success",
title=f"✅ K8s 執行成功: {operation_type.value}",
description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送成功通知 (fire-and-forget)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.SUCCESS,
operation_type=operation_type.value,
namespace=namespace,
duration_ms=result.duration_ms,
)
)
else:
logger.error(
"background_execution_failed",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
error=result.error,
)
await timeline.add_event(
event_type="exec",
status="error",
title=f"❌ K8s 執行失敗: {operation_type.value}",
description=f"Error: {result.error}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget, 包含 Dry-Run 攔截)
exec_status = (
ExecutionStatus.DRY_RUN_BLOCKED
if "not found" in (result.error or "")
else ExecutionStatus.FAILED
)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=exec_status,
operation_type=operation_type.value,
namespace=namespace,
error_message=result.error,
duration_ms=result.duration_ms,
)
)
async def _send_execution_notification(
self,
approval: ApprovalRequest,
execution_status: "ExecutionStatus",
operation_type: str,
namespace: str,
duration_ms: int | None = None,
error_message: str | None = None,
) -> None:
"""
Phase 6: 發送執行通知 (Post-Execution Hook)
將執行結果發送至所有已配置的通知頻道 (Discord, Slack, etc.)
"""
from src.services.notifications import (
NotificationMessage,
get_notification_manager,
)
if not settings.NOTIFICATION_ENABLED:
logger.info("notification_disabled", approval_id=str(approval.id))
return
try:
# 建構簽核者列表
signers = [
{"name": sig.signer_name, "comment": sig.comment or ""}
for sig in approval.signatures
]
# 建構通知訊息
message = NotificationMessage(
execution_status=execution_status,
action_title=approval.action[:100],
action_description=approval.description[:200] if approval.description else "",
approval_id=str(approval.id),
signers=signers,
required_signatures=approval.required_signatures,
affected_pods=approval.blast_radius.affected_pods if approval.blast_radius else 0,
estimated_downtime=approval.blast_radius.estimated_downtime if approval.blast_radius else "N/A",
related_services=approval.blast_radius.related_services if approval.blast_radius else [],
data_impact=approval.blast_radius.data_impact.value if approval.blast_radius else "none",
namespace=namespace,
operation_type=operation_type,
duration_ms=duration_ms,
error_message=error_message,
risk_level=approval.risk_level.value,
ai_provider=approval.requested_by,
)
# 發送通知
manager = get_notification_manager()
results = await manager.send_all(message)
for result in results:
logger.info(
"notification_result",
approval_id=str(approval.id),
provider=result.provider,
status=result.status.value,
message=result.message,
)
except Exception as e:
logger.exception(
"notification_failed",
approval_id=str(approval.id),
error=str(e),
)
# =============================================================================
# Singleton Instance
# =============================================================================
_execution_service: ApprovalExecutionService | None = None
def get_execution_service() -> ApprovalExecutionService:
"""
取得 ApprovalExecutionService 單例
Returns:
ApprovalExecutionService: 執行服務實例
"""
global _execution_service
if _execution_service is None:
_execution_service = ApprovalExecutionService()
return _execution_service