fix(telegram): thread incident follow-up messages
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m4s
CD Pipeline / build-and-deploy (push) Successful in 3m30s
CD Pipeline / post-deploy-checks (push) Successful in 1m19s

This commit is contained in:
Your Name
2026-05-07 01:11:02 +08:00
parent 1a72f771de
commit 1f4a16e625
2 changed files with 143 additions and 2 deletions

View File

@@ -66,6 +66,7 @@ POLLING_LEADER_WATCH = 30 # seconds - 非 Leader Pod 每 30s 嘗試接管
logger = structlog.get_logger(__name__)
_TELEGRAM_BOT_URL_RE = re.compile(r"(api\.telegram\.org/bot)[^/\s]+")
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
def _sanitize_telegram_error(text: str) -> str:
@@ -81,6 +82,21 @@ def _is_noisy_failure_update(status_line: str) -> bool:
)
def _extract_incident_id_from_text(text: str) -> str | None:
"""從 Telegram 出站文字擷取 Incident ID。"""
match = _INCIDENT_ID_RE.search(text or "")
return match.group(0) if match else None
def _has_reply_context(payload: dict) -> bool:
return "reply_to_message_id" in payload or "reply_parameters" in payload
def _is_root_action_required_card(text: str) -> bool:
"""主告警卡片本身不自動 reply避免把新主卡接到舊訊息下。"""
return "ACTION REQUIRED" in text and "AI 自動化鏈路" in text
def _legacy_outbound_run_id(chat_id: str, provider_message_id: str) -> UUID:
"""Legacy Telegram 發送尚未有 run_id 時,產生穩定 soft run_id 供 Channel Hub 串接。"""
return uuid5(NAMESPACE_URL, f"awoooi:legacy-telegram:{chat_id}:{provider_message_id}")
@@ -88,7 +104,9 @@ def _legacy_outbound_run_id(chat_id: str, provider_message_id: str) -> UUID:
def _infer_outbound_message_type(text: str, payload: dict) -> str:
"""將既有 Telegram 訊息映射成 AwoooP outbound_message 的有限分類。"""
if "reply_to_message_id" in payload:
if "RUNBOOK REVIEW" in text or "待審核" in text:
return "approval_request"
if _has_reply_context(payload):
if "失敗" in text or "錯誤" in text or "FAILED" in text:
return "error"
return "final"
@@ -1505,6 +1523,8 @@ class TelegramGateway:
if not self._http_client:
raise TelegramGatewayError("HTTP client not initialized")
await self._attach_incident_thread_reply(method, payload)
url = f"{self.api_url}/{method}"
# OTEL Span: telegram.api.{method}
@@ -1567,6 +1587,58 @@ class TelegramGateway:
)
raise TelegramGatewayError(safe_error) from None
async def _attach_incident_thread_reply(self, method: str, payload: dict) -> None:
"""將同一 Incident 的後續 Telegram 訊息接回原告警卡片。
2026-05-07 Codex — 主卡 `tg_msg:{incident_id}` 已存在時,後續
Runbook / escalation / 執行摘要不要再形成頂層訊息洪水,而是以
Telegram reply thread 延續;主 ACTION REQUIRED 卡與已顯式 reply 的
payload 不改動。
"""
if method != "sendMessage" or _has_reply_context(payload):
return
text = str(payload.get("text") or "")
if not text or _is_root_action_required_card(text):
return
incident_id = _extract_incident_id_from_text(text)
if not incident_id:
return
try:
stored = await get_redis().get(f"tg_msg:{incident_id}")
except Exception as exc:
logger.debug(
"telegram_incident_thread_lookup_failed",
incident_id=incident_id,
error=str(exc),
)
return
if not stored:
return
try:
message_id = int(stored)
except (TypeError, ValueError):
logger.debug(
"telegram_incident_thread_invalid_message_id",
incident_id=incident_id,
stored=str(stored),
)
return
payload["reply_parameters"] = {
"message_id": message_id,
"allow_sending_without_reply": True,
}
logger.info(
"telegram_incident_thread_reply_attached",
incident_id=incident_id,
message_id=message_id,
)
async def _mirror_outbound_message(
self,
*,

View File

@@ -251,10 +251,79 @@ def test_outbound_message_type_inference():
assert (
telegram_gateway_module._infer_outbound_message_type(
"✅ 執行成功",
{"reply_to_message_id": 123},
{"reply_parameters": {"message_id": 123}},
)
== "final"
)
assert (
telegram_gateway_module._infer_outbound_message_type(
"📄 <b>RUNBOOK REVIEW待審核</b>",
{"reply_parameters": {"message_id": 123}},
)
== "approval_request"
)
def test_extract_incident_id_from_text():
"""Telegram 出站文字可擷取 Incident ID供後續訊息接回原告警卡片。"""
assert (
telegram_gateway_module._extract_incident_id_from_text(
"Incident: INC-20260506-E54736\nEntry ID: abc"
)
== "INC-20260506-E54736"
)
assert telegram_gateway_module._extract_incident_id_from_text("沒有事件編號") is None
@pytest.mark.asyncio
async def test_attach_incident_thread_reply(monkeypatch):
"""非主卡、含 Incident ID 的後續訊息,應自動 reply 原告警 message_id。"""
class FakeRedis:
async def get(self, key):
assert key == "tg_msg:INC-20260506-E54736"
return "9876"
gateway = TelegramGateway()
payload = {
"chat_id": gateway.alert_chat_id,
"text": "📄 RUNBOOK REVIEW待審核\nIncident: INC-20260506-E54736",
}
monkeypatch.setattr(telegram_gateway_module, "get_redis", lambda: FakeRedis())
await gateway._attach_incident_thread_reply("sendMessage", payload)
assert payload["reply_parameters"] == {
"message_id": 9876,
"allow_sending_without_reply": True,
}
@pytest.mark.asyncio
async def test_attach_incident_thread_skips_root_action_card(monkeypatch):
"""ACTION REQUIRED 主告警卡不應自動 reply 舊訊息。"""
class FakeRedis:
async def get(self, key): # pragma: no cover - 不應被呼叫
raise AssertionError(key)
gateway = TelegramGateway()
payload = {
"chat_id": gateway.alert_chat_id,
"text": (
" ACTION REQUIRED | 低風險\n"
"📋 INC-20260506-E54736\n"
"🤖 AI 自動化鏈路"
),
}
monkeypatch.setattr(telegram_gateway_module, "get_redis", lambda: FakeRedis())
await gateway._attach_incident_thread_reply("sendMessage", payload)
assert "reply_parameters" not in payload
def test_legacy_outbound_run_id_is_stable():