nats_jetstream input: allow configurable batch size for Fetch() to improve throughput #4160

@michal-billtech

Description

Who is this for and what problem do they have today?

Users consuming high-volume JetStream streams where throughput matters more than per-message latency.

The nats_jetstream input hardcodes Fetch(1), pulling one message per network roundtrip. In our use case this caps throughput at ~2 MiB/min per consumer regardless of max_ack_pending. The only workaround is wrapping the input in a broker with a high copies count, which wastes goroutines and connections on sequential single-message fetches even though NATS natively supports batch pulls.
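For intuition, that ceiling is consistent with a simple roundtrip-bound model: with one message per roundtrip, throughput is message size divided by roundtrip time. A back-of-the-envelope sketch in Go (the 1 KiB message size and 30 ms RTT are illustrative assumptions, not measurements from our setup):

```go
package main

import "fmt"

// throughputMiBPerMin models a pull consumer that fetches `batch` messages
// per network roundtrip: throughput = batch * msgSize / rtt.
// msgSize is in bytes, rtt in seconds; both inputs here are assumptions.
func throughputMiBPerMin(batch int, msgSize, rtt float64) float64 {
	return float64(batch) * msgSize / rtt * 60 / (1024 * 1024)
}

func main() {
	// With ~1 KiB messages and a 30 ms roundtrip, Fetch(1) lands right
	// around a ~2 MiB/min ceiling.
	fmt.Printf("Fetch(1):   %.1f MiB/min\n", throughputMiBPerMin(1, 1024, 0.030))
	// Fetch(256) lifts the same model by ~256x (ignoring server-side
	// limits and payload transfer time, which dominate at large batches).
	fmt.Printf("Fetch(256): %.1f MiB/min\n", throughputMiBPerMin(256, 1024, 0.030))
}
```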

What are the success criteria?

A new optional field (e.g. batch_size, defaulting to 1) whose value is passed through to the NATS client's Fetch(batch int):

input:
    nats_jetstream:
      urls: ["nats://server:4222"]
      stream: my-stream
      durable: my-consumer
      batch_size: 256

Why is solving this problem impactful?

The NATS Go client's Fetch(batch int) already supports this; the hardcoded 1 leaves significant throughput on the table and forces users into workarounds. This would bring nats_jetstream in line with how other inputs handle batching (e.g. kafka with fetch_buffer_cap).
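A minimal sketch of how the input could wire the field through, keeping today's behaviour when it is unset. The helper and field names are hypothetical, chosen only to illustrate the default:

```go
package main

import "fmt"

// batchSizeFrom is a hypothetical helper: it reads the proposed batch_size
// field from a parsed config, defaulting to 1 so existing pipelines keep
// the current single-message Fetch behaviour. The resulting value would be
// passed to the pull subscription's Fetch call instead of the hardcoded 1.
func batchSizeFrom(conf map[string]any) int {
	if v, ok := conf["batch_size"].(int); ok && v > 0 {
		return v
	}
	return 1
}

func main() {
	// Explicitly configured batch size is honoured...
	fmt.Println(batchSizeFrom(map[string]any{"batch_size": 256}))
	// ...and an absent field preserves today's Fetch(1) semantics.
	fmt.Println(batchSizeFrom(map[string]any{}))
}
```

Defaulting to 1 makes the change fully backwards compatible: nobody's existing pipeline changes behaviour unless they opt in.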

Additional notes

We ran into this while archiving ~2.5M DLQ messages to S3. Love the simplicity of the YAML pipeline approach - would be great to not need the broker workaround for this. Happy to submit a PR if that would be helpful!
