@@ -416,22 +416,104 @@ async fn request(
416
416
417
417
let send_stream_stats = stream_stats. new_sender ( & send, upload) ;
418
418
419
- static DATA : [ u8 ; 1024 * 1024 ] = [ 42 ; 1024 * 1024 ] ;
419
+ static DATA : [ u8 ; 1024 * 1024 ] = [ 42 ; 1024 * 1024 ] ; // 1MB of data
420
+ let unrolled = true ;
421
+
420
422
let mut remaining = upload;
421
423
let upload_start = Instant :: now ( ) ;
422
424
while remaining > 0 {
423
- let chunk_len = remaining. min ( DATA . len ( ) as u64 ) ;
425
+ if unrolled {
426
+ #[ rustfmt:: skip]
427
+ let mut data_chunks = [ // 32 MB of data
428
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
429
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
430
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
431
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
432
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
433
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
434
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
435
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
436
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
437
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
438
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
439
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
440
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
441
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
442
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
443
+ Bytes :: from_static ( & DATA ) , Bytes :: from_static ( & DATA ) ,
444
+ ] ;
445
+ let one_data_chunks_len = data_chunks[ 0 ] . len ( ) as u64 ;
446
+ let all_data_chunks_len = data_chunks[ ..data_chunks. len ( ) ]
447
+ . iter ( )
448
+ . map ( |b| b. len ( ) as u64 )
449
+ . sum :: < u64 > ( ) ;
450
+
451
+ if remaining > all_data_chunks_len {
452
+ // send all chunks at the same time
453
+ tokio:: select! {
454
+ biased;
455
+ _ = shutdown. cancelled( ) => {
456
+ break ;
457
+ } ,
458
+ res = send. write_chunks( & mut data_chunks) => {
459
+ let res = res. context( "sending all chunks" ) ?;
460
+
461
+ info!( "sent {} chunks for {} bytes remaining {remaining}" , res. chunks, res. bytes) ;
462
+
463
+ send_stream_stats. on_bytes( res. bytes) ;
464
+ remaining -= res. bytes as u64 ;
465
+ }
466
+ }
467
+ } else if remaining <= one_data_chunks_len {
468
+ // manually send remaining data
469
+ let chunk_len = remaining. min ( DATA . len ( ) as u64 ) ;
470
+
471
+ tokio:: select! {
472
+ biased;
473
+ _ = shutdown. cancelled( ) => {
474
+ break ;
475
+ } ,
476
+ res = send. write_chunk( Bytes :: from_static( & DATA [ ..chunk_len as usize ] ) ) => {
477
+ res. context( "sending response" ) ?;
478
+
479
+ info!( "sent {chunk_len} bytes remaining {remaining}" ) ;
480
+
481
+ send_stream_stats. on_bytes( chunk_len as usize ) ;
482
+ remaining -= chunk_len;
483
+ }
484
+ }
485
+ } else {
486
+ // send a bunch of chunks but not all
487
+ let chunk_count = remaining / one_data_chunks_len;
488
+ tokio:: select! {
489
+ biased;
490
+ _ = shutdown. cancelled( ) => {
491
+ break ;
492
+ } ,
493
+ res = send. write_chunks( & mut data_chunks[ ..chunk_count as usize ] ) => {
494
+ let res = res. context( "sending some chunks" ) ?;
495
+
496
+ info!( "sent {} chunks for {} bytes remaining {remaining}" , res. chunks, res. bytes) ;
497
+
498
+ send_stream_stats. on_bytes( res. bytes) ;
499
+ remaining -= res. bytes as u64 ;
500
+ }
501
+ }
502
+ }
503
+ } else {
504
+ let chunk_len = remaining. min ( DATA . len ( ) as u64 ) ;
424
505
425
- tokio:: select! {
426
- biased;
427
- _ = shutdown. cancelled( ) => {
428
- break ;
429
- } ,
430
- res = send. write_chunk( Bytes :: from_static( & DATA [ ..chunk_len as usize ] ) ) => {
431
- res. context( "sending response" ) ?;
506
+ tokio:: select! {
507
+ biased;
508
+ _ = shutdown. cancelled( ) => {
509
+ break ;
510
+ } ,
511
+ res = send. write_chunk( Bytes :: from_static( & DATA [ ..chunk_len as usize ] ) ) => {
512
+ res. context( "sending response" ) ?;
432
513
433
- send_stream_stats. on_bytes( chunk_len as usize ) ;
434
- remaining -= chunk_len;
514
+ send_stream_stats. on_bytes( chunk_len as usize ) ;
515
+ remaining -= chunk_len;
516
+ }
435
517
}
436
518
}
437
519
}
0 commit comments