@@ -56,6 +56,7 @@ type grpcExecutor struct {
5656 onWorkItemConnection func (context.Context ) error
5757 onWorkItemDisconnect func (context.Context ) error
5858 streamShutdownChan <- chan any
59+ streamSendTimeout * time.Duration
5960}
6061
6162type grpcExecutorOptions func (g * grpcExecutor )
@@ -90,6 +91,12 @@ func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions {
9091 }
9192}
9293
94+ func WithStreamSendTimeout (d time.Duration ) grpcExecutorOptions {
95+ return func (g * grpcExecutor ) {
96+ g .streamSendTimeout = & d
97+ }
98+ }
99+
93100// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
94101func NewGrpcExecutor (be Backend , logger Logger , opts ... grpcExecutorOptions ) (executor Executor , registerServerFn func (grpcServer grpc.ServiceRegistrar )) {
95102 grpcExecutor := & grpcExecutor {
@@ -322,7 +329,7 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
322329 }
323330 }
324331
325- if err := stream . Send ( wi ); err != nil {
332+ if err := g . sendWorkItem ( stream , wi ); err != nil {
326333 g .logger .Errorf ("encountered an error while sending work item: %v" , err )
327334 return err
328335 }
@@ -336,6 +343,27 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
336343 }
337344}
338345
346+ func (g * grpcExecutor ) sendWorkItem (stream protos.TaskHubSidecarService_GetWorkItemsServer , wi * protos.WorkItem ) error {
347+ ctx := stream .Context ()
348+ if g .streamSendTimeout != nil {
349+ var cancel context.CancelFunc
350+ ctx , cancel = context .WithTimeout (ctx , * g .streamSendTimeout )
351+ defer cancel ()
352+ }
353+
354+ errCh := make (chan error , 2 )
355+ go func () {
356+ select {
357+ case errCh <- stream .Send (wi ):
358+ case <- ctx .Done ():
359+ g .logger .Errorf ("timed out while sending work item" )
360+ errCh <- ctx .Err ()
361+ }
362+ }()
363+
364+ return <- errCh
365+ }
366+
339367// CompleteOrchestratorTask implements protos.TaskHubSidecarServiceServer
340368func (g * grpcExecutor ) CompleteOrchestratorTask (ctx context.Context , res * protos.OrchestratorResponse ) (* protos.CompleteTaskResponse , error ) {
341369 iid := api .InstanceID (res .InstanceId )
0 commit comments