[CELEBORN-2219][CIP-14] Support PushMergedData in CppClient #3611

Open
afterincomparableyum wants to merge 3 commits into apache:main from afterincomparableyum:cpp-client/celeborn-2219
Conversation

@afterincomparableyum
Contributor

@afterincomparableyum afterincomparableyum commented Mar 2, 2026

What changes were proposed in this pull request?

Implement PushMergedData functionality in the C++ client, enabling batch merging and pushing of shuffle data grouped by worker address.

Key changes:

  • Add mergeData() and pushMergedData() to ShuffleClient, which accumulate per-partition data batches and push them as merged payloads when the buffer threshold is exceeded or at mapper end.
  • Introduce DataBatches class to manage batch accumulation with thread-safe add/take operations and size-bounded requireBatches().
  • Add PushMergedDataCallback to handle success responses (split handling, congestion control, MAP_ENDED) and failure responses with revive-based retry via submitRetryPushMergedData().
  • Add PushMergedData network message type with encoding for partitionUniqueIds and batchOffsets arrays.
  • Extend Encoders with encode/decode support for vector<string> and vector<int32_t>.
  • Add pushMergedDataAsync() to TransportClient.
  • Add unit tests for DataBatches, PushMergedData message encoding, and array encoders.
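To make the accumulation and draining behaviour described above concrete, here is a minimal sketch of a thread-safe batch accumulator. It is not the code from this PR: the `Batch` struct, the class name `DataBatchesSketch`, and the exact draining rule (take batches from the front until at least `requestSize` bytes have been collected) are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for one buffered batch; not the PR's actual type.
struct Batch {
  std::string partitionUniqueId;
  std::string body;
};

// Illustrative accumulator; method names follow the PR description,
// the internals are assumed.
class DataBatchesSketch {
 public:
  // Appends a batch and tracks the total buffered size under the lock.
  void addBatch(Batch batch) {
    std::lock_guard<std::mutex> lock(mutex_);
    totalSize_ += batch.body.size();
    batches_.push_back(std::move(batch));
  }

  std::size_t totalSize() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return totalSize_;
  }

  // Drains batches from the front until at least requestSize bytes have
  // been taken or the buffer is empty (assumed draining rule).
  std::vector<Batch> requireBatches(std::size_t requestSize) {
    std::lock_guard<std::mutex> lock(mutex_);
    std::size_t n = 0;
    std::size_t takenSize = 0;
    while (n < batches_.size() && takenSize < requestSize) {
      takenSize += batches_[n].body.size();
      ++n;
    }
    const auto splitAt = batches_.begin() + static_cast<std::ptrdiff_t>(n);
    std::vector<Batch> taken;
    taken.reserve(n);
    for (auto it = batches_.begin(); it != splitAt; ++it) {
      taken.push_back(std::move(*it));
    }
    batches_.erase(batches_.begin(), splitAt);
    assert(totalSize_ >= takenSize);
    totalSize_ -= takenSize;
    return taken;
  }

 private:
  mutable std::mutex mutex_;
  std::vector<Batch> batches_;
  std::size_t totalSize_ = 0;
};
```

A caller would typically check `totalSize()` against the configured buffer threshold after each `addBatch()` and, once it is exceeded, call `requireBatches()` to obtain the group to push.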

Why are the changes needed?

This extends the functionality of the C++ client. Bolt also depends on it; see bytedance/bolt#370.

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

Yes, because it adds new functionality to the C++ client.

How was this patch tested?

Tested by running the unit tests and compiling locally.

@SteNicholas
Member

SteNicholas commented Mar 2, 2026

@afterincomparableyum, could you please update the pull request description so that it follows the pull request template?

Implement PushMergedData functionality in the C++ client, enabling batch merging and pushing of shuffle data grouped by worker address.

Key changes:
  - Add mergeData() and pushMergedData() to ShuffleClient, which accumulate
    per partition data batches and push them as merged payloads when the
    buffer threshold is exceeded or at mapper end.
  - Introduce DataBatches class to manage batch accumulation with
    thread-safe add/take operations and size-bounded requireBatches().
  - Add PushMergedDataCallback to handle success responses (split handling,
    congestion control, MAP_ENDED) and failure responses with revive-based
    retry via submitRetryPushMergedData().
  - Add PushMergedData network message type with encoding for
    partitionUniqueIds and batchOffsets arrays.
  - Extend Encoders with encode/decode support for vector<string> and
    vector<int32_t>.
  - Add pushMergedDataAsync() to TransportClient.
  - Add unit tests for DataBatches, PushMergedData message encoding, and
    array encoders.

Tested by running the unit tests and compiling locally.
@afterincomparableyum afterincomparableyum changed the title from "[WIP][CELEBORN-2219][CIP-14] Support PushMergedData in CppClient" to "[CELEBORN-2219][CIP-14] Support PushMergedData in CppClient" on Mar 9, 2026
@afterincomparableyum afterincomparableyum marked this pull request as ready for review March 9, 2026 03:32
@afterincomparableyum
Contributor Author

Ping @SteNicholas @FMX @RexXiong @HolyLow for review. Thank you!

@SteNicholas
Member

@afterincomparableyum, the description of this pull request does not follow the pull request template yet. Please update the description.

Copilot AI left a comment

Pull request overview

This PR adds PushMergedData support to the C++ Celeborn client by buffering per-partition batches grouped by worker address, merging buffered payloads, and pushing them as a single request (with split/congestion/MAP_ENDED handling and revive-based retries).

Changes:

  • Add array encode/decode helpers in protocol::Encoders and tests for vector<string>/vector<int32_t>.
  • Introduce network::PushMergedData message + TransportClient::pushMergedDataAsync() and associated message tests.
  • Add DataBatches + PushMergedDataCallback and integrate merged pushing into ShuffleClientImpl (flush at mapper end) with new unit tests.
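As an illustration of how buffered batches might be combined into a single request, the sketch below concatenates batch bodies and records, for each batch, its partition identifier and the byte offset at which its body starts. The struct and function names are hypothetical, and the interpretation of `batchOffsets` as start offsets into the merged body is an assumption rather than something confirmed by this page.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical input: one buffered batch destined for a given partition.
struct PendingBatch {
  std::string partitionUniqueId;
  std::string body;
};

// Hypothetical result: the fields mirror the arrays named in the PR
// (partitionUniqueIds, batchOffsets) plus the merged body.
struct MergedPayload {
  std::vector<std::string> partitionUniqueIds;
  std::vector<int32_t> batchOffsets;
  std::string body;
};

// Concatenates all bodies; batchOffsets[i] is assumed to be the offset of
// batch i within the merged body.
MergedPayload mergeBatches(const std::vector<PendingBatch>& batches) {
  MergedPayload merged;
  merged.partitionUniqueIds.reserve(batches.size());
  merged.batchOffsets.reserve(batches.size());
  for (const auto& batch : batches) {
    // Offsets are carried as int32, so the merged body must stay below 2 GiB.
    assert(merged.body.size() <= static_cast<std::size_t>(INT32_MAX));
    merged.partitionUniqueIds.push_back(batch.partitionUniqueId);
    merged.batchOffsets.push_back(static_cast<int32_t>(merged.body.size()));
    merged.body.append(batch.body);
  }
  return merged;
}
```

With offsets recorded this way, the receiver can recover batch i as the byte range from `batchOffsets[i]` up to `batchOffsets[i + 1]` (or the end of the body for the last batch).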

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 10 comments.

| File | Description |
| --- | --- |
| cpp/celeborn/protocol/Encoders.h | Adds declarations for vector<string>/vector<int32_t> encoding helpers |
| cpp/celeborn/protocol/Encoders.cpp | Implements array encodedLength/encode/decode helpers |
| cpp/celeborn/protocol/tests/EncodersTest.cpp | Adds unit tests for the new array encoders |
| cpp/celeborn/network/Message.h | Introduces PushMergedData message class |
| cpp/celeborn/network/Message.cpp | Implements PushMergedData encoding and copy ctor |
| cpp/celeborn/network/MessageDispatcher.cpp | Ensures dispatcher extracts requestId for PUSH_MERGED_DATA |
| cpp/celeborn/network/TransportClient.h | Adds pushMergedDataAsync() API |
| cpp/celeborn/network/TransportClient.cpp | Implements async send path for PushMergedData |
| cpp/celeborn/network/tests/MessageTest.cpp | Adds encoding/copy-constructor tests for PushMergedData |
| cpp/celeborn/client/writer/DataBatches.h | New thread-safe batch accumulator abstraction |
| cpp/celeborn/client/writer/DataBatches.cpp | Implements batch accumulation and size-bounded draining |
| cpp/celeborn/client/writer/PushMergedDataCallback.h | New callback for PushMergedData response/failure handling |
| cpp/celeborn/client/writer/PushMergedDataCallback.cpp | Implements split parsing, congestion control, revive-based retry |
| cpp/celeborn/client/writer/PushState.h | Adds batch-buffering map and merge-related APIs |
| cpp/celeborn/client/writer/PushState.cpp | Implements buffering/take operations for merged pushing |
| cpp/celeborn/client/ShuffleClient.h | Adds mergeData() + pushMergedData() interface and impl hooks |
| cpp/celeborn/client/ShuffleClient.cpp | Implements merge/buffer/flush/push logic and retry regrouping |
| cpp/celeborn/client/tests/DataBatchesTest.cpp | Adds unit tests for DataBatches behavior |
| cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp | Updates test stub to satisfy new ShuffleClient interface |
| cpp/celeborn/client/tests/CMakeLists.txt | Adds DataBatchesTest to client test target |
| cpp/celeborn/client/CMakeLists.txt | Adds new writer sources to the client library |

@afterincomparableyum
Contributor Author

@SteNicholas, I resolved the comments and left a couple of TODOs in this PR based on some of Copilot's comments.

@afterincomparableyum
Contributor Author

@SteNicholas wanted to follow up and see if you had a chance to look over the PR again. Thanks!

@FMX
Contributor

FMX commented Mar 17, 2026

It's an interesting PR, let me watch it.

@afterincomparableyum
Contributor Author

@SteNicholas, you might notice some small gaps between the Java and C++ implementations of this.

That's mainly because neither SlowStartPushStrategy nor PushExcludedWorker has been implemented in the C++ client yet.

I'll add detailed TODO comments documenting what remains to be done once I have double-checked the requirements.

@SteNicholas SteNicholas requested a review from Copilot March 19, 2026 09:49
Copilot AI left a comment

Pull request overview

Copilot reviewed 22 out of 22 changed files in this pull request and generated 8 comments.


Comment on lines +89 to +90
static_cast<size_t>(count) * sizeof(int32_t),
buffer.remainingSize(),
Comment on lines +48 to +52
    const std::vector<std::string>& arr) {
  buffer.write<int>(static_cast<int>(arr.size()));
  for (const auto& s : arr) {
    encode(buffer, s);
  }
return;
}

if (response->remainingSize() <= 0) {
Comment on lines +103 to +109
case protocol::StatusCode::MAP_ENDED: {
  auto mapperEndSet = sharedClient->mapperEndSets().computeIfAbsent(
      shuffleId_,
      []() { return std::make_shared<utils::ConcurrentHashSet<int>>(); });
  mapperEndSet->insert(mapId_);
  pushState_->removeBatch(groupedBatchId_, hostAndPushPort_);
  break;
Comment on lines +267 to +270
default: {
  LOG(WARNING) << "unhandled PushMergedData success StatusCode: " << reason;
  pushState_->removeBatch(groupedBatchId_, hostAndPushPort_);
}
Comment on lines +530 to +532
auto host = hostAndPushPort.substr(0, hostAndPushPort.find(':'));
auto portStr = hostAndPushPort.substr(hostAndPushPort.find(':') + 1);
auto port = static_cast<uint16_t>(std::stoi(portStr));
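The snippet above assumes that the string contains a colon and that the port text is a valid number in range; `std::stoi` throws on malformed input and the cast to `uint16_t` silently truncates out-of-range values. A more defensive variant could look like the following sketch. The function name `parseHostAndPort` and the choice to return `std::optional` are illustrative assumptions, not part of this PR; the sketch requires C++17.

```cpp
#include <cassert>
#include <charconv>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>

// Hypothetical helper: splits "host:port" at the first colon and validates
// the port. Returns std::nullopt instead of throwing on malformed input.
std::optional<std::pair<std::string, uint16_t>> parseHostAndPort(
    std::string_view hostAndPort) {
  const auto pos = hostAndPort.find(':');
  if (pos == std::string_view::npos || pos == 0 ||
      pos + 1 == hostAndPort.size()) {
    return std::nullopt;  // no colon, empty host, or empty port
  }
  const std::string_view portStr = hostAndPort.substr(pos + 1);
  unsigned int port = 0;
  const char* begin = portStr.data();
  const char* end = begin + portStr.size();
  const auto [ptr, ec] = std::from_chars(begin, end, port);
  if (ec != std::errc() || ptr != end || port > 65535) {
    return std::nullopt;  // not a number, trailing characters, or out of range
  }
  return std::make_pair(
      std::string(hostAndPort.substr(0, pos)), static_cast<uint16_t>(port));
}
```

Splitting at the first colon mirrors the reviewed snippet; IPv6 literals would need different handling.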
Comment on lines +56 to +63
int count = buffer.read<int>();
CELEBORN_CHECK_GE(count, 0, "Invalid string array count: {}", count);
CELEBORN_CHECK_LE(
    static_cast<size_t>(count) * sizeof(int),
    buffer.remainingSize(),
    "String array count {} exceeds remaining buffer size {}",
    count,
    buffer.remainingSize());
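The check above uses `count * sizeof(int)` as a lower bound on the bytes still needed, since each string carries at least a length prefix. The self-contained sketch below shows the same idea end to end, with an additional per-element length check. The `Reader` class, the helper names, and the big-endian 4-byte length-prefixed layout are assumptions made for illustration; they are not the `Encoders` API from this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal byte reader (big-endian int32, assumed wire format).
class Reader {
 public:
  explicit Reader(std::string data) : data_(std::move(data)) {}
  std::size_t remaining() const { return data_.size() - pos_; }
  int32_t readInt32() {
    if (remaining() < 4) throw std::out_of_range("truncated int32");
    uint32_t v = 0;
    for (int i = 0; i < 4; ++i) {
      v = (v << 8) | static_cast<uint8_t>(data_[pos_++]);
    }
    return static_cast<int32_t>(v);
  }
  std::string readBytes(std::size_t n) {
    if (remaining() < n) throw std::out_of_range("truncated bytes");
    std::string out = data_.substr(pos_, n);
    pos_ += n;
    return out;
  }

 private:
  std::string data_;
  std::size_t pos_ = 0;
};

void appendInt32(std::string& out, int32_t v) {
  const auto u = static_cast<uint32_t>(v);
  for (int shift = 24; shift >= 0; shift -= 8) {
    out.push_back(static_cast<char>((u >> shift) & 0xFF));
  }
}

// Writes the element count, then each string as length prefix plus bytes.
std::string encodeStringArray(const std::vector<std::string>& arr) {
  std::string out;
  appendInt32(out, static_cast<int32_t>(arr.size()));
  for (const auto& s : arr) {
    appendInt32(out, static_cast<int32_t>(s.size()));
    out.append(s);
  }
  return out;
}

// Rejects negative or implausibly large counts before reserving memory.
std::vector<std::string> decodeStringArray(Reader& reader) {
  const int32_t count = reader.readInt32();
  if (count < 0) throw std::invalid_argument("negative array count");
  // Each element needs at least a 4-byte length prefix.
  if (static_cast<std::size_t>(count) > reader.remaining() / sizeof(int32_t)) {
    throw std::out_of_range("array count exceeds remaining buffer size");
  }
  std::vector<std::string> out;
  out.reserve(static_cast<std::size_t>(count));
  for (int32_t i = 0; i < count; ++i) {
    const int32_t len = reader.readInt32();
    if (len < 0) throw std::invalid_argument("negative string length");
    out.push_back(reader.readBytes(static_cast<std::size_t>(len)));
  }
  return out;
}
```

Comparing `count` against `remaining() / sizeof(int32_t)` rather than multiplying avoids any overflow concern on platforms where `size_t` is 32 bits wide.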
Comment on lines +253 to +256
auto result = batchesMap_.get(addressPairKey);
if (result.has_value()) {
  batchesMap_.erase(addressPairKey);
  return result.value();
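If `get` and `erase` are two separate operations on a concurrent map, another thread could in principle add to or take the same entry between the two calls. One way to avoid that window is a single take operation that looks up and removes the entry under one lock. The sketch below shows the idea with a hypothetical `TakeableMap` wrapper; it is not the map type used in this PR, and whether the PR's map already offers an equivalent operation is not visible from this page.

```cpp
#include <cassert>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical mutex-guarded map with an atomic "find and remove".
template <typename V>
class TakeableMap {
 public:
  void put(const std::string& key, V value) {
    std::lock_guard<std::mutex> lock(mutex_);
    map_.insert_or_assign(key, std::move(value));
  }

  // Looks up and erases the entry while holding the lock once, so no other
  // thread can observe or modify the entry in between.
  std::optional<V> take(const std::string& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = map_.find(key);
    if (it == map_.end()) {
      return std::nullopt;
    }
    std::optional<V> out(std::move(it->second));
    map_.erase(it);
    return out;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, V> map_;
};
```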
@afterincomparableyum
Contributor Author

I will take a look at the comments soon.
