Skip to content

Commit 739d2ee

Browse files
ianbbqzyIan Lee
andauthored
feat: use StreamErrorHandler to send back invalid argument error in bidirectional streaming (#4864)
* feat: use StreamErrorHandler to send back invalid argument error in bidirectional streaming * add unit tests and fix casing * try integration tests * give up on async request streaming in integration test * stablized local runs --------- Co-authored-by: Ian Lee <[email protected]>
1 parent 82435a8 commit 739d2ee

File tree

12 files changed

+430
-35
lines changed

12 files changed

+430
-35
lines changed

examples/internal/integration/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ go_test(
2525
"@org_golang_google_protobuf//encoding/protojson",
2626
"@org_golang_google_protobuf//proto",
2727
"@org_golang_google_protobuf//testing/protocmp",
28+
"@org_golang_google_protobuf//types/known/durationpb",
2829
"@org_golang_google_protobuf//types/known/emptypb",
2930
"@org_golang_google_protobuf//types/known/fieldmaskpb",
3031
"@org_golang_google_protobuf//types/known/structpb",

examples/internal/integration/integration_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"google.golang.org/protobuf/encoding/protojson"
2828
"google.golang.org/protobuf/proto"
2929
"google.golang.org/protobuf/testing/protocmp"
30+
"google.golang.org/protobuf/types/known/durationpb"
3031
"google.golang.org/protobuf/types/known/emptypb"
3132
fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
3233
"google.golang.org/protobuf/types/known/structpb"
@@ -521,6 +522,7 @@ func TestABE(t *testing.T) {
521522
testABEDownload(t, 8088)
522523
testABEBulkEcho(t, 8088)
523524
testABEBulkEchoZeroLength(t, 8088)
525+
testABEBulkEchoDurationError(t, 8088)
524526
testAdditionalBindings(t, 8088)
525527
testABERepeated(t, 8088)
526528
testABEExists(t, 8088)
@@ -1448,6 +1450,98 @@ func testABEBulkEchoZeroLength(t *testing.T, port int) {
14481450
}
14491451
}
14501452

1453+
func testABEBulkEchoDurationError(t *testing.T, port int) {
1454+
reqr, reqw := io.Pipe()
1455+
var wg sync.WaitGroup
1456+
var want []*durationpb.Duration
1457+
wg.Add(1)
1458+
go func() {
1459+
defer wg.Done()
1460+
defer reqw.Close()
1461+
for i := 0; i < 10; i++ {
1462+
s := fmt.Sprintf("%d.123s", i)
1463+
if i == 5 {
1464+
s = "invalidDurationFormat"
1465+
}
1466+
buf, err := marshaler.Marshal(s)
1467+
if err != nil {
1468+
t.Errorf("marshaler.Marshal(%v) failed with %v; want success", s, err)
1469+
return
1470+
}
1471+
if _, err = reqw.Write(buf); err != nil {
1472+
t.Errorf("reqw.Write(%q) failed with %v; want success", string(buf), err)
1473+
return
1474+
}
1475+
want = append(want, &durationpb.Duration{Seconds: int64(i), Nanos: int32(0.123 * 1e9)})
1476+
}
1477+
}()
1478+
apiURL := fmt.Sprintf("http://localhost:%d/v1/example/a_bit_of_everything/echo_duration", port)
1479+
req, err := http.NewRequest("POST", apiURL, reqr)
1480+
if err != nil {
1481+
t.Errorf("http.NewRequest(%q, %q, reqr) failed with %v; want success", "POST", apiURL, err)
1482+
return
1483+
}
1484+
req.Header.Set("Content-Type", "application/json")
1485+
req.Header.Set("Transfer-Encoding", "chunked")
1486+
resp, err := http.DefaultClient.Do(req)
1487+
if err != nil {
1488+
t.Errorf("http.Post(%q, %q, req) failed with %v; want success", apiURL, "application/json", err)
1489+
return
1490+
}
1491+
defer resp.Body.Close()
1492+
if got, want := resp.StatusCode, http.StatusOK; got != want {
1493+
t.Errorf("resp.StatusCode = %d; want %d", got, want)
1494+
}
1495+
1496+
var got []*durationpb.Duration
1497+
var invalidArgumentCount int
1498+
wg.Add(1)
1499+
go func() {
1500+
defer wg.Done()
1501+
1502+
dec := marshaler.NewDecoder(resp.Body)
1503+
for i := 0; ; i++ {
1504+
var item struct {
1505+
Result json.RawMessage `json:"result"`
1506+
Error map[string]interface{} `json:"error"`
1507+
}
1508+
err := dec.Decode(&item)
1509+
if err == io.EOF {
1510+
break
1511+
}
1512+
if err != nil {
1513+
t.Errorf("dec.Decode(&item) failed with %v; want success; i = %d", err, i)
1514+
}
1515+
if len(item.Error) != 0 {
1516+
code, ok := item.Error["code"].(float64)
1517+
if !ok {
1518+
t.Errorf("item.Error[code] not found or not a number: %#v; i = %d", item.Error, i)
1519+
} else if int32(code) == 3 {
1520+
invalidArgumentCount++
1521+
} else {
1522+
t.Errorf("item.Error[code] = %v; want 3; i = %d", code, i)
1523+
}
1524+
continue
1525+
}
1526+
1527+
msg := new(durationpb.Duration)
1528+
if err := marshaler.Unmarshal(item.Result, msg); err != nil {
1529+
t.Errorf("marshaler.Unmarshal(%q, msg) failed with %v; want success", item.Result, err)
1530+
}
1531+
got = append(got, msg)
1532+
}
1533+
1534+
if invalidArgumentCount != 1 {
1535+
t.Errorf("got %d errors with code 3; want exactly 1", invalidArgumentCount)
1536+
}
1537+
}()
1538+
1539+
wg.Wait()
1540+
if diff := cmp.Diff(got, want[:5], protocmp.Transform()); diff != "" {
1541+
t.Error(diff)
1542+
}
1543+
}
1544+
14511545
func testAdditionalBindings(t *testing.T, port int) {
14521546
for i, f := range []func() *http.Response{
14531547
func() *http.Response {

examples/internal/proto/examplepb/flow_combination.pb.gw.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/internal/proto/examplepb/stream.pb.go

Lines changed: 41 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/internal/proto/examplepb/stream.pb.gw.go

Lines changed: 90 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/internal/proto/examplepb/stream.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import "examples/internal/proto/examplepb/a_bit_of_everything.proto";
66
import "examples/internal/proto/sub/message.proto";
77
import "google/api/annotations.proto";
88
import "google/api/httpbody.proto";
9+
import "google/protobuf/duration.proto";
910
import "google/protobuf/empty.proto";
1011

1112
option go_package = "github.com/grpc-ecosystem/grpc-gateway/v2/examples/internal/proto/examplepb";
@@ -27,6 +28,12 @@ service StreamService {
2728
body: "*"
2829
};
2930
}
31+
rpc BulkEchoDuration(stream google.protobuf.Duration) returns (stream google.protobuf.Duration) {
32+
option (google.api.http) = {
33+
post: "/v1/example/a_bit_of_everything/echo_duration"
34+
body: "*"
35+
};
36+
}
3037

3138
rpc Download(Options) returns (stream google.api.HttpBody) {
3239
option (google.api.http) = {get: "/v1/example/download"};

0 commit comments

Comments
 (0)