483 lines
15 KiB
TypeScript
483 lines
15 KiB
TypeScript
import { prisma } from "./prisma";
|
||
import { redis } from "./redis";
|
||
import { sendTrafficAlert } from "./traffic-alert";
|
||
|
||
/**
 * Snapshot of the external-agent funnel over a recent time window.
 * Built by `fetchFunnelSummary`; read by the alert rules and alert message builder.
 */
type FunnelSummary = {
  /** Length of the observation window, in minutes. */
  periodMinutes: number;

  // Funnel stage event counts inside the window.
  discoveryEvents: number;
  claimEvents: number;
  submitEvents: number;
  judgePassEvents: number;
  judgeFailEvents: number;

  // MCP authentication barrier events inside the window.
  mcpAuthMissingEvents: number;
  mcpAuthForbiddenEvents: number;

  // Open task backlog: total count plus a handful of example task ids.
  openTaskCount: number;
  sampleOpenTasks: Array<string>;

  // How far individual external actors progressed through the funnel.
  externalOpenedActors: number;
  externalClaimingActors: number;
  externalSubmittingActors: number;
  externalOnlyOpenActors: number;
  topOpenOnlyActors: { actorId: string; opens: number }[];

  /** Most frequent judge failure classifications with example entity ids. */
  judgeFailureReasons: {
    reason: string;
    count: number;
    examples: Array<string>;
  }[];

  // Successful payout ledger entries inside the window.
  payoutCaptured: number;
  payoutReleased: number;
};
|
||
|
||
/** Options accepted by `evaluateExternalFunnelHealth`. */
type MonitorInput = {
  /** Label of the calling surface; used in the alert payload and the dedupe key. */
  surface: string;
  /** Observation window in minutes; when omitted the module default is used (minimum 5). */
  periodMinutes?: number;
};
|
||
|
||
/** Look-back window used when the caller does not supply one, in minutes. */
const DEFAULT_PERIOD_MINUTES = 10;

/** Expiry of an alert's Redis dedupe key, in seconds (ten minutes). */
const ALERT_TTL_SECONDS = 10 * 60;

/**
 * Set after a Redis dedupe failure has been logged so repeated failures do not
 * flood the logs; cleared again after the next successful Redis call.
 */
let redisDedupeFailureLogged = false;
|
||
|
||
/**
 * Narrows an unknown (typically JSON) value to a string-keyed record.
 *
 * @param value - any value, e.g. a JSON metadata column
 * @returns the same value typed as a record when it is a non-null, non-array
 *   object; otherwise `undefined`
 */
function asRecordJson(value: unknown): Record<string, unknown> | undefined {
  const isRecordLike =
    value !== null && typeof value === "object" && !Array.isArray(value);
  return isRecordLike ? (value as Record<string, unknown>) : undefined;
}
|
||
|
||
/**
 * Canonicalises a judge verdict so it can be compared against "pass"/"fail".
 *
 * @param value - raw verdict, usually read from audit metadata
 * @returns the trimmed, lower-cased string, or `""` for any non-string input
 */
function normalizedJudgeResult(value: unknown) {
  return typeof value === "string" ? value.trim().toLowerCase() : "";
}
|
||
|
||
/**
 * Decides whether an audit-event actor id should be excluded from external funnel metrics.
 *
 * Treated as internal:
 * - a missing/empty id;
 * - the placeholders `unknown`, `mcp-anonymous` and `agent:unknown`
 *   (the latter is the placeholder used for a missing actor id);
 * - `open-tasks:<ip>` ids whose address is loopback (127.0.0.0/8, ::1) or
 *   RFC 1918 private space (10/8, 172.16/12, 192.168/16). IPv4-mapped IPv6
 *   addresses (`::ffff:a.b.c.d`) are unwrapped before the range checks.
 *
 * Matching is case-insensitive. Any other id is considered external.
 *
 * @param actorId - raw actor id from the audit event
 * @returns `true` when the actor should be ignored by the funnel metrics
 */
function isInternalActorId(actorId: string | null | undefined) {
  if (!actorId) return true;

  const normalizedId = actorId.toLowerCase();
  if (
    normalizedId === "unknown" ||
    normalizedId === "mcp-anonymous" ||
    normalizedId === "agent:unknown"
  ) {
    return true;
  }

  const ipMatch = normalizedId.match(/^open-tasks:([a-z0-9.:_-]+)$/);
  if (!ipMatch?.[1]) return false;

  let actorIp = ipMatch[1];
  // IPv4 peers may be reported in IPv4-mapped IPv6 form (e.g. by dual-stack
  // listeners); unwrap it so the IPv4 range checks below still apply.
  const ipv4MappedPrefix = "::ffff:";
  if (actorIp.startsWith(ipv4MappedPrefix)) {
    actorIp = actorIp.slice(ipv4MappedPrefix.length);
  }

  // IPv6 loopback.
  if (actorIp === "::1") return true;

  if (
    actorIp.startsWith("127.") ||
    actorIp.startsWith("10.") ||
    actorIp.startsWith("192.168.")
  ) {
    return true;
  }

  // 172.16.0.0/12 spans second octets 16 through 31.
  if (actorIp.startsWith("172.")) {
    const secondOctet = Number(actorIp.split(".")[1]);
    return secondOctet >= 16 && secondOctet <= 31;
  }

  return false;
}
|
||
|
||
/**
 * Recognises an error object carrying Prisma error code `P2021`
 * ("table does not exist"). The payout counters use this to tolerate a
 * ledger table that has not been created.
 *
 * @param error - anything caught from a Prisma call
 * @returns `true` only for a non-null object whose `code` is `"P2021"`
 */
function isMissingLedgerTableError(error: unknown) {
  if (typeof error !== "object" || error === null) return false;
  if (!("code" in error)) return false;
  return (error as { code: string }).code === "P2021";
}
|
||
|
||
/** Audit actions that represent an agent listing open tasks (funnel "discovery"). */
const DISCOVERY_ACTIONS: readonly string[] = [
  "EXTERNAL_LIST_OPEN_TASKS",
  "EXTERNAL_LIST_OPEN_TASKS_MCP",
];

/** Maximum number of distinct example entity ids kept per judge failure reason. */
const MAX_FAILURE_EXAMPLES = 3;

/** Minimal shape of a `JUDGE_COMPLETE` audit row as selected below. */
type JudgeAuditRow = { metadata: unknown; entityId?: string | null };

/** Minimal shape of a per-actor, per-action audit group as selected below. */
type ActorActionRow = {
  actorId: string | null;
  action: string;
  _count: { _all: number };
};

/**
 * Picks the failure classification from judge metadata, preferring
 * `error_classification` over `error_classification_legacy`.
 *
 * @returns the upper-cased classification, or `"UNKNOWN"` when neither is a non-empty string
 */
function judgeFailureReason(metadata: Record<string, unknown> | undefined): string {
  const primary = metadata?.error_classification;
  if (typeof primary === "string" && primary.length > 0) return primary.toUpperCase();

  const legacy = metadata?.error_classification_legacy;
  if (typeof legacy === "string" && legacy.length > 0) return legacy.toUpperCase();

  return "UNKNOWN";
}

/**
 * Counts pass/fail verdicts in a single pass and buckets failures by reason.
 *
 * @returns pass/fail counts plus the top five failure reasons (by count, then
 *   reason name), each with up to three distinct example entity ids
 */
function summarizeJudgeRows(rows: ReadonlyArray<JudgeAuditRow>) {
  let judgePassEvents = 0;
  let judgeFailEvents = 0;
  const reasonBuckets = new Map<string, { count: number; examples: string[] }>();

  for (const row of rows) {
    const metadata = asRecordJson(row.metadata);
    const verdict = normalizedJudgeResult(metadata?.overall_result);

    if (verdict === "pass") {
      judgePassEvents += 1;
      continue;
    }
    if (verdict !== "fail") continue;

    judgeFailEvents += 1;
    const reason = judgeFailureReason(metadata);
    const bucket = reasonBuckets.get(reason) ?? { count: 0, examples: [] };
    bucket.count += 1;
    // Keep examples distinct: a task that fails repeatedly should not crowd
    // out other failing tasks from the limited sample.
    if (
      row.entityId &&
      bucket.examples.length < MAX_FAILURE_EXAMPLES &&
      !bucket.examples.includes(row.entityId)
    ) {
      bucket.examples.push(row.entityId);
    }
    reasonBuckets.set(reason, bucket);
  }

  const judgeFailureReasons = Array.from(reasonBuckets, ([reason, { count, examples }]) => ({
    reason,
    count,
    examples,
  }))
    .sort((left, right) => right.count - left.count || left.reason.localeCompare(right.reason))
    .slice(0, 5);

  return { judgePassEvents, judgeFailEvents, judgeFailureReasons };
}

/**
 * Tracks how far each external actor progressed (open -> claim -> submit).
 *
 * Rows with a missing or internal actor id are skipped. Every other actor with
 * at least one `EXTERNAL_*` event counts toward `externalOpenedActors`.
 */
function summarizeExternalActors(rows: ReadonlyArray<ActorActionRow>) {
  const flows = new Map<
    string,
    { actorId: string; opens: number; claims: number; submits: number }
  >();

  for (const row of rows) {
    const { actorId } = row;
    // Check the raw id: substituting a placeholder for a missing id first would
    // let all unattributed traffic through as a single fake external actor.
    if (!actorId || isInternalActorId(actorId)) continue;

    const flow = flows.get(actorId) ?? { actorId, opens: 0, claims: 0, submits: 0 };
    const eventCount = row._count._all;
    if (DISCOVERY_ACTIONS.includes(row.action)) {
      flow.opens += eventCount;
    } else if (row.action === "EXTERNAL_CLAIM_TASK_SUCCESS") {
      flow.claims += eventCount;
    } else if (row.action === "EXTERNAL_SUBMIT_SOLUTION_SUCCESS") {
      flow.submits += eventCount;
    }
    flows.set(actorId, flow);
  }

  const actors = Array.from(flows.values());
  const openOnlyActors = actors
    .filter((actor) => actor.opens > 0 && actor.claims === 0)
    .sort((left, right) => right.opens - left.opens);

  return {
    externalOpenedActors: actors.length,
    externalClaimingActors: actors.filter((actor) => actor.claims > 0).length,
    externalSubmittingActors: actors.filter((actor) => actor.submits > 0).length,
    externalOnlyOpenActors: openOnlyActors.length,
    topOpenOnlyActors: openOnlyActors
      .slice(0, 3)
      .map(({ actorId, opens }) => ({ actorId, opens })),
  };
}

/**
 * Counts successful CAPTURE and RELEASE ledger entries created since `since`.
 * A missing ledger table (Prisma `P2021`) is reported as zero for both.
 *
 * @throws any other database error
 */
async function countSuccessfulLedgerEntries(since: Date) {
  try {
    const [payoutCaptured, payoutReleased] = await Promise.all([
      prisma.ledgerEntry.count({
        where: {
          created_at: { gte: since },
          phase: "CAPTURE",
          response_status: "SUCCESS",
        },
      }),
      prisma.ledgerEntry.count({
        where: {
          created_at: { gte: since },
          phase: "RELEASE",
          response_status: "SUCCESS",
        },
      }),
    ]);
    return { payoutCaptured, payoutReleased };
  } catch (error) {
    if (!isMissingLedgerTableError(error)) {
      throw error;
    }
    return { payoutCaptured: 0, payoutReleased: 0 };
  }
}

/**
 * Collects the funnel metrics for the last `minutes` minutes.
 *
 * - Event counts come from `EXTERNAL_*` audit events.
 * - Judge outcomes come from `JUDGE_COMPLETE` audit metadata.
 * - The open-task backlog is OPEN tasks whose title does not start with "GitHub Issue:".
 * - Payout counts come from successful CAPTURE/RELEASE ledger entries.
 *
 * @param minutes - length of the look-back window
 * @returns the aggregated funnel snapshot
 * @throws database errors, except a missing ledger table (reported as zero payouts)
 */
async function fetchFunnelSummary(minutes: number): Promise<FunnelSummary> {
  const since = new Date(Date.now() - minutes * 60 * 1000);

  // Shared by the sample query and the count so the two always agree.
  const openTaskWhere = {
    status: "OPEN",
    title: {
      not: {
        startsWith: "GitHub Issue:",
      },
    },
  } as const;

  const [summaryRows, actorRows, judgeRows, openTaskRows, openTaskCount] = await Promise.all([
    // Per-action totals for every external event in the window.
    prisma.auditEvent.groupBy({
      by: ["action"],
      where: {
        createdAt: { gte: since },
        action: {
          startsWith: "EXTERNAL_",
        },
      },
      _count: { _all: true },
    }),
    // The same events from agent actors, split per actor.
    prisma.auditEvent.groupBy({
      by: ["actorId", "action"],
      where: {
        createdAt: { gte: since },
        actorType: "AGENT",
        action: {
          startsWith: "EXTERNAL_",
        },
      },
      _count: { _all: true },
    }),
    prisma.auditEvent.findMany({
      where: {
        createdAt: { gte: since },
        action: "JUDGE_COMPLETE",
      },
      select: { metadata: true, entityId: true },
    }),
    prisma.task.findMany({
      where: openTaskWhere,
      orderBy: {
        created_at: "desc",
      },
      select: {
        id: true,
      },
      take: 5,
    }),
    prisma.task.count({
      where: openTaskWhere,
    }),
  ]);

  const { payoutCaptured, payoutReleased } = await countSuccessfulLedgerEntries(since);

  const actionTotals: Record<string, number> = Object.fromEntries(
    summaryRows.map((row): [string, number] => [row.action, row._count._all])
  );
  const totalFor = (action: string) => actionTotals[action] ?? 0;

  return {
    discoveryEvents: DISCOVERY_ACTIONS.reduce((sum, action) => sum + totalFor(action), 0),
    claimEvents: totalFor("EXTERNAL_CLAIM_TASK_SUCCESS"),
    submitEvents: totalFor("EXTERNAL_SUBMIT_SOLUTION_SUCCESS"),
    mcpAuthMissingEvents: totalFor("EXTERNAL_MCP_AUTH_MISSING"),
    mcpAuthForbiddenEvents: totalFor("EXTERNAL_MCP_AUTH_FORBIDDEN"),
    ...summarizeJudgeRows(judgeRows),
    ...summarizeExternalActors(actorRows),
    openTaskCount,
    sampleOpenTasks: openTaskRows.map((task) => task.id),
    payoutCaptured,
    payoutReleased,
    periodMinutes: minutes,
  };
}
|
||
|
||
/**
 * Renders the human-readable (Traditional Chinese) alert text for one funnel rule.
 *
 * When any MCP auth-missing/forbidden events were seen, the claim-stall and
 * cold-standby messages also get a hint about the `Authorization` header.
 *
 * @param rule - alert action name such as `EXTERNAL_FUNNEL_CLAIM_STALL`;
 *   unrecognised names produce a generic message
 * @param summary - funnel snapshot the message describes
 * @returns the alert message text
 */
function buildAlertMessage(rule: string, summary: FunnelSummary) {
  const {
    periodMinutes,
    discoveryEvents,
    claimEvents,
    submitEvents,
    judgePassEvents,
    mcpAuthMissingEvents,
    mcpAuthForbiddenEvents,
    payoutCaptured,
    openTaskCount,
    sampleOpenTasks,
    externalOpenedActors,
    externalClaimingActors,
    externalSubmittingActors,
    externalOnlyOpenActors,
    topOpenOnlyActors,
    judgeFailureReasons,
  } = summary;

  const authBarrierEvents = mcpAuthMissingEvents + mcpAuthForbiddenEvents;
  const topActorSummary = topOpenOnlyActors
    .slice(0, 3)
    .map((actor) => `${actor.actorId}(${actor.opens})`)
    .join(", ");
  const authBarrierHint =
    authBarrierEvents > 0
      ? `偵測到 MCP 權限攔截: AUTH missing=${mcpAuthMissingEvents},FORBIDDEN=${mcpAuthForbiddenEvents}。` +
        `請先確認外部 agent 是否已帶 ` +
        "`Authorization: Bearer <YOUR_API_KEY>`。"
      : "";

  switch (rule) {
    case "EXTERNAL_FUNNEL_CLAIM_STALL":
      return `外部曝光已達 ${discoveryEvents}(最近 ${periodMinutes} 分鐘),待接任務 ${openTaskCount} 筆,但尚無接案(EXTERNAL_CLAIM_TASK_SUCCESS = ${claimEvents})。` +
        `${sampleOpenTasks.length > 0 ? `可用任務樣本: ${sampleOpenTasks.join(", ")}。` : ""}` +
        `${topActorSummary ? `高活躍 Actor(尚未接案): ${topActorSummary}。` : ""}` +
        `${authBarrierHint}` +
        `請檢查任務是否包含可直接執行的 npx 指令與明確交付條件。`;
    case "EXTERNAL_FUNNEL_SUBMIT_STALL":
      return `外部已有 ${claimEvents} 筆接案,但近期 ${periodMinutes} 分鐘無任何提交(EXTERNAL_SUBMIT_SOLUTION_SUCCESS = ${submitEvents})。請先加速回傳格式與驗收測試規格。`;
    case "EXTERNAL_FUNNEL_PASS_STALL": {
      // Braces give these declarations their own scope instead of leaking
      // across the whole switch (no-case-declarations).
      const failureHint =
        judgeFailureReasons.length > 0
          ? `常見失敗原因:${judgeFailureReasons
              .map((item) => `${item.reason}(${item.count})`)
              .join("、")}。`
          : "尚未取得明確失敗分類。";

      // The same task can appear under several reasons; list each one once.
      const failureSampleTasks = Array.from(
        new Set(judgeFailureReasons.flatMap((item) => item.examples).filter(Boolean))
      ).slice(0, 3);

      return `外部已提交 ${submitEvents} 次但尚無 PASS(JUDGE_RESULT PASS = ${judgePassEvents})。${failureHint}` +
        `${failureSampleTasks.length > 0 ? `失敗任務樣本: ${failureSampleTasks.join(", ")}。` : ""}` +
        `請先檢查 task acceptance_criteria 與測試欄位是否可自動驗證。`;
    }
    case "EXTERNAL_FUNNEL_PAYOUT_STALL":
      return `有 PASS 但未收款(payout CAPTURE 成功 = ${payoutCaptured})。請確認支付授權、Stripe key 與 capture 任務是否正常。`;
    case "EXTERNAL_FUNNEL_OPEN_COLD_STANDBY":
      return `最近 ${periodMinutes} 分鐘觀測到 ${discoveryEvents} 次外部曝光,` +
        `外部 Actor= ${externalOpenedActors} 位,` +
        `已接案=${externalClaimingActors}、已提交=${externalSubmittingActors},` +
        `仍停在僅曝光階段 ${externalOnlyOpenActors} 位。` +
        `${topActorSummary ? `先看未進一步的熱門 Actor:${topActorSummary}。` : ""}` +
        `${authBarrierHint}`;
    default:
      return "外部 AI 流量轉化斷崖異常。";
  }
}
|
||
|
||
/**
 * Evaluates every funnel-stall rule against a summary.
 *
 * Rules are checked in a fixed order: claim stall, open cold standby,
 * submit stall, pass stall, payout stall. Each rule that fires contributes one
 * `{ action, message }` entry, with the message rendered by `buildAlertMessage`.
 *
 * @param summary - funnel snapshot to evaluate
 * @returns the fired alerts, in rule order (possibly empty)
 */
function alertRules(summary: FunnelSummary): Array<{ action: string; message: string }> {
  const {
    discoveryEvents,
    openTaskCount,
    claimEvents,
    submitEvents,
    judgePassEvents,
    payoutCaptured,
    externalOnlyOpenActors,
  } = summary;

  const candidates: Array<{ action: string; fires: boolean }> = [
    {
      action: "EXTERNAL_FUNNEL_CLAIM_STALL",
      fires: discoveryEvents >= 3 && openTaskCount > 0 && claimEvents === 0,
    },
    {
      action: "EXTERNAL_FUNNEL_OPEN_COLD_STANDBY",
      fires: externalOnlyOpenActors >= 3 && discoveryEvents >= 10 && openTaskCount > 0,
    },
    {
      action: "EXTERNAL_FUNNEL_SUBMIT_STALL",
      fires: claimEvents > 0 && submitEvents === 0,
    },
    {
      action: "EXTERNAL_FUNNEL_PASS_STALL",
      fires: submitEvents > 0 && judgePassEvents === 0,
    },
    {
      action: "EXTERNAL_FUNNEL_PAYOUT_STALL",
      fires: judgePassEvents > 0 && payoutCaptured === 0,
    },
  ];

  return candidates
    .filter((candidate) => candidate.fires)
    .map(({ action }) => ({ action, message: buildAlertMessage(action, summary) }));
}
|
||
|
||
/**
 * Tries to claim the dedupe slot for an alert key in Redis.
 *
 * Issues `SET key <now> EX ALERT_TTL_SECONDS NX`, so the call returns `true`
 * only when the key did not already exist. If the Redis call throws, the
 * function fails open: it returns `true` so alerts are still sent, just not
 * deduplicated. It logs that condition once, until a later Redis call succeeds.
 *
 * @param key - dedupe key for one rule/surface pair
 * @returns whether the caller should send the alert
 */
async function shouldEmitAlert(key: string): Promise<boolean> {
  try {
    const reply = await redis.set(key, String(Date.now()), "EX", ALERT_TTL_SECONDS, "NX");
    // Redis answered, so the next failure deserves a fresh warning.
    redisDedupeFailureLogged = false;
    return reply === "OK";
  } catch (error) {
    if (redisDedupeFailureLogged) {
      return true;
    }
    console.warn(
      "[traffic-monitor] redis dedupe failed, fallback no-dedupe mode",
      error instanceof Error ? error.message : error
    );
    redisDedupeFailureLogged = true;
    return true;
  }
}
|
||
|
||
/**
 * Evaluates the external-agent funnel over a recent window and sends a
 * warning-level traffic alert for every stall rule that fires.
 *
 * Alerts are deduplicated per rule and surface via `shouldEmitAlert`. That
 * dedupe is best-effort: it is skipped when Redis is unavailable. Errors from
 * querying, deduping or sending are logged to the console rather than thrown,
 * and a failure to send one alert does not affect the others.
 *
 * @param input.surface - label of the calling surface; part of the dedupe key and alert payload
 * @param input.periodMinutes - window length in minutes; defaults to
 *   DEFAULT_PERIOD_MINUTES, is clamped to at least 5, and non-finite values
 *   fall back to the default
 */
export async function evaluateExternalFunnelHealth(input: MonitorInput): Promise<void> {
  const requestedMinutes = input.periodMinutes ?? DEFAULT_PERIOD_MINUTES;
  // NaN/Infinity would turn the window start into an invalid Date, so every
  // database query would fail; use the default window instead.
  const periodMinutes = Math.max(
    Number.isFinite(requestedMinutes) ? requestedMinutes : DEFAULT_PERIOD_MINUTES,
    5
  );

  try {
    const summary = await fetchFunnelSummary(periodMinutes);
    const alerts = alertRules(summary);

    await Promise.all(
      alerts.map(async (rule) => {
        // Handle each alert on its own: otherwise the first rejection would
        // short-circuit Promise.all, hide the other alerts' errors, and be
        // reported as a failure of the whole evaluation.
        try {
          const dedupeKey = `vw:traffic-funnel:${rule.action}:${input.surface}`;
          const shouldNotify = await shouldEmitAlert(dedupeKey);
          if (!shouldNotify) return;

          await sendTrafficAlert({
            level: "warning",
            action: rule.action,
            surface: input.surface,
            actorType: "SYSTEM",
            actorId: "traffic-monitor",
            sourceIp: "system",
            userAgent: "traffic-monitor",
            message: rule.message,
            metadata: {
              source_ip: "system",
              user_agent: "traffic-monitor",
              discovery_events: summary.discoveryEvents,
              claim_events: summary.claimEvents,
              submit_events: summary.submitEvents,
              judge_pass_events: summary.judgePassEvents,
              judge_fail_events: summary.judgeFailEvents,
              open_task_count: summary.openTaskCount,
              sample_open_tasks: summary.sampleOpenTasks,
              external_opened_actors: summary.externalOpenedActors,
              external_claiming_actors: summary.externalClaimingActors,
              external_submitting_actors: summary.externalSubmittingActors,
              external_only_open_actors: summary.externalOnlyOpenActors,
              mcp_auth_missing_events: summary.mcpAuthMissingEvents,
              mcp_auth_forbidden_events: summary.mcpAuthForbiddenEvents,
              judge_failure_reasons: summary.judgeFailureReasons,
              payout_captured: summary.payoutCaptured,
              payout_released: summary.payoutReleased,
              period_minutes: summary.periodMinutes,
              top_open_only_actors: summary.topOpenOnlyActors,
            },
          });
        } catch (error) {
          console.error(`[traffic-monitor] failed to emit ${rule.action}`, error);
        }
      })
    );
  } catch (error) {
    console.error("[traffic-monitor] evaluateExternalFunnelHealth failed", error);
  }
}
|