Files
awoooi/apps/api/src/services/drift_detector.py
OG T 02a276127e
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 1h1m39s
fix(sensors+drift+repair-card): 全景修復三個節點問題
Fix 1: sensors 7/8 失敗 — SSH host 短名展開 (pre_decision_investigator.py)
  根因: Prometheus instance label 為 "110:9100",split(":")[0]="110"
        SSH_MCP_ALLOWED_HOSTS 存完整 IP "192.168.0.110" → 7 個 SSH 工具全部失敗
  修復: 加入 _SHORT_HOST_MAP,"110"→"192.168.0.110",四台主機全覆蓋

Fix 2: Config Drift 誤報 — K8s 預設欄位加入白名單 (drift_detector.py)
  根因: kubectl rollout restart 後 restartedAt annotation 被偵測為 "medium" drift
        restartPolicy/dnsPolicy/terminationGracePeriodSeconds 等 K8s 自動填入欄位未白名單
  修復: _DEFAULT_ALLOWLIST_FIELDS 加入 13 個 K8s 執行時自動填入欄位

Fix 3: 修復請求卡內容垃圾 — fallback 帶入真實 error context (failure_watcher.py)
  根因: LLM 分析失敗時 root_cause = "規則引擎分類: K8S_ERROR"(無任何有用資訊)
  修復: fallback 改為 "[K8S_ERROR] {operation_type} 在 {target_resource} 失敗\n錯誤:{error_message[:200]}"

2026-04-16 ogt + Claude Sonnet 4.6(亞太)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 20:50:06 +08:00

358 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Drift Detector - Phase 25 P2 Config Drift Detection
=====================================================
職責:比對 Git YAML vs K8s 實際狀態,輸出結構化 DriftItem 列表
不判斷嚴重性,不解釋意圖,只做事實比對
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
"""
from __future__ import annotations
import asyncio
import subprocess
import uuid
from pathlib import Path
from typing import Any
import structlog
import yaml
from src.models.drift import DriftItem, DriftLevel, DriftReport
logger = structlog.get_logger(__name__)
# Allowlisted field patterns: differences on these paths are recorded
# silently (reported at INFO level) instead of raising an alert.
_DEFAULT_ALLOWLIST_FIELDS = frozenset({
    # Fields expected to differ between the manifest and the live object.
    "spec.replicas",
    "spec.template.spec.containers[*].resources.requests",
    "spec.template.spec.containers[*].resources.limits",
    "metadata.annotations",
    "metadata.labels.pod-template-hash",
    "metadata.resourceVersion",
    "metadata.generation",
    "metadata.uid",
    "status",
    # Fields populated by Kubernetes at runtime: the Git manifest leaves them
    # unspecified and the cluster injects a default or run-time value.
    # 2026-04-16 ogt + Claude Sonnet 4.6: fixes the false drift alert raised
    # after `kubectl rollout restart`. Root cause: after restarting awoooi-web
    # the restartedAt annotation was detected as a "medium" drift.
    "spec.template.metadata.annotations",  # kubectl.kubernetes.io/restartedAt
    "spec.template.metadata.creationTimestamp",
    "spec.template.spec.restartPolicy",  # K8s default: Always (not set in Git)
    "spec.template.spec.dnsPolicy",  # K8s default: ClusterFirst
    "spec.template.spec.terminationGracePeriodSeconds",  # K8s default: 30
    "spec.template.spec.schedulerName",  # K8s default: default-scheduler
    "spec.strategy.rollingUpdate",  # K8s default: maxSurge=25%
    "spec.strategy.type",  # K8s default: RollingUpdate
    "spec.progressDeadlineSeconds",  # K8s default: 600
    "spec.revisionHistoryLimit",  # K8s default: 10
    "metadata.creationTimestamp",
    "spec.template.spec.containers[*].terminationMessagePath",
    "spec.template.spec.containers[*].terminationMessagePolicy",
    "spec.template.spec.containers[*].imagePullPolicy",  # K8s default: IfNotPresent
})
# Critical field patterns: a difference on any of these paths must be
# alerted on immediately (reported at HIGH level).
_DEFAULT_CRITICAL_FIELDS = frozenset({
    "spec.template.spec.containers[*].image",
    "spec.template.spec.containers[*].env",
    "spec.template.spec.containers[*].ports",
    "spec.template.spec.volumes",
    "spec.template.spec.serviceAccountName",
})
class GitStateReader:
    """Read the Kubernetes manifests checked out under a local directory.

    The reader walks the files currently on disk below ``k8s_dir``; it does
    not invoke git itself.
    """

    def __init__(self, k8s_dir: str = "k8s"):
        self._k8s_dir = Path(k8s_dir)

    async def read(self, namespace: str) -> dict[str, Any]:
        """Return the manifests that apply to ``namespace``.

        The blocking file walk runs in the default executor so the event loop
        is not stalled.

        Args:
            namespace: Kubernetes namespace to select manifests for.

        Returns:
            ``{resource_key: parsed_yaml_dict}`` where ``resource_key`` has
            the form ``"{kind}/{name}"``. Any failure is logged and yields an
            empty dict (best effort).
        """
        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self._read_sync, namespace)
        except Exception as e:
            logger.warning("git_state_read_failed", namespace=namespace, error=str(e))
            return {}

    def _read_sync(self, namespace: str) -> dict[str, Any]:
        """Parse every ``*.yaml`` file below the manifest directory.

        Documents that declare a different namespace are skipped; documents
        without a namespace are kept. A file that cannot be read or parsed is
        logged at debug level and skipped.
        """
        resources: dict[str, Any] = {}
        if not self._k8s_dir.exists():
            logger.warning("k8s_dir_not_found", path=str(self._k8s_dir))
            return resources
        # Sorted so that, when two files define the same kind/name, the one
        # that wins is deterministic instead of depending on directory order.
        for yaml_file in sorted(self._k8s_dir.rglob("*.yaml")):
            try:
                # Explicit UTF-8: manifests may contain non-ASCII comments and
                # must not depend on the process locale.
                with open(yaml_file, encoding="utf-8") as f:
                    docs = list(yaml.safe_load_all(f))
                for doc in docs:
                    if not doc or not isinstance(doc, dict):
                        continue
                    # `metadata:` may be present but null (or malformed);
                    # treat that as "no metadata" rather than failing the
                    # whole file.
                    metadata = doc.get("metadata")
                    if not isinstance(metadata, dict):
                        metadata = {}
                    ns = metadata.get("namespace", "")
                    if ns and ns != namespace:
                        continue
                    kind = doc.get("kind", "")
                    name = metadata.get("name", "")
                    if kind and name:
                        resources[f"{kind}/{name}"] = doc
            except Exception as e:
                logger.debug("yaml_parse_failed", file=str(yaml_file), error=str(e))
        return resources
class K8sStateReader:
    """Read the live state of a namespace by shelling out to ``kubectl``."""

    # Resource types queried, mapped to the canonical Kind used as a fallback
    # when an item carries no ``kind``. ``str.capitalize()`` would produce
    # "Configmap", which can never match the "ConfigMap/<name>" key built
    # from the Git manifests.
    _KIND_BY_TYPE = {
        "deployment": "Deployment",
        "service": "Service",
        "configmap": "ConfigMap",
        "ingress": "Ingress",
    }

    async def read(self, namespace: str) -> dict[str, Any]:
        """Return the live resources of ``namespace``.

        The blocking ``kubectl`` calls run in the default executor.

        Args:
            namespace: Kubernetes namespace to query.

        Returns:
            ``{resource_key: actual_resource_dict}`` keyed by
            ``"{kind}/{name}"``. Any failure is logged and yields an empty
            dict (best effort).
        """
        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self._read_sync, namespace)
        except Exception as e:
            logger.warning("k8s_state_read_failed", namespace=namespace, error=str(e))
            return {}

    def _read_sync(self, namespace: str) -> dict[str, Any]:
        """Run ``kubectl get <type> -n <namespace> -o yaml`` for each type.

        A failing, timed-out or unparsable query for one resource type is
        logged and does not prevent the remaining types from being read.
        """
        resources: dict[str, Any] = {}
        for rtype, default_kind in self._KIND_BY_TYPE.items():
            try:
                proc = subprocess.run(
                    ["kubectl", "get", rtype, "-n", namespace, "-o", "yaml"],
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
                if proc.returncode != 0:
                    logger.debug("kubectl_failed", type=rtype, stderr=proc.stderr[:200])
                    continue
                data = yaml.safe_load(proc.stdout)
                # Only a `kind: List` mapping is understood here.
                if not isinstance(data, dict) or data.get("kind") != "List":
                    continue
                for item in data.get("items") or []:
                    kind = item.get("kind") or default_kind
                    name = (item.get("metadata") or {}).get("name", "")
                    if name:
                        resources[f"{kind}/{name}"] = item
            except subprocess.TimeoutExpired:
                logger.warning("kubectl_timeout", type=rtype, namespace=namespace)
            except Exception as e:
                logger.warning("kubectl_error", type=rtype, error=str(e))
        return resources
class DriftDetector:
    """Compare Git manifests with live cluster state and emit DriftItems.

    Each differing field is classified by pattern only: allowlisted paths
    become INFO, critical paths become HIGH, everything else MEDIUM. The
    detector does not try to explain why a difference exists.
    """

    def __init__(
        self,
        k8s_dir: str = "k8s",
        allowlist_fields: frozenset | None = None,
        critical_fields: frozenset | None = None,
    ):
        """Create a detector.

        Args:
            k8s_dir: Directory holding the manifests to compare against.
            allowlist_fields: Patterns reported at INFO level; ``None`` uses
                the module defaults.
            critical_fields: Patterns reported at HIGH level; ``None`` uses
                the module defaults.
        """
        self._git_reader = GitStateReader(k8s_dir)
        self._k8s_reader = K8sStateReader()
        # Compare against None rather than truthiness so that an explicitly
        # empty frozenset ("no patterns") is honoured instead of silently
        # falling back to the defaults.
        self._allowlist = (
            _DEFAULT_ALLOWLIST_FIELDS if allowlist_fields is None else allowlist_fields
        )
        self._critical_fields = (
            _DEFAULT_CRITICAL_FIELDS if critical_fields is None else critical_fields
        )

    async def scan(self, namespace: str, triggered_by: str = "cron") -> DriftReport:
        """Scan one namespace for drift.

        Args:
            namespace: Kubernetes namespace to scan.
            triggered_by: Trigger source, e.g. cron / webhook / api.

        Returns:
            A DriftReport holding the DriftItem list and per-level counts;
            intent analysis has not been performed on it.
        """
        report_id = str(uuid.uuid4())[:8]
        logger.info("drift_scan_start", namespace=namespace, report_id=report_id)
        git_state, k8s_state = await asyncio.gather(
            self._git_reader.read(namespace),
            self._k8s_reader.read(namespace),
        )
        items: list[DriftItem] = []
        # Only resources present in Git are compared.
        for resource_key, git_resource in git_state.items():
            actual_resource = k8s_state.get(resource_key)
            if actual_resource is None:
                # Present in Git but not in the cluster (possibly not yet
                # deployed): logged, not reported as drift.
                logger.debug("resource_missing_in_k8s", resource=resource_key)
                continue
            kind, name = resource_key.split("/", 1)
            items.extend(
                self._diff_resources(git_resource, actual_resource, kind, name, namespace)
            )
        high_count = sum(1 for i in items if i.drift_level == DriftLevel.HIGH)
        medium_count = sum(1 for i in items if i.drift_level == DriftLevel.MEDIUM)
        info_count = sum(1 for i in items if i.drift_level == DriftLevel.INFO)
        logger.info(
            "drift_scan_done",
            namespace=namespace,
            report_id=report_id,
            high=high_count,
            medium=medium_count,
            info=info_count,
        )
        return DriftReport(
            report_id=report_id,
            namespace=namespace,
            items=items,
            high_count=high_count,
            medium_count=medium_count,
            info_count=info_count,
            triggered_by=triggered_by,
        )

    def _diff_resources(
        self,
        git_res: dict,
        actual_res: dict,
        kind: str,
        name: str,
        namespace: str,
    ) -> list[DriftItem]:
        """Compare two resources field by field and return the differences."""
        items: list[DriftItem] = []
        # Only the `spec` subtree is compared; metadata carries too many
        # dynamic fields.
        git_spec = git_res.get("spec", {})
        actual_spec = actual_res.get("spec", {})
        diffs = self._flatten_diff("spec", git_spec, actual_spec)
        for field_path, (git_val, actual_val) in diffs.items():
            is_allowlisted = self._is_allowlisted(field_path)
            # The allowlist takes precedence over the critical list.
            if is_allowlisted:
                level = DriftLevel.INFO
            elif self._is_critical(field_path):
                level = DriftLevel.HIGH
            else:
                level = DriftLevel.MEDIUM
            items.append(DriftItem(
                resource_kind=kind,
                resource_name=name,
                namespace=namespace,
                field_path=field_path,
                git_value=git_val,
                actual_value=actual_val,
                drift_level=level,
                is_allowlisted=is_allowlisted,
            ))
        return items

    def _flatten_diff(
        self,
        prefix: str,
        git_dict: Any,
        actual_dict: Any,
    ) -> dict[str, tuple[Any, Any]]:
        """Recursively compare two values and return the differing leaf paths.

        Dicts are descended key by key (``prefix.key``) and lists element by
        element (``prefix[index]``). Descending into lists is what produces
        paths such as ``spec.template.spec.containers[0].image``; without it
        no ``[*]`` pattern could ever match and any container change would be
        reported as one opaque ``containers`` difference.

        Args:
            prefix: Path of the values being compared.
            git_dict: Value taken from the Git manifest.
            actual_dict: Value taken from the live resource.

        Returns:
            ``{field_path: (git_val, actual_val)}``; a side that lacks the
            key or list element is represented by ``None``.
        """
        diffs: dict[str, tuple[Any, Any]] = {}
        if isinstance(git_dict, dict) and isinstance(actual_dict, dict):
            # Sorted for a deterministic result order; key=str tolerates
            # non-string YAML keys.
            for key in sorted(set(git_dict) | set(actual_dict), key=str):
                diffs.update(
                    self._flatten_diff(
                        f"{prefix}.{key}", git_dict.get(key), actual_dict.get(key)
                    )
                )
        elif isinstance(git_dict, list) and isinstance(actual_dict, list):
            # Elements are paired by position; extra elements on either side
            # are compared against None.
            for index in range(max(len(git_dict), len(actual_dict))):
                git_item = git_dict[index] if index < len(git_dict) else None
                actual_item = actual_dict[index] if index < len(actual_dict) else None
                diffs.update(
                    self._flatten_diff(f"{prefix}[{index}]", git_item, actual_item)
                )
        elif git_dict != actual_dict:
            diffs[prefix] = (git_dict, actual_dict)
        return diffs

    @staticmethod
    def _pattern_matches(pattern: str, field_path: str) -> bool:
        """Return whether ``field_path`` matches ``pattern``.

        Two wildcards are supported:
          - ``[*]`` matches any numeric index (``containers[*]`` matches
            ``containers[0]``, ``containers[1]``).
          - ``*`` matches one path segment (any run of characters without a
            dot).

        The pattern is treated as a prefix: the path may continue with a
        ``.`` or ``[`` after the matched part, or end there.

        2026-04-05 Claude Code: I4 fix - the old logic stripped ``[*]``, so
        ``containers[*].image`` could not match ``containers[0].image``.
        """
        import re

        # Escape first, then turn the escaped wildcards back into regex
        # fragments. `[*]` must be handled before the bare `*`.
        regex = re.escape(pattern)
        regex = regex.replace(r"\[\*\]", r"\[\d+\]")
        regex = regex.replace(r"\*", r"[^.]+")
        return bool(re.match(f"^{regex}(\\.|\\[|$)", field_path))

    def _is_allowlisted(self, field_path: str) -> bool:
        """Return whether the path matches an allowlist pattern (INFO level)."""
        return any(self._pattern_matches(p, field_path) for p in self._allowlist)

    def _is_critical(self, field_path: str) -> bool:
        """Return whether the path matches a critical pattern (HIGH level)."""
        return any(self._pattern_matches(p, field_path) for p in self._critical_fields)
# =============================================================================
# Singleton
# =============================================================================
_detector: DriftDetector | None = None


def get_drift_detector() -> DriftDetector:
    """Return the module-wide DriftDetector, creating it on first use."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = DriftDetector()
    return _detector