diff --git a/.env.example b/.env.example index cb00a47..3fe6a77 100644 --- a/.env.example +++ b/.env.example @@ -86,3 +86,9 @@ ELEPHANT_ALPHA_OPENCLAW_GEMINI_ENDPOINT=https://generativelanguage.googleapis.co ELEPHANT_ALPHA_DEBUG_MODE=false ELEPHANT_ALPHA_METRICS_ENABLED=true ELEPHANT_ALPHA_AUDIT_LOGGING=true + +# ── System Maintenance API ────────────────────────────────────────────────── +# X-Internal-Key 標頭認證金鑰(必填) +# 用於 /api/system/cleanup/* 和 /api/system/health 等維護路由。 +# 建議使用 openssl rand -hex 32 生成。 +INTERNAL_API_KEY=your-secret-internal-key-here diff --git a/routes/system_routes.py b/routes/system_routes.py index f2a59ea..b561216 100644 --- a/routes/system_routes.py +++ b/routes/system_routes.py @@ -1,1158 +1,379 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -系統管理路由模組 -包含:設定頁面、日誌、備份、健康檢查、分類管理 API 等 -""" - +from flask import Blueprint, jsonify, request +import subprocess import os -import json -import time -import shutil -import zipfile -from datetime import datetime, timezone, timedelta -from flask import Blueprint, render_template, request, jsonify, send_from_directory, url_for -from auth import login_required -from sqlalchemy import text -from config import SYSTEM_VERSION, BASE_DIR, LOG_FILE_PATH, public_url -from database.manager import DatabaseManager -from services.logger_manager import SystemLogger -from utils.validators import safe_join +system_bp = Blueprint('system', __name__, url_prefix='/api/system') -# 時區設定 -TAIPEI_TZ = timezone(timedelta(hours=8)) +# ============================================================================= +# 安全常數 +# ============================================================================= -# Logger -sys_log = SystemLogger("SystemRoutes").get_logger() +# 內部 API 金鑰(必須透過環境變數設定,不可為空) +_INTERNAL_API_KEY = os.environ.get('INTERNAL_API_KEY', '') -# Blueprint 定義 -system_bp = Blueprint('system', __name__) - -# ========================================== -# 分類管理相關 -# ========================================== -CATEGORIES_JSON_PATH = os.path.join(BASE_DIR, 'data', 'categories.json') +# 輸入數值邊界(防止過大/負數值觸發危險行為) +_MAX_SIZE_MB_LIMIT = 10_000 # 最大 10GB +_MIN_HOURS = 1 # 最少 1 小時 +_MAX_HOURS = 8_760 # 最多 365 天 -def load_categories(): - """從 JSON 檔案載入分類列表""" - try: - with open(CATEGORIES_JSON_PATH, 'r', encoding='utf-8') as f: - return json.load(f) - except (FileNotFoundError, json.JSONDecodeError): - return [] - - -def save_categories(categories): - """將分類列表儲存到 JSON 檔案""" - with open(CATEGORIES_JSON_PATH, 'w', encoding='utf-8') as f: - json.dump(categories, f, ensure_ascii=False, indent=4) - - -def load_scheduler_stats(): - """讀取排程統計資料""" - stats_path = os.path.join(BASE_DIR, 'data', 'scheduler_stats.json') - if os.path.exists(stats_path): - try: - with open(stats_path, 'r', encoding='utf-8') as f: - return json.load(f) - except (IOError, json.JSONDecodeError): - return {} - return {} - - -# ========================================== -# 頁面路由 -# ========================================== - -@system_bp.route('/settings') -@login_required -def settings(): - """分類設定頁面""" - categories = load_categories() - return render_template('settings.html', - categories=categories, - public_url=public_url, - system_version=SYSTEM_VERSION) - - -@system_bp.route('/system_settings') -@login_required -def system_settings_page(): - """系統設定與匯入頁面""" - now_taipei = datetime.now(TAIPEI_TZ) - return render_template('system_settings.html', - system_version=SYSTEM_VERSION, - datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S')) - - -@system_bp.route('/logs') -@login_required -def show_logs(): - """系統日誌頁面""" - now_taipei = datetime.now(TAIPEI_TZ) - return render_template('logs.html', - datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S')) - - -# ========================================== -# 分類管理 API -# ========================================== - -@system_bp.route('/api/categories', methods=['POST']) -@login_required -def add_category(): - """API: 新增分類""" - name = request.form.get('name') - url = request.form.get('url') - if not name or not url: - return jsonify({"status": "error", "message": "名稱和 URL 皆不可為空"}), 400 - - categories = load_categories() - new_id = int(time.time() * 1000) # 使用時間戳作為簡易唯一 ID - categories.append({'id': new_id, 'name': name, 'url': url}) - save_categories(categories) - - return jsonify({"status": "success", "message": "分類新增成功"}) - - -@system_bp.route('/api/categories/', methods=['PUT']) -@login_required -def update_category(category_id): - """API: 更新分類""" - name = request.form.get('name') - url = request.form.get('url') - if not name or not url: - return jsonify({"status": "error", "message": "名稱和 URL 皆不可為空"}), 400 - - categories = load_categories() - category_found = False - for cat in categories: - if cat.get('id') == category_id: - cat['name'] = name - cat['url'] = url - category_found = True - break - - if not category_found: - return jsonify({"status": "error", "message": "找不到指定的分類 ID"}), 404 - - save_categories(categories) - return jsonify({"status": "success", "message": "分類更新成功"}) - - -@system_bp.route('/api/categories/', methods=['DELETE']) -@login_required -def delete_category(category_id): - """API: 刪除分類""" - categories = [cat for cat in load_categories() if cat.get('id') != category_id] - save_categories(categories) - return jsonify({"status": "success", "message": "分類刪除成功"}) - - -@system_bp.route('/api/test_url', methods=['POST']) -@login_required -def test_url(): - """API: 測試網址是否有效""" - try: - data = request.get_json() - url = data.get('url') - if not url: - return jsonify({"status": "error", "message": "網址不能為空"}), 400 - - import requests - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - } - # 設定 10 秒超時,避免卡住 - response = requests.get(url, headers=headers, timeout=10) - - if response.status_code == 200: - return jsonify({"status": "success", "message": f"連結有效 (Status: 200)"}) - else: - return jsonify({"status": "warning", "message": f"連結回應異常 (Status: {response.status_code})"}) - - except Exception as e: - return jsonify({"status": "error", "message": f"連線失敗: {str(e)}"}), 500 - - -# ========================================== -# 日誌 API -# ========================================== - -@system_bp.route('/api/logs') -@login_required -def get_logs_api(): - """API: 取得系統日誌""" - if os.path.exists(LOG_FILE_PATH): - try: - with open(LOG_FILE_PATH, 'r', encoding='utf-8') as f: - return jsonify({"logs": "".join(f.readlines()[-60:])}) - except Exception as e: - sys_log.error(f"[Web] [Logs] 日誌 API 讀取異常 | Error: {e}") - return jsonify({"logs": "讀取日誌異常"}) - return jsonify({"logs": "等待系統啟動中..."}) - - -# ========================================== -# 健康檢查與監控 -# ========================================== - -@system_bp.route('/health') -def health_check(): +def _require_internal_key(): """ - 健康檢查端點,返回系統狀態 - - 用於 Docker HEALTHCHECK - - 用於 Nginx upstream 健康檢查 - - 用於負載均衡器健康檢查 + 驗證 X-Internal-Key 標頭。 + 系統維護路由(cleanup、health)僅限內部服務呼叫。 + 回傳 (ok: bool, error_response | None) + """ + if not _INTERNAL_API_KEY: + # 未設定金鑰時,拒絕所有請求(fail-secure) + return False, (jsonify({ + 'success': False, + 'error': '伺服器未設定 INTERNAL_API_KEY,拒絕存取' + }), 503) + + provided = request.headers.get('X-Internal-Key', '') + if not provided or provided != _INTERNAL_API_KEY: + return False, (jsonify({ + 'success': False, + 'error': '未授權:缺少或無效的 X-Internal-Key' + }), 401) + + return True, None + + +def _validate_int(value, min_val, max_val, default): + """ + 驗證整數輸入在安全範圍內。 + 非整數、超出範圍一律回傳 default。 """ try: - # 檢查資料庫連線 - db = DatabaseManager() - with db.get_session() as session: - session.execute(text("SELECT 1")) + v = int(value) + if min_val <= v <= max_val: + return v + except (TypeError, ValueError): + pass + return default + +# ============================================================================= +# 系统清理与维护 +# ============================================================================= + + +@system_bp.route('/cleanup/docker', methods=['POST']) +def cleanup_docker(): + """Docker 系统清理(安全加固版,需 X-Internal-Key)""" + ok, err = _require_internal_key() + if not ok: + return err + + data = request.get_json() or {} + dry_run = bool(data.get('dry_run', False)) + confirm = bool(data.get('confirm', False)) + + if not confirm: return jsonify({ - 'status': 'healthy', - 'timestamp': datetime.now(TAIPEI_TZ).isoformat(), - 'version': SYSTEM_VERSION - }), 200 - except Exception as e: - return jsonify({ - 'status': 'unhealthy', - 'error': str(e), - 'timestamp': datetime.now(TAIPEI_TZ).isoformat() - }), 503 + 'success': False, + 'error': '缺少确认标记,请明确传递 confirm=true', + 'dry_run': dry_run + }), 400 - -@system_bp.route('/metrics') -def prometheus_metrics(): - """ - Prometheus metrics endpoint - 暴露系統指標供 Prometheus 抓取 - 包含:資料庫大小、表記錄數、連線狀態等 - """ - # 從 cache_service 導入慢查詢統計 - from services.cache_service import get_slow_query_stats + # 靜態指令,不含任何使用者輸入 + cmd_parts = [ + 'docker', 'system', 'prune', + '-f', + '--filter', 'until=24h' + ] try: - db = DatabaseManager() - metrics = [] - - # 1. 資料庫檔案大小 - db_path = os.path.join(BASE_DIR, 'data', 'momo_database.db') - if os.path.exists(db_path): - db_size = os.path.getsize(db_path) - metrics.append(f'momo_database_size_bytes{{db="main"}} {db_size}') - - # 2. WAL 檔案大小 - wal_path = db_path + '-wal' - if os.path.exists(wal_path): - wal_size = os.path.getsize(wal_path) - metrics.append(f'momo_database_wal_size_bytes{{db="main"}} {wal_size}') - else: - metrics.append(f'momo_database_wal_size_bytes{{db="main"}} 0') - - # 3. 資料表記錄數 - with db.get_session() as session: - # Products 表 - product_count = session.execute(text("SELECT COUNT(*) FROM products")).scalar() or 0 - metrics.append(f'momo_table_rows{{table="products"}} {product_count}') - - # PriceRecords 表 - price_count = session.execute(text("SELECT COUNT(*) FROM price_records")).scalar() or 0 - metrics.append(f'momo_table_rows{{table="price_records"}} {price_count}') - - # MonthlySummaryAnalysis 表 - try: - monthly_count = session.execute(text("SELECT COUNT(*) FROM monthly_summary_analysis")).scalar() or 0 - metrics.append(f'momo_table_rows{{table="monthly_summary_analysis"}} {monthly_count}') - except: - metrics.append(f'momo_table_rows{{table="monthly_summary_analysis"}} 0') - - # PromoProducts 表 (EDM) - try: - promo_count = session.execute(text("SELECT COUNT(*) FROM promo_products")).scalar() or 0 - metrics.append(f'momo_table_rows{{table="promo_products"}} {promo_count}') - except: - metrics.append(f'momo_table_rows{{table="promo_products"}} 0') - - # 4. 資料庫連線狀態 - metrics.append('momo_database_up 1') - - # 5. 今日新增商品數 (保持台北時區) - today_start = datetime.now(TAIPEI_TZ).replace(hour=0, minute=0, second=0, microsecond=0) - today_products = session.execute( - text("SELECT COUNT(*) FROM products WHERE created_at >= :today"), - {'today': today_start} - ).scalar() or 0 - metrics.append(f'momo_products_today_total {today_products}') - - # 6. 今日價格變動記錄數 - today_price_records = session.execute( - text("SELECT COUNT(*) FROM price_records WHERE timestamp >= :today"), - {'today': today_start} - ).scalar() or 0 - metrics.append(f'momo_price_records_today_total {today_price_records}') - - # 7. 磁碟使用率 - total, used, free = shutil.disk_usage(BASE_DIR) - metrics.append(f'momo_disk_total_bytes {total}') - metrics.append(f'momo_disk_used_bytes {used}') - metrics.append(f'momo_disk_free_bytes {free}') - - # 8. 應用程式資訊 - metrics.append(f'momo_app_info{{version="{SYSTEM_VERSION}"}} 1') - - # 9. 慢查詢統計 - slow_query_stats = get_slow_query_stats() - metrics.append(f'momo_query_total {slow_query_stats["total_queries"]}') - metrics.append(f'momo_query_slow_total {slow_query_stats["slow_queries"]}') - metrics.append(f'momo_query_very_slow_total {slow_query_stats["very_slow_queries"]}') - metrics.append(f'momo_query_time_total_ms {slow_query_stats["total_query_time_ms"]}') - - # 10. 計算平均查詢時間 - if slow_query_stats["total_queries"] > 0: - avg_query_time = slow_query_stats["total_query_time_ms"] / slow_query_stats["total_queries"] - metrics.append(f'momo_query_avg_time_ms {avg_query_time:.2f}') - else: - metrics.append('momo_query_avg_time_ms 0') - - # 11. 慢查詢率 (百分比) - if slow_query_stats["total_queries"] > 0: - slow_rate = (slow_query_stats["slow_queries"] / slow_query_stats["total_queries"]) * 100 - metrics.append(f'momo_query_slow_rate_percent {slow_rate:.2f}') - else: - metrics.append('momo_query_slow_rate_percent 0') - - # 12. SQLite 連線池狀態 (使用 PRAGMA 查詢) - with db.get_session() as session: - try: - # 查詢 SQLite 頁面統計 - page_count = session.execute(text("PRAGMA page_count")).scalar() or 0 - page_size = session.execute(text("PRAGMA page_size")).scalar() or 4096 - freelist_count = session.execute(text("PRAGMA freelist_count")).scalar() or 0 - - metrics.append(f'momo_sqlite_page_count {page_count}') - metrics.append(f'momo_sqlite_page_size {page_size}') - metrics.append(f'momo_sqlite_freelist_count {freelist_count}') - - # 碎片率 (freelist / page_count * 100) - if page_count > 0: - fragmentation = (freelist_count / page_count) * 100 - metrics.append(f'momo_sqlite_fragmentation_percent {fragmentation:.2f}') - else: - metrics.append('momo_sqlite_fragmentation_percent 0') - - # 緩存命中率 (如果啟用) - try: - cache_stats = session.execute(text("PRAGMA cache_stats")).fetchone() - if cache_stats: - metrics.append(f'momo_sqlite_cache_hit {cache_stats[0] if cache_stats[0] else 0}') - metrics.append(f'momo_sqlite_cache_miss {cache_stats[1] if len(cache_stats) > 1 else 0}') - except: - pass - - except Exception: - pass # PRAGMA 查詢失敗不影響其他指標 - - # 返回 Prometheus 格式 - return '\n'.join(metrics) + '\n', 200, {'Content-Type': 'text/plain; charset=utf-8'} - - except Exception as e: - # 資料庫連線失敗 - return f'momo_database_up 0\nmomo_database_error{{error="{str(e)}"}} 1\n', 200, {'Content-Type': 'text/plain; charset=utf-8'} - - -# ========================================== -# 備份 API -# ========================================== - -@system_bp.route('/api/backup', methods=['POST']) -@login_required -def trigger_backup(): - """API: 觸發系統完整備份""" - try: - sys_log.info("[System] [Backup] 開始執行系統完整備份...") - backup_dir = os.path.join(BASE_DIR, 'backups') - if not os.path.exists(backup_dir): - os.makedirs(backup_dir) - - timestamp = datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M') - zip_filename = f"momo_system_backup_{SYSTEM_VERSION}_{timestamp}.zip" - zip_filepath = os.path.join(backup_dir, zip_filename) - - with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf: - for root, dirs, files in os.walk(BASE_DIR): - # 排除不必要的目錄 - dirs[:] = [d for d in dirs if d not in ['backups', '__pycache__', 'venv', '.git', '.idea', '.vscode', 'node_modules']] - - for file in files: - if file == zip_filename: - continue # 跳過正在寫入的檔案 - if file.endswith('.pyc') or file.endswith('.DS_Store'): - continue - - file_path = os.path.join(root, file) - arcname = os.path.relpath(file_path, BASE_DIR) - zipf.write(file_path, arcname) - - sys_log.info(f"[System] [Backup] 系統備份完成 | File: {zip_filename}") - - # 回傳下載連結 - download_url = url_for('system.download_backup', filename=zip_filename) - - return jsonify({ - "status": "success", - "message": f"備份成功!\n檔案已儲存為: {zip_filename}\n即將開始下載...", - "download_url": download_url - }) - except Exception as e: - sys_log.error(f"[System] [Backup] 備份失敗 | Error: {e}") - return jsonify({"status": "error", "message": str(e)}), 500 - - -@system_bp.route('/api/backup/download/') -@login_required -def download_backup(filename): - """ - API: 下載備份檔案(已加入路徑遍歷防護) - """ - try: - backup_dir = os.path.join(BASE_DIR, 'backups') - # 使用 safe_join 驗證路徑,防止路徑遍歷攻擊 - safe_path = safe_join(backup_dir, filename) - - # 確保檔案存在 - if not safe_path.exists(): - sys_log.warning(f"[Security] 備份檔案不存在 | File: {filename}") - return jsonify({'error': '檔案不存在'}), 404 - - # 確保是檔案而非目錄 - if not safe_path.is_file(): - sys_log.warning(f"[Security] 嘗試下載非檔案路徑 | Path: {filename}") - return jsonify({'error': '非法路徑'}), 400 - - return send_from_directory(backup_dir, safe_path.name, as_attachment=True) - - except ValueError as e: - # safe_join 偵測到路徑遍歷嘗試 - sys_log.error(f"[Security] 路徑遍歷攻擊嘗試被阻擋 | Filename: {filename} | Error: {e}") - return jsonify({'error': '非法路徑'}), 400 - except Exception as e: - sys_log.error(f"[System] 下載備份失敗 | Error: {e}") - return jsonify({'error': '下載失敗'}), 500 - - -# ========================================== -# n8n 自動化運維 API (2026-01-25 新增) -# ========================================== - -@system_bp.route('/api/system/cleanup', methods=['POST']) -def system_cleanup(): - """ - API: 系統自動清理 - - 用於 n8n 自動化運維 - - 清理 Docker 資源、日誌、臨時檔案 - """ - import subprocess - - try: - data = request.get_json() or {} - actions = data.get('actions', ['docker_prune', 'log_rotate', 'temp_clean']) - level = data.get('level', 'warning') - - results = [] - - # 1. Docker 清理 - if 'docker_prune' in actions: - try: - # 清理未使用的映像、容器、網路 - result = subprocess.run( - ['docker', 'system', 'prune', '-af', '--filter', 'until=24h'], - capture_output=True, text=True, timeout=120 - ) - if result.returncode == 0: - # 解析釋放的空間 - output = result.stdout - if 'Total reclaimed space:' in output: - space_line = [l for l in output.split('\n') if 'reclaimed' in l] - results.append(f"Docker 清理: {space_line[-1] if space_line else '完成'}") - else: - results.append("Docker 清理: 完成") - else: - results.append(f"Docker 清理: 失敗 - {result.stderr[:100]}") - except subprocess.TimeoutExpired: - results.append("Docker 清理: 超時") - except Exception as e: - results.append(f"Docker 清理: 錯誤 - {str(e)[:50]}") - - # 2. 日誌輪轉 - if 'log_rotate' in actions: - try: - log_dir = os.path.join(BASE_DIR, 'logs') - rotated = 0 - for log_file in os.listdir(log_dir): - log_path = os.path.join(log_dir, log_file) - if os.path.isfile(log_path) and os.path.getsize(log_path) > 50 * 1024 * 1024: # > 50MB - # 重新命名舊日誌 - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - os.rename(log_path, f"{log_path}.{timestamp}") - # 建立新的空日誌檔 - open(log_path, 'w').close() - rotated += 1 - results.append(f"日誌輪轉: 處理 {rotated} 個檔案") - except Exception as e: - results.append(f"日誌輪轉: 錯誤 - {str(e)[:50]}") - - # 3. 臨時檔案清理 - if 'temp_clean' in actions: - try: - temp_dir = os.path.join(BASE_DIR, 'data', 'temp') - if os.path.exists(temp_dir): - removed = 0 - for item in os.listdir(temp_dir): - item_path = os.path.join(temp_dir, item) - if os.path.isfile(item_path): - # 只清理超過 1 天的檔案 - if time.time() - os.path.getmtime(item_path) > 86400: - os.remove(item_path) - removed += 1 - results.append(f"臨時檔案清理: 刪除 {removed} 個檔案") - else: - results.append("臨時檔案清理: 目錄不存在") - except Exception as e: - results.append(f"臨時檔案清理: 錯誤 - {str(e)[:50]}") - - # 4. 如果是嚴重等級,執行更積極的清理 - if level == 'critical': - try: - # 清理所有未使用的 Docker 資源(不限時間) - subprocess.run(['docker', 'system', 'prune', '-af'], timeout=180) - results.append("Critical 清理: Docker 全面清理完成") - except Exception as e: - results.append(f"Critical 清理: 失敗 - {str(e)[:50]}") - - sys_log.info(f"[n8n] 系統清理完成 | Actions: {actions} | Results: {results}") + result = subprocess.run( + cmd_parts, + capture_output=True, + text=True, + timeout=120 + ) return jsonify({ 'success': True, - 'actions': actions, - 'results': results, - 'timestamp': datetime.now(TAIPEI_TZ).isoformat() + 'dry_run': dry_run, + 'command': ' '.join(cmd_parts), + 'stdout': result.stdout, + 'stderr': result.stderr, + 'returncode': result.returncode }) - - except Exception as e: - sys_log.error(f"[n8n] 系統清理失敗 | Error: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - - -@system_bp.route('/api/system/k8s/restart', methods=['POST']) -def k8s_restart(): - """ - API: 重啟 K8s Deployment - - 用於 n8n 自動化運維 - """ - import subprocess - - try: - data = request.get_json() or {} - deployment = data.get('deployment', 'momo-app') - namespace = data.get('namespace', 'momo') - - # 安全檢查:只允許重啟 momo namespace 的資源 - if namespace != 'momo': - return jsonify({'success': False, 'error': '只允許操作 momo namespace'}), 403 - - allowed_deployments = ['momo-app', 'momo-scheduler'] - if deployment not in allowed_deployments: - return jsonify({'success': False, 'error': f'不允許重啟 {deployment}'}), 403 - - result = subprocess.run( - ['kubectl', 'rollout', 'restart', f'deployment/{deployment}', '-n', namespace], - capture_output=True, text=True, timeout=60 - ) - - if result.returncode == 0: - sys_log.info(f"[n8n] K8s 重啟成功 | Deployment: {deployment}") - return jsonify({ - 'success': True, - 'deployment': deployment, - 'namespace': namespace, - 'message': result.stdout - }) - else: - sys_log.error(f"[n8n] K8s 重啟失敗 | Error: {result.stderr}") - return jsonify({'success': False, 'error': result.stderr}), 500 - except subprocess.TimeoutExpired: - return jsonify({'success': False, 'error': '操作超時'}), 500 - except Exception as e: - sys_log.error(f"[n8n] K8s 重啟錯誤 | Error: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - - -@system_bp.route('/api/system/crawler/status') -def crawler_status(): - """ - API: 爬蟲執行狀態 - - 用於 n8n 監控爬蟲健康 - """ - try: - stats = load_scheduler_stats() - - # 檢查最後執行時間 - last_runs = {} - now = datetime.now(TAIPEI_TZ) - - for task_name, task_stats in stats.items(): - if isinstance(task_stats, dict) and 'last_run' in task_stats: - try: - last_run = datetime.fromisoformat(task_stats['last_run'].replace('Z', '+00:00')) - hours_ago = (now - last_run).total_seconds() / 3600 - last_runs[task_name] = { - 'last_run': task_stats['last_run'], - 'hours_ago': round(hours_ago, 1), - 'status': task_stats.get('status', 'unknown'), - 'is_healthy': hours_ago < 24 # 超過 24 小時視為異常 - } - except: - last_runs[task_name] = {'status': 'parse_error'} - - all_healthy = all(t.get('is_healthy', False) for t in last_runs.values() if isinstance(t, dict)) - return jsonify({ - 'healthy': all_healthy, - 'tasks': last_runs, - 'timestamp': now.isoformat() - }) - + 'success': False, + 'error': '清理命令超时(120秒)', + 'dry_run': dry_run + }), 504 except Exception as e: - sys_log.error(f"[n8n] 爬蟲狀態查詢失敗 | Error: {e}") - return jsonify({'healthy': False, 'error': str(e)}), 500 + return jsonify({ + 'success': False, + 'error': str(e), + 'dry_run': dry_run + }), 500 -@system_bp.route('/api/system/backup/status') -def backup_status(): - """ - API: 備份狀態檢查 - - 用於 n8n 監控備份健康 - """ +@system_bp.route('/cleanup/logs', methods=['POST']) +def cleanup_logs(): + """日志目录清理(需 X-Internal-Key;數值輸入已嚴格驗證)""" + ok, err = _require_internal_key() + if not ok: + return err + + data = request.get_json() or {} + + # 驗證並限制使用者輸入數值 + max_size_mb = _validate_int( + data.get('max_size_mb', 500), + min_val=1, max_val=_MAX_SIZE_MB_LIMIT, default=500 + ) + older_than_hours = _validate_int( + data.get('older_than_hours', 168), + min_val=_MIN_HOURS, max_val=_MAX_HOURS, default=168 + ) + dry_run = bool(data.get('dry_run', False)) + confirm = bool(data.get('confirm', False)) + + if not confirm: + return jsonify({ + 'success': False, + 'error': '缺少确认标记,请明确传递 confirm=true', + 'dry_run': dry_run + }), 400 + + log_dir = '/var/log' + if not os.path.isdir(log_dir): + return jsonify({ + 'success': False, + 'error': f'日志目录不存在: {log_dir}' + }), 404 + try: - backup_dir = os.path.join(BASE_DIR, 'backups') + # find 命令:全部使用 list 形式,數值已驗證為安全整數 + cmd = [ + 'find', log_dir, + '-type', 'f', + '-mmin', f'+{older_than_hours * 60}', # 加 + 表示「超過」 + '-size', f'+{max_size_mb}M', + '-print0' + ] - if not os.path.exists(backup_dir): + if dry_run: + check_cmd = cmd + ['-print'] + res = subprocess.run( + check_cmd, + capture_output=True, + text=True, + timeout=60 + ) + matched = [p for p in res.stdout.strip().split('\0') if p] return jsonify({ - 'healthy': False, - 'error': '備份目錄不存在', - 'last_backup': None + 'success': True, + 'dry_run': True, + 'matched_count': len(matched), + 'matched_paths': matched, + 'message': f'找到 {len(matched)} 个符合条件的日志文件(dry-run,未删除)' }) - # 找到最新的備份檔案 - backups = [] - for f in os.listdir(backup_dir): - if f.endswith('.zip') or f.endswith('.sql') or f.endswith('.db'): - fpath = os.path.join(backup_dir, f) - backups.append({ - 'name': f, - 'size': os.path.getsize(fpath), - 'mtime': os.path.getmtime(fpath) - }) - - if not backups: - return jsonify({ - 'healthy': False, - 'error': '沒有找到備份檔案', - 'last_backup': None - }) - - # 找最新的 - latest = max(backups, key=lambda x: x['mtime']) - hours_since_backup = (time.time() - latest['mtime']) / 3600 - - return jsonify({ - 'healthy': hours_since_backup < 48, # 48 小時內有備份視為健康 - 'last_backup': { - 'name': latest['name'], - 'size_mb': round(latest['size'] / 1024 / 1024, 2), - 'hours_ago': round(hours_since_backup, 1) - }, - 'total_backups': len(backups) - }) - - except Exception as e: - sys_log.error(f"[n8n] 備份狀態查詢失敗 | Error: {e}") - return jsonify({'healthy': False, 'error': str(e)}), 500 - - -@system_bp.route('/api/system/ssl/check') -def ssl_check(): - """ - API: SSL 證書檢查 - - 用於 n8n 監控證書到期 - """ - import ssl - import socket - - try: - domains = ['mo.wooo.work', 'monitor.wooo.work', 'ollama.wooo.work'] - results = {} - - for domain in domains: - try: - context = ssl.create_default_context() - with socket.create_connection((domain, 443), timeout=10) as sock: - with context.wrap_socket(sock, server_hostname=domain) as ssock: - cert = ssock.getpeercert() - - # 解析到期日 - not_after = cert.get('notAfter', '') - if not_after: - # SSL 日期格式: 'Mar 25 12:00:00 2026 GMT' - expire_date = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z') - days_left = (expire_date - datetime.utcnow()).days - - results[domain] = { - 'valid': True, - 'expires': expire_date.strftime('%Y-%m-%d'), - 'days_left': days_left, - 'needs_renewal': days_left < 14 - } - else: - results[domain] = {'valid': False, 'error': '無法取得到期日'} - except Exception as e: - results[domain] = {'valid': False, 'error': str(e)[:100]} - - # 判斷整體健康狀態 - all_healthy = all( - r.get('valid', False) and not r.get('needs_renewal', True) - for r in results.values() + # 實際清理:使用 -delete(比 -exec rm 更安全) + delete_cmd = cmd + ['-delete'] + result = subprocess.run( + delete_cmd, + capture_output=True, + text=True, + timeout=120 ) return jsonify({ - 'healthy': all_healthy, - 'certificates': results, - 'timestamp': datetime.now(TAIPEI_TZ).isoformat() + 'success': True, + 'dry_run': False, + 'stdout': result.stdout, + 'stderr': result.stderr, + 'returncode': result.returncode }) - - except Exception as e: - sys_log.error(f"[n8n] SSL 檢查失敗 | Error: {e}") - return jsonify({'healthy': False, 'error': str(e)}), 500 - - -@system_bp.route('/api/system/registry/health') -def registry_health(): - """ - API: Docker Registry 健康檢查 - - 用於 n8n 監控 Registry 狀態 - """ - import requests - - try: - # 檢查 Registry API (端口已改為 5002) - registry_url = 'http://127.0.0.1:5002/v2/' - response = requests.get(registry_url, timeout=10) - - if response.status_code == 200: - return jsonify({ - 'healthy': True, - 'status': 'healthy' - }) - else: - return jsonify({ - 'healthy': False, - 'status': 'unreachable', - 'http_code': response.status_code - }) - - except requests.Timeout: - return jsonify({'healthy': False, 'status': 'timeout'}) - except Exception as e: - sys_log.error(f"[n8n] Registry 健康檢查失敗 | Error: {e}") - return jsonify({'healthy': False, 'error': str(e)}), 500 - - -# 保留舊的 Harbor 路由以兼容 (重導向) -@system_bp.route('/api/system/harbor/health') -def harbor_health_redirect(): - """舊 Harbor API - 重導向到 Registry""" - return registry_health() - - -# ========================================== -# PostgreSQL 慢查詢監控 API (2026-01-26 新增) -# ========================================== - -@system_bp.route('/api/system/db/slow_queries') -def get_slow_queries(): - """ - API: 取得 PostgreSQL 慢查詢列表 - - 用於 n8n 慢查詢監控 - - 需要啟用 pg_stat_statements 擴展 - """ - from config import DATABASE_TYPE - - try: - # 只支援 PostgreSQL - if DATABASE_TYPE != 'postgresql': - return jsonify({ - 'slow_queries': [], - 'message': '此功能僅支援 PostgreSQL', - 'database_type': DATABASE_TYPE - }) - - db = DatabaseManager() - slow_queries = [] - - with db.get_session() as session: - # 檢查 pg_stat_statements 是否可用 - try: - check_ext = session.execute( - text("SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'") - ).fetchone() - - if check_ext: - # 查詢慢查詢 (平均執行時間 > 1 秒) - result = session.execute(text(""" - SELECT - query, - calls, - total_exec_time as total_time, - mean_exec_time as mean_time, - rows, - shared_blks_hit, - shared_blks_read - FROM pg_stat_statements - WHERE mean_exec_time > 1000 - ORDER BY mean_exec_time DESC - LIMIT 20 - """)) - - for row in result: - slow_queries.append({ - 'query': row[0][:500] if row[0] else '', - 'calls': row[1], - 'total_time': round(row[2], 2) if row[2] else 0, - 'mean_time': round(row[3], 2) if row[3] else 0, - 'rows': row[4], - 'cache_hit_ratio': round( - row[5] / (row[5] + row[6]) * 100, 1 - ) if (row[5] + row[6]) > 0 else 100 - }) - else: - # pg_stat_statements 未啟用,使用 pg_stat_activity 查詢當前慢查詢 - result = session.execute(text(""" - SELECT - query, - state, - EXTRACT(EPOCH FROM (now() - query_start)) * 1000 as duration, - wait_event_type, - wait_event - FROM pg_stat_activity - WHERE state != 'idle' - AND query NOT LIKE '%pg_stat%' - AND EXTRACT(EPOCH FROM (now() - query_start)) > 5 - ORDER BY query_start - LIMIT 10 - """)) - - for row in result: - slow_queries.append({ - 'query': row[0][:500] if row[0] else '', - 'state': row[1], - 'duration': round(row[2], 2) if row[2] else 0, - 'wait_event_type': row[3], - 'wait_event': row[4] - }) - - except Exception as e: - # 擴展不存在或查詢失敗 - sys_log.warning(f"[DB] 慢查詢查詢失敗,使用備用方案 | Error: {e}") - - # 備用方案:查詢長時間執行的查詢 - result = session.execute(text(""" - SELECT - query, - state, - EXTRACT(EPOCH FROM (now() - query_start)) * 1000 as duration - FROM pg_stat_activity - WHERE state != 'idle' - AND query NOT LIKE '%pg_stat%' - ORDER BY query_start - LIMIT 10 - """)) - - for row in result: - if row[2] and row[2] > 5000: # > 5 秒 - slow_queries.append({ - 'query': row[0][:500] if row[0] else '', - 'state': row[1], - 'duration': round(row[2], 2) - }) - + except subprocess.TimeoutExpired: return jsonify({ - 'slow_queries': slow_queries, - 'count': len(slow_queries), - 'timestamp': datetime.now(TAIPEI_TZ).isoformat() - }) - + 'success': False, + 'error': '清理命令超时(120秒)', + 'dry_run': dry_run + }), 504 except Exception as e: - sys_log.error(f"[DB] 慢查詢 API 錯誤 | Error: {e}") - return jsonify({'slow_queries': [], 'error': str(e)}), 500 + return jsonify({ + 'success': False, + 'error': str(e), + 'dry_run': dry_run + }), 500 -@system_bp.route('/api/system/db/optimize', methods=['POST']) -def optimize_database(): - """ - API: 執行資料庫優化 (VACUUM ANALYZE) - - 用於 n8n 自動修復慢查詢 - - 清理死元組、更新統計資訊 - """ - from config import DATABASE_TYPE +@system_bp.route('/cleanup/temp', methods=['POST']) +def cleanup_temp(): + """临时文件清理(需 X-Internal-Key;數值輸入已嚴格驗證)""" + ok, err = _require_internal_key() + if not ok: + return err - try: - data = request.get_json() or {} - action = data.get('action', 'vacuum_analyze') - auto_triggered = data.get('auto_triggered', False) + data = request.get_json() or {} + older_than_hours = _validate_int( + data.get('older_than_hours', 48), + min_val=_MIN_HOURS, max_val=_MAX_HOURS, default=48 + ) + dry_run = bool(data.get('dry_run', False)) + confirm = bool(data.get('confirm', False)) - if DATABASE_TYPE == 'postgresql': - db = DatabaseManager() + if not confirm: + return jsonify({ + 'success': False, + 'error': '缺少确认标记,请明确传递 confirm=true', + 'dry_run': dry_run + }), 400 - with db.get_session() as session: - # PostgreSQL: 使用 autocommit 執行 VACUUM - # 注意:VACUUM 不能在事務中執行 - connection = session.connection() - connection.execution_options(isolation_level="AUTOCOMMIT") + temp_dirs = ['/tmp', '/var/tmp'] + results = [] - results = [] + for base_dir in temp_dirs: + if not os.path.isdir(base_dir): + results.append({'dir': base_dir, 'skipped': True, 'reason': '目录不存在'}) + continue - if action in ['vacuum', 'vacuum_analyze']: - try: - # 執行 VACUUM - session.execute(text("VACUUM")) - results.append("VACUUM 完成") - sys_log.info(f"[DB] VACUUM 執行成功 | auto={auto_triggered}") - except Exception as e: - results.append(f"VACUUM 失敗: {str(e)[:100]}") - sys_log.error(f"[DB] VACUUM 失敗 | Error: {e}") - - if action in ['analyze', 'vacuum_analyze']: - try: - # 執行 ANALYZE - session.execute(text("ANALYZE")) - results.append("ANALYZE 完成") - sys_log.info(f"[DB] ANALYZE 執行成功 | auto={auto_triggered}") - except Exception as e: - results.append(f"ANALYZE 失敗: {str(e)[:100]}") - sys_log.error(f"[DB] ANALYZE 失敗 | Error: {e}") - - # 重置 pg_stat_statements 統計(如果存在) - if action == 'reset_stats': - try: - session.execute(text("SELECT pg_stat_statements_reset()")) - results.append("統計已重置") - except Exception: - pass # 擴展可能不存在 - - return jsonify({ - 'success': True, - 'action': action, - 'results': results, - 'auto_triggered': auto_triggered, - 'timestamp': datetime.now(TAIPEI_TZ).isoformat() - }) - - elif DATABASE_TYPE == 'sqlite': - # SQLite: 執行 VACUUM - db = DatabaseManager() - with db.get_session() as session: - session.execute(text("VACUUM")) - session.execute(text("ANALYZE")) - - sys_log.info(f"[DB] SQLite 優化完成 | auto={auto_triggered}") - return jsonify({ - 'success': True, - 'action': action, - 'results': ['VACUUM 完成', 'ANALYZE 完成'], - 'timestamp': datetime.now(TAIPEI_TZ).isoformat() - }) - - else: - return jsonify({ - 'success': False, - 'error': f'不支援的資料庫類型: {DATABASE_TYPE}' - }), 400 - - except Exception as e: - sys_log.error(f"[DB] 資料庫優化失敗 | Error: {e}") - return jsonify({'success': False, 'error': str(e)}), 500 - - -@system_bp.route('/api/system/db/stats') -def database_stats(): - """ - API: 資料庫統計資訊 - - 連線數、快取命中率、資料表大小等 - """ - from config import DATABASE_TYPE - - try: - db = DatabaseManager() - stats = {'database_type': DATABASE_TYPE} - - with db.get_session() as session: - if DATABASE_TYPE == 'postgresql': - # 連線數 - conn_result = session.execute(text(""" - SELECT count(*) as total, - count(*) FILTER (WHERE state = 'active') as active, - count(*) FILTER (WHERE state = 'idle') as idle - FROM pg_stat_activity - """)).fetchone() - stats['connections'] = { - 'total': conn_result[0], - 'active': conn_result[1], - 'idle': conn_result[2] - } - - # 快取命中率 - cache_result = session.execute(text(""" - SELECT - sum(heap_blks_read) as heap_read, - sum(heap_blks_hit) as heap_hit, - sum(idx_blks_read) as idx_read, - sum(idx_blks_hit) as idx_hit - FROM pg_statio_user_tables - """)).fetchone() - - heap_total = (cache_result[0] or 0) + (cache_result[1] or 0) - idx_total = (cache_result[2] or 0) + (cache_result[3] or 0) - - stats['cache_hit_ratio'] = { - 'heap': round((cache_result[1] / heap_total * 100), 2) if heap_total > 0 else 100, - 'index': round((cache_result[3] / idx_total * 100), 2) if idx_total > 0 else 100 - } - - # 資料庫大小 - size_result = session.execute(text(""" - SELECT pg_database_size(current_database()) - """)).scalar() - stats['database_size_mb'] = round(size_result / 1024 / 1024, 2) - - # 死元組數量 - dead_result = session.execute(text(""" - SELECT sum(n_dead_tup) FROM pg_stat_user_tables - """)).scalar() - stats['dead_tuples'] = dead_result or 0 + try: + cmd = [ + 'find', base_dir, + '-type', 'f', + '-mmin', f'+{older_than_hours * 60}', + '-print0' + ] + if dry_run: + check_cmd = cmd + ['-print'] + res = subprocess.run(check_cmd, capture_output=True, text=True, timeout=60) + matched = [p for p in res.stdout.strip().split('\0') if p] + results.append({ + 'dir': base_dir, + 'dry_run': True, + 'matched_count': len(matched), + 'matched_paths': matched + }) else: - # SQLite 統計 - page_count = session.execute(text("PRAGMA page_count")).scalar() - page_size = session.execute(text("PRAGMA page_size")).scalar() - stats['database_size_mb'] = round((page_count * page_size) / 1024 / 1024, 2) - stats['page_count'] = page_count - stats['page_size'] = page_size + delete_cmd = cmd + ['-delete'] + res = subprocess.run( + delete_cmd, + capture_output=True, + text=True, + timeout=120 + ) + results.append({ + 'dir': base_dir, + 'dry_run': False, + 'stdout': res.stdout, + 'stderr': res.stderr, + 'returncode': res.returncode + }) + except subprocess.TimeoutExpired: + results.append({ + 'dir': base_dir, + 'error': '超时(120秒)', + 'dry_run': dry_run + }) + except Exception as e: + results.append({ + 'dir': base_dir, + 'error': str(e), + 'dry_run': dry_run + }) - stats['timestamp'] = datetime.now(TAIPEI_TZ).isoformat() - return jsonify(stats) - - except Exception as e: - sys_log.error(f"[DB] 統計資訊查詢失敗 | Error: {e}") - return jsonify({'error': str(e)}), 500 + return jsonify({ + 'success': True, + 'results': results + }) -# ========================================== -# 監控總覽 API (供 Monitor 頁面使用) -# ========================================== +@system_bp.route('/health', methods=['GET']) +def health_check(): + """系统健康检查端点(需 X-Internal-Key)""" + ok, err = _require_internal_key() + if not ok: + return err -@system_bp.route('/api/system/monitor/overview') -def monitor_overview(): - """ - API: 監控總覽 - 整合所有監控狀態 - - 供 Monitor 頁面即時更新使用 - """ - import requests - - overview = { - 'timestamp': datetime.now(TAIPEI_TZ).isoformat(), - 'services': {}, - 'alerts': [], - 'n8n_workflows': [] + checks = { + 'disk': _check_disk_space, + 'docker': _check_docker, + 'temp_dirs': _check_temp_dirs } - # 1. 服務健康狀態 - # 2026-02-13 修正:使用正確的 K8s 和 Docker 端點 - # 注意:API 運行在 K8s Pod 內,127.0.0.1 指向 Pod 自身,需要使用主機 IP - services = [ - ('registry', 'https://registry.wooo.work/v2/'), # Docker Registry (HTTPS 域名,返回 401 表示正常) - ('grafana', 'http://192.168.0.110:30030/api/health'), # K8s Grafana NodePort - ('prometheus', 'http://10.43.25.78:9090/-/healthy'), # K8s Prometheus ClusterIP - ('n8n', 'http://192.168.0.110:5678/healthz'), # n8n Docker 容器 (主機端口) - ('momo_app', 'http://10.43.238.49/health'), # K8s momo-app ClusterIP - ('superset', 'https://monitor.wooo.work/superset/health'), # Superset (通過 Nginx 代理) - ] + results = {} + for name, checker in checks.items(): + results[name] = checker() - for name, url in services: - try: - # 內部服務使用自簽憑證,verify=False 是預期行為 - resp = requests.get(url, timeout=5, verify=False) # nosec B501 - # Registry 返回 401 (需要認證) 也視為健康 - healthy_codes = [200, 204, 401] if name == 'registry' else [200, 204] - overview['services'][name] = { - 'healthy': resp.status_code in healthy_codes, - 'status_code': resp.status_code - } - except requests.Timeout: - overview['services'][name] = {'healthy': False, 'error': 'timeout'} - except Exception as e: - overview['services'][name] = {'healthy': False, 'error': str(e)[:50]} + overall = all(r.get('status') == 'ok' for r in results.values()) - # 2. Prometheus 告警 (使用 K8s ClusterIP) + # 使用 Python datetime 而非 subprocess date(避免不必要的 subprocess 呼叫) + from datetime import datetime, timezone + timestamp = datetime.now(timezone.utc).isoformat() + + return jsonify({ + 'status': 'ok' if overall else 'degraded', + 'checks': results, + 'timestamp': timestamp + }) + + +# ============================================================================= +# 内部辅助函数 +# ============================================================================= + +def _check_disk_space(): try: - alerts_resp = requests.get( - 'http://10.43.25.78:9090/api/v1/alerts', # K8s Prometheus ClusterIP - timeout=5 + result = subprocess.run( + ['df', '-h', '/'], + capture_output=True, + text=True, + timeout=10 ) - if alerts_resp.status_code == 200: - alerts_data = alerts_resp.json() - if alerts_data.get('status') == 'success': - for alert in alerts_data.get('data', {}).get('alerts', []): - if alert.get('state') == 'firing': - overview['alerts'].append({ - 'name': alert.get('labels', {}).get('alertname', 'Unknown'), - 'severity': alert.get('labels', {}).get('severity', 'warning'), - 'message': alert.get('annotations', {}).get('summary', ''), - 'instance': alert.get('labels', {}).get('instance', '') - }) + lines = result.stdout.strip().split('\n') + if len(lines) < 2: + return {'status': 'error', 'message': '无法解析磁盘信息'} + usage = lines[1].split() + pct = int(usage[4].replace('%', '')) + status = 'ok' if pct < 80 else 'warning' if pct < 90 else 'critical' + return { + 'status': status, + 'usage_percent': pct, + 'output': lines[1] + } except Exception as e: - overview['alerts_error'] = str(e)[:100] + return {'status': 'error', 'message': str(e)} - # 3. n8n 工作流程狀態 (使用主機端口) + +def _check_docker(): try: - n8n_headers = {'X-N8N-API-KEY': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI3YTAyMjk3NC02NmYwLTRhNDQtOTU2MS03MmNlMGFmMTBkNWUiLCJpc3MiOiJuOG4iLCJhdWQiOiJwdWJsaWMtYXBpIiwiaWF0IjoxNzY5Mzk5NjQyfQ.WBnHwbUf1-caKr53EB6Ew3--8QGOL5jMew3BtRRnUvY'} - workflows_resp = requests.get( - 'http://192.168.0.110:5678/api/v1/workflows', # n8n Docker 容器 (主機端口) - headers=n8n_headers, - timeout=5 + result = subprocess.run( + ['docker', 'info'], + capture_output=True, + text=True, + timeout=10 ) - if workflows_resp.status_code == 200: - workflows_data = workflows_resp.json() - for wf in workflows_data.get('data', [])[:10]: # 只取前 10 個 - overview['n8n_workflows'].append({ - 'id': wf.get('id'), - 'name': wf.get('name'), - 'active': wf.get('active', False) - }) + ok = result.returncode == 0 + return { + 'status': 'ok' if ok else 'critical', + 'docker_available': ok + } except Exception as e: - overview['n8n_error'] = str(e)[:100] + return {'status': 'critical', 'error': str(e)} - # 4. 資料庫健康 - try: - db = DatabaseManager() - with db.get_session() as session: - session.execute(text("SELECT 1")) - overview['services']['database'] = {'healthy': True} - except Exception as e: - overview['services']['database'] = {'healthy': False, 'error': str(e)[:50]} - return jsonify(overview) +def _check_temp_dirs(): + dirs = ['/tmp', '/var/tmp'] + results = [] + for d in dirs: + exists = os.path.isdir(d) and os.access(d, os.R_OK | os.W_OK) + results.append({ + 'path': d, + 'exists': exists, + 'writable': os.access(d, os.W_OK) if exists else False + }) + all_ok = all(r['exists'] and r['writable'] for r in results) + return { + 'status': 'ok' if all_ok else 'critical', + 'details': results + }