diff --git a/apps/api/src/api/v1/drift.py b/apps/api/src/api/v1/drift.py index 4775cf03..04920335 100644 --- a/apps/api/src/api/v1/drift.py +++ b/apps/api/src/api/v1/drift.py @@ -48,7 +48,6 @@ async def trigger_drift_scan( """ detector = get_drift_detector() analyzer = get_drift_analyzer() - interpreter = get_drift_interpreter() all_items = [] last_report: DriftReport | None = None @@ -115,18 +114,22 @@ async def rollback_drift(report_id: str) -> dict: return result -@router.post("/reports/{report_id}/adopt", summary="承認變更並更新 Git") -async def adopt_drift(_report_id: str) -> dict: +@router.post("/reports/{report_id}/adopt", summary="承認變更並建立 Git PR") +async def adopt_drift(report_id: str) -> dict: """ - 承認 K8s 漂移,更新 Git 使其與實際狀態一致 + 承認 K8s 漂移,透過 Gitea PR API 將漂移寫回 Git - ⚠️ 2026-04-04 ogt: C2 首席架構師裁示 — 暫時停用(ADR-057 起草後再啟用) - API Pod 內執行 git add -A 有安全風險,改用 Gitea PR API 實作後才開放。 + 2026-04-05 Claude Code: ADR-057 實作 — 改用 Gitea PR API(不再 git push main) + 流程: 建立 drift/adopt-* branch → commit YAML 注解 → 建立 PR → Telegram 通知 SRE """ - raise HTTPException( - status_code=501, - detail="adopt() 端點暫停開放。ADR-057 起草後將改由 Gitea PR API 實作。", - ) + report = _recent_reports.get(report_id) + if not report: + raise HTTPException(status_code=404, detail=f"Report {report_id} not found") + + from src.services.drift_adopt_service import get_drift_adopt_service + adopt_svc = get_drift_adopt_service() + result = await adopt_svc.adopt(report) + return result # ============================================================================= diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py index 318766b2..15c8ce29 100644 --- a/apps/api/src/core/config.py +++ b/apps/api/src/core/config.py @@ -97,6 +97,20 @@ class Settings(BaseSettings): description="Phase 25 P0: FORCE_LOCAL DIAGNOSE Ollama timeout (秒),實測 ~173s,200s 含 buffer", ) + # ========================================================================== + # Gitea — ADR-057 adopt() Gitea PR API (2026-04-05) + # ========================================================================== + GITEA_API_URL: str = Field( + default="http://192.168.0.110:3001", + description="Gitea 內網 API base URL", + ) + GITEA_API_TOKEN: str = Field( + default="", + description="Gitea API Token(需 write:repository scope),ADR-057 adopt() 使用", + ) + GITEA_REPO_OWNER: str = Field(default="wooo", description="Gitea repo owner") + GITEA_REPO_NAME: str = Field(default="awoooi", description="Gitea repo name") + # ========================================================================== # CORS - 嚴格白名單 (無 UAT, 無 wildcard) # ========================================================================== diff --git a/apps/api/src/services/drift_adopt_service.py b/apps/api/src/services/drift_adopt_service.py new file mode 100644 index 00000000..26fd8c9b --- /dev/null +++ b/apps/api/src/services/drift_adopt_service.py @@ -0,0 +1,317 @@ +""" +Drift Adopt Service — ADR-057: Gitea PR API 實作 +================================================= +職責:將合法漂移反向寫回 Git,透過 Gitea PR API 而非直接 git push + +設計邊界(核心原則): +- 不直接操作 git(不 git add/commit/push) +- 透過 Gitea REST API 建立 branch + commit + PR +- PR 需 SRE 手動 review 後 merge(不自動 merge main) +- 只修改漂移相關的 YAML 欄位(不 add -A) + +流程: + adopt() 被呼叫 + → 讀取 k8s/ 目錄對應 YAML 檔 + → 建立 drift/adopt-{report_id[:8]}-{ts} branch + → 透過 Gitea API commit YAML 更新 + → 建立 PR + 推送 Telegram 通知 + +版本: v1.0 +建立: 2026-04-05 (台北時區) +建立者: Claude Code (ADR-057 實作) +關聯 ADR: ADR-057 +""" + +from __future__ import annotations + +import base64 +import pathlib +from typing import TYPE_CHECKING + +import httpx +import structlog + +from src.core.config import get_settings +from src.utils.timezone import now_taipei + +if TYPE_CHECKING: + from src.models.drift import DriftReport + +logger = structlog.get_logger(__name__) + + +class DriftAdoptService: + """ + 透過 Gitea PR API 將漂移寫回 Git + + ADR-057 安全設計: + ✅ 使用 Gitea API(不在 API Pod 內執行 git) + ✅ 建立 PR 讓 SRE review(不直接 push main) + ✅ 只 commit k8s/ 目錄的 YAML(不 git add -A) + ✅ API Token 從 K8s Secret 注入(不寫死) + """ + + def __init__(self) -> None: + settings = get_settings() + self._api_url = settings.GITEA_API_URL.rstrip("/") + self._token = settings.GITEA_API_TOKEN + self._owner = settings.GITEA_REPO_OWNER + self._repo = settings.GITEA_REPO_NAME + self._k8s_dir = pathlib.Path("k8s") + + async def adopt(self, report: "DriftReport", field_description: str = "") -> dict: + """ + 將漂移寫回 Git:建立 branch + commit + PR + + Args: + report: 漂移報告(含具體 drift items) + field_description: 漂移欄位說明(用於 PR title) + + Returns: + {"success": bool, "pr_url": str, "message": str} + """ + if not self._token: + return { + "success": False, + "message": "GITEA_API_TOKEN 未設定,請在 K8s Secret 中新增此 key", + "pr_url": None, + } + + ts = now_taipei().strftime("%Y%m%d%H%M%S") + branch_name = f"drift/adopt-{report.report_id[:8]}-{ts}" + pr_title = f"chore: adopt drift — {report.namespace} {field_description or report.summary}" + + logger.info( + "drift_adopt_start", + report_id=report.report_id, + branch=branch_name, + ) + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + headers = { + "Authorization": f"token {self._token}", + "Content-Type": "application/json", + } + + # Step 1: 取得 main branch 的最新 SHA + main_sha = await self._get_main_sha(client, headers) + if not main_sha: + return {"success": False, "message": "無法取得 main branch SHA", "pr_url": None} + + # Step 2: 建立新 branch + branch_ok = await self._create_branch(client, headers, branch_name, main_sha) + if not branch_ok: + return {"success": False, "message": f"建立 branch {branch_name} 失敗", "pr_url": None} + + # Step 3: 找出受影響的 YAML 檔並 commit 更新 + committed_files = await self._commit_drift_yaml(client, headers, branch_name, report) + + # Step 4: 建立 PR + pr_url = await self._create_pr( + client, headers, branch_name, pr_title, report, committed_files + ) + if not pr_url: + return {"success": False, "message": "建立 PR 失敗", "pr_url": None} + + # Step 5: 推送 Telegram 通知 + await self._notify_telegram(pr_url, report, pr_title) + + logger.info("drift_adopt_pr_created", report_id=report.report_id, pr_url=pr_url) + return { + "success": True, + "message": f"PR 已建立,請 SRE review 後 merge", + "pr_url": pr_url, + } + + except Exception as e: + logger.error("drift_adopt_failed", report_id=report.report_id, error=str(e)) + return {"success": False, "message": f"adopt 失敗: {str(e)}", "pr_url": None} + + # ========================================================================= + # Private helpers + # ========================================================================= + + async def _get_main_sha(self, client: httpx.AsyncClient, headers: dict) -> str | None: + """取得 main branch 最新 commit SHA""" + resp = await client.get( + f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/branches/main", + headers=headers, + ) + if resp.status_code == 200: + return resp.json()["commit"]["id"] + logger.error("drift_adopt_get_main_sha_failed", status=resp.status_code) + return None + + async def _create_branch( + self, client: httpx.AsyncClient, headers: dict, branch_name: str, main_sha: str + ) -> bool: + """建立新 branch""" + resp = await client.post( + f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/branches", + headers=headers, + json={"new_branch_name": branch_name, "old_branch_name": "main"}, + ) + if resp.status_code in (200, 201): + return True + logger.error("drift_adopt_create_branch_failed", status=resp.status_code, body=resp.text[:200]) + return False + + async def _commit_drift_yaml( + self, client: httpx.AsyncClient, headers: dict, branch_name: str, report: "DriftReport" + ) -> list[str]: + """ + 找出受漂移影響的 YAML 檔,更新實際值後 commit 到 branch + + 目前策略:標記 YAML 檔為「已承認漂移」(加注解), + 等 ADR-057 Phase 2 再實作精確 YAML patch 邏輯。 + """ + committed = [] + affected_kinds = { + item.resource_kind.lower() + for item in report.items + if not item.is_allowlisted + } + + for yaml_file in sorted(self._k8s_dir.glob("*.yaml")): + # 判斷此 YAML 是否與漂移相關 + file_stem = yaml_file.stem.lower() + if not any(kind in file_stem for kind in affected_kinds): + continue + + try: + content = yaml_file.read_text() + + # 取得 Gitea 上的 file SHA(用於 update API) + file_sha = await self._get_file_sha(client, headers, str(yaml_file)) + + # 在檔案末尾加入漂移承認注解 + from src.utils.timezone import now_taipei as _now + ts_str = _now().strftime("%Y-%m-%d %H:%M:%S +0800") + annotation = ( + f"\n# [drift-adopted] {ts_str}\n" + f"# Report: {report.report_id}\n" + f"# Namespace: {report.namespace}\n" + f"# Summary: {report.summary}\n" + ) + updated_content = content + annotation + encoded = base64.b64encode(updated_content.encode()).decode() + + # Commit 到 branch + payload = { + "message": f"chore(drift): adopt {report.namespace} drift — {yaml_file.name}", + "content": encoded, + "branch": branch_name, + } + if file_sha: + payload["sha"] = file_sha + + url = f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/contents/{yaml_file}" + resp = await client.put(url, headers=headers, json=payload) + if resp.status_code in (200, 201): + committed.append(str(yaml_file)) + logger.info("drift_adopt_file_committed", file=str(yaml_file)) + else: + logger.warning("drift_adopt_file_commit_failed", file=str(yaml_file), status=resp.status_code) + + except Exception as e: + logger.warning("drift_adopt_file_error", file=str(yaml_file), error=str(e)) + + return committed + + async def _get_file_sha( + self, client: httpx.AsyncClient, headers: dict, file_path: str + ) -> str | None: + """取得 Gitea 上檔案的 SHA(update 時需要)""" + resp = await client.get( + f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/contents/{file_path}", + headers=headers, + params={"ref": "main"}, + ) + if resp.status_code == 200: + return resp.json().get("sha") + return None + + async def _create_pr( + self, + client: httpx.AsyncClient, + headers: dict, + branch_name: str, + pr_title: str, + report: "DriftReport", + committed_files: list[str], + ) -> str | None: + """建立 Pull Request,回傳 PR URL""" + files_md = "\n".join(f"- `{f}`" for f in committed_files) if committed_files else "(無直接匹配的 YAML 檔)" + + intent_label = "❓ 意圖不明" + if report.interpretation: + intent_map = { + "emergency_hotfix": "🚨 緊急 Hotfix", + "human_error": "⚠️ 人為誤操作", + "automated_change": "🤖 系統自動變更", + "unknown": "❓ 意圖不明", + } + intent_label = intent_map.get(report.interpretation.intent.value, "❓ 意圖不明") + + body = ( + f"## Config Drift 承認\n\n" + f"**Report ID**: `{report.report_id}`\n" + f"**Namespace**: `{report.namespace}`\n" + f"**漂移摘要**: {report.summary}\n" + f"**Nemotron 意圖**: {intent_label}\n" + f"{f'**說明**: {report.interpretation.explanation}' if report.interpretation else ''}\n\n" + f"## 異動檔案\n\n{files_md}\n\n" + f"## 說明\n\n" + f"此 PR 由 AWOOOI Config Drift Detection 系統自動建立。\n" + f"承認此次 K8s 漂移為合法變更,將漂移狀態寫回 Git。\n\n" + f"> **SRE 確認事項**:\n" + f"> - [ ] 確認漂移是預期的合法變更\n" + f"> - [ ] 確認 YAML 注解正確反映變更意圖\n" + f"> - [ ] merge 後手動更新 K8s YAML 的實際差異值\n" + ) + + resp = await client.post( + f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/pulls", + headers=headers, + json={ + "title": pr_title, + "body": body, + "head": branch_name, + "base": "main", + }, + ) + if resp.status_code in (200, 201): + pr_data = resp.json() + return pr_data.get("html_url") or pr_data.get("url") + logger.error("drift_adopt_create_pr_failed", status=resp.status_code, body=resp.text[:300]) + return None + + async def _notify_telegram(self, pr_url: str, report: "DriftReport", pr_title: str) -> None: + """推送 Telegram 通知 SRE""" + try: + from src.services.telegram_gateway import get_telegram_gateway + tg = get_telegram_gateway() + await tg.send_text( + f"📋 Config Drift 承認 PR 已建立\n" + f"Namespace: {report.namespace}\n" + f"漂移: {report.summary}\n\n" + f"PR: {pr_url}\n\n" + f"請 SRE review 後 merge。" + ) + except Exception as e: + logger.warning("drift_adopt_telegram_failed", error=str(e)) + + +# ============================================================================= +# Singleton +# ============================================================================= + +_adopt_service: DriftAdoptService | None = None + + +def get_drift_adopt_service() -> DriftAdoptService: + global _adopt_service + if _adopt_service is None: + _adopt_service = DriftAdoptService() + return _adopt_service