Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions src/java/org/apache/cassandra/index/sai/SSTableContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.cassandra.index.sai;

import java.io.IOException;

import com.google.common.base.Objects;

import org.apache.cassandra.config.CassandraRelevantProperties;
Expand All @@ -43,17 +45,49 @@ public class SSTableContext extends SharedCloseableImpl
public final PrimaryKey.Factory primaryKeyFactory;
public final PrimaryKeyMap.Factory primaryKeyMapFactory;

// The first and last key for the whole sstable
private final PrimaryKey minSSTableKey;
private final PrimaryKey maxSSTableKey;

private SSTableContext(SSTableReader sstable,
IndexComponents.ForRead perSSTableComponents,
PrimaryKey.Factory primaryKeyFactory,
PrimaryKeyMap.Factory primaryKeyMapFactory,
Cleanup cleanup)
boolean skipLoadingMinMaxKeys,
Cleanup cleanup) throws IOException
{
super(cleanup);
this.sstable = sstable;
this.perSSTableComponents = perSSTableComponents;
this.primaryKeyFactory = primaryKeyFactory;
this.primaryKeyMapFactory = primaryKeyMapFactory;

// If the min/max keys are null but then subsequently attempted to be accessed, we throw an exception
// as we do not expect this to happen.
if (skipLoadingMinMaxKeys)
{
minSSTableKey = null;
maxSSTableKey = null;
}
else
{
// If we throw, the caller releases the sstable ref and runs the cleanup.
try (var pkm = primaryKeyMapFactory.newPerSSTablePrimaryKeyMap())
{
if (pkm.count() == 0)
{
minSSTableKey = null;
maxSSTableKey = null;
}
else
{
PrimaryKey min = pkm.primaryKeyFromRowId(0);
PrimaryKey max = pkm.primaryKeyFromRowId(pkm.count() - 1);
minSSTableKey = pkm.primaryKeyFromRowId(0, min, max).loadDeferred();
maxSSTableKey = pkm.primaryKeyFromRowId(pkm.count() - 1, min, max).loadDeferred();
}
}
}
}

private SSTableContext(SSTableContext copy)
Expand All @@ -63,6 +97,8 @@ private SSTableContext(SSTableContext copy)
this.perSSTableComponents = copy.perSSTableComponents;
this.primaryKeyFactory = copy.primaryKeyFactory;
this.primaryKeyMapFactory = copy.primaryKeyMapFactory;
this.minSSTableKey = copy.minSSTableKey;
this.maxSSTableKey = copy.maxSSTableKey;
}

@SuppressWarnings("resource")
Expand All @@ -85,13 +121,14 @@ public static SSTableContext create(SSTableReader sstable, IndexComponents.ForRe
}

// avoid opening SAI metadata if reads are disabled
primaryKeyMapFactory = CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean()
boolean readsDisabled = CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean();
primaryKeyMapFactory = readsDisabled
? new PrimaryKeyMap.DummyThrowingFactory()
: onDiskFormat.newPrimaryKeyMapFactory(perSSTableComponents, primaryKeyFactory, sstable);

Cleanup cleanup = new Cleanup(primaryKeyMapFactory, sstableRef);

return new SSTableContext(sstable, perSSTableComponents, primaryKeyFactory, primaryKeyMapFactory, cleanup);
return new SSTableContext(sstable, perSSTableComponents, primaryKeyFactory, primaryKeyMapFactory, readsDisabled, cleanup);
}
catch (Throwable t)
{
Expand Down Expand Up @@ -141,6 +178,20 @@ public PrimaryKeyMap.Factory primaryKeyMapFactory()
return primaryKeyMapFactory;
}

public PrimaryKey minSSTableKey()
{
if (minSSTableKey == null)
throw new IllegalStateException("minSSTableKey is null");
return minSSTableKey;
}

public PrimaryKey maxSSTableKey()
{
if (maxSSTableKey == null)
throw new IllegalStateException("maxSSTableKey is null");
return maxSSTableKey;
}

/**
* @return number of open files per {@link SSTableContext} instance
*/
Expand Down
13 changes: 3 additions & 10 deletions src/java/org/apache/cassandra/index/sai/SSTableIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte
if (CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean())
{
logger.info("Creating dummy (empty) index searcher for sstable {} as SAI index reads are disabled", sstableContext.sstable.descriptor);
return new EmptyIndex();
return new EmptyIndex(sstableContext);
}

return perIndexComponents.onDiskFormat().newSearchableIndex(sstableContext, perIndexComponents);
Expand Down Expand Up @@ -252,17 +252,10 @@ private KeyRangeIterator getNonEqIterator(Expression expression,
QueryContext context,
boolean defer) throws IOException
{
KeyRangeIterator allKeys = allSSTableKeys(keyRange);
if (TypeUtil.supportsRounding(expression.validator))
{
return allKeys;
}
return allSSTableKeys(keyRange);
else
{
Expression negExpression = expression.negated();
KeyRangeIterator matchedKeys = searchableIndex.search(negExpression, keyRange, context, defer);
return KeyRangeAntiJoinIterator.create(allKeys, matchedKeys);
}
return searchableIndex.search(expression, keyRange, context, defer);
}

public KeyRangeIterator search(Expression expression,
Expand Down
12 changes: 11 additions & 1 deletion src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.cassandra.db.virtual.SimpleDataSet;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.v1.Segment;
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
import org.apache.cassandra.index.sai.plan.Expression;
Expand All @@ -38,6 +39,13 @@

public class EmptyIndex implements SearchableIndex
{
private final SSTableContext sstableContext;

public EmptyIndex(SSTableContext sstableContext)
{
this.sstableContext = sstableContext;
}

@Override
public long indexFileCacheSize()
{
Expand Down Expand Up @@ -98,7 +106,9 @@ public KeyRangeIterator search(Expression expression,
QueryContext context,
boolean defer) throws IOException
{
return KeyRangeIterator.empty();
return expression.getOp().isNonEquality()
? PrimaryKeyMapIterator.create(sstableContext, keyRange)
: KeyRangeIterator.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;

import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
import org.apache.cassandra.index.sai.utils.PrimaryKey;

public class IndexSearcherContext
Expand All @@ -30,15 +31,22 @@ public class IndexSearcherContext

final PrimaryKey minimumKey;
final PrimaryKey maximumKey;
final long minSSTableRowId;
final long maxSSTableRowId;
final long segmentRowIdOffset;
final long maxPartitionOffset;

public IndexSearcherContext(SegmentMetadata metadata,
QueryContext context,
PostingList postingList) throws IOException
{
this(metadata.minKey,
metadata.maxKey,
metadata.segmentRowIdOffset,
context,
postingList);
}

public IndexSearcherContext(PrimaryKey minimumKey,
PrimaryKey maximumKey,
long minSSTableRowId,
long maxSSTableRowId,
long segmentRowIdOffset,
QueryContext context,
PostingList postingList) throws IOException
Expand All @@ -53,8 +61,6 @@ public IndexSearcherContext(PrimaryKey minimumKey,
// use segment's metadata for the range iterator, may not be accurate, but should not matter to performance.
this.maximumKey = maximumKey;

this.minSSTableRowId = minSSTableRowId;
this.maxSSTableRowId = maxSSTableRowId;
this.maxPartitionOffset = Long.MAX_VALUE;
}

Expand Down
80 changes: 57 additions & 23 deletions src/java/org/apache/cassandra/index/sai/disk/v1/IndexSearcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.IndexSearcherContext;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PostingListKeyRangeIterator;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.v1.postings.ComplementPostingList;
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
import org.apache.cassandra.index.sai.iterators.RowIdToPrimaryKeyWithSortKeyIterator;
import org.apache.cassandra.index.sai.plan.Expression;
Expand All @@ -63,23 +65,43 @@
*/
public abstract class IndexSearcher implements Closeable, SegmentOrdering
{
private final SSTableContext sstableContext;
protected final PrimaryKeyMap.Factory primaryKeyMapFactory;
final PerIndexFiles indexFiles;
protected final SegmentMetadata metadata;
protected final IndexContext indexContext;

protected final ColumnFilter columnFilter;

protected IndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory,
// These row ids are inclusive of all rows in the sstable, not just rows that are indexed. They are used
// to determine the first and last row ids for a non-equality query.
private final boolean isFirstSegment;
private final boolean isLastSegment;
private final int firstSegmentRowId;
private final int lastSegmentRowId;

protected IndexSearcher(SSTableContext sstableContext,
PerIndexFiles perIndexFiles,
SegmentMetadata segmentMetadata,
IndexContext indexContext)
{
this.primaryKeyMapFactory = primaryKeyMapFactory;
this.sstableContext = sstableContext;
this.primaryKeyMapFactory = sstableContext.primaryKeyMapFactory();
this.indexFiles = perIndexFiles;
this.metadata = segmentMetadata;
this.indexContext = indexContext;
columnFilter = ColumnFilter.selection(RegularAndStaticColumns.of(indexContext.getDefinition()));

// TODO how do we deal with version AA here?? Is NEQ supported on AA? It seems like it can't be, but I haven't
// been able to confirm the behavior.

// The first segment's minimum row id and the last segment's maximum row id need to be adjusted when searching
// for non-equality queries because the SegmentMetadata's min/max row ids do not include rows that do not have
// values for the indexed column.
this.isFirstSegment = metadata.segmentRowIdOffset == 0;
this.isLastSegment = metadata.isLastSegmentInSSTable;
this.firstSegmentRowId = metadata.toSegmentRowId(isFirstSegment ? 0 : metadata.minSSTableRowId);
this.lastSegmentRowId = metadata.toSegmentRowId(isLastSegment ? primaryKeyMapFactory.count() - 1 : metadata.minSSTableRowId);
}

/**
Expand All @@ -96,7 +118,38 @@ protected IndexSearcher(PrimaryKeyMap.Factory primaryKeyMapFactory,
* @param defer create the iterator in a deferred state
* @return {@link KeyRangeIterator} that matches given expression
*/
public abstract KeyRangeIterator search(Expression expression, AbstractBounds<PartitionPosition> keyRange, QueryContext queryContext, boolean defer) throws IOException;
public KeyRangeIterator search(Expression expression, AbstractBounds<PartitionPosition> keyRange, QueryContext queryContext, boolean defer) throws IOException
{
IndexSearcherContext searcherContext;
if (expression.getOp().isNonEquality())
{
var negatedPostingList = searchInternal(expression.negated(), keyRange, queryContext, defer);
var postingList = new ComplementPostingList(firstSegmentRowId, lastSegmentRowId, negatedPostingList);
if (postingList.isEmpty())
{
postingList.close(); // Closing the posting list also closes the negated posting list.
return KeyRangeIterator.empty();
}

// Use the appropriate min and max keys for the first and last segments.
searcherContext = new IndexSearcherContext(isFirstSegment ? sstableContext.minSSTableKey() : metadata.minKey,
isLastSegment ? sstableContext.maxSSTableKey() : metadata.maxKey,
metadata.segmentRowIdOffset,
queryContext,
postingList);
}
else
{
var postingList = searchInternal(expression, keyRange, queryContext, defer);
if (postingList == null || postingList.isEmpty())
return KeyRangeIterator.empty();

searcherContext = new IndexSearcherContext(metadata, queryContext, postingList);
}
return new PostingListKeyRangeIterator(indexContext, primaryKeyMapFactory.newPerSSTablePrimaryKeyMap(), searcherContext);
}

protected abstract PostingList searchInternal(Expression expression, AbstractBounds<PartitionPosition> keyRange, QueryContext queryContext, boolean defer) throws IOException;

/**
* Order the rows by the given Orderer. Used for ORDER BY clause when
Expand Down Expand Up @@ -154,21 +207,6 @@ private ByteComparable encode(ByteBuffer input)
: v -> TypeUtil.asComparableBytes(input, indexContext.getValidator(), v);
}

protected KeyRangeIterator toPrimaryKeyIterator(PostingList postingList, QueryContext queryContext) throws IOException
{
if (postingList == null || postingList.size() == 0)
return KeyRangeIterator.empty();

IndexSearcherContext searcherContext = new IndexSearcherContext(metadata.minKey,
metadata.maxKey,
metadata.minSSTableRowId,
metadata.maxSSTableRowId,
metadata.segmentRowIdOffset,
queryContext,
postingList);
return new PostingListKeyRangeIterator(indexContext, primaryKeyMapFactory.newPerSSTablePrimaryKeyMap(), searcherContext);
}

protected CloseableIterator<PrimaryKeyWithSortKey> toMetaSortedIterator(CloseableIterator<? extends RowIdWithMeta> rowIdIterator, QueryContext queryContext) throws IOException
{
if (rowIdIterator == null || !rowIdIterator.hasNext())
Expand All @@ -177,11 +215,7 @@ protected CloseableIterator<PrimaryKeyWithSortKey> toMetaSortedIterator(Closeabl
return CloseableIterator.emptyIterator();
}

IndexSearcherContext searcherContext = new IndexSearcherContext(metadata.minKey,
metadata.maxKey,
metadata.minSSTableRowId,
metadata.maxSSTableRowId,
metadata.segmentRowIdOffset,
IndexSearcherContext searcherContext = new IndexSearcherContext(metadata,
queryContext,
null);
var pkm = primaryKeyMapFactory.newPerSSTablePrimaryKeyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.v1.postings.IntersectingPostingList;
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
import org.apache.cassandra.index.sai.metrics.MulticastQueryEventListeners;
import org.apache.cassandra.index.sai.metrics.QueryEventListener;
import org.apache.cassandra.index.sai.plan.Expression;
Expand Down Expand Up @@ -98,7 +97,7 @@ protected InvertedIndexSearcher(SSTableContext sstableContext,
Version version,
boolean filterRangeResults) throws IOException
{
super(sstableContext.primaryKeyMapFactory(), perIndexFiles, segmentMetadata, indexContext);
super(sstableContext, perIndexFiles, segmentMetadata, indexContext);
this.sstable = sstableContext.sstable;

long root = metadata.getIndexRoot(IndexComponentType.TERMS_DATA);
Expand Down Expand Up @@ -134,13 +133,7 @@ public long indexFileCacheSize()
}

@SuppressWarnings("resource")
public KeyRangeIterator search(Expression exp, AbstractBounds<PartitionPosition> keyRange, QueryContext context, boolean defer) throws IOException
{
PostingList postingList = searchPosting(exp, context);
return toPrimaryKeyIterator(postingList, context);
}

private PostingList searchPosting(Expression exp, QueryContext context)
protected PostingList searchInternal(Expression exp, AbstractBounds<PartitionPosition> keyRange, QueryContext context, boolean defer) throws IOException
{
if (logger.isTraceEnabled())
logger.trace(indexContext.logMessage("Searching on expression '{}'..."), exp);
Expand Down
Loading