Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions stub/src/main/java/io/grpc/stub/BlockingClientCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,13 @@ void sendSingleRequest(ReqT request) {
public void cancel(String message, Throwable cause) {
writeClosed = true;
call.cancel(message, cause);
// After canceling a BlockingClientCall, the caller may not interact with it further. That means
// the call executor, a ThreadSafeThreadlessExecutor, will not execute any more tasks. There may
// still be tasks submitted to the call executor, though, until the underlying call completes.
// Some of these tasks (e.g., server messages available) may leak native resources unless
// executed. So, we convert the executor to a "direct" executor in order to ensure all call
// tasks run.
executor.becomeDirect();
}

/**
Expand Down
28 changes: 26 additions & 2 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue<Ru

private final Lock waiterLock = new ReentrantLock();
private final Condition waiterCondition = waiterLock.newCondition();
private boolean direct;

// Non private to avoid synthetic class
ThreadSafeThreadlessExecutor() {}
Expand Down Expand Up @@ -950,6 +951,20 @@ void drain() {
}
}

/**
* Turn the executor into a "direct" executor. Further calls to {@link #execute(Runnable)} will
* executable the runnable immediately in the calling thread.
*/
void becomeDirect() {
waiterLock.lock();
try {
direct = true;
} finally {
waiterLock.unlock();
}
drain();
}

private void signalAll() {
waiterLock.lock();
try {
Expand All @@ -975,13 +990,22 @@ private static void throwIfInterrupted() throws InterruptedException {

@Override
public void execute(Runnable runnable) {
boolean added = false;
waiterLock.lock();
try {
add(runnable);
waiterCondition.signalAll(); // If anything is waiting let it wake up and process this task
if (!direct) {
added = true;
add(runnable);
// If anything is waiting, let it wake up and process this task.
waiterCondition.signalAll();
}
} finally {
waiterLock.unlock();
}
if (!added) {
runQuietly(runnable);
signalAll();
}
}
}

Expand Down
Loading