Files
ewoooc/services/vendor_stockout_query_service.py
OoO ea15aa6437
Some checks failed
CD Pipeline / deploy (push) Failing after 8m57s
refactor(vendor): 抽出廠商管理查詢服務
2026-05-01 14:20:09 +08:00

395 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
廠商缺貨查詢服務
集中管理 Vendor V2 頁面所需的統計、篩選、分頁與排序查詢,讓
routes/vendor_routes.py 保持 thin Blueprint。
"""
from sqlalchemy import desc, func, or_
from database.vendor_models import EmailSendLog, VendorEmail, VendorList, VendorStockout
def _pending_stockout_filter():
    """Return the SQL criterion for stockout rows that are treated as pending.

    A row matches when its ``status`` is the literal ``'pending'`` or when the
    status column is NULL.
    """
    status_column = VendorStockout.status
    marked_pending = status_column == 'pending'
    status_missing = status_column.is_(None)
    return or_(marked_pending, status_missing)
def get_vendor_dashboard_stats(vendor_db):
    """Collect the database-backed statistics for the vendor stockout V2 home page.

    Args:
        vendor_db: Object exposing ``get_session()``. The session it returns
            is always closed before this function exits.

    Returns:
        dict: Stockout counts (total / pending / sent / failed / duplicate),
        the number of distinct vendor codes, active and inactive vendor
        counts, the active e-mail address count, e-mail log counts with a
        success rate (percentage rounded to one decimal, ``0`` when there is
        no log row), and details of the newest stockout row, the newest
        e-mail log row and the newest batch (``None`` / ``0`` when absent).
    """
    session = vendor_db.get_session()
    try:
        def count_rows(entity, *criteria):
            # Row count for ``entity``, optionally narrowed by ``criteria``.
            rows = session.query(entity)
            if criteria:
                rows = rows.filter(*criteria)
            return rows.count()

        stockout_status = VendorStockout.status
        mail_status = EmailSendLog.status

        stats = {
            'total_stockouts': count_rows(VendorStockout),
            'pending_stockouts': count_rows(VendorStockout, _pending_stockout_filter()),
            'sent_stockouts': count_rows(VendorStockout, stockout_status == 'sent'),
            'failed_stockouts': count_rows(VendorStockout, stockout_status == 'failed'),
            'duplicate_stockouts': count_rows(
                VendorStockout, VendorStockout.is_duplicate.is_(True)
            ),
            'distinct_vendor_count': (
                session.query(VendorStockout.vendor_code).distinct().count()
            ),
            'active_vendors': count_rows(VendorList, VendorList.is_active.is_(True)),
            'inactive_vendors': count_rows(VendorList, VendorList.is_active.is_(False)),
            'active_emails': count_rows(VendorEmail, VendorEmail.is_active.is_(True)),
            'email_total': count_rows(EmailSendLog),
            'email_success': count_rows(EmailSendLog, mail_status == 'sent'),
            'email_failed': count_rows(EmailSendLog, mail_status == 'failed'),
            'email_pending': count_rows(EmailSendLog, mail_status == 'pending'),
        }

        # Guard against division by zero when no e-mail has been logged yet.
        mail_total = stats['email_total']
        if mail_total:
            stats['email_success_rate'] = round(
                stats['email_success'] / mail_total * 100, 1
            )
        else:
            stats['email_success_rate'] = 0

        newest_stockout = (
            session.query(VendorStockout)
            .order_by(desc(VendorStockout.created_at))
            .first()
        )
        newest_mail = (
            session.query(EmailSendLog)
            .order_by(desc(EmailSendLog.created_at))
            .first()
        )

        # Newest batch = the batch whose most recent row was created last.
        batch_latest = func.max(VendorStockout.created_at).label('latest_date')
        newest_batch = (
            session.query(
                VendorStockout.batch_id,
                func.count(VendorStockout.id).label('record_count'),
                batch_latest,
            )
            .group_by(VendorStockout.batch_id)
            .order_by(desc(batch_latest))
            .first()
        )

        if newest_stockout:
            stats['latest_import_time'] = newest_stockout.created_at
            stats['latest_import_vendor'] = newest_stockout.vendor_name
            stats['latest_import_product'] = newest_stockout.product_name
        else:
            stats['latest_import_time'] = None
            stats['latest_import_vendor'] = None
            stats['latest_import_product'] = None

        if newest_mail:
            stats['latest_email_time'] = newest_mail.created_at
            stats['latest_email_status'] = newest_mail.status
            stats['latest_email_vendor_id'] = newest_mail.vendor_id
        else:
            stats['latest_email_time'] = None
            stats['latest_email_status'] = None
            stats['latest_email_vendor_id'] = None

        if newest_batch:
            stats['latest_batch_id'] = newest_batch.batch_id
            stats['latest_batch_count'] = newest_batch.record_count
            stats['latest_batch_time'] = newest_batch.latest_date
        else:
            stats['latest_batch_id'] = None
            stats['latest_batch_count'] = 0
            stats['latest_batch_time'] = None

        return stats
    finally:
        session.close()
def _apply_stockout_filters(query, batch_id=None, vendor_keyword=None):
    """Narrow a stockout query by batch and by a free-text keyword.

    Args:
        query: Query over ``VendorStockout`` to narrow.
        batch_id: When truthy, keep only rows whose ``batch_id`` equals it.
        vendor_keyword: When truthy, keep rows where the keyword appears
            (SQL ``LIKE '%keyword%'``) in the vendor code, vendor name,
            product code or product name.

    Returns:
        The narrowed query, or ``query`` itself when neither filter is given.
    """
    if batch_id:
        query = query.filter(VendorStockout.batch_id == batch_id)

    if not vendor_keyword:
        return query

    pattern = f'%{vendor_keyword}%'
    searchable_columns = (
        VendorStockout.vendor_code,
        VendorStockout.vendor_name,
        VendorStockout.product_code,
        VendorStockout.product_name,
    )
    keyword_match = or_(*[column.like(pattern) for column in searchable_columns])
    return query.filter(keyword_match)
def _apply_stockout_status_filter(query, status_filter):
    """Apply the status tab filter and report which filter actually took effect.

    Args:
        query: Query over ``VendorStockout`` to narrow.
        status_filter: One of ``'pending'``, ``'sent'``, ``'failed'`` or
            ``'duplicate'``; any other value leaves the query unfiltered.

    Returns:
        tuple: ``(query, effective_status)`` where ``effective_status`` is the
        recognised filter name, or ``'all'`` when the value was not recognised.
    """
    if status_filter == 'pending':
        criterion = _pending_stockout_filter()
    elif status_filter in ('sent', 'failed'):
        criterion = VendorStockout.status == status_filter
    elif status_filter == 'duplicate':
        criterion = VendorStockout.is_duplicate.is_(True)
    else:
        # Unknown or empty value: no filtering, normalised to 'all'.
        return query, 'all'
    return query.filter(criterion), status_filter
def _apply_stockout_sort(query, sort_by):
    """Order a stockout query for the V2 list and report the effective sort key.

    Every ordering ends with ``VendorStockout.id`` (presumably the unique
    primary key - confirm against the model) so that rows which tie on the
    main sort columns keep a fixed relative order. The caller pages the
    result with OFFSET/LIMIT; without a unique final key the database may
    return tied rows in a different order per request, so a row could show
    up on two pages or on none.

    Args:
        query: Query over ``VendorStockout`` to order.
        sort_by: ``'created_asc'``, ``'vendor_asc'`` or
            ``'stockout_days_desc'``; anything else means newest first.

    Returns:
        tuple: ``(ordered_query, effective_sort)`` where ``effective_sort`` is
        the recognised key, or ``'created_desc'`` for the fallback ordering.
    """
    if sort_by == 'created_asc':
        return query.order_by(
            VendorStockout.created_at.asc(),
            VendorStockout.id.asc(),
        ), sort_by
    if sort_by == 'vendor_asc':
        return query.order_by(
            VendorStockout.vendor_code.asc(),
            VendorStockout.created_at.desc(),
            VendorStockout.id.desc(),
        ), sort_by
    if sort_by == 'stockout_days_desc':
        return query.order_by(
            # Rows without a stockout_days value are listed after all others.
            VendorStockout.stockout_days.desc().nullslast(),
            VendorStockout.created_at.desc(),
            VendorStockout.id.desc(),
        ), sort_by
    return query.order_by(
        VendorStockout.created_at.desc(),
        VendorStockout.id.desc(),
    ), 'created_desc'
def get_vendor_stockout_list_context(
    vendor_db,
    page=1,
    page_size=30,
    status_filter='all',
    batch_filter='',
    vendor_keyword='',
    sort_by='created_desc',
):
    """Build the V2 stockout list context (filters, paging, rows) from the database.

    Args:
        vendor_db: Object exposing ``get_session()``; the session is always
            closed before returning.
        page: 1-based page number. Falsy or smaller values become 1 and
            values past the last page are pulled back to the last page.
        page_size: Rows per page, kept within 10..100 (default 30).
        status_filter: Status tab name; unrecognised values become ``'all'``.
        batch_filter: Batch id to restrict to, or empty for every batch.
        vendor_keyword: Free-text keyword matched against vendor and product
            code/name.
        sort_by: Sort key; unrecognised values become ``'created_desc'``.

    Returns:
        dict: The page of ``records``, the 30 most recent ``batches``, paging
        fields, the effective filter/sort values, and ``stats`` counted over
        the batch/keyword-filtered rows (the status tab is not applied to
        ``stats``).
    """
    # Normalise caller input before touching the database.
    page = max(page or 1, 1)
    page_size = max(page_size or 30, 10)
    page_size = min(page_size, 100)
    status_filter, batch_filter, vendor_keyword, sort_by = (
        (value or fallback).strip()
        for value, fallback in (
            (status_filter, 'all'),
            (batch_filter, ''),
            (vendor_keyword, ''),
            (sort_by, 'created_desc'),
        )
    )

    session = vendor_db.get_session()
    try:
        # Batch + keyword filters only; also the basis for the tab counters.
        filtered_base = _apply_stockout_filters(
            session.query(VendorStockout), batch_filter, vendor_keyword
        )
        listing, status_filter = _apply_stockout_status_filter(filtered_base, status_filter)
        listing, sort_by = _apply_stockout_sort(listing, sort_by)

        total_items = listing.count()
        # Ceiling division; an empty result still reports a single page.
        total_pages = (total_items + page_size - 1) // page_size
        if total_pages < 1:
            total_pages = 1
        page = min(page, total_pages)

        first_row = (page - 1) * page_size
        records = listing.offset(first_row).limit(page_size).all()

        batch_latest = func.max(VendorStockout.created_at).label('latest_date')
        batches = (
            session.query(
                VendorStockout.batch_id,
                func.count(VendorStockout.id).label('record_count'),
                batch_latest,
            )
            .group_by(VendorStockout.batch_id)
            .order_by(desc(batch_latest))
            .limit(30)
            .all()
        )

        def count_matching(criterion):
            # Rows of the batch/keyword-filtered set that satisfy ``criterion``.
            return filtered_base.filter(criterion).count()

        stats = {
            'total': filtered_base.count(),
            'pending': count_matching(_pending_stockout_filter()),
            'sent': count_matching(VendorStockout.status == 'sent'),
            'failed': count_matching(VendorStockout.status == 'failed'),
            'duplicate': count_matching(VendorStockout.is_duplicate.is_(True)),
            'vendor_count': (
                filtered_base.with_entities(VendorStockout.vendor_code)
                .distinct()
                .count()
            ),
        }

        return {
            'records': records,
            'batches': batches,
            'total_items': total_items,
            'total_pages': total_pages,
            'current_page': page,
            'page_size': page_size,
            'current_status': status_filter,
            'current_batch': batch_filter,
            'search_query': vendor_keyword,
            'current_sort': sort_by,
            'stats': stats,
        }
    finally:
        session.close()
def _apply_stockout_api_filters(query, batch_number=None, vendor_filter=None):
    """Narrow a stockout query for the legacy list API.

    Args:
        query: Query over ``VendorStockout`` to narrow.
        batch_number: When truthy, keep only rows whose ``batch_id`` equals it.
        vendor_filter: When truthy, keep rows where the text appears
            (SQL ``LIKE '%text%'``) in the vendor code or the vendor name.

    Returns:
        The narrowed query, or ``query`` itself when neither filter is given.
    """
    if batch_number:
        query = query.filter(VendorStockout.batch_id == batch_number)

    if not vendor_filter:
        return query

    pattern = f'%{vendor_filter}%'
    code_matches = VendorStockout.vendor_code.like(pattern)
    name_matches = VendorStockout.vendor_name.like(pattern)
    return query.filter(or_(code_matches, name_matches))
def _apply_stockout_api_sort(query, sort_by):
    """Order a stockout query for the legacy list API.

    Every ordering ends with ``VendorStockout.id`` (presumably the unique
    primary key - confirm against the model). The API pages its result with
    OFFSET/LIMIT, and without a unique final key rows that tie on the main
    sort column may be returned in a different order per request, so a row
    could be repeated or skipped between pages.

    Args:
        query: Query over ``VendorStockout`` to order.
        sort_by: ``'created_at_asc'``, ``'vendor_code_asc'`` or
            ``'stockout_days_desc'``; anything else means newest first.

    Returns:
        The ordered query.
    """
    if sort_by == 'created_at_asc':
        return query.order_by(
            VendorStockout.created_at.asc(),
            VendorStockout.id.asc(),
        )
    if sort_by == 'vendor_code_asc':
        return query.order_by(
            VendorStockout.vendor_code.asc(),
            VendorStockout.id.asc(),
        )
    if sort_by == 'stockout_days_desc':
        # This sort key orders by the safe_stock_days column.
        return query.order_by(
            VendorStockout.safe_stock_days.desc(),
            VendorStockout.id.desc(),
        )
    return query.order_by(
        VendorStockout.created_at.desc(),
        VendorStockout.id.desc(),
    )
def _serialize_stockout_record(record):
    """Convert one stockout row into the dict shape used by the legacy list API.

    Args:
        record: Object with the ``VendorStockout`` attributes read below.

    Returns:
        dict: Plain values ready for serialization. ``batch_number`` carries
        ``batch_id``, ``stockout_days`` carries ``safe_stock_days``, and
        ``send_status`` defaults to ``'pending'`` when the status is empty.
    """
    daily_avg_sales = record.daily_avg_sales
    created_at = record.created_at
    return {
        'id': record.id,
        'batch_number': record.batch_id,
        'vendor_code': record.vendor_code,
        'vendor_name': record.vendor_name,
        'product_code': record.product_code,
        'product_name': record.product_name,
        'stockout_days': record.safe_stock_days,
        # Compare against None explicitly: an average of 0 is a real value
        # and must be reported as 0.0, not as "missing".
        'daily_avg_sales': float(daily_avg_sales) if daily_avg_sales is not None else None,
        'current_stock': record.current_stock,
        'send_status': record.status or 'pending',
        'created_at': created_at.isoformat() if created_at else None,
        'notes': record.notes
    }
def get_stockout_api_list_payload(
    vendor_db,
    page=1,
    page_size=50,
    batch_number=None,
    vendor_filter=None,
    status_filter=None,
    sort_by='created_at_desc',
):
    """Return the payload for the legacy stockout list API.

    Args:
        vendor_db: Object exposing ``get_session()``; the session is always
            closed before returning.
        page: 1-based page number; falsy or smaller values become 1.
        page_size: Rows per page; falsy values become 50 and values below 1
            become 1, so a negative number never reaches OFFSET/LIMIT.
        batch_number: Optional batch id to restrict to.
        vendor_filter: Optional text matched against vendor code and name.
        status_filter: Optional status to restrict to. ``'pending'`` also
            matches rows whose status is NULL, because those rows are
            serialized with ``send_status == 'pending'`` and are counted as
            pending in ``stats``.
        sort_by: Sort key understood by ``_apply_stockout_api_sort``.

    Returns:
        dict: ``records`` (serialized rows of the requested page), ``total``
        (rows matching every filter), ``page``, ``page_size`` and ``stats``.
    """
    page = max(page or 1, 1)
    page_size = max(page_size or 50, 1)
    session = vendor_db.get_session()
    try:
        # Batch + vendor filters only. Queries are generative, so this one
        # object can serve both the listing and the counters below.
        base_query = _apply_stockout_api_filters(
            session.query(VendorStockout),
            batch_number=batch_number,
            vendor_filter=vendor_filter
        )

        query = base_query
        if status_filter == 'pending':
            query = query.filter(_pending_stockout_filter())
        elif status_filter:
            query = query.filter(VendorStockout.status == status_filter)
        query = _apply_stockout_api_sort(query, sort_by)

        total = query.count()
        records = query.offset((page - 1) * page_size).limit(page_size).all()

        total_stats = base_query.count()
        pending_count = base_query.filter(_pending_stockout_filter()).count()
        sent_count = base_query.filter(VendorStockout.status == 'sent').count()
        # NOTE(review): unlike the other counters this one ignores the
        # batch/vendor filters. Kept as-is in case API consumers rely on the
        # global figure - confirm whether it should be filtered as well.
        vendor_count = session.query(VendorStockout.vendor_code).distinct().count()

        return {
            'records': [_serialize_stockout_record(record) for record in records],
            'total': total,
            'page': page,
            'page_size': page_size,
            'stats': {
                'total': total_stats,
                'pending': pending_count,
                'sent': sent_count,
                'vendor_count': vendor_count
            }
        }
    finally:
        session.close()
def get_stockout_batches_payload(vendor_db):
    """Return the payload for the legacy batch list API.

    Args:
        vendor_db: Object exposing ``get_session()``; the session is always
            closed before returning.

    Returns:
        list[dict]: One entry per ``batch_id`` with its row ``count`` and the
        ISO-formatted creation time of its newest row (``latest_date``),
        ordered from the most recently updated batch to the oldest.
    """
    session = vendor_db.get_session()
    try:
        latest_date_column = func.max(VendorStockout.created_at).label('latest_date')
        batches = session.query(
            VendorStockout.batch_id,
            func.count(VendorStockout.id).label('count'),
            latest_date_column
        ).group_by(VendorStockout.batch_id)\
            .order_by(desc(latest_date_column))\
            .all()

        # Unpack rows positionally instead of reading ``row.count``: result
        # rows behave like tuples, and on SQLAlchemy 1.4+ the attribute
        # ``count`` can resolve to the row's sequence ``count()`` method
        # rather than to the column labelled "count".
        return [
            {
                'batch_number': batch_id,
                'count': record_count,
                'latest_date': latest_date.isoformat() if latest_date else None
            }
            for batch_id, record_count, latest_date in batches
        ]
    finally:
        session.close()
def _serialize_vendor(vendor, emails):
    """Convert a vendor row and its e-mail addresses into the API dict shape.

    Args:
        vendor: Object with ``id``, ``vendor_code``, ``vendor_name``,
            ``is_active`` and ``created_at`` attributes.
        emails: Sized collection of the vendor's e-mail addresses; returned
            as given under ``emails`` and counted for ``email_count``.

    Returns:
        dict: Vendor fields with ``created_at`` ISO-formatted, or ``None``
        when the vendor has no creation time.
    """
    created_at = vendor.created_at
    if created_at:
        created_text = created_at.isoformat()
    else:
        created_text = None

    payload = {
        'id': vendor.id,
        'vendor_code': vendor.vendor_code,
        'vendor_name': vendor.vendor_name,
        'is_active': vendor.is_active,
    }
    payload['email_count'] = len(emails)
    payload['emails'] = emails
    payload['created_at'] = created_text
    return payload
def get_vendor_list_payload(
    vendor_db,
    page=1,
    page_size=20,
    search='',
    active_only=True,
):
    """Return the payload for the legacy vendor list API.

    Args:
        vendor_db: Object exposing ``get_session()``; the session is always
            closed before returning.
        page: 1-based page number; falsy or smaller values become 1.
        page_size: Vendors per page; falsy values become 20 and values below
            1 become 1, so a negative number never reaches OFFSET/LIMIT.
        search: Optional text matched (SQL ``LIKE '%text%'``) against vendor
            code and vendor name; surrounding whitespace is ignored.
        active_only: When true, only active vendors are listed.

    Returns:
        dict: ``vendors`` (serialized vendors of the requested page, each
        with its active e-mail addresses), ``total`` (vendors matching the
        filters), ``page``, ``page_size`` and ``stats``. ``stats`` always
        counts active vendors and active e-mail addresses, regardless of the
        filters applied to the listing.
    """
    page = max(page or 1, 1)
    page_size = max(page_size or 20, 1)
    search = (search or '').strip()
    session = vendor_db.get_session()
    try:
        query = session.query(VendorList)
        if active_only:
            query = query.filter(VendorList.is_active.is_(True))
        if search:
            query = query.filter(or_(
                VendorList.vendor_code.like(f'%{search}%'),
                VendorList.vendor_name.like(f'%{search}%')
            ))

        total = query.count()
        vendors = query.order_by(VendorList.vendor_code.asc())\
            .offset((page - 1) * page_size)\
            .limit(page_size)\
            .all()

        # Load the active addresses of every vendor on this page with one
        # query instead of one query per vendor (N+1 pattern).
        emails_by_vendor = {}
        if vendors:
            email_rows = session.query(VendorEmail.vendor_id, VendorEmail.email).filter(
                VendorEmail.vendor_id.in_([vendor.id for vendor in vendors]),
                VendorEmail.is_active.is_(True)
            ).all()
            for vendor_id, address in email_rows:
                emails_by_vendor.setdefault(vendor_id, []).append(address)

        vendors_data = [
            _serialize_vendor(vendor, emails_by_vendor.get(vendor.id, []))
            for vendor in vendors
        ]

        total_vendors = session.query(VendorList).filter(VendorList.is_active.is_(True)).count()
        total_emails = session.query(VendorEmail).filter(VendorEmail.is_active.is_(True)).count()
        # Avoid dividing by zero when there is no active vendor.
        avg_emails = round(total_emails / total_vendors, 1) if total_vendors > 0 else 0

        return {
            'vendors': vendors_data,
            'total': total,
            'page': page,
            'page_size': page_size,
            'stats': {
                'total_vendors': total_vendors,
                'total_emails': total_emails,
                'avg_emails': avg_emails
            }
        }
    finally:
        session.close()
def get_vendor_detail_payload(vendor_db, vendor_code):
    """Return the API payload for a single vendor, or ``None`` when not found.

    Args:
        vendor_db: Object exposing ``get_session()``; the session is always
            closed before returning.
        vendor_code: Vendor code to look up (exact match).

    Returns:
        dict | None: The serialized vendor together with its active e-mail
        addresses, or ``None`` when no vendor has that code.
    """
    session = vendor_db.get_session()
    try:
        vendor = (
            session.query(VendorList)
            .filter_by(vendor_code=vendor_code)
            .first()
        )
        if not vendor:
            return None

        active_email_rows = session.query(VendorEmail).filter(
            VendorEmail.vendor_id == vendor.id,
            VendorEmail.is_active.is_(True),
        ).all()
        addresses = [row.email for row in active_email_rows]
        return _serialize_vendor(vendor, addresses)
    finally:
        session.close()