From 58ac1d3dcad5776a198dd4f9295f09dbb39b3182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Thu, 21 Aug 2025 21:22:58 +0200 Subject: [PATCH 1/2] CNDB-14207: Don't mark the index non-queryable if flush fails (#1770) If the index fails to build, mark the index non-queryable, but don't fail the C* flush. This way the node can continue to run the other queries. --- .../sai/disk/StorageAttachedIndexWriter.java | 10 ++++ .../index/sai/functional/FlushingTest.java | 49 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index b43c21fe5a18..e6a6b9fdd2f7 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -317,6 +317,16 @@ public void abort(Throwable accumulator, boolean fromIndex) // Mark the write aborted, so we can short-circuit any further operations on the component writers. aborted = true; + // For non-compaction and non-flush, make any indexes involved in this transaction non-queryable, + // as they will likely not match the backing table. + // For compaction and flush: the task should be aborted and new sstables will not be added to tracker. + // We do not want to mark the index as non-queryable on compaction and flush because otherwise + // the index status would be propagated to the other nodes and that would make querying the index impossible + // also on the other nodes. If the problem with compaction or flush repeats, then it is better to fail + // on this node only and let the rest of the cluster operate normally. + if (fromIndex && opType != OperationType.COMPACTION && opType != OperationType.FLUSH) + indices.forEach(StorageAttachedIndex::makeIndexNonQueryable); + for (PerIndexWriter perIndexWriter : perIndexWriters) { try diff --git a/test/unit/org/apache/cassandra/index/sai/functional/FlushingTest.java b/test/unit/org/apache/cassandra/index/sai/functional/FlushingTest.java index 23861fb21099..abbc80eceea1 100644 --- a/test/unit/org/apache/cassandra/index/sai/functional/FlushingTest.java +++ b/test/unit/org/apache/cassandra/index/sai/functional/FlushingTest.java @@ -23,12 +23,19 @@ import org.junit.Test; import com.datastax.driver.core.ResultSet; +import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.SAITester; import org.apache.cassandra.index.sai.disk.v1.kdtree.NumericIndexWriter; +import org.apache.cassandra.inject.ActionBuilder; +import org.apache.cassandra.inject.Expression; +import org.apache.cassandra.inject.Injection; +import org.apache.cassandra.inject.Injections; +import org.apache.cassandra.inject.InvokePointBuilder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; public class FlushingTest extends SAITester { @@ -84,4 +91,46 @@ public void testFlushingOverwriteDelete() throws Throwable assertIndexFilesInToc(indexFiles()); } + + @Test + public void testMemtableIndexFlushFailure() throws Throwable + { + Injection failMemtableComplete = Injections.newCustom("FailMemtableIndexWriterComplete") + .add(InvokePointBuilder.newInvokePoint() + .onClass("org.apache.cassandra.index.sai.disk.v1.MemtableIndexWriter") + .onMethod("complete", "com.google.common.base.Stopwatch") + ) + .add(ActionBuilder.newActionBuilder().actions() + .doThrow(java.io.IOException.class, Expression.quote("Byteman-injected fault in MemtableIndexWriter.complete")) + ) + .build(); + Injections.inject(failMemtableComplete); + + createTable(CREATE_TABLE_TEMPLATE); + String indexName = createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1")); + + String pkValue = "key_bm_flush_fail"; + int indexedValue = 456; + + execute("INSERT INTO %s (id1, v1) VALUES (?, ?)", pkValue, indexedValue); + + long oldCommitLogSize = CommitLog.instance.getActiveContentSize(); + + // The Byteman rule will cause the SAI part of the flush to fail + assertThrows(RuntimeException.class, this::flush); + + // Assert that the index is still queryable for the inserted data despite the injected fault. + assertEquals(0, getNotQueryableIndexes().size()); + ResultSet indexQueryResults = executeNet("SELECT id1 FROM %s WHERE v1 = ?", indexedValue); + assertEquals("The index should be still usable despite flush failure", 1, indexQueryResults.all().size()); + + // Assert that the table is still queryable by primary key. + ResultSet pkQueryResults = executeNet("SELECT v1 FROM %s WHERE id1 = ?", pkValue); + assertEquals("The table should still be queryable by primary key", 1, pkQueryResults.all().size()); + + // Make sure no sstable has been created: + assertEquals(0, getCurrentColumnFamilyStore().getLiveSSTables().size()); + verifySSTableIndexes(indexName, 0, 0); + assertEquals(oldCommitLogSize, CommitLog.instance.getActiveContentSize()); + } } From e143d65a896506db7903dfac4ba5189a1febbb15 Mon Sep 17 00:00:00 2001 From: Daniel Jatnieks Date: Thu, 4 Sep 2025 15:38:10 -0500 Subject: [PATCH 2/2] Fix Apache C* tests to now expect IndexNotAvailableException after injected errors --- .../test/sai/ImportIndexedSSTablesTest.java | 15 ++++++++++-- .../test/sai/IndexStreamingFailureTest.java | 24 ++++++++++++++++--- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java index c979bd5f9dd8..a0bda39242b6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java @@ -47,6 +47,7 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.assertj.core.api.Assertions.fail; public class ImportIndexedSSTablesTest extends TestBaseImpl { @@ -102,8 +103,18 @@ public void testIndexBuildingFailureDuringImport() rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); assertThat(rs.length).isEqualTo(0); - rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); - assertThat(rs.length).isEqualTo(0); + + try + { + // The injected ByteBuddy error will cause the index build to fail, so querying the index should fail. + first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + fail("Expected IndexNotAvailableException exception"); + } + catch (Exception e) + { + assertThat(e.getClass().getName()).contains("IndexNotAvailableException"); + assertThat(e.getMessage()).contains("The secondary index 'fail_during_import_test_v_index' is not yet available"); + } } @Test diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java index 9a9c659a0b24..ab6cc12c47d3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java @@ -41,6 +41,7 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.junit.Assert.assertFalse; public class IndexStreamingFailureTest extends TestBaseImpl @@ -108,9 +109,26 @@ private void testAvailabilityAfterStreaming(Cluster cluster, String table, boole rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); assertThat(rs.length).isEqualTo(0); - // ...and querying the index also returns nothing, as the index for the streamed SSTable was never built. - rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); - assertThat(rs.length).isEqualTo(0); + if (streamEntireSSTables) + { + // ...and querying the index also returns nothing, as the index for the streamed SSTable was never built. + rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + } + else + { + // The injected ByteBuddy error will cause the index build to fail, so querying the index should fail. + try + { + second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + fail("Expected IndexNotAvailableException exception"); + } + catch (Exception e) + { + assertThat(e.getClass().getName()).contains("IndexNotAvailableException"); + assertThat(e.getMessage()).contains("The secondary index 'non_entire_file_test_v_index' is not yet available"); + } + } // On restart, ensure that the index remains querable and does not include the data we attempted to stream. ClusterUtils.stopUnchecked(second);