495 lines
22 KiB
Python
495 lines
22 KiB
Python
import os
|
||
import re
|
||
import threading
|
||
from sqlalchemy import create_engine, desc, select, text, literal
|
||
from sqlalchemy.engine.url import make_url
|
||
from sqlalchemy.orm import sessionmaker
|
||
from datetime import datetime
|
||
from .models import Base, Category, Product, PriceRecord, MonthlySummaryAnalysis
|
||
from .user_models import User, LoginHistory # noqa: F401 - 必須在 trend_models 之前導入,解決 ForeignKey 依賴問題
|
||
from .edm_models import PromoProduct # V-Fix: 確保 EDM 模型被註冊,以便自動建表
|
||
from .trend_models import TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache, TelegramUser # noqa: F401 - 趨勢資料表
|
||
from .permission_models import Permission, UserPermission # noqa: F401 - 確保權限表被 Base.metadata 管理
|
||
from .ai_models import AIGenerationHistory, AIPromptTemplate, AIUsageTracking, AIInsight # noqa: F401 - AI history/template 表
|
||
from .autoheal_models import ( # noqa: F401 - ADR-013 AIOps 自動修復表
|
||
AgentContext,
|
||
ActionPlan,
|
||
ActionOutcome,
|
||
AgentStrategyWeights,
|
||
Incident,
|
||
Playbook,
|
||
HealLog,
|
||
)
|
||
from .import_models import ImportJob, ImportConfig # noqa: F401 - 確保 import_jobs/import_config 被 Base.metadata 管理
|
||
from .notification_models import NotificationTemplate # noqa: F401 - 確保 notification_templates 表被 Base.metadata 管理
|
||
from .ppt_reports import PPTReport # noqa: F401 - 確保 ppt_reports 表被 Base.metadata 管理
|
||
from .vendor_models import VendorStockout, VendorList, VendorEmail, EmailSendLog # noqa: F401 - 確保 vendor 表被 Base.metadata 管理
|
||
from .realtime_sales_models import RealtimeSalesMonthly # noqa: F401 - 確保 realtime_sales_monthly 被 Base.metadata 管理
|
||
|
||
# 🚩 導入優化後的日誌管理模組
|
||
from utils.logger_manager import SystemLogger
|
||
|
||
# Logger dedicated to the database module.
sys_log = SystemLogger("Database").get_logger()

# In-process lock serialising ensure_metadata_initialized() across threads.
_metadata_init_lock = threading.Lock()
# Set to True once Base.metadata.create_all has completed in this process;
# ensure_metadata_initialized() returns early afterwards.
_metadata_initialized: bool = False
# Key for the PostgreSQL advisory lock that ensure_metadata_initialized() takes
# around create_all, so concurrent processes do not run the DDL simultaneously.
_POSTGRES_METADATA_LOCK_ID: int = 170017
|
||
|
||
|
||
def ensure_metadata_initialized(engine, use_postgres_lock=False):
    """Run ``Base.metadata.create_all`` at most once per process.

    Normal request paths may call this repeatedly. After the first successful
    run it returns immediately, without touching DDL again.

    Args:
        engine: SQLAlchemy engine on which the tables are created.
        use_postgres_lock: When True, serialise the DDL across processes
            (e.g. several gunicorn workers) with a PostgreSQL advisory lock.

    Raises:
        Whatever ``create_all`` raises. The initialized flag stays False in
        that case, so a later call retries.
    """
    global _metadata_initialized
    # Fast path: skip the lock entirely once initialisation has succeeded.
    if _metadata_initialized:
        return

    with _metadata_init_lock:
        # Re-check: another thread may have finished while we were waiting.
        if _metadata_initialized:
            return

        if use_postgres_lock:
            with engine.begin() as conn:
                # Transaction-scoped advisory lock: PostgreSQL releases it
                # automatically when this transaction commits OR rolls back.
                # The previous session-level pg_advisory_lock/unlock pair broke
                # when create_all failed. The aborted transaction rejected the
                # unlock statement, which masked the real error, and the lock
                # stayed held on a pooled connection, blocking other workers.
                conn.execute(
                    text("SELECT pg_advisory_xact_lock(:lock_id)"),
                    {"lock_id": _POSTGRES_METADATA_LOCK_ID},
                )
                Base.metadata.create_all(conn)
        else:
            Base.metadata.create_all(engine)

        _metadata_initialized = True
|
||
|
||
def sanitize_timestamp(timestamp_str):
    """Validate a timestamp string before it is interpolated into SQL.

    Only the exact ``YYYY-MM-DD HH:MM:SS`` shape with ASCII digits is accepted.
    This guards against SQL injection when the value is embedded in a DDL
    literal (e.g. a column DEFAULT).

    Args:
        timestamp_str: Timestamp string in ``YYYY-MM-DD HH:MM:SS`` format.

    Returns:
        str: The same string, unchanged, once validated.

    Raises:
        ValueError: If the string does not match the expected format.
    """
    # fullmatch instead of match + "$": "$" also matches just before a trailing
    # "\n", which let "...:SS\n" through. [0-9] instead of \d: in str patterns
    # \d accepts any Unicode decimal digit (e.g. full-width digits).
    pattern = r'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}'
    if not re.fullmatch(pattern, timestamp_str):
        raise ValueError(f"時間戳格式不正確: {timestamp_str}")
    return timestamp_str
|
||
|
||
class DatabaseManager:
    """Shared entry point for engines, sessions and common queries.

    Engines and session factories are cached at class level per
    ``(DATABASE_TYPE, resolved path/URL)``. Constructing ``DatabaseManager()``
    repeatedly (as many routes do) therefore reuses one connection pool
    instead of opening a new one each time.
    """

    # (db_type, str(resolved path/URL)) -> {'engine': Engine, 'Session': sessionmaker}
    _instance_cache = {}
    # Guards _instance_cache so each key builds at most one engine.
    _instance_lock = threading.Lock()

    # S5 security fix: whitelist of table names get_sales_data may query.
    # table_name is interpolated into the SQL text, so it must come from here.
    ALLOWED_SALES_TABLES = frozenset({
        'realtime_sales_monthly',
        'daily_sales_snapshot',
        'monthly_summary_analysis',
        'vendor_performance',
        'daily_performance',
        'edm_products',
        'festival_products',
        'competitor_prices',
    })

    def __init__(self, db_path=None):
        """Attach this instance to a (possibly cached) engine and session factory.

        PostgreSQL is used when ``config.DATABASE_TYPE == 'postgresql'``.
        Anything else falls back to SQLite for backward compatibility.

        Args:
            db_path: Database URL, SQLite file path, or ``sqlite://`` URL.
                ``None`` means ``config.DATABASE_PATH`` for PostgreSQL and
                ``<project>/data/momo_database.db`` for SQLite.
        """
        # V-Fix (2026-01-23): prefer the database settings from config.py.
        from config import DATABASE_PATH, DATABASE_TYPE

        is_postgres = DATABASE_TYPE == 'postgresql'
        if db_path is not None:
            effective_db_path = db_path
        elif is_postgres:
            effective_db_path = DATABASE_PATH
        else:
            effective_db_path = self._default_sqlite_path()

        # Key on the path that is actually opened. Keying SQLite on
        # DATABASE_PATH while opening the default file made DatabaseManager()
        # and DatabaseManager(DATABASE_PATH) share whichever engine came first.
        cache_key = (DATABASE_TYPE, str(effective_db_path))

        # V-Fix (2026-04-30): DatabaseManager is instantiated directly in many
        # routes. Creating an engine every time kept adding pools until
        # PostgreSQL ran out of client connections.
        with self._instance_lock:
            cached = self._instance_cache.get(cache_key)
            if cached:
                self.engine = cached['engine']
                self.Session = cached['Session']
                return

            if is_postgres:
                self._init_postgres(effective_db_path, cache_key)
            else:
                self._init_sqlite(effective_db_path, cache_key)

    @staticmethod
    def _default_sqlite_path():
        """Return ``<project>/data/momo_database.db`` (project = parent of this package)."""
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, 'data', 'momo_database.db')

    @staticmethod
    def _resolve_sqlite_target(db_target):
        """Return ``(engine_url, directory_to_create)`` for a SQLite path or URL.

        ``directory_to_create`` is ``None`` when there is nothing to create:
        in-memory databases, or a bare file name in the working directory.
        ``os.makedirs('')`` would raise FileNotFoundError in those cases.
        """
        if str(db_target).startswith('sqlite://'):
            db_file = make_url(db_target).database
            directory = os.path.dirname(db_file) if db_file else ''
            return db_target, directory or None
        return f'sqlite:///{db_target}', os.path.dirname(str(db_target)) or None

    def _init_postgres(self, db_url, cache_key):
        """Create the pooled PostgreSQL engine, initialise metadata and cache it."""
        # V-Fix: with several gunicorn workers, cap the connections per process.
        self.engine = create_engine(
            db_url,
            echo=False,
            pool_pre_ping=True,  # detect dropped connections automatically
            pool_size=2,  # keep a few persistent connections per worker
            max_overflow=3,  # burst cap so PostgreSQL clients are not exhausted
            pool_recycle=1800,  # recycle connections every 30 minutes
            pool_timeout=30,  # seconds to wait for a pooled connection
            connect_args={
                'connect_timeout': 10,  # connection timeout: 10 s
                'options': '-c statement_timeout=60000',  # SQL statement timeout: 60 s
            },
        )
        try:
            ensure_metadata_initialized(self.engine, use_postgres_lock=True)
        except Exception:
            # The engine will not be cached; release its pool instead of leaking it.
            self.engine.dispose()
            raise

        self.Session = sessionmaker(bind=self.engine)
        self._instance_cache[cache_key] = {
            'engine': self.engine,
            'Session': self.Session,
        }
        sys_log.info("[Database] ✅ 使用 PostgreSQL 資料庫 (連線池已收斂)")
        # ADR-013: make sure the AIOps auto-heal tables exist and seed playbooks.
        self._init_autoheal_tables()

    def _init_sqlite(self, db_target, cache_key):
        """Create the SQLite engine and tables, cache it, then patch old schemas."""
        engine_url, directory = self._resolve_sqlite_target(db_target)
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.engine = create_engine(engine_url, echo=False)

        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
        self._instance_cache[cache_key] = {
            'engine': self.engine,
            'Session': self.Session,
        }
        self._check_and_fix_schema()
        sys_log.info(f"[Database] 使用 SQLite 資料庫: {db_target}")

    def _check_and_fix_schema(self):
        """Add columns missing from older SQLite databases (SQLite only).

        Uses SQLite ``PRAGMA table_info``, so it is a no-op under PostgreSQL.
        Failures are logged, not raised.
        """
        from config import DATABASE_TYPE
        if DATABASE_TYPE == 'postgresql':
            return  # PostgreSQL does not need this repair logic

        session = self.get_session()
        try:
            # 1. promo_products may be missing the url column.
            result = session.execute(text("PRAGMA table_info(promo_products)")).fetchall()
            if result:
                columns = [row[1] for row in result]  # row[1] is the column name
                if 'url' not in columns:
                    sys_log.warning("⚠️ 偵測到 promo_products 表缺少 url 欄位,正在自動修復...")
                    session.execute(text("ALTER TABLE promo_products ADD COLUMN url TEXT"))

            # 2. products may be missing the status and updated_at columns.
            result_prod = session.execute(text("PRAGMA table_info(products)")).fetchall()
            if result_prod:
                prod_columns = [row[1] for row in result_prod]

                if 'status' not in prod_columns:
                    sys_log.warning("⚠️ 偵測到 products 表缺少 status 欄位,正在自動修復...")
                    session.execute(text("ALTER TABLE products ADD COLUMN status TEXT DEFAULT 'ACTIVE'"))

                if 'updated_at' not in prod_columns:
                    sys_log.warning("⚠️ 偵測到 products 表缺少 updated_at 欄位,正在自動修復...")
                    now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    # DDL DEFAULT cannot be a bound parameter; validate before interpolating.
                    safe_timestamp = sanitize_timestamp(now_str)
                    session.execute(text(f"ALTER TABLE products ADD COLUMN updated_at TIMESTAMP DEFAULT '{safe_timestamp}'"))

            session.commit()
        except Exception as e:
            session.rollback()
            sys_log.error(f"❌ 資料庫結構檢查失敗: {e}")
        finally:
            session.close()

    def _init_autoheal_tables(self):
        """ADR-013: ensure the AIOps tables exist and seed playbooks (PostgreSQL mode).

        Missing tables are created with ``checkfirst=True``. Seed playbooks are
        inserted only when the playbook table is empty. All errors are logged
        and swallowed so start-up continues.
        """
        try:
            from .autoheal_models import Incident, Playbook, HealLog, SEED_PLAYBOOKS
            from sqlalchemy import inspect as sa_inspect

            # Create tables that do not exist yet.
            inspector = sa_inspect(self.engine)
            existing_tables = inspector.get_table_names()
            for model in [Playbook, Incident, HealLog]:
                if model.__tablename__ not in existing_tables:
                    model.__table__.create(self.engine, checkfirst=True)
                    sys_log.info(f"[Database] ✅ 建立 AIOps 表: {model.__tablename__}")

            # Seed playbooks only on first run (empty table).
            session = self.get_session()
            try:
                count = session.query(Playbook).count()
                if count == 0:
                    for seed in SEED_PLAYBOOKS:
                        session.add(Playbook(**seed))
                    session.commit()
                    sys_log.info(f"[Database] ✅ 植入 {len(SEED_PLAYBOOKS)} 筆種子 PlayBook")
                else:
                    sys_log.info(f"[Database] PlayBook 已有 {count} 筆,略過種子植入")
            except Exception as e:
                session.rollback()
                sys_log.warning(f"[Database] 種子 PlayBook 植入失敗: {e}")
            finally:
                session.close()

        except Exception as e:
            sys_log.error(f"[Database] _init_autoheal_tables 失敗 (不影響主程序): {e}")

    def get_session(self):
        """Return a new Session from this manager's session factory."""
        return self.Session()

    @staticmethod
    def _build_latest_price_cache(rows):
        """Map ``i_code -> (name, price)``, keeping the first row seen per i_code.

        ``rows`` must be ordered newest-first, so the first row per product is
        its latest price record. A plain dict comprehension over that ordering
        would let older rows overwrite newer ones.
        """
        cache = {}
        for i_code, name, price in rows:
            cache.setdefault(i_code, (name, price))
        return cache

    def update_data(self, product_list):
        """Write a price record for every product whose name or price changed.

        1. Compare each item against the latest stored (name, price) snapshot;
           unchanged items are skipped.
        2. Log a warning when an unusually large number of records is added.
        3. Commit every 100 added records to keep transactions small.

        Args:
            product_list: Iterable of dicts with keys ``i_code``, ``name``,
                ``price``, ``category`` and ``url``.

        Returns:
            int: Number of price records added, or 0 on error. Batches already
            committed before the error are not rolled back.
        """
        session = self.get_session()
        count_added = 0
        count_skipped = 0

        try:
            # 1. Snapshot of the latest (name, price) per product, newest first.
            latest_prices = session.query(
                Product.i_code, Product.name, PriceRecord.price
            ).join(PriceRecord).order_by(PriceRecord.timestamp.desc()).all()
            db_cache = self._build_latest_price_cache(latest_prices)
            category_cache = {}

            sys_log.info(f"💾 開始數據比對:目前資料庫已知商品數 {len(db_cache)}")

            for item in product_list:
                i_code = item['i_code']
                current_name = item['name']
                current_price = item['price']

                # 2. Skip products whose name and price are unchanged.
                if i_code in db_cache:
                    last_name, last_price = db_cache[i_code]
                    if last_name == current_name and last_price == current_price:
                        count_skipped += 1
                        continue

                # 3. Resolve (or create) the category, once per name.
                category_name = item['category']
                category = category_cache.get(category_name)
                if category is None:
                    category = session.query(Category).filter_by(name=category_name).first()
                    if not category:
                        category = Category(name=category_name)
                        session.add(category)
                        session.flush()  # assign category.id
                    category_cache[category_name] = category

                # 4. Create or update the product row.
                product = session.query(Product).filter_by(i_code=i_code).first()
                if not product:
                    product = Product(
                        i_code=i_code,
                        name=current_name,
                        url=item['url'],
                        category=category_name,
                        category_id=category.id,
                    )
                    session.add(product)
                    session.flush()  # assign product.id
                else:
                    product.name = current_name
                    product.category = category_name
                    product.category_id = category.id

                # 5. Append the new price record.
                session.add(PriceRecord(
                    product_id=product.id,
                    price=current_price,
                    timestamp=datetime.now(),
                ))
                count_added += 1
                # Remember what was just written so duplicates in this batch are skipped.
                db_cache[i_code] = (current_name, current_price)

                # 6. Periodic commit bounds transaction size.
                if count_added % 100 == 0:
                    session.commit()

            # 7. Final commit and summary log.
            session.commit()
            sys_log.info(f"📊 數據同步彙報: [新增監控: {count_added}] [跳過重複: {count_skipped}]")

            if count_added > 50:
                sys_log.warning(f"⚠️ 偵測到異常大量新增數據 ({count_added} 筆),請確認分類 URL 或爬取範圍是否正確")

            return count_added

        except Exception as e:
            session.rollback()
            sys_log.error(f"❌ 資料寫入異常:{str(e)}")
            return 0
        finally:
            session.close()

    def get_price_analysis(self, product_id, external_session=None):
        """Return current price and its change versus ~7 and ~30 records ago.

        Used when building Excel reports. Assumes roughly one price record per
        day, so ``records[7]`` is about a week ago and ``records[30]`` about a
        month ago. A diff is 0 when there are not enough records.

        Args:
            product_id: Product primary key.
            external_session: Optional session to reuse. It is left open;
                otherwise a private session is opened and closed here.

        Returns:
            dict: ``{'current', '7d_diff', '30d_diff'}``; all zeros when there
            are no records or on error.
        """
        session = external_session if external_session else self.get_session()
        try:
            # Only records[0], [7] and [30] are used, so fetch at most 31 rows.
            records = (
                session.query(PriceRecord)
                .filter_by(product_id=product_id)
                .order_by(PriceRecord.timestamp.desc())
                .limit(31)
                .all()
            )

            if not records:
                return {'current': 0, '7d_diff': 0, '30d_diff': 0}

            current_price = records[0].price
            diff_7d = current_price - records[7].price if len(records) > 7 else 0
            diff_30 = current_price - records[30].price if len(records) > 30 else 0

            return {
                'current': current_price,
                '7d_diff': diff_7d,
                '30d_diff': diff_30,
            }
        except Exception as e:
            sys_log.error(f"❌ 價格分析計算失敗 (ID: {product_id}): {e}")
            return {'current': 0, '7d_diff': 0, '30d_diff': 0}
        finally:
            if not external_session:
                session.close()

    def get_sales_data(self, table_name='realtime_sales_monthly', start_date=None, end_date=None, months=None):
        """Read rows from a whitelisted sales table, optionally filtered by date.

        Args:
            table_name: Table to read; must be in ``ALLOWED_SALES_TABLES``
                (default: realtime_sales_monthly).
            start_date: Start date ``YYYY-MM-DD``; used only together with end_date.
            end_date: End date ``YYYY-MM-DD``; used only together with start_date.
            months: When no full date range is given, read roughly the last
                ``months * 30`` days.

        Returns:
            tuple: ``(DataFrame, cols_map)``. ``cols_map`` maps logical names
            (``'name'``, ``'amount'`` …) to detected column names or None.
            Returns ``(None, {})`` on any query/processing error, including a
            malformed date.

        Raises:
            ValueError: If ``table_name`` is not whitelisted.
        """
        import pandas as pd
        from datetime import datetime, timedelta

        # S5 whitelist check: only known sales tables are allowed.
        if table_name not in self.ALLOWED_SALES_TABLES:
            raise ValueError(f"[Security] 非法的銷售資料表名稱:{table_name!r}。允許的表名:{sorted(self.ALLOWED_SALES_TABLES)}")

        date_pattern = r'[0-9]{4}-[0-9]{2}-[0-9]{2}'
        try:
            # Date filter uses bound parameters; formats are validated anyway.
            date_filter = ""
            params = {}
            if start_date and end_date:
                if not re.fullmatch(date_pattern, str(start_date)) or \
                        not re.fullmatch(date_pattern, str(end_date)):
                    raise ValueError(f"日期格式不合法:start={start_date}, end={end_date}")
                date_filter = " WHERE \"日期\" BETWEEN :start_date AND :end_date"
                params = {'start_date': str(start_date), 'end_date': str(end_date)}
            elif months:
                end_dt = datetime.now()
                start_dt = end_dt - timedelta(days=months * 30)
                date_filter = " WHERE \"日期\" BETWEEN :start_date AND :end_date"
                params = {
                    'start_date': start_dt.strftime('%Y-%m-%d'),
                    'end_date': end_dt.strftime('%Y-%m-%d'),
                }

            # table_name is whitelisted above; dates are bound parameters.
            sql = f"SELECT * FROM {table_name}{date_filter}"
            df = pd.read_sql(text(sql), self.engine, params=params if params else None)

            # V-Fix: coerce numeric columns; unparsable values become 0.
            numeric_columns = ['總業績', '數量', '總成本', '退貨數量', '商品單位售價',
                               '折價券折扣金額', '折扣金額', '滿額再折扣金額', '分期手續費']
            for col in numeric_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

            cols = df.columns.tolist()

            def find_col(keywords):
                # First column whose name contains a keyword, trying keywords in order.
                for keyword in keywords:
                    for col in cols:
                        if keyword in str(col):
                            return col
                return None

            cols_map = {
                'name': find_col(['商品名稱', '品名', 'Name', 'Product']),
                'pid': find_col(['商品ID', 'Product ID', 'ID', 'i_code']),
                'date': find_col(['日期', '交易日期', 'Date']),
                'time': find_col(['時間', 'Time']),
                'amount': find_col(['總業績', '銷售金額', '業績', '金額', 'Amount', 'Sales']),
                'qty': find_col(['數量', '銷售數量', '銷量', 'Qty', 'Quantity']),
                'cost': find_col(['總成本', '成本', 'Cost']),
                'profit': find_col(['毛利', 'Profit', 'Gross Margin']),
                'category': find_col(['商品館', '館別', '分類', 'Category']),
                'brand': find_col(['品牌', 'Brand']),
                'vendor': find_col(['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor']),
                'activity': find_col(['折扣活動名稱', '活動', 'Activity', 'Campaign']),
                'payment': find_col(['付款', 'Payment', '付款方式']),
                'price': find_col(['商品單位售價', '單價', 'Price']),
            }

            sys_log.info(f"[DB] get_sales_data 成功 | 表: {table_name} | 筆數: {len(df)}")
            return df, cols_map

        except Exception as e:
            sys_log.error(f"[DB] get_sales_data 失敗: {e}")
            return None, {}
|
||
|
||
|
||
# =============================================================================
|
||
# 全域資料庫管理器與便捷函數
|
||
# =============================================================================
|
||
# Process-wide DatabaseManager returned by get_db_manager(); created lazily.
# It is built with config.DATABASE_PATH (SQLite or PostgreSQL).
_default_db_manager = None


def get_db_manager():
    """Return the process-wide DatabaseManager, creating it on first use.

    The instance is built with ``config.DATABASE_PATH``. DatabaseManager also
    caches engines per database at class level, so a duplicate construction
    from concurrent first calls still shares one connection pool.

    Returns:
        DatabaseManager: The shared manager instance.

    Raises:
        ImportError: If ``config`` cannot be imported. There is no config-free
            fallback: DatabaseManager itself imports ``config``. The former
            ``except ImportError: DatabaseManager()`` branch only re-raised.
    """
    global _default_db_manager
    if _default_db_manager is None:
        from config import DATABASE_PATH
        _default_db_manager = DatabaseManager(DATABASE_PATH)
    return _default_db_manager
|
||
|
||
|
||
def get_session():
    """Open a new SQLAlchemy Session via the shared DatabaseManager.

    Convenience helper so other modules do not each construct their own
    DatabaseManager. The caller is responsible for closing the session.

    Returns:
        Session: A new SQLAlchemy session.

    Usage:
        from database.manager import get_session

        session = get_session()
        try:
            # database work
            session.query(...)
            session.commit()
        finally:
            session.close()
    """
    manager = get_db_manager()
    return manager.get_session()
|