fix(drift): read git state from gitea main
This commit is contained in:
@@ -17,6 +17,9 @@ import uuid
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.parse import quote
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
import structlog
|
||||
import yaml
|
||||
@@ -89,6 +92,10 @@ class GitStateReader:
|
||||
return {}
|
||||
|
||||
def _read_sync(self, namespace: str) -> dict[str, Any]:
|
||||
remote_resources = self._read_remote_sync(namespace)
|
||||
if remote_resources:
|
||||
return remote_resources
|
||||
|
||||
resources: dict[str, Any] = {}
|
||||
kustomization_cache: dict[Path, dict[str, Any] | None] = {}
|
||||
|
||||
@@ -120,6 +127,120 @@ class GitStateReader:
|
||||
|
||||
return resources
|
||||
|
||||
def _read_remote_sync(self, namespace: str) -> dict[str, Any]:
|
||||
"""Read drift source of truth from Gitea main when available."""
|
||||
if self._k8s_dir.is_absolute():
|
||||
return {}
|
||||
|
||||
try:
|
||||
from src.core.config import get_settings
|
||||
settings = get_settings()
|
||||
except Exception as e:
|
||||
logger.debug("drift_remote_settings_unavailable", error=str(e))
|
||||
return {}
|
||||
|
||||
repo_dir = self._k8s_dir.as_posix().strip("/")
|
||||
namespace_dir = f"{repo_dir}/{namespace}"
|
||||
kustomization_path = f"{namespace_dir}/kustomization.yaml"
|
||||
kustomization_text = self._fetch_gitea_raw(settings, kustomization_path)
|
||||
if not kustomization_text:
|
||||
return {}
|
||||
|
||||
try:
|
||||
config = yaml.safe_load(kustomization_text) or {}
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"drift_remote_kustomization_parse_failed",
|
||||
path=kustomization_path,
|
||||
error=str(e),
|
||||
)
|
||||
return {}
|
||||
if not isinstance(config, dict):
|
||||
return {}
|
||||
|
||||
resources: dict[str, Any] = {}
|
||||
resource_paths = self._remote_resource_paths(namespace_dir, config)
|
||||
for resource_path in resource_paths:
|
||||
content = self._fetch_gitea_raw(settings, resource_path)
|
||||
if not content:
|
||||
continue
|
||||
try:
|
||||
docs = list(yaml.safe_load_all(content))
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"drift_remote_yaml_parse_failed",
|
||||
path=resource_path,
|
||||
error=str(e),
|
||||
)
|
||||
continue
|
||||
self._collect_resources(resources, docs, namespace, config)
|
||||
|
||||
if resources:
|
||||
logger.info(
|
||||
"drift_git_state_remote_read",
|
||||
namespace=namespace,
|
||||
resources=len(resources),
|
||||
source="gitea_main",
|
||||
)
|
||||
return resources
|
||||
|
||||
@staticmethod
|
||||
def _remote_resource_paths(namespace_dir: str, config: dict[str, Any]) -> list[str]:
|
||||
resources = config.get("resources") or []
|
||||
paths: list[str] = []
|
||||
for resource in resources:
|
||||
resource_str = str(resource)
|
||||
if not resource_str.endswith((".yaml", ".yml")):
|
||||
continue
|
||||
path = (Path(namespace_dir) / resource_str).as_posix()
|
||||
paths.append(path)
|
||||
return paths
|
||||
|
||||
def _fetch_gitea_raw(self, settings: Any, path: str) -> str | None:
|
||||
api_url = str(settings.GITEA_API_URL).rstrip("/")
|
||||
owner = quote(str(settings.GITEA_REPO_OWNER), safe="")
|
||||
repo = quote(str(settings.GITEA_REPO_NAME), safe="")
|
||||
encoded_path = quote(path, safe="/")
|
||||
url = f"{api_url}/api/v1/repos/{owner}/{repo}/raw/{encoded_path}?ref=main"
|
||||
headers = {"Accept": "text/plain"}
|
||||
token = getattr(settings, "GITEA_API_TOKEN", "")
|
||||
if token:
|
||||
headers["Authorization"] = f"token {token}"
|
||||
|
||||
request = Request(url, headers=headers)
|
||||
try:
|
||||
with urlopen(request, timeout=10) as response:
|
||||
return response.read().decode("utf-8")
|
||||
except HTTPError as e:
|
||||
logger.debug("drift_remote_raw_http_error", path=path, status=e.code)
|
||||
except URLError as e:
|
||||
logger.debug("drift_remote_raw_url_error", path=path, error=str(e))
|
||||
except Exception as e:
|
||||
logger.debug("drift_remote_raw_fetch_failed", path=path, error=str(e))
|
||||
return None
|
||||
|
||||
def _collect_resources(
|
||||
self,
|
||||
resources: dict[str, Any],
|
||||
docs: list[Any],
|
||||
namespace: str,
|
||||
kustomization: dict[str, Any] | None = None,
|
||||
) -> None:
|
||||
for doc in docs:
|
||||
if not doc or not isinstance(doc, dict):
|
||||
continue
|
||||
if kustomization:
|
||||
doc = self._apply_kustomization(doc, kustomization)
|
||||
metadata = doc.get("metadata", {})
|
||||
ns = metadata.get("namespace", "")
|
||||
if ns and ns != namespace:
|
||||
continue
|
||||
kind = doc.get("kind", "")
|
||||
name = metadata.get("name", "")
|
||||
if kind and name:
|
||||
key = f"{kind}/{name}"
|
||||
resources[key] = doc
|
||||
|
||||
def _kustomization_for_file(
|
||||
self,
|
||||
yaml_file: Path,
|
||||
|
||||
@@ -91,6 +91,56 @@ spec:
|
||||
assert service["spec"]["selector"]["system"] == "awoooi"
|
||||
|
||||
|
||||
def test_git_state_reader_prefers_gitea_main_raw_files(monkeypatch):
|
||||
reader = GitStateReader("k8s")
|
||||
remote_files = {
|
||||
"k8s/awoooi-prod/kustomization.yaml": """
|
||||
apiVersion: kustomize.config.k8s.io/v1beta1
|
||||
kind: Kustomization
|
||||
namespace: awoooi-prod
|
||||
commonLabels:
|
||||
environment: prod
|
||||
system: awoooi
|
||||
resources:
|
||||
- deployment.yaml
|
||||
images:
|
||||
- name: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
|
||||
newName: registry.local/awoooi/api
|
||||
newTag: live-main-sha
|
||||
""",
|
||||
"k8s/awoooi-prod/deployment.yaml": """
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: awoooi-api
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: awoooi-api
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: awoooi-api
|
||||
spec:
|
||||
containers:
|
||||
- name: api
|
||||
image: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
|
||||
""",
|
||||
}
|
||||
|
||||
def fake_fetch(settings, path):
|
||||
return remote_files.get(path)
|
||||
|
||||
monkeypatch.setattr(reader, "_fetch_gitea_raw", fake_fetch)
|
||||
|
||||
state = reader._read_sync("awoooi-prod")
|
||||
|
||||
assert set(state) == {"Deployment/awoooi-api"}
|
||||
container = state["Deployment/awoooi-api"]["spec"]["template"]["spec"]["containers"][0]
|
||||
assert container["image"] == "registry.local/awoooi/api:live-main-sha"
|
||||
assert state["Deployment/awoooi-api"]["spec"]["selector"]["matchLabels"]["system"] == "awoooi"
|
||||
|
||||
|
||||
def test_kubernetes_api_defaults_do_not_create_drift():
|
||||
detector = DriftDetector()
|
||||
git_res = {
|
||||
|
||||
Reference in New Issue
Block a user