Files
agent-bounty-protocol/scripts/nostr_agent_client.py
OG T 752a4a45d7
Some checks failed
Deploy to 110 WOOO Server / deploy (push) Failing after 8s
feat: Enhance login page UI with delayed redirect instead of transparent 307
2026-06-08 18:37:35 +08:00

611 lines
20 KiB
Python

#!/usr/bin/env python3
import asyncio
import json
import os
import random
import time
import urllib.error
import urllib.request
from pathlib import Path
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set
# Optional third-party dependency: if the import fails for any reason the
# module still loads and `websockets` is left as None, which the rest of the
# file checks before attempting real-time Nostr discovery.
try:
    import websockets
except Exception:
    websockets = None
# ---------------------------------------------------------------------------
# Module configuration. Every value below is resolved once, at import time,
# from environment variables (with the defaults shown).
# ---------------------------------------------------------------------------

# Nostr relay to connect to and the "#t" tag values used in the subscription
# filter built by listen_for_bounties().
RELAY_URL = os.getenv("NOSTR_RELAY_URL", "wss://relay.damus.io")
NOSTR_TAG = os.getenv("NOSTR_TAG", "VibeWork_Bounty")
NOSTR_TAG_FALLBACK = "A2A"
# `or "40"` covers a variable that is set but empty; max(..., 1) enforces a
# floor of 1. A non-numeric value raises ValueError at import time.
NOSTR_LIMIT = max(int(os.getenv("NOSTR_LIMIT", "40") or "40"), 1)

# MCP endpoint used when no seed endpoint is configured, plus the credentials
# and timeout used by post_json(). MCP_API_KEY falls back to API_KEY.
MCP_API_BASE = os.getenv("MCP_API_BASE", "https://agent.wooo.work")
MCP_API_KEY = os.getenv("MCP_API_KEY", os.getenv("API_KEY", ""))
MCP_TIMEOUT_SECONDS = max(float(os.getenv("MCP_TIMEOUT_SECONDS", "12") or "12"), 1)
# Default agent id carries a random 5-digit suffix, so it differs between
# process starts unless MCP_AGENT_ID is set explicitly.
MCP_AGENT_ID = os.getenv("MCP_AGENT_ID", f"ecosystem-hunter-{random.randint(10000, 99999)}")
DEVELOPER_WALLET = os.getenv("DEVELOPER_WALLET", "acct_ecosystem_hunter")

# Feature flags: only "1", "true" or "yes" (case-insensitive) enable them.
AUTO_CLAIM = os.getenv("AUTO_CLAIM", "false").lower() in {"1", "true", "yes"}
AUTO_SUBMIT = os.getenv("AUTO_SUBMIT", "false").lower() in {"1", "true", "yes"}
# Values sent by probe_one_endpoint() in the submit_solution request.
AUTO_SUBMIT_PR_URL = os.getenv(
"AUTO_SUBMIT_PR_URL",
"https://github.com/vibework/a2a-ecosystem-hunter/pull/1"
)
SUBMISSION_README = "Automated ecosystem probe submission from external A2A crawler."
SUBMISSION_NOTES = "This is a connectivity probe payload to verify list/open/claim flow."

# Daemon behaviour: main() only starts the long-running loops when RUN_DAEMON
# is on; the backoff is the sleep used by listen_for_bounties() between
# connection attempts (floor of 1 second).
RUN_DAEMON = os.getenv("RUN_DAEMON", "false").lower() in {"1", "true", "yes"}
RECONNECTION_BACKOFF_SECONDS = max(float(os.getenv("RECONNECTION_BACKOFF_SECONDS", "20") or "20"), 1)
# Default subscription id embeds the import-time Unix timestamp.
NOSTR_SUB_ID = os.getenv("NOSTR_SUB_ID", f"vh-ecosystem-hunter-{int(time.time())}")
# 0 (the default) disables periodic_static_probe(); negative values clamp to 0.
SCAN_INTERVAL_SECONDS = max(float(os.getenv("SCAN_INTERVAL_SECONDS", "0") or "0"), 0)
# JSON-lines report file; an empty value makes append_report_line() a no-op.
REPORT_PATH = os.getenv("ECOSYSTEM_REPORT_PATH", "artifacts/ecosystem_hunter_report.jsonl").strip()

# Comma-separated endpoint list; blank items are dropped. Used both as seeds
# and as extra candidates in discover_candidate_endpoints().
KNOWN_ENDPOINTS_ENV = [
item.strip() for item in os.getenv("KNOWN_MCP_ENDPOINTS", "").split(",") if item.strip()
]
# APP_ROOT when set to a non-blank value, otherwise the directory one level
# above the directory containing this file.
REPO_ROOT = Path(os.getenv("APP_ROOT", "").strip() or Path(__file__).resolve().parents[1]).resolve()
# Text file of seed endpoints read by read_endpoint_file().
ENDPOINTS_FILE = os.getenv(
"MCP_ENDPOINTS_FILE",
str(REPO_ROOT / "scripts" / "ecosystem-hunter-endpoints.txt"),
)
# Comma-separated endpoint list; first source consulted by
# collect_seed_endpoints().
RAW_ENDPOINTS = [item.strip() for item in os.getenv("EXTERNAL_MCP_ENDPOINTS", "").split(",") if item.strip()]
def read_endpoint_file(path: Optional[str]) -> List[str]:
    """Return the endpoint entries listed in the text file at *path*.

    Each line is stripped of surrounding whitespace; blank lines and lines
    starting with ``#`` are skipped. An empty/missing path, a non-existent
    file, or any error while reading yields an empty list.
    """
    source = Path(path) if path else None
    if source is None or not source.exists():
        return []
    try:
        text = source.read_text(encoding="utf-8")
    except Exception:
        # Best effort: an unreadable seed file simply contributes no seeds.
        return []
    stripped_lines = (line.strip() for line in text.splitlines())
    return [entry for entry in stripped_lines if entry and not entry.startswith("#")]
def collect_seed_endpoints() -> List[str]:
    """Build the ordered, de-duplicated list of statically configured endpoints.

    Sources are consulted in this order: ``RAW_ENDPOINTS``, the entries of
    ``ENDPOINTS_FILE``, then ``KNOWN_ENDPOINTS_ENV``. Every hint is normalised
    and duplicates (compared case-insensitively) are dropped. When nothing
    usable is configured the list contains only the normalised
    ``MCP_API_BASE`` (or the raw value if normalisation yields nothing).
    """
    seen_keys: Set[str] = set()
    seeds: List[str] = []
    sources = (
        RAW_ENDPOINTS,
        read_endpoint_file(ENDPOINTS_FILE),
        KNOWN_ENDPOINTS_ENV,
    )
    for hint in (entry for source in sources for entry in source):
        endpoint = normalize_endpoint(hint)
        unique = _dedupe(endpoint, seen_keys) if endpoint else ""
        if unique:
            seeds.append(unique)
    if seeds:
        return seeds
    return [normalize_endpoint(MCP_API_BASE) or MCP_API_BASE]
def append_report_line(payload: Dict[str, Any]) -> None:
    """Append *payload* as one JSON line to the report file.

    Does nothing when ``REPORT_PATH`` is empty. The written record starts with
    a ``ts`` key holding the current UTC timestamp; keys in *payload* are
    merged on top of it (so a ``ts`` key in *payload* wins). The parent
    directory is created when it does not exist yet.
    """
    if not REPORT_PATH:
        return
    report_file = Path(REPORT_PATH)
    parent_dir = report_file.parent
    if parent_dir and not parent_dir.exists():
        parent_dir.mkdir(parents=True, exist_ok=True)
    record: Dict[str, Any] = {"ts": now_ts()}
    record.update(payload)
    with report_file.open("a", encoding="utf-8") as handle:
        # ensure_ascii=False keeps non-ASCII text readable in the report.
        handle.write(json.dumps(record, ensure_ascii=False) + "\n")
def _dedupe(base: str, seen: Set[str]) -> str:
    """Return *base* the first time it is seen, otherwise an empty string.

    Comparison is case-insensitive: the lower-cased value is what gets stored
    in *seen*, which is mutated in place.
    """
    fingerprint = base.lower()
    if fingerprint not in seen:
        seen.add(fingerprint)
        return base
    return ""
def now_ts() -> str:
    """Return the current UTC time as an ISO 8601 string (with offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def normalize_endpoint(endpoint: str) -> Optional[str]:
    """Normalise an endpoint hint to an MCP base URL without a trailing slash.

    Rules, applied after trimming whitespace and trailing slashes:

    * a value already ending in ``/mcp`` (which includes ``/api/mcp``) is
      returned as is;
    * a value ending in ``/api`` gets ``/mcp`` appended;
    * anything else gets ``/api/mcp`` appended.

    Returns ``None`` when the hint is not a string, or is empty once
    whitespace and trailing slashes are removed.
    """
    # Hints can originate from parsed event JSON, where the value may be any
    # JSON type; only strings can be URLs, so reject the rest instead of
    # failing on a missing ``.strip``.
    if not isinstance(endpoint, str):
        return None
    base = endpoint.strip().rstrip("/")
    if not base:
        # Also covers slash-only input, which would otherwise turn into the
        # meaningless "/api/mcp".
        return None
    if base.endswith("/mcp"):
        return base
    if base.endswith("/api"):
        return f"{base}/mcp"
    return f"{base}/api/mcp"
def discover_candidate_endpoints(event_payload: Dict[str, Any], tags: Dict[str, str], seen: Set[str]) -> List[str]:
    """Collect normalised, not-yet-seen endpoint candidates for one event.

    Hints are taken, in order, from the ``endpoint``, ``mcp_endpoint``,
    ``mcp`` and ``url`` keys of *event_payload*, then from the ``web`` tag
    (or ``r`` when ``web`` is absent), then from ``KNOWN_ENDPOINTS_ENV``.

    Args:
        event_payload: Parsed bounty payload of the event.
        tags: Tag map as produced by ``parse_nostr_tags``.
        seen: Set of already-used endpoint keys; updated in place.

    Returns:
        Normalised endpoints that were not in *seen* before the call.
    """
    hints: List[Any] = [
        event_payload.get(key) for key in ("endpoint", "mcp_endpoint", "mcp", "url")
    ]
    hints.append(tags.get("web") or tags.get("r"))
    hints.extend(KNOWN_ENDPOINTS_ENV)

    candidates: List[str] = []
    for hint in hints:
        # Payload values come from untrusted event JSON and may be objects,
        # numbers, etc.; stringifying those would only produce junk URLs.
        if not isinstance(hint, str):
            continue
        normalized = normalize_endpoint(hint)
        if not normalized:
            continue
        canonical = _dedupe(normalized, seen)
        if canonical:
            candidates.append(canonical)
    return candidates
def parse_nostr_tags(tags: Any) -> Dict[str, str]:
    """Convert a list of ``[name, value, ...]`` tag entries into a dict.

    Names are stripped and lower-cased, values are stripped. Entries that are
    not lists, have fewer than two items, or have an empty name or value are
    ignored. Anything other than a list as input yields an empty dict.
    """
    if not isinstance(tags, list):
        return {}
    mapping: Dict[str, str] = {}
    for entry in tags:
        if isinstance(entry, list) and len(entry) >= 2:
            name = str(entry[0]).strip().lower()
            content = str(entry[1]).strip()
            # The first occurrence of a tag name wins, for stable behaviour.
            if name and content and name not in mapping:
                mapping[name] = content
    return mapping
def build_tool_url(base: str, tool: str) -> str:
    """Return ``<normalised base>/<tool>``, or ``""`` if *base* is unusable."""
    endpoint = normalize_endpoint(base)
    return f"{endpoint}/{tool}" if endpoint else ""
def read_json(payload: str) -> Optional[Dict[str, Any]]:
    """Decode *payload* as JSON and return it only if it is an object.

    Returns ``None`` for empty input, for input that fails to decode, and for
    valid JSON whose top-level value is not an object.
    """
    if payload:
        try:
            decoded = json.loads(payload)
        except Exception:
            decoded = None
        if isinstance(decoded, dict):
            return decoded
    return None
def parse_bounty_payload(content: str) -> Dict[str, Any]:
    """Extract bounty fields from the JSON content of a Nostr event.

    The bounty object is ``parsed["payload"]`` when that is an object,
    otherwise ``parsed["bounty"]``, otherwise the top-level object itself;
    a non-object bounty is treated as empty. Content that is not a JSON
    object produces the defaults.

    Returns:
        A dict with the keys ``id``, ``title`` (default ``"unknown"``),
        ``reward``, ``currency`` (default ``"USD"``), ``endpoint`` (first
        non-blank string among the bounty's ``endpoint`` and the top-level
        ``endpoint`` / ``mcp_endpoint``, else ``None``), ``source``
        (default ``NOSTR_TAG``), ``endpoints`` (the string items of the
        top-level ``endpoints`` list, stripped) and ``payload`` (the bounty
        object).
    """
    # read_json only ever returns a dict or None, so `parsed` is always a dict.
    parsed = read_json(content) or {}

    nested = parsed.get("payload")
    bounty = nested if isinstance(nested, dict) else parsed.get("bounty", parsed)
    if not isinstance(bounty, dict):
        bounty = {}

    # Event content is untrusted: an "endpoint" field may hold any JSON type.
    # Only a non-blank string is a usable hint; anything else would break the
    # URL normalisation performed by callers.
    endpoint_hint = next(
        (
            hint
            for hint in (bounty.get("endpoint"), parsed.get("endpoint"), parsed.get("mcp_endpoint"))
            if isinstance(hint, str) and hint.strip()
        ),
        None,
    )
    source_hint = bounty.get("source") or parsed.get("source") or NOSTR_TAG

    raw_endpoint_list = parsed.get("endpoints")
    endpoints: List[str] = []
    if isinstance(raw_endpoint_list, list):
        endpoints = [item.strip() for item in raw_endpoint_list if isinstance(item, str)]

    return {
        "id": bounty.get("id"),
        "title": bounty.get("title", "unknown"),
        "reward": bounty.get("reward"),
        "currency": bounty.get("currency", "USD"),
        "endpoint": endpoint_hint,
        "source": source_hint,
        "endpoints": endpoints,
        "payload": bounty,
    }
def build_headers() -> Dict[str, str]:
    """Return the HTTP headers sent with every MCP request.

    Includes the bearer ``MCP_API_KEY``, the agent identity headers and an
    ``x-request-id`` derived from the current time in milliseconds.
    """
    headers: Dict[str, str] = {}
    headers["Content-Type"] = "application/json"
    headers["Authorization"] = f"Bearer {MCP_API_KEY}"
    headers["x-agent-id"] = MCP_AGENT_ID
    headers["x-agent-name"] = "vibework-ecosystem-hunter"
    headers["x-client-id"] = "vibework-a2a-hunter"
    headers["User-Agent"] = "vibework-ecosystem-hunter/1.0"
    request_millis = int(time.time() * 1000)
    headers["x-request-id"] = f"hunt-{request_millis}"
    return headers
def post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """POST *payload* as JSON to *url* and return the decoded response.

    This call blocks until the response arrives or ``MCP_TIMEOUT_SECONDS``
    elapses.

    Args:
        url: Absolute ``http://`` or ``https://`` URL of the MCP tool.
        payload: JSON-serialisable request body.

    Returns:
        ``{"status": <HTTP status>, "data": <response object>}``; ``data`` is
        an empty dict when the body is not a JSON object.

    Raises:
        RuntimeError: when no API key is configured, when *url* is not an
            http(s) URL, on an HTTP error status, or on a URL/connection
            error (the original exception is chained as the cause).
    """
    if not MCP_API_KEY:
        raise RuntimeError("Missing MCP_API_KEY / API_KEY; set one to run MCP endpoints.")
    # Endpoints may be discovered from untrusted Nostr events, and urlopen
    # also understands file:, ftp: and data: URLs. Restrict to HTTP(S) so a
    # crafted hint cannot make the agent read local or otherwise unintended
    # resources.
    if not url.lower().startswith(("http://", "https://")):
        raise RuntimeError(f"Unsupported endpoint URL (only http/https allowed): {url!r}")
    # NOTE(review): the bearer API key is sent to whichever endpoint is being
    # probed, including ones discovered from relay events — confirm that
    # sharing the key with third-party hosts is intended.
    data = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(url, data=data, headers=build_headers(), method="POST")
    try:
        with urllib.request.urlopen(request, timeout=MCP_TIMEOUT_SECONDS) as response:
            status = response.status
            response_body = response.read().decode("utf-8", errors="ignore")
    except urllib.error.HTTPError as error:
        # HTTPError is a URLError subclass, so it has to be handled first.
        error_body = error.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"HTTP {error.code}: {error_body or error.reason}") from error
    except urllib.error.URLError as error:
        raise RuntimeError(f"URLError: {error}") from error
    return {
        "status": status,
        "data": read_json(response_body) or {},
    }
async def probe_one_endpoint(base_url: str, source: str, source_tag: str) -> None:
    """Probe one MCP endpoint: list open tasks, then optionally claim and submit.

    Steps:
      1. Call ``list_open_tasks``; stop on failure or when no task is listed.
      2. When ``AUTO_CLAIM`` is on and the first task has an id, call
         ``claim_task`` for it; stop on failure.
      3. When ``AUTO_SUBMIT`` is on and a claim token was returned, call
         ``submit_solution`` with the canned probe deliverables.

    The outcome of every step is printed and appended to the report file.
    Errors from the individual steps are reported, not raised.

    Args:
        base_url: Endpoint hint; normalised before use, ignored if unusable.
        source: Label describing where the endpoint came from.
        source_tag: Tag recorded alongside the report entries.
    """
    normalized = normalize_endpoint(base_url)
    if not normalized:
        return

    loop = asyncio.get_running_loop()

    async def call_tool(tool: str, body: Dict[str, Any]) -> Dict[str, Any]:
        # post_json does blocking urllib I/O. Running it in the default
        # executor keeps the event loop free, so the relay connection and the
        # periodic scanner are not frozen for the duration of each request.
        return await loop.run_in_executor(None, post_json, build_tool_url(normalized, tool), body)

    def report(action: str, status: Any, **extra: Any) -> None:
        # One report line per probe step; `extra` carries step-specific fields.
        append_report_line(
            {
                "type": "probe",
                "target": normalized,
                "action": action,
                "source": source,
                "source_tag": source_tag,
                "status": status,
                **extra,
            }
        )

    print(f"\n[probe] target={normalized} source={source} tag={source_tag}")
    try:
        list_response = await call_tool("list_open_tasks", {"skills": []})
        list_data = list_response.get("data", {})
        task_count = len(list_data.get("tasks", []) or [])
        print(f" ✅ list_open_tasks status={list_response.get('status')} count={task_count}")
        report("list_open_tasks", list_response.get("status"), task_count=task_count)
    except Exception as error:
        print(f" ⚠️ list_open_tasks failed: {error}")
        report("list_open_tasks", "failed", error=str(error))
        return

    tasks = list_data.get("tasks", [])
    if not isinstance(tasks, list) or not tasks:
        return
    top_task = tasks[0] if isinstance(tasks[0], dict) else {}
    task_id = top_task.get("task_id") or top_task.get("id")
    task_title = top_task.get("title", "unknown")
    print(f" 🧩 top_task={task_id or 'unknown'} title={task_title}")
    if not AUTO_CLAIM:
        return
    if not task_id:
        # Without a real id there is nothing meaningful to claim; do not send
        # a placeholder id to the remote service.
        print(" ⚠️ claim_task skipped: top task has no id")
        return

    try:
        claim_response = await call_tool(
            "claim_task",
            {
                "task_id": task_id,
                "agent_id": MCP_AGENT_ID,
                "developer_wallet": DEVELOPER_WALLET,
            },
        )
        claim_data = claim_response.get("data", {})
        claim_token = claim_data.get("claim_token")
        # Only a short prefix of the token is printed.
        print(f" 🧪 claim_task status={claim_response.get('status')} token={str(claim_token)[:10] if claim_token else 'none'}")
        report(
            "claim_task",
            claim_response.get("status"),
            task_id=task_id,
            has_claim_token=bool(claim_token),
        )
    except Exception as error:
        print(f" ⚠️ claim_task failed: {error}")
        report("claim_task", "failed", error=str(error), task_id=task_id)
        return

    if not AUTO_SUBMIT or not claim_token:
        return
    try:
        submit_response = await call_tool(
            "submit_solution",
            {
                "task_id": task_id,
                "claim_token": claim_token,
                "deliverables": {
                    "README.md": SUBMISSION_README,
                    "notes.txt": SUBMISSION_NOTES,
                },
                "github_pr_url": AUTO_SUBMIT_PR_URL,
            },
        )
        print(f" 🚀 submit_solution status={submit_response.get('status')}")
        report("submit_solution", submit_response.get("status"), task_id=task_id)
    except Exception as error:
        print(f" ⚠️ submit_solution failed: {error}")
        report("submit_solution", "failed", error=str(error), task_id=task_id)
async def probe_static_endpoints() -> None:
    """Record the current seed endpoints in the report and probe each in turn.

    The snapshot's ``source`` is ``"timer"`` when periodic scanning is
    enabled and ``"bootstrap"`` otherwise. Endpoints are probed sequentially
    with the source label ``seed:<index>`` and the tag ``"static"``.
    """
    seeds = collect_seed_endpoints()
    trigger = "timer" if SCAN_INTERVAL_SECONDS > 0 else "bootstrap"
    snapshot = {
        "type": "scanner_seed_snapshot",
        "source": trigger,
        "seed_count": len(seeds),
        "seeds": seeds,
    }
    append_report_line(snapshot)
    for position, endpoint in enumerate(seeds):
        await probe_one_endpoint(endpoint, f"seed:{position}", "static")
async def listen_for_bounties() -> None:
    """Subscribe to the Nostr relay and probe endpoints advertised in events.

    Returns immediately when the ``websockets`` package is unavailable.
    Otherwise runs forever: connects to ``RELAY_URL``, subscribes to kind-1
    events tagged ``NOSTR_TAG`` / ``NOSTR_TAG_FALLBACK``, and for every new
    event probes each endpoint candidate found in its payload and tags. The
    result for each event is appended to the report file. After the
    connection ends — with an error or by a normal close — it waits
    ``RECONNECTION_BACKOFF_SECONDS`` and reconnects.
    """
    if websockets is None:
        print("⚠️ websocket support unavailable; skipping Nostr real-time bounty discovery")
        return

    max_seen_events = 512
    # A dict preserves insertion order, so the oldest remembered id can be
    # evicted first. (Popping from a set removes an arbitrary element, which
    # may be the id that was just added.)
    seen_events: Dict[str, None] = {}
    subscribe_frame = json.dumps(
        [
            "REQ",
            NOSTR_SUB_ID,
            {
                "kinds": [1],
                "#t": [NOSTR_TAG, NOSTR_TAG_FALLBACK],
                "limit": NOSTR_LIMIT,
            },
        ]
    )

    while True:
        print(f"🤖 [Nostr Agent Client] connecting {RELAY_URL} ...")
        try:
            async with websockets.connect(RELAY_URL) as ws:
                print("✅ connected. listening for bounty intents...")
                await ws.send(subscribe_frame)
                async for raw in ws:
                    try:
                        data = json.loads(raw)
                    except Exception:
                        continue
                    # Only ["EVENT", <sub id>, <event object>] frames matter.
                    if not isinstance(data, list) or len(data) < 3 or data[0] != "EVENT":
                        continue
                    event = data[2]
                    if not isinstance(event, dict) or not event:
                        continue

                    event_id = event.get("id")
                    # Relay data is untrusted. A missing or non-string id
                    # marks a malformed event; an unhashable one would raise
                    # here and tear down the whole connection.
                    if not isinstance(event_id, str) or not event_id:
                        continue
                    if event_id in seen_events:
                        continue
                    seen_events[event_id] = None
                    if len(seen_events) > max_seen_events:
                        del seen_events[next(iter(seen_events))]

                    payload = parse_bounty_payload(event.get("content", ""))
                    tags = parse_nostr_tags(event.get("tags"))
                    event_seen_targets: Set[str] = set()
                    endpoint_candidates = discover_candidate_endpoints(
                        payload.get("payload", {}), tags, event_seen_targets
                    )

                    # Hints from the event envelope: the single "endpoint"
                    # value followed by the "endpoints" list.
                    envelope_hints: List[Any] = [payload.get("endpoint")]
                    extra_endpoints = payload.get("endpoints") or []
                    if isinstance(extra_endpoints, list):
                        envelope_hints.extend(extra_endpoints)
                    for hint in envelope_hints:
                        # Non-string hints cannot be URLs; skipping them keeps
                        # a malformed event from raising inside the loop.
                        if not isinstance(hint, str):
                            continue
                        normalized = normalize_endpoint(hint)
                        if not normalized:
                            continue
                        canonical = _dedupe(normalized, event_seen_targets)
                        if canonical:
                            endpoint_candidates.append(canonical)

                    reward = payload.get("reward")
                    currency = payload.get("currency", "USD")
                    task_id = payload.get("id")
                    title = payload.get("title", "unknown")
                    print(f"\n🚨 [Nostr bounty] id={task_id} title={title} reward={reward} {currency}")

                    if not endpoint_candidates:
                        print(f"🤖 [Nostr event] id={event_id} no actionable endpoint; skipped")
                        append_report_line(
                            {
                                "type": "event",
                                "event_id": event_id,
                                "source": "nostr",
                                "source_tag": NOSTR_TAG,
                                "status": "ignored",
                                "reason": "no_endpoint_candidate",
                            }
                        )
                        continue

                    source_label = f"nostr:{event_id}"
                    tag_label = tags.get("t") or NOSTR_TAG
                    endpoint_hits = 0
                    for endpoint in endpoint_candidates:
                        await probe_one_endpoint(endpoint, source_label, tag_label)
                        endpoint_hits += 1
                    append_report_line(
                        {
                            "type": "event",
                            "event_id": event_id,
                            "source": "nostr",
                            "source_tag": NOSTR_TAG,
                            "status": "processed",
                            "candidates": len(endpoint_candidates),
                            "endpoint_hits": endpoint_hits,
                        }
                    )
        except Exception as error:
            print(f"⚠️ Connection failed: {error}")
        # Back off before every reconnect, including after a normal close of
        # the connection, so a relay that keeps closing cannot cause a tight
        # reconnect loop.
        print(f"⏳ Reconnect in {RECONNECTION_BACKOFF_SECONDS} seconds")
        await asyncio.sleep(RECONNECTION_BACKOFF_SECONDS)
async def periodic_static_probe() -> None:
    """Re-probe the static seed endpoints every ``SCAN_INTERVAL_SECONDS``.

    Returns immediately when the interval is not positive. Otherwise loops
    forever: sleep for the interval, append a ``tick`` entry to the report,
    then probe all seed endpoints.
    """
    if SCAN_INTERVAL_SECONDS > 0:
        while True:
            await asyncio.sleep(SCAN_INTERVAL_SECONDS)
            tick_record = {
                "type": "scanner",
                "source": "timer",
                "status": "tick",
                "interval_seconds": SCAN_INTERVAL_SECONDS,
            }
            append_report_line(tick_record)
            await probe_static_endpoints()
async def main():
    """Print the configuration, run one static probe, then optionally daemonise.

    With ``RUN_DAEMON`` off the function returns after the initial probe.
    With it on, it runs the relay listener, the periodic scanner, or both,
    depending on ``SCAN_INTERVAL_SECONDS`` and on whether the ``websockets``
    package could be imported.
    """
    banner = [
        f"🛰️ [VibeWork Ecosystem Hunter] started at {now_ts()}",
        f" MCP base: {MCP_API_BASE}",
        f" API key configured: {'yes' if MCP_API_KEY else 'no'}",
        f" Auto claim: {'on' if AUTO_CLAIM else 'off'}",
        f" Auto submit: {'on' if AUTO_SUBMIT else 'off'}",
        f" Report file: {REPORT_PATH}",
        f" Static scan interval: {SCAN_INTERVAL_SECONDS}s",
    ]
    if KNOWN_ENDPOINTS_ENV:
        banner.append(f" Seed endpoints: {', '.join(KNOWN_ENDPOINTS_ENV)}")
    for line in banner:
        print(line)

    await probe_static_endpoints()

    if not RUN_DAEMON:
        append_report_line({"type": "bootstrap", "status": "single_run"})
        return

    periodic_scan_enabled = SCAN_INTERVAL_SECONDS > 0
    if not periodic_scan_enabled:
        append_report_line({"type": "bootstrap", "status": "daemon_started_only_realtime"})
        await listen_for_bounties()
        return

    append_report_line(
        {
            "type": "bootstrap",
            "status": "daemon_started_with_scan",
            "scan_interval_seconds": SCAN_INTERVAL_SECONDS,
        }
    )
    if websockets is None:
        # No relay support: only the timer-driven scanner can run.
        await periodic_static_probe()
    else:
        await asyncio.gather(listen_for_bounties(), periodic_static_probe())


if __name__ == "__main__":
    asyncio.run(main())