feat(Phase 5): Declarative 修復抽象化 + Blast Radius 分控 全部完成
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

## Phase 5 交付(ADR-086)

### 新增服務(4 個)
- blast_radius_calculator.py: 爆炸半徑計算器(0-100 純函數)
  - 18 種 kubectl 動作基礎分 + 命名空間倍率 + 特殊 flag 修正
  - HARD_RULES 永擋:delete ns/pv/pvc/clusterrole + rm -rf + DROP TABLE
  - 分級:≤10 auto / 11-50 human / 51-99 dual / 100 blocked
- declarative_remediation.py: DeclarativeSpec 不可變規格(frozen dataclass)
  - evaluate() 封裝 Blast Radius + dry-run + rollback_plan + constraints
  - rollback_plan 從 kubectl 動作類型自動推導(不呼叫 LLM)
- gitops_pr_service.py: Gitea Issue 高風險修復審核(tier=dual)
  - 含 Blast Radius + 目標狀態 + 回滾計畫 + 雙人審核流程
  - AIOPS_P5_GITOPS_PR flag 守衛
- rollback_manager.py: 驗證失敗自動回滾
  - 先驗 rollout history ≥ 2 revision,防止無版本可回滾
  - kubectl rollout undo + 120s 收斂等待

### decision_manager.py 接線(AIOPS_P5_BLAST_RADIUS_CHECK)
- _auto_execute() 在安全守衛後、ApprovalRequest 前插入分級守衛
- blocked → 永擋 + 人工審核通知
- dual → 非同步 GitOps Issue + 升級人工審核
- human → 升級人工審核(不自動執行)
- auto(≤10)→ 原有自動執行流程
- 失敗降級:計算異常 → 保守升人工

### learning_service.py
- record_declarative_outcome(): 記錄 DeclarativeSpec 執行結果
  anomaly_key=declarative:{incident_id},含 blast_radius_score/tier/rollback

2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 全部完成

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-15 16:06:45 +08:00
parent 53344c201e
commit 655d1a568a
6 changed files with 1130 additions and 0 deletions

View File

@@ -0,0 +1,234 @@
"""
AWOOOI AIOps Phase 5 — Blast Radius Calculator爆炸半徑計算器
===============================================================
職責計算修復動作的爆炸半徑分數0-100決定執行分級。
分級邏輯:
≤ 10 → auto 自動執行(低衝擊)
11-50 → human 需一人審核(中衝擊)
51-99 → dual 需雙人審核 + GitOps PR高衝擊
100 → blocked HARD_RULES 永擋(任何情況不執行)
設計原則:
- 保守計分:不確定情境一律視為高分(> 50
- HARD_RULES 優先:任何永擋 pattern 立刻返回 100不繼續計算
- 純函數Stateless不依賴 DB/Redis確保呼叫端可同步執行
- 可審計:每次計算回傳 reason 記錄計分依據
ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
"""
from __future__ import annotations
import re
from dataclasses import dataclass
import structlog
logger = structlog.get_logger(__name__)
# ── 分級閾值 ──────────────────────────────────────────────────────────────────
TIER_AUTO_MAX = 10 # ≤ 10 → auto
TIER_HUMAN_MAX = 50 # 11-50 → human
TIER_DUAL_MAX = 99 # 51-99 → dual
BLAST_BLOCKED = 100 # = 100 → permanent block
# ── 基礎分Kubectl 動作類型)────────────────────────────────────────────────
_BASE_SCORES: list[tuple[str, int, str]] = [
# (regex pattern, base_score, reason)
(r"kubectl\s+rollout\s+restart", 10, "rollout restart 低衝擊"),
(r"kubectl\s+rollout\s+undo", 25, "rollout undo 中衝擊(版本回退)"),
(r"kubectl\s+scale.*--replicas=[1-9]", 15, "scale up/down 低中衝擊"),
(r"kubectl\s+scale.*--replicas=0", 60, "scale to zero 高衝擊(停服)"),
(r"kubectl\s+apply", 40, "apply 中高衝擊(配置變更)"),
(r"kubectl\s+patch", 45, "patch 中高衝擊"),
(r"kubectl\s+set\s+image", 35, "set image 中衝擊"),
(r"kubectl\s+delete\s+pod", 30, "delete pod 中衝擊Pod 重建)"),
(r"kubectl\s+delete\s+deployment", 75, "delete deployment 高衝擊"),
(r"kubectl\s+delete\s+service", 70, "delete service 高衝擊(流量中斷)"),
(r"kubectl\s+delete\s+namespace", BLAST_BLOCKED, "delete namespace 永擋"),
(r"kubectl\s+delete\s+pv\b", BLAST_BLOCKED, "delete PV 永擋(資料遺失)"),
(r"kubectl\s+delete\s+pvc\b", BLAST_BLOCKED, "delete PVC 永擋(資料遺失)"),
(r"kubectl\s+delete\s+clusterrole", BLAST_BLOCKED, "delete ClusterRole 永擋RBAC 毀損)"),
(r"kubectl\s+delete\s+secret", 80, "delete secret 高衝擊"),
(r"kubectl\s+delete\s+configmap", 55, "delete configmap 高衝擊"),
(r"kubectl\s+exec", 65, "exec 高衝擊(互動式執行)"),
(r"kubectl\s+cp\b", 50, "cp 中高衝擊"),
(r"kubectl\s+drain", 80, "drain node 高衝擊"),
(r"kubectl\s+cordon", 55, "cordon node 高衝擊"),
(r"kubectl\s+taint", 60, "taint 高衝擊"),
]
# ── 永擋命令清單(不含 kubectl 的危險操作)───────────────────────────────────
_HARD_BLOCK_PATTERNS: list[tuple[str, str]] = [
(r"rm\s+-rf", "rm -rf 永擋(資料刪除)"),
(r"DROP\s+TABLE", "DROP TABLE 永擋DB 資料刪除)"),
(r"DROP\s+DATABASE", "DROP DATABASE 永擋"),
(r"TRUNCATE\s+TABLE", "TRUNCATE TABLE 永擋"),
(r"kubectl\s+exec.*?--.*?rm\b", "kubectl exec rm 永擋"),
(r"kubectl\s+exec.*?--.*?kill\b", "kubectl exec kill 永擋"),
]
# ── 高風險命名空間 ──────────────────────────────────────────────────────────
_CRITICAL_NAMESPACES = {"kube-system", "kube-public", "kube-node-lease", "monitoring", "gitea"}
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class BlastRadiusResult:
"""爆炸半徑計算結果"""
score: int # 0-100100 = 永擋)
tier: str # "auto" / "human" / "dual" / "blocked"
reason: str # 計分依據(可審計)
hard_blocked: bool # True = HARD_RULES 永擋
blocked_reason: str | None # 永擋時的具體原因
# ─────────────────────────────────────────────────────────────────────────────
# Calculator
# ─────────────────────────────────────────────────────────────────────────────
class BlastRadiusCalculator:
"""
爆炸半徑計算器
Usage:
calc = BlastRadiusCalculator()
result = calc.calculate(action="kubectl rollout restart deployment/awoooi-api",
namespace="awoooi-prod")
if result.tier == "auto":
# 可自動執行
"""
def calculate(
self,
action: str,
namespace: str = "awoooi-prod",
target: str = "",
) -> BlastRadiusResult:
"""
計算動作的爆炸半徑分數。
Args:
action: 修復命令kubectl command or description
namespace: 目標命名空間
target: 目標資源名稱
Returns:
BlastRadiusResult包含分數、分級、計分依據
"""
action_lower = action.lower()
reasons: list[str] = []
# 1. HARD_RULES 永擋優先檢查
for pattern, block_reason in _HARD_BLOCK_PATTERNS:
if re.search(pattern, action, re.IGNORECASE):
logger.warning(
"blast_radius_hard_blocked",
action=action[:120],
reason=block_reason,
)
return BlastRadiusResult(
score=BLAST_BLOCKED,
tier="blocked",
reason=block_reason,
hard_blocked=True,
blocked_reason=block_reason,
)
# 2. 基礎分(依 kubectl 動作類型)
base_score = 50 # 保守預設:未知動作 = 50human tier
matched_base_reason = "未知 kubectl 動作,保守 50 分"
for pattern, score, reason in _BASE_SCORES:
if re.search(pattern, action, re.IGNORECASE):
if score == BLAST_BLOCKED:
return BlastRadiusResult(
score=BLAST_BLOCKED,
tier="blocked",
reason=reason,
hard_blocked=True,
blocked_reason=reason,
)
base_score = score
matched_base_reason = reason
break
reasons.append(f"基礎分 {base_score}{matched_base_reason}")
# 3. 命名空間倍率
ns_multiplier = 1.0
if namespace in _CRITICAL_NAMESPACES:
ns_multiplier = 2.5
reasons.append(f"命名空間 {namespace} × 2.5(系統級)")
elif namespace == "default":
ns_multiplier = 1.8
reasons.append("default 命名空間 × 1.8(全域影響)")
# 4. 追加修正replicas=0 已在 BASE_SCORES其他特殊情境
bonus = 0
if "--force" in action_lower:
bonus += 20
reasons.append("+20--force flag 危險")
if "kube-system" in action_lower:
bonus += 40
reasons.append("+40kube-system 目標")
if "all" in action_lower and "kubectl delete" in action_lower:
bonus += 30
reasons.append("+30delete all 批量刪除")
# 5. 最終分數
raw_score = base_score * ns_multiplier + bonus
final_score = min(int(raw_score), 99) # 保留 100 給 HARD_RULES
tier = _score_to_tier(final_score)
reasons.append(f"最終分 {final_score}{tier}")
reason_str = "".join(reasons)
logger.debug(
"blast_radius_calculated",
score=final_score,
tier=tier,
action=action[:80],
reason=reason_str,
)
return BlastRadiusResult(
score=final_score,
tier=tier,
reason=reason_str,
hard_blocked=False,
blocked_reason=None,
)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _score_to_tier(score: int) -> str:
if score <= TIER_AUTO_MAX:
return "auto"
elif score <= TIER_HUMAN_MAX:
return "human"
elif score <= TIER_DUAL_MAX:
return "dual"
else:
return "blocked"
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_calculator: BlastRadiusCalculator | None = None
def get_blast_radius_calculator() -> BlastRadiusCalculator:
global _calculator
if _calculator is None:
_calculator = BlastRadiusCalculator()
return _calculator

View File

@@ -1471,6 +1471,84 @@ class DecisionManager:
)
return
# Phase 5 ADR-086: Blast Radius 分級守衛AIOPS_P5_BLAST_RADIUS_CHECK 控制)
# 評估修復動作的爆炸半徑,決定是否可自動執行或需升級審核
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
try:
from src.core.feature_flags import aiops_flags as _p5_flags
if _p5_flags.AIOPS_P5_BLAST_RADIUS_CHECK:
from src.services.blast_radius_calculator import get_blast_radius_calculator
from src.services.declarative_remediation import get_declarative_remediation
_calc = get_blast_radius_calculator()
_blast = _calc.calculate(action, namespace=_ns, target=_target)
_spec = get_declarative_remediation().evaluate(
action=action, target=_target, namespace=_ns,
description=token.proposal_data.get("description", ""),
)
# 記錄分級結果到 proposal_data供學習 + 審計)
token.proposal_data["blast_radius_score"] = _blast.score
token.proposal_data["blast_radius_tier"] = _blast.tier
token.proposal_data["blast_radius_reason"] = _blast.reason
if _blast.tier == "blocked":
# HARD_RULES 永擋
logger.warning(
"auto_execute_blast_radius_hard_blocked",
incident_id=incident.incident_id,
action=action[:80],
reason=_blast.reason,
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["mcp_all_failed"] = True
token.proposal_data["blocked_reason"] = f"HARD_RULES 永擋:{_blast.reason}"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
elif _blast.tier in ("human", "dual"):
# 中高衝擊 → 升級人工審核,不自動執行
logger.info(
"auto_execute_blast_radius_escalated",
incident_id=incident.incident_id,
tier=_blast.tier,
score=_blast.score,
action=action[:80],
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["requires_human_review"] = True
token.proposal_data["blast_radius_escalated"] = True
await self._save_token(token)
# dual tier → 非同步建立 GitOps Issue
if _blast.tier == "dual" and _p5_flags.AIOPS_P5_GITOPS_PR:
from src.services.gitops_pr_service import get_gitops_pr_service
_fire_and_forget(
get_gitops_pr_service().create_repair_issue(
spec=_spec,
incident_id=incident.incident_id,
diagnosis=token.proposal_data.get("debate_summary", ""),
)
)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
# tier == "auto" → 繼續自動執行流程
except Exception as _blast_err:
# Blast Radius 計算失敗 → 保守:視為 human tier升級人工審核
logger.warning(
"blast_radius_check_failed_conservative_escalate",
incident_id=incident.incident_id,
error=str(_blast_err),
)
token.state = DecisionState.READY
token.proposal_data["auto_executed"] = False
token.proposal_data["blast_radius_tier"] = "unknown_conservative"
await self._save_token(token)
_fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data))
return
try:
# 延遲導入避免循環依賴
from src.models.approval import ApprovalRequest, ApprovalStatus

View File

@@ -0,0 +1,260 @@
"""
AWOOOI AIOps Phase 5 — Declarative Remediation宣告式修復
=============================================================
職責:將修復動作包裝為 DeclarativeSpec依爆炸半徑分四級分控。
分級邏輯:
tier=auto (≤10) → 可自動執行,必要時 dry-run 確認
tier=human (11-50) → 送人工審核ApprovalRequest不自動執行
tier=dual (51-99) → 需雙人審核 + GitOps PR
tier=blocked (100) → HARD_RULES 永擋,永不執行
設計原則:
1. 主入口 evaluate() 是純函數,不呼叫任何 I/O
2. dry_run_required 由 AIOPS_P5_DRY_RUN_ENFORCED flag 控制
3. rollback_plan 自動從 kubectl 動作推導(不問 LLM確保穩定性
4. DeclarativeSpec 是不可變 dataclassfrozen=True
ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
"""
from __future__ import annotations
import re
from dataclasses import dataclass
import structlog
from src.services.blast_radius_calculator import (
BlastRadiusResult,
get_blast_radius_calculator,
)
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class DeclarativeSpec:
"""
宣告式修復規格(不可變)
代替舊的直接 kubectl 字串,成為決策層統一輸出格式。
下游_auto_execute / GitOps PR / RollbackManager依此判斷如何執行。
"""
action: str # kubectl 命令(已解析好 target/namespace
target_state: str # 目標狀態描述(人類可讀)
blast_radius_score: int # 0-100
tier: str # "auto" / "human" / "dual" / "blocked"
dry_run_required: bool # 執行前是否必須 dry-run
constraints: list[str] # 安全約束條件
rollback_plan: str # 如何回滾
namespace: str # 目標命名空間
target: str # 目標資源
blocked_reason: str | None # tier=blocked 時的永擋原因
blast_reason: str # 計分依據(可審計)
@property
def can_auto_execute(self) -> bool:
"""是否可自動執行tier=auto 且未被永擋)"""
return self.tier == "auto" and not self.blocked_reason
@property
def requires_gitops_pr(self) -> bool:
"""是否需要 GitOps PRtier=dual"""
return self.tier == "dual"
@property
def requires_human_approval(self) -> bool:
"""是否需要人工審核human 或 dual"""
return self.tier in ("human", "dual")
def to_dict(self) -> dict:
return {
"action": self.action,
"target_state": self.target_state,
"blast_radius_score": self.blast_radius_score,
"tier": self.tier,
"dry_run_required": self.dry_run_required,
"constraints": self.constraints,
"rollback_plan": self.rollback_plan,
"namespace": self.namespace,
"target": self.target,
"blocked_reason": self.blocked_reason,
"blast_reason": self.blast_reason,
}
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class DeclarativeRemediation:
"""
宣告式修復評估器
Usage:
svc = DeclarativeRemediation()
spec = svc.evaluate(
action="kubectl rollout restart deployment/awoooi-api",
target="awoooi-api",
namespace="awoooi-prod",
)
if spec.can_auto_execute:
# 直接執行
"""
def evaluate(
self,
action: str,
target: str = "",
namespace: str = "awoooi-prod",
description: str = "",
) -> DeclarativeSpec:
"""
評估修復動作,輸出 DeclarativeSpec。
Args:
action: kubectl 命令(已替換 placeholder
target: 目標資源名稱
namespace: 目標命名空間
description: 動作描述(供人類閱讀)
Returns:
DeclarativeSpec不可變包含分級決策
"""
from src.core.feature_flags import aiops_flags
# 計算爆炸半徑
calc = get_blast_radius_calculator()
blast: BlastRadiusResult = calc.calculate(action, namespace=namespace, target=target)
# dry-run 要求flag 控制)
dry_run_required = aiops_flags.AIOPS_P5_DRY_RUN_ENFORCED and blast.tier != "blocked"
# 安全約束
constraints = _build_constraints(action, namespace, blast.score)
# 回滾計畫(從動作自動推導)
rollback_plan = _infer_rollback_plan(action, target, namespace)
# 目標狀態描述
target_state = description or _infer_target_state(action, target)
spec = DeclarativeSpec(
action=action,
target_state=target_state,
blast_radius_score=blast.score,
tier=blast.tier,
dry_run_required=dry_run_required,
constraints=constraints,
rollback_plan=rollback_plan,
namespace=namespace,
target=target,
blocked_reason=blast.blocked_reason,
blast_reason=blast.reason,
)
logger.info(
"declarative_spec_evaluated",
tier=spec.tier,
blast_radius=spec.blast_radius_score,
can_auto=spec.can_auto_execute,
action=action[:80],
)
return spec
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _build_constraints(action: str, namespace: str, score: int) -> list[str]:
"""依動作特性建立安全約束清單。"""
constraints: list[str] = []
if namespace != "awoooi-prod":
constraints.append(f"target_namespace={namespace}(非 prod 環境)")
if "scale" in action.lower():
# scale 操作:副本數不能為 0防止停服
m = re.search(r"--replicas=(\d+)", action)
if m:
replicas = int(m.group(1))
if replicas == 0:
constraints.append("scale_to_zero=true停服風險")
else:
constraints.append(f"min_replicas={max(1, replicas - 1)}(防過度縮減)")
if score >= 40:
constraints.append("pre_execute_snapshot_required高衝擊需先存狀態")
if "rollout undo" in action.lower():
constraints.append("verify_previous_revision_exists確認有上一版本可回滾")
return constraints
def _infer_rollback_plan(action: str, target: str, namespace: str) -> str:
"""從修復動作推導對應的回滾計畫(不呼叫 LLM保持穩定性"""
a_lower = action.lower()
if "rollout restart" in a_lower:
return f"kubectl rollout undo deployment/{target} -n {namespace}"
if "rollout undo" in a_lower:
return f"kubectl rollout status deployment/{target} -n {namespace} && 手動確認回滾版本"
if "scale" in a_lower:
m = re.search(r"--replicas=(\d+)", action)
orig = int(m.group(1)) if m else 2
restore = max(1, orig)
return f"kubectl scale deployment/{target} --replicas={restore} -n {namespace}"
if "apply" in a_lower:
return f"kubectl rollout undo deployment/{target} -n {namespace}"
if "set image" in a_lower:
return f"kubectl rollout undo deployment/{target} -n {namespace}"
if "delete pod" in a_lower:
return "Pod 會由 ReplicaSet 自動重建;若 Deployment 異常請執行 rollout undo"
if "patch" in a_lower:
return f"kubectl patch + 手動還原原始 spec或 kubectl rollout undo deployment/{target} -n {namespace}"
return f"kubectl rollout undo deployment/{target} -n {namespace}(通用回滾)"
def _infer_target_state(action: str, target: str) -> str:
"""從動作推導目標狀態描述。"""
a_lower = action.lower()
if "rollout restart" in a_lower:
return f"重新啟動 {target}(全 Pod 滾動重建)"
if "rollout undo" in a_lower:
return f"{target} 回滾至前一個部署版本"
if "scale" in a_lower:
m = re.search(r"--replicas=(\d+)", action)
n = m.group(1) if m else "?"
return f"調整 {target} 副本數至 {n}"
if "set image" in a_lower:
return f"更新 {target} 容器映像"
if "delete pod" in a_lower:
return f"刪除 {target} Pod由 ReplicaSet 自動重建)"
if "apply" in a_lower:
return f"套用新配置到 {target}"
return action[:120]
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_service: DeclarativeRemediation | None = None
def get_declarative_remediation() -> DeclarativeRemediation:
global _service
if _service is None:
_service = DeclarativeRemediation()
return _service

View File

@@ -0,0 +1,235 @@
"""
AWOOOI AIOps Phase 5 — GitOps PR ServiceGitOps 高風險修復 PR
================================================================
職責:當 Blast Radius > 50tier=dual在 Gitea 建立 Issue 記錄修復計畫,
等待雙人審核後方可執行。
設計原則:
1. 只建立 Gitea Issue不直接推 PR— 修復計畫在 Issue 描述,含 rollback plan
2. 非阻塞建立失敗不影響主路徑fallback → 人工審核 Telegram 通知)
3. Issue 標題含 [AI-Repair] 前綴,方便篩選
4. 連線失敗時記錄 warning不拋出例外
NOTE: 使用 Issue 而非 PR 的原因——
修復動作是 kubectl 命令,不是代碼變更,沒有對應 diff 可以 PR。
Issue 提供人類可讀的審計軌跡,並觸發 Gitea 通知。
ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
import structlog
if TYPE_CHECKING:
from src.services.declarative_remediation import DeclarativeSpec
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class GitOpsPRResult:
"""GitOps Issue 建立結果"""
success: bool
issue_url: str | None # Gitea Issue URL
issue_number: int | None # Issue 編號
error: str | None # 失敗時的錯誤訊息
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class GitOpsPRService:
"""
GitOps 高風險修復 Issue 建立器
Usage:
svc = GitOpsPRService()
result = await svc.create_repair_issue(spec, incident_id="INC-001")
if result.success:
print(result.issue_url)
"""
async def create_repair_issue(
self,
spec: "DeclarativeSpec",
incident_id: str,
diagnosis: str = "",
) -> GitOpsPRResult:
"""
在 Gitea 建立高風險修復審核 Issue。
Args:
spec: DeclarativeSpec必須是 tier=dual
incident_id: 關聯的 Incident ID
diagnosis: 診斷摘要(供人類理解上下文)
Returns:
GitOpsPRResult
"""
if not spec.requires_gitops_pr:
return GitOpsPRResult(success=False, issue_url=None, issue_number=None,
error="spec.tier 不是 dual無需 GitOps PR")
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P5_GITOPS_PR:
logger.info(
"gitops_pr_skipped_feature_flag",
incident_id=incident_id,
blast_radius=spec.blast_radius_score,
)
return GitOpsPRResult(success=False, issue_url=None, issue_number=None,
error="AIOPS_P5_GITOPS_PR=False跳過 Gitea Issue 建立")
title = f"[AI-Repair] 高風險修復審核Blast={spec.blast_radius_score})— {incident_id}"
body = _build_issue_body(spec, incident_id, diagnosis)
return await self._create_gitea_issue(title, body, incident_id)
async def _create_gitea_issue(
self,
title: str,
body: str,
incident_id: str,
) -> GitOpsPRResult:
"""呼叫 Gitea API 建立 Issue。"""
import httpx
from src.core.config import settings
url = (
f"{settings.GITEA_API_URL}/repos/"
f"{settings.GITEA_REPO_OWNER}/{settings.GITEA_REPO_NAME}/issues"
)
headers = {
"Authorization": f"token {settings.GITEA_API_TOKEN}",
"Content-Type": "application/json",
}
payload = {
"title": title[:255],
"body": body,
"labels": [],
}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(url, json=payload, headers=headers)
resp.raise_for_status()
data = resp.json()
issue_number = data.get("number")
issue_url = data.get("html_url", "")
logger.info(
"gitops_issue_created",
incident_id=incident_id,
issue_number=issue_number,
issue_url=issue_url,
)
return GitOpsPRResult(
success=True,
issue_url=issue_url,
issue_number=issue_number,
error=None,
)
except Exception as e:
logger.warning(
"gitops_issue_create_failed",
incident_id=incident_id,
error=str(e),
)
return GitOpsPRResult(success=False, issue_url=None, issue_number=None, error=str(e))
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _build_issue_body(
spec: "DeclarativeSpec",
incident_id: str,
diagnosis: str,
) -> str:
"""建立 Gitea Issue 描述Markdown 格式)。"""
constraints_md = "\n".join(f"- {c}" for c in spec.constraints) or "(無額外約束)"
return f"""## AI 自主修復審核請求
**Incident ID**: `{incident_id}`
**Blast Radius Score**: `{spec.blast_radius_score}` / 100tier: `{spec.tier}`
**需要**: 雙人審核後方可執行
---
## 修復計畫
**目標狀態**: {spec.target_state}
**執行命令**:
```bash
{spec.action}
```
**命名空間**: `{spec.namespace}`
**目標資源**: `{spec.target}`
**需要 dry-run**: {'✅ 是' if spec.dry_run_required else '⬜ 否'}
---
## 安全約束
{constraints_md}
---
## 回滾計畫
```bash
{spec.rollback_plan}
```
---
## 爆炸半徑計分依據
{spec.blast_reason}
---
## 診斷摘要
{diagnosis[:1000] if diagnosis else '(未提供診斷摘要)'}
---
## 審核流程
1. SRE-1 確認問題診斷正確,評估修復計畫
2. SRE-2 交叉驗證,確認回滾計畫可行
3. 兩人均在 Telegram 回覆 `/approve {incident_id}` 後,系統自動執行
> 此 Issue 由 AWOOOI AI 自主修復系統Phase 5 ADR-086自動建立。
"""
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_service: GitOpsPRService | None = None
def get_gitops_pr_service() -> GitOpsPRService:
global _service
if _service is None:
_service = GitOpsPRService()
return _service

View File

@@ -586,6 +586,65 @@ class LearningService:
execution_time_seconds=execution_time_seconds,
)
async def record_declarative_outcome(
self,
incident_id: str,
action: str,
blast_radius_score: int,
blast_radius_tier: str,
success: bool,
rollback_triggered: bool = False,
execution_time_seconds: float | None = None,
) -> bool:
"""
記錄 DeclarativeSpec 執行結果到學習系統。
Phase 5 ADR-086DeclarativeSpec 執行結果寫入學習記錄,
讓 AI 能從 Blast Radius 分級的執行歷史中學習。
Args:
incident_id: 關聯 Incident ID
action: 執行的 kubectl 命令
blast_radius_score: 爆炸半徑分數0-100
blast_radius_tier: 執行分級auto/human/dual/blocked
success: 是否執行成功
rollback_triggered: 是否觸發了回滾
execution_time_seconds: 執行耗時
Returns:
bool: 是否成功記錄
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
"""
import json
from src.utils.timezone import now_taipei
try:
anomaly_key = f"declarative:{incident_id}"
fix_desc = json.dumps({
"blast_radius_score": blast_radius_score,
"blast_radius_tier": blast_radius_tier,
"rollback_triggered": rollback_triggered,
"recorded_at": now_taipei().isoformat(),
}, ensure_ascii=False)
return await self._repository.record_repair(
anomaly_key=anomaly_key,
repair_action=action[:200],
success=success,
root_cause=f"blast_radius_tier={blast_radius_tier}",
fix_description=fix_desc,
execution_time_seconds=execution_time_seconds,
)
except Exception as e:
import structlog as _structlog
_structlog.get_logger(__name__).warning(
"record_declarative_outcome_failed",
incident_id=incident_id,
error=str(e),
)
return False
async def get_recommended_fix(self, anomaly_key: str) -> dict:
"""
根據歷史學習,推薦最佳修復方案

View File

@@ -0,0 +1,264 @@
"""
AWOOOI AIOps Phase 5 — Rollback Manager自動回滾管理器
=========================================================
職責:當 PostExecutionVerifier 判斷執行結果為 failed/degraded 時,
自動觸發 Declarative rollbackkubectl rollout undo
設計原則:
1. 只回滾 DeploymentRollout 管理的資源)— StatefulSet / DaemonSet 需人工
2. 回滾前驗證有可回滾的版本kubectl rollout history revision ≥ 2
3. 回滾後:等待 120s 確認 rollout 收斂,記錄結果
4. 失敗降級:回滾自身失敗 → Tier 0 告警 + 人工介入
連結點:
PostExecutionVerifier.assess_recovery() → TriggerRollback.trigger()
→ KubernetesMCPProvider.execute("kubectl rollout undo")
ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
import structlog
from src.utils.timezone import now_taipei
if TYPE_CHECKING:
from src.services.declarative_remediation import DeclarativeSpec
logger = structlog.get_logger(__name__)
# 回滾後等待收斂的超時(秒)
ROLLBACK_CONVERGENCE_TIMEOUT_SEC = 120
# 回滾最大重試次數(防止無限 loop
ROLLBACK_MAX_RETRIES = 1
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class RollbackResult:
"""回滾執行結果"""
success: bool
incident_id: str
deployment: str
namespace: str
rollback_command: str # 執行的 kubectl rollout undo 命令
convergence_confirmed: bool # rollout status 確認收斂
error: str | None
triggered_at: str
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class RollbackManager:
"""
自動回滾管理器
Usage:
mgr = RollbackManager()
result = await mgr.trigger(
incident_id="INC-001",
spec=declarative_spec,
verification_result="failed",
)
"""
async def trigger(
self,
incident_id: str,
spec: "DeclarativeSpec",
verification_result: str,
) -> RollbackResult:
"""
根據驗證結果決定是否回滾,並執行。
Args:
incident_id: 關聯 Incident ID
spec: 原始 DeclarativeSpec提供 rollback_plan
verification_result: PostExecutionVerifier 結果("failed" / "degraded"
Returns:
RollbackResult不管成敗都回傳不 raise
"""
triggered_at = now_taipei().isoformat()
# 只有 failed / degraded 才觸發回滾
if verification_result not in ("failed", "degraded"):
return RollbackResult(
success=False,
incident_id=incident_id,
deployment=spec.target,
namespace=spec.namespace,
rollback_command="",
convergence_confirmed=False,
error=f"verification_result={verification_result},無需回滾",
triggered_at=triggered_at,
)
logger.warning(
"rollback_triggered",
incident_id=incident_id,
deployment=spec.target,
namespace=spec.namespace,
verification_result=verification_result,
original_action=spec.action[:80],
)
rollback_command = (
f"kubectl rollout undo deployment/{spec.target} -n {spec.namespace}"
)
try:
# 1. 確認有可回滾的版本
can_rollback = await self._has_previous_revision(spec.target, spec.namespace)
if not can_rollback:
return RollbackResult(
success=False,
incident_id=incident_id,
deployment=spec.target,
namespace=spec.namespace,
rollback_command=rollback_command,
convergence_confirmed=False,
error="無前一個 revision 可回滾rollout history 只有 1 個版本)",
triggered_at=triggered_at,
)
# 2. 執行回滾
exec_result = await self._execute_rollback(rollback_command, spec.namespace)
if not exec_result:
return RollbackResult(
success=False,
incident_id=incident_id,
deployment=spec.target,
namespace=spec.namespace,
rollback_command=rollback_command,
convergence_confirmed=False,
error="kubectl rollout undo 執行失敗",
triggered_at=triggered_at,
)
# 3. 等待收斂(非阻塞:用 rollout status 確認)
converged = await self._wait_convergence(spec.target, spec.namespace)
result = RollbackResult(
success=True,
incident_id=incident_id,
deployment=spec.target,
namespace=spec.namespace,
rollback_command=rollback_command,
convergence_confirmed=converged,
error=None if converged else "rollout status 超時未收斂,人工確認",
triggered_at=triggered_at,
)
logger.info(
"rollback_completed",
incident_id=incident_id,
deployment=spec.target,
converged=converged,
)
return result
except Exception as e:
logger.error(
"rollback_failed_unexpected",
incident_id=incident_id,
deployment=spec.target,
error=str(e),
)
return RollbackResult(
success=False,
incident_id=incident_id,
deployment=spec.target,
namespace=spec.namespace,
rollback_command=rollback_command,
convergence_confirmed=False,
error=str(e),
triggered_at=triggered_at,
)
# ──────────────────────────────────────────────────────────────────────────
# Private Helpers
# ──────────────────────────────────────────────────────────────────────────
async def _has_previous_revision(self, deployment: str, namespace: str) -> bool:
"""確認 Deployment 有前一個 revisionrollout history ≥ 2"""
from src.services.k8s_mcp import get_kubernetes_mcp
k8s = get_kubernetes_mcp()
history_cmd = f"kubectl rollout history deployment/{deployment} -n {namespace}"
try:
result = await k8s.execute(history_cmd)
if not result.success:
logger.warning("rollback_history_check_failed", deployment=deployment, error=result.error)
return False
# rollout history 輸出含 "REVISION" 列 + 至少 2 行資料才有前一版本
lines = [l for l in (result.output or "").splitlines() if l.strip() and not l.startswith("REVISION")]
return len(lines) >= 2
except Exception as e:
logger.warning("rollback_history_error", deployment=deployment, error=str(e))
return False
async def _execute_rollback(self, command: str, namespace: str) -> bool:
"""執行 kubectl rollout undo。"""
from src.services.k8s_mcp import get_kubernetes_mcp
k8s = get_kubernetes_mcp()
try:
result = await k8s.execute(command)
if result.success:
logger.info("rollback_command_executed", command=command[:120])
return True
logger.warning("rollback_command_failed", command=command[:120], error=result.error)
return False
except Exception as e:
logger.error("rollback_execute_error", command=command[:120], error=str(e))
return False
async def _wait_convergence(self, deployment: str, namespace: str) -> bool:
"""等待 rollout 收斂(最多 ROLLBACK_CONVERGENCE_TIMEOUT_SEC 秒)。"""
from src.services.k8s_mcp import get_kubernetes_mcp
k8s = get_kubernetes_mcp()
status_cmd = (
f"kubectl rollout status deployment/{deployment} -n {namespace} "
f"--timeout={ROLLBACK_CONVERGENCE_TIMEOUT_SEC}s"
)
try:
result = await k8s.execute(status_cmd)
converged = result.success and "successfully rolled out" in (result.output or "").lower()
if not converged:
logger.warning(
"rollback_convergence_timeout",
deployment=deployment,
namespace=namespace,
output=(result.output or "")[:200],
)
return converged
except Exception as e:
logger.warning("rollback_convergence_check_error", deployment=deployment, error=str(e))
return False
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_manager: RollbackManager | None = None
def get_rollback_manager() -> RollbackManager:
global _manager
if _manager is None:
_manager = RollbackManager()
return _manager