[CELEBORN-2284] Fix TLS Memory Leak#3630

Open
akpatnam25 wants to merge 7 commits into apache:main from akpatnam25:CELEBORN-2284

@akpatnam25
Contributor

@akpatnam25 akpatnam25 commented Mar 16, 2026

What changes were proposed in this pull request?

While running jobs with TLS enabled, we encountered memory leaks which cause worker OOMs.

26/02/13 21:02:52,779 ERROR [push-server-9-9] ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
	io.netty.buffer.AbstractByteBufAllocator.compositeDirectBuffer(AbstractByteBufAllocator.java:224)
	io.netty.buffer.AbstractByteBufAllocator.compositeBuffer(AbstractByteBufAllocator.java:202)
	org.apache.celeborn.common.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:143)
	org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:66)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:840)

When a Celeborn worker receives a PushData or PushMergedData message, it replicates that frame to a secondary worker for fault tolerance. On an SSL-enabled cluster this replication goes through SslMessageEncoder.encode(). Here is the flow of what happens inside SslMessageEncoder.encode():

  • The encoder asks the message body for an SSL-friendly copy by calling convertToNettyForSsl(). For shuffle data, the body is a NettyManagedBuffer — data already loaded in off-heap memory. This call runs buf.duplicate().retain(), which creates a second reference to the same memory and increments the reference count from 1 to 2.

  • The encoder places this second reference inside a composite buffer and hands it to Netty for writing.

  • Netty writes the composite to the network, then releases it — decrementing the count from 2 to 1.

  • Nothing releases the original NettyManagedBuffer's hold on the data, so the count stays at 1 forever.

  • This results in every replicated PushData frame leaking a chunk of off-heap memory, eventually causing OOM and worker crash.

The fix for this issue is to release the original message body, so that the net reference count is preserved. The second reference, which now lives inside the composite buffer added to the encoder's out list, keeps the memory alive while Netty writes it to the network. When Netty finishes and releases the composite, the count reaches 0 and the memory is freed cleanly.

This is exactly what the non-SSL MessageEncoder already does via MessageWithHeader.deallocate() — the SSL path simply needed to replicate that behavior explicitly.
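
The reference-count arithmetic described above can be sketched in plain Java. This is only a toy model under stated assumptions: CountedBuffer and SslEncodeRefCountSketch are hypothetical names, not Netty or Celeborn classes, and retain() here stands in for buf.duplicate().retain().

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the counts described above; not Netty's ByteBuf or Celeborn code. */
final class SslEncodeRefCountSketch {

  /** Hypothetical stand-in for a reference-counted buffer. */
  static final class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // a new buffer starts at 1

    int refCnt() {
      return refCnt.get();
    }

    /** Models duplicate().retain(): same memory, one more reference. */
    CountedBuffer retain() {
      refCnt.incrementAndGet();
      return this;
    }

    /** Returns true when the count reaches 0, i.e. the memory would be freed. */
    boolean release() {
      int remaining = refCnt.decrementAndGet();
      if (remaining < 0) {
        throw new IllegalStateException("released more often than retained");
      }
      return remaining == 0;
    }
  }

  /** Leaky path: nobody releases the original hold on the body. */
  static int encodeWithoutReleasingBody() {
    CountedBuffer body = new CountedBuffer(); // refCnt = 1
    CountedBuffer forWire = body.retain();    // refCnt = 2
    forWire.release();                        // write completes: refCnt = 1
    return body.refCnt();                     // stays at 1: leaked
  }

  /** Fixed path: the encoder also drops the original hold. */
  static int encodeAndReleaseBody() {
    CountedBuffer body = new CountedBuffer(); // refCnt = 1
    CountedBuffer forWire = body.retain();    // refCnt = 2
    body.release();                           // original hold dropped: refCnt = 1
    forWire.release();                        // write completes: refCnt = 0
    return body.refCnt();
  }

  public static void main(String[] args) {
    System.out.println("without release: " + encodeWithoutReleasingBody()); // 1
    System.out.println("with release:    " + encodeAndReleaseBody());       // 0
  }
}
```

The second reference keeps the memory alive between the two release() calls in the fixed path, which is why releasing the original before the write completes is safe in this model.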

Why are the changes needed?

To fix the off-heap memory leak described above, which eventually causes worker OOMs.

Does this PR resolve a correctness bug?

Does this PR introduce any user-facing change?

no

How was this patch tested?

Already running internally in production and tested there.
Also added unit tests.

@akpatnam25 akpatnam25 marked this pull request as ready for review March 16, 2026 21:45
@akpatnam25
Contributor Author

+CC @mridulm @FMX @SteNicholas PTAL. I need to add this in Spark as well for parity

Contributor

@afterincomparableyum afterincomparableyum left a comment

overall lgtm, just a couple suggestions I have

@akpatnam25
Contributor Author

updated @afterincomparableyum

Contributor

@mridulm mridulm left a comment

Looks good to me.
Will keep it open to allow others to review too.

@mridulm
Contributor

mridulm commented Mar 17, 2026

Can you retrigger the tests? A quick look indicated the test failures are unrelated though

Contributor

@afterincomparableyum afterincomparableyum left a comment

thanks for addressing my comments, lgtm

@akpatnam25
Contributor Author

Can you retrigger the tests? A quick look indicated the test failures are unrelated though

Seems like you already triggered it, and all the tests are passing now :)

Member

@SteNicholas SteNicholas left a comment

@akpatnam25, thanks for the fix. I left some comments on this pull request. PTAL.
BTW, could this pull request be merged after the corresponding change is merged in the Spark repository? EncryptedMessageWithHeader here is based on Spark's EncryptedMessageWithHeader.

@akpatnam25
Contributor Author

Sounds good, let me first post this PR in Spark. After that is merged, I will make any necessary changes here, and we can merge here too. +CC @SteNicholas @mridulm

@akpatnam25
Contributor Author

created apache/spark#54894

@SteNicholas
Member

@akpatnam25, could you take a look at the latest comments?

@akpatnam25
Contributor Author

@akpatnam25, could you take a look at the latest comments?

updated @SteNicholas

@SteNicholas SteNicholas requested a review from Copilot March 19, 2026 09:37
@SteNicholas
Member

SteNicholas commented Mar 19, 2026

@akpatnam25, thanks for the update. I left the following review feedback from Claude Code. BTW, please also take a look at the review comments from Copilot.

  ---
  Code Review: PR #3630 - Apache Celeborn

  Overview

  This PR fixes a ByteBuf memory leak in SslMessageEncoder when handling messages with NettyManagedBuffer bodies. The fix involves:

  1. Adding ByteBuf support to EncryptedMessageWithHeader (previously only supported InputStream and ChunkedStream)
  2. Simplifying SslMessageEncoder.encode() to always use EncryptedMessageWithHeader for bodies
  3. Properly releasing the body buffer in close() using ReferenceCountUtil.release(body)
  4. Adding comprehensive unit and integration tests

  ---
  Code Quality Analysis

  ✅ Strengths

  1. Root cause properly addressed: The memory leak was caused by not releasing the body ByteBuf reference. The fix adds ReferenceCountUtil.release(body) in close().
  2. Good test coverage:
    - Unit test for EncryptedMessageWithHeader with ByteBuf body
    - Unit test for SslMessageEncoder reference counting
    - Integration test (SslClusterReadWriteLeakSuite) with PARANOID leak detection
  3. Clean simplification: Removing the conditional logic in SslMessageEncoder.encode() makes the code easier to maintain.

  ⚠️  Potential Issues

  1. Double-release risk in EncryptedMessageWithHeader.readChunk() (lines 91-94)

  if (body instanceof ByteBuf) {
    ByteBuf bodyBuf = (ByteBuf) body;
    totalBytesTransferred = headerLength + bodyLength;
    return Unpooled.wrappedBuffer(header.retain(), bodyBuf.retain());
  }

  The body is retained here, and then released again in close(). This is correct, but the close() method now releases body unconditionally:

  public void close() throws Exception {
    header.release();
    ReferenceCountUtil.release(body);  // Releases body regardless of type
    if (managedBuffer != null) {
      managedBuffer.release();
    }
  }

  For ByteBuf bodies, this works correctly because:
  - readChunk() does bodyBuf.retain()
  - close() does ReferenceCountUtil.release(body)

  However, for InputStream and ChunkedStream bodies, ReferenceCountUtil.release() will silently do nothing (since they're not ReferenceCounted). This is acceptable but could be clearer with a comment.

  2. Missing null check in close()

  If body is null, ReferenceCountUtil.release(null) is safe (it's a no-op), but an explicit check would be more defensive.

  3. Test assertion gap in SslMessageEncoderSuiteJ

  assertEquals(2, bodyBuf.refCnt());

  The comment explains why refCnt is 2, but this is testing implementation details of convertToNettyForSsl(). If that method changes, the test could fail even if the leak fix still works.
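
Points 1 and 2 above rely on the release helper being a no-op for null and for anything that is not reference-counted, so an unconditional call in close() is safe for every body type. A minimal plain-Java model of that behavior follows; Counted, CountedBody and releaseIfCounted are hypothetical names used only for illustration and are not Netty's API.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

/** Toy model of a "release only if reference-counted" helper; not Netty code. */
final class ReleaseIfCountedSketch {

  /** Hypothetical stand-in for a reference-counted type. */
  interface Counted {
    boolean release();
  }

  /** Hypothetical counted body that records how often it was released. */
  static final class CountedBody implements Counted {
    int releases = 0;

    @Override
    public boolean release() {
      releases++;
      return true; // pretend this was the last reference
    }
  }

  /** Releases msg only if it is Counted; returns false for null or any other type. */
  static boolean releaseIfCounted(Object msg) {
    if (msg instanceof Counted) { // instanceof is false for null
      return ((Counted) msg).release();
    }
    return false;
  }

  public static void main(String[] args) {
    CountedBody buf = new CountedBody();
    InputStream stream = new ByteArrayInputStream(new byte[0]);
    System.out.println(releaseIfCounted(buf));    // true
    System.out.println(releaseIfCounted(stream)); // false: streams are not counted
    System.out.println(releaseIfCounted(null));   // false: null is ignored
  }
}
```

Under this model an explicit null check before the call changes nothing functionally; it would only document intent.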

  ---
  Specific Suggestions

  1. Add a comment in close() explaining the release strategy:
  // Release body if it's a ByteBuf (InputStream/ChunkedStream are not ReferenceCounted)
  ReferenceCountUtil.release(body);

  2. Consider defensive null check:
  if (body != null) {
    ReferenceCountUtil.release(body);
  }

  3. The integration test is excellent but consider:
    - Adding a log statement showing reportedLeaks.get() if non-zero for debugging
    - The 2-second sleep before shutdown could potentially be flaky; consider using a more deterministic wait
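
One way to replace a fixed sleep with a more deterministic wait is to poll a condition until a deadline. The helper below is a generic sketch, not code from this PR: AwaitSketch and awaitCondition are hypothetical names, and which condition the integration test should poll is not specified here and is left as an assumption.

```java
import java.util.function.BooleanSupplier;

/** Generic poll-until-deadline helper; a sketch, not part of the PR. */
final class AwaitSketch {

  /**
   * Polls condition every pollMillis until it holds or timeoutMillis elapses.
   * Returns true if the condition held, false on timeout or interruption.
   */
  static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) { // overflow-safe deadline check
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return false;
      }
    }
  }

  public static void main(String[] args) {
    int[] polls = {0};
    // The condition becomes true on the third poll, well inside the timeout.
    boolean ok = awaitCondition(() -> ++polls[0] >= 3, 1000, 10);
    System.out.println(ok); // true
  }
}
```

Compared with a fixed sleep, this returns as soon as the condition holds and fails with a clear timeout otherwise.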

  ---
  Security Considerations

  No security concerns. This is a memory management fix that prevents resource exhaustion.

  ---
  Performance Implications

  - Positive: Fixes memory leak that could cause OOM under load
  - Neutral: Unpooled.wrappedBuffer() creates a composite buffer efficiently without copying

  ---
  Summary

  ┌──────────────────┬───────────────────────────────────┐
  │      Aspect      │              Rating               │
  ├──────────────────┼───────────────────────────────────┤
  │ Code Correctness │ ✅ Good                           │
  ├──────────────────┼───────────────────────────────────┤
  │ Test Coverage    │ ✅ Excellent                      │
  ├──────────────────┼───────────────────────────────────┤
  │ Code Style       │ ✅ Clean                          │
  ├──────────────────┼───────────────────────────────────┤
  │ Documentation    │ ⚠️  Could use more inline comments │
  ├──────────────────┼───────────────────────────────────┤
  │ Risk Level       │ Low                               │
  └──────────────────┴───────────────────────────────────┘

  Recommendation: Approve with minor suggestions. The fix correctly addresses the memory leak, and the comprehensive test coverage (including PARANOID leak detection integration test) gives high confidence in the solution.


Copilot AI left a comment

Pull request overview

Fixes a Netty ByteBuf reference-count leak on TLS-enabled transports by aligning the SSL encoding path’s ownership / release behavior with the non-SSL encoder, and adds regression coverage.

Changes:

  • Update SslMessageEncoder to always wrap payload-bearing messages in EncryptedMessageWithHeader so the originating ManagedBuffer can be released on completion.
  • Extend EncryptedMessageWithHeader to support ByteBuf bodies and to release both the body and the originating ManagedBuffer on close().
  • Add unit + mini-cluster tests intended to catch TLS-related ByteBuf leaks.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Summary per file (file: description):

  • common/src/main/java/org/apache/celeborn/common/network/protocol/SslMessageEncoder.java: Switches SSL encode path to route payloads through EncryptedMessageWithHeader for proper release semantics.
  • common/src/main/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeader.java: Adds ByteBuf body support and ensures close() releases the body + managed buffer.
  • common/src/test/java/org/apache/celeborn/common/network/protocol/SslMessageEncoderSuiteJ.java: New regression test verifying NettyManagedBuffer refCnt returns to 0 after SSL encoding + close.
  • common/src/test/java/org/apache/celeborn/common/network/protocol/EncryptedMessageWithHeaderSuiteJ.java: Updates tests to validate new ByteBuf body behavior and refcount handling.
  • worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/SslClusterReadWriteLeakSuite.scala: New TLS mini-cluster integration test attempting to assert no Netty leak reports during push/replicate/fetch.

@akpatnam25
Contributor Author

sorry for the delay on this PR. I will take a look again later today

@akpatnam25
Contributor Author

@SteNicholas I updated the PR

6 participants