@@ -68,9 +68,9 @@ protected PrimaryKey computeNext()
{
// The highest primary key seen on any range iterator so far.
// It can become null when we reach the end of the iterator.
PrimaryKey highestKey = ranges.get(0).hasNext() ? ranges.get(0).next() : null;
PrimaryKey highestKey = ranges.get(0).hasNext() ? ranges.get(0).peek() : null;
// Index of the range iterator that has advanced beyond the others
int alreadyAdvanced = 0;
int indexOfHighestKey = 0;
rangeStats[0]++;

outer:
@@ -80,31 +80,55 @@ protected PrimaryKey computeNext()
// Once this inner loop finishes normally, all iterators are guaranteed to be at the same value.
for (int index = 0; index < ranges.size(); index++)
{
if (index != alreadyAdvanced)
if (index != indexOfHighestKey)
{
KeyRangeIterator range = ranges.get(index);
PrimaryKey nextKey = nextOrNull(range, highestKey);

range.skipTo(highestKey);
PrimaryKey nextKey = range.hasNext() ? range.peek() : null;

rangeStats[index]++;
int comparisonResult;
if (nextKey == null || (comparisonResult = nextKey.compareTo(highestKey)) > 0)
{
// We jumped over the highest key seen so far, so make it the new highest key.
highestKey = nextKey;
// Remember this iterator to avoid advancing it again, because it is already at the highest key
alreadyAdvanced = index;
indexOfHighestKey = index;
// This iterator jumped over, so the other iterators are lagging behind now,
// including the ones already advanced in the earlier cycles of the inner loop.
// Therefore, restart the inner loop in order to advance
// the other iterators except this one to match the new highest key.
continue outer;
}

assert comparisonResult == 0 :
String.format("skipTo skipped to an item smaller than the target; " +
"iterator: %s, target key: %s, returned key: %s", range, highestKey, nextKey);

// More specific keys should win over full partitions,
// because they match a single row instead of the whole partition.
// However, because this key matches with the earlier keys, we can continue the inner loop.
if (!nextKey.hasEmptyClustering())
{
highestKey = nextKey;
indexOfHighestKey = index;
}
}
}
// If we reached here, next() has been called at least once on each range iterator and
// the last call to next() on each iterator returned a value equal to the highestKey.
// If we reached here, we have a match - all iterators are at the same key == highestKey.

// Now we need to advance the iterators to avoid returning the same key again.
// This is tricky because of empty clustering keys that match the whole partition.
// We must not advance ranges at keys with empty clustering because they
// may still match the next keys returned by other iterators in the next cycles.
// However, if all ranges are at the same partition with empty clustering (highestKey.hasEmptyClustering()),
// we must advance all of them, because we return the key for the whole partition and that partition is done.
for (var range : ranges)
{
if (highestKey.hasEmptyClustering() || !range.peek().hasEmptyClustering())
range.next();
}

// Move the iterator that was called the least times to the start of the list.
// This is an optimisation assuming that iterator is likely a more selective one.
@@ -147,16 +171,6 @@ protected void performSkipTo(PrimaryKey nextToken)
range.skipTo(nextToken);
}

/**
* Fetches the next available item from the iterator, such that the item is not lower than the given key.
* If no such items are available, returns null.
*/
private PrimaryKey nextOrNull(KeyRangeIterator iterator, PrimaryKey minKey)
{
iterator.skipTo(minKey);
return iterator.hasNext() ? iterator.next() : null;
}

public void close() throws IOException
{
ranges.forEach(FileUtils::closeQuietly);
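To make the merge rule above easier to follow, here is a minimal, self-contained Java sketch of the same idea on a simplified key model: a key with an empty (here: null) clustering stands for the whole partition and compares equal to any row key of that partition, at a match the more specific row key is emitted, and whole-partition keys are held back so they can keep matching further rows. The Key record and the intersect() helper are hypothetical stand-ins under those assumptions, not the real PrimaryKey/KeyRangeIterator API.

import java.util.ArrayList;
import java.util.List;

public class IntersectionSketch
{
    // Stand-in for PrimaryKey: clustering == null plays the role of an empty clustering.
    record Key(int partition, Integer clustering)
    {
        boolean wholePartition() { return clustering == null; }

        // Clustering is ignored when either side selects the whole partition,
        // mirroring the comparison semantics described in the comments above.
        int compareTo(Key other)
        {
            int cmp = Integer.compare(partition, other.partition);
            if (cmp != 0 || wholePartition() || other.wholePartition())
                return cmp;
            return Integer.compare(clustering, other.clustering);
        }
    }

    // Intersects two sorted key streams by advancing both to the highest key seen so far.
    static List<Key> intersect(List<Key> a, List<Key> b)
    {
        List<Key> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size())
        {
            int cmp = a.get(i).compareTo(b.get(j));
            if (cmp < 0) { i++; continue; }    // a is lagging, advance it
            if (cmp > 0) { j++; continue; }    // b is lagging, advance it

            // Keys match; emit the more specific one (a row-level key beats a whole-partition key).
            Key ka = a.get(i), kb = b.get(j);
            out.add(ka.wholePartition() ? kb : ka);

            // Only advance inputs that are not sitting on a whole-partition key, unless both are:
            // a whole-partition key may still match further rows of the same partition.
            boolean bothWholePartition = ka.wholePartition() && kb.wholePartition();
            if (bothWholePartition || !ka.wholePartition()) i++;
            if (bothWholePartition || !kb.wholePartition()) j++;
        }
        return out;
    }

    public static void main(String[] args)
    {
        // One partition-only input (playing the role of an older AA index) and one row-aware input.
        List<Key> partitionOnly = List.of(new Key(1, null), new Key(2, null));
        List<Key> rowAware = List.of(new Key(1, 10), new Key(1, 20), new Key(2, 5));
        System.out.println(intersect(partitionOnly, rowAware));
        // -> [Key[partition=1, clustering=10], Key[partition=1, clustering=20], Key[partition=2, clustering=5]]
    }
}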
@@ -24,17 +24,23 @@

import com.google.common.collect.Iterables;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.util.FileUtils;

/**
* Range Union Iterator is used to return a sorted stream of elements from multiple KeyRangeIterator instances.
* Keys are sorted by the natural order of PrimaryKey; however, if two keys are equal by their natural order,
* the one with an empty clustering always wins.
*/
@SuppressWarnings("resource")
public class KeyRangeUnionIterator extends KeyRangeIterator
{
public final List<KeyRangeIterator> ranges;

// If set, we must first skip this partition.
private DecoratedKey partitionToSkip = null;

private KeyRangeUnionIterator(Builder.Statistics statistics, List<KeyRangeIterator> ranges)
{
super(statistics);
@@ -43,6 +49,10 @@ private KeyRangeUnionIterator(Builder.Statistics statistics, List<KeyRangeIterat

public PrimaryKey computeNext()
{
// If we already emitted a partition key for the whole partition (== pk with empty clustering),
// we should not emit any more keys from this partition.
maybeSkipCurrentPartition();

// Keep track of the next best candidate. If another candidate has the same value, advance it to prevent
// duplicate results. This design avoids unnecessary list operations.
KeyRangeIterator candidate = null;
@@ -59,14 +69,64 @@ public PrimaryKey computeNext()
{
int cmp = candidate.peek().compareTo(range.peek());
if (cmp == 0)
range.next();
{
// Due to the way we compare PrimaryKeys with empty clusterings (which is hard to change now),
// the fact that two primary keys compare the same doesn't guarantee they have the same clustering.
// The clustering information is ignored if one key has empty clustering, so a key with an empty
// clustering will match any key with a non-empty clustering (as long as the partition keys are the same).
// So we may end up in a situation when one or more ranges have empty clustering and the others
// have non-empty. This situation is likely if we mix row-aware (DC, EC, ...) indexes with older
// non-row-aware (AA) indexes.
// In that case we absolutely *must* pick the key with an empty clustering,
// as it matches all rows in the partition
// (and hence, it includes all the rows matched by the keys from the other candidates).
// Thanks to postfiltering, we are allowed to return more rows than necessary in SAI, but not less.
// If we chose one of the specific keys with non-empty clustering (e.g. pick the first one we see),
// we may miss rows matched by the non-row-aware index, as well as the rows matched by
// the other row-aware indexes.
if (range.peek().hasEmptyClustering() && !candidate.peek().hasEmptyClustering())
candidate = range;
else
range.next(); // truly equal by partition and clustering, so we can just get rid of one
}
else if (cmp > 0)
{
candidate = range;
}
}
}

if (candidate == null)
return endOfData();
return candidate.next();

var result = candidate.next();

// If the winning candidate has an empty clustering, this means it selects the whole partition, so
// advance all other ranges to the end of this partition to avoid duplicates.
// We delay that to the next call to computeNext() though, because if we have a wide partition, it's better
// to first let the caller consume all the rows from this partition - maybe they won't call again.
if (result.hasEmptyClustering())
partitionToSkip = result.partitionKey();

return result;
}

private void maybeSkipCurrentPartition()
{
if (partitionToSkip != null)
{
for (KeyRangeIterator range : ranges)
skipPartition(range, partitionToSkip);

partitionToSkip = null;
}
}

private void skipPartition(KeyRangeIterator iterator, DecoratedKey partitionKey)
{
// TODO: Push this logic down to the iterator where it can be more efficient
while (iterator.hasNext() && iterator.peek().partitionKey() != null && iterator.peek().partitionKey().compareTo(partitionKey) <= 0)
iterator.next();
}

protected void performSkipTo(PrimaryKey nextKey)
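In the same spirit, here is a minimal sketch (again with a hypothetical Key record rather than the real PrimaryKey/KeyRangeIterator types) of the union tie-break and the deferred partition skip described in computeNext() above: when two keys compare equal, the one with an empty clustering wins because it covers the whole partition, and the remaining keys of that partition are dropped before the next result is produced, mirroring the partitionToSkip idea.

import java.util.ArrayList;
import java.util.List;

public class UnionSketch
{
    // Stand-in for PrimaryKey: clustering == null plays the role of an empty clustering.
    record Key(int partition, Integer clustering)
    {
        boolean wholePartition() { return clustering == null; }

        int compareTo(Key other)
        {
            int cmp = Integer.compare(partition, other.partition);
            if (cmp != 0 || wholePartition() || other.wholePartition())
                return cmp;
            return Integer.compare(clustering, other.clustering);
        }
    }

    // Unions two sorted key lists; a whole-partition key swallows the row-level keys
    // of the same partition coming from the other input.
    static List<Key> union(List<Key> a, List<Key> b)
    {
        List<Key> out = new ArrayList<>();
        int i = 0, j = 0;
        Integer partitionToSkip = null;   // mirrors the deferred skip in computeNext()
        while (i < a.size() || j < b.size())
        {
            // First drop any keys of a partition already covered by a whole-partition key.
            if (partitionToSkip != null)
            {
                int p = partitionToSkip;
                while (i < a.size() && a.get(i).partition() == p) i++;
                while (j < b.size() && b.get(j).partition() == p) j++;
                partitionToSkip = null;
                continue;
            }
            Key next;
            if (i >= a.size())            next = b.get(j++);
            else if (j >= b.size())       next = a.get(i++);
            else
            {
                int cmp = a.get(i).compareTo(b.get(j));
                if (cmp < 0)      next = a.get(i++);
                else if (cmp > 0) next = b.get(j++);
                // Equal: prefer the whole-partition key; it covers the other key and more.
                else if (a.get(i).wholePartition()) { next = a.get(i++); j++; }
                else                                { next = b.get(j++); i++; }
            }
            out.add(next);
            if (next.wholePartition())
                partitionToSkip = next.partition();   // skip the rest of this partition next time
        }
        return out;
    }

    public static void main(String[] args)
    {
        // A row-aware index and an older partition-only (AA-style) index over the same data.
        List<Key> rowAware = List.of(new Key(1, 10), new Key(1, 20), new Key(2, 5));
        List<Key> partitionOnly = List.of(new Key(1, null), new Key(3, null));
        System.out.println(union(rowAware, partitionOnly));
        // -> [Key[partition=1, clustering=null], Key[partition=2, clustering=5], Key[partition=3, clustering=null]]
    }
}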
@@ -316,7 +316,7 @@ private boolean isEqualToLastKey(PrimaryKey key)
// filtered and considered as a result multiple times).
return lastKey != null &&
Objects.equals(lastKey.partitionKey(), key.partitionKey()) &&
Objects.equals(lastKey.clustering(), key.clustering());
(lastKey.hasEmptyClustering() || key.hasEmptyClustering() || Objects.equals(lastKey.clustering(), key.clustering()));
}

private void fillNextSelectedKeysInPartition(DecoratedKey partitionKey, List<PrimaryKey> nextPrimaryKeys)
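The one-line change to isEqualToLastKey above can be read as the predicate below. This is a hedged sketch over a hypothetical key shape (clustering == null standing in for an empty clustering), not the surrounding class: once a whole-partition key has been processed, any key of the same partition counts as already seen, and only when both keys are row-level are the clusterings actually compared.

import java.util.Objects;

public class LastKeySketch
{
    // clustering == null stands in for "empty clustering" (the key selects the whole partition).
    record Key(String partition, String clustering)
    {
        boolean hasEmptyClustering() { return clustering == null; }
    }

    static boolean isEqualToLastKey(Key lastKey, Key key)
    {
        return lastKey != null &&
               Objects.equals(lastKey.partition(), key.partition()) &&
               // Either side selecting the whole partition is enough to consider the keys equal;
               // only if both are row-level do we also compare clusterings.
               (lastKey.hasEmptyClustering() || key.hasEmptyClustering()
                || Objects.equals(lastKey.clustering(), key.clustering()));
    }

    public static void main(String[] args)
    {
        Key wholePartition = new Key("p1", null);
        System.out.println(isEqualToLastKey(wholePartition, new Key("p1", "c1")));     // true: already covered
        System.out.println(isEqualToLastKey(new Key("p1", "c1"), new Key("p1", "c2"))); // false: different rows
    }
}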
@@ -0,0 +1,177 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.sai.cql;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.junit.Test;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.index.sai.SAITester;
import org.apache.cassandra.index.sai.SAIUtil;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.plan.QueryController;

import static org.junit.Assert.assertEquals;

public class NumericIndexMixedVersionTest extends SAITester
{
// Versions in reverse order, so the earliest version (AA) comes first
final static List<Version> VERSIONS = getVersions();

private static List<Version> getVersions()
{
var versions = new ArrayList<>(Version.ALL);
Collections.reverse(versions);
// AA is the earliest version and produces different data for flush vs compaction, so we have
// special logic to hit that case and put it first.
assert versions.get(0).equals(Version.AA);
logger.info("Running mixed version test with versions: {}", versions);
return versions;
}


// This test does not trigger an issue. It simply confirms that we can query across versions.
@Test
public void testMultiVersionCompatibilityNoClusteringColumns() throws Throwable
{
createTable("CREATE TABLE %s (pk int, val int, PRIMARY KEY(pk))");
createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'");

// Note that we do not test the multi-version path where compaction produces different sstables, which is
// the norm in CNDB. If we had a way to compact individual sstables, we could.
disableCompaction();

SAIUtil.setCurrentVersion(Version.AA);
for (int j = 0; j < 500; j++)
execute("INSERT INTO %s (pk, val) VALUES (?, ?)", j, j);
flush();
compact();

// Insert 500 rows per version, each with a unique pk but overlapping values.
int pk = 0;
for (var version : VERSIONS)
{
SAIUtil.setCurrentVersion(version);
for (int i = 0; i < 500; i++)
execute("INSERT INTO %s (pk, val) VALUES (?, ?)", pk++, i);
flush();
}

// Confirm that compaction (aka rebuilding all indexes onto same version) also produces correct results
final int expectedRows = pk;
runThenFlushThenCompact(() -> {
var batchLimit = CassandraRelevantProperties.SAI_PARTITION_ROW_BATCH_SIZE.getInt();
// Query that will hit all sstables and exceed the cassandra.sai.partition_row_batch_size limit
var rows = executeNetWithPaging("SELECT pk FROM %s WHERE val >= 0 LIMIT 10000", batchLimit / 2);
assertEquals(expectedRows, rows.all().size());

rows = executeNetWithPaging("SELECT pk FROM %s WHERE val >= 0 LIMIT 10000", batchLimit);
assertEquals(expectedRows, rows.all().size());

rows = executeNetWithPaging("SELECT pk FROM %s WHERE val >= 0 LIMIT 10000", batchLimit * 2);
assertEquals(expectedRows, rows.all().size());

// Test without paging
assertNumRows(expectedRows, "SELECT pk FROM %%s WHERE val >= 0 LIMIT 10000");
});
}

@Test
public void testMultiVersionCompatibilityWithClusteringColumns() throws Throwable
{
createTable("CREATE TABLE %s (pk int, ck int, val int, PRIMARY KEY(pk, ck)) WITH CLUSTERING ORDER BY (ck ASC)");
createIndex("CREATE CUSTOM INDEX ON %s(val) USING 'StorageAttachedIndex'");

// Note that we do not test the multi-version path where compaction produces different sstables, which is
// the norm in CNDB. If we had a way to compact individual sstables, we could.
disableCompaction();

SAIUtil.setCurrentVersion(Version.AA);
int ck = 0;
for (int j = 0; j < 500; j++)
execute("INSERT INTO %s (pk, ck, val) VALUES (1, ?, ?)", ck++, j);
flush();
compact();

// Insert 500 rows per version
for (var version : VERSIONS)
{
SAIUtil.setCurrentVersion(version);
for (int j = 0; j < 500; j++)
execute("INSERT INTO %s (pk, ck, val) VALUES (1, ?, ?)", ck++, j);
flush();
}

// Confirm that compaction (aka rebuilding all indexes onto same version) also produces correct results
final int expectedRows = ck;
runThenFlushThenCompact(() -> {
// When using paging, we get an excessive number of results because of logic within the controller.select
// method that short-circuits when one of the indexes is AA (not row-aware).
var batchLimit = CassandraRelevantProperties.SAI_PARTITION_ROW_BATCH_SIZE.getInt();
var rows = executeNetWithPaging("SELECT ck FROM %s WHERE val >= 0 LIMIT 10000", batchLimit / 2);
assertEquals(expectedRows, rows.all().size());

rows = executeNetWithPaging("SELECT ck FROM %s WHERE val >= 0 LIMIT 10000", batchLimit);
assertEquals(expectedRows, rows.all().size());

rows = executeNetWithPaging("SELECT ck FROM %s WHERE val >= 0 LIMIT 10000", batchLimit * 2);
assertEquals(expectedRows, rows.all().size());

// Test without paging. This test actually fails by producing fewer rows than expected because of an issue where
// partition-only primary keys and row-aware primary keys are considered equal. When they are unioned
// in the iterator, we take one and drop the other (they evaluate to equal, after all), but this behavior
// filters out a key that would have loaded the whole partition and might have returned a unique result.
assertNumRows(expectedRows, "SELECT ck FROM %%s WHERE val >= 0 LIMIT 10000");
});
}


@Test
public void testMultiVersionCompatibilityWithClusteringColumnsIntersection() throws Throwable
{
QueryController.QUERY_OPT_LEVEL = 0;
SAIUtil.setCurrentVersion(Version.AA);

createTable("CREATE TABLE %s (pk int, ck int, val1 int, val2 int, PRIMARY KEY(pk, ck))");
createIndex("CREATE CUSTOM INDEX ON %s(val1) USING 'StorageAttachedIndex'");
disableCompaction();

// Insert a wide partition (pk = -1) where every row has val1 == 1, plus one row per other partition.
// The index is on the AA version, and we don't compact, so we keep the AA format, where the internal
// iterator produces a single primary key per partition.
for (int j = 0; j < 500; j++)
{
execute("INSERT INTO %s (pk, ck, val1) VALUES (-1, ?, 1)", j);
execute("INSERT INTO %s (pk, ck, val1) VALUES (?, ?, ?)", j, j, j);
}
flush();

// Now, create rows with v2 values and index with all versions
SAIUtil.setCurrentVersion(Version.DB);
createIndex("CREATE CUSTOM INDEX ON %s(val2) USING 'StorageAttachedIndex'");


flush(); // force new memtable classes to get version
for (int j = 0; j < 10; j++)
execute("INSERT INTO %s (pk, ck, val2) VALUES (-1, ?, ?)", j, j);

beforeAndAfterFlush(() -> {
assertNumRows(10, "SELECT ck FROM %%s WHERE val1 = 1 AND val2 >= 0 LIMIT 1000");
});
}
}