Some checks failed
CD Pipeline / deploy (push) Failing after 59s
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml) - 部署模式: rsync Python 檔案至 188 → docker restart (volume mount) - Dockerfile/requirements 變動時自動重建 Docker image - 部署通知: Telegram (開始/成功/失敗) - 健康檢查: https://mo.wooo.work/health (最多 5 次重試) - 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
199 lines
6.4 KiB
Python
199 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
SQLite 資料庫優化腳本
|
|
建議在低流量時段執行(晚上 8 點後)
|
|
|
|
功能:
|
|
1. 添加缺失的索引(提升查詢效能)
|
|
2. 執行 VACUUM 清理(回收空間、優化儲存)
|
|
3. 分析資料庫統計(更新查詢規劃器統計資訊)
|
|
"""
|
|
|
|
import sqlite3
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# 資料庫路徑
|
|
DB_PATH = '/home/ogt/momo_pro_system/data/momo_database.db'
|
|
|
|
# 日誌函數
|
|
def log(message, level="INFO"):
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
print(f"[{timestamp}] [{level}] {message}")
|
|
|
|
# 檢查資料庫大小
|
|
def get_db_size():
|
|
size_bytes = os.path.getsize(DB_PATH)
|
|
size_mb = size_bytes / (1024 * 1024)
|
|
return size_bytes, size_mb
|
|
|
|
# 檢查索引是否存在
|
|
def index_exists(cursor, index_name):
|
|
cursor.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='index' AND name=?",
|
|
(index_name,)
|
|
)
|
|
return cursor.fetchone() is not None
|
|
|
|
# 添加索引
|
|
def add_indexes(conn):
|
|
log("開始添加索引...")
|
|
cursor = conn.cursor()
|
|
|
|
indexes = [
|
|
# products 表索引
|
|
("idx_products_category", "CREATE INDEX IF NOT EXISTS idx_products_category ON products (category)"),
|
|
("idx_products_status", "CREATE INDEX IF NOT EXISTS idx_products_status ON products (status)"),
|
|
("idx_products_updated_at", "CREATE INDEX IF NOT EXISTS idx_products_updated_at ON products (updated_at DESC)"),
|
|
|
|
# price_records 表索引
|
|
("idx_price_records_product_id", "CREATE INDEX IF NOT EXISTS idx_price_records_product_id ON price_records (product_id)"),
|
|
("idx_price_records_product_time", "CREATE INDEX IF NOT EXISTS idx_price_records_product_time ON price_records (product_id, timestamp DESC)"),
|
|
|
|
# promo_products 表索引
|
|
("idx_promo_crawled_at", "CREATE INDEX IF NOT EXISTS idx_promo_crawled_at ON promo_products (crawled_at DESC)"),
|
|
("idx_promo_batch_id", "CREATE INDEX IF NOT EXISTS idx_promo_batch_id ON promo_products (batch_id)"),
|
|
("idx_promo_status_change", "CREATE INDEX IF NOT EXISTS idx_promo_status_change ON promo_products (status_change)"),
|
|
]
|
|
|
|
added_count = 0
|
|
skipped_count = 0
|
|
|
|
for index_name, create_sql in indexes:
|
|
try:
|
|
if index_exists(cursor, index_name):
|
|
log(f" 索引已存在,跳過: {index_name}", "INFO")
|
|
skipped_count += 1
|
|
continue
|
|
|
|
log(f" 創建索引: {index_name}...", "INFO")
|
|
start_time = time.time()
|
|
cursor.execute(create_sql)
|
|
conn.commit()
|
|
elapsed = time.time() - start_time
|
|
log(f" ✓ 索引創建成功: {index_name} (耗時 {elapsed:.2f} 秒)", "SUCCESS")
|
|
added_count += 1
|
|
|
|
except Exception as e:
|
|
log(f" ✗ 索引創建失敗: {index_name} - {e}", "ERROR")
|
|
conn.rollback()
|
|
|
|
log(f"索引添加完成:新增 {added_count} 個,跳過 {skipped_count} 個", "INFO")
|
|
return added_count
|
|
|
|
# 執行 VACUUM
|
|
def run_vacuum(conn):
|
|
log("開始執行 VACUUM 清理...", "INFO")
|
|
log("⚠️ 注意:此操作會鎖定整個資料庫,期間無法讀寫", "WARNING")
|
|
|
|
try:
|
|
start_time = time.time()
|
|
conn.execute("VACUUM")
|
|
elapsed = time.time() - start_time
|
|
log(f"✓ VACUUM 執行成功 (耗時 {elapsed:.2f} 秒)", "SUCCESS")
|
|
return True
|
|
except Exception as e:
|
|
log(f"✗ VACUUM 執行失敗: {e}", "ERROR")
|
|
return False
|
|
|
|
# 分析資料庫統計
|
|
def analyze_database(conn):
|
|
log("開始分析資料庫統計...", "INFO")
|
|
try:
|
|
start_time = time.time()
|
|
conn.execute("ANALYZE")
|
|
elapsed = time.time() - start_time
|
|
log(f"✓ 資料庫分析完成 (耗時 {elapsed:.2f} 秒)", "SUCCESS")
|
|
return True
|
|
except Exception as e:
|
|
log(f"✗ 資料庫分析失敗: {e}", "ERROR")
|
|
return False
|
|
|
|
# 顯示優化前後對比
|
|
def show_statistics(before_size, after_size):
|
|
log("=" * 60)
|
|
log("資料庫優化統計報告", "INFO")
|
|
log("=" * 60)
|
|
log(f"優化前大小: {before_size:.2f} MB", "INFO")
|
|
log(f"優化後大小: {after_size:.2f} MB", "INFO")
|
|
|
|
if before_size > after_size:
|
|
saved = before_size - after_size
|
|
saved_pct = (saved / before_size) * 100
|
|
log(f"節省空間: {saved:.2f} MB ({saved_pct:.1f}%)", "SUCCESS")
|
|
else:
|
|
log("空間無變化(資料庫已經很緊湊)", "INFO")
|
|
|
|
log("=" * 60)
|
|
|
|
def main():
|
|
log("=" * 60)
|
|
log("SQLite 資料庫優化腳本啟動", "INFO")
|
|
log("=" * 60)
|
|
|
|
# 檢查資料庫是否存在
|
|
if not os.path.exists(DB_PATH):
|
|
log(f"錯誤:資料庫不存在 - {DB_PATH}", "ERROR")
|
|
sys.exit(1)
|
|
|
|
# 記錄開始時間和大小
|
|
before_bytes, before_mb = get_db_size()
|
|
log(f"當前資料庫大小: {before_mb:.2f} MB", "INFO")
|
|
|
|
try:
|
|
# 連接資料庫
|
|
log("連接到資料庫...", "INFO")
|
|
conn = sqlite3.connect(DB_PATH, timeout=30.0)
|
|
|
|
# 步驟 1: 添加索引
|
|
log("\n" + "=" * 60)
|
|
log("步驟 1/3: 添加索引", "INFO")
|
|
log("=" * 60)
|
|
added_count = add_indexes(conn)
|
|
|
|
# 步驟 2: 執行 VACUUM
|
|
log("\n" + "=" * 60)
|
|
log("步驟 2/3: 執行 VACUUM 清理", "INFO")
|
|
log("=" * 60)
|
|
vacuum_success = run_vacuum(conn)
|
|
|
|
# 步驟 3: 分析資料庫
|
|
log("\n" + "=" * 60)
|
|
log("步驟 3/3: 分析資料庫統計", "INFO")
|
|
log("=" * 60)
|
|
analyze_success = analyze_database(conn)
|
|
|
|
# 關閉連接
|
|
conn.close()
|
|
log("資料庫連接已關閉", "INFO")
|
|
|
|
# 顯示統計
|
|
after_bytes, after_mb = get_db_size()
|
|
log("")
|
|
show_statistics(before_mb, after_mb)
|
|
|
|
# 最終結果
|
|
log("")
|
|
log("=" * 60)
|
|
if added_count > 0 or vacuum_success:
|
|
log("✓ 資料庫優化完成!", "SUCCESS")
|
|
else:
|
|
log("資料庫優化執行完畢(無顯著變化)", "INFO")
|
|
log("=" * 60)
|
|
|
|
except sqlite3.OperationalError as e:
|
|
log(f"資料庫操作錯誤: {e}", "ERROR")
|
|
log("可能原因:資料庫正在被其他程序使用", "ERROR")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
log(f"未預期的錯誤: {e}", "ERROR")
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|