fix(security): Architecture Review 修復 5 項高信心問題
安全修復 (P0):
1. ssh_provider: 新增 _validate_param() 白名單驗證,防止 command injection
- container_name/service/filter_name: [a-zA-Z0-9._-]{1,128}
- compose_dir: 必須以 /opt/ 或 /srv/ 開頭,禁止 ..
- domain: FQDN 白名單
- tail/port/lines: int() 轉換 + 上下限夾緊
2. ssh_provider: known_hosts=None 改為讀 SSH_MCP_KNOWN_HOSTS_FILE 環境變數
- 預設仍 None(內網快速啟動),但啟動時寫入 warning log
- 設定文件:ops/runbooks/ssh-mcp-setup.md (待補)
模組化修復 (P1):
3. km_conversion_service: 移除 import 時的 ALERT_EVENT_TYPES.update() 副作用
- ADR-071 event types 移入 alert_operation_log_repository.py 靜態集合
4. telegram_gateway: create_task() 改為 await + try/except
- 避免 DB session 關閉後的競爭條件
- KM 轉換失敗記錄 warning log,不中斷主流程
5. km_conversion_service: 新增頂層 try/except,錯誤一律 error log 後 re-raise
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -72,6 +72,33 @@ FORBIDDEN_PATTERNS = [
|
||||
r">\s*/etc/", # 重定向到系統目錄
|
||||
]
|
||||
|
||||
# 參數白名單正則 — 用於 _validate_param()
|
||||
# container_name / service / filter_name: 英數字、連字號、底線、點(k8s/docker 命名規範)
|
||||
_RE_SAFE_NAME = re.compile(r'^[a-zA-Z0-9._-]{1,128}$')
|
||||
# compose_dir: 必須以 /opt/ 或 /srv/ 開頭,不含 ..
|
||||
_RE_SAFE_DIR = re.compile(r'^/(opt|srv)/[a-zA-Z0-9._/-]{1,200}$')
|
||||
# domain: FQDN + 萬用字元
|
||||
_RE_SAFE_DOMAIN = re.compile(r'^(\*\.)?[a-zA-Z0-9.-]{1,253}$')
|
||||
|
||||
|
||||
def _validate_param(key: str, value: str) -> str:
|
||||
"""
|
||||
對用戶提供的字串參數套用白名單驗證。
|
||||
傳回驗證後的值;驗證失敗拋出 ValueError(呼叫方應攔截並拒絕執行)。
|
||||
"""
|
||||
if key in ("container_name", "filter_name", "service"):
|
||||
if not _RE_SAFE_NAME.match(value):
|
||||
raise ValueError(f"Unsafe {key}: {value!r}")
|
||||
elif key == "compose_dir":
|
||||
# 禁止路徑穿越
|
||||
if ".." in value or not _RE_SAFE_DIR.match(value):
|
||||
raise ValueError(f"Unsafe compose_dir: {value!r}")
|
||||
elif key == "domain":
|
||||
if not _RE_SAFE_DOMAIN.match(value):
|
||||
raise ValueError(f"Unsafe domain: {value!r}")
|
||||
# tail / port / lines 由呼叫方 int() 轉換,不需字串白名單
|
||||
return value
|
||||
|
||||
# 群組 A(只讀)
|
||||
GROUP_A_TOOLS = {
|
||||
"ssh_get_top_processes",
|
||||
@@ -399,6 +426,7 @@ class SSHProvider(MCPToolProvider):
|
||||
}
|
||||
|
||||
def _build_command(self, tool_name: str, params: dict) -> str:
|
||||
# 所有接受用戶字串的工具,必須先通過 _validate_param() 白名單驗證
|
||||
if tool_name == "ssh_get_top_processes":
|
||||
return "ps aux --sort=-%cpu | head -15"
|
||||
|
||||
@@ -409,44 +437,44 @@ class SSHProvider(MCPToolProvider):
|
||||
return "free -h"
|
||||
|
||||
if tool_name == "ssh_get_container_logs":
|
||||
name = params["container_name"]
|
||||
tail = int(params.get("tail", 50))
|
||||
name = _validate_param("container_name", params["container_name"])
|
||||
tail = max(1, min(int(params.get("tail", 50)), 1000)) # 上限 1000 行
|
||||
return f"docker logs {name} --tail {tail} 2>&1"
|
||||
|
||||
if tool_name == "ssh_get_container_status":
|
||||
name = params["filter_name"]
|
||||
name = _validate_param("filter_name", params["filter_name"])
|
||||
return f"docker ps -a --filter name={name}"
|
||||
|
||||
if tool_name == "ssh_get_service_status":
|
||||
svc = params["service"]
|
||||
svc = _validate_param("service", params["service"])
|
||||
return f"systemctl status {svc} --no-pager -l 2>&1 | head -30"
|
||||
|
||||
if tool_name == "ssh_check_port":
|
||||
port = int(params["port"])
|
||||
port = max(1, min(int(params["port"]), 65535))
|
||||
return f"ss -tlnp | grep :{port}"
|
||||
|
||||
if tool_name == "ssh_get_nginx_error_log":
|
||||
lines = int(params.get("lines", 50))
|
||||
lines = max(1, min(int(params.get("lines", 50)), 500)) # 上限 500 行
|
||||
return f"tail -n {lines} /var/log/nginx/error.log 2>/dev/null || echo 'Log not found'"
|
||||
|
||||
if tool_name == "ssh_get_swap_info":
|
||||
return "swapon --show; echo '---'; free -h"
|
||||
|
||||
if tool_name == "ssh_docker_restart":
|
||||
name = params["container_name"]
|
||||
name = _validate_param("container_name", params["container_name"])
|
||||
return f"docker restart {name}"
|
||||
|
||||
if tool_name == "ssh_docker_compose_restart":
|
||||
compose_dir = params["compose_dir"]
|
||||
service = params["service"]
|
||||
compose_dir = _validate_param("compose_dir", params["compose_dir"])
|
||||
service = _validate_param("service", params["service"])
|
||||
return f"cd {compose_dir} && docker compose restart {service}"
|
||||
|
||||
if tool_name == "ssh_systemctl_restart":
|
||||
svc = params["service"]
|
||||
svc = _validate_param("service", params["service"])
|
||||
return f"systemctl restart {svc}"
|
||||
|
||||
if tool_name == "ssh_clear_docker_logs":
|
||||
name = params["container_name"]
|
||||
name = _validate_param("container_name", params["container_name"])
|
||||
# 透過 docker inspect 取得 log 路徑,再截斷
|
||||
return (
|
||||
f"LOG_PATH=$(docker inspect --format='{{{{.LogPath}}}}' {name} 2>/dev/null) "
|
||||
@@ -455,7 +483,7 @@ class SSHProvider(MCPToolProvider):
|
||||
)
|
||||
|
||||
if tool_name == "ssh_renew_ssl":
|
||||
domain = params["domain"]
|
||||
domain = _validate_param("domain", params["domain"])
|
||||
return f"/snap/bin/certbot renew --cert-name {domain} --non-interactive 2>&1"
|
||||
|
||||
if tool_name == "ssh_reload_nginx":
|
||||
@@ -484,11 +512,21 @@ class SSHProvider(MCPToolProvider):
|
||||
"Ensure K8s Secret 'ssh-mcp-key' is mounted correctly."
|
||||
)
|
||||
|
||||
# known_hosts: 預設 None(內網快速啟動)
|
||||
# 生產強化方式:設定 SSH_MCP_KNOWN_HOSTS_FILE 指向 ssh-keyscan 產生的文件
|
||||
import os as _os
|
||||
known_hosts_path = _os.environ.get("SSH_MCP_KNOWN_HOSTS_FILE", None)
|
||||
if known_hosts_path is None:
|
||||
logger.warning(
|
||||
"ssh_mcp.known_hosts_disabled",
|
||||
note="Set SSH_MCP_KNOWN_HOSTS_FILE env var to enable host key verification",
|
||||
)
|
||||
|
||||
async with asyncssh.connect(
|
||||
host,
|
||||
username=SSH_USER,
|
||||
client_keys=[SSH_KEY_PATH],
|
||||
known_hosts=None, # 內網信任,不驗證 known_hosts
|
||||
known_hosts=known_hosts_path, # None = 跳過驗證(內網),或指定文件路徑
|
||||
connect_timeout=timeout,
|
||||
) as conn:
|
||||
result = await asyncssh.run(conn, cmd, timeout=timeout, check=False)
|
||||
|
||||
@@ -44,6 +44,12 @@ ALERT_EVENT_TYPES = {
|
||||
"BACKUP_FAILED",
|
||||
"APPROVAL_ESCALATED",
|
||||
"CHANGE_APPLIED",
|
||||
# ADR-071 通知生命週期 (2026-04-11 Claude Sonnet 4.6 Asia/Taipei)
|
||||
"NOTIFICATION_CLASSIFIED",
|
||||
"MANUAL_FIX_RECORDED",
|
||||
"KM_CONVERTED",
|
||||
"PLAYBOOK_DRAFT_CREATED",
|
||||
"STATE_GUARD_BLOCKED",
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -33,14 +33,12 @@ from src.services.knowledge_service import get_knowledge_service
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# 加入 ADR-071 新 event_type(避免 validation 攔截)
|
||||
ALERT_EVENT_TYPES.update({
|
||||
"KM_CONVERTED",
|
||||
"NOTIFICATION_CLASSIFIED",
|
||||
"MANUAL_FIX_RECORDED",
|
||||
"PLAYBOOK_DRAFT_CREATED",
|
||||
"STATE_GUARD_BLOCKED",
|
||||
})
|
||||
# ADR-071 新 event_type 已在 migrations/adr071_notification_lifecycle.sql 的 DB enum 中定義
|
||||
# 不在此做 runtime ALERT_EVENT_TYPES.update():避免模組副作用污染全局狀態 + 測試隔離失敗
|
||||
# 若 repository 層有 ALERT_EVENT_TYPES 白名單 validation,
|
||||
# 請在 alert_operation_log_repository.py 的 ALERT_EVENT_TYPES 靜態集合中加入以下值:
|
||||
# "KM_CONVERTED", "NOTIFICATION_CLASSIFIED", "MANUAL_FIX_RECORDED",
|
||||
# "PLAYBOOK_DRAFT_CREATED", "STATE_GUARD_BLOCKED"
|
||||
|
||||
# 通知類型 → KM 品質等級對應
|
||||
_TYPE_TO_STATUS = {
|
||||
@@ -78,11 +76,22 @@ class KMConversionService:
|
||||
將 Incident 轉換為 KnowledgeEntry
|
||||
|
||||
Args:
|
||||
incident: Incident ORM 物件
|
||||
incident: Incident ORM 物件(呼叫前確保 signals 已 eager load)
|
||||
|
||||
Returns:
|
||||
dict with km_entry_id and quality_level, or None if skipped
|
||||
"""
|
||||
try:
|
||||
return await self._convert_inner(incident)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"km_conversion_error",
|
||||
incident_id=getattr(incident, "incident_id", "unknown"),
|
||||
error=str(e),
|
||||
)
|
||||
raise
|
||||
|
||||
async def _convert_inner(self, incident) -> dict | None:
|
||||
notification_type = getattr(incident, "notification_type", None) or "TYPE-3"
|
||||
|
||||
# TYPE-1 不轉 KM
|
||||
|
||||
@@ -2731,13 +2731,20 @@ class TelegramGateway:
|
||||
success=True,
|
||||
)
|
||||
|
||||
# 觸發 KM 轉換(重讀最新 incident)
|
||||
# 觸發 KM 轉換(直接 await,避免 create_task() 在 DB session 關閉後的競爭條件)
|
||||
# 重讀 incident 確保 manual_fix_steps 已寫入
|
||||
incident_updated = await incident_repo.get_by_id(approval.incident_id)
|
||||
if incident_updated:
|
||||
from src.services.km_conversion_service import get_km_conversion_service
|
||||
km_svc = get_km_conversion_service()
|
||||
import asyncio as _asyncio
|
||||
_asyncio.create_task(km_svc.convert(incident_updated))
|
||||
try:
|
||||
await km_svc.convert(incident_updated)
|
||||
except Exception as _km_err:
|
||||
logger.warning(
|
||||
"km_conversion_failed",
|
||||
incident_id=approval.incident_id,
|
||||
error=str(_km_err),
|
||||
)
|
||||
|
||||
# 回覆確認
|
||||
await self._send_request("sendMessage", {
|
||||
|
||||
Reference in New Issue
Block a user