#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Google Drive service module.

Handles interaction with the Google Drive API.
"""
|
||
import os
|
||
import io
|
||
import json
|
||
import logging
|
||
from typing import List, Optional, Dict, Any
|
||
from datetime import datetime
|
||
|
||
from google.auth.transport.requests import Request
|
||
from google.oauth2.credentials import Credentials
|
||
from google_auth_oauthlib.flow import InstalledAppFlow
|
||
from googleapiclient.discovery import build
|
||
from googleapiclient.http import MediaIoBaseDownload
|
||
from googleapiclient.errors import HttpError
|
||
|
||
# Module-level logger.
logger = logging.getLogger(__name__)

# OAuth scopes requested for the Google Drive API.
SCOPES = ['https://www.googleapis.com/auth/drive']

# Locations of the OAuth client secrets and of the cached user token.
CREDENTIALS_FILE = 'config/google_credentials.json'
TOKEN_FILE = 'config/google_token.json'
# Token file written by the older pickle-based format; only checked for
# existence in order to emit migration hints.
_LEGACY_PICKLE_FILE = 'config/google_token.pickle'
# Name of the environment variable that opts in to the interactive OAuth flow.
INTERACTIVE_AUTH_ENV = 'GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH'
|
||
|
||
|
||
def _interactive_auth_allowed() -> bool:
    """Return True when interactive OAuth has been explicitly enabled.

    Background jobs must not try to open a browser inside containers, so the
    interactive flow is only allowed when the environment variable named by
    ``INTERACTIVE_AUTH_ENV`` holds ``1``, ``true``, ``yes`` or ``on``
    (surrounding whitespace and letter case are ignored).
    """
    raw_flag = os.getenv(INTERACTIVE_AUTH_ENV, "")
    normalized = raw_flag.strip().lower()
    return normalized in ("1", "true", "yes", "on")
|
||
|
||
|
||
class GoogleDriveService:
    """Wrapper around the Google Drive v3 API client.

    The API client is created lazily: every public method calls
    ``authenticate()`` first when ``self.service`` is not set yet.

    Failures recorded through ``_set_error`` are exposed as:

    * ``last_error``: the message, truncated to 500 characters.
    * ``last_error_kind``: a short category string. The kinds used in this
      class are ``token_store_failed``, ``authentication_failed``,
      ``reauthorization_required`` and ``drive_api_failed``.
    """

    def __init__(self):
        """Create an unauthenticated service object."""
        # Drive API client; built by authenticate().
        self.service = None
        # OAuth credentials loaded from TOKEN_FILE or obtained interactively.
        self.credentials = None
        # Message and category of the most recently recorded failure.
        self.last_error = None
        self.last_error_kind = None

    def _clear_error(self) -> None:
        """Reset the recorded failure state."""
        self.last_error = None
        self.last_error_kind = None

    def _set_error(self, kind: str, message: str) -> None:
        """Record a failure category and its message (capped at 500 chars)."""
        self.last_error_kind = kind
        self.last_error = str(message)[:500]

    def _save_credentials(self) -> bool:
        """Persist ``self.credentials`` as JSON to ``TOKEN_FILE``.

        The token is written to ``TOKEN_FILE + ".tmp"`` and then moved into
        place with ``os.replace`` so a reader never sees a half-written file.

        Returns:
            True when the token was stored. False on any exception, in which
            case a ``token_store_failed`` error is recorded and logged.
        """
        try:
            token_dir = os.path.dirname(TOKEN_FILE)
            if token_dir:
                os.makedirs(token_dir, exist_ok=True)
            tmp_file = f"{TOKEN_FILE}.tmp"
            # Create the temp file owner-only from the start so the token is
            # never readable by others between creation and the chmod below.
            fd = os.open(tmp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w', encoding='utf-8') as token:
                token.write(self.credentials.to_json())
            try:
                # Still needed when a stale temp file with wider permissions
                # already existed (the mode above only applies on creation).
                os.chmod(tmp_file, 0o600)
            except OSError:
                logger.debug("Google Drive token 暫存檔權限調整失敗", exc_info=True)
            os.replace(tmp_file, TOKEN_FILE)
            logger.info(f"憑證已儲存到: {TOKEN_FILE}")
            return True
        except Exception as exc:
            error_message = (
                f"Google Drive 授權可用但無法寫回 {TOKEN_FILE},"
                "主機重啟後自動匯入會再次失敗。請修復正式 config 掛載目錄寫入權限。"
                f"原始錯誤:{exc}"
            )
            self._set_error("token_store_failed", error_message)
            logger.error(error_message)
            return False

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape backslashes and single quotes for a quoted Drive query value."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def authenticate(self) -> bool:
        """Authenticate against Google Drive and build the API client.

        Steps:

        1. Load cached credentials from ``TOKEN_FILE`` when it exists.
        2. If they are not valid, refresh them when they are expired and carry
           a refresh token; otherwise run the interactive OAuth flow, which
           requires ``CREDENTIALS_FILE`` and is only attempted when
           ``_interactive_auth_allowed()`` returns True.
        3. Store the refreshed/new credentials with ``_save_credentials``.
        4. Build the Drive v3 client into ``self.service``.

        Returns:
            bool: True on success. On failure False is returned and the reason
            is recorded in ``last_error`` / ``last_error_kind``.
        """
        try:
            self._clear_error()

            # A legacy pickle token with no JSON token means the user still has
            # to migrate; the old file is never deleted automatically.
            legacy_token_only = (
                os.path.exists(_LEGACY_PICKLE_FILE) and not os.path.exists(TOKEN_FILE)
            )
            if legacy_token_only:
                logger.warning(
                    "[GoogleDrive] 偵測到舊版 token.pickle,已改用 JSON 格式。"
                    "請重新執行認證流程以產生新 token,舊 pickle 檔案不會被自動刪除。"
                )

            # Load a cached token if one is present.
            if os.path.exists(TOKEN_FILE):
                with open(TOKEN_FILE, 'r', encoding='utf-8') as token:
                    token_data = json.load(token)
                self.credentials = Credentials.from_authorized_user_info(token_data, SCOPES)

            # Without valid credentials, refresh or re-authorize.
            if not self.credentials or not self.credentials.valid:
                if self.credentials and self.credentials.expired and self.credentials.refresh_token:
                    logger.info("刷新 Google Drive token...")
                    self.credentials.refresh(Request())
                else:
                    # A full re-authorization is needed.
                    if not os.path.exists(CREDENTIALS_FILE):
                        error_message = f"找不到認證檔案: {CREDENTIALS_FILE}"
                        self._set_error("authentication_failed", error_message)
                        logger.error(error_message)
                        return False

                    if not _interactive_auth_allowed():
                        if legacy_token_only:
                            error_message = (
                                "偵測到舊版 Google Drive 授權檔 config/google_token.pickle,"
                                "但正式排程只讀 config/google_token.json。請先執行一次性授權檔轉換,"
                                "再讓自動匯入任務重跑。"
                            )
                        else:
                            error_message = (
                                "Google Drive 需要重新授權,但背景排程不可啟動瀏覽器。"
                                "請在可互動環境完成 OAuth,或提供 config/google_token.json 後再重跑。"
                            )
                        self._set_error("reauthorization_required", error_message)
                        logger.error(error_message)
                        return False

                    logger.info("進行 Google Drive 認證...")
                    flow = InstalledAppFlow.from_client_secrets_file(
                        CREDENTIALS_FILE, SCOPES
                    )
                    # Desktop-app client type: use the default behaviour of
                    # run_local_server() for choosing the callback port.
                    self.credentials = flow.run_local_server()

                # Persist the credentials for the next run (JSON, not pickle).
                if not self._save_credentials():
                    return False

            # Build the Drive API client.
            self.service = build('drive', 'v3', credentials=self.credentials)
            self._clear_error()
            logger.info("Google Drive 服務已連接")
            return True

        except Exception as e:
            self._set_error("authentication_failed", str(e))
            logger.error(f"Google Drive 認證失敗: {str(e)}")
            return False

    def list_files_in_folder(self, folder_path: str, file_pattern: Optional[str] = None) -> List[Dict[str, Any]]:
        """List the Excel files inside a Drive folder.

        Args:
            folder_path: Drive folder path, e.g. "業績報表/當日業績".
            file_pattern: Optional text the file name must contain,
                e.g. "即時業績_當日". It is escaped before being placed in the
                query, so quotes and backslashes are matched literally.

        Returns:
            List[Dict]: File entries with the fields id, name, mimeType,
            modifiedTime and size, ordered by modifiedTime descending. All
            result pages are collected. An empty list is returned when
            authentication fails, the folder cannot be resolved, or the API
            call fails (see ``last_error`` to tell these apart).
        """
        if not self.service:
            if not self.authenticate():
                return []
        else:
            # Drop any stale error from an earlier call.
            self._clear_error()

        try:
            # Resolve the folder path to an ID first.
            folder_id = self._get_folder_id_by_path(folder_path)
            if not folder_id:
                logger.warning(f"找不到資料夾: {folder_path}")
                return []

            query = f"'{folder_id}' in parents and trashed=false"

            # Optional file-name filter; escape it like folder names are.
            if file_pattern:
                safe_pattern = self._escape_query_value(file_pattern)
                query += f" and name contains '{safe_pattern}'"

            # Restrict the search to Excel files.
            query += " and (mimeType='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' or mimeType='application/vnd.ms-excel')"

            # Follow nextPageToken so results beyond the first page are kept.
            files: List[Dict[str, Any]] = []
            page_token = None
            while True:
                results = self.service.files().list(
                    q=query,
                    spaces='drive',
                    fields='nextPageToken, files(id, name, mimeType, modifiedTime, size)',
                    orderBy='modifiedTime desc',
                    pageToken=page_token,
                ).execute()
                files.extend(results.get('files', []))
                page_token = results.get('nextPageToken')
                if not page_token:
                    break

            self._clear_error()
            logger.info(f"在 {folder_path} 找到 {len(files)} 個 Excel 檔案")

            return files

        except HttpError as error:
            self._set_error("drive_api_failed", str(error))
            logger.error(f"列出檔案時發生錯誤: {error}")
            return []

    def _find_child_folder_id(self, parent_id: str, folder_name: str) -> Optional[str]:
        """Return the ID of a non-trashed folder named ``folder_name`` under ``parent_id``.

        Only the first match is used (``pageSize=1``). Returns None when no
        such folder exists. ``HttpError`` from the API call propagates to the
        caller.
        """
        safe_folder_name = self._escape_query_value(folder_name)
        query = (
            f"name='{safe_folder_name}' and '{parent_id}' in parents "
            "and mimeType='application/vnd.google-apps.folder' and trashed=false"
        )
        results = self.service.files().list(
            q=query,
            spaces='drive',
            fields='files(id, name)',
            pageSize=1
        ).execute()
        folders = results.get('files', [])
        return folders[0]['id'] if folders else None

    def _get_folder_id_by_path(self, folder_path: str) -> Optional[str]:
        """Resolve a slash-separated folder path to a Drive folder ID.

        Args:
            folder_path: Folder path such as "業績報表/當日業績". An empty
                string (or only slashes) means the Drive root.

        Returns:
            Optional[str]: The folder ID ('root' for the root folder), or None
            when a path component does not exist or the API call fails. An API
            failure is additionally recorded as a ``drive_api_failed`` error so
            callers can distinguish it from a missing folder.
        """
        try:
            # Empty path or only slashes: use the root folder directly.
            if not folder_path or folder_path.strip('/') == '':
                logger.info("使用 Google Drive 根目錄")
                return 'root'

            # Walk down from the root one component at a time.
            parent_id = 'root'
            for folder_name in folder_path.strip('/').split('/'):
                # Skip empty components produced by doubled slashes.
                if not folder_name:
                    continue

                child_id = self._find_child_folder_id(parent_id, folder_name)
                if not child_id:
                    logger.warning(f"找不到資料夾: {folder_name}")
                    return None

                parent_id = child_id

            return parent_id

        except HttpError as error:
            self._set_error("drive_api_failed", str(error))
            logger.error(f"搜尋資料夾時發生錯誤: {error}")
            return None

    def _ensure_folder_id_by_path(self, folder_path: str) -> Optional[str]:
        """Resolve a folder path to an ID, creating missing folders on the way.

        Args:
            folder_path: Slash-separated folder path; empty means the root.

        Returns:
            Optional[str]: The ID of the final folder ('root' for the root
            folder), or None when an API call fails (recorded as a
            ``drive_api_failed`` error).
        """
        try:
            if not folder_path or folder_path.strip('/') == '':
                return 'root'

            parent_id = 'root'
            for folder_name in folder_path.strip('/').split('/'):
                if not folder_name:
                    continue

                child_id = self._find_child_folder_id(parent_id, folder_name)
                if child_id:
                    parent_id = child_id
                    continue

                # Component is missing: create it under the current parent.
                metadata = {
                    'name': folder_name,
                    'mimeType': 'application/vnd.google-apps.folder',
                    'parents': [parent_id],
                }
                folder = self.service.files().create(
                    body=metadata,
                    fields='id, name'
                ).execute()
                parent_id = folder['id']
                logger.info(f"已建立 Google Drive 資料夾: {folder_name}")

            return parent_id

        except HttpError as error:
            self._set_error("drive_api_failed", str(error))
            logger.error(f"建立資料夾時發生錯誤: {error}")
            return None

    def download_file(self, file_id: str, destination_path: str) -> bool:
        """Download a Drive file to a local path.

        Args:
            file_id: Google Drive file ID.
            destination_path: Local path the content is written to.

        Returns:
            bool: True when the file was downloaded and written; False when
            authentication, the API calls, or the local write fail.
        """
        if not self.service:
            if not self.authenticate():
                return False

        try:
            # Fetch the metadata first; the name is only used for logging.
            file_meta = self.service.files().get(fileId=file_id).execute()
            file_name = file_meta.get('name')

            logger.info(f"開始下載: {file_name}")

            request = self.service.files().get_media(fileId=file_id)

            # Buffer in memory and write to disk only after the last chunk, so
            # an interrupted download does not touch destination_path.
            buffer = io.BytesIO()
            downloader = MediaIoBaseDownload(buffer, request)

            done = False
            while not done:
                status, done = downloader.next_chunk()
                if status:
                    logger.info(f"下載進度: {int(status.progress() * 100)}%")

            with open(destination_path, 'wb') as f:
                f.write(buffer.getvalue())

            logger.info(f"檔案已下載到: {destination_path}")
            return True

        except HttpError as error:
            logger.error(f"下載檔案時發生錯誤: {error}")
            return False
        except Exception as e:
            logger.error(f"下載檔案時發生異常: {str(e)}")
            return False

    def move_file(self, file_id: str, destination_folder_path: str, create_missing: bool = False) -> bool:
        """Move a file into another Drive folder.

        Args:
            file_id: Google Drive file ID.
            destination_folder_path: Target folder path, e.g. "已匯入".
            create_missing: Create the target folder (and any missing parent
                folders) when it does not exist.

        Returns:
            bool: True when the file is in the target folder afterwards
            (including the case where it already was there); False when the
            target folder cannot be resolved or an API call fails.
        """
        if not self.service:
            if not self.authenticate():
                return False

        try:
            # Current metadata, including the parents to detach from.
            file_meta = self.service.files().get(
                fileId=file_id,
                fields='id, name, parents'
            ).execute()
            file_name = file_meta.get('name')
            current_parents = file_meta.get('parents', [])

            # Resolve (or create) the destination folder.
            if create_missing:
                destination_folder_id = self._ensure_folder_id_by_path(destination_folder_path)
            else:
                destination_folder_id = self._get_folder_id_by_path(destination_folder_path)
            if not destination_folder_id:
                logger.error(f"找不到目標資料夾: {destination_folder_path}")
                return False

            # Never list the destination in both addParents and removeParents:
            # only detach the other parents, and only attach when needed.
            parents_to_remove = [p for p in current_parents if p != destination_folder_id]
            already_in_destination = destination_folder_id in current_parents

            if already_in_destination and not parents_to_remove:
                logger.info(f"檔案已在目標資料夾: {file_name} → {destination_folder_path}")
                return True

            update_kwargs = {
                'fileId': file_id,
                'removeParents': ','.join(parents_to_remove),
                'fields': 'id, parents',
            }
            if not already_in_destination:
                update_kwargs['addParents'] = destination_folder_id
            self.service.files().update(**update_kwargs).execute()

            logger.info(f"已移動檔案: {file_name} → {destination_folder_path}")
            return True

        except HttpError as error:
            logger.error(f"移動檔案時發生錯誤: {error}")
            return False
        except Exception as e:
            logger.error(f"移動檔案時發生異常: {str(e)}")
            return False

    def delete_file(self, file_id: str) -> bool:
        """Delete a file from Drive.

        Args:
            file_id: Google Drive file ID.

        Returns:
            bool: True when the delete call succeeded, False otherwise.
        """
        if not self.service:
            if not self.authenticate():
                return False

        try:
            # Look up the name first; it is only used in the log message.
            file_meta = self.service.files().get(fileId=file_id).execute()
            file_name = file_meta.get('name')

            self.service.files().delete(fileId=file_id).execute()

            logger.info(f"已刪除檔案: {file_name} (ID: {file_id})")
            return True

        except HttpError as error:
            logger.error(f"刪除檔案時發生錯誤: {error}")
            return False
        except Exception as e:
            logger.error(f"刪除檔案時發生異常: {str(e)}")
            return False

    def get_file_info(self, file_id: str) -> Optional[Dict[str, Any]]:
        """Fetch metadata for a Drive file.

        Args:
            file_id: Google Drive file ID.

        Returns:
            Optional[Dict]: The fields id, name, mimeType, size, modifiedTime
            and webViewLink as returned by the API, or None when
            authentication fails or the API raises ``HttpError``.
        """
        if not self.service:
            if not self.authenticate():
                return None

        try:
            return self.service.files().get(
                fileId=file_id,
                fields='id, name, mimeType, size, modifiedTime, webViewLink'
            ).execute()

        except HttpError as error:
            logger.error(f"取得檔案資訊時發生錯誤: {error}")
            return None
|
||
|
||
|
||
# Module-level shared service instance.
drive_service = GoogleDriveService()
|