#!/usr/bin/env python3 # 2026-04-28 ogt + Claude Opus 4.7: P2-1 ConfigMap vs code default drift checker # 2026-04-29 ogt + Claude Opus 4.7: critic M7 修 — regex 改 AST 解析,避免 false negative # 來源:tool-expert 統一治理方案 + critic PR review # 目的:CI / pre-commit 階段驗證 k8s ConfigMap 與 apps/api/src/core/config.py default 一致 """ ConfigMap vs Code Default Drift Checker (AST-based) 用法: python3 scripts/check_config_drift.py 退出碼: 0 = 全部對齊 1 = 至少一項 drift(CI 應 fail) 2 = 配置/解析錯誤 設計: 用 ast.parse 解析 config.py 找 ClassDef Settings → 每個 AnnAssign 的 Field(default=...),避免 regex 對多行 list / default_factory= / 含跳行字串 的 false negative(critic M7)。 """ from __future__ import annotations import ast import json import sys from pathlib import Path from typing import Any import yaml ROOT = Path(__file__).resolve().parent.parent CONFIGMAP_PATH = ROOT / "k8s" / "awoooi-prod" / "04-configmap.yaml" CONFIG_PY_PATH = ROOT / "apps" / "api" / "src" / "core" / "config.py" # 需要比對的欄位 CHECK_FIELDS: list[str] = [ "AI_FALLBACK_ORDER", "ARGOCD_URL", "PROMETHEUS_URL", "OLLAMA_URL", ] def _extract_field_default(call_node: ast.Call) -> Any: """從 ast.Call(Field(default=..., ...)) 提取 default value。 回傳 Python 物件(str / list / int / bool)或 None(找不到)。 """ for kw in call_node.keywords: if kw.arg == "default": return _ast_to_value(kw.value) if kw.arg == "default_factory": # default_factory 動態產生,無法靜態比對 return "" return None def _ast_to_value(node: ast.AST) -> Any: """ast 節點 → Python 物件(保守,無法解析回 None)。""" if isinstance(node, ast.Constant): return node.value if isinstance(node, ast.List): return [_ast_to_value(elt) for elt in node.elts] if isinstance(node, ast.Tuple): return tuple(_ast_to_value(elt) for elt in node.elts) if isinstance(node, ast.Dict): return { _ast_to_value(k): _ast_to_value(v) for k, v in zip(node.keys, node.values, strict=False) } if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub): v = _ast_to_value(node.operand) return -v if isinstance(v, (int, float)) else None return None def _parse_settings_defaults(py_path: Path) -> dict[str, Any]: """解析 config.py 的 Settings class 取所有欄位的 default value。""" src = py_path.read_text(encoding="utf-8") tree = ast.parse(src, filename=str(py_path)) defaults: dict[str, Any] = {} for cls_node in ast.walk(tree): if not isinstance(cls_node, ast.ClassDef) or cls_node.name != "Settings": continue for stmt in cls_node.body: if not isinstance(stmt, ast.AnnAssign): continue if not isinstance(stmt.target, ast.Name): continue field_name = stmt.target.id if stmt.value is None: continue # Settings = Field(default=..., ...) if isinstance(stmt.value, ast.Call) and isinstance(stmt.value.func, ast.Name): if stmt.value.func.id == "Field": val = _extract_field_default(stmt.value) if val is not None: defaults[field_name] = val continue # 直接 default: var = "value" val = _ast_to_value(stmt.value) if val is not None: defaults[field_name] = val return defaults def _normalize(raw: Any) -> Any: """ConfigMap 字串可能是 JSON list(如 AI_FALLBACK_ORDER),嘗試解析。""" if isinstance(raw, str): s = raw.strip() if s.startswith("[") and s.endswith("]"): try: return json.loads(s.replace("'", '"')) except json.JSONDecodeError: return s return raw def main() -> int: if not CONFIGMAP_PATH.exists(): print(f"[ERROR] ConfigMap not found: {CONFIGMAP_PATH}") return 2 if not CONFIG_PY_PATH.exists(): print(f"[ERROR] config.py not found: {CONFIG_PY_PATH}") return 2 try: with CONFIGMAP_PATH.open() as fh: cm_data: dict = (yaml.safe_load(fh) or {}).get("data", {}) or {} except yaml.YAMLError as exc: print(f"[ERROR] ConfigMap YAML parse: {exc}") return 2 try: py_defaults = _parse_settings_defaults(CONFIG_PY_PATH) except SyntaxError as exc: print(f"[ERROR] config.py AST parse: {exc}") return 2 exit_code = 0 print("=== ConfigMap ↔ code.default Drift Check (AST-based) ===") for field in CHECK_FIELDS: cm_raw = cm_data.get(field, "") py_val = py_defaults.get(field, "") cm_val = _normalize(cm_raw) if cm_val == py_val: print(f"[OK] {field}: {cm_val}") else: print(f"[DRIFT] {field}:") print(f" ConfigMap = {cm_val}") print(f" config.py = {py_val}") exit_code = 1 if exit_code == 0: print("=== All drift-check fields aligned ===") else: print("=== DRIFT detected, fix the inconsistency ===") return exit_code if __name__ == "__main__": sys.exit(main())