Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0550658
WIP
ericallam Dec 5, 2025
7b5056b
some fixes
ericallam Dec 5, 2025
ceddba0
fair queue baby
ericallam Dec 8, 2025
c1743bb
wip of the streaming batch trigger stuff
ericallam Dec 8, 2025
8a9532d
new async iterable version of batch trigger
ericallam Dec 9, 2025
fdbed14
pnpm lock changes
ericallam Dec 9, 2025
98a5709
add new batch status to api schema
ericallam Dec 9, 2025
f066ae5
Handle large payloads and correct the trace ID propogation to child runs
ericallam Dec 9, 2025
8552b3d
record when the batch processing is completed
ericallam Dec 9, 2025
d72068c
more batch processing work
ericallam Dec 11, 2025
d55dbb4
better dequeuing from fair queue
ericallam Dec 11, 2025
08eb6dd
fixed tests and removed the run number incrementor from the run engin…
ericallam Dec 12, 2025
7c85267
restructure the batch queue callbacks to prevent circular import
ericallam Dec 12, 2025
f3354b1
handle batch failures more reliably
ericallam Dec 12, 2025
00d5428
Better handling of batches when sealed = false
ericallam Dec 13, 2025
4d76367
improve completion tracker callback calling and cleanup
ericallam Dec 13, 2025
6c6de84
Add unique index to batch task run errors on the index field and more…
ericallam Dec 13, 2025
d69cfa5
batch task run seal update is atomic
ericallam Dec 13, 2025
5b8ec88
better ndjson parsing with bounded memory usage
ericallam Dec 13, 2025
5211ba6
release concurrency slot when moving to the DLQ
ericallam Dec 13, 2025
f592aac
Fix fair queue heartbeat
ericallam Dec 13, 2025
1610766
fixed batch payload handling
ericallam Dec 13, 2025
da247bf
capture and pass the planType through to triggering
ericallam Dec 13, 2025
b77c910
should preserve runFriendlyId across retries when RunDuplicateIdempot…
ericallam Dec 14, 2025
02eec34
better handling streaming ndjson retries
ericallam Dec 14, 2025
8f1619b
Better batch friendly ID parsing errors
ericallam Dec 14, 2025
82da484
only parse payloads when they are application/json
ericallam Dec 15, 2025
7e073b4
fix batch queue redis config
ericallam Dec 15, 2025
a5791a0
cancel read stream before releasing lock
ericallam Dec 15, 2025
628a7d2
Add changeset
ericallam Dec 15, 2025
fce7874
Fix stream batch items test
ericallam Dec 15, 2025
e78b619
all tests should use root vitest
ericallam Dec 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fluffy-crews-rhyme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

feat: Support for new batch trigger system
8 changes: 7 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ POSTHOG_PROJECT_KEY=
# These control the server-side internal telemetry
# INTERNAL_OTEL_TRACE_EXPORTER_URL=<URL to send traces to>
# INTERNAL_OTEL_TRACE_LOGGING_ENABLED=1
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0,
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0

# Enable local observability stack (requires `pnpm run docker` to start otel-collector)
# Uncomment these to send metrics to the local Prometheus via OTEL Collector:
# INTERNAL_OTEL_METRIC_EXPORTER_ENABLED=1
# INTERNAL_OTEL_METRIC_EXPORTER_URL=http://localhost:4318/v1/metrics
# INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS=15000
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ apps/**/public/build
/packages/trigger-sdk/src/package.json
/packages/python/src/package.json
.claude
.mcp.log
.mcp.log
.cursor/debug.log
32 changes: 27 additions & 5 deletions apps/webapp/app/components/runs/v3/BatchStatus.tsx
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import { CheckCircleIcon, XCircleIcon } from "@heroicons/react/20/solid";
import {
CheckCircleIcon,
ExclamationTriangleIcon,
XCircleIcon,
} from "@heroicons/react/20/solid";
import type { BatchTaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { Spinner } from "~/components/primitives/Spinner";
import { cn } from "~/utils/cn";

export const allBatchStatuses = ["PENDING", "COMPLETED", "ABORTED"] as const satisfies Readonly<
Array<BatchTaskRunStatus>
>;
export const allBatchStatuses = [
"PROCESSING",
"PENDING",
"COMPLETED",
"PARTIAL_FAILED",
"ABORTED",
] as const satisfies Readonly<Array<BatchTaskRunStatus>>;

const descriptions: Record<BatchTaskRunStatus, string> = {
PROCESSING: "The batch is being processed and runs are being created.",
PENDING: "The batch has child runs that have not yet completed.",
COMPLETED: "All the batch child runs have finished.",
ABORTED: "The batch was aborted because some child tasks could not be triggered.",
PARTIAL_FAILED: "Some runs failed to be created. Successfully created runs are still executing.",
ABORTED: "The batch was aborted because child tasks could not be triggered.",
};

export function descriptionForBatchStatus(status: BatchTaskRunStatus): string {
Expand Down Expand Up @@ -47,10 +57,14 @@ export function BatchStatusIcon({
className: string;
}) {
switch (status) {
case "PROCESSING":
return <Spinner className={cn(batchStatusColor(status), className)} />;
case "PENDING":
return <Spinner className={cn(batchStatusColor(status), className)} />;
case "COMPLETED":
return <CheckCircleIcon className={cn(batchStatusColor(status), className)} />;
case "PARTIAL_FAILED":
return <ExclamationTriangleIcon className={cn(batchStatusColor(status), className)} />;
case "ABORTED":
return <XCircleIcon className={cn(batchStatusColor(status), className)} />;
default: {
Expand All @@ -61,10 +75,14 @@ export function BatchStatusIcon({

export function batchStatusColor(status: BatchTaskRunStatus): string {
switch (status) {
case "PROCESSING":
return "text-blue-500";
case "PENDING":
return "text-pending";
case "COMPLETED":
return "text-success";
case "PARTIAL_FAILED":
return "text-warning";
case "ABORTED":
return "text-error";
default: {
Expand All @@ -75,10 +93,14 @@ export function batchStatusColor(status: BatchTaskRunStatus): string {

export function batchStatusTitle(status: BatchTaskRunStatus): string {
switch (status) {
case "PROCESSING":
return "Processing";
case "PENDING":
return "In progress";
case "COMPLETED":
return "Completed";
case "PARTIAL_FAILED":
return "Partial failure";
case "ABORTED":
return "Aborted";
default: {
Expand Down
30 changes: 15 additions & 15 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import {
createReadableStreamFromReadable,
type DataFunctionArgs,
type EntryContext,
} from "@remix-run/node"; // or cloudflare/deno
import { createReadableStreamFromReadable, type EntryContext } from "@remix-run/node"; // or cloudflare/deno
import { RemixServer } from "@remix-run/react";
import { wrapHandleErrorWithSentry } from "@sentry/remix";
import { parseAcceptLanguage } from "intl-parse-accept-language";
import isbot from "isbot";
import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
OperatingSystemContextProvider,
OperatingSystemPlatform,
} from "./components/primitives/OperatingSystemProvider";
import { Prisma } from "./db.server";
import { env } from "./env.server";
import { eventLoopMonitor } from "./eventLoopMonitor.server";
import { logger } from "./services/logger.server";
import { resourceMonitor } from "./services/resourceMonitor.server";
import { singleton } from "./utils/singleton";
import { bootstrap } from "./bootstrap";
import { wrapHandleErrorWithSentry } from "@sentry/remix";
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";

const ABORT_DELAY = 30000;

Expand Down Expand Up @@ -228,19 +234,13 @@ process.on("uncaughtException", (error, origin) => {
});

singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);

export { apiRateLimiter } from "./services/apiRateLimit.server";
export { engineRateLimiter } from "./services/engineRateLimit.server";
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
export { socketIo } from "./v3/handleSocketIo.server";
export { wss } from "./v3/handleWebsockets.server";
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
import { eventLoopMonitor } from "./eventLoopMonitor.server";
import { env } from "./env.server";
import { logger } from "./services/logger.server";
import { Prisma } from "./db.server";
import { registerRunEngineEventBusHandlers } from "./v3/runEngineHandlers.server";
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
import { resourceMonitor } from "./services/resourceMonitor.server";

if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
eventLoopMonitor.enable();
Expand Down
28 changes: 28 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ const EnvironmentSchema = z
MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(10_000),
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
BATCH_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().optional(), // Defaults to TASK_PAYLOAD_OFFLOAD_THRESHOLD if not set
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB
Expand All @@ -537,6 +538,14 @@ const EnvironmentSchema = z
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),

// 2-phase batch API settings
STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(10),
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(10),

REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
REALTIME_STREAM_TTL: z.coerce
Expand Down Expand Up @@ -931,6 +940,25 @@ const EnvironmentSchema = z
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

// BatchQueue DRR settings (Run Engine v2)
BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().default(5),
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(50),
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().optional(),
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().optional(),
// Global rate limit: max items processed per second across all consumers
// If not set, no global rate limiting is applied
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),

// Batch rate limits and concurrency by plan type
// Rate limit: max items per minute for batch creation
BATCH_RATE_LIMIT_FREE: z.coerce.number().int().default(100), // 100 items/min for free
BATCH_RATE_LIMIT_PAID: z.coerce.number().int().default(10_000), // 10k items/min for paid
BATCH_RATE_LIMIT_ENTERPRISE: z.coerce.number().int().default(100_000), // 100k items/min for enterprise
// Processing concurrency: max concurrent batch items being processed
BATCH_CONCURRENCY_FREE: z.coerce.number().int().default(1),
BATCH_CONCURRENCY_PAID: z.coerce.number().int().default(10),
BATCH_CONCURRENCY_ENTERPRISE: z.coerce.number().int().default(50),

ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ WHERE
throw new Error(`Environment not found for Batch ${batch.id}`);
}

const hasFinished = batch.status !== "PENDING";
const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";

return {
id: batch.id,
Expand Down
122 changes: 122 additions & 0 deletions apps/webapp/app/presenters/v3/BatchPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { type BatchTaskRunStatus } from "@trigger.dev/database";
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
import { engine } from "~/v3/runEngine.server";
import { BasePresenter } from "./basePresenter.server";

type BatchPresenterOptions = {
environmentId: string;
batchId: string;
userId?: string;
};

export type BatchPresenterData = Awaited<ReturnType<BatchPresenter["call"]>>;

export class BatchPresenter extends BasePresenter {
public async call({ environmentId, batchId, userId }: BatchPresenterOptions) {
const batch = await this._replica.batchTaskRun.findFirst({
select: {
id: true,
friendlyId: true,
status: true,
runCount: true,
batchVersion: true,
createdAt: true,
updatedAt: true,
completedAt: true,
processingStartedAt: true,
processingCompletedAt: true,
successfulRunCount: true,
failedRunCount: true,
idempotencyKey: true,
runtimeEnvironment: {
select: {
id: true,
type: true,
slug: true,
orgMember: {
select: {
user: {
select: {
id: true,
name: true,
displayName: true,
},
},
},
},
},
},
errors: {
select: {
id: true,
index: true,
taskIdentifier: true,
error: true,
errorCode: true,
createdAt: true,
},
orderBy: {
index: "asc",
},
},
},
where: {
runtimeEnvironmentId: environmentId,
friendlyId: batchId,
},
});

if (!batch) {
throw new Error("Batch not found");
}

const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";
const isV2 = batch.batchVersion === "runengine:v2";

// For v2 batches in PROCESSING state, get live progress from Redis
// This provides real-time updates without waiting for the batch to complete
let liveSuccessCount = batch.successfulRunCount ?? 0;
let liveFailureCount = batch.failedRunCount ?? 0;

if (isV2 && batch.status === "PROCESSING") {
const liveProgress = await engine.getBatchQueueProgress(batch.id);
if (liveProgress) {
liveSuccessCount = liveProgress.successCount;
liveFailureCount = liveProgress.failureCount;
}
}

return {
id: batch.id,
friendlyId: batch.friendlyId,
status: batch.status as BatchTaskRunStatus,
runCount: batch.runCount,
batchVersion: batch.batchVersion,
isV2,
createdAt: batch.createdAt.toISOString(),
updatedAt: batch.updatedAt.toISOString(),
completedAt: batch.completedAt?.toISOString(),
processingStartedAt: batch.processingStartedAt?.toISOString(),
processingCompletedAt: batch.processingCompletedAt?.toISOString(),
finishedAt: batch.completedAt
? batch.completedAt.toISOString()
: hasFinished
? batch.updatedAt.toISOString()
: undefined,
hasFinished,
successfulRunCount: liveSuccessCount,
failedRunCount: liveFailureCount,
idempotencyKey: batch.idempotencyKey,
environment: displayableEnvironment(batch.runtimeEnvironment, userId),
errors: batch.errors.map((error) => ({
id: error.id,
index: error.index,
taskIdentifier: error.taskIdentifier,
error: error.error,
errorCode: error.errorCode,
createdAt: error.createdAt.toISOString(),
})),
};
}
}

Loading
Loading