Files
awoooi/apps/api/src/services/repair_candidate_service.py
Your Name fe74d8616e
Some checks failed
Code Review / ai-code-review (push) Successful in 14s
CD Pipeline / tests (push) Successful in 1m40s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
fix(api): expose controlled runtime promotion summaries
2026-06-26 23:56:24 +08:00

1277 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Repair candidate generation from MCP evidence and PlayBook trust.
This service is intentionally candidate-only. It never executes a repair; it
builds an ApprovalRequestCreate only when evidence, PlayBook status, trust and
command safety gates are all explicit enough for a human approval flow.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from urllib.parse import urlencode
import structlog
from src.models.approval import (
ApprovalRequestCreate,
BlastRadius,
DataImpact,
DryRunCheck,
)
from src.models.approval import RiskLevel as ApprovalRiskLevel
from src.models.incident import Incident
from src.models.playbook import (
ActionType,
Playbook,
PlaybookStatus,
)
from src.models.playbook import RiskLevel as PlaybookRiskLevel
from src.repositories.playbook_repository import get_playbook_repository
from src.services.action_parser import ActionKind, parse_kubectl_action
from src.services.auto_repair_service import AutoRepairService
from src.services.awooop_deeplinks import work_item_url
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.incident_service import get_incident_service
from src.services.playbook_match_resolver import resolve_playbook_id_for_alert
from src.services.pre_decision_investigator import get_pre_decision_investigator
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
# Trust gate for repair candidates: a PlayBook whose trust_score is below this
# value gets the "playbook_trust_below_gate" blocker in build_from_incident.
MIN_REPAIR_CANDIDATE_TRUST = 0.30
@dataclass
class RepairCandidateResult:
    """Outcome of one candidate-generation attempt.

    Used by the webhook and Telegram handoff paths. Every field is optional:
    a blocked attempt carries ``blockers`` and ``metadata`` but no
    ``approval_request``.
    """

    # Approval request to submit; None when generation was blocked.
    approval_request: ApprovalRequestCreate | None = None
    # Evidence snapshot gathered for the incident, if any.
    evidence: EvidenceSnapshot | None = None
    # PlayBook that was matched, if any.
    playbook: Playbook | None = None
    # Machine-readable blocker codes explaining why no candidate was built.
    blockers: list[str] = field(default_factory=list)
    # Free-form metadata describing the attempt.
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def candidate_found(self) -> bool:
        """Return True when an approval request was produced."""
        has_request = self.approval_request is not None
        return has_request

    @property
    def draft_ready_for_owner_review(self) -> bool:
        """Return the truthiness of metadata["repair_candidate_draft_ready"]."""
        draft_flag = self.metadata.get("repair_candidate_draft_ready")
        return True if draft_flag else False
class RepairCandidateService:
"""Build a repair approval candidate without executing it."""
def __init__(
self,
*,
incident_service: Any | None = None,
investigator: Any | None = None,
playbook_repository: Any | None = None,
auto_repair_service: AutoRepairService | None = None,
) -> None:
self._incident_service = incident_service or get_incident_service()
self._investigator = investigator or get_pre_decision_investigator()
self._playbook_repository = playbook_repository or get_playbook_repository()
self._auto_repair = auto_repair_service or AutoRepairService()
async def build_from_incident_id(
    self,
    *,
    incident_id: str,
    alertname: str,
    target_resource: str,
    namespace: str,
    message: str,
    fallback_action: str,
    matched_playbook_id: str | None = None,
    rule_id: str | None = None,
    severity: str | None = None,
) -> RepairCandidateResult:
    """Resolve *incident_id* from working memory and build a candidate for it.

    When the lookup returns None, a blocked result carrying the
    "incident_not_found" blocker is returned. Otherwise every remaining
    argument is forwarded unchanged to ``build_from_incident``.
    """
    incident = await self._incident_service.get_from_working_memory(incident_id)
    if incident is not None:
        forwarded = {
            "incident": incident,
            "alertname": alertname,
            "target_resource": target_resource,
            "namespace": namespace,
            "message": message,
            "fallback_action": fallback_action,
            "matched_playbook_id": matched_playbook_id,
            "rule_id": rule_id,
            "severity": severity,
        }
        return await self.build_from_incident(**forwarded)
    # No incident to bind to: report the blocker without collecting evidence.
    blocked_metadata = {
        "repair_candidate_status": "blocked",
        "blockers": ["incident_not_found"],
    }
    return RepairCandidateResult(
        blockers=["incident_not_found"],
        metadata=blocked_metadata,
    )
async def build_from_incident(
    self,
    *,
    incident: Incident,
    alertname: str,
    target_resource: str,
    namespace: str,
    message: str,
    fallback_action: str,
    matched_playbook_id: str | None = None,
    rule_id: str | None = None,
    severity: str | None = None,
) -> RepairCandidateResult:
    """Build an approval candidate for *incident*, or return explicit blockers.

    Gates are evaluated in order: MCP evidence, PlayBook match, PlayBook
    lookup, PlayBook status / trust / generic-fallback checks, and finally
    selection of an executable step. Blockers accumulate across gates; the
    method returns early through ``_blocked_result`` when no PlayBook id is
    matched or the PlayBook cannot be loaded, and again after step selection
    when any blocker exists or no step was chosen.

    Returns:
        A result with ``approval_request`` set when every gate passed,
        otherwise the blocked result built by ``_blocked_result``.
    """
    evidence = await self._collect_evidence(incident)
    metadata = self._base_metadata(evidence=evidence)
    blockers: list[str] = []
    # Missing evidence is recorded but does not stop the remaining gates, so
    # the final blocker list covers every failed gate.
    if not evidence or evidence.sensors_succeeded <= 0:
        blockers.append("mcp_evidence_missing")
    # A caller-supplied PlayBook id wins; otherwise resolve one from the alert.
    playbook_id = matched_playbook_id or await resolve_playbook_id_for_alert(
        rule_id=rule_id,
        alertname=alertname,
        affected_services=[target_resource] if target_resource else [],
        severity=severity,
    )
    if not playbook_id:
        blockers.append("playbook_not_matched")
        return self._blocked_result(
            blockers=blockers,
            metadata=metadata,
            evidence=evidence,
            fallback_action=fallback_action,
            incident=incident,
            alertname=alertname,
            target_resource=target_resource,
            namespace=namespace,
        )
    playbook = await self._playbook_repository.get_by_id(playbook_id)
    if not playbook:
        blockers.append("playbook_not_found")
        return self._blocked_result(
            blockers=blockers,
            metadata=metadata,
            evidence=evidence,
            fallback_action=fallback_action,
            incident=incident,
            alertname=alertname,
            target_resource=target_resource,
            namespace=namespace,
        )
    # Record the PlayBook's trust facts before gating on them, so blocked
    # results also carry them.
    metadata["playbook_trust"] = {
        "playbook_id": playbook.playbook_id,
        "name": playbook.name,
        "status": playbook.status.value,
        "trust_score": float(playbook.trust_score),
    }
    if playbook.status != PlaybookStatus.APPROVED:
        blockers.append("playbook_not_approved")
    if float(playbook.trust_score) < MIN_REPAIR_CANDIDATE_TRUST:
        blockers.append("playbook_trust_below_gate")
    if self._is_generic_fallback_playbook(playbook):
        blockers.append("playbook_generic_fallback_not_repair")
    step, step_blockers = self._select_executable_step(incident, playbook)
    blockers.extend(step_blockers)
    # Any blocker from any gate (or no selectable step) ends in a blocked result.
    if blockers or step is None:
        return self._blocked_result(
            blockers=blockers,
            metadata=metadata,
            evidence=evidence,
            playbook=playbook,
            fallback_action=fallback_action,
            incident=incident,
            alertname=alertname,
            target_resource=target_resource,
            namespace=namespace,
        )
    metadata["repair_candidate"] = {
        "source": "mcp_evidence_playbook_trust",
        "playbook_id": playbook.playbook_id,
        "playbook_name": playbook.name,
        "step_number": step.step_number,
        "command": step.command,
        "action_type": step.action_type.value,
        "risk_level": step.risk_level.value,
        "expected_result": step.expected_result,
        "rollback_command": step.rollback_command,
        "requires_approval": True,
    }
    metadata["verifier_plan"] = self._build_verifier_plan(
        command=step.command,
        namespace=namespace,
        target_resource=target_resource,
        evidence=evidence,
    )
    metadata["repair_candidate_status"] = "candidate_ready_for_approval"
    metadata["fallback_replaced"] = True
    # The request only proposes the step's command; nothing is executed here.
    approval_request = ApprovalRequestCreate(
        action=step.command,
        description=self._build_description(
            message=message,
            playbook=playbook,
            step_command=step.command,
            evidence=evidence,
            verifier_plan=metadata["verifier_plan"],
        ),
        risk_level=self._approval_risk(step.risk_level),
        blast_radius=BlastRadius(
            affected_pods=1,
            estimated_downtime=f"{playbook.estimated_duration_minutes} min",
            related_services=[target_resource] if target_resource else [],
            data_impact=DataImpact.WRITE,
        ),
        dry_run_checks=self._build_dry_run_checks(
            evidence=evidence,
            playbook=playbook,
            command=step.command,
        ),
        requested_by="OpenClaw (MCP evidence + PlayBook trust)",
        incident_id=incident.incident_id,
        metadata=metadata,
        matched_playbook_id=playbook.playbook_id,
    )
    logger.info(
        "repair_candidate_generated",
        incident_id=incident.incident_id,
        playbook_id=playbook.playbook_id,
        sensors_succeeded=evidence.sensors_succeeded if evidence else 0,
        # Only the first 160 characters of the command are logged.
        command=step.command[:160],
    )
    return RepairCandidateResult(
        approval_request=approval_request,
        evidence=evidence,
        playbook=playbook,
        metadata=metadata,
    )
async def _collect_evidence(self, incident: Incident) -> EvidenceSnapshot | None:
try:
return await self._investigator.investigate(incident)
except Exception as exc:
logger.warning(
"repair_candidate_evidence_collect_failed",
incident_id=incident.incident_id,
error=str(exc),
)
return None
def _select_executable_step(
self,
incident: Incident,
playbook: Playbook,
) -> tuple[Any | None, list[str]]:
blockers: list[str] = []
if not playbook.repair_steps:
return None, ["playbook_has_no_repair_steps"]
rejected_readonly = False
rejected_unsafe = False
for step in playbook.repair_steps:
command = (step.command or "").strip()
if not command:
continue
if step.action_type == ActionType.MANUAL:
rejected_readonly = True
continue
if step.action_type == ActionType.KUBECTL or command.startswith("kubectl"):
parsed = parse_kubectl_action(command)
if not parsed.ok:
rejected_unsafe = True
continue
if parsed.kind == ActionKind.READONLY:
rejected_readonly = True
continue
return step, []
if step.action_type == ActionType.SSH_COMMAND or command.startswith("ssh "):
if self._auto_repair.preview_write_ssh_mcp_route(incident, command):
return step, []
if self._looks_like_diagnostic_command(command):
rejected_readonly = True
continue
rejected_unsafe = True
if rejected_unsafe:
blockers.append("playbook_command_not_safely_routable")
if rejected_readonly:
blockers.append("playbook_observe_only")
if not blockers:
blockers.append("playbook_has_no_executable_step")
return None, blockers
def _base_metadata(self, *, evidence: EvidenceSnapshot | None) -> dict[str, Any]:
if not evidence:
return {
"source": "mcp_playbook_repair_candidate",
"mcp_evidence": {"status": "missing"},
}
return {
"source": "mcp_playbook_repair_candidate",
"mcp_evidence": {
"status": "collected" if evidence.sensors_succeeded > 0 else "degraded",
"snapshot_id": evidence.snapshot_id,
"sensors_attempted": evidence.sensors_attempted,
"sensors_succeeded": evidence.sensors_succeeded,
"mcp_health": evidence.mcp_health,
"summary_excerpt": (evidence.evidence_summary or "")[:800],
},
}
def _blocked_result(
    self,
    *,
    blockers: list[str],
    metadata: dict[str, Any],
    fallback_action: str,
    evidence: EvidenceSnapshot | None = None,
    playbook: Playbook | None = None,
    incident: Incident | None = None,
    alertname: str = "",
    target_resource: str = "",
    namespace: str = "",
) -> RepairCandidateResult:
    """Build the result for a blocked attempt, including the owner-review draft.

    ``metadata`` is mutated in place: it receives the de-duplicated blockers,
    their human-readable summary, the draft package, status flags and the
    fallback action. The returned result never carries an approval request.

    Returns:
        A ``RepairCandidateResult`` with ``blockers`` and ``metadata`` set and
        ``approval_request`` left as None.
    """
    # dict.fromkeys drops duplicate blockers while keeping first-seen order.
    metadata["repair_candidate_blockers"] = list(dict.fromkeys(blockers))
    metadata["repair_candidate_blocker_summary"] = self._humanize_blockers(
        metadata["repair_candidate_blockers"]
    )
    draft_package = self._build_draft_package(
        blockers=metadata["repair_candidate_blockers"],
        playbook=playbook,
        evidence=evidence,
        incident=incident,
        alertname=alertname,
        target_resource=target_resource,
        namespace=namespace,
    )
    draft_ready = self._is_owner_review_ready_draft(
        draft_package=draft_package,
        blockers=metadata["repair_candidate_blockers"],
    )
    metadata["repair_candidate_status"] = (
        "draft_ready_for_owner_review" if draft_ready else "blocked"
    )
    metadata["repair_candidate_draft_ready"] = draft_ready
    # These flags are fixed regardless of draft readiness: owner review is
    # always required and runtime execution is never authorized here.
    metadata["repair_candidate_owner_review_required"] = True
    metadata["repair_candidate_runtime_action_ready"] = False
    metadata["repair_candidate_runtime_execution_authorized"] = False
    if draft_ready:
        # Promote the package (and its work item, if present) to the
        # owner-review state; execution stays unauthorized.
        draft_package["status"] = "owner_review_ready"
        draft_package["owner_review_gate"] = "required"
        draft_package["next_action"] = "owner_review_repair_candidate_draft"
        draft_package["runtime_execution_authorized"] = False
        draft_package["writes_runtime_state"] = False
        work_item = draft_package.get("awooop_work_item")
        if isinstance(work_item, dict):
            work_item["status"] = "owner_review_ready"
            work_item["next_action"] = "owner_review_repair_candidate_draft"
            work_item["owner_review_required"] = True
            work_item["runtime_execution_authorized"] = False
    promotion_contract = draft_package.get("candidate_promotion_contract")
    if isinstance(promotion_contract, dict):
        metadata["repair_candidate_promotion_contract"] = promotion_contract
        metadata["repair_candidate_promotion_summary"] = (
            self._promotion_summary_for_operator(promotion_contract)
        )
    metadata["playbook_draft_required"] = True
    metadata["repair_candidate_prefilled_draft_summary"] = self._draft_summary_for_operator(
        draft_package.get("playbook_draft_template") or {}
    )
    metadata["repair_candidate_next_step"] = self._next_step_with_draft_summary(
        str(draft_package["next_step"]),
        str(metadata["repair_candidate_prefilled_draft_summary"]),
    )
    metadata["repair_candidate_draft_package"] = draft_package
    metadata["fallback_action"] = fallback_action
    return RepairCandidateResult(
        evidence=evidence,
        playbook=playbook,
        blockers=metadata["repair_candidate_blockers"],
        metadata=metadata,
    )
def _is_generic_fallback_playbook(self, playbook: Playbook) -> bool:
alert_names = playbook.symptom_pattern.alert_names or []
if "*" in alert_names:
return True
return "通用兜底" in playbook.name or "generic fallback" in playbook.name.lower()
def _looks_like_diagnostic_command(self, command: str) -> bool:
command = command.lower()
diagnostic_markers = (
"uptime",
"ps aux",
"docker stats",
"journalctl",
"systemctl status",
"kubectl get",
"kubectl describe",
"tail ",
"grep ",
)
return any(marker in command for marker in diagnostic_markers)
def _humanize_blockers(self, blockers: list[str]) -> str:
labels = {
"incident_not_found": "找不到 incident無法綁定真相鏈",
"mcp_evidence_missing": "MCP 證據未完成",
"playbook_not_matched": "沒有命中專屬 PlayBook",
"playbook_not_found": "命中的 PlayBook 不存在",
"playbook_not_approved": "PlayBook 尚未批准",
"playbook_trust_below_gate": "PlayBook trust score 低於候選門檻",
"playbook_generic_fallback_not_repair": "只命中通用兜底 PlayBook禁止當成修復命令",
"playbook_has_no_repair_steps": "PlayBook 沒有修復步驟",
"playbook_command_not_safely_routable": "PlayBook 命令未通過安全路由",
"playbook_observe_only": "PlayBook 只有觀察或診斷步驟",
"playbook_has_no_executable_step": "PlayBook 沒有可執行修復步驟",
}
return "".join(labels.get(blocker, blocker) for blocker in blockers)
def _build_draft_package(
    self,
    *,
    blockers: list[str],
    playbook: Playbook | None,
    evidence: EvidenceSnapshot | None,
    incident: Incident | None,
    alertname: str = "",
    target_resource: str = "",
    namespace: str = "",
) -> dict[str, Any]:
    """Describe the concrete owner-review package needed to unblock repair.

    The package is a handoff contract only. It must not be interpreted as
    approval to mutate runtime state or auto-create an approved PlayBook.

    The lane and next step are chosen from the first matching blocker group
    in the if/elif chain below. A work item is attached only when the
    incident has an ``incident_id``.

    Returns:
        A dict with schema_version "repair_candidate_draft_package_v1",
        including the coverage gap, the prefilled PlayBook draft template,
        the promotion contract and, when available, "awooop_work_item".
    """
    blocker_set = set(blockers)
    # Lane priority: truth chain > evidence > PlayBook coverage > observe-only
    # > unsafe route > trust gate > generic owner review.
    if "incident_not_found" in blocker_set:
        lane = "restore_truth_chain_before_repair"
        next_step = "先修復 incident / approval 真相鏈綁定,再重跑 MCP evidence 與 PlayBook 匹配。"
    elif "mcp_evidence_missing" in blocker_set:
        lane = "rerun_mcp_evidence_collection"
        next_step = (
            "先按重診收集 MCP evidence成功後再建立服務專屬 PlayBook 草案,"
            "禁止只憑通用規則批准修復。"
        )
    elif {
        "playbook_not_matched",
        "playbook_not_found",
        "playbook_generic_fallback_not_repair",
    } & blocker_set:
        lane = "create_service_specific_repair_playbook"
        next_step = (
            "建立專屬 PlayBook 草案:綁定 alertname / target selector補 MCP evidence refs、"
            "修復命令、rollback、verifier plan 與 owner review通用兜底不可執行。"
        )
    elif "playbook_observe_only" in blocker_set:
        lane = "promote_diagnostic_to_repair_playbook"
        next_step = (
            "把診斷命令保留為 MCP evidence collector另建獨立修復步驟、rollback "
            "與 verifier經 owner review 後才可進入批准。"
        )
    elif "playbook_command_not_safely_routable" in blocker_set:
        lane = "route_command_through_safe_mcp_or_ansible"
        next_step = (
            "將命令改走 allowlisted MCP / Ansible route補 blast radius、rollback "
            "與 verifier plan再送 owner review。"
        )
    elif {
        "playbook_not_approved",
        "playbook_trust_below_gate",
    } & blocker_set:
        lane = "owner_review_playbook_trust_gate"
        next_step = (
            "由 owner review PlayBook 狀態與 trust score補成功/失敗證據後才可進入修復候選。"
        )
    else:
        lane = "repair_candidate_owner_review"
        next_step = (
            "建立人工處置包並補 PlayBook 草案欄位;完成 owner review 後再重跑候選生成。"
        )
    evidence_ref = None
    if evidence and evidence.snapshot_id:
        evidence_ref = evidence.snapshot_id
    # Fields the owner must fill in before the draft can be promoted.
    required_fields = [
        "alertname",
        "target_selector",
        "mcp_evidence_refs",
        "repair_command",
        "rollback_command",
        "verifier_plan",
        "owner_review",
        "script_or_ansible_ref",
        "schedule_or_monitoring_rule_ref",
        "km_update_plan",
        "automation_asset_record",
    ]
    # Operations that remain forbidden while the package is only a draft.
    blocked_operations = [
        "auto_execute",
        "approve_no_action_as_repair",
        "generic_fallback_repair",
    ]
    automation_asset_requirements = [
        {
            "asset_type": "KM",
            "required_record": "incident_root_cause_and_resolution_summary",
            "visibility": "knowledge_base",
            "owner_review_required": True,
        },
        {
            "asset_type": "PlayBook",
            "required_record": "service_specific_repair_steps_with_trust_policy",
            "visibility": "awooop_work_items",
            "owner_review_required": True,
        },
        {
            "asset_type": "ScriptOrAnsible",
            "required_record": "safe_route_command_or_ansible_playbook_ref",
            "visibility": "runs_and_work_items",
            "owner_review_required": True,
        },
        {
            "asset_type": "ScheduleOrMonitoringRule",
            "required_record": "recurrence_detection_or_alert_rule_gap",
            "visibility": "observability_and_reports",
            "owner_review_required": True,
        },
        {
            "asset_type": "Verifier",
            "required_record": "success_failure_and_rollback_verification_plan",
            "visibility": "incident_timeline",
            "owner_review_required": True,
        },
    ]
    required_writebacks = [
        "incident_timeline_stage_update",
        "execution_or_manual_handoff_result",
        "verifier_result",
        "km_update_draft",
        "playbook_trust_update",
        "automation_asset_inventory_record",
    ]
    coverage_gap = self._build_coverage_gap(
        blockers=blockers,
        lane=lane,
        alertname=alertname,
        target_resource=target_resource,
        namespace=namespace,
        evidence=evidence,
        playbook=playbook,
    )
    playbook_draft_template = self._build_playbook_draft_template(
        coverage_gap=coverage_gap,
        blockers=blockers,
        lane=lane,
        alertname=alertname,
        target_resource=target_resource,
        namespace=namespace,
        evidence=evidence,
        playbook=playbook,
    )
    incident_id = getattr(incident, "incident_id", "") if incident else ""
    # Falls back to "awoooi" when the incident is None, lacks project_id, or
    # has a falsy project_id.
    project_id = str(getattr(incident, "project_id", "awoooi") or "awoooi")
    work_item: dict[str, Any] | None = None
    work_item_id = ""
    work_item_url_value = ""
    if incident_id:
        # The work item id is built from project, incident and lane.
        work_item_id = f"repair-candidate-draft:{project_id}:{incident_id}:{lane}"
        work_item_query = urlencode({
            "project_id": project_id,
            "incident_id": incident_id,
            "work_item_id": work_item_id,
        })
        run_query = urlencode({
            "project_id": project_id,
            "incident_id": incident_id,
        })
        work_item_url_value = work_item_url(
            work_item_id,
            incident_id=incident_id,
            project_id=project_id,
        )
        work_item = {
            "schema_version": "awooop_repair_candidate_draft_work_item_v1",
            "work_item_id": work_item_id,
            "kind": "repair_candidate_playbook_draft",
            "status": "open",
            "needs_human": True,
            "project_id": project_id,
            "incident_id": incident_id,
            "alertname": alertname or None,
            "namespace": namespace or None,
            "target_resource": target_resource or None,
            "lane": lane,
            "reason": ",".join(blockers),
            "next_step": next_step,
            "required_fields": required_fields,
            "automation_asset_requirements": automation_asset_requirements,
            "required_writebacks": required_writebacks,
            "coverage_gap": coverage_gap,
            "playbook_draft_template": playbook_draft_template,
            "blocked_operations": blocked_operations,
            "target_href": f"/awooop/runs?{run_query}",
            "work_item_href": f"/awooop/work-items?{work_item_query}",
            "work_item_url": work_item_url_value,
            "decision_effect": "none",
            "safety_level": "read_only_work_item_projection",
            "writes_incident_state": False,
            "writes_auto_repair_result": False,
            "writes_runtime_state": False,
        }
    # The contract is built even without a work item; its work-item id and URL
    # are then empty strings.
    promotion_contract = self._build_candidate_promotion_contract(
        coverage_gap=coverage_gap,
        playbook_draft_template=playbook_draft_template,
        lane=lane,
        blockers=blockers,
        incident_id=incident_id,
        project_id=project_id,
        work_item_id=work_item_id,
        work_item_url_value=work_item_url_value,
    )
    if work_item is not None:
        work_item["candidate_promotion_contract"] = promotion_contract
    package = {
        "schema_version": "repair_candidate_draft_package_v1",
        "status": "draft_required",
        "lane": lane,
        "next_step": next_step,
        "matched_playbook_id": playbook.playbook_id if playbook else None,
        "matched_playbook_name": playbook.name if playbook else None,
        "evidence_snapshot_id": evidence_ref,
        "required_fields": required_fields,
        "automation_asset_requirements": automation_asset_requirements,
        "required_writebacks": required_writebacks,
        "coverage_gap": coverage_gap,
        "playbook_draft_template": playbook_draft_template,
        "candidate_promotion_contract": promotion_contract,
        "blocked_operations": blocked_operations,
    }
    if work_item:
        package["awooop_work_item"] = work_item
    return package
def _build_candidate_promotion_contract(
self,
*,
coverage_gap: dict[str, Any],
playbook_draft_template: dict[str, Any],
lane: str,
blockers: list[str],
incident_id: str,
project_id: str,
work_item_id: str,
work_item_url_value: str,
) -> dict[str, Any]:
"""Describe exactly what is needed to promote a draft into an apply gate.
This contract is still fail-closed. It gives AwoooP / Telegram a
machine-readable checklist so a draft does not collapse back into
generic "manual review" text.
"""
route = str(playbook_draft_template.get("suggested_route") or "").strip()
repair_template = str(
playbook_draft_template.get("repair_command_template") or ""
).strip()
rollback_template = str(
playbook_draft_template.get("rollback_command_template") or ""
).strip()
verifier_plan = playbook_draft_template.get("verifier_plan_template")
if not isinstance(verifier_plan, list):
verifier_plan = []
evidence_refs = playbook_draft_template.get("mcp_evidence_refs")
if not isinstance(evidence_refs, list):
evidence_refs = []
alert_selector = playbook_draft_template.get("alert_selector")
if not isinstance(alert_selector, dict):
alert_selector = {}
field_rows = [
self._promotion_contract_field(
field="target_selector",
label="Alert / target selector",
status="ready" if alert_selector.get("target_resource") else "blocked",
source="coverage_gap",
value=coverage_gap.get("coverage_key"),
),
self._promotion_contract_field(
field="mcp_evidence_refs",
label="MCP evidence refs",
status=(
"ready"
if coverage_gap.get("mcp_evidence_ready") and evidence_refs
else "blocked"
),
source="mcp_evidence",
value=evidence_refs[:8],
),
self._promotion_contract_field(
field="route_id",
label="Safe route after owner review",
status="ready" if route.endswith("_after_owner_review") else "blocked",
source="playbook_draft_template",
value=route or "--",
),
self._promotion_contract_field(
field="repair_command_template",
label="Repair command template",
status="ready"
if repair_template
and "<" not in repair_template
and not repair_template.startswith("owner_supplied")
else "blocked",
source="playbook_draft_template",
value=repair_template or "--",
),
self._promotion_contract_field(
field="rollback_command_template",
label="Rollback command template",
status="ready"
if rollback_template
and "<" not in rollback_template
and not rollback_template.startswith("owner_supplied")
else "blocked",
source="playbook_draft_template",
value=rollback_template or "--",
),
self._promotion_contract_field(
field="verifier_plan",
label="Post-apply verifier plan",
status="ready" if verifier_plan else "blocked",
source="playbook_draft_template",
value=verifier_plan[:8],
),
self._promotion_contract_field(
field="owner_review",
label="Owner review release",
status="blocked",
source="owner_response",
value="required_before_apply_gate",
),
self._promotion_contract_field(
field="maintenance_window",
label="Maintenance window",
status="blocked",
source="owner_response",
value="required_before_runtime_write",
),
self._promotion_contract_field(
field="blast_radius",
label="Blast radius",
status="blocked",
source="owner_response",
value="required_before_runtime_write",
),
self._promotion_contract_field(
field="km_writeback_owner",
label="KM writeback owner",
status="blocked",
source="owner_response",
value="required_before_closure",
),
self._promotion_contract_field(
field="playbook_trust_owner",
label="PlayBook trust owner",
status="blocked",
source="owner_response",
value="required_before_trust_raise",
),
]
ready_fields = [row["field"] for row in field_rows if row["status"] == "ready"]
blocked_fields = [row["field"] for row in field_rows if row["status"] != "ready"]
status = (
"owner_review_ready_runtime_blocked"
if {
"target_selector",
"mcp_evidence_refs",
"route_id",
"repair_command_template",
} <= set(ready_fields)
else "blocked_missing_candidate_inputs"
)
return {
"schema_version": "repair_candidate_promotion_contract_v1",
"status": status,
"lane": lane,
"incident_id": incident_id or None,
"project_id": project_id,
"source_work_item_id": work_item_id or None,
"source_work_item_url": work_item_url_value or None,
"route_id": route or "--",
"repair_command_template": repair_template or "--",
"rollback_command_template": rollback_template or "--",
"verifier_plan_template": list(verifier_plan),
"ready_count": len(ready_fields),
"total_count": len(field_rows),
"blocked_count": len(blocked_fields),
"ready_fields": ready_fields,
"blocked_fields": blocked_fields,
"fields": field_rows,
"blockers": list(dict.fromkeys(blockers)),
"runtime_write_allowed": False,
"runtime_execution_authorized": False,
"approval_required_before_execution": True,
"owner_review_required": True,
"forbidden_until_promoted": [
"auto_execute",
"systemctl_restart",
"ssh_write",
"ansible_apply",
"telegram_success_message",
"km_writeback",
"playbook_trust_writeback",
],
"next_steps": [
"owner_review_release",
"fill_maintenance_window_and_blast_radius",
"approve_post_apply_verifier",
"assign_km_and_playbook_trust_writeback_owner",
"rerun_repair_candidate_gate_after_owner_release",
],
}
def _promotion_contract_field(
self,
*,
field: str,
label: str,
status: str,
source: str,
value: Any,
) -> dict[str, Any]:
return {
"field": field,
"label": label,
"status": status,
"source": source,
"value": value,
"runtime_execution_authorized": False,
}
def _promotion_summary_for_operator(self, contract: dict[str, Any]) -> str:
route = str(contract.get("route_id") or "--")
ready = int(contract.get("ready_count") or 0)
total = int(contract.get("total_count") or 0)
blocked = int(contract.get("blocked_count") or 0)
status = str(contract.get("status") or "unknown")
runtime_state = (
"controlled"
if contract.get("runtime_execution_authorized") is True
or contract.get("runtime_write_allowed") is True
else "false"
)
return (
f"route={route}; promotion={ready}/{total}; "
f"blocked={blocked}; status={status}; runtime={runtime_state}"
)
def _draft_summary_for_operator(self, template: dict[str, Any]) -> str:
route = str(template.get("suggested_route") or "--")
repair = str(template.get("repair_command_template") or "--")
verifier = template.get("verifier_plan_template") or []
verifier_head = "--"
if isinstance(verifier, list) and verifier:
verifier_head = str(verifier[0])
return f"route={route}; repair_template={repair}; verifier={verifier_head}"
def _next_step_with_draft_summary(self, next_step: str, draft_summary: str) -> str:
if not draft_summary or draft_summary == "route=--; repair_template=--; verifier=--":
return next_step
return f"{next_step} 草案已預填:{draft_summary}"
def _is_owner_review_ready_draft(
self,
*,
draft_package: dict[str, Any],
blockers: list[str],
) -> bool:
"""Return true when the handoff is a concrete owner-review draft.
This still does not make the action executable. It only separates a
useful prefilled draft from a genuinely missing repair candidate.
"""
coverage_gap = draft_package.get("coverage_gap")
template = draft_package.get("playbook_draft_template")
work_item = draft_package.get("awooop_work_item")
if not isinstance(coverage_gap, dict) or not isinstance(template, dict):
return False
if not isinstance(work_item, dict):
return False
if "mcp_evidence_missing" in set(blockers):
return False
if "playbook_command_not_safely_routable" in set(blockers):
return False
if not coverage_gap.get("mcp_evidence_ready"):
return False
repair_template = str(template.get("repair_command_template") or "").strip()
route = str(template.get("suggested_route") or "").strip()
if not repair_template or not route:
return False
if "<" in repair_template:
return False
if repair_template.startswith("owner_supplied"):
return False
if repair_template.startswith("service-specific repair"):
return False
return route.endswith("_after_owner_review")
def _build_playbook_draft_template(
self,
*,
coverage_gap: dict[str, Any],
blockers: list[str],
lane: str,
alertname: str,
target_resource: str,
namespace: str,
evidence: EvidenceSnapshot | None,
playbook: Playbook | None,
) -> dict[str, Any]:
"""Prefill a service-specific PlayBook draft without authorizing execution."""
target_kind = str(coverage_gap.get("target_kind") or "unknown")
target = (target_resource or "").strip()
ns = (namespace or "").strip()
command_template = "owner_supplied_service_specific_command_after_evidence_review"
rollback_template = "owner_supplied_rollback_or_safe_stop_after_evidence_review"
route = "owner_review_required"
verifier_plan = [
"rerun_mcp_evidence_collection",
"confirm_fingerprint_recurrence_stops_or_decreases",
"write_execution_result_and_verifier_outcome",
"update_km_and_playbook_trust_after_owner_review",
"record_script_or_ansible_asset_ref",
"record_schedule_or_monitoring_rule_gap",
"publish_assets_to_runs_work_items_and_knowledge_base",
]
if target_kind == "k8s_workload":
workload = target.replace("deployment/", "").replace("pod/", "").replace("svc/", "")
workload_ref = workload or "<deployment>"
namespace_ref = ns or "<namespace>"
command_template = (
f"kubectl rollout restart deployment/{workload_ref} -n {namespace_ref}"
)
rollback_template = (
f"kubectl rollout undo deployment/{workload_ref} -n {namespace_ref}"
)
route = "k8s_rollout_after_owner_review"
verifier_plan.extend([
f"kubectl rollout status deployment/{workload_ref} -n {namespace_ref}",
f"kubectl get events -n {namespace_ref} --field-selector involvedObject.name={workload_ref}",
])
elif target_kind == "host_service":
service_ref = target or "<service>"
command_template = f"systemctl restart {service_ref}"
rollback_template = f"systemctl status {service_ref}; journalctl -u {service_ref} -n 120"
route = "host_service_route_after_owner_review"
verifier_plan.extend([
f"systemctl is-active {service_ref}",
f"journalctl -u {service_ref} -n 120 --no-pager",
])
elif target_kind == "database":
db_ref = target or "postgres"
command_template = (
"owner_supplied_query_or_index_fix_after_readonly_evidence_review"
)
rollback_template = (
"owner_supplied_db_rollback_plan_with_restore_and_no_false_green_check"
)
route = "database_slow_query_owner_review"
verifier_plan.extend([
"read_only_pg_stat_activity_and_pg_locks_before_after",
"read_only_pg_stat_statements_p95_or_top_query_delta",
"confirm_api_latency_and_error_rate_recover",
"confirm_no_new_lock_wait_or_connection_pool_saturation",
f"confirm target database surface remains stable for {db_ref}",
])
elif target_kind == "service":
service_ref = target or "<service>"
command_template = f"service-specific repair for {service_ref}"
rollback_template = f"service-specific rollback for {service_ref}"
route = "service_specific_route_after_owner_review"
verifier_plan.append(f"confirm service health for {service_ref}")
owner_review_checklist = [
"確認 alertname 與 target selector 沒有誤配",
"確認 MCP evidence refs 足以支持修復假設",
"確認 repair command 不是純診斷命令或通用兜底命令",
"確認 rollback command 能在失敗時安全收斂",
"確認 verifier plan 能判斷成功、失敗與是否升級人工",
"確認 blast radius、maintenance window 與 owner 已填寫",
"確認 runtime_execution_authorized 仍為 false直到正式 approval gate 通過",
]
return {
"schema_version": "service_specific_playbook_draft_template_v1",
"status": "prefilled_owner_review_required",
"lane": lane,
"coverage_key": coverage_gap.get("coverage_key"),
"blocking_stage": coverage_gap.get("blocking_stage"),
"blockers": list(dict.fromkeys(blockers)),
"alert_selector": {
"alertname": alertname or None,
"namespace": ns or None,
"target_resource": target or None,
"target_kind": target_kind,
},
"mcp_evidence_refs": coverage_gap.get("required_mcp_evidence_refs") or [],
"matched_playbook": {
"playbook_id": playbook.playbook_id if playbook else None,
"name": playbook.name if playbook else None,
"status": playbook.status.value if playbook else None,
"trust_score": float(playbook.trust_score) if playbook else None,
},
"suggested_route": route,
"repair_command_template": command_template,
"rollback_command_template": rollback_template,
"verifier_plan_template": list(dict.fromkeys(verifier_plan)),
"owner_review_checklist": owner_review_checklist,
"evidence_snapshot_id": evidence.snapshot_id if evidence else None,
"template_is_executable": False,
"approval_required_before_execution": True,
"runtime_execution_authorized": False,
"writes_runtime_state": False,
"telegram_send_authorized": False,
}
def _build_coverage_gap(
    self,
    *,
    blockers: list[str],
    lane: str,
    alertname: str,
    target_resource: str,
    namespace: str,
    evidence: EvidenceSnapshot | None,
    playbook: Playbook | None,
) -> dict[str, Any]:
    """Summarise why the alert has no executable PlayBook coverage yet.

    Args:
        blockers: Blocker codes collected for this candidate.
        lane: Reported back unchanged as ``next_owner_lane``.
        alertname: Alert name; feeds the coverage key and the payload.
        target_resource: Alert target; feeds the coverage key, the target
            kind inference and the payload.
        namespace: Namespace passed to the target kind inference.
        evidence: Evidence snapshot, or ``None`` when none was collected.
        playbook: Matched PlayBook, or ``None`` when nothing matched.

    Returns:
        A ``repair_candidate_coverage_gap_v1`` dict. It always reports
        ``runtime_execution_authorized`` and ``writes_runtime_state`` as
        ``False``.
    """
    active_blockers = set(blockers)
    coverage_key = self._coverage_key(alertname=alertname, target_resource=target_resource)
    target_kind = self._infer_target_kind(target_resource=target_resource, namespace=namespace)

    # Ordered by priority: the first stage with any active trigger wins.
    stage_rules = (
        ("mcp_evidence", {"mcp_evidence_missing"}),
        (
            "service_playbook_coverage",
            {
                "playbook_not_matched",
                "playbook_not_found",
                "playbook_generic_fallback_not_repair",
                "playbook_observe_only",
                "playbook_has_no_repair_steps",
                "playbook_has_no_executable_step",
            },
        ),
        ("playbook_trust_gate", {"playbook_not_approved", "playbook_trust_below_gate"}),
        ("safe_execution_route", {"playbook_command_not_safely_routable"}),
    )
    blocking_stage = next(
        (
            stage
            for stage, triggers in stage_rules
            if not triggers.isdisjoint(active_blockers)
        ),
        "owner_review",
    )

    # Evidence refs that are only required for a particular target kind.
    refs_by_kind = {
        "k8s_workload": ["k8s_events", "rollout_status"],
        "host_service": ["systemd_or_container_status", "host_resource_window"],
        "database": [
            "postgres_readonly_activity",
            "postgres_lock_waits",
            "pg_stat_statements_top_queries",
            "connection_pool_window",
            "recent_deploy_or_migration_refs",
            "backup_freshness_before_write_candidate",
        ],
    }
    required_refs = [
        "alertmanager_event",
        "incident_timeline",
        "mcp_health_snapshot",
        "k8s_or_host_status",
        "metrics_or_logs_window",
        "recurrence_fingerprint",
        *refs_by_kind.get(target_kind, []),
    ]

    # Database targets get additional explicitly blocked operations.
    if target_kind == "database":
        database_blocks = [
            "terminate_backend_without_owner_review",
            "postgres_restart_without_maintenance_window",
            "migration_or_reindex_without_rollback",
        ]
    else:
        database_blocks = []

    return {
        "schema_version": "repair_candidate_coverage_gap_v1",
        "coverage_key": coverage_key,
        "target_kind": target_kind,
        "blocking_stage": blocking_stage,
        "next_owner_lane": lane,
        "alertname": alertname or None,
        "namespace": namespace or None,
        "target_resource": target_resource or None,
        "matched_playbook_id": playbook.playbook_id if playbook else None,
        "evidence_snapshot_id": evidence.snapshot_id if evidence else None,
        "mcp_evidence_ready": bool(evidence and evidence.sensors_succeeded > 0),
        "required_mcp_evidence_refs": list(dict.fromkeys(required_refs)),
        "playbook_template_required": True,
        "playbook_template_fields": [
            "symptom_pattern.alert_names",
            "symptom_pattern.affected_services",
            "mcp_evidence_refs",
            "repair_steps.command_or_ansible_ref",
            "repair_steps.rollback_command",
            "verifier_plan",
            "owner_review_record",
            "trust_score_update_policy",
            "script_or_ansible_ref",
            "schedule_or_alert_rule_ref",
            "automation_asset_record",
            "dashboard_visibility_refs",
        ],
        "blocked_operations": [
            "auto_execute",
            "mark_repaired_without_execution",
            "approve_no_action_as_repair",
            *database_blocks,
        ],
        "runtime_execution_authorized": False,
        "writes_runtime_state": False,
    }
def _coverage_key(self, *, alertname: str, target_resource: str) -> str:
raw_alert = (alertname or "unknown-alert").strip().lower()
raw_target = (target_resource or "unknown-target").strip().lower()
safe_alert = "-".join(raw_alert.replace("_", "-").split())
safe_target = "-".join(raw_target.replace("_", "-").split())
return f"{safe_alert}:{safe_target}"
def _infer_target_kind(self, *, target_resource: str, namespace: str) -> str:
target = (target_resource or "").lower()
if any(marker in target for marker in ("postgres", "postgresql", "pgbouncer", "database", "db-")):
return "database"
if any(marker in target for marker in ("node-exporter", "host", "188", "111", "168")):
return "host_service"
if namespace or any(marker in target for marker in ("deployment/", "pod/", "svc/", "api")):
return "k8s_workload"
if target:
return "service"
return "unknown"
def _build_description(
self,
*,
message: str,
playbook: Playbook,
step_command: str,
evidence: EvidenceSnapshot | None,
verifier_plan: list[str],
) -> str:
evidence_line = "MCP evidence missing"
if evidence:
evidence_line = (
f"MCP evidence {evidence.sensors_succeeded}/{evidence.sensors_attempted}; "
f"snapshot={evidence.snapshot_id}"
)
verifier_text = "; ".join(verifier_plan)
return (
"LLM fallback 後由 MCP evidence + PlayBook trust 產生修復候選。\n"
f"原始告警:{message[:500]}\n"
f"PlayBook{playbook.playbook_id} / {playbook.name} / trust={playbook.trust_score:.2f}\n"
f"證據:{evidence_line}\n"
f"候選命令:{step_command}\n"
f"Verifier plan{verifier_text}\n"
"注意:這只是 approval candidate未經批准不得執行。"
)
def _build_dry_run_checks(
    self,
    *,
    evidence: EvidenceSnapshot | None,
    playbook: Playbook,
    command: str,
) -> list[DryRunCheck]:
    """Build the four dry-run checks attached to an approval candidate.

    Args:
        evidence: Evidence snapshot; a falsy value counts as 0/0 sensors.
        playbook: PlayBook whose status and trust score are checked.
        command: Candidate command; its first 240 characters become the
            message of the command safety check.

    Returns:
        Checks in fixed order: evidence collected, PlayBook approved and
        trusted, command safety gate, post-execution verifier planned. The
        last two are always recorded as passed by this method.
    """
    if evidence:
        attempted = evidence.sensors_attempted
        succeeded = evidence.sensors_succeeded
    else:
        attempted = succeeded = 0

    evidence_check = DryRunCheck(
        name="MCP evidence collected",
        passed=succeeded > 0,
        message=f"{succeeded}/{attempted} sensors succeeded",
    )

    approved_and_trusted = (
        playbook.status == PlaybookStatus.APPROVED
        and float(playbook.trust_score) >= MIN_REPAIR_CANDIDATE_TRUST
    )
    trust_check = DryRunCheck(
        name="PlayBook approved and trusted",
        passed=approved_and_trusted,
        message=f"{playbook.playbook_id} trust={playbook.trust_score:.2f}",
    )

    safety_check = DryRunCheck(
        name="Command safety gate",
        passed=True,
        message=command[:240],
    )
    verifier_check = DryRunCheck(
        name="Post execution verifier planned",
        passed=True,
        message="verify health/evidence after execution before closing incident",
    )
    return [evidence_check, trust_check, safety_check, verifier_check]
def _build_verifier_plan(
self,
*,
command: str,
namespace: str,
target_resource: str,
evidence: EvidenceSnapshot | None,
) -> list[str]:
plan = [
"rerun_pre_decision_evidence_after_execution",
"compare_mcp_sensor_success_before_after",
]
if command.startswith("kubectl"):
plan.append(f"kubectl rollout status/get events in {namespace or 'awoooi-prod'}")
if command.startswith("ssh "):
plan.append("ssh_mcp_readonly_health_check_after_write_route")
if target_resource:
plan.append(f"confirm target_resource={target_resource} alert stops recurring")
if evidence and evidence.snapshot_id:
plan.append(f"baseline_snapshot={evidence.snapshot_id}")
return plan
def _approval_risk(self, risk: PlaybookRiskLevel) -> ApprovalRiskLevel:
    """Translate a PlayBook risk level into an approval risk level.

    The lower-cased ``risk.value`` is looked up in ``ApprovalRiskLevel``.
    Any failure during that lookup yields ``MEDIUM``, and a ``LOW`` result
    is raised to ``MEDIUM``, so this method never returns ``LOW``.
    """
    try:
        level = ApprovalRiskLevel(risk.value.lower())
    except Exception:
        # Best-effort mapping: anything that cannot be translated is MEDIUM.
        return ApprovalRiskLevel.MEDIUM
    return ApprovalRiskLevel.MEDIUM if level == ApprovalRiskLevel.LOW else level
# Module-level cache for the shared service; filled on first access.
_repair_candidate_service: RepairCandidateService | None = None


def get_repair_candidate_service() -> RepairCandidateService:
    """Return the shared ``RepairCandidateService``, creating it on first use."""
    global _repair_candidate_service
    if _repair_candidate_service is not None:
        return _repair_candidate_service
    _repair_candidate_service = RepairCandidateService()
    return _repair_candidate_service