Files
ewoooc/tests/test_auto_import_failure_boundaries.py
ogt 39ff5d5605
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
fix: sanitize import job summaries
2026-06-25 14:50:28 +08:00

339 lines
12 KiB
Python

import importlib
import json
import os
import sys
import types
import pandas as pd
from sqlalchemy import text
def _load_import_service(monkeypatch, database_url):
    """Return a freshly reloaded ``services.import_service`` module.

    Before the reload, ``config.DATABASE_PATH`` is patched to *database_url*
    and the insecure-config test flag is given a default value in the process
    environment (an already-set value is left untouched).

    NOTE(review): the reload presumably makes the service module re-read the
    patched ``config.DATABASE_PATH`` -- confirm against the service module.
    """
    # Must be in place before ``config`` is imported for the first time.
    os.environ.setdefault("MOMO_ALLOW_INSECURE_CONFIG_FOR_TESTS", "true")

    import config

    monkeypatch.setattr(config, "DATABASE_PATH", database_url)

    import services.import_service as service_module

    reloaded_module = importlib.reload(service_module)
    return reloaded_module
def _prepare_daily_sales_tables(import_service):
    """Create the ORM tables and rebuild the two raw sales tables from scratch.

    ``daily_sales_snapshot`` and ``realtime_sales_monthly`` are dropped if
    present and re-created empty, all inside one ``engine.begin()`` block.
    """
    db_engine = import_service.engine
    import_service.Base.metadata.create_all(db_engine)

    # The DDL is assembled line by line; the leading and trailing empty
    # entries reproduce the newlines that wrap each statement.
    snapshot_ddl = "\n".join([
        "",
        "CREATE TABLE daily_sales_snapshot (",
        '"日期" TEXT,',
        '"商品ID" TEXT,',
        '"商品名稱" TEXT,',
        '"銷售金額" INTEGER,',
        "snapshot_date TEXT",
        ")",
        "",
    ])
    monthly_ddl = "\n".join([
        "",
        "CREATE TABLE realtime_sales_monthly (",
        '"日期" TEXT,',
        '"商品ID" TEXT,',
        '"商品名稱" TEXT,',
        '"銷售金額" INTEGER',
        ")",
        "",
    ])
    statements = (
        "DROP TABLE IF EXISTS daily_sales_snapshot",
        "DROP TABLE IF EXISTS realtime_sales_monthly",
        snapshot_ddl,
        monthly_ddl,
    )

    with db_engine.begin() as conn:
        for statement in statements:
            conn.execute(text(statement))
def test_daily_sales_import_fails_when_monthly_sync_fails(monkeypatch, tmp_path):
    """A failing write to ``realtime_sales_monthly`` must fail the import job.

    ``DataFrame.to_sql`` is patched to raise only for the monthly table. The
    test then checks the job record, its JSON summary, the row counts of both
    tables, and that at least one notification message was recorded.
    """
    database_url = f"sqlite:///{tmp_path / 'momo.db'}"
    import_service = _load_import_service(monkeypatch, database_url)
    _prepare_daily_sales_tables(import_service)

    class FakeNotificationManager:
        # Kept on the class so the final assertion can read it without
        # holding a reference to any particular instance.
        sent_messages = []

        def _send_telegram_messages(self, messages):
            self.sent_messages.extend(messages)

    fake_notification_module = types.SimpleNamespace(
        NotificationManager=FakeNotificationManager,
    )
    monkeypatch.setitem(
        sys.modules,
        "services.notification_manager",
        fake_notification_module,
    )

    sales_frame = pd.DataFrame([
        {
            "日期": "2026-06-24",
            "商品ID": "A001",
            "商品名稱": "測試商品",
            "銷售金額": 1200,
        }
    ])

    def fake_read_daily_sales_excel(_path):
        # Hand back a fresh copy and a fresh metadata dict on every call.
        sheet_info = {"date_col": "日期", "sheet_name": "即時業績明細", "header_row": 1}
        return sales_frame.copy(), sheet_info

    monkeypatch.setattr(
        import_service,
        "_read_daily_sales_excel",
        fake_read_daily_sales_excel,
    )

    real_to_sql = pd.DataFrame.to_sql

    def fail_monthly_sync(self, name, *args, **kwargs):
        # Every other table is written through the real implementation.
        if name != "realtime_sales_monthly":
            return real_to_sql(self, name, *args, **kwargs)
        raise RuntimeError("monthly sync boom")

    monkeypatch.setattr(pd.DataFrame, "to_sql", fail_monthly_sync)

    service = import_service.ImportService()
    job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)
    excel_path = str(tmp_path / "daily.xlsx")

    assert service.process_daily_sales_import(job_id, excel_path) is False

    session = import_service.Session()
    try:
        job = session.query(import_service.ImportJob).filter_by(id=job_id).one()
        assert job.status == "failed"
        assert job.progress_percent == 95
        assert job.current_step == "業績分析儀表板同步失敗"
        assert "monthly sync boom" in job.error_message

        summary = json.loads(job.import_summary)
        assert summary["sync_success"] is False
        assert summary["synced_to"] is None
        assert "monthly sync boom" in summary["sync_error"]
    finally:
        session.close()

    with import_service.engine.connect() as conn:
        snapshot_rows = conn.execute(
            text("SELECT COUNT(*) FROM daily_sales_snapshot")
        ).scalar()
        monthly_rows = conn.execute(
            text("SELECT COUNT(*) FROM realtime_sales_monthly")
        ).scalar()

    assert snapshot_rows == 1
    assert monthly_rows == 0
    assert FakeNotificationManager.sent_messages
def test_auto_import_does_not_move_drive_file_when_import_fails(monkeypatch, tmp_path):
    """A failed import must leave the Drive file where it is.

    The fake Drive lists and "downloads" one file, the import step is stubbed
    to return ``False``, and the test checks that ``move_file`` was never
    called.
    """
    database_url = f"sqlite:///{tmp_path / 'momo.db'}"
    import_service = _load_import_service(monkeypatch, database_url)
    import_service.Base.metadata.create_all(import_service.engine)

    class FakeDriveService:
        # Records every move_file call as (file_id, folder, create_missing).
        moved_files = []

        def list_files_in_folder(self, folder_path, file_pattern):
            return [{"id": "drive-file-1", "name": "daily.xlsx", "size": 1024}]

        def download_file(self, file_id, local_path):
            target_dir = os.path.dirname(local_path)
            os.makedirs(target_dir, exist_ok=True)
            with open(local_path, "wb") as handle:
                handle.write(b"test")
            return True

        def move_file(self, file_id, folder, create_missing=False):
            self.moved_files.append((file_id, folder, create_missing))
            return True

    fake_drive = FakeDriveService()
    monkeypatch.setattr(import_service, "drive_service", fake_drive)

    def always_failing_import(job_id, path):
        return False

    service = import_service.ImportService()
    monkeypatch.setattr(service, "process_daily_sales_import", always_failing_import)

    outcome = service.auto_import_from_drive()

    assert outcome["success"] is False
    assert outcome["failed_count"] == 1
    assert fake_drive.moved_files == []
def test_auto_import_fails_closed_when_drive_auth_fails(monkeypatch, tmp_path):
    """An empty listing with an auth error recorded must be reported as failure.

    The fake Drive returns no files but exposes ``last_error_kind`` and
    ``last_error``. The result must flag a connection error, and its message
    must not contain the raw ``last_error`` text.
    """
    database_url = f"sqlite:///{tmp_path / 'momo.db'}"
    import_service = _load_import_service(monkeypatch, database_url)
    import_service.Base.metadata.create_all(import_service.engine)

    class FakeDriveService:
        last_error_kind = "authentication_failed"
        last_error = "could not locate runnable browser"

        def list_files_in_folder(self, folder_path, file_pattern):
            return []

    monkeypatch.setattr(import_service, "drive_service", FakeDriveService())

    outcome = import_service.ImportService().auto_import_from_drive()

    assert outcome["success"] is False
    assert outcome["file_count"] == 0
    assert outcome["imported_count"] == 0
    assert outcome["failed_count"] == 1
    assert outcome["connection_error"] is True
    assert outcome["error_kind"] == "authentication_failed"

    for expected_fragment in ("Google Drive", "重新完成雲端授權"):
        assert expected_fragment in outcome["message"]
    # The raw low-level error text must not appear in the message.
    assert "could not locate runnable browser" not in outcome["message"]
def test_import_job_public_payload_hides_database_error(monkeypatch, tmp_path):
    """``get_job_status`` must not expose the raw database error text.

    The raw error has to remain in the stored ``ImportJob`` row, while the
    payload returned by ``get_job_status`` carries a message free of driver,
    table and column names.
    """
    database_url = f"sqlite:///{tmp_path / 'momo.db'}"
    import_service = _load_import_service(monkeypatch, database_url)
    import_service.Base.metadata.create_all(import_service.engine)

    service = import_service.ImportService()
    job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)

    raw_error = (
        "(psycopg2.errors.UndefinedFunction) operator does not exist: "
        "daily_sales_snapshot.snapshot_date >= realtime_sales_monthly.snapshot_date"
    )
    service.update_job_status(job_id, "failed", 100, "業績分析儀表板同步失敗", raw_error)

    public_job = service.get_job_status(job_id)
    public_error = public_job["error_message"]

    assert public_job["display_error_message"] == public_error
    assert "重新匯入最新檔案" in public_error

    internal_fragments = (
        "psycopg2",
        "daily_sales_snapshot",
        "realtime_sales_monthly",
        "snapshot_date",
    )
    for fragment in internal_fragments:
        assert fragment not in public_error

    session = import_service.Session()
    try:
        stored_job = session.query(import_service.ImportJob).filter_by(id=job_id).one()
        assert raw_error in stored_job.error_message
    finally:
        session.close()
def test_import_job_public_payload_hides_import_summary_internals(monkeypatch, tmp_path):
    """``get_job_status`` must strip internal identifiers from the summary.

    A completed job is given a summary containing table names, plus a local
    file path. The serialized public payload must keep the imported count
    and a sync status, and must omit those internals and the Drive file id.
    """
    database_url = f"sqlite:///{tmp_path / 'momo.db'}"
    import_service = _load_import_service(monkeypatch, database_url)
    import_service.Base.metadata.create_all(import_service.engine)

    service = import_service.ImportService()
    job_id = service.create_import_job("daily_sales", "drive-file-1", "daily.xlsx", 1024)

    raw_summary = {
        "imported_count": 42,
        "table_name": "daily_sales_snapshot",
        "synced_to": "realtime_sales_monthly",
        "sync_success": True,
        "sync_error": None,
        "verified": True,
        "date_min": "2026-06-24",
        "date_max": "2026-06-25",
        "source_sheet": "即時業績明細",
        "source_header_row": 1,
        "message": "成功匯入 42 筆資料,已同步至業績分析儀表板",
    }

    session = import_service.Session()
    try:
        job = session.query(import_service.ImportJob).filter_by(id=job_id).one()
        job.status = "completed"
        job.import_summary = json.dumps(raw_summary, ensure_ascii=False)
        job.local_file_path = "data/temp/daily.xlsx"
        session.commit()
    finally:
        session.close()

    public_job = service.get_job_status(job_id)
    public_text = json.dumps(public_job, ensure_ascii=False)

    public_summary = public_job["import_summary"]
    assert public_summary["imported_count"] == 42
    assert public_job["import_summary"]["sync_status"] == "已更新業績分析儀表板"

    hidden_fragments = (
        "daily_sales_snapshot",
        "realtime_sales_monthly",
        "table_name",
        "synced_to",
        "drive-file-1",
        "local_file_path",
    )
    for fragment in hidden_fragments:
        assert fragment not in public_text
def test_google_drive_auth_refuses_browser_without_json_token(monkeypatch, tmp_path):
    """Without a JSON token, ``authenticate`` must fail without a browser flow.

    Only a legacy pickle token and a credentials file exist on disk, and the
    interactive-auth environment variable is removed. Starting the installed
    app flow raises ``AssertionError``, so reaching it fails the test.
    """
    import services.google_drive_service as drive_module

    drive_module = importlib.reload(drive_module)

    token_file = tmp_path / "google_token.json"
    legacy_file = tmp_path / "google_token.pickle"
    credentials_file = tmp_path / "google_credentials.json"
    # The JSON token file is deliberately not created.
    legacy_file.write_bytes(b"legacy-token-placeholder")
    credentials_file.write_text("{}", encoding="utf-8")

    patched_paths = (
        ("TOKEN_FILE", token_file),
        ("_LEGACY_PICKLE_FILE", legacy_file),
        ("CREDENTIALS_FILE", credentials_file),
    )
    for attribute_name, path in patched_paths:
        monkeypatch.setattr(drive_module, attribute_name, str(path))
    monkeypatch.delenv("GOOGLE_DRIVE_ALLOW_INTERACTIVE_AUTH", raising=False)

    def fail_if_browser_flow_is_started(*args, **kwargs):
        raise AssertionError("背景匯入不可啟動瀏覽器 OAuth")

    monkeypatch.setattr(
        drive_module.InstalledAppFlow,
        "from_client_secrets_file",
        fail_if_browser_flow_is_started,
    )

    drive = drive_module.GoogleDriveService()

    assert drive.authenticate() is False
    assert drive.last_error_kind == "reauthorization_required"
    for file_name in ("google_token.json", "google_token.pickle"):
        assert file_name in drive.last_error
def test_google_drive_auth_fails_when_token_cannot_be_persisted(monkeypatch, tmp_path):
    """``authenticate`` must fail when the token cannot be written to disk.

    Fake credentials start expired and become valid on ``refresh``, while
    ``os.replace`` is patched to raise ``PermissionError``. The service must
    then report ``token_store_failed``.
    """
    import services.google_drive_service as drive_module

    drive_module = importlib.reload(drive_module)

    token_file = tmp_path / "readonly" / "google_token.json"
    token_file.parent.mkdir()
    credentials_file = tmp_path / "google_credentials.json"
    credentials_file.write_text("{}", encoding="utf-8")

    class FakeCredentials:
        valid = False
        expired = True
        refresh_token = "refresh-token"

        def refresh(self, request):
            self.valid = True

        def to_json(self):
            return "{}"

    def build_fake_credentials(*_args, **_kwargs):
        return FakeCredentials()

    def deny_replace(*_args, **_kwargs):
        raise PermissionError("permission denied")

    monkeypatch.setattr(drive_module, "TOKEN_FILE", str(token_file))
    monkeypatch.setattr(drive_module, "CREDENTIALS_FILE", str(credentials_file))
    monkeypatch.setattr(
        drive_module.Credentials,
        "from_authorized_user_info",
        build_fake_credentials,
    )
    # ``drive_module.os`` is the object the service module holds under the
    # name ``os``; the patch is undone by ``monkeypatch`` at teardown.
    monkeypatch.setattr(drive_module.os, "replace", deny_replace)

    token_file.write_text("{}", encoding="utf-8")

    drive = drive_module.GoogleDriveService()

    assert drive.authenticate() is False
    assert drive.last_error_kind == "token_store_failed"
    assert "主機重啟後自動匯入會再次失敗" in drive.last_error
def test_auto_import_empty_drive_folder_remains_success(monkeypatch, tmp_path):
    """An empty Drive listing with no recorded error must count as success."""
    database_url = f"sqlite:///{tmp_path / 'momo.db'}"
    import_service = _load_import_service(monkeypatch, database_url)
    import_service.Base.metadata.create_all(import_service.engine)

    class FakeDriveService:
        # No error recorded: the empty listing is a genuine "nothing to do".
        last_error_kind = None
        last_error = None

        def list_files_in_folder(self, folder_path, file_pattern):
            return []

    monkeypatch.setattr(import_service, "drive_service", FakeDriveService())

    outcome = import_service.ImportService().auto_import_from_drive()

    assert outcome["success"] is True
    assert outcome["file_count"] == 0
    assert outcome["message"] == "沒有找到待匯入的檔案"