-
Notifications
You must be signed in to change notification settings - Fork 308
HPCC-34816 Add support for io_iring to jlib and roxie sender #20406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: candidate-10.0.x
Are you sure you want to change the base?
Conversation
Signed-off-by: Gavin Halliday <[email protected]>
Jira Issue: https://hpccsystems.atlassian.net//browse/HPCC-34816 Jirabot Action Result: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request adds support for io_uring to jlib and roxie sender, implementing asynchronous I/O operations for improved performance on Linux systems. The changes introduce a new IAsyncProcessor interface for handling asynchronous operations and integrate it with the existing socket communication infrastructure.
Key Changes:
- Added io_uring support through new IAsyncProcessor interface and URingProcessor implementation
- Enhanced CTcpSender with asynchronous write capabilities and state machine management
- Integrated io_uring support into Roxie's TCP sender with configurable enablement
Reviewed Changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.
Show a summary per file
File | Description |
---|---|
vcpkg.json.in | Added liburing dependency for Linux platforms |
system/jlib/jiouring.hpp | New interface definitions for async processor and callback |
system/jlib/jiouring.cpp | Complete io_uring implementation with threaded/unthreaded variants |
system/jlib/CMakeLists.txt | Build configuration for liburing dependency |
system/jlib/jmutex.hpp | Added tryEnter() method to CriticalSection classes |
system/jlib/jlib.hpp | Added common size constants (oneKB, oneMB, oneGB) |
system/jlib/jstats.cpp | Updated to use new size constants |
system/jlib/jfile.hpp | Updated size constants and includes |
system/jlib/jencrypt.cpp | Improved error handling with OpenSSL error details |
system/security/securesocket/socketutils.hpp | Enhanced CSocketTarget with async write support and state machine |
system/security/securesocket/socketutils.cpp | Implemented async write functionality with queuing and reconnection logic |
roxie/udplib/tcptrs.cpp | Integrated io_uring support with Roxie TCP sender |
roxie/roxiemem/roxiemem.cpp | Added memory registration for io_uring fixed buffers |
roxie/ccd/ccdqueue.cpp | Added configuration option for io_uring usage |
testing/unittests/jlibtests2.cpp | Added comprehensive unit tests for io_uring functionality |
for (;;) | ||
{ | ||
io_uring_sqe *sqe = io_uring_get_sqe(&ring); | ||
if (sqe) | ||
return sqe; | ||
// MORE: If the buffer is full we need to wait for some completions and try again | ||
MilliSleep(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The busy-wait loop with MilliSleep(1) is inefficient and could cause unnecessary CPU usage. Consider implementing a proper blocking mechanism or exponential backoff strategy instead of a fixed 1ms sleep.
for (;;) | |
{ | |
io_uring_sqe *sqe = io_uring_get_sqe(&ring); | |
if (sqe) | |
return sqe; | |
// MORE: If the buffer is full we need to wait for some completions and try again | |
MilliSleep(1); | |
unsigned sleepMs = 1; | |
const unsigned maxSleepMs = 64; | |
for (;;) | |
{ | |
io_uring_sqe *sqe = io_uring_get_sqe(&ring); | |
if (sqe) | |
return sqe; | |
// Exponential backoff to reduce CPU usage while waiting for an available SQE | |
MilliSleep(sleepMs); | |
if (sleepMs < maxSleepMs) | |
sleepMs *= 2; |
Copilot uses AI. Check for mistakes.
//MORE: Check that the buffer does not overlap a 1GB boundary | ||
const byte * start = static_cast<const byte *>(buf); | ||
return (start >= startLockedMemory) && (start + len <= endLockedMemory); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment indicates that buffer boundary checking is missing, but the code doesn't validate that buffers don't cross 1GB boundaries as required by io_uring fixed buffer operations. This could cause runtime failures when buffers span multiple registered memory blocks.
//MORE: Check that the buffer does not overlap a 1GB boundary | |
const byte * start = static_cast<const byte *>(buf); | |
return (start >= startLockedMemory) && (start + len <= endLockedMemory); | |
// Ensure the buffer does not overlap a 1GB boundary | |
const byte * start = static_cast<const byte *>(buf); | |
const byte * end = start + len - 1; | |
// Check that start and end are within the same 1GB block | |
bool withinSameBlock = (((memsize_t)start) & ~(oneGB - 1)) == (((memsize_t)end) & ~(oneGB - 1)); | |
return (start >= startLockedMemory) && (end < endLockedMemory) && withinSameBlock; |
Copilot uses AI. Check for mistakes.
} | ||
else | ||
{ | ||
OERRLOG("Short write on socket %u of %zu", result, request.len); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The format specifiers are incorrect. result
is an int
but uses %u
(unsigned), and request.len
is size_t
but uses %zu
which may not be supported on all platforms. Use %d
for result
and cast request.len
to unsigned
with %u
.
OERRLOG("Short write on socket %u of %zu", result, request.len); | |
OERRLOG("Short write on socket %d of %u", result, (unsigned)request.len); |
Copilot uses AI. Check for mistakes.
for(;;) | ||
{ | ||
//If reconnecting, drop the packets until the connection has succeeded. | ||
if (unlikely((state == State::Aborting) || (state == State::Reconnecting))) | ||
{ | ||
sender.releaseBuffer(ownedBuffer); | ||
return; | ||
} | ||
|
||
//Ensure there is space to record the next item to send.. | ||
//Maybe this should always expand rather than block??? | ||
if (likely(numRequests != maxQueueDepth)) | ||
break; | ||
|
||
waitForRequestSpace(block); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The infinite loop with break conditions scattered throughout makes the control flow difficult to follow. Consider restructuring this into a while loop with a clear condition or extracting the logic into separate helper methods.
for(;;) | |
{ | |
//If reconnecting, drop the packets until the connection has succeeded. | |
if (unlikely((state == State::Aborting) || (state == State::Reconnecting))) | |
{ | |
sender.releaseBuffer(ownedBuffer); | |
return; | |
} | |
//Ensure there is space to record the next item to send.. | |
//Maybe this should always expand rather than block??? | |
if (likely(numRequests != maxQueueDepth)) | |
break; | |
waitForRequestSpace(block); | |
// If reconnecting or aborting, drop the packets until the connection has succeeded. | |
if (unlikely((state == State::Aborting) || (state == State::Reconnecting))) | |
{ | |
sender.releaseBuffer(ownedBuffer); | |
return; | |
} | |
// Ensure there is space to record the next item to send. | |
// Maybe this should always expand rather than block??? | |
while (numRequests == maxQueueDepth) | |
{ | |
waitForRequestSpace(block); | |
// After waiting, check state again in case it changed. | |
if (unlikely((state == State::Aborting) || (state == State::Reconnecting))) | |
{ | |
sender.releaseBuffer(ownedBuffer); | |
return; | |
} |
Copilot uses AI. Check for mistakes.
{ | ||
sender.setAsyncProcessor(asyncSender); | ||
|
||
//MORE: I am not sure if this is actually worthwhile - need to perform some performance tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TODO comment indicates uncertainty about the performance benefits. Either conduct the performance tests and update the code accordingly, or document the expected benefits and conditions under which this optimization should be used.
//MORE: I am not sure if this is actually worthwhile - need to perform some performance tests | |
// Register Roxie memory with the asyncSender to enable optimized memory management when using io_uring. | |
// This is expected to improve performance by allowing the async processor to efficiently handle Roxie-managed buffers. | |
// Enable this registration when using io_uring-based async senders, especially in high-throughput scenarios. |
Copilot uses AI. Check for mistakes.
@mckellyln I closed the previous PR and opened a clean one. Commits have been squashed because the steps didn't clarify the work. |
Signed-off-by: Gavin Halliday <[email protected]>
Type of change:
Checklist:
Smoketest:
Testing: