feat(awooop): preview recurrence repair work items
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m11s
CD Pipeline / build-and-deploy (push) Successful in 3m33s
CD Pipeline / post-deploy-checks (push) Successful in 1m32s

This commit is contained in:
Your Name
2026-05-18 21:42:20 +08:00
parent 51660ecbb1
commit d1ebcdac10
6 changed files with 925 additions and 4 deletions

View File

@@ -10,13 +10,17 @@ from datetime import datetime
from typing import Any
from uuid import UUID
from fastapi import APIRouter, Query
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from src.services.channel_event_dossier_service import (
RecurrenceWorkItemMode,
RecurrenceWorkItemNotFoundError,
fetch_channel_event_dossier,
fetch_channel_event_dossier_coverage,
fetch_channel_event_dossier_recurrence,
fetch_recurrence_work_item_dry_run,
fetch_recurrence_work_item_preview,
)
from src.services.platform_operator_service import list_recent_channel_events
@@ -170,6 +174,16 @@ class ChannelEventRecurrenceResponse(BaseModel):
items: list[ChannelEventRecurrenceItem]
class RecurrenceWorkItemDryRunRequest(BaseModel):
    """Request body for the AwoooP recurrence work item dry-run endpoint.

    The dry-run route handler forwards every field, unchanged, as a keyword
    argument to ``fetch_recurrence_work_item_dry_run``.
    """

    # Optional project scope; when supplied it must be a non-empty string.
    project_id: str | None = Field(default=None, min_length=1)
    # Required, non-empty identifier of the recurrence work item.
    work_item_id: str = Field(min_length=1)
    # Requested handling mode; defaults to "auto".
    mode: RecurrenceWorkItemMode = "auto"
    # Optional provider filter; when supplied it must be a non-empty string.
    provider: str | None = Field(default=None, min_length=1)
    # Forwarded as ``limit``; validated to the inclusive range 1..300.
    limit: int = Field(default=300, ge=1, le=300)
@router.get(
"/events/dossier",
response_model=ChannelEventDossierResponse,
@@ -241,6 +255,64 @@ async def get_event_dossier_recurrence(
)
@router.get(
    "/events/dossier/recurrence/work-item/preview",
    summary="預覽重複告警工作項的安全處理計畫",
    description=(
        "依 recurrence read model 找出指定 work_item返回下一步、pre-flight checks "
        "與 read-only / no-write 保證;不修改 incident、auto-repair 或 ticket 狀態。"
    ),
)
async def preview_event_recurrence_work_item(
    work_item_id: str = Query(..., min_length=1, description="recurrence work_item_id"),
    project_id: str | None = Query(None, description="租戶 ID可選"),
    provider: str | None = Query(
        None, description="provider可選如 alertmanager / sentry / signoz"
    ),
    mode: RecurrenceWorkItemMode = Query("auto", description="預覽模式"),
    limit: int = Query(300, ge=1, le=300, description="最多納入統計筆數"),
) -> dict[str, Any]:
    """Return the preview for a single recurrence work item.

    The query parameters are handed to ``fetch_recurrence_work_item_preview``
    as keyword arguments and its result is returned as-is.

    Raises:
        HTTPException: 404 with detail ``recurrence_work_item_not_found`` when
            the service raises ``RecurrenceWorkItemNotFoundError``.
    """
    lookup_kwargs = {
        "project_id": project_id,
        "work_item_id": work_item_id,
        "mode": mode,
        "provider": provider,
        "limit": limit,
    }
    try:
        preview = await fetch_recurrence_work_item_preview(**lookup_kwargs)
    except RecurrenceWorkItemNotFoundError as exc:
        # Surface the lookup miss as an HTTP 404 while keeping the cause chained.
        not_found = HTTPException(
            status_code=404,
            detail="recurrence_work_item_not_found",
        )
        raise not_found from exc
    return preview
@router.post(
    "/events/dossier/recurrence/work-item/dry-run",
    summary="乾跑重複告警工作項的安全處理流程",
    description=(
        "依 recurrence read model 產生 dry-run 結果並寫入 pre-flight history"
        "但不修改 incident、auto-repair 或 ticket 狀態。"
    ),
)
async def dry_run_event_recurrence_work_item(
    request: RecurrenceWorkItemDryRunRequest,
) -> dict[str, Any]:
    """Run the dry-run for a single recurrence work item.

    The request fields are handed to ``fetch_recurrence_work_item_dry_run`` as
    keyword arguments and its result is returned as-is.

    Raises:
        HTTPException: 404 with detail ``recurrence_work_item_not_found`` when
            the service raises ``RecurrenceWorkItemNotFoundError``.
    """
    dry_run_kwargs = {
        "project_id": request.project_id,
        "work_item_id": request.work_item_id,
        "mode": request.mode,
        "provider": request.provider,
        "limit": request.limit,
    }
    try:
        outcome = await fetch_recurrence_work_item_dry_run(**dry_run_kwargs)
    except RecurrenceWorkItemNotFoundError as exc:
        # Surface the lookup miss as an HTTP 404 while keeping the cause chained.
        not_found = HTTPException(
            status_code=404,
            detail="recurrence_work_item_not_found",
        )
        raise not_found from exc
    return outcome
@router.get(
"/events/recent",
response_model=RecentEventsResponse,

View File

@@ -8,19 +8,27 @@ automation state.
from __future__ import annotations
import re
from typing import Any
from typing import Any, Literal
from uuid import UUID
import structlog
from fastapi import HTTPException, status
from sqlalchemy import text
from src.db.base import get_db_context
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Row caps for the read models built in this module.
# NOTE(review): only _MAX_RECURRENCE_EVENTS is visibly used in this view (as
# the default ``limit`` of the recurrence work item fetchers); the other caps
# are presumably applied by the dossier / coverage / repair queries — confirm.
_MAX_DOSSIER_EVENTS = 50
_MAX_COVERAGE_EVENTS = 200
_MAX_RECURRENCE_EVENTS = 300
_MAX_REPAIR_INCIDENTS = 200
# Matches incident ids shaped like INC-<8 digits>-<4 or more of A-Z / 0-9>.
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
# Modes accepted by the recurrence work item preview / dry-run builders.
# "auto" makes _selected_recurrence_mode derive the mode from the work item's
# ``next_step``; any other value is used as given.
RecurrenceWorkItemMode = Literal["auto", "ticket", "reverify", "approval_review", "observe"]
class RecurrenceWorkItemNotFoundError(LookupError):
    """Raised when a requested recurrence work item is not in the read model.

    ``_find_recurrence_work_item`` raises it with the missing ``work_item_id``
    as its only argument.
    """
def _as_dict(value: Any) -> dict[str, Any]:
@@ -390,6 +398,386 @@ def _attach_work_item_summary(
}
def _recurrence_work_item_target(item: dict[str, Any]) -> dict[str, Any]:
    """Project the identifying fields of a recurrence item into a target dict.

    Each listed field is read with ``item.get``, so a field that is missing
    from *item* is present in the result with the value ``None``. Keys of
    *item* that are not listed are dropped.
    """
    target_fields = (
        "recurrence_key",
        "provider",
        "alertname",
        "severity",
        "namespace",
        "target_resource",
        "fingerprint",
        "latest_event_id",
        "latest_provider_event_id",
        "latest_run_id",
        "latest_run_state",
        "latest_agent_id",
        "latest_incident_id",
    )
    return {field_name: item.get(field_name) for field_name in target_fields}
def _selected_recurrence_mode(
    work_item: dict[str, Any],
    requested_mode: RecurrenceWorkItemMode,
) -> str:
    """Resolve the effective mode for a recurrence work item.

    An explicit *requested_mode* (anything other than ``"auto"``) is returned
    unchanged. For ``"auto"`` the mode is derived from the work item's
    ``next_step``; steps without a mapping resolve to ``"observe"``.
    """
    if requested_mode != "auto":
        return requested_mode

    mode_by_next_step = {
        "create_repair_ticket": "ticket",
        "run_post_verification": "reverify",
        "triage_failed_repair": "reverify",
        "review_repair_record": "reverify",
        "triage_missing_repair_record": "reverify",
        "review_approval": "approval_review",
    }
    step_name = str(work_item.get("next_step") or "")
    return mode_by_next_step.get(step_name, "observe")
def _recurrence_work_item_checks(
    item: dict[str, Any],
    work_item: dict[str, Any],
) -> list[dict[str, Any]]:
    """Evaluate the pre-flight checks for a recurrence work item.

    Returns one ``{"name", "passed", "detail"}`` dict per check, in a fixed
    order. ``no_destructive_writes`` is reported as passed unconditionally.
    """
    repair_status = _as_dict(item.get("repair_summary")).get("status")
    source_refs = int(item.get("source_ref_total") or 0)
    status_value = work_item.get("status")
    incident_ref = work_item.get("incident_id")
    next_step = str(work_item.get("next_step") or "none")

    # (name, passed, detail) triples, listed in the order they are reported.
    outcomes = (
        ("work_item_open", status_value == "open", str(status_value or "unknown")),
        ("incident_linked", bool(incident_ref), str(incident_ref or "missing incident_id")),
        ("known_next_step", next_step != "none", next_step),
        ("source_refs_present", source_refs > 0, str(source_refs)),
        ("no_destructive_writes", True, "preview_and_dry_run_only"),
        ("repair_status_visible", bool(repair_status), str(repair_status or "unknown")),
    )
    return [
        {"name": check_name, "passed": passed, "detail": detail}
        for check_name, passed, detail in outcomes
    ]
def _recurrence_plan(
    item: dict[str, Any],
    work_item: dict[str, Any],
    mode: str,
) -> dict[str, Any]:
    """Describe the read-only plan for handling a work item in *mode*.

    The step, flywheel node and agent come from a per-mode route table; a mode
    without an entry uses the ``"observe"`` route. The plan always declares
    ``required_scope`` ``"read"`` and an empty ``writes`` list.
    """
    # mode -> (step, flywheel_node, agent_id)
    routes = {
        "ticket": (
            "prepare_repair_ticket_preview",
            "work_item_to_ticket",
            "awooop_recurrence_coordinator",
        ),
        "reverify": (
            "collect_read_model_and_prepare_reverification",
            "verify",
            "post_execution_verifier",
        ),
        "approval_review": (
            "route_to_approval_review",
            "approval",
            "awooop_approval_coordinator",
        ),
        "observe": (
            "observe_until_run_or_repair_state_changes",
            "observe",
            "awooop_recurrence_coordinator",
        ),
    }
    step, flywheel_node, agent_id = routes[mode] if mode in routes else routes["observe"]

    plan: dict[str, Any] = {
        "step": step,
        "flywheel_node": flywheel_node,
        "agent_id": agent_id,
    }
    plan["required_scope"] = "read"
    plan["writes"] = []
    plan["target_action"] = work_item.get("next_step")
    plan["reason"] = work_item.get("reason")
    plan["target"] = _recurrence_work_item_target(item)
    return plan
def _ticket_preview(item: dict[str, Any], work_item: dict[str, Any]) -> dict[str, Any]:
    """Render a non-binding preview of the repair ticket for a work item.

    Nothing is created here: ``would_create`` is always ``False`` and the body
    states that ticket creation needs a later explicit apply path.

    Args:
        item: Recurrence read-model item the work item belongs to.
        work_item: The work item being previewed.

    Returns:
        Dict with ``would_create``, ``title`` (cut to 180 characters),
        ``labels`` (duplicates removed, first-seen order kept) and
        ``body_preview`` (cut to 1000 characters).
    """
    alertname = str(item.get("alertname") or item.get("provider") or "recurrence")
    incident_id = str(work_item.get("incident_id") or item.get("latest_incident_id") or "")
    kind = str(work_item.get("kind") or "recurrence")
    title = f"[AwoooP] {alertname} recurrence work item: {incident_id or 'unlinked'}"
    # ``kind`` falls back to "recurrence", which is already a fixed label, so a
    # work item without a kind used to yield the label twice. dict.fromkeys
    # drops the repeat while preserving the original label order.
    labels = list(dict.fromkeys(("awooop", "recurrence", kind)))
    body_lines = [
        f"Incident: {incident_id or '--'}",
        f"Alert: {alertname}",
        f"Namespace/Target: {item.get('namespace') or '--'} / {item.get('target_resource') or '--'}",
        f"Occurrences: {item.get('occurrence_total') or 0}",
        f"Duplicates: {item.get('duplicate_total') or 0}",
        f"Latest run: {item.get('latest_run_id') or '--'} ({item.get('latest_run_state') or '--'})",
        f"Repair status: {_as_dict(item.get('repair_summary')).get('status') or '--'}",
        f"Next step: {work_item.get('next_step') or '--'}",
        "Writes: none in preview/dry-run; ticket creation requires a later explicit apply path.",
    ]
    return {
        "would_create": False,
        "title": title[:180],
        "labels": labels,
        "body_preview": "\n".join(body_lines)[:1000],
    }
def _recurrence_current_state_summary(
    item: dict[str, Any],
    work_item: dict[str, Any],
) -> dict[str, Any]:
    """Flatten work item, run and repair state into one summary dict.

    Counter fields are coerced with ``int`` and default to 0 when missing or
    falsy; all other fields are copied as found (``None`` when absent), except
    ``run_state_counts`` which defaults to an empty dict.
    """
    repair = _as_dict(item.get("repair_summary"))

    def _count(source: dict[str, Any], key: str) -> int:
        # Missing / None / empty counters are reported as 0.
        return int(source.get(key) or 0)

    summary: dict[str, Any] = {
        f"work_item_{suffix}": work_item.get(suffix)
        for suffix in ("status", "kind", "next_step", "reason")
    }
    for counter in ("occurrence_total", "duplicate_total", "linked_run_total"):
        summary[counter] = _count(item, counter)
    summary["run_state_counts"] = item.get("run_state_counts") or {}
    summary["latest_run_state"] = item.get("latest_run_state")
    summary["latest_run_id"] = item.get("latest_run_id")
    summary["repair_status"] = repair.get("status")
    summary["latest_auto_repair_id"] = repair.get("latest_auto_repair_id")
    summary["latest_verification_result"] = repair.get("latest_verification_result")
    summary["auto_repair_total"] = _count(repair, "auto_repair_total")
    for counter in (
        "source_ref_total",
        "sentry_ref_total",
        "signoz_ref_total",
        "alert_ref_total",
    ):
        summary[counter] = _count(item, counter)
    return summary
def _verification_result_preview(mode: str, allowed: bool) -> str:
    """Name the verification outcome a dry-run would report.

    Returns ``"blocked"`` when the pre-flight checks did not pass; otherwise
    the label for *mode*, with ``"observe_only"`` for a mode without a label.
    """
    label_by_mode = {
        "ticket": "ticket_preview_ready",
        "reverify": "reverify_preview_ready",
        "approval_review": "approval_review_required",
        "observe": "observe_only",
    }
    if allowed:
        return label_by_mode[mode] if mode in label_by_mode else "observe_only"
    return "blocked"
def _find_recurrence_work_item(
    recurrence: dict[str, Any],
    work_item_id: str,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Locate the recurrence item whose work item has the given id.

    Returns:
        ``(item, work_item)`` for the first entry of ``recurrence["items"]``
        whose work item carries *work_item_id*.

    Raises:
        RecurrenceWorkItemNotFoundError: When no entry matches (including a
            missing or empty ``items`` list).
    """
    # Lazy pairs, so scanning stops at the first match just like a plain loop.
    pairs = (
        (entry, _as_dict(entry.get("work_item")))
        for entry in recurrence.get("items") or []
    )
    match = next(
        (pair for pair in pairs if pair[1].get("work_item_id") == work_item_id),
        None,
    )
    if match is None:
        raise RecurrenceWorkItemNotFoundError(work_item_id)
    return match
def build_recurrence_work_item_preview(
    recurrence: dict[str, Any],
    *,
    work_item_id: str,
    mode: RecurrenceWorkItemMode = "auto",
) -> dict[str, Any]:
    """Assemble the read-only preview document for one recurrence work item.

    ``allowed`` is true only when every pre-flight check passed. The three
    ``writes_*`` flags are always ``False``.

    Raises:
        RecurrenceWorkItemNotFoundError: Propagated from
            ``_find_recurrence_work_item`` when the id is not in *recurrence*.
    """
    entry, matched = _find_recurrence_work_item(recurrence, work_item_id)
    effective_mode = _selected_recurrence_mode(matched, mode)
    preflight = _recurrence_work_item_checks(entry, matched)

    preview: dict[str, Any] = {
        "schema_version": "awooop_recurrence_work_item_preview_v1",
        "source": "channel_event_dossier.recurrence",
        "project_id": recurrence.get("project_id"),
    }
    for key in ("work_item_id", "incident_id", "auto_repair_id"):
        preview[key] = matched.get(key)
    preview.update(
        mode=effective_mode,
        requested_mode=mode,
        allowed=all(check["passed"] for check in preflight),
        safety_level="read_only",
        writes_incident_state=False,
        writes_auto_repair_result=False,
        writes_ticket=False,
        checks=preflight,
        plan=_recurrence_plan(entry, matched, effective_mode),
    )
    return preview
def build_recurrence_work_item_dry_run(
    recurrence: dict[str, Any],
    *,
    work_item_id: str,
    mode: RecurrenceWorkItemMode = "auto",
) -> dict[str, Any]:
    """Assemble the dry-run result document for one recurrence work item.

    ``allowed`` and ``executed`` are both true only when every pre-flight
    check passed. The three ``writes_*`` flags are always ``False``.

    Raises:
        RecurrenceWorkItemNotFoundError: Propagated from
            ``_find_recurrence_work_item`` when the id is not in *recurrence*.
    """
    entry, matched = _find_recurrence_work_item(recurrence, work_item_id)
    effective_mode = _selected_recurrence_mode(matched, mode)
    preflight = _recurrence_work_item_checks(entry, matched)
    passed = all(check["passed"] for check in preflight)

    shadow_route = {
        "agent_id": "awooop_recurrence_coordinator",
        "tool_name": "channel_event_dossier.recurrence",
        "required_scope": "read",
        "is_shadow": True,
        "flywheel_node": "work_item",
    }
    return {
        "schema_version": "awooop_recurrence_work_item_dry_run_v1",
        "source": "channel_event_dossier.recurrence",
        "project_id": recurrence.get("project_id"),
        "work_item_id": matched.get("work_item_id"),
        "incident_id": matched.get("incident_id"),
        "auto_repair_id": matched.get("auto_repair_id"),
        "mode": effective_mode,
        "requested_mode": mode,
        "allowed": passed,
        # The dry-run counts as executed exactly when it was allowed.
        "executed": passed,
        "safety_level": "read_only",
        "writes_incident_state": False,
        "writes_auto_repair_result": False,
        "writes_ticket": False,
        "checks": preflight,
        "verification_result_preview": _verification_result_preview(effective_mode, passed),
        "current_state_summary": _recurrence_current_state_summary(entry, matched),
        "ticket_preview": _ticket_preview(entry, matched),
        "plan": _recurrence_plan(entry, matched, effective_mode),
        "read_model_route": shadow_route,
        "next_step": matched.get("next_step"),
    }
def _recurrence_history_context(payload: dict[str, Any]) -> dict[str, Any]:
    """Build the history context stored alongside a dry-run record.

    The context carries its own ``schema_version`` and a fixed selection of
    *payload* fields, each read with ``payload.get`` (``None`` when absent).
    Payload keys outside the selection, such as ``plan``, are not copied.
    """
    copied_fields = (
        "source",
        "project_id",
        "work_item_id",
        "incident_id",
        "auto_repair_id",
        "mode",
        "requested_mode",
        "allowed",
        "executed",
        "safety_level",
        "writes_incident_state",
        "writes_auto_repair_result",
        "writes_ticket",
        "verification_result_preview",
        "current_state_summary",
        "ticket_preview",
        "read_model_route",
        "checks",
        "next_step",
    )
    context: dict[str, Any] = {
        "schema_version": "awooop_recurrence_work_item_dry_run_history_v1",
    }
    for field_name in copied_fields:
        context[field_name] = payload.get(field_name)
    return context
async def _record_recurrence_work_item_dry_run_history(
    payload: dict[str, Any],
) -> dict[str, Any]:
    """Record a dry-run result in the history sinks, best effort.

    Two sinks are attempted independently: the alert operation log and the
    timeline service. A failure in either one is logged as a warning and does
    not stop the other or propagate to the caller.

    Returns:
        ``{"recorded": False, "reason": "missing_incident_id"}`` when the
        payload has no incident id. Otherwise a dict with ``recorded``,
        ``alert_operation_id`` and ``timeline_event_id``, plus
        ``reason="history_sink_unavailable"`` when neither sink yielded an id.
    """
    incident_id = str(payload.get("incident_id") or "")
    if incident_id == "":
        return {"recorded": False, "reason": "missing_incident_id"}

    context = _recurrence_history_context(payload)
    passed = bool(payload.get("allowed"))
    operation_id = None
    timeline_id = None

    # Sink 1: alert operation log.
    try:
        from src.repositories.alert_operation_log_repository import (
            get_alert_operation_log_repository,
        )

        repository = get_alert_operation_log_repository()
        record = await repository.append(
            "PRE_FLIGHT_PASSED" if passed else "PRE_FLIGHT_FAILED",
            incident_id=incident_id,
            auto_repair_id=str(payload.get("auto_repair_id") or "") or None,
            actor="awooop_recurrence_work_item_service",
            action_detail=f"recurrence_work_item_dry_run:{payload.get('mode')}"[:200],
            success=passed,
            context=context,
        )
        if record is not None:
            operation_id = getattr(record, "id", None)
    except Exception as exc:
        logger.warning(
            "awooop_recurrence_work_item_alert_operation_history_failed",
            incident_id=incident_id,
            error=str(exc),
        )

    # Sink 2: incident timeline.
    try:
        from src.services.approval_db import get_timeline_service

        timeline = get_timeline_service()
        event = await timeline.add_event(
            event_type="verifier",
            status="success" if passed else "warning",
            title="AwoooP recurrence work item dry-run",
            description=_recurrence_history_description(context),
            actor="awooop_recurrence_work_item_service",
            actor_role=str(payload.get("mode") or "dry_run"),
            incident_id=incident_id,
        )
        if event:
            timeline_id = event.get("id")
    except Exception as exc:
        logger.warning(
            "awooop_recurrence_work_item_timeline_history_failed",
            incident_id=incident_id,
            error=str(exc),
        )

    history: dict[str, Any] = {
        "recorded": bool(operation_id or timeline_id),
        "alert_operation_id": operation_id,
        "timeline_event_id": timeline_id,
    }
    if not history["recorded"]:
        history["reason"] = "history_sink_unavailable"
    return history
def _recurrence_history_description(context: dict[str, Any]) -> str:
    """Format a one-line, space-separated summary of a dry-run history context.

    Missing values render as ``None``; the result is cut to 500 characters.
    """
    summary = context.get("current_state_summary") or {}
    read_route = context.get("read_model_route") or {}
    segments = [
        f"mode={context.get('mode')}",
        f"preview={context.get('verification_result_preview')}",
        f"occurrences={summary.get('occurrence_total')}",
        f"repair_status={summary.get('repair_status')}",
        f"route={read_route.get('agent_id')}/{read_route.get('tool_name')}",
        f"writes_incident={context.get('writes_incident_state')}",
        f"writes_auto_repair={context.get('writes_auto_repair_result')}",
        f"writes_ticket={context.get('writes_ticket')}",
    ]
    return " ".join(segments)[:500]
def build_dossier_coverage(
rows: list[dict[str, Any]],
*,
@@ -813,3 +1201,49 @@ async def fetch_channel_event_dossier_recurrence(
limit=safe_limit,
repair_summaries_by_incident=repair_summaries,
)
async def fetch_recurrence_work_item_preview(
    *,
    project_id: str | None,
    work_item_id: str,
    mode: RecurrenceWorkItemMode = "auto",
    provider: str | None = None,
    limit: int = _MAX_RECURRENCE_EVENTS,
) -> dict[str, Any]:
    """Load the recurrence read model and build a work item preview from it.

    ``project_id``, ``provider`` and ``limit`` select the read model;
    ``work_item_id`` and ``mode`` select and shape the preview.

    Raises:
        RecurrenceWorkItemNotFoundError: When the loaded read model does not
            contain *work_item_id*.
    """
    read_model = await fetch_channel_event_dossier_recurrence(
        project_id=project_id,
        provider=provider,
        limit=limit,
    )
    preview = build_recurrence_work_item_preview(
        read_model,
        work_item_id=work_item_id,
        mode=mode,
    )
    return preview
async def fetch_recurrence_work_item_dry_run(
    *,
    project_id: str | None,
    work_item_id: str,
    mode: RecurrenceWorkItemMode = "auto",
    provider: str | None = None,
    limit: int = _MAX_RECURRENCE_EVENTS,
) -> dict[str, Any]:
    """Load the recurrence read model, build a dry-run result and record it.

    The dry-run result is built first and passed to the history recorder; the
    recorder's outcome is then attached to the result under ``history``.

    Raises:
        RecurrenceWorkItemNotFoundError: When the loaded read model does not
            contain *work_item_id*.
    """
    read_model = await fetch_channel_event_dossier_recurrence(
        project_id=project_id,
        provider=provider,
        limit=limit,
    )
    result = build_recurrence_work_item_dry_run(
        read_model,
        work_item_id=work_item_id,
        mode=mode,
    )
    # The recorder sees the result before ``history`` is attached to it.
    history = await _record_recurrence_work_item_dry_run_history(result)
    result["history"] = history
    return result