@@ -2,6 +2,7 @@ package nramqp
22
33import (
44 "context"
5+ "strings"
56
67 amqp "github.com/rabbitmq/amqp091-go"
78
@@ -16,7 +17,7 @@ const (
1617
1718func init () { internal .TrackUsage ("integration" , "messagebroker" , "nramqp" ) }
1819
19- func creatProducerSegment (exchange , key string ) * newrelic.MessageProducerSegment {
20+ func createProducerSegment (exchange , key string ) * newrelic.MessageProducerSegment {
2021 s := newrelic.MessageProducerSegment {
2122 Library : RabbitMQLibrary ,
2223 DestinationName : "Default" ,
@@ -33,13 +34,34 @@ func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment
3334 return & s
3435}
3536
37+ func GetHostAndPortFromURL (url string ) (string , string ) {
38+ // url is of format amqp://user:password@host:port or amqp://host:port
39+ var hostPortPart string
40+
41+ // extract the part after "@" symbol, if present
42+ if parts := strings .Split (url , "@" ); len (parts ) == 2 {
43+ hostPortPart = parts [1 ]
44+ } else {
45+ // assume the whole url after "amqp://" is the host:port part
46+ hostPortPart = strings .TrimPrefix (url , "amqp://" )
47+ }
48+
49+ // split the host:port part
50+ strippedURL := strings .Split (hostPortPart , ":" )
51+ if len (strippedURL ) != 2 {
52+ return "" , ""
53+ }
54+ return strippedURL [0 ], strippedURL [1 ]
55+ }
56+
3657// PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment.
3758// It will also inject distributed tracing headers into the message.
38- func PublishWithContext (ch * amqp.Channel , ctx context.Context , exchange , key string , mandatory , immediate bool , msg amqp.Publishing ) error {
59+ func PublishWithContext (ch * amqp.Channel , ctx context.Context , exchange , key , url string , mandatory , immediate bool , msg amqp.Publishing ) error {
60+ host , port := GetHostAndPortFromURL (url )
3961 txn := newrelic .FromContext (ctx )
4062 if txn != nil {
4163 // generate message broker segment
42- s := creatProducerSegment (exchange , key )
64+ s := createProducerSegment (exchange , key )
4365
4466 // capture telemetry for AMQP producer
4567 if msg .Headers != nil && len (msg .Headers ) > 0 {
@@ -49,15 +71,18 @@ func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key str
4971 }
5072 integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeMessageHeaders , hdrStr )
5173 }
74+ s .StartTime = txn .StartSegmentNow ()
5275
76+ // inject DT headers into headers object
77+ msg .Headers = injectDtHeaders (txn , msg .Headers )
78+ integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeSpanKind , "producer" )
79+ integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeServerAddress , host )
80+ integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeServerPort , port )
81+ integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeMessageDestinationName , exchange )
5382 integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeMessageRoutingKey , key )
5483 integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeMessageCorrelationID , msg .CorrelationId )
5584 integrationsupport .AddAgentSpanAttribute (txn , newrelic .AttributeMessageReplyTo , msg .ReplyTo )
5685
57- // inject DT headers into headers object
58- msg .Headers = injectDtHeaders (txn , msg .Headers )
59-
60- s .StartTime = txn .StartSegmentNow ()
6186 err := ch .PublishWithContext (ctx , exchange , key , mandatory , immediate , msg )
6287 s .End ()
6388 return err
@@ -91,8 +116,10 @@ func Consume(app *newrelic.Application, ch *amqp.Channel, queue, consumer string
91116 integrationsupport .AddAgentAttribute (txn , newrelic .AttributeMessageHeaders , hdrStr , nil )
92117 }
93118 }
94-
119+ integrationsupport . AddAgentAttribute ( txn , newrelic . AttributeSpanKind , "consumer" , nil )
95120 integrationsupport .AddAgentAttribute (txn , newrelic .AttributeMessageQueueName , queue , nil )
121+ integrationsupport .AddAgentAttribute (txn , newrelic .AttributeMessageDestinationName , queue , nil )
122+ integrationsupport .AddAgentAttribute (txn , newrelic .AttributeMessagingDestinationPublishName , delivery .Exchange , nil )
96123 integrationsupport .AddAgentAttribute (txn , newrelic .AttributeMessageRoutingKey , delivery .RoutingKey , nil )
97124 integrationsupport .AddAgentAttribute (txn , newrelic .AttributeMessageCorrelationID , delivery .CorrelationId , nil )
98125 integrationsupport .AddAgentAttribute (txn , newrelic .AttributeMessageReplyTo , delivery .ReplyTo , nil )
0 commit comments