Files
awoooi/apps/api/src/routes/plugins.py
OG T 196d269b92 feat: add all application source code
- apps/api: FastAPI backend with Dockerfile
- apps/web: Next.js frontend with Dockerfile
- apps/sensor: Signal collection agent
- packages: shared packages

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-22 18:57:44 +08:00

99 lines
2.4 KiB
Python

"""
Plugin Management Endpoints
"""
from datetime import datetime, timezone
from typing import Literal

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
# Router on which the plugin endpoints in this module are registered.
router = APIRouter()
# Closed set of category names accepted for a plugin's `category` field.
PluginCategory = Literal["INPUT", "BRAIN", "OUTPUT", "ACTION", "DATA", "UI"]
class Plugin(BaseModel):
    """Descriptor of a registered plugin as returned by the plugin endpoints."""

    # Identifier the lookup endpoints compare against the path parameter.
    id: str
    # Human-readable plugin name.
    name: str
    # Version string of the plugin.
    version: str
    # One of the PluginCategory literals.
    category: PluginCategory
    # Whether the plugin is currently enabled.
    enabled: bool
    # Optional free-text description; omitted values default to None.
    description: str | None = None
class PluginHealth(BaseModel):
    """Health-check result reported for a single plugin."""

    # Identifier of the plugin the result refers to.
    plugin_id: str
    # Outcome of the check.
    status: Literal["healthy", "unhealthy", "unknown"]
    # Timestamp at which the check was performed.
    last_check: datetime
    # Optional error detail; defaults to None.
    error: str | None = None
# Mock data: a static in-memory plugin registry that the endpoints in this
# module read from. Entries are listed in the order they are returned.
MOCK_PLUGINS: list[Plugin] = [
    # INPUT plugin
    Plugin(
        id="lewooogo-input-webhook",
        name="Webhook Trigger",
        version="0.1.0",
        category="INPUT",
        enabled=True,
        description="HTTP Webhook 觸發器",
    ),
    # BRAIN plugin
    Plugin(
        id="lewooogo-brain-llm-router",
        name="LLM Router",
        version="0.1.0",
        category="BRAIN",
        enabled=True,
        description="多模型路由器",
    ),
    # OUTPUT plugin
    Plugin(
        id="lewooogo-output-telegram",
        name="Telegram Notifier",
        version="0.1.0",
        category="OUTPUT",
        enabled=True,
        description="Telegram 通知",
    ),
]
@router.get("", response_model=list[Plugin])
async def list_plugins(
    category: PluginCategory | None = None,
    enabled: bool | None = None,
) -> list[Plugin]:
    """List the registered plugins, optionally filtered.

    Args:
        category: When given, keep only plugins whose category equals it.
        enabled: When not None, keep only plugins whose ``enabled`` flag
            equals it.

    Returns:
        The matching plugins in registry order. When no filter is supplied
        the registry list itself is returned.
    """
    by_category = bool(category)
    by_enabled = enabled is not None

    # No filter requested: hand back the registry unchanged.
    if not (by_category or by_enabled):
        return MOCK_PLUGINS

    # Apply every requested filter in a single pass over the registry.
    return [
        plugin
        for plugin in MOCK_PLUGINS
        if (not by_category or plugin.category == category)
        and (not by_enabled or plugin.enabled == enabled)
    ]
@router.get("/{plugin_id}", response_model=Plugin)
async def get_plugin(plugin_id: str) -> Plugin:
    """Return the details of a single plugin.

    Args:
        plugin_id: Identifier of the plugin to look up.

    Returns:
        The first registry entry whose ``id`` equals ``plugin_id``.

    Raises:
        HTTPException: 404 when no registry entry has that id.
    """
    match = next(
        (candidate for candidate in MOCK_PLUGINS if candidate.id == plugin_id),
        None,
    )
    if match is None:
        raise HTTPException(status_code=404, detail="Plugin not found")
    return match
@router.get("/{plugin_id}/health", response_model=PluginHealth)
async def get_plugin_health(plugin_id: str) -> PluginHealth:
    """Report the health of a single plugin.

    No real probe is performed here: any plugin present in the registry is
    reported as ``"healthy"``.

    Args:
        plugin_id: Identifier of the plugin to check.

    Returns:
        A PluginHealth with status ``"healthy"`` and ``last_check`` set to the
        current time as a timezone-aware UTC datetime.

    Raises:
        HTTPException: 404 when no registry entry has that id.
    """
    # Reject unknown plugins before building a result.
    if not any(plugin.id == plugin_id for plugin in MOCK_PLUGINS):
        raise HTTPException(status_code=404, detail="Plugin not found")

    return PluginHealth(
        plugin_id=plugin_id,
        status="healthy",
        # datetime.utcnow() is deprecated and yields a naive value that is
        # serialized without an offset; use an explicit UTC-aware timestamp.
        last_check=datetime.now(timezone.utc),
    )