綁定自動匯入日期查詢參數

This commit is contained in:
OoO
2026-05-13 10:28:48 +08:00
parent e29529f2a9
commit 4e6e9bfe5d
3 changed files with 51 additions and 17 deletions

View File

@@ -18,6 +18,7 @@
- Claude cost throttle成本節流檢查失敗仍維持 Claude 可用,但已改為 warning + stack避免成本保護失效無跡可查。
- `ai_call_logger` caller registryregistry 匯入失敗仍不阻擋 LLM 遙測,但已改為 warning + stack。
- Observability routepromotion review RAG 相似查詢、PPT audit history 缺表、host health probe 寫入、MCP 24h summary 缺表等 fail-safe 區塊已改成 debug/warning log不再完全靜默。
- Google Drive import`services/import_service.py` 的日期 `IN (...)` 刪除/驗證查詢已改為 SQLAlchemy bind params不再把 DataFrame 日期值拼進 SQL 字串。
## 已驗證為已修或過期

View File

@@ -10,7 +10,7 @@ import logging
import json
from datetime import datetime
from typing import Optional, Dict, Any
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import pandas as pd
import pytz
@@ -25,6 +25,17 @@ from database.manager import ensure_metadata_initialized
# 設定日誌
logger = logging.getLogger(__name__)
def _build_in_clause(prefix: str, values) -> tuple:
"""Build a SQLAlchemy-safe IN clause placeholder list and params."""
params = {}
placeholders = []
for idx, value in enumerate(values):
key = f"{prefix}_{idx}"
placeholders.append(f":{key}")
params[key] = value
return ", ".join(placeholders), params
# 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite
def _create_engine_with_pool(db_path):
"""建立帶有連線池配置的資料庫引擎"""
@@ -364,7 +375,6 @@ class ImportService:
logger.info("未找到日期欄位,使用當前日期(台北時區)")
# 寫入資料庫 - 使用全域的 engine支援 PostgreSQL 和 SQLite
from sqlalchemy import text
# 使用模組頂部定義的 engine確保連接到正確的資料庫
# 更新進度
@@ -381,13 +391,14 @@ class ImportService:
valid_dates = [d for d in import_dates if d is not None]
if valid_dates:
# 將日期轉換為字串格式用於 SQL 查詢
date_list = ', '.join([f"'{d}'" for d in valid_dates])
date_placeholders, date_params = _build_in_clause("snapshot_date", valid_dates)
with engine.connect() as conn:
# 刪除相同日期的舊資料
delete_query = text(f"DELETE FROM {table_name} WHERE snapshot_date IN ({date_list})")
result = conn.execute(delete_query)
delete_query = text(
f"DELETE FROM {table_name} WHERE snapshot_date IN ({date_placeholders})"
)
result = conn.execute(delete_query, date_params)
deleted_count = result.rowcount
conn.commit()
@@ -421,13 +432,14 @@ class ImportService:
import_dates = df['snapshot_date'].dropna().unique()
if len(import_dates) > 0:
# 查詢資料庫中這些日期的資料筆數
from sqlalchemy import text
valid_dates = [str(d) for d in import_dates if d is not None]
date_list = ', '.join([f"'{d}'" for d in valid_dates])
valid_dates = [d for d in import_dates if d is not None]
date_placeholders, date_params = _build_in_clause("verify_date", valid_dates)
with engine.connect() as conn:
verify_query = text(f"SELECT COUNT(*) FROM {table_name} WHERE snapshot_date IN ({date_list})")
result = conn.execute(verify_query)
verify_query = text(
f"SELECT COUNT(*) FROM {table_name} WHERE snapshot_date IN ({date_placeholders})"
)
result = conn.execute(verify_query, date_params)
db_count = result.scalar()
# 驗證:資料庫筆數應該 >= 本次匯入筆數(可能有其他日期的舊資料)
@@ -514,11 +526,13 @@ class ImportService:
if len(unique_dates) > 0:
# 刪除 realtime_sales_monthly 中相同日期的舊資料(去重)
date_list_monthly = ', '.join([f"'{d}'" for d in unique_dates])
date_placeholders, date_params = _build_in_clause("monthly_date", unique_dates)
with engine.connect() as conn:
delete_monthly_query = text(f'DELETE FROM {monthly_table} WHERE "日期" IN ({date_list_monthly})')
result = conn.execute(delete_monthly_query)
delete_monthly_query = text(
f'DELETE FROM {monthly_table} WHERE "日期" IN ({date_placeholders})'
)
result = conn.execute(delete_monthly_query, date_params)
deleted_monthly = result.rowcount
conn.commit()
@@ -540,9 +554,11 @@ class ImportService:
# 驗證同步結果
if len(unique_dates) > 0:
with engine.connect() as conn:
date_list_verify = ', '.join([f"'{d}'" for d in unique_dates])
verify_query = text(f'SELECT COUNT(*) FROM {monthly_table} WHERE "日期" IN ({date_list_verify})')
verify_count = conn.execute(verify_query).scalar()
date_placeholders, date_params = _build_in_clause("monthly_verify_date", unique_dates)
verify_query = text(
f'SELECT COUNT(*) FROM {monthly_table} WHERE "日期" IN ({date_placeholders})'
)
verify_count = conn.execute(verify_query, date_params).scalar()
if verify_count >= len(df_monthly):
logger.info(f"任務 {job_id} 同步驗證成功: {monthly_table} 現有 {verify_count} 筆資料")

View File

@@ -0,0 +1,17 @@
from pathlib import Path
from services.import_service import _build_in_clause
def test_build_in_clause_binds_each_value():
clause, params = _build_in_clause("d", ["2026-05-01", "x' OR 1=1 --"])
assert clause == ":d_0, :d_1"
assert params == {"d_0": "2026-05-01", "d_1": "x' OR 1=1 --"}
def test_import_service_does_not_interpolate_date_values_into_in_clauses():
source = Path("services/import_service.py").read_text(encoding="utf-8")
assert "join([f\"'{d}'\"" not in source
assert "join([f\"'{d}'\" for d in" not in source