fix(drift): normalize kustomize runtime defaults
This commit is contained in:
@@ -14,6 +14,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import subprocess
|
||||
import uuid
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -89,6 +90,7 @@ class GitStateReader:
|
||||
|
||||
def _read_sync(self, namespace: str) -> dict[str, Any]:
|
||||
resources: dict[str, Any] = {}
|
||||
kustomization_cache: dict[Path, dict[str, Any] | None] = {}
|
||||
|
||||
if not self._k8s_dir.exists():
|
||||
logger.warning("k8s_dir_not_found", path=str(self._k8s_dir))
|
||||
@@ -98,9 +100,12 @@ class GitStateReader:
|
||||
try:
|
||||
with open(yaml_file) as f:
|
||||
docs = list(yaml.safe_load_all(f))
|
||||
kustomization = self._kustomization_for_file(yaml_file, kustomization_cache)
|
||||
for doc in docs:
|
||||
if not doc or not isinstance(doc, dict):
|
||||
continue
|
||||
if kustomization:
|
||||
doc = self._apply_kustomization(doc, kustomization)
|
||||
metadata = doc.get("metadata", {})
|
||||
ns = metadata.get("namespace", "")
|
||||
if ns and ns != namespace:
|
||||
@@ -115,6 +120,165 @@ class GitStateReader:
|
||||
|
||||
return resources
|
||||
|
||||
def _kustomization_for_file(
|
||||
self,
|
||||
yaml_file: Path,
|
||||
cache: dict[Path, dict[str, Any] | None],
|
||||
) -> dict[str, Any] | None:
|
||||
"""Return same-directory Kustomize settings when the file is an included resource."""
|
||||
directory = yaml_file.parent
|
||||
if yaml_file.name == "kustomization.yaml":
|
||||
return None
|
||||
if directory not in cache:
|
||||
path = directory / "kustomization.yaml"
|
||||
if not path.exists():
|
||||
cache[directory] = None
|
||||
else:
|
||||
try:
|
||||
with open(path) as f:
|
||||
config = yaml.safe_load(f) or {}
|
||||
cache[directory] = config if isinstance(config, dict) else None
|
||||
except Exception as e:
|
||||
logger.debug("kustomization_parse_failed", file=str(path), error=str(e))
|
||||
cache[directory] = None
|
||||
|
||||
config = cache.get(directory)
|
||||
if not config:
|
||||
return None
|
||||
|
||||
resources = config.get("resources") or []
|
||||
resource_names = {Path(str(resource)).name for resource in resources}
|
||||
if resource_names and yaml_file.name not in resource_names:
|
||||
return None
|
||||
return config
|
||||
|
||||
def _apply_kustomization(self, resource: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Apply the Kustomize transforms that affect drift-relevant spec fields.
|
||||
|
||||
The scanner compares Git intent with live ArgoCD output. Reading raw YAML
|
||||
skips Kustomize commonLabels and image transforms, which creates repeated
|
||||
false drift alerts for selectors, affinity, and image tags.
|
||||
"""
|
||||
transformed = deepcopy(resource)
|
||||
|
||||
namespace = config.get("namespace")
|
||||
if namespace and transformed.get("kind") != "Namespace":
|
||||
metadata = transformed.setdefault("metadata", {})
|
||||
metadata.setdefault("namespace", namespace)
|
||||
|
||||
common_labels = config.get("commonLabels") or {}
|
||||
if isinstance(common_labels, dict) and common_labels:
|
||||
self._apply_common_labels(transformed, common_labels)
|
||||
|
||||
images = config.get("images") or []
|
||||
if isinstance(images, list) and images:
|
||||
self._apply_image_overrides(transformed, images)
|
||||
|
||||
return transformed
|
||||
|
||||
def _apply_common_labels(self, resource: dict[str, Any], labels: dict[str, Any]) -> None:
|
||||
labels = {str(k): str(v) for k, v in labels.items()}
|
||||
kind = resource.get("kind")
|
||||
metadata = resource.setdefault("metadata", {})
|
||||
self._merge_labels(metadata.setdefault("labels", {}), labels)
|
||||
|
||||
spec = resource.setdefault("spec", {})
|
||||
if kind in {"Deployment", "StatefulSet", "DaemonSet", "ReplicaSet"}:
|
||||
selector = spec.setdefault("selector", {}).setdefault("matchLabels", {})
|
||||
self._merge_labels(selector, labels)
|
||||
template_metadata = spec.setdefault("template", {}).setdefault("metadata", {})
|
||||
self._merge_labels(template_metadata.setdefault("labels", {}), labels)
|
||||
elif kind == "Service":
|
||||
selector = spec.setdefault("selector", {})
|
||||
if isinstance(selector, dict):
|
||||
self._merge_labels(selector, labels)
|
||||
elif kind == "PodDisruptionBudget":
|
||||
selector = spec.setdefault("selector", {}).setdefault("matchLabels", {})
|
||||
self._merge_labels(selector, labels)
|
||||
|
||||
self._apply_label_selector_labels(spec, labels)
|
||||
|
||||
@staticmethod
|
||||
def _merge_labels(target: dict[str, Any], labels: dict[str, str]) -> None:
|
||||
for key, value in labels.items():
|
||||
target.setdefault(key, value)
|
||||
|
||||
def _apply_label_selector_labels(self, value: Any, labels: dict[str, str]) -> None:
|
||||
if isinstance(value, dict):
|
||||
selector = value.get("labelSelector")
|
||||
if isinstance(selector, dict):
|
||||
match_labels = selector.setdefault("matchLabels", {})
|
||||
if isinstance(match_labels, dict):
|
||||
self._merge_labels(match_labels, labels)
|
||||
for child in value.values():
|
||||
self._apply_label_selector_labels(child, labels)
|
||||
elif isinstance(value, list):
|
||||
for child in value:
|
||||
self._apply_label_selector_labels(child, labels)
|
||||
|
||||
def _apply_image_overrides(self, resource: dict[str, Any], images: list[Any]) -> None:
|
||||
overrides = [image for image in images if isinstance(image, dict)]
|
||||
if not overrides:
|
||||
return
|
||||
|
||||
for pod_spec in self._iter_pod_specs(resource):
|
||||
for key in ("containers", "initContainers"):
|
||||
containers = pod_spec.get(key)
|
||||
if not isinstance(containers, list):
|
||||
continue
|
||||
for container in containers:
|
||||
if not isinstance(container, dict) or "image" not in container:
|
||||
continue
|
||||
image = container.get("image")
|
||||
if isinstance(image, str):
|
||||
container["image"] = self._rewrite_image(image, overrides)
|
||||
|
||||
@staticmethod
|
||||
def _iter_pod_specs(resource: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
spec = resource.get("spec")
|
||||
if not isinstance(spec, dict):
|
||||
return []
|
||||
|
||||
pod_specs: list[dict[str, Any]] = []
|
||||
template_spec = spec.get("template", {}).get("spec")
|
||||
if isinstance(template_spec, dict):
|
||||
pod_specs.append(template_spec)
|
||||
|
||||
job_template_spec = (
|
||||
spec.get("jobTemplate", {})
|
||||
.get("spec", {})
|
||||
.get("template", {})
|
||||
.get("spec")
|
||||
)
|
||||
if isinstance(job_template_spec, dict):
|
||||
pod_specs.append(job_template_spec)
|
||||
|
||||
if resource.get("kind") == "Pod":
|
||||
pod_specs.append(spec)
|
||||
|
||||
return pod_specs
|
||||
|
||||
def _rewrite_image(self, current_image: str, overrides: list[dict[str, Any]]) -> str:
|
||||
current_name = self._image_name_without_tag(current_image)
|
||||
for override in overrides:
|
||||
source = str(override.get("name", ""))
|
||||
new_name = str(override.get("newName") or self._image_name_without_tag(source))
|
||||
new_tag = override.get("newTag")
|
||||
source_name = self._image_name_without_tag(source)
|
||||
if current_image != source and current_name != source_name:
|
||||
continue
|
||||
return f"{new_name}:{new_tag}" if new_tag else new_name
|
||||
return current_image
|
||||
|
||||
@staticmethod
|
||||
def _image_name_without_tag(image: str) -> str:
|
||||
without_digest = image.split("@", 1)[0]
|
||||
slash_index = without_digest.rfind("/")
|
||||
colon_index = without_digest.rfind(":")
|
||||
if colon_index > slash_index:
|
||||
return without_digest[:colon_index]
|
||||
return without_digest
|
||||
|
||||
|
||||
class K8sStateReader:
|
||||
"""從 kubectl 讀取 K8s 實際狀態"""
|
||||
@@ -257,8 +421,8 @@ class DriftDetector:
|
||||
items: list[DriftItem] = []
|
||||
|
||||
# 只比對 spec 層(metadata 的動態欄位太多)
|
||||
git_spec = git_res.get("spec", {})
|
||||
actual_spec = actual_res.get("spec", {})
|
||||
git_spec = self._normalized_spec(git_res)
|
||||
actual_spec = self._normalized_spec(actual_res)
|
||||
|
||||
diffs = self._flatten_diff("spec", git_spec, actual_spec)
|
||||
for field_path, (git_val, actual_val) in diffs.items():
|
||||
@@ -283,6 +447,151 @@ class DriftDetector:
|
||||
|
||||
return items
|
||||
|
||||
def _normalized_spec(self, resource: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Normalize Kubernetes API defaults before field diffing."""
|
||||
spec = deepcopy(resource.get("spec", {}))
|
||||
if not isinstance(spec, dict):
|
||||
return {}
|
||||
|
||||
kind = resource.get("kind")
|
||||
if kind == "Service":
|
||||
self._normalize_service_spec(spec)
|
||||
if kind in {"Deployment", "StatefulSet", "DaemonSet", "ReplicaSet"}:
|
||||
self._normalize_controller_defaults(spec)
|
||||
self._normalize_template_defaults(spec)
|
||||
return spec
|
||||
|
||||
@staticmethod
|
||||
def _normalize_service_spec(spec: dict[str, Any]) -> None:
|
||||
for field in (
|
||||
"clusterIP",
|
||||
"clusterIPs",
|
||||
"ipFamilies",
|
||||
"ipFamilyPolicy",
|
||||
"internalTrafficPolicy",
|
||||
):
|
||||
spec.pop(field, None)
|
||||
if spec.get("externalTrafficPolicy") == "Cluster":
|
||||
spec.pop("externalTrafficPolicy", None)
|
||||
if spec.get("sessionAffinity") == "None":
|
||||
spec.pop("sessionAffinity", None)
|
||||
ports = spec.get("ports")
|
||||
if isinstance(ports, list):
|
||||
for port in ports:
|
||||
if isinstance(port, dict) and port.get("protocol") == "TCP":
|
||||
port.pop("protocol", None)
|
||||
|
||||
def _normalize_controller_defaults(self, spec: dict[str, Any]) -> None:
|
||||
if spec.get("progressDeadlineSeconds") == 600:
|
||||
spec.pop("progressDeadlineSeconds", None)
|
||||
if spec.get("revisionHistoryLimit") == 10:
|
||||
spec.pop("revisionHistoryLimit", None)
|
||||
strategy = spec.get("strategy")
|
||||
if isinstance(strategy, dict):
|
||||
rolling_update = strategy.get("rollingUpdate")
|
||||
if isinstance(rolling_update, dict):
|
||||
if rolling_update.get("maxSurge") == "25%":
|
||||
rolling_update.pop("maxSurge", None)
|
||||
if rolling_update.get("maxUnavailable") == "25%":
|
||||
rolling_update.pop("maxUnavailable", None)
|
||||
if not rolling_update:
|
||||
strategy.pop("rollingUpdate", None)
|
||||
if strategy.get("type") == "RollingUpdate" and len(strategy) == 1:
|
||||
spec.pop("strategy", None)
|
||||
|
||||
def _normalize_template_defaults(self, spec: dict[str, Any]) -> None:
|
||||
template = spec.get("template")
|
||||
if not isinstance(template, dict):
|
||||
return
|
||||
|
||||
template_metadata = template.get("metadata")
|
||||
if isinstance(template_metadata, dict):
|
||||
annotations = template_metadata.get("annotations")
|
||||
if isinstance(annotations, dict):
|
||||
annotations.pop("kubectl.kubernetes.io/restartedAt", None)
|
||||
if not annotations:
|
||||
template_metadata.pop("annotations", None)
|
||||
|
||||
pod_spec = template.get("spec")
|
||||
if isinstance(pod_spec, dict):
|
||||
self._normalize_pod_spec_defaults(pod_spec)
|
||||
|
||||
def _normalize_pod_spec_defaults(self, pod_spec: dict[str, Any]) -> None:
|
||||
defaults = {
|
||||
"restartPolicy": "Always",
|
||||
"dnsPolicy": "ClusterFirst",
|
||||
"schedulerName": "default-scheduler",
|
||||
"terminationGracePeriodSeconds": 30,
|
||||
}
|
||||
for field, default in defaults.items():
|
||||
if pod_spec.get(field) == default:
|
||||
pod_spec.pop(field, None)
|
||||
|
||||
if pod_spec.get("securityContext") == {}:
|
||||
pod_spec.pop("securityContext", None)
|
||||
if pod_spec.get("serviceAccount") == pod_spec.get("serviceAccountName"):
|
||||
pod_spec.pop("serviceAccount", None)
|
||||
|
||||
for key in ("containers", "initContainers"):
|
||||
containers = pod_spec.get(key)
|
||||
if isinstance(containers, list):
|
||||
for container in containers:
|
||||
if isinstance(container, dict):
|
||||
self._normalize_container_defaults(container)
|
||||
|
||||
volumes = pod_spec.get("volumes")
|
||||
if isinstance(volumes, list):
|
||||
for volume in volumes:
|
||||
if isinstance(volume, dict):
|
||||
self._normalize_volume_defaults(volume)
|
||||
|
||||
def _normalize_container_defaults(self, container: dict[str, Any]) -> None:
|
||||
if container.get("terminationMessagePath") == "/dev/termination-log":
|
||||
container.pop("terminationMessagePath", None)
|
||||
if container.get("terminationMessagePolicy") == "File":
|
||||
container.pop("terminationMessagePolicy", None)
|
||||
|
||||
ports = container.get("ports")
|
||||
if isinstance(ports, list):
|
||||
for port in ports:
|
||||
if isinstance(port, dict) and port.get("protocol") == "TCP":
|
||||
port.pop("protocol", None)
|
||||
|
||||
for probe_name in ("livenessProbe", "readinessProbe", "startupProbe"):
|
||||
probe = container.get(probe_name)
|
||||
if isinstance(probe, dict):
|
||||
self._normalize_probe_defaults(probe)
|
||||
|
||||
env = container.get("env")
|
||||
if isinstance(env, list):
|
||||
for item in env:
|
||||
if isinstance(item, dict):
|
||||
self._normalize_env_defaults(item)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_probe_defaults(probe: dict[str, Any]) -> None:
|
||||
if probe.get("successThreshold") == 1:
|
||||
probe.pop("successThreshold", None)
|
||||
http_get = probe.get("httpGet")
|
||||
if isinstance(http_get, dict) and http_get.get("scheme") == "HTTP":
|
||||
http_get.pop("scheme", None)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_env_defaults(env_item: dict[str, Any]) -> None:
|
||||
value_from = env_item.get("valueFrom")
|
||||
if not isinstance(value_from, dict):
|
||||
return
|
||||
field_ref = value_from.get("fieldRef")
|
||||
if isinstance(field_ref, dict) and field_ref.get("apiVersion") == "v1":
|
||||
field_ref.pop("apiVersion", None)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_volume_defaults(volume: dict[str, Any]) -> None:
|
||||
for source_key in ("secret", "configMap"):
|
||||
source = volume.get(source_key)
|
||||
if isinstance(source, dict) and source.get("defaultMode") in {420, "420"}:
|
||||
source.pop("defaultMode", None)
|
||||
|
||||
def _flatten_diff(
|
||||
self,
|
||||
prefix: str,
|
||||
|
||||
237
apps/api/tests/test_drift_detector_normalization.py
Normal file
237
apps/api/tests/test_drift_detector_normalization.py
Normal file
@@ -0,0 +1,237 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
from src.models.drift import DriftLevel
|
||||
from src.services.drift_detector import DriftDetector, GitStateReader
|
||||
|
||||
|
||||
def test_git_state_reader_applies_kustomize_labels_and_images(tmp_path):
|
||||
prod_dir = tmp_path / "k8s" / "awoooi-prod"
|
||||
prod_dir.mkdir(parents=True)
|
||||
(prod_dir / "kustomization.yaml").write_text(
|
||||
"""
|
||||
apiVersion: kustomize.config.k8s.io/v1beta1
|
||||
kind: Kustomization
|
||||
namespace: awoooi-prod
|
||||
commonLabels:
|
||||
environment: prod
|
||||
system: awoooi
|
||||
resources:
|
||||
- deployment.yaml
|
||||
images:
|
||||
- name: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
|
||||
newName: registry.local/awoooi/api
|
||||
newTag: abc123
|
||||
""",
|
||||
encoding="utf-8",
|
||||
)
|
||||
(prod_dir / "deployment.yaml").write_text(
|
||||
"""
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: awoooi-api
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: awoooi-api
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: awoooi-api
|
||||
spec:
|
||||
containers:
|
||||
- name: api
|
||||
image: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
|
||||
affinity:
|
||||
podAntiAffinity:
|
||||
preferredDuringSchedulingIgnoredDuringExecution:
|
||||
- weight: 100
|
||||
podAffinityTerm:
|
||||
labelSelector:
|
||||
matchLabels:
|
||||
app: awoooi-api
|
||||
topologyKey: kubernetes.io/hostname
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: awoooi-api-svc
|
||||
spec:
|
||||
selector:
|
||||
app: awoooi-api
|
||||
ports:
|
||||
- port: 8000
|
||||
targetPort: 8000
|
||||
""",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
state = GitStateReader(str(tmp_path / "k8s"))._read_sync("awoooi-prod")
|
||||
|
||||
deployment = state["Deployment/awoooi-api"]
|
||||
template = deployment["spec"]["template"]
|
||||
assert deployment["metadata"]["namespace"] == "awoooi-prod"
|
||||
assert deployment["spec"]["selector"]["matchLabels"]["environment"] == "prod"
|
||||
assert template["metadata"]["labels"]["system"] == "awoooi"
|
||||
assert (
|
||||
template["spec"]["containers"][0]["image"]
|
||||
== "registry.local/awoooi/api:abc123"
|
||||
)
|
||||
affinity_selector = template["spec"]["affinity"]["podAntiAffinity"][
|
||||
"preferredDuringSchedulingIgnoredDuringExecution"
|
||||
][0]["podAffinityTerm"]["labelSelector"]["matchLabels"]
|
||||
assert affinity_selector["environment"] == "prod"
|
||||
assert affinity_selector["system"] == "awoooi"
|
||||
|
||||
service = state["Service/awoooi-api-svc"]
|
||||
assert service["metadata"]["namespace"] == "awoooi-prod"
|
||||
assert service["spec"]["selector"]["environment"] == "prod"
|
||||
assert service["spec"]["selector"]["system"] == "awoooi"
|
||||
|
||||
|
||||
def test_kubernetes_api_defaults_do_not_create_drift():
|
||||
detector = DriftDetector()
|
||||
git_res = {
|
||||
"kind": "Deployment",
|
||||
"spec": {
|
||||
"selector": {"matchLabels": {"app": "awoooi-api"}},
|
||||
"template": {
|
||||
"metadata": {"labels": {"app": "awoooi-api"}},
|
||||
"spec": {
|
||||
"serviceAccountName": "awoooi-executor",
|
||||
"containers": [
|
||||
{
|
||||
"name": "api",
|
||||
"image": "registry.local/awoooi/api:abc123",
|
||||
"ports": [{"containerPort": 8000, "name": "http"}],
|
||||
"readinessProbe": {
|
||||
"httpGet": {"path": "/api/v1/health", "port": 8000},
|
||||
"periodSeconds": 5,
|
||||
},
|
||||
"env": [
|
||||
{
|
||||
"name": "POD_NAME",
|
||||
"valueFrom": {
|
||||
"fieldRef": {"fieldPath": "metadata.name"}
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
"volumes": [
|
||||
{"name": "runtime-secret", "secret": {"secretName": "s"}},
|
||||
{"name": "runtime-config", "configMap": {"name": "c"}},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
actual_res = deepcopy(git_res)
|
||||
pod_spec = actual_res["spec"]["template"]["spec"]
|
||||
pod_spec.update(
|
||||
{
|
||||
"serviceAccount": "awoooi-executor",
|
||||
"restartPolicy": "Always",
|
||||
"dnsPolicy": "ClusterFirst",
|
||||
"schedulerName": "default-scheduler",
|
||||
"terminationGracePeriodSeconds": 30,
|
||||
"securityContext": {},
|
||||
}
|
||||
)
|
||||
container = pod_spec["containers"][0]
|
||||
container["terminationMessagePath"] = "/dev/termination-log"
|
||||
container["terminationMessagePolicy"] = "File"
|
||||
container["ports"][0]["protocol"] = "TCP"
|
||||
container["readinessProbe"]["successThreshold"] = 1
|
||||
container["readinessProbe"]["httpGet"]["scheme"] = "HTTP"
|
||||
container["env"][0]["valueFrom"]["fieldRef"]["apiVersion"] = "v1"
|
||||
pod_spec["volumes"][0]["secret"]["defaultMode"] = 420
|
||||
pod_spec["volumes"][1]["configMap"]["defaultMode"] = 420
|
||||
actual_res["spec"]["template"]["metadata"]["annotations"] = {
|
||||
"kubectl.kubernetes.io/restartedAt": "2026-05-19T00:00:00+08:00"
|
||||
}
|
||||
|
||||
assert (
|
||||
detector._diff_resources(
|
||||
git_res,
|
||||
actual_res,
|
||||
"Deployment",
|
||||
"awoooi-api",
|
||||
"awoooi-prod",
|
||||
)
|
||||
== []
|
||||
)
|
||||
|
||||
|
||||
def test_service_runtime_defaults_do_not_create_drift():
|
||||
detector = DriftDetector()
|
||||
git_res = {
|
||||
"kind": "Service",
|
||||
"spec": {
|
||||
"type": "NodePort",
|
||||
"selector": {"app": "awoooi-api"},
|
||||
"ports": [{"port": 8000, "targetPort": 8000, "nodePort": 32334}],
|
||||
},
|
||||
}
|
||||
actual_res = deepcopy(git_res)
|
||||
actual_res["spec"].update(
|
||||
{
|
||||
"clusterIP": "10.43.156.119",
|
||||
"clusterIPs": ["10.43.156.119"],
|
||||
"ipFamilies": ["IPv4"],
|
||||
"ipFamilyPolicy": "SingleStack",
|
||||
"internalTrafficPolicy": "Cluster",
|
||||
"externalTrafficPolicy": "Cluster",
|
||||
"sessionAffinity": "None",
|
||||
}
|
||||
)
|
||||
actual_res["spec"]["ports"][0]["protocol"] = "TCP"
|
||||
|
||||
assert (
|
||||
detector._diff_resources(
|
||||
git_res,
|
||||
actual_res,
|
||||
"Service",
|
||||
"awoooi-api-svc",
|
||||
"awoooi-prod",
|
||||
)
|
||||
== []
|
||||
)
|
||||
|
||||
|
||||
def test_real_container_env_drift_remains_visible():
|
||||
detector = DriftDetector()
|
||||
git_res = {
|
||||
"kind": "Deployment",
|
||||
"spec": {
|
||||
"template": {
|
||||
"metadata": {"labels": {"app": "awoooi-api"}},
|
||||
"spec": {
|
||||
"containers": [
|
||||
{
|
||||
"name": "api",
|
||||
"image": "registry.local/awoooi/api:abc123",
|
||||
"env": [{"name": "USE_AI_ROUTER", "value": "true"}],
|
||||
}
|
||||
]
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
actual_res = deepcopy(git_res)
|
||||
actual_res["spec"]["template"]["spec"]["containers"][0]["env"].append(
|
||||
{"name": "DUMMY_DEPLOY", "value": "1777914462"}
|
||||
)
|
||||
|
||||
items = detector._diff_resources(
|
||||
git_res,
|
||||
actual_res,
|
||||
"Deployment",
|
||||
"awoooi-api",
|
||||
"awoooi-prod",
|
||||
)
|
||||
|
||||
assert [item.field_path for item in items] == ["spec.template.spec.containers"]
|
||||
assert items[0].drift_level == DriftLevel.MEDIUM
|
||||
Reference in New Issue
Block a user