Conversation
|
+CC @mridulm @FMX @SteNicholas PTAL. I need to add this in Spark as well for parity |
afterincomparableyum
left a comment
There was a problem hiding this comment.
overall lgtm, just a couple suggestions I have
.../src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakTest.scala
Outdated
Show resolved
Hide resolved
.../src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakTest.scala
Outdated
Show resolved
Hide resolved
|
updated @afterincomparableyum |
mridulm
left a comment
There was a problem hiding this comment.
Looks good to me.
Will keep it open to allow others to review too.
|
Can you retrigger the tests ? A quick look indicated the text failures are unrelated though |
afterincomparableyum
left a comment
There was a problem hiding this comment.
thanks for addressing my comments, lgtm
Seems like you already triggered it, and all the tests are passing now :) |
...on/src/main/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeader.java
Show resolved
Hide resolved
...on/src/main/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeader.java
Show resolved
Hide resolved
.../src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakTest.scala
Outdated
Show resolved
Hide resolved
SteNicholas
left a comment
There was a problem hiding this comment.
@akpatnam25, thanks for fix. I left some comments for this pull request. PTAL.
BTW, could this pull request be merged after Spark repository merged? Because EncryptedMessageWithHeader is based on EncryptedMessageWithHeader of Spark.
|
Sounds good, let me first post this PR in spark. After that is merged, I will make any necessary changes here, and we can merge here too. +CC @SteNicholas @mridulm |
|
created apache/spark#54894 |
|
@akpatnam25, could you take a look at the latest comments? |
updated @SteNicholas |
|
@akpatnam25, thanks for update. I left the following review feedback of claude code. BTW, please also take a look at the review comments of coiplot. |
There was a problem hiding this comment.
Pull request overview
Fixes a Netty ByteBuf reference-count leak on TLS-enabled transports by aligning the SSL encoding path’s ownership / release behavior with the non-SSL encoder, and adds regression coverage.
Changes:
- Update
SslMessageEncoderto always wrap payload-bearing messages inEncryptedMessageWithHeaderso the originatingManagedBuffercan be released on completion. - Extend
EncryptedMessageWithHeaderto supportByteBufbodies and to release both the body and the originatingManagedBufferonclose(). - Add unit + mini-cluster tests intended to catch TLS-related
ByteBufleaks.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
common/src/main/java/org/apache/celeborn/common/network/protocol/SslMessageEncoder.java |
Switches SSL encode path to route payloads through EncryptedMessageWithHeader for proper release semantics. |
common/src/main/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeader.java |
Adds ByteBuf body support and ensures close() releases the body + managed buffer. |
common/src/test/java/org/apache/celeborn/common/network/protocol/SslMessageEncoderSuiteJ.java |
New regression test verifying NettyManagedBuffer refCnt returns to 0 after SSL encoding + close. |
common/src/test/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeaderSuiteJ.java |
Updates tests to validate new ByteBuf body behavior and refcount handling. |
worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakSuite.scala |
New TLS mini-cluster integration test attempting to assert no Netty leak reports during push/replicate/fetch. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
common/src/main/java/org/apache/celeborn/common/network/protocol/SslMessageEncoder.java
Show resolved
Hide resolved
...src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakSuite.scala
Show resolved
Hide resolved
...src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakSuite.scala
Show resolved
Hide resolved
...src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakSuite.scala
Outdated
Show resolved
Hide resolved
|
sorry for the delay on this PR. I will take a look again later today |
|
@SteNicholas I updated the PR |
What changes were proposed in this pull request?
While running jobs with TLS enabled, we encountered memory leaks which cause worker OOMs.
When a Celeborn worker receives a PushData or PushMergedData message, it replicates that frame to a secondary worker for fault tolerance. On an SSL-enabled cluster this replication goes through SslMessageEncoder.encode(). Here is the flow of what happens inside SslMessageEncoder.encode():
The encoder asks the message body for an SSL-friendly copy by calling convertToNettyForSsl(). For shuffle data, the body is a NettyManagedBuffer — data already loaded in off-heap memory. This call runs buf.duplicate().retain(), which creates a second reference to the same memory and increments the reference count from 1 to 2.
The encoder places this second reference inside a composite buffer and hands it to Netty for writing.
Netty writes the composite to the network, then releases it — decrementing the count from 2 to 1.
Nothing releases the original NettyManagedBuffer's hold on the data, so the count stays at 1 forever.
This results in every replicated PushData frame leaking a chunk of off-heap memory, eventually causing OOM and worker crash.
The fix for this issue is to release the original message body, so that the net reference count is preserved. The second reference — now living inside the composite buffer in out — keeps the memory alive while Netty writes it to the network. When Netty finishes and releases the composite, the count reaches 0 and the memory is freed cleanly.
This is exactly what the non-SSL MessageEncoder already does via MessageWithHeader.deallocate() — the SSL path simply needed to replicate that behavior explicitly.
Why are the changes needed?
fix memory leak
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
no
How was this patch tested?
already internally in production and tested.
Also added unit tests