feat(mcp-phase3): ArgoCD MCP + Sentry MCP + 完整 Provider 註冊
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

ArgoCDProvider (3 工具):
  - argocd_list_apps: 列出所有 App + sync/health 狀態
  - argocd_get_app_status: 詳細狀態 + 問題資源清單
  - argocd_get_sync_history: 最近 N 筆部署記錄
  - 輸入驗證: app_name 白名單 regex
  - 需 ARGOCD_API_TOKEN + ARGOCD_MCP_ENABLED=true

SentryProvider (3 工具):
  - sentry_list_issues: 列出最近 Issues(狀態過濾)
  - sentry_get_issue: 詳情 + stacktrace 最後 5 frames
  - sentry_search_issues: PromQL 風格搜尋
  - issue_id 白名單驗證(只允許純數字)
  - 需 SENTRY_AUTH_TOKEN + SENTRY_MCP_ENABLED=true

providers/__init__.py: 補上 Prometheus + SSH + ArgoCD + Sentry 全部 10 個 providers
config.py: 新增 ARGOCD_URL / ARGOCD_API_TOKEN / ARGOCD_MCP_ENABLED / SENTRY_MCP_ENABLED

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-11 09:11:53 +08:00
parent 3b896d0fbd
commit a2cc985f60
4 changed files with 550 additions and 0 deletions

View File

@@ -539,6 +539,28 @@ class Settings(BaseSettings):
description="允許 SSH 的主機 IP 清單(逗號分隔)",
)
# MCP Phase 3: ArgoCD MCP Server (2026-04-11 Claude Sonnet 4.6)
# ==========================================================================
ARGOCD_URL: str = Field(
default="http://192.168.0.125:32080",
description="ArgoCD API Server URLK3s NodePort",
)
ARGOCD_API_TOKEN: str = Field(
default="",
description="ArgoCD API Token從 K8s Secret 取得)",
)
ARGOCD_MCP_ENABLED: bool = Field(
default=False,
description="啟用 ArgoCD MCP Provider需 ARGOCD_API_TOKEN",
)
# MCP Phase 3: Sentry MCP Server (2026-04-11 Claude Sonnet 4.6)
# ==========================================================================
SENTRY_MCP_ENABLED: bool = Field(
default=False,
description="啟用 Sentry MCP Provider需 SENTRY_AUTH_TOKEN",
)
# ==========================================================================
# Phase 13.2: Grafana MCP Tool (#83)
# ==========================================================================

View File

@@ -9,21 +9,31 @@ MCP Tool Providers - ADR-015 模組化架構
- FilesystemProvider: 安全受限的文件讀取 (#82)
- GrafanaProvider: Grafana Dashboard 查詢 (#83)
- RAGProvider: 維運手冊語義搜尋 (#84)
- PrometheusProvider: Prometheus 即時查詢 (MCP Phase 2b)
- SSHProvider: 主機層 SSH 診斷/操作 (MCP Phase 2a)
- ArgoCDProvider: ArgoCD GitOps 狀態查詢 (MCP Phase 3)
- SentryProvider: Sentry 錯誤追蹤查詢 (MCP Phase 3)
@see docs/adr/ADR-015-mcp-modular-architecture.md
變更紀錄:
- 2026-04-11 v1.6: 新增 ArgoCDProvider + SentryProvider (MCP Phase 3) - Claude Sonnet 4.6
- 2026-04-11 v1.5: 新增 PrometheusProvider + SSHProvider (MCP Phase 2) - Claude Sonnet 4.6
- 2026-03-27 v1.3: 新增 RAGProvider (#84) - Claude Code (台北時區)
- 2026-03-26 v1.2: 新增 GrafanaProvider (#83) - Claude Code
- 2026-03-26 v1.1: 新增 FilesystemProvider (#82) - Claude Code
"""
from src.plugins.mcp.providers.argocd_provider import ArgoCDProvider
from src.plugins.mcp.providers.database_provider import DatabaseProvider
from src.plugins.mcp.providers.filesystem_provider import FilesystemProvider
from src.plugins.mcp.providers.grafana_provider import GrafanaProvider
from src.plugins.mcp.providers.k8s_provider import K8sProvider
from src.plugins.mcp.providers.prometheus_provider import PrometheusProvider
from src.plugins.mcp.providers.rag_provider import RAGProvider
from src.plugins.mcp.providers.sentry_provider import SentryProvider
from src.plugins.mcp.providers.signoz_provider import SignOzProvider
from src.plugins.mcp.providers.ssh_provider import SSHProvider
__all__ = [
"K8sProvider",
@@ -32,6 +42,10 @@ __all__ = [
"FilesystemProvider",
"GrafanaProvider",
"RAGProvider",
"PrometheusProvider",
"SSHProvider",
"ArgoCDProvider",
"SentryProvider",
]
@@ -49,3 +63,7 @@ def register_all_providers() -> None:
register_provider(FilesystemProvider())
register_provider(GrafanaProvider())
register_provider(RAGProvider())
register_provider(PrometheusProvider())
register_provider(SSHProvider())
register_provider(ArgoCDProvider())
register_provider(SentryProvider())

View File

@@ -0,0 +1,262 @@
"""
ArgoCD MCP Tool Provider — MCP Phase 3
========================================
提供三個 ArgoCD 管理工具,供 AI 查詢 GitOps 部署狀態:
argocd_list_apps — 列出所有 Application 及同步狀態
argocd_get_app_status — 取得特定 Application 詳細狀態
argocd_get_sync_history — 取得 Application 最近 N 筆部署記錄
設計原則:
- 唯讀(只呼叫 GET endpoints
- 透過 ArgoCD API Token 認證
- 逾時 10 秒
- 失敗回傳 MCPToolResult(success=False),不拋出
建立時間: 2026-04-11 (台北時區)
建立者: Claude Sonnet 4.6 — MCP Phase 3
@see docs/superpowers/specs/2026-04-10-infra-rebuild-sprint-abc-design.md §MCP-3
"""
import re
from typing import Any
import httpx
import structlog
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
logger = structlog.get_logger(__name__)
_HTTP_TIMEOUT = 10.0
_RE_SAFE_APP_NAME = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,62}$')
def _validate_app_name(name: str) -> str:
if not _RE_SAFE_APP_NAME.match(name):
raise ValueError(f"Unsafe app name: {name!r}")
return name
class ArgoCDProvider(MCPToolProvider):
"""
ArgoCD MCP Tool Provider
三個工具:
argocd_list_apps — 列出所有 Applications
argocd_get_app_status — 取得單一 Application 狀態
argocd_get_sync_history — 取得部署歷史
"""
@property
def name(self) -> str:
return "argocd"
@property
def enabled(self) -> bool:
from src.core.config import settings
return getattr(settings, "ARGOCD_MCP_ENABLED", False)
def _base_url(self) -> str:
from src.core.config import settings
return getattr(settings, "ARGOCD_URL", "http://192.168.0.125:32080").rstrip("/")
def _headers(self) -> dict:
from src.core.config import settings
token = getattr(settings, "ARGOCD_API_TOKEN", "")
if token:
return {"Authorization": f"Bearer {token}"}
return {}
async def list_tools(self) -> list[MCPTool]:
return [
MCPTool(
name="argocd_list_apps",
description=(
"List all ArgoCD Applications with their sync and health status. "
"Returns name, sync_status (Synced/OutOfSync), health (Healthy/Degraded), "
"and last sync time."
),
input_schema={
"type": "object",
"properties": {
"namespace": {
"type": "string",
"description": "Filter by destination namespace (optional)",
}
},
"required": [],
},
),
MCPTool(
name="argocd_get_app_status",
description=(
"Get detailed status of a specific ArgoCD Application. "
"Returns sync status, health, resources list, and last sync result."
),
input_schema={
"type": "object",
"properties": {
"app_name": {
"type": "string",
"description": "ArgoCD Application name, e.g. 'awoooi-prod'",
}
},
"required": ["app_name"],
},
),
MCPTool(
name="argocd_get_sync_history",
description=(
"Get the recent sync history of an ArgoCD Application. "
"Returns last N deploys with revision, status, and timestamp."
),
input_schema={
"type": "object",
"properties": {
"app_name": {
"type": "string",
"description": "ArgoCD Application name",
},
"limit": {
"type": "integer",
"description": "Number of history entries to return (1-20, default 5)",
"default": 5,
},
},
"required": ["app_name"],
},
),
]
async def execute(self, tool_name: str, parameters: dict[str, Any]) -> MCPToolResult:
handlers = {
"argocd_list_apps": self._list_apps,
"argocd_get_app_status": self._get_app_status,
"argocd_get_sync_history": self._get_sync_history,
}
handler = handlers.get(tool_name)
if not handler:
return MCPToolResult(success=False, error=f"Unknown tool: {tool_name}")
try:
return await handler(parameters)
except Exception as e:
logger.error("argocd_mcp_error", tool=tool_name, error=str(e))
return MCPToolResult(success=False, error=str(e))
async def _list_apps(self, parameters: dict) -> MCPToolResult:
ns_filter = parameters.get("namespace", "")
url = f"{self._base_url()}/api/v1/applications"
params = {}
if ns_filter:
params["appNamespace"] = ns_filter
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT, verify=False) as client:
resp = await client.get(url, headers=self._headers(), params=params)
resp.raise_for_status()
data = resp.json()
apps = data.get("items") or []
result = []
for app in apps:
metadata = app.get("metadata", {})
status = app.get("status", {})
sync = status.get("sync", {})
health = status.get("health", {})
op_state = status.get("operationState", {})
result.append({
"name": metadata.get("name"),
"sync_status": sync.get("status"),
"health_status": health.get("status"),
"revision": sync.get("revision", "")[:8],
"last_sync": op_state.get("finishedAt", ""),
"destination_namespace": status.get("summary", {}).get("images"),
})
return MCPToolResult(
success=True,
data={"apps": result, "count": len(result)},
)
async def _get_app_status(self, parameters: dict) -> MCPToolResult:
app_name = _validate_app_name(parameters["app_name"])
url = f"{self._base_url()}/api/v1/applications/{app_name}"
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT, verify=False) as client:
resp = await client.get(url, headers=self._headers())
resp.raise_for_status()
app = resp.json()
status = app.get("status", {})
sync = status.get("sync", {})
health = status.get("health", {})
op_state = status.get("operationState", {})
resources = status.get("resources", [])
# 只取 degraded/outOfSync 資源
problem_resources = [
{
"kind": r.get("kind"),
"name": r.get("name"),
"namespace": r.get("namespace"),
"status": r.get("status"),
"health": r.get("health", {}).get("status") if r.get("health") else None,
}
for r in resources
if r.get("status") not in ("Synced", None) or
(r.get("health") or {}).get("status") not in ("Healthy", None)
]
return MCPToolResult(
success=True,
data={
"name": app.get("metadata", {}).get("name"),
"sync_status": sync.get("status"),
"health_status": health.get("status"),
"revision": sync.get("revision", "")[:8],
"last_sync_at": op_state.get("finishedAt"),
"last_sync_message": op_state.get("message"),
"total_resources": len(resources),
"problem_resources": problem_resources,
},
)
async def _get_sync_history(self, parameters: dict) -> MCPToolResult:
app_name = _validate_app_name(parameters["app_name"])
limit = max(1, min(int(parameters.get("limit", 5)), 20))
url = f"{self._base_url()}/api/v1/applications/{app_name}/revisions"
# ArgoCD history endpoint
history_url = f"{self._base_url()}/api/v1/applications/{app_name}"
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT, verify=False) as client:
resp = await client.get(history_url, headers=self._headers())
resp.raise_for_status()
app = resp.json()
history = app.get("status", {}).get("history", [])
history = sorted(history, key=lambda x: x.get("deployedAt", ""), reverse=True)[:limit]
entries = [
{
"revision": h.get("revision", "")[:8],
"deployed_at": h.get("deployedAt"),
"deploy_started_at": h.get("deployStartedAt"),
"id": h.get("id"),
}
for h in history
]
return MCPToolResult(
success=True,
data={"app_name": app_name, "history": entries, "count": len(entries)},
)
async def health_check(self) -> bool:
try:
url = f"{self._base_url()}/api/v1/applicationsets"
async with httpx.AsyncClient(timeout=5.0, verify=False) as client:
resp = await client.get(url, headers=self._headers())
return resp.status_code < 500
except Exception:
return False

View File

@@ -0,0 +1,248 @@
"""
Sentry MCP Tool Provider — MCP Phase 3
========================================
提供三個 Sentry 查詢工具,供 AI 關聯告警與程式錯誤:
sentry_list_issues — 列出最近 N 筆 Sentry Issues
sentry_get_issue — 取得特定 Issue 詳細資訊stacktrace / culprit
sentry_search_issues — 按 query 搜尋 Issues如 alertname、service
設計原則:
- 唯讀(只呼叫 GET endpoints
- 使用 SENTRY_AUTH_TOKEN + SENTRY_ORG + SENTRY_PROJECT
- 逾時 10 秒
- 失敗回傳 MCPToolResult(success=False),不拋出
建立時間: 2026-04-11 (台北時區)
建立者: Claude Sonnet 4.6 — MCP Phase 3
@see docs/superpowers/specs/2026-04-10-infra-rebuild-sprint-abc-design.md §MCP-3
"""
import re
from typing import Any
import httpx
import structlog
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
logger = structlog.get_logger(__name__)
_HTTP_TIMEOUT = 10.0
_RE_SAFE_ISSUE_ID = re.compile(r'^\d{1,20}$')
class SentryProvider(MCPToolProvider):
"""
Sentry MCP Tool Provider
三個工具:
sentry_list_issues — 列出最近 Issues
sentry_get_issue — 取得 Issue 詳情
sentry_search_issues — 搜尋 Issues
"""
@property
def name(self) -> str:
return "sentry"
@property
def enabled(self) -> bool:
from src.core.config import settings
return (
getattr(settings, "SENTRY_MCP_ENABLED", False)
and bool(getattr(settings, "SENTRY_AUTH_TOKEN", ""))
)
def _base_url(self) -> str:
from src.core.config import settings
return settings.SENTRY_SELF_HOSTED_URL.rstrip("/")
def _org(self) -> str:
from src.core.config import settings
return settings.SENTRY_ORG
def _project(self) -> str:
from src.core.config import settings
return settings.SENTRY_PROJECT
def _headers(self) -> dict:
from src.core.config import settings
return {"Authorization": f"Bearer {settings.SENTRY_AUTH_TOKEN}"}
async def list_tools(self) -> list[MCPTool]:
return [
MCPTool(
name="sentry_list_issues",
description=(
"List recent Sentry issues for the configured project. "
"Returns issue ID, title, culprit, count, last seen, and status."
),
input_schema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of issues to return (1-25, default 10)",
"default": 10,
},
"status": {
"type": "string",
"description": "Filter by status: 'unresolved' (default), 'resolved', 'ignored'",
"default": "unresolved",
},
},
"required": [],
},
),
MCPTool(
name="sentry_get_issue",
description=(
"Get detailed information about a specific Sentry issue, "
"including the latest event's stacktrace and metadata."
),
input_schema={
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Sentry issue ID (numeric string)",
}
},
"required": ["issue_id"],
},
),
MCPTool(
name="sentry_search_issues",
description=(
"Search Sentry issues by query string. "
"Supports Sentry search syntax, e.g. 'is:unresolved service:api'."
),
input_schema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Sentry search query, e.g. 'is:unresolved TypeError'",
},
"limit": {
"type": "integer",
"description": "Number of results (1-25, default 10)",
"default": 10,
},
},
"required": ["query"],
},
),
]
async def execute(self, tool_name: str, parameters: dict[str, Any]) -> MCPToolResult:
handlers = {
"sentry_list_issues": self._list_issues,
"sentry_get_issue": self._get_issue,
"sentry_search_issues": self._search_issues,
}
handler = handlers.get(tool_name)
if not handler:
return MCPToolResult(success=False, error=f"Unknown tool: {tool_name}")
try:
return await handler(parameters)
except Exception as e:
logger.error("sentry_mcp_error", tool=tool_name, error=str(e))
return MCPToolResult(success=False, error=str(e))
def _format_issue(self, issue: dict) -> dict:
return {
"id": issue.get("id"),
"title": issue.get("title", "")[:120],
"culprit": issue.get("culprit", "")[:100],
"status": issue.get("status"),
"count": issue.get("count"),
"user_count": issue.get("userCount"),
"first_seen": issue.get("firstSeen"),
"last_seen": issue.get("lastSeen"),
"short_id": issue.get("shortId"),
}
async def _list_issues(self, parameters: dict) -> MCPToolResult:
limit = max(1, min(int(parameters.get("limit", 10)), 25))
status = parameters.get("status", "unresolved")
# sanitize status
if status not in ("unresolved", "resolved", "ignored"):
status = "unresolved"
url = f"{self._base_url()}/api/0/projects/{self._org()}/{self._project()}/issues/"
params = {"query": f"is:{status}", "limit": limit}
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
resp = await client.get(url, headers=self._headers(), params=params)
resp.raise_for_status()
issues = resp.json()
return MCPToolResult(
success=True,
data={"issues": [self._format_issue(i) for i in issues], "count": len(issues)},
)
async def _get_issue(self, parameters: dict) -> MCPToolResult:
issue_id = parameters["issue_id"]
if not _RE_SAFE_ISSUE_ID.match(str(issue_id)):
return MCPToolResult(success=False, error=f"Invalid issue_id: {issue_id!r}")
url = f"{self._base_url()}/api/0/issues/{issue_id}/"
events_url = f"{self._base_url()}/api/0/issues/{issue_id}/events/?limit=1&full=true"
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
issue_resp = await client.get(url, headers=self._headers())
issue_resp.raise_for_status()
issue = issue_resp.json()
stacktrace = None
events_resp = await client.get(events_url, headers=self._headers())
if events_resp.status_code == 200:
events = events_resp.json()
if events:
entries = events[0].get("entries", [])
for entry in entries:
if entry.get("type") == "exception":
exc = entry.get("data", {}).get("values", [{}])[0]
frames = exc.get("stacktrace", {}).get("frames", [])
# 只取最後 5 frame
stacktrace = [
f"{f.get('filename')}:{f.get('lineNo')} in {f.get('function')}"
for f in frames[-5:]
]
break
result = self._format_issue(issue)
result["stacktrace"] = stacktrace
result["tags"] = {t["key"]: t["value"] for t in issue.get("tags", [])[:10]}
return MCPToolResult(success=True, data=result)
async def _search_issues(self, parameters: dict) -> MCPToolResult:
query = str(parameters["query"])[:200] # 限制長度防止注入
limit = max(1, min(int(parameters.get("limit", 10)), 25))
url = f"{self._base_url()}/api/0/projects/{self._org()}/{self._project()}/issues/"
params = {"query": query, "limit": limit}
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
resp = await client.get(url, headers=self._headers(), params=params)
resp.raise_for_status()
issues = resp.json()
return MCPToolResult(
success=True,
data={"issues": [self._format_issue(i) for i in issues], "count": len(issues)},
)
async def health_check(self) -> bool:
try:
url = f"{self._base_url()}/api/0/projects/{self._org()}/{self._project()}/"
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url, headers=self._headers())
return resp.status_code < 500
except Exception:
return False