Skip to content

Commit c755234

Browse files
authored
CNDB-12922 ANNOptions validation now checks the endpoint connection messaging version (#1624)
This makes sure the proper version is used, as following CNDB-13203, each connection maintains its own version, separated from the one recorded in the MessagingService. ### What is the issue `ANNOptions#validate()` was previously checking endpoint versions via `MessagingService#versions`: but following #13203, in order to resolve a race condition, each endpoint connection tracks its own `EndpointMessagingVersions` object, with the version negotiated during handshake, while `MessagingService#versions` is updated with the max version of each endpoint. It follows checking `MessagingService#versions` might not rely on the right version used for the connection. ### What does this PR fix and why was it fixed This PR introduces a new `MessagingService` method to check the connection version, and uses it inside `ANNOptions#validate()`.
1 parent 10e7146 commit c755234

File tree

3 files changed

+32
-6
lines changed

3 files changed

+32
-6
lines changed

src/java/org/apache/cassandra/db/filter/ANNOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void validate(QueryState state, String keyspace, int limit)
7777

7878
// Ensure that all nodes in the cluster are in a version that supports ANN options, including this one
7979
assert keyspace != null;
80-
Set<InetAddressAndPort> badNodes = MessagingService.instance().endpointsWithVersionBelow(keyspace, MessagingService.VERSION_DS_11);
80+
Set<InetAddressAndPort> badNodes = MessagingService.instance().endpointsWithConnectionsOnVersionBelow(keyspace, MessagingService.VERSION_DS_11);
8181
if (MessagingService.current_version < MessagingService.VERSION_DS_11)
8282
badNodes.add(FBUtilities.getBroadcastAddressAndPort());
8383
if (!badNodes.isEmpty())

src/java/org/apache/cassandra/net/MessagingService.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ private void doSend(Message message, InetAddressAndPort to, ConnectionType speci
396396
// expire the callback if the message failed to enqueue (failed to establish a connection or exceeded queue capacity)
397397
while (true)
398398
{
399-
OutboundConnections connections = getOutbound(to);
399+
OutboundConnections connections = getOutbound(to, true);
400400
try
401401
{
402402
connections.enqueue(message, specifyConnection);
@@ -602,10 +602,10 @@ private void shutdownExecutors(long deadlineNanos) throws TimeoutException, Inte
602602
socketFactory.awaitTerminationUntil(deadlineNanos);
603603
}
604604

605-
private OutboundConnections getOutbound(InetAddressAndPort to)
605+
private OutboundConnections getOutbound(InetAddressAndPort to, boolean tryRegister)
606606
{
607607
OutboundConnections connections = channelManagers.get(to);
608-
if (connections == null)
608+
if (connections == null && tryRegister)
609609
connections = OutboundConnections.tryRegister(channelManagers, to, new OutboundConnectionSettings(to).withDefaults(ConnectionCategory.MESSAGING));
610610
return connections;
611611
}
@@ -662,4 +662,28 @@ public Set<InetAddressAndPort> endpointsWithVersionBelow(String keyspace, int ve
662662
}
663663
return nodes;
664664
}
665+
666+
/**
667+
* Returns the endpoints for the given keyspace that are known to be alive and have a connection whose
668+
* messaging version is older than the given version. To be used for example when we want to be sure a message
669+
* can be serialized to all endpoints, according to their negotiated version at connection time.
670+
*
671+
* @param keyspace a keyspace
672+
* @param version a messaging version
673+
* @return a set of alive endpoints in the given keyspace with messaging version below the given version
674+
*/
675+
public Set<InetAddressAndPort> endpointsWithConnectionsOnVersionBelow(String keyspace, int version)
676+
{
677+
Set<InetAddressAndPort> nodes = new HashSet<>();
678+
for (InetAddressAndPort node : StorageService.instance.getTokenMetadataForKeyspace(keyspace).getAllEndpoints())
679+
{
680+
ConnectionType.MESSAGING_TYPES.forEach(type -> {
681+
OutboundConnections connections = getOutbound(node, false);
682+
OutboundConnection connection = connections != null ? connections.connectionFor(type) : null;
683+
if (connection != null && connection.messagingVersion() < version)
684+
nodes.add(node);
685+
});
686+
}
687+
return nodes;
688+
}
665689
}

test/distributed/org/apache/cassandra/distributed/test/sai/ANNOptionsDistributedTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.apache.cassandra.distributed.test.sai;
1818

19+
import java.util.concurrent.ThreadLocalRandom;
20+
1921
import org.junit.Test;
2022

2123
import net.bytebuddy.ByteBuddy;
@@ -118,9 +120,9 @@ public static class BB
118120
{
119121
public static void install(ClassLoader classLoader, int node)
120122
{
121-
if (node == 1)
123+
// inject randomly first or second node to make sure it works if the node is a coordinator or replica
124+
if (node == ThreadLocalRandom.current().nextInt(1, 3))
122125
{
123-
// set the current verson to DS 11, which suppors ANN options
124126
new ByteBuddy().rebase(MessagingService.class)
125127
.method(named("currentVersion"))
126128
.intercept(MethodDelegation.to(BB.class))

0 commit comments

Comments
 (0)