Conversation

@g7ed6e
Contributor

@g7ed6e g7ed6e commented Jul 2, 2024

Resolves #1932

@CodeBlanch: I adapted the API you suggested a bit.
@lmolkova: Can you confirm that it's OK to output both a receive and a process span for a single message on the consumer side?

@g7ed6e g7ed6e requested a review from a team July 2, 2024 15:28
@g7ed6e g7ed6e changed the title Feature/confluent kafka part 2 Add a process span to ConfluentKafka instrumentation Jul 2, 2024
@codecov

codecov bot commented Jul 2, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 71.26%. Comparing base (71655ce) to head (6867f71).
Report is 398 commits behind head on main.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1937      +/-   ##
==========================================
- Coverage   73.91%   71.26%   -2.66%     
==========================================
  Files         267      311      +44     
  Lines        9615    11347    +1732     
==========================================
+ Hits         7107     8086     +979     
- Misses       2508     3261     +753     
Flag Coverage Δ
unittests-Exporter.Geneva 53.20% <ø> (?)
unittests-Exporter.InfluxDB 95.88% <ø> (?)
unittests-Exporter.Instana 71.24% <ø> (?)
unittests-Exporter.OneCollector 93.07% <ø> (?)
unittests-Exporter.Stackdriver 75.73% <ø> (?)
unittests-Extensions 88.57% <ø> (?)
unittests-Extensions.AWS 77.24% <ø> (?)
unittests-Extensions.Enrichment 100.00% <ø> (?)
unittests-Instrumentation.AWS 75.77% <ø> (?)
unittests-Instrumentation.AWSLambda 87.96% <ø> (?)
unittests-Instrumentation.AspNet 77.00% <ø> (?)
unittests-Instrumentation.AspNetCore 85.27% <ø> (?)
unittests-Instrumentation.ElasticsearchClient 79.87% <ø> (?)
unittests-Instrumentation.EntityFrameworkCore 55.49% <ø> (?)
unittests-Instrumentation.EventCounters 76.36% <ø> (?)
unittests-Instrumentation.GrpcNetClient 79.61% <ø> (?)
unittests-Instrumentation.Hangfire 93.58% <ø> (?)
unittests-Instrumentation.Http 82.05% <ø> (?)
unittests-Instrumentation.Owin 85.79% <ø> (?)
unittests-Instrumentation.Process 100.00% <ø> (?)
unittests-Instrumentation.Quartz 78.94% <ø> (?)
unittests-Instrumentation.Runtime 100.00% <ø> (?)
unittests-Instrumentation.SqlClient 91.89% <ø> (?)
unittests-Instrumentation.StackExchangeRedis 67.02% <ø> (?)
unittests-Instrumentation.Wcf 48.91% <ø> (?)
unittests-PersistentStorage 65.44% <ø> (?)
unittests-Resources.AWS 75.88% <ø> (?)
unittests-Resources.Azure 82.83% <ø> (?)
unittests-Resources.Container 72.41% <ø> (?)
unittests-Resources.Gcp 72.54% <ø> (?)
unittests-Resources.Host 73.94% <ø> (?)
unittests-Resources.OperatingSystem 71.87% <ø> (?)
unittests-Resources.Process 100.00% <ø> (?)
unittests-Resources.ProcessRuntime 94.11% <ø> (?)
unittests-Sampler.AWS 88.09% <ø> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

see 337 files with indirect coverage changes

@g7ed6e g7ed6e force-pushed the feature/confluent-kafka-part-2 branch from b587ce5 to 7ede802 Compare July 2, 2024 15:40
@github-actions github-actions bot added infra Infra work - CI/CD, code coverage, linters comp:instrumentation.confluentkafka Things related to OpenTelemetry.Instrumentation.ConfluentKafka labels Jul 2, 2024
@g7ed6e g7ed6e force-pushed the feature/confluent-kafka-part-2 branch from 7ede802 to a312578 Compare July 3, 2024 07:16
@g7ed6e g7ed6e force-pushed the feature/confluent-kafka-part-2 branch from a312578 to c479be6 Compare July 3, 2024 09:21
@github-actions
Contributor

This PR was marked stale due to lack of activity. It will be closed in 7 days.

@github-actions github-actions bot added the Stale label Jul 11, 2024
@Kielek Kielek removed the Stale label Jul 11, 2024
@github-actions
Contributor

This PR was marked stale due to lack of activity. It will be closed in 7 days.

@github-actions github-actions bot added Stale and removed Stale labels Jul 19, 2024
@g7ed6e g7ed6e force-pushed the feature/confluent-kafka-part-2 branch from c479be6 to 6d6e73e Compare July 24, 2024 11:40
{
await handler(consumeResult, cancellationToken).ConfigureAwait(false);
}
finally
Member

  • If handler throws would we want to flag the Activity as error status? Also if user wants to set the status of the span would we expect them to do that through Activity.Current? Would it be cleaner if we passed in processActivity to the callback?

  • Is there any kind of duration metric defined for processing a message in the semantic conventions? Just wondering if we should tick a histogram here or anything as well.

Member

We do need to set the status to error and also report the error.type attribute.

And yep, the messaging semconv defines metrics. We've revised them recently, and it's best to add metrics later (once the next semconv version is released or after we freeze the conventions, which should happen relatively soon).
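
To illustrate the status and error.type suggestion, here is a minimal sketch using only System.Diagnostics. The class name, the source name "Sketch.ProcessSpan", and the span name "process" are hypothetical and are not taken from the PR, which uses ConfluentKafkaCommon.ActivitySource and its own naming. The choice of the exception's full type name as the error.type value is an assumption.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class ProcessSpanErrorSketch
{
    // Hypothetical source name, for illustration only.
    private static readonly ActivitySource Source = new ActivitySource("Sketch.ProcessSpan");

    public static async Task ProcessAsync<TMessage>(
        TMessage message,
        Func<TMessage, CancellationToken, Task> handler,
        CancellationToken cancellationToken = default)
    {
        // StartActivity returns null when no listener samples this source.
        // The using declaration stops the activity when the method exits,
        // including when the exception is rethrown below.
        using Activity? processActivity = Source.StartActivity("process", ActivityKind.Consumer);
        try
        {
            await handler(message, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Flag the span as failed and record the error type (assumed here
            // to be the exception's full type name), then let the exception
            // propagate to the caller unchanged.
            processActivity?.SetStatus(ActivityStatusCode.Error);
            processActivity?.SetTag("error.type", ex.GetType().FullName);
            throw;
        }
    }
}
```

With this shape the user handler does not need to touch Activity.Current to report a failure, although it can still do so to add its own tags.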

Contributor Author

Updated with your suggestions.

Member

In 1.24.0 we have [messaging.deliver.duration](https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-metrics.md#metric-messagingdeliverduration) that should cover the same duration as process span.

Still, it'll be renamed and I'd consider waiting for stable messaging semconv to implement it.

Contributor Author

Then I will implement this new metric in a PR dedicated to the 1.24.0 to 1.26.0 migration.

if (TryExtractPropagationContext(consumeResult, out var propagationContext))
{
TKey key = consumeResult.Message.Key;
object? keyAsObject = key;
Member

Just curious why this object? conversion is being performed?

Contributor Author

@g7ed6e g7ed6e Jul 26, 2024

This cast is no longer needed, as I added a generic parameter to the private StartProcessActivity method to which the key is passed.

using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Instrumentation.ConfluentKafka;
Member

Should this go in the namespace for ConsumeResult<TKey, TValue>? That would make it more discoverable. But we may not want it to be easily discoverable 😄

Contributor Author

Good idea. I moved the class OpenTelemetryConsumeResultExtensions to Confluent.Kafka namespace.

object? keyAsObject = key;
processActivity = StartProcessActivity(
propagationContext,
start,
Member

@lmolkova lmolkova Jul 26, 2024

What's the reason for passing a start time? We don't need to pretend that processing starts before consumption and can use the real start timestamp.

Contributor Author

Good point! Updated.

? new[] { new ActivityLink(propagationContext.ActivityContext) }
: Array.Empty<ActivityLink>();

Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default);
Member

You may (but don't have to) also set the parent to the same context as that of the message (if it's a single message).
That's something most users seem to want, but some want the opposite.
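
A minimal sketch of the two options, assuming a hypothetical parentToMessage switch that is not part of the PR. The class name, source name, and span name are also placeholders. In both cases the message context is kept as a link; only the parent differs.

```csharp
using System;
using System.Diagnostics;

public static class ProcessParentingSketch
{
    // Hypothetical source name, for illustration only.
    private static readonly ActivitySource Source = new ActivitySource("Sketch.ProcessParenting");

    // parentToMessage is a hypothetical switch; the PR does not expose one.
    public static Activity? StartProcessActivity(ActivityContext messageContext, bool parentToMessage)
    {
        // Link to the producer's context whenever one was extracted from the message.
        ActivityLink[] links = messageContext != default
            ? new[] { new ActivityLink(messageContext) }
            : Array.Empty<ActivityLink>();

        // With a default parent context, the runtime falls back to
        // Activity.Current (if any); otherwise a new trace is started.
        return Source.StartActivity(
            "process",
            ActivityKind.Consumer,
            parentContext: parentToMessage ? messageContext : default,
            links: links);
    }
}
```

Parenting places the process span in the producer's trace, which is what most users seem to expect for single-message processing. Linking alone keeps the consumer trace separate while still pointing back to the producer.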

@lmolkova
Member

Can you confirm that it's OK to output both a receive and a process span for a single message on the consumer side?

In the scope of this PR it looks right. If I understand correctly, the ConsumeAndProcessMessageAsync method receives the message with consumer.Consume(...) (this is where the receive operation comes from) and then executes the user-provided handler, which is traced with the process span.

That's the same case as described here: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/kafka.md (but with a later version of semantic conventions, so there are some small changes to naming).

For an API like this:

await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options);

processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    await args.CompleteMessageAsync(args.Message);
}

where the original SDK provides no user-facing API to receive the message, messages are pushed to the user application and the receive operation should not be reported.

For APIs that effectively pull messages and then process them, especially if such APIs are not part of the original client library, it's best to report both the receive and the process operations.

Member

@CodeBlanch CodeBlanch left a comment

LGTM with a CHANGELOG entry

@g7ed6e
Contributor Author

g7ed6e commented Aug 20, 2024

LGTM with a CHANGELOG entry

@CodeBlanch done

@g7ed6e g7ed6e force-pushed the feature/confluent-kafka-part-2 branch from a8b727a to 7df435c Compare August 22, 2024 16:17
@Kielek Kielek merged commit 544eb98 into open-telemetry:main Sep 3, 2024
1 check failed

Labels

comp:instrumentation.confluentkafka Things related to OpenTelemetry.Instrumentation.ConfluentKafka infra Infra work - CI/CD, code coverage, linters

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[ConfluentKafka] Implement a process span to ease connecting traces

5 participants