queue.ack(p_namespace: text, p_job_id: int8) -> bool
Acknowledge successful job completion.
Parameters:
  p_namespace: Tenant namespace
  p_job_id: Job ID
Returns: True if acknowledged, false if job not found or not running
Source: queue/src/functions/030_ack.sql:1
queue.ack_batch(p_namespace: text, p_job_ids: int8[]) -> int4
Acknowledge multiple jobs as completed.
Parameters:
  p_namespace: Tenant namespace
  p_job_ids: Array of job IDs
Returns: Count of jobs acknowledged
Source: queue/src/functions/030_ack.sql:58
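A batch acknowledgement might look like the sketch below. The namespace 'acme' and the job IDs are hypothetical placeholders; the explicit cast makes the array match the int8[] parameter.

```sql
-- Acknowledge three jobs in one call; the result is the number actually acknowledged.
SELECT queue.ack_batch('acme', ARRAY[101, 102, 103]::int8[]);
```

Comparing the returned count with the array length is one way to notice jobs that were missing or no longer running.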
queue.cancel(p_namespace: text, p_job_id: int8) -> bool
Cancel a pending job by deleting it.
Parameters:
  p_namespace: Tenant namespace
  p_job_id: Job ID
Returns: True if cancelled, false if job not found or not pending
Source: queue/src/functions/030_ack.sql:248
queue.fail(p_namespace: text, p_job_id: int8, p_error: text) -> bool
Move a job to the dead letter queue (permanent failure).
Parameters:
  p_namespace: Tenant namespace
  p_job_id: Job ID
  p_error: Error message
Returns: True if moved to DLQ, false if job not found or not running
Source: queue/src/functions/030_ack.sql:198
queue.nack(p_namespace: text, p_job_id: int8, p_error: text, p_backoff: interval) -> bool
Return a job to the queue for retry (temporary failure).
Parameters:
  p_namespace: Tenant namespace
  p_job_id: Job ID
  p_error: Error message (stored for debugging)
  p_backoff: Optional custom backoff delay (default: exponential)
Returns: True if returned to queue, false if max attempts exceeded (moved to DLQ)
Source: queue/src/functions/030_ack.sql:112
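The two backoff modes could be exercised as in this sketch. The namespace, job IDs, and error texts are hypothetical, and passing NULL to request the default exponential backoff is an assumption based on p_backoff being described as optional.

```sql
-- Retry with the default backoff (assumption: NULL selects the exponential default).
SELECT queue.nack('acme', 42, 'upstream timeout', NULL);

-- Retry a different job with an explicit 30-second backoff.
SELECT queue.nack('acme', 43, 'rate limited', interval '30 seconds');
```

A false result means the job had exhausted its attempts and was moved to the dead letter queue rather than requeued.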
queue.purge_queue(p_namespace: text, p_queue: text) -> int4
Delete all pending jobs from a queue.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue name
Returns: Count of deleted jobs
Source: queue/src/functions/030_ack.sql:335
queue.release_jobs(p_namespace: text, p_worker_id: text) -> int4
Release all jobs held by a worker, returning them to pending.
Parameters:
  p_namespace: Tenant namespace
  p_worker_id: Worker identifier (as passed to pull)
Returns: Count of jobs released
Source: queue/src/functions/030_ack.sql:288
queue.clear_actor() -> void
Clear the actor context. Call before returning connections to the pool.
Source: queue/src/functions/080_rls.sql:53
queue.clear_tenant() -> void
Clear the tenant context. Call before returning connections to the pool.
Source: queue/src/functions/080_rls.sql:19
queue.set_actor(p_actor_id: text, p_request_id: text, p_on_behalf_of: text, p_reason: text) -> void
Set actor context for audit trail. Actor context is captured when jobs are pushed and stored with the job.
Parameters:
  p_actor_id: ID of the user/system performing the action
  p_request_id: Optional request/trace ID for correlation
  p_on_behalf_of: Optional ID if acting on behalf of another user
  p_reason: Optional reason for the action (for audit)
Source: queue/src/functions/080_rls.sql:30
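A rough sketch of bracketing work with actor context follows. All identifiers ('user-17', 'req-8f3a', the reason text) are hypothetical, and using NULL to leave an optional argument unset is an assumption.

```sql
-- Record who is acting and why; NULL leaves p_on_behalf_of unset (assumed).
SELECT queue.set_actor('user-17', 'req-8f3a', NULL, 'manual reprocess');

-- ... push jobs here; per the description above, they capture this actor context ...

-- Clear the context before the connection is returned to the pool.
SELECT queue.clear_actor();
```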
queue.set_tenant(p_tenant_id: text) -> void
Set the tenant context for RLS policies. Must be called before any other operation. The setting is transaction-local.
Parameters:
  p_tenant_id: Tenant/namespace identifier
Source: queue/src/functions/080_rls.sql:1
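Because the tenant context is transaction-local, one plausible pattern is to set it at the top of each transaction. The tenant name 'acme' is a hypothetical placeholder.

```sql
BEGIN;
-- Scope every statement in this transaction to one tenant.
SELECT queue.set_tenant('acme');

-- Any queue operation can follow; get_stats is used here only as an example.
SELECT * FROM queue.get_stats('acme');
COMMIT;
```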
queue.purge_dead_letters(p_namespace: text, p_queue: text, p_older_than: interval) -> int4
Delete old un-retried dead letters.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue filter (NULL = all queues)
  p_older_than: Only delete entries older than this interval (default 30 days)
Returns: Count of deleted dead letters
Source: queue/src/functions/060_dead_letters.sql:203
queue.retry_dead_letter(p_namespace: text, p_dead_letter_id: int8, p_queue: text) -> int8
Retry a dead-lettered job by creating a new job from its payload.
Parameters:
  p_namespace: Tenant namespace
  p_dead_letter_id: Dead letter ID
  p_queue: Queue override (NULL = use original queue)
Returns: New job ID
Source: queue/src/functions/060_dead_letters.sql:1
queue.retry_dead_letters(p_namespace: text, p_queue: text, p_limit: int4) -> table(dead_letter_id: int8, job_id: int8)
Retry multiple dead letters for a queue in a single transaction.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue to retry dead letters from
  p_limit: Maximum dead letters to retry (clamped to 1000)
Returns: Rows of (dead_letter_id, job_id) for each retried entry
Source: queue/src/functions/060_dead_letters.sql:110
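A bulk retry can be read back as a table, as in this sketch. The namespace 'acme' and queue 'emails' are hypothetical; the column names come from the signature above.

```sql
-- Retry up to 100 dead letters and list which new job replaced each one.
SELECT dead_letter_id, job_id
FROM queue.retry_dead_letters('acme', 'emails', 100);
```

Since p_limit is clamped to 1000, draining a larger backlog would take repeated calls until no rows come back.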
queue.extend_visibility(p_namespace: text, p_job_id: int8, p_extension: interval) -> bool
Extend the visibility timeout of a running job.
Parameters:
  p_namespace: Tenant namespace
  p_job_id: Job ID
  p_extension: How much time to add
Returns: True if extended, false if job not found or not running
Source: queue/src/functions/020_pull.sql:193
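A long-running worker might call this periodically as a heartbeat. The sketch below uses a hypothetical namespace and job ID.

```sql
-- Add two minutes to the visibility timeout of running job 42.
SELECT queue.extend_visibility('acme', 42, interval '2 minutes');
```

A false result indicates the job is no longer running, for example because its timeout already expired, so the worker should stop treating the job as its own.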
queue.pull(p_namespace: text, p_queue: text, p_worker_id: text, p_visibility_timeout: interval) -> setof queue.jobs
Pull one job from a queue.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue name
  p_worker_id: Worker identifier (for debugging stuck jobs)
  p_visibility_timeout: How long before the job returns to the queue if not acknowledged
Returns: Job record, or no rows if the queue is empty
Source: queue/src/functions/020_pull.sql:1
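The basic worker cycle of pull followed by acknowledgement could be sketched as follows. The namespace, queue, worker ID, and job ID are hypothetical; the columns of queue.jobs are not listed in this reference, so the example selects all of them.

```sql
-- Claim one job for up to five minutes.
SELECT * FROM queue.pull('acme', 'emails', 'worker-1', interval '5 minutes');

-- After processing succeeds, acknowledge it.
-- 42 stands in for the job ID returned by the pull above.
SELECT queue.ack('acme', 42);
```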
queue.pull_any(p_namespace: text, p_queues: text[], p_worker_id: text, p_visibility_timeout: interval) -> setof queue.jobs
Pull one job from multiple queues (priority order).
Parameters:
  p_namespace: Tenant namespace
  p_queues: Queue names in priority order (first queue checked first)
  p_worker_id: Worker identifier
  p_visibility_timeout: How long before the job returns to the queue
Returns: Job record from the first queue with an available job, or no rows if none has one
Source: queue/src/functions/020_pull.sql:122
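Queue priority is expressed purely by array order, which this sketch illustrates with three hypothetical queue names.

```sql
-- 'critical' is checked first, then 'default', then 'bulk'.
SELECT *
FROM queue.pull_any('acme',
                    ARRAY['critical', 'default', 'bulk'],
                    'worker-1',
                    interval '5 minutes');
```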
queue.pull_batch(p_namespace: text, p_queue: text, p_limit: int4, p_worker_id: text, p_visibility_timeout: interval) -> setof queue.jobs
Pull multiple jobs from a queue.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue name
  p_limit: Maximum jobs to pull
  p_worker_id: Worker identifier (for debugging stuck jobs)
  p_visibility_timeout: How long before jobs return to the queue if not acknowledged
Returns: Set of job records
Source: queue/src/functions/020_pull.sql:60
queue.push(p_namespace: text, p_queue: text, p_payload: jsonb, p_delay: interval, p_priority: int4, p_max_attempts: int4, p_unique_key: text, p_tags: text[], p_metadata: jsonb) -> int8
Push a job onto a queue.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue name
  p_payload: Job payload (JSONB)
  p_delay: Optional delay before job becomes visible
  p_priority: Priority (-1000 to 1000, higher = more important)
  p_max_attempts: Maximum retry attempts
  p_unique_key: Deduplication key (NULL = no dedup)
  p_tags: Optional tags for filtering
  p_metadata: Optional metadata
Returns: Job ID, or NULL if deduplicated
Source: queue/src/functions/010_push.sql:1
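With nine positional arguments, annotating each one helps readability. The following is a sketch with every argument supplied explicitly, since this reference does not state which parameters have defaults; all values are hypothetical.

```sql
SELECT queue.push(
    'acme',                                 -- p_namespace
    'emails',                               -- p_queue
    '{"to": "user@example.com"}'::jsonb,    -- p_payload
    interval '5 minutes',                   -- p_delay: visible after five minutes
    10,                                     -- p_priority: within -1000 to 1000
    3,                                      -- p_max_attempts
    'welcome:user-42',                      -- p_unique_key: deduplication key
    ARRAY['onboarding'],                    -- p_tags
    '{"source": "signup"}'::jsonb           -- p_metadata
);
```

Calling this twice with the same p_unique_key would be expected to return NULL the second time, per the Returns description.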
queue.push_batch(p_namespace: text, p_queue: text, p_payloads: jsonb[], p_priority: int4, p_max_attempts: int4, p_tags: text[]) -> int8[]
Push multiple jobs onto a queue efficiently.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue name
  p_payloads: Array of job payloads
  p_priority: Priority for all jobs (default 0)
  p_max_attempts: Maximum retry attempts for all jobs
  p_tags: Tags for all jobs
Returns: Array of job IDs
Source: queue/src/functions/010_push.sql:110
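A batch push takes an array of JSONB payloads that share one priority, attempt limit, and tag set. The sketch below uses hypothetical values throughout.

```sql
SELECT queue.push_batch(
    'acme',                                  -- p_namespace
    'emails',                                -- p_queue
    ARRAY['{"to": "a@example.com"}',
          '{"to": "b@example.com"}']::jsonb[],  -- p_payloads
    0,                                       -- p_priority
    3,                                       -- p_max_attempts
    ARRAY['newsletter']                      -- p_tags
);
```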
queue.create_schedule(p_namespace: text, p_name: text, p_queue: text, p_payload: jsonb, p_cron_expression: text, p_cron_timezone: text, p_every_interval: interval, p_priority: int4, p_max_attempts: int4, p_tags: text[], p_is_active: bool) -> int8
Create a recurring schedule that produces jobs automatically.
Parameters:
  p_namespace: Tenant namespace
  p_name: Schedule name (unique per namespace)
  p_queue: Target queue for generated jobs
  p_payload: Job payload template
  p_cron_expression: 5-field cron expression (mutually exclusive with p_every_interval)
  p_cron_timezone: Timezone for cron evaluation (default 'UTC')
  p_every_interval: Fixed interval between runs (mutually exclusive with p_cron_expression)
  p_priority: Job priority (-1000 to 1000)
  p_max_attempts: Maximum retry attempts for generated jobs
  p_tags: Tags applied to generated jobs
  p_is_active: Whether schedule starts active
Returns: Schedule ID
Source: queue/src/functions/050_schedules.sql:1
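Because p_cron_expression and p_every_interval are mutually exclusive, a schedule uses one or the other. The sketch below shows both forms, assuming NULL marks the unused option; schedule names, queues, and payloads are hypothetical.

```sql
-- Cron-based: every day at 02:00 UTC.
SELECT queue.create_schedule(
    'acme', 'nightly-report', 'reports',
    '{"kind": "daily"}'::jsonb,
    '0 2 * * *',              -- p_cron_expression
    'UTC',                    -- p_cron_timezone
    NULL,                     -- p_every_interval (unused, assumed NULL)
    0, 3, ARRAY['reports'], true
);

-- Interval-based: every 15 minutes.
SELECT queue.create_schedule(
    'acme', 'cache-refresh', 'maintenance',
    '{"kind": "refresh"}'::jsonb,
    NULL,                     -- p_cron_expression (unused, assumed NULL)
    'UTC',                    -- p_cron_timezone
    interval '15 minutes',    -- p_every_interval
    0, 3, ARRAY['maintenance'], true
);
```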
queue.delete_schedule(p_namespace: text, p_name: text) -> bool
Delete a schedule by name.
Parameters:
  p_namespace: Tenant namespace
  p_name: Schedule name
Returns: True if deleted, false if not found
Source: queue/src/functions/050_schedules.sql:193
queue.get_schedule(p_namespace: text, p_name: text) -> table(id: int8, name: text, queue: text, payload: jsonb, priority: int4, max_attempts: int4, tags: text[], cron_expression: text, cron_timezone: text, every_interval: interval, is_active: bool, last_run_at: timestamptz, last_job_id: int8, next_run_at: timestamptz, run_count: int8, created_at: timestamptz, updated_at: timestamptz)
Get a schedule by name.
Parameters:
  p_namespace: Tenant namespace
  p_name: Schedule name
Returns: Schedule row, or empty if not found
Source: queue/src/functions/050_schedules.sql:100
queue.list_schedules(p_namespace: text, p_queue: text, p_is_active: bool, p_limit: int4, p_cursor: text) -> table(name: text, queue: text, cron_expression: text, every_interval: interval, is_active: bool, next_run_at: timestamptz, last_run_at: timestamptz, run_count: int8, created_at: timestamptz)
List schedules with optional filters and cursor pagination.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Filter by target queue (NULL = all)
  p_is_active: Filter by active status (NULL = all)
  p_limit: Maximum results (clamped to 1000)
  p_cursor: Last schedule name from previous page
Returns: Schedule rows ordered by name
Source: queue/src/functions/050_schedules.sql:145
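Cursor pagination here works by feeding the last name of one page into the next call. The sketch below assumes a NULL cursor starts from the beginning; 'nightly-report' is a hypothetical last name from the first page.

```sql
-- First page of active schedules across all queues.
SELECT s.name, s.queue, s.next_run_at
FROM queue.list_schedules('acme', NULL, true, 100, NULL) AS s;

-- Next page: pass the last name seen on the previous page as p_cursor.
SELECT s.name, s.queue, s.next_run_at
FROM queue.list_schedules('acme', NULL, true, 100, 'nightly-report') AS s;
```

Iteration would stop once a call returns fewer rows than p_limit.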
queue.pause_schedule(p_namespace: text, p_name: text) -> bool
Pause an active schedule.
Parameters:
  p_namespace: Tenant namespace
  p_name: Schedule name
Returns: True if paused, false if already paused or not found
Source: queue/src/functions/050_schedules.sql:220
queue.resume_schedule(p_namespace: text, p_name: text) -> bool
Resume a paused schedule. Recalculates next_run_at from now.
Parameters:
  p_namespace: Tenant namespace
  p_name: Schedule name
Returns: True if resumed, false if already active or not found
Source: queue/src/functions/050_schedules.sql:251
queue.tick_schedules(p_namespace: text, p_limit: int4) -> table(schedule_name: text, job_id: int8, next_run_at: timestamptz)
Process due schedules and create jobs.
Parameters:
  p_namespace: Tenant namespace (NULL = all namespaces, requires RLS bypass)
  p_limit: Maximum schedules to process per call
Returns: Rows of (schedule_name, job_id, next_run_at) for each processed schedule
Source: queue/src/functions/055_tick.sql:1
queue.tick_timeouts(p_namespace: text, p_limit: int4) -> table(job_id: int8, queue: text, stuck_duration: interval)
Reclaim running jobs whose visibility timeout has expired.
Parameters:
  p_namespace: Tenant namespace (NULL = all namespaces, requires RLS bypass)
  p_limit: Maximum jobs to reclaim per call
Returns: Rows of (job_id, queue, stuck_duration) for each reclaimed job
Source: queue/src/functions/055_tick.sql:101
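Both tick functions are meant to be called repeatedly by some external driver. A minimal sketch of one maintenance pass for a single tenant follows; the namespace and limits are hypothetical, and how often to run it is left to the deployment.

```sql
-- Create jobs for schedules that are due.
SELECT schedule_name, job_id, next_run_at
FROM queue.tick_schedules('acme', 100);

-- Reclaim jobs whose visibility timeout has expired.
SELECT t.job_id, t.queue, t.stuck_duration
FROM queue.tick_timeouts('acme', 100) AS t;
```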
queue.get_queue_stats(p_namespace: text, p_queue: text) -> table(queue: text, pending: int8, running: int8, completed: int8, dead: int8, oldest_pending_seconds: numeric, dead_letters: int8)
Get per-queue statistics with operational metrics.
Parameters:
  p_namespace: Tenant namespace
  p_queue: Queue filter (NULL = all queues)
Returns: Row per queue with status counts, oldest pending age, and un-retried dead letter count
Source: queue/src/functions/040_stats.sql:33
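The per-queue rows lend themselves to simple backlog monitoring. This sketch lists every queue in a hypothetical namespace, ordered by the age of its oldest pending job; column names come from the signature above.

```sql
SELECT s.queue, s.pending, s.running, s.oldest_pending_seconds, s.dead_letters
FROM queue.get_queue_stats('acme', NULL) AS s
ORDER BY s.oldest_pending_seconds DESC NULLS LAST;
```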
queue.get_stats(p_namespace: text) -> table(total_jobs: int8, pending: int8, running: int8, completed: int8, dead: int8, total_queues: int8)
Get namespace-wide queue statistics.
Parameters:
  p_namespace: Tenant namespace
Returns: Row with total_jobs, pending, running, completed, dead, total_queues
Source: queue/src/functions/040_stats.sql:1