Files
ewoooc/routes/system_routes.py
ogt 96e19b6b72
All checks were successful
CD Pipeline / deploy (push) Successful in 1m18s
security: harden system_routes.py — auth + input validation
Issues fixed:

1. [CRITICAL] No authentication on destructive routes (CWE-306)
   POST /api/system/cleanup/docker was unauthenticated (system_bp is
   CSRF-exempt, before_request only refreshes session, no login check).
   Any unauthenticated HTTP client could trigger docker system prune.
   Fix: _require_internal_key() checks X-Internal-Key header against
   INTERNAL_API_KEY env var on all 4 routes; fail-secure if key unset.

2. [MEDIUM] Unvalidated numeric inputs in find commands (CWE-20)
   max_size_mb / older_than_hours came from POST body and were
   interpolated into find -size / -mmin args. Negative/huge values
   could cause unexpected behavior.
   Fix: _validate_int() accepts only [1..10000] / [1..8760]; any other value falls back to the default.

3. [LOW] find -mmin arg missing leading '+' (logic bug)
   '-mmin 168' matches FILES EXACTLY 168 min old, not older-than.
   Fix: '-mmin', f'+{older_than_hours * 60}' (+ = older than)

4. [LOW] subprocess(['date', ...]) in health_check replaced
   with Python datetime.now(UTC).isoformat() — no subprocess needed.

INTERNAL_API_KEY added to .env.example with generation instructions.
Generate with: openssl rand -hex 32

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-20 05:47:04 +08:00

380 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import hmac
import os
import subprocess

from flask import Blueprint, jsonify, request
system_bp = Blueprint('system', __name__, url_prefix='/api/system')
# =============================================================================
# Security constants
# =============================================================================
# Internal API key, read once at import time from the INTERNAL_API_KEY
# environment variable. Falls back to the empty string when the variable is
# unset.
_INTERNAL_API_KEY = os.environ.get('INTERNAL_API_KEY', '')
# Bounds for numeric request inputs (guards against huge or negative values
# triggering dangerous behaviour).
_MAX_SIZE_MB_LIMIT = 10_000 # at most 10 GB
_MIN_HOURS = 1 # at least 1 hour
_MAX_HOURS = 8_760 # at most 365 days
def _require_internal_key():
    """Check the ``X-Internal-Key`` request header against the configured key.

    Returns:
        A ``(ok, error_response)`` tuple. When the header matches the
        configured key the result is ``(True, None)``. Otherwise ``ok`` is
        ``False`` and ``error_response`` is a ``(json_response, status)``
        pair: status 503 when no key is configured on the server, status 401
        when the header is missing or does not match.
    """
    if not _INTERNAL_API_KEY:
        # Fail secure: with no key configured, every request is rejected.
        return False, (jsonify({
            'success': False,
            'error': '伺服器未設定 INTERNAL_API_KEY拒絕存取'
        }), 503)

    provided = request.headers.get('X-Internal-Key', '')
    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Compare bytes because compare_digest() rejects
    # non-ASCII str arguments.
    authorized = bool(provided) and hmac.compare_digest(
        provided.encode('utf-8'),
        _INTERNAL_API_KEY.encode('utf-8'),
    )
    if not authorized:
        return False, (jsonify({
            'success': False,
            'error': '未授權:缺少或無效的 X-Internal-Key'
        }), 401)
    return True, None
def _validate_int(value, min_val, max_val, default):
    """Return ``value`` as an int when it lies within ``[min_val, max_val]``.

    Args:
        value: Candidate value, typically taken from a JSON request body.
        min_val: Smallest accepted value (inclusive).
        max_val: Largest accepted value (inclusive).
        default: Value returned when ``value`` is rejected.

    Returns:
        ``int(value)`` if the conversion succeeds and the result is in range;
        otherwise ``default``. Booleans are rejected.
    """
    # bool is a subclass of int: without this guard ``True`` would silently
    # be accepted as 1.
    if isinstance(value, bool):
        return default
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float('inf')), which is what a JSON
        # number such as 1e999 decodes to.
        return default
    return number if min_val <= number <= max_val else default
# =============================================================================
# System cleanup and maintenance
# =============================================================================
@system_bp.route('/cleanup/docker', methods=['POST'])
def cleanup_docker():
    """Prune unused Docker resources (requires ``X-Internal-Key``).

    Reads an optional JSON object body with:
        confirm: must be truthy, otherwise the request is rejected with 400.
        dry_run: when truthy, the prune command is reported but not executed.

    Returns:
        A JSON response. On execution it carries the command, its stdout,
        stderr and return code; ``success`` reflects whether the command
        exited with status 0. Status 504 on timeout, 500 on any other
        failure to run the command.
    """
    ok, err = _require_internal_key()
    if not ok:
        return err

    # silent=True: a missing or malformed JSON body is treated as empty
    # instead of aborting the request; a non-object body is ignored too.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    dry_run = bool(data.get('dry_run', False))
    confirm = bool(data.get('confirm', False))

    if not confirm:
        return jsonify({
            'success': False,
            'error': '缺少确认标记,请明确传递 confirm=true',
            'dry_run': dry_run
        }), 400

    # Static command: contains no user-supplied input.
    cmd_parts = [
        'docker', 'system', 'prune',
        '-f',
        '--filter', 'until=24h'
    ]

    if dry_run:
        # Honour dry_run: report what would run without executing it.
        return jsonify({
            'success': True,
            'dry_run': True,
            'command': ' '.join(cmd_parts),
            'message': 'dry-run: 未执行清理命令'
        })

    try:
        result = subprocess.run(
            cmd_parts,
            capture_output=True,
            text=True,
            timeout=120
        )
        return jsonify({
            # A non-zero exit status means the prune did not succeed.
            'success': result.returncode == 0,
            'dry_run': dry_run,
            'command': ' '.join(cmd_parts),
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        })
    except subprocess.TimeoutExpired:
        return jsonify({
            'success': False,
            'error': '清理命令超时120秒',
            'dry_run': dry_run
        }), 504
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e),
            'dry_run': dry_run
        }), 500
@system_bp.route('/cleanup/logs', methods=['POST'])
def cleanup_logs():
    """Delete large, old files under ``/var/log`` (requires ``X-Internal-Key``).

    Reads an optional JSON object body with:
        max_size_mb: only files larger than this many MiB match
            (validated to 1.._MAX_SIZE_MB_LIMIT, default 500).
        older_than_hours: only files modified more than this many hours ago
            match (validated to _MIN_HOURS.._MAX_HOURS, default 168).
        confirm: must be truthy, otherwise the request is rejected with 400.
        dry_run: when truthy, matching files are listed but not deleted.

    Returns:
        A JSON response. Dry-run lists the matched paths; a real run returns
        the ``find`` stdout, stderr and return code. Status 404 when the log
        directory is missing, 504 on timeout, 500 on any other failure.
    """
    ok, err = _require_internal_key()
    if not ok:
        return err

    # silent=True: a missing or malformed JSON body is treated as empty
    # instead of aborting the request; a non-object body is ignored too.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Validate and bound the user-supplied numbers before they reach find.
    max_size_mb = _validate_int(
        data.get('max_size_mb', 500),
        min_val=1, max_val=_MAX_SIZE_MB_LIMIT, default=500
    )
    older_than_hours = _validate_int(
        data.get('older_than_hours', 168),
        min_val=_MIN_HOURS, max_val=_MAX_HOURS, default=168
    )
    dry_run = bool(data.get('dry_run', False))
    confirm = bool(data.get('confirm', False))

    if not confirm:
        return jsonify({
            'success': False,
            'error': '缺少确认标记,请明确传递 confirm=true',
            'dry_run': dry_run
        }), 400

    log_dir = '/var/log'
    if not os.path.isdir(log_dir):
        return jsonify({
            'success': False,
            'error': f'日志目录不存在: {log_dir}'
        }), 404

    # Argument-list form (no shell); the numbers are validated integers.
    find_cmd = [
        'find', log_dir,
        '-type', 'f',
        '-mmin', f'+{older_than_hours * 60}',  # leading + means "more than"
        '-size', f'+{max_size_mb}M',
        '-print0'
    ]

    try:
        if dry_run:
            # -print0 alone already emits each match once, NUL-terminated.
            # Adding -print as well would emit every path twice and corrupt
            # the NUL-split list.
            res = subprocess.run(
                find_cmd,
                capture_output=True,
                text=True,
                timeout=60
            )
            matched = [p for p in res.stdout.split('\0') if p]
            return jsonify({
                'success': True,
                'dry_run': True,
                'matched_count': len(matched),
                'matched_paths': matched,
                'message': f'找到 {len(matched)} 个符合条件的日志文件dry-run未删除'
            })

        # Real cleanup: find's own -delete action, no external rm process.
        result = subprocess.run(
            find_cmd + ['-delete'],
            capture_output=True,
            text=True,
            timeout=120
        )
        return jsonify({
            'success': True,
            'dry_run': False,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        })
    except subprocess.TimeoutExpired as exc:
        # Report the timeout that actually applied (60 s dry-run, 120 s delete).
        return jsonify({
            'success': False,
            'error': f'清理命令超时{exc.timeout}秒',
            'dry_run': dry_run
        }), 504
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e),
            'dry_run': dry_run
        }), 500
@system_bp.route('/cleanup/temp', methods=['POST'])
def cleanup_temp():
    """Delete old files under ``/tmp`` and ``/var/tmp`` (requires ``X-Internal-Key``).

    Reads an optional JSON object body with:
        older_than_hours: only files modified more than this many hours ago
            match (validated to _MIN_HOURS.._MAX_HOURS, default 48).
        confirm: must be truthy, otherwise the request is rejected with 400.
        dry_run: when truthy, matching files are listed but not deleted.

    Returns:
        A JSON response whose ``results`` list has one entry per directory:
        skipped, dry-run matches, the ``find`` output of a real run, or an
        error. Failures in one directory do not stop the other.
    """
    ok, err = _require_internal_key()
    if not ok:
        return err

    # silent=True: a missing or malformed JSON body is treated as empty
    # instead of aborting the request; a non-object body is ignored too.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    older_than_hours = _validate_int(
        data.get('older_than_hours', 48),
        min_val=_MIN_HOURS, max_val=_MAX_HOURS, default=48
    )
    dry_run = bool(data.get('dry_run', False))
    confirm = bool(data.get('confirm', False))

    if not confirm:
        return jsonify({
            'success': False,
            'error': '缺少确认标记,请明确传递 confirm=true',
            'dry_run': dry_run
        }), 400

    temp_dirs = ['/tmp', '/var/tmp']
    results = []
    for base_dir in temp_dirs:
        if not os.path.isdir(base_dir):
            results.append({'dir': base_dir, 'skipped': True, 'reason': '目录不存在'})
            continue
        try:
            find_cmd = [
                'find', base_dir,
                '-type', 'f',
                '-mmin', f'+{older_than_hours * 60}',  # leading + means "more than"
                '-print0'
            ]
            if dry_run:
                # -print0 alone already emits each match once,
                # NUL-terminated. Adding -print as well would emit every
                # path twice and corrupt the NUL-split list.
                res = subprocess.run(
                    find_cmd,
                    capture_output=True,
                    text=True,
                    timeout=60
                )
                matched = [p for p in res.stdout.split('\0') if p]
                results.append({
                    'dir': base_dir,
                    'dry_run': True,
                    'matched_count': len(matched),
                    'matched_paths': matched
                })
            else:
                res = subprocess.run(
                    find_cmd + ['-delete'],
                    capture_output=True,
                    text=True,
                    timeout=120
                )
                results.append({
                    'dir': base_dir,
                    'dry_run': False,
                    'stdout': res.stdout,
                    'stderr': res.stderr,
                    'returncode': res.returncode
                })
        except subprocess.TimeoutExpired as exc:
            # Report the timeout that actually applied (60 s or 120 s).
            results.append({
                'dir': base_dir,
                'error': f'超时{exc.timeout}秒',
                'dry_run': dry_run
            })
        except Exception as e:
            results.append({
                'dir': base_dir,
                'error': str(e),
                'dry_run': dry_run
            })

    return jsonify({
        'success': True,
        'results': results
    })
@system_bp.route('/health', methods=['GET'])
def health_check():
    """System health-check endpoint (requires ``X-Internal-Key``).

    Runs the disk, docker and temp-directory checks and returns their
    results together with an overall status and a UTC ISO-8601 timestamp.
    The overall status is ``'ok'`` only when every check reports ``'ok'``,
    otherwise ``'degraded'``.
    """
    authorized, denial = _require_internal_key()
    if not authorized:
        return denial

    # Timestamp comes from Python's datetime rather than a `date` subprocess.
    from datetime import datetime, timezone

    probes = (
        ('disk', _check_disk_space),
        ('docker', _check_docker),
        ('temp_dirs', _check_temp_dirs),
    )
    report = {label: probe() for label, probe in probes}

    healthy = True
    for outcome in report.values():
        if outcome.get('status') != 'ok':
            healthy = False
            break

    now_utc = datetime.now(timezone.utc).isoformat()
    payload = {
        'status': 'ok' if healthy else 'degraded',
        'checks': report,
        'timestamp': now_utc
    }
    return jsonify(payload)
# =============================================================================
# Internal helper functions
# =============================================================================
def _check_disk_space():
    """Report root-filesystem usage as parsed from ``df -h /``.

    Returns:
        A dict with ``status`` (``'ok'`` below 80 % used, ``'warning'`` below
        90 %, otherwise ``'critical'``), ``usage_percent`` and the raw ``df``
        data row under ``output``. If ``df`` cannot be run or its output
        cannot be parsed, returns ``{'status': 'error', 'message': ...}``.
    """
    try:
        proc = subprocess.run(
            ['df', '-h', '/'],
            capture_output=True,
            text=True,
            timeout=10
        )
        rows = proc.stdout.strip().split('\n')
        if len(rows) < 2:
            return {'status': 'error', 'message': '无法解析磁盘信息'}

        # Row 0 is the header; row 1 is the entry for "/". The fifth column
        # of that row is parsed as the use percentage.
        data_row = rows[1]
        columns = data_row.split()
        percent_used = int(columns[4].replace('%', ''))

        if percent_used < 80:
            level = 'ok'
        elif percent_used < 90:
            level = 'warning'
        else:
            level = 'critical'

        return {
            'status': level,
            'usage_percent': percent_used,
            'output': data_row
        }
    except Exception as exc:
        return {'status': 'error', 'message': str(exc)}
def _check_docker():
    """Report whether ``docker info`` runs successfully.

    Returns:
        ``{'status': 'ok', 'docker_available': True}`` when the command exits
        with status 0, ``{'status': 'critical', 'docker_available': False}``
        on a non-zero exit, and ``{'status': 'critical', 'error': ...}`` when
        the command cannot be run at all (for example missing binary or
        timeout).
    """
    try:
        proc = subprocess.run(
            ['docker', 'info'],
            capture_output=True,
            text=True,
            timeout=10
        )
    except Exception as exc:
        return {'status': 'critical', 'error': str(exc)}

    available = proc.returncode == 0
    if available:
        level = 'ok'
    else:
        level = 'critical'
    return {'status': level, 'docker_available': available}
def _check_temp_dirs():
    """Report whether ``/tmp`` and ``/var/tmp`` exist and are accessible.

    Returns:
        A dict with ``status`` (``'ok'`` when every directory exists and is
        both readable and writable by this process, otherwise
        ``'critical'``) and ``details``, a list with one entry per directory
        holding ``path``, ``exists``, ``readable`` and ``writable``.
    """
    details = []
    for path in ('/tmp', '/var/tmp'):
        # Keep existence separate from permissions so that an existing but
        # inaccessible directory is not reported as missing.
        exists = os.path.isdir(path)
        readable = exists and os.access(path, os.R_OK)
        writable = exists and os.access(path, os.W_OK)
        details.append({
            'path': path,
            'exists': exists,
            'readable': readable,
            'writable': writable
        })

    all_ok = all(
        entry['exists'] and entry['readable'] and entry['writable']
        for entry in details
    )
    return {
        'status': 'ok' if all_ok else 'critical',
        'details': details
    }