Skip to content

Commit 979b792

Browse files
CNDB-13203: Use MessagingSuccess's version for cnx version (#1621)
### What is the issue

Fixes riptano/cndb#13203

The underlying problem is that the `OutboundConnection` needs to store the message version from the peer. Otherwise, when the local `current_version` is higher than the remote `current_version`, we store the wrong value (we were storing the local value, not the agreed-upon value).

The PR also fixes an issue in the ANN_OPTIONS serialization logic that we can hit in rare cases where the peers haven't yet connected, so the validation logic that would prevent serialization due to a peer having too low of a message version is skipped.

### What does this PR fix and why was it fixed

This PR fixes several issues related to upgrades by improving handshake logic.

---------

Co-authored-by: Sergio Bossa <[email protected]>
1 parent 840d5af commit 979b792

File tree

8 files changed

+103
-18
lines changed

8 files changed

+103
-18
lines changed

src/java/org/apache/cassandra/db/filter/ANNOptions.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,11 @@ public void serialize(ANNOptions options, DataOutputPlus out, int version) throw
179179
{
180180
// ANN options are only supported in DS 11 and above, so don't serialize anything if the messaging version is lower
181181
if (version < MessagingService.VERSION_DS_11)
182+
{
183+
if (options != NONE)
184+
throw new IllegalStateException("Unable to serialize ANN options with messaging version: " + version);
182185
return;
186+
}
183187

184188
int flags = flags(options);
185189
out.writeInt(flags);

src/java/org/apache/cassandra/net/EndpointMessagingVersions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ public class EndpointMessagingVersions
3737
// protocol versions of the other nodes in the cluster
3838
private final ConcurrentMap<InetAddressAndPort, Integer> versions = new NonBlockingHashMap<>();
3939

40+
public EndpointMessagingVersions()
41+
{
42+
}
43+
44+
private EndpointMessagingVersions(EndpointMessagingVersions versions)
45+
{
46+
this.versions.putAll(versions.versions);
47+
}
48+
4049
/**
4150
* @return the last version associated with address, or @param version if this is the first such version
4251
*/
@@ -91,4 +100,9 @@ public boolean knows(InetAddressAndPort endpoint)
91100
{
92101
return versions.containsKey(endpoint);
93102
}
103+
104+
public EndpointMessagingVersions copy()
105+
{
106+
return new EndpointMessagingVersions(this);
107+
}
94108
}

src/java/org/apache/cassandra/net/InboundMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class InboundMessageHandler extends AbstractMessageHandler
7575
private final ConnectionType type;
7676
private final InetAddressAndPort self;
7777
private final InetAddressAndPort peer;
78-
private final int version;
78+
final int version;
7979

8080
private final InboundMessageCallbacks callbacks;
8181
private final Consumer<Message<?>> consumer;

src/java/org/apache/cassandra/net/InboundMessageHandlers.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,13 @@ private long sumCounters(ToLongFunction<InboundCounters> mapping)
437437
+ mapping.applyAsLong(legacyCounters);
438438
}
439439

440+
@VisibleForTesting
441+
public void assertHandlersMessagingVersion(int expectedVersion)
442+
{
443+
for (InboundMessageHandler handler : handlers)
444+
assert handler.version == expectedVersion : "Expected all handlers to be at version " + expectedVersion + " but found " + handler.version;
445+
}
446+
440447
interface HandlerProvider
441448
{
442449
InboundMessageHandler provide(FrameDecoder decoder,

src/java/org/apache/cassandra/net/OutboundConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,8 @@ void onCompletedHandshake(Result<MessagingSuccess> result)
11171117
assert !state.isClosed();
11181118

11191119
MessagingSuccess success = result.success();
1120+
messagingVersion = success.messagingVersion;
1121+
settings.endpointToVersion.set(settings.to, messagingVersion);
11201122
debug.onConnect(success.messagingVersion, settings);
11211123
state.disconnected().maintenance.cancel(false);
11221124

src/java/org/apache/cassandra/net/OutboundConnectionSettings.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,10 @@ public OutboundConnectionSettings withDefaults(ConnectionCategory category)
498498
applicationSendQueueReserveGlobalCapacityInBytes(),
499499
tcpNoDelay(), flushLowWaterMark, flushHighWaterMark,
500500
tcpConnectTimeoutInMS(), tcpUserTimeoutInMS(category), acceptVersions(category),
501-
from(), socketFactory(), callbacks(), debug(), endpointToVersion());
501+
from(), socketFactory(), callbacks(), debug(),
502+
// If a set of versions is passed, make sure we do a copy of it, as the version might be later updated
503+
// depending on the handshake result (i.e. nodes might handshake a different version)
504+
endpointToVersion().copy());
502505
}
503506

504507
private static boolean isInLocalDC(IEndpointSnitch snitch, InetAddressAndPort localHost, InetAddressAndPort remoteHost)

test/unit/org/apache/cassandra/db/filter/ANNOptionsTest.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,26 @@ private void testTransport(String query, ANNOptions expectedOptions)
241241

242242
// ...with a version that doesn't support ANN options
243243
out = new DataOutputBuffer();
244-
ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10);
245-
Assertions.assertThat(ReadCommand.serializer.serializedSize(command, MessagingService.VERSION_DS_10))
246-
.isEqualTo(out.buffer().remaining());
247-
in = new DataInputBuffer(out.buffer(), true);
248-
command = ReadCommand.serializer.deserialize(in, MessagingService.VERSION_DS_10);
249-
actualOptions = command.rowFilter().annOptions();
250-
Assertions.assertThat(actualOptions).isEqualTo(ANNOptions.NONE);
244+
if (expectedOptions != ANNOptions.NONE) {
245+
try
246+
{
247+
ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10);
248+
}
249+
catch (IllegalStateException e)
250+
{
251+
// expected
252+
Assertions.assertThat(e)
253+
.hasMessageContaining("Unable to serialize ANN options with messaging version: " + MessagingService.VERSION_DS_10);
254+
}
255+
} else {
256+
ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10);
257+
Assertions.assertThat(ReadCommand.serializer.serializedSize(command, MessagingService.VERSION_DS_10))
258+
.isEqualTo(out.buffer().remaining());
259+
in = new DataInputBuffer(out.buffer(), true);
260+
command = ReadCommand.serializer.deserialize(in, MessagingService.VERSION_DS_10);
261+
actualOptions = command.rowFilter().annOptions();
262+
Assertions.assertThat(actualOptions).isEqualTo(ANNOptions.NONE);
263+
}
251264
}
252265
catch (IOException e)
253266
{

test/unit/org/apache/cassandra/net/ConnectionTest.java

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@
7474
import static org.apache.cassandra.net.MessagingService.VERSION_30;
7575
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
7676
import static org.apache.cassandra.net.MessagingService.VERSION_40;
77+
import static org.apache.cassandra.net.MessagingService.VERSION_DS_10;
78+
import static org.apache.cassandra.net.MessagingService.VERSION_DS_11;
79+
import static org.apache.cassandra.net.MessagingService.minimum_version;
7780
import static org.apache.cassandra.net.NoPayload.noPayload;
7881
import static org.apache.cassandra.net.MessagingService.current_version;
7982
import static org.apache.cassandra.net.ConnectionUtils.*;
@@ -121,17 +124,24 @@ public void resetVerbs() throws Throwable
121124
timeouts.clear();
122125
}
123126

127+
private static volatile long originalRpcTimeout = 0;
128+
124129
@BeforeClass
125130
public static void startup()
126131
{
127132
DatabaseDescriptor.daemonInitialization();
128133
CommitLog.instance.start();
134+
// At the time of this commit, the default is 20 seconds and leads to significant delays
135+
// in this test class, especially in testMessagePurging and testCloseIfEndpointDown.
136+
originalRpcTimeout = DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS);
137+
DatabaseDescriptor.setRpcTimeout(5000L);
129138
}
130139

131140
@AfterClass
132141
public static void cleanup() throws InterruptedException
133142
{
134143
factory.shutdownNow();
144+
DatabaseDescriptor.setRpcTimeout(originalRpcTimeout);
135145
}
136146

137147
interface SendTest
@@ -192,30 +202,54 @@ Settings override(Settings settings)
192202

193203
// 30 is used for CNDB compatibility
194204
static final AcceptVersions legacy = new AcceptVersions(VERSION_3014, VERSION_3014);
205+
static final AcceptVersions ds10 = new AcceptVersions(minimum_version, VERSION_DS_10);
206+
static final AcceptVersions ds11 = new AcceptVersions(minimum_version, VERSION_DS_11);
207+
static final AcceptVersions current = new AcceptVersions(current_version, current_version);
195208

196-
static final List<Function<Settings, Settings>> MODIFIERS = ImmutableList.of(
209+
static final List<Function<Settings, Settings>> MESSAGGING_VERSIONS = ImmutableList.of(
197210
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(legacy))
198211
.inbound(inbound -> inbound.withAcceptMessaging(legacy)),
212+
// Mismatched versions (in both directions) to ensure both peers will still agree on the same version.
213+
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(ds11))
214+
.inbound(inbound -> inbound.withAcceptMessaging(ds10)),
215+
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(ds10))
216+
.inbound(inbound -> inbound.withAcceptMessaging(ds11)),
217+
// This setting ensures that we cover the current case for the power set where no versions are overridden.
218+
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(current))
219+
.inbound(inbound -> inbound.withAcceptMessaging(current))
220+
);
221+
222+
223+
static final List<Function<Settings, Settings>> MODIFIERS = ImmutableList.of(
199224
settings -> settings.outbound(outbound -> outbound.withEncryption(encryptionOptions))
200225
.inbound(inbound -> inbound.withEncryption(encryptionOptions)),
201226
settings -> settings.outbound(outbound -> outbound.withFraming(LZ4))
202227
);
203228

229+
// Messaging versions are a kind of modifier, but they can only be applied once per setting, so they are broken
230+
// out into a separate list.
204231
static final List<Settings> SETTINGS = applyPowerSet(
205-
ImmutableList.of(Settings.SMALL, Settings.LARGE),
232+
ImmutableList.of(ConnectionTest.Settings.SMALL, ConnectionTest.Settings.LARGE),
233+
MESSAGGING_VERSIONS,
206234
MODIFIERS
207235
);
208236

209-
private static <T> List<T> applyPowerSet(List<T> settings, List<Function<T, T>> modifiers)
237+
private static List<Settings> applyPowerSet(List<ConnectionTest.Settings> settings,
238+
List<Function<ConnectionTest.Settings, ConnectionTest.Settings>> messagingVersions,
239+
List<Function<ConnectionTest.Settings, ConnectionTest.Settings>> modifiers)
210240
{
211-
List<T> result = new ArrayList<>();
212-
for (Set<Function<T, T>> set : Sets.powerSet(new HashSet<>(modifiers)))
241+
List<Settings> result = new ArrayList<>();
242+
for (Function<ConnectionTest.Settings, ConnectionTest.Settings> messagingVersion : messagingVersions)
213243
{
214-
for (T s : settings)
244+
for (Set<Function<Settings, ConnectionTest.Settings>> set : Sets.powerSet(new HashSet<>(modifiers)))
215245
{
216-
for (Function<T, T> f : set)
217-
s = f.apply(s);
218-
result.add(s);
246+
for (ConnectionTest.Settings s : settings)
247+
{
248+
for (Function<Settings, ConnectionTest.Settings> f : set)
249+
s = f.apply(s);
250+
s = messagingVersion.apply(s);
251+
result.add(s);
252+
}
219253
}
220254
}
221255
return result;
@@ -306,6 +340,10 @@ public void testSendSmall() throws Throwable
306340
.expired ( 0, 0)
307341
.error ( 0, 0)
308342
.check();
343+
344+
// Ensure version is the same
345+
inbound.assertHandlersMessagingVersion(outbound.messagingVersion());
346+
Assert.assertEquals(outbound.settings().endpointToVersion.get(endpoint), outbound.messagingVersion());
309347
});
310348
}
311349

@@ -360,6 +398,10 @@ public long serializedSize(Object noPayload, int version)
360398
.expired ( 0, 0)
361399
.error ( 0, 0)
362400
.check();
401+
402+
// Ensure version is the same
403+
inbound.assertHandlersMessagingVersion(outbound.messagingVersion());
404+
Assert.assertEquals(outbound.settings().endpointToVersion.get(endpoint), outbound.messagingVersion());
363405
});
364406
}
365407

0 commit comments

Comments
 (0)