Files
awoooi/apps/api/src/services/repair_candidate_service.py
Your Name febe9ecfcd
All checks were successful
CD Pipeline / tests (push) Successful in 1m28s
Code Review / ai-code-review (push) Successful in 14s
CD Pipeline / build-and-deploy (push) Successful in 4m0s
CD Pipeline / post-deploy-checks (push) Successful in 1m48s
fix(api): 補修復候選人工草案包
2026-06-11 18:24:16 +08:00

568 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Repair candidate generation from MCP evidence and PlayBook trust.
This service is intentionally candidate-only. It never executes a repair; it
builds an ApprovalRequestCreate only when evidence, PlayBook status, trust and
command safety gates are all explicit enough for a human approval flow.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
import structlog
from src.models.approval import (
ApprovalRequestCreate,
BlastRadius,
DataImpact,
DryRunCheck,
)
from src.models.approval import RiskLevel as ApprovalRiskLevel
from src.models.incident import Incident
from src.models.playbook import (
ActionType,
Playbook,
PlaybookStatus,
)
from src.models.playbook import RiskLevel as PlaybookRiskLevel
from src.repositories.playbook_repository import get_playbook_repository
from src.services.action_parser import ActionKind, parse_kubectl_action
from src.services.auto_repair_service import AutoRepairService
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.incident_service import get_incident_service
from src.services.playbook_match_resolver import resolve_playbook_id_for_alert
from src.services.pre_decision_investigator import get_pre_decision_investigator
# Module-level structured logger for repair-candidate events.
logger = structlog.get_logger(__name__)
# Trust gate: a PlayBook whose trust_score is below this value gets the
# "playbook_trust_below_gate" blocker and cannot become a repair candidate.
MIN_REPAIR_CANDIDATE_TRUST = 0.30
@dataclass
class RepairCandidateResult:
    """Outcome of one candidate-generation attempt.

    Consumed by the webhook and Telegram handoff paths. A result either
    carries an ``approval_request`` (a candidate was produced) or a list of
    ``blockers`` explaining why none could be built.
    """

    # Approval request to submit for human review; None when blocked.
    approval_request: ApprovalRequestCreate | None = None
    # Evidence snapshot gathered for the incident, if collection succeeded.
    evidence: EvidenceSnapshot | None = None
    # PlayBook that was resolved for the alert, if any.
    playbook: Playbook | None = None
    # Machine-readable blocker codes (empty when a candidate was found).
    blockers: list[str] = field(default_factory=list)
    # Free-form metadata attached to the approval request / handoff payload.
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def candidate_found(self) -> bool:
        """Return True when an approval request was produced."""
        has_request = self.approval_request is not None
        return has_request
class RepairCandidateService:
"""Build a repair approval candidate without executing it."""
def __init__(
self,
*,
incident_service: Any | None = None,
investigator: Any | None = None,
playbook_repository: Any | None = None,
auto_repair_service: AutoRepairService | None = None,
) -> None:
self._incident_service = incident_service or get_incident_service()
self._investigator = investigator or get_pre_decision_investigator()
self._playbook_repository = playbook_repository or get_playbook_repository()
self._auto_repair = auto_repair_service or AutoRepairService()
async def build_from_incident_id(
    self,
    *,
    incident_id: str,
    alertname: str,
    target_resource: str,
    namespace: str,
    message: str,
    fallback_action: str,
    matched_playbook_id: str | None = None,
    rule_id: str | None = None,
    severity: str | None = None,
) -> RepairCandidateResult:
    """Load an incident and produce a repair candidate or explicit blockers.

    The incident is fetched through the incident service's
    ``get_from_working_memory``. When it is missing, the result is built
    through ``_blocked_result`` so that it carries the same blocker list,
    blocker summary, draft package and ``fallback_action`` as every other
    blocked outcome. Otherwise the call is delegated to
    ``build_from_incident`` with the remaining arguments unchanged.

    Returns:
        A ``RepairCandidateResult``; blocked with ``incident_not_found``
        when the incident cannot be loaded.
    """
    incident = await self._incident_service.get_from_working_memory(incident_id)
    if incident is None:
        blockers = ["incident_not_found"]
        return self._blocked_result(
            blockers=blockers,
            metadata={
                **self._base_metadata(evidence=None),
                # Legacy key kept for consumers that read the old payload.
                "blockers": list(blockers),
            },
            fallback_action=fallback_action,
        )
    return await self.build_from_incident(
        incident=incident,
        alertname=alertname,
        target_resource=target_resource,
        namespace=namespace,
        message=message,
        fallback_action=fallback_action,
        matched_playbook_id=matched_playbook_id,
        rule_id=rule_id,
        severity=severity,
    )
async def build_from_incident(
    self,
    *,
    incident: Incident,
    alertname: str,
    target_resource: str,
    namespace: str,
    message: str,
    fallback_action: str,
    matched_playbook_id: str | None = None,
    rule_id: str | None = None,
    severity: str | None = None,
) -> RepairCandidateResult:
    """Build an approval candidate for ``incident`` or report blockers.

    Steps, in order: collect evidence, resolve and load the PlayBook,
    apply the status / trust / generic-fallback gates, pick an executable
    step, and finally assemble the ``ApprovalRequestCreate``. Any blocker
    short-circuits into ``_blocked_result``. Nothing is executed here.
    """
    snapshot = await self._collect_evidence(incident)
    meta = self._base_metadata(evidence=snapshot)
    found_blockers: list[str] = []

    def _blocked(matched: Playbook | None = None) -> RepairCandidateResult:
        # Single exit for every blocked outcome of this method.
        return self._blocked_result(
            blockers=found_blockers,
            metadata=meta,
            evidence=snapshot,
            playbook=matched,
            fallback_action=fallback_action,
        )

    # Evidence gate: at least one sensor must have succeeded.
    if not (snapshot and snapshot.sensors_succeeded > 0):
        found_blockers.append("mcp_evidence_missing")

    # An explicitly matched PlayBook id wins over alert-based resolution.
    resolved_id = matched_playbook_id
    if not resolved_id:
        resolved_id = await resolve_playbook_id_for_alert(
            rule_id=rule_id,
            alertname=alertname,
            affected_services=[target_resource] if target_resource else [],
            severity=severity,
        )
    if not resolved_id:
        found_blockers.append("playbook_not_matched")
        return _blocked()

    playbook = await self._playbook_repository.get_by_id(resolved_id)
    if not playbook:
        found_blockers.append("playbook_not_found")
        return _blocked()

    trust_score = float(playbook.trust_score)
    meta["playbook_trust"] = {
        "playbook_id": playbook.playbook_id,
        "name": playbook.name,
        "status": playbook.status.value,
        "trust_score": trust_score,
    }

    # PlayBook gates; every failing gate is reported, not just the first.
    gate_outcomes = (
        (playbook.status != PlaybookStatus.APPROVED, "playbook_not_approved"),
        (trust_score < MIN_REPAIR_CANDIDATE_TRUST, "playbook_trust_below_gate"),
        (
            self._is_generic_fallback_playbook(playbook),
            "playbook_generic_fallback_not_repair",
        ),
    )
    found_blockers.extend(code for failed, code in gate_outcomes if failed)

    chosen_step, step_blockers = self._select_executable_step(incident, playbook)
    found_blockers.extend(step_blockers)
    if found_blockers or chosen_step is None:
        return _blocked(playbook)

    candidate_command = chosen_step.command
    meta["repair_candidate"] = {
        "source": "mcp_evidence_playbook_trust",
        "playbook_id": playbook.playbook_id,
        "playbook_name": playbook.name,
        "step_number": chosen_step.step_number,
        "command": candidate_command,
        "action_type": chosen_step.action_type.value,
        "risk_level": chosen_step.risk_level.value,
        "expected_result": chosen_step.expected_result,
        "rollback_command": chosen_step.rollback_command,
        "requires_approval": True,
    }
    verifier_plan = self._build_verifier_plan(
        command=candidate_command,
        namespace=namespace,
        target_resource=target_resource,
        evidence=snapshot,
    )
    meta["verifier_plan"] = verifier_plan
    meta["repair_candidate_status"] = "candidate_ready_for_approval"
    meta["fallback_replaced"] = True

    description = self._build_description(
        message=message,
        playbook=playbook,
        step_command=candidate_command,
        evidence=snapshot,
        verifier_plan=verifier_plan,
    )
    blast_radius = BlastRadius(
        affected_pods=1,
        estimated_downtime=f"{playbook.estimated_duration_minutes} min",
        related_services=[target_resource] if target_resource else [],
        data_impact=DataImpact.WRITE,
    )
    dry_run_checks = self._build_dry_run_checks(
        evidence=snapshot,
        playbook=playbook,
        command=candidate_command,
    )
    request = ApprovalRequestCreate(
        action=candidate_command,
        description=description,
        risk_level=self._approval_risk(chosen_step.risk_level),
        blast_radius=blast_radius,
        dry_run_checks=dry_run_checks,
        requested_by="OpenClaw (MCP evidence + PlayBook trust)",
        incident_id=incident.incident_id,
        metadata=meta,
        matched_playbook_id=playbook.playbook_id,
    )

    logger.info(
        "repair_candidate_generated",
        incident_id=incident.incident_id,
        playbook_id=playbook.playbook_id,
        sensors_succeeded=snapshot.sensors_succeeded if snapshot else 0,
        command=candidate_command[:160],
    )
    return RepairCandidateResult(
        approval_request=request,
        evidence=snapshot,
        playbook=playbook,
        metadata=meta,
    )
async def _collect_evidence(self, incident: Incident) -> EvidenceSnapshot | None:
    """Run the investigator for ``incident``; return None on any failure.

    Collection is best-effort: an exception is logged as a warning and
    turned into ``None`` so the caller can report missing evidence.
    """
    try:
        snapshot = await self._investigator.investigate(incident)
    except Exception as exc:
        logger.warning(
            "repair_candidate_evidence_collect_failed",
            incident_id=incident.incident_id,
            error=str(exc),
        )
        return None
    return snapshot
def _select_executable_step(
self,
incident: Incident,
playbook: Playbook,
) -> tuple[Any | None, list[str]]:
"""Pick the first repair step that may be offered for approval.

Returns ``(step, [])`` as soon as a step is accepted, otherwise
``(None, blockers)`` with one or more blocker codes explaining why no
step qualified.

NOTE(review): indentation was lost in this copy of the file, so the exact
nesting of the SSH / diagnostic / ``rejected_unsafe`` tail of the loop
cannot be determined from here -- confirm against the repository before
restructuring this function.
"""
blockers: list[str] = []
# A PlayBook without any repair steps is reported with its own blocker.
if not playbook.repair_steps:
return None, ["playbook_has_no_repair_steps"]
# Track why steps were rejected so the blockers reflect every cause seen.
rejected_readonly = False
rejected_unsafe = False
for step in playbook.repair_steps:
# Steps with an empty or whitespace-only command are skipped silently.
command = (step.command or "").strip()
if not command:
continue
# Manual steps are counted under the observe-only blocker.
if step.action_type == ActionType.MANUAL:
rejected_readonly = True
continue
if step.action_type == ActionType.KUBECTL or command.startswith("kubectl"):
# kubectl commands must parse cleanly and must not be read-only.
parsed = parse_kubectl_action(command)
if not parsed.ok:
rejected_unsafe = True
continue
if parsed.kind == ActionKind.READONLY:
rejected_readonly = True
continue
return step, []
if step.action_type == ActionType.SSH_COMMAND or command.startswith("ssh "):
# SSH commands are accepted when the auto-repair service previews a write route.
if self._auto_repair.preview_write_ssh_mcp_route(incident, command):
return step, []
# Commands containing a diagnostic marker count as observe-only.
if self._looks_like_diagnostic_command(command):
rejected_readonly = True
continue
# NOTE(review): presumably the fall-through for commands not accepted above; nesting unknown.
rejected_unsafe = True
# Translate the rejection flags into blocker codes.
if rejected_unsafe:
blockers.append("playbook_command_not_safely_routable")
if rejected_readonly:
blockers.append("playbook_observe_only")
# No flag set: report the generic "no executable step" blocker.
if not blockers:
blockers.append("playbook_has_no_executable_step")
return None, blockers
def _base_metadata(self, *, evidence: EvidenceSnapshot | None) -> dict[str, Any]:
if not evidence:
return {
"source": "mcp_playbook_repair_candidate",
"mcp_evidence": {"status": "missing"},
}
return {
"source": "mcp_playbook_repair_candidate",
"mcp_evidence": {
"status": "collected" if evidence.sensors_succeeded > 0 else "degraded",
"snapshot_id": evidence.snapshot_id,
"sensors_attempted": evidence.sensors_attempted,
"sensors_succeeded": evidence.sensors_succeeded,
"mcp_health": evidence.mcp_health,
"summary_excerpt": (evidence.evidence_summary or "")[:800],
},
}
def _blocked_result(
    self,
    *,
    blockers: list[str],
    metadata: dict[str, Any],
    fallback_action: str,
    evidence: EvidenceSnapshot | None = None,
    playbook: Playbook | None = None,
) -> RepairCandidateResult:
    """Finalize ``metadata`` for a blocked outcome and wrap it in a result.

    ``metadata`` is updated in place. Blockers are de-duplicated while
    keeping first-seen order, and the same list object is exposed both in
    the metadata and on the returned result.
    """
    unique_blockers = list(dict.fromkeys(blockers))
    summary = self._humanize_blockers(unique_blockers)
    package = self._build_draft_package(
        blockers=unique_blockers,
        playbook=playbook,
        evidence=evidence,
    )
    metadata.update(
        {
            "repair_candidate_status": "blocked",
            "repair_candidate_blockers": unique_blockers,
            "repair_candidate_blocker_summary": summary,
            "playbook_draft_required": True,
            "repair_candidate_next_step": package["next_step"],
            "repair_candidate_draft_package": package,
            "fallback_action": fallback_action,
        }
    )
    return RepairCandidateResult(
        evidence=evidence,
        playbook=playbook,
        blockers=unique_blockers,
        metadata=metadata,
    )
def _is_generic_fallback_playbook(self, playbook: Playbook) -> bool:
alert_names = playbook.symptom_pattern.alert_names or []
if "*" in alert_names:
return True
return "通用兜底" in playbook.name or "generic fallback" in playbook.name.lower()
def _looks_like_diagnostic_command(self, command: str) -> bool:
command = command.lower()
diagnostic_markers = (
"uptime",
"ps aux",
"docker stats",
"journalctl",
"systemctl status",
"kubectl get",
"kubectl describe",
"tail ",
"grep ",
)
return any(marker in command for marker in diagnostic_markers)
def _humanize_blockers(self, blockers: list[str]) -> str:
labels = {
"incident_not_found": "找不到 incident無法綁定真相鏈",
"mcp_evidence_missing": "MCP 證據未完成",
"playbook_not_matched": "沒有命中專屬 PlayBook",
"playbook_not_found": "命中的 PlayBook 不存在",
"playbook_not_approved": "PlayBook 尚未批准",
"playbook_trust_below_gate": "PlayBook trust score 低於候選門檻",
"playbook_generic_fallback_not_repair": "只命中通用兜底 PlayBook禁止當成修復命令",
"playbook_has_no_repair_steps": "PlayBook 沒有修復步驟",
"playbook_command_not_safely_routable": "PlayBook 命令未通過安全路由",
"playbook_observe_only": "PlayBook 只有觀察或診斷步驟",
"playbook_has_no_executable_step": "PlayBook 沒有可執行修復步驟",
}
return "".join(labels.get(blocker, blocker) for blocker in blockers)
def _build_draft_package(
self,
*,
blockers: list[str],
playbook: Playbook | None,
evidence: EvidenceSnapshot | None,
) -> dict[str, Any]:
"""Describe the concrete owner-review package needed to unblock repair.
The package is a handoff contract only. It must not be interpreted as
approval to mutate runtime state or auto-create an approved PlayBook.
"""
blocker_set = set(blockers)
if "incident_not_found" in blocker_set:
lane = "restore_truth_chain_before_repair"
next_step = "先修復 incident / approval 真相鏈綁定,再重跑 MCP evidence 與 PlayBook 匹配。"
elif "mcp_evidence_missing" in blocker_set:
lane = "rerun_mcp_evidence_collection"
next_step = (
"先按重診收集 MCP evidence成功後再建立服務專屬 PlayBook 草案,"
"禁止只憑通用規則批准修復。"
)
elif {
"playbook_not_matched",
"playbook_not_found",
"playbook_generic_fallback_not_repair",
} & blocker_set:
lane = "create_service_specific_repair_playbook"
next_step = (
"建立專屬 PlayBook 草案:綁定 alertname / target selector補 MCP evidence refs、"
"修復命令、rollback、verifier plan 與 owner review通用兜底不可執行。"
)
elif "playbook_observe_only" in blocker_set:
lane = "promote_diagnostic_to_repair_playbook"
next_step = (
"把診斷命令保留為 MCP evidence collector另建獨立修復步驟、rollback "
"與 verifier經 owner review 後才可進入批准。"
)
elif "playbook_command_not_safely_routable" in blocker_set:
lane = "route_command_through_safe_mcp_or_ansible"
next_step = (
"將命令改走 allowlisted MCP / Ansible route補 blast radius、rollback "
"與 verifier plan再送 owner review。"
)
elif {
"playbook_not_approved",
"playbook_trust_below_gate",
} & blocker_set:
lane = "owner_review_playbook_trust_gate"
next_step = (
"由 owner review PlayBook 狀態與 trust score補成功/失敗證據後才可進入修復候選。"
)
else:
lane = "repair_candidate_owner_review"
next_step = (
"建立人工處置包並補 PlayBook 草案欄位;完成 owner review 後再重跑候選生成。"
)
evidence_ref = None
if evidence and evidence.snapshot_id:
evidence_ref = evidence.snapshot_id
return {
"schema_version": "repair_candidate_draft_package_v1",
"status": "draft_required",
"lane": lane,
"next_step": next_step,
"matched_playbook_id": playbook.playbook_id if playbook else None,
"matched_playbook_name": playbook.name if playbook else None,
"evidence_snapshot_id": evidence_ref,
"required_fields": [
"alertname",
"target_selector",
"mcp_evidence_refs",
"repair_command",
"rollback_command",
"verifier_plan",
"owner_review",
],
"blocked_operations": [
"auto_execute",
"approve_no_action_as_repair",
"generic_fallback_repair",
],
}
def _build_description(
self,
*,
message: str,
playbook: Playbook,
step_command: str,
evidence: EvidenceSnapshot | None,
verifier_plan: list[str],
) -> str:
evidence_line = "MCP evidence missing"
if evidence:
evidence_line = (
f"MCP evidence {evidence.sensors_succeeded}/{evidence.sensors_attempted}; "
f"snapshot={evidence.snapshot_id}"
)
verifier_text = "; ".join(verifier_plan)
return (
"LLM fallback 後由 MCP evidence + PlayBook trust 產生修復候選。\n"
f"原始告警:{message[:500]}\n"
f"PlayBook{playbook.playbook_id} / {playbook.name} / trust={playbook.trust_score:.2f}\n"
f"證據:{evidence_line}\n"
f"候選命令:{step_command}\n"
f"Verifier plan{verifier_text}\n"
"注意:這只是 approval candidate未經批准不得執行。"
)
def _build_dry_run_checks(
    self,
    *,
    evidence: EvidenceSnapshot | None,
    playbook: Playbook,
    command: str,
) -> list[DryRunCheck]:
    """Return the four dry-run checks attached to an approval request.

    The evidence and PlayBook checks are computed from their inputs; the
    command-safety and verifier checks are always recorded as passed.
    """
    if evidence:
        attempted = evidence.sensors_attempted
        succeeded = evidence.sensors_succeeded
    else:
        attempted = succeeded = 0

    evidence_check = DryRunCheck(
        name="MCP evidence collected",
        passed=succeeded > 0,
        message=f"{succeeded}/{attempted} sensors succeeded",
    )
    playbook_check = DryRunCheck(
        name="PlayBook approved and trusted",
        passed=(
            playbook.status == PlaybookStatus.APPROVED
            and float(playbook.trust_score) >= MIN_REPAIR_CANDIDATE_TRUST
        ),
        message=f"{playbook.playbook_id} trust={playbook.trust_score:.2f}",
    )
    # The command text is capped at 240 characters in the check message.
    command_check = DryRunCheck(
        name="Command safety gate",
        passed=True,
        message=command[:240],
    )
    verifier_check = DryRunCheck(
        name="Post execution verifier planned",
        passed=True,
        message="verify health/evidence after execution before closing incident",
    )
    return [evidence_check, playbook_check, command_check, verifier_check]
def _build_verifier_plan(
self,
*,
command: str,
namespace: str,
target_resource: str,
evidence: EvidenceSnapshot | None,
) -> list[str]:
plan = [
"rerun_pre_decision_evidence_after_execution",
"compare_mcp_sensor_success_before_after",
]
if command.startswith("kubectl"):
plan.append(f"kubectl rollout status/get events in {namespace or 'awoooi-prod'}")
if command.startswith("ssh "):
plan.append("ssh_mcp_readonly_health_check_after_write_route")
if target_resource:
plan.append(f"confirm target_resource={target_resource} alert stops recurring")
if evidence and evidence.snapshot_id:
plan.append(f"baseline_snapshot={evidence.snapshot_id}")
return plan
def _approval_risk(self, risk: PlaybookRiskLevel) -> ApprovalRiskLevel:
    """Map a PlayBook risk level onto the approval risk scale.

    The mapping goes through the lower-cased enum value. Anything that
    cannot be mapped becomes MEDIUM, and LOW is raised to MEDIUM, so this
    method never returns LOW.
    """
    try:
        level = ApprovalRiskLevel(risk.value.lower())
    except Exception:
        # Unmappable risk values fall back to MEDIUM.
        level = ApprovalRiskLevel.MEDIUM
    return ApprovalRiskLevel.MEDIUM if level == ApprovalRiskLevel.LOW else level
# Lazily created process-wide instance; populated on first access.
_repair_candidate_service: RepairCandidateService | None = None


def get_repair_candidate_service() -> RepairCandidateService:
    """Return the shared ``RepairCandidateService``, creating it on first use."""
    global _repair_candidate_service
    instance = _repair_candidate_service
    if instance is None:
        instance = RepairCandidateService()
        _repair_candidate_service = instance
    return instance