Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 128 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/unified/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.cassandra.db.compaction.unified;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -321,6 +322,15 @@ public abstract class Controller
static final String MAX_SSTABLES_PER_SHARD_FACTOR_OPTION = "max_sstables_per_shard_factor";
static final double DEFAULT_MAX_SSTABLES_PER_SHARD_FACTOR = UCS_MAX_SSTABLES_PER_SHARD_FACTOR.getDoubleWithLegacyFallback();

/**
* Whether to use factorization-based shard count growth for smoother progression when base_shard_count is not a power of 2.
* When enabled (default: true), instead of using power-of-two jumps like 1→2→4→8→1000, the system will
* use prime factorization to create smooth sequences like 1→5→25→125→250→500→1000 for num_shards=1000.
* This prevents the large jumps that were involved in the data loss incident caused by HCD-130.
*/
static final boolean USE_FACTORIZATION_SHARD_COUNT_GROWTH = Boolean.parseBoolean(System.getProperty("use_factorization_shard_count_growth", "true"));

protected final MonotonicClock clock;
protected final Environment env;
protected final double[] survivalFactors;
Expand All @@ -336,6 +346,8 @@ public abstract class Controller
protected final TableMetadata metadata;

protected final int baseShardCount;
private final Optional<int[]> factorizedShardSequence;

private final boolean isReplicaAware;

protected final long targetSSTableSize;
Expand Down Expand Up @@ -410,6 +422,8 @@ public abstract class Controller
"Set it to 'true' to enable aggressive SSTable expiration.");
}
this.ignoreOverlapsInExpirationCheck = ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;

this.factorizedShardSequence = useFactorizationShardCountGrowth() ? Optional.of(factorizedSmoothShardSequence(baseShardCount)) : Optional.empty();
}

public static File getControllerConfigPath(TableMetadata metadata)
Expand Down Expand Up @@ -527,10 +541,18 @@ public int getNumShards(double localDensity)
// Note: the minimum size cannot be larger than the target size's minimum.
if (!(count >= baseShardCount)) // also true for count == NaN
{
// Make it a power of two, rounding down so that sstables are greater in size than the min.
// Setting the bottom bit to 1 ensures the result is at least 1.
// If baseShardCount is not a power of 2, split only to powers of two that are divisors of baseShardCount so boundaries match higher levels
shards = Math.min(Integer.highestOneBit((int) count | 1), baseShardCount & -baseShardCount);
// Use factorization-based growth for smoother progression if it's not power of 2
if (factorizedShardSequence.isPresent())
{
shards = getLargestFactorizedShardCount(count);
}
else
{
// Make it a power of two, rounding down so that sstables are greater in size than the min.
// Setting the bottom bit to 1 ensures the result is at least 1.
// If baseShardCount is not a power of 2, split only to powers of two that are divisors of baseShardCount so boundaries match higher levels
shards = Math.min(Integer.highestOneBit((int) count | 1), baseShardCount & -baseShardCount);
}
if (logger.isDebugEnabled())
logger.debug("Shard count {} for density {}, {} times min size {}",
shards,
Expand Down Expand Up @@ -1569,6 +1591,108 @@ public List<CompactionAggregate.UnifiedAggregate> prioritize(List<CompactionAggr
return aggregates;
}

/**
* Check if factorization-based growth is enabled and base shard count is not power of 2.
*/
@VisibleForTesting
boolean useFactorizationShardCountGrowth()
{
    // The global switch turns the feature off regardless of the configured shard count.
    if (!USE_FACTORIZATION_SHARD_COUNT_GROWTH)
        return false;

    // A non-positive base shard count is never factorized.
    if (baseShardCount <= 0)
        return false;

    // Exactly one set bit means the base shard count is a power of two, which keeps the
    // power-of-two growth; anything else uses the factorized sequence.
    return Integer.bitCount(baseShardCount) != 1;
}

/**
* Find the largest shard count in the precomputed factorized shard sequence that is not larger than the
* shard count derived from the current density, or the first element of the sequence if that count is NaN,
* negative, or otherwise smaller than the first element.
*
* For num_shards=1000 (5^3 * 2^3), returns the appropriate shard count
* based on current density to achieve smooth growth: 1, 5, 25, 125, 250, 500, 1000
*
* @param currentCountBasedOnDensity the current count based on density
* @return the largest shard count for the current density
*/
@VisibleForTesting
int getLargestFactorizedShardCount(double currentCountBasedOnDensity)
{
    final int[] shardCounts = factorizedShardSequence.get();
    final int smallest = shardCounts[0];

    // NaN carries no usable density information, so stay on the first step of the sequence.
    if (Double.isNaN(currentCountBasedOnDensity))
        return smallest;

    // Round down to a whole shard count; the int cast saturates for values outside the int range
    // (e.g. positive infinity becomes Integer.MAX_VALUE).
    final int wanted = (int) Math.floor(currentCountBasedOnDensity);
    final int found = Arrays.binarySearch(shardCounts, wanted);

    // Non-negative result: the wanted count is itself a member of the sequence.
    if (found >= 0)
        return shardCounts[found];

    // A miss is reported as -(insertionPoint) - 1, so the element just before the insertion point
    // sits at index -found - 2. A negative index means every element is larger than the wanted
    // count, in which case the first element is used.
    final int below = -found - 2;
    return below < 0 ? smallest : shardCounts[below];
}

/**
* Generate a factorized shard sequence for smooth shard count growth.
* Uses prime factorization with largest factors first to create a cumulative sequence.
*
* For example: 1000 (5³×2³) → [1, 5, 25, 125, 250, 500, 1000]
* This provides much smoother growth than power-of-2 jumps.
*/
@VisibleForTesting
static int[] factorizedSmoothShardSequence(int target)
{
    if (target <= 0)
        throw new IllegalArgumentException("target must be positive");
    if (target == 1)
        return new int[]{ 1 };

    // Prime factors of the target shard count, smallest first (e.g. 1000 -> [2, 2, 2, 5, 5, 5]).
    List<Integer> factors = primeFactors(target);
    int factorCount = factors.size();

    // Build the chain of cumulative products, consuming the factors from largest to smallest.
    // The chain starts at 1 and its last element is the target itself.
    int[] sequence = new int[factorCount + 1];
    sequence[0] = 1;
    for (int step = 1; step <= factorCount; step++)
        sequence[step] = sequence[step - 1] * factors.get(factorCount - step);

    return sequence;
}

/**
* Prime factorization for shard count (usually small) to produce a list of prime factors in ascending order
* For example: 1000 -> [2, 2, 2, 5, 5, 5]
*/
@VisibleForTesting
static List<Integer> primeFactors(int num)
{
    if (num <= 1)
        throw new IllegalArgumentException("num must be greater than 1, got: " + num);

    List<Integer> factors = new ArrayList<>();
    int remaining = num;

    // Strip every factor of 2 first so that only odd candidates need to be tried afterwards.
    for (; remaining % 2 == 0; remaining /= 2)
        factors.add(2);

    // Trial division by odd candidates. The square is computed as a long so that it cannot
    // overflow for candidates close to sqrt(Integer.MAX_VALUE).
    int candidate = 3;
    while ((long) candidate * candidate <= remaining)
    {
        if (remaining % candidate == 0)
        {
            // Stay on the same candidate: it may divide the remainder more than once.
            factors.add(candidate);
            remaining /= candidate;
        }
        else
        {
            candidate += 2;
        }
    }

    // Whatever is left above 1 has no divisor up to its square root, so it is the largest prime factor.
    if (remaining != 1)
        factors.add(remaining);

    return factors;
}

static final class Metrics
{
private final MetricNameFactory factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -52,6 +53,7 @@

import static org.apache.cassandra.config.CassandraRelevantProperties.UCS_OVERRIDE_UCS_CONFIG_FOR_VECTOR_TABLES;
import static org.apache.cassandra.SchemaLoader.standardCFMD;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -712,4 +714,207 @@ void testValidateCompactionStrategyOptions(boolean testLogType)
Map<String, String> uncheckedOptions = CompactionStrategyOptions.validateOptions(options);
assertNotNull(uncheckedOptions);
}
}

@Test
public void testFactorizedShardGrowth()
{
    // num_shards=1000 factorizes as 5^3 * 2^3, so the expected steps are 1, 5, 25, 125, 250, 500, 1000.
    Map<String, String> ucsOptions = new HashMap<>();
    ucsOptions.put(Controller.MIN_SSTABLE_SIZE_OPTION, "1GiB");
    ucsOptions.put(Controller.NUM_SHARDS_OPTION, "1000");
    mockFlushSize(100);
    Controller controller = Controller.fromOptions(cfs, ucsOptions);

    // Check the growth modifier and that factorized growth is active for this shard count.
    assertEquals(1.0, controller.sstableGrowthModifier, 0.0);
    assertTrue(controller.useFactorizationShardCountGrowth());

    // Each row is { density in GiB, expected shard count }.
    double[][] cases = {
        // smooth progression
        { 0, 1 },
        { 1.5, 1 },
        { 6.0, 5 },
        { 30.0, 25 },
        { 130.0, 125 },
        { 300.0, 250 },
        { 600.0, 500 },
        { 1000.0, 1000 },
        // boundaries: just below and exactly at a step
        { 4.9, 1 },
        { 5.0, 5 },
        { 24.9, 5 },
        { 25.0, 25 },
        // very large density
        { Double.POSITIVE_INFINITY, 1000 },
    };

    for (double[] row : cases)
        assertEquals((int) row[1], controller.getNumShards(Math.scalb(row[0], 30)));
}

@Test
public void testFactorizedShardGrowthPowerOfTwo()
{
    // 1024 = 2^10: a power-of-two shard count should step through 1, 2, 4, ..., 512, 1024.
    Map<String, String> ucsOptions = new HashMap<>();
    ucsOptions.put(Controller.MIN_SSTABLE_SIZE_OPTION, "1GiB");
    ucsOptions.put(Controller.NUM_SHARDS_OPTION, "1024");
    mockFlushSize(100);
    Controller controller = Controller.fromOptions(cfs, ucsOptions);

    // Each row is { density in GiB, expected shard count }.
    double[][] cases = {
        { 1.5, 1 },
        { 2.5, 2 },
        { 5.0, 4 },
        { 10.0, 8 },
        { 20.0, 16 },
        { 40.0, 32 },
        { 80.0, 64 },
        { 150.0, 128 },
        { 300.0, 256 },
        { 600.0, 512 },
        { 1024.0, 1024 },
    };

    for (double[] row : cases)
        assertEquals((int) row[1], controller.getNumShards(Math.scalb(row[0], 30)));
}

@Test
public void testNoFactorizedShardGrowthWithPowerOfTwo()
{
    // 128 is a power of two, so factorization-based growth must not be selected.
    Map<String, String> ucsOptions = new HashMap<>();
    ucsOptions.put(Controller.MIN_SSTABLE_SIZE_OPTION, "1GiB");
    ucsOptions.put(Controller.NUM_SHARDS_OPTION, "128");

    boolean factorized = Controller.fromOptions(cfs, ucsOptions).useFactorizationShardCountGrowth();
    assertFalse(factorized);
}

@Test
public void testFactorizedShardGrowthPrimeTarget()
{
    // 17 is prime, so the only steps are 1 and 17: the count stays at 1 until the target is reached.
    Map<String, String> ucsOptions = new HashMap<>();
    ucsOptions.put(Controller.MIN_SSTABLE_SIZE_OPTION, "1GiB");
    ucsOptions.put(Controller.NUM_SHARDS_OPTION, "17");
    mockFlushSize(100);
    Controller controller = Controller.fromOptions(cfs, ucsOptions);

    // Each row is { density in GiB, expected shard count }; NaN stays NaN after scaling.
    double[][] cases = {
        { Double.NaN, 1 },
        { 1.5, 1 },
        { 5.0, 1 },
        { 10.0, 1 },
        { 16.9, 1 },
        { 17.0, 17 },
        { 100.0, 17 },
    };

    for (double[] row : cases)
        assertEquals((int) row[1], controller.getNumShards(Math.scalb(row[0], 30)));
}

@Test
public void testFactorizedShardSequence()
{
    // Every expected sequence ends with its own target, so the target is read from the last element.
    int[][] expectedSequences = {
        // small numbers
        { 1 },
        { 1, 2 },
        { 1, 3 },
        { 1, 2, 4 },
        // perfect squares
        { 1, 3, 9 },
        { 1, 2, 4, 8, 16 },
        // primes
        { 1, 7 },
        { 1, 11 },
        // composite
        { 1, 3, 6, 12 },
        // 1000 = 5^3 * 2^3
        { 1, 5, 25, 125, 250, 500, 1000 },
        // 3200 = 5^2 * 2^7
        { 1, 5, 25, 50, 100, 200, 400, 800, 1600, 3200 },
        // Integer.MAX_VALUE is expected to yield only itself after the leading 1
        { 1, Integer.MAX_VALUE },
    };

    for (int[] expected : expectedSequences)
    {
        int target = expected[expected.length - 1];
        assertArrayEquals(expected, Controller.factorizedSmoothShardSequence(target));
    }
}

@Test
public void testFactorizedShardSequenceInputValidation()
{
    // Non-positive targets must be rejected with an IllegalArgumentException. The exception type is
    // asserted as well as the message, so an unrelated failure whose message happens to contain the
    // same text cannot make this test pass.
    assertThatThrownBy(() -> Controller.factorizedSmoothShardSequence(0))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("must be positive");
    assertThatThrownBy(() -> Controller.factorizedSmoothShardSequence(-5))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("must be positive");
}

@Test
public void testPrimeFactors()
{
    // inputs[i] is expected to factorize into expectedFactors.get(i), listed in ascending order.
    int[] inputs = {
        2, 3, 4,                // small numbers
        1000, 3200,             // larger composites
        7, 97, Integer.MAX_VALUE // expected to yield only themselves
    };
    List<List<Integer>> expectedFactors = Arrays.asList(
        Arrays.asList(2),
        Arrays.asList(3),
        Arrays.asList(2, 2),
        Arrays.asList(2, 2, 2, 5, 5, 5),
        Arrays.asList(2, 2, 2, 2, 2, 2, 2, 5, 5),
        Arrays.asList(7),
        Arrays.asList(97),
        Arrays.asList(Integer.MAX_VALUE));

    for (int i = 0; i < inputs.length; i++)
        assertEquals(expectedFactors.get(i), Controller.primeFactors(inputs[i]));
}

@Test
public void testPrimeFactorsInputValidation()
{
    // Anything not greater than 1 must be rejected with an IllegalArgumentException. The exception
    // type is asserted as well as the message, so an unrelated failure whose message happens to
    // contain the same text cannot make this test pass.
    assertThatThrownBy(() -> Controller.primeFactors(0))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("greater than 1");
    assertThatThrownBy(() -> Controller.primeFactors(1))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("greater than 1");
    assertThatThrownBy(() -> Controller.primeFactors(-10))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("greater than 1");
}

@Test
public void testGetLargestFactorizedShardCount()
{
    // num_shards=1000 yields the sequence [1, 5, 25, 125, 250, 500, 1000].
    Map<String, String> ucsOptions = new HashMap<>();
    ucsOptions.put(Controller.MIN_SSTABLE_SIZE_OPTION, "1GiB");
    ucsOptions.put(Controller.NUM_SHARDS_OPTION, "1000");
    mockFlushSize(100);
    Controller controller = Controller.fromOptions(cfs, ucsOptions);

    // Each row is { density-based count, expected shard count }.
    double[][] cases = {
        // exact matches
        { 1.0, 1 },
        { 5.0, 5 },
        { 25.0, 25 },
        { 125.0, 125 },
        { 250.0, 250 },
        { 500.0, 500 },
        { 1000.0, 1000 },
        // between sequence elements: the largest element not above the input
        { 1.5, 1 },
        { 4.9, 1 },
        { 5.1, 5 },
        { 24.9, 5 },
        { 25.1, 25 },
        { 124.9, 25 },
        { 125.1, 125 },
        { 249.9, 125 },
        { 250.1, 250 },
        { 499.9, 250 },
        { 500.1, 500 },
        { 999.9, 500 },
        // below the first element and above the last one
        { 0.0, 1 },
        { 0.5, 1 },
        { 1000.1, 1000 },
        { 2000.0, 1000 },
        { Double.POSITIVE_INFINITY, 1000 },
        // NaN falls back to the first element
        { Double.NaN, 1 },
        // negative values fall back to the first element
        { -1.0, 1 },
        { -100.0, 1 },
    };

    for (double[] row : cases)
        assertEquals((int) row[1], controller.getLargestFactorizedShardCount(row[0]));
}
}