Skip to content
2 changes: 1 addition & 1 deletion benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient,
// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
// request and response sizes.
func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
if err := benchmark.DoUnaryCall(context.TODO(), client, reqSize, respSize); err != nil {
logger.Fatalf("DoUnaryCall failed: %v", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,15 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
}
}

// DoUnaryCall performs a unary RPC with given stub and request and response sizes.
func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
// DoUnaryCall performs a unary RPC with given context, stub and request and response sizes.
func DoUnaryCall(ctx context.Context, tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(respSize),
Payload: pl,
}
if _, err := tc.UnaryCall(context.Background(), req); err != nil {
if _, err := tc.UnaryCall(ctx, req); err != nil {
return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
}
return nil
Expand Down
50 changes: 22 additions & 28 deletions benchmark/worker/benchmark_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {

type benchmarkClient struct {
closeConns func()
stop chan bool
lastResetTime time.Time
histogramOptions stats.HistogramOptions
lockingHistograms []lockingHistogram
Expand Down Expand Up @@ -168,7 +167,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error
}, nil
}

func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
func performRPCs(ctx context.Context, config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
// Read payload size and type from config.
var (
payloadReqSize, payloadRespSize int
Expand Down Expand Up @@ -212,17 +211,17 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc

switch config.RpcType {
case testpb.RpcType_UNARY:
bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
bc.unaryLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
case testpb.RpcType_STREAMING:
bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
bc.streamingLoop(ctx, conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
default:
return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
}

return nil
}

func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
func startBenchmarkClient(ctx context.Context, config *testpb.ClientConfig) (*benchmarkClient, error) {
printClientConfig(config)

// Set running environment like how many cores to use.
Expand All @@ -243,13 +242,12 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
},
lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns)),

stop: make(chan bool),
lastResetTime: time.Now(),
closeConns: closeConns,
rusageLastReset: syscall.GetRusage(),
}

if err = performRPCs(config, conns, bc); err != nil {
if err = performRPCs(ctx, config, conns, bc); err != nil {
// Close all connections if performRPCs failed.
closeConns()
return nil, err
Expand All @@ -258,14 +256,17 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
return bc, nil
}

func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
func (bc *benchmarkClient) unaryLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
for ic, conn := range conns {
client := testgrpc.NewBenchmarkServiceClient(conn)
// For each connection, create rpcCountPerConn goroutines to do rpc.
for j := range rpcCountPerConn {
// Create histogram for each goroutine.
idx := ic*rpcCountPerConn + j
bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
if ctx.Err() != nil {
return
}
// Start goroutine on the created mutex and histogram.
go func(idx int) {
// TODO: do warm up if necessary.
Expand All @@ -274,31 +275,25 @@ func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn i
// before starting benchmark.
if poissonLambda == nil { // Closed loop.
for {
select {
case <-bc.stop:
return
default:
}
start := time.Now()
if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
continue
if err := benchmark.DoUnaryCall(ctx, client, reqSize, respSize); err != nil {
return
}
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
}
} else { // Open loop.
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / *poissonLambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
bc.poissonUnary(ctx, client, idx, reqSize, respSize, *poissonLambda)
})
}

}(idx)
}
}
}

func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
func (bc *benchmarkClient) streamingLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
if payloadType == "bytebuf" {
doRPC = benchmark.DoByteBufStreamingRoundTrip
Expand All @@ -309,13 +304,16 @@ func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerCo
// For each connection, create rpcCountPerConn goroutines to do rpc.
for j := 0; j < rpcCountPerConn; j++ {
c := testgrpc.NewBenchmarkServiceClient(conn)
stream, err := c.StreamingCall(context.Background())
stream, err := c.StreamingCall(ctx)
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
}
idx := ic*rpcCountPerConn + j
bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
if poissonLambda == nil { // Closed loop.
if stream.Context().Err() != nil {
return
}
// Start goroutine on the created mutex and histogram.
go func(idx int) {
// TODO: do warm up if necessary.
Comment on lines 312 to 319
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You said

We usually do the check just before performing an asynchronous operation

so I decided to do it like that. If it is not the appropriate way, please tell me

Expand All @@ -329,11 +327,6 @@ func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerCo
}
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
select {
case <-bc.stop:
return
default:
}
}
}(idx)
} else { // Open loop.
Expand All @@ -346,24 +339,26 @@ func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerCo
}
}

func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
func (bc *benchmarkClient) poissonUnary(ctx context.Context, client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
go func() {
start := time.Now()
if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {

if err := benchmark.DoUnaryCall(ctx, client, reqSize, respSize); err != nil {
return
}
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
}()
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / lambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
bc.poissonUnary(client, idx, reqSize, respSize, lambda)
bc.poissonUnary(ctx, client, idx, reqSize, respSize, lambda)
})
}

func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) {
go func() {
start := time.Now()

if err := doRPC(stream, reqSize, respSize); err != nil {
return
}
Expand Down Expand Up @@ -430,6 +425,5 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
}

func (bc *benchmarkClient) shutdown() {
close(bc.stop)
bc.closeConns()
}
5 changes: 4 additions & 1 deletion benchmark/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func (s *workerServer) RunClient(stream testgrpc.WorkerService_RunClientServer)
logger.Infof("client setup received when client already exists, shutting down the existing client")
bc.shutdown()
}
bc, err = startBenchmarkClient(t.Setup)

ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
bc, err = startBenchmarkClient(ctx, t.Setup)
if err != nil {
return err
}
Expand Down