Files
ewoooc/database/vendor_manager.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

666 lines
22 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
廠商缺貨通知系統 - 資料庫管理器
提供廠商缺貨資料的 CRUD 操作
"""
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from .vendor_models import Base, VendorStockout, VendorList, VendorEmail, EmailSendLog
# 導入日誌管理模組
from services.logger_manager import SystemLogger
# 初始化日誌
sys_log = SystemLogger("VendorDatabase").get_logger()
class VendorDatabaseManager:
"""廠商缺貨系統資料庫管理器"""
def __init__(self, db_path=None):
"""
初始化資料庫連線。
優先使用 PostgreSQL (透過 config.py 設定),否則回退到 SQLite。
Args:
db_path: 資料庫檔案路徑 (僅 SQLite 模式使用),若為 None 則使用預設路徑
"""
# V-Fix (2026-01-23): 優先使用 config.py 的資料庫設定,與主 DatabaseManager 保持一致
from config import DATABASE_PATH, DATABASE_TYPE
if DATABASE_TYPE == 'postgresql':
# PostgreSQL 模式 - 使用 config.py 的連線字串
self.engine = create_engine(DATABASE_PATH, echo=False, pool_pre_ping=True)
self.Session = sessionmaker(bind=self.engine)
Base.metadata.create_all(self.engine)
sys_log.info(f"[VendorDatabase] ✅ 使用 PostgreSQL 資料庫")
else:
# SQLite 模式 - 向後相容
if db_path is None:
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
db_path = os.path.join(base_dir, 'data', 'momo_database.db')
sys_log.info(f"廠商缺貨系統資料庫連線初始化 | Path: {db_path}")
# 確保資料夾存在
os.makedirs(os.path.dirname(db_path), exist_ok=True)
# 建立引擎與 Session
self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
sys_log.info("[VendorDatabase] 使用 SQLite 資料庫")
sys_log.info("✅ 廠商缺貨系統資料表已建立/更新")
def get_session(self):
"""
提供外部調用的 Session 實例。
Returns:
sqlalchemy.orm.Session: 資料庫 Session
"""
return self.Session()
# ==========================================
# 廠商清單管理
# ==========================================
def add_vendor(self, vendor_code, vendor_name, is_active=True):
"""
新增廠商
Args:
vendor_code: 廠商代碼
vendor_name: 廠商名稱
is_active: 是否啟用
Returns:
VendorList: 新增的廠商物件,若已存在則回傳 None
"""
session = self.get_session()
try:
# 檢查是否已存在
existing = session.query(VendorList).filter_by(vendor_code=vendor_code).first()
if existing:
sys_log.warning(f"廠商已存在 | 代碼: {vendor_code}")
return None
# 建立新廠商
vendor = VendorList(
vendor_code=vendor_code,
vendor_name=vendor_name,
is_active=is_active
)
session.add(vendor)
session.commit()
sys_log.info(f"✅ 新增廠商成功 | 代碼: {vendor_code} | 名稱: {vendor_name}")
return vendor
except Exception as e:
session.rollback()
sys_log.error(f"❌ 新增廠商失敗 | 代碼: {vendor_code} | 錯誤: {e}")
return None
finally:
session.close()
def get_vendor_by_code(self, vendor_code):
"""
根據廠商代碼查詢廠商
Args:
vendor_code: 廠商代碼
Returns:
VendorList: 廠商物件,若不存在則回傳 None
"""
session = self.get_session()
try:
vendor = session.query(VendorList).filter_by(vendor_code=vendor_code).first()
return vendor
finally:
session.close()
def get_all_vendors(self, active_only=True):
"""
取得所有廠商清單
Args:
active_only: 是否只取得啟用中的廠商
Returns:
list: 廠商物件清單
"""
session = self.get_session()
try:
query = session.query(VendorList)
if active_only:
query = query.filter_by(is_active=True)
vendors = query.order_by(VendorList.vendor_code).all()
return vendors
finally:
session.close()
def update_vendor(self, vendor_code, vendor_name=None, is_active=None):
"""
更新廠商資訊
Args:
vendor_code: 廠商代碼
vendor_name: 廠商名稱 (可選)
is_active: 是否啟用 (可選)
Returns:
bool: 是否更新成功
"""
session = self.get_session()
try:
vendor = session.query(VendorList).filter_by(vendor_code=vendor_code).first()
if not vendor:
sys_log.warning(f"廠商不存在 | 代碼: {vendor_code}")
return False
# 更新欄位
if vendor_name is not None:
vendor.vendor_name = vendor_name
if is_active is not None:
vendor.is_active = is_active
vendor.updated_at = datetime.now()
session.commit()
sys_log.info(f"✅ 更新廠商成功 | 代碼: {vendor_code}")
return True
except Exception as e:
session.rollback()
sys_log.error(f"❌ 更新廠商失敗 | 代碼: {vendor_code} | 錯誤: {e}")
return False
finally:
session.close()
def delete_vendor(self, vendor_code):
"""
刪除廠商 (會連帶刪除相關的郵件與發送記錄)
Args:
vendor_code: 廠商代碼
Returns:
bool: 是否刪除成功
"""
session = self.get_session()
try:
vendor = session.query(VendorList).filter_by(vendor_code=vendor_code).first()
if not vendor:
sys_log.warning(f"廠商不存在 | 代碼: {vendor_code}")
return False
session.delete(vendor)
session.commit()
sys_log.info(f"✅ 刪除廠商成功 | 代碼: {vendor_code}")
return True
except Exception as e:
session.rollback()
sys_log.error(f"❌ 刪除廠商失敗 | 代碼: {vendor_code} | 錯誤: {e}")
return False
finally:
session.close()
# ==========================================
# 廠商郵件管理
# ==========================================
def add_vendor_email(self, vendor_code, email, contact_name=None,
email_type='primary', is_active=True, notes=None):
"""
新增廠商郵件(自動去重)
Args:
vendor_code: 廠商代碼
email: 郵件地址
contact_name: 聯絡人姓名
email_type: 郵件類型 (primary/cc/bcc)
is_active: 是否啟用
notes: 備註
Returns:
VendorEmail: 新增的郵件物件,若失敗或已存在則回傳 None
"""
session = self.get_session()
try:
# 查詢廠商
vendor = session.query(VendorList).filter_by(vendor_code=vendor_code).first()
if not vendor:
sys_log.warning(f"廠商不存在 | 代碼: {vendor_code}")
return None
# 檢查郵件是否已存在(去重)
existing_email = session.query(VendorEmail).filter_by(
vendor_id=vendor.id,
email=email
).first()
if existing_email:
sys_log.debug(f"郵件已存在,跳過 | 廠商: {vendor_code} | 郵件: {email}")
return None
# 建立新郵件
vendor_email = VendorEmail(
vendor_id=vendor.id,
email=email,
contact_name=contact_name,
email_type=email_type,
is_active=is_active,
notes=notes
)
session.add(vendor_email)
session.commit()
sys_log.info(f"✅ 新增廠商郵件成功 | 廠商: {vendor_code} | 郵件: {email}")
return vendor_email
except Exception as e:
session.rollback()
sys_log.error(f"❌ 新增廠商郵件失敗 | 廠商: {vendor_code} | 錯誤: {e}")
return None
finally:
session.close()
def get_vendor_emails(self, vendor_code, active_only=True):
"""
取得廠商的所有郵件
Args:
vendor_code: 廠商代碼
active_only: 是否只取得啟用中的郵件
Returns:
dict: 郵件清單,依類型分類 {'primary': [...], 'cc': [...], 'bcc': [...]}
"""
session = self.get_session()
try:
vendor = session.query(VendorList).filter_by(vendor_code=vendor_code).first()
if not vendor:
return {'primary': [], 'cc': [], 'bcc': []}
query = session.query(VendorEmail).filter_by(vendor_id=vendor.id)
if active_only:
query = query.filter_by(is_active=True)
emails = query.all()
# 依類型分類
result = {'primary': [], 'cc': [], 'bcc': []}
for email in emails:
result[email.email_type].append(email)
return result
finally:
session.close()
def update_vendor_email(self, email_id, email=None, contact_name=None,
email_type=None, is_active=None, notes=None):
"""
更新廠商郵件
Args:
email_id: 郵件 ID
email: 郵件地址 (可選)
contact_name: 聯絡人姓名 (可選)
email_type: 郵件類型 (可選)
is_active: 是否啟用 (可選)
notes: 備註 (可選)
Returns:
bool: 是否更新成功
"""
session = self.get_session()
try:
vendor_email = session.query(VendorEmail).filter_by(id=email_id).first()
if not vendor_email:
sys_log.warning(f"郵件不存在 | ID: {email_id}")
return False
# 更新欄位
if email is not None:
vendor_email.email = email
if contact_name is not None:
vendor_email.contact_name = contact_name
if email_type is not None:
vendor_email.email_type = email_type
if is_active is not None:
vendor_email.is_active = is_active
if notes is not None:
vendor_email.notes = notes
vendor_email.updated_at = datetime.now()
session.commit()
sys_log.info(f"✅ 更新廠商郵件成功 | ID: {email_id}")
return True
except Exception as e:
session.rollback()
sys_log.error(f"❌ 更新廠商郵件失敗 | ID: {email_id} | 錯誤: {e}")
return False
finally:
session.close()
def delete_vendor_email(self, email_id):
"""
刪除廠商郵件
Args:
email_id: 郵件 ID
Returns:
bool: 是否刪除成功
"""
session = self.get_session()
try:
vendor_email = session.query(VendorEmail).filter_by(id=email_id).first()
if not vendor_email:
sys_log.warning(f"郵件不存在 | ID: {email_id}")
return False
session.delete(vendor_email)
session.commit()
sys_log.info(f"✅ 刪除廠商郵件成功 | ID: {email_id}")
return True
except Exception as e:
session.rollback()
sys_log.error(f"❌ 刪除廠商郵件失敗 | ID: {email_id} | 錯誤: {e}")
return False
finally:
session.close()
# ==========================================
# 缺貨記錄管理
# ==========================================
def add_stockout_records(self, records_list, batch_id):
"""
批次新增缺貨記錄 (用於 Excel 匯入)
Args:
records_list: 缺貨記錄清單 (dict list)
batch_id: 批次編號
Returns:
tuple: (成功筆數, 失敗筆數, 重複筆數)
"""
session = self.get_session()
success_count = 0
failed_count = 0
duplicate_count = 0
try:
for record in records_list:
try:
# 檢查是否為重複資料 (同一天 + 同廠商 + 同商品)
import_date = record.get('import_date', datetime.now().date())
vendor_code = record.get('vendor_code')
product_code = record.get('product_code')
existing = session.query(VendorStockout).filter_by(
import_date=import_date,
vendor_code=vendor_code,
product_code=product_code
).first()
if existing:
# 標記為重複並更新計數
existing.duplicate_count += 1
existing.status = 'duplicate'
duplicate_count += 1
sys_log.debug(f"偵測到重複資料 | 廠商: {vendor_code} | 商品: {product_code}")
continue
# 建立新記錄
stockout = VendorStockout(
batch_id=batch_id,
import_date=import_date,
department=record.get('department'),
section=record.get('section'),
pm_name=record.get('pm_name'),
zone_id=record.get('zone_id'),
zone_name=record.get('zone_name'),
product_code=product_code,
product_name=record.get('product_name'),
product_spec=record.get('product_spec'),
borrow_transfer=record.get('borrow_transfer'),
sale_price=record.get('sale_price'),
cost_price=record.get('cost_price'),
vendor_code=vendor_code,
vendor_name=record.get('vendor_name'),
monthly_sales_qty=record.get('monthly_sales_qty'),
monthly_sales_amount=record.get('monthly_sales_amount'),
daily_avg_sales=record.get('daily_avg_sales'),
current_stock=record.get('current_stock'),
stockout_date=record.get('stockout_date'),
stockout_days=record.get('stockout_days'),
safe_stock_days=record.get('safe_stock_days'),
notes=record.get('notes')
)
session.add(stockout)
success_count += 1
# 每 100 筆提交一次
if success_count % 100 == 0:
session.commit()
except Exception as e:
sys_log.error(f"❌ 新增缺貨記錄失敗 | 錯誤: {e}")
failed_count += 1
continue
# 最後提交
session.commit()
sys_log.info(f"✅ 批次匯入完成 | 成功: {success_count} | 失敗: {failed_count} | 重複: {duplicate_count}")
return success_count, failed_count, duplicate_count
except Exception as e:
session.rollback()
sys_log.error(f"❌ 批次匯入失敗 | 錯誤: {e}")
return success_count, failed_count, duplicate_count
finally:
session.close()
def get_stockout_records(self, batch_id=None, vendor_code=None, status=None,
start_date=None, end_date=None, limit=None):
"""
查詢缺貨記錄
Args:
batch_id: 批次編號 (可選)
vendor_code: 廠商代碼 (可選)
status: 狀態 (可選)
start_date: 開始日期 (可選)
end_date: 結束日期 (可選)
limit: 限制筆數 (可選)
Returns:
list: 缺貨記錄清單
"""
session = self.get_session()
try:
query = session.query(VendorStockout)
# 篩選條件
if batch_id:
query = query.filter_by(batch_id=batch_id)
if vendor_code:
query = query.filter_by(vendor_code=vendor_code)
if status:
query = query.filter_by(status=status)
if start_date:
query = query.filter(VendorStockout.import_date >= start_date)
if end_date:
query = query.filter(VendorStockout.import_date <= end_date)
# 排序與限制
query = query.order_by(VendorStockout.import_date.desc())
if limit:
query = query.limit(limit)
records = query.all()
return records
finally:
session.close()
def update_stockout_status(self, stockout_id, status, error_message=None,
sent_date=None, sent_by=None):
"""
更新缺貨記錄狀態
Args:
stockout_id: 缺貨記錄 ID
status: 狀態
error_message: 錯誤訊息 (可選)
sent_date: 發送時間 (可選)
sent_by: 發送人員 (可選)
Returns:
bool: 是否更新成功
"""
session = self.get_session()
try:
stockout = session.query(VendorStockout).filter_by(id=stockout_id).first()
if not stockout:
sys_log.warning(f"缺貨記錄不存在 | ID: {stockout_id}")
return False
stockout.status = status
if error_message is not None:
stockout.error_message = error_message
if sent_date is not None:
stockout.sent_date = sent_date
if sent_by is not None:
stockout.sent_by = sent_by
stockout.updated_at = datetime.now()
session.commit()
sys_log.debug(f"更新缺貨記錄狀態 | ID: {stockout_id} | 狀態: {status}")
return True
except Exception as e:
session.rollback()
sys_log.error(f"❌ 更新缺貨記錄狀態失敗 | ID: {stockout_id} | 錯誤: {e}")
return False
finally:
session.close()
# ==========================================
# 郵件發送記錄管理
# ==========================================
def add_email_log(self, vendor_code, batch_id, sender_email, recipient_email,
subject, product_count, cc_emails=None, bcc_emails=None,
attachment_filename=None, attachment_size=None, stockout_id=None):
"""
新增郵件發送記錄
Args:
vendor_code: 廠商代碼
batch_id: 發送批次編號
sender_email: 寄件者郵件
recipient_email: 收件者郵件
subject: 郵件主旨
product_count: 商品數量
cc_emails: CC 郵件清單 (JSON 字串)
bcc_emails: BCC 郵件清單 (JSON 字串)
attachment_filename: 附件檔名
attachment_size: 附件大小
stockout_id: 缺貨記錄 ID
Returns:
EmailSendLog: 新增的記錄物件,若失敗則回傳 None
"""
session = self.get_session()
try:
# 查詢廠商
vendor = session.query(VendorList).filter_by(vendor_code=vendor_code).first()
if not vendor:
sys_log.warning(f"廠商不存在 | 代碼: {vendor_code}")
return None
# 建立記錄
log = EmailSendLog(
vendor_id=vendor.id,
stockout_id=stockout_id,
batch_id=batch_id,
sender_email=sender_email,
recipient_email=recipient_email,
cc_emails=cc_emails,
bcc_emails=bcc_emails,
subject=subject,
product_count=product_count,
attachment_filename=attachment_filename,
attachment_size=attachment_size,
status='pending'
)
session.add(log)
session.commit()
sys_log.debug(f"新增郵件發送記錄 | 廠商: {vendor_code} | 收件者: {recipient_email}")
return log
except Exception as e:
session.rollback()
sys_log.error(f"❌ 新增郵件發送記錄失敗 | 廠商: {vendor_code} | 錯誤: {e}")
return None
finally:
session.close()
def update_email_log_status(self, log_id, status, error_message=None, sent_at=None):
"""
更新郵件發送記錄狀態
Args:
log_id: 記錄 ID
status: 狀態
error_message: 錯誤訊息 (可選)
sent_at: 發送時間 (可選)
Returns:
bool: 是否更新成功
"""
session = self.get_session()
try:
log = session.query(EmailSendLog).filter_by(id=log_id).first()
if not log:
sys_log.warning(f"郵件發送記錄不存在 | ID: {log_id}")
return False
log.status = status
if error_message is not None:
log.error_message = error_message
if sent_at is not None:
log.sent_at = sent_at
session.commit()
sys_log.debug(f"更新郵件發送記錄狀態 | ID: {log_id} | 狀態: {status}")
return True
except Exception as e:
session.rollback()
sys_log.error(f"❌ 更新郵件發送記錄狀態失敗 | ID: {log_id} | 錯誤: {e}")
return False
finally:
session.close()