Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,12 @@ public DroppedMessageMetrics(Verb verb)
crossNodeDroppedLatency = Metrics.timer(createMetricName(TYPE, "CrossNodeDroppedLatency", scope));
}
}

public DroppedMessageMetrics(String type, String scope)
{
MetricNameFactory factory = new DefaultNameFactory(type, scope);
dropped = Metrics.meter(factory.createMetricName("Dropped"));
internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency"));
crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency"));
}
}
30 changes: 30 additions & 0 deletions src/java/org/apache/cassandra/metrics/MessagingMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ private static final class DroppedForVerb

// total dropped message counts for server lifetime
private final Map<Verb, DroppedForVerb> droppedMessages = new ConcurrentHashMap<>();

// dropped mutation metrics by table, keyed by "<keyspace>.<table>"; an entry is created lazily
// the first time a dropped mutation touching that table is recorded
private final Map<String, DroppedMessageMetrics> droppedMutationsByTable = new ConcurrentHashMap<>();

public MessagingMetrics()
{
Expand Down Expand Up @@ -161,6 +164,24 @@ public void recordTotalMessageProcessingTime(Verb verb, InetAddressAndPort from,
// Records a dropped message against its verb and, when the message is a MUTATION_REQ whose
// payload is a Mutation, additionally against every table touched by that mutation: the
// per-table meter is marked once per partition update and the elapsed time is fed to either
// the cross-node or the internal latency timer of that table.
public void recordDroppedMessage(Message<?> message, long timeElapsed, TimeUnit timeUnit)
{
    recordDroppedMessage(message.verb(), timeElapsed, timeUnit, message.isCrossNode());

    // nothing more to do unless this is a mutation request actually carrying a Mutation
    if (message.verb() != Verb.MUTATION_REQ || !(message.payload instanceof org.apache.cassandra.db.Mutation))
        return;

    org.apache.cassandra.db.Mutation droppedMutation = (org.apache.cassandra.db.Mutation) message.payload;
    for (org.apache.cassandra.db.partitions.PartitionUpdate partitionUpdate : droppedMutation.getPartitionUpdates())
    {
        String key = partitionUpdate.metadata().keyspace + '.' + partitionUpdate.metadata().name;

        // plain lookup first; only fall back to computeIfAbsent when the table has no entry yet
        DroppedMessageMetrics perTable = droppedMutationsByTable.get(key);
        if (perTable == null)
        {
            perTable = droppedMutationsByTable.computeIfAbsent(key,
                                                               name -> new DroppedMessageMetrics("DroppedMutations", name));
        }

        perTable.dropped.mark();
        (message.isCrossNode() ? perTable.crossNodeDroppedLatency
                               : perTable.internalDroppedLatency).update(timeElapsed, timeUnit);
    }
}

public void recordDroppedMessage(Verb verb, long timeElapsed, TimeUnit timeUnit, boolean isCrossNode)
Expand Down Expand Up @@ -201,6 +222,14 @@ public Map<String, Integer> getDroppedMessages()
map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.getCount());
return map;
}

/**
 * Builds a snapshot of the dropped-mutation counts currently tracked per table.
 *
 * @return a new map from the per-table key to the count of that table's {@code dropped} meter;
 *         empty when no table is being tracked
 */
public Map<String, Long> getDroppedMutationsByTable()
{
    Map<String, Long> counts = new HashMap<>(droppedMutationsByTable.size());
    droppedMutationsByTable.forEach((table, tableMetrics) -> counts.put(table, tableMetrics.dropped.getCount()));
    return counts;
}

private void logDroppedMessages()
{
Expand Down Expand Up @@ -239,5 +268,6 @@ public int resetAndConsumeDroppedErrors(Consumer<String> messageConsumer)
// Resets dropped-message tracking: each per-verb entry is swapped for a newly constructed
// DroppedForVerb, and every per-table dropped-mutation entry is removed from the map.
public void resetDroppedMessages()
{
    droppedMessages.replaceAll((verb, stale) -> {
        DroppedMessageMetrics fresh = new DroppedMessageMetrics(verb);
        return new DroppedForVerb(fresh);
    });
    droppedMutationsByTable.clear();
}
}
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/net/MessagingServiceMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public interface MessagingServiceMBean
* dropped message counts for server lifetime
*/
public Map<String, Integer> getDroppedMessages();

/**
 * dropped mutation counts by table for server lifetime
 *
 * @return map from table key (of the form {@code keyspace.table}) to the number of dropped
 *         mutations recorded for that table
 */
public Map<String, Long> getDroppedMutationsByTable();

/**
* Total number of timeouts happened on this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ public Map<String, Integer> getDroppedMessages()
{
return metrics.getDroppedMessages();
}

// Exposes the per-table dropped mutation counts by delegating to the metrics instance
// backing this MBean.
@Override
public Map<String, Long> getDroppedMutationsByTable()
{
return metrics.getDroppedMutationsByTable();
}

@Override
public long getTotalTimeouts()
Expand Down
176 changes: 176 additions & 0 deletions test/unit/org/apache/cassandra/net/MessagingServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.MessagingMetrics;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.awaitility.Awaitility;
import org.caffinitas.ohc.histo.EstimatedHistogram;
Expand Down Expand Up @@ -589,4 +593,176 @@ private static void sendMessages(int numOfMessages, Verb verb) throws UnknownHos
MessagingService.instance().send(Message.out(verb, noPayload), address);
}
}

/**
 * Dropping the same MUTATION_REQ message twice must show up as a running per-table count.
 */
@Test
public void testDroppedMutationsTrackedByTable()
{
    MessagingMetrics metrics = messagingService.metrics;

    // start from a clean slate
    metrics.resetDroppedMessages();

    // a single-row mutation against a uniquely named test table
    TableMetadata table = MockSchema.newTableMetadata("test_ks_drops", "test_table_drops");
    String expectedKey = table.keyspace + "." + table.name;
    Mutation droppedMutation = new RowUpdateBuilder(table, 0, "key1").clustering("col1")
                                                                     .add("value", "test_value")
                                                                     .build();
    Message<Mutation> request = Message.builder(Verb.MUTATION_REQ, droppedMutation).build();

    // first drop: the table appears with a count of one
    metrics.recordDroppedMessage(request, 100, MILLISECONDS);
    Map<String, Long> afterFirstDrop = metrics.getDroppedMutationsByTable();
    assertNotNull(afterFirstDrop);
    assertEquals(Long.valueOf(1L), afterFirstDrop.get(expectedKey));

    // second drop of the same message: the count accumulates
    metrics.recordDroppedMessage(request, 200, MILLISECONDS);
    Map<String, Long> afterSecondDrop = metrics.getDroppedMutationsByTable();
    assertEquals(Long.valueOf(2L), afterSecondDrop.get(expectedKey));
}

/**
 * Drops for three distinct tables (two sharing a keyspace, two sharing a table name) must be
 * counted separately.
 */
@Test
public void testMultipleTablesTrackedIndependently()
{
    MessagingMetrics metrics = messagingService.metrics;

    // start from a clean slate
    metrics.resetDroppedMessages();

    TableMetadata ks1Table1 = MockSchema.newTableMetadata("ks1", "table1");
    TableMetadata ks1Table2 = MockSchema.newTableMetadata("ks1", "table2");
    TableMetadata ks2Table1 = MockSchema.newTableMetadata("ks2", "table1");

    Mutation first = new RowUpdateBuilder(ks1Table1, 0, "key1").clustering("col1").add("value", "val1").build();
    Mutation second = new RowUpdateBuilder(ks1Table2, 0, "key2").clustering("col1").add("value", "val2").build();
    Mutation third = new RowUpdateBuilder(ks2Table1, 0, "key3").clustering("col1").add("value", "val3").build();

    // two drops for ks1.table1, one for ks1.table2, three for ks2.table1
    for (int i = 0; i < 2; i++)
        metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, first).build(), 100, MILLISECONDS);
    metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, second).build(), 100, MILLISECONDS);
    for (int i = 0; i < 3; i++)
        metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, third).build(), 100, MILLISECONDS);

    // every table carries exactly its own number of drops
    Map<String, Long> counts = metrics.getDroppedMutationsByTable();
    assertEquals(Long.valueOf(2L), counts.get(ks1Table1.keyspace + "." + ks1Table1.name));
    assertEquals(Long.valueOf(1L), counts.get(ks1Table2.keyspace + "." + ks1Table2.name));
    assertEquals(Long.valueOf(3L), counts.get(ks2Table1.keyspace + "." + ks2Table1.name));
}

/**
 * Records drops for a message built with an explicit sender and for one built without, then
 * checks the table's total. Only the combined per-table count is asserted here; the latency
 * timers themselves are not inspected.
 */
@Test
public void testCrossNodeVsInternalLatencyPerTable()
{
    MessagingMetrics metrics = messagingService.metrics;

    // start from a clean slate
    metrics.resetDroppedMessages();

    TableMetadata table = MockSchema.newTableMetadata("test_ks", "test_table");
    Mutation mutation = new RowUpdateBuilder(table, 0, "key1").clustering("col1")
                                                              .add("value", "test_value")
                                                              .build();

    Message<Mutation> withExplicitSender = Message.builder(Verb.MUTATION_REQ, mutation)
                                                  .from(InetAddressAndPort.getLocalHost())
                                                  .build();
    Message<Mutation> withDefaultSender = Message.builder(Verb.MUTATION_REQ, mutation).build();

    // two drops of the message with an explicit sender...
    for (long elapsedMillis : new long[]{ 100, 150 })
        metrics.recordDroppedMessage(withExplicitSender, elapsedMillis, MILLISECONDS);

    // ...and one drop of the message without
    metrics.recordDroppedMessage(withDefaultSender, 50, MILLISECONDS);

    // all three drops land on the same table entry
    Map<String, Long> counts = metrics.getDroppedMutationsByTable();
    assertEquals(Long.valueOf(3L), counts.get(table.keyspace + "." + table.name));
}

/**
 * A single dropped mutation that carries partition updates for two tables must bump the
 * counter of each of those tables by one.
 */
@Test
public void testMultiplePartitionUpdatesInSingleMutation()
{
    MessagingMetrics metrics = messagingService.metrics;

    // start from a clean slate
    metrics.resetDroppedMessages();

    // two uniquely named tables in one keyspace
    TableMetadata firstTable = MockSchema.newTableMetadata("ks_multi_part", "table1_multi");
    TableMetadata secondTable = MockSchema.newTableMetadata("ks_multi_part", "table2_multi");

    // same partition key on both sides so the two mutations can be merged into one
    Mutation forFirstTable = new RowUpdateBuilder(firstTable, 0, "key1").clustering("col1").add("value", "val1").build();
    Mutation forSecondTable = new RowUpdateBuilder(secondTable, 0, "key1").clustering("col1").add("value", "val2").build();
    Mutation merged = Mutation.merge(Arrays.asList(forFirstTable, forSecondTable));

    // drop the merged mutation once
    metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, merged).build(), 100, MILLISECONDS);

    // each partition update is counted against its own table
    Map<String, Long> counts = metrics.getDroppedMutationsByTable();
    assertEquals(Long.valueOf(1L), counts.get(firstTable.keyspace + "." + firstTable.name));
    assertEquals(Long.valueOf(1L), counts.get(secondTable.keyspace + "." + secondTable.name));
}

/**
 * The per-table dropped mutation counts must be readable through the MBean interface.
 */
@Test
public void testMBeanExposesDroppedMutationsByTable()
{
    MessagingMetrics metrics = messagingService.metrics;

    // start from a clean slate
    metrics.resetDroppedMessages();

    // drop one mutation against a uniquely named table
    TableMetadata table = MockSchema.newTableMetadata("test_ks_mbean", "test_table_mbean");
    Mutation mutation = new RowUpdateBuilder(table, 0, "key1").clustering("col1")
                                                              .add("value", "test_value")
                                                              .build();
    metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation).build(), 100, MILLISECONDS);

    // read the counts back through an MBean built over the same metrics instance
    MessagingServiceMBean mbean = new MessagingServiceMBeanImpl(true, messagingService.versions, metrics);
    Map<String, Long> exposed = mbean.getDroppedMutationsByTable();

    assertNotNull(exposed);
    assertEquals(Long.valueOf(1L), exposed.get(table.keyspace + "." + table.name));
}

/**
 * Right after a reset, with nothing dropped, the per-table view must be an empty map rather
 * than {@code null}.
 */
@Test
public void testEmptyDroppedMutationsByTableMap()
{
    MessagingMetrics metrics = messagingService.metrics;

    // start from a clean slate and record nothing
    metrics.resetDroppedMessages();

    Map<String, Long> counts = metrics.getDroppedMutationsByTable();
    assertNotNull(counts);
    assertEquals(0, counts.size());
}
}