import logging from sqlalchemy import desc from database.manager import DatabaseManager from database.edm_models import PromoProduct # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def cleanup_duplicate_promo_products(): """ 清理 promo_products 表,為每個 i_code 只保留最新的一筆紀錄。 """ db = DatabaseManager() session = db.get_session() try: logging.info("🧹 開始清理限時搶購 (promo_products) 資料庫中的重複數據...") # 1. 找出所有唯一的 i_code unique_icodes_query = session.query(PromoProduct.i_code).distinct() unique_icodes = [item[0] for item in unique_icodes_query.all()] total_unique_items = len(unique_icodes) logging.info(f"🔍 發現 {total_unique_items} 個不重複的商品 (i_code)。") deleted_count = 0 processed_count = 0 # 2. 遍歷每個 i_code for i_code in unique_icodes: # 找出該 i_code 的所有紀錄,按 ID 降序排列 (ID 越大代表越新) records = session.query(PromoProduct).filter(PromoProduct.i_code == i_code).order_by(desc(PromoProduct.id)).all() if len(records) > 1: # 保留最新的一筆 (records[0]),刪除其餘的 (records[1:]) records_to_delete = records[1:] num_to_delete = len(records_to_delete) for record in records_to_delete: session.delete(record) deleted_count += num_to_delete processed_count += 1 if processed_count % 100 == 0: logging.info(f"🔄 已處理 {processed_count}/{total_unique_items} 個商品...") # 3. 提交變更 if deleted_count > 0: logging.info(f"⏳ 正在提交資料庫變更,準備刪除 {deleted_count} 筆舊紀錄...") session.commit() logging.info(f"✅ 清理完成!總共刪除了 {deleted_count} 筆重複的舊資料。") else: logging.info("✅ 資料庫很乾淨,無需清理。") except Exception as e: logging.error(f"❌ 清理過程中發生錯誤: {e}") session.rollback() finally: session.close() if __name__ == "__main__": cleanup_duplicate_promo_products()