@@ -115,8 +115,7 @@ protected PrimaryKey computeNext()
if (rowId == PostingList.END_OF_STREAM)
return endOfData();

var primaryKey = primaryKeyMap.primaryKeyFromRowId(rowId);
return new PrimaryKeyWithSource(primaryKey, primaryKeyMap.getSSTableId(), rowId);
return primaryKeyMap.primaryKeyFromRowId(rowId);
}
catch (Throwable t)
{
@@ -161,20 +160,11 @@ private long getNextRowId() throws IOException
long segmentRowId;
if (needsSkipping)
{
long targetSstableRowId;
if (skipToToken instanceof PrimaryKeyWithSource
&& ((PrimaryKeyWithSource) skipToToken).getSourceSstableId().equals(primaryKeyMap.getSSTableId()))
long targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
// skipToToken is larger than max token in token file
if (targetSstableRowId < 0)
{
targetSstableRowId = ((PrimaryKeyWithSource) skipToToken).getSourceRowId();
}
else
{
targetSstableRowId = primaryKeyMap.ceiling(skipToToken);
// skipToToken is larger than max token in token file
if (targetSstableRowId < 0)
{
return PostingList.END_OF_STREAM;
}
return PostingList.END_OF_STREAM;
}
int targetSegmentRowId = Math.toIntExact(targetSstableRowId - searcherContext.getSegmentRowIdOffset());
segmentRowId = postingList.advance(targetSegmentRowId);
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/index/sai/disk/PrimaryKeyMap.java
@@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;

import org.apache.cassandra.index.sai.utils.PrimaryKey;
@@ -84,6 +85,24 @@ default void close() throws IOException
*/
PrimaryKey primaryKeyFromRowId(long sstableRowId);

/**
* Returns a {@link PrimaryKey} for a row Id
*
* Note: the lower and upper bounds are used to avoid reading the primary key from disk when the
* primary keys being compared fall in non-overlapping ranges. The bounds may be any primary keys in the
* sstable, but the range they define must contain the key of the given row id. This requirement is not
* validated, as validating it would negate the performance benefit of this optimization.
*
* @param sstableRowId the row Id to lookup
* @param lowerBound the inclusive lower bound of the primary key being created
* @param upperBound the inclusive upper bound of the primary key being created
* @return the {@link PrimaryKey} associated with the row Id
*/
default PrimaryKey primaryKeyFromRowId(long sstableRowId, @Nonnull PrimaryKey lowerBound, @Nonnull PrimaryKey upperBound)
{
return primaryKeyFromRowId(sstableRowId);
}

/**
* Returns a row Id for a {@link PrimaryKey}. If there is no such term, returns the `-(next row id) - 1` where
* `next row id` is the row id of the next greatest {@link PrimaryKey} in the map.
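To make the intent of the bounds concrete, here is a minimal, hypothetical sketch (none of the names below come from the patch, and keys are modelled as plain longs instead of PrimaryKey): a lazily materialised key that answers comparisons from its bounds whenever the other key falls outside them, and only pays for the expensive load (standing in for the on-disk primary key read) when the ranges actually overlap.

import java.util.function.LongSupplier;

// Hypothetical sketch only, not the patch's API.
final class BoundedLazyKey
{
    private final long lowerBound;     // inclusive
    private final long upperBound;     // inclusive
    private final LongSupplier loader; // stands in for the deferred on-disk primary key read
    private Long value;                // materialised on demand

    BoundedLazyKey(long lowerBound, long upperBound, LongSupplier loader)
    {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.loader = loader;
    }

    int compareTo(long other)
    {
        if (other < lowerBound)
            return 1;                   // definitely greater, no load needed
        if (other > upperBound)
            return -1;                  // definitely smaller, no load needed
        if (value == null)
            value = loader.getAsLong(); // overlapping range: must materialise the real key
        return Long.compare(value, other);
    }
}

An implementation of primaryKeyFromRowId(sstableRowId, lowerBound, upperBound) could return a key built along these lines, which is why the bounds must bracket the row's real key: a bound that excluded it would make the shortcut comparisons wrong.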
@@ -16,18 +16,19 @@
* limitations under the License.
*/

package org.apache.cassandra.index.sai.disk;
package org.apache.cassandra.index.sai.disk.v2;

import io.github.jbellis.jvector.util.RamUsageEstimator;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.sstable.SSTableId;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;

public class PrimaryKeyWithSource implements PrimaryKey
class PrimaryKeyWithSource implements PrimaryKey
{
private final PrimaryKey primaryKey;
private final SSTableId<?> sourceSstableId;
@@ -81,10 +81,11 @@ public static class RowAwarePrimaryKeyMapFactory implements Factory
private FileHandle termsTrie = null;
private final IPartitioner partitioner;
private final ClusteringComparator clusteringComparator;
private final PrimaryKey.Factory primaryKeyFactory;
private final RowAwarePrimaryKeyFactory primaryKeyFactory;
private final SSTableId<?> sstableId;
private final boolean hasStaticColumns;

public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, PrimaryKey.Factory primaryKeyFactory, SSTableReader sstable)
public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, RowAwarePrimaryKeyFactory primaryKeyFactory, SSTableReader sstable)
{
try
{
@@ -105,6 +106,7 @@ public RowAwarePrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents
this.primaryKeyFactory = primaryKeyFactory;
this.clusteringComparator = sstable.metadata().comparator;
this.sstableId = sstable.getId();
this.hasStaticColumns = sstable.metadata().hasStaticColumns();
}
catch (Throwable t)
{
@@ -124,7 +126,8 @@ public PrimaryKeyMap newPerSSTablePrimaryKeyMap()
partitioner,
primaryKeyFactory,
clusteringComparator,
sstableId);
sstableId,
hasStaticColumns);
}
catch (IOException e)
{
@@ -149,17 +152,19 @@ public void close() throws IOException
private final SortedTermsReader sortedTermsReader;
private final SortedTermsReader.Cursor cursor;
private final IPartitioner partitioner;
private final PrimaryKey.Factory primaryKeyFactory;
private final RowAwarePrimaryKeyFactory primaryKeyFactory;
private final ClusteringComparator clusteringComparator;
private final SSTableId<?> sstableId;
private final boolean hasStaticColumns;

private RowAwarePrimaryKeyMap(LongArray rowIdToToken,
SortedTermsReader sortedTermsReader,
SortedTermsReader.Cursor cursor,
IPartitioner partitioner,
PrimaryKey.Factory primaryKeyFactory,
RowAwarePrimaryKeyFactory primaryKeyFactory,
ClusteringComparator clusteringComparator,
SSTableId<?> sstableId)
SSTableId<?> sstableId,
boolean hasStaticColumns)
{
this.rowIdToToken = rowIdToToken;
this.sortedTermsReader = sortedTermsReader;
@@ -168,6 +173,7 @@ private RowAwarePrimaryKeyMap(LongArray rowIdToToken,
this.primaryKeyFactory = primaryKeyFactory;
this.clusteringComparator = clusteringComparator;
this.sstableId = sstableId;
this.hasStaticColumns = hasStaticColumns;
}

@Override
@@ -185,7 +191,9 @@ public long count()
public PrimaryKey primaryKeyFromRowId(long sstableRowId)
{
long token = rowIdToToken.get(sstableRowId);
return primaryKeyFactory.createDeferred(partitioner.getTokenFactory().fromLongValue(token), () -> supplier(sstableRowId));
var primaryKey = primaryKeyFactory.createDeferred(partitioner.getTokenFactory().fromLongValue(token), () -> supplier(sstableRowId));
return hasStaticColumns ? primaryKey
: new PrimaryKeyWithSource(primaryKey, sstableId, sstableRowId);
}

private long skinnyExactRowIdOrInvertedCeiling(PrimaryKey key)
@@ -212,6 +220,13 @@ private long skinnyExactRowIdOrInvertedCeiling(PrimaryKey key)
@Override
public long exactRowIdOrInvertedCeiling(PrimaryKey key)
{
if (key instanceof PrimaryKeyWithSource)
{
var pkws = (PrimaryKeyWithSource) key;
if (pkws.getSourceSstableId().equals(sstableId))
return pkws.getSourceRowId();
}

if (clusteringComparator.size() == 0)
return skinnyExactRowIdOrInvertedCeiling(key);

@@ -226,6 +241,13 @@ public long exactRowIdOrInvertedCeiling(PrimaryKey key)
@Override
public long ceiling(PrimaryKey key)
{
if (key instanceof PrimaryKeyWithSource)
{
var pkws = (PrimaryKeyWithSource) key;
if (pkws.getSourceSstableId().equals(sstableId))
return pkws.getSourceRowId();
}

if (clusteringComparator.size() == 0)
{
long rowId = skinnyExactRowIdOrInvertedCeiling(key);
@@ -105,7 +105,8 @@ public PrimaryKey.Factory newPrimaryKeyFactory(ClusteringComparator comparator)
@Override
public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, PrimaryKey.Factory primaryKeyFactory, SSTableReader sstable)
{
return new RowAwarePrimaryKeyMap.RowAwarePrimaryKeyMapFactory(perSSTableComponents, primaryKeyFactory, sstable);
assert primaryKeyFactory instanceof RowAwarePrimaryKeyFactory;
return new RowAwarePrimaryKeyMap.RowAwarePrimaryKeyMapFactory(perSSTableComponents, (RowAwarePrimaryKeyFactory) primaryKeyFactory, sstable);
}

@Override
@@ -45,7 +45,6 @@
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource;
import org.apache.cassandra.index.sai.disk.v1.IndexSearcher;
import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles;
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
@@ -534,12 +533,7 @@ private SegmentRowIdOrdinalPairs flatmapPrimaryKeysToBitsAndRows(List<PrimaryKey
{
// turn the pk back into a row id, with a fast path for the case where the pk is from this sstable
var primaryKey = keysInRange.get(i);
long sstableRowId;
if (primaryKey instanceof PrimaryKeyWithSource
&& ((PrimaryKeyWithSource) primaryKey).getSourceSstableId().equals(primaryKeyMap.getSSTableId()))
sstableRowId = ((PrimaryKeyWithSource) primaryKey).getSourceRowId();
else
sstableRowId = primaryKeyMap.exactRowIdOrInvertedCeiling(primaryKey);
long sstableRowId = primaryKeyMap.exactRowIdOrInvertedCeiling(primaryKey);

if (sstableRowId < 0)
{
@@ -68,9 +68,9 @@ protected PrimaryKey computeNext()
{
// The highest primary key seen on any range iterator so far.
// It can become null when we reach the end of the iterator.
PrimaryKey highestKey = ranges.get(0).hasNext() ? ranges.get(0).next() : null;
PrimaryKey highestKey = ranges.get(0).hasNext() ? ranges.get(0).peek() : null;
// Index of the range iterator that has advanced beyond the others
int alreadyAdvanced = 0;
int indexOfHighestKey = 0;
rangeStats[0]++;

outer:
@@ -80,31 +80,55 @@
// Once this inner loop finishes normally, all iterators are guaranteed to be at the same value.
for (int index = 0; index < ranges.size(); index++)
{
if (index != alreadyAdvanced)
if (index != indexOfHighestKey)
{
KeyRangeIterator range = ranges.get(index);
PrimaryKey nextKey = nextOrNull(range, highestKey);

range.skipTo(highestKey);
PrimaryKey nextKey = range.hasNext() ? range.peek() : null;

rangeStats[index]++;
int comparisonResult;
if (nextKey == null || (comparisonResult = nextKey.compareTo(highestKey)) > 0)
{
// We jumped over the highest key seen so far, so make it the new highest key.
highestKey = nextKey;
// Remember this iterator to avoid advancing it again, because it is already at the highest key
alreadyAdvanced = index;
indexOfHighestKey = index;
// This iterator jumped over, so the other iterators are lagging behind now,
// including the ones already advanced in the earlier cycles of the inner loop.
// Therefore, restart the inner loop in order to advance
// the other iterators except this one to match the new highest key.
continue outer;
}

assert comparisonResult == 0 :
String.format("skipTo skipped to an item smaller than the target; " +
"iterator: %s, target key: %s, returned key: %s", range, highestKey, nextKey);

// More specific keys should win over full partitions,
// because they match a single row instead of the whole partition.
// However, because this key matches the earlier keys, we can continue the inner loop.
if (!nextKey.hasEmptyClustering())
{
highestKey = nextKey;
indexOfHighestKey = index;
}
}
}
// If we reached here, next() has been called at least once on each range iterator and
// the last call to next() on each iterator returned a value equal to the highestKey.
// If we reached here, we have a match - all iterators are at the same key == highestKey.

// Now we need to advance the iterators to avoid returning the same key again.
// This is tricky because of empty clustering keys that match the whole partition.
// We must not advance ranges at keys with empty clustering because they
// may still match the next keys returned by other iterators in the next cycles.
// However, if all ranges are at the same partition with empty clustering (highestKey.hasEmptyClustering()),
// we must advance all of them, because we return the key for the whole partition and that partition is done.
for (var range : ranges)
{
if (highestKey.hasEmptyClustering() || !range.peek().hasEmptyClustering())
range.next();
}

// Move the iterator that was called the least times to the start of the list.
// This is an optimisation assuming that iterator is likely a more selective one.
@@ -147,16 +171,6 @@ protected void performSkipTo(PrimaryKey nextToken)
range.skipTo(nextToken);
}

/**
* Fetches the next available item from the iterator, such that the item is not lower than the given key.
* If no such items are available, returns null.
*/
private PrimaryKey nextOrNull(KeyRangeIterator iterator, PrimaryKey minKey)
{
iterator.skipTo(minKey);
return iterator.hasNext() ? iterator.next() : null;
}

public void close() throws IOException
{
ranges.forEach(FileUtils::closeQuietly);
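The advancement rule in the comments above (do not advance an iterator parked on a partition-level key unless the emitted match is itself partition-level) is easiest to see on a tiny example. The sketch below is hypothetical and heavily simplified — the Key type, and the single-step advance in place of skipTo, are made up for illustration — but it shows why advancing the partition-level iterator on the first match would lose the second row.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class IntersectionSketch
{
    // Hypothetical key: clustering == null plays the role of an empty clustering,
    // i.e. a key that matches the whole partition.
    record Key(int partition, Integer clustering) implements Comparable<Key>
    {
        boolean hasEmptyClustering() { return clustering == null; }

        @Override
        public int compareTo(Key o)
        {
            int cmp = Integer.compare(partition, o.partition);
            if (cmp != 0 || hasEmptyClustering() || o.hasEmptyClustering())
                return cmp;
            return Integer.compare(clustering, o.clustering);
        }
    }

    public static void main(String[] args)
    {
        // Range A matched the whole partition 1; range B matched two individual rows in it.
        Deque<Key> a = new ArrayDeque<>(List.of(new Key(1, null)));
        Deque<Key> b = new ArrayDeque<>(List.of(new Key(1, 1), new Key(1, 2)));

        List<Key> out = new ArrayList<>();
        while (!a.isEmpty() && !b.isEmpty())
        {
            Key ka = a.peek();
            Key kb = b.peek();
            int cmp = ka.compareTo(kb);
            if (cmp != 0)
            {
                // Not a match: advance the lagging iterator (a real implementation would skipTo).
                (cmp < 0 ? a : b).poll();
                continue;
            }

            // More specific keys win over partition-level keys.
            Key match = ka.hasEmptyClustering() ? kb : ka;
            out.add(match);

            // Advance only iterators that are not parked on a partition-level key,
            // unless the match itself is partition-level (then the partition is done).
            if (match.hasEmptyClustering() || !ka.hasEmptyClustering())
                a.poll();
            if (match.hasEmptyClustering() || !kb.hasEmptyClustering())
                b.poll();
        }

        // Prints both rows of partition 1; advancing A after its first match would have dropped Key[1, 2].
        System.out.println(out);
    }
}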