All checks were successful
CD Pipeline / deploy (push) Successful in 1m18s
Issues fixed:
1. [CRITICAL] No authentication on destructive routes (CWE-306)
POST /api/system/cleanup/docker was unauthenticated (system_bp is
CSRF-exempt, before_request only refreshes session, no login check).
Any unauthenticated HTTP client could trigger docker system prune.
Fix: _require_internal_key() checks X-Internal-Key header against
INTERNAL_API_KEY env var on all 4 routes; fail-secure if key unset.
2. [MEDIUM] Unvalidated numeric inputs in find commands (CWE-20)
max_size_mb / older_than_hours came from POST body and were
interpolated into find -size / -mmin args. Negative/huge values
could cause unexpected behavior.
Fix: _validate_int() clamps to [1..10000] / [1..8760] with defaults.
3. [LOW] find -mmin arg missing leading '+' (logic bug)
'-mmin 168' matches FILES EXACTLY 168 min old, not older-than.
Fix: '-mmin', f'+{older_than_hours * 60}' (+ = older than)
4. [LOW] subprocess(['date', ...]) in health_check replaced
with Python datetime.now(UTC).isoformat() — no subprocess needed.
INTERNAL_API_KEY added to .env.example with generation instructions.
Generate with: openssl rand -hex 32
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
380 lines
11 KiB
Python
380 lines
11 KiB
Python
from flask import Blueprint, jsonify, request
|
||
import subprocess
|
||
import os
|
||
|
||
system_bp = Blueprint('system', __name__, url_prefix='/api/system')
|
||
|
||
# =============================================================================
|
||
# 安全常數
|
||
# =============================================================================
|
||
|
||
# 內部 API 金鑰(必須透過環境變數設定,不可為空)
|
||
_INTERNAL_API_KEY = os.environ.get('INTERNAL_API_KEY', '')
|
||
|
||
# 輸入數值邊界(防止過大/負數值觸發危險行為)
|
||
_MAX_SIZE_MB_LIMIT = 10_000 # 最大 10GB
|
||
_MIN_HOURS = 1 # 最少 1 小時
|
||
_MAX_HOURS = 8_760 # 最多 365 天
|
||
|
||
|
||
def _require_internal_key():
|
||
"""
|
||
驗證 X-Internal-Key 標頭。
|
||
系統維護路由(cleanup、health)僅限內部服務呼叫。
|
||
回傳 (ok: bool, error_response | None)
|
||
"""
|
||
if not _INTERNAL_API_KEY:
|
||
# 未設定金鑰時,拒絕所有請求(fail-secure)
|
||
return False, (jsonify({
|
||
'success': False,
|
||
'error': '伺服器未設定 INTERNAL_API_KEY,拒絕存取'
|
||
}), 503)
|
||
|
||
provided = request.headers.get('X-Internal-Key', '')
|
||
if not provided or provided != _INTERNAL_API_KEY:
|
||
return False, (jsonify({
|
||
'success': False,
|
||
'error': '未授權:缺少或無效的 X-Internal-Key'
|
||
}), 401)
|
||
|
||
return True, None
|
||
|
||
|
||
def _validate_int(value, min_val, max_val, default):
|
||
"""
|
||
驗證整數輸入在安全範圍內。
|
||
非整數、超出範圍一律回傳 default。
|
||
"""
|
||
try:
|
||
v = int(value)
|
||
if min_val <= v <= max_val:
|
||
return v
|
||
except (TypeError, ValueError):
|
||
pass
|
||
return default
|
||
|
||
|
||
# =============================================================================
|
||
# 系统清理与维护
|
||
# =============================================================================
|
||
|
||
|
||
@system_bp.route('/cleanup/docker', methods=['POST'])
|
||
def cleanup_docker():
|
||
"""Docker 系统清理(安全加固版,需 X-Internal-Key)"""
|
||
ok, err = _require_internal_key()
|
||
if not ok:
|
||
return err
|
||
|
||
data = request.get_json() or {}
|
||
dry_run = bool(data.get('dry_run', False))
|
||
confirm = bool(data.get('confirm', False))
|
||
|
||
if not confirm:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '缺少确认标记,请明确传递 confirm=true',
|
||
'dry_run': dry_run
|
||
}), 400
|
||
|
||
# 靜態指令,不含任何使用者輸入
|
||
cmd_parts = [
|
||
'docker', 'system', 'prune',
|
||
'-f',
|
||
'--filter', 'until=24h'
|
||
]
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
cmd_parts,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=120
|
||
)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'dry_run': dry_run,
|
||
'command': ' '.join(cmd_parts),
|
||
'stdout': result.stdout,
|
||
'stderr': result.stderr,
|
||
'returncode': result.returncode
|
||
})
|
||
except subprocess.TimeoutExpired:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '清理命令超时(120秒)',
|
||
'dry_run': dry_run
|
||
}), 504
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e),
|
||
'dry_run': dry_run
|
||
}), 500
|
||
|
||
|
||
@system_bp.route('/cleanup/logs', methods=['POST'])
|
||
def cleanup_logs():
|
||
"""日志目录清理(需 X-Internal-Key;數值輸入已嚴格驗證)"""
|
||
ok, err = _require_internal_key()
|
||
if not ok:
|
||
return err
|
||
|
||
data = request.get_json() or {}
|
||
|
||
# 驗證並限制使用者輸入數值
|
||
max_size_mb = _validate_int(
|
||
data.get('max_size_mb', 500),
|
||
min_val=1, max_val=_MAX_SIZE_MB_LIMIT, default=500
|
||
)
|
||
older_than_hours = _validate_int(
|
||
data.get('older_than_hours', 168),
|
||
min_val=_MIN_HOURS, max_val=_MAX_HOURS, default=168
|
||
)
|
||
dry_run = bool(data.get('dry_run', False))
|
||
confirm = bool(data.get('confirm', False))
|
||
|
||
if not confirm:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '缺少确认标记,请明确传递 confirm=true',
|
||
'dry_run': dry_run
|
||
}), 400
|
||
|
||
log_dir = '/var/log'
|
||
if not os.path.isdir(log_dir):
|
||
return jsonify({
|
||
'success': False,
|
||
'error': f'日志目录不存在: {log_dir}'
|
||
}), 404
|
||
|
||
try:
|
||
# find 命令:全部使用 list 形式,數值已驗證為安全整數
|
||
cmd = [
|
||
'find', log_dir,
|
||
'-type', 'f',
|
||
'-mmin', f'+{older_than_hours * 60}', # 加 + 表示「超過」
|
||
'-size', f'+{max_size_mb}M',
|
||
'-print0'
|
||
]
|
||
|
||
if dry_run:
|
||
check_cmd = cmd + ['-print']
|
||
res = subprocess.run(
|
||
check_cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=60
|
||
)
|
||
matched = [p for p in res.stdout.strip().split('\0') if p]
|
||
return jsonify({
|
||
'success': True,
|
||
'dry_run': True,
|
||
'matched_count': len(matched),
|
||
'matched_paths': matched,
|
||
'message': f'找到 {len(matched)} 个符合条件的日志文件(dry-run,未删除)'
|
||
})
|
||
|
||
# 實際清理:使用 -delete(比 -exec rm 更安全)
|
||
delete_cmd = cmd + ['-delete']
|
||
result = subprocess.run(
|
||
delete_cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=120
|
||
)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'dry_run': False,
|
||
'stdout': result.stdout,
|
||
'stderr': result.stderr,
|
||
'returncode': result.returncode
|
||
})
|
||
except subprocess.TimeoutExpired:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '清理命令超时(120秒)',
|
||
'dry_run': dry_run
|
||
}), 504
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e),
|
||
'dry_run': dry_run
|
||
}), 500
|
||
|
||
|
||
@system_bp.route('/cleanup/temp', methods=['POST'])
|
||
def cleanup_temp():
|
||
"""临时文件清理(需 X-Internal-Key;數值輸入已嚴格驗證)"""
|
||
ok, err = _require_internal_key()
|
||
if not ok:
|
||
return err
|
||
|
||
data = request.get_json() or {}
|
||
older_than_hours = _validate_int(
|
||
data.get('older_than_hours', 48),
|
||
min_val=_MIN_HOURS, max_val=_MAX_HOURS, default=48
|
||
)
|
||
dry_run = bool(data.get('dry_run', False))
|
||
confirm = bool(data.get('confirm', False))
|
||
|
||
if not confirm:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '缺少确认标记,请明确传递 confirm=true',
|
||
'dry_run': dry_run
|
||
}), 400
|
||
|
||
temp_dirs = ['/tmp', '/var/tmp']
|
||
results = []
|
||
|
||
for base_dir in temp_dirs:
|
||
if not os.path.isdir(base_dir):
|
||
results.append({'dir': base_dir, 'skipped': True, 'reason': '目录不存在'})
|
||
continue
|
||
|
||
try:
|
||
cmd = [
|
||
'find', base_dir,
|
||
'-type', 'f',
|
||
'-mmin', f'+{older_than_hours * 60}',
|
||
'-print0'
|
||
]
|
||
|
||
if dry_run:
|
||
check_cmd = cmd + ['-print']
|
||
res = subprocess.run(check_cmd, capture_output=True, text=True, timeout=60)
|
||
matched = [p for p in res.stdout.strip().split('\0') if p]
|
||
results.append({
|
||
'dir': base_dir,
|
||
'dry_run': True,
|
||
'matched_count': len(matched),
|
||
'matched_paths': matched
|
||
})
|
||
else:
|
||
delete_cmd = cmd + ['-delete']
|
||
res = subprocess.run(
|
||
delete_cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=120
|
||
)
|
||
results.append({
|
||
'dir': base_dir,
|
||
'dry_run': False,
|
||
'stdout': res.stdout,
|
||
'stderr': res.stderr,
|
||
'returncode': res.returncode
|
||
})
|
||
except subprocess.TimeoutExpired:
|
||
results.append({
|
||
'dir': base_dir,
|
||
'error': '超时(120秒)',
|
||
'dry_run': dry_run
|
||
})
|
||
except Exception as e:
|
||
results.append({
|
||
'dir': base_dir,
|
||
'error': str(e),
|
||
'dry_run': dry_run
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'results': results
|
||
})
|
||
|
||
|
||
@system_bp.route('/health', methods=['GET'])
|
||
def health_check():
|
||
"""系统健康检查端点(需 X-Internal-Key)"""
|
||
ok, err = _require_internal_key()
|
||
if not ok:
|
||
return err
|
||
|
||
checks = {
|
||
'disk': _check_disk_space,
|
||
'docker': _check_docker,
|
||
'temp_dirs': _check_temp_dirs
|
||
}
|
||
|
||
results = {}
|
||
for name, checker in checks.items():
|
||
results[name] = checker()
|
||
|
||
overall = all(r.get('status') == 'ok' for r in results.values())
|
||
|
||
# 使用 Python datetime 而非 subprocess date(避免不必要的 subprocess 呼叫)
|
||
from datetime import datetime, timezone
|
||
timestamp = datetime.now(timezone.utc).isoformat()
|
||
|
||
return jsonify({
|
||
'status': 'ok' if overall else 'degraded',
|
||
'checks': results,
|
||
'timestamp': timestamp
|
||
})
|
||
|
||
|
||
# =============================================================================
|
||
# 内部辅助函数
|
||
# =============================================================================
|
||
|
||
def _check_disk_space():
|
||
try:
|
||
result = subprocess.run(
|
||
['df', '-h', '/'],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=10
|
||
)
|
||
lines = result.stdout.strip().split('\n')
|
||
if len(lines) < 2:
|
||
return {'status': 'error', 'message': '无法解析磁盘信息'}
|
||
usage = lines[1].split()
|
||
pct = int(usage[4].replace('%', ''))
|
||
status = 'ok' if pct < 80 else 'warning' if pct < 90 else 'critical'
|
||
return {
|
||
'status': status,
|
||
'usage_percent': pct,
|
||
'output': lines[1]
|
||
}
|
||
except Exception as e:
|
||
return {'status': 'error', 'message': str(e)}
|
||
|
||
|
||
def _check_docker():
|
||
try:
|
||
result = subprocess.run(
|
||
['docker', 'info'],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=10
|
||
)
|
||
ok = result.returncode == 0
|
||
return {
|
||
'status': 'ok' if ok else 'critical',
|
||
'docker_available': ok
|
||
}
|
||
except Exception as e:
|
||
return {'status': 'critical', 'error': str(e)}
|
||
|
||
|
||
def _check_temp_dirs():
|
||
dirs = ['/tmp', '/var/tmp']
|
||
results = []
|
||
for d in dirs:
|
||
exists = os.path.isdir(d) and os.access(d, os.R_OK | os.W_OK)
|
||
results.append({
|
||
'path': d,
|
||
'exists': exists,
|
||
'writable': os.access(d, os.W_OK) if exists else False
|
||
})
|
||
all_ok = all(r['exists'] and r['writable'] for r in results)
|
||
return {
|
||
'status': 'ok' if all_ok else 'critical',
|
||
'details': results
|
||
}
|