From 02ee72b03cc4a61bf8bcc2be0be647b93bc90799 Mon Sep 17 00:00:00 2001 From: Joel Knighton Date: Wed, 8 Oct 2025 17:19:30 -0500 Subject: [PATCH 1/3] CNDB-15623: Only use write path for CDC tables in CassandraStreamReceiver if CDC is enabled on the node --- .../apache/cassandra/db/streaming/CassandraStreamReceiver.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index d9be692a3e4..de44911f006 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; @@ -177,7 +178,7 @@ private boolean hasViews(ColumnFamilyStore cfs) private boolean hasCDC(ColumnFamilyStore cfs) { - return cfs.metadata().params.cdc; + return DatabaseDescriptor.isCDCEnabled() && cfs.metadata().params.cdc; } /* From 84aecd90a28e4aa582a50db10c6fc580482ead6d Mon Sep 17 00:00:00 2001 From: Joel Knighton Date: Thu, 9 Oct 2025 13:03:17 -0500 Subject: [PATCH 2/3] CNDB-15623: Parameterize the in-JVM dtest RepairTest to cover CDC repairs Repairs on CDC-enabled tables use the local write path when the stream is received on a CDC-enabled node. There is no existing test coverage of this local write path. This commit parameterizes this existing test to run end-to-end repairs for every combination of node-enabled/table-enabled, adding assertions to cover which stream path is used. --- .../distributed/test/RepairTest.java | 68 ++++++++++++++++--- 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java index b127a74d217..a0c1c70b39a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java @@ -29,13 +29,16 @@ import com.google.common.collect.ImmutableMap; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.repair.SystemDistributedKeyspace; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.concurrent.SimpleCondition; import org.apache.cassandra.utils.progress.ProgressEventType; @@ -46,10 +49,19 @@ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; import static org.apache.cassandra.distributed.test.ExecUtil.rethrow; +@RunWith(Parameterized.class) public class RepairTest extends TestBaseImpl { + private static boolean nodesHaveCDC; + private static boolean tableHasCDC; private static ICluster cluster; + @Parameterized.Parameters(name = "nodesHaveCDC={0}, tableHasCDC={1}") + public static Iterable data() + { + return Arrays.asList(new Object[][] {{ false, false }, { false, true } , { true, false }, { true, true }}); + } + private static void insert(ICluster cluster, String keyspace, int start, int end, int ... nodes) { String insert = String.format("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');", keyspace); @@ -80,7 +92,7 @@ private static void flush(ICluster cluster, String keyspace, cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(keyspace))); } - private static ICluster create(Consumer configModifier) throws IOException + private ICluster create(Consumer configModifier) throws IOException { configModifier = configModifier.andThen( config -> config.set("hinted_handoff_enabled", false) @@ -94,6 +106,15 @@ private static ICluster create(Consumer configModifier) throws static void repair(ICluster cluster, String keyspace, Map options) { + long[] startPositions = new long[cluster.size()]; + for (int i = 1; i <= cluster.size(); i++) + { + IInvokableInstance node = cluster.get(i); + if (node.isShutdown()) + continue; + startPositions[i - 1] = node.logs().mark(); + } + cluster.get(1).runOnInstance(rethrow(() -> { SimpleCondition await = new SimpleCondition(); StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> { @@ -102,14 +123,25 @@ static void repair(ICluster cluster, String keyspace, Map cluster, String keyspace, String compression) throws Exception + void populate(ICluster cluster, String keyspace, String compression) throws Exception { try { cluster.schemaChange(String.format("DROP TABLE IF EXISTS %s.test;", keyspace)); - cluster.schemaChange(String.format("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s", keyspace, compression)); + cluster.schemaChange(String.format("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s AND cdc = %s;", keyspace, compression, tableHasCDC)); insert(cluster, keyspace, 0, 1000, 1, 2, 3); flush(cluster, keyspace, 1); @@ -134,6 +166,11 @@ void repair(ICluster cluster, boolean sequential, String com populate(cluster, KEYSPACE, compression); repair(cluster, KEYSPACE, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel")); verify(cluster, KEYSPACE, 0, 2001, 1, 2, 3); + + for (int i = 1; i <= cluster.size(); i++) + Assert.assertEquals("We should use the local write path (which requires flushing) if CDC is enabled on both a node and table-level", + nodesHaveCDC && tableHasCDC, + !cluster.get(i).logs().grep("Enqueuing flush of test \\(STREAMS_RECEIVED\\)").getResult().isEmpty()); } void shutDownNodesAndForceRepair(ICluster cluster, String keyspace, int downNode) throws Exception @@ -143,10 +180,21 @@ void shutDownNodesAndForceRepair(ICluster cluster, String ke repair(cluster, keyspace, ImmutableMap.of("forceRepair", "true")); } - @BeforeClass - public static void setupCluster() throws IOException + public RepairTest(boolean nodesHaveCDC, boolean tableHasCDC) throws Exception { - cluster = create(config -> {}); + // This runs per method, but we only want to rebuild the cluster if nodesHaveCDC has changed since the last + // build and we need to update the configuration accordingly + if (cluster != null && RepairTest.nodesHaveCDC != nodesHaveCDC) + { + cluster.close(); + cluster = null; + } + + if (cluster == null) + cluster = create(config -> config.set("cdc_enabled", nodesHaveCDC)); + + RepairTest.nodesHaveCDC = nodesHaveCDC; + RepairTest.tableHasCDC = tableHasCDC; } @AfterClass @@ -198,7 +246,11 @@ public void testForcedNormalRepairWithOneNodeDown() throws Exception // The test uses its own keyspace with rf == 2 String forceRepairKeyspace = "test_force_repair_keyspace"; int rf = 2; - cluster.schemaChange("CREATE KEYSPACE " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"); + cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"); + + // Truncate distributed repair keyspace due to test class parameterization. We only want results + // from our run + cluster.schemaChange("TRUNCATE TABLE " + SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.PARENT_REPAIR_HISTORY); try { From 61c096f37c48b1174fc48bba2b358526a2a590e8 Mon Sep 17 00:00:00 2001 From: Joel Knighton Date: Thu, 9 Oct 2025 15:29:14 -0500 Subject: [PATCH 3/3] CNDB-15623: Clean up redundant log assert left over from refactoring RepairTest --- .../org/apache/cassandra/distributed/test/RepairTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java index a0c1c70b39a..ef564db2aca 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java @@ -166,11 +166,6 @@ void repair(ICluster cluster, boolean sequential, String com populate(cluster, KEYSPACE, compression); repair(cluster, KEYSPACE, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel")); verify(cluster, KEYSPACE, 0, 2001, 1, 2, 3); - - for (int i = 1; i <= cluster.size(); i++) - Assert.assertEquals("We should use the local write path (which requires flushing) if CDC is enabled on both a node and table-level", - nodesHaveCDC && tableHasCDC, - !cluster.get(i).logs().grep("Enqueuing flush of test \\(STREAMS_RECEIVED\\)").getResult().isEmpty()); } void shutDownNodesAndForceRepair(ICluster cluster, String keyspace, int downNode) throws Exception