353 lines
13 KiB
Python
353 lines
13 KiB
Python
import importlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import types
|
|
|
|
import pandas as pd
|
|
from sqlalchemy import text
|
|
|
|
|
|
def _load_import_service(monkeypatch, database_url):
|
|
os.environ.setdefault("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true")
|
|
import config
|
|
|
|
monkeypatch.setattr(config, "DATABASE_PATH", database_url)
|
|
import services.import_service as import_service
|
|
|
|
return importlib.reload(import_service)
|
|
|
|
|
|
def _prepare_daily_sales_tables(import_service):
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
with import_service.engine.begin() as conn:
|
|
conn.execute(text("DROP TABLE IF EXISTS daily_sales_snapshot"))
|
|
conn.execute(text("DROP TABLE IF EXISTS realtime_sales_monthly"))
|
|
conn.execute(text("""
|
|
CREATE TABLE daily_sales_snapshot (
|
|
"日期" TEXT,
|
|
"商品ID" TEXT,
|
|
"商品名稱" TEXT,
|
|
"銷售金額" INTEGER,
|
|
snapshot_date TEXT
|
|
)
|
|
"""))
|
|
conn.execute(text("""
|
|
CREATE TABLE realtime_sales_monthly (
|
|
"日期" TEXT,
|
|
"商品ID" TEXT,
|
|
"商品名稱" TEXT,
|
|
"銷售金額" INTEGER
|
|
)
|
|
"""))
|
|
|
|
|
|
def test_daily_sales_import_fails_when_monthly_sync_fails(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
_prepare_daily_sales_tables(import_service)
|
|
|
|
class FakeNotificationManager:
|
|
sent_messages = []
|
|
|
|
def _send_telegram_messages(self, messages):
|
|
self.sent_messages.extend(messages)
|
|
|
|
monkeypatch.setitem(
|
|
sys.modules,
|
|
"services.notification_manager",
|
|
types.SimpleNamespace(NotificationManager=FakeNotificationManager),
|
|
)
|
|
|
|
source_df = pd.DataFrame([
|
|
{
|
|
"日期": "2026-06-24",
|
|
"商品ID": "A001",
|
|
"商品名稱": "測試商品",
|
|
"銷售金額": 1200,
|
|
}
|
|
])
|
|
monkeypatch.setattr(
|
|
import_service,
|
|
"_read_daily_sales_excel",
|
|
lambda _path: (
|
|
source_df.copy(),
|
|
{"date_col": "日期", "sheet_name": "即時業績明細", "header_row": 1},
|
|
),
|
|
)
|
|
|
|
original_to_sql = pd.DataFrame.to_sql
|
|
|
|
def fail_monthly_sync(self, name, *args, **kwargs):
|
|
if name == "realtime_sales_monthly":
|
|
raise RuntimeError("monthly sync boom")
|
|
return original_to_sql(self, name, *args, **kwargs)
|
|
|
|
monkeypatch.setattr(pd.DataFrame, "to_sql", fail_monthly_sync)
|
|
|
|
service = import_service.ImportService()
|
|
job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)
|
|
|
|
assert service.process_daily_sales_import(job_id, str(tmp_path / "daily.xlsx")) is False
|
|
|
|
session = import_service.Session()
|
|
try:
|
|
job = session.query(import_service.ImportJob).filter_by(id=job_id).one()
|
|
assert job.status == "failed"
|
|
assert job.progress_percent == 95
|
|
assert job.current_step == "業績分析儀表板同步失敗"
|
|
assert "monthly sync boom" in job.error_message
|
|
summary = json.loads(job.import_summary)
|
|
assert summary["sync_success"] is False
|
|
assert summary["synced_to"] is None
|
|
assert "monthly sync boom" in summary["sync_error"]
|
|
finally:
|
|
session.close()
|
|
|
|
with import_service.engine.connect() as conn:
|
|
snapshot_rows = conn.execute(text("SELECT COUNT(*) FROM daily_sales_snapshot")).scalar()
|
|
monthly_rows = conn.execute(text("SELECT COUNT(*) FROM realtime_sales_monthly")).scalar()
|
|
|
|
assert snapshot_rows == 1
|
|
assert monthly_rows == 0
|
|
assert FakeNotificationManager.sent_messages
|
|
|
|
|
|
def test_auto_import_does_not_move_drive_file_when_import_fails(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
class FakeDriveService:
|
|
moved_files = []
|
|
|
|
def list_files_in_folder(self, folder_path, file_pattern):
|
|
return [{"id": "drive-file-1", "name": "daily.xlsx", "size": 1024}]
|
|
|
|
def download_file(self, file_id, local_path):
|
|
os.makedirs(os.path.dirname(local_path), exist_ok=True)
|
|
with open(local_path, "wb") as handle:
|
|
handle.write(b"test")
|
|
return True
|
|
|
|
def move_file(self, file_id, folder, create_missing=False):
|
|
self.moved_files.append((file_id, folder, create_missing))
|
|
return True
|
|
|
|
fake_drive = FakeDriveService()
|
|
monkeypatch.setattr(import_service, "drive_service", fake_drive)
|
|
|
|
service = import_service.ImportService()
|
|
monkeypatch.setattr(service, "process_daily_sales_import", lambda job_id, path: False)
|
|
|
|
result = service.auto_import_from_drive()
|
|
|
|
assert result["success"] is False
|
|
assert result["failed_count"] == 1
|
|
assert fake_drive.moved_files == []
|
|
|
|
|
|
def test_auto_import_fails_closed_when_drive_auth_fails(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
class FakeDriveService:
|
|
last_error_kind = "authentication_failed"
|
|
last_error = "could not locate runnable browser"
|
|
|
|
def list_files_in_folder(self, folder_path, file_pattern):
|
|
return []
|
|
|
|
monkeypatch.setattr(import_service, "drive_service", FakeDriveService())
|
|
|
|
service = import_service.ImportService()
|
|
result = service.auto_import_from_drive()
|
|
|
|
assert result["success"] is False
|
|
assert result["file_count"] == 0
|
|
assert result["imported_count"] == 0
|
|
assert result["failed_count"] == 1
|
|
assert result["connection_error"] is True
|
|
assert result["error_kind"] == "authentication_failed"
|
|
assert "Google Drive" in result["message"]
|
|
assert "重新完成雲端授權" in result["message"]
|
|
assert "could not locate runnable browser" not in result["message"]
|
|
|
|
|
|
def test_import_job_public_payload_hides_database_error(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
service = import_service.ImportService()
|
|
job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)
|
|
raw_error = (
|
|
"(psycopg2.errors.UndefinedFunction) operator does not exist: "
|
|
"daily_sales_snapshot.snapshot_date >= realtime_sales_monthly.snapshot_date"
|
|
)
|
|
service.update_job_status(job_id, "failed", 100, "業績分析儀表板同步失敗", raw_error)
|
|
|
|
public_job = service.get_job_status(job_id)
|
|
public_error = public_job["error_message"]
|
|
|
|
assert public_job["display_error_message"] == public_error
|
|
assert "重新匯入最新檔案" in public_error
|
|
assert "psycopg2" not in public_error
|
|
assert "daily_sales_snapshot" not in public_error
|
|
assert "realtime_sales_monthly" not in public_error
|
|
assert "snapshot_date" not in public_error
|
|
|
|
session = import_service.Session()
|
|
try:
|
|
stored = session.query(import_service.ImportJob).filter_by(id=job_id).one()
|
|
assert raw_error in stored.error_message
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def test_import_job_public_payload_hides_import_summary_internals(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
service = import_service.ImportService()
|
|
job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)
|
|
|
|
session = import_service.Session()
|
|
try:
|
|
job = session.query(import_service.ImportJob).filter_by(id=job_id).one()
|
|
job.status = "completed"
|
|
job.import_summary = json.dumps({
|
|
"imported_count": 42,
|
|
"table_name": "daily_sales_snapshot",
|
|
"synced_to": "realtime_sales_monthly",
|
|
"sync_success": True,
|
|
"sync_error": None,
|
|
"verified": True,
|
|
"date_min": "2026-06-24",
|
|
"date_max": "2026-06-25",
|
|
"source_sheet": "即時業績明細",
|
|
"source_header_row": 1,
|
|
"message": "成功匯入 42 筆資料,已同步至業績分析儀表板",
|
|
}, ensure_ascii=False)
|
|
job.local_file_path = "data/temp/daily.xlsx"
|
|
session.commit()
|
|
finally:
|
|
session.close()
|
|
|
|
public_job = service.get_job_status(job_id)
|
|
public_text = json.dumps(public_job, ensure_ascii=False)
|
|
|
|
assert public_job["import_summary"]["imported_count"] == 42
|
|
assert public_job["import_summary"]["sync_status"] == "已更新業績分析儀表板"
|
|
assert "daily_sales_snapshot" not in public_text
|
|
assert "realtime_sales_monthly" not in public_text
|
|
assert "table_name" not in public_text
|
|
assert "synced_to" not in public_text
|
|
assert "drive-file-1" not in public_text
|
|
assert "local_file_path" not in public_text
|
|
|
|
|
|
def test_google_drive_auth_refuses_browser_without_json_token(monkeypatch, tmp_path):
|
|
import services.google_drive_service as google_drive_service
|
|
|
|
google_drive_service = importlib.reload(google_drive_service)
|
|
token_file = tmp_path / "google_token.json"
|
|
legacy_file = tmp_path / "google_token.pickle"
|
|
credentials_file = tmp_path / "google_credentials.json"
|
|
legacy_file.write_bytes(b"legacy-token-placeholder")
|
|
credentials_file.write_text("{}", encoding="utf-8")
|
|
|
|
monkeypatch.setattr(google_drive_service, "TOKEN_FILE", str(token_file))
|
|
monkeypatch.setattr(google_drive_service, "_LEGACY_PICKLE_FILE", str(legacy_file))
|
|
monkeypatch.setattr(google_drive_service, "CREDENTIALS_FILE", str(credentials_file))
|
|
monkeypatch.delenv("GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH", raising=False)
|
|
|
|
def fail_if_browser_flow_is_started(*args, **kwargs):
|
|
raise AssertionError("背景匯入不可啟動瀏覽器 OAuth")
|
|
|
|
monkeypatch.setattr(
|
|
google_drive_service.InstalledAppFlow,
|
|
"from_client_secrets_file",
|
|
fail_if_browser_flow_is_started,
|
|
)
|
|
|
|
service = google_drive_service.GoogleDriveService()
|
|
|
|
assert service.authenticate() is False
|
|
assert service.last_error_kind == "reauthorization_required"
|
|
assert "google_token.json" in service.last_error
|
|
assert "google_token.pickle" in service.last_error
|
|
|
|
|
|
def test_google_drive_auth_paths_can_be_pinned_by_container_env(monkeypatch):
|
|
monkeypatch.setenv("GOOGLE_DRIVE_CREDENTIALS_FILE", "/app/config/google_credentials.json")
|
|
monkeypatch.setenv("GOOGLE_DRIVE_TOKEN_FILE", "/app/config/google_token.json")
|
|
monkeypatch.setenv("GOOGLE_DRIVE_LEGACY_PICKLE_FILE", "/app/config/google_token.pickle")
|
|
|
|
import services.google_drive_service as google_drive_service
|
|
|
|
google_drive_service = importlib.reload(google_drive_service)
|
|
|
|
assert google_drive_service.CREDENTIALS_FILE == "/app/config/google_credentials.json"
|
|
assert google_drive_service.TOKEN_FILE == "/app/config/google_token.json"
|
|
assert google_drive_service._LEGACY_PICKLE_FILE == "/app/config/google_token.pickle"
|
|
|
|
|
|
def test_google_drive_auth_fails_when_token_cannot_be_persisted(monkeypatch, tmp_path):
|
|
import services.google_drive_service as google_drive_service
|
|
|
|
google_drive_service = importlib.reload(google_drive_service)
|
|
token_file = tmp_path / "readonly" / "google_token.json"
|
|
token_file.parent.mkdir()
|
|
credentials_file = tmp_path / "google_credentials.json"
|
|
credentials_file.write_text("{}", encoding="utf-8")
|
|
|
|
class FakeCredentials:
|
|
valid = False
|
|
expired = True
|
|
refresh_token = "refresh-token"
|
|
|
|
def refresh(self, request):
|
|
self.valid = True
|
|
|
|
def to_json(self):
|
|
return "{}"
|
|
|
|
monkeypatch.setattr(google_drive_service, "TOKEN_FILE", str(token_file))
|
|
monkeypatch.setattr(google_drive_service, "CREDENTIALS_FILE", str(credentials_file))
|
|
monkeypatch.setattr(
|
|
google_drive_service.Credentials,
|
|
"from_authorized_user_info",
|
|
lambda *_args, **_kwargs: FakeCredentials(),
|
|
)
|
|
monkeypatch.setattr(
|
|
google_drive_service.os,
|
|
"replace",
|
|
lambda *_args, **_kwargs: (_ for _ in ()).throw(PermissionError("permission denied")),
|
|
)
|
|
token_file.write_text("{}", encoding="utf-8")
|
|
|
|
service = google_drive_service.GoogleDriveService()
|
|
|
|
assert service.authenticate() is False
|
|
assert service.last_error_kind == "token_store_failed"
|
|
assert "主機重啟後自動匯入會再次失敗" in service.last_error
|
|
|
|
|
|
def test_auto_import_empty_drive_folder_remains_success(monkeypatch, tmp_path):
|
|
import_service = _load_import_service(monkeypatch, f"sqlite:///{tmp_path / 'momo.db'}")
|
|
import_service.Base.metadata.create_all(import_service.engine)
|
|
|
|
class FakeDriveService:
|
|
last_error_kind = None
|
|
last_error = None
|
|
|
|
def list_files_in_folder(self, folder_path, file_pattern):
|
|
return []
|
|
|
|
monkeypatch.setattr(import_service, "drive_service", FakeDriveService())
|
|
|
|
service = import_service.ImportService()
|
|
result = service.auto_import_from_drive()
|
|
|
|
assert result["success"] is True
|
|
assert result["file_count"] == 0
|
|
assert result["message"] == "沒有找到待匯入的檔案"
|