Files
awoooi/apps/api/src/routes/pipelines.py
OG T 196d269b92 feat: add all application source code
- apps/api: FastAPI backend with Dockerfile
- apps/web: Next.js frontend with Dockerfile
- apps/sensor: Signal collection agent
- packages: shared packages

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-22 18:57:44 +08:00

111 lines
2.8 KiB
Python

"""
Pipeline Endpoints
"""
from datetime import datetime
from typing import Literal
from uuid import UUID, uuid4
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
router = APIRouter()
class PipelineStep(BaseModel):
id: str
plugin_id: str
type: Literal["INPUT", "BRAIN", "OUTPUT", "ACTION", "DATA", "UI"]
config: dict | None = None
class Pipeline(BaseModel):
id: UUID
name: str
description: str | None = None
status: Literal["draft", "active", "paused", "archived"]
steps: list[PipelineStep]
created_at: datetime
updated_at: datetime
class PipelineCreate(BaseModel):
name: str
description: str | None = None
steps: list[PipelineStep]
class PipelineExecution(BaseModel):
id: UUID
pipeline_id: UUID
status: Literal["pending", "running", "completed", "failed", "cancelled"]
started_at: datetime
completed_at: datetime | None = None
class PipelineList(BaseModel):
items: list[Pipeline]
next_page_token: str | None = None
# In-memory storage
_pipelines: dict[UUID, Pipeline] = {}
@router.get("", response_model=PipelineList)
async def list_pipelines(
status: Literal["draft", "active", "paused", "archived"] | None = None,
) -> PipelineList:
"""列出工作流"""
items = list(_pipelines.values())
if status:
items = [p for p in items if p.status == status]
return PipelineList(items=items)
@router.post("", response_model=Pipeline, status_code=201)
async def create_pipeline(data: PipelineCreate) -> Pipeline:
"""建立工作流"""
now = datetime.utcnow()
pipeline = Pipeline(
id=uuid4(),
name=data.name,
description=data.description,
status="draft",
steps=data.steps,
created_at=now,
updated_at=now,
)
_pipelines[pipeline.id] = pipeline
return pipeline
@router.get("/{pipeline_id}", response_model=Pipeline)
async def get_pipeline(pipeline_id: UUID) -> Pipeline:
"""取得工作流詳情"""
if pipeline_id not in _pipelines:
raise HTTPException(status_code=404, detail="Pipeline not found")
return _pipelines[pipeline_id]
@router.delete("/{pipeline_id}", status_code=204)
async def delete_pipeline(pipeline_id: UUID) -> None:
"""刪除工作流"""
if pipeline_id not in _pipelines:
raise HTTPException(status_code=404, detail="Pipeline not found")
del _pipelines[pipeline_id]
@router.post("/{pipeline_id}/trigger", response_model=PipelineExecution, status_code=202)
async def trigger_pipeline(pipeline_id: UUID) -> PipelineExecution:
"""手動觸發工作流"""
if pipeline_id not in _pipelines:
raise HTTPException(status_code=404, detail="Pipeline not found")
return PipelineExecution(
id=uuid4(),
pipeline_id=pipeline_id,
status="pending",
started_at=datetime.utcnow(),
)