diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java index e01f6ba1169..c85903692dd 100644 --- a/src/java/org/apache/cassandra/io/util/FileHandle.java +++ b/src/java/org/apache/cassandra/io/util/FileHandle.java @@ -515,9 +515,9 @@ private void updateRegions(ChannelProxy channel, long length, long startOffset) } if (regions == null) - regions = MmappedRegions.map(channel, length, startOffset, adviseRandom); + regions = MmappedRegions.map(channel, length, bufferSize, startOffset, adviseRandom); else - regions.extend(length); + regions.extend(length, bufferSize); } } diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java index c1b87fa31e6..896d36aad22 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedRegions.java +++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java @@ -19,10 +19,6 @@ package org.apache.cassandra.io.util; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.FloatBuffer; -import java.nio.IntBuffer; -import java.nio.LongBuffer; import java.nio.channels.FileChannel; import java.util.Arrays; @@ -61,34 +57,22 @@ public class MmappedRegions extends SharedCloseableImpl */ private volatile State copy; - private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length, long uncompressedSliceOffset, boolean adviseRandom) + private MmappedRegions(State state, CompressionMetadata metadata, long uncompressedSliceOffset) { - this(new State(channel, offsetFrom(metadata, uncompressedSliceOffset), adviseRandom), - metadata, - length, - uncompressedSliceOffset); - } - - private static long offsetFrom(CompressionMetadata metadata, long uncompressedSliceOffset) - { - return metadata == null ? uncompressedSliceOffset : metadata.chunkFor(uncompressedSliceOffset).offset; + super(new Tidier(state)); + this.state = state; + updateState(metadata, uncompressedSliceOffset); + this.copy = new State(state); } - private MmappedRegions(State state, CompressionMetadata metadata, long length, long uncompressedOffset) + private MmappedRegions(State state, long length, int chunkSize) { super(new Tidier(state)); this.state = state; - if (metadata != null) - { - assert length == 0 : "expected no length with metadata"; - updateState(metadata, uncompressedOffset); - } - else if (length > 0) - { - updateState(length); - } + if (length > 0) + updateState(length, chunkSize); this.copy = new State(state); } @@ -101,7 +85,7 @@ private MmappedRegions(MmappedRegions original) public static MmappedRegions empty(ChannelProxy channel) { - return new MmappedRegions(channel, null, 0, 0, false); + return new MmappedRegions(new State(channel, 0, false), 0, 0); } /** @@ -119,16 +103,17 @@ public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metad { if (metadata == null) throw new IllegalArgumentException("metadata cannot be null"); - - return new MmappedRegions(channel, metadata, 0, uncompressedSliceOffset, adviseRandom); + State state = new State(channel, metadata.chunkFor(uncompressedSliceOffset).offset, adviseRandom); + return new MmappedRegions(state, metadata, uncompressedSliceOffset); } - public static MmappedRegions map(ChannelProxy channel, long length, long uncompressedSliceOffset, boolean adviseRandom) + public static MmappedRegions map(ChannelProxy channel, long length, int chunkSize, long uncompressedSliceOffset, boolean adviseRandom) { if (length <= 0) throw new IllegalArgumentException("Length must be positive"); - return new MmappedRegions(channel, null, length, uncompressedSliceOffset, adviseRandom); + State state = new State(channel, uncompressedSliceOffset, adviseRandom); + return new MmappedRegions(state, length, chunkSize); } /** @@ -145,8 +130,10 @@ private boolean isCopy() return copy == null; } - public void extend(long length) + public void extend(long length, int chunkSize) { + // We cannot enforce length to be a multiple of chunkSize (at the very least the last extend on a file + // will not satisfy this), so we hope the caller knows what they are doing. if (length < 0) throw new IllegalArgumentException("Length must not be negative"); @@ -155,17 +142,19 @@ public void extend(long length) if (length <= state.length) return; - updateState(length); + updateState(length, chunkSize); copy = new State(state); } - private void updateState(long length) + private void updateState(long length, int chunkSize) { + // make sure the regions span whole chunks + long maxSize = (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize; state.length = length; long pos = state.getPosition(); while (pos < length) { - long size = Math.min(MAX_SEGMENT_SIZE, length - pos); + long size = Math.min(maxSize, length - pos); state.add(pos, size); pos += size; } @@ -292,7 +281,7 @@ private static final class State private final long onDiskSliceOffset; /** whether to apply fadv_random to mapped regions */ - private boolean adviseRandom; + private final boolean adviseRandom; private State(ChannelProxy channel, long onDiskSliceOffset, boolean adviseRandom) { diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java index 3a5446d44b5..c5673e92a8b 100644 --- a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java +++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java @@ -96,10 +96,11 @@ public void testEmpty() throws Exception public void testTwoSegments() throws Exception { ByteBuffer buffer = allocateBuffer(2048); + int bufSize = 1024; try(ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(1024); + regions.extend(1024, bufSize); for (int i = 0; i < 1024; i++) { MmappedRegions.Region region = regions.floor(i); @@ -108,7 +109,7 @@ public void testTwoSegments() throws Exception assertEquals(1024, region.end()); } - regions.extend(2048); + regions.extend(2048, bufSize); for (int i = 0; i < 2048; i++) { MmappedRegions.Region region = regions.floor(i); @@ -131,15 +132,16 @@ public void testTwoSegments() throws Exception public void testSmallSegmentSize() throws Exception { int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE; + int bufSize = 1024; MmappedRegions.MAX_SEGMENT_SIZE = 1024; ByteBuffer buffer = allocateBuffer(4096); try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(1024); - regions.extend(2048); - regions.extend(4096); + regions.extend(1024, bufSize); + regions.extend(2048, bufSize); + regions.extend(4096, bufSize); final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE; for (int i = 0; i < buffer.capacity(); i++) @@ -156,18 +158,46 @@ public void testSmallSegmentSize() throws Exception } } + @Test + public void testSizeIsChunkMultiple() throws Exception + { + final int oldMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE; + final int bufSize = 1024; + MmappedRegions.MAX_SEGMENT_SIZE = 2047; + ByteBuffer buffer = allocateBuffer(4096); + try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(1024, bufSize); + regions.extend(2048, bufSize); + regions.extend(4096, bufSize); + for (int i = 0; i < buffer.capacity(); i++) + { + MmappedRegions.Region region = regions.floor(i); + assertNotNull(region); + assertEquals(bufSize * (i / bufSize), region.offset()); + assertEquals(bufSize + (bufSize * (i / bufSize)), region.end()); + } + } + finally + { + MmappedRegions.MAX_SEGMENT_SIZE = oldMaxSegmentSize; + } + } + @Test public void testAllocRegions() throws Exception { int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE; MmappedRegions.MAX_SEGMENT_SIZE = 1024; + int bufSize = 1024; ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3); try(ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(buffer.capacity()); + regions.extend(buffer.capacity(), bufSize); final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE; for (int i = 0; i < buffer.capacity(); i++) @@ -188,17 +218,18 @@ public void testAllocRegions() throws Exception public void testCopy() throws Exception { ByteBuffer buffer = allocateBuffer(128 * 1024); + int bufSize = 4096; MmappedRegions snapshot; ChannelProxy channelCopy; try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer)); - MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4, 0, false)) + MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4, bufSize, 0, false)) { // create 3 more segments, one per quater capacity - regions.extend(buffer.capacity() / 2); - regions.extend(3 * buffer.capacity() / 4); - regions.extend(buffer.capacity()); + regions.extend(buffer.capacity() / 2, bufSize); + regions.extend(3 * buffer.capacity() / 4, bufSize); + regions.extend(buffer.capacity(), bufSize); // make a snapshot snapshot = regions.sharedCopy(); @@ -230,6 +261,7 @@ public void testCopy() throws Exception public void testCopyCannotExtend() throws Exception { ByteBuffer buffer = allocateBuffer(128 * 1024); + int bufSize = 1024; MmappedRegions snapshot; ChannelProxy channelCopy; @@ -237,7 +269,7 @@ public void testCopyCannotExtend() throws Exception try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(buffer.capacity() / 2); + regions.extend(buffer.capacity() / 2, bufSize); // make a snapshot snapshot = regions.sharedCopy(); @@ -248,7 +280,7 @@ public void testCopyCannotExtend() throws Exception try { - snapshot.extend(buffer.capacity()); + snapshot.extend(buffer.capacity(), bufSize); } finally { @@ -261,12 +293,13 @@ public void testCopyCannotExtend() throws Exception public void testExtendOutOfOrder() throws Exception { ByteBuffer buffer = allocateBuffer(4096); + int bufSize = 1024; try(ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(4096); - regions.extend(1024); - regions.extend(2048); + regions.extend(4096, bufSize); + regions.extend(1024, bufSize); + regions.extend(2048, bufSize); for (int i = 0; i < buffer.capacity(); i++) { @@ -282,10 +315,11 @@ public void testExtendOutOfOrder() throws Exception public void testNegativeExtend() throws Exception { ByteBuffer buffer = allocateBuffer(1024); + int bufSize = 1024; try(ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer)); MmappedRegions regions = MmappedRegions.empty(channel)) { - regions.extend(-1); + regions.extend(-1, bufSize); } } @@ -346,8 +380,9 @@ public void testMapForCompressionMetadata() throws Exception public void testIllegalArgForMap1() throws Exception { ByteBuffer buffer = allocateBuffer(1024); + int bufSize = 1024; try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer)); - MmappedRegions regions = MmappedRegions.map(channel, 0, 0, false)) + MmappedRegions regions = MmappedRegions.map(channel, 0, bufSize, 0, false)) { assertTrue(regions.isEmpty()); } @@ -357,8 +392,9 @@ public void testIllegalArgForMap1() throws Exception public void testIllegalArgForMap2() throws Exception { ByteBuffer buffer = allocateBuffer(1024); + int bufSize = 1024; try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer)); - MmappedRegions regions = MmappedRegions.map(channel, -1L, 0, false)) + MmappedRegions regions = MmappedRegions.map(channel, -1L, bufSize, 0, false)) { assertTrue(regions.isEmpty()); } @@ -374,5 +410,4 @@ public void testIllegalArgForMap3() throws Exception assertTrue(regions.isEmpty()); } } - }