feat(telegram): record callback reply evidence
This commit is contained in:
@@ -73,6 +73,7 @@ _INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
|
||||
_CODE_REF_RE = re.compile(r"<code>([0-9a-f]{7,12})</code>", re.IGNORECASE)
|
||||
_TELEGRAM_HTML_CHUNK_LIMIT = 3600
|
||||
_AWOOOP_WEB_BASE_URL = "https://awoooi.wooo.work"
|
||||
_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY = "_awooop_source_envelope_extra"
|
||||
|
||||
|
||||
def _top_gateway_bucket(
|
||||
@@ -472,6 +473,73 @@ def _outbound_source_envelope(method: str, payload: dict) -> dict[str, object]:
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _callback_reply_source_envelope_extra(
|
||||
*,
|
||||
incident_id: str | None,
|
||||
failure_context: str,
|
||||
status: str,
|
||||
chunk_index: int,
|
||||
chunk_count: int,
|
||||
callback_action: str | None = None,
|
||||
parse_mode: str | None = None,
|
||||
error: str | None = None,
|
||||
) -> dict[str, object] | None:
|
||||
"""Build AwoooP metadata for Telegram detail/history callback replies."""
|
||||
if not incident_id:
|
||||
return None
|
||||
|
||||
action = callback_action or failure_context.removeprefix("incident_")
|
||||
callback_reply: dict[str, object] = {
|
||||
"schema_version": "telegram_callback_reply_v1",
|
||||
"status": status,
|
||||
"incident_id": incident_id,
|
||||
"action": action,
|
||||
"failure_context": failure_context,
|
||||
"chunk_index": chunk_index,
|
||||
"chunk_count": chunk_count,
|
||||
}
|
||||
if parse_mode:
|
||||
callback_reply["parse_mode"] = parse_mode
|
||||
if error:
|
||||
callback_reply["error"] = _sanitize_telegram_error(error)[:300]
|
||||
|
||||
return {
|
||||
"callback_reply": callback_reply,
|
||||
"source_refs": {
|
||||
"incident_ids": [incident_id],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _merge_outbound_source_envelope_extra(
|
||||
envelope: dict[str, object],
|
||||
extra: dict[str, object] | None,
|
||||
) -> dict[str, object]:
|
||||
"""Attach private AwoooP metadata after it has been stripped from Telegram payloads."""
|
||||
if not isinstance(extra, dict):
|
||||
return envelope
|
||||
|
||||
callback_reply = extra.get("callback_reply")
|
||||
if isinstance(callback_reply, dict):
|
||||
envelope["callback_reply"] = callback_reply
|
||||
|
||||
extra_refs = extra.get("source_refs")
|
||||
if isinstance(extra_refs, dict):
|
||||
source_refs = envelope.setdefault("source_refs", {})
|
||||
if isinstance(source_refs, dict):
|
||||
for key, values in extra_refs.items():
|
||||
if not isinstance(values, list):
|
||||
continue
|
||||
existing = source_refs.get(key)
|
||||
merged = list(existing) if isinstance(existing, list) else []
|
||||
for value in values:
|
||||
if value not in merged:
|
||||
merged.append(value)
|
||||
source_refs[key] = merged[:20]
|
||||
|
||||
return envelope
|
||||
|
||||
# 2026-04-27 Claude Sonnet 4.6: B3 — LLM 動態 Telegram 按鈕 Feature Flag
|
||||
# true → 優先使用 ActionPlan.recommended_actions 動態生成按鈕
|
||||
# false → 維持現有 callback_action_spec.yaml 路徑(預設,向下相容)
|
||||
@@ -1990,6 +2058,7 @@ class TelegramGateway:
|
||||
if not self._http_client:
|
||||
raise TelegramGatewayError("HTTP client not initialized")
|
||||
|
||||
source_envelope_extra = payload.pop(_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY, None)
|
||||
await self._attach_incident_thread_reply(method, payload)
|
||||
|
||||
url = f"{self.api_url}/{method}"
|
||||
@@ -2023,6 +2092,7 @@ class TelegramGateway:
|
||||
method=method,
|
||||
payload=payload,
|
||||
provider_message_id=str(result_val["message_id"]),
|
||||
source_envelope_extra=source_envelope_extra,
|
||||
)
|
||||
|
||||
span.set_status(trace.Status(trace.StatusCode.OK))
|
||||
@@ -2115,6 +2185,7 @@ class TelegramGateway:
|
||||
method: str,
|
||||
payload: dict,
|
||||
provider_message_id: str,
|
||||
source_envelope_extra: dict[str, object] | None = None,
|
||||
) -> None:
|
||||
"""將 legacy Telegram 出站訊息鏡像到 AwoooP,不改變實際發送行為。"""
|
||||
if method != "sendMessage":
|
||||
@@ -2141,7 +2212,10 @@ class TelegramGateway:
|
||||
channel_chat_id=chat_id,
|
||||
message_type=_infer_outbound_message_type(text, payload),
|
||||
content=text,
|
||||
source_envelope=_outbound_source_envelope(method, payload),
|
||||
source_envelope=_merge_outbound_source_envelope_extra(
|
||||
_outbound_source_envelope(method, payload),
|
||||
source_envelope_extra,
|
||||
),
|
||||
provider_message_id=provider_message_id,
|
||||
send_status="sent",
|
||||
triggered_by_state="legacy_gateway",
|
||||
@@ -2156,6 +2230,88 @@ class TelegramGateway:
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
async def _record_callback_reply_failure(
|
||||
self,
|
||||
*,
|
||||
chat_id: str,
|
||||
incident_id: str | None,
|
||||
failure_context: str,
|
||||
callback_action: str | None,
|
||||
chunk_index: int,
|
||||
chunk_count: int,
|
||||
error: Exception,
|
||||
) -> None:
|
||||
"""Persist callback reply delivery failures into AwoooP outbound evidence."""
|
||||
if not incident_id:
|
||||
return
|
||||
|
||||
safe_error = _sanitize_telegram_error(str(error))[:300]
|
||||
action = callback_action or failure_context.removeprefix("incident_")
|
||||
content = (
|
||||
"Telegram callback reply failed\n"
|
||||
f"Incident: {incident_id}\n"
|
||||
f"Action: {action}\n"
|
||||
f"Context: {failure_context}\n"
|
||||
f"Error: {safe_error}"
|
||||
)
|
||||
provider_message_id = (
|
||||
f"telegram_callback_reply:{incident_id}:{failure_context}:{chunk_index}:failed"
|
||||
)
|
||||
run_id = uuid5(
|
||||
NAMESPACE_URL,
|
||||
(
|
||||
"awoooi:telegram-callback-reply:"
|
||||
f"{chat_id}:{incident_id}:{failure_context}:{chunk_index}"
|
||||
),
|
||||
)
|
||||
source_envelope = _merge_outbound_source_envelope_extra(
|
||||
_outbound_source_envelope(
|
||||
"sendMessage",
|
||||
{
|
||||
"chat_id": chat_id,
|
||||
"text": content,
|
||||
},
|
||||
),
|
||||
_callback_reply_source_envelope_extra(
|
||||
incident_id=incident_id,
|
||||
failure_context=failure_context,
|
||||
status="callback_reply_failed",
|
||||
chunk_index=chunk_index,
|
||||
chunk_count=chunk_count,
|
||||
callback_action=callback_action,
|
||||
error=safe_error,
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
from src.core.context import get_current_project_id
|
||||
from src.db.base import get_db_context
|
||||
from src.services.channel_hub import record_outbound_message
|
||||
|
||||
project_id = get_current_project_id() or "awoooi"
|
||||
async with get_db_context(project_id) as db:
|
||||
await record_outbound_message(
|
||||
db,
|
||||
project_id=project_id,
|
||||
run_id=run_id,
|
||||
channel_type="telegram",
|
||||
channel_chat_id=chat_id,
|
||||
message_type="error",
|
||||
content=content,
|
||||
source_envelope=source_envelope,
|
||||
provider_message_id=provider_message_id,
|
||||
send_status="failed",
|
||||
triggered_by_state="telegram_callback_reply",
|
||||
is_shadow=False,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"telegram_callback_reply_failure_record_failed",
|
||||
incident_id=incident_id,
|
||||
failure_context=failure_context,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
async def _build_inline_keyboard(
|
||||
self,
|
||||
approval_id: str,
|
||||
@@ -5587,6 +5743,8 @@ class TelegramGateway:
|
||||
lines,
|
||||
failure_context="incident_detail",
|
||||
reply_markup=_awooop_runs_reply_markup(incident_id),
|
||||
incident_id=incident_id,
|
||||
callback_action="detail",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
@@ -5726,6 +5884,8 @@ class TelegramGateway:
|
||||
lines,
|
||||
failure_context="incident_history",
|
||||
reply_markup=_awooop_runs_reply_markup(incident_id),
|
||||
incident_id=incident_id,
|
||||
callback_action="history",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
@@ -5962,16 +6122,30 @@ class TelegramGateway:
|
||||
chat_id: str | int | None = None,
|
||||
failure_context: str,
|
||||
reply_markup: dict | None = None,
|
||||
incident_id: str | None = None,
|
||||
callback_action: str | None = None,
|
||||
) -> None:
|
||||
"""Send a multi-line HTML message without cutting Telegram tags in half."""
|
||||
chunks = _telegram_html_chunks(lines)
|
||||
actual_chat_id = str(chat_id or self.alert_chat_id)
|
||||
for index, chunk in enumerate(chunks):
|
||||
try:
|
||||
payload: dict = {
|
||||
"chat_id": chat_id or self.alert_chat_id,
|
||||
"chat_id": actual_chat_id,
|
||||
"text": chunk,
|
||||
"parse_mode": "HTML",
|
||||
}
|
||||
source_extra = _callback_reply_source_envelope_extra(
|
||||
incident_id=incident_id,
|
||||
failure_context=failure_context,
|
||||
status="callback_reply_sent",
|
||||
chunk_index=index,
|
||||
chunk_count=len(chunks),
|
||||
callback_action=callback_action,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
if source_extra:
|
||||
payload[_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY] = source_extra
|
||||
if index == 0 and reply_markup:
|
||||
payload["reply_markup"] = reply_markup
|
||||
await self._send_request(
|
||||
@@ -5987,9 +6161,21 @@ class TelegramGateway:
|
||||
error=str(exc),
|
||||
)
|
||||
fallback_payload: dict = {
|
||||
"chat_id": chat_id or self.alert_chat_id,
|
||||
"chat_id": actual_chat_id,
|
||||
"text": _plain_text_from_html(chunk),
|
||||
}
|
||||
fallback_source_extra = _callback_reply_source_envelope_extra(
|
||||
incident_id=incident_id,
|
||||
failure_context=failure_context,
|
||||
status="callback_reply_fallback_sent",
|
||||
chunk_index=index,
|
||||
chunk_count=len(chunks),
|
||||
callback_action=callback_action,
|
||||
parse_mode="plain_text",
|
||||
error=str(exc),
|
||||
)
|
||||
if fallback_source_extra:
|
||||
fallback_payload[_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY] = fallback_source_extra
|
||||
if index == 0 and reply_markup:
|
||||
fallback_payload["reply_markup"] = reply_markup
|
||||
try:
|
||||
@@ -6006,10 +6192,22 @@ class TelegramGateway:
|
||||
error=str(fallback_exc),
|
||||
)
|
||||
rescue_payload: dict = {
|
||||
"chat_id": chat_id or self.alert_chat_id,
|
||||
"chat_id": actual_chat_id,
|
||||
"text": _plain_text_from_html(chunk, limit=3500),
|
||||
"_skip_incident_thread_reply": True,
|
||||
}
|
||||
rescue_source_extra = _callback_reply_source_envelope_extra(
|
||||
incident_id=incident_id,
|
||||
failure_context=failure_context,
|
||||
status="callback_reply_rescue_sent",
|
||||
chunk_index=index,
|
||||
chunk_count=len(chunks),
|
||||
callback_action=callback_action,
|
||||
parse_mode="plain_text",
|
||||
error=str(fallback_exc),
|
||||
)
|
||||
if rescue_source_extra:
|
||||
rescue_payload[_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY] = rescue_source_extra
|
||||
try:
|
||||
await self._send_request(
|
||||
"sendMessage",
|
||||
@@ -6023,6 +6221,15 @@ class TelegramGateway:
|
||||
chunk_count=len(chunks),
|
||||
error=str(rescue_exc),
|
||||
)
|
||||
await self._record_callback_reply_failure(
|
||||
chat_id=actual_chat_id,
|
||||
incident_id=incident_id,
|
||||
failure_context=failure_context,
|
||||
callback_action=callback_action,
|
||||
chunk_index=index,
|
||||
chunk_count=len(chunks),
|
||||
error=rescue_exc,
|
||||
)
|
||||
|
||||
async def send_alert_notification(
|
||||
self,
|
||||
|
||||
@@ -121,6 +121,65 @@ async def test_build_inline_keyboard_includes_awooop_deep_link() -> None:
|
||||
}]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_request_strips_awooop_callback_metadata_before_telegram_api(monkeypatch):
|
||||
"""AwoooP truth-chain metadata must be mirrored, not sent to Telegram Bot API."""
|
||||
captured = {}
|
||||
gateway = TelegramGateway()
|
||||
gateway._initialized = True
|
||||
|
||||
class FakeResponse:
|
||||
def raise_for_status(self):
|
||||
return None
|
||||
|
||||
def json(self):
|
||||
return {"ok": True, "result": {"message_id": 456}}
|
||||
|
||||
class FakeClient:
|
||||
async def post(self, url, json):
|
||||
captured["payload"] = dict(json)
|
||||
return FakeResponse()
|
||||
|
||||
async def fake_mirror_outbound_message(
|
||||
*,
|
||||
method,
|
||||
payload,
|
||||
provider_message_id,
|
||||
source_envelope_extra=None,
|
||||
):
|
||||
captured["mirror"] = {
|
||||
"method": method,
|
||||
"payload": dict(payload),
|
||||
"provider_message_id": provider_message_id,
|
||||
"source_envelope_extra": source_envelope_extra,
|
||||
}
|
||||
|
||||
gateway._http_client = FakeClient()
|
||||
monkeypatch.setattr(gateway, "_mirror_outbound_message", fake_mirror_outbound_message)
|
||||
|
||||
result = await gateway._send_request(
|
||||
"sendMessage",
|
||||
{
|
||||
"chat_id": "chat",
|
||||
"text": "事件歷史統計 INC-20260513-79ED5E",
|
||||
"_skip_incident_thread_reply": True,
|
||||
"_awooop_source_envelope_extra": {
|
||||
"callback_reply": {
|
||||
"status": "callback_reply_sent",
|
||||
"incident_id": "INC-20260513-79ED5E",
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
assert result["ok"] is True
|
||||
assert "_awooop_source_envelope_extra" not in captured["payload"]
|
||||
assert "_skip_incident_thread_reply" not in captured["payload"]
|
||||
assert captured["mirror"]["source_envelope_extra"]["callback_reply"]["status"] == (
|
||||
"callback_reply_sent"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_html_line_message_falls_back_to_plain_text_on_parse_error(monkeypatch):
|
||||
"""Telegram HTML parse 400 時要送純文字 fallback,不可回報成歷史查詢失敗。"""
|
||||
@@ -152,6 +211,41 @@ async def test_send_html_line_message_falls_back_to_plain_text_on_parse_error(mo
|
||||
assert "blocked" in sent_requests[1][1]["text"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_html_line_message_marks_callback_reply_evidence(monkeypatch):
|
||||
"""詳情/歷史 callback reply 要在 AwoooP envelope 標明 sent/fallback 狀態。"""
|
||||
sent_requests = []
|
||||
gateway = TelegramGateway()
|
||||
reply_markup = telegram_gateway_module._awooop_runs_reply_markup("INC-20260514-F85F21")
|
||||
|
||||
async def fake_send_request(method, payload):
|
||||
sent_requests.append((method, payload))
|
||||
if payload.get("parse_mode") == "HTML":
|
||||
raise telegram_gateway_module.TelegramGatewayError("HTTP error: 400")
|
||||
return {"ok": True}
|
||||
|
||||
monkeypatch.setattr(gateway, "_send_request", fake_send_request)
|
||||
|
||||
await gateway._send_html_line_message(
|
||||
["📊 <b>事件歷史統計</b>", "🔖 <code>INC-20260514-F85F21</code>"],
|
||||
chat_id="chat",
|
||||
failure_context="incident_history",
|
||||
reply_markup=reply_markup,
|
||||
incident_id="INC-20260514-F85F21",
|
||||
callback_action="history",
|
||||
)
|
||||
|
||||
first_extra = sent_requests[0][1]["_awooop_source_envelope_extra"]["callback_reply"]
|
||||
fallback_extra = sent_requests[1][1]["_awooop_source_envelope_extra"]["callback_reply"]
|
||||
|
||||
assert first_extra["status"] == "callback_reply_sent"
|
||||
assert first_extra["action"] == "history"
|
||||
assert first_extra["parse_mode"] == "HTML"
|
||||
assert fallback_extra["status"] == "callback_reply_fallback_sent"
|
||||
assert fallback_extra["incident_id"] == "INC-20260514-F85F21"
|
||||
assert fallback_extra["parse_mode"] == "plain_text"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_html_line_message_uses_rescue_when_markup_fallback_fails(monkeypatch):
|
||||
"""詳情/歷史 fallback 若仍被 Telegram 拒收,要再送無按鈕純文字救援。"""
|
||||
|
||||
Reference in New Issue
Block a user