Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 128 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/unified/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.apache.cassandra.db.compaction.unified;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -316,6 +318,14 @@ public abstract class Controller
static final String MAX_SSTABLES_PER_SHARD_FACTOR_OPTION = "max_sstables_per_shard_factor";
static final double DEFAULT_MAX_SSTABLES_PER_SHARD_FACTOR = Double.parseDouble(getSystemProperty(MAX_SSTABLES_PER_SHARD_FACTOR_OPTION, "10"));

/**
* Whether to use factorization-based shard count growth for smoother progression when base_shard_count is not power of 2.
* When enabled (default: true), instead of using power-of-two jumps like 1→2→8→1000, the system will
* use prime factorization to create smooth sequences like 1→5→25→125→250→500→1000 for num_shards=1000.
* This prevents the large jumps that were involved in the data loss incident caused by HCD-130
* <p>
*/
static final boolean USE_FACTORIZATION_SHARD_COUNT_GROWTH = Boolean.parseBoolean(getSystemProperty("use_factorization_shard_count_growth", "true"));

protected final MonotonicClock clock;
protected final Environment env;
Expand All @@ -332,6 +342,8 @@ public abstract class Controller
protected final TableMetadata metadata;

protected final int baseShardCount;
private final Optional<int[]> factorizedShardSequence;

private final boolean isReplicaAware;

protected final long targetSSTableSize;
Expand Down Expand Up @@ -406,6 +418,8 @@ public abstract class Controller
"Set it to 'true' to enable aggressive SSTable expiration.");
}
this.ignoreOverlapsInExpirationCheck = ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION && ignoreOverlapsInExpirationCheck;

this.factorizedShardSequence = useFactorizationShardCountGrowth() ? Optional.of(factorizedSmoothShardSequence(baseShardCount)) : Optional.empty();
}

public static File getControllerConfigPath(TableMetadata metadata)
Expand Down Expand Up @@ -523,10 +537,18 @@ public int getNumShards(double localDensity)
// Note: the minimum size cannot be larger than the target size's minimum.
if (!(count >= baseShardCount)) // also true for count == NaN
{
// Make it a power of two, rounding down so that sstables are greater in size than the min.
// Setting the bottom bit to 1 ensures the result is at least 1.
// If baseShardCount is not a power of 2, split only to powers of two that are divisors of baseShardCount so boundaries match higher levels
shards = Math.min(Integer.highestOneBit((int) count | 1), baseShardCount & -baseShardCount);
// Use factorization-based growth for smoother progression if it's not power of 2
if (factorizedShardSequence.isPresent())
{
shards = getLargestFactorizedShardCount(count);
}
else
{
// Make it a power of two, rounding down so that sstables are greater in size than the min.
// Setting the bottom bit to 1 ensures the result is at least 1.
// If baseShardCount is not a power of 2, split only to powers of two that are divisors of baseShardCount so boundaries match higher levels
shards = Math.min(Integer.highestOneBit((int) count | 1), baseShardCount & -baseShardCount);
}
if (logger.isDebugEnabled())
logger.debug("Shard count {} for density {}, {} times min size {}",
shards,
Expand Down Expand Up @@ -1565,6 +1587,108 @@ public List<CompactionAggregate.UnifiedAggregate> prioritize(List<CompactionAggr
return aggregates;
}

/**
* Check if factorization-based growth is enabled and base shard count is not power of 2.
*/
@VisibleForTesting
boolean useFactorizationShardCountGrowth()
{
return USE_FACTORIZATION_SHARD_COUNT_GROWTH && baseShardCount > 0 && Integer.bitCount(baseShardCount) != 1;
}

/**
* Compute a smooth shard count sequence based on prime factorization and find the largest shard count that is not larger
* than current shard count based on density or first shard if current count is NaN or negative
*
* For num_shards=1000 (5^3 * 2^3), returns the appropriate shard count
* based on current density to achieve smooth growth: 1, 5, 25, 125, 250, 500, 1000
*
* @param currentCountBasedOnDensity the current count based on density
* @return the largest shard count for the current density
*/
@VisibleForTesting
int getLargestFactorizedShardCount(double currentCountBasedOnDensity)
{
int[] sequence = factorizedShardSequence.get();
if (Double.isNaN(currentCountBasedOnDensity))
return sequence[0];

int searchKey = (int) Math.floor(currentCountBasedOnDensity);
int idx = Arrays.binarySearch(sequence, searchKey);
// exact match
if (idx >= 0)
return sequence[idx];

// insertion point
int insertionPoint = -idx - 1;
// we need the value before insertion point or 0 if insertion point is 0
int candidateIndex = Math.max(0, insertionPoint - 1);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: max is not needed here (-(-1) -1 is 0). Generally it could result in sequence.length, but that will happen only if the count is larger than the base shard count, which won't end up in this function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, -idx - 1 is always positive. It's searching for the value before insertion point via insertionPoint - 1, so it uses max when insertionPoint is 0.

return sequence[candidateIndex];
}

/**
* Generate a factorized shard sequence for smooth shard count growth.
* Uses prime factorization with largest factors first to create a cumulative sequence.
*
* For example: 1000 (5³×2³) → [1, 5, 25, 125, 250, 500, 1000]
* This provides much smoother growth than power-of-2 jumps.
*/
@VisibleForTesting
static int[] factorizedSmoothShardSequence(int target)
{
if (target <= 0) throw new IllegalArgumentException("target must be positive");
if (target == 1) return new int[]{ 1 };

// 1) Factorize targeShards into list of prime factors in ascending order
List<Integer> primesAscending = primeFactors(target);

// 2) Cumulative product to form the chain by using largest prime first.
int[] divisors = new int[primesAscending.size() + 1];
int cur = 1;
divisors[0] = cur;
for (int i = 0; i < primesAscending.size(); i++)
{
cur *= primesAscending.get(primesAscending.size() - 1 - i);
divisors[i + 1] = cur;
}
return divisors;
}

/**
* Prime factorization for shard count (usually small) to produce a list of prime factors in ascending order
* For example: 1000 -> [2, 2, 2, 5, 5, 5]
*/
@VisibleForTesting
static List<Integer> primeFactors(int num)
{
if (num <= 1)
throw new IllegalArgumentException("num must be greater than 1, got: " + num);

List<Integer> result = new ArrayList<>();

// Factor out 2 using more readable modulo check
while (num % 2 == 0)
{
result.add(2);
num /= 2;
}

for (int factor = 3; (long) factor * factor <= num; factor += 2)
{
while (num % factor == 0)
{
result.add(factor);
num /= factor;
}
}

// If num is still > 1, then it's a prime factor
if (num > 1)
result.add(num);

return result;
}

static final class Metrics
{
private final MetricNameFactory factory;
Expand Down
Loading