Files
awoooi/apps/api/src/services/telegram_gateway.py
OG T 665f93e83f
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
fix(telegram): 首席架構師 R1 修正 — I-1/I-2/M-1/M-2
I-1: webhooks/sentry_webhook/signoz_webhook 三個呼叫者補 TODO 說明
     無 incident_id 是已知限制(Approval 路徑未建 Incident 關聯)
I-2: TestPushRequest 新增 incident_id 欄位,使 QA 可驗證按鈕渲染
M-1: 移除 _build_inline_keyboard 呼叫中多餘的 `or message.incident_id`
M-2: 補充 900/1000 截斷長度差異說明

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 13:07:42 +08:00

3600 lines
136 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Telegram Gateway - OpenClaw 行動戰情室 + SignOz 整合
====================================================
Phase 5.4.3 & 5.4.4: Telegram 推送與簽核接收
統帥校正: SignOz 為唯一全能視力中心
Features:
- 推送待簽核卡片到 Telegram (含 SignOz 指標)
- 動態 SignOz Trace URL (告警前後 5 分鐘)
- 自動調優按鈕 (Shadow Mode: 僅日誌輸出)
- 接收統帥簽核回調
- SOUL.md 訊息壓縮原則 100% 遵守
SOUL.md 鐵律 (4.1 Telegram 訊息壓縮原則):
- 狀態標籤: 20 字元
- 資源名稱: 50 字元
- 根因摘要: 100 字元
- 建議行動: 50 字元
- 總長度: 800 字元 (v7.0 擴展以容納 SignOz 區塊)
修復紀錄:
- 2026-03-26 Claude Code: 修復 HTML 解析錯誤 (Can't parse entities)
"""
import asyncio
import html
import os
from dataclasses import dataclass
from datetime import UTC, datetime
import httpx
import structlog
from opentelemetry import trace
from src.core.config import settings
from src.core.redis_client import get_redis
from src.services.security_interceptor import (
NonceReplayError,
UserNotWhitelistedError,
get_security_interceptor,
)
from src.services.chat_manager import get_chat_manager
# =============================================================================
# Snooze/Silence Redis keys (2026-03-27 P1 optimisation)
# =============================================================================
SNOOZE_KEY_PREFIX = "telegram_snooze:" # suffixed with {approval_id} -> remind later
SILENCE_KEY_PREFIX = "telegram_silence:" # suffixed with {resource_name} -> silenced
SNOOZE_TTL_SECONDS = 30 * 60 # 30 minutes
SILENCE_TTL_SECONDS = 60 * 60 # 1 hour
# 2026-04-01 Claude Code: distributed leader election for long polling.
# Prevents several Pods from calling getUpdates at the same time, which makes
# them fight each other with 409 Conflict responses.
POLLING_LEADER_KEY = "telegram:polling:leader"
POLLING_LEADER_TTL = 45 # seconds - leadership moves on automatically 45s after a Pod dies
POLLING_LEADER_RENEW = 20 # seconds - the leader renews its lease every 20s
POLLING_LEADER_WATCH = 30 # seconds - non-leader Pods try to take over every 30s
logger = structlog.get_logger(__name__)
# =============================================================================
# OTEL tracer (Phase C P1 observability)
# 2026-03-30 Claude Code: added tracing for the Telegram Gateway
# =============================================================================
_tracer = trace.get_tracer("awoooi.telegram_gateway", "1.0.0")
# =============================================================================
# Long polling configuration (Phase 5 intranet fix)
# =============================================================================
LONG_POLLING_TIMEOUT = 30 # getUpdates timeout (seconds)
LONG_POLLING_RETRY_DELAY = 5 # delay before retrying after an error (seconds)
# =============================================================================
# SignOz Metrics Block (v7.0)
# =============================================================================
@dataclass
class SignOzMetricsBlock:
    """
    SignOz metrics section embedded in a Telegram card.

    Rendered layout (Telegram HTML, tags omitted here)::

        📊 SignOz 指標
        ├ RPS: 150.2 📈
        ├ Error: 🟢 0.50%
        └ P99: 245ms ➡️
    """

    rps: float = 0.0
    rps_trend: str = "stable"  # up, down, stable
    error_rate: float = 0.0  # rendered with a trailing "%"
    p99_latency_ms: float = 0.0
    latency_trend: str = "stable"  # up, down, stable
    trace_url: str = ""  # carried on the object; format() does not render it

    def format(self) -> str:
        """Render the four-line metrics block as Telegram HTML (no trailing newline)."""
        arrows = {"up": "📈", "down": "📉", "stable": "➡️"}
        unknown_trend_arrow = "➡️"

        # Traffic light for the error rate: <1 green, <5 yellow, otherwise red.
        if self.error_rate < 1:
            error_light = "🟢"
        elif self.error_rate < 5:
            error_light = "🟡"
        else:
            error_light = "🔴"

        rps_arrow = arrows.get(self.rps_trend, unknown_trend_arrow)
        latency_arrow = arrows.get(self.latency_trend, unknown_trend_arrow)

        rows = [
            "📊 <b>SignOz 指標</b>",
            f"├ RPS: <code>{self.rps:.1f}</code> {rps_arrow}",
            f"├ Error: {error_light} <code>{self.error_rate:.2f}%</code>",
            f"└ P99: <code>{self.p99_latency_ms:.0f}ms</code> {latency_arrow}",
        ]
        return "\n".join(rows)
# =============================================================================
# SOUL.md 訊息格式定義 (v7.0 + SignOz)
# =============================================================================
@dataclass
class TelegramMessage:
"""
Telegram 訊息結構 (SOUL.md 4.1 + v7.0 SignOz 整合)
格式:
═══════════════════════════
🚨 CRITICAL | harbor-core
═══════════════════════════
📋 INC-20260321-0001
🎯 資源: harbor-core-7d4b8c9f5
━━━━━━━━━━━━━━━━━━━
🤖 AI 仲裁判定
👥 責任: BE (後端)
📊 信心: 🟢 88%
💡 原因: JVM Heap 配置不當
━━━━━━━━━━━━━━━━━━━
📊 SignOz 指標
├ RPS: 150.2 📈
├ Error: 🟢 0.5%
└ P99: 245ms ➡️
━━━━━━━━━━━━━━━━━━━
🔧 建議: 刪除 Pod
⏱️ 停機: ~30s
🔍 SignOz Trace (±5min)
[✅ 簽核] [❌ 拒絕] [⚡ 自動調優]
"""
status_emoji: str # 🚨, ⚠️,
risk_level: str # CRITICAL, MEDIUM, LOW
resource_name: str # Pod/Deployment 名稱 (max 50)
root_cause: str # 根因摘要 (max 100)
suggested_action: str # 建議操作 (max 50)
estimated_downtime: str # 預計停機時間
approval_id: str # 簽核單 ID
# v6.0 AI 仲裁欄位
incident_id: str = "" # 事件編號 INC-YYYYMMDD-XXXX
primary_responsibility: str = "COLLAB" # FE/BE/INFRA/DB/COLLAB
confidence: float = 0.0 # 信心度 0.0-1.0
namespace: str = "default" # K8s namespace
# v7.0 SignOz 整合
signoz_metrics: SignOzMetricsBlock | None = None
signoz_trace_url: str = "" # 動態時間參數 URL
auto_tuning_command: str = "" # kubectl 調優指令
# 2026-03-29 ogt: AI Token/Cost 追蹤
ai_tokens: int = 0 # LLM Token 使用量
ai_cost: float = 0.0 # LLM 成本 (USD)
# 2026-03-29 ogt: ADR-037 異常頻率統計
anomaly_frequency: dict | None = None # AnomalyCounter 統計
# 2026-03-29 ogt: AI Provider 來源顯示
ai_provider: str = "" # ollama/gemini/claude/expert_system/mock
# 2026-04-04 ogt: 底層模型名稱 (e.g. qwen2.5:7b-instruct, nemotron-70b)
ai_model: str = ""
# ==========================================================================
# Phase 22: Nemotron 協作欄位 (ADR-044)
# 2026-03-31 Claude Code: OpenClaw + Nemotron 雙軌顯示
# ==========================================================================
nemotron_enabled: bool = False # 是否啟用 Nemotron 協作
nemotron_tools: list[dict] | None = None # Tool Calling 結果 [{"tool": str, "args": dict, "valid": bool}]
nemotron_validation: str = "" # "✅ 驗證通過" / "❌ 驗證失敗" / "⏳ 驗證中"
nemotron_latency_ms: float = 0.0 # Nemotron 呼叫延遲 (ms)
def format(self) -> str:
"""
格式化為 SOUL.md 規範的訊息 (含 AI 仲裁 + SignOz)
Returns:
str: 格式化的 Telegram 訊息 (max 900 字元)
"""
# 責任映射
resp_map = {
"FE": "👨‍💻 FE (前端)",
"BE": "⚙️ BE (後端)",
"INFRA": "🏗️ INFRA (基礎設施)",
"DB": "🗄️ DB (資料庫)",
"COLLAB": "🤝 COLLAB (協同處理)",
}
resp_display = resp_map.get(self.primary_responsibility, "❓ 未知")
# 信心度顯示
confidence_pct = int(self.confidence * 100)
if confidence_pct >= 80:
conf_emoji = "🟢"
elif confidence_pct >= 70:
conf_emoji = "🟡"
else:
conf_emoji = "🔴"
# 自動生成事件編號 (2026-03-27 ogt: 修復 INC-INC- 重複前綴)
if self.incident_id:
incident_id = self.incident_id
elif self.approval_id.startswith("INC-"):
incident_id = self.approval_id
else:
incident_id = f"INC-{self.approval_id[:8].upper()}"
# SignOz URL (優先使用動態 URL) - 必須 HTML 轉義防止解析錯誤
service_name = self.resource_name.split("-")[0] if "-" in self.resource_name else self.resource_name
raw_url = self.signoz_trace_url or f"http://192.168.0.188:3301/traces?service={service_name}"
signoz_url = html.escape(raw_url, quote=True)
# SignOz 指標區塊
signoz_block = ""
if self.signoz_metrics:
signoz_block = f"━━━━━━━━━━━━━━━━━━━\n{self.signoz_metrics.format()}\n"
# HTML 轉義用戶輸入內容,防止 "Can't parse entities" 錯誤
safe_resource = html.escape(self.resource_name[:35])
safe_root_cause = html.escape(self.root_cause[:50])
safe_action = html.escape(self.suggested_action[:35])
safe_downtime = html.escape(self.estimated_downtime)
# 2026-03-29 ogt: AI Token/Cost 顯示
ai_cost_display = ""
if self.ai_tokens > 0 or self.ai_cost > 0:
ai_cost_display = f"💰 Tokens: {self.ai_tokens:,} / ${self.ai_cost:.4f}\n"
# 2026-03-29 ogt: ADR-037 異常頻率顯示
frequency_block = ""
if self.anomaly_frequency and self.anomaly_frequency.get("count_24h", 0) > 1:
freq = self.anomaly_frequency
escalation_emoji = {
None: "",
"REPEAT": "⚠️",
"ESCALATE": "🔴",
"PERMANENT_FIX": "🚨",
}.get(freq.get("escalation_level"), "")
frequency_block = (
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>頻率統計</b> {escalation_emoji}\n"
f"├ 1h: <code>{freq.get('count_1h', 0)}</code> 次\n"
f"├ 24h: <code>{freq.get('count_24h', 0)}</code> 次\n"
f"└ 修復: <code>{freq.get('auto_repair_count', 0)}</code> 次\n"
)
if freq.get("escalation_level"):
frequency_block += f"🔺 升級: <b>{freq['escalation_level']}</b>\n"
# 2026-03-29 ogt: 根據 confidence + ai_provider 動態顯示來源
# confidence > 0 = 真實 AI 分析, confidence == 0 = 規則匹配/降級
# 2026-04-04 ogt: 加入 ai_model 顯示底層模型名稱
if self.confidence > 0 and self.ai_provider:
provider_names = {
"ollama": "Ollama",
"gemini": "Gemini",
"claude": "Claude",
"nvidia": "Nemotron",
"openclaw_nemo": "OpenClaw Nemo",
"openclaw_nvidia_nim": "OpenClaw Nemo",
"openclaw_qwen": "OpenClaw Nemo",
}
provider_display = provider_names.get(self.ai_provider.lower(), self.ai_provider.upper())
model_suffix = f" ({html.escape(self.ai_model)})" if self.ai_model else ""
source_label = f"🤖 <b>{provider_display} 仲裁</b>{model_suffix}"
elif self.confidence > 0:
source_label = "🤖 <b>AI 仲裁判定</b>"
else:
source_label = "⚙️ <b>規則匹配</b>"
# 2026-04-05 Claude Code: 重設計訊息格式,提升易讀性
# 組裝訊息
message = (
f"{self.status_emoji} <b>{html.escape(self.risk_level)}</b> <code>{html.escape(incident_id)}</code>\n"
f"<b>{safe_resource}</b>\n"
f"\n"
f"{source_label} {conf_emoji} {confidence_pct}%\n"
f"👥 {resp_display}\n"
f"💡 {safe_root_cause}\n"
)
if ai_cost_display:
message += f"{ai_cost_display}"
if frequency_block:
message += f"\n{frequency_block}"
if signoz_block:
message += f"\n{signoz_block}"
message += (
f"\n🔧 <b>建議:</b> {safe_action}\n"
f"⏱️ 停機: {safe_downtime}\n"
f"🔍 <a href='{signoz_url}'>SignOz Trace</a>"
)
return message[:900] # 900 字元上限 (nemotron 版用 1000因有額外 tool 區塊)
def format_with_nemotron(self) -> str:
"""
格式化含 Nemotron 結果的訊息 (Phase 22 ADR-044)
格式:
═══════════════════════════
🚨 CRITICAL | harbor-core
═══════════════════════════
📋 INC-20260331-0001
🎯 資源: harbor-core-7d4b8c9f5
━━━━━━━━━━━━━━━━━━━
🤖 OpenClaw 仲裁
├ 📊 信心: 🟢 85%
├ 👥 責任: BE (後端)
└ 💡 原因: JVM Heap 配置不當
━━━━━━━━━━━━━━━━━━━
🔧 Nemotron 執行方案
✅ restart_deployment: awoooi-api
✅ scale_deployment: replicas=3
└ 驗證: ✅ 驗證通過
━━━━━━━━━━━━━━━━━━━
🔧 建議: 刪除 Pod
⏱️ 停機: ~30s
Returns:
str: 格式化的 Telegram 訊息 (max 1000 字元)
"""
# 責任映射
resp_map = {
"FE": "👨‍💻 FE (前端)",
"BE": "⚙️ BE (後端)",
"INFRA": "🏗️ INFRA (基礎設施)",
"DB": "🗄️ DB (資料庫)",
"COLLAB": "🤝 COLLAB (協同處理)",
}
resp_display = resp_map.get(self.primary_responsibility, "❓ 未知")
# 信心度顯示
confidence_pct = int(self.confidence * 100)
if confidence_pct >= 80:
conf_emoji = "🟢"
elif confidence_pct >= 70:
conf_emoji = "🟡"
else:
conf_emoji = "🔴"
# 自動生成事件編號
if self.incident_id:
incident_id = self.incident_id
elif self.approval_id.startswith("INC-"):
incident_id = self.approval_id
else:
incident_id = f"INC-{self.approval_id[:8].upper()}"
# HTML 轉義
safe_resource = html.escape(self.resource_name[:35])
safe_root_cause = html.escape(self.root_cause[:50])
safe_action = html.escape(self.suggested_action[:35])
safe_downtime = html.escape(self.estimated_downtime)
# AI Provider 顯示
# 2026-04-04 ogt: 加入 ai_model 顯示底層模型名稱
if self.confidence > 0 and self.ai_provider:
provider_names = {
"ollama": "Ollama",
"gemini": "Gemini",
"claude": "Claude",
"nvidia": "Nemotron",
"openclaw_nemo": "OpenClaw Nemo",
"openclaw_nvidia_nim": "OpenClaw Nemo",
"openclaw_qwen": "OpenClaw Nemo",
}
provider_display = provider_names.get(self.ai_provider.lower(), self.ai_provider.upper())
model_suffix = f" ({html.escape(self.ai_model)})" if self.ai_model else ""
source_label = f"🤖 <b>{provider_display} 仲裁</b>{model_suffix}"
elif self.confidence > 0:
source_label = "🤖 <b>OpenClaw 仲裁</b>"
else:
source_label = "⚙️ <b>規則匹配</b>"
# Nemotron 區塊
nemotron_block = ""
if self.nemotron_enabled and self.nemotron_tools:
tools_lines = []
for t in self.nemotron_tools[:3]: # 最多顯示 3 個
valid_emoji = "" if t.get("valid", False) else ""
tool_name = html.escape(str(t.get("tool", "unknown"))[:20])
args_str = str(t.get("args", {}))[:25]
safe_args = html.escape(args_str)
tools_lines.append(f" {valid_emoji} {tool_name}: {safe_args}")
tools_str = "\n".join(tools_lines)
validation_display = html.escape(self.nemotron_validation or "⏳ 驗證中")
nemotron_block = (
f"━━━━━━━━━━━━━━━━━━━\n"
f"🔧 <b>Nemotron 執行方案</b>\n"
f"{tools_str}\n"
f"└ 驗證: {validation_display}\n"
)
if self.nemotron_latency_ms > 0:
nemotron_block += f"└ 延遲: {self.nemotron_latency_ms:.0f}ms\n"
# 2026-04-05 Claude Code: 重設計訊息格式,提升易讀性
# 組裝訊息
message = (
f"{self.status_emoji} <b>{html.escape(self.risk_level)}</b> <code>{html.escape(incident_id)}</code>\n"
f"<b>{safe_resource}</b>\n"
f"\n"
f"{source_label} {conf_emoji} {confidence_pct}%\n"
f"👥 {resp_display}\n"
f"💡 {safe_root_cause}\n"
)
if nemotron_block:
message += f"\n{nemotron_block}"
message += (
f"\n🔧 <b>建議:</b> {safe_action}\n"
f"⏱️ 停機: {safe_downtime}"
)
return message[:1000]
# =============================================================================
# 新訊息模板 (2026-03-29 ogt: ADR-038 Telegram 訊息規範)
# =============================================================================
@dataclass
class SentryErrorMessage:
"""
Sentry 錯誤訊息 (SENTRY_ERROR)
2026-03-29 ogt: 新增,用於 Sentry 錯誤通知
按鈕: [🔍 查看詳情] [🔕 靜默 1h]
"""
error_id: str # Sentry Issue ID
error_type: str # TypeError, ValueError, etc.
error_message: str # 錯誤訊息 (max 100)
service_name: str # awoooi-api, awoooi-web, etc.
file_location: str # src/api/v1/incidents.py:123
occurrence_count: int = 1 # 發生次數
affected_users: int = 0 # 影響用戶數
first_seen: str = "" # 首次發生時間
stack_trace: list[str] | None = None # Stack trace (前 3 行)
sentry_url: str = "" # Sentry 連結
def format(self) -> str:
"""格式化為 Telegram HTML"""
safe_error = html.escape(self.error_message[:80])
safe_type = html.escape(self.error_type[:30])
safe_service = html.escape(self.service_name[:25])
safe_file = html.escape(self.file_location[:50])
# Stack trace 區塊
trace_block = ""
if self.stack_trace:
trace_lines = "\n".join(f"{html.escape(line[:50])}" for line in self.stack_trace[:3])
trace_block = f"🔗 Stack Trace (前 3 行):\n{trace_lines}\n"
# Sentry URL
sentry_link = ""
if self.sentry_url:
safe_url = html.escape(self.sentry_url, quote=True)
sentry_link = f"\n🔍 <a href='{safe_url}'>查看 Sentry</a>"
message = (
f"═══════════════════════════\n"
f"🐛 <b>SENTRY ERROR</b> | {safe_service}\n"
f"═══════════════════════════\n"
f"📋 <code>{html.escape(self.error_id)}</code>\n"
f"🎯 錯誤: <code>{safe_type}</code>\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"💬 {safe_error}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>統計</b>\n"
f"├ 發生次數: <code>{self.occurrence_count}</code>\n"
f"├ 影響用戶: <code>{self.affected_users}</code>\n"
f"└ 首次發生: {html.escape(self.first_seen) if self.first_seen else 'N/A'}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📍 位置: <code>{safe_file}</code>\n"
f"{trace_block}"
f"{sentry_link}"
)
return message[:900]
@dataclass
class ResourceWarnMessage:
    """
    Resource warning notification (RESOURCE_WARN).

    2026-03-29 ogt: added for resource-exhaustion warnings.
    Buttons noted by the author: [⚡ 自動擴展] [🔕 靜默 1h] (not built by this class).
    """

    resource_id: str  # RES-YYYYMMDD-XXXX
    pod_name: str  # Pod name (first 35 characters rendered)
    namespace: str = "default"  # K8s namespace (first 20 characters rendered)
    cpu_percent: float = 0.0  # CPU usage percentage
    cpu_limit: str = ""  # CPU limit, e.g. 500m; omitted when empty
    memory_percent: float = 0.0  # memory usage percentage
    memory_limit: str = ""  # memory limit, e.g. 512Mi; omitted when empty
    disk_percent: float = 0.0  # disk usage percentage
    trend_info: str = ""  # trend note (first 50 characters rendered)
    suggestion: str = ""  # suggested action (first 50 characters rendered)

    @staticmethod
    def _usage_light(percent: float) -> str:
        """Return the traffic light for a usage percentage (>=90 red, >=70 yellow)."""
        if percent >= 90:
            return "🔴"
        if percent >= 70:
            return "🟡"
        return "🟢"

    def format(self) -> str:
        """
        Render the warning as Telegram HTML, sliced to 900 characters.

        Every free-text field, including the CPU and memory limits, is
        HTML-escaped so that a stray "<" or "&" cannot make the Telegram parser
        reject the message.
        """
        safe_pod = html.escape(self.pod_name[:35])
        safe_ns = html.escape(self.namespace[:20])

        cpu_limit_note = f" (limit: {html.escape(self.cpu_limit)})" if self.cpu_limit else ""
        memory_limit_note = (
            f" (limit: {html.escape(self.memory_limit)})" if self.memory_limit else ""
        )

        trend_block = ""
        if self.trend_info:
            trend_block = f"📈 趨勢: {html.escape(self.trend_info[:50])}\n"
        suggestion_block = ""
        if self.suggestion:
            suggestion_block = f"💡 建議: {html.escape(self.suggestion[:50])}\n"

        message = (
            f"═══════════════════════════\n"
            f"⚠️ <b>資源告警</b> | {safe_ns}\n"
            f"═══════════════════════════\n"
            f"📋 <code>{html.escape(self.resource_id)}</code>\n"
            f"🎯 Pod: <code>{safe_pod}</code>\n"
            f"━━━━━━━━━━━━━━━━━━━\n"
            f"📊 <b>資源使用率</b>\n"
            f"├ CPU: {self._usage_light(self.cpu_percent)} "
            f"<code>{self.cpu_percent:.1f}%</code>{cpu_limit_note}\n"
            f"├ Memory: {self._usage_light(self.memory_percent)} "
            f"<code>{self.memory_percent:.1f}%</code>{memory_limit_note}\n"
            f"└ Disk: {self._usage_light(self.disk_percent)} "
            f"<code>{self.disk_percent:.1f}%</code>\n"
            f"━━━━━━━━━━━━━━━━━━━\n"
            f"{trend_block}"
            f"{suggestion_block}"
        )
        return message[:900]
@dataclass
class RepairReportMessage:
"""
自動修復報告訊息 (REPAIR_REPORT)
2026-03-29 ogt: 新增,用於每日自動修復彙總
按鈕: 無
"""
report_date: str # 報告日期 (YYYY-MM-DD)
total_repairs: int = 0 # 總修復次數
success_count: int = 0 # 成功次數
failure_count: int = 0 # 失敗次數
saved_minutes: int = 0 # 節省人工時間 (分鐘)
top_issues: list[tuple[str, int]] | None = None # Top 問題 [(name, count)]
ai_cost_gemini: float = 0.0 # Gemini 成本
ai_cost_nvidia: float = 0.0 # NVIDIA 成本 (免費)
ai_tokens_total: int = 0 # 總 Token 數
def format(self) -> str:
"""格式化為 Telegram HTML"""
# 成功率
success_rate = (self.success_count / self.total_repairs * 100) if self.total_repairs > 0 else 0
# Top 問題區塊
issues_block = ""
if self.top_issues:
issues_lines = "\n".join(
f" {i+1}. {html.escape(name[:30])} ({count} 次)"
for i, (name, count) in enumerate(self.top_issues[:3])
)
issues_block = f"━━━━━━━━━━━━━━━━━━━\n🔝 <b>Top 3 問題</b>:\n{issues_lines}\n"
# AI 成本
total_cost = self.ai_cost_gemini + self.ai_cost_nvidia
message = (
f"═══════════════════════════\n"
f"🔧 <b>自動修復報告</b> | 每日彙總\n"
f"═══════════════════════════\n"
f"📅 {html.escape(self.report_date)}\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>統計</b>\n"
f"├ 總修復次數: <code>{self.total_repairs}</code>\n"
f"├ 成功: ✅ <code>{self.success_count}</code> ({success_rate:.0f}%)\n"
f"├ 失敗: ❌ <code>{self.failure_count}</code>\n"
f"└ 節省人工: ~<code>{self.saved_minutes}</code> 分鐘\n"
f"{issues_block}"
f"━━━━━━━━━━━━━━━━━━━\n"
f"💰 <b>AI 成本</b>\n"
f"├ Gemini: ${self.ai_cost_gemini:.4f} ({self.ai_tokens_total:,} tokens)\n"
f"├ NVIDIA: ${self.ai_cost_nvidia:.4f} (免費)\n"
f"└ 總計: ${total_cost:.4f}"
)
return message[:900]
@dataclass
class DailySummaryMessage:
    """
    Daily summary (DAILY_SUMMARY).

    2026-03-29 ogt: added for the daily system-status digest. No buttons.
    """

    summary_date: str  # summary date (YYYY-MM-DD)
    # Alert statistics
    alert_total: int = 0
    alert_critical: int = 0
    alert_medium: int = 0
    alert_low: int = 0
    # Handling statistics
    auto_repair_count: int = 0
    manual_approval_count: int = 0
    ignored_count: int = 0
    avg_response_minutes: float = 0.0
    # Availability (rendered with a trailing "%")
    api_availability: float = 99.9
    web_availability: float = 99.9
    worker_availability: float = 99.9
    # Cost (rendered with a leading "$")
    ai_cost: float = 0.0
    cloud_cost: float = 0.0
    budget_remaining: float = 0.0

    def format(self) -> str:
        """Render the digest as Telegram HTML, sliced to 900 characters."""
        handled = self.auto_repair_count + self.manual_approval_count + self.ignored_count

        def share(count: int) -> float:
            # Percentage of all handled items; 0 when nothing was handled.
            return (count / handled * 100) if handled > 0 else 0

        auto_pct = share(self.auto_repair_count)
        manual_pct = share(self.manual_approval_count)
        ignored_pct = share(self.ignored_count)

        banner = "═══════════════════════════"
        rule = "━━━━━━━━━━━━━━━━━━━"
        rows = [
            banner,
            "📊 <b>每日摘要</b> | AWOOOI",
            banner,
            f"📅 {html.escape(self.summary_date)}",
            rule,
            "🚨 <b>告警統計</b>",
            f"├ 總數: <code>{self.alert_total}</code>",
            f"├ Critical: <code>{self.alert_critical}</code>",
            f"├ Medium: <code>{self.alert_medium}</code>",
            f"└ Low: <code>{self.alert_low}</code>",
            rule,
            "✅ <b>處理統計</b>",
            f"├ 自動修復: <code>{self.auto_repair_count}</code> ({auto_pct:.0f}%)",
            f"├ 人工簽核: <code>{self.manual_approval_count}</code> ({manual_pct:.0f}%)",
            f"├ 忽略/靜默: <code>{self.ignored_count}</code> ({ignored_pct:.0f}%)",
            f"└ 平均回應: <code>{self.avg_response_minutes:.1f}</code> 分鐘",
            rule,
            "📈 <b>可用性</b>",
            f"├ API: <code>{self.api_availability:.2f}%</code>",
            f"├ Web: <code>{self.web_availability:.2f}%</code>",
            f"└ Worker: <code>{self.worker_availability:.2f}%</code>",
            rule,
            "💰 <b>成本</b>",
            f"├ AI: ${self.ai_cost:.2f}",
            f"├ 雲端: ${self.cloud_cost:.2f}",
            f"└ 預算剩餘: ${self.budget_remaining:.2f}",
        ]
        return "\n".join(rows)[:900]
@dataclass
class CICDProgressMessage:
    """
    CI/CD progress notification (CICD_PROGRESS).

    2026-03-30 ogt: added for pipeline progress updates.
    Deliberately compact: no AI arbitration and no buttons.
    """

    job_name: str  # job name, e.g. Build, Test, Deploy (first 40 characters rendered)
    status: str  # running, success, failed (matched case-insensitively)
    stage: str = ""  # pipeline stage, e.g. build, test, deploy (first 20 characters rendered)
    commit_sha: str = ""  # Git commit SHA (first 8 characters rendered)
    triggered_by: str = ""  # who triggered the run; not rendered by format()
    duration_seconds: int = 0  # run time; omitted when not positive
    message: str = ""  # extra text; not rendered by format()
    workflow_url: str = ""  # link target; omitted when empty

    def format(self) -> str:
        """Render the compact progress line(s) as Telegram HTML, sliced to 900 characters."""
        # NOTE(review): three of these icons are empty strings in this copy of
        # the file - confirm whether emoji were lost from the source.
        icons = {
            "running": "🔄",
            "success": "",
            "failed": "",
            "pending": "",
        }
        status_emoji = icons.get(self.status.lower(), "📦")

        job_text = html.escape(self.job_name[:40])
        stage_text = html.escape(self.stage[:20]) if self.stage else ""

        # Elapsed time as " (Xm Ys)", or " (Ys)" when under a minute.
        elapsed = ""
        if self.duration_seconds > 0:
            minutes, seconds = self.duration_seconds // 60, self.duration_seconds % 60
            if minutes > 0:
                elapsed = f" ({minutes}m {seconds}s)"
            else:
                elapsed = f" ({seconds}s)"

        pieces = [
            f"{status_emoji} <b>[AWOOOI CI/CD]</b>",
            f" | {stage_text}" if stage_text else "",
            "\n",
            f"📦 {job_text}{elapsed}",
        ]
        if self.commit_sha:
            pieces.append(f"\n📋 <code>{html.escape(self.commit_sha[:8])}</code>")
        if self.workflow_url:
            href = html.escape(self.workflow_url, quote=True)
            pieces.append(f"\n🔗 <a href='{href}'>Workflow</a>")
        return "".join(pieces)[:900]
@dataclass
class DeploySuccessMessage:
    """
    Deployment success notification (DEPLOY_SUCCESS).

    2026-03-29 ogt: added for CD deployment-success notices. No buttons.
    """

    commit_sha: str  # Git commit SHA (first 8 characters rendered)
    triggered_by: str  # who triggered the deploy (first 20 characters, shown after "@")
    environment: str = "Production"  # first 15 characters rendered
    # Version info; each is rendered as "N/A" when empty
    api_version: str = ""
    web_version: str = ""
    worker_version: str = ""
    # Deployment time
    duration_seconds: int = 0
    # Test results
    e2e_passed: int = 0
    e2e_total: int = 0
    health_check_passed: bool = True
    # Link
    workflow_url: str = ""

    def format(self) -> str:
        """Render the notification as Telegram HTML, sliced to 900 characters."""

        def version_or_na(version: str) -> str:
            return html.escape(version) if version else "N/A"

        # Elapsed time as "Xm Ys", or "Ys" when under a minute.
        minutes, seconds = self.duration_seconds // 60, self.duration_seconds % 60
        if minutes > 0:
            duration_str = f"{minutes}m {seconds}s"
        else:
            duration_str = f"{seconds}s"

        # NOTE(review): the all-passed marker is an empty string in this copy
        # of the file - confirm whether an emoji was lost from the source.
        if self.e2e_passed == self.e2e_total:
            e2e_status = ""
        else:
            e2e_status = "⚠️"

        if self.health_check_passed:
            health_status = "✅ 全部通過"
        else:
            health_status = "❌ 部分失敗"

        rule = "━━━━━━━━━━━━━━━━━━━\n"
        pieces = [
            f"✅ <b>部署成功</b> | {html.escape(self.environment[:15])}\n\n",
            f"📋 Commit: <code>{html.escape(self.commit_sha[:8])}</code>\n",
            f"👤 觸發者: @{html.escape(self.triggered_by[:20])}\n",
            rule,
            "📊 <b>部署詳情</b>\n",
            f"├ API: {version_or_na(self.api_version)}\n",
            f"├ Web: {version_or_na(self.web_version)}\n",
            f"├ Worker: {version_or_na(self.worker_version)}\n",
            f"└ 耗時: {duration_str}\n",
            rule,
            f"🧪 E2E 測試: {e2e_status} {self.e2e_passed}/{self.e2e_total} PASSED\n",
            f"📊 健康檢查: {health_status}",
        ]
        if self.workflow_url:
            href = html.escape(self.workflow_url, quote=True)
            pieces.append(f"\n🔗 <a href='{href}'>查看 Workflow</a>")
        return "".join(pieces)[:900]
@dataclass
class RateLimitMessage:
"""
API 限額警告訊息 (RATE_LIMIT)
2026-03-29 ogt: 新增,用於 AI API 限額警告
按鈕: 無
"""
provider: str # gemini, openai, etc.
# 用量統計
daily_usage: int = 0
daily_limit: int = 0
token_usage: int = 0
token_limit: int = 0
cost_usd: float = 0.0
# 建議
suggestions: list[str] | None = None
# 重置時間
reset_time: str = ""
def format(self) -> str:
"""格式化為 Telegram HTML"""
safe_provider = html.escape(self.provider.upper()[:15])
# 使用率百分比
usage_pct = (self.daily_usage / self.daily_limit * 100) if self.daily_limit > 0 else 0
token_pct = (self.token_usage / self.token_limit * 100) if self.token_limit > 0 else 0
# 建議區塊
suggestion_block = ""
if self.suggestions:
suggestion_lines = "\n".join(f" - {html.escape(s[:50])}" for s in self.suggestions[:3])
suggestion_block = f"━━━━━━━━━━━━━━━━━━━\n💡 <b>建議</b>:\n{suggestion_lines}\n"
# 重置時間
reset_block = ""
if self.reset_time:
reset_block = f"\n🔄 將於 {html.escape(self.reset_time)} 重置"
message = (
f"⚠️ <b>API 限額警告</b>\n\n"
f"━━━━━━━━━━━━━━━━━━━\n"
f"📊 <b>{safe_provider} API</b>\n"
f"├ 今日用量: <code>{self.daily_usage}/{self.daily_limit}</code> ({usage_pct:.0f}%)\n"
f"├ Token: <code>{self.token_usage:,}/{self.token_limit:,}</code> ({token_pct:.0f}%)\n"
f"└ 成本: ${self.cost_usd:.4f}\n"
f"{suggestion_block}"
f"{reset_block}"
)
return message[:900]
@dataclass
class K3sStatusMessage:
    """
    K3s cluster status report (K3S_STATUS).

    2026-03-31 Claude Code: Phase 21.2 scheduled report, used for the daily
    K3s health push. No buttons.
    """

    report_date: str  # report date (YYYY-MM-DD HH:MM)
    # Node status
    node_total: int = 0
    node_ready: int = 0
    # Pod status
    pod_total: int = 0  # not rendered by format()
    pod_running: int = 0
    pod_pending: int = 0
    pod_failed: int = 0
    # HPA status
    hpa_api_replicas: str = "2/6"
    hpa_web_replicas: str = "2/6"
    hpa_worker_replicas: str = "1/3"
    # Backup status (first 20 characters rendered, "N/A" when empty)
    etcd_backup_last: str = ""
    velero_backup_last: str = ""
    # Stability indicators
    alert_count_48h: int = 0
    pod_restart_48h: int = 0
    # Version info
    k3s_version: str = ""  # not rendered by format()

    def format(self) -> str:
        """Render the report as Telegram HTML, sliced to 900 characters."""
        # NOTE(review): the healthy-state marker is an empty string in this copy
        # of the file - confirm whether an emoji was lost from the source.
        healthy_mark = ""
        warning_mark = "⚠️"

        nodes_ok = self.node_ready == self.node_total
        pods_ok = self.pod_failed == 0 and self.pod_pending == 0
        stable = self.alert_count_48h == 0 and self.pod_restart_48h == 0

        node_health = healthy_mark if nodes_ok else warning_mark
        pod_health = healthy_mark if pods_ok else warning_mark
        stability = healthy_mark if stable else warning_mark

        def backup_text(stamp: str) -> str:
            return html.escape(stamp[:20]) if stamp else "N/A"

        banner = "═══════════════════════════"
        rule = "━━━━━━━━━━━━━━━━━━━"
        rows = [
            banner,
            "🎛️ <b>K3s 叢集狀態</b> | Daily",
            banner,
            f"📅 {html.escape(self.report_date)}",
            rule,
            f"{node_health} <b>節點</b>: {self.node_ready}/{self.node_total} Ready",
            rule,
            f"{pod_health} <b>Pod 狀態</b>",
            f"├ Running: <code>{self.pod_running}</code>",
            f"├ Pending: <code>{self.pod_pending}</code>",
            f"└ Failed: <code>{self.pod_failed}</code>",
            rule,
            "📊 <b>HPA 副本數</b>",
            f"├ API: <code>{html.escape(self.hpa_api_replicas)}</code>",
            f"├ Web: <code>{html.escape(self.hpa_web_replicas)}</code>",
            f"└ Worker: <code>{html.escape(self.hpa_worker_replicas)}</code>",
            rule,
            "💾 <b>備份</b>",
            f"├ etcd: {backup_text(self.etcd_backup_last)}",
            f"└ Velero: {backup_text(self.velero_backup_last)}",
            rule,
            f"{stability} <b>48h 穩定度</b>",
            f"├ 告警: <code>{self.alert_count_48h}</code>",
            f"└ Pod 重啟: <code>{self.pod_restart_48h}</code>",
        ]
        return "\n".join(rows)[:900]
@dataclass
class WeeklyReportMessage:
    """
    Weekly report (WEEKLY_REPORT).

    2026-03-31 Claude Code: Phase 21.3 scheduled report; per the author's note
    it is sent every Friday at 18:00 Taipei time. No buttons.
    """

    week_range: str  # week label, e.g. "2026-W14"
    report_date: str  # report date and time
    # Alert statistics
    alert_total: int = 0
    alert_critical: int = 0
    alert_resolved: int = 0
    resolved_rate: float = 0.0  # percent; below 80 shows a warning mark
    # AI performance
    ai_proposal_count: int = 0
    ai_executed_count: int = 0
    ai_success_rate: float = 0.0  # percent; below 70 shows a warning mark
    avg_response_minutes: float = 0.0
    # K3s health
    k3s_uptime_pct: float = 99.9  # percent; below 99 shows a warning mark
    pod_restart_total: int = 0
    hpa_scale_events: int = 0
    # Git activity
    commits_count: int = 0
    deploy_count: int = 0
    # Cost
    ai_cost_week: float = 0.0
    ai_tokens_week: int = 0

    def format(self) -> str:
        """Render the report as Telegram HTML, sliced to 900 characters."""
        # NOTE(review): the healthy-state marker is an empty string in this copy
        # of the file - confirm whether an emoji was lost from the source.
        healthy_mark = ""
        warning_mark = "⚠️"

        alert_health = healthy_mark if self.resolved_rate >= 80 else warning_mark
        ai_health = healthy_mark if self.ai_success_rate >= 70 else warning_mark
        k3s_health = healthy_mark if self.k3s_uptime_pct >= 99 else warning_mark

        banner = "═══════════════════════════"
        rule = "━━━━━━━━━━━━━━━━━━━"
        rows = [
            banner,
            "📊 <b>AWOOOI 週報</b>",
            banner,
            f"📅 {html.escape(self.week_range)} | {html.escape(self.report_date)}",
            rule,
            f"{alert_health} <b>告警統計</b>",
            f"├ 總數: <code>{self.alert_total}</code>",
            f"├ Critical: <code>{self.alert_critical}</code>",
            f"├ 已解決: <code>{self.alert_resolved}</code>",
            f"└ 解決率: <code>{self.resolved_rate:.1f}%</code>",
            rule,
            f"{ai_health} <b>AI 效能</b>",
            f"├ 提案數: <code>{self.ai_proposal_count}</code>",
            f"├ 執行數: <code>{self.ai_executed_count}</code>",
            f"├ 成功率: <code>{self.ai_success_rate:.1f}%</code>",
            f"└ 平均回應: <code>{self.avg_response_minutes:.1f}</code> 分鐘",
            rule,
            f"{k3s_health} <b>K3s 健康</b>",
            f"├ Uptime: <code>{self.k3s_uptime_pct:.2f}%</code>",
            f"├ Pod 重啟: <code>{self.pod_restart_total}</code>",
            f"└ HPA 擴縮: <code>{self.hpa_scale_events}</code> 次",
            rule,
            "📦 <b>開發活動</b>",
            f"├ Commits: <code>{self.commits_count}</code>",
            f"└ 部署: <code>{self.deploy_count}</code> 次",
            rule,
            "💰 <b>AI 成本</b>",
            f"├ 費用: $<code>{self.ai_cost_week:.2f}</code>",
            f"└ Tokens: <code>{self.ai_tokens_week:,}</code>",
        ]
        return "\n".join(rows)[:900]
@dataclass
class InfraAlertMessage:
    """
    Infrastructure anomaly alert (INFRA_ALERT).

    2026-04-03 ogt: added as the standard alert format for infrastructure
    anomalies such as Nemotron/NIM. Meant for non-incident component problems
    (AI provider, DB, external API, ...). Informational only, no buttons.
    """

    component: str  # component name, e.g. "Nemotron NIM"
    status: str  # status description, e.g. "⚠️ 超時 (>25s)"
    impact: str  # impact description (first 150 characters rendered)
    auto_fixed: bool = False  # whether it was already repaired automatically
    fix_action: str = ""  # repair action (first 100 characters rendered)
    note: str = ""  # extra note; when set it replaces the repair section

    def format(self) -> str:
        """Render the alert as Telegram HTML, sliced to 900 characters."""
        # Footer precedence: an informational note wins, then the auto-fix
        # report, and otherwise a "needs attention" prompt with a default hint.
        if self.note:
            footer_body = f"💡 {html.escape(self.note[:150])}\n"
        elif self.auto_fixed:
            footer_body = f"✅ <b>已自動修復</b>\n{html.escape(self.fix_action[:100])}\n"
        else:
            action_text = self.fix_action[:100] or '請確認元件狀態'
            footer_body = f"⚠️ <b>需要關注</b>\n{html.escape(action_text)}\n"

        rule = "━━━━━━━━━━━━━━━━━━━\n"
        pieces = [
            "🚨 <b>基礎設施異常</b>\n",
            rule,
            f"⚙️ <b>{html.escape(self.component)}</b>: {html.escape(self.status)}\n",
            f"📛 影響: {html.escape(self.impact[:150])}\n",
            rule,
            footer_body,
        ]
        return "".join(pieces)[:900]
# =============================================================================
# Risk Level Emoji Mapping
# =============================================================================
# Maps a lower-case risk level to the emoji shown at the start of a card.
# NOTE(review): the "low" entry is an empty string in this copy of the file -
# confirm whether an emoji was lost from the source.
RISK_EMOJI_MAP = {
"critical": "🚨",
"high": "🔴",
"medium": "⚠️",
"low": "",
}
# =============================================================================
# Telegram Gateway
# =============================================================================
class TelegramGatewayError(Exception):
    """Error raised by the Telegram Gateway."""
class TelegramGateway:
"""
Telegram Gateway - 行動戰情室 + SignOz 整合
職責:
1. 推送待簽核卡片到 Telegram (含 SignOz 指標)
2. 接收並驗證簽核/調優回調
3. Shadow Mode 調優執行 (僅日誌)
4. 遵守 SOUL.md 訊息壓縮原則
"""
TELEGRAM_API_BASE = "https://api.telegram.org"
def __init__(self):
    """Set up gateway state; the HTTP client itself is created by initialize()."""
    # Core collaborators
    self._http_client: httpx.AsyncClient | None = None
    self._security = get_security_interceptor()
    self._initialized = False

    # Long polling state (Phase 5 intranet fix)
    self._polling_active = False
    self._polling_task: asyncio.Task | None = None
    self._last_update_id = 0

    # 2026-04-01 Claude Code: distributed leader election (stops two Pods
    # fighting over getUpdates with 409s). The identity is POD_NAME when the
    # variable is set, otherwise a random 16-hex-digit value.
    random_identity = os.urandom(8).hex()
    self._pod_id = os.environ.get("POD_NAME", random_identity)
    self._leader_task: asyncio.Task | None = None

    # Phase 6.5: heartbeat monitoring (guards against a silent blind spot)
    self._last_message_time: datetime | None = None
    self._heartbeat_task: asyncio.Task | None = None
    self._heartbeat_active = False
async def initialize(self) -> bool:
    """
    Initialise the gateway: create the HTTP client and the security interceptor.

    Calling it again while a usable client already exists is a no-op, so a
    repeat call cannot orphan an open ``httpx.AsyncClient``.

    Returns:
        bool: True when the gateway is ready; False when the bot token or
        chat ID is not configured (the gateway stays disabled).

    Raises:
        Exception: whatever the security interceptor's ``initialize()``
        raises; the newly created HTTP client is closed first.
    """
    if (
        self._initialized
        and self._http_client is not None
        and not self._http_client.is_closed
    ):
        return True

    if not settings.OPENCLAW_TG_BOT_TOKEN:
        logger.warning("telegram_gateway_disabled", reason="Bot token not configured")
        return False
    if not settings.OPENCLAW_TG_CHAT_ID:
        logger.warning("telegram_gateway_disabled", reason="Chat ID not configured")
        return False

    # 2026-04-03 ogt: use httpx.Timeout so each phase has its own limit.
    # connect=10s, read=50s. Per the original note, the read timeout has to
    # outlast the getUpdates long-poll wait plus a buffer, because the former
    # timeout=30.0 cut every long poll short on the client side.
    client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=10.0, read=50.0, write=10.0, pool=10.0),
        headers={"Content-Type": "application/json"},
    )
    try:
        await self._security.initialize()
    except Exception:
        # Do not leave an open client behind when start-up fails half-way.
        await client.aclose()
        raise

    self._http_client = client
    self._initialized = True
    logger.info("telegram_gateway_initialized")
    return True
@property
def bot_token(self) -> str:
    """Bot token, read from settings on each access."""
    token = settings.OPENCLAW_TG_BOT_TOKEN
    return token
@property
def chat_id(self) -> str:
    """Default chat id, read from ``settings.OPENCLAW_TG_CHAT_ID`` on every access."""
    default_chat = settings.OPENCLAW_TG_CHAT_ID
    return default_chat
@property
def api_url(self) -> str:
"""取得 Telegram API URL"""
return f"{self.TELEGRAM_API_BASE}/bot{self.bot_token}"
async def _send_request(
    self,
    method: str,
    payload: dict,
) -> dict:
    """
    Send one request to the Telegram Bot API.

    Phase C P1: wrapped in an OTEL span named ``telegram.api.<method>``.
    @author Claude Code
    @date 2026-03-30 (Taipei time)

    The gateway is lazily initialised on first use.

    Args:
        method: Bot API method name (sendMessage, editMessageText, ...).
        payload: JSON-serialisable request body.

    Returns:
        dict: The decoded API response (``{"ok": True, "result": ...}``).

    Raises:
        TelegramGatewayError: If the HTTP client is unavailable, the HTTP
            status is an error, Telegram answers ``ok == False``, or any
            other failure occurs while sending/decoding.
    """
    if not self._initialized:
        await self.initialize()
    if not self._http_client:
        raise TelegramGatewayError("HTTP client not initialized")

    url = f"{self.api_url}/{method}"

    # OTEL span: telegram.api.{method}
    with _tracer.start_as_current_span(
        f"telegram.api.{method}",
        attributes={
            "telegram.method": method,
            "telegram.chat_id": str(payload.get("chat_id", "")),
            "telegram.has_reply_markup": "reply_markup" in payload,
        },
    ) as span:
        try:
            response = await self._http_client.post(url, json=payload)
            response.raise_for_status()
            result = response.json()

            if not result.get("ok"):
                span.set_attribute("telegram.error", result.get("description", "Unknown"))
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                raise TelegramGatewayError(
                    f"Telegram API error: {result.get('description', 'Unknown error')}"
                )

            # Success: record the message id when there is one.
            # "result" is not always an object — some methods (for example
            # answerCallbackQuery) return a bare boolean and getUpdates
            # returns a list — so only inspect it when it is a dict.
            api_result = result.get("result")
            if isinstance(api_result, dict) and "message_id" in api_result:
                span.set_attribute("telegram.message_id", api_result["message_id"])
            span.set_status(trace.Status(trace.StatusCode.OK))
            return result

        except httpx.HTTPStatusError as e:
            span.set_attribute("telegram.http_status", e.response.status_code)
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            span.record_exception(e)
            logger.error("telegram_api_error", method=method, status=e.response.status_code)
            raise TelegramGatewayError(f"HTTP error: {e.response.status_code}") from e
        except TelegramGatewayError:
            # Already reported above; re-raise untouched.
            raise
        except Exception as e:
            # Boundary handler: normalise every other failure (network,
            # JSON decoding, ...) into the gateway's own exception type.
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            span.record_exception(e)
            logger.error("telegram_request_failed", method=method, error=str(e))
            raise TelegramGatewayError(str(e)) from e
def _build_inline_keyboard(
self,
approval_id: str,
include_auto_tuning: bool = True,
auto_tuning_command: str = "",
incident_id: str = "",
) -> dict:
"""
建立 Inline Keyboard (ADR-050 v2.0 六鍵佈局)
2026-04-01 Claude Code (ADR-050): 重組為 6 鍵 + 可選自動調優
- 第一行: [✅ 批准] [❌ 拒絕] [🔕 靜默] ← nonce 防重放
- 第二行: [📋 詳情] [🔄 重診] [📊 歷史] ← incident_id format (read-only)
- 第三行: [⚡ 自動調優] (可選)
Args:
approval_id: 簽核單 ID (用於 nonce 生成)
include_auto_tuning: 是否包含自動調優按鈕
auto_tuning_command: kubectl 調優指令
incident_id: 關聯 Incident ID (用於 detail/reanalyze/history 按鈕)
Returns:
dict: Telegram InlineKeyboardMarkup
"""
# 產生 Nonce (防重放,用於寫操作)
approve_nonce = self._security.generate_callback_nonce(approval_id, "approve")
reject_nonce = self._security.generate_callback_nonce(approval_id, "reject")
silence_nonce = self._security.generate_callback_nonce(approval_id, "silence")
# 第一行: 主要簽核操作 (nonce 保護)
buttons = [
[
{"text": "✅ 批准", "callback_data": approve_nonce},
{"text": "❌ 拒絕", "callback_data": reject_nonce},
{"text": "🔕 靜默", "callback_data": silence_nonce},
],
]
# 第二行: 資訊查詢按鈕 (ADR-050: read-only, format: action:incident_id)
if incident_id:
buttons.append([
{"text": "📋 詳情", "callback_data": f"detail:{incident_id}"},
{"text": "🔄 重診", "callback_data": f"reanalyze:{incident_id}"},
{"text": "📊 歷史", "callback_data": f"history:{incident_id}"},
])
# 第三行: 自動調優按鈕 (v7.0)
if include_auto_tuning and auto_tuning_command:
tuning_nonce = self._security.generate_callback_nonce(approval_id, "tune")
buttons.append([
{"text": "⚡ 執行自動調優", "callback_data": tuning_nonce}
])
return {"inline_keyboard": buttons}
async def send_approval_card(
    self,
    approval_id: str,
    risk_level: str,
    resource_name: str,
    root_cause: str,
    suggested_action: str,
    estimated_downtime: str = "~30s",
    # v6.0 AI arbitration fields
    primary_responsibility: str = "COLLAB",
    confidence: float = 0.0,
    namespace: str = "default",
    # v7.0 SignOz integration
    signoz_rps: float = 0.0,
    signoz_rps_trend: str = "stable",
    signoz_error_rate: float = 0.0,
    signoz_p99_latency: float = 0.0,
    signoz_latency_trend: str = "stable",
    signoz_trace_url: str = "",
    auto_tuning_command: str = "",
    # 2026-03-29 ogt: AI token/cost tracking
    ai_tokens: int = 0,
    ai_cost: float = 0.0,
    # 2026-03-29 ogt: ADR-037 anomaly frequency statistics
    anomaly_frequency: dict | None = None,
    # 2026-03-29 ogt: AI provider shown on the card
    ai_provider: str = "",
    # 2026-04-04 ogt: underlying model name
    ai_model: str = "",
    # 2026-04-02 ogt: Phase 22 Nemotron collaboration (ADR-044)
    nemotron_enabled: bool = False,
    nemotron_tools: list[dict] | None = None,
    nemotron_validation: str = "",
    nemotron_latency_ms: float = 0.0,
    # 2026-04-05 Claude Code: incident_id enables detail/reanalyze/history buttons
    incident_id: str = "",
) -> dict:
    """
    Push a pending-approval card to Telegram (v7.0, with SignOz integration).

    Message layout follows SOUL.md 4.1 + AI arbitration + SignOz.
    Phase 21 (ADR-037): includes anomaly frequency statistics.

    When ``settings.SRE_GROUP_CHAT_ID`` is set, a compact copy of the card is
    also sent to the SRE group in a background task; a failure there does not
    affect the result of this call.

    Args:
        approval_id: Approval ticket id.
        risk_level: Risk level; keys of ``RISK_EMOJI_MAP``
            (critical/high/medium/low). Unknown levels fall back to "⚠️".
        resource_name: Resource name.
        root_cause: Root-cause summary.
        suggested_action: Suggested action.
        estimated_downtime: Estimated downtime.
        primary_responsibility: Owning team (FE/BE/INFRA/DB/COLLAB).
        confidence: AI confidence (0.0-1.0).
        namespace: K8s namespace.
        signoz_*: SignOz gold metrics; the metrics block is only attached
            when at least one of rps / error rate / p99 latency is > 0.
        signoz_trace_url: Trace URL with dynamic time parameters.
        auto_tuning_command: kubectl tuning command; a non-empty value
            enables the auto-tuning button.
        ai_tokens / ai_cost: AI usage shown on the card.
        anomaly_frequency: Anomaly frequency statistics (ADR-037).
        ai_provider / ai_model: AI provider and underlying model name.
        nemotron_*: Nemotron collaboration data; when ``nemotron_enabled``
            is true the dual-track format is used.
        incident_id: Related incident id for the info buttons.

    Returns:
        dict: Telegram API response of the primary ``sendMessage`` call.

    Raises:
        TelegramGatewayError: Propagated from ``_send_request``.
    """
    # Status emoji for the risk level
    emoji = RISK_EMOJI_MAP.get(risk_level.lower(), "⚠️")

    # SignOz metrics block (only when there is something to show)
    signoz_metrics = None
    if signoz_rps > 0 or signoz_error_rate > 0 or signoz_p99_latency > 0:
        signoz_metrics = SignOzMetricsBlock(
            rps=signoz_rps,
            rps_trend=signoz_rps_trend,
            error_rate=signoz_error_rate,
            p99_latency_ms=signoz_p99_latency,
            latency_trend=signoz_latency_trend,
            trace_url=signoz_trace_url,
        )

    # Message model (AI arbitration + SignOz + token/cost + frequency stats)
    message = TelegramMessage(
        status_emoji=emoji,
        risk_level=risk_level.upper(),
        resource_name=resource_name,
        root_cause=root_cause,
        suggested_action=suggested_action,
        estimated_downtime=estimated_downtime,
        approval_id=approval_id,
        incident_id=incident_id,
        primary_responsibility=primary_responsibility,
        confidence=confidence,
        namespace=namespace,
        signoz_metrics=signoz_metrics,
        signoz_trace_url=signoz_trace_url,
        auto_tuning_command=auto_tuning_command,
        # 2026-03-29 ogt: AI token/cost tracking
        ai_tokens=ai_tokens,
        ai_cost=ai_cost,
        # 2026-03-29 ogt: ADR-037 anomaly frequency statistics
        anomaly_frequency=anomaly_frequency,
        # 2026-03-29 ogt: AI provider shown on the card
        ai_provider=ai_provider,
        # 2026-04-04 ogt: underlying model name
        ai_model=ai_model,
        # 2026-04-02 ogt: Phase 22 Nemotron collaboration (ADR-044)
        nemotron_enabled=nemotron_enabled,
        nemotron_tools=nemotron_tools,
        nemotron_validation=nemotron_validation,
        nemotron_latency_ms=nemotron_latency_ms,
    )

    # Phase 22: use the dual-track format when Nemotron is enabled
    text = message.format_with_nemotron() if nemotron_enabled else message.format()

    # Buttons (including auto tuning)
    # 2026-04-05 Claude Code: pass incident_id to enable detail/reanalyze/history
    keyboard = self._build_inline_keyboard(
        approval_id=approval_id,
        include_auto_tuning=bool(auto_tuning_command),
        auto_tuning_command=auto_tuning_command,
        incident_id=incident_id,
    )

    payload = {
        "chat_id": self.chat_id,
        "text": text,
        "parse_mode": "HTML",
        "reply_markup": keyboard,
        "disable_web_page_preview": True,  # avoid a preview of the SignOz URL
    }

    logger.info(
        "telegram_approval_card_sending",
        approval_id=approval_id,
        risk_level=risk_level,
        resource=resource_name,
        signoz_integrated=signoz_metrics is not None,
        auto_tuning_available=bool(auto_tuning_command),
    )

    result = await self._send_request("sendMessage", payload)

    logger.info(
        "telegram_approval_card_sent",
        approval_id=approval_id,
        message_id=result.get("result", {}).get("message_id"),
    )

    # 2026-04-03 ogt: also post to the SRE group (Triumvirate ADR-053).
    # Runs in the background; a failure must not affect the main alert flow.
    if settings.SRE_GROUP_CHAT_ID:
        group_task = asyncio.create_task(
            self._send_approval_card_to_group(
                approval_id=approval_id,
                risk_level=risk_level,
                resource_name=resource_name,
                root_cause=root_cause,
                suggested_action=suggested_action,
            )
        )
        # The event loop only holds weak references to tasks, so keep a
        # strong one until the task finishes; otherwise it may be
        # garbage-collected before it runs to completion.
        background_tasks = getattr(self, "_background_tasks", None)
        if background_tasks is None:
            background_tasks = self._background_tasks = set()
        background_tasks.add(group_task)
        group_task.add_done_callback(background_tasks.discard)

    return result
async def _send_approval_card_to_group(
    self,
    approval_id: str,
    risk_level: str,
    resource_name: str,
    root_cause: str,
    suggested_action: str,
) -> None:
    """
    Send a compact copy of the alert card to the SRE group.

    Scheduled via ``asyncio.create_task``; every failure is logged and
    swallowed so the main alert flow is never affected.

    Args:
        approval_id: Approval ticket id (shown in a ``<code>`` block).
        risk_level: Risk level; upper-cased for display.
        resource_name: Resource name, truncated to 40 characters.
        root_cause: Root-cause summary, truncated to 150 characters.
        suggested_action: Suggested action, truncated to 80 characters.
    """
    try:
        # 2026-04-03 ogt: option C block-style layout (owner's instruction)
        # 2026-04-03 ogt: automatic AI analysis removed; alerts only send the card
        risk_emoji = RISK_EMOJI_MAP.get(risk_level.lower(), "")
        # Join only the non-empty parts with a single space so the level and
        # the resource name are not glued together and a missing emoji does
        # not leave a leading space. Truncate before escaping so an HTML
        # entity is never cut in half.
        headline = " ".join(
            part
            for part in (
                risk_emoji,
                risk_level.upper(),
                html.escape(resource_name[:40]),
            )
            if part
        )
        summary = (
            f"{headline}\n\n"
            f"📋 <code>{html.escape(approval_id)}</code>\n"
            f"🔍 {html.escape(root_cause[:150])}\n"
            f"🔧 {html.escape(suggested_action[:80])}"
        )
        await self.send_to_group(text=summary)
    except Exception as e:
        # Deliberate best-effort: this runs as a detached background task.
        logger.error("send_approval_card_to_group_failed", error=str(e))
# =========================================================================
# Additional message senders (2026-03-29 ogt: ADR-038)
# =========================================================================
def _build_sentry_keyboard(self, error_id: str) -> dict:
"""建立 Sentry 錯誤訊息按鈕"""
view_nonce = self._security.generate_callback_nonce(error_id, "view")
silence_nonce = self._security.generate_callback_nonce(error_id, "silence")
return {
"inline_keyboard": [
[
{"text": "🔍 查看詳情", "callback_data": view_nonce},
{"text": "🔕 靜默 1h", "callback_data": silence_nonce},
]
]
}
def _build_resource_keyboard(self, resource_id: str) -> dict:
"""建立資源告警按鈕"""
scale_nonce = self._security.generate_callback_nonce(resource_id, "scale")
silence_nonce = self._security.generate_callback_nonce(resource_id, "silence")
return {
"inline_keyboard": [
[
{"text": "⚡ 自動擴展", "callback_data": scale_nonce},
{"text": "🔕 靜默 1h", "callback_data": silence_nonce},
]
]
}
async def send_sentry_error(
    self,
    error_id: str,
    error_type: str,
    error_message: str,
    service_name: str,
    file_location: str,
    occurrence_count: int = 1,
    affected_users: int = 0,
    first_seen: str = "",
    stack_trace: list[str] | None = None,
    sentry_url: str = "",
) -> dict:
    """
    Send a Sentry error notification with view/silence buttons.

    2026-03-29 ogt: added.

    Args:
        error_id: Sentry issue id (also used for the button nonces).
        error_type: Error type (TypeError, etc.).
        error_message: Error message.
        service_name: Service name.
        file_location: File location.
        occurrence_count: Number of occurrences.
        affected_users: Number of affected users.
        first_seen: First-seen time.
        stack_trace: Stack trace lines.
        sentry_url: Link to Sentry.

    Returns:
        dict: Telegram API response.
    """
    card = SentryErrorMessage(
        error_id=error_id, error_type=error_type, error_message=error_message,
        service_name=service_name, file_location=file_location,
        occurrence_count=occurrence_count, affected_users=affected_users,
        first_seen=first_seen, stack_trace=stack_trace, sentry_url=sentry_url,
    )
    body = card.format()
    markup = self._build_sentry_keyboard(error_id)
    request = {
        "chat_id": self.chat_id,
        "text": body,
        "parse_mode": "HTML",
        "reply_markup": markup,
        "disable_web_page_preview": True,
    }

    logger.info("telegram_sentry_error_sending", error_id=error_id, service=service_name)
    response = await self._send_request("sendMessage", request)
    logger.info("telegram_sentry_error_sent", error_id=error_id)
    return response
async def send_resource_warning(
    self,
    resource_id: str,
    pod_name: str,
    namespace: str = "default",
    cpu_percent: float = 0.0,
    cpu_limit: str = "",
    memory_percent: float = 0.0,
    memory_limit: str = "",
    disk_percent: float = 0.0,
    trend_info: str = "",
    suggestion: str = "",
) -> dict:
    """
    Send a resource-usage warning with scale/silence buttons.

    2026-03-29 ogt: added.

    Args:
        resource_id: Resource id (also used for the button nonces).
        pod_name: Pod name.
        namespace: K8s namespace.
        cpu_percent: CPU usage.
        cpu_limit: CPU limit, passed through to the message model.
        memory_percent: Memory usage.
        memory_limit: Memory limit, passed through to the message model.
        disk_percent: Disk usage.
        trend_info: Trend information.
        suggestion: Suggestion text.

    Returns:
        dict: Telegram API response.
    """
    card = ResourceWarnMessage(
        resource_id=resource_id, pod_name=pod_name, namespace=namespace,
        cpu_percent=cpu_percent, cpu_limit=cpu_limit,
        memory_percent=memory_percent, memory_limit=memory_limit,
        disk_percent=disk_percent, trend_info=trend_info, suggestion=suggestion,
    )
    body = card.format()
    markup = self._build_resource_keyboard(resource_id)
    request = {
        "chat_id": self.chat_id,
        "text": body,
        "parse_mode": "HTML",
        "reply_markup": markup,
        "disable_web_page_preview": True,
    }

    logger.info("telegram_resource_warning_sending", resource_id=resource_id, pod=pod_name)
    response = await self._send_request("sendMessage", request)
    logger.info("telegram_resource_warning_sent", resource_id=resource_id)
    return response
async def send_repair_report(
    self,
    report_date: str,
    total_repairs: int = 0,
    success_count: int = 0,
    failure_count: int = 0,
    saved_minutes: int = 0,
    top_issues: list[tuple[str, int]] | None = None,
    ai_cost_gemini: float = 0.0,
    ai_cost_nvidia: float = 0.0,
    ai_tokens_total: int = 0,
) -> dict:
    """
    Send the auto-repair report (no buttons).

    2026-03-29 ogt: added.

    Args:
        report_date: Report date.
        total_repairs: Total number of repairs.
        success_count: Number of successes.
        failure_count: Number of failures.
        saved_minutes: Manual minutes saved.
        top_issues: Top issue list as ``(name, count)`` tuples.
        ai_cost_gemini: Gemini cost.
        ai_cost_nvidia: NVIDIA cost.
        ai_tokens_total: Total token count.

    Returns:
        dict: Telegram API response.
    """
    report = RepairReportMessage(
        report_date=report_date, total_repairs=total_repairs,
        success_count=success_count, failure_count=failure_count,
        saved_minutes=saved_minutes, top_issues=top_issues,
        ai_cost_gemini=ai_cost_gemini, ai_cost_nvidia=ai_cost_nvidia,
        ai_tokens_total=ai_tokens_total,
    )
    body = report.format()
    request = {
        "chat_id": self.chat_id,
        "text": body,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }

    logger.info("telegram_repair_report_sending", date=report_date)
    response = await self._send_request("sendMessage", request)
    logger.info("telegram_repair_report_sent", date=report_date)
    return response
async def send_daily_summary(
    self,
    summary_date: str,
    alert_total: int = 0,
    alert_critical: int = 0,
    alert_medium: int = 0,
    alert_low: int = 0,
    auto_repair_count: int = 0,
    manual_approval_count: int = 0,
    ignored_count: int = 0,
    avg_response_minutes: float = 0.0,
    api_availability: float = 99.9,
    web_availability: float = 99.9,
    worker_availability: float = 99.9,
    ai_cost: float = 0.0,
    cloud_cost: float = 0.0,
    budget_remaining: float = 0.0,
) -> dict:
    """
    Send the daily summary (no buttons).

    2026-03-29 ogt: added.

    All arguments are forwarded unchanged to ``DailySummaryMessage``, whose
    ``format()`` output becomes the message text.

    Returns:
        dict: Telegram API response.
    """
    summary = DailySummaryMessage(
        summary_date=summary_date,
        # alert counters
        alert_total=alert_total, alert_critical=alert_critical,
        alert_medium=alert_medium, alert_low=alert_low,
        # handling statistics
        auto_repair_count=auto_repair_count,
        manual_approval_count=manual_approval_count,
        ignored_count=ignored_count,
        avg_response_minutes=avg_response_minutes,
        # availability
        api_availability=api_availability, web_availability=web_availability,
        worker_availability=worker_availability,
        # cost
        ai_cost=ai_cost, cloud_cost=cloud_cost, budget_remaining=budget_remaining,
    )
    body = summary.format()
    request = {
        "chat_id": self.chat_id,
        "text": body,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }

    logger.info("telegram_daily_summary_sending", date=summary_date)
    response = await self._send_request("sendMessage", request)
    logger.info("telegram_daily_summary_sent", date=summary_date)
    return response
async def send_cicd_progress(
    self,
    job_name: str,
    status: str,
    stage: str = "",
    commit_sha: str = "",
    triggered_by: str = "",
    duration_seconds: int = 0,
    message: str = "",
    workflow_url: str = "",
    max_retries: int = 3,
) -> dict:
    """
    Send a CI/CD progress notification (compact; bypasses AI arbitration).

    2026-03-30 ogt: added so CI/CD alerts are no longer handled as incidents.
    2026-03-30 P1: retry with exponential backoff.

    Args:
        job_name: CI/CD job name.
        status: Job status.
        stage: Pipeline stage.
        commit_sha: Commit SHA.
        triggered_by: Who triggered the run.
        duration_seconds: Run duration in seconds.
        message: Free-form message.
        workflow_url: Link to the workflow run.
        max_retries: Maximum number of send attempts (default 3). Values
            below 1 are treated as 1 so the message is always attempted.

    Returns:
        dict: Telegram API response.

    Raises:
        TelegramGatewayError: The last error, once every attempt has failed.
    """
    # At least one attempt; with zero iterations there would be no result
    # and no exception to raise.
    attempts = max(1, max_retries)

    # OTEL span: telegram.send_cicd_progress
    with _tracer.start_as_current_span(
        "telegram.send_cicd_progress",
        attributes={
            "telegram.job_name": job_name,
            "telegram.status": status,
            "telegram.stage": stage,
            "telegram.max_retries": max_retries,
        },
    ) as span:
        msg = CICDProgressMessage(
            job_name=job_name,
            status=status,
            stage=stage,
            commit_sha=commit_sha,
            triggered_by=triggered_by,
            duration_seconds=duration_seconds,
            message=message,
            workflow_url=workflow_url,
        )

        payload = {
            "chat_id": self.chat_id,
            "text": msg.format(),
            "parse_mode": "HTML",
            "disable_web_page_preview": True,
        }

        logger.info("telegram_cicd_progress_sending", job=job_name, status=status)

        # Retry loop with exponential backoff
        last_error: TelegramGatewayError | None = None
        for attempt in range(attempts):
            try:
                result = await self._send_request("sendMessage", payload)
                span.set_attribute("telegram.attempts", attempt + 1)
                span.set_status(trace.Status(trace.StatusCode.OK))
                logger.info(
                    "telegram_cicd_progress_sent",
                    job=job_name,
                    status=status,
                    attempt=attempt + 1,
                )
                return result
            except TelegramGatewayError as e:
                last_error = e
                if attempt < attempts - 1:
                    delay = 2 ** attempt  # 1, 2, 4 seconds
                    logger.warning(
                        "telegram_cicd_progress_retry",
                        job=job_name,
                        attempt=attempt + 1,
                        delay=delay,
                        error=str(e),
                    )
                    await asyncio.sleep(delay)

        # Every attempt failed
        span.set_attribute("telegram.attempts", attempts)
        span.set_status(trace.Status(trace.StatusCode.ERROR))
        span.record_exception(last_error)
        logger.error(
            "telegram_cicd_progress_failed",
            job=job_name,
            status=status,
            max_retries=max_retries,
            error=str(last_error),
        )
        raise last_error
async def send_deploy_success(
    self,
    commit_sha: str,
    triggered_by: str,
    environment: str = "Production",
    api_version: str = "",
    web_version: str = "",
    worker_version: str = "",
    duration_seconds: int = 0,
    e2e_passed: int = 0,
    e2e_total: int = 0,
    health_check_passed: bool = True,
    workflow_url: str = "",
) -> dict:
    """
    Send a deployment-success notification (no buttons).

    2026-03-29 ogt: added.

    All arguments are forwarded unchanged to ``DeploySuccessMessage``; only
    the first 8 characters of ``commit_sha`` appear in the log events.

    Returns:
        dict: Telegram API response.
    """
    notice = DeploySuccessMessage(
        commit_sha=commit_sha, triggered_by=triggered_by, environment=environment,
        api_version=api_version, web_version=web_version,
        worker_version=worker_version, duration_seconds=duration_seconds,
        e2e_passed=e2e_passed, e2e_total=e2e_total,
        health_check_passed=health_check_passed, workflow_url=workflow_url,
    )
    body = notice.format()
    request = {
        "chat_id": self.chat_id,
        "text": body,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }

    logger.info("telegram_deploy_success_sending", commit=commit_sha[:8])
    response = await self._send_request("sendMessage", request)
    logger.info("telegram_deploy_success_sent", commit=commit_sha[:8])
    return response
async def send_rate_limit_warning(
    self,
    provider: str,
    daily_usage: int = 0,
    daily_limit: int = 0,
    token_usage: int = 0,
    token_limit: int = 0,
    cost_usd: float = 0.0,
    suggestions: list[str] | None = None,
    reset_time: str = "",
) -> dict:
    """
    Send an API quota warning (no buttons).

    2026-03-29 ogt: added.

    All arguments are forwarded unchanged to ``RateLimitMessage``, whose
    ``format()`` output becomes the message text.

    Returns:
        dict: Telegram API response.
    """
    warning = RateLimitMessage(
        provider=provider,
        daily_usage=daily_usage, daily_limit=daily_limit,
        token_usage=token_usage, token_limit=token_limit,
        cost_usd=cost_usd, suggestions=suggestions, reset_time=reset_time,
    )
    body = warning.format()
    request = {
        "chat_id": self.chat_id,
        "text": body,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }

    logger.info("telegram_rate_limit_warning_sending", provider=provider)
    response = await self._send_request("sendMessage", request)
    logger.info("telegram_rate_limit_warning_sent", provider=provider)
    return response
async def handle_callback(
    self,
    callback_query_id: str,
    callback_data: str,
    user_id: int,
    message_id: int,
    original_text: str = "",
    username: str = "",
) -> dict:
    """
    Handle an inline-keyboard callback (approval, tuning, snooze/silence, info).

    Never raises for processing failures: every error path answers the
    callback query on a best-effort basis and returns a failure dict.

    Args:
        callback_query_id: Telegram callback query id.
        callback_data: Callback data (contains the nonce, or
            ``action:incident_id`` for info actions).
        user_id: Telegram user id.
        message_id: Id of the message the button belongs to.
        original_text: Original card text (kept when the card is stamped).
        username: Username of the person pressing the button.

    Returns:
        dict: On success ``{action, approval_id, user, success: True}`` plus
        one of ``info_action`` / ``auto_tuning_result`` / ``snooze_result`` /
        ``silence_result`` where applicable; on failure
        ``{success: False, error}``.
    """

    async def _answer_failure(kind: str, text: str) -> None:
        # Best-effort user feedback on an error path. If Telegram itself is
        # the reason we are here, this call may fail too; that must not
        # escape and replace the failure result promised to the caller.
        try:
            await self._answer_callback(callback_query_id, kind, text=text)
        except Exception as answer_error:
            logger.warning(
                "telegram_callback_answer_failed",
                kind=kind,
                error=str(answer_error),
            )

    try:
        # ===================================================================
        # Step 1: parse the callback data (two formats are supported)
        # ===================================================================
        parsed = self._security.parse_callback_data(callback_data)
        action = parsed["action"]
        approval_id = parsed["approval_id"]

        # ===================================================================
        # Step 1.5: ADR-050 info actions (read-only, whitelist check only)
        # ===================================================================
        # 2026-04-01 Claude Code (ADR-050 P1): detail/reanalyze/history
        if parsed.get("is_info_action"):
            if not self._security.is_whitelisted(user_id):
                raise UserNotWhitelistedError(f"User {user_id} not in whitelist")

            incident_id = parsed.get("incident_id", approval_id)

            if action == "detail":
                # ADR-050 P2: send the incident detail as a new message
                # (the original approval card and its buttons stay intact).
                await self._answer_callback(callback_query_id, action, text="📋 詳情傳送中...")
                await self._send_incident_detail(incident_id)
            elif action == "history":
                # ADR-050 P2: send the frequency statistics.
                await self._answer_callback(callback_query_id, action, text="📊 歷史統計傳送中...")
                await self._send_incident_history(incident_id)
            elif action == "reanalyze":
                # ADR-050 P2: trigger a re-analysis.
                await self._answer_callback(callback_query_id, action, text="🔄 重診排程中...")
                await self._send_reanalyze_result(incident_id)
            else:
                await self._answer_callback(callback_query_id, action, text="⚠️ 未知操作")

            return {
                "action": action,
                "approval_id": approval_id,
                "user": {"id": user_id, "username": username},
                "success": True,
                "info_action": True,
            }

        nonce = parsed["nonce"]

        # Verify user + nonce
        user = await self._security.verify_callback(
            user_id=user_id,
            callback_id=callback_query_id,
            nonce=nonce,
        )

        # ===================================================================
        # Step 2: auto tuning (Shadow Mode)
        # ===================================================================
        auto_tuning_result = None
        if action == "tune":
            auto_tuning_result = await self._handle_auto_tuning(
                approval_id=approval_id,
                user_id=user_id,
                username=username,
            )

            await self._answer_callback(
                callback_query_id,
                "tune",
                text="⚡ 調優指令已記錄 (Shadow Mode)",
            )

            await self._update_message_after_action(
                message_id=message_id,
                action="tune",
                username=username,
                original_text=original_text,
                extra_info=auto_tuning_result.get("command", ""),
            )

            return {
                "action": action,
                "approval_id": approval_id,
                "user": user,
                "success": True,
                "auto_tuning_result": auto_tuning_result,
            }

        # ===================================================================
        # Step 2.5: snooze / silence (2026-03-27 P1 optimisation)
        # ===================================================================
        if action == "snooze":
            snooze_result = await self._handle_snooze(
                approval_id=approval_id,
                username=username,
            )

            await self._answer_callback(
                callback_query_id,
                "snooze",
                text="⏰ 30 分鐘後再提醒",
            )

            await self._update_message_after_action(
                message_id=message_id,
                action="snooze",
                username=username,
                original_text=original_text,
            )

            return {
                "action": action,
                "approval_id": approval_id,
                "user": user,
                "success": True,
                "snooze_result": snooze_result,
            }

        if action == "silence":
            silence_result = await self._handle_silence(
                approval_id=approval_id,
                username=username,
                original_text=original_text,
            )

            await self._answer_callback(
                callback_query_id,
                "silence",
                text="🔕 此類告警靜默 1 小時",
            )

            await self._update_message_after_action(
                message_id=message_id,
                action="silence",
                username=username,
                original_text=original_text,
                extra_info=silence_result.get("resource_name", ""),
            )

            return {
                "action": action,
                "approval_id": approval_id,
                "user": user,
                "success": True,
                "silence_result": silence_result,
            }

        # ===================================================================
        # Step 3: answer the callback query (approve / reject)
        # ===================================================================
        await self._answer_callback(callback_query_id, action)

        # ===================================================================
        # Step 4: update the card (keep original content + decision stamp)
        # ===================================================================
        await self._update_message_after_action(
            message_id=message_id,
            action=action,
            username=username,
            original_text=original_text,
        )

        logger.info(
            "telegram_callback_processed",
            action=action,
            approval_id=approval_id,
            user_id=user_id,
        )

        return {
            "action": action,
            "approval_id": approval_id,
            "user": user,
            "success": True,
        }

    except UserNotWhitelistedError as e:
        logger.warning("telegram_callback_denied", error=str(e), user_id=user_id)
        await _answer_failure("denied", "⛔ 您沒有簽核權限")
        return {"success": False, "error": str(e)}

    except NonceReplayError as e:
        logger.warning("telegram_callback_replay", error=str(e))
        await _answer_failure("replay", "⚠️ 此操作已處理過")
        return {"success": False, "error": str(e)}

    except Exception as e:
        # Top-level boundary for a callback: log, tell the user, report failure.
        logger.error("telegram_callback_error", error=str(e))
        await _answer_failure("error", "❌ 處理失敗")
        return {"success": False, "error": str(e)}
async def _handle_auto_tuning(
    self,
    approval_id: str,
    user_id: int,
    username: str,
) -> dict:
    """
    Handle an auto-tuning request (Shadow Mode).

    Iron rule: no K8s command is ever executed here. With Shadow Mode on the
    request is only logged and echoed to stdout; with Shadow Mode off the
    request is refused.

    Args:
        approval_id: Approval ticket id.
        user_id: Telegram id of the requester.
        username: Name of the requester.

    Returns:
        dict: Always ``executed: False``; carries ``shadow_mode`` plus either
        the logged-command placeholder or an ``error`` message.
    """
    try:
        if not settings.SHADOW_MODE_ENABLED:
            refusal = "Production execution requires multi-sig approval"
            logger.warning(
                "auto_tuning_blocked_not_shadow_mode",
                approval_id=approval_id,
                message=refusal,
            )
            return {
                "executed": False,
                "shadow_mode": False,
                "approval_id": approval_id,
                "error": refusal,
            }

        # Shadow Mode: only record the request.
        # A production path would need the full tuning command from the ApprovalRecord.
        logger.info(
            "shadow_mode_auto_tuning_triggered",
            approval_id=approval_id,
            user_id=user_id,
            username=username,
            shadow_mode=True,
        )

        rule = "=" * 60
        timestamp = datetime.now(UTC).isoformat()
        print(f"\n{rule}")
        print("[SHADOW MODE] AI 生成的調優指令請求")
        print(f"簽核單: {approval_id}")
        print(f"執行者: @{username} (ID: {user_id})")
        print(f"時間: {timestamp}")
        print("狀態: 僅記錄,未實際執行")
        print(f"{rule}\n")

        return {
            "executed": False,
            "shadow_mode": True,
            "approval_id": approval_id,
            "triggered_by": username,
            "command": "kubectl command logged (see server logs)",
            "log": f"[SHADOW MODE] 自動調優請求 - 簽核單: {approval_id}",
        }

    except Exception as e:
        logger.error("auto_tuning_error", error=str(e), approval_id=approval_id)
        return {
            "executed": False,
            "error": str(e),
        }
async def _handle_snooze(
    self,
    approval_id: str,
    username: str,
) -> dict:
    """
    Handle "remind me later" (2026-03-27 P1 optimisation).

    Stores a snooze marker in Redis that expires after
    ``SNOOZE_TTL_SECONDS``.

    Args:
        approval_id: Approval ticket id.
        username: Name of the person who snoozed.

    Returns:
        dict: ``{snoozed: True, approval_id, snooze_until, ttl_minutes}`` on
        success, where ``snooze_until`` is the ISO-8601 UTC time at which
        the marker expires; ``{snoozed: False, error}`` on failure.
    """
    from datetime import timedelta

    try:
        redis = get_redis()
        snooze_key = f"{SNOOZE_KEY_PREFIX}{approval_id}"
        now = datetime.now(UTC)

        # Marker value records who snoozed and when.
        await redis.setex(
            snooze_key,
            SNOOZE_TTL_SECONDS,
            f"{username}:{now.isoformat()}",
        )

        logger.info(
            "telegram_snooze_set",
            approval_id=approval_id,
            username=username,
            ttl_minutes=SNOOZE_TTL_SECONDS // 60,
        )

        return {
            "snoozed": True,
            "approval_id": approval_id,
            # Expiry time, not the time the button was pressed.
            "snooze_until": (now + timedelta(seconds=SNOOZE_TTL_SECONDS)).isoformat(),
            "ttl_minutes": SNOOZE_TTL_SECONDS // 60,
        }

    except Exception as e:
        logger.error("snooze_error", error=str(e), approval_id=approval_id)
        return {
            "snoozed": False,
            "error": str(e),
        }
async def _handle_silence(
    self,
    approval_id: str,
    username: str,
    original_text: str,
) -> dict:
    """
    Handle "silence for 1 hour" (2026-03-27 P1 optimisation).

    Stores a silence marker in Redis keyed by the resource name parsed from
    the card text; the marker expires after ``SILENCE_TTL_SECONDS``.

    Args:
        approval_id: Approval ticket id.
        username: Name of the person who silenced.
        original_text: Original card text; the resource name is taken from
            the line containing ``🎯 資源:``.

    Returns:
        dict: ``{silenced: True, approval_id, resource_name, silence_until,
        ttl_hours}`` on success, where ``silence_until`` is the ISO-8601 UTC
        expiry time; ``{silenced: False, error}`` on failure.
    """
    from datetime import timedelta

    try:
        redis = get_redis()

        # Parse the resource name from the card (format: "🎯 資源: xxx").
        # Everything after the marker is kept, so names that themselves
        # contain ":" (e.g. "host:port") are not truncated.
        marker = "🎯 資源:"
        resource_name = "unknown"
        for line in original_text.split("\n"):
            _, found, tail = line.partition(marker)
            if found:
                resource_name = tail.strip() or resource_name
                break

        silence_key = f"{SILENCE_KEY_PREFIX}{resource_name}"
        now = datetime.now(UTC)

        # Marker value records who silenced, when, and for which approval.
        await redis.setex(
            silence_key,
            SILENCE_TTL_SECONDS,
            f"{username}:{now.isoformat()}:{approval_id}",
        )

        logger.info(
            "telegram_silence_set",
            approval_id=approval_id,
            resource_name=resource_name,
            username=username,
            ttl_hours=SILENCE_TTL_SECONDS // 3600,
        )

        return {
            "silenced": True,
            "approval_id": approval_id,
            "resource_name": resource_name,
            # Expiry time, not the time the button was pressed.
            "silence_until": (now + timedelta(seconds=SILENCE_TTL_SECONDS)).isoformat(),
            "ttl_hours": SILENCE_TTL_SECONDS // 3600,
        }

    except Exception as e:
        logger.error("silence_error", error=str(e), approval_id=approval_id)
        return {
            "silenced": False,
            "error": str(e),
        }
async def _answer_callback(
self,
callback_query_id: str,
action: str,
text: str | None = None,
) -> None:
"""回應 Callback Query"""
if text is None:
if action == "approve":
text = "✅ 已簽核"
elif action == "reject":
text = "❌ 已拒絕"
elif action == "tune":
text = "⚡ 調優中..."
elif action == "snooze":
text = "⏰ 30 分鐘後再提醒"
elif action == "silence":
text = "🔕 此類告警靜默 1 小時"
else:
text = "✓ 已處理"
await self._send_request("answerCallbackQuery", {
"callback_query_id": callback_query_id,
"text": text,
"show_alert": False,
})
async def _update_message_after_action(
    self,
    message_id: int,
    action: str,
    username: str,
    original_text: str,
    extra_info: str = "",
) -> None:
    """
    Update a card after an action: keep the original content and append a stamp.

    UX requirements:
    - never overwrite the original content;
    - append a separator and the decision status at the bottom;
    - remove all buttons.

    Args:
        message_id: Id of the message to edit.
        action: Action taken (approve/reject/tune/snooze/silence/other).
        username: Name shown in the stamp.
        original_text: Card text to keep above the stamp.
            NOTE(review): this is forwarded as-is with parse_mode HTML —
            confirm the caller passes HTML-safe text.
        extra_info: For ``silence`` the resource name shown in the stamp;
            for ``tune`` only its truthiness is used.
    """
    # The message is sent with parse_mode=HTML, so dynamic values placed in
    # the stamp must be escaped or a "<" / "&" would make Telegram reject
    # the edit.
    safe_user = html.escape(username)

    # Build the stamp (2026-03-27 ogt: snooze/silence added)
    if action == "approve":
        stamp = f"✅ 已由 @{safe_user} 授權執行"
    elif action == "reject":
        stamp = f"❌ 已由 @{safe_user} 拒絕執行"
    elif action == "tune":
        stamp = f"⚡ 已由 @{safe_user} 觸發自動調優 (Shadow Mode)"
        if extra_info:
            stamp += "\n📝 指令已記錄"
    elif action == "snooze":
        stamp = f"⏰ @{safe_user} 已設定 30 分鐘後再提醒"
    elif action == "silence":
        resource_info = f" ({html.escape(extra_info)})" if extra_info else ""
        stamp = f"🔕 @{safe_user} 已靜默此類告警 1 小時{resource_info}"
    else:
        stamp = f"✓ 已由 @{safe_user} 處理"

    # Original content + separator + stamp
    separator = "──────────────"
    updated_text = f"{original_text}\n{separator}\n{stamp}"

    # editMessageText updates the content and removes the buttons in one call
    await self._send_request("editMessageText", {
        "chat_id": self.chat_id,
        "message_id": message_id,
        "text": updated_text,
        "parse_mode": "HTML",
        "reply_markup": {"inline_keyboard": []},
        "disable_web_page_preview": True,
    })
async def _send_incident_detail(self, incident_id: str) -> None:
    """
    ADR-050 P2: send the incident detail as a new message.

    The original approval card is not modified.
    2026-04-01 Claude Code (ADR-050 P2): detail button handler

    Args:
        incident_id: Incident to look up. A "not found" or error notice is
            sent instead when the lookup fails.
    """
    # Lazy import to avoid a circular dependency (same pattern as approval_service)
    from src.repositories.incident_repository import get_incident_repository

    try:
        repo = get_incident_repository()
        incident = await repo.get_by_id(incident_id)

        if not incident:
            await self.send_notification(f"⚠️ 找不到事件 <code>{html.escape(incident_id)}</code>")
            return

        dc = incident.decision_chain
        # Ten-cell confidence bar. The value is clamped so an out-of-range
        # confidence cannot produce a bar longer or shorter than ten cells.
        confidence = dc.confidence if dc else 0
        filled_cells = max(0, min(10, int(confidence * 10)))
        confidence_bar = "█" * filled_cells + "░" * (10 - filled_cells)

        lines = [
            "📋 <b>事件詳情</b>",
            "",
            f"🔖 <b>ID:</b> <code>{html.escape(incident.incident_id)}</code>",
            f"📊 <b>狀態:</b> {incident.status.value}",
            f"⚡ <b>嚴重度:</b> {incident.severity.value}",
        ]

        if incident.affected_services:
            # At most three services are listed.
            lines.append(f"🎯 <b>受影響服務:</b> {', '.join(html.escape(s) for s in incident.affected_services[:3])}")

        if dc:
            lines += [
                "",
                f"🤖 <b>AI 分析</b> ({html.escape(dc.model_used)})",
                f"💡 {html.escape(dc.hypothesis[:200])}{'...' if len(dc.hypothesis) > 200 else ''}",
                f"📈 信心: [{confidence_bar}] {dc.confidence:.0%}",
            ]
            if dc.probable_root_causes:
                lines.append(f"🔍 根因: {html.escape(dc.probable_root_causes[0][:100])}")

        # 2026-04-02 Claude Code: timestamps must be shown in Taipei time
        # (feedback_timezone_taipei.md)
        from zoneinfo import ZoneInfo
        created_taipei = incident.created_at.astimezone(ZoneInfo("Asia/Taipei")) if incident.created_at else incident.created_at
        lines += [
            "",
            f"🕐 <b>建立:</b> {created_taipei.strftime('%m/%d %H:%M') if created_taipei else 'N/A'}",
        ]

        if incident.frequency_stats:
            fs = incident.frequency_stats
            lines.append(f"📉 <b>頻率:</b> 1h={fs.count_1h} 24h={fs.count_24h} 7d={fs.count_7d}")

        await self.send_notification("\n".join(lines))

    except Exception as e:
        logger.warning("send_incident_detail_failed", incident_id=incident_id, error=str(e))
        await self.send_notification(f"⚠️ 無法取得事件詳情: {html.escape(str(e)[:100])}")
async def _send_incident_history(self, incident_id: str) -> None:
    """
    ADR-050 P2: send the incident's frequency statistics as a new message.

    2026-04-01 Claude Code (ADR-050 P2): history button handler

    Args:
        incident_id: Incident to look up. A "not found", "no statistics" or
            error notice is sent instead when there is nothing to show.
    """
    from src.repositories.incident_repository import get_incident_repository

    try:
        incident = await get_incident_repository().get_by_id(incident_id)

        if not incident:
            await self.send_notification(f"⚠️ 找不到事件 <code>{html.escape(incident_id)}</code>")
            return

        stats = incident.frequency_stats
        if not stats:
            await self.send_notification(
                f"📊 <b>事件歷史</b>\n\n🔖 <code>{html.escape(incident.incident_id)}</code>\n\n無頻率統計資料"
            )
            return

        # Header + occurrence counters
        report = [
            "📊 <b>事件歷史統計</b>",
            "",
            f"🔖 <code>{html.escape(incident.incident_id)}</code>",
            f"🔑 告警鍵: <code>{html.escape(stats.anomaly_key[:60])}</code>",
            "",
            "⏱ <b>發生頻率</b>",
            f" 1小時: {stats.count_1h}",
            f" 24小時: {stats.count_24h}",
            f" 7天: {stats.count_7d}",
            f" 30天: {stats.count_30d}",
        ]

        # Auto-repair section, only when at least one repair ran
        if stats.auto_repair_count > 0:
            report.extend(
                [
                    "",
                    "🔧 <b>自動修復</b>",
                    f" 執行次數: {stats.auto_repair_count}",
                ]
            )
            if stats.last_repair_action:
                report.append(f" 最後動作: {html.escape(stats.last_repair_action[:80])}")

        await self.send_notification("\n".join(report))

    except Exception as e:
        logger.warning("send_incident_history_failed", incident_id=incident_id, error=str(e))
        await self.send_notification(f"⚠️ 無法取得歷史統計: {html.escape(str(e)[:100])}")
async def _send_reanalyze_result(self, incident_id: str) -> None:
    """
    ADR-050 P2: trigger a re-analysis and report the scheduling outcome.

    Calls ``IncidentService.trigger_reanalysis()`` and reports the result in
    a new message; the original approval card is left untouched so the
    authorization flow is not disturbed.

    2026-04-01 Claude Code (ADR-050 P2): reanalyze button handler

    Args:
        incident_id: Identifier of the incident to re-analyze.
    """
    from src.services.incident_service import get_incident_service

    try:
        outcome = await get_incident_service().trigger_reanalysis(incident_id)

        # Pick the headline (and optional trailing hint) for the outcome.
        if outcome["already_analyzing"]:
            headline, trailer = "⏳ <b>重診進行中</b>", ""
        elif outcome["triggered"]:
            headline, trailer = "🔄 <b>重診已排程</b>", "\nAI 分析結果將自動更新事件狀態。"
        else:
            headline, trailer = "⚠️ <b>重診失敗</b>", ""

        msg = (
            f"{headline}\n\n"
            f"🔖 <code>{html.escape(incident_id)}</code>\n\n"
            f"{html.escape(outcome['message'])}{trailer}"
        )
        await self.send_notification(msg)
    except Exception as exc:
        logger.warning("send_reanalyze_result_failed", incident_id=incident_id, error=str(exc))
        await self.send_notification(
            f"⚠️ 重診觸發失敗: {html.escape(str(exc)[:100])}"
        )
async def send_notification(
self,
text: str,
parse_mode: str = "HTML",
chat_id: str | int | None = None,
) -> dict:
"""
發送純文字通知
Args:
text: 訊息內容
parse_mode: 解析模式
Returns:
dict: API 回應
"""
payload = {
"chat_id": chat_id or self.chat_id,
"text": text[:500], # SOUL.md 字數限制
"parse_mode": parse_mode,
}
return await self._send_request("sendMessage", payload)
# =========================================================================
# 2026-04-03 ogt: SRE 戰情室群組三頭政治 (Triumvirate) — ADR-053
# @tsenyangbot 發告警卡片到群組OpenClaw/NemoClaw Bot 各自回覆分析
# =========================================================================
async def send_to_group(
    self,
    text: str,
    parse_mode: str = "HTML",
    reply_markup: dict | None = None,
) -> dict:
    """
    Post a message to the SRE group (``settings.SRE_GROUP_CHAT_ID``) using
    the gateway's own bot.

    Args:
        text: Message body; truncated to 4096 characters.
        parse_mode: Telegram parse mode forwarded as-is.
        reply_markup: Optional keyboard; attached only when truthy.

    Returns:
        dict: Result of ``self._send_request``, or ``{}`` when no group chat
        id is configured.
    """
    group_chat = settings.SRE_GROUP_CHAT_ID
    if not group_chat:
        logger.warning("send_to_group_skipped", reason="SRE_GROUP_CHAT_ID not configured")
        return {}

    payload: dict = {"chat_id": group_chat, "text": text[:4096], "parse_mode": parse_mode}
    if reply_markup:
        payload["reply_markup"] = reply_markup
    return await self._send_request("sendMessage", payload)
async def _send_as_bot(
self,
token: str,
chat_id: str,
text: str,
reply_to_message_id: int | None = None,
parse_mode: str = "HTML",
) -> dict:
"""
用指定 Bot Token 發訊息(不走 self._http_client獨立建立請求
Args:
token: Bot Token
chat_id: 群組 Chat ID
text: 訊息內容
reply_to_message_id: 回覆哪則訊息的 message_id
parse_mode: 解析模式
Returns:
dict: Telegram API 回應
"""
if not self._http_client:
raise TelegramGatewayError("HTTP client not initialized")
url = f"{self.TELEGRAM_API_BASE}/bot{token}/sendMessage"
payload: dict = {
"chat_id": chat_id,
"text": text[:4096],
"parse_mode": parse_mode,
}
# 2026-04-03 ogt: supergroup 跨 Bot reply 需用 reply_parameters (Bot API v6.7+)
# 舊的 reply_to_message_id 在 supergroup 會 400改用新格式 + allow_sending_without_reply
if reply_to_message_id:
payload["reply_parameters"] = {
"message_id": reply_to_message_id,
"allow_sending_without_reply": True,
}
response = await self._http_client.post(url, json=payload)
response.raise_for_status()
return response.json()
async def send_as_openclaw(
    self,
    text: str,
    reply_to_message_id: int | None = None,
) -> dict:
    """
    Speak in the SRE group as @OpenClawAwoooI_Bot.

    Args:
        text: Message body.
        reply_to_message_id: Message to reply to, if any.

    Returns:
        dict: Telegram API response, or ``{}`` when the OpenClaw token or
        the group chat id is not configured.
    """
    if not (settings.OPENCLAW_BOT_TOKEN and settings.SRE_GROUP_CHAT_ID):
        logger.warning("send_as_openclaw_skipped", reason="OPENCLAW_BOT_TOKEN or SRE_GROUP_CHAT_ID not configured")
        return {}

    delivery = {
        "token": settings.OPENCLAW_BOT_TOKEN,
        "chat_id": settings.SRE_GROUP_CHAT_ID,
        "text": text,
        "reply_to_message_id": reply_to_message_id,
    }
    return await self._send_as_bot(**delivery)
async def send_as_nemotron(
    self,
    text: str,
    reply_to_message_id: int | None = None,
) -> dict:
    """
    Speak in the SRE group as @NemoTronAwoooI_Bot.

    Args:
        text: Message body.
        reply_to_message_id: Message to reply to, if any.

    Returns:
        dict: Telegram API response, or ``{}`` when the Nemotron token or
        the group chat id is not configured.
    """
    if not (settings.NEMOTRON_BOT_TOKEN and settings.SRE_GROUP_CHAT_ID):
        logger.warning("send_as_nemotron_skipped", reason="NEMOTRON_BOT_TOKEN or SRE_GROUP_CHAT_ID not configured")
        return {}

    delivery = {
        "token": settings.NEMOTRON_BOT_TOKEN,
        "chat_id": settings.SRE_GROUP_CHAT_ID,
        "text": text,
        "reply_to_message_id": reply_to_message_id,
    }
    return await self._send_as_bot(**delivery)
async def trigger_group_ai_discussion(
    self,
    alert_message_id: int,
    alert_summary: str,
) -> None:
    """
    Ask OpenClaw to analyze an alert and reply to it in the SRE group.

    2026-04-03 ogt: by instruction, alert analysis is handled by OpenClaw
    only; NemoClaw does not analyze alerts. The analysis is posted as a
    reply to the alert message. The original note says this method is
    started via ``asyncio.create_task``; every failure is logged and
    swallowed so the main flow is never affected.

    Args:
        alert_message_id: ``message_id`` of the alert the bot replies to.
        alert_summary: Alert summary text handed to the AI.
    """
    try:
        from src.services.chat_manager import ChatManager  # noqa: PLC0415
    except ImportError:
        logger.error("trigger_group_ai_discussion_failed", reason="Cannot import ChatManager")
        return

    try:
        manager = ChatManager()
        instructions = (
            "你是 OpenClawAWOOOI SRE 戰情室首席 AI精通 K8s、Prometheus、告警分析。\n"
            "以下是一則基礎設施告警,請進行 RCA 根因分析並給出 3 點具體建議行動。\n"
            "繁體中文回應,不超過 300 字:\n\n"
        )
        analysis = await manager._call_openclaw(
            system_prompt="你是 OpenClawAWOOOI SRE 戰情室首席 AI。稱呼用戶為「老闆」。",
            user_message=f"{instructions}{alert_summary}",
        )

        if not analysis or isinstance(analysis, Exception):
            logger.warning("trigger_group_ai_discussion_openclaw_empty")
        else:
            await self.send_as_openclaw(
                text=f"🦞 <b>OpenClaw 分析</b>\n\n{analysis}",
                reply_to_message_id=alert_message_id,
            )
            logger.info("group_ai_discussion_openclaw_sent")

        logger.info("group_ai_discussion_completed", alert_message_id=alert_message_id)
    except Exception as exc:
        # A failed group discussion must not affect the main flow.
        logger.error("trigger_group_ai_discussion_failed", error=str(exc))
async def close(self) -> None:
    """
    Shut the gateway down.

    Clears the polling flag, cancels the polling and leader tasks (waiting
    for each cancellation to finish), closes the HTTP client and marks the
    gateway as uninitialized.
    """
    self._polling_active = False

    for pending in (self._polling_task, self._leader_task):
        if not pending or pending.done():
            continue
        pending.cancel()
        try:
            await pending
        except asyncio.CancelledError:
            pass
    self._polling_task = None
    self._leader_task = None

    if self._http_client:
        await self._http_client.aclose()
        self._http_client = None

    self._initialized = False
    logger.info("telegram_gateway_closed")
# =========================================================================
# Long Polling 實作 (Phase 5 內網修復)
# =========================================================================
async def start_long_polling(self) -> None:
    """
    Start long polling as a background task (used instead of webhooks).

    The original note explains that the intranet deployment cannot receive
    external webhooks, so updates have to be polled.

    2026-04-01 Claude Code: Redis leader election. The pod that wins the
    ``SET NX`` on ``POLLING_LEADER_KEY`` starts the polling loop and the
    lock renewer; every other pod starts a watcher that retries later.
    """
    if not self._initialized and not await self.initialize():
        logger.error("telegram_long_polling_failed", reason="Gateway not initialized")
        return

    if self._polling_active:
        logger.warning("telegram_long_polling_already_running")
        return

    store = await get_redis()
    # NX: the key is only set when it does not exist yet.
    is_leader = await store.set(POLLING_LEADER_KEY, self._pod_id, nx=True, ex=POLLING_LEADER_TTL)

    if is_leader:
        # Lock acquired: switch to polling mode.
        await self._delete_webhook()
        self._polling_active = True
        self._last_update_id = 0
        self._polling_task = asyncio.create_task(self._polling_loop())
        self._leader_task = asyncio.create_task(self._leader_renewer())
        logger.info(
            "telegram_long_polling_started",
            pod_id=self._pod_id,
            timeout=LONG_POLLING_TIMEOUT,
            chat_id=self.chat_id[:10] + "..." if self.chat_id else "N/A",
        )
        return

    # Another pod holds the lock: watch and try to take over periodically.
    holder = await store.get(POLLING_LEADER_KEY)
    logger.info(
        "telegram_polling_not_leader",
        pod_id=self._pod_id,
        current_leader=holder,
        action="watcher_mode",
    )
    self._leader_task = asyncio.create_task(self._leader_watcher())
async def _delete_webhook(self) -> None:
    """
    Remove any registered webhook before switching to long polling.

    The original note states that a webhook and ``getUpdates`` cannot
    coexist. Does nothing when the HTTP client is missing; any error is
    logged and swallowed.
    """
    if not self._http_client:
        return

    try:
        # Step 1: delete the webhook, discarding pending updates.
        removal = await self._http_client.post(
            f"{self.api_url}/deleteWebhook",
            json={"drop_pending_updates": True},
        )
        outcome = removal.json()
        if not outcome.get("ok"):
            logger.warning(
                "telegram_webhook_delete_failed",
                error=outcome.get("description"),
            )
        else:
            logger.info(
                "telegram_webhook_deleted",
                description=outcome.get("description", "Webhook deleted"),
            )

        # Step 2: give Telegram a moment to sync (avoids 409 Conflict).
        await asyncio.sleep(1)

        # Step 3: verify that no webhook URL is left.
        probe = await self._http_client.get(f"{self.api_url}/getWebhookInfo")
        remaining_url = probe.json().get("result", {}).get("url", "")
        if not remaining_url:
            logger.info("telegram_webhook_confirmed_deleted")
        else:
            logger.warning(
                "telegram_webhook_still_active",
                url=remaining_url[:50],
            )
    except Exception as exc:
        logger.error("telegram_webhook_delete_error", error=str(exc))
async def _polling_loop(self) -> None:
    """
    Main long-polling loop: fetch updates with ``getUpdates`` and dispatch
    each one to ``_process_update`` while ``self._polling_active`` is set.

    Failure handling:
        * Cancellation stops the loop.
        * A poll timeout is normal and simply starts the next round.
        * HTTP 409 from ``getUpdates`` means another poller is active: the
          leader lock is released (if still owned), polling stops and a
          watcher task is started to compete for the lock again.
        * Any other polling error is logged and retried after
          ``LONG_POLLING_RETRY_DELAY`` seconds.
        * A failure while *processing* one update is logged and the rest
          of the batch is still handled. ``_get_updates`` has already
          advanced the offset, so aborting the batch would silently drop
          the remaining updates; and an HTTP error raised while sending a
          reply must not be mistaken for a polling conflict.
    """
    logger.info("[Telegram] Long polling started - 神經已接通,等待統帥指令...")

    while self._polling_active:
        try:
            updates = await self._get_updates()

            for update in updates:
                try:
                    await self._process_update(update)
                except Exception as e:
                    # CancelledError is a BaseException, so cancellation
                    # still reaches the outer handler below.
                    logger.error(
                        "telegram_update_processing_failed",
                        update_id=update.get("update_id") if isinstance(update, dict) else None,
                        error=str(e),
                    )

        except asyncio.CancelledError:
            logger.info("telegram_long_polling_cancelled")
            break

        except httpx.TimeoutException:
            # A long-poll timeout is expected; go straight to the next round.
            continue

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 409:
                # 409 Conflict: another pod is polling; release the leader lock.
                # 2026-04-01 Claude Code: release the lock and let watchers
                # compete (the old aggressive 2s takeover no longer fits the
                # multi-pod setup).
                logger.warning(
                    "telegram_polling_conflict",
                    status=409,
                    pod_id=self._pod_id,
                    action="releasing_leader_lock",
                )
                redis = await get_redis()
                current = await redis.get(POLLING_LEADER_KEY)
                if current == self._pod_id:
                    await redis.delete(POLLING_LEADER_KEY)
                self._polling_active = False
                # The watcher retries after POLLING_LEADER_WATCH seconds.
                self._leader_task = asyncio.create_task(self._leader_watcher())
                break
            else:
                logger.error("telegram_polling_http_error", status=e.response.status_code)
                await asyncio.sleep(LONG_POLLING_RETRY_DELAY)

        except Exception as e:
            logger.error("telegram_polling_error", error=str(e))
            # Wait before retrying after an unexpected error.
            await asyncio.sleep(LONG_POLLING_RETRY_DELAY)

    logger.info("telegram_long_polling_stopped")
async def _leader_renewer(self) -> None:
    """
    Background task that keeps the leader lock alive.

    Every ``POLLING_LEADER_RENEW`` seconds the Redis key is checked: when
    it still holds this pod's id its TTL is reset to ``POLLING_LEADER_TTL``;
    when another value is found, polling is switched off and the task ends.
    Redis errors are logged and the next cycle tries again.

    2026-04-01 Claude Code: distributed leader election
    """
    while self._polling_active:
        await asyncio.sleep(POLLING_LEADER_RENEW)
        if not self._polling_active:
            return

        try:
            store = await get_redis()
            holder = await store.get(POLLING_LEADER_KEY)
            if holder == self._pod_id:
                await store.expire(POLLING_LEADER_KEY, POLLING_LEADER_TTL)
                continue

            # NOTE(review): the comparison assumes Redis returns the same
            # type as self._pod_id (e.g. decoded str) — confirm client config.
            logger.warning(
                "telegram_leader_lock_lost",
                pod_id=self._pod_id,
                current_leader=holder,
            )
            self._polling_active = False
            return
        except Exception as exc:
            logger.error("telegram_leader_renew_error", error=str(exc))
async def _leader_watcher(self) -> None:
    """
    Takeover monitor run by pods that are not the leader.

    Every ``POLLING_LEADER_WATCH`` seconds it attempts ``SET NX`` on the
    leader key. Once the key is free (for example the previous leader's TTL
    expired) this pod deletes the webhook, starts the polling loop and the
    lock renewer, and the watcher ends.

    2026-04-01 Claude Code: distributed leader election
    """
    while not self._polling_active:
        await asyncio.sleep(POLLING_LEADER_WATCH)

        try:
            store = await get_redis()
            won = await store.set(
                POLLING_LEADER_KEY, self._pod_id, nx=True, ex=POLLING_LEADER_TTL
            )
            if not won:
                continue

            logger.info(
                "telegram_leader_acquired",
                pod_id=self._pod_id,
                action="starting_polling",
            )
            await self._delete_webhook()
            self._polling_active = True
            self._last_update_id = 0
            self._polling_task = asyncio.create_task(self._polling_loop())
            self._leader_task = asyncio.create_task(self._leader_renewer())
            return
        except asyncio.CancelledError:
            return
        except Exception as exc:
            logger.error("telegram_leader_watch_error", error=str(exc))
async def _get_updates(self) -> list[dict]:
    """
    Call Telegram ``getUpdates`` once and return the received updates.

    ``self._last_update_id`` is moved to the last returned update so the
    next call requests only newer ones.

    Returns:
        list[dict]: The updates; ``[]`` when the HTTP client is missing.

    Raises:
        TelegramGatewayError: When the API answers with ``ok`` unset/false.
        Also propagates whatever ``raise_for_status()`` raises on an HTTP
        error status.
    """
    if not self._http_client:
        return []

    request_body = {
        "offset": self._last_update_id + 1,
        "timeout": LONG_POLLING_TIMEOUT,
        "allowed_updates": ["callback_query", "message"],  # buttons and text messages
    }
    reply = await self._http_client.post(
        f"{self.api_url}/getUpdates",
        json=request_body,
        timeout=LONG_POLLING_TIMEOUT + 10,  # slightly longer than the API-side timeout
    )
    reply.raise_for_status()

    decoded = reply.json()
    if not decoded.get("ok"):
        raise TelegramGatewayError(f"getUpdates failed: {decoded.get('description')}")

    batch = decoded.get("result", [])
    if batch:
        # Advance the offset past everything just received.
        self._last_update_id = batch[-1]["update_id"]
    return batch
async def _process_update(self, update: dict) -> None:
"""
處理單個 Telegram Update
Args:
update: Telegram Update 物件
"""
update_id = update.get("update_id")
callback_query = update.get("callback_query")
message = update.get("message")
if not callback_query and not message:
logger.debug("telegram_update_ignored", update_id=update_id, reason="unsupported update type")
return
if callback_query:
await self._handle_callback_query(update_id, callback_query)
elif message:
await self._handle_chat_message(update_id, message)
async def _handle_callback_query(self, update_id: int, callback_query: dict) -> None:
    """
    Handle an inline-button press received through long polling.

    The callback is validated, passed to ``handle_callback`` and, when that
    reports success, the approval action is executed.

    Args:
        update_id: Id of the enclosing update (used for logging).
        callback_query: Telegram CallbackQuery object.
    """
    query_id = callback_query.get("id")
    payload = callback_query.get("data")
    sender = callback_query.get("from", {})
    sender_id = sender.get("id")

    if not (query_id and payload and sender_id):
        logger.warning("telegram_callback_invalid", update_id=update_id)
        return

    display_name = sender.get("username") or sender.get("first_name") or str(sender_id)
    card_text = callback_query.get("message", {}).get("text", "")
    card_id = callback_query.get("message", {}).get("message_id")

    logger.info(
        "telegram_callback_received",
        update_id=update_id,
        user_id=sender_id,
        username=display_name,
    )

    # Reuse the existing handle_callback logic.
    verdict = await self.handle_callback(
        callback_query_id=query_id,
        callback_data=payload,
        user_id=sender_id,
        message_id=card_id,
        original_text=card_text,
        username=display_name,
    )
    if not verdict.get("success"):
        return

    # Persist the decision (approve / reject).
    await self._execute_approval_action(
        action=verdict["action"],
        approval_id=verdict["approval_id"],
        user_id=sender_id,
        username=display_name,
        message_id=card_id,
    )
async def _handle_chat_message(self, update_id: int, message: dict) -> None:
    """
    Handle a text message from a private chat or from the SRE group.

    Order of handling: ignore non-text/anonymous/bot messages, route SRE
    group messages to ``_handle_group_message``, run the security
    interceptor for private chats, serve the ``/ai`` command for
    whitelisted users, otherwise answer through the chat manager.

    Args:
        update_id: Id of the enclosing update (used for logging).
        message: Telegram Message object.
    """
    sender = message.get("from", {})
    chat = message.get("chat", {})
    text = message.get("text")
    user_id = sender.get("id")
    chat_id = chat.get("id")
    chat_type = chat.get("type", "private")
    message_id = message.get("message_id")
    username = sender.get("username") or sender.get("first_name") or str(user_id)

    # Skip empty messages, and bot messages so bots cannot trigger each
    # other in an endless loop.
    if not text or not user_id or sender.get("is_bot"):
        return

    logger.info(
        "telegram_chat_received",
        update_id=update_id,
        user_id=user_id,
        username=username,
        chat_type=chat_type,
        text=text[:50],
    )

    # 1. Group routing first (2026-04-03 ogt): the SRE group is a closed
    #    room managed by its Telegram admins, so the personal whitelist is
    #    not applied there.
    in_group = chat_type in ("group", "supergroup")
    in_sre_room = str(chat_id) == str(settings.SRE_GROUP_CHAT_ID)
    if in_group and in_sre_room:
        await self._handle_group_message(
            text, user_id, username, chat_id, message_id, message.get("reply_to_message")
        )
        return

    # 2. Private chat security check (ADR-012).
    try:
        await get_security_interceptor().intercept_telegram(user_id)
    except Exception as exc:
        logger.warning("telegram_chat_unauthorized", user_id=user_id, error=str(exc))
        return

    # 3. /ai command (Phase 24 C — 2026-04-03 ogt).
    trimmed = text.strip()
    if trimmed.lower().startswith("/ai"):
        allowed = settings.get_tg_user_whitelist()
        if not allowed or user_id not in allowed:
            logger.warning("telegram_ai_command_unauthorized", user_id=user_id, whitelist_empty=not allowed)
            await self.send_notification("⛔ 未授權:/ai 指令僅限白名單用戶", parse_mode="HTML", chat_id=chat_id)
            return
        from src.services.ai_control import handle_ai_command

        answer = await handle_ai_command(trimmed)
        await self.send_notification(answer, parse_mode="HTML", chat_id=chat_id)
        logger.info("telegram_ai_command_handled", user_id=user_id, text=text[:50])
        return

    # 4. Private chat: show the "typing" indicator.
    await self._send_chat_action(chat_id, "typing")

    # 5. Let the chat manager produce the reply.
    answer = await get_chat_manager().generate_response(
        user_id=user_id,
        username=username,
        message_text=text,
    )
    await self.send_notification(answer, parse_mode="HTML", chat_id=chat_id)
async def _handle_group_message(
self,
text: str,
user_id: int,
username: str,
chat_id: int, # noqa: ARG002
message_id: int | None,
reply_to_message: dict | None = None,
) -> None:
"""
處理 SRE 群組訊息 (2026-04-03 ogt: Phase 22.6 Triumvirate)
路由規則:
Reply OpenClaw 訊息 → 只有 OpenClaw 回應
Reply NemoClaw 訊息 → 只有 NemoClaw 回應
@OpenClawAwoooI_Bot <msg> → 只有 OpenClaw 回應
@NemoTronAwoooI_Bot <msg> → 只有 NemoClaw 回應
其他訊息 → 兩個 AI 並行回應
"""
# ── 指令路由 (2026-04-03 ogt: 方案B slash commands) ──────────────────
cmd = text.strip().split()[0].lower().split("@")[0] if text.strip() else ""
if cmd.startswith("/"):
await self._handle_group_command(cmd, chat_id, message_id)
return
from src.services.chat_manager import get_chat_manager as _get_cm
chat_mgr = _get_cm()
# 全形/半形統一化後比較
import unicodedata
text_normalized = unicodedata.normalize("NFKC", text).lower()
# Reply 路由: 若 Reply 的是 Bot 訊息,直接認定目標 AI (2026-04-03 ogt)
if reply_to_message:
replied_from = reply_to_message.get("from", {})
if replied_from.get("is_bot"):
replied_username = (replied_from.get("username") or "").lower()
if "openclawawoooi" in replied_username:
mention_openclaw, mention_nemo = True, False
elif "nemotronawoooi" in replied_username:
mention_openclaw, mention_nemo = False, True
else:
mention_openclaw = "@openclawawoooi_bot" in text_normalized or "小o" in text_normalized
mention_nemo = "@nemotronawoooi_bot" in text_normalized or "小賀" in text_normalized or "小贺" in text_normalized
else:
mention_openclaw = "@openclawawoooi_bot" in text_normalized or "小o" in text_normalized
mention_nemo = "@nemotronawoooi_bot" in text_normalized or "小賀" in text_normalized or "小贺" in text_normalized
else:
# 別名: 小O / 小o (含全形O) → OpenClaw; 小賀 / 小贺 → NemoClaw
mention_openclaw = "@openclawawoooi_bot" in text_normalized or "小o" in text_normalized
mention_nemo = "@nemotronawoooi_bot" in text_normalized or "小賀" in text_normalized or "小贺" in text_normalized
# 去掉 @ mention 與別名,取出純訊息
clean_text = unicodedata.normalize("NFKC", text)
for token in ["@openclawawoooi_bot", "@OpenClawAwoooI_Bot", "@nemotronawoooi_bot", "@NemoTronAwoooI_Bot",
"小O", "小o", "小O", "小o", "小賀", "小贺"]:
clean_text = clean_text.replace(token, "").strip()
if not clean_text:
clean_text = text
context = await chat_mgr.get_system_context()
if mention_openclaw and not mention_nemo:
# 只 OpenClaw 回應
result = await chat_mgr._call_openclaw(
f"{context}\n用戶 {username} 在 SRE 戰情室問你:",
clean_text,
)
await self.send_as_openclaw(
text=f"🦞 <b>OpenClaw</b>\n\n{result or '🔴 無響應'}",
reply_to_message_id=message_id,
)
elif mention_nemo and not mention_openclaw:
# 只 NemoClaw 回應
result = await chat_mgr._call_nemotron(
f"{context}\n用戶 {username} 在 SRE 戰情室問你:",
clean_text,
)
await self.send_as_nemotron(
text=f"🤖 <b>NemoClaw</b>\n\n{result or '🔴 無響應 (NIM 超時)'}",
reply_to_message_id=message_id,
)
else:
# 兩個 AI 並行回應,完成後互相評論
oc_task = asyncio.create_task(
chat_mgr._call_openclaw(f"{context}\n用戶 {username} 在 SRE 戰情室:", clean_text)
)
nemo_task = asyncio.create_task(
chat_mgr._call_nemotron(f"{context}\n用戶 {username} 在 SRE 戰情室:", clean_text)
)
oc_result, nemo_result = await asyncio.gather(oc_task, nemo_task, return_exceptions=True)
if oc_result and not isinstance(oc_result, Exception):
await self.send_as_openclaw(
text=f"🦞 <b>OpenClaw</b>\n\n{oc_result}",
reply_to_message_id=message_id,
)
if nemo_result and not isinstance(nemo_result, Exception):
await self.send_as_nemotron(
text=f"🤖 <b>NemoClaw</b>\n\n{nemo_result}",
reply_to_message_id=message_id,
)
logger.info("group_message_handled", user_id=user_id, text=text[:50])
async def _handle_group_command(self, cmd: str, chat_id: int, message_id: int | None) -> None:  # noqa: ARG002
    """
    Answer an SRE-group slash command as the OpenClaw bot
    (2026-04-03 ogt: plan B).

    Supported commands:
        /status    -> pod status summary of the ``awoooi-prod`` namespace
        /incidents -> up to 10 active incidents
        /cost      -> this month's AI cost counters read from Redis
        /pods      -> up to 15 problem pods
        /help      -> command reference

    Unknown commands are logged at debug level and get no reply. A failure
    while gathering data is logged and reported in the reply itself. All
    dynamic text is HTML-escaped because replies are sent with
    ``parse_mode="HTML"``.

    Args:
        cmd: Lower-cased command without the ``@bot`` suffix.
        chat_id: Group chat id (unused; replies go to the configured group).
        message_id: Message the reply is attached to.
    """
    from src.repositories.k8s_repository import get_k8s_repository
    from src.repositories.incident_repository import get_incident_repository
    from src.core.redis_client import get_redis
    from src.utils.timezone import now_taipei

    def failure(title: str, exc: Exception) -> str:
        # Log the problem and build the user-facing error reply.
        logger.warning("group_command_failed", cmd=cmd, error=str(exc))
        return f"{title}\n⚠️ 無法取得: {html.escape(str(exc))}"

    if cmd == "/status":
        try:
            k8s = get_k8s_repository()
            s = await k8s.get_pod_status_summary(namespace="awoooi-prod")
            running, total = s.get("running", 0), s.get("total", 0)
            problems = s.get("problem_pods", [])
            lines = ["<b>🖥 Cluster 狀態</b>", f"• Pods: {running}/{total} Running"]
            if problems:
                lines.append(f"• 異常: {len(problems)}")
                lines.extend(f" ⚠️ {html.escape(str(p))}" for p in problems[:5])
            else:
                lines.append("• 全部正常 ✅")
            msg = "\n".join(lines)
        except Exception as e:
            msg = failure("<b>🖥 Cluster 狀態</b>", e)

    elif cmd == "/incidents":
        try:
            repo = get_incident_repository()
            incidents = await repo.get_active()
            if incidents:
                lines = ["<b>🚨 活躍告警</b>"]
                for inc in incidents[:10]:
                    lines.append(
                        f"• <code>{html.escape(str(inc.incident_id))}</code> "
                        f"SEV{html.escape(str(inc.severity.value))}{html.escape(str(inc.status.value))}"
                    )
                msg = "\n".join(lines)
            else:
                msg = "<b>🚨 活躍告警</b>\n✅ 目前無告警"
        except Exception as e:
            msg = failure("<b>🚨 活躍告警</b>", e)

    elif cmd == "/cost":
        month = now_taipei().strftime("%Y-%m")
        try:
            # get_redis() is a coroutine function (awaited everywhere else
            # in this class); without the await every /cost request failed.
            redis = await get_redis()
            gemini_cost = float(await redis.get(f"gemini_cost:{month}") or 0)
            claude_cost = float(await redis.get(f"claude_cost:{month}") or 0)
            total = gemini_cost + claude_cost
            msg = (
                f"<b>💰 {month} AI 費用統計</b>\n"
                f"• 🦞 OpenClaw (Gemini Flash-Lite): <b>${gemini_cost:.4f}</b> / $10.00 上限\n"
                f"• 🤖 NemoClaw (Claude Haiku 4.5): <b>${claude_cost:.4f}</b>\n"
                f"• 合計: <b>${total:.4f}</b>"
            )
        except Exception as e:
            msg = failure("<b>💰 費用統計</b>", e)

    elif cmd == "/pods":
        try:
            k8s = get_k8s_repository()
            s = await k8s.get_pod_status_summary(namespace="awoooi-prod")
            problems = s.get("problem_pods", [])
            if problems:
                lines = [f"<b>⚠️ 異常 Pod ({len(problems)} 個)</b>"]
                lines.extend(f"• <code>{html.escape(str(p))}</code>" for p in problems[:15])
                msg = "\n".join(lines)
            else:
                msg = "<b>⚠️ 異常 Pod</b>\n✅ 全部 Pod 正常"
        except Exception as e:
            msg = failure("<b>⚠️ 異常 Pod</b>", e)

    elif cmd == "/help":
        msg = (
            "<b>🤖 SRE 戰情室指令</b>\n\n"
            "/status — 查詢 K8s Cluster 狀態\n"
            "/incidents — 列出活躍告警\n"
            "/cost — 查詢本月 AI 費用\n"
            "/pods — 列出異常 Pod\n"
            "/help — 顯示此說明\n\n"
            "<b>對話方式:</b>\n"
            "• 直接輸入 → 小O + 小賀 同時回應\n"
            "• 小O 或 @OpenClawAwoooI_Bot → 只有 OpenClaw\n"
            "• 小賀 或 @NemoTronAwoooI_Bot → 只有 NemoClaw\n"
            "• Reply 某個 Bot 的訊息 → 只有那個 Bot 回應"
        )

    else:
        logger.debug("group_unknown_command", cmd=cmd)
        return

    await self.send_as_openclaw(text=msg, reply_to_message_id=message_id)
async def _send_chat_action(self, chat_id: int, action: str) -> None:
"""發送聊天狀態 (e.g., typing)"""
if not self._http_client: return
try:
url = f"{self.api_url}/sendChatAction"
await self._http_client.post(url, json={"chat_id": chat_id, "action": action})
except: pass
async def _execute_approval_action(
    self,
    action: str,
    approval_id: str,
    user_id: int,
    username: str,
    message_id: int,  # noqa: ARG002
) -> None:
    """
    Persist an approval decision received through long polling.

    2026-03-29 ogt: uses ``sign_approval`` / ``reject_approval`` (the
    previously called ``add_signature`` / ``reject`` do not exist).
    Any failure is logged and swallowed.

    Args:
        action: ``approve``, ``reject`` or ``tune``.
        approval_id: Approval id (parsed as a UUID).
        user_id: Telegram user id of the signer.
        username: Display name of the signer.
        message_id: Id of the approval card message (unused).
    """
    from uuid import UUID

    from src.services.approval_db import get_approval_service

    try:
        approvals = get_approval_service()
        rule = "=" * 60
        actor = f"tg_{user_id}"

        if action == "tune":
            # Auto-tuning itself was already handled in handle_callback.
            logger.info(
                "telegram_auto_tuning_via_polling",
                approval_id=approval_id,
                user_id=user_id,
            )

        elif action == "reject":
            # reject_approval returns a 2-tuple.
            approval, _detail = await approvals.reject_approval(
                approval_id=UUID(approval_id),
                rejector_id=actor,
                rejector_name=username,
                reason="Telegram 拒絕 (Long Polling)",
            )
            if approval:
                logger.info(
                    "telegram_approval_rejected_via_polling",
                    approval_id=approval_id,
                    user_id=user_id,
                )
                print(f"\n{rule}")
                print("❌ 統帥已拒絕執行!")
                print(f"簽核單: {approval_id}")
                print(f"拒絕者: @{username}")
                print(f"{rule}\n")

        elif action == "approve":
            # sign_approval returns a 3-tuple.
            approval, _detail, execution_triggered = await approvals.sign_approval(
                approval_id=UUID(approval_id),
                signer_id=actor,
                signer_name=username,
                comment="Telegram 簽核 (Long Polling)",
            )
            if approval:
                logger.info(
                    "telegram_approval_signed_via_polling",
                    approval_id=approval_id,
                    user_id=user_id,
                    status=approval.status.value,
                    execution_triggered=execution_triggered,
                )
                print(f"\n{rule}")
                print("✅ 統帥已授權執行!")
                print(f"簽核單: {approval_id}")
                print(f"簽核者: @{username} (ID: {user_id})")
                print(f"狀態: {approval.status.value}")
                print(f"執行觸發: {execution_triggered}")
                print(f"時間: {datetime.now(UTC).isoformat()}")
                print(f"{rule}\n")

    except Exception as exc:
        logger.error(
            "telegram_approval_action_failed",
            action=action,
            approval_id=approval_id,
            error=str(exc),
        )
# =============================================================================
# Phase 6.5: 心跳監控方法
# =============================================================================
async def _check_nemotron_health(self) -> tuple[bool, str]:
    """
    Probe whether Nemotron (NVIDIA NIM) is reachable.

    2026-04-03 ogt: added because Nemotron was timing out with no alert —
    this closes that monitoring blind spot.

    Returns:
        tuple[bool, str]: ``(is_healthy, status_text)``; healthy only when
        the models endpoint answers with HTTP 200.
    """
    import httpx
    from src.core.config import get_settings

    settings = get_settings()
    api_key = settings.NVIDIA_API_KEY
    if not api_key:
        return False, "❌ NVIDIA_API_KEY 未設定"

    # 2026-04-03 ogt: probe the lightweight /v1/models endpoint to avoid
    # inference billing; 25s timeout because a free-tier NIM cold start
    # may take 15-20s.
    try:
        async with httpx.AsyncClient(timeout=25.0) as probe:
            answer = await probe.get(
                "https://integrate.api.nvidia.com/v1/models",
                headers={"Authorization": f"Bearer {api_key}"},
            )
        if answer.status_code != 200:
            return False, f"❌ HTTP {answer.status_code}"
        return True, "✅ 正常"
    except httpx.TimeoutException:
        return False, "⚠️ 超時 (>25s)"
    except Exception as exc:
        return False, f"{str(exc)[:40]}"
async def send_heartbeat(self) -> bool:
    """
    Send the heartbeat message (status summary incl. the Nemotron probe).

    The original note says it runs every 30 minutes to prove the alert
    pipeline is alive.
    2026-04-03 ogt: Nemotron health probe added to close a monitoring gap.

    Returns:
        bool: ``True`` when the heartbeat was sent, ``False`` on any error.
    """
    try:
        if not self._initialized:
            await self.initialize()

        from src.utils.timezone import now_taipei

        stamp_source = now_taipei()

        # Nemotron health probe.
        nemo_ok, nemo_status = await self._check_nemotron_health()
        stamp = stamp_source.strftime('%Y-%m-%d %H:%M:%S')

        text = f"""💓 <b>AWOOOI 心跳</b>
{stamp} (台北)
📡 告警鏈路: ✅ 正常
🤖 Nemotron NIM: {nemo_status}"""
        await self.send_notification(text)
        self._last_message_time = datetime.now(UTC)

        # When Nemotron looks unhealthy only a notice is sent; nothing is
        # switched off (2026-04-03 ogt: the earlier automatic shutdown of
        # Nemotron collaboration was a mistake — slow answers are a
        # free-tier trait, not a fault to repair).
        if not nemo_ok:
            notice = InfraAlertMessage(
                component="Nemotron NIM (NVIDIA API)",
                status=nemo_status,
                impact="NIM 免費 tier 回應慢 (11-45s)@nemo 對話可能需等待",
                note="NIM 慢屬正常 — 免費 tier 特性,非故障。如需快速回應請用 @openclaw",
            )
            await self.send_notification(notice.format(), parse_mode="HTML")
            logger.warning("nemotron_health_slow_alert", status=nemo_status)

        logger.info("telegram_heartbeat_sent", nemotron_ok=nemo_ok)
    except Exception as exc:
        logger.error("telegram_heartbeat_failed", error=str(exc))
        return False
    return True
async def start_heartbeat_monitor(
    self,
    heartbeat_interval_minutes: int = 30,
    silence_threshold_hours: int = 2,
) -> None:
    """
    Start the heartbeat monitor as a background task.

    A second call while the monitor is already running only logs a warning.

    Args:
        heartbeat_interval_minutes: Minutes between heartbeats (default 30).
        silence_threshold_hours: Hours of silence before a silence alert
            is sent (default 2).
    """
    if self._heartbeat_active:
        logger.warning("telegram_heartbeat_already_running")
        return

    self._heartbeat_active = True
    loop_coro = self._heartbeat_loop(heartbeat_interval_minutes, silence_threshold_hours)
    self._heartbeat_task = asyncio.create_task(loop_coro)

    logger.info(
        "telegram_heartbeat_monitor_started",
        interval_minutes=heartbeat_interval_minutes,
        silence_threshold_hours=silence_threshold_hours,
    )
async def _heartbeat_loop(
    self,
    interval_minutes: int,
    silence_hours: int,
) -> None:
    """
    Heartbeat monitor loop.

    Each round: send a silence alert when the last message is older than
    the threshold, send a heartbeat, then sleep for the interval. Runs
    while ``self._heartbeat_active`` is set; cancellation ends the loop and
    any other error is logged and retried after one minute.

    Args:
        interval_minutes: Minutes to wait between rounds.
        silence_hours: Silence threshold in hours.
    """
    pause = interval_minutes * 60
    silence_limit = silence_hours * 3600

    while self._heartbeat_active:
        try:
            # 1. Silence check.
            last_seen = self._last_message_time
            if last_seen:
                idle = (datetime.now(UTC) - last_seen).total_seconds()
                if idle > silence_limit:
                    idle_hours = int(idle // 3600)
                    await self.send_notification(
                        f"⚠️ <b>沉默告警</b>\n\n"
                        f"Telegram 已 <b>{idle_hours} 小時</b>沒有收到任何訊息!\n"
                        f"請檢查告警鏈路是否正常運作。"
                    )
                    logger.warning(
                        "telegram_silence_alert",
                        silence_hours=idle_hours,
                    )

            # 2. Heartbeat.
            await self.send_heartbeat()

            # 3. Wait for the next round.
            await asyncio.sleep(pause)
        except asyncio.CancelledError:
            return
        except Exception as exc:
            logger.error("telegram_heartbeat_loop_error", error=str(exc))
            await asyncio.sleep(60)  # retry one minute after an error
async def stop_heartbeat_monitor(self) -> None:
    """
    Stop the heartbeat monitor.

    Clears the active flag, cancels the background task (waiting for the
    cancellation to finish) and forgets the task handle.
    """
    self._heartbeat_active = False

    running = self._heartbeat_task
    if running and not running.done():
        running.cancel()
        try:
            await running
        except asyncio.CancelledError:
            pass

    self._heartbeat_task = None
    logger.info("telegram_heartbeat_monitor_stopped")
# =============================================================================
# Singleton
# =============================================================================
_gateway: TelegramGateway | None = None
def get_telegram_gateway() -> TelegramGateway:
    """Return the process-wide TelegramGateway, creating it on first use."""
    global _gateway
    if _gateway is not None:
        return _gateway
    _gateway = TelegramGateway()
    return _gateway