diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 71bbdd215f97..2008838b31b7 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -360,6 +360,9 @@ public enum CassandraRelevantProperties
/** The current version of the SAI on-disk index format. */
SAI_CURRENT_VERSION("cassandra.sai.latest.version", "dc"),
+ /** The class to use for selecting the current version of the SAI on-disk index format on a per-keyspace basis. */
+ SAI_VERSION_SELECTOR_CLASS("cassandra.sai.version.selector.class", ""),
+
/** Controls the maximum top-k limit for vector search */
SAI_VECTOR_SEARCH_MAX_TOP_K("cassandra.sai.vector_search.max_top_k", "1000"),
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
index 44dab07febc0..1d95335a85d0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
@@ -278,9 +278,10 @@ private void validateIndexTarget(TableMetadata table, IndexMetadata.Kind kind, I
private String generateIndexName(KeyspaceMetadata keyspace, List targets)
{
+ assert keyspace.name.equals(keyspaceName);
String baseName = targets.size() == 1
- ? IndexMetadata.generateDefaultIndexName(tableName, targets.get(0).column)
- : IndexMetadata.generateDefaultIndexName(tableName, null);
+ ? IndexMetadata.generateDefaultIndexName(keyspaceName, tableName, targets.get(0).column)
+ : IndexMetadata.generateDefaultIndexName(keyspaceName, tableName, null);
return keyspace.findAvailableIndexName(baseName);
}
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 615755190ce5..70c91d5a9536 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -843,8 +843,9 @@ default void unload() { }
* Returns the set of sstable-attached components that this group will create for a newly flushed sstable.
*
* Note that the result of this method is only valid for newly flushed/written sstables as the components
- * returned will assume a version of {@link Version#current()} and a generation of 0. SSTables for which some
- * index have been rebuild may have index components that do not match what this method return in particular.
+ * returned will assume a version of {@link Version#current(String)} and a generation of 0. SSTables for which
+ * some index have been rebuild may have index components that do not match what this method return in
+ * particular.
*/
Set componentsForNewSSTable();
diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java
index c480f4cebea3..01e1bbd1d953 100644
--- a/src/java/org/apache/cassandra/index/sai/IndexContext.java
+++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java
@@ -152,6 +152,7 @@ public class IndexContext
private final AbstractAnalyzer.AnalyzerFactory queryAnalyzerFactory;
private final PrimaryKey.Factory primaryKeyFactory;
+ private final Version version;
private final int maxTermSize;
private volatile boolean dropped = false;
@@ -177,7 +178,8 @@ public IndexContext(@Nonnull String keyspace,
this.viewManager = new IndexViewManager(this);
this.validator = TypeUtil.cellValueType(column, indexType);
this.cfs = cfs;
- this.primaryKeyFactory = Version.current().onDiskFormat().newPrimaryKeyFactory(clusteringComparator);
+ this.version = Version.current(keyspace);
+ this.primaryKeyFactory = version.onDiskFormat().newPrimaryKeyFactory(clusteringComparator);
String columnName = column.name.toString();
@@ -223,6 +225,11 @@ public IndexContext(@Nonnull String keyspace,
logger.debug(logMessage("Initialized index context with index writer config: {}"), indexWriterConfig);
}
+ public Version version()
+ {
+ return version;
+ }
+
public AbstractType> keyValidator()
{
return partitionKeyType;
@@ -409,7 +416,7 @@ private boolean validateCumulativeAnalyzedTermLimit(DecoratedKey key, AbstractAn
public void update(DecoratedKey key, Row oldRow, Row newRow, Memtable memtable, OpOrder.Group opGroup)
{
- if (Version.current().equals(Version.AA))
+ if (version.equals(Version.AA))
{
// AA cannot handle updates because it indexes partition keys instead of fully qualified primary keys.
index(key, newRow, memtable, opGroup);
@@ -683,7 +690,7 @@ public View getReferencedView(long timeoutNanos)
*/
public int openPerIndexFiles()
{
- return viewManager.getView().size() * Version.current().onDiskFormat().openFilesPerIndex(this);
+ return viewManager.getView().size() * version.onDiskFormat().openFilesPerIndex(this);
}
public void prepareSSTablesForRebuild(Collection sstablesToRebuild)
@@ -1020,7 +1027,7 @@ public long indexFileCacheSize()
public IndexFeatureSet indexFeatureSet()
{
- IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator();
+ IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator(version);
getView().getIndexes().stream().map(SSTableIndex::indexFeatureSet).forEach(set -> accumulator.accumulate(set));
return accumulator.complete();
}
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index efc8920fef14..aa19f21dbab1 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -448,12 +448,12 @@ private Future> startInitialBuild(ColumnFamilyStore baseCfs, boolean validate,
return CompletableFuture.completedFuture(null);
}
- if (indexContext.isVector() && Version.current().compareTo(Version.JVECTOR_EARLIEST) < 0)
+ if (indexContext.isVector() && indexContext.version().compareTo(Version.JVECTOR_EARLIEST) < 0)
{
throw new FeatureNeedsIndexRebuildException(String.format("The current configured on-disk format version %s does not support vector indexes. " +
"The minimum version that supports vectors is %s. " +
"The on-disk format version can be set via the -D%s system property.",
- Version.current(),
+ indexContext.version(),
Version.JVECTOR_EARLIEST,
CassandraRelevantProperties.SAI_CURRENT_VERSION.name()));
}
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java
index 03dba3929839..326ab035659a 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java
@@ -48,7 +48,6 @@
import org.apache.cassandra.index.sai.disk.format.ComponentsBuildId;
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
-import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
@@ -321,7 +320,7 @@ private static void prepareForRebuild(IndexComponents.ForRead components, Set componentsForNewSSTable()
{
- return IndexDescriptor.componentsForNewlyFlushedSSTable(indices);
+ return IndexDescriptor.componentsForNewlyFlushedSSTable(indices, version);
}
@Override
diff --git a/src/java/org/apache/cassandra/index/sai/disk/ModernResettableByteBuffersIndexOutput.java b/src/java/org/apache/cassandra/index/sai/disk/ModernResettableByteBuffersIndexOutput.java
index 5fa0c6839fba..396a827b345e 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/ModernResettableByteBuffersIndexOutput.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/ModernResettableByteBuffersIndexOutput.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexOutput;
import org.apache.cassandra.index.sai.disk.oldlucene.ResettableByteBuffersIndexOutput;
import org.apache.lucene.store.ByteBuffersDataInput;
@@ -44,9 +45,9 @@ public class ModernResettableByteBuffersIndexOutput extends ResettableByteBuffer
private final ByteBuffersIndexOutput bbio;
private final ByteBuffersDataOutput delegate;
- public ModernResettableByteBuffersIndexOutput(int expectedSize, String name)
+ public ModernResettableByteBuffersIndexOutput(int expectedSize, String name, Version version)
{
- super("", name, ByteOrder.LITTLE_ENDIAN);
+ super("", name, ByteOrder.LITTLE_ENDIAN, version);
delegate = new ByteBuffersDataOutput(expectedSize);
bbio = new ByteBuffersIndexOutput(delegate, "", name + "-bb");
}
diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
index 0d0171323456..41cec9ec73d1 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
@@ -91,7 +91,7 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
TableMetrics tableMetrics) throws IOException
{
// We always write at the latest version (through what that version is can be configured for specific cases)
- var onDiskFormat = Version.current().onDiskFormat();
+ var onDiskFormat = Version.current(indexDescriptor.descriptor.ksname).onDiskFormat();
this.indexDescriptor = indexDescriptor;
// Note: I think there is a silent assumption here. That is, the PK factory we use here must be for the current
// format version, because that is what `IndexContext.keyFactory` always uses (see ctor)
@@ -126,7 +126,7 @@ public void begin()
public void startPartition(DecoratedKey key, long position)
{
if (aborted) return;
-
+
currentKey = key;
try
@@ -174,7 +174,7 @@ public void partitionLevelDeletion(DeletionTime deletionTime, long position)
public void staticRow(Row staticRow, long position)
{
if (aborted) return;
-
+
if (staticRow.isEmpty())
return;
@@ -279,7 +279,7 @@ public void complete(SSTable sstable)
/**
* Aborts all column index writers and, only if they have not yet completed, SSTable-level component writers.
- *
+ *
* @param accumulator the initial exception thrown from the failed writer
*/
@Override
@@ -300,12 +300,12 @@ public void abort(Throwable accumulator, boolean fromIndex)
// Mark the write aborted, so we can short-circuit any further operations on the component writers.
aborted = true;
-
+
// For non-compaction, make any indexes involved in this transaction non-queryable, as they will likely not match the backing table.
// For compaction: the compaction task should be aborted and new sstables will not be added to tracker
if (fromIndex && opType != OperationType.COMPACTION)
indices.forEach(StorageAttachedIndex::makeIndexNonQueryable);
-
+
for (PerIndexWriter perIndexWriter : perIndexWriters)
{
try
@@ -320,10 +320,10 @@ public void abort(Throwable accumulator, boolean fromIndex)
}
}
}
-
+
if (!tokenOffsetWriterCompleted)
{
- // If the token/offset files have already been written successfully, they can be reused later.
+ // If the token/offset files have already been written successfully, they can be reused later.
perSSTableWriter.abort(accumulator);
}
}
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/ComponentsBuildId.java b/src/java/org/apache/cassandra/index/sai/disk/format/ComponentsBuildId.java
index d59a1fae59c6..6f0e87d22a13 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/ComponentsBuildId.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/ComponentsBuildId.java
@@ -31,8 +31,6 @@
*/
public class ComponentsBuildId implements Comparable
{
- private static final ComponentsBuildId FOR_NEW_SSTABLE = ComponentsBuildId.current(0);
-
private final Version version;
private final int generation;
@@ -47,19 +45,13 @@ public static ComponentsBuildId of(Version version, int generation)
return new ComponentsBuildId(version, generation);
}
- public static ComponentsBuildId current(int generation)
- {
- return of(Version.current(), generation);
- }
-
- public static ComponentsBuildId forNewSSTable()
+ public static ComponentsBuildId forNewSSTable(Version version)
{
- return FOR_NEW_SSTABLE;
+ return ComponentsBuildId.of(version, 0);
}
- public static ComponentsBuildId forNewBuild(@Nullable ComponentsBuildId previousBuild, Predicate newBuildIsUsablePredicate)
+ public static ComponentsBuildId forNewBuild(Version version, @Nullable ComponentsBuildId previousBuild, Predicate newBuildIsUsablePredicate)
{
- Version version = Version.current();
// If we're not using immutable components, we always use generation 0, and we're fine if that overrides existing files
if (!version.useImmutableComponentFiles())
return new ComponentsBuildId(version, 0);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponents.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponents.java
index 89799a4740d6..ae92f9a6add3 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponents.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexComponents.java
@@ -75,7 +75,7 @@
* of the completion marker, but not of the other component of the group). The bumping of generations takes incomplete
* groups into account, and so incomplete groups are not overridden either. Essentially, the generation used by a new
* build is always one more than the highest generation of any component found on disk (for the group in question, and
- * the version we writting, usually {@link Version#current()}).
+ * the version we writting, usually {@link Version#current(String)}).
*/
public interface IndexComponents
{
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
index 64ac811e011f..fddfd6c5f016 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
@@ -77,14 +77,13 @@ public class IndexDescriptor
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
- private static final ComponentsBuildId EMPTY_GROUP_MARKER = ComponentsBuildId.current(-1);
-
// TODO Because indexes can be added at any time to existing data, the Version of a column index
// may not match the Version of the base sstable. OnDiskFormat + IndexFeatureSet + IndexDescriptor
// was not designed with this in mind, leading to some awkwardness, notably in IFS where some features
// are per-sstable (`isRowAware`) and some are per-column (`hasVectorIndexChecksum`).
public final Descriptor descriptor;
+ private final ComponentsBuildId emptyGroupMarker;
// The per-sstable components for this descriptor. This is never `null` in practice, but 1) it's a bit easier to
// initialize it outsides of the ctor, and 2) it can actually change upon calls to `reload`.
@@ -94,6 +93,7 @@ public class IndexDescriptor
private IndexDescriptor(Descriptor descriptor)
{
this.descriptor = descriptor;
+ this.emptyGroupMarker = ComponentsBuildId.of(Version.current(descriptor.ksname), -1);
}
public static IndexDescriptor empty(Descriptor descriptor)
@@ -166,7 +166,7 @@ private IndexComponentsImpl initializeGroup(@Nullable IndexContext context, @Nul
private IndexComponentsImpl createEmptyGroup(@Nullable IndexContext context)
{
- return new IndexComponentsImpl(context, EMPTY_GROUP_MARKER);
+ return new IndexComponentsImpl(context, emptyGroupMarker);
}
/**
@@ -176,9 +176,9 @@ private IndexComponentsImpl createEmptyGroup(@Nullable IndexContext context)
* Please note that the final sstable may not contain all of these components, as some may be empty or not written
* due to the specific of the flush, but this should be a superset of the components written.
*/
- public static Set componentsForNewlyFlushedSSTable(Collection indices)
+ public static Set componentsForNewlyFlushedSSTable(Collection indices, Version version)
{
- ComponentsBuildId buildId = ComponentsBuildId.forNewSSTable();
+ ComponentsBuildId buildId = ComponentsBuildId.forNewSSTable(version);
Set components = new HashSet<>();
for (IndexComponentType component : buildId.version().onDiskFormat().perSSTableComponentTypes())
components.add(customComponentFor(buildId, component, null));
@@ -191,11 +191,11 @@ public static Set componentsForNewlyFlushedSSTable(Collection
- * This is a subset of {@link #componentsForNewlyFlushedSSTable(Collection)} and has the same caveats.
+ * This is a subset of {@link #componentsForNewlyFlushedSSTable(Collection, Version)} and has the same caveats.
*/
public static Set perIndexComponentsForNewlyFlushedSSTable(IndexContext context)
{
- return addPerIndexComponentsForNewlyFlushedSSTable(new HashSet<>(), ComponentsBuildId.forNewSSTable(), context);
+ return addPerIndexComponentsForNewlyFlushedSSTable(new HashSet<>(), ComponentsBuildId.forNewSSTable(context.version()), context);
}
private static Set addPerIndexComponentsForNewlyFlushedSSTable(Set addTo, ComponentsBuildId buildId, IndexContext context)
@@ -271,8 +271,9 @@ public IndexComponents.ForWrite newPerIndexComponentsForWrite(IndexContext conte
private IndexComponents.ForWrite newComponentsForWrite(@Nullable IndexContext context, IndexComponentsImpl currentComponents)
{
+ Version version = context != null ? context.version() : Version.current(descriptor.ksname);
var currentBuildId = currentComponents == null ? null : currentComponents.buildId;
- return new IndexComponentsImpl(context, ComponentsBuildId.forNewBuild(currentBuildId, candidateId -> {
+ return new IndexComponentsImpl(context, ComponentsBuildId.forNewBuild(version, currentBuildId, candidateId -> {
// This checks that there is no existing files on disk we would overwrite by using `candidateId` for our
// new build.
IndexComponentsImpl candidate = new IndexComponentsImpl(context, candidateId);
@@ -511,7 +512,7 @@ public void addIfExists(IndexComponentType component)
{
Preconditions.checkArgument(!sealed, "Should not add components for SSTable %s at this point; the completion marker has already been written", descriptor);
// When a sstable doesn't have any complete group, we use a marker empty one with a generation of -1:
- Preconditions.checkArgument(buildId != EMPTY_GROUP_MARKER, "Should not be adding component to empty components");
+ Preconditions.checkArgument(!buildId.equals(emptyGroupMarker), "Should not be adding component to empty components");
components.computeIfAbsent(component, type -> {
var created = new IndexComponentImpl(type);
return created.file().exists() ? created : null;
@@ -523,7 +524,7 @@ public IndexComponent.ForWrite addOrGet(IndexComponentType component)
{
Preconditions.checkArgument(!sealed, "Should not add components for SSTable %s at this point; the completion marker has already been written", descriptor);
// When a sstable doesn't have any complete group, we use a marker empty one with a generation of -1:
- Preconditions.checkArgument(buildId != EMPTY_GROUP_MARKER, "Should not be adding component to empty components");
+ Preconditions.checkArgument(!buildId.equals(emptyGroupMarker), "Should not be adding component to empty components");
return components.computeIfAbsent(component, IndexComponentImpl::new);
}
@@ -687,7 +688,7 @@ public IndexOutputWriter openOutput(boolean append) throws IOException
component,
file);
- return IndexFileUtils.instance.openOutput(file, byteOrder(), append);
+ return IndexFileUtils.instance.openOutput(file, byteOrder(), append, buildId.version());
}
@Override
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java
index 42fef730d673..5d38df5fc104 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexFeatureSet.java
@@ -52,9 +52,9 @@ public interface IndexFeatureSet
* multiple sources. This will include all the SSTables included in a query and all the indexes
* attached to those SSTables, added using {@link Accumulator#accumulate}.
*
- * The feature set of the current version denoted by {@link Version#current()}
+ * The feature set of the current version denoted by {@link Version#current(String)}
* is implicitly added, so the result feature set will include only the features supported by the
- * current version.
+ * current version for the keyspace.
*
* The {@code Accumulator} creates an {@code IndexFeatureSet} this contains the features from
* all the associated feature sets where {@code false} is the highest priority. This means if any
@@ -68,9 +68,9 @@ class Accumulator
boolean hasTermsHistogram = true;
boolean complete = false;
- public Accumulator()
+ public Accumulator(Version version)
{
- accumulate(Version.current().onDiskFormat().indexFeatureSet());
+ accumulate(version.onDiskFormat().indexFeatureSet());
}
/**
diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
index 1d4a439bf682..de1c8fd174c0 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/Version.java
@@ -20,12 +20,16 @@
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
@@ -38,6 +42,7 @@
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import static com.google.common.base.Preconditions.checkArgument;
@@ -48,6 +53,7 @@
*/
public class Version implements Comparable
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(Version.class);
// 6.8 formats
public static final Version AA = new Version("aa", V1OnDiskFormat.instance, Version::aaFileNameFormat);
// Stargazer
@@ -73,10 +79,11 @@ public class Version implements Comparable
public static final Version JVECTOR_EARLIEST = CA;
public static final Version BM25_EARLIEST = EC;
public static final Version LATEST = EC;
- // The current version can be configured to be an earlier version to support partial upgrades that don't
- // write newer versions of the on-disk formats. This is volatile rather than final so that tests may
- // use reflection to change it and safely publish across threads.
- public static volatile Version CURRENT = parse(currentVersionProperty());
+
+ // This is volatile rather than final so that tests may use reflection to change it and safely publish across threads,
+ // but it should not be changed outside of tests.
+ @SuppressWarnings("FieldMayBeFinal")
+ private static volatile Selector SELECTOR = Selector.fromProperty();
private static final Pattern GENERATION_PATTERN = Pattern.compile("\\d+");
@@ -91,11 +98,6 @@ private Version(String version, OnDiskFormat onDiskFormat, FileNameFormatter fil
this.fileNameFormatter = fileNameFormatter;
}
- private static String currentVersionProperty()
- {
- return CassandraRelevantProperties.SAI_CURRENT_VERSION.getString();
- }
-
public static Version parse(String input)
{
checkArgument(input != null);
@@ -108,9 +110,14 @@ public static Version parse(String input)
throw new IllegalArgumentException("Unrecognized SAI version string " + input);
}
- public static Version current()
+ /**
+ * @param keyspace the keyspace for which to select a version, assumed to belong to an existing keyspace.
+ * @return the version to use on new SSTables for the provided keyspace.
+ */
+ public static Version current(String keyspace)
{
- return CURRENT;
+ assert keyspace != null : "Keyspace name must not be null";
+ return SELECTOR.select(keyspace);
}
/**
@@ -118,9 +125,9 @@ public static Version current()
* do not exceed the system's filename length limit (defined in {@link SchemaConstants#FILENAME_LENGTH}).
* This accounts for all additional components in the filename.
*/
- public static int calculateIndexNameAllowedLength()
+ public static int calculateIndexNameAllowedLength(String keyspace)
{
- int addedLength = getAddedLengthFromDescriptorAndVersion();
+ int addedLength = getAddedLengthFromDescriptorAndVersion(keyspace);
assert addedLength < SchemaConstants.FILENAME_LENGTH;
return SchemaConstants.FILENAME_LENGTH - addedLength;
}
@@ -131,10 +138,10 @@ public static int calculateIndexNameAllowedLength()
*
* @return the length of the added prefixes and suffixes
*/
- private static int getAddedLengthFromDescriptorAndVersion()
+ private static int getAddedLengthFromDescriptorAndVersion(String keyspace)
{
// Prefixes and suffixes constructed by Version.stargazerFileNameFormat
- int versionNameLength = current().toString().length();
+ int versionNameLength = current(keyspace).toString().length();
// room for up to 999 generations
int generationLength = 3 + SAI_SEPARATOR.length();
int addedLength = SAI_DESCRIPTOR.length()
@@ -310,6 +317,51 @@ private static Optional tryParseAAFileName(String componentStr)
return Optional.of(new ParsedFileName(ComponentsBuildId.of(AA, 0), indexComponentType, indexName));
}
+ /**
+ * This is an interface for selecting the appropriate version of the on-disk format to use.
+ * This is going to be used by CNDB to inject the version of SAI to use for a given tenant
+ */
+ public interface Selector
+ {
+ /**
+ * Default version used by {@link #DEFAULT}.
+ */
+ Version DEFAULT_VERSION = Version.parse(CassandraRelevantProperties.SAI_CURRENT_VERSION.getString());
+
+ /**
+ * Default version selector that uses the same version for all keyspaces.
+ */
+ Selector DEFAULT = keyspace -> DEFAULT_VERSION;
+
+ /**
+ * @param keyspace the keyspace for which to select a version, assumed to belong to an existing keyspace.
+ * @return the version of the on-disk format to use for the provided keyspace, it should not be null.
+ */
+ @Nonnull
+ Version select(@Nonnull String keyspace);
+
+ static Selector fromProperty()
+ {
+ try
+ {
+ String selectorClass = CassandraRelevantProperties.SAI_VERSION_SELECTOR_CLASS.getString();
+ if (selectorClass.isEmpty())
+ {
+ return Selector.DEFAULT;
+ }
+ else
+ {
+ LOGGER.info("Using SAI version selector: {}", selectorClass);
+ return FBUtilities.construct(selectorClass, "SAI version selector");
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
//
// Stargazer filename formatter. This is the current SAI on-disk filename format
//
diff --git a/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutput.java b/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutput.java
index a57989088a43..ee4d48accabb 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutput.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutput.java
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -31,11 +32,13 @@
public abstract class IndexOutput extends org.apache.lucene.store.IndexOutput
{
protected final ByteOrder order;
+ protected final Version version;
- public IndexOutput(String resourceDescription, String name, ByteOrder order)
+ protected IndexOutput(String resourceDescription, String name, ByteOrder order, Version version)
{
super(resourceDescription, name);
this.order = order;
+ this.version = version;
}
public ByteOrder order()
@@ -43,6 +46,11 @@ public ByteOrder order()
return order;
}
+ public Version version()
+ {
+ return version;
+ }
+
public final void writeBytes(ByteBuffer buf) throws IOException
{
byte[] bytes = ByteBufferUtil.getArray(buf);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutputWriter.java b/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutputWriter.java
index 1fdb3bdc5a41..7527ccecbc84 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutputWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/io/IndexOutputWriter.java
@@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.index.sai.utils.IndexFileUtils;
import org.apache.cassandra.io.util.SequentialWriter;
@@ -36,9 +37,9 @@ public class IndexOutputWriter extends IndexOutput
private final SequentialWriter out;
private boolean closed;
- public IndexOutputWriter(SequentialWriter out, ByteOrder order)
+ public IndexOutputWriter(SequentialWriter out, ByteOrder order, Version version)
{
- super(out.getFile().toString(), out.getFile().name(), order);
+ super(out.getFile().toString(), out.getFile().name(), order, version);
this.out = out;
}
diff --git a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyByteBuffersIndexOutput.java b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyByteBuffersIndexOutput.java
index c3d08cd3e399..6a685bb53a70 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyByteBuffersIndexOutput.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyByteBuffersIndexOutput.java
@@ -26,6 +26,7 @@
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexOutput;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.DataInput;
@@ -52,16 +53,16 @@ public final class LegacyByteBuffersIndexOutput extends IndexOutput
private LegacyByteBuffersDataOutput delegate;
- public LegacyByteBuffersIndexOutput(LegacyByteBuffersDataOutput delegate, String resourceDescription, String name)
+ public LegacyByteBuffersIndexOutput(LegacyByteBuffersDataOutput delegate, String resourceDescription, String name, Version version)
{
- this(delegate, resourceDescription, name, new CRC32(), null);
+ this(delegate, resourceDescription, name, new CRC32(), null, version);
}
public LegacyByteBuffersIndexOutput(LegacyByteBuffersDataOutput delegate, String resourceDescription, String name,
Checksum checksum,
- Consumer onClose)
+ Consumer onClose, Version version)
{
- super(resourceDescription, name, ByteOrder.BIG_ENDIAN);
+ super(resourceDescription, name, ByteOrder.BIG_ENDIAN, version);
this.delegate = delegate;
this.checksum = checksum;
this.onClose = onClose;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyResettableByteBuffersIndexOutput.java b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyResettableByteBuffersIndexOutput.java
index c92028e20a48..642624ebab2f 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyResettableByteBuffersIndexOutput.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LegacyResettableByteBuffersIndexOutput.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.DataInput;
@@ -39,11 +40,11 @@ public class LegacyResettableByteBuffersIndexOutput extends ResettableByteBuffer
private final LegacyByteBuffersIndexOutput bbio;
private final LegacyByteBuffersDataOutput delegate;
- public LegacyResettableByteBuffersIndexOutput(int expectedSize, String name)
+ public LegacyResettableByteBuffersIndexOutput(int expectedSize, String name, Version version)
{
- super("", name, ByteOrder.BIG_ENDIAN);
+ super("", name, ByteOrder.BIG_ENDIAN, version);
delegate = new LegacyByteBuffersDataOutput(expectedSize);
- bbio = new LegacyByteBuffersIndexOutput(delegate, "", name + "-bb");
+ bbio = new LegacyByteBuffersIndexOutput(delegate, "", name + "-bb", version);
}
public LegacyByteBuffersDataInput toDataInput()
diff --git a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LuceneCompat.java b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LuceneCompat.java
index 1df6d80cb798..c664bdd9edc0 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LuceneCompat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/LuceneCompat.java
@@ -21,6 +21,7 @@
import java.nio.ByteOrder;
import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.utils.SeekingRandomAccessInput;
import org.apache.lucene.backward_codecs.packed.LegacyDirectReader;
import org.apache.lucene.backward_codecs.packed.LegacyDirectWriter;
@@ -55,11 +56,11 @@ public static int directWriterUnsignedBitsRequired(ByteOrder order, long maxValu
: LegacyDirectWriter.unsignedBitsRequired(maxValue);
}
- public static ResettableByteBuffersIndexOutput getResettableByteBuffersIndexOutput(ByteOrder order, int expectedSize, String name)
+ public static ResettableByteBuffersIndexOutput getResettableByteBuffersIndexOutput(ByteOrder order, int expectedSize, String name, Version version)
{
// Lucene 7.5 and earlier used big-endian ordering
- return order == ByteOrder.LITTLE_ENDIAN ? new ModernResettableByteBuffersIndexOutput(expectedSize, name)
- : new LegacyResettableByteBuffersIndexOutput(expectedSize, name);
+ return order == ByteOrder.LITTLE_ENDIAN ? new ModernResettableByteBuffersIndexOutput(expectedSize, name, version)
+ : new LegacyResettableByteBuffersIndexOutput(expectedSize, name, version);
}
public static ByteBuffersDataOutputAdapter getByteBuffersDataOutputAdapter(ByteOrder order, long expectedSize)
diff --git a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/ResettableByteBuffersIndexOutput.java b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/ResettableByteBuffersIndexOutput.java
index be8ec7b62eff..3fb463217768 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/oldlucene/ResettableByteBuffersIndexOutput.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/oldlucene/ResettableByteBuffersIndexOutput.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteOrder;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexOutput;
/**
@@ -30,10 +31,9 @@
public abstract class ResettableByteBuffersIndexOutput extends IndexOutput
{
-
- public ResettableByteBuffersIndexOutput(String resourceDescription, String name, ByteOrder order)
+ protected ResettableByteBuffersIndexOutput(String resourceDescription, String name, ByteOrder order, Version version)
{
- super(resourceDescription, name, order);
+ super(resourceDescription, name, order, version);
}
public abstract void copyTo(IndexOutput out) throws IOException;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
index bfade2e0baf8..9f5d42cedaf5 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
@@ -208,7 +208,7 @@ private long flush(DecoratedKey minKey, DecoratedKey maxKey, AbstractType> ter
private boolean writeFrequencies()
{
- return indexContext().isAnalyzed() && Version.current().onOrAfter(Version.BM25_EARLIEST);
+ return indexContext().isAnalyzed() && indexContext().version().onOrAfter(Version.BM25_EARLIEST);
}
private void flushVectorIndex(DecoratedKey minKey, DecoratedKey maxKey, long startTime, Stopwatch stopwatch) throws IOException
@@ -233,7 +233,8 @@ private void flushVectorIndex(DecoratedKey minKey, DecoratedKey maxKey, long sta
ByteBufferUtil.bytes(0), // VSTODO by pass min max terms for vectors
ByteBufferUtil.bytes(0), // VSTODO by pass min max terms for vectors
null,
- metadataMap);
+ metadataMap,
+ perIndexComponents.version());
try (MetadataWriter writer = new MetadataWriter(perIndexComponents))
{
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataWriter.java
index 7cc2df0ba118..d6fe73763cd9 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MetadataWriter.java
@@ -48,7 +48,7 @@ public MetadataWriter(IndexComponents.ForWrite components) throws IOException
public IndexOutput builder(String name)
{
if (output.order() == ByteOrder.BIG_ENDIAN) {
- return new LegacyResettableByteBuffersIndexOutput(1024, name) {
+ return new LegacyResettableByteBuffersIndexOutput(1024, name, version) {
@Override
public void close()
{
@@ -56,7 +56,7 @@ public void close()
}
};
} else {
- return new ModernResettableByteBuffersIndexOutput(1024, name) {
+ return new ModernResettableByteBuffersIndexOutput(1024, name, version) {
@Override
public void close()
{
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
index a5d48f2fa066..3b1c31cc6aa9 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -203,7 +203,7 @@ public static class RAMStringSegmentBuilder extends SegmentBuilder
private boolean writeFrequencies()
{
- return !(analyzer instanceof NoOpAnalyzer) && Version.current().onOrAfter(Version.BM25_EARLIEST);
+ return !(analyzer instanceof NoOpAnalyzer) && components.version().onOrAfter(Version.BM25_EARLIEST);
}
public boolean isEmpty()
@@ -502,8 +502,8 @@ private long add(List terms, PrimaryKey key, long sstableRowId)
// Update term boundaries for all terms in this row
for (ByteBuffer term : terms)
{
- minTerm = TypeUtil.min(term, minTerm, termComparator, Version.current());
- maxTerm = TypeUtil.max(term, maxTerm, termComparator, Version.current());
+ minTerm = TypeUtil.min(term, minTerm, termComparator, components.version());
+ maxTerm = TypeUtil.max(term, maxTerm, termComparator, components.version());
}
rowCount++;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java
index c4d3ef5772ab..39caf9808842 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java
@@ -110,7 +110,8 @@ public class SegmentMetadata implements Comparable
ByteBuffer minTerm,
ByteBuffer maxTerm,
TermsDistribution termsDistribution,
- ComponentMetadataMap componentMetadatas)
+ ComponentMetadataMap componentMetadatas,
+ Version version)
{
// numRows can exceed Integer.MAX_VALUE because it is the count of unique term and segmentRowId pairs.
Objects.requireNonNull(minKey);
@@ -118,7 +119,7 @@ public class SegmentMetadata implements Comparable
Objects.requireNonNull(minTerm);
Objects.requireNonNull(maxTerm);
- this.version = Version.current();
+ this.version = version;
this.segmentRowIdOffset = segmentRowIdOffset;
this.minSSTableRowId = minSSTableRowId;
this.maxSSTableRowId = maxSSTableRowId;
@@ -206,7 +207,7 @@ public static void write(MetadataWriter writer, List segments)
{
if (metadata.termsDistribution != null)
{
- var tmp = new ModernResettableByteBuffersIndexOutput(1024, "");
+ var tmp = new ModernResettableByteBuffersIndexOutput(1024, "", output.version());
metadata.termsDistribution.write(tmp);
output.writeInt(tmp.intSize());
tmp.copyTo(output);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java
index eec15eb53589..82901deb5877 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java
@@ -39,7 +39,6 @@
import org.apache.cassandra.index.sai.disk.v1.kdtree.MutableOneDimPointValues;
import org.apache.cassandra.index.sai.disk.v6.TermsDistribution;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
-import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.lucene.util.BytesRef;
@@ -74,18 +73,20 @@ public class SegmentMetadataBuilder
private long numRows;
private final TermsDistribution.Builder termsDistributionBuilder;
+ private final Version version;
SegmentMetadata.ComponentMetadataMap metadataMap;
public SegmentMetadataBuilder(long segmentRowIdOffset, IndexComponents.ForWrite components)
{
IndexContext context = Objects.requireNonNull(components.context());
+ this.version = context.version();
this.segmentRowIdOffset = segmentRowIdOffset;
this.byteComparableVersion = components.byteComparableVersionFor(IndexComponentType.TERMS_DATA);
int histogramSize = context.getIntOption(HISTOGRAM_SIZE_OPTION, 128);
int mostFrequentTermsCount = context.getIntOption(MFT_COUNT_OPTION, 128);
- this.termsDistributionBuilder = new TermsDistribution.Builder(context.getValidator(), byteComparableVersion, histogramSize, mostFrequentTermsCount);
+ this.termsDistributionBuilder = new TermsDistribution.Builder(context.getValidator(), byteComparableVersion, histogramSize, mostFrequentTermsCount, version);
}
public void setKeyRange(@Nonnull PrimaryKey minKey, @Nonnull PrimaryKey maxKey)
@@ -156,7 +157,8 @@ void add(ByteComparable term, int rowCount)
minTerm,
maxTerm,
termsDistributionBuilder.build(),
- metadataMap);
+ metadataMap,
+ version);
}
/**
@@ -373,5 +375,3 @@ public void close() throws IOException
}
}
-
-
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index d1336bc5e2ce..a9d208b9febe 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -221,8 +221,8 @@ protected Version getExpectedEarliestVersion(IndexContext context, IndexComponen
Version earliest = Version.EARLIEST;
if (isVectorDataComponent(context, indexComponentType))
{
- if (!Version.current().onOrAfter(Version.VECTOR_EARLIEST))
- throw new IllegalStateException("Configured current version " + Version.current() + " is not compatible with vector index");
+ if (!context.version().onOrAfter(Version.VECTOR_EARLIEST))
+ throw new IllegalStateException("Configured current version " + context.version() + " is not compatible with vector index");
earliest = Version.VECTOR_EARLIEST;
}
return earliest;
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/AbstractBlockPackedWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/AbstractBlockPackedWriter.java
index 01888b18cda8..e2f233525df8 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/AbstractBlockPackedWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/bitpack/AbstractBlockPackedWriter.java
@@ -49,7 +49,7 @@ public abstract class AbstractBlockPackedWriter
{
checkBlockSize(blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE);
this.out = out;
- this.blockMetaWriter = LuceneCompat.getResettableByteBuffersIndexOutput(out.order(), 1024, "NumericValuesMeta");
+ this.blockMetaWriter = LuceneCompat.getResettableByteBuffersIndexOutput(out.order(), 1024, "NumericValuesMeta", out.version());
values = new long[blockSize];
}
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDWriter.java
index c89544817507..79b9fbdd40e8 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/BKDWriter.java
@@ -28,6 +28,7 @@
import com.google.common.base.MoreObjects;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.CryptoUtils;
import org.apache.cassandra.index.sai.disk.io.IndexOutput;
import org.apache.cassandra.index.sai.disk.oldlucene.ByteBuffersDataOutputAdapter;
@@ -126,6 +127,7 @@ public class BKDWriter implements Closeable
private final ICompressor compressor;
private final ByteOrder order;
+ private final Version version;
// reused when writing leaf blocks
private final ByteBuffersDataOutputAdapter scratchOut;
@@ -133,16 +135,16 @@ public class BKDWriter implements Closeable
public BKDWriter(long maxDoc, int numDims, int bytesPerDim,
int maxPointsInLeafNode, double maxMBSortInHeap, long totalPointCount, boolean singleValuePerDoc,
- ICompressor compressor, ByteOrder order) throws IOException
+ ICompressor compressor, ByteOrder order, Version version) throws IOException
{
this(maxDoc, numDims, bytesPerDim, maxPointsInLeafNode, maxMBSortInHeap, totalPointCount, singleValuePerDoc,
- totalPointCount > Integer.MAX_VALUE, compressor, order);
+ totalPointCount > Integer.MAX_VALUE, compressor, order, version);
}
protected BKDWriter(long maxDoc, int numDims, int bytesPerDim,
int maxPointsInLeafNode, double maxMBSortInHeap, long totalPointCount,
boolean singleValuePerDoc, boolean longOrds, ICompressor compressor,
- ByteOrder order) throws IOException
+ ByteOrder order, Version version) throws IOException
{
verifyParams(numDims, maxPointsInLeafNode, maxMBSortInHeap, totalPointCount);
// We use tracking dir to deal with removing files on exception, so each place that
@@ -154,6 +156,7 @@ protected BKDWriter(long maxDoc, int numDims, int bytesPerDim,
this.maxDoc = maxDoc;
this.compressor = compressor;
this.order = order;
+ this.version = version;
docsSeen = new LongBitSet(maxDoc);
packedBytesLength = numDims * bytesPerDim;
@@ -629,7 +632,7 @@ private byte[] packIndex(long[] leafBlockFPs, byte[] splitPackedValues) throws I
}
// Reused while packing the index
- var writeBuffer = LuceneCompat.getResettableByteBuffersIndexOutput(order, 1024, "");
+ var writeBuffer = LuceneCompat.getResettableByteBuffersIndexOutput(order, 1024, "", version);
// This is the "file" we append the byte[] to:
List blocks = new ArrayList<>();
@@ -843,7 +846,7 @@ private void writeIndex(IndexOutput out, int countPerLeaf, int numLeaves, byte[]
if (compressor != null)
{
- var ramOut = LuceneCompat.getResettableByteBuffersIndexOutput(order, 1024, "");
+ var ramOut = LuceneCompat.getResettableByteBuffersIndexOutput(order, 1024, "", out.version());
ramOut.writeBytes(minPackedValue, 0, packedBytesLength);
ramOut.writeBytes(maxPackedValue, 0, packedBytesLength);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/NumericIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/NumericIndexWriter.java
index 48567e9e6fe0..da6bcad25d25 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/NumericIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/kdtree/NumericIndexWriter.java
@@ -94,7 +94,8 @@ public NumericIndexWriter(IndexComponents.ForWrite components,
BKDWriter.DEFAULT_MAX_MB_SORT_IN_HEAP,
numRows,
true, null,
- components.addOrGet(IndexComponentType.KD_TREE).byteOrder());
+ components.addOrGet(IndexComponentType.KD_TREE).byteOrder(),
+ components.version());
}
@Override
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java
index 5bf0decfb8e4..b62cacc7372a 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java
@@ -144,7 +144,7 @@ private PostingsWriter(IndexOutput dataOutput, int blockEntries, boolean writeFr
startOffset = dataOutput.getFilePointer();
deltaBuffer = new long[blockEntries];
freqBuffer = new int[blockEntries];
- inMemoryOutput = LuceneCompat.getResettableByteBuffersIndexOutput(dataOutput.order(), 1024, "blockOffsets");
+ inMemoryOutput = LuceneCompat.getResettableByteBuffersIndexOutput(dataOutput.order(), 1024, "blockOffsets", dataOutput.version());
SAICodecUtils.writeHeader(dataOutput);
}
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/trie/InvertedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/trie/InvertedIndexWriter.java
index 3e571b819bd5..161c83d0fd56 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/trie/InvertedIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/trie/InvertedIndexWriter.java
@@ -56,7 +56,7 @@ public InvertedIndexWriter(IndexComponents.ForWrite components, boolean writeFre
{
this.termsDictionaryWriter = new TrieTermsDictionaryWriter(components);
this.postingsWriter = new PostingsWriter(components, writeFrequencies);
- this.docLengthsWriter = Version.current().onOrAfter(Version.BM25_EARLIEST) ? new DocLengthsWriter(components) : null;
+ this.docLengthsWriter = components.version().onOrAfter(Version.BM25_EARLIEST) ? new DocLengthsWriter(components) : null;
}
/**
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java
index 6c18f7979040..97f856fe3aef 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v2/sortedterms/SortedTermsWriter.java
@@ -112,7 +112,7 @@ public SortedTermsWriter(@NonNull IndexComponent.ForWrite termsDataComponent,
SAICodecUtils.writeHeader(this.trieOutput);
this.trieWriter = IncrementalTrieWriter.open(trieSerializer, trieOutput.asSequentialWriter(), TypeUtil.BYTE_COMPARABLE_VERSION);
this.termsOutput = termsDataComponent.openOutput();
- SAICodecUtils.writeHeader(termsOutput);
+ SAICodecUtils.writeHeader(termsOutput, termsDataComponent.parent().version());
this.bytesStartFP = termsOutput.getFilePointer();
this.offsetsWriter = termsDataBlockOffsets;
}
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java
index b6763c816a3f..449c6e7cbd4b 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java
@@ -32,9 +32,9 @@ public class V5OnDiskFormat extends V4OnDiskFormat
{
public static final V5OnDiskFormat instance = new V5OnDiskFormat();
- public static boolean writeV5VectorPostings()
+ public static boolean writeV5VectorPostings(Version version)
{
- return Version.current().onOrAfter(Version.DC);
+ return version.onOrAfter(Version.DC);
}
@Override
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v5/V5VectorPostingsWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v5/V5VectorPostingsWriter.java
index 27e2317b057f..3fed1d69cf9a 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v5/V5VectorPostingsWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v5/V5VectorPostingsWriter.java
@@ -40,6 +40,7 @@
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.vector.VectorPostings;
import org.apache.cassandra.io.util.SequentialWriter;
@@ -386,9 +387,9 @@ public RemappedPostings(Structure structure, int maxNewOrdinal, int maxRowId, Bi
/**
* @see RemappedPostings
*/
- public static RemappedPostings remapForMemtable(Map, ? extends VectorPostings> postingsMap)
+ public static RemappedPostings remapForMemtable(Map, ? extends VectorPostings> postingsMap, Version version)
{
- assert V5OnDiskFormat.writeV5VectorPostings();
+ assert V5OnDiskFormat.writeV5VectorPostings(version);
BiMap ordinalMap = HashBiMap.create();
Int2IntHashMap extraPostings = new Int2IntHashMap(Integer.MIN_VALUE);
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v6/TermsDistribution.java b/src/java/org/apache/cassandra/index/sai/disk/v6/TermsDistribution.java
index ff75dd847c01..891c3d2d7ab7 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v6/TermsDistribution.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v6/TermsDistribution.java
@@ -440,6 +440,7 @@ private static Version decodeIndexVersion(String versionStr) throws IOException
public static class Builder
{
final AbstractType> termType;
+ final Version version;
final ByteComparable.Version byteComparableVersion;
final int histogramSize;
final int mostFrequentTermsTableSize;
@@ -456,12 +457,14 @@ public static class Builder
public Builder(AbstractType> termType,
ByteComparable.Version byteComparableVersion,
int histogramSize,
- int mostFrequentTermsTableSize)
+ int mostFrequentTermsTableSize,
+ Version version)
{
this.termType = termType;
this.byteComparableVersion = byteComparableVersion;
this.histogramSize = histogramSize;
this.mostFrequentTermsTableSize = mostFrequentTermsTableSize;
+ this.version = version;
// Let's start with adding buckets for every point.
// This will be corrected to a higher value once the histogram gets too large and we'll do shrinking.
@@ -508,7 +511,7 @@ public TermsDistribution build()
mft.put(point.term, point.rowCount);
}
- return new TermsDistribution(termType, buckets, mft, Version.current(), byteComparableVersion);
+ return new TermsDistribution(termType, buckets, mft, version, byteComparableVersion);
}
/**
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
index a0c3ab13ff0d..1d70352c1a1a 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java
@@ -36,6 +36,7 @@
import java.util.stream.IntStream;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,6 +135,10 @@ public enum PQVersion {
*/
public CassandraOnHeapGraph(IndexContext context, boolean forSearching, Memtable memtable)
{
+ // TODO CNDB-15619: we might want to port back CNDB-14301
+ if (!context.version().onOrAfter(Version.JVECTOR_EARLIEST))
+ throw new UnsupportedOperationException("JVector is not supported in V2OnDiskFormat");
+
this.source = memtable == null
? "null"
: memtable.getClass().getSimpleName() + '@' + Integer.toHexString(memtable.hashCode());
@@ -400,7 +405,7 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
// compute the remapping of old ordinals to new (to fill in holes from deletion and/or to create a
// closer correspondance to rowids, simplifying postings lookups later)
V5VectorPostingsWriter.RemappedPostings remappedPostings;
- if (V5OnDiskFormat.writeV5VectorPostings())
+ if (V5OnDiskFormat.writeV5VectorPostings(perIndexComponents.version()))
{
// remove postings corresponding to marked-deleted vectors
var it = postingsMap.entrySet().iterator();
@@ -419,7 +424,7 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
deletedOrdinals.stream().parallel().forEach(builder::markNodeDeleted);
deletedOrdinals.clear();
builder.cleanup();
- remappedPostings = V5VectorPostingsWriter.remapForMemtable(postingsMap);
+ remappedPostings = V5VectorPostingsWriter.remapForMemtable(postingsMap, perIndexComponents.version());
}
else
{
@@ -448,7 +453,7 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
SAICodecUtils.writeHeader(pqOutput);
SAICodecUtils.writeHeader(postingsOutput);
indexWriter.getOutput().seek(indexFile.length()); // position at the end of the previous segment before writing our own header
- SAICodecUtils.writeHeader(SAICodecUtils.toLuceneOutput(indexWriter.getOutput()));
+ SAICodecUtils.writeHeader(SAICodecUtils.toLuceneOutput(indexWriter.getOutput()), perIndexComponents.version());
assert indexWriter.getOutput().position() == termsOffset : "termsOffset " + termsOffset + " != " + indexWriter.getOutput().position();
// compute and write PQ
@@ -459,7 +464,7 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
// write postings
long postingsOffset = postingsOutput.getFilePointer();
long postingsPosition;
- if (V5OnDiskFormat.writeV5VectorPostings())
+ if (V5OnDiskFormat.writeV5VectorPostings(perIndexComponents.version()))
{
assert deletedOrdinals.isEmpty(); // V5 format does not support recording deleted ordinals
postingsPosition = new V5VectorPostingsWriter(remappedPostings)
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
index 8e09abfbe0bc..7c820b0bd903 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java
@@ -34,6 +34,7 @@
import java.util.stream.IntStream;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public CompactionGraph(IndexComponents.ForWrite perIndexComponents, VectorCompre
// with "holes" in the ordinal sequence (and pq and data files) which we would prefer to avoid
// (hence the effort to predict `allRowsHaveVectors`) but will not cause correctness issues,
// and the next compaction will fill in the holes.
- this.useSyntheticOrdinals = !V5OnDiskFormat.writeV5VectorPostings() || !allRowsHaveVectors;
+ this.useSyntheticOrdinals = !V5OnDiskFormat.writeV5VectorPostings(context.version()) || !allRowsHaveVectors;
// the extension here is important to signal to CFS.scrubDataDirectories that it should be removed if present at restart
Component tmpComponent = new Component(Component.Type.CUSTOM, "chronicle" + Descriptor.TMP_EXT);
@@ -218,7 +219,7 @@ else if (compressor instanceof BinaryQuantization)
// placeholder writer, will be replaced at flush time when we finalize the index contents
writer = createTermsWriterBuilder().withMapper(new OrdinalMapper.IdentityMapper(maxRowsInGraph)).build();
writer.getOutput().seek(termsFile.length()); // position at the end of the previous segment before writing our own header
- SAICodecUtils.writeHeader(SAICodecUtils.toLuceneOutput(writer.getOutput()));
+ SAICodecUtils.writeHeader(SAICodecUtils.toLuceneOutput(writer.getOutput()), context.version());
}
private OnDiskGraphIndexWriter.Builder createTermsWriterBuilder() throws IOException
@@ -394,6 +395,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
// write PQ (time to do this is negligible, don't bother doing it async)
long pqOffset = pqOutput.getFilePointer();
+ Version version = context.version();
CassandraOnHeapGraph.writePqHeader(pqOutput.asSequentialWriter(), unitVectors, VectorCompression.CompressionType.PRODUCT_QUANTIZATION);
compressedVectors.write(pqOutput.asSequentialWriter(), JVECTOR_VERSION); // VSTODO old version until we add APQ
long pqLength = pqOutput.getFilePointer() - pqOffset;
@@ -412,7 +414,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
// (ending up at ONE_TO_MANY when the source sstables were not is unusual, but possible,
// if a row with null vector in sstable A gets updated with a vector in sstable B)
if (postingsStructure == Structure.ONE_TO_MANY
- && (!V5OnDiskFormat.writeV5VectorPostings() || useSyntheticOrdinals))
+ && (!V5OnDiskFormat.writeV5VectorPostings(version) || useSyntheticOrdinals))
{
postingsStructure = Structure.ZERO_OR_ONE_TO_MANY;
}
@@ -422,7 +424,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
ordinalMapper.set(rp.ordinalMapper);
try (var view = index.getView())
{
- if (V5OnDiskFormat.writeV5VectorPostings())
+ if (V5OnDiskFormat.writeV5VectorPostings(version))
{
return new V5VectorPostingsWriter(rp).writePostings(postingsOutput.asSequentialWriter(), view, postingsMap);
}
diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
index b66f5275c7c2..5c8ee5d2c7be 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java
@@ -97,6 +97,7 @@ public class TrieMemoryIndex extends MemoryIndex
private ByteBuffer minTerm;
private ByteBuffer maxTerm;
+ private final Version version;
private static final FastThreadLocal lastQueueSize = new FastThreadLocal()
{
@@ -115,12 +116,13 @@ public TrieMemoryIndex(IndexContext indexContext)
public TrieMemoryIndex(IndexContext indexContext, Memtable memtable, AbstractBounds keyBounds)
{
super(indexContext);
+ this.version = indexContext.version();
this.keyBounds = keyBounds;
this.primaryKeysHeapAllocations = new LongAdder();
this.primaryKeysAccumulator = new PrimaryKeysAccumulator(primaryKeysHeapAllocations);
this.primaryKeysRemover = new PrimaryKeysRemover(primaryKeysHeapAllocations);
this.analyzerTransformsValue = indexContext.getAnalyzerFactory().create().transformValue();
- this.data = InMemoryTrie.longLived(TypeUtil.byteComparableVersionForTermsData(), TrieMemtable.BUFFER_TYPE, indexContext.columnFamilyStore().readOrdering());
+ this.data = InMemoryTrie.longLived(TypeUtil.byteComparableVersionForTermsData(indexContext.version()), TrieMemtable.BUFFER_TYPE, indexContext.columnFamilyStore().readOrdering());
this.memtable = memtable;
}
@@ -354,12 +356,12 @@ private KeyRangeIterator rangeMatch(Expression expression, AbstractBounds {
// Before version DB, we encoded composite types using a non order-preserving function. In order to
// perform a range query on a map, we use the bounds to get all entries for a given map key and then
// only keep the map entries that satisfy the expression.
- assert entry.getKey().encodingVersion() == TypeUtil.BYTE_COMPARABLE_VERSION || Version.current() == Version.AA;
+ assert entry.getKey().encodingVersion() == TypeUtil.BYTE_COMPARABLE_VERSION || version == Version.AA;
byte[] key = ByteSourceInverse.readBytes(entry.getKey().getPreencodedBytes());
if (expression.isSatisfiedBy(ByteBuffer.wrap(key)))
mergingIteratorBuilder.add(entry.getValue());
@@ -420,7 +422,7 @@ private long estimateNumRowsMatchingRange(Expression expression)
return 0;
AbstractType> termType = indexContext.getValidator();
- ByteBuffer endTerm = expression.upper != null && TypeUtil.compare(expression.upper.value.encoded, maxTerm, termType, Version.current()) < 0
+ ByteBuffer endTerm = expression.upper != null && TypeUtil.compare(expression.upper.value.encoded, maxTerm, termType, version) < 0
? expression.upper.value.encoded
: maxTerm;
@@ -472,7 +474,7 @@ private long estimateNumRowsMatchingRange(Expression expression)
*/
private BigDecimal toBigDecimal(ByteBuffer endTerm)
{
- ByteComparable bc = Version.current().onDiskFormat().encodeForTrie(endTerm, indexContext.getValidator());
+ ByteComparable bc = version.onDiskFormat().encodeForTrie(endTerm, indexContext.getValidator());
return toBigDecimal(bc);
}
@@ -484,7 +486,7 @@ private BigDecimal toBigDecimal(ByteBuffer endTerm)
private BigDecimal toBigDecimal(ByteComparable term)
{
AbstractType> type = indexContext.getValidator();
- return TermsDistribution.toBigDecimal(term, type, Version.current(), TypeUtil.BYTE_COMPARABLE_VERSION);
+ return TermsDistribution.toBigDecimal(term, type, version, TypeUtil.BYTE_COMPARABLE_VERSION);
}
private Trie getSubtrie(@Nullable Expression expression)
@@ -496,7 +498,7 @@ private Trie getSubtrie(@Nullable Expression expression)
boolean lowerInclusive, upperInclusive;
if (expression.lower != null)
{
- lowerBound = expression.getEncodedLowerBoundByteComparable(Version.current());
+ lowerBound = expression.getEncodedLowerBoundByteComparable(version);
lowerInclusive = expression.lower.inclusive;
}
else
@@ -507,7 +509,7 @@ private Trie getSubtrie(@Nullable Expression expression)
if (expression.upper != null)
{
- upperBound = expression.getEncodedUpperBoundByteComparable(Version.current());
+ upperBound = expression.getEncodedUpperBoundByteComparable(version);
upperInclusive = expression.upper.inclusive;
}
else
@@ -537,13 +539,13 @@ private void setMinMaxTerm(ByteBuffer term)
// An alternative solution could use the trie to find the min/max term, but the trie has ByteComparable
// objects, not the ByteBuffer, and we would need to implement a custom decoder to undo the encodeForTrie
// mapping.
- minTerm = TypeUtil.min(term, minTerm, indexContext.getValidator(), Version.current());
- maxTerm = TypeUtil.max(term, maxTerm, indexContext.getValidator(), Version.current());
+ minTerm = TypeUtil.min(term, minTerm, indexContext.getValidator(), version);
+ maxTerm = TypeUtil.max(term, maxTerm, indexContext.getValidator(), version);
}
private ByteComparable asByteComparable(ByteBuffer input)
{
- return Version.current().onDiskFormat().encodeForTrie(input, indexContext.getValidator());
+ return version.onDiskFormat().encodeForTrie(input, indexContext.getValidator());
}
/**
diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java
index a47dae9cb345..fe4a7189be57 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemtableIndex.java
@@ -89,6 +89,7 @@ public class TrieMemtableIndex implements MemtableIndex
private final Memtable memtable;
private final Context sensorContext;
private final RequestTracker requestTracker;
+ protected final Version version;
public TrieMemtableIndex(IndexContext indexContext, Memtable memtable)
{
@@ -109,6 +110,7 @@ public TrieMemtableIndex(IndexContext indexContext, Memtable memtable, int shard
}
this.sensorContext = Context.from(indexContext);
this.requestTracker = RequestTracker.instance;
+ this.version = indexContext.version();
}
@Override
@@ -160,7 +162,7 @@ public ByteBuffer getMinTerm()
return Arrays.stream(rangeIndexes)
.map(MemoryIndex::getMinTerm)
.filter(Objects::nonNull)
- .reduce((a, b) -> TypeUtil.min(a, b, validator, Version.current()))
+ .reduce((a, b) -> TypeUtil.min(a, b, validator, version))
.orElse(null);
}
@@ -177,7 +179,7 @@ public ByteBuffer getMaxTerm()
return Arrays.stream(rangeIndexes)
.map(MemoryIndex::getMaxTerm)
.filter(Objects::nonNull)
- .reduce((a, b) -> TypeUtil.max(a, b, validator, Version.current()))
+ .reduce((a, b) -> TypeUtil.max(a, b, validator, version))
.orElse(null);
}
@@ -463,7 +465,7 @@ private org.apache.cassandra.db.rows.Cell> getCellForKey(PrimaryKey key)
private ByteComparable encode(ByteBuffer input)
{
- return Version.current().onDiskFormat().encodeForTrie(input, indexContext.getValidator());
+ return version.onDiskFormat().encodeForTrie(input, indexContext.getValidator());
}
/**
diff --git a/src/java/org/apache/cassandra/index/sai/plan/Plan.java b/src/java/org/apache/cassandra/index/sai/plan/Plan.java
index 1346be04c269..e03caa7f9840 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/Plan.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/Plan.java
@@ -1270,9 +1270,9 @@ private KeysIterationCost estimateAnnSortCost()
double expectedKeys = access.expectedAccessCount(source.expectedKeys());
int expectedKeysInt = max(1, (int) Math.ceil(expectedKeys));
int expectedSourceKeysInt = max(1, (int) Math.ceil(source.expectedKeys()));
- double initCost = ANN_SORT_OPEN_COST * factory.tableMetrics.sstables
+ double initCost = annSortOpenCost(factory.keyspace) * factory.tableMetrics.sstables
+ source.fullCost()
- + source.expectedKeys() * CostCoefficients.ANN_SORT_KEY_COST;
+ + source.expectedKeys() * CostCoefficients.annSortKeyCost(factory.keyspace);
double searchCost = factory.costEstimator.estimateAnnSearchCost(ordering,
expectedKeysInt,
expectedSourceKeysInt);
@@ -1698,6 +1698,8 @@ protected String title()
@NotThreadSafe
public static final class Factory
{
+ public final String keyspace;
+
/** Table metrics that affect cost estimates, e.g. row count, sstable count etc */
public final TableMetrics tableMetrics;
@@ -1719,8 +1721,9 @@ public static final class Factory
* Creates a factory that produces Plan nodes.
* @param tableMetrics allows the planner to adapt the cost estimates to the actual amount of data stored in the table
*/
- public Factory(TableMetrics tableMetrics, CostEstimator costEstimator)
+ public Factory(String keyspace, TableMetrics tableMetrics, CostEstimator costEstimator)
{
+ this.keyspace = keyspace;
this.tableMetrics = tableMetrics;
this.costEstimator = costEstimator;
this.nothing = new Nothing(-1, this);
@@ -1985,14 +1988,6 @@ public static class CostCoefficients
/** Cost to advance the index iterator to the next key and load the key. Common for literal and numeric indexes. */
public final static double SAI_KEY_COST = 0.1;
- /** Cost to begin processing PKs into index ordinals for estimateAnnSortCost */
- // DC introduced the one-to-many ordinal mapping optimization
- public final static double ANN_SORT_OPEN_COST = Version.current().onOrAfter(Version.DC) ? 370 : 4200;
-
- /** Additional overhead needed to process each input key fed to the ANN index searcher */
- // DC introduced the one-to-many ordinal mapping optimization
- public final static double ANN_SORT_KEY_COST = Version.current().onOrAfter(Version.DC) ? 0.03 : 0.2;
-
/** Cost to get a scored key from DiskANN (~rerank cost). Affected by cache hit rate */
public final static double ANN_SCORED_KEY_COST = 15;
@@ -2013,6 +2008,21 @@ public static class CostCoefficients
/** Cost to perform BM25 scoring, per query term */
public final static double BM25_SCORE_COST = 0.5;
+
+ /** Cost to begin processing PKs into index ordinals for estimateAnnSortCost */
+ // DC introduced the one-to-many ordinal mapping optimization
+ public static double annSortOpenCost(String keyspace)
+ {
+ return Version.current(keyspace).onOrAfter(Version.DC) ? 370 : 4200;
+ }
+
+ /** Additional overhead needed to process each input key fed to the ANN index searcher */
+ // DC introduced the one-to-many ordinal mapping optimization
+ public static double annSortKeyCost(String keyspace)
+ {
+ return Version.current(keyspace).onOrAfter(Version.DC) ? 0.03 : 0.2;
+ }
+
}
/** Convenience builder for building intersection and union nodes */
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
index b489d0ecd356..bb1aef495469 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
@@ -185,7 +185,7 @@ public QueryController(ColumnFamilyStore cfs,
avgCellsPerRow(),
avgRowSizeInBytes(),
cfs.getLiveSSTables().size());
- this.planFactory = new Plan.Factory(tableMetrics, this);
+ this.planFactory = new Plan.Factory(cfs.metadata.keyspace, tableMetrics, this);
}
public PrimaryKey.Factory primaryKeyFactory()
@@ -605,7 +605,7 @@ public CloseableIterator getTopKRows(Expression predicate
{
// Only the disk format limits the features of the index, but we also fail for in memory indexes because they
// will fail when flushed.
- if (orderer.isBM25() && !Version.current().onOrAfter(Version.BM25_EARLIEST))
+ if (orderer.isBM25() && !orderer.context.version().onOrAfter(Version.BM25_EARLIEST))
{
throw new FeatureNeedsIndexRebuildException(String.format(INDEX_VERSION_DOES_NOT_SUPPORT_BM25,
orderer.context.getIndexName()));
diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java
index 39a66cbad9de..dcafa4c942dc 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java
@@ -36,6 +36,7 @@
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.metrics.TableQueryMetrics;
public class StorageAttachedIndexQueryPlan implements Index.QueryPlan
@@ -79,7 +80,8 @@ public static StorageAttachedIndexQueryPlan create(ColumnFamilyStore cfs,
return null;
// collect the features of the selected indexes
- IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator();
+ Version version = Version.current(cfs.keyspace.getName());
+ IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator(version);
for (StorageAttachedIndex index : selectedIndexes)
accumulator.accumulate(index.getIndexContext().indexFeatureSet());
diff --git a/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java b/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java
index 86e0ca7b4cba..995f5a94de61 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java
@@ -35,6 +35,7 @@
import io.github.jbellis.jvector.disk.BufferedRandomAccessWriter;
import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexInputReader;
import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter;
import org.apache.cassandra.io.compress.BufferType;
@@ -77,11 +78,11 @@ protected IndexFileUtils(SequentialWriterOption writerOption)
this.writerOption = writerOption;
}
- public IndexOutputWriter openOutput(File file, ByteOrder order, boolean append) throws IOException
+ public IndexOutputWriter openOutput(File file, ByteOrder order, boolean append, Version version) throws IOException
{
assert writerOption.finishOnClose() : "IndexOutputWriter relies on close() to sync with disk.";
var checksumWriter = new IncrementalChecksumSequentialWriter(file, writerOption, append);
- return new IndexOutputWriter(checksumWriter, order);
+ return new IndexOutputWriter(checksumWriter, order, version);
}
public BufferedRandomAccessWriter openRandomAccessOutput(File file, boolean append) throws IOException
diff --git a/src/java/org/apache/cassandra/index/sai/utils/SAICodecUtils.java b/src/java/org/apache/cassandra/index/sai/utils/SAICodecUtils.java
index 93ff91702479..68a99217b335 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/SAICodecUtils.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/SAICodecUtils.java
@@ -55,10 +55,15 @@ public void write(int b) throws IOException
return new OutputStreamDataOutput(os);
}
- public static void writeHeader(DataOutput out) throws IOException
+ public static void writeHeader(org.apache.cassandra.index.sai.disk.io.IndexOutput out) throws IOException
+ {
+ writeHeader(out, out.version());
+ }
+
+ public static void writeHeader(DataOutput out, Version version) throws IOException
{
writeBEInt(out, CODEC_MAGIC);
- out.writeString(Version.current().toString());
+ out.writeString(version.toString());
}
public static int headerSize() {
diff --git a/src/java/org/apache/cassandra/index/sai/utils/TypeUtil.java b/src/java/org/apache/cassandra/index/sai/utils/TypeUtil.java
index 54babc48a68d..0bc24f905647 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/TypeUtil.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/TypeUtil.java
@@ -646,8 +646,8 @@ public static ByteBuffer decodeDecimal(ByteBuffer value)
return DecimalType.instance.fromComparableBytes(peekableValue, BYTE_COMPARABLE_VERSION);
}
- public static ByteComparable.Version byteComparableVersionForTermsData()
+ public static ByteComparable.Version byteComparableVersionForTermsData(Version version)
{
- return Version.current().byteComparableVersionFor(IndexComponentType.TERMS_DATA, SSTableFormat.Type.current().info.getLatestVersion());
+ return version.byteComparableVersionFor(IndexComponentType.TERMS_DATA, SSTableFormat.Type.current().info.getLatestVersion());
}
}
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 4a4185de5baf..39727fd8b046 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -118,11 +118,12 @@ public static IndexMetadata fromIndexTargets(List targets,
* Characters other than alphanumeric and underscore are removed.
* Long index names are truncated to fit the length allowing constructing filenames.
*
+ * @param keyspace the keyspace name
* @param table the table name
* @param column the column identifier. Can be null if the index is not column specific.
* @return the generated index name
*/
- public static String generateDefaultIndexName(String table, @Nullable ColumnIdentifier column)
+ public static String generateDefaultIndexName(String keyspace, String table, @Nullable ColumnIdentifier column)
{
String indexNameUncleaned = table;
if (column != null)
@@ -130,7 +131,7 @@ public static String generateDefaultIndexName(String table, @Nullable ColumnIden
String indexNameUntrimmed = PATTERN_NON_WORD_CHAR.matcher(indexNameUncleaned).replaceAll("");
String indexNameTrimmed = indexNameUntrimmed
.substring(0,
- Math.min(calculateGeneratedIndexNameMaxLength(),
+ Math.min(calculateGeneratedIndexNameMaxLength(keyspace),
indexNameUntrimmed.length()));
return indexNameTrimmed + INDEX_POSTFIX;
}
@@ -143,14 +144,14 @@ public static String generateDefaultIndexName(String table, @Nullable ColumnIden
*
* @return the allowed length of the generated index name
*/
- private static int calculateGeneratedIndexNameMaxLength()
+ private static int calculateGeneratedIndexNameMaxLength(String keyspace)
{
// Speculative assumption that uniqueness breaker will fit into 999.
// The value is used for trimming the index name if needed.
// Introducing validation of index name length is TODO for CNDB-13198.
int uniquenessSuffixLength = 4;
int indexNameAddition = uniquenessSuffixLength + INDEX_POSTFIX.length();
- int allowedIndexNameLength = Version.calculateIndexNameAllowedLength();
+ int allowedIndexNameLength = Version.calculateIndexNameAllowedLength(keyspace);
assert allowedIndexNameLength >= indexNameAddition : "cannot happen with current implementation as allowedIndexNameLength is approximately 255 - ~76. However, allowedIndexNameLength was " + allowedIndexNameLength + " and indexNameAddition was " + indexNameAddition;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
index 4ea81708a34a..251d03245aba 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
@@ -60,7 +60,7 @@ public class IndexAvailabilityTest extends TestBaseImpl
private static final String CREATE_TABLE = "CREATE TABLE %s.%s (pk text primary key, v1 int, v2 text) " +
"WITH compaction = {'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }";
private static final String CREATE_INDEX = "CREATE CUSTOM INDEX %s ON %s.%s(%s) USING 'StorageAttachedIndex'";
-
+
private static final Map expectedNodeIndexQueryability = new ConcurrentHashMap<>();
private List keyspaces;
private List indexesPerKs;
@@ -271,7 +271,7 @@ private void shouldSkipNonQueryableNode(int nodes, List... nonQueryable
// get index name base on node id to have different non-queryable index on different nodes.
Function nodeIdToColumn = nodeId -> "v" + (nodeId % 2 + 1);
- IntFunction nodeIdToIndex = nodeId -> IndexMetadata.generateDefaultIndexName(table, ColumnIdentifier.getInterned(nodeIdToColumn.apply(nodeId), false));
+ IntFunction nodeIdToIndex = nodeId -> IndexMetadata.generateDefaultIndexName(KEYSPACE, table, ColumnIdentifier.getInterned(nodeIdToColumn.apply(nodeId), false));
for (List nonQueryableNodes : nonQueryableNodesList)
{
@@ -428,7 +428,7 @@ private static Index.Status getNodeIndexStatus(IInvokableInstance node, String k
{
return Index.Status.values()[node.callsOnInstance(() -> getIndexStatus(keyspaceName, indexName, replica).ordinal()).call()];
}
-
+
private static Index.Status getIndexStatus(String keyspaceName, String indexName, InetAddressAndPort replica)
{
KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName);
@@ -440,7 +440,7 @@ private static Index.Status getIndexStatus(String keyspaceName, String indexName
return Index.Status.UNKNOWN;
SecondaryIndexManager indexManager = Keyspace.openAndGetStore(table).indexManager;
-
+
return indexManager.getIndexStatus(replica, keyspaceName, indexName);
}
@@ -450,7 +450,7 @@ private static InetAddressAndPort getFullAddress(IInvokableInstance node)
int port = node.callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().port);
return InetAddressAndPort.getByAddressOverrideDefaults(address, port);
}
-
+
private static class NodeIndex
{
private final String keyspace;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/NativeIndexDDLTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/NativeIndexDDLTest.java
index da93d5a787e7..e99e350ff81d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/sai/NativeIndexDDLTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/sai/NativeIndexDDLTest.java
@@ -253,7 +253,7 @@ protected long getIndexedCellCount(int node, String table, String column) throws
{
ColumnIdentifier columnID = ColumnIdentifier.getInterned(column, true);
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
- String indexName = IndexMetadata.generateDefaultIndexName(table, columnID);
+ String indexName = IndexMetadata.generateDefaultIndexName(KEYSPACE, table, columnID);
StorageAttachedIndex index = (StorageAttachedIndex) cfs.indexManager.getIndexByName(indexName);
return index.getIndexContext().getCellCount();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/features/FeaturesVersionSupportTester.java b/test/distributed/org/apache/cassandra/distributed/test/sai/features/FeaturesVersionSupportTester.java
index 30ceded6c459..a034d24ab65e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/sai/features/FeaturesVersionSupportTester.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/sai/features/FeaturesVersionSupportTester.java
@@ -17,13 +17,13 @@
package org.apache.cassandra.distributed.test.sai.features;
import java.io.IOException;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.junit.AfterClass;
import org.junit.Test;
-import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
-import net.bytebuddy.implementation.FixedValue;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -33,7 +33,6 @@
import org.apache.cassandra.index.sai.disk.format.Version;
import org.assertj.core.api.Assertions;
-import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@ -43,10 +42,15 @@
* Tests the availabilty of features in different versions of the SAI on-disk format in a multinode, multiversion cluster.
*
* This is done with a 2-node cluster where the first node has the newer version of the SAI on-disk format and the
- * second node has the older version being tested.
+ * second node has the older version being tested configured as the default current version.
+ *
+ * Additionally, there will be a keyspace per version configured to use that version on the second node instead of the
+ * default one.
*/
public abstract class FeaturesVersionSupportTester extends TestBaseImpl
{
+ private static final Map VERSIONS_PER_KEYSPACE = Version.ALL.stream().collect(Collectors.toMap(v -> "ks_" + v.toString(), v -> v));
+
private static Version version;
private static Cluster cluster;
@@ -54,10 +58,20 @@ public abstract class FeaturesVersionSupportTester extends TestBaseImpl
protected static void initCluster(Version version) throws IOException
{
FeaturesVersionSupportTester.version = version;
+
cluster = init(Cluster.build(2)
- .withInstanceInitializer((cl, n) -> BB.install(cl, n, version))
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), 1);
+
+ // The first node, which will be the coordinator in all tests, uses the latest version for all keyspaces
+ cluster.get(1).runOnInstance(() -> org.apache.cassandra.index.sai.SAIUtil.setCurrentVersion(Version.LATEST));
+
+ // The second node will use the tested version as the default, and specific versions for known keyspaces
+ String versionString = version.toString();
+ cluster.get(2).runOnInstance(() -> org.apache.cassandra.index.sai.SAIUtil.setCurrentVersion(Version.parse(versionString), VERSIONS_PER_KEYSPACE));
+
+ for (String keyspace : VERSIONS_PER_KEYSPACE.keySet())
+ cluster.schemaChange("CREATE KEYSPACE " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
}
@AfterClass
@@ -73,26 +87,31 @@ public static void cleanup()
@Test
public void testANN()
{
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.ann (k int PRIMARY KEY, v vector)"));
+ test(this::testANN);
+ }
+
+ private void testANN(String keyspace, Version version)
+ {
+ cluster.schemaChange("CREATE TABLE " + keyspace + ".ann (k int PRIMARY KEY, v vector)");
ICoordinator coordinator = cluster.coordinator(1);
- coordinator.execute(withKeyspace("INSERT INTO %s.ann (k, v) VALUES (0, [1.0, 2.0])"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.ann (k, v) VALUES (1, [2.0, 3.0])"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.ann (k, v) VALUES (2, [3.0, 4.0])"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.ann (k, v) VALUES (3, [4.0, 5.0])"), ALL);
- cluster.forEach(node -> node.flush(KEYSPACE));
+ coordinator.execute("INSERT INTO " + keyspace + ".ann (k, v) VALUES (0, [1.0, 2.0])", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".ann (k, v) VALUES (1, [2.0, 3.0])", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".ann (k, v) VALUES (2, [3.0, 4.0])", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".ann (k, v) VALUES (3, [4.0, 5.0])", ALL);
+ cluster.forEach(node -> node.flush(keyspace));
- String createIndexQuery = withKeyspace("CREATE CUSTOM INDEX ann_idx ON %s.ann(v) USING 'StorageAttachedIndex'" +
- " WITH OPTIONS = {'similarity_function' : 'euclidean'}");
- String annSelectQuery = withKeyspace("SELECT k FROM %s.ann ORDER BY v ANN OF [2.5, 3.5] LIMIT 3");
- String geoSelectQuery = withKeyspace("SELECT k FROM %s.ann WHERE GEO_DISTANCE(v, [2.5, 3.5]) < 157000");
+ String createIndexQuery = "CREATE CUSTOM INDEX ann_idx ON " + keyspace + ".ann(v) USING 'StorageAttachedIndex' " +
+ "WITH OPTIONS = {'similarity_function' : 'euclidean'}";
+ String annSelectQuery = "SELECT k FROM " + keyspace + ".ann ORDER BY v ANN OF [2.5, 3.5] LIMIT 3";
+ String geoSelectQuery = "SELECT k FROM " + keyspace + ".ann WHERE GEO_DISTANCE(v, [2.5, 3.5]) < 157000";
if (version.onOrAfter(Version.JVECTOR_EARLIEST))
{
cluster.schemaChange(createIndexQuery);
- SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, "ann_idx");
- Assertions.assertThat(coordinator.execute(withKeyspace(annSelectQuery), ONE)).hasNumberOfRows(3);
- Assertions.assertThat(coordinator.execute(withKeyspace(geoSelectQuery), ONE)).hasNumberOfRows(2);
+ SAIUtil.waitForIndexQueryable(cluster, keyspace, "ann_idx");
+ Assertions.assertThat(coordinator.execute(annSelectQuery, ONE)).hasNumberOfRows(3);
+ Assertions.assertThat(coordinator.execute(geoSelectQuery, ONE)).hasNumberOfRows(2);
}
else
{
@@ -101,7 +120,7 @@ public void testANN()
t.getClass().getName().contains(FeatureNeedsIndexRebuildException.class.getName()) &&
t.getMessage().contains("does not support vector indexes"));
cluster.schemaChange(createIndexQuery);
- SAIUtil.assertIndexBuildFailed(cluster.get(1), cluster.get(2), KEYSPACE, "ann_idx");
+ SAIUtil.assertIndexBuildFailed(cluster.get(1), cluster.get(2), keyspace, "ann_idx");
}
}
@@ -112,28 +131,33 @@ public void testANN()
@Test
public void testBM25()
{
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.bm25 (k int PRIMARY KEY, v text)"));
+ test(this::testBM25);
+ }
+
+ private void testBM25(String keyspace, Version version)
+ {
+ cluster.schemaChange("CREATE TABLE " + keyspace + ".bm25 (k int PRIMARY KEY, v text)");
ICoordinator coordinator = cluster.coordinator(1);
- coordinator.execute(withKeyspace("INSERT INTO %s.bm25 (k, v) VALUES (1, 'apple')"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.bm25 (k, v) VALUES (2, 'orange')"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.bm25 (k, v) VALUES (3, 'banana')"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.bm25 (k, v) VALUES (4, 'kiwi')"), ALL);
- cluster.forEach(node -> node.flush(KEYSPACE));
-
- cluster.schemaChange(withKeyspace("CREATE CUSTOM INDEX bm25_idx ON %s.bm25(v) " +
- "USING 'org.apache.cassandra.index.sai.StorageAttachedIndex' " +
- "WITH OPTIONS = {" +
- "'index_analyzer': '{" +
- "\"tokenizer\" : {\"name\" : \"standard\"}, " +
- "\"filters\" : [{\"name\" : \"porterstem\"}]}'}"));
- SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, "bm25_idx");
-
- String query = withKeyspace("SELECT k FROM %s.bm25 ORDER BY v BM25 OF 'apple' LIMIT 3");
+ coordinator.execute("INSERT INTO " + keyspace + ".bm25 (k, v) VALUES (1, 'apple')", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".bm25 (k, v) VALUES (2, 'orange')", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".bm25 (k, v) VALUES (3, 'banana')", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".bm25 (k, v) VALUES (4, 'kiwi')", ALL);
+ cluster.forEach(node -> node.flush(keyspace));
+
+ cluster.schemaChange("CREATE CUSTOM INDEX bm25_idx ON " + keyspace + ".bm25(v) " +
+ "USING 'org.apache.cassandra.index.sai.StorageAttachedIndex' " +
+ "WITH OPTIONS = {" +
+ "'index_analyzer': '{" +
+ "\"tokenizer\" : {\"name\" : \"standard\"}, " +
+ "\"filters\" : [{\"name\" : \"porterstem\"}]}'}");
+ SAIUtil.waitForIndexQueryable(cluster, keyspace, "bm25_idx");
+
+ String query = "SELECT k FROM " + keyspace + ".bm25 ORDER BY v BM25 OF 'apple' LIMIT 3";
if (version.onOrAfter(Version.BM25_EARLIEST))
{
- Assertions.assertThat(coordinator.execute(withKeyspace(query), ONE)).hasNumberOfRows(1);
+ Assertions.assertThat(coordinator.execute(query, ONE)).hasNumberOfRows(1);
}
else
{
@@ -148,19 +172,24 @@ public void testBM25()
@Test
public void testIndexAnalyzer()
{
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.analyzer (k int PRIMARY KEY, v text)"));
+ test(this::testIndexAnalyzer);
+ }
+
+ private void testIndexAnalyzer(String keyspace, Version version)
+ {
+ cluster.schemaChange("CREATE TABLE " + keyspace + ".analyzer (k int PRIMARY KEY, v text)");
ICoordinator coordinator = cluster.coordinator(1);
- coordinator.execute(withKeyspace("INSERT INTO %s.analyzer (k, v) VALUES (0, 'Quick fox')"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.analyzer (k, v) VALUES (1, 'Lazy dogs')"), ALL);
- cluster.forEach(node -> node.flush(KEYSPACE));
+ coordinator.execute("INSERT INTO " + keyspace + ".analyzer (k, v) VALUES (0, 'Quick fox')", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".analyzer (k, v) VALUES (1, 'Lazy dogs')", ALL);
+ cluster.forEach(node -> node.flush(keyspace));
- cluster.schemaChange(withKeyspace("CREATE CUSTOM INDEX analyzer_idx ON %s.analyzer(v)" +
- " USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'" +
- "WITH OPTIONS = { 'index_analyzer': 'standard' }"));
- SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, "analyzer_idx");
+ cluster.schemaChange("CREATE CUSTOM INDEX analyzer_idx ON " + keyspace + ".analyzer(v)" +
+ " USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'" +
+ "WITH OPTIONS = { 'index_analyzer': 'standard' }");
+ SAIUtil.waitForIndexQueryable(cluster, keyspace, "analyzer_idx");
- Assertions.assertThat(coordinator.execute(withKeyspace("SELECT * FROM %s.analyzer WHERE v = 'dogs'"), ONE))
+ Assertions.assertThat(coordinator.execute("SELECT * FROM " + keyspace + ".analyzer WHERE v = 'dogs'", ONE))
.hasNumberOfRows(1);
}
@@ -170,50 +199,42 @@ public void testIndexAnalyzer()
@Test
public void testQueryAnalyzer()
{
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.q_analyzer (k int PRIMARY KEY, v text)"));
+ test(this::testQueryAnalyzer);
+ }
+
+ private void testQueryAnalyzer(String keyspace, Version version)
+ {
+ cluster.schemaChange("CREATE TABLE " + keyspace + ".q_analyzer (k int PRIMARY KEY, v text)");
ICoordinator coordinator = cluster.coordinator(1);
- coordinator.execute(withKeyspace("INSERT INTO %s.q_analyzer (k, v) VALUES (1, 'astra quick fox')"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.q_analyzer (k, v) VALUES (2, 'astra2 quick fox')"), ALL);
- coordinator.execute(withKeyspace("INSERT INTO %s.q_analyzer (k, v) VALUES (3, 'astra3 quick foxes')"), ALL);
- cluster.forEach(node -> node.flush(KEYSPACE));
-
- cluster.schemaChange(withKeyspace("CREATE CUSTOM INDEX q_analyzer_idx ON %s.q_analyzer(v) USING 'StorageAttachedIndex' WITH OPTIONS = {" +
- "'index_analyzer': '{" +
- " \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
- " \"filters\" : [ { \"name\" : \"lowercase\", \"args\": {} }, " +
- " { \"name\" : \"edgengram\", \"args\": { \"minGramSize\":\"1\", \"maxGramSize\":\"30\" } }]," +
- " \"charFilters\" : []}', " +
- "'query_analyzer': '{" +
- " \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
- " \"filters\" : [ {\"name\" : \"lowercase\",\"args\": {}} ]}'}"));
- SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, "q_analyzer_idx");
-
- Assertions.assertThat(coordinator.execute(withKeyspace("SELECT k FROM %s.q_analyzer WHERE v : 'ast'"), ONE))
- .hasNumberOfRows(3);
- Assertions.assertThat(coordinator.execute(withKeyspace("SELECT k FROM %s.q_analyzer WHERE v : 'astra'"), ONE))
- .hasNumberOfRows(3);
- Assertions.assertThat(coordinator.execute(withKeyspace("SELECT k FROM %s.q_analyzer WHERE v : 'astra2'"), ONE))
- .hasNumberOfRows(1);
- Assertions.assertThat(coordinator.execute(withKeyspace("SELECT k FROM %s.q_analyzer WHERE v : 'fox'"), ONE))
- .hasNumberOfRows(3);
- Assertions.assertThat(coordinator.execute(withKeyspace("SELECT k FROM %s.q_analyzer WHERE v : 'foxes'"), ONE))
- .hasNumberOfRows(1);
+ coordinator.execute("INSERT INTO " + keyspace + ".q_analyzer (k, v) VALUES (1, 'astra quick fox')", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".q_analyzer (k, v) VALUES (2, 'astra2 quick fox')", ALL);
+ coordinator.execute("INSERT INTO " + keyspace + ".q_analyzer (k, v) VALUES (3, 'astra3 quick foxes')", ALL);
+ cluster.forEach(node -> node.flush(keyspace));
+
+ cluster.schemaChange("CREATE CUSTOM INDEX q_analyzer_idx ON " + keyspace + ".q_analyzer(v) " +
+ "USING 'StorageAttachedIndex' WITH OPTIONS = {" +
+ "'index_analyzer': '{" +
+ " \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
+ " \"filters\" : [ { \"name\" : \"lowercase\", \"args\": {} }, " +
+ " { \"name\" : \"edgengram\", \"args\": { \"minGramSize\":\"1\", \"maxGramSize\":\"30\" } }]," +
+ " \"charFilters\" : []}', " +
+ "'query_analyzer': '{" +
+ " \"tokenizer\" : { \"name\" : \"whitespace\", \"args\" : {} }," +
+ " \"filters\" : [ {\"name\" : \"lowercase\",\"args\": {}} ]}'}");
+ SAIUtil.waitForIndexQueryable(cluster, keyspace, "q_analyzer_idx");
+
+ String query = "SELECT k FROM " + keyspace + ".q_analyzer WHERE v : ";
+ Assertions.assertThat(coordinator.execute(query + "'ast'", ONE)).hasNumberOfRows(3);
+ Assertions.assertThat(coordinator.execute(query + "'astra'", ONE)).hasNumberOfRows(3);
+ Assertions.assertThat(coordinator.execute(query + "'astra2'", ONE)).hasNumberOfRows(1);
+ Assertions.assertThat(coordinator.execute(query + "'fox'", ONE)).hasNumberOfRows(3);
+ Assertions.assertThat(coordinator.execute(query + "'foxes'", ONE)).hasNumberOfRows(1);
}
- /**
- * Injection to set the SAI on-disk version on the first one to the most recent one, and the tested version to the second node.
- */
- public static class BB
+ private static void test(BiConsumer testMethod)
{
- @SuppressWarnings("resource")
- public static void install(ClassLoader classLoader, int node, Version version)
- {
- new ByteBuddy().rebase(Version.class)
- .method(named("currentVersionProperty"))
- .intercept(FixedValue.value(node == 1 ? Version.LATEST.toString() : version.toString()))
- .make()
- .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
- }
+ testMethod.accept(KEYSPACE, version);
+ VERSIONS_PER_KEYSPACE.forEach(testMethod);
}
}
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index bba0a52774f2..cc545cfbc403 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -887,7 +887,12 @@ protected void createAggregateOverload(String aggregateName, String argTypes, St
protected String createKeyspace(String query)
{
- String currentKeyspace = createKeyspaceName();
+ return createKeyspace(query, null);
+ }
+
+ protected String createKeyspace(String query, @Nullable String keyspaceName)
+ {
+ String currentKeyspace = createKeyspaceName(keyspaceName);
String fullQuery = String.format(query, currentKeyspace);
logger.info(fullQuery);
schemaChange(fullQuery);
@@ -910,7 +915,12 @@ protected void alterKeyspaceMayThrow(String query) throws Throwable
protected String createKeyspaceName()
{
- String currentKeyspace = String.format("keyspace_%02d", seqNumber.getAndIncrement());
+ return createKeyspaceName(null);
+ }
+
+ protected String createKeyspaceName(@Nullable String keyspaceName)
+ {
+ String currentKeyspace = keyspaceName == null ? String.format("keyspace_%02d", seqNumber.getAndIncrement()) : keyspaceName;
keyspaces.add(currentKeyspace);
return currentKeyspace;
}
@@ -1073,8 +1083,8 @@ protected static Pair getCreateIndexName(String keyspace, String
isQuotedGeneratedIndexName = ParseUtils.isQuoted(column, '\"');
String baseName = Strings.isNullOrEmpty(column)
- ? IndexMetadata.generateDefaultIndexName(table, null)
- : IndexMetadata.generateDefaultIndexName(table, new ColumnIdentifier(column, true));
+ ? IndexMetadata.generateDefaultIndexName(keyspace, table, null)
+ : IndexMetadata.generateDefaultIndexName(keyspace, table, new ColumnIdentifier(column, true));
KeyspaceMetadata ks = Schema.instance.getKeyspaceMetadata(keyspace);
assertNotNull(ks);
diff --git a/test/unit/org/apache/cassandra/index/IndexNameTest.java b/test/unit/org/apache/cassandra/index/IndexNameTest.java
index 6fefea75d5b3..b61a0b26fd8a 100644
--- a/test/unit/org/apache/cassandra/index/IndexNameTest.java
+++ b/test/unit/org/apache/cassandra/index/IndexNameTest.java
@@ -145,7 +145,7 @@ public void testTooLongNamesInternal() throws Throwable
@Test
public void testMaxAcceptableLongNamesNewIndex() throws Throwable
{
- assertEquals(182, Version.calculateIndexNameAllowedLength());
+ assertEquals(182, Version.calculateIndexNameAllowedLength(KEYSPACE));
String longName = "a".repeat(182);
createTable("CREATE TABLE %s (" +
"key int PRIMARY KEY," +
diff --git a/test/unit/org/apache/cassandra/index/sai/SAITester.java b/test/unit/org/apache/cassandra/index/sai/SAITester.java
index b831f034f060..045bf6905320 100644
--- a/test/unit/org/apache/cassandra/index/sai/SAITester.java
+++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java
@@ -44,6 +44,7 @@
import com.google.common.base.Predicates;
import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -139,8 +140,7 @@ protected static Injections.Counter.CounterBuilder addConditions(Injections.Coun
public static final ClusteringComparator EMPTY_COMPARATOR = new ClusteringComparator();
- public static final PrimaryKey.Factory TEST_FACTORY = Version.current().onDiskFormat().newPrimaryKeyFactory(EMPTY_COMPARATOR);
-
+ public static final PrimaryKey.Factory TEST_FACTORY = Version.current(KEYSPACE).onDiskFormat().newPrimaryKeyFactory(EMPTY_COMPARATOR);
static
{
@@ -286,9 +286,25 @@ public static IndexContext createIndexContext(String columnName, String indexNam
MockSchema.newCFS("test_ks"));
}
+ public Set getIndexContexts(String keyspace, String table)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ StorageAttachedIndexGroup group = StorageAttachedIndexGroup.getIndexGroup(cfs);
+ Assertions.assertThat(group).isNotNull();
+ return group.getIndexes()
+ .stream()
+ .map(StorageAttachedIndex::getIndexContext)
+ .collect(Collectors.toSet());
+ }
+
public IndexContext getIndexContext(String indexName)
{
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+ return getIndexContext(KEYSPACE, currentTable(), indexName);
+ }
+
+ public IndexContext getIndexContext(String keysapce, String table, String indexName)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(keysapce).getColumnFamilyStore(table);
return StorageAttachedIndexGroup.getIndexGroup(cfs)
.getIndexes()
.stream()
@@ -373,21 +389,26 @@ protected boolean verifyChecksum(IndexContext context)
return true;
}
- protected void verifySAIVersionInUse(Version expectedVersion, IndexContext... contexts)
+ protected void verifySAIVersionInUse(Version expectedVersion)
{
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+ verifySAIVersionInUse(expectedVersion, KEYSPACE, currentTable());
+ }
+
+ protected void verifySAIVersionInUse(Version expectedVersion, String keyspace, String table)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
StorageAttachedIndexGroup group = StorageAttachedIndexGroup.getIndexGroup(cfs);
for (SSTableReader sstable : cfs.getLiveSSTables())
{
IndexDescriptor indexDescriptor = loadDescriptor(sstable, cfs);
- assertEquals(indexDescriptor.perSSTableComponents().version(), expectedVersion);
+ assertEquals(expectedVersion, indexDescriptor.perSSTableComponents().version());
SSTableContext ssTableContext = group.sstableContextManager().getContext(sstable);
// This is to make sure the context uses the actual files we think
- assertEquals(ssTableContext.usedPerSSTableComponents().version(), expectedVersion);
+ assertEquals(expectedVersion, ssTableContext.usedPerSSTableComponents().version());
- for (IndexContext indexContext : contexts)
+ for (IndexContext indexContext : getIndexContexts(keyspace, table))
{
assertEquals(indexDescriptor.perIndexComponents(indexContext).version(), expectedVersion);
@@ -585,21 +606,23 @@ protected void verifyIndexFiles(IndexContext numericIndexContext,
{
Set indexFiles = indexFiles();
- for (IndexComponentType indexComponentType : Version.current().onDiskFormat().perSSTableComponentTypes())
+ for (IndexComponentType indexComponentType : Version.current(KEYSPACE).onDiskFormat().perSSTableComponentTypes())
{
- Set tableFiles = componentFiles(indexFiles, new Component(Component.Type.CUSTOM, Version.current().fileNameFormatter().format(indexComponentType, (String)null, 0)));
+ String name = Version.current(KEYSPACE).fileNameFormatter().format(indexComponentType, (String)null, 0);
+ Component component = new Component(Component.Type.CUSTOM, name);
+ Set tableFiles = componentFiles(indexFiles, component);
assertEquals(tableFiles.toString(), perSSTableFiles, tableFiles.size());
}
if (literalIndexContext != null)
{
- for (IndexComponentType indexComponentType : Version.current().onDiskFormat().perIndexComponentTypes(literalIndexContext))
+ for (IndexComponentType indexComponentType : Version.current(KEYSPACE).onDiskFormat().perIndexComponentTypes(literalIndexContext))
{
Set stringIndexFiles = componentFiles(indexFiles,
new Component(Component.Type.CUSTOM,
- Version.current().fileNameFormatter().format(indexComponentType,
- literalIndexContext,
- 0)));
+ Version.current(KEYSPACE)
+ .fileNameFormatter()
+ .format(indexComponentType, literalIndexContext, 0)));
if (isBuildCompletionMarker(indexComponentType))
assertEquals(literalCompletionMarkers, stringIndexFiles.size());
else
@@ -609,13 +632,13 @@ protected void verifyIndexFiles(IndexContext numericIndexContext,
if (numericIndexContext != null)
{
- for (IndexComponentType indexComponentType : Version.current().onDiskFormat().perIndexComponentTypes(numericIndexContext))
+ for (IndexComponentType indexComponentType : Version.current(KEYSPACE).onDiskFormat().perIndexComponentTypes(numericIndexContext))
{
Set numericIndexFiles = componentFiles(indexFiles,
new Component(Component.Type.CUSTOM,
- Version.current().fileNameFormatter().format(indexComponentType,
- numericIndexContext,
- 0)));
+ Version.current(KEYSPACE)
+ .fileNameFormatter()
+ .format(indexComponentType, numericIndexContext, 0)));
if (isBuildCompletionMarker(indexComponentType))
assertEquals(numericCompletionMarkers, numericIndexFiles.size());
else
@@ -724,6 +747,22 @@ protected void truncate(boolean snapshot)
cfs.truncateBlockingWithoutSnapshot();
}
+ protected Set listIndexNames(ColumnFamilyStore cfs)
+ {
+ return cfs.indexManager.listIndexes().stream().map(i -> i.getIndexMetadata().name).collect(Collectors.toSet());
+ }
+
+ protected void rebuildTableIndexes()
+ {
+ rebuildTableIndexes(KEYSPACE, currentTable());
+ }
+
+ protected void rebuildTableIndexes(String keyspace, String table)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ cfs.indexManager.rebuildIndexesBlocking(listIndexNames(cfs));
+ }
+
protected void rebuildIndexes(String... indexes)
{
ColumnFamilyStore.rebuildSecondaryIndex(KEYSPACE, currentTable(), indexes);
@@ -735,11 +774,16 @@ protected void reloadSSTableIndex()
StorageAttachedIndexGroup.getIndexGroup(cfs).unsafeReload();
}
+ protected void reloadSSTableIndexInPlace()
+ {
+ reloadSSTableIndexInPlace(KEYSPACE, currentTable());
+ }
+
// `reloadSSTalbleIndex` calls `unsafeReload`, which clear all contexts, and then recreate from scratch. This method
// simply signal updates to every sstable without previously clearing anything.
- protected void reloadSSTableIndexInPlace()
+ protected void reloadSSTableIndexInPlace(String keyspace, String table)
{
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
StorageAttachedIndexGroup group = StorageAttachedIndexGroup.getIndexGroup(cfs);
group.onSSTableChanged(Collections.emptySet(), cfs.getLiveSSTables(), group.getIndexes(), true);
}
@@ -873,7 +917,7 @@ protected Set componentFiles(Collection indexFiles, Component compon
protected Set componentFiles(Collection indexFiles, IndexComponentType indexComponentType, IndexContext indexContext)
{
- String componentName = Version.current().fileNameFormatter().format(indexComponentType, indexContext, 0);
+ String componentName = Version.current(KEYSPACE).fileNameFormatter().format(indexComponentType, indexContext, 0);
return indexFiles.stream().filter(c -> c.name().endsWith(componentName)).collect(Collectors.toSet());
}
diff --git a/test/unit/org/apache/cassandra/index/sai/SAIUtil.java b/test/unit/org/apache/cassandra/index/sai/SAIUtil.java
index f3549cd02be8..0c01cda59380 100644
--- a/test/unit/org/apache/cassandra/index/sai/SAIUtil.java
+++ b/test/unit/org/apache/cassandra/index/sai/SAIUtil.java
@@ -19,26 +19,99 @@
package org.apache.cassandra.index.sai;
import java.lang.reflect.Field;
+import java.util.Map;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.index.sai.disk.format.Version;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.utils.ReflectionUtils;
public class SAIUtil
{
+ public static Version currentVersion()
+ {
+ return Version.current(CQLTester.KEYSPACE);
+ }
+
+ public static void resetCurrentVersion()
+ {
+ setCurrentVersion(Version.Selector.DEFAULT);
+ }
+
public static void setCurrentVersion(Version version)
{
- Field current = null;
+ setCurrentVersion(version, Map.of());
+ }
+
+ public static void setCurrentVersion(Version defaultVersion, Map versionsPerKeyspace)
+ {
+ setCurrentVersion(new CustomVersionSelector(defaultVersion, versionsPerKeyspace));
+ }
+
+ public static void setCurrentVersion(Version.Selector versionSelector)
+ {
try
{
- current = Version.class.getDeclaredField("CURRENT");
- current.setAccessible(true);
+ // set the current version
+ Field field = Version.class.getDeclaredField("SELECTOR");
+ field.setAccessible(true);
Field modifiersField = ReflectionUtils.getField(Field.class, "modifiers");
modifiersField.setAccessible(true);
- current.set(null, version);
+ field.set(null, versionSelector);
+
+ // update the index contexts for each keyspace
+ for (String keyspaceName : Schema.instance.getKeyspaces())
+ {
+ Keyspace keyspace = Keyspace.open(keyspaceName);
+ Version version = versionSelector.select(keyspaceName);
+
+ for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+ {
+ SecondaryIndexManager sim = cfs.getIndexManager();
+ for (Index index : sim.listIndexes())
+ {
+ if (index instanceof StorageAttachedIndex)
+ {
+ StorageAttachedIndex sai = (StorageAttachedIndex)index;
+ IndexContext context = sai.getIndexContext();
+
+ field = IndexContext.class.getDeclaredField("version");
+ field.setAccessible(true);
+ field.set(context, version);
+
+ field = IndexContext.class.getDeclaredField("primaryKeyFactory");
+ field.setAccessible(true);
+ field.set(context, version.onDiskFormat().newPrimaryKeyFactory(cfs.metadata().comparator));
+ }
+ }
+ }
+ }
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
+
+ public static class CustomVersionSelector implements Version.Selector
+ {
+ private final Version defaultVersion;
+ private final Map versionsPerKeyspace;
+
+ public CustomVersionSelector(Version defaultVersion, Map versionsPerKeyspace)
+ {
+ this.defaultVersion = defaultVersion;
+ this.versionsPerKeyspace = versionsPerKeyspace;
+ }
+
+ @Override
+ public Version select(String keyspace)
+ {
+ return versionsPerKeyspace.getOrDefault(keyspace, defaultVersion);
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/index/sai/SAIUtilTest.java b/test/unit/org/apache/cassandra/index/sai/SAIUtilTest.java
new file mode 100644
index 000000000000..a9556818b00e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sai/SAIUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.index.sai.disk.format.Version;
+import org.assertj.core.api.Assertions;
+
+@RunWith(Parameterized.class)
+public class SAIUtilTest
+{
+ @Parameterized.Parameter
+ public Version currentVersion;
+
+ @Parameterized.Parameter(1)
+ public Version keyspaceVersion;
+
+ private String keyspaceWithVersion;
+
+ @Parameterized.Parameters(name = "currentVersion={0} keyspaceVersion={1}")
+ public static Collection