From f3e412cd211f5e4601204b256aeb95eae073b441 Mon Sep 17 00:00:00 2001 From: ogt Date: Sat, 27 Jun 2026 20:31:34 +0800 Subject: [PATCH] fix: harden google drive import auth --- config.py | 2 +- docs/AI_INTELLIGENCE_MODULE_SOT.md | 3 +- routes/auto_import_routes.py | 21 ++- routes/system_public_routes.py | 1 + services/google_drive_service.py | 170 +++++++++++++++++------- services/import_service.py | 10 +- tests/test_google_drive_auth.py | 97 +++++++------- tests/test_google_drive_runtime_auth.py | 98 ++++++++++++++ 8 files changed, 301 insertions(+), 101 deletions(-) create mode 100644 tests/test_google_drive_runtime_auth.py diff --git a/config.py b/config.py index 9780593..5998e71 100644 --- a/config.py +++ b/config.py @@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.724" +SYSTEM_VERSION = "V10.725" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 3ea186a..22092c6 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -2,7 +2,7 @@ > **最後更新**: 2026-06-27 (台北時間) > **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步、作戰清單與價格參考表優先讀取、CSV 備援預檢、前台操作入口、高可見頁面繁中化守門、比價/作戰 UI 工作台化、跨平台來源治理與商品身份 UI 契約已建立,GCP embedding 熔斷延後處理、110 proxy rescue 與 direct host health skip 已建立 -> **適用版本**: V10.724 +> **適用版本**: V10.725 --- @@ -809,3 +809,4 @@ POSTGRES_HOST=momo-db | 2026-06-27 | 服務更新監控頁不得以內部工具名當主語 | V10.722 起 `/cicd` 可見文字使用「測試站、正式站、監控圖表、自動化流程、今日更新」;前台模板不得用 `issue.error_log` 判斷顯示診斷資料,也不得顯示 `UAT 狀態`、`PROD 狀態`、`Grafana`、`n8n` 作為主按鈕文字。 | | 2026-06-27 | 系統事件頁不得顯示或下載原始工程紀錄 | V10.723 起 `/logs` 改為「系統事件紀錄」,前台只顯示事件等級、營運判讀與建議處置;不得以 raw log 變數、舊下載檔名或「系統日誌/下載日誌」作為使用者可見介面。 | | 2026-06-27 | AI 挑品必須直接顯示商品證據與賣場入口 | V10.724 起商品看板 AI 挑品清單會在 selection 階段合併最新 PChome 同款證據,卡片需顯示 MOMO 商品ID、PChome 商品ID、同款信心與兩邊賣場入口;商品圖需使用 PChome 圖片作為 fallback,不得只給一段建議理由。 | +| 2026-06-27 | Google Drive 自動匯入不得在背景排程啟動瀏覽器授權 | V10.725 起正式容器與排程服務即使誤設 `GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH=true` 也會硬阻擋互動 OAuth;匯入前會檢查 JSON token 是否存在、可刷新且可寫回,Drive 測試與檔案列表 API 會 fail-closed 回傳可處理的授權狀態。 | diff --git a/routes/auto_import_routes.py b/routes/auto_import_routes.py index 86e110a..2b3b5de 100644 --- a/routes/auto_import_routes.py +++ b/routes/auto_import_routes.py @@ -132,14 +132,21 @@ def test_drive_connection(): try: # 嘗試認證 if drive_service.authenticate(): + readiness = drive_service.check_auth_readiness(refresh_expired=False) return jsonify({ 'success': True, - 'message': 'Google Drive 連接成功' + 'message': 'Google Drive 連接成功', + 'auth_ready': True, + 'auth_status': readiness.get('kind') or 'ready', }) else: + error_kind = getattr(drive_service, 'last_error_kind', None) or 'authentication_failed' + error_message = getattr(drive_service, 'last_error', None) or error_kind return jsonify({ 'success': False, - 'message': 'Google Drive 認證失敗' + 'message': humanize_import_error(error_message) or 'Google Drive 認證失敗,請重新確認雲端授權。', + 'auth_ready': False, + 'auth_status': error_kind, }), 400 except Exception as e: @@ -159,6 +166,16 @@ def list_drive_files(): file_pattern = data.get('file_pattern', '即時業績_當日') files = drive_service.list_files_in_folder(folder_path, file_pattern) + error_kind = getattr(drive_service, 'last_error_kind', None) + if error_kind: + error_message = getattr(drive_service, 'last_error', None) or error_kind + return jsonify({ + 'success': False, + 'message': humanize_import_error(error_message) or 'Google Drive 連線或授權異常,無法確認檔案清單。', + 'auth_status': error_kind, + 'data': [], + 'count': 0, + }), 400 return jsonify({ 'success': True, diff --git a/routes/system_public_routes.py b/routes/system_public_routes.py index f124244..7608d1d 100644 --- a/routes/system_public_routes.py +++ b/routes/system_public_routes.py @@ -91,6 +91,7 @@ _PUBLIC_LOG_RAW_ERROR_MARKERS = ( 'httpconnectionpool', 'all 3 hosts failed', 'multimodal data provided', + 'could not locate runnable browser', 'traceback', 'secret', 'api_key', diff --git a/services/google_drive_service.py b/services/google_drive_service.py index 20b29d6..77f1499 100644 --- a/services/google_drive_service.py +++ b/services/google_drive_service.py @@ -32,11 +32,21 @@ TOKEN_FILE = os.getenv('GOOGLE_DRIVE_TOKEN_FILE', 'config/google_token.json') _LEGACY_PICKLE_FILE = os.getenv('GOOGLE_DRIVE_LEGACY_PICKLE_FILE', 'config/google_token.pickle') INTERACTIVE_AUTH_ENV = 'GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH' INTERACTIVE_AUTH_TIMEOUT_ENV = 'GOOGLE_DRIVE_INTERACTIVE_AUTH_TIMEOUT_SECONDS' +NONINTERACTIVE_RUNTIME_ENV = 'GOOGLE_DRIVE_NONINTERACTIVE_RUNTIME' def _interactive_auth_allowed() -> bool: """Background jobs must not try to open a browser inside containers.""" - return os.getenv(INTERACTIVE_AUTH_ENV, "").strip().lower() in {"1", "true", "yes", "on"} + requested = os.getenv(INTERACTIVE_AUTH_ENV, "").strip().lower() in {"1", "true", "yes", "on"} + if not requested: + return False + noninteractive = ( + os.getenv(NONINTERACTIVE_RUNTIME_ENV, "").strip().lower() in {"1", "true", "yes", "on"} + or os.getenv("FLASK_ENV", "").strip().lower() == "production" + or os.getenv("KUBERNETES_SERVICE_HOST") + or os.path.exists("/.dockerenv") + ) + return not noninteractive def _interactive_auth_timeout_seconds() -> int: @@ -89,6 +99,92 @@ class GoogleDriveService: logger.error(error_message) return False + def check_auth_readiness(self, refresh_expired: bool = True) -> Dict[str, Any]: + """Validate persisted Google Drive credentials without starting browser OAuth.""" + self._clear_error() + status = { + "ready": False, + "kind": None, + "message": "", + "credentials_file_exists": os.path.exists(CREDENTIALS_FILE), + "token_file_exists": os.path.exists(TOKEN_FILE), + "token_file_writable": os.access(TOKEN_FILE, os.W_OK) if os.path.exists(TOKEN_FILE) else False, + "token_dir_writable": os.access(os.path.dirname(TOKEN_FILE) or ".", os.W_OK), + "has_refresh_token": False, + "interactive_auth_allowed": _interactive_auth_allowed(), + } + + if not status["credentials_file_exists"]: + status.update({ + "kind": "credentials_missing", + "message": "找不到 Google Drive 憑證檔,請確認雲端匯入設定。" + }) + self._set_error(status["kind"], status["message"]) + return status + + if not status["token_file_exists"]: + status.update({ + "kind": "reauthorization_required", + "message": "Google Drive 授權檔不存在;背景排程不可啟動瀏覽器,請先提供 config/google_token.json。" + }) + self._set_error(status["kind"], status["message"]) + return status + + try: + with open(TOKEN_FILE, 'r') as token: + token_data = json.load(token) + self.credentials = Credentials.from_authorized_user_info(token_data, SCOPES) + status["has_refresh_token"] = bool(getattr(self.credentials, "refresh_token", None)) + except Exception as exc: + status.update({ + "kind": "invalid_token_file", + "message": "Google Drive 授權檔無法讀取;請重新提供有效的 JSON 授權檔。" + }) + self._set_error(status["kind"], f"{status['message']} 原始錯誤:{exc}") + return status + + if not status["token_file_writable"] or not status["token_dir_writable"]: + status.update({ + "kind": "token_store_failed", + "message": "Google Drive 授權檔或設定目錄不可寫,重啟或刷新後可能再次失效。" + }) + self._set_error(status["kind"], status["message"]) + return status + + if self.credentials and self.credentials.valid: + status.update({"ready": True, "kind": "ready", "message": "Google Drive 授權可用。"}) + self._clear_error() + return status + + if self.credentials and self.credentials.expired and self.credentials.refresh_token: + if not refresh_expired: + status.update({"kind": "token_expired", "message": "Google Drive 授權已過期,等待刷新。"}) + self._set_error(status["kind"], status["message"]) + return status + try: + logger.info("刷新 Google Drive token...") + self.credentials.refresh(Request()) + except Exception as exc: + status.update({ + "kind": "reauthorization_required", + "message": "Google Drive refresh token 已失效;請重新完成雲端授權。" + }) + self._set_error(status["kind"], f"{status['message']} 原始錯誤:{exc}") + return status + if not self._save_credentials(): + status.update({"kind": self.last_error_kind, "message": self.last_error or ""}) + return status + status.update({"ready": True, "kind": "ready", "message": "Google Drive 授權已刷新並可用。"}) + self._clear_error() + return status + + status.update({ + "kind": "reauthorization_required", + "message": "Google Drive 授權缺少可刷新憑證;背景排程不可啟動瀏覽器,請重新提供 config/google_token.json。" + }) + self._set_error(status["kind"], status["message"]) + return status + @staticmethod def _escape_query_value(value: str) -> str: return value.replace("\\", "\\\\").replace("'", "\\'") @@ -109,56 +205,32 @@ class GoogleDriveService: "請重新執行認證流程以產生新 token,舊 pickle 檔案不會被自動刪除。" ) - # 檢查是否已有 token - if os.path.exists(TOKEN_FILE): - with open(TOKEN_FILE, 'r') as token: - token_data = json.load(token) - self.credentials = Credentials.from_authorized_user_info(token_data, SCOPES) - - # 如果沒有有效憑證,進行認證流程 - if not self.credentials or not self.credentials.valid: - if self.credentials and self.credentials.expired and self.credentials.refresh_token: - # 嘗試刷新 token - logger.info("刷新 Google Drive token...") - self.credentials.refresh(Request()) - else: - # 需要重新認證 - if not os.path.exists(CREDENTIALS_FILE): - error_message = f"找不到認證檔案: {CREDENTIALS_FILE}" - self._set_error("authentication_failed", error_message) - logger.error(error_message) - return False - - if not _interactive_auth_allowed(): - if os.path.exists(_LEGACY_PICKLE_FILE) and not os.path.exists(TOKEN_FILE): - error_message = ( - "偵測到舊版 Google Drive 授權檔 config/google_token.pickle," - "但正式排程只讀 config/google_token.json。請先執行一次性授權檔轉換," - "再讓自動匯入任務重跑。" - ) - else: - error_message = ( - "Google Drive 需要重新授權,但背景排程不可啟動瀏覽器。" - "請在可互動環境完成 OAuth,或提供 config/google_token.json 後再重跑。" - ) - self._set_error("reauthorization_required", error_message) - logger.error(error_message) - return False - - logger.info("進行 Google Drive 認證...") - flow = InstalledAppFlow.from_client_secrets_file( - CREDENTIALS_FILE, SCOPES - ) - # 即使是人工授權,也只印授權 URL,不在伺服器/容器內自動尋找瀏覽器。 - self.credentials = flow.run_local_server( - open_browser=False, - timeout_seconds=_interactive_auth_timeout_seconds(), - authorization_prompt_message=( - "請在可登入 Google 的瀏覽器開啟以下網址完成授權:{url}" - ), + readiness = self.check_auth_readiness(refresh_expired=True) + if not readiness.get("ready"): + if os.path.exists(_LEGACY_PICKLE_FILE) and not os.path.exists(TOKEN_FILE): + error_message = ( + "偵測到舊版 Google Drive 授權檔 config/google_token.pickle," + "但正式排程只讀 config/google_token.json。請先執行一次性授權檔轉換," + "再讓自動匯入任務重跑。" ) + self._set_error("reauthorization_required", error_message) + logger.error(error_message) + return False + + if not _interactive_auth_allowed(): + logger.error(readiness.get("message") or "Google Drive 授權不可用。") + return False + + logger.info("進行 Google Drive 認證...") + flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES) + self.credentials = flow.run_local_server( + open_browser=False, + timeout_seconds=_interactive_auth_timeout_seconds(), + authorization_prompt_message=( + "請在可登入 Google 的瀏覽器開啟以下網址完成授權:{url}" + ), + ) - # 儲存憑證供下次使用(JSON 格式,安全無 RCE 風險) if not self._save_credentials(): return False diff --git a/services/import_service.py b/services/import_service.py index f386e22..176685c 100644 --- a/services/import_service.py +++ b/services/import_service.py @@ -56,9 +56,15 @@ def humanize_import_error(message: Any) -> str: return "" lowered = raw.lower() - if "could not locate runnable browser" in lowered or "reauthorization_required" in lowered: + if ( + "could not locate runnable browser" in lowered + or "reauthorization_required" in lowered + or "授權檔不存在" in raw + or "缺少可刷新" in raw + or "refresh token" in lowered + ): return "Google Drive 授權需要重新確認;請重新完成雲端授權後再執行匯入。" - if "token_store_failed" in lowered or ("permission" in lowered and "google" in lowered): + if "token_store_failed" in lowered or "不可寫" in raw or ("permission" in lowered and "google" in lowered): return "Google Drive 授權無法穩定保存;請通知維護人員確認主機授權檔權限,避免重啟後再次失效。" if "google drive" in lowered or "credentials" in lowered or "authenticate" in lowered or "token" in lowered: return "Google Drive 連線或授權異常;請確認雲端資料夾權限後再執行匯入。" diff --git a/tests/test_google_drive_auth.py b/tests/test_google_drive_auth.py index 9251b1a..89e092e 100644 --- a/tests/test_google_drive_auth.py +++ b/tests/test_google_drive_auth.py @@ -10,58 +10,63 @@ import os # 確保可以導入專案模組 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) -print("=" * 60) -print("Google Drive OAuth 2.0 認證") -print("=" * 60) -print() -print("請確認以下檔案存在:") -print(" 📁 config/google_credentials.json") -print() - -# 檢查憑證檔案 -if not os.path.exists('config/google_credentials.json'): - print("❌ 找不到憑證檔案!") +def main(): + print("=" * 60) + print("Google Drive OAuth 2.0 認證") + print("=" * 60) print() - print("請依照以下步驟設定:") - print("1. 前往 Google Cloud Console 建立 OAuth 2.0 憑證") - print("2. 下載 JSON 檔案") - print("3. 重新命名為 google_credentials.json") - print("4. 放到 config/ 目錄中") + print("請確認以下檔案存在:") + print(" 📁 config/google_credentials.json") print() - print("詳細說明請參考: GOOGLE_DRIVE_SETUP.md") - sys.exit(1) -print("✅ 找到憑證檔案") -print() -print("正在啟動 OAuth 2.0 認證流程...") -print("終端機會顯示授權網址,請在可登入 Google 的瀏覽器開啟後完成授權。") -print() -os.environ["GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH"] = "true" + if not os.path.exists('config/google_credentials.json'): + print("❌ 找不到憑證檔案!") + print() + print("請依照以下步驟設定:") + print("1. 前往 Google Cloud Console 建立 OAuth 2.0 憑證") + print("2. 下載 JSON 檔案") + print("3. 重新命名為 google_credentials.json") + print("4. 放到 config/ 目錄中") + print() + print("詳細說明請參考: GOOGLE_DRIVE_SETUP.md") + return 1 -try: - from services.google_drive_service import drive_service + print("✅ 找到憑證檔案") + print() + print("正在啟動 OAuth 2.0 認證流程...") + print("終端機會顯示授權網址,請在可登入 Google 的瀏覽器開啟後完成授權。") + print() + os.environ["GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH"] = "true" + + try: + from services.google_drive_service import drive_service + + if drive_service.authenticate(): + print() + print("=" * 60) + print("✅ 認證成功!") + print("=" * 60) + print() + print("Token 已儲存至: config/google_token.json") + print() + print("現在可以:") + print(" 1. 在網頁介面測試連接: http://localhost/auto_import") + print(" 2. 執行測試腳本: python3 test_google_drive.py") + print() + return 0 - if drive_service.authenticate(): - print() - print("=" * 60) - print("✅ 認證成功!") - print("=" * 60) - print() - print("Token 已儲存至: config/google_token.json") - print() - print("現在可以:") - print(" 1. 在網頁介面測試連接: http://localhost/auto_import") - print(" 2. 執行測試腳本: python3 test_google_drive.py") - print() - else: print() print("❌ 認證失敗") print("請檢查憑證檔案是否正確") - sys.exit(1) + return 1 -except Exception as e: - print() - print(f"❌ 認證過程發生錯誤: {str(e)}") - import traceback - traceback.print_exc() - sys.exit(1) + except Exception as e: + print() + print(f"❌ 認證過程發生錯誤: {str(e)}") + import traceback + traceback.print_exc() + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_google_drive_runtime_auth.py b/tests/test_google_drive_runtime_auth.py new file mode 100644 index 0000000..7ea79a8 --- /dev/null +++ b/tests/test_google_drive_runtime_auth.py @@ -0,0 +1,98 @@ +import json +import types + + +def _configure_paths(monkeypatch, gds, tmp_path): + credentials_file = tmp_path / "google_credentials.json" + token_file = tmp_path / "google_token.json" + legacy_file = tmp_path / "google_token.pickle" + credentials_file.write_text("{}", encoding="utf-8") + monkeypatch.setattr(gds, "CREDENTIALS_FILE", str(credentials_file)) + monkeypatch.setattr(gds, "TOKEN_FILE", str(token_file)) + monkeypatch.setattr(gds, "_LEGACY_PICKLE_FILE", str(legacy_file)) + return credentials_file, token_file, legacy_file + + +def test_production_runtime_never_starts_interactive_drive_auth(monkeypatch, tmp_path): + import services.google_drive_service as gds + + _configure_paths(monkeypatch, gds, tmp_path) + monkeypatch.setenv("GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH", "true") + monkeypatch.setenv("FLASK_ENV", "production") + + class ExplodingFlow: + @staticmethod + def from_client_secrets_file(*_args, **_kwargs): + raise AssertionError("browser OAuth must not run in production") + + monkeypatch.setattr(gds, "InstalledAppFlow", ExplodingFlow) + + service = gds.GoogleDriveService() + + assert service.authenticate() is False + assert service.last_error_kind == "reauthorization_required" + assert "授權檔不存在" in service.last_error + + +def test_drive_auth_readiness_refreshes_and_persists_json_token(monkeypatch, tmp_path): + import services.google_drive_service as gds + + _, token_file, _ = _configure_paths(monkeypatch, gds, tmp_path) + token_file.write_text(json.dumps({"token": "old"}), encoding="utf-8") + + class FakeCredentials: + valid = False + expired = True + refresh_token = "refresh-token" + + def refresh(self, _request): + self.valid = True + self.expired = False + + def to_json(self): + return json.dumps({"token": "new-token", "refresh_token": self.refresh_token}) + + fake_credentials = FakeCredentials() + monkeypatch.setattr( + gds, + "Credentials", + types.SimpleNamespace( + from_authorized_user_info=lambda _token_data, _scopes: fake_credentials + ), + ) + + service = gds.GoogleDriveService() + status = service.check_auth_readiness(refresh_expired=True) + + assert status["ready"] is True + assert status["kind"] == "ready" + assert service.last_error_kind is None + assert json.loads(token_file.read_text(encoding="utf-8"))["token"] == "new-token" + + +def test_drive_auth_readiness_requires_refresh_token(monkeypatch, tmp_path): + import services.google_drive_service as gds + + _, token_file, _ = _configure_paths(monkeypatch, gds, tmp_path) + token_file.write_text(json.dumps({"token": "stale"}), encoding="utf-8") + + class FakeCredentials: + valid = False + expired = False + refresh_token = None + + monkeypatch.setattr( + gds, + "Credentials", + types.SimpleNamespace( + from_authorized_user_info=lambda _token_data, _scopes: FakeCredentials() + ), + ) + + service = gds.GoogleDriveService() + status = service.check_auth_readiness(refresh_expired=True) + + assert status["ready"] is False + assert status["kind"] == "reauthorization_required" + assert service.last_error_kind == "reauthorization_required" + assert "缺少可刷新憑證" in service.last_error