From 2af4dffcc6beb813f7ceba0af7268b75ff75e6c9 Mon Sep 17 00:00:00 2001 From: OG T Date: Sat, 11 Apr 2026 02:50:26 +0800 Subject: [PATCH] =?UTF-8?q?fix(security):=20Architecture=20Review=20?= =?UTF-8?q?=E4=BF=AE=E5=BE=A9=205=20=E9=A0=85=E9=AB=98=E4=BF=A1=E5=BF=83?= =?UTF-8?q?=E5=95=8F=E9=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 安全修復 (P0): 1. ssh_provider: 新增 _validate_param() 白名單驗證,防止 command injection - container_name/service/filter_name: [a-zA-Z0-9._-]{1,128} - compose_dir: 必須以 /opt/ 或 /srv/ 開頭,禁止 .. - domain: FQDN 白名單 - tail/port/lines: int() 轉換 + 上下限夾緊 2. ssh_provider: known_hosts=None 改為讀 SSH_MCP_KNOWN_HOSTS_FILE 環境變數 - 預設仍 None(內網快速啟動),但啟動時寫入 warning log - 設定文件:ops/runbooks/ssh-mcp-setup.md (待補) 模組化修復 (P1): 3. km_conversion_service: 移除 import 時的 ALERT_EVENT_TYPES.update() 副作用 - ADR-071 event types 移入 alert_operation_log_repository.py 靜態集合 4. telegram_gateway: create_task() 改為 await + try/except - 避免 DB session 關閉後的競爭條件 - KM 轉換失敗記錄 warning log,不中斷主流程 5. km_conversion_service: 新增頂層 try/except,錯誤一律 error log 後 re-raise Co-Authored-By: Claude Sonnet 4.6 --- .../src/plugins/mcp/providers/ssh_provider.py | 64 +++++++++++++++---- .../alert_operation_log_repository.py | 6 ++ .../api/src/services/km_conversion_service.py | 27 +++++--- apps/api/src/services/telegram_gateway.py | 13 +++- 4 files changed, 85 insertions(+), 25 deletions(-) diff --git a/apps/api/src/plugins/mcp/providers/ssh_provider.py b/apps/api/src/plugins/mcp/providers/ssh_provider.py index 6789703c..bfe3b9a3 100644 --- a/apps/api/src/plugins/mcp/providers/ssh_provider.py +++ b/apps/api/src/plugins/mcp/providers/ssh_provider.py @@ -72,6 +72,33 @@ FORBIDDEN_PATTERNS = [ r">\s*/etc/", # 重定向到系統目錄 ] +# 參數白名單正則 — 用於 _validate_param() +# container_name / service / filter_name: 英數字、連字號、底線、點(k8s/docker 命名規範) +_RE_SAFE_NAME = re.compile(r'^[a-zA-Z0-9._-]{1,128}$') +# compose_dir: 必須以 /opt/ 或 /srv/ 開頭,不含 .. +_RE_SAFE_DIR = re.compile(r'^/(opt|srv)/[a-zA-Z0-9._/-]{1,200}$') +# domain: FQDN + 萬用字元 +_RE_SAFE_DOMAIN = re.compile(r'^(\*\.)?[a-zA-Z0-9.-]{1,253}$') + + +def _validate_param(key: str, value: str) -> str: + """ + 對用戶提供的字串參數套用白名單驗證。 + 傳回驗證後的值;驗證失敗拋出 ValueError(呼叫方應攔截並拒絕執行)。 + """ + if key in ("container_name", "filter_name", "service"): + if not _RE_SAFE_NAME.match(value): + raise ValueError(f"Unsafe {key}: {value!r}") + elif key == "compose_dir": + # 禁止路徑穿越 + if ".." in value or not _RE_SAFE_DIR.match(value): + raise ValueError(f"Unsafe compose_dir: {value!r}") + elif key == "domain": + if not _RE_SAFE_DOMAIN.match(value): + raise ValueError(f"Unsafe domain: {value!r}") + # tail / port / lines 由呼叫方 int() 轉換,不需字串白名單 + return value + # 群組 A(只讀) GROUP_A_TOOLS = { "ssh_get_top_processes", @@ -399,6 +426,7 @@ class SSHProvider(MCPToolProvider): } def _build_command(self, tool_name: str, params: dict) -> str: + # 所有接受用戶字串的工具,必須先通過 _validate_param() 白名單驗證 if tool_name == "ssh_get_top_processes": return "ps aux --sort=-%cpu | head -15" @@ -409,44 +437,44 @@ class SSHProvider(MCPToolProvider): return "free -h" if tool_name == "ssh_get_container_logs": - name = params["container_name"] - tail = int(params.get("tail", 50)) + name = _validate_param("container_name", params["container_name"]) + tail = max(1, min(int(params.get("tail", 50)), 1000)) # 上限 1000 行 return f"docker logs {name} --tail {tail} 2>&1" if tool_name == "ssh_get_container_status": - name = params["filter_name"] + name = _validate_param("filter_name", params["filter_name"]) return f"docker ps -a --filter name={name}" if tool_name == "ssh_get_service_status": - svc = params["service"] + svc = _validate_param("service", params["service"]) return f"systemctl status {svc} --no-pager -l 2>&1 | head -30" if tool_name == "ssh_check_port": - port = int(params["port"]) + port = max(1, min(int(params["port"]), 65535)) return f"ss -tlnp | grep :{port}" if tool_name == "ssh_get_nginx_error_log": - lines = int(params.get("lines", 50)) + lines = max(1, min(int(params.get("lines", 50)), 500)) # 上限 500 行 return f"tail -n {lines} /var/log/nginx/error.log 2>/dev/null || echo 'Log not found'" if tool_name == "ssh_get_swap_info": return "swapon --show; echo '---'; free -h" if tool_name == "ssh_docker_restart": - name = params["container_name"] + name = _validate_param("container_name", params["container_name"]) return f"docker restart {name}" if tool_name == "ssh_docker_compose_restart": - compose_dir = params["compose_dir"] - service = params["service"] + compose_dir = _validate_param("compose_dir", params["compose_dir"]) + service = _validate_param("service", params["service"]) return f"cd {compose_dir} && docker compose restart {service}" if tool_name == "ssh_systemctl_restart": - svc = params["service"] + svc = _validate_param("service", params["service"]) return f"systemctl restart {svc}" if tool_name == "ssh_clear_docker_logs": - name = params["container_name"] + name = _validate_param("container_name", params["container_name"]) # 透過 docker inspect 取得 log 路徑,再截斷 return ( f"LOG_PATH=$(docker inspect --format='{{{{.LogPath}}}}' {name} 2>/dev/null) " @@ -455,7 +483,7 @@ class SSHProvider(MCPToolProvider): ) if tool_name == "ssh_renew_ssl": - domain = params["domain"] + domain = _validate_param("domain", params["domain"]) return f"/snap/bin/certbot renew --cert-name {domain} --non-interactive 2>&1" if tool_name == "ssh_reload_nginx": @@ -484,11 +512,21 @@ class SSHProvider(MCPToolProvider): "Ensure K8s Secret 'ssh-mcp-key' is mounted correctly." ) + # known_hosts: 預設 None(內網快速啟動) + # 生產強化方式:設定 SSH_MCP_KNOWN_HOSTS_FILE 指向 ssh-keyscan 產生的文件 + import os as _os + known_hosts_path = _os.environ.get("SSH_MCP_KNOWN_HOSTS_FILE", None) + if known_hosts_path is None: + logger.warning( + "ssh_mcp.known_hosts_disabled", + note="Set SSH_MCP_KNOWN_HOSTS_FILE env var to enable host key verification", + ) + async with asyncssh.connect( host, username=SSH_USER, client_keys=[SSH_KEY_PATH], - known_hosts=None, # 內網信任,不驗證 known_hosts + known_hosts=known_hosts_path, # None = 跳過驗證(內網),或指定文件路徑 connect_timeout=timeout, ) as conn: result = await asyncssh.run(conn, cmd, timeout=timeout, check=False) diff --git a/apps/api/src/repositories/alert_operation_log_repository.py b/apps/api/src/repositories/alert_operation_log_repository.py index 956bf7c2..540be94f 100644 --- a/apps/api/src/repositories/alert_operation_log_repository.py +++ b/apps/api/src/repositories/alert_operation_log_repository.py @@ -44,6 +44,12 @@ ALERT_EVENT_TYPES = { "BACKUP_FAILED", "APPROVAL_ESCALATED", "CHANGE_APPLIED", + # ADR-071 通知生命週期 (2026-04-11 Claude Sonnet 4.6 Asia/Taipei) + "NOTIFICATION_CLASSIFIED", + "MANUAL_FIX_RECORDED", + "KM_CONVERTED", + "PLAYBOOK_DRAFT_CREATED", + "STATE_GUARD_BLOCKED", } diff --git a/apps/api/src/services/km_conversion_service.py b/apps/api/src/services/km_conversion_service.py index 06b833eb..2bb32cd2 100644 --- a/apps/api/src/services/km_conversion_service.py +++ b/apps/api/src/services/km_conversion_service.py @@ -33,14 +33,12 @@ from src.services.knowledge_service import get_knowledge_service logger = structlog.get_logger(__name__) -# 加入 ADR-071 新 event_type(避免 validation 攔截) -ALERT_EVENT_TYPES.update({ - "KM_CONVERTED", - "NOTIFICATION_CLASSIFIED", - "MANUAL_FIX_RECORDED", - "PLAYBOOK_DRAFT_CREATED", - "STATE_GUARD_BLOCKED", -}) +# ADR-071 新 event_type 已在 migrations/adr071_notification_lifecycle.sql 的 DB enum 中定義 +# 不在此做 runtime ALERT_EVENT_TYPES.update():避免模組副作用污染全局狀態 + 測試隔離失敗 +# 若 repository 層有 ALERT_EVENT_TYPES 白名單 validation, +# 請在 alert_operation_log_repository.py 的 ALERT_EVENT_TYPES 靜態集合中加入以下值: +# "KM_CONVERTED", "NOTIFICATION_CLASSIFIED", "MANUAL_FIX_RECORDED", +# "PLAYBOOK_DRAFT_CREATED", "STATE_GUARD_BLOCKED" # 通知類型 → KM 品質等級對應 _TYPE_TO_STATUS = { @@ -78,11 +76,22 @@ class KMConversionService: 將 Incident 轉換為 KnowledgeEntry Args: - incident: Incident ORM 物件 + incident: Incident ORM 物件(呼叫前確保 signals 已 eager load) Returns: dict with km_entry_id and quality_level, or None if skipped """ + try: + return await self._convert_inner(incident) + except Exception as e: + logger.error( + "km_conversion_error", + incident_id=getattr(incident, "incident_id", "unknown"), + error=str(e), + ) + raise + + async def _convert_inner(self, incident) -> dict | None: notification_type = getattr(incident, "notification_type", None) or "TYPE-3" # TYPE-1 不轉 KM diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index d7aadbf4..6f5f118e 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -2731,13 +2731,20 @@ class TelegramGateway: success=True, ) - # 觸發 KM 轉換(重讀最新 incident) + # 觸發 KM 轉換(直接 await,避免 create_task() 在 DB session 關閉後的競爭條件) + # 重讀 incident 確保 manual_fix_steps 已寫入 incident_updated = await incident_repo.get_by_id(approval.incident_id) if incident_updated: from src.services.km_conversion_service import get_km_conversion_service km_svc = get_km_conversion_service() - import asyncio as _asyncio - _asyncio.create_task(km_svc.convert(incident_updated)) + try: + await km_svc.convert(incident_updated) + except Exception as _km_err: + logger.warning( + "km_conversion_failed", + incident_id=approval.incident_id, + error=str(_km_err), + ) # 回覆確認 await self._send_request("sendMessage", {