Skip to content

Commit 41d7165

Browse files
committed
GH-3471: Async HTTP
1 parent 33d2fa0 commit 41d7165

File tree

8 files changed

+273
-109
lines changed

8 files changed

+273
-109
lines changed

jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@ public HttpException(int statusCode) {
3434
}
3535

3636
public HttpException(int statusCode, String statusLine) {
37-
super(exMessage(statusCode, statusLine));
38-
this.statusCode = statusCode;
39-
this.statusLine = statusLine ;
40-
this.response = null;
37+
this(statusCode, statusLine, null, null);
4138
}
4239

4340
public HttpException(int statusCode, String statusLine, String responseMessage) {
44-
super(exMessage(statusCode, statusLine));
41+
this(statusCode, statusLine, responseMessage, null);
42+
}
43+
44+
public HttpException(int statusCode, String statusLine, String responseMessage, Throwable cause) {
45+
super(exMessage(statusCode, statusLine), cause);
4546
this.statusCode = statusCode;
4647
this.statusLine = statusLine ;
4748
this.response = responseMessage;

jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,26 +160,50 @@ public static void syncOrElseThrow(CompletableFuture<Void> cf) {
160160
* This operation extracts RuntimeException from the {@code CompletableFuture}.
161161
*/
162162
public static <T> T getOrElseThrow(CompletableFuture<T> cf) {
163+
return getOrElseThrow(cf, null);
164+
}
165+
166+
/**
167+
* Get the value of a {@link CompletableFuture} that executes of an HTTP request.
168+
* In case on any error, an {@link HttpException} is thrown.
169+
*
170+
* @param <T> The type of the value being computed.
171+
* @param cf The completable future.
172+
* @param httpRequest An optional HttpRequest for improving feedback in case of exceptions.
173+
* @return The value computed by the completable future.
174+
*/
175+
public static <T> T getOrElseThrow(CompletableFuture<T> cf, HttpRequest httpRequest) {
163176
Objects.requireNonNull(cf);
164177
try {
165178
return cf.join();
166179
//} catch (CancellationException ex1) { // Let this pass out.
167180
} catch (CompletionException ex) {
168-
if ( ex.getCause() != null ) {
169-
Throwable cause = ex.getCause();
170-
if ( cause instanceof RuntimeException )
171-
throw (RuntimeException)cause;
181+
Throwable cause = ex.getCause();
182+
if ( cause != null ) {
183+
184+
// Pass on our own HttpException instances such as 401 Unauthorized.
185+
if ( cause instanceof HttpException httpEx ) {
186+
throw new HttpException(httpEx.getStatusCode(), httpEx.getStatusLine(), httpEx.getResponse(), cause);
187+
}
188+
189+
final String msg = cause.getMessage();
190+
172191
if ( cause instanceof IOException ) {
173-
IOException iox = (IOException)cause;
174192
// Rather than an HTTP exception, bad authentication becomes IOException("too many authentication attempts");
175-
if ( iox.getMessage().contains("too many authentication attempts") ||
176-
iox.getMessage().contains("No credentials provided") ) {
177-
throw new HttpException(401, HttpSC.getMessage(401));
193+
if ( msg != null &&
194+
( msg.contains("too many authentication attempts") ||
195+
msg.contains("No credentials provided") ) ) {
196+
throw new HttpException(401, HttpSC.getMessage(401), null, cause);
197+
}
198+
if (httpRequest != null) {
199+
throw new HttpException(httpRequest.method()+" "+httpRequest.uri().toString(), cause);
178200
}
179-
IO.exception((IOException)cause);
180201
}
202+
203+
throw new HttpException(msg, cause);
181204
}
182-
throw ex;
205+
// Note: CompletionException without cause should never happen.
206+
throw new HttpException(ex);
183207
}
184208
}
185209

jena-arq/src/main/java/org/apache/jena/http/HttpLib.java

Lines changed: 62 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.time.Duration;
3838
import java.util.*;
3939
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.CompletionException;
4041
import java.util.concurrent.TimeUnit;
4142
import java.util.function.Consumer;
4243
import java.util.function.Function;
@@ -560,7 +561,7 @@ public static Builder contentTypeHeader(Builder builder, String contentType) {
560561
* @return HttpResponse
561562
*/
562563
public static HttpResponse<InputStream> execute(HttpClient httpClient, HttpRequest httpRequest) {
563-
return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
564+
return AsyncHttpRDF.getOrElseThrow(executeAsync(httpClient, httpRequest, BodyHandlers.ofInputStream()), httpRequest);
564565
}
565566

566567
/**
@@ -580,13 +581,35 @@ public static HttpResponse<InputStream> execute(HttpClient httpClient, HttpReque
580581
*
581582
* @param httpClient
582583
* @param httpRequest
583-
* @param bodyHandler
584584
* @return HttpResponse
585585
*/
586-
/*package*/ static <X> HttpResponse<X> execute(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<X> bodyHandler) {
586+
public static CompletableFuture<HttpResponse<InputStream>> executeAsync(HttpClient httpClient, HttpRequest httpRequest) {
587+
return executeAsync(httpClient, httpRequest, BodyHandlers.ofInputStream());
588+
}
589+
590+
/**
591+
* Execute a request, return a {@code HttpResponse<X>} which
592+
* can be passed to {@link #handleHttpStatusCode(HttpResponse)} which will
593+
* convert non-2xx status code to {@link HttpException HttpExceptions}.
594+
* <p>
595+
* This function applies the HTTP authentication challenge support
596+
* and will repeat the request if necessary with added authentication.
597+
* <p>
598+
* See {@link AuthEnv} for authentication registration.
599+
* <br/>
600+
* See {@link #executeJDK} to execute exactly once without challenge response handling.
601+
*
602+
* @see AuthEnv AuthEnv for authentic registration
603+
* @see #executeJDK executeJDK to execute exacly once.
604+
*
605+
* @param httpClient
606+
* @param httpRequest
607+
* @param bodyHandler
608+
* @return HttpResponse
609+
*/ /*package*/ static <X> CompletableFuture<HttpResponse<X>> executeAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<X> bodyHandler) {
587610
// To run with no jena-supplied authentication handling.
588611
if ( false )
589-
return executeJDK(httpClient, httpRequest, bodyHandler);
612+
return executeJDKAsync(httpClient, httpRequest, bodyHandler);
590613
URI uri = httpRequest.uri();
591614
URI key = null;
592615

@@ -602,29 +625,16 @@ public static HttpResponse<InputStream> execute(HttpClient httpClient, HttpReque
602625
authEnv.registerUsernamePassword(key, userpasswd[0], userpasswd[1]);
603626
}
604627
}
605-
try {
606-
return AuthLib.authExecute(httpClient, httpRequest, bodyHandler);
607-
} finally {
608-
if ( key != null )
609-
// The AuthEnv is "per tenant".
610-
// Temporary registration within the AuthEnv of the
611-
// user:password is acceptable.
612-
authEnv.unregisterUsernamePassword(key);
613-
}
614-
}
615628

616-
/**
617-
* Execute request and return a {@code HttpResponse<InputStream>} response.
618-
* Status codes have not been handled. The response can be passed to
619-
* {@link #handleResponseInputStream(HttpResponse)} which will convert non-2xx
620-
* status code to {@link HttpException HttpExceptions}.
621-
*
622-
* @param httpClient
623-
* @param httpRequest
624-
* @return HttpResponse
625-
*/
626-
public static HttpResponse<InputStream> executeJDK(HttpClient httpClient, HttpRequest httpRequest) {
627-
return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
629+
URI finalKey = key;
630+
return AuthLib.authExecuteAsync(httpClient, httpRequest, bodyHandler)
631+
.whenComplete((httpResponse, throwable) -> {
632+
if ( finalKey != null )
633+
// The AuthEnv is "per tenant".
634+
// Temporary registration within the AuthEnv of the
635+
// user:password is acceptable.
636+
authEnv.unregisterUsernamePassword(finalKey);
637+
});
628638
}
629639

630640
/**
@@ -640,25 +650,32 @@ public static HttpResponse<InputStream> executeJDK(HttpClient httpClient, HttpRe
640650
* @return HttpResponse
641651
*/
642652
public static <T> HttpResponse<T> executeJDK(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
643-
try {
644-
// This is the one place all HTTP requests go through.
645-
logRequest(httpRequest);
646-
HttpResponse<T> httpResponse = httpClient.send(httpRequest, bodyHandler);
647-
logResponse(httpResponse);
648-
return httpResponse;
649-
//} catch (HttpTimeoutException ex) {
650-
} catch (IOException | InterruptedException ex) {
651-
if ( ex.getMessage() != null ) {
652-
// This is silly.
653-
// Rather than an HTTP exception, bad authentication becomes IOException("too many authentication attempts");
654-
// or IOException("No credentials provided") if the authenticator decides to return null.
655-
if ( ex.getMessage().contains("too many authentication attempts") ||
656-
ex.getMessage().contains("No credentials provided") ) {
657-
throw new HttpException(401, HttpSC.getMessage(401));
658-
}
659-
}
660-
throw new HttpException(httpRequest.method()+" "+httpRequest.uri().toString(), ex);
661-
}
653+
return AsyncHttpRDF.getOrElseThrow(executeJDKAsync(httpClient, httpRequest, bodyHandler), httpRequest);
654+
}
655+
656+
/**
657+
* Execute request and return a {@code HttpResponse<InputStream>} response.
658+
* Status codes have not been handled. The response can be passed to
659+
* {@link #handleResponseInputStream(HttpResponse)} which will convert non-2xx
660+
* status code to {@link HttpException HttpExceptions}.
661+
*
662+
* @param httpClient
663+
* @param httpRequest
664+
* @return HttpResponse
665+
*/
666+
public static CompletableFuture<HttpResponse<InputStream>> executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest) {
667+
return executeAsync(httpClient, httpRequest, BodyHandlers.ofInputStream());
668+
}
669+
670+
public static <T> CompletableFuture<HttpResponse<T>> executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
671+
// This is the one place all HTTP requests go through.
672+
logRequest(httpRequest);
673+
CompletableFuture<HttpResponse<T>> future = httpClient.sendAsync(httpRequest, bodyHandler)
674+
.thenApply(httpResponse -> {
675+
logResponse(httpResponse);
676+
return httpResponse;
677+
});
678+
return future;
662679
}
663680

664681
/*package*/ static CompletableFuture<HttpResponse<InputStream>> asyncExecute(HttpClient httpClient, HttpRequest httpRequest) {

jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@
2929
import java.nio.charset.StandardCharsets;
3030
import java.util.Base64;
3131
import java.util.List;
32+
import java.util.concurrent.CompletableFuture;
3233

3334
import org.apache.jena.atlas.lib.Bytes;
3435
import org.apache.jena.atlas.web.AuthScheme;
3536
import org.apache.jena.atlas.web.HttpException;
37+
import org.apache.jena.http.AsyncHttpRDF;
3638
import org.apache.jena.http.HttpLib;
3739
import org.apache.jena.riot.web.HttpNames;
3840
import org.apache.jena.web.HttpSC;
3941

4042
public class AuthLib {
4143
/**
42-
* Call {@link HttpClient#send} after applying an active {@link AuthRequestModifier}
44+
* Call the {@link HttpClient} after applying an active {@link AuthRequestModifier}
4345
* to modify the {@link java.net.http.HttpRequest.Builder}.
4446
* If no {@link AuthRequestModifier} is available and if a 401 response is received,
4547
* setup a {@link AuthRequestModifier} passed on registered username and password information.
@@ -51,24 +53,41 @@ public class AuthLib {
5153
* @return HttpResponse&lt;T&gt;
5254
*/
5355
public static <T> HttpResponse<T> authExecute(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
54-
HttpResponse<T> httpResponse = HttpLib.executeJDK(httpClient, httpRequest, bodyHandler);
56+
return AsyncHttpRDF.getOrElseThrow(authExecuteAsync(httpClient, httpRequest, bodyHandler), httpRequest);
57+
}
5558

56-
// -- 401 handling.
57-
if ( httpResponse.statusCode() != 401 )
58-
return httpResponse;
59-
HttpResponse<T> httpResponse2 = handle401(httpClient, httpRequest, bodyHandler, httpResponse);
60-
return httpResponse2;
59+
/**
60+
* Call {@link HttpClient#sendAsync} after applying an active {@link AuthRequestModifier}
61+
* to modify the {@link java.net.http.HttpRequest.Builder}.
62+
* If no {@link AuthRequestModifier} is available and if a 401 response is received,
63+
* setup a {@link AuthRequestModifier} passed on registered username and password information.
64+
* This function supports basic and digest authentication.
65+
*
66+
* @param httpClient HttpClient
67+
* @param httpRequest
68+
* @param bodyHandler
69+
* @return CompletableFuture&lt;HttpResponse&lt;T&gt;&gt;
70+
*/
71+
public static <T> CompletableFuture<HttpResponse<T>> authExecuteAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
72+
return HttpLib.executeJDKAsync(httpClient, httpRequest, bodyHandler)
73+
.thenCompose(httpResponse -> {
74+
// -- 401 handling.
75+
if ( httpResponse.statusCode() != 401 )
76+
return CompletableFuture.completedFuture(httpResponse);
77+
CompletableFuture<HttpResponse<T>> httpResponse2 = handle401Async(httpClient, httpRequest, bodyHandler, httpResponse);
78+
return httpResponse2;
79+
});
6180
}
6281

6382
/* Handle a 401 (authentication challenge). */
64-
private static <T> HttpResponse<T> handle401(HttpClient httpClient,
83+
private static <T> CompletableFuture<HttpResponse<T>> handle401Async(HttpClient httpClient,
6584
HttpRequest request,
6685
BodyHandler<T> bodyHandler,
6786
HttpResponse<T> httpResponse401) {
6887
AuthChallenge aHeader = wwwAuthenticateHeader(httpResponse401);
6988
if ( aHeader == null )
7089
// No valid header - simply return the original response.
71-
return httpResponse401;
90+
return CompletableFuture.completedFuture(httpResponse401);
7291

7392
// Currently on a URI endpoint-by-endpoint basis.
7493
// String realm = aHeader.getRealm();
@@ -102,14 +121,14 @@ private static <T> HttpResponse<T> handle401(HttpClient httpClient,
102121
}
103122
case UNKNOWN :
104123
// Not handled. Pass back the 401.
105-
return httpResponse401;
124+
return CompletableFuture.completedFuture(httpResponse401);
106125
default:
107126
throw new HttpException("Not an authentication scheme -- "+aHeader.authScheme);
108127
}
109128

110129
// Failed to generate a request modifier for a retry.
111130
if ( authRequestModifier == null)
112-
return httpResponse401;
131+
return CompletableFuture.completedFuture(httpResponse401);
113132

114133
// ---- Register for next time the app calls this URI.
115134
AuthEnv.get().registerAuthModifier(request.uri().toString(), authRequestModifier);
@@ -119,7 +138,7 @@ private static <T> HttpResponse<T> handle401(HttpClient httpClient,
119138
request2builder = authRequestModifier.addAuth(request2builder);
120139

121140
HttpRequest httpRequest2 = request2builder.build();
122-
HttpResponse<T> httpResponse2 = HttpLib.executeJDK(httpClient, httpRequest2, bodyHandler);
141+
CompletableFuture<HttpResponse<T>> httpResponse2 = HttpLib.executeJDKAsync(httpClient, httpRequest2, bodyHandler);
123142
// Pass back to application regardless of response code.
124143
return httpResponse2;
125144
}

0 commit comments

Comments
 (0)