193 lines
6.6 KiB
Python
193 lines
6.6 KiB
Python
import importlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import types
|
|
|
|
import pandas as pd
|
|
from sqlalchemy import text
|
|
|
|
|
|
def _load_import_service(monkeypatch, database_url):
|
|
os.environ.setdefault("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true")
|
|
import config
|
|
|
|
monkeypatch.setattr(config, "DATABASE_PATH", database_url)
|
|
import services.import_service as import_service
|
|
|
|
return importlib.reload(import_service)
|
|
|
|
|
|
def _prepare_daily_sales_tables(import_service):
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
with import_service.engine.begin() as conn:
|
|
conn.execute(text("DROP TABLE IF EXISTS daily_sales_snapshot"))
|
|
conn.execute(text("DROP TABLE IF EXISTS realtime_sales_monthly"))
|
|
conn.execute(text("""
|
|
CREATE TABLE daily_sales_snapshot (
|
|
"日期" TEXT,
|
|
"商品ID" TEXT,
|
|
"商品名稱" TEXT,
|
|
"銷售金額" INTEGER,
|
|
snapshot_date TEXT
|
|
)
|
|
"""))
|
|
conn.execute(text("""
|
|
CREATE TABLE realtime_sales_monthly (
|
|
"日期" TEXT,
|
|
"商品ID" TEXT,
|
|
"商品名稱" TEXT,
|
|
"銷售金額" INTEGER
|
|
)
|
|
"""))
|
|
|
|
|
|
def test_daily_sales_import_fails_when_monthly_sync_fails(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
_prepare_daily_sales_tables(import_service)
|
|
|
|
class FakeNotificationManager:
|
|
sent_messages = []
|
|
|
|
def _send_telegram_messages(self, messages):
|
|
self.sent_messages.extend(messages)
|
|
|
|
monkeypatch.setitem(
|
|
sys.modules,
|
|
"services.notification_manager",
|
|
types.SimpleNamespace(NotificationManager=FakeNotificationManager),
|
|
)
|
|
|
|
source_df = pd.DataFrame([
|
|
{
|
|
"日期": "2026-06-24",
|
|
"商品ID": "A001",
|
|
"商品名稱": "測試商品",
|
|
"銷售金額": 1200,
|
|
}
|
|
])
|
|
monkeypatch.setattr(
|
|
import_service,
|
|
"_read_daily_sales_excel",
|
|
lambda _path: (
|
|
source_df.copy(),
|
|
{"date_col": "日期", "sheet_name": "即時業績明細", "header_row": 1},
|
|
),
|
|
)
|
|
|
|
original_to_sql = pd.DataFrame.to_sql
|
|
|
|
def fail_monthly_sync(self, name, *args, **kwargs):
|
|
if name == "realtime_sales_monthly":
|
|
raise RuntimeError("monthly sync boom")
|
|
return original_to_sql(self, name, *args, **kwargs)
|
|
|
|
monkeypatch.setattr(pd.DataFrame, "to_sql", fail_monthly_sync)
|
|
|
|
service = import_service.ImportService()
|
|
job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)
|
|
|
|
assert service.process_daily_sales_import(job_id, str(tmp_path / "daily.xlsx")) is False
|
|
|
|
session = import_service.Session()
|
|
try:
|
|
job = session.query(import_service.ImportJob).filter_by(id=job_id).one()
|
|
assert job.status == "failed"
|
|
assert job.progress_percent == 95
|
|
assert job.current_step == "業績分析儀表板同步失敗"
|
|
assert "monthly sync boom" in job.error_message
|
|
summary = json.loads(job.import_summary)
|
|
assert summary["sync_success"] is False
|
|
assert summary["synced_to"] is None
|
|
assert "monthly sync boom" in summary["sync_error"]
|
|
finally:
|
|
session.close()
|
|
|
|
with import_service.engine.connect() as conn:
|
|
snapshot_rows = conn.execute(text("SELECT COUNT(*) FROM daily_sales_snapshot")).scalar()
|
|
monthly_rows = conn.execute(text("SELECT COUNT(*) FROM realtime_sales_monthly")).scalar()
|
|
|
|
assert snapshot_rows == 1
|
|
assert monthly_rows == 0
|
|
assert FakeNotificationManager.sent_messages
|
|
|
|
|
|
def test_auto_import_does_not_move_drive_file_when_import_fails(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
class FakeDriveService:
|
|
moved_files = []
|
|
|
|
def list_files_in_folder(self, folder_path, file_pattern):
|
|
return [{"id": "drive-file-1", "name": "daily.xlsx", "size": 1024}]
|
|
|
|
def download_file(self, file_id, local_path):
|
|
os.makedirs(os.path.dirname(local_path), exist_ok=True)
|
|
with open(local_path, "wb") as handle:
|
|
handle.write(b"test")
|
|
return True
|
|
|
|
def move_file(self, file_id, folder, create_missing=False):
|
|
self.moved_files.append((file_id, folder, create_missing))
|
|
return True
|
|
|
|
fake_drive = FakeDriveService()
|
|
monkeypatch.setattr(import_service, "drive_service", fake_drive)
|
|
|
|
service = import_service.ImportService()
|
|
monkeypatch.setattr(service, "process_daily_sales_import", lambda job_id, path: False)
|
|
|
|
result = service.auto_import_from_drive()
|
|
|
|
assert result["success"] is False
|
|
assert result["failed_count"] == 1
|
|
assert fake_drive.moved_files == []
|
|
|
|
|
|
def test_auto_import_fails_closed_when_drive_auth_fails(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
class FakeDriveService:
|
|
last_error_kind = "authentication_failed"
|
|
last_error = "could not locate runnable browser"
|
|
|
|
def list_files_in_folder(self, folder_path, file_pattern):
|
|
return []
|
|
|
|
monkeypatch.setattr(import_service, "drive_service", FakeDriveService())
|
|
|
|
service = import_service.ImportService()
|
|
result = service.auto_import_from_drive()
|
|
|
|
assert result["success"] is False
|
|
assert result["file_count"] == 0
|
|
assert result["imported_count"] == 0
|
|
assert result["failed_count"] == 1
|
|
assert result["connection_error"] is True
|
|
assert result["error_kind"] == "authentication_failed"
|
|
assert "Google Drive" in result["message"]
|
|
assert "could not locate runnable browser" in result["message"]
|
|
|
|
|
|
def test_auto_import_empty_drive_folder_remains_success(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
class FakeDriveService:
|
|
last_error_kind = None
|
|
last_error = None
|
|
|
|
def list_files_in_folder(self, folder_path, file_pattern):
|
|
return []
|
|
|
|
monkeypatch.setattr(import_service, "drive_service", FakeDriveService())
|
|
|
|
service = import_service.ImportService()
|
|
result = service.auto_import_from_drive()
|
|
|
|
assert result["success"] is True
|
|
assert result["file_count"] == 0
|
|
assert result["message"] == "沒有找到待匯入的檔案"
|