Skip to content

Commit d0afa8c

Browse files
authored
Fix java http client request cancellation (#14747)
1 parent 38c2b69 commit d0afa8c

File tree

5 files changed

+127
-12
lines changed

5 files changed

+127
-12
lines changed

instrumentation/java-http-client/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/javahttpclient/HttpClientInstrumentation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public static AsyncAdviceScope start(HttpRequest request) {
156156
}
157157

158158
public CompletableFuture<HttpResponse<?>> end(
159-
@Nullable Throwable throwable, @Nullable CompletableFuture<HttpResponse<?>> future) {
159+
@Nullable Throwable throwable, CompletableFuture<HttpResponse<?>> future) {
160160
if (callDepth.decrementAndGet() > 0 || scope == null) {
161161
// async end nested call
162162
return future;
@@ -167,8 +167,8 @@ public CompletableFuture<HttpResponse<?>> end(
167167
instrumenter().end(context, request, null, throwable);
168168
return future;
169169
}
170-
future = future.whenComplete(new ResponseConsumer(instrumenter(), context, request));
171-
return CompletableFutureWrapper.wrap(future, parentContext);
170+
return CompletableFutureWrapper.wrap(future, parentContext)
171+
.whenComplete(new ResponseConsumer(instrumenter(), context, request));
172172
}
173173
}
174174

@@ -182,7 +182,7 @@ public static AsyncAdviceScope methodEnter(
182182
@AssignReturned.ToReturned
183183
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
184184
public static CompletableFuture<HttpResponse<?>> methodExit(
185-
@Advice.Return @Nullable CompletableFuture<HttpResponse<?>> future,
185+
@Advice.Return CompletableFuture<HttpResponse<?>> future,
186186
@Advice.Thrown @Nullable Throwable throwable,
187187
@Advice.Enter @Nullable AsyncAdviceScope scope) {
188188
return scope == null ? future : scope.end(throwable, future);

instrumentation/java-http-client/library/src/main/java/io/opentelemetry/instrumentation/javahttpclient/internal/CompletableFutureWrapper.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
1414
* any time.
1515
*/
16-
public final class CompletableFutureWrapper {
16+
public final class CompletableFutureWrapper<T> extends CompletableFuture<T> {
17+
private final CompletableFuture<?> future;
1718

18-
private CompletableFutureWrapper() {}
19+
private CompletableFutureWrapper(CompletableFuture<?> future) {
20+
this.future = future;
21+
}
1922

2023
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
21-
CompletableFuture<T> result = new CompletableFuture<>();
24+
CompletableFuture<T> result = new CompletableFutureWrapper<>(future);
2225
future.whenComplete(
2326
(T value, Throwable throwable) -> {
2427
try (Scope ignored = context.makeCurrent()) {
@@ -32,4 +35,16 @@ public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context
3235

3336
return result;
3437
}
38+
39+
@Override
40+
public <U> CompletableFuture<U> newIncompleteFuture() {
41+
return new CompletableFutureWrapper<>(future);
42+
}
43+
44+
@Override
45+
public boolean cancel(boolean mayInterruptIfRunning) {
46+
boolean result = super.cancel(mayInterruptIfRunning);
47+
future.cancel(mayInterruptIfRunning);
48+
return result;
49+
}
3550
}

instrumentation/java-http-client/library/src/main/java/io/opentelemetry/instrumentation/javahttpclient/internal/OpenTelemetryHttpClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,9 @@ private <T> CompletableFuture<HttpResponse<T>> traceAsync(
142142
instrumenter.end(context, request, null, t);
143143
throw t;
144144
}
145-
future = future.whenComplete(new ResponseConsumer(instrumenter, context, request));
146-
future = CompletableFutureWrapper.wrap(future, parentContext);
145+
future =
146+
CompletableFutureWrapper.wrap(future, parentContext)
147+
.whenComplete(new ResponseConsumer(instrumenter, context, request));
147148
return future;
148149
}
149150
}

instrumentation/java-http-client/testing/src/main/java/io/opentelemetry/instrumentation/javahttpclient/AbstractJavaHttpClientTest.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,22 @@
55

66
package io.opentelemetry.instrumentation.javahttpclient;
77

8+
import static io.opentelemetry.api.common.AttributeKey.stringKey;
9+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
810
import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION;
11+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
912

1013
import io.opentelemetry.api.common.AttributeKey;
14+
import io.opentelemetry.api.trace.Span;
15+
import io.opentelemetry.api.trace.SpanKind;
1116
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
1217
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
1318
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
19+
import io.opentelemetry.sdk.trace.data.StatusData;
20+
import io.opentelemetry.semconv.ErrorAttributes;
21+
import io.opentelemetry.semconv.HttpAttributes;
22+
import io.opentelemetry.semconv.ServerAttributes;
23+
import io.opentelemetry.semconv.UrlAttributes;
1424
import java.io.IOException;
1525
import java.net.URI;
1626
import java.net.http.HttpClient;
@@ -19,7 +29,11 @@
1929
import java.util.HashSet;
2030
import java.util.Map;
2131
import java.util.Set;
32+
import java.util.concurrent.CancellationException;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.TimeUnit;
2235
import org.junit.jupiter.api.BeforeAll;
36+
import org.junit.jupiter.api.Test;
2337

2438
public abstract class AbstractJavaHttpClientTest extends AbstractHttpClientTest<HttpRequest> {
2539

@@ -106,4 +120,76 @@ protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
106120
return attributes;
107121
});
108122
}
123+
124+
@SuppressWarnings("Interruption") // test calls CompletableFuture.cancel with true
125+
@Test
126+
void cancelRequest() throws InterruptedException {
127+
boolean isJdk11 = "11".equals(System.getProperty("java.specification.version"));
128+
String method = "GET";
129+
URI uri = resolveAddress("/long-request");
130+
131+
CompletableFuture<String> future =
132+
testing.runWithSpan(
133+
"parent",
134+
() -> {
135+
HttpRequest request =
136+
HttpRequest.newBuilder()
137+
.uri(uri)
138+
.method(method, HttpRequest.BodyPublishers.noBody())
139+
.header("delay", String.valueOf(TimeUnit.SECONDS.toMillis(5)))
140+
.build();
141+
return client
142+
.sendAsync(request, HttpResponse.BodyHandlers.ofString())
143+
.thenApply(HttpResponse::body)
144+
.whenComplete(
145+
(response, throwable) ->
146+
testing.runWithSpan(
147+
"child",
148+
() -> {
149+
if (throwable != null && throwable.getCause() != null) {
150+
Span.current()
151+
.setAttribute(
152+
"throwable", throwable.getCause().getClass().getName());
153+
}
154+
}))
155+
// this stage is only added to trigger the whenComplete stage when this stage gets
156+
// cancelled
157+
.exceptionally(ex -> "cancelled");
158+
});
159+
160+
// sleep a bit to let the request start
161+
Thread.sleep(1_000);
162+
future.cancel(true);
163+
assertThatThrownBy(future::get).isInstanceOf(CancellationException.class);
164+
165+
testing.waitAndAssertTraces(
166+
trace ->
167+
trace.hasSpansSatisfyingExactly(
168+
span -> span.hasName("parent").hasNoParent(),
169+
span ->
170+
span.hasName("GET")
171+
.hasKind(SpanKind.CLIENT)
172+
.hasParent(trace.getSpan(0))
173+
.hasStatus(StatusData.error())
174+
.hasAttributesSatisfyingExactly(
175+
equalTo(UrlAttributes.URL_FULL, uri.toString()),
176+
equalTo(ServerAttributes.SERVER_ADDRESS, uri.getHost()),
177+
equalTo(ServerAttributes.SERVER_PORT, uri.getPort()),
178+
equalTo(HttpAttributes.HTTP_REQUEST_METHOD, method),
179+
equalTo(
180+
ErrorAttributes.ERROR_TYPE, CancellationException.class.getName())),
181+
span ->
182+
span.hasName("test-http-server")
183+
.hasKind(SpanKind.SERVER)
184+
.hasParent(trace.getSpan(1))
185+
// jdk 11 does not cancel the request on the server side so the request
186+
// succeeds
187+
.hasStatus(isJdk11 ? StatusData.unset() : StatusData.error()),
188+
span ->
189+
span.hasName("child")
190+
.hasParent(trace.getSpan(0))
191+
.hasAttributesSatisfyingExactly(
192+
equalTo(
193+
stringKey("throwable"), CancellationException.class.getName()))));
194+
}
109195
}

testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/HttpClientTestServer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import static io.opentelemetry.testing.internal.armeria.common.MediaType.PLAIN_TEXT_UTF_8;
1212

1313
import io.opentelemetry.api.OpenTelemetry;
14+
import io.opentelemetry.api.trace.Span;
1415
import io.opentelemetry.api.trace.SpanBuilder;
16+
import io.opentelemetry.api.trace.StatusCode;
1517
import io.opentelemetry.api.trace.Tracer;
1618
import io.opentelemetry.context.Context;
1719
import io.opentelemetry.instrumentation.test.server.http.RequestContextGetter;
@@ -137,7 +139,7 @@ protected void configure(ServerBuilder sb) throws Exception {
137139
throw new AssertionError((Object) ("more than one " + field + " header present"));
138140
}
139141
}
140-
SpanBuilder span =
142+
SpanBuilder spanBuilder =
141143
tracer
142144
.spanBuilder("test-http-server")
143145
.setSpanKind(SERVER)
@@ -149,9 +151,20 @@ protected void configure(ServerBuilder sb) throws Exception {
149151

150152
String traceRequestId = req.headers().get("test-request-id");
151153
if (traceRequestId != null) {
152-
span.setAttribute("test.request.id", Integer.parseInt(traceRequestId));
154+
spanBuilder.setAttribute("test.request.id", Integer.parseInt(traceRequestId));
153155
}
154-
span.startSpan().end();
156+
Span span = spanBuilder.startSpan();
157+
ctx.log()
158+
.whenComplete()
159+
.thenAccept(
160+
log -> {
161+
Throwable error = log.responseCause();
162+
if (error != null) {
163+
span.recordException(error);
164+
span.setStatus(StatusCode.ERROR);
165+
}
166+
span.end();
167+
});
155168

156169
// this header is set by java http client http/2 tests
157170
// we delay the response a bit to ensure that client can send the full request before

0 commit comments

Comments
 (0)