#!/usr/bin/env python3 import asyncio import json import os import random import time import urllib.error import urllib.request from pathlib import Path from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Set try: import websockets except Exception: websockets = None RELAY_URL = os.getenv("NOSTR_RELAY_URL", "wss://relay.damus.io") NOSTR_TAG = os.getenv("NOSTR_TAG", "VibeWork_Bounty") NOSTR_TAG_FALLBACK = "A2A" NOSTR_LIMIT = max(int(os.getenv("NOSTR_LIMIT", "40") or "40"), 1) MCP_API_BASE = os.getenv("MCP_API_BASE", "https://agent.wooo.work") MCP_API_KEY = os.getenv("MCP_API_KEY", os.getenv("API_KEY", "")) MCP_TIMEOUT_SECONDS = max(float(os.getenv("MCP_TIMEOUT_SECONDS", "12") or "12"), 1) MCP_AGENT_ID = os.getenv("MCP_AGENT_ID", f"ecosystem-hunter-{random.randint(10000, 99999)}") DEVELOPER_WALLET = os.getenv("DEVELOPER_WALLET", "acct_ecosystem_hunter") AUTO_CLAIM = os.getenv("AUTO_CLAIM", "false").lower() in {"1", "true", "yes"} AUTO_SUBMIT = os.getenv("AUTO_SUBMIT", "false").lower() in {"1", "true", "yes"} AUTO_SUBMIT_PR_URL = os.getenv( "AUTO_SUBMIT_PR_URL", "https://github.com/vibework/a2a-ecosystem-hunter/pull/1" ) SUBMISSION_README = "Automated ecosystem probe submission from external A2A crawler." SUBMISSION_NOTES = "This is a connectivity probe payload to verify list/open/claim flow." RUN_DAEMON = os.getenv("RUN_DAEMON", "false").lower() in {"1", "true", "yes"} RECONNECTION_BACKOFF_SECONDS = max(float(os.getenv("RECONNECTION_BACKOFF_SECONDS", "20") or "20"), 1) NOSTR_SUB_ID = os.getenv("NOSTR_SUB_ID", f"vh-ecosystem-hunter-{int(time.time())}") SCAN_INTERVAL_SECONDS = max(float(os.getenv("SCAN_INTERVAL_SECONDS", "0") or "0"), 0) REPORT_PATH = os.getenv("ECOSYSTEM_REPORT_PATH", "artifacts/ecosystem_hunter_report.jsonl").strip() KNOWN_ENDPOINTS_ENV = [ item.strip() for item in os.getenv("KNOWN_MCP_ENDPOINTS", "").split(",") if item.strip() ] REPO_ROOT = Path(os.getenv("APP_ROOT", "").strip() or Path(__file__).resolve().parents[1]).resolve() ENDPOINTS_FILE = os.getenv( "MCP_ENDPOINTS_FILE", str(REPO_ROOT / "scripts" / "ecosystem-hunter-endpoints.txt"), ) RAW_ENDPOINTS = [item.strip() for item in os.getenv("EXTERNAL_MCP_ENDPOINTS", "").split(",") if item.strip()] def read_endpoint_file(path: Optional[str]) -> List[str]: if not path: return [] file_path = Path(path) if not file_path.exists(): return [] try: raw_lines = file_path.read_text(encoding="utf-8").splitlines() except Exception: return [] candidates = [] for line in raw_lines: normalized = line.strip() if not normalized or normalized.startswith("#"): continue candidates.append(normalized) return candidates def collect_seed_endpoints() -> List[str]: seen: Set[str] = set() ordered: List[str] = [] candidate_groups = [ RAW_ENDPOINTS, read_endpoint_file(ENDPOINTS_FILE), KNOWN_ENDPOINTS_ENV, ] for group in candidate_groups: for hint in group: normalized = normalize_endpoint(hint) if not normalized: continue canonical = _dedupe(normalized, seen) if canonical: ordered.append(canonical) if not ordered: ordered.append(normalize_endpoint(MCP_API_BASE) or MCP_API_BASE) return ordered def append_report_line(payload: Dict[str, Any]) -> None: if not REPORT_PATH: return path = Path(REPORT_PATH) if path.parent and not path.parent.exists(): path.parent.mkdir(parents=True, exist_ok=True) payload_to_write = { "ts": now_ts(), **payload, } with path.open("a", encoding="utf-8") as f: f.write(json.dumps(payload_to_write, ensure_ascii=False) + "\n") def _dedupe(base: str, seen: Set[str]) -> str: key = base.lower() if key in seen: return "" seen.add(key) return base def now_ts() -> str: return datetime.now(timezone.utc).isoformat() def normalize_endpoint(endpoint: str) -> Optional[str]: if not endpoint: return None raw = endpoint.strip() if not raw: return None base = raw.rstrip("/") if base.endswith("/api/mcp"): return base if base.endswith("/mcp"): return base if base.endswith("/api"): return f"{base}/mcp" if base.endswith("/mcp"): return base if base.endswith("/mcp/"): return base[:-1] return f"{base}/api/mcp" def discover_candidate_endpoints(event_payload: Dict[str, Any], tags: Dict[str, str], seen: Set[str]) -> List[str]: candidates: List[str] = [] endpoint_hints = [ event_payload.get("endpoint"), event_payload.get("mcp_endpoint"), event_payload.get("mcp"), event_payload.get("url"), ] tag_hint = tags.get("web") or tags.get("r") if tag_hint: endpoint_hints.append(tag_hint) for hint in endpoint_hints: normalized = normalize_endpoint(str(hint).strip()) if hint else None if not normalized: continue canonical = _dedupe(normalized, seen) if canonical: candidates.append(canonical) if KNOWN_ENDPOINTS_ENV: for hint in KNOWN_ENDPOINTS_ENV: normalized = normalize_endpoint(hint) if not normalized: continue canonical = _dedupe(normalized, seen) if canonical: candidates.append(canonical) return candidates def parse_nostr_tags(tags: Any) -> Dict[str, str]: tag_map: Dict[str, str] = {} if not isinstance(tags, list): return tag_map for item in tags: if not isinstance(item, list) or len(item) < 2: continue key = str(item[0]).strip().lower() value = str(item[1]).strip() if key and value: # keep the first seen to keep stable behavior tag_map.setdefault(key, value) return tag_map def build_tool_url(base: str, tool: str) -> str: normalized = normalize_endpoint(base) if not normalized: return "" return f"{normalized}/{tool}" def read_json(payload: str) -> Optional[Dict[str, Any]]: if not payload: return None try: parsed = json.loads(payload) return parsed if isinstance(parsed, dict) else None except Exception: return None def parse_bounty_payload(content: str) -> Dict[str, Any]: parsed = read_json(content) or {} if not isinstance(parsed, dict): return { "id": None, "title": "unknown", "reward": None, "currency": "USD", "endpoint": None, "source": NOSTR_TAG, "payload": {}, } bounty = parsed.get("payload") if isinstance(parsed.get("payload"), dict) else parsed.get("bounty", parsed) if not isinstance(bounty, dict): bounty = {} endpoint_hint = bounty.get("endpoint") or parsed.get("endpoint") or parsed.get("mcp_endpoint") source_hint = bounty.get("source") or parsed.get("source") or NOSTR_TAG raw_endpoint_list = parsed.get("endpoints") endpoints: List[str] = [] if isinstance(raw_endpoint_list, list): endpoints = [str(item).strip() for item in raw_endpoint_list if isinstance(item, str)] if not isinstance(bounty, dict): bounty = {} return { "id": bounty.get("id"), "title": bounty.get("title", "unknown"), "reward": bounty.get("reward"), "currency": bounty.get("currency", "USD"), "endpoint": endpoint_hint, "source": source_hint, "endpoints": endpoints, "payload": bounty, } def build_headers() -> Dict[str, str]: return { "Content-Type": "application/json", "Authorization": f"Bearer {MCP_API_KEY}", "x-agent-id": MCP_AGENT_ID, "x-agent-name": "vibework-ecosystem-hunter", "x-client-id": "vibework-a2a-hunter", "User-Agent": "vibework-ecosystem-hunter/1.0", "x-request-id": f"hunt-{int(time.time() * 1000)}", } def post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: if not MCP_API_KEY: raise RuntimeError("Missing MCP_API_KEY / API_KEY; set one to run MCP endpoints.") data = json.dumps(payload).encode("utf-8") request = urllib.request.Request(url, data=data, headers=build_headers(), method="POST") try: with urllib.request.urlopen(request, timeout=MCP_TIMEOUT_SECONDS) as response: response_body = response.read().decode("utf-8", errors="ignore") parsed_body = read_json(response_body) or {} return { "status": response.status, "data": parsed_body, } except urllib.error.HTTPError as error: error_body = error.read().decode("utf-8", errors="ignore") raise RuntimeError(f"HTTP {error.code}: {error_body or error.reason}") except urllib.error.URLError as error: raise RuntimeError(f"URLError: {error}") async def probe_one_endpoint(base_url: str, source: str, source_tag: str) -> None: normalized = normalize_endpoint(base_url) if not normalized: return print(f"\n[probe] target={normalized} source={source} tag={source_tag}") try: list_response = post_json(build_tool_url(normalized, "list_open_tasks"), {"skills": []}) list_data = list_response.get("data", {}) task_count = len(list_data.get("tasks", []) or []) print(f" ✅ list_open_tasks status={list_response.get('status')} count={task_count}") append_report_line( { "type": "probe", "target": normalized, "action": "list_open_tasks", "source": source, "source_tag": source_tag, "status": list_response.get("status"), "task_count": task_count, } ) except Exception as error: print(f" ⚠️ list_open_tasks failed: {error}") append_report_line( { "type": "probe", "target": normalized, "action": "list_open_tasks", "source": source, "source_tag": source_tag, "status": "failed", "error": str(error), } ) return tasks = list_data.get("tasks", []) if not isinstance(tasks, list) or not tasks: return top_task = tasks[0] if tasks else {} if not isinstance(top_task, dict): top_task = {} task_id = top_task.get("task_id") or top_task.get("id") or "unknown" task_title = top_task.get("title", "unknown") print(f" 🧩 top_task={task_id} title={task_title}") if not AUTO_CLAIM: return try: claim_response = post_json( build_tool_url(normalized, "claim_task"), { "task_id": task_id, "agent_id": MCP_AGENT_ID, "developer_wallet": DEVELOPER_WALLET, }, ) claim_data = claim_response.get("data", {}) claim_token = claim_data.get("claim_token") print(f" 🧪 claim_task status={claim_response.get('status')} token={str(claim_token)[:10] if claim_token else 'none'}") append_report_line( { "type": "probe", "target": normalized, "action": "claim_task", "source": source, "source_tag": source_tag, "status": claim_response.get("status"), "task_id": task_id, "has_claim_token": bool(claim_token), } ) except Exception as error: print(f" ⚠️ claim_task failed: {error}") append_report_line( { "type": "probe", "target": normalized, "action": "claim_task", "source": source, "source_tag": source_tag, "status": "failed", "error": str(error), "task_id": task_id, } ) return if not AUTO_SUBMIT or not claim_token: return try: submit_response = post_json( build_tool_url(normalized, "submit_solution"), { "task_id": task_id, "claim_token": claim_token, "deliverables": { "README.md": SUBMISSION_README, "notes.txt": SUBMISSION_NOTES, }, "github_pr_url": AUTO_SUBMIT_PR_URL, }, ) print(f" 🚀 submit_solution status={submit_response.get('status')}") append_report_line( { "type": "probe", "target": normalized, "action": "submit_solution", "source": source, "source_tag": source_tag, "status": submit_response.get("status"), "task_id": task_id, } ) except Exception as error: print(f" ⚠️ submit_solution failed: {error}") append_report_line( { "type": "probe", "target": normalized, "action": "submit_solution", "source": source, "source_tag": source_tag, "status": "failed", "error": str(error), "task_id": task_id, } ) async def probe_static_endpoints() -> None: static_endpoints = collect_seed_endpoints() append_report_line( { "type": "scanner_seed_snapshot", "source": "timer" if SCAN_INTERVAL_SECONDS > 0 else "bootstrap", "seed_count": len(static_endpoints), "seeds": static_endpoints, } ) for index, base in enumerate(static_endpoints): await probe_one_endpoint(base, f"seed:{index}", "static") async def listen_for_bounties() -> None: if websockets is None: print("⚠️ websocket support unavailable; skipping Nostr real-time bounty discovery") return seen_events = set() subscribe_message = [ "REQ", NOSTR_SUB_ID, { "kinds": [1], "#t": [NOSTR_TAG, NOSTR_TAG_FALLBACK], "limit": NOSTR_LIMIT, }, ] while True: print(f"🤖 [Nostr Agent Client] connecting {RELAY_URL} ...") try: async with websockets.connect(RELAY_URL) as ws: print("✅ connected. listening for bounty intents...") await ws.send(json.dumps(subscribe_message)) async for raw in ws: try: data = json.loads(raw) except Exception: continue if not isinstance(data, list) or len(data) < 2 or data[0] != "EVENT": continue event = data[2] if len(data) > 2 and isinstance(data[2], dict) else None if not event: continue event_id = event.get("id") if event_id in seen_events: continue seen_events.add(event_id) if len(seen_events) > 512: seen_events.pop() payload = parse_bounty_payload(event.get("content", "")) tags = parse_nostr_tags(event.get("tags")) event_seen_targets: Set[str] = set() event_stats = { "processed": 0, "candidates": 0, "endpoint_hits": 0, } endpoint_candidates = discover_candidate_endpoints(payload.get("payload", {}), tags, event_seen_targets) if payload.get("endpoint"): candidate = _dedupe(normalize_endpoint(payload.get("endpoint") or "") or "", event_seen_targets) if candidate: endpoint_candidates.append(candidate) extra_endpoints = payload.get("endpoints") or [] if isinstance(extra_endpoints, list): for hint in extra_endpoints: normalized = normalize_endpoint(str(hint).strip()) if not normalized: continue canonical = _dedupe(normalized, event_seen_targets) if canonical: endpoint_candidates.append(canonical) reward = payload.get("reward") currency = payload.get("currency", "USD") task_id = payload.get("id") title = payload.get("title", "unknown") print(f"\n🚨 [Nostr bounty] id={task_id} title={title} reward={reward} {currency}") event_stats["processed"] += 1 if not endpoint_candidates: print(f"🤖 [Nostr event] id={event_id} no actionable endpoint; skipped") append_report_line( { "type": "event", "event_id": event_id, "source": "nostr", "source_tag": NOSTR_TAG, "status": "ignored", "reason": "no_endpoint_candidate", } ) continue for endpoint in endpoint_candidates: event_stats["candidates"] += 1 await probe_one_endpoint(endpoint, f"nostr:{event_id}", tags.get("t") or NOSTR_TAG) event_stats["endpoint_hits"] += 1 append_report_line( { "type": "event", "event_id": event_id, "source": "nostr", "source_tag": NOSTR_TAG, "status": "processed", "candidates": event_stats["candidates"], "endpoint_hits": event_stats["endpoint_hits"], } ) except Exception as error: print(f"⚠️ Connection failed: {error}") print(f"⏳ Reconnect in {RECONNECTION_BACKOFF_SECONDS} seconds") await asyncio.sleep(RECONNECTION_BACKOFF_SECONDS) async def periodic_static_probe() -> None: if SCAN_INTERVAL_SECONDS <= 0: return while True: await asyncio.sleep(SCAN_INTERVAL_SECONDS) append_report_line( { "type": "scanner", "source": "timer", "status": "tick", "interval_seconds": SCAN_INTERVAL_SECONDS, } ) await probe_static_endpoints() async def main(): print(f"🛰️ [VibeWork Ecosystem Hunter] started at {now_ts()}") print(f" MCP base: {MCP_API_BASE}") print(f" API key configured: {'yes' if MCP_API_KEY else 'no'}") print(f" Auto claim: {'on' if AUTO_CLAIM else 'off'}") print(f" Auto submit: {'on' if AUTO_SUBMIT else 'off'}") print(f" Report file: {REPORT_PATH}") print(f" Static scan interval: {SCAN_INTERVAL_SECONDS}s") if KNOWN_ENDPOINTS_ENV: print(f" Seed endpoints: {', '.join(KNOWN_ENDPOINTS_ENV)}") await probe_static_endpoints() if RUN_DAEMON: if SCAN_INTERVAL_SECONDS > 0: append_report_line( { "type": "bootstrap", "status": "daemon_started_with_scan", "scan_interval_seconds": SCAN_INTERVAL_SECONDS, } ) if websockets is None: await periodic_static_probe() else: await asyncio.gather(listen_for_bounties(), periodic_static_probe()) else: append_report_line({"type": "bootstrap", "status": "daemon_started_only_realtime"}) await listen_for_bounties() else: append_report_line({"type": "bootstrap", "status": "single_run"}) if __name__ == "__main__": asyncio.run(main())