#!/usr/bin/env python3 """驗證 Telegram 告警最後出口維持可讀、脫敏與 AI 事件卡格式。 本 guard 只讀 repo 內 source、test 與 snapshot。它不送 Telegram、不呼叫 Bot API、不讀 secret、不連線主機,也不啟動任何 runtime gate。 """ from __future__ import annotations import argparse import json import subprocess from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any TAIPEI = timezone(timedelta(hours=8)) SNAPSHOT_PATH = Path("docs/security/telegram-alert-readability-guard.snapshot.json") SOURCE_PATH = Path("apps/api/src/services/telegram_gateway.py") TEST_PATH = Path("apps/api/tests/test_telegram_message_templates.py") SOURCE_MARKERS = [ "def format_host_resource_alert_card", "def format_aiops_signal_alert_card", "def normalize_alert_notification_payload", "def normalize_telegram_send_message_payload", "_HOST_PROCESS_LINE_RE", "_SECRET_ASSIGNMENT_RE", "_BEARER_RE", "_PRIVATE_IP_RE", "_ABSOLUTE_PATH_RE", "_URL_RE", "payload = normalize_telegram_send_message_payload(method, payload)", ] FINAL_EXIT_CONTRACTS = [ { "exit_id": "telegram_gateway:_send_request", "function_marker": "async def _send_request", "required_marker": "normalize_telegram_send_message_payload", }, { "exit_id": "telegram_gateway:send_alert_notification", "function_marker": "async def send_alert_notification", "required_marker": "normalize_alert_notification_payload", }, { "exit_id": "telegram_gateway:send_text", "function_marker": "async def send_text", "required_marker": "normalize_alert_notification_payload", }, ] TEST_CONTRACTS = [ "test_ci_runner_load_alert_becomes_capacity_event_packet", "test_wazuh_alert_becomes_aiops_signal_event_packet", "test_wazuh_dashboard_api_degraded_alert_becomes_readback_gap_event_packet", "test_nginx_drift_alert_becomes_public_gateway_event_packet", "test_aiops_signal_formatter_covers_non_host_alert_lanes", "test_send_alert_notification_normalizes_host_resource_raw_dump", "test_send_alert_notification_normalizes_aiops_signal_alert", "test_prisma_generate_alert_redacts_raw_process_json_and_urls", "test_send_alert_notification_forces_html_card_for_markdown_host_alert", "test_send_text_normalizes_host_resource_alert", "test_send_request_payload_normalizer_blocks_direct_host_raw_dump", ] AI_SIGNAL_LANES = [ "wazuh_dashboard_api_readback_degraded", "wazuh_intrusion_signal", "kali_assessment_signal", "nginx_config_drift", "backup_restore_escrow_signal", "provider_freshness_signal", "supply_chain_drift", ] HOST_RESOURCE_LANES = [ "orphan_browser_smoke_runaway_process", "ci_runner_load_saturation", "runner_prisma_generate_resource_pressure", "runner_build_resource_pressure", "node_process_resource_pressure", "host_resource_pressure_triage", ] PRIVATE_IP_SOURCE_MARKER = ".".join(["192", "168", "0", "110"]) CHECKPOINT_SOURCE_MARKER = ".".join(["checkpoint", "prisma", "io"]) TOOLCACHE_SOURCE_MARKER = "/" + "/".join(["opt", "hostedtoolcache"]) WORKSPACE_SOURCE_MARKER = "/" + "/".join(["workspace", "wooo"]) SECRET_SOURCE_MARKER = "super" + "-secret" TOKEN_LIKE_SOURCE_MARKER = "abcd" + "efghijkl" PRISMA_JSON_SOURCE_MARKER = '"' + "product" + '":"' + "prisma" + '"' BLOCKED_RAW_OUTPUT_SOURCE_MARKERS = [ "root 311", "root 365", CHECKPOINT_SOURCE_MARKER, "node_modules", TOOLCACHE_SOURCE_MARKER, WORKSPACE_SOURCE_MARKER, "/var/ossec", "/etc/nginx", SECRET_SOURCE_MARKER, TOKEN_LIKE_SOURCE_MARKER, PRIVATE_IP_SOURCE_MARKER, PRISMA_JSON_SOURCE_MARKER, ] BLOCKED_RAW_OUTPUT_MARKERS = [ "root-process-line", "root-prisma-process-line", "external-checkpoint-url", "package-tree-path", "hosted-toolcache-path", "workspace-path", "raw-wazuh-path", "raw-nginx-path", "secret-placeholder", "token-like-placeholder", "internal-host-ip", "raw-prisma-json-product-marker", ] REQUIRED_OUTPUT_MARKERS = [ "ai_automation_alert_card_v1", "AI 自動化判讀", "runtime_write_gate=0", "candidate_only", "Top evidence", "禁止事項", ] EXECUTION_BOUNDARIES = { "not_authorization": True, "telegram_send_authorized": False, "bot_api_call_authorized": False, "direct_bot_api_migration_authorized": False, "workflow_modification_authorized": False, "ops_script_modification_authorized": False, "api_sender_refactor_authorized": False, "secret_value_collection_allowed": False, "raw_payload_storage_allowed": False, "host_write_authorized": False, "production_write_authorized": False, "runtime_execution_authorized": False, "runtime_gate_open": False, "action_buttons_allowed": False, } def read_text(root: Path, relative_path: Path) -> str: path = root / relative_path if not path.exists(): raise SystemExit(f"BLOCKED missing file: {relative_path}") return path.read_text(encoding="utf-8") def require_contains(label: str, text: str, marker: str) -> None: if marker not in text: raise SystemExit(f"BLOCKED {label}: missing {marker!r}") def function_segment(text: str, marker: str, *, limit: int = 2200) -> str: start = text.find(marker) if start == -1: raise SystemExit(f"BLOCKED source function marker missing: {marker!r}") return text[start : start + limit] def git_commit(root: Path) -> str: try: return subprocess.check_output( ["git", "rev-parse", "--short", "HEAD"], cwd=root, text=True, stderr=subprocess.DEVNULL, ).strip() except Exception: return "unknown" def validate_source_contract(root: Path) -> None: source = read_text(root, SOURCE_PATH) tests = read_text(root, TEST_PATH) for marker in SOURCE_MARKERS: require_contains("telegram_gateway.source", source, marker) for contract in FINAL_EXIT_CONTRACTS: segment = function_segment(source, contract["function_marker"]) require_contains( f"{contract['exit_id']}.normalizer", segment, contract["required_marker"], ) for marker in TEST_CONTRACTS: require_contains("telegram_message_templates.tests", tests, marker) for marker in AI_SIGNAL_LANES: require_contains("telegram_gateway.ai_signal_lanes", source, marker) for marker in HOST_RESOURCE_LANES: require_contains("telegram_gateway.host_resource_lanes", source, marker) for marker in BLOCKED_RAW_OUTPUT_SOURCE_MARKERS: require_contains("telegram_message_templates.blocked_raw_outputs", tests, marker) for marker in REQUIRED_OUTPUT_MARKERS: require_contains("telegram_message_templates.required_output_markers", tests, marker) def build_snapshot(root: Path) -> dict[str, Any]: validate_source_contract(root) return { "schema_version": "telegram_alert_readability_guard_v1", "status": "telegram_alert_readability_guard_ready_no_runtime_action", "generated_at": datetime.now(TAIPEI).replace(microsecond=0).isoformat(), "git_commit": git_commit(root), "source_refs": [str(SOURCE_PATH), str(TEST_PATH)], "source_markers": SOURCE_MARKERS, "final_exit_contracts": FINAL_EXIT_CONTRACTS, "test_contracts": TEST_CONTRACTS, "ai_signal_lanes": AI_SIGNAL_LANES, "host_resource_lanes": HOST_RESOURCE_LANES, "blocked_raw_output_markers": BLOCKED_RAW_OUTPUT_MARKERS, "required_output_markers": REQUIRED_OUTPUT_MARKERS, "execution_boundaries": EXECUTION_BOUNDARIES, "summary": { "source_formatter_marker_count": len(SOURCE_MARKERS), "final_exit_contract_count": len(FINAL_EXIT_CONTRACTS), "test_contract_count": len(TEST_CONTRACTS), "ai_signal_lane_count": len(AI_SIGNAL_LANES), "host_resource_lane_count": len(HOST_RESOURCE_LANES), "blocked_raw_output_marker_count": len(BLOCKED_RAW_OUTPUT_MARKERS), "required_output_marker_count": len(REQUIRED_OUTPUT_MARKERS), "telegram_send_authorized_count": 0, "bot_api_call_authorized_count": 0, "raw_payload_storage_allowed_count": 0, "secret_value_collection_allowed_count": 0, "production_write_authorized_count": 0, "runtime_gate_count": 0, "action_button_count": 0, }, } def validate_snapshot(snapshot: dict[str, Any]) -> None: expected_summary = { "source_formatter_marker_count": len(SOURCE_MARKERS), "final_exit_contract_count": len(FINAL_EXIT_CONTRACTS), "test_contract_count": len(TEST_CONTRACTS), "ai_signal_lane_count": len(AI_SIGNAL_LANES), "host_resource_lane_count": len(HOST_RESOURCE_LANES), "blocked_raw_output_marker_count": len(BLOCKED_RAW_OUTPUT_MARKERS), "required_output_marker_count": len(REQUIRED_OUTPUT_MARKERS), "telegram_send_authorized_count": 0, "bot_api_call_authorized_count": 0, "raw_payload_storage_allowed_count": 0, "secret_value_collection_allowed_count": 0, "production_write_authorized_count": 0, "runtime_gate_count": 0, "action_button_count": 0, } if snapshot.get("schema_version") != "telegram_alert_readability_guard_v1": raise SystemExit("BLOCKED telegram alert readability schema mismatch") if snapshot.get("status") != "telegram_alert_readability_guard_ready_no_runtime_action": raise SystemExit("BLOCKED telegram alert readability status mismatch") for key, expected in expected_summary.items(): actual = snapshot.get("summary", {}).get(key) if actual != expected: raise SystemExit(f"BLOCKED telegram alert readability summary.{key}: expected {expected!r}, got {actual!r}") for key, expected in EXECUTION_BOUNDARIES.items(): actual = snapshot.get("execution_boundaries", {}).get(key) if actual is not expected: raise SystemExit(f"BLOCKED telegram alert readability boundary.{key}: expected {expected!r}, got {actual!r}") for list_key, expected in [ ("source_markers", SOURCE_MARKERS), ("test_contracts", TEST_CONTRACTS), ("ai_signal_lanes", AI_SIGNAL_LANES), ("host_resource_lanes", HOST_RESOURCE_LANES), ("blocked_raw_output_markers", BLOCKED_RAW_OUTPUT_MARKERS), ("required_output_markers", REQUIRED_OUTPUT_MARKERS), ]: actual = snapshot.get(list_key) if actual != expected: raise SystemExit(f"BLOCKED telegram alert readability {list_key} drift") def validate(root: Path) -> None: validate_source_contract(root) snapshot_path = root / SNAPSHOT_PATH if not snapshot_path.exists(): raise SystemExit(f"BLOCKED missing snapshot: {SNAPSHOT_PATH}") validate_snapshot(json.loads(snapshot_path.read_text(encoding="utf-8"))) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--root", default=Path(__file__).resolve().parents[2], type=Path, help="Repository root. Defaults to this script's repository.", ) parser.add_argument( "--write", action="store_true", help="Write the committed snapshot JSON.", ) args = parser.parse_args() root = args.root.resolve() if args.write: snapshot = build_snapshot(root) target = root / SNAPSHOT_PATH target.write_text( json.dumps(snapshot, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8", ) validate(root) print( "TELEGRAM_ALERT_READABILITY_GUARD_OK " f"tests={len(TEST_CONTRACTS)} ai_lanes={len(AI_SIGNAL_LANES)} " f"host_lanes={len(HOST_RESOURCE_LANES)} runtime_gate=0" ) if __name__ == "__main__": main()