From adaef514dc4cf01868640cb0f4e7250b31fb21b6 Mon Sep 17 00:00:00 2001 From: OG T Date: Mon, 30 Mar 2026 01:56:50 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20Phase=20C=20P1=20Telegram=20Gatewa?= =?UTF-8?q?y=20OTEL=20=E8=BF=BD=E8=B9=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 _tracer for awoooi.telegram_gateway - _send_request: 追蹤所有 API 呼叫 (method, chat_id, message_id) - send_cicd_progress: 追蹤 CI/CD 通知 (含重試次數) 首席架構師審查 P1 改進 - 可觀測性 Co-Authored-By: Claude Opus 4.5 --- apps/api/src/services/telegram_gateway.py | 172 ++++++++++++++-------- 1 file changed, 111 insertions(+), 61 deletions(-) diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 97aaf730..d147391b 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -29,6 +29,7 @@ from datetime import UTC, datetime import httpx import structlog +from opentelemetry import trace from src.core.config import settings from src.core.redis_client import get_redis @@ -48,6 +49,12 @@ SILENCE_TTL_SECONDS = 60 * 60 # 1 小時 logger = structlog.get_logger(__name__) +# ============================================================================= +# OTEL Tracer (Phase C P1 可觀測性) +# 2026-03-30 Claude Code: 新增 Telegram Gateway 追蹤 +# ============================================================================= +_tracer = trace.get_tracer("awoooi.telegram_gateway", "1.0.0") + # ============================================================================= # Long Polling 配置 (Phase 5 內網修復) @@ -786,6 +793,10 @@ class TelegramGateway: """ 發送 Telegram API 請求 + Phase C P1: 新增 OTEL 追蹤 + @author Claude Code + @date 2026-03-30 (台北時間) + Args: method: API 方法 (sendMessage, editMessageText, etc.) 
payload: 請求 Payload @@ -801,25 +812,50 @@ class TelegramGateway: url = f"{self.api_url}/{method}" - try: - response = await self._http_client.post(url, json=payload) - response.raise_for_status() - result = response.json() + # OTEL Span: telegram.api.{method} + with _tracer.start_as_current_span( + f"telegram.api.{method}", + attributes={ + "telegram.method": method, + "telegram.chat_id": str(payload.get("chat_id", "")), + "telegram.has_reply_markup": "reply_markup" in payload, + }, + ) as span: + try: + response = await self._http_client.post(url, json=payload) + response.raise_for_status() + result = response.json() - if not result.get("ok"): - raise TelegramGatewayError( - f"Telegram API error: {result.get('description', 'Unknown error')}" - ) + if not result.get("ok"): + span.set_attribute("telegram.error", result.get("description", "Unknown")) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + raise TelegramGatewayError( + f"Telegram API error: {result.get('description', 'Unknown error')}" + ) - return result + # Success: record message_id only when "result" is an object; some methods return a bare true + if isinstance(result.get("result"), dict) and "message_id" in result["result"]: + span.set_attribute("telegram.message_id", result["result"]["message_id"]) - except httpx.HTTPStatusError as e: - logger.error("telegram_api_error", method=method, status=e.response.status_code) - raise TelegramGatewayError(f"HTTP error: {e.response.status_code}") from e + span.set_status(trace.Status(trace.StatusCode.OK)) + return result - except Exception as e: - logger.error("telegram_request_failed", method=method, error=str(e)) - raise TelegramGatewayError(str(e)) from e + except httpx.HTTPStatusError as e: + span.set_attribute("telegram.http_status", e.response.status_code) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + span.record_exception(e) + logger.error("telegram_api_error", method=method, status=e.response.status_code) + raise TelegramGatewayError(f"HTTP error: {e.response.status_code}") from e + + except TelegramGatewayError: + # 
已處理的錯誤,直接拋出 + raise + + except Exception as e: + span.set_status(trace.Status(trace.StatusCode.ERROR)) + span.record_exception(e) + logger.error("telegram_request_failed", method=method, error=str(e)) + raise TelegramGatewayError(str(e)) from e def _build_inline_keyboard( self, @@ -1304,56 +1340,70 @@ class TelegramGateway: Returns: dict: Telegram API 回應 """ - msg = CICDProgressMessage( - job_name=job_name, - status=status, - stage=stage, - commit_sha=commit_sha, - triggered_by=triggered_by, - duration_seconds=duration_seconds, - message=message, - workflow_url=workflow_url, - ) + # OTEL Span: telegram.send_cicd_progress + with _tracer.start_as_current_span( + "telegram.send_cicd_progress", + attributes={ + "telegram.job_name": job_name, + "telegram.status": status, + "telegram.stage": stage, + "telegram.max_retries": max_retries, + }, + ) as span: + msg = CICDProgressMessage( + job_name=job_name, + status=status, + stage=stage, + commit_sha=commit_sha, + triggered_by=triggered_by, + duration_seconds=duration_seconds, + message=message, + workflow_url=workflow_url, + ) - payload = { - "chat_id": self.chat_id, - "text": msg.format(), - "parse_mode": "HTML", - "disable_web_page_preview": True, - } + payload = { + "chat_id": self.chat_id, + "text": msg.format(), + "parse_mode": "HTML", + "disable_web_page_preview": True, + } - logger.info("telegram_cicd_progress_sending", job=job_name, status=status) + logger.info("telegram_cicd_progress_sending", job=job_name, status=status) - # 重試機制 (指數退避) - import asyncio - last_error = None - for attempt in range(max_retries): - try: - result = await self._send_request("sendMessage", payload) - logger.info("telegram_cicd_progress_sent", job=job_name, status=status, attempt=attempt + 1) - return result - except TelegramGatewayError as e: - last_error = e - if attempt < max_retries - 1: - delay = 2 ** attempt # 1, 2, 4 秒 - logger.warning( - "telegram_cicd_progress_retry", - job=job_name, - attempt=attempt + 1, - delay=delay, - 
error=str(e), - ) - await asyncio.sleep(delay) + # 重試機制 (指數退避) + last_error = None + for attempt in range(max_retries): + try: + result = await self._send_request("sendMessage", payload) + span.set_attribute("telegram.attempts", attempt + 1) + span.set_status(trace.Status(trace.StatusCode.OK)) + logger.info("telegram_cicd_progress_sent", job=job_name, status=status, attempt=attempt + 1) + return result + except TelegramGatewayError as e: + last_error = e + if attempt < max_retries - 1: + delay = 2 ** attempt # 1, 2, 4 秒 + logger.warning( + "telegram_cicd_progress_retry", + job=job_name, + attempt=attempt + 1, + delay=delay, + error=str(e), + ) + await asyncio.sleep(delay) - # 所有重試都失敗 - logger.error( - "telegram_cicd_progress_failed", - job=job_name, - status=status, - max_retries=max_retries, - error=str(last_error), - ) - raise last_error + # 所有重試都失敗 + span.set_attribute("telegram.attempts", max_retries) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + span.record_exception(last_error) + logger.error( + "telegram_cicd_progress_failed", + job=job_name, + status=status, + max_retries=max_retries, + error=str(last_error), + ) + raise last_error async def send_deploy_success( self,