2323import io .grpc .stub .ServerCallStreamObserver ;
2424import io .grpc .stub .StreamObserver ;
2525import io .vertx .core .Promise ;
26+ import io .vertx .core .Vertx ;
27+ import io .vertx .core .net .NetClient ;
28+ import io .vertx .core .net .NetServer ;
29+ import io .vertx .core .net .NetSocket ;
2630import io .vertx .ext .unit .Async ;
2731import io .vertx .ext .unit .TestContext ;
2832import java .io .IOException ;
33+ import java .net .InetSocketAddress ;
34+ import java .net .SocketAddress ;
35+ import java .util .ArrayList ;
2936import java .util .Collections ;
30- import java .util .concurrent .TimeUnit ;
37+ import java .util .List ;
38+ import java .util .Map ;
39+ import org .jetbrains .annotations .Nullable ;
3140import org .junit .Test ;
3241
3342import java .util .concurrent .atomic .AtomicInteger ;
@@ -478,7 +487,7 @@ public void testCallNetworkInterrupted(TestContext should) throws InterruptedExc
478487 StreamingGrpc .StreamingImplBase impl = new StreamingImplBase () {
479488 @ Override
480489 public StreamObserver <Item > pipe (StreamObserver <Item > responseObserver ) {
481- return new StreamObserver <Item >() {
490+ return new StreamObserver <>() {
482491 @ Override
483492 public void onNext (Item item ) {
484493 requestCount .incrementAndGet ();
@@ -504,42 +513,92 @@ public void onCompleted() {
504513 serverStub .bind (server );
505514 startServer (server );
506515
507- Process client = ProcessHelper .exec (ServerBridgeTest .class , Collections .singletonList (String .valueOf (port )));
508- // waiting for doing request
509- Thread .sleep (1_000 );
510- client .destroy ();
511- client .waitFor ();
516+ try (var proxyServer = new ProxyServer (vertx , port + 1 , port )) {
517+ proxyServer .start ();
518+
519+ int proxyPort = proxyServer .proxyServer .actualPort ();
520+ Channel channel = ManagedChannelBuilder .forAddress ("localhost" , proxyPort ).usePlaintext ().build ();
521+ StreamingGrpc .StreamingStub stub = StreamingGrpc .newStub (channel );
522+ StreamObserver <Item > requestObserver = stub .pipe (new NoopStreamObserver <>());
523+ Item request = Item .newBuilder ().setValue ("item" ).build ();
524+ requestObserver .onNext (request );
525+ requestObserver .onNext (request );
526+ requestObserver .onNext (request );
527+
528+ // waiting for the connection to be established.
529+ Thread .sleep (1000 );
530+ }
512531
513532 async .await (20_000 );
514533
515534 should .assertEquals (requestCount .get (), 3 );
516535 should .assertTrue (completed .future ().failed ());
517536 }
518537
519- public static void main (String ... args ) throws InterruptedException {
520- StreamObserver <Item > noop = new StreamObserver <Item >() {
521- @ Override public void onNext (Item item ) {
538+ static class NoopStreamObserver <T > implements StreamObserver <T > {
539+ @ Override public void onNext (T ignored ) {}
522540
523- }
541+ @ Override public void onError ( Throwable ignored ) { }
524542
525- @ Override public void onError (Throwable throwable ) {
543+ @ Override public void onCompleted () {}
544+ }
526545
527- }
546+ static class ProxyServer implements AutoCloseable {
528547
529- @ Override public void onCompleted () {
548+ private final int listenPort ;
530549
531- }
532- };
550+ private final int targetPort ;
551+
552+ private final NetServer proxyServer ;
533553
534- Channel channel = ManagedChannelBuilder .forAddress ("localhost" , Integer .parseInt (args [0 ])).usePlaintext ().build ();
535- StreamingGrpc .StreamingStub stub = StreamingGrpc .newStub (channel );
536- StreamObserver <Item > requestObserver = stub .pipe (noop );
537- Item request = Item .newBuilder ().setValue ("item" ).build ();
538- requestObserver .onNext (request );
539- requestObserver .onNext (request );
540- requestObserver .onNext (request );
554+ private final NetClient proxyClient ;
541555
542- // waiting to be killed
543- Thread .currentThread ().join ();
556+ // live or dead
557+ private final List <Map .Entry <NetSocket , NetSocket >> sockets = new ArrayList <>();
558+
559+ ProxyServer (Vertx vertx , int listenPort , int targetPort ) {
560+ this .listenPort = listenPort ;
561+ this .targetPort = targetPort ;
562+ this .proxyServer = vertx .createNetServer ().connectHandler (this ::handle );
563+ this .proxyClient = vertx .createNetClient ();
564+ }
565+
566+ void start () {
567+ this .proxyServer .listen (listenPort ).toCompletionStage ().toCompletableFuture ().join ();
568+ }
569+
570+ void handle (NetSocket socket ) {
571+ socket .pause ();
572+
573+ proxyClient .connect (targetPort , "localhost" )
574+ .onComplete (ar -> {
575+ if (ar .succeeded ()) {
576+ NetSocket proxySocket = ar .result ();
577+ proxySocket .pause ();
578+
579+ socket .handler (proxySocket ::write );
580+ proxySocket .handler (socket ::write );
581+ socket .closeHandler (ignored -> proxySocket .close ());
582+ proxySocket .closeHandler (ignored -> socket .close ());
583+
584+ sockets .add (Map .entry (socket , proxySocket ));
585+
586+ proxySocket .resume ();
587+ socket .resume ();
588+ } else {
589+ socket .close ();
590+ }
591+ });
592+ }
593+
594+ @ Override
595+ public void close () {
596+ this .sockets .forEach (entry -> {
597+ entry .getKey ().close ();
598+ entry .getValue ().close ();
599+ });
600+ this .proxyClient .close ();
601+ this .proxyServer .close ();
602+ }
544603 }
545604}
0 commit comments