diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index b3ea93bf04e8..cc13aaac8fbe 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -215,6 +215,13 @@ public enum CassandraRelevantProperties */ COMPACTION_SKIP_REPAIR_STATE_CHECKING("cassandra.compaction.skip_repair_state_checking", "false"), + /** + * Compaction validation mode: determines whether compaction validation is skipped entirely, logs a warning on potential data loss, or aborts the compaction on potential data loss. + * + * Available options: NONE, WARN, ABORT. Default is NONE. + */ + COMPACTION_VALIDATION_MODE("cassandra.compaction_validation_mode", "NONE"), + /** * This property indicates the location for the access file. If com.sun.management.jmxremote.authenticate is false, * then this property and the password and access files, are ignored. Otherwise, the access file must exist and diff --git a/src/java/org/apache/cassandra/db/compaction/validation/CompactionValidationMetrics.java b/src/java/org/apache/cassandra/db/compaction/validation/CompactionValidationMetrics.java new file mode 100644 index 000000000000..338974b0f995 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/validation/CompactionValidationMetrics.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction.validation; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import org.apache.cassandra.metrics.MicrometerMetrics; + +/// Metrics for tracking compaction validation operations and results.
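+/// Exposes four Micrometer counters: validations run, validations that found no absent boundary keys, absent boundary keys detected in the output, and potential data-loss events.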
+public class CompactionValidationMetrics extends MicrometerMetrics +{ + public static final CompactionValidationMetrics INSTANCE = new CompactionValidationMetrics(); + + public Counter validationCount; + public Counter validationWithoutAbsentKeys; + public Counter absentKeys; + public Counter potentialDataLosses; + + public CompactionValidationMetrics() + { + initializeMetrics(); + } + + @Override + public synchronized void register(MeterRegistry newRegistry, Tags newTags) + { + super.register(newRegistry, newTags); + initializeMetrics(); + } + + private void initializeMetrics() + { + this.validationCount = registryWithTags().left.counter("compaction_validation_total", registryWithTags().right); + this.validationWithoutAbsentKeys = registryWithTags().left.counter("compaction_validation_without_absent_keys_total", registryWithTags().right); + this.absentKeys = registryWithTags().left.counter("compaction_validation_absent_keys_count_from_output_total", registryWithTags().right); + this.potentialDataLosses = registryWithTags().left.counter("compaction_validation_potential_data_loss_total", registryWithTags().right); + } + + public void incrementValidation() + { + validationCount.increment(); + } + + public void incrementPotentialDataLosses() + { + potentialDataLosses.increment(); + } + + public void incrementValidationWithoutAbsentKeys() + { + validationWithoutAbsentKeys.increment(); + } + + public void incrementAbsentKeys(int keys) + { + absentKeys.increment(keys); + } +} diff --git a/src/java/org/apache/cassandra/db/compaction/validation/CompactionValidationTask.java b/src/java/org/apache/cassandra/db/compaction/validation/CompactionValidationTask.java new file mode 100644 index 000000000000..def90c5ca6b4 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/validation/CompactionValidationTask.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.compaction.validation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.io.sstable.SSTableReadsListener; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.TimeUUID; + +/// Validates compaction tasks to detect potential data loss caused by compaction skipping some +/// subranges of the source sstables, see HCD-130. +/// The validation ensures all boundary keys from input SSTables are either present in output SSTables +/// or properly obsoleted by tombstones. +public class CompactionValidationTask +{ + private static final Logger logger = LoggerFactory.getLogger(CompactionValidationTask.class); + + public enum Mode + { + NONE, + WARN, + ABORT; + + public boolean shouldValidate() + { + return this != NONE; + } + + public boolean shouldAbortOnDataLoss() + { + return this == ABORT; + } + + public static Mode parseConfig() + { + String rawConfig = null; + try + { + rawConfig = CassandraRelevantProperties.COMPACTION_VALIDATION_MODE.getString(); + return Mode.valueOf(rawConfig); + } + catch (IllegalArgumentException e) + { + logger.error("Unable to parse compaction validation config '{}', falling back to NONE", rawConfig, e); + return NONE; + } + } + } + + private final TimeUUID id; + private final Set<SSTableReader> inputSSTables; + private final Set<SSTableReader> outputSSTables; + private final CompactionValidationMetrics metrics; + + private final long nowInSec; + private final Mode mode; + + public CompactionValidationTask(TimeUUID id, Set<SSTableReader> inputSSTables, Set<SSTableReader> outputSSTables, CompactionValidationMetrics metrics) + { + this.id = id; + this.inputSSTables = inputSSTables; + this.outputSSTables = outputSSTables; + this.nowInSec = FBUtilities.nowInSeconds(); + this.metrics = metrics; + this.mode = Mode.parseConfig(); + } + + public void validate() + { + if (!mode.shouldValidate()) + return; + + try + { + doValidate(); + } + catch (DataLossException e) + { + // abort compaction task + throw e; + } + catch (Throwable t) + { + logger.error("Caught unexpected error on validation task for {}: {}", id, t.getMessage(), t); + } + } + + private void doValidate() + { + logger.info("Starting compaction validation for task {}", id); + long startedNanos = System.nanoTime(); + metrics.incrementValidation(); + + Set<DecoratedKey> absentKeys = new HashSet<>(); + for (SSTableReader inputSSTable : inputSSTables) + { + DecoratedKey firstKey = inputSSTable.first; + DecoratedKey lastKey = inputSSTable.last; + + if (isKeyAbsentInOutputSSTables(firstKey)) + { + if (logger.isTraceEnabled()) + logger.trace("[Task {}] First key {} from input sstable {} not found in update sstables", + id, firstKey, inputSSTable.descriptor); + + absentKeys.add(firstKey); + } + + if (isKeyAbsentInOutputSSTables(lastKey)) + { + if (logger.isTraceEnabled()) + logger.trace("[Task {}] Last key {} from input sstable {} not found in update sstables", + id, lastKey,
inputSSTable.descriptor); + + absentKeys.add(lastKey); + } + } + + if (absentKeys.isEmpty()) + { + metrics.incrementValidationWithoutAbsentKeys(); + logger.info("[Task {}] Compaction validation passed: all first/last keys found in update sstables, took {} ms", + id, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos)); + return; + } + + metrics.incrementAbsentKeys(absentKeys.size()); + if (validateAbsentKeysAgainstTombstones(absentKeys)) + logger.info("[Task {}] Compaction validation passed: all absent keys are properly obsoleted due to tombstones, took {} ms", + id, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos)); + } + + private boolean isKeyAbsentInOutputSSTables(DecoratedKey key) + { + for (SSTableReader outputSSTable : outputSSTables) + { + if (outputSSTable.first.compareTo(key) <= 0 && outputSSTable.last.compareTo(key) >= 0) + { + if (outputSSTable.getPosition(key, SSTableReader.Operator.EQ) >= 0) + { + return false; + } + } + } + return true; + } + + private boolean validateAbsentKeysAgainstTombstones(Set<DecoratedKey> absentKeys) + { + logger.info("[Task {}] Validating {} absent keys against tombstones from input sstables", id, absentKeys.size()); + + for (DecoratedKey absentKey : absentKeys) + { + if (!isFullyExpired(absentKey)) + { + metrics.incrementPotentialDataLosses(); + String errorMsg = String.format( + "POTENTIAL DATA LOSS on compaction task %s: Key %s from input sstables not found in update sstables " + + "and the partition is not fully expired.", id, absentKey); + logger.error(errorMsg); + if (mode.shouldAbortOnDataLoss()) + throw new DataLossException(errorMsg); + + return false; + } + } + return true; + } + + private boolean isFullyExpired(DecoratedKey key) + { + List<UnfilteredRowIterator> iterators = new ArrayList<>(); + for (SSTableReader sstable : inputSSTables) + { + if (sstable.mayContainAssumingKeyIsInRange(key)) + iterators.add(readPartition(key, sstable)); + } + + // merge all input iterators + try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators)) + { + // apply the purging function to get rid of all tombstones + RowIterator purged = UnfilteredRowIterators.filter(merged, nowInSec); + // if there is non-purgeable content, e.g. live rows or unexpired tombstones, it should appear in the output sstables + if (purged.staticRow() != null && !purged.staticRow().isEmpty()) + return false; + if (purged.hasNext()) + return false; + } + + return true; + } + + private UnfilteredRowIterator readPartition(DecoratedKey partitionKey, SSTableReader sstable) + { + return sstable.rowIterator(partitionKey, Slices.ALL, ColumnFilter.all(sstable.metadata()), false, SSTableReadsListener.NOOP_LISTENER); + } + + public static class DataLossException extends RuntimeException + { + public DataLossException(String errorMsg) + { + super(errorMsg); + } + } +} diff --git a/src/java/org/apache/cassandra/db/lifecycle/AbstractLogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/AbstractLogTransaction.java index c2fc7e627883..1fccf1c75a8d 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/AbstractLogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/AbstractLogTransaction.java @@ -18,12 +18,17 @@ package org.apache.cassandra.db.lifecycle; import java.util.List; +import java.util.Set; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.compaction.validation.CompactionValidationMetrics; +import org.apache.cassandra.db.compaction.validation.CompactionValidationTask; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.concurrent.Transactional; + +import static org.apache.cassandra.db.compaction.OperationType.COMPACTION; + /** * A class that tracks sstable files involved in a transaction across sstables: * if the transaction succeeds the old files should be deleted and the new ones kept; @@ -40,6 +45,26 @@ public abstract Throwable prepareForObsoletion(Iterable<SSTableReader> readers, Tracker tracker, Throwable accumulate); + /** + * Perform optional validation on the current transaction's input and output sstables. + * + * @param obsolete sstables to obsolete + * @param update sstables added to the system as replacements + */ + public void validate(Set<SSTableReader> obsolete, Set<SSTableReader> update) + { + // Only validate compaction tasks.
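+ // Other operation types (e.g. cleanup, which drops data outside the node's owned ranges) may legitimately remove boundary keys, so the check is restricted to compaction.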
+ if (opType() != COMPACTION) + return; + + // Nothing to verify if no obsolete SSTables + if (obsolete.isEmpty()) + return; + + CompactionValidationTask task = new CompactionValidationTask(id(), obsolete, update, CompactionValidationMetrics.INSTANCE); + task.validate(); + } + public static class Obsoletion { final SSTableReader reader; diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 43cc830263b9..b7a2456eabe0 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -34,6 +34,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Runnables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -238,6 +239,10 @@ public void doPrepare() // since those that are not original are early readers that share the same desc with the finals maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), log, obsoletions = new ArrayList<>(), tracker, null)); + // Use the original sstables instead of logged.obsolete, whose readers may have a shifted starting position due to early-open + Set<SSTableReader> obsolete = Sets.newHashSet(filterIn(originals, logged.obsolete)); + log.validate(obsolete, logged.update); + // This needs to be called after checkpoint and having prepared the obsoletions because it will upload the deletion // marks in CNDB log.prepareToCommit(); diff --git a/test/unit/org/apache/cassandra/db/compaction/validation/CompactionValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/validation/CompactionValidationTest.java new file mode 100644 index 000000000000..1120016673b2 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/validation/CompactionValidationTest.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.db.compaction.validation; + +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.inject.ActionBuilder; +import org.apache.cassandra.inject.Injection; +import org.apache.cassandra.inject.Injections; +import org.apache.cassandra.inject.InvokePointBuilder; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.FBUtilities; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; + +public class CompactionValidationTest extends CQLTester +{ + private static final String CREATE_TABLE_TEMPLATE = "CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int, primary key(pk, ck1, ck2))"; + + private static final Injection SIMULATE_NOT_FULLY_EXPIRED = Injections.newCustom("simulate_not_fully_expired") + .add(InvokePointBuilder.newInvokePoint().onClass("org.apache.cassandra.db.compaction.validation.CompactionValidationTask") + .onMethod("isFullyExpired")) + .add(ActionBuilder.newActionBuilder().actions().doReturn(false)) + .build(); + + @BeforeClass + public static void setupClass() + { + CQLTester.setUpClass(); + + DatabaseDescriptor.createAllDirectories(); + + requireNetwork(); + } + + @Before + public void setup() + { + CassandraRelevantProperties.COMPACTION_VALIDATION_MODE.reset(); + CassandraRelevantProperties.COMPACTION_VALIDATION_MODE.setString("WARN"); + } + + @After + public void removeInjections() + { + Injections.deleteAll(); + } + + @Test + public void testValidation() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE); + + populateSSTable(1, 10, 5, 5); + populateSSTable(13, 20, 5, 5); + populateSSTable(23, 30, 5, 5); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(3); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + + assertThat(cfs.getLiveSSTables()).hasSize(1); + assertSuccessfulValidationWithoutAbsentKeys(initial); + } + + @Test + public void testValidationWithRowTombstone() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE); + + populateSSTable(1, 1, 5, 5); + populateRowDeletionSSTable(1, 1, 5, 5); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(2); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + + assertThat(cfs.getLiveSSTables()).hasSize(1); + assertSuccessfulValidationWithoutAbsentKeys(initial); + } + + @Test + public void testValidationWithExpiredRowTombstone() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE + " WITH gc_grace_seconds = 1"); + + populateSSTable(0, 0, 4, 4); + populateSSTable(1, 1, 4, 4); + populateSSTable(2, 2, 4, 4); + populateRowDeletionSSTable(0, 1, 4, 4); // delete all rows in key 0 and key 1 + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(4); + + // sleep 3 seconds to pass gc_grace_seconds + FBUtilities.sleepQuietly(3000); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + assertThat(cfs.getLiveSSTables()).hasSize(1); + + assertSuccessfulValidationWithAbsentKeys(initial, 2); // key 0 and key 1 are removed + } 
+ + @Test + public void testValidationWithExpiredRowTombstoneWithStatic() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int, st int static, primary key(pk, ck1, ck2)) WITH gc_grace_seconds = 1"); + + populateSSTable(1, 1, 5, 5); + populateSSTable(2, 2, 5, 5); + execute("INSERT INTO %s (pk, st) VALUES (?, ?)", 2, 2); + flush(); + + populateRowDeletionSSTable(1, 1, 5, 5); // delete key 1 rows + populatePartitionDeletionSSTable(2, 2); // delete key 2 rows and static rows + flush(); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(5); + + // sleep 3 seconds to pass gc_grace_seconds + FBUtilities.sleepQuietly(3000); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + assertThat(cfs.getLiveSSTables()).isEmpty(); + + assertSuccessfulValidationWithAbsentKeys(initial, 2); // keys 1 and 2 are removed + } + + @Test + public void testValidationWithExpiredPartitionTombstone() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE + " WITH gc_grace_seconds = 1"); + + populateSSTable(0, 0, 5, 5); + populateSSTable(1, 1, 5, 5); + populateSSTable(2, 2, 5, 5); + populatePartitionDeletionSSTable(0, 1); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(4); + + // sleep 3 seconds to pass gc_grace_seconds + FBUtilities.sleepQuietly(3000); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + assertThat(cfs.getLiveSSTables()).hasSize(1); + + assertSuccessfulValidationWithAbsentKeys(initial, 2); // key 0 and key 1 are removed + } + + @Test + public void testValidationWithExpiredRangeTombstone() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE + " WITH gc_grace_seconds = 1"); + + populateSSTable(0, 0, 5, 5); + populateSSTable(1, 1, 5, 5); + populateSSTable(2, 2, 5, 5); + populateRangeDeletionSSTable(0, 1, 5); // populate 5 range deletions for keys 0 and 1 + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(4); + + // sleep 3 seconds to pass gc_grace_seconds + FBUtilities.sleepQuietly(3000); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + assertThat(cfs.getLiveSSTables()).hasSize(1); + + assertSuccessfulValidationWithAbsentKeys(initial, 2); // key 0 and key 1 are removed + } + + @Test + public void testValidationWithTTLRowTombstone() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE + " WITH default_time_to_live = 1"); + + populateSSTable(1, 1, 5, 5); + populateSSTable(2, 2, 5, 5); + populateSSTableWithTTL(3, 3, 4, 4, 100); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(3); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + + assertThat(cfs.getLiveSSTables()).hasSize(1); + assertSuccessfulValidationWithoutAbsentKeys(initial); + } + + @Test + public void testValidationWithExpiredTTLRowTombstone() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE + " WITH default_time_to_live = 1 and gc_grace_seconds = 1"); + + populateSSTable(1, 3, 5, 5); + populateSSTableWithTTL(4, 4, 5, 5, 1000); // a TTL of 1000 seconds does not expire during the test + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(2); + + // sleep 4 seconds to pass ttl and gc_grace_seconds + FBUtilities.sleepQuietly(4000); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + + assertThat(cfs.getLiveSSTables()).hasSize(1); + assertSuccessfulValidationWithAbsentKeys(initial, 2); // the 2 boundary keys from the first sstable are absent + } + + @Test + public void testValidationWithoutOutputSSTable() throws Throwable + { + createTable(CREATE_TABLE_TEMPLATE + " WITH gc_grace_seconds = 1"); + + populateSSTable(1, 1, 5, 5); + populateSSTable(2, 2, 5, 5); + populateRowDeletionSSTable(1, 2, 5, 5); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + assertThat(cfs.getLiveSSTables()).hasSize(3); + + // sleep 3 seconds to pass gc_grace_seconds + FBUtilities.sleepQuietly(3_000); + + Stats initial = Stats.fetch(); + + cfs.forceMajorCompaction(); + + // all sstables are removed + assertThat(cfs.getLiveSSTables()).hasSize(0); + assertSuccessfulValidationWithAbsentKeys(initial, 2); + } + + @Test + public void testAbortOnDataLoss() throws Throwable + { + CassandraRelevantProperties.COMPACTION_VALIDATION_MODE.setString("ABORT"); + Injections.inject(SIMULATE_NOT_FULLY_EXPIRED); + + createTable(CREATE_TABLE_TEMPLATE + " WITH gc_grace_seconds = 1"); + + populateSSTable(1, 1, 5, 5); + populateRowDeletionSSTable(1, 1, 5, 5); + FBUtilities.sleepQuietly(2000); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + Set<SSTableReader> before = cfs.getLiveSSTables(); + assertThat(before).hasSize(2); + + Stats initial = Stats.fetch(); + + assertThatThrownBy(() -> cfs.forceMajorCompaction()).hasMessageContaining("POTENTIAL DATA LOSS"); + + assertThat(cfs.getLiveSSTables()).hasSize(2).isEqualTo(before); + assertDataLossWithAbsentKeys(initial, 1); // key 1 missing + } + + private void populateSSTable(int startPartition, int endPartition, int ck1PerPartition, int ck2PerClustering) throws Throwable + { + for (int partition = startPartition; partition <= endPartition; partition++) + { + for (int ck1 = 0; ck1 < ck1PerPartition; ck1++) + { + for (int ck2 = 0; ck2 < ck2PerClustering; ck2++) + { + execute("INSERT INTO %s (pk, ck1, ck2, v1, v2) VALUES (?, ?, ?, ?, ?)", partition, ck1, ck2, 0, 0); + } + } + } + flush(); + } + + private void populateSSTableWithTTL(int startPartition, int endPartition, int ck1PerPartition, int ck2PerClustering, int ttl) throws Throwable + { + for (int partition = startPartition; partition <= endPartition; partition++) + { + for (int ck1 = 0; ck1 < ck1PerPartition; ck1++) + { + for (int ck2 = 0; ck2 < ck2PerClustering; ck2++) + { + execute("INSERT INTO %s (pk, ck1, ck2, v1, v2) VALUES (?, ?, ?, ?, ?) USING TTL ?", partition, ck1, ck2, 0, 0, ttl); + } + } + } + flush(); + } + + private void populateRowDeletionSSTable(int startKey, int endKey, int ck1PerPartition, int ck2PerClustering) throws Throwable + { + for (int partition = startKey; partition <= endKey; partition++) + { + for (int ck1 = 0; ck1 < ck1PerPartition; ck1++) + { + for (int ck2 = 0; ck2 < ck2PerClustering; ck2++) + { + execute("DELETE FROM %s where pk = ? and ck1 = ? and ck2 = ?", partition, ck1, ck2); + } + } + } + flush(); + } + + private void populatePartitionDeletionSSTable(int startKey, int endKey) throws Throwable + { + for (int partition = startKey; partition <= endKey; partition++) + { + execute("DELETE FROM %s where pk = ?", partition); + } + flush(); + } + + private void populateRangeDeletionSSTable(int startKey, int endKey, int ck1PerPartition) throws Throwable + { + for (int partition = startKey; partition <= endKey; partition++) + { + for (int ck1 = 0; ck1 < ck1PerPartition; ck1++) + { + execute("DELETE FROM %s where pk = ? 
and ck1 = ?", partition, ck1); + } + } + flush(); + } + + private void assertSuccessfulValidationWithAbsentKeys(Stats initialStats, int absentKeys) + { + initialStats.assertStats(1, 0, absentKeys, 0); + } + + private void assertSuccessfulValidationWithoutAbsentKeys(Stats initialStats) + { + initialStats.assertStats(1, 1, 0, 0); + } + + private void assertDataLossWithAbsentKeys(Stats initialStats, int absentKeys) + { + initialStats.assertStats(1, 0, absentKeys, 1); + } + + private static class Stats + { + private final int validations; + private final int validationsWithoutAbsentKeys; + private final int absentKeys; + private final int potentialDataLosses; + + private Stats(int validations, int validationsWithoutAbsentKeys, int absentKeys, int potentialDataLosses) + { + this.validations = validations; + this.validationsWithoutAbsentKeys = validationsWithoutAbsentKeys; + this.absentKeys = absentKeys; + this.potentialDataLosses = potentialDataLosses; + } + + public static Stats fetch() + { + CompactionValidationMetrics metrics = CompactionValidationMetrics.INSTANCE; + return new Stats((int) metrics.validationCount.count(), + (int) metrics.validationWithoutAbsentKeys.count(), + (int) metrics.absentKeys.count(), + (int) metrics.potentialDataLosses.count()); + } + + public void assertStats(int validationDiff, int validationsWithoutAbsentKeysDiff, int absentKeysDiff, int potentialDataLossesDiff) + { + Stats current = Stats.fetch(); + assertEquals(this.validations + validationDiff, current.validations); + assertEquals(this.validationsWithoutAbsentKeys + validationsWithoutAbsentKeysDiff, current.validationsWithoutAbsentKeys); + assertEquals(this.absentKeys + absentKeysDiff, current.absentKeys); + assertEquals(this.potentialDataLosses + potentialDataLossesDiff, current.potentialDataLosses); + } + } +}
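Usage note: the validation mode is read from the system property added at the top of this patch. A minimal sketch of enabling it, using only the property and accessors that appear in the diff above:

    // Equivalent to starting the node with -Dcassandra.compaction_validation_mode=ABORT
    CassandraRelevantProperties.COMPACTION_VALIDATION_MODE.setString("ABORT");
    assert CompactionValidationTask.Mode.parseConfig() == CompactionValidationTask.Mode.ABORT;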