179 lines
6.6 KiB
Python
179 lines
6.6 KiB
Python
from __future__ import annotations
|
||
|
||
from src.services.telegram_gateway import (
|
||
format_aiops_signal_alert_card,
|
||
_outbound_source_envelope,
|
||
_sanitize_telegram_error,
|
||
)
|
||
|
||
|
||
def test_telegram_gateway_sanitizes_bot_token_url() -> None:
|
||
raw = "Client error for https://api.telegram.org/bot123456:SECRET/sendMessage"
|
||
|
||
sanitized = _sanitize_telegram_error(raw)
|
||
|
||
assert "SECRET" not in sanitized
|
||
assert "bot<redacted>" in sanitized
|
||
|
||
|
||
def test_outbound_source_envelope_keeps_replay_context_without_raw_payload() -> None:
|
||
payload = {
|
||
"chat_id": "-100123",
|
||
"text": (
|
||
"ACTION REQUIRED INC-20260513-9B082D "
|
||
"<code>7f858956</code> token "
|
||
"1234567890:abcdefghijklmnopqrstuvwxyzABCDEFGH"
|
||
),
|
||
"parse_mode": "HTML",
|
||
"reply_markup": {
|
||
"inline_keyboard": [
|
||
[
|
||
{"text": "批准", "callback_data": "approve:approval-id-secret"},
|
||
{"text": "詳情", "callback_data": "details:approval-id-secret"},
|
||
]
|
||
]
|
||
},
|
||
}
|
||
|
||
envelope = _outbound_source_envelope("sendMessage", payload)
|
||
|
||
assert envelope["adapter"] == "legacy_telegram_gateway"
|
||
assert envelope["method"] == "sendMessage"
|
||
assert envelope["payload_sha256"]
|
||
assert envelope["payload_keys"] == ["chat_id", "parse_mode", "reply_markup", "text"]
|
||
assert envelope["parse_mode"] == "HTML"
|
||
assert envelope["reply_markup"]["button_count"] == 2
|
||
assert envelope["reply_markup"]["buttons"][0]["callback_prefix"] == "approve"
|
||
assert envelope["reply_markup"]["buttons"][1]["callback_prefix"] == "details"
|
||
assert envelope["source_refs"]["incident_ids"] == ["INC-20260513-9B082D"]
|
||
assert envelope["source_refs"]["code_refs"] == ["7f858956"]
|
||
assert "approval-id-secret" not in str(envelope)
|
||
assert "1234567890:" not in str(envelope)
|
||
assert "ACTION REQUIRED" not in str(envelope)
|
||
|
||
|
||
def test_outbound_source_envelope_reads_incident_refs_from_buttons() -> None:
|
||
payload = {
|
||
"chat_id": "-100123",
|
||
"text": "ACTION REQUIRED without incident id in visible text",
|
||
"reply_markup": {
|
||
"inline_keyboard": [
|
||
[
|
||
{"text": "詳情", "callback_data": "detail:INC-20260525-ABC123"},
|
||
{"text": "歷史", "callback_data": "history:INC-20260525-ABC123"},
|
||
],
|
||
[
|
||
{"text": "重診", "callback_data": "reanalyze:INC-20260525-DEF456"},
|
||
],
|
||
],
|
||
},
|
||
}
|
||
|
||
envelope = _outbound_source_envelope("sendMessage", payload)
|
||
|
||
assert envelope["source_refs"]["incident_ids"] == [
|
||
"INC-20260525-ABC123",
|
||
"INC-20260525-DEF456",
|
||
]
|
||
assert envelope["reply_markup"]["buttons"][0]["callback_prefix"] == "detail"
|
||
assert "detail:INC-20260525-ABC123" not in str(envelope)
|
||
|
||
|
||
def test_outbound_source_envelope_reads_meta_event_ref_from_text() -> None:
|
||
payload = {
|
||
"chat_id": "-100123",
|
||
"text": (
|
||
"⚙️ META SYSTEM\n"
|
||
"📋 <code>META-20260525201300</code>\n"
|
||
"異常元件:AI 自健診異常"
|
||
),
|
||
"parse_mode": "HTML",
|
||
"reply_markup": {
|
||
"inline_keyboard": [
|
||
[{"text": "靜默", "callback_data": "silence:opaque-secret"}]
|
||
],
|
||
},
|
||
}
|
||
|
||
envelope = _outbound_source_envelope("sendMessage", payload)
|
||
|
||
assert envelope["source_refs"]["event_ids"] == ["META-20260525201300"]
|
||
assert envelope["source_refs"]["incident_ids"] == []
|
||
assert envelope["reply_markup"]["buttons"][0]["callback_prefix"] == "silence"
|
||
assert "opaque-secret" not in str(envelope)
|
||
|
||
|
||
def test_outbound_source_envelope_reads_ai_advisory_refs_without_raw_callback() -> None:
|
||
payload = {
|
||
"chat_id": "-100123",
|
||
"text": "Coverage 缺口分析",
|
||
"reply_markup": {
|
||
"inline_keyboard": [
|
||
[
|
||
{
|
||
"text": "已處理",
|
||
"callback_data": (
|
||
"ai_advisory_handled:coverage_gap:auto_rule_creation"
|
||
),
|
||
},
|
||
{
|
||
"text": "忽略",
|
||
"callback_data": (
|
||
"ai_advisory_snooze:coverage_gap:auto_rule_creation"
|
||
),
|
||
},
|
||
]
|
||
],
|
||
},
|
||
}
|
||
|
||
envelope = _outbound_source_envelope("sendMessage", payload)
|
||
|
||
assert envelope["source_refs"]["event_ids"] == [
|
||
"ai_advisory:coverage_gap:auto_rule_creation"
|
||
]
|
||
assert envelope["source_refs"]["advisory_ids"] == [
|
||
"coverage_gap:auto_rule_creation"
|
||
]
|
||
assert envelope["source_refs"]["alert_ids"] == ["coverage_gap"]
|
||
assert envelope["source_refs"]["fingerprints"] == [
|
||
"ai_advisory:coverage_gap:auto_rule_creation"
|
||
]
|
||
assert "ai_advisory_handled:coverage_gap:auto_rule_creation" not in str(envelope)
|
||
|
||
|
||
def test_outbound_source_envelope_marks_wazuh_ai_alert_card_for_awooop_readback() -> None:
|
||
raw_alert = """
|
||
wazuh_dashboard_api_readback_degraded dashboard agent list disappeared
|
||
POST /api/check-stored-api status=429 POST /api/check-api status=500
|
||
https://127.0.0.1:55000 is unreachable manager registry readback blocked
|
||
full_log=/var/ossec/logs/alerts/alerts.json Authorization: Bearer abcdefghijklmnopqrstuvwxyz
|
||
"""
|
||
card = format_aiops_signal_alert_card(raw_alert)
|
||
payload = {
|
||
"chat_id": "-100123",
|
||
"text": card,
|
||
"parse_mode": "HTML",
|
||
}
|
||
|
||
envelope = _outbound_source_envelope("sendMessage", payload)
|
||
|
||
card_metadata = envelope["ai_automation_alert_card"]
|
||
assert card_metadata["schema_version"] == "ai_automation_alert_card_mirror_v1"
|
||
assert card_metadata["card_schema"] == "ai_automation_alert_card_v1"
|
||
assert card_metadata["event_type"] == "wazuh_dashboard_api_readback_degraded"
|
||
assert card_metadata["lane"] == "siem_observability_readback_degraded"
|
||
assert card_metadata["target"] == "wazuh_dashboard_api"
|
||
assert card_metadata["gates"] == ["candidate_only", "runtime_write_gate=0"]
|
||
assert card_metadata["runtime_write_gate_count"] == 0
|
||
assert card_metadata["delivery_receipt_readback_required"] is True
|
||
assert envelope["source_refs"]["alert_ids"] == [
|
||
"wazuh_dashboard_api_readback_degraded"
|
||
]
|
||
assert envelope["source_refs"]["fingerprints"] == [
|
||
"ai_automation_alert_card:wazuh_dashboard_api_readback_degraded:siem_observability_readback_degraded"
|
||
]
|
||
assert "127.0.0.1:55000" not in str(envelope)
|
||
assert "/var/ossec" not in str(envelope)
|
||
assert "abcdefghijkl" not in str(envelope)
|