Hudi Split Loader Deadlock: Shared Thread Pool Between Producers and Consumers
Summary
The Hudi connector deadlocks when every thread in the shared thread pool is blocked waiting on queue operations. Both the split producers (offer operations) and the consumers (getBatch operations) use the same underlying thread pool, which creates a circular dependency that can exhaust the pool completely. Once that happens, all Hudi queries stall waiting for hudi-split-manager threads to become available.
The current implementation has a fundamental flaw in its thread pool design:
- Shared Executor: The `ThrottledAsyncQueue` is created with a single `ExecutorService` that is used for both:
  - Producer operations: `AsyncQueue.offer()` → `completeAsync(executor, notFullSignal)`
  - Consumer operations: `AsyncQueue.borrowBatchAsync()` → `Futures.transform(..., executor)`
- Circular Dependency: When the queue becomes full:
  - Split generators call `asyncQueue.offer()`, which returns a `ListenableFuture<Void>`
  - This future is completed by `completeAsync(executor, notFullSignal)` using the same executor
  - Meanwhile, consumers call `getBatchAsync()`, which also uses the same executor for completion
  - If all threads are blocked in producers waiting for consumers, and consumers need the same threads to complete, a deadlock occurs
Code References
In HudiSplitSource constructor:
- Line 89: `this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor);`
- Line 95: `new BoundedExecutor(executor, getSplitGeneratorParallelism(session))`

In AsyncQueue.offer():
- Lines 112-113: Returns `notFullSignal` when queue reaches target size

In AsyncQueue.completeAsync():
- Lines 238-241: `executor.execute(() -> future.set(null))`
- Uses same executor for signal completion

In AsyncQueue.borrowBatchAsync():
- Lines 194-205: Uses same executor for `Futures.transform()` operations

In HudiBackgroundSplitLoader.loadSplits():
- Lines 101-103: `Futures.transformAsync(future, ignored -> loadSplits(partitionQueue), splitGeneratorExecutor)`
- Creates recursive async chain using same executor
Deadlock Scenario
- Initial State: Queue reaches `targetQueueSize`, all subsequent `offer()` calls return pending futures
- Producer Threads: All split generator threads become blocked waiting for `notFullSignal` completion
- Consumer Threads: `getBatchAsync()` operations also need threads from the same pool to complete
- Thread Pool Exhaustion: All threads in the executor are blocked waiting for queue operations
- Deadlock: No threads available to complete the futures that would unblock the waiting threads
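
The scenario above can be reduced to a few lines of plain JDK code. The following is a minimal sketch, not Trino code: `SharedPoolStarvationDemo` and `producersFinish` are hypothetical names, `CompletableFuture` stands in for Guava's `ListenableFuture`, and it assumes that each producer blocks its pool thread while waiting for the not-full signal, as described in this report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; none of these names exist in Trino.
final class SharedPoolStarvationDemo
{
    private SharedPoolStarvationDemo() {}

    // Returns true if every producer finished before the timeout, false if they stayed stuck.
    static boolean producersFinish(int poolSize, long timeoutMillis)
    {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        // Stand-in for the queue's notFullSignal future.
        CompletableFuture<Void> notFullSignal = new CompletableFuture<>();
        try {
            List<CompletableFuture<Void>> producers = new ArrayList<>();
            for (int i = 0; i < poolSize; i++) {
                // Assumption: each producer blocks its pool thread until the signal fires.
                producers.add(CompletableFuture.runAsync(notFullSignal::join, executor));
            }
            // Stand-in for completeAsync(executor, notFullSignal): the completion task is
            // queued on the same pool, behind the producers that are waiting for it.
            executor.execute(() -> notFullSignal.complete(null));
            CompletableFuture.allOf(producers.toArray(new CompletableFuture[0]))
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        }
        catch (TimeoutException e) {
            return false;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            // Release the blocked threads so the pool can shut down and the JVM can exit.
            notFullSignal.complete(null);
            executor.shutdownNow();
        }
    }
}
```

With a pool of two threads, `SharedPoolStarvationDemo.producersFinish(2, 500)` returns `false`: both threads are parked in `join()`, so the task that would complete the signal never gets a thread.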
Reproduction Steps
- Query a large Hudi table with many partitions
- Configure a small `maxOutstandingSplits` value to trigger queue full conditions quickly
- Monitor thread dumps during split generation
- Observe all threads blocked in queue operations with no progress

Note: This is always reproducible, and the affected queries hang indefinitely.
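
For the thread-dump step, a small in-process helper can count waiting threads by name prefix. This is only a sketch: `BlockedThreadCounter` is a hypothetical name, it uses the standard `ThreadMXBean` API, and it has to run inside the JVM being inspected. For a running Trino worker, a `jstack` dump gives the same information. Idle pool threads also show up as `WAITING`, so the stack frames still need to be read to confirm the threads are parked in queue operations.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Arrays;

// Hypothetical helper; not part of Trino.
final class BlockedThreadCounter
{
    private BlockedThreadCounter() {}

    // Counts live threads whose name starts with namePrefix and that are currently
    // in the WAITING or TIMED_WAITING state.
    static long countWaiting(String namePrefix)
    {
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
        return Arrays.stream(infos)
                .filter(info -> info.getThreadName().startsWith(namePrefix))
                .filter(info -> info.getThreadState() == Thread.State.WAITING
                        || info.getThreadState() == Thread.State.TIMED_WAITING)
                .count();
    }
}
```

Called as `BlockedThreadCounter.countWaiting("hudi-split-manager")`, a count that stays equal to the pool size while no splits are delivered is consistent with the deadlock described here.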
Expected Behavior
Producers and consumers should use separate thread pools or the queue implementation should be designed to avoid circular dependencies between offer and getBatch operations.
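
The first option, separate thread pools, can be sketched with plain JDK classes. This is an illustration under assumptions rather than a patch: `SeparateSignalPoolDemo` and `producersFinish` are hypothetical names, `CompletableFuture` stands in for Guava's `ListenableFuture`, and producers are assumed to block while waiting for the not-full signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; none of these names exist in Trino.
final class SeparateSignalPoolDemo
{
    private SeparateSignalPoolDemo() {}

    // Returns true if every producer finished before the timeout.
    static boolean producersFinish(int producerThreads, long timeoutMillis)
    {
        ExecutorService producerPool = Executors.newFixedThreadPool(producerThreads);
        // Dedicated pool whose only job is completing queue signals.
        ExecutorService signalPool = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> notFullSignal = new CompletableFuture<>();
        try {
            List<CompletableFuture<Void>> producers = new ArrayList<>();
            for (int i = 0; i < producerThreads; i++) {
                // Producers still block, but only threads of producerPool are tied up.
                producers.add(CompletableFuture.runAsync(notFullSignal::join, producerPool));
            }
            // The completion runs on signalPool, so it cannot be starved by the producers.
            signalPool.execute(() -> notFullSignal.complete(null));
            CompletableFuture.allOf(producers.toArray(new CompletableFuture[0]))
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        }
        catch (TimeoutException e) {
            return false;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            notFullSignal.complete(null);
            producerPool.shutdownNow();
            signalPool.shutdownNow();
        }
    }
}
```

Because signal completion never competes with producers for a thread, the producers are released even when the producer pool is fully occupied. Whether the real fix should add a second executor or make the producers non-blocking is a design decision for the connector maintainers.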
Environment
- Trino Version: 455 (Checked with master too)
- Connector: Hudi