Postgres-native job queues with retries, scheduling, and dead letter handling. Jobs commit or rollback with your transaction.
Good fit: Background tasks, email sending, webhook delivery, periodic maintenance, or any work that needs retries and should survive app restarts.
Not a fit: Sub-second latency requirements, millions of jobs per second, or fan-out pub/sub. Use Redis or a message broker for those.
See installation instructions in the main README.
-- Set tenant context
SELECT queue.set_tenant('acme');
-- Push a job
SELECT queue.push('acme', 'email', '{"to": "alice@example.com", "subject": "Welcome"}');
-- -> 1 (job ID)
-- Pull next job (locks it for processing)
SELECT * FROM queue.pull('acme', 'email', p_worker_id := 'worker-1');
-- -> id: 1, queue: 'email', payload: {...}, attempts: 1, ...
-- Success: acknowledge completion
SELECT queue.ack('acme', 1);
-- -> true
-- Failure: return to queue for retry
SELECT queue.nack('acme', 1, p_error := 'SMTP timeout');
-- -> true (back to pending with exponential backoff)
-- Permanent failure: move to dead letter queue
SELECT queue.fail('acme', 1, p_error := 'invalid recipient');
-- -> trueJobs move through four states:
- pending - waiting to be pulled (includes scheduled/delayed jobs)
- running - locked by a worker, with a visibility timeout
- completed - acknowledged by worker (deleted or archived per config)
- dead - moved to dead letter queue after max attempts or explicit fail
If a worker crashes, the visibility timeout expires and the job returns to pending automatically.
Create recurring jobs with cron expressions or fixed intervals:
-- Every 5 minutes
SELECT queue.create_schedule(
'acme', 'cleanup', 'maintenance',
'{"action": "expire_sessions"}',
p_cron_expression := '*/5 * * * *'
);
-- Every 2 hours
SELECT queue.create_schedule(
'acme', 'sync', 'data',
'{"source": "upstream"}',
p_every_interval := '2 hours'
);
-- Process due schedules (call from an external cron or timer)
SELECT * FROM queue.tick_schedules('acme');
-- -> schedule_name, job_id, next_run_at for each fired schedulePause and resume without losing schedule definitions:
SELECT queue.pause_schedule('acme', 'cleanup');
SELECT queue.resume_schedule('acme', 'cleanup');Prevent duplicate jobs with unique keys:
SELECT queue.push('acme', 'sync', '{"user": 42}', p_unique_key := 'sync:user:42');
-- -> 5
SELECT queue.push('acme', 'sync', '{"user": 42}', p_unique_key := 'sync:user:42');
-- -> NULL (deduplicated)-- Pull from multiple queues in priority order
SELECT * FROM queue.pull_any('acme', ARRAY['critical', 'default', 'bulk']);
-- Pull a batch of jobs
SELECT * FROM queue.pull_batch('acme', 'email', p_limit := 10);
-- Acknowledge a batch
SELECT queue.ack_batch('acme', ARRAY[1, 2, 3]);
-- Extend processing time for a long-running job
SELECT queue.extend_visibility('acme', 1, '10 minutes');
-- Namespace-wide stats
SELECT * FROM queue.get_stats('acme');
-- -> total_jobs, pending, running, completed, dead, total_queuesSee docs/queue/ for full API reference.
When using connection pools (e.g., PgBouncer, application-level pools), clear context before returning connections:
# After request completes, before returning connection to pool
queue.clear_actor() # Clear audit actor contextTenant context (queue.tenant_id) is set per-request via QueueClient(cursor, namespace=...), so it's automatically overwritten on next use.