V10.605 修復當日業績匯入與資料庫備份
All checks were successful
CD Pipeline / deploy (push) Successful in 9m17s

This commit is contained in:
OoO
2026-06-15 14:52:33 +08:00
parent 8fa95b94a9
commit 55e14c0332
6 changed files with 476 additions and 52 deletions

View File

@@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y \
g++ \
curl \
libpq-dev \
postgresql-client \
# Chrome/Selenium 依賴
wget \
gnupg \

View File

@@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.604"
SYSTEM_VERSION = "V10.605"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -3,9 +3,10 @@ DB Backup Service — EwoooC V10.3
負責執行 pg_dump 備份、保留策略、以及備份狀態寫入 backup_log
"""
import os
import subprocess
import logging
import glob
import shutil
import subprocess
from datetime import datetime, timedelta, timezone
TAIPEI_TZ = timezone(timedelta(hours=8))
@@ -25,6 +26,44 @@ def _ensure_backup_dir():
os.makedirs(BACKUP_DIR, exist_ok=True)
def _remove_partial_backup(filepath: str):
try:
if os.path.exists(filepath):
os.remove(filepath)
logger.warning(f"[Backup] 已移除不完整備份檔: {filepath}")
except Exception as exc:
logger.warning(f"[Backup] 移除不完整備份檔失敗 {filepath}: {exc}")
def _ensure_pg_dump_available() -> str:
pg_dump_path = shutil.which("pg_dump")
if pg_dump_path:
return pg_dump_path
apt_get_path = shutil.which("apt-get")
if not apt_get_path:
raise RuntimeError("pg_dump 不存在,且容器沒有 apt-get請重建 image 並安裝 postgresql-client")
logger.info("[Backup] pg_dump 不存在,嘗試安裝 postgresql-client...")
commands = [
[apt_get_path, "update", "-qq"],
[apt_get_path, "install", "-y", "-qq", "postgresql-client"],
]
for command in commands:
proc = subprocess.run(command, capture_output=True, text=True, timeout=180)
if proc.returncode != 0:
stderr = (proc.stderr or proc.stdout or "").strip()
raise RuntimeError(
"pg_dump 不存在,自動安裝 postgresql-client 失敗:"
f"{' '.join(command)}{stderr[:500]}"
)
pg_dump_path = shutil.which("pg_dump")
if not pg_dump_path:
raise RuntimeError("postgresql-client 安裝後仍找不到 pg_dump")
return pg_dump_path
def _log_backup(filename, file_size, duration, status, error=None, storage_path=None):
"""寫入 backup_log 表,失敗不阻斷主流程"""
try:
@@ -68,14 +107,6 @@ def run_backup() -> dict:
db_host = os.environ.get("POSTGRES_HOST", "momo-db")
db_port = os.environ.get("POSTGRES_PORT", "5432")
# 若 pg_dump 不存在則嘗試安裝容器重建後需重裝Dockerfile 已加入 postgresql-client
if not os.path.exists("/usr/bin/pg_dump"):
logger.info("[Backup] pg_dump 不存在,嘗試安裝 postgresql-client...")
subprocess.run(
["apt-get", "install", "-y", "-qq", "postgresql-client"],
capture_output=True
)
pg_password = os.environ.get("POSTGRES_PASSWORD")
pg_env = {**os.environ, "PGPASSWORD": pg_password} if pg_password else dict(os.environ)
@@ -83,9 +114,10 @@ def run_backup() -> dict:
result = {"success": False, "filename": filename, "file_size": 0, "duration": 0, "error": None}
try:
pg_dump_path = _ensure_pg_dump_available()
with open(filepath, "wb") as out_f:
pg_dump_proc = subprocess.Popen(
["pg_dump", "-h", db_host, "-p", db_port, "-U", DB_USER, "-d", DB_NAME,
[pg_dump_path, "-h", db_host, "-p", db_port, "-U", DB_USER, "-d", DB_NAME,
"--no-password", "-Fp"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=pg_env
@@ -112,6 +144,7 @@ def run_backup() -> dict:
if proc.returncode != 0:
error_msg = proc.stderr.strip() or "pg_dump 非零退出碼"
logger.error(f"[Backup] 備份失敗: {error_msg}")
_remove_partial_backup(filepath)
result["error"] = error_msg
result["duration"] = duration
_log_backup(filename, 0, duration, "failed", error=error_msg)
@@ -125,6 +158,7 @@ def run_backup() -> dict:
duration = (datetime.now() - start).total_seconds()
error_msg = "pg_dump 超時300s"
logger.error(f"[Backup] {error_msg}")
_remove_partial_backup(filepath)
result["error"] = error_msg
result["duration"] = duration
_log_backup(filename, 0, duration, "failed", error=error_msg)
@@ -132,6 +166,7 @@ def run_backup() -> dict:
duration = (datetime.now() - start).total_seconds()
error_msg = str(e)
logger.error(f"[Backup] 備份異常: {e}")
_remove_partial_backup(filepath)
result["error"] = error_msg
result["duration"] = duration
_log_backup(filename, 0, duration, "failed", error=error_msg)

View File

@@ -39,6 +39,10 @@ class GoogleDriveService:
self.service = None
self.credentials = None
@staticmethod
def _escape_query_value(value: str) -> str:
return value.replace("\\", "\\\\").replace("'", "\\'")
def authenticate(self) -> bool:
"""
進行 Google Drive 認證
@@ -170,7 +174,8 @@ class GoogleDriveService:
continue
# 搜尋此層級的資料夾
query = f"name='{folder_name}' and '{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
safe_folder_name = self._escape_query_value(folder_name)
query = f"name='{safe_folder_name}' and '{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
results = self.service.files().list(
q=query,
@@ -194,6 +199,53 @@ class GoogleDriveService:
logger.error(f"搜尋資料夾時發生錯誤: {error}")
return None
def _ensure_folder_id_by_path(self, folder_path: str) -> Optional[str]:
"""
根據路徑取得資料夾 ID若中途資料夾不存在則建立。
"""
try:
if not folder_path or folder_path.strip('/') == '':
return 'root'
path_parts = folder_path.strip('/').split('/')
parent_id = 'root'
for folder_name in path_parts:
if not folder_name:
continue
safe_folder_name = self._escape_query_value(folder_name)
query = f"name='{safe_folder_name}' and '{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
results = self.service.files().list(
q=query,
spaces='drive',
fields='files(id, name)',
pageSize=1
).execute()
folders = results.get('files', [])
if folders:
parent_id = folders[0]['id']
continue
metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder',
'parents': [parent_id],
}
folder = self.service.files().create(
body=metadata,
fields='id, name'
).execute()
parent_id = folder['id']
logger.info(f"已建立 Google Drive 資料夾: {folder_name}")
return parent_id
except HttpError as error:
logger.error(f"建立資料夾時發生錯誤: {error}")
return None
def download_file(self, file_id: str, destination_path: str) -> bool:
"""
下載檔案
@@ -242,13 +294,14 @@ class GoogleDriveService:
logger.error(f"下載檔案時發生異常: {str(e)}")
return False
def move_file(self, file_id: str, destination_folder_path: str) -> bool:
def move_file(self, file_id: str, destination_folder_path: str, create_missing: bool = False) -> bool:
"""
移動檔案到指定資料夾
Args:
file_id: Google Drive 檔案 ID
destination_folder_path: 目標資料夾路徑(如: "已匯入"
create_missing: 目標資料夾不存在時是否自動建立
Returns:
bool: 移動是否成功
@@ -267,6 +320,9 @@ class GoogleDriveService:
previous_parents = ','.join(file.get('parents', []))
# 取得目標資料夾 ID
if create_missing:
destination_folder_id = self._ensure_folder_id_by_path(destination_folder_path)
else:
destination_folder_id = self._get_folder_id_by_path(destination_folder_path)
if not destination_folder_id:
logger.error(f"找不到目標資料夾: {destination_folder_path}")

View File

@@ -5,19 +5,28 @@
負責從 Google Drive 自動下載、匯入、刪除檔案
"""
import os
import logging
import json
import os
from datetime import date, datetime
from typing import Optional, Dict, Any
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from typing import Any, Dict, Optional
import pandas as pd
import pytz
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# 台北時區
TAIPEI_TZ = pytz.timezone('Asia/Taipei')
DAILY_SALES_REQUIRED_COLUMN_GROUPS = {
"商品名稱類": ["商品名稱", "品名", "Product", "Name"],
"業績金額類": ["銷售金額", "總業績", "業績", "金額", "Amount", "Sales", "Total"],
}
DAILY_SALES_DATE_ALIASES = ["日期", "訂單日期", "交易日期", "Date"]
DAILY_SALES_DETAIL_SHEET_HINTS = ("即時業績明細", "業績明細", "明細", "detail", "daily")
DAILY_SALES_HEADER_SCAN_ROWS = 15
from services.google_drive_service import drive_service
from database.import_models import ImportJob, ImportConfig, Base
from database.manager import ensure_metadata_initialized
@@ -80,6 +89,211 @@ def _normalise_date_values_for_sql(values):
normalised.append(parsed.strftime("%Y-%m-%d") if pd.notna(parsed) else str(value))
return normalised
def _normalise_column_text(value) -> str:
if pd.isna(value):
return ""
return str(value).strip().replace("\n", "").replace("\r", "").replace(" ", "").lower()
def _stringify_column(value) -> str:
if pd.isna(value):
return ""
return str(value).strip()
def _dedupe_columns(columns) -> list[str]:
seen = {}
result = []
for idx, column in enumerate(columns):
base = _stringify_column(column) or f"unnamed_{idx + 1}"
count = seen.get(base, 0) + 1
seen[base] = count
result.append(base if count == 1 else f"{base}_{count}")
return result
def _columns_have_any(columns, keywords) -> bool:
normalised_columns = [_normalise_column_text(column) for column in columns]
normalised_keywords = [_normalise_column_text(keyword) for keyword in keywords]
return any(
keyword and column and keyword in column
for column in normalised_columns
for keyword in normalised_keywords
)
def _missing_daily_sales_column_groups(columns) -> list[str]:
return [
label
for label, keywords in DAILY_SALES_REQUIRED_COLUMN_GROUPS.items()
if not _columns_have_any(columns, keywords)
]
def _find_daily_sales_date_column(columns) -> Optional[str]:
normalised_aliases = [_normalise_column_text(alias) for alias in DAILY_SALES_DATE_ALIASES]
for column in columns:
normalised_column = _normalise_column_text(column)
if normalised_column in normalised_aliases:
return column
for column in columns:
normalised_column = _normalise_column_text(column)
if normalised_column and any(alias and alias in normalised_column for alias in normalised_aliases):
return column
return None
def _score_daily_sales_candidate(sheet_name: str, header_row: int, columns) -> int:
missing = _missing_daily_sales_column_groups(columns)
if missing:
return 0
normalised_sheet_name = _normalise_column_text(sheet_name)
score = 1000
if _find_daily_sales_date_column(columns):
score += 250
if any(_normalise_column_text(hint) in normalised_sheet_name for hint in DAILY_SALES_DETAIL_SHEET_HINTS):
score += 150
score += min(len([column for column in columns if _stringify_column(column)]), 80)
score += max(0, DAILY_SALES_HEADER_SCAN_ROWS - header_row)
return score
def _clean_daily_sales_dataframe(df: pd.DataFrame) -> pd.DataFrame:
cleaned = df.dropna(how="all").dropna(axis=1, how="all").copy()
cleaned.columns = _dedupe_columns(cleaned.columns)
return cleaned
def _format_daily_sales_diagnostics(candidates: list[dict]) -> str:
if not candidates:
return "未找到可辨識的工作表表頭"
parts = []
for candidate in sorted(candidates, key=lambda item: item["score"], reverse=True)[:8]:
columns_sample = [
column for column in candidate["columns_sample"]
if column and not str(column).lower().startswith("nan")
][:8]
missing = candidate.get("missing") or []
parts.append(
f"{candidate['sheet_name']}!第{candidate['header_row'] + 1}"
f"缺少={missing or ''} 欄位={columns_sample}"
)
return "".join(parts)
def _read_daily_sales_excel(file_path: str) -> tuple[pd.DataFrame, dict]:
"""
讀取當日業績 Excel支援多工作表與非第一列表頭。
Google Drive 匯出的檔案常把樞紐彙總表放在第一張 sheet
真正的匯入明細在「即時業績明細」。此處先掃描所有 sheet 的前幾列,
再選出同時含商品名稱、業績金額且最好有日期欄位的明細 sheet。
"""
candidates: list[dict] = []
with pd.ExcelFile(file_path, engine="openpyxl") as excel:
for sheet_name in excel.sheet_names:
try:
preview = pd.read_excel(
excel,
sheet_name=sheet_name,
header=None,
nrows=DAILY_SALES_HEADER_SCAN_ROWS,
dtype=str,
)
except Exception as exc:
candidates.append({
"sheet_name": sheet_name,
"header_row": 0,
"columns_sample": [f"讀取預覽失敗: {exc}"],
"missing": list(DAILY_SALES_REQUIRED_COLUMN_GROUPS.keys()),
"score": 0,
})
continue
for header_row, row in preview.iterrows():
columns = [_stringify_column(value) for value in row.tolist()]
if not any(columns):
continue
missing = _missing_daily_sales_column_groups(columns)
score = _score_daily_sales_candidate(sheet_name, int(header_row), columns)
candidates.append({
"sheet_name": sheet_name,
"header_row": int(header_row),
"columns_sample": columns[:30],
"missing": missing,
"score": score,
})
valid_candidates = [candidate for candidate in candidates if not candidate["missing"]]
if not valid_candidates:
diagnostics = _format_daily_sales_diagnostics(candidates)
raise ValueError(
"Excel 欄位防禦失敗:找不到可匯入的當日業績明細工作表。"
f"已檢查:{diagnostics}"
)
best = max(valid_candidates, key=lambda candidate: candidate["score"])
df = pd.read_excel(
excel,
sheet_name=best["sheet_name"],
header=best["header_row"],
dtype=str,
)
df = _clean_daily_sales_dataframe(df)
if df.empty:
raise ValueError("Excel 檔案為空")
missing = _missing_daily_sales_column_groups(df.columns)
if missing:
raise ValueError(
f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}"
f"現有欄位:{list(df.columns)[:30]}"
)
metadata = {
"sheet_name": best["sheet_name"],
"header_row": best["header_row"] + 1,
"date_col": _find_daily_sales_date_column(df.columns),
"candidate_score": best["score"],
}
return df, metadata
def _daily_sales_staleness_guard_enabled() -> bool:
return os.getenv("DAILY_SALES_IMPORT_STALENESS_GUARD_ENABLED", "true").lower() in {
"1",
"true",
"yes",
"on",
}
def _daily_sales_max_stale_days() -> int:
try:
return max(0, int(os.getenv("DAILY_SALES_IMPORT_MAX_STALE_DAYS", "3")))
except ValueError:
return 3
def _should_quarantine_failed_import(error_message: str) -> bool:
if not error_message:
return False
permanent_error_markers = [
"Excel 欄位防禦失敗",
"Excel 日期防禦失敗",
"Excel 檔案為空",
"找不到可匯入",
"欄位驗證失敗",
"日期驗證失敗",
]
return any(marker in error_message for marker in permanent_error_markers)
# 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite
def _create_engine_with_pool(db_path):
"""建立帶有連線池配置的資料庫引擎"""
@@ -369,50 +583,57 @@ class ImportService:
# 讀取 Excel 檔案
logger.info(f"開始讀取 Excel 檔案: {file_path}")
df = pd.read_excel(file_path, engine='openpyxl', dtype=str)
if df.empty:
error_msg = "Excel 檔案為空"
self.update_job_status(job_id, 'failed', 50, '匯入失敗', error_msg)
return False
# ─────────────────────────────────────────────
# 2026-04-19: daily_sales_snapshot 前置欄位防禦 (技術債修復)
# 原因:若 Excel 欄位名靜默變更,匯入會成功但 Hermes SQL JOIN 會找不到數據 → 告警管線失真
# 規則:至少需偵測到「商品名稱」與「銷售金額」類欄位 (容忍多種別名)
# ─────────────────────────────────────────────
def _has_any(cols, keywords):
return any(kw in c for c in cols for kw in keywords)
required_groups = {
"商品名稱類": ["商品名稱", "品名", "Product", "Name"],
"業績金額類": ["銷售金額", "業績", "金額", "Amount", "Sales", "Total"],
}
missing = [label for label, kws in required_groups.items()
if not _has_any(df.columns, kws)]
if missing:
error_msg = (
f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}"
f"現有欄位:{list(df.columns)[:30]}"
)
try:
df, excel_metadata = _read_daily_sales_excel(file_path)
except ValueError as exc:
error_msg = str(exc)
step = '欄位驗證失敗' if '欄位' in error_msg else '匯入失敗'
logger.error(error_msg)
self.update_job_status(job_id, 'failed', 50, '欄位驗證失敗', error_msg)
self.update_job_status(job_id, 'failed', 50, step, error_msg)
return False
logger.info(
"任務 %s 選用 Excel 工作表: %s | 表頭列=%s | 日期欄=%s | 欄位數=%s | 資料列=%s",
job_id,
excel_metadata.get("sheet_name"),
excel_metadata.get("header_row"),
excel_metadata.get("date_col") or "未提供",
len(df.columns),
len(df),
)
# 匯入到資料庫
table_name = 'daily_sales_snapshot'
# 找到日期欄位
date_col = None
for possible_col in ['日期', '訂單日期', '交易日期', 'Date']:
if possible_col in df.columns:
date_col = possible_col
break
date_col = excel_metadata.get("date_col") or _find_daily_sales_date_column(df.columns)
if date_col:
# 解析日期
df['snapshot_date'] = pd.to_datetime(df[date_col], errors='coerce').dt.date
parsed_dates = pd.to_datetime(df[date_col], errors='coerce')
if parsed_dates.dropna().empty:
error_msg = f"Excel 日期防禦失敗:日期欄位「{date_col}」無法解析出有效日期"
logger.error(error_msg)
self.update_job_status(job_id, 'failed', 50, '日期驗證失敗', error_msg)
return False
df['snapshot_date'] = parsed_dates.dt.date
logger.info(f"使用日期欄位: {date_col}")
if _daily_sales_staleness_guard_enabled():
max_snapshot_date = df['snapshot_date'].dropna().max()
today = datetime.now(TAIPEI_TZ).date()
lag_days = (today - max_snapshot_date).days
max_stale_days = _daily_sales_max_stale_days()
if lag_days > max_stale_days:
error_msg = (
f"Excel 日期防禦失敗:明細最大日期 {max_snapshot_date}"
f"已落後 {lag_days} 天(上限 {max_stale_days} 天)。"
"請上傳最新當日業績檔,或將舊檔移至已匯入/匯入失敗資料夾。"
)
logger.error(error_msg)
self.update_job_status(job_id, 'failed', 50, '日期驗證失敗', error_msg)
return False
else:
# 使用當前日期
df['snapshot_date'] = datetime.now(TAIPEI_TZ).date()
@@ -688,6 +909,8 @@ class ImportService:
'verified': True, # daily_sales_snapshot 驗證
'date_min': date_min,
'date_max': date_max,
'source_sheet': excel_metadata.get("sheet_name"),
'source_header_row': excel_metadata.get("header_row"),
'message': sync_message
}
@@ -860,14 +1083,46 @@ class ImportService:
session = Session()
try:
job = session.query(ImportJob).filter_by(id=job_id).first()
error_message = (job.error_message if job else None) or '匯入失敗,請查看 import_jobs'
quarantined = False
failed_folder = None
if _should_quarantine_failed_import(error_message):
default_failed_folder = (
f"{folder_path.rstrip('/')}/匯入失敗"
if folder_path and folder_path.strip("/")
else "匯入失敗"
)
failed_folder = self.get_config('gdrive_failed_folder', default_failed_folder)
if drive_service.move_file(file_id, failed_folder, create_missing=True):
quarantined = True
error_message = (
f"{error_message}(已移至「{failed_folder}」避免重複告警)"
)
self.update_job_status(job_id, 'failed', 100, '已移至匯入失敗', error_message)
logger.warning(
"已隔離不可自動修復的匯入失敗檔案: %s%s",
file_name,
failed_folder,
)
else:
logger.error("匯入失敗檔案隔離失敗: %s%s", file_name, failed_folder)
failed_files.append({
'file': file_name,
'job_id': job_id,
'error': (job.error_message if job else None) or '匯入失敗,請查看 import_jobs',
'error': error_message,
'quarantined': quarantined,
'failed_folder': failed_folder,
})
finally:
session.close()
try:
os.remove(local_path)
logger.info(f"已清理失敗匯入本地檔案: {local_path}")
except Exception as e:
logger.warning(f"清理失敗匯入本地檔案失敗: {str(e)}")
# 計算日期範圍
date_range = None
if all_dates:

View File

@@ -0,0 +1,77 @@
import pandas as pd
import pytest
from services.import_service import (
_read_daily_sales_excel,
_should_quarantine_failed_import,
)
def test_daily_sales_reader_prefers_detail_sheet_over_summary_sheets(tmp_path):
workbook = tmp_path / "即時業績_當日.xlsx"
with pd.ExcelWriter(workbook, engine="openpyxl") as writer:
pd.DataFrame([
[None, None, None, None],
[None, None, None, None],
["列標籤", "加總 - 總業績", "加總 - 總成本", "加總 - 折扣金額"],
["保健", "100", "60", "5"],
]).to_excel(writer, sheet_name="工作表1", header=False, index=False)
pd.DataFrame({
"列標籤": ["A"],
"商品名稱": ["彙總商品"],
"銷售網頁ID": ["P001"],
"加總 - 總業績": ["300"],
}).to_excel(writer, sheet_name="工作表2", index=False)
pd.DataFrame({
"狀態": ["完成"],
"日期": ["2026-06-14"],
"商品ID": ["SKU001"],
"商品名稱": ["專業測試商品"],
"數量": ["1"],
"總業績": ["1200"],
}).to_excel(writer, sheet_name="即時業績明細", index=False)
df, metadata = _read_daily_sales_excel(str(workbook))
assert metadata["sheet_name"] == "即時業績明細"
assert metadata["header_row"] == 1
assert metadata["date_col"] == "日期"
assert list(df["商品名稱"]) == ["專業測試商品"]
def test_daily_sales_reader_scans_header_rows(tmp_path):
workbook = tmp_path / "header-offset.xlsx"
with pd.ExcelWriter(workbook, engine="openpyxl") as writer:
pd.DataFrame([
["報表產製時間", "2026-06-14", None],
[None, None, None],
["日期", "商品名稱", "總業績"],
["2026-06-14", "測試商品", "999"],
]).to_excel(writer, sheet_name="即時業績明細", header=False, index=False)
df, metadata = _read_daily_sales_excel(str(workbook))
assert metadata["header_row"] == 3
assert df.iloc[0]["商品名稱"] == "測試商品"
def test_daily_sales_reader_reports_checked_sheets_when_no_candidate(tmp_path):
workbook = tmp_path / "invalid.xlsx"
with pd.ExcelWriter(workbook, engine="openpyxl") as writer:
pd.DataFrame({"Unnamed: 0": ["A"], "Unnamed: 1": ["B"]}).to_excel(
writer,
sheet_name="工作表1",
index=False,
)
with pytest.raises(ValueError) as exc:
_read_daily_sales_excel(str(workbook))
assert "Excel 欄位防禦失敗" in str(exc.value)
assert "工作表1" in str(exc.value)
def test_only_permanent_import_failures_are_quarantined():
assert _should_quarantine_failed_import("Excel 欄位防禦失敗:缺少必要欄位分類")
assert _should_quarantine_failed_import("Excel 日期防禦失敗:明細最大日期 2026-06-10")
assert not _should_quarantine_failed_import("無法從 Google Drive 下載檔案")