@@ -2358,10 +2358,28 @@ impl Commander {
23582358 gc_interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
23592359
23602360 loop {
2361- select ! {
2362- command = command_rx. recv( ) => {
2363- let Some ( ( ack_tx, command) ) = command else { break } ;
2361+ enum Event {
2362+ Command ( Option < ( oneshot:: Sender < ( ) > , DemultiplexCommand ) > ) ,
23642363
2364+ FromWorker ( Option < Multiplexed < WorkerMessage > > ) ,
2365+
2366+ // Find any channels where the receivers have been
2367+ // dropped and clear out the sending halves.
2368+ Gc ,
2369+ }
2370+ use Event :: * ;
2371+
2372+ let event = select ! {
2373+ command = command_rx. recv( ) => Command ( command) ,
2374+
2375+ msg = from_worker_rx. recv( ) => FromWorker ( msg) ,
2376+
2377+ _ = gc_interval. tick( ) => Gc ,
2378+ } ;
2379+
2380+ match event {
2381+ Command ( None ) => break ,
2382+ Command ( Some ( ( ack_tx, command) ) ) => {
23652383 match command {
23662384 DemultiplexCommand :: Listen ( job_id, waiter) => {
23672385 trace ! ( "adding listener for {job_id:?}" ) ;
@@ -2377,11 +2395,10 @@ impl Commander {
23772395 }
23782396
23792397 ack_tx. send ( ( ) ) . ok ( /* Don't care about it */ ) ;
2380- } ,
2381-
2382- msg = from_worker_rx. recv( ) => {
2383- let Some ( Multiplexed ( job_id, msg) ) = msg else { break } ;
2398+ }
23842399
2400+ FromWorker ( None ) => break ,
2401+ FromWorker ( Some ( Multiplexed ( job_id, msg) ) ) => {
23852402 if let Some ( waiter) = waiting_once. remove ( & job_id) {
23862403 trace ! ( "notifying listener for {job_id:?}" ) ;
23872404 waiter. send ( msg) . ok ( /* Don't care about it */ ) ;
@@ -2397,9 +2414,7 @@ impl Commander {
23972414 warn ! ( "no listener for {job_id:?}" ) ;
23982415 }
23992416
2400- // Find any channels where the receivers have been
2401- // dropped and clear out the sending halves.
2402- _ = gc_interval. tick( ) => {
2417+ Gc => {
24032418 waiting = mem:: take ( & mut waiting)
24042419 . into_iter ( )
24052420 . filter ( |( _job_id, tx) | !tx. is_closed ( ) )
0 commit comments