fix(telegram): thread incident follow-up messages
This commit is contained in:
@@ -66,6 +66,7 @@ POLLING_LEADER_WATCH = 30 # seconds - 非 Leader Pod 每 30s 嘗試接管
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
_TELEGRAM_BOT_URL_RE = re.compile(r"(api\.telegram\.org/bot)[^/\s]+")
|
||||
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
|
||||
|
||||
|
||||
def _sanitize_telegram_error(text: str) -> str:
|
||||
@@ -81,6 +82,21 @@ def _is_noisy_failure_update(status_line: str) -> bool:
|
||||
)
|
||||
|
||||
|
||||
def _extract_incident_id_from_text(text: str) -> str | None:
|
||||
"""從 Telegram 出站文字擷取 Incident ID。"""
|
||||
match = _INCIDENT_ID_RE.search(text or "")
|
||||
return match.group(0) if match else None
|
||||
|
||||
|
||||
def _has_reply_context(payload: dict) -> bool:
|
||||
return "reply_to_message_id" in payload or "reply_parameters" in payload
|
||||
|
||||
|
||||
def _is_root_action_required_card(text: str) -> bool:
|
||||
"""主告警卡片本身不自動 reply,避免把新主卡接到舊訊息下。"""
|
||||
return "ACTION REQUIRED" in text and "AI 自動化鏈路" in text
|
||||
|
||||
|
||||
def _legacy_outbound_run_id(chat_id: str, provider_message_id: str) -> UUID:
|
||||
"""Legacy Telegram 發送尚未有 run_id 時,產生穩定 soft run_id 供 Channel Hub 串接。"""
|
||||
return uuid5(NAMESPACE_URL, f"awoooi:legacy-telegram:{chat_id}:{provider_message_id}")
|
||||
@@ -88,7 +104,9 @@ def _legacy_outbound_run_id(chat_id: str, provider_message_id: str) -> UUID:
|
||||
|
||||
def _infer_outbound_message_type(text: str, payload: dict) -> str:
|
||||
"""將既有 Telegram 訊息映射成 AwoooP outbound_message 的有限分類。"""
|
||||
if "reply_to_message_id" in payload:
|
||||
if "RUNBOOK REVIEW" in text or "待審核" in text:
|
||||
return "approval_request"
|
||||
if _has_reply_context(payload):
|
||||
if "失敗" in text or "錯誤" in text or "FAILED" in text:
|
||||
return "error"
|
||||
return "final"
|
||||
@@ -1505,6 +1523,8 @@ class TelegramGateway:
|
||||
if not self._http_client:
|
||||
raise TelegramGatewayError("HTTP client not initialized")
|
||||
|
||||
await self._attach_incident_thread_reply(method, payload)
|
||||
|
||||
url = f"{self.api_url}/{method}"
|
||||
|
||||
# OTEL Span: telegram.api.{method}
|
||||
@@ -1567,6 +1587,58 @@ class TelegramGateway:
|
||||
)
|
||||
raise TelegramGatewayError(safe_error) from None
|
||||
|
||||
async def _attach_incident_thread_reply(self, method: str, payload: dict) -> None:
|
||||
"""將同一 Incident 的後續 Telegram 訊息接回原告警卡片。
|
||||
|
||||
2026-05-07 Codex — 主卡 `tg_msg:{incident_id}` 已存在時,後續
|
||||
Runbook / escalation / 執行摘要不要再形成頂層訊息洪水,而是以
|
||||
Telegram reply thread 延續;主 ACTION REQUIRED 卡與已顯式 reply 的
|
||||
payload 不改動。
|
||||
"""
|
||||
if method != "sendMessage" or _has_reply_context(payload):
|
||||
return
|
||||
|
||||
text = str(payload.get("text") or "")
|
||||
if not text or _is_root_action_required_card(text):
|
||||
return
|
||||
|
||||
incident_id = _extract_incident_id_from_text(text)
|
||||
if not incident_id:
|
||||
return
|
||||
|
||||
try:
|
||||
stored = await get_redis().get(f"tg_msg:{incident_id}")
|
||||
except Exception as exc:
|
||||
logger.debug(
|
||||
"telegram_incident_thread_lookup_failed",
|
||||
incident_id=incident_id,
|
||||
error=str(exc),
|
||||
)
|
||||
return
|
||||
|
||||
if not stored:
|
||||
return
|
||||
|
||||
try:
|
||||
message_id = int(stored)
|
||||
except (TypeError, ValueError):
|
||||
logger.debug(
|
||||
"telegram_incident_thread_invalid_message_id",
|
||||
incident_id=incident_id,
|
||||
stored=str(stored),
|
||||
)
|
||||
return
|
||||
|
||||
payload["reply_parameters"] = {
|
||||
"message_id": message_id,
|
||||
"allow_sending_without_reply": True,
|
||||
}
|
||||
logger.info(
|
||||
"telegram_incident_thread_reply_attached",
|
||||
incident_id=incident_id,
|
||||
message_id=message_id,
|
||||
)
|
||||
|
||||
async def _mirror_outbound_message(
|
||||
self,
|
||||
*,
|
||||
|
||||
@@ -251,10 +251,79 @@ def test_outbound_message_type_inference():
|
||||
assert (
|
||||
telegram_gateway_module._infer_outbound_message_type(
|
||||
"✅ 執行成功",
|
||||
{"reply_to_message_id": 123},
|
||||
{"reply_parameters": {"message_id": 123}},
|
||||
)
|
||||
== "final"
|
||||
)
|
||||
assert (
|
||||
telegram_gateway_module._infer_outbound_message_type(
|
||||
"📄 <b>RUNBOOK REVIEW|待審核</b>",
|
||||
{"reply_parameters": {"message_id": 123}},
|
||||
)
|
||||
== "approval_request"
|
||||
)
|
||||
|
||||
|
||||
def test_extract_incident_id_from_text():
|
||||
"""Telegram 出站文字可擷取 Incident ID,供後續訊息接回原告警卡片。"""
|
||||
|
||||
assert (
|
||||
telegram_gateway_module._extract_incident_id_from_text(
|
||||
"Incident: INC-20260506-E54736\nEntry ID: abc"
|
||||
)
|
||||
== "INC-20260506-E54736"
|
||||
)
|
||||
assert telegram_gateway_module._extract_incident_id_from_text("沒有事件編號") is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attach_incident_thread_reply(monkeypatch):
|
||||
"""非主卡、含 Incident ID 的後續訊息,應自動 reply 原告警 message_id。"""
|
||||
|
||||
class FakeRedis:
|
||||
async def get(self, key):
|
||||
assert key == "tg_msg:INC-20260506-E54736"
|
||||
return "9876"
|
||||
|
||||
gateway = TelegramGateway()
|
||||
payload = {
|
||||
"chat_id": gateway.alert_chat_id,
|
||||
"text": "📄 RUNBOOK REVIEW|待審核\nIncident: INC-20260506-E54736",
|
||||
}
|
||||
|
||||
monkeypatch.setattr(telegram_gateway_module, "get_redis", lambda: FakeRedis())
|
||||
|
||||
await gateway._attach_incident_thread_reply("sendMessage", payload)
|
||||
|
||||
assert payload["reply_parameters"] == {
|
||||
"message_id": 9876,
|
||||
"allow_sending_without_reply": True,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attach_incident_thread_skips_root_action_card(monkeypatch):
|
||||
"""ACTION REQUIRED 主告警卡不應自動 reply 舊訊息。"""
|
||||
|
||||
class FakeRedis:
|
||||
async def get(self, key): # pragma: no cover - 不應被呼叫
|
||||
raise AssertionError(key)
|
||||
|
||||
gateway = TelegramGateway()
|
||||
payload = {
|
||||
"chat_id": gateway.alert_chat_id,
|
||||
"text": (
|
||||
"ℹ️ ACTION REQUIRED | 低風險\n"
|
||||
"📋 INC-20260506-E54736\n"
|
||||
"🤖 AI 自動化鏈路"
|
||||
),
|
||||
}
|
||||
|
||||
monkeypatch.setattr(telegram_gateway_module, "get_redis", lambda: FakeRedis())
|
||||
|
||||
await gateway._attach_incident_thread_reply("sendMessage", payload)
|
||||
|
||||
assert "reply_parameters" not in payload
|
||||
|
||||
|
||||
def test_legacy_outbound_run_id_is_stable():
|
||||
|
||||
Reference in New Issue
Block a user