Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,8 @@ public enum CassandraRelevantProperties
REPLACE_ADDRESS_FIRST_BOOT("cassandra.replace_address_first_boot"),
REPLACE_NODE("cassandra.replace_node"),
REPLACE_TOKEN("cassandra.replace_token"),
// Enable/disable replica response size metrics collection
REPLICA_RESPONSE_SIZE_METRICS_ENABLED("cassandra.replica_response_size_metrics_enabled", "true"),
/**
* Number of replicas required to store batchlog for atomicity, only accepts values of 1 or 2.
*/
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/db/MultiRangeReadResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ public String toDebugString(ReadCommand command, DecoratedKey key)
throw new UnsupportedOperationException();
}

@Override
public boolean supportsResponseSizeTracking()
{
return false;
}

/**
* A local response that is not meant to be serialized or used for caching remote endpoint's multi-range response.
*/
Expand Down Expand Up @@ -235,6 +241,12 @@ public boolean isDigestResponse()
{
return false;
}

@Override
public boolean supportsResponseSizeTracking()
{
return false;
}
}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/db/ReadResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data

public abstract boolean isDigestResponse();

/**
* Indicates whether this response type supports response size tracking for metrics.
* Some response types (like MultiRangeReadResponse) may not support payload size calculation
* and will throw UnsupportedOperationException when attempting to serialize for size calculation.
*
* @return true if this response supports size tracking, false otherwise
*/
public boolean supportsResponseSizeTracking()
{
return true;
}

/**
* Creates a string of the requested partition in this read response suitable for debugging.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.metrics;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;

import org.apache.cassandra.config.CassandraRelevantProperties;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

/**
* Metrics for tracking result sizes coming from replicas/writers to coordinators.
*/
public class ReplicaResponseSizeMetrics
{
private static final String TYPE = "ReplicaResponseSize";

/**
* Controls whether replica response size metrics collection is enabled.
*/
private static final boolean METRICS_ENABLED = CassandraRelevantProperties.REPLICA_RESPONSE_SIZE_METRICS_ENABLED.getBoolean();

/** Total bytes received from replicas in response messages */
public static final Counter totalBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "TotalBytesReceived", null));

/** Histogram of response sizes from replicas */
public static final Histogram bytesReceivedPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesReceivedPerResponse", null), true);

/** Total bytes received from replicas in read responses */
public static final Counter readResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesReceived", null));

/** Histogram of read response sizes from replicas */
public static final Histogram readResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesPerResponse", null), true);

/** Total bytes received from replicas in write responses */
public static final Counter writeResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesReceived", null));

/** Histogram of write response sizes from replicas */
public static final Histogram writeResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesPerResponse", null), true);

/**
* Check if metrics collection is enabled
* @return true if metrics are enabled, false otherwise
*/
public static boolean isMetricsEnabled()
{
return METRICS_ENABLED;
}

/**
* Record the size of a read response received from a replica
* @param responseSize the size of the response in bytes
*/
public static void recordReadResponseSize(int responseSize)
{
if (!METRICS_ENABLED)
return;

totalBytesReceived.inc(responseSize);
bytesReceivedPerResponse.update(responseSize);
readResponseBytesReceived.inc(responseSize);
readResponseBytesPerResponse.update(responseSize);
}

/**
* Record the size of a write response received from a replica
* @param responseSize the size of the response in bytes
*/
public static void recordWriteResponseSize(int responseSize)
{
if (!METRICS_ENABLED)
return;

totalBytesReceived.inc(responseSize);
bytesReceivedPerResponse.update(responseSize);
writeResponseBytesReceived.inc(responseSize);
writeResponseBytesPerResponse.update(responseSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
import org.apache.cassandra.net.MessagingService;

import static java.lang.Long.MAX_VALUE;
import static java.lang.Math.min;
Expand Down Expand Up @@ -313,6 +315,24 @@ public Dispatcher.RequestTime getRequestTime()
* null message means "response from local write"
*/
public abstract void onResponse(Message<T> msg);

/**
* Track the size of a response message from a replica
* @param msg the response message
*/
protected void trackReplicaResponseSize(Message<T> msg)
{
if (!ReplicaResponseSizeMetrics.isMetricsEnabled())
return;

// Only track remote responses (local responses have null from field)
// Also check that we have a valid payload and serializer
if (msg != null && msg.from() != null && msg.payload != null && msg.verb().serializer() != null)
{
int responseSize = msg.payloadSize(MessagingService.current_version);
ReplicaResponseSizeMetrics.recordWriteResponseSize(responseSize);
}
}

public void signal()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,

public void onResponse(Message<T> message)
{
trackReplicaResponseSize(message);
try
{
String dataCenter = message == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public boolean trackLatencyForSnitch(Verb responseVerb, boolean isTimeout)
@Override
public void onResponse(Message<T> m)
{
trackReplicaResponseSize(m);
if (responsesUpdater.decrementAndGet(this) == 0)
signal();
//Must be last after all subclass processing
Expand Down
24 changes: 24 additions & 0 deletions src/java/org/apache/cassandra/service/reads/ReadCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
import org.apache.cassandra.net.MessagingService;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
Expand Down Expand Up @@ -220,7 +222,10 @@ public void onResponse(Message<ReadResponse> message)
return;
}
}

resolver.preprocess(message);

trackReplicaResponseSize(message);

/*
* Ensure that data is present and the response accumulator has properly published the
Expand All @@ -246,6 +251,25 @@ private WarningContext getWarningContext()
return current;
}

/**
* Track the size of a response message from a replica
* @param message the response message
*/
private void trackReplicaResponseSize(Message<ReadResponse> message)
{
if (!ReplicaResponseSizeMetrics.isMetricsEnabled())
return;

// Only track remote responses (local responses have null from field)
// check that we have a valid payload and serializer and the response type supports size tracking
if (message != null && message.from() != null && message.payload != null
&& message.verb().serializer() != null && message.payload.supportsResponseSizeTracking())
{
int responseSize = message.payloadSize(MessagingService.current_version);
ReplicaResponseSizeMetrics.recordReadResponseSize(responseSize);
}
}

public void response(ReadResponse result)
{
Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.metrics;

import org.junit.BeforeClass;
import org.junit.Test;

import com.codahale.metrics.Snapshot;

import static org.junit.Assert.*;

public class ReplicaResponseSizeMetricsTest
{
@BeforeClass
public static void setup()
{
// Note: Counter metrics cannot be reset, so tests track deltas
}

@Test
public void testReadResponseMetrics()
{
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
long initialRead = ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount();

int responseSize = 1024;
ReplicaResponseSizeMetrics.recordReadResponseSize(responseSize);

assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
assertEquals(initialRead + responseSize, ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount());

Snapshot readSnapshot = ReplicaResponseSizeMetrics.readResponseBytesPerResponse.getSnapshot();
assertTrue(readSnapshot.size() > 0);
assertTrue(readSnapshot.getMax() >= responseSize);
}

@Test
public void testWriteResponseMetrics()
{
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
long initialWrite = ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount();

int responseSize = 256;
ReplicaResponseSizeMetrics.recordWriteResponseSize(responseSize);

assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
assertEquals(initialWrite + responseSize, ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount());

Snapshot writeSnapshot = ReplicaResponseSizeMetrics.writeResponseBytesPerResponse.getSnapshot();
assertTrue(writeSnapshot.size() > 0);
}

@Test
public void testMultipleResponses()
{
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();

int[] sizes = {100, 200, 300, 400, 500};
int expectedTotal = 0;

for (int size : sizes)
{
// Alternate between read and write responses
if (size % 2 == 0)
ReplicaResponseSizeMetrics.recordReadResponseSize(size);
else
ReplicaResponseSizeMetrics.recordWriteResponseSize(size);
expectedTotal += size;
}

assertEquals(initialTotal + expectedTotal, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());

Snapshot totalSnapshot = ReplicaResponseSizeMetrics.bytesReceivedPerResponse.getSnapshot();
assertTrue(totalSnapshot.size() >= sizes.length);
assertTrue(totalSnapshot.getMean() > 0);
}
}
Loading