@@ -29,7 +29,6 @@ const componentDefine = require("skyapm-nodejs/lib/trace/component-define");
2929 * @author Quanjie.Deng
3030 */
3131module . exports = function ( amqpModule , instrumentation , contextManager ) {
32- console . log ( "amqp hook" ) ;
3332 instrumentation . enhanceMethod ( amqpModule , "createConnection" , wrapCreateConnection ) ;
3433 return amqpModule ;
3534
@@ -39,7 +38,6 @@ module.exports = function(amqpModule, instrumentation, contextManager) {
3938 * @return {* }
4039 */
4140 function wrapCreateConnection ( original ) {
42- console . log ( "amqp createConnection 拦截触发" ) ;
4341 return function ( ) {
4442 let Connection = original . apply ( this , arguments ) ;
4543 enhanceConnectionsMethod ( Connection , instrumentation , contextManager ) ;
@@ -58,61 +56,76 @@ module.exports = function(amqpModule, instrumentation, contextManager) {
5856function enhanceConnectionsMethod ( obj , instrumentation , contextManager ) {
5957 let connection = obj ;
6058 instrumentation . enhanceMethod ( obj , "exchange" , wrapCreateExchange ) ;
61- // instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue);
59+ instrumentation . enhanceMethod ( obj , "queue" , wrapCreateQueue ) ;
6260 return obj ;
6361 /**
6462 * filterParams
6563 * @param {original } original
6664 * @return {* }
6765 */
6866 function wrapCreateExchange ( original ) {
69- console . log ( "amqp exchange 拦截触发" ) ;
7067 return function ( ) {
7168 let exchange = original . apply ( this , arguments ) ;
7269 enhanceExchangeMethod ( connection , exchange , instrumentation , contextManager ) ;
7370 return exchange ;
7471 } ;
7572 }
7673
77- // function wrapCreateQueue(original){
78- // console.log("amqp Queue 拦截触发");
79- // return function(){
80- // let queue = original.apply(this, arguments);
81- // enhanceQueueMethod(connection,queue, instrumentation, contextManager);
82- // return queue;
83- // }
84- // }
74+ /**
75+ * filterParams
76+ * @param {original } original
77+ * @return {* }
78+ */
79+ function wrapCreateQueue ( original ) {
80+ return function ( ) {
81+ let queue = original . apply ( this , arguments ) ;
82+ enhanceQueueMethod ( queue , instrumentation , contextManager ) ;
83+ return queue ;
84+ } ;
85+ }
8586}
8687
87- // function enhanceQueueMethod(connection,obj, instrumentation, contextManager){
88- // let connections = connection;
89- // let queue = obj;
90- // instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe);
91- // return obj;
88+ /**
89+ * filterParams
90+ * @param {obj } obj
91+ * @param {instrumentation } instrumentation
92+ * @param {contextManager } contextManager
93+ * @return {* }
94+ */
95+ function enhanceQueueMethod ( obj , instrumentation , contextManager ) {
96+ instrumentation . enhanceMethod ( obj , "subscribe" , wrapQueueSubscribe ) ;
97+ return obj ;
98+
99+ /**
100+ * filterParams
101+ * @param {original } original
102+ * @return {* }
103+ */
104+ function wrapQueueSubscribe ( original ) {
105+ return function ( options , messageListener ) {
106+ let optionsNew = function ( message ) {
107+ let contextCarrier = new ContextCarrier ( ) ;
108+ contextCarrier . fetchBy ( function ( key ) {
109+ if ( message . headers . hasOwnProperty ( key ) ) {
110+ return message . headers [ key ] ;
111+ }
112+ return undefined ;
113+ } ) ;
114+
115+ let span = contextManager . createEntrySpan ( obj . name , contextCarrier ) ;
116+ span . component ( componentDefine . Components . AMQP ) ;
117+ span . spanLayer ( layerDefine . Layers . MQ ) ;
92118
93- // function wrapQueueSubscribe(original){
94- // console.log("amqp Queue Subscribe 拦截触发");
95- // return function(options, messageListener){
96- // console.log(`subscribe----options:${options} `);
97- // console.log(`subscribe----messageListener:${messageListener} `);
98- // // let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port);
119+ let res = options . apply ( this , arguments ) ;
120+ contextManager . finishSpan ( span ) ;
121+ return res ;
122+ } ;
99123
100- // // let contextCarrier = new ContextCarrier();
101- // // let span = contextManager.createExitSpan(options.path, (options.hostname || options.host) + ":" + options.port, contextCarrier);
102- // // contextCarrier.pushBy(function(key, value) {
103- // // if (!options.hasOwnProperty("headers") || !options.headers) {
104- // // options.headers = {};
105- // // }
106- // // options.headers[key] = value;
107- // // });
108- // // span.component(componentDefine.Components.HTTP);
109- // // span.spanLayer(layerDefine.Layers.HTTP);
110- // let result = original.apply(this, arguments);
111- // // contextManager.finishSpan(span);
112- // return result;
113- // }
114- // };
115- // }
124+ let result = original . apply ( this , [ optionsNew , messageListener ] ) ;
125+ return result ;
126+ } ;
127+ } ;
128+ }
116129
117130/**
118131 * filterParams
@@ -132,32 +145,24 @@ function enhanceExchangeMethod( connection, obj, instrumentation, contextManager
132145 * @return {* }
133146 */
134147 function wrapExchangePulish ( original ) {
135- console . log ( "amqp exchange-publish 拦截触发" ) ;
136148 return function ( routingKey , data , options , callback ) {
137- console . log ( "amqp wrapRequest function 参数1:" + routingKey ) ;
138- console . log ( "amqp wrapRequest function 参数2:" + JSON . stringify ( data ) ) ;
139- console . log ( "amqp wrapRequest function connections:" + connections . options . host + ":" + connections . options . port ) ;
140149 let enhanceCallback = callback ;
141150 let hasCallback = false ;
142151 let contextCarrier = new ContextCarrier ( ) ;
143- let span = contextManager . createExitSpan ( routingKey , connections . options . host + ":" + connections . options . port ) ;
152+ let span = contextManager . createExitSpan ( routingKey , connections . options . host + ":" + connections . options . port , contextCarrier ) ;
144153 contextCarrier . pushBy ( function ( key , value ) {
145154 if ( ! data . hasOwnProperty ( "headers" ) ) {
146155 data . headers = { } ;
147156 }
148157 data . headers [ key ] = value ;
149- console . log ( "添加 ContextCarrier k-v:" + key + ":" + value ) ;
150158 } ) ;
151- console . log ( "amqp wrapRequest function 参数2-2:" + JSON . stringify ( data ) ) ;
152159 span . component ( componentDefine . Components . AMQP ) ;
153160 span . spanLayer ( layerDefine . Layers . MQ ) ;
154161
155162
156163 if ( typeof callback === "function" ) {
157- console . log ( "amqp publish call_back is function" ) ;
158164 enhanceCallback = instrumentation . enhanceCallback ( span . traceContext ( ) ,
159165 contextManager , function ( ) {
160- console . log ( " exchange-publish call_back 触发" ) ;
161166 contextManager . finishSpan ( span ) ;
162167 return callback . apply ( this , arguments ) ;
163168 } ) ;
0 commit comments