diff --git a/apps/api/src/services/auto_repair_service.py b/apps/api/src/services/auto_repair_service.py index d4ddec4d..35dfa9bc 100644 --- a/apps/api/src/services/auto_repair_service.py +++ b/apps/api/src/services/auto_repair_service.py @@ -468,6 +468,23 @@ class AutoRepairService: blocked_by="HOST_BACKUP_K8S_PLAYBOOK", ) + if not self._playbook_has_mutating_steps(best_match.playbook): + logger.warning( + "auto_repair_blocked_observe_only_playbook", + incident_id=incident.incident_id, + playbook_id=best_match.playbook.playbook_id, + ) + return AutoRepairDecision( + can_auto_repair=False, + playbook=best_match.playbook, + reason=( + "PlayBook 只有診斷 / 觀測步驟,不能宣稱自動修復;" + "請轉成 Work Item 補真正修復步驟或 gated Ansible apply" + ), + risk_level=max_risk or RiskLevel.MEDIUM, + blocked_by="OBSERVE_ONLY_PLAYBOOK", + ) + # 5. 可以自動修復 logger.info( "auto_repair_approved", @@ -936,6 +953,33 @@ class AutoRepairService: return True return False + def _playbook_has_mutating_steps(self, playbook: Playbook) -> bool: + """Return true only when a PlayBook contains an actual repair action.""" + + for step in playbook.repair_steps: + command = (step.command or "").strip().lower() + if not command or step.action_type == ActionType.MANUAL: + continue + if step.action_type == ActionType.KUBECTL or command.startswith("kubectl "): + if not command.startswith(( + "kubectl get ", + "kubectl describe ", + "kubectl logs ", + "kubectl top ", + "kubectl explain ", + )): + return True + continue + if step.action_type == ActionType.SSH_COMMAND: + if command.startswith(("openclaw://", "ansible://")): + return True + if any(token in command for token in _SSH_WRITE_KEYWORDS): + return True + continue + if step.action_type == ActionType.SCRIPT: + return True + return False + def _playbook_is_k3s_node_repair(self, playbook: Playbook) -> bool: """K3s node repair must only run for actual K3s node alerts.""" diff --git a/apps/api/tests/test_auto_repair_service.py b/apps/api/tests/test_auto_repair_service.py index 
@pytest.mark.asyncio
async def test_docker_restart_ssh_playbook_remains_auto_repair_candidate(
    self,
    service,
    mock_playbook_service,
):
    """A Docker restart over SSH is a real mutation and must stay auto-repairable.

    Registers an approved PlayBook whose single step is an SSH
    ``docker restart`` command, recommends it for a matching
    ``DockerContainerUnhealthy`` incident, and expects the service to
    approve auto-repair with no blocking reason.
    """
    alert = "DockerContainerUnhealthy"
    container = "stockplatform-v2-edge-1"

    restart_step = RepairStep(
        step_number=1,
        action_type=ActionType.SSH_COMMAND,
        command="ssh {host} 'docker restart {container}'",
        risk_level=RiskLevel.MEDIUM,
    )
    restart_playbook = Playbook(
        playbook_id="PB-DOCKER-RESTART",
        name="Docker health restart",
        description="Restart unhealthy Docker container",
        status=PlaybookStatus.APPROVED,
        symptom_pattern=SymptomPattern(
            alert_names=[alert],
            affected_services=[container],
        ),
        repair_steps=[restart_step],
        success_count=20,
        failure_count=1,
    )
    mock_playbook_service.add_playbook(restart_playbook)
    mock_playbook_service.set_recommendations(
        [MockPlaybookRecommendation(restart_playbook, similarity_score=0.85)]
    )

    # Build an incident whose alert, service and labels match the PlayBook.
    incident = create_test_incident(
        severity=Severity.P2,
        alert_category="infrastructure",
        alert_name=alert,
    )
    incident.affected_services = [container]
    incident.signals[0].labels.update(
        {
            "host": "110",
            "container_name": container,
        }
    )

    decision = await service.evaluate_auto_repair(incident)

    assert decision.can_auto_repair is True
    assert decision.blocked_by is None