22
33import io .vertx .core .Vertx ;
44import io .vertx .core .http .*;
5- import io .vertx .core .net .JksOptions ;
5+ import io .vertx .core .net .KeyCertOptions ;
66import io .vertx .core .net .SocketAddress ;
7+ import io .vertx .core .net .TrustOptions ;
78
89import java .util .Set ;
910import java .util .concurrent .ConcurrentHashMap ;
1011import java .util .concurrent .TimeUnit ;
1112import java .util .concurrent .atomic .AtomicBoolean ;
13+ import java .util .function .Predicate ;
1214
1315public class HttpProxy {
1416
1517 private final Vertx vertx ;
1618 private HttpServer server ;
1719 private HttpClient httpClient ;
18- private WebSocketClient wsClient ;
19- private SocketAddress origin ;
20- private int port ;
20+ private WebSocketClient webSocketClient ;
2121 private final Set <WebSocketBase > webSockets = ConcurrentHashMap .newKeySet ();
22+ private HttpClientOptions httpConfig = new HttpClientOptions ().setTrustAll (true );
23+ private WebSocketClientOptions webSocketConfig = new WebSocketClientOptions ().setTrustAll (true );
24+ private int port ;
25+ private Predicate <HttpServerRequest > requestHandler = req -> true ;
2226
2327 public HttpProxy (Vertx vertx ) {
2428 this .vertx = vertx ;
25- this .httpClient = vertx .createHttpClient ();
26- this .wsClient = vertx .createWebSocketClient ();
29+ }
30+
31+ public HttpProxy requestHandler (Predicate <HttpServerRequest > requestHandler ) {
32+ this .requestHandler = requestHandler == null ? req -> true : requestHandler ;
33+ return this ;
2734 }
2835
2936 public HttpProxy origin (SocketAddress origin ) {
30- this .origin = origin ;
37+ httpConfig .setDefaultPort (origin .port ());
38+ httpConfig .setDefaultHost (origin .host ());
39+ webSocketConfig .setDefaultPort (origin .port ());
40+ webSocketConfig .setDefaultHost (origin .host ());
3141 return this ;
3242 }
3343
@@ -36,13 +46,33 @@ public HttpProxy port(int port) {
3646 return this ;
3747 }
3848
49+ public HttpProxy keyCertOptions (KeyCertOptions keyCertOptions ) {
50+ httpConfig .setKeyCertOptions (keyCertOptions .copy ());
51+ webSocketConfig .setKeyCertOptions (keyCertOptions .copy ());
52+ return this ;
53+ }
54+
55+ public HttpProxy trustOptions (TrustOptions trustOptions ) {
56+ httpConfig .setTrustOptions (trustOptions .copy ());
57+ webSocketConfig .setTrustOptions (trustOptions .copy ());
58+ return this ;
59+ }
60+
61+ public HttpProxy protocol (HttpVersion version ) {
62+ httpConfig .setProtocolVersion (version );
63+ return this ;
64+ }
65+
66+ public HttpProxy ssl (boolean ssl ) {
67+ httpConfig .setSsl (ssl );
68+ webSocketConfig .setSsl (ssl );
69+ return this ;
70+ }
71+
3972 public void start () throws Exception {
40- HttpServer server = vertx .createHttpServer (new HttpServerOptions ()
41- .setSsl (true )
42- .setKeyCertOptions (new JksOptions ()
43- .setPath ("server-keystore.jks" )
44- .setPassword ("wibble" ))
45- );
73+ this .httpClient = vertx .createHttpClient (httpConfig );
74+ this .webSocketClient = vertx .createWebSocketClient (webSocketConfig );
75+ server = vertx .createHttpServer ();
4676 server .requestHandler (this ::handleRequest );
4777 server
4878 .listen (port )
@@ -52,18 +82,22 @@ public void start() throws Exception {
5282 }
5383
5484 private void handleRequest (HttpServerRequest serverRequest ) {
55- if (serverRequest .getHeader ("upgrade" ) != null ) {
85+ Predicate <HttpServerRequest > handler = requestHandler ;
86+ if (!handler .test (serverRequest )) {
87+ return ;
88+ }
89+ if (serverRequest .canUpgradeToWebSocket ()) {
5690 WebSocketConnectOptions options = new WebSocketConnectOptions ();
57- options .setServer (origin );
5891 options .setURI (serverRequest .uri ());
5992 serverRequest .pause ();
60- wsClient .connect (options ).onComplete (ar -> {
93+ webSocketClient .connect (options ).onComplete (ar -> {
6194 if (ar .succeeded ()) {
6295 WebSocket wsc = ar .result ();
6396 AtomicBoolean closed = new AtomicBoolean ();
6497 wsc .closeHandler (v -> {
6598 closed .set (true );
6699 });
100+ wsc .pause ();
67101 serverRequest .toWebSocket ().onComplete (ar2 -> {
68102 if (!closed .get ()) {
69103 if (ar2 .succeeded ()) {
@@ -83,6 +117,7 @@ private void handleRequest(HttpServerRequest serverRequest) {
83117 wss .close ();
84118 });
85119 webSockets .add (wss );
120+ wsc .resume ();
86121 } else {
87122 wsc .close ();
88123 }
@@ -98,37 +133,35 @@ private void handleRequest(HttpServerRequest serverRequest) {
98133 });
99134 } else {
100135 RequestOptions options = new RequestOptions ()
101- .setServer (origin )
102136 .setMethod (serverRequest .method ())
103137 .setURI (serverRequest .uri ());
104- serverRequest .body ().onComplete (ar_ -> {
105- if (ar_ .succeeded ()) {
106- httpClient .request (options ).onComplete (ar -> {
138+ serverRequest .body ().onSuccess (requestBody -> {
139+ httpClient
140+ .request (options )
141+ .compose (clientRequest -> clientRequest
142+ .send (requestBody )
143+ .compose (clientResponse -> {
144+ serverRequest .response ().setStatusCode (clientResponse .statusCode ());
145+ return clientResponse .body ();
146+ }))
147+ .onComplete (ar -> {
107148 if (ar .succeeded ()) {
108- HttpClientRequest clientRequest = ar .result ();
109- clientRequest .send (ar_ .result ()).onComplete (ar2 -> {
110- if (ar2 .succeeded ()) {
111- HttpClientResponse clientResponse = ar2 .result ();
112- HttpServerResponse serverResponse = serverRequest .response ();
113- serverResponse .putHeader (HttpHeaders .CONTENT_LENGTH , clientResponse .getHeader (HttpHeaders .CONTENT_LENGTH ));
114- clientResponse .pipeTo (serverResponse );
115- } else {
116- serverRequest .response ().setStatusCode (500 ).end ();
117- }
118- });
149+ serverRequest .response ().end (ar .result ());
119150 } else {
120- ar .cause ().printStackTrace ();
121- serverRequest .response ().reset ();
151+ serverRequest .response ().setStatusCode (500 ).end ();
122152 }
123153 });
124- } else {
125- // Nothing to do ? (compose?)
126- }
127154 });
128155 }
129156 }
130157
131158 public Set <WebSocketBase > webSockets () {
132159 return webSockets ;
133160 }
161+
162+ public void close () {
163+ webSocketClient .close ().await ();
164+ httpClient .close ().await ();
165+ server .close ().await ();
166+ }
134167}
0 commit comments