diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
index 6e288734e0ba..03ab030d55fe 100644
--- a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
@@ -86,4 +86,33 @@ public DroppedMessageMetrics(Verb verb)
             crossNodeDroppedLatency = Metrics.timer(createMetricName(TYPE, "CrossNodeDroppedLatency", scope));
         }
     }
+
+    /**
+     * Creates the dropped-message metrics ({@code Dropped}, {@code InternalDroppedLatency} and
+     * {@code CrossNodeDroppedLatency}) under a caller-chosen type and scope rather than a {@link Verb}.
+     *
+     * @param type  metric type handed to the {@link DefaultNameFactory}
+     * @param scope metric scope handed to the {@link DefaultNameFactory}, e.g. {@code keyspace.table}
+     */
+    public DroppedMessageMetrics(String type, String scope)
+    {
+        MetricNameFactory factory = new DefaultNameFactory(type, scope);
+        dropped = Metrics.meter(factory.createMetricName("Dropped"));
+        internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency"));
+        crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency"));
+    }
+
+    /**
+     * Unregisters the metrics created by {@link #DroppedMessageMetrics(String, String)} for the same type and scope.
+     *
+     * @param type  metric type the metrics were created with
+     * @param scope metric scope the metrics were created with
+     */
+    public static void release(String type, String scope)
+    {
+        MetricNameFactory factory = new DefaultNameFactory(type, scope);
+        Metrics.remove(factory.createMetricName("Dropped"));
+        Metrics.remove(factory.createMetricName("InternalDroppedLatency"));
+        Metrics.remove(factory.createMetricName("CrossNodeDroppedLatency"));
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index 682ee421cbaf..fd34bd9b330b 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -99,6 +99,12 @@ private static final class DroppedForVerb
 
     // total dropped message counts for server lifetime
     private final Map droppedMessages = new ConcurrentHashMap<>();
+
+    // metric type under which the per-table dropped mutation metrics are registered
+    private static final String DROPPED_MUTATIONS_TYPE = "DroppedMutations";
+
+    // dropped mutation metrics by table, keyed by "keyspace.table"
+    private final Map<String, DroppedMessageMetrics> droppedMutationsByTable = new ConcurrentHashMap<>();
 
     public MessagingMetrics()
     {
@@ -161,6 +167,37 @@ public void recordTotalMessageProcessingTime(Verb verb, InetAddressAndPort from,
     public void recordDroppedMessage(Message message, long timeElapsed, TimeUnit timeUnit)
     {
         recordDroppedMessage(message.verb(), timeElapsed, timeUnit, message.isCrossNode());
+
+        // mutations are additionally accounted against every table they were writing to
+        if (message.verb() == Verb.MUTATION_REQ && message.payload instanceof org.apache.cassandra.db.Mutation)
+            recordDroppedMutationByTable((org.apache.cassandra.db.Mutation) message.payload, timeElapsed, timeUnit, message.isCrossNode());
     }
+
+    /**
+     * Marks one dropped mutation, and its latency, against each table the mutation has a partition update for.
+     *
+     * @param mutation    payload of the dropped message
+     * @param timeElapsed latency of the dropped message
+     * @param timeUnit    unit of {@code timeElapsed}
+     * @param isCrossNode whether the latency goes to the cross-node timer (true) or the internal one (false)
+     */
+    private void recordDroppedMutationByTable(org.apache.cassandra.db.Mutation mutation, long timeElapsed, TimeUnit timeUnit, boolean isCrossNode)
+    {
+        for (org.apache.cassandra.db.partitions.PartitionUpdate update : mutation.getPartitionUpdates())
+        {
+            String tableKey = update.metadata().keyspace + '.' + update.metadata().name;
+
+            // look up with get() first so that computeIfAbsent() only runs for a table's first drop
+            DroppedMessageMetrics tableMetrics = droppedMutationsByTable.get(tableKey);
+            if (tableMetrics == null)
+                tableMetrics = droppedMutationsByTable.computeIfAbsent(tableKey, k -> new DroppedMessageMetrics(DROPPED_MUTATIONS_TYPE, k));
+
+            tableMetrics.dropped.mark();
+            if (isCrossNode)
+                tableMetrics.crossNodeDroppedLatency.update(timeElapsed, timeUnit);
+            else
+                tableMetrics.internalDroppedLatency.update(timeElapsed, timeUnit);
+        }
+    }
 
     public void recordDroppedMessage(Verb verb, long timeElapsed, TimeUnit timeUnit, boolean isCrossNode)
@@ -201,6 +238,19 @@ public Map getDroppedMessages()
             map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.getCount());
         return map;
     }
+
+    /**
+     * Returns a snapshot of how many mutations were dropped per table.
+     *
+     * @return a new map from {@code keyspace.table} to the count of that table's {@code Dropped} meter; empty if no table is tracked
+     */
+    public Map<String, Long> getDroppedMutationsByTable()
+    {
+        Map<String, Long> map = new HashMap<>(droppedMutationsByTable.size());
+        for (Map.Entry<String, DroppedMessageMetrics> entry : droppedMutationsByTable.entrySet())
+            map.put(entry.getKey(), entry.getValue().dropped.getCount());
+        return map;
+    }
 
     private void logDroppedMessages()
     {
@@ -239,5 +289,14 @@ public int resetAndConsumeDroppedErrors(Consumer messageConsumer)
     public void resetDroppedMessages()
     {
         droppedMessages.replaceAll((u, v) -> new DroppedForVerb(new DroppedMessageMetrics(u)));
+
+        // clear() alone would only forget the map entries and leave the per-table metrics registered under the
+        // same names, so unregister each table's metrics before dropping its entry; removing entries while
+        // iterating is fine for a ConcurrentHashMap
+        for (String tableKey : droppedMutationsByTable.keySet())
+        {
+            DroppedMessageMetrics.release(DROPPED_MUTATIONS_TYPE, tableKey);
+            droppedMutationsByTable.remove(tableKey);
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 55e3a063673c..1e57835e1fd1 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -108,6 +108,13 @@ public interface MessagingServiceMBean
      * dropped message counts for server lifetime
      */
     public Map getDroppedMessages();
+
+    /**
+     * Dropped mutation counts by table, keyed by {@code keyspace.table}.
+     *
+     * @return number of dropped mutations per table
+     */
+    public Map<String, Long> getDroppedMutationsByTable();
 
     /**
      * Total number of timeouts happened on this node
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
index 3b89834a3116..b762332edbb5 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java
@@ -219,6 +219,12 @@ public Map getDroppedMessages()
     {
         return metrics.getDroppedMessages();
     }
+
+    @Override
+    public Map<String, Long> getDroppedMutationsByTable()
+    {
+        return metrics.getDroppedMutationsByTable();
+    }
 
     @Override
     public long getTotalTimeouts()
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 7db3bb12f7fb..60c98f50bba4 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -50,11 +50,15 @@
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.MessagingMetrics;
+import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.awaitility.Awaitility;
 import org.caffinitas.ohc.histo.EstimatedHistogram;
@@ -589,4 +593,183 @@ private static void sendMessages(int numOfMessages, Verb verb) throws UnknownHos
             MessagingService.instance().send(Message.out(verb, noPayload), address);
         }
     }
+
+    @Test
+    public void testDroppedMutationsTrackedByTable()
+    {
+        // Reset metrics to ensure clean state
+        messagingService.metrics.resetDroppedMessages();
+
+        TableMetadata metadata = MockSchema.newTableMetadata("test_ks_drops", "test_table_drops");
+        Message<Mutation> message = mutationRequest(singleRowMutation(metadata, "key1", "test_value"));
+
+        // Record the mutation as dropped
+        messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
+
+        // Verify the table-specific metric was updated
+        Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
+        assertNotNull(droppedByTable);
+        assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey(metadata)));
+
+        // Drop another mutation for the same table
+        messagingService.metrics.recordDroppedMessage(message, 200, MILLISECONDS);
+
+        // Verify the counter accumulated
+        droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
+        assertEquals(Long.valueOf(2L), droppedByTable.get(tableKey(metadata)));
+    }
+
+    @Test
+    public void testMultipleTablesTrackedIndependently()
+    {
+        // Reset metrics to ensure clean state
+        messagingService.metrics.resetDroppedMessages();
+
+        TableMetadata table1 = MockSchema.newTableMetadata("ks1", "table1");
+        TableMetadata table2 = MockSchema.newTableMetadata("ks1", "table2");
+        TableMetadata table3 = MockSchema.newTableMetadata("ks2", "table1");
+
+        Mutation mutation1 = singleRowMutation(table1, "key1", "val1");
+        Mutation mutation2 = singleRowMutation(table2, "key2", "val2");
+        Mutation mutation3 = singleRowMutation(table3, "key3", "val3");
+
+        // Drop two mutations for ks1.table1, one for ks1.table2 and three for ks2.table1
+        messagingService.metrics.recordDroppedMessage(mutationRequest(mutation1), 100, MILLISECONDS);
+        messagingService.metrics.recordDroppedMessage(mutationRequest(mutation1), 100, MILLISECONDS);
+        messagingService.metrics.recordDroppedMessage(mutationRequest(mutation2), 100, MILLISECONDS);
+        messagingService.metrics.recordDroppedMessage(mutationRequest(mutation3), 100, MILLISECONDS);
+        messagingService.metrics.recordDroppedMessage(mutationRequest(mutation3), 100, MILLISECONDS);
+        messagingService.metrics.recordDroppedMessage(mutationRequest(mutation3), 100, MILLISECONDS);
+
+        // Verify each table tracked independently
+        Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
+        assertEquals(Long.valueOf(2L), droppedByTable.get(tableKey(table1)));
+        assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey(table2)));
+        assertEquals(Long.valueOf(3L), droppedByTable.get(tableKey(table3)));
+    }
+
+    @Test
+    public void testCrossNodeVsInternalLatencyPerTable()
+    {
+        // Reset metrics to ensure clean state
+        messagingService.metrics.resetDroppedMessages();
+
+        TableMetadata metadata = MockSchema.newTableMetadata("test_ks", "test_table");
+        Mutation mutation = singleRowMutation(metadata, "key1", "test_value");
+
+        // NOTE(review): this test only asserts the per-table count; the per-table latency timers are not
+        // exposed through getDroppedMutationsByTable(). Also confirm that a message built with
+        // from(getLocalHost()) is actually reported as cross-node by isCrossNode().
+        Message<Mutation> crossNodeMessage = Message.builder(Verb.MUTATION_REQ, mutation)
+                                                    .from(InetAddressAndPort.getLocalHost())
+                                                    .build();
+        Message<Mutation> internalMessage = mutationRequest(mutation);
+
+        // Record cross-node dropped mutations
+        messagingService.metrics.recordDroppedMessage(crossNodeMessage, 100, MILLISECONDS);
+        messagingService.metrics.recordDroppedMessage(crossNodeMessage, 150, MILLISECONDS);
+
+        // Record internal dropped mutations
+        messagingService.metrics.recordDroppedMessage(internalMessage, 50, MILLISECONDS);
+
+        // Verify the table-specific metric was updated for both types
+        Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
+        assertEquals(Long.valueOf(3L), droppedByTable.get(tableKey(metadata)));
+    }
+
+    @Test
+    public void testMultiplePartitionUpdatesInSingleMutation()
+    {
+        // Reset metrics to ensure clean state
+        messagingService.metrics.resetDroppedMessages();
+
+        // Two different tables in the same keyspace
+        TableMetadata table1 = MockSchema.newTableMetadata("ks_multi_part", "table1_multi");
+        TableMetadata table2 = MockSchema.newTableMetadata("ks_multi_part", "table2_multi");
+
+        // Merge one mutation per table into a single batch mutation (requires same keyspace and key)
+        Mutation batchMutation = Mutation.merge(Arrays.asList(singleRowMutation(table1, "key1", "val1"),
+                                                              singleRowMutation(table2, "key1", "val2")));
+
+        // Drop the batch mutation
+        messagingService.metrics.recordDroppedMessage(mutationRequest(batchMutation), 100, MILLISECONDS);
+
+        // Verify both tables are tracked (each partition update counted)
+        Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
+        assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey(table1)));
+        assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey(table2)));
+    }
+
+    @Test
+    public void testMBeanExposesDroppedMutationsByTable()
+    {
+        // Reset metrics to ensure clean state
+        messagingService.metrics.resetDroppedMessages();
+
+        // Create and drop a mutation
+        TableMetadata metadata = MockSchema.newTableMetadata("test_ks_mbean", "test_table_mbean");
+        messagingService.metrics.recordDroppedMessage(mutationRequest(singleRowMutation(metadata, "key1", "test_value")), 100, MILLISECONDS);
+
+        // Access via MBean interface
+        MessagingServiceMBean mbean = new MessagingServiceMBeanImpl(true, messagingService.versions, messagingService.metrics);
+        Map<String, Long> droppedByTable = mbean.getDroppedMutationsByTable();
+
+        // Verify the data is accessible through MBean
+        assertNotNull(droppedByTable);
+        assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey(metadata)));
+    }
+
+    @Test
+    public void testEmptyDroppedMutationsByTableMap()
+    {
+        // Reset metrics to ensure clean state
+        messagingService.metrics.resetDroppedMessages();
+
+        // Get dropped mutations by table when no mutations have been dropped
+        Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
+
+        // Verify it returns an empty map (not null)
+        assertNotNull(droppedByTable);
+        assertEquals(0, droppedByTable.size());
+    }
+
+    @Test
+    public void testResetDroppedMessagesResetsPerTableCounts()
+    {
+        // Reset metrics to ensure clean state
+        messagingService.metrics.resetDroppedMessages();
+
+        TableMetadata metadata = MockSchema.newTableMetadata("test_ks_reset", "test_table_reset");
+        Message<Mutation> message = mutationRequest(singleRowMutation(metadata, "key1", "test_value"));
+
+        messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
+        messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
+        assertEquals(Long.valueOf(2L), messagingService.metrics.getDroppedMutationsByTable().get(tableKey(metadata)));
+
+        // Resetting forgets every tracked table
+        messagingService.metrics.resetDroppedMessages();
+        assertEquals(0, messagingService.metrics.getDroppedMutationsByTable().size());
+
+        // The same table starts counting from scratch after the reset
+        messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
+        assertEquals(Long.valueOf(1L), messagingService.metrics.getDroppedMutationsByTable().get(tableKey(metadata)));
+    }
+
+    // Key under which a table shows up in getDroppedMutationsByTable()
+    private static String tableKey(TableMetadata metadata)
+    {
+        return metadata.keyspace + "." + metadata.name;
+    }
+
+    // Builds a mutation with a single row (clustering "col1") for the given table and partition key
+    private static Mutation singleRowMutation(TableMetadata metadata, String key, String value)
+    {
+        return new RowUpdateBuilder(metadata, 0, key).clustering("col1").add("value", value).build();
+    }
+
+    // Wraps a mutation into a MUTATION_REQ message with the builder's default sender
+    private static Message<Mutation> mutationRequest(Mutation mutation)
+    {
+        return Message.builder(Verb.MUTATION_REQ, mutation).build();
+    }
 }