Files
awoooi/apps/api/src/services/drift_adopt_service.py
OG T a81bf50537 feat(drift): ADR-057 adopt() Gitea PR API 實作
- DriftAdoptService: 透過 Gitea REST API 建立 branch + commit + PR
  不在 API Pod 內執行 git(修復 C2 安全漏洞)
- adopt() 端點: 501 → 真實實作(呼叫 DriftAdoptService)
- config.py: 新增 GITEA_API_URL / GITEA_API_TOKEN / GITEA_REPO_OWNER / GITEA_REPO_NAME
- K8s secret awoooi-secrets 已注入 GITEA_API_TOKEN
- drift.py: 移除 trigger_drift_scan 中未使用的 interpreter 變數

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 00:39:29 +08:00

318 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Drift Adopt Service — ADR-057: Gitea PR API 實作
=================================================
職責:將合法漂移反向寫回 Git透過 Gitea PR API 而非直接 git push
設計邊界(核心原則):
- 不直接操作 git不 git add/commit/push
- 透過 Gitea REST API 建立 branch + commit + PR
- PR 需 SRE 手動 review 後 merge不自動 merge main
- 只修改漂移相關的 YAML 欄位(不 add -A
流程:
adopt() 被呼叫
→ 讀取 k8s/ 目錄對應 YAML 檔
→ 建立 drift/adopt-{report_id[:8]}-{ts} branch
→ 透過 Gitea API commit YAML 更新
→ 建立 PR + 推送 Telegram 通知
版本: v1.0
建立: 2026-04-05 (台北時區)
建立者: Claude Code (ADR-057 實作)
關聯 ADR: ADR-057
"""
from __future__ import annotations
import base64
import pathlib
from typing import TYPE_CHECKING
import httpx
import structlog
from src.core.config import get_settings
from src.utils.timezone import now_taipei
if TYPE_CHECKING:
from src.models.drift import DriftReport
logger = structlog.get_logger(__name__)
class DriftAdoptService:
"""
透過 Gitea PR API 將漂移寫回 Git
ADR-057 安全設計:
✅ 使用 Gitea API不在 API Pod 內執行 git
✅ 建立 PR 讓 SRE review不直接 push main
✅ 只 commit k8s/ 目錄的 YAML不 git add -A
✅ API Token 從 K8s Secret 注入(不寫死)
"""
def __init__(self) -> None:
settings = get_settings()
self._api_url = settings.GITEA_API_URL.rstrip("/")
self._token = settings.GITEA_API_TOKEN
self._owner = settings.GITEA_REPO_OWNER
self._repo = settings.GITEA_REPO_NAME
self._k8s_dir = pathlib.Path("k8s")
async def adopt(self, report: "DriftReport", field_description: str = "") -> dict:
"""
將漂移寫回 Git建立 branch + commit + PR
Args:
report: 漂移報告(含具體 drift items
field_description: 漂移欄位說明(用於 PR title
Returns:
{"success": bool, "pr_url": str, "message": str}
"""
if not self._token:
return {
"success": False,
"message": "GITEA_API_TOKEN 未設定,請在 K8s Secret 中新增此 key",
"pr_url": None,
}
ts = now_taipei().strftime("%Y%m%d%H%M%S")
branch_name = f"drift/adopt-{report.report_id[:8]}-{ts}"
pr_title = f"chore: adopt drift — {report.namespace} {field_description or report.summary}"
logger.info(
"drift_adopt_start",
report_id=report.report_id,
branch=branch_name,
)
try:
async with httpx.AsyncClient(timeout=30.0) as client:
headers = {
"Authorization": f"token {self._token}",
"Content-Type": "application/json",
}
# Step 1: 取得 main branch 的最新 SHA
main_sha = await self._get_main_sha(client, headers)
if not main_sha:
return {"success": False, "message": "無法取得 main branch SHA", "pr_url": None}
# Step 2: 建立新 branch
branch_ok = await self._create_branch(client, headers, branch_name, main_sha)
if not branch_ok:
return {"success": False, "message": f"建立 branch {branch_name} 失敗", "pr_url": None}
# Step 3: 找出受影響的 YAML 檔並 commit 更新
committed_files = await self._commit_drift_yaml(client, headers, branch_name, report)
# Step 4: 建立 PR
pr_url = await self._create_pr(
client, headers, branch_name, pr_title, report, committed_files
)
if not pr_url:
return {"success": False, "message": "建立 PR 失敗", "pr_url": None}
# Step 5: 推送 Telegram 通知
await self._notify_telegram(pr_url, report, pr_title)
logger.info("drift_adopt_pr_created", report_id=report.report_id, pr_url=pr_url)
return {
"success": True,
"message": f"PR 已建立,請 SRE review 後 merge",
"pr_url": pr_url,
}
except Exception as e:
logger.error("drift_adopt_failed", report_id=report.report_id, error=str(e))
return {"success": False, "message": f"adopt 失敗: {str(e)}", "pr_url": None}
# =========================================================================
# Private helpers
# =========================================================================
async def _get_main_sha(self, client: httpx.AsyncClient, headers: dict) -> str | None:
"""取得 main branch 最新 commit SHA"""
resp = await client.get(
f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/branches/main",
headers=headers,
)
if resp.status_code == 200:
return resp.json()["commit"]["id"]
logger.error("drift_adopt_get_main_sha_failed", status=resp.status_code)
return None
async def _create_branch(
self, client: httpx.AsyncClient, headers: dict, branch_name: str, main_sha: str
) -> bool:
"""建立新 branch"""
resp = await client.post(
f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/branches",
headers=headers,
json={"new_branch_name": branch_name, "old_branch_name": "main"},
)
if resp.status_code in (200, 201):
return True
logger.error("drift_adopt_create_branch_failed", status=resp.status_code, body=resp.text[:200])
return False
async def _commit_drift_yaml(
self, client: httpx.AsyncClient, headers: dict, branch_name: str, report: "DriftReport"
) -> list[str]:
"""
找出受漂移影響的 YAML 檔,更新實際值後 commit 到 branch
目前策略:標記 YAML 檔為「已承認漂移」(加注解),
等 ADR-057 Phase 2 再實作精確 YAML patch 邏輯。
"""
committed = []
affected_kinds = {
item.resource_kind.lower()
for item in report.items
if not item.is_allowlisted
}
for yaml_file in sorted(self._k8s_dir.glob("*.yaml")):
# 判斷此 YAML 是否與漂移相關
file_stem = yaml_file.stem.lower()
if not any(kind in file_stem for kind in affected_kinds):
continue
try:
content = yaml_file.read_text()
# 取得 Gitea 上的 file SHA用於 update API
file_sha = await self._get_file_sha(client, headers, str(yaml_file))
# 在檔案末尾加入漂移承認注解
from src.utils.timezone import now_taipei as _now
ts_str = _now().strftime("%Y-%m-%d %H:%M:%S +0800")
annotation = (
f"\n# [drift-adopted] {ts_str}\n"
f"# Report: {report.report_id}\n"
f"# Namespace: {report.namespace}\n"
f"# Summary: {report.summary}\n"
)
updated_content = content + annotation
encoded = base64.b64encode(updated_content.encode()).decode()
# Commit 到 branch
payload = {
"message": f"chore(drift): adopt {report.namespace} drift — {yaml_file.name}",
"content": encoded,
"branch": branch_name,
}
if file_sha:
payload["sha"] = file_sha
url = f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/contents/{yaml_file}"
resp = await client.put(url, headers=headers, json=payload)
if resp.status_code in (200, 201):
committed.append(str(yaml_file))
logger.info("drift_adopt_file_committed", file=str(yaml_file))
else:
logger.warning("drift_adopt_file_commit_failed", file=str(yaml_file), status=resp.status_code)
except Exception as e:
logger.warning("drift_adopt_file_error", file=str(yaml_file), error=str(e))
return committed
async def _get_file_sha(
self, client: httpx.AsyncClient, headers: dict, file_path: str
) -> str | None:
"""取得 Gitea 上檔案的 SHAupdate 時需要)"""
resp = await client.get(
f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/contents/{file_path}",
headers=headers,
params={"ref": "main"},
)
if resp.status_code == 200:
return resp.json().get("sha")
return None
async def _create_pr(
self,
client: httpx.AsyncClient,
headers: dict,
branch_name: str,
pr_title: str,
report: "DriftReport",
committed_files: list[str],
) -> str | None:
"""建立 Pull Request回傳 PR URL"""
files_md = "\n".join(f"- `{f}`" for f in committed_files) if committed_files else "(無直接匹配的 YAML 檔)"
intent_label = "❓ 意圖不明"
if report.interpretation:
intent_map = {
"emergency_hotfix": "🚨 緊急 Hotfix",
"human_error": "⚠️ 人為誤操作",
"automated_change": "🤖 系統自動變更",
"unknown": "❓ 意圖不明",
}
intent_label = intent_map.get(report.interpretation.intent.value, "❓ 意圖不明")
body = (
f"## Config Drift 承認\n\n"
f"**Report ID**: `{report.report_id}`\n"
f"**Namespace**: `{report.namespace}`\n"
f"**漂移摘要**: {report.summary}\n"
f"**Nemotron 意圖**: {intent_label}\n"
f"{f'**說明**: {report.interpretation.explanation}' if report.interpretation else ''}\n\n"
f"## 異動檔案\n\n{files_md}\n\n"
f"## 說明\n\n"
f"此 PR 由 AWOOOI Config Drift Detection 系統自動建立。\n"
f"承認此次 K8s 漂移為合法變更,將漂移狀態寫回 Git。\n\n"
f"> **SRE 確認事項**:\n"
f"> - [ ] 確認漂移是預期的合法變更\n"
f"> - [ ] 確認 YAML 注解正確反映變更意圖\n"
f"> - [ ] merge 後手動更新 K8s YAML 的實際差異值\n"
)
resp = await client.post(
f"{self._api_url}/api/v1/repos/{self._owner}/{self._repo}/pulls",
headers=headers,
json={
"title": pr_title,
"body": body,
"head": branch_name,
"base": "main",
},
)
if resp.status_code in (200, 201):
pr_data = resp.json()
return pr_data.get("html_url") or pr_data.get("url")
logger.error("drift_adopt_create_pr_failed", status=resp.status_code, body=resp.text[:300])
return None
async def _notify_telegram(self, pr_url: str, report: "DriftReport", pr_title: str) -> None:
"""推送 Telegram 通知 SRE"""
try:
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
await tg.send_text(
f"📋 <b>Config Drift 承認 PR 已建立</b>\n"
f"Namespace: {report.namespace}\n"
f"漂移: {report.summary}\n\n"
f"PR: {pr_url}\n\n"
f"請 SRE review 後 merge。"
)
except Exception as e:
logger.warning("drift_adopt_telegram_failed", error=str(e))
# =============================================================================
# Singleton
# =============================================================================
_adopt_service: DriftAdoptService | None = None
def get_drift_adopt_service() -> DriftAdoptService:
global _adopt_service
if _adopt_service is None:
_adopt_service = DriftAdoptService()
return _adopt_service