#!/usr/bin/env python3
"""Build a repo-only Telegram notification egress inventory.

This scanner identifies Telegram Bot API sendMessage paths that can bypass
TelegramGateway's final-exit formatter. It does not read secrets, call Telegram,
modify workflows, or send notifications.
"""

from __future__ import annotations

import argparse
import hashlib
import json
import re
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

# Fixed UTC+8 offset used for the default ``generated_at`` timestamp.
TAIPEI = timezone(timedelta(hours=8))

# Repository-relative directories that are walked recursively.
SCAN_ROOTS = (
    Path(".gitea/workflows"),
    Path("scripts/ops"),
    Path("scripts/ci"),
    Path("apps/api/src"),
)
# Only files with one of these (case-sensitive) suffixes are scanned.
SCAN_SUFFIXES = {".py", ".sh", ".js", ".yml", ".yaml"}

# A line is a direct Bot API surface when the Bot API host and the method name
# appear on the same line, in either order.
DIRECT_BOT_API_RE = re.compile(
    r"api\.telegram\.org/bot.*sendMessage|sendMessage.*api\.telegram\.org/bot"
)
# Call sites that go through the gateway helpers instead of the raw Bot API.
GATEWAY_CALLSITE_RE = re.compile(
    r"(?:send_alert_notification\(|\b(?:tg|gw|gateway|telegram)\.send_text\(|_send_request\(\s*[\"']sendMessage[\"'])"
)
SECRET_INTERPOLATION_RE = re.compile(r"\$\{\{\s*secrets\.[^}]+\}\}")
BOT_TOKEN_URL_RE = re.compile(r"api\.telegram\.org/bot.*?/sendMessage")
# Token-shaped literal: a run of digits, a colon, then a long URL-safe string.
# NOTE(review): shape taken from the public Bot API token format; the length
# thresholds are deliberately loose and should be confirmed against real tokens.
LITERAL_BOT_TOKEN_RE = re.compile(r"(?<!\d)\d{5,}:[A-Za-z0-9_-]{30,}")

# Repository-relative location of the gateway module checked for the formatter.
GATEWAY_MODULE_PATH = "apps/api/src/services/telegram_gateway.py"
# Symbol whose presence in the gateway module counts as "formatter present".
GATEWAY_FORMATTER_SYMBOL = "normalize_telegram_send_message_payload"

REQUIRED_OWNER_FIELDS = [
    "egress_surface_id",
    "owner_role_or_team",
    "routing_purpose",
    "current_sender",
    "target_chat_route",
    "message_shape_contract",
    "redaction_contract",
    "formatter_convergence_plan",
    "delivery_receipt_ref",
    "dedup_or_fingerprint_plan",
    "fallback_or_degraded_mode",
    "migration_or_exception_reason",
    "maintenance_window",
    "rollback_owner",
    "postcheck_evidence_ref",
    "no_secret_value_attestation",
    "no_raw_payload_attestation",
    "no_false_green_attestation",
]

REVIEWER_CHECKS = [
    "direct_bot_api_surface_identified",
    "owner_role_present",
    "target_route_is_sre_owned",
    "message_shape_is_ai_automation_card_or_documented_exception",
    "redaction_contract_present",
    "formatter_convergence_path_present",
    "delivery_receipt_metadata_only",
    "dedup_or_fingerprint_present",
    "fallback_mode_does_not_leak_raw_payload",
    "secret_name_only_no_value",
    "workflow_or_script_change_requires_separate_approval",
    "telegram_send_not_executed_by_inventory",
    "no_false_green_claim",
    "runtime_gate_stays_zero",
]

OUTCOME_LANES = [
    "waiting_owner_response",
    "request_owner_route_supplement",
    "request_formatter_convergence_plan",
    "request_redaction_contract",
    "request_delivery_receipt_metadata",
    "quarantine_secret_or_raw_payload",
    "reject_false_green_claim",
    "ready_for_notification_egress_review",
    "waiting_runtime_gate",
]

BLOCKED_ACTIONS = [
    "telegram_send",
    "bot_api_call",
    "workflow_modification",
    "script_modification_without_owner",
    "secret_value_collection",
    "secret_hash_collection",
    "partial_token_collection",
    "chat_id_collection_without_owner",
    "store_raw_message_payload",
    "store_unredacted_workflow_log",
    "change_chat_route",
    "change_bot_token",
    "rotate_secret",
    "workflow_dispatch",
    "production_deploy",
    "accept_route_200_as_delivery_receipt",
    "accept_cd_success_as_notification_acceptance",
    "accept_ui_visible_as_notification_acceptance",
    "skip_formatter_convergence",
    "skip_redaction_review",
    "open_runtime_gate",
    "add_action_button",
]


def git_short_sha(root: Path) -> str:
    """Return the abbreviated HEAD commit of *root*, or ``"unknown"``.

    The lookup is best-effort: a missing ``git`` binary, a missing directory,
    or a non-repository all yield ``"unknown"`` instead of raising.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError: git binary or cwd missing. SubprocessError: non-zero exit.
        return "unknown"
    return result.stdout.strip()


def iter_scannable_files(root: Path) -> list[Path]:
    """Return the sorted files under ``SCAN_ROOTS`` with a scannable suffix.

    Scan roots that do not exist under *root* are skipped.
    """
    files: list[Path] = []
    for scan_root in SCAN_ROOTS:
        absolute_root = root / scan_root
        if not absolute_root.exists():
            continue
        files.extend(
            path
            for path in absolute_root.rglob("*")
            if path.is_file() and path.suffix in SCAN_SUFFIXES
        )
    return sorted(files)


def sanitize_excerpt(line: str) -> str:
    """Return a redacted excerpt of *line*, at most 180 characters long.

    Three redactions run in order on the stripped line:

    1. workflow secret interpolations lose their secret name;
    2. the token segment of a Bot API URL that ends in the method name is
       dropped;
    3. any remaining token-shaped literal is removed, so a hard-coded token
       that the URL pattern does not cover (method named before the URL, or
       token assigned elsewhere on the line) does not reach the report.
    """
    excerpt = line.strip()
    excerpt = SECRET_INTERPOLATION_RE.sub("${{ secrets. }}", excerpt)
    excerpt = BOT_TOKEN_URL_RE.sub("api.telegram.org/bot/sendMessage", excerpt)
    excerpt = LITERAL_BOT_TOKEN_RE.sub("", excerpt)
    # Truncate only after redaction so a cut can never expose part of a token.
    return excerpt[:180]


def surface_kind(relative_path: str) -> str:
    """Classify a repository-relative POSIX path by the scan root it is under."""
    prefix_to_kind = (
        (".gitea/workflows/", "gitea_workflow_direct_bot_api"),
        ("scripts/ops/", "ops_script_direct_bot_api"),
        ("scripts/ci/", "ci_script_direct_bot_api"),
        ("apps/api/src/", "api_direct_bot_api"),
    )
    for prefix, kind in prefix_to_kind:
        if relative_path.startswith(prefix):
            return kind
    return "other_direct_bot_api"


def line_hash(relative_path: str, line_number: int, line: str) -> str:
    """Return a 16-hex-character SHA-256 fingerprint of a source location.

    The digest covers the path, the line number and the stripped line text.
    """
    payload = f"{relative_path}:{line_number}:{line.strip()}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]


def _direct_call_entry(relative_path: str, line_number: int, line: str) -> dict[str, Any]:
    """Build the metadata-only inventory entry for one direct Bot API line."""
    kind = surface_kind(relative_path)
    return {
        "egress_surface_id": f"telegram_egress:{kind}:{relative_path}:{line_number}",
        "surface_kind": kind,
        "path": relative_path,
        "line": line_number,
        "line_hash": line_hash(relative_path, line_number, line),
        "sanitized_excerpt": sanitize_excerpt(line),
        # Copies, so mutating one entry cannot alter the module constants or
        # the other entries.
        "required_owner_fields": list(REQUIRED_OWNER_FIELDS),
        "reviewer_checks": list(REVIEWER_CHECKS),
        "outcome_lanes": list(OUTCOME_LANES),
        "blocked_actions": list(BLOCKED_ACTIONS),
        "owner_response_received": False,
        "owner_response_accepted": False,
        "formatter_convergence_accepted": False,
        "redaction_contract_accepted": False,
        "delivery_receipt_accepted": False,
        "direct_bot_api_migration_authorized": False,
        "telegram_send_authorized": False,
        "bot_api_call_authorized": False,
        "workflow_modification_authorized": False,
        "script_modification_authorized": False,
        "secret_value_collection_allowed": False,
        "raw_payload_storage_allowed": False,
        "production_write_authorized": False,
        "runtime_gate": False,
        "action_buttons_allowed": False,
        "not_authorization": True,
    }


def _gateway_formatter_present(root: Path) -> bool:
    """Return True when the gateway module under *root* names the formatter.

    A missing or unreadable gateway module counts as "not present", so callers
    can report the condition instead of crashing on the file read.
    """
    try:
        gateway_text = (root / GATEWAY_MODULE_PATH).read_text(
            encoding="utf-8", errors="replace"
        )
    except OSError:
        return False
    return GATEWAY_FORMATTER_SYMBOL in gateway_text


def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
    """Scan *root* and return the egress inventory as a JSON-serialisable dict.

    Args:
        root: Repository root that ``SCAN_ROOTS`` are resolved against.
        generated_at: Timestamp string to embed verbatim; when omitted or
            empty, the current time at UTC+8 with second precision is used.

    Returns:
        The report dictionary: summary counters, the fixed all-False execution
        boundaries, one entry per line matching ``DIRECT_BOT_API_RE`` and one
        reference per line matching ``GATEWAY_CALLSITE_RE``.

    Raises:
        OSError: If a scanned file cannot be read. This is deliberately not
            swallowed: skipping a file would under-count egress surfaces.
    """
    generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
    files = iter_scannable_files(root)
    direct_calls: list[dict[str, Any]] = []
    gateway_calls: list[dict[str, Any]] = []

    for path in files:
        relative_path = path.relative_to(root).as_posix()
        text = path.read_text(encoding="utf-8", errors="replace")
        for line_number, line in enumerate(text.splitlines(), start=1):
            if DIRECT_BOT_API_RE.search(line):
                direct_calls.append(_direct_call_entry(relative_path, line_number, line))
            if GATEWAY_CALLSITE_RE.search(line):
                gateway_calls.append(
                    {
                        "path": relative_path,
                        "line": line_number,
                        "line_hash": line_hash(relative_path, line_number, line),
                    }
                )

    direct_files = sorted({item["path"] for item in direct_calls})
    kind_counts: dict[str, int] = {}
    for item in direct_calls:
        kind_counts[item["surface_kind"]] = kind_counts.get(item["surface_kind"], 0) + 1

    gateway_formatter_present = _gateway_formatter_present(root)

    return {
        "schema_version": "telegram_notification_egress_inventory_v1",
        "generated_at": generated,
        "git_commit": git_short_sha(root),
        "status": "inventory_ready_no_runtime_action",
        "mode": "repo_only_scan_no_secret_value_no_telegram_send",
        "scan_roots": [path.as_posix() for path in SCAN_ROOTS],
        "summary": {
            "scanned_file_count": len(files),
            "direct_bot_api_file_count": len(direct_files),
            "direct_bot_api_call_count": len(direct_calls),
            "workflow_direct_bot_api_call_count": kind_counts.get("gitea_workflow_direct_bot_api", 0),
            "ops_script_direct_bot_api_call_count": kind_counts.get("ops_script_direct_bot_api", 0),
            "ci_script_direct_bot_api_call_count": kind_counts.get("ci_script_direct_bot_api", 0),
            "api_direct_bot_api_call_count": kind_counts.get("api_direct_bot_api", 0),
            "gateway_normalized_callsite_count": len(gateway_calls),
            "gateway_final_exit_formatter_present_count": 1 if gateway_formatter_present else 0,
            "required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
            "reviewer_check_count": len(REVIEWER_CHECKS),
            "outcome_lane_count": len(OUTCOME_LANES),
            "blocked_action_count": len(BLOCKED_ACTIONS),
            "owner_response_received_count": 0,
            "owner_response_accepted_count": 0,
            "formatter_convergence_accepted_count": 0,
            "redaction_contract_accepted_count": 0,
            "delivery_receipt_accepted_count": 0,
            "direct_bot_api_migration_authorized_count": 0,
            "telegram_send_authorized_count": 0,
            "bot_api_call_authorized_count": 0,
            "workflow_modification_authorized_count": 0,
            "script_modification_authorized_count": 0,
            "secret_value_collection_allowed_count": 0,
            "raw_payload_storage_allowed_count": 0,
            "production_write_authorized_count": 0,
            "runtime_gate_count": 0,
            "action_button_count": 0,
        },
        "execution_boundaries": {
            "runtime_execution_authorized": False,
            "telegram_send_authorized": False,
            "bot_api_call_authorized": False,
            "workflow_modification_authorized": False,
            "script_modification_authorized": False,
            "secret_value_collection_allowed": False,
            "secret_hash_collection_allowed": False,
            "partial_token_collection_allowed": False,
            "raw_payload_storage_allowed": False,
            "chat_route_change_authorized": False,
            "bot_token_change_authorized": False,
            "workflow_dispatch_authorized": False,
            "production_deploy_authorized": False,
            "action_buttons_allowed": False,
            "not_authorization": True,
        },
        "direct_bot_api_calls": direct_calls,
        "gateway_normalized_callsite_refs": gateway_calls,
        "operator_interpretation": [
            "direct_bot_api_call_count 大於 0 代表仍有 workflow / ops / API 旁路可能繞過 TelegramGateway formatter。",
            "本清冊只建立 metadata-only egress surface,不送 Telegram、不修改 workflow / script、不讀 secret value。",
            "後續要收斂 direct Bot API 必須另走 owner response、formatter convergence、redaction contract、delivery receipt 與維護窗口。",
        ],
    }


def validate(root: Path) -> None:
    """Exit with a BLOCKED message unless the gateway formatter is present.

    Raises:
        SystemExit: If the report built for *root* does not record exactly one
            gateway final-exit formatter.
    """
    report = build_report(root)
    if report["summary"]["gateway_final_exit_formatter_present_count"] != 1:
        raise SystemExit("BLOCKED telegram egress inventory: gateway formatter not found")


def main() -> None:
    """Command-line entry point: build the report and emit it as JSON.

    The JSON goes to ``--output`` when given, otherwise to stdout. A one-line
    summary of the main counters is always written to stderr.
    """
    parser = argparse.ArgumentParser(description="Build Telegram notification egress inventory")
    parser.add_argument("--root", default=".", help="repository root")
    parser.add_argument("--output", help="write JSON snapshot")
    parser.add_argument("--generated-at", help="fixed generated_at timestamp")
    args = parser.parse_args()

    root = Path(args.root).resolve()
    report = build_report(root, args.generated_at)
    payload = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
    if args.output:
        Path(args.output).write_text(payload, encoding="utf-8")
    else:
        sys.stdout.write(payload)

    summary = report["summary"]
    print(
        "TELEGRAM_NOTIFICATION_EGRESS_INVENTORY_OK "
        f"direct_calls={summary['direct_bot_api_call_count']} "
        f"files={summary['direct_bot_api_file_count']} "
        f"workflow={summary['workflow_direct_bot_api_call_count']} "
        f"ops={summary['ops_script_direct_bot_api_call_count']} "
        f"api={summary['api_direct_bot_api_call_count']} "
        f"runtime_gate={summary['runtime_gate_count']}",
        file=sys.stderr,
    )


if __name__ == "__main__":
    main()