fix(drift): read git state from gitea main
All checks were successful
Code Review / ai-code-review (push) Successful in 19s
CD Pipeline / tests (push) Successful in 1m29s
CD Pipeline / build-and-deploy (push) Successful in 3m40s
CD Pipeline / post-deploy-checks (push) Successful in 2m5s

This commit is contained in:
Your Name
2026-05-19 02:14:26 +08:00
parent 2c4e8bb666
commit 01ba1e6f13
2 changed files with 171 additions and 0 deletions

View File

@@ -17,6 +17,9 @@ import uuid
from copy import deepcopy
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen
import structlog
import yaml
@@ -89,6 +92,10 @@ class GitStateReader:
return {}
def _read_sync(self, namespace: str) -> dict[str, Any]:
remote_resources = self._read_remote_sync(namespace)
if remote_resources:
return remote_resources
resources: dict[str, Any] = {}
kustomization_cache: dict[Path, dict[str, Any] | None] = {}
@@ -120,6 +127,120 @@ class GitStateReader:
return resources
def _read_remote_sync(self, namespace: str) -> dict[str, Any]:
"""Read drift source of truth from Gitea main when available."""
if self._k8s_dir.is_absolute():
return {}
try:
from src.core.config import get_settings
settings = get_settings()
except Exception as e:
logger.debug("drift_remote_settings_unavailable", error=str(e))
return {}
repo_dir = self._k8s_dir.as_posix().strip("/")
namespace_dir = f"{repo_dir}/{namespace}"
kustomization_path = f"{namespace_dir}/kustomization.yaml"
kustomization_text = self._fetch_gitea_raw(settings, kustomization_path)
if not kustomization_text:
return {}
try:
config = yaml.safe_load(kustomization_text) or {}
except Exception as e:
logger.warning(
"drift_remote_kustomization_parse_failed",
path=kustomization_path,
error=str(e),
)
return {}
if not isinstance(config, dict):
return {}
resources: dict[str, Any] = {}
resource_paths = self._remote_resource_paths(namespace_dir, config)
for resource_path in resource_paths:
content = self._fetch_gitea_raw(settings, resource_path)
if not content:
continue
try:
docs = list(yaml.safe_load_all(content))
except Exception as e:
logger.debug(
"drift_remote_yaml_parse_failed",
path=resource_path,
error=str(e),
)
continue
self._collect_resources(resources, docs, namespace, config)
if resources:
logger.info(
"drift_git_state_remote_read",
namespace=namespace,
resources=len(resources),
source="gitea_main",
)
return resources
@staticmethod
def _remote_resource_paths(namespace_dir: str, config: dict[str, Any]) -> list[str]:
resources = config.get("resources") or []
paths: list[str] = []
for resource in resources:
resource_str = str(resource)
if not resource_str.endswith((".yaml", ".yml")):
continue
path = (Path(namespace_dir) / resource_str).as_posix()
paths.append(path)
return paths
def _fetch_gitea_raw(self, settings: Any, path: str) -> str | None:
api_url = str(settings.GITEA_API_URL).rstrip("/")
owner = quote(str(settings.GITEA_REPO_OWNER), safe="")
repo = quote(str(settings.GITEA_REPO_NAME), safe="")
encoded_path = quote(path, safe="/")
url = f"{api_url}/api/v1/repos/{owner}/{repo}/raw/{encoded_path}?ref=main"
headers = {"Accept": "text/plain"}
token = getattr(settings, "GITEA_API_TOKEN", "")
if token:
headers["Authorization"] = f"token {token}"
request = Request(url, headers=headers)
try:
with urlopen(request, timeout=10) as response:
return response.read().decode("utf-8")
except HTTPError as e:
logger.debug("drift_remote_raw_http_error", path=path, status=e.code)
except URLError as e:
logger.debug("drift_remote_raw_url_error", path=path, error=str(e))
except Exception as e:
logger.debug("drift_remote_raw_fetch_failed", path=path, error=str(e))
return None
def _collect_resources(
self,
resources: dict[str, Any],
docs: list[Any],
namespace: str,
kustomization: dict[str, Any] | None = None,
) -> None:
for doc in docs:
if not doc or not isinstance(doc, dict):
continue
if kustomization:
doc = self._apply_kustomization(doc, kustomization)
metadata = doc.get("metadata", {})
ns = metadata.get("namespace", "")
if ns and ns != namespace:
continue
kind = doc.get("kind", "")
name = metadata.get("name", "")
if kind and name:
key = f"{kind}/{name}"
resources[key] = doc
def _kustomization_for_file(
self,
yaml_file: Path,

View File

@@ -91,6 +91,56 @@ spec:
assert service["spec"]["selector"]["system"] == "awoooi"
def test_git_state_reader_prefers_gitea_main_raw_files(monkeypatch):
reader = GitStateReader("k8s")
remote_files = {
"k8s/awoooi-prod/kustomization.yaml": """
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: awoooi-prod
commonLabels:
environment: prod
system: awoooi
resources:
- deployment.yaml
images:
- name: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
newName: registry.local/awoooi/api
newTag: live-main-sha
""",
"k8s/awoooi-prod/deployment.yaml": """
apiVersion: apps/v1
kind: Deployment
metadata:
name: awoooi-api
spec:
selector:
matchLabels:
app: awoooi-api
template:
metadata:
labels:
app: awoooi-api
spec:
containers:
- name: api
image: registry.local/library/api:IMAGE_TAG_PLACEHOLDER
""",
}
def fake_fetch(settings, path):
return remote_files.get(path)
monkeypatch.setattr(reader, "_fetch_gitea_raw", fake_fetch)
state = reader._read_sync("awoooi-prod")
assert set(state) == {"Deployment/awoooi-api"}
container = state["Deployment/awoooi-api"]["spec"]["template"]["spec"]["containers"][0]
assert container["image"] == "registry.local/awoooi/api:live-main-sha"
assert state["Deployment/awoooi-api"]["spec"]["selector"]["matchLabels"]["system"] == "awoooi"
def test_kubernetes_api_defaults_do_not_create_drift():
detector = DriftDetector()
git_res = {