From 1a7de17ab2c2ae74497cb5acfb0b03ea08e684f1 Mon Sep 17 00:00:00 2001 From: Zhao Yang Date: Wed, 20 Aug 2025 16:51:54 +0800 Subject: [PATCH 1/2] CNDB-15058: track in-flight BF memory usage from trie index writer if early open is not enabled (#1949) cndb-15058: table metrics - bloom filter memory usage doesn't include partially written sstables' if early open is not enabled Include in flight BF memory usage if early open is not enabled --- .../cassandra/db/ColumnFamilyStore.java | 7 ++ .../io/sstable/filter/BloomFilterMetrics.java | 3 + .../io/sstable/format/bti/BtiTableWriter.java | 22 +++++++ .../cassandra/metrics/TableMetrics.java | 25 +++---- .../apache/cassandra/utils/FilterFactory.java | 2 + .../compaction/CompactionControllerTest.java | 66 +++++++++++++++++++ 6 files changed, 114 insertions(+), 11 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 37284f19cf06..457d0452238e 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -3826,6 +3826,13 @@ public static TableMetrics metricsFor(TableId tableId) return Objects.requireNonNull(getIfExists(tableId)).metric; } + @Nullable + public static TableMetrics metricsForIfPresent(TableId tableId) + { + ColumnFamilyStore cfs = getIfExists(tableId); + return cfs == null ? null : cfs.metric; + } + // Used by CNDB public long getMemtablesLiveSize() { diff --git a/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java b/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java index ee5fc3c111fc..f96abd0ffec6 100644 --- a/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java +++ b/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java @@ -66,6 +66,9 @@ protected R map(SSTableReader r) * Off heap memory used by bloom filter */ public final GaugeProvider bloomFilterOffHeapMemoryUsed = newGaugeProvider("BloomFilterOffHeapMemoryUsed", + // FIXME: CNDB-15213 + // Seems to correspond to TableMetrics.bloomFilterOffHeapMemoryUsed.getLong() in main + // where we want TableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get() as the initial value; 0L, SSTableReaderWithFilter::getFilterOffHeapSize, Long::sum); diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java index 75ca5b0a3bd8..6e37e09b25d9 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java @@ -24,11 +24,15 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.compaction.OperationType; @@ -54,6 +58,7 @@ import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.MmappedRegionsCache; import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.IFilter; @@ -189,6 +194,9 @@ protected static class IndexWriter extends SortedTableWriter.AbstractIndexWriter private DataPosition riMark; private DataPosition piMark; + @Nullable + private final TableMetrics tableMetrics; + IndexWriter(Builder b, SequentialWriter dataWriter) { super(b); @@ -230,10 +238,22 @@ protected static class IndexWriter extends SortedTableWriter.AbstractIndexWriter .withMmappedRegionsCache(b.getMmappedRegionsCache()); } partitionIndex = new PartitionIndexBuilder(partitionIndexWriter, partitionIndexFHBuilder, descriptor.version.getByteComparableVersion()); + // register listeners to be alerted when the data files are flushed partitionIndexWriter.setPostFlushListener(partitionIndex::markPartitionIndexSynced); rowIndexWriter.setPostFlushListener(partitionIndex::markRowIndexSynced); dataWriter.setPostFlushListener(partitionIndex::markDataSynced); + + + // The per-table bloom filter memory is tracked when: + // 1. Periodic early open: Opens incomplete sstables when size threshold is hit during writing. + // The BF memory usage is tracked via Tracker. + // 2. Completion early open: Opens completed sstables when compaction results in multiple sstables. + // The BF memory usage is tracked via Tracker. + // 3. A new sstable is first created here if early-open is not enabled. + tableMetrics = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB() <= 0 ? ColumnFamilyStore.metricsForIfPresent(metadata.id) : null; + if (tableMetrics != null && bf != null) + tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.getAndAdd(bf.offHeapSize()); } public long append(DecoratedKey key, AbstractRowIndexEntry indexEntry) throws IOException @@ -371,6 +391,8 @@ protected Throwable doAbort(Throwable accumulate) @Override protected Throwable doPostCleanup(Throwable accumulate) { + if (tableMetrics != null && bf != null) + tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.getAndAdd(-bf.offHeapSize()); return Throwables.close(accumulate, bf, partitionIndex, rowIndexWriter, partitionIndexWriter); } } diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 67708a99a64a..5a9a41f950cf 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -263,6 +264,8 @@ public String asCQLString() public final Gauge meanPartitionSize; /** False positive ratio of bloom filter */ public final Gauge bloomFilterFalseRatio; + + public final AtomicLong inFlightBloomFilterOffHeapMemoryUsed = new AtomicLong(0); /** Off heap memory used by compression meta data*/ public final Gauge compressionMetadataOffHeapMemoryUsed; @@ -456,19 +459,19 @@ public Double getValue() public static final Gauge globalBytesRepaired = Metrics.register(GLOBAL_FACTORY.createMetricName("BytesRepaired"), () -> totalNonSystemTablesSize(SSTableReader::isRepaired).left); - public static final Gauge globalBytesUnrepaired = + public static final Gauge globalBytesUnrepaired = Metrics.register(GLOBAL_FACTORY.createMetricName("BytesUnrepaired"), () -> totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left); - public static final Gauge globalBytesPendingRepair = + public static final Gauge globalBytesPendingRepair = Metrics.register(GLOBAL_FACTORY.createMetricName("BytesPendingRepair"), () -> totalNonSystemTablesSize(SSTableReader::isPendingRepair).left); public final Meter readRepairRequests; public final Meter shortReadProtectionRequests; - + public final Meter replicaFilteringProtectionRequests; - + /** * This histogram records the maximum number of rows {@link org.apache.cassandra.service.reads.ReplicaFilteringProtection} * caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of @@ -611,20 +614,20 @@ public String toString(ByteBuffer value) samplers.put(SamplerType.READ_TOMBSTONE_COUNT, topReadPartitionTombstoneCount); samplers.put(SamplerType.READ_SSTABLE_COUNT, topReadPartitionSSTableCount); - memtableColumnsCount = createTableGauge("MemtableColumnsCount", + memtableColumnsCount = createTableGauge("MemtableColumnsCount", () -> cfs.getTracker().getView().getCurrentMemtable().operationCount()); // MemtableOnHeapSize naming deprecated in 4.0 - memtableOnHeapDataSize = createTableGaugeWithDeprecation("MemtableOnHeapDataSize", "MemtableOnHeapSize", + memtableOnHeapDataSize = createTableGaugeWithDeprecation("MemtableOnHeapDataSize", "MemtableOnHeapSize", () -> Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOnHeap, new GlobalTableGauge("MemtableOnHeapDataSize")); // MemtableOffHeapSize naming deprecated in 4.0 - memtableOffHeapDataSize = createTableGaugeWithDeprecation("MemtableOffHeapDataSize", "MemtableOffHeapSize", + memtableOffHeapDataSize = createTableGaugeWithDeprecation("MemtableOffHeapDataSize", "MemtableOffHeapSize", () -> Memtable.getMemoryUsage(cfs.getTracker().getView().getCurrentMemtable()).ownsOffHeap, new GlobalTableGauge("MemtableOnHeapDataSize")); - - memtableLiveDataSize = createTableGauge("MemtableLiveDataSize", + + memtableLiveDataSize = createTableGauge("MemtableLiveDataSize", () -> cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize()); // AllMemtablesHeapSize naming deprecated in 4.0 @@ -706,7 +709,7 @@ public Long loadValue() }; estimatedColumnCountHistogram = createTableGauge("EstimatedColumnCountHistogram", "EstimatedColumnCountHistogram", - () -> combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL), + () -> combineHistograms(cfs.getSSTables(SSTableSet.CANONICAL), SSTableReader::getEstimatedCellPerPartitionCount), null); estimatedRowCount = createTableGauge("EstimatedRowCount", "EstimatedRowCount", new CachedGauge<>(1, TimeUnit.SECONDS) @@ -743,7 +746,7 @@ public Long loadValue() return sstableRows + memtableRows; } }, null); - + sstablesPerReadHistogram = createTableHistogram("SSTablesPerReadHistogram", cfs.getKeyspaceMetrics().sstablesPerReadHistogram, true); sstablesPerRangeReadHistogram = createTableHistogram("SSTablesPerRangeReadHistogram", cfs.getKeyspaceMetrics().sstablesPerRangeReadHistogram, true); sstablePartitionReadLatency = ExpMovingAverage.decayBy100(); diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java index 89fb0a6f2a85..1275de356bba 100644 --- a/src/java/org/apache/cassandra/utils/FilterFactory.java +++ b/src/java/org/apache/cassandra/utils/FilterFactory.java @@ -19,6 +19,7 @@ import java.io.IOException; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +77,7 @@ public static IFilter getFilter(long numElements, int targetBucketsPerElem, Memo * Asserts that the given probability can be satisfied using this * filter. */ + @VisibleForTesting public static IFilter getFilter(long numElements, double maxFalsePosProbability) { return getFilter(numElements, maxFalsePosProbability, BloomFilter.memoryLimiter); diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java index c2ee854f20ee..123a00beb306 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@ -23,7 +23,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.LongPredicate; @@ -36,6 +38,9 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.sstable.filter.BloomFilterMetrics; +import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; @@ -50,6 +55,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; +import org.awaitility.Awaitility; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; @@ -58,6 +64,7 @@ import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; @@ -203,6 +210,65 @@ public void testGetFullyExpiredSSTables() assertEquals(1, expired.size()); } + @Test + @BMRule(name = "DelayCompaction", + targetClass = "CompactionTask$CompactionOperation", + targetMethod = "maybeStopOrUpdateState", + action = "java.lang.Thread.sleep(1000L);") + public void testInFlightBloomFilterMemory() + { + int earlyOpen = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB(); + try + { + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(0); + + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + for (int s = 0; s < 3; ++s) + { + for (int k = 0; k < 5; k++) + writeRows(cfs, Util.dk("k" + k), k, k + 1); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + } + + TableMetrics tableMetrics = ColumnFamilyStore.metricsForIfPresent(cfs.metadata.id); + long bloomFilterMemoryBeforeCompaction = BloomFilterMetrics.instance.bloomFilterOffHeapMemoryUsed.getTableGauge(cfs).getValue(); + assertNotEquals(0, bloomFilterMemoryBeforeCompaction); + assertEquals(0, tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get()); + + // Submit a compaction where all tombstones are expired to make compactor read memtables. + Future future = Executors.newCachedThreadPool().submit(() -> { + cfs.forceMajorCompaction(); + }); + + Awaitility.await("In flight bloom filter is tracked") + .pollInterval(1, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(() -> + { + // in-flight bf memory is included + return tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get() != 0 && + bloomFilterMemoryBeforeCompaction < BloomFilterMetrics.instance.bloomFilterOffHeapMemoryUsed.getTableGauge(cfs).getValue() && + // in-flight sstable is not part of tracker + 3 == cfs.getLiveSSTables().size(); + }); + + // Compaction must have succeeded. + FBUtilities.waitOnFuture(future); + assertEquals(1, cfs.getLiveSSTables().size()); + + // In flight bloom filter is clear + assertEquals(0, tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get()); + } + finally + { + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(earlyOpen); + } + } + @Test @BMRules(rules = { @BMRule(name = "Pause compaction", From 8f1273ce6dc250602b9728f245784ff3386599de Mon Sep 17 00:00:00 2001 From: Daniel Jatnieks Date: Wed, 27 Aug 2025 11:58:40 -0500 Subject: [PATCH 2/2] Add bloomFilterOffHeapMemoryUsedAwareCFSGaugeProvider and use with BloomFilterMetrics.bloomFilterOffHeapMemoryUsed --- .../io/sstable/AbstractMetricsProviders.java | 21 +++++++++++++++++++ .../io/sstable/filter/BloomFilterMetrics.java | 5 +---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractMetricsProviders.java b/src/java/org/apache/cassandra/io/sstable/AbstractMetricsProviders.java index 15b1161cf9ba..8897d4364394 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractMetricsProviders.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractMetricsProviders.java @@ -21,7 +21,9 @@ import java.util.function.BiFunction; import java.util.function.Function; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.metrics.TableMetrics; public abstract class AbstractMetricsProviders implements MetricsProviders { @@ -40,5 +42,24 @@ protected final GaugeProvider newGaugeProvider(String name }); } + protected final GaugeProvider bloomFilterOffHeapMemoryUsedAwareCFSGaugeProvider(String name, Long neutralValue, Function extractor, BiFunction combiner) + { + return new SimpleGaugeProvider<>(this::map, name, readers -> { + Long total = neutralValue; + + // Optionally add in-flight bloom filter off-heap memory used + if (readers.iterator().hasNext()) { + R firstReader = readers.iterator().next(); + TableMetrics tableMetrics = ColumnFamilyStore.metricsForIfPresent(firstReader.metadata().id); + if (tableMetrics != null) + total = tableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get(); + } + + for (R reader : readers) + total = combiner.apply(total, extractor.apply(reader)); + return total; + }); + } + protected abstract R map(SSTableReader r); } diff --git a/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java b/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java index f96abd0ffec6..007f67298f83 100644 --- a/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java +++ b/src/java/org/apache/cassandra/io/sstable/filter/BloomFilterMetrics.java @@ -65,10 +65,7 @@ protected R map(SSTableReader r) /** * Off heap memory used by bloom filter */ - public final GaugeProvider bloomFilterOffHeapMemoryUsed = newGaugeProvider("BloomFilterOffHeapMemoryUsed", - // FIXME: CNDB-15213 - // Seems to correspond to TableMetrics.bloomFilterOffHeapMemoryUsed.getLong() in main - // where we want TableMetrics.inFlightBloomFilterOffHeapMemoryUsed.get() as the initial value; + public final GaugeProvider bloomFilterOffHeapMemoryUsed = bloomFilterOffHeapMemoryUsedAwareCFSGaugeProvider("BloomFilterOffHeapMemoryUsed", 0L, SSTableReaderWithFilter::getFilterOffHeapSize, Long::sum);