Files
ewoooc/routes/cicd_routes.py
ogt 903cf1a27a
All checks were successful
CD Pipeline / deploy (push) Successful in 1m5s
fix: align deploy health checks with live endpoint
2026-06-25 14:45:02 +08:00

1024 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# =============================================================================
# WOOO TECH - Momo Pro System
# 部署監控路由
# =============================================================================
from flask import Blueprint, jsonify, render_template, request
import requests
import subprocess
from datetime import datetime
import json
import logging
import os
import re
from urllib.parse import urlparse
# Blueprint that owns every deployment-monitoring page and API route defined in this module.
cicd_bp = Blueprint('cicd', __name__)
# Module-level logger used for CI/CD diagnostics throughout this file.
cicd_log = logging.getLogger('cicd_routes')
# =============================================================================
# 錯誤分類與修復建議
# =============================================================================
# Catalogue of known failure signatures.
#
# analyze_error() walks this dict in insertion order and returns the FIRST entry
# whose regex matches (case-insensitively), so ORDER MATTERS: specific signatures
# must be listed before generic ones.  In particular 'runtime_crash' contains the
# bare word "Error", which matches almost any failing log (including
# "AssertionError" and "ModuleNotFoundError"), so it has to stay last; and
# 'ssh_failed' must precede 'registry_connection', whose "connection refused"
# alternative would otherwise claim SSH connection failures.
ERROR_PATTERNS = {
    'ssh_failed': {
        'pattern': r'(ssh.*connection.*refused|Permission denied|Host key verification)',
        'message': 'SSH 連線失敗',
        'severity': 'critical',
        'fix_suggestion': '檢查 SSH 金鑰配置或網路連線',
        'auto_fixable': False
    },
    'registry_connection': {
        'pattern': r'(connection refused|registry.*unreachable|pull.*failed)',
        'message': 'Registry 連線失敗',
        'severity': 'critical',
        'fix_suggestion': '檢查 Registry 服務狀態,可嘗試自動修復',
        'auto_fixable': True,
        'fix_action': 'restart_registry'
    },
    'deploy_timeout': {
        'pattern': r'(timed out|timeout|deadline exceeded)',
        'message': '部署操作超時',
        'severity': 'warning',
        'fix_suggestion': '網路可能不穩定,請稍後重試或檢查 Gitea/CD runner 與 188 Docker Compose 狀態',
        'auto_fixable': False
    },
    'test_failed': {
        'pattern': r'(pytest.*failed|test.*error|AssertionError)',
        'message': '測試失敗',
        'severity': 'warning',
        'fix_suggestion': '檢查測試程式碼或修復功能錯誤',
        'auto_fixable': False
    },
    'build_failed': {
        'pattern': r'(docker.*build.*failed|pip.*install.*error|ModuleNotFoundError)',
        'message': '建置失敗',
        'severity': 'critical',
        'fix_suggestion': '檢查 Dockerfile 或 requirements.txt',
        'auto_fixable': False
    },
    # Generic catch-all: keep this entry last (see the ordering note above).
    'runtime_crash': {
        'pattern': r'(CrashLoopBackOff|OOMKilled|Error|ImagePullBackOff)',
        'message': '容器或舊叢集狀態異常',
        'severity': 'critical',
        'fix_suggestion': 'EwoooC 現行 runtime 是 188 Docker Compose請依 DevOps 手冊檢查容器與 /health',
        'auto_fixable': False
    }
}


def analyze_error(text):
    """Classify an error text against ERROR_PATTERNS.

    Args:
        text: Raw error/log text. Falsy values (``None``, ``''``) are accepted.

    Returns:
        ``None`` when *text* is falsy or no pattern matches; otherwise a dict
        with the keys ``type``, ``message``, ``severity``, ``fix_suggestion``,
        ``auto_fixable`` and ``fix_action`` (``None`` when the matching entry
        defines no fix action).  The first matching entry in dict order wins.
    """
    if not text:
        return None
    for error_type, config in ERROR_PATTERNS.items():
        if re.search(config['pattern'], text, re.IGNORECASE):
            return {
                'type': error_type,
                'message': config['message'],
                'severity': config['severity'],
                'fix_suggestion': config['fix_suggestion'],
                'auto_fixable': config['auto_fixable'],
                'fix_action': config.get('fix_action')
            }
    return None
# GitLab 配置
# Legacy GitLab connection settings; every value comes from the environment.
GITLAB_URL = os.environ.get('GITLAB_URL', 'http://192.168.0.110:8929')
GITLAB_TOKEN = os.environ.get('GITLAB_TOKEN', '')
GITLAB_PROJECT_ID = os.environ.get('GITLAB_PROJECT_ID', '1')

# The legacy API is queried only when the switch is explicitly on AND a token is set.
_gitlab_enabled_flag = os.environ.get('GITLAB_ENABLED', 'false').lower()
GITLAB_ENABLED = _gitlab_enabled_flag in {'1', 'true', 'yes', 'on'} and bool(GITLAB_TOKEN)

if not GITLAB_ENABLED:
    cicd_log.info(
        '[CI/CD] GitLab legacy API disabled; set GITLAB_ENABLED=true and GITLAB_TOKEN to query legacy pipelines.'
    )
# 環境配置
def _normalize_base_url(value, fallback='https://mo.wooo.work'):
candidate = (value or '').strip().rstrip('/') or fallback
parsed = urlparse(candidate)
if parsed.scheme not in {'http', 'https'} or not parsed.netloc:
return fallback.rstrip('/')
return candidate
def _health_endpoint_for(base_url):
return f"{base_url.rstrip('/')}/health"
# Public entry point: PUBLIC_URL from the environment after normalization; the
# hard-coded URL is used when the variable is unset or not a usable http(s) URL.
PUBLIC_BASE_URL = _normalize_base_url(os.getenv('PUBLIC_URL'), 'https://mo.wooo.work')
# Base URL of the 'uat' entry: first non-empty of CICD_UAT_BASE_URL,
# MOMO_BASE_URL, then the public base URL (which is also the fallback).
CICD_UAT_BASE_URL = _normalize_base_url(
    os.getenv('CICD_UAT_BASE_URL')
    or os.getenv('MOMO_BASE_URL')
    or PUBLIC_BASE_URL,
    PUBLIC_BASE_URL,
)
# Base URL of the 'prod' entry: first non-empty of CICD_PROD_BASE_URL,
# PROD_BASE_URL, then the public base URL (which is also the fallback).
CICD_PROD_BASE_URL = _normalize_base_url(
    os.getenv('CICD_PROD_BASE_URL')
    or os.getenv('PROD_BASE_URL')
    or PUBLIC_BASE_URL,
    PUBLIC_BASE_URL,
)
# Static description of each monitored environment, keyed by environment id.
# Route handlers validate the requested environment against these keys.
# 'url' and 'health_endpoint' are derived from the base URLs computed above;
# the remaining fields are display metadata copied into status payloads.
# Both entries currently name 192.168.0.188 as their runtime host.
ENVIRONMENTS = {
    'uat': {
        'name': 'LIVE',
        'label': '線上入口',
        'color': '#3498db',
        'icon': '🟦',
        'url': CICD_UAT_BASE_URL,
        'health_endpoint': _health_endpoint_for(CICD_UAT_BASE_URL),
        'runtime_host': '192.168.0.188'
    },
    'prod': {
        'name': 'PROD',
        'label': '正式環境',
        'color': '#e74c3c',
        'icon': '🟥',
        'url': CICD_PROD_BASE_URL,
        'health_endpoint': _health_endpoint_for(CICD_PROD_BASE_URL),
        'runtime_host': '192.168.0.188'
    }
}
def _public_health_error(exc):
text = str(exc or '').lower()
if 'timeout' in text or 'timed out' in text:
return '健康檢查逾時請確認正式入口、Nginx 與 188 應用容器狀態。'
if 'connection' in text or 'refused' in text or 'max retries' in text:
return '健康檢查無法連線請確認正式入口、Nginx 與 188 應用容器狀態。'
return '健康檢查暫時無法完成,請稍後重試或查看部署診斷。'
# =============================================================================
# 部署監控頁面
# =============================================================================
@cicd_bp.route('/cicd')
def cicd_dashboard():
    """Render the deployment-monitoring dashboard page."""
    template_name = 'cicd_dashboard.html'
    return render_template(template_name, active_page='cicd')
# =============================================================================
# API 端點
# =============================================================================
def _collect_cicd_issues(environments, failed_jobs):
    """Build the flat issue list for the status payload.

    Args:
        environments: Mapping of environment id to its status dict.
        failed_jobs: Job dicts of the latest pipeline whose status is 'failed'.

    Returns:
        A list of issue dicts (types 'environment', 'runtime' and 'job').
    """
    issues = []
    for env_id, env_status in environments.items():
        if not env_status.get('healthy'):
            issues.append({
                'type': 'environment',
                'environment': env_id,
                'message': f"{env_status.get('name')} 環境異常",
                'error': env_status.get('display_error') or env_status.get('error'),
                'severity': 'critical',
                'auto_fixable': True,
                'fix_action': 'diagnose'
            })
        # Legacy runtime entries that arrive via historical data may only be
        # diagnosed; they never trigger a restart side effect.
        for pod in env_status.get('pods', []):
            if pod.get('healthy'):
                continue
            # 'restarts' may be missing or None; treat both as zero restarts.
            restart_count = pod.get('restarts') or 0
            issues.append({
                'type': 'runtime',
                'environment': env_id,
                'message': f"服務 {pod.get('name')} 不健康",
                'status': pod.get('status'),
                'restarts': pod.get('restarts'),
                'severity': 'warning' if restart_count < 5 else 'critical',
                'auto_fixable': True,
                'fix_action': 'diagnose'
            })
    for job in failed_jobs:
        error_info = job.get('error_info') or {}
        # The job dict always carries a 'failure_reason' key (possibly None),
        # so a plain .get() default would never apply; use `or` instead.
        fallback_message = job.get('failure_reason') or 'Job 失敗'
        issues.append({
            'type': 'job',
            'job_name': job.get('name'),
            'stage': job.get('stage'),
            'message': error_info.get('message', fallback_message),
            'error_log': (job.get('error_log') or '')[:500],  # cap the payload size
            'severity': error_info.get('severity', 'warning'),
            'auto_fixable': error_info.get('auto_fixable', False),
            'fix_action': error_info.get('fix_action'),
            'fix_suggestion': error_info.get('fix_suggestion')
        })
    return issues


@cicd_bp.route('/api/cicd/status')
def get_cicd_status():
    """Return the complete CI/CD status as JSON.

    Gathers the ten most recent pipelines, the jobs of the latest one, the
    status of every environment and a derived issue list plus summary.  When
    the latest pipeline failed and has failed jobs, a failure alert is sent
    once per pipeline id (de-duplicated through an in-process cache stored on
    this function object).

    Returns:
        A JSON response; on any exception a ``success: False`` payload with
        HTTP status 500.
    """
    try:
        pipelines = get_recent_pipelines(limit=10)
        latest_pipeline = pipelines[0] if pipelines else None
        environments = get_all_environments_status()

        # Detailed job information for the most recent pipeline.
        latest_jobs = []
        failed_jobs = []
        if latest_pipeline:
            latest_jobs = get_pipeline_jobs(latest_pipeline['id'])
            failed_jobs = [j for j in latest_jobs if j.get('status') == 'failed']
            if latest_pipeline.get('status') == 'failed' and failed_jobs:
                # Remember alerted pipelines so polling does not re-notify.
                cache_key = f"pipeline_alert_{latest_pipeline['id']}"
                if not hasattr(get_cicd_status, '_alert_cache'):
                    get_cicd_status._alert_cache = set()
                if cache_key not in get_cicd_status._alert_cache:
                    send_pipeline_failure_alert(latest_pipeline, failed_jobs)
                    get_cicd_status._alert_cache.add(cache_key)
                    # Bound the cache size.  Sets are unordered, so the 50
                    # entries kept are an arbitrary subset, not the newest.
                    if len(get_cicd_status._alert_cache) > 100:
                        get_cicd_status._alert_cache = set(list(get_cicd_status._alert_cache)[-50:])

        issues = _collect_cicd_issues(environments, failed_jobs)

        return jsonify({
            'success': True,
            'timestamp': datetime.now().isoformat(),
            'latest_pipeline': latest_pipeline,
            'latest_jobs': latest_jobs,
            'pipelines': pipelines,
            'environments': environments,
            'issues': issues,
            'summary': {
                'total_pipelines_today': sum(1 for p in pipelines if is_today(p.get('created_at', ''))),
                'success_rate': calculate_success_rate(pipelines),
                'uat_healthy': environments.get('uat', {}).get('healthy', False),
                'prod_healthy': environments.get('prod', {}).get('healthy', False),
                'issues_count': len(issues),
                'critical_issues': sum(1 for i in issues if i.get('severity') == 'critical')
            }
        })
    except Exception as e:
        # Keep a server-side trace; the response shape is unchanged.
        cicd_log.exception("[CI/CD] Failed to build status payload")
        return jsonify({
            'success': False,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }), 500
@cicd_bp.route('/api/cicd/pipelines')
def get_pipelines():
    """Return the most recent pipelines as JSON.

    Query parameters:
        limit: Number of pipelines to fetch (default 20).  Values are clamped
            to the range 1..100 because the value is forwarded as GitLab's
            ``per_page`` parameter.
    """
    requested = request.args.get('limit', 20, type=int)
    # Reject zero/negative values and cap at the largest page size requested
    # from GitLab in one call.
    limit = max(1, min(requested, 100))
    pipelines = get_recent_pipelines(limit=limit)
    return jsonify({
        'success': True,
        'pipelines': pipelines,
        'count': len(pipelines)
    })
@cicd_bp.route('/api/cicd/pipeline/<int:pipeline_id>')
def get_pipeline_detail(pipeline_id):
    """Return one pipeline, its jobs and its stage summary as JSON.

    Args:
        pipeline_id: Numeric pipeline id taken from the URL.

    Returns:
        A JSON response; on any exception a generic error payload with HTTP
        status 500 (the exception text is logged, not exposed).
    """
    try:
        pipeline = get_pipeline_info(pipeline_id)
        jobs = get_pipeline_jobs(pipeline_id)
        return jsonify({
            'success': True,
            'pipeline': pipeline,
            'jobs': jobs,
            'stages': extract_stages(jobs)
        })
    except Exception:
        # The client only sees a generic message, so record the cause here.
        cicd_log.exception("[CI/CD] Failed to load pipeline detail pipeline_id=%s", pipeline_id)
        return jsonify({
            'success': False,
            'error': '部署監控暫時無法完成,請稍後重試或查看服務健康狀態。'
        }), 500
@cicd_bp.route('/api/cicd/environments')
def get_environments():
    """Return the status of every configured environment as JSON."""
    payload = {
        'success': True,
        'environments': get_all_environments_status(),
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
@cicd_bp.route('/api/cicd/environment/<env_id>')
def get_environment_detail(env_id):
    """Return the detailed status of one environment, or 404 if it is unknown."""
    if env_id in ENVIRONMENTS:
        return jsonify({
            'success': True,
            'environment': get_environment_status(env_id),
        })
    return jsonify({'success': False, 'error': 'Unknown environment'}), 404
@cicd_bp.route('/api/cicd/deploy', methods=['POST'])
def trigger_deploy():
    """Manually trigger a deployment through the legacy GitLab pipeline API.

    Request JSON:
        environment: Environment id (default ``'uat'``); must be a key of
            ENVIRONMENTS.

    Returns:
        400 for an unknown/invalid environment, 503 when no pipeline was
        created (GitLab disabled or unreachable), 500 on an exception,
        otherwise a success payload containing the pipeline returned by GitLab.
    """
    data = request.get_json() or {}
    # A JSON body that is not an object (e.g. a list) has no .get(); and a
    # non-string environment (e.g. a list) is unhashable and would raise on the
    # membership test.  Both are treated as an unknown environment.
    env = data.get('environment', 'uat') if isinstance(data, dict) else None
    if not isinstance(env, str) or env not in ENVIRONMENTS:
        return jsonify({'success': False, 'error': 'Unknown environment'}), 400

    try:
        result = trigger_gitlab_pipeline(ref='main', variables={
            'DEPLOY_ENV': env
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

    # trigger_gitlab_pipeline() yields None when the GitLab API is disabled or
    # both the direct call and the SSH fallback failed; reporting success in
    # that case would claim a deployment that never started.
    if not result:
        return jsonify({
            'success': False,
            'error': 'Deployment was not triggered: the GitLab API is disabled or unreachable.'
        }), 503

    return jsonify({
        'success': True,
        'message': f'已觸發 {ENVIRONMENTS[env]["name"]} 部署',
        'pipeline': result
    })
@cicd_bp.route('/api/cicd/rollback', methods=['POST'])
def trigger_rollback():
    """Reject rollback requests: the legacy-cluster rollback is disabled.

    Request JSON:
        environment: Environment id (default ``'uat'``).

    Returns:
        400 for an unknown/invalid environment; otherwise always HTTP 410 with
        a message pointing to the manual rollback procedure.
    """
    data = request.get_json() or {}
    # Non-object bodies and non-string (possibly unhashable) environment values
    # are rejected as unknown instead of raising inside the membership test.
    env = data.get('environment', 'uat') if isinstance(data, dict) else None
    if not isinstance(env, str) or env not in ENVIRONMENTS:
        return jsonify({'success': False, 'error': 'Unknown environment'}), 400
    return jsonify({
        'success': False,
        'error': (
            '舊叢集回滾已停用。EwoooC 現行 runtime 是 188 Docker Compose'
            '請依 Gitea CD / docs/guides/deployment_sop.md 執行人工回滾。'
        )
    }), 410
# =============================================================================
# 自動修復 API
# =============================================================================
# Only these fix_action values are accepted; any request outside this list is
# rejected with HTTP 400.
# Automatic restarts of the legacy cluster were disabled per ADR-008/011 so the
# retired runtime is never hit by mistake.
ALLOWED_FIX_ACTIONS = frozenset({'restart_registry', 'diagnose', 'full_repair'})
@cicd_bp.route('/api/cicd/auto-fix', methods=['POST'])
def trigger_auto_fix():
    """Run an allow-listed diagnose/repair action and notify via Telegram.

    Request JSON:
        action: One of ALLOWED_FIX_ACTIONS.
        environment: Environment id (default ``'uat'``).

    Returns:
        400 for an unknown environment or a disallowed action, 500 on an
        exception, otherwise a payload listing the result of each step.  Note
        that the top-level ``success`` only means the action ran; each entry in
        ``results`` carries its own outcome.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON list/scalar has no .get(); treat it as an empty request.
        data = {}
    fix_action = data.get('action')
    env = data.get('environment', 'uat')

    # isinstance guards: unhashable JSON values (lists/objects) would raise
    # TypeError inside the membership tests and surface as a 500.
    if not isinstance(env, str) or env not in ENVIRONMENTS:
        return jsonify({'success': False, 'error': '未知環境'}), 400

    # Security: strict allowlist so no arbitrary action can be injected.
    if not isinstance(fix_action, str) or fix_action not in ALLOWED_FIX_ACTIONS:
        return jsonify({'success': False, 'error': f'不允許的修復動作: {fix_action}'}), 400

    results = []
    try:
        if fix_action == 'restart_registry':
            results.append(fix_registry())
        elif fix_action == 'diagnose':
            results.append(run_diagnosis(env))
        elif fix_action == 'full_repair':
            # Full repair keeps the best-effort registry restart; the runtime
            # is only diagnosed, never restarted on the legacy cluster.
            results.append(fix_registry())
            results.append(run_diagnosis(env))

        # Send the Telegram notification.
        send_fix_notification(env, fix_action, results)

        return jsonify({
            'success': True,
            'message': f'已執行修復動作: {fix_action}',
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@cicd_bp.route('/api/cicd/diagnose', methods=['POST'])
def diagnose_environment():
    """Run the environment diagnosis and return its report as JSON.

    Request JSON:
        environment: Environment id (default ``'uat'``).

    Returns:
        400 for an unknown/invalid environment, 500 on an exception, otherwise
        a payload with the diagnosis dict.
    """
    data = request.get_json() or {}
    # Non-object bodies and non-string (possibly unhashable) environment values
    # are rejected as unknown instead of raising inside the membership test.
    env = data.get('environment', 'uat') if isinstance(data, dict) else None
    if not isinstance(env, str) or env not in ENVIRONMENTS:
        return jsonify({'success': False, 'error': '未知環境'}), 400
    try:
        diagnosis = run_diagnosis(env)
        return jsonify({
            'success': True,
            'diagnosis': diagnosis
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def fix_registry():
    """Restart the Registry service over SSH and report the outcome.

    Returns:
        A dict with ``action`` and ``success``; on a completed run it also has
        ``output`` (stdout) and ``error`` (stderr when the exit code is
        non-zero, else ``None``).  If the command cannot be run at all, the
        dict has ``error`` set to the exception text and no ``output`` key.
    """
    action_name = 'restart_registry'
    # NOTE(review): StrictHostKeyChecking=no disables host-key verification for
    # this connection — confirm this is acceptable on the network in question.
    ssh_command = [
        'ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'ConnectTimeout=5',
        'wooo@192.168.0.110',
        'cd /home/wooo/devops/registry && docker compose restart',
    ]
    try:
        completed = subprocess.run(
            ssh_command,
            capture_output=True, text=True, timeout=60
        )
    except Exception as exc:
        return {'action': action_name, 'success': False, 'error': str(exc)}

    succeeded = completed.returncode == 0
    return {
        'action': action_name,
        'success': succeeded,
        'output': completed.stdout,
        'error': None if succeeded else completed.stderr,
    }
def _diagnosis_health_check(env_config):
    """Probe the environment's health endpoint and return its check entry."""
    try:
        # Internal health check; the target may use a self-signed certificate.
        response = requests.get(env_config.get('health_endpoint'), timeout=10, verify=False)  # nosec B501
        return {
            'name': '健康端點',
            'status': 'ok' if response.status_code == 200 else 'failed',
            'response_time': response.elapsed.total_seconds() * 1000,
            'status_code': response.status_code
        }
    except Exception as exc:
        # The report only carries a sanitized message, so log the real cause.
        cicd_log.warning(
            "[CI/CD] Diagnosis health probe failed url=%s error=%s",
            env_config.get('health_endpoint'),
            exc,
        )
        return {
            'name': '健康端點',
            'status': 'failed',
            'error': _public_health_error(exc)
        }


def _diagnosis_registry_check():
    """Probe the image registry and return its check entry."""
    try:
        # Registry health check.
        reg_response = requests.get('https://registry.wooo.work/v2/', timeout=10, verify=False)  # nosec B501
        return {
            'name': 'Registry',
            # 401 still counts as ok: the registry answered, it just wants auth.
            'status': 'ok' if reg_response.status_code in [200, 401] else 'failed',
            'status_code': reg_response.status_code
        }
    except Exception as exc:
        cicd_log.warning("[CI/CD] Diagnosis registry probe failed error=%s", exc)
        return {
            'name': 'Registry',
            'status': 'failed',
            'error': '映像倉庫健康檢查暫時無法完成,請查看 Registry 服務狀態。'
        }


def _diagnosis_summary(checks):
    """Derive the overall status, counts and recommendations from *checks*."""
    failed_checks = [c for c in checks if c['status'] == 'failed']
    warning_checks = [c for c in checks if c['status'] == 'warning']
    recommendations = []
    for check in failed_checks:
        if check['name'] == '健康端點':
            recommendations.append({
                'action': 'diagnose',
                'description': '先診斷健康端點與 188 Docker Compose 狀態,避免自動重啟資料庫或舊叢集'
            })
        elif check['name'] == 'Registry':
            recommendations.append({
                'action': 'restart_registry',
                'description': '建議重啟 Registry 服務'
            })
    return {
        'overall_status': 'critical' if failed_checks else ('warning' if warning_checks else 'healthy'),
        'failed_count': len(failed_checks),
        'warning_count': len(warning_checks),
        'recommendations': recommendations
    }


def run_diagnosis(env):
    """Run the environment diagnosis and return a structured report.

    Checks performed: the environment's health endpoint, a static note about
    the current runtime, and (for ``'uat'`` only) the image registry.

    Args:
        env: Environment id; unknown ids fall back to an empty configuration.

    Returns:
        A dict with ``action``, ``environment``, ``timestamp``, ``checks`` and,
        unless an unexpected error occurred, ``summary``.  On an unexpected
        error the dict carries a generic ``error`` message instead.
    """
    diagnosis = {
        'action': 'diagnose',
        'environment': env,
        'timestamp': datetime.now().isoformat(),
        'checks': []
    }
    try:
        env_config = ENVIRONMENTS.get(env, {})
        diagnosis['checks'].append(_diagnosis_health_check(env_config))

        # The legacy cluster runtime was removed; only a description of the
        # current Docker Compose runtime is reported here.
        diagnosis['checks'].append({
            'name': '執行環境狀態',
            'status': 'ok',
            'runtime': 'Docker Compose on 192.168.0.188',
            'details': '舊叢集探測已停用;容器狀態請依 DevOps 手冊在 188 查 docker compose / /health。'
        })

        # The registry is checked for UAT only.
        if env == 'uat':
            diagnosis['checks'].append(_diagnosis_registry_check())

        diagnosis['summary'] = _diagnosis_summary(diagnosis['checks'])
    except Exception:
        # Callers receive a generic message; keep the traceback in the log.
        cicd_log.exception("[CI/CD] Diagnosis failed env=%s", env)
        diagnosis['error'] = '部署診斷暫時無法完成,請稍後重試。'
    return diagnosis
# =============================================================================
# Telegram 告警
# =============================================================================
# Telegram credentials come from the environment.  The chat id is the first
# element of the JSON list in TELEGRAM_CHAT_IDS when that list is non-empty;
# otherwise (empty list or unparsable value) TELEGRAM_CHAT_ID is used.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', '')
_chat_ids_raw = os.environ.get('TELEGRAM_CHAT_IDS', '[]')
try:
    _chat_ids_list = json.loads(_chat_ids_raw)
    if _chat_ids_list:
        TELEGRAM_CHAT_ID = str(_chat_ids_list[0])
    else:
        TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', '')
except Exception:
    TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', '')

if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
    logging.getLogger('cicd_routes').warning(
        '[SECURITY] TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID is not set. '
        'Telegram notifications will silently fail. Set these environment variables.'
    )
def send_telegram_message(message):
    """Send a CI/CD alert (e.g. a pipeline failure) through the EventRouter.

    Per the original note (ADR-019 Phase 5), delivery goes through the
    EventRouter's unified entry point instead of a direct Telegram call with
    ``parse_mode=Markdown``; HTML wrapping is left to the EventRouter
    templates, so Markdown control characters in *message* are shown as plain
    text.

    Args:
        message: Full alert text.  The first 400 characters become the event
            summary; the complete text is passed in the payload.

    Returns:
        ``True`` when the router reports the event as delivered, ``False``
        otherwise (including any exception, which is logged).
    """
    try:
        from services.event_router import dispatch_sync
        admin_ids = [TELEGRAM_CHAT_ID] if TELEGRAM_CHAT_ID else None
        result = dispatch_sync(event={
            "event_type": "cicd_pipeline_alert",
            "severity": "alert",
            "source": "CICD.Routes",
            "title": "CI/CD 告警",
            "summary": message[:400],
            "status": "pipeline_event",
            "payload": {"raw_message": message},
        }, admin_chat_ids=admin_ids)
        return bool(result.get("delivered"))
    except Exception as e:
        # Use the module logger (not print) so failures reach the app's logs.
        cicd_log.warning("[CI/CD] Telegram send error: %s", e)
        return False
def send_pipeline_failure_alert(pipeline, failed_jobs):
    """Send an alert describing a failed pipeline and its failed jobs.

    Args:
        pipeline: Pipeline dict; ``id``, ``ref``, ``sha`` and ``web_url`` are read.
        failed_jobs: Iterable of job dicts; ``name`` and ``failure_reason`` are read.

    Returns:
        The result of send_telegram_message().
    """
    # `or` (not a .get default): the job dicts carry the key with a None value
    # when GitLab gives no reason, which would otherwise render as "None".
    job_details = '\n'.join(
        f"{j.get('name')}: {j.get('failure_reason') or '未知原因'}"
        for j in failed_jobs
    )
    # A present-but-None sha must not break the slice.
    short_sha = (pipeline.get('sha') or '')[:8]
    message = f"""🚨 *部署流程失敗*
📌 *部署編號:* #{pipeline.get('id')}
🌿 *分支:* `{pipeline.get('ref')}`
📝 *提交:* `{short_sha}`
❌ *失敗工作項目:*
{job_details}
🔗 [查看詳情]({pipeline.get('web_url')})
"""
    return send_telegram_message(message)
def send_fix_notification(env, action, results):
    """Send a notification summarizing an auto-fix run.

    Args:
        env: Environment id the action ran against.
        action: Name of the executed fix action.
        results: List of per-step result dicts; ``action`` and ``success`` are read.

    Returns:
        The result of send_telegram_message().
    """
    if env == 'uat':
        env_icon = '🟦'
    else:
        env_icon = '🟥'
    env_name = ENVIRONMENTS.get(env, {}).get('name', env)

    total_count = len(results)
    success_count = len([step for step in results if step.get('success')])
    status_emoji = '⚠️' if success_count != total_count else ''

    detail_lines = []
    for step in results:
        outcome = '✅ 成功' if step.get('success') else '❌ 失敗'
        detail_lines.append(f"{step.get('action')}: {outcome}")
    result_details = '\n'.join(detail_lines)

    message_lines = [
        f"{status_emoji} *部署監控自動修復執行完成*",
        f"{env_icon} *環境:* {env_name}",
        f"🔧 *動作:* {action}",
        f"📊 *結果:* {success_count}/{total_count} 成功",
        f"{result_details}",
        f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    # The original template ends with a newline after the timestamp line.
    message = '\n'.join(message_lines) + '\n'
    return send_telegram_message(message)
# =============================================================================
# 輔助函數 - GitLab API
# =============================================================================
def gitlab_api(endpoint, method='GET', data=None):
    """Call the GitLab REST API, falling back to an SSH-tunnelled call.

    Args:
        endpoint: Path below ``/api/v4`` (including any query string).
        method: ``'GET'`` or ``'POST'``.
        data: JSON-serializable body for POST requests.

    Returns:
        The decoded JSON response, or ``None`` when the legacy GitLab API is
        disabled or both the direct call and the SSH fallback fail.

    Raises:
        ValueError: If *method* is neither ``'GET'`` nor ``'POST'`` (only
            checked when the API is enabled).
    """
    if not GITLAB_ENABLED:
        return None
    # Fail with a clear message instead of an UnboundLocalError further down.
    if method not in ('GET', 'POST'):
        raise ValueError(f"Unsupported GitLab API method: {method!r}")
    url = f"{GITLAB_URL}/api/v4{endpoint}"
    headers = {'PRIVATE-TOKEN': GITLAB_TOKEN}
    try:
        if method == 'GET':
            response = requests.get(url, headers=headers, timeout=10)
        else:
            response = requests.post(url, headers=headers, json=data, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        cicd_log.warning("[CI/CD] GitLab direct API unavailable: %s", e)
        # Fallback: run curl on the GitLab host through SSH.
        # NOTE(review): for POST this re-sends the request; if the direct call
        # reached GitLab before failing, the action may be performed twice.
        return gitlab_api_via_ssh(endpoint, method, data)
def gitlab_api_via_ssh(endpoint, method='GET', data=None):
    """Call the GitLab API by running curl on the GitLab host through SSH.

    Used when the application cannot reach GitLab directly.

    Security: ssh does NOT preserve argv boundaries for the remote command; it
    joins the trailing arguments with spaces and the remote login shell parses
    the result.  Every curl argument is therefore shell-quoted before being
    sent as one command string, so the token header, the JSON body and URL
    characters such as ``?`` or ``&`` reach curl intact and cannot be
    interpreted by the remote shell.

    Args:
        endpoint: Path below ``/api/v4`` (including any query string).
        method: ``'GET'``; any other value is sent as a POST.
        data: JSON-serializable body for POST requests.

    Returns:
        The decoded JSON response, or ``None`` when the legacy GitLab API is
        disabled, the command fails, or the output is not valid JSON.
    """
    if not GITLAB_ENABLED:
        return None
    # Local import: shlex is not among this module's top-level imports.
    import shlex
    try:
        # GitLab is addressed via loopback on the remote host.
        url = f"http://127.0.0.1:8929/api/v4{endpoint}"
        # -f makes curl exit non-zero on HTTP errors, mirroring
        # raise_for_status() in the direct path, so an error body is never
        # returned to callers as if it were data.
        if method == 'GET':
            remote_argv = [
                'curl', '-s', '-f',
                '-H', f'PRIVATE-TOKEN: {GITLAB_TOKEN}',
                url
            ]
        else:
            json_data = json.dumps(data) if data else '{}'
            remote_argv = [
                'curl', '-s', '-f', '-X', 'POST',
                '-H', f'PRIVATE-TOKEN: {GITLAB_TOKEN}',
                '-H', 'Content-Type: application/json',
                '-d', json_data,
                url
            ]
        remote_command = ' '.join(shlex.quote(arg) for arg in remote_argv)
        # NOTE(review): StrictHostKeyChecking=no disables host-key verification,
        # and the token is visible in the remote process list while curl runs.
        ssh_cmd = ['ssh',
                   '-o', 'StrictHostKeyChecking=no',
                   '-o', 'ConnectTimeout=5',
                   'wooo@192.168.0.110',
                   remote_command]
        result = subprocess.run(
            ssh_cmd,
            capture_output=True, text=True, timeout=15
        )
        if result.returncode == 0 and result.stdout:
            return json.loads(result.stdout)
        cicd_log.warning("[CI/CD] GitLab SSH API unavailable: %s", result.stderr)
        return None
    except Exception as e:
        cicd_log.warning("[CI/CD] GitLab SSH fallback failed: %s", e)
        return None
def get_recent_pipelines(limit=20):
    """Fetch the most recent pipelines and shape them for the dashboard.

    Args:
        limit: Page size forwarded to GitLab as ``per_page``.

    Returns:
        A list of pipeline dicts (``id``, ``status``, ``ref``, shortened
        ``sha``, ``web_url``, timestamps, ``duration`` and display fields);
        empty when GitLab is unavailable or returns anything but a list.
    """
    result = gitlab_api(f"/projects/{GITLAB_PROJECT_ID}/pipelines?per_page={limit}")
    # An error payload is a dict, not a list; iterating it would crash below.
    if not isinstance(result, list):
        return []
    pipelines = []
    for p in result:
        if not isinstance(p, dict):
            continue
        pipeline_status = p.get('status')
        pipelines.append({
            'id': p.get('id'),
            'status': pipeline_status,
            'ref': p.get('ref'),
            # `or ''` covers a present-but-null sha.
            'sha': (p.get('sha') or '')[:8],
            'web_url': p.get('web_url'),
            'created_at': p.get('created_at'),
            'updated_at': p.get('updated_at'),
            'duration': calculate_duration(p.get('created_at'), p.get('updated_at')),
            'status_icon': get_status_icon(pipeline_status),
            'status_color': get_status_color(pipeline_status)
        })
    return pipelines
def get_pipeline_info(pipeline_id):
    """Fetch one pipeline from GitLab; returns whatever gitlab_api() yields."""
    endpoint = f"/projects/{GITLAB_PROJECT_ID}/pipelines/{pipeline_id}"
    return gitlab_api(endpoint)
def get_pipeline_jobs(pipeline_id):
    """Fetch a pipeline's jobs, attaching error details to failed ones.

    Args:
        pipeline_id: Id of the pipeline whose jobs are requested.

    Returns:
        A list of job dicts sorted by (stage, name); empty when GitLab is
        unavailable or returns anything but a list.  Failed jobs additionally
        get ``error_log`` and ``error_info`` when a log summary is available.
    """
    result = gitlab_api(f"/projects/{GITLAB_PROJECT_ID}/pipelines/{pipeline_id}/jobs")
    # An error payload is a dict, not a list; iterating it would crash below.
    if not isinstance(result, list):
        return []
    jobs = []
    for j in result:
        if not isinstance(j, dict):
            continue
        job_status = j.get('status')
        job_data = {
            'id': j.get('id'),
            'name': j.get('name'),
            'stage': j.get('stage'),
            'status': job_status,
            'duration': j.get('duration'),
            'web_url': j.get('web_url'),
            'started_at': j.get('started_at'),
            'finished_at': j.get('finished_at'),
            'status_icon': get_status_icon(job_status),
            'status_color': get_status_color(job_status),
            'failure_reason': j.get('failure_reason'),
            'error_info': None
        }
        # For failed jobs, try to attach a summary of the error log.
        if job_status == 'failed':
            error_log = get_job_error_summary(j.get('id'))
            if error_log:
                job_data['error_log'] = error_log
                job_data['error_info'] = analyze_error(error_log)
        jobs.append(job_data)
    # The keys always exist but may be None; `or ''` keeps the sort from
    # comparing None with str.
    return sorted(jobs, key=lambda x: (x.get('stage') or '', x.get('name') or ''))
def get_job_error_summary(job_id, max_lines=50):
    """Fetch a job's log from GitLab and extract an error summary.

    Lines containing an error keyword are collected together with two lines of
    context on each side, de-duplicated in order and capped at *max_lines*.
    When no keyword is found, the last 20 lines of the log are returned.

    Args:
        job_id: Id of the GitLab job.
        max_lines: Maximum number of distinct lines in the summary.

    Returns:
        The summary text, or ``None`` when the log cannot be fetched (non-200
        response or any exception, which is logged).
    """
    error_keywords = ('error', 'failed', 'exception', 'traceback', 'fatal')
    try:
        # The trace endpoint returns the log as plain text.
        url = f"{GITLAB_URL}/api/v4/projects/{GITLAB_PROJECT_ID}/jobs/{job_id}/trace"
        headers = {'PRIVATE-TOKEN': GITLAB_TOKEN}
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            return None
        lines = response.text.split('\n')
        line_count = len(lines)
        error_lines = []
        for i, line in enumerate(lines):
            lowered = line.lower()
            if any(keyword in lowered for keyword in error_keywords):
                # Keep two lines of context before and after the hit.
                error_lines.extend(lines[max(0, i - 2):min(line_count, i + 3)])
        if error_lines:
            # dict.fromkeys de-duplicates while preserving first-seen order.
            unique_lines = list(dict.fromkeys(error_lines))[:max_lines]
            return '\n'.join(unique_lines)
        # No explicit error found: fall back to the tail of the log.
        return '\n'.join(lines[-20:])
    except Exception as e:
        # Use the module logger (not print) so failures reach the app's logs.
        cicd_log.warning("[CI/CD] Error getting job log for %s: %s", job_id, e)
        return None
def extract_stages(jobs):
    """Group jobs by stage and derive a status and total duration per stage.

    Stage status rules: any failed job makes the stage 'failed'; otherwise a
    running job makes it 'running'; otherwise a successful job turns a
    'pending' stage into 'success'.

    Args:
        jobs: Iterable of job dicts; ``stage``, ``status`` and ``duration`` are read.

    Returns:
        A list of stage dicts.  The well-known stages ('test', 'build',
        'deploy') come first in that order, followed by any other stages in
        first-seen order, so jobs in unlisted stages are no longer dropped.
    """
    stages = {}
    stage_order = ['test', 'build', 'deploy']
    for job in jobs:
        # `or` also covers a present-but-None stage value.
        stage = job.get('stage') or 'unknown'
        if stage not in stages:
            stages[stage] = {
                'name': stage,
                'status': 'pending',
                'jobs': [],
                'duration': 0
            }
        bucket = stages[stage]
        bucket['jobs'].append(job)

        job_status = job.get('status')
        if job_status == 'failed':
            bucket['status'] = 'failed'
        elif job_status == 'running' and bucket['status'] != 'failed':
            bucket['status'] = 'running'
        elif job_status == 'success' and bucket['status'] == 'pending':
            bucket['status'] = 'success'

        # Accumulate the stage duration, skipping jobs without one.
        if job.get('duration'):
            bucket['duration'] += job.get('duration', 0)

    ordered_names = [name for name in stage_order if name in stages]
    ordered_names.extend(name for name in stages if name not in stage_order)

    ordered_stages = []
    for stage_name in ordered_names:
        bucket = stages[stage_name]
        bucket['status_icon'] = get_status_icon(bucket['status'])
        bucket['status_color'] = get_status_color(bucket['status'])
        ordered_stages.append(bucket)
    return ordered_stages
def trigger_gitlab_pipeline(ref='main', variables=None):
    """Ask GitLab to create a pipeline for *ref*.

    Args:
        ref: Branch or tag to run the pipeline on.
        variables: Optional mapping of pipeline variable names to values.

    Returns:
        Whatever gitlab_api() returns for the POST request.
    """
    payload = {'ref': ref}
    if variables:
        payload['variables'] = [
            {'key': var_name, 'value': var_value}
            for var_name, var_value in variables.items()
        ]
    endpoint = f"/projects/{GITLAB_PROJECT_ID}/pipeline"
    return gitlab_api(endpoint, method='POST', data=payload)
# =============================================================================
# 輔助函數 - 環境狀態
# =============================================================================
def get_all_environments_status():
    """Return a mapping of every configured environment id to its status."""
    return {env_id: get_environment_status(env_id) for env_id in ENVIRONMENTS}
def get_environment_status(env_id):
    """Probe one environment's health endpoint and return its status dict.

    Args:
        env_id: Environment id; unknown ids fall back to an empty configuration.

    Returns:
        A dict with the environment's display metadata plus ``healthy``,
        ``pods`` (always empty here), ``last_deploy``, ``version``,
        ``response_time`` and ``runtime_note``.  On a 200 response
        ``last_check`` is added; on a failed probe (exception or non-200
        response) ``error`` holds a user-facing message.
    """
    env_config = ENVIRONMENTS.get(env_id, {})
    status = {
        'id': env_id,
        'name': env_config.get('name'),
        'label': env_config.get('label'),
        'color': env_config.get('color'),
        'icon': env_config.get('icon'),
        'url': env_config.get('url'),
        'healthy': False,
        'pods': [],
        'last_deploy': None,
        'version': None,
        'response_time': None
    }
    # Health check (internal services may use a self-signed certificate).
    try:
        start_time = datetime.now()
        response = requests.get(
            env_config.get('health_endpoint'),
            timeout=10,
            verify=False  # nosec B501 - internal health check
        )
        response_time = (datetime.now() - start_time).total_seconds() * 1000
        if response.status_code == 200:
            health_data = response.json()
            status['healthy'] = health_data.get('status') == 'healthy'
            status['version'] = health_data.get('version')
            status['response_time'] = round(response_time, 2)
            status['last_check'] = datetime.now().isoformat()
        else:
            # Without this branch a non-200 answer left the environment
            # unhealthy with no explanation and no log entry.
            cicd_log.warning(
                "[CI/CD] Health check returned HTTP %s env=%s url=%s",
                response.status_code,
                env_id,
                env_config.get('health_endpoint'),
            )
            status['error'] = _public_health_error(
                f"unexpected HTTP status {response.status_code}"
            )
    except Exception as e:
        cicd_log.warning(
            "[CI/CD] Health check failed env=%s url=%s error=%s",
            env_id,
            env_config.get('health_endpoint'),
            e,
        )
        status['error'] = _public_health_error(e)
    status['runtime_note'] = 'Docker Compose on 192.168.0.188; legacy cluster probes disabled.'
    return status
# =============================================================================
# 輔助函數 - 通用
# =============================================================================
def get_status_icon(status):
    """Return the display icon for a pipeline/job status ('' when unknown)."""
    # NOTE(review): several icons are empty strings in this view of the file —
    # confirm against the original that no glyphs were lost.
    icon_by_status = dict(
        success='',
        passed='',
        failed='',
        running='🔄',
        pending='',
        canceled='',
        skipped='⏭️',
        manual='👆',
        created='🆕',
    )
    return icon_by_status.get(status, '')
def get_status_color(status):
    """Return the hex display colour for a pipeline/job status (grey when unknown)."""
    green, red, blue = '#28a745', '#dc3545', '#007bff'
    yellow, grey, teal = '#ffc107', '#6c757d', '#17a2b8'
    color_by_status = dict(
        success=green,
        passed=green,
        failed=red,
        running=blue,
        pending=yellow,
        canceled=grey,
        skipped=grey,
        manual=teal,
        created=grey,
    )
    return color_by_status.get(status, grey)
def calculate_duration(start, end):
    """Format the time between two ISO-8601 timestamps.

    Args:
        start: ISO-8601 start timestamp (a trailing ``Z`` is accepted).
        end: ISO-8601 end timestamp (a trailing ``Z`` is accepted).

    Returns:
        A formatted string — seconds below one minute, minutes and seconds
        below one hour, hours and minutes otherwise — or ``None`` when either
        value is missing or cannot be parsed/compared.
    """
    if not start or not end:
        return None
    try:
        start_dt = datetime.fromisoformat(start.replace('Z', '+00:00'))
        end_dt = datetime.fromisoformat(end.replace('Z', '+00:00'))
        duration = (end_dt - start_dt).total_seconds()
    except (ValueError, TypeError, AttributeError):
        # Narrow catch (was a bare `except:`): malformed text, non-string
        # input, or mixing naive and aware timestamps.
        return None
    # NOTE(review): the formatted values carry no unit suffixes in this view of
    # the file — confirm against the original that none were lost.
    if duration < 60:
        return f"{int(duration)}"
    elif duration < 3600:
        return f"{int(duration // 60)}{int(duration % 60)}"
    else:
        return f"{int(duration // 3600)}{int((duration % 3600) // 60)}"
def calculate_age(timestamp):
    """Format how long ago an ISO-8601 timestamp was.

    Args:
        timestamp: ISO-8601 timestamp (a trailing ``Z`` is accepted).

    Returns:
        The age as a string — days when at least one day old, otherwise
        hours, minutes or seconds — or ``"未知"`` when the value is missing
        or cannot be parsed.
    """
    if not timestamp:
        return "未知"
    try:
        created = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        # Use the timestamp's own tzinfo so aware and naive inputs both work.
        now = datetime.now(created.tzinfo)
        diff = now - created
    except (ValueError, TypeError, AttributeError):
        # Narrow catch (was a bare `except:`): malformed text or non-string input.
        return "未知"
    # NOTE(review): the formatted values carry no unit suffixes in this view of
    # the file — confirm against the original that none were lost.
    if diff.days > 0:
        return f"{diff.days}"
    elif diff.seconds >= 3600:
        return f"{diff.seconds // 3600}"
    elif diff.seconds >= 60:
        return f"{diff.seconds // 60}"
    else:
        return f"{diff.seconds}"
def is_today(timestamp):
    """Tell whether an ISO-8601 timestamp falls on today's local date.

    Timezone-aware timestamps (e.g. UTC values ending in ``Z``) are converted
    to the server's local timezone before the dates are compared; comparing
    the UTC calendar date with the local one gives wrong answers for the hours
    in which the two dates differ.

    Args:
        timestamp: ISO-8601 timestamp (a trailing ``Z`` is accepted).

    Returns:
        ``True`` when the timestamp is on the current local date; ``False``
        otherwise, including for missing or unparsable values.
    """
    if not timestamp:
        return False
    try:
        dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        if dt.tzinfo is not None:
            # astimezone() without arguments converts to the local timezone.
            dt = dt.astimezone()
    except (ValueError, TypeError, AttributeError, OverflowError, OSError):
        # Narrow catch (was a bare `except:`): malformed text, non-string
        # input, or a value outside the platform's convertible range.
        return False
    return dt.date() == datetime.now().date()
def calculate_success_rate(pipelines):
    """Return the percentage of pipelines that succeeded, to one decimal place.

    A pipeline counts as successful when its ``status`` is 'success' or
    'passed'.  An empty (or falsy) input yields ``0``.
    """
    if not pipelines:
        return 0
    succeeded = [p for p in pipelines if p.get('status') in ('success', 'passed')]
    return round(len(succeeded) / len(pipelines) * 100, 1)