Skip to content

Commit 63578ec

Browse files
committed
Return a blocking stream instead of an iterable
1 parent c438ac8 commit 63578ec

File tree

5 files changed

+25
-32
lines changed

5 files changed

+25
-32
lines changed

vertx-grpc-docs/src/main/java/examples/GreeterClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.vertx.grpc.common.ServiceMethod;
1414
import io.vertx.grpc.common.GrpcMessageDecoder;
1515
import io.vertx.grpc.common.GrpcMessageEncoder;
16+
import java.util.stream.Stream;
1617

1718
/**
1819
* <p>A client for invoking the Greeter gRPC service.</p>

vertx-grpc-docs/src/main/java/examples/StreamingClient.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.vertx.grpc.common.ServiceMethod;
1414
import io.vertx.grpc.common.GrpcMessageDecoder;
1515
import io.vertx.grpc.common.GrpcMessageEncoder;
16+
import java.util.stream.Stream;
1617

1718
/**
1819
* <p>A client for invoking the Streaming gRPC service.</p>
@@ -117,7 +118,7 @@ static StreamingClient create(GrpcClient client, SocketAddress host, io.vertx.gr
117118
Future<ReadStream<examples.Item>> source(examples.Empty request);
118119

119120
@io.vertx.codegen.annotations.GenIgnore
120-
Iterable<examples.Item> source_sync(examples.Empty request);
121+
Stream<examples.Item> source_sync(examples.Empty request);
121122

122123
/**
123124
* Calls the Sink RPC service method.
@@ -208,28 +209,19 @@ public Future<ReadStream<examples.Item>> source(examples.Empty request) {
208209
});
209210
}
210211

211-
public Iterable<examples.Item> source_sync(examples.Empty request) {
212-
java.util.Iterator<examples.Item> iterator = source_(request)
212+
public Stream<examples.Item> source_sync(examples.Empty request) {
213+
Stream<examples.Item> iterator = source_(request)
213214
.compose(req -> {
214215
req.end(request);
215216
return req.response().compose(resp -> {
216217
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
217218
return Future.failedFuture("Invalid gRPC status " + resp.status());
218219
} else {
219-
return Future.succeededFuture(io.vertx.core.internal.streams.ReadStreamIterator.iterator(resp));
220+
return Future.succeededFuture(resp.blockingStream());
220221
}
221222
});
222223
}).await();
223-
return new Iterable<>() {
224-
boolean consumed = false;
225-
public java.util.Iterator<examples.Item> iterator() {
226-
if (consumed) {
227-
throw new IllegalStateException();
228-
}
229-
consumed = true;
230-
return iterator;
231-
}
232-
};
224+
return iterator;
233225
}
234226

235227
public Future<GrpcClientRequest<examples.Empty, examples.Item>> source_(examples.Empty request) {

vertx-grpc-it/src/test/java/io/vertx/grpc/it/DeadlineTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import io.grpc.examples.helloworld.HelloRequest;
1717
import io.vertx.core.Future;
1818
import io.vertx.core.http.HttpServer;
19+
import io.vertx.core.internal.streams.ReadStreamIterator;
1920
import io.vertx.core.net.SocketAddress;
21+
import io.vertx.core.streams.ReadStream;
2022
import io.vertx.ext.unit.Async;
2123
import io.vertx.ext.unit.TestContext;
2224
import io.vertx.grpc.client.GrpcClient;
@@ -28,7 +30,9 @@
2830
import io.vertx.grpc.server.GrpcServerOptions;
2931
import org.junit.Test;
3032

33+
import java.util.Spliterators;
3134
import java.util.concurrent.TimeUnit;
35+
import java.util.stream.StreamSupport;
3236

3337
import static io.grpc.examples.helloworld.GreeterService.SayHello;
3438

@@ -39,6 +43,11 @@ public class DeadlineTest extends ProxyTestBase {
3943

4044
@Test
4145
public void testAutomaticPropagation(TestContext should) {
46+
47+
ReadStream<String> rs = null;
48+
49+
StreamSupport.stream(() -> Spliterators.spliteratorUnknownSize(ReadStreamIterator.iterator(rs), 0), 0, false);
50+
4251
Async latch = should.async(3);
4352
GrpcClientOptions clientOptions = new GrpcClientOptions().setScheduleDeadlineAutomatically(true);
4453
GrpcServerOptions serverOptions = new GrpcServerOptions().setDeadlinePropagation(true);

vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.ArrayList;
3939
import java.util.List;
4040
import java.util.concurrent.TimeUnit;
41+
import java.util.stream.Stream;
4142

4243
import static io.grpc.examples.helloworld.GreeterService.SayHello;
4344
import static io.grpc.testing.integration.TestServiceService.*;
@@ -407,12 +408,10 @@ public ReadStream<Messages.StreamingOutputCallResponse> streamingOutputCall(Mess
407408
Messages.StreamingOutputCallRequest request = Messages.StreamingOutputCallRequest.newBuilder()
408409
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputRequest", StandardCharsets.UTF_8)).build())
409410
.build();
410-
Iterable<Messages.StreamingOutputCallResponse> res = client.streamingOutputCall_sync(request);
411+
Stream<Messages.StreamingOutputCallResponse> res = client.streamingOutputCall_sync(request);
411412
Thread.sleep(100);
412413
List<Messages.StreamingOutputCallResponse> list = new ArrayList<>();
413-
for (Messages.StreamingOutputCallResponse msg : res) {
414-
list.add(msg);
415-
}
414+
res.forEach(msg -> list.add(msg));
416415
assertEquals(2, list.size());
417416
}
418417

vertx-grpc-protoc-plugin2/src/main/resources/client.mustache

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import io.vertx.grpc.common.ServiceName;
1515
import io.vertx.grpc.common.ServiceMethod;
1616
import io.vertx.grpc.common.GrpcMessageDecoder;
1717
import io.vertx.grpc.common.GrpcMessageEncoder;
18+
import java.util.stream.Stream;
1819

1920
/**
2021
* <p>A client for invoking the {{serviceName}} gRPC service.</p>
@@ -100,7 +101,7 @@ public interface {{className}} {
100101
Future<ReadStream<{{outputType}}>> {{vertxMethodName}}({{inputType}} request);
101102

102103
@io.vertx.codegen.annotations.GenIgnore
103-
Iterable<{{outputType}}> {{vertxMethodName}}_sync({{inputType}} request);
104+
Stream<{{outputType}}> {{vertxMethodName}}_sync({{inputType}} request);
104105
{{/unaryManyMethods}}
105106
{{#manyUnaryMethods}}
106107

@@ -221,28 +222,19 @@ class {{className}}Impl implements {{className}} {
221222
});
222223
}
223224

224-
public Iterable<{{outputType}}> {{vertxMethodName}}_sync({{inputType}} request) {
225-
java.util.Iterator<{{outputType}}> iterator = {{vertxMethodName}}_(request)
225+
public Stream<{{outputType}}> {{vertxMethodName}}_sync({{inputType}} request) {
226+
Stream<{{outputType}}> iterator = {{vertxMethodName}}_(request)
226227
.compose(req -> {
227228
req.end(request);
228229
return req.response().compose(resp -> {
229230
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
230231
return Future.failedFuture("Invalid gRPC status " + resp.status());
231232
} else {
232-
return Future.succeededFuture(io.vertx.core.internal.streams.ReadStreamIterator.iterator(resp));
233+
return Future.succeededFuture(resp.blockingStream());
233234
}
234235
});
235236
}).await();
236-
return new Iterable<>() {
237-
boolean consumed = false;
238-
public java.util.Iterator<{{outputType}}> iterator() {
239-
if (consumed) {
240-
throw new IllegalStateException();
241-
}
242-
consumed = true;
243-
return iterator;
244-
}
245-
};
237+
return iterator;
246238
}
247239

248240
public Future<GrpcClientRequest<{{inputType}}, {{outputType}}>> {{vertxMethodName}}_({{inputType}} request) {

0 commit comments

Comments
 (0)