-
Notifications
You must be signed in to change notification settings - Fork 21
CNDB-13203: Use MessagingSuccess's version for cnx version #1621
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Checklist before you submit for review
|
Note that testMessagePurging takes 13 minutes to run now on my machine. Before it took 10 minutes though, so it's not a huge increase.
If the peer's messaging version is unknown, we assume that we can send a protocol message in the validation step. However, we always discover the version when we initiate a connection to send the message, and at the time of serialization, we might discover a mismatch. In that case, it is better to fail than to send an incorrectly serialized command.
| assert !state.isClosed(); | ||
|
|
||
| MessagingSuccess success = result.success(); | ||
| messagingVersion = success.messagingVersion; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked the whole code path to make sure this is the right fix, as the way messaging versions are handled is far from obvious. It is indeed the right one as it ultimately affect the established.messagingVersion as used here.
Just one additional note, should we also invoke settings.endpointToVersion.set() with the new version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked the whole code path to make sure this is the right fix, as the way messaging versions are handled is far from obvious. It is indeed the right one as it ultimately affect the
established.messagingVersionas used here.
Thanks for reading it closely. I set it here with the intention of using it for the Established constructor, so we're on the same page there.
Just one additional note, should we also invoke
settings.endpointToVersion.set()with the new version?
I tested with this yesterday and I will test a bit more today. My primary reason for pushing this without that additional line is from the InboundConnectionInitator here:
cassandra/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
Lines 458 to 459 in ce0edf4
| // record the "true" endpoint, i.e. the one the peer is identified with, as opposed to the socket it connected over | |
| instance().versions.set(from, maxMessagingVersion); |
It seems like these might be "okay" to diverge, but that's only true if the endpointToVersion views the peer as having a greater version than it actually does (because the only reason the connection version in that case is that the local host has a lower version). However, maybe the difference is not always that the connection is LT the remote peer.
I can confirm that these numbers vary without settings.endpointToVersion.set().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will look at this a bit more today to come up with a decision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sbtourist - I added assert messagingVersion <= settings.endpointToVersion.get(settings.to) to that code block and the ConnectionTest tests passed. I think that likely means my understanding is correct. I don't know all of the details here, and I definitely don't know the past implementations that we need to be compatible with. Let me know if you think I should add settings.endpointToVersion.set(...), but at this time, I don't think it is strictly necessary for this patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My primary reason for pushing this without that additional line is from the InboundConnectionInitator here:
I've seen that, but the versions object you point at is not the same as the one used in settings, which seems specific to the connection. My proof to that is in the case RETRY block, we do set the version; wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't realize they were different versions objects. I saw the retry block, but wasn't quite sure how things integrated. I am happy to add the assignment in, if you think that is right. I just haven't had a chance to fully vet it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The versions inside settings.endpointToVersion don't seem to be used anywhere seriously, but setting the right version there seems correct to me nonetheless. Though honestly this code is not the best, so please take your time to vet my suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
settings.endpointToVersion is set by calling endpointToVersion() in a constructor, which is:
public EndpointMessagingVersions endpointToVersion()
{
if (endpointToVersion == null)
return instance().versions;
return endpointToVersion;
}so unless I'm mistaken, there is only one EndpointMessagingVersions object. I don't see another way to configure it. As such, I hesitate to change this value. It seems like its own distinct change. I will leave in the assertion so that it'll trigger test failures if the value is LT the agreed upon version for the connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After adding the assertion and running the upgrade test, I hit the following:
java.lang.AssertionError: Found 101 > 100
at org.apache.cassandra.net.OutboundConnection$1Initiate.onCompletedHandshake(OutboundConnection.java:1125)
at org.apache.cassandra.net.OutboundConnection$1Initiate.lambda$attempt$1(OutboundConnection.java:1227)
at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)
at org.apache.cassandra.net.AsyncPromise.lambda$appendListener$2(AsyncPromise.java:379)
at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)
at org.apache.cassandra.net.AsyncPromise.notifyListeners(AsyncPromise.java:353)
at org.apache.cassandra.net.AsyncPromise.trySet(AsyncPromise.java:185)
at org.apache.cassandra.net.AsyncPromise.trySuccess(AsyncPromise.java:144)
After analyzing it and adding additional logs to CC (relevant logs pasted below), we are getting into trouble because IP addresses are getting reused and the versions map doesn't get cleaned up. writer-0 uses ds 10 and writer-1 uses ds 11, but they both end up with the same ip address because their runtimes do not overlap. The key log line comes here: Initiate(request: 100, min: 10, max: 101, type: SMALL_MESSAGES, framing: true, from: /192.168.240.4:7000) where we can see that the DS 11 coordinator sends a messaging requesting version 100 (ds 10) instead of 101 (ds 11).
This confirms that the versions map is shared. It also indicates we should either (1) update the map or (2) purge the map after a peer leaves (given the times in the logs, this doesn't appear automatic, but perhaps there is some timeout that must be hit before we purge the old versions).
[coordinatorDS11] INFO [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,130 OutboundConnection.java:1222 - attempting to connect /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] connecting to /192.168.240.5:7000(/192.168.240.5:7000), version = 101, accept AcceptVersions(min: 10, max: 101, dse: 4), framing = CRC, encryption = unencrypted
[writer-0] INFO [Messaging-EventLoop-5-2] 2025-03-07 06:01:54,155 InboundConnectionInitiator.java:506 - /192.168.240.4:7000(/192.168.240.4:52266)->/192.168.240.5:7000-SMALL_MESSAGES-bd0b46c6 messaging connection established, version = 100, framing = CRC, encryption = unencrypted
[coordinatorDS11] DEBUG [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,164 EndpointMessagingVersions.java:67 - Assuming current protocol version for /192.168.240.5:7000
[coordinatorDS11] INFO [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,165 EndpointMessagingVersions.java:45 - Setting version 100 for /192.168.240.5:7000
[coordinatorDS11] INFO [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,182 OutboundConnection.java:1157 - /192.168.240.4:7000(/192.168.240.4:52266)->/192.168.240.5:7000-SMALL_MESSAGES-f7ff5fd6 successfully connected, version = 100, framing = CRC, encryption = unencrypted
...
[coordinatorDS11] INFO [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 OutboundConnection.java:1741 - called messagingVersion() /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] messaging version is 100
[coordinatorDS11] INFO [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,107 OutboundConnection.java:1222 - attempting to connect /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] connecting to /192.168.240.5:7000(/192.168.240.5:7000), version = 100, accept AcceptVersions(min: 10, max: 101, dse: 4), framing = CRC, encryption = unencrypted
[writer-1] TRACE [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,152 InboundConnectionInitiator.java:252 - Received handshake initiation message from peer /192.168.240.4:49722, message = Initiate(request: 100, min: 10, max: 101, type: SMALL_MESSAGES, framing: true, from: /192.168.240.4:7000)
[writer-1] TRACE [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,155 InboundConnectionInitiator.java:263 - Connection version 101 (min 10) from /192.168.240.4:7000
[coordinatorDS11] INFO [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,159 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[writer-1] INFO [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,159 EndpointMessagingVersions.java:45 - Setting version 101 for /192.168.240.4:7000
[coordinatorDS11] ERROR [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,160 AsyncPromise.java:368 - Failed to invoke listener org.apache.cassandra.net.OutboundConnection$1Initiate$$Lambda/0x00007f8bd44083d0@164bc11b to (success: org.apache.cassandra.net.OutboundConnectionInitiator$Result$MessagingSuccess@53f7c87)
[coordinatorDS11] java.lang.AssertionError: Found 101 > 100
[coordinatorDS11] at org.apache.cassandra.net.OutboundConnection$1Initiate.onCompletedHandshake(OutboundConnection.java:1125)
[coordinatorDS11] at org.apache.cassandra.net.OutboundConnection$1Initiate.lambda$attempt$1(OutboundConnection.java:1234)
[coordinatorDS11] at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the gossiper calls MessagingService.instance().versions.reset(endpoint);. So perhaps this isn't code that is likely to be encountered. Either way, I am good to set the value now.
This test passes locally for me, looking into it more. |
After extensive review, we want to set this version to ensure that we have agreement between the connection itself and other parts of the code that rely on the versions map to validate user queries.
|
|
||
| // Messaging versions are a kind of modifier, but they can only be applied once per setting, so they are broken | ||
| // out into a separate list. | ||
| static final List<Settings> SETTINGS = applyPowerSet( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The extra complexity of these settings appears to be an issue for some tests. In looking closer, I can see that the rpc timeout is affecting some, like testMessagePurging. I think it might also affect testCloseIfEndpointDown. For example, calling DatabaseDescriptor.setRpcTimeout(5000L);, speeds up these tests a bit. The test failures appear to be timeouts, so maybe there is something to look into here. I haven't seen legitimate failures yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For reference, the delay in testMessagePurging comes from this block:
cassandra/src/java/org/apache/cassandra/net/OutboundConnection.java
Lines 1089 to 1109 in 19bd463
| void onFailure(Throwable cause) | |
| { | |
| if (cause instanceof ConnectException) | |
| noSpamLogger.info("{} failed to connect", id(), cause); | |
| else | |
| noSpamLogger.error("{} failed to connect", id(), cause); | |
| JVMStabilityInspector.inspectThrowable(cause); | |
| if (hasPending()) | |
| { | |
| Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop); | |
| state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS)); | |
| retryRateMillis = min(1000, retryRateMillis * 2); | |
| } | |
| else | |
| { | |
| // this Initiate will be discarded | |
| state = Disconnected.dormant(state.disconnected().maintenance); | |
| } | |
| } |
|
@szymon-miezal - I see that the |
This is an experimental commit to see if it makes the testCloseIfEndpointDown pass. That test has been failing due to timeouts and it speeds up the test locally. We'll see what it does for CI.
It did pass in PR CI before I merged it. I will double-check it and get back to you. Thanks for raising it. |
|
I have started https://jenkins-stargazer.aws.dsinternal.org/view/cc-builds/job/ds-cassandra-build/988/ to verify whether it passes on jenkins as I do not see any history for that test on What I do know right now is that your branch does not contain a14cd51 without which the test will fail. |
sbtourist
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good but there's a test failure: https://jenkins-stargazer.aws.dsinternal.org/job/ds-cassandra-pr-gate/job/PR-1621/8/
|
Confirmed that test_revive_endpoint passes on |
sbtourist
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retracting my approval until we figure out the test failure. Can't reproduce so far.
|
@szymon-miezal thank you for looking into it! I'll update the branch. @sbtourist - I reproduced the failure once on Friday morning, then added more logging to see what might explain it and then it would not reproduce. |
|
I can reproduce if I let it run indefinitely on repeat. I believe this is a race condition caused by:
More specifically, the race manifests if the following happens, in order:
Now, I am not really sure why we do the latter, but the former seems required as explained previously. Anyway, as previously noted, the issue should only manifest in the test, where we basically use the same |
I realized the above is not fully correct, or to be more specific, the problem is about having the same endpoint and the same set of
I implemented the latter and cannot reproduce with it, which confirms my hypothesis. I will push it. |
…ionSettings, in order to avoid race conditions caused by otherwise sharing the same EndpointMessagingVersions object with InboundConnectionInitiator.
|
❌ Build ds-cassandra-pr-gate/PR-1621 rejected by Butler1 new test failure(s) in 7 builds Found 1 new test failures
Found 15 known test failures |
jacek-lewandowski
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1;
nit: should we invalidate an entry in endpointsToVersion (which is done by Gossiper as pointed by Michael) on CNDB side when a service gets removed/disconnected?
|
Thanks @jacek-lewandowski.
Not needed though as we update the map anyway? |
|
General question: do you know whether it's a problem that might affect other versions like OSS and potentially DSE as well? |
@szymon-miezal I suggest to ask on slack. More people will see your question |
|
@szymon-miezal Good question. These bugs affect existing code, so OSS is definitely affected too. DSE has a very different messaging implementation but might also be affected by virtue of https://github.com/riptano/cndb/issues/13161. We need a better process to streamline changes across "products". |
Fixes riptano/cndb#13203 The underlying problem is that the `OutboundConnection` needs to store the message version from the peer. Otherwise, when the local `current_version` is higher than the remove `current_version`, we store the wrong value (we were storing the local value, not the agreed upon value). The PR also fixes an issue in the ANN_OPTIONS serialization logic that we can hit in rare cases where the peers haven't yet connected, so the validation logic that would prevent serialization due to a peer having too low of a message version is skipped. This PR fixes several issues related to upgrades by improving handshake logic. --------- Co-authored-by: Sergio Bossa <[email protected]>
Fixes riptano/cndb#13203 The underlying problem is that the `OutboundConnection` needs to store the message version from the peer. Otherwise, when the local `current_version` is higher than the remove `current_version`, we store the wrong value (we were storing the local value, not the agreed upon value). The PR also fixes an issue in the ANN_OPTIONS serialization logic that we can hit in rare cases where the peers haven't yet connected, so the validation logic that would prevent serialization due to a peer having too low of a message version is skipped. This PR fixes several issues related to upgrades by improving handshake logic. --------- Co-authored-by: Sergio Bossa <[email protected]>



What is the issue
Fixes https://github.com/riptano/cndb/issues/13203
The underlying problem is that the
OutboundConnectionneeds to store the message version from the peer. Otherwise, when the localcurrent_versionis higher than the removecurrent_version, we store the wrong value (we were storing the local value, not the agreed upon value).The PR also fixes an issue in the ANN_OPTIONS serialization logic that we can hit in rare cases where the peers haven't yet connected, so the validation logic that would prevent serialization due to a peer having too low of a message version is skipped.
What does this PR fix and why was it fixed
This PR fixes several issues related to upgrades by improving handshake logic.