Files
awoooi/apps/api/src/services/drift_remediator.py
OG T 4912c7f307 fix(phase25): 首席架構師 Review R2 修正 (I1/I2/I3/I4/C3/M1)
I1: auto_repair_service — 失敗分支 anti_pattern task 補齊 _pending_tasks GC 防護
C3: drift_remediator — _kubectl_apply() 實作 resource_key 範圍過濾(修復虛設參數 bug)
M1: drift_remediator — _git_push() 標記 DISABLED,防止誤啟用
I2: drift.py — Telegram 通知移除失效的 adopt() 端點連結
I3: drift/page.tsx — handleScan POST body namespace→namespaces(對齊後端 DriftScanRequest)
I4: drift/page.tsx — 移除硬編碼英文字串,改用 t('loading')/t('highCount')/t('mediumCount')
i18n: zh-TW.json + en.json 補齊 drift.loading key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 00:22:38 +08:00

240 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
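Drift Remediator - Phase 25 P2 Config Drift Detection
======================================================
Responsibility: deterministic remediation execution
- rollback(): kubectl apply -f <git-yaml> (overwrite the cluster back to the Git state)
- adopt(): git commit + git push gitea main (accept the change and update Git)
Design boundaries (core principles):
- No AI is used to decide how to remediate
- Executes only after a human presses the confirmation button
- A failed rollback only notifies; it is not retried (avoids repeated operations)
Version: v1.0
Created: 2026-04-04 (Taipei time zone)
Created by: ogt (chief architect, design) + Claude Code (implementation)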
"""
from __future__ import annotations
import asyncio
import subprocess
from typing import TYPE_CHECKING
import structlog
if TYPE_CHECKING:
from src.models.drift import DriftItem, DriftReport
logger = structlog.get_logger(__name__)
class DriftRemediator:
"""
確定性漂移修復執行器
職責邊界:
✅ kubectl apply覆蓋回 Git 狀態)
✅ git commit + push承認變更
❌ 不使用 AI 決定修復策略
❌ 不自動重試
"""
def __init__(self, k8s_dir: str = "k8s"):
self._k8s_dir = k8s_dir
async def rollback(
self,
report: "DriftReport",
resource_key: str | None = None,
) -> dict:
"""
覆蓋回 Git 狀態kubectl apply
Args:
report: 漂移報告
resource_key: 指定資源Kind/NameNone 表示全部
Returns:
{"success": bool, "message": str}
"""
logger.info(
"drift_rollback_start",
report_id=report.report_id,
resource=resource_key or "all",
)
try:
result = await asyncio.get_event_loop().run_in_executor(
None,
self._kubectl_apply,
report.namespace,
resource_key,
)
if result["success"]:
logger.info(
"drift_rollback_success",
report_id=report.report_id,
namespace=report.namespace,
)
await self._notify_telegram(
f"✅ 漂移已覆蓋回 Git 狀態\n"
f"Namespace: {report.namespace}\n"
f"資源: {resource_key or '全部'}"
)
else:
logger.error(
"drift_rollback_failed",
report_id=report.report_id,
error=result.get("message"),
)
await self._notify_telegram(
f"❌ 漂移覆蓋失敗,需要人工介入\n"
f"Namespace: {report.namespace}\n"
f"錯誤: {result.get('message', '')[:200]}"
)
return result
except Exception as e:
msg = f"rollback 異常: {str(e)}"
logger.error("drift_rollback_exception", error=str(e))
await self._notify_telegram(
f"❌ 漂移覆蓋異常\nNamespace: {report.namespace}\n錯誤: {str(e)[:200]}"
)
return {"success": False, "message": msg}
async def adopt(
self,
report: "DriftReport",
field_description: str = "",
) -> dict:
"""
承認變更git commit + git push gitea main
Args:
report: 漂移報告
field_description: 漂移欄位說明(用於 commit message
Returns:
{"success": bool, "message": str}
"""
logger.info(
"drift_adopt_start",
report_id=report.report_id,
namespace=report.namespace,
)
# 這裡不直接修改 git需要人工決定具體的值
# 而是提示用戶需要在本地執行 git 操作
# 在實際部署場景中,可透過 Gitea API 建立 PR 或直接 push
commit_msg = (
f"chore: adopt drift — {report.namespace} "
f"{field_description or report.summary}"
)
try:
result = await asyncio.get_event_loop().run_in_executor(
None,
self._git_push,
commit_msg,
)
if result["success"]:
logger.info("drift_adopt_success", report_id=report.report_id)
await self._notify_telegram(
f"✅ 漂移已承認Git 已更新\n"
f"Namespace: {report.namespace}\n"
f"Commit: {commit_msg[:80]}"
)
else:
logger.error("drift_adopt_failed", error=result.get("message"))
await self._notify_telegram(
f"❌ Git 更新失敗,需要人工處理\n"
f"錯誤: {result.get('message', '')[:200]}"
)
return result
except Exception as e:
logger.error("drift_adopt_exception", error=str(e))
return {"success": False, "message": str(e)}
# =========================================================================
# Private
# =========================================================================
def _kubectl_apply(self, namespace: str, resource_key: str | None) -> dict:
"""
執行 kubectl apply同步
2026-04-05 Claude Code: C3 修正 — resource_key 現在實際影響 apply 範圍
- resource_key=None: apply 整個 k8s/ 目錄
- resource_key="Deployment/api": 只 apply 匹配前綴的 YAML 檔
"""
try:
if resource_key:
# 從 resource_key (e.g. "Deployment/api") 推斷檔名前綴
kind_lower = resource_key.split("/")[0].lower() if "/" in resource_key else resource_key.lower()
import pathlib
k8s_path = pathlib.Path(self._k8s_dir)
matched = list(k8s_path.glob(f"*{kind_lower}*.yaml")) + list(k8s_path.glob(f"*{kind_lower}*.yml"))
if matched:
target = str(matched[0])
logger.info("kubectl_apply_targeted", resource_key=resource_key, file=target)
else:
# 找不到匹配檔案fallback 整目錄但記錄警告
logger.warning("kubectl_apply_no_match_fallback", resource_key=resource_key, k8s_dir=self._k8s_dir)
target = self._k8s_dir
else:
target = self._k8s_dir
cmd = ["kubectl", "apply", "-f", target, "-n", namespace, "--dry-run=none"]
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60,
)
if proc.returncode == 0:
return {"success": True, "message": proc.stdout[:500]}
else:
return {"success": False, "message": proc.stderr[:500]}
except subprocess.TimeoutExpired:
return {"success": False, "message": "kubectl apply 超時60s"}
except Exception as e:
return {"success": False, "message": str(e)}
def _git_push(self, _commit_msg: str) -> dict:
"""
執行 git add + commit + push gitea同步
2026-04-05 Claude Code: M1 — DISABLED 標記,避免誤啟用
adopt() 端點已回傳 501此方法目前不可到達。
ADR-057 起草後改由 Gitea PR API 實作,屆時此方法整體移除。
"""
# DISABLED: adopt() 端點已返回 501此方法不應被呼叫
# 保留程式碼僅作歷史參考ADR-057 完成後刪除
return {"success": False, "message": "git_push DISABLED — 請參考 ADR-057"}
async def _notify_telegram(self, message: str) -> None:
"""推送通知到 Telegram"""
try:
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
await tg.send_text(message)
except Exception as e:
logger.warning("drift_remediator_telegram_failed", error=str(e))
# Process-wide DriftRemediator instance, created on first request.
_remediator: DriftRemediator | None = None


def get_drift_remediator() -> DriftRemediator:
    """Return the shared ``DriftRemediator``, building it on first use.

    The instance is constructed with default arguments and cached in the
    module-level ``_remediator`` variable; later calls return the same object.
    """
    global _remediator
    if _remediator is not None:
        return _remediator
    _remediator = DriftRemediator()
    return _remediator