feat(Phase 5 Sprint 5.0): Callback Dispatcher spec + skeleton + 22 tests
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
The Commander approved all Phase 5 sprints. Sprint 5.0 deliverables:
1. callback_action_spec.yaml (24 actions)
   - 10 read-only actions (2-part info callback, no side effects): check_process, check_port,
     check_log_*, check_health, check_pod_logs, describe_pod, open_signoz,
     open_flywheel
   - 10 write actions (4-part nonce callback, with side effects): k8s_restart/scale_up/scale_down/rollback,
     host_restart_service/clear_log, docker_restart, minio_restart,
     reload_nginx, renew_cert
   - 4 secops actions (CRITICAL with Multi-Sig, except secops_authorize): secops_isolate/block_ip/evict/authorize
2. callback_dispatcher.py
   - Registry pattern (lru_cache): get_action_spec / list_actions_for_category
   - Template variable substitution: {incident_id} / {labels.xxx} / {signals[0].xxx}
   - dispatch_action() skeleton (MCP wiring lands in Sprint 5.2+)
   - _format_reply: four formats (text/code/truncated/url)
3. test_callback_dispatcher.py (all 22 tests pass)
   - Registry loading correctness
   - Category filtering
   - Template resolution (including nested list indexes)
   - dispatch stub returns the expected spec hint
Next step, Sprint 5.1: wire in the MCP registry and integrate with the telegram callback_handler
The underlying MCP capabilities already exist (k8s 10+ tools, ssh 15 tools)
Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
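The two callback shapes named above (2-part info, 4-part nonce) can be told apart by splitting on the colon. The following is a minimal sketch rather than the project's parser: `ParsedCallback` and `parse_callback_data` are hypothetical names, and it assumes that the incident id contains no colon and that the `ts` field is an integer.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ParsedCallback:
    """Hypothetical container for a decoded callback_data string."""
    action: str
    incident_id: str
    timestamp: Optional[int] = None  # only present on 4-part nonce callbacks
    nonce: Optional[str] = None      # only present on 4-part nonce callbacks


def parse_callback_data(data: str) -> ParsedCallback:
    """Split callback_data into its 2-part (info) or 4-part (nonce) form.

    Assumes "action:incident_id" or "action:incident_id:ts:rand",
    with no colon inside any field.
    """
    parts = data.split(":")
    if len(parts) == 2:
        action, incident_id = parts
        return ParsedCallback(action, incident_id)
    if len(parts) == 4:
        action, incident_id, ts, rand = parts
        return ParsedCallback(action, incident_id, int(ts), rand)
    raise ValueError(f"expected 2 or 4 colon-separated parts, got {len(parts)}")


info = parse_callback_data("check_process:INC-123")
write = parse_callback_data("k8s_restart:INC-123:1776124800:ab12")
```

Telegram limits callback_data to 64 bytes, so the action name, incident id, timestamp and random suffix all have to fit in that budget.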
430
apps/api/src/services/callback_action_spec.yaml
Normal file
@@ -0,0 +1,430 @@
# Telegram category-button action spec registry
# ============================================
# Phase 5 Sprint 5.0, 2026-04-14, Claude Sonnet 4.6
# Related: docs/superpowers/plans/2026-04-14-PHASE-5-category-buttons-completion.md
#
# Format:
#   actions:
#     <action_name>:
#       label: <text shown in the UI>
#       emoji: <button emoji>
#       risk: low | medium | high | critical
#       callback_format: info | nonce   # info = 2-part (read-only), nonce = 4-part (write)
#       category: <alert_category>      # bound to the output of classify_alert_early
#       mcp:
#         provider: k8s | ssh | prometheus | signoz | database | internal
#         tool: <MCP tool name>
#         params:                       # parameter templates (supports {incident_id}/{labels.xxx}/{signals[0].xxx}/{callback.user_id})
#           <key>: <template>
#       reply_format: text | code | url | truncated   # how the result is rendered in the reply
#       requires_multi_sig: false       # set to true for secops actions
#       timeout_sec: 10                 # MCP call timeout
#       description: <description>

version: "1.0"
last_updated: "2026-04-14"

actions:
  # ==========================================================================
  # Read-only buttons (no side effects, 2-part info callback)
  # ==========================================================================

  check_process:
    label: "查程序"  # "Check processes"
    emoji: "🔍"
    risk: low
    callback_format: info
    category: host_resource
    mcp:
      provider: ssh
      tool: ssh_get_top_processes
      params:
        host: "{labels.instance}"
        limit: 10
    reply_format: code
    timeout_sec: 8
    description: "List the 10 processes with the highest CPU usage on the host"

  check_log_nginx:
    label: "查 Nginx Log"  # "Check Nginx log"
    emoji: "📋"
    risk: low
    callback_format: info
    category: network
    mcp:
      provider: ssh
      tool: ssh_get_nginx_error_log
      params:
        host: "{labels.instance}"
        lines: 50
    reply_format: truncated
    timeout_sec: 10
    description: "Last 50 lines of the Nginx error log"

  check_log_container:
    label: "查容器 Log"  # "Check container log"
    emoji: "📋"
    risk: low
    callback_format: info
    category: devops_tool
    mcp:
      provider: ssh
      tool: ssh_get_container_logs
      params:
        host: "{labels.instance}"
        container: "{labels.container}"
        lines: 50
    reply_format: truncated
    timeout_sec: 10
    description: "Last 50 lines of the container's stdout/stderr log"

  check_log_minio:
    label: "查 MinIO Log"  # "Check MinIO log"
    emoji: "📋"
    risk: low
    callback_format: info
    category: storage
    mcp:
      provider: ssh
      tool: ssh_get_container_logs
      params:
        host: "192.168.0.188"
        container: "minio"
        lines: 50
    reply_format: truncated
    timeout_sec: 10
    description: "Last 50 lines of the MinIO container log"

  check_port:
    label: "查 Port"  # "Check port"
    emoji: "🔌"
    risk: low
    callback_format: info
    category: network
    mcp:
      provider: ssh
      tool: ssh_check_port
      params:
        host: "{labels.instance}"
        port: "{labels.port}"
    reply_format: text
    timeout_sec: 5
    description: "Check whether the given port on the host is in LISTEN state"

  check_health:
    label: "查健康狀態"  # "Check health status"
    emoji: "🔍"
    risk: low
    callback_format: info
    category: external_site
    mcp:
      provider: prometheus
      tool: prometheus_query
      params:
        # Single braces around the selector: the dispatcher substitutes
        # placeholders with a regex, not str.format, so "{{" would reach
        # Prometheus unchanged.
        query: 'probe_success{instance="{labels.instance}"}'
    reply_format: text
    timeout_sec: 5
    description: "Blackbox probe health status"

  check_pod_logs:
    label: "查 Pod Log"  # "Check Pod log"
    emoji: "📋"
    risk: low
    callback_format: info
    category: kubernetes
    mcp:
      provider: k8s
      tool: k8s_get_pod_logs
      params:
        namespace: "{labels.namespace}"
        pod: "{labels.pod}"
        tail_lines: 50
    reply_format: truncated
    timeout_sec: 8
    description: "Last 50 lines of the Pod log"

  describe_pod:
    label: "詳細 Pod"  # "Pod details"
    emoji: "📜"
    risk: low
    callback_format: info
    category: kubernetes
    mcp:
      provider: k8s
      tool: k8s_describe_pod
      params:
        namespace: "{labels.namespace}"
        pod: "{labels.pod}"
    reply_format: truncated
    timeout_sec: 5
    description: "Full output of kubectl describe pod"

  open_signoz:
    label: "查 SignOz"  # "Check SignOz"
    emoji: "🔍"
    risk: low
    callback_format: info
    category: business
    mcp:
      provider: internal
      tool: build_signoz_url
      params:
        service: "{labels.service}"
    reply_format: url
    timeout_sec: 1
    description: "Return a SignOz deeplink"

  open_flywheel:
    label: "飛輪面板"  # "Flywheel dashboard"
    emoji: "📊"
    risk: low
    callback_format: info
    category: flywheel_health
    mcp:
      provider: internal
      tool: build_flywheel_url
      params: {}
    reply_format: url
    timeout_sec: 1
    description: "Return the flywheel dashboard URL"

  # ==========================================================================
  # Write buttons (side effects, 4-part nonce callback)
  # ==========================================================================

  k8s_restart:
    label: "重啟"  # "Restart"
    emoji: "🔄"
    risk: medium
    callback_format: nonce
    category: kubernetes
    mcp:
      provider: k8s
      tool: kubectl_restart
      params:
        namespace: "{labels.namespace}"
        deployment: "{labels.deployment}"
    reply_format: text
    timeout_sec: 30
    description: "kubectl rollout restart deployment"

  k8s_scale_up:
    label: "擴容 +1"  # "Scale up +1"
    emoji: "📈"
    risk: medium
    callback_format: nonce
    category: kubernetes
    mcp:
      provider: k8s
      tool: kubectl_scale
      params:
        namespace: "{labels.namespace}"
        deployment: "{labels.deployment}"
        replicas_delta: 1
    reply_format: text
    timeout_sec: 10
    description: "Replica count +1 (monotonic increase)"

  k8s_scale_down:
    label: "縮容 -1"  # "Scale down -1"
    emoji: "📉"
    risk: medium
    callback_format: nonce
    category: kubernetes
    mcp:
      provider: k8s
      tool: kubectl_scale
      params:
        namespace: "{labels.namespace}"
        deployment: "{labels.deployment}"
        replicas_delta: -1
        min_replicas: 1  # never scale to 0
    reply_format: text
    timeout_sec: 10
    description: "Replica count -1 (never down to 0)"

  k8s_rollback:
    label: "回滾"  # "Roll back"
    emoji: "⏪"
    risk: high
    callback_format: nonce
    category: kubernetes
    mcp:
      provider: k8s
      tool: kubectl_rollout_undo  # this MCP tool must be added in Sprint 5.3
      params:
        namespace: "{labels.namespace}"
        deployment: "{labels.deployment}"
    reply_format: text
    timeout_sec: 60
    description: "kubectl rollout undo (back to the previous revision)"

  host_restart_service:
    label: "重啟服務"  # "Restart service"
    emoji: "🔄"
    risk: high
    callback_format: nonce
    category: host_resource
    mcp:
      provider: ssh
      tool: ssh_systemctl_restart
      params:
        host: "{labels.instance}"
        service: "{labels.service}"
    reply_format: text
    timeout_sec: 30
    description: "systemctl restart of a service (host level)"

  host_clear_log:
    label: "清 Log"  # "Clear log"
    emoji: "🗑"
    risk: low
    callback_format: nonce
    category: host_resource
    mcp:
      provider: ssh
      tool: ssh_clear_docker_logs
      params:
        host: "{labels.instance}"
        container: "{labels.container}"
    reply_format: text
    timeout_sec: 10
    description: "Truncate the container log file"

  docker_restart:
    label: "重啟容器"  # "Restart container"
    emoji: "🔄"
    risk: high
    callback_format: nonce
    category: devops_tool
    mcp:
      provider: ssh
      tool: ssh_docker_restart
      params:
        host: "{labels.instance}"
        container: "{labels.container}"
    reply_format: text
    timeout_sec: 30
    description: "docker restart <container>"

  minio_restart:
    label: "重啟 MinIO"  # "Restart MinIO"
    emoji: "🔄"
    risk: high
    callback_format: nonce
    category: storage
    mcp:
      provider: ssh
      tool: ssh_docker_restart
      params:
        host: "192.168.0.188"
        container: "minio"
    reply_format: text
    timeout_sec: 30
    description: "Restart the MinIO Docker container"

  reload_nginx:
    label: "重載 Nginx"  # "Reload Nginx"
    emoji: "🔄"
    risk: low
    callback_format: nonce
    category: network
    mcp:
      provider: ssh
      tool: ssh_reload_nginx
      params:
        host: "{labels.instance}"
    reply_format: text
    timeout_sec: 10
    description: "nginx -s reload (no downtime)"

  renew_cert:
    label: "更新憑證"  # "Renew certificate"
    emoji: "🔐"
    risk: medium
    callback_format: nonce
    category: ssl_cert
    mcp:
      provider: ssh
      tool: ssh_renew_ssl
      params:
        host: "{labels.instance}"
        domain: "{labels.domain}"
    reply_format: text
    timeout_sec: 60
    description: "certbot renew"

  # ==========================================================================
  # Security buttons (CRITICAL, Multi-Sig required)
  # ==========================================================================

  secops_isolate:
    label: "隔離資源"  # "Isolate resource"
    emoji: "🚫"
    risk: critical
    callback_format: nonce
    category: secops
    requires_multi_sig: true
    mcp:
      provider: k8s
      tool: kubectl_exec  # NetworkPolicy apply to be implemented in Sprint 5.4
      params:
        namespace: "{labels.namespace}"
        command: "apply_network_policy_isolate"
        target_pod: "{labels.pod}"
    reply_format: text
    timeout_sec: 30
    description: "Apply a NetworkPolicy that blocks all traffic into the Pod"

  secops_block_ip:
    label: "封鎖來源 IP"  # "Block source IP"
    emoji: "⛔"
    risk: critical
    callback_format: nonce
    category: secops
    requires_multi_sig: true
    mcp:
      provider: ssh
      tool: ssh_systemctl_restart  # needs extending to add the iptables rule
      params:
        host: "{labels.instance}"
        service: "iptables-block"
        source_ip: "{labels.attacker_ip}"
    reply_format: text
    timeout_sec: 30
    description: "iptables DROP for the source IP"

  secops_evict:
    label: "強制驅逐 Pod"  # "Force-evict Pod"
    emoji: "🔄"
    risk: critical
    callback_format: nonce
    category: secops
    requires_multi_sig: true
    mcp:
      provider: k8s
      tool: kubectl_delete
      params:
        namespace: "{labels.namespace}"
        resource_type: pod
        name: "{labels.pod}"
    reply_format: text
    timeout_sec: 30
    description: "kubectl delete pod (recreated automatically)"

  secops_authorize:
    label: "確認授權"  # "Confirm authorization"
    emoji: "✅"
    risk: high
    callback_format: nonce
    category: secops
    requires_multi_sig: false  # confirming authorization does not need multi-sig
    mcp:
      provider: internal
      tool: record_authorization
      params:
        user_id: "{callback.user_id}"
        source: "{labels.instance}"
    reply_format: text
    timeout_sec: 5
    description: "Record the authorization; the same source raises no further alerts for 24h"
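A note on brace handling in `params` templates: the dispatcher in this commit substitutes placeholders with the regex `\{([a-zA-Z0-9_.\[\]]+)\}`, not with `str.format`, so doubled braces are not unescaped. The sketch below reuses that pattern to show how single and doubled braces around a PromQL selector behave; `substitute` and its flat `values` dict are simplifications for illustration and stand in for the nested lookup done by `_lookup_context`.

```python
import re

# Same placeholder pattern as _resolve_template in callback_dispatcher.py.
PLACEHOLDER = re.compile(r"\{([a-zA-Z0-9_.\[\]]+)\}")


def substitute(template: str, values: dict[str, str]) -> str:
    """Replace {key} placeholders; unknown keys are left untouched.

    Simplified: looks keys up in a flat dict instead of walking nested context.
    """
    return PLACEHOLDER.sub(lambda m: values.get(m.group(1), m.group(0)), template)


values = {"labels.instance": "10.0.0.1"}

# Single braces: only the inner placeholder matches, the selector braces survive.
single = substitute('probe_success{instance="{labels.instance}"}', values)

# Doubled braces: the placeholder is still replaced, but "{{" and "}}" stay as-is.
double = substitute('probe_success{{instance="{labels.instance}"}}', values)

print(single)
print(double)
```

The outer `{instance="` never matches because `=` and `"` are outside the character class, which is why a PromQL selector can be written with plain single braces.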
324
apps/api/src/services/callback_dispatcher.py
Normal file
@@ -0,0 +1,324 @@
"""
Telegram Callback Dispatcher: unified dispatch for category buttons
====================================================================
Phase 5 Sprint 5.0-5.1, 2026-04-14, Claude Sonnet 4.6
Related: docs/superpowers/plans/2026-04-14-PHASE-5-category-buttons-completion.md
ADR-079: completing the category buttons

Responsibilities:
1. Load the action registry from callback_action_spec.yaml
2. Receive Telegram callback_data (action:incident_id or action:id:ts:rand)
3. Verify the nonce (write buttons) or let info callbacks through (read-only buttons)
4. Call the MCP tool named by the spec
5. Reply with the result on the original alert card (reply_to_message_id)

Design principles:
- Registry pattern: adding a button takes one YAML entry and no change to dispatcher code
- Template variables: {incident_id} / {labels.xxx} / {signals[0].xxx} / {callback.user_id}
- Every action is audit-logged (write actions also log nonce verification)
- reply_to targets the original alert's message_id (read from Redis key tg_msg:{incident_id})

Follows the "no mock tests" rule: pure logic plus MCP dispatch, tested against the real registry.
"""

from __future__ import annotations

import time
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any

import structlog
import yaml

logger = structlog.get_logger(__name__)


# =============================================================================
# Data Types
# =============================================================================


@dataclass
class ActionSpec:
    """A single action spec loaded from callback_action_spec.yaml."""

    name: str
    label: str
    emoji: str
    risk: str  # low | medium | high | critical
    callback_format: str  # info | nonce
    category: str
    mcp_provider: str  # k8s | ssh | prometheus | signoz | database | internal
    mcp_tool: str
    mcp_params: dict[str, Any]
    reply_format: str  # text | code | url | truncated
    timeout_sec: int
    description: str
    requires_multi_sig: bool = False


@dataclass
class DispatchResult:
    """Result of one dispatcher run."""

    success: bool
    action: str
    incident_id: str
    user_id: int | None
    result_text: str
    error: str | None = None
    duration_ms: float = 0.0


# =============================================================================
# Spec Registry
# =============================================================================


@lru_cache(maxsize=1)
def load_action_registry() -> dict[str, ActionSpec]:
    """
    Load callback_action_spec.yaml and cache it. The file is not reloaded
    within a process; changes take effect only after the Pod restarts.

    Returns:
        {action_name: ActionSpec}
    """
    spec_path = Path(__file__).parent / "callback_action_spec.yaml"
    if not spec_path.exists():
        logger.warning("callback_action_spec_not_found", path=str(spec_path))
        return {}

    with spec_path.open("r", encoding="utf-8") as f:
        # safe_load returns None for an empty file, so fall back to {}.
        data = yaml.safe_load(f) or {}

    registry: dict[str, ActionSpec] = {}
    for name, spec_dict in (data.get("actions") or {}).items():
        mcp = spec_dict.get("mcp", {}) or {}
        registry[name] = ActionSpec(
            name=name,
            label=spec_dict.get("label", name),
            emoji=spec_dict.get("emoji", ""),
            risk=spec_dict.get("risk", "medium"),
            callback_format=spec_dict.get("callback_format", "info"),
            category=spec_dict.get("category", ""),
            mcp_provider=mcp.get("provider", ""),
            mcp_tool=mcp.get("tool", ""),
            mcp_params=mcp.get("params") or {},
            reply_format=spec_dict.get("reply_format", "text"),
            timeout_sec=int(spec_dict.get("timeout_sec", 10)),
            description=spec_dict.get("description", ""),
            requires_multi_sig=bool(spec_dict.get("requires_multi_sig", False)),
        )
    logger.info("callback_action_registry_loaded", count=len(registry))
    return registry


def get_action_spec(action_name: str) -> ActionSpec | None:
    """Look up a single action spec by name."""
    return load_action_registry().get(action_name)


def list_actions_for_category(alert_category: str) -> list[ActionSpec]:
    """List every action available for a category (used by _build_inline_keyboard)."""
    return [
        spec for spec in load_action_registry().values()
        if spec.category == alert_category
    ]


# =============================================================================
# Template Variable Substitution
# =============================================================================


def _resolve_template(template: Any, context: dict) -> Any:
    """
    Recursively substitute template variables.

    Supported placeholders:
    - {incident_id}
    - {labels.xxx} / {labels.xxx.yyy}
    - {signals[0].xxx}
    - {callback.user_id}

    Placeholders whose key is missing from the context are left as-is.

    Example:
        template = {"host": "{labels.instance}", "lines": 50}
        context = {"labels": {"instance": "192.168.0.110"}, "incident_id": "INC-123"}
        → {"host": "192.168.0.110", "lines": 50}
    """
    if isinstance(template, dict):
        return {k: _resolve_template(v, context) for k, v in template.items()}
    if isinstance(template, list):
        return [_resolve_template(v, context) for v in template]
    if isinstance(template, str) and "{" in template:
        # Find every {xxx} placeholder and substitute it.
        import re

        def _repl(m: re.Match) -> str:
            key = m.group(1)
            val = _lookup_context(key, context)
            return str(val) if val is not None else m.group(0)

        return re.sub(r"\{([a-zA-Z0-9_.\[\]]+)\}", _repl, template)
    return template


def _lookup_context(key: str, context: dict) -> Any:
    """
    Look a key up in the context. Nested keys are supported:
    labels.instance / signals[0].alert_name. Returns None when any step is missing.
    """
    # "signals[0].alert_name" becomes ["signals", "0", "alert_name"].
    parts = key.replace("[", ".").replace("]", "").split(".")
    cur: Any = context
    for part in parts:
        if part == "":
            continue
        if isinstance(cur, dict):
            cur = cur.get(part)
        elif isinstance(cur, list):
            try:
                cur = cur[int(part)]
            except (ValueError, IndexError):
                return None
        else:
            return None
        if cur is None:
            return None
    return cur


# =============================================================================
# Dispatcher (Sprint 5.1)
# =============================================================================


async def dispatch_action(
    action_name: str,
    incident_id: str,
    user_id: int | None = None,
    labels: dict | None = None,
    extra_context: dict | None = None,
) -> DispatchResult:
    """
    Run a callback action by calling the MCP tool named in its spec.

    Args:
        action_name: action name (key in the spec registry)
        incident_id: the related incident
        user_id: Telegram user id (source of the callback)
        labels: alert labels (used for template substitution)
        extra_context: additional context (signals, etc.)

    Returns:
        DispatchResult (its result_text is used for the reply)
    """
    start = time.perf_counter()
    spec = get_action_spec(action_name)
    if not spec:
        logger.warning("dispatch_action_unknown", action=action_name)
        return DispatchResult(
            success=False,
            action=action_name,
            incident_id=incident_id,
            user_id=user_id,
            result_text="",
            error=f"Unknown action: {action_name}",
            duration_ms=(time.perf_counter() - start) * 1000,
        )

    # Build the template context.
    context = {
        "incident_id": incident_id,
        "labels": labels or {},
        "callback": {"user_id": user_id or 0},
        **(extra_context or {}),
    }
    resolved_params = _resolve_template(spec.mcp_params, context)

    # Audit log (all actions)
    logger.info(
        "dispatch_action_start",
        action=action_name,
        incident_id=incident_id,
        user_id=user_id,
        risk=spec.risk,
        provider=spec.mcp_provider,
        tool=spec.mcp_tool,
        params=resolved_params,
    )

    # MCP call (Sprint 5.1 implements only the skeleton; the real MCP dispatch
    # lands in Sprint 5.2/5.3).
    try:
        # Sprint 5.1 TODO: wire in the MCP registry
        # from src.plugins.mcp.registry import get_provider
        # provider = get_provider(spec.mcp_provider)
        # import asyncio
        # mcp_result = await asyncio.wait_for(
        #     provider.execute(spec.mcp_tool, resolved_params),
        #     timeout=spec.timeout_sec,
        # )
        # result_text = _format_reply(mcp_result, spec.reply_format, spec.label, spec.emoji)

        # Sprint 5.0 skeleton: only return a "not implemented yet" hint.
        # The user-facing text reads "Phase 5 Sprint 5.2+ in progress
        # (spec loaded: provider/tool)" followed by "Params: ...".
        result_text = (
            f"{spec.emoji} <b>{spec.label}</b>\n"
            f"⚠️ Phase 5 Sprint 5.2+ 實作中(spec 已載入:{spec.mcp_provider}/{spec.mcp_tool})\n"
            f"參數: <code>{resolved_params}</code>"
        )
        duration = (time.perf_counter() - start) * 1000
        logger.info(
            "dispatch_action_stub",
            action=action_name,
            incident_id=incident_id,
            duration_ms=round(duration, 1),
        )
        return DispatchResult(
            success=True,
            action=action_name,
            incident_id=incident_id,
            user_id=user_id,
            result_text=result_text,
            duration_ms=duration,
        )
    except Exception as e:
        duration = (time.perf_counter() - start) * 1000
        logger.error(
            "dispatch_action_failed",
            action=action_name,
            incident_id=incident_id,
            error=str(e),
            duration_ms=round(duration, 1),
        )
        return DispatchResult(
            success=False,
            action=action_name,
            incident_id=incident_id,
            user_id=user_id,
            # User-facing text: "<label> failed".
            result_text=f"{spec.emoji} {spec.label} 執行失敗",
            error=str(e),
            duration_ms=duration,
        )


def _format_reply(
    mcp_result: Any, reply_format: str, label: str, emoji: str
) -> str:
    """
    Format the reply text according to the spec.

    reply_format:
    - text: the result as plain text under the header
    - code: wrapped in <code>...</code>, capped at 800 characters
    - truncated: capped at 500 characters and wrapped in <pre>...</pre>
    - url: the URL returned as-is
    """
    header = f"{emoji} <b>{label}</b>"
    if reply_format == "url":
        return f"{header}\n{mcp_result}"
    if reply_format == "code":
        return f"{header}\n<code>{str(mcp_result)[:800]}</code>"
    if reply_format == "truncated":
        text = str(mcp_result)[:500]
        if len(str(mcp_result)) > 500:
            # User-facing marker: "(truncated)".
            text += "...\n<i>(已截斷)</i>"
        return f"{header}\n<pre>{text}</pre>"
    return f"{header}\n{mcp_result}"
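`_format_reply` interpolates raw tool output into `<code>` and `<pre>` tags. If the replies are sent with Telegram's HTML parse mode (an assumption, suggested by the `<b>`/`<code>` markup), log lines containing `<`, `>` or `&` would need escaping first. The sketch below shows one possible variant. `format_reply_escaped` is a hypothetical name, not part of this commit, and it places the truncation note outside `<pre>`.

```python
import html


def format_reply_escaped(result: object, reply_format: str, label: str, emoji: str) -> str:
    """Hypothetical variant of _format_reply that HTML-escapes tool output."""
    header = f"{emoji} <b>{html.escape(label, quote=False)}</b>"
    raw = str(result)
    if reply_format == "code":
        # Truncate first, then escape, so an entity is never cut in half.
        return f"{header}\n<code>{html.escape(raw[:800], quote=False)}</code>"
    if reply_format == "truncated":
        body = f"<pre>{html.escape(raw[:500], quote=False)}</pre>"
        if len(raw) > 500:
            body += "\n<i>(truncated)</i>"
        return f"{header}\n{body}"
    # text and url: escape as well, since "&" is common in query strings.
    return f"{header}\n{html.escape(raw, quote=False)}"


code_reply = format_reply_escaped("<b>x</b> & y", "code", "L", "E")
long_reply = format_reply_escaped("x" * 600, "truncated", "L", "E")
```

Escaping after truncation keeps the 800 and 500 character caps meaningful for the visible text, at the cost of a slightly longer message once entities are expanded.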
192
apps/api/tests/test_callback_dispatcher.py
Normal file
@@ -0,0 +1,192 @@
"""
Phase 5 Sprint 5.0-5.1 Callback Dispatcher unit tests
======================================================
Created: 2026-04-14, late night Taipei time, Claude Sonnet 4.6

Coverage:
- callback_action_spec.yaml loads correctly
- All 24 actions parse
- Template variable substitution (_resolve_template)
- Context lookup (labels.instance / signals[0].alert_name)
- dispatch_action skeleton (returns a stub during Sprint 5.0)

🔴 Follows the "no mock tests" rule: uses the real spec registry, no mocks.
"""

import pytest

from src.services.callback_dispatcher import (
    dispatch_action,
    get_action_spec,
    list_actions_for_category,
    load_action_registry,
    _lookup_context,
    _resolve_template,
)


# =============================================================================
# Registry loading
# =============================================================================


class TestRegistryLoading:
    def test_registry_loads_all_24_actions(self):
        registry = load_action_registry()
        # 10 read-only + 10 write + 4 secops = 24
        assert len(registry) >= 24, f"expected >= 24 actions, got {len(registry)}"

    def test_all_actions_have_required_fields(self):
        registry = load_action_registry()
        for name, spec in registry.items():
            assert spec.name == name
            assert spec.label
            assert spec.risk in ("low", "medium", "high", "critical")
            assert spec.callback_format in ("info", "nonce")
            assert spec.mcp_provider, f"{name} missing mcp_provider"
            assert spec.mcp_tool, f"{name} missing mcp_tool"

    def test_secops_requires_multi_sig(self):
        for sa in ("secops_isolate", "secops_block_ip", "secops_evict"):
            spec = get_action_spec(sa)
            assert spec and spec.requires_multi_sig is True, \
                f"{sa} should require multi_sig"

    def test_info_actions_dont_need_multi_sig(self):
        spec = get_action_spec("check_process")
        assert spec and spec.requires_multi_sig is False

    def test_write_actions_use_nonce_format(self):
        for wa in ("k8s_restart", "k8s_scale_up", "k8s_rollback", "host_restart_service"):
            spec = get_action_spec(wa)
            assert spec and spec.callback_format == "nonce", \
                f"{wa} should use nonce format"

    def test_query_actions_use_info_format(self):
        for qa in ("check_process", "check_port", "open_signoz"):
            spec = get_action_spec(qa)
            assert spec and spec.callback_format == "info", \
                f"{qa} should use info format"


# =============================================================================
# Category filtering
# =============================================================================


class TestCategoryFiltering:
    def test_kubernetes_has_4_write_actions(self):
        actions = list_actions_for_category("kubernetes")
        write_actions = [a for a in actions if a.callback_format == "nonce"]
        assert len(write_actions) >= 4, \
            f"kubernetes should have at least 4 write actions, got {len(write_actions)}"

    def test_secops_has_4_actions(self):
        actions = list_actions_for_category("secops")
        assert len(actions) == 4, f"secops should have 4 actions, got {len(actions)}"

    def test_host_resource_has_mix(self):
        actions = list_actions_for_category("host_resource")
        assert len(actions) >= 2
        assert any(a.callback_format == "info" for a in actions), "need at least 1 read-only action"
        assert any(a.callback_format == "nonce" for a in actions), "need at least 1 write action"


# =============================================================================
# Template variable resolution
# =============================================================================


class TestTemplateResolution:
    def test_lookup_simple_key(self):
        ctx = {"incident_id": "INC-123"}
        assert _lookup_context("incident_id", ctx) == "INC-123"

    def test_lookup_nested_labels(self):
        ctx = {"labels": {"instance": "192.168.0.110"}}
        assert _lookup_context("labels.instance", ctx) == "192.168.0.110"

    def test_lookup_deep_nested(self):
        ctx = {"labels": {"k8s": {"pod": "api-1"}}}
        assert _lookup_context("labels.k8s.pod", ctx) == "api-1"

    def test_lookup_list_index(self):
        ctx = {"signals": [{"alert_name": "KubePodCrashLooping"}]}
        assert _lookup_context("signals[0].alert_name", ctx) == "KubePodCrashLooping"

    def test_lookup_missing_returns_none(self):
        ctx = {"labels": {}}
        assert _lookup_context("labels.instance", ctx) is None

    def test_resolve_template_dict(self):
        tpl = {"host": "{labels.instance}", "lines": 50}
        ctx = {"labels": {"instance": "10.0.0.1"}}
        out = _resolve_template(tpl, ctx)
        assert out == {"host": "10.0.0.1", "lines": 50}

    def test_resolve_keeps_unresolved(self):
        """If the context lacks a key, the original {...} is kept (easier to debug)."""
        tpl = {"host": "{labels.missing}"}
        ctx = {"labels": {}}
        out = _resolve_template(tpl, ctx)
        assert out == {"host": "{labels.missing}"}

    def test_resolve_string_with_multiple_placeholders(self):
        tpl = "host={labels.instance} port={labels.port}"
        ctx = {"labels": {"instance": "10.0.0.1", "port": "9100"}}
        out = _resolve_template(tpl, ctx)
        assert out == "host=10.0.0.1 port=9100"


# =============================================================================
# dispatch_action stub (Sprint 5.0 skeleton)
# =============================================================================


@pytest.mark.asyncio
class TestDispatchActionStub:
    async def test_unknown_action_returns_failure(self):
        result = await dispatch_action(
            action_name="unknown_action",
            incident_id="INC-TEST-001",
        )
        assert result.success is False
        assert "Unknown action" in (result.error or "")

    async def test_check_process_stub_returns_spec_hint(self):
        result = await dispatch_action(
            action_name="check_process",
            incident_id="INC-TEST-002",
            user_id=12345,
            labels={"instance": "192.168.0.110"},
        )
        assert result.success is True
        # Sprint 5.0 stub: should contain the "Sprint 5.2+" hint and the resolved params.
        assert "Sprint 5.2+" in result.result_text
        assert "192.168.0.110" in result.result_text  # labels.instance substituted correctly

    async def test_k8s_restart_stub_with_labels(self):
        result = await dispatch_action(
            action_name="k8s_restart",
            incident_id="INC-TEST-003",
            labels={"namespace": "awoooi-prod", "deployment": "awoooi-api"},
        )
        assert result.success is True
        # Check that both namespace and deployment were substituted.
        assert "awoooi-prod" in result.result_text
        assert "awoooi-api" in result.result_text

    async def test_dispatch_includes_duration(self):
        result = await dispatch_action(
            action_name="check_process",
            incident_id="INC-TEST-004",
            labels={"instance": "10.0.0.1"},
        )
        assert result.duration_ms >= 0
        assert result.duration_ms < 5000  # the stub should be very fast

    async def test_secops_action_flag_preserved(self):
        """The secops flag is visible on the spec (for the Multi-Sig layer above)."""
        spec = get_action_spec("secops_isolate")
        assert spec and spec.requires_multi_sig is True
        # The dispatcher itself does not gate on multi-sig (that is left to
        # callback_handler); it only records the spec.
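The last test notes that Multi-Sig gating is left to `callback_handler`. As an illustration only, the sketch below shows one way a caller could gate on `requires_multi_sig` before calling the dispatcher. `PendingApproval`, `may_dispatch`, the in-memory storage and the threshold of two distinct approvers are all assumptions, not part of this commit.

```python
from dataclasses import dataclass, field


@dataclass
class PendingApproval:
    """Hypothetical in-memory record of who has approved one action."""
    required: int = 2  # assumed threshold; the commit does not specify one
    approvers: set[int] = field(default_factory=set)

    def approve(self, user_id: int) -> bool:
        """Record one approval; return True once enough distinct users agreed."""
        self.approvers.add(user_id)
        return len(self.approvers) >= self.required


def may_dispatch(requires_multi_sig: bool, pending: PendingApproval, user_id: int) -> bool:
    """Return True when the action may be handed to the dispatcher."""
    if not requires_multi_sig:
        return True  # ordinary actions need no second signature
    return pending.approve(user_id)


pending = PendingApproval()
first = may_dispatch(True, pending, user_id=111)   # one approver so far
repeat = may_dispatch(True, pending, user_id=111)  # same user pressing again
second = may_dispatch(True, pending, user_id=222)  # a second, distinct approver
```

Storing approvers in a set means a single user pressing the button repeatedly never satisfies the threshold.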