# Extraction residue from the file viewer: 183 lines, 5.7 KiB, Python.
from __future__ import annotations
|
|
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
from .models import (
|
|
AgentCard,
|
|
ClaimTaskRequest,
|
|
ClaimTaskResponse,
|
|
SubmitSolutionRequest,
|
|
SubmitSolutionResponse,
|
|
ListOpenTasksMcpResponse,
|
|
CreateSubTaskRequest,
|
|
CreateSubTaskResponse,
|
|
RequestPeerReviewRequest,
|
|
RequestPeerReviewResponse,
|
|
BroadcastHelpSignalRequest,
|
|
BroadcastHelpSignalResponse,
|
|
QueryAgentMemoryRequest,
|
|
QueryAgentMemoryResponse,
|
|
RentApiResourceRequest,
|
|
RentApiResourceResponse,
|
|
TaskBounty,
|
|
)
|
|
|
|
|
|
class VibeWorkApiError(RuntimeError):
    """Error raised when a VibeWork API call does not succeed.

    Attributes:
        status_code: HTTP status code of the failing response, or ``None``
            when the caller did not supply one.
        payload: Body associated with the failure (whatever object the
            raiser passes in), or ``None``.
    """

    def __init__(
        self,
        message: str,
        status_code: int | None = None,
        payload: object | None = None,
    ):
        # The human-readable message becomes the exception's only positional
        # arg; status and body are carried as separate attributes.
        RuntimeError.__init__(self, message)
        self.status_code, self.payload = status_code, payload
|
|
|
|
|
|
@dataclass
class IdentityModule:
    """Agent-identity operations, delegated to the owning SDK client."""

    client: "VibeWorkAgentSDK"  # SDK instance whose ``_request`` performs the HTTP call

    def register_agent(self, card: AgentCard) -> Dict[str, str]:
        """Register an agent card via ``POST /api/mcp/agent_card``.

        The card is dumped in JSON mode with ``None`` fields omitted and sent
        wrapped under the ``"card"`` key. The decoded response is returned
        unchanged.
        """
        card_body = card.model_dump(mode="json", exclude_none=True)
        return self.client._request(
            "post",
            "/api/mcp/agent_card",
            payload={"card": card_body},
        )
|
|
|
|
|
|
@dataclass
class TasksModule:
    """Task-bounty operations: listing, claiming, and submitting solutions.

    Every call is delegated to the owning SDK's ``_request`` helper.
    """

    client: "VibeWorkAgentSDK"  # SDK instance that performs the HTTP calls

    def list_open_bounties(
        self,
        limit: int = 10,
        skills: Optional[List[str]] = None,
        difficulty: Optional[str] = None,
    ) -> List[TaskBounty]:
        """Fetch open bounties from ``GET /api/open-tasks``.

        ``limit`` is clamped to 1..20 and sent as a string query parameter.
        A falsy ``limit`` (``0`` or ``None``) sends no query parameters.

        NOTE(review): ``skills`` and ``difficulty`` are accepted but not
        forwarded by this method, whereas ``list_open_bounties_via_mcp`` does
        send them -- confirm whether this endpoint supports these filters.

        Returns:
            One validated ``TaskBounty`` per entry under the response's
            ``"tasks"`` key (an empty list when the key is absent).
        """
        if limit:
            query = {"limit": str(min(max(limit, 1), 20))}
        else:
            query = None
        result = self.client._request("get", "/api/open-tasks", params=query)
        return [
            TaskBounty.model_validate(entry)
            for entry in result.get("tasks", [])
        ]

    def claim_bounty(
        self,
        task_id: str,
        agent_id: str,
        developer_wallet: str,
    ) -> ClaimTaskResponse:
        """Claim a task via ``POST /api/mcp/claim_task``.

        The arguments are packed into a ``ClaimTaskRequest`` and the decoded
        response is validated as a ``ClaimTaskResponse``.
        """
        claim = ClaimTaskRequest(
            task_id=task_id,
            agent_id=agent_id,
            developer_wallet=developer_wallet,
        )
        body = claim.model_dump(mode="json")
        result = self.client._request("post", "/api/mcp/claim_task", payload=body)
        return ClaimTaskResponse.model_validate(result)

    def submit_solution(
        self,
        task_id: str,
        claim_token: str,
        deliverables: Dict[str, str],
        github_pr_url: Optional[str] = None,
    ) -> SubmitSolutionResponse:
        """Submit deliverables via ``POST /api/mcp/submit_solution``.

        The arguments are packed into a ``SubmitSolutionRequest`` and the
        decoded response is validated as a ``SubmitSolutionResponse``.
        """
        body = SubmitSolutionRequest(
            task_id=task_id,
            claim_token=claim_token,
            deliverables=deliverables,
            github_pr_url=github_pr_url,
        ).model_dump(mode="json")
        result = self.client._request(
            "post",
            "/api/mcp/submit_solution",
            payload=body,
        )
        return SubmitSolutionResponse.model_validate(result)

    def list_open_bounties_via_mcp(
        self,
        limit: int = 5,
        skills: Optional[List[str]] = None,
        difficulty: Optional[str] = None,
    ) -> ListOpenTasksMcpResponse:
        """List open tasks via ``POST /api/mcp/list_open_tasks``.

        Sends ``skills`` (an empty list when none are given) and ``limit``
        clamped to 1..20. ``difficulty`` is included only when truthy.
        """
        body = {
            "skills": skills if skills else [],
            "limit": min(max(limit, 1), 20),
        }
        if difficulty:
            body["difficulty"] = difficulty
        result = self.client._request(
            "post",
            "/api/mcp/list_open_tasks",
            payload=body,
        )
        return ListOpenTasksMcpResponse.model_validate(result)
|
|
|
|
|
|
@dataclass
class A2AModule:
    """Agent-to-agent operations exposed through the MCP endpoints.

    Each method posts a request model (dumped in JSON mode) to its endpoint
    and validates the decoded response into the matching response model.
    """

    client: "VibeWorkAgentSDK"  # SDK instance that performs the HTTP calls

    def _post(self, path: str, request) -> dict:
        """POST ``request`` (a model exposing ``model_dump``) to ``path``."""
        return self.client._request(
            "post",
            path,
            payload=request.model_dump(mode="json"),
        )

    def list_open_bounties(self, limit: int = 8) -> ListOpenTasksMcpResponse:
        """List open tasks by delegating to the tasks module's MCP listing."""
        tasks_module = self.client.tasks
        return tasks_module.list_open_bounties_via_mcp(limit=limit)

    def create_sub_task(self, request: CreateSubTaskRequest) -> CreateSubTaskResponse:
        """Send ``request`` to ``POST /api/mcp/create_sub_task``."""
        raw = self._post("/api/mcp/create_sub_task", request)
        return CreateSubTaskResponse.model_validate(raw)

    def request_peer_review(self, request: RequestPeerReviewRequest) -> RequestPeerReviewResponse:
        """Send ``request`` to ``POST /api/mcp/request_peer_review``."""
        raw = self._post("/api/mcp/request_peer_review", request)
        return RequestPeerReviewResponse.model_validate(raw)

    def broadcast_help_signal(self, request: BroadcastHelpSignalRequest) -> BroadcastHelpSignalResponse:
        """Send ``request`` to ``POST /api/mcp/broadcast_help_signal``."""
        raw = self._post("/api/mcp/broadcast_help_signal", request)
        return BroadcastHelpSignalResponse.model_validate(raw)

    def query_agent_memory(self, request: QueryAgentMemoryRequest) -> QueryAgentMemoryResponse:
        """Send ``request`` to ``POST /api/mcp/query_agent_memory``."""
        raw = self._post("/api/mcp/query_agent_memory", request)
        return QueryAgentMemoryResponse.model_validate(raw)

    def rent_api_resource(self, request: RentApiResourceRequest) -> RentApiResourceResponse:
        """Send ``request`` to ``POST /api/mcp/rent_api_resource``."""
        raw = self._post("/api/mcp/rent_api_resource", request)
        return RentApiResourceResponse.model_validate(raw)
|
|
|
|
|
|
class VibeWorkAgentSDK:
    """HTTP client for the VibeWork agent API.

    Owns a ``requests.Session`` and exposes three feature modules:
    ``identity``, ``tasks`` and ``a2a``. May be used as a context manager so
    the session is closed when the block exits.
    """

    def __init__(
        self,
        base_url: str = "https://agent.wooo.work",
        api_key: str | None = None,
        timeout: float = 30,
    ):
        """Create the session and attach the feature modules.

        Args:
            base_url: API root; trailing slashes are stripped.
            api_key: When truthy, sent as a ``Bearer`` token in the
                ``Authorization`` header of every request.
            timeout: Timeout in seconds passed to every request.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.http = requests.Session()
        self.http.headers.update({"Content-Type": "application/json"})

        if api_key:
            self.http.headers.update({"Authorization": f"Bearer {api_key}"})

        self.identity = IdentityModule(self)
        self.tasks = TasksModule(self)
        self.a2a = A2AModule(self)

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.http.close()

    def __enter__(self) -> "VibeWorkAgentSDK":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _request(
        self,
        method: str,
        path: str,
        payload: Optional[dict] = None,
        params: Optional[dict] = None,
    ) -> dict:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP verb in any case; upper-cased before sending.
            path: Path appended verbatim to ``base_url``.
            payload: JSON body, or ``None`` for no body.
            params: Query-string parameters, or ``None``.

        Returns:
            The decoded JSON response, or ``{}`` when a successful response
            has an empty body (for example ``204 No Content``).

        Raises:
            VibeWorkApiError: On any non-2xx status (``payload`` holds the
                decoded JSON body, or the raw text when it is not JSON), or
                when a 2xx response carries a body that is not valid JSON.
        """
        verb = method.upper()
        response = self.http.request(
            method=verb,
            url=f"{self.base_url}{path}",
            json=payload,
            params=params,
            # A fresh id per request, sent in the x-request-id header.
            headers={"x-request-id": str(uuid.uuid4())},
            timeout=self.timeout,
        )
        status = response.status_code

        if not 200 <= status < 300:
            try:
                body = response.json()
            except ValueError:
                body = response.text
            raise VibeWorkApiError(
                f"{verb} {path} failed with status {status}",
                status_code=status,
                payload=body,
            )

        # An empty success body cannot be decoded as JSON; report it as an
        # empty mapping instead of crashing in the decoder.
        if not response.content:
            return {}

        try:
            return response.json()
        except ValueError as exc:
            # Surface malformed success bodies through the SDK's own error
            # type, keeping the decoder failure as the cause.
            raise VibeWorkApiError(
                f"{verb} {path} returned a non-JSON body",
                status_code=status,
                payload=response.text,
            ) from exc
|