945 lines
38 KiB
Python
945 lines
38 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
自動匯入服務
|
||
負責從 Google Drive 自動下載、匯入檔案,並將已匯入的雲端檔案移至歸檔資料夾、刪除本地暫存檔
|
||
"""
|
||
|
||
import os
|
||
import logging
|
||
import json
|
||
from datetime import date, datetime
|
||
from typing import Optional, Dict, Any
|
||
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.orm import sessionmaker
|
||
import pandas as pd
|
||
import pytz
|
||
|
||
# Taipei timezone (Asia/Taipei); this module takes the current time/date in this zone.
TAIPEI_TZ = pytz.timezone("Asia/Taipei")
|
||
|
||
from services.google_drive_service import drive_service
|
||
from database.import_models import ImportJob, ImportConfig, Base
|
||
from database.manager import ensure_metadata_initialized
|
||
|
||
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _build_in_clause(prefix: str, values) -> tuple:
|
||
"""Build a SQLAlchemy-safe IN clause placeholder list and params."""
|
||
params = {}
|
||
placeholders = []
|
||
for idx, value in enumerate(values):
|
||
key = f"{prefix}_{idx}"
|
||
placeholders.append(f":{key}")
|
||
params[key] = value
|
||
return ", ".join(placeholders), params
|
||
|
||
|
||
def _db_dialect_name() -> str:
|
||
try:
|
||
return engine.dialect.name
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
def _date_filter_expr(column_name: str) -> str:
    """Wrap ``column_name`` in a dialect-specific cast to a date.

    Legacy date columns may be stored as text, so comparisons go through a
    cast: ``col::date`` on PostgreSQL, ``date(col)`` everywhere else.
    """
    is_postgres = _db_dialect_name() == "postgresql"
    return f"{column_name}::date" if is_postgres else f"date({column_name})"
|
||
|
||
|
||
def _normalise_date_values_for_sql(values):
|
||
"""Keep PostgreSQL params as date objects; use ISO strings for SQLite/text comparisons."""
|
||
normalised = []
|
||
use_native_date = _db_dialect_name() == "postgresql"
|
||
for value in values:
|
||
if value is None:
|
||
continue
|
||
if isinstance(value, datetime):
|
||
value = value.date()
|
||
elif hasattr(value, "date") and not isinstance(value, date):
|
||
try:
|
||
value = value.date()
|
||
except Exception:
|
||
pass
|
||
if use_native_date:
|
||
if isinstance(value, date):
|
||
normalised.append(value)
|
||
else:
|
||
parsed = pd.to_datetime(value, errors="coerce")
|
||
if pd.notna(parsed):
|
||
normalised.append(parsed.date())
|
||
else:
|
||
if isinstance(value, date):
|
||
normalised.append(value.isoformat())
|
||
else:
|
||
parsed = pd.to_datetime(value, errors="coerce")
|
||
normalised.append(parsed.strftime("%Y-%m-%d") if pd.notna(parsed) else str(value))
|
||
return normalised
|
||
|
||
# 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite
|
||
def _create_engine_with_pool(db_path):
    """Create the SQLAlchemy engine for ``db_path``.

    * ``postgresql://`` and ``postgresql+<driver>://`` URLs get a pooled engine
      (pre-ping, recycle every 30 min). Legacy ``postgres://`` URLs, which
      SQLAlchemy 1.4+ rejects, are rewritten to ``postgresql://`` first.
    * ``sqlite://`` URLs are passed through unchanged.
    * Anything else is treated as a SQLite file path.

    Previously, driver-qualified PostgreSQL URLs fell through to the last case
    and silently created a SQLite file with the URL as its path.
    """
    legacy_prefix = 'postgres://'
    if db_path.startswith(legacy_prefix):
        db_path = 'postgresql://' + db_path[len(legacy_prefix):]

    if db_path.startswith(('postgresql://', 'postgresql+')):
        engine_kwargs = {
            'pool_pre_ping': True,
            'pool_size': 5,
            'max_overflow': 10,
            'pool_recycle': 1800,
            'pool_timeout': 30,
        }
        # connect_timeout / options are libpq parameters: only psycopg drivers
        # (the default driver, psycopg2 and psycopg 3) accept them.
        if db_path.startswith(('postgresql://', 'postgresql+psycopg2://', 'postgresql+psycopg://')):
            engine_kwargs['connect_args'] = {
                'connect_timeout': 10,
                # statement_timeout is in ms (120 s): imports run long statements.
                'options': '-c statement_timeout=120000',
            }
        return create_engine(db_path, **engine_kwargs)

    if db_path.startswith('sqlite://'):
        return create_engine(db_path)

    return create_engine(f'sqlite:///{db_path}')
|
||
|
||
def _redact_db_url(db_url: str) -> str:
    """Return ``db_url`` safe for logging: only the part after the last '@'.

    This strips the scheme and any ``user:password@`` credentials. Values
    without '@' (e.g. SQLite file paths) are returned unchanged.
    """
    return db_url.split('@')[-1] if '@' in db_url else db_url


# Database engine: prefer config.DATABASE_PATH; fall back to the DATABASE_PATH
# environment variable (or a local SQLite file) when config cannot be imported.
try:
    from config import DATABASE_PATH as CONFIG_DATABASE_PATH
    engine = _create_engine_with_pool(CONFIG_DATABASE_PATH)
    logger.info(f"使用資料庫: {_redact_db_url(CONFIG_DATABASE_PATH)}")
except ImportError:
    DATABASE_PATH = os.getenv('DATABASE_PATH', 'data/momo_database.db')
    engine = _create_engine_with_pool(DATABASE_PATH)
    # Redact here too: the env var may hold a full DB URL including a password.
    logger.warning(f"無法匯入 config,使用備援資料庫路徑: {_redact_db_url(DATABASE_PATH)}")

Session = sessionmaker(bind=engine)
|
||
|
||
|
||
class ImportService:
    """Automated daily-sales import service backed by Google Drive.

    Finds Excel exports in a configured Drive folder, downloads them, loads them
    into ``daily_sales_snapshot`` (mirrored into ``realtime_sales_monthly``) and
    moves the Drive file to an archive folder. Each file is tracked as an
    ``ImportJob`` row; tunables are stored as ``ImportConfig`` rows.
    """

    # 2026-04-19 header guard: if the Excel headers change silently, the import
    # would still "succeed" but downstream (Hermes) SQL JOINs would find no data
    # and the alert pipeline would be misled. Each group needs at least one
    # header containing one of its keywords (several aliases are tolerated).
    _REQUIRED_COLUMN_GROUPS = {
        "商品名稱類": ["商品名稱", "品名", "Product", "Name"],
        "業績金額類": ["銷售金額", "業績", "金額", "Amount", "Sales", "Total"],
    }

    # Headers tried, in this order, as the source of ``snapshot_date``.
    _DATE_COLUMN_CANDIDATES = ('日期', '訂單日期', '交易日期', 'Date')

    # Extra attempts, after the first, for the daily_sales_snapshot write.
    _SNAPSHOT_WRITE_MAX_RETRIES = 2

    # Case-insensitive substrings that classify an auto-import exception as a
    # connectivity/authentication problem instead of a real import failure.
    _CONNECTION_ERROR_MARKERS = (
        'Broken pipe',
        'Connection refused',
        'Connection reset',
        'Connection timed out',
        'Name or service not known',
        'No route to host',
        'Network is unreachable',
        'SSL',
        'authenticate',
        'credentials',
        'token',
    )

    def __init__(self):
        """Create the service and make sure the tracking tables exist."""
        self._init_database()

    def _init_database(self):
        """Create the import-tracking tables if needed (best effort; failures are logged)."""
        try:
            ensure_metadata_initialized(engine, use_postgres_lock=str(engine.url).startswith('postgresql'))
            logger.info("匯入追蹤表已初始化")
        except Exception as e:
            logger.error(f"初始化資料庫表失敗: {str(e)}")

    # ------------------------------------------------------------------ config

    def get_config(self, key: str, default: str = None) -> Optional[str]:
        """Return the stored value of config ``key``.

        Args:
            key: Config key.
            default: Value returned when the key is not stored.

        Returns:
            Optional[str]: The stored value, or ``default``.
        """
        session = Session()
        try:
            config = session.query(ImportConfig).filter_by(config_key=key).first()
            if config:
                return config.config_value
            return default
        finally:
            session.close()

    def set_config(self, key: str, value: str, config_type: str = 'string', description: str = None):
        """Insert or update config ``key``. Errors are logged and rolled back, not raised.

        Args:
            key: Config key.
            value: Value to store.
            config_type: Type tag stored alongside the value.
            description: Optional description; an existing one is kept when falsy.
        """
        session = Session()
        try:
            config = session.query(ImportConfig).filter_by(config_key=key).first()
            if config:
                config.config_value = value
                config.config_type = config_type
                if description:
                    config.description = description
                # Taipei wall-clock time, stored naive.
                config.updated_at = datetime.now(TAIPEI_TZ).replace(tzinfo=None)
            else:
                config = ImportConfig(
                    config_key=key,
                    config_value=value,
                    config_type=config_type,
                    description=description,
                )
                session.add(config)

            session.commit()
            logger.info(f"配置已更新: {key} = {value}")
        except Exception as e:
            session.rollback()
            logger.error(f"設定配置失敗: {str(e)}")
        finally:
            session.close()

    # -------------------------------------------------------------- job rows

    def create_import_job(self, job_type: str, drive_file_id: str, drive_file_name: str,
                          drive_file_size: int = None) -> Optional[int]:
        """Create a ``pending`` import job.

        Args:
            job_type: Job type (``daily_sales`` or ``vendor_stockout``).
            drive_file_id: Google Drive file ID.
            drive_file_name: File name.
            drive_file_size: File size as reported by Drive.

        Returns:
            Optional[int]: The new job ID, or None if the insert failed (logged).
        """
        session = Session()
        try:
            job = ImportJob(
                job_type=job_type,
                status='pending',
                drive_file_id=drive_file_id,
                drive_file_name=drive_file_name,
                drive_file_size=drive_file_size,
                progress_percent=0.0,
                current_step='等待開始...',
            )
            session.add(job)
            session.commit()

            job_id = job.id
            logger.info(f"已建立匯入任務: ID={job_id}, 檔案={drive_file_name}")
            return job_id
        except Exception as e:
            session.rollback()
            logger.error(f"建立匯入任務失敗: {str(e)}")
            return None
        finally:
            session.close()

    def update_job_status(self, job_id: int, status: str, progress: float = None,
                          current_step: str = None, error_message: str = None):
        """Update a job's status and, optionally, its progress, step and error text.

        ``started_at`` is set the first time the job enters ``downloading`` or
        ``importing``. ``completed_at`` is (re)set whenever it becomes
        ``completed`` or ``failed``. Timestamps are Taipei wall-clock time stored
        naive (2026-01-30). Errors are logged and rolled back, never raised.

        Args:
            job_id: Job ID.
            status: New status.
            progress: Progress percentage; left unchanged when None.
            current_step: Step description; left unchanged when falsy.
            error_message: Error text; left unchanged when falsy.
        """
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if not job:
                logger.warning(f"找不到任務: ID={job_id}")
                return

            job.status = status
            if progress is not None:
                job.progress_percent = progress
            if current_step:
                job.current_step = current_step
            if error_message:
                job.error_message = error_message

            now_taipei = datetime.now(TAIPEI_TZ).replace(tzinfo=None)
            if status in ('downloading', 'importing'):
                if not job.started_at:
                    job.started_at = now_taipei
            elif status in ('completed', 'failed'):
                job.completed_at = now_taipei

            session.commit()
            logger.info(f"任務 {job_id} 狀態已更新: {status} ({progress}%)")
        except Exception as e:
            session.rollback()
            logger.error(f"更新任務狀態失敗: {str(e)}")
        finally:
            session.close()

    def update_job_progress(self, job_id: int, total_rows: int = None, processed_rows: int = None,
                            success_rows: int = None, error_rows: int = None):
        """Update a job's row counters; arguments left as None are unchanged.

        ``progress_percent`` is recomputed as processed/total when both are
        non-zero. Errors are logged and rolled back, never raised.

        Args:
            job_id: Job ID.
            total_rows: Total rows.
            processed_rows: Rows processed.
            success_rows: Rows imported successfully.
            error_rows: Rows that failed.
        """
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if not job:
                return

            if total_rows is not None:
                job.total_rows = total_rows
            if processed_rows is not None:
                job.processed_rows = processed_rows
            if success_rows is not None:
                job.success_rows = success_rows
            if error_rows is not None:
                job.error_rows = error_rows

            if job.total_rows and job.processed_rows:
                job.progress_percent = (job.processed_rows / job.total_rows) * 100

            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"更新任務進度失敗: {str(e)}")
        finally:
            session.close()

    def get_job_status(self, job_id: int) -> Optional[Dict[str, Any]]:
        """Return ``ImportJob.to_dict()`` for ``job_id``, or None if it does not exist."""
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if job:
                return job.to_dict()
            return None
        finally:
            session.close()

    def get_recent_jobs(self, limit: int = 20) -> list:
        """Return up to ``limit`` jobs as dicts, newest ``created_at`` first."""
        session = Session()
        try:
            jobs = session.query(ImportJob).order_by(
                ImportJob.created_at.desc()
            ).limit(limit).all()
            return [job.to_dict() for job in jobs]
        finally:
            session.close()

    def _set_job_local_path(self, job_id: int, local_path: str) -> None:
        """Record where the downloaded copy of the job's file was stored."""
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if job:
                job.local_file_path = local_path
                session.commit()
        finally:
            session.close()

    def _get_job_error(self, job_id: int) -> Optional[str]:
        """Return the job's stored ``error_message`` (None if unset or the job is missing)."""
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            return job.error_message if job else None
        finally:
            session.close()

    def _load_import_summary(self, job_id: int) -> Optional[Dict[str, Any]]:
        """Return the job's decoded ``import_summary`` JSON, or None when absent."""
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if job and job.import_summary:
                return json.loads(job.import_summary)
            return None
        finally:
            session.close()

    def _save_import_summary(self, job_id: int, summary: Dict[str, Any]) -> None:
        """Store ``summary`` as JSON (non-ASCII kept readable) on the job row."""
        session = Session()
        try:
            job = session.query(ImportJob).filter_by(id=job_id).first()
            if job:
                job.import_summary = json.dumps(summary, ensure_ascii=False)
                session.commit()
        finally:
            session.close()

    # ------------------------------------------------------ daily-sales import

    @classmethod
    def _missing_required_column_groups(cls, columns) -> list:
        """Return the labels of required column groups with no matching header.

        Headers are compared as strings: ``read_excel`` can yield numeric
        headers, and ``keyword in <int>`` would raise ``TypeError``.
        """
        headers = [str(col) for col in columns]
        return [
            label
            for label, keywords in cls._REQUIRED_COLUMN_GROUPS.items()
            if not any(keyword in header for header in headers for keyword in keywords)
        ]

    @classmethod
    def _assign_snapshot_date(cls, df: pd.DataFrame) -> None:
        """Add a ``snapshot_date`` column to ``df`` in place.

        The first header from ``_DATE_COLUMN_CANDIDATES`` is parsed, with
        unparseable cells becoming NaT. When no candidate exists, today's
        Taipei date is used for every row.
        """
        date_col = next((col for col in cls._DATE_COLUMN_CANDIDATES if col in df.columns), None)
        if date_col:
            df['snapshot_date'] = pd.to_datetime(df[date_col], errors='coerce').dt.date
            logger.info(f"使用日期欄位: {date_col}")
        else:
            df['snapshot_date'] = datetime.now(TAIPEI_TZ).date()
            logger.info("未找到日期欄位,使用當前日期(台北時區)")

    @staticmethod
    def _replace_rows_for_snapshot_dates(df: pd.DataFrame, table_name: str) -> int:
        """Replace ``table_name`` rows sharing ``df``'s snapshot dates with ``df`` (overwrite mode).

        The DELETE and the append run in ONE transaction. A failed insert
        therefore restores the old rows instead of leaving a gap, and a retry
        never stacks duplicate rows.

        Returns:
            int: Number of rows deleted.
        """
        sql_dates = _normalise_date_values_for_sql(list(df['snapshot_date'].dropna().unique()))
        deleted_count = 0
        with engine.begin() as conn:
            if sql_dates:
                date_placeholders, date_params = _build_in_clause("snapshot_date", sql_dates)
                snapshot_date_expr = _date_filter_expr("snapshot_date")
                result = conn.execute(
                    text(f"DELETE FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})"),
                    date_params,
                )
                deleted_count = result.rowcount or 0
            df.to_sql(
                table_name,
                conn,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=1000,
            )
        return deleted_count

    def _verify_snapshot_write(self, job_id: int, df: pd.DataFrame, table_name: str) -> bool:
        """Check that ``table_name`` holds at least the rows of ``df`` that have a snapshot_date.

        Returns True when verified, or when there is no valid date to check against.
        """
        raw_dates = [d for d in df['snapshot_date'].dropna().unique() if d is not None]
        sql_dates = _normalise_date_values_for_sql(raw_dates)
        if not sql_dates:
            logger.warning(f"任務 {job_id} 無法驗證: 沒有有效的 snapshot_date")
            return True

        date_placeholders, date_params = _build_in_clause("verify_date", sql_dates)
        snapshot_date_expr = _date_filter_expr("snapshot_date")
        with engine.connect() as conn:
            db_count = conn.execute(
                text(f"SELECT COUNT(*) FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})"),
                date_params,
            ).scalar()

        expected_count = int(df['snapshot_date'].isin(raw_dates).sum())
        if db_count >= expected_count:
            logger.info(f"任務 {job_id} 驗證成功: 預期 {expected_count} 筆, 資料庫有 {db_count} 筆")
            return True
        logger.warning(f"任務 {job_id} 驗證失敗: 預期 {expected_count} 筆, 資料庫只有 {db_count} 筆")
        return False

    def _write_snapshot_with_retry(self, job_id: int, df: pd.DataFrame, table_name: str) -> bool:
        """Replace and verify ``df`` in ``table_name``, retrying up to ``_SNAPSHOT_WRITE_MAX_RETRIES`` times.

        Returns:
            bool: True once a write verified; False if every attempt failed verification.

        Raises:
            Exception: The last write error when every attempt raised.
        """
        max_retries = self._SNAPSHOT_WRITE_MAX_RETRIES
        attempt = 0
        while attempt <= max_retries:
            try:
                if attempt > 0:
                    logger.warning(f"任務 {job_id} 第 {attempt} 次重試寫入...")
                    self.update_job_status(job_id, 'importing', 60, f'重試寫入中 ({attempt}/{max_retries})...')

                deleted_count = self._replace_rows_for_snapshot_dates(df, table_name)
                if deleted_count > 0:
                    logger.info(f"已刪除 {deleted_count} 筆舊資料(覆蓋模式)")

                # V-Fix: post-write verification that the rows really landed.
                self.update_job_status(job_id, 'importing', 85, '驗證資料寫入...')
                if self._verify_snapshot_write(job_id, df, table_name):
                    return True
            except Exception as write_error:
                logger.error(f"任務 {job_id} 寫入失敗 (嘗試 {attempt + 1}): {str(write_error)}")
                if attempt >= max_retries:
                    raise
            attempt += 1
        return False

    @staticmethod
    def _table_columns(table_name: str) -> set:
        """Return the column names of ``table_name`` via the SQLAlchemy inspector.

        Unlike an ``information_schema`` query, this works on SQLite too, and it
        raises when the table does not exist.
        """
        from sqlalchemy import inspect as sa_inspect
        return {column['name'] for column in sa_inspect(engine).get_columns(table_name)}

    def _sync_to_monthly(self, job_id: int, df: pd.DataFrame, monthly_table: str) -> Optional[str]:
        """Mirror the imported rows into ``monthly_table`` (the sales-analysis dashboard).

        Same-"日期" rows are replaced in one transaction.

        Returns:
            Optional[str]: None on success, or an error message when the
            post-write verification fails.

        Raises:
            Exception: Any database error; the caller logs it and sends an alert.
        """
        # monthly_table has no snapshot_date column.
        df_monthly = df.drop(columns=['snapshot_date'], errors='ignore')

        # 2026-01-30: make headers PostgreSQL-friendly ('%' -> '_pct', parentheses -> '_').
        column_mapping = {
            col: str(col).replace('%', '_pct').replace('(', '_').replace(')', '_')
            for col in df_monthly.columns
        }
        df_monthly = df_monthly.rename(columns=column_mapping)

        converted_cols = [f"'{k}' -> '{v}'" for k, v in column_mapping.items() if k != v]
        if converted_cols:
            logger.info(f"任務 {job_id} 欄位名稱轉換: {', '.join(converted_cols)}")
        logger.info(f"任務 {job_id} 欄位轉換完成,共 {len(df_monthly.columns)} 個欄位")

        # 2026-01-30: align DataFrame columns with the target table.
        target_columns = self._table_columns(monthly_table) - {'id'}
        df_columns = set(df_monthly.columns)
        missing_in_table = df_columns - target_columns
        missing_in_df = target_columns - df_columns

        if missing_in_table:
            logger.warning(f"任務 {job_id} 欄位警告: DataFrame 有但表中沒有: {missing_in_table}")
            # Drop columns the table lacks so the INSERT does not fail.
            df_monthly = df_monthly.drop(columns=list(missing_in_table), errors='ignore')
            logger.info(f"任務 {job_id} 已移除多餘欄位,剩餘 {len(df_monthly.columns)} 個欄位")
        if missing_in_df:
            logger.warning(f"任務 {job_id} 欄位警告: 表中有但 DataFrame 沒有: {missing_in_df}")

        # De-duplicate by the raw "日期" text values of this file.
        unique_dates = []
        if '日期' in df.columns:
            unique_dates = df['日期'].dropna().unique().tolist()
            logger.info(f"任務 {job_id} 準備同步 {len(unique_dates)} 個日期的資料")

        deleted_monthly = 0
        with engine.begin() as conn:
            if unique_dates:
                date_placeholders, date_params = _build_in_clause("monthly_date", unique_dates)
                result = conn.execute(
                    text(f'DELETE FROM {monthly_table} WHERE "日期" IN ({date_placeholders})'),
                    date_params,
                )
                deleted_monthly = result.rowcount or 0
            df_monthly.to_sql(
                monthly_table,
                conn,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=1000,
            )
        if deleted_monthly > 0:
            logger.info(f"任務 {job_id} 已從 {monthly_table} 刪除 {deleted_monthly} 筆同日期舊資料")
        logger.info(f"任務 {job_id} 已同步 {len(df_monthly)} 筆資料至 {monthly_table}")

        if not unique_dates:
            return None  # nothing to verify against; treated as success

        # Only rows that carry a "日期" value can be counted by the IN query below.
        expected_count = int(df['日期'].notna().sum())
        date_placeholders, date_params = _build_in_clause("monthly_verify_date", unique_dates)
        with engine.connect() as conn:
            verify_count = conn.execute(
                text(f'SELECT COUNT(*) FROM {monthly_table} WHERE "日期" IN ({date_placeholders})'),
                date_params,
            ).scalar()

        if verify_count >= expected_count:
            logger.info(f"任務 {job_id} 同步驗證成功: {monthly_table} 現有 {verify_count} 筆資料")
            return None
        sync_error_msg = f"同步驗證失敗: 預期 {expected_count} 筆, 實際 {verify_count} 筆"
        logger.error(f"任務 {job_id} {sync_error_msg}")
        return sync_error_msg

    @staticmethod
    def _send_sync_failure_alert(job_id: int, monthly_table: str, sync_error_msg: str) -> None:
        """Send a Telegram alert that the dashboard sync failed (best effort; failures are logged)."""
        try:
            from services.notification_manager import NotificationManager
            notifier = NotificationManager()
            alert_msg = (
                f"⚠️ 業績資料同步失敗告警\n"
                f"{'='*30}\n"
                f"任務 ID: {job_id}\n"
                f"目標表: {monthly_table}\n"
                f"錯誤: {sync_error_msg[:200]}\n"
                f"{'='*30}\n"
                f"daily_sales_snapshot 已匯入成功,但業績分析儀表板需要手動同步"
            )
            # NOTE(review): relies on a private NotificationManager method.
            notifier._send_telegram_messages([alert_msg])
            logger.info(f"任務 {job_id} 已發送同步失敗告警")
        except Exception as notify_error:
            logger.error(f"任務 {job_id} 發送告警失敗: {notify_error}")

    @staticmethod
    def _compute_date_range(df: pd.DataFrame) -> tuple:
        """Return ``(min, max)`` of ``df['snapshot_date']`` as strings, or ``(None, None)``."""
        dates = sorted(d for d in df['snapshot_date'].dropna().unique() if d is not None)
        if not dates:
            return None, None
        return str(dates[0]), str(dates[-1])

    def process_daily_sales_import(self, job_id: int, file_path: str) -> bool:
        """Import one daily-sales Excel file.

        Steps:
        1. Read the sheet (all cells as strings) and check the required headers.
        2. Derive ``snapshot_date``.
        3. Replace same-date rows in ``daily_sales_snapshot`` (transactional,
           verified, retried).
        4. Mirror the rows into ``realtime_sales_monthly``. This is best effort
           and sends an alert on failure.
        5. Store an ``import_summary`` on the job and clear the growth-analysis
           cache.

        Args:
            job_id: Job ID.
            file_path: Path of the Excel file.

        Returns:
            bool: True when ``daily_sales_snapshot`` was written and verified
            (even if the dashboard sync failed); False otherwise. Never raises.
        """
        try:
            self.update_job_status(job_id, 'importing', 50, '正在匯入資料...')

            logger.info(f"開始讀取 Excel 檔案: {file_path}")
            df = pd.read_excel(file_path, engine='openpyxl', dtype=str)

            if df.empty:
                error_msg = "Excel 檔案為空"
                self.update_job_status(job_id, 'failed', 50, '匯入失敗', error_msg)
                return False

            missing = self._missing_required_column_groups(df.columns)
            if missing:
                error_msg = (
                    f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}。"
                    f"現有欄位:{list(df.columns)[:30]}"
                )
                logger.error(error_msg)
                self.update_job_status(job_id, 'failed', 50, '欄位驗證失敗', error_msg)
                return False

            table_name = 'daily_sales_snapshot'
            self._assign_snapshot_date(df)

            total_rows = len(df)
            self.update_job_progress(job_id, total_rows=total_rows, processed_rows=0)

            import_dates = df['snapshot_date'].dropna().unique()
            logger.info(f"本次匯入包含 {len(import_dates)} 個日期的資料")

            if not self._write_snapshot_with_retry(job_id, df, table_name):
                error_msg = f"資料寫入驗證失敗,已重試 {self._SNAPSHOT_WRITE_MAX_RETRIES} 次"
                self.update_job_status(job_id, 'failed', 85, '驗證失敗', error_msg)
                logger.error(f"任務 {job_id} {error_msg}")
                return False

            # V-New 2026-01-15: also show the raw daily rows on the sales-analysis
            # dashboard. 2026-01-30: column checks, sync tracking, failure alert.
            self.update_job_status(job_id, 'importing', 90, '同步至業績分析儀表板...')
            monthly_table = 'realtime_sales_monthly'
            try:
                sync_error_msg = self._sync_to_monthly(job_id, df, monthly_table)
            except Exception as sync_error:
                sync_error_msg = str(sync_error) or type(sync_error).__name__
                logger.exception(f"任務 {job_id} 同步至 {monthly_table} 失敗: {sync_error_msg}")
            sync_success = sync_error_msg is None
            if not sync_success:
                # Alert on verification mismatches too, not only on exceptions.
                self._send_sync_failure_alert(job_id, monthly_table, sync_error_msg)

            self.update_job_progress(job_id, processed_rows=total_rows, success_rows=total_rows)

            if sync_success:
                completion_msg = '匯入完成(已同步至業績分析儀表板)'
            else:
                completion_msg = '匯入完成(警告:業績分析儀表板同步失敗,需手動處理)'
            self.update_job_status(job_id, 'completed', 100, completion_msg)

            date_min, date_max = self._compute_date_range(df)
            if date_min is not None:
                logger.info(f"任務 {job_id} 日期範圍: {date_min} ~ {date_max}")

            if sync_success:
                sync_message = f'成功匯入 {total_rows} 筆資料,已同步至業績分析儀表板'
            else:
                sync_message = f'成功匯入 {total_rows} 筆資料,但同步至業績分析儀表板失敗: {sync_error_msg}'

            summary = {
                'imported_count': total_rows,
                'table_name': table_name,
                'synced_to': 'realtime_sales_monthly' if sync_success else None,
                'sync_success': sync_success,
                'sync_error': sync_error_msg,
                'verified': True,  # daily_sales_snapshot passed verification
                'date_min': date_min,
                'date_max': date_max,
                'message': sync_message,
            }
            self._save_import_summary(job_id, summary)

            logger.info(f"任務 {job_id} 匯入成功: {total_rows} 筆")
            # The daily_sales cache invalidates itself through _get_data_fingerprint
            # (max(snapshot_date) + count(*)). The growth cache also keeps a shared
            # file plus source fingerprint, so clear it now to avoid stale charts
            # within its TTL.
            try:
                from services.cache_service import clear_growth_cache
                clear_growth_cache()
            except Exception as cache_error:
                logger.warning(f"任務 {job_id} 成長分析快取清除失敗: {cache_error}")
            return True

        except Exception as e:
            error_msg = f"匯入過程發生異常: {str(e)}"
            self.update_job_status(job_id, 'failed', 50, '匯入失敗', error_msg)
            logger.exception(f"任務 {job_id} 匯入異常: {str(e)}")
            return False

    # ----------------------------------------------------------- Drive runner

    @staticmethod
    def _safe_local_filename(file_name, file_id) -> str:
        """Reduce a Drive file name to a bare file name safe to join under the temp dir.

        Drive names may contain '/' or '..'. Only the last path component is
        kept. An empty, '.' or '..' result falls back to ``<file_id>.xlsx``.
        """
        candidate = os.path.basename(str(file_name).replace('\\', '/'))
        if candidate in ('', '.', '..'):
            return f'{file_id}.xlsx'
        return candidate

    @staticmethod
    def _coerce_to_date(value) -> Optional[date]:
        """Convert a DB ``MAX()`` result (date, datetime or date string) to ``date``; None if unusable."""
        if value is None:
            return None
        parsed = pd.to_datetime(value, errors='coerce')
        if pd.isna(parsed):
            return None
        return parsed.date()

    @classmethod
    def _check_upstream_staleness(cls, stale_after_days: int = 3) -> None:
        """Alert when ``daily_sales_snapshot`` has not advanced for ``stale_after_days`` days.

        Staleness gate (critic-approved 2026-05-03). The "move-then-success"
        anti-pattern works like this: after a successful import, move_file
        archives the Excel file, so later runs list nothing and silently
        report success. That let daily_sales_snapshot stop updating for 8 days
        (4/27-5/2) without any alert. The default 3-day threshold avoids
        weekend/holiday false alarms.

        The date is compared against today in Taipei. Best effort: failures
        are logged and swallowed.
        """
        try:
            from database.manager import get_session
            from services.openclaw_strategist_service import _send_data_stale_alert

            stale_session = get_session()
            try:
                # MAX() plus Python-side parsing works for date, timestamp and
                # ISO-text columns, and on both PostgreSQL and SQLite.
                raw_last = stale_session.execute(
                    text("SELECT MAX(snapshot_date) FROM daily_sales_snapshot")
                ).scalar()
            finally:
                stale_session.close()

            last_date = cls._coerce_to_date(raw_last)
            if last_date is None:
                return
            days_since = (datetime.now(TAIPEI_TZ).date() - last_date).days
            if days_since >= stale_after_days:
                _send_data_stale_alert(
                    report_type="upstream_drive",
                    last_date=str(last_date),
                    period=f"已停更 {days_since} 天",
                )
        except Exception:
            logger.error(
                "staleness check failed in auto_import_from_drive",
                exc_info=True,
            )

    def auto_import_from_drive(self) -> Dict[str, Any]:
        """Import every matching file from the configured Google Drive folder.

        For each file: create a job, download it to ``data/temp``, import it,
        move the Drive file to the archive folder and delete the local copy.
        On import failure, the local copy is kept and its path recorded on the
        job.

        Returns:
            Dict: ``success``, ``message``, ``file_count`` and, when files were
            found, ``imported_count``, ``failed_count``, ``errors``,
            ``total_rows`` and ``date_range``. Connectivity/auth errors are
            reported as ``success=True`` with ``connection_error=True`` so they
            do not trigger failure alerts.
        """
        try:
            folder_path = self.get_config('gdrive_folder_path', '業績報表/當日業績')
            file_pattern = self.get_config('gdrive_file_pattern', '即時業績_當日')

            logger.info(f"開始檢查 Google Drive: {folder_path}")
            files = drive_service.list_files_in_folder(folder_path, file_pattern)

            if not files:
                logger.info("沒有找到待匯入的檔案")
                # An empty folder is the normal state after archiving, so check
                # the DB for staleness instead of silently reporting success.
                self._check_upstream_staleness()
                return {
                    'success': True,
                    'message': '沒有找到待匯入的檔案',
                    'file_count': 0,
                }

            imported_count = 0
            total_rows = 0
            all_dates = []  # date_min / date_max strings of every imported file
            failed_files = []

            for file in files:
                file_id = file['id']
                file_name = file['name']
                file_size = file.get('size', 0)

                logger.info(f"發現檔案: {file_name}")

                job_id = self.create_import_job('daily_sales', file_id, file_name, file_size)
                if not job_id:
                    continue

                self.update_job_status(job_id, 'downloading', 10, '正在下載檔案...')

                temp_dir = 'data/temp'
                os.makedirs(temp_dir, exist_ok=True)
                local_path = os.path.join(temp_dir, self._safe_local_filename(file_name, file_id))

                if not drive_service.download_file(file_id, local_path):
                    err = '無法從 Google Drive 下載檔案'
                    self.update_job_status(job_id, 'failed', 10, '下載失敗', err)
                    failed_files.append({'file': file_name, 'error': err})
                    continue

                self._set_job_local_path(job_id, local_path)
                self.update_job_status(job_id, 'downloading', 40, '下載完成')

                if not self.process_daily_sales_import(job_id, local_path):
                    failed_files.append({
                        'file': file_name,
                        'job_id': job_id,
                        'error': self._get_job_error(job_id) or '匯入失敗,請查看 import_jobs',
                    })
                    continue

                # Archive the Drive file so the next run does not import it again.
                self.update_job_status(job_id, 'completed', 90, '正在移動雲端檔案...')
                archive_folder = self.get_config('gdrive_archive_folder', '已匯入')
                if drive_service.move_file(file_id, archive_folder):
                    logger.info(f"已移動 Google Drive 檔案到「{archive_folder}」: {file_name}")
                else:
                    logger.warning(f"無法移動 Google Drive 檔案: {file_name}")

                self.update_job_status(job_id, 'completed', 100, '完成')
                imported_count += 1

                summary = self._load_import_summary(job_id)
                if summary:
                    total_rows += summary.get('imported_count', 0)
                    all_dates.extend(summary[key] for key in ('date_min', 'date_max') if summary.get(key))

                try:
                    os.remove(local_path)
                    logger.info(f"已清理本地檔案: {local_path}")
                except Exception as e:
                    logger.warning(f"清理本地檔案失敗: {str(e)}")

            date_range = None
            if all_dates:
                sorted_dates = sorted(set(all_dates))
                date_range = {'min': sorted_dates[0], 'max': sorted_dates[-1]}

            if failed_files:
                first_error = failed_files[0].get('error') or '未知錯誤'
                message = (
                    f'找到 {len(files)} 個檔案,成功匯入 {imported_count} 個,'
                    f'失敗 {len(failed_files)} 個。首筆錯誤:{first_error[:220]}'
                )
            else:
                message = f'成功匯入 {imported_count} 個檔案'

            return {
                'success': len(failed_files) == 0,
                'message': message,
                'file_count': len(files),
                'imported_count': imported_count,
                'failed_count': len(failed_files),
                'errors': failed_files,
                'total_rows': total_rows,
                'date_range': date_range,
            }

        except Exception as e:
            error_msg = str(e)
            logger.error(f"自動匯入失敗: {error_msg}")

            # Connectivity problems (broken pipe, network, auth) are not real
            # import errors and must not raise import-failure alerts.
            lowered = error_msg.lower()
            is_connection_error = any(marker.lower() in lowered for marker in self._CONNECTION_ERROR_MARKERS)

            if is_connection_error:
                logger.warning(f"Google Drive 連線問題,跳過本次匯入檢查: {error_msg}")
                # A prolonged outage is another silent-success path: still run
                # the staleness gate so a multi-day gap gets alerted.
                self._check_upstream_staleness()
                return {
                    'success': True,  # reported as success to avoid failure alerts
                    'message': 'Google Drive 連線問題,跳過本次檢查',
                    'file_count': 0,
                    'imported_count': 0,
                    'connection_error': True,
                }

            return {
                'success': False,
                'message': f'自動匯入失敗: {error_msg}',
                'file_count': 0,
                'imported_count': 0,
            }
||
|
||
|
||
# Shared module-level service instance; constructing it initialises the tracking tables.
import_service: "ImportService" = ImportService()
|