Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Future version (tbd)
Merged from 5.1:
* Expose current compaction throughput in nodetool (CASSANDRA-13890)
Merged from 5.0:
* Fix range queries on early-open BTI files (CASSANDRA-20976)
* Improve error messages when initializing auth classes (CASSANDRA-20368 and CASSANDRA-20450)
* Use ParameterizedClass for all auth-related implementations (CASSANDRA-19946 and partially CASSANDRA-18554)
* Enables IAuthenticator's to return own AuthenticateMessage (CASSANDRA-19984)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,14 @@ protected void flushData()
runPostFlush.run();
}

public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
{
long length = dataLength > 0 ? dataLength : lastFlushOffset;
if (length > 0)
fhBuilder.withCompressionMetadata(metadataWriter.open(length, chunkOffset));
return fhBuilder.withLength(-1) // get length from compression metadata
.withCompressionMetadata(metadataWriter.open(length, chunkOffset));
else
return fhBuilder.withLength(length); // 0 or -1
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ protected void resetBuffer()
buffer.clear();
}

public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
{
// Set length to last content position to avoid having to read and decrypt the last chunk to find it.
fhBuilder.withLength(lastContent);
return fhBuilder.withLength(lastContent);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ protected SSTableReader openFinal(SSTableReader.OpenReason reason, StorageHandle
// need to move this inside the `try` so that the `storageHandler` callback can intercept reading issues.
// Which would imply being able to get at the compressed/uncompressed sizes upfront (directly from the
// writer, without reading the compression metadata written file) in some other way.
dataFile.updateFileHandle(dbuilder);

FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete();
FileHandle dfile = dataFile.updateFileHandle(dbuilder)
.bufferSize(dataBufferSize)
.complete();
invalidateCacheAtPreviousBoundary(dfile, Long.MAX_VALUE);

DecoratedKey firstMinimized = getMinimalKey(first);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,15 @@ public boolean openEarly(Consumer<SSTableReader> callWhenReady)
IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner, boundary);
long indexFileLength = descriptor.fileFor(Component.PRIMARY_INDEX).length();
int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
iwriter.indexFile.updateFileHandle(iwriter.builder);
FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).withLength(boundary.indexLength).complete();
dataFile.updateFileHandle(dbuilder, boundary.dataLength);
FileHandle ifile = iwriter.indexFile.updateFileHandle(iwriter.builder)
.bufferSize(indexBufferSize)
.withLength(boundary.indexLength)
.complete();
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).withLength(boundary.dataLength).complete();
FileHandle dfile = dataFile.updateFileHandle(dbuilder, boundary.dataLength)
.bufferSize(dataBufferSize)
.withLength(boundary.dataLength)
.complete();
invalidateCacheAtPreviousBoundary(dfile, boundary.dataLength);
SSTableReader sstable = BigTableReader.internalOpen(descriptor,
components(), metadata,
Expand Down Expand Up @@ -244,8 +248,7 @@ protected SSTableReader openReader(SSTableReader.OpenReason reason, FileHandle d
IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner);
long indexFileLength = descriptor.fileFor(Component.PRIMARY_INDEX).length();
int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
iwriter.indexFile.updateFileHandle(iwriter.builder);
FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete();
FileHandle ifile = iwriter.indexFile.updateFileHandle(iwriter.builder).bufferSize(indexBufferSize).complete();
return SSTableReader.internalOpen(descriptor,
components(),
metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public PartitionIndex(FileHandle fh, long trieRoot, long keyCount, DecoratedKey
this.version = version;
}

private PartitionIndex(PartitionIndex src)
protected PartitionIndex(PartitionIndex src)
{
this(src.fh, src.root, src.keyCount, src.first, src.last, src.filterFirst, src.filterLast, src.version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ private void refreshReadableBoundary()
if (partitionIndexSyncPosition < partialIndexPartitionEnd)
return;

writer.updateFileHandle(fhBuilder);
try (FileHandle fh = fhBuilder.withLength(writer.getLastFlushOffset()).complete())
try (FileHandle fh = writer.updateFileHandle(fhBuilder).complete())
{
PartitionIndex pi = new PartitionIndexEarly(fh,
partialIndexTail.root(),
Expand Down Expand Up @@ -187,7 +186,6 @@ public long complete() throws IOException
writer.writeLong(root);

writer.sync();
fhBuilder.withLength(writer.getLastFlushOffset());
writer.updateFileHandle(fhBuilder);

return root;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ public PartitionIndexEarly(FileHandle fh, long trieRoot, long keyCount, Decorate
this.tail = tail;
}

protected PartitionIndexEarly(PartitionIndexEarly partitionIndexEarly)
{
super(partitionIndexEarly);
this.cutoff = partitionIndexEarly.cutoff;
this.tail = partitionIndexEarly.tail;
}

@Override
public PartitionIndex sharedCopy()
{
return new PartitionIndexEarly(this);
}

@Override
protected Rebufferer instantiateRebufferer()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,21 @@ class PartitionIterator extends PartitionIndex.IndexPosIterator implements Parti
this.rowIndexFile = rowIndexFile;
this.dataFile = dataFile;

readNext();
// first value can be off
if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft))
try
{
readNext();
// first value can be off
if (nextKey != null && !(nextKey.compareTo(left) > inclusiveLeft))
{
readNext();
}
advance();
}
catch (Throwable t)
{
super.close();
throw t;
}
advance();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,13 @@ public boolean openEarly(Consumer<SSTableReader> callWhenReady)
StatsMetadata stats = (StatsMetadata) finalMetadata.get(MetadataType.STATS);
CompactionMetadata compactionMetadata = (CompactionMetadata) finalMetadata.get(MetadataType.COMPACTION);

FileHandle ifile = iwriter.rowIndexFHBuilder.withLength(iwriter.rowIndexFile.getLastFlushOffset()).complete();
// With trie indices it is no longer necessary to limit the file size; just make sure indices and data
// get updated length / compression metadata.
dataFile.updateFileHandle(dbuilder, dataLength);
FileHandle ifile = iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder)
.complete();
int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
FileHandle dfile = dbuilder.bufferSize(dataBufferSize).withLength(dataLength).complete();
FileHandle dfile = dataFile.updateFileHandle(dbuilder, dataLength)
.bufferSize(dataBufferSize)
.withLength(dataLength)
.complete();
invalidateCacheAtPreviousBoundary(dfile, dataLength);
SSTableReader sstable = TrieIndexSSTableReader.internalOpen(descriptor,
components(), metadata,
Expand All @@ -232,6 +233,7 @@ public SSTableReader openFinalEarly()
// ensure outstanding openEarly actions are not triggered.
dataFile.sync();
iwriter.rowIndexFile.sync();
iwriter.rowIndexFile.updateFileHandle(iwriter.rowIndexFHBuilder);
// Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
// retain a partially-written page (see DB-2446).

Expand Down Expand Up @@ -397,7 +399,6 @@ public long append(DecoratedKey key, RowIndexEntry indexEntry) throws IOExceptio

public boolean buildPartial(long dataPosition, Consumer<PartitionIndex> callWhenReady)
{
rowIndexFile.updateFileHandle(rowIndexFHBuilder);
return partitionIndex.buildPartial(callWhenReady, rowIndexFile.position(), dataPosition);
}

Expand Down Expand Up @@ -450,8 +451,6 @@ protected void doPrepare()

// truncate index file
rowIndexFile.prepareToCommit();
rowIndexFHBuilder.withLength(rowIndexFile.getLastFlushOffset());
//TODO figure out whether the update should be done before or after the prepare to commit
rowIndexFile.updateFileHandle(rowIndexFHBuilder);

complete();
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/io/tries/Walker.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Walker(Rebufferer source, long root, ByteComparable.Version version)
bh = source.rebuffer(root);
buf = bh.buffer();
}
catch (RuntimeException ex)
catch (Throwable ex)
{
if (bh != null) bh.release();
source.closeReader();
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/io/util/SequentialWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,15 @@ public long paddedPosition()
return PageAware.padded(position());
}

public void updateFileHandle(FileHandle.Builder fhBuilder)
public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder)
{
updateFileHandle(fhBuilder, -1);
return updateFileHandle(fhBuilder, -1);
}

public void updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
public FileHandle.Builder updateFileHandle(FileHandle.Builder fhBuilder, long dataLength)
{
// Set actual length to avoid having to read it off the file system.
fhBuilder.withLength(dataLength > 0 ? dataLength : lastFlushOffset);
return fhBuilder.withLength(dataLength > 0 ? dataLength : lastFlushOffset);
}

/**
Expand Down
Loading