2525#include "src/common/libutil/fsd.h"
2626#include "src/common/libutil/blobref.h"
2727#include "src/common/libcontent/content.h"
28+ #include "ccan/ptrint/ptrint.h"
2829#include "ccan/str/str.h"
2930
3031#include "builtin.h"
3132
33+ struct dump_valref_data
34+ {
35+ flux_t * h ;
36+ json_t * treeobj ;
37+ const flux_msg_t * * msgs ;
38+ const char * path ;
39+ int total_size ;
40+ int index ;
41+ int count ;
42+ int in_flight ;
43+ int errorcount ;
44+ int errnum ;
45+ };
46+
47+ static void get_blobref (struct dump_valref_data * dvd );
48+
3249static void dump_treeobj (struct archive * ar ,
3350 flux_t * h ,
3451 const char * path ,
@@ -43,6 +60,7 @@ static time_t dump_time;
4360static gid_t dump_gid ;
4461static uid_t dump_uid ;
4562static int keycount ;
63+ static int async_max ;
4664
4765static void read_verror (const char * fmt , va_list ap )
4866{
@@ -156,18 +174,62 @@ static void dump_write_data (struct archive *ar, const void *data, int size)
156174 "assuming non-fatal libarchive write size reporting error" );
157175}
158176
177+ static void get_blobref_continuation (flux_future_t * f , void * arg )
178+ {
179+ struct dump_valref_data * dvd = arg ;
180+ const flux_msg_t * msg ;
181+ size_t len ;
182+ int index ;
183+
184+ index = ptr2int (flux_future_aux_get (f , "index" ));
185+ if (flux_future_get (f , (const void * * )& msg ) < 0
186+ || flux_response_decode_raw (msg , NULL , NULL , & len ) < 0 ) {
187+ read_error ("%s: missing blobref %d: %s" ,
188+ dvd -> path ,
189+ index ,
190+ future_strerror (f , errno ));
191+ flux_future_destroy (f );
192+ dvd -> errorcount ++ ;
193+ dvd -> errnum = errno ; /* we'll report the last errno */
194+ return ;
195+ }
196+ dvd -> in_flight -- ;
197+ dvd -> total_size += len ;
198+ dvd -> msgs [index ] = flux_msg_incref (msg );
199+
200+ /* if an error has occurred, we won't get more blobrefs */
201+ if (dvd -> index < dvd -> count
202+ && !dvd -> errorcount ) {
203+ get_blobref (dvd );
204+ dvd -> in_flight ++ ;
205+ dvd -> index ++ ;
206+ }
207+ flux_future_destroy (f );
208+ }
209+
210+ static void get_blobref (struct dump_valref_data * dvd )
211+ {
212+ const char * blobref ;
213+ flux_future_t * f ;
214+
215+ blobref = treeobj_get_blobref (dvd -> treeobj , dvd -> index );
216+
217+ if (!(f = content_load_byblobref (dvd -> h , blobref , content_flags ))
218+ || flux_future_then (f , -1 , get_blobref_continuation , dvd ) < 0 )
219+ log_err_exit ("%s: cannot load blobref %d" , dvd -> path , dvd -> index );
220+ if (flux_future_aux_set (f , "index" , int2ptr (dvd -> index ), NULL ) < 0 )
221+ log_err_exit ("could not save index value" );
222+ }
223+
159224static void dump_valref (struct archive * ar ,
160225 flux_t * h ,
161226 const char * path ,
162227 json_t * treeobj )
163228{
164229 int count = treeobj_get_count (treeobj );
165230 const flux_msg_t * * msgs ;
166- const flux_msg_t * msg ;
167- int total_size = 0 ;
168231 struct archive_entry * entry ;
169- const void * data ;
170- size_t len ;
232+ struct dump_valref_data dvd = {0 };
171233
172234 /* Load all data comprising the valref before starting the archive
173235 * entry. This is because the total size of the value must
@@ -183,33 +245,34 @@ static void dump_valref (struct archive *ar,
183245 * retaining the futures for a second pass, just retain references to the
184246 * content.load response messages.
185247 */
186- if (!(msgs = calloc (count , sizeof (msg ))))
248+ if (!(msgs = calloc (count , sizeof (msgs [ 0 ] ))))
187249 log_err_exit ("could not create messages array" );
188- for ( int i = 0 ; i < count ; i ++ ) {
189- flux_future_t * f ;
190- if (!( f = content_load_byblobref ( h ,
191- treeobj_get_blobref ( treeobj , i ),
192- content_flags ))
193- || flux_future_get ( f , ( const void * * ) & msg ) < 0
194- || flux_response_decode_raw ( msg , NULL , NULL , & len ) < 0 ) {
195- read_error ( "%s: missing blobref %d: %s" ,
196- path ,
197- i ,
198- future_strerror ( f , errno )) ;
199- flux_future_destroy ( f ) ;
200- for ( int j = 0 ; j < i ; j ++ )
201- flux_msg_decref ( msgs [ j ]);
202- free ( msgs );
203- return ;
204- }
205- msgs [ i ] = flux_msg_incref ( msg );
206- total_size += len ;
207- flux_future_destroy ( f ) ;
250+
251+ dvd . h = h ;
252+ dvd . treeobj = treeobj ;
253+ dvd . msgs = msgs ;
254+ dvd . path = path ;
255+ dvd . count = count ;
256+
257+ while ( dvd . in_flight < async_max
258+ && dvd . index < dvd . count ) {
259+ get_blobref ( & dvd );
260+ dvd . in_flight ++ ;
261+ dvd . index ++ ;
262+ }
263+
264+ if ( flux_reactor_run ( flux_get_reactor ( h ), 0 ) < 0 )
265+ log_err_exit ( "flux_reactor_run" ) ;
266+
267+ if ( dvd . errorcount ) {
268+ errno = dvd . errnum ;
269+ goto cleanup ;
208270 }
271+
209272 if (!(entry = archive_entry_new ()))
210273 log_msg_exit ("error creating archive entry" );
211274 archive_entry_set_pathname (entry , path );
212- archive_entry_set_size (entry , total_size );
275+ archive_entry_set_size (entry , dvd . total_size );
213276 archive_entry_set_perm (entry , 0644 );
214277 archive_entry_set_filetype (entry , AE_IFREG );
215278 archive_entry_set_mtime (entry , dump_time , 0 );
@@ -218,15 +281,23 @@ static void dump_valref (struct archive *ar,
218281
219282 if (archive_write_header (ar , entry ) != ARCHIVE_OK )
220283 log_msg_exit ("%s" , archive_error_string (ar ));
221- for (int i = 0 ; i < count ; i ++ ) {
284+ for (int i = 0 ; i < dvd .count ; i ++ ) {
285+ const void * data ;
286+ size_t len ;
222287 if (flux_response_decode_raw (msgs [i ], NULL , & data , & len ) < 0 )
223288 log_err_exit ("error processing stashed valref responses" );
224289 if (len > 0 )
225290 dump_write_data (ar , data , len );
226291 flux_msg_decref (msgs [i ]);
292+ msgs [i ] = NULL ;
227293 }
228294 archive_entry_free (entry );
229295 progress (h , 1 );
296+ cleanup :
297+ for (int i = 0 ; i < dvd .count ; i ++ ) {
298+ if (msgs [i ])
299+ flux_msg_decref (msgs [i ]);
300+ }
230301 free (msgs );
231302}
232303
@@ -426,6 +497,10 @@ static int cmd_dump (optparse_t *p, int ac, char *av[])
426497 content_flags |= CONTENT_FLAG_CACHE_BYPASS ;
427498 kvs_checkpoint_flags |= KVS_CHECKPOINT_FLAG_CACHE_BYPASS ;
428499 }
500+ /* default = 1, no parallelism */
501+ async_max = optparse_get_int (p , "maxreqs" , 1 );
502+ if (async_max <= 0 )
503+ log_err_exit ("invalid value for maxreqs" );
429504
430505 dump_time = time (NULL );
431506 dump_uid = getuid ();
@@ -500,6 +575,9 @@ static struct optparse_option dump_opts[] = {
500575 { .name = "sd-notify" , .has_arg = 0 ,
501576 .usage = "Send status updates to systemd via flux-broker(1)" ,
502577 },
578+ { .name = "maxreqs" , .has_arg = 1 , .arginfo = "N" ,
579+ .usage = "Increase number of concurrent requests (default 1)" ,
580+ },
503581 OPTPARSE_TABLE_END
504582};
505583
0 commit comments