Files
awoooi/apps/api/src/services/approval_execution.py
OG T fb1bbd0e20
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
feat(Phase 3): 學習閉環補完 — Root cause 3 + 診斷 feedback + 知識遺忘 + Fine-tune 管線
- approval_execution.py: _run_post_execution_verify() 補接 record_verification_result()
  Root cause 3 終結:環境驗證結果(success/degraded/failed/timeout)不再孤立
- learning_service.py: 新增 record_verification_result() — 驗證結果 → Redis + Playbook EWMA
- learning_service.py: 新增 record_diagnosis_outcome() — 誤診負向訊號回寫(L3×D4)
- jobs/knowledge_decay_job.py: 新建 30d 知識遺忘 Job(未引用 draft/review → archived)
- services/finetune_exporter.py: 新建每週 JSONL 匯出(EvidenceSnapshot × AgentSession)
- main.py: 掛載 knowledge_decay_loop(24h)+ finetune_export_loop(7d)
- MASTER §8: Phase 3 核心改造項全部落地記錄

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 20:57:43 +08:00

917 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
============================================================
從 approvals.py 抽取執行編排邏輯,整合:
- OperationParser: 解析操作類型
- K8s Executor: 執行 K8s 操作
- ApprovalDBService: 更新狀態
- TimelineService: 記錄事件
- NotificationManager: 發送通知
- Phase 7.6: Playbook 自動萃取
版本: v1.2
建立: 2026-03-25 (台北時區)
更新: 2026-03-26 (Phase 7.6 自動萃取)
更新: 2026-04-14 (ADR-076 Task 3: 執行失敗重試機制 — Claude Haiku 4.5 Asia/Taipei)
建立者: Claude Code (Phase 16 R4.2)
重試設計 (ADR-076):
- MAX_RETRY = 2 次(共最多 3 次嘗試)
- RETRY_DELAY_SECONDS = 30 秒
- 只重試瞬態錯誤(connection refused, timeout, i/o error 等)
- 永久性錯誤(not found, permission denied, already exists)不重試
"""
import asyncio
import html
from collections.abc import Coroutine
from typing import TYPE_CHECKING, Any

import structlog

from src.core.config import settings
from src.models.approval import ApprovalRequest
from src.services.approval_db import get_approval_service, get_timeline_service
from src.services.executor import get_executor
from src.services.operation_parser import parse_operation_from_action

if TYPE_CHECKING:
    from src.services.notifications import ExecutionStatus
logger = structlog.get_logger(__name__)
class ApprovalExecutionService:
"""
授權執行服務 - 編排整個執行流程
職責:
1. 解析操作類型
2. 呼叫 K8s Executor 執行(含重試)
3. 更新資料庫狀態
4. 記錄 Timeline 事件
5. 發送通知
"""
# ADR-076 Task 3: 重試常數
MAX_RETRY: int = 2
RETRY_DELAY_SECONDS: int = 30
# 瞬態錯誤關鍵字(小寫比對),符合任一 → 可重試
_TRANSIENT_ERROR_KEYWORDS: tuple[str, ...] = (
"connection refused",
"connection reset",
"timeout",
"timed out",
"i/o error",
"io error",
"temporary failure",
"service unavailable",
"too many requests",
"dial tcp",
"eof",
)
# 永久性錯誤關鍵字(小寫比對),符合任一 → 不重試
_PERMANENT_ERROR_KEYWORDS: tuple[str, ...] = (
"not found",
"forbidden",
"permission denied",
"unauthorized",
"already exists",
"invalid",
"immutable",
"destructive",
"blocked",
)
@classmethod
def _is_transient_error(cls, error_message: str | None) -> bool:
"""
判斷執行錯誤是否為瞬態(可重試)
優先檢查永久性錯誤(比瞬態錯誤有更高的優先順序),
避免 "connection refused (not found)" 這類混合訊息誤判。
Args:
error_message: 執行錯誤訊息
Returns:
True 表示可重試False 表示永久失敗
"""
if not error_message:
return False
lower = error_message.lower()
# 永久性錯誤 → 不重試
if any(kw in lower for kw in cls._PERMANENT_ERROR_KEYWORDS):
return False
# 瞬態錯誤 → 可重試
return any(kw in lower for kw in cls._TRANSIENT_ERROR_KEYWORDS)
async def execute_approved_action(self, approval: ApprovalRequest) -> None:
"""
背景執行已批准的操作
此函數由 BackgroundTasks 呼叫,不阻塞 API 回應
Phase 5: 執行後更新資料庫狀態
Phase 6: 執行後發送通知 (Post-Execution Hook)
Args:
approval: 已批准的授權請求
"""
from src.services.notifications import ExecutionStatus
logger.info(
"background_execution_start",
approval_id=str(approval.id),
action=approval.action,
)
service = get_approval_service()
timeline = get_timeline_service()
# Parse operation details
parsed = parse_operation_from_action(approval.action)
operation_type = parsed.operation_type
resource_name = parsed.resource_name
namespace = parsed.namespace
if operation_type is None or resource_name is None:
logger.warning(
"background_execution_skip",
approval_id=str(approval.id),
reason="Could not parse operation type from action",
action=approval.action,
)
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=False)
await timeline.add_event(
event_type="exec",
status="error",
title="執行失敗: 無法解析操作類型",
description=f"Action: {approval.action}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.FAILED,
operation_type="unknown",
namespace=namespace,
error_message="Could not parse operation type",
)
)
return
# ADR-076 Task 3: 執行失敗重試機制
# 瞬態錯誤 (connection refused, timeout 等) 自動重試,最多 MAX_RETRY 次
executor = get_executor()
result = await executor.execute_with_audit(
approval=approval,
operation_type=operation_type,
resource_name=resource_name,
namespace=namespace,
)
attempt = 1
while not result.success and attempt <= self.MAX_RETRY:
if not self._is_transient_error(result.error):
logger.info(
"execution_retry_skipped_permanent_error",
approval_id=str(approval.id),
attempt=attempt,
error=result.error,
)
break
logger.warning(
"execution_retry_transient_error",
approval_id=str(approval.id),
attempt=attempt,
max_retry=self.MAX_RETRY,
error=result.error,
delay_seconds=self.RETRY_DELAY_SECONDS,
)
await timeline.add_event(
event_type="exec",
status="warning",
title=f"⚠️ 執行失敗,{self.RETRY_DELAY_SECONDS}s 後重試 ({attempt}/{self.MAX_RETRY})",
description=f"Error: {result.error}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
await asyncio.sleep(self.RETRY_DELAY_SECONDS)
result = await executor.execute_with_audit(
approval=approval,
operation_type=operation_type,
resource_name=resource_name,
namespace=namespace,
)
attempt += 1
# Phase 5: 更新資料庫狀態
await service.update_execution_status(approval.id, success=result.success)
# Update approval status based on result
total_attempts = attempt # attempt 在重試迴圈後為最終嘗試次數
if result.success:
logger.info(
"background_execution_success",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
duration_ms=result.duration_ms,
total_attempts=total_attempts,
)
retry_note = f" (重試 {total_attempts - 1} 次後成功)" if total_attempts > 1 else ""
await timeline.add_event(
event_type="exec",
status="success",
title=f"✅ K8s 執行成功: {operation_type.value}{retry_note}",
description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送成功通知 (fire-and-forget)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=ExecutionStatus.SUCCESS,
operation_type=operation_type.value,
namespace=namespace,
duration_ms=result.duration_ms,
)
)
# 2026-04-14 Claude Sonnet 4.6: reply_to 原告警卡片顯示執行結果
# auto_approve 路徑由 _push_auto_repair_result 處理,此處僅處理人工批准
asyncio.create_task(
self._push_execution_result_to_alert(approval, success=True, error=None)
)
# Phase 7.6: 觸發 Playbook 自動萃取 (fire-and-forget)
asyncio.create_task(
self._trigger_playbook_extraction(approval)
)
# ADR-030 Phase 5 / ADR-083 Phase 3: 觸發學習服務
# Phase 3 修復:移除 fire-and-forget改用 await + 30s 熔斷
# 超時 → 記錄 metric主流程繼續不 crash
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 fire-and-forget 修復
try:
await asyncio.wait_for(
self._trigger_learning(
approval=approval,
success=True,
duration_seconds=result.duration_ms / 1000 if result.duration_ms else 0,
),
timeout=30.0,
)
except asyncio.TimeoutError:
logger.warning(
"learning_trigger_timeout",
approval_id=str(approval.id),
timeout_sec=30.0,
)
# ADR-081 Phase 1: 執行後驗證 (fire-and-forget)
# PostExecutionVerifier 等待 K8s 收斂後抓取後狀態,補填 EvidenceSnapshot
from src.core.feature_flags import aiops_flags
if aiops_flags.is_sub_flag_enabled("AIOPS_P1_POST_EXECUTION_VERIFIER"):
asyncio.create_task(
self._run_post_execution_verify(
approval=approval,
action_taken=f"{operation_type.value}:{resource_name}",
)
)
# 2026-04-07 Claude Code: Sprint 4 B3 — 記錄人工批准處置類型
try:
anomaly_key = await self._get_anomaly_key_from_approval(approval)
if anomaly_key:
from src.services.anomaly_counter import get_anomaly_counter
counter = get_anomaly_counter()
await counter.record_disposition(anomaly_key, "human_approved")
except Exception as _disp_e:
logger.warning("disposition_record_failed", error=str(_disp_e))
# ADR-073 修補: 執行成功 → 解決 Incident → 觸發 KM 轉換
# 之前 RESOLVED 從未被呼叫,導致 KM 永遠不生成、Playbook 永遠是 0
if approval.incident_id:
try:
from src.services.incident_service import get_incident_service
_inc_svc = get_incident_service()
await _inc_svc.resolve_incident(approval.incident_id)
logger.info(
"incident_resolved_after_execution",
incident_id=approval.incident_id,
approval_id=str(approval.id),
)
except Exception as _resolve_e:
logger.warning("incident_resolve_after_execution_failed", error=str(_resolve_e))
else:
logger.error(
"background_execution_failed",
approval_id=str(approval.id),
operation=operation_type.value,
target=resource_name,
namespace=namespace,
error=result.error,
)
await timeline.add_event(
event_type="exec",
status="error",
title=f"❌ K8s 執行失敗: {operation_type.value}",
description=f"Error: {result.error}",
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
)
# Phase 6: 發送失敗通知 (fire-and-forget, 包含 Dry-Run 攔截)
exec_status = (
ExecutionStatus.DRY_RUN_BLOCKED
if "not found" in (result.error or "")
else ExecutionStatus.FAILED
)
asyncio.create_task(
self._send_execution_notification(
approval=approval,
execution_status=exec_status,
operation_type=operation_type.value,
namespace=namespace,
error_message=result.error,
duration_ms=result.duration_ms,
)
)
# 2026-04-14 Claude Sonnet 4.6: reply_to 原告警卡片顯示失敗結果
asyncio.create_task(
self._push_execution_result_to_alert(
approval, success=False, error=result.error
)
)
# ADR-030 Phase 5 / ADR-083 Phase 3: 觸發學習服務(失敗案例)
# Phase 3 修復fire-and-forget → await + 30s 熔斷
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 fire-and-forget 修復
try:
await asyncio.wait_for(
self._trigger_learning(
approval=approval,
success=False,
error_message=result.error,
duration_seconds=result.duration_ms / 1000 if result.duration_ms else 0,
),
timeout=30.0,
)
except asyncio.TimeoutError:
logger.warning(
"learning_trigger_timeout",
approval_id=str(approval.id),
timeout_sec=30.0,
)
async def _push_execution_result_to_alert(
self,
approval: ApprovalRequest,
success: bool,
error: str | None,
) -> None:
"""
執行結果回覆到原告警 Telegram 卡片reply_to_message_id
2026-04-14 Claude Sonnet 4.6 實裝:
- 人工路徑:人類在 Telegram 點批准後,等執行完成,在原告警下 reply 執行結果
- 自動路徑 (requested_by=auto_approve) 由 _push_auto_repair_result 處理,此處 skip
透過 Redis tg_msg:{incident_id} 查原告警 message_id找不到則靜默不發。
"""
try:
# 自動執行路徑 skip避免與 _push_auto_repair_result 重複發訊息)
if (approval.requested_by or "").lower() == "auto_approve":
return
if not approval.incident_id:
return
from src.core.redis_client import get_redis
redis = get_redis()
msg_id_raw = await redis.get(f"tg_msg:{approval.incident_id}")
if not msg_id_raw:
logger.debug(
"push_execution_result_no_msg_id",
incident_id=approval.incident_id,
approval_id=str(approval.id),
)
return
try:
orig_msg_id = int(msg_id_raw)
except (TypeError, ValueError):
return
from src.core.config import get_settings
from src.services.telegram_gateway import get_telegram_gateway
settings = get_settings()
gateway = get_telegram_gateway()
if success:
text = f"✅ <b>執行成功</b>\n<code>{(approval.action or '')[:180]}</code>"
else:
err_short = (error or "未知錯誤")[:150]
text = f"❌ <b>執行失敗</b>\n<code>{(approval.action or '')[:180]}</code>\n原因: {err_short}"
await gateway._http_client.post(
f"https://api.telegram.org/bot{settings.OPENCLAW_TG_BOT_TOKEN}/sendMessage",
json={
"chat_id": settings.OPENCLAW_TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"reply_to_message_id": orig_msg_id,
},
)
logger.info(
"push_execution_result_sent",
incident_id=approval.incident_id,
approval_id=str(approval.id),
success=success,
orig_msg_id=orig_msg_id,
)
except Exception as e:
logger.warning(
"push_execution_result_failed",
approval_id=str(approval.id),
error=str(e),
)
async def _get_anomaly_key_from_approval(self, approval: ApprovalRequest) -> str | None:
"""
從 approval → incident → anomaly_key。
2026-04-07 Claude Code: I1+S1 Fix — 委託 AnomalyCounter.derive_key_from_incident()
"""
try:
if not approval.incident_id:
return None
from src.services.incident_service import get_incident_service
incident_service = get_incident_service()
incident = await incident_service.get_from_working_memory(approval.incident_id)
if not incident:
return None
from src.services.anomaly_counter import AnomalyCounter
return AnomalyCounter.derive_key_from_incident(incident)
except Exception as e:
logger.warning("get_anomaly_key_from_approval_failed", error=str(e))
return None
async def _trigger_learning(
self,
approval: ApprovalRequest,
success: bool,
duration_seconds: float = 0,
error_message: str | None = None,
) -> None:
"""
ADR-030 Phase 5: 觸發學習服務
處理執行結果,調整信任度和 Playbook 統計
"""
try:
from src.services.learning_service import (
ExecutionResult,
get_learning_service,
)
learning = get_learning_service()
result = ExecutionResult(
approval_id=str(approval.id),
incident_id=approval.incident_id or "",
action=approval.action,
success=success,
error_message=error_message,
duration_seconds=duration_seconds,
)
await learning.process_execution_result(
approval=approval,
result=result,
)
except Exception as e:
# 學習失敗不影響主流程
logger.warning(
"learning_trigger_failed",
approval_id=str(approval.id),
error=str(e),
)
# 2026-04-04 ogt: 執行結果沉澱到 KM — 移出 try/except 確保 learning 失敗也寫入
# 統帥鐵律: 所有異常與自動修復紀錄必須回寫 KM
asyncio.create_task(
self._write_execution_result_to_km(approval, success, error_message)
)
async def _run_post_execution_verify(
self,
approval: "ApprovalRequest",
action_taken: str,
) -> None:
"""
ADR-081 Phase 1: 執行後驗證 (fire-and-forget 包裝)
1. 從 incident_id 查 Incident
2. 從 incident_evidence 取最新 EvidenceSnapshot
3. 呼叫 PostExecutionVerifier.verify() 補填後狀態 + 驗證結果
4. 結果傳給 learning_service 更新 Playbook trust_scorePhase 3
"""
if not approval.incident_id:
return
try:
from src.services.incident_service import get_incident_service
from src.services.post_execution_verifier import get_post_execution_verifier
from src.services.evidence_snapshot import EvidenceSnapshot
incident_svc = get_incident_service()
incident = await incident_svc.get_incident(approval.incident_id)
if incident is None:
logger.warning(
"post_verify_incident_not_found",
approval_id=str(approval.id),
incident_id=approval.incident_id,
)
return
# 取最新 EvidenceSnapshot若 Phase 1 flag 有啟動才會有)
snapshot = await EvidenceSnapshot.get_latest_snapshot(approval.incident_id)
verifier = get_post_execution_verifier()
verification_result = await verifier.verify(
incident=incident,
snapshot=snapshot,
action_taken=action_taken,
)
logger.info(
"post_verify_complete",
approval_id=str(approval.id),
incident_id=approval.incident_id,
result=verification_result,
action=action_taken,
)
# ADR-083 Phase 3 Root cause 3: 驗證結果接線到學習服務
# 環境驗證Pod Running / 指標恢復)是比執行 exit code 更精確的學習訊號
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太)
try:
from src.services.learning_service import get_learning_service
_matched_pb_id = getattr(approval, "matched_playbook_id", None)
await get_learning_service().record_verification_result(
incident_id=approval.incident_id,
action_taken=action_taken,
verification_result=verification_result,
matched_playbook_id=_matched_pb_id,
)
except Exception as _lerr:
logger.warning(
"post_verify_learning_failed",
approval_id=str(approval.id),
error=str(_lerr),
)
except Exception as _e:
# 驗證失敗不影響執行結果
logger.warning(
"post_verify_failed",
approval_id=str(approval.id),
error=str(_e),
)
async def _write_execution_result_to_km(
self,
approval: "ApprovalRequest",
success: bool,
error_message: str | None,
) -> None:
"""
執行結果沉澱到 KM (Knowledge Base)
2026-04-04 ogt: 統帥鐵律 — 成功/失敗執行記錄都必須回寫 KM
2026-04-14 Claude Sonnet 4.6 (BP-1 B.1 精修): 區分 auto_approve vs 人工路徑,
補齊 alert_category / alertname / affected_services 供 RAG 檢索。
"""
try:
from src.models.knowledge import EntrySource, EntryType, KnowledgeEntryCreate
from src.services.knowledge_service import get_knowledge_service
# 來源辨識B.1 精修)
_is_auto = (approval.requested_by or "").lower() == "auto_approve"
_mode_prefix = "[自動修復]" if _is_auto else "[人工修復]"
_mode_tag = "auto_executed" if _is_auto else "human_approved"
status_icon = "" if success else ""
status_text = "成功" if success else f"失敗: {error_message or '未知原因'}"
_status_tag = "success" if success else "failure"
# 從關聯 Incident 提取豐富元資料
alertname = "unknown"
alert_category = "general"
affected_services: list[str] = []
if approval.incident_id:
try:
from src.services.incident_service import get_incident_service
_inc = await get_incident_service().get_incident(approval.incident_id)
if _inc:
if _inc.signals:
alertname = _inc.signals[0].labels.get("alertname", "unknown") or "unknown"
alert_category = getattr(_inc, "alert_category", "") or "general"
affected_services = list(_inc.affected_services or [])
except Exception as _ie:
logger.debug("km_incident_enrich_failed",
incident_id=approval.incident_id, error=str(_ie))
_services_str = ", ".join(affected_services) if affected_services else "未關聯"
content = (
f"# {status_icon} {_mode_prefix} {alertname}\n\n"
f"**告警名稱**: {alertname}\n"
f"**告警類別**: {alert_category}\n"
f"**受影響服務**: {_services_str}\n"
f"**執行命令**: `{approval.action[:200]}`\n"
f"**執行結果**: {status_text}\n"
f"**風險等級**: {approval.risk_level.value if approval.risk_level else '未知'}\n"
f"**執行路徑**: {'自動執行 (confidence >= 0.65)' if _is_auto else '人工審核批准'}\n"
f"**Incident ID**: {approval.incident_id or '未關聯'}\n"
f"**Approval ID**: {approval.id}\n\n"
f"## 操作描述\n{approval.description or '無描述'}\n"
)
# Tags: 模式 + 狀態 + 類別(供 RAG 多維度檢索)
tags = [_mode_tag, _status_tag, alert_category, "execution"]
if not success:
tags.append("execution_failed")
entry_data = KnowledgeEntryCreate(
title=f"{_mode_prefix} {alertname}: {approval.action[:50]}",
content=content,
entry_type=EntryType.INCIDENT_CASE,
category=alert_category, # 用真實類別取代硬編 "execution_result"
tags=tags,
source=EntrySource.AI_EXTRACTED,
related_incident_id=approval.incident_id or None,
created_by="auto_execute" if _is_auto else "approval_execution",
)
await get_knowledge_service().create_entry(entry_data)
logger.info(
"execution_result_written_to_km",
approval_id=str(approval.id),
incident_id=approval.incident_id,
alertname=alertname,
alert_category=alert_category,
mode=_mode_tag,
success=success,
)
except Exception as e:
logger.warning(
"execution_result_km_write_failed",
approval_id=str(approval.id),
error=str(e),
)
async def _send_execution_notification(
self,
approval: ApprovalRequest,
execution_status: "ExecutionStatus",
operation_type: str,
namespace: str,
duration_ms: int | None = None,
error_message: str | None = None,
) -> None:
"""
Phase 6: 發送執行通知 (Post-Execution Hook)
將執行結果發送至所有已配置的通知頻道 (Discord, Slack, etc.)
"""
from src.services.notifications import (
NotificationMessage,
get_notification_manager,
)
if not settings.NOTIFICATION_ENABLED:
logger.info("notification_disabled", approval_id=str(approval.id))
return
try:
# 建構簽核者列表
signers = [
{"name": sig.signer_name, "comment": sig.comment or ""}
for sig in approval.signatures
]
# 建構通知訊息
message = NotificationMessage(
execution_status=execution_status,
action_title=approval.action[:100],
action_description=approval.description[:200] if approval.description else "",
approval_id=str(approval.id),
signers=signers,
required_signatures=approval.required_signatures,
affected_pods=approval.blast_radius.affected_pods if approval.blast_radius else 0,
estimated_downtime=approval.blast_radius.estimated_downtime if approval.blast_radius else "N/A",
related_services=approval.blast_radius.related_services if approval.blast_radius else [],
data_impact=approval.blast_radius.data_impact.value if approval.blast_radius else "none",
namespace=namespace,
operation_type=operation_type,
duration_ms=duration_ms,
error_message=error_message,
risk_level=approval.risk_level.value,
ai_provider=approval.requested_by,
)
# 發送通知
manager = get_notification_manager()
results = await manager.send_all(message)
for result in results:
logger.info(
"notification_result",
approval_id=str(approval.id),
provider=result.provider,
status=result.status.value,
message=result.message,
)
except Exception as e:
logger.exception(
"notification_failed",
approval_id=str(approval.id),
error=str(e),
)
async def _trigger_playbook_extraction(
self,
approval: ApprovalRequest,
) -> None:
"""
Phase 7.6: 觸發 Playbook 自動萃取
條件:
- 執行成功
- 關聯的 Incident 狀態為 RESOLVED 或 CLOSED
- effectiveness_score >= 4
此函數為 fire-and-forget失敗不影響主流程
"""
try:
# 1. 從 approval.incident_id 直接取得 (Phase 26 修復)
# 原本靠 regex 掃文字找 INC- 前綴,中文 action 完全找不到
incident_id = getattr(approval, "incident_id", None)
if not incident_id:
# Fallback: 嘗試文字解析 (向後兼容舊資料)
incident_id = self._extract_incident_id_from_approval(approval)
if not incident_id:
logger.info(
"playbook_extraction_skipped",
approval_id=str(approval.id),
reason="No incident_id found in approval.incident_id or text",
)
return
# 2. 取得 Incident
from src.services.incident_service import get_incident_service
incident_service = get_incident_service()
incident = await incident_service.get_incident(incident_id)
if not incident:
logger.info(
"playbook_extraction_skipped",
approval_id=str(approval.id),
incident_id=incident_id,
reason="Incident not found",
)
return
# 3. 執行成功後自動設定 outcome (冷啟動關鍵)
# 2026-04-04 ogt: 首席架構師 Review — 補上 execution_success + effectiveness_score
# 確保 Playbook 萃取前置條件能成立,不再依賴人工填分
from src.models.incident import IncidentOutcome, IncidentStatus
from src.utils.timezone import now_taipei
if incident.outcome is None:
incident.outcome = IncidentOutcome()
if not incident.outcome.execution_success:
incident.outcome.execution_success = True
if incident.outcome.effectiveness_score is None or incident.outcome.effectiveness_score < 4:
incident.outcome.effectiveness_score = 4 # 系統判斷K8s 執行成功 = 有效
if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]:
incident.status = IncidentStatus.RESOLVED
incident.resolved_at = now_taipei()
# Task 3.3 (2026-04-14): 記錄執行動作供 SSH 路徑 KM 萃取
# approval.action 含實際執行指令(可能是 kubectl 或 ssh ...
# 寫入 learning_notes 供 playbook_service._extract_repair_steps 萃取 SSH RepairStep
if not incident.outcome.learning_notes and approval.action:
incident.outcome.learning_notes = approval.action
# 回存 Incidentfire-and-forget 路徑,失敗不影響主流程)
await incident_service.save_to_working_memory(incident)
logger.info(
"playbook_extraction_incident_updated",
approval_id=str(approval.id),
incident_id=incident_id,
effectiveness_score=incident.outcome.effectiveness_score,
status=incident.status.value,
)
# 4. 觸發萃取effectiveness 已保證 >= 4
from src.services.playbook_service import get_playbook_service
playbook_service = get_playbook_service()
effectiveness = incident.outcome.effectiveness_score or 4
playbook = await playbook_service.extract_from_incident(
incident=incident,
auto_approve=effectiveness >= 5, # 滿分自動核准
)
if playbook:
logger.info(
"playbook_auto_extracted",
approval_id=str(approval.id),
incident_id=incident_id,
playbook_id=playbook.playbook_id,
playbook_name=playbook.name,
auto_approved=playbook.status.value == "approved",
)
else:
logger.debug(
"playbook_extraction_no_result",
approval_id=str(approval.id),
incident_id=incident_id,
)
except Exception as e:
# 萃取失敗不影響主流程
logger.warning(
"playbook_extraction_error",
approval_id=str(approval.id),
error=str(e),
)
def _extract_incident_id_from_approval(
self,
approval: ApprovalRequest,
) -> str | None:
"""
從 approval 提取關聯的 incident_id
嘗試以下來源:
1. approval.metadata (如果有)
2. approval.description 中的 INC- 模式
3. approval.requested_by 中的 incident 資訊
"""
import re
# 從 description 或 action 中尋找 INC-XXXXXX 模式
text = f"{approval.description or ''} {approval.action or ''}"
match = re.search(r"INC-([A-Z0-9-]+)", text)
if match:
return match.group(0) # 返回完整的 INC-XXXXX
# 從 requested_by 尋找
if approval.requested_by and "INC-" in approval.requested_by:
match = re.search(r"INC-([A-Z0-9-]+)", approval.requested_by)
if match:
return match.group(0)
return None
# =============================================================================
# Singleton Instance
# =============================================================================
_execution_service: ApprovalExecutionService | None = None


def get_execution_service() -> ApprovalExecutionService:
    """
    Return the process-wide ApprovalExecutionService singleton.

    The instance is created lazily on the first call and reused afterwards.

    Returns:
        ApprovalExecutionService: the shared execution service instance.
    """
    global _execution_service
    instance = _execution_service
    if instance is None:
        # First call: build the service and cache it in the module-level slot.
        instance = ApprovalExecutionService()
        _execution_service = instance
    return instance