From 49f6b3ebd97bb1a1f7e2123778c2ba074f076198 Mon Sep 17 00:00:00 2001 From: OoO Date: Mon, 18 May 2026 14:23:49 +0800 Subject: [PATCH] fix: parameterize import snapshot date filters --- config.py | 2 +- services/import_service.py | 90 ++++++++++++++++++++++--- tests/test_import_service_sql_params.py | 16 +++++ 3 files changed, 98 insertions(+), 10 deletions(-) diff --git a/config.py b/config.py index e00d630..987bea8 100644 --- a/config.py +++ b/config.py @@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.182" +SYSTEM_VERSION = "V10.183" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/services/import_service.py b/services/import_service.py index f712b05..4640969 100644 --- a/services/import_service.py +++ b/services/import_service.py @@ -8,7 +8,7 @@ import os import logging import json -from datetime import datetime +from datetime import date, datetime from typing import Optional, Dict, Any from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker @@ -36,6 +36,50 @@ def _build_in_clause(prefix: str, values) -> tuple: params[key] = value return ", ".join(placeholders), params + +def _db_dialect_name() -> str: + try: + return engine.dialect.name + except Exception: + return "" + + +def _date_filter_expr(column_name: str) -> str: + """Return a DB-specific date expression for legacy text date columns.""" + if _db_dialect_name() == "postgresql": + return f"{column_name}::date" + return f"date({column_name})" + + +def _normalise_date_values_for_sql(values): + """Keep PostgreSQL params as date objects; use ISO strings for SQLite/text comparisons.""" + normalised = [] + use_native_date = _db_dialect_name() == "postgresql" + for value in values: + if value is None: + continue + if isinstance(value, datetime): + value = value.date() + elif hasattr(value, "date") and not isinstance(value, date): + try: + value = value.date() + except Exception: + pass + if use_native_date: + if isinstance(value, date): + normalised.append(value) + else: + parsed = pd.to_datetime(value, errors="coerce") + if pd.notna(parsed): + normalised.append(parsed.date()) + else: + if isinstance(value, date): + normalised.append(value.isoformat()) + else: + parsed = pd.to_datetime(value, errors="coerce") + normalised.append(parsed.strftime("%Y-%m-%d") if pd.notna(parsed) else str(value)) + return normalised + # 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite def _create_engine_with_pool(db_path): """建立帶有連線池配置的資料庫引擎""" @@ -388,15 +432,16 @@ class ImportService: # 刪除資料庫中相同日期的舊資料(覆蓋邏輯) if len(import_dates) > 0: # 過濾掉 None 值 - valid_dates = [d for d in import_dates if d is not None] + valid_dates = _normalise_date_values_for_sql(import_dates) if valid_dates: date_placeholders, date_params = _build_in_clause("snapshot_date", valid_dates) + snapshot_date_expr = _date_filter_expr("snapshot_date") with engine.connect() as conn: # 刪除相同日期的舊資料 delete_query = text( - f"DELETE FROM {table_name} WHERE snapshot_date IN ({date_placeholders})" + f"DELETE FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})" ) result = conn.execute(delete_query, date_params) deleted_count = result.rowcount @@ -432,18 +477,20 @@ class ImportService: import_dates = df['snapshot_date'].dropna().unique() if len(import_dates) > 0: # 查詢資料庫中這些日期的資料筆數 - valid_dates = [d for d in import_dates if d is not None] + raw_valid_dates = [d for d in import_dates if d is not None] + valid_dates = _normalise_date_values_for_sql(raw_valid_dates) date_placeholders, date_params = _build_in_clause("verify_date", valid_dates) + snapshot_date_expr = _date_filter_expr("snapshot_date") with engine.connect() as conn: verify_query = text( - f"SELECT COUNT(*) FROM {table_name} WHERE snapshot_date IN ({date_placeholders})" + f"SELECT COUNT(*) FROM {table_name} WHERE {snapshot_date_expr} IN ({date_placeholders})" ) result = conn.execute(verify_query, date_params) db_count = result.scalar() # 驗證:資料庫筆數應該 >= 本次匯入筆數(可能有其他日期的舊資料) - expected_count = len(df[df['snapshot_date'].isin(valid_dates)]) + expected_count = len(df[df['snapshot_date'].isin(raw_valid_dates)]) if db_count >= expected_count: logger.info(f"任務 {job_id} 驗證成功: 預期 {expected_count} 筆, 資料庫有 {db_count} 筆") @@ -730,6 +777,7 @@ class ImportService: imported_count = 0 total_rows = 0 all_dates = [] # 收集所有匯入的日期 + failed_files = [] for file in files: file_id = file['id'] @@ -751,7 +799,9 @@ class ImportService: local_path = os.path.join(temp_dir, file_name) if not drive_service.download_file(file_id, local_path): - self.update_job_status(job_id, 'failed', 10, '下載失敗', '無法從 Google Drive 下載檔案') + err = '無法從 Google Drive 下載檔案' + self.update_job_status(job_id, 'failed', 10, '下載失敗', err) + failed_files.append({'file': file_name, 'error': err}) continue # 更新本地路徑 @@ -802,6 +852,17 @@ class ImportService: logger.info(f"已清理本地檔案: {local_path}") except Exception as e: logger.warning(f"清理本地檔案失敗: {str(e)}") + else: + session = Session() + try: + job = session.query(ImportJob).filter_by(id=job_id).first() + failed_files.append({ + 'file': file_name, + 'job_id': job_id, + 'error': (job.error_message if job else None) or '匯入失敗,請查看 import_jobs', + }) + finally: + session.close() # 計算日期範圍 date_range = None @@ -813,11 +874,22 @@ class ImportService: 'max': sorted_dates[-1] } + if failed_files: + first_error = failed_files[0].get('error') or '未知錯誤' + message = ( + f'找到 {len(files)} 個檔案,成功匯入 {imported_count} 個,' + f'失敗 {len(failed_files)} 個。首筆錯誤:{first_error[:220]}' + ) + else: + message = f'成功匯入 {imported_count} 個檔案' + return { - 'success': True, - 'message': f'成功匯入 {imported_count} 個檔案', + 'success': len(failed_files) == 0, + 'message': message, 'file_count': len(files), 'imported_count': imported_count, + 'failed_count': len(failed_files), + 'errors': failed_files, 'total_rows': total_rows, 'date_range': date_range } diff --git a/tests/test_import_service_sql_params.py b/tests/test_import_service_sql_params.py index cfa3e10..8c2cdf4 100644 --- a/tests/test_import_service_sql_params.py +++ b/tests/test_import_service_sql_params.py @@ -1,5 +1,7 @@ from pathlib import Path +from datetime import date +from services import import_service from services.import_service import _build_in_clause @@ -17,6 +19,20 @@ def test_import_service_does_not_interpolate_date_values_into_in_clauses(): assert "join([f\"'{d}'\" for d in" not in source +def test_daily_snapshot_delete_casts_text_date_column_on_postgres(monkeypatch): + monkeypatch.setattr(import_service, "_db_dialect_name", lambda: "postgresql") + + assert import_service._date_filter_expr("snapshot_date") == "snapshot_date::date" + assert import_service._normalise_date_values_for_sql(["2026-05-01"]) == [date(2026, 5, 1)] + + +def test_daily_snapshot_delete_uses_iso_dates_on_sqlite(monkeypatch): + monkeypatch.setattr(import_service, "_db_dialect_name", lambda: "sqlite") + + assert import_service._date_filter_expr("snapshot_date") == "date(snapshot_date)" + assert import_service._normalise_date_values_for_sql([date(2026, 5, 1)]) == ["2026-05-01"] + + def test_monthly_summary_import_does_not_replace_entire_table(): source = Path("routes/import_routes.py").read_text(encoding="utf-8") start = source.index("DELETE FROM monthly_summary_analysis WHERE year = :y AND month = :m")