Skip to content

Commit 017f631

Browse files
committed
async http
1 parent cc16f85 commit 017f631

File tree

6 files changed

+224
-82
lines changed

6 files changed

+224
-82
lines changed

jena-arq/src/main/java/org/apache/jena/http/HttpLib.java

Lines changed: 81 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.time.Duration;
3838
import java.util.*;
3939
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.CompletionException;
4041
import java.util.concurrent.TimeUnit;
4142
import java.util.function.Consumer;
4243
import java.util.function.Function;
@@ -560,7 +561,7 @@ public static Builder contentTypeHeader(Builder builder, String contentType) {
560561
* @return HttpResponse
561562
*/
562563
public static HttpResponse<InputStream> execute(HttpClient httpClient, HttpRequest httpRequest) {
563-
return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
564+
return AsyncHttpRDF.getOrElseThrow(executeAsync(httpClient, httpRequest, BodyHandlers.ofInputStream()));
564565
}
565566

566567
/**
@@ -580,13 +581,35 @@ public static HttpResponse<InputStream> execute(HttpClient httpClient, HttpReque
580581
*
581582
* @param httpClient
582583
* @param httpRequest
583-
* @param bodyHandler
584584
* @return HttpResponse
585585
*/
586-
/*package*/ static <X> HttpResponse<X> execute(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<X> bodyHandler) {
586+
public static CompletableFuture<HttpResponse<InputStream>> executeAsync(HttpClient httpClient, HttpRequest httpRequest) {
587+
return executeAsync(httpClient, httpRequest, BodyHandlers.ofInputStream());
588+
}
589+
590+
/**
591+
* Execute a request, return a {@code HttpResponse<X>} which
592+
* can be passed to {@link #handleHttpStatusCode(HttpResponse)} which will
593+
* convert non-2xx status code to {@link HttpException HttpExceptions}.
594+
* <p>
595+
* This function applies the HTTP authentication challenge support
596+
* and will repeat the request if necessary with added authentication.
597+
* <p>
598+
* See {@link AuthEnv} for authentication registration.
599+
* <br/>
600+
* See {@link #executeJDK} to execute exactly once without challenge response handling.
601+
*
602+
* @see AuthEnv AuthEnv for authentic registration
603+
* @see #executeJDK executeJDK to execute exacly once.
604+
*
605+
* @param httpClient
606+
* @param httpRequest
607+
* @param bodyHandler
608+
* @return HttpResponse
609+
*/ /*package*/ static <X> CompletableFuture<HttpResponse<X>> executeAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<X> bodyHandler) {
587610
// To run with no jena-supplied authentication handling.
588611
if ( false )
589-
return executeJDK(httpClient, httpRequest, bodyHandler);
612+
return executeJDKAsync(httpClient, httpRequest, bodyHandler);
590613
URI uri = httpRequest.uri();
591614
URI key = null;
592615

@@ -602,29 +625,16 @@ public static HttpResponse<InputStream> execute(HttpClient httpClient, HttpReque
602625
authEnv.registerUsernamePassword(key, userpasswd[0], userpasswd[1]);
603626
}
604627
}
605-
try {
606-
return AuthLib.authExecute(httpClient, httpRequest, bodyHandler);
607-
} finally {
608-
if ( key != null )
609-
// The AuthEnv is "per tenant".
610-
// Temporary registration within the AuthEnv of the
611-
// user:password is acceptable.
612-
authEnv.unregisterUsernamePassword(key);
613-
}
614-
}
615628

616-
/**
617-
* Execute request and return a {@code HttpResponse<InputStream>} response.
618-
* Status codes have not been handled. The response can be passed to
619-
* {@link #handleResponseInputStream(HttpResponse)} which will convert non-2xx
620-
* status code to {@link HttpException HttpExceptions}.
621-
*
622-
* @param httpClient
623-
* @param httpRequest
624-
* @return HttpResponse
625-
*/
626-
public static HttpResponse<InputStream> executeJDK(HttpClient httpClient, HttpRequest httpRequest) {
627-
return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
629+
URI finalKey = key;
630+
return AuthLib.authExecuteAsync(httpClient, httpRequest, bodyHandler)
631+
.whenComplete((httpResponse, throwable) -> {
632+
if ( finalKey != null )
633+
// The AuthEnv is "per tenant".
634+
// Temporary registration within the AuthEnv of the
635+
// user:password is acceptable.
636+
authEnv.unregisterUsernamePassword(finalKey);
637+
});
628638
}
629639

630640
/**
@@ -640,25 +650,52 @@ public static HttpResponse<InputStream> executeJDK(HttpClient httpClient, HttpRe
640650
* @return HttpResponse
641651
*/
642652
public static <T> HttpResponse<T> executeJDK(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
643-
try {
644-
// This is the one place all HTTP requests go through.
645-
logRequest(httpRequest);
646-
HttpResponse<T> httpResponse = httpClient.send(httpRequest, bodyHandler);
647-
logResponse(httpResponse);
648-
return httpResponse;
649-
//} catch (HttpTimeoutException ex) {
650-
} catch (IOException | InterruptedException ex) {
651-
if ( ex.getMessage() != null ) {
652-
// This is silly.
653-
// Rather than an HTTP exception, bad authentication becomes IOException("too many authentication attempts");
654-
// or IOException("No credentials provided") if the authenticator decides to return null.
655-
if ( ex.getMessage().contains("too many authentication attempts") ||
656-
ex.getMessage().contains("No credentials provided") ) {
657-
throw new HttpException(401, HttpSC.getMessage(401));
653+
return AsyncHttpRDF.getOrElseThrow(executeJDKAsync(httpClient, httpRequest, bodyHandler));
654+
}
655+
656+
/**
657+
* Execute request and return a {@code HttpResponse<InputStream>} response.
658+
* Status codes have not been handled. The response can be passed to
659+
* {@link #handleResponseInputStream(HttpResponse)} which will convert non-2xx
660+
* status code to {@link HttpException HttpExceptions}.
661+
*
662+
* @param httpClient
663+
* @param httpRequest
664+
* @return HttpResponse
665+
*/
666+
public static CompletableFuture<HttpResponse<InputStream>> executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest) {
667+
return executeAsync(httpClient, httpRequest, BodyHandlers.ofInputStream());
668+
}
669+
670+
public static <T> CompletableFuture<HttpResponse<T>> executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
671+
// This is the one place all HTTP requests go through.
672+
logRequest(httpRequest);
673+
CompletableFuture<HttpResponse<T>> future = httpClient.sendAsync(httpRequest, bodyHandler)
674+
.thenApply(httpResponse -> {
675+
logResponse(httpResponse);
676+
return httpResponse;
677+
})
678+
.exceptionally(ex -> {
679+
Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
680+
if (cause instanceof IOException) {
681+
if ( ex.getMessage() != null ) {
682+
// This is silly.
683+
// Rather than an HTTP exception, bad authentication becomes IOException("too many authentication attempts");
684+
// or IOException("No credentials provided") if the authenticator decides to return null.
685+
if ( ex.getMessage().contains("too many authentication attempts") ||
686+
ex.getMessage().contains("No credentials provided") ) {
687+
throw new HttpException(401, HttpSC.getMessage(401));
688+
}
689+
}
690+
// Note: Can't reuse AsyncHttpRDF.handleRuntimeException because of this HttpException.
691+
throw new HttpException(httpRequest.method()+" "+httpRequest.uri().toString(), cause);
692+
} else if (cause instanceof RuntimeException re) {
693+
throw re;
694+
} else {
695+
throw new RuntimeException(cause);
658696
}
659-
}
660-
throw new HttpException(httpRequest.method()+" "+httpRequest.uri().toString(), ex);
661-
}
697+
});
698+
return future;
662699
}
663700

664701
/*package*/ static CompletableFuture<HttpResponse<InputStream>> asyncExecute(HttpClient httpClient, HttpRequest httpRequest) {

jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@
2929
import java.nio.charset.StandardCharsets;
3030
import java.util.Base64;
3131
import java.util.List;
32+
import java.util.concurrent.CompletableFuture;
3233

3334
import org.apache.jena.atlas.lib.Bytes;
3435
import org.apache.jena.atlas.web.AuthScheme;
3536
import org.apache.jena.atlas.web.HttpException;
37+
import org.apache.jena.http.AsyncHttpRDF;
3638
import org.apache.jena.http.HttpLib;
3739
import org.apache.jena.riot.web.HttpNames;
3840
import org.apache.jena.web.HttpSC;
3941

4042
public class AuthLib {
4143
/**
42-
* Call {@link HttpClient#send} after applying an active {@link AuthRequestModifier}
44+
* Call {@link HttpClient} after applying an active {@link AuthRequestModifier}
4345
* to modify the {@link java.net.http.HttpRequest.Builder}.
4446
* If no {@link AuthRequestModifier} is available and if a 401 response is received,
4547
* setup a {@link AuthRequestModifier} passed on registered username and password information.
@@ -51,24 +53,41 @@ public class AuthLib {
5153
* @return HttpResponse&lt;T&gt;
5254
*/
5355
public static <T> HttpResponse<T> authExecute(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
54-
HttpResponse<T> httpResponse = HttpLib.executeJDK(httpClient, httpRequest, bodyHandler);
56+
return AsyncHttpRDF.getOrElseThrow(authExecuteAsync(httpClient, httpRequest, bodyHandler));
57+
}
5558

56-
// -- 401 handling.
57-
if ( httpResponse.statusCode() != 401 )
58-
return httpResponse;
59-
HttpResponse<T> httpResponse2 = handle401(httpClient, httpRequest, bodyHandler, httpResponse);
60-
return httpResponse2;
59+
/**
60+
* Call {@link HttpClient#sendAsync} after applying an active {@link AuthRequestModifier}
61+
* to modify the {@link java.net.http.HttpRequest.Builder}.
62+
* If no {@link AuthRequestModifier} is available and if a 401 response is received,
63+
* setup a {@link AuthRequestModifier} passed on registered username and password information.
64+
* This function supports basic and digest authentication.
65+
*
66+
* @param httpClient HttpClient
67+
* @param httpRequest
68+
* @param bodyHandler
69+
* @return CompletableFuture&lt;HttpResponse&lt;T&gt;&gt;
70+
*/
71+
public static <T> CompletableFuture<HttpResponse<T>> authExecuteAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
72+
return HttpLib.executeJDKAsync(httpClient, httpRequest, bodyHandler)
73+
.thenCompose(httpResponse -> {
74+
// -- 401 handling.
75+
if ( httpResponse.statusCode() != 401 )
76+
return CompletableFuture.completedFuture(httpResponse);
77+
CompletableFuture<HttpResponse<T>> httpResponse2 = handle401Async(httpClient, httpRequest, bodyHandler, httpResponse);
78+
return httpResponse2;
79+
});
6180
}
6281

6382
/* Handle a 401 (authentication challenge). */
64-
private static <T> HttpResponse<T> handle401(HttpClient httpClient,
83+
private static <T> CompletableFuture<HttpResponse<T>> handle401Async(HttpClient httpClient,
6584
HttpRequest request,
6685
BodyHandler<T> bodyHandler,
6786
HttpResponse<T> httpResponse401) {
6887
AuthChallenge aHeader = wwwAuthenticateHeader(httpResponse401);
6988
if ( aHeader == null )
7089
// No valid header - simply return the original response.
71-
return httpResponse401;
90+
return CompletableFuture.completedFuture(httpResponse401);
7291

7392
// Currently on a URI endpoint-by-endpoint basis.
7493
// String realm = aHeader.getRealm();
@@ -102,14 +121,14 @@ private static <T> HttpResponse<T> handle401(HttpClient httpClient,
102121
}
103122
case UNKNOWN :
104123
// Not handled. Pass back the 401.
105-
return httpResponse401;
124+
return CompletableFuture.completedFuture(httpResponse401);
106125
default:
107126
throw new HttpException("Not an authentication scheme -- "+aHeader.authScheme);
108127
}
109128

110129
// Failed to generate a request modifier for a retry.
111130
if ( authRequestModifier == null)
112-
return httpResponse401;
131+
return CompletableFuture.completedFuture(httpResponse401);
113132

114133
// ---- Register for next time the app calls this URI.
115134
AuthEnv.get().registerAuthModifier(request.uri().toString(), authRequestModifier);
@@ -119,7 +138,7 @@ private static <T> HttpResponse<T> handle401(HttpClient httpClient,
119138
request2builder = authRequestModifier.addAuth(request2builder);
120139

121140
HttpRequest httpRequest2 = request2builder.build();
122-
HttpResponse<T> httpResponse2 = HttpLib.executeJDK(httpClient, httpRequest2, bodyHandler);
141+
CompletableFuture<HttpResponse<T>> httpResponse2 = HttpLib.executeJDKAsync(httpClient, httpRequest2, bodyHandler);
123142
// Pass back to application regardless of response code.
124143
return httpResponse2;
125144
}

0 commit comments

Comments
 (0)