Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/io/util/FileHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ private void updateRegions(ChannelProxy channel, long length, long startOffset)
}

if (regions == null)
regions = MmappedRegions.map(channel, length, startOffset, adviseRandom);
regions = MmappedRegions.map(channel, length, bufferSize, startOffset, adviseRandom);
else
regions.extend(length);
regions.extend(length, bufferSize);
}
}

Expand Down
57 changes: 23 additions & 34 deletions src/java/org/apache/cassandra/io/util/MmappedRegions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
package org.apache.cassandra.io.util;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

Expand Down Expand Up @@ -61,34 +57,22 @@ public class MmappedRegions extends SharedCloseableImpl
*/
private volatile State copy;

private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length, long uncompressedSliceOffset, boolean adviseRandom)
private MmappedRegions(State state, CompressionMetadata metadata, long uncompressedSliceOffset)
{
this(new State(channel, offsetFrom(metadata, uncompressedSliceOffset), adviseRandom),
metadata,
length,
uncompressedSliceOffset);
}

private static long offsetFrom(CompressionMetadata metadata, long uncompressedSliceOffset)
{
return metadata == null ? uncompressedSliceOffset : metadata.chunkFor(uncompressedSliceOffset).offset;
super(new Tidier(state));
this.state = state;
updateState(metadata, uncompressedSliceOffset);
this.copy = new State(state);
}

private MmappedRegions(State state, CompressionMetadata metadata, long length, long uncompressedOffset)
private MmappedRegions(State state, long length, int chunkSize)
{
super(new Tidier(state));

this.state = state;

if (metadata != null)
{
assert length == 0 : "expected no length with metadata";
updateState(metadata, uncompressedOffset);
}
else if (length > 0)
{
updateState(length);
}
if (length > 0)
updateState(length, chunkSize);

this.copy = new State(state);
}
Expand All @@ -101,7 +85,7 @@ private MmappedRegions(MmappedRegions original)

public static MmappedRegions empty(ChannelProxy channel)
{
return new MmappedRegions(channel, null, 0, 0, false);
return new MmappedRegions(new State(channel, 0, false), 0, 0);
}

/**
Expand All @@ -119,16 +103,17 @@ public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metad
{
if (metadata == null)
throw new IllegalArgumentException("metadata cannot be null");

return new MmappedRegions(channel, metadata, 0, uncompressedSliceOffset, adviseRandom);
State state = new State(channel, metadata.chunkFor(uncompressedSliceOffset).offset, adviseRandom);
return new MmappedRegions(state, metadata, uncompressedSliceOffset);
}

public static MmappedRegions map(ChannelProxy channel, long length, long uncompressedSliceOffset, boolean adviseRandom)
public static MmappedRegions map(ChannelProxy channel, long length, int chunkSize, long uncompressedSliceOffset, boolean adviseRandom)
{
if (length <= 0)
throw new IllegalArgumentException("Length must be positive");

return new MmappedRegions(channel, null, length, uncompressedSliceOffset, adviseRandom);
State state = new State(channel, uncompressedSliceOffset, adviseRandom);
return new MmappedRegions(state, length, chunkSize);
}

/**
Expand All @@ -145,8 +130,10 @@ private boolean isCopy()
return copy == null;
}

public void extend(long length)
public void extend(long length, int chunkSize)
{
// We cannot enforce length to be a multiple of chunkSize (at the very least the last extend on a file
// will not satisfy this), so we hope the caller knows what they are doing.
if (length < 0)
throw new IllegalArgumentException("Length must not be negative");

Expand All @@ -155,17 +142,19 @@ public void extend(long length)
if (length <= state.length)
return;

updateState(length);
updateState(length, chunkSize);
copy = new State(state);
}

private void updateState(long length)
private void updateState(long length, int chunkSize)
{
// make sure the regions span whole chunks
long maxSize = (long) (MAX_SEGMENT_SIZE / chunkSize) * chunkSize;
state.length = length;
long pos = state.getPosition();
while (pos < length)
{
long size = Math.min(MAX_SEGMENT_SIZE, length - pos);
long size = Math.min(maxSize, length - pos);
state.add(pos, size);
pos += size;
}
Expand Down Expand Up @@ -292,7 +281,7 @@ private static final class State
private final long onDiskSliceOffset;

/** whether to apply fadv_random to mapped regions */
private boolean adviseRandom;
private final boolean adviseRandom;

private State(ChannelProxy channel, long onDiskSliceOffset, boolean adviseRandom)
{
Expand Down
73 changes: 54 additions & 19 deletions test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ public void testEmpty() throws Exception
public void testTwoSegments() throws Exception
{
ByteBuffer buffer = allocateBuffer(2048);
int bufSize = 1024;
try(ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(1024);
regions.extend(1024, bufSize);
for (int i = 0; i < 1024; i++)
{
MmappedRegions.Region region = regions.floor(i);
Expand All @@ -108,7 +109,7 @@ public void testTwoSegments() throws Exception
assertEquals(1024, region.end());
}

regions.extend(2048);
regions.extend(2048, bufSize);
for (int i = 0; i < 2048; i++)
{
MmappedRegions.Region region = regions.floor(i);
Expand All @@ -131,15 +132,16 @@ public void testTwoSegments() throws Exception
public void testSmallSegmentSize() throws Exception
{
int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
int bufSize = 1024;
MmappedRegions.MAX_SEGMENT_SIZE = 1024;

ByteBuffer buffer = allocateBuffer(4096);
try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(1024);
regions.extend(2048);
regions.extend(4096);
regions.extend(1024, bufSize);
regions.extend(2048, bufSize);
regions.extend(4096, bufSize);

final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
for (int i = 0; i < buffer.capacity(); i++)
Expand All @@ -156,18 +158,46 @@ public void testSmallSegmentSize() throws Exception
}
}

@Test
public void testSizeIsChunkMultiple() throws Exception
{
final int oldMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
final int bufSize = 1024;
MmappedRegions.MAX_SEGMENT_SIZE = 2047;
ByteBuffer buffer = allocateBuffer(4096);
try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(1024, bufSize);
regions.extend(2048, bufSize);
regions.extend(4096, bufSize);
for (int i = 0; i < buffer.capacity(); i++)
{
MmappedRegions.Region region = regions.floor(i);
assertNotNull(region);
assertEquals(bufSize * (i / bufSize), region.offset());
assertEquals(bufSize + (bufSize * (i / bufSize)), region.end());
}
}
finally
{
MmappedRegions.MAX_SEGMENT_SIZE = oldMaxSegmentSize;
}
}

@Test
public void testAllocRegions() throws Exception
{
int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
MmappedRegions.MAX_SEGMENT_SIZE = 1024;
int bufSize = 1024;

ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3);

try(ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(buffer.capacity());
regions.extend(buffer.capacity(), bufSize);

final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
for (int i = 0; i < buffer.capacity(); i++)
Expand All @@ -188,17 +218,18 @@ public void testAllocRegions() throws Exception
public void testCopy() throws Exception
{
ByteBuffer buffer = allocateBuffer(128 * 1024);
int bufSize = 4096;

MmappedRegions snapshot;
ChannelProxy channelCopy;

try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer));
MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4, 0, false))
MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4, bufSize, 0, false))
{
// create 3 more segments, one per quater capacity
regions.extend(buffer.capacity() / 2);
regions.extend(3 * buffer.capacity() / 4);
regions.extend(buffer.capacity());
regions.extend(buffer.capacity() / 2, bufSize);
regions.extend(3 * buffer.capacity() / 4, bufSize);
regions.extend(buffer.capacity(), bufSize);

// make a snapshot
snapshot = regions.sharedCopy();
Expand Down Expand Up @@ -230,14 +261,15 @@ public void testCopy() throws Exception
public void testCopyCannotExtend() throws Exception
{
ByteBuffer buffer = allocateBuffer(128 * 1024);
int bufSize = 1024;

MmappedRegions snapshot;
ChannelProxy channelCopy;

try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(buffer.capacity() / 2);
regions.extend(buffer.capacity() / 2, bufSize);

// make a snapshot
snapshot = regions.sharedCopy();
Expand All @@ -248,7 +280,7 @@ public void testCopyCannotExtend() throws Exception

try
{
snapshot.extend(buffer.capacity());
snapshot.extend(buffer.capacity(), bufSize);
}
finally
{
Expand All @@ -261,12 +293,13 @@ public void testCopyCannotExtend() throws Exception
public void testExtendOutOfOrder() throws Exception
{
ByteBuffer buffer = allocateBuffer(4096);
int bufSize = 1024;
try(ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(4096);
regions.extend(1024);
regions.extend(2048);
regions.extend(4096, bufSize);
regions.extend(1024, bufSize);
regions.extend(2048, bufSize);

for (int i = 0; i < buffer.capacity(); i++)
{
Expand All @@ -282,10 +315,11 @@ public void testExtendOutOfOrder() throws Exception
public void testNegativeExtend() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
int bufSize = 1024;
try(ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer));
MmappedRegions regions = MmappedRegions.empty(channel))
{
regions.extend(-1);
regions.extend(-1, bufSize);
}
}

Expand Down Expand Up @@ -346,8 +380,9 @@ public void testMapForCompressionMetadata() throws Exception
public void testIllegalArgForMap1() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
int bufSize = 1024;
try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer));
MmappedRegions regions = MmappedRegions.map(channel, 0, 0, false))
MmappedRegions regions = MmappedRegions.map(channel, 0, bufSize, 0, false))
{
assertTrue(regions.isEmpty());
}
Expand All @@ -357,8 +392,9 @@ public void testIllegalArgForMap1() throws Exception
public void testIllegalArgForMap2() throws Exception
{
ByteBuffer buffer = allocateBuffer(1024);
int bufSize = 1024;
try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer));
MmappedRegions regions = MmappedRegions.map(channel, -1L, 0, false))
MmappedRegions regions = MmappedRegions.map(channel, -1L, bufSize, 0, false))
{
assertTrue(regions.isEmpty());
}
Expand All @@ -374,5 +410,4 @@ public void testIllegalArgForMap3() throws Exception
assertTrue(regions.isEmpty());
}
}

}