diff --git a/apps/web/src/app/api/mcp/[tool]/route.ts b/apps/web/src/app/api/mcp/[tool]/route.ts index da7f050..aea5ad3 100644 --- a/apps/web/src/app/api/mcp/[tool]/route.ts +++ b/apps/web/src/app/api/mcp/[tool]/route.ts @@ -14,6 +14,7 @@ import { logAuditEvent } from "@/lib/audit"; import { redis } from "@/lib/redis"; import { authHold, capturePayment } from "@/lib/payment"; import { sendTrafficAlert } from "@/lib/traffic-alert"; +import { evaluateExternalFunnelHealth } from "@/lib/traffic-conversion-monitor"; import crypto from "crypto"; import { z } from "zod"; @@ -182,6 +183,13 @@ export async function POST(request: NextRequest, props: { params: Promise<{ tool }, }); + if (isPublicIp) { + void evaluateExternalFunnelHealth({ + surface: "mcp/list_open_tasks", + periodMinutes: 10, + }); + } + void prisma.auditEvent.count({ where: { createdAt: { @@ -288,6 +296,11 @@ export async function POST(request: NextRequest, props: { params: Promise<{ tool }, }); + void evaluateExternalFunnelHealth({ + surface: "mcp/claim_task", + periodMinutes: 10, + }); + // Set Redis TTL key (3600 seconds) await redis.set(`vw:task:${claim.task_id}:executing`, claim.claim_token, "EX", 3600); @@ -364,6 +377,11 @@ export async function POST(request: NextRequest, props: { params: Promise<{ tool }, }); + void evaluateExternalFunnelHealth({ + surface: "mcp/submit_solution", + periodMinutes: 10, + }); + // Async trigger E2B Sandbox evaluation const taskObj = await prisma.task.findUnique({ where: { id: submission.task_id }}); if (taskObj && typeof taskObj.acceptance_criteria === "object" && taskObj.acceptance_criteria !== null) { diff --git a/apps/web/src/app/api/open-tasks/route.ts b/apps/web/src/app/api/open-tasks/route.ts index e5ef007..1a7b3e9 100644 --- a/apps/web/src/app/api/open-tasks/route.ts +++ b/apps/web/src/app/api/open-tasks/route.ts @@ -3,6 +3,7 @@ import { prisma } from "@/lib/prisma"; import { TaskStatus } from "@agent-bounty/contracts"; import { sendTrafficAlert } from "@/lib/traffic-alert"; import { isIP } from "node:net"; +import { evaluateExternalFunnelHealth } from "@/lib/traffic-conversion-monitor"; export const dynamic = "force-dynamic"; @@ -235,6 +236,13 @@ export async function GET(request: Request) { } }).catch(() => {}); + if (isPublicIp) { + void evaluateExternalFunnelHealth({ + surface: "public-open-tasks", + periodMinutes: 10, + }); + } + return NextResponse.json({ platform: "VibeWork", version: "v1", diff --git a/apps/web/src/app/api/traffic/route.ts b/apps/web/src/app/api/traffic/route.ts index 0d1e6c9..69bd1bf 100644 --- a/apps/web/src/app/api/traffic/route.ts +++ b/apps/web/src/app/api/traffic/route.ts @@ -162,6 +162,14 @@ export async function GET(request: NextRequest) { return false; }; + const isInternalActor = (params: { + actorType: string | null | undefined; + actorId: string | null | undefined; + }) => { + if (params.actorType === "AGENT") return false; + return isInternalActorId(params.actorId); + }; + const externalActorSummary = externalActorRows .map((row) => ({ actorId: row.actorId || "unknown", @@ -246,7 +254,11 @@ export async function GET(request: NextRequest) { }); const recentExternalEvents = recentEvents.filter((event) => - event.action.startsWith("EXTERNAL_") && !isInternalActorId(event.actorId) + event.action.startsWith("EXTERNAL_") && + !isInternalActor({ + actorType: event.actorType, + actorId: event.actorId, + }) ); const recentInternalEvents = recentEvents.filter( diff --git a/apps/web/src/app/traffic/page.tsx b/apps/web/src/app/traffic/page.tsx index 03a6846..e9c6213 100644 --- a/apps/web/src/app/traffic/page.tsx +++ b/apps/web/src/app/traffic/page.tsx @@ -17,6 +17,10 @@ const EVENT_LABELS: Record = { EXTERNAL_SUBMIT_SOLUTION_ERROR: "外部提交失敗", EXTERNAL_LIST_OPEN_TASKS_ERROR: "外部公開流量端點錯誤", EXTERNAL_LIST_OPEN_TASKS_MCP_ERROR: "外部 MCP 流量端點錯誤", + EXTERNAL_FUNNEL_CLAIM_STALL: "外部曝光後未接案", + EXTERNAL_FUNNEL_SUBMIT_STALL: "外部接案後未提交", + EXTERNAL_FUNNEL_PASS_STALL: "外部提交後未 PASS", + EXTERNAL_FUNNEL_PAYOUT_STALL: "PASS 後未出金", JUDGE_COMPLETE: "AI 交件判定完成", }; @@ -61,6 +65,11 @@ function isInternalActorId(value: string | null | undefined) { return false; } +function isInternalActor(input: { actorType: string | null | undefined; actorId: string | null | undefined }) { + if (input.actorType === "AGENT") return false; + return isInternalActorId(input.actorId); +} + function isAuthorizedToken(token: string | undefined, tokenHeader: string | undefined) { if (!token) return true; return tokenHeader === token; @@ -245,7 +254,13 @@ async function getTrafficSummary(minutes: number) { externalActorSummary, externalEventTypes, internalEventTypes, - recentExternalEvents: recentEvents.filter((event) => event.action.startsWith("EXTERNAL_") && !isInternalActorId(event.actorId)), + recentExternalEvents: recentEvents.filter((event) => + event.action.startsWith("EXTERNAL_") && + !isInternalActor({ + actorType: event.actorType, + actorId: event.actorId, + }) + ), recentInternalEvents: recentEvents.filter((event) => !event.action.startsWith("EXTERNAL_")), conversionSummary, conversionRates, diff --git a/apps/web/src/lib/traffic-conversion-monitor.ts b/apps/web/src/lib/traffic-conversion-monitor.ts new file mode 100644 index 0000000..0d7b8d1 --- /dev/null +++ b/apps/web/src/lib/traffic-conversion-monitor.ts @@ -0,0 +1,216 @@ +import { prisma } from "./prisma"; +import { redis } from "./redis"; +import { sendTrafficAlert } from "./traffic-alert"; + +type FunnelSummary = { + discoveryEvents: number; + claimEvents: number; + submitEvents: number; + judgePassEvents: number; + judgeFailEvents: number; + payoutCaptured: number; + payoutReleased: number; + periodMinutes: number; +}; + +type MonitorInput = { + surface: string; + periodMinutes?: number; +}; + +const DEFAULT_PERIOD_MINUTES = 10; +const ALERT_TTL_SECONDS = 600; + +function asRecordJson(value: unknown): Record | undefined { + if (typeof value === "object" && value !== null && !Array.isArray(value)) { + return value as Record; + } + return undefined; +} + +function isMissingLedgerTableError(error: unknown) { + return ( + typeof error === "object" && + error !== null && + "code" in error && + (error as { code: string }).code === "P2021" + ); +} + +async function fetchFunnelSummary(minutes: number): Promise { + const since = new Date(Date.now() - minutes * 60 * 1000); + + const summaryRows = await prisma.auditEvent.groupBy({ + by: ["action"], + where: { + createdAt: { gte: since }, + action: { + startsWith: "EXTERNAL_", + }, + }, + _count: { _all: true }, + }); + + const judgeRows = await prisma.auditEvent.findMany({ + where: { + createdAt: { gte: since }, + action: "JUDGE_COMPLETE", + }, + select: { metadata: true }, + }); + + let payoutCaptured = 0; + let payoutReleased = 0; + try { + [payoutCaptured, payoutReleased] = await Promise.all([ + prisma.ledgerEntry.count({ + where: { + created_at: { gte: since }, + phase: "CAPTURE", + response_status: "SUCCESS", + }, + }), + prisma.ledgerEntry.count({ + where: { + created_at: { gte: since }, + phase: "RELEASE", + response_status: "SUCCESS", + }, + }), + ]); + } catch (error) { + if (!isMissingLedgerTableError(error)) { + throw error; + } + } + + const actionSummary = Object.fromEntries( + summaryRows.map((row) => [row.action, row._count._all]) + ); + + const discoveryEvents = + (actionSummary["EXTERNAL_LIST_OPEN_TASKS"] || 0) + + (actionSummary["EXTERNAL_LIST_OPEN_TASKS_MCP"] || 0); + const claimEvents = actionSummary["EXTERNAL_CLAIM_TASK_SUCCESS"] || 0; + const submitEvents = actionSummary["EXTERNAL_SUBMIT_SOLUTION_SUCCESS"] || 0; + + const judgePassEvents = judgeRows.filter((row) => { + const metadata = asRecordJson(row.metadata); + return metadata?.overall_result === "PASS"; + }).length; + + const judgeFailEvents = judgeRows.filter((row) => { + const metadata = asRecordJson(row.metadata); + return metadata?.overall_result === "FAIL"; + }).length; + + return { + discoveryEvents, + claimEvents, + submitEvents, + judgePassEvents, + judgeFailEvents, + payoutCaptured, + payoutReleased, + periodMinutes: minutes, + }; +} + +function buildAlertMessage(rule: string, summary: FunnelSummary) { + const { periodMinutes, discoveryEvents, claimEvents, submitEvents, judgePassEvents, payoutCaptured } = summary; + + switch (rule) { + case "EXTERNAL_FUNNEL_CLAIM_STALL": + return `外部曝光已達 ${discoveryEvents}(最近 ${periodMinutes} 分鐘),但尚無接案(EXTERNAL_CLAIM_TASK_SUCCESS = ${claimEvents})。請檢查任務是否包含可直接執行的 npx 指令與明確交付條件。`; + case "EXTERNAL_FUNNEL_SUBMIT_STALL": + return `外部已有 ${claimEvents} 筆接案,但近期 ${periodMinutes} 分鐘無任何提交(EXTERNAL_SUBMIT_SOLUTION_SUCCESS = ${submitEvents})。請先加速回傳格式與驗收測試規格。`; + case "EXTERNAL_FUNNEL_PASS_STALL": + return `外部已提交 ${submitEvents} 次但尚無 PASS(JUDGE_RESULT PASS = ${judgePassEvents})。請先檢查 task acceptance_criteria 與測試欄位是否可自動驗證。`; + case "EXTERNAL_FUNNEL_PAYOUT_STALL": + return `有 PASS 但未收款(payout CAPTURE 成功 = ${payoutCaptured})。請確認支付授權、Stripe key 與 capture 任務是否正常。`; + default: + return "外部 AI 流量轉化斷崖異常。"; + } +} + +function alertRules(summary: FunnelSummary): Array<{ action: string; message: string }> { + const alerts: Array<{ action: string; message: string }> = []; + + if (summary.discoveryEvents > 0 && summary.claimEvents === 0) { + alerts.push({ + action: "EXTERNAL_FUNNEL_CLAIM_STALL", + message: buildAlertMessage("EXTERNAL_FUNNEL_CLAIM_STALL", summary), + }); + } + + if (summary.claimEvents > 0 && summary.submitEvents === 0) { + alerts.push({ + action: "EXTERNAL_FUNNEL_SUBMIT_STALL", + message: buildAlertMessage("EXTERNAL_FUNNEL_SUBMIT_STALL", summary), + }); + } + + if (summary.submitEvents > 0 && summary.judgePassEvents === 0) { + alerts.push({ + action: "EXTERNAL_FUNNEL_PASS_STALL", + message: buildAlertMessage("EXTERNAL_FUNNEL_PASS_STALL", summary), + }); + } + + if (summary.judgePassEvents > 0 && summary.payoutCaptured === 0) { + alerts.push({ + action: "EXTERNAL_FUNNEL_PAYOUT_STALL", + message: buildAlertMessage("EXTERNAL_FUNNEL_PAYOUT_STALL", summary), + }); + } + + return alerts; +} + +async function shouldEmitAlert(key: string): Promise { + try { + const result = await redis.set(key, String(Date.now()), "NX", "EX", ALERT_TTL_SECONDS); + return result === "OK"; + } catch (error) { + console.warn("[traffic-monitor] redis dedupe failed", error); + return true; + } +} + +export async function evaluateExternalFunnelHealth(input: MonitorInput): Promise { + const periodMinutes = Math.max(input.periodMinutes ?? DEFAULT_PERIOD_MINUTES, 5); + + try { + const summary = await fetchFunnelSummary(periodMinutes); + const alerts = alertRules(summary); + + await Promise.all( + alerts.map(async (rule) => { + const dedupeKey = `vw:traffic-funnel:${rule.action}:${input.surface}`; + const shouldNotify = await shouldEmitAlert(dedupeKey); + if (!shouldNotify) return; + + await sendTrafficAlert({ + level: "warning", + action: rule.action, + surface: input.surface, + actorType: "SYSTEM", + actorId: "traffic-monitor", + message: rule.message, + metadata: { + discovery_events: summary.discoveryEvents, + claim_events: summary.claimEvents, + submit_events: summary.submitEvents, + judge_pass_events: summary.judgePassEvents, + judge_fail_events: summary.judgeFailEvents, + payout_captured: summary.payoutCaptured, + payout_released: summary.payoutReleased, + period_minutes: summary.periodMinutes, + }, + }); + }) + ); + } catch (error) { + console.error("[traffic-monitor] evaluateExternalFunnelHealth failed", error); + } +}