#!/usr/bin/env python3
"""IwoooS agent-bounty-protocol owner request draft generator.

This tool reads the agent-bounty-protocol onboarding handoff snapshot and
turns the repo / refs, deployment, product surface, data classification and
MCP / A2A / treasury boundaries into request drafts that are reviewed before
a human submits them.

It does not read the agent-bounty-protocol working tree, does not read .env,
does not deploy, does not start cron / daemon, does not claim / submit and
does not payout / withdraw.
"""

from __future__ import annotations

import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

# Fixed UTC+8 offset used for the default report timestamp.
TAIPEI = timezone(timedelta(hours=8))

# Path of the handoff snapshot; used both as the CLI default and as the
# `source_handoff_ref` recorded in every draft.
_HANDOFF_SNAPSHOT_REF = "docs/security/agent-bounty-iwooos-onboarding-handoff.snapshot.json"

# Field names listed under "request_fields" in every draft and in the report.
REQUEST_FIELDS = [
    "request_id",
    "scope_id",
    "label",
    "request_kind",
    "priority",
    "source_handoff_ref",
    "source_evidence_refs",
    "routes",
    "boundary",
    "data_classes",
    "owner_role_fields",
    "decision",
    "decision_reason",
    "affected_scope",
    "redacted_evidence_refs",
    "canonical_repo_ref",
    "repo_dirty_disposition",
    "deployment_boundary_ref",
    "auth_abuse_boundary_ref",
    "external_agent_boundary_ref",
    "settlement_treasury_boundary_ref",
    "maintenance_window",
    "rollback_owner",
    "validation_plan",
    "followup_owner",
    "not_approval",
]

# Field names listed under "required_owner_fields" in every draft and report.
REQUIRED_OWNER_FIELDS = [
    "product_owner_role_or_team",
    "security_owner_role_or_team",
    "source_control_owner_role_or_team",
    "deployment_owner_role_or_team",
    "data_classification_owner_role_or_team",
    "external_agent_boundary_owner_role_or_team",
    "settlement_or_treasury_owner_role_or_team",
    "notification_owner_role_or_team",
    "surface_scope",
    "decision",
    "decision_reason",
    "redacted_evidence_refs",
    "canonical_repo_ref",
    "repo_dirty_disposition",
    "deployment_boundary_ref",
    "auth_abuse_boundary_ref",
    "external_agent_boundary_ref",
    "settlement_treasury_boundary_ref",
    "maintenance_window",
    "rollback_owner",
    "validation_plan",
    "followup_owner",
]

# Static control-boundary requests emitted ahead of the per-surface requests.
# A "data_classes" value of "all" is a sentinel that build_report() expands to
# every data_type found in the handoff's data_classification_intake.
CONTROL_REQUESTS = [
    {
        "scope_id": "repo_refs_boundary",
        "label": "Repo / refs / workflow boundary",
        "request_kind": "source_control_boundary",
        "priority": "P0",
        "routes": [],
        "boundary": "確認 canonical repo、dirty workspace disposition、workflow / runner / secret name owner;不得 push、sync refs 或建立 repo。",
        "data_classes": [],
        "write_capable": True,
        "treasury_related": False,
        "mcp_a2a_related": False,
    },
    {
        "scope_id": "deployment_boundary",
        "label": "Production / compose / domain boundary",
        "request_kind": "deployment_boundary",
        "priority": "P0",
        "routes": ["https://agent.wooo.work", "/api/v1/health"],
        "boundary": "確認 production host、compose directory、domain / TLS、health smoke 與 rollback owner;不得 deploy、restart 或 migration。",
        "data_classes": [],
        "write_capable": True,
        "treasury_related": False,
        "mcp_a2a_related": False,
    },
    {
        "scope_id": "data_classification_boundary",
        "label": "Data classification / evidence intake boundary",
        "request_kind": "data_classification_boundary",
        "priority": "P0",
        "routes": [],
        "boundary": "確認 task、agent、webhook、traffic、treasury、admin、cron 資料分級;只收 metadata,不收 raw payload 或 secret。",
        "data_classes": "all",
        "write_capable": False,
        "treasury_related": True,
        "mcp_a2a_related": True,
    },
    {
        "scope_id": "external_agent_treasury_boundary",
        "label": "MCP / A2A / external agent / treasury boundary",
        "request_kind": "external_agent_treasury_boundary",
        "priority": "P0",
        "routes": ["/api/mcp/[tool]", "/api/a2a/*", "/admin/treasury", "/api/admin/withdraw"],
        "boundary": "確認外部 agent 自主行為、settlement、staking、payout、withdrawal 與 notification owner;不得 claim、submit、daemon、send 或 payout。",
        "data_classes": ["MCP tool calls / A2A protocol events", "settlement / staking / treasury / Stripe"],
        "write_capable": True,
        "treasury_related": True,
        "mcp_a2a_related": True,
    },
]

# Summary counters that build_report() always writes as 0.
FALSE_COUNTER_KEYS = [
    "request_sent_count",
    "recipient_confirmed_count",
    "owner_response_received_count",
    "owner_response_accepted_count",
    "live_evidence_received_count",
    "repo_refs_truth_accepted_count",
    "data_classification_accepted_count",
    "deployment_boundary_accepted_count",
    "external_agent_boundary_accepted_count",
    "settlement_treasury_accepted_count",
    "auth_abuse_boundary_accepted_count",
    "runtime_gate_count",
    "runtime_execution_authorized_count",
    "production_deploy_authorized_count",
    "repo_creation_authorized_count",
    "refs_sync_authorized_count",
    "workflow_modification_authorized_count",
    "secret_value_collection_allowed_count",
    "env_file_read_authorized_count",
    "active_scan_authorized_count",
    "credentialed_scan_authorized_count",
    "deploy_authorized_count",
    "compose_restart_authorized_count",
    "db_migration_authorized_count",
    "daemon_start_authorized_count",
    "cron_enable_authorized_count",
    "auto_claim_authorized_count",
    "auto_submit_authorized_count",
    "external_agent_message_authorized_count",
    "telegram_send_authorized_count",
    "discord_send_authorized_count",
    "github_comment_authorized_count",
    "payout_authorized_count",
    "withdrawal_authorized_count",
    "staking_action_authorized_count",
    "webhook_secret_change_authorized_count",
    "shared_database_authorized_count",
    "shared_session_authorized_count",
    "shared_rbac_authorized_count",
    "host_write_authorized_count",
    "production_write_authorized_count",
    "action_button_count",
]

# Per-draft flags that request_draft() always writes as False.
FALSE_DRAFT_KEYS = [
    "request_sent",
    "recipient_confirmed",
    "owner_response_received",
    "owner_response_accepted",
    "live_evidence_received",
    "repo_refs_truth_accepted",
    "data_classification_accepted",
    "deployment_boundary_accepted",
    "external_agent_boundary_accepted",
    "settlement_treasury_accepted",
    "auth_abuse_boundary_accepted",
    "runtime_gate",
    "runtime_execution_authorized",
    "production_deploy_authorized",
    "repo_creation_authorized",
    "refs_sync_authorized",
    "workflow_modification_authorized",
    "secret_value_collection_allowed",
    "env_file_read_authorized",
    "active_scan_authorized",
    "credentialed_scan_authorized",
    "deploy_authorized",
    "compose_restart_authorized",
    "db_migration_authorized",
    "daemon_start_authorized",
    "cron_enable_authorized",
    "auto_claim_authorized",
    "auto_submit_authorized",
    "external_agent_message_authorized",
    "telegram_send_authorized",
    "discord_send_authorized",
    "github_comment_authorized",
    "payout_authorized",
    "withdrawal_authorized",
    "staking_action_authorized",
    "webhook_secret_change_authorized",
    "shared_database_authorized",
    "shared_session_authorized",
    "shared_rbac_authorized",
    "host_write_authorized",
    "production_write_authorized",
    "action_buttons_allowed",
]

# Surface ids that build_report() flags as treasury / MCP-A2A / write capable.
_TREASURY_SURFACES = frozenset({"a2a-agent-protocol", "admin-and-treasury"})
_MCP_A2A_SURFACES = frozenset(
    {"mcp-and-open-task-api", "a2a-agent-protocol", "automation-and-cron"}
)
_WRITE_CAPABLE_SURFACES = frozenset(
    {
        "mcp-and-open-task-api",
        "a2a-agent-protocol",
        "automation-and-cron",
        "admin-and-treasury",
        "webhooks-and-traffic",
    }
)

_COUNT_SUFFIX = "_count"


def git_short_sha(root: Path) -> str:
    """Return the abbreviated HEAD commit of the repository at *root*.

    Best effort: returns ``"unknown"`` when git cannot be started, exits
    non-zero, produces undecodable output, or prints nothing.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: git or cwd missing; SubprocessError: non-zero exit;
        # ValueError: bad arguments or undecodable output (UnicodeDecodeError).
        return "unknown"
    return result.stdout.strip() or "unknown"


def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 text and return the parsed JSON document."""
    return json.loads(path.read_text(encoding="utf-8"))


def request_draft(
    *,
    scope_id: str,
    label: str,
    request_kind: str,
    priority: str,
    routes: list[str],
    boundary: str,
    data_classes: list[str],
    write_capable: bool,
    treasury_related: bool,
    mcp_a2a_related: bool,
    handoff: dict[str, Any],
) -> dict[str, Any]:
    """Build one undispatched owner request draft for a single scope.

    Every owner-supplied field starts as a ``pending_*`` placeholder, ``None``
    or an empty list, and every key in ``FALSE_DRAFT_KEYS`` is set to
    ``False``.

    Args:
        scope_id: Identifier of the scope; also embedded in ``request_id``.
        label: Human-readable label for the scope.
        request_kind: Category of the request.
        priority: Priority tag.
        routes: Routes covered by the scope.
        boundary: Boundary statement for the scope.
        data_classes: Data classes attached to the scope.
        write_capable: Stored as ``write_capable_scope``.
        treasury_related: Stored as ``treasury_related_scope``.
        mcp_a2a_related: Stored as ``mcp_a2a_related_scope``.
        handoff: Parsed handoff snapshot; must provide
            ``source_evidence_refs``, ``forbidden_actions`` and
            ``owner_response_handoff`` (with ``required_response_fields`` and
            ``forbidden_inputs``).

    Returns:
        The draft as a plain dict.

    Raises:
        KeyError: If *handoff* lacks one of the keys listed above.
    """
    owner_handoff = handoff["owner_response_handoff"]
    draft = {
        "request_id": f"agent_bounty_owner_request:{scope_id}",
        "status": "draft_not_dispatched",
        "scope_id": scope_id,
        "label": label,
        "request_kind": request_kind,
        "priority": priority,
        "source_handoff_ref": _HANDOFF_SNAPSHOT_REF,
        "source_evidence_refs": handoff["source_evidence_refs"],
        "routes": routes,
        "boundary": boundary,
        "data_classes": data_classes,
        "write_capable_scope": write_capable,
        "treasury_related_scope": treasury_related,
        "mcp_a2a_related_scope": mcp_a2a_related,
        "requires_live_evidence": True,
        # Copies keep a mutated draft from corrupting the module constants.
        "request_fields": list(REQUEST_FIELDS),
        "owner_role_fields": owner_handoff["required_response_fields"],
        "required_owner_fields": list(REQUIRED_OWNER_FIELDS),
        "forbidden_inputs": owner_handoff["forbidden_inputs"],
        "blocked_actions": handoff["forbidden_actions"],
        "decision": "pending_owner_decision",
        "decision_reason": "pending_decision_reason",
        "affected_scope": "pending_affected_scope",
        "redacted_evidence_refs": [],
        "canonical_repo_ref": None,
        "repo_dirty_disposition": "pending_repo_dirty_disposition",
        "deployment_boundary_ref": None,
        "auth_abuse_boundary_ref": None,
        "external_agent_boundary_ref": None,
        "settlement_treasury_boundary_ref": None,
        "maintenance_window": "pending_maintenance_window",
        "rollback_owner": "pending_rollback_owner",
        "validation_plan": "pending_validation_plan",
        "followup_owner": "pending_followup_owner",
        "not_approval": True,
    }
    draft.update(dict.fromkeys(FALSE_DRAFT_KEYS, False))
    return draft


def _boundary_key(counter_key: str) -> str:
    """Map a summary counter name to its execution-boundary flag name."""
    if counter_key == "action_button_count":
        return "action_buttons_allowed"
    # Strip only the trailing suffix; str.replace would also remove any
    # "_count" occurring earlier in the name.
    if counter_key.endswith(_COUNT_SUFFIX):
        return counter_key[: -len(_COUNT_SUFFIX)]
    return counter_key


def _control_draft(
    control: dict[str, Any], data_types: list[str], handoff: dict[str, Any]
) -> dict[str, Any]:
    """Build the draft for one CONTROL_REQUESTS entry."""
    declared = control["data_classes"]
    # "all" expands to every handoff data type; copy so drafts never alias.
    data_classes = list(data_types) if declared == "all" else list(declared)
    return request_draft(
        scope_id=control["scope_id"],
        label=control["label"],
        request_kind=control["request_kind"],
        priority=control["priority"],
        routes=list(control["routes"]),
        boundary=control["boundary"],
        data_classes=data_classes,
        write_capable=bool(control["write_capable"]),
        treasury_related=bool(control["treasury_related"]),
        mcp_a2a_related=bool(control["mcp_a2a_related"]),
        handoff=handoff,
    )


def _surface_draft(surface: dict[str, Any], handoff: dict[str, Any]) -> dict[str, Any]:
    """Build the draft for one handoff product surface."""
    surface_id = surface["surface_id"]
    return request_draft(
        scope_id=surface_id,
        label=surface_id.replace("-", " ").title(),
        request_kind="product_surface_boundary",
        priority="P0",
        routes=surface["routes"],
        boundary=surface["boundary"],
        data_classes=[],
        write_capable=surface_id in _WRITE_CAPABLE_SURFACES,
        treasury_related=surface_id in _TREASURY_SURFACES,
        mcp_a2a_related=surface_id in _MCP_A2A_SURFACES,
        handoff=handoff,
    )


def _count_flagged(drafts: list[dict[str, Any]], flag: str) -> int:
    """Count the drafts whose *flag* entry is truthy."""
    return sum(1 for draft in drafts if draft[flag])


def build_report(root: Path, handoff: dict[str, Any], generated_at: str | None) -> dict[str, Any]:
    """Assemble the owner request draft report from a handoff snapshot.

    One draft is produced per ``CONTROL_REQUESTS`` entry, followed by one per
    entry in ``handoff["product_surfaces"]``. All ``FALSE_COUNTER_KEYS``
    counters are 0 and every execution boundary flag is ``False``.

    Args:
        root: Directory in which ``git rev-parse`` is run for ``git_commit``.
        handoff: Parsed handoff snapshot.
        generated_at: Timestamp to record verbatim; when falsy, the current
            time in ``TAIPEI`` (second precision, ISO 8601) is used.

    Returns:
        The report as a JSON-serializable dict.

    Raises:
        KeyError: If *handoff* lacks a key this function reads.
    """
    report_time = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
    owner_handoff = handoff["owner_response_handoff"]
    data_types = [item["data_type"] for item in handoff["data_classification_intake"]]

    drafts: list[dict[str, Any]] = [
        _control_draft(control, data_types, handoff) for control in CONTROL_REQUESTS
    ]
    drafts.extend(_surface_draft(surface, handoff) for surface in handoff["product_surfaces"])

    summary = {
        "request_draft_count": len(drafts),
        "control_boundary_request_count": len(CONTROL_REQUESTS),
        "product_surface_request_count": len(handoff["product_surfaces"]),
        "write_capable_request_draft_count": _count_flagged(drafts, "write_capable_scope"),
        "treasury_related_request_draft_count": _count_flagged(drafts, "treasury_related_scope"),
        "mcp_a2a_related_request_draft_count": _count_flagged(drafts, "mcp_a2a_related_scope"),
        "live_evidence_required_request_count": len(drafts),
        "request_field_count": len(REQUEST_FIELDS),
        "required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
        "owner_role_field_count": len(owner_handoff["required_response_fields"]),
        "forbidden_input_count": len(owner_handoff["forbidden_inputs"]),
        "blocked_action_count": len(handoff["forbidden_actions"]),
    }
    summary.update(dict.fromkeys(FALSE_COUNTER_KEYS, 0))

    execution_boundaries = {_boundary_key(key): False for key in FALSE_COUNTER_KEYS}
    execution_boundaries["not_authorization"] = True

    return {
        "schema_version": "agent_bounty_owner_request_draft_v1",
        "generated_at": report_time,
        "git_commit": git_short_sha(root),
        "source_handoff_schema_version": handoff["schema_version"],
        "source_handoff_status": handoff["status"],
        "status": "owner_request_draft_ready_not_dispatched",
        "product_name": handoff["summary"]["product_name"],
        "summary": summary,
        "execution_boundaries": execution_boundaries,
        "request_fields": list(REQUEST_FIELDS),
        "required_owner_fields": list(REQUIRED_OWNER_FIELDS),
        "owner_role_fields": owner_handoff["required_response_fields"],
        "forbidden_inputs": owner_handoff["forbidden_inputs"],
        "blocked_actions": handoff["forbidden_actions"],
        "request_drafts": drafts,
        "next_steps": [
            "人工送件前確認 product / security / source-control / deployment / external-agent / treasury owner role。",
            "owner 只能提供脫敏 repo refs、dirty workspace disposition、deployment boundary、auth / abuse boundary、MCP / A2A boundary、treasury boundary 與 validation plan。",
            "收到回覆後先做欄位完整性、敏感 payload 隔離、source-control / runtime / financial action 拒收與 reviewer checklist;不得直接 deploy、claim、submit、daemon、payout 或 send notification。",
        ],
    }


def main() -> int:
    """Parse CLI arguments, build the report and emit it as JSON.

    The JSON goes to ``--output`` when given (parent directories are created),
    otherwise to stdout. A one-line status summary is always written to
    stderr.

    Returns:
        Process exit code; 0 on success.
    """
    parser = argparse.ArgumentParser(description="IwoooS agent-bounty owner request draft 產生器")
    parser.add_argument("--root", default=".", help="repo root")
    parser.add_argument(
        "--handoff-report",
        default=_HANDOFF_SNAPSHOT_REF,
        help="agent-bounty-iwooos-onboarding handoff JSON",
    )
    parser.add_argument("--output", help="寫出 JSON 報告")
    parser.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    args = parser.parse_args()

    root = Path(args.root).resolve()
    # The handoff path is resolved against --root; --output is used as given.
    handoff = load_json(root / args.handoff_report)
    report = build_report(root, handoff, args.generated_at)
    payload = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)

    if args.output:
        output = Path(args.output)
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text(payload + "\n", encoding="utf-8")
    else:
        print(payload)

    summary = report["summary"]
    print(
        "AGENT_BOUNTY_OWNER_REQUEST_DRAFT_OK "
        f"drafts={summary['request_draft_count']} "
        f"write_capable={summary['write_capable_request_draft_count']} "
        f"treasury={summary['treasury_related_request_draft_count']} "
        f"fields={summary['required_owner_field_count']} "
        f"sent={summary['request_sent_count']} "
        f"runtime_gate={summary['runtime_gate_count']}",
        file=sys.stderr,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())