fix: harden google drive import auth
Some checks are pending
CD Pipeline / deploy (push) Waiting to run

This commit is contained in:
ogt
2026-06-27 20:31:34 +08:00
parent 90e44a8f8a
commit f3e412cd21
8 changed files with 301 additions and 101 deletions

View File

@@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.724"
SYSTEM_VERSION = "V10.725"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -2,7 +2,7 @@
> **最後更新**: 2026-06-27 (台北時間)
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地LLM 路由紅線升級為 Ollama-first 三主機級聯PChome 後台業績匯入韌性已補強產品定位正名為「PChome 業績成長自動化作戰系統」外部市場來源正規化層、自動同步、作戰清單與價格參考表優先讀取、CSV 備援預檢、前台操作入口、高可見頁面繁中化守門、比價/作戰 UI 工作台化、跨平台來源治理與商品身份 UI 契約已建立GCP embedding 熔斷延後處理、110 proxy rescue 與 direct host health skip 已建立
> **適用版本**: V10.724
> **適用版本**: V10.725
---
@@ -809,3 +809,4 @@ POSTGRES_HOST=momo-db
| 2026-06-27 | 服務更新監控頁不得以內部工具名當主語 | V10.722 起 `/cicd` 可見文字使用「測試站、正式站、監控圖表、自動化流程、今日更新」;前台模板不得用 `issue.error_log` 判斷顯示診斷資料,也不得顯示 `UAT 狀態``PROD 狀態``Grafana``n8n` 作為主按鈕文字。 |
| 2026-06-27 | 系統事件頁不得顯示或下載原始工程紀錄 | V10.723 起 `/logs` 改為「系統事件紀錄」,前台只顯示事件等級、營運判讀與建議處置;不得以 raw log 變數、舊下載檔名或「系統日誌/下載日誌」作為使用者可見介面。 |
| 2026-06-27 | AI 挑品必須直接顯示商品證據與賣場入口 | V10.724 起商品看板 AI 挑品清單會在 selection 階段合併最新 PChome 同款證據,卡片需顯示 MOMO 商品ID、PChome 商品ID、同款信心與兩邊賣場入口商品圖需使用 PChome 圖片作為 fallback不得只給一段建議理由。 |
| 2026-06-27 | Google Drive 自動匯入不得在背景排程啟動瀏覽器授權 | V10.725 起正式容器與排程服務即使誤設 `GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH=true` 也會硬阻擋互動 OAuth匯入前會檢查 JSON token 是否存在、可刷新且可寫回Drive 測試與檔案列表 API 會 fail-closed 回傳可處理的授權狀態。 |

View File

@@ -132,14 +132,21 @@ def test_drive_connection():
try:
# 嘗試認證
if drive_service.authenticate():
readiness = drive_service.check_auth_readiness(refresh_expired=False)
return jsonify({
'success': True,
'message': 'Google Drive 連接成功'
'message': 'Google Drive 連接成功',
'auth_ready': True,
'auth_status': readiness.get('kind') or 'ready',
})
else:
error_kind = getattr(drive_service, 'last_error_kind', None) or 'authentication_failed'
error_message = getattr(drive_service, 'last_error', None) or error_kind
return jsonify({
'success': False,
'message': 'Google Drive 認證失敗'
'message': humanize_import_error(error_message) or 'Google Drive 認證失敗,請重新確認雲端授權。',
'auth_ready': False,
'auth_status': error_kind,
}), 400
except Exception as e:
@@ -159,6 +166,16 @@ def list_drive_files():
file_pattern = data.get('file_pattern', '即時業績_當日')
files = drive_service.list_files_in_folder(folder_path, file_pattern)
error_kind = getattr(drive_service, 'last_error_kind', None)
if error_kind:
error_message = getattr(drive_service, 'last_error', None) or error_kind
return jsonify({
'success': False,
'message': humanize_import_error(error_message) or 'Google Drive 連線或授權異常,無法確認檔案清單。',
'auth_status': error_kind,
'data': [],
'count': 0,
}), 400
return jsonify({
'success': True,

View File

@@ -91,6 +91,7 @@ _PUBLIC_LOG_RAW_ERROR_MARKERS = (
'httpconnectionpool',
'all 3 hosts failed',
'multimodal data provided',
'could not locate runnable browser',
'traceback',
'secret',
'api_key',

View File

@@ -32,11 +32,21 @@ TOKEN_FILE = os.getenv('GOOGLE_DRIVE_TOKEN_FILE', 'config/google_token.json')
_LEGACY_PICKLE_FILE = os.getenv('GOOGLE_DRIVE_LEGACY_PICKLE_FILE', 'config/google_token.pickle')
INTERACTIVE_AUTH_ENV = 'GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH'
INTERACTIVE_AUTH_TIMEOUT_ENV = 'GOOGLE_DRIVE_INTERACTIVE_AUTH_TIMEOUT_SECONDS'
NONINTERACTIVE_RUNTIME_ENV = 'GOOGLE_DRIVE_NONINTERACTIVE_RUNTIME'
def _interactive_auth_allowed() -> bool:
"""Background jobs must not try to open a browser inside containers."""
return os.getenv(INTERACTIVE_AUTH_ENV, "").strip().lower() in {"1", "true", "yes", "on"}
requested = os.getenv(INTERACTIVE_AUTH_ENV, "").strip().lower() in {"1", "true", "yes", "on"}
if not requested:
return False
noninteractive = (
os.getenv(NONINTERACTIVE_RUNTIME_ENV, "").strip().lower() in {"1", "true", "yes", "on"}
or os.getenv("FLASK_ENV", "").strip().lower() == "production"
or os.getenv("KUBERNETES_SERVICE_HOST")
or os.path.exists("/.dockerenv")
)
return not noninteractive
def _interactive_auth_timeout_seconds() -> int:
@@ -89,6 +99,92 @@ class GoogleDriveService:
logger.error(error_message)
return False
def check_auth_readiness(self, refresh_expired: bool = True) -> Dict[str, Any]:
"""Validate persisted Google Drive credentials without starting browser OAuth."""
self._clear_error()
status = {
"ready": False,
"kind": None,
"message": "",
"credentials_file_exists": os.path.exists(CREDENTIALS_FILE),
"token_file_exists": os.path.exists(TOKEN_FILE),
"token_file_writable": os.access(TOKEN_FILE, os.W_OK) if os.path.exists(TOKEN_FILE) else False,
"token_dir_writable": os.access(os.path.dirname(TOKEN_FILE) or ".", os.W_OK),
"has_refresh_token": False,
"interactive_auth_allowed": _interactive_auth_allowed(),
}
if not status["credentials_file_exists"]:
status.update({
"kind": "credentials_missing",
"message": "找不到 Google Drive 憑證檔,請確認雲端匯入設定。"
})
self._set_error(status["kind"], status["message"])
return status
if not status["token_file_exists"]:
status.update({
"kind": "reauthorization_required",
"message": "Google Drive 授權檔不存在;背景排程不可啟動瀏覽器,請先提供 config/google_token.json。"
})
self._set_error(status["kind"], status["message"])
return status
try:
with open(TOKEN_FILE, 'r') as token:
token_data = json.load(token)
self.credentials = Credentials.from_authorized_user_info(token_data, SCOPES)
status["has_refresh_token"] = bool(getattr(self.credentials, "refresh_token", None))
except Exception as exc:
status.update({
"kind": "invalid_token_file",
"message": "Google Drive 授權檔無法讀取;請重新提供有效的 JSON 授權檔。"
})
self._set_error(status["kind"], f"{status['message']} 原始錯誤:{exc}")
return status
if not status["token_file_writable"] or not status["token_dir_writable"]:
status.update({
"kind": "token_store_failed",
"message": "Google Drive 授權檔或設定目錄不可寫,重啟或刷新後可能再次失效。"
})
self._set_error(status["kind"], status["message"])
return status
if self.credentials and self.credentials.valid:
status.update({"ready": True, "kind": "ready", "message": "Google Drive 授權可用。"})
self._clear_error()
return status
if self.credentials and self.credentials.expired and self.credentials.refresh_token:
if not refresh_expired:
status.update({"kind": "token_expired", "message": "Google Drive 授權已過期,等待刷新。"})
self._set_error(status["kind"], status["message"])
return status
try:
logger.info("刷新 Google Drive token...")
self.credentials.refresh(Request())
except Exception as exc:
status.update({
"kind": "reauthorization_required",
"message": "Google Drive refresh token 已失效;請重新完成雲端授權。"
})
self._set_error(status["kind"], f"{status['message']} 原始錯誤:{exc}")
return status
if not self._save_credentials():
status.update({"kind": self.last_error_kind, "message": self.last_error or ""})
return status
status.update({"ready": True, "kind": "ready", "message": "Google Drive 授權已刷新並可用。"})
self._clear_error()
return status
status.update({
"kind": "reauthorization_required",
"message": "Google Drive 授權缺少可刷新憑證;背景排程不可啟動瀏覽器,請重新提供 config/google_token.json。"
})
self._set_error(status["kind"], status["message"])
return status
@staticmethod
def _escape_query_value(value: str) -> str:
return value.replace("\\", "\\\\").replace("'", "\\'")
@@ -109,56 +205,32 @@ class GoogleDriveService:
"請重新執行認證流程以產生新 token舊 pickle 檔案不會被自動刪除。"
)
# 檢查是否已有 token
if os.path.exists(TOKEN_FILE):
with open(TOKEN_FILE, 'r') as token:
token_data = json.load(token)
self.credentials = Credentials.from_authorized_user_info(token_data, SCOPES)
# 如果沒有有效憑證,進行認證流程
if not self.credentials or not self.credentials.valid:
if self.credentials and self.credentials.expired and self.credentials.refresh_token:
# 嘗試刷新 token
logger.info("刷新 Google Drive token...")
self.credentials.refresh(Request())
else:
# 需要重新認證
if not os.path.exists(CREDENTIALS_FILE):
error_message = f"找不到認證檔案: {CREDENTIALS_FILE}"
self._set_error("authentication_failed", error_message)
logger.error(error_message)
return False
if not _interactive_auth_allowed():
if os.path.exists(_LEGACY_PICKLE_FILE) and not os.path.exists(TOKEN_FILE):
error_message = (
"偵測到舊版 Google Drive 授權檔 config/google_token.pickle"
"但正式排程只讀 config/google_token.json。請先執行一次性授權檔轉換"
"再讓自動匯入任務重跑。"
)
else:
error_message = (
"Google Drive 需要重新授權,但背景排程不可啟動瀏覽器。"
"請在可互動環境完成 OAuth或提供 config/google_token.json 後再重跑。"
)
self._set_error("reauthorization_required", error_message)
logger.error(error_message)
return False
logger.info("進行 Google Drive 認證...")
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_FILE, SCOPES
)
# 即使是人工授權,也只印授權 URL不在伺服器/容器內自動尋找瀏覽器。
self.credentials = flow.run_local_server(
open_browser=False,
timeout_seconds=_interactive_auth_timeout_seconds(),
authorization_prompt_message=(
"請在可登入 Google 的瀏覽器開啟以下網址完成授權:{url}"
),
readiness = self.check_auth_readiness(refresh_expired=True)
if not readiness.get("ready"):
if os.path.exists(_LEGACY_PICKLE_FILE) and not os.path.exists(TOKEN_FILE):
error_message = (
"偵測到舊版 Google Drive 授權檔 config/google_token.pickle"
"但正式排程只讀 config/google_token.json。請先執行一次性授權檔轉換"
"再讓自動匯入任務重跑。"
)
self._set_error("reauthorization_required", error_message)
logger.error(error_message)
return False
if not _interactive_auth_allowed():
logger.error(readiness.get("message") or "Google Drive 授權不可用。")
return False
logger.info("進行 Google Drive 認證...")
flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
self.credentials = flow.run_local_server(
open_browser=False,
timeout_seconds=_interactive_auth_timeout_seconds(),
authorization_prompt_message=(
"請在可登入 Google 的瀏覽器開啟以下網址完成授權:{url}"
),
)
# 儲存憑證供下次使用JSON 格式,安全無 RCE 風險)
if not self._save_credentials():
return False

View File

@@ -56,9 +56,15 @@ def humanize_import_error(message: Any) -> str:
return ""
lowered = raw.lower()
if "could not locate runnable browser" in lowered or "reauthorization_required" in lowered:
if (
"could not locate runnable browser" in lowered
or "reauthorization_required" in lowered
or "授權檔不存在" in raw
or "缺少可刷新" in raw
or "refresh token" in lowered
):
return "Google Drive 授權需要重新確認;請重新完成雲端授權後再執行匯入。"
if "token_store_failed" in lowered or ("permission" in lowered and "google" in lowered):
if "token_store_failed" in lowered or "不可寫" in raw or ("permission" in lowered and "google" in lowered):
return "Google Drive 授權無法穩定保存;請通知維護人員確認主機授權檔權限,避免重啟後再次失效。"
if "google drive" in lowered or "credentials" in lowered or "authenticate" in lowered or "token" in lowered:
return "Google Drive 連線或授權異常;請確認雲端資料夾權限後再執行匯入。"

View File

@@ -10,58 +10,63 @@ import os
# 確保可以導入專案模組
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
print("=" * 60)
print("Google Drive OAuth 2.0 認證")
print("=" * 60)
print()
print("請確認以下檔案存在:")
print(" 📁 config/google_credentials.json")
print()
# 檢查憑證檔案
if not os.path.exists('config/google_credentials.json'):
print("❌ 找不到憑證檔案!")
def main():
print("=" * 60)
print("Google Drive OAuth 2.0 認證")
print("=" * 60)
print()
print("依照以下步驟設定:")
print("1. 前往 Google Cloud Console 建立 OAuth 2.0 憑證")
print("2. 下載 JSON 檔案")
print("3. 重新命名為 google_credentials.json")
print("4. 放到 config/ 目錄中")
print("確認以下檔案存在:")
print(" 📁 config/google_credentials.json")
print()
print("詳細說明請參考: GOOGLE_DRIVE_SETUP.md")
sys.exit(1)
print("✅ 找到憑證檔案")
print()
print("正在啟動 OAuth 2.0 認證流程...")
print("終端機會顯示授權網址,請在可登入 Google 的瀏覽器開啟後完成授權。")
print()
os.environ["GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH"] = "true"
if not os.path.exists('config/google_credentials.json'):
print("❌ 找不到憑證檔案!")
print()
print("請依照以下步驟設定:")
print("1. 前往 Google Cloud Console 建立 OAuth 2.0 憑證")
print("2. 下載 JSON 檔案")
print("3. 重新命名為 google_credentials.json")
print("4. 放到 config/ 目錄中")
print()
print("詳細說明請參考: GOOGLE_DRIVE_SETUP.md")
return 1
try:
from services.google_drive_service import drive_service
print("✅ 找到憑證檔案")
print()
print("正在啟動 OAuth 2.0 認證流程...")
print("終端機會顯示授權網址,請在可登入 Google 的瀏覽器開啟後完成授權。")
print()
os.environ["GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH"] = "true"
try:
from services.google_drive_service import drive_service
if drive_service.authenticate():
print()
print("=" * 60)
print("✅ 認證成功!")
print("=" * 60)
print()
print("Token 已儲存至: config/google_token.json")
print()
print("現在可以:")
print(" 1. 在網頁介面測試連接: http://localhost/auto_import")
print(" 2. 執行測試腳本: python3 test_google_drive.py")
print()
return 0
if drive_service.authenticate():
print()
print("=" * 60)
print("✅ 認證成功!")
print("=" * 60)
print()
print("Token 已儲存至: config/google_token.json")
print()
print("現在可以:")
print(" 1. 在網頁介面測試連接: http://localhost/auto_import")
print(" 2. 執行測試腳本: python3 test_google_drive.py")
print()
else:
print()
print("❌ 認證失敗")
print("請檢查憑證檔案是否正確")
sys.exit(1)
return 1
except Exception as e:
print()
print(f"❌ 認證過程發生錯誤: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)
except Exception as e:
print()
print(f"❌ 認證過程發生錯誤: {str(e)}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,98 @@
import json
import types
def _configure_paths(monkeypatch, gds, tmp_path):
credentials_file = tmp_path / "google_credentials.json"
token_file = tmp_path / "google_token.json"
legacy_file = tmp_path / "google_token.pickle"
credentials_file.write_text("{}", encoding="utf-8")
monkeypatch.setattr(gds, "CREDENTIALS_FILE", str(credentials_file))
monkeypatch.setattr(gds, "TOKEN_FILE", str(token_file))
monkeypatch.setattr(gds, "_LEGACY_PICKLE_FILE", str(legacy_file))
return credentials_file, token_file, legacy_file
def test_production_runtime_never_starts_interactive_drive_auth(monkeypatch, tmp_path):
import services.google_drive_service as gds
_configure_paths(monkeypatch, gds, tmp_path)
monkeypatch.setenv("GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH", "true")
monkeypatch.setenv("FLASK_ENV", "production")
class ExplodingFlow:
@staticmethod
def from_client_secrets_file(*_args, **_kwargs):
raise AssertionError("browser OAuth must not run in production")
monkeypatch.setattr(gds, "InstalledAppFlow", ExplodingFlow)
service = gds.GoogleDriveService()
assert service.authenticate() is False
assert service.last_error_kind == "reauthorization_required"
assert "授權檔不存在" in service.last_error
def test_drive_auth_readiness_refreshes_and_persists_json_token(monkeypatch, tmp_path):
import services.google_drive_service as gds
_, token_file, _ = _configure_paths(monkeypatch, gds, tmp_path)
token_file.write_text(json.dumps({"token": "old"}), encoding="utf-8")
class FakeCredentials:
valid = False
expired = True
refresh_token = "refresh-token"
def refresh(self, _request):
self.valid = True
self.expired = False
def to_json(self):
return json.dumps({"token": "new-token", "refresh_token": self.refresh_token})
fake_credentials = FakeCredentials()
monkeypatch.setattr(
gds,
"Credentials",
types.SimpleNamespace(
from_authorized_user_info=lambda _token_data, _scopes: fake_credentials
),
)
service = gds.GoogleDriveService()
status = service.check_auth_readiness(refresh_expired=True)
assert status["ready"] is True
assert status["kind"] == "ready"
assert service.last_error_kind is None
assert json.loads(token_file.read_text(encoding="utf-8"))["token"] == "new-token"
def test_drive_auth_readiness_requires_refresh_token(monkeypatch, tmp_path):
import services.google_drive_service as gds
_, token_file, _ = _configure_paths(monkeypatch, gds, tmp_path)
token_file.write_text(json.dumps({"token": "stale"}), encoding="utf-8")
class FakeCredentials:
valid = False
expired = False
refresh_token = None
monkeypatch.setattr(
gds,
"Credentials",
types.SimpleNamespace(
from_authorized_user_info=lambda _token_data, _scopes: FakeCredentials()
),
)
service = gds.GoogleDriveService()
status = service.check_auth_readiness(refresh_expired=True)
assert status["ready"] is False
assert status["kind"] == "reauthorization_required"
assert service.last_error_kind == "reauthorization_required"
assert "缺少可刷新憑證" in service.last_error