#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 測試廠商郵件寫入功能 驗證 Excel 匯入時郵件是否正確寫入資料庫 """ import sys import os # 加入專案路徑 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from database.vendor_manager import VendorDatabaseManager from database.vendor_models import VendorList, VendorEmail def test_vendor_email(): """測試廠商郵件功能""" print("=" * 60) print("廠商郵件功能測試") print("=" * 60) # 初始化資料庫管理器 db = VendorDatabaseManager() session = db.get_session() try: # 1. 測試新增廠商 print("\n[測試 1] 新增測試廠商...") test_vendor_code = "TEST001" test_vendor_name = "測試廠商股份有限公司" # 先刪除測試資料(如果存在) existing = session.query(VendorList).filter_by(vendor_code=test_vendor_code).first() if existing: session.query(VendorEmail).filter_by(vendor_id=existing.id).delete() session.delete(existing) session.commit() print(f" 已清除現有測試資料") vendor = db.add_vendor(test_vendor_code, test_vendor_name) if vendor: print(f" ✅ 成功新增廠商 | 代碼: {test_vendor_code} | ID: {vendor.id}") else: print(f" ❌ 新增廠商失敗") return # 2. 測試新增郵件 print("\n[測試 2] 新增郵件地址...") test_emails = [ "vendor1@example.com", "vendor2@example.com", "vendor3@example.com" ] for email in test_emails: result = db.add_vendor_email( vendor_code=test_vendor_code, email=email, email_type='primary' ) if result: print(f" ✅ 成功新增郵件: {email}") else: print(f" ⚠️ 郵件已存在或新增失敗: {email}") # 3. 測試去重功能 print("\n[測試 3] 測試郵件去重...") duplicate_result = db.add_vendor_email( vendor_code=test_vendor_code, email=test_emails[0], email_type='primary' ) if duplicate_result is None: print(f" ✅ 去重功能正常,重複郵件被跳過") else: print(f" ❌ 去重功能失敗,重複郵件被寫入") # 4. 查詢廠商郵件 print("\n[測試 4] 查詢廠商郵件...") vendor_refresh = session.query(VendorList).filter_by(vendor_code=test_vendor_code).first() emails_query = session.query(VendorEmail).filter_by(vendor_id=vendor_refresh.id).all() print(f" 廠商代碼: {vendor_refresh.vendor_code}") print(f" 廠商名稱: {vendor_refresh.vendor_name}") print(f" 郵件數量: {len(emails_query)}") print(f" 郵件清單:") for email_obj in emails_query: print(f" - {email_obj.email} (ID: {email_obj.id}, 類型: {email_obj.email_type})") # 5. 測試 get_vendor_emails 方法 print("\n[測試 5] 使用 get_vendor_emails 方法...") emails_dict = db.get_vendor_emails(test_vendor_code) print(f" Primary 郵件: {len(emails_dict['primary'])} 個") print(f" CC 郵件: {len(emails_dict['cc'])} 個") print(f" BCC 郵件: {len(emails_dict['bcc'])} 個") # 6. 清理測試資料 print("\n[測試 6] 清理測試資料...") session.query(VendorEmail).filter_by(vendor_id=vendor_refresh.id).delete() session.delete(vendor_refresh) session.commit() print(f" ✅ 測試資料已清除") print("\n" + "=" * 60) print("✅ 所有測試通過!廠商郵件功能正常運作。") print("=" * 60) except Exception as e: print(f"\n❌ 測試過程發生錯誤: {e}") import traceback traceback.print_exc() session.rollback() finally: session.close() def check_existing_vendors(): """檢查現有廠商的郵件資料""" print("\n" + "=" * 60) print("檢查現有廠商郵件資料") print("=" * 60) db = VendorDatabaseManager() session = db.get_session() try: # 查詢所有廠商 vendors = session.query(VendorList).limit(10).all() if not vendors: print("\n 目前沒有廠商資料") return print(f"\n共有 {session.query(VendorList).count()} 個廠商,顯示前 10 筆:\n") for vendor in vendors: emails = session.query(VendorEmail).filter_by(vendor_id=vendor.id).all() print(f"廠商: {vendor.vendor_name} ({vendor.vendor_code})") print(f" ID: {vendor.id}") print(f" 郵件數: {len(emails)}") if emails: for email in emails: print(f" - {email.email}") else: print(f" (無郵件)") print() except Exception as e: print(f"\n❌ 查詢失敗: {e}") import traceback traceback.print_exc() finally: session.close() if __name__ == '__main__': # 執行測試 test_vendor_email() # 檢查現有資料 check_existing_vendors()