feat(adr100): request gate5 replay approval
Some checks failed
CD Pipeline / tests (push) Successful in 1m32s
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-02 10:43:09 +08:00
parent 98c01cdaff
commit f519c8e1ab
4 changed files with 261 additions and 19 deletions

View File

@@ -49,7 +49,7 @@ class RemediationDryRunRequest(BaseModel):
class RemediationApprovalRequest(BaseModel):
"""ADR-100 record-only PlayBook authoring approval request."""
"""ADR-100 record-only approval request."""
work_item_id: str = Field(min_length=1)
mode: RemediationMode = "approval"
@@ -131,7 +131,7 @@ async def dry_run_ai_slo_remediation(request: RemediationDryRunRequest) -> dict:
async def create_ai_slo_remediation_approval_request(
request: RemediationApprovalRequest,
) -> dict:
"""Create a record-only approval request for ADR-100 PlayBook authoring."""
"""Create a record-only approval request for ADR-100 remediation."""
try:
return await get_adr100_remediation_service().create_approval_request(

View File

@@ -45,6 +45,8 @@ RemediationMode = Literal["auto", "reverify", "replay", "ticket", "approval"]
_READY_STATUSES = {"ready_for_replay", "ready_for_reverify"}
_TICKET_STATUSES = {"needs_playbook_ticket"}
_TICKET_ACTIONS = {"create_playbook_ticket", "promote_diagnostic_to_repair_playbook"}
_RUNTIME_REPLAY_STATUSES = {"ready_for_replay"}
_RUNTIME_REPLAY_ACTIONS = {"replay_with_supported_executor"}
class RemediationNotFoundError(LookupError):
@@ -134,13 +136,13 @@ class Adr100RemediationService:
work_item_id: str,
mode: RemediationMode = "approval",
) -> dict[str, Any]:
"""Create a record-only approval for PlayBook authoring remediation."""
"""Create a record-only approval for PlayBook authoring or runtime replay."""
item = await self._find_work_item(work_item_id)
selected_mode = _select_mode(item, mode)
checks = _base_checks(item)
checks.append({
"name": "playbook_authoring_ticket_required",
"name": "approval_request_supported",
"passed": selected_mode in {"ticket", "approval"},
"detail": str(item.get("remediation_status") or "unknown"),
})
@@ -157,14 +159,43 @@ class Adr100RemediationService:
payload["history"] = await self._record_dry_run_history(item, payload)
return payload
approval_request = _approval_request_for_item(item, incident, checks)
replay_gate: dict[str, Any] | None = None
if selected_mode == "approval":
replay_gate = await self._build_replay_gate(item, incident)
checks.append({
"name": "runtime_replay_gate_ready",
"passed": replay_gate.get("status") == "runtime_replay_ready",
"detail": str(replay_gate.get("status") or "unknown"),
})
if replay_gate.get("status") != "runtime_replay_ready":
payload = _approval_blocked_payload(
item,
selected_mode,
checks,
extra={
"replay_gate": replay_gate,
"verification_result_preview": "runtime_replay_gate_blocked",
},
)
payload["history"] = await self._record_dry_run_history(item, payload)
return payload
approval_request = _runtime_replay_approval_request_for_item(
item,
incident,
checks,
replay_gate,
)
else:
approval_request = _approval_request_for_item(item, incident, checks)
approval_svc = self._approval_service
if approval_svc is None:
from src.services.approval_db import get_approval_service
approval_svc = get_approval_service()
fingerprint = _approval_fingerprint(item)
approval_kind = str((approval_request.metadata or {}).get("approval_kind") or "")
fingerprint = _approval_fingerprint(item, approval_kind=approval_kind)
approval = None
if hasattr(approval_svc, "find_by_fingerprint"):
try:
@@ -597,6 +628,8 @@ class Adr100RemediationService:
"timeline_event_id": None,
}
context = _approval_history_context(item, payload)
approval_kind = str(payload.get("approval_kind") or "")
is_runtime_replay = approval_kind == "adr100_runtime_replay_gate5"
try:
repo = self._alert_operation_log_repository
@@ -612,7 +645,11 @@ class Adr100RemediationService:
approval_id=approval_id or None,
auto_repair_id=str(item.get("auto_repair_id") or "") or None,
actor="adr100_remediation_service",
action_detail="adr100_playbook_authoring_approval_requested",
action_detail=(
"adr100_runtime_replay_gate5_approval_requested"
if is_runtime_replay
else "adr100_playbook_authoring_approval_requested"
),
success=True,
context=context,
)
@@ -635,7 +672,11 @@ class Adr100RemediationService:
event = await timeline.add_event(
event_type="human",
status="warning",
title="ADR-100 PlayBook authoring approval requested",
title=(
"ADR-100 runtime replay Gate 5 approval requested"
if is_runtime_replay
else "ADR-100 PlayBook authoring approval requested"
),
description=_approval_history_description(context),
actor="adr100_remediation_service",
actor_role="approval",
@@ -658,9 +699,21 @@ class Adr100RemediationService:
return history
def _select_mode(item: dict[str, Any], requested: RemediationMode) -> Literal["reverify", "replay", "ticket"]:
def _select_mode(
item: dict[str, Any],
requested: RemediationMode,
) -> Literal["reverify", "replay", "ticket", "approval"]:
if requested == "approval":
return "ticket"
if item.get("remediation_status") in _TICKET_STATUSES:
return "ticket"
if item.get("remediation_action") in _TICKET_ACTIONS:
return "ticket"
if (
item.get("remediation_status") in _RUNTIME_REPLAY_STATUSES
or item.get("remediation_action") in _RUNTIME_REPLAY_ACTIONS
):
return "approval"
return "replay"
if requested in ("reverify", "replay"):
return requested
if requested == "ticket":
@@ -719,6 +772,17 @@ def _plan_for_item(item: dict[str, Any], mode: str) -> dict[str, Any]:
"target_action": item.get("remediation_action"),
}
if mode == "approval":
if (
item.get("remediation_status") in _RUNTIME_REPLAY_STATUSES
or item.get("remediation_action") in _RUNTIME_REPLAY_ACTIONS
):
return {
"step": "request_runtime_replay_gate5_approval",
"agent_id": "auto_repair_executor",
"required_scope": "record_only_until_approved",
"writes": ["approval_records", "alert_operation_log", "timeline"],
"target_action": item.get("remediation_action"),
}
return {
"step": "request_playbook_authoring_approval",
"agent_id": "openclaw_playbook_planner",
@@ -763,6 +827,7 @@ def _approval_blocked_payload(
item: dict[str, Any],
mode: str,
checks: list[dict[str, Any]],
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
return {
"schema_version": "adr100_remediation_approval_v1",
@@ -784,6 +849,7 @@ def _approval_blocked_payload(
"approval": None,
"approval_id": None,
"plan": _plan_for_item(item, "approval"),
**(extra or {}),
}
@@ -882,12 +948,99 @@ def _approval_request_for_item(
)
def _approval_fingerprint(item: dict[str, Any]) -> str:
def _runtime_replay_approval_request_for_item(
    item: dict[str, Any],
    incident: Incident,
    checks: list[dict[str, Any]],
    replay_gate: dict[str, Any],
) -> ApprovalRequestCreate:
    """Build the record-only Gate 5 approval request for a runtime replay.

    Args:
        item: Remediation queue work item.
        incident: Incident tied to the work item; supplies the fallback
            incident id and the affected services.
        checks: Pre-flight checks, each converted into a ``DryRunCheck``.
        replay_gate: Replay gate result; its ``status``, ``steps`` and
            ``supported_write_route_count`` entries are read here.

    Returns:
        An ``ApprovalRequestCreate`` expiring two hours from now whose
        metadata flags execution as not authorized, attempted, or executed.
    """
    # Fall back to the alert name when the incident lists no usable service.
    services = [name for name in (incident.affected_services or []) if name]
    if not services:
        services = [str(item.get("alertname") or "unknown_alert")]

    playbook_id = str(item.get("playbook_id") or "unknown_playbook")
    work_item_id = str(item.get("work_item_id") or "")
    incident_ref = item.get("incident_id") or incident.incident_id

    # Collect every truthy write route declared by a dict-shaped gate step.
    write_routes = []
    for step in replay_gate.get("steps") or []:
        if not isinstance(step, dict):
            continue
        declared_route = step.get("write_route")
        if declared_route:
            write_routes.append(declared_route)

    # Only dict-shaped routes can contribute a tool name to the summary text.
    route_names = []
    for route in write_routes:
        if isinstance(route, dict):
            route_names.append(str(route.get("tool_name") or "unknown_write_route"))
    routes_label = ", ".join(route_names)

    action = (
        "RUNTIME_REPLAY_GATE5: "
        f"ADR-100 replay {playbook_id} via {routes_label or 'mcp_write'}"
    )
    description = (
        f"Incident: {incident_ref}\n"
        f"Work item: {work_item_id or '-'}\n"
        f"PlayBook: {playbook_id}\n"
        f"Replay gate: {replay_gate.get('status')}\n"
        f"Write routes: {routes_label or '-'}\n\n"
        "Approval scope: Gate 5 authorization for a controlled runtime replay. "
        "Creating this approval does not execute repair, does not restart a "
        "container, does not update incident state, and does not write an "
        "auto_repair_executions result. Execution must happen only after the "
        "approval status reaches approved and the executor re-validates the gate."
    )

    # At least one pod is always reported, even when the gate counts no routes.
    route_count = int(replay_gate.get("supported_write_route_count") or 1)
    blast_radius = BlastRadius(
        affected_pods=max(1, route_count),
        estimated_downtime="<1m",
        related_services=services[:6],
        data_impact=DataImpact.WRITE,
    )

    dry_run_checks = [
        DryRunCheck(
            name=str(entry.get("name") or "check"),
            passed=bool(entry.get("passed")),
            message=str(entry.get("detail") or ""),
        )
        for entry in checks
    ]

    metadata = {
        "schema_version": "adr100_runtime_replay_gate5_approval_v1",
        "approval_kind": "adr100_runtime_replay_gate5",
        "execution_kind": "runtime_replay_gate5_pending",
        "execution_authorized": False,
        "repair_attempted": False,
        "repair_executed": False,
        "work_item_id": work_item_id,
        "auto_repair_id": item.get("auto_repair_id"),
        "source": "adr100.verification_coverage.remediation_queue",
        "target_action": item.get("remediation_action"),
        "required_scope": "write_after_approval",
        "next_step": "approve_then_dispatch_auto_repair_executor",
        "playbook_id": playbook_id,
        "flywheel_node": "approval",
        "agent_id": "auto_repair_executor",
        "mcp_gate": "gate5_required",
        "replay_gate": replay_gate,
        "write_routes": write_routes,
    }

    # The placeholder playbook id is never reported as a matched playbook.
    matched_playbook_id = None
    if playbook_id != "unknown_playbook":
        matched_playbook_id = playbook_id

    return ApprovalRequestCreate(
        action=action,
        description=description[:4000],
        risk_level=RiskLevel.MEDIUM,
        blast_radius=blast_radius,
        dry_run_checks=dry_run_checks,
        requested_by="adr100_remediation_service",
        expires_at=datetime.now(timezone.utc) + timedelta(hours=2),
        metadata=metadata,
        incident_id=str(incident_ref),
        matched_playbook_id=matched_playbook_id,
    )
def _approval_fingerprint(
    item: dict[str, Any],
    *,
    approval_kind: str = "adr100_playbook_authoring",
) -> str:
    """Return the SHA-256 hex digest used to deduplicate approval requests.

    The digest covers ``"<kind>:<basis>"``. ``basis`` is the work item id
    when present, otherwise ``"<incident_id>:<playbook_id>:<action>"``.

    Args:
        item: Remediation queue work item.
        approval_kind: Namespace prefix for the digest. An empty value falls
            back to ``"adr100_playbook_authoring"``.

    Returns:
        Hex-encoded SHA-256 digest of the namespaced basis.
    """
    work_item_id = str(item.get("work_item_id") or "")
    playbook_id = str(item.get("playbook_id") or "")
    incident_id = str(item.get("incident_id") or "")
    basis = work_item_id or f"{incident_id}:{playbook_id}:{item.get('remediation_action') or ''}"
    # The kind must be part of the hashed text: a hard-coded prefix would give
    # a playbook-authoring approval and a runtime-replay approval for the same
    # work item the same fingerprint, so one would be deduplicated away.
    kind = approval_kind or "adr100_playbook_authoring"
    return hashlib.sha256(f"{kind}:{basis}".encode("utf-8")).hexdigest()
def _approval_result_payload(
@@ -900,10 +1053,12 @@ def _approval_result_payload(
approval_created: bool,
fingerprint: str,
) -> dict[str, Any]:
ticket_preview = (request.metadata or {}).get("ticket_preview") or _ticket_preview_for_item(
item,
incident,
)
metadata = request.metadata or {}
approval_kind = str(metadata.get("approval_kind") or "adr100_playbook_authoring")
ticket_preview = metadata.get("ticket_preview")
if ticket_preview is None and approval_kind == "adr100_playbook_authoring":
ticket_preview = _ticket_preview_for_item(item, incident)
replay_gate = metadata.get("replay_gate")
approval_id = str(getattr(approval, "id", "") or "")
approval_status = getattr(getattr(approval, "status", None), "value", None) or getattr(
approval,
@@ -931,8 +1086,13 @@ def _approval_result_payload(
"creates_external_ticket": False,
"deduplicated": not approval_created,
"fingerprint": fingerprint,
"approval_kind": approval_kind,
"checks": checks,
"verification_result_preview": "approval_requested",
"verification_result_preview": (
"runtime_replay_approval_requested"
if approval_kind == "adr100_runtime_replay_gate5"
else "approval_requested"
),
"approval_id": approval_id or None,
"approval": {
"id": approval_id or None,
@@ -945,6 +1105,7 @@ def _approval_result_payload(
"matched_playbook_id": getattr(approval, "matched_playbook_id", None),
},
"ticket_preview": ticket_preview,
"replay_gate": replay_gate,
"plan": _plan_for_item(item, "approval"),
}
@@ -1043,7 +1204,9 @@ def _approval_history_context(item: dict[str, Any], payload: dict[str, Any]) ->
"creates_external_ticket": payload.get("creates_external_ticket"),
"deduplicated": payload.get("deduplicated"),
"fingerprint": payload.get("fingerprint"),
"approval_kind": payload.get("approval_kind"),
"ticket_preview": payload.get("ticket_preview"),
"replay_gate": payload.get("replay_gate"),
"approval": payload.get("approval"),
"approval_id": payload.get("approval_id"),
"plan": payload.get("plan"),
@@ -1077,6 +1240,7 @@ def _history_description(context: dict[str, Any]) -> str:
def _approval_history_description(context: dict[str, Any]) -> str:
approval = context.get("approval") or {}
return (
f"kind={context.get('approval_kind') or 'unknown'} "
f"approval={approval.get('id') or context.get('approval_id') or 'unknown'} "
f"status={approval.get('status') or 'unknown'} "
f"preview={context.get('verification_result_preview')} "
@@ -1125,6 +1289,7 @@ def _history_item(record: Any, context: dict[str, Any]) -> dict[str, Any]:
"writes_ticket": context.get("writes_ticket"),
"writes_approval_record": context.get("writes_approval_record"),
"creates_external_ticket": context.get("creates_external_ticket"),
"approval_kind": context.get("approval_kind"),
"approval_id": context.get("approval_id") or approval.get("id"),
"approval_status": approval.get("status"),
"approval_risk_level": approval.get("risk_level"),

View File

@@ -9,7 +9,7 @@ from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.ai_slo import router
from src.models.approval import ApprovalRequest, ApprovalStatus, RiskLevel
from src.models.approval import ApprovalRequest, ApprovalStatus, DataImpact, RiskLevel
from src.models.incident import Incident, IncidentStatus, Severity, Signal
from src.models.playbook import (
ActionType,
@@ -406,6 +406,81 @@ async def test_create_approval_request_deduplicates_existing_pending_approval():
assert approval_service.requests == []
@pytest.mark.asyncio
async def test_create_approval_request_for_runtime_replay_creates_gate5_record_only():
    """A replay-ready work item yields a pending, record-only Gate 5 approval."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline = _FakeTimelineService()
    approvals = _FakeApprovalService()
    service = _service(
        item=_queue_item(),
        playbook_service=_FakePlaybookService(_runtime_replay_playbook()),
        approval_service=approvals,
        timeline_service=timeline,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    # Response payload: approval recorded, nothing executed or mutated.
    expected_values = {
        "schema_version": "adr100_remediation_approval_v1",
        "mode": "approval",
        "approval_kind": "adr100_runtime_replay_gate5",
        "verification_result_preview": "runtime_replay_approval_requested",
    }
    for key, expected in expected_values.items():
        assert outcome[key] == expected
    for key in ("allowed", "writes_approval_record"):
        assert outcome[key] is True
    for key in ("writes_incident_state", "writes_auto_repair_result"):
        assert outcome[key] is False
    gate = outcome["replay_gate"]
    assert gate["status"] == "runtime_replay_ready"
    assert gate["repair_executed"] is False
    assert outcome["approval"]["status"] == "pending"
    assert outcome["plan"]["step"] == "request_runtime_replay_gate5_approval"

    # Request handed to the approval service.
    submitted = approvals.requests[0]
    assert submitted.action.startswith("RUNTIME_REPLAY_GATE5:")
    assert submitted.blast_radius.data_impact == DataImpact.WRITE
    metadata = submitted.metadata
    assert metadata["approval_kind"] == "adr100_runtime_replay_gate5"
    for flag in ("execution_authorized", "repair_attempted", "repair_executed"):
        assert metadata[flag] is False
    assert metadata["required_scope"] == "write_after_approval"
    assert metadata["mcp_gate"] == "gate5_required"
    first_route = metadata["write_routes"][0]
    assert first_route["tool_name"] == "ssh_docker_restart"
    assert first_route["required_scope"] == "write"
    assert metadata["replay_gate"]["status"] == "runtime_replay_ready"

    # History written to the operation log and the timeline.
    logged = operation_log.calls[0]
    assert logged["event_type"] == "APPROVAL_ESCALATED"
    assert logged["action_detail"] == "adr100_runtime_replay_gate5_approval_requested"
    assert logged["context"]["approval_kind"] == "adr100_runtime_replay_gate5"
    assert logged["context"]["replay_gate"]["status"] == "runtime_replay_ready"
    assert timeline.calls[0]["title"] == "ADR-100 runtime replay Gate 5 approval requested"
@pytest.mark.asyncio
async def test_create_approval_request_blocks_runtime_replay_when_gate_not_ready():
    """With no playbook available the gate blocks and no approval is created."""
    approvals = _FakeApprovalService()
    operation_log = _FakeAlertOperationLogRepository()
    service = _service(
        item=_queue_item(),
        playbook_service=_FakePlaybookService(None),
        approval_service=approvals,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    # Blocked payload: nothing allowed, nothing written.
    for key in ("allowed", "writes_approval_record"):
        assert outcome[key] is False
    assert outcome["approval_id"] is None
    assert outcome["verification_result_preview"] == "runtime_replay_gate_blocked"
    blocked_status = "blocked_playbook_not_found"
    assert outcome["replay_gate"]["status"] == blocked_status
    assert approvals.requests == []

    # The block is still recorded in the operation log as a failed pre-flight.
    logged = operation_log.calls[0]
    assert logged["event_type"] == "PRE_FLIGHT_FAILED"
    assert logged["context"]["replay_gate"]["status"] == blocked_status
@pytest.mark.asyncio
async def test_dry_run_reverify_collects_state_without_writes():
item = _queue_item(

View File

@@ -2778,7 +2778,9 @@ function Adr100RemediationQueuePanel({
const approval = result?.approval ?? null;
const replayGate = result?.replay_gate ?? null;
const canCreateApproval = item.remediation_status === "needs_playbook_ticket"
|| item.remediation_action === "promote_diagnostic_to_repair_playbook";
|| item.remediation_action === "promote_diagnostic_to_repair_playbook"
|| item.remediation_status === "ready_for_replay"
|| item.remediation_action === "replay_with_supported_executor";
return (
<article key={workItemId || item.incident_id || item.auto_repair_id} className="min-w-0 bg-white p-4">
<div className="flex flex-wrap items-start justify-between gap-3">