Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.compaction.unified.Environment;
import org.apache.cassandra.db.compaction.unified.RealEnvironment;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.memtable.Memtable;
Expand Down Expand Up @@ -126,12 +125,12 @@ default IPartitioner getPartitioner()
* Return the estimated partition count, used when the number of partitions in an sstable is not sufficient to give
* a sensible range estimation.
*/
default long estimatedPartitionCount()
default long estimatedPartitionCountInSSTables()
{
final long INITIAL_ESTIMATED_PARTITION_COUNT = 1 << 16; // If we don't yet have a count, use a sensible default.
if (metrics() == null)
return INITIAL_ESTIMATED_PARTITION_COUNT;
final Long estimation = metrics().estimatedPartitionCount.getValue();
final Long estimation = metrics().estimatedPartitionCountInSSTablesCached.getValue();
if (estimation == null || estimation == 0)
return INITIAL_ESTIMATED_PARTITION_COUNT;
return estimation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public double shardSetCoverage()
@Override
public double minimumPerPartitionSpan()
{
return localSpaceCoverage() / Math.max(1, realm.estimatedPartitionCount());
return localSpaceCoverage() / Math.max(1, realm.estimatedPartitionCountInSSTables());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public double shardSetCoverage()

public double minimumPerPartitionSpan()
{
return localSpaceCoverage() / Math.max(1, localRanges.getRealm().estimatedPartitionCount());
return localSpaceCoverage() / Math.max(1, localRanges.getRealm().estimatedPartitionCountInSSTables());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,15 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.tokenallocator.IsolatedTokenAllocator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;

Expand Down Expand Up @@ -94,7 +88,7 @@ public double shardSetCoverage()
@Override
public double minimumPerPartitionSpan()
{
return localSpaceCoverage() / Math.max(1, realm.estimatedPartitionCount());
return localSpaceCoverage() / Math.max(1, realm.estimatedPartitionCountInSSTables());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public static long getApproximateKeyCount(Iterable<? extends SSTableReader> ssta
long count = -1;

if (Iterables.isEmpty(sstables))
return count;
return 0;

boolean failed = false;
ICardinality cardinality = null;
Expand Down
58 changes: 51 additions & 7 deletions src/java/org/apache/cassandra/metrics/TableMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.cassandra.metrics;

import java.lang.ref.WeakReference;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -30,6 +31,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand All @@ -44,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
Expand Down Expand Up @@ -74,6 +78,7 @@
import org.apache.cassandra.utils.Hex;
import org.apache.cassandra.utils.MovingAverage;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;

import static org.apache.cassandra.io.sstable.format.SSTableReader.selectOnlyBigTableReaders;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
Expand Down Expand Up @@ -174,6 +179,12 @@ public String asCQLString()
public final Gauge<long[]> estimatedPartitionSizeHistogram;
/** Approximate number of keys in table. */
public final Gauge<Long> estimatedPartitionCount;
/** This function is used to calculate estimated partition count in sstables and store the calculated value for the
* current set of sstables. */
public final LongSupplier estimatedPartitionCountInSSTables;
/** A cached version of the estimated partition count in sstables, used by compaction. This value will be more
* precise when the table has a small number of partitions that keep getting written to. */
public final Gauge<Long> estimatedPartitionCountInSSTablesCached;
/** Histogram of estimated number of columns. */
public final Gauge<long[]> estimatedColumnCountHistogram;

Expand Down Expand Up @@ -605,20 +616,53 @@ public Long getValue()
estimatedPartitionSizeHistogram = createTableGauge("EstimatedPartitionSizeHistogram", "EstimatedRowSizeHistogram",
() -> combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL),
SSTableReader::getEstimatedPartitionSize), null);


estimatedPartitionCountInSSTables = new LongSupplier()
{
    // Since the sstables only change when the tracker view changes, we can cache the value: the pair holds the
    // view the estimate was computed for (weakly referenced) and the estimate itself. It starts out with no
    // view and an estimate of 0, so the first call always computes a fresh value.
    private final AtomicReference<Pair<WeakReference<View>, Long>> collected =
        new AtomicReference<>(Pair.create(new WeakReference<>(null), 0L));

    /**
     * Returns the approximate number of keys in the canonical sstables of the current tracker view.
     * The estimate is recomputed only when the tracker view differs (by identity) from the view the stored
     * estimate was computed for; otherwise the stored estimate is returned.
     *
     * @return the approximate key count for the current view, or the previously stored estimate if the
     *         view is unchanged or its sstables could not be referenced
     */
    @Override
    public long getAsLong()
    {
        final View currentView = cfs.getTracker().getView();
        final Pair<WeakReference<View>, Long> currentCollected = collected.get();

        // Same view as the one we last computed for: the stored estimate is still valid.
        if (currentView == currentCollected.left.get())
            return currentCollected.right;

        final Refs<SSTableReader> refs = Refs.tryRef(currentView.select(SSTableSet.CANONICAL));
        if (refs == null)
        {
            // If we can't reference, simply return the previous collected value; it can only result in a delay
            // in reporting the correct key count.
            return currentCollected.right;
        }

        // The references are released as soon as the estimate has been computed.
        try (refs)
        {
            final long collectedValue = SSTableReader.getApproximateKeyCount(refs);
            final Pair<WeakReference<View>, Long> newCollected = Pair.create(new WeakReference<>(currentView), collectedValue);
            collected.compareAndSet(currentCollected, newCollected); // okay if failed, a different thread did it
            return collectedValue;
        }
    }
};
estimatedPartitionCount = createTableGauge("EstimatedPartitionCount", "EstimatedRowCount", new Gauge<Long>()
{
public Long getValue()
{
long memtablePartitions = 0;
long estimatedPartitions = estimatedPartitionCountInSSTables.getAsLong();
for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
memtablePartitions += memtable.partitionCount();
try(ColumnFamilyStore.RefViewFragment refViewFragment = cfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
{
return SSTableReader.getApproximateKeyCount(refViewFragment.sstables) + memtablePartitions;
}
estimatedPartitions += memtable.partitionCount();
return estimatedPartitions;
}
}, null);
estimatedPartitionCountInSSTablesCached = new CachedGauge<Long>(1, TimeUnit.SECONDS)
{
    /**
     * Supplies the value to cache by delegating to {@code estimatedPartitionCountInSSTables}.
     * The gauge is constructed with a one-second timeout, so callers of {@code getValue()} may see an
     * estimate that lags the current sstable set by up to that long.
     */
    @Override
    public Long loadValue()
    {
        final long sstablePartitions = estimatedPartitionCountInSSTables.getAsLong();
        return sstablePartitions;
    }
};

estimatedColumnCountHistogram = createTableGauge("EstimatedColumnCountHistogram", "EstimatedColumnCountHistogram",
() -> combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL),
SSTableReader::getEstimatedCellPerPartitionCount), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testWrappingShardManagerNoDisks()
{
CompactionRealm realm = Mockito.mock(CompactionRealm.class);
when(realm.getPartitioner()).thenReturn(partitioner);
when(realm.estimatedPartitionCount()).thenReturn(1L << 16);
when(realm.estimatedPartitionCountInSSTables()).thenReturn(1L << 16);
SortedLocalRanges localRanges = SortedLocalRanges.forTestingFull(realm);
ShardManager delegate = new ShardManagerNoDisks(localRanges);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ShardManagerReplicaAwareTest
public void testRangeEndsForShardCountEqualtToNumTokensPlusOne() throws UnknownHostException
{
var mockCompationRealm = mock(CompactionRealm.class);
when(mockCompationRealm.estimatedPartitionCount()).thenReturn(1L<<16);
when(mockCompationRealm.estimatedPartitionCountInSSTables()).thenReturn(1L<<16);

for (int numTokens = 1; numTokens < 32; numTokens++)
{
Expand All @@ -75,7 +75,7 @@ public void testRangeEndsForShardCountEqualtToNumTokensPlusOne() throws UnknownH
public void testRangeEndsAreFromTokenListAndContainLowerRangeEnds() throws UnknownHostException
{
var mockCompationRealm = mock(CompactionRealm.class);
when(mockCompationRealm.estimatedPartitionCount()).thenReturn(1L<<16);
when(mockCompationRealm.estimatedPartitionCountInSSTables()).thenReturn(1L<<16);

for (int nodeCount = 1; nodeCount <= 6; nodeCount++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.stream.Collectors;

import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -69,7 +68,7 @@ public void setUp()
localRanges = Mockito.mock(SortedLocalRanges.class, Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
Mockito.when(localRanges.getRanges()).thenAnswer(invocation -> weightedRanges);
Mockito.when(localRanges.getRealm()).thenReturn(realm);
Mockito.when(realm.estimatedPartitionCount()).thenReturn(10000L);
Mockito.when(realm.estimatedPartitionCountInSSTables()).thenReturn(10000L);
}

@Test
Expand Down
57 changes: 57 additions & 0 deletions test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,63 @@ public void testViewMetricsCleanupOnDrop()
assertEquals(metrics.get().collect(Collectors.joining(",")), 0, metrics.get().count());
}

/**
 * Checks {@code estimatedPartitionCount} and {@code estimatedPartitionCountInSSTablesCached} over two rounds
 * of writes to the same ten partitions, each followed by a flush.
 * <p>
 * The cached gauge is only asserted inside time windows measured from {@code startTime}: below 980 ms it is
 * expected to still report its initial value, from 1020 ms on it is expected to report the sstable count, and
 * nothing is asserted in between. This keeps the test independent of how fast the writes and flushes run.
 */
@Test
public void testEstimatedPartitionCount() throws InterruptedException
{
ColumnFamilyStore cfs = recreateTable();
// A freshly recreated table reports zero partitions through both gauges.
// NOTE(review): presumably this first read is also what populates the cached gauge with 0 — confirm.
assertEquals(0L, cfs.metric.estimatedPartitionCount.getValue().longValue());
assertEquals(0L, cfs.metric.estimatedPartitionCountInSSTablesCached.getValue().longValue());
// Reference point for all the elapsed-time windows below.
long startTime = System.currentTimeMillis();

int partitionCount = 10;
int numRows = 100;

// First round: 100 rows spread over ids 0..9, i.e. ten distinct partitions.
for (int i = 0; i < numRows; i++)
session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (%d, '%s', '%s')", KEYSPACE, TABLE, i % partitionCount, "val" + i, "val" + i));

// The uncached metric must report the ten partitions both before and after the flush.
assertEquals(partitionCount, cfs.metric.estimatedPartitionCount.getValue().longValue());
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
assertEquals(partitionCount, cfs.metric.estimatedPartitionCount.getValue().longValue());

long estimatedPartitionCountInSSTables = cfs.metric.estimatedPartitionCountInSSTablesCached.getValue().longValue();
long elapsedTime = System.currentTimeMillis() - startTime;
// the caching time is one second; avoid flakiness by only checking if a long time has not passed
if (elapsedTime < 980)
assertEquals(0, estimatedPartitionCountInSSTables);

// Second round: write the same ten partitions again, so they are also present in the flushed data.
for (int i = 0; i < numRows; i++)
session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (%d, '%s', '%s')", KEYSPACE, TABLE, i % partitionCount, "val" + i, "val" + i));

estimatedPartitionCountInSSTables = cfs.metric.estimatedPartitionCountInSSTablesCached.getValue().longValue();
elapsedTime = System.currentTimeMillis() - startTime;
// Within the first window the initial 0 is still expected; past it, the count for the flushed data.
// Between 980 and 1020 ms nothing is asserted.
if (elapsedTime < 980)
assertEquals(0, estimatedPartitionCountInSSTables);
else if (elapsedTime >= 1020)
assertEquals(partitionCount, estimatedPartitionCountInSSTables);

// The answer below is incorrect but what the metric currently returns: the same ten partitions are
// reported twice, once for the flushed data and once for the rows that have not been flushed yet.
assertEquals(partitionCount * 2, cfs.metric.estimatedPartitionCount.getValue().longValue());
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
// Recalculation for the new sstable set will correct it.
assertEquals(partitionCount, cfs.metric.estimatedPartitionCount.getValue().longValue());

// The cached estimatedPartitionCountInSSTables lags one second, check that.
estimatedPartitionCountInSSTables = cfs.metric.estimatedPartitionCountInSSTablesCached.getValue().longValue();
elapsedTime = System.currentTimeMillis() - startTime;
if (elapsedTime < 980)
assertEquals(0, estimatedPartitionCountInSSTables);
else if (elapsedTime >= 1020)
assertEquals(partitionCount, estimatedPartitionCountInSSTables);

// Wait to let it update: sleep until roughly 1200 ms after startTime, safely past the one-second cache time.
if (elapsedTime < 1020)
Thread.sleep(1200 - elapsedTime);

// It must now report the correct value.
estimatedPartitionCountInSSTables = cfs.metric.estimatedPartitionCountInSSTablesCached.getValue().longValue();
assertEquals(partitionCount, estimatedPartitionCountInSSTables);
}


@AfterClass
public static void tearDown()
Expand Down
Loading