fix: harden google drive auto import auth
All checks were successful
CD Pipeline / deploy (push) Successful in 6m31s

This commit is contained in:
ogt
2026-06-25 13:20:22 +08:00
parent deb771d6f7
commit 14f8ba05ec
8 changed files with 194 additions and 7 deletions

View File

@@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.670"
SYSTEM_VERSION = "V10.671"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -212,6 +212,7 @@ services:
- ./config.py:/app/config.py:ro
- ./scheduler.py:/app/scheduler.py:ro
- ./run_scheduler.py:/app/run_scheduler.py:ro
- ./scripts:/app/scripts:ro
- ./services:/app/services:ro
- ./routes:/app/routes:ro
- ./database:/app/database:ro # 資料庫模型

View File

@@ -747,3 +747,4 @@ POSTGRES_HOST=momo-db
| 2026-06-25 | 低頻治理頁與舊入口不可出現英文工程狀態或教學句 | V10.668 起 legacy bridge、維護/權限、匯入、設定、通知模板、缺貨管理、登入歷史、系統日誌與 AI 健康檢查頁統一改為繁中短句,聚焦資料可靠、權限守門、供貨風險與部署治理如何支援業績流程。 |
| 2026-06-25 | 匯入頁不可把資料表流程當成使用者主訊息 | V10.669 起雲端匯入與系統匯入完成訊息改說明業績資料新鮮度與更新筆數,不再用「下載→匯入資料庫→刪除」或資料表名稱作為前台重點。 |
| 2026-06-25 | 可見操作頁不可把權杖、DB、Agent、Pipeline 當成主語 | V10.670 起 AI 助手、日報、銷售分析、缺貨、部署監控與觀測台頁面進一步改用「用量、產出紀錄、AI 分工、部署流程、知識命中」等營運可讀語言。 |
| 2026-06-25 | Google Drive 自動匯入不可在正式排程開瀏覽器 | V10.671 起背景匯入缺少 `config/google_token.json` 時 fail-closed 並提示一次性授權檔轉換;正式 scheduler 不再嘗試 `run_local_server()`,且 token refresh 必須能寫回共用 `config/` 掛載,避免主機重啟後再次出現 `could not locate runnable browser` 或授權檔遺失。 |

View File

@@ -20,12 +20,22 @@
---
## 🛠️ 首次認證步驟
`config/google_token.pickle` 遺失或過期,執行以下指令:
正式排程不可啟動瀏覽器;只有人工在可互動環境重新授權時,才允許開啟 OAuth 瀏覽器。
`config/google_token.json` 遺失或過期,執行以下指令:
```bash
python3 -c "from services.google_drive_service import drive_service; drive_service.authenticate()"
GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH=true python3 -c "from services.google_drive_service import drive_service; drive_service.authenticate()"
```
執行後會彈出瀏覽器要求授權。
若正式環境仍只有舊版 `config/google_token.pickle`,需在可信任的正式容器中做一次性轉換:
```bash
MOMO_ALLOW_LEGACY_GOOGLE_TOKEN_PICKLE_MIGRATION=true python3 scripts/tools/migrate_google_drive_token.py
```
正式 `config/` 是 scheduler 與 app 共用的 bind mount容器必須能寫入
`config/google_token.json`,否則 token refresh 後無法持久化,主機重啟後會再次失敗。
若正式 Docker 啟用 user namespace remaphost 端需讓 remap 後的 container root 可寫入此目錄。
---
## 📁 資料夾結構要求

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""一次性把舊版 Google Drive pickle token 轉成 JSON token。
此腳本只供受控維運使用。pickle 可能執行任意程式碼,所以必須用明確
環境變數批准,且只在可信任的正式 config 來源上執行。
"""
import json
import os
import pickle
from pathlib import Path
LEGACY_TOKEN_FILE = Path(os.getenv("GOOGLE_DRIVE_LEGACY_TOKEN_FILE", "config/google_token.pickle"))
TARGET_TOKEN_FILE = Path(os.getenv("GOOGLE_DRIVE_TOKEN_FILE", "config/google_token.json"))
ALLOW_ENV = "MOMO_ALLOW_LEGACY_GOOGLE_TOKEN_PICKLE_MIGRATION"
def _allowed() -> bool:
return os.getenv(ALLOW_ENV, "").strip().lower() in {"1", "true", "yes", "on"}
def main() -> int:
if not _allowed():
print(f"拒絕執行:請先設定 {ALLOW_ENV}=true。")
return 2
if not LEGACY_TOKEN_FILE.exists():
print(f"找不到舊版授權檔:{LEGACY_TOKEN_FILE}")
return 1
if TARGET_TOKEN_FILE.exists():
print(f"JSON 授權檔已存在:{TARGET_TOKEN_FILE}")
return 0
with LEGACY_TOKEN_FILE.open("rb") as handle:
credentials = pickle.load(handle)
if not hasattr(credentials, "to_json"):
print("舊版授權檔格式不支援轉換。")
return 1
token_payload = json.loads(credentials.to_json())
TARGET_TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
tmp_path = TARGET_TOKEN_FILE.with_name(f"{TARGET_TOKEN_FILE.name}.tmp")
tmp_path.write_text(json.dumps(token_payload, ensure_ascii=False, indent=2), encoding="utf-8")
os.chmod(tmp_path, 0o600)
os.replace(tmp_path, TARGET_TOKEN_FILE)
print(f"已產生 JSON 授權檔:{TARGET_TOKEN_FILE}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -29,6 +29,12 @@ SCOPES = ['https://www.googleapis.com/auth/drive']
CREDENTIALS_FILE = 'config/google_credentials.json'
TOKEN_FILE = 'config/google_token.json'
_LEGACY_PICKLE_FILE = 'config/google_token.pickle'
INTERACTIVE_AUTH_ENV = 'GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH'
def _interactive_auth_allowed() -> bool:
"""Background jobs must not try to open a browser inside containers."""
return os.getenv(INTERACTIVE_AUTH_ENV, "").strip().lower() in {"1", "true", "yes", "on"}
class GoogleDriveService:
@@ -49,6 +55,31 @@ class GoogleDriveService:
self.last_error_kind = kind
self.last_error = str(message)[:500]
def _save_credentials(self) -> bool:
try:
token_dir = os.path.dirname(TOKEN_FILE)
if token_dir:
os.makedirs(token_dir, exist_ok=True)
tmp_file = f"{TOKEN_FILE}.tmp"
with open(tmp_file, 'w') as token:
token.write(self.credentials.to_json())
try:
os.chmod(tmp_file, 0o600)
except OSError:
logger.debug("Google Drive token 暫存檔權限調整失敗", exc_info=True)
os.replace(tmp_file, TOKEN_FILE)
logger.info(f"憑證已儲存到: {TOKEN_FILE}")
return True
except Exception as exc:
error_message = (
f"Google Drive 授權可用但無法寫回 {TOKEN_FILE}"
"主機重啟後自動匯入會再次失敗。請修復正式 config 掛載目錄寫入權限。"
f"原始錯誤:{exc}"
)
self._set_error("token_store_failed", error_message)
logger.error(error_message)
return False
@staticmethod
def _escape_query_value(value: str) -> str:
return value.replace("\\", "\\\\").replace("'", "\\'")
@@ -89,6 +120,22 @@ class GoogleDriveService:
logger.error(error_message)
return False
if not _interactive_auth_allowed():
if os.path.exists(_LEGACY_PICKLE_FILE) and not os.path.exists(TOKEN_FILE):
error_message = (
"偵測到舊版 Google Drive 授權檔 config/google_token.pickle"
"但正式排程只讀 config/google_token.json。請先執行一次性授權檔轉換"
"再讓自動匯入任務重跑。"
)
else:
error_message = (
"Google Drive 需要重新授權,但背景排程不可啟動瀏覽器。"
"請在可互動環境完成 OAuth或提供 config/google_token.json 後再重跑。"
)
self._set_error("reauthorization_required", error_message)
logger.error(error_message)
return False
logger.info("進行 Google Drive 認證...")
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_FILE, SCOPES
@@ -97,9 +144,8 @@ class GoogleDriveService:
self.credentials = flow.run_local_server()
# 儲存憑證供下次使用JSON 格式,安全無 RCE 風險)
with open(TOKEN_FILE, 'w') as token:
token.write(self.credentials.to_json())
logger.info(f"憑證已儲存到: {TOKEN_FILE}")
if not self._save_credentials():
return False
# 建立 Drive API 服務
self.service = build('drive', 'v3', credentials=self.credentials)

View File

@@ -171,6 +171,79 @@ def test_auto_import_fails_closed_when_drive_auth_fails(monkeypatch, tmp_path):
assert "could not locate runnable browser" in result["message"]
def test_google_drive_auth_refuses_browser_without_json_token(monkeypatch, tmp_path):
import services.google_drive_service as google_drive_service
google_drive_service = importlib.reload(google_drive_service)
token_file = tmp_path / "google_token.json"
legacy_file = tmp_path / "google_token.pickle"
credentials_file = tmp_path / "google_credentials.json"
legacy_file.write_bytes(b"legacy-token-placeholder")
credentials_file.write_text("{}", encoding="utf-8")
monkeypatch.setattr(google_drive_service, "TOKEN_FILE", str(token_file))
monkeypatch.setattr(google_drive_service, "_LEGACY_PICKLE_FILE", str(legacy_file))
monkeypatch.setattr(google_drive_service, "CREDENTIALS_FILE", str(credentials_file))
monkeypatch.delenv("GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH", raising=False)
def fail_if_browser_flow_is_started(*args, **kwargs):
raise AssertionError("背景匯入不可啟動瀏覽器 OAuth")
monkeypatch.setattr(
google_drive_service.InstalledAppFlow,
"from_client_secrets_file",
fail_if_browser_flow_is_started,
)
service = google_drive_service.GoogleDriveService()
assert service.authenticate() is False
assert service.last_error_kind == "reauthorization_required"
assert "google_token.json" in service.last_error
assert "google_token.pickle" in service.last_error
def test_google_drive_auth_fails_when_token_cannot_be_persisted(monkeypatch, tmp_path):
import services.google_drive_service as google_drive_service
google_drive_service = importlib.reload(google_drive_service)
token_file = tmp_path / "readonly" / "google_token.json"
token_file.parent.mkdir()
credentials_file = tmp_path / "google_credentials.json"
credentials_file.write_text("{}", encoding="utf-8")
class FakeCredentials:
valid = False
expired = True
refresh_token = "refresh-token"
def refresh(self, request):
self.valid = True
def to_json(self):
return "{}"
monkeypatch.setattr(google_drive_service, "TOKEN_FILE", str(token_file))
monkeypatch.setattr(google_drive_service, "CREDENTIALS_FILE", str(credentials_file))
monkeypatch.setattr(
google_drive_service.Credentials,
"from_authorized_user_info",
lambda *_args, **_kwargs: FakeCredentials(),
)
monkeypatch.setattr(
google_drive_service.os,
"replace",
lambda *_args, **_kwargs: (_ for _ in ()).throw(PermissionError("permission denied")),
)
token_file.write_text("{}", encoding="utf-8")
service = google_drive_service.GoogleDriveService()
assert service.authenticate() is False
assert service.last_error_kind == "token_store_failed"
assert "主機重啟後自動匯入會再次失敗" in service.last_error
def test_auto_import_empty_drive_folder_remains_success(monkeypatch, tmp_path):
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
import_service.Base.metadata.create_all(import_service.engine)

View File

@@ -36,6 +36,7 @@ print()
print("正在啟動 OAuth 2.0 認證流程...")
print("瀏覽器將自動開啟,請完成授權。")
print()
os.environ["GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH"] = "true"
try:
from services.google_drive_service import drive_service
@@ -46,7 +47,7 @@ try:
print("✅ 認證成功!")
print("=" * 60)
print()
print("Token 已儲存至: config/google_token.pickle")
print("Token 已儲存至: config/google_token.json")
print()
print("現在可以:")
print(" 1. 在網頁介面測試連接: http://localhost/auto_import")