-
Notifications
You must be signed in to change notification settings - Fork 356
Add a process span to ConfluentKafka instrumentation #1937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a process span to ConfluentKafka instrumentation #1937
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #1937 +/- ##
==========================================
- Coverage 73.91% 71.26% -2.66%
==========================================
Files 267 311 +44
Lines 9615 11347 +1732
==========================================
+ Hits 7107 8086 +979
- Misses 2508 3261 +753 Flags with carried forward coverage won't be shown. Click here to find out more. |
b587ce5 to
7ede802
Compare
7ede802 to
a312578
Compare
a312578 to
c479be6
Compare
|
This PR was marked stale due to lack of activity. It will be closed in 7 days. |
|
This PR was marked stale due to lack of activity. It will be closed in 7 days. |
c479be6 to
6d6e73e
Compare
| { | ||
| await handler(consumeResult, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| finally |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
If handler throws would we want to flag the
Activityas error status? Also if user wants to set the status of the span would we expect them to do that throughActivity.Current? Would it be cleaner if we passed inprocessActivityto the callback? -
Is there any kind of duration metric defined for processing a message in the semantic conventions? Just wondering if we should tick a histogram here or anything as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do need to set status to error and also report error.type attribute.
And yep, messaging semconv define metrics. We've revised them recently and it's best to add metrics later (once next semconv version is released or after we freeze conventions which should happen relatively soon).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated with your suggestions.
- When the handler throws ths activity status is set to error, the attribute error.type is set valued with the exception type (i attempted here to limit the cardinality).
- The handler delegate type is enriched with the activity we may have created.
- Regarding metric we already implement metrics from 1.24.0 https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-metrics.md and those metrics will be reported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In 1.24.0 we have [messaging.deliver.duration](https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-metrics.md#metric-messagingdeliverduration) that should cover the same duration as process span.
Still, it'll be renamed and I'd consider waiting for stable messaging semconv to implement it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then i will implement this new metric in a PR dedicated to 1.24.0 - 1.26.0 migration
| if (TryExtractPropagationContext(consumeResult, out var propagationContext)) | ||
| { | ||
| TKey key = consumeResult.Message.Key; | ||
| object? keyAsObject = key; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious why this object? conversion is being performed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This cast is no longer needed as i added a generic parameter to the private StartProcessActivity method to which the key is passed.
| using OpenTelemetry.Context.Propagation; | ||
| using OpenTelemetry.Trace; | ||
|
|
||
| namespace OpenTelemetry.Instrumentation.ConfluentKafka; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this go in the namespace for ConsumeResult<TKey, TValue>? That would make it more discoverable. But we may not want it to be easily discoverable 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I moved the class OpenTelemetryConsumeResultExtensions to Confluent.Kafka namespace.
src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs
Show resolved
Hide resolved
| object? keyAsObject = key; | ||
| processActivity = StartProcessActivity( | ||
| propagationContext, | ||
| start, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the reason to pass start time? We don't need to pretend that processing starts before consumption and can use real start timestamp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! updated
src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetryConsumeResultExtensions.cs
Show resolved
Hide resolved
| ? new[] { new ActivityLink(propagationContext.ActivityContext) } | ||
| : Array.Empty<ActivityLink>(); | ||
|
|
||
| Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you may (but don't have to) also set a parent to the same context as one if the message (if it's a single message).
That's something most of the users seem to want, but some want the opposite.
In the scope of this PR it looks right. If I understand correctly, That's the same case as described here: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/kafka.md (but with a later version of semantic conventions, so there are some small changes to naming). For API like this await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options);
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
await args.CompleteMessageAsync(args.Message);
}where there is no user-facing API provided by the original SDK to receive the message, messages are pushed to user application and receive operation should not be reported. For APIs that effectively pull messages and then process them, especially if such APIs are not part of the original client library, it's best to report both - receive and process. |
ccdaf16 to
d0a7ab9
Compare
src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumeAndProcessMessageHandler.cs
Outdated
Show resolved
Hide resolved
CodeBlanch
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with a CHANGELOG entry
@CodeBlanch done |
Co-authored-by: Liudmila Molkova <[email protected]>
a8b727a to
7df435c
Compare
Co-authored-by: Piotr Kiełkowicz <[email protected]>
Resolves #1932
@CodeBlanch : I adapted a bit the API you suggested.
@lmolkova : Can you confirm that's OK to output both a receive and a process span for a single message at consumer side ?