Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/modules/components/pages/inputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ input:
processors: [] # No default (optional)
topic_lag_refresh_period: 5s
auto_replay_nacks: true
extract_tracing_map: root = @ # No default (optional)
```

--
Expand Down Expand Up @@ -840,4 +841,21 @@ Whether messages that are rejected (nacked) at the output level should be automa

*Default*: `true`

=== `extract_tracing_map`

EXPERIMENTAL: A xref:guides:bloblang/about.adoc[Bloblang mapping] that attempts to extract an object containing tracing propagation information, which will then be used as the root tracing span for the message. The specification of the extracted fields must match the format used by the service wide tracer.


*Type*: `string`

Requires version 3.45.0 or newer

```yml
# Examples

extract_tracing_map: root = @

extract_tracing_map: root = this.meta.span
```


18 changes: 18 additions & 0 deletions docs/modules/components/pages/outputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ output:
period: ""
check: ""
processors: [] # No default (optional)
inject_tracing_map: meta = @.merge(this) # No default (optional)
partitioner: "" # No default (optional)
idempotent_write: true
compression: "" # No default (optional)
Expand Down Expand Up @@ -716,6 +717,23 @@ processors:
format: json_array
```

=== `inject_tracing_map`

EXPERIMENTAL: A xref:guides:bloblang/about.adoc[Bloblang mapping] used to inject an object containing tracing propagation information into outbound messages. The specification of the injected fields will match the format used by the service wide tracer.


*Type*: `string`

Requires version 3.45.0 or newer

```yml
# Examples

inject_tracing_map: meta = @.merge(this)

inject_tracing_map: root.meta.span = this
```

=== `partitioner`

Override the default murmur2 hashing partitioner.
Expand Down
8 changes: 7 additions & 1 deletion internal/impl/kafka/input_kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func FranzKafkaInputConfigFields() []*service.ConfigField {
FranzReaderUnorderedConfigFields(),
[]*service.ConfigField{
service.NewAutoRetryNacksToggleField(),
service.NewExtractTracingSpanMappingField(),
},
)
}
Expand All @@ -83,6 +84,11 @@ func init() {
return nil, err
}

return service.AutoRetryNacksBatchedToggled(conf, rdr)
r, err := service.AutoRetryNacksBatchedToggled(conf, rdr)
if err != nil {
return nil, err
}

return conf.WrapBatchInputExtractTracingSpanMapping("kafka_franz", r)
})
}
6 changes: 6 additions & 0 deletions internal/impl/kafka/output_kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func FranzKafkaOutputConfigFields() []*service.ConfigField {
Description("The maximum number of batches to be sending in parallel at any given time.").
Default(10),
service.NewBatchPolicyField(kfoFieldBatching),
service.NewInjectTracingSpanMappingField(),

// Deprecated
service.NewStringField(kfoFieldRackID).Deprecated(),
Expand Down Expand Up @@ -118,6 +119,11 @@ func init() {
client = nil
return nil
}))
if err != nil {
return
}

output, err = conf.WrapBatchOutputExtractTracingSpanMapping("kafka_franz", output)
return
})
}