diff --git a/Dockerfile b/Dockerfile index c4aee75..de15a71 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y \ g++ \ curl \ libpq-dev \ + postgresql-client \ # Chrome/Selenium 依賴 wget \ gnupg \ diff --git a/config.py b/config.py index 9bcde7f..4ac176e 100644 --- a/config.py +++ b/config.py @@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.604" +SYSTEM_VERSION = "V10.605" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/services/db_backup_service.py b/services/db_backup_service.py index 2b63c1a..a3a03da 100644 --- a/services/db_backup_service.py +++ b/services/db_backup_service.py @@ -3,9 +3,10 @@ DB Backup Service — EwoooC V10.3 負責執行 pg_dump 備份、保留策略、以及備份狀態寫入 backup_log """ import os -import subprocess import logging import glob +import shutil +import subprocess from datetime import datetime, timedelta, timezone TAIPEI_TZ = timezone(timedelta(hours=8)) @@ -25,6 +26,44 @@ def _ensure_backup_dir(): os.makedirs(BACKUP_DIR, exist_ok=True) +def _remove_partial_backup(filepath: str): + try: + if os.path.exists(filepath): + os.remove(filepath) + logger.warning(f"[Backup] 已移除不完整備份檔: {filepath}") + except Exception as exc: + logger.warning(f"[Backup] 移除不完整備份檔失敗 {filepath}: {exc}") + + +def _ensure_pg_dump_available() -> str: + pg_dump_path = shutil.which("pg_dump") + if pg_dump_path: + return pg_dump_path + + apt_get_path = shutil.which("apt-get") + if not apt_get_path: + raise RuntimeError("pg_dump 不存在,且容器沒有 apt-get;請重建 image 並安裝 postgresql-client") + + logger.info("[Backup] pg_dump 不存在,嘗試安裝 postgresql-client...") + commands = [ + [apt_get_path, "update", "-qq"], + [apt_get_path, "install", "-y", "-qq", "postgresql-client"], + ] + for command in commands: + proc = subprocess.run(command, capture_output=True, text=True, timeout=180) + if proc.returncode != 0: + stderr = (proc.stderr or proc.stdout or "").strip() + raise RuntimeError( + "pg_dump 不存在,自動安裝 postgresql-client 失敗:" + f"{' '.join(command)} → {stderr[:500]}" + ) + + pg_dump_path = shutil.which("pg_dump") + if not pg_dump_path: + raise RuntimeError("postgresql-client 安裝後仍找不到 pg_dump") + return pg_dump_path + + def _log_backup(filename, file_size, duration, status, error=None, storage_path=None): """寫入 backup_log 表,失敗不阻斷主流程""" try: @@ -68,14 +107,6 @@ def run_backup() -> dict: db_host = os.environ.get("POSTGRES_HOST", "momo-db") db_port = os.environ.get("POSTGRES_PORT", "5432") - # 若 pg_dump 不存在則嘗試安裝(容器重建後需重裝;Dockerfile 已加入 postgresql-client) - if not os.path.exists("/usr/bin/pg_dump"): - logger.info("[Backup] pg_dump 不存在,嘗試安裝 postgresql-client...") - subprocess.run( - ["apt-get", "install", "-y", "-qq", "postgresql-client"], - capture_output=True - ) - pg_password = os.environ.get("POSTGRES_PASSWORD") pg_env = {**os.environ, "PGPASSWORD": pg_password} if pg_password else dict(os.environ) @@ -83,9 +114,10 @@ def run_backup() -> dict: result = {"success": False, "filename": filename, "file_size": 0, "duration": 0, "error": None} try: + pg_dump_path = _ensure_pg_dump_available() with open(filepath, "wb") as out_f: pg_dump_proc = subprocess.Popen( - ["pg_dump", "-h", db_host, "-p", db_port, "-U", DB_USER, "-d", DB_NAME, + [pg_dump_path, "-h", db_host, "-p", db_port, "-U", DB_USER, "-d", DB_NAME, "--no-password", "-Fp"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=pg_env @@ -112,6 +144,7 @@ def run_backup() -> dict: if proc.returncode != 0: error_msg = proc.stderr.strip() or "pg_dump 非零退出碼" logger.error(f"[Backup] 備份失敗: {error_msg}") + _remove_partial_backup(filepath) result["error"] = error_msg result["duration"] = duration _log_backup(filename, 0, duration, "failed", error=error_msg) @@ -125,6 +158,7 @@ def run_backup() -> dict: duration = (datetime.now() - start).total_seconds() error_msg = "pg_dump 超時(300s)" logger.error(f"[Backup] {error_msg}") + _remove_partial_backup(filepath) result["error"] = error_msg result["duration"] = duration _log_backup(filename, 0, duration, "failed", error=error_msg) @@ -132,6 +166,7 @@ def run_backup() -> dict: duration = (datetime.now() - start).total_seconds() error_msg = str(e) logger.error(f"[Backup] 備份異常: {e}") + _remove_partial_backup(filepath) result["error"] = error_msg result["duration"] = duration _log_backup(filename, 0, duration, "failed", error=error_msg) diff --git a/services/google_drive_service.py b/services/google_drive_service.py index 9c6a928..4bcc268 100644 --- a/services/google_drive_service.py +++ b/services/google_drive_service.py @@ -39,6 +39,10 @@ class GoogleDriveService: self.service = None self.credentials = None + @staticmethod + def _escape_query_value(value: str) -> str: + return value.replace("\\", "\\\\").replace("'", "\\'") + def authenticate(self) -> bool: """ 進行 Google Drive 認證 @@ -170,7 +174,8 @@ class GoogleDriveService: continue # 搜尋此層級的資料夾 - query = f"name='{folder_name}' and '{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false" + safe_folder_name = self._escape_query_value(folder_name) + query = f"name='{safe_folder_name}' and '{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false" results = self.service.files().list( q=query, @@ -194,6 +199,53 @@ class GoogleDriveService: logger.error(f"搜尋資料夾時發生錯誤: {error}") return None + def _ensure_folder_id_by_path(self, folder_path: str) -> Optional[str]: + """ + 根據路徑取得資料夾 ID;若中途資料夾不存在則建立。 + """ + try: + if not folder_path or folder_path.strip('/') == '': + return 'root' + + path_parts = folder_path.strip('/').split('/') + parent_id = 'root' + + for folder_name in path_parts: + if not folder_name: + continue + + safe_folder_name = self._escape_query_value(folder_name) + query = f"name='{safe_folder_name}' and '{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false" + results = self.service.files().list( + q=query, + spaces='drive', + fields='files(id, name)', + pageSize=1 + ).execute() + + folders = results.get('files', []) + if folders: + parent_id = folders[0]['id'] + continue + + metadata = { + 'name': folder_name, + 'mimeType': 'application/vnd.google-apps.folder', + 'parents': [parent_id], + } + folder = self.service.files().create( + body=metadata, + fields='id, name' + ).execute() + parent_id = folder['id'] + logger.info(f"已建立 Google Drive 資料夾: {folder_name}") + + return parent_id + + except HttpError as error: + logger.error(f"建立資料夾時發生錯誤: {error}") + return None + def download_file(self, file_id: str, destination_path: str) -> bool: """ 下載檔案 @@ -242,13 +294,14 @@ class GoogleDriveService: logger.error(f"下載檔案時發生異常: {str(e)}") return False - def move_file(self, file_id: str, destination_folder_path: str) -> bool: + def move_file(self, file_id: str, destination_folder_path: str, create_missing: bool = False) -> bool: """ 移動檔案到指定資料夾 Args: file_id: Google Drive 檔案 ID destination_folder_path: 目標資料夾路徑(如: "已匯入") + create_missing: 目標資料夾不存在時是否自動建立 Returns: bool: 移動是否成功 @@ -267,7 +320,10 @@ class GoogleDriveService: previous_parents = ','.join(file.get('parents', [])) # 取得目標資料夾 ID - destination_folder_id = self._get_folder_id_by_path(destination_folder_path) + if create_missing: + destination_folder_id = self._ensure_folder_id_by_path(destination_folder_path) + else: + destination_folder_id = self._get_folder_id_by_path(destination_folder_path) if not destination_folder_id: logger.error(f"找不到目標資料夾: {destination_folder_path}") return False diff --git a/services/import_service.py b/services/import_service.py index 70ed926..f4f7d64 100644 --- a/services/import_service.py +++ b/services/import_service.py @@ -5,19 +5,28 @@ 負責從 Google Drive 自動下載、匯入、刪除檔案 """ -import os import logging import json +import os from datetime import date, datetime -from typing import Optional, Dict, Any -from sqlalchemy import create_engine, text -from sqlalchemy.orm import sessionmaker +from typing import Any, Dict, Optional + import pandas as pd import pytz +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker # 台北時區 TAIPEI_TZ = pytz.timezone('Asia/Taipei') +DAILY_SALES_REQUIRED_COLUMN_GROUPS = { + "商品名稱類": ["商品名稱", "品名", "Product", "Name"], + "業績金額類": ["銷售金額", "總業績", "業績", "金額", "Amount", "Sales", "Total"], +} +DAILY_SALES_DATE_ALIASES = ["日期", "訂單日期", "交易日期", "Date"] +DAILY_SALES_DETAIL_SHEET_HINTS = ("即時業績明細", "業績明細", "明細", "detail", "daily") +DAILY_SALES_HEADER_SCAN_ROWS = 15 + from services.google_drive_service import drive_service from database.import_models import ImportJob, ImportConfig, Base from database.manager import ensure_metadata_initialized @@ -80,6 +89,211 @@ def _normalise_date_values_for_sql(values): normalised.append(parsed.strftime("%Y-%m-%d") if pd.notna(parsed) else str(value)) return normalised + +def _normalise_column_text(value) -> str: + if pd.isna(value): + return "" + return str(value).strip().replace("\n", "").replace("\r", "").replace(" ", "").lower() + + +def _stringify_column(value) -> str: + if pd.isna(value): + return "" + return str(value).strip() + + +def _dedupe_columns(columns) -> list[str]: + seen = {} + result = [] + for idx, column in enumerate(columns): + base = _stringify_column(column) or f"unnamed_{idx + 1}" + count = seen.get(base, 0) + 1 + seen[base] = count + result.append(base if count == 1 else f"{base}_{count}") + return result + + +def _columns_have_any(columns, keywords) -> bool: + normalised_columns = [_normalise_column_text(column) for column in columns] + normalised_keywords = [_normalise_column_text(keyword) for keyword in keywords] + return any( + keyword and column and keyword in column + for column in normalised_columns + for keyword in normalised_keywords + ) + + +def _missing_daily_sales_column_groups(columns) -> list[str]: + return [ + label + for label, keywords in DAILY_SALES_REQUIRED_COLUMN_GROUPS.items() + if not _columns_have_any(columns, keywords) + ] + + +def _find_daily_sales_date_column(columns) -> Optional[str]: + normalised_aliases = [_normalise_column_text(alias) for alias in DAILY_SALES_DATE_ALIASES] + for column in columns: + normalised_column = _normalise_column_text(column) + if normalised_column in normalised_aliases: + return column + for column in columns: + normalised_column = _normalise_column_text(column) + if normalised_column and any(alias and alias in normalised_column for alias in normalised_aliases): + return column + return None + + +def _score_daily_sales_candidate(sheet_name: str, header_row: int, columns) -> int: + missing = _missing_daily_sales_column_groups(columns) + if missing: + return 0 + + normalised_sheet_name = _normalise_column_text(sheet_name) + score = 1000 + if _find_daily_sales_date_column(columns): + score += 250 + if any(_normalise_column_text(hint) in normalised_sheet_name for hint in DAILY_SALES_DETAIL_SHEET_HINTS): + score += 150 + score += min(len([column for column in columns if _stringify_column(column)]), 80) + score += max(0, DAILY_SALES_HEADER_SCAN_ROWS - header_row) + return score + + +def _clean_daily_sales_dataframe(df: pd.DataFrame) -> pd.DataFrame: + cleaned = df.dropna(how="all").dropna(axis=1, how="all").copy() + cleaned.columns = _dedupe_columns(cleaned.columns) + return cleaned + + +def _format_daily_sales_diagnostics(candidates: list[dict]) -> str: + if not candidates: + return "未找到可辨識的工作表表頭" + + parts = [] + for candidate in sorted(candidates, key=lambda item: item["score"], reverse=True)[:8]: + columns_sample = [ + column for column in candidate["columns_sample"] + if column and not str(column).lower().startswith("nan") + ][:8] + missing = candidate.get("missing") or [] + parts.append( + f"{candidate['sheet_name']}!第{candidate['header_row'] + 1}列 " + f"缺少={missing or '無'} 欄位={columns_sample}" + ) + return ";".join(parts) + + +def _read_daily_sales_excel(file_path: str) -> tuple[pd.DataFrame, dict]: + """ + 讀取當日業績 Excel,支援多工作表與非第一列表頭。 + + Google Drive 匯出的檔案常把樞紐彙總表放在第一張 sheet, + 真正的匯入明細在「即時業績明細」。此處先掃描所有 sheet 的前幾列, + 再選出同時含商品名稱、業績金額且最好有日期欄位的明細 sheet。 + """ + candidates: list[dict] = [] + + with pd.ExcelFile(file_path, engine="openpyxl") as excel: + for sheet_name in excel.sheet_names: + try: + preview = pd.read_excel( + excel, + sheet_name=sheet_name, + header=None, + nrows=DAILY_SALES_HEADER_SCAN_ROWS, + dtype=str, + ) + except Exception as exc: + candidates.append({ + "sheet_name": sheet_name, + "header_row": 0, + "columns_sample": [f"讀取預覽失敗: {exc}"], + "missing": list(DAILY_SALES_REQUIRED_COLUMN_GROUPS.keys()), + "score": 0, + }) + continue + + for header_row, row in preview.iterrows(): + columns = [_stringify_column(value) for value in row.tolist()] + if not any(columns): + continue + missing = _missing_daily_sales_column_groups(columns) + score = _score_daily_sales_candidate(sheet_name, int(header_row), columns) + candidates.append({ + "sheet_name": sheet_name, + "header_row": int(header_row), + "columns_sample": columns[:30], + "missing": missing, + "score": score, + }) + + valid_candidates = [candidate for candidate in candidates if not candidate["missing"]] + if not valid_candidates: + diagnostics = _format_daily_sales_diagnostics(candidates) + raise ValueError( + "Excel 欄位防禦失敗:找不到可匯入的當日業績明細工作表。" + f"已檢查:{diagnostics}" + ) + + best = max(valid_candidates, key=lambda candidate: candidate["score"]) + df = pd.read_excel( + excel, + sheet_name=best["sheet_name"], + header=best["header_row"], + dtype=str, + ) + + df = _clean_daily_sales_dataframe(df) + if df.empty: + raise ValueError("Excel 檔案為空") + + missing = _missing_daily_sales_column_groups(df.columns) + if missing: + raise ValueError( + f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}。" + f"現有欄位:{list(df.columns)[:30]}" + ) + + metadata = { + "sheet_name": best["sheet_name"], + "header_row": best["header_row"] + 1, + "date_col": _find_daily_sales_date_column(df.columns), + "candidate_score": best["score"], + } + return df, metadata + + +def _daily_sales_staleness_guard_enabled() -> bool: + return os.getenv("DAILY_SALES_IMPORT_STALENESS_GUARD_ENABLED", "true").lower() in { + "1", + "true", + "yes", + "on", + } + + +def _daily_sales_max_stale_days() -> int: + try: + return max(0, int(os.getenv("DAILY_SALES_IMPORT_MAX_STALE_DAYS", "3"))) + except ValueError: + return 3 + + +def _should_quarantine_failed_import(error_message: str) -> bool: + if not error_message: + return False + + permanent_error_markers = [ + "Excel 欄位防禦失敗", + "Excel 日期防禦失敗", + "Excel 檔案為空", + "找不到可匯入", + "欄位驗證失敗", + "日期驗證失敗", + ] + return any(marker in error_message for marker in permanent_error_markers) + # 資料庫設定 - 使用 config.py 中的設定,支援 PostgreSQL 和 SQLite def _create_engine_with_pool(db_path): """建立帶有連線池配置的資料庫引擎""" @@ -369,50 +583,57 @@ class ImportService: # 讀取 Excel 檔案 logger.info(f"開始讀取 Excel 檔案: {file_path}") - df = pd.read_excel(file_path, engine='openpyxl', dtype=str) - - if df.empty: - error_msg = "Excel 檔案為空" - self.update_job_status(job_id, 'failed', 50, '匯入失敗', error_msg) - return False - - # ───────────────────────────────────────────── - # 2026-04-19: daily_sales_snapshot 前置欄位防禦 (技術債修復) - # 原因:若 Excel 欄位名靜默變更,匯入會成功但 Hermes SQL JOIN 會找不到數據 → 告警管線失真 - # 規則:至少需偵測到「商品名稱」與「銷售金額」類欄位 (容忍多種別名) - # ───────────────────────────────────────────── - def _has_any(cols, keywords): - return any(kw in c for c in cols for kw in keywords) - - required_groups = { - "商品名稱類": ["商品名稱", "品名", "Product", "Name"], - "業績金額類": ["銷售金額", "業績", "金額", "Amount", "Sales", "Total"], - } - missing = [label for label, kws in required_groups.items() - if not _has_any(df.columns, kws)] - if missing: - error_msg = ( - f"Excel 欄位防禦失敗:缺少必要欄位分類 {missing}。" - f"現有欄位:{list(df.columns)[:30]}" - ) + try: + df, excel_metadata = _read_daily_sales_excel(file_path) + except ValueError as exc: + error_msg = str(exc) + step = '欄位驗證失敗' if '欄位' in error_msg else '匯入失敗' logger.error(error_msg) - self.update_job_status(job_id, 'failed', 50, '欄位驗證失敗', error_msg) + self.update_job_status(job_id, 'failed', 50, step, error_msg) return False + logger.info( + "任務 %s 選用 Excel 工作表: %s | 表頭列=%s | 日期欄=%s | 欄位數=%s | 資料列=%s", + job_id, + excel_metadata.get("sheet_name"), + excel_metadata.get("header_row"), + excel_metadata.get("date_col") or "未提供", + len(df.columns), + len(df), + ) + # 匯入到資料庫 table_name = 'daily_sales_snapshot' # 找到日期欄位 - date_col = None - for possible_col in ['日期', '訂單日期', '交易日期', 'Date']: - if possible_col in df.columns: - date_col = possible_col - break + date_col = excel_metadata.get("date_col") or _find_daily_sales_date_column(df.columns) if date_col: # 解析日期 - df['snapshot_date'] = pd.to_datetime(df[date_col], errors='coerce').dt.date + parsed_dates = pd.to_datetime(df[date_col], errors='coerce') + if parsed_dates.dropna().empty: + error_msg = f"Excel 日期防禦失敗:日期欄位「{date_col}」無法解析出有效日期" + logger.error(error_msg) + self.update_job_status(job_id, 'failed', 50, '日期驗證失敗', error_msg) + return False + + df['snapshot_date'] = parsed_dates.dt.date logger.info(f"使用日期欄位: {date_col}") + + if _daily_sales_staleness_guard_enabled(): + max_snapshot_date = df['snapshot_date'].dropna().max() + today = datetime.now(TAIPEI_TZ).date() + lag_days = (today - max_snapshot_date).days + max_stale_days = _daily_sales_max_stale_days() + if lag_days > max_stale_days: + error_msg = ( + f"Excel 日期防禦失敗:明細最大日期 {max_snapshot_date}," + f"已落後 {lag_days} 天(上限 {max_stale_days} 天)。" + "請上傳最新當日業績檔,或將舊檔移至已匯入/匯入失敗資料夾。" + ) + logger.error(error_msg) + self.update_job_status(job_id, 'failed', 50, '日期驗證失敗', error_msg) + return False else: # 使用當前日期 df['snapshot_date'] = datetime.now(TAIPEI_TZ).date() @@ -688,6 +909,8 @@ class ImportService: 'verified': True, # daily_sales_snapshot 驗證 'date_min': date_min, 'date_max': date_max, + 'source_sheet': excel_metadata.get("sheet_name"), + 'source_header_row': excel_metadata.get("header_row"), 'message': sync_message } @@ -860,14 +1083,46 @@ class ImportService: session = Session() try: job = session.query(ImportJob).filter_by(id=job_id).first() + error_message = (job.error_message if job else None) or '匯入失敗,請查看 import_jobs' + quarantined = False + failed_folder = None + if _should_quarantine_failed_import(error_message): + default_failed_folder = ( + f"{folder_path.rstrip('/')}/匯入失敗" + if folder_path and folder_path.strip("/") + else "匯入失敗" + ) + failed_folder = self.get_config('gdrive_failed_folder', default_failed_folder) + if drive_service.move_file(file_id, failed_folder, create_missing=True): + quarantined = True + error_message = ( + f"{error_message}(已移至「{failed_folder}」避免重複告警)" + ) + self.update_job_status(job_id, 'failed', 100, '已移至匯入失敗', error_message) + logger.warning( + "已隔離不可自動修復的匯入失敗檔案: %s → %s", + file_name, + failed_folder, + ) + else: + logger.error("匯入失敗檔案隔離失敗: %s → %s", file_name, failed_folder) + failed_files.append({ 'file': file_name, 'job_id': job_id, - 'error': (job.error_message if job else None) or '匯入失敗,請查看 import_jobs', + 'error': error_message, + 'quarantined': quarantined, + 'failed_folder': failed_folder, }) finally: session.close() + try: + os.remove(local_path) + logger.info(f"已清理失敗匯入本地檔案: {local_path}") + except Exception as e: + logger.warning(f"清理失敗匯入本地檔案失敗: {str(e)}") + # 計算日期範圍 date_range = None if all_dates: diff --git a/tests/test_daily_sales_excel_detection.py b/tests/test_daily_sales_excel_detection.py new file mode 100644 index 0000000..ed2729f --- /dev/null +++ b/tests/test_daily_sales_excel_detection.py @@ -0,0 +1,77 @@ +import pandas as pd +import pytest + +from services.import_service import ( + _read_daily_sales_excel, + _should_quarantine_failed_import, +) + + +def test_daily_sales_reader_prefers_detail_sheet_over_summary_sheets(tmp_path): + workbook = tmp_path / "即時業績_當日.xlsx" + with pd.ExcelWriter(workbook, engine="openpyxl") as writer: + pd.DataFrame([ + [None, None, None, None], + [None, None, None, None], + ["列標籤", "加總 - 總業績", "加總 - 總成本", "加總 - 折扣金額"], + ["保健", "100", "60", "5"], + ]).to_excel(writer, sheet_name="工作表1", header=False, index=False) + pd.DataFrame({ + "列標籤": ["A"], + "商品名稱": ["彙總商品"], + "銷售網頁ID": ["P001"], + "加總 - 總業績": ["300"], + }).to_excel(writer, sheet_name="工作表2", index=False) + pd.DataFrame({ + "狀態": ["完成"], + "日期": ["2026-06-14"], + "商品ID": ["SKU001"], + "商品名稱": ["專業測試商品"], + "數量": ["1"], + "總業績": ["1200"], + }).to_excel(writer, sheet_name="即時業績明細", index=False) + + df, metadata = _read_daily_sales_excel(str(workbook)) + + assert metadata["sheet_name"] == "即時業績明細" + assert metadata["header_row"] == 1 + assert metadata["date_col"] == "日期" + assert list(df["商品名稱"]) == ["專業測試商品"] + + +def test_daily_sales_reader_scans_header_rows(tmp_path): + workbook = tmp_path / "header-offset.xlsx" + with pd.ExcelWriter(workbook, engine="openpyxl") as writer: + pd.DataFrame([ + ["報表產製時間", "2026-06-14", None], + [None, None, None], + ["日期", "商品名稱", "總業績"], + ["2026-06-14", "測試商品", "999"], + ]).to_excel(writer, sheet_name="即時業績明細", header=False, index=False) + + df, metadata = _read_daily_sales_excel(str(workbook)) + + assert metadata["header_row"] == 3 + assert df.iloc[0]["商品名稱"] == "測試商品" + + +def test_daily_sales_reader_reports_checked_sheets_when_no_candidate(tmp_path): + workbook = tmp_path / "invalid.xlsx" + with pd.ExcelWriter(workbook, engine="openpyxl") as writer: + pd.DataFrame({"Unnamed: 0": ["A"], "Unnamed: 1": ["B"]}).to_excel( + writer, + sheet_name="工作表1", + index=False, + ) + + with pytest.raises(ValueError) as exc: + _read_daily_sales_excel(str(workbook)) + + assert "Excel 欄位防禦失敗" in str(exc.value) + assert "工作表1" in str(exc.value) + + +def test_only_permanent_import_failures_are_quarantined(): + assert _should_quarantine_failed_import("Excel 欄位防禦失敗:缺少必要欄位分類") + assert _should_quarantine_failed_import("Excel 日期防禦失敗:明細最大日期 2026-06-10") + assert not _should_quarantine_failed_import("無法從 Google Drive 下載檔案")