Skip to content

Commit 50b88c2

Browse files
committed
Implement generation of unary request
1 parent 8c286e5 commit 50b88c2

File tree

4 files changed

+140
-20
lines changed

4 files changed

+140
-20
lines changed

vertx-grpc-docs/src/main/java/examples/grpc/GreeterClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.vertx.core.Handler;
66
import io.vertx.core.net.SocketAddress;
77
import io.vertx.grpc.client.GrpcClient;
8+
import io.vertx.grpc.client.GrpcClientRequest;
89
import io.vertx.core.streams.ReadStream;
910
import io.vertx.core.streams.WriteStream;
1011
import io.vertx.grpc.common.GrpcStatus;
@@ -76,6 +77,9 @@ static GreeterClient create(GrpcClient client, SocketAddress host, io.vertx.grpc
7677
*/
7778
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
7879
Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);
80+
81+
@io.vertx.codegen.annotations.GenIgnore
82+
examples.grpc.HelloReply sayHello_sync(examples.grpc.HelloRequest request);
7983
}
8084

8185
/**
@@ -114,4 +118,8 @@ public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest requ
114118
return req.response().compose(resp -> resp.last());
115119
});
116120
}
121+
122+
public examples.grpc.HelloReply sayHello_sync(examples.grpc.HelloRequest request) {
123+
return sayHello(request).await();
124+
}
117125
}

vertx-grpc-docs/src/main/java/examples/grpc/StreamingClient.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.vertx.core.Handler;
66
import io.vertx.core.net.SocketAddress;
77
import io.vertx.grpc.client.GrpcClient;
8+
import io.vertx.grpc.client.GrpcClientRequest;
89
import io.vertx.core.streams.ReadStream;
910
import io.vertx.core.streams.WriteStream;
1011
import io.vertx.grpc.common.GrpcStatus;
@@ -115,6 +116,9 @@ static StreamingClient create(GrpcClient client, SocketAddress host, io.vertx.gr
115116
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
116117
Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request);
117118

119+
@io.vertx.codegen.annotations.GenIgnore
120+
Iterable<examples.grpc.Item> source_sync(examples.grpc.Empty request);
121+
118122
/**
119123
* Calls the Sink RPC service method.
120124
*
@@ -172,6 +176,43 @@ class StreamingClientImpl implements StreamingClient {
172176
}
173177

174178
public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request) {
179+
return source_(request).compose(req -> {
180+
req.end(request);
181+
return req.response().flatMap(resp -> {
182+
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
183+
return Future.failedFuture("Invalid gRPC status " + resp.status());
184+
} else {
185+
return Future.succeededFuture(resp);
186+
}
187+
});
188+
});
189+
}
190+
191+
public Iterable<examples.grpc.Item> source_sync(examples.grpc.Empty request) {
192+
java.util.Iterator<examples.grpc.Item> iterator = source_(request)
193+
.compose(req -> {
194+
req.end(request);
195+
return req.response().compose(resp -> {
196+
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
197+
return Future.failedFuture("Invalid gRPC status " + resp.status());
198+
} else {
199+
return Future.succeededFuture(io.vertx.core.internal.streams.ReadStreamIterator.iterator(resp));
200+
}
201+
});
202+
}).await();
203+
return new Iterable<>() {
204+
boolean consumed = false;
205+
public java.util.Iterator<examples.grpc.Item> iterator() {
206+
if (consumed) {
207+
throw new IllegalStateException();
208+
}
209+
consumed = true;
210+
return iterator;
211+
}
212+
};
213+
}
214+
215+
public Future<GrpcClientRequest<examples.grpc.Empty, examples.grpc.Item>> source_(examples.grpc.Empty request) {
175216
ServiceMethod<examples.grpc.Item, examples.grpc.Empty> serviceMethod;
176217
switch (wireFormat) {
177218
case PROTOBUF:
@@ -183,16 +224,7 @@ public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request
183224
default:
184225
throw new AssertionError();
185226
}
186-
return client.request(socketAddress, serviceMethod).compose(req -> {
187-
req.end(request);
188-
return req.response().flatMap(resp -> {
189-
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
190-
return Future.failedFuture("Invalid gRPC status " + resp.status());
191-
} else {
192-
return Future.succeededFuture(resp);
193-
}
194-
});
195-
});
227+
return client.request(socketAddress, serviceMethod);
196228
}
197229

198230
public Future<examples.grpc.Empty> sink(Completable<WriteStream<examples.grpc.Item>> completable) {

vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.util.List;
4040
import java.util.concurrent.TimeUnit;
4141

42+
import static org.junit.Assert.assertEquals;
43+
4244
public class ProtocPluginTest extends ProxyTestBase {
4345

4446
@Test
@@ -373,6 +375,45 @@ public ReadStream<Messages.StreamingOutputCallResponse> streamingOutputCall(Mess
373375
test.awaitSuccess();
374376
}
375377

378+
@Test
379+
public void testUnaryMany_ReadStreamReturn_Sync(TestContext should) throws Exception {
380+
// Create gRPC Server
381+
GrpcServer grpcServer = GrpcServer.server(vertx);
382+
grpcServer.addService(new TestServiceService() {
383+
@Override
384+
public ReadStream<Messages.StreamingOutputCallResponse> streamingOutputCall(Messages.StreamingOutputCallRequest request) {
385+
FakeStream<Messages.StreamingOutputCallResponse> response = new FakeStream<>();
386+
response.pause();
387+
response.write(Messages.StreamingOutputCallResponse.newBuilder()
388+
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-1", StandardCharsets.UTF_8)).build())
389+
.build());
390+
response.write(Messages.StreamingOutputCallResponse.newBuilder()
391+
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-2", StandardCharsets.UTF_8)).build())
392+
.build());
393+
response.end();
394+
return response;
395+
}
396+
});
397+
HttpServer httpServer = vertx.createHttpServer();
398+
httpServer.requestHandler(grpcServer)
399+
.listen(8080).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
400+
401+
// Create gRPC Client
402+
GrpcClient grpcClient = GrpcClient.client(vertx);
403+
TestServiceClient client = TestServiceClient.create(grpcClient, SocketAddress.inetSocketAddress(port, "localhost"));
404+
405+
Messages.StreamingOutputCallRequest request = Messages.StreamingOutputCallRequest.newBuilder()
406+
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputRequest", StandardCharsets.UTF_8)).build())
407+
.build();
408+
Iterable<Messages.StreamingOutputCallResponse> res = client.streamingOutputCall_sync(request);
409+
Thread.sleep(100);
410+
List<Messages.StreamingOutputCallResponse> list = new ArrayList<>();
411+
for (Messages.StreamingOutputCallResponse msg : res) {
412+
list.add(msg);
413+
}
414+
assertEquals(2, list.size());
415+
}
416+
376417
@Test
377418
public void testUnaryMany_ReadStreamReturn_ErrorHandling(TestContext should) throws Exception {
378419
// Create gRPC Server

vertx-grpc-protoc-plugin2/src/main/resources/client.mustache

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import io.vertx.core.Completable;
77
import io.vertx.core.Handler;
88
import io.vertx.core.net.SocketAddress;
99
import io.vertx.grpc.client.GrpcClient;
10+
import io.vertx.grpc.client.GrpcClientRequest;
1011
import io.vertx.core.streams.ReadStream;
1112
import io.vertx.core.streams.WriteStream;
1213
import io.vertx.grpc.common.GrpcStatus;
@@ -83,6 +84,9 @@ public interface {{className}} {
8384
*/
8485
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
8586
Future<{{outputType}}> {{vertxMethodName}}({{inputType}} request);
87+
88+
@io.vertx.codegen.annotations.GenIgnore
89+
{{outputType}} {{vertxMethodName}}_sync({{inputType}} request);
8690
{{/unaryMethods}}
8791
{{#unaryManyMethods}}
8892

@@ -94,6 +98,9 @@ public interface {{className}} {
9498
*/
9599
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
96100
Future<ReadStream<{{outputType}}>> {{vertxMethodName}}({{inputType}} request);
101+
102+
@io.vertx.codegen.annotations.GenIgnore
103+
Iterable<{{outputType}}> {{vertxMethodName}}_sync({{inputType}} request);
97104
{{/unaryManyMethods}}
98105
{{#manyUnaryMethods}}
99106

@@ -174,10 +181,51 @@ class {{className}}Impl implements {{className}} {
174181
return req.response().compose(resp -> resp.last());
175182
});
176183
}
184+
185+
public {{outputType}} {{vertxMethodName}}_sync({{inputType}} request) {
186+
return {{vertxMethodName}}(request).await();
187+
}
177188
{{/unaryMethods}}
178189
{{#unaryManyMethods}}
179190

180191
public Future<ReadStream<{{outputType}}>> {{vertxMethodName}}({{inputType}} request) {
192+
return {{vertxMethodName}}_(request).compose(req -> {
193+
req.end(request);
194+
return req.response().flatMap(resp -> {
195+
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
196+
return Future.failedFuture("Invalid gRPC status " + resp.status());
197+
} else {
198+
return Future.succeededFuture(resp);
199+
}
200+
});
201+
});
202+
}
203+
204+
public Iterable<{{outputType}}> {{vertxMethodName}}_sync({{inputType}} request) {
205+
java.util.Iterator<{{outputType}}> iterator = {{vertxMethodName}}_(request)
206+
.compose(req -> {
207+
req.end(request);
208+
return req.response().compose(resp -> {
209+
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
210+
return Future.failedFuture("Invalid gRPC status " + resp.status());
211+
} else {
212+
return Future.succeededFuture(io.vertx.core.internal.streams.ReadStreamIterator.iterator(resp));
213+
}
214+
});
215+
}).await();
216+
return new Iterable<>() {
217+
boolean consumed = false;
218+
public java.util.Iterator<{{outputType}}> iterator() {
219+
if (consumed) {
220+
throw new IllegalStateException();
221+
}
222+
consumed = true;
223+
return iterator;
224+
}
225+
};
226+
}
227+
228+
public Future<GrpcClientRequest<{{inputType}}, {{outputType}}>> {{vertxMethodName}}_({{inputType}} request) {
181229
ServiceMethod<{{outputType}}, {{inputType}}> serviceMethod;
182230
switch (wireFormat) {
183231
case PROTOBUF:
@@ -189,16 +237,7 @@ class {{className}}Impl implements {{className}} {
189237
default:
190238
throw new AssertionError();
191239
}
192-
return client.request(socketAddress, serviceMethod).compose(req -> {
193-
req.end(request);
194-
return req.response().flatMap(resp -> {
195-
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
196-
return Future.failedFuture("Invalid gRPC status " + resp.status());
197-
} else {
198-
return Future.succeededFuture(resp);
199-
}
200-
});
201-
});
240+
return client.request(socketAddress, serviceMethod);
202241
}
203242
{{/unaryManyMethods}}
204243
{{#manyUnaryMethods}}

0 commit comments

Comments
 (0)