Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3826,6 +3826,13 @@ public static TableMetrics metricsFor(TableId tableId)
return Objects.requireNonNull(getIfExists(tableId)).metric;
}

@Nullable
public static TableMetrics metricsForIfPresent(TableId tableId)
{
ColumnFamilyStore cfs = getIfExists(tableId);
return cfs == null ? null : cfs.metric;
}

// Used by CNDB
public long getMemtablesLiveSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.TableMetrics;

public abstract class AbstractMetricsProviders<R extends SSTableReader> implements MetricsProviders
{
Expand All @@ -40,5 +42,24 @@ protected final <T extends Number> GaugeProvider<T> newGaugeProvider(String name
});
}

protected final GaugeProvider<Long> bloomFilterOffHeapMemoryUsedAwareCFSGaugeProvider(String name, Long neutralValue, Function<R, Long> extractor, BiFunction<Long, Long, Long> combiner)
{
return new SimpleGaugeProvider<>(this::map, name, readers -> {
Long total = neutralValue;

// Optionally add in-flight bloom filter off-heap memory used
if (readers.iterator().hasNext()) {
R firstReader = readers.iterator().next();
TableMetrics tableMetrics = ColumnFamilyStore.metricsForIfPresent(firstReader.metadata().id);
if (tableMetrics != null)
total = tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get();
}

for (R reader : readers)
total = combiner.apply(total, extractor.apply(reader));
return total;
});
}

protected abstract R map(SSTableReader r);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected R map(SSTableReader r)
/**
* Off heap memory used by bloom filter
*/
public final GaugeProvider<Long> bloomFilterOffHeapMemoryUsed = newGaugeProvider("BloomFilterOffHeapMemoryUsed",
public final GaugeProvider<Long> bloomFilterOffHeapMemoryUsed = bloomFilterOffHeapMemoryUsedAwareCFSGaugeProvider("BloomFilterOffHeapMemoryUsed",
0L,
SSTableReaderWithFilter::getFilterOffHeapSize,
Long::sum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.compaction.OperationType;
Expand All @@ -54,6 +58,7 @@
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.MmappedRegionsCache;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.IFilter;
Expand Down Expand Up @@ -189,6 +194,9 @@ protected static class IndexWriter extends SortedTableWriter.AbstractIndexWriter
private DataPosition riMark;
private DataPosition piMark;

@Nullable
private final TableMetrics tableMetrics;

IndexWriter(Builder b, SequentialWriter dataWriter)
{
super(b);
Expand Down Expand Up @@ -230,10 +238,22 @@ protected static class IndexWriter extends SortedTableWriter.AbstractIndexWriter
.withMmappedRegionsCache(b.getMmappedRegionsCache());
}
partitionIndex = new PartitionIndexBuilder(partitionIndexWriter, partitionIndexFHBuilder, descriptor.version.getByteComparableVersion());

// register listeners to be alerted when the data files are flushed
partitionIndexWriter.setPostFlushListener(partitionIndex::markPartitionIndexSynced);
rowIndexWriter.setPostFlushListener(partitionIndex::markRowIndexSynced);
dataWriter.setPostFlushListener(partitionIndex::markDataSynced);


// The per-table bloom filter memory is tracked when:
// 1. Periodic early open: Opens incomplete sstables when size threshold is hit during writing.
// The BF memory usage is tracked via Tracker.
// 2. Completion early open: Opens completed sstables when compaction results in multiple sstables.
// The BF memory usage is tracked via Tracker.
// 3. A new sstable is first created here if early-open is not enabled.
tableMetrics = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB() <= 0 ? ColumnFamilyStore.metricsForIfPresent(metadata.id) : null;
if (tableMetrics != null && bf != null)
tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.getAndAdd(bf.offHeapSize());
}

public long append(DecoratedKey key, AbstractRowIndexEntry indexEntry) throws IOException
Expand Down Expand Up @@ -371,6 +391,8 @@ protected Throwable doAbort(Throwable accumulate)
@Override
protected Throwable doPostCleanup(Throwable accumulate)
{
if (tableMetrics != null && bf != null)
tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.getAndAdd(-bf.offHeapSize());
return Throwables.close(accumulate, bf, partitionIndex, rowIndexWriter, partitionIndexWriter);
}
}
Expand Down
25 changes: 14 additions & 11 deletions src/java/org/apache/cassandra/metrics/TableMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
Expand Down Expand Up @@ -263,6 +264,8 @@ public String asCQLString()
public final Gauge<Long> meanPartitionSize;
/** False positive ratio of bloom filter */
public final Gauge<Double> bloomFilterFalseRatio;

public final AtomicLong inFlightBloomFilterOffHeapMemoryUsed = new AtomicLong(0);
/** Off heap memory used by compression meta data*/
public final Gauge<Long> compressionMetadataOffHeapMemoryUsed;

Expand Down Expand Up @@ -456,19 +459,19 @@ public Double getValue()
public static final Gauge<Long> globalBytesRepaired = Metrics.register(GLOBAL_FACTORY.createMetricName("BytesRepaired"),
() -> totalNonSystemTablesSize(SSTableReader::isRepaired).left);

public static final Gauge<Long> globalBytesUnrepaired =
public static final Gauge<Long> globalBytesUnrepaired =
Metrics.register(GLOBAL_FACTORY.createMetricName("BytesUnrepaired"),
() -> totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left);

public static final Gauge<Long> globalBytesPendingRepair =
public static final Gauge<Long> globalBytesPendingRepair =
Metrics.register(GLOBAL_FACTORY.createMetricName("BytesPendingRepair"),
() -> totalNonSystemTablesSize(SSTableReader::isPendingRepair).left);

public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;

public final Meter replicaFilteringProtectionRequests;

/**
* This histogram records the maximum number of rows {@link org.apache.cassandra.service.reads.ReplicaFilteringProtection}
* caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of
Expand Down Expand Up @@ -611,20 +614,20 @@ public String toString(ByteBuffer value)
samplers.put(SamplerType.READ_TOMBSTONE_COUNT, topReadPartitionTombstoneCount);
samplers.put(SamplerType.READ_SSTABLE_COUNT, topReadPartitionSSTableCount);

memtableColumnsCount = createTableGauge("MemtableColumnsCount",
memtableColumnsCount = createTableGauge("MemtableColumnsCount",
() -> cfs.getTracker().getView().getCurrentMemtable().operationCount());

// MemtableOnHeapSize naming deprecated in 4.0
memtableOnHeapDataSize = createTableGaugeWithDeprecation("MemtableOnHeapDataSize", "MemtableOnHeapSize",
memtableOnHeapDataSize = createTableGaugeWithDeprecation("MemtableOnHeapDataSize", "MemtableOnHeapSize",
() -> Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOnHeap,
new GlobalTableGauge("MemtableOnHeapDataSize"));

// MemtableOffHeapSize naming deprecated in 4.0
memtableOffHeapDataSize = createTableGaugeWithDeprecation("MemtableOffHeapDataSize", "MemtableOffHeapSize",
memtableOffHeapDataSize = createTableGaugeWithDeprecation("MemtableOffHeapDataSize", "MemtableOffHeapSize",
() -> Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOffHeap,
new GlobalTableGauge("MemtableOnHeapDataSize"));
memtableLiveDataSize = createTableGauge("MemtableLiveDataSize",

memtableLiveDataSize = createTableGauge("MemtableLiveDataSize",
() -> cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize());

// AllMemtablesHeapSize naming deprecated in 4.0
Expand Down Expand Up @@ -706,7 +709,7 @@ public Long loadValue()
};

estimatedColumnCountHistogram = createTableGauge("EstimatedColumnCountHistogram", "EstimatedColumnCountHistogram",
() -> combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL),
() -> combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL),
SSTableReader::getEstimatedCellPerPartitionCount), null);

estimatedRowCount = createTableGauge("EstimatedRowCount", "EstimatedRowCount", new CachedGauge<>(1, TimeUnit.SECONDS)
Expand Down Expand Up @@ -743,7 +746,7 @@ public Long loadValue()
return sstableRows + memtableRows;
}
}, null);

sstablesPerReadHistogram = createTableHistogram("SSTablesPerReadHistogram", cfs.getKeyspaceMetrics().sstablesPerReadHistogram, true);
sstablesPerRangeReadHistogram = createTableHistogram("SSTablesPerRangeReadHistogram", cfs.getKeyspaceMetrics().sstablesPerRangeReadHistogram, true);
sstablePartitionReadLatency = ExpMovingAverage.decayBy100();
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/utils/FilterFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,6 +77,7 @@ public static IFilter getFilter(long numElements, int targetBucketsPerElem, Memo
* Asserts that the given probability can be satisfied using this
* filter.
*/
@VisibleForTesting
public static IFilter getFilter(long numElements, double maxFalsePosProbability)
{
return getFilter(numElements, maxFalsePosProbability, BloomFilter.memoryLimiter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.LongPredicate;

Expand All @@ -36,6 +38,9 @@

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.filter.BloomFilterMetrics;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
Expand All @@ -50,6 +55,7 @@
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.awaitility.Awaitility;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
Expand All @@ -58,6 +64,7 @@
import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;

Expand Down Expand Up @@ -203,6 +210,65 @@ public void testGetFullyExpiredSSTables()
assertEquals(1, expired.size());
}

@Test
@BMRule(name = "DelayCompaction",
targetClass = "CompactionTask$CompactionOperation",
targetMethod = "maybeStopOrUpdateState",
action = "java.lang.Thread.sleep(1000L);")
public void testInFlightBloomFilterMemory()
{
int earlyOpen = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB();
try
{
DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(0);

Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
cfs.truncateBlocking();
cfs.disableAutoCompaction();

for (int s = 0; s < 3; ++s)
{
for (int k = 0; k < 5; k++)
writeRows(cfs, Util.dk("k" + k), k, k + 1);
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
}

TableMetrics tableMetrics = ColumnFamilyStore.metricsForIfPresent(cfs.metadata.id);
long bloomFilterMemoryBeforeCompaction = BloomFilterMetrics.instance.bloomFilterOffHeapMemoryUsed.getTableGauge(cfs).getValue();
assertNotEquals(0, bloomFilterMemoryBeforeCompaction);
assertEquals(0, tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get());

// Submit a compaction where all tombstones are expired to make compactor read memtables.
Future<?> future = Executors.newCachedThreadPool().submit(() -> {
cfs.forceMajorCompaction();
});

Awaitility.await("In flight bloom filter is tracked")
.pollInterval(1, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(() ->
{
// in-flight bf memory is included
return tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get() != 0 &&
bloomFilterMemoryBeforeCompaction < BloomFilterMetrics.instance.bloomFilterOffHeapMemoryUsed.getTableGauge(cfs).getValue() &&
// in-flight sstable is not part of tracker
3 == cfs.getLiveSSTables().size();
});

// Compaction must have succeeded.
FBUtilities.waitOnFuture(future);
assertEquals(1, cfs.getLiveSSTables().size());

// In flight bloom filter is clear
assertEquals(0, tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get());
}
finally
{
DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(earlyOpen);
}
}

@Test
@BMRules(rules = {
@BMRule(name = "Pause compaction",
Expand Down