Commit bee7f25: "Merge context"

1 parent 19a57a5 commit bee7f25

9 files changed: +142 additions, -19 deletions

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Provide a setting `queuebatch.Settings.MergeCtx` so users can control how context values are preserved or combined
+
+# One or more tracking issues or pull requests related to the change
+issues: [13320]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  By supplying a custom mergeCtx function, users can control how context values are preserved or combined.
+  The default behavior is to preserve no context values.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]
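
For context, a minimal sketch of the kind of function this setting accepts. Only the two-argument signature comes from this change; the tenant-propagation policy and every name below (tenantKey, keepTenant) are illustrative assumptions:

package main

import (
	"context"
	"fmt"
)

// tenantKey is a hypothetical user-defined context key.
type tenantKey struct{}

// keepTenant preserves the tenant ID already attached to the batch (ctx1)
// when present, otherwise adopts the incoming request's (ctx2).
func keepTenant(ctx1, ctx2 context.Context) context.Context {
	if v := ctx1.Value(tenantKey{}); v != nil {
		return context.WithValue(context.Background(), tenantKey{}, v)
	}
	if v := ctx2.Value(tenantKey{}); v != nil {
		return context.WithValue(context.Background(), tenantKey{}, v)
	}
	return context.Background() // no tenant anywhere: same as the default
}

func main() {
	batchCtx := context.WithValue(context.Background(), tenantKey{}, "tenant-a")
	merged := keepTenant(batchCtx, context.Background())
	fmt.Println(merged.Value(tenantKey{})) // tenant-a
}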

exporter/exporterhelper/internal/queuebatch/batch_context.go

Lines changed: 3 additions & 4 deletions

@@ -31,10 +31,9 @@ func parentsFromContext(ctx context.Context) []trace.Link {
 	return LinksFromContext(ctx)
 }

-func contextWithMergedLinks(ctx1, ctx2 context.Context) context.Context {
+func contextWithMergedLinks(mergedCtx, ctx1, ctx2 context.Context) context.Context {
 	return context.WithValue(
-		context.Background(),
+		mergedCtx,
 		batchSpanLinksKey,
-		append(parentsFromContext(ctx1), parentsFromContext(ctx2)...),
-	)
+		append(parentsFromContext(ctx1), parentsFromContext(ctx2)...))
 }
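
The new first parameter is the base context the merged span links are attached to; passing context.Background() reproduces the old behavior exactly. A self-contained sketch of the same pattern, with the trace-link machinery stubbed by a string slice (all names here are illustrative, not the collector's API):

package main

import (
	"context"
	"fmt"
)

type linksKey struct{}
type tenantKey struct{}

// mergedLinks mirrors the shape of contextWithMergedLinks: both contexts'
// links are combined, and the result is attached to a caller-chosen base
// context instead of a hard-coded context.Background().
func mergedLinks(base, ctx1, ctx2 context.Context) context.Context {
	get := func(ctx context.Context) []string {
		v, _ := ctx.Value(linksKey{}).([]string)
		return v
	}
	return context.WithValue(base, linksKey{}, append(get(ctx1), get(ctx2)...))
}

func main() {
	ctx1 := context.WithValue(context.Background(), linksKey{}, []string{"span-1"})
	ctx2 := context.WithValue(context.Background(), linksKey{}, []string{"span-2"})
	base := context.WithValue(context.Background(), tenantKey{}, "tenant-a")
	merged := mergedLinks(base, ctx1, ctx2)
	fmt.Println(merged.Value(linksKey{}))  // [span-1 span-2]
	fmt.Println(merged.Value(tenantKey{})) // tenant-a: the base's values survive
}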

exporter/exporterhelper/internal/queuebatch/batch_context_test.go

Lines changed: 35 additions & 2 deletions

@@ -13,6 +13,32 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 )

+type testTimestampKeyType int
+
+const testTimestampKey testTimestampKeyType = iota
+
+// mergeCtxFunc corresponds to a user-specified mergeCtx function in the batcher settings.
+// This merge function keeps the greater of the two contexts' timestamps.
+func mergeCtxFunc(ctx1, ctx2 context.Context) context.Context {
+	timestamp1 := ctx1.Value(testTimestampKey)
+	timestamp2 := ctx2.Value(testTimestampKey)
+	if timestamp1 != nil && timestamp2 != nil {
+		if timestamp1.(int) > timestamp2.(int) {
+			return context.WithValue(context.Background(), testTimestampKey, timestamp1)
+		}
+		return context.WithValue(context.Background(), testTimestampKey, timestamp2)
+	}
+	if timestamp1 != nil {
+		return context.WithValue(context.Background(), testTimestampKey, timestamp1)
+	}
+	return context.WithValue(context.Background(), testTimestampKey, timestamp2)
+}
+
+// mergeContextHelper performs the same operation done during batching.
+func mergeContextHelper(ctx1, ctx2 context.Context) context.Context {
+	return contextWithMergedLinks(mergeCtxFunc(ctx1, ctx2), ctx1, ctx2)
+}
+
 func TestBatchContextLink(t *testing.T) {
 	tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
 	tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
@@ -28,12 +54,19 @@ func TestBatchContextLink(t *testing.T) {
 	ctx4, span4 := tracer.Start(ctx1, "span4")
 	defer span4.End()

-	batchContext := contextWithMergedLinks(ctx2, ctx3)
-	batchContext = contextWithMergedLinks(batchContext, ctx4)
+	batchContext := mergeContextHelper(ctx2, ctx3)
+	batchContext = mergeContextHelper(batchContext, ctx4)

 	actualLinks := LinksFromContext(batchContext)
 	require.Len(t, actualLinks, 3)
 	require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
 	require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
 	require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
 }
+
+func TestMergedContext_GetValue(t *testing.T) {
+	ctx1 := context.WithValue(context.Background(), testTimestampKey, 1234)
+	ctx2 := context.WithValue(context.Background(), testTimestampKey, 2345)
+	batchContext := mergeContextHelper(ctx1, ctx2)
+	require.Equal(t, 2345, batchContext.Value(testTimestampKey))
+}

exporter/exporterhelper/internal/queuebatch/batcher.go

Lines changed: 3 additions & 2 deletions

@@ -26,6 +26,7 @@ type batcherSettings[T any] struct {
 	itemsSizer  request.Sizer[T]
 	bytesSizer  request.Sizer[T]
 	partitioner Partitioner[T]
+	mergeCtx    func(context.Context, context.Context) context.Context
 	next        sender.SendFunc[T]
 	maxWorkers  int
 	logger      *zap.Logger
@@ -42,10 +43,10 @@ func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[re
 	}

 	if set.partitioner == nil {
-		return newPartitionBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.next, set.logger), nil
+		return newPartitionBatcher(*cfg.Get(), sizer, set.mergeCtx, newWorkerPool(set.maxWorkers), set.next, set.logger), nil
 	}

-	return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next, set.logger), nil
+	return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.mergeCtx, set.next, set.logger), nil
 }

 func activeSizer[T any](sizerType request.SizerType, itemsSizer, bytesSizer request.Sizer[T]) request.Sizer[T] {

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 5 additions & 1 deletion

@@ -21,6 +21,7 @@ type multiBatcher struct {
 	wp          *workerPool
 	sizer       request.Sizer[request.Request]
 	partitioner Partitioner[request.Request]
+	mergeCtx    func(context.Context, context.Context) context.Context
 	consumeFunc sender.SendFunc[request.Request]
 	shards      sync.Map
 	logger      *zap.Logger
@@ -31,6 +32,7 @@ func newMultiBatcher(
 	sizer request.Sizer[request.Request],
 	wp *workerPool,
 	partitioner Partitioner[request.Request],
+	mergeCtx func(context.Context, context.Context) context.Context,
 	next sender.SendFunc[request.Request],
 	logger *zap.Logger,
 ) *multiBatcher {
@@ -39,6 +41,7 @@ func newMultiBatcher(
 		wp:          wp,
 		sizer:       sizer,
 		partitioner: partitioner,
+		mergeCtx:    mergeCtx,
 		consumeFunc: next,
 		logger:      logger,
 	}
@@ -51,7 +54,8 @@ func (mb *multiBatcher) getPartition(ctx context.Context, req request.Request) *
 	if found {
 		return s.(*partitionBatcher)
 	}
-	newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.wp, mb.consumeFunc, mb.logger)
+
+	newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.mergeCtx, mb.wp, mb.consumeFunc, mb.logger)
 	_ = newS.Start(ctx, nil)
 	s, loaded := mb.shards.LoadOrStore(key, newS)
 	// If not loaded, there was a race condition in adding the new shard. Shutdown the newly created shard.

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go

Lines changed: 2 additions & 0 deletions

@@ -33,6 +33,7 @@ func TestMultiBatcher_NoTimeout(t *testing.T) {
 		NewPartitioner(func(ctx context.Context, _ request.Request) string {
 			return ctx.Value(partitionKey{}).(string)
 		}),
+		nil,
 		sink.Export,
 		zap.NewNop(),
 	)
@@ -85,6 +86,7 @@ func TestMultiBatcher_Timeout(t *testing.T) {
 		NewPartitioner(func(ctx context.Context, _ request.Request) string {
 			return ctx.Value(partitionKey{}).(string)
 		}),
+		nil,
 		sink.Export,
 		zap.NewNop(),
 	)

exporter/exporterhelper/internal/queuebatch/partition_batcher.go

Lines changed: 9 additions & 1 deletion

@@ -30,6 +30,7 @@ type partitionBatcher struct {
 	cfg         BatchConfig
 	wp          *workerPool
 	sizer       request.Sizer[request.Request]
+	mergeCtx    func(context.Context, context.Context) context.Context
 	consumeFunc sender.SendFunc[request.Request]
 	stopWG      sync.WaitGroup
 	currentBatchMu sync.Mutex
@@ -42,6 +43,7 @@ type partitionBatcher struct {
 func newPartitionBatcher(
 	cfg BatchConfig,
 	sizer request.Sizer[request.Request],
+	mergeCtx func(context.Context, context.Context) context.Context,
 	wp *workerPool,
 	next sender.SendFunc[request.Request],
 	logger *zap.Logger,
@@ -50,6 +52,7 @@ func newPartitionBatcher(
 		cfg:         cfg,
 		wp:          wp,
 		sizer:       sizer,
+		mergeCtx:    mergeCtx,
 		consumeFunc: next,
 		shutdownCh:  make(chan struct{}, 1),
 		logger:      logger,
@@ -147,7 +150,12 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
 	// Logic on how to deal with the current batch:
 	qb.currentBatch.req = reqList[0]
 	qb.currentBatch.done = append(qb.currentBatch.done, done)
-	qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)
+
+	mergedCtx := context.Background()
+	if qb.mergeCtx != nil {
+		mergedCtx = qb.mergeCtx(qb.currentBatch.ctx, ctx)
+	}
+	qb.currentBatch.ctx = contextWithMergedLinks(mergedCtx, qb.currentBatch.ctx, ctx)

 	// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
 	// cannot unlock and re-lock because we are not done processing all the responses.
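
The nil check keeps the previous semantics for callers that supply no merge function: links are attached to a fresh context.Background() and no request values reach the batch. A standalone sketch of just this fallback (mergeBase and stampKey are illustrative names, not the collector's API):

package main

import (
	"context"
	"fmt"
)

type stampKey struct{}

// mergeBase mirrors the fallback above: with no user function the merged
// base is a fresh Background context, so no values survive; otherwise the
// user function decides what carries over.
func mergeBase(mergeCtx func(context.Context, context.Context) context.Context, batchCtx, reqCtx context.Context) context.Context {
	if mergeCtx == nil {
		return context.Background()
	}
	return mergeCtx(batchCtx, reqCtx)
}

func main() {
	batchCtx := context.WithValue(context.Background(), stampKey{}, 1234)
	reqCtx := context.WithValue(context.Background(), stampKey{}, 2345)

	// Default: values are dropped.
	fmt.Println(mergeBase(nil, batchCtx, reqCtx).Value(stampKey{})) // <nil>

	// User policy: keep the larger timestamp, as in the tests in this commit.
	keepNewer := func(a, b context.Context) context.Context {
		if b.Value(stampKey{}).(int) > a.Value(stampKey{}).(int) {
			return b
		}
		return a
	}
	fmt.Println(mergeBase(keepNewer, batchCtx, reqCtx).Value(stampKey{})) // 2345
}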

exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go

Lines changed: 56 additions & 9 deletions

@@ -21,6 +21,10 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
 )

+type testContextKey string
+
+const timestampKey testContextKey = "timestamp"
+
 func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 	tests := []struct {
 		name string
@@ -62,7 +66,7 @@ func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T)
 			}

 			sink := requesttest.NewSink()
-			ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
+			ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))
@@ -128,7 +132,7 @@ func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
+			ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 			done := newFakeDone()
@@ -209,7 +213,7 @@ func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
+			ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))
@@ -281,7 +285,7 @@ func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
+			ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export, zap.NewNop())
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 			done := newFakeDone()
@@ -329,7 +333,7 @@ func TestPartitionBatcher_Shutdown(t *testing.T) {
 	}

 	sink := requesttest.NewSink()
-	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop())
+	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 	done := newFakeDone()
@@ -358,7 +362,7 @@ func TestPartitionBatcher_MergeError(t *testing.T) {
 	}

 	sink := requesttest.NewSink()
-	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export, zap.NewNop())
+	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {
 		require.NoError(t, ba.Shutdown(context.Background()))
@@ -392,7 +396,7 @@ func TestPartitionBatcher_PartialSuccessError(t *testing.T) {
 	core, observed := observer.New(zap.WarnLevel)
 	logger := zap.New(core)
 	sink := requesttest.NewSink()
-	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
+	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 	done := newFakeDone()
@@ -434,7 +438,7 @@ func TestSPartitionBatcher_PartialSuccessError_AfterOkRequest(t *testing.T) {
 	core, observed := observer.New(zap.WarnLevel)
 	logger := zap.New(core)
 	sink := requesttest.NewSink()
-	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, logger)
+	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 	done := newFakeDone()
@@ -494,7 +498,7 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) {
 	}

 	sink := requesttest.NewSink()
-	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(1), sink.Export, zap.NewNop())
+	ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, zap.NewNop())
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {
 		require.NoError(t, ba.Shutdown(context.Background()))
@@ -513,3 +517,46 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) {
 	assert.Equal(t, int64(0), done.success.Load())
 	assert.Equal(t, 0, sink.RequestsCount())
 }
+
+func TestPartitionBatcher_ContextMerging(t *testing.T) {
+	tests := []struct {
+		name         string
+		mergeCtxFunc func(ctx1 context.Context, ctx2 context.Context) context.Context
+	}{
+		{
+			name: "merge_context_with_timestamp",
+			mergeCtxFunc: func(ctx1 context.Context, _ context.Context) context.Context {
+				return context.WithValue(ctx1, timestampKey, 1234)
+			},
+		},
+		{
+			name: "merge_context_returns_background",
+			mergeCtxFunc: func(context.Context, context.Context) context.Context {
+				return context.Background()
+			},
+		},
+		{
+			name:         "nil_merge_context",
+			mergeCtxFunc: nil,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cfg := BatchConfig{
+				FlushTimeout: 0,
+				Sizer:        request.SizerTypeItems,
+				MinSize:      10,
+			}
+			sink := requesttest.NewSink()
+			ba := newPartitionBatcher(cfg, request.NewItemsSizer(), tt.mergeCtxFunc, newWorkerPool(1), sink.Export, zap.NewNop())
+			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+
+			done := newFakeDone()
+			ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Bytes: 8}, done)
+			ba.Consume(context.Background(), &requesttest.FakeRequest{Items: 8, Bytes: 8}, done)
+			<-time.After(10 * time.Millisecond)
+			assert.Equal(t, 1, sink.RequestsCount())
+			assert.EqualValues(t, 2, done.success.Load())
+		})
+	}
+}

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 2 additions & 0 deletions

@@ -23,6 +23,7 @@ type Settings[T any] struct {
 	ItemsSizer  request.Sizer[T]
 	BytesSizer  request.Sizer[T]
 	Partitioner Partitioner[T]
+	MergeCtx    func(context.Context, context.Context) context.Context
 }

 type QueueBatch struct {
@@ -39,6 +40,7 @@ func NewQueueBatch(
 		itemsSizer:  set.ItemsSizer,
 		bytesSizer:  set.BytesSizer,
 		partitioner: set.Partitioner,
+		mergeCtx:    set.MergeCtx,
 		next:        next,
 		maxWorkers:  cfg.NumConsumers,
 		logger:      set.Telemetry.Logger,
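
Finally, a sketch of how the new field would be populated when building the queue-batch settings. The Settings type is mimicked by a local struct here, since the real queuebatch.Settings lives in an internal package; everything except the MergeCtx field's name and signature is an illustrative assumption:

package main

import (
	"context"
	"fmt"
)

type traceIDKey struct{}

// settings mimics the shape of queuebatch.Settings for illustration only;
// the real type also carries sizers and a partitioner.
type settings struct {
	MergeCtx func(context.Context, context.Context) context.Context
}

func main() {
	set := settings{
		// Keep the existing batch's trace ID, if any.
		MergeCtx: func(ctx1, _ context.Context) context.Context {
			if v := ctx1.Value(traceIDKey{}); v != nil {
				return context.WithValue(context.Background(), traceIDKey{}, v)
			}
			return context.Background()
		},
	}

	ctx1 := context.WithValue(context.Background(), traceIDKey{}, "abc123")
	fmt.Println(set.MergeCtx(ctx1, context.Background()).Value(traceIDKey{})) // abc123
}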
