Skip to content

Commit 80a0e2d

Browse files
authored
Instrument rejected tasks in ThreadPoolExecutor (#4272)
1 parent 90fa922 commit 80a0e2d

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

metrics-core/src/main/java/com/codahale/metrics/InstrumentedExecutorService.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.ExecutorService;
1010
import java.util.concurrent.ForkJoinPool;
1111
import java.util.concurrent.Future;
12+
import java.util.concurrent.RejectedExecutionHandler;
1213
import java.util.concurrent.ThreadPoolExecutor;
1314
import java.util.concurrent.TimeUnit;
1415
import java.util.concurrent.TimeoutException;
@@ -30,6 +31,7 @@ public class InstrumentedExecutorService implements ExecutorService {
3031
private final Meter submitted;
3132
private final Counter running;
3233
private final Meter completed;
34+
private final Counter rejected;
3335
private final Timer idle;
3436
private final Timer duration;
3537

@@ -57,6 +59,7 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
5759
this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
5860
this.running = registry.counter(MetricRegistry.name(name, "running"));
5961
this.completed = registry.meter(MetricRegistry.name(name, "completed"));
62+
this.rejected = registry.counter(MetricRegistry.name(name, "rejected"));
6063
this.idle = registry.timer(MetricRegistry.name(name, "idle"));
6164
this.duration = registry.timer(MetricRegistry.name(name, "duration"));
6265

@@ -81,6 +84,8 @@ private void registerInternalMetrics() {
8184
queue::size);
8285
registry.registerGauge(MetricRegistry.name(name, "tasks.capacity"),
8386
queue::remainingCapacity);
87+
RejectedExecutionHandler delegateHandler = executor.getRejectedExecutionHandler();
88+
executor.setRejectedExecutionHandler(new InstrumentedRejectedExecutionHandler(delegateHandler));
8489
} else if (delegate instanceof ForkJoinPool) {
8590
ForkJoinPool forkJoinPool = (ForkJoinPool) delegate;
8691
registry.registerGauge(MetricRegistry.name(name, "tasks.stolen"),
@@ -223,6 +228,20 @@ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedExc
223228
return delegate.awaitTermination(l, timeUnit);
224229
}
225230

231+
private class InstrumentedRejectedExecutionHandler implements RejectedExecutionHandler {
232+
private final RejectedExecutionHandler delegateHandler;
233+
234+
public InstrumentedRejectedExecutionHandler(RejectedExecutionHandler delegateHandler) {
235+
this.delegateHandler = delegateHandler;
236+
}
237+
238+
@Override
239+
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
240+
rejected.inc();
241+
this.delegateHandler.rejectedExecution(r, executor);
242+
}
243+
}
244+
226245
private class InstrumentedRunnable implements Runnable {
227246
private final Runnable task;
228247
private final Timer.Context idleContext;

metrics-core/src/test/java/com/codahale/metrics/InstrumentedExecutorServiceTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,17 @@
88

99
import java.time.Duration;
1010
import java.util.concurrent.Callable;
11+
import java.util.concurrent.CountDownLatch;
1112
import java.util.concurrent.ExecutorService;
1213
import java.util.concurrent.Executors;
1314
import java.util.concurrent.Future;
1415
import java.util.concurrent.LinkedBlockingQueue;
16+
import java.util.concurrent.RejectedExecutionException;
1517
import java.util.concurrent.ThreadPoolExecutor;
1618
import java.util.concurrent.TimeUnit;
1719

1820
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1922

2023
public class InstrumentedExecutorServiceTest {
2124

@@ -166,6 +169,32 @@ public void reportsTasksInformationForThreadPoolExecutor() throws Exception {
166169
assertThat(poolSize.getValue()).isEqualTo(1);
167170
}
168171

172+
@Test
173+
public void reportsRejectedTasksForThreadPoolExecutor() throws Exception {
174+
executor = new ThreadPoolExecutor(1, 1,
175+
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
176+
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "tp");
177+
final Counter rejected = registry.counter("tp.rejected");
178+
assertThat(rejected.getCount()).isEqualTo(0);
179+
180+
final CountDownLatch latch = new CountDownLatch(1);
181+
182+
Runnable runnable = () -> {
183+
try {
184+
latch.await();
185+
} catch (InterruptedException e) {
186+
throw new RuntimeException(e);
187+
}
188+
};
189+
190+
Future<?> executingFuture = instrumentedExecutorService.submit(runnable);
191+
Future<?> queuedFuture = instrumentedExecutorService.submit(runnable);
192+
assertThatThrownBy(() -> instrumentedExecutorService.submit(runnable))
193+
.isInstanceOf(RejectedExecutionException.class);
194+
latch.countDown();
195+
assertThat(rejected.getCount()).isEqualTo(1);
196+
}
197+
169198
@Test
170199
public void removesMetricsAfterShutdownForThreadPoolExecutor() {
171200
executor = new ThreadPoolExecutor(4, 16,

0 commit comments

Comments
 (0)