2929import org .apache .cassandra .db .ColumnFamilyStore ;
3030import org .apache .cassandra .distributed .shared .ClusterUtils ;
3131import org .apache .cassandra .utils .concurrent .Condition ;
32+
3233import org .junit .AfterClass ;
3334import org .junit .Assert ;
34- import org .junit .BeforeClass ;
3535import org .junit .Test ;
36+ import org .junit .runner .RunWith ;
37+ import org .junit .runners .Parameterized ;
3638
3739import org .apache .cassandra .distributed .Cluster ;
3840import org .apache .cassandra .distributed .api .ICluster ;
3941import org .apache .cassandra .distributed .api .IInstanceConfig ;
4042import org .apache .cassandra .distributed .api .IInvokableInstance ;
43+ import org .apache .cassandra .schema .SchemaConstants ;
44+ import org .apache .cassandra .schema .SystemDistributedKeyspace ;
4145import org .apache .cassandra .service .StorageService ;
4246
4347import static com .google .common .collect .ImmutableList .of ;
48+
4449import static java .util .concurrent .TimeUnit .MINUTES ;
50+
4551import static org .apache .cassandra .distributed .api .Feature .GOSSIP ;
4652import static org .apache .cassandra .distributed .api .Feature .NETWORK ;
4753import static org .apache .cassandra .distributed .shared .AssertUtils .assertRows ;
5056import static org .apache .cassandra .utils .concurrent .Condition .newOneTimeCondition ;
5157import static org .apache .cassandra .utils .progress .ProgressEventType .COMPLETE ;
5258
59+ @ RunWith (Parameterized .class )
5360public class RepairTest extends TestBaseImpl
5461{
62+ private static boolean nodesHaveCDC ;
63+ private static boolean tableHasCDC ;
5564 private static ICluster <IInvokableInstance > cluster ;
5665
66+ @ Parameterized .Parameters (name = "nodesHaveCDC={0}, tableHasCDC={1}" )
67+ public static Iterable <Object []> data ()
68+ {
69+ return Arrays .asList (new Object [][] {{ false , false }, { false , true } , { true , false }, { true , true }});
70+ }
71+
5772 private static void insert (ICluster <IInvokableInstance > cluster , String keyspace , int start , int end , int ... nodes )
5873 {
5974 String insert = String .format ("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');" , keyspace );
@@ -85,7 +100,7 @@ private static void flush(ICluster<IInvokableInstance> cluster, String keyspace,
85100 ColumnFamilyStore .FlushReason .UNIT_TESTS )));
86101 }
87102
88- private static ICluster create (Consumer <IInstanceConfig > configModifier ) throws IOException
103+ private ICluster create (Consumer <IInstanceConfig > configModifier ) throws IOException
89104 {
90105 configModifier = configModifier .andThen (
91106 config -> config .set ("hinted_handoff_enabled" , false )
@@ -98,6 +113,15 @@ private static ICluster create(Consumer<IInstanceConfig> configModifier) throws
98113
99114 static void repair (ICluster <IInvokableInstance > cluster , String keyspace , Map <String , String > options )
100115 {
116+ long [] startPositions = new long [cluster .size ()];
117+ for (int i = 1 ; i <= cluster .size (); i ++)
118+ {
119+ IInvokableInstance node = cluster .get (i );
120+ if (node .isShutdown ())
121+ continue ;
122+ startPositions [i - 1 ] = node .logs ().mark ();
123+ }
124+
101125 cluster .get (1 ).runOnInstance (rethrow (() -> {
102126 Condition await = newOneTimeCondition ();
103127 instance .repair (keyspace , options , of ((tag , event ) -> {
@@ -106,14 +130,25 @@ static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<St
106130 })).right .get ();
107131 await .await (1L , MINUTES );
108132 }));
133+
134+ for (int i = 1 ; i <= cluster .size (); i ++)
135+ {
136+ IInvokableInstance node = cluster .get (i );
137+ if (node .isShutdown ())
138+ continue ;
139+ Assert .assertEquals ("We should use the local write path (which requires flushing) if CDC is enabled on both a node and table-level" ,
140+ nodesHaveCDC && tableHasCDC ,
141+ !node .logs ().grep (startPositions [i - 1 ],
142+ "Enqueuing flush of test \\ (STREAMS_RECEIVED\\ )" ).getResult ().isEmpty ());
143+ }
109144 }
110145
111- static void populate (ICluster <IInvokableInstance > cluster , String keyspace , String compression ) throws Exception
146+ void populate (ICluster <IInvokableInstance > cluster , String keyspace , String compression ) throws Exception
112147 {
113148 try
114149 {
115150 cluster .schemaChange (String .format ("DROP TABLE IF EXISTS %s.test;" , keyspace ));
116- cluster .schemaChange (String .format ("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s" , keyspace , compression ));
151+ cluster .schemaChange (String .format ("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s AND cdc = %s; " , keyspace , compression , tableHasCDC ));
117152
118153 insert (cluster , keyspace , 0 , 1000 , 1 , 2 , 3 );
119154 flush (cluster , keyspace , 1 );
@@ -147,10 +182,21 @@ void shutDownNodesAndForceRepair(ICluster<IInvokableInstance> cluster, String ke
147182 repair (cluster , keyspace , ImmutableMap .of ("forceRepair" , "true" ));
148183 }
149184
150- @ BeforeClass
151- public static void setupCluster () throws IOException
185+ public RepairTest (boolean nodesHaveCDC , boolean tableHasCDC ) throws Exception
152186 {
153- cluster = create (config -> {});
187+ // This runs per method, but we only want to rebuild the cluster if nodesHaveCDC has changed since the last
188+ // build and we need to update the configuration accordingly
189+ if (cluster != null && RepairTest .nodesHaveCDC != nodesHaveCDC )
190+ {
191+ cluster .close ();
192+ cluster = null ;
193+ }
194+
195+ if (cluster == null )
196+ cluster = create (config -> config .set ("cdc_enabled" , nodesHaveCDC ));
197+
198+ RepairTest .nodesHaveCDC = nodesHaveCDC ;
199+ RepairTest .tableHasCDC = tableHasCDC ;
154200 }
155201
156202 @ AfterClass
@@ -203,7 +249,12 @@ public void testForcedNormalRepairWithOneNodeDown() throws Exception
203249 String forceRepairKeyspace = "test_force_repair_keyspace" ;
204250 int rf = 2 ;
205251 int tokenCount = ClusterUtils .getTokenCount (cluster .get (1 ));
206- cluster .schemaChange ("CREATE KEYSPACE " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};" );
252+
253+ cluster .schemaChange ("CREATE KEYSPACE IF NOT EXISTS " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};" );
254+
255+ // Truncate distributed repair keyspace due to test class parameterization. We only want results
256+ // from our run
257+ cluster .schemaChange ("TRUNCATE TABLE " + SchemaConstants .DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace .PARENT_REPAIR_HISTORY );
207258
208259 try
209260 {
0 commit comments