fix(campaign): persist full crawl snapshots
All checks were successful
CD Pipeline / deploy (push) Successful in 2m22s

This commit is contained in:
OoO
2026-05-01 20:48:28 +08:00
parent bb99dfeab6
commit c1f43b0ae4
4 changed files with 104 additions and 49 deletions

4
app.py
View File

@@ -95,8 +95,8 @@ except Exception as e:
sys_log.error(f"無法檢測磁碟空間: {e}")
# 🚩 系統版本定義 (備份與顯示用)
# 🚩 2026-05-01 V10.70: Restore campaign operations table signals
SYSTEM_VERSION = "V10.70"
# 🚩 2026-05-01 V10.71: Persist full campaign crawl snapshots
SYSTEM_VERSION = "V10.71"
# ==========================================
# 🔒 SQL Injection 防護函數

View File

@@ -254,7 +254,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.70"
SYSTEM_VERSION = "V10.71"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -553,6 +553,7 @@ def run_edm_task(lpn_code="O1K5FBOqsvN"):
logging.info(f"[Crawler] [EDM] 📦 偵測到 {len(product_areas)} 個商品區塊")
count = 0
snapshot_count = 0
current_scan_icodes = set()
seen_time_slots = set() # V9.60: 記錄本次掃描到的所有時段
changed_products = [] # V-New: 收集異動商品以發送通知
@@ -722,32 +723,34 @@ def run_edm_task(lpn_code="O1K5FBOqsvN"):
status_change = "UPDATE"
previous_price = prev_record.previous_price
# 只記錄有變動的 (新增 或 價格變動)
if status_change != "NONE":
new_promo = PromoProduct(
batch_id=batch_id,
i_code=i_code,
name=name,
price=price,
discount_text=discount_text,
url=link_url,
previous_price=previous_price, # V9.64: 寫入舊價格
time_slot=time_slot,
status_change=status_change,
crawled_at=now,
activity_time_text=activity_time_text,
session_time_text=session_time_text,
remain_qty=remain_qty,
page_type=PAGE_TYPE
)
# V9.62: 嘗試寫入圖片 (若 Model 尚未更新欄位定義,此行可能無效,但 DB 已有欄位)
new_promo.image_url = image_url
session.add(new_promo)
is_changed = status_change != "NONE"
new_promo = PromoProduct(
batch_id=batch_id,
i_code=i_code,
name=name,
price=price,
discount_text=discount_text,
url=link_url,
previous_price=previous_price, # V9.64: 寫入舊價格
time_slot=time_slot,
status_change=status_change if is_changed else "ACTIVE",
crawled_at=now,
activity_time_text=activity_time_text,
session_time_text=session_time_text,
remain_qty=remain_qty,
page_type=PAGE_TYPE
)
# V9.62: 嘗試寫入圖片 (若 Model 尚未更新欄位定義,此行可能無效,但 DB 已有欄位)
new_promo.image_url = image_url
session.add(new_promo)
snapshot_count += 1
if is_changed:
changed_products.append(new_promo)
count += 1
else:
logging.debug(f"[Crawler] [EDM] [=] 無變動跳過 | Name: {name}")
logging.debug(f"[Crawler] [EDM] [=] 無變動快照已寫入 | Name: {name}")
except Exception as e:
logging.warning(f"[Crawler] [EDM] ⚠️ EDM 單一商品解析失敗 | Error: {e}")
continue # 單一商品失敗不影響整體
@@ -778,6 +781,7 @@ def run_edm_task(lpn_code="O1K5FBOqsvN"):
)
delisted_promo.image_url = record.image_url if hasattr(record, 'image_url') else None
session.add(delisted_promo)
snapshot_count += 1
changed_products.append(delisted_promo)
count += 1
@@ -840,9 +844,10 @@ def run_edm_task(lpn_code="O1K5FBOqsvN"):
except Exception as e:
logging.error(f"[Crawler] [EDM] ❌ 發送通知時發生錯誤 | Error: {e}")
logging.info(f"[Crawler] [EDM] ✅ EDM 任務完成 | New Records: {count} | Batch: {batch_id}")
logging.info(f"[Crawler] [EDM] ✅ EDM 任務完成 | Changed Records: {count} | Snapshot Records: {snapshot_count} | Batch: {batch_id}")
stats = {
"changed_records": count,
"snapshot_records": snapshot_count,
"batch_id": batch_id,
"status": "Success"
}
@@ -1018,6 +1023,7 @@ def run_festival_task(lpn_code="O7ylWfihYUM"):
logging.warning("[Crawler] [Festival] 🚨 未偵測到任何商品區塊 | Action: 任務提前結束 | Info: 請檢查偵錯檔案")
return
count = 0
snapshot_count = 0
current_scan_items = set()
seen_groups = set()
changed_products = [] # V-New: 收集異動商品以發送通知
@@ -1155,19 +1161,22 @@ def run_festival_task(lpn_code="O7ylWfihYUM"):
status_change = "UPDATE"
logging.info(f"[Crawler] [Festival] -> 狀態: 圖片更新 (UPDATE)")
if status_change != "NONE":
new_promo = PromoProduct(
batch_id=batch_id, i_code=i_code, name=name, price=price, url=link_url,
image_url=image_url, previous_price=previous_price, time_slot=group_title,
status_change=status_change, crawled_at=now, activity_time_text=activity_name,
session_time_text=group_title, page_type=PAGE_TYPE
)
session.add(new_promo)
is_changed = status_change != "NONE"
new_promo = PromoProduct(
batch_id=batch_id, i_code=i_code, name=name, price=price, url=link_url,
image_url=image_url, previous_price=previous_price, time_slot=group_title,
status_change=status_change if is_changed else "ACTIVE", crawled_at=now, activity_time_text=activity_name,
session_time_text=group_title, page_type=PAGE_TYPE
)
session.add(new_promo)
snapshot_count += 1
if is_changed:
changed_products.append(new_promo) # V-New: 收集異動商品
count += 1
logging.info(f"[Crawler] [Festival] -> 寫入資料庫: {status_change}")
else:
logging.info("[Crawler] [Festival] -> 狀態: 無變動 (NONE) | Action: Skip Write")
logging.info("[Crawler] [Festival] -> 狀態: 無變動 (ACTIVE) | Action: Snapshot Written")
except Exception as e:
logging.error(f"[Crawler] [Festival] ❌ 解析商品時發生未預期錯誤 | Error: {e}")
@@ -1184,12 +1193,13 @@ def run_festival_task(lpn_code="O7ylWfihYUM"):
session_time_text=getattr(record, 'session_time_text', activity_name), page_type=PAGE_TYPE
)
session.add(delisted_promo)
snapshot_count += 1
changed_products.append(delisted_promo) # V-New: 收集下架商品
count += 1
session.commit()
logging.info(f"[Crawler] [Festival] ✅ {PAGE_TYPE} 任務完成 | New Records: {count} | Batch: {batch_id}")
stats = { "changed_records": count, "batch_id": batch_id, "status": "Success" }
logging.info(f"[Crawler] [Festival] ✅ {PAGE_TYPE} 任務完成 | Changed Records: {count} | Snapshot Records: {snapshot_count} | Batch: {batch_id}")
stats = { "changed_records": count, "snapshot_records": snapshot_count, "batch_id": batch_id, "status": "Success" }
_save_stats('festival_task', stats) # 為統計資料使用新的任務名稱
# V-New: 發送通知 - 如果有異動商品則發送 Telegram 和 Line 通知
@@ -1357,6 +1367,7 @@ def run_promo_event_task(lpn_code, page_type, activity_name):
return
count = 0
snapshot_count = 0
current_scan_items = set()
seen_groups = set()
changed_products = []
@@ -1485,19 +1496,22 @@ def run_promo_event_task(lpn_code, page_type, activity_name):
status_change = "UPDATE"
logging.info(f"[Crawler] [{page_type.upper()}] -> 狀態: 圖片更新 (UPDATE)")
if status_change != "NONE":
new_promo = PromoProduct(
batch_id=batch_id, i_code=i_code, name=name, price=price, url=link_url,
image_url=image_url, previous_price=previous_price, time_slot=group_title,
status_change=status_change, crawled_at=now, activity_time_text=activity_name,
session_time_text=group_title, page_type=page_type
)
session.add(new_promo)
is_changed = status_change != "NONE"
new_promo = PromoProduct(
batch_id=batch_id, i_code=i_code, name=name, price=price, url=link_url,
image_url=image_url, previous_price=previous_price, time_slot=group_title,
status_change=status_change if is_changed else "ACTIVE", crawled_at=now, activity_time_text=activity_name,
session_time_text=group_title, page_type=page_type
)
session.add(new_promo)
snapshot_count += 1
if is_changed:
changed_products.append(new_promo)
count += 1
logging.info(f"[Crawler] [{page_type.upper()}] -> 寫入資料庫: {status_change}")
else:
logging.info(f"[Crawler] [{page_type.upper()}] -> 狀態: 無變動 (NONE) | Action: Skip Write")
logging.info(f"[Crawler] [{page_type.upper()}] -> 狀態: 無變動 (ACTIVE) | Action: Snapshot Written")
except Exception as e:
logging.error(f"[Crawler] [{page_type.upper()}] ❌ 解析商品時發生未預期錯誤 | Error: {e}")
@@ -1513,12 +1527,13 @@ def run_promo_event_task(lpn_code, page_type, activity_name):
session_time_text=getattr(record, 'session_time_text', activity_name), page_type=page_type
)
session.add(delisted_promo)
snapshot_count += 1
changed_products.append(delisted_promo)
count += 1
session.commit()
logging.info(f"[Crawler] [{page_type.upper()}] ✅ {page_type} 任務完成 | New Records: {count} | Batch: {batch_id}")
stats = { "changed_records": count, "batch_id": batch_id, "status": "Success" }
logging.info(f"[Crawler] [{page_type.upper()}] ✅ {page_type} 任務完成 | Changed Records: {count} | Snapshot Records: {snapshot_count} | Batch: {batch_id}")
stats = { "changed_records": count, "snapshot_records": snapshot_count, "batch_id": batch_id, "status": "Success" }
_save_stats(f'{page_type}_task', stats)
if changed_products:

View File

@@ -0,0 +1,40 @@
from pathlib import Path
# Directory one level above the folder that contains this test file.
# The tests below resolve "scheduler.py" and "database/edm_models.py"
# relative to it, so it is presumably the project root — confirm if the
# test file is ever moved.
ROOT = Path(__file__).resolve().parents[1]
def test_promo_crawlers_append_full_snapshots_for_unchanged_items():
    """Statically check scheduler.py for the full-snapshot code markers.

    The scheduler module is read as plain text (it is never imported or
    executed). The test passes when every expected snippet is present and
    every forbidden snippet is absent.
    """
    source_text = ROOT.joinpath("scheduler.py").read_text(encoding="utf-8")

    # Snippets that must appear somewhere in the scheduler source.
    expected_snippets = (
        'status_change=status_change if is_changed else "ACTIVE"',
        "snapshot_count = 0",
        "snapshot_count += 1",
        '"snapshot_records": snapshot_count',
        "無變動快照已寫入",
        "Snapshot Written",
    )
    # Log wording that must no longer appear anywhere in the scheduler source.
    forbidden_snippets = (
        "Skip Write",
        "無變動跳過",
    )

    for snippet in expected_snippets:
        assert snippet in source_text
    for snippet in forbidden_snippets:
        assert snippet not in source_text
def test_promo_product_model_contains_crawled_fields_needed_for_history():
    """Statically check that the EDM model source names every history field.

    The model file is read as plain text (it is never imported). Each
    required field must occur as a whole identifier: a plain substring
    check would let "url" be satisfied by "image_url", "price" by
    "previous_price" and "name" by any longer identifier containing it,
    so a missing column could go unnoticed.

    All missing fields are collected and reported in a single assertion
    message instead of stopping at the first one.
    """
    # Local import keeps this block self-contained; the file's top-level
    # imports only provide pathlib.Path.
    import re

    model_source = (ROOT / "database/edm_models.py").read_text(encoding="utf-8")
    required_fields = [
        "batch_id",
        "crawled_at",
        "time_slot",
        "activity_time_text",
        "session_time_text",
        "i_code",
        "name",
        "price",
        "discount_text",
        "url",
        "image_url",
        "previous_price",
        "remain_qty",
        "status_change",
        "page_type",
    ]
    # "_" is a word character, so \b does not match inside a longer
    # identifier such as "image_url" or "previous_price".
    missing = [
        field
        for field in required_fields
        if re.search(rf"\b{re.escape(field)}\b", model_source) is None
    ]
    assert not missing, f"database/edm_models.py is missing fields: {missing}"