Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
Expand Down Expand Up @@ -177,7 +178,7 @@ private boolean hasViews(ColumnFamilyStore cfs)

private boolean hasCDC(ColumnFamilyStore cfs)
{
return cfs.metadata().params.cdc;
return DatabaseDescriptor.isCDCEnabled() && cfs.metadata().params.cdc;
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
import com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.apache.cassandra.utils.progress.ProgressEventType;
Expand All @@ -46,10 +49,19 @@
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;

@RunWith(Parameterized.class)
public class RepairTest extends TestBaseImpl
{
private static boolean nodesHaveCDC;
private static boolean tableHasCDC;
private static ICluster<IInvokableInstance> cluster;

@Parameterized.Parameters(name = "nodesHaveCDC={0}, tableHasCDC={1}")
public static Iterable<Object[]> data()
{
return Arrays.asList(new Object[][] {{ false, false }, { false, true } , { true, false }, { true, true }});
}

private static void insert(ICluster<IInvokableInstance> cluster, String keyspace, int start, int end, int ... nodes)
{
String insert = String.format("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');", keyspace);
Expand Down Expand Up @@ -80,7 +92,7 @@ private static void flush(ICluster<IInvokableInstance> cluster, String keyspace,
cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(keyspace)));
}

private static ICluster create(Consumer<IInstanceConfig> configModifier) throws IOException
private ICluster create(Consumer<IInstanceConfig> configModifier) throws IOException
{
configModifier = configModifier.andThen(
config -> config.set("hinted_handoff_enabled", false)
Expand All @@ -94,6 +106,15 @@ private static ICluster create(Consumer<IInstanceConfig> configModifier) throws

static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options)
{
long[] startPositions = new long[cluster.size()];
for (int i = 1; i <= cluster.size(); i++)
{
IInvokableInstance node = cluster.get(i);
if (node.isShutdown())
continue;
startPositions[i - 1] = node.logs().mark();
}

cluster.get(1).runOnInstance(rethrow(() -> {
SimpleCondition await = new SimpleCondition();
StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> {
Expand All @@ -102,14 +123,25 @@ static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<St
})).right.get();
await.await(1L, MINUTES);
}));

for (int i = 1; i <= cluster.size(); i++)
{
IInvokableInstance node = cluster.get(i);
if (node.isShutdown())
continue;
Assert.assertEquals("We should use the local write path (which requires flushing) if CDC is enabled on both a node and table-level",
nodesHaveCDC && tableHasCDC,
!node.logs().grep(startPositions[i - 1],
"Enqueuing flush of test \\(STREAMS_RECEIVED\\)").getResult().isEmpty());
}
}

static void populate(ICluster<IInvokableInstance> cluster, String keyspace, String compression) throws Exception
void populate(ICluster<IInvokableInstance> cluster, String keyspace, String compression) throws Exception
{
try
{
cluster.schemaChange(String.format("DROP TABLE IF EXISTS %s.test;", keyspace));
cluster.schemaChange(String.format("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s", keyspace, compression));
cluster.schemaChange(String.format("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s AND cdc = %s;", keyspace, compression, tableHasCDC));

insert(cluster, keyspace, 0, 1000, 1, 2, 3);
flush(cluster, keyspace, 1);
Expand Down Expand Up @@ -143,10 +175,21 @@ void shutDownNodesAndForceRepair(ICluster<IInvokableInstance> cluster, String ke
repair(cluster, keyspace, ImmutableMap.of("forceRepair", "true"));
}

@BeforeClass
public static void setupCluster() throws IOException
public RepairTest(boolean nodesHaveCDC, boolean tableHasCDC) throws Exception
{
cluster = create(config -> {});
// This runs per method, but we only want to rebuild the cluster if nodesHaveCDC has changed since the last
// build and we need to update the configuration accordingly
if (cluster != null && RepairTest.nodesHaveCDC != nodesHaveCDC)
{
cluster.close();
cluster = null;
}

if (cluster == null)
cluster = create(config -> config.set("cdc_enabled", nodesHaveCDC));

RepairTest.nodesHaveCDC = nodesHaveCDC;
RepairTest.tableHasCDC = tableHasCDC;
}

@AfterClass
Expand Down Expand Up @@ -198,7 +241,11 @@ public void testForcedNormalRepairWithOneNodeDown() throws Exception
// The test uses its own keyspace with rf == 2
String forceRepairKeyspace = "test_force_repair_keyspace";
int rf = 2;
cluster.schemaChange("CREATE KEYSPACE " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");
cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");

// Truncate distributed repair keyspace due to test class parameterization. We only want results
// from our run
cluster.schemaChange("TRUNCATE TABLE " + SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.PARENT_REPAIR_HISTORY);

try
{
Expand Down