Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ jobs:
- ./internal/impl/snowflake/streaming
# - ./internal/impl/splunk
# - ./internal/impl/sql
- ./internal/impl/tigerbeetle
# - ./internal/impl/zeromq

steps:
Expand Down
200 changes: 200 additions & 0 deletions docs/modules/components/pages/inputs/tigerbeetle_cdc.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
= tigerbeetle_cdc
:type: input
:status: beta
:categories: ["Services"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.


component_type_dropdown::[]


Enables TigerBeetle CDC streaming for Redpanda Connect.

Introduced in version 0.0.1.

```yml
# Config fields, showing default values
input:
label: ""
tigerbeetle_cdc:
cluster_id: "" # No default (required)
addresses: [] # No default (required)
progress_cache: "" # No default (required)
rate_limit: ""
event_count_max: 2730
idle_interval_ms: 1000
timestamp_initial: ""
auto_replay_nacks: true
```

Listens to a TigerBeetle cluster and creates a message for each change.

Each message is a JSON object like:

```json
{
"timestamp": "1745328372758695656",
"type": "single_phase",
"ledger": 2,
"transfer": {
"id": "9082709",
"amount": "3794",
"pending_id": "0",
"user_data_128": "79248595801719937611592367840129079151",
"user_data_64": "13615171707598273871",
"user_data_32": 3229992513,
"timeout": 0,
"code": 20295,
"flags": 0,
"timestamp": "1745328372758695656"
},
"debit_account": {
"id": "3750",
"debits_pending": "0",
"debits_posted": "8463768",
"credits_pending": "0",
"credits_posted": "8861179",
"user_data_128": "118966247877720884212341541320399553321",
"user_data_64": "526432537153007844",
"user_data_32": 4157247332,
"code": 1,
"flags": 0,
"timestamp": "1745328270103398016"
},
"credit_account": {
"id": "6765",
"debits_pending": "0",
"debits_posted": "8669204",
"credits_pending": "0",
"credits_posted": "8637251",
"user_data_128": "43670023860556310170878798978091998141",
"user_data_64": "12485093662256535374",
"user_data_32": 1924162092,
"code": 1,
"flags": 0,
"timestamp": "1745328270103401031"
}
}
```

For more information refer to https://docs.tigerbeetle.com/operating/cdc/

== Metadata

This input adds the following metadata fields to each message:

- event_type: One of "single_phase", "two_phase_pending", "two_phase_posted", "two_phase_voided", or "two_phase_expired".
- ledger: The ledger code.
- transfer_code: The transfer code.
- debit_account_code: The debit account code.
- credit_account_code: The credit account code.
- timestamp: The unique event timestamp with nanosecond resolution.
- timestamp_ms: The event timestamp with millisecond resolution.

== Guarantees

This input guarantees _at-least-once semantics_, and makes a best effort to prevent
duplicate messages. However, during crash recovery, it may replay unacknowledged
messages that could have been already delivered to consumers.

It is the consumer’s responsibility to perform idempotency checks when processing messages.

== Upgrading

The TigerBeetle client version must not be newer than the cluster version, as it will fail
with an error message if so.

Requires TigerBeetle cluster version 0.16.57 or greater.

== Fields

=== `cluster_id`

The TigerBeetle unique 128-bit cluster ID.


*Type*: `string`


=== `addresses`

A list of IP addresses of all the TigerBeetle replicas in the cluster. The order of addresses must correspond to the order of replicas.


*Type*: `array`


=== `progress_cache`

A https://docs.redpanda.com/redpanda-connect/components/caches/about[cache resource^] used to track progress by storing the last acknowledged timestamp.
This allows Redpanda Connect to resume from the latest delivered event upon restart.


*Type*: `string`


=== `rate_limit`

An optional https://docs.redpanda.com/redpanda-connect/components/rate_limits/about/[rate limit^] to throttle the number of **requests** made to TigerBeetle.


*Type*: `string`

*Default*: `""`

=== `event_count_max`

The maximum number of events fetched from TigerBeetle per **request**.
Must be greater than zero.


*Type*: `int`

*Default*: `2730`

=== `idle_interval_ms`

The time interval in milliseconds to wait before querying again when the last request returned no events.
Must be greater than zero.


*Type*: `int`

*Default*: `1000`

=== `timestamp_initial`

The initial timestamp to start extracting events from. If not defined, all events since the beginning will be included.
Ignored if a more recent timestamp has already been acknowledged.
This is a TigerBeetle timestamp with nanosecond precision.


*Type*: `string`

*Default*: `""`

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.


*Type*: `bool`

*Default*: `true`


1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/ollama v0.37.0
github.com/testcontainers/testcontainers-go/modules/qdrant v0.37.0
github.com/tetratelabs/wazero v1.7.3
github.com/tigerbeetle/tigerbeetle-go v0.16.57
github.com/timeplus-io/proton-go-driver/v2 v2.0.17
github.com/tmc/langchaingo v0.1.13
github.com/trinodb/trino-go-client v0.315.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tigerbeetle/tigerbeetle-go v0.16.57 h1:2yZCg+SO+QHCBImjPvYyvl7xBEjqktyuXE8O8t90X5w=
github.com/tigerbeetle/tigerbeetle-go v0.16.57/go.mod h1:d6G7n4OlD7GLHd62x0VlWPXeI/L0SoNNTfm/ee24GJI=
github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
github.com/timeplus-io/proton-go-driver/v2 v2.0.17 h1:rXPT21/9FgQYFntSgLvJRL/7pgPAfTXWIZKp5UG+vQ0=
Expand Down
Loading