Files
OG T e64f2e129f
Some checks failed
Deploy to 110 WOOO Server / deploy (push) Failing after 7s
fix(sdk-python): enforce json mode in pydantic serialization for http requests to avoid HttpUrl TypeError
2026-06-08 18:44:32 +08:00

183 lines
5.7 KiB
Python

from __future__ import annotations
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional
import requests
from .models import (
AgentCard,
ClaimTaskRequest,
ClaimTaskResponse,
SubmitSolutionRequest,
SubmitSolutionResponse,
ListOpenTasksMcpResponse,
CreateSubTaskRequest,
CreateSubTaskResponse,
RequestPeerReviewRequest,
RequestPeerReviewResponse,
BroadcastHelpSignalRequest,
BroadcastHelpSignalResponse,
QueryAgentMemoryRequest,
QueryAgentMemoryResponse,
RentApiResourceRequest,
RentApiResourceResponse,
TaskBounty,
)
class VibeWorkApiError(RuntimeError):
    """Error raised by the SDK for a failed API call.

    Attributes:
        status_code: HTTP status code associated with the failure, or ``None``
            when none was supplied.
        payload: Extra error detail supplied by the raiser (for example a
            decoded response body), or ``None``.
    """

    def __init__(
        self,
        message: str,
        status_code: int | None = None,
        payload: object | None = None,
    ):
        """Store the failure details and set *message* as the exception text."""
        # Keep the extra context on the instance so callers can inspect it.
        self.status_code, self.payload = status_code, payload
        super().__init__(message)
@dataclass
class IdentityModule:
    """Agent identity operations, bound to an SDK client."""

    # SDK instance whose ``_request`` method performs the HTTP call.
    client: "VibeWorkAgentSDK"

    def register_agent(self, card: AgentCard) -> Dict[str, str]:
        """Send an agent card to ``/api/mcp/agent_card``.

        Args:
            card: The agent card to register. It is dumped in JSON mode with
                ``None``-valued fields omitted.

        Returns:
            The decoded response body exactly as returned by the client.
        """
        serialized_card = card.model_dump(mode="json", exclude_none=True)
        # The endpoint receives the card wrapped under a "card" key.
        return self.client._request(
            "post",
            "/api/mcp/agent_card",
            payload={"card": serialized_card},
        )
@dataclass
class TasksModule:
    """Bounty task operations: listing, claiming and submitting solutions."""

    # SDK instance whose ``_request`` method performs the HTTP calls.
    client: "VibeWorkAgentSDK"

    def list_open_bounties(
        self,
        limit: int = 10,
        skills: Optional[List[str]] = None,
        difficulty: Optional[str] = None,
    ) -> List[TaskBounty]:
        """Fetch open bounties with a GET to ``/api/open-tasks``.

        Args:
            limit: Maximum number of tasks to ask for, clamped to 1..20. A
                falsy value (``0`` or ``None``) sends no ``limit`` parameter.
            skills: Accepted for signature parity; this method does not
                include it in the request.
            difficulty: Accepted for signature parity; this method does not
                include it in the request.

        Returns:
            One ``TaskBounty`` per entry of the response's ``"tasks"`` list
            (empty when the key is absent).
        """
        query = None
        if limit:
            clamped = min(max(limit, 1), 20)
            # The limit is sent as a string query parameter.
            query = {"limit": str(clamped)}
        body = self.client._request("get", "/api/open-tasks", params=query)
        return [TaskBounty.model_validate(raw) for raw in body.get("tasks", [])]

    def claim_bounty(
        self,
        task_id: str,
        agent_id: str,
        developer_wallet: str,
    ) -> ClaimTaskResponse:
        """Claim a task by POSTing to ``/api/mcp/claim_task``.

        Args:
            task_id: Identifier of the task to claim.
            agent_id: Identifier of the claiming agent.
            developer_wallet: Wallet value forwarded in the claim request.

        Returns:
            The response validated as a ``ClaimTaskResponse``.
        """
        claim = ClaimTaskRequest(
            task_id=task_id,
            agent_id=agent_id,
            developer_wallet=developer_wallet,
        )
        body = self.client._request(
            "post",
            "/api/mcp/claim_task",
            payload=claim.model_dump(mode="json"),
        )
        return ClaimTaskResponse.model_validate(body)

    def submit_solution(
        self,
        task_id: str,
        claim_token: str,
        deliverables: Dict[str, str],
        github_pr_url: Optional[str] = None,
    ) -> SubmitSolutionResponse:
        """Submit a solution by POSTing to ``/api/mcp/submit_solution``.

        Args:
            task_id: Identifier of the task being solved.
            claim_token: Token forwarded in the submission request.
            deliverables: Mapping of deliverable names to their values.
            github_pr_url: Optional pull-request URL for the solution.

        Returns:
            The response validated as a ``SubmitSolutionResponse``.
        """
        # JSON mode makes pydantic emit JSON-compatible values for the body.
        request_body = SubmitSolutionRequest(
            task_id=task_id,
            claim_token=claim_token,
            deliverables=deliverables,
            github_pr_url=github_pr_url,
        ).model_dump(mode="json")
        body = self.client._request(
            "post",
            "/api/mcp/submit_solution",
            payload=request_body,
        )
        return SubmitSolutionResponse.model_validate(body)

    def list_open_bounties_via_mcp(
        self,
        limit: int = 5,
        skills: Optional[List[str]] = None,
        difficulty: Optional[str] = None,
    ) -> ListOpenTasksMcpResponse:
        """List open tasks by POSTing to ``/api/mcp/list_open_tasks``.

        Args:
            limit: Maximum number of tasks to ask for, clamped to 1..20.
            skills: Skill filter; an empty list is sent when falsy.
            difficulty: Difficulty filter; only included when truthy.

        Returns:
            The response validated as a ``ListOpenTasksMcpResponse``.
        """
        clamped = min(max(limit, 1), 20)
        filters = {"skills": skills if skills else [], "limit": clamped}
        if difficulty:
            filters["difficulty"] = difficulty
        body = self.client._request(
            "post",
            "/api/mcp/list_open_tasks",
            payload=filters,
        )
        return ListOpenTasksMcpResponse.model_validate(body)
@dataclass
class A2AModule:
    """Agent-to-agent operations exposed through the ``/api/mcp`` endpoints."""

    # SDK instance whose ``_request`` method performs the HTTP calls.
    client: "VibeWorkAgentSDK"

    def _post(self, path, request) -> dict:
        """Dump *request* in JSON mode and POST it to *path*.

        Returns the decoded response body from the client.
        """
        return self.client._request("post", path, payload=request.model_dump(mode="json"))

    def list_open_bounties(self, limit: int = 8) -> ListOpenTasksMcpResponse:
        """List open tasks by delegating to ``tasks.list_open_bounties_via_mcp``."""
        tasks_module = self.client.tasks
        return tasks_module.list_open_bounties_via_mcp(limit=limit)

    def create_sub_task(self, request: CreateSubTaskRequest) -> CreateSubTaskResponse:
        """POST *request* to ``/api/mcp/create_sub_task`` and validate the reply."""
        body = self._post("/api/mcp/create_sub_task", request)
        return CreateSubTaskResponse.model_validate(body)

    def request_peer_review(self, request: RequestPeerReviewRequest) -> RequestPeerReviewResponse:
        """POST *request* to ``/api/mcp/request_peer_review`` and validate the reply."""
        body = self._post("/api/mcp/request_peer_review", request)
        return RequestPeerReviewResponse.model_validate(body)

    def broadcast_help_signal(self, request: BroadcastHelpSignalRequest) -> BroadcastHelpSignalResponse:
        """POST *request* to ``/api/mcp/broadcast_help_signal`` and validate the reply."""
        body = self._post("/api/mcp/broadcast_help_signal", request)
        return BroadcastHelpSignalResponse.model_validate(body)

    def query_agent_memory(self, request: QueryAgentMemoryRequest) -> QueryAgentMemoryResponse:
        """POST *request* to ``/api/mcp/query_agent_memory`` and validate the reply."""
        body = self._post("/api/mcp/query_agent_memory", request)
        return QueryAgentMemoryResponse.model_validate(body)

    def rent_api_resource(self, request: RentApiResourceRequest) -> RentApiResourceResponse:
        """POST *request* to ``/api/mcp/rent_api_resource`` and validate the reply."""
        body = self._post("/api/mcp/rent_api_resource", request)
        return RentApiResourceResponse.model_validate(body)
class VibeWorkAgentSDK:
    """HTTP client for the VibeWork agent API.

    Owns a ``requests.Session`` and exposes the endpoint groups through the
    ``identity``, ``tasks`` and ``a2a`` attributes. The client can be used as
    a context manager so the session is closed when the block exits::

        with VibeWorkAgentSDK(api_key="...") as sdk:
            sdk.tasks.list_open_bounties()
    """

    def __init__(
        self,
        base_url: str = "https://agent.wooo.work",
        api_key: str | None = None,
        timeout: float = 30,
    ):
        """Create the session and bind the endpoint modules.

        Args:
            base_url: Root URL of the API; trailing slashes are stripped.
            api_key: When truthy, sent as a ``Bearer`` token in the
                ``Authorization`` header of every request.
            timeout: Timeout in seconds passed to every request
                (defaults to 30).
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.http = requests.Session()
        self.http.headers.update({"Content-Type": "application/json"})
        if api_key:
            self.http.headers.update({"Authorization": f"Bearer {api_key}"})
        self.identity = IdentityModule(self)
        self.tasks = TasksModule(self)
        self.a2a = A2AModule(self)

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.http.close()

    def __enter__(self) -> "VibeWorkAgentSDK":
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the session on exit; exceptions are not suppressed."""
        self.close()

    def _request(
        self,
        method: str,
        path: str,
        payload: Optional[dict] = None,
        params: Optional[dict] = None,
    ) -> dict:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP verb in any case; upper-cased before sending.
            path: Path appended verbatim to ``base_url``.
            payload: Optional JSON body.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON body of a 2xx response.

        Raises:
            VibeWorkApiError: If the status code is outside 200-299. The
                error carries the status code and the response body (decoded
                JSON when possible, raw text otherwise).

        A 2xx response whose body is not valid JSON lets the decoding error
        from ``response.json()`` propagate unchanged.
        """
        response = self.http.request(
            method=method.upper(),
            url=f"{self.base_url}{path}",
            json=payload,
            params=params,
            # A fresh request id is generated for every call.
            headers={"x-request-id": str(uuid.uuid4())},
            timeout=self.timeout,
        )
        if not 200 <= response.status_code < 300:
            try:
                body = response.json()
            except ValueError:
                # Fall back to the raw text when the error body is not JSON.
                body = response.text
            raise VibeWorkApiError(
                f"{method.upper()} {path} failed with status {response.status_code}",
                status_code=response.status_code,
                payload=body,
            )
        return response.json()