I1: auto_repair_service — 失敗分支 anti_pattern task 補齊 _pending_tasks GC 防護
C3: drift_remediator — _kubectl_apply() 實作 resource_key 範圍過濾(修復虛設參數 bug)
M1: drift_remediator — _git_push() 標記 DISABLED,防止誤啟用
I2: drift.py — Telegram 通知移除失效的 adopt() 端點連結
I3: drift/page.tsx — handleScan POST body namespace→namespaces(對齊後端 DriftScanRequest)
I4: drift/page.tsx — 移除硬編碼英文字串,改用 t('loading')/t('highCount')/t('mediumCount')
i18n: zh-TW.json + en.json 補齊 drift.loading key
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
240 lines
8.2 KiB
Python
240 lines
8.2 KiB
Python
"""
|
||
Drift Remediator - Phase 25 P2 Config Drift Detection
|
||
======================================================
|
||
職責:確定性修復執行
|
||
- rollback():kubectl apply -f <git-yaml>(覆蓋回 Git 狀態)
|
||
- adopt():git commit + git push gitea main(承認變更,更新 Git)
|
||
|
||
設計邊界(核心原則):
|
||
- 不使用 AI 判斷如何修復
|
||
- 只有人工確認按鈕後才執行
|
||
- rollback 失敗只通知,不重試(避免重複操作)
|
||
|
||
版本: v1.0
|
||
建立: 2026-04-04 (台北時區)
|
||
建立者: ogt (首席架構師設計) + Claude Code (實作)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import subprocess
|
||
from typing import TYPE_CHECKING
|
||
|
||
import structlog
|
||
|
||
if TYPE_CHECKING:
|
||
from src.models.drift import DriftItem, DriftReport
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
|
||
class DriftRemediator:
|
||
"""
|
||
確定性漂移修復執行器
|
||
|
||
職責邊界:
|
||
✅ kubectl apply(覆蓋回 Git 狀態)
|
||
✅ git commit + push(承認變更)
|
||
❌ 不使用 AI 決定修復策略
|
||
❌ 不自動重試
|
||
"""
|
||
|
||
def __init__(self, k8s_dir: str = "k8s"):
|
||
self._k8s_dir = k8s_dir
|
||
|
||
async def rollback(
|
||
self,
|
||
report: "DriftReport",
|
||
resource_key: str | None = None,
|
||
) -> dict:
|
||
"""
|
||
覆蓋回 Git 狀態(kubectl apply)
|
||
|
||
Args:
|
||
report: 漂移報告
|
||
resource_key: 指定資源(Kind/Name),None 表示全部
|
||
|
||
Returns:
|
||
{"success": bool, "message": str}
|
||
"""
|
||
logger.info(
|
||
"drift_rollback_start",
|
||
report_id=report.report_id,
|
||
resource=resource_key or "all",
|
||
)
|
||
|
||
try:
|
||
result = await asyncio.get_event_loop().run_in_executor(
|
||
None,
|
||
self._kubectl_apply,
|
||
report.namespace,
|
||
resource_key,
|
||
)
|
||
|
||
if result["success"]:
|
||
logger.info(
|
||
"drift_rollback_success",
|
||
report_id=report.report_id,
|
||
namespace=report.namespace,
|
||
)
|
||
await self._notify_telegram(
|
||
f"✅ 漂移已覆蓋回 Git 狀態\n"
|
||
f"Namespace: {report.namespace}\n"
|
||
f"資源: {resource_key or '全部'}"
|
||
)
|
||
else:
|
||
logger.error(
|
||
"drift_rollback_failed",
|
||
report_id=report.report_id,
|
||
error=result.get("message"),
|
||
)
|
||
await self._notify_telegram(
|
||
f"❌ 漂移覆蓋失敗,需要人工介入\n"
|
||
f"Namespace: {report.namespace}\n"
|
||
f"錯誤: {result.get('message', '')[:200]}"
|
||
)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
msg = f"rollback 異常: {str(e)}"
|
||
logger.error("drift_rollback_exception", error=str(e))
|
||
await self._notify_telegram(
|
||
f"❌ 漂移覆蓋異常\nNamespace: {report.namespace}\n錯誤: {str(e)[:200]}"
|
||
)
|
||
return {"success": False, "message": msg}
|
||
|
||
async def adopt(
|
||
self,
|
||
report: "DriftReport",
|
||
field_description: str = "",
|
||
) -> dict:
|
||
"""
|
||
承認變更:git commit + git push gitea main
|
||
|
||
Args:
|
||
report: 漂移報告
|
||
field_description: 漂移欄位說明(用於 commit message)
|
||
|
||
Returns:
|
||
{"success": bool, "message": str}
|
||
"""
|
||
logger.info(
|
||
"drift_adopt_start",
|
||
report_id=report.report_id,
|
||
namespace=report.namespace,
|
||
)
|
||
|
||
# 這裡不直接修改 git(需要人工決定具體的值),
|
||
# 而是提示用戶需要在本地執行 git 操作
|
||
# 在實際部署場景中,可透過 Gitea API 建立 PR 或直接 push
|
||
commit_msg = (
|
||
f"chore: adopt drift — {report.namespace} "
|
||
f"{field_description or report.summary}"
|
||
)
|
||
|
||
try:
|
||
result = await asyncio.get_event_loop().run_in_executor(
|
||
None,
|
||
self._git_push,
|
||
commit_msg,
|
||
)
|
||
|
||
if result["success"]:
|
||
logger.info("drift_adopt_success", report_id=report.report_id)
|
||
await self._notify_telegram(
|
||
f"✅ 漂移已承認,Git 已更新\n"
|
||
f"Namespace: {report.namespace}\n"
|
||
f"Commit: {commit_msg[:80]}"
|
||
)
|
||
else:
|
||
logger.error("drift_adopt_failed", error=result.get("message"))
|
||
await self._notify_telegram(
|
||
f"❌ Git 更新失敗,需要人工處理\n"
|
||
f"錯誤: {result.get('message', '')[:200]}"
|
||
)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error("drift_adopt_exception", error=str(e))
|
||
return {"success": False, "message": str(e)}
|
||
|
||
# =========================================================================
|
||
# Private
|
||
# =========================================================================
|
||
|
||
def _kubectl_apply(self, namespace: str, resource_key: str | None) -> dict:
|
||
"""
|
||
執行 kubectl apply(同步)
|
||
|
||
2026-04-05 Claude Code: C3 修正 — resource_key 現在實際影響 apply 範圍
|
||
- resource_key=None: apply 整個 k8s/ 目錄
|
||
- resource_key="Deployment/api": 只 apply 匹配前綴的 YAML 檔
|
||
"""
|
||
try:
|
||
if resource_key:
|
||
# 從 resource_key (e.g. "Deployment/api") 推斷檔名前綴
|
||
kind_lower = resource_key.split("/")[0].lower() if "/" in resource_key else resource_key.lower()
|
||
import pathlib
|
||
k8s_path = pathlib.Path(self._k8s_dir)
|
||
matched = list(k8s_path.glob(f"*{kind_lower}*.yaml")) + list(k8s_path.glob(f"*{kind_lower}*.yml"))
|
||
if matched:
|
||
target = str(matched[0])
|
||
logger.info("kubectl_apply_targeted", resource_key=resource_key, file=target)
|
||
else:
|
||
# 找不到匹配檔案,fallback 整目錄但記錄警告
|
||
logger.warning("kubectl_apply_no_match_fallback", resource_key=resource_key, k8s_dir=self._k8s_dir)
|
||
target = self._k8s_dir
|
||
else:
|
||
target = self._k8s_dir
|
||
|
||
cmd = ["kubectl", "apply", "-f", target, "-n", namespace, "--dry-run=none"]
|
||
proc = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=60,
|
||
)
|
||
if proc.returncode == 0:
|
||
return {"success": True, "message": proc.stdout[:500]}
|
||
else:
|
||
return {"success": False, "message": proc.stderr[:500]}
|
||
except subprocess.TimeoutExpired:
|
||
return {"success": False, "message": "kubectl apply 超時(60s)"}
|
||
except Exception as e:
|
||
return {"success": False, "message": str(e)}
|
||
|
||
def _git_push(self, _commit_msg: str) -> dict:
|
||
"""
|
||
執行 git add + commit + push gitea(同步)
|
||
|
||
2026-04-05 Claude Code: M1 — DISABLED 標記,避免誤啟用
|
||
adopt() 端點已回傳 501,此方法目前不可到達。
|
||
ADR-057 起草後改由 Gitea PR API 實作,屆時此方法整體移除。
|
||
"""
|
||
# DISABLED: adopt() 端點已返回 501,此方法不應被呼叫
|
||
# 保留程式碼僅作歷史參考,ADR-057 完成後刪除
|
||
return {"success": False, "message": "git_push DISABLED — 請參考 ADR-057"}
|
||
|
||
|
||
async def _notify_telegram(self, message: str) -> None:
|
||
"""推送通知到 Telegram"""
|
||
try:
|
||
from src.services.telegram_gateway import get_telegram_gateway
|
||
tg = get_telegram_gateway()
|
||
await tg.send_text(message)
|
||
except Exception as e:
|
||
logger.warning("drift_remediator_telegram_failed", error=str(e))
|
||
|
||
|
||
_remediator: DriftRemediator | None = None
|
||
|
||
|
||
def get_drift_remediator() -> DriftRemediator:
|
||
global _remediator
|
||
if _remediator is None:
|
||
_remediator = DriftRemediator()
|
||
return _remediator
|