@@ -177,19 +177,18 @@ public static void setup() throws IOException {
177
177
backend .start ();
178
178
179
179
// Initialize TelemetryRetriever after backend starts with custom error handling
180
- telemetryRetriever = new TelemetryRetriever (
181
- backend .getMappedPort (BACKEND_PORT ),
182
- Duration .ofMinutes (2 )) {
183
- @ Override
184
- public void clearTelemetry () {
185
- try {
186
- super .clearTelemetry ();
187
- } catch (RuntimeException e ) {
188
- // Ignore cleanup errors - backend might already be stopped
189
- logger .debug ("Failed to clear telemetry: {}" , e .getMessage ());
190
- }
191
- }
192
- };
180
+ telemetryRetriever =
181
+ new TelemetryRetriever (backend .getMappedPort (BACKEND_PORT ), Duration .ofMinutes (2 )) {
182
+ @ Override
183
+ public void clearTelemetry () {
184
+ try {
185
+ super .clearTelemetry ();
186
+ } catch (RuntimeException e ) {
187
+ // Ignore cleanup errors - backend might already be stopped
188
+ logger .debug ("Failed to clear telemetry: {}" , e .getMessage ());
189
+ }
190
+ }
191
+ };
193
192
194
193
zookeeper =
195
194
new GenericContainer <>("confluentinc/cp-zookeeper:" + CONFLUENT_VERSION )
@@ -327,7 +326,6 @@ public void reset() {
327
326
clearTelemetryGracefully ();
328
327
}
329
328
330
-
331
329
@ Test
332
330
public void testKafkaConnectMongoSinkTaskInstrumentation ()
333
331
throws IOException , InterruptedException {
@@ -358,48 +356,70 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
358
356
359
357
// Use SmokeTestInstrumentationExtension's testing framework to wait for and assert traces
360
358
// Wait for traces and then find the specific trace we want
361
- await ().atMost (Duration .ofSeconds (30 )).untilAsserted (() -> {
362
- List <List <SpanData >> traces = testing .waitForTraces (1 );
363
-
364
- // Find the trace that contains our Kafka Connect Consumer span and database INSERT span
365
- List <SpanData > targetTrace = traces .stream ()
366
- .filter (trace -> {
367
- boolean hasKafkaConnectSpan = trace .stream ()
368
- .anyMatch (span -> span .getName ().contains (uniqueTopicName ) &&
369
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER );
370
-
371
- boolean hasInsertSpan = trace .stream ()
372
- .anyMatch (span -> span .getName ().contains ("update" ) &&
373
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT );
374
-
375
- return hasKafkaConnectSpan && hasInsertSpan ;
376
- })
377
- .findFirst ()
378
- .orElse (null );
379
-
380
- // Assert that we found the target trace
381
- assertThat (targetTrace ).isNotNull ();
382
-
383
- // Assert on the spans in the target trace (should have at least 2 spans: Kafka Connect Consumer + database operations)
384
- assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
385
-
386
- // Find and assert the Kafka Connect Consumer span
387
- SpanData kafkaConnectSpan = targetTrace .stream ()
388
- .filter (span -> span .getName ().contains (uniqueTopicName ) &&
389
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
390
- .findFirst ()
391
- .orElse (null );
392
- assertThat (kafkaConnectSpan ).isNotNull ();
393
- assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
394
-
395
- // Find and assert the database UPDATE span
396
- SpanData insertSpan = targetTrace .stream ()
397
- .filter (span -> span .getName ().contains ("update" ) &&
398
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
399
- .findFirst ()
400
- .orElse (null );
401
- assertThat (insertSpan ).isNotNull ();
402
- });
359
+ await ()
360
+ .atMost (Duration .ofSeconds (30 ))
361
+ .untilAsserted (
362
+ () -> {
363
+ List <List <SpanData >> traces = testing .waitForTraces (1 );
364
+
365
+ // Find the trace that contains our Kafka Connect Consumer span and database INSERT
366
+ // span
367
+ List <SpanData > targetTrace =
368
+ traces .stream ()
369
+ .filter (
370
+ trace -> {
371
+ boolean hasKafkaConnectSpan =
372
+ trace .stream ()
373
+ .anyMatch (
374
+ span ->
375
+ span .getName ().contains (uniqueTopicName )
376
+ && span .getKind ()
377
+ == io .opentelemetry .api .trace .SpanKind
378
+ .CONSUMER );
379
+
380
+ boolean hasInsertSpan =
381
+ trace .stream ()
382
+ .anyMatch (
383
+ span ->
384
+ span .getName ().contains ("update" )
385
+ && span .getKind ()
386
+ == io .opentelemetry .api .trace .SpanKind .CLIENT );
387
+
388
+ return hasKafkaConnectSpan && hasInsertSpan ;
389
+ })
390
+ .findFirst ()
391
+ .orElse (null );
392
+
393
+ // Assert that we found the target trace
394
+ assertThat (targetTrace ).isNotNull ();
395
+
396
+ // Assert on the spans in the target trace (should have at least 2 spans: Kafka
397
+ // Connect Consumer + database operations)
398
+ assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
399
+
400
+ // Find and assert the Kafka Connect Consumer span
401
+ SpanData kafkaConnectSpan =
402
+ targetTrace .stream ()
403
+ .filter (
404
+ span ->
405
+ span .getName ().contains (uniqueTopicName )
406
+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
407
+ .findFirst ()
408
+ .orElse (null );
409
+ assertThat (kafkaConnectSpan ).isNotNull ();
410
+ assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
411
+
412
+ // Find and assert the database UPDATE span
413
+ SpanData insertSpan =
414
+ targetTrace .stream ()
415
+ .filter (
416
+ span ->
417
+ span .getName ().contains ("update" )
418
+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
419
+ .findFirst ()
420
+ .orElse (null );
421
+ assertThat (insertSpan ).isNotNull ();
422
+ });
403
423
}
404
424
405
425
@ Test
@@ -446,52 +466,74 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
446
466
447
467
// Use SmokeTestInstrumentationExtension's testing framework to wait for and assert traces
448
468
// Wait for traces and then find the specific trace we want
449
- await ().atMost (Duration .ofSeconds (30 )).untilAsserted (() -> {
450
- List <List <SpanData >> traces = testing .waitForTraces (1 );
451
-
452
- // Find the trace that contains our Kafka Connect Consumer span and database INSERT span
453
- List <SpanData > targetTrace = traces .stream ()
454
- .filter (trace -> {
455
- boolean hasKafkaConnectSpan = trace .stream ()
456
- .anyMatch (span -> (span .getName ().contains (topicName1 ) ||
457
- span .getName ().contains (topicName2 ) ||
458
- span .getName ().contains (topicName3 )) &&
459
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER );
460
-
461
- boolean hasInsertSpan = trace .stream ()
462
- .anyMatch (span -> span .getName ().contains ("update" ) &&
463
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT );
464
-
465
- return hasKafkaConnectSpan && hasInsertSpan ;
466
- })
467
- .findFirst ()
468
- .orElse (null );
469
-
470
- // Assert that we found the target trace
471
- assertThat (targetTrace ).isNotNull ();
472
-
473
- // Assert on the spans in the target trace (should have at least 2 spans: Kafka Connect Consumer + database operations)
474
- assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
475
-
476
- // Find and assert the Kafka Connect Consumer span (multi-topic span)
477
- SpanData kafkaConnectSpan = targetTrace .stream ()
478
- .filter (span -> (span .getName ().contains (topicName1 ) ||
479
- span .getName ().contains (topicName2 ) ||
480
- span .getName ().contains (topicName3 )) &&
481
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
482
- .findFirst ()
483
- .orElse (null );
484
- assertThat (kafkaConnectSpan ).isNotNull ();
485
- assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
486
-
487
- // Find and assert the database UPDATE span
488
- SpanData insertSpan = targetTrace .stream ()
489
- .filter (span -> span .getName ().contains ("update" ) &&
490
- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
491
- .findFirst ()
492
- .orElse (null );
493
- assertThat (insertSpan ).isNotNull ();
494
- });
469
+ await ()
470
+ .atMost (Duration .ofSeconds (30 ))
471
+ .untilAsserted (
472
+ () -> {
473
+ List <List <SpanData >> traces = testing .waitForTraces (1 );
474
+
475
+ // Find the trace that contains our Kafka Connect Consumer span and database INSERT
476
+ // span
477
+ List <SpanData > targetTrace =
478
+ traces .stream ()
479
+ .filter (
480
+ trace -> {
481
+ boolean hasKafkaConnectSpan =
482
+ trace .stream ()
483
+ .anyMatch (
484
+ span ->
485
+ (span .getName ().contains (topicName1 )
486
+ || span .getName ().contains (topicName2 )
487
+ || span .getName ().contains (topicName3 ))
488
+ && span .getKind ()
489
+ == io .opentelemetry .api .trace .SpanKind
490
+ .CONSUMER );
491
+
492
+ boolean hasInsertSpan =
493
+ trace .stream ()
494
+ .anyMatch (
495
+ span ->
496
+ span .getName ().contains ("update" )
497
+ && span .getKind ()
498
+ == io .opentelemetry .api .trace .SpanKind .CLIENT );
499
+
500
+ return hasKafkaConnectSpan && hasInsertSpan ;
501
+ })
502
+ .findFirst ()
503
+ .orElse (null );
504
+
505
+ // Assert that we found the target trace
506
+ assertThat (targetTrace ).isNotNull ();
507
+
508
+ // Assert on the spans in the target trace (should have at least 2 spans: Kafka
509
+ // Connect Consumer + database operations)
510
+ assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
511
+
512
+ // Find and assert the Kafka Connect Consumer span (multi-topic span)
513
+ SpanData kafkaConnectSpan =
514
+ targetTrace .stream ()
515
+ .filter (
516
+ span ->
517
+ (span .getName ().contains (topicName1 )
518
+ || span .getName ().contains (topicName2 )
519
+ || span .getName ().contains (topicName3 ))
520
+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
521
+ .findFirst ()
522
+ .orElse (null );
523
+ assertThat (kafkaConnectSpan ).isNotNull ();
524
+ assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
525
+
526
+ // Find and assert the database UPDATE span
527
+ SpanData insertSpan =
528
+ targetTrace .stream ()
529
+ .filter (
530
+ span ->
531
+ span .getName ().contains ("update" )
532
+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
533
+ .findFirst ()
534
+ .orElse (null );
535
+ assertThat (insertSpan ).isNotNull ();
536
+ });
495
537
}
496
538
497
539
// Private methods
@@ -695,6 +737,4 @@ public static void cleanup() {
695
737
}
696
738
}
697
739
}
698
-
699
-
700
740
}
0 commit comments