611 lines
20 KiB
Python
611 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import random
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict, List, Optional, Set
|
|
|
|
try:
|
|
import websockets
|
|
except Exception:
|
|
websockets = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Runtime configuration, read once from the environment at import time.
# ---------------------------------------------------------------------------

# Values that switch a boolean environment flag on (compared lower-cased).
_TRUTHY_VALUES = {"1", "true", "yes"}


def _env_flag(name: str) -> bool:
    """Return True when environment variable *name* holds a truthy value; unset means off."""
    return os.getenv(name, "false").lower() in _TRUTHY_VALUES


def _env_float(name: str, default: str, floor: float) -> float:
    """Read *name* as a float (unset or empty falls back to *default*), clamped to at least *floor*."""
    return max(float(os.getenv(name, default) or default), floor)


def _env_csv(name: str) -> List[str]:
    """Split comma-separated environment variable *name* into stripped, non-empty items."""
    return [piece.strip() for piece in os.getenv(name, "").split(",") if piece.strip()]


# Nostr relay subscription.
RELAY_URL = os.getenv("NOSTR_RELAY_URL", "wss://relay.damus.io")
NOSTR_TAG = os.getenv("NOSTR_TAG", "VibeWork_Bounty")
NOSTR_TAG_FALLBACK = "A2A"
NOSTR_LIMIT = max(int(os.getenv("NOSTR_LIMIT", "40") or "40"), 1)

# MCP HTTP API access. MCP_API_KEY falls back to API_KEY when unset.
MCP_API_BASE = os.getenv("MCP_API_BASE", "https://agent.wooo.work")
MCP_API_KEY = os.getenv("MCP_API_KEY", os.getenv("API_KEY", ""))
MCP_TIMEOUT_SECONDS = _env_float("MCP_TIMEOUT_SECONDS", "12", 1)
# Default agent id carries a random 5-digit suffix, so it differs between runs.
MCP_AGENT_ID = os.getenv("MCP_AGENT_ID", f"ecosystem-hunter-{random.randint(10000, 99999)}")
DEVELOPER_WALLET = os.getenv("DEVELOPER_WALLET", "acct_ecosystem_hunter")

# Claim / submit automation; both are off unless explicitly enabled.
AUTO_CLAIM = _env_flag("AUTO_CLAIM")
AUTO_SUBMIT = _env_flag("AUTO_SUBMIT")
AUTO_SUBMIT_PR_URL = os.getenv(
    "AUTO_SUBMIT_PR_URL",
    "https://github.com/vibework/a2a-ecosystem-hunter/pull/1",
)
SUBMISSION_README = "Automated ecosystem probe submission from external A2A crawler."
SUBMISSION_NOTES = "This is a connectivity probe payload to verify list/open/claim flow."

# Daemon mode, reconnect delay and periodic scan cadence (0 disables the periodic scan).
RUN_DAEMON = _env_flag("RUN_DAEMON")
RECONNECTION_BACKOFF_SECONDS = _env_float("RECONNECTION_BACKOFF_SECONDS", "20", 1)
# Default subscription id embeds the start time in whole seconds.
NOSTR_SUB_ID = os.getenv("NOSTR_SUB_ID", f"vh-ecosystem-hunter-{int(time.time())}")
SCAN_INTERVAL_SECONDS = _env_float("SCAN_INTERVAL_SECONDS", "0", 0)

# JSONL report destination; an empty value disables report writing.
REPORT_PATH = os.getenv("ECOSYSTEM_REPORT_PATH", "artifacts/ecosystem_hunter_report.jsonl").strip()

# Seed endpoint sources: two comma-separated env lists plus an optional file.
KNOWN_ENDPOINTS_ENV = _env_csv("KNOWN_MCP_ENDPOINTS")
# APP_ROOT wins when set; otherwise use the directory one level above this file's directory.
_app_root_override = os.getenv("APP_ROOT", "").strip()
REPO_ROOT = Path(_app_root_override or Path(__file__).resolve().parents[1]).resolve()
ENDPOINTS_FILE = os.getenv(
    "MCP_ENDPOINTS_FILE",
    str(REPO_ROOT / "scripts" / "ecosystem-hunter-endpoints.txt"),
)

RAW_ENDPOINTS = _env_csv("EXTERNAL_MCP_ENDPOINTS")
|
|
|
|
|
|
def read_endpoint_file(path: Optional[str]) -> List[str]:
    """Load endpoint hints from a UTF-8 text file, one hint per line.

    Lines are stripped of surrounding whitespace; blank lines and lines whose
    stripped form starts with ``#`` are dropped. An empty/None path, a path
    that does not exist, or any error while reading yields an empty list.
    """
    if not path:
        return []

    source = Path(path)
    if not source.exists():
        return []

    try:
        text = source.read_text(encoding="utf-8")
    except Exception:
        # Best effort: an unreadable seed file simply contributes no endpoints.
        return []

    stripped_lines = (line.strip() for line in text.splitlines())
    return [entry for entry in stripped_lines if entry and not entry.startswith("#")]
|
|
|
|
|
|
def collect_seed_endpoints() -> List[str]:
    """Build the ordered, de-duplicated list of statically configured endpoints.

    Hints are taken, in this order, from ``RAW_ENDPOINTS``, the endpoints file
    named by ``ENDPOINTS_FILE`` and ``KNOWN_ENDPOINTS_ENV``. Each hint is
    normalized, and duplicates (compared case-insensitively) keep only their
    first occurrence. When nothing usable is configured, the result is a
    single entry derived from ``MCP_API_BASE``.
    """
    already_seen: Set[str] = set()
    seeds: List[str] = []

    hint_sources = (
        RAW_ENDPOINTS,
        read_endpoint_file(ENDPOINTS_FILE),
        KNOWN_ENDPOINTS_ENV,
    )

    for hint in (item for source in hint_sources for item in source):
        endpoint = normalize_endpoint(hint)
        unique = _dedupe(endpoint, already_seen) if endpoint else ""
        if unique:
            seeds.append(unique)

    if seeds:
        return seeds

    # Nothing configured: fall back to the default MCP base.
    return [normalize_endpoint(MCP_API_BASE) or MCP_API_BASE]
|
|
|
|
def append_report_line(payload: Dict[str, Any]) -> None:
    """Append *payload* as one JSON line to the report file.

    Does nothing when ``REPORT_PATH`` is empty. The parent directory is
    created on demand. Each record starts with a ``ts`` timestamp from
    ``now_ts()``; a ``ts`` key inside *payload* overrides that value.
    """
    if not REPORT_PATH:
        return

    report_file = Path(REPORT_PATH)
    folder = report_file.parent
    if folder and not folder.exists():
        folder.mkdir(parents=True, exist_ok=True)

    record: Dict[str, Any] = {"ts": now_ts()}
    record.update(payload)

    with report_file.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(record, ensure_ascii=False) + "\n")
|
|
|
|
|
|
def _dedupe(base: str, seen: Set[str]) -> str:
    """Return *base* on first sight, or ``""`` if it was already recorded.

    Comparison is case-insensitive: the lower-cased form of *base* is the key
    stored in *seen*, which this function mutates.
    """
    folded = base.lower()
    if folded not in seen:
        seen.add(folded)
        return base
    return ""
|
|
|
|
|
|
def now_ts() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def normalize_endpoint(endpoint: str) -> Optional[str]:
    """Canonicalize an endpoint hint into an MCP base URL.

    Surrounding whitespace and trailing slashes are removed, then:

    * a value already ending in ``/mcp`` (which includes ``/api/mcp``) is
      returned unchanged;
    * a value ending in ``/api`` gets ``/mcp`` appended;
    * anything else gets ``/api/mcp`` appended.

    Returns ``None`` when nothing usable remains, i.e. for empty,
    whitespace-only or slash-only input.
    """
    if not endpoint:
        return None

    base = endpoint.strip().rstrip("/")
    if not base:
        # Slash-only input such as "/" would otherwise become "/api/mcp".
        return None

    # Trailing slashes are already gone, so a single suffix check covers
    # ".../mcp", ".../mcp/" and ".../api/mcp".
    if base.endswith("/mcp"):
        return base
    if base.endswith("/api"):
        return f"{base}/mcp"
    return f"{base}/api/mcp"
|
|
|
|
|
|
def discover_candidate_endpoints(event_payload: Dict[str, Any], tags: Dict[str, str], seen: Set[str]) -> List[str]:
    """Collect normalized, de-duplicated endpoint candidates for one event.

    Hints are considered in this order:

    1. the ``endpoint``, ``mcp_endpoint``, ``mcp`` and ``url`` keys of
       *event_payload*;
    2. the ``web`` tag, or the ``r`` tag when ``web`` is absent or empty;
    3. every entry of ``KNOWN_ENDPOINTS_ENV``.

    Only string hints are used. Other JSON values (objects, numbers, ...) are
    skipped instead of being stringified into a meaningless URL. *seen* is
    mutated by the de-duplication step, so callers can share it across
    several hint sources.

    Returns the accepted endpoints in discovery order.
    """
    hints: List[Any] = [
        event_payload.get(key) for key in ("endpoint", "mcp_endpoint", "mcp", "url")
    ]

    tag_hint = tags.get("web") or tags.get("r")
    if tag_hint:
        hints.append(tag_hint)

    # Statically configured endpoints are offered after the event's own hints.
    hints.extend(KNOWN_ENDPOINTS_ENV)

    candidates: List[str] = []
    for hint in hints:
        if not isinstance(hint, str):
            continue
        normalized = normalize_endpoint(hint)
        if not normalized:
            continue
        canonical = _dedupe(normalized, seen)
        if canonical:
            candidates.append(canonical)

    return candidates
|
|
|
|
|
|
def parse_nostr_tags(tags: Any) -> Dict[str, str]:
    """Flatten a Nostr tag list into a ``{name: value}`` mapping.

    *tags* is expected to be a list of lists; anything else yields an empty
    mapping. Entries that are not lists or have fewer than two items are
    ignored. The first item becomes the lower-cased, stripped key and the
    second the stripped value; pairs with an empty key or value are dropped.
    When a key repeats, the first value wins.
    """
    result: Dict[str, str] = {}
    if not isinstance(tags, list):
        return result

    for entry in tags:
        if isinstance(entry, list) and len(entry) >= 2:
            name = str(entry[0]).strip().lower()
            content = str(entry[1]).strip()
            # First occurrence wins so repeated tags give a stable result.
            if name and content and name not in result:
                result[name] = content

    return result
|
|
|
|
|
|
def build_tool_url(base: str, tool: str) -> str:
    """Return ``<normalized base>/<tool>``, or ``""`` when *base* cannot be normalized."""
    endpoint = normalize_endpoint(base)
    return f"{endpoint}/{tool}" if endpoint else ""
|
|
|
|
|
|
def read_json(payload: str) -> Optional[Dict[str, Any]]:
    """Decode *payload* as JSON and return it only if it is an object.

    Returns ``None`` for empty input, for input that fails to decode, and for
    valid JSON whose top-level value is not a dict.
    """
    if not payload:
        return None

    try:
        decoded = json.loads(payload)
    except Exception:
        return None

    if isinstance(decoded, dict):
        return decoded
    return None
|
|
|
|
|
|
def parse_bounty_payload(content: str) -> Dict[str, Any]:
    """Extract bounty fields from the JSON content of a Nostr event.

    *content* is decoded with ``read_json``; content that is not a JSON object
    is treated as an empty object, so the result always has the same keys.

    The bounty record is the ``payload`` value when that is a dict, otherwise
    the ``bounty`` value, otherwise the top-level object itself. A non-dict
    ``bounty`` value is treated as empty.

    Returns a dict with:

    * ``id``, ``title`` (default ``"unknown"``), ``reward`` and ``currency``
      (default ``"USD"``) taken from the bounty record;
    * ``endpoint``: the bounty's ``endpoint``, else the top-level ``endpoint``,
      else the top-level ``mcp_endpoint``;
    * ``source``: the bounty's ``source``, else the top-level ``source``, else
      ``NOSTR_TAG``;
    * ``endpoints``: the stripped string items of the top-level ``endpoints``
      list (non-string items are dropped);
    * ``payload``: the bounty record itself.
    """
    # read_json yields a dict or None, so `parsed` is always a dict here.
    parsed: Dict[str, Any] = read_json(content) or {}

    nested = parsed.get("payload")
    bounty = nested if isinstance(nested, dict) else parsed.get("bounty", parsed)
    if not isinstance(bounty, dict):
        bounty = {}

    endpoint_hint = bounty.get("endpoint") or parsed.get("endpoint") or parsed.get("mcp_endpoint")
    source_hint = bounty.get("source") or parsed.get("source") or NOSTR_TAG

    raw_endpoint_list = parsed.get("endpoints")
    endpoints: List[str] = []
    if isinstance(raw_endpoint_list, list):
        endpoints = [item.strip() for item in raw_endpoint_list if isinstance(item, str)]

    return {
        "id": bounty.get("id"),
        "title": bounty.get("title", "unknown"),
        "reward": bounty.get("reward"),
        "currency": bounty.get("currency", "USD"),
        "endpoint": endpoint_hint,
        "source": source_hint,
        "endpoints": endpoints,
        "payload": bounty,
    }
|
|
|
|
|
|
def build_headers() -> Dict[str, str]:
    """Build the HTTP headers sent with every MCP request.

    Includes the bearer token from ``MCP_API_KEY``, the agent id from
    ``MCP_AGENT_ID``, fixed client identification strings, and a request id
    derived from the current time in milliseconds.
    """
    request_id = f"hunt-{int(time.time() * 1000)}"

    headers: Dict[str, str] = {}
    headers["Content-Type"] = "application/json"
    headers["Authorization"] = f"Bearer {MCP_API_KEY}"
    headers["x-agent-id"] = MCP_AGENT_ID
    headers["x-agent-name"] = "vibework-ecosystem-hunter"
    headers["x-client-id"] = "vibework-a2a-hunter"
    headers["User-Agent"] = "vibework-ecosystem-hunter/1.0"
    headers["x-request-id"] = request_id
    return headers
|
|
|
|
|
|
def post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """POST *payload* as JSON to *url* and return the decoded response.

    This call blocks for up to ``MCP_TIMEOUT_SECONDS``.

    Returns:
        ``{"status": <HTTP status>, "data": <response object>}``, where
        ``data`` is ``{}`` when the body is empty or not a JSON object.

    Raises:
        RuntimeError: when no API key is configured, when *url* is not an
            ``http``/``https`` URL, or when the request fails with an
            ``HTTPError``/``URLError`` (the original error is chained as
            ``__cause__``).
    """
    if not MCP_API_KEY:
        raise RuntimeError("Missing MCP_API_KEY / API_KEY; set one to run MCP endpoints.")

    # Endpoint URLs can originate from untrusted relay events; urllib also
    # understands schemes such as file: and ftp:, so allow only HTTP(S).
    if not url.lower().startswith(("http://", "https://")):
        raise RuntimeError(f"Unsupported endpoint URL (expected http/https): {url!r}")

    # NOTE(review): build_headers() attaches the bearer token to every request,
    # including requests to hosts discovered at runtime; confirm this is
    # intended or restrict authenticated calls to trusted hosts.
    data = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(url, data=data, headers=build_headers(), method="POST")

    try:
        with urllib.request.urlopen(request, timeout=MCP_TIMEOUT_SECONDS) as response:
            response_body = response.read().decode("utf-8", errors="ignore")
            return {
                "status": response.status,
                "data": read_json(response_body) or {},
            }
    except urllib.error.HTTPError as error:
        # HTTPError is a URLError subclass, so it must be handled first.
        error_body = error.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"HTTP {error.code}: {error_body or error.reason}") from error
    except urllib.error.URLError as error:
        raise RuntimeError(f"URLError: {error}") from error
|
|
|
|
|
|
def _report_probe(target: str, action: str, source: str, source_tag: str, status: Any, **extra: Any) -> None:
    """Append one ``type="probe"`` record (common fields first, then *extra*) to the report."""
    append_report_line(
        {
            "type": "probe",
            "target": target,
            "action": action,
            "source": source,
            "source_tag": source_tag,
            "status": status,
            **extra,
        }
    )


async def _post_json_off_loop(url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run the blocking ``post_json`` call in the default executor so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, post_json, url, payload)


async def probe_one_endpoint(base_url: str, source: str, source_tag: str) -> None:
    """Probe one MCP endpoint: list open tasks, then optionally claim and submit.

    Steps:

    1. ``list_open_tasks`` is always called. On failure the error is reported
       and the probe stops.
    2. If at least one task is listed and ``AUTO_CLAIM`` is on, the first task
       is claimed via ``claim_task``. A task without ``task_id``/``id`` is not
       claimed.
    3. If the claim returned a ``claim_token`` and ``AUTO_SUBMIT`` is on, the
       fixed probe deliverables are sent via ``submit_solution``.

    Every step prints a status line and appends a ``probe`` record to the
    report. HTTP calls run in a worker thread so other coroutines keep
    running while a request is in flight. Step failures are reported, not
    raised.

    Args:
        base_url: endpoint hint; normalized before use, ignored if unusable.
        source: free-form label of where the endpoint came from.
        source_tag: tag label recorded alongside *source*.
    """
    normalized = normalize_endpoint(base_url)
    if not normalized:
        return

    print(f"\n[probe] target={normalized} source={source} tag={source_tag}")

    # --- Step 1: list open tasks -------------------------------------------
    try:
        list_response = await _post_json_off_loop(
            build_tool_url(normalized, "list_open_tasks"), {"skills": []}
        )
        list_data = list_response.get("data", {})
        task_count = len(list_data.get("tasks", []) or [])
        print(f" ✅ list_open_tasks status={list_response.get('status')} count={task_count}")
        _report_probe(
            normalized,
            "list_open_tasks",
            source,
            source_tag,
            list_response.get("status"),
            task_count=task_count,
        )
    except Exception as error:
        print(f" ⚠️ list_open_tasks failed: {error}")
        _report_probe(normalized, "list_open_tasks", source, source_tag, "failed", error=str(error))
        return

    tasks = list_data.get("tasks", [])
    if not isinstance(tasks, list) or not tasks:
        return

    top_task = tasks[0] if isinstance(tasks[0], dict) else {}
    task_id = top_task.get("task_id") or top_task.get("id")
    task_title = top_task.get("title", "unknown")
    print(f" 🧩 top_task={task_id or 'unknown'} title={task_title}")

    if not AUTO_CLAIM:
        return
    if not task_id:
        # Claiming a placeholder id would only produce a bogus request.
        print(" ⚠️ claim_task skipped: top task has no task_id/id")
        return

    # --- Step 2: claim the first task --------------------------------------
    try:
        claim_response = await _post_json_off_loop(
            build_tool_url(normalized, "claim_task"),
            {
                "task_id": task_id,
                "agent_id": MCP_AGENT_ID,
                "developer_wallet": DEVELOPER_WALLET,
            },
        )
        claim_data = claim_response.get("data", {})
        claim_token = claim_data.get("claim_token")
        print(f" 🧪 claim_task status={claim_response.get('status')} token={str(claim_token)[:10] if claim_token else 'none'}")
        _report_probe(
            normalized,
            "claim_task",
            source,
            source_tag,
            claim_response.get("status"),
            task_id=task_id,
            has_claim_token=bool(claim_token),
        )
    except Exception as error:
        print(f" ⚠️ claim_task failed: {error}")
        _report_probe(
            normalized,
            "claim_task",
            source,
            source_tag,
            "failed",
            error=str(error),
            task_id=task_id,
        )
        return

    if not AUTO_SUBMIT or not claim_token:
        return

    # --- Step 3: submit the probe deliverables -----------------------------
    try:
        submit_response = await _post_json_off_loop(
            build_tool_url(normalized, "submit_solution"),
            {
                "task_id": task_id,
                "claim_token": claim_token,
                "deliverables": {
                    "README.md": SUBMISSION_README,
                    "notes.txt": SUBMISSION_NOTES,
                },
                "github_pr_url": AUTO_SUBMIT_PR_URL,
            },
        )
        print(f" 🚀 submit_solution status={submit_response.get('status')}")
        _report_probe(
            normalized,
            "submit_solution",
            source,
            source_tag,
            submit_response.get("status"),
            task_id=task_id,
        )
    except Exception as error:
        print(f" ⚠️ submit_solution failed: {error}")
        _report_probe(
            normalized,
            "submit_solution",
            source,
            source_tag,
            "failed",
            error=str(error),
            task_id=task_id,
        )
|
|
|
|
|
|
async def probe_static_endpoints() -> None:
    """Probe every statically configured seed endpoint, one after another.

    First writes a ``scanner_seed_snapshot`` record listing the seeds. Its
    ``source`` is ``"timer"`` when periodic scanning is enabled and
    ``"bootstrap"`` otherwise. Each seed is then probed with the source label
    ``seed:<index>`` and the tag ``static``.
    """
    seeds = collect_seed_endpoints()

    snapshot_source = "bootstrap"
    if SCAN_INTERVAL_SECONDS > 0:
        snapshot_source = "timer"

    snapshot = {
        "type": "scanner_seed_snapshot",
        "source": snapshot_source,
        "seed_count": len(seeds),
        "seeds": seeds,
    }
    append_report_line(snapshot)

    for position, seed in enumerate(seeds):
        await probe_one_endpoint(seed, f"seed:{position}", "static")
|
|
|
|
|
|
def _extract_nostr_event(raw: Any) -> Optional[Dict[str, Any]]:
    """Return the event object of an ``["EVENT", <sub_id>, {...}]`` relay frame, else ``None``."""
    try:
        data = json.loads(raw)
    except Exception:
        return None
    if not isinstance(data, list) or len(data) < 3 or data[0] != "EVENT":
        return None
    event = data[2]
    return event if isinstance(event, dict) and event else None


async def _process_nostr_event(event: Dict[str, Any], event_id: Any) -> None:
    """Parse one bounty event, probe every endpoint it advertises and report the outcome."""
    payload = parse_bounty_payload(event.get("content", ""))
    tags = parse_nostr_tags(event.get("tags"))
    event_seen_targets: Set[str] = set()

    endpoint_candidates = discover_candidate_endpoints(payload.get("payload", {}), tags, event_seen_targets)

    # Hints carried by the parsed payload itself: the single "endpoint" value
    # first, then the "endpoints" list.
    extra_hints: List[Any] = [payload.get("endpoint")]
    extra_endpoints = payload.get("endpoints") or []
    if isinstance(extra_endpoints, list):
        extra_hints.extend(extra_endpoints)

    for hint in extra_hints:
        if not isinstance(hint, str):
            # Non-string JSON values cannot be endpoint URLs.
            continue
        normalized = normalize_endpoint(hint)
        if not normalized:
            continue
        canonical = _dedupe(normalized, event_seen_targets)
        if canonical:
            endpoint_candidates.append(canonical)

    reward = payload.get("reward")
    currency = payload.get("currency", "USD")
    task_id = payload.get("id")
    title = payload.get("title", "unknown")
    print(f"\n🚨 [Nostr bounty] id={task_id} title={title} reward={reward} {currency}")

    if not endpoint_candidates:
        print(f"🤖 [Nostr event] id={event_id} no actionable endpoint; skipped")
        append_report_line(
            {
                "type": "event",
                "event_id": event_id,
                "source": "nostr",
                "source_tag": NOSTR_TAG,
                "status": "ignored",
                "reason": "no_endpoint_candidate",
            }
        )
        return

    probed = 0
    for endpoint in endpoint_candidates:
        await probe_one_endpoint(endpoint, f"nostr:{event_id}", tags.get("t") or NOSTR_TAG)
        probed += 1

    append_report_line(
        {
            "type": "event",
            "event_id": event_id,
            "source": "nostr",
            "source_tag": NOSTR_TAG,
            "status": "processed",
            "candidates": probed,
            "endpoint_hits": probed,
        }
    )


async def listen_for_bounties() -> None:
    """Subscribe to the Nostr relay and probe endpoints advertised by bounty events.

    Returns immediately, after printing a notice, when the optional
    ``websockets`` package is unavailable. Otherwise runs until cancelled.
    It connects to ``RELAY_URL`` and subscribes to kind-1 events tagged with
    ``NOSTR_TAG`` or ``NOSTR_TAG_FALLBACK``, then handles each new event.

    * Event ids are remembered across reconnects so replayed events are not
      processed twice. Only the 512 most recent ids are kept, and the oldest
      is evicted first.
    * A failure while handling one event is printed and does not drop the
      relay connection.
    * After any disconnect, whether an error or a clean close by the relay,
      the loop waits ``RECONNECTION_BACKOFF_SECONDS`` before reconnecting.
    """
    if websockets is None:
        print("⚠️ websocket support unavailable; skipping Nostr real-time bounty discovery")
        return

    max_seen_events = 512
    # A dict keeps insertion order, which gives oldest-first eviction.
    seen_events: Dict[Any, None] = {}
    subscribe_message = [
        "REQ",
        NOSTR_SUB_ID,
        {
            "kinds": [1],
            "#t": [NOSTR_TAG, NOSTR_TAG_FALLBACK],
            "limit": NOSTR_LIMIT,
        },
    ]

    while True:
        print(f"🤖 [Nostr Agent Client] connecting {RELAY_URL} ...")
        try:
            async with websockets.connect(RELAY_URL) as ws:
                print("✅ connected. listening for bounty intents...")
                await ws.send(json.dumps(subscribe_message))

                async for raw in ws:
                    event = _extract_nostr_event(raw)
                    if event is None:
                        continue

                    event_id = event.get("id")
                    try:
                        if event_id in seen_events:
                            continue
                        seen_events[event_id] = None
                        if len(seen_events) > max_seen_events:
                            del seen_events[next(iter(seen_events))]

                        await _process_nostr_event(event, event_id)
                    except Exception as error:
                        # One malformed event must not tear down the connection.
                        print(f"⚠️ Event {event_id!r} processing failed: {error}")

            # Reaching this line means the relay closed the stream without an error.
            print("⚠️ Relay closed the connection")
        except Exception as error:
            print(f"⚠️ Connection failed: {error}")

        # Back off after every disconnect so a relay that keeps closing the
        # socket cleanly cannot trigger a tight reconnect loop.
        print(f"⏳ Reconnect in {RECONNECTION_BACKOFF_SECONDS} seconds")
        await asyncio.sleep(RECONNECTION_BACKOFF_SECONDS)
|
|
|
|
|
|
async def periodic_static_probe() -> None:
    """Re-run the static endpoint probe every ``SCAN_INTERVAL_SECONDS`` seconds.

    Returns immediately when the interval is zero or negative. Otherwise it
    loops forever: sleep for the interval, write a ``scanner`` tick record,
    then probe the static endpoints.
    """
    if SCAN_INTERVAL_SECONDS <= 0:
        return

    tick_record = {
        "type": "scanner",
        "source": "timer",
        "status": "tick",
        "interval_seconds": SCAN_INTERVAL_SECONDS,
    }

    while True:
        await asyncio.sleep(SCAN_INTERVAL_SECONDS)
        append_report_line(tick_record)
        await probe_static_endpoints()
|
|
|
|
|
|
async def main():
    """Entry point: print the configuration, probe the seeds once, then optionally run as a daemon.

    Modes after the initial static probe:

    * ``RUN_DAEMON`` off: record a ``single_run`` bootstrap line and exit.
    * ``RUN_DAEMON`` on, no scan interval: run only the real-time Nostr listener.
    * ``RUN_DAEMON`` on, with scan interval: run the periodic probe, together
      with the Nostr listener when ``websockets`` is available.
    """
    banner = [
        f"🛰️ [VibeWork Ecosystem Hunter] started at {now_ts()}",
        f" MCP base: {MCP_API_BASE}",
        f" API key configured: {'yes' if MCP_API_KEY else 'no'}",
        f" Auto claim: {'on' if AUTO_CLAIM else 'off'}",
        f" Auto submit: {'on' if AUTO_SUBMIT else 'off'}",
        f" Report file: {REPORT_PATH}",
        f" Static scan interval: {SCAN_INTERVAL_SECONDS}s",
    ]
    if KNOWN_ENDPOINTS_ENV:
        banner.append(f" Seed endpoints: {', '.join(KNOWN_ENDPOINTS_ENV)}")
    for line in banner:
        print(line)

    await probe_static_endpoints()

    if not RUN_DAEMON:
        append_report_line({"type": "bootstrap", "status": "single_run"})
        return

    if SCAN_INTERVAL_SECONDS <= 0:
        append_report_line({"type": "bootstrap", "status": "daemon_started_only_realtime"})
        await listen_for_bounties()
        return

    append_report_line(
        {
            "type": "bootstrap",
            "status": "daemon_started_with_scan",
            "scan_interval_seconds": SCAN_INTERVAL_SECONDS,
        }
    )
    if websockets is None:
        # Without websocket support only the timer-driven scan can run.
        await periodic_static_probe()
    else:
        await asyncio.gather(listen_for_bounties(), periodic_static_probe())


if __name__ == "__main__":
    asyncio.run(main())
|