|
52 | 52 | import org.apache.cassandra.schema.Schema; |
53 | 53 | import org.apache.cassandra.schema.TableId; |
54 | 54 | import org.apache.cassandra.schema.TableMetadata; |
| 55 | +import org.apache.cassandra.sensors.Context; |
| 56 | +import org.apache.cassandra.sensors.RequestSensors; |
| 57 | +import org.apache.cassandra.sensors.RequestTracker; |
| 58 | +import org.apache.cassandra.sensors.SensorsCustomParams; |
| 59 | +import org.apache.cassandra.sensors.SensorsFactory; |
| 60 | +import org.apache.cassandra.sensors.Type; |
55 | 61 | import org.apache.cassandra.service.PendingRangeCalculatorService; |
56 | 62 | import org.apache.cassandra.service.paxos.PaxosPrepare.Status.Outcome; |
57 | 63 | import org.apache.cassandra.tracing.Tracing; |
@@ -1021,11 +1027,33 @@ public static class RequestHandler implements IVerbHandler<Request> |
1021 | 1027 | @Override |
1022 | 1028 | public void doVerb(Message<Request> message) |
1023 | 1029 | { |
| 1030 | + // Initialize the sensor and set ExecutorLocals |
| 1031 | + RequestSensors sensors = SensorsFactory.instance.createRequestSensors(message.payload.table.keyspace); |
| 1032 | + Context context = Context.from(message.payload.table); |
| 1033 | + |
| 1034 | + // Prepare phase incorporates a read to check the cas condition, so a read sensor is registered in addition to the write sensor |
| 1035 | + sensors.registerSensor(context, Type.READ_BYTES); |
| 1036 | + sensors.registerSensor(context, Type.WRITE_BYTES); |
| 1037 | + sensors.registerSensor(context, Type.INTERNODE_BYTES); |
| 1038 | + sensors.incrementSensor(context, Type.INTERNODE_BYTES, message.payloadSize(MessagingService.current_version)); |
| 1039 | + RequestTracker.instance.set(sensors); |
| 1040 | + |
1024 | 1041 | Response response = execute(message.payload, message.from()); |
1025 | | - if (response == null) |
1026 | | - MessagingService.instance().respondWithFailure(UNKNOWN, message); |
| 1042 | + |
| 1043 | + // calculate outbound internode bytes before adding the sensor to the response |
| 1044 | + if (response != null) |
| 1045 | + { |
| 1046 | + Message.Builder<Response> reply = message.responseWithBuilder(response); |
| 1047 | + int size = reply.currentPayloadSize(MessagingService.current_version); |
| 1048 | + sensors.incrementSensor(context, Type.INTERNODE_BYTES, size); |
| 1049 | + sensors.syncAllSensors(); |
| 1050 | + SensorsCustomParams.addSensorsToInternodeResponse(sensors, reply); |
| 1051 | + MessagingService.instance().send(reply.build(), message.from()); |
| 1052 | + } |
1027 | 1053 | else |
1028 | | - MessagingService.instance().respond(response, message); |
| 1054 | + { |
| 1055 | + MessagingService.instance().respondWithFailure(UNKNOWN, message); |
| 1056 | + } |
1029 | 1057 | } |
1030 | 1058 |
|
1031 | 1059 | static Response execute(AbstractRequest<?> request, InetAddressAndPort from) |
|
0 commit comments