fix(auto-repair): prefer exact playbooks and fail failed steps
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m3s
CD Pipeline / build-and-deploy (push) Successful in 3m31s
CD Pipeline / post-deploy-checks (push) Successful in 1m32s

This commit is contained in:
Your Name
2026-05-13 23:21:16 +08:00
parent ae643552e9
commit 7a8cbb3241
4 changed files with 155 additions and 11 deletions

View File

@@ -321,7 +321,16 @@ class AutoRepairService:
)
# 4. 檢查最佳匹配
best_match = recommendations[0]
best_match = self._select_best_recommendation(recommendations, symptoms)
if best_match is not recommendations[0]:
logger.warning(
"auto_repair_exact_match_prioritized",
incident_id=incident.incident_id,
selected_playbook_id=best_match.playbook.playbook_id,
original_playbook_id=recommendations[0].playbook.playbook_id,
selected_similarity=best_match.similarity_score,
original_similarity=recommendations[0].similarity_score,
)
# 2026-04-07 Claude Code: 統帥指令「直接全部跳成自動修復」
# 移除: 相似度門檻、is_high_quality 門檻、冷啟動機制、風險等級門檻
@@ -416,6 +425,8 @@ class AutoRepairService:
executed_steps.append(
f"Step {step.step_number}: {step.command[:50]}... -> {step_result}"
)
if self._is_step_failure_result(step_result):
raise RuntimeError(f"Step {step.step_number} failed: {step_result}")
# 更新 Playbook 統計
await self._playbook_service.record_execution(
@@ -697,6 +708,44 @@ class AutoRepairService:
keywords=keywords[:10],
)
def _select_best_recommendation(
self,
recommendations,
symptoms: SymptomPattern,
):
"""Prefer deterministic alert/service matches over fuzzy similarity only.
A higher fuzzy score must not outrank a playbook that explicitly names the
firing alert or affected service. Live-fire T16 proved that this can route
a safe K8s canary into an unrelated host diagnostic playbook.
"""
symptom_alerts = {str(name) for name in (symptoms.alert_names or []) if name}
symptom_services = {
str(service) for service in (symptoms.affected_services or []) if service
}
def _priority(recommendation) -> tuple[int, int, float]:
pattern = recommendation.playbook.symptom_pattern
playbook_alerts = {
str(name) for name in (pattern.alert_names or []) if name
}
playbook_services = {
str(service) for service in (pattern.affected_services or []) if service
}
alert_exact = int(bool(symptom_alerts & playbook_alerts))
service_exact = int(bool(symptom_services & playbook_services))
return (alert_exact, service_exact, float(recommendation.similarity_score or 0.0))
return max(recommendations, key=_priority)
@staticmethod
def _is_step_failure_result(step_result: str) -> bool:
"""Treat executor-declared failures as failed auto-repair executions."""
normalized = (step_result or "").strip().upper()
return normalized.startswith("FAILED:") or normalized == "UNKNOWN_ACTION_TYPE"
def _get_max_risk_level(self, playbook: Playbook) -> RiskLevel:
"""取得 Playbook 中最高的風險等級"""
risk_order = {

View File

@@ -277,6 +277,49 @@ class TestAutoRepairService:
assert decision.playbook.playbook_id == playbook.playbook_id
assert decision.blocked_by is None
@pytest.mark.asyncio
async def test_exact_alert_match_wins_over_higher_fuzzy_similarity(
    self,
    service,
    mock_playbook_service,
):
    """A playbook naming the alert/service beats a higher-scoring fuzzy one.

    Two low-risk playbooks are recommended: an unrelated host playbook with
    similarity 0.95 listed first, and a canary playbook with similarity 0.45
    whose pattern names the incident's alert and service. The decision must
    pick the canary playbook.
    """
    canary_alert = "AwoooPAutoRepairCanaryT16"
    canary_service = "awoooi-auto-repair-canary"

    # Unrelated host playbook: scores highest on fuzzy similarity only.
    host_playbook = create_high_quality_playbook(
        playbook_id="PB-FUZZY-HOST",
        risk_level=RiskLevel.LOW,
    )
    host_playbook.symptom_pattern = SymptomPattern(
        alert_names=["HostCPUHigh"],
        affected_services=["node-exporter"],
        severity_range=["P2"],
    )

    # Canary playbook: names the firing alert and service, but scores lower.
    canary_playbook = create_high_quality_playbook(
        playbook_id="PB-EXACT-CANARY",
        risk_level=RiskLevel.LOW,
    )
    canary_playbook.symptom_pattern = SymptomPattern(
        alert_names=[canary_alert],
        affected_services=[canary_service],
        severity_range=["P3"],
    )

    for registered in (host_playbook, canary_playbook):
        mock_playbook_service.add_playbook(registered)
    ranked_by_similarity = [
        MockPlaybookRecommendation(host_playbook, similarity_score=0.95),
        MockPlaybookRecommendation(canary_playbook, similarity_score=0.45),
    ]
    mock_playbook_service.set_recommendations(ranked_by_similarity)

    incident = create_test_incident(
        severity=Severity.P3,
        alert_name=canary_alert,
    )
    incident.affected_services = [canary_service]

    decision = await service.evaluate_auto_repair(incident)

    assert decision.can_auto_repair is True
    assert decision.playbook is not None
    assert decision.playbook.playbook_id == "PB-EXACT-CANARY"
@pytest.mark.asyncio
async def test_backup_failure_blocks_k8s_playbook(self, service, mock_playbook_service):
"""Backup/host incidents must not execute K8s rollout playbooks."""

View File

@@ -298,9 +298,6 @@ async def test_auto_repair_failure_does_not_call_verifier(monkeypatch):
pb_service = FailingPlaybookService()
pb_service.add_playbook(playbook)
# 讓 _execute_step 拋例外以觸發失敗路徑
original_execute_step = AutoRepairService._execute_step
async def _always_fail(self_inner, incident_arg, step_arg) -> str:
raise RuntimeError("強制測試失敗")
@@ -323,6 +320,40 @@ async def test_auto_repair_failure_does_not_call_verifier(monkeypatch):
assert len(stub_learning.verification_calls) == 0, "執行失敗時不應呼叫 record_verification_result"
@pytest.mark.asyncio
async def test_auto_repair_failed_step_string_marks_execution_failure(monkeypatch):
    """A step that returns FAILED text must make the repair report failure.

    ``_execute_step`` is patched to return a ``FAILED: ...`` string instead of
    raising. The execution result must be unsuccessful and carry that text in
    its error, and neither the stub verifier nor the stub learning service's
    verification recording may have been called.
    """
    import src.services.post_execution_verifier as _pev_mod
    import src.services.learning_service as _ls_mod

    verifier_stub = StubVerifier(result="success")
    learning_stub = StubLearningService()
    monkeypatch.setattr(_pev_mod, "_verifier", verifier_stub)
    monkeypatch.setattr(_ls_mod, "_learning_service", learning_stub)

    playbook = _make_playbook()
    playbook_store = StubPlaybookService()
    playbook_store.add_playbook(playbook)

    async def _report_failure(self_inner, incident_arg, step_arg) -> str:
        # The executor signals failure through its return text, not an exception.
        return "FAILED: simulated executor failure"

    monkeypatch.setattr(AutoRepairService, "_execute_step", _report_failure)

    repair_service = AutoRepairService(
        playbook_service=playbook_store,
        cooldown_checker=_no_cooldown,
    )
    outcome = await repair_service.execute_auto_repair(_make_incident(), playbook)

    assert outcome.success is False
    assert "simulated executor failure" in (outcome.error or "")
    assert len(verifier_stub.calls) == 0
    assert len(learning_stub.verification_calls) == 0
@pytest.mark.asyncio
async def test_record_verification_result_no_playbook_id_does_not_crash():
"""
@@ -330,7 +361,6 @@ async def test_record_verification_result_no_playbook_id_does_not_crash():
驗證 learning_service 對 None playbook_id 的防禦性。
"""
from src.services.learning_service import LearningService
from src.repositories.interfaces import ILearningRepository, ITrustRepository
class NullLearningRepo:
async def record_repair(self, **kwargs) -> bool:

View File

@@ -36,6 +36,7 @@ from src.models.playbook import (
RiskLevel,
SymptomPattern,
)
from src.core.redis_client import close_redis_pool, init_redis_pool
from src.repositories.playbook_repository import get_playbook_repository
from src.utils.timezone import now_taipei
@@ -146,12 +147,33 @@ async def _amain() -> None:
parser.add_argument("--namespace", default=DEFAULT_NAMESPACE)
args = parser.parse_args()
result = await seed_canary_playbook(
alertname=args.alertname,
target=args.target,
namespace=args.namespace,
)
print(json.dumps(asdict(result), ensure_ascii=False, sort_keys=True))
redis_initialized = False
try:
await init_redis_pool()
redis_initialized = True
except Exception as exc:
print(
json.dumps(
{
"warning": "redis_pool_init_failed_pg_seed_continues",
"error": str(exc),
},
ensure_ascii=False,
sort_keys=True,
),
file=sys.stderr,
)
try:
result = await seed_canary_playbook(
alertname=args.alertname,
target=args.target,
namespace=args.namespace,
)
print(json.dumps(asdict(result), ensure_ascii=False, sort_keys=True))
finally:
if redis_initialized:
await close_redis_pool()
if __name__ == "__main__":