340 lines
13 KiB
Python
340 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""Build owner request drafts for Telegram notification egress surfaces.
|
|
|
|
The draft groups direct Bot API sendMessage call sites by file path. It creates
|
|
metadata-only handoff envelopes and never sends requests, calls Telegram, reads
|
|
secrets, or modifies workflows/scripts.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
TAIPEI = timezone(timedelta(hours=8))
|
|
|
|
SOURCE_SNAPSHOT = Path("docs/security/telegram-notification-egress-inventory.snapshot.json")
|
|
|
|
REQUEST_FIELDS = [
|
|
"request_draft_id",
|
|
"source_inventory_schema_version",
|
|
"source_path",
|
|
"surface_kind",
|
|
"direct_call_count",
|
|
"line_refs",
|
|
"line_hash_refs",
|
|
"owner_role_or_team",
|
|
"routing_purpose",
|
|
"current_sender",
|
|
"target_chat_route",
|
|
"message_shape_contract",
|
|
"redaction_contract",
|
|
"formatter_convergence_decision",
|
|
"gateway_or_alertmanager_target",
|
|
"break_glass_fallback_decision",
|
|
"delivery_receipt_ref",
|
|
"dedup_or_fingerprint_plan",
|
|
"fallback_or_degraded_mode",
|
|
"migration_or_exception_reason",
|
|
"maintenance_window",
|
|
"rollback_owner",
|
|
"postcheck_evidence_ref",
|
|
"no_secret_value_attestation",
|
|
"no_raw_payload_attestation",
|
|
"no_false_green_attestation",
|
|
"not_authorization",
|
|
]
|
|
|
|
REQUIRED_OWNER_FIELDS = [
|
|
"owner_role_or_team",
|
|
"routing_purpose",
|
|
"current_sender",
|
|
"target_chat_route",
|
|
"message_shape_contract",
|
|
"redaction_contract",
|
|
"formatter_convergence_decision",
|
|
"gateway_or_alertmanager_target",
|
|
"break_glass_fallback_decision",
|
|
"delivery_receipt_ref",
|
|
"dedup_or_fingerprint_plan",
|
|
"fallback_or_degraded_mode",
|
|
"migration_or_exception_reason",
|
|
"maintenance_window",
|
|
"rollback_owner",
|
|
"postcheck_evidence_ref",
|
|
"no_secret_value_attestation",
|
|
"no_raw_payload_attestation",
|
|
"no_false_green_attestation",
|
|
]
|
|
|
|
PREFLIGHT_CHECKS = [
|
|
"source_inventory_current",
|
|
"owner_role_present",
|
|
"route_purpose_present",
|
|
"message_shape_contract_present",
|
|
"redaction_contract_present",
|
|
"formatter_convergence_decision_present",
|
|
"break_glass_fallback_explicit",
|
|
"delivery_receipt_metadata_present",
|
|
"dedup_or_fingerprint_present",
|
|
"maintenance_window_present_for_change",
|
|
"rollback_owner_present",
|
|
"postcheck_evidence_present",
|
|
"no_secret_value_attested",
|
|
"no_raw_payload_attested",
|
|
"no_false_green_attested",
|
|
"runtime_gate_stays_zero",
|
|
]
|
|
|
|
OUTCOME_LANES = [
|
|
"draft_waiting_owner_dispatch",
|
|
"request_owner_route_supplement",
|
|
"request_formatter_convergence_supplement",
|
|
"request_break_glass_fallback_supplement",
|
|
"request_redaction_or_receipt_supplement",
|
|
"quarantine_secret_or_raw_payload",
|
|
"reject_false_green_claim",
|
|
"ready_for_manual_dispatch",
|
|
"waiting_runtime_gate",
|
|
]
|
|
|
|
FORBIDDEN_PAYLOADS = [
|
|
"bot_token_value",
|
|
"chat_secret_value",
|
|
"secret_hash",
|
|
"partial_token",
|
|
"masked_token",
|
|
"authorization_header",
|
|
"raw_message_payload",
|
|
"raw_workflow_log",
|
|
"raw_action_log",
|
|
"raw_screenshot_with_secret",
|
|
"internal_work_window_transcript",
|
|
"private_namespace",
|
|
"unredacted_internal_path",
|
|
"unredacted_private_ip",
|
|
]
|
|
|
|
BLOCKED_ACTIONS = [
|
|
"send_owner_request",
|
|
"confirm_recipient",
|
|
"emit_audit_event",
|
|
"telegram_send",
|
|
"bot_api_call",
|
|
"workflow_modification",
|
|
"script_modification",
|
|
"api_sender_refactor",
|
|
"change_chat_route",
|
|
"change_bot_token",
|
|
"read_secret_store",
|
|
"collect_secret_value",
|
|
"collect_secret_hash",
|
|
"collect_partial_token",
|
|
"collect_chat_id_secret",
|
|
"store_raw_message_payload",
|
|
"store_unredacted_log",
|
|
"workflow_dispatch",
|
|
"production_deploy",
|
|
"accept_cd_success_as_delivery_receipt",
|
|
"accept_route_200_as_notification_delivery",
|
|
"accept_ui_visible_as_notification_acceptance",
|
|
"skip_formatter_convergence",
|
|
"skip_redaction_contract",
|
|
"open_runtime_gate",
|
|
"add_action_button",
|
|
]
|
|
|
|
|
|
def git_short_sha(root: Path) -> str:
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "--short", "HEAD"],
|
|
cwd=root,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
return "unknown"
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def request_id_for(path: str) -> str:
|
|
safe = path.replace("/", "_").replace(".", "_").replace("-", "_")
|
|
return f"telegram_notification_egress_owner_request:{safe}"
|
|
|
|
|
|
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
|
|
generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
|
|
source_path = root / SOURCE_SNAPSHOT
|
|
source = load_json(source_path)
|
|
grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
|
|
for item in source["direct_bot_api_calls"]:
|
|
grouped[item["path"]].append(item)
|
|
|
|
request_drafts: list[dict[str, Any]] = []
|
|
for path in sorted(grouped):
|
|
calls = sorted(grouped[path], key=lambda item: item["line"])
|
|
surface_kind = calls[0]["surface_kind"]
|
|
request_drafts.append(
|
|
{
|
|
"request_draft_id": request_id_for(path),
|
|
"source_inventory_schema_version": source["schema_version"],
|
|
"source_path": path,
|
|
"surface_kind": surface_kind,
|
|
"direct_call_count": len(calls),
|
|
"line_refs": [item["line"] for item in calls],
|
|
"line_hash_refs": [item["line_hash"] for item in calls],
|
|
"request_fields": REQUEST_FIELDS,
|
|
"required_owner_fields": REQUIRED_OWNER_FIELDS,
|
|
"preflight_checks": PREFLIGHT_CHECKS,
|
|
"outcome_lanes": OUTCOME_LANES,
|
|
"forbidden_payloads": FORBIDDEN_PAYLOADS,
|
|
"blocked_actions": BLOCKED_ACTIONS,
|
|
"request_sent": False,
|
|
"recipient_confirmed": False,
|
|
"audit_event_emitted": False,
|
|
"owner_response_received": False,
|
|
"owner_response_accepted": False,
|
|
"formatter_convergence_accepted": False,
|
|
"redaction_contract_accepted": False,
|
|
"delivery_receipt_accepted": False,
|
|
"break_glass_fallback_accepted": False,
|
|
"direct_bot_api_migration_authorized": False,
|
|
"telegram_send_authorized": False,
|
|
"bot_api_call_authorized": False,
|
|
"workflow_modification_authorized": False,
|
|
"script_modification_authorized": False,
|
|
"api_sender_refactor_authorized": False,
|
|
"secret_value_collection_allowed": False,
|
|
"raw_payload_storage_allowed": False,
|
|
"production_write_authorized": False,
|
|
"runtime_gate": False,
|
|
"action_buttons_allowed": False,
|
|
"not_authorization": True,
|
|
}
|
|
)
|
|
|
|
workflow_drafts = [item for item in request_drafts if item["surface_kind"] == "gitea_workflow_direct_bot_api"]
|
|
ops_drafts = [item for item in request_drafts if item["surface_kind"] == "ops_script_direct_bot_api"]
|
|
ci_drafts = [item for item in request_drafts if item["surface_kind"] == "ci_script_direct_bot_api"]
|
|
api_drafts = [item for item in request_drafts if item["surface_kind"] == "api_direct_bot_api"]
|
|
|
|
return {
|
|
"schema_version": "telegram_notification_egress_owner_request_draft_v1",
|
|
"generated_at": generated,
|
|
"git_commit": git_short_sha(root),
|
|
"status": "owner_request_draft_ready_no_dispatch_no_runtime_action",
|
|
"mode": "metadata_only_no_secret_value_no_telegram_send_no_workflow_change",
|
|
"source_snapshot": SOURCE_SNAPSHOT.as_posix(),
|
|
"source_schema_version": source["schema_version"],
|
|
"source_status": source["status"],
|
|
"summary": {
|
|
"source_direct_bot_api_call_count": source["summary"]["direct_bot_api_call_count"],
|
|
"source_direct_bot_api_file_count": source["summary"]["direct_bot_api_file_count"],
|
|
"request_draft_count": len(request_drafts),
|
|
"workflow_request_draft_count": len(workflow_drafts),
|
|
"ops_script_request_draft_count": len(ops_drafts),
|
|
"ci_script_request_draft_count": len(ci_drafts),
|
|
"api_direct_request_draft_count": len(api_drafts),
|
|
"request_field_count": len(REQUEST_FIELDS),
|
|
"required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
|
|
"preflight_check_count": len(PREFLIGHT_CHECKS),
|
|
"outcome_lane_count": len(OUTCOME_LANES),
|
|
"forbidden_payload_count": len(FORBIDDEN_PAYLOADS),
|
|
"blocked_action_count": len(BLOCKED_ACTIONS),
|
|
"request_sent_count": 0,
|
|
"recipient_confirmed_count": 0,
|
|
"audit_event_emitted_count": 0,
|
|
"owner_response_received_count": 0,
|
|
"owner_response_accepted_count": 0,
|
|
"formatter_convergence_accepted_count": 0,
|
|
"redaction_contract_accepted_count": 0,
|
|
"delivery_receipt_accepted_count": 0,
|
|
"break_glass_fallback_accepted_count": 0,
|
|
"direct_bot_api_migration_authorized_count": 0,
|
|
"telegram_send_authorized_count": 0,
|
|
"bot_api_call_authorized_count": 0,
|
|
"workflow_modification_authorized_count": 0,
|
|
"script_modification_authorized_count": 0,
|
|
"api_sender_refactor_authorized_count": 0,
|
|
"secret_value_collection_allowed_count": 0,
|
|
"raw_payload_storage_allowed_count": 0,
|
|
"production_write_authorized_count": 0,
|
|
"runtime_gate_count": 0,
|
|
"action_button_count": 0,
|
|
},
|
|
"execution_boundaries": {
|
|
"runtime_execution_authorized": False,
|
|
"owner_request_send_authorized": False,
|
|
"recipient_confirmation_authorized": False,
|
|
"audit_event_emit_authorized": False,
|
|
"telegram_send_authorized": False,
|
|
"bot_api_call_authorized": False,
|
|
"workflow_modification_authorized": False,
|
|
"script_modification_authorized": False,
|
|
"api_sender_refactor_authorized": False,
|
|
"secret_value_collection_allowed": False,
|
|
"raw_payload_storage_allowed": False,
|
|
"production_write_authorized": False,
|
|
"action_buttons_allowed": False,
|
|
"not_authorization": True,
|
|
},
|
|
"request_drafts": request_drafts,
|
|
"operator_interpretation": [
|
|
"This is a draft packet for manual owner dispatch; request_sent_count remains 0.",
|
|
"Each file-level draft must decide whether the path converges to TelegramGateway, AWOOI Alertmanager, or a documented break-glass fallback.",
|
|
"No workflow, script, API, secret, Telegram, or production action is authorized by this snapshot.",
|
|
],
|
|
}
|
|
|
|
|
|
def validate(root: Path) -> None:
|
|
report = build_report(root)
|
|
if report["summary"]["request_draft_count"] != report["summary"]["source_direct_bot_api_file_count"]:
|
|
raise SystemExit("BLOCKED telegram egress owner request draft: file/draft count mismatch")
|
|
if report["summary"]["runtime_gate_count"] != 0:
|
|
raise SystemExit("BLOCKED telegram egress owner request draft: runtime gate must stay 0")
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="Build Telegram notification egress owner request draft")
|
|
parser.add_argument("--root", default=".", help="repository root")
|
|
parser.add_argument("--output", help="write JSON snapshot")
|
|
parser.add_argument("--generated-at", help="fixed generated_at timestamp")
|
|
args = parser.parse_args()
|
|
|
|
root = Path(args.root).resolve()
|
|
report = build_report(root, args.generated_at)
|
|
payload = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
|
|
if args.output:
|
|
Path(args.output).write_text(payload, encoding="utf-8")
|
|
else:
|
|
sys.stdout.write(payload)
|
|
|
|
print(
|
|
"TELEGRAM_NOTIFICATION_EGRESS_OWNER_REQUEST_DRAFT_OK "
|
|
f"drafts={report['summary']['request_draft_count']} "
|
|
f"workflow={report['summary']['workflow_request_draft_count']} "
|
|
f"ops={report['summary']['ops_script_request_draft_count']} "
|
|
f"api={report['summary']['api_direct_request_draft_count']} "
|
|
f"sent={report['summary']['request_sent_count']} "
|
|
f"runtime_gate={report['summary']['runtime_gate_count']}",
|
|
file=sys.stderr,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|