10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/kafka.adoc
@@ -85,6 +85,7 @@ input:
period: ""
check: ""
processors: [] # No default (optional)
broker_connection_retry_interval: 250ms
```

--
@@ -698,4 +699,13 @@ processors:
format: json_array
```

=== `broker_connection_retry_interval`

The interval to wait between attempts to reconnect to a broker.


*Type*: `string`

*Default*: `"250ms"`
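
For readers unsure how the duration string relates to Go's `time.Duration`: the sketch below (illustrative only, not part of this change) shows that the documented `250ms` default is 250 milliseconds, i.e. 250,000,000 nanoseconds, when parsed with Go's standard library.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "250ms" is the documented default for broker_connection_retry_interval.
	d, err := time.ParseDuration("250ms")
	if err != nil {
		panic(err)
	}
	fmt.Println(d == 250*time.Millisecond) // true
	fmt.Println(d.Nanoseconds())           // 250000000
}
```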


10 changes: 9 additions & 1 deletion internal/impl/kafka/input_sarama_kafka.go
@@ -47,6 +47,7 @@ const (
iskFieldFetchBufferCap = "fetch_buffer_cap"
iskFieldMultiHeader = "multi_header"
iskFieldBatching = "batching"
iskFieldBrokerConnectionRetryInterval = "broker_connection_retry_interval"
)

func iskConfigSpec() *service.ConfigSpec {
@@ -158,6 +159,9 @@ Unfortunately this error message will appear for a wide range of connection prob
Description("Decode headers into lists to allow handling of multiple values with the same key").
Advanced().Default(false),
service.NewBatchPolicyField(iskFieldBatching).Advanced(),
service.NewDurationField(iskFieldBrokerConnectionRetryInterval).
Description("The interval to wait between attempts to reconnect to a broker.").
Advanced().Default("250ms"),
)
}

@@ -509,6 +513,10 @@ func (k *kafkaReader) saramaConfigFromParsed(conf *service.ParsedConfig) (*saram
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

// Map the configured retry interval onto sarama's metadata retry backoff.
if config.Metadata.Retry.Backoff, err = conf.FieldDuration(iskFieldBrokerConnectionRetryInterval); err != nil {
return nil, err
}

if err := ApplySaramaSASLFromParsed(conf, k.mgr, config); err != nil {
return nil, err
}
@@ -550,7 +558,7 @@ func (k *kafkaReader) ReadBatch(ctx context.Context) (service.MessageBatch, serv
return nil, nil, ctx.Err()
}

// CloseAsync shuts down the kafkaReader input and stops processing requests.
// Close shuts down the kafkaReader input and stops processing requests.
func (k *kafkaReader) Close(ctx context.Context) (err error) {
k.closeGroupAndConsumers()
select {
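
The new field is applied in `saramaConfigFromParsed` as sarama's `Metadata.Retry.Backoff`, which controls how long the client waits before retrying metadata requests (and therefore broker reconnection attempts) after a failure. Below is a minimal standalone sketch of setting that option directly, separate from this change and assuming the `github.com/IBM/sarama` module path (older projects import `github.com/Shopify/sarama`).

```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// sarama's own default for this backoff is 250ms; the plugin overrides it
	// with the parsed broker_connection_retry_interval value.
	cfg.Metadata.Retry.Backoff = 500 * time.Millisecond

	fmt.Println(cfg.Metadata.Retry.Backoff) // 500ms
}
```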