Files
ewoooc/database/manager.py
OoO 74d64092bc
All checks were successful
CD Pipeline / deploy (push) Successful in 1m35s
fix(db): 收斂 DatabaseManager PostgreSQL 連線池
2026-04-30 10:08:31 +08:00

495 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import threading
from sqlalchemy import create_engine, desc, select, text, literal
from sqlalchemy.engine.url import make_url
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from .models import Base, Category, Product, PriceRecord, MonthlySummaryAnalysis
from .user_models import User, LoginHistory # noqa: F401 - 必須在 trend_models 之前導入,解決 ForeignKey 依賴問題
from .edm_models import PromoProduct # V-Fix: 確保 EDM 模型被註冊,以便自動建表
from .trend_models import TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache, TelegramUser # noqa: F401 - 趨勢資料表
from .permission_models import Permission, UserPermission # noqa: F401 - 確保權限表被 Base.metadata 管理
from .ai_models import AIGenerationHistory, AIPromptTemplate, AIUsageTracking, AIInsight # noqa: F401 - AI history/template 表
from .autoheal_models import ( # noqa: F401 - ADR-013 AIOps 自動修復表
AgentContext,
ActionPlan,
ActionOutcome,
AgentStrategyWeights,
Incident,
Playbook,
HealLog,
)
from .import_models import ImportJob, ImportConfig # noqa: F401 - 確保 import_jobs/import_config 被 Base.metadata 管理
from .notification_models import NotificationTemplate # noqa: F401 - 確保 notification_templates 表被 Base.metadata 管理
from .ppt_reports import PPTReport # noqa: F401 - 確保 ppt_reports 表被 Base.metadata 管理
from .vendor_models import VendorStockout, VendorList, VendorEmail, EmailSendLog # noqa: F401 - 確保 vendor 表被 Base.metadata 管理
from .realtime_sales_models import RealtimeSalesMonthly # noqa: F401 - 確保 realtime_sales_monthly 被 Base.metadata 管理
# 🚩 導入優化後的日誌管理模組
from utils.logger_manager import SystemLogger
# Logger dedicated to the database module.
sys_log = SystemLogger("Database").get_logger()
# Serializes the first metadata initialization between threads of this process
# (used by ensure_metadata_initialized).
_metadata_init_lock = threading.Lock()
# Process-wide flag; set to True once Base.metadata.create_all has completed.
_metadata_initialized = False
# Key of the PostgreSQL advisory lock taken around create_all so that several
# database sessions (e.g. multiple worker processes) do not run DDL concurrently.
_POSTGRES_METADATA_LOCK_ID = 170017
def ensure_metadata_initialized(engine, use_postgres_lock=False):
    """Create every table registered on ``Base.metadata`` at most once per process.

    Idempotent: after the first successful call, later calls return
    immediately without issuing any DDL. ``_metadata_init_lock`` serializes
    the first initialization between threads of this process.

    Args:
        engine: SQLAlchemy engine on which the tables are created.
        use_postgres_lock: When True, hold a PostgreSQL transaction-level
            advisory lock (``_POSTGRES_METADATA_LOCK_ID``) while ``create_all``
            runs, so concurrent processes do not race on DDL.

    Raises:
        Any error raised by the lock statement or ``create_all``. The
        initialized flag stays unset in that case, so a later call retries.
    """
    global _metadata_initialized
    if _metadata_initialized:
        return
    with _metadata_init_lock:
        # Re-check under the lock: another thread may have finished meanwhile.
        if _metadata_initialized:
            return
        if use_postgres_lock:
            with engine.begin() as conn:
                # pg_advisory_xact_lock is released automatically at COMMIT or
                # ROLLBACK. A session-level pg_advisory_lock is NOT released by
                # a rollback. If create_all failed, the aborted transaction
                # also rejected the explicit unlock, so the pooled connection
                # kept the lock and blocked every other session's initialization.
                conn.execute(
                    text("SELECT pg_advisory_xact_lock(:lock_id)"),
                    {"lock_id": _POSTGRES_METADATA_LOCK_ID},
                )
                Base.metadata.create_all(conn)
        else:
            Base.metadata.create_all(engine)
        _metadata_initialized = True
def sanitize_timestamp(timestamp_str):
    """Validate that ``timestamp_str`` is exactly ``YYYY-MM-DD HH:MM:SS``.

    The result is inlined into SQL text by callers, so only ASCII digits in
    this exact layout are accepted (prevents SQL injection).

    Args:
        timestamp_str: Timestamp string in ``YYYY-MM-DD HH:MM:SS`` format.

    Returns:
        str: The same string, unchanged, once validated.

    Raises:
        ValueError: If the string does not match the format exactly.
    """
    # fullmatch (instead of match + '$') so a trailing newline cannot slip
    # through; [0-9] (instead of \d) so non-ASCII Unicode digits are rejected.
    if not re.fullmatch(r'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}', timestamp_str):
        raise ValueError(f"時間戳格式不正確: {timestamp_str}")
    return timestamp_str
class DatabaseManager:
    """Access point for the shared SQLAlchemy engine and session factory.

    Constructing an instance is cheap. The engine and session factory are
    cached per ``(DATABASE_TYPE, resolved database path)``, so code that
    creates ``DatabaseManager()`` repeatedly (e.g. per route) reuses a single
    connection pool per process instead of opening a new pool each time.
    """

    # (DATABASE_TYPE, str(resolved path)) -> {'engine': Engine, 'Session': sessionmaker}
    _instance_cache = {}
    # Guards _instance_cache and the one-time engine/schema setup.
    _instance_lock = threading.Lock()

    def __init__(self, db_path=None):
        """Attach to (or create) the shared engine for the configured database.

        PostgreSQL is used when ``config.DATABASE_TYPE == 'postgresql'``;
        otherwise SQLite is used.

        Args:
            db_path: Optional database URL or file path overriding
                ``config.DATABASE_PATH``. In SQLite mode with no ``db_path``,
                ``data/momo_database.db`` two directories above this module is
                used.
        """
        # V-Fix (2026-01-23): prefer the database settings from config.py.
        from config import DATABASE_PATH, DATABASE_TYPE

        effective_db_path = self._resolve_db_path(db_path, DATABASE_PATH, DATABASE_TYPE)
        # Key on the fully resolved path. The key used to be built from
        # DATABASE_PATH before the SQLite default was applied. That made
        # DatabaseManager() and DatabaseManager(DATABASE_PATH) share one cache
        # entry while pointing at different files, so whichever was created
        # first silently served both.
        cache_key = (DATABASE_TYPE, str(effective_db_path))

        # V-Fix (2026-04-30): DatabaseManager is instantiated directly in many
        # routes. Calling create_engine every time would keep adding connection
        # pools and eventually exhaust PostgreSQL client connections.
        with self._instance_lock:
            cached = self._instance_cache.get(cache_key)
            if cached:
                self.engine = cached['engine']
                self.Session = cached['Session']
                return

            if DATABASE_TYPE == 'postgresql':
                self._init_postgres(effective_db_path, cache_key)
            else:
                self._init_sqlite(effective_db_path, cache_key)

    @staticmethod
    def _resolve_db_path(db_path, configured_path, db_type):
        """Return the database URL/path this manager will actually connect to."""
        if db_path is not None:
            return db_path
        if db_type == 'postgresql':
            return configured_path
        # SQLite without an explicit path: use the bundled default file.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, 'data', 'momo_database.db')

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the parent directory of ``path`` if it has one."""
        # Bare filenames and ':memory:' have no directory component, and
        # os.makedirs('') raises FileNotFoundError.
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _init_postgres(self, effective_db_path, cache_key):
        """Create the PostgreSQL engine and cache entry (caller holds _instance_lock)."""
        # gunicorn runs several workers, so each process keeps a small pool to
        # avoid exhausting PostgreSQL client connections.
        self.engine = create_engine(
            effective_db_path,
            echo=False,
            pool_pre_ping=True,  # detect dropped connections before use
            pool_size=2,  # persistent connections per worker process
            max_overflow=3,  # burst headroom, capped to protect PostgreSQL clients
            pool_recycle=1800,  # recycle connections after 30 minutes
            pool_timeout=30,  # seconds to wait for a free pooled connection
            connect_args={
                'connect_timeout': 10,  # connection timeout: 10 seconds
                'options': '-c statement_timeout=60000',  # statement timeout: 60 seconds
            },
        )
        ensure_metadata_initialized(self.engine, use_postgres_lock=True)
        self.Session = sessionmaker(bind=self.engine)
        self._instance_cache[cache_key] = {
            'engine': self.engine,
            'Session': self.Session,
        }
        sys_log.info("[Database] ✅ 使用 PostgreSQL 資料庫 (連線池已收斂)")
        # ADR-013: make sure the AIOps self-healing tables exist and are seeded.
        self._init_autoheal_tables()

    def _init_sqlite(self, effective_db_path, cache_key):
        """Create the SQLite engine, tables and cache entry (caller holds _instance_lock)."""
        if str(effective_db_path).startswith('sqlite://'):
            sqlite_db_file = make_url(effective_db_path).database
            if sqlite_db_file:
                self._ensure_parent_dir(sqlite_db_file)
            self.engine = create_engine(effective_db_path, echo=False)
        else:
            self._ensure_parent_dir(effective_db_path)
            self.engine = create_engine(f'sqlite:///{effective_db_path}', echo=False)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
        self._instance_cache[cache_key] = {
            'engine': self.engine,
            'Session': self.Session,
        }
        self._check_and_fix_schema()
        sys_log.info(f"[Database] 使用 SQLite 資料庫: {effective_db_path}")

    def _check_and_fix_schema(self):
        """Add columns that older SQLite databases are missing (SQLite only)."""
        # Relies on SQLite's PRAGMA table_info, which PostgreSQL does not support.
        from config import DATABASE_TYPE
        if DATABASE_TYPE == 'postgresql':
            return  # PostgreSQL does not need this repair logic.

        session = self.get_session()
        try:
            # 1. promo_products may lack the url column.
            result = session.execute(text("PRAGMA table_info(promo_products)")).fetchall()
            if result:
                columns = [row[1] for row in result]
                if 'url' not in columns:
                    sys_log.warning("⚠️ 偵測到 promo_products 表缺少 url 欄位,正在自動修復...")
                    session.execute(text("ALTER TABLE promo_products ADD COLUMN url TEXT"))

            # 2. products may lack the status and updated_at columns.
            result_prod = session.execute(text("PRAGMA table_info(products)")).fetchall()
            if result_prod:
                prod_columns = [row[1] for row in result_prod]
                if 'status' not in prod_columns:
                    sys_log.warning("⚠️ 偵測到 products 表缺少 status 欄位,正在自動修復...")
                    session.execute(text("ALTER TABLE products ADD COLUMN status TEXT DEFAULT 'ACTIVE'"))
                if 'updated_at' not in prod_columns:
                    sys_log.warning("⚠️ 偵測到 products 表缺少 updated_at 欄位,正在自動修復...")
                    now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    # The DEFAULT literal is inlined into the DDL string, so it
                    # is validated first to rule out SQL injection.
                    safe_timestamp = sanitize_timestamp(now_str)
                    session.execute(text(f"ALTER TABLE products ADD COLUMN updated_at TIMESTAMP DEFAULT '{safe_timestamp}'"))
            session.commit()
        except Exception as e:
            # Best effort: log and leave the database as it was.
            session.rollback()
            sys_log.error(f"❌ 資料庫結構檢查失敗: {e}")
        finally:
            session.close()

    def _init_autoheal_tables(self):
        """ADR-013: ensure the AIOps tables exist and seed the default playbooks.

        Creates the Playbook, Incident and HealLog tables when missing
        (``checkfirst=True`` keeps this idempotent). Inserts ``SEED_PLAYBOOKS``
        when the playbook table is empty. Every failure is logged and
        swallowed so it never blocks start-up.
        """
        try:
            # Create missing tables; existing ones are skipped.
            from .autoheal_models import Incident, Playbook, HealLog, SEED_PLAYBOOKS
            from sqlalchemy import inspect as sa_inspect

            inspector = sa_inspect(self.engine)
            existing_tables = inspector.get_table_names()
            for model in [Playbook, Incident, HealLog]:
                if model.__tablename__ not in existing_tables:
                    model.__table__.create(self.engine, checkfirst=True)
                    sys_log.info(f"[Database] ✅ 建立 AIOps 表: {model.__tablename__}")

            # Seed playbooks only while the table is still empty.
            session = self.get_session()
            try:
                count = session.query(Playbook).count()
                if count == 0:
                    for seed in SEED_PLAYBOOKS:
                        session.add(Playbook(**seed))
                    session.commit()
                    sys_log.info(f"[Database] ✅ 植入 {len(SEED_PLAYBOOKS)} 筆種子 PlayBook")
                else:
                    sys_log.info(f"[Database] PlayBook 已有 {count} 筆,略過種子植入")
            except Exception as e:
                session.rollback()
                sys_log.warning(f"[Database] 種子 PlayBook 植入失敗: {e}")
            finally:
                session.close()
        except Exception as e:
            sys_log.error(f"[Database] _init_autoheal_tables 失敗 (不影響主程序): {e}")

    def get_session(self):
        """Return a new Session from the shared session factory; the caller must close it."""
        return self.Session()

    def update_data(self, product_list):
        """Record new prices for products whose name or price changed.

        Each item is compared with the most recent stored ``(name, price)`` for
        its ``i_code``. Unchanged items are skipped. Changed or unseen items
        get their Category/Product upserted and a new PriceRecord appended.
        The session is committed every 100 added records and once at the end.

        Args:
            product_list: Iterable of dicts with keys 'i_code', 'name', 'price'
                and 'category'. 'url' is read only when a new Product is created.

        Returns:
            int: Number of PriceRecord rows added, or 0 if an error occurred.
            Batches committed before an error remain in the database even though
            0 is returned.
        """
        session = self.get_session()
        count_added = 0
        count_skipped = 0
        try:
            # 1. Snapshot of each product's most recent (name, price).
            latest_prices = session.query(
                Product.i_code, Product.name, PriceRecord.price
            ).join(PriceRecord).order_by(PriceRecord.timestamp.desc()).all()
            # Rows arrive newest first, so keep the FIRST row per i_code. A plain
            # dict comprehension lets each older row overwrite the newer one,
            # which made the comparison run against the OLDEST price on record.
            db_cache = {}
            for code, name, price in latest_prices:
                db_cache.setdefault(code, (name, price))
            sys_log.info(f"💾 開始數據比對:目前資料庫已知商品數 {len(db_cache)}")

            # Category name -> Category, avoids one lookup query per item.
            category_cache = {}
            for item in product_list:
                i_code = item['i_code']
                current_name = item['name']
                current_price = item['price']

                # 2. Skip items whose name and price match the latest record.
                if i_code in db_cache:
                    last_name, last_price = db_cache[i_code]
                    if last_name == current_name and last_price == current_price:
                        count_skipped += 1
                        continue

                # 3. Resolve (or create) the category.
                category_name = item['category']
                category = category_cache.get(category_name)
                if category is None:
                    category = session.query(Category).filter_by(name=category_name).first()
                    if category is None:
                        category = Category(name=category_name)
                        session.add(category)
                        session.flush()
                    category_cache[category_name] = category

                # 4. Upsert the product's basic information.
                product = session.query(Product).filter_by(i_code=i_code).first()
                if product is None:
                    product = Product(
                        i_code=i_code,
                        name=current_name,
                        url=item['url'],
                        category=item['category'],
                        category_id=category.id,
                    )
                    session.add(product)
                    session.flush()
                else:
                    product.name = current_name
                    product.category = item['category']
                    product.category_id = category.id

                # 5. Append the new price record.
                session.add(PriceRecord(
                    product_id=product.id,
                    price=current_price,
                    timestamp=datetime.now(),
                ))
                count_added += 1
                # Remember what was just written so a repeated i_code later in
                # the same batch is not recorded twice.
                db_cache[i_code] = (current_name, current_price)

                if count_added % 100 == 0:
                    session.commit()

            # 6. Final commit and summary logging.
            session.commit()
            sys_log.info(f"📊 數據同步彙報: [新增監控: {count_added}] [跳過重複: {count_skipped}]")
            if count_added > 50:
                sys_log.warning(f"⚠️ 偵測到異常大量新增數據 ({count_added} 筆),請確認分類 URL 或爬取範圍是否正確")
            return count_added
        except Exception as e:
            session.rollback()
            sys_log.error(f"❌ 資料寫入異常:{str(e)}")
            return 0
        finally:
            session.close()

    def get_price_analysis(self, product_id, external_session=None):
        """Return the current price and its change versus 7 and 30 records earlier.

        Used when building Excel reports to show price movements.

        Args:
            product_id: Product primary key.
            external_session: Optional session to reuse. It is not closed here.

        Returns:
            dict: ``{'current', '7d_diff', '30d_diff'}``. A diff is 0 when there
            are not enough records. All values are 0 when the product has no
            records or an error occurs.
        """
        # NOTE(review): the diffs are index-based (records[7], records[30]) and
        # only equal 7/30-day changes if there is exactly one record per day.
        # update_data writes a record only when name/price changes, so confirm
        # this assumption with the report owners.
        session = external_session if external_session else self.get_session()
        try:
            # Newest first. Only indexes 0, 7 and 30 are read, so 31 rows suffice.
            records = (
                session.query(PriceRecord)
                .filter_by(product_id=product_id)
                .order_by(PriceRecord.timestamp.desc())
                .limit(31)
                .all()
            )
            if not records:
                return {'current': 0, '7d_diff': 0, '30d_diff': 0}

            current_price = records[0].price
            diff_7d = current_price - records[7].price if len(records) > 7 else 0
            diff_30 = current_price - records[30].price if len(records) > 30 else 0
            return {
                'current': current_price,
                '7d_diff': diff_7d,
                '30d_diff': diff_30,
            }
        except Exception as e:
            sys_log.error(f"❌ 價格分析計算失敗 (ID: {product_id}): {e}")
            return {'current': 0, '7d_diff': 0, '30d_diff': 0}
        finally:
            if not external_session:
                session.close()

    # S5 security fix: only these table names may be interpolated into the SQL
    # built by get_sales_data (prevents SQL injection via table_name).
    ALLOWED_SALES_TABLES = frozenset({
        'realtime_sales_monthly',
        'daily_sales_snapshot',
        'monthly_summary_analysis',
        'vendor_performance',
        'daily_performance',
        'edm_products',
        'festival_products',
        'competitor_prices',
    })

    def get_sales_data(self, table_name='realtime_sales_monthly', start_date=None, end_date=None, months=None):
        """Load a whitelisted sales table (optionally date-filtered) into a DataFrame.

        Args:
            table_name: Table to read. Must be in ``ALLOWED_SALES_TABLES``.
            start_date: Inclusive start date ``YYYY-MM-DD``; used only together
                with ``end_date``.
            end_date: Inclusive end date ``YYYY-MM-DD``.
            months: When no explicit range is given, read roughly the last
                ``months * 30`` days.

        Returns:
            tuple: ``(DataFrame, cols_map)``. ``cols_map`` maps logical field
            names ('name', 'pid', 'date', ...) to the first matching column, or
            None. Returns ``(None, {})`` if the dates are malformed or the
            query/processing fails.

        Raises:
            ValueError: If ``table_name`` is not whitelisted.
        """
        import pandas as pd
        from datetime import datetime, timedelta
        import re as _re_sec

        # S5 whitelist check: only known sales tables are allowed.
        if table_name not in self.ALLOWED_SALES_TABLES:
            raise ValueError(f"[Security] 非法的銷售資料表名稱:{table_name!r}。允許的表名:{sorted(self.ALLOWED_SALES_TABLES)}")

        try:
            # Build the date filter with bound parameters (no string-built dates).
            date_filter = ""
            params = {}
            if start_date and end_date:
                # Format check: exactly YYYY-MM-DD with ASCII digits.
                date_pattern = r'[0-9]{4}-[0-9]{2}-[0-9]{2}'
                if not _re_sec.fullmatch(date_pattern, str(start_date)) or \
                        not _re_sec.fullmatch(date_pattern, str(end_date)):
                    raise ValueError(f"日期格式不合法start={start_date}, end={end_date}")
                date_filter = " WHERE \"日期\" BETWEEN :start_date AND :end_date"
                params = {'start_date': str(start_date), 'end_date': str(end_date)}
            elif months:
                # Approximate `months` months as 30-day periods ending today.
                end_dt = datetime.now()
                start_dt = end_dt - timedelta(days=months * 30)
                start_date_str = start_dt.strftime('%Y-%m-%d')
                end_date_str = end_dt.strftime('%Y-%m-%d')
                date_filter = " WHERE \"日期\" BETWEEN :start_date AND :end_date"
                params = {'start_date': start_date_str, 'end_date': end_date_str}

            # table_name is whitelisted above; dates are bound parameters.
            sql = f"SELECT * FROM {table_name}{date_filter}"
            df = pd.read_sql(text(sql), self.engine, params=params if params else None)

            # V-Fix: coerce numeric columns; unparsable values become 0.
            numeric_columns = ['總業績', '數量', '總成本', '退貨數量', '商品單位售價',
                               '折價券折扣金額', '折扣金額', '滿額再折扣金額', '分期手續費']
            for col in numeric_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

            # Map logical field names to the first column containing a keyword.
            cols = df.columns.tolist()

            def find_col(keywords):
                for keyword in keywords:
                    for col in cols:
                        if keyword in str(col):
                            return col
                return None

            cols_map = {
                'name': find_col(['商品名稱', '品名', 'Name', 'Product']),
                'pid': find_col(['商品ID', 'Product ID', 'ID', 'i_code']),
                'date': find_col(['日期', '交易日期', 'Date']),
                'time': find_col(['時間', 'Time']),
                'amount': find_col(['總業績', '銷售金額', '業績', '金額', 'Amount', 'Sales']),
                'qty': find_col(['數量', '銷售數量', '銷量', 'Qty', 'Quantity']),
                'cost': find_col(['總成本', '成本', 'Cost']),
                'profit': find_col(['毛利', 'Profit', 'Gross Margin']),
                'category': find_col(['商品館', '館別', '分類', 'Category']),
                'brand': find_col(['品牌', 'Brand']),
                'vendor': find_col(['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor']),
                'activity': find_col(['折扣活動名稱', '活動', 'Activity', 'Campaign']),
                'payment': find_col(['付款', 'Payment', '付款方式']),
                'price': find_col(['商品單位售價', '單價', 'Price']),
            }
            sys_log.info(f"[DB] get_sales_data 成功 | 表: {table_name} | 筆數: {len(df)}")
            return df, cols_map
        except Exception as e:
            sys_log.error(f"[DB] get_sales_data 失敗: {e}")
            return None, {}
# =============================================================================
# Global database manager and convenience functions
# =============================================================================
# Lazily created process-wide DatabaseManager returned by get_db_manager();
# it is built from config.py's DATABASE_PATH (SQLite or PostgreSQL).
_default_db_manager = None
def get_db_manager():
    """Return the process-wide DatabaseManager, creating it on first use.

    Returns:
        DatabaseManager: The shared database manager instance.
    """
    global _default_db_manager
    if _default_db_manager is not None:
        return _default_db_manager

    try:
        from config import DATABASE_PATH
        manager = DatabaseManager(DATABASE_PATH)
    except ImportError:
        # config is unavailable: let DatabaseManager choose its default path.
        # NOTE(review): DatabaseManager.__init__ itself imports config, so this
        # fallback probably raises ImportError too. Confirm it is reachable.
        manager = DatabaseManager()
    _default_db_manager = manager
    return manager
def get_session():
    """Open a new SQLAlchemy Session from the shared DatabaseManager.

    Convenience function for other modules, so they do not need to create
    their own DatabaseManager. The caller owns the returned session and must
    close it.

    Returns:
        Session: A new SQLAlchemy Session instance.

    Usage:
        from database.manager import get_session
        session = get_session()
        try:
            # perform database work
            session.query(...)
            session.commit()
        finally:
            session.close()
    """
    manager = get_db_manager()
    return manager.get_session()