Files
awoooi/apps/api/src/services/telegram_gateway.py
OG T 88696dba9b
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m33s
Type Sync Check / check-type-sync (push) Failing after 58s
feat(sprint5.1): Data Safety Guardrails 全鏈路整合 (L1-L5)
Layer 0 - K8s RBAC:
  - k8s/rbac/api-velero-reader.yaml: awoooi-executor SA Velero backup reader

Layer 1 - DB Migration (已在 188 執行):
  - M-002: approval_records 新增 approval_level/votes/required_votes
  - M-003: alert_event_type ENUM 新增 8 個值

Layer 2 - IaC:
  - ops/config/service-registry.yaml: 全服務 Stateful 分級清單 (BLOCK/CRITICAL_HITL/STANDARD_HITL/AUTO)

Layer 3 - Python Services:
  - service_registry.py: 讀取 YAML,提供 is_blocked/requires_multisig/get_required_votes
  - velero_client.py: kubectl 查詢 Velero 備份年齡,失敗 fallback 999h
  - preflight_service.py: Pre-flight 安全檢查 (Q2/Q4 決策)

Layer 1-M001 - Playbook model:
  - playbook.py: 新增 requires_approval_level/stateful_targets/requires_pre_backup

Layer 4 - 業務邏輯:
  - alert_operation_log_repository.py: 新增 8 個 event_type (Guardrail/Pre-flight/MultiSig/備份)
  - auto_repair_service.py: 注入 Service Registry Guardrail 檢查 (BLOCK → 直接拒絕)
  - webhooks.py: ALERT_RECEIVED 溯源記錄 + auto_repair flag Q9 + Langfuse trace_id Q10
  - db/models.py: ApprovalRecord 同步 approval_level/votes/required_votes 欄位
  - docker-health-monitor.sh: 純感知層改造(移除所有 docker restart 邏輯)

Layer 5 - Telegram 通知:
  - telegram_gateway.py: T1-T6 六個新通知方法 (Guardrail/Pre-flight/Backup/MultiSig/ChangeApplied)

參考: ADR-062 Data Safety Guardrails, ADR-063 Service Registry IaC

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-08 16:24:09 +08:00

3876 lines
148 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Telegram Gateway - OpenClaw 行動戰情室 + SignOz 整合
====================================================
Phase 5.4.3 & 5.4.4: Telegram 推送與簽核接收
統帥校正: SignOz 為唯一全能視力中心
Features:
- 推送待簽核卡片到 Telegram (含 SignOz 指標)
- 動態 SignOz Trace URL (告警前後 5 分鐘)
- 自動調優按鈕 (Shadow Mode: 僅日誌輸出)
- 接收統帥簽核回調
- SOUL.md 訊息壓縮原則 100% 遵守
SOUL.md 鐵律 (4.1 Telegram 訊息壓縮原則):
- 狀態標籤: 20 字元
- 資源名稱: 50 字元
- 根因摘要: 100 字元
- 建議行動: 50 字元
- 總長度: 800 字元 (v7.0 擴展以容納 SignOz 區塊)
修復紀錄:
- 2026-03-26 Claude Code: 修復 HTML 解析錯誤 (Can't parse entities)
"""
import asyncio
import html
import os
from dataclasses import dataclass
from datetime import UTC, datetime
import httpx
import structlog
from opentelemetry import trace
from src.core.config import settings
from src.core.redis_client import get_redis
from src.services.security_interceptor import (
NonceReplayError,
UserNotWhitelistedError,
get_security_interceptor,
)
from src.services.chat_manager import get_chat_manager
# =============================================================================
# Snooze/Silence Redis Keys (2026-03-27 P1 優化)
# =============================================================================
SNOOZE_KEY_PREFIX = "telegram_snooze:" # {approval_id} -> 稍後提醒
SILENCE_KEY_PREFIX = "telegram_silence:" # {resource_name} -> 靜默
SNOOZE_TTL_SECONDS = 30 * 60 # 30 分鐘
SILENCE_TTL_SECONDS = 60 * 60 # 1 小時
# 2026-04-01 Claude Code: Long Polling 分散式 Leader Election
# 防止多 Pod 同時 getUpdates → 409 Conflict 互搶問題
POLLING_LEADER_KEY = "telegram:polling:leader"
POLLING_LEADER_TTL = 45 # seconds - Pod 宕掉後 45s 自動轉移
POLLING_LEADER_RENEW = 20 # seconds - 每 20s 續約
POLLING_LEADER_WATCH = 30 # seconds - 非 Leader Pod 每 30s 嘗試接管
logger = structlog.get_logger(__name__)
# =============================================================================
# OTEL Tracer (Phase C P1 可觀測性)
# 2026-03-30 Claude Code: 新增 Telegram Gateway 追蹤
# =============================================================================
_tracer = trace.get_tracer("awoooi.telegram_gateway", "1.0.0")
# =============================================================================
# Long Polling 配置 (Phase 5 內網修復)
# =============================================================================
LONG_POLLING_TIMEOUT = 30 # getUpdates timeout (秒)
LONG_POLLING_RETRY_DELAY = 5 # 錯誤後重試延遲 (秒)
# =============================================================================
# SignOz Metrics Block (v7.0)
# =============================================================================
@dataclass
class SignOzMetricsBlock:
"""
SignOz 指標區塊 (嵌入 Telegram 卡片)
格式:
📊 SignOz 指標
├ RPS: 150.2 📈
├ Error: 🟢 0.5%
└ P99: 245ms ➡️
"""
rps: float = 0.0
rps_trend: str = "stable" # up, down, stable
error_rate: float = 0.0
p99_latency_ms: float = 0.0
latency_trend: str = "stable"
trace_url: str = ""
def format(self) -> str:
"""格式化為 Telegram HTML"""
trend_emoji = {"up": "📈", "down": "📉", "stable": "➡️"}
error_emoji = "🟢" if self.error_rate < 1 else ("🟡" if self.error_rate < 5 else "🔴")
return (
f"📊 <b>SignOz 指標</b>\n"
f"├ RPS: <code>{self.rps:.1f}</code> {trend_emoji.get(self.rps_trend, '➡️')}\n"
f"├ Error: {error_emoji} <code>{self.error_rate:.2f}%</code>\n"
f"└ P99: <code>{self.p99_latency_ms:.0f}ms</code> {trend_emoji.get(self.latency_trend, '➡️')}"
)
# =============================================================================
# SOUL.md 訊息格式定義 (v7.0 + SignOz)
# =============================================================================
@dataclass
class TelegramMessage:
"""
Telegram 訊息結構 (SOUL.md 4.1 + v7.0 SignOz 整合)
格式:
═══════════════════════════
🚨 CRITICAL | harbor-core
═══════════════════════════
📋 INC-20260321-0001
🎯 資源: harbor-core-7d4b8c9f5
━━━━━━━━━━━━━━━━━━━
🤖 AI 仲裁判定
👥 責任: BE (後端)
📊 信心: 🟢 88%
💡 原因: JVM Heap 配置不當
━━━━━━━━━━━━━━━━━━━
📊 SignOz 指標
├ RPS: 150.2 📈
├ Error: 🟢 0.5%
└ P99: 245ms ➡️
━━━━━━━━━━━━━━━━━━━
🔧 建議: 刪除 Pod
⏱️ 停機: ~30s
🔍 SignOz Trace (±5min)
[✅ 簽核] [❌ 拒絕] [⚡ 自動調優]
"""
status_emoji: str # 🚨, ⚠️,
risk_level: str # CRITICAL, MEDIUM, LOW
resource_name: str # Pod/Deployment 名稱 (max 50)
root_cause: str # 根因摘要 (max 100)
suggested_action: str # 建議操作 (max 50)
estimated_downtime: str # 預計停機時間
approval_id: str # 簽核單 ID
# v6.0 AI 仲裁欄位
incident_id: str = "" # 事件編號 INC-YYYYMMDD-XXXX
primary_responsibility: str = "COLLAB" # FE/BE/INFRA/DB/COLLAB
confidence: float = 0.0 # 信心度 0.0-1.0
namespace: str = "default" # K8s namespace
# v7.0 SignOz 整合
signoz_metrics: SignOzMetricsBlock | None = None
signoz_trace_url: str = "" # 動態時間參數 URL
auto_tuning_command: str = "" # kubectl 調優指令
# 2026-03-29 ogt: AI Token/Cost 追蹤
ai_tokens: int = 0 # LLM Token 使用量
ai_cost: float = 0.0 # LLM 成本 (USD)
# 2026-03-29 ogt: ADR-037 異常頻率統計
anomaly_frequency: dict | None = None # AnomalyCounter 統計
# 2026-03-29 ogt: AI Provider 來源顯示
ai_provider: str = "" # ollama/gemini/claude/expert_system/mock
# 2026-04-04 ogt: 底層模型名稱 (e.g. qwen2.5:7b-instruct, nemotron-70b)
ai_model: str = ""
# ==========================================================================
# Phase 22: Nemotron 協作欄位 (ADR-044)
# 2026-03-31 Claude Code: OpenClaw + Nemotron 雙軌顯示
# ==========================================================================
nemotron_enabled: bool = False # 是否啟用 Nemotron 協作
nemotron_tools: list[dict] | None = None # Tool Calling 結果 [{"tool": str, "args": dict, "valid": bool}]
nemotron_validation: str = "" # "✅ 驗證通過" / "❌ 驗證失敗" / "⏳ 驗證中"
nemotron_latency_ms: float = 0.0 # Nemotron 呼叫延遲 (ms)
def format(self) -> str:
"""
格式化為 SOUL.md 規範的訊息 (含 AI 仲裁 + SignOz)
Returns:
str: 格式化的 Telegram 訊息 (max 900 字元)
"""
# 責任映射
resp_map = {
"FE": "👨‍💻 FE (前端)",
"BE": "⚙️ BE (後端)",
"INFRA": "🏗️ INFRA (基礎設施)",
"DB": "🗄️ DB (資料庫)",
"COLLAB": "🤝 COLLAB (協同處理)",
}
resp_display = resp_map.get(self.primary_responsibility, "❓ 未知")
# 信心度顯示
confidence_pct = int(self.confidence * 100)
if confidence_pct >= 80:
conf_emoji = "🟢"
elif confidence_pct >= 70:
conf_emoji = "🟡"
else:
conf_emoji = "🔴"
# 自動生成事件編號 (2026-03-27 ogt: 修復 INC-INC- 重複前綴)
if self.incident_id:
incident_id = self.incident_id
elif self.approval_id.startswith("INC-"):
incident_id = self.approval_id
else:
incident_id = f"INC-{self.approval_id[:8].upper()}"
# SignOz URL (優先使用動態 URL) - 必須 HTML 轉義防止解析錯誤
service_name = self.resource_name.split("-")[0] if "-" in self.resource_name else self.resource_name
raw_url = self.signoz_trace_url or f"http://192.168.0.188:3301/traces?service={service_name}"
signoz_url = html.escape(raw_url, quote=True)
# SignOz 指標區塊
signoz_block = ""
if self.signoz_metrics:
signoz_block = f"━━━━━━━━━━━━━━━━━━━\n{self.signoz_metrics.format()}\n"
# HTML 轉義用戶輸入內容,防止 "Can't parse entities" 錯誤
safe_resource = html.escape(self.resource_name[:35])
safe_root_cause = html.escape(self.root_cause[:50])
safe_action = html.escape(self.suggested_action[:35])
safe_downtime = html.escape(self.estimated_downtime)
# 2026-03-29 ogt: AI Token/Cost 顯示
ai_cost_display = ""
if self.ai_tokens > 0 or self.ai_cost > 0:
ai_cost_display = f"💰 Tokens: {self.ai_tokens:,} / ${self.ai_cost:.4f}\n"
# 2026-03-29 ogt: ADR-037 異常頻率顯示
frequency_block = ""
if self.anomaly_frequency and self.anomaly_frequency.get("count_24h", 0) > 1:
freq = self.anomaly_frequency
escalation_emoji = {
None: "",
"REPEAT": "⚠️",
"ESCALATE": "🔴",
"PERMANENT_FIX": "🚨",
}.get(freq.get("escalation_level"), "")
# 2026-04-07 Claude Code: Sprint 4 D1 — 處置統計行
auto_r = freq.get("auto_repair_count", 0)
human_a = freq.get("human_approved_count", 0)
manual_r = freq.get("manual_resolved_count", 0)
cold_s = freq.get("cold_start_trust_count", 0)
total_res = freq.get("total_resolution_count", 0)
# 處置分佈行 (只在有處置紀錄時顯示)
disposition_line = ""
if total_res > 0:
auto_total = auto_r + cold_s
auto_rate = int(auto_total / total_res * 100) if total_res > 0 else 0
disposition_line = (
f"├ 🤖 自動: <code>{auto_total}</code>"
f" | 👤 審核: <code>{human_a}</code>"
f" | 🔧 手動: <code>{manual_r}</code>\n"
f"├ 自動化率: <b>{auto_rate}%</b>\n"
)
frequency_block = (
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>頻率統計</b> {escalation_emoji}\n"
f"├ 1h: <code>{freq.get('count_1h', 0)}</code> 次"
f" | 24h: <code>{freq.get('count_24h', 0)}</code> 次\n"
f"{disposition_line}"
f"└ 累計修復: <code>{auto_r}</code> 次\n"
)
if freq.get("escalation_level"):
frequency_block += f"🔺 升級: <b>{freq['escalation_level']}</b>\n"
# 2026-03-29 ogt: 根據 confidence + ai_provider 動態顯示來源
# confidence > 0 = 真實 AI 分析, confidence == 0 = 規則匹配/降級
# 2026-04-04 ogt: 加入 ai_model 顯示底層模型名稱
if self.confidence > 0 and self.ai_provider:
provider_names = {
"ollama": "Ollama",
"gemini": "Gemini",
"claude": "Claude",
"nvidia": "Nemotron",
"openclaw_nemo": "OpenClaw Nemo",
"openclaw_nvidia_nim": "OpenClaw Nemo",
"openclaw_qwen": "OpenClaw Nemo",
}
provider_display = provider_names.get(self.ai_provider.lower(), self.ai_provider.upper())
model_suffix = f" ({html.escape(self.ai_model)})" if self.ai_model else ""
source_label = f"🤖 <b>{provider_display} 仲裁</b>{model_suffix}"
elif self.confidence > 0:
source_label = "🤖 <b>AI 仲裁判定</b>"
else:
source_label = "⚙️ <b>規則匹配</b>"
# 2026-04-05 Claude Code: 重設計訊息格式,提升易讀性
# 組裝訊息
message = (
f"{self.status_emoji} <b>{html.escape(self.risk_level)}</b> <code>{html.escape(incident_id)}</code>\n"
f"<b>{safe_resource}</b>\n"
f"\n"
f"{source_label} {conf_emoji} {confidence_pct}%\n"
f"👥 {resp_display}\n"
f"💡 {safe_root_cause}\n"
)
if ai_cost_display:
message += f"{ai_cost_display}"
if frequency_block:
message += f"\n{frequency_block}"
if signoz_block:
message += f"\n{signoz_block}"
message += (
f"\n🔧 <b>建議:</b> {safe_action}\n"
f"⏱️ 停機: {safe_downtime}\n"
f"🔍 <a href='{signoz_url}'>SignOz Trace</a>"
)
return message[:900] # 900 字元上限 (nemotron 版用 1000因有額外 tool 區塊)
def format_with_nemotron(self) -> str:
"""
格式化含 Nemotron 結果的訊息 (Phase 22 ADR-044)
格式:
═══════════════════════════
🚨 CRITICAL | harbor-core
═══════════════════════════
📋 INC-20260331-0001
🎯 資源: harbor-core-7d4b8c9f5
━━━━━━━━━━━━━━━━━━━
🤖 OpenClaw 仲裁
├ 📊 信心: 🟢 85%
├ 👥 責任: BE (後端)
└ 💡 原因: JVM Heap 配置不當
━━━━━━━━━━━━━━━━━━━
🔧 Nemotron 執行方案
✅ restart_deployment: awoooi-api
✅ scale_deployment: replicas=3
└ 驗證: ✅ 驗證通過
━━━━━━━━━━━━━━━━━━━
🔧 建議: 刪除 Pod
⏱️ 停機: ~30s
Returns:
str: 格式化的 Telegram 訊息 (max 1000 字元)
"""
# 責任映射
resp_map = {
"FE": "👨‍💻 FE (前端)",
"BE": "⚙️ BE (後端)",
"INFRA": "🏗️ INFRA (基礎設施)",
"DB": "🗄️ DB (資料庫)",
"COLLAB": "🤝 COLLAB (協同處理)",
}
resp_display = resp_map.get(self.primary_responsibility, "❓ 未知")
# 信心度顯示
confidence_pct = int(self.confidence * 100)
if confidence_pct >= 80:
conf_emoji = "🟢"
elif confidence_pct >= 70:
conf_emoji = "🟡"
else:
conf_emoji = "🔴"
# 自動生成事件編號
if self.incident_id:
incident_id = self.incident_id
elif self.approval_id.startswith("INC-"):
incident_id = self.approval_id
else:
incident_id = f"INC-{self.approval_id[:8].upper()}"
# HTML 轉義
safe_resource = html.escape(self.resource_name[:35])
safe_root_cause = html.escape(self.root_cause[:50])
safe_action = html.escape(self.suggested_action[:35])
safe_downtime = html.escape(self.estimated_downtime)
# AI Provider 顯示
# 2026-04-04 ogt: 加入 ai_model 顯示底層模型名稱
if self.confidence > 0 and self.ai_provider:
provider_names = {
"ollama": "Ollama",
"gemini": "Gemini",
"claude": "Claude",
"nvidia": "Nemotron",
"openclaw_nemo": "OpenClaw Nemo",
"openclaw_nvidia_nim": "OpenClaw Nemo",
"openclaw_qwen": "OpenClaw Nemo",
}
provider_display = provider_names.get(self.ai_provider.lower(), self.ai_provider.upper())
model_suffix = f" ({html.escape(self.ai_model)})" if self.ai_model else ""
source_label = f"🤖 <b>{provider_display} 仲裁</b>{model_suffix}"
elif self.confidence > 0:
source_label = "🤖 <b>OpenClaw 仲裁</b>"
else:
source_label = "⚙️ <b>規則匹配</b>"
# Nemotron 區塊
nemotron_block = ""
if self.nemotron_enabled and self.nemotron_tools:
tools_lines = []
for t in self.nemotron_tools[:3]: # 最多顯示 3 個
valid_emoji = "" if t.get("valid", False) else ""
tool_name = html.escape(str(t.get("tool", "unknown"))[:20])
# 2026-04-05 ogt: 格式化 args 為可讀的 key=value而非 Python dict 字串
args = t.get("args", {})
if isinstance(args, dict) and args:
args_str = ", ".join(f"{k}={v}" for k, v in list(args.items())[:2])
else:
args_str = str(args)[:30]
safe_args = html.escape(args_str[:40])
tools_lines.append(f" {valid_emoji} {tool_name}: {safe_args}")
tools_str = "\n".join(tools_lines)
validation_display = html.escape(self.nemotron_validation or "⏳ 驗證中")
nemotron_block = (
f"━━━━━━━━━━━━━━━━━━━\n"
f"🔧 <b>Nemotron 執行方案</b>\n"
f"{tools_str}\n"
f"└ 驗證: {validation_display}\n"
)
if self.nemotron_latency_ms > 0:
nemotron_block += f"└ 延遲: {self.nemotron_latency_ms:.0f}ms\n"
# 2026-04-05 Claude Code: 重設計訊息格式,提升易讀性
# 組裝訊息
message = (
f"{self.status_emoji} <b>{html.escape(self.risk_level)}</b> <code>{html.escape(incident_id)}</code>\n"
f"<b>{safe_resource}</b>\n"
f"\n"
f"{source_label} {conf_emoji} {confidence_pct}%\n"
f"👥 {resp_display}\n"
f"💡 {safe_root_cause}\n"
)
if nemotron_block:
message += f"\n{nemotron_block}"
message += (
f"\n🔧 <b>建議:</b> {safe_action}\n"
f"⏱️ 停機: {safe_downtime}"
)
return message[:1000]
# =============================================================================
# 新訊息模板 (2026-03-29 ogt: ADR-038 Telegram 訊息規範)
# =============================================================================
@dataclass
class SentryErrorMessage:
"""
Sentry 錯誤訊息 (SENTRY_ERROR)
2026-03-29 ogt: 新增,用於 Sentry 錯誤通知
按鈕: [🔍 查看詳情] [🔕 靜默 1h]
"""
error_id: str # Sentry Issue ID
error_type: str # TypeError, ValueError, etc.
error_message: str # 錯誤訊息 (max 100)
service_name: str # awoooi-api, awoooi-web, etc.
file_location: str # src/api/v1/incidents.py:123
occurrence_count: int = 1 # 發生次數
affected_users: int = 0 # 影響用戶數
first_seen: str = "" # 首次發生時間
stack_trace: list[str] | None = None # Stack trace (前 3 行)
sentry_url: str = "" # Sentry 連結
def format(self) -> str:
"""格式化為 Telegram HTML"""
safe_error = html.escape(self.error_message[:80])
safe_type = html.escape(self.error_type[:30])
safe_service = html.escape(self.service_name[:25])
safe_file = html.escape(self.file_location[:50])
# Stack trace 區塊
trace_block = ""
if self.stack_trace:
trace_lines = "\n".join(f"{html.escape(line[:50])}" for line in self.stack_trace[:3])
trace_block = f"🔗 Stack Trace (前 3 行):\n{trace_lines}\n"
# Sentry URL
sentry_link = ""
if self.sentry_url:
safe_url = html.escape(self.sentry_url, quote=True)
sentry_link = f"\n🔍 <a href='{safe_url}'>查看 Sentry</a>"
message = (
f"═══════════════════════════\n"
f"🐛 <b>SENTRY ERROR</b> | {safe_service}\n"
f"═══════════════════════════\n"
f"📋 <code>{html.escape(self.error_id)}</code>\n"
f"🎯 錯誤: <code>{safe_type}</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"💬 {safe_error}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>統計</b>\n"
f"├ 發生次數: <code>{self.occurrence_count}</code>\n"
f"├ 影響用戶: <code>{self.affected_users}</code>\n"
f"└ 首次發生: {html.escape(self.first_seen) if self.first_seen else 'N/A'}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📍 位置: <code>{safe_file}</code>\n"
f"{trace_block}"
f"{sentry_link}"
)
return message[:900]
@dataclass
class ResourceWarnMessage:
"""
資源告警訊息 (RESOURCE_WARN)
2026-03-29 ogt: 新增,用於資源耗盡警告
按鈕: [⚡ 自動擴展] [🔕 靜默 1h]
"""
resource_id: str # RES-YYYYMMDD-XXXX
pod_name: str # Pod 名稱
namespace: str = "default" # K8s namespace
cpu_percent: float = 0.0 # CPU 使用率
cpu_limit: str = "" # CPU limit (e.g., 500m)
memory_percent: float = 0.0 # Memory 使用率
memory_limit: str = "" # Memory limit (e.g., 512Mi)
disk_percent: float = 0.0 # Disk 使用率
trend_info: str = "" # 趨勢資訊
suggestion: str = "" # 建議操作
def format(self) -> str:
"""格式化為 Telegram HTML"""
safe_pod = html.escape(self.pod_name[:35])
safe_ns = html.escape(self.namespace[:20])
# 資源狀態 emoji
def get_status_emoji(percent: float) -> str:
if percent >= 90:
return "🔴"
elif percent >= 70:
return "🟡"
return "🟢"
cpu_emoji = get_status_emoji(self.cpu_percent)
mem_emoji = get_status_emoji(self.memory_percent)
disk_emoji = get_status_emoji(self.disk_percent)
# 趨勢和建議
trend_block = ""
if self.trend_info:
trend_block = f"📈 趨勢: {html.escape(self.trend_info[:50])}\n"
suggestion_block = ""
if self.suggestion:
suggestion_block = f"💡 建議: {html.escape(self.suggestion[:50])}\n"
message = (
f"═══════════════════════════\n"
f"⚠️ <b>資源告警</b> | {safe_ns}\n"
f"═══════════════════════════\n"
f"📋 <code>{html.escape(self.resource_id)}</code>\n"
f"🎯 Pod: <code>{safe_pod}</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>資源使用率</b>\n"
f"├ CPU: {cpu_emoji} <code>{self.cpu_percent:.1f}%</code>"
f"{f' (limit: {self.cpu_limit})' if self.cpu_limit else ''}\n"
f"├ Memory: {mem_emoji} <code>{self.memory_percent:.1f}%</code>"
f"{f' (limit: {self.memory_limit})' if self.memory_limit else ''}\n"
f"└ Disk: {disk_emoji} <code>{self.disk_percent:.1f}%</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"{trend_block}"
f"{suggestion_block}"
)
return message[:900]
@dataclass
class RepairReportMessage:
"""
自動修復報告訊息 (REPAIR_REPORT)
2026-03-29 ogt: 新增,用於每日自動修復彙總
按鈕: 無
"""
report_date: str # 報告日期 (YYYY-MM-DD)
total_repairs: int = 0 # 總修復次數
success_count: int = 0 # 成功次數
failure_count: int = 0 # 失敗次數
saved_minutes: int = 0 # 節省人工時間 (分鐘)
top_issues: list[tuple[str, int]] | None = None # Top 問題 [(name, count)]
ai_cost_gemini: float = 0.0 # Gemini 成本
ai_cost_nvidia: float = 0.0 # NVIDIA 成本 (免費)
ai_tokens_total: int = 0 # 總 Token 數
def format(self) -> str:
"""格式化為 Telegram HTML"""
# 成功率
success_rate = (self.success_count / self.total_repairs * 100) if self.total_repairs > 0 else 0
# Top 問題區塊
issues_block = ""
if self.top_issues:
issues_lines = "\n".join(
f" {i+1}. {html.escape(name[:30])} ({count} 次)"
for i, (name, count) in enumerate(self.top_issues[:3])
)
issues_block = f"━━━━━━━━━━━━━━━━━━━\n🔝 <b>Top 3 問題</b>:\n{issues_lines}\n"
# AI 成本
total_cost = self.ai_cost_gemini + self.ai_cost_nvidia
message = (
f"═══════════════════════════\n"
f"🔧 <b>自動修復報告</b> | 每日彙總\n"
f"═══════════════════════════\n"
f"📅 {html.escape(self.report_date)}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>統計</b>\n"
f"├ 總修復次數: <code>{self.total_repairs}</code>\n"
f"├ 成功: ✅ <code>{self.success_count}</code> ({success_rate:.0f}%)\n"
f"├ 失敗: ❌ <code>{self.failure_count}</code>\n"
f"└ 節省人工: ~<code>{self.saved_minutes}</code> 分鐘\n"
f"{issues_block}"
f"━━━━━━━━━━━━━━━━━━━\n"
f"💰 <b>AI 成本</b>\n"
f"├ Gemini: ${self.ai_cost_gemini:.4f} ({self.ai_tokens_total:,} tokens)\n"
f"├ NVIDIA: ${self.ai_cost_nvidia:.4f} (免費)\n"
f"└ 總計: ${total_cost:.4f}"
)
return message[:900]
@dataclass
class DailySummaryMessage:
"""
每日摘要訊息 (DAILY_SUMMARY)
2026-03-29 ogt: 新增,用於每日系統狀態摘要
按鈕: 無
"""
summary_date: str # 摘要日期 (YYYY-MM-DD)
# 告警統計
alert_total: int = 0
alert_critical: int = 0
alert_medium: int = 0
alert_low: int = 0
# 處理統計
auto_repair_count: int = 0
manual_approval_count: int = 0
ignored_count: int = 0
avg_response_minutes: float = 0.0
# 可用性
api_availability: float = 99.9
web_availability: float = 99.9
worker_availability: float = 99.9
# 成本
ai_cost: float = 0.0
cloud_cost: float = 0.0
budget_remaining: float = 0.0
def format(self) -> str:
"""格式化為 Telegram HTML"""
# 處理百分比
total_handled = self.auto_repair_count + self.manual_approval_count + self.ignored_count
auto_pct = (self.auto_repair_count / total_handled * 100) if total_handled > 0 else 0
manual_pct = (self.manual_approval_count / total_handled * 100) if total_handled > 0 else 0
ignored_pct = (self.ignored_count / total_handled * 100) if total_handled > 0 else 0
message = (
f"═══════════════════════════\n"
f"📊 <b>每日摘要</b> | AWOOOI\n"
f"═══════════════════════════\n"
f"📅 {html.escape(self.summary_date)}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"🚨 <b>告警統計</b>\n"
f"├ 總數: <code>{self.alert_total}</code>\n"
f"├ Critical: <code>{self.alert_critical}</code>\n"
f"├ Medium: <code>{self.alert_medium}</code>\n"
f"└ Low: <code>{self.alert_low}</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"✅ <b>處理統計</b>\n"
f"├ 自動修復: <code>{self.auto_repair_count}</code> ({auto_pct:.0f}%)\n"
f"├ 人工簽核: <code>{self.manual_approval_count}</code> ({manual_pct:.0f}%)\n"
f"├ 忽略/靜默: <code>{self.ignored_count}</code> ({ignored_pct:.0f}%)\n"
f"└ 平均回應: <code>{self.avg_response_minutes:.1f}</code> 分鐘\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📈 <b>可用性</b>\n"
f"├ API: <code>{self.api_availability:.2f}%</code>\n"
f"├ Web: <code>{self.web_availability:.2f}%</code>\n"
f"└ Worker: <code>{self.worker_availability:.2f}%</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"💰 <b>成本</b>\n"
f"├ AI: ${self.ai_cost:.2f}\n"
f"├ 雲端: ${self.cloud_cost:.2f}\n"
f"└ 預算剩餘: ${self.budget_remaining:.2f}"
)
return message[:900]
@dataclass
class CICDProgressMessage:
"""
CI/CD 進度訊息 (CICD_PROGRESS)
2026-03-30 ogt: 新增,用於 CI/CD 流程進度通知
特性: 簡潔、不走 AI 仲裁、無按鈕
"""
job_name: str # Job 名稱 (e.g., Build, Test, Deploy)
status: str # running, success, failed
stage: str = "" # CI/CD 階段 (e.g., build, test, deploy)
commit_sha: str = "" # Git commit SHA
triggered_by: str = "" # 觸發者
duration_seconds: int = 0 # 執行時間
message: str = "" # 額外訊息
workflow_url: str = "" # Workflow 連結
def format(self) -> str:
"""格式化為 Telegram HTML (簡潔版)"""
# 狀態 emoji
status_emoji = {
"running": "🔄",
"success": "",
"failed": "",
"pending": "",
}.get(self.status.lower(), "📦")
safe_job = html.escape(self.job_name[:40])
safe_stage = html.escape(self.stage[:20]) if self.stage else ""
# 時間格式化
duration_str = ""
if self.duration_seconds > 0:
minutes = self.duration_seconds // 60
seconds = self.duration_seconds % 60
duration_str = f" ({minutes}m {seconds}s)" if minutes > 0 else f" ({seconds}s)"
# Commit 資訊
commit_info = ""
if self.commit_sha:
commit_info = f"\n📋 <code>{html.escape(self.commit_sha[:8])}</code>"
# Workflow 連結
workflow_link = ""
if self.workflow_url:
safe_url = html.escape(self.workflow_url, quote=True)
workflow_link = f"\n🔗 <a href='{safe_url}'>Workflow</a>"
# 簡潔訊息
stage_label = f" | {safe_stage}" if safe_stage else ""
message = (
f"{status_emoji} <b>[AWOOOI CI/CD]</b>{stage_label}\n"
f"📦 {safe_job}{duration_str}"
f"{commit_info}"
f"{workflow_link}"
)
return message[:900]
@dataclass
class DeploySuccessMessage:
"""
部署成功訊息 (DEPLOY_SUCCESS)
2026-03-29 ogt: 新增,用於 CD 部署成功通知
按鈕: 無
"""
commit_sha: str # Git commit SHA (short)
triggered_by: str # 觸發者
environment: str = "Production" # 環境
# 版本資訊
api_version: str = ""
web_version: str = ""
worker_version: str = ""
# 部署時間
duration_seconds: int = 0
# 測試結果
e2e_passed: int = 0
e2e_total: int = 0
health_check_passed: bool = True
# 連結
workflow_url: str = ""
def format(self) -> str:
"""格式化為 Telegram HTML"""
safe_commit = html.escape(self.commit_sha[:8])
safe_user = html.escape(self.triggered_by[:20])
safe_env = html.escape(self.environment[:15])
# 部署時間格式化
minutes = self.duration_seconds // 60
seconds = self.duration_seconds % 60
duration_str = f"{minutes}m {seconds}s" if minutes > 0 else f"{seconds}s"
# 測試結果
e2e_status = "" if self.e2e_passed == self.e2e_total else "⚠️"
health_status = "✅ 全部通過" if self.health_check_passed else "❌ 部分失敗"
# Workflow 連結
workflow_link = ""
if self.workflow_url:
safe_url = html.escape(self.workflow_url, quote=True)
workflow_link = f"\n🔗 <a href='{safe_url}'>查看 Workflow</a>"
message = (
f"✅ <b>部署成功</b> | {safe_env}\n\n"
f"📋 Commit: <code>{safe_commit}</code>\n"
f"👤 觸發者: @{safe_user}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>部署詳情</b>\n"
f"├ API: {html.escape(self.api_version) if self.api_version else 'N/A'}\n"
f"├ Web: {html.escape(self.web_version) if self.web_version else 'N/A'}\n"
f"├ Worker: {html.escape(self.worker_version) if self.worker_version else 'N/A'}\n"
f"└ 耗時: {duration_str}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"🧪 E2E 測試: {e2e_status} {self.e2e_passed}/{self.e2e_total} PASSED\n"
f"📊 健康檢查: {health_status}"
f"{workflow_link}"
)
return message[:900]
@dataclass
class RateLimitMessage:
"""
API 限額警告訊息 (RATE_LIMIT)
2026-03-29 ogt: 新增,用於 AI API 限額警告
按鈕: 無
"""
provider: str # gemini, openai, etc.
# 用量統計
daily_usage: int = 0
daily_limit: int = 0
token_usage: int = 0
token_limit: int = 0
cost_usd: float = 0.0
# 建議
suggestions: list[str] | None = None
# 重置時間
reset_time: str = ""
def format(self) -> str:
"""格式化為 Telegram HTML"""
safe_provider = html.escape(self.provider.upper()[:15])
# 使用率百分比
usage_pct = (self.daily_usage / self.daily_limit * 100) if self.daily_limit > 0 else 0
token_pct = (self.token_usage / self.token_limit * 100) if self.token_limit > 0 else 0
# 建議區塊
suggestion_block = ""
if self.suggestions:
suggestion_lines = "\n".join(f" - {html.escape(s[:50])}" for s in self.suggestions[:3])
suggestion_block = f"━━━━━━━━━━━━━━━━━━━\n💡 <b>建議</b>:\n{suggestion_lines}\n"
# 重置時間
reset_block = ""
if self.reset_time:
reset_block = f"\n🔄 將於 {html.escape(self.reset_time)} 重置"
message = (
f"⚠️ <b>API 限額警告</b>\n\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>{safe_provider} API</b>\n"
f"├ 今日用量: <code>{self.daily_usage}/{self.daily_limit}</code> ({usage_pct:.0f}%)\n"
f"├ Token: <code>{self.token_usage:,}/{self.token_limit:,}</code> ({token_pct:.0f}%)\n"
f"└ 成本: ${self.cost_usd:.4f}\n"
f"{suggestion_block}"
f"{reset_block}"
)
return message[:900]
@dataclass
class K3sStatusMessage:
"""
K3s 叢集狀態報告訊息 (K3S_STATUS)
2026-03-31 Claude Code: Phase 21.2 定期報告
用於每日 K3s 健康狀態推送
按鈕: 無
"""
report_date: str # 報告日期 (YYYY-MM-DD HH:MM)
# 節點狀態
node_total: int = 0
node_ready: int = 0
# Pod 狀態
pod_total: int = 0
pod_running: int = 0
pod_pending: int = 0
pod_failed: int = 0
# HPA 狀態
hpa_api_replicas: str = "2/6"
hpa_web_replicas: str = "2/6"
hpa_worker_replicas: str = "1/3"
# 備份狀態
etcd_backup_last: str = ""
velero_backup_last: str = ""
# 穩定指標
alert_count_48h: int = 0
pod_restart_48h: int = 0
# 版本資訊
k3s_version: str = ""
def format(self) -> str:
"""格式化為 Telegram HTML"""
# 健康狀態 emoji
node_health = "" if self.node_ready == self.node_total else "⚠️"
pod_health = "" if self.pod_failed == 0 and self.pod_pending == 0 else "⚠️"
stability = "" if self.alert_count_48h == 0 and self.pod_restart_48h == 0 else "⚠️"
# 備份狀態
etcd_status = html.escape(self.etcd_backup_last[:20]) if self.etcd_backup_last else "N/A"
velero_status = html.escape(self.velero_backup_last[:20]) if self.velero_backup_last else "N/A"
message = (
f"═══════════════════════════\n"
f"🎛️ <b>K3s 叢集狀態</b> | Daily\n"
f"═══════════════════════════\n"
f"📅 {html.escape(self.report_date)}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"{node_health} <b>節點</b>: {self.node_ready}/{self.node_total} Ready\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"{pod_health} <b>Pod 狀態</b>\n"
f"├ Running: <code>{self.pod_running}</code>\n"
f"├ Pending: <code>{self.pod_pending}</code>\n"
f"└ Failed: <code>{self.pod_failed}</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>HPA 副本數</b>\n"
f"├ API: <code>{html.escape(self.hpa_api_replicas)}</code>\n"
f"├ Web: <code>{html.escape(self.hpa_web_replicas)}</code>\n"
f"└ Worker: <code>{html.escape(self.hpa_worker_replicas)}</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"💾 <b>備份</b>\n"
f"├ etcd: {etcd_status}\n"
f"└ Velero: {velero_status}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"{stability} <b>48h 穩定度</b>\n"
f"├ 告警: <code>{self.alert_count_48h}</code>\n"
f"└ Pod 重啟: <code>{self.pod_restart_48h}</code>"
)
return message[:900]
@dataclass
class WeeklyReportMessage:
"""
週報訊息 (WEEKLY_REPORT)
2026-03-31 Claude Code: Phase 21.3 定期報告
每週五 18:00 台北發送
按鈕: 無
"""
week_range: str # 週次 (e.g., "2026-W14")
report_date: str # 報告日期時間
# 告警統計
alert_total: int = 0
alert_critical: int = 0
alert_resolved: int = 0
resolved_rate: float = 0.0
# AI 效能
ai_proposal_count: int = 0
ai_executed_count: int = 0
ai_success_rate: float = 0.0
avg_response_minutes: float = 0.0
# K3s 健康
k3s_uptime_pct: float = 99.9
pod_restart_total: int = 0
hpa_scale_events: int = 0
# Git 活動
commits_count: int = 0
deploy_count: int = 0
# 成本
ai_cost_week: float = 0.0
ai_tokens_week: int = 0
# 2026-04-07 Claude Code: Sprint 4 F1 — 處置分佈
disposition_auto: int = 0
disposition_human: int = 0
disposition_manual: int = 0
disposition_cold_start: int = 0
disposition_total: int = 0
def format(self) -> str:
"""格式化為 Telegram HTML"""
# 健康狀態 emoji
alert_health = "" if self.resolved_rate >= 80 else "⚠️"
ai_health = "" if self.ai_success_rate >= 70 else "⚠️"
k3s_health = "" if self.k3s_uptime_pct >= 99 else "⚠️"
message = (
f"═══════════════════════════\n"
f"📊 <b>AWOOOI 週報</b>\n"
f"═══════════════════════════\n"
f"📅 {html.escape(self.week_range)} | {html.escape(self.report_date)}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"{alert_health} <b>告警統計</b>\n"
f"├ 總數: <code>{self.alert_total}</code>\n"
f"├ Critical: <code>{self.alert_critical}</code>\n"
f"├ 已解決: <code>{self.alert_resolved}</code>\n"
f"└ 解決率: <code>{self.resolved_rate:.1f}%</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"{ai_health} <b>AI 效能</b>\n"
f"├ 提案數: <code>{self.ai_proposal_count}</code>\n"
f"├ 執行數: <code>{self.ai_executed_count}</code>\n"
f"├ 成功率: <code>{self.ai_success_rate:.1f}%</code>\n"
f"└ 平均回應: <code>{self.avg_response_minutes:.1f}</code> 分鐘\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"{k3s_health} <b>K3s 健康</b>\n"
f"├ Uptime: <code>{self.k3s_uptime_pct:.2f}%</code>\n"
f"├ Pod 重啟: <code>{self.pod_restart_total}</code>\n"
f"└ HPA 擴縮: <code>{self.hpa_scale_events}</code> 次\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📦 <b>開發活動</b>\n"
f"├ Commits: <code>{self.commits_count}</code>\n"
f"└ 部署: <code>{self.deploy_count}</code> 次\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"💰 <b>AI 成本</b>\n"
f"├ 費用: $<code>{self.ai_cost_week:.2f}</code>\n"
f"└ Tokens: <code>{self.ai_tokens_week:,}</code>\n"
)
# Sprint 4 F1: 處置分佈(有資料才加)
if self.disposition_total > 0:
auto_total = self.disposition_auto + self.disposition_cold_start
auto_rate = int(auto_total / self.disposition_total * 100) if self.disposition_total > 0 else 0
message += (
f"━━━━━━━━━━━━━━━━━━━\n"
f"📋 <b>處置分佈</b>\n"
f"├ 🤖 自動修復: <code>{self.disposition_auto}</code>\n"
f"├ ❄️ 冷啟動信任: <code>{self.disposition_cold_start}</code>\n"
f"├ 👤 人工審核: <code>{self.disposition_human}</code>\n"
f"├ 🔧 手動處理: <code>{self.disposition_manual}</code>\n"
f"└ 自動化率: <b>{auto_rate}%</b>"
)
return message[:1200]
@dataclass
class InfraAlertMessage:
"""
基礎設施異常告警訊息 (INFRA_ALERT)
2026-04-03 ogt: 新增 — 補足 Nemotron/NIM 等基礎設施異常的標準告警格式
用途: 非 incident 型的系統元件異常通知 (AI provider, DB, 外部 API 等)
按鈕: 無 (資訊型告警)
"""
component: str # 元件名稱 (e.g., "Nemotron NIM")
status: str # 狀態描述 (e.g., "⚠️ 超時 (>25s)")
impact: str # 影響說明
auto_fixed: bool = False # 是否已自動修復
fix_action: str = "" # 執行的修復動作 (auto_fixed=True 時顯示)
note: str = "" # 附加說明 (info_only 情境用,不顯示修復區塊)
def format(self) -> str:
"""格式化為 Telegram HTML"""
# 有 note 表示「資訊性提示」,不顯示修復區塊
if self.note:
footer = f"━━━━━━━━━━━━━━━━━━━\n💡 {html.escape(self.note[:150])}\n"
elif self.auto_fixed:
footer = f"━━━━━━━━━━━━━━━━━━━\n✅ <b>已自動修復</b>\n{html.escape(self.fix_action[:100])}\n"
else:
footer = f"━━━━━━━━━━━━━━━━━━━\n⚠️ <b>需要關注</b>\n{html.escape(self.fix_action[:100] or '請確認元件狀態')}\n"
return (
f"🚨 <b>基礎設施異常</b>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"⚙️ <b>{html.escape(self.component)}</b>: {html.escape(self.status)}\n"
f"📛 影響: {html.escape(self.impact[:150])}\n"
f"{footer}"
)[:900]
# =============================================================================
# Risk Level Emoji Mapping
# =============================================================================
RISK_EMOJI_MAP = {
"critical": "🚨",
"high": "🔴",
"medium": "⚠️",
"low": "",
}
# =============================================================================
# Telegram Gateway
# =============================================================================
class TelegramGatewayError(Exception):
"""Telegram Gateway 錯誤"""
pass
class TelegramGateway:
"""
Telegram Gateway - 行動戰情室 + SignOz 整合
職責:
1. 推送待簽核卡片到 Telegram (含 SignOz 指標)
2. 接收並驗證簽核/調優回調
3. Shadow Mode 調優執行 (僅日誌)
4. 遵守 SOUL.md 訊息壓縮原則
"""
TELEGRAM_API_BASE = "https://api.telegram.org"
def __init__(self):
self._http_client: httpx.AsyncClient | None = None
self._security = get_security_interceptor()
self._initialized = False
# Long Polling 狀態 (Phase 5 內網修復)
self._polling_active = False
self._polling_task: asyncio.Task | None = None
self._last_update_id = 0
# 2026-04-01 Claude Code: 分散式 Leader Election (防 2-Pod 409 互搶)
self._pod_id = os.environ.get("POD_NAME", os.urandom(8).hex())
self._leader_task: asyncio.Task | None = None
# Phase 6.5: 心跳監控 (防止沉默盲點)
self._last_message_time: datetime | None = None
self._heartbeat_task: asyncio.Task | None = None
self._heartbeat_active = False
async def initialize(self) -> bool:
"""初始化 Gateway"""
if not settings.OPENCLAW_TG_BOT_TOKEN:
logger.warning("telegram_gateway_disabled", reason="Bot token not configured")
return False
if not settings.OPENCLAW_TG_CHAT_ID:
logger.warning("telegram_gateway_disabled", reason="Chat ID not configured")
return False
# 2026-04-03 ogt: timeout 改用 httpx.Timeout 分開設定
# connect=10s, read=50s (getUpdates long polling timeout 40s + buffer)
# 舊的 timeout=30.0 會讓 getUpdates(timeout=40s) 每次都被 client 先打斷
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=50.0, write=10.0, pool=10.0),
headers={"Content-Type": "application/json"},
)
await self._security.initialize()
self._initialized = True
logger.info("telegram_gateway_initialized")
return True
@property
def bot_token(self) -> str:
"""取得 Bot Token"""
return settings.OPENCLAW_TG_BOT_TOKEN
@property
def chat_id(self) -> str:
"""取得 Chat ID"""
return settings.OPENCLAW_TG_CHAT_ID
@property
def api_url(self) -> str:
"""取得 Telegram API URL"""
return f"{self.TELEGRAM_API_BASE}/bot{self.bot_token}"
async def _send_request(
self,
method: str,
payload: dict,
) -> dict:
"""
發送 Telegram API 請求
Phase C P1: 新增 OTEL 追蹤
@author Claude Code
@date 2026-03-30 (台北時間)
Args:
method: API 方法 (sendMessage, editMessageText, etc.)
payload: 請求 Payload
Returns:
dict: API 回應
"""
if not self._initialized:
await self.initialize()
if not self._http_client:
raise TelegramGatewayError("HTTP client not initialized")
url = f"{self.api_url}/{method}"
# OTEL Span: telegram.api.{method}
with _tracer.start_as_current_span(
f"telegram.api.{method}",
attributes={
"telegram.method": method,
"telegram.chat_id": str(payload.get("chat_id", "")),
"telegram.has_reply_markup": "reply_markup" in payload,
},
) as span:
try:
response = await self._http_client.post(url, json=payload)
response.raise_for_status()
result = response.json()
if not result.get("ok"):
span.set_attribute("telegram.error", result.get("description", "Unknown"))
span.set_status(trace.Status(trace.StatusCode.ERROR))
raise TelegramGatewayError(
f"Telegram API error: {result.get('description', 'Unknown error')}"
)
# 成功: 記錄 message_id (result 可能是 dict 或 bool需防禦)
result_val = result.get("result")
if isinstance(result_val, dict) and "message_id" in result_val:
span.set_attribute("telegram.message_id", result_val["message_id"])
span.set_status(trace.Status(trace.StatusCode.OK))
return result
except httpx.HTTPStatusError as e:
span.set_attribute("telegram.http_status", e.response.status_code)
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(e)
logger.error("telegram_api_error", method=method, status=e.response.status_code)
raise TelegramGatewayError(f"HTTP error: {e.response.status_code}") from e
except TelegramGatewayError:
# 已處理的錯誤,直接拋出
raise
except Exception as e:
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(e)
logger.error("telegram_request_failed", method=method, error=str(e))
raise TelegramGatewayError(str(e)) from e
def _build_inline_keyboard(
self,
approval_id: str,
include_auto_tuning: bool = True,
auto_tuning_command: str = "",
incident_id: str = "",
) -> dict:
"""
建立 Inline Keyboard (ADR-050 v2.0 六鍵佈局)
2026-04-01 Claude Code (ADR-050): 重組為 6 鍵 + 可選自動調優
- 第一行: [✅ 批准] [❌ 拒絕] [🔕 靜默] ← nonce 防重放
- 第二行: [📋 詳情] [🔄 重診] [📊 歷史] ← incident_id format (read-only)
- 第三行: [⚡ 自動調優] (可選)
Args:
approval_id: 簽核單 ID (用於 nonce 生成)
include_auto_tuning: 是否包含自動調優按鈕
auto_tuning_command: kubectl 調優指令
incident_id: 關聯 Incident ID (用於 detail/reanalyze/history 按鈕)
Returns:
dict: Telegram InlineKeyboardMarkup
"""
# 產生 Nonce (防重放,用於寫操作)
approve_nonce = self._security.generate_callback_nonce(approval_id, "approve")
reject_nonce = self._security.generate_callback_nonce(approval_id, "reject")
silence_nonce = self._security.generate_callback_nonce(approval_id, "silence")
# 第一行: 主要簽核操作 (nonce 保護)
buttons = [
[
{"text": "✅ 批准", "callback_data": approve_nonce},
{"text": "❌ 拒絕", "callback_data": reject_nonce},
{"text": "🔕 靜默", "callback_data": silence_nonce},
],
]
# 第二行: 資訊查詢按鈕 (ADR-050: read-only, format: action:incident_id)
if incident_id:
buttons.append([
{"text": "📋 詳情", "callback_data": f"detail:{incident_id}"},
{"text": "🔄 重診", "callback_data": f"reanalyze:{incident_id}"},
{"text": "📊 歷史", "callback_data": f"history:{incident_id}"},
])
# 第三行: 自動調優按鈕 (v7.0)
if include_auto_tuning and auto_tuning_command:
tuning_nonce = self._security.generate_callback_nonce(approval_id, "tune")
buttons.append([
{"text": "⚡ 執行自動調優", "callback_data": tuning_nonce}
])
return {"inline_keyboard": buttons}
async def send_approval_card(
self,
approval_id: str,
risk_level: str,
resource_name: str,
root_cause: str,
suggested_action: str,
estimated_downtime: str = "~30s",
# v6.0 AI 仲裁欄位
primary_responsibility: str = "COLLAB",
confidence: float = 0.0,
namespace: str = "default",
# v7.0 SignOz 整合
signoz_rps: float = 0.0,
signoz_rps_trend: str = "stable",
signoz_error_rate: float = 0.0,
signoz_p99_latency: float = 0.0,
signoz_latency_trend: str = "stable",
signoz_trace_url: str = "",
auto_tuning_command: str = "",
# 2026-03-29 ogt: AI Token/Cost 追蹤
ai_tokens: int = 0,
ai_cost: float = 0.0,
# 2026-03-29 ogt: ADR-037 異常頻率統計
anomaly_frequency: dict | None = None,
# 2026-03-29 ogt: AI Provider 來源顯示
ai_provider: str = "",
# 2026-04-04 ogt: 底層模型名稱
ai_model: str = "",
# 2026-04-02 ogt: Phase 22 Nemotron 協作 (ADR-044)
nemotron_enabled: bool = False,
nemotron_tools: list[dict] | None = None,
nemotron_validation: str = "",
nemotron_latency_ms: float = 0.0,
# 2026-04-05 Claude Code: incident_id 用於 detail/reanalyze/history 按鈕
incident_id: str = "",
) -> dict:
"""
推送待簽核卡片到 Telegram (v7.0 含 SignOz 整合)
SOUL.md 4.1 + AI 仲裁 + SignOz 訊息格式
Phase 21 (ADR-037): 含異常頻率統計
Args:
approval_id: 簽核單 ID
risk_level: 風險等級 (critical/medium/low)
resource_name: 資源名稱
root_cause: 根因摘要
suggested_action: 建議操作
estimated_downtime: 預計停機時間
primary_responsibility: 責任團隊 (FE/BE/INFRA/DB/COLLAB)
confidence: AI 信心度 (0.0-1.0)
namespace: K8s namespace
signoz_*: SignOz Gold Metrics
signoz_trace_url: 動態時間參數的 Trace URL
auto_tuning_command: kubectl 調優指令
anomaly_frequency: 異常頻率統計 (ADR-037)
Returns:
dict: Telegram API 回應
"""
# 取得狀態 Emoji
emoji = RISK_EMOJI_MAP.get(risk_level.lower(), "⚠️")
# 建立 SignOz 指標區塊
signoz_metrics = None
if signoz_rps > 0 or signoz_error_rate > 0 or signoz_p99_latency > 0:
signoz_metrics = SignOzMetricsBlock(
rps=signoz_rps,
rps_trend=signoz_rps_trend,
error_rate=signoz_error_rate,
p99_latency_ms=signoz_p99_latency,
latency_trend=signoz_latency_trend,
trace_url=signoz_trace_url,
)
# 建立訊息結構 (含 AI 仲裁 + SignOz + Token/Cost + 頻率統計)
message = TelegramMessage(
status_emoji=emoji,
risk_level=risk_level.upper(),
resource_name=resource_name,
root_cause=root_cause,
suggested_action=suggested_action,
estimated_downtime=estimated_downtime,
approval_id=approval_id,
incident_id=incident_id,
primary_responsibility=primary_responsibility,
confidence=confidence,
namespace=namespace,
signoz_metrics=signoz_metrics,
signoz_trace_url=signoz_trace_url,
auto_tuning_command=auto_tuning_command,
# 2026-03-29 ogt: AI Token/Cost 追蹤
ai_tokens=ai_tokens,
ai_cost=ai_cost,
# 2026-03-29 ogt: ADR-037 異常頻率統計
anomaly_frequency=anomaly_frequency,
# 2026-03-29 ogt: AI Provider 來源顯示
ai_provider=ai_provider,
# 2026-04-04 ogt: 底層模型名稱
ai_model=ai_model,
# 2026-04-02 ogt: Phase 22 Nemotron 協作 (ADR-044)
nemotron_enabled=nemotron_enabled,
nemotron_tools=nemotron_tools,
nemotron_validation=nemotron_validation,
nemotron_latency_ms=nemotron_latency_ms,
)
# 格式化訊息 — Phase 22: 如果 Nemotron 啟用,使用雙軌格式
text = message.format_with_nemotron() if nemotron_enabled else message.format()
# 建立按鈕 (含自動調優)
# 2026-04-05 Claude Code: 傳入 incident_id 以啟用 detail/reanalyze/history 按鈕
keyboard = self._build_inline_keyboard(
approval_id=approval_id,
include_auto_tuning=bool(auto_tuning_command),
auto_tuning_command=auto_tuning_command,
incident_id=incident_id,
)
# 發送訊息
payload = {
"chat_id": self.chat_id,
"text": text,
"parse_mode": "HTML",
"reply_markup": keyboard,
"disable_web_page_preview": True, # 避免 SignOz URL 預覽
}
logger.info(
"telegram_approval_card_sending",
approval_id=approval_id,
risk_level=risk_level,
resource=resource_name,
signoz_integrated=signoz_metrics is not None,
auto_tuning_available=bool(auto_tuning_command),
)
result = await self._send_request("sendMessage", payload)
logger.info(
"telegram_approval_card_sent",
approval_id=approval_id,
message_id=result.get("result", {}).get("message_id"),
)
# 2026-04-03 ogt: 發到 SRE 群組並觸發 AI 雙向討論 (Triumvirate ADR-053)
# 2026-04-05 ogt: 升級為完整 v7.0 格式,與個人 chat 一致
# 非同步執行,失敗不影響告警主流程
if settings.SRE_GROUP_CHAT_ID:
asyncio.create_task(
self._send_approval_card_to_group(
approval_id=approval_id,
risk_level=risk_level,
resource_name=resource_name,
root_cause=root_cause,
suggested_action=suggested_action,
estimated_downtime=estimated_downtime,
primary_responsibility=primary_responsibility,
confidence=confidence,
namespace=namespace,
signoz_rps=signoz_rps,
signoz_rps_trend=signoz_rps_trend,
signoz_error_rate=signoz_error_rate,
signoz_p99_latency=signoz_p99_latency,
signoz_latency_trend=signoz_latency_trend,
signoz_trace_url=signoz_trace_url,
auto_tuning_command=auto_tuning_command,
ai_tokens=ai_tokens,
ai_cost=ai_cost,
anomaly_frequency=anomaly_frequency,
ai_provider=ai_provider,
ai_model=ai_model,
nemotron_enabled=nemotron_enabled,
nemotron_tools=nemotron_tools,
nemotron_validation=nemotron_validation,
nemotron_latency_ms=nemotron_latency_ms,
incident_id=incident_id,
)
)
return result
async def _send_approval_card_to_group(
self,
approval_id: str,
risk_level: str,
resource_name: str,
root_cause: str,
suggested_action: str,
estimated_downtime: str = "~30s",
primary_responsibility: str = "COLLAB",
confidence: float = 0.0,
namespace: str = "default",
signoz_rps: float = 0.0,
signoz_rps_trend: str = "stable",
signoz_error_rate: float = 0.0,
signoz_p99_latency: float = 0.0,
signoz_latency_trend: str = "stable",
signoz_trace_url: str = "",
auto_tuning_command: str = "",
ai_tokens: int = 0,
ai_cost: float = 0.0,
anomaly_frequency: dict | None = None,
ai_provider: str = "",
ai_model: str = "",
nemotron_enabled: bool = False,
nemotron_tools: list[dict] | None = None,
nemotron_validation: str = "",
nemotron_latency_ms: float = 0.0,
incident_id: str = "",
) -> None:
"""
發送告警卡片到 SRE 群組 — 與個人 chat 相同的完整 v7.0 格式
2026-04-05 ogt: 升級為完整格式(含 SignOz/AI/Nemotron移除精簡版
由 asyncio.create_task 非同步呼叫,失敗不影響主告警流程。
"""
try:
emoji = RISK_EMOJI_MAP.get(risk_level.lower(), "⚠️")
signoz_metrics = None
if signoz_rps > 0 or signoz_error_rate > 0 or signoz_p99_latency > 0:
signoz_metrics = SignOzMetricsBlock(
rps=signoz_rps,
rps_trend=signoz_rps_trend,
error_rate=signoz_error_rate,
p99_latency_ms=signoz_p99_latency,
latency_trend=signoz_latency_trend,
trace_url=signoz_trace_url,
)
message = TelegramMessage(
status_emoji=emoji,
risk_level=risk_level.upper(),
resource_name=resource_name,
root_cause=root_cause,
suggested_action=suggested_action,
estimated_downtime=estimated_downtime,
approval_id=approval_id,
incident_id=incident_id,
primary_responsibility=primary_responsibility,
confidence=confidence,
namespace=namespace,
signoz_metrics=signoz_metrics,
signoz_trace_url=signoz_trace_url,
auto_tuning_command=auto_tuning_command,
ai_tokens=ai_tokens,
ai_cost=ai_cost,
anomaly_frequency=anomaly_frequency,
ai_provider=ai_provider,
ai_model=ai_model,
nemotron_enabled=nemotron_enabled,
nemotron_tools=nemotron_tools,
nemotron_validation=nemotron_validation,
nemotron_latency_ms=nemotron_latency_ms,
)
text = message.format_with_nemotron() if nemotron_enabled else message.format()
await self.send_to_group(text=text)
except Exception as e:
logger.error("send_approval_card_to_group_failed", error=str(e))
# =========================================================================
# 新訊息發送方法 (2026-03-29 ogt: ADR-038)
# =========================================================================
def _build_sentry_keyboard(self, error_id: str) -> dict:
"""建立 Sentry 錯誤訊息按鈕"""
view_nonce = self._security.generate_callback_nonce(error_id, "view")
silence_nonce = self._security.generate_callback_nonce(error_id, "silence")
return {
"inline_keyboard": [
[
{"text": "🔍 查看詳情", "callback_data": view_nonce},
{"text": "🔕 靜默 1h", "callback_data": silence_nonce},
]
]
}
def _build_resource_keyboard(self, resource_id: str) -> dict:
"""建立資源告警按鈕"""
scale_nonce = self._security.generate_callback_nonce(resource_id, "scale")
silence_nonce = self._security.generate_callback_nonce(resource_id, "silence")
return {
"inline_keyboard": [
[
{"text": "⚡ 自動擴展", "callback_data": scale_nonce},
{"text": "🔕 靜默 1h", "callback_data": silence_nonce},
]
]
}
async def send_sentry_error(
self,
error_id: str,
error_type: str,
error_message: str,
service_name: str,
file_location: str,
occurrence_count: int = 1,
affected_users: int = 0,
first_seen: str = "",
stack_trace: list[str] | None = None,
sentry_url: str = "",
) -> dict:
"""
發送 Sentry 錯誤通知
2026-03-29 ogt: 新增
Args:
error_id: Sentry Issue ID
error_type: 錯誤類型 (TypeError, etc.)
error_message: 錯誤訊息
service_name: 服務名稱
file_location: 檔案位置
occurrence_count: 發生次數
affected_users: 影響用戶數
first_seen: 首次發生時間
stack_trace: Stack trace
sentry_url: Sentry 連結
Returns:
dict: Telegram API 回應
"""
message = SentryErrorMessage(
error_id=error_id,
error_type=error_type,
error_message=error_message,
service_name=service_name,
file_location=file_location,
occurrence_count=occurrence_count,
affected_users=affected_users,
first_seen=first_seen,
stack_trace=stack_trace,
sentry_url=sentry_url,
)
payload = {
"chat_id": self.chat_id,
"text": message.format(),
"parse_mode": "HTML",
"reply_markup": self._build_sentry_keyboard(error_id),
"disable_web_page_preview": True,
}
logger.info("telegram_sentry_error_sending", error_id=error_id, service=service_name)
result = await self._send_request("sendMessage", payload)
logger.info("telegram_sentry_error_sent", error_id=error_id)
return result
async def send_resource_warning(
self,
resource_id: str,
pod_name: str,
namespace: str = "default",
cpu_percent: float = 0.0,
cpu_limit: str = "",
memory_percent: float = 0.0,
memory_limit: str = "",
disk_percent: float = 0.0,
trend_info: str = "",
suggestion: str = "",
) -> dict:
"""
發送資源告警通知
2026-03-29 ogt: 新增
Args:
resource_id: 資源 ID
pod_name: Pod 名稱
namespace: K8s namespace
cpu_percent: CPU 使用率
memory_percent: Memory 使用率
disk_percent: Disk 使用率
trend_info: 趨勢資訊
suggestion: 建議
Returns:
dict: Telegram API 回應
"""
message = ResourceWarnMessage(
resource_id=resource_id,
pod_name=pod_name,
namespace=namespace,
cpu_percent=cpu_percent,
cpu_limit=cpu_limit,
memory_percent=memory_percent,
memory_limit=memory_limit,
disk_percent=disk_percent,
trend_info=trend_info,
suggestion=suggestion,
)
payload = {
"chat_id": self.chat_id,
"text": message.format(),
"parse_mode": "HTML",
"reply_markup": self._build_resource_keyboard(resource_id),
"disable_web_page_preview": True,
}
logger.info("telegram_resource_warning_sending", resource_id=resource_id, pod=pod_name)
result = await self._send_request("sendMessage", payload)
logger.info("telegram_resource_warning_sent", resource_id=resource_id)
return result
async def send_repair_report(
self,
report_date: str,
total_repairs: int = 0,
success_count: int = 0,
failure_count: int = 0,
saved_minutes: int = 0,
top_issues: list[tuple[str, int]] | None = None,
ai_cost_gemini: float = 0.0,
ai_cost_nvidia: float = 0.0,
ai_tokens_total: int = 0,
) -> dict:
"""
發送自動修復報告
2026-03-29 ogt: 新增
Args:
report_date: 報告日期
total_repairs: 總修復次數
success_count: 成功次數
failure_count: 失敗次數
saved_minutes: 節省人工時間
top_issues: Top 問題列表
ai_cost_gemini: Gemini 成本
ai_cost_nvidia: NVIDIA 成本
ai_tokens_total: 總 Token 數
Returns:
dict: Telegram API 回應
"""
message = RepairReportMessage(
report_date=report_date,
total_repairs=total_repairs,
success_count=success_count,
failure_count=failure_count,
saved_minutes=saved_minutes,
top_issues=top_issues,
ai_cost_gemini=ai_cost_gemini,
ai_cost_nvidia=ai_cost_nvidia,
ai_tokens_total=ai_tokens_total,
)
payload = {
"chat_id": self.chat_id,
"text": message.format(),
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
logger.info("telegram_repair_report_sending", date=report_date)
result = await self._send_request("sendMessage", payload)
logger.info("telegram_repair_report_sent", date=report_date)
return result
async def send_daily_summary(
self,
summary_date: str,
alert_total: int = 0,
alert_critical: int = 0,
alert_medium: int = 0,
alert_low: int = 0,
auto_repair_count: int = 0,
manual_approval_count: int = 0,
ignored_count: int = 0,
avg_response_minutes: float = 0.0,
api_availability: float = 99.9,
web_availability: float = 99.9,
worker_availability: float = 99.9,
ai_cost: float = 0.0,
cloud_cost: float = 0.0,
budget_remaining: float = 0.0,
) -> dict:
"""
發送每日摘要
2026-03-29 ogt: 新增
Returns:
dict: Telegram API 回應
"""
message = DailySummaryMessage(
summary_date=summary_date,
alert_total=alert_total,
alert_critical=alert_critical,
alert_medium=alert_medium,
alert_low=alert_low,
auto_repair_count=auto_repair_count,
manual_approval_count=manual_approval_count,
ignored_count=ignored_count,
avg_response_minutes=avg_response_minutes,
api_availability=api_availability,
web_availability=web_availability,
worker_availability=worker_availability,
ai_cost=ai_cost,
cloud_cost=cloud_cost,
budget_remaining=budget_remaining,
)
payload = {
"chat_id": self.chat_id,
"text": message.format(),
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
logger.info("telegram_daily_summary_sending", date=summary_date)
result = await self._send_request("sendMessage", payload)
logger.info("telegram_daily_summary_sent", date=summary_date)
return result
async def send_cicd_progress(
self,
job_name: str,
status: str,
stage: str = "",
commit_sha: str = "",
triggered_by: str = "",
duration_seconds: int = 0,
message: str = "",
workflow_url: str = "",
max_retries: int = 3,
) -> dict:
"""
發送 CI/CD 進度通知 (簡潔版,不走 AI 仲裁)
2026-03-30 ogt: 新增,解決 CI/CD 告警被當成事件處理的問題
2026-03-30 P1: 新增重試機制 (指數退避)
Args:
max_retries: 最大重試次數 (預設 3)
Returns:
dict: Telegram API 回應
"""
# OTEL Span: telegram.send_cicd_progress
with _tracer.start_as_current_span(
"telegram.send_cicd_progress",
attributes={
"telegram.job_name": job_name,
"telegram.status": status,
"telegram.stage": stage,
"telegram.max_retries": max_retries,
},
) as span:
msg = CICDProgressMessage(
job_name=job_name,
status=status,
stage=stage,
commit_sha=commit_sha,
triggered_by=triggered_by,
duration_seconds=duration_seconds,
message=message,
workflow_url=workflow_url,
)
payload = {
"chat_id": self.chat_id,
"text": msg.format(),
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
logger.info("telegram_cicd_progress_sending", job=job_name, status=status)
# 重試機制 (指數退避)
last_error = None
for attempt in range(max_retries):
try:
result = await self._send_request("sendMessage", payload)
span.set_attribute("telegram.attempts", attempt + 1)
span.set_status(trace.Status(trace.StatusCode.OK))
logger.info("telegram_cicd_progress_sent", job=job_name, status=status, attempt=attempt + 1)
return result
except TelegramGatewayError as e:
last_error = e
if attempt < max_retries - 1:
delay = 2 ** attempt # 1, 2, 4 秒
logger.warning(
"telegram_cicd_progress_retry",
job=job_name,
attempt=attempt + 1,
delay=delay,
error=str(e),
)
await asyncio.sleep(delay)
# 所有重試都失敗
span.set_attribute("telegram.attempts", max_retries)
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(last_error)
logger.error(
"telegram_cicd_progress_failed",
job=job_name,
status=status,
max_retries=max_retries,
error=str(last_error),
)
raise last_error
async def send_deploy_success(
self,
commit_sha: str,
triggered_by: str,
environment: str = "Production",
api_version: str = "",
web_version: str = "",
worker_version: str = "",
duration_seconds: int = 0,
e2e_passed: int = 0,
e2e_total: int = 0,
health_check_passed: bool = True,
workflow_url: str = "",
) -> dict:
"""
發送部署成功通知
2026-03-29 ogt: 新增
Returns:
dict: Telegram API 回應
"""
message = DeploySuccessMessage(
commit_sha=commit_sha,
triggered_by=triggered_by,
environment=environment,
api_version=api_version,
web_version=web_version,
worker_version=worker_version,
duration_seconds=duration_seconds,
e2e_passed=e2e_passed,
e2e_total=e2e_total,
health_check_passed=health_check_passed,
workflow_url=workflow_url,
)
payload = {
"chat_id": self.chat_id,
"text": message.format(),
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
logger.info("telegram_deploy_success_sending", commit=commit_sha[:8])
result = await self._send_request("sendMessage", payload)
logger.info("telegram_deploy_success_sent", commit=commit_sha[:8])
return result
async def send_rate_limit_warning(
self,
provider: str,
daily_usage: int = 0,
daily_limit: int = 0,
token_usage: int = 0,
token_limit: int = 0,
cost_usd: float = 0.0,
suggestions: list[str] | None = None,
reset_time: str = "",
) -> dict:
"""
發送 API 限額警告
2026-03-29 ogt: 新增
Returns:
dict: Telegram API 回應
"""
message = RateLimitMessage(
provider=provider,
daily_usage=daily_usage,
daily_limit=daily_limit,
token_usage=token_usage,
token_limit=token_limit,
cost_usd=cost_usd,
suggestions=suggestions,
reset_time=reset_time,
)
payload = {
"chat_id": self.chat_id,
"text": message.format(),
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
logger.info("telegram_rate_limit_warning_sending", provider=provider)
result = await self._send_request("sendMessage", payload)
logger.info("telegram_rate_limit_warning_sent", provider=provider)
return result
async def handle_callback(
self,
callback_query_id: str,
callback_data: str,
user_id: int,
message_id: int,
original_text: str = "",
username: str = "",
) -> dict:
"""
處理簽核/調優回調
Args:
callback_query_id: Telegram Callback Query ID
callback_data: Callback Data (包含 nonce)
user_id: Telegram User ID
message_id: 原始訊息 ID
original_text: 原始卡片內容 (用於保留上下文)
username: 簽核者使用者名稱
Returns:
dict: 處理結果 {action, approval_id, user, auto_tuning_result?}
"""
try:
# ===================================================================
# Step 1: 解析 Callback Data (支援兩種格式)
# ===================================================================
parsed = self._security.parse_callback_data(callback_data)
action = parsed["action"]
approval_id = parsed["approval_id"]
# ===================================================================
# Step 1.5: ADR-050 Info Actions (read-only, 只需白名單驗證)
# ===================================================================
# 2026-04-01 Claude Code (ADR-050 P1): detail/reanalyze/history
if parsed.get("is_info_action"):
if not self._security.is_whitelisted(user_id):
raise UserNotWhitelistedError(f"User {user_id} not in whitelist")
incident_id = parsed.get("incident_id", approval_id)
if action == "detail":
# ADR-050 P2: 取得事件詳情,傳送新訊息 (保留原始簽核卡片+按鈕)
# 2026-04-01 Claude Code (ADR-050 P2)
await self._answer_callback(callback_query_id, action, text="📋 詳情傳送中...")
await self._send_incident_detail(incident_id)
elif action == "history":
# ADR-050 P2: 取得頻率統計
# 2026-04-01 Claude Code (ADR-050 P2)
await self._answer_callback(callback_query_id, action, text="📊 歷史統計傳送中...")
await self._send_incident_history(incident_id)
elif action == "reanalyze":
# ADR-050 P2: 觸發重診
# 2026-04-01 Claude Code (ADR-050 P2): reanalyze button handler
await self._answer_callback(callback_query_id, action, text="🔄 重診排程中...")
await self._send_reanalyze_result(incident_id)
else:
await self._answer_callback(callback_query_id, action, text="⚠️ 未知操作")
return {
"action": action,
"approval_id": approval_id,
"user": {"id": user_id, "username": username},
"success": True,
"info_action": True,
}
nonce = parsed["nonce"]
# 驗證使用者 + Nonce
user = await self._security.verify_callback(
user_id=user_id,
callback_id=callback_query_id,
nonce=nonce,
)
# ===================================================================
# Step 2: 處理自動調優 (Shadow Mode)
# ===================================================================
auto_tuning_result = None
if action == "tune":
auto_tuning_result = await self._handle_auto_tuning(
approval_id=approval_id,
user_id=user_id,
username=username,
)
# 回應 Callback Query
await self._answer_callback(
callback_query_id,
"tune",
text="⚡ 調優指令已記錄 (Shadow Mode)",
)
# 更新訊息
await self._update_message_after_action(
message_id=message_id,
action="tune",
username=username,
original_text=original_text,
extra_info=auto_tuning_result.get("command", ""),
)
return {
"action": action,
"approval_id": approval_id,
"user": user,
"success": True,
"auto_tuning_result": auto_tuning_result,
}
# ===================================================================
# Step 2.5: 處理稍後/靜默 (2026-03-27 P1 優化)
# ===================================================================
if action == "snooze":
snooze_result = await self._handle_snooze(
approval_id=approval_id,
username=username,
)
await self._answer_callback(
callback_query_id,
"snooze",
text="⏰ 30 分鐘後再提醒",
)
await self._update_message_after_action(
message_id=message_id,
action="snooze",
username=username,
original_text=original_text,
)
return {
"action": action,
"approval_id": approval_id,
"user": user,
"success": True,
"snooze_result": snooze_result,
}
if action == "silence":
silence_result = await self._handle_silence(
approval_id=approval_id,
username=username,
original_text=original_text,
)
await self._answer_callback(
callback_query_id,
"silence",
text="🔕 此類告警靜默 1 小時",
)
await self._update_message_after_action(
message_id=message_id,
action="silence",
username=username,
original_text=original_text,
extra_info=silence_result.get("resource_name", ""),
)
return {
"action": action,
"approval_id": approval_id,
"user": user,
"success": True,
"silence_result": silence_result,
}
# ===================================================================
# Step 3: 回應 Callback Query (簽核/拒絕)
# ===================================================================
await self._answer_callback(callback_query_id, action)
# ===================================================================
# Step 4: 更新訊息 (保留原始內容 + 簽核鋼印)
# ===================================================================
await self._update_message_after_action(
message_id=message_id,
action=action,
username=username,
original_text=original_text,
)
logger.info(
"telegram_callback_processed",
action=action,
approval_id=approval_id,
user_id=user_id,
)
return {
"action": action,
"approval_id": approval_id,
"user": user,
"success": True,
}
except UserNotWhitelistedError as e:
logger.warning("telegram_callback_denied", error=str(e), user_id=user_id)
await self._answer_callback(
callback_query_id,
"denied",
text="⛔ 您沒有簽核權限",
)
return {"success": False, "error": str(e)}
except NonceReplayError as e:
logger.warning("telegram_callback_replay", error=str(e))
await self._answer_callback(
callback_query_id,
"replay",
text="⚠️ 此操作已處理過",
)
return {"success": False, "error": str(e)}
except Exception as e:
logger.error("telegram_callback_error", error=str(e))
await self._answer_callback(
callback_query_id,
"error",
text="❌ 處理失敗",
)
return {"success": False, "error": str(e)}
async def _handle_auto_tuning(
self,
approval_id: str,
user_id: int,
username: str,
) -> dict:
"""
處理自動調優請求 (Shadow Mode)
統帥鐵律: Shadow Mode 下嚴禁實際執行 K8s 命令
Args:
approval_id: 簽核單 ID
user_id: 執行者 Telegram ID
username: 執行者名稱
Returns:
dict: 調優結果
"""
try:
# Shadow Mode: 僅記錄調優請求
# 實際生產環境需從 ApprovalRecord 取得完整調優指令
# Shadow Mode: 僅記錄調優請求
# 實際生產環境需從 ApprovalRecord 取得完整調優指令
log_message = f"[SHADOW MODE] 自動調優請求 - 簽核單: {approval_id}"
if settings.SHADOW_MODE_ENABLED:
logger.info(
"shadow_mode_auto_tuning_triggered",
approval_id=approval_id,
user_id=user_id,
username=username,
shadow_mode=True,
)
print(f"\n{'='*60}")
print("[SHADOW MODE] AI 生成的調優指令請求")
print(f"簽核單: {approval_id}")
print(f"執行者: @{username} (ID: {user_id})")
print(f"時間: {datetime.now(UTC).isoformat()}")
print("狀態: 僅記錄,未實際執行")
print(f"{'='*60}\n")
return {
"executed": False,
"shadow_mode": True,
"approval_id": approval_id,
"triggered_by": username,
"command": "kubectl command logged (see server logs)",
"log": log_message,
}
else:
logger.warning(
"auto_tuning_blocked_not_shadow_mode",
approval_id=approval_id,
message="Production execution requires multi-sig approval",
)
return {
"executed": False,
"shadow_mode": False,
"approval_id": approval_id,
"error": "Production execution requires multi-sig approval",
}
except Exception as e:
logger.error("auto_tuning_error", error=str(e), approval_id=approval_id)
return {
"executed": False,
"error": str(e),
}
async def _handle_snooze(
self,
approval_id: str,
username: str,
) -> dict:
"""
處理稍後提醒 (2026-03-27 P1 優化)
功能: 延遲 30 分鐘後再提醒此告警
Args:
approval_id: 簽核單 ID
username: 執行者名稱
Returns:
dict: 處理結果
"""
try:
redis = get_redis()
snooze_key = f"{SNOOZE_KEY_PREFIX}{approval_id}"
# 設置 30 分鐘延遲標記
await redis.setex(
snooze_key,
SNOOZE_TTL_SECONDS,
f"{username}:{datetime.now(UTC).isoformat()}",
)
logger.info(
"telegram_snooze_set",
approval_id=approval_id,
username=username,
ttl_minutes=SNOOZE_TTL_SECONDS // 60,
)
return {
"snoozed": True,
"approval_id": approval_id,
"snooze_until": datetime.now(UTC).isoformat(),
"ttl_minutes": SNOOZE_TTL_SECONDS // 60,
}
except Exception as e:
logger.error("snooze_error", error=str(e), approval_id=approval_id)
return {
"snoozed": False,
"error": str(e),
}
async def _handle_silence(
self,
approval_id: str,
username: str,
original_text: str,
) -> dict:
"""
處理靜默 1 小時 (2026-03-27 P1 優化)
功能: 同類告警 (相同資源) 1 小時內不再發送
Args:
approval_id: 簽核單 ID
username: 執行者名稱
original_text: 原始訊息 (用於解析資源名稱)
Returns:
dict: 處理結果
"""
try:
redis = get_redis()
# 從原始訊息解析資源名稱 (格式: 🎯 資源: xxx)
resource_name = "unknown"
for line in original_text.split("\n"):
if "🎯 資源:" in line or "🎯 資源: " in line:
resource_name = line.split(":")[-1].strip()
break
silence_key = f"{SILENCE_KEY_PREFIX}{resource_name}"
# 設置 1 小時靜默標記
await redis.setex(
silence_key,
SILENCE_TTL_SECONDS,
f"{username}:{datetime.now(UTC).isoformat()}:{approval_id}",
)
logger.info(
"telegram_silence_set",
approval_id=approval_id,
resource_name=resource_name,
username=username,
ttl_hours=SILENCE_TTL_SECONDS // 3600,
)
return {
"silenced": True,
"approval_id": approval_id,
"resource_name": resource_name,
"silence_until": datetime.now(UTC).isoformat(),
"ttl_hours": SILENCE_TTL_SECONDS // 3600,
}
except Exception as e:
logger.error("silence_error", error=str(e), approval_id=approval_id)
return {
"silenced": False,
"error": str(e),
}
async def _answer_callback(
self,
callback_query_id: str,
action: str,
text: str | None = None,
) -> None:
"""回應 Callback Query"""
if text is None:
if action == "approve":
text = "✅ 已簽核"
elif action == "reject":
text = "❌ 已拒絕"
elif action == "tune":
text = "⚡ 調優中..."
elif action == "snooze":
text = "⏰ 30 分鐘後再提醒"
elif action == "silence":
text = "🔕 此類告警靜默 1 小時"
else:
text = "✓ 已處理"
await self._send_request("answerCallbackQuery", {
"callback_query_id": callback_query_id,
"text": text,
"show_alert": False,
})
async def _update_message_after_action(
self,
message_id: int,
action: str,
username: str,
original_text: str,
extra_info: str = "",
) -> None:
"""
更新訊息: 保留原始卡片內容 + 簽核/調優鋼印
UX 要求:
- 嚴禁覆蓋原始內容
- 必須在底部加上分隔線與簽核狀態
- 移除所有按鈕
"""
# 構建鋼印 (2026-03-27 ogt: 新增 snooze/silence)
if action == "approve":
stamp = f"✅ 已由 @{username} 授權執行"
elif action == "reject":
stamp = f"❌ 已由 @{username} 拒絕執行"
elif action == "tune":
stamp = f"⚡ 已由 @{username} 觸發自動調優 (Shadow Mode)"
if extra_info:
stamp += "\n📝 指令已記錄"
elif action == "snooze":
stamp = f"⏰ @{username} 已設定 30 分鐘後再提醒"
elif action == "silence":
resource_info = f" ({extra_info})" if extra_info else ""
stamp = f"🔕 @{username} 已靜默此類告警 1 小時{resource_info}"
else:
stamp = f"✓ 已由 @{username} 處理"
# Step 1: 先移除按鈕 (確保按鈕一定消失,即使文字更新失敗)
# 2026-04-05 Claude Code: editMessageText 因 HTML 特殊字符可能失敗,
# 先用 editMessageReplyMarkup 確保按鈕移除,再嘗試更新文字
try:
await self._send_request("editMessageReplyMarkup", {
"chat_id": self.chat_id,
"message_id": message_id,
"reply_markup": {"inline_keyboard": []},
})
except TelegramGatewayError as e:
logger.warning("telegram_remove_buttons_failed", message_id=message_id, error=str(e))
# Step 2: 嘗試更新文字 (原始文字已轉義,確保 HTML 安全)
separator = "──────────────"
safe_original = html.escape(original_text)
safe_updated_text = f"{safe_original}\n{separator}\n{stamp}"
try:
await self._send_request("editMessageText", {
"chat_id": self.chat_id,
"message_id": message_id,
"text": safe_updated_text,
"parse_mode": "HTML",
"reply_markup": {"inline_keyboard": []},
"disable_web_page_preview": True,
})
except TelegramGatewayError as e:
# 文字更新失敗不影響整體流程,按鈕已移除
logger.warning("telegram_update_text_failed", message_id=message_id, error=str(e))
async def _send_incident_detail(self, incident_id: str) -> None:
"""
ADR-050 P2: 傳送事件詳情訊息 (不修改原始簽核卡片)
2026-04-01 Claude Code (ADR-050 P2): detail button handler
"""
# 延遲 import 避免循環依賴 (與 approval_service 同一模式)
from src.repositories.incident_repository import get_incident_repository
try:
repo = get_incident_repository()
incident = await repo.get_by_id(incident_id)
if not incident:
await self.send_notification(f"⚠️ 找不到事件 <code>{html.escape(incident_id)}</code>")
return
dc = incident.decision_chain
confidence_bar = "" * int((dc.confidence if dc else 0) * 10) + "" * (10 - int((dc.confidence if dc else 0) * 10))
lines = [
f"📋 <b>事件詳情</b>",
f"",
f"🔖 <b>ID:</b> <code>{html.escape(incident.incident_id)}</code>",
f"📊 <b>狀態:</b> {incident.status.value}",
f"⚡ <b>嚴重度:</b> {incident.severity.value}",
]
if incident.affected_services:
lines.append(f"🎯 <b>受影響服務:</b> {', '.join(html.escape(s) for s in incident.affected_services[:3])}")
if dc:
lines += [
f"",
f"🤖 <b>AI 分析</b> ({html.escape(dc.model_used)})",
f"💡 {html.escape(dc.hypothesis[:200])}{'...' if len(dc.hypothesis) > 200 else ''}",
f"📈 信心: [{confidence_bar}] {dc.confidence:.0%}",
]
if dc.probable_root_causes:
lines.append(f"🔍 根因: {html.escape(dc.probable_root_causes[0][:100])}")
# 2026-04-02 Claude Code: 修正時區 — 必須轉台北時區 (feedback_timezone_taipei.md)
from zoneinfo import ZoneInfo
created_taipei = incident.created_at.astimezone(ZoneInfo("Asia/Taipei")) if incident.created_at else incident.created_at
lines += [
f"",
f"🕐 <b>建立:</b> {created_taipei.strftime('%m/%d %H:%M') if created_taipei else 'N/A'}",
]
if incident.frequency_stats:
fs = incident.frequency_stats
lines.append(f"📉 <b>頻率:</b> 1h={fs.count_1h} 24h={fs.count_24h} 7d={fs.count_7d}")
await self.send_notification("\n".join(lines))
except Exception as e:
logger.warning("send_incident_detail_failed", incident_id=incident_id, error=str(e))
await self.send_notification(f"⚠️ 無法取得事件詳情: {html.escape(str(e)[:100])}")
async def _send_incident_history(self, incident_id: str) -> None:
"""
ADR-050 P2: 傳送事件頻率統計訊息
2026-04-01 Claude Code (ADR-050 P2): history button handler
"""
from src.repositories.incident_repository import get_incident_repository
try:
repo = get_incident_repository()
incident = await repo.get_by_id(incident_id)
if not incident:
await self.send_notification(f"⚠️ 找不到事件 <code>{html.escape(incident_id)}</code>")
return
fs = incident.frequency_stats
if not fs:
await self.send_notification(
f"📊 <b>事件歷史</b>\n\n🔖 <code>{html.escape(incident.incident_id)}</code>\n\n無頻率統計資料"
)
return
lines = [
f"📊 <b>事件歷史統計</b>",
f"",
f"🔖 <code>{html.escape(incident.incident_id)}</code>",
f"🔑 告警鍵: <code>{html.escape(fs.anomaly_key[:60])}</code>",
f"",
f"⏱ <b>發生頻率</b>",
f" 1小時: {fs.count_1h}",
f" 24小時: {fs.count_24h}",
f" 7天: {fs.count_7d}",
f" 30天: {fs.count_30d}",
]
if fs.auto_repair_count > 0:
lines += [
f"",
f"🔧 <b>自動修復</b>",
f" 執行次數: {fs.auto_repair_count}",
]
if fs.last_repair_action:
lines.append(f" 最後動作: {html.escape(fs.last_repair_action[:80])}")
# 2026-04-07 Claude Code: Sprint 4 D2 — 處置分佈明細
total_res = fs.total_resolution_count
if total_res > 0:
auto_total = fs.auto_repair_count + fs.cold_start_trust_count
auto_rate = int(auto_total / total_res * 100) if total_res > 0 else 0
lines += [
f"",
f"📋 <b>處置分佈</b> (共 {total_res} 次)",
f" 🤖 自動修復: {fs.auto_repair_count}",
f" ❄️ 冷啟動信任: {fs.cold_start_trust_count}",
f" 👤 人工審核: {fs.human_approved_count}",
f" 🔧 手動處理: {fs.manual_resolved_count}",
f" 📈 自動化率: <b>{auto_rate}%</b>",
]
await self.send_notification("\n".join(lines))
except Exception as e:
logger.warning("send_incident_history_failed", incident_id=incident_id, error=str(e))
await self.send_notification(f"⚠️ 無法取得歷史統計: {html.escape(str(e)[:100])}")
async def _send_reanalyze_result(self, incident_id: str) -> None:
"""
ADR-050 P2: 觸發重診並傳送結果訊息
呼叫 IncidentService.trigger_reanalysis(),以新訊息回報排程結果。
不修改原始簽核卡片,避免干擾授權流程。
2026-04-01 Claude Code (ADR-050 P2): reanalyze button handler
"""
from src.services.incident_service import get_incident_service
try:
service = get_incident_service()
result = await service.trigger_reanalysis(incident_id)
if result["already_analyzing"]:
msg = (
f"⏳ <b>重診進行中</b>\n\n"
f"🔖 <code>{html.escape(incident_id)}</code>\n\n"
f"{html.escape(result['message'])}"
)
elif result["triggered"]:
msg = (
f"🔄 <b>重診已排程</b>\n\n"
f"🔖 <code>{html.escape(incident_id)}</code>\n\n"
f"{html.escape(result['message'])}\n"
f"AI 分析結果將自動更新事件狀態。"
)
else:
msg = (
f"⚠️ <b>重診失敗</b>\n\n"
f"🔖 <code>{html.escape(incident_id)}</code>\n\n"
f"{html.escape(result['message'])}"
)
await self.send_notification(msg)
except Exception as e:
logger.warning("send_reanalyze_result_failed", incident_id=incident_id, error=str(e))
await self.send_notification(
f"⚠️ 重診觸發失敗: {html.escape(str(e)[:100])}"
)
# =========================================================================
# Sprint 5.1 T1-T6: Data Safety Guardrail 通知場景
# (2026-04-08 Claude Sonnet 4.6 Asia/TaipeiADR-062)
# =========================================================================
async def send_guardrail_blocked(
self,
service_name: str,
alertname: str,
reason: str,
) -> None:
"""T1: GUARDRAIL_BLOCKED — 服務屬於 BLOCK 等級,禁止自動修復"""
text = (
"🚫 <b>[服務保護] 自動修復已阻擋</b>\n"
"━━━━━━━━━━━━━━━━━\n"
f"服務: <code>{html.escape(service_name)}</code>\n"
f"告警: <code>{html.escape(alertname)}</code>\n"
f"原因: {html.escape(reason)}\n"
"━━━━━━━━━━━━━━━━━\n"
"⚠️ 請人工評估並手動處理"
)
await self.send_notification(text)
async def send_preflight_failed(
self,
service_name: str,
backup_age_hours: float,
max_age_hours: float,
backup_name: str | None,
) -> None:
"""T2: PRE_FLIGHT_FAILED + BACKUP_TRIGGERED — 備份過期,修復暫停"""
backup_status = (
f"緊急備份: 已啟動 <code>{html.escape(backup_name)}</code>"
if backup_name
else "緊急備份: <b>啟動失敗</b>,請人工處理"
)
text = (
"⏸ <b>[Pre-flight 阻擋] 備份已過期,修復暫停</b>\n"
"━━━━━━━━━━━━━━━━━\n"
f"服務: <code>{html.escape(service_name)}</code>\n"
f"備份距今: {backup_age_hours:.1f} 小時(上限 {max_age_hours:.0f} 小時)\n"
f"{backup_status}\n"
"━━━━━━━━━━━━━━━━━\n"
"請等待備份完成後,人工重新評估修復方案"
)
await self.send_notification(text)
async def send_backup_result(
self,
backup_name: str,
success: bool,
error_msg: str | None = None,
) -> None:
"""T3: BACKUP_COMPLETED / BACKUP_FAILED — 緊急備份結果"""
if success:
text = (
"✅ <b>緊急備份完成</b>\n"
f"備份: <code>{html.escape(backup_name)}</code>\n"
"可繼續手動執行修復"
)
else:
err = html.escape(error_msg or "未知錯誤")
text = (
"❌ <b>緊急備份失敗</b>\n"
f"備份: <code>{html.escape(backup_name)}</code>\n"
f"錯誤: {err}\n"
"請人工介入,備份異常"
)
await self.send_notification(text)
async def send_multisig_waiting(
self,
action: str,
service_name: str,
votes_received: int,
votes_required: int,
approval_id: str,
) -> None:
"""T4: APPROVAL_ESCALATED — 第 1 票完成,等待第 2 票"""
text = (
"🔐 <b>[MultiSig] 等待第 2 票授權</b>\n"
"━━━━━━━━━━━━━━━━━\n"
f"操作: {html.escape(action)}\n"
f"服務: <code>{html.escape(service_name)}</code>\n"
f"風險: CRITICALHITL 雙簽)\n"
f"已獲授權: {votes_received}/{votes_required}\n"
f"審核 ID: <code>{html.escape(approval_id)}</code>\n"
"━━━━━━━━━━━━━━━━━\n"
"請第二位審核者登入確認"
)
await self.send_notification(text)
async def send_multisig_approved(
self,
action: str,
service_name: str,
) -> None:
"""T5: MultiSig 完成2/2"""
text = (
"✅ <b>[MultiSig 完成] 雙簽授權通過</b>\n"
f"操作: {html.escape(action)}\n"
f"服務: <code>{html.escape(service_name)}</code>\n"
"授權: 2/2 票 開始執行..."
)
await self.send_notification(text)
async def send_change_applied(
self,
operator: str,
action_description: str,
timestamp: str,
) -> None:
"""T6: CHANGE_APPLIED — 手動變更記錄"""
text = (
"📝 <b>[變更記錄] 手動操作已記錄</b>\n"
"━━━━━━━━━━━━━━━━━\n"
f"操作者: {html.escape(operator)}\n"
f"動作: {html.escape(action_description)}\n"
f"時間: {html.escape(timestamp)}"
)
await self.send_notification(text)
async def send_notification(
self,
text: str,
parse_mode: str = "HTML",
chat_id: str | int | None = None,
) -> dict:
"""
發送純文字通知
Args:
text: 訊息內容
parse_mode: 解析模式
Returns:
dict: API 回應
"""
payload = {
"chat_id": chat_id or self.chat_id,
"text": text[:500], # SOUL.md 字數限制
"parse_mode": parse_mode,
}
return await self._send_request("sendMessage", payload)
# =========================================================================
# 2026-04-03 ogt: SRE 戰情室群組三頭政治 (Triumvirate) — ADR-053
# @tsenyangbot 發告警卡片到群組OpenClaw/NemoClaw Bot 各自回覆分析
# =========================================================================
async def send_to_group(
self,
text: str,
parse_mode: str = "HTML",
reply_markup: dict | None = None,
) -> dict:
"""
用 @tsenyangbot 發訊息到 SRE 群組 (SRE_GROUP_CHAT_ID)
Args:
text: 訊息內容
parse_mode: 解析模式
reply_markup: 按鈕 (可選)
Returns:
dict: Telegram API 回應 (含 message_id)
"""
if not settings.SRE_GROUP_CHAT_ID:
logger.warning("send_to_group_skipped", reason="SRE_GROUP_CHAT_ID not configured")
return {}
payload: dict = {
"chat_id": settings.SRE_GROUP_CHAT_ID,
"text": text[:4096],
"parse_mode": parse_mode,
}
if reply_markup:
payload["reply_markup"] = reply_markup
return await self._send_request("sendMessage", payload)
async def _send_as_bot(
self,
token: str,
chat_id: str,
text: str,
reply_to_message_id: int | None = None,
parse_mode: str = "HTML",
) -> dict:
"""
用指定 Bot Token 發訊息(不走 self._http_client獨立建立請求
Args:
token: Bot Token
chat_id: 群組 Chat ID
text: 訊息內容
reply_to_message_id: 回覆哪則訊息的 message_id
parse_mode: 解析模式
Returns:
dict: Telegram API 回應
"""
if not self._http_client:
raise TelegramGatewayError("HTTP client not initialized")
url = f"{self.TELEGRAM_API_BASE}/bot{token}/sendMessage"
payload: dict = {
"chat_id": chat_id,
"text": text[:4096],
"parse_mode": parse_mode,
}
# 2026-04-03 ogt: supergroup 跨 Bot reply 需用 reply_parameters (Bot API v6.7+)
# 舊的 reply_to_message_id 在 supergroup 會 400改用新格式 + allow_sending_without_reply
if reply_to_message_id:
payload["reply_parameters"] = {
"message_id": reply_to_message_id,
"allow_sending_without_reply": True,
}
response = await self._http_client.post(url, json=payload)
response.raise_for_status()
return response.json()
async def send_as_openclaw(
self,
text: str,
reply_to_message_id: int | None = None,
) -> dict:
"""
用 @OpenClawAwoooI_Bot 在群組發言
Args:
text: 訊息內容
reply_to_message_id: 回覆哪則訊息
Returns:
dict: Telegram API 回應
"""
if not settings.OPENCLAW_BOT_TOKEN or not settings.SRE_GROUP_CHAT_ID:
logger.warning("send_as_openclaw_skipped", reason="OPENCLAW_BOT_TOKEN or SRE_GROUP_CHAT_ID not configured")
return {}
return await self._send_as_bot(
token=settings.OPENCLAW_BOT_TOKEN,
chat_id=settings.SRE_GROUP_CHAT_ID,
text=text,
reply_to_message_id=reply_to_message_id,
)
async def send_as_nemotron(
self,
text: str,
reply_to_message_id: int | None = None,
) -> dict:
"""
用 @NemoTronAwoooI_Bot 在群組發言
Args:
text: 訊息內容
reply_to_message_id: 回覆哪則訊息
Returns:
dict: Telegram API 回應
"""
if not settings.NEMOTRON_BOT_TOKEN or not settings.SRE_GROUP_CHAT_ID:
logger.warning("send_as_nemotron_skipped", reason="NEMOTRON_BOT_TOKEN or SRE_GROUP_CHAT_ID not configured")
return {}
return await self._send_as_bot(
token=settings.NEMOTRON_BOT_TOKEN,
chat_id=settings.SRE_GROUP_CHAT_ID,
text=text,
reply_to_message_id=reply_to_message_id,
)
async def trigger_group_ai_discussion(
self,
alert_message_id: int,
alert_summary: str,
) -> None:
"""
觸發群組 AI 並行分析(三頭政治核心流程)
流程 (2026-04-03 ogt: 統帥指示改為並行):
- OpenClaw 和 NemoClaw 同時對告警進行獨立分析
- 兩者都 reply 同一條告警訊息
- 並行執行,總等待時間 = max(OpenClaw, NemoClaw) 而非相加
此方法由 asyncio.create_task 非同步呼叫,失敗不影響主流程。
Args:
alert_message_id: 告警訊息的 message_id兩個 Bot 回覆的起點)
alert_summary: 告警摘要文字(提供給 AI 分析用)
"""
try:
from src.services.chat_manager import ChatManager # noqa: PLC0415
except ImportError:
logger.error("trigger_group_ai_discussion_failed", reason="Cannot import ChatManager")
return
try:
chat_mgr = ChatManager()
# 2026-04-03 ogt: 老闆指示 — 告警分析只由 OpenClaw 負責NemoClaw 不分析告警
openclaw_prompt = (
f"你是 OpenClawAWOOOI SRE 戰情室首席 AI精通 K8s、Prometheus、告警分析。\n"
f"以下是一則基礎設施告警,請進行 RCA 根因分析並給出 3 點具體建議行動。\n"
f"繁體中文回應,不超過 300 字:\n\n"
f"{alert_summary}"
)
openclaw_analysis = await chat_mgr._call_openclaw(
system_prompt="你是 OpenClawAWOOOI SRE 戰情室首席 AI。稱呼用戶為「老闆」。",
user_message=openclaw_prompt,
)
if openclaw_analysis and not isinstance(openclaw_analysis, Exception):
await self.send_as_openclaw(
text=f"🦞 <b>OpenClaw 分析</b>\n\n{openclaw_analysis}",
reply_to_message_id=alert_message_id,
)
logger.info("group_ai_discussion_openclaw_sent")
else:
logger.warning("trigger_group_ai_discussion_openclaw_empty")
logger.info("group_ai_discussion_completed", alert_message_id=alert_message_id)
except Exception as e:
# 群組 AI 討論失敗不影響主流程
logger.error("trigger_group_ai_discussion_failed", error=str(e))
async def close(self) -> None:
"""關閉 Gateway"""
# 停止 Long Polling 與 Leader 相關 Tasks
self._polling_active = False
for task in (self._polling_task, self._leader_task):
if task and not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
self._polling_task = None
self._leader_task = None
if self._http_client:
await self._http_client.aclose()
self._http_client = None
self._initialized = False
logger.info("telegram_gateway_closed")
# =========================================================================
# Long Polling 實作 (Phase 5 內網修復)
# =========================================================================
async def start_long_polling(self) -> None:
"""
啟動 Long Polling 背景任務
取代 Webhook 模式,適用於內網環境
統帥鐵律: 內網無法接收外部 Webhook必須主動輪詢
2026-04-01 Claude Code: 加入 Redis Leader Election
多 Pod 環境下,只有 Leader 執行 getUpdates其餘 Pod 進入 Watcher 模式
"""
if not self._initialized:
success = await self.initialize()
if not success:
logger.error("telegram_long_polling_failed", reason="Gateway not initialized")
return
if self._polling_active:
logger.warning("telegram_long_polling_already_running")
return
# 嘗試取得 Leader Lock (NX = 僅在不存在時設定)
redis = await get_redis()
acquired = await redis.set(POLLING_LEADER_KEY, self._pod_id, nx=True, ex=POLLING_LEADER_TTL)
if not acquired:
current_leader = await redis.get(POLLING_LEADER_KEY)
logger.info(
"telegram_polling_not_leader",
pod_id=self._pod_id,
current_leader=current_leader,
action="watcher_mode",
)
# 啟動 Watcher定期嘗試接管
self._leader_task = asyncio.create_task(self._leader_watcher())
return
# 取得 Leader Lock開始 Polling
await self._delete_webhook()
self._polling_active = True
self._last_update_id = 0
self._polling_task = asyncio.create_task(self._polling_loop())
self._leader_task = asyncio.create_task(self._leader_renewer())
logger.info(
"telegram_long_polling_started",
pod_id=self._pod_id,
timeout=LONG_POLLING_TIMEOUT,
chat_id=self.chat_id[:10] + "..." if self.chat_id else "N/A",
)
async def _delete_webhook(self) -> None:
"""
刪除現有 Webhook (切換至 Long Polling 模式)
統帥鐵律: Webhook 和 Long Polling 不能共存
必須先刪除 Webhook 才能使用 getUpdates
"""
if not self._http_client:
return
try:
# Step 1: 刪除 Webhook
url = f"{self.api_url}/deleteWebhook"
response = await self._http_client.post(url, json={"drop_pending_updates": True})
result = response.json()
if result.get("ok"):
logger.info(
"telegram_webhook_deleted",
description=result.get("description", "Webhook deleted"),
)
else:
logger.warning(
"telegram_webhook_delete_failed",
error=result.get("description"),
)
# Step 2: 等待 Telegram 伺服器同步 (避免 409 Conflict)
await asyncio.sleep(1)
# Step 3: 驗證 Webhook 狀態
info_url = f"{self.api_url}/getWebhookInfo"
info_response = await self._http_client.get(info_url)
info_result = info_response.json()
webhook_url = info_result.get("result", {}).get("url", "")
if webhook_url:
logger.warning(
"telegram_webhook_still_active",
url=webhook_url[:50],
)
else:
logger.info("telegram_webhook_confirmed_deleted")
except Exception as e:
logger.error("telegram_webhook_delete_error", error=str(e))
async def _polling_loop(self) -> None:
"""
Long Polling 主循環
使用 getUpdates API 持續監聽 Telegram 更新
"""
logger.info("[Telegram] Long polling started - 神經已接通,等待統帥指令...")
while self._polling_active:
try:
updates = await self._get_updates()
for update in updates:
await self._process_update(update)
except asyncio.CancelledError:
logger.info("telegram_long_polling_cancelled")
break
except httpx.TimeoutException:
# Long polling timeout 是正常的,繼續下一輪
continue
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
# 409 Conflict: 另一個 Pod 正在 polling主動釋放 Leader Lock
# 2026-04-01 Claude Code: 改為釋放 Lock 讓 Watcher 競爭
# (舊: 侵略性搶佔 2s已不適用 - 現在是多 Pod 場景而非 .188 搶佔)
logger.warning(
"telegram_polling_conflict",
status=409,
pod_id=self._pod_id,
action="releasing_leader_lock",
)
redis = await get_redis()
current = await redis.get(POLLING_LEADER_KEY)
if current == self._pod_id:
await redis.delete(POLLING_LEADER_KEY)
self._polling_active = False
# Watcher 會在 POLLING_LEADER_WATCH 秒後重新競爭
self._leader_task = asyncio.create_task(self._leader_watcher())
break
else:
logger.error("telegram_polling_http_error", status=e.response.status_code)
await asyncio.sleep(LONG_POLLING_RETRY_DELAY)
except Exception as e:
logger.error("telegram_polling_error", error=str(e))
# 錯誤後等待再重試
await asyncio.sleep(LONG_POLLING_RETRY_DELAY)
logger.info("telegram_long_polling_stopped")
async def _leader_renewer(self) -> None:
"""
Leader Lock 續約背景任務
每 POLLING_LEADER_RENEW 秒更新 Redis TTL
確保 Leader 在 Poll 期間持續持有 Lock。
若 Lock 被搶走,停止 Polling。
2026-04-01 Claude Code: 分散式 Leader Election
"""
while self._polling_active:
await asyncio.sleep(POLLING_LEADER_RENEW)
if not self._polling_active:
break
try:
redis = await get_redis()
current = await redis.get(POLLING_LEADER_KEY)
if current != self._pod_id:
logger.warning(
"telegram_leader_lock_lost",
pod_id=self._pod_id,
current_leader=current,
)
self._polling_active = False
break
await redis.expire(POLLING_LEADER_KEY, POLLING_LEADER_TTL)
except Exception as e:
logger.error("telegram_leader_renew_error", error=str(e))
async def _leader_watcher(self) -> None:
"""
非 Leader Pod 的接管監控任務
每 POLLING_LEADER_WATCH 秒嘗試取得 Leader Lock。
若原 Leader 宕掉TTL 過期),此 Pod 接管 Polling。
2026-04-01 Claude Code: 分散式 Leader Election
"""
while not self._polling_active:
await asyncio.sleep(POLLING_LEADER_WATCH)
try:
redis = await get_redis()
acquired = await redis.set(
POLLING_LEADER_KEY, self._pod_id, nx=True, ex=POLLING_LEADER_TTL
)
if acquired:
logger.info(
"telegram_leader_acquired",
pod_id=self._pod_id,
action="starting_polling",
)
await self._delete_webhook()
self._polling_active = True
self._last_update_id = 0
self._polling_task = asyncio.create_task(self._polling_loop())
self._leader_task = asyncio.create_task(self._leader_renewer())
break
except asyncio.CancelledError:
break
except Exception as e:
logger.error("telegram_leader_watch_error", error=str(e))
async def _get_updates(self) -> list[dict]:
"""
呼叫 Telegram getUpdates API
Returns:
list[dict]: 更新列表
"""
if not self._http_client:
return []
url = f"{self.api_url}/getUpdates"
payload = {
"offset": self._last_update_id + 1,
"timeout": LONG_POLLING_TIMEOUT,
"allowed_updates": ["callback_query", "message"], # 監聽按鈕與文字訊息
}
response = await self._http_client.post(
url,
json=payload,
timeout=LONG_POLLING_TIMEOUT + 10, # 比 API timeout 多一點
)
response.raise_for_status()
result = response.json()
if not result.get("ok"):
raise TelegramGatewayError(f"getUpdates failed: {result.get('description')}")
updates = result.get("result", [])
# 更新 offset
if updates:
self._last_update_id = updates[-1]["update_id"]
return updates
async def _process_update(self, update: dict) -> None:
"""
處理單個 Telegram Update
Args:
update: Telegram Update 物件
"""
update_id = update.get("update_id")
callback_query = update.get("callback_query")
message = update.get("message")
if not callback_query and not message:
logger.debug("telegram_update_ignored", update_id=update_id, reason="unsupported update type")
return
if callback_query:
await self._handle_callback_query(update_id, callback_query)
elif message:
await self._handle_chat_message(update_id, message)
async def _handle_callback_query(self, update_id: int, callback_query: dict) -> None:
"""處理按鈕點擊更新"""
callback_query_id = callback_query.get("id")
callback_data = callback_query.get("data")
user = callback_query.get("from", {})
user_id = user.get("id")
if not all([callback_query_id, callback_data, user_id]):
logger.warning("telegram_callback_invalid", update_id=update_id)
return
username = user.get("username") or user.get("first_name") or str(user_id)
original_text = callback_query.get("message", {}).get("text", "")
message_id = callback_query.get("message", {}).get("message_id")
logger.info(
"telegram_callback_received",
update_id=update_id,
user_id=user_id,
username=username,
)
# 呼叫現有的 handle_callback 邏輯
result = await self.handle_callback(
callback_query_id=callback_query_id,
callback_data=callback_data,
user_id=user_id,
message_id=message_id,
original_text=original_text,
username=username,
)
if result.get("success"):
# 執行資料庫更新 (簽核/拒絕)
await self._execute_approval_action(
action=result["action"],
approval_id=result["approval_id"],
user_id=user_id,
username=username,
message_id=message_id,
)
async def _handle_chat_message(self, update_id: int, message: dict) -> None:
"""處理统帥的文字訊息(個人 chat 或 SRE 群組)"""
text = message.get("text")
user = message.get("from", {})
user_id = user.get("id")
chat_id = message.get("chat", {}).get("id")
chat_type = message.get("chat", {}).get("type", "private")
message_id = message.get("message_id")
username = user.get("username") or user.get("first_name") or str(user_id)
if not text or not user_id:
return
# Bot 訊息忽略(避免 Bot 互相觸發無限循環)
if user.get("is_bot"):
return
logger.info(
"telegram_chat_received",
update_id=update_id,
user_id=user_id,
username=username,
chat_type=chat_type,
text=text[:50],
)
# 1. 群組訊息路由優先 (2026-04-03 ogt: SRE 戰情室群組無需個人白名單)
# 群組是封閉環境,成員由 Telegram 群組管理員控制,不走個人 whitelist
is_group = chat_type in ("group", "supergroup")
is_sre_group = str(chat_id) == str(settings.SRE_GROUP_CHAT_ID)
if is_group and is_sre_group:
reply_to_message = message.get("reply_to_message")
await self._handle_group_message(text, user_id, username, chat_id, message_id, reply_to_message)
return
# 2. 個人 chat 安全檢查 (ADR-012)
try:
interceptor = get_security_interceptor()
await interceptor.intercept_telegram(user_id)
except Exception as e:
logger.warning("telegram_chat_unauthorized", user_id=user_id, error=str(e))
return
# 3. /ai 指令攔截 (Phase 24 C — 2026-04-03 ogt)
if text.strip().lower().startswith("/ai"):
whitelist = settings.get_tg_user_whitelist()
if not whitelist or user_id not in whitelist:
logger.warning("telegram_ai_command_unauthorized", user_id=user_id, whitelist_empty=not whitelist)
await self.send_notification("⛔ 未授權:/ai 指令僅限白名單用戶", parse_mode="HTML", chat_id=chat_id)
return
from src.services.ai_control import handle_ai_command
response = await handle_ai_command(text.strip())
await self.send_notification(response, parse_mode="HTML", chat_id=chat_id)
logger.info("telegram_ai_command_handled", user_id=user_id, text=text[:50])
return
# 4. 個人 chat — 顯示輸入狀態
await self._send_chat_action(chat_id, "typing")
# 5. ChatManager 處理(個人 chat
chat_manager = get_chat_manager()
response = await chat_manager.generate_response(
user_id=user_id,
username=username,
message_text=text,
)
await self.send_notification(response, parse_mode="HTML", chat_id=chat_id)
async def _handle_group_message(
self,
text: str,
user_id: int,
username: str,
chat_id: int, # noqa: ARG002
message_id: int | None,
reply_to_message: dict | None = None,
) -> None:
"""
處理 SRE 群組訊息 (2026-04-03 ogt: Phase 22.6 Triumvirate)
路由規則:
Reply OpenClaw 訊息 → 只有 OpenClaw 回應
Reply NemoClaw 訊息 → 只有 NemoClaw 回應
@OpenClawAwoooI_Bot <msg> → 只有 OpenClaw 回應
@NemoTronAwoooI_Bot <msg> → 只有 NemoClaw 回應
其他訊息 → 兩個 AI 並行回應
"""
# ── 指令路由 (2026-04-03 ogt: 方案B slash commands) ──────────────────
cmd = text.strip().split()[0].lower().split("@")[0] if text.strip() else ""
if cmd.startswith("/"):
await self._handle_group_command(cmd, chat_id, message_id)
return
from src.services.chat_manager import get_chat_manager as _get_cm
chat_mgr = _get_cm()
# 全形/半形統一化後比較
import unicodedata
text_normalized = unicodedata.normalize("NFKC", text).lower()
# Reply 路由: 若 Reply 的是 Bot 訊息,直接認定目標 AI (2026-04-03 ogt)
if reply_to_message:
replied_from = reply_to_message.get("from", {})
if replied_from.get("is_bot"):
replied_username = (replied_from.get("username") or "").lower()
if "openclawawoooi" in replied_username:
mention_openclaw, mention_nemo = True, False
elif "nemotronawoooi" in replied_username:
mention_openclaw, mention_nemo = False, True
else:
mention_openclaw = "@openclawawoooi_bot" in text_normalized or "小o" in text_normalized
mention_nemo = "@nemotronawoooi_bot" in text_normalized or "小賀" in text_normalized or "小贺" in text_normalized
else:
mention_openclaw = "@openclawawoooi_bot" in text_normalized or "小o" in text_normalized
mention_nemo = "@nemotronawoooi_bot" in text_normalized or "小賀" in text_normalized or "小贺" in text_normalized
else:
# 別名: 小O / 小o (含全形O) → OpenClaw; 小賀 / 小贺 → NemoClaw
mention_openclaw = "@openclawawoooi_bot" in text_normalized or "小o" in text_normalized
mention_nemo = "@nemotronawoooi_bot" in text_normalized or "小賀" in text_normalized or "小贺" in text_normalized
# 去掉 @ mention 與別名,取出純訊息
clean_text = unicodedata.normalize("NFKC", text)
for token in ["@openclawawoooi_bot", "@OpenClawAwoooI_Bot", "@nemotronawoooi_bot", "@NemoTronAwoooI_Bot",
"小O", "小o", "小O", "小o", "小賀", "小贺"]:
clean_text = clean_text.replace(token, "").strip()
if not clean_text:
clean_text = text
context = await chat_mgr.get_system_context()
if mention_openclaw and not mention_nemo:
# 只 OpenClaw 回應
result = await chat_mgr._call_openclaw(
f"{context}\n用戶 {username} 在 SRE 戰情室問你:",
clean_text,
)
await self.send_as_openclaw(
text=f"🦞 <b>OpenClaw</b>\n\n{result or '🔴 無響應'}",
reply_to_message_id=message_id,
)
elif mention_nemo and not mention_openclaw:
# 只 NemoClaw 回應
result = await chat_mgr._call_nemotron(
f"{context}\n用戶 {username} 在 SRE 戰情室問你:",
clean_text,
)
await self.send_as_nemotron(
text=f"🤖 <b>NemoClaw</b>\n\n{result or '🔴 無響應 (NIM 超時)'}",
reply_to_message_id=message_id,
)
else:
# 兩個 AI 並行回應,完成後互相評論
oc_task = asyncio.create_task(
chat_mgr._call_openclaw(f"{context}\n用戶 {username} 在 SRE 戰情室:", clean_text)
)
nemo_task = asyncio.create_task(
chat_mgr._call_nemotron(f"{context}\n用戶 {username} 在 SRE 戰情室:", clean_text)
)
oc_result, nemo_result = await asyncio.gather(oc_task, nemo_task, return_exceptions=True)
if oc_result and not isinstance(oc_result, Exception):
await self.send_as_openclaw(
text=f"🦞 <b>OpenClaw</b>\n\n{oc_result}",
reply_to_message_id=message_id,
)
if nemo_result and not isinstance(nemo_result, Exception):
await self.send_as_nemotron(
text=f"🤖 <b>NemoClaw</b>\n\n{nemo_result}",
reply_to_message_id=message_id,
)
logger.info("group_message_handled", user_id=user_id, text=text[:50])
async def _handle_group_command(self, cmd: str, chat_id: int, message_id: int | None) -> None: # noqa: ARG002
"""
SRE 群組 Slash Commands (2026-04-03 ogt: 方案B)
/status → K8s Cluster 健康狀態
/incidents → 活躍告警列表
/cost → 本月 AI 費用統計
/pods → 異常 Pod 列表
/help → 指令說明
"""
from src.repositories.k8s_repository import get_k8s_repository
from src.repositories.incident_repository import get_incident_repository
from src.core.redis_client import get_redis
from src.utils.timezone import now_taipei
if cmd == "/status":
try:
k8s = get_k8s_repository()
s = await k8s.get_pod_status_summary(namespace="awoooi-prod")
running, total = s.get("running", 0), s.get("total", 0)
problems = s.get("problem_pods", [])
lines = [f"<b>🖥 Cluster 狀態</b>", f"• Pods: {running}/{total} Running"]
if problems:
lines.append(f"• 異常: {len(problems)}")
for p in problems[:5]:
lines.append(f" ⚠️ {p}")
else:
lines.append("• 全部正常 ✅")
msg = "\n".join(lines)
except Exception as e:
msg = f"<b>🖥 Cluster 狀態</b>\n⚠️ 無法取得: {e}"
await self.send_as_openclaw(text=msg, reply_to_message_id=message_id)
elif cmd == "/incidents":
try:
repo = get_incident_repository()
incidents = await repo.get_active()
if incidents:
lines = ["<b>🚨 活躍告警</b>"]
for inc in incidents[:10]:
lines.append(f"• <code>{inc.incident_id}</code> SEV{inc.severity.value}{inc.status.value}")
msg = "\n".join(lines)
else:
msg = "<b>🚨 活躍告警</b>\n✅ 目前無告警"
except Exception as e:
msg = f"<b>🚨 活躍告警</b>\n⚠️ 無法取得: {e}"
await self.send_as_openclaw(text=msg, reply_to_message_id=message_id)
elif cmd == "/cost":
redis = get_redis()
month = now_taipei().strftime("%Y-%m")
try:
gemini_cost = float(await redis.get(f"gemini_cost:{month}") or 0)
claude_cost = float(await redis.get(f"claude_cost:{month}") or 0)
total = gemini_cost + claude_cost
msg = (
f"<b>💰 {month} AI 費用統計</b>\n"
f"• 🦞 OpenClaw (Gemini Flash-Lite): <b>${gemini_cost:.4f}</b> / $10.00 上限\n"
f"• 🤖 NemoClaw (Claude Haiku 4.5): <b>${claude_cost:.4f}</b>\n"
f"• 合計: <b>${total:.4f}</b>"
)
except Exception as e:
msg = f"<b>💰 費用統計</b>\n⚠️ 無法取得: {e}"
await self.send_as_openclaw(text=msg, reply_to_message_id=message_id)
elif cmd == "/pods":
try:
k8s = get_k8s_repository()
s = await k8s.get_pod_status_summary(namespace="awoooi-prod")
problems = s.get("problem_pods", [])
if problems:
lines = [f"<b>⚠️ 異常 Pod ({len(problems)} 個)</b>"]
for p in problems[:15]:
lines.append(f"• <code>{p}</code>")
msg = "\n".join(lines)
else:
msg = "<b>⚠️ 異常 Pod</b>\n✅ 全部 Pod 正常"
except Exception as e:
msg = f"<b>⚠️ 異常 Pod</b>\n⚠️ 無法取得: {e}"
await self.send_as_openclaw(text=msg, reply_to_message_id=message_id)
elif cmd == "/help":
msg = (
"<b>🤖 SRE 戰情室指令</b>\n\n"
"/status — 查詢 K8s Cluster 狀態\n"
"/incidents — 列出活躍告警\n"
"/cost — 查詢本月 AI 費用\n"
"/pods — 列出異常 Pod\n"
"/help — 顯示此說明\n\n"
"<b>對話方式:</b>\n"
"• 直接輸入 → 小O + 小賀 同時回應\n"
"• 小O 或 @OpenClawAwoooI_Bot → 只有 OpenClaw\n"
"• 小賀 或 @NemoTronAwoooI_Bot → 只有 NemoClaw\n"
"• Reply 某個 Bot 的訊息 → 只有那個 Bot 回應"
)
await self.send_as_openclaw(text=msg, reply_to_message_id=message_id)
else:
logger.debug("group_unknown_command", cmd=cmd)
async def _send_chat_action(self, chat_id: int, action: str) -> None:
"""發送聊天狀態 (e.g., typing)"""
if not self._http_client: return
try:
url = f"{self.api_url}/sendChatAction"
await self._http_client.post(url, json={"chat_id": chat_id, "action": action})
except: pass
async def _execute_approval_action(
self,
action: str,
approval_id: str,
user_id: int,
username: str,
message_id: int, # noqa: ARG002
) -> None:
"""
執行簽核動作 (更新資料庫)
Args:
action: approve/reject/tune
approval_id: 簽核單 ID
user_id: Telegram User ID
username: 使用者名稱
message_id: 訊息 ID
"""
# 2026-03-29 ogt: 修復方法呼叫 - add_signature/reject 不存在
# 正確方法: sign_approval / reject_approval
from uuid import UUID
from src.services.approval_db import get_approval_service
try:
service = get_approval_service()
# approval_id 可能是 INC-xxx (incident_id) 格式,需查出真正的 UUID
# 2026-04-06 Claude Code: decision_manager 傳入的是 incident.incident_id
approval_uuid: UUID | None = None
try:
approval_uuid = UUID(approval_id)
except ValueError:
# 非 UUID 格式,嘗試用 incident_id 查出 pending approval
pending_list = await service.get_all_approvals(incident_id=approval_id)
if pending_list:
approval_uuid = UUID(pending_list[0].id) if isinstance(pending_list[0].id, str) else pending_list[0].id
else:
logger.warning(
"telegram_approval_not_found_by_incident",
approval_id=approval_id,
)
return
if action == "approve":
# 2026-03-29 ogt: 正確呼叫 sign_approval (返回 tuple)
approval, message, execution_triggered = await service.sign_approval(
approval_id=approval_uuid,
signer_id=f"tg_{user_id}",
signer_name=username,
comment="Telegram 簽核 (Long Polling)",
)
if approval:
logger.info(
"telegram_approval_signed_via_polling",
approval_id=approval_id,
user_id=user_id,
status=approval.status.value,
execution_triggered=execution_triggered,
)
elif action == "reject":
# 2026-03-29 ogt: 正確呼叫 reject_approval (返回 tuple)
approval, message = await service.reject_approval(
approval_id=approval_uuid,
rejector_id=f"tg_{user_id}",
rejector_name=username,
reason="Telegram 拒絕 (Long Polling)",
)
if approval:
logger.info(
"telegram_approval_rejected_via_polling",
approval_id=approval_id,
user_id=user_id,
)
elif action == "tune":
# 自動調優已在 handle_callback 中處理
logger.info(
"telegram_auto_tuning_via_polling",
approval_id=approval_id,
user_id=user_id,
)
except Exception as e:
logger.error(
"telegram_approval_action_failed",
action=action,
approval_id=approval_id,
error=str(e),
)
# =============================================================================
# Phase 6.5: 心跳監控方法
# =============================================================================
async def _check_nemotron_health(self) -> tuple[bool, str]:
"""
探測 Nemotron (NVIDIA NIM) 是否可用
2026-04-03 ogt: 新增 — Nemotron 100% 超時但沒有告警,補足監控盲區
Returns: (is_healthy, status_text)
"""
import httpx
from src.core.config import get_settings
settings = get_settings()
api_key = settings.NVIDIA_API_KEY
if not api_key:
return False, "❌ NVIDIA_API_KEY 未設定"
# 2026-04-03 ogt: 用 /v1/models 輕量端點探測,避免觸發推理計費
# timeout 改為 25s — NIM 免費 tier 冷啟動可能需要 15-20s
try:
async with httpx.AsyncClient(timeout=25.0) as client:
resp = await client.get(
"https://integrate.api.nvidia.com/v1/models",
headers={"Authorization": f"Bearer {api_key}"},
)
if resp.status_code == 200:
return True, "✅ 正常"
return False, f"❌ HTTP {resp.status_code}"
except httpx.TimeoutException:
return False, "⚠️ 超時 (>25s)"
except Exception as e:
return False, f"{str(e)[:40]}"
async def send_heartbeat(self) -> bool:
"""
發送心跳訊息 (系統狀態摘要,含 Nemotron 健康探測)
每 30 分鐘執行一次,證明告警鏈路正常運作
2026-04-03 ogt: 加入 Nemotron 健康探測 — 補足監控盲區
"""
try:
if not self._initialized:
await self.initialize()
from src.utils.timezone import now_taipei
taipei_now = now_taipei()
# Nemotron 健康探測
nemo_ok, nemo_status = await self._check_nemotron_health()
text = f"""💓 <b>AWOOOI 心跳</b>
{taipei_now.strftime('%Y-%m-%d %H:%M:%S')} (台北)
📡 告警鏈路: ✅ 正常
🤖 Nemotron NIM: {nemo_status}"""
await self.send_notification(text)
self._last_message_time = datetime.now(UTC)
# Nemotron 異常時告警通知不自動關閉NIM 免費 tier 本來就慢)
# 2026-04-03 ogt: 修正 — 之前錯誤地自動關閉 Nemotron 協作
# Nemotron 是產品核心11-45s是免費 tier 特性,不是需要修復的異常
if not nemo_ok:
alert = InfraAlertMessage(
component="Nemotron NIM (NVIDIA API)",
status=nemo_status,
impact="NIM 免費 tier 回應慢 (11-45s)@nemo 對話可能需等待",
note="NIM 慢屬正常 — 免費 tier 特性,非故障。如需快速回應請用 @openclaw",
)
await self.send_notification(alert.format(), parse_mode="HTML")
logger.warning("nemotron_health_slow_alert", status=nemo_status)
logger.info("telegram_heartbeat_sent", nemotron_ok=nemo_ok)
return True
except Exception as e:
logger.error("telegram_heartbeat_failed", error=str(e))
return False
async def start_heartbeat_monitor(
self,
heartbeat_interval_minutes: int = 30,
silence_threshold_hours: int = 2,
) -> None:
"""
啟動心跳監控背景任務
Args:
heartbeat_interval_minutes: 心跳間隔 (預設 30 分鐘)
silence_threshold_hours: 沉默告警閾值 (預設 2 小時)
"""
if self._heartbeat_active:
logger.warning("telegram_heartbeat_already_running")
return
self._heartbeat_active = True
self._heartbeat_task = asyncio.create_task(
self._heartbeat_loop(heartbeat_interval_minutes, silence_threshold_hours)
)
logger.info(
"telegram_heartbeat_monitor_started",
interval_minutes=heartbeat_interval_minutes,
silence_threshold_hours=silence_threshold_hours,
)
async def _heartbeat_loop(
self,
interval_minutes: int,
silence_hours: int,
) -> None:
"""心跳監控循環"""
interval_seconds = interval_minutes * 60
silence_seconds = silence_hours * 3600
while self._heartbeat_active:
try:
# 1. 檢查沉默告警
if self._last_message_time:
silence_duration = (datetime.now(UTC) - self._last_message_time).total_seconds()
if silence_duration > silence_seconds:
# 發送沉默告警
hours = int(silence_duration // 3600)
await self.send_notification(
f"⚠️ <b>沉默告警</b>\n\n"
f"Telegram 已 <b>{hours} 小時</b>沒有收到任何訊息!\n"
f"請檢查告警鏈路是否正常運作。"
)
logger.warning(
"telegram_silence_alert",
silence_hours=hours,
)
# 2. 發送心跳
await self.send_heartbeat()
# 3. 等待下一次
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("telegram_heartbeat_loop_error", error=str(e))
await asyncio.sleep(60) # 錯誤後等待 1 分鐘重試
async def stop_heartbeat_monitor(self) -> None:
"""停止心跳監控"""
self._heartbeat_active = False
if self._heartbeat_task and not self._heartbeat_task.done():
self._heartbeat_task.cancel()
try:
await self._heartbeat_task
except asyncio.CancelledError:
pass
self._heartbeat_task = None
logger.info("telegram_heartbeat_monitor_stopped")
# =============================================================================
# Singleton
# =============================================================================
_gateway: TelegramGateway | None = None
def get_telegram_gateway() -> TelegramGateway:
"""取得全域 TelegramGateway 實例"""
global _gateway
if _gateway is None:
_gateway = TelegramGateway()
return _gateway