Skip to content

Commit c794818

Browse files
DSP-24615: Fix reading mmapped trie-index exceeding 2GiB (#1574)
### What is the issue Memory-mapping is done in buffers of size less than 2GiB. When these buffers aren't aligned to 4KiB and the trie-index file spans many buffers then reading it results in going out of buffer bounds. ### What does this PR fix and why was it fixed This patch fixes it by making sure that the buffers are correctly aligned.
1 parent 68f6267 commit c794818

File tree

3 files changed

+79
-55
lines changed

3 files changed

+79
-55
lines changed

src/java/org/apache/cassandra/io/util/FileHandle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,9 +515,9 @@ private void updateRegions(ChannelProxy channel, long length, long startOffset)
515515
}
516516

517517
if (regions == null)
518-
regions = MmappedRegions.map(channel, length, startOffset, adviseRandom);
518+
regions = MmappedRegions.map(channel, length, bufferSize, startOffset, adviseRandom);
519519
else
520-
regions.extend(length);
520+
regions.extend(length, bufferSize);
521521
}
522522
}
523523

src/java/org/apache/cassandra/io/util/MmappedRegions.java

Lines changed: 23 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@
1919
package org.apache.cassandra.io.util;
2020

2121
import java.nio.ByteBuffer;
22-
import java.nio.ByteOrder;
23-
import java.nio.FloatBuffer;
24-
import java.nio.IntBuffer;
25-
import java.nio.LongBuffer;
2622
import java.nio.channels.FileChannel;
2723
import java.util.Arrays;
2824

@@ -61,34 +57,22 @@ public class MmappedRegions extends SharedCloseableImpl
6157
*/
6258
private volatile State copy;
6359

64-
private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length, long uncompressedSliceOffset, boolean adviseRandom)
60+
private MmappedRegions(State state, CompressionMetadata metadata, long uncompressedSliceOffset)
6561
{
66-
this(new State(channel, offsetFrom(metadata, uncompressedSliceOffset), adviseRandom),
67-
metadata,
68-
length,
69-
uncompressedSliceOffset);
70-
}
71-
72-
private static long offsetFrom(CompressionMetadata metadata, long uncompressedSliceOffset)
73-
{
74-
return metadata == null ? uncompressedSliceOffset : metadata.chunkFor(uncompressedSliceOffset).offset;
62+
super(new Tidier(state));
63+
this.state = state;
64+
updateState(metadata, uncompressedSliceOffset);
65+
this.copy = new State(state);
7566
}
7667

77-
private MmappedRegions(State state, CompressionMetadata metadata, long length, long uncompressedOffset)
68+
private MmappedRegions(State state, long length, int chunkSize)
7869
{
7970
super(new Tidier(state));
8071

8172
this.state = state;
8273

83-
if (metadata != null)
84-
{
85-
assert length == 0 : "expected no length with metadata";
86-
updateState(metadata, uncompressedOffset);
87-
}
88-
else if (length > 0)
89-
{
90-
updateState(length);
91-
}
74+
if (length > 0)
75+
updateState(length, chunkSize);
9276

9377
this.copy = new State(state);
9478
}
@@ -101,7 +85,7 @@ private MmappedRegions(MmappedRegions original)
10185

10286
public static MmappedRegions empty(ChannelProxy channel)
10387
{
104-
return new MmappedRegions(channel, null, 0, 0, false);
88+
return new MmappedRegions(new State(channel, 0, false), 0, 0);
10589
}
10690

10791
/**
@@ -119,16 +103,17 @@ public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metad
119103
{
120104
if (metadata == null)
121105
throw new IllegalArgumentException("metadata cannot be null");
122-
123-
return new MmappedRegions(channel, metadata, 0, uncompressedSliceOffset, adviseRandom);
106+
State state = new State(channel, metadata.chunkFor(uncompressedSliceOffset).offset, adviseRandom);
107+
return new MmappedRegions(state, metadata, uncompressedSliceOffset);
124108
}
125109

126-
public static MmappedRegions map(ChannelProxy channel, long length, long uncompressedSliceOffset, boolean adviseRandom)
110+
public static MmappedRegions map(ChannelProxy channel, long length, int chunkSize, long uncompressedSliceOffset, boolean adviseRandom)
127111
{
128112
if (length <= 0)
129113
throw new IllegalArgumentException("Length must be positive");
130114

131-
return new MmappedRegions(channel, null, length, uncompressedSliceOffset, adviseRandom);
115+
State state = new State(channel, uncompressedSliceOffset, adviseRandom);
116+
return new MmappedRegions(state, length, chunkSize);
132117
}
133118

134119
/**
@@ -145,8 +130,10 @@ private boolean isCopy()
145130
return copy == null;
146131
}
147132

148-
public void extend(long length)
133+
public void extend(long length, int chunkSize)
149134
{
135+
// We cannot enforce length to be a multiple of chunkSize (at the very least the last extend on a file
136+
// will not satisfy this), so we hope the caller knows what they are doing.
150137
if (length < 0)
151138
throw new IllegalArgumentException("Length must not be negative");
152139

@@ -155,17 +142,19 @@ public void extend(long length)
155142
if (length <= state.length)
156143
return;
157144

158-
updateState(length);
145+
updateState(length, chunkSize);
159146
copy = new State(state);
160147
}
161148

162-
private void updateState(long length)
149+
private void updateState(long length, int chunkSize)
163150
{
151+
// make sure the regions span whole chunks
152+
long maxSize = (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize;
164153
state.length = length;
165154
long pos = state.getPosition();
166155
while (pos < length)
167156
{
168-
long size = Math.min(MAX_SEGMENT_SIZE, length - pos);
157+
long size = Math.min(maxSize, length - pos);
169158
state.add(pos, size);
170159
pos += size;
171160
}
@@ -292,7 +281,7 @@ private static final class State
292281
private final long onDiskSliceOffset;
293282

294283
/** whether to apply fadv_random to mapped regions */
295-
private boolean adviseRandom;
284+
private final boolean adviseRandom;
296285

297286
private State(ChannelProxy channel, long onDiskSliceOffset, boolean adviseRandom)
298287
{

test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,11 @@ public void testEmpty() throws Exception
9696
public void testTwoSegments() throws Exception
9797
{
9898
ByteBuffer buffer = allocateBuffer(2048);
99+
int bufSize = 1024;
99100
try(ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer));
100101
MmappedRegions regions = MmappedRegions.empty(channel))
101102
{
102-
regions.extend(1024);
103+
regions.extend(1024, bufSize);
103104
for (int i = 0; i < 1024; i++)
104105
{
105106
MmappedRegions.Region region = regions.floor(i);
@@ -108,7 +109,7 @@ public void testTwoSegments() throws Exception
108109
assertEquals(1024, region.end());
109110
}
110111

111-
regions.extend(2048);
112+
regions.extend(2048, bufSize);
112113
for (int i = 0; i < 2048; i++)
113114
{
114115
MmappedRegions.Region region = regions.floor(i);
@@ -131,15 +132,16 @@ public void testTwoSegments() throws Exception
131132
public void testSmallSegmentSize() throws Exception
132133
{
133134
int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
135+
int bufSize = 1024;
134136
MmappedRegions.MAX_SEGMENT_SIZE = 1024;
135137

136138
ByteBuffer buffer = allocateBuffer(4096);
137139
try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer));
138140
MmappedRegions regions = MmappedRegions.empty(channel))
139141
{
140-
regions.extend(1024);
141-
regions.extend(2048);
142-
regions.extend(4096);
142+
regions.extend(1024, bufSize);
143+
regions.extend(2048, bufSize);
144+
regions.extend(4096, bufSize);
143145

144146
final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
145147
for (int i = 0; i < buffer.capacity(); i++)
@@ -156,18 +158,46 @@ public void testSmallSegmentSize() throws Exception
156158
}
157159
}
158160

161+
@Test
162+
public void testSizeIsChunkMultiple() throws Exception
163+
{
164+
final int oldMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
165+
final int bufSize = 1024;
166+
MmappedRegions.MAX_SEGMENT_SIZE = 2047;
167+
ByteBuffer buffer = allocateBuffer(4096);
168+
try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer));
169+
MmappedRegions regions = MmappedRegions.empty(channel))
170+
{
171+
regions.extend(1024, bufSize);
172+
regions.extend(2048, bufSize);
173+
regions.extend(4096, bufSize);
174+
for (int i = 0; i < buffer.capacity(); i++)
175+
{
176+
MmappedRegions.Region region = regions.floor(i);
177+
assertNotNull(region);
178+
assertEquals(bufSize * (i / bufSize), region.offset());
179+
assertEquals(bufSize + (bufSize * (i / bufSize)), region.end());
180+
}
181+
}
182+
finally
183+
{
184+
MmappedRegions.MAX_SEGMENT_SIZE = oldMaxSegmentSize;
185+
}
186+
}
187+
159188
@Test
160189
public void testAllocRegions() throws Exception
161190
{
162191
int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
163192
MmappedRegions.MAX_SEGMENT_SIZE = 1024;
193+
int bufSize = 1024;
164194

165195
ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3);
166196

167197
try(ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer));
168198
MmappedRegions regions = MmappedRegions.empty(channel))
169199
{
170-
regions.extend(buffer.capacity());
200+
regions.extend(buffer.capacity(), bufSize);
171201

172202
final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
173203
for (int i = 0; i < buffer.capacity(); i++)
@@ -188,17 +218,18 @@ public void testAllocRegions() throws Exception
188218
public void testCopy() throws Exception
189219
{
190220
ByteBuffer buffer = allocateBuffer(128 * 1024);
221+
int bufSize = 4096;
191222

192223
MmappedRegions snapshot;
193224
ChannelProxy channelCopy;
194225

195226
try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer));
196-
MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4, 0, false))
227+
MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4, bufSize, 0, false))
197228
{
198229
// create 3 more segments, one per quater capacity
199-
regions.extend(buffer.capacity() / 2);
200-
regions.extend(3 * buffer.capacity() / 4);
201-
regions.extend(buffer.capacity());
230+
regions.extend(buffer.capacity() / 2, bufSize);
231+
regions.extend(3 * buffer.capacity() / 4, bufSize);
232+
regions.extend(buffer.capacity(), bufSize);
202233

203234
// make a snapshot
204235
snapshot = regions.sharedCopy();
@@ -230,14 +261,15 @@ public void testCopy() throws Exception
230261
public void testCopyCannotExtend() throws Exception
231262
{
232263
ByteBuffer buffer = allocateBuffer(128 * 1024);
264+
int bufSize = 1024;
233265

234266
MmappedRegions snapshot;
235267
ChannelProxy channelCopy;
236268

237269
try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer));
238270
MmappedRegions regions = MmappedRegions.empty(channel))
239271
{
240-
regions.extend(buffer.capacity() / 2);
272+
regions.extend(buffer.capacity() / 2, bufSize);
241273

242274
// make a snapshot
243275
snapshot = regions.sharedCopy();
@@ -248,7 +280,7 @@ public void testCopyCannotExtend() throws Exception
248280

249281
try
250282
{
251-
snapshot.extend(buffer.capacity());
283+
snapshot.extend(buffer.capacity(), bufSize);
252284
}
253285
finally
254286
{
@@ -261,12 +293,13 @@ public void testCopyCannotExtend() throws Exception
261293
public void testExtendOutOfOrder() throws Exception
262294
{
263295
ByteBuffer buffer = allocateBuffer(4096);
296+
int bufSize = 1024;
264297
try(ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer));
265298
MmappedRegions regions = MmappedRegions.empty(channel))
266299
{
267-
regions.extend(4096);
268-
regions.extend(1024);
269-
regions.extend(2048);
300+
regions.extend(4096, bufSize);
301+
regions.extend(1024, bufSize);
302+
regions.extend(2048, bufSize);
270303

271304
for (int i = 0; i < buffer.capacity(); i++)
272305
{
@@ -282,10 +315,11 @@ public void testExtendOutOfOrder() throws Exception
282315
public void testNegativeExtend() throws Exception
283316
{
284317
ByteBuffer buffer = allocateBuffer(1024);
318+
int bufSize = 1024;
285319
try(ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer));
286320
MmappedRegions regions = MmappedRegions.empty(channel))
287321
{
288-
regions.extend(-1);
322+
regions.extend(-1, bufSize);
289323
}
290324
}
291325

@@ -346,8 +380,9 @@ public void testMapForCompressionMetadata() throws Exception
346380
public void testIllegalArgForMap1() throws Exception
347381
{
348382
ByteBuffer buffer = allocateBuffer(1024);
383+
int bufSize = 1024;
349384
try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer));
350-
MmappedRegions regions = MmappedRegions.map(channel, 0, 0, false))
385+
MmappedRegions regions = MmappedRegions.map(channel, 0, bufSize, 0, false))
351386
{
352387
assertTrue(regions.isEmpty());
353388
}
@@ -357,8 +392,9 @@ public void testIllegalArgForMap1() throws Exception
357392
public void testIllegalArgForMap2() throws Exception
358393
{
359394
ByteBuffer buffer = allocateBuffer(1024);
395+
int bufSize = 1024;
360396
try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer));
361-
MmappedRegions regions = MmappedRegions.map(channel, -1L, 0, false))
397+
MmappedRegions regions = MmappedRegions.map(channel, -1L, bufSize, 0, false))
362398
{
363399
assertTrue(regions.isEmpty());
364400
}
@@ -374,5 +410,4 @@ public void testIllegalArgForMap3() throws Exception
374410
assertTrue(regions.isEmpty());
375411
}
376412
}
377-
378413
}

0 commit comments

Comments
 (0)