Files
awoooi/apps/api/src/services/failure_watcher.py
Your Name 4013c6a1ad
Some checks failed
Code Review / ai-code-review (push) Successful in 20s
CD Pipeline / tests (push) Successful in 1m36s
CD Pipeline / build-and-deploy (push) Successful in 5m46s
CD Pipeline / post-deploy-checks (push) Successful in 1m29s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
fix(agents): align reports with controlled autonomy
2026-06-26 23:40:41 +08:00

807 lines
29 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Failure Watcher Service - Phase 18 失敗自動修復閉環
====================================================
Phase 18: Failure Auto-Repair Loop (2026-03-31 統帥批准)
職責:
- 監聽 AuditLog 失敗事件
- AI 分析失敗原因
- 評估風險等級
- LOW / MEDIUM / HIGH 由 AI Agent 受控自動修復；CRITICAL / break-glass 才硬封鎖
設計原則:
- 實作 IFailureWatcher Protocol
- 使用 OpenClaw 進行 AI 分析
- 與 Telegram 整合推送修復請求
版本: v1.0
建立: 2026-03-31 (台北時區)
建立者: Claude Code (Phase 18 失敗自動修復)
"""
import html
import json
from dataclasses import dataclass
from datetime import UTC, datetime

import structlog

from src.core.redis_client import get_redis
from src.db.base import get_db_context
from src.db.models import AuditLog
from src.repositories.interfaces import IFailureWatcher
logger = structlog.get_logger(__name__)
# =============================================================================
# Constants
# =============================================================================
# Failure classification keywords.
# Each keyword is matched as a substring of the lower-cased error message.
# Dict order is significant: the first classification with a matching keyword
# wins, so e.g. a message containing both "pod" and "memory" is K8S_ERROR.
FAILURE_CLASSIFICATIONS = {
    "TIMEOUT": [
        "timeout",
        "timed out",
        "deadline exceeded",
    ],
    "K8S_ERROR": [
        "kubernetes",
        "k8s",
        "pod",
        "deployment",
        "service",
        "forbidden",
    ],
    "NETWORK_ERROR": [
        "connection",
        "network",
        "unreachable",
        "dns",
        "resolve",
    ],
    "PERMISSION_DENIED": [
        "permission",
        "denied",
        "unauthorized",
        "403",
        "401",
    ],
    "RESOURCE_ERROR": [
        "oom",
        "memory",
        "cpu",
        "quota",
        "limit",
    ],
}

# Auto-repair policy per risk level.
# Only CRITICAL disables auto-repair; it is the break-glass hard block.
RISK_LEVELS = {
    "LOW": {
        "auto_repair": True,
        "operations": [
            "restart_pod",
            "restart_deployment",
            "clear_cache",
        ],
    },
    "MEDIUM": {
        "auto_repair": True,
        "operations": [
            "scale_deployment",
            "rollback",
            "update_config",
        ],
    },
    "HIGH": {
        "auto_repair": True,
        "operations": [
            "controlled_rollback",
            "controlled_config_repair",
            "controlled_service_repair",
        ],
    },
    "CRITICAL": {
        "auto_repair": False,
        "operations": [
            "delete_pvc",
            "drop_database",
            "network_policy",
        ],
    },
}

# Redis Stream key and consumer group for failure events.
FAILURE_STREAM_KEY = "awoooi:failures"
FAILURE_CONSUMER_GROUP = "failure_watchers"

# Maximum number of auto-repairs allowed for one resource inside the
# per-resource repair cooldown window.
MAX_AUTO_REPAIR_RETRIES = 3
# =============================================================================
# Failure Analysis Result
# =============================================================================
@dataclass
class FailureAnalysis:
    """
    Result of analyzing a failed operation.

    Carries the same five keys as the rule-engine result built by
    ``FailureWatcherService.analyze_failure``; ``to_dict`` returns them.
    """

    # TIMEOUT / K8S_ERROR / NETWORK_ERROR / PERMISSION_DENIED / RESOURCE_ERROR / UNKNOWN
    classification: str
    # Human-readable root-cause summary.
    root_cause: str
    # Repair strategy name (e.g. "restart_pod", "clear_cache") or free-text advice.
    suggested_repair: str
    # LOW / MEDIUM / HIGH / CRITICAL (keys of RISK_LEVELS).
    risk_level: str
    # Analysis confidence; the rule-engine fallback reports 0.4.
    confidence: float

    def to_dict(self) -> dict:
        """Return the analysis as a plain dict with the five field names as keys."""
        return {
            "classification": self.classification,
            "root_cause": self.root_cause,
            "suggested_repair": self.suggested_repair,
            "risk_level": self.risk_level,
            "confidence": self.confidence,
        }
# =============================================================================
# Failure Watcher Service
# =============================================================================
class FailureWatcherService(IFailureWatcher):
    """
    Failure watcher service - Phase 18 core component.

    Flow:
    1. Receive a failure event (from a Redis Stream or a direct call).
    2. Analyze the failure (rule engine first, then OpenClaw LLM when available).
    3. Assess the risk level.
    4. LOW / MEDIUM / HIGH -> controlled auto-repair -> disclosure notification,
       gated by the global repair breaker (ADR-040) and a per-resource cooldown.
    5. CRITICAL / hard blocker -> Telegram + frontend break-glass blocked state.

    The service holds no instance state.
    """

    def __init__(self) -> None:
        pass  # Stateless service

    async def process_failure(
        self,
        audit_log_id: str,
        failure_data: dict,
    ) -> dict:
        """
        Process a single failure event end to end.

        Args:
            audit_log_id: ID of the failed AuditLog row.
            failure_data: Event payload. Reads ``error_message``,
                ``operation_type``, ``target_resource`` and ``namespace``;
                missing or empty values fall back to defaults.

        Returns:
            dict with ``repair_attempted``, ``repair_result``, ``risk_level``,
            ``next_action`` and the raw ``analysis``. ``next_action`` ends as one
            of ``blocked_by_global_cooldown``, ``break_glass_blocked``,
            ``ai_retry_queued`` or ``auto_repaired``.
        """
        # `or` instead of a .get() default so explicit None / "" values also
        # fall back; the classifiers below call .lower() on these strings.
        error_message = failure_data.get("error_message") or "Unknown error"
        operation_type = failure_data.get("operation_type") or "UNKNOWN"
        target_resource = failure_data.get("target_resource") or "unknown"

        logger.info(
            "failure_watcher_processing",
            audit_log_id=audit_log_id,
            operation_type=operation_type,
            target_resource=target_resource,
        )

        # 1. Analyze the failure cause.
        analysis = await self.analyze_failure(
            error_message=error_message,
            operation_type=operation_type,
            target_resource=target_resource,
        )

        # 2. Persist the classification before any repair is attempted.
        await self._update_audit_log_classification(
            audit_log_id=audit_log_id,
            classification=analysis["classification"],
            auto_repair_attempted=False,
        )

        # 3. Decide the action from the risk level.
        risk_level = analysis["risk_level"]
        result = {
            "repair_attempted": False,
            "repair_result": None,
            "risk_level": risk_level,
            "next_action": "unknown",
            "analysis": analysis,
        }

        # P0-1: global auto-repair circuit breaker (ADR-040).
        from src.services.global_repair_cooldown import check_global_repair_cooldown

        can_global_repair, global_reason = await check_global_repair_cooldown(
            incident_id=audit_log_id,
            affected_services=[target_resource],
        )
        if not can_global_repair:
            logger.warning(
                "global_repair_cooldown_blocked",
                audit_log_id=audit_log_id,
                target_resource=target_resource,
                reason=global_reason,
            )
            # A tripped global breaker means repair-storm risk: stay break-glass blocked.
            result["risk_level"] = "CRITICAL"
            result["next_action"] = "blocked_by_global_cooldown"
            await self._queue_ai_repair_followup(
                audit_log_id=audit_log_id,
                analysis=analysis,
                reason=global_reason,
            )
            return result

        # CRITICAL, or a level without an auto-repair policy, never auto-repairs.
        auto_repair_allowed = risk_level != "CRITICAL" and bool(
            RISK_LEVELS.get(risk_level, {}).get("auto_repair")
        )

        if not auto_repair_allowed:
            # Break-glass / hard blocker: no write operations are executed.
            await self._queue_ai_repair_followup(
                audit_log_id=audit_log_id,
                analysis=analysis,
                reason=f"風險等級 {risk_level} 命中 break-glass guardrailAI 已封鎖寫入並保留證據",
            )
            result["next_action"] = "break_glass_blocked"
        else:
            # Phase 18.3: per-resource cooldown guards against repair storms.
            # It is consulted only for real auto-repair candidates, so CRITICAL
            # events neither consume the counter nor get downgraded by it.
            can_auto_repair = await self._check_repair_cooldown(
                target_resource=target_resource,
                namespace=failure_data.get("namespace", "awoooi"),
            )
            if can_auto_repair:
                await self._run_auto_repair(
                    audit_log_id=audit_log_id,
                    analysis=analysis,
                    failure_data=failure_data,
                    result=result,
                )
            else:
                # Cooldown exceeded: do NOT run another repair. Escalate to
                # MEDIUM and hand off to the AI controlled retry queue instead.
                logger.info(
                    "repair_cooldown_escalate",
                    audit_log_id=audit_log_id,
                    target_resource=target_resource,
                    original_risk_level=risk_level,
                )
                result["risk_level"] = "MEDIUM"
                await self._queue_ai_repair_followup(
                    audit_log_id=audit_log_id,
                    analysis=analysis,
                    reason="單資源修復冷卻期已達上限,改由 AI 受控重試 / rollback / verifier 處理,本次不執行修復",
                )
                result["next_action"] = "ai_retry_queued"

        logger.info(
            "failure_watcher_processed",
            audit_log_id=audit_log_id,
            risk_level=result["risk_level"],
            next_action=result["next_action"],
            repair_attempted=result["repair_attempted"],
        )
        return result

    async def _run_auto_repair(
        self,
        audit_log_id: str,
        analysis: dict,
        failure_data: dict,
        result: dict,
    ) -> None:
        """Execute the controlled auto-repair and record its outcome into ``result``."""
        # Phase 18.3: pass the full failure_data so the executor can resolve the target.
        success, repair_result = await self.execute_auto_repair(
            audit_log_id=audit_log_id,
            repair_strategy=analysis["suggested_repair"],
            failure_data=failure_data,
        )
        result["repair_attempted"] = True
        result["repair_result"] = repair_result

        await self._update_audit_log_classification(
            audit_log_id=audit_log_id,
            classification=analysis["classification"],
            auto_repair_attempted=True,
            auto_repair_result=repair_result,
        )

        if success:
            # P0-1: count this repair against the global breaker (ADR-040).
            from src.services.global_repair_cooldown import record_global_repair_action

            await record_global_repair_action()
            # Disclosure notification for the successful auto-repair.
            await self._push_repair_notification(
                audit_log_id=audit_log_id,
                repair_result=repair_result,
                auto=True,
            )
            result["next_action"] = "auto_repaired"
        else:
            # On failure, queue an AI controlled retry / rollback rather than
            # handing off to a human.
            result["risk_level"] = "MEDIUM"
            await self._queue_ai_repair_followup(
                audit_log_id=audit_log_id,
                analysis=analysis,
                reason="AI 受控自動修復失敗,已排入 rollback / transport / PlayBook 修復重試",
            )
            result["next_action"] = "ai_retry_queued"

    async def analyze_failure(
        self,
        error_message: str,
        operation_type: str,
        target_resource: str,
    ) -> dict:
        """
        Analyze why an operation failed.

        A rule-engine classification and risk assessment run first. An LLM
        analysis is then attempted and, if it returns a result, that result is
        returned as-is. Otherwise the rule-engine result is returned with a
        fixed confidence of 0.4.

        Returns:
            dict with ``classification``, ``root_cause``, ``suggested_repair``,
            ``risk_level`` and ``confidence``. The LLM result additionally
            carries ``ai_provider`` and ``signoz_trace_url``.
        """
        # 1. Fast rule-based classification.
        classification = self._classify_by_rules(error_message)

        # 2. Risk level from the operation type.
        risk_level = self._assess_risk_level(operation_type)

        # 3. LLM deep analysis; any failure degrades to the rule-based result.
        llm_analysis = await self._llm_analyze(
            error_message=error_message,
            operation_type=operation_type,
            target_resource=target_resource,
            initial_classification=classification,
        )
        if llm_analysis:
            return llm_analysis

        # Fallback (2026-04-16 fix): include the actual error text so the
        # notification card carries something actionable instead of only the
        # bare rule-engine classification.
        error_preview = (error_message[:200] if error_message else "未知錯誤").strip()
        return {
            "classification": classification,
            "root_cause": (
                f"[{classification}] {operation_type} 操作在 {target_resource} 失敗\n"
                f"錯誤:{error_preview}"
            ),
            "suggested_repair": self._suggest_repair(classification),
            "risk_level": risk_level,
            "confidence": 0.4,
        }

    async def execute_auto_repair(
        self,
        audit_log_id: str,
        repair_strategy: str,
        failure_data: dict | None = None,
    ) -> tuple[bool, str]:
        """
        Execute a controlled auto-repair.

        Called by ``process_failure`` for any non-CRITICAL risk level whose
        RISK_LEVELS policy allows auto-repair (LOW / MEDIUM / HIGH).

        Supported strategies:
        - any strategy containing "restart", with target "deployment/<name>":
          dry-run validation, then rollout restart of the Deployment.
        - any strategy containing "restart", with target "pod/<name>":
          dry-run validation, then delete the Pod so it gets recreated.
        - any strategy containing "clear_cache": delete Redis keys matching
          ``awoooi:cache:*``.

        Args:
            audit_log_id: AuditLog ID, used for logging only.
            repair_strategy: Strategy name, matched case-insensitively by substring.
            failure_data: Optional payload; reads ``target_resource``
                (format "<type>/<name>") and ``namespace`` (default "awoooi").

        Returns:
            (success, result_message). Exceptions are caught and reported as
            ``(False, message)``.
        """
        data = failure_data or {}
        logger.info(
            "auto_repair_executing",
            audit_log_id=audit_log_id,
            strategy=repair_strategy,
            target=data.get("target_resource"),
        )

        try:
            target_resource = data.get("target_resource") or ""
            namespace = data.get("namespace", "awoooi")

            # "deployment/api" -> ("deployment", "api"); without "/" both stay empty.
            raw_type, sep, resource_name = target_resource.partition("/")
            resource_type = raw_type.lower() if sep else ""

            # Phase 18.3: real K8s repair operations.
            # P0-2: every write is preceded by a dry-run validation.
            if "restart" in repair_strategy.lower() and resource_name:
                from src.services.executor import OperationType, get_executor

                executor = get_executor()

                if resource_type == "deployment":
                    # P0-2: dry-run confirms the Deployment exists / is actionable.
                    dry_run = await executor.validate_action(
                        operation_type=OperationType.RESTART_DEPLOYMENT,
                        resource_name=resource_name,
                        namespace=namespace,
                    )
                    if not dry_run.passed:
                        logger.warning(
                            "auto_repair_dry_run_failed",
                            audit_log_id=audit_log_id,
                            resource=f"{resource_type}/{resource_name}",
                            reason=dry_run.message,
                        )
                        return False, f"Dry-run 失敗: {dry_run.message}"

                    result = await executor.restart_deployment(
                        name=resource_name,
                        namespace=namespace,
                    )
                    if not result.success:
                        return False, f"❌ 重啟失敗: {result.message}"
                    logger.info(
                        "auto_repair_deployment_restarted",
                        audit_log_id=audit_log_id,
                        deployment=resource_name,
                        namespace=namespace,
                    )
                    return True, f"✅ Deployment {resource_name} 已重啟"

                if resource_type == "pod":
                    # P0-2: dry-run confirms the Pod exists / is actionable.
                    dry_run = await executor.validate_action(
                        operation_type=OperationType.DELETE_POD,
                        resource_name=resource_name,
                        namespace=namespace,
                    )
                    if not dry_run.passed:
                        logger.warning(
                            "auto_repair_dry_run_failed",
                            audit_log_id=audit_log_id,
                            resource=f"{resource_type}/{resource_name}",
                            reason=dry_run.message,
                        )
                        return False, f"Dry-run 失敗: {dry_run.message}"

                    # Deleting the Pod triggers its recreation.
                    result = await executor.delete_pod(
                        name=resource_name,
                        namespace=namespace,
                    )
                    if not result.success:
                        return False, f"❌ 刪除失敗: {result.message}"
                    logger.info(
                        "auto_repair_pod_deleted",
                        audit_log_id=audit_log_id,
                        pod=resource_name,
                        namespace=namespace,
                    )
                    return True, f"✅ Pod {resource_name} 已刪除,等待重建"

                # Unknown resource type: log it, execute nothing.
                logger.warning(
                    "auto_repair_unknown_resource_type",
                    audit_log_id=audit_log_id,
                    resource_type=resource_type,
                    resource_name=resource_name,
                )
                return False, f"未知資源類型: {resource_type}"

            if "clear_cache" in repair_strategy.lower():
                redis = get_redis()
                # Safety: only keys under the cache prefix are cleared.
                # NOTE(review): KEYS walks the whole keyspace and blocks Redis
                # while it runs; consider SCAN-based iteration if the client
                # returned by get_redis() supports it.
                keys = await redis.keys("awoooi:cache:*")
                if not keys:
                    return True, " 無快取需清理"
                await redis.delete(*keys)
                logger.info(
                    "auto_repair_cache_cleared",
                    audit_log_id=audit_log_id,
                    keys_deleted=len(keys),
                )
                return True, f"✅ 已清理 {len(keys)} 個快取 key"

            return False, f"未知修復策略: {repair_strategy}"

        except Exception as e:
            logger.exception(
                "auto_repair_error",
                audit_log_id=audit_log_id,
                strategy=repair_strategy,
                error=str(e),
            )
            return False, f"修復執行失敗: {e}"

    # =========================================================================
    # Private Methods
    # =========================================================================

    def _classify_by_rules(self, error_message: str) -> str:
        """
        Classify an error message by keyword rules.

        Returns the first FAILURE_CLASSIFICATIONS key whose keyword occurs in
        the lower-cased message, or "UNKNOWN". ``None`` is treated as "".
        """
        error_lower = (error_message or "").lower()
        for classification, keywords in FAILURE_CLASSIFICATIONS.items():
            if any(kw in error_lower for kw in keywords):
                return classification
        return "UNKNOWN"

    def _assess_risk_level(self, operation_type: str) -> str:
        """
        Assess the risk level of an operation type by keyword.

        Destructive keywords win over medium ones, which win over restart-style
        ones; anything unmatched (including ``None``) defaults to MEDIUM.
        """
        op_lower = (operation_type or "").lower()

        # CRITICAL: destructive operations.
        if any(kw in op_lower for kw in ["delete", "drop", "force"]):
            return "CRITICAL"

        # MEDIUM: state-changing operations.
        if any(kw in op_lower for kw in ["scale", "rollback", "update", "patch"]):
            return "MEDIUM"

        # LOW: restart-style operations.
        if any(kw in op_lower for kw in ["restart", "refresh", "clear"]):
            return "LOW"

        return "MEDIUM"  # Default MEDIUM

    def _suggest_repair(self, classification: str) -> str:
        """Return human-readable repair advice for a rule-engine classification."""
        suggestions = {
            "TIMEOUT": "增加超時時間或重試操作",
            "K8S_ERROR": "檢查 K8s 資源狀態,考慮重啟 Pod",
            "NETWORK_ERROR": "檢查網路連線,驗證 DNS 解析",
            "PERMISSION_DENIED": "檢查 RBAC 權限配置",
            "RESOURCE_ERROR": "增加資源配額或清理資源",
            "UNKNOWN": "AI 深度診斷:補抓 timeline / log / metric / PlayBook 相似案例",
        }
        return suggestions.get(classification, "AI 深度診斷")

    async def _check_repair_cooldown(
        self,
        target_resource: str,
        namespace: str,
    ) -> bool:
        """
        Per-resource repair cooldown - guards against repair storms (Phase 18.3).

        Rule: at most MAX_AUTO_REPAIR_RETRIES repairs per resource while the
        300-second window is alive; each allowed repair refreshes the window.
        Callers escalate to MEDIUM and queue an AI controlled retry / rollback /
        verifier instead of repairing when this returns False.

        Returns:
            True if an auto-repair may run now, False if the limit is reached.
            Fails open (True) if Redis is unavailable.
        """
        try:
            redis = get_redis()
            cooldown_key = f"awoooi:repair_cooldown:{namespace}:{target_resource}"

            # INCR first, then compare: INCR is atomic, so concurrent watchers
            # cannot both pass the check (a GET-then-INCR would race).
            repair_count = int(await redis.incr(cooldown_key))
            if repair_count > MAX_AUTO_REPAIR_RETRIES:
                logger.warning(
                    "repair_cooldown_exceeded",
                    target_resource=target_resource,
                    namespace=namespace,
                    repair_count=repair_count,
                    max_retries=MAX_AUTO_REPAIR_RETRIES,
                )
                # Blocked attempts do not refresh the TTL, so the window still
                # expires 300 s after the last allowed repair.
                return False

            await redis.expire(cooldown_key, 300)  # 5 minutes
            return True

        except Exception as e:
            logger.warning(
                "repair_cooldown_check_error",
                target_resource=target_resource,
                error=str(e),
            )
            # Fail open: a Redis outage must not block repairs entirely
            # (note this also disables the storm guard while Redis is down).
            return True

    async def _llm_analyze(
        self,
        error_message: str,
        operation_type: str,
        target_resource: str,
        initial_classification: str,
    ) -> dict | None:
        """
        Deep-analyze a failure with the LLM via OpenClaw (Phase 18.4).

        Returns an analysis dict built from the OpenClaw result (keeping the
        rule-engine classification), or None when OpenClaw returns no result
        or raises.
        """
        try:
            from src.services.openclaw import get_openclaw

            openclaw = get_openclaw()

            # Alert context handed to OpenClaw.
            alert_context = {
                "alert_type": "execution_failure",
                "severity": "warning",
                "error_message": error_message,
                "operation_type": operation_type,
                "target_resource": target_resource,
                "initial_classification": initial_classification,
                "source": "failure_watcher",
            }

            # analyze_alert returns a 7-tuple, unpacked positionally here.
            (
                analysis_result,
                ai_provider,
                raw_response,
                _signoz_metrics,
                trace_url,
                tokens,
                cost,
            ) = await openclaw.analyze_alert(alert_context)

            if analysis_result:
                logger.info(
                    "openclaw_failure_analysis_success",
                    ai_provider=ai_provider,
                    severity=analysis_result.severity,
                    confidence=analysis_result.confidence,
                    tokens=tokens,
                    cost_usd=cost,
                )

                # Map the OpenClaw result onto the repair-analysis shape.
                risk_level = self._map_severity_to_risk(analysis_result.severity)
                return {
                    "classification": initial_classification,  # keep the rule-engine classification
                    "root_cause": analysis_result.root_cause_analysis[:100],
                    "suggested_repair": self._extract_repair_action(
                        analysis_result.recommended_action
                    ),
                    "risk_level": risk_level,
                    "confidence": analysis_result.confidence,
                    "ai_provider": ai_provider,
                    "signoz_trace_url": trace_url,
                }

            logger.warning(
                "openclaw_failure_analysis_no_result",
                raw_response=raw_response[:200] if raw_response else None,
            )
            return None

        except Exception as e:
            logger.warning(
                "openclaw_failure_analysis_error",
                error=str(e),
            )
            return None

    def _map_severity_to_risk(self, severity: str) -> str:
        """
        Map an OpenClaw severity onto a repair risk level (Phase 18.4).

        critical or blank -> CRITICAL (fail closed), high -> HIGH,
        warning / medium -> MEDIUM, anything else -> LOW.
        Previously "high" fell through to LOW.
        """
        severity_lower = severity.lower()
        # NOTE(review): the "" entry may be a character lost in copy/paste;
        # as written, a blank severity is treated as CRITICAL.
        if severity_lower in ("critical", ""):
            return "CRITICAL"
        if severity_lower == "high":
            return "HIGH"
        if severity_lower in ("warning", "medium"):
            return "MEDIUM"
        return "LOW"

    def _extract_repair_action(self, recommended_action: str) -> str:
        """
        Extract an executable repair strategy from OpenClaw's recommendation (Phase 18.4).

        Returns a known strategy name when a keyword matches, otherwise the
        first 50 characters of the original recommendation.
        """
        action_lower = recommended_action.lower()

        # Restart-style actions: Deployment when mentioned, otherwise Pod.
        if any(kw in action_lower for kw in ["restart", "重啟", "重新啟動"]):
            if "deployment" in action_lower or "部署" in action_lower:
                return "restart_deployment"
            return "restart_pod"

        if any(kw in action_lower for kw in ["clear", "清理", "cache", "快取"]):
            return "clear_cache"

        if any(kw in action_lower for kw in ["scale", "擴展", "增加"]):
            return "scale_up"  # Replica bounds / rollback are left to the controlled strategy

        # Not auto-executable: return the (truncated) original advice.
        return recommended_action[:50]

    async def _update_audit_log_classification(
        self,
        audit_log_id: str,
        classification: str,
        auto_repair_attempted: bool,
        auto_repair_result: str | None = None,
    ) -> None:
        """
        Write the failure classification and auto-repair fields to an AuditLog row.

        Best effort: database errors are logged and swallowed.
        """
        try:
            async with get_db_context() as db:
                from sqlalchemy import update

                stmt = (
                    update(AuditLog)
                    .where(AuditLog.id == audit_log_id)
                    .values(
                        failure_classification=classification,
                        auto_repair_attempted=auto_repair_attempted,
                        auto_repair_result=auto_repair_result,
                    )
                )
                await db.execute(stmt)
                await db.commit()

                logger.debug(
                    "audit_log_classification_updated",
                    audit_log_id=audit_log_id,
                    classification=classification,
                )

        except Exception as e:
            logger.warning(
                "audit_log_classification_update_failed",
                audit_log_id=audit_log_id,
                error=str(e),
            )

    @staticmethod
    def _escape_html(value: object) -> str:
        """Escape a dynamic value for a Telegram HTML-formatted message."""
        # Error text routinely contains '<', '>' or '&' (e.g. "<nil>"), which
        # would otherwise break Telegram's HTML entity parsing.
        return html.escape(str(value), quote=False)

    async def _queue_ai_repair_followup(
        self,
        audit_log_id: str,
        analysis: dict,
        reason: str,
    ) -> None:
        """
        Queue an AI controlled-repair follow-up.

        Publishes the request on the ``awoooi:repair_requests`` Redis channel,
        then sends a Telegram summary. Best effort: errors are logged and
        swallowed.
        """
        try:
            # Publish to Redis (frontend WebSocket subscription).
            redis = get_redis()
            repair_request = {
                "type": "ai_controlled_repair_followup",
                "audit_log_id": audit_log_id,
                "analysis": analysis,
                "reason": reason,
                "controlled_retry": True,
                "created_at": datetime.now(UTC).isoformat(),
            }
            await redis.publish(
                "awoooi:repair_requests",
                json.dumps(repair_request),
            )

            # Push to Telegram; dynamic values are HTML-escaped.
            from src.services.telegram_gateway import get_telegram_gateway

            tg = get_telegram_gateway()
            esc = self._escape_html
            message = (
                f"🤖 <b>AI 受控修復後續</b>\n\n"
                f"├ 📋 AuditLog: <code>{esc(audit_log_id[:8])}...</code>\n"
                f"├ 📊 分類: {esc(analysis.get('classification', 'UNKNOWN'))}\n"
                f"├ ⚠️ 風險: {esc(analysis.get('risk_level', 'MEDIUM'))}\n"
                f"├ 🔍 原因: {esc(analysis.get('root_cause', reason))}\n"
                f"└ 💡 下一步: {esc(analysis.get('suggested_repair', 'AI 深度診斷'))}\n\n"
                f"狀態:<code>ai_retry_or_break_glass_recorded</code>"
            )
            await tg.send_alert_notification(message)

            logger.info(
                "ai_controlled_repair_followup_sent",
                audit_log_id=audit_log_id,
                risk_level=analysis.get("risk_level"),
            )

        except Exception as e:
            logger.warning(
                "repair_request_send_failed",
                audit_log_id=audit_log_id,
                error=str(e),
            )

    async def _push_repair_notification(
        self,
        audit_log_id: str,
        repair_result: str,
        auto: bool = True,
    ) -> None:
        """
        Send a repair-completed Telegram notification.

        Best effort: errors are logged and swallowed.
        """
        try:
            from src.services.telegram_gateway import get_telegram_gateway

            tg = get_telegram_gateway()
            prefix = "🤖 自動修復" if auto else "✅ 手動修復"
            esc = self._escape_html
            message = (
                f"{prefix} <b>完成</b>\n\n"
                f"├ 📋 AuditLog: <code>{esc(audit_log_id[:8])}...</code>\n"
                f"└ 📝 結果: {esc(repair_result)}"
            )
            await tg.send_alert_notification(message)

        except Exception as e:
            logger.warning(
                "repair_notification_send_failed",
                audit_log_id=audit_log_id,
                error=str(e),
            )
# =============================================================================
# Singleton
# =============================================================================
_failure_watcher: FailureWatcherService | None = None


def get_failure_watcher() -> FailureWatcherService:
    """
    Return the shared FailureWatcherService instance.

    The instance is created on the first call and cached in the module-level
    ``_failure_watcher``; later calls return that same object.
    """
    global _failure_watcher
    if _failure_watcher is not None:
        return _failure_watcher
    _failure_watcher = FailureWatcherService()
    return _failure_watcher