feat(api): Phase C P1 Telegram Gateway OTEL 追蹤
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 4m33s
E2E Health Check / e2e-health (push) Successful in 18s

- 新增 _tracer for awoooi.telegram_gateway
- _send_request: 追蹤所有 API 呼叫 (method, chat_id, message_id)
- send_cicd_progress: 追蹤 CI/CD 通知 (含重試次數)

首席架構師審查 P1 改進 - 可觀測性

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-30 01:56:50 +08:00
parent 13bb1496b0
commit adaef514dc

View File

@@ -29,6 +29,7 @@ from datetime import UTC, datetime
import httpx
import structlog
from opentelemetry import trace
from src.core.config import settings
from src.core.redis_client import get_redis
@@ -48,6 +49,12 @@ SILENCE_TTL_SECONDS = 60 * 60 # 1 小時
logger = structlog.get_logger(__name__)
# =============================================================================
# OTEL Tracer (Phase C P1 可觀測性)
# 2026-03-30 Claude Code: 新增 Telegram Gateway 追蹤
# =============================================================================
_tracer = trace.get_tracer("awoooi.telegram_gateway", "1.0.0")
# =============================================================================
# Long Polling 配置 (Phase 5 內網修復)
@@ -786,6 +793,10 @@ class TelegramGateway:
"""
發送 Telegram API 請求
Phase C P1: 新增 OTEL 追蹤
@author Claude Code
@date 2026-03-30 (台北時間)
Args:
method: API 方法 (sendMessage, editMessageText, etc.)
payload: 請求 Payload
@@ -801,25 +812,50 @@ class TelegramGateway:
url = f"{self.api_url}/{method}"
try:
response = await self._http_client.post(url, json=payload)
response.raise_for_status()
result = response.json()
# OTEL Span: telegram.api.{method}
with _tracer.start_as_current_span(
f"telegram.api.{method}",
attributes={
"telegram.method": method,
"telegram.chat_id": str(payload.get("chat_id", "")),
"telegram.has_reply_markup": "reply_markup" in payload,
},
) as span:
try:
response = await self._http_client.post(url, json=payload)
response.raise_for_status()
result = response.json()
if not result.get("ok"):
raise TelegramGatewayError(
f"Telegram API error: {result.get('description', 'Unknown error')}"
)
if not result.get("ok"):
span.set_attribute("telegram.error", result.get("description", "Unknown"))
span.set_status(trace.Status(trace.StatusCode.ERROR))
raise TelegramGatewayError(
f"Telegram API error: {result.get('description', 'Unknown error')}"
)
return result
# 成功: 記錄 message_id
if "result" in result and "message_id" in result["result"]:
span.set_attribute("telegram.message_id", result["result"]["message_id"])
except httpx.HTTPStatusError as e:
logger.error("telegram_api_error", method=method, status=e.response.status_code)
raise TelegramGatewayError(f"HTTP error: {e.response.status_code}") from e
span.set_status(trace.Status(trace.StatusCode.OK))
return result
except Exception as e:
logger.error("telegram_request_failed", method=method, error=str(e))
raise TelegramGatewayError(str(e)) from e
except httpx.HTTPStatusError as e:
span.set_attribute("telegram.http_status", e.response.status_code)
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(e)
logger.error("telegram_api_error", method=method, status=e.response.status_code)
raise TelegramGatewayError(f"HTTP error: {e.response.status_code}") from e
except TelegramGatewayError:
# 已處理的錯誤,直接拋出
raise
except Exception as e:
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(e)
logger.error("telegram_request_failed", method=method, error=str(e))
raise TelegramGatewayError(str(e)) from e
def _build_inline_keyboard(
self,
@@ -1304,56 +1340,70 @@ class TelegramGateway:
Returns:
dict: Telegram API 回應
"""
msg = CICDProgressMessage(
job_name=job_name,
status=status,
stage=stage,
commit_sha=commit_sha,
triggered_by=triggered_by,
duration_seconds=duration_seconds,
message=message,
workflow_url=workflow_url,
)
# OTEL Span: telegram.send_cicd_progress
with _tracer.start_as_current_span(
"telegram.send_cicd_progress",
attributes={
"telegram.job_name": job_name,
"telegram.status": status,
"telegram.stage": stage,
"telegram.max_retries": max_retries,
},
) as span:
msg = CICDProgressMessage(
job_name=job_name,
status=status,
stage=stage,
commit_sha=commit_sha,
triggered_by=triggered_by,
duration_seconds=duration_seconds,
message=message,
workflow_url=workflow_url,
)
payload = {
"chat_id": self.chat_id,
"text": msg.format(),
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
payload = {
"chat_id": self.chat_id,
"text": msg.format(),
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
logger.info("telegram_cicd_progress_sending", job=job_name, status=status)
logger.info("telegram_cicd_progress_sending", job=job_name, status=status)
# 重試機制 (指數退避)
import asyncio
last_error = None
for attempt in range(max_retries):
try:
result = await self._send_request("sendMessage", payload)
logger.info("telegram_cicd_progress_sent", job=job_name, status=status, attempt=attempt + 1)
return result
except TelegramGatewayError as e:
last_error = e
if attempt < max_retries - 1:
delay = 2 ** attempt # 1, 2, 4 秒
logger.warning(
"telegram_cicd_progress_retry",
job=job_name,
attempt=attempt + 1,
delay=delay,
error=str(e),
)
await asyncio.sleep(delay)
# 重試機制 (指數退避)
last_error = None
for attempt in range(max_retries):
try:
result = await self._send_request("sendMessage", payload)
span.set_attribute("telegram.attempts", attempt + 1)
span.set_status(trace.Status(trace.StatusCode.OK))
logger.info("telegram_cicd_progress_sent", job=job_name, status=status, attempt=attempt + 1)
return result
except TelegramGatewayError as e:
last_error = e
if attempt < max_retries - 1:
delay = 2 ** attempt # 1, 2, 4 秒
logger.warning(
"telegram_cicd_progress_retry",
job=job_name,
attempt=attempt + 1,
delay=delay,
error=str(e),
)
await asyncio.sleep(delay)
# 所有重試都失敗
logger.error(
"telegram_cicd_progress_failed",
job=job_name,
status=status,
max_retries=max_retries,
error=str(last_error),
)
raise last_error
# 所有重試都失敗
span.set_attribute("telegram.attempts", max_retries)
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(last_error)
logger.error(
"telegram_cicd_progress_failed",
job=job_name,
status=status,
max_retries=max_retries,
error=str(last_error),
)
raise last_error
async def send_deploy_success(
self,