1 change: 1 addition & 0 deletions .github/workflows/integration_test.yml
@@ -53,6 +53,7 @@ jobs:
- ./internal/impl/snowflake/streaming
# - ./internal/impl/splunk
# - ./internal/impl/sql
- ./internal/impl/tigerbeetle
# - ./internal/impl/zeromq

steps:
200 changes: 200 additions & 0 deletions docs/modules/components/pages/inputs/tigerbeetle_cdc.adoc
@@ -0,0 +1,200 @@
= tigerbeetle_cdc
:type: input
:status: beta
:categories: ["Services"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.


component_type_dropdown::[]


Enables TigerBeetle CDC streaming for Redpanda Connect.

Introduced in version 0.0.1.

```yml
# Config fields, showing default values
input:
  label: ""
  tigerbeetle_cdc:
    cluster_id: "" # No default (required)
    addresses: [] # No default (required)
    progress_cache: "" # No default (required)
    rate_limit: ""
    event_count_max: 2730
    idle_interval_ms: 1000
    timestamp_initial: ""
    auto_replay_nacks: true
```

Listens to a TigerBeetle cluster and creates a message for each change.

Each message is a JSON object like:

```json
{
  "timestamp": "1745328372758695656",
  "type": "single_phase",
  "ledger": 2,
  "transfer": {
    "id": "9082709",
    "amount": "3794",
    "pending_id": "0",
    "user_data_128": "79248595801719937611592367840129079151",
    "user_data_64": "13615171707598273871",
    "user_data_32": 3229992513,
    "timeout": 0,
    "code": 20295,
    "flags": 0,
    "timestamp": "1745328372758695656"
  },
  "debit_account": {
    "id": "3750",
    "debits_pending": "0",
    "debits_posted": "8463768",
    "credits_pending": "0",
    "credits_posted": "8861179",
    "user_data_128": "118966247877720884212341541320399553321",
    "user_data_64": "526432537153007844",
    "user_data_32": 4157247332,
    "code": 1,
    "flags": 0,
    "timestamp": "1745328270103398016"
  },
  "credit_account": {
    "id": "6765",
    "debits_pending": "0",
    "debits_posted": "8669204",
    "credits_pending": "0",
    "credits_posted": "8637251",
    "user_data_128": "43670023860556310170878798978091998141",
    "user_data_64": "12485093662256535374",
    "user_data_32": 1924162092,
    "code": 1,
    "flags": 0,
    "timestamp": "1745328270103401031"
  }
}
```

For more information refer to https://docs.tigerbeetle.com/operating/cdc/
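
In the sample message, wide integers (IDs, amounts, 64-bit and 128-bit user data, timestamps) are encoded as JSON strings, while narrower values are JSON numbers. A consumer written in Go might decode a subset of the payload as in the minimal sketch below. The type names `Event` and `Transfer`, the helper names, and the choice of `uint32` for the numeric fields are assumptions made for illustration; they are not part of this component.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/big"
)

// Transfer mirrors a subset of the "transfer" object in the sample message.
// Wide integers arrive as JSON strings, so they are kept as strings here.
type Transfer struct {
	ID          string `json:"id"`
	Amount      string `json:"amount"`
	UserData128 string `json:"user_data_128"`
	Code        uint32 `json:"code"`  // width is an assumption
	Flags       uint32 `json:"flags"` // width is an assumption
}

// Event mirrors a subset of the top-level message (hypothetical type).
type Event struct {
	Timestamp string   `json:"timestamp"`
	Type      string   `json:"type"`
	Ledger    uint32   `json:"ledger"`
	Transfer  Transfer `json:"transfer"`
}

// decodeEvent unmarshals one message body into an Event.
func decodeEvent(raw []byte) (Event, error) {
	var ev Event
	err := json.Unmarshal(raw, &ev)
	return ev, err
}

// parseAmount converts a decimal string into a big.Int, because the value
// may be too large for uint64.
func parseAmount(s string) (*big.Int, error) {
	n, ok := new(big.Int).SetString(s, 10)
	if !ok {
		return nil, fmt.Errorf("invalid decimal integer %q", s)
	}
	return n, nil
}

func main() {
	raw := []byte(`{"timestamp":"1745328372758695656","type":"single_phase","ledger":2,
		"transfer":{"id":"9082709","amount":"3794","code":20295,"flags":0}}`)
	ev, err := decodeEvent(raw)
	if err != nil {
		panic(err)
	}
	amount, err := parseAmount(ev.Transfer.Amount)
	if err != nil {
		panic(err)
	}
	fmt.Println(ev.Type, ev.Ledger, amount)
}
```

Keeping the wide fields as strings until they are needed avoids silent precision loss in JSON decoders that treat every number as a 64-bit float.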

== Metadata

This input adds the following metadata fields to each message:

- event_type: One of "single_phase", "two_phase_pending", "two_phase_posted", "two_phase_voided", or "two_phase_expired".
- ledger: The ledger code.
- transfer_code: The transfer code.
- debit_account_code: The debit account code.
- credit_account_code: The credit account code.
- timestamp: The unique event timestamp with nanosecond resolution.
- timestamp_ms: The event timestamp with millisecond resolution.
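
The two timestamp fields differ only in resolution. The sketch below shows one way to derive a millisecond value from the nanosecond string. It assumes that `timestamp_ms` is the nanosecond timestamp truncated (not rounded) to milliseconds, which this page does not state; `nanosToMillis` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// nanosToMillis truncates a nanosecond timestamp, given as a decimal string,
// to millisecond resolution. Truncation is an assumption of this sketch.
func nanosToMillis(ts string) (uint64, error) {
	ns, err := strconv.ParseUint(ts, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid timestamp %q: %w", ts, err)
	}
	return ns / uint64(time.Millisecond), nil
}

func main() {
	ms, err := nanosToMillis("1745328372758695656")
	if err != nil {
		panic(err)
	}
	fmt.Println(ms) // prints 1745328372758
}
```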

== Guarantees

This input guarantees _at-least-once semantics_ and makes a best effort to prevent
duplicate messages. However, during crash recovery it may replay unacknowledged
messages that may already have been delivered to consumers.

It is the consumer’s responsibility to perform idempotency checks when processing messages.
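
One possible shape for such an idempotency check is sketched below. It relies on the event timestamp being unique, as described under Metadata, and uses it as the deduplication key. The `Deduper` type is hypothetical and keeps its state in memory only; a real consumer would more likely rely on a durable store, such as a unique constraint in its database.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers which event timestamps have already been processed.
// This in-memory version grows without bound and is for illustration only.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// NewDeduper returns an empty Deduper.
func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// Process calls handle only the first time a timestamp is observed.
// It reports whether handle ran and succeeded.
func (d *Deduper) Process(timestamp string, handle func() error) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[timestamp]; dup {
		return false, nil // replayed event, already handled
	}
	if err := handle(); err != nil {
		return false, err // not marked as seen, so a replay can retry
	}
	d.seen[timestamp] = struct{}{}
	return true, nil
}

func main() {
	d := NewDeduper()
	// The same event delivered twice, as can happen after crash recovery.
	for _, ts := range []string{"1745328372758695656", "1745328372758695656"} {
		ran, err := d.Process(ts, func() error { return nil })
		fmt.Println(ts, ran, err)
	}
}
```

The timestamp is recorded only after the handler succeeds, so a failed attempt remains eligible for redelivery.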

== Upgrading

The TigerBeetle client version must not be newer than the cluster version;
otherwise, the client fails with an error message.

Requires TigerBeetle cluster version 0.16.57 or greater.

== Fields

=== `cluster_id`

The TigerBeetle unique 128-bit cluster ID.


*Type*: `string`
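
A 128-bit ID does not fit in Go's built-in integer types, which is one reason to carry it as a string. The sketch below shows how a decimal string could be checked against the unsigned 128-bit range with `math/big`. The function `parseUint128` is hypothetical and is not claimed to be how this component validates the field.

```go
package main

import (
	"fmt"
	"math/big"
)

// maxUint128 is 2^128 - 1, the largest unsigned 128-bit value.
var maxUint128 = new(big.Int).Sub(new(big.Int).Lsh(big.NewInt(1), 128), big.NewInt(1))

// parseUint128 parses a base-10 string and checks that the value fits in
// 128 unsigned bits.
func parseUint128(s string) (*big.Int, error) {
	n, ok := new(big.Int).SetString(s, 10)
	if !ok {
		return nil, fmt.Errorf("%q is not a valid integer", s)
	}
	if n.Sign() < 0 || n.Cmp(maxUint128) > 0 {
		return nil, fmt.Errorf("%q does not fit in 128 bits", s)
	}
	return n, nil
}

func main() {
	id, err := parseUint128("181161957064799711348825326453165787824")
	if err != nil {
		panic(err)
	}
	fmt.Println(id.BitLen() <= 128) // prints true
}
```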


=== `addresses`

A list of IP addresses of all the TigerBeetle replicas in the cluster. The order of addresses must correspond to the order of replicas.


*Type*: `array`


=== `progress_cache`

A https://docs.redpanda.com/redpanda-connect/components/caches/about[cache resource^] used to track progress by storing the last acknowledged timestamp.
This allows Redpanda Connect to resume from the latest delivered event upon restart.


*Type*: `string`


=== `rate_limit`

An optional https://docs.redpanda.com/redpanda-connect/components/rate_limits/about/[rate limit^] to throttle the number of **requests** made to TigerBeetle.


*Type*: `string`

*Default*: `""`

=== `event_count_max`

The maximum number of events fetched from TigerBeetle per **request**.
Must be greater than zero.


*Type*: `int`

*Default*: `2730`

=== `idle_interval_ms`

The time interval in milliseconds to wait before querying again when the last request returned no events.
Must be greater than zero.


*Type*: `int`

*Default*: `1000`

=== `timestamp_initial`

The initial timestamp to start extracting events from. If not defined, all events since the beginning will be included.
Ignored if a more recent timestamp has already been acknowledged.
This is a TigerBeetle timestamp with nanosecond precision.


*Type*: `string`

*Default*: `""`

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.


*Type*: `bool`

*Default*: `true`


1 change: 1 addition & 0 deletions go.mod
@@ -134,6 +134,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/ollama v0.37.0
github.com/testcontainers/testcontainers-go/modules/qdrant v0.37.0
github.com/tetratelabs/wazero v1.7.3
github.com/tigerbeetle/tigerbeetle-go v0.16.57
github.com/timeplus-io/proton-go-driver/v2 v2.0.17
github.com/tmc/langchaingo v0.1.13
github.com/trinodb/trino-go-client v0.315.0
2 changes: 2 additions & 0 deletions go.sum
@@ -1931,6 +1931,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tigerbeetle/tigerbeetle-go v0.16.57 h1:2yZCg+SO+QHCBImjPvYyvl7xBEjqktyuXE8O8t90X5w=
github.com/tigerbeetle/tigerbeetle-go v0.16.57/go.mod h1:d6G7n4OlD7GLHd62x0VlWPXeI/L0SoNNTfm/ee24GJI=
github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
github.com/timeplus-io/proton-go-driver/v2 v2.0.17 h1:rXPT21/9FgQYFntSgLvJRL/7pgPAfTXWIZKp5UG+vQ0=
169 changes: 169 additions & 0 deletions internal/impl/tigerbeetle/config_test.go
@@ -0,0 +1,169 @@
package tigerbeetle

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/redpanda-data/benthos/v4/public/service"
)

func TestConfigLinting(t *testing.T) {
	linter := service.NewEnvironment().NewComponentConfigLinter()

	tests := []struct {
		name    string
		conf    string
		lintErr string
	}{
		{
			name: "basic config",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
  progress_cache: foocache
`,
		},
		{
			name: "advanced config",
			conf: `
tigerbeetle_cdc:
  cluster_id: 181161957064799711348825326453165787824
  addresses: [ "127.0.0.1:3000", "127.0.0.1:3001", "127.0.0.1:3002" ]
  progress_cache: foocache
  event_count_max: 1024
  idle_interval_ms: 5000
  timestamp_initial: 1756549800322811551
`,
		},
		{
			name: "invalid cluster_id",
			conf: `
tigerbeetle_cdc:
  cluster_id: xyz
  addresses: [ "3000" ]
  progress_cache: foocache
`,
			lintErr: "(3,1) field 'cluster_id' must be a valid integer",
		},
		{
			name: "empty cluster_id",
			conf: `
tigerbeetle_cdc:
  cluster_id:
  addresses: [ "3000" ]
  progress_cache: foocache
`,
			lintErr: "(3,1) field 'cluster_id' must be a valid integer",
		},
		{
			name: "missing cluster_id",
			conf: `
tigerbeetle_cdc:
  addresses: [ "3000" ]
  progress_cache: foocache
`,
			lintErr: "(3,1) field cluster_id is required",
		},
		{
			name: "empty addresses",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ ]
  progress_cache: foocache
`,
			lintErr: "(4,1) field 'addresses' must contain at least one address",
		},
		{
			name: "missing progress_cache",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
`,
			lintErr: "(3,1) field progress_cache is required",
		},
		{
			name: "zeroed event_count_max",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
  progress_cache: foocache
  event_count_max: 0
`,
			lintErr: "(6,1) field 'event_count_max' must be greater than 0",
		},
		{
			name: "negative event_count_max",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
  progress_cache: foocache
  event_count_max: -1
`,
			lintErr: "(6,1) field 'event_count_max' must be greater than 0",
		},
		{
			name: "zeroed idle_interval_ms",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
  progress_cache: foocache
  idle_interval_ms: 0
`,
			lintErr: "(6,1) field 'idle_interval_ms' must be greater than 0",
		},
		{
			name: "negative idle_interval_ms",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
  progress_cache: foocache
  idle_interval_ms: -1
`,
			lintErr: "(6,1) field 'idle_interval_ms' must be greater than 0",
		},
		{
			name: "negative timestamp_initial",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
  progress_cache: foocache
  timestamp_initial: -1
`,
			lintErr: "(6,1) field 'timestamp_initial' must be a valid integer",
		},
		{
			name: "invalid timestamp_initial",
			conf: `
tigerbeetle_cdc:
  cluster_id: 0
  addresses: [ "3000" ]
  progress_cache: foocache
  timestamp_initial: xyz
`,
			lintErr: "(6,1) field 'timestamp_initial' must be a valid integer",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			lints, err := linter.LintInputYAML([]byte(test.conf))
			require.NoError(t, err)
			if test.lintErr != "" {
				assert.Len(t, lints, 1)
				assert.Equal(t, test.lintErr, lints[0].Error())
			} else {
				assert.Empty(t, lints)
			}
		})
	}
}