Skip to content

Commit 5ef3df1

Browse files
driftxmichaelsembwever
authored andcommitted
CNDB-15826: CNDB-14359: Add tracking of dropped mutations by table (#1790)
Fixes #14359
1 parent e8e1ce9 commit 5ef3df1

File tree

5 files changed

+225
-0
lines changed

5 files changed

+225
-0
lines changed

src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,12 @@ public DroppedMessageMetrics(Verb verb)
8686
crossNodeDroppedLatency = Metrics.timer(createMetricName(TYPE, "CrossNodeDroppedLatency", scope));
8787
}
8888
}
89+
90+
public DroppedMessageMetrics(String type, String scope)
91+
{
92+
MetricNameFactory factory = new DefaultNameFactory(type, scope);
93+
dropped = Metrics.meter(factory.createMetricName("Dropped"));
94+
internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency"));
95+
crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency"));
96+
}
8997
}

src/java/org/apache/cassandra/metrics/MessagingMetrics.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ private static final class DroppedForVerb
9999

100100
// total dropped message counts for server lifetime
101101
private final Map<Verb, DroppedForVerb> droppedMessages = new ConcurrentHashMap<>();
102+
103+
// dropped mutations by table
104+
private final Map<String, DroppedMessageMetrics> droppedMutationsByTable = new ConcurrentHashMap<>();
102105

103106
public MessagingMetrics()
104107
{
@@ -161,6 +164,24 @@ public void recordTotalMessageProcessingTime(Verb verb, InetAddressAndPort from,
161164
public void recordDroppedMessage(Message<?> message, long timeElapsed, TimeUnit timeUnit)
162165
{
163166
recordDroppedMessage(message.verb(), timeElapsed, timeUnit, message.isCrossNode());
167+
168+
if (message.verb() == Verb.MUTATION_REQ && message.payload instanceof org.apache.cassandra.db.Mutation)
169+
{
170+
org.apache.cassandra.db.Mutation mutation = (org.apache.cassandra.db.Mutation) message.payload;
171+
for (org.apache.cassandra.db.partitions.PartitionUpdate update : mutation.getPartitionUpdates())
172+
{
173+
String tableKey = update.metadata().keyspace + '.' + update.metadata().name;
174+
DroppedMessageMetrics tableMetrics = droppedMutationsByTable.get(tableKey);
175+
if (tableMetrics == null)
176+
tableMetrics = droppedMutationsByTable.computeIfAbsent(tableKey,
177+
k -> new DroppedMessageMetrics("DroppedMutations", k));
178+
tableMetrics.dropped.mark();
179+
if (message.isCrossNode())
180+
tableMetrics.crossNodeDroppedLatency.update(timeElapsed, timeUnit);
181+
else
182+
tableMetrics.internalDroppedLatency.update(timeElapsed, timeUnit);
183+
}
184+
}
164185
}
165186

166187
public void recordDroppedMessage(Verb verb, long timeElapsed, TimeUnit timeUnit, boolean isCrossNode)
@@ -201,6 +222,14 @@ public Map<String, Integer> getDroppedMessages()
201222
map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.getCount());
202223
return map;
203224
}
225+
226+
public Map<String, Long> getDroppedMutationsByTable()
227+
{
228+
Map<String, Long> map = new HashMap<>(droppedMutationsByTable.size());
229+
for (Map.Entry<String, DroppedMessageMetrics> entry : droppedMutationsByTable.entrySet())
230+
map.put(entry.getKey(), entry.getValue().dropped.getCount());
231+
return map;
232+
}
204233

205234
private void logDroppedMessages()
206235
{
@@ -239,5 +268,6 @@ public int resetAndConsumeDroppedErrors(Consumer<String> messageConsumer)
239268
public void resetDroppedMessages()
240269
{
241270
droppedMessages.replaceAll((u, v) -> new DroppedForVerb(new DroppedMessageMetrics(u)));
271+
droppedMutationsByTable.clear();
242272
}
243273
}

src/java/org/apache/cassandra/net/MessagingServiceMBean.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ public interface MessagingServiceMBean
108108
* dropped message counts for server lifetime
109109
*/
110110
public Map<String, Integer> getDroppedMessages();
111+
112+
/**
113+
* dropped mutation counts by table for server lifetime
114+
*/
115+
public Map<String, Long> getDroppedMutationsByTable();
111116

112117
/**
113118
* Total number of timeouts happened on this node

src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ public Map<String, Integer> getDroppedMessages()
219219
{
220220
return metrics.getDroppedMessages();
221221
}
222+
223+
@Override
224+
public Map<String, Long> getDroppedMutationsByTable()
225+
{
226+
return metrics.getDroppedMutationsByTable();
227+
}
222228

223229
@Override
224230
public long getTotalTimeouts()

test/unit/org/apache/cassandra/net/MessagingServiceTest.java

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,15 @@
5050
import org.apache.cassandra.config.CassandraRelevantProperties;
5151
import org.apache.cassandra.config.DatabaseDescriptor;
5252
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
53+
import org.apache.cassandra.db.Mutation;
54+
import org.apache.cassandra.db.RowUpdateBuilder;
5355
import org.apache.cassandra.db.commitlog.CommitLog;
5456
import org.apache.cassandra.exceptions.ConfigurationException;
5557
import org.apache.cassandra.locator.InetAddressAndPort;
5658
import org.apache.cassandra.metrics.MessagingMetrics;
5759
import org.apache.cassandra.utils.ByteBufferUtil;
60+
import org.apache.cassandra.schema.MockSchema;
61+
import org.apache.cassandra.schema.TableMetadata;
5862
import org.apache.cassandra.utils.FBUtilities;
5963
import org.awaitility.Awaitility;
6064
import org.caffinitas.ohc.histo.EstimatedHistogram;
@@ -589,4 +593,176 @@ private static void sendMessages(int numOfMessages, Verb verb) throws UnknownHos
589593
MessagingService.instance().send(Message.out(verb, noPayload), address);
590594
}
591595
}
596+
597+
@Test
598+
public void testDroppedMutationsTrackedByTable()
599+
{
600+
// Reset metrics to ensure clean state
601+
messagingService.metrics.resetDroppedMessages();
602+
603+
// Create table metadata for a test table with unique name
604+
TableMetadata metadata = MockSchema.newTableMetadata("test_ks_drops", "test_table_drops");
605+
String tableKey = metadata.keyspace + "." + metadata.name;
606+
607+
// Create a mutation for the table
608+
Mutation mutation = new RowUpdateBuilder(metadata, 0, "key1")
609+
.clustering("col1")
610+
.add("value", "test_value")
611+
.build();
612+
613+
// Create a message with the mutation
614+
Message<Mutation> message = Message.builder(Verb.MUTATION_REQ, mutation).build();
615+
616+
// Record the mutation as dropped
617+
messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
618+
619+
// Verify the table-specific metric was updated
620+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
621+
assertNotNull(droppedByTable);
622+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey));
623+
624+
// Drop another mutation for the same table
625+
messagingService.metrics.recordDroppedMessage(message, 200, MILLISECONDS);
626+
627+
// Verify the counter accumulated
628+
droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
629+
assertEquals(Long.valueOf(2L), droppedByTable.get(tableKey));
630+
}
631+
632+
@Test
633+
public void testMultipleTablesTrackedIndependently()
634+
{
635+
// Reset metrics to ensure clean state
636+
messagingService.metrics.resetDroppedMessages();
637+
638+
// Create metadata for multiple tables
639+
TableMetadata table1 = MockSchema.newTableMetadata("ks1", "table1");
640+
TableMetadata table2 = MockSchema.newTableMetadata("ks1", "table2");
641+
TableMetadata table3 = MockSchema.newTableMetadata("ks2", "table1");
642+
643+
String tableKey1 = table1.keyspace + "." + table1.name;
644+
String tableKey2 = table2.keyspace + "." + table2.name;
645+
String tableKey3 = table3.keyspace + "." + table3.name;
646+
647+
// Create mutations for each table
648+
Mutation mutation1 = new RowUpdateBuilder(table1, 0, "key1").clustering("col1").add("value", "val1").build();
649+
Mutation mutation2 = new RowUpdateBuilder(table2, 0, "key2").clustering("col1").add("value", "val2").build();
650+
Mutation mutation3 = new RowUpdateBuilder(table3, 0, "key3").clustering("col1").add("value", "val3").build();
651+
652+
// Drop mutations for different tables
653+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation1).build(), 100, MILLISECONDS);
654+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation1).build(), 100, MILLISECONDS);
655+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation2).build(), 100, MILLISECONDS);
656+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation3).build(), 100, MILLISECONDS);
657+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation3).build(), 100, MILLISECONDS);
658+
messagingService.metrics.recordDroppedMessage(Message.builder(Verb.MUTATION_REQ, mutation3).build(), 100, MILLISECONDS);
659+
660+
// Verify each table tracked independently
661+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
662+
assertEquals(Long.valueOf(2L), droppedByTable.get(tableKey1));
663+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey2));
664+
assertEquals(Long.valueOf(3L), droppedByTable.get(tableKey3));
665+
}
666+
667+
@Test
668+
public void testCrossNodeVsInternalLatencyPerTable()
669+
{
670+
// Reset metrics to ensure clean state
671+
messagingService.metrics.resetDroppedMessages();
672+
673+
TableMetadata metadata = MockSchema.newTableMetadata("test_ks", "test_table");
674+
String tableKey = metadata.keyspace + "." + metadata.name;
675+
676+
Mutation mutation = new RowUpdateBuilder(metadata, 0, "key1")
677+
.clustering("col1")
678+
.add("value", "test_value")
679+
.build();
680+
681+
Message<Mutation> crossNodeMessage = Message.builder(Verb.MUTATION_REQ, mutation)
682+
.from(InetAddressAndPort.getLocalHost())
683+
.build();
684+
Message<Mutation> internalMessage = Message.builder(Verb.MUTATION_REQ, mutation).build();
685+
686+
// Record cross-node dropped mutations
687+
messagingService.metrics.recordDroppedMessage(crossNodeMessage, 100, MILLISECONDS);
688+
messagingService.metrics.recordDroppedMessage(crossNodeMessage, 150, MILLISECONDS);
689+
690+
// Record internal dropped mutations
691+
messagingService.metrics.recordDroppedMessage(internalMessage, 50, MILLISECONDS);
692+
693+
// Verify the table-specific metric was updated for both types
694+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
695+
assertEquals(Long.valueOf(3L), droppedByTable.get(tableKey));
696+
}
697+
698+
@Test
699+
public void testMultiplePartitionUpdatesInSingleMutation()
700+
{
701+
// Reset metrics to ensure clean state
702+
messagingService.metrics.resetDroppedMessages();
703+
704+
// Create two different tables in the same keyspace with unique names
705+
TableMetadata table1 = MockSchema.newTableMetadata("ks_multi_part", "table1_multi");
706+
TableMetadata table2 = MockSchema.newTableMetadata("ks_multi_part", "table2_multi");
707+
708+
String tableKey1 = table1.keyspace + "." + table1.name;
709+
String tableKey2 = table2.keyspace + "." + table2.name;
710+
711+
// Create mutations for each table with the same partition key
712+
Mutation mutation1 = new RowUpdateBuilder(table1, 0, "key1").clustering("col1").add("value", "val1").build();
713+
Mutation mutation2 = new RowUpdateBuilder(table2, 0, "key1").clustering("col1").add("value", "val2").build();
714+
715+
// Merge mutations into a single batch mutation (requires same keyspace and key)
716+
Mutation batchMutation = Mutation.merge(Arrays.asList(mutation1, mutation2));
717+
718+
// Drop the batch mutation
719+
Message<Mutation> message = Message.builder(Verb.MUTATION_REQ, batchMutation).build();
720+
messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
721+
722+
// Verify both tables are tracked (each partition update counted)
723+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
724+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey1));
725+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey2));
726+
}
727+
728+
@Test
729+
public void testMBeanExposesDroppedMutationsByTable()
730+
{
731+
// Reset metrics to ensure clean state
732+
messagingService.metrics.resetDroppedMessages();
733+
734+
// Create table metadata with unique name
735+
TableMetadata metadata = MockSchema.newTableMetadata("test_ks_mbean", "test_table_mbean");
736+
String tableKey = metadata.keyspace + "." + metadata.name;
737+
738+
// Create and drop a mutation
739+
Mutation mutation = new RowUpdateBuilder(metadata, 0, "key1")
740+
.clustering("col1")
741+
.add("value", "test_value")
742+
.build();
743+
Message<Mutation> message = Message.builder(Verb.MUTATION_REQ, mutation).build();
744+
messagingService.metrics.recordDroppedMessage(message, 100, MILLISECONDS);
745+
746+
// Access via MBean interface
747+
MessagingServiceMBean mbean = new MessagingServiceMBeanImpl(true, messagingService.versions, messagingService.metrics);
748+
Map<String, Long> droppedByTable = mbean.getDroppedMutationsByTable();
749+
750+
// Verify the data is accessible through MBean
751+
assertNotNull(droppedByTable);
752+
assertEquals(Long.valueOf(1L), droppedByTable.get(tableKey));
753+
}
754+
755+
@Test
756+
public void testEmptyDroppedMutationsByTableMap()
757+
{
758+
// Reset metrics to ensure clean state
759+
messagingService.metrics.resetDroppedMessages();
760+
761+
// Get dropped mutations by table when no mutations have been dropped
762+
Map<String, Long> droppedByTable = messagingService.metrics.getDroppedMutationsByTable();
763+
764+
// Verify it returns an empty map (not null)
765+
assertNotNull(droppedByTable);
766+
assertEquals(0, droppedByTable.size());
767+
}
592768
}

0 commit comments

Comments
 (0)