Skip to content

Commit 6599679

Browse files
Ian LeeIan Lee
authored andcommitted
stablized local runs
1 parent 55650fa commit 6599679

File tree

3 files changed

+43
-22
lines changed

3 files changed

+43
-22
lines changed

examples/internal/integration/integration_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,7 +1460,7 @@ func testABEBulkEchoDurationError(t *testing.T, port int) {
14601460
defer reqw.Close()
14611461
for i := 0; i < 10; i++ {
14621462
s := fmt.Sprintf("%d.123s", i)
1463-
if i == 9 {
1463+
if i == 5 {
14641464
s = "invalidDurationFormat"
14651465
}
14661466
buf, err := marshaler.Marshal(s)
@@ -1494,6 +1494,7 @@ func testABEBulkEchoDurationError(t *testing.T, port int) {
14941494
}
14951495

14961496
var got []*durationpb.Duration
1497+
var invalidArgumentCount int
14971498
wg.Add(1)
14981499
go func() {
14991500
defer wg.Done()
@@ -1515,7 +1516,9 @@ func testABEBulkEchoDurationError(t *testing.T, port int) {
15151516
code, ok := item.Error["code"].(float64)
15161517
if !ok {
15171518
t.Errorf("item.Error[code] not found or not a number: %#v; i = %d", item.Error, i)
1518-
} else if int32(code) != 3 {
1519+
} else if int32(code) == 3 {
1520+
invalidArgumentCount++
1521+
} else {
15191522
t.Errorf("item.Error[code] = %v; want 3; i = %d", code, i)
15201523
}
15211524
continue
@@ -1527,10 +1530,13 @@ func testABEBulkEchoDurationError(t *testing.T, port int) {
15271530
}
15281531
got = append(got, msg)
15291532
}
1533+
1534+
if invalidArgumentCount != 1 {
1535+
t.Errorf("got %d errors with code 3; want exactly 1", invalidArgumentCount)
1536+
}
15301537
}()
15311538

15321539
wg.Wait()
1533-
15341540
if diff := cmp.Diff(got, want[:len(got)], protocmp.Transform()); diff != "" {
15351541
t.Error(diff)
15361542
}

examples/internal/server/a_bit_of_everything.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -326,18 +326,6 @@ func (s *_ABitOfEverythingServer) BulkEcho(stream examples.StreamService_BulkEch
326326
}
327327

328328
func (s *_ABitOfEverythingServer) BulkEchoDuration(stream examples.StreamService_BulkEchoDurationServer) error {
329-
var msgs []*durationpb.Duration
330-
for {
331-
msg, err := stream.Recv()
332-
if err == io.EOF {
333-
break
334-
}
335-
if err != nil {
336-
return err
337-
}
338-
msgs = append(msgs, msg)
339-
}
340-
341329
hmd := metadata.New(map[string]string{
342330
"foo": "foo1",
343331
"bar": "bar1",
@@ -346,18 +334,45 @@ func (s *_ABitOfEverythingServer) BulkEchoDuration(stream examples.StreamService
346334
return err
347335
}
348336

349-
for _, msg := range msgs {
350-
grpclog.Info(msg)
351-
if err := stream.Send(msg); err != nil {
352-
return err
337+
// Channel to coordinate between read and write goroutines
338+
msgChan := make(chan *durationpb.Duration)
339+
errChan := make(chan error)
340+
341+
go func() {
342+
defer close(msgChan)
343+
for {
344+
msg, err := stream.Recv()
345+
if err == io.EOF {
346+
return
347+
}
348+
if err != nil {
349+
errChan <- err
350+
return
351+
}
352+
msgChan <- msg
353353
}
354-
}
354+
}()
355+
356+
go func() {
357+
for msg := range msgChan {
358+
grpclog.Info(msg)
359+
if err := stream.Send(msg); err != nil {
360+
errChan <- err
361+
return
362+
}
363+
}
364+
// Sleep to accomodate the integration test client which is not a bidirectional streaming client.
365+
time.Sleep(1 * time.Second)
366+
close(errChan)
367+
}()
368+
369+
err := <-errChan
355370

356371
stream.SetTrailer(metadata.New(map[string]string{
357372
"foo": "foo2",
358373
"bar": "bar2",
359374
}))
360-
return nil
375+
return err
361376
}
362377

363378
func (s *_ABitOfEverythingServer) DeepPathEcho(ctx context.Context, msg *examples.ABitOfEverything) (*examples.ABitOfEverything, error) {

runtime/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func HTTPError(ctx context.Context, mux *ServeMux, marshaler Marshaler, w http.R
8181
mux.errorHandler(ctx, mux, marshaler, w, r, err)
8282
}
8383

84-
// HttpStreamError uses the mux-configured stream error handler to notify error to the client without closing the connection.
84+
// HTTPStreamError uses the mux-configured stream error handler to notify error to the client without closing the connection.
8585
func HTTPStreamError(ctx context.Context, mux *ServeMux, marshaler Marshaler, w http.ResponseWriter, r *http.Request, err error) {
8686
st := mux.streamErrorHandler(ctx, err)
8787
msg := errorChunk(st)

0 commit comments

Comments
 (0)