Files
awoooi/scripts/security/telegram-notification-egress-inventory.py

327 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Build a repo-only Telegram notification egress inventory.
This scanner identifies Telegram Bot API sendMessage paths that can bypass
TelegramGateway's final-exit formatter. It does not read secrets, call
Telegram, modify workflows, or send notifications.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import re
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Fixed UTC+08:00 offset; build_report uses it for the default generated_at stamp.
TAIPEI = timezone(timedelta(hours=8))

# Repository-relative directories that iter_scannable_files walks recursively.
SCAN_ROOTS = (
    Path(".gitea/workflows"),
    Path("scripts/ops"),
    Path("scripts/ci"),
    Path("apps/api/src"),
)

# Only files whose suffix is in this set are scanned (comparison is case-sensitive).
SCAN_SUFFIXES = {".py", ".sh", ".js", ".yml", ".yaml"}

# A single line that mentions both the Bot API host/"bot" prefix and
# "sendMessage", in either order. Calls split across lines are not matched.
DIRECT_BOT_API_RE = re.compile(
    r"api\.telegram\.org/bot.*sendMessage|sendMessage.*api\.telegram\.org/bot"
)

# Call sites counted as gateway-routed: send_alert_notification(...),
# <tg|gw|gateway|telegram>.send_text(...), or _send_request("sendMessage", ...).
GATEWAY_CALLSITE_RE = re.compile(
    r"(?:send_alert_notification\(|\b(?:tg|gw|gateway|telegram)\.send_text\(|_send_request\(\s*[\"']sendMessage[\"'])"
)

# Workflow secret expressions of the form ${{ secrets.NAME }}.
SECRET_INTERPOLATION_RE = re.compile(r"\$\{\{\s*secrets\.[^}]+\}\}")

# From "api.telegram.org/bot" up to the nearest following "/sendMessage"
# (lazy match); sanitize_excerpt replaces the whole span with a redacted form.
BOT_TOKEN_URL_RE = re.compile(r"api\.telegram\.org/bot.*?/sendMessage")
# Field names attached to every direct Bot API entry under
# "required_owner_fields"; the list length is also reported in the summary.
REQUIRED_OWNER_FIELDS = [
    "egress_surface_id",
    "owner_role_or_team",
    "routing_purpose",
    "current_sender",
    "target_chat_route",
    "message_shape_contract",
    "redaction_contract",
    "formatter_convergence_plan",
    "delivery_receipt_ref",
    "dedup_or_fingerprint_plan",
    "fallback_or_degraded_mode",
    "migration_or_exception_reason",
    "maintenance_window",
    "rollback_owner",
    "postcheck_evidence_ref",
    "no_secret_value_attestation",
    "no_raw_payload_attestation",
    "no_false_green_attestation",
]

# Check identifiers attached to every direct Bot API entry under
# "reviewer_checks"; the list length is also reported in the summary.
REVIEWER_CHECKS = [
    "direct_bot_api_surface_identified",
    "owner_role_present",
    "target_route_is_sre_owned",
    "message_shape_is_ai_automation_card_or_documented_exception",
    "redaction_contract_present",
    "formatter_convergence_path_present",
    "delivery_receipt_metadata_only",
    "dedup_or_fingerprint_present",
    "fallback_mode_does_not_leak_raw_payload",
    "secret_name_only_no_value",
    "workflow_or_script_change_requires_separate_approval",
    "telegram_send_not_executed_by_inventory",
    "no_false_green_claim",
    "runtime_gate_stays_zero",
]

# Lane identifiers attached to every direct Bot API entry under
# "outcome_lanes"; the list length is also reported in the summary.
OUTCOME_LANES = [
    "waiting_owner_response",
    "request_owner_route_supplement",
    "request_formatter_convergence_plan",
    "request_redaction_contract",
    "request_delivery_receipt_metadata",
    "quarantine_secret_or_raw_payload",
    "reject_false_green_claim",
    "ready_for_notification_egress_review",
    "waiting_runtime_gate",
]

# Action identifiers attached to every direct Bot API entry under
# "blocked_actions"; the list length is also reported in the summary.
BLOCKED_ACTIONS = [
    "telegram_send",
    "bot_api_call",
    "workflow_modification",
    "script_modification_without_owner",
    "secret_value_collection",
    "secret_hash_collection",
    "partial_token_collection",
    "chat_id_collection_without_owner",
    "store_raw_message_payload",
    "store_unredacted_workflow_log",
    "change_chat_route",
    "change_bot_token",
    "rotate_secret",
    "workflow_dispatch",
    "production_deploy",
    "accept_route_200_as_delivery_receipt",
    "accept_cd_success_as_notification_acceptance",
    "accept_ui_visible_as_notification_acceptance",
    "skip_formatter_convergence",
    "skip_redaction_review",
    "open_runtime_gate",
    "add_action_button",
]
def git_short_sha(root: Path) -> str:
    """Return the abbreviated HEAD commit hash of the repository at *root*.

    Runs ``git rev-parse --short HEAD`` with *root* as the working
    directory. This is best-effort metadata: any failure (git missing,
    not a repository, non-zero exit, ...) yields the string ``"unknown"``.
    """
    command = ["git", "rev-parse", "--short", "HEAD"]
    try:
        completed = subprocess.run(
            command,
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except Exception:
        # Deliberately broad: the commit id is informational only.
        return "unknown"
    return completed.stdout.strip()
def iter_scannable_files(root: Path) -> list[Path]:
    """Collect every scannable file beneath the configured scan roots.

    Each entry of ``SCAN_ROOTS`` is resolved against *root*; roots that do
    not exist are skipped. Within an existing root, regular files whose
    suffix is in ``SCAN_SUFFIXES`` are gathered recursively.

    Returns:
        The matching paths in sorted order.
    """
    collected: list[Path] = []
    for relative_root in SCAN_ROOTS:
        base = root / relative_root
        if base.exists():
            collected.extend(
                candidate
                for candidate in base.rglob("*")
                if candidate.is_file() and candidate.suffix in SCAN_SUFFIXES
            )
    collected.sort()
    return collected
# Catch-all for a Bot API URL whose token segment was NOT already rewritten by
# BOT_TOKEN_URL_RE (e.g. "sendMessage" appears earlier on the line, or the
# method is passed separately). The negative lookahead leaves spans that are
# already "bot<redacted>" untouched.
_BOT_TOKEN_SEGMENT_RE = re.compile(r"(api\.telegram\.org/bot)(?!<redacted>)[^/\s\"'`]+")


def sanitize_excerpt(line: str) -> str:
    """Return a redacted, length-capped excerpt of a matched source line.

    Redaction steps, applied in order to the stripped line:

    1. ``${{ secrets.NAME }}`` expressions become ``${{ secrets.<redacted> }}``.
    2. Any ``api.telegram.org/bot.../sendMessage`` span becomes
       ``api.telegram.org/bot<redacted>/sendMessage``.
    3. Any remaining ``api.telegram.org/bot<token>`` has its token segment
       replaced with ``<redacted>``. Lines are selected for the inventory
       even when ``sendMessage`` precedes the URL, and step 2 alone does not
       cover that shape, so a literal token would otherwise reach the report.

    Args:
        line: The raw source line that matched the direct Bot API pattern.

    Returns:
        The redacted excerpt, truncated to at most 180 characters.
    """
    excerpt = line.strip()
    excerpt = SECRET_INTERPOLATION_RE.sub("${{ secrets.<redacted> }}", excerpt)
    excerpt = BOT_TOKEN_URL_RE.sub("api.telegram.org/bot<redacted>/sendMessage", excerpt)
    excerpt = _BOT_TOKEN_SEGMENT_RE.sub(r"\1<redacted>", excerpt)
    # Truncate only after redaction so a cut can never expose a partial token.
    return excerpt[:180]
def surface_kind(relative_path: str) -> str:
    """Classify a repository-relative path into an egress surface kind.

    The path is compared against a fixed list of directory prefixes; the
    first matching prefix determines the kind. Paths outside all known
    prefixes are classified as ``"other_direct_bot_api"``.
    """
    prefix_kinds = (
        (".gitea/workflows/", "gitea_workflow_direct_bot_api"),
        ("scripts/ops/", "ops_script_direct_bot_api"),
        ("scripts/ci/", "ci_script_direct_bot_api"),
        ("apps/api/src/", "api_direct_bot_api"),
    )
    for prefix, kind in prefix_kinds:
        if relative_path.startswith(prefix):
            return kind
    return "other_direct_bot_api"
def line_hash(relative_path: str, line_number: int, line: str) -> str:
    """Return a short fingerprint identifying one line of one file.

    The fingerprint is the first 16 hex characters of the SHA-256 digest of
    ``"<relative_path>:<line_number>:<stripped line>"`` encoded as UTF-8, so
    leading and trailing whitespace on *line* does not affect the result.
    """
    identity = "{}:{}:{}".format(relative_path, line_number, line.strip())
    digest = hashlib.sha256(identity.encode("utf-8"))
    return digest.hexdigest()[:16]
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
    """Scan the repository and assemble the Telegram egress inventory report.

    Every file returned by ``iter_scannable_files`` is read and inspected
    line by line. A line matching ``DIRECT_BOT_API_RE`` produces an entry in
    ``direct_bot_api_calls`` (with a sanitized excerpt and all authorization
    flags set to ``False``); a line matching ``GATEWAY_CALLSITE_RE`` produces
    an entry in ``gateway_normalized_callsite_refs``. A single line may
    produce both.

    The gateway module at ``apps/api/src/services/telegram_gateway.py`` is
    checked for the text ``normalize_telegram_send_message_payload``. If that
    file is missing or unreadable the formatter is reported as absent
    (count 0) instead of raising, so callers such as ``validate`` can act on
    the summary value.

    Args:
        root: Repository root against which scan roots and the gateway
            module path are resolved.
        generated_at: Timestamp string embedded verbatim in the report. When
            omitted or empty, the current time in the ``TAIPEI`` zone is
            used (ISO 8601, second precision).

    Returns:
        A JSON-serialisable dict containing schema/version metadata, a
        ``summary`` of counts, fixed ``execution_boundaries`` flags, the two
        call-site lists, and operator-facing interpretation notes.
    """
    generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
    files = iter_scannable_files(root)
    direct_calls: list[dict[str, Any]] = []
    gateway_calls: list[dict[str, Any]] = []

    for path in files:
        relative_path = path.relative_to(root).as_posix()
        # errors="replace" keeps the scan going over files that are not valid UTF-8.
        text = path.read_text(encoding="utf-8", errors="replace")
        for line_number, line in enumerate(text.splitlines(), start=1):
            if DIRECT_BOT_API_RE.search(line):
                kind = surface_kind(relative_path)
                direct_calls.append(
                    {
                        "egress_surface_id": f"telegram_egress:{kind}:{relative_path}:{line_number}",
                        "surface_kind": kind,
                        "path": relative_path,
                        "line": line_number,
                        "line_hash": line_hash(relative_path, line_number, line),
                        "sanitized_excerpt": sanitize_excerpt(line),
                        # Copies, so mutating one entry cannot alter the
                        # module-level lists or any other entry.
                        "required_owner_fields": list(REQUIRED_OWNER_FIELDS),
                        "reviewer_checks": list(REVIEWER_CHECKS),
                        "outcome_lanes": list(OUTCOME_LANES),
                        "blocked_actions": list(BLOCKED_ACTIONS),
                        "owner_response_received": False,
                        "owner_response_accepted": False,
                        "formatter_convergence_accepted": False,
                        "redaction_contract_accepted": False,
                        "delivery_receipt_accepted": False,
                        "direct_bot_api_migration_authorized": False,
                        "telegram_send_authorized": False,
                        "bot_api_call_authorized": False,
                        "workflow_modification_authorized": False,
                        "script_modification_authorized": False,
                        "secret_value_collection_allowed": False,
                        "raw_payload_storage_allowed": False,
                        "production_write_authorized": False,
                        "runtime_gate": False,
                        "action_buttons_allowed": False,
                        "not_authorization": True,
                    }
                )
            if GATEWAY_CALLSITE_RE.search(line):
                gateway_calls.append(
                    {
                        "path": relative_path,
                        "line": line_number,
                        "line_hash": line_hash(relative_path, line_number, line),
                    }
                )

    def count_kind(kind: str) -> int:
        """Count direct-call entries classified as *kind*."""
        return sum(1 for item in direct_calls if item["surface_kind"] == kind)

    direct_files = sorted({item["path"] for item in direct_calls})

    telegram_gateway_path = root / "apps/api/src/services/telegram_gateway.py"
    try:
        telegram_gateway_text = telegram_gateway_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        # Missing/unreadable gateway module: the formatter cannot be
        # confirmed, so report it as absent rather than aborting the scan.
        telegram_gateway_text = ""
    gateway_formatter_present = "normalize_telegram_send_message_payload" in telegram_gateway_text

    return {
        "schema_version": "telegram_notification_egress_inventory_v1",
        "generated_at": generated,
        "git_commit": git_short_sha(root),
        "status": "inventory_ready_no_runtime_action",
        "mode": "repo_only_scan_no_secret_value_no_telegram_send",
        "scan_roots": [path.as_posix() for path in SCAN_ROOTS],
        "summary": {
            "scanned_file_count": len(files),
            "direct_bot_api_file_count": len(direct_files),
            "direct_bot_api_call_count": len(direct_calls),
            "workflow_direct_bot_api_call_count": count_kind("gitea_workflow_direct_bot_api"),
            "ops_script_direct_bot_api_call_count": count_kind("ops_script_direct_bot_api"),
            "ci_script_direct_bot_api_call_count": count_kind("ci_script_direct_bot_api"),
            "api_direct_bot_api_call_count": count_kind("api_direct_bot_api"),
            "gateway_normalized_callsite_count": len(gateway_calls),
            "gateway_final_exit_formatter_present_count": 1 if gateway_formatter_present else 0,
            "required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
            "reviewer_check_count": len(REVIEWER_CHECKS),
            "outcome_lane_count": len(OUTCOME_LANES),
            "blocked_action_count": len(BLOCKED_ACTIONS),
            # The inventory never grants anything, so these are constant zeros.
            "owner_response_received_count": 0,
            "owner_response_accepted_count": 0,
            "formatter_convergence_accepted_count": 0,
            "redaction_contract_accepted_count": 0,
            "delivery_receipt_accepted_count": 0,
            "direct_bot_api_migration_authorized_count": 0,
            "telegram_send_authorized_count": 0,
            "bot_api_call_authorized_count": 0,
            "workflow_modification_authorized_count": 0,
            "script_modification_authorized_count": 0,
            "secret_value_collection_allowed_count": 0,
            "raw_payload_storage_allowed_count": 0,
            "production_write_authorized_count": 0,
            "runtime_gate_count": 0,
            "action_button_count": 0,
        },
        "execution_boundaries": {
            "runtime_execution_authorized": False,
            "telegram_send_authorized": False,
            "bot_api_call_authorized": False,
            "workflow_modification_authorized": False,
            "script_modification_authorized": False,
            "secret_value_collection_allowed": False,
            "secret_hash_collection_allowed": False,
            "partial_token_collection_allowed": False,
            "raw_payload_storage_allowed": False,
            "chat_route_change_authorized": False,
            "bot_token_change_authorized": False,
            "workflow_dispatch_authorized": False,
            "production_deploy_authorized": False,
            "action_buttons_allowed": False,
            "not_authorization": True,
        },
        "direct_bot_api_calls": direct_calls,
        "gateway_normalized_callsite_refs": gateway_calls,
        "operator_interpretation": [
            "direct_bot_api_call_count 大於 0 代表仍有 workflow / ops / API 旁路可能繞過 TelegramGateway formatter。",
            "本清冊只建立 metadata-only egress surface不送 Telegram、不修改 workflow / script、不讀 secret value。",
            "後續要收斂 direct Bot API 必須另走 owner response、formatter convergence、redaction contract、delivery receipt 與維護窗口。",
        ],
    }
def validate(root: Path) -> None:
    """Exit with a BLOCKED message unless the gateway formatter was found.

    Builds a fresh report for *root* and inspects the summary value
    ``gateway_final_exit_formatter_present_count``.

    Raises:
        SystemExit: If that count is anything other than 1.
    """
    summary = build_report(root)["summary"]
    formatter_count = summary["gateway_final_exit_formatter_present_count"]
    if formatter_count == 1:
        return
    raise SystemExit("BLOCKED telegram egress inventory: gateway formatter not found")
def main() -> None:
    """Command-line entry point: build the inventory and emit it as JSON.

    Options:
        --root: Repository root to scan (default: current directory).
        --output: File to write the JSON snapshot to. Missing parent
            directories are created. Without this option the JSON is
            written to stdout.
        --generated-at: Fixed ``generated_at`` value to embed in the report.

    A one-line status summary is always printed to stderr so it does not
    mix with JSON written to stdout.
    """
    parser = argparse.ArgumentParser(description="Build Telegram notification egress inventory")
    parser.add_argument("--root", default=".", help="repository root")
    parser.add_argument("--output", help="write JSON snapshot")
    parser.add_argument("--generated-at", help="fixed generated_at timestamp")
    args = parser.parse_args()

    root = Path(args.root).resolve()
    report = build_report(root, args.generated_at)
    # ensure_ascii=False keeps the non-ASCII operator notes readable in the snapshot.
    payload = json.dumps(report, ensure_ascii=False, indent=2) + "\n"

    if args.output:
        output_path = Path(args.output)
        # Create the destination directory so a fresh snapshot location works.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(payload, encoding="utf-8")
    else:
        sys.stdout.write(payload)

    summary = report["summary"]
    # NOTE: the ci-script count is present in the JSON summary but is not
    # echoed in this status line.
    print(
        "TELEGRAM_NOTIFICATION_EGRESS_INVENTORY_OK "
        f"direct_calls={summary['direct_bot_api_call_count']} "
        f"files={summary['direct_bot_api_file_count']} "
        f"workflow={summary['workflow_direct_bot_api_call_count']} "
        f"ops={summary['ops_script_direct_bot_api_call_count']} "
        f"api={summary['api_direct_bot_api_call_count']} "
        f"runtime_gate={summary['runtime_gate_count']}",
        file=sys.stderr,
    )


if __name__ == "__main__":
    main()