Skip to content

Commit f5d11fc

Browse files
committed
feat(handshake): add ping latency tracking to MetaAddr
1 parent 1640e0b commit f5d11fc

File tree

5 files changed

+95
-39
lines changed

5 files changed

+95
-39
lines changed

zebra-network/src/address_book/tests/vectors.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Fixed test vectors for the address book.
22
3-
use std::time::Instant;
3+
use std::time::{Duration, Instant};
44

55
use chrono::Utc;
66
use tracing::Span;
@@ -128,7 +128,9 @@ fn address_book_peer_order() {
128128
fn reconnection_peers_skips_recently_updated_ip() {
129129
// tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent:
130130
// - `last_response`
131-
test_reconnection_peers_skips_recently_updated_ip(true, MetaAddr::new_responded);
131+
test_reconnection_peers_skips_recently_updated_ip(true, |addr| {
132+
MetaAddr::new_responded(addr, Duration::ZERO)
133+
});
132134

133135
// tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent:
134136
// - `last_attempt`

zebra-network/src/address_book_peers/mock.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,35 @@
22
33
use std::{
44
sync::{Arc, Mutex},
5-
time::Instant,
5+
time::{Duration, Instant},
66
};
77

88
use crate::{meta_addr::MetaAddr, AddressBookPeers, PeerSocketAddr};
99

10+
const DEFAULT_RTT: Duration = Duration::from_millis(100);
11+
1012
/// A mock [`AddressBookPeers`] implementation that's always empty.
1113
#[derive(Debug, Default, Clone)]
1214
pub struct MockAddressBookPeers {
1315
/// Return value for mock `recently_live_peers` method.
1416
recently_live_peers: Vec<MetaAddr>,
17+
default_rtt: Duration,
1518
}
1619

1720
impl MockAddressBookPeers {
1821
/// Creates a new [`MockAddressBookPeers`]
1922
pub fn new(recently_live_peers: Vec<MetaAddr>) -> Self {
2023
Self {
2124
recently_live_peers,
25+
default_rtt: DEFAULT_RTT,
2226
}
2327
}
2428

2529
/// Adds a peer to the mock address book.
2630
pub fn add_peer(&mut self, peer: PeerSocketAddr) -> bool {
2731
// The real add peer will use `MetaAddr::new_initial_peer` but we just want to get a `MetaAddr` for the mock.
2832
self.recently_live_peers.push(
29-
MetaAddr::new_responded(peer).into_new_meta_addr(
33+
MetaAddr::new_responded(peer, self.default_rtt).into_new_meta_addr(
3034
Instant::now(),
3135
chrono::Utc::now()
3236
.try_into()

zebra-network/src/meta_addr.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use std::{
44
cmp::{max, Ordering},
5-
time::Instant,
5+
time::{Duration, Instant},
66
};
77

88
use chrono::Utc;
@@ -189,6 +189,12 @@ pub struct MetaAddr {
189189
/// See the [`MetaAddr::last_seen`] method for details.
190190
last_response: Option<DateTime32>,
191191

192+
/// The last measured round-trip time (RTT) for this peer, if available.
193+
///
194+
/// This value is updated when the peer responds to a ping (Pong).
195+
#[allow(dead_code)]
196+
pub(crate) rtt: Option<Duration>,
197+
192198
/// The last time we tried to open an outbound connection to this peer.
193199
///
194200
/// See the [`MetaAddr::last_attempt`] method for details.
@@ -278,6 +284,7 @@ pub enum MetaAddrChange {
278284
proptest(strategy = "canonical_peer_addr_strategy()")
279285
)]
280286
addr: PeerSocketAddr,
287+
rtt: Option<Duration>,
281288
},
282289

283290
/// Updates an existing `MetaAddr` when a peer fails.
@@ -320,6 +327,7 @@ impl MetaAddr {
320327
services: Some(untrusted_services),
321328
untrusted_last_seen: Some(untrusted_last_seen),
322329
last_response: None,
330+
rtt: None,
323331
last_attempt: None,
324332
last_failure: None,
325333
last_connection_state: NeverAttemptedGossiped,
@@ -380,9 +388,10 @@ impl MetaAddr {
380388
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
381389
/// state, or
382390
/// - Zebra could advertise unreachable addresses to its own peers.
383-
pub fn new_responded(addr: PeerSocketAddr) -> MetaAddrChange {
391+
pub fn new_responded(addr: PeerSocketAddr, rtt: Duration) -> MetaAddrChange {
384392
UpdateResponded {
385393
addr: canonical_peer_addr(*addr),
394+
rtt: Some(rtt),
386395
}
387396
}
388397

@@ -691,6 +700,7 @@ impl MetaAddr {
691700
untrusted_last_seen: Some(last_seen),
692701
last_response: None,
693702
// these fields aren't sent to the remote peer, but sanitize them anyway
703+
rtt: None,
694704
last_attempt: None,
695705
last_failure: None,
696706
last_connection_state: NeverAttemptedGossiped,
@@ -826,6 +836,14 @@ impl MetaAddrChange {
826836
}
827837
}
828838

839+
/// Return the RTT for this change, if available.
840+
pub fn rtt(&self) -> Option<Duration> {
841+
match self {
842+
UpdateResponded { rtt, .. } => *rtt,
843+
_ => None,
844+
}
845+
}
846+
829847
/// Return the last failure for this change, if available.
830848
pub fn last_failure(&self, now: Instant) -> Option<Instant> {
831849
match self {
@@ -864,6 +882,7 @@ impl MetaAddrChange {
864882
services: self.untrusted_services(),
865883
untrusted_last_seen: self.untrusted_last_seen(local_now),
866884
last_response: self.last_response(local_now),
885+
rtt: self.rtt(),
867886
last_attempt: self.last_attempt(instant_now),
868887
last_failure: self.last_failure(instant_now),
869888
last_connection_state: self.peer_addr_state(),
@@ -907,6 +926,7 @@ impl MetaAddrChange {
907926
services: self.untrusted_services(),
908927
untrusted_last_seen: self.untrusted_last_seen(local_now),
909928
last_response: self.last_response(local_now),
929+
rtt: None,
910930
last_attempt: None,
911931
last_failure: None,
912932
last_connection_state: self.peer_addr_state(),
@@ -1061,6 +1081,7 @@ impl MetaAddrChange {
10611081
.or_else(|| self.untrusted_last_seen(local_now)),
10621082
// The peer has not been attempted, so these fields must be None
10631083
last_response: None,
1084+
rtt: None,
10641085
last_attempt: None,
10651086
last_failure: None,
10661087
last_connection_state: self.peer_addr_state(),
@@ -1083,6 +1104,7 @@ impl MetaAddrChange {
10831104
// This is a wall clock time, but we already checked that responses are in order.
10841105
// Even if the wall clock time has jumped, we want to use the latest time.
10851106
last_response: self.last_response(local_now).or(previous.last_response),
1107+
rtt: self.rtt(),
10861108
// These are monotonic times, we already checked the responses are in order.
10871109
last_attempt: self.last_attempt(instant_now).or(previous.last_attempt),
10881110
last_failure: self.last_failure(instant_now).or(previous.last_failure),

zebra-network/src/meta_addr/tests/vectors.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Fixed test cases for MetaAddr and MetaAddrChange.
22
3-
use std::time::Instant;
3+
use std::time::{Duration, Instant};
44

55
use chrono::Utc;
66

@@ -33,6 +33,7 @@ fn sanitize_extremes() {
3333
services: Default::default(),
3434
untrusted_last_seen: Some(u32::MIN.into()),
3535
last_response: Some(u32::MIN.into()),
36+
rtt: Some(Duration::ZERO),
3637
last_attempt: None,
3738
last_failure: None,
3839
last_connection_state: Default::default(),
@@ -45,6 +46,7 @@ fn sanitize_extremes() {
4546
services: Default::default(),
4647
untrusted_last_seen: Some(u32::MAX.into()),
4748
last_response: Some(u32::MAX.into()),
49+
rtt: Some(Duration::ZERO),
4850
last_attempt: None,
4951
last_failure: None,
5052
last_connection_state: Default::default(),
@@ -154,7 +156,7 @@ fn recently_responded_peer_is_gossipable() {
154156
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
155157

156158
// Create a peer that has responded
157-
let peer = MetaAddr::new_responded(address)
159+
let peer = MetaAddr::new_responded(address, Duration::ZERO)
158160
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
159161
.expect("Failed to create MetaAddr for responded peer");
160162

@@ -174,7 +176,7 @@ fn not_so_recently_responded_peer_is_still_gossipable() {
174176
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
175177

176178
// Create a peer that has responded
177-
let mut peer = MetaAddr::new_responded(address)
179+
let mut peer = MetaAddr::new_responded(address, Duration::ZERO)
178180
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
179181
.expect("Failed to create MetaAddr for responded peer");
180182

@@ -204,7 +206,7 @@ fn responded_long_ago_peer_is_not_gossipable() {
204206
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
205207

206208
// Create a peer that has responded
207-
let mut peer = MetaAddr::new_responded(address)
209+
let mut peer = MetaAddr::new_responded(address, Duration::ZERO)
208210
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
209211
.expect("Failed to create MetaAddr for responded peer");
210212

@@ -234,7 +236,7 @@ fn long_delayed_change_is_not_applied() {
234236
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
235237

236238
// Create a peer that has responded
237-
let peer = MetaAddr::new_responded(address)
239+
let peer = MetaAddr::new_responded(address, Duration::ZERO)
238240
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
239241
.expect("Failed to create MetaAddr for responded peer");
240242

@@ -277,7 +279,7 @@ fn later_revert_change_is_applied() {
277279
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
278280

279281
// Create a peer that has responded
280-
let peer = MetaAddr::new_responded(address)
282+
let peer = MetaAddr::new_responded(address, Duration::ZERO)
281283
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
282284
.expect("Failed to create MetaAddr for responded peer");
283285

@@ -319,7 +321,7 @@ fn concurrent_state_revert_change_is_not_applied() {
319321
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
320322

321323
// Create a peer that has responded
322-
let peer = MetaAddr::new_responded(address)
324+
let peer = MetaAddr::new_responded(address, Duration::ZERO)
323325
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
324326
.expect("Failed to create MetaAddr for responded peer");
325327

@@ -378,7 +380,7 @@ fn concurrent_state_progress_change_is_applied() {
378380
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
379381

380382
// Create a peer that has responded
381-
let peer = MetaAddr::new_responded(address)
383+
let peer = MetaAddr::new_responded(address, Duration::ZERO)
382384
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
383385
.expect("Failed to create MetaAddr for responded peer");
384386

@@ -423,3 +425,24 @@ fn concurrent_state_progress_change_is_applied() {
423425
peer: {peer:?}"
424426
);
425427
}
428+
429+
#[test]
430+
fn rtt_is_stored_correctly_in_meta_addr() {
431+
let _init_guard = zebra_test::init();
432+
433+
let instant_now = Instant::now();
434+
let chrono_now = Utc::now();
435+
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");
436+
437+
let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
438+
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);
439+
440+
let rtt = Duration::from_millis(128);
441+
442+
// Create a peer that has responded
443+
let peer = MetaAddr::new_responded(address, rtt)
444+
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
445+
.expect("Failed to create MetaAddr for responded peer");
446+
447+
assert_eq!(peer.rtt, Some(rtt));
448+
}

zebra-network/src/peer/handshake.rs

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
pin::Pin,
1010
sync::Arc,
1111
task::{Context, Poll},
12+
time::Duration,
1213
};
1314

1415
use chrono::{TimeZone, Utc};
@@ -1313,20 +1314,22 @@ async fn send_periodic_heartbeats_run_loop(
13131314
// We've reached another heartbeat interval without
13141315
// shutting down, so do a heartbeat request.
13151316
let heartbeat = send_one_heartbeat(&mut server_tx);
1316-
heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;
1317+
let rtt = heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;
13171318

1318-
// # Security
1319-
//
1320-
// Peer heartbeats are rate-limited because:
1321-
// - opening connections is rate-limited
1322-
// - the number of connections is limited
1323-
// - Zebra initiates each heartbeat using a timer
1324-
if let Some(book_addr) = connected_addr.get_address_book_addr() {
1325-
// the collector doesn't depend on network activity,
1326-
// so this await should not hang
1327-
let _ = heartbeat_ts_collector
1328-
.send(MetaAddr::new_responded(book_addr))
1329-
.await;
1319+
if let Some(rtt) = rtt {
1320+
// # Security
1321+
//
1322+
// Peer heartbeats are rate-limited because:
1323+
// - opening connections is rate-limited
1324+
// - the number of connections is limited
1325+
// - Zebra initiates each heartbeat using a timer
1326+
if let Some(book_addr) = connected_addr.get_address_book_addr() {
1327+
// the collector doesn't depend on network activity,
1328+
// so this await should not hang
1329+
let _ = heartbeat_ts_collector
1330+
.send(MetaAddr::new_responded(book_addr, rtt))
1331+
.await;
1332+
}
13301333
}
13311334
}
13321335

@@ -1336,7 +1339,7 @@ async fn send_periodic_heartbeats_run_loop(
13361339
/// Send one heartbeat using `server_tx`.
13371340
async fn send_one_heartbeat(
13381341
server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
1339-
) -> Result<(), BoxError> {
1342+
) -> Result<Response, BoxError> {
13401343
// We just reached a heartbeat interval, so start sending
13411344
// a heartbeat.
13421345
let (tx, rx) = oneshot::channel();
@@ -1376,23 +1379,20 @@ async fn send_one_heartbeat(
13761379
// Heartbeats are checked internally to the
13771380
// connection logic, but we need to wait on the
13781381
// response to avoid canceling the request.
1379-
rx.await??;
1380-
tracing::trace!("got heartbeat response");
1382+
let response = rx.await??;
1383+
tracing::trace!(?response, "got heartbeat response");
13811384

1382-
Ok(())
1385+
Ok(response)
13831386
}
13841387

13851388
/// Wrap `fut` in a timeout, handing any inner or outer errors using
13861389
/// `handle_heartbeat_error`.
1387-
async fn heartbeat_timeout<F, T>(
1388-
fut: F,
1390+
async fn heartbeat_timeout(
1391+
fut: impl Future<Output = Result<Response, BoxError>>,
13891392
address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
13901393
connected_addr: &ConnectedAddr,
1391-
) -> Result<T, BoxError>
1392-
where
1393-
F: Future<Output = Result<T, BoxError>>,
1394-
{
1395-
let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
1394+
) -> Result<Option<Duration>, BoxError> {
1395+
let response = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
13961396
Ok(inner_result) => {
13971397
handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
13981398
}
@@ -1401,7 +1401,12 @@ where
14011401
}
14021402
};
14031403

1404-
Ok(t)
1404+
let rtt = match response {
1405+
Response::Pong(rtt) => Some(rtt),
1406+
_ => None,
1407+
};
1408+
1409+
Ok(rtt)
14051410
}
14061411

14071412
/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.

0 commit comments

Comments
 (0)