fix(aiops): 打通 AI 自主學習鏈 — verifier 改 await + aol 動作回灌
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 7m29s

統帥 2026-04-19 全景審計發現:
  - automation_operation_log: 22 筆 (全部 drift_narrator),33 件/7d approval 動作 0 筆回灌
  - incident_evidence.verification_result: 1212 筆 100% NULL,verifier 從未寫入
  - 根因: _run_post_execution_verify 用 asyncio.create_task fire-and-forget,
          Pod recycle 時 task 被殺,verification_result 永遠寫不進去

修復 (打通 verifier→learning→Playbook EWMA→finetune 全鏈):

approval_execution.py:
  + _log_aol_started: 主流程開始時 INSERT aol(playbook_executed, pending)
  + _log_aol_completed: 4 個 return 點 UPDATE aol 為 success/failed + duration + stderr
    └ NO_ACTION / parse_fail / K8s 成功 / K8s 失敗 全部留痕
  ~ _run_post_execution_verify 兩處 (成功+失敗 path) 從 create_task 改 await + 60s timeout
  + 失敗時 stderr_feed_back 寫入 result.error → 解開 E6 stderr 回灌閉環

declarative_remediation.py:
  ~ _log_remediation_event task 加 named + add_done_callback,task 失敗時有 log
    (原 fire-and-forget 0 筆寫入,現在可診斷為何 task 死掉)

預期效果:
  - aol playbook_executed 即時可見 (33 件/7d 立刻有資料)
  - incident_evidence.verification_result 開始累積 → finetune_exporter 7d cron 終於有料
  - Playbook EWMA trust_score 開始動態變化
  - stderr_feed_back 接通 → 失敗訊號回灌 retry/Playbook 負向強化

不影響:
  - background_task 跑在背景,+60s 延遲不阻塞 API
  - aol 寫入失敗只 logger.warning,不阻塞執行主流程

Refs: MASTER §3.1 L6×D1 (ADR-081 PostExecutionVerifier),
      MASTER §3.4 D4 (ADR-083 學習閉環),
      ADR-090 監控盲區治理 (2026-04-18 全景審計)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-19 12:06:59 +08:00
parent da7956187e
commit e7ba8cb181
2 changed files with 206 additions and 18 deletions

View File

@@ -24,6 +24,7 @@ Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取
"""
import asyncio
import time
from typing import TYPE_CHECKING
import structlog
@@ -39,6 +40,11 @@ if TYPE_CHECKING:
logger = structlog.get_logger(__name__)
# ADR-090 § 自動化動作回灌 (2026-04-19 ogt + Claude Opus 4.7 亞太):
# PostExecutionVerifier 從 fire-and-forget 改 await,確保 verification_result 必寫入 incident_evidence.
# 上限 60s 涵蓋 verifier warmup(10s) + collect(30s) + 緩衝 20s.
_VERIFIER_AWAIT_TIMEOUT_SEC = 60.0
class ApprovalExecutionService:
"""
@@ -134,6 +140,11 @@ class ApprovalExecutionService:
action=approval.action,
)
# ADR-090 § 自動化動作回灌 (2026-04-19): 主流程開始即在 aol 留痕,
# 結束時 update。不依賴 fire-and-forget,確保 33 件/7d approval 全部可觀測。
_aol_op_id = await self._log_aol_started(approval)
_aol_started_ms = time.time()
service = get_approval_service()
timeline = get_timeline_service()
@@ -181,6 +192,13 @@ class ApprovalExecutionService:
approval, success=True, error=None,
)
)
# ADR-090 § aol completed (NO_ACTION 視為成功)
await self._log_aol_completed(
op_id=_aol_op_id,
status="success",
duration_ms=int((time.time() - _aol_started_ms) * 1000),
output={"reason": "NO_ACTION", "action": approval.action[:200]},
)
return True # NO_ACTION 視為成功完成
# 真解析失敗 (非 NO_ACTION)
@@ -215,6 +233,13 @@ class ApprovalExecutionService:
error_message="Could not parse operation type",
)
)
# ADR-090 § aol completed (parse 失敗)
await self._log_aol_completed(
op_id=_aol_op_id,
status="failed",
duration_ms=int((time.time() - _aol_started_ms) * 1000),
error=f"parse_fail: {approval.action[:300]}",
)
return False # 解析失敗 → 執行未發生
# ADR-076 Task 3: 執行失敗重試機制
@@ -337,16 +362,25 @@ class ApprovalExecutionService:
timeout_sec=30.0,
)
# ADR-081 Phase 1: 執行後驗證 (fire-and-forget)
# PostExecutionVerifier 等待 K8s 收斂後抓取後狀態,補填 EvidenceSnapshot
# ADR-081 Phase 1 + ADR-090 修復 (2026-04-19 ogt + Claude Opus 4.7):
# PostExecutionVerifier 改 await + 60s timeout,確保 verification_result 必寫入。
# 之前 fire-and-forget 在 Pod recycle 時 task 被殺,導致 1212 筆 evidence 全 NULL.
from src.core.feature_flags import aiops_flags
if aiops_flags.is_sub_flag_enabled("AIOPS_P1_POST_EXECUTION_VERIFIER"):
asyncio.create_task(
self._run_post_execution_verify(
approval=approval,
action_taken=f"{operation_type.value}:{resource_name}",
try:
await asyncio.wait_for(
self._run_post_execution_verify(
approval=approval,
action_taken=f"{operation_type.value}:{resource_name}",
),
timeout=_VERIFIER_AWAIT_TIMEOUT_SEC,
)
except asyncio.TimeoutError:
logger.warning(
"post_verify_timeout_exceeded",
approval_id=str(approval.id),
timeout_sec=_VERIFIER_AWAIT_TIMEOUT_SEC,
)
)
# 2026-04-07 Claude Code: Sprint 4 B3 — 記錄人工批准處置類型
try:
@@ -373,6 +407,19 @@ class ApprovalExecutionService:
except Exception as _resolve_e:
logger.warning("incident_resolve_after_execution_failed", error=str(_resolve_e))
# ADR-090 § aol completed (執行成功)
await self._log_aol_completed(
op_id=_aol_op_id,
status="success",
duration_ms=int((time.time() - _aol_started_ms) * 1000),
output={
"operation_type": operation_type.value,
"resource_name": resource_name,
"namespace": namespace,
"executor_duration_ms": result.duration_ms,
"total_attempts": total_attempts,
},
)
return True # K8s 執行成功
else:
@@ -438,18 +485,41 @@ class ApprovalExecutionService:
timeout_sec=30.0,
)
# 2026-04-18 ogt + Claude Opus 4.7: ADR-090 L6 斷鏈修復 — P0.3
# 失敗時也跑 verifier,把 verification_result='failed' 回寫 evidence
# 之前 988 筆 evidence 的 verification_result 全 NULL,因 verifier 只在 success 時跑
# ADR-090 修復 (2026-04-19 ogt + Claude Opus 4.7):
# 失敗時也跑 verifier,把 verification_result='failed' 回寫 evidence
# 改 await + 60s timeout (原為 fire-and-forget,task 在 Pod recycle 時被殺)。
from src.core.feature_flags import aiops_flags
if aiops_flags.is_sub_flag_enabled("AIOPS_P1_POST_EXECUTION_VERIFIER"):
asyncio.create_task(
self._run_post_execution_verify(
approval=approval,
action_taken=f"{operation_type.value}:{resource_name}:FAILED",
try:
await asyncio.wait_for(
self._run_post_execution_verify(
approval=approval,
action_taken=f"{operation_type.value}:{resource_name}:FAILED",
),
timeout=_VERIFIER_AWAIT_TIMEOUT_SEC,
)
except asyncio.TimeoutError:
logger.warning(
"post_verify_timeout_exceeded_failed_path",
approval_id=str(approval.id),
timeout_sec=_VERIFIER_AWAIT_TIMEOUT_SEC,
)
)
# ADR-090 § aol completed (執行失敗)
await self._log_aol_completed(
op_id=_aol_op_id,
status="failed",
duration_ms=int((time.time() - _aol_started_ms) * 1000),
output={
"operation_type": operation_type.value,
"resource_name": resource_name,
"namespace": namespace,
"executor_duration_ms": result.duration_ms,
"total_attempts": total_attempts,
},
error=result.error,
stderr=result.error, # E6 stderr 回灌 — 給 retry/Playbook 負向強化用
)
return False # K8s 執行失敗
async def _push_execution_result_to_alert(
@@ -1014,6 +1084,107 @@ class ApprovalExecutionService:
return None
# =========================================================================
# ADR-090 § AOL Writer (2026-04-19 ogt + Claude Opus 4.7 亞太)
# 把 approval execution 的生命週期回灌 automation_operation_log.
# 之前 33 件/7d approval 動作完全沒寫入 aol,只有 drift_narrator 的
# 22 筆 notification_formatted。修復後每次執行都留痕。
# =========================================================================
async def _log_aol_started(self, approval: ApprovalRequest) -> str | None:
"""
在 automation_operation_log 寫一筆 'pending' 紀錄,回傳 op_id 供 _log_aol_completed 更新。
失敗時 (DB 異常) 回 None,主流程繼續 — aol 寫入永不阻塞執行。
"""
try:
from sqlalchemy import text as _sql
from src.db.base import get_db_context
import json as _json
input_payload = {
"approval_id": str(approval.id),
"incident_id": approval.incident_id or "",
"action": (approval.action or "")[:500],
"risk_level": getattr(approval, "risk_level", None) or "",
"requested_by": getattr(approval, "requested_by", "") or "",
}
async with get_db_context() as db:
row = await db.execute(
_sql("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output, tags
) VALUES (
'playbook_executed',
'approval_execution',
'pending',
CAST(:input AS jsonb),
'{}'::jsonb,
:tags
)
RETURNING op_id
"""),
{
"input": _json.dumps(input_payload, ensure_ascii=False),
"tags": ["approval", "execution", "playbook"],
},
)
op_id = row.scalar()
return str(op_id) if op_id else None
except Exception as e:
logger.warning("aol_started_write_failed", approval_id=str(approval.id), error=str(e))
return None
async def _log_aol_completed(
self,
op_id: str | None,
status: str,
duration_ms: int,
output: dict | None = None,
error: str | None = None,
stderr: str | None = None,
) -> None:
"""
UPDATE automation_operation_log 為 success/failed 並寫入結果摘要 + stderr。
status 必須是 aol constraint 允許的值:
pending | success | failed | dry_run | rolled_back
op_id 為 None 時靜默跳過 (started 寫入失敗時不應觸發 update 例外)。
"""
if not op_id:
return
try:
from sqlalchemy import text as _sql
from src.db.base import get_db_context
import json as _json
async with get_db_context() as db:
await db.execute(
_sql("""
UPDATE automation_operation_log
SET status = :status,
duration_ms = :duration_ms,
output = CAST(:output AS jsonb),
error = :error,
stderr_feed_back = :stderr
WHERE op_id = CAST(:op_id AS uuid)
"""),
{
"status": status,
"duration_ms": duration_ms,
"output": _json.dumps(output or {}, ensure_ascii=False),
"error": (error or "")[:2000] if error else None,
"stderr": (stderr or "")[:8000] if stderr else None,
"op_id": op_id,
},
)
except Exception as e:
logger.warning("aol_completed_write_failed", op_id=op_id, error=str(e))
# =============================================================================
# Singleton Instance
# =============================================================================

View File

@@ -168,13 +168,30 @@ class DeclarativeRemediation:
)
# 2026-04-18 ADR-090-D: 寫入 remediation_events 表(MASTER §7.1 #6 KPI 資料源)
# fire-and-forget,不阻塞主流程
# 2026-04-19 ogt + Claude Opus 4.7 修復: 原 fire-and-forget 0 筆寫入。
# evaluate() 是同步函式,無法直接 await — 改用 named task + done_callback,
# 確保 task 失敗時有 log,後續可診斷為何 0 筆。
try:
import asyncio as _a
_a.create_task(_log_remediation_event(spec, action, target, namespace))
_task = _a.create_task(
_log_remediation_event(spec, action, target, namespace),
name=f"log_remediation:{action[:30]}",
)
def _on_done(t: _a.Task) -> None:
if t.cancelled():
logger.warning("log_remediation_event_cancelled", action=action[:80])
elif t.exception():
logger.warning(
"log_remediation_event_failed",
action=action[:80],
error=str(t.exception()),
)
_task.add_done_callback(_on_done)
except RuntimeError:
# 非 async context (正規呼叫都是 async),靜默跳過
pass
logger.debug("log_remediation_event_no_event_loop", action=action[:80])
return spec