Skip to content

Commit 23d97f0

Browse files
driftxdjatnieks
authored and committed
CNDB-7237: add metrics to track replica response sizes
1 parent d2006ac commit 23d97f0

File tree

11 files changed

+639
-0
lines changed

11 files changed

+639
-0
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,8 @@ public enum CassandraRelevantProperties
670670
REPLACE_ADDRESS_FIRST_BOOT("cassandra.replace_address_first_boot"),
671671
REPLACE_NODE("cassandra.replace_node"),
672672
REPLACE_TOKEN("cassandra.replace_token"),
673+
// Enable/disable replica response size metrics collection
674+
REPLICA_RESPONSE_SIZE_METRICS_ENABLED("cassandra.replica_response_size_metrics_enabled", "true"),
673675
/**
674676
* Number of replicas required to store batchlog for atomicity, only accepts values of 1 or 2.
675677
*/

src/java/org/apache/cassandra/db/MultiRangeReadResponse.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ public String toDebugString(ReadCommand command, DecoratedKey key)
108108
throw new UnsupportedOperationException();
109109
}
110110

111+
@Override
112+
public boolean supportsResponseSizeTracking()
113+
{
114+
return false;
115+
}
116+
111117
/**
112118
* A local response that is not meant to be serialized or used for caching remote endpoint's multi-range response.
113119
*/
@@ -235,6 +241,12 @@ public boolean isDigestResponse()
235241
{
236242
return false;
237243
}
244+
245+
@Override
246+
public boolean supportsResponseSizeTracking()
247+
{
248+
return false;
249+
}
238250
}
239251

240252
/**

src/java/org/apache/cassandra/db/ReadResponse.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,18 @@ public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data
8080

8181
public abstract boolean isDigestResponse();
8282

83+
/**
84+
* Indicates whether this response type supports response size tracking for metrics.
85+
* Some response types (like MultiRangeReadResponse) may not support payload size calculation
86+
* and will throw UnsupportedOperationException when attempting to serialize for size calculation.
87+
*
88+
* @return true if this response supports size tracking, false otherwise
89+
*/
90+
public boolean supportsResponseSizeTracking()
91+
{
92+
return true;
93+
}
94+
8395
/**
8496
* Creates a string of the requested partition in this read response suitable for debugging.
8597
*/
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.metrics;
19+
20+
import com.codahale.metrics.Counter;
21+
import com.codahale.metrics.Histogram;
22+
23+
import org.apache.cassandra.config.CassandraRelevantProperties;
24+
25+
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
26+
27+
/**
28+
* Metrics for tracking result sizes coming from replicas/writers to coordinators.
29+
*/
30+
public class ReplicaResponseSizeMetrics
31+
{
32+
private static final String TYPE = "ReplicaResponseSize";
33+
34+
/**
35+
* Controls whether replica response size metrics collection is enabled.
36+
*/
37+
private static final boolean METRICS_ENABLED = CassandraRelevantProperties.REPLICA_RESPONSE_SIZE_METRICS_ENABLED.getBoolean();
38+
39+
/** Total bytes received from replicas in response messages */
40+
public static final Counter totalBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "TotalBytesReceived", null));
41+
42+
/** Histogram of response sizes from replicas */
43+
public static final Histogram bytesReceivedPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesReceivedPerResponse", null), true);
44+
45+
/** Total bytes received from replicas in read responses */
46+
public static final Counter readResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesReceived", null));
47+
48+
/** Histogram of read response sizes from replicas */
49+
public static final Histogram readResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesPerResponse", null), true);
50+
51+
/** Total bytes received from replicas in write responses */
52+
public static final Counter writeResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesReceived", null));
53+
54+
/** Histogram of write response sizes from replicas */
55+
public static final Histogram writeResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesPerResponse", null), true);
56+
57+
/**
58+
* Check if metrics collection is enabled
59+
* @return true if metrics are enabled, false otherwise
60+
*/
61+
public static boolean isMetricsEnabled()
62+
{
63+
return METRICS_ENABLED;
64+
}
65+
66+
/**
67+
* Record the size of a read response received from a replica
68+
* @param responseSize the size of the response in bytes
69+
*/
70+
public static void recordReadResponseSize(int responseSize)
71+
{
72+
if (!METRICS_ENABLED)
73+
return;
74+
75+
totalBytesReceived.inc(responseSize);
76+
bytesReceivedPerResponse.update(responseSize);
77+
readResponseBytesReceived.inc(responseSize);
78+
readResponseBytesPerResponse.update(responseSize);
79+
}
80+
81+
/**
82+
* Record the size of a write response received from a replica
83+
* @param responseSize the size of the response in bytes
84+
*/
85+
public static void recordWriteResponseSize(int responseSize)
86+
{
87+
if (!METRICS_ENABLED)
88+
return;
89+
90+
totalBytesReceived.inc(responseSize);
91+
bytesReceivedPerResponse.update(responseSize);
92+
writeResponseBytesReceived.inc(responseSize);
93+
writeResponseBytesPerResponse.update(responseSize);
94+
}
95+
}

src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.apache.cassandra.transport.Dispatcher;
5252
import org.apache.cassandra.utils.concurrent.Condition;
5353
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
54+
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
55+
import org.apache.cassandra.net.MessagingService;
5456

5557
import static java.lang.Long.MAX_VALUE;
5658
import static java.lang.Math.min;
@@ -313,6 +315,24 @@ public Dispatcher.RequestTime getRequestTime()
313315
* null message means "response from local write"
314316
*/
315317
public abstract void onResponse(Message<T> msg);
318+
319+
/**
320+
* Track the size of a response message from a replica
321+
* @param msg the response message
322+
*/
323+
protected void trackReplicaResponseSize(Message<T> msg)
324+
{
325+
if (!ReplicaResponseSizeMetrics.isMetricsEnabled())
326+
return;
327+
328+
// Only track remote responses (local responses have null from field)
329+
// Also check that we have a valid payload and serializer
330+
if (msg != null && msg.from() != null && msg.payload != null && msg.verb().serializer() != null)
331+
{
332+
int responseSize = msg.payloadSize(MessagingService.current_version);
333+
ReplicaResponseSizeMetrics.recordWriteResponseSize(responseSize);
334+
}
335+
}
316336

317337
public void signal()
318338
{

src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
7777

7878
public void onResponse(Message<T> message)
7979
{
80+
trackReplicaResponseSize(message);
8081
try
8182
{
8283
String dataCenter = message == null

src/java/org/apache/cassandra/service/WriteResponseHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public boolean trackLatencyForSnitch(Verb responseVerb, boolean isTimeout)
6868
@Override
6969
public void onResponse(Message<T> m)
7070
{
71+
trackReplicaResponseSize(m);
7172
if (responsesUpdater.decrementAndGet(this) == 0)
7273
signal();
7374
//Must be last after all subclass processing

src/java/org/apache/cassandra/service/reads/ReadCallback.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.apache.cassandra.transport.Dispatcher;
5252
import org.apache.cassandra.utils.concurrent.Condition;
5353
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
54+
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
55+
import org.apache.cassandra.net.MessagingService;
5456

5557
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5658
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
@@ -220,7 +222,10 @@ public void onResponse(Message<ReadResponse> message)
220222
return;
221223
}
222224
}
225+
223226
resolver.preprocess(message);
227+
228+
trackReplicaResponseSize(message);
224229

225230
/*
226231
* Ensure that data is present and the response accumulator has properly published the
@@ -246,6 +251,25 @@ private WarningContext getWarningContext()
246251
return current;
247252
}
248253

254+
/**
255+
* Track the size of a response message from a replica
256+
* @param message the response message
257+
*/
258+
private void trackReplicaResponseSize(Message<ReadResponse> message)
259+
{
260+
if (!ReplicaResponseSizeMetrics.isMetricsEnabled())
261+
return;
262+
263+
// Only track remote responses (local responses have null from field)
264+
// check that we have a valid payload and serializer and the response type supports size tracking
265+
if (message != null && message.from() != null && message.payload != null
266+
&& message.verb().serializer() != null && message.payload.supportsResponseSizeTracking())
267+
{
268+
int responseSize = message.payloadSize(MessagingService.current_version);
269+
ReplicaResponseSizeMetrics.recordReadResponseSize(responseSize);
270+
}
271+
}
272+
249273
public void response(ReadResponse result)
250274
{
251275
Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.metrics;
19+
20+
import org.junit.BeforeClass;
21+
import org.junit.Test;
22+
23+
import com.codahale.metrics.Snapshot;
24+
25+
import static org.junit.Assert.*;
26+
27+
public class ReplicaResponseSizeMetricsTest
28+
{
29+
@BeforeClass
30+
public static void setup()
31+
{
32+
// Note: Counter metrics cannot be reset, so tests track deltas
33+
}
34+
35+
@Test
36+
public void testReadResponseMetrics()
37+
{
38+
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
39+
long initialRead = ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount();
40+
41+
int responseSize = 1024;
42+
ReplicaResponseSizeMetrics.recordReadResponseSize(responseSize);
43+
44+
assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
45+
assertEquals(initialRead + responseSize, ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount());
46+
47+
Snapshot readSnapshot = ReplicaResponseSizeMetrics.readResponseBytesPerResponse.getSnapshot();
48+
assertTrue(readSnapshot.size() > 0);
49+
assertTrue(readSnapshot.getMax() >= responseSize);
50+
}
51+
52+
@Test
53+
public void testWriteResponseMetrics()
54+
{
55+
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
56+
long initialWrite = ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount();
57+
58+
int responseSize = 256;
59+
ReplicaResponseSizeMetrics.recordWriteResponseSize(responseSize);
60+
61+
assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
62+
assertEquals(initialWrite + responseSize, ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount());
63+
64+
Snapshot writeSnapshot = ReplicaResponseSizeMetrics.writeResponseBytesPerResponse.getSnapshot();
65+
assertTrue(writeSnapshot.size() > 0);
66+
}
67+
68+
@Test
69+
public void testMultipleResponses()
70+
{
71+
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
72+
73+
int[] sizes = {100, 200, 300, 400, 500};
74+
int expectedTotal = 0;
75+
76+
for (int size : sizes)
77+
{
78+
// Alternate between read and write responses
79+
if (size % 2 == 0)
80+
ReplicaResponseSizeMetrics.recordReadResponseSize(size);
81+
else
82+
ReplicaResponseSizeMetrics.recordWriteResponseSize(size);
83+
expectedTotal += size;
84+
}
85+
86+
assertEquals(initialTotal + expectedTotal, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
87+
88+
Snapshot totalSnapshot = ReplicaResponseSizeMetrics.bytesReceivedPerResponse.getSnapshot();
89+
assertTrue(totalSnapshot.size() >= sizes.length);
90+
assertTrue(totalSnapshot.getMean() > 0);
91+
}
92+
}

0 commit comments

Comments
 (0)