Skip to content

Commit 5092a98

Browse files
szymon-miezal authored and djatnieks committed
DSP-24615: Fix reading mmapped trie-index exceeding 2GiB (#1574)
Memory-mapping is done in buffers smaller than 2GiB. When these buffers are not aligned to 4KiB and the trie-index file spans multiple buffers, reading it can go beyond the bounds of a buffer. This patch fixes the problem by making sure that the buffers are correctly aligned.
1 parent 6d59a53 commit 5092a98

File tree

4 files changed

+99
-79
lines changed

4 files changed

+99
-79
lines changed

src/java/org/apache/cassandra/io/util/FileHandle.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,14 +428,14 @@ else if (mmapped)
428428
{
429429
if (compressionMetadata != null)
430430
{
431-
regions = mmappedRegionsCache != null ? mmappedRegionsCache.getOrCreate(channel, compressionMetadata, sliceDescriptor.sliceStart)
431+
regions = mmappedRegionsCache != null ? mmappedRegionsCache.getOrCreate(channel, compressionMetadata, bufferSize, sliceDescriptor.sliceStart)
432432
: MmappedRegions.map(channel, compressionMetadata, sliceDescriptor.sliceStart, adviseRandom);
433433
rebuffererFactory = maybeCached(new CompressedChunkReader.Mmap(channel, compressionMetadata, regions, crcCheckChanceSupplier, sliceDescriptor.sliceStart));
434434
}
435435
else
436436
{
437-
regions = mmappedRegionsCache != null ? mmappedRegionsCache.getOrCreate(channel, sliceDescriptor.dataEndOr(length), sliceDescriptor.sliceStart)
438-
: MmappedRegions.map(channel, sliceDescriptor.dataEndOr(length), sliceDescriptor.sliceStart, adviseRandom);
437+
regions = mmappedRegionsCache != null ? mmappedRegionsCache.getOrCreate(channel, sliceDescriptor.dataEndOr(length), bufferSize, sliceDescriptor.sliceStart)
438+
: MmappedRegions.map(channel, sliceDescriptor.dataEndOr(length), bufferSize, sliceDescriptor.sliceStart, adviseRandom);
439439
rebuffererFactory = new MmapRebufferer(channel, sliceDescriptor.dataEndOr(length), regions);
440440
}
441441
}

src/java/org/apache/cassandra/io/util/MmappedRegions.java

Lines changed: 28 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -63,37 +63,21 @@ public class MmappedRegions extends SharedCloseableImpl
6363
*/
6464
private volatile State copy;
6565

66-
private final long uncompressedSliceOffset;
67-
68-
private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length, long uncompressedSliceOffset, boolean adviseRandom)
66+
private MmappedRegions(State state, CompressionMetadata metadata, long uncompressedSliceOffset)
6967
{
70-
this(new State(channel, offsetFrom(metadata, uncompressedSliceOffset), adviseRandom),
71-
metadata,
72-
length,
73-
uncompressedSliceOffset);
74-
}
75-
76-
private static long offsetFrom(CompressionMetadata metadata, long uncompressedSliceOffset)
77-
{
78-
return metadata == null ? uncompressedSliceOffset : metadata.chunkFor(uncompressedSliceOffset).offset;
68+
super(new Tidier(state));
69+
this.state = state;
70+
updateState(metadata, uncompressedSliceOffset);
71+
this.copy = new State(state);
7972
}
8073

81-
private MmappedRegions(State state, CompressionMetadata metadata, long length, long uncompressedSliceOffset)
74+
private MmappedRegions(State state, long length, int chunkSize)
8275
{
8376
super(new Tidier(state));
8477

8578
this.state = state;
86-
this.uncompressedSliceOffset = uncompressedSliceOffset;
87-
88-
if (metadata != null)
89-
{
90-
assert length == 0 : "expected no length with metadata";
91-
updateState(metadata);
92-
}
93-
else if (length > 0)
94-
{
95-
updateState(length);
96-
}
79+
if (length > 0)
80+
updateState(length, chunkSize);
9781

9882
this.copy = new State(state);
9983
}
@@ -102,12 +86,11 @@ private MmappedRegions(MmappedRegions original)
10286
{
10387
super(original);
10488
this.state = original.copy;
105-
this.uncompressedSliceOffset = original.uncompressedSliceOffset;
10689
}
10790

10891
public static MmappedRegions empty(ChannelProxy channel)
10992
{
110-
return new MmappedRegions(channel, null, 0, 0, false);
93+
return new MmappedRegions(new State(channel, 0, false), 0, 0);
11194
}
11295

11396
/**
@@ -125,16 +108,17 @@ public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metad
125108
{
126109
if (metadata == null)
127110
throw new IllegalArgumentException("metadata cannot be null");
128-
129-
return new MmappedRegions(channel, metadata, 0, uncompressedSliceOffset, adviseRandom);
111+
State state = new State(channel, metadata.chunkFor(uncompressedSliceOffset).offset, adviseRandom);
112+
return new MmappedRegions(state, metadata, uncompressedSliceOffset);
130113
}
131114

132-
public static MmappedRegions map(ChannelProxy channel, long length, long uncompressedSliceOffset, boolean adviseRandom)
115+
public static MmappedRegions map(ChannelProxy channel, long length, int chunkSize, long uncompressedSliceOffset, boolean adviseRandom)
133116
{
134117
if (length <= 0)
135118
throw new IllegalArgumentException("Length must be positive");
136119

137-
return new MmappedRegions(channel, null, length, uncompressedSliceOffset, adviseRandom);
120+
State state = new State(channel, uncompressedSliceOffset, adviseRandom);
121+
return new MmappedRegions(state, length, chunkSize);
138122
}
139123

140124
/**
@@ -153,23 +137,21 @@ private boolean isCopy()
153137

154138
/**
155139
* Extends this collection of mmapped regions up to the provided total length.
156-
*
157-
* @return {@code true} if new regions have been created
158140
*/
159-
public boolean extend(long length)
141+
public void extend(long length, int chunkSize)
160142
{
143+
// We cannot enforce length to be a multiple of chunkSize (at the very least the last extend on a file
144+
// will not satisfy this), so we hope the caller knows what they are doing.
161145
if (length < 0)
162146
throw new IllegalArgumentException("Length must not be negative");
163147

164148
assert !isCopy() : "Copies cannot be extended";
165149

166150
if (length <= state.length)
167-
return false;
151+
return;
168152

169-
int initialRegions = state.last;
170-
updateState(length);
153+
updateState(length, chunkSize);
171154
copy = new State(state);
172-
return state.last > initialRegions;
173155
}
174156

175157
/**
@@ -178,7 +160,7 @@ public boolean extend(long length)
178160
*
179161
* @return {@code true} if new regions have been created
180162
*/
181-
public boolean extend(CompressionMetadata compressionMetadata)
163+
public boolean extend(CompressionMetadata compressionMetadata, int chunkSize, long uncompressedSliceOffset)
182164
{
183165
assert !isCopy() : "Copies cannot be extended";
184166

@@ -187,9 +169,9 @@ public boolean extend(CompressionMetadata compressionMetadata)
187169

188170
int initialRegions = state.last;
189171
if (compressionMetadata.compressedFileLength - state.length <= MAX_SEGMENT_SIZE)
190-
updateState(compressionMetadata.compressedFileLength);
172+
updateState(compressionMetadata.compressedFileLength, chunkSize);
191173
else
192-
updateState(compressionMetadata);
174+
updateState(compressionMetadata, uncompressedSliceOffset);
193175

194176
copy = new State(state);
195177
return state.last > initialRegions;
@@ -199,19 +181,21 @@ public boolean extend(CompressionMetadata compressionMetadata)
199181
* Updates state by adding the remaining segments. It starts with the current state last segment end position and
200182
* subsequently add new segments until all data up to the provided length are mapped.
201183
*/
202-
private void updateState(long length)
184+
private void updateState(long length, int chunkSize)
203185
{
186+
// make sure the regions span whole chunks
187+
long maxSize = (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize;
204188
state.length = length;
205189
long pos = state.getPosition();
206190
while (pos < length)
207191
{
208-
long size = Math.min(MAX_SEGMENT_SIZE, length - pos);
192+
long size = Math.min(maxSize, length - pos);
209193
state.add(pos, size);
210194
pos += size;
211195
}
212196
}
213197

214-
private void updateState(CompressionMetadata metadata)
198+
private void updateState(CompressionMetadata metadata, long uncompressedSliceOffset)
215199
{
216200
long uncompressedPosition = metadata.getDataOffsetForChunkOffset(state.getPosition()); // uncompressed position of the current compressed chunk in the original (compressed) file
217201
long compressedPosition = state.getPosition(); // position on disk of the current compressed chunk in the original (compressed) file
@@ -342,7 +326,7 @@ private static final class State
342326
private final long onDiskSliceOffset;
343327

344328
/** whether to apply fadv_random to mapped regions */
345-
private boolean adviseRandom;
329+
private final boolean adviseRandom;
346330

347331
private State(ChannelProxy channel, long onDiskSliceOffset, boolean adviseRandom)
348332
{

src/java/org/apache/cassandra/io/util/MmappedRegionsCache.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ public class MmappedRegionsCache implements AutoCloseable
4545
* @param length length of the file
4646
* @return a shared copy of the cached mmapped regions
4747
*/
48-
public MmappedRegions getOrCreate(ChannelProxy channel, long length, long uncompressedSliceOffset)
48+
public MmappedRegions getOrCreate(ChannelProxy channel, long length, int bufferSize, long uncompressedSliceOffset)
4949
{
5050
Preconditions.checkState(!closed);
51-
MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored -> MmappedRegions.map(channel, length, uncompressedSliceOffset, false));
51+
MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored -> MmappedRegions.map(channel, length, bufferSize, uncompressedSliceOffset, false));
5252
Preconditions.checkArgument(regions.isValid(channel));
53-
regions.extend(length);
53+
regions.extend(length, bufferSize);
5454
return regions.sharedCopy();
5555
}
5656

@@ -62,12 +62,12 @@ public MmappedRegions getOrCreate(ChannelProxy channel, long length, long uncomp
6262
* @param metadata compression metadata of the file
6363
* @return a shared copy of the cached mmapped regions
6464
*/
65-
public MmappedRegions getOrCreate(ChannelProxy channel, CompressionMetadata metadata, long uncompressedSliceOffset)
65+
public MmappedRegions getOrCreate(ChannelProxy channel, CompressionMetadata metadata, int bufferSize, long uncompressedSliceOffset)
6666
{
6767
Preconditions.checkState(!closed);
6868
MmappedRegions regions = cache.computeIfAbsent(channel.file(), ignored -> MmappedRegions.map(channel, metadata, uncompressedSliceOffset, false));
6969
Preconditions.checkArgument(regions.isValid(channel));
70-
regions.extend(metadata);
70+
regions.extend(metadata, bufferSize, uncompressedSliceOffset);
7171
return regions.sharedCopy();
7272
}
7373

0 commit comments

Comments
 (0)