Some checks failed
CD Pipeline / deploy (push) Failing after 3m24s
架構(Exception → Incident → PlayBook → Heal → KM → Telegram): 新增元件: - database/autoheal_models.py: Incident/Playbook/HealLog 三張表 + 7 條種子 PlayBook - migrations/013_autoheal.sql: 建表 DDL + 種子資料(冪等 INSERT) - services/auto_heal_service.py: 核心引擎 7 步閉環 - _classify_error: 8 類錯誤自動分類 (DNS_FAIL/DB_UNREACHABLE/OOM/...) - _match_playbook: error_type + keyword + 冷卻 + max_retries 保護 - _execute_playbook: DOCKER_RESTART/SSH_CMD/ALERT_ONLY/WAIT_RETRY - _sink_to_km: 修復知識寫入 ai_insights (auto_heal_playbook) - SSH 白名單:僅允許 docker restart / compose restart / docker start 修改元件: - database/manager.py: _init_autoheal_tables() 啟動時建表+種子 PlayBook - scheduler.py: 3 個核心任務植入 handle_exception (run_auto_import_task / run_icaim_analysis_task / run_weekly_strategy_task) - requirements.txt: paramiko(SSH 跳板;不可用時降級 subprocess+CLI ssh) 安全設計: CMD 白名單 + cooldown + max_retries escalation + DB 冪等 migration Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
397 lines
17 KiB
Python
397 lines
17 KiB
Python
import os
|
||
import re
|
||
from sqlalchemy import create_engine, desc, select, text, literal
|
||
from sqlalchemy.orm import sessionmaker
|
||
from datetime import datetime
|
||
from .models import Base, Category, Product, PriceRecord, MonthlySummaryAnalysis
|
||
from .user_models import User, LoginHistory # noqa: F401 - 必須在 trend_models 之前導入,解決 ForeignKey 依賴問題
|
||
from .edm_models import PromoProduct # V-Fix: 確保 EDM 模型被註冊,以便自動建表
|
||
from .trend_models import TrendRecord, TrendKeyword, TrendAnalysis, WebSearchCache, TelegramUser # noqa: F401 - 趨勢資料表
|
||
from .ai_models import AIGenerationHistory, AIInsight, AIUsageTracking, AIPromptTemplate # AI 記憶體與洞察模型
|
||
from .autoheal_models import Incident, Playbook, HealLog # noqa: F401 - ADR-013 AIOps 自動修復表
|
||
|
||
# 🚩 導入優化後的日誌管理模組
|
||
from services.logger_manager import SystemLogger
|
||
|
||
# Logger dedicated to the database module, obtained from the project's
# SystemLogger wrapper under the channel name "Database".
sys_log = SystemLogger(
    "Database"
).get_logger()
|
||
|
||
def sanitize_timestamp(timestamp_str):
    """
    Validate a timestamp string before it is embedded in SQL text.

    Used where a bound parameter is impossible (e.g. a DDL ``DEFAULT`` clause),
    so the whole string must match the expected shape exactly.

    Args:
        timestamp_str: Timestamp string in the form ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        str: The same string, unchanged, once it has passed validation.

    Raises:
        ValueError: If the string is not exactly ``YYYY-MM-DD HH:MM:SS``.
        TypeError: If ``timestamp_str`` is not a ``str`` (raised by ``re``).
    """
    # fullmatch (not match + `$`) so a trailing newline is rejected, and [0-9]
    # (not \d) so only ASCII digits pass; \d also matches other Unicode digits.
    if not re.fullmatch(
        r'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}', timestamp_str
    ):
        raise ValueError(f"時間戳格式不正確: {timestamp_str}")
    return timestamp_str
|
||
|
||
class DatabaseManager:
    """
    Owns the SQLAlchemy engine and session factory, and provides the
    application's common data operations: product/price sync, price-change
    analysis and sales-table reads.
    """

    # Plain or schema-qualified SQL identifier (e.g. "realtime_sales_monthly",
    # "public.sales"). Table names cannot be bound parameters, so any name
    # that is interpolated into SQL text must match this pattern first.
    _TABLE_NAME_RE = re.compile(
        r'[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?'
    )

    def __init__(self, db_path=None):
        """
        Initialize the database connection.

        Uses PostgreSQL when ``config.DATABASE_TYPE == 'postgresql'``; otherwise
        falls back to a local SQLite file.

        Args:
            db_path: SQLite database file path. Only used in SQLite mode
                (PostgreSQL always connects via ``config.DATABASE_PATH``).
                Defaults to ``<project root>/data/momo_database.db``.
        """
        # V-Fix (2026-01-23): prefer the database settings from config.py.
        from config import DATABASE_PATH, DATABASE_TYPE

        if DATABASE_TYPE == 'postgresql':
            # PostgreSQL mode: connection string from config.py, with a tuned
            # connection pool for stability.
            self.engine = create_engine(
                DATABASE_PATH,
                echo=False,
                pool_pre_ping=True,   # test connections before use to drop dead ones
                pool_size=5,          # persistent pooled connections
                max_overflow=10,      # extra connections allowed beyond pool_size
                pool_recycle=1800,    # recycle connections after 30 minutes
                pool_timeout=30,      # seconds to wait for a free pooled connection
                connect_args={
                    'connect_timeout': 10,                    # connect timeout: 10 s
                    'options': '-c statement_timeout=60000',  # per-statement timeout: 60 s
                },
            )
            self.Session = sessionmaker(bind=self.engine)
            sys_log.info("[Database] ✅ 使用 PostgreSQL 資料庫 (連線池已優化)")
            # ADR-013: ensure the AIOps auto-heal tables exist and seed PlayBooks.
            self._init_autoheal_tables()
        else:
            # SQLite mode (backward compatible).
            if db_path is None:
                base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                db_path = os.path.join(base_dir, 'data', 'momo_database.db')

            # A bare filename has no directory part; os.makedirs('') would raise.
            db_dir = os.path.dirname(db_path)
            if db_dir:
                os.makedirs(db_dir, exist_ok=True)

            self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
            Base.metadata.create_all(self.engine)
            self.Session = sessionmaker(bind=self.engine)
            self._check_and_fix_schema()
            sys_log.info(f"[Database] 使用 SQLite 資料庫: {db_path}")

    def _check_and_fix_schema(self):
        """
        Add columns that older SQLite schemas are missing (SQLite only).

        Relies on the SQLite-specific ``PRAGMA table_info``, so it returns
        immediately for any other dialect. Errors are logged and the
        transaction is rolled back; nothing is raised to the caller.
        """
        # PRAGMA syntax is SQLite-only; PostgreSQL does not need this repair.
        if self.engine.dialect.name != 'sqlite':
            return

        session = self.get_session()
        try:
            # 1. promo_products may be missing the url column.
            result = session.execute(text("PRAGMA table_info(promo_products)")).fetchall()
            if result:
                columns = [row[1] for row in result]  # row[1] is the column name
                if 'url' not in columns:
                    sys_log.warning("⚠️ 偵測到 promo_products 表缺少 url 欄位,正在自動修復...")
                    session.execute(text("ALTER TABLE promo_products ADD COLUMN url TEXT"))

            # 2. products may be missing the status and updated_at columns.
            result_prod = session.execute(text("PRAGMA table_info(products)")).fetchall()
            if result_prod:
                prod_columns = [row[1] for row in result_prod]

                if 'status' not in prod_columns:
                    sys_log.warning("⚠️ 偵測到 products 表缺少 status 欄位,正在自動修復...")
                    session.execute(text("ALTER TABLE products ADD COLUMN status TEXT DEFAULT 'ACTIVE'"))

                if 'updated_at' not in prod_columns:
                    sys_log.warning("⚠️ 偵測到 products 表缺少 updated_at 欄位,正在自動修復...")
                    now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    # DDL DEFAULT clauses cannot take bound parameters, so the
                    # literal is strictly validated before being embedded.
                    safe_timestamp = sanitize_timestamp(now_str)
                    session.execute(text(f"ALTER TABLE products ADD COLUMN updated_at TIMESTAMP DEFAULT '{safe_timestamp}'"))

            session.commit()
        except Exception as e:
            session.rollback()
            sys_log.error(f"❌ 資料庫結構檢查失敗: {e}")
        finally:
            session.close()

    def _init_autoheal_tables(self):
        """
        ADR-013: ensure the three AIOps tables (Incident / Playbook / HealLog)
        exist, and seed the Playbook table on first run.

        Each missing table is created individually with ``checkfirst=True``,
        so repeated calls are safe. Seed PlayBooks are only inserted while the
        Playbook table is empty. All failures are logged and swallowed so that
        start-up of the main program is not affected.
        """
        try:
            from .autoheal_models import Incident, Playbook, HealLog, SEED_PLAYBOOKS
            from sqlalchemy import inspect as sa_inspect

            # Create only the tables that do not exist yet.
            existing_tables = sa_inspect(self.engine).get_table_names()
            for model in (Incident, Playbook, HealLog):
                if model.__tablename__ not in existing_tables:
                    model.__table__.create(self.engine, checkfirst=True)
                    sys_log.info(f"[Database] ✅ 建立 AIOps 表: {model.__tablename__}")

            # Seed PlayBooks the first time only.
            session = self.get_session()
            try:
                count = session.query(Playbook).count()
                if count == 0:
                    for seed in SEED_PLAYBOOKS:
                        session.add(Playbook(**seed))
                    session.commit()
                    sys_log.info(f"[Database] ✅ 植入 {len(SEED_PLAYBOOKS)} 筆種子 PlayBook")
                else:
                    sys_log.info(f"[Database] PlayBook 已有 {count} 筆,略過種子植入")
            except Exception as e:
                session.rollback()
                sys_log.warning(f"[Database] 種子 PlayBook 植入失敗: {e}")
            finally:
                session.close()

        except Exception as e:
            sys_log.error(f"[Database] _init_autoheal_tables 失敗 (不影響主程序): {e}")

    def get_session(self):
        """
        Return a new Session from this manager's session factory.

        The caller owns the session and is responsible for closing it.
        """
        return self.Session()

    def update_data(self, product_list):
        """
        Batch change detection: insert a new price record for every product
        whose name or price differs from its latest stored record.

        1. Build an in-memory snapshot of each product's latest (name, price).
        2. Skip unchanged items; upsert category/product and add a PriceRecord
           for changed or new ones, committing every 100 additions.
        3. Log a summary, with a warning when an unusually large number of
           records was added.

        Args:
            product_list: Iterable of dicts with keys ``i_code``, ``name``,
                ``price``, ``category`` and (for new products) ``url``.

        Returns:
            int: Number of price records added, or 0 if the sync failed
            (the current transaction is rolled back and the error logged).
        """
        session = self.get_session()
        count_added = 0
        count_skipped = 0

        try:
            # 1. Snapshot of the latest (name, price) per product. Rows arrive
            #    newest-first, so keep only the FIRST row seen per i_code; a
            #    plain dict comprehension would let older rows overwrite it.
            latest_prices = session.query(
                Product.i_code, Product.name, PriceRecord.price
            ).join(PriceRecord).order_by(PriceRecord.timestamp.desc()).all()

            db_cache = {}
            for row_i_code, row_name, row_price in latest_prices:
                db_cache.setdefault(row_i_code, (row_name, row_price))

            sys_log.info(f"💾 開始數據比對:目前資料庫已知商品數 {len(db_cache)}")

            # Category objects already resolved in this run, keyed by name.
            category_cache = {}

            for item in product_list:
                i_code = item['i_code']
                current_name = item['name']
                current_price = item['price']

                # 2. Skip products whose latest name and price are unchanged.
                if db_cache.get(i_code) == (current_name, current_price):
                    count_skipped += 1
                    continue

                # 3. Resolve (or create) the category.
                category_name = item['category']
                category = category_cache.get(category_name)
                if category is None:
                    category = session.query(Category).filter_by(name=category_name).first()
                    if category is None:
                        category = Category(name=category_name)
                        session.add(category)
                        session.flush()  # assign category.id
                    category_cache[category_name] = category

                # 4. Insert or update the product's basic information.
                product = session.query(Product).filter_by(i_code=i_code).first()
                if not product:
                    product = Product(
                        i_code=i_code,
                        name=current_name,
                        url=item['url'],
                        category=category_name,
                        category_id=category.id,
                    )
                    session.add(product)
                    session.flush()  # assign product.id
                else:
                    product.name = current_name
                    product.category = category_name
                    product.category_id = category.id

                # 5. Write the new price record.
                session.add(PriceRecord(
                    product_id=product.id,
                    price=current_price,
                    timestamp=datetime.now(),
                ))
                count_added += 1

                # Keep the snapshot current so an identical duplicate later in
                # the same batch is skipped instead of inserted again.
                db_cache[i_code] = (current_name, current_price)

                # Commit in chunks of 100 to bound transaction size.
                if count_added % 100 == 0:
                    session.commit()

            # 6. Final commit and summary log.
            session.commit()
            sys_log.info(f"📊 數據同步彙報: [新增監控: {count_added}] [跳過重複: {count_skipped}]")

            if count_added > 50:
                sys_log.warning(f"⚠️ 偵測到異常大量新增數據 ({count_added} 筆),請確認分類 URL 或爬取範圍是否正確")

            return count_added

        except Exception as e:
            session.rollback()
            sys_log.error(f"❌ 資料寫入異常:{str(e)}")
            return 0
        finally:
            session.close()

    def get_price_analysis(self, product_id, external_session=None):
        """
        Return a product's current price and its 7- and 30-record price change.

        Used by the Excel report to compute price rises and drops.

        Args:
            product_id: Primary key of the product.
            external_session: Optional caller-owned Session. When given it is
                used and NOT closed; otherwise a private session is opened and
                closed here.

        Returns:
            dict: ``{'current', '7d_diff', '30d_diff'}``. The diffs are 0 when
            there are not enough records. On error all values are 0.
        """
        owns_session = not external_session
        session = self.get_session() if owns_session else external_session
        try:
            # Newest first. Only indices 0, 7 and 30 are read, so 31 rows are
            # enough; there is no need to load the full price history.
            records = (
                session.query(PriceRecord)
                .filter_by(product_id=product_id)
                .order_by(PriceRecord.timestamp.desc())
                .limit(31)
                .all()
            )

            if not records:
                return {'current': 0, '7d_diff': 0, '30d_diff': 0}

            current_price = records[0].price
            # Assumes roughly one record per day: records[7] is about a week
            # ago and records[30] about a month ago. Missing history gives 0.
            diff_7d = current_price - records[7].price if len(records) > 7 else 0
            diff_30d = current_price - records[30].price if len(records) > 30 else 0

            return {
                'current': current_price,
                '7d_diff': diff_7d,
                '30d_diff': diff_30d,
            }
        except Exception as e:
            sys_log.error(f"❌ 價格分析計算失敗 (ID: {product_id}): {e}")
            return {'current': 0, '7d_diff': 0, '30d_diff': 0}
        finally:
            if owns_session:
                session.close()

    def get_sales_data(self, table_name='realtime_sales_monthly', start_date=None, end_date=None, months=None):
        """
        Read rows from a sales table, optionally filtered on its ``日期`` column.

        The filter is ``start_date``..``end_date`` when both are given,
        otherwise the last ``months * 30`` days when ``months`` is given,
        otherwise no filter.

        Args:
            table_name: Table to read (default: ``realtime_sales_monthly``).
                Must be a plain or schema-qualified identifier.
            start_date: Start date (``YYYY-MM-DD``), inclusive.
            end_date: End date (``YYYY-MM-DD``), inclusive.
            months: Read roughly the most recent ``months`` months of data.

        Returns:
            tuple: ``(DataFrame, cols_map)``, where ``cols_map`` maps logical
            field names to the matching DataFrame column (or None). On any
            error, ``(None, {})`` is returned and the error is logged.
        """
        import pandas as pd
        from datetime import datetime, timedelta

        try:
            # Table names cannot be bound parameters; only allow identifiers
            # so a caller-supplied name cannot inject SQL.
            if not (isinstance(table_name, str) and self._TABLE_NAME_RE.fullmatch(table_name)):
                raise ValueError(f"Invalid table name: {table_name!r}")

            # Build the date filter. Values are passed as bound parameters
            # (stringified exactly as the former f-string did), never
            # interpolated into the SQL text.
            params = {}
            if start_date and end_date:
                params = {'start_date': str(start_date), 'end_date': str(end_date)}
            elif months:
                # Approximate a month as 30 days.
                end_dt = datetime.now()
                start_dt = end_dt - timedelta(days=months * 30)
                params = {
                    'start_date': start_dt.strftime('%Y-%m-%d'),
                    'end_date': end_dt.strftime('%Y-%m-%d'),
                }
            date_filter = ' WHERE "日期" BETWEEN :start_date AND :end_date' if params else ""

            sql = f"SELECT * FROM {table_name}{date_filter}"
            df = pd.read_sql(text(sql), self.engine, params=params or None)

            # V-Fix: coerce numeric columns; unparsable values become 0.
            numeric_columns = ['總業績', '數量', '總成本', '退貨數量', '商品單位售價',
                               '折價券折扣金額', '折扣金額', '滿額再折扣金額', '分期手續費']
            for col in numeric_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

            # Map logical fields to actual column names by keyword.
            cols = df.columns.tolist()

            def find_col(keywords):
                """Return the first column containing a keyword (keywords tried in order)."""
                for keyword in keywords:
                    for col in cols:
                        if keyword in str(col):
                            return col
                return None

            cols_map = {
                'name': find_col(['商品名稱', '品名', 'Name', 'Product']),
                'pid': find_col(['商品ID', 'Product ID', 'ID', 'i_code']),
                'date': find_col(['日期', '交易日期', 'Date']),
                'time': find_col(['時間', 'Time']),
                'amount': find_col(['總業績', '銷售金額', '業績', '金額', 'Amount', 'Sales']),
                'qty': find_col(['數量', '銷售數量', '銷量', 'Qty', 'Quantity']),
                'cost': find_col(['總成本', '成本', 'Cost']),
                'profit': find_col(['毛利', 'Profit', 'Gross Margin']),
                'category': find_col(['商品館', '館別', '分類', 'Category']),
                'brand': find_col(['品牌', 'Brand']),
                'vendor': find_col(['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor']),
                'activity': find_col(['折扣活動名稱', '活動', 'Activity', 'Campaign']),
                'payment': find_col(['付款', 'Payment', '付款方式']),
                'price': find_col(['商品單位售價', '單價', 'Price']),
            }

            sys_log.info(f"[DB] get_sales_data 成功 | 表: {table_name} | 筆數: {len(df)}")
            return df, cols_map

        except Exception as e:
            sys_log.error(f"[DB] get_sales_data 失敗: {e}")
            return None, {}
|
||
|
||
|
||
# =============================================================================
|
||
# 全域資料庫管理器與便捷函數
|
||
# =============================================================================
|
||
# 預設使用 config.py 的 DATABASE_PATH (支援 SQLite 和 PostgreSQL)
|
||
_default_db_manager = None
|
||
|
||
|
||
def get_db_manager():
    """
    Return the process-wide DatabaseManager, creating it on first call.

    The instance is built with ``config.DATABASE_PATH``. If an ImportError
    occurs (either importing ``config`` or while constructing the manager),
    it retries with ``DatabaseManager()`` and its default path.

    NOTE(review): ``DatabaseManager.__init__`` itself imports ``config``, so
    the fallback presumably raises the same ImportError when config is truly
    missing. Confirm whether this fallback is still needed.

    Returns:
        DatabaseManager: the shared database manager instance.
    """
    global _default_db_manager
    if _default_db_manager is not None:
        return _default_db_manager

    try:
        from config import DATABASE_PATH
        manager = DatabaseManager(DATABASE_PATH)
    except ImportError:
        # config is not importable: fall back to the default path.
        manager = DatabaseManager()

    _default_db_manager = manager
    return _default_db_manager
|
||
|
||
|
||
def get_session():
    """
    Open a new SQLAlchemy Session from the shared DatabaseManager.

    Convenience helper for other modules, so they never build their own
    DatabaseManager. The caller owns the returned session and must close it.

    Returns:
        Session: a fresh SQLAlchemy Session instance.

    Usage:
        from database.manager import get_session

        session = get_session()
        try:
            # database work
            session.query(...)
            session.commit()
        finally:
            session.close()
    """
    manager = get_db_manager()
    return manager.get_session()