diff --git a/services/import_service.py b/services/import_service.py index f4f7d64..25c1a1a 100644 --- a/services/import_service.py +++ b/services/import_service.py @@ -60,6 +60,20 @@ def _date_filter_expr(column_name: str) -> str: return f"date({column_name})" +def _table_columns(conn, table_name: str) -> set[str]: + """Return non-id table columns for PostgreSQL and SQLite-backed tests.""" + if _db_dialect_name() == "postgresql": + rows = conn.execute(text(""" + SELECT column_name FROM information_schema.columns + WHERE table_name = :table_name AND column_name != 'id' + ORDER BY ordinal_position + """), {"table_name": table_name}).fetchall() + return {row[0] for row in rows} + + rows = conn.execute(text(f'PRAGMA table_info("{table_name}")')).fetchall() + return {row[1] for row in rows if row[1] != 'id'} + + def _normalise_date_values_for_sql(values): """Keep PostgreSQL params as date objects; use ISO strings for SQLite/text comparisons.""" normalised = [] @@ -765,13 +779,7 @@ class ImportService: # 2026-01-30 新增:驗證 DataFrame 欄位和目標表欄位是否一致 with engine.connect() as conn: - col_query = text(f""" - SELECT column_name FROM information_schema.columns - WHERE table_name = '{monthly_table}' AND column_name != 'id' - ORDER BY ordinal_position - """) - result = conn.execute(col_query) - target_columns = set([row[0] for row in result]) + target_columns = _table_columns(conn, monthly_table) df_columns = set(df_monthly.columns) missing_in_table = df_columns - target_columns @@ -863,26 +871,6 @@ class ImportService: logger.error(f"任務 {job_id} 發送告警失敗: {notify_error}") - # 更新成功資訊 - self.update_job_progress( - job_id, - processed_rows=total_rows, - success_rows=total_rows - ) - - # 2026-01-30 修正:根據同步狀態設置完成訊息 - if sync_success: - completion_msg = '匯入完成(已同步至業績分析儀表板)' - else: - completion_msg = '匯入完成(警告:業績分析儀表板同步失敗,需手動處理)' - - self.update_job_status( - job_id, - 'completed', - 100, - completion_msg - ) - # 計算日期範圍 date_min = None date_max = None @@ -923,6 +911,43 @@ class ImportService: finally: session.close() + if not sync_success: + error_msg = sync_error_msg or '同步至業績分析儀表板失敗' + self.update_job_progress( + job_id, + processed_rows=total_rows, + success_rows=0, + error_rows=total_rows, + ) + self.update_job_status( + job_id, + 'failed', + 95, + '業績分析儀表板同步失敗', + error_msg, + ) + logger.error( + "任務 %s 匯入未完成:daily_sales_snapshot 已寫入,但 %s 同步失敗;" + "保留來源檔案等待重試,不移動 Google Drive 檔案", + job_id, + monthly_table, + ) + return False + + # 更新成功資訊 + self.update_job_progress( + job_id, + processed_rows=total_rows, + success_rows=total_rows + ) + + self.update_job_status( + job_id, + 'completed', + 100, + '匯入完成(已同步至業績分析儀表板)' + ) + logger.info(f"任務 {job_id} 匯入成功: {total_rows} 筆") try: from services.cache_service import clear_growth_cache diff --git a/tests/test_auto_import_failure_boundaries.py b/tests/test_auto_import_failure_boundaries.py new file mode 100644 index 0000000..2082507 --- /dev/null +++ b/tests/test_auto_import_failure_boundaries.py @@ -0,0 +1,145 @@ +import importlib +import json +import os +import sys +import types + +import pandas as pd +from sqlalchemy import text + + +def _load_import_service(monkeypatch, database_url): + os.environ.setdefault("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true") + import config + + monkeypatch.setattr(config, "DATABASE_PATH", database_url) + import services.import_service as import_service + + return importlib.reload(import_service) + + +def _prepare_daily_sales_tables(import_service): + import_service.Base.metadata.create_all(import_service.engine) + with import_service.engine.begin() as conn: + conn.execute(text("DROP TABLE IF EXISTS daily_sales_snapshot")) + conn.execute(text("DROP TABLE IF EXISTS realtime_sales_monthly")) + conn.execute(text(""" + CREATE TABLE daily_sales_snapshot ( + "日期" TEXT, + "商品ID" TEXT, + "商品名稱" TEXT, + "銷售金額" INTEGER, + snapshot_date TEXT + ) + """)) + conn.execute(text(""" + CREATE TABLE realtime_sales_monthly ( + "日期" TEXT, + "商品ID" TEXT, + "商品名稱" TEXT, + "銷售金額" INTEGER + ) + """)) + + +def test_daily_sales_import_fails_when_monthly_sync_fails(monkeypatch, tmp_path): + import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}") + _prepare_daily_sales_tables(import_service) + + class FakeNotificationManager: + sent_messages = [] + + def _send_telegram_messages(self, messages): + self.sent_messages.extend(messages) + + monkeypatch.setitem( + sys.modules, + "services.notification_manager", + types.SimpleNamespace(NotificationManager=FakeNotificationManager), + ) + + source_df = pd.DataFrame([ + { + "日期": "2026-06-24", + "商品ID": "A001", + "商品名稱": "測試商品", + "銷售金額": 1200, + } + ]) + monkeypatch.setattr( + import_service, + "_read_daily_sales_excel", + lambda _path: ( + source_df.copy(), + {"date_col": "日期", "sheet_name": "即時業績明細", "header_row": 1}, + ), + ) + + original_to_sql = pd.DataFrame.to_sql + + def fail_monthly_sync(self, name, *args, **kwargs): + if name == "realtime_sales_monthly": + raise RuntimeError("monthly sync boom") + return original_to_sql(self, name, *args, **kwargs) + + monkeypatch.setattr(pd.DataFrame, "to_sql", fail_monthly_sync) + + service = import_service.ImportService() + job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024) + + assert service.process_daily_sales_import(job_id, str(tmp_path / "daily.xlsx")) is False + + session = import_service.Session() + try: + job = session.query(import_service.ImportJob).filter_by(id=job_id).one() + assert job.status == "failed" + assert job.progress_percent == 95 + assert job.current_step == "業績分析儀表板同步失敗" + assert "monthly sync boom" in job.error_message + summary = json.loads(job.import_summary) + assert summary["sync_success"] is False + assert summary["synced_to"] is None + assert "monthly sync boom" in summary["sync_error"] + finally: + session.close() + + with import_service.engine.connect() as conn: + snapshot_rows = conn.execute(text("SELECT COUNT(*) FROM daily_sales_snapshot")).scalar() + monthly_rows = conn.execute(text("SELECT COUNT(*) FROM realtime_sales_monthly")).scalar() + + assert snapshot_rows == 1 + assert monthly_rows == 0 + assert FakeNotificationManager.sent_messages + + +def test_auto_import_does_not_move_drive_file_when_import_fails(monkeypatch, tmp_path): + import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}") + import_service.Base.metadata.create_all(import_service.engine) + + class FakeDriveService: + moved_files = [] + + def list_files_in_folder(self, folder_path, file_pattern): + return [{"id": "drive-file-1", "name": "daily.xlsx", "size": 1024}] + + def download_file(self, file_id, local_path): + os.makedirs(os.path.dirname(local_path), exist_ok=True) + with open(local_path, "wb") as handle: + handle.write(b"test") + return True + + def move_file(self, file_id, folder, create_missing=False): + self.moved_files.append((file_id, folder, create_missing)) + return True + + fake_drive = FakeDriveService() + monkeypatch.setattr(import_service, "drive_service", fake_drive) + + service = import_service.ImportService() + monkeypatch.setattr(service, "process_daily_sales_import", lambda job_id, path: False) + + result = service.auto_import_from_drive() + + assert result["success"] is False + assert result["failed_count"] == 1 + assert fake_drive.moved_files == []