Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/fine-taxes-drive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@vorsteh-queue/adapter-drizzle": minor
"@vorsteh-queue/adapter-kysely": minor
"@vorsteh-queue/adapter-prisma": minor
---

Added support for batch processing
43 changes: 43 additions & 0 deletions .changeset/sweet-oranges-cut.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
"@vorsteh-queue/core": minor
---

### Added

- Batch processing support: You can now register batch handlers via `queue.registerBatch`, allowing the queue to process multiple jobs at once according to configurable batch sizes and timing.
- New `batch` configuration options: `minSize`, `maxSize`, and `waitFor` allow fine-grained control over when and how batches are processed.
- Type-safe batch jobs: Batch jobs are strictly separated from scheduled/single jobs and **do not support** cron, delay, or repeat options.
- Adapter API extended: All core adapters now support efficient batch operations.
- Events for batch lifecycle: The queue emits `batch:processing`, `batch:completed`, and `batch:failed` events for batch jobs.

**Handler exclusivity:** A queue can handle only batch jobs or single jobs — not both. Attempting to register both handler types in the same queue will throw an error. This ensures clear and predictable processing.

#### Example

```ts
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"

type EmailPayload = { to: string; body: string }
type EmailResult = { ok: boolean }

const adapter = new MemoryQueueAdapter()
const queue = new Queue<EmailPayload, EmailResult>(adapter, {
name: "batch-demo",
batch: { minSize: 5, maxSize: 20, waitFor: 1000 },
})

queue.registerBatch("send-emails", async (jobs) => {
// jobs is an array of up to 20 jobs
await sendBulkEmails(jobs.map((j) => j.payload))
return jobs.map(() => ({ ok: true }))
})

// Add jobs as usual
await queue.addJobs("send-emails", [
{ to: "[email protected]", body: "Hi A" },
{ to: "[email protected]", body: "Hi B" },
// ...
])

queue.start()
```
4 changes: 0 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ jobs:
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 10.15.1
run_install: false

- shell: bash
Expand Down Expand Up @@ -95,7 +94,6 @@ jobs:
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 10.15.1
run_install: false

- shell: bash
Expand Down Expand Up @@ -127,7 +125,6 @@ jobs:
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 10.15.1
run_install: false

- shell: bash
Expand Down Expand Up @@ -161,7 +158,6 @@ jobs:
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 10.15.1
run_install: false

- shell: bash
Expand Down
33 changes: 28 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
## Features

- **Type-safe**: Full TypeScript support with generic job payloads
- **Multiple adapters**: Drizzle ORM (PostgreSQL), Prisma ORM (PostgreSQL), and in-memory implementations
- **Multiple adapters**: Drizzle ORM (PostgreSQL), Prisma ORM (PostgreSQL), Kysely (PostgreSQL) and in-memory implementations
- **Priority queues**: Numeric priority system (lower = higher priority)
- **Delayed jobs**: Schedule jobs for future execution
- **Recurring jobs**: Cron expressions and interval-based repetition
- **Batch processing**: Process multiple jobs concurrently for higher efficiency and better performance by supporting parallel execution and grouping of jobs.
- **UTC-first timezone support**: Reliable timezone handling with UTC storage
- **Progress tracking**: Real-time job progress updates
- **Event system**: Listen to job lifecycle events
Expand All @@ -24,6 +25,7 @@
│ ├── core/ # Core queue logic and interfaces
│ ├── adapter-drizzle/ # Drizzle ORM adapter (PostgreSQL)
│ └── adapter-prisma/ # Prisma ORM adapter (PostgreSQL)
│ └── adapter-kysely/ # Kysely adapter (PostgreSQL)
├── examples/ # Standalone usage examples
└── tooling/ # Shared development tools
```
Expand All @@ -49,9 +51,14 @@ pnpm add @vorsteh-queue/core @vorsteh-queue/adapter-drizzle

```typescript
// Drizzle ORM with PostgreSQL

// Prisma ORM with PostgreSQL
import { PrismaClient } from "@prisma/client"
import { drizzle } from "drizzle-orm/node-postgres"
import { Pool } from "pg"

import { PostgresQueueAdapter } from "@vorsteh-queue/adapter-drizzle"
import { PostgresPrismaQueueAdapter } from "@vorsteh-queue/adapter-prisma"
import { Queue } from "@vorsteh-queue/core"

interface EmailPayload {
Expand All @@ -69,10 +76,6 @@ const pool = new Pool({ connectionString: "postgresql://..." })
const db = drizzle(pool)
const queue = new Queue(new PostgresQueueAdapter(db), { name: "my-queue" })

// Prisma ORM with PostgreSQL
import { PrismaClient } from "@prisma/client"
import { PostgresPrismaQueueAdapter } from "@vorsteh-queue/adapter-prisma"

const prisma = new PrismaClient()
const queue = new Queue(new PostgresPrismaQueueAdapter(prisma), { name: "my-queue" })

Expand Down Expand Up @@ -139,6 +142,26 @@ await queue.add("health-check", payload, {
})
```

## Batch Processing

Process multiple jobs in a single batch for higher throughput and efficiency.

```typescript
queue.registerBatch<{ file: string }, { ok: boolean }>("process-files", async (jobs) => {
console.log(`Processing batch of ${jobs.length} files...`)
return jobs.map(() => ({ ok: true }))
})

await queue.addJobs("process-files", [{ file: "a.csv" }, { file: "b.csv" }, { file: "c.csv" }])

queue.on("batch:processing", (jobs) => {
console.log(`Batch started: ${jobs.length} jobs`)
})
queue.on("batch:completed", (jobs) => {
console.log(`Batch completed: ${jobs.length} jobs`)
})
```

## Job Cleanup

```typescript
Expand Down
11 changes: 6 additions & 5 deletions apps/docs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"@mdx-js/loader": "3.1.1",
"@mdx-js/node-loader": "3.1.1",
"@mdx-js/react": "3.1.1",
"@next/mdx": "15.5.3",
"@next/mdx": "15.5.4",
"@radix-ui/react-collapsible": "^1.1.12",
"@radix-ui/react-compose-refs": "1.1.2",
"@radix-ui/react-dialog": "^1.1.15",
Expand All @@ -41,7 +41,7 @@
"interweave": "13.1.1",
"lucide-react": "0.544.0",
"multimatch": "7.0.0",
"next": "15.5.3",
"next": "15.5.4",
"next-themes": "latest",
"p-map": "7.0.3",
"react": "19.1.1",
Expand All @@ -53,23 +53,24 @@
"remark-mdx-frontmatter": "5.2.0",
"remark-squeeze-paragraphs": "6.0.0",
"remark-strip-badges": "7.0.0",
"renoun": "10.1.0",
"renoun": "10.1.2",
"tm-grammars": "1.24.13",
"tm-themes": "1.10.9",
"ts-morph": "27.0.0",
"tw-animate-css": "^1.3.8",
"tw-animate-css": "^1.4.0",
"use-debounce": "10.0.6",
"zod": "4.1.11"
},
"devDependencies": {
"@tailwindcss/postcss": "4.1.13",
"@tailwindcss/typography": "0.5.18",
"@tailwindcss/typography": "0.5.19",
"@types/mdx": "2.0.13",
"@types/node": "22.18.6",
"@types/react": "19.1.13",
"@types/react-dom": "19.1.9",
"@types/serve-handler": "6.1.4",
"@vorsteh-queue/adapter-drizzle": "workspace:*",
"@vorsteh-queue/adapter-kysely": "workspace:*",
"@vorsteh-queue/adapter-prisma": "workspace:*",
"@vorsteh-queue/core": "workspace:*",
"@vorsteh-queue/eslint-config": "workspace:*",
Expand Down
1 change: 1 addition & 0 deletions examples/batch-processing/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DATABASE_URL=postgresql://postgres:password@localhost:5432/queue_db
10 changes: 10 additions & 0 deletions examples/batch-processing/drizzle.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { defineConfig } from "drizzle-kit"

export default defineConfig({
schema: "./src/schema.ts",
out: "./drizzle",
dialect: "postgresql",
dbCredentials: {
url: process.env.DATABASE_URL || "postgresql://postgres:postgres@localhost:5432/queue_tracking",
},
})
22 changes: 22 additions & 0 deletions examples/batch-processing/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "batch-processing-example",
"version": "1.0.0",
"description": "batch processing example using Drizzle ORM with postgres.js",
"type": "module",
"private": true,
"scripts": {
"dev": "tsx src/index.ts",
"db:push": "drizzle-kit push"
},
"dependencies": {
"@vorsteh-queue/adapter-drizzle": "workspace:*",
"@vorsteh-queue/core": "workspace:*",
"drizzle-orm": "^0.44.5",
"postgres": "^3.4.7"
},
"devDependencies": {
"drizzle-kit": "^0.31.4",
"tsx": "4.20.5",
"typescript": "^5.9.2"
}
}
18 changes: 18 additions & 0 deletions examples/batch-processing/readme.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
title: Batch Processing Example
navTitle: Batch Processing
description: Demonstrates batch job processing. Shows how to register batch handlers, dispatch jobs in batches, and monitor batch events for robust, event-driven workflows.
---

## Setup

Use the CLI to create this example:

```bash
npx create-vorsteh-queue@latest my-project --template batch-processing
cd my-project
cp .env.example .env
# Edit .env with your PostgreSQL database URL
pnpm db:push
pnpm dev
```
13 changes: 13 additions & 0 deletions examples/batch-processing/src/database.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { drizzle } from "drizzle-orm/postgres-js"
import postgres from "postgres"

import * as schema from "./schema"

// Shared database connection
const client = postgres(
process.env.DATABASE_URL || "postgresql://postgres:password@localhost:5432/queue_tracking",
{ max: 10 } // Connection pool
)

export const db = drizzle(client, { schema })
export { client }
72 changes: 72 additions & 0 deletions examples/batch-processing/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { PostgresQueueAdapter } from "@vorsteh-queue/adapter-drizzle"
import { Queue } from "@vorsteh-queue/core"

import { client, db } from "./database"

// Queue setup with batch config
const queue = new Queue(new PostgresQueueAdapter(db), {
name: "batch-demo",
batch: { minSize: 3, maxSize: 10, waitFor: 2000 },
removeOnComplete: 5,
removeOnFail: 3,
})

interface FilePayload {
file: string
}
interface FileResult {
ok: boolean
}

// Register a batch handler for processing files
queue.registerBatch<FilePayload, FileResult>("process-files", async (jobs) => {
console.log(`Processing batch of ${jobs.length} files...`)
// Simulate processing
await Promise.all(
jobs.map(async (job) => {
await new Promise((resolve) => setTimeout(resolve, 200))
console.log(` ✔️ Processed: ${job.payload.file}`)
}),
)
return jobs.map(() => ({ ok: true }))
})

// Listen to batch events
queue.on("batch:processing", (jobs) => {
console.log(`Batch processing started: ${jobs.length} jobs`)
})
queue.on("batch:completed", (jobs) => {
console.log(`Batch completed: ${jobs.length} jobs`)
})
queue.on("batch:failed", ({ jobs, error }) => {
console.error(`Batch failed: ${jobs.length} jobs`, error)
})

async function main() {
console.log("🚀 Starting Batch Processing Example\n")

// Add jobs in a batch
await queue.addJobs("process-files", [
{ file: "a.csv" },
{ file: "b.csv" },
{ file: "c.csv" },
{ file: "d.csv" },
])

// Start processing
queue.start()
console.log("🔄 Queue processing started!")

// Wait for batches to complete
setTimeout(async () => {
await queue.stop()
await client.end()
console.log("✅ Batch processing complete. Shutdown.")
process.exit(0)
}, 5000)
}

main().catch((error) => {
console.error("❌ Batch processing error:", error)
process.exit(1)
})
3 changes: 3 additions & 0 deletions examples/batch-processing/src/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { postgresSchema } from "@vorsteh-queue/adapter-drizzle"

export const { queueJobs } = postgresSchema
13 changes: 13 additions & 0 deletions examples/batch-processing/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
2 changes: 1 addition & 1 deletion examples/drizzle-pglite/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"db:push": "drizzle-kit push"
},
"dependencies": {
"@electric-sql/pglite": "^0.3.8",
"@electric-sql/pglite": "^0.3.10",
"@vorsteh-queue/adapter-drizzle": "workspace:*",
"@vorsteh-queue/core": "workspace:*",
"drizzle-orm": "^0.44.5"
Expand Down
2 changes: 1 addition & 1 deletion examples/pm2-workers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"devDependencies": {
"dotenv-cli": "10.0.0",
"drizzle-kit": "^0.31.4",
"pm2": "6.0.11",
"pm2": "6.0.13",
"tsx": "4.20.5",
"typescript": "^5.9.2"
}
Expand Down
2 changes: 1 addition & 1 deletion examples/result-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"db:push": "drizzle-kit push"
},
"dependencies": {
"@electric-sql/pglite": "^0.3.8",
"@electric-sql/pglite": "^0.3.10",
"@vorsteh-queue/adapter-drizzle": "workspace:*",
"@vorsteh-queue/core": "workspace:*",
"drizzle-orm": "^0.44.5"
Expand Down
Loading
Loading