Files
awoooi/scripts/security/telegram-notification-egress-owner-request-draft.py

340 lines
13 KiB
Python

#!/usr/bin/env python3
"""Build owner request drafts for Telegram notification egress surfaces.
The draft groups direct Bot API sendMessage call sites by file path. It creates
metadata-only handoff envelopes and never sends requests, calls Telegram, reads
secrets, or modifies workflows/scripts.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
SOURCE_SNAPSHOT = Path("docs/security/telegram-notification-egress-inventory.snapshot.json")
REQUEST_FIELDS = [
"request_draft_id",
"source_inventory_schema_version",
"source_path",
"surface_kind",
"direct_call_count",
"line_refs",
"line_hash_refs",
"owner_role_or_team",
"routing_purpose",
"current_sender",
"target_chat_route",
"message_shape_contract",
"redaction_contract",
"formatter_convergence_decision",
"gateway_or_alertmanager_target",
"break_glass_fallback_decision",
"delivery_receipt_ref",
"dedup_or_fingerprint_plan",
"fallback_or_degraded_mode",
"migration_or_exception_reason",
"maintenance_window",
"rollback_owner",
"postcheck_evidence_ref",
"no_secret_value_attestation",
"no_raw_payload_attestation",
"no_false_green_attestation",
"not_authorization",
]
REQUIRED_OWNER_FIELDS = [
"owner_role_or_team",
"routing_purpose",
"current_sender",
"target_chat_route",
"message_shape_contract",
"redaction_contract",
"formatter_convergence_decision",
"gateway_or_alertmanager_target",
"break_glass_fallback_decision",
"delivery_receipt_ref",
"dedup_or_fingerprint_plan",
"fallback_or_degraded_mode",
"migration_or_exception_reason",
"maintenance_window",
"rollback_owner",
"postcheck_evidence_ref",
"no_secret_value_attestation",
"no_raw_payload_attestation",
"no_false_green_attestation",
]
PREFLIGHT_CHECKS = [
"source_inventory_current",
"owner_role_present",
"route_purpose_present",
"message_shape_contract_present",
"redaction_contract_present",
"formatter_convergence_decision_present",
"break_glass_fallback_explicit",
"delivery_receipt_metadata_present",
"dedup_or_fingerprint_present",
"maintenance_window_present_for_change",
"rollback_owner_present",
"postcheck_evidence_present",
"no_secret_value_attested",
"no_raw_payload_attested",
"no_false_green_attested",
"runtime_gate_stays_zero",
]
OUTCOME_LANES = [
"draft_waiting_owner_dispatch",
"request_owner_route_supplement",
"request_formatter_convergence_supplement",
"request_break_glass_fallback_supplement",
"request_redaction_or_receipt_supplement",
"quarantine_secret_or_raw_payload",
"reject_false_green_claim",
"ready_for_manual_dispatch",
"waiting_runtime_gate",
]
FORBIDDEN_PAYLOADS = [
"bot_token_value",
"chat_secret_value",
"secret_hash",
"partial_token",
"masked_token",
"authorization_header",
"raw_message_payload",
"raw_workflow_log",
"raw_action_log",
"raw_screenshot_with_secret",
"internal_work_window_transcript",
"private_namespace",
"unredacted_internal_path",
"unredacted_private_ip",
]
BLOCKED_ACTIONS = [
"send_owner_request",
"confirm_recipient",
"emit_audit_event",
"telegram_send",
"bot_api_call",
"workflow_modification",
"script_modification",
"api_sender_refactor",
"change_chat_route",
"change_bot_token",
"read_secret_store",
"collect_secret_value",
"collect_secret_hash",
"collect_partial_token",
"collect_chat_id_secret",
"store_raw_message_payload",
"store_unredacted_log",
"workflow_dispatch",
"production_deploy",
"accept_cd_success_as_delivery_receipt",
"accept_route_200_as_notification_delivery",
"accept_ui_visible_as_notification_acceptance",
"skip_formatter_convergence",
"skip_redaction_contract",
"open_runtime_gate",
"add_action_button",
]
def git_short_sha(root: Path) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=root,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except Exception:
return "unknown"
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def request_id_for(path: str) -> str:
safe = path.replace("/", "_").replace(".", "_").replace("-", "_")
return f"telegram_notification_egress_owner_request:{safe}"
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
source_path = root / SOURCE_SNAPSHOT
source = load_json(source_path)
grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
for item in source["direct_bot_api_calls"]:
grouped[item["path"]].append(item)
request_drafts: list[dict[str, Any]] = []
for path in sorted(grouped):
calls = sorted(grouped[path], key=lambda item: item["line"])
surface_kind = calls[0]["surface_kind"]
request_drafts.append(
{
"request_draft_id": request_id_for(path),
"source_inventory_schema_version": source["schema_version"],
"source_path": path,
"surface_kind": surface_kind,
"direct_call_count": len(calls),
"line_refs": [item["line"] for item in calls],
"line_hash_refs": [item["line_hash"] for item in calls],
"request_fields": REQUEST_FIELDS,
"required_owner_fields": REQUIRED_OWNER_FIELDS,
"preflight_checks": PREFLIGHT_CHECKS,
"outcome_lanes": OUTCOME_LANES,
"forbidden_payloads": FORBIDDEN_PAYLOADS,
"blocked_actions": BLOCKED_ACTIONS,
"request_sent": False,
"recipient_confirmed": False,
"audit_event_emitted": False,
"owner_response_received": False,
"owner_response_accepted": False,
"formatter_convergence_accepted": False,
"redaction_contract_accepted": False,
"delivery_receipt_accepted": False,
"break_glass_fallback_accepted": False,
"direct_bot_api_migration_authorized": False,
"telegram_send_authorized": False,
"bot_api_call_authorized": False,
"workflow_modification_authorized": False,
"script_modification_authorized": False,
"api_sender_refactor_authorized": False,
"secret_value_collection_allowed": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"runtime_gate": False,
"action_buttons_allowed": False,
"not_authorization": True,
}
)
workflow_drafts = [item for item in request_drafts if item["surface_kind"] == "gitea_workflow_direct_bot_api"]
ops_drafts = [item for item in request_drafts if item["surface_kind"] == "ops_script_direct_bot_api"]
ci_drafts = [item for item in request_drafts if item["surface_kind"] == "ci_script_direct_bot_api"]
api_drafts = [item for item in request_drafts if item["surface_kind"] == "api_direct_bot_api"]
return {
"schema_version": "telegram_notification_egress_owner_request_draft_v1",
"generated_at": generated,
"git_commit": git_short_sha(root),
"status": "owner_request_draft_ready_no_dispatch_no_runtime_action",
"mode": "metadata_only_no_secret_value_no_telegram_send_no_workflow_change",
"source_snapshot": SOURCE_SNAPSHOT.as_posix(),
"source_schema_version": source["schema_version"],
"source_status": source["status"],
"summary": {
"source_direct_bot_api_call_count": source["summary"]["direct_bot_api_call_count"],
"source_direct_bot_api_file_count": source["summary"]["direct_bot_api_file_count"],
"request_draft_count": len(request_drafts),
"workflow_request_draft_count": len(workflow_drafts),
"ops_script_request_draft_count": len(ops_drafts),
"ci_script_request_draft_count": len(ci_drafts),
"api_direct_request_draft_count": len(api_drafts),
"request_field_count": len(REQUEST_FIELDS),
"required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
"preflight_check_count": len(PREFLIGHT_CHECKS),
"outcome_lane_count": len(OUTCOME_LANES),
"forbidden_payload_count": len(FORBIDDEN_PAYLOADS),
"blocked_action_count": len(BLOCKED_ACTIONS),
"request_sent_count": 0,
"recipient_confirmed_count": 0,
"audit_event_emitted_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"formatter_convergence_accepted_count": 0,
"redaction_contract_accepted_count": 0,
"delivery_receipt_accepted_count": 0,
"break_glass_fallback_accepted_count": 0,
"direct_bot_api_migration_authorized_count": 0,
"telegram_send_authorized_count": 0,
"bot_api_call_authorized_count": 0,
"workflow_modification_authorized_count": 0,
"script_modification_authorized_count": 0,
"api_sender_refactor_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"raw_payload_storage_allowed_count": 0,
"production_write_authorized_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": {
"runtime_execution_authorized": False,
"owner_request_send_authorized": False,
"recipient_confirmation_authorized": False,
"audit_event_emit_authorized": False,
"telegram_send_authorized": False,
"bot_api_call_authorized": False,
"workflow_modification_authorized": False,
"script_modification_authorized": False,
"api_sender_refactor_authorized": False,
"secret_value_collection_allowed": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"action_buttons_allowed": False,
"not_authorization": True,
},
"request_drafts": request_drafts,
"operator_interpretation": [
"This is a draft packet for manual owner dispatch; request_sent_count remains 0.",
"Each file-level draft must decide whether the path converges to TelegramGateway, AWOOI Alertmanager, or a documented break-glass fallback.",
"No workflow, script, API, secret, Telegram, or production action is authorized by this snapshot.",
],
}
def validate(root: Path) -> None:
report = build_report(root)
if report["summary"]["request_draft_count"] != report["summary"]["source_direct_bot_api_file_count"]:
raise SystemExit("BLOCKED telegram egress owner request draft: file/draft count mismatch")
if report["summary"]["runtime_gate_count"] != 0:
raise SystemExit("BLOCKED telegram egress owner request draft: runtime gate must stay 0")
def main() -> None:
parser = argparse.ArgumentParser(description="Build Telegram notification egress owner request draft")
parser.add_argument("--root", default=".", help="repository root")
parser.add_argument("--output", help="write JSON snapshot")
parser.add_argument("--generated-at", help="fixed generated_at timestamp")
args = parser.parse_args()
root = Path(args.root).resolve()
report = build_report(root, args.generated_at)
payload = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
if args.output:
Path(args.output).write_text(payload, encoding="utf-8")
else:
sys.stdout.write(payload)
print(
"TELEGRAM_NOTIFICATION_EGRESS_OWNER_REQUEST_DRAFT_OK "
f"drafts={report['summary']['request_draft_count']} "
f"workflow={report['summary']['workflow_request_draft_count']} "
f"ops={report['summary']['ops_script_request_draft_count']} "
f"api={report['summary']['api_direct_request_draft_count']} "
f"sent={report['summary']['request_sent_count']} "
f"runtime_gate={report['summary']['runtime_gate_count']}",
file=sys.stderr,
)
if __name__ == "__main__":
main()