[CELEBORN-2219][CIP-14] Support PushMergedData in CppClient#3611
[CELEBORN-2219][CIP-14] Support PushMergedData in CppClient#3611afterincomparableyum wants to merge 3 commits intoapache:mainfrom
Conversation
|
@afterincomparableyum, could you please update the description of the pull request which should refer to the template of pull request? |
Implement PushMergedData functionality in the C++ client, enabling batch merging and pushing of shuffle data grouped by worker address.
Key changes:
- Add mergeData() and pushMergedData() to ShuffleClient, which accumulate
per partition data batches and push them as merged payloads when the
buffer threshold is exceeded or at mapper end.
- Introduce DataBatches class to manage batch accumulation with
thread-safe add/take operations and size-bounded requireBatches().
- Add PushMergedDataCallback to handle success responses (split handling,
congestion control, MAP_ENDED) and failure responses with revive-based
retry via submitRetryPushMergedData().
- Add PushMergedData network message type with encoding for
partitionUniqueIds and batchOffsets arrays.
- Extend Encoders with encode/decode support for vector<string> and
vector<int32_t>.
- Add pushMergedDataAsync() to TransportClient.
- Add unit tests for DataBatches, PushMergedData message encoding, and
array encoders.
Tested through running unit tests and compiling locally.
8149bb9 to
a234845
Compare
|
ping @SteNicholas @FMX @RexXiong @HolyLow for review. Thank you for your review too. |
|
@afterincomparableyum, the description of this pull request does not follow the template of the pull request yet. Please update the description. |
There was a problem hiding this comment.
Pull request overview
This PR adds PushMergedData support to the C++ Celeborn client by buffering per-partition batches grouped by worker address, merging buffered payloads, and pushing them as a single request (with split/congestion/MAP_ENDED handling and revive-based retries).
Changes:
- Add array encode/decode helpers in
protocol::Encodersand tests for vector/vector<int32_t>. - Introduce
network::PushMergedDatamessage +TransportClient::pushMergedDataAsync()and associated message tests. - Add
DataBatches+PushMergedDataCallbackand integrate merged pushing intoShuffleClientImpl(flush at mapper end) with new unit tests.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| cpp/celeborn/protocol/Encoders.h | Adds declarations for vector/vector<int32_t> encoding helpers |
| cpp/celeborn/protocol/Encoders.cpp | Implements array encodedLength/encode/decode helpers |
| cpp/celeborn/protocol/tests/EncodersTest.cpp | Adds unit tests for the new array encoders |
| cpp/celeborn/network/Message.h | Introduces PushMergedData message class |
| cpp/celeborn/network/Message.cpp | Implements PushMergedData encoding and copy ctor |
| cpp/celeborn/network/MessageDispatcher.cpp | Ensures dispatcher extracts requestId for PUSH_MERGED_DATA |
| cpp/celeborn/network/TransportClient.h | Adds pushMergedDataAsync() API |
| cpp/celeborn/network/TransportClient.cpp | Implements async send path for PushMergedData |
| cpp/celeborn/network/tests/MessageTest.cpp | Adds encoding/copy-constructor tests for PushMergedData |
| cpp/celeborn/client/writer/DataBatches.h | New thread-safe batch accumulator abstraction |
| cpp/celeborn/client/writer/DataBatches.cpp | Implements batch accumulation and size-bounded draining |
| cpp/celeborn/client/writer/PushMergedDataCallback.h | New callback for PushMergedData response/failure handling |
| cpp/celeborn/client/writer/PushMergedDataCallback.cpp | Implements split parsing, congestion control, revive-based retry |
| cpp/celeborn/client/writer/PushState.h | Adds batch-buffering map and merge-related APIs |
| cpp/celeborn/client/writer/PushState.cpp | Implements buffering/take operations for merged pushing |
| cpp/celeborn/client/ShuffleClient.h | Adds mergeData() + pushMergedData() interface and impl hooks |
| cpp/celeborn/client/ShuffleClient.cpp | Implements merge/buffer/flush/push logic and retry regrouping |
| cpp/celeborn/client/tests/DataBatchesTest.cpp | Adds unit tests for DataBatches behavior |
| cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp | Updates test stub to satisfy new ShuffleClient interface |
| cpp/celeborn/client/tests/CMakeLists.txt | Adds DataBatchesTest to client test target |
| cpp/celeborn/client/CMakeLists.txt | Adds new writer sources to the client library |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
722dae1 to
85022fa
Compare
|
Resolved comments @SteNicholas, and left a couple of TODOs in this PR as well based off of some of the comments from CoPilot. |
4eef732 to
5d5c4b5
Compare
|
@SteNicholas wanted to follow up and see if you had a chance to look over the PR again. Thanks! |
|
It's an interesting PR, let me watch it. |
|
@SteNicholas you might notice some small gaps between java and c++ implementation of this. that’s mainly because SlowStartPushStrategy has not been implemented yet and PushExcludedWorker has not been implemented yet too. I’ll add some detailed TODO comments to document what I need to do for this as well after I double check what needs to be done. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| static_cast<size_t>(count) * sizeof(int32_t), | ||
| buffer.remainingSize(), |
| const std::vector<std::string>& arr) { | ||
| buffer.write<int>(static_cast<int>(arr.size())); | ||
| for (const auto& s : arr) { | ||
| encode(buffer, s); | ||
| } |
| return; | ||
| } | ||
|
|
||
| if (response->remainingSize() <= 0) { |
| case protocol::StatusCode::MAP_ENDED: { | ||
| auto mapperEndSet = sharedClient->mapperEndSets().computeIfAbsent( | ||
| shuffleId_, | ||
| []() { return std::make_shared<utils::ConcurrentHashSet<int>>(); }); | ||
| mapperEndSet->insert(mapId_); | ||
| pushState_->removeBatch(groupedBatchId_, hostAndPushPort_); | ||
| break; |
| default: { | ||
| LOG(WARNING) << "unhandled PushMergedData success StatusCode: " << reason; | ||
| pushState_->removeBatch(groupedBatchId_, hostAndPushPort_); | ||
| } |
| auto host = hostAndPushPort.substr(0, hostAndPushPort.find(':')); | ||
| auto portStr = hostAndPushPort.substr(hostAndPushPort.find(':') + 1); | ||
| auto port = static_cast<uint16_t>(std::stoi(portStr)); |
| int count = buffer.read<int>(); | ||
| CELEBORN_CHECK_GE(count, 0, "Invalid string array count: {}", count); | ||
| CELEBORN_CHECK_LE( | ||
| static_cast<size_t>(count) * sizeof(int), | ||
| buffer.remainingSize(), | ||
| "String array count {} exceeds remaining buffer size {}", | ||
| count, | ||
| buffer.remainingSize()); |
| auto result = batchesMap_.get(addressPairKey); | ||
| if (result.has_value()) { | ||
| batchesMap_.erase(addressPairKey); | ||
| return result.value(); |
|
I will take a look at comments soon |
What changes were proposed in this pull request?
Implement PushMergedData functionality in the C++ client, enabling batch merging and pushing of shuffle data grouped by worker address.
Key changes:
Why are the changes needed?
This is needed to extend the functionality of C++ Client and there is a Bolt dependency on it with bytedance/bolt#370
Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
Yes because it is a new functionality in the c++ client
How was this patch tested?
Tested through running unit tests and compiling locally.