Some checks failed
CD Pipeline / deploy (push) Failing after 5m18s
🔴 Critical - auto_heal_service: 補 import re + sqlalchemy.text + 修正 orchestrator 變數名 + autoheal_playbook→playbooks 表名 + _alert_and_store cooldown 修復 - aider_heal_executor: shell injection 改 shell=False + list 參數 - docker-compose: DISABLE_LOGIN 改 env var + 移除密碼 fallback + POSTGRES_HOST 修正 - app.py: /api/backup /api/run_task 等 6 個管理 API 加 @login_required - config.py + pg_sync + e2e_test: 移除 wooo_pg_2026 hardcoded 密碼 fallback - pg_backup.sh: 移除 TELEGRAM_TOKEN= 中間變數,直接用 $TELEGRAM_BOT_TOKEN - migration 014: trigger_pattern→match_pattern + 補 error_type NOT NULL 欄位 🟡 High - telegram_bot_service: str(e) 改通用訊息 + session try/finally + 移除 pa:/pr: 舊 callback - run_scheduler: ElephantAlpha thread 死亡監控 + 自動重啟 + Telegram 告警 + agent_context 03:30 TTL 定時清理任務 - openclaw_learning_service: build_rag_context 兩路徑加 .limit(200) - hooks: commit-quality + momo-prod-guard 空 catch 改 stderr+exit(1) - scripts/code_review: auto_yes 預設改 false - db_backup_service: PGPASSWORD 透過 env dict 傳遞 📦 Migrations - 013_autoheal: 修正建表順序 playbooks→incidents(外鍵前向引用) - 018_add_missing_indexes: heal_logs/incidents 外鍵索引 + cleanup_expired_agent_context() 🟢 Infrastructure - requirements.txt: 加版本下界 Flask>=2.3 SQLAlchemy>=1.4 等 - cd.yaml: 新增 run_scheduler.py + run_telegram_bot.py 監聽路徑 - .gitignore: insert_playbook_local.py 加入忽略 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
474 lines
15 KiB
Python
474 lines
15 KiB
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SQLite 到 PostgreSQL 即時資料同步服務
WOOO TECH - Momo Pro System

同步策略:
1. 即時同步:資料寫入 SQLite 後,由呼叫端呼叫 sync_record / sync_batch 同步到 PostgreSQL
2. 定時校驗:背景服務模式 (--daemon) 每 PG_SYNC_INTERVAL 秒 (預設 300 秒,即 5 分鐘) 比對資料筆數,不一致時執行全量重建
3. 全量重建:支援手動觸發完整重建 (--full)

使用方式:
    # 作為模組引入,在資料寫入時呼叫
    from services.pg_sync_service import sync_record, sync_batch

    # 命令列執行
    python services/pg_sync_service.py --full     # 全量同步
    python services/pg_sync_service.py --verify   # 驗證一致性
    python services/pg_sync_service.py --daemon   # 背景服務模式
"""
|
||
|
||
import os
|
||
import sys
|
||
import sqlite3
|
||
import logging
|
||
import argparse
|
||
import threading
|
||
import time
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import List, Dict, Any
|
||
from contextlib import contextmanager
|
||
|
||
# Environment settings: every value below is read from the process environment.
_env = os.environ.get

# PostgreSQL connection parameters. The default host 'postgres' is the
# service name on the Docker internal network. PG_PASSWORD has no default
# and is None when POSTGRES_PASSWORD is unset.
PG_HOST = _env('POSTGRES_HOST', 'postgres')
PG_PORT = _env('POSTGRES_PORT', '5432')
PG_USER = _env('POSTGRES_USER', 'momo')
PG_PASSWORD = _env('POSTGRES_PASSWORD')
PG_DATABASE = _env('POSTGRES_DB', 'momo_analytics')
SQLITE_PATH = _env('SQLITE_PATH', 'data/momo_database.db')

# Sync switches. Sync is on only when PG_SYNC_ENABLED equals 'true'
# (case-insensitive).
SYNC_ENABLED = _env('PG_SYNC_ENABLED', 'false').lower() == 'true'
# Seconds between daemon-mode consistency checks (default 300 = 5 minutes).
SYNC_INTERVAL_SECONDS = int(_env('PG_SYNC_INTERVAL', '300'))

del _env

# Taipei time zone (UTC+8).
TAIPEI_TZ = timezone(timedelta(hours=8))

# Logging: every message from this module carries a [PG_SYNC] tag.
logging.basicConfig(
    format='%(asctime)s - [PG_SYNC] %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# A single cached PostgreSQL connection (a minimal stand-in for a pool);
# get_pg_connection() creates/replaces it while holding _pg_lock.
_pg_connection = None
_pg_lock = threading.Lock()
|
||
|
||
|
||
# =============================================================================
|
||
# 連接管理
|
||
# =============================================================================
|
||
|
||
def get_pg_connection(retry: int = 3):
    """
    Return the shared PostgreSQL connection, (re)connecting when needed.

    The connection is cached in the module-level ``_pg_connection``. Checking,
    replacing and creating it all happen while holding ``_pg_lock``, including
    the 1-second back-off between failed attempts.

    Args:
        retry: Maximum number of connection attempts when there is no usable
            cached connection.

    Returns:
        A psycopg2 connection with ``autocommit = False``, or ``None`` when
        sync is disabled, psycopg2 is not installed, or every attempt failed.
    """
    global _pg_connection

    if not SYNC_ENABLED:
        return None

    with _pg_lock:
        # Reuse the cached connection if a trivial query still succeeds.
        if _pg_connection is not None:
            try:
                # The cursor context manager closes the cursor even if
                # execute() raises.
                with _pg_connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
                return _pg_connection
            except Exception:
                # Broken connection: close it so its socket is not leaked,
                # then fall through and open a new one.
                stale, _pg_connection = _pg_connection, None
                try:
                    stale.close()
                except Exception as close_error:
                    logger.debug(f"關閉失效的 PostgreSQL 連接失敗: {close_error}")

        try:
            import psycopg2
        except ImportError:
            logger.warning("psycopg2 未安裝,PostgreSQL 同步已停用")
            return None

        # Open a new connection.
        for attempt in range(1, retry + 1):
            try:
                conn = psycopg2.connect(
                    host=PG_HOST,
                    port=PG_PORT,
                    user=PG_USER,
                    password=PG_PASSWORD,
                    database=PG_DATABASE,
                    connect_timeout=10
                )
                conn.autocommit = False
            except Exception as e:
                logger.warning(f"PostgreSQL 連接失敗 (嘗試 {attempt}/{retry}): {e}")
                # Back off before retrying, but not after the final attempt.
                if attempt < retry:
                    time.sleep(1)
                continue

            _pg_connection = conn
            logger.info(f"PostgreSQL 連接成功 ({PG_HOST}:{PG_PORT})")
            return conn

        logger.error("PostgreSQL 連接失敗,已達最大重試次數")
        return None
|
||
|
||
|
||
@contextmanager
def pg_transaction():
    """
    Wrap one PostgreSQL transaction on the shared connection.

    Yields the connection returned by ``get_pg_connection()``, or ``None`` when
    no connection is available (callers must check for ``None``). When the
    ``with`` body completes, the transaction is committed. If the body or the
    commit raises, the transaction is rolled back, the error is logged, and
    the ORIGINAL exception is re-raised. A failing rollback (e.g. on a dead
    connection) is logged rather than allowed to replace it.

    NOTE(review): ``_pg_lock`` is only held inside ``get_pg_connection()``,
    not for the lifetime of this transaction. Concurrent callers from several
    threads would therefore share one transaction on the single cached
    connection. Confirm callers are effectively single-threaded.
    """
    conn = get_pg_connection()
    if conn is None:
        yield None
        return

    try:
        yield conn
        conn.commit()
    except Exception as e:
        try:
            conn.rollback()
        except Exception as rollback_error:
            logger.error(f"PostgreSQL rollback 失敗: {rollback_error}")
        logger.error(f"PostgreSQL 交易失敗: {e}")
        # Bare raise re-raises the exception bound to `e`, not the rollback one.
        raise
|
||
|
||
|
||
# =============================================================================
|
||
# 即時同步 API (供其他模組呼叫)
|
||
# =============================================================================
|
||
|
||
def sync_record(table: str, record: Dict[str, Any], operation: str = 'INSERT'):
    """
    Synchronise a single record to PostgreSQL immediately.

    Args:
        table: Target table name. It is quoted as an SQL identifier, the same
            way ``sync_table_full`` creates the table.
        record: Column-name -> value mapping. Column names are passed through
            ``sanitize_column_name`` so they match the columns created by
            ``sync_table_full`` (e.g. ``毛利率(%)`` -> ``毛利率__pct_``). This also
            keeps a literal ``%`` out of the SQL text, where psycopg2 would
            treat it as a parameter placeholder.
        operation: ``'INSERT'``, ``'UPDATE'`` or ``'DELETE'`` (case-insensitive).
            ``DELETE`` removes every row whose ``"訂單編號"`` equals
            ``record['訂單編號']``. ``INSERT`` and ``UPDATE`` both append a new
            row: tables built by ``sync_table_full`` have no unique key other
            than the generated ``id``, so there is no conflict target for an
            upsert. NOTE(review): an UPDATE therefore does not replace the
            existing row; confirm callers expect this.

    Returns:
        True on success, or immediately when sync is disabled. False when no
        PostgreSQL connection is available, the record is empty, a DELETE has
        no ``訂單編號``, or the statement fails (the error is logged).

    Example:
        sync_record('realtime_sales_monthly', {
            '日期': '2026-01-18',
            '訂單編號': 'ORD123',
            '商品名稱': '測試商品',
            ...
        })
    """
    if not SYNC_ENABLED:
        return True

    def _quote(name: str) -> str:
        # Quote an SQL identifier, doubling any embedded double quote.
        return '"' + name.replace('"', '""') + '"'

    try:
        op = (operation or 'INSERT').upper()

        if not record:
            logger.warning(f"[SYNC] {op} {table}: 空記錄,略過")
            return False

        if op == 'DELETE':
            # DELETE needs a key; previously a missing key silently did nothing.
            if '訂單編號' not in record:
                logger.warning(f"[SYNC] DELETE {table}: 缺少 訂單編號,略過")
                return False
            sql = f'DELETE FROM {_quote(table)} WHERE "訂單編號" = %s'
            params = (record['訂單編號'],)
        else:
            pg_columns = [sanitize_column_name(col) for col in record]
            cols_sql = ', '.join(_quote(col) for col in pg_columns)
            placeholders = ', '.join(['%s'] * len(pg_columns))
            sql = f'INSERT INTO {_quote(table)} ({cols_sql}) VALUES ({placeholders})'
            params = list(record.values())

        with pg_transaction() as conn:
            if conn is None:
                return False
            with conn.cursor() as cursor:
                cursor.execute(sql, params)

        logger.debug(f"[SYNC] {op} {table}: 1 筆")
        return True

    except Exception as e:
        logger.error(f"[SYNC] 同步失敗 ({table}): {e}")
        return False
|
||
|
||
|
||
def sync_batch(table: str, records: List[Dict[str, Any]]):
    """
    Insert many records into PostgreSQL in a single transaction.

    The column list is taken from the FIRST record. For every record, values
    are looked up by those keys: missing keys are sent as ``None`` and extra
    keys are ignored. Column names go through ``sanitize_column_name`` so they
    match the columns that ``sync_table_full`` creates (and no literal ``%``
    reaches psycopg2's query parser). The table name is quoted as an SQL
    identifier, as ``sync_table_full`` does.

    Args:
        table: Target table name.
        records: Records to insert.

    Returns:
        True on success, or when sync is disabled or ``records`` is empty.
        False when no PostgreSQL connection is available or the insert fails.
        In that case the error is logged and ``pg_transaction`` rolls back the
        whole batch.
    """
    if not SYNC_ENABLED or not records:
        return True

    def _quote(name: str) -> str:
        # Quote an SQL identifier, doubling any embedded double quote.
        return '"' + name.replace('"', '""') + '"'

    try:
        columns = list(records[0].keys())
        cols_sql = ', '.join(_quote(sanitize_column_name(col)) for col in columns)
        placeholders = ', '.join(['%s'] * len(columns))
        sql = f'INSERT INTO {_quote(table)} ({cols_sql}) VALUES ({placeholders})'

        rows = [tuple(record.get(col) for col in columns) for record in records]

        with pg_transaction() as conn:
            if conn is None:
                return False
            with conn.cursor() as cursor:
                cursor.executemany(sql, rows)

        logger.info(f"[SYNC] BATCH INSERT {table}: {len(records)} 筆")
        return True

    except Exception as e:
        logger.error(f"[SYNC] 批次同步失敗 ({table}): {e}")
        return False
|
||
|
||
|
||
# =============================================================================
|
||
# 全量同步
|
||
# =============================================================================
|
||
|
||
def full_sync(table: str = None):
    """
    Rebuild PostgreSQL tables from the SQLite database.

    Args:
        table: Name of the single table to rebuild. When falsy (None or ''),
            the default table ``realtime_sales_monthly`` is rebuilt.

    Returns:
        True once every requested table has been processed. False when sync
        is disabled, the SQLite file does not exist, or any step raises (the
        exception is logged, not propagated).
    """
    separator = "=" * 60
    for line in (separator, "開始全量同步...", separator):
        logger.info(line)

    if not SYNC_ENABLED:
        logger.warning("PostgreSQL 同步未啟用,請設定 PG_SYNC_ENABLED=true")
        return False

    targets = [table] if table else ['realtime_sales_monthly']

    sqlite_db = None
    try:
        # Stop here instead of letting sqlite3.connect() create an empty file.
        if not os.path.exists(SQLITE_PATH):
            logger.error(f"SQLite 資料庫不存在: {SQLITE_PATH}")
            return False

        sqlite_db = sqlite3.connect(SQLITE_PATH)
        sqlite_db.row_factory = sqlite3.Row

        for target in targets:
            sync_table_full(target, sqlite_db)

        for line in (separator, "✅ 全量同步完成", separator):
            logger.info(line)
        return True

    except Exception as e:
        logger.error(f"全量同步失敗: {e}")
        return False

    finally:
        if sqlite_db is not None:
            sqlite_db.close()
|
||
|
||
|
||
def sanitize_column_name(col: str) -> str:
    """
    Map a SQLite column name to a PostgreSQL-safe column name.

    ``%`` becomes ``_pct``, so psycopg2 does not read it as a parameter
    placeholder. Each ``(`` and ``)`` becomes ``_``, which avoids SQL syntax
    trouble. All other characters are kept unchanged.
    """
    replacements = {'%': '_pct', '(': '_', ')': '_'}
    return col.translate(str.maketrans(replacements))
|
||
|
||
|
||
def sync_table_full(table: str, sqlite_conn):
    """
    Rebuild one PostgreSQL table from its SQLite counterpart, copying every column.

    Steps:
      1. Confirm ``table`` exists in SQLite; otherwise log a warning and return.
      2. Read all column names via ``PRAGMA table_info`` and map them to
         PostgreSQL-safe names with ``sanitize_column_name``.
      3. Read every row from SQLite into memory.
      4. Inside one ``pg_transaction``:
         - DROP the PostgreSQL table (CASCADE);
         - recreate it as ``id SERIAL PRIMARY KEY`` plus one TEXT column per
           SQLite column;
         - bulk-insert the rows in batches of 5000;
         - create indexes on whichever known lookup columns exist.

    Args:
        table: Table name, used unchanged in both SQLite and PostgreSQL.
        sqlite_conn: An open ``sqlite3`` connection to read from.

    Raises:
        Re-raises, after logging, any error from reading SQLite or writing
        PostgreSQL; ``pg_transaction`` rolls back the PostgreSQL side. A failure
        to create a single index is logged and does not fail the sync.
    """
    def _quote(name: str) -> str:
        # Quote an SQL identifier, doubling any embedded double quote.
        return '"' + name.replace('"', '""') + '"'

    logger.info(f"同步資料表: {table}")
    quoted_table = _quote(table)

    try:
        sqlite_cursor = sqlite_conn.cursor()

        # Make sure the table exists in SQLite.
        sqlite_cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table,)
        )
        if not sqlite_cursor.fetchone():
            logger.warning(f"SQLite 中沒有 {table} 資料表")
            return

        # Field 1 of each PRAGMA table_info row is the original column name.
        sqlite_cursor.execute(f'PRAGMA table_info({quoted_table})')
        sqlite_columns = [info[1] for info in sqlite_cursor.fetchall()]
        pg_columns = [sanitize_column_name(c) for c in sqlite_columns]

        logger.info(f"同步所有欄位: {len(sqlite_columns)} 個")

        # Report the columns whose names change during sanitisation.
        special_cols = [
            (i, c) for i, c in enumerate(sqlite_columns)
            if '%' in c or '(' in c or ')' in c
        ]
        if special_cols:
            logger.info(f"包含特殊字元的欄位 ({len(special_cols)} 個):")
            for i, c in special_cols:
                logger.info(f"  {i}: [{c}] -> [{sanitize_column_name(c)}]")

        # Read all data using the ORIGINAL SQLite column names.
        select_cols = ', '.join(_quote(c) for c in sqlite_columns)
        sqlite_cursor.execute(f'SELECT {select_cols} FROM {quoted_table}')
        rows = sqlite_cursor.fetchall()

        logger.info(f"取得 {len(rows):,} 筆資料")

        with pg_transaction() as conn:
            if conn is None:
                logger.warning(f"PostgreSQL 無法連線,略過 {table}")
                return

            with conn.cursor() as cursor:
                logger.info("重建 PostgreSQL 資料表...")
                cursor.execute(f'DROP TABLE IF EXISTS {quoted_table} CASCADE')

                cols_def = ', '.join(f'{_quote(c)} TEXT' for c in pg_columns)
                cursor.execute(
                    f'CREATE TABLE {quoted_table} (id SERIAL PRIMARY KEY, {cols_def})'
                )
                logger.info(f"已建立資料表,{len(pg_columns)} 個欄位")

                if rows:
                    insert_cols = ', '.join(_quote(c) for c in pg_columns)
                    placeholders = ', '.join(['%s'] * len(pg_columns))
                    insert_sql = (
                        f'INSERT INTO {quoted_table} ({insert_cols}) VALUES ({placeholders})'
                    )

                    batch_size = 5000
                    total = len(rows)
                    for start in range(0, total, batch_size):
                        # tuple(): hand psycopg2 plain sequences even when
                        # sqlite_conn uses the sqlite3.Row row factory.
                        batch = [tuple(row) for row in rows[start:start + batch_size]]
                        cursor.executemany(insert_sql, batch)
                        progress = min(start + batch_size, total)
                        logger.info(
                            f"已插入 {progress:,}/{total:,} 筆 ({progress * 100 // total}%)"
                        )

                logger.info("建立索引...")
                available = set(pg_columns)
                for col in ('日期', '訂單編號', '廠商名稱', '分類名稱', '品牌名稱'):
                    pg_col = sanitize_column_name(col)
                    if pg_col not in available:
                        continue
                    idx_name = f'idx_{table}_{pg_col}'.replace(' ', '_')
                    # Any failed statement puts the PostgreSQL transaction into
                    # the aborted state. The final COMMIT would then roll back
                    # the DROP/CREATE/INSERT work as well. The savepoint
                    # confines a failed index to just that statement.
                    cursor.execute('SAVEPOINT sync_table_full_index')
                    try:
                        cursor.execute(
                            f'CREATE INDEX IF NOT EXISTS {_quote(idx_name)} '
                            f'ON {quoted_table} ({_quote(pg_col)})'
                        )
                    except Exception as e:
                        cursor.execute('ROLLBACK TO SAVEPOINT sync_table_full_index')
                        logger.warning(f"建立索引 {pg_col} 失敗: {e}")
                    else:
                        cursor.execute('RELEASE SAVEPOINT sync_table_full_index')

        # Reached only after pg_transaction has committed.
        logger.info(f"✅ {table} 同步完成: {len(rows):,} 筆,{len(pg_columns)} 個欄位")

    except Exception as e:
        logger.error(f"同步 {table} 失敗: {e}")
        raise
|
||
|
||
|
||
# =============================================================================
|
||
# 一致性驗證
|
||
# =============================================================================
|
||
|
||
def verify_consistency(table: str = 'realtime_sales_monthly'):
    """
    Compare the row count of ``table`` in SQLite and PostgreSQL.

    Args:
        table: Table to compare (default ``realtime_sales_monthly``). It is
            quoted as an SQL identifier in both databases.

    Returns:
        True when both counts are equal. False when sync is disabled, the
        SQLite file does not exist, no PostgreSQL connection is available, the
        counts differ, or any error occurs (the error is logged).
    """
    logger.info("開始驗證資料一致性...")

    if not SYNC_ENABLED:
        logger.warning("PostgreSQL 同步未啟用")
        return False

    # sqlite3.connect() would silently create an empty database file at a
    # missing path, so check first (same message as full_sync).
    if not os.path.exists(SQLITE_PATH):
        logger.error(f"SQLite 資料庫不存在: {SQLITE_PATH}")
        return False

    quoted_table = '"' + table.replace('"', '""') + '"'
    count_sql = f'SELECT COUNT(*) FROM {quoted_table}'

    try:
        # SQLite count; the connection is closed even if the query fails.
        sqlite_conn = sqlite3.connect(SQLITE_PATH)
        try:
            sqlite_count = sqlite_conn.execute(count_sql).fetchone()[0]
        finally:
            sqlite_conn.close()

        # PostgreSQL count.
        with pg_transaction() as conn:
            if conn is None:
                return False
            with conn.cursor() as cursor:
                cursor.execute(count_sql)
                pg_count = cursor.fetchone()[0]

        diff = sqlite_count - pg_count
        if diff == 0:
            logger.info(f"✅ 資料一致: SQLite={sqlite_count}, PostgreSQL={pg_count}")
            return True

        logger.warning(f"⚠️ 資料不一致: SQLite={sqlite_count}, PostgreSQL={pg_count}, 差異={diff}")
        return False

    except Exception as e:
        logger.error(f"驗證失敗: {e}")
        return False
|
||
|
||
|
||
# =============================================================================
|
||
# 背景服務
|
||
# =============================================================================
|
||
|
||
def daemon_mode():
    """
    Run the periodic sync loop forever.

    Each cycle calls ``verify_consistency()``. When that returns False,
    ``full_sync()`` runs, which is a complete DROP/CREATE rebuild rather than
    an incremental sync. Exceptions raised inside a cycle are logged and the
    loop continues. Cycles are ``SYNC_INTERVAL_SECONDS`` apart. The loop has
    no exit; only a non-``Exception`` such as ``KeyboardInterrupt`` ends it.
    """
    logger.info("啟動背景同步服務...")
    logger.info(f"同步間隔: {SYNC_INTERVAL_SECONDS} 秒")

    while True:
        try:
            if not verify_consistency():
                logger.info("偵測到資料不一致,執行全量同步...")
                # full_sync() logs its own error details; record the outcome.
                if not full_sync():
                    logger.warning("全量同步未成功,將於下一個週期重試")
        except Exception as e:
            logger.error(f"同步循環錯誤: {e}")

        time.sleep(SYNC_INTERVAL_SECONDS)
|
||
|
||
|
||
# =============================================================================
|
||
# 命令列介面
|
||
# =============================================================================
|
||
|
||
def main():
    """
    Command-line entry point.

    Options (checked in this order; the first one present wins):
      --daemon  run the periodic sync loop (never returns normally)
      --verify  compare row counts once
      --full    rebuild the PostgreSQL table once
    With no option, help is printed.

    Sync is force-enabled for CLI use regardless of PG_SYNC_ENABLED.

    Returns:
        Process exit code: 0 on success or after printing help, 1 when
        --verify finds a mismatch/error or --full fails.
    """
    global SYNC_ENABLED

    parser = argparse.ArgumentParser(description='SQLite 到 PostgreSQL 資料同步')
    parser.add_argument('--full', action='store_true', help='執行全量同步')
    parser.add_argument('--verify', action='store_true', help='驗證資料一致性')
    parser.add_argument('--daemon', action='store_true', help='背景服務模式')
    args = parser.parse_args()

    # CLI mode always enables sync.
    SYNC_ENABLED = True

    if args.daemon:
        daemon_mode()
        return 0
    if args.verify:
        return 0 if verify_consistency() else 1
    if args.full:
        return 0 if full_sync() else 1

    parser.print_help()
    return 0


if __name__ == '__main__':
    # Propagate the result so shells/CI can detect a failed verify or sync.
    sys.exit(main())
|