Skip to content

Commit cd4811b

Browse files
committed
Message codec should be capable to handle two wire formats.
Motivation: The initial design of message codec assumed a single wire format that is protobuf. We can reasonnably let codec supports multiple wireformats in order to simplify deployment of call handlers and the generated code. Changes: Message codec are now capable of handling the two wire formats. Code generator and client/server changes have been implemented to support this.
1 parent 862aa2d commit cd4811b

File tree

24 files changed

+215
-436
lines changed

24 files changed

+215
-436
lines changed

vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageDecoder.java

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import com.google.protobuf.InvalidProtocolBufferException;
1414
import com.google.protobuf.Message;
15+
import com.google.protobuf.MessageOrBuilder;
1516
import com.google.protobuf.Parser;
1617
import com.google.protobuf.util.JsonFormat;
1718
import io.vertx.core.buffer.Buffer;
@@ -27,22 +28,37 @@ public interface GrpcMessageDecoder<T> {
2728

2829
/**
2930
* Create a decoder for a given protobuf {@link Parser}.
30-
* @param parser the parser that returns decoded messages of type {@code <T>}
31+
* @param messageOrBuilder the message or builder instance that returns decoded messages of type {@code <T>}
3132
* @return the message decoder
3233
*/
33-
static <T> GrpcMessageDecoder<T> decoder(Parser<T> parser) {
34-
return new GrpcMessageDecoder<T>() {
34+
static <T> GrpcMessageDecoder<T> decoder(MessageOrBuilder messageOrBuilder) {
35+
Message dit = messageOrBuilder.getDefaultInstanceForType();
36+
Parser<T> parser = (Parser<T>) dit.getParserForType();
37+
return new GrpcMessageDecoder<>() {
3538
@Override
3639
public T decode(GrpcMessage msg) throws CodecException {
37-
try {
38-
return parser.parseFrom(msg.payload().getBytes());
39-
} catch (InvalidProtocolBufferException e) {
40-
throw new CodecException(e);
40+
switch (msg.format()) {
41+
case PROTOBUF:
42+
try {
43+
return parser.parseFrom(msg.payload().getBytes());
44+
} catch (InvalidProtocolBufferException e) {
45+
throw new CodecException(e);
46+
}
47+
case JSON:
48+
try {
49+
Message.Builder builder = dit.toBuilder();
50+
JsonFormat.parser().merge(msg.payload().toString(StandardCharsets.UTF_8), builder);
51+
return (T) builder.build();
52+
} catch (InvalidProtocolBufferException e) {
53+
throw new CodecException(e);
54+
}
55+
default:
56+
throw new IllegalArgumentException("Invalid wire format: ");
4157
}
4258
}
4359
@Override
44-
public WireFormat format() {
45-
return WireFormat.PROTOBUF;
60+
public boolean accepts(WireFormat format) {
61+
return true;
4662
}
4763
};
4864
}
@@ -53,8 +69,8 @@ public Buffer decode(GrpcMessage msg) throws CodecException {
5369
return msg.payload();
5470
}
5571
@Override
56-
public WireFormat format() {
57-
return WireFormat.PROTOBUF;
72+
public boolean accepts(WireFormat format) {
73+
return true;
5874
}
5975
};
6076

@@ -76,8 +92,8 @@ public T decode(GrpcMessage msg) throws CodecException {
7692
}
7793
}
7894
@Override
79-
public WireFormat format() {
80-
return WireFormat.JSON;
95+
public boolean accepts(WireFormat format) {
96+
return format == WireFormat.JSON;
8197
}
8298
};
8399
}
@@ -103,8 +119,8 @@ public T decode(GrpcMessage msg) throws CodecException {
103119
}
104120
}
105121
@Override
106-
public WireFormat format() {
107-
return WireFormat.JSON;
122+
public boolean accepts(WireFormat format) {
123+
return format == WireFormat.JSON;
108124
}
109125
};
110126
}
@@ -123,8 +139,8 @@ public JsonObject decode(GrpcMessage msg) throws CodecException {
123139
}
124140
}
125141
@Override
126-
public WireFormat format() {
127-
return WireFormat.JSON;
142+
public boolean accepts(WireFormat format) {
143+
return format == WireFormat.JSON;
128144
}
129145
};
130146

@@ -144,13 +160,13 @@ public Object decode(GrpcMessage msg) throws CodecException {
144160
}
145161
}
146162
@Override
147-
public WireFormat format() {
148-
return WireFormat.JSON;
163+
public boolean accepts(WireFormat format) {
164+
return format == WireFormat.JSON;
149165
}
150166
};
151167

152168
T decode(GrpcMessage msg) throws CodecException;
153169

154-
WireFormat format();
170+
boolean accepts(WireFormat format);
155171

156172
}

vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageEncoder.java

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,44 @@ public interface GrpcMessageEncoder<T> {
1919
static <T extends MessageLite> GrpcMessageEncoder<T> encoder() {
2020
return new GrpcMessageEncoder<T>() {
2121
@Override
22-
public GrpcMessage encode(T msg) {
23-
byte[] bytes = msg.toByteArray();
24-
return GrpcMessage.message("identity", Buffer.buffer(bytes));
22+
public GrpcMessage encode(T msg, WireFormat format) throws CodecException {
23+
switch (format) {
24+
case PROTOBUF:
25+
byte[] bytes = msg.toByteArray();
26+
return GrpcMessage.message("identity", Buffer.buffer(bytes));
27+
case JSON:
28+
if (msg instanceof MessageOrBuilder) {
29+
MessageOrBuilder mob = (MessageOrBuilder) msg;
30+
try {
31+
String res = JsonFormat.printer().print(mob);
32+
return GrpcMessage.message("identity", WireFormat.JSON, Buffer.buffer(res));
33+
} catch (InvalidProtocolBufferException e) {
34+
throw new CodecException(e);
35+
}
36+
}
37+
return GrpcMessage.message(
38+
"identity",
39+
WireFormat.JSON,
40+
Json.encodeToBuffer(msg));
41+
default:
42+
throw new IllegalArgumentException("Invalid wire format: " + format);
43+
}
2544
}
2645
@Override
27-
public WireFormat format() {
28-
return WireFormat.PROTOBUF;
46+
public boolean accepts(WireFormat format) {
47+
return true;
2948
}
3049
};
3150
}
3251

3352
GrpcMessageEncoder<Buffer> IDENTITY = new GrpcMessageEncoder<>() {
3453
@Override
35-
public GrpcMessage encode(Buffer payload) {
36-
return GrpcMessage.message("identity", WireFormat.PROTOBUF, payload);
54+
public GrpcMessage encode(Buffer msg, WireFormat format) throws CodecException {
55+
return GrpcMessage.message("identity", format, msg);
3756
}
3857
@Override
39-
public WireFormat format() {
40-
return WireFormat.PROTOBUF;
58+
public boolean accepts(WireFormat format) {
59+
return true;
4160
}
4261
};
4362

@@ -50,7 +69,7 @@ public WireFormat format() {
5069
static <T> GrpcMessageEncoder<T> json() {
5170
return new GrpcMessageEncoder<>() {
5271
@Override
53-
public GrpcMessage encode(T msg) {
72+
public GrpcMessage encode(T msg, WireFormat format) throws CodecException {
5473
if (msg instanceof MessageOrBuilder) {
5574
MessageOrBuilder mob = (MessageOrBuilder) msg;
5675
try {
@@ -66,8 +85,8 @@ public GrpcMessage encode(T msg) {
6685
Json.encodeToBuffer(msg));
6786
}
6887
@Override
69-
public WireFormat format() {
70-
return WireFormat.JSON;
88+
public boolean accepts(WireFormat format) {
89+
return format == WireFormat.JSON;
7190
}
7291
};
7392
}
@@ -77,17 +96,17 @@ public WireFormat format() {
7796
*/
7897
GrpcMessageEncoder<JsonObject> JSON_OBJECT = new GrpcMessageEncoder<>() {
7998
@Override
80-
public GrpcMessage encode(JsonObject msg) {
99+
public GrpcMessage encode(JsonObject msg, WireFormat format) throws CodecException {
81100
return GrpcMessage.message("identity", WireFormat.JSON, msg == null ? Buffer.buffer("null") : msg.toBuffer());
82101
}
83102
@Override
84-
public WireFormat format() {
85-
return WireFormat.JSON;
103+
public boolean accepts(WireFormat format) {
104+
return format == WireFormat.JSON;
86105
}
87106
};
88107

89-
GrpcMessage encode(T msg);
108+
GrpcMessage encode(T msg, WireFormat format) throws CodecException;
90109

91-
WireFormat format();
110+
boolean accepts(WireFormat format);
92111

93112
}

vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcWriteStreamBase.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,20 @@ public S setWriteQueueMaxSize(int maxSize) {
153153

154154
@Override
155155
public final Future<Void> write(T message) {
156-
return writeMessage(messageEncoder.encode(message));
156+
return writeMessage(encodeMessage(message));
157157
}
158158

159159
@Override
160160
public final Future<Void> end(T message) {
161-
return endMessage(messageEncoder.encode(message));
161+
return endMessage(encodeMessage(message));
162+
}
163+
164+
private GrpcMessage encodeMessage(T message) {
165+
WireFormat f = format;
166+
if (f == null) {
167+
f = WireFormat.PROTOBUF;
168+
}
169+
return messageEncoder.encode(message, f);
162170
}
163171

164172
@Override

vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/WriteStreamAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import io.vertx.grpc.common.GrpcWriteStream;
1414
import io.vertx.grpc.common.GrpcMessageEncoder;
15+
import io.vertx.grpc.common.WireFormat;
1516

1617
/**
1718
* An adapter between gRPC and Vert.x back-pressure.
@@ -44,7 +45,7 @@ public final synchronized boolean isReady() {
4445
}
4546

4647
public final void write(T msg) {
47-
stream.writeMessage(encoder.encode(msg));
48+
stream.writeMessage(encoder.encode(msg, WireFormat.PROTOBUF));
4849
synchronized (this) {
4950
ready = !stream.writeQueueFull();
5051
}

vertx-grpc-common/src/test/java/io/vertx/tests/common/grpc/TestConstants.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ public final class TestConstants {
2121

2222
public static final ServiceName TEST_SERVICE = ServiceName.create("io.vertx.tests.common.grpc.tests.TestService");
2323
public static final GrpcMessageEncoder<Empty> EMPTY_ENC = encoder();
24-
public static final GrpcMessageDecoder<Empty> EMPTY_DEC = decoder(Empty.parser());
24+
public static final GrpcMessageDecoder<Empty> EMPTY_DEC = decoder(Empty.newBuilder());
2525
public static final GrpcMessageEncoder<Request> REQUEST_ENC = encoder();
26-
public static final GrpcMessageDecoder<Request> REQUEST_DEC = decoder(Request.parser());
26+
public static final GrpcMessageDecoder<Request> REQUEST_DEC = decoder(Request.newBuilder());
2727
public static final GrpcMessageEncoder<Reply> REPLY_ENC = encoder();
28-
public static final GrpcMessageDecoder<Reply> REPLY_DEC = decoder(Reply.parser());
28+
public static final GrpcMessageDecoder<Reply> REPLY_DEC = decoder(Reply.newBuilder());
2929

3030
private TestConstants() {
3131
}

vertx-grpc-docs/src/main/java/examples/GrpcClientExamples.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ public void clientSideAddressBasedLoadBalancing2(GrpcClient client) {
176176

177177
public void jsonWireFormat01(GrpcClient client, SocketAddress server) {
178178
client
179-
.request(server, GreeterGrpcClient.Json.SayHello).compose(request -> {
179+
.request(server, GreeterGrpcClient.SayHello).compose(request -> {
180+
request.format(WireFormat.JSON);
180181
request.end(HelloRequest
181182
.newBuilder()
182183
.setName("Bob")

vertx-grpc-docs/src/main/java/examples/GrpcServerExamples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void deadlineConfiguration(Vertx vertx) {
136136
}
137137

138138
public void jsonWireFormat01(GrpcServer server) {
139-
server.callHandler(GreeterGrpcService.Json.SayHello, request -> {
139+
server.callHandler(GreeterGrpcService.SayHello, request -> {
140140
request.last().onSuccess(helloRequest -> {
141141
request.response().end(HelloReply.newBuilder()
142142
.setMessage("Hello " + helloRequest.getName()).build()

vertx-grpc-docs/src/main/java/examples/grpc/GreeterGrpcClient.java

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,7 @@ public interface GreeterGrpcClient extends GreeterClient {
2727
ServiceName.create("examples.grpc", "Greeter"),
2828
"SayHello",
2929
GrpcMessageEncoder.encoder(),
30-
GrpcMessageDecoder.decoder(examples.grpc.HelloReply.parser()));
31-
32-
/**
33-
* Json client service methods.
34-
*/
35-
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
36-
final class Json {
37-
38-
/**
39-
* SayHello json RPC client service method.
40-
*/
41-
public static final ServiceMethod<examples.grpc.HelloReply, examples.grpc.HelloRequest> SayHello = ServiceMethod.client(
42-
ServiceName.create("examples.grpc", "Greeter"),
43-
"SayHello",
44-
GrpcMessageEncoder.json(),
45-
GrpcMessageDecoder.json(() -> examples.grpc.HelloReply.newBuilder()));
46-
}
30+
GrpcMessageDecoder.decoder(examples.grpc.HelloReply.newBuilder()));
4731

4832
/**
4933
* Create and return a Greeter gRPC service client. The assumed wire format is Protobuf.
@@ -89,18 +73,8 @@ class GreeterGrpcClientImpl implements GreeterGrpcClient {
8973
}
9074

9175
public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
92-
ServiceMethod<examples.grpc.HelloReply, examples.grpc.HelloRequest> serviceMethod;
93-
switch (wireFormat) {
94-
case PROTOBUF:
95-
serviceMethod = SayHello;
96-
break;
97-
case JSON:
98-
serviceMethod = Json.SayHello;
99-
break;
100-
default:
101-
throw new AssertionError();
102-
}
103-
return client.request(socketAddress, serviceMethod).compose(req -> {
76+
return client.request(socketAddress, SayHello).compose(req -> {
77+
req.format(wireFormat);
10478
req.end(request);
10579
return req.response().compose(resp -> resp.last());
10680
});

0 commit comments

Comments
 (0)