fix: parameterize import snapshot date filters
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s
This commit is contained in:
@@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
|
||||
# ==========================================
|
||||
# 系統版本與路徑
|
||||
# ==========================================
|
||||
SYSTEM_VERSION = "V10.182"
|
||||
SYSTEM_VERSION = "V10.183"
|
||||
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
|
||||
public_url = PUBLIC_URL # 用於模板顯示
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
import os
|
||||
import logging
|
||||
import json
|
||||
from datetime import datetime
|
||||
from datetime import date, datetime
|
||||
from typing import Optional, Dict, Any
|
||||
from sqlalchemy import create_engine, text
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
@@ -36,6 +36,50 @@ def _build_in_clause(prefix: str, values) -> tuple:
|
||||
params[key] = value
|
||||
return ", ".join(placeholders), params
|
||||
|
||||
|
||||
def _db_dialect_name() -> str:
    """Return the dialect name reported by the module-level ``engine``.

    Any failure while reading ``engine.dialect.name`` yields an empty string,
    so callers comparing against a specific dialect simply see "no match".
    """
    name = ""
    try:
        name = engine.dialect.name
    except Exception:
        # Best-effort lookup: an unreadable dialect is reported as "".
        pass
    return name
|
||||
|
||||
|
||||
def _date_filter_expr(column_name: str) -> str:
    """Build the SQL expression that reads a legacy text date column as a date.

    PostgreSQL gets a ``::date`` cast; every other dialect gets ``date(...)``.
    ``column_name`` is interpolated verbatim into the returned SQL fragment.
    """
    is_postgres = _db_dialect_name() == "postgresql"
    template = "{}::date" if is_postgres else "date({})"
    return template.format(column_name)
|
||||
|
||||
|
||||
def _normalise_date_values_for_sql(values):
    """Convert date-like values into bind parameters for the active database.

    PostgreSQL receives ``datetime.date`` objects; any other dialect receives
    ISO ``YYYY-MM-DD`` strings for text comparisons.

    Args:
        values: Iterable of date-like values (``date``, ``datetime``, objects
            exposing ``.date()``, or anything ``pd.to_datetime`` can parse).

    Returns:
        A list of normalised values in input order. Missing values (``None``,
        ``NaN``, ``pd.NaT``) are dropped. Values that cannot be parsed are
        dropped on PostgreSQL and passed through as ``str(value)`` otherwise.
    """
    use_native_date = _db_dialect_name() == "postgresql"
    normalised = []

    for value in values:
        if value is None:
            continue
        # pd.NaT is an instance of datetime/date, so without this guard it
        # would be appended as a NaT object (PostgreSQL) or as the string
        # "NaT" (SQLite). NaN would likewise become the string "nan".
        try:
            if pd.isna(value):
                continue
        except (TypeError, ValueError):
            # Non-scalar input: pd.isna returned an array with no single
            # truth value. Fall through to the original handling.
            pass

        if isinstance(value, datetime):
            value = value.date()
        elif hasattr(value, "date") and not isinstance(value, date):
            try:
                value = value.date()
            except Exception:
                # Best effort: keep the raw value and let parsing handle it.
                pass

        if use_native_date:
            if isinstance(value, date):
                normalised.append(value)
            else:
                parsed = pd.to_datetime(value, errors="coerce")
                if pd.notna(parsed):
                    normalised.append(parsed.date())
        else:
            if isinstance(value, date):
                normalised.append(value.isoformat())
            else:
                parsed = pd.to_datetime(value, errors="coerce")
                normalised.append(
                    parsed.strftime("%Y-%m-%d") if pd.notna(parsed) else str(value)
                )

    return normalised
|
||||
|
||||
# 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite
|
||||
def _create_engine_with_pool(db_path):
|
||||
"""建立帶有連線池配置的資料庫引擎"""
|
||||
@@ -388,15 +432,16 @@ class ImportService:
|
||||
# 刪除資料庫中相同日期的舊資料(覆蓋邏輯)
|
||||
if len(import_dates) > 0:
|
||||
# 過濾掉 None 值
|
||||
valid_dates = [d for d in import_dates if d is not None]
|
||||
valid_dates = _normalise_date_values_for_sql(import_dates)
|
||||
|
||||
if valid_dates:
|
||||
date_placeholders, date_params = _build_in_clause("snapshot_date", valid_dates)
|
||||
snapshot_date_expr = _date_filter_expr("snapshot_date")
|
||||
|
||||
with engine.connect() as conn:
|
||||
# 刪除相同日期的舊資料
|
||||
delete_query = text(
|
||||
f"DELETE FROM {table_name} WHERE snapshot_date IN ({date_placeholders})"
|
||||
f"DELETE FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})"
|
||||
)
|
||||
result = conn.execute(delete_query, date_params)
|
||||
deleted_count = result.rowcount
|
||||
@@ -432,18 +477,20 @@ class ImportService:
|
||||
import_dates = df['snapshot_date'].dropna().unique()
|
||||
if len(import_dates) > 0:
|
||||
# 查詢資料庫中這些日期的資料筆數
|
||||
valid_dates = [d for d in import_dates if d is not None]
|
||||
raw_valid_dates = [d for d in import_dates if d is not None]
|
||||
valid_dates = _normalise_date_values_for_sql(raw_valid_dates)
|
||||
date_placeholders, date_params = _build_in_clause("verify_date", valid_dates)
|
||||
snapshot_date_expr = _date_filter_expr("snapshot_date")
|
||||
|
||||
with engine.connect() as conn:
|
||||
verify_query = text(
|
||||
f"SELECT COUNT(*) FROM {table_name} WHERE snapshot_date IN ({date_placeholders})"
|
||||
f"SELECT COUNT(*) FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})"
|
||||
)
|
||||
result = conn.execute(verify_query, date_params)
|
||||
db_count = result.scalar()
|
||||
|
||||
# 驗證:資料庫筆數應該 >= 本次匯入筆數(可能有其他日期的舊資料)
|
||||
expected_count = len(df[df['snapshot_date'].isin(valid_dates)])
|
||||
expected_count = len(df[df['snapshot_date'].isin(raw_valid_dates)])
|
||||
|
||||
if db_count >= expected_count:
|
||||
logger.info(f"任務 {job_id} 驗證成功: 預期 {expected_count} 筆, 資料庫有 {db_count} 筆")
|
||||
@@ -730,6 +777,7 @@ class ImportService:
|
||||
imported_count = 0
|
||||
total_rows = 0
|
||||
all_dates = [] # 收集所有匯入的日期
|
||||
failed_files = []
|
||||
|
||||
for file in files:
|
||||
file_id = file['id']
|
||||
@@ -751,7 +799,9 @@ class ImportService:
|
||||
local_path = os.path.join(temp_dir, file_name)
|
||||
|
||||
if not drive_service.download_file(file_id, local_path):
|
||||
self.update_job_status(job_id, 'failed', 10, '下載失敗', '無法從 Google Drive 下載檔案')
|
||||
err = '無法從 Google Drive 下載檔案'
|
||||
self.update_job_status(job_id, 'failed', 10, '下載失敗', err)
|
||||
failed_files.append({'file': file_name, 'error': err})
|
||||
continue
|
||||
|
||||
# 更新本地路徑
|
||||
@@ -802,6 +852,17 @@ class ImportService:
|
||||
logger.info(f"已清理本地檔案: {local_path}")
|
||||
except Exception as e:
|
||||
logger.warning(f"清理本地檔案失敗: {str(e)}")
|
||||
else:
|
||||
session = Session()
|
||||
try:
|
||||
job = session.query(ImportJob).filter_by(id=job_id).first()
|
||||
failed_files.append({
|
||||
'file': file_name,
|
||||
'job_id': job_id,
|
||||
'error': (job.error_message if job else None) or '匯入失敗,請查看 import_jobs',
|
||||
})
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
# 計算日期範圍
|
||||
date_range = None
|
||||
@@ -813,11 +874,22 @@ class ImportService:
|
||||
'max': sorted_dates[-1]
|
||||
}
|
||||
|
||||
if failed_files:
|
||||
first_error = failed_files[0].get('error') or '未知錯誤'
|
||||
message = (
|
||||
f'找到 {len(files)} 個檔案,成功匯入 {imported_count} 個,'
|
||||
f'失敗 {len(failed_files)} 個。首筆錯誤:{first_error[:220]}'
|
||||
)
|
||||
else:
|
||||
message = f'成功匯入 {imported_count} 個檔案'
|
||||
|
||||
return {
|
||||
'success': True,
|
||||
'message': f'成功匯入 {imported_count} 個檔案',
|
||||
'success': len(failed_files) == 0,
|
||||
'message': message,
|
||||
'file_count': len(files),
|
||||
'imported_count': imported_count,
|
||||
'failed_count': len(failed_files),
|
||||
'errors': failed_files,
|
||||
'total_rows': total_rows,
|
||||
'date_range': date_range
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from pathlib import Path
|
||||
from datetime import date
|
||||
|
||||
from services import import_service
|
||||
from services.import_service import _build_in_clause
|
||||
|
||||
|
||||
@@ -17,6 +19,20 @@ def test_import_service_does_not_interpolate_date_values_into_in_clauses():
|
||||
assert "join([f\"'{d}'\" for d in" not in source
|
||||
|
||||
|
||||
def test_daily_snapshot_delete_casts_text_date_column_on_postgres(monkeypatch):
    """With a PostgreSQL dialect, the column is cast and params are date objects."""
    monkeypatch.setattr(import_service, "_db_dialect_name", lambda: "postgresql")

    column_expr = import_service._date_filter_expr("snapshot_date")
    assert column_expr == "snapshot_date::date"

    bound_values = import_service._normalise_date_values_for_sql(["2026-05-01"])
    assert bound_values == [date(2026, 5, 1)]
|
||||
|
||||
|
||||
def test_daily_snapshot_delete_uses_iso_dates_on_sqlite(monkeypatch):
    """With a SQLite dialect, the column uses date() and params are ISO strings."""
    monkeypatch.setattr(import_service, "_db_dialect_name", lambda: "sqlite")

    column_expr = import_service._date_filter_expr("snapshot_date")
    assert column_expr == "date(snapshot_date)"

    bound_values = import_service._normalise_date_values_for_sql([date(2026, 5, 1)])
    assert bound_values == ["2026-05-01"]
|
||||
|
||||
|
||||
def test_monthly_summary_import_does_not_replace_entire_table():
|
||||
source = Path("routes/import_routes.py").read_text(encoding="utf-8")
|
||||
start = source.index("DELETE FROM monthly_summary_analysis WHERE year = :y AND month = :m")
|
||||
|
||||
Reference in New Issue
Block a user