feat(Phase 5 Sprint 5.3): 寫類分類按鈕 nonce action 路由 + audit log
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

插入點: _handle_callback_query Step 1.9 (nonce 驗證後, Step 2 approve/reject 前)

邏輯:
1. 從 spec registry 查 action 是否為註冊的寫類動作
2. 若 action in (approve/reject/silence/tune/log_manual_fix) → skip 走既有流程
3. 若 spec.requires_multi_sig=True 且 current_signatures < 2 → 提示「需 2 人簽核」
4. Audit log (category_write_action_audit_start) 含 user/risk/provider/tool
5. Ack Telegram (emoji + label + 執行中...)
6. 從 incident 取 labels 供模板替換
7. dispatch_action() → MCP 執行
8. Reply 結果到原告警卡片(Redis tg_msg lookup)
9. Audit log (category_write_action_audit_complete) 含 success/error/duration

支援的寫類 action:
- k8s_restart/scale_up/scale_down/rollback (kubernetes)
- host_restart_service/clear_log (host_resource)
- docker_restart/minio_restart (devops_tool/storage)
- reload_nginx/renew_cert (network/ssl_cert)
- kill_slow_query/clear_conn_pool (database)
- pause_1h/trigger_diagnose (business/flywheel)

Multi-Sig 支援 (Sprint 5.4 預留):
- secops_isolate/block_ip/evict → requires_multi_sig=True
- 簽核數未達 2 → 提示 + 不執行

回歸: 129/129

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-14 21:39:15 +08:00
parent 44545633a8
commit de8bbd8ab9

View File

@@ -2643,6 +2643,136 @@ class TelegramGateway:
if guard_result is not None:
return guard_result
# ===================================================================
# Step 1.9: Phase 5 Sprint 5.3 — 分類按鈕寫類 action 路由
# 2026-04-14 Claude Sonnet 4.6
# 若 action 在 callback_action_spec registry 且非 approve/reject/silence/tune
# → 走 dispatcher 執行 MCP + audit log
# ===================================================================
from src.services.callback_dispatcher import get_action_spec as _get_spec
_category_spec = _get_spec(action)
if _category_spec and action not in (
"approve", "reject", "silence", "tune", "log_manual_fix"
):
# Multi-Sig 守衛 (Sprint 5.4 secops 類)
if _category_spec.requires_multi_sig:
# 檢查 approval_records.current_signatures 是否已達 2
try:
from src.services.approval_db import get_approval_service as _svc
from uuid import UUID as _UUID
_existing = await _svc().get_approval(_UUID(approval_id))
_sigs = (
len(_existing.signatures) if _existing and _existing.signatures else 0
)
except Exception:
_sigs = 0
if _sigs < 2:
await self._answer_callback(
callback_query_id, action,
text=f"⚠️ 需 2 人簽核 ({_sigs}/2)",
)
logger.info(
"category_action_multi_sig_pending",
action=action, approval_id=approval_id, current_sigs=_sigs,
)
return {
"action": action, "approval_id": approval_id,
"user": user, "success": False,
"reason": "multi_sig_pending",
}
# Audit log 開始(寫類動作)
logger.info(
"category_write_action_audit_start",
action=action,
approval_id=approval_id,
user_id=user_id,
username=username,
risk=_category_spec.risk,
provider=_category_spec.mcp_provider,
tool=_category_spec.mcp_tool,
)
# Ack Telegram
await self._answer_callback(
callback_query_id, action,
text=f"{_category_spec.emoji} {_category_spec.label} 執行中...",
)
# 查 incident_id + labels for template
_incident_id_resolved = approval_id # fallback
_labels: dict = {}
try:
from src.repositories.incident_repository import get_incident_repository
_repo = get_incident_repository()
# approval_id 可能是 INC-xxx 或 UUID先試 INC 格式
if approval_id.startswith("INC-"):
_inc = await _repo.get_by_id(approval_id)
else:
# UUID → 找 approval → incident_id
from src.services.approval_db import get_approval_service
from uuid import UUID
_app = await get_approval_service().get_approval(UUID(approval_id))
_inc_id = getattr(_app, "incident_id", None) if _app else None
_inc = await _repo.get_by_id(_inc_id) if _inc_id else None
if _inc:
_incident_id_resolved = _inc.incident_id
if _inc and _inc.signals:
_labels = _inc.signals[0].labels or {}
except Exception as _e:
logger.debug("category_action_labels_lookup_failed", error=str(_e))
# Dispatch
from src.services.callback_dispatcher import dispatch_action as _dispatch
_result = await _dispatch(
action_name=action,
incident_id=_incident_id_resolved,
user_id=user_id,
labels=_labels,
)
# Reply 結果到原告警卡片
try:
from src.core.redis_client import get_redis as _gr
_rds = _gr()
_msg_id_raw = await _rds.get(f"tg_msg:{_incident_id_resolved}")
_orig_msg = int(_msg_id_raw) if _msg_id_raw else None
except Exception:
_orig_msg = None
try:
_payload = {
"chat_id": settings.OPENCLAW_TG_CHAT_ID,
"text": _result.result_text,
"parse_mode": "HTML",
}
if _orig_msg:
_payload["reply_to_message_id"] = _orig_msg
await self._http_client.post(
f"https://api.telegram.org/bot{settings.OPENCLAW_TG_BOT_TOKEN}/sendMessage",
json=_payload,
)
except Exception as _re:
logger.warning("category_action_reply_send_failed", error=str(_re))
# Audit log 完成
logger.info(
"category_write_action_audit_complete",
action=action,
approval_id=approval_id,
user_id=user_id,
success=_result.success,
error=_result.error,
duration_ms=round(_result.duration_ms, 1),
)
return {
"action": action,
"approval_id": approval_id,
"user": user,
"success": _result.success,
"category_action": True,
}
# ===================================================================
# Step 2: 處理自動調優 (Shadow Mode)
# ===================================================================