Skip to content

Commit 4d38b19

Browse files
committed
Virtual thread blocking API generation.
1 parent 30db076 commit 4d38b19

File tree

15 files changed

+329
-9
lines changed

15 files changed

+329
-9
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package examples.grpc;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.Completable;
5+
import io.vertx.core.Handler;
6+
import io.vertx.core.net.SocketAddress;
7+
import io.vertx.grpc.client.GrpcClient;
8+
import io.vertx.grpc.client.GrpcClientRequest;
9+
import io.vertx.core.streams.ReadStream;
10+
import io.vertx.core.streams.WriteStream;
11+
import io.vertx.grpc.common.GrpcStatus;
12+
import io.vertx.grpc.common.ServiceName;
13+
import io.vertx.grpc.common.ServiceMethod;
14+
import io.vertx.grpc.common.GrpcMessageDecoder;
15+
import io.vertx.grpc.common.GrpcMessageEncoder;
16+
import java.util.stream.Stream;
17+
18+
/**
19+
* <p>A client for invoking the Greeter gRPC service.</p>
20+
*/
21+
public interface GreeterBlockingClient {
22+
23+
static GreeterBlockingClient create(GreeterClient client) {
24+
return new GreeterBlockingClientImpl(client);
25+
}
26+
27+
}
28+
29+
/**
30+
* The proxy implementation.
31+
*/
32+
class GreeterBlockingClientImpl implements GreeterBlockingClient {
33+
34+
private final GreeterClientInternal client;
35+
36+
GreeterBlockingClientImpl(Greeter client) {
37+
this.client = (GreeterClientInternal)java.util.Objects.requireNonNull(client);
38+
}
39+
}

vertx-grpc-docs/src/main/java/examples/grpc/GreeterClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@ public interface GreeterClient extends Greeter {
2828
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
2929
Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);
3030
}
31+
32+
interface GreeterClientInternal extends GreeterClient {
33+
}

vertx-grpc-docs/src/main/java/examples/grpc/GreeterGrpcClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ static GreeterGrpcClient create(GrpcClient client, SocketAddress host, io.vertx.
5656
/**
5757
* The proxy implementation.
5858
*/
59-
class GreeterGrpcClientImpl implements GreeterGrpcClient {
59+
class GreeterGrpcClientImpl implements GreeterGrpcClient, GreeterClientInternal {
6060

6161
private final GrpcClient client;
6262
private final SocketAddress socketAddress;
@@ -72,6 +72,10 @@ class GreeterGrpcClientImpl implements GreeterGrpcClient {
7272
this.wireFormat = java.util.Objects.requireNonNull(wireFormat);
7373
}
7474

75+
public GrpcClient grpcClient() {
76+
return client;
77+
}
78+
7579
public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
7680
return client.request(socketAddress, SayHello).compose(req -> {
7781
req.format(wireFormat);

vertx-grpc-docs/src/main/java/examples/grpc/GreeterGrpcIo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static GreeterStub newStub(io.vertx.core.Vertx vertx, io.grpc.Channel cha
3535
}
3636

3737

38-
public static final class GreeterStub extends io.grpc.stub.AbstractStub<GreeterStub> implements GreeterClient {
38+
public static final class GreeterStub extends io.grpc.stub.AbstractStub<GreeterStub> implements GreeterClientInternal {
3939
private final io.vertx.core.internal.ContextInternal context;
4040
private GreeterGrpc.GreeterStub delegateStub;
4141

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package examples.grpc;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.Completable;
5+
import io.vertx.core.Handler;
6+
import io.vertx.core.net.SocketAddress;
7+
import io.vertx.grpc.client.GrpcClient;
8+
import io.vertx.grpc.client.GrpcClientRequest;
9+
import io.vertx.core.streams.ReadStream;
10+
import io.vertx.core.streams.WriteStream;
11+
import io.vertx.grpc.common.GrpcStatus;
12+
import io.vertx.grpc.common.ServiceName;
13+
import io.vertx.grpc.common.ServiceMethod;
14+
import io.vertx.grpc.common.GrpcMessageDecoder;
15+
import io.vertx.grpc.common.GrpcMessageEncoder;
16+
import java.util.stream.Stream;
17+
18+
/**
19+
* <p>A client for invoking the Streaming gRPC service.</p>
20+
*/
21+
public interface StreamingBlockingClient {
22+
23+
static StreamingBlockingClient create(StreamingClient client) {
24+
return new StreamingBlockingClientImpl(client);
25+
}
26+
27+
28+
@io.vertx.codegen.annotations.GenIgnore
29+
Stream<examples.grpc.Item> source(examples.grpc.Empty request);
30+
31+
@io.vertx.codegen.annotations.GenIgnore
32+
default examples.grpc.Empty sink(java.util.List<examples.grpc.Item> streamOfMessages) {
33+
return sink(streamOfMessages.iterator());
34+
}
35+
36+
@io.vertx.codegen.annotations.GenIgnore
37+
examples.grpc.Empty sink(java.util.Iterator<examples.grpc.Item> streamOfMessages);
38+
}
39+
40+
/**
41+
* The proxy implementation.
42+
*/
43+
class StreamingBlockingClientImpl implements StreamingBlockingClient {
44+
45+
private final StreamingClientInternal client;
46+
47+
StreamingBlockingClientImpl(Streaming client) {
48+
this.client = (StreamingClientInternal)java.util.Objects.requireNonNull(client);
49+
}
50+
51+
public Stream<examples.grpc.Item> source(examples.grpc.Empty request) {
52+
/*
53+
Stream<examples.grpc.Item> iterator = client.source(request)
54+
.compose(req -> {
55+
req.end(request);
56+
return req.response().compose(resp -> {
57+
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
58+
return Future.failedFuture("Invalid gRPC status " + resp.status());
59+
} else {
60+
return Future.succeededFuture(resp.blockingStream());
61+
}
62+
});
63+
}).await();
64+
return iterator;
65+
*/
66+
throw new UnsupportedOperationException();
67+
}
68+
69+
public examples.grpc.Empty sink(java.util.Iterator<examples.grpc.Item> request) {
70+
throw new UnsupportedOperationException();
71+
}
72+
}

vertx-grpc-docs/src/main/java/examples/grpc/StreamingClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,7 @@ default Future<ReadStream<examples.grpc.Item>> pipe(ReadStream<examples.grpc.Ite
8282
});
8383
}
8484
}
85+
86+
interface StreamingClientInternal extends StreamingClient {
87+
void source(examples.grpc.Empty request, Completable<ReadStream<examples.grpc.Item>> completable);
88+
}

vertx-grpc-docs/src/main/java/examples/grpc/StreamingGrpcClient.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ static StreamingGrpcClient create(GrpcClient client, SocketAddress host, io.vert
7676
/**
7777
* The proxy implementation.
7878
*/
79-
class StreamingGrpcClientImpl implements StreamingGrpcClient {
79+
class StreamingGrpcClientImpl implements StreamingGrpcClient, StreamingClientInternal {
8080

8181
private final GrpcClient client;
8282
private final SocketAddress socketAddress;
@@ -92,6 +92,10 @@ class StreamingGrpcClientImpl implements StreamingGrpcClient {
9292
this.wireFormat = java.util.Objects.requireNonNull(wireFormat);
9393
}
9494

95+
public GrpcClient grpcClient() {
96+
return client;
97+
}
98+
9599
public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request) {
96100
return client.request(socketAddress, Source).compose(req -> {
97101
req.format(wireFormat);
@@ -106,6 +110,10 @@ public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request
106110
});
107111
}
108112

113+
public void source(examples.grpc.Empty request, Completable<ReadStream<examples.grpc.Item>> completable) {
114+
throw new UnsupportedOperationException();
115+
}
116+
109117
public Future<examples.grpc.Empty> sink(Completable<WriteStream<examples.grpc.Item>> completable) {
110118
return client.request(socketAddress, Sink)
111119
.andThen((res, err) -> {

vertx-grpc-docs/src/main/java/examples/grpc/StreamingGrpcIo.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static StreamingStub newStub(io.vertx.core.Vertx vertx, io.grpc.Channel c
3535
}
3636

3737

38-
public static final class StreamingStub extends io.grpc.stub.AbstractStub<StreamingStub> implements StreamingClient {
38+
public static final class StreamingStub extends io.grpc.stub.AbstractStub<StreamingStub> implements StreamingClientInternal {
3939
private final io.vertx.core.internal.ContextInternal context;
4040
private StreamingGrpc.StreamingStub delegateStub;
4141

@@ -61,6 +61,9 @@ public io.vertx.core.Future<io.vertx.core.streams.ReadStream<examples.grpc.Item>
6161
return io.vertx.grpcio.common.impl.stub.ClientCalls.oneToMany(context, request, delegateStub::source);
6262
}
6363

64+
public void source(examples.grpc.Empty request, io.vertx.core.Completable<io.vertx.core.streams.ReadStream<examples.grpc.Item>> completable) {
65+
io.vertx.grpcio.common.impl.stub.ClientCalls.oneToMany(context, request, completable, delegateStub::source);
66+
}
6467

6568
public io.vertx.core.Future<examples.grpc.Empty> sink(io.vertx.core.Completable<io.vertx.core.streams.WriteStream<examples.grpc.Item>> handler) {
6669
return io.vertx.grpcio.common.impl.stub.ClientCalls.manyToOne(context, handler, delegateStub::sink);

vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTestBase.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.List;
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.stream.Stream;
4041

4142
import static org.junit.Assert.assertEquals;
4243

@@ -718,4 +719,41 @@ protected void streamingOutputCall(Messages.StreamingOutputCallRequest request,
718719

719720
async.awaitSuccess(20_000);
720721
}
722+
723+
@Test
724+
public void testUnaryMany_ReadStreamReturn_Blocking() throws Exception {
725+
// Create gRPC Server
726+
GrpcServer grpcServer = grpcServer();
727+
grpcServer.addService(testService(new TestServiceService() {
728+
@Override
729+
protected void streamingOutputCall(Messages.StreamingOutputCallRequest request, WriteStream<Messages.StreamingOutputCallResponse> response) {
730+
System.out.println("write messages");
731+
response.write(Messages.StreamingOutputCallResponse.newBuilder()
732+
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-1", StandardCharsets.UTF_8)).build())
733+
.build());
734+
response.write(Messages.StreamingOutputCallResponse.newBuilder()
735+
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-2", StandardCharsets.UTF_8)).build())
736+
.build());
737+
response.end();
738+
}
739+
}));
740+
741+
HttpServer httpServer = vertx.createHttpServer();
742+
httpServer.requestHandler(grpcServer)
743+
.listen(8080).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
744+
745+
// Create gRPC Client
746+
GrpcClient grpcClient = grpcClient();
747+
TestServiceClient client = testClient(grpcClient, SocketAddress.inetSocketAddress(port, "localhost"));
748+
TestServiceBlockingClient blockingClient = TestServiceBlockingClient.create(client);
749+
750+
Messages.StreamingOutputCallRequest request = Messages.StreamingOutputCallRequest.newBuilder()
751+
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputRequest", StandardCharsets.UTF_8)).build())
752+
.build();
753+
Stream<Messages.StreamingOutputCallResponse> res = blockingClient.streamingOutputCall(request);
754+
Thread.sleep(100);
755+
List<Messages.StreamingOutputCallResponse> list = new ArrayList<>();
756+
res.forEach(msg -> list.add(msg));
757+
assertEquals(2, list.size());
758+
}
721759
}

vertx-grpc-protoc-plugin2/src/main/java/io/vertx/grpc/plugin/VertxGrpcGeneratorImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ private List<PluginProtos.CodeGeneratorResponse.File> buildFiles(ServiceContext
333333
if (generateGrpcClient) {
334334
files.add(buildClientFile(context));
335335
files.add(buildGrpcClientFile(context));
336+
files.add(buildBlockingClientFile(context));;
336337
}
337338
if (generateGrpcService) {
338339
files.add(buildServiceFile(context));
@@ -362,6 +363,12 @@ private PluginProtos.CodeGeneratorResponse.File buildGrpcClientFile(ServiceConte
362363
return buildFile(context, applyTemplate("grpc-client.mustache", context));
363364
}
364365

366+
private PluginProtos.CodeGeneratorResponse.File buildBlockingClientFile(ServiceContext context) {
367+
context.fileName = context.serviceName + "BlockingClient.java";
368+
context.className = context.serviceName + "BlockingClient";
369+
return buildFile(context, applyTemplate("blocking-client.mustache", context));
370+
}
371+
365372
private PluginProtos.CodeGeneratorResponse.File buildServiceFile(ServiceContext context) {
366373
context.fileName = context.serviceName + "Service.java";
367374
context.className = context.serviceName + "Service";

0 commit comments

Comments
 (0)