#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test the complete stock-out import flow.

Reads the first data row of a local Excel workbook, maps it onto a
``VendorStockout`` record, persists it through ``VendorDatabaseManager`` and
verifies that the row can be read back.
"""

import os
import sys
from datetime import date, datetime

import numpy as np
import pandas as pd
import pytest

# Put this file's directory on sys.path so the ``database`` package imports
# below resolve when the test is started from another working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from database.vendor_manager import VendorDatabaseManager  # noqa: E402
from database.vendor_models import VendorStockout  # noqa: E402

# Workbook used as test input. The default is a developer-local path; set the
# STOCKOUT_TEST_EXCEL environment variable to point the test at another file.
EXCEL_PATH = os.environ.get("STOCKOUT_TEST_EXCEL", "/Users/ogt/Downloads/缺貨測試.xlsx")

# Origin that turns an Excel serial day number into a calendar date.
_EXCEL_EPOCH = "1899-12-30"


def _parse_excel_date(value):
    """Convert an Excel cell value to a ``datetime.date``, or ``None``.

    Handles the representations ``pandas.read_excel`` can produce for a date
    cell: ``Timestamp``/``datetime``, ``date``, an Excel serial number
    (days since 1899-12-30) or a date string. Missing or unparseable values
    yield ``None``.
    """
    if value is None or pd.isna(value):
        return None
    if isinstance(value, datetime):  # also covers pandas.Timestamp
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, (bool, np.bool_)):
        # bool is an int subclass; a boolean cell is never a valid serial date.
        return None
    if isinstance(value, (int, float, np.integer, np.floating)):
        parsed = pd.to_datetime(value, unit="D", origin=_EXCEL_EPOCH, errors="coerce")
    elif isinstance(value, str):
        parsed = pd.to_datetime(value, errors="coerce")
    else:
        return None
    return None if pd.isna(parsed) else parsed.date()


def _to_native(value):
    """Return *value* as a plain Python object, mapping NaN/NaT/NA to ``None``.

    numpy scalars are unwrapped with ``.item()`` so the database layer is
    handed built-in ``int``/``float``/``bool`` values instead of numpy types.
    """
    if value is None or pd.isna(value):
        return None
    if isinstance(value, np.generic):
        return value.item()
    return value


def _clean_text(value):
    """Return *value* as a stripped string; missing values become ``""``.

    Guards against ``str(nan)`` producing the literal text ``"nan"``.
    """
    if value is None or pd.isna(value):
        return ""
    return str(value).strip()


@pytest.mark.skipif(not os.path.exists(EXCEL_PATH), reason="缺少本機測試 Excel 檔案")
def test_full_import():
    """Import the first workbook row and check that it is stored.

    The test is skipped when the workbook is absent or has no data rows.
    NOTE(review): the inserted row is committed and left in the database, as
    in the original test — confirm whether cleanup is wanted.
    """
    df = pd.read_excel(EXCEL_PATH)
    if df.empty:
        pytest.skip("Excel 檔案沒有任何資料列")

    row = df.iloc[0]

    # Fall back to today when the column is missing or the cell is unusable.
    row_import_date = _parse_excel_date(row.get("當前日期")) or date.today()
    stockout_date_obj = _parse_excel_date(row.get("缺貨日期"))

    record = {
        "import_date": row_import_date,
        "department": _to_native(row.get("處別")),
        "section": _to_native(row.get("科別")),
        "pm_name": _to_native(row.get("PM姓名")),
        "zone_id": _to_native(row.get("區ID")),
        "zone_name": _to_native(row.get("區名稱")),
        "product_code": _clean_text(row.get("商品ID", "")),
        "product_name": _clean_text(row.get("商品名稱", "")),
        "product_spec": _to_native(row.get("單品/組合商品")),
        "borrow_transfer": _to_native(row.get("借採轉")),
        "sale_price": None,
        "cost_price": None,
        "vendor_code": _clean_text(row.get("來源供應商編號", "")),
        "vendor_name": _clean_text(row.get("來源供應商名稱", "")),
        "current_stock": _to_native(row.get("商品可賣量")),
        "stockout_date": stockout_date_obj,
        "stockout_days": _to_native(row.get("缺貨天數")),
        "monthly_sales_amount": _to_native(row.get("缺貨商品前30天業績")),
        "monthly_sales_qty": _to_native(row.get("最近30天銷售量")),
        "daily_avg_sales": None,
        "safe_stock_days": _to_native(row.get("庫存水位")),
        "notes": None,
    }

    db = VendorDatabaseManager()
    session = db.get_session()
    try:
        # One timestamp so batch_id and import_time always agree.
        now = datetime.now()
        batch_id = now.strftime("%Y%m%d_%H%M%S")

        stockout_item = VendorStockout(
            batch_id=batch_id,
            import_time=now,
            status="pending",
            **record,
        )
        session.add(stockout_item)
        session.commit()

        # Drop cached state so the assertions read what was actually stored.
        session.expire_all()

        # Filter on batch_id as well, so a leftover row from an earlier run
        # with the same product code cannot satisfy the check.
        saved = (
            session.query(VendorStockout)
            .filter_by(batch_id=batch_id, product_code=record["product_code"])
            .first()
        )
        assert saved is not None
        assert saved.product_code == record["product_code"]
    except Exception:
        # Leave the session clean before closing, then surface the failure.
        session.rollback()
        raise
    finally:
        session.close()