Skip to content

Commit 8237106

Browse files
author
hujia
committed
GrpcServiceBridgeImpl handle HttpClosedException
fixes #101
1 parent 741022c commit 8237106

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.grpc.ServerServiceDefinition;
2626
import io.grpc.Status;
2727
import io.vertx.core.Vertx;
28+
import io.vertx.core.http.HttpClosedException;
2829
import io.vertx.core.net.SocketAddress;
2930
import io.vertx.grpc.common.GrpcError;
3031
import io.vertx.grpc.common.GrpcStatus;
@@ -152,6 +153,11 @@ void init(ServerCall.Listener<Req> listener) {
152153
listener.onCancel();
153154
}
154155
});
156+
req.exceptionHandler(throwable -> {
157+
if (throwable instanceof HttpClosedException && !closed) {
158+
listener.onCancel();
159+
}
160+
});
155161
readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor));
156162
writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor));
157163
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.vertx.grpc.server;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.Objects;
9+
10+
public class ProcessHelper {
11+
12+
private ProcessHelper() {}
13+
14+
public static Process exec(Class<?> main, List<String> args) throws IOException {
15+
16+
List<String> command = new ArrayList<>();
17+
// java binary executable
18+
command.add(System.getProperty("java.home") + File.separator + "bin" + File.separator + "java");
19+
command.add("-cp");
20+
// inherit classpath
21+
command.add(System.getProperty("java.class.path"));
22+
// main class name
23+
command.add(main.getName());
24+
// args
25+
command.addAll(Objects.requireNonNullElse(args, Collections.emptyList()));
26+
27+
return new ProcessBuilder(command).inheritIO().start();
28+
}
29+
}

vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818
import io.grpc.examples.streaming.Empty;
1919
import io.grpc.examples.streaming.Item;
2020
import io.grpc.examples.streaming.StreamingGrpc;
21+
import io.grpc.examples.streaming.StreamingGrpc.StreamingImplBase;
2122
import io.grpc.protobuf.StatusProto;
2223
import io.grpc.stub.ServerCallStreamObserver;
2324
import io.grpc.stub.StreamObserver;
25+
import io.vertx.core.Promise;
2426
import io.vertx.ext.unit.Async;
2527
import io.vertx.ext.unit.TestContext;
28+
import java.io.IOException;
29+
import java.util.Collections;
30+
import java.util.concurrent.TimeUnit;
2631
import org.junit.Test;
2732

2833
import java.util.concurrent.atomic.AtomicInteger;
@@ -463,4 +468,78 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
463468
HelloReply res = stub.sayHello(request);
464469
should.assertEquals(1, testAttributesStep.get());
465470
}
471+
472+
@Test
473+
public void testCallNetworkInterrupted(TestContext should) throws InterruptedException, IOException {
474+
AtomicInteger requestCount = new AtomicInteger();
475+
Promise<Void> completed = Promise.promise();
476+
Async async = should.async();
477+
478+
StreamingGrpc.StreamingImplBase impl = new StreamingImplBase() {
479+
@Override
480+
public StreamObserver<Item> pipe(StreamObserver<Item> responseObserver) {
481+
return new StreamObserver<Item>() {
482+
@Override
483+
public void onNext(Item item) {
484+
requestCount.incrementAndGet();
485+
}
486+
487+
@Override
488+
public void onError(Throwable throwable) {
489+
completed.fail(throwable);
490+
async.complete();
491+
}
492+
493+
@Override
494+
public void onCompleted() {
495+
completed.complete();
496+
async.complete();
497+
}
498+
};
499+
}
500+
};
501+
502+
GrpcServer server = GrpcServer.server(vertx);
503+
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(impl);
504+
serverStub.bind(server);
505+
startServer(server);
506+
507+
Process client = ProcessHelper.exec(ServerBridgeTest.class, Collections.singletonList(String.valueOf(port)));
508+
// waiting for doing request
509+
Thread.sleep(1_000);
510+
client.destroy();
511+
client.waitFor();
512+
513+
async.await(20_000);
514+
515+
should.assertEquals(requestCount.get(), 3);
516+
should.assertTrue(completed.future().failed());
517+
}
518+
519+
public static void main(String... args) throws InterruptedException {
520+
StreamObserver<Item> noop = new StreamObserver<Item>() {
521+
@Override public void onNext(Item item) {
522+
523+
}
524+
525+
@Override public void onError(Throwable throwable) {
526+
527+
}
528+
529+
@Override public void onCompleted() {
530+
531+
}
532+
};
533+
534+
Channel channel = ManagedChannelBuilder.forAddress("localhost", Integer.parseInt(args[0])).usePlaintext().build();
535+
StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel);
536+
StreamObserver<Item> requestObserver = stub.pipe(noop);
537+
Item request = Item.newBuilder().setValue("item").build();
538+
requestObserver.onNext(request);
539+
requestObserver.onNext(request);
540+
requestObserver.onNext(request);
541+
542+
// waiting to be killed
543+
Thread.currentThread().join();
544+
}
466545
}

0 commit comments

Comments
 (0)