
Conversation

@michaeljmarshall
Member

@michaeljmarshall michaeljmarshall commented Mar 4, 2025

What is the issue

Fixes https://github.com/riptano/cndb/issues/13203

The underlying problem is that the OutboundConnection needs to store the messaging version agreed with the peer. Otherwise, when the local current_version is higher than the remote current_version, we store the wrong value (we were storing the local value, not the agreed-upon value).

The PR also fixes an issue in the ANN_OPTIONS serialization logic that we can hit in rare cases where the peers haven't yet connected: in that situation, the validation logic that would prevent serialization due to a peer having too low a messaging version is skipped.

What does this PR fix and why was it fixed

This PR fixes several issues related to upgrades by improving handshake logic.
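The "agreed upon value" here is the lower of the two peers' current versions, which is what the handshake must settle on. A minimal sketch of that rule (class and method names are illustrative, not actual Cassandra identifiers):

```java
// Illustrative sketch only: models the "agreed upon value" as the minimum of
// the two peers' current messaging versions. Not the actual Cassandra API.
public class VersionNegotiationSketch
{
    // The connection must use the highest version BOTH sides support,
    // i.e. the minimum of the two advertised current versions.
    static int agreedVersion(int localCurrentVersion, int remoteCurrentVersion)
    {
        return Math.min(localCurrentVersion, remoteCurrentVersion);
    }

    public static void main(String[] args)
    {
        // A node on version 101 talking to a peer on 100 must store 100,
        // not its own local value of 101.
        System.out.println(agreedVersion(101, 100)); // prints 100
    }
}
```

The bug described above amounts to storing the local current version instead of this minimum when the local side is newer.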

@github-actions

github-actions bot commented Mar 4, 2025

Checklist before you submit for review

  • Make sure there is a PR in the CNDB project updating the Converged Cassandra version
  • Use NoSpamLogger for log lines that may appear frequently in the logs
  • Verify test results on Butler
  • Test coverage for new/modified code is > 80%
  • Proper code formatting
  • Proper title for each commit starting with the project-issue number, like CNDB-1234
  • Each commit has a meaningful description
  • Each commit is not very long and contains related changes
  • Renames, moves and reformatting are in distinct commits

Note that testMessagePurging takes 13 minutes
to run now on my machine. Before it took 10
minutes though, so it's not a huge increase.
If the peer's messaging version is unknown, we
assume that we can send a protocol message
in the validation step. However, we always discover
the version when we initiate a connection to
send the message, and at the time of serialization,
we might discover a mismatch. In that case,
it is better to fail than to send an incorrectly
serialized command.
@michaeljmarshall michaeljmarshall self-assigned this Mar 6, 2025
assert !state.isClosed();

MessagingSuccess success = result.success();
messagingVersion = success.messagingVersion;


I checked the whole code path to make sure this is the right fix, as the way messaging versions are handled is far from obvious. It is indeed the right one, as it ultimately affects the established.messagingVersion as used here.

Just one additional note, should we also invoke settings.endpointToVersion.set() with the new version?

Member Author

I checked the whole code path to make sure this is the right fix, as the way messaging versions are handled is far from obvious. It is indeed the right one, as it ultimately affects the established.messagingVersion as used here.

Thanks for reading it closely. I set it here with the intention of using it for the Established constructor, so we're on the same page there.

Just one additional note, should we also invoke settings.endpointToVersion.set() with the new version?

I tested with this yesterday and I will test a bit more today. My primary reason for pushing this without that additional line is from the InboundConnectionInitiator here:

// record the "true" endpoint, i.e. the one the peer is identified with, as opposed to the socket it connected over
instance().versions.set(from, maxMessagingVersion);

It seems like these might be "okay" to diverge, but that's only true if endpointToVersion views the peer as having a greater version than it actually does (because the only reason the connection version is lower in that case is that the local host has a lower version). However, maybe the difference is not always that the connection version is lower than the remote peer's.

I can confirm that these numbers vary without settings.endpointToVersion.set().

Member Author

I will look at this a bit more today to come up with a decision.

Member Author

@sbtourist - I added assert messagingVersion <= settings.endpointToVersion.get(settings.to) to that code block and the ConnectionTest tests passed. I think that likely means my understanding is correct. I don't know all of the details here, and I definitely don't know the past implementations that we need to be compatible with. Let me know if you think I should add settings.endpointToVersion.set(...), but at this time, I don't think it is strictly necessary for this patch.


My primary reason for pushing this without that additional line is from the InboundConnectionInitiator here:

I've seen that, but the versions object you point at is not the same as the one used in settings, which seems specific to the connection. My proof of that is that in the case RETRY block we do set the version; wdyt?

Member Author

I didn't realize they were different versions objects. I saw the retry block, but wasn't quite sure how things integrated. I am happy to add the assignment in, if you think that is right. I just haven't had a chance to fully vet it.


The versions inside settings.endpointToVersion don't seem to be used anywhere seriously, but setting the right version there seems correct to me nonetheless. Though honestly this code is not the best, so please take your time to vet my suggestion.

Member Author

@michaeljmarshall michaeljmarshall Mar 6, 2025

settings.endpointToVersion is set by calling endpointToVersion() in a constructor, which is:

    public EndpointMessagingVersions endpointToVersion()
    {
        if (endpointToVersion == null)
            return instance().versions;
        return endpointToVersion;
    }

so unless I'm mistaken, there is only one EndpointMessagingVersions object. I don't see another way to configure it. As such, I hesitate to change this value. It seems like its own distinct change. I will leave in the assertion so that it'll trigger test failures if the value is LT the agreed upon version for the connection.
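To make the sharing concrete, here is a minimal model of that accessor's fallback behavior (the map type and names are illustrative, not the real EndpointMessagingVersions API): when no explicit instance was configured, every caller falls back to the same process-wide object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the accessor quoted above: when no explicit map was
// configured, every caller falls back to the process-wide singleton, so there
// is effectively one EndpointMessagingVersions instance. Names illustrative.
public class SingletonFallbackSketch
{
    static final Map<String, Integer> GLOBAL_VERSIONS = new ConcurrentHashMap<>();

    private Map<String, Integer> endpointToVersion; // null unless explicitly configured

    Map<String, Integer> endpointToVersion()
    {
        if (endpointToVersion == null)
            return GLOBAL_VERSIONS; // fallback: the shared singleton
        return endpointToVersion;
    }

    public static void main(String[] args)
    {
        SingletonFallbackSketch a = new SingletonFallbackSketch();
        SingletonFallbackSketch b = new SingletonFallbackSketch();
        a.endpointToVersion().put("peer", 100);
        // b sees a's write because both fell back to the same map
        System.out.println(b.endpointToVersion().get("peer")); // prints 100
    }
}
```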

Member Author

@michaeljmarshall michaeljmarshall Mar 7, 2025

After adding the assertion and running the upgrade test, I hit the following:

java.lang.AssertionError: Found 101 > 100
	at org.apache.cassandra.net.OutboundConnection$1Initiate.onCompletedHandshake(OutboundConnection.java:1125)
	at org.apache.cassandra.net.OutboundConnection$1Initiate.lambda$attempt$1(OutboundConnection.java:1227)
	at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)
	at org.apache.cassandra.net.AsyncPromise.lambda$appendListener$2(AsyncPromise.java:379)
	at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)
	at org.apache.cassandra.net.AsyncPromise.notifyListeners(AsyncPromise.java:353)
	at org.apache.cassandra.net.AsyncPromise.trySet(AsyncPromise.java:185)
	at org.apache.cassandra.net.AsyncPromise.trySuccess(AsyncPromise.java:144)

After analyzing it and adding additional logs to CC (relevant logs pasted below), we are getting into trouble because IP addresses are getting reused and the versions map doesn't get cleaned up. writer-0 uses DS 10 and writer-1 uses DS 11, but they both end up with the same IP address because their runtimes do not overlap. The key log line comes here: Initiate(request: 100, min: 10, max: 101, type: SMALL_MESSAGES, framing: true, from: /192.168.240.4:7000), where we can see that the DS 11 coordinator sends a handshake message requesting version 100 (DS 10) instead of 101 (DS 11).

This confirms that the versions map is shared. It also indicates we should either (1) update the map or (2) purge the map after a peer leaves (given the times in the logs, this doesn't appear automatic, but perhaps there is some timeout that must be hit before we purge the old versions).

[coordinatorDS11] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,130 OutboundConnection.java:1222 - attempting to connect /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] connecting to /192.168.240.5:7000(/192.168.240.5:7000), version = 101, accept AcceptVersions(min: 10, max: 101, dse: 4), framing = CRC, encryption = unencrypted
[writer-0] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:01:54,155 InboundConnectionInitiator.java:506 - /192.168.240.4:7000(/192.168.240.4:52266)->/192.168.240.5:7000-SMALL_MESSAGES-bd0b46c6 messaging connection established, version = 100, framing = CRC, encryption = unencrypted
[coordinatorDS11] DEBUG [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,164 EndpointMessagingVersions.java:67 - Assuming current protocol version for /192.168.240.5:7000
[coordinatorDS11] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,165 EndpointMessagingVersions.java:45 - Setting version 100 for /192.168.240.5:7000
[coordinatorDS11] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,182 OutboundConnection.java:1157 - /192.168.240.4:7000(/192.168.240.4:52266)->/192.168.240.5:7000-SMALL_MESSAGES-f7ff5fd6 successfully connected, version = 100, framing = CRC, encryption = unencrypted

...

[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 OutboundConnection.java:1741 - called messagingVersion() /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] messaging version is 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,107 OutboundConnection.java:1222 - attempting to connect /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] connecting to /192.168.240.5:7000(/192.168.240.5:7000), version = 100, accept AcceptVersions(min: 10, max: 101, dse: 4), framing = CRC, encryption = unencrypted
[writer-1] TRACE [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,152 InboundConnectionInitiator.java:252 - Received handshake initiation message from peer /192.168.240.4:49722, message = Initiate(request: 100, min: 10, max: 101, type: SMALL_MESSAGES, framing: true, from: /192.168.240.4:7000)
[writer-1] TRACE [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,155 InboundConnectionInitiator.java:263 - Connection version 101 (min 10) from /192.168.240.4:7000
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,159 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[writer-1] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,159 EndpointMessagingVersions.java:45 - Setting version 101 for /192.168.240.4:7000
[coordinatorDS11] ERROR [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,160 AsyncPromise.java:368 - Failed to invoke listener org.apache.cassandra.net.OutboundConnection$1Initiate$$Lambda/0x00007f8bd44083d0@164bc11b to (success: org.apache.cassandra.net.OutboundConnectionInitiator$Result$MessagingSuccess@53f7c87)
[coordinatorDS11] java.lang.AssertionError: Found 101 > 100
[coordinatorDS11] 	at org.apache.cassandra.net.OutboundConnection$1Initiate.onCompletedHandshake(OutboundConnection.java:1125)
[coordinatorDS11] 	at org.apache.cassandra.net.OutboundConnection$1Initiate.lambda$attempt$1(OutboundConnection.java:1234)
[coordinatorDS11] 	at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)

Member Author

@michaeljmarshall michaeljmarshall Mar 7, 2025

Looks like the gossiper calls MessagingService.instance().versions.reset(endpoint);. So perhaps this isn't code that is likely to be encountered. Either way, I am good to set the value now.

@michaeljmarshall
Member Author

ConnectionTest.testCloseIfEndpointDown

This test passes locally for me, looking into it more.

After extensive review, we want to set this
version to ensure that we have agreement between
the connection itself and other parts of the code
that rely on the versions map to validate user
queries.

// Messaging versions are a kind of modifier, but they can only be applied once per setting, so they are broken
// out into a separate list.
static final List<Settings> SETTINGS = applyPowerSet(
Member Author

The extra complexity of these settings appears to be an issue for some tests. Looking closer, I can see that the rpc timeout is affecting some, like testMessagePurging. I think it might also affect testCloseIfEndpointDown. For example, calling DatabaseDescriptor.setRpcTimeout(5000L); speeds up these tests a bit. The test failures appear to be timeouts, so maybe there is something to look into here. I haven't seen legitimate failures yet.

Member Author

For reference, the delay in testMessagePurging comes from this block:

void onFailure(Throwable cause)
{
    if (cause instanceof ConnectException)
        noSpamLogger.info("{} failed to connect", id(), cause);
    else
        noSpamLogger.error("{} failed to connect", id(), cause);

    JVMStabilityInspector.inspectThrowable(cause);

    if (hasPending())
    {
        Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop);
        state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS));
        retryRateMillis = min(1000, retryRateMillis * 2);
    }
    else
    {
        // this Initiate will be discarded
        state = Disconnected.dormant(state.disconnected().maintenance);
    }
}
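As a rough model of the delay this block produces: each failed connect attempt schedules the next one after max(100, retryRateMillis) ms, then doubles retryRateMillis, capped at 1000 ms. A sketch of the resulting schedule (the initial rate of 100 ms is an assumption for illustration):

```java
// Rough model of the backoff in the block above: each failed connect attempt
// schedules the next one after max(100, retryRateMillis) ms, then doubles
// retryRateMillis, capped at 1000 ms. The initial rate of 100 ms is an
// assumption for illustration; the real field lives in OutboundConnection.
public class BackoffSketch
{
    static long[] delays(long initialRetryRateMillis, int attempts)
    {
        long[] scheduled = new long[attempts];
        long retryRateMillis = initialRetryRateMillis;
        for (int i = 0; i < attempts; i++)
        {
            scheduled[i] = Math.max(100, retryRateMillis);         // delay actually used
            retryRateMillis = Math.min(1000, retryRateMillis * 2); // next rate, capped
        }
        return scheduled;
    }

    public static void main(String[] args)
    {
        // 100, 200, 400, 800, 1000, 1000 — the cap is reached after four doublings
        System.out.println(java.util.Arrays.toString(delays(100, 6)));
    }
}
```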

@michaeljmarshall
Member Author

@szymon-miezal - I see that the cassandra-dtest TestReplaceAddress.test_revive_endpoint was just added yesterday with datastax/cassandra-dtest#75. It is failing on this branch, which does change some things that could theoretically break the test. Do you know if this passes on main? A quick look at butler suggests it might not be, but I'm not familiar with all of the places to look.

This is an experimental commit
to see if it makes the testCloseIfEndpointDown
pass. That test has been failing
due to timeouts and it speeds up
the test locally. We'll see what
it does for CI.
@szymon-miezal

@szymon-miezal - I see that the cassandra-dtest TestReplaceAddress.test_revive_endpoint was just added yesterday with datastax/cassandra-dtest#75. It is failing on this branch, which does change some things that could theoretically break the test. Do you know if this passes on main? A quick look at butler suggests it might not be, but I'm not familiar with all of the places to look.

It did pass in PR CI before I merged it. I will double-check it and get back to you. Thanks for raising it.

@szymon-miezal

I have started https://jenkins-stargazer.aws.dsinternal.org/view/cc-builds/job/ds-cassandra-build/988/ to verify whether it passes on jenkins as I do not see any history for that test on main in butler. That's a bit weird. I thought these tests were executed daily - but on the other hand I do not know much about CC CI.

What I do know right now is that your branch does not contain a14cd51 without which the test will fail.


@sbtourist sbtourist left a comment


@szymon-miezal

Confirmed that test_revive_endpoint passes on main, rebasing should be sufficient.


@sbtourist sbtourist left a comment


Retracting my approval until we figure out the test failure. Can't reproduce so far.

@michaeljmarshall
Member Author

@szymon-miezal thank you for looking into it! I'll update the branch.

@sbtourist - I reproduced the failure once on Friday morning, then added more logging to see what might explain it and then it would not reproduce.

@sbtourist

I can reproduce if I let it run indefinitely on repeat. I believe this is a race condition caused by:

  • Using the same endpointVersions instance across all connections, outbound and inbound.
  • Running the test in the same JVM where inbound and outbound endpoints are the same, and the MessagingService is shared.

More specifically, the race manifests if the following happens, in order:

  1. OutboundConnection sets settings.endpointToVersion for the endpoint to the agreed version.
  2. InboundConnectionInitiator#setupMessagingPipeline() resets it to the max version.

Now, I am not really sure why we do the latter, but the former seems required as explained previously.

Anyway, as previously noted, the issue should only manifest in the test, where we basically use the same MessagingService singleton across all connections: otherwise, the race above is irrelevant, hence we could simply remove the assertions added here. The alternative is to copy the endpointVersions passed to the outbound connection, rather than reusing the MessagingService ones, but is a code change warranted, given this is a test-only issue, and given this code is already fragile enough?
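A minimal sketch of the "copy the endpointVersions" alternative mentioned above (class and method names are illustrative, not the real API): giving the outbound connection its own copy means a later reset on the shared inbound-side map cannot clobber the value the connection agreed on.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the fix direction discussed above: instead of sharing one
// mutable endpoint->version map between inbound and outbound paths, the
// outbound connection takes its own copy, so a concurrent reset on the
// inbound side cannot clobber the agreed value. Names are illustrative.
public class EndpointVersionsSketch
{
    private final Map<String, Integer> endpointToVersion;

    EndpointVersionsSketch(Map<String, Integer> source)
    {
        // Defensive copy: later mutations of `source` are not visible here.
        this.endpointToVersion = new ConcurrentHashMap<>(source);
    }

    int get(String endpoint, int defaultVersion)
    {
        return endpointToVersion.getOrDefault(endpoint, defaultVersion);
    }

    void set(String endpoint, int version)
    {
        endpointToVersion.put(endpoint, version);
    }

    public static void main(String[] args)
    {
        Map<String, Integer> shared = new ConcurrentHashMap<>();
        shared.put("peer", 100);                          // outbound agreed on 100
        EndpointVersionsSketch copy = new EndpointVersionsSketch(shared);
        shared.put("peer", 101);                          // inbound resets to max version
        System.out.println(copy.get("peer", -1));         // the copy still sees 100
    }
}
```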

@sbtourist

the issue should only manifest in the test, where we basically use the same MessagingService singleton across all connections: otherwise, the race above is irrelevant

I realized the above is not fully correct, or to be more specific, the problem is about having the same endpoint and the same set of EndpointMessagingVersions across inbound and outbound connections, so the race can also happen outside tests.

we could simply remove the assertions added here. The alternative is to copy the endpointVersions passed to the outbound connection

I implemented the latter and cannot reproduce with it, which confirms my hypothesis. I will push it.

…ionSettings, in order to avoid race conditions caused by otherwise sharing the same EndpointMessagingVersions object with InboundConnectionInitiator.

@cassci-bot

❌ Build ds-cassandra-pr-gate/PR-1621 rejected by Butler


1 new test failure(s) in 7 builds
See build details here


Found 1 new test failures

Test                                        Explanation   Branch history   Upstream history
r.TestReplaceAddress.test_revive_endpoint   regression    🔴🔴🔴🔴

Found 15 known test failures


@jacek-lewandowski jacek-lewandowski left a comment


+1;
nit: should we invalidate an entry in endpointsToVersion (which is done by Gossiper as pointed out by Michael) on CNDB side when a service gets removed/disconnected?

@sbtourist

Thanks @jacek-lewandowski.

should we invalidate an entry in endpointsToVersion (which is done by Gossiper as pointed out by Michael) on CNDB side when a service gets removed/disconnected?

Not needed though as we update the map anyway?

@sbtourist sbtourist merged commit 979b792 into main Mar 11, 2025
466 of 479 checks passed
@sbtourist sbtourist deleted the cndb-13203 branch March 11, 2025 07:20
@szymon-miezal

General question: do you know whether it's a problem that might affect other versions like OSS and potentially DSE as well?

@eolivelli

General question: do you know whether it's a problem that might affect other versions like OSS and potentially DSE as well?

@szymon-miezal I suggest asking on Slack. More people will see your question.

@sbtourist

@szymon-miezal Good question. These bugs affect existing code, so OSS is definitely affected too. DSE has a very different messaging implementation but might also be affected by virtue of https://github.com/riptano/cndb/issues/13161. We need a better process to streamline changes across "products".

djatnieks pushed a commit that referenced this pull request Mar 11, 2025
Fixes riptano/cndb#13203

The underlying problem is that the `OutboundConnection` needs to store
the message version from the peer. Otherwise, when the local
`current_version` is higher than the remote `current_version`, we store
the wrong value (we were storing the local value, not the agreed upon
value).

The PR also fixes an issue in the ANN_OPTIONS serialization logic that
we can hit in rare cases where the peers haven't yet connected, so the
validation logic that would prevent serialization due to a peer having
too low of a message version is skipped.

This PR fixes several issues related to upgrades by improving handshake
logic.

---------

Co-authored-by: Sergio Bossa <[email protected]>
djatnieks pushed a commit that referenced this pull request May 18, 2025