fix(drift): normalize kustomize runtime defaults
All checks were successful
Code Review / ai-code-review (push) Successful in 21s
CD Pipeline / tests (push) Successful in 2m31s
CD Pipeline / build-and-deploy (push) Successful in 3m38s
CD Pipeline / post-deploy-checks (push) Successful in 1m53s

This commit is contained in:
Your Name
2026-05-19 02:02:43 +08:00
parent 9cfae83da3
commit 107c4f11cc
2 changed files with 548 additions and 2 deletions

View File

@@ -14,6 +14,7 @@ from __future__ import annotations
import asyncio
import subprocess
import uuid
from copy import deepcopy
from pathlib import Path
from typing import Any
@@ -89,6 +90,7 @@ class GitStateReader:
def _read_sync(self, namespace: str) -> dict[str, Any]:
resources: dict[str, Any] = {}
kustomization_cache: dict[Path, dict[str, Any] | None] = {}
if not self._k8s_dir.exists():
logger.warning("k8s_dir_not_found", path=str(self._k8s_dir))
@@ -98,9 +100,12 @@ class GitStateReader:
try:
with open(yaml_file) as f:
docs = list(yaml.safe_load_all(f))
kustomization = self._kustomization_for_file(yaml_file, kustomization_cache)
for doc in docs:
if not doc or not isinstance(doc, dict):
continue
if kustomization:
doc = self._apply_kustomization(doc, kustomization)
metadata = doc.get("metadata", {})
ns = metadata.get("namespace", "")
if ns and ns != namespace:
@@ -115,6 +120,165 @@ class GitStateReader:
return resources
def _kustomization_for_file(
self,
yaml_file: Path,
cache: dict[Path, dict[str, Any] | None],
) -> dict[str, Any] | None:
"""Return same-directory Kustomize settings when the file is an included resource."""
directory = yaml_file.parent
if yaml_file.name == "kustomization.yaml":
return None
if directory not in cache:
path = directory / "kustomization.yaml"
if not path.exists():
cache[directory] = None
else:
try:
with open(path) as f:
config = yaml.safe_load(f) or {}
cache[directory] = config if isinstance(config, dict) else None
except Exception as e:
logger.debug("kustomization_parse_failed", file=str(path), error=str(e))
cache[directory] = None
config = cache.get(directory)
if not config:
return None
resources = config.get("resources") or []
resource_names = {Path(str(resource)).name for resource in resources}
if resource_names and yaml_file.name not in resource_names:
return None
return config
def _apply_kustomization(self, resource: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
"""Apply the Kustomize transforms that affect drift-relevant spec fields.
The scanner compares Git intent with live ArgoCD output. Reading raw YAML
skips Kustomize commonLabels and image transforms, which creates repeated
false drift alerts for selectors, affinity, and image tags.
"""
transformed = deepcopy(resource)
namespace = config.get("namespace")
if namespace and transformed.get("kind") != "Namespace":
metadata = transformed.setdefault("metadata", {})
metadata.setdefault("namespace", namespace)
common_labels = config.get("commonLabels") or {}
if isinstance(common_labels, dict) and common_labels:
self._apply_common_labels(transformed, common_labels)
images = config.get("images") or []
if isinstance(images, list) and images:
self._apply_image_overrides(transformed, images)
return transformed
def _apply_common_labels(self, resource: dict[str, Any], labels: dict[str, Any]) -> None:
labels = {str(k): str(v) for k, v in labels.items()}
kind = resource.get("kind")
metadata = resource.setdefault("metadata", {})
self._merge_labels(metadata.setdefault("labels", {}), labels)
spec = resource.setdefault("spec", {})
if kind in {"Deployment", "StatefulSet", "DaemonSet", "ReplicaSet"}:
selector = spec.setdefault("selector", {}).setdefault("matchLabels", {})
self._merge_labels(selector, labels)
template_metadata = spec.setdefault("template", {}).setdefault("metadata", {})
self._merge_labels(template_metadata.setdefault("labels", {}), labels)
elif kind == "Service":
selector = spec.setdefault("selector", {})
if isinstance(selector, dict):
self._merge_labels(selector, labels)
elif kind == "PodDisruptionBudget":
selector = spec.setdefault("selector", {}).setdefault("matchLabels", {})
self._merge_labels(selector, labels)
self._apply_label_selector_labels(spec, labels)
@staticmethod
def _merge_labels(target: dict[str, Any], labels: dict[str, str]) -> None:
for key, value in labels.items():
target.setdefault(key, value)
def _apply_label_selector_labels(self, value: Any, labels: dict[str, str]) -> None:
if isinstance(value, dict):
selector = value.get("labelSelector")
if isinstance(selector, dict):
match_labels = selector.setdefault("matchLabels", {})
if isinstance(match_labels, dict):
self._merge_labels(match_labels, labels)
for child in value.values():
self._apply_label_selector_labels(child, labels)
elif isinstance(value, list):
for child in value:
self._apply_label_selector_labels(child, labels)
def _apply_image_overrides(self, resource: dict[str, Any], images: list[Any]) -> None:
overrides = [image for image in images if isinstance(image, dict)]
if not overrides:
return
for pod_spec in self._iter_pod_specs(resource):
for key in ("containers", "initContainers"):
containers = pod_spec.get(key)
if not isinstance(containers, list):
continue
for container in containers:
if not isinstance(container, dict) or "image" not in container:
continue
image = container.get("image")
if isinstance(image, str):
container["image"] = self._rewrite_image(image, overrides)
@staticmethod
def _iter_pod_specs(resource: dict[str, Any]) -> list[dict[str, Any]]:
spec = resource.get("spec")
if not isinstance(spec, dict):
return []
pod_specs: list[dict[str, Any]] = []
template_spec = spec.get("template", {}).get("spec")
if isinstance(template_spec, dict):
pod_specs.append(template_spec)
job_template_spec = (
spec.get("jobTemplate", {})
.get("spec", {})
.get("template", {})
.get("spec")
)
if isinstance(job_template_spec, dict):
pod_specs.append(job_template_spec)
if resource.get("kind") == "Pod":
pod_specs.append(spec)
return pod_specs
def _rewrite_image(self, current_image: str, overrides: list[dict[str, Any]]) -> str:
current_name = self._image_name_without_tag(current_image)
for override in overrides:
source = str(override.get("name", ""))
new_name = str(override.get("newName") or self._image_name_without_tag(source))
new_tag = override.get("newTag")
source_name = self._image_name_without_tag(source)
if current_image != source and current_name != source_name:
continue
return f"{new_name}:{new_tag}" if new_tag else new_name
return current_image
@staticmethod
def _image_name_without_tag(image: str) -> str:
without_digest = image.split("@", 1)[0]
slash_index = without_digest.rfind("/")
colon_index = without_digest.rfind(":")
if colon_index > slash_index:
return without_digest[:colon_index]
return without_digest
class K8sStateReader:
"""從 kubectl 讀取 K8s 實際狀態"""
@@ -257,8 +421,8 @@ class DriftDetector:
items: list[DriftItem] = []
# 只比對 spec 層metadata 的動態欄位太多)
git_spec = git_res.get("spec", {})
actual_spec = actual_res.get("spec", {})
git_spec = self._normalized_spec(git_res)
actual_spec = self._normalized_spec(actual_res)
diffs = self._flatten_diff("spec", git_spec, actual_spec)
for field_path, (git_val, actual_val) in diffs.items():
@@ -283,6 +447,151 @@ class DriftDetector:
return items
def _normalized_spec(self, resource: dict[str, Any]) -> dict[str, Any]:
"""Normalize Kubernetes API defaults before field diffing."""
spec = deepcopy(resource.get("spec", {}))
if not isinstance(spec, dict):
return {}
kind = resource.get("kind")
if kind == "Service":
self._normalize_service_spec(spec)
if kind in {"Deployment", "StatefulSet", "DaemonSet", "ReplicaSet"}:
self._normalize_controller_defaults(spec)
self._normalize_template_defaults(spec)
return spec
@staticmethod
def _normalize_service_spec(spec: dict[str, Any]) -> None:
for field in (
"clusterIP",
"clusterIPs",
"ipFamilies",
"ipFamilyPolicy",
"internalTrafficPolicy",
):
spec.pop(field, None)
if spec.get("externalTrafficPolicy") == "Cluster":
spec.pop("externalTrafficPolicy", None)
if spec.get("sessionAffinity") == "None":
spec.pop("sessionAffinity", None)
ports = spec.get("ports")
if isinstance(ports, list):
for port in ports:
if isinstance(port, dict) and port.get("protocol") == "TCP":
port.pop("protocol", None)
def _normalize_controller_defaults(self, spec: dict[str, Any]) -> None:
if spec.get("progressDeadlineSeconds") == 600:
spec.pop("progressDeadlineSeconds", None)
if spec.get("revisionHistoryLimit") == 10:
spec.pop("revisionHistoryLimit", None)
strategy = spec.get("strategy")
if isinstance(strategy, dict):
rolling_update = strategy.get("rollingUpdate")
if isinstance(rolling_update, dict):
if rolling_update.get("maxSurge") == "25%":
rolling_update.pop("maxSurge", None)
if rolling_update.get("maxUnavailable") == "25%":
rolling_update.pop("maxUnavailable", None)
if not rolling_update:
strategy.pop("rollingUpdate", None)
if strategy.get("type") == "RollingUpdate" and len(strategy) == 1:
spec.pop("strategy", None)
def _normalize_template_defaults(self, spec: dict[str, Any]) -> None:
template = spec.get("template")
if not isinstance(template, dict):
return
template_metadata = template.get("metadata")
if isinstance(template_metadata, dict):
annotations = template_metadata.get("annotations")
if isinstance(annotations, dict):
annotations.pop("kubectl.kubernetes.io/restartedAt", None)
if not annotations:
template_metadata.pop("annotations", None)
pod_spec = template.get("spec")
if isinstance(pod_spec, dict):
self._normalize_pod_spec_defaults(pod_spec)
def _normalize_pod_spec_defaults(self, pod_spec: dict[str, Any]) -> None:
defaults = {
"restartPolicy": "Always",
"dnsPolicy": "ClusterFirst",
"schedulerName": "default-scheduler",
"terminationGracePeriodSeconds": 30,
}
for field, default in defaults.items():
if pod_spec.get(field) == default:
pod_spec.pop(field, None)
if pod_spec.get("securityContext") == {}:
pod_spec.pop("securityContext", None)
if pod_spec.get("serviceAccount") == pod_spec.get("serviceAccountName"):
pod_spec.pop("serviceAccount", None)
for key in ("containers", "initContainers"):
containers = pod_spec.get(key)
if isinstance(containers, list):
for container in containers:
if isinstance(container, dict):
self._normalize_container_defaults(container)
volumes = pod_spec.get("volumes")
if isinstance(volumes, list):
for volume in volumes:
if isinstance(volume, dict):
self._normalize_volume_defaults(volume)
def _normalize_container_defaults(self, container: dict[str, Any]) -> None:
if container.get("terminationMessagePath") == "/dev/termination-log":
container.pop("terminationMessagePath", None)
if container.get("terminationMessagePolicy") == "File":
container.pop("terminationMessagePolicy", None)
ports = container.get("ports")
if isinstance(ports, list):
for port in ports:
if isinstance(port, dict) and port.get("protocol") == "TCP":
port.pop("protocol", None)
for probe_name in ("livenessProbe", "readinessProbe", "startupProbe"):
probe = container.get(probe_name)
if isinstance(probe, dict):
self._normalize_probe_defaults(probe)
env = container.get("env")
if isinstance(env, list):
for item in env:
if isinstance(item, dict):
self._normalize_env_defaults(item)
@staticmethod
def _normalize_probe_defaults(probe: dict[str, Any]) -> None:
if probe.get("successThreshold") == 1:
probe.pop("successThreshold", None)
http_get = probe.get("httpGet")
if isinstance(http_get, dict) and http_get.get("scheme") == "HTTP":
http_get.pop("scheme", None)
@staticmethod
def _normalize_env_defaults(env_item: dict[str, Any]) -> None:
value_from = env_item.get("valueFrom")
if not isinstance(value_from, dict):
return
field_ref = value_from.get("fieldRef")
if isinstance(field_ref, dict) and field_ref.get("apiVersion") == "v1":
field_ref.pop("apiVersion", None)
@staticmethod
def _normalize_volume_defaults(volume: dict[str, Any]) -> None:
for source_key in ("secret", "configMap"):
source = volume.get(source_key)
if isinstance(source, dict) and source.get("defaultMode") in {420, "420"}:
source.pop("defaultMode", None)
def _flatten_diff(
self,
prefix: str,

View File

@@ -0,0 +1,237 @@
from __future__ import annotations
from copy import deepcopy
from src.models.drift import DriftLevel
from src.services.drift_detector import DriftDetector, GitStateReader
def test_git_state_reader_applies_kustomize_labels_and_images(tmp_path):
prod_dir = tmp_path / "k8s" / "awoooi-prod"
prod_dir.mkdir(parents=True)
(prod_dir / "kustomization.yaml").write_text(
"""
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: awoooi-prod
commonLabels:
environment: prod
system: awoooi
resources:
- deployment.yaml
images:
- name: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
newName: registry.local/awoooi/api
newTag: abc123
""",
encoding="utf-8",
)
(prod_dir / "deployment.yaml").write_text(
"""
apiVersion: apps/v1
kind: Deployment
metadata:
name: awoooi-api
spec:
selector:
matchLabels:
app: awoooi-api
template:
metadata:
labels:
app: awoooi-api
spec:
containers:
- name: api
image: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: awoooi-api
topologyKey: kubernetes.io/hostname
---
apiVersion: v1
kind: Service
metadata:
name: awoooi-api-svc
spec:
selector:
app: awoooi-api
ports:
- port: 8000
targetPort: 8000
""",
encoding="utf-8",
)
state = GitStateReader(str(tmp_path / "k8s"))._read_sync("awoooi-prod")
deployment = state["Deployment/awoooi-api"]
template = deployment["spec"]["template"]
assert deployment["metadata"]["namespace"] == "awoooi-prod"
assert deployment["spec"]["selector"]["matchLabels"]["environment"] == "prod"
assert template["metadata"]["labels"]["system"] == "awoooi"
assert (
template["spec"]["containers"][0]["image"]
== "registry.local/awoooi/api:abc123"
)
affinity_selector = template["spec"]["affinity"]["podAntiAffinity"][
"preferredDuringSchedulingIgnoredDuringExecution"
][0]["podAffinityTerm"]["labelSelector"]["matchLabels"]
assert affinity_selector["environment"] == "prod"
assert affinity_selector["system"] == "awoooi"
service = state["Service/awoooi-api-svc"]
assert service["metadata"]["namespace"] == "awoooi-prod"
assert service["spec"]["selector"]["environment"] == "prod"
assert service["spec"]["selector"]["system"] == "awoooi"
def test_kubernetes_api_defaults_do_not_create_drift():
detector = DriftDetector()
git_res = {
"kind": "Deployment",
"spec": {
"selector": {"matchLabels": {"app": "awoooi-api"}},
"template": {
"metadata": {"labels": {"app": "awoooi-api"}},
"spec": {
"serviceAccountName": "awoooi-executor",
"containers": [
{
"name": "api",
"image": "registry.local/awoooi/api:abc123",
"ports": [{"containerPort": 8000, "name": "http"}],
"readinessProbe": {
"httpGet": {"path": "/api/v1/health", "port": 8000},
"periodSeconds": 5,
},
"env": [
{
"name": "POD_NAME",
"valueFrom": {
"fieldRef": {"fieldPath": "metadata.name"}
},
}
],
}
],
"volumes": [
{"name": "runtime-secret", "secret": {"secretName": "s"}},
{"name": "runtime-config", "configMap": {"name": "c"}},
],
},
},
},
}
actual_res = deepcopy(git_res)
pod_spec = actual_res["spec"]["template"]["spec"]
pod_spec.update(
{
"serviceAccount": "awoooi-executor",
"restartPolicy": "Always",
"dnsPolicy": "ClusterFirst",
"schedulerName": "default-scheduler",
"terminationGracePeriodSeconds": 30,
"securityContext": {},
}
)
container = pod_spec["containers"][0]
container["terminationMessagePath"] = "/dev/termination-log"
container["terminationMessagePolicy"] = "File"
container["ports"][0]["protocol"] = "TCP"
container["readinessProbe"]["successThreshold"] = 1
container["readinessProbe"]["httpGet"]["scheme"] = "HTTP"
container["env"][0]["valueFrom"]["fieldRef"]["apiVersion"] = "v1"
pod_spec["volumes"][0]["secret"]["defaultMode"] = 420
pod_spec["volumes"][1]["configMap"]["defaultMode"] = 420
actual_res["spec"]["template"]["metadata"]["annotations"] = {
"kubectl.kubernetes.io/restartedAt": "2026-05-19T00:00:00+08:00"
}
assert (
detector._diff_resources(
git_res,
actual_res,
"Deployment",
"awoooi-api",
"awoooi-prod",
)
== []
)
def test_service_runtime_defaults_do_not_create_drift():
detector = DriftDetector()
git_res = {
"kind": "Service",
"spec": {
"type": "NodePort",
"selector": {"app": "awoooi-api"},
"ports": [{"port": 8000, "targetPort": 8000, "nodePort": 32334}],
},
}
actual_res = deepcopy(git_res)
actual_res["spec"].update(
{
"clusterIP": "10.43.156.119",
"clusterIPs": ["10.43.156.119"],
"ipFamilies": ["IPv4"],
"ipFamilyPolicy": "SingleStack",
"internalTrafficPolicy": "Cluster",
"externalTrafficPolicy": "Cluster",
"sessionAffinity": "None",
}
)
actual_res["spec"]["ports"][0]["protocol"] = "TCP"
assert (
detector._diff_resources(
git_res,
actual_res,
"Service",
"awoooi-api-svc",
"awoooi-prod",
)
== []
)
def test_real_container_env_drift_remains_visible():
detector = DriftDetector()
git_res = {
"kind": "Deployment",
"spec": {
"template": {
"metadata": {"labels": {"app": "awoooi-api"}},
"spec": {
"containers": [
{
"name": "api",
"image": "registry.local/awoooi/api:abc123",
"env": [{"name": "USE_AI_ROUTER", "value": "true"}],
}
]
},
}
},
}
actual_res = deepcopy(git_res)
actual_res["spec"]["template"]["spec"]["containers"][0]["env"].append(
{"name": "DUMMY_DEPLOY", "value": "1777914462"}
)
items = detector._diff_resources(
git_res,
actual_res,
"Deployment",
"awoooi-api",
"awoooi-prod",
)
assert [item.field_path for item in items] == ["spec.template.spec.containers"]
assert items[0].drift_level == DriftLevel.MEDIUM