@@ -88,23 +88,30 @@ static uint32_t validateMessage(TakyonBuffer *buffer, uint64_t message_bytes, co
8888  return  detected_drops ;
8989}
9090
91- static  void  sendMessage (TakyonPath  * path , const  uint64_t  message_bytes , uint64_t  message_offset , const  bool  use_polling_completion , const  uint32_t  message_index ) {
92-   // Setup the send request 
93-   bool  use_sent_notification  =  ((message_index + 1 ) % path -> attrs .max_pending_send_requests ) ==  0 ; // Need to get sent notification to implicitly wait for all pending transfers to complete or else the provider may get a buffer overflow 
94-   TakyonSubBuffer  sender_sub_buffer  =  { .buffer_index  =  0 , .bytes  =  message_bytes , .offset  =  message_offset  };
95-   TakyonSendRequest  send_request  =  { .sub_buffer_count  =  (message_bytes == 0 ) ? 0  : 1 ,
96-                                      .sub_buffers  =  (message_bytes == 0 ) ? NULL  : & sender_sub_buffer ,
97-                                      .submit_fence  =  false,
98-                                      .use_is_sent_notification  =  use_sent_notification ,
99-                                      .use_polling_completion  =  use_polling_completion ,
100-                                      .usec_sleep_between_poll_attempts  =  0  };
101- 
102-   // Start the send 
103-   uint32_t  piggyback_message  =  message_index + 1 ;
104-   takyonSend (path , & send_request , piggyback_message , TAKYON_WAIT_FOREVER , NULL );
91+ static  void  sendMessage (TakyonPath  * path , TakyonSendRequest  * send_request , const  uint64_t  message_bytes , uint64_t  message_offset , const  bool  use_polling_completion , const  uint32_t  message_index ) { // Starts one send on 'path' using the caller-provided, reusable 'send_request'; the piggyback value sent is message_index+1
92+   // If this request is still flagged from an earlier call (use_is_sent_notification is set to true further below) and the provider supports takyonIsSent(), wait for that earlier send to complete before the request is modified and reused
93+   if  (path -> capabilities .IsSent_function_supported  &&  send_request -> use_is_sent_notification ) {
94+     takyonIsSent (path , send_request , TAKYON_WAIT_FOREVER , NULL );
95+     send_request -> use_is_sent_notification  =  false; // The earlier send has been waited on
96+   }
10597
106-   // If the provider supports non blocking sends, then need to know when it's complete 
107-   if  (path -> capabilities .IsSent_function_supported  &&  send_request .use_is_sent_notification ) takyonIsSent (path , & send_request , TAKYON_WAIT_FOREVER , NULL );
98+   // Prepare the new send request: sub_buffers[0] is only written for non-empty messages (assumes the caller attached at least one sub buffer to the request in that case - TODO confirm at call sites)
99+   if  (message_bytes  >  0 ) {
100+     send_request -> sub_buffers [0 ].buffer_index  =  0 ;
101+     send_request -> sub_buffers [0 ].bytes  =  message_bytes ;
102+     send_request -> sub_buffers [0 ].offset  =  message_offset ;
103+   }
104+   send_request -> sub_buffer_count  =  (message_bytes == 0 ) ? 0  : 1 ;
105+   send_request -> submit_fence  =  false;
106+   send_request -> use_is_sent_notification  =  true; // Request an is-sent notification; it is consumed by the takyonIsSent() wait at the top of the next call that reuses this request (if the provider supports it)
107+   send_request -> use_polling_completion  =  use_polling_completion ;
108+   send_request -> usec_sleep_between_poll_attempts  =  0 ;
109+ 
110+   // Start the send:
111+   //   For asynchronous providers, this will only add the request to the provider's native request queue; the transfer will start at some time after this function returns
112+   //   For synchronous providers, this will block until the data is sent out. This does not mean the receiver has received it yet.
113+   uint32_t  piggyback_message  =  message_index + 1 ;
114+   takyonSend (path , send_request , piggyback_message , TAKYON_WAIT_FOREVER , NULL );
108115}
109116
110117static  bool  recvMessage (TakyonPath  * path , TakyonRecvRequest  * recv_request , const  uint32_t  message_index , uint32_t  iterations , uint32_t  messages_received , uint64_t  * bytes_received_out , uint32_t  * piggyback_message_out ) {
@@ -185,30 +192,35 @@ static void readMessage(TakyonPath *path, const uint64_t message_bytes, const ui
185192  if  (path -> capabilities .IsOneSidedDone_function_supported  &&  request .use_is_done_notification ) takyonIsOneSidedDone (path , & request , TAKYON_WAIT_FOREVER , NULL );
186193}
187194
188- static  void  sendSignal (TakyonPath  * path , const  bool  use_polling_completion ) {
189-   // Setup the send request 
190-   TakyonSendRequest  send_request  =  { .sub_buffer_count  =  0 ,
191-                                      .sub_buffers  =  NULL ,
192-                                      .submit_fence  =  false,
193-                                      .use_is_sent_notification  =  true,
194-                                      .use_polling_completion  =  use_polling_completion ,
195-                                      .usec_sleep_between_poll_attempts  =  0  };
195+ static  void  sendEmptyMessage (TakyonPath  * path , TakyonSendRequest  * send_request , const  bool  use_polling_completion ) { // Starts a zero-byte send (piggyback value 0) on 'path' using the caller-provided, reusable 'send_request'
196+   // If this request is still flagged from an earlier call (use_is_sent_notification is set to true further below) and the provider supports takyonIsSent(), wait for that earlier send to complete before the request is modified and reused
197+   if  (path -> capabilities .IsSent_function_supported  &&  send_request -> use_is_sent_notification ) {
198+     takyonIsSent (path , send_request , TAKYON_WAIT_FOREVER , NULL );
199+     send_request -> use_is_sent_notification  =  false; // The earlier send has been waited on
200+   }
196201
197-   // Start the send 
202+   // Prepare the new zero-byte send request. NOTE(review): this overwrites whatever sub_buffers pointer the caller had stored in the request with NULL
203+   send_request -> sub_buffer_count  =  0 ;
204+   send_request -> sub_buffers  =  NULL ;
205+   send_request -> submit_fence  =  false;
206+   send_request -> use_is_sent_notification  =  true; // Request an is-sent notification; it is consumed by the takyonIsSent() wait at the top of the next call that reuses this request (if the provider supports it)
207+   send_request -> use_polling_completion  =  use_polling_completion ;
208+   send_request -> usec_sleep_between_poll_attempts  =  0 ;
209+ 
210+   // Start the send:
211+   //   For asynchronous providers, this will only add the request to the provider's native request queue; the transfer will start at some time after this function returns
212+   //   For synchronous providers, this will block until the data is sent out. This does not mean the receiver has received it yet.
198213  uint32_t  piggyback_message  =  0 ;
199-   takyonSend (path , & send_request , piggyback_message , TAKYON_WAIT_FOREVER , NULL );
200- 
201-   // If the provider supports non blocking sends, then need to know when it's complete 
202-   if  (path -> capabilities .IsSent_function_supported  &&  send_request .use_is_sent_notification ) takyonIsSent (path , & send_request , TAKYON_WAIT_FOREVER , NULL );
214+   takyonSend (path , send_request , piggyback_message , TAKYON_WAIT_FOREVER , NULL );
203215}
204216
205- static  bool  recvSignal (TakyonPath  * path , TakyonRecvRequest  * recv_request ) {
217+ static  bool  recvEmptyMessage (TakyonPath  * path , TakyonRecvRequest  * recv_request ) {
206218  // Wait for data to arrive 
207219  uint64_t  bytes_received ;
208220  bool  timed_out ;
209221  takyonIsRecved (path , recv_request , ACTIVE_RECV_TIMEOUT_SECONDS , & timed_out , & bytes_received , NULL );
210222  if  (timed_out ) {
211-     printf ("\nTimed out waiting for signal . Make sure both endpoints define the same number of recv buffers\n" );
223+     printf ("\nTimed out waiting for empty message notification . Make sure both endpoints define the same number of recv buffers\n" );
212224    return  false;
213225  }
214226  if  (bytes_received  !=  0 ) { printf ("\nExpected a zero-byte message, but got "  UINT64_FORMAT  " bytes.\n" , bytes_received ); exit (EXIT_FAILURE ); }
@@ -231,7 +243,7 @@ static void twoSidedThroughput(const bool is_endpointA, const char *provider, co
231243  attrs .verbosity                          =  TAKYON_VERBOSITY_ERRORS ; //  | TAKYON_VERBOSITY_CREATE_DESTROY | TAKYON_VERBOSITY_CREATE_DESTROY_MORE | TAKYON_VERBOSITY_TRANSFERS | TAKYON_VERBOSITY_TRANSFERS_MORE; 
232244  attrs .buffer_count                       =  (message_bytes == 0 ) ? 0  : 1 ;
233245  attrs .buffers                            =  (message_bytes == 0 ) ? NULL  : buffer ;
234-   attrs .max_pending_send_requests          =  is_endpointA  ?  src_buffer_count  :  1 ;
246+   attrs .max_pending_send_requests          =  src_buffer_count ;
235247  attrs .max_pending_recv_requests          =  is_endpointA  ? 1  : dest_buffer_count ;
236248  attrs .max_pending_write_requests         =  0 ;
237249  attrs .max_pending_read_requests          =  0 ;
@@ -241,14 +253,14 @@ static void twoSidedThroughput(const bool is_endpointA, const char *provider, co
241253  attrs .max_sub_buffers_per_write_request  =  0 ;
242254  attrs .max_sub_buffers_per_read_request   =  0 ;
243255
244-   // Setup the receive request and it's sub buffer  
256+   // Setup the receive requests  
245257  //   - This is done before the path is setup in the case the receiver needs the recvs posted before sending can start 
246258  uint32_t  recv_request_count  =  is_endpointA  ? 0  : dest_buffer_count ;
247259  TakyonSubBuffer  * recv_sub_buffers  =  NULL ;
248260  TakyonRecvRequest  * recv_requests  =  NULL ;
249261  TakyonRecvRequest  repost_recv_request ;
250262  if  (is_endpointA ) {
251-     // Only need a single zero-byte recv request to handle the re-post signaling  
263+     // Only need a single zero-byte recv request to handle the empty message notification (informs sender that the receiver reposted)  
252264    recv_request_count  =  1 ;
253265    recv_requests  =  & repost_recv_request ;
254266    repost_recv_request .sub_buffer_count  =  0 ;
@@ -270,12 +282,27 @@ static void twoSidedThroughput(const bool is_endpointA, const char *provider, co
270282    }
271283  }
272284
285+   // Setup the send requests (used by both end-points) 
286+   //   - This is done to have persistent requests to track is-sent notifications for asynchronous sends
287+   TakyonSubBuffer  * send_sub_buffers  =  calloc (src_buffer_count , sizeof (TakyonSubBuffer ));
288+   TakyonSendRequest  * send_requests  =  calloc (src_buffer_count , sizeof (TakyonSendRequest ));
289+   for  (uint32_t  i = 0 ; i < src_buffer_count ; i ++ ) {
290+     send_sub_buffers [i ].buffer_index  =  0 ;
291+     send_sub_buffers [i ].bytes  =  0 ;
292+     send_sub_buffers [i ].offset  =  0 ;
293+     send_requests [i ].sub_buffer_count  =  (message_bytes == 0 ) ? 0  : 1 ;
294+     send_requests [i ].sub_buffers  =  (message_bytes == 0 ) ? NULL  : & send_sub_buffers [i ];
295+     send_requests [i ].use_polling_completion  =  use_polling_completion ;
296+     send_requests [i ].usec_sleep_between_poll_attempts  =  0 ;
297+   }
298+ 
273299  // Create one side of the path 
274300  //   - The other side will be created in a different thread/process 
275301  TakyonPath  * path ;
276302  (void )takyonCreate (& attrs , recv_request_count , recv_requests , TAKYON_WAIT_FOREVER , & path );
277303
278304  // Do the transfers, and calculate the throughput 
305+   uint32_t  send_request_index  =  0 ;
279306  uint32_t  recv_request_index  =  0 ;
280307  double  start_time  =  clockTimeSeconds ();
281308  int64_t  bytes_transferred  =  0 ;
@@ -297,7 +324,8 @@ static void twoSidedThroughput(const bool is_endpointA, const char *provider, co
297324	fillInMessage (buffer , message_bytes , message_offset , i );
298325      }
299326      // Send message 
300-       sendMessage (path , message_bytes , message_offset , use_polling_completion , i );
327+       sendMessage (path , & send_requests [send_request_index ], message_bytes , message_offset , use_polling_completion , i );
328+       send_request_index  =  (send_request_index  +  1 ) % src_buffer_count ;
301329
302330    } else  {
303331      // Wait for the message to arrive (will reuse the recv_request that was already prepared) 
@@ -341,7 +369,7 @@ static void twoSidedThroughput(const bool is_endpointA, const char *provider, co
341369      if  (path -> attrs .is_endpointA ) {
342370        // Wait for the recvs to be posted, but if the provider is unreliable then this is not needed since dropped messages are allowed
343371        if  (!path -> capabilities .is_unreliable ) {
344-           bool  ok  =  recvSignal (path , & repost_recv_request );
372+           bool  ok  =  recvEmptyMessage (path , & repost_recv_request );
345373          if  (!ok ) break ; // Probably dropped packets and sender is done 
346374        }
347375      } else  {
@@ -350,8 +378,12 @@ static void twoSidedThroughput(const bool is_endpointA, const char *provider, co
350378	TakyonRecvRequest  * half_recv_requests  =  post_first_half  ? recv_requests  : & recv_requests [half_dest_buffer_count ];
351379	post_first_half  =  !post_first_half ;
352380        if  (path -> capabilities .PostRecvs_function_supported ) takyonPostRecvs (path , half_dest_buffer_count , half_recv_requests );
353-         // Let the send know the recvs are posted, but if the provider is unreliable then no needed since dropped messages are allowed 
354-         if  (!path -> capabilities .is_unreliable ) sendSignal (path , use_polling_completion );
381+ 
382+         // Let the sender know this receiver is ready for more data, but if the provider is unreliable then this is not needed since dropped messages are allowed
383+         if  (!path -> capabilities .is_unreliable ) {
384+           sendEmptyMessage (path , & send_requests [send_request_index ], use_polling_completion );
385+           send_request_index  =  (send_request_index  +  1 ) % src_buffer_count ;
386+         }
355387      }
356388      messages_to_be_reposted  =  0 ;
357389    }
@@ -395,6 +427,8 @@ static void twoSidedThroughput(const bool is_endpointA, const char *provider, co
395427    free (recv_sub_buffers );
396428    free (recv_requests );
397429  }
430+   free (send_sub_buffers );
431+   free (send_requests );
398432}
399433
400434static  void  oneSidedThroughput (const  bool  is_endpointA , const  char  * provider , const  uint32_t  iterations , const  uint64_t  message_bytes , const  uint32_t  src_buffer_count , const  uint32_t  dest_buffer_count , const  bool  use_polling_completion , const  bool  validate , const  bool  is_multi_threaded , TakyonBuffer  * buffer , const  char  * transfer_mode ) {
@@ -421,7 +455,7 @@ static void oneSidedThroughput(const bool is_endpointA, const char *provider, co
421455  attrs .max_sub_buffers_per_write_request  =  1 ;
422456  attrs .max_sub_buffers_per_read_request   =  1 ;
423457
424-   // Recv request  used for signaling  
458+   // Recv requests  used for empty-message synchronization  
425459  TakyonRecvRequest  recv_requests [2 ];
426460  for  (uint32_t  i = 0 ; i < 2 ; i ++ ) {
427461    recv_requests [i ].sub_buffer_count  =  0 ;
@@ -430,6 +464,15 @@ static void oneSidedThroughput(const bool is_endpointA, const char *provider, co
430464    recv_requests [i ].usec_sleep_between_poll_attempts  =  0 ;
431465  }
432466
467+   // Send requests used for empty-message synchronization. The array is zero-initialized so that use_is_sent_notification (and submit_fence) start out false: sendEmptyMessage() reads use_is_sent_notification before writing it, and an indeterminate 'true' would make it wait forever in takyonIsSent() on a send that was never started
468+   TakyonSendRequest  send_requests [2 ]  =  { 0  };
469+   for  (uint32_t  i = 0 ; i < 2 ; i ++ ) {
470+     send_requests [i ].sub_buffer_count  =  0 ;
471+     send_requests [i ].sub_buffers  =  NULL ;
472+     send_requests [i ].use_polling_completion  =  use_polling_completion ;
473+     send_requests [i ].usec_sleep_between_poll_attempts  =  0 ;
474+   }
475+ 
433476  // Create one side of the path 
434477  //   - The other side will be created in a different thread/process 
435478  TakyonPath  * path ;
@@ -446,7 +489,7 @@ static void oneSidedThroughput(const bool is_endpointA, const char *provider, co
446489        if  (path -> attrs .is_endpointA ) {
447490          // Wait for permission to start filling in the next batch of data (no need to wait if this is the first round of transfers) 
448491          if  (completed_iterations  >  0 ) {
449-             recvSignal (path , & recv_requests [half_index ]);
492+             recvEmptyMessage (path , & recv_requests [half_index ]);
450493          }
451494          // Fill in the message 
452495          if  (validate ) {
@@ -460,10 +503,10 @@ static void oneSidedThroughput(const bool is_endpointA, const char *provider, co
460503            }
461504          }
462505          // Let the remote endpoint know the set of messages are ready to be read 
463-           sendSignal (path , use_polling_completion );
506+           sendEmptyMessage (path ,  & send_requests [ half_index ] , use_polling_completion );
464507        } else  {
465-           // Wait for signal  to inform that messages can be read 
466-           recvSignal (path , & recv_requests [half_index ]);
508+           // Wait for the empty message to arrive  to inform that messages can be read 
509+           recvEmptyMessage (path , & recv_requests [half_index ]);
467510          // Read messages 
468511          for  (uint32_t  i = 0 ; i < src_buffer_count /2 ; i ++ ) {
469512            uint32_t  i2  =  (half_index == 0 ) ? i  : src_buffer_count /2 + i ;
@@ -483,19 +526,19 @@ static void oneSidedThroughput(const bool is_endpointA, const char *provider, co
483526              validateMessage (buffer , message_bytes , message_offset , message_index , & previous_start_value );
484527            }
485528          }
486-           // Send signal  to inform the remote endpoint that more message can be written 
487-           sendSignal (path , use_polling_completion );
529+           // Send empty message to inform the remote endpoint that more messages can be written
530+           sendEmptyMessage (path ,  & send_requests [ half_index ] , use_polling_completion );
488531        }
489532      }
490533
491534    } else  {
492535      // 'write' throughput 
493-       // Transfer in one half at a time to allow for overlapping of 'writes' and receiving signals  
536+       // Transfer in one half at a time to allow for overlapping of 'writes' 
494537      for  (uint32_t  half_index = 0 ; half_index < 2 ; half_index ++ ) {
495538        if  (path -> attrs .is_endpointA ) {
496539          // Wait for permission to write (no need to wait if this is the first round of transfers) 
497540          if  (completed_iterations  >  0 ) {
498-             recvSignal (path , & recv_requests [half_index ]);
541+             recvEmptyMessage (path , & recv_requests [half_index ]);
499542          }
500543          for  (uint32_t  i = 0 ; i < src_buffer_count /2 ; i ++ ) {
501544            uint32_t  i2  =  (half_index == 0 ) ? i  : src_buffer_count /2 + i ;
@@ -511,11 +554,11 @@ static void oneSidedThroughput(const bool is_endpointA, const char *provider, co
511554            writeMessage (path , message_bytes , message_offset , use_polling_completion );
512555          }
513556          // Let the remote endpoint know the set of messages arrived 
514-           sendSignal (path , use_polling_completion );
557+           sendEmptyMessage (path ,  & send_requests [ half_index ] , use_polling_completion );
515558
516559        } else  {
517-           // Wait for signal  to inform that messages were written 
518-           recvSignal (path , & recv_requests [half_index ]);
560+           // Wait for the empty message to arrive  to inform that messages were written 
561+           recvEmptyMessage (path , & recv_requests [half_index ]);
519562          // Validate messages 
520563          if  (validate ) {
521564            static  uint32_t  previous_start_value  =  0 ;
@@ -527,8 +570,8 @@ static void oneSidedThroughput(const bool is_endpointA, const char *provider, co
527570              validateMessage (buffer , message_bytes , message_offset , message_index , & previous_start_value );
528571            }
529572          }
530-           // Send signal  to inform the remote endpoint that more message can be written 
531-           sendSignal (path , use_polling_completion );
573+           // Send empty message to inform the remote endpoint that more messages can be written
574+           sendEmptyMessage (path ,  & send_requests [ half_index ] , use_polling_completion );
532575        }
533576      }
534577    }
0 commit comments