From bee7f2570f7a491634b48a2423e51994ecd9b789 Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Thu, 17 Jul 2025 16:35:44 -0700
Subject: [PATCH 1/2] Merge context

---
 .chloggen/exporter-batcher-merge-context.yaml | 27 ++++++++
 .../internal/queuebatch/batch_context.go      |  7 +-
 .../internal/queuebatch/batch_context_test.go | 37 ++++++++++-
 .../internal/queuebatch/batcher.go            |  5 +-
 .../internal/queuebatch/multi_batcher.go      |  6 +-
 .../internal/queuebatch/multi_batcher_test.go |  2 +
 .../internal/queuebatch/partition_batcher.go  | 10 ++-
 .../queuebatch/partition_batcher_test.go      | 65 ++++++++++++++++---
 .../internal/queuebatch/queue_batch.go        |  2 +
 9 files changed, 142 insertions(+), 19 deletions(-)
 create mode 100644 .chloggen/exporter-batcher-merge-context.yaml

diff --git a/.chloggen/exporter-batcher-merge-context.yaml b/.chloggen/exporter-batcher-merge-context.yaml
new file mode 100644
index 00000000000..4c807d69558
--- /dev/null
+++ b/.chloggen/exporter-batcher-merge-context.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add a `queuebatch.Settings.MergeCtx` setting so users can control how context values are preserved or combined when requests are batched
+
+# One or more tracking issues or pull requests related to the change
+issues: [13320]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  By supplying a custom `MergeCtx` function, users can control how context values are preserved or combined.
+  The default behavior, when no `MergeCtx` is provided, is to preserve no context values.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/internal/queuebatch/batch_context.go b/exporter/exporterhelper/internal/queuebatch/batch_context.go index 9724ecf278f..81f2387d79a 100644 --- a/exporter/exporterhelper/internal/queuebatch/batch_context.go +++ b/exporter/exporterhelper/internal/queuebatch/batch_context.go @@ -31,10 +31,9 @@ func parentsFromContext(ctx context.Context) []trace.Link { return LinksFromContext(ctx) } -func contextWithMergedLinks(ctx1, ctx2 context.Context) context.Context { +func contextWithMergedLinks(mergedCtx, ctx1, ctx2 context.Context) context.Context { return context.WithValue( - context.Background(), + mergedCtx, batchSpanLinksKey, - append(parentsFromContext(ctx1), parentsFromContext(ctx2)...), - ) + append(parentsFromContext(ctx1), parentsFromContext(ctx2)...)) } diff --git a/exporter/exporterhelper/internal/queuebatch/batch_context_test.go b/exporter/exporterhelper/internal/queuebatch/batch_context_test.go index be8a9e4637d..731d04c2c1b 100644 --- a/exporter/exporterhelper/internal/queuebatch/batch_context_test.go +++ b/exporter/exporterhelper/internal/queuebatch/batch_context_test.go @@ -13,6 +13,32 @@ import ( "go.opentelemetry.io/collector/component/componenttest" ) +type testTimestampKeyType int + +const testTimestampKey testTimestampKeyType = iota + +// mergeCtxFunc corresponds to user specified mergeCtx function in the batcher settings. +// This specific merge Context function keeps the greater of timestamps from two contexts. +func mergeCtxFunc(ctx1, ctx2 context.Context) context.Context { + timestamp1 := ctx1.Value(testTimestampKey) + timestamp2 := ctx2.Value(testTimestampKey) + if timestamp1 != nil && timestamp2 != nil { + if timestamp1.(int) > timestamp2.(int) { + return context.WithValue(context.Background(), testTimestampKey, timestamp1) + } + return context.WithValue(context.Background(), testTimestampKey, timestamp2) + } + if timestamp1 != nil { + return context.WithValue(context.Background(), testTimestampKey, timestamp1) + } + return context.WithValue(context.Background(), testTimestampKey, timestamp2) +} + +// mergeContextHelper performs the same operation done during batching. 
+func mergeContextHelper(ctx1, ctx2 context.Context) context.Context { + return contextWithMergedLinks(mergeCtxFunc(ctx1, ctx2), ctx1, ctx2) +} + func TestBatchContextLink(t *testing.T) { tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper") @@ -28,8 +54,8 @@ func TestBatchContextLink(t *testing.T) { ctx4, span4 := tracer.Start(ctx1, "span4") defer span4.End() - batchContext := contextWithMergedLinks(ctx2, ctx3) - batchContext = contextWithMergedLinks(batchContext, ctx4) + batchContext := mergeContextHelper(ctx2, ctx3) + batchContext = mergeContextHelper(batchContext, ctx4) actualLinks := LinksFromContext(batchContext) require.Len(t, actualLinks, 3) @@ -37,3 +63,10 @@ func TestBatchContextLink(t *testing.T) { require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext) require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext) } + +func TestMergedContext_GetValue(t *testing.T) { + ctx1 := context.WithValue(context.Background(), testTimestampKey, 1234) + ctx2 := context.WithValue(context.Background(), testTimestampKey, 2345) + batchContext := mergeContextHelper(ctx1, ctx2) + require.Equal(t, 2345, batchContext.Value(testTimestampKey)) +} diff --git a/exporter/exporterhelper/internal/queuebatch/batcher.go b/exporter/exporterhelper/internal/queuebatch/batcher.go index f5238b25c55..a743c049360 100644 --- a/exporter/exporterhelper/internal/queuebatch/batcher.go +++ b/exporter/exporterhelper/internal/queuebatch/batcher.go @@ -26,6 +26,7 @@ type batcherSettings[T any] struct { itemsSizer request.Sizer[T] bytesSizer request.Sizer[T] partitioner Partitioner[T] + mergeCtx func(context.Context, context.Context) context.Context next sender.SendFunc[T] maxWorkers int logger *zap.Logger @@ -42,10 +43,10 @@ func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[re } if set.partitioner == nil { - return newPartitionBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.next, set.logger), nil + return newPartitionBatcher(*cfg.Get(), sizer, set.mergeCtx, newWorkerPool(set.maxWorkers), set.next, set.logger), nil } - return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next, set.logger), nil + return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.mergeCtx, set.next, set.logger), nil } func activeSizer[T any](sizerType request.SizerType, itemsSizer, bytesSizer request.Sizer[T]) request.Sizer[T] { diff --git a/exporter/exporterhelper/internal/queuebatch/multi_batcher.go b/exporter/exporterhelper/internal/queuebatch/multi_batcher.go index 18f2035d3f5..92428ae1c04 100644 --- a/exporter/exporterhelper/internal/queuebatch/multi_batcher.go +++ b/exporter/exporterhelper/internal/queuebatch/multi_batcher.go @@ -21,6 +21,7 @@ type multiBatcher struct { wp *workerPool sizer request.Sizer[request.Request] partitioner Partitioner[request.Request] + mergeCtx func(context.Context, context.Context) context.Context consumeFunc sender.SendFunc[request.Request] shards sync.Map logger *zap.Logger @@ -31,6 +32,7 @@ func newMultiBatcher( sizer request.Sizer[request.Request], wp *workerPool, partitioner Partitioner[request.Request], + mergeCtx func(context.Context, context.Context) context.Context, next sender.SendFunc[request.Request], logger *zap.Logger, ) *multiBatcher { @@ -39,6 +41,7 @@ func newMultiBatcher( wp: wp, sizer: sizer, partitioner: partitioner, + 
mergeCtx: mergeCtx, consumeFunc: next, logger: logger, } @@ -51,7 +54,8 @@ func (mb *multiBatcher) getPartition(ctx context.Context, req request.Request) * if found { return s.(*partitionBatcher) } - newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.wp, mb.consumeFunc, mb.logger) + + newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.mergeCtx, mb.wp, mb.consumeFunc, mb.logger) _ = newS.Start(ctx, nil) s, loaded := mb.shards.LoadOrStore(key, newS) // If not loaded, there was a race condition in adding the new shard. Shutdown the newly created shard. diff --git a/exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go b/exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go index fcf04ce2c77..58cce15ce28 100644 --- a/exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go +++ b/exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go @@ -33,6 +33,7 @@ func TestMultiBatcher_NoTimeout(t *testing.T) { NewPartitioner(func(ctx context.Context, _ request.Request) string { return ctx.Value(partitionKey{}).(string) }), + nil, sink.Export, zap.NewNop(), ) @@ -85,6 +86,7 @@ func TestMultiBatcher_Timeout(t *testing.T) { NewPartitioner(func(ctx context.Context, _ request.Request) string { return ctx.Value(partitionKey{}).(string) }), + nil, sink.Export, zap.NewNop(), ) diff --git a/exporter/exporterhelper/internal/queuebatch/partition_batcher.go b/exporter/exporterhelper/internal/queuebatch/partition_batcher.go index 3410e71d770..d2941456775 100644 --- a/exporter/exporterhelper/internal/queuebatch/partition_batcher.go +++ b/exporter/exporterhelper/internal/queuebatch/partition_batcher.go @@ -30,6 +30,7 @@ type partitionBatcher struct { cfg BatchConfig wp *workerPool sizer request.Sizer[request.Request] + mergeCtx func(context.Context, context.Context) context.Context consumeFunc sender.SendFunc[request.Request] stopWG sync.WaitGroup currentBatchMu sync.Mutex @@ -42,6 +43,7 @@ type partitionBatcher struct { func newPartitionBatcher( cfg BatchConfig, sizer request.Sizer[request.Request], + mergeCtx func(context.Context, context.Context) context.Context, wp *workerPool, next sender.SendFunc[request.Request], logger *zap.Logger, @@ -50,6 +52,7 @@ func newPartitionBatcher( cfg: cfg, wp: wp, sizer: sizer, + mergeCtx: mergeCtx, consumeFunc: next, shutdownCh: make(chan struct{}, 1), logger: logger, @@ -147,7 +150,12 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do // Logic on how to deal with the current batch: qb.currentBatch.req = reqList[0] qb.currentBatch.done = append(qb.currentBatch.done, done) - qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx) + + mergedCtx := context.Background() + if qb.mergeCtx != nil { + mergedCtx = qb.mergeCtx(qb.currentBatch.ctx, ctx) + } + qb.currentBatch.ctx = contextWithMergedLinks(mergedCtx, qb.currentBatch.ctx, ctx) // Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and // cannot unlock and re-lock because we are not done processing all the responses. 
diff --git a/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go b/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go index 10faa7ed419..f23b290fe6e 100644 --- a/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go +++ b/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go @@ -21,6 +21,10 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" ) +type testContextKey string + +const timestampKey testContextKey = "timestamp" + func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) { tests := []struct { name string @@ -62,7 +66,7 @@ func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) } sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) + ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, ba.Shutdown(context.Background())) @@ -128,7 +132,7 @@ func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) { } sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) + ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) done := newFakeDone() @@ -209,7 +213,7 @@ func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) { } sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) + ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, ba.Shutdown(context.Background())) @@ -281,7 +285,7 @@ func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) { } sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) + ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop()) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) done := newFakeDone() @@ -329,7 +333,7 @@ func TestPartitionBatcher_Shutdown(t *testing.T) { } sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop()) + ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop()) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) done := newFakeDone() @@ -358,7 +362,7 @@ func TestPartitionBatcher_MergeError(t *testing.T) { } sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop()) + ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop()) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, ba.Shutdown(context.Background())) @@ -392,7 +396,7 @@ func TestPartitionBatcher_PartialSuccessError(t *testing.T) { core, observed := observer.New(zap.WarnLevel) logger := zap.New(core) sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, 
request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger) + ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) done := newFakeDone() @@ -434,7 +438,7 @@ func TestSPartitionBatcher_PartialSuccessError_AfterOkRequest(t *testing.T) { core, observed := observer.New(zap.WarnLevel) logger := zap.New(core) sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger) + ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) done := newFakeDone() @@ -494,7 +498,7 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) { } sink := requesttest.NewSink() - ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, zap.NewNop()) + ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, zap.NewNop()) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, ba.Shutdown(context.Background())) @@ -513,3 +517,46 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) { assert.Equal(t, int64(0), done.success.Load()) assert.Equal(t, 0, sink.RequestsCount()) } + +func TestPartitionBatcher_ContextMerging(t *testing.T) { + tests := []struct { + name string + mergeCtxFunc func(ctx1 context.Context, ctx2 context.Context) context.Context + }{ + { + name: "merge_context_with_timestamp", + mergeCtxFunc: func(ctx1 context.Context, _ context.Context) context.Context { + return context.WithValue(ctx1, timestampKey, 1234) + }, + }, + { + name: "merge_context_returns_background", + mergeCtxFunc: func(context.Context, context.Context) context.Context { + return context.Background() + }, + }, + { + name: "nil_merge_context", + mergeCtxFunc: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := BatchConfig{ + FlushTimeout: 0, + Sizer: request.SizerTypeItems, + MinSize: 10, + } + sink := requesttest.NewSink() + ba := newPartitionBatcher(cfg, request.NewItemsSizer(), tt.mergeCtxFunc, newWorkerPool(1), sink.Export, zap.NewNop()) + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + + done := newFakeDone() + ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Bytes: 8}, done) + ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Bytes: 8}, done) + <-time.After(10 * time.Millisecond) + assert.Equal(t, 1, sink.RequestsCount()) + assert.EqualValues(t, 2, done.success.Load()) + }) + } +} diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch.go b/exporter/exporterhelper/internal/queuebatch/queue_batch.go index 715a2a0841a..2ab3970c470 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch.go @@ -23,6 +23,7 @@ type Settings[T any] struct { ItemsSizer request.Sizer[T] BytesSizer request.Sizer[T] Partitioner Partitioner[T] + MergeCtx func(context.Context, context.Context) context.Context } type QueueBatch struct { @@ -39,6 +40,7 @@ func NewQueueBatch( itemsSizer: set.ItemsSizer, bytesSizer: set.BytesSizer, partitioner: set.Partitioner, + mergeCtx: set.MergeCtx, next: next, maxWorkers: cfg.NumConsumers, logger: set.Telemetry.Logger, From 2011f8991d13a9a903e7b922eda203f7582aed53 Mon Sep 17 00:00:00 2001 From: 
Sindy Li Date: Fri, 8 Aug 2025 14:10:04 -0700 Subject: [PATCH 2/2] Try make the linter happy --- .../exporterhelper/internal/queuebatch/partition_batcher.go | 4 ++-- .../internal/queuebatch/partition_batcher_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/exporter/exporterhelper/internal/queuebatch/partition_batcher.go b/exporter/exporterhelper/internal/queuebatch/partition_batcher.go index d2941456775..bedf07bf221 100644 --- a/exporter/exporterhelper/internal/queuebatch/partition_batcher.go +++ b/exporter/exporterhelper/internal/queuebatch/partition_batcher.go @@ -151,7 +151,7 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do qb.currentBatch.req = reqList[0] qb.currentBatch.done = append(qb.currentBatch.done, done) - mergedCtx := context.Background() + mergedCtx := context.Background() //nolint:contextcheck if qb.mergeCtx != nil { mergedCtx = qb.mergeCtx(qb.currentBatch.ctx, ctx) } @@ -185,7 +185,7 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do qb.currentBatchMu.Unlock() if firstBatch != nil { - qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done) //nolint:contextcheck //context already handled + qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done) } for i := 0; i < len(reqList); i++ { qb.flush(ctx, reqList[i], done) diff --git a/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go b/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go index f23b290fe6e..523a0bd9491 100644 --- a/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go +++ b/exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go @@ -521,17 +521,17 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) { func TestPartitionBatcher_ContextMerging(t *testing.T) { tests := []struct { name string - mergeCtxFunc func(ctx1 context.Context, ctx2 context.Context) context.Context + mergeCtxFunc func(ctx1, ctx2 context.Context) context.Context }{ { name: "merge_context_with_timestamp", - mergeCtxFunc: func(ctx1 context.Context, _ context.Context) context.Context { + mergeCtxFunc: func(ctx1, _ context.Context) context.Context { return context.WithValue(ctx1, timestampKey, 1234) }, }, { name: "merge_context_returns_background", - mergeCtxFunc: func(context.Context, context.Context) context.Context { + mergeCtxFunc: func(_, _ context.Context) context.Context { return context.Background() }, },
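
For readers unfamiliar with the new hook, below is a minimal, self-contained Go sketch of a function that could be assigned to the `queuebatch.Settings.MergeCtx` field introduced in this patch. Only the signature `func(context.Context, context.Context) context.Context` and the nil default (preserve no context values) come from the patch itself; the tenant key and the "existing batch wins" policy are illustrative assumptions, not behavior defined here.

package main

import (
	"context"
	"fmt"
)

// tenantKeyType is an unexported key type, following the same pattern as the
// testTimestampKey and partitionKey keys used in the tests above.
type tenantKeyType struct{}

var tenantKey tenantKeyType

// mergeTenant is one possible MergeCtx implementation: like the default, it
// starts from a fresh background context and carries over a single value,
// preferring the value already attached to the current batch context.
func mergeTenant(batchCtx, reqCtx context.Context) context.Context {
	if v := batchCtx.Value(tenantKey); v != nil {
		return context.WithValue(context.Background(), tenantKey, v)
	}
	if v := reqCtx.Value(tenantKey); v != nil {
		return context.WithValue(context.Background(), tenantKey, v)
	}
	// Neither context carries the value: behave like the nil default.
	return context.Background()
}

func main() {
	batchCtx := context.WithValue(context.Background(), tenantKey, "tenant-a")
	reqCtx := context.WithValue(context.Background(), tenantKey, "tenant-b")

	merged := mergeTenant(batchCtx, reqCtx)
	fmt.Println(merged.Value(tenantKey)) // prints: tenant-a
}

Inside partitionBatcher.Consume the result of such a function is then passed to contextWithMergedLinks, so the batch span links are merged on top of whichever values the function chose to keep.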