Skip to content

Commit d0f0265

Browse files
authored
HCD-84 Feature flag to skip Ford Fulkerson (#1612)
The Ford Fulkerson optimization may take too long in some configs ### What is the issue Some configs make the FF computation take too long ### What does this PR fix and why was it fixed This PR adds a feature flag so you can workaround it
1 parent 3f00247 commit d0f0265

File tree

3 files changed

+97
-19
lines changed

3 files changed

+97
-19
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,14 @@ public enum CassandraRelevantProperties
589589
* Which compression algorithm to use for SSTable compression when not specified explicitly in the sstable options.
590590
* Can be "fast", which selects {@link LZ4Compressor}, or "adaptive" which selects {@link AdaptiveCompressor}.
591591
*/
592-
DEFAULT_SSTABLE_COMPRESSION("cassandra.default_sstable_compression", "fast");
592+
DEFAULT_SSTABLE_COMPRESSION("cassandra.default_sstable_compression", "fast"),
593+
594+
/**
595+
* Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs specially
596+
* with vnodes.
597+
*/
598+
SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false");
599+
593600

594601
CassandraRelevantProperties(String key, String defaultVal)
595602
{

src/java/org/apache/cassandra/dht/RangeStreamer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44+
import org.apache.cassandra.config.CassandraRelevantProperties;
4445
import org.apache.cassandra.db.Keyspace;
4546
import org.apache.cassandra.db.SystemKeyspace;
4647
import org.apache.cassandra.gms.IFailureDetector;
@@ -329,9 +330,13 @@ public void addRanges(String keyspaceName, ReplicaCollection<?> replicas)
329330
logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
330331

331332
Multimap<InetAddressAndPort, FetchReplica> workMap;
332-
//Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no
333-
//transient replicas.
334-
if (useStrictSource || strat == null || strat.getReplicationFactor().allReplicas == 1 || strat.getReplicationFactor().hasTransientReplicas())
333+
//Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, no
334+
//transient replicas or HCD-84
335+
if (CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.getBoolean() ||
336+
useStrictSource ||
337+
strat == null ||
338+
strat.getReplicationFactor().allReplicas == 1 ||
339+
strat.getReplicationFactor().hasTransientReplicas())
335340
{
336341
workMap = convertPreferredEndpointsToWorkMap(fetchMap);
337342
}

test/unit/org/apache/cassandra/dht/BootStrapperTest.java

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.UnknownHostException;
2121
import java.util.List;
2222
import java.util.Random;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
import com.google.common.base.Predicate;
2526
import com.google.common.base.Predicates;
@@ -28,8 +29,10 @@
2829
import org.junit.AfterClass;
2930
import org.junit.BeforeClass;
3031
import org.junit.Test;
32+
import org.junit.runner.RunWith;
3133

3234
import org.apache.cassandra.SchemaLoader;
35+
import org.apache.cassandra.config.CassandraRelevantProperties;
3336
import org.apache.cassandra.config.DatabaseDescriptor;
3437
import org.apache.cassandra.db.Keyspace;
3538
import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
@@ -42,16 +45,36 @@
4245
import org.apache.cassandra.schema.Schema;
4346
import org.apache.cassandra.service.StorageService;
4447
import org.apache.cassandra.streaming.StreamOperation;
48+
import org.jboss.byteman.contrib.bmunit.BMRule;
49+
import org.jboss.byteman.contrib.bmunit.BMRules;
50+
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
4551

4652
import static org.junit.Assert.assertEquals;
4753
import static org.junit.Assert.assertNotNull;
4854

49-
55+
@RunWith(BMUnitRunner.class)
5056
public class BootStrapperTest
5157
{
5258
static IPartitioner oldPartitioner;
5359

5460
static Predicate<Replica> originalAlivePredicate = RangeStreamer.ALIVE_PREDICATE;
61+
private static AtomicBoolean nonOptimizationHit = new AtomicBoolean(false);
62+
private static AtomicBoolean optimizationHit = new AtomicBoolean(false);
63+
private static final IFailureDetector mockFailureDetector = new IFailureDetector()
64+
{
65+
public boolean isAlive(InetAddressAndPort ep)
66+
{
67+
return true;
68+
}
69+
70+
public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
71+
public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
72+
public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
73+
public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
74+
public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
75+
public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
76+
};
77+
5578
@BeforeClass
5679
public static void setup() throws ConfigurationException
5780
{
@@ -83,6 +106,63 @@ public void testSourceTargetComputation() throws UnknownHostException
83106
}
84107
}
85108

109+
@Test
110+
@BMRules(rules = { @BMRule(name = "Make sure the non-optimized path is picked up for some operations",
111+
targetClass = "org.apache.cassandra.dht.RangeStreamer",
112+
targetMethod = "convertPreferredEndpointsToWorkMap(EndpointsByReplica)",
113+
action = "org.apache.cassandra.dht.BootStrapperTest.nonOptimizationHit()"),
114+
@BMRule(name = "Make sure the optimized path is picked up for some operations",
115+
targetClass = "org.apache.cassandra.dht.RangeStreamer",
116+
targetMethod = "getOptimizedWorkMap(EndpointsByReplica,Collection,String)",
117+
action = "org.apache.cassandra.dht.BootStrapperTest.optimizationHit()") })
118+
public void testStreamingCandidatesOptmizationSkip() throws UnknownHostException
119+
{
120+
testSkipStreamingCandidatesOptmizationFeatureFlag(true, true, false);
121+
testSkipStreamingCandidatesOptmizationFeatureFlag(false, true, true);
122+
}
123+
124+
private void testSkipStreamingCandidatesOptmizationFeatureFlag(boolean disableOptimization, boolean nonOptimizedPathHit, boolean optimizedPathHit) throws UnknownHostException
125+
{
126+
try
127+
{
128+
nonOptimizationHit.set(false);
129+
optimizationHit.set(false);
130+
CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.setBoolean(disableOptimization);
131+
132+
for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces().names())
133+
{
134+
StorageService ss = StorageService.instance;
135+
TokenMetadata tmd = ss.getTokenMetadata();
136+
137+
generateFakeEndpoints(10);
138+
Token myToken = tmd.partitioner.getRandomToken();
139+
InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
140+
141+
assertEquals(10, tmd.sortedTokens().size());
142+
RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1);
143+
s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
144+
}
145+
146+
assertEquals(nonOptimizedPathHit, nonOptimizationHit.get());
147+
assertEquals(optimizedPathHit, optimizationHit.get());
148+
}
149+
finally
150+
{
151+
CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.reset();
152+
}
153+
}
154+
155+
// used by byteman
156+
private static void nonOptimizationHit()
157+
{
158+
nonOptimizationHit.set(true);
159+
}
160+
161+
private static void optimizationHit()
162+
{
163+
optimizationHit.set(true);
164+
}
165+
86166
private RangeStreamer testSourceTargetComputation(String keyspaceName, int numOldNodes, int replicationFactor) throws UnknownHostException
87167
{
88168
StorageService ss = StorageService.instance;
@@ -93,20 +173,6 @@ private RangeStreamer testSourceTargetComputation(String keyspaceName, int numOl
93173
InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
94174

95175
assertEquals(numOldNodes, tmd.sortedTokens().size());
96-
IFailureDetector mockFailureDetector = new IFailureDetector()
97-
{
98-
public boolean isAlive(InetAddressAndPort ep)
99-
{
100-
return true;
101-
}
102-
103-
public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
104-
public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
105-
public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
106-
public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
107-
public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
108-
public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
109-
};
110176
RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1);
111177
assertNotNull(Keyspace.open(keyspaceName));
112178
s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));

0 commit comments

Comments
 (0)