fix(import): fail daily sales job on monthly sync failure
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s

This commit is contained in:
ogt
2026-06-24 21:56:24 +08:00
parent 8240b59b84
commit 84035906ab
2 changed files with 197 additions and 27 deletions

View File

@@ -60,6 +60,20 @@ def _date_filter_expr(column_name: str) -> str:
return f"date({column_name})"
def _table_columns(conn, table_name: str) -> set[str]:
"""Return non-id table columns for PostgreSQL and SQLite-backed tests."""
if _db_dialect_name() == "postgresql":
rows = conn.execute(text("""
SELECT column_name FROM information_schema.columns
WHERE table_name = :table_name AND column_name != 'id'
ORDER BY ordinal_position
"""), {"table_name": table_name}).fetchall()
return {row[0] for row in rows}
rows = conn.execute(text(f'PRAGMA table_info("{table_name}")')).fetchall()
return {row[1] for row in rows if row[1] != 'id'}
def _normalise_date_values_for_sql(values):
"""Keep PostgreSQL params as date objects; use ISO strings for SQLite/text comparisons."""
normalised = []
@@ -765,13 +779,7 @@ class ImportService:
# 2026-01-30 新增:驗證 DataFrame 欄位和目標表欄位是否一致
with engine.connect() as conn:
col_query = text(f"""
SELECT column_name FROM information_schema.columns
WHERE table_name = '{monthly_table}' AND column_name != 'id'
ORDER BY ordinal_position
""")
result = conn.execute(col_query)
target_columns = set([row[0] for row in result])
target_columns = _table_columns(conn, monthly_table)
df_columns = set(df_monthly.columns)
missing_in_table = df_columns - target_columns
@@ -863,26 +871,6 @@ class ImportService:
logger.error(f"任務 {job_id} 發送告警失敗: {notify_error}")
# 更新成功資訊
self.update_job_progress(
job_id,
processed_rows=total_rows,
success_rows=total_rows
)
# 2026-01-30 修正:根據同步狀態設置完成訊息
if sync_success:
completion_msg = '匯入完成(已同步至業績分析儀表板)'
else:
completion_msg = '匯入完成(警告:業績分析儀表板同步失敗,需手動處理)'
self.update_job_status(
job_id,
'completed',
100,
completion_msg
)
# 計算日期範圍
date_min = None
date_max = None
@@ -923,6 +911,43 @@ class ImportService:
finally:
session.close()
if not sync_success:
error_msg = sync_error_msg or '同步至業績分析儀表板失敗'
self.update_job_progress(
job_id,
processed_rows=total_rows,
success_rows=0,
error_rows=total_rows,
)
self.update_job_status(
job_id,
'failed',
95,
'業績分析儀表板同步失敗',
error_msg,
)
logger.error(
"任務 %s 匯入未完成daily_sales_snapshot 已寫入,但 %s 同步失敗;"
"保留來源檔案等待重試,不移動 Google Drive 檔案",
job_id,
monthly_table,
)
return False
# 更新成功資訊
self.update_job_progress(
job_id,
processed_rows=total_rows,
success_rows=total_rows
)
self.update_job_status(
job_id,
'completed',
100,
'匯入完成(已同步至業績分析儀表板)'
)
logger.info(f"任務 {job_id} 匯入成功: {total_rows}")
try:
from services.cache_service import clear_growth_cache

View File

@@ -0,0 +1,145 @@
import importlib
import json
import os
import sys
import types
import pandas as pd
from sqlalchemy import text
def _load_import_service(monkeypatch, database_url):
os.environ.setdefault("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true")
import config
monkeypatch.setattr(config, "DATABASE_PATH", database_url)
import services.import_service as import_service
return importlib.reload(import_service)
def _prepare_daily_sales_tables(import_service):
import_service.Base.metadata.create_all(import_service.engine)
with import_service.engine.begin() as conn:
conn.execute(text("DROP TABLE IF EXISTS daily_sales_snapshot"))
conn.execute(text("DROP TABLE IF EXISTS realtime_sales_monthly"))
conn.execute(text("""
CREATE TABLE daily_sales_snapshot (
"日期" TEXT,
"商品ID" TEXT,
"商品名稱" TEXT,
"銷售金額" INTEGER,
snapshot_date TEXT
)
"""))
conn.execute(text("""
CREATE TABLE realtime_sales_monthly (
"日期" TEXT,
"商品ID" TEXT,
"商品名稱" TEXT,
"銷售金額" INTEGER
)
"""))
def test_daily_sales_import_fails_when_monthly_sync_fails(monkeypatch, tmp_path):
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
_prepare_daily_sales_tables(import_service)
class FakeNotificationManager:
sent_messages = []
def _send_telegram_messages(self, messages):
self.sent_messages.extend(messages)
monkeypatch.setitem(
sys.modules,
"services.notification_manager",
types.SimpleNamespace(NotificationManager=FakeNotificationManager),
)
source_df = pd.DataFrame([
{
"日期": "2026-06-24",
"商品ID": "A001",
"商品名稱": "測試商品",
"銷售金額": 1200,
}
])
monkeypatch.setattr(
import_service,
"_read_daily_sales_excel",
lambda _path: (
source_df.copy(),
{"date_col": "日期", "sheet_name": "即時業績明細", "header_row": 1},
),
)
original_to_sql = pd.DataFrame.to_sql
def fail_monthly_sync(self, name, *args, **kwargs):
if name == "realtime_sales_monthly":
raise RuntimeError("monthly sync boom")
return original_to_sql(self, name, *args, **kwargs)
monkeypatch.setattr(pd.DataFrame, "to_sql", fail_monthly_sync)
service = import_service.ImportService()
job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)
assert service.process_daily_sales_import(job_id, str(tmp_path / "daily.xlsx")) is False
session = import_service.Session()
try:
job = session.query(import_service.ImportJob).filter_by(id=job_id).one()
assert job.status == "failed"
assert job.progress_percent == 95
assert job.current_step == "業績分析儀表板同步失敗"
assert "monthly sync boom" in job.error_message
summary = json.loads(job.import_summary)
assert summary["sync_success"] is False
assert summary["synced_to"] is None
assert "monthly sync boom" in summary["sync_error"]
finally:
session.close()
with import_service.engine.connect() as conn:
snapshot_rows = conn.execute(text("SELECT COUNT(*) FROM daily_sales_snapshot")).scalar()
monthly_rows = conn.execute(text("SELECT COUNT(*) FROM realtime_sales_monthly")).scalar()
assert snapshot_rows == 1
assert monthly_rows == 0
assert FakeNotificationManager.sent_messages
def test_auto_import_does_not_move_drive_file_when_import_fails(monkeypatch, tmp_path):
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
import_service.Base.metadata.create_all(import_service.engine)
class FakeDriveService:
moved_files = []
def list_files_in_folder(self, folder_path, file_pattern):
return [{"id": "drive-file-1", "name": "daily.xlsx", "size": 1024}]
def download_file(self, file_id, local_path):
os.makedirs(os.path.dirname(local_path), exist_ok=True)
with open(local_path, "wb") as handle:
handle.write(b"test")
return True
def move_file(self, file_id, folder, create_missing=False):
self.moved_files.append((file_id, folder, create_missing))
return True
fake_drive = FakeDriveService()
monkeypatch.setattr(import_service, "drive_service", fake_drive)
service = import_service.ImportService()
monkeypatch.setattr(service, "process_daily_sales_import", lambda job_id, path: False)
result = service.auto_import_from_drive()
assert result["success"] is False
assert result["failed_count"] == 1
assert fake_drive.moved_files == []