Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package examples.grpc;

import io.vertx.core.Future;
import io.vertx.core.Completable;
import io.vertx.core.Handler;
import io.vertx.core.net.SocketAddress;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientRequest;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.common.ServiceName;
import io.vertx.grpc.common.ServiceMethod;
import io.vertx.grpc.common.GrpcMessageDecoder;
import io.vertx.grpc.common.GrpcMessageEncoder;
import java.util.stream.Stream;

/**
* <p>A client for invoking the Greeter gRPC service.</p>
*/
public interface GreeterBlockingClient {

static GreeterBlockingClient create(GreeterClient client) {
return new GreeterBlockingClientImpl(client);
}

}

/**
* The proxy implementation.
*/
class GreeterBlockingClientImpl implements GreeterBlockingClient {

private final GreeterClientInternal client;

GreeterBlockingClientImpl(Greeter client) {
this.client = (GreeterClientInternal)java.util.Objects.requireNonNull(client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ public interface GreeterClient extends Greeter {
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);
}

interface GreeterClientInternal extends GreeterClient {
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static GreeterGrpcClient create(GrpcClient client, SocketAddress host, io.vertx.
/**
* The proxy implementation.
*/
class GreeterGrpcClientImpl implements GreeterGrpcClient {
class GreeterGrpcClientImpl implements GreeterGrpcClient, GreeterClientInternal {

private final GrpcClient client;
private final SocketAddress socketAddress;
Expand All @@ -72,6 +72,10 @@ class GreeterGrpcClientImpl implements GreeterGrpcClient {
this.wireFormat = java.util.Objects.requireNonNull(wireFormat);
}

public GrpcClient grpcClient() {
return client;
}

public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
return client.request(socketAddress, SayHello).compose(req -> {
req.format(wireFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static GreeterStub newStub(io.vertx.core.Vertx vertx, io.grpc.Channel cha
}


public static final class GreeterStub extends io.grpc.stub.AbstractStub<GreeterStub> implements GreeterClient {
public static final class GreeterStub extends io.grpc.stub.AbstractStub<GreeterStub> implements GreeterClientInternal {
private final io.vertx.core.internal.ContextInternal context;
private GreeterGrpc.GreeterStub delegateStub;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package examples.grpc;

import io.vertx.core.Future;
import io.vertx.core.Completable;
import io.vertx.core.Handler;
import io.vertx.core.net.SocketAddress;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientRequest;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.common.ServiceName;
import io.vertx.grpc.common.ServiceMethod;
import io.vertx.grpc.common.GrpcMessageDecoder;
import io.vertx.grpc.common.GrpcMessageEncoder;
import java.util.stream.Stream;

/**
* <p>A client for invoking the Streaming gRPC service.</p>
*/
public interface StreamingBlockingClient {

static StreamingBlockingClient create(StreamingClient client) {
return new StreamingBlockingClientImpl(client);
}


@io.vertx.codegen.annotations.GenIgnore
Stream<examples.grpc.Item> source(examples.grpc.Empty request);

@io.vertx.codegen.annotations.GenIgnore
default examples.grpc.Empty sink(java.util.List<examples.grpc.Item> streamOfMessages) {
return sink(streamOfMessages.iterator());
}

@io.vertx.codegen.annotations.GenIgnore
examples.grpc.Empty sink(java.util.Iterator<examples.grpc.Item> streamOfMessages);
}

/**
* The proxy implementation.
*/
class StreamingBlockingClientImpl implements StreamingBlockingClient {

private final StreamingClientInternal client;

StreamingBlockingClientImpl(Streaming client) {
this.client = (StreamingClientInternal)java.util.Objects.requireNonNull(client);
}

public Stream<examples.grpc.Item> source(examples.grpc.Empty request) {
/*
Stream<examples.grpc.Item> iterator = client.source(request)
.compose(req -> {
req.end(request);
return req.response().compose(resp -> {
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
return Future.failedFuture("Invalid gRPC status " + resp.status());
} else {
return Future.succeededFuture(resp.blockingStream());
}
});
}).await();
return iterator;
*/
throw new UnsupportedOperationException();
}

public examples.grpc.Empty sink(java.util.Iterator<examples.grpc.Item> request) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,7 @@ default Future<ReadStream<examples.grpc.Item>> pipe(ReadStream<examples.grpc.Ite
});
}
}

interface StreamingClientInternal extends StreamingClient {
void source(examples.grpc.Empty request, Completable<ReadStream<examples.grpc.Item>> completable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static StreamingGrpcClient create(GrpcClient client, SocketAddress host, io.vert
/**
* The proxy implementation.
*/
class StreamingGrpcClientImpl implements StreamingGrpcClient {
class StreamingGrpcClientImpl implements StreamingGrpcClient, StreamingClientInternal {

private final GrpcClient client;
private final SocketAddress socketAddress;
Expand All @@ -92,6 +92,10 @@ class StreamingGrpcClientImpl implements StreamingGrpcClient {
this.wireFormat = java.util.Objects.requireNonNull(wireFormat);
}

public GrpcClient grpcClient() {
return client;
}

public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request) {
return client.request(socketAddress, Source).compose(req -> {
req.format(wireFormat);
Expand All @@ -106,6 +110,10 @@ public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request
});
}

public void source(examples.grpc.Empty request, Completable<ReadStream<examples.grpc.Item>> completable) {
throw new UnsupportedOperationException();
}

public Future<examples.grpc.Empty> sink(Completable<WriteStream<examples.grpc.Item>> completable) {
return client.request(socketAddress, Sink)
.andThen((res, err) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static StreamingStub newStub(io.vertx.core.Vertx vertx, io.grpc.Channel c
}


public static final class StreamingStub extends io.grpc.stub.AbstractStub<StreamingStub> implements StreamingClient {
public static final class StreamingStub extends io.grpc.stub.AbstractStub<StreamingStub> implements StreamingClientInternal {
private final io.vertx.core.internal.ContextInternal context;
private StreamingGrpc.StreamingStub delegateStub;

Expand All @@ -61,6 +61,9 @@ public io.vertx.core.Future<io.vertx.core.streams.ReadStream<examples.grpc.Item>
return io.vertx.grpcio.common.impl.stub.ClientCalls.oneToMany(context, request, delegateStub::source);
}

public void source(examples.grpc.Empty request, io.vertx.core.Completable<io.vertx.core.streams.ReadStream<examples.grpc.Item>> completable) {
io.vertx.grpcio.common.impl.stub.ClientCalls.oneToMany(context, request, completable, delegateStub::source);
}

public io.vertx.core.Future<examples.grpc.Empty> sink(io.vertx.core.Completable<io.vertx.core.streams.WriteStream<examples.grpc.Item>> handler) {
return io.vertx.grpcio.common.impl.stub.ClientCalls.manyToOne(context, handler, delegateStub::sink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -718,4 +719,41 @@ protected void streamingOutputCall(Messages.StreamingOutputCallRequest request,

async.awaitSuccess(20_000);
}

@Test
public void testUnaryMany_ReadStreamReturn_Blocking() throws Exception {
// Create gRPC Server
GrpcServer grpcServer = grpcServer();
grpcServer.addService(testService(new TestServiceService() {
@Override
protected void streamingOutputCall(Messages.StreamingOutputCallRequest request, WriteStream<Messages.StreamingOutputCallResponse> response) {
System.out.println("write messages");
response.write(Messages.StreamingOutputCallResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-1", StandardCharsets.UTF_8)).build())
.build());
response.write(Messages.StreamingOutputCallResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-2", StandardCharsets.UTF_8)).build())
.build());
response.end();
}
}));

HttpServer httpServer = vertx.createHttpServer();
httpServer.requestHandler(grpcServer)
.listen(8080).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);

// Create gRPC Client
GrpcClient grpcClient = grpcClient();
TestServiceClient client = testClient(grpcClient, SocketAddress.inetSocketAddress(port, "localhost"));
TestServiceBlockingClient blockingClient = TestServiceBlockingClient.create(client);

Messages.StreamingOutputCallRequest request = Messages.StreamingOutputCallRequest.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputRequest", StandardCharsets.UTF_8)).build())
.build();
Stream<Messages.StreamingOutputCallResponse> res = blockingClient.streamingOutputCall(request);
Thread.sleep(100);
List<Messages.StreamingOutputCallResponse> list = new ArrayList<>();
res.forEach(msg -> list.add(msg));
assertEquals(2, list.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ private List<PluginProtos.CodeGeneratorResponse.File> buildFiles(ServiceContext
if (generateGrpcClient) {
files.add(buildClientFile(context));
files.add(buildGrpcClientFile(context));
files.add(buildBlockingClientFile(context));;
}
if (generateGrpcService) {
files.add(buildServiceFile(context));
Expand Down Expand Up @@ -362,6 +363,12 @@ private PluginProtos.CodeGeneratorResponse.File buildGrpcClientFile(ServiceConte
return buildFile(context, applyTemplate("grpc-client.mustache", context));
}

private PluginProtos.CodeGeneratorResponse.File buildBlockingClientFile(ServiceContext context) {
context.fileName = context.serviceName + "BlockingClient.java";
context.className = context.serviceName + "BlockingClient";
return buildFile(context, applyTemplate("blocking-client.mustache", context));
}

private PluginProtos.CodeGeneratorResponse.File buildServiceFile(ServiceContext context) {
context.fileName = context.serviceName + "Service.java";
context.className = context.serviceName + "Service";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{{#javaPackageName}}
package {{javaPackageName}};
{{/javaPackageName}}

import io.vertx.core.Future;
import io.vertx.core.Completable;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.net.SocketAddress;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientRequest;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.common.ServiceName;
import io.vertx.grpc.common.ServiceMethod;
import io.vertx.grpc.common.GrpcMessageDecoder;
import io.vertx.grpc.common.GrpcMessageEncoder;
import java.util.stream.Stream;

/**
* <p>A client for invoking the {{serviceName}} gRPC service.</p>
*/
public interface {{className}} {

static {{className}} create({{serviceName}}Client client) {
return new {{className}}Impl(client);
}

{{#unaryUnaryMethods}}
{{outputType}} {{vertxMethodName}}({{inputType}} request);
{{/unaryUnaryMethods}}
{{#unaryManyMethods}}
Stream<{{outputType}}> {{vertxMethodName}}({{inputType}} request);
{{/unaryManyMethods}}
{{#manyUnaryMethods}}
default {{outputType}} {{vertxMethodName}}(java.util.List<{{inputType}}> streamOfMessages) {
return {{vertxMethodName}}(streamOfMessages.iterator());
}
{{outputType}} {{vertxMethodName}}(java.util.Iterator<{{inputType}}> streamOfMessages);
{{/manyUnaryMethods}}
}

/**
* The proxy implementation.
*/
class {{className}}Impl implements {{className}} {

private final {{serviceName}}ClientInternal client;

{{className}}Impl({{serviceName}} client) {
this.client = ({{serviceName}}ClientInternal)java.util.Objects.requireNonNull(client);
}
{{#unaryUnaryMethods}}

public {{outputType}} {{vertxMethodName}}({{inputType}} request) {
return client.{{vertxMethodName}}(request).await();
}
{{/unaryUnaryMethods}}
{{#unaryManyMethods}}

public Stream<{{outputType}}> {{vertxMethodName}}({{inputType}} request) {
Promise<Stream<{{outputType}}>> promise = Promise.promise();
client.{{vertxMethodName}}(request, (resp, err) -> {
if (err == null) {
promise.complete(resp.blockingStream());
} else {
// Todo
}
});
return promise.future().await();
}
{{/unaryManyMethods}}
{{#manyUnaryMethods}}

public {{outputType}} {{vertxMethodName}}(java.util.Iterator<{{inputType}}> request) {
throw new UnsupportedOperationException();
}
{{/manyUnaryMethods}}
}
6 changes: 6 additions & 0 deletions vertx-grpc-protoc-plugin2/src/main/resources/client.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,9 @@ public interface {{className}} extends {{serviceName}} {
}
{{/manyManyMethods}}
}

interface {{className}}Internal extends {{className}} {
{{#unaryManyMethods}}
void {{vertxMethodName}}({{inputType}} request, Completable<ReadStream<{{outputType}}>> completable);
{{/unaryManyMethods}}
}
Loading
Loading