Merged
27 changes: 27 additions & 0 deletions .chloggen/exporter-batcher-merge-context.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Provide a setting `queue_batch.Settings.MergeCtx` so users can control how context values are preserved or combined when requests are batched

# One or more tracking issues or pull requests related to the change
issues: [13320]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
By supplying a custom mergeCtx function, users can control how context values are preserved or combined.
The default behavior is to preserve no context values.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
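As an illustration of the new setting, a custom merge function can carry selected values from the incoming request contexts into the batch context. The sketch below is not part of this change; the `tenantKey` type and the first-wins policy are assumptions chosen only for the example.

```go
package main

import (
	"context"
	"fmt"
)

// tenantKeyType is a hypothetical context key used only for this example.
type tenantKeyType struct{}

var tenantKey = tenantKeyType{}

// mergeTenant is one possible merge-context implementation: it starts from
// context.Background() (matching the conservative default of dropping
// everything) and copies the tenant value over, preferring the value already
// accumulated on the batch context (ctx1).
func mergeTenant(ctx1, ctx2 context.Context) context.Context {
	if v := ctx1.Value(tenantKey); v != nil {
		return context.WithValue(context.Background(), tenantKey, v)
	}
	if v := ctx2.Value(tenantKey); v != nil {
		return context.WithValue(context.Background(), tenantKey, v)
	}
	return context.Background()
}

func main() {
	batchCtx := context.WithValue(context.Background(), tenantKey, "tenant-a")
	incoming := context.Background()
	fmt.Println(mergeTenant(batchCtx, incoming).Value(tenantKey)) // tenant-a
}
```

Because the batcher applies the function pairwise each time a request joins the batch, a merge policy that does not depend on arrival order (as above) tends to be the least surprising choice.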
@@ -31,10 +31,9 @@ func parentsFromContext(ctx context.Context) []trace.Link {
return LinksFromContext(ctx)
}

func contextWithMergedLinks(ctx1, ctx2 context.Context) context.Context {
func contextWithMergedLinks(mergedCtx, ctx1, ctx2 context.Context) context.Context {
return context.WithValue(
context.Background(),
mergedCtx,
batchSpanLinksKey,
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...),
)
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...))
}
@@ -13,6 +13,32 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
)

type testTimestampKeyType int

const testTimestampKey testTimestampKeyType = iota

// mergeCtxFunc corresponds to a user-specified mergeCtx function in the batcher settings.
// This particular merge function keeps the greater of the timestamps from the two contexts.
func mergeCtxFunc(ctx1, ctx2 context.Context) context.Context {
timestamp1 := ctx1.Value(testTimestampKey)
timestamp2 := ctx2.Value(testTimestampKey)
if timestamp1 != nil && timestamp2 != nil {
if timestamp1.(int) > timestamp2.(int) {
return context.WithValue(context.Background(), testTimestampKey, timestamp1)
}
return context.WithValue(context.Background(), testTimestampKey, timestamp2)
}
if timestamp1 != nil {
return context.WithValue(context.Background(), testTimestampKey, timestamp1)
}
return context.WithValue(context.Background(), testTimestampKey, timestamp2)
}

// mergeContextHelper performs the same merge operation that is applied during batching.
func mergeContextHelper(ctx1, ctx2 context.Context) context.Context {
return contextWithMergedLinks(mergeCtxFunc(ctx1, ctx2), ctx1, ctx2)
}

func TestBatchContextLink(t *testing.T) {
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
@@ -28,12 +54,19 @@ func TestBatchContextLink(t *testing.T) {
ctx4, span4 := tracer.Start(ctx1, "span4")
defer span4.End()

batchContext := contextWithMergedLinks(ctx2, ctx3)
batchContext = contextWithMergedLinks(batchContext, ctx4)
batchContext := mergeContextHelper(ctx2, ctx3)
batchContext = mergeContextHelper(batchContext, ctx4)

actualLinks := LinksFromContext(batchContext)
require.Len(t, actualLinks, 3)
require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
}

func TestMergedContext_GetValue(t *testing.T) {
ctx1 := context.WithValue(context.Background(), testTimestampKey, 1234)
ctx2 := context.WithValue(context.Background(), testTimestampKey, 2345)
batchContext := mergeContextHelper(ctx1, ctx2)
require.Equal(t, 2345, batchContext.Value(testTimestampKey))
}
5 changes: 3 additions & 2 deletions exporter/exporterhelper/internal/queuebatch/batcher.go
@@ -26,6 +26,7 @@ type batcherSettings[T any] struct {
itemsSizer request.Sizer[T]
bytesSizer request.Sizer[T]
partitioner Partitioner[T]
mergeCtx func(context.Context, context.Context) context.Context
next sender.SendFunc[T]
maxWorkers int
logger *zap.Logger
@@ -42,10 +43,10 @@ func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[re
}

if set.partitioner == nil {
return newPartitionBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.next, set.logger), nil
return newPartitionBatcher(*cfg.Get(), sizer, set.mergeCtx, newWorkerPool(set.maxWorkers), set.next, set.logger), nil
}

return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next, set.logger), nil
return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.mergeCtx, set.next, set.logger), nil
}

func activeSizer[T any](sizerType request.SizerType, itemsSizer, bytesSizer request.Sizer[T]) request.Sizer[T] {
@@ -21,6 +21,7 @@ type multiBatcher struct {
wp *workerPool
sizer request.Sizer[request.Request]
partitioner Partitioner[request.Request]
mergeCtx func(context.Context, context.Context) context.Context
consumeFunc sender.SendFunc[request.Request]
shards sync.Map
logger *zap.Logger
@@ -31,6 +32,7 @@ func newMultiBatcher(
sizer request.Sizer[request.Request],
wp *workerPool,
partitioner Partitioner[request.Request],
mergeCtx func(context.Context, context.Context) context.Context,
next sender.SendFunc[request.Request],
logger *zap.Logger,
) *multiBatcher {
@@ -39,6 +41,7 @@
wp: wp,
sizer: sizer,
partitioner: partitioner,
mergeCtx: mergeCtx,
consumeFunc: next,
logger: logger,
}
@@ -51,7 +54,8 @@ func (mb *multiBatcher) getPartition(ctx context.Context, req request.Request) *
if found {
return s.(*partitionBatcher)
}
newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.wp, mb.consumeFunc, mb.logger)

newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.mergeCtx, mb.wp, mb.consumeFunc, mb.logger)
_ = newS.Start(ctx, nil)
s, loaded := mb.shards.LoadOrStore(key, newS)
// If not loaded, there was a race condition in adding the new shard. Shutdown the newly created shard.
@@ -33,6 +33,7 @@ func TestMultiBatcher_NoTimeout(t *testing.T) {
NewPartitioner(func(ctx context.Context, _ request.Request) string {
return ctx.Value(partitionKey{}).(string)
}),
nil,
sink.Export,
zap.NewNop(),
)
@@ -85,6 +86,7 @@ func TestMultiBatcher_Timeout(t *testing.T) {
NewPartitioner(func(ctx context.Context, _ request.Request) string {
return ctx.Value(partitionKey{}).(string)
}),
nil,
sink.Export,
zap.NewNop(),
)
@@ -30,6 +30,7 @@ type partitionBatcher struct {
cfg BatchConfig
wp *workerPool
sizer request.Sizer[request.Request]
mergeCtx func(context.Context, context.Context) context.Context
consumeFunc sender.SendFunc[request.Request]
stopWG sync.WaitGroup
currentBatchMu sync.Mutex
@@ -42,6 +43,7 @@
func newPartitionBatcher(
cfg BatchConfig,
sizer request.Sizer[request.Request],
mergeCtx func(context.Context, context.Context) context.Context,
wp *workerPool,
next sender.SendFunc[request.Request],
logger *zap.Logger,
@@ -50,6 +52,7 @@
cfg: cfg,
wp: wp,
sizer: sizer,
mergeCtx: mergeCtx,
consumeFunc: next,
shutdownCh: make(chan struct{}, 1),
logger: logger,
@@ -147,7 +150,12 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
// Logic on how to deal with the current batch:
qb.currentBatch.req = reqList[0]
qb.currentBatch.done = append(qb.currentBatch.done, done)
qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)

mergedCtx := context.Background() //nolint:contextcheck
if qb.mergeCtx != nil {
mergedCtx = qb.mergeCtx(qb.currentBatch.ctx, ctx)
}
qb.currentBatch.ctx = contextWithMergedLinks(mergedCtx, qb.currentBatch.ctx, ctx)

// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
// cannot unlock and re-lock because we are not done processing all the responses.
@@ -177,7 +185,7 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do

qb.currentBatchMu.Unlock()
if firstBatch != nil {
qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done) //nolint:contextcheck //context already handled
qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done)
}
for i := 0; i < len(reqList); i++ {
qb.flush(ctx, reqList[i], done)
@@ -21,6 +21,10 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
)

type testContextKey string

const timestampKey testContextKey = "timestamp"

func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
tests := []struct {
name string
@@ -62,7 +66,7 @@ func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T)
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
@@ -128,7 +132,7 @@ func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

done := newFakeDone()
@@ -209,7 +213,7 @@ func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) {
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
@@ -281,7 +285,7 @@ func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) {
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

done := newFakeDone()
@@ -329,7 +333,7 @@ func TestPartitionBatcher_Shutdown(t *testing.T) {
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop())
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

done := newFakeDone()
@@ -358,7 +362,7 @@ func TestPartitionBatcher_MergeError(t *testing.T) {
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop())
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
@@ -392,7 +396,7 @@ func TestPartitionBatcher_PartialSuccessError(t *testing.T) {
core, observed := observer.New(zap.WarnLevel)
logger := zap.New(core)
sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

done := newFakeDone()
@@ -434,7 +438,7 @@ func TestSPartitionBatcher_PartialSuccessError_AfterOkRequest(t *testing.T) {
core, observed := observer.New(zap.WarnLevel)
logger := zap.New(core)
sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

done := newFakeDone()
@@ -494,7 +498,7 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) {
}

sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, zap.NewNop())
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
@@ -513,3 +517,46 @@
assert.Equal(t, int64(0), done.success.Load())
assert.Equal(t, 0, sink.RequestsCount())
}

func TestPartitionBatcher_ContextMerging(t *testing.T) {
tests := []struct {
name string
mergeCtxFunc func(ctx1, ctx2 context.Context) context.Context
}{
{
name: "merge_context_with_timestamp",
mergeCtxFunc: func(ctx1, _ context.Context) context.Context {
return context.WithValue(ctx1, timestampKey, 1234)
},
},
{
name: "merge_context_returns_background",
mergeCtxFunc: func(_, _ context.Context) context.Context {
return context.Background()
},
},
{
name: "nil_merge_context",
mergeCtxFunc: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := BatchConfig{
FlushTimeout: 0,
Sizer: request.SizerTypeItems,
MinSize: 10,
}
sink := requesttest.NewSink()
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), tt.mergeCtxFunc, newWorkerPool(1), sink.Export, zap.NewNop())
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

done := newFakeDone()
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Bytes: 8}, done)
ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Bytes: 8}, done)
<-time.After(10 * time.Millisecond)
assert.Equal(t, 1, sink.RequestsCount())
assert.EqualValues(t, 2, done.success.Load())
})
}
}
2 changes: 2 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/queue_batch.go
@@ -23,6 +23,7 @@ type Settings[T any] struct {
ItemsSizer request.Sizer[T]
BytesSizer request.Sizer[T]
Partitioner Partitioner[T]
MergeCtx func(context.Context, context.Context) context.Context
Contributor:
Interesting -- I agree this is a good place for programmable configuration. I expect there is a default implementation that mostly just works for most exporters.

Contributor Author (@sfc-gh-sili, Jul 25, 2025):
I am leaning towards the most conservative default behavior — dropping everything — which also aligns with the current behavior. I’d love to hear your thoughts on this.

Contributor:
OK. I assume that without the batcher, context otherwise passes through the exporterhelper path so that extensions like https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/headerssetterextension work.

}

type QueueBatch struct {
@@ -39,6 +40,7 @@ func NewQueueBatch(
itemsSizer: set.ItemsSizer,
bytesSizer: set.BytesSizer,
partitioner: set.Partitioner,
mergeCtx: set.MergeCtx,
next: next,
maxWorkers: cfg.NumConsumers,
logger: set.Telemetry.Logger,
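Following up on the review discussion above about the headers setter extension: one plausible merge policy is to keep the client metadata from the request that opened the batch, so that metadata-driven extensions keep working when batching is enabled. The snippet below is a sketch under that assumption using the collector's `client` package; it is not part of this PR, and keeping only the first request's metadata is an arbitrary choice made for the example.

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/client"
)

// mergeClientMetadata keeps the client.Info (including request metadata such
// as tenant headers) from the existing batch context and drops everything
// else. Supplied as the MergeCtx setting, it would preserve metadata for the
// first request that opened the batch; later requests' metadata is discarded.
func mergeClientMetadata(batchCtx, _ context.Context) context.Context {
	return client.NewContext(context.Background(), client.FromContext(batchCtx))
}

func main() {
	info := client.Info{Metadata: client.NewMetadata(map[string][]string{
		"tenant_id": {"acme"},
	})}
	ctx1 := client.NewContext(context.Background(), info)
	ctx2 := context.Background()

	merged := mergeClientMetadata(ctx1, ctx2)
	fmt.Println(client.FromContext(merged).Metadata.Get("tenant_id")) // [acme]
}
```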