#!/usr/bin/env python3 """ Superset 資料集與儀表板自動設定腳本 MOMO Pro System - UAT 環境 使用方式: docker exec momo-superset python /app/setup_datasets.py """ import sys import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 要建立的資料集 DATASETS = [ { "table_name": "daily_sales_snapshot", "description": "每日銷售快照 - 用於當日業績分析", "main_dttm_col": "snapshot_date", }, { "table_name": "realtime_sales_monthly", "description": "即時業績月度資料 - 用於銷售分析和成長分析", "main_dttm_col": None, }, { "table_name": "monthly_summary_analysis", "description": "月度總結分析 - 用於月度報表", "main_dttm_col": "report_month", }, { "table_name": "products", "description": "商品資料 - 用於商品看板和 ABC 分析", "main_dttm_col": "updated_at", }, { "table_name": "price_records", "description": "價格記錄 - 用於價格趨勢分析", "main_dttm_col": "timestamp", }, ] def setup_datasets(): """建立所有資料集""" # 在函數內部導入,確保 app context 正確 from superset.app import create_app app = create_app() with app.app_context(): from superset.extensions import db from superset.connectors.sqla.models import SqlaTable from superset.models.core import Database # 找到 MOMO_UAT 資料庫 database = db.session.query(Database).filter_by(database_name="MOMO_UAT").first() if not database: logger.error("找不到 MOMO_UAT 資料庫,請先新增資料庫連線") sys.exit(1) logger.info(f"找到資料庫: {database.database_name} (ID: {database.id})") created_count = 0 for ds_config in DATASETS: table_name = ds_config["table_name"] # 檢查是否已存在 existing = db.session.query(SqlaTable).filter_by( table_name=table_name, database_id=database.id ).first() if existing: logger.info(f"資料集已存在: {table_name}") continue # 建立新資料集 dataset = SqlaTable( table_name=table_name, database_id=database.id, schema="public", description=ds_config.get("description", ""), ) # 設定時間欄位 if ds_config.get("main_dttm_col"): dataset.main_dttm_col = ds_config["main_dttm_col"] db.session.add(dataset) logger.info(f"建立資料集: {table_name}") created_count += 1 db.session.commit() logger.info(f"完成! 建立了 {created_count} 個資料集") # 同步資料集欄位 logger.info("同步資料集欄位...") for ds_config in DATASETS: table_name = ds_config["table_name"] dataset = db.session.query(SqlaTable).filter_by( table_name=table_name, database_id=database.id ).first() if dataset: try: dataset.fetch_metadata() logger.info(f"已同步欄位: {table_name}") except Exception as e: logger.warning(f"同步欄位失敗 {table_name}: {e}") db.session.commit() logger.info("資料集設定完成!") if __name__ == "__main__": setup_datasets()