#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Automatic import service.

Downloads files from Google Drive, imports them, and deletes them afterwards.
"""
|
||
|
||
import logging
|
||
import json
|
||
import os
|
||
from datetime import date, datetime
|
||
from typing import Any, Dict, Optional
|
||
|
||
import pandas as pd
|
||
import pytz
|
||
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.orm import sessionmaker
|
||
|
||
# Taipei time zone
TAIPEI_TZ = pytz.timezone('Asia/Taipei')

# Column groups a daily-sales sheet must contain. Each group label maps to the
# header keywords that satisfy it; matching is done on normalised (lower-cased,
# whitespace-stripped) text as a substring test.
DAILY_SALES_REQUIRED_COLUMN_GROUPS = {
    "商品名稱類": ["商品名稱", "品名", "Product", "Name"],
    "業績金額類": ["銷售金額", "總業績", "業績", "金額", "Amount", "Sales", "Total"],
}
# Header names recognised as the date column (exact match first, then substring).
DAILY_SALES_DATE_ALIASES = ["日期", "訂單日期", "交易日期", "Date"]
# Sheet-name fragments that raise a sheet's score when picking the detail sheet.
DAILY_SALES_DETAIL_SHEET_HINTS = ("即時業績明細", "業績明細", "明細", "detail", "daily")
# Number of leading rows of each sheet scanned when looking for the header row.
DAILY_SALES_HEADER_SCAN_ROWS = 15
|
||
|
||
from services.google_drive_service import drive_service
|
||
from database.import_models import ImportJob, ImportConfig, Base
|
||
from database.manager import ensure_metadata_initialized
|
||
|
||
# Module-level logger
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _build_in_clause(prefix: str, values) -> tuple:
    """Build the placeholder list and bind parameters for a SQL ``IN`` clause.

    Args:
        prefix: Prefix used for the generated bind-parameter names.
        values: Iterable of values; each one gets its own ``<prefix>_<index>`` key.

    Returns:
        A ``(placeholders, params)`` tuple: a comma-separated string of
        ``:<prefix>_<index>`` placeholders and the dict mapping each generated
        key to its value. An empty iterable yields ``("", {})``.
    """
    bind_params = {
        f"{prefix}_{position}": item
        for position, item in enumerate(values)
    }
    placeholder_sql = ", ".join(f":{name}" for name in bind_params)
    return placeholder_sql, bind_params
|
||
|
||
|
||
def _db_dialect_name() -> str:
    """Return the dialect name of the module-level ``engine``.

    Returns:
        ``engine.dialect.name``, or an empty string if looking it up raises
        any exception.
    """
    try:
        dialect = engine.dialect
        name = dialect.name
    except Exception:
        # Best-effort lookup: every failure maps to "no known dialect".
        name = ""
    return name
|
||
|
||
|
||
def _date_filter_expr(column_name: str) -> str:
    """Return a dialect-specific SQL expression that casts a column to a date.

    Args:
        column_name: Name of the column to wrap.

    Returns:
        ``<column>::date`` when the engine dialect is PostgreSQL, otherwise
        ``date(<column>)``.
    """
    is_postgres = _db_dialect_name() == "postgresql"
    return f"{column_name}::date" if is_postgres else f"date({column_name})"
|
||
|
||
|
||
def _table_columns(conn, table_name: str) -> set[str]:
    """Return the column names of a table, excluding ``id``.

    Args:
        conn: Open database connection used to run the metadata query.
        table_name: Table whose columns are listed.

    Returns:
        Set of column names other than ``id``. PostgreSQL is queried through
        ``information_schema.columns``; every other dialect goes through
        ``PRAGMA table_info``.
    """
    if _db_dialect_name() != "postgresql":
        # PRAGMA rows carry the column name at index 1.
        pragma_rows = conn.execute(text(f'PRAGMA table_info("{table_name}")')).fetchall()
        return {info[1] for info in pragma_rows if info[1] != 'id'}

    column_query = text("""
        SELECT column_name FROM information_schema.columns
        WHERE table_name = :table_name AND column_name != 'id'
        ORDER BY ordinal_position
    """)
    result_rows = conn.execute(column_query, {"table_name": table_name}).fetchall()
    return {record[0] for record in result_rows}
|
||
|
||
|
||
def _normalise_date_values_for_sql(values, use_native_date=None):
    """Normalise date-like values into SQL bind parameters.

    PostgreSQL parameters are kept as ``date`` objects; other dialects get
    ISO ``YYYY-MM-DD`` strings for text comparisons.

    Args:
        values: Iterable of date-like values (``date``, ``datetime``,
            strings, ...).
        use_native_date: ``True`` to emit ``date`` objects, ``False`` to emit
            strings. When ``None`` (the default) the choice follows the
            engine dialect, as before.

    Returns:
        List of normalised values. ``None`` and missing values (``NaT``,
        ``NaN``) are skipped. Unparseable values are dropped in native-date
        mode and passed through as ``str(value)`` in string mode.
    """
    if use_native_date is None:
        use_native_date = _db_dialect_name() == "postgresql"

    normalised = []
    for value in values:
        if value is None:
            continue
        # pd.NaT is a datetime instance, so without this check it would be
        # treated as a real date and sent to the database as a parameter.
        try:
            if pd.isna(value):
                continue
        except (TypeError, ValueError):
            # Non-scalar input: pd.isna has no single truth value, fall through.
            pass

        if isinstance(value, datetime):
            value = value.date()
        elif hasattr(value, "date") and not isinstance(value, date):
            try:
                value = value.date()
            except Exception:
                # Keep the raw value and let the parsing below handle it.
                pass

        if isinstance(value, date):
            normalised.append(value if use_native_date else value.isoformat())
            continue

        parsed = pd.to_datetime(value, errors="coerce")
        if pd.notna(parsed):
            normalised.append(parsed.date() if use_native_date else parsed.strftime("%Y-%m-%d"))
        elif not use_native_date:
            normalised.append(str(value))
    return normalised
|
||
|
||
|
||
def _normalise_column_text(value) -> str:
    """Normalise a header cell for matching.

    Args:
        value: Raw header value.

    Returns:
        ``""`` for missing values; otherwise the stringified value with
        surrounding whitespace trimmed, every newline, carriage return and
        space removed, and the result lower-cased.
    """
    if pd.isna(value):
        return ""
    trimmed = str(value).strip()
    kept_chars = [ch for ch in trimmed if ch not in "\n\r "]
    return "".join(kept_chars).lower()
|
||
|
||
|
||
def _stringify_column(value) -> str:
    """Return a header value as trimmed text, or ``""`` when it is missing."""
    return "" if pd.isna(value) else str(value).strip()
|
||
|
||
|
||
def _dedupe_columns(columns) -> list[str]:
    """Return unique, non-empty column names in the original order.

    Blank names become ``unnamed_<position>`` (1-based). A repeated name
    gets a ``_<n>`` suffix, where ``n`` starts at its occurrence count and is
    bumped until the result clashes with no name already emitted. For
    example ``["a", "a", "a_2"]`` becomes ``["a", "a_2", "a_2_2"]`` rather
    than yielding ``"a_2"`` twice.

    Args:
        columns: Iterable of raw column labels.

    Returns:
        List of unique column names, one per input column.
    """
    occurrences = {}
    used = set()
    result = []
    for idx, column in enumerate(columns):
        base = _stringify_column(column) or f"unnamed_{idx + 1}"
        count = occurrences.get(base, 0) + 1
        candidate = base if count == 1 else f"{base}_{count}"
        # A generated suffix may equal a real column name; keep counting
        # until the candidate is genuinely unused.
        while candidate in used:
            count += 1
            candidate = f"{base}_{count}"
        occurrences[base] = count
        used.add(candidate)
        result.append(candidate)
    return result
|
||
|
||
|
||
def _columns_have_any(columns, keywords) -> bool:
    """Report whether any column contains any keyword.

    Both sides are normalised first; empty normalised columns or keywords
    never match.

    Args:
        columns: Iterable of column labels.
        keywords: Iterable of keywords to look for.

    Returns:
        ``True`` if at least one normalised keyword is a substring of at
        least one normalised column, else ``False``.
    """
    column_texts = [_normalise_column_text(label) for label in columns]
    keyword_texts = [_normalise_column_text(word) for word in keywords]
    for column_text in column_texts:
        for keyword_text in keyword_texts:
            if keyword_text and column_text and keyword_text in column_text:
                return True
    return False
|
||
|
||
|
||
def _missing_daily_sales_column_groups(columns) -> list[str]:
    """List the required column groups that no column satisfies.

    Args:
        columns: Iterable of column labels to check.

    Returns:
        Labels from ``DAILY_SALES_REQUIRED_COLUMN_GROUPS`` whose keywords
        match none of the columns, in the mapping's order.
    """
    absent_groups = []
    for group_label, group_keywords in DAILY_SALES_REQUIRED_COLUMN_GROUPS.items():
        if _columns_have_any(columns, group_keywords):
            continue
        absent_groups.append(group_label)
    return absent_groups
|
||
|
||
|
||
def _find_daily_sales_date_column(columns) -> Optional[str]:
    """Find the column that holds the sales date.

    An exact match against a normalised alias is preferred over a column
    that merely contains one.

    Args:
        columns: Iterable of column labels.

    Returns:
        The first exactly matching column; failing that, the first column
        containing an alias; ``None`` if there is neither.
    """
    alias_texts = [_normalise_column_text(alias) for alias in DAILY_SALES_DATE_ALIASES]
    not_found = object()

    # Pass 1: exact alias match.
    exact_hit = next(
        (label for label in columns if _normalise_column_text(label) in alias_texts),
        not_found,
    )
    if exact_hit is not not_found:
        return exact_hit

    # Pass 2: alias appears somewhere inside the column text.
    def contains_alias(label) -> bool:
        label_text = _normalise_column_text(label)
        return bool(label_text) and any(
            alias_text and alias_text in label_text for alias_text in alias_texts
        )

    partial_hit = next((label for label in columns if contains_alias(label)), not_found)
    return None if partial_hit is not_found else partial_hit
|
||
|
||
|
||
def _score_daily_sales_candidate(sheet_name: str, header_row: int, columns) -> int:
    """Score a (sheet, header row) pair as the daily-sales detail table.

    Args:
        sheet_name: Name of the worksheet.
        header_row: Zero-based index of the candidate header row.
        columns: Header values found on that row.

    Returns:
        ``0`` when a required column group is missing. Otherwise 1000, plus
        250 if a date column is found, plus 150 if the sheet name contains a
        detail hint, plus the number of non-empty columns (capped at 80),
        plus a bonus that shrinks the further down the header row sits.
    """
    if _missing_daily_sales_column_groups(columns):
        return 0

    sheet_key = _normalise_column_text(sheet_name)
    date_bonus = 250 if _find_daily_sales_date_column(columns) else 0
    hint_bonus = 150 if any(
        _normalise_column_text(hint) in sheet_key
        for hint in DAILY_SALES_DETAIL_SHEET_HINTS
    ) else 0
    named_columns = len([label for label in columns if _stringify_column(label)])
    width_bonus = min(named_columns, 80)
    position_bonus = max(0, DAILY_SALES_HEADER_SCAN_ROWS - header_row)
    return 1000 + date_bonus + hint_bonus + width_bonus + position_bonus
|
||
|
||
|
||
def _clean_daily_sales_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Drop fully empty rows and columns and make the column names unique.

    Args:
        df: DataFrame as read from the worksheet.

    Returns:
        A cleaned copy; the input frame is not modified.
    """
    without_blank_rows = df.dropna(how="all")
    cleaned = without_blank_rows.dropna(axis=1, how="all").copy()
    cleaned.columns = _dedupe_columns(cleaned.columns)
    return cleaned
|
||
|
||
|
||
def _format_daily_sales_diagnostics(candidates: list[dict]) -> str:
    """Render scanned header candidates as a one-line diagnostic string.

    Args:
        candidates: Candidate dicts with ``sheet_name``, ``header_row``,
            ``columns_sample``, ``score`` and optionally ``missing``.

    Returns:
        A fixed message when there are no candidates; otherwise the eight
        highest-scoring candidates, each shown with its sheet, 1-based header
        row, missing groups and up to eight non-empty, non-``nan`` columns.
    """
    if not candidates:
        return "未找到可辨識的工作表表頭"

    def describe(entry: dict) -> str:
        shown = []
        for name in entry["columns_sample"]:
            if name and not str(name).lower().startswith("nan"):
                shown.append(name)
        shown = shown[:8]
        absent = entry.get("missing") or []
        location = f"{entry['sheet_name']}!第{entry['header_row'] + 1}列 "
        return location + f"缺少={absent or '無'} 欄位={shown}"

    ranked = sorted(candidates, key=lambda item: item["score"], reverse=True)
    return ";".join(describe(entry) for entry in ranked[:8])
|
||
|
||
|
||
def _read_daily_sales_excel(file_path: str) -> tuple[pd.DataFrame, dict]:
    """Read a daily-sales Excel file, choosing the best sheet and header row.

    Files exported from Google Drive often put a pivot summary on the first
    sheet while the importable detail rows live on another one. The first
    ``DAILY_SALES_HEADER_SCAN_ROWS`` rows of every sheet are scanned, each
    non-empty row is scored as a possible header, and the highest-scoring row
    that has every required column group is used.

    Args:
        file_path: Path to the Excel workbook.

    Returns:
        ``(df, metadata)``. ``df`` is the cleaned detail table read as
        strings. ``metadata`` holds ``sheet_name``, the 1-based
        ``header_row``, ``date_col`` and ``candidate_score``.

    Raises:
        ValueError: No sheet has a usable header, the chosen table is empty
            after cleaning, or required column groups are missing from it.
    """
    scanned: list[dict] = []

    with pd.ExcelFile(file_path, engine="openpyxl") as workbook:
        for sheet_name in workbook.sheet_names:
            try:
                preview = pd.read_excel(
                    workbook,
                    sheet_name=sheet_name,
                    header=None,
                    nrows=DAILY_SALES_HEADER_SCAN_ROWS,
                    dtype=str,
                )
            except Exception as exc:
                # Record the unreadable sheet so it shows up in diagnostics.
                scanned.append({
                    "sheet_name": sheet_name,
                    "header_row": 0,
                    "columns_sample": [f"讀取預覽失敗: {exc}"],
                    "missing": list(DAILY_SALES_REQUIRED_COLUMN_GROUPS.keys()),
                    "score": 0,
                })
                continue

            for row_index, row in preview.iterrows():
                row_number = int(row_index)
                columns = [_stringify_column(cell) for cell in row.tolist()]
                if not any(columns):
                    continue
                scanned.append({
                    "sheet_name": sheet_name,
                    "header_row": row_number,
                    "columns_sample": columns[:30],
                    "missing": _missing_daily_sales_column_groups(columns),
                    "score": _score_daily_sales_candidate(sheet_name, row_number, columns),
                })

        usable = [entry for entry in scanned if not entry["missing"]]
        if not usable:
            diagnostics = _format_daily_sales_diagnostics(scanned)
            raise ValueError(
                "Excel 欄位防禦失敗:找不到可匯入的當日業績明細工作表。"
                f"已檢查:{diagnostics}"
            )

        best = max(usable, key=lambda entry: entry["score"])
        # Re-read the winning sheet with the detected header row while the
        # workbook is still open.
        df = pd.read_excel(
            workbook,
            sheet_name=best["sheet_name"],
            header=best["header_row"],
            dtype=str,
        )

    df = _clean_daily_sales_dataframe(df)
    if df.empty:
        raise ValueError("Excel 檔案為空")

    # Re-check after cleaning: dropping empty columns may remove a match.
    missing = _missing_daily_sales_column_groups(df.columns)
    if missing:
        raise ValueError(
            f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}。"
            f"現有欄位:{list(df.columns)[:30]}"
        )

    metadata = {
        "sheet_name": best["sheet_name"],
        "header_row": best["header_row"] + 1,
        "date_col": _find_daily_sales_date_column(df.columns),
        "candidate_score": best["score"],
    }
    return df, metadata
|
||
|
||
|
||
def _daily_sales_staleness_guard_enabled() -> bool:
    """Report whether the stale-date guard for daily-sales imports is on.

    Reads ``DAILY_SALES_IMPORT_STALENESS_GUARD_ENABLED`` (default ``"true"``).

    Returns:
        ``True`` when the lower-cased value is ``1``, ``true``, ``yes`` or
        ``on``; ``False`` for anything else.
    """
    raw_flag = os.getenv("DAILY_SALES_IMPORT_STALENESS_GUARD_ENABLED", "true")
    truthy_values = ("1", "true", "yes", "on")
    return raw_flag.lower() in truthy_values
|
||
|
||
|
||
def _daily_sales_max_stale_days() -> int:
    """Return the maximum allowed lag, in days, for imported sales data.

    Reads ``DAILY_SALES_IMPORT_MAX_STALE_DAYS`` (default ``"3"``).

    Returns:
        The configured value clamped to be non-negative, or ``3`` when the
        value is not an integer.
    """
    raw_limit = os.getenv("DAILY_SALES_IMPORT_MAX_STALE_DAYS", "3")
    try:
        parsed_limit = int(raw_limit)
    except ValueError:
        return 3
    return max(0, parsed_limit)
|
||
|
||
|
||
def _should_quarantine_failed_import(error_message: str) -> bool:
    """Decide whether a failed import looks permanent.

    Args:
        error_message: Error text recorded for the failed import.

    Returns:
        ``True`` when the message contains one of the markers for a column,
        date or empty-file validation failure; ``False`` otherwise, including
        for an empty or missing message.
    """
    if not error_message:
        return False

    permanent_error_markers = (
        "Excel 欄位防禦失敗",
        "Excel 日期防禦失敗",
        "Excel 檔案為空",
        "找不到可匯入",
        "欄位驗證失敗",
        "日期驗證失敗",
    )
    for marker in permanent_error_markers:
        if marker in error_message:
            return True
    return False
|
||
|
||
# Database setup: uses the settings from config.py; supports PostgreSQL and SQLite
def _create_engine_with_pool(db_path):
    """Create a database engine, with connection pooling for PostgreSQL.

    Args:
        db_path: A PostgreSQL URL (``postgresql://``, or the
            driver-qualified ``postgresql+psycopg2://`` /
            ``postgresql+psycopg://`` forms), a ``sqlite://`` URL, or a plain
            file path that is treated as a SQLite database file.

    Returns:
        The SQLAlchemy engine for the given target.
    """
    # Driver-qualified URLs must be recognised too; otherwise they fall
    # through to the file-path branch and get wrapped in "sqlite:///".
    postgres_prefixes = (
        'postgresql://',
        'postgresql+psycopg2://',
        'postgresql+psycopg://',
    )
    if db_path.startswith(postgres_prefixes):
        return create_engine(
            db_path,
            pool_pre_ping=True,
            pool_size=5,
            max_overflow=10,
            pool_recycle=1800,
            pool_timeout=30,
            connect_args={
                'connect_timeout': 10,
                'options': '-c statement_timeout=120000'  # imports need a longer timeout
            }
        )
    elif db_path.startswith('sqlite://'):
        return create_engine(db_path)
    else:
        return create_engine(f'sqlite:///{db_path}')
|
||
|
||
# Build the module-level engine from config.py, falling back to the
# DATABASE_PATH environment variable (or a local default) on ImportError.
# NOTE(review): the except clause covers the whole try body, so an ImportError
# raised while creating the engine (for example a missing database driver)
# would presumably also trigger the fallback -- confirm this is intended.
try:
    from config import DATABASE_PATH as CONFIG_DATABASE_PATH

    engine = _create_engine_with_pool(CONFIG_DATABASE_PATH)
    # Log only what follows the last '@' so URL credentials stay out of the log.
    logger.info(f"使用資料庫: {CONFIG_DATABASE_PATH.rpartition('@')[2]}")
except ImportError:
    # Fallback: use the environment variable or the default path
    DATABASE_PATH = os.getenv('DATABASE_PATH', 'data/momo_database.db')
    engine = _create_engine_with_pool(DATABASE_PATH)
    logger.warning(f"無法匯入 config,使用備援資料庫路徑: {DATABASE_PATH}")

Session = sessionmaker(bind=engine)
|
||
|
||
|
||
class ImportService:
|
||
"""匯入服務類別"""
|
||
|
||
def __init__(self):
    """Initialise the import service by running ``_init_database``."""
    self._init_database()
|
||
|
||
def _init_database(self):
    """Initialise the database tables used to track imports.

    The PostgreSQL lock is requested when the engine URL starts with
    ``postgresql``. Failures are logged and not re-raised.
    """
    try:
        is_postgres = str(engine.url).startswith('postgresql')
        ensure_metadata_initialized(engine, use_postgres_lock=is_postgres)
        logger.info("匯入追蹤表已初始化")
    except Exception as e:
        logger.error(f"初始化資料庫表失敗: {str(e)}")
|
||
|
||
def get_config(self, key: str, default: str = None) -> Optional[str]:
    """
    Look up a configuration value.

    Args:
        key: Configuration key.
        default: Value returned when the key has no stored entry.

    Returns:
        Optional[str]: The stored value, or ``default`` if none exists.
    """
    session = Session()
    try:
        record = session.query(ImportConfig).filter_by(config_key=key).first()
        return record.config_value if record else default
    finally:
        session.close()
|
||
|
||
def set_config(self, key: str, value: str, config_type: str = 'string', description: str = None):
    """
    Create or update a configuration value.

    Errors are rolled back and logged rather than raised.

    Args:
        key: Configuration key.
        value: Configuration value.
        config_type: Configuration type.
        description: Description; an existing one is only replaced when
            this is non-empty.
    """
    session = Session()
    try:
        existing = session.query(ImportConfig).filter_by(config_key=key).first()
        if not existing:
            session.add(ImportConfig(
                config_key=key,
                config_value=value,
                config_type=config_type,
                description=description
            ))
        else:
            existing.config_value = value
            existing.config_type = config_type
            if description:
                existing.description = description
            # Stored as naive Taipei local time.
            existing.updated_at = datetime.now(TAIPEI_TZ).replace(tzinfo=None)

        session.commit()
        logger.info(f"配置已更新: {key} = {value}")
    except Exception as e:
        session.rollback()
        logger.error(f"設定配置失敗: {str(e)}")
    finally:
        session.close()
|
||
|
||
def create_import_job(self, job_type: str, drive_file_id: str, drive_file_name: str,
                      drive_file_size: int = None) -> Optional[int]:
    """
    Create an import job in the ``pending`` state.

    Args:
        job_type: Job type (daily_sales or vendor_stockout).
        drive_file_id: Google Drive file ID.
        drive_file_name: File name.
        drive_file_size: File size.

    Returns:
        Optional[int]: The new job ID, or ``None`` if creation failed.
    """
    session = Session()
    try:
        job_fields = {
            'job_type': job_type,
            'status': 'pending',
            'drive_file_id': drive_file_id,
            'drive_file_name': drive_file_name,
            'drive_file_size': drive_file_size,
            'progress_percent': 0.0,
            'current_step': '等待開始...',
        }
        new_job = ImportJob(**job_fields)
        session.add(new_job)
        session.commit()

        job_id = new_job.id
        logger.info(f"已建立匯入任務: ID={job_id}, 檔案={drive_file_name}")
        return job_id
    except Exception as e:
        session.rollback()
        logger.error(f"建立匯入任務失敗: {str(e)}")
        return None
    finally:
        session.close()
|
||
|
||
def update_job_status(self, job_id: int, status: str, progress: float = None,
                      current_step: str = None, error_message: str = None):
    """
    Update a job's status and related fields.

    Errors are rolled back and logged rather than raised.

    Args:
        job_id: Job ID.
        status: New status.
        progress: Progress percentage; left unchanged when ``None``.
        current_step: Current step text; left unchanged when empty.
        error_message: Error message; left unchanged when empty.
    """
    session = Session()
    try:
        job = session.query(ImportJob).filter_by(id=job_id).first()
        if not job:
            logger.warning(f"找不到任務: ID={job_id}")
            return

        job.status = status
        if progress is not None:
            job.progress_percent = progress
        if current_step:
            job.current_step = current_step
        if error_message:
            job.error_message = error_message

        # Timestamps (2026-01-30 fix: use the Taipei time zone).
        # started_at is set only once; completed_at on every terminal status.
        if status in ('downloading', 'importing') and not job.started_at:
            job.started_at = datetime.now(TAIPEI_TZ).replace(tzinfo=None)
        elif status in ('completed', 'failed'):
            job.completed_at = datetime.now(TAIPEI_TZ).replace(tzinfo=None)

        session.commit()
        logger.info(f"任務 {job_id} 狀態已更新: {status} ({progress}%)")
    except Exception as e:
        session.rollback()
        logger.error(f"更新任務狀態失敗: {str(e)}")
    finally:
        session.close()
|
||
|
||
def update_job_progress(self, job_id: int, total_rows: int = None, processed_rows: int = None,
                        success_rows: int = None, error_rows: int = None):
    """
    Update a job's row counters and derived progress percentage.

    Counters passed as ``None`` are left unchanged. Errors are rolled back
    and logged rather than raised.

    Args:
        job_id: Job ID.
        total_rows: Total number of rows.
        processed_rows: Number of rows processed.
        success_rows: Number of rows that succeeded.
        error_rows: Number of rows that failed.
    """
    session = Session()
    try:
        job = session.query(ImportJob).filter_by(id=job_id).first()
        if not job:
            return

        counter_updates = {
            'total_rows': total_rows,
            'processed_rows': processed_rows,
            'success_rows': success_rows,
            'error_rows': error_rows,
        }
        for field_name, new_value in counter_updates.items():
            if new_value is not None:
                setattr(job, field_name, new_value)

        # Recompute the percentage only when both counters are non-zero.
        if job.total_rows and job.processed_rows:
            job.progress_percent = (job.processed_rows / job.total_rows) * 100

        session.commit()
    except Exception as e:
        session.rollback()
        logger.error(f"更新任務進度失敗: {str(e)}")
    finally:
        session.close()
|
||
|
||
def get_job_status(self, job_id: int) -> Optional[Dict[str, Any]]:
    """
    Fetch a job's details.

    Args:
        job_id: Job ID.

    Returns:
        Optional[Dict]: The job's ``to_dict()`` result, or ``None`` when no
        job has that ID.
    """
    session = Session()
    try:
        job = session.query(ImportJob).filter_by(id=job_id).first()
        return job.to_dict() if job else None
    finally:
        session.close()
|
||
|
||
def get_recent_jobs(self, limit: int = 20) -> list:
    """
    List the most recently created jobs.

    Args:
        limit: Maximum number of jobs to return.

    Returns:
        list: Job dicts, newest first by ``created_at``.
    """
    session = Session()
    try:
        newest_first = session.query(ImportJob).order_by(ImportJob.created_at.desc())
        recent_jobs = newest_first.limit(limit).all()
        return [recent.to_dict() for recent in recent_jobs]
    finally:
        session.close()
|
||
|
||
def process_daily_sales_import(self, job_id: int, file_path: str) -> bool:
    """
    Process a daily-sales import.

    Steps: read and validate the Excel file, derive ``snapshot_date``,
    replace rows with the same dates in ``daily_sales_snapshot`` (with
    verification and retries), mirror the data into
    ``realtime_sales_monthly``, then record a summary on the job.

    Args:
        job_id: Job ID.
        file_path: Path to the Excel file.

    Returns:
        bool: ``True`` only when both the snapshot write and the monthly
        sync succeeded; ``False`` on any validation, write, sync or
        unexpected failure (the job is marked ``failed``).
    """
    try:
        self.update_job_status(job_id, 'importing', 50, '正在匯入資料...')

        # Read the Excel file
        logger.info(f"開始讀取 Excel 檔案: {file_path}")
        try:
            df, excel_metadata = _read_daily_sales_excel(file_path)
        except ValueError as exc:
            error_msg = str(exc)
            # Column-related messages get a dedicated step label.
            step = '欄位驗證失敗' if '欄位' in error_msg else '匯入失敗'
            logger.error(error_msg)
            self.update_job_status(job_id, 'failed', 50, step, error_msg)
            return False

        logger.info(
            "任務 %s 選用 Excel 工作表: %s | 表頭列=%s | 日期欄=%s | 欄位數=%s | 資料列=%s",
            job_id,
            excel_metadata.get("sheet_name"),
            excel_metadata.get("header_row"),
            excel_metadata.get("date_col") or "未提供",
            len(df.columns),
            len(df),
        )

        # Import into the database
        table_name = 'daily_sales_snapshot'

        # Locate the date column
        date_col = excel_metadata.get("date_col") or _find_daily_sales_date_column(df.columns)

        if date_col:
            # Parse the dates; unparseable cells become NaT
            parsed_dates = pd.to_datetime(df[date_col], errors='coerce')
            if parsed_dates.dropna().empty:
                error_msg = f"Excel 日期防禦失敗:日期欄位「{date_col}」無法解析出有效日期"
                logger.error(error_msg)
                self.update_job_status(job_id, 'failed', 50, '日期驗證失敗', error_msg)
                return False

            df['snapshot_date'] = parsed_dates.dt.date
            logger.info(f"使用日期欄位: {date_col}")

            if _daily_sales_staleness_guard_enabled():
                # Reject files whose newest row lags too far behind today (Taipei time).
                max_snapshot_date = df['snapshot_date'].dropna().max()
                today = datetime.now(TAIPEI_TZ).date()
                lag_days = (today - max_snapshot_date).days
                max_stale_days = _daily_sales_max_stale_days()
                if lag_days > max_stale_days:
                    error_msg = (
                        f"Excel 日期防禦失敗:明細最大日期 {max_snapshot_date},"
                        f"已落後 {lag_days} 天(上限 {max_stale_days} 天)。"
                        "請上傳最新當日業績檔,或將舊檔移至已匯入/匯入失敗資料夾。"
                    )
                    logger.error(error_msg)
                    self.update_job_status(job_id, 'failed', 50, '日期驗證失敗', error_msg)
                    return False
        else:
            # Use the current date
            df['snapshot_date'] = datetime.now(TAIPEI_TZ).date()
            logger.info("未找到日期欄位,使用當前日期(台北時區)")

        # Write to the database using the global engine (supports PostgreSQL and SQLite).
        # The engine defined at module level is used so the correct database is targeted.

        # Update progress
        total_rows = len(df)
        self.update_job_progress(job_id, total_rows=total_rows, processed_rows=0)

        # Collect the dates covered by this import
        import_dates = df['snapshot_date'].unique()
        logger.info(f"本次匯入包含 {len(import_dates)} 個日期的資料")

        # Delete existing rows for the same dates (overwrite semantics).
        # NOTE(review): this delete is committed before the insert below runs,
        # so a failed insert appears to leave those dates empty -- confirm.
        if len(import_dates) > 0:
            # Filter out None values
            valid_dates = _normalise_date_values_for_sql(import_dates)

            if valid_dates:
                date_placeholders, date_params = _build_in_clause("snapshot_date", valid_dates)
                snapshot_date_expr = _date_filter_expr("snapshot_date")

                with engine.connect() as conn:
                    # Delete old rows for the same dates
                    delete_query = text(
                        f"DELETE FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})"
                    )
                    result = conn.execute(delete_query, date_params)
                    deleted_count = result.rowcount
                    conn.commit()

                if deleted_count > 0:
                    logger.info(f"已刪除 {deleted_count} 筆舊資料(覆蓋模式)")

        # Write to the database (with verification and retries)
        max_retries = 2
        retry_count = 0
        write_success = False

        while retry_count <= max_retries and not write_success:
            try:
                if retry_count > 0:
                    logger.warning(f"任務 {job_id} 第 {retry_count} 次重試寫入...")
                    self.update_job_status(job_id, 'importing', 60, f'重試寫入中 ({retry_count}/{max_retries})...')

                df.to_sql(
                    table_name,
                    engine,
                    if_exists='append',
                    index=False,
                    method='multi',
                    chunksize=1000
                )

                # V-Fix: post-import verification - confirm the rows reached the database
                self.update_job_status(job_id, 'importing', 85, '驗證資料寫入...')

                # Dates covered by this import
                import_dates = df['snapshot_date'].dropna().unique()
                if len(import_dates) > 0:
                    # Count the rows stored for those dates
                    raw_valid_dates = [d for d in import_dates if d is not None]
                    valid_dates = _normalise_date_values_for_sql(raw_valid_dates)
                    date_placeholders, date_params = _build_in_clause("verify_date", valid_dates)
                    snapshot_date_expr = _date_filter_expr("snapshot_date")

                    with engine.connect() as conn:
                        verify_query = text(
                            f"SELECT COUNT(*) FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})"
                        )
                        result = conn.execute(verify_query, date_params)
                        db_count = result.scalar()

                    # Verification: the database count should be >= the rows just imported
                    # (older rows for other dates may exist)
                    expected_count = len(df[df['snapshot_date'].isin(raw_valid_dates)])

                    if db_count >= expected_count:
                        logger.info(f"任務 {job_id} 驗證成功: 預期 {expected_count} 筆, 資料庫有 {db_count} 筆")
                        write_success = True
                    else:
                        # NOTE(review): the retry appends the frame again without
                        # deleting first -- confirm this cannot duplicate rows.
                        logger.warning(f"任務 {job_id} 驗證失敗: 預期 {expected_count} 筆, 資料庫只有 {db_count} 筆")
                        retry_count += 1
                else:
                    # No valid dates, skip verification
                    logger.warning(f"任務 {job_id} 無法驗證: 沒有有效的 snapshot_date")
                    write_success = True

            except Exception as write_error:
                logger.error(f"任務 {job_id} 寫入失敗 (嘗試 {retry_count + 1}): {str(write_error)}")
                retry_count += 1
                if retry_count > max_retries:
                    # Out of retries: hand over to the outer exception handler.
                    raise write_error

        if not write_success:
            error_msg = f"資料寫入驗證失敗,已重試 {max_retries} 次"
            self.update_job_status(job_id, 'failed', 85, '驗證失敗', error_msg)
            logger.error(f"任務 {job_id} {error_msg}")
            return False

        # === V-New 2026-01-15: also write to realtime_sales_monthly ===
        # Purpose: make the raw daily-sales data visible on the sales analytics dashboard
        # 2026-01-30 fix: stronger column validation, sync status tracking, failure alerts
        self.update_job_status(job_id, 'importing', 90, '同步至業績分析儀表板...')

        sync_success = False
        sync_error_msg = None
        monthly_table = 'realtime_sales_monthly'

        try:
            # Prepare the data: drop snapshot_date (realtime_sales_monthly does not need it)
            df_monthly = df.drop(columns=['snapshot_date'], errors='ignore')

            # 2026-01-30 fix: stronger column-name conversion
            # Convert special characters into a PostgreSQL-safe form
            column_mapping = {}
            for col in df_monthly.columns:
                new_col = col.replace('%', '_pct').replace('(', '_').replace(')', '_')
                column_mapping[col] = new_col
            df_monthly = df_monthly.rename(columns=column_mapping)

            # Log the converted columns
            converted_cols = [f"'{k}' -> '{v}'" for k, v in column_mapping.items() if k != v]
            if converted_cols:
                logger.info(f"任務 {job_id} 欄位名稱轉換: {', '.join(converted_cols)}")
            logger.info(f"任務 {job_id} 欄位轉換完成,共 {len(df_monthly.columns)} 個欄位")

            # 2026-01-30 addition: check that DataFrame columns match the target table
            with engine.connect() as conn:
                target_columns = _table_columns(conn, monthly_table)

            df_columns = set(df_monthly.columns)
            missing_in_table = df_columns - target_columns
            missing_in_df = target_columns - df_columns

            if missing_in_table:
                logger.warning(f"任務 {job_id} 欄位警告: DataFrame 有但表中沒有: {missing_in_table}")
                # Drop columns the table does not have, so the INSERT does not fail
                df_monthly = df_monthly.drop(columns=list(missing_in_table), errors='ignore')
                logger.info(f"任務 {job_id} 已移除多餘欄位,剩餘 {len(df_monthly.columns)} 個欄位")

            if missing_in_df:
                logger.warning(f"任務 {job_id} 欄位警告: 表中有但 DataFrame 沒有: {missing_in_df}")

            # Dates covered by this import (taken from the original "日期" column).
            # NOTE(review): the column name is hard-coded here, while date_col above
            # may be another alias; in that case no old rows are deleted and the
            # sync counts as successful without verification -- confirm.
            unique_dates = []
            if '日期' in df.columns:
                unique_dates = df['日期'].dropna().unique().tolist()
            logger.info(f"任務 {job_id} 準備同步 {len(unique_dates)} 個日期的資料")

            if len(unique_dates) > 0:
                # Delete rows with the same dates from realtime_sales_monthly (de-duplication)
                date_placeholders, date_params = _build_in_clause("monthly_date", unique_dates)

                with engine.connect() as conn:
                    delete_monthly_query = text(
                        f'DELETE FROM {monthly_table} WHERE "日期" IN ({date_placeholders})'
                    )
                    result = conn.execute(delete_monthly_query, date_params)
                    deleted_monthly = result.rowcount
                    conn.commit()

                if deleted_monthly > 0:
                    logger.info(f"任務 {job_id} 已從 {monthly_table} 刪除 {deleted_monthly} 筆同日期舊資料")

            # Write to realtime_sales_monthly
            df_monthly.to_sql(
                monthly_table,
                engine,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=1000
            )

            logger.info(f"任務 {job_id} 已同步 {len(df_monthly)} 筆資料至 {monthly_table}")

            # Verify the sync result
            if len(unique_dates) > 0:
                with engine.connect() as conn:
                    date_placeholders, date_params = _build_in_clause("monthly_verify_date", unique_dates)
                    verify_query = text(
                        f'SELECT COUNT(*) FROM {monthly_table} WHERE "日期" IN ({date_placeholders})'
                    )
                    verify_count = conn.execute(verify_query, date_params).scalar()

                if verify_count >= len(df_monthly):
                    logger.info(f"任務 {job_id} 同步驗證成功: {monthly_table} 現有 {verify_count} 筆資料")
                    sync_success = True
                else:
                    sync_error_msg = f"同步驗證失敗: 預期 {len(df_monthly)} 筆, 實際 {verify_count} 筆"
                    logger.error(f"任務 {job_id} {sync_error_msg}")
            else:
                sync_success = True  # treated as success when there is no date data

        except Exception as sync_error:
            # Sync failed: log the full error
            import traceback
            sync_error_msg = str(sync_error)
            logger.error(f"任務 {job_id} 同步至 {monthly_table} 失敗: {sync_error_msg}")
            logger.error(f"任務 {job_id} 同步錯誤堆疊:\n{traceback.format_exc()}")

            # 2026-01-30 addition: send a sync-failure alert (best effort)
            try:
                from services.notification_manager import NotificationManager
                notifier = NotificationManager()
                alert_msg = (
                    f"⚠️ 業績資料同步失敗告警\n"
                    f"{'='*30}\n"
                    f"任務 ID: {job_id}\n"
                    f"目標表: {monthly_table}\n"
                    f"錯誤: {sync_error_msg[:200]}\n"
                    f"{'='*30}\n"
                    f"daily_sales_snapshot 已匯入成功,但業績分析儀表板需要手動同步"
                )
                notifier._send_telegram_messages([alert_msg])
                logger.info(f"任務 {job_id} 已發送同步失敗告警")
            except Exception as notify_error:
                logger.error(f"任務 {job_id} 發送告警失敗: {notify_error}")

        # Compute the date range
        date_min = None
        date_max = None
        valid_dates = df['snapshot_date'].dropna().unique()
        if len(valid_dates) > 0:
            sorted_dates = sorted([d for d in valid_dates if d is not None])
            if sorted_dates:
                date_min = str(sorted_dates[0])
                date_max = str(sorted_dates[-1])
                logger.info(f"任務 {job_id} 日期範圍: {date_min} ~ {date_max}")

        # Build the import summary (2026-01-30 fix: include the sync status)
        if sync_success:
            sync_message = f'成功匯入 {total_rows} 筆資料,已同步至業績分析儀表板'
        else:
            sync_message = f'成功匯入 {total_rows} 筆資料,但同步至業績分析儀表板失敗: {sync_error_msg}'

        summary = {
            'imported_count': total_rows,
            'table_name': table_name,
            'synced_to': 'realtime_sales_monthly' if sync_success else None,
            'sync_success': sync_success,
            'sync_error': sync_error_msg,
            'verified': True,  # daily_sales_snapshot verification
            'date_min': date_min,
            'date_max': date_max,
            'source_sheet': excel_metadata.get("sheet_name"),
            'source_header_row': excel_metadata.get("header_row"),
            'message': sync_message
        }

        # Store the summary on the job as JSON
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if job:
                job.import_summary = json.dumps(summary, ensure_ascii=False)
                session.commit()
        finally:
            session.close()

        if not sync_success:
            # The snapshot table was written, but the job still counts as failed.
            error_msg = sync_error_msg or '同步至業績分析儀表板失敗'
            self.update_job_progress(
                job_id,
                processed_rows=total_rows,
                success_rows=0,
                error_rows=total_rows,
            )
            self.update_job_status(
                job_id,
                'failed',
                95,
                '業績分析儀表板同步失敗',
                error_msg,
            )
            logger.error(
                "任務 %s 匯入未完成:daily_sales_snapshot 已寫入,但 %s 同步失敗;"
                "保留來源檔案等待重試,不移動 Google Drive 檔案",
                job_id,
                monthly_table,
            )
            return False

        # Record the success information
        self.update_job_progress(
            job_id,
            processed_rows=total_rows,
            success_rows=total_rows
        )

        self.update_job_status(
            job_id,
            'completed',
            100,
            '匯入完成(已同步至業績分析儀表板)'
        )

        logger.info(f"任務 {job_id} 匯入成功: {total_rows} 筆")
        try:
            from services.cache_service import clear_growth_cache
            clear_growth_cache()
        except Exception as cache_error:
            logger.warning(f"任務 {job_id} 成長分析快取清除失敗: {cache_error}")
        # daily_sales cache invalidation still relies on _get_data_fingerprint
        # (DB max(snapshot_date) + count(*)); the growth cache additionally has a
        # shared file + source fingerprint, so it is cleared proactively after an
        # import to avoid showing stale charts within the short TTL.
        return True

    except Exception as e:
        # Catch-all boundary: mark the job failed and log the traceback.
        error_msg = f"匯入過程發生異常: {str(e)}"
        self.update_job_status(job_id, 'failed', 50, '匯入失敗', error_msg)
        logger.error(f"任務 {job_id} 匯入異常: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return False
|
||
|
||
def auto_import_from_drive(self) -> Dict[str, Any]:
    """
    Import every pending daily-sales file found in the configured Google Drive folder.

    Flow:
      1. Read the source folder / file-name pattern from the import config and
         list matching files through ``drive_service``.
      2. If ``drive_service`` reports ``last_error_kind``, fail closed: an
         unreachable Drive is not the same thing as an empty folder.
      3. If the folder is empty, run the staleness check (alert when the newest
         ``daily_sales_snapshot`` row is 3 or more days old) and return success.
      4. Otherwise, for each file: create an import job, download the file to
         ``data/temp``, run ``process_daily_sales_import`` and then either move
         the Drive file to the archive folder (success) or, when
         ``_should_quarantine_failed_import`` says so, to the failed folder.

    Returns:
        Dict: result summary. Every return path carries ``success``,
        ``message``, ``file_count``, ``imported_count`` and ``failed_count``.
        A processed batch additionally carries ``errors`` (one dict per failed
        file), ``total_rows`` and ``date_range`` (``{'min', 'max'}`` or
        ``None``). Drive connectivity failures carry ``connection_error: True``
        (plus ``error_kind`` when ``drive_service`` reported one).
    """

    def _discard_local_copy(path: str, label: str) -> None:
        # Best-effort removal of a temporary download; a failure is only logged
        # because the import outcome has already been decided at this point.
        try:
            os.remove(path)
            logger.info(f"已清理{label}: {path}")
        except (OSError, ValueError) as cleanup_error:
            logger.warning(f"清理{label}失敗: {str(cleanup_error)}")

    try:
        # Load configuration
        folder_path = self.get_config('gdrive_folder_path', '業績報表/當日業績')
        file_pattern = self.get_config('gdrive_file_pattern', '即時業績_當日')

        logger.info(f"開始檢查 Google Drive: {folder_path}")

        # List candidate files
        files = drive_service.list_files_in_folder(folder_path, file_pattern)
        drive_error_kind = getattr(drive_service, "last_error_kind", None)
        drive_error = getattr(drive_service, "last_error", None)

        if drive_error_kind:
            message = (
                "Google Drive 連線或認證失敗,未能確認來源資料夾是否有新檔案:"
                f"{drive_error or drive_error_kind}"
            )
            logger.error(message)
            return {
                'success': False,
                'message': message,
                'file_count': 0,
                'imported_count': 0,
                'failed_count': 1,
                'connection_error': True,
                'error_kind': drive_error_kind,
            }

        if not files:
            logger.info("沒有找到待匯入的檔案")

            # Staleness gate (critic-approved 2026-05-03)
            # "move-then-success" anti-pattern: after a successful import the
            # Excel file is moved to the archive folder, so later scheduled runs
            # list an empty folder and silently return success here. That let
            # daily_sales_snapshot stop updating from 4/27 to 5/2 (8 days) with
            # no alert. Actively detect it: when Drive is empty AND the DB has
            # had no new data for >= 3 days, send a reminder alert (the 3-day
            # threshold avoids false alarms over weekends / holidays).
            try:
                from database.manager import get_session
                from sqlalchemy import text
                from datetime import date
                from services.openclaw_strategist_service import _send_data_stale_alert

                _stale_session = get_session()
                try:
                    # NOTE(review): the ``::date`` cast is PostgreSQL syntax; on
                    # another dialect this query raises and the check is skipped
                    # (logged by the handler below).
                    last_date = _stale_session.execute(
                        text("SELECT MAX(snapshot_date)::date FROM daily_sales_snapshot")
                    ).scalar()
                finally:
                    _stale_session.close()

                if last_date:
                    # NOTE(review): date.today() is the host's local date;
                    # confirm it matches the timezone snapshot_date is stored in.
                    days_since = (date.today() - last_date).days
                    if days_since >= 3:
                        _send_data_stale_alert(
                            report_type="upstream_drive",
                            last_date=str(last_date),
                            period=f"已停更 {days_since} 天",
                        )
            except Exception:
                # The staleness check is advisory; it must never turn an
                # otherwise clean "nothing to import" run into a failure.
                logger.error(
                    "staleness check failed in auto_import_from_drive",
                    exc_info=True,
                )

            return {
                'success': True,
                'message': '沒有找到待匯入的檔案',
                'file_count': 0,
                'imported_count': 0,
                'failed_count': 0,
            }

        # Process each file
        imported_count = 0
        total_rows = 0
        all_dates = []  # date_min / date_max of every imported file
        failed_files = []

        temp_dir = 'data/temp'
        os.makedirs(temp_dir, exist_ok=True)

        for file in files:
            file_id = file['id']
            file_name = file['name']
            file_size = file.get('size', 0)

            logger.info(f"發現檔案: {file_name}")

            # Create the import job
            job_id = self.create_import_job('daily_sales', file_id, file_name, file_size)
            if not job_id:
                # Same skip as before, but leave a trace so it is not invisible.
                logger.warning("無法建立匯入任務,略過檔案: %s", file_name)
                continue

            # Download the file
            self.update_job_status(job_id, 'downloading', 10, '正在下載檔案...')

            # The Drive name is external input: keep only its final path
            # component so the download always lands inside temp_dir.
            safe_name = os.path.basename(file_name)
            if safe_name in ('', '.', '..'):
                safe_name = str(file_id)
            local_path = os.path.join(temp_dir, safe_name)

            if not drive_service.download_file(file_id, local_path):
                err = '無法從 Google Drive 下載檔案'
                self.update_job_status(job_id, 'failed', 10, '下載失敗', err)
                failed_files.append({'file': file_name, 'error': err})
                # Do not leave a partial download behind.
                if os.path.exists(local_path):
                    _discard_local_copy(local_path, "未完成下載的本地檔案")
                continue

            # Record the local path on the job
            session = Session()
            try:
                job = session.query(ImportJob).filter_by(id=job_id).first()
                if job:
                    job.local_file_path = local_path
                    session.commit()
            finally:
                session.close()

            self.update_job_status(job_id, 'downloading', 40, '下載完成')

            # Import the data
            if self.process_daily_sales_import(job_id, local_path):
                # Move the Drive file to the archive folder
                self.update_job_status(job_id, 'completed', 90, '正在移動雲端檔案...')

                archive_folder = self.get_config('gdrive_archive_folder', '已匯入')

                if drive_service.move_file(file_id, archive_folder):
                    logger.info(f"已移動 Google Drive 檔案到「{archive_folder}」: {file_name}")
                else:
                    logger.warning(f"無法移動 Google Drive 檔案: {file_name}")

                self.update_job_status(job_id, 'completed', 100, '完成')
                imported_count += 1

                # Read the job summary for row count and date range
                session = Session()
                try:
                    job = session.query(ImportJob).filter_by(id=job_id).first()
                    raw_summary = job.import_summary if job else None
                finally:
                    session.close()

                if raw_summary:
                    try:
                        summary = json.loads(raw_summary)
                    except (TypeError, ValueError):
                        # The file is already imported and archived; a bad
                        # summary must not abort the remaining files.
                        logger.warning("任務 %s 的 import_summary 無法解析,略過統計", job_id)
                        summary = None
                    if isinstance(summary, dict):
                        total_rows += summary.get('imported_count') or 0
                        if summary.get('date_min'):
                            all_dates.append(summary['date_min'])
                        if summary.get('date_max'):
                            all_dates.append(summary['date_max'])

                # Clean up the local file
                _discard_local_copy(local_path, "本地檔案")
            else:
                # Read the recorded error, then release the session before any
                # Drive call or nested status update.
                session = Session()
                try:
                    job = session.query(ImportJob).filter_by(id=job_id).first()
                    error_message = (job.error_message if job else None) or '匯入失敗,請查看 import_jobs'
                finally:
                    session.close()

                quarantined = False
                failed_folder = None
                if _should_quarantine_failed_import(error_message):
                    default_failed_folder = (
                        f"{folder_path.rstrip('/')}/匯入失敗"
                        if folder_path and folder_path.strip("/")
                        else "匯入失敗"
                    )
                    failed_folder = self.get_config('gdrive_failed_folder', default_failed_folder)
                    if drive_service.move_file(file_id, failed_folder, create_missing=True):
                        quarantined = True
                        error_message = (
                            f"{error_message}(已移至「{failed_folder}」避免重複告警)"
                        )
                        self.update_job_status(job_id, 'failed', 100, '已移至匯入失敗', error_message)
                        logger.warning(
                            "已隔離不可自動修復的匯入失敗檔案: %s → %s",
                            file_name,
                            failed_folder,
                        )
                    else:
                        logger.error("匯入失敗檔案隔離失敗: %s → %s", file_name, failed_folder)

                failed_files.append({
                    'file': file_name,
                    'job_id': job_id,
                    'error': error_message,
                    'quarantined': quarantined,
                    'failed_folder': failed_folder,
                })

                _discard_local_copy(local_path, "失敗匯入本地檔案")

        # Overall date range across every imported file
        date_range = None
        if all_dates:
            date_range = {'min': min(all_dates), 'max': max(all_dates)}

        if failed_files:
            first_error = failed_files[0].get('error') or '未知錯誤'
            message = (
                f'找到 {len(files)} 個檔案,成功匯入 {imported_count} 個,'
                f'失敗 {len(failed_files)} 個。首筆錯誤:{first_error[:220]}'
            )
        else:
            message = f'成功匯入 {imported_count} 個檔案'

        return {
            'success': len(failed_files) == 0,
            'message': message,
            'file_count': len(files),
            'imported_count': imported_count,
            'failed_count': len(failed_files),
            'errors': failed_files,
            'total_rows': total_rows,
            'date_range': date_range,
        }

    except Exception as e:
        error_msg = str(e)
        # Keep the traceback: the message alone rarely identifies the failing step.
        logger.error(f"自動匯入失敗: {error_msg}", exc_info=True)

        # Tell Drive connectivity / authentication problems apart from real
        # import errors by looking for well-known fragments in the message.
        connection_errors = (
            'Broken pipe',
            'Connection refused',
            'Connection reset',
            'Connection timed out',
            'Name or service not known',
            'No route to host',
            'Network is unreachable',
            'SSL',
            'authenticate',
            'credentials',
            'token',
        )

        lowered_msg = error_msg.lower()
        is_connection_error = any(err.lower() in lowered_msg for err in connection_errors)

        if is_connection_error:
            # A Drive connection / auth error is NOT "no files": fail closed so
            # that alerting and manual re-upload are triggered.
            logger.error(f"Google Drive 連線問題,無法確認待匯入檔案: {error_msg}")
            return {
                'success': False,
                'message': f'Google Drive 連線問題,無法確認待匯入檔案: {error_msg}',
                'file_count': 0,
                'imported_count': 0,
                'failed_count': 1,
                'connection_error': True,
            }

        # A genuine import error: report failure
        return {
            'success': False,
            'message': f'自動匯入失敗: {error_msg}',
            'file_count': 0,
            'imported_count': 0,
            'failed_count': 1,
        }
|
||
|
||
|
||
# 建立全域服務實例
|
||
# Module-level ImportService instance, created once when this module is imported.
import_service = ImportService()
|