diff --git a/CHANGES.txt b/CHANGES.txt index 443f844d4b5a..5f184d41be94 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ Future version (tbd) Merged from 5.1: * Expose current compaction throughput in nodetool (CASSANDRA-13890) Merged from 5.0: + * Fix range queries on early-open BTI files (CASSANDRA-20976) * Improve error messages when initializing auth classes (CASSANDRA-20368 and CASSANDRA-20450) * Use ParameterizedClass for all auth-related implementations (CASSANDRA-19946 and partially CASSANDRA-18554) * Enables IAuthenticator's to return own AuthenticateMessage (CASSANDRA-19984) diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index e9f2e91dd280..76c7d92d3de1 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -221,11 +221,14 @@ protected void flushData() runPostFlush.run(); } - public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength) + public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength) { long length = dataLength > 0 ? dataLength : lastFlushOffset; if (length > 0) - fhBuilder.withCompressionMetadata(metadataWriter.open(length, chunkOffset)); + return fhBuilder.withLength(-1) // get length from compression metadata + .withCompressionMetadata(metadataWriter.open(length, chunkOffset)); + else + return fhBuilder.withLength(length); // 0 or -1 } @Override diff --git a/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java index 68f49b7b25cb..40ee135abe86 100644 --- a/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java @@ -178,10 +178,10 @@ protected void resetBuffer() buffer.clear(); } - public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength) + public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength) { // Set length to last content position to avoid having to read and decrypt the last chunk to find it. - fhBuilder.withLength(lastContent); + return fhBuilder.withLength(lastContent); } @Override diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java index 3ba16108fd41..e460fa297226 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java @@ -358,9 +358,9 @@ protected SSTableReader openFinal(SSTableReader.OpenReason reason, StorageHandle // need to move this inside the `try` so that the `storageHandler` callback can intercept reading issues. // Which would imply being able to get at the compressed/uncompressed sizes upfront (directly from the // writer, without reading the compression metadata written file) in some other way. - dataFile.updateFileHandle(dbuilder); - - FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(); + FileHandle dfile = dataFile.updateFileHandle(dbuilder) + .bufferSize(dataBufferSize) + .complete(); invalidateCacheAtPreviousBoundary(dfile, Long.MAX_VALUE); DecoratedKey firstMinimized = getMinimalKey(first); diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 120cb4663201..9eebff8aadc9 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -204,11 +204,15 @@ public boolean openEarly(Consumer callWhenReady) IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner, boundary); long indexFileLength = descriptor.fileFor(Component.PRIMARY_INDEX).length(); int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size()); - iwriter.indexFile.updateFileHandle(iwriter.builder); - FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).withLength(boundary.indexLength).complete(); - dataFile.updateFileHandle(dbuilder, boundary.dataLength); + FileHandle ifile = iwriter.indexFile.updateFileHandle(iwriter.builder) + .bufferSize(indexBufferSize) + .withLength(boundary.indexLength) + .complete(); int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); - FileHandle dfile = dbuilder.bufferSize(dataBufferSize).withLength(boundary.dataLength).complete(); + FileHandle dfile = dataFile.updateFileHandle(dbuilder, boundary.dataLength) + .bufferSize(dataBufferSize) + .withLength(boundary.dataLength) + .complete(); invalidateCacheAtPreviousBoundary(dfile, boundary.dataLength); SSTableReader sstable = BigTableReader.internalOpen(descriptor, components(), metadata, @@ -244,8 +248,7 @@ protected SSTableReader openReader(SSTableReader.OpenReason reason, FileHandle d IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner); long indexFileLength = descriptor.fileFor(Component.PRIMARY_INDEX).length(); int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size()); - iwriter.indexFile.updateFileHandle(iwriter.builder); - FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete(); + FileHandle ifile = iwriter.indexFile.updateFileHandle(iwriter.builder).bufferSize(indexBufferSize).complete(); return SSTableReader.internalOpen(descriptor, components(), metadata, diff --git a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndex.java b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndex.java index b11c50f0f7bd..c0056c95002e 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndex.java +++ b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndex.java @@ -94,7 +94,7 @@ public PartitionIndex(FileHandle fh, long trieRoot, long keyCount, DecoratedKey this.version = version; } - private PartitionIndex(PartitionIndex src) + protected PartitionIndex(PartitionIndex src) { this(src.fh, src.root, src.keyCount, src.first, src.last, src.filterFirst, src.filterLast, src.version); } diff --git a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexBuilder.java b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexBuilder.java index 4d84e8301b47..17ac7684af38 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexBuilder.java +++ b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexBuilder.java @@ -110,8 +110,7 @@ private void refreshReadableBoundary() if (partitionIndexSyncPosition < partialIndexPartitionEnd) return; - writer.updateFileHandle(fhBuilder); - try (FileHandle fh = fhBuilder.withLength(writer.getLastFlushOffset()).complete()) + try (FileHandle fh = writer.updateFileHandle(fhBuilder).complete()) { PartitionIndex pi = new PartitionIndexEarly(fh, partialIndexTail.root(), @@ -187,7 +186,6 @@ public long complete() throws IOException writer.writeLong(root); writer.sync(); - fhBuilder.withLength(writer.getLastFlushOffset()); writer.updateFileHandle(fhBuilder); return root; diff --git a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexEarly.java b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexEarly.java index b1854e50ceaf..5b7e83ebfb1a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexEarly.java +++ b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexEarly.java @@ -43,6 +43,19 @@ public PartitionIndexEarly(FileHandle fh, long trieRoot, long keyCount, Decorate this.tail = tail; } + protected PartitionIndexEarly(PartitionIndexEarly partitionIndexEarly) + { + super(partitionIndexEarly); + this.cutoff = partitionIndexEarly.cutoff; + this.tail = partitionIndexEarly.tail; + } + + @Override + public PartitionIndex sharedCopy() + { + return new PartitionIndexEarly(this); + } + @Override protected Rebufferer instantiateRebufferer() { diff --git a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIterator.java b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIterator.java index 1fb22736ada6..68ffa5b02230 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/format/trieindex/PartitionIterator.java @@ -64,13 +64,21 @@ class PartitionIterator extends PartitionIndex.IndexPosIterator implements Parti this.rowIndexFile = rowIndexFile; this.dataFile = dataFile; - readNext(); - // first value can be off - if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft)) + try { readNext(); + // first value can be off + if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft)) + { + readNext(); + } + advance(); + } + catch (Throwable t) + { + super.close(); + throw t; } - advance(); } /** diff --git a/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java index 427b378dc75c..f5090a2967cd 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexSSTableWriter.java @@ -204,12 +204,13 @@ public boolean openEarly(Consumer callWhenReady) StatsMetadata stats = (StatsMetadata) finalMetadata.get(MetadataType.STATS); CompactionMetadata compactionMetadata = (CompactionMetadata) finalMetadata.get(MetadataType.COMPACTION); - FileHandle ifile = iwriter.rowIndexFHBuilder.withLength(iwriter.rowIndexFile.getLastFlushOffset()).complete(); - // With trie indices it is no longer necessary to limit the file size; just make sure indices and data - // get updated length / compression metadata. - dataFile.updateFileHandle(dbuilder, dataLength); + FileHandle ifile = iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder) + .complete(); int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); - FileHandle dfile = dbuilder.bufferSize(dataBufferSize).withLength(dataLength).complete(); + FileHandle dfile = dataFile.updateFileHandle(dbuilder, dataLength) + .bufferSize(dataBufferSize) + .withLength(dataLength) + .complete(); invalidateCacheAtPreviousBoundary(dfile, dataLength); SSTableReader sstable = TrieIndexSSTableReader.internalOpen(descriptor, components(), metadata, @@ -232,6 +233,7 @@ public SSTableReader openFinalEarly() // ensure outstanding openEarly actions are not triggered. dataFile.sync(); iwriter.rowIndexFile.sync(); + iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder); // Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and // retain a partially-written page (see DB-2446). @@ -397,7 +399,6 @@ public long append(DecoratedKey key, RowIndexEntry indexEntry) throws IOExceptio public boolean buildPartial(long dataPosition, Consumer callWhenReady) { - rowIndexFile.updateFileHandle(rowIndexFHBuilder); return partitionIndex.buildPartial(callWhenReady, rowIndexFile.position(), dataPosition); } @@ -450,8 +451,6 @@ protected void doPrepare() // truncate index file rowIndexFile.prepareToCommit(); - rowIndexFHBuilder.withLength(rowIndexFile.getLastFlushOffset()); - //TODO figure out whether the update should be done before or after the prepare to commit rowIndexFile.updateFileHandle(rowIndexFHBuilder); complete(); diff --git a/src/java/org/apache/cassandra/io/tries/Walker.java b/src/java/org/apache/cassandra/io/tries/Walker.java index 332cf0a909e3..6699115c30f5 100644 --- a/src/java/org/apache/cassandra/io/tries/Walker.java +++ b/src/java/org/apache/cassandra/io/tries/Walker.java @@ -70,7 +70,7 @@ public Walker(Rebufferer source, long root, ByteComparable.Version version) bh = source.rebuffer(root); buf = bh.buffer(); } - catch (RuntimeException ex) + catch (Throwable ex) { if (bh != null) bh.release(); source.closeReader(); diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index d52826536fd1..6508e36b1511 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -308,15 +308,15 @@ public long paddedPosition() return PageAware.padded(position()); } - public void updateFileHandle(FileHandle.Builder fhBuilder) + public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder) { - updateFileHandle(fhBuilder, -1); + return updateFileHandle(fhBuilder, -1); } - public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength) + public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength) { // Set actual length to avoid having to read it off the file system. - fhBuilder.withLength(dataLength > 0 ? dataLength : lastFlushOffset); + return fhBuilder.withLength(dataLength > 0 ? dataLength : lastFlushOffset); } /** diff --git a/test/unit/org/apache/cassandra/cql3/EarlyOpenCompactionTest.java b/test/unit/org/apache/cassandra/cql3/EarlyOpenCompactionTest.java new file mode 100644 index 000000000000..d5a70c8fcacf --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/EarlyOpenCompactionTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.hamcrest.Matchers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class EarlyOpenCompactionTest extends CQLTester +{ + private static final int NUM_PARTITIONS = 1000; + private static final int NUM_ROWS_PER_PARTITION = 100; + private static final int VALUE_SIZE = 1000; // ~1KB per row + private static final int VERIFICATION_THREADS = 4; + + private final AtomicBoolean stopVerification = new AtomicBoolean(false); + private final AtomicInteger verificationErrors = new AtomicInteger(0); + private final Random random = new Random(); + private ExecutorService executor; + + @After + public void cleanupAfter() throws Throwable + { + stopVerification.set(true); + if (executor != null) + { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.MINUTES); + } + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(50); + } + + @Test + public void testEarlyOpenDuringCompaction() throws Throwable + { + // Create a table with a simple schema + createTable("CREATE TABLE %s (" + + "pk int, " + + "ck int, " + + "data text, " + + "PRIMARY KEY (pk, ck)" + + ")"); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + disableCompaction(); + + // Insert data to create multiple SSTables + System.out.println("Inserting test data..."); + for (int i = 0; i < NUM_PARTITIONS; i++) + { + for (int j = 0; j < NUM_ROWS_PER_PARTITION; j++) + { + String value = randomString(VALUE_SIZE); + execute("INSERT INTO %s (pk, ck, data) VALUES (?, ?, ?)", i, j, value); + } + + // Flush from time to time to get 10 sstables + if (i > 0 && i % Math.max(1, NUM_PARTITIONS / 10) == 0) + { + flush(); + } + } + + // Final flush to ensure all data is written + flush(); + + // Verify we have multiple SSTables + int sstableCount = cfs.getLiveSSTables().size(); + assertTrue("Expected multiple SSTables, got: " + sstableCount, sstableCount > 1); + + // Start verification threads + System.out.println("Starting verification threads..."); + executor = Executors.newFixedThreadPool(VERIFICATION_THREADS); + List> futures = new ArrayList<>(); + + for (int i = 0; i < VERIFICATION_THREADS; i++) + { + futures.add(executor.submit(new VerificationTask())); + } + + // Wait a bit to ensure verification is running + Thread.sleep(1000); + + // Set early open interval to 1MiB to trigger early open during compaction + int intervalMB = 1; + System.out.println("Setting early open interval to " + intervalMB + "MiB..."); + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(intervalMB); + // Slow down compaction to give the verifier time to fail. + DatabaseDescriptor.setCompactionThroughputMbPerSec(10); + + // Trigger compaction and await its completion + System.out.println("Starting compaction..."); + cfs.enableAutoCompaction(true); + + // Let verification run for a while during and after compaction + System.out.println("Verifying data during and after compaction..."); + Thread.sleep(1000); + + // Stop verification + stopVerification.set(true); + + // Wait for verification to complete + for (Future future : futures) + { + try + { + future.get(10, TimeUnit.SECONDS); + } + catch (Exception e) + { + System.err.println("Verification task failed: " + e); + e.printStackTrace(); + } + } + + // Verify no errors occurred during verification + int errors = verificationErrors.get(); + assertEquals("Found " + errors + " verification errors. Check logs for details.", 0, errors); + + System.out.println("Test completed successfully"); + } + + private class VerificationTask implements Runnable + { + @Override + public void run() + { + try + { + Random localRandom = new Random(Thread.currentThread().getId()); + + while (!stopVerification.get() && !Thread.currentThread().isInterrupted()) + { + // Randomly choose between point query and partition range query + if (localRandom.nextBoolean()) + { + // Point query + int pk = localRandom.nextInt(NUM_PARTITIONS * 110 / 100); // 10% chance outside + int ck = localRandom.nextInt(NUM_ROWS_PER_PARTITION * 110 / 100); // 10% chance outside + + try + { + Assert.assertEquals(pk < NUM_PARTITIONS && ck < NUM_ROWS_PER_PARTITION ? 1 : 0, + execute("SELECT data FROM %s WHERE pk = ? AND ck = ?", pk, ck).size()); + } + catch (Throwable t) + { + verificationErrors.incrementAndGet(); + System.err.println("Point query failed for pk=" + pk + ", ck=" + ck + ": " + t); + t.printStackTrace(); + } + } + else + { + // Partition range query + int pk = localRandom.nextInt(NUM_PARTITIONS); + + try + { + Assert.assertThat(execute("SELECT data FROM %s WHERE token(pk) <= token(?) AND token(pk) >= token(?)", pk, pk).size(), + Matchers.greaterThanOrEqualTo(NUM_ROWS_PER_PARTITION)); + } + catch (Throwable t) + { + verificationErrors.incrementAndGet(); + System.err.println("Range query failed for pk in (" + pk + ", " + (pk + 1) + ", " + (pk + 2) + "): " + t); + t.printStackTrace(); + } + } + + // Add a small delay to prevent overwhelming the system + Thread.yield(); + } + } + catch (Throwable t) + { + verificationErrors.incrementAndGet(); + System.err.println("Verification task failed: " + t); + t.printStackTrace(); + } + } + } + + private String randomString(int length) + { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) + { + sb.append((char)('a' + random.nextInt(26))); + } + return sb.toString(); + } +} diff --git a/test/unit/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexTest.java b/test/unit/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexTest.java index 0e7e18bd1de4..a03097610063 100644 --- a/test/unit/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/format/trieindex/PartitionIndexTest.java @@ -90,7 +90,7 @@ public class PartitionIndexTest //Lower the size of the indexes when running without the chunk cache, otherwise the test times out on Jenkins static final int COUNT = ChunkCache.instance != null ? 245256 : 24525; - @Parameterized.Parameters() + @Parameterized.Parameters(name = "{0} {1}") public static Collection generateData() { return Arrays.asList(new Object[]{ Config.DiskAccessMode.standard, OSS50 },