Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ public enum CassandraRelevantProperties
/** The current version of the SAI on-disk index format. */
SAI_CURRENT_VERSION("cassandra.sai.latest.version", "dc"),

/** The class to use for selecting the current version of the SAI on-disk index format on a per-keyspace basis. */
SAI_VERSION_SELECTOR_CLASS("cassandra.sai.version.selector.class", ""),

/** Controls the maximum top-k limit for vector search */
SAI_VECTOR_SEARCH_MAX_TOP_K("cassandra.sai.vector_search.max_top_k", "1000"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,10 @@ private void validateIndexTarget(TableMetadata table, IndexMetadata.Kind kind, I

private String generateIndexName(KeyspaceMetadata keyspace, List<IndexTarget> targets)
{
assert keyspace.name.equals(keyspaceName);
String baseName = targets.size() == 1
? IndexMetadata.generateDefaultIndexName(tableName, targets.get(0).column)
: IndexMetadata.generateDefaultIndexName(tableName, null);
? IndexMetadata.generateDefaultIndexName(keyspaceName, tableName, targets.get(0).column)
: IndexMetadata.generateDefaultIndexName(keyspaceName, tableName, null);
return keyspace.findAvailableIndexName(baseName);
}

Expand Down
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -843,8 +843,9 @@ default void unload() { }
* Returns the set of sstable-attached components that this group will create for a newly flushed sstable.
*
* Note that the result of this method is only valid for newly flushed/written sstables as the components
* returned will assume a version of {@link Version#current()} and a generation of 0. SSTables for which some
* index have been rebuild may have index components that do not match what this method return in particular.
* returned will assume a version of {@link Version#current(String)} and a generation of 0. SSTables for which
* some index have been rebuild may have index components that do not match what this method return in
* particular.
*/
Set<Component> componentsForNewSSTable();

Expand Down
15 changes: 11 additions & 4 deletions src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public class IndexContext
private final AbstractAnalyzer.AnalyzerFactory queryAnalyzerFactory;
private final PrimaryKey.Factory primaryKeyFactory;

private final Version version;
private final int maxTermSize;

private volatile boolean dropped = false;
Expand All @@ -177,7 +178,8 @@ public IndexContext(@Nonnull String keyspace,
this.viewManager = new IndexViewManager(this);
this.validator = TypeUtil.cellValueType(column, indexType);
this.cfs = cfs;
this.primaryKeyFactory = Version.current().onDiskFormat().newPrimaryKeyFactory(clusteringComparator);
this.version = Version.current(keyspace);
this.primaryKeyFactory = version.onDiskFormat().newPrimaryKeyFactory(clusteringComparator);

String columnName = column.name.toString();

Expand Down Expand Up @@ -223,6 +225,11 @@ public IndexContext(@Nonnull String keyspace,
logger.debug(logMessage("Initialized index context with index writer config: {}"), indexWriterConfig);
}

public Version version()
{
return version;
}

public AbstractType<?> keyValidator()
{
return partitionKeyType;
Expand Down Expand Up @@ -409,7 +416,7 @@ private boolean validateCumulativeAnalyzedTermLimit(DecoratedKey key, AbstractAn

public void update(DecoratedKey key, Row oldRow, Row newRow, Memtable memtable, OpOrder.Group opGroup)
{
if (Version.current().equals(Version.AA))
if (version.equals(Version.AA))
{
// AA cannot handle updates because it indexes partition keys instead of fully qualified primary keys.
index(key, newRow, memtable, opGroup);
Expand Down Expand Up @@ -683,7 +690,7 @@ public View getReferencedView(long timeoutNanos)
*/
public int openPerIndexFiles()
{
return viewManager.getView().size() * Version.current().onDiskFormat().openFilesPerIndex(this);
return viewManager.getView().size() * version.onDiskFormat().openFilesPerIndex(this);
}

public void prepareSSTablesForRebuild(Collection<SSTableReader> sstablesToRebuild)
Expand Down Expand Up @@ -1020,7 +1027,7 @@ public long indexFileCacheSize()

public IndexFeatureSet indexFeatureSet()
{
IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator();
IndexFeatureSet.Accumulator accumulator = new IndexFeatureSet.Accumulator(version);
getView().getIndexes().stream().map(SSTableIndex::indexFeatureSet).forEach(set -> accumulator.accumulate(set));
return accumulator.complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,12 @@ private Future<?> startInitialBuild(ColumnFamilyStore baseCfs, boolean validate,
return CompletableFuture.completedFuture(null);
}

if (indexContext.isVector() && Version.current().compareTo(Version.JVECTOR_EARLIEST) < 0)
if (indexContext.isVector() && indexContext.version().compareTo(Version.JVECTOR_EARLIEST) < 0)
{
throw new FeatureNeedsIndexRebuildException(String.format("The current configured on-disk format version %s does not support vector indexes. " +
"The minimum version that supports vectors is %s. " +
"The on-disk format version can be set via the -D%s system property.",
Version.current(),
indexContext.version(),
Version.JVECTOR_EARLIEST,
CassandraRelevantProperties.SAI_CURRENT_VERSION.name()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.cassandra.index.sai.disk.format.ComponentsBuildId;
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
Expand Down Expand Up @@ -321,7 +320,7 @@ private static void prepareForRebuild(IndexComponents.ForRead components, Set<Co
// where immutable components was enabled, but then disabled for some reason. If that happens, we still
// want to ensure a new build removes the old files both from disk (happens below) and from the sstable TOC
// (which is what `replacedComponents` is about)).
if (components.version().useImmutableComponentFiles() || !components.buildId().equals(ComponentsBuildId.forNewSSTable()))
if (components.version().useImmutableComponentFiles() || !components.buildId().equals(ComponentsBuildId.forNewSSTable(components.version())))
replacedComponents.addAll(components.allAsCustomComponents());

if (!components.version().useImmutableComponentFiles())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.cassandra.index.sai.disk.StorageAttachedIndexWriter;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.metrics.IndexGroupMetrics;
import org.apache.cassandra.index.sai.metrics.TableQueryMetrics;
import org.apache.cassandra.index.sai.metrics.TableStateMetrics;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons
private final ColumnFamilyStore baseCfs;

private final SSTableContextManager contextManager;
private final Version version;



Expand All @@ -101,6 +103,7 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons
this.stateMetrics = new TableStateMetrics(baseCfs.metadata(), this);
this.groupMetrics = new IndexGroupMetrics(baseCfs.metadata(), this);
this.contextManager = new SSTableContextManager(baseCfs.getTracker());
this.version = Version.current(baseCfs.keyspace.getName());

Tracker tracker = baseCfs.getTracker();
tracker.subscribe(this);
Expand Down Expand Up @@ -282,7 +285,7 @@ public boolean handles(IndexTransaction.Type type)
@Override
public Set<Component> componentsForNewSSTable()
{
return IndexDescriptor.componentsForNewlyFlushedSSTable(indices);
return IndexDescriptor.componentsForNewlyFlushedSSTable(indices, version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexOutput;
import org.apache.cassandra.index.sai.disk.oldlucene.ResettableByteBuffersIndexOutput;
import org.apache.lucene.store.ByteBuffersDataInput;
Expand All @@ -44,9 +45,9 @@ public class ModernResettableByteBuffersIndexOutput extends ResettableByteBuffer
private final ByteBuffersIndexOutput bbio;
private final ByteBuffersDataOutput delegate;

public ModernResettableByteBuffersIndexOutput(int expectedSize, String name)
public ModernResettableByteBuffersIndexOutput(int expectedSize, String name, Version version)
{
super("", name, ByteOrder.LITTLE_ENDIAN);
super("", name, ByteOrder.LITTLE_ENDIAN, version);
delegate = new ByteBuffersDataOutput(expectedSize);
bbio = new ByteBuffersIndexOutput(delegate, "", name + "-bb");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
TableMetrics tableMetrics) throws IOException
{
// We always write at the latest version (through what that version is can be configured for specific cases)
var onDiskFormat = Version.current().onDiskFormat();
var onDiskFormat = Version.current(indexDescriptor.descriptor.ksname).onDiskFormat();
this.indexDescriptor = indexDescriptor;
// Note: I think there is a silent assumption here. That is, the PK factory we use here must be for the current
// format version, because that is what `IndexContext.keyFactory` always uses (see ctor)
Expand Down Expand Up @@ -126,7 +126,7 @@ public void begin()
public void startPartition(DecoratedKey key, long position)
{
if (aborted) return;

currentKey = key;

try
Expand Down Expand Up @@ -174,7 +174,7 @@ public void partitionLevelDeletion(DeletionTime deletionTime, long position)
public void staticRow(Row staticRow, long position)
{
if (aborted) return;

if (staticRow.isEmpty())
return;

Expand Down Expand Up @@ -279,7 +279,7 @@ public void complete(SSTable sstable)

/**
* Aborts all column index writers and, only if they have not yet completed, SSTable-level component writers.
*
*
* @param accumulator the initial exception thrown from the failed writer
*/
@Override
Expand All @@ -300,12 +300,12 @@ public void abort(Throwable accumulator, boolean fromIndex)

// Mark the write aborted, so we can short-circuit any further operations on the component writers.
aborted = true;

// For non-compaction, make any indexes involved in this transaction non-queryable, as they will likely not match the backing table.
// For compaction: the compaction task should be aborted and new sstables will not be added to tracker
if (fromIndex && opType != OperationType.COMPACTION)
indices.forEach(StorageAttachedIndex::makeIndexNonQueryable);

for (PerIndexWriter perIndexWriter : perIndexWriters)
{
try
Expand All @@ -320,10 +320,10 @@ public void abort(Throwable accumulator, boolean fromIndex)
}
}
}

if (!tokenOffsetWriterCompleted)
{
// If the token/offset files have already been written successfully, they can be reused later.
// If the token/offset files have already been written successfully, they can be reused later.
perSSTableWriter.abort(accumulator);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
*/
public class ComponentsBuildId implements Comparable<ComponentsBuildId>
{
private static final ComponentsBuildId FOR_NEW_SSTABLE = ComponentsBuildId.current(0);

private final Version version;
private final int generation;

Expand All @@ -47,19 +45,13 @@ public static ComponentsBuildId of(Version version, int generation)
return new ComponentsBuildId(version, generation);
}

public static ComponentsBuildId current(int generation)
{
return of(Version.current(), generation);
}

public static ComponentsBuildId forNewSSTable()
public static ComponentsBuildId forNewSSTable(Version version)
{
return FOR_NEW_SSTABLE;
return ComponentsBuildId.of(version, 0);
}

public static ComponentsBuildId forNewBuild(@Nullable ComponentsBuildId previousBuild, Predicate<ComponentsBuildId> newBuildIsUsablePredicate)
public static ComponentsBuildId forNewBuild(Version version, @Nullable ComponentsBuildId previousBuild, Predicate<ComponentsBuildId> newBuildIsUsablePredicate)
{
Version version = Version.current();
// If we're not using immutable components, we always use generation 0, and we're fine if that overrides existing files
if (!version.useImmutableComponentFiles())
return new ComponentsBuildId(version, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
* of the completion marker, but not of the other component of the group). The bumping of generations takes incomplete
* groups into account, and so incomplete groups are not overridden either. Essentially, the generation used by a new
* build is always one more than the highest generation of any component found on disk (for the group in question, and
* the version we writting, usually {@link Version#current()}).
* the version we writting, usually {@link Version#current(String)}).
*/
public interface IndexComponents
{
Expand Down
Loading
Loading