fix(awooop): link auto approved execution evidence
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m17s
CD Pipeline / build-and-deploy (push) Successful in 3m42s
CD Pipeline / post-deploy-checks (push) Successful in 1m21s

This commit is contained in:
Your Name
2026-05-13 19:14:17 +08:00
parent c68cbd3139
commit 596f2f6820
3 changed files with 371 additions and 2 deletions

View File

@@ -1636,6 +1636,10 @@ async def _process_new_alert_background(
# 2026-04-27 ogt + Claude Sonnet 4.6: CS2 規則引擎自動執行
# 設計is_rule_based=True 確定性高,滿足條件直接執行,不等人工審核
# 安全防線CRITICAL / destructive patterns / NO_ACTION / 空 kubectl → 全部降級 PENDING
_cs2_auto_approval = None
_cs2_executor = None
_cs2_exec_success: bool | None = None
_cs2_exec_error: str | None = None
try:
from src.models.approval import ApprovalRequest, ApprovalStatus
from src.services.approval_execution import ApprovalExecutionService
@@ -1659,6 +1663,7 @@ async def _process_new_alert_background(
)
# 使用 DB 中剛建立的 approval.id 讓 executor 可回寫
_auto_approval.id = approval.id
_cs2_auto_approval = _auto_approval
_cs2_executor = ApprovalExecutionService()
_cs2_exec_success = await _cs2_executor.execute_approved_action(_auto_approval)
@@ -1681,6 +1686,8 @@ async def _process_new_alert_background(
exec_success=_cs2_exec_success,
)
except Exception as _auto_err:
_cs2_exec_success = False if _cs2_auto_approval is not None else None
_cs2_exec_error = str(_auto_err)
logger.warning(
"cs2_auto_execute_failed_degraded_to_pending",
approval_id=str(approval.id),
@@ -1712,6 +1719,23 @@ async def _process_new_alert_background(
error=str(_meta_err),
)
if _cs2_auto_approval is not None and _cs2_exec_success is not None:
try:
_cs2_auto_approval.incident_id = incident_id
_cs2_executor = _cs2_executor or ApprovalExecutionService()
await _cs2_executor.finalize_auto_approved_execution(
_cs2_auto_approval,
success=_cs2_exec_success,
error_message=_cs2_exec_error,
)
except Exception as _cs2_finalize_err:
logger.warning(
"cs2_auto_execute_finalize_failed",
approval_id=str(approval.id),
incident_id=incident_id,
error=str(_cs2_finalize_err),
)
_is_heartbeat = is_heartbeat_alertname(alertname)
if can_auto_repair and not _is_heartbeat:
await _try_auto_repair_background(
@@ -1875,8 +1899,15 @@ async def _process_new_alert_background(
and "NO_ACTION" not in (analysis_result.action_title or "")
and is_safe_kubectl_action(_cs3_kubectl)
)
_cs3_auto_approval = None
_cs3_executor = None
_cs3_exec_success: bool | None = None
_cs3_exec_error: str | None = None
if _cs3_can_auto:
try:
from src.models.approval import ApprovalRequest, ApprovalStatus
from src.services.approval_execution import ApprovalExecutionService
_cs3_auto_approval = ApprovalRequest(
action=approval_create.action,
description=approval_create.description,
@@ -1893,8 +1924,17 @@ async def _process_new_alert_background(
else "cs3_auto_confident_execution",
},
)
_cs3_auto_approval.id = approval.id
_cs3_executor = ApprovalExecutionService()
_cs3_exec_success = await _cs3_executor.execute_approved_action(_cs3_auto_approval)
try:
await service.update_execution_status(approval.id, _cs3_exec_success)
except Exception as _cs3_upd_err:
logger.warning(
"cs3_auto_execute_status_update_failed",
approval_id=str(approval.id),
error=str(_cs3_upd_err),
)
logger.info(
"cs3_llm_auto_executed",
approval_id=str(approval.id),
@@ -1910,6 +1950,8 @@ async def _process_new_alert_background(
),
)
except Exception as _cs3_exec_err:
_cs3_exec_success = False if _cs3_auto_approval is not None else None
_cs3_exec_error = str(_cs3_exec_err)
logger.warning("cs3_llm_auto_execute_failed", error=str(_cs3_exec_err))
incident_id = await create_incident_for_approval(
@@ -1937,6 +1979,23 @@ async def _process_new_alert_background(
error=str(_meta_err),
)
if _cs3_auto_approval is not None and _cs3_exec_success is not None:
try:
_cs3_auto_approval.incident_id = incident_id
_cs3_executor = _cs3_executor or ApprovalExecutionService()
await _cs3_executor.finalize_auto_approved_execution(
_cs3_auto_approval,
success=_cs3_exec_success,
error_message=_cs3_exec_error,
)
except Exception as _cs3_finalize_err:
logger.warning(
"cs3_auto_execute_finalize_failed",
approval_id=str(approval.id),
incident_id=incident_id,
error=str(_cs3_finalize_err),
)
root_cause = analysis_result.description or message
estimated_downtime = blast.estimated_downtime if blast else "~30s"
primary_responsibility = analysis_result.primary_responsibility or "COLLAB"

View File

@@ -858,7 +858,7 @@ class ApprovalExecutionService:
"""
try:
# 自動執行路徑 skip避免與 _push_auto_repair_result 重複發訊息)
if (approval.requested_by or "").lower() == "auto_approve":
if self._is_auto_approved_request(approval):
return
if not approval.incident_id:
@@ -1106,6 +1106,186 @@ class ApprovalExecutionService:
error=str(_e),
)
@staticmethod
def _is_auto_approved_request(approval: "ApprovalRequest") -> bool:
requested_by = (getattr(approval, "requested_by", "") or "").lower()
return requested_by.startswith("auto_approve")
@staticmethod
def _is_observation_only_action(action: str | None) -> bool:
action_upper = (action or "").strip().upper()
return (
not action_upper
or "NO_ACTION" in action_upper
or "NO-ACTION" in action_upper
or "NOACTION" in action_upper
or action_upper.startswith("OBSERVE")
or action_upper.startswith("INVESTIGATE")
)
@staticmethod
def _approval_risk_value(approval: "ApprovalRequest") -> str | None:
risk_level = getattr(approval, "risk_level", None)
if risk_level is None:
return None
return getattr(risk_level, "value", str(risk_level))
async def finalize_auto_approved_execution(
self,
approval: "ApprovalRequest",
*,
success: bool,
error_message: str | None = None,
) -> None:
"""
補齊「自動批准已執行」路徑的 incident-linked 證據鏈。
CS2/CS3 webhook 路徑為了快速執行,會先呼叫 execute_approved_action()
再建立 Incident。executor 當下沒有 incident_id導致 verifier/KM/
auto_repair_executions 都無法串回同一張告警卡。此方法只在 incident
建立後補上 durable trace不重新執行 action。
"""
if not self._is_auto_approved_request(approval):
return
incident_id = getattr(approval, "incident_id", None)
if not incident_id:
logger.warning(
"auto_approved_execution_finalize_skipped_no_incident",
approval_id=str(getattr(approval, "id", "")),
requested_by=getattr(approval, "requested_by", None),
)
return
if self._is_observation_only_action(getattr(approval, "action", None)):
logger.info(
"auto_approved_execution_finalize_skipped_observation_only",
approval_id=str(approval.id),
incident_id=incident_id,
action=(approval.action or "")[:120],
)
return
parsed = parse_operation_from_action(approval.action)
operation_type = parsed.operation_type
resource_name = parsed.resource_name or "unknown"
namespace = parsed.namespace or "default"
playbook_id = str(getattr(approval, "matched_playbook_id", None) or approval.id)[:36]
operation_label = operation_type.value if operation_type else "unknown"
playbook_name = f"approval_auto_execute:{operation_label}:{resource_name}"[:200]
triggered_by = (getattr(approval, "requested_by", None) or "auto_approve")[:50]
action_taken = f"auto_repair_playbook:{playbook_id}:{operation_label}:{resource_name}"
if not success:
action_taken = f"{action_taken}:FAILED"
error_message = error_message or "auto-approved executor returned failure; see approval/aol logs"
try:
from src.repositories.audit_log_repository import get_auto_repair_execution_repository
repo = get_auto_repair_execution_repository()
existing = await repo.list_by_incident(incident_id)
already_recorded = any(
str(getattr(row, "playbook_id", "")) == playbook_id
and getattr(row, "triggered_by", "") == triggered_by
and (approval.action or "") in list(getattr(row, "executed_steps", []) or [])
for row in existing
)
if not already_recorded:
await repo.create(
incident_id=incident_id,
playbook_id=playbook_id,
playbook_name=playbook_name,
success=success,
executed_steps=[approval.action],
error_message=error_message,
triggered_by=triggered_by,
risk_level=self._approval_risk_value(approval),
)
else:
logger.info(
"auto_approved_execution_record_already_exists",
approval_id=str(approval.id),
incident_id=incident_id,
playbook_id=playbook_id,
)
except Exception as exc:
logger.warning(
"auto_approved_execution_record_failed",
approval_id=str(approval.id),
incident_id=incident_id,
error=str(exc),
)
try:
timeline = get_timeline_service()
await timeline.add_event(
event_type="exec",
status="success" if success else "error",
title=f"{'' if success else ''} 自動批准執行已補鏈: {operation_label}",
description=(
f"Target: {resource_name} @ {namespace}; "
f"source={triggered_by}; action={approval.action[:160]}"
),
actor="leWOOOgo",
actor_role="executor",
approval_id=str(approval.id),
incident_id=incident_id,
)
except Exception as exc:
logger.warning(
"auto_approved_execution_timeline_failed",
approval_id=str(approval.id),
incident_id=incident_id,
error=str(exc),
)
try:
await self.write_execution_result_to_km(approval, success, error_message)
except Exception as exc:
logger.warning(
"auto_approved_execution_km_failed",
approval_id=str(approval.id),
incident_id=incident_id,
error=str(exc),
)
from src.core.feature_flags import aiops_flags
if aiops_flags.is_sub_flag_enabled("AIOPS_P1_POST_EXECUTION_VERIFIER"):
try:
await asyncio.wait_for(
self._run_post_execution_verify(
approval=approval,
action_taken=action_taken,
),
timeout=_VERIFIER_AWAIT_TIMEOUT_SEC,
)
except asyncio.TimeoutError:
logger.warning(
"auto_approved_execution_post_verify_timeout",
approval_id=str(approval.id),
incident_id=incident_id,
timeout_sec=_VERIFIER_AWAIT_TIMEOUT_SEC,
)
if success:
try:
from src.services.incident_service import get_incident_service
await get_incident_service().resolve_incident(incident_id)
logger.info(
"incident_resolved_after_auto_approved_execution_finalize",
incident_id=incident_id,
approval_id=str(approval.id),
)
except Exception as exc:
logger.warning(
"incident_resolve_after_auto_approved_execution_finalize_failed",
incident_id=incident_id,
approval_id=str(approval.id),
error=str(exc),
)
async def write_execution_result_to_km(
self,
approval: "ApprovalRequest",
@@ -1124,7 +1304,7 @@ class ApprovalExecutionService:
from src.services.km_writer import KMWritePayload, km_write_with_flag
# 來源辨識B.1 精修)
_is_auto = (approval.requested_by or "").lower() == "auto_approve"
_is_auto = self._is_auto_approved_request(approval)
_mode_prefix = "[自動修復]" if _is_auto else "[人工修復]"
_mode_tag = "auto_executed" if _is_auto else "human_approved"

View File

@@ -0,0 +1,130 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from src.models.approval import RiskLevel
from src.services.approval_execution import ApprovalExecutionService
class _FakeAutoRepairRepo:
def __init__(self) -> None:
self.created: list[dict] = []
async def list_by_incident(self, incident_id: str) -> list:
return []
async def create(self, **kwargs):
self.created.append(kwargs)
return SimpleNamespace(id="are-1", **kwargs)
@pytest.mark.asyncio
async def test_finalize_auto_approved_execution_persists_incident_link(monkeypatch):
repo = _FakeAutoRepairRepo()
timeline = SimpleNamespace(add_event=AsyncMock())
incident_service = SimpleNamespace(resolve_incident=AsyncMock())
write_km = AsyncMock()
run_verify = AsyncMock()
monkeypatch.setattr(
"src.repositories.audit_log_repository.get_auto_repair_execution_repository",
lambda: repo,
)
monkeypatch.setattr(
"src.services.approval_execution.get_timeline_service",
lambda: timeline,
)
monkeypatch.setattr(
"src.services.incident_service.get_incident_service",
lambda: incident_service,
)
monkeypatch.setattr(
"src.core.feature_flags.aiops_flags",
SimpleNamespace(is_sub_flag_enabled=lambda _: True),
)
monkeypatch.setattr(
ApprovalExecutionService,
"write_execution_result_to_km",
write_km,
)
monkeypatch.setattr(
ApprovalExecutionService,
"_run_post_execution_verify",
run_verify,
)
approval = SimpleNamespace(
id="11111111-1111-1111-1111-111111111111",
incident_id="INC-20260513-001",
action="kubectl rollout restart deployment/api -n awoooi-prod",
requested_by="auto_approve_rule_engine",
matched_playbook_id="pb-auto-001",
risk_level=RiskLevel.LOW,
)
await ApprovalExecutionService().finalize_auto_approved_execution(
approval,
success=True,
)
assert repo.created == [
{
"incident_id": "INC-20260513-001",
"playbook_id": "pb-auto-001",
"playbook_name": "approval_auto_execute:RESTART_DEPLOYMENT:api",
"success": True,
"executed_steps": ["kubectl rollout restart deployment/api -n awoooi-prod"],
"error_message": None,
"triggered_by": "auto_approve_rule_engine",
"risk_level": "low",
}
]
timeline.add_event.assert_awaited_once()
write_km.assert_awaited_once_with(approval, True, None)
run_verify.assert_awaited_once()
assert run_verify.await_args.kwargs["action_taken"].startswith(
"auto_repair_playbook:pb-auto-001:RESTART_DEPLOYMENT:api"
)
incident_service.resolve_incident.assert_awaited_once_with("INC-20260513-001")
@pytest.mark.asyncio
async def test_finalize_auto_approved_execution_skips_no_action(monkeypatch):
repo = _FakeAutoRepairRepo()
write_km = AsyncMock()
run_verify = AsyncMock()
monkeypatch.setattr(
"src.repositories.audit_log_repository.get_auto_repair_execution_repository",
lambda: repo,
)
monkeypatch.setattr(
ApprovalExecutionService,
"write_execution_result_to_km",
write_km,
)
monkeypatch.setattr(
ApprovalExecutionService,
"_run_post_execution_verify",
run_verify,
)
approval = SimpleNamespace(
id="22222222-2222-2222-2222-222222222222",
incident_id="INC-20260513-002",
action="NO_ACTION: observe only",
requested_by="auto_approve_rule_engine",
matched_playbook_id="pb-auto-002",
risk_level=RiskLevel.LOW,
)
await ApprovalExecutionService().finalize_auto_approved_execution(
approval,
success=True,
)
assert repo.created == []
write_km.assert_not_awaited()
run_verify.assert_not_awaited()