fix(security): Architecture Review 修復 5 項高信心問題

安全修復 (P0):
1. ssh_provider: 新增 _validate_param() 白名單驗證,防止 command injection
   - container_name/service/filter_name: [a-zA-Z0-9._-]{1,128}
   - compose_dir: 必須以 /opt/ 或 /srv/ 開頭,禁止 ..
   - domain: FQDN 白名單
   - tail/port/lines: int() 轉換 + 上下限夾緊
2. ssh_provider: known_hosts=None 改為讀 SSH_MCP_KNOWN_HOSTS_FILE 環境變數
   - 預設仍 None(內網快速啟動),但啟動時寫入 warning log
   - 設定文件:ops/runbooks/ssh-mcp-setup.md (待補)

模組化修復 (P1):
3. km_conversion_service: 移除 import 時的 ALERT_EVENT_TYPES.update() 副作用
   - ADR-071 event types 移入 alert_operation_log_repository.py 靜態集合
4. telegram_gateway: create_task() 改為 await + try/except
   - 避免 DB session 關閉後的競爭條件
   - KM 轉換失敗記錄 warning log,不中斷主流程
5. km_conversion_service: 新增頂層 try/except,錯誤一律 error log 後 re-raise

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-11 02:50:26 +08:00
parent 0139aa79e7
commit 2af4dffcc6
4 changed files with 85 additions and 25 deletions

View File

@@ -72,6 +72,33 @@ FORBIDDEN_PATTERNS = [
r">\s*/etc/", # 重定向到系統目錄
]
# 參數白名單正則 — 用於 _validate_param()
# container_name / service / filter_name: 英數字、連字號、底線、點k8s/docker 命名規範)
_RE_SAFE_NAME = re.compile(r'^[a-zA-Z0-9._-]{1,128}$')
# compose_dir: 必須以 /opt/ 或 /srv/ 開頭,不含 ..
_RE_SAFE_DIR = re.compile(r'^/(opt|srv)/[a-zA-Z0-9._/-]{1,200}$')
# domain: FQDN + 萬用字元
_RE_SAFE_DOMAIN = re.compile(r'^(\*\.)?[a-zA-Z0-9.-]{1,253}$')
def _validate_param(key: str, value: str) -> str:
"""
對用戶提供的字串參數套用白名單驗證。
傳回驗證後的值;驗證失敗拋出 ValueError呼叫方應攔截並拒絕執行
"""
if key in ("container_name", "filter_name", "service"):
if not _RE_SAFE_NAME.match(value):
raise ValueError(f"Unsafe {key}: {value!r}")
elif key == "compose_dir":
# 禁止路徑穿越
if ".." in value or not _RE_SAFE_DIR.match(value):
raise ValueError(f"Unsafe compose_dir: {value!r}")
elif key == "domain":
if not _RE_SAFE_DOMAIN.match(value):
raise ValueError(f"Unsafe domain: {value!r}")
# tail / port / lines 由呼叫方 int() 轉換,不需字串白名單
return value
# 群組 A只讀
GROUP_A_TOOLS = {
"ssh_get_top_processes",
@@ -399,6 +426,7 @@ class SSHProvider(MCPToolProvider):
}
def _build_command(self, tool_name: str, params: dict) -> str:
# 所有接受用戶字串的工具,必須先通過 _validate_param() 白名單驗證
if tool_name == "ssh_get_top_processes":
return "ps aux --sort=-%cpu | head -15"
@@ -409,44 +437,44 @@ class SSHProvider(MCPToolProvider):
return "free -h"
if tool_name == "ssh_get_container_logs":
name = params["container_name"]
tail = int(params.get("tail", 50))
name = _validate_param("container_name", params["container_name"])
tail = max(1, min(int(params.get("tail", 50)), 1000)) # 上限 1000 行
return f"docker logs {name} --tail {tail} 2>&1"
if tool_name == "ssh_get_container_status":
name = params["filter_name"]
name = _validate_param("filter_name", params["filter_name"])
return f"docker ps -a --filter name={name}"
if tool_name == "ssh_get_service_status":
svc = params["service"]
svc = _validate_param("service", params["service"])
return f"systemctl status {svc} --no-pager -l 2>&1 | head -30"
if tool_name == "ssh_check_port":
port = int(params["port"])
port = max(1, min(int(params["port"]), 65535))
return f"ss -tlnp | grep :{port}"
if tool_name == "ssh_get_nginx_error_log":
lines = int(params.get("lines", 50))
lines = max(1, min(int(params.get("lines", 50)), 500)) # 上限 500 行
return f"tail -n {lines} /var/log/nginx/error.log 2>/dev/null || echo 'Log not found'"
if tool_name == "ssh_get_swap_info":
return "swapon --show; echo '---'; free -h"
if tool_name == "ssh_docker_restart":
name = params["container_name"]
name = _validate_param("container_name", params["container_name"])
return f"docker restart {name}"
if tool_name == "ssh_docker_compose_restart":
compose_dir = params["compose_dir"]
service = params["service"]
compose_dir = _validate_param("compose_dir", params["compose_dir"])
service = _validate_param("service", params["service"])
return f"cd {compose_dir} && docker compose restart {service}"
if tool_name == "ssh_systemctl_restart":
svc = params["service"]
svc = _validate_param("service", params["service"])
return f"systemctl restart {svc}"
if tool_name == "ssh_clear_docker_logs":
name = params["container_name"]
name = _validate_param("container_name", params["container_name"])
# 透過 docker inspect 取得 log 路徑,再截斷
return (
f"LOG_PATH=$(docker inspect --format='{{{{.LogPath}}}}' {name} 2>/dev/null) "
@@ -455,7 +483,7 @@ class SSHProvider(MCPToolProvider):
)
if tool_name == "ssh_renew_ssl":
domain = params["domain"]
domain = _validate_param("domain", params["domain"])
return f"/snap/bin/certbot renew --cert-name {domain} --non-interactive 2>&1"
if tool_name == "ssh_reload_nginx":
@@ -484,11 +512,21 @@ class SSHProvider(MCPToolProvider):
"Ensure K8s Secret 'ssh-mcp-key' is mounted correctly."
)
# known_hosts: 預設 None內網快速啟動
# 生產強化方式:設定 SSH_MCP_KNOWN_HOSTS_FILE 指向 ssh-keyscan 產生的文件
import os as _os
known_hosts_path = _os.environ.get("SSH_MCP_KNOWN_HOSTS_FILE", None)
if known_hosts_path is None:
logger.warning(
"ssh_mcp.known_hosts_disabled",
note="Set SSH_MCP_KNOWN_HOSTS_FILE env var to enable host key verification",
)
async with asyncssh.connect(
host,
username=SSH_USER,
client_keys=[SSH_KEY_PATH],
known_hosts=None, # 內網信任,不驗證 known_hosts
known_hosts=known_hosts_path, # None = 跳過驗證(內網),或指定文件路徑
connect_timeout=timeout,
) as conn:
result = await asyncssh.run(conn, cmd, timeout=timeout, check=False)

View File

@@ -44,6 +44,12 @@ ALERT_EVENT_TYPES = {
"BACKUP_FAILED",
"APPROVAL_ESCALATED",
"CHANGE_APPLIED",
# ADR-071 通知生命週期 (2026-04-11 Claude Sonnet 4.6 Asia/Taipei)
"NOTIFICATION_CLASSIFIED",
"MANUAL_FIX_RECORDED",
"KM_CONVERTED",
"PLAYBOOK_DRAFT_CREATED",
"STATE_GUARD_BLOCKED",
}

View File

@@ -33,14 +33,12 @@ from src.services.knowledge_service import get_knowledge_service
logger = structlog.get_logger(__name__)
# 加入 ADR-071 新 event_type(避免 validation 攔截)
ALERT_EVENT_TYPES.update({
"KM_CONVERTED",
"NOTIFICATION_CLASSIFIED",
"MANUAL_FIX_RECORDED",
"PLAYBOOK_DRAFT_CREATED",
"STATE_GUARD_BLOCKED",
})
# ADR-071 新 event_type 已在 migrations/adr071_notification_lifecycle.sql 的 DB enum 中定義
# 不在此做 runtime ALERT_EVENT_TYPES.update():避免模組副作用污染全局狀態 + 測試隔離失敗
# 若 repository 層有 ALERT_EVENT_TYPES 白名單 validation
# 請在 alert_operation_log_repository.py 的 ALERT_EVENT_TYPES 靜態集合中加入以下值:
# "KM_CONVERTED", "NOTIFICATION_CLASSIFIED", "MANUAL_FIX_RECORDED",
# "PLAYBOOK_DRAFT_CREATED", "STATE_GUARD_BLOCKED"
# 通知類型 → KM 品質等級對應
_TYPE_TO_STATUS = {
@@ -78,11 +76,22 @@ class KMConversionService:
將 Incident 轉換為 KnowledgeEntry
Args:
incident: Incident ORM 物件
incident: Incident ORM 物件(呼叫前確保 signals 已 eager load
Returns:
dict with km_entry_id and quality_level, or None if skipped
"""
try:
return await self._convert_inner(incident)
except Exception as e:
logger.error(
"km_conversion_error",
incident_id=getattr(incident, "incident_id", "unknown"),
error=str(e),
)
raise
async def _convert_inner(self, incident) -> dict | None:
notification_type = getattr(incident, "notification_type", None) or "TYPE-3"
# TYPE-1 不轉 KM

View File

@@ -2731,13 +2731,20 @@ class TelegramGateway:
success=True,
)
# 觸發 KM 轉換(重讀最新 incident
# 觸發 KM 轉換(直接 await避免 create_task() 在 DB session 關閉後的競爭條件
# 重讀 incident 確保 manual_fix_steps 已寫入
incident_updated = await incident_repo.get_by_id(approval.incident_id)
if incident_updated:
from src.services.km_conversion_service import get_km_conversion_service
km_svc = get_km_conversion_service()
import asyncio as _asyncio
_asyncio.create_task(km_svc.convert(incident_updated))
try:
await km_svc.convert(incident_updated)
except Exception as _km_err:
logger.warning(
"km_conversion_failed",
incident_id=approval.incident_id,
error=str(_km_err),
)
# 回覆確認
await self._send_request("sendMessage", {