@@ -27,6 +27,7 @@ use tokio::sync::Mutex;
2727use tokio:: task:: JoinSet ;
2828use tokio:: time:: sleep;
2929use tonic:: service:: Routes ;
30+ use tonic:: transport:: server:: TcpIncoming ;
3031use tonic:: transport:: Server ;
3132use tonic:: { Request , Response , Status } ;
3233
@@ -56,23 +57,25 @@ struct State {
5657pub struct Lighthouse {
5758 state : Mutex < State > ,
5859 opt : LighthouseOpt ,
60+ listener : Mutex < Option < tokio:: net:: TcpListener > > ,
61+ local_addr : SocketAddr ,
5962}
6063
6164#[ derive( StructOpt , Debug ) ]
6265#[ structopt( ) ]
6366pub struct LighthouseOpt {
6467 // bind is the address to bind the server to.
6568 #[ structopt( long = "bind" , default_value = "[::]:29510" ) ]
66- bind : String ,
69+ pub bind : String ,
6770
6871 #[ structopt( long = "join_timeout_ms" , default_value = "60000" ) ]
69- join_timeout_ms : u64 ,
72+ pub join_timeout_ms : u64 ,
7073
7174 #[ structopt( long = "min_replicas" ) ]
72- min_replicas : u64 ,
75+ pub min_replicas : u64 ,
7376
7477 #[ structopt( long = "quorum_tick_ms" , default_value = "100" ) ]
75- quorum_tick_ms : u64 ,
78+ pub quorum_tick_ms : u64 ,
7679}
7780
7881fn quorum_changed ( a : & Vec < QuorumMember > , b : & Vec < QuorumMember > ) -> bool {
@@ -83,9 +86,10 @@ fn quorum_changed(a: &Vec<QuorumMember>, b: &Vec<QuorumMember>) -> bool {
8386}
8487
8588impl Lighthouse {
86- pub fn new ( opt : LighthouseOpt ) -> Arc < Self > {
89+ pub async fn new ( opt : LighthouseOpt ) -> Result < Arc < Self > > {
8790 let ( tx, _) = broadcast:: channel ( 16 ) ;
88- Arc :: new ( Self {
91+ let listener = tokio:: net:: TcpListener :: bind ( & opt. bind ) . await ?;
92+ Ok ( Arc :: new ( Self {
8993 state : Mutex :: new ( State {
9094 participants : HashMap :: new ( ) ,
9195 channel : tx,
@@ -94,7 +98,9 @@ impl Lighthouse {
9498 heartbeats : HashMap :: new ( ) ,
9599 } ) ,
96100 opt : opt,
97- } )
101+ local_addr : listener. local_addr ( ) ?,
102+ listener : Mutex :: new ( Some ( listener) ) ,
103+ } ) )
98104 }
99105
100106 // Checks whether the quorum is valid and an explanation for the state.
@@ -209,13 +215,20 @@ impl Lighthouse {
209215 }
210216 }
211217
212- async fn _run_grpc ( self : Arc < Self > ) -> Result < ( ) > {
213- let bind: SocketAddr = self . opt . bind . parse ( ) ?;
214- info ! (
215- "Lighthouse listening on: http://{}:{}" ,
218+ pub fn address ( & self ) -> String {
219+ format ! (
220+ "http://{}:{}" ,
216221 gethostname( ) . into_string( ) . unwrap( ) ,
217- bind. port( )
218- ) ;
222+ self . local_addr. port( )
223+ )
224+ }
225+
226+ async fn _run_grpc ( self : Arc < Self > ) -> Result < ( ) > {
227+ info ! ( "Lighthouse listening on: {}" , self . address( ) ) ;
228+
229+ let listener = self . listener . lock ( ) . await . take ( ) . unwrap ( ) ;
230+ let incoming =
231+ TcpIncoming :: from_listener ( listener, true , None ) . map_err ( |e| anyhow:: anyhow!( e) ) ?;
219232
220233 // Setup HTTP endpoints
221234 let app = Router :: new ( )
@@ -245,7 +258,7 @@ impl Lighthouse {
245258 // allow non-GRPC connections
246259 . accept_http1 ( true )
247260 . add_routes ( routes)
248- . serve ( bind )
261+ . serve_with_incoming ( incoming )
249262 . await
250263 . map_err ( |e| e. into ( ) )
251264 }
@@ -429,14 +442,14 @@ mod tests {
429442
430443 use crate :: torchftpb:: lighthouse_service_client:: LighthouseServiceClient ;
431444
432- fn lighthouse_test_new ( ) -> Arc < Lighthouse > {
445+ async fn lighthouse_test_new ( ) -> Result < Arc < Lighthouse > > {
433446 let opt = LighthouseOpt {
434447 min_replicas : 1 ,
435- bind : "0.0.0.0:29510 " . to_string ( ) ,
448+ bind : "[::]:0 " . to_string ( ) ,
436449 join_timeout_ms : 60 * 60 * 1000 , // 1hr
437450 quorum_tick_ms : 10 ,
438451 } ;
439- Lighthouse :: new ( opt)
452+ Lighthouse :: new ( opt) . await
440453 }
441454
442455 async fn lighthouse_client_new ( addr : String ) -> Result < LighthouseServiceClient < Channel > > {
@@ -448,8 +461,8 @@ mod tests {
448461 }
449462
450463 #[ tokio:: test]
451- async fn test_quorum_join_timeout ( ) {
452- let lighthouse = lighthouse_test_new ( ) ;
464+ async fn test_quorum_join_timeout ( ) -> Result < ( ) > {
465+ let lighthouse = lighthouse_test_new ( ) . await ? ;
453466 assert ! ( !lighthouse. quorum_valid( ) . await . 0 ) ;
454467
455468 {
@@ -478,11 +491,13 @@ mod tests {
478491 }
479492
480493 assert ! ( lighthouse. quorum_valid( ) . await . 0 ) ;
494+
495+ Ok ( ( ) )
481496 }
482497
483498 #[ tokio:: test]
484- async fn test_quorum_fast_prev_quorum ( ) {
485- let lighthouse = lighthouse_test_new ( ) ;
499+ async fn test_quorum_fast_prev_quorum ( ) -> Result < ( ) > {
500+ let lighthouse = lighthouse_test_new ( ) . await ? ;
486501 assert ! ( !lighthouse. quorum_valid( ) . await . 0 ) ;
487502
488503 {
@@ -520,23 +535,23 @@ mod tests {
520535 }
521536
522537 assert ! ( lighthouse. quorum_valid( ) . await . 0 ) ;
538+
539+ Ok ( ( ) )
523540 }
524541
525542 #[ tokio:: test]
526- async fn test_lighthouse_e2e ( ) {
543+ async fn test_lighthouse_e2e ( ) -> Result < ( ) > {
527544 let opt = LighthouseOpt {
528545 min_replicas : 1 ,
529- bind : "0.0.0.0:29510 " . to_string ( ) ,
546+ bind : "[::]:0 " . to_string ( ) ,
530547 join_timeout_ms : 1 ,
531548 quorum_tick_ms : 10 ,
532549 } ;
533- let lighthouse = Lighthouse :: new ( opt) ;
550+ let lighthouse = Lighthouse :: new ( opt) . await ? ;
534551
535552 let lighthouse_task = tokio:: spawn ( lighthouse. clone ( ) . run ( ) ) ;
536553
537- let mut client = lighthouse_client_new ( "http://localhost:29510" . to_string ( ) )
538- . await
539- . unwrap ( ) ;
554+ let mut client = lighthouse_client_new ( lighthouse. address ( ) ) . await . unwrap ( ) ;
540555
541556 {
542557 let request = tonic:: Request :: new ( LighthouseHeartbeatRequest {
@@ -563,6 +578,7 @@ mod tests {
563578 }
564579
565580 lighthouse_task. abort ( ) ;
581+ Ok ( ( ) )
566582 }
567583
568584 #[ tokio:: test]
0 commit comments