feat(telegram): record callback reply evidence
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m13s
CD Pipeline / build-and-deploy (push) Successful in 3m21s
CD Pipeline / post-deploy-checks (push) Successful in 1m19s

This commit is contained in:
Your Name
2026-05-18 14:40:47 +08:00
parent e9e6cda06e
commit c97230252a
2 changed files with 305 additions and 4 deletions

View File

@@ -73,6 +73,7 @@ _INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
_CODE_REF_RE = re.compile(r"<code>([0-9a-f]{7,12})</code>", re.IGNORECASE)
_TELEGRAM_HTML_CHUNK_LIMIT = 3600
_AWOOOP_WEB_BASE_URL = "https://awoooi.wooo.work"
_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY = "_awooop_source_envelope_extra"
def _top_gateway_bucket(
@@ -472,6 +473,73 @@ def _outbound_source_envelope(method: str, payload: dict) -> dict[str, object]:
},
}
def _callback_reply_source_envelope_extra(
*,
incident_id: str | None,
failure_context: str,
status: str,
chunk_index: int,
chunk_count: int,
callback_action: str | None = None,
parse_mode: str | None = None,
error: str | None = None,
) -> dict[str, object] | None:
"""Build AwoooP metadata for Telegram detail/history callback replies."""
if not incident_id:
return None
action = callback_action or failure_context.removeprefix("incident_")
callback_reply: dict[str, object] = {
"schema_version": "telegram_callback_reply_v1",
"status": status,
"incident_id": incident_id,
"action": action,
"failure_context": failure_context,
"chunk_index": chunk_index,
"chunk_count": chunk_count,
}
if parse_mode:
callback_reply["parse_mode"] = parse_mode
if error:
callback_reply["error"] = _sanitize_telegram_error(error)[:300]
return {
"callback_reply": callback_reply,
"source_refs": {
"incident_ids": [incident_id],
},
}
def _merge_outbound_source_envelope_extra(
envelope: dict[str, object],
extra: dict[str, object] | None,
) -> dict[str, object]:
"""Attach private AwoooP metadata after it has been stripped from Telegram payloads."""
if not isinstance(extra, dict):
return envelope
callback_reply = extra.get("callback_reply")
if isinstance(callback_reply, dict):
envelope["callback_reply"] = callback_reply
extra_refs = extra.get("source_refs")
if isinstance(extra_refs, dict):
source_refs = envelope.setdefault("source_refs", {})
if isinstance(source_refs, dict):
for key, values in extra_refs.items():
if not isinstance(values, list):
continue
existing = source_refs.get(key)
merged = list(existing) if isinstance(existing, list) else []
for value in values:
if value not in merged:
merged.append(value)
source_refs[key] = merged[:20]
return envelope
# 2026-04-27 Claude Sonnet 4.6: B3 — LLM 動態 Telegram 按鈕 Feature Flag
# true → 優先使用 ActionPlan.recommended_actions 動態生成按鈕
# false → 維持現有 callback_action_spec.yaml 路徑(預設,向下相容)
@@ -1990,6 +2058,7 @@ class TelegramGateway:
if not self._http_client:
raise TelegramGatewayError("HTTP client not initialized")
source_envelope_extra = payload.pop(_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY, None)
await self._attach_incident_thread_reply(method, payload)
url = f"{self.api_url}/{method}"
@@ -2023,6 +2092,7 @@ class TelegramGateway:
method=method,
payload=payload,
provider_message_id=str(result_val["message_id"]),
source_envelope_extra=source_envelope_extra,
)
span.set_status(trace.Status(trace.StatusCode.OK))
@@ -2115,6 +2185,7 @@ class TelegramGateway:
method: str,
payload: dict,
provider_message_id: str,
source_envelope_extra: dict[str, object] | None = None,
) -> None:
"""將 legacy Telegram 出站訊息鏡像到 AwoooP不改變實際發送行為。"""
if method != "sendMessage":
@@ -2141,7 +2212,10 @@ class TelegramGateway:
channel_chat_id=chat_id,
message_type=_infer_outbound_message_type(text, payload),
content=text,
source_envelope=_outbound_source_envelope(method, payload),
source_envelope=_merge_outbound_source_envelope_extra(
_outbound_source_envelope(method, payload),
source_envelope_extra,
),
provider_message_id=provider_message_id,
send_status="sent",
triggered_by_state="legacy_gateway",
@@ -2156,6 +2230,88 @@ class TelegramGateway:
error=str(exc),
)
async def _record_callback_reply_failure(
self,
*,
chat_id: str,
incident_id: str | None,
failure_context: str,
callback_action: str | None,
chunk_index: int,
chunk_count: int,
error: Exception,
) -> None:
"""Persist callback reply delivery failures into AwoooP outbound evidence."""
if not incident_id:
return
safe_error = _sanitize_telegram_error(str(error))[:300]
action = callback_action or failure_context.removeprefix("incident_")
content = (
"Telegram callback reply failed\n"
f"Incident: {incident_id}\n"
f"Action: {action}\n"
f"Context: {failure_context}\n"
f"Error: {safe_error}"
)
provider_message_id = (
f"telegram_callback_reply:{incident_id}:{failure_context}:{chunk_index}:failed"
)
run_id = uuid5(
NAMESPACE_URL,
(
"awoooi:telegram-callback-reply:"
f"{chat_id}:{incident_id}:{failure_context}:{chunk_index}"
),
)
source_envelope = _merge_outbound_source_envelope_extra(
_outbound_source_envelope(
"sendMessage",
{
"chat_id": chat_id,
"text": content,
},
),
_callback_reply_source_envelope_extra(
incident_id=incident_id,
failure_context=failure_context,
status="callback_reply_failed",
chunk_index=chunk_index,
chunk_count=chunk_count,
callback_action=callback_action,
error=safe_error,
),
)
try:
from src.core.context import get_current_project_id
from src.db.base import get_db_context
from src.services.channel_hub import record_outbound_message
project_id = get_current_project_id() or "awoooi"
async with get_db_context(project_id) as db:
await record_outbound_message(
db,
project_id=project_id,
run_id=run_id,
channel_type="telegram",
channel_chat_id=chat_id,
message_type="error",
content=content,
source_envelope=source_envelope,
provider_message_id=provider_message_id,
send_status="failed",
triggered_by_state="telegram_callback_reply",
is_shadow=False,
)
except Exception as exc:
logger.warning(
"telegram_callback_reply_failure_record_failed",
incident_id=incident_id,
failure_context=failure_context,
error=str(exc),
)
async def _build_inline_keyboard(
self,
approval_id: str,
@@ -5587,6 +5743,8 @@ class TelegramGateway:
lines,
failure_context="incident_detail",
reply_markup=_awooop_runs_reply_markup(incident_id),
incident_id=incident_id,
callback_action="detail",
)
except Exception as e:
@@ -5726,6 +5884,8 @@ class TelegramGateway:
lines,
failure_context="incident_history",
reply_markup=_awooop_runs_reply_markup(incident_id),
incident_id=incident_id,
callback_action="history",
)
except Exception as e:
@@ -5962,16 +6122,30 @@ class TelegramGateway:
chat_id: str | int | None = None,
failure_context: str,
reply_markup: dict | None = None,
incident_id: str | None = None,
callback_action: str | None = None,
) -> None:
"""Send a multi-line HTML message without cutting Telegram tags in half."""
chunks = _telegram_html_chunks(lines)
actual_chat_id = str(chat_id or self.alert_chat_id)
for index, chunk in enumerate(chunks):
try:
payload: dict = {
"chat_id": chat_id or self.alert_chat_id,
"chat_id": actual_chat_id,
"text": chunk,
"parse_mode": "HTML",
}
source_extra = _callback_reply_source_envelope_extra(
incident_id=incident_id,
failure_context=failure_context,
status="callback_reply_sent",
chunk_index=index,
chunk_count=len(chunks),
callback_action=callback_action,
parse_mode="HTML",
)
if source_extra:
payload[_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY] = source_extra
if index == 0 and reply_markup:
payload["reply_markup"] = reply_markup
await self._send_request(
@@ -5987,9 +6161,21 @@ class TelegramGateway:
error=str(exc),
)
fallback_payload: dict = {
"chat_id": chat_id or self.alert_chat_id,
"chat_id": actual_chat_id,
"text": _plain_text_from_html(chunk),
}
fallback_source_extra = _callback_reply_source_envelope_extra(
incident_id=incident_id,
failure_context=failure_context,
status="callback_reply_fallback_sent",
chunk_index=index,
chunk_count=len(chunks),
callback_action=callback_action,
parse_mode="plain_text",
error=str(exc),
)
if fallback_source_extra:
fallback_payload[_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY] = fallback_source_extra
if index == 0 and reply_markup:
fallback_payload["reply_markup"] = reply_markup
try:
@@ -6006,10 +6192,22 @@ class TelegramGateway:
error=str(fallback_exc),
)
rescue_payload: dict = {
"chat_id": chat_id or self.alert_chat_id,
"chat_id": actual_chat_id,
"text": _plain_text_from_html(chunk, limit=3500),
"_skip_incident_thread_reply": True,
}
rescue_source_extra = _callback_reply_source_envelope_extra(
incident_id=incident_id,
failure_context=failure_context,
status="callback_reply_rescue_sent",
chunk_index=index,
chunk_count=len(chunks),
callback_action=callback_action,
parse_mode="plain_text",
error=str(fallback_exc),
)
if rescue_source_extra:
rescue_payload[_AWOOOP_SOURCE_ENVELOPE_EXTRA_KEY] = rescue_source_extra
try:
await self._send_request(
"sendMessage",
@@ -6023,6 +6221,15 @@ class TelegramGateway:
chunk_count=len(chunks),
error=str(rescue_exc),
)
await self._record_callback_reply_failure(
chat_id=actual_chat_id,
incident_id=incident_id,
failure_context=failure_context,
callback_action=callback_action,
chunk_index=index,
chunk_count=len(chunks),
error=rescue_exc,
)
async def send_alert_notification(
self,

View File

@@ -121,6 +121,65 @@ async def test_build_inline_keyboard_includes_awooop_deep_link() -> None:
}]
@pytest.mark.asyncio
async def test_send_request_strips_awooop_callback_metadata_before_telegram_api(monkeypatch):
"""AwoooP truth-chain metadata must be mirrored, not sent to Telegram Bot API."""
captured = {}
gateway = TelegramGateway()
gateway._initialized = True
class FakeResponse:
def raise_for_status(self):
return None
def json(self):
return {"ok": True, "result": {"message_id": 456}}
class FakeClient:
async def post(self, url, json):
captured["payload"] = dict(json)
return FakeResponse()
async def fake_mirror_outbound_message(
*,
method,
payload,
provider_message_id,
source_envelope_extra=None,
):
captured["mirror"] = {
"method": method,
"payload": dict(payload),
"provider_message_id": provider_message_id,
"source_envelope_extra": source_envelope_extra,
}
gateway._http_client = FakeClient()
monkeypatch.setattr(gateway, "_mirror_outbound_message", fake_mirror_outbound_message)
result = await gateway._send_request(
"sendMessage",
{
"chat_id": "chat",
"text": "事件歷史統計 INC-20260513-79ED5E",
"_skip_incident_thread_reply": True,
"_awooop_source_envelope_extra": {
"callback_reply": {
"status": "callback_reply_sent",
"incident_id": "INC-20260513-79ED5E",
},
},
},
)
assert result["ok"] is True
assert "_awooop_source_envelope_extra" not in captured["payload"]
assert "_skip_incident_thread_reply" not in captured["payload"]
assert captured["mirror"]["source_envelope_extra"]["callback_reply"]["status"] == (
"callback_reply_sent"
)
@pytest.mark.asyncio
async def test_send_html_line_message_falls_back_to_plain_text_on_parse_error(monkeypatch):
"""Telegram HTML parse 400 時要送純文字 fallback不可回報成歷史查詢失敗。"""
@@ -152,6 +211,41 @@ async def test_send_html_line_message_falls_back_to_plain_text_on_parse_error(mo
assert "blocked" in sent_requests[1][1]["text"]
@pytest.mark.asyncio
async def test_send_html_line_message_marks_callback_reply_evidence(monkeypatch):
"""詳情/歷史 callback reply 要在 AwoooP envelope 標明 sent/fallback 狀態。"""
sent_requests = []
gateway = TelegramGateway()
reply_markup = telegram_gateway_module._awooop_runs_reply_markup("INC-20260514-F85F21")
async def fake_send_request(method, payload):
sent_requests.append((method, payload))
if payload.get("parse_mode") == "HTML":
raise telegram_gateway_module.TelegramGatewayError("HTTP error: 400")
return {"ok": True}
monkeypatch.setattr(gateway, "_send_request", fake_send_request)
await gateway._send_html_line_message(
["📊 <b>事件歷史統計</b>", "🔖 <code>INC-20260514-F85F21</code>"],
chat_id="chat",
failure_context="incident_history",
reply_markup=reply_markup,
incident_id="INC-20260514-F85F21",
callback_action="history",
)
first_extra = sent_requests[0][1]["_awooop_source_envelope_extra"]["callback_reply"]
fallback_extra = sent_requests[1][1]["_awooop_source_envelope_extra"]["callback_reply"]
assert first_extra["status"] == "callback_reply_sent"
assert first_extra["action"] == "history"
assert first_extra["parse_mode"] == "HTML"
assert fallback_extra["status"] == "callback_reply_fallback_sent"
assert fallback_extra["incident_id"] == "INC-20260514-F85F21"
assert fallback_extra["parse_mode"] == "plain_text"
@pytest.mark.asyncio
async def test_send_html_line_message_uses_rescue_when_markup_fallback_fails(monkeypatch):
"""詳情/歷史 fallback 若仍被 Telegram 拒收,要再送無按鈕純文字救援。"""