From 4e6e9bfe5d3a798d6e2c2c9edb65a099ce4640d9 Mon Sep 17 00:00:00 2001 From: OoO Date: Wed, 13 May 2026 10:28:48 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B6=81=E5=AE=9A=E8=87=AA=E5=8B=95=E5=8C=AF?= =?UTF-8?q?=E5=85=A5=E6=97=A5=E6=9C=9F=E6=9F=A5=E8=A9=A2=E5=8F=83=E6=95=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../claude_inventory_validation_20260513.md | 1 + services/import_service.py | 50 ++++++++++++------- tests/test_import_service_sql_params.py | 17 +++++++ 3 files changed, 51 insertions(+), 17 deletions(-) create mode 100644 tests/test_import_service_sql_params.py diff --git a/docs/memory/claude_inventory_validation_20260513.md b/docs/memory/claude_inventory_validation_20260513.md index bbb171e..b59fa86 100644 --- a/docs/memory/claude_inventory_validation_20260513.md +++ b/docs/memory/claude_inventory_validation_20260513.md @@ -18,6 +18,7 @@ - Claude cost throttle:成本節流檢查失敗仍維持 Claude 可用,但已改為 warning + stack,避免成本保護失效無跡可查。 - `ai_call_logger` caller registry:registry 匯入失敗仍不阻擋 LLM 遙測,但已改為 warning + stack。 - Observability route:promotion review RAG 相似查詢、PPT audit history 缺表、host health probe 寫入、MCP 24h summary 缺表等 fail-safe 區塊已改成 debug/warning log,不再完全靜默。 +- Google Drive import:`services/import_service.py` 的日期 `IN (...)` 刪除/驗證查詢已改為 SQLAlchemy bind params,不再把 DataFrame 日期值拼進 SQL 字串。 ## 已驗證為已修或過期 diff --git a/services/import_service.py b/services/import_service.py index e789725..f712b05 100644 --- a/services/import_service.py +++ b/services/import_service.py @@ -10,7 +10,7 @@ import logging import json from datetime import datetime from typing import Optional, Dict, Any -from sqlalchemy import create_engine +from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker import pandas as pd import pytz @@ -25,6 +25,17 @@ from database.manager import ensure_metadata_initialized # 設定日誌 logger = logging.getLogger(__name__) + +def _build_in_clause(prefix: str, values) -> tuple: + """Build a SQLAlchemy-safe IN clause placeholder list and params.""" + params = {} + placeholders = [] + for idx, value in enumerate(values): + key = f"{prefix}_{idx}" + placeholders.append(f":{key}") + params[key] = value + return ", ".join(placeholders), params + # 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite def _create_engine_with_pool(db_path): """建立帶有連線池配置的資料庫引擎""" @@ -364,7 +375,6 @@ class ImportService: logger.info("未找到日期欄位,使用當前日期(台北時區)") # 寫入資料庫 - 使用全域的 engine(支援 PostgreSQL 和 SQLite) - from sqlalchemy import text # 使用模組頂部定義的 engine,確保連接到正確的資料庫 # 更新進度 @@ -381,13 +391,14 @@ class ImportService: valid_dates = [d for d in import_dates if d is not None] if valid_dates: - # 將日期轉換為字串格式用於 SQL 查詢 - date_list = ', '.join([f"'{d}'" for d in valid_dates]) + date_placeholders, date_params = _build_in_clause("snapshot_date", valid_dates) with engine.connect() as conn: # 刪除相同日期的舊資料 - delete_query = text(f"DELETE FROM {table_name} WHERE snapshot_date IN ({date_list})") - result = conn.execute(delete_query) + delete_query = text( + f"DELETE FROM {table_name} WHERE snapshot_date IN ({date_placeholders})" + ) + result = conn.execute(delete_query, date_params) deleted_count = result.rowcount conn.commit() @@ -421,13 +432,14 @@ class ImportService: import_dates = df['snapshot_date'].dropna().unique() if len(import_dates) > 0: # 查詢資料庫中這些日期的資料筆數 - from sqlalchemy import text - valid_dates = [str(d) for d in import_dates if d is not None] - date_list = ', '.join([f"'{d}'" for d in valid_dates]) + valid_dates = [d for d in import_dates if d is not None] + date_placeholders, date_params = _build_in_clause("verify_date", valid_dates) with engine.connect() as conn: - verify_query = text(f"SELECT COUNT(*) FROM {table_name} WHERE snapshot_date IN ({date_list})") - result = conn.execute(verify_query) + verify_query = text( + f"SELECT COUNT(*) FROM {table_name} WHERE snapshot_date IN ({date_placeholders})" + ) + result = conn.execute(verify_query, date_params) db_count = result.scalar() # 驗證:資料庫筆數應該 >= 本次匯入筆數(可能有其他日期的舊資料) @@ -514,11 +526,13 @@ class ImportService: if len(unique_dates) > 0: # 刪除 realtime_sales_monthly 中相同日期的舊資料(去重) - date_list_monthly = ', '.join([f"'{d}'" for d in unique_dates]) + date_placeholders, date_params = _build_in_clause("monthly_date", unique_dates) with engine.connect() as conn: - delete_monthly_query = text(f'DELETE FROM {monthly_table} WHERE "日期" IN ({date_list_monthly})') - result = conn.execute(delete_monthly_query) + delete_monthly_query = text( + f'DELETE FROM {monthly_table} WHERE "日期" IN ({date_placeholders})' + ) + result = conn.execute(delete_monthly_query, date_params) deleted_monthly = result.rowcount conn.commit() @@ -540,9 +554,11 @@ class ImportService: # 驗證同步結果 if len(unique_dates) > 0: with engine.connect() as conn: - date_list_verify = ', '.join([f"'{d}'" for d in unique_dates]) - verify_query = text(f'SELECT COUNT(*) FROM {monthly_table} WHERE "日期" IN ({date_list_verify})') - verify_count = conn.execute(verify_query).scalar() + date_placeholders, date_params = _build_in_clause("monthly_verify_date", unique_dates) + verify_query = text( + f'SELECT COUNT(*) FROM {monthly_table} WHERE "日期" IN ({date_placeholders})' + ) + verify_count = conn.execute(verify_query, date_params).scalar() if verify_count >= len(df_monthly): logger.info(f"任務 {job_id} 同步驗證成功: {monthly_table} 現有 {verify_count} 筆資料") diff --git a/tests/test_import_service_sql_params.py b/tests/test_import_service_sql_params.py new file mode 100644 index 0000000..1d24e0e --- /dev/null +++ b/tests/test_import_service_sql_params.py @@ -0,0 +1,17 @@ +from pathlib import Path + +from services.import_service import _build_in_clause + + +def test_build_in_clause_binds_each_value(): + clause, params = _build_in_clause("d", ["2026-05-01", "x' OR 1=1 --"]) + + assert clause == ":d_0, :d_1" + assert params == {"d_0": "2026-05-01", "d_1": "x' OR 1=1 --"} + + +def test_import_service_does_not_interpolate_date_values_into_in_clauses(): + source = Path("services/import_service.py").read_text(encoding="utf-8") + + assert "join([f\"'{d}'\"" not in source + assert "join([f\"'{d}'\" for d in" not in source