From f5d6471f0d6822b0d396db9eb516b06a56ba457b Mon Sep 17 00:00:00 2001
From: Geert G <117188496+cll-gg@users.noreply.github.com>
Date: Mon, 24 Jun 2024 15:04:29 +0200
Subject: [PATCH 1/4] Make it possible to disable retrying the broker
 connection in sarama and leave it up to the framework instead

---
 internal/impl/kafka/input_sarama_kafka.go | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/internal/impl/kafka/input_sarama_kafka.go b/internal/impl/kafka/input_sarama_kafka.go
index 97f49e3d33..e594af7663 100644
--- a/internal/impl/kafka/input_sarama_kafka.go
+++ b/internal/impl/kafka/input_sarama_kafka.go
@@ -47,6 +47,7 @@ const (
 	iskFieldFetchBufferCap = "fetch_buffer_cap"
 	iskFieldMultiHeader    = "multi_header"
 	iskFieldBatching       = "batching"
+	iskFieldRetryBrokerConnection = "retry_broker_connection"
 )
 
 func iskConfigSpec() *service.ConfigSpec {
@@ -158,6 +159,9 @@ Unfortunately this error message will appear for a wide range of connection prob
 			Description("Decode headers into lists to allow handling of multiple values with the same key").
 			Advanced().Default(false),
 		service.NewBatchPolicyField(iskFieldBatching).Advanced(),
+		service.NewBoolField(iskFieldRetryBrokerConnection).
+			Description("Retry connecting to the broker when the connection fails. Setting this to false leaves retrying up to Redpanda Connect itself.").
+			Advanced().Default(true),
 	)
 }
 
@@ -509,6 +513,14 @@ func (k *kafkaReader) saramaConfigFromParsed(conf *service.ParsedConfig) (*saram
 		config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	}
 
+	var retryBrokerConn bool
+	if retryBrokerConn, err = conf.FieldBool(iskFieldRetryBrokerConnection); err != nil {
+		return nil, err
+	}
+	if !retryBrokerConn {
+		config.Metadata.Retry.Max = 0 // disables retrying
+	}
+
 	if err := ApplySaramaSASLFromParsed(conf, k.mgr, config); err != nil {
 		return nil, err
 	}

From b35da916294aa5fe73d9a10e3054bb8f5fe20140 Mon Sep 17 00:00:00 2001
From: Geert G <117188496+cll-gg@users.noreply.github.com>
Date: Mon, 24 Jun 2024 15:50:49 +0200
Subject: [PATCH 2/4] Disabling was not the right way (benthos would try to
 reconnect almost instantly), now setting the interval

---
 internal/impl/kafka/input_sarama_kafka.go | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/internal/impl/kafka/input_sarama_kafka.go b/internal/impl/kafka/input_sarama_kafka.go
index e594af7663..62abef04ca 100644
--- a/internal/impl/kafka/input_sarama_kafka.go
+++ b/internal/impl/kafka/input_sarama_kafka.go
@@ -47,7 +47,7 @@ const (
 	iskFieldFetchBufferCap = "fetch_buffer_cap"
 	iskFieldMultiHeader    = "multi_header"
 	iskFieldBatching       = "batching"
-	iskFieldRetryBrokerConnection = "retry_broker_connection"
+	iskFieldBrokerConnectionRetryInterval = "broker_connection_retry_interval"
 )
 
 func iskConfigSpec() *service.ConfigSpec {
@@ -159,9 +159,9 @@ Unfortunately this error message will appear for a wide range of connection prob
 			Description("Decode headers into lists to allow handling of multiple values with the same key").
 			Advanced().Default(false),
 		service.NewBatchPolicyField(iskFieldBatching).Advanced(),
-		service.NewBoolField(iskFieldRetryBrokerConnection).
-			Description("Retry connecting to the broker when the connection fails. Setting this to false leaves retrying up to Redpanda Connect itself.").
-			Advanced().Default(true),
+		service.NewDurationField(iskFieldBrokerConnectionRetryInterval).
+			Description("Default interval used to reconnect to the broker").
+			Advanced().Default(250*time.Millisecond),
 	)
 }
 
@@ -513,13 +513,10 @@ func (k *kafkaReader) saramaConfigFromParsed(conf *service.ParsedConfig) (*saram
 		config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	}
 
-	var retryBrokerConn bool
-	if retryBrokerConn, err = conf.FieldBool(iskFieldRetryBrokerConnection); err != nil {
+	if config.Metadata.Retry.Backoff, err = conf.FieldDuration(iskFieldBrokerConnectionRetryInterval); err != nil {
 		return nil, err
 	}
-	if !retryBrokerConn {
-		config.Metadata.Retry.Max = 0 // disables retrying
-	}
+	k.mgr.Logger().Debugf("Sarama broker reconnection interval set to %v.\n", config.Metadata.Retry.Backoff)
 
 	if err := ApplySaramaSASLFromParsed(conf, k.mgr, config); err != nil {
 		return nil, err
@@ -562,7 +559,7 @@ func (k *kafkaReader) ReadBatch(ctx context.Context) (service.MessageBatch, serv
 	return nil, nil, ctx.Err()
 }
 
-// CloseAsync shuts down the kafkaReader input and stops processing requests.
+// Close shuts down the kafkaReader input and stops processing requests.
 func (k *kafkaReader) Close(ctx context.Context) (err error) {
 	k.closeGroupAndConsumers()
 	select {

From c0a4ce957008e0ae71cf81c2cb0f3fd7ee438f4e Mon Sep 17 00:00:00 2001
From: Geert G <117188496+cll-gg@users.noreply.github.com>
Date: Mon, 24 Jun 2024 18:08:46 +0200
Subject: [PATCH 3/4] Remove log statement + update description

---
 internal/impl/kafka/input_sarama_kafka.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/internal/impl/kafka/input_sarama_kafka.go b/internal/impl/kafka/input_sarama_kafka.go
index 62abef04ca..4195436a86 100644
--- a/internal/impl/kafka/input_sarama_kafka.go
+++ b/internal/impl/kafka/input_sarama_kafka.go
@@ -160,7 +160,7 @@ Unfortunately this error message will appear for a wide range of connection prob
 			Advanced().Default(false),
 		service.NewBatchPolicyField(iskFieldBatching).Advanced(),
 		service.NewDurationField(iskFieldBrokerConnectionRetryInterval).
-			Description("Default interval used to reconnect to the broker").
+			Description("Interval used to reconnect to the broker").
 			Advanced().Default(250*time.Millisecond),
 	)
 }
@@ -516,7 +516,6 @@ func (k *kafkaReader) saramaConfigFromParsed(conf *service.ParsedConfig) (*saram
 	if config.Metadata.Retry.Backoff, err = conf.FieldDuration(iskFieldBrokerConnectionRetryInterval); err != nil {
 		return nil, err
 	}
-	k.mgr.Logger().Debugf("Sarama broker reconnection interval set to %v.\n", config.Metadata.Retry.Backoff)
 
 	if err := ApplySaramaSASLFromParsed(conf, k.mgr, config); err != nil {
 		return nil, err

From d1fef2712d80bd7312944dbef3ae0d8a33b397ab Mon Sep 17 00:00:00 2001
From: Geert G <117188496+cll-gg@users.noreply.github.com>
Date: Mon, 24 Jun 2024 18:13:59 +0200
Subject: [PATCH 4/4] Run `make docs`

---
 docs/modules/components/pages/inputs/kafka.adoc | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/docs/modules/components/pages/inputs/kafka.adoc b/docs/modules/components/pages/inputs/kafka.adoc
index a3713fa025..72eba0ad41 100644
--- a/docs/modules/components/pages/inputs/kafka.adoc
+++ b/docs/modules/components/pages/inputs/kafka.adoc
@@ -85,6 +85,7 @@ input:
       period: ""
       check: ""
       processors: [] # No default (optional)
+    broker_connection_retry_interval: 250ms
 ```
 --
 
@@ -698,4 +699,13 @@ processors:
       format: json_array
 ```
 
+=== `broker_connection_retry_interval`
+
+Interval used to reconnect to the broker
+
+
+*Type*: `string`
+
+*Default*: `250000000`
+
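For reference, a minimal sketch of how the new field introduced by this series would be set in a pipeline config. The broker address, topic, and consumer group below are placeholder values, and the surrounding `kafka` input fields are assumed from the existing component docs; only `broker_connection_retry_interval` is defined by these patches.

```yaml
input:
  kafka:
    addresses: ["localhost:9092"]   # placeholder broker address
    topics: ["example_topic"]       # placeholder topic
    consumer_group: "example_group" # placeholder consumer group
    # Interval sarama waits between broker reconnection attempts; per the
    # patches above it maps to config.Metadata.Retry.Backoff and defaults
    # to 250ms when omitted.
    broker_connection_retry_interval: 5s
```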