diff --git a/apps/api/src/services/blast_radius_calculator.py b/apps/api/src/services/blast_radius_calculator.py new file mode 100644 index 00000000..9cf8b2f2 --- /dev/null +++ b/apps/api/src/services/blast_radius_calculator.py @@ -0,0 +1,234 @@ +""" +AWOOOI AIOps Phase 5 — Blast Radius Calculator(爆炸半徑計算器) +=============================================================== +職責:計算修復動作的爆炸半徑分數(0-100),決定執行分級。 + +分級邏輯: + ≤ 10 → auto 自動執行(低衝擊) + 11-50 → human 需一人審核(中衝擊) + 51-99 → dual 需雙人審核 + GitOps PR(高衝擊) + 100 → blocked HARD_RULES 永擋(任何情況不執行) + +設計原則: +- 保守計分:不確定情境一律視為高分(> 50) +- HARD_RULES 優先:任何永擋 pattern 立刻返回 100,不繼續計算 +- 純函數(Stateless):不依賴 DB/Redis,確保呼叫端可同步執行 +- 可審計:每次計算回傳 reason 記錄計分依據 + +ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立 +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass + +import structlog + +logger = structlog.get_logger(__name__) + +# ── 分級閾值 ────────────────────────────────────────────────────────────────── +TIER_AUTO_MAX = 10 # ≤ 10 → auto +TIER_HUMAN_MAX = 50 # 11-50 → human +TIER_DUAL_MAX = 99 # 51-99 → dual +BLAST_BLOCKED = 100 # = 100 → permanent block + +# ── 基礎分(Kubectl 動作類型)──────────────────────────────────────────────── +_BASE_SCORES: list[tuple[str, int, str]] = [ + # (regex pattern, base_score, reason) + (r"kubectl\s+rollout\s+restart", 10, "rollout restart 低衝擊"), + (r"kubectl\s+rollout\s+undo", 25, "rollout undo 中衝擊(版本回退)"), + (r"kubectl\s+scale.*--replicas=[1-9]", 15, "scale up/down 低中衝擊"), + (r"kubectl\s+scale.*--replicas=0", 60, "scale to zero 高衝擊(停服)"), + (r"kubectl\s+apply", 40, "apply 中高衝擊(配置變更)"), + (r"kubectl\s+patch", 45, "patch 中高衝擊"), + (r"kubectl\s+set\s+image", 35, "set image 中衝擊"), + (r"kubectl\s+delete\s+pod", 30, "delete pod 中衝擊(Pod 重建)"), + (r"kubectl\s+delete\s+deployment", 75, "delete deployment 高衝擊"), + (r"kubectl\s+delete\s+service", 70, "delete service 高衝擊(流量中斷)"), + (r"kubectl\s+delete\s+namespace", BLAST_BLOCKED, 
"delete namespace 永擋"), + (r"kubectl\s+delete\s+pv\b", BLAST_BLOCKED, "delete PV 永擋(資料遺失)"), + (r"kubectl\s+delete\s+pvc\b", BLAST_BLOCKED, "delete PVC 永擋(資料遺失)"), + (r"kubectl\s+delete\s+clusterrole", BLAST_BLOCKED, "delete ClusterRole 永擋(RBAC 毀損)"), + (r"kubectl\s+delete\s+secret", 80, "delete secret 高衝擊"), + (r"kubectl\s+delete\s+configmap", 55, "delete configmap 高衝擊"), + (r"kubectl\s+exec", 65, "exec 高衝擊(互動式執行)"), + (r"kubectl\s+cp\b", 50, "cp 中高衝擊"), + (r"kubectl\s+drain", 80, "drain node 高衝擊"), + (r"kubectl\s+cordon", 55, "cordon node 高衝擊"), + (r"kubectl\s+taint", 60, "taint 高衝擊"), +] + +# ── 永擋命令清單(不含 kubectl 的危險操作)─────────────────────────────────── +_HARD_BLOCK_PATTERNS: list[tuple[str, str]] = [ + (r"rm\s+-rf", "rm -rf 永擋(資料刪除)"), + (r"DROP\s+TABLE", "DROP TABLE 永擋(DB 資料刪除)"), + (r"DROP\s+DATABASE", "DROP DATABASE 永擋"), + (r"TRUNCATE\s+TABLE", "TRUNCATE TABLE 永擋"), + (r"kubectl\s+exec.*?--.*?rm\b", "kubectl exec rm 永擋"), + (r"kubectl\s+exec.*?--.*?kill\b", "kubectl exec kill 永擋"), +] + +# ── 高風險命名空間 ────────────────────────────────────────────────────────── +_CRITICAL_NAMESPACES = {"kube-system", "kube-public", "kube-node-lease", "monitoring", "gitea"} + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class BlastRadiusResult: + """爆炸半徑計算結果""" + score: int # 0-100(100 = 永擋) + tier: str # "auto" / "human" / "dual" / "blocked" + reason: str # 計分依據(可審計) + hard_blocked: bool # True = HARD_RULES 永擋 + blocked_reason: str | None # 永擋時的具體原因 + + +# ───────────────────────────────────────────────────────────────────────────── +# Calculator +# ───────────────────────────────────────────────────────────────────────────── + +class BlastRadiusCalculator: + """ + 爆炸半徑計算器 + + Usage: + calc = BlastRadiusCalculator() + result = calc.calculate(action="kubectl rollout restart deployment/awoooi-api", + namespace="awoooi-prod") + if 
result.tier == "auto": + # 可自動執行 + """ + + def calculate( + self, + action: str, + namespace: str = "awoooi-prod", + target: str = "", + ) -> BlastRadiusResult: + """ + 計算動作的爆炸半徑分數。 + + Args: + action: 修復命令(kubectl command or description) + namespace: 目標命名空間 + target: 目標資源名稱 + + Returns: + BlastRadiusResult(包含分數、分級、計分依據) + """ + action_lower = action.lower() + reasons: list[str] = [] + + # 1. HARD_RULES 永擋優先檢查 + for pattern, block_reason in _HARD_BLOCK_PATTERNS: + if re.search(pattern, action, re.IGNORECASE): + logger.warning( + "blast_radius_hard_blocked", + action=action[:120], + reason=block_reason, + ) + return BlastRadiusResult( + score=BLAST_BLOCKED, + tier="blocked", + reason=block_reason, + hard_blocked=True, + blocked_reason=block_reason, + ) + + # 2. 基礎分(依 kubectl 動作類型) + base_score = 50 # 保守預設:未知動作 = 50(human tier) + matched_base_reason = "未知 kubectl 動作,保守 50 分" + + for pattern, score, reason in _BASE_SCORES: + if re.search(pattern, action, re.IGNORECASE): + if score == BLAST_BLOCKED: + return BlastRadiusResult( + score=BLAST_BLOCKED, + tier="blocked", + reason=reason, + hard_blocked=True, + blocked_reason=reason, + ) + base_score = score + matched_base_reason = reason + break + + reasons.append(f"基礎分 {base_score}:{matched_base_reason}") + + # 3. 命名空間倍率 + ns_multiplier = 1.0 + if namespace in _CRITICAL_NAMESPACES: + ns_multiplier = 2.5 + reasons.append(f"命名空間 {namespace} × 2.5(系統級)") + elif namespace == "default": + ns_multiplier = 1.8 + reasons.append("default 命名空間 × 1.8(全域影響)") + + # 4. 追加修正(replicas=0 已在 BASE_SCORES,其他特殊情境) + bonus = 0 + if "--force" in action_lower: + bonus += 20 + reasons.append("+20:--force flag 危險") + if "kube-system" in action_lower: + bonus += 40 + reasons.append("+40:kube-system 目標") + if "all" in action_lower and "kubectl delete" in action_lower: + bonus += 30 + reasons.append("+30:delete all 批量刪除") + + # 5. 
最終分數 + raw_score = base_score * ns_multiplier + bonus + final_score = min(int(raw_score), 99) # 保留 100 給 HARD_RULES + + tier = _score_to_tier(final_score) + reasons.append(f"最終分 {final_score} → {tier}") + + reason_str = ";".join(reasons) + logger.debug( + "blast_radius_calculated", + score=final_score, + tier=tier, + action=action[:80], + reason=reason_str, + ) + return BlastRadiusResult( + score=final_score, + tier=tier, + reason=reason_str, + hard_blocked=False, + blocked_reason=None, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _score_to_tier(score: int) -> str: + if score <= TIER_AUTO_MAX: + return "auto" + elif score <= TIER_HUMAN_MAX: + return "human" + elif score <= TIER_DUAL_MAX: + return "dual" + else: + return "blocked" + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_calculator: BlastRadiusCalculator | None = None + + +def get_blast_radius_calculator() -> BlastRadiusCalculator: + global _calculator + if _calculator is None: + _calculator = BlastRadiusCalculator() + return _calculator diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py index 4375a8ed..725a6daa 100644 --- a/apps/api/src/services/decision_manager.py +++ b/apps/api/src/services/decision_manager.py @@ -1471,6 +1471,84 @@ class DecisionManager: ) return + # Phase 5 ADR-086: Blast Radius 分級守衛(AIOPS_P5_BLAST_RADIUS_CHECK 控制) + # 評估修復動作的爆炸半徑,決定是否可自動執行或需升級審核 + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立 + try: + from src.core.feature_flags import aiops_flags as _p5_flags + if _p5_flags.AIOPS_P5_BLAST_RADIUS_CHECK: + from src.services.blast_radius_calculator import get_blast_radius_calculator + from src.services.declarative_remediation import get_declarative_remediation 
+ + _calc = get_blast_radius_calculator() + _blast = _calc.calculate(action, namespace=_ns, target=_target) + _spec = get_declarative_remediation().evaluate( + action=action, target=_target, namespace=_ns, + description=token.proposal_data.get("description", ""), + ) + + # 記錄分級結果到 proposal_data(供學習 + 審計) + token.proposal_data["blast_radius_score"] = _blast.score + token.proposal_data["blast_radius_tier"] = _blast.tier + token.proposal_data["blast_radius_reason"] = _blast.reason + + if _blast.tier == "blocked": + # HARD_RULES 永擋 + logger.warning( + "auto_execute_blast_radius_hard_blocked", + incident_id=incident.incident_id, + action=action[:80], + reason=_blast.reason, + ) + token.state = DecisionState.READY + token.proposal_data["auto_executed"] = False + token.proposal_data["mcp_all_failed"] = True + token.proposal_data["blocked_reason"] = f"HARD_RULES 永擋:{_blast.reason}" + await self._save_token(token) + _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data)) + return + + elif _blast.tier in ("human", "dual"): + # 中高衝擊 → 升級人工審核,不自動執行 + logger.info( + "auto_execute_blast_radius_escalated", + incident_id=incident.incident_id, + tier=_blast.tier, + score=_blast.score, + action=action[:80], + ) + token.state = DecisionState.READY + token.proposal_data["auto_executed"] = False + token.proposal_data["requires_human_review"] = True + token.proposal_data["blast_radius_escalated"] = True + await self._save_token(token) + # dual tier → 非同步建立 GitOps Issue + if _blast.tier == "dual" and _p5_flags.AIOPS_P5_GITOPS_PR: + from src.services.gitops_pr_service import get_gitops_pr_service + _fire_and_forget( + get_gitops_pr_service().create_repair_issue( + spec=_spec, + incident_id=incident.incident_id, + diagnosis=token.proposal_data.get("debate_summary", ""), + ) + ) + _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data)) + return + # tier == "auto" → 繼續自動執行流程 + except Exception as _blast_err: + # Blast Radius 計算失敗 → 保守:視為 human 
tier,升級人工審核 + logger.warning( + "blast_radius_check_failed_conservative_escalate", + incident_id=incident.incident_id, + error=str(_blast_err), + ) + token.state = DecisionState.READY + token.proposal_data["auto_executed"] = False + token.proposal_data["blast_radius_tier"] = "unknown_conservative" + await self._save_token(token) + _fire_and_forget(_push_decision_to_telegram(incident, token.proposal_data)) + return + try: # 延遲導入避免循環依賴 from src.models.approval import ApprovalRequest, ApprovalStatus diff --git a/apps/api/src/services/declarative_remediation.py b/apps/api/src/services/declarative_remediation.py new file mode 100644 index 00000000..be1f32b9 --- /dev/null +++ b/apps/api/src/services/declarative_remediation.py @@ -0,0 +1,260 @@ +""" +AWOOOI AIOps Phase 5 — Declarative Remediation(宣告式修復) +============================================================= +職責:將修復動作包裝為 DeclarativeSpec,依爆炸半徑分四級分控。 + +分級邏輯: + tier=auto (≤10) → 可自動執行,必要時 dry-run 確認 + tier=human (11-50) → 送人工審核(ApprovalRequest),不自動執行 + tier=dual (51-99) → 需雙人審核 + GitOps PR + tier=blocked (100) → HARD_RULES 永擋,永不執行 + +設計原則: +1. 主入口 evaluate() 是純函數,不呼叫任何 I/O +2. dry_run_required 由 AIOPS_P5_DRY_RUN_ENFORCED flag 控制 +3. rollback_plan 自動從 kubectl 動作推導(不問 LLM,確保穩定性) +4. 
DeclarativeSpec 是不可變 dataclass(frozen=True) + +ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立 +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass + +import structlog + +from src.services.blast_radius_calculator import ( + BlastRadiusResult, + get_blast_radius_calculator, +) + +logger = structlog.get_logger(__name__) + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class DeclarativeSpec: + """ + 宣告式修復規格(不可變) + + 代替舊的直接 kubectl 字串,成為決策層統一輸出格式。 + 下游(_auto_execute / GitOps PR / RollbackManager)依此判斷如何執行。 + """ + action: str # kubectl 命令(已解析好 target/namespace) + target_state: str # 目標狀態描述(人類可讀) + blast_radius_score: int # 0-100 + tier: str # "auto" / "human" / "dual" / "blocked" + dry_run_required: bool # 執行前是否必須 dry-run + constraints: list[str] # 安全約束條件 + rollback_plan: str # 如何回滾 + namespace: str # 目標命名空間 + target: str # 目標資源 + blocked_reason: str | None # tier=blocked 時的永擋原因 + blast_reason: str # 計分依據(可審計) + + @property + def can_auto_execute(self) -> bool: + """是否可自動執行(tier=auto 且未被永擋)""" + return self.tier == "auto" and not self.blocked_reason + + @property + def requires_gitops_pr(self) -> bool: + """是否需要 GitOps PR(tier=dual)""" + return self.tier == "dual" + + @property + def requires_human_approval(self) -> bool: + """是否需要人工審核(human 或 dual)""" + return self.tier in ("human", "dual") + + def to_dict(self) -> dict: + return { + "action": self.action, + "target_state": self.target_state, + "blast_radius_score": self.blast_radius_score, + "tier": self.tier, + "dry_run_required": self.dry_run_required, + "constraints": self.constraints, + "rollback_plan": self.rollback_plan, + "namespace": self.namespace, + "target": self.target, + "blocked_reason": self.blocked_reason, + "blast_reason": self.blast_reason, + } + + +# 
class DeclarativeRemediation:
    """
    Evaluator that wraps a remediation action into a DeclarativeSpec.

    Usage:
        svc = DeclarativeRemediation()
        spec = svc.evaluate(
            action="kubectl rollout restart deployment/awoooi-api",
            target="awoooi-api",
            namespace="awoooi-prod",
        )
        if spec.can_auto_execute:
            # execute directly
    """

    def evaluate(
        self,
        action: str,
        target: str = "",
        namespace: str = "awoooi-prod",
        description: str = "",
    ) -> DeclarativeSpec:
        """
        Evaluate a remediation action and return its DeclarativeSpec.

        Args:
            action: kubectl command (placeholders already substituted).
            target: Target resource name.
            namespace: Target namespace.
            description: Human-readable description; when empty, one is
                derived from the action.

        Returns:
            DeclarativeSpec carrying the tier decision.
        """
        from src.core.feature_flags import aiops_flags

        # Score the action first; everything else hangs off the result.
        blast_result: BlastRadiusResult = get_blast_radius_calculator().calculate(
            action, namespace=namespace, target=target
        )

        # The dry-run requirement is flag-controlled and never set for blocked actions.
        needs_dry_run = aiops_flags.AIOPS_P5_DRY_RUN_ENFORCED and blast_result.tier != "blocked"

        safety_constraints = _build_constraints(action, namespace, blast_result.score)
        undo_plan = _infer_rollback_plan(action, target, namespace)
        goal_state = description if description else _infer_target_state(action, target)

        evaluated = DeclarativeSpec(
            action=action,
            target_state=goal_state,
            blast_radius_score=blast_result.score,
            tier=blast_result.tier,
            dry_run_required=needs_dry_run,
            constraints=safety_constraints,
            rollback_plan=undo_plan,
            namespace=namespace,
            target=target,
            blocked_reason=blast_result.blocked_reason,
            blast_reason=blast_result.reason,
        )

        logger.info(
            "declarative_spec_evaluated",
            tier=evaluated.tier,
            blast_radius=evaluated.blast_radius_score,
            can_auto=evaluated.can_auto_execute,
            action=action[:80],
        )
        return evaluated
def _build_constraints(action: str, namespace: str, score: int) -> list[str]:
    """Build the list of safety constraints implied by the action."""
    constraints: list[str] = []

    if namespace != "awoooi-prod":
        constraints.append(f"target_namespace={namespace}(非 prod 環境)")

    if "scale" in action.lower():
        # Scaling: flag scale-to-zero, otherwise record a replica floor.
        m = re.search(r"--replicas=(\d+)", action)
        if m:
            replicas = int(m.group(1))
            if replicas == 0:
                constraints.append("scale_to_zero=true(停服風險)")
            else:
                constraints.append(f"min_replicas={max(1, replicas - 1)}(防過度縮減)")

    if score >= 40:
        constraints.append("pre_execute_snapshot_required(高衝擊需先存狀態)")

    if "rollout undo" in action.lower():
        constraints.append("verify_previous_revision_exists(確認有上一版本可回滾)")

    return constraints


def _infer_rollback_plan(action: str, target: str, namespace: str) -> str:
    """
    Derive a rollback plan from the action text (no LLM call).

    The first matching keyword decides the plan; unknown actions fall back to
    a generic ``rollout undo``.
    """
    a_lower = action.lower()

    if "rollout restart" in a_lower:
        return f"kubectl rollout undo deployment/{target} -n {namespace}"
    if "rollout undo" in a_lower:
        return f"kubectl rollout status deployment/{target} -n {namespace} && 手動確認回滾版本"
    if "scale" in a_lower:
        # The replica count in the action is the NEW value. Reusing it would
        # make the rollback re-apply the change, and the previous count is not
        # knowable here, so the plan carries a placeholder the operator fills
        # from the pre-execute snapshot.
        m = re.search(r"--replicas=(\d+)", action)
        applied = m.group(1) if m else "?"
        return (
            f"kubectl scale deployment/{target} --replicas=<執行前副本數> -n {namespace}"
            f"(執行前需先記錄原副本數;本次變更設為 {applied})"
        )
    if "apply" in a_lower:
        return f"kubectl rollout undo deployment/{target} -n {namespace}"
    if "set image" in a_lower:
        return f"kubectl rollout undo deployment/{target} -n {namespace}"
    if "delete pod" in a_lower:
        return "Pod 會由 ReplicaSet 自動重建;若 Deployment 異常請執行 rollout undo"
    if "patch" in a_lower:
        return f"kubectl patch + 手動還原原始 spec,或 kubectl rollout undo deployment/{target} -n {namespace}"

    return f"kubectl rollout undo deployment/{target} -n {namespace}(通用回滾)"


def _infer_target_state(action: str, target: str) -> str:
    """Derive a human-readable goal-state description from the action text."""
    a_lower = action.lower()

    if "rollout restart" in a_lower:
        return f"重新啟動 {target}(全 Pod 滾動重建)"
    if "rollout undo" in a_lower:
        return f"將 {target} 回滾至前一個部署版本"
    if "scale" in a_lower:
        m = re.search(r"--replicas=(\d+)", action)
        n = m.group(1) if m else "?"
        return f"調整 {target} 副本數至 {n}"
    if "set image" in a_lower:
        return f"更新 {target} 容器映像"
    if "delete pod" in a_lower:
        return f"刪除 {target} Pod(由 ReplicaSet 自動重建)"
    if "apply" in a_lower:
        return f"套用新配置到 {target}"

    # Unknown verb: fall back to the (truncated) command itself.
    return action[:120]


# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────

_service: "DeclarativeRemediation | None" = None


def get_declarative_remediation() -> "DeclarativeRemediation":
    """Return the process-wide evaluator, creating it on first use."""
    global _service
    if _service is None:
        _service = DeclarativeRemediation()
    return _service
連線失敗時記錄 warning,不拋出例外 + +NOTE: 使用 Issue 而非 PR 的原因—— + 修復動作是 kubectl 命令,不是代碼變更,沒有對應 diff 可以 PR。 + Issue 提供人類可讀的審計軌跡,並觸發 Gitea 通知。 + +ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立 +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import structlog + +if TYPE_CHECKING: + from src.services.declarative_remediation import DeclarativeSpec + +logger = structlog.get_logger(__name__) + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class GitOpsPRResult: + """GitOps Issue 建立結果""" + success: bool + issue_url: str | None # Gitea Issue URL + issue_number: int | None # Issue 編號 + error: str | None # 失敗時的錯誤訊息 + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class GitOpsPRService: + """ + GitOps 高風險修復 Issue 建立器 + + Usage: + svc = GitOpsPRService() + result = await svc.create_repair_issue(spec, incident_id="INC-001") + if result.success: + print(result.issue_url) + """ + + async def create_repair_issue( + self, + spec: "DeclarativeSpec", + incident_id: str, + diagnosis: str = "", + ) -> GitOpsPRResult: + """ + 在 Gitea 建立高風險修復審核 Issue。 + + Args: + spec: DeclarativeSpec(必須是 tier=dual) + incident_id: 關聯的 Incident ID + diagnosis: 診斷摘要(供人類理解上下文) + + Returns: + GitOpsPRResult + """ + if not spec.requires_gitops_pr: + return GitOpsPRResult(success=False, issue_url=None, issue_number=None, + error="spec.tier 不是 dual,無需 GitOps PR") + + from src.core.feature_flags import aiops_flags + if not aiops_flags.AIOPS_P5_GITOPS_PR: + logger.info( + "gitops_pr_skipped_feature_flag", + incident_id=incident_id, + blast_radius=spec.blast_radius_score, + ) + return GitOpsPRResult(success=False, 
issue_url=None, issue_number=None, + error="AIOPS_P5_GITOPS_PR=False,跳過 Gitea Issue 建立") + + title = f"[AI-Repair] 高風險修復審核(Blast={spec.blast_radius_score})— {incident_id}" + body = _build_issue_body(spec, incident_id, diagnosis) + + return await self._create_gitea_issue(title, body, incident_id) + + async def _create_gitea_issue( + self, + title: str, + body: str, + incident_id: str, + ) -> GitOpsPRResult: + """呼叫 Gitea API 建立 Issue。""" + import httpx + from src.core.config import settings + + url = ( + f"{settings.GITEA_API_URL}/repos/" + f"{settings.GITEA_REPO_OWNER}/{settings.GITEA_REPO_NAME}/issues" + ) + headers = { + "Authorization": f"token {settings.GITEA_API_TOKEN}", + "Content-Type": "application/json", + } + payload = { + "title": title[:255], + "body": body, + "labels": [], + } + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(url, json=payload, headers=headers) + resp.raise_for_status() + data = resp.json() + + issue_number = data.get("number") + issue_url = data.get("html_url", "") + + logger.info( + "gitops_issue_created", + incident_id=incident_id, + issue_number=issue_number, + issue_url=issue_url, + ) + return GitOpsPRResult( + success=True, + issue_url=issue_url, + issue_number=issue_number, + error=None, + ) + + except Exception as e: + logger.warning( + "gitops_issue_create_failed", + incident_id=incident_id, + error=str(e), + ) + return GitOpsPRResult(success=False, issue_url=None, issue_number=None, error=str(e)) + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _build_issue_body( + spec: "DeclarativeSpec", + incident_id: str, + diagnosis: str, +) -> str: + """建立 Gitea Issue 描述(Markdown 格式)。""" + constraints_md = "\n".join(f"- {c}" for c in spec.constraints) or "(無額外約束)" + + return f"""## AI 自主修復審核請求 + +**Incident ID**: `{incident_id}` +**Blast Radius Score**: 
`{spec.blast_radius_score}` / 100(tier: `{spec.tier}`) +**需要**: 雙人審核後方可執行 + +--- + +## 修復計畫 + +**目標狀態**: {spec.target_state} + +**執行命令**: +```bash +{spec.action} +``` + +**命名空間**: `{spec.namespace}` +**目標資源**: `{spec.target}` +**需要 dry-run**: {'✅ 是' if spec.dry_run_required else '⬜ 否'} + +--- + +## 安全約束 + +{constraints_md} + +--- + +## 回滾計畫 + +```bash +{spec.rollback_plan} +``` + +--- + +## 爆炸半徑計分依據 + +{spec.blast_reason} + +--- + +## 診斷摘要 + +{diagnosis[:1000] if diagnosis else '(未提供診斷摘要)'} + +--- + +## 審核流程 + +1. SRE-1 確認問題診斷正確,評估修復計畫 +2. SRE-2 交叉驗證,確認回滾計畫可行 +3. 兩人均在 Telegram 回覆 `/approve {incident_id}` 後,系統自動執行 + +> 此 Issue 由 AWOOOI AI 自主修復系統(Phase 5 ADR-086)自動建立。 +""" + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_service: GitOpsPRService | None = None + + +def get_gitops_pr_service() -> GitOpsPRService: + global _service + if _service is None: + _service = GitOpsPRService() + return _service diff --git a/apps/api/src/services/learning_service.py b/apps/api/src/services/learning_service.py index d7cc8cb7..c2938e6a 100644 --- a/apps/api/src/services/learning_service.py +++ b/apps/api/src/services/learning_service.py @@ -586,6 +586,65 @@ class LearningService: execution_time_seconds=execution_time_seconds, ) + async def record_declarative_outcome( + self, + incident_id: str, + action: str, + blast_radius_score: int, + blast_radius_tier: str, + success: bool, + rollback_triggered: bool = False, + execution_time_seconds: float | None = None, + ) -> bool: + """ + 記錄 DeclarativeSpec 執行結果到學習系統。 + + Phase 5 ADR-086:DeclarativeSpec 執行結果寫入學習記錄, + 讓 AI 能從 Blast Radius 分級的執行歷史中學習。 + + Args: + incident_id: 關聯 Incident ID + action: 執行的 kubectl 命令 + blast_radius_score: 爆炸半徑分數(0-100) + blast_radius_tier: 執行分級(auto/human/dual/blocked) + success: 是否執行成功 + rollback_triggered: 是否觸發了回滾 + execution_time_seconds: 執行耗時 + + Returns: + bool: 是否成功記錄 + + 
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立 + """ + import json + from src.utils.timezone import now_taipei + + try: + anomaly_key = f"declarative:{incident_id}" + fix_desc = json.dumps({ + "blast_radius_score": blast_radius_score, + "blast_radius_tier": blast_radius_tier, + "rollback_triggered": rollback_triggered, + "recorded_at": now_taipei().isoformat(), + }, ensure_ascii=False) + + return await self._repository.record_repair( + anomaly_key=anomaly_key, + repair_action=action[:200], + success=success, + root_cause=f"blast_radius_tier={blast_radius_tier}", + fix_description=fix_desc, + execution_time_seconds=execution_time_seconds, + ) + except Exception as e: + import structlog as _structlog + _structlog.get_logger(__name__).warning( + "record_declarative_outcome_failed", + incident_id=incident_id, + error=str(e), + ) + return False + async def get_recommended_fix(self, anomaly_key: str) -> dict: """ 根據歷史學習,推薦最佳修復方案 diff --git a/apps/api/src/services/rollback_manager.py b/apps/api/src/services/rollback_manager.py new file mode 100644 index 00000000..dcb7d213 --- /dev/null +++ b/apps/api/src/services/rollback_manager.py @@ -0,0 +1,264 @@ +""" +AWOOOI AIOps Phase 5 — Rollback Manager(自動回滾管理器) +========================================================= +職責:當 PostExecutionVerifier 判斷執行結果為 failed/degraded 時, + 自動觸發 Declarative rollback(kubectl rollout undo)。 + +設計原則: +1. 只回滾 Deployment(Rollout 管理的資源)— StatefulSet / DaemonSet 需人工 +2. 回滾前:驗證有可回滾的版本(kubectl rollout history revision ≥ 2) +3. 回滾後:等待 120s 確認 rollout 收斂,記錄結果 +4. 
失敗降級:回滾自身失敗 → Tier 0 告警 + 人工介入 + +連結點: + PostExecutionVerifier.assess_recovery() → TriggerRollback.trigger() + → KubernetesMCPProvider.execute("kubectl rollout undo") + +ADR-086: Phase 5 Declarative 修復與 Blast Radius 分控 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 5 初始建立 +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import structlog + +from src.utils.timezone import now_taipei + +if TYPE_CHECKING: + from src.services.declarative_remediation import DeclarativeSpec + +logger = structlog.get_logger(__name__) + +# 回滾後等待收斂的超時(秒) +ROLLBACK_CONVERGENCE_TIMEOUT_SEC = 120 + +# 回滾最大重試次數(防止無限 loop) +ROLLBACK_MAX_RETRIES = 1 + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class RollbackResult: + """回滾執行結果""" + success: bool + incident_id: str + deployment: str + namespace: str + rollback_command: str # 執行的 kubectl rollout undo 命令 + convergence_confirmed: bool # rollout status 確認收斂 + error: str | None + triggered_at: str + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class RollbackManager: + """ + 自動回滾管理器 + + Usage: + mgr = RollbackManager() + result = await mgr.trigger( + incident_id="INC-001", + spec=declarative_spec, + verification_result="failed", + ) + """ + + async def trigger( + self, + incident_id: str, + spec: "DeclarativeSpec", + verification_result: str, + ) -> RollbackResult: + """ + 根據驗證結果決定是否回滾,並執行。 + + Args: + incident_id: 關聯 Incident ID + spec: 原始 DeclarativeSpec(提供 rollback_plan) + verification_result: PostExecutionVerifier 結果("failed" / "degraded") + + Returns: + RollbackResult(不管成敗都回傳,不 raise) + """ + triggered_at = now_taipei().isoformat() + + # 只有 failed / degraded 才觸發回滾 + if verification_result not in 
("failed", "degraded"): + return RollbackResult( + success=False, + incident_id=incident_id, + deployment=spec.target, + namespace=spec.namespace, + rollback_command="", + convergence_confirmed=False, + error=f"verification_result={verification_result},無需回滾", + triggered_at=triggered_at, + ) + + logger.warning( + "rollback_triggered", + incident_id=incident_id, + deployment=spec.target, + namespace=spec.namespace, + verification_result=verification_result, + original_action=spec.action[:80], + ) + + rollback_command = ( + f"kubectl rollout undo deployment/{spec.target} -n {spec.namespace}" + ) + + try: + # 1. 確認有可回滾的版本 + can_rollback = await self._has_previous_revision(spec.target, spec.namespace) + if not can_rollback: + return RollbackResult( + success=False, + incident_id=incident_id, + deployment=spec.target, + namespace=spec.namespace, + rollback_command=rollback_command, + convergence_confirmed=False, + error="無前一個 revision 可回滾(rollout history 只有 1 個版本)", + triggered_at=triggered_at, + ) + + # 2. 執行回滾 + exec_result = await self._execute_rollback(rollback_command, spec.namespace) + if not exec_result: + return RollbackResult( + success=False, + incident_id=incident_id, + deployment=spec.target, + namespace=spec.namespace, + rollback_command=rollback_command, + convergence_confirmed=False, + error="kubectl rollout undo 執行失敗", + triggered_at=triggered_at, + ) + + # 3. 
等待收斂(非阻塞:用 rollout status 確認) + converged = await self._wait_convergence(spec.target, spec.namespace) + + result = RollbackResult( + success=True, + incident_id=incident_id, + deployment=spec.target, + namespace=spec.namespace, + rollback_command=rollback_command, + convergence_confirmed=converged, + error=None if converged else "rollout status 超時未收斂,人工確認", + triggered_at=triggered_at, + ) + + logger.info( + "rollback_completed", + incident_id=incident_id, + deployment=spec.target, + converged=converged, + ) + return result + + except Exception as e: + logger.error( + "rollback_failed_unexpected", + incident_id=incident_id, + deployment=spec.target, + error=str(e), + ) + return RollbackResult( + success=False, + incident_id=incident_id, + deployment=spec.target, + namespace=spec.namespace, + rollback_command=rollback_command, + convergence_confirmed=False, + error=str(e), + triggered_at=triggered_at, + ) + + # ────────────────────────────────────────────────────────────────────────── + # Private Helpers + # ────────────────────────────────────────────────────────────────────────── + + async def _has_previous_revision(self, deployment: str, namespace: str) -> bool: + """確認 Deployment 有前一個 revision(rollout history ≥ 2)。""" + from src.services.k8s_mcp import get_kubernetes_mcp + + k8s = get_kubernetes_mcp() + history_cmd = f"kubectl rollout history deployment/{deployment} -n {namespace}" + try: + result = await k8s.execute(history_cmd) + if not result.success: + logger.warning("rollback_history_check_failed", deployment=deployment, error=result.error) + return False + # rollout history 輸出含 "REVISION" 列 + 至少 2 行資料才有前一版本 + lines = [l for l in (result.output or "").splitlines() if l.strip() and not l.startswith("REVISION")] + return len(lines) >= 2 + except Exception as e: + logger.warning("rollback_history_error", deployment=deployment, error=str(e)) + return False + + async def _execute_rollback(self, command: str, namespace: str) -> bool: + """執行 kubectl rollout 
undo。""" + from src.services.k8s_mcp import get_kubernetes_mcp + + k8s = get_kubernetes_mcp() + try: + result = await k8s.execute(command) + if result.success: + logger.info("rollback_command_executed", command=command[:120]) + return True + logger.warning("rollback_command_failed", command=command[:120], error=result.error) + return False + except Exception as e: + logger.error("rollback_execute_error", command=command[:120], error=str(e)) + return False + + async def _wait_convergence(self, deployment: str, namespace: str) -> bool: + """等待 rollout 收斂(最多 ROLLBACK_CONVERGENCE_TIMEOUT_SEC 秒)。""" + from src.services.k8s_mcp import get_kubernetes_mcp + + k8s = get_kubernetes_mcp() + status_cmd = ( + f"kubectl rollout status deployment/{deployment} -n {namespace} " + f"--timeout={ROLLBACK_CONVERGENCE_TIMEOUT_SEC}s" + ) + try: + result = await k8s.execute(status_cmd) + converged = result.success and "successfully rolled out" in (result.output or "").lower() + if not converged: + logger.warning( + "rollback_convergence_timeout", + deployment=deployment, + namespace=namespace, + output=(result.output or "")[:200], + ) + return converged + except Exception as e: + logger.warning("rollback_convergence_check_error", deployment=deployment, error=str(e)) + return False + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_manager: RollbackManager | None = None + + +def get_rollback_manager() -> RollbackManager: + global _manager + if _manager is None: + _manager = RollbackManager() + return _manager