Skip to content

Commit 8579601

Browse files
authored
Fixed the issue where quorum retry could not catch non-timeout errors. (#259)
* Fixed the issue where quorum retry could not catch non-timeout errors * remove log * update * fix * fix * add ut * avoid too small sleep_time * fix ut
1 parent af5a4ff commit 8579601

File tree

1 file changed

+141
-17
lines changed

1 file changed

+141
-17
lines changed

src/manager.rs

Lines changed: 141 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -252,34 +252,44 @@ impl Manager {
252252

253253
let result = tokio::time::timeout(timeout, client.quorum(request)).await;
254254

255+
let mut sleep_time = 100;
255256
match result {
256-
Ok(response) => {
257-
return response;
257+
Ok(Ok(response)) => {
258+
return Ok(response);
258259
}
259-
Err(e) => {
260+
Ok(Err(e)) => {
260261
info_with_replica!(
261262
self.replica_id,
262263
"lighthouse quorum failed. error: {}",
263264
e.to_string()
264265
);
266+
sleep_time = timeout.as_millis() as u64
267+
/ (std::cmp::max(self.quorum_retries + 1, 1 as i64)) as u64;
268+
sleep_time = std::cmp::max(100, sleep_time);
269+
}
270+
Err(e) => {
271+
info_with_replica!(
272+
self.replica_id,
273+
"lighthouse quorum timeout. error: {}",
274+
e.to_string()
275+
);
276+
}
277+
}
265278

266-
if retry_count == self.quorum_retries {
267-
return Err(Status::internal(format!(
268-
"lighthouse quorum failed after {} retries. error: {}",
269-
retry_count,
270-
e.to_string(),
271-
)));
272-
}
279+
if retry_count == self.quorum_retries {
280+
return Err(Status::internal(format!(
281+
"lighthouse quorum failed after {} retries.",
282+
retry_count,
283+
)));
284+
}
273285

274-
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
286+
tokio::time::sleep(Duration::from_millis(sleep_time)).await;
275287

276-
// Reset the client since the lighthouse server might have failed
277-
// If this also fails, consider increasing `connect_timeout`.
278-
let _ = self.create_lighthouse_client().await;
288+
// Reset the client since the lighthouse server might have failed
289+
// If this also fails, consider increasing `connect_timeout`.
290+
let _ = self.create_lighthouse_client().await;
279291

280-
retry_count += 1;
281-
}
282-
}
292+
retry_count += 1;
283293
}
284294
}
285295

@@ -607,6 +617,10 @@ fn compute_quorum_results(
607617
mod tests {
608618
use super::*;
609619
use crate::lighthouse::{Lighthouse, LighthouseOpt};
620+
use crate::torchftpb::lighthouse_service_server::{LighthouseService, LighthouseServiceServer};
621+
use crate::torchftpb::LighthouseHeartbeatResponse;
622+
use tokio::net::TcpListener;
623+
use tonic::codegen::tokio_stream::wrappers::TcpListenerStream;
610624

611625
async fn should_commit(group_rank: i64, should_commit: bool) -> Result<ShouldCommitResponse> {
612626
let mut client = manager_client_new(
@@ -1078,4 +1092,114 @@ mod tests {
10781092

10791093
Ok(())
10801094
}
1095+
1096+
#[derive(Default)]
1097+
pub struct MockLighthouse {
1098+
request_count: Arc<Mutex<usize>>,
1099+
fail_count: usize,
1100+
}
1101+
1102+
impl MockLighthouse {
1103+
pub fn new(fail_count: usize) -> Self {
1104+
Self {
1105+
request_count: Arc::new(Mutex::new(0)),
1106+
fail_count,
1107+
}
1108+
}
1109+
}
1110+
1111+
#[tonic::async_trait]
1112+
impl LighthouseService for MockLighthouse {
1113+
async fn quorum(
1114+
&self,
1115+
request: Request<LighthouseQuorumRequest>,
1116+
) -> Result<Response<LighthouseQuorumResponse>, Status> {
1117+
let mut count = self.request_count.lock().await;
1118+
*count += 1;
1119+
let current_count = *count;
1120+
1121+
if current_count <= self.fail_count {
1122+
return Err(Status::internal(format!(
1123+
"simulated failure (request {}/{})",
1124+
current_count, self.fail_count
1125+
)));
1126+
}
1127+
1128+
let requester = request.into_inner().requester.unwrap_or_default();
1129+
1130+
Ok(Response::new(LighthouseQuorumResponse {
1131+
quorum: Some(Quorum {
1132+
participants: vec![requester],
1133+
quorum_id: 1,
1134+
created: None,
1135+
}),
1136+
}))
1137+
}
1138+
1139+
async fn heartbeat(
1140+
&self,
1141+
_request: Request<LighthouseHeartbeatRequest>,
1142+
) -> Result<Response<LighthouseHeartbeatResponse>, Status> {
1143+
Ok(Response::new(LighthouseHeartbeatResponse {}))
1144+
}
1145+
}
1146+
1147+
pub async fn start_mock_lighthouse(
1148+
fail_count: usize,
1149+
) -> Result<String, Box<dyn std::error::Error>> {
1150+
let listener = TcpListener::bind("[::]:0").await?;
1151+
let addr = listener.local_addr()?;
1152+
let mock_service = LighthouseServiceServer::new(MockLighthouse::new(fail_count));
1153+
let incoming = TcpListenerStream::new(listener);
1154+
tokio::spawn(async move {
1155+
Server::builder()
1156+
.add_service(mock_service)
1157+
.serve_with_incoming(incoming)
1158+
.await
1159+
.expect("Mock Lighthouse server failed to start");
1160+
});
1161+
Ok(format!(
1162+
"http://{}:{}",
1163+
gethostname::gethostname().into_string().unwrap(),
1164+
addr.port()
1165+
))
1166+
}
1167+
1168+
#[tokio::test]
1169+
async fn test_get_quorum_when_lighthouse_down() -> Result<()> {
1170+
let addr = start_mock_lighthouse(1).await.unwrap();
1171+
1172+
let manager = Manager::new(
1173+
"rep_id".to_string(),
1174+
addr,
1175+
"localhost".to_string(),
1176+
"[::]:0".to_string(),
1177+
"store_addr".to_string(),
1178+
1,
1179+
Duration::from_millis(100),
1180+
Duration::from_secs(10),
1181+
1,
1182+
)
1183+
.await?;
1184+
let manager_fut = tokio::spawn(manager.clone().run());
1185+
1186+
let mut client = manager_client_new(manager.address(), Duration::from_secs(3)).await?;
1187+
1188+
let mut request = tonic::Request::new(ManagerQuorumRequest {
1189+
group_rank: 0,
1190+
step: 123,
1191+
checkpoint_metadata: "addr".to_string(),
1192+
shrink_only: false,
1193+
init_sync: true,
1194+
commit_failures: 3,
1195+
});
1196+
request.set_timeout(Duration::from_secs(3));
1197+
let resp = client.quorum(request).await?.into_inner();
1198+
1199+
manager_fut.abort();
1200+
1201+
assert_eq!(resp.quorum_id, 1);
1202+
1203+
Ok(())
1204+
}
10811205
}

0 commit comments

Comments
 (0)