Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion zebra-network/src/address_book/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ fn address_book_peer_order() {
fn reconnection_peers_skips_recently_updated_ip() {
// tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent:
// - `last_response`
test_reconnection_peers_skips_recently_updated_ip(true, MetaAddr::new_responded);
test_reconnection_peers_skips_recently_updated_ip(true, |addr| {
MetaAddr::new_responded(addr, None, None)
});

// tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent:
// - `last_attempt`
Expand Down
5 changes: 3 additions & 2 deletions zebra-network/src/address_book_peers/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
sync::{Arc, Mutex},
time::Instant,
time::{Duration, Instant},
};

use crate::{meta_addr::MetaAddr, AddressBookPeers, PeerSocketAddr};
Expand All @@ -25,8 +25,9 @@ impl MockAddressBookPeers {
/// Adds a peer to the mock address book.
pub fn add_peer(&mut self, peer: PeerSocketAddr) -> bool {
// The real add peer will use `MetaAddr::new_initial_peer` but we just want to get a `MetaAddr` for the mock.
let rtt = Duration::from_millis(100);
self.recently_live_peers.push(
MetaAddr::new_responded(peer).into_new_meta_addr(
MetaAddr::new_responded(peer, Some(rtt), None).into_new_meta_addr(
Instant::now(),
chrono::Utc::now()
.try_into()
Expand Down
101 changes: 98 additions & 3 deletions zebra-network/src/meta_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
cmp::{max, Ordering},
time::Instant,
time::{Duration, Instant},
};

use chrono::Utc;
Expand Down Expand Up @@ -189,6 +189,17 @@ pub struct MetaAddr {
/// See the [`MetaAddr::last_seen`] method for details.
last_response: Option<DateTime32>,

/// The last measured round-trip time (RTT) for this peer, if available.
///
/// This value is updated when the peer responds to a ping (Pong).
rtt: Option<Duration>,

/// The last time we sent a ping to this peer.
///
/// This value is updated each time a heartbeat ping is sent,
/// even if we never receive a response.
ping_sent_at: Option<Instant>,

/// The last time we tried to open an outbound connection to this peer.
///
/// See the [`MetaAddr::last_attempt`] method for details.
Expand Down Expand Up @@ -271,13 +282,25 @@ pub enum MetaAddrChange {
is_inbound: bool,
},

/// Updates an existing `MetaAddr` when we send a ping to a peer.
UpdatePingSent {
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "canonical_peer_addr_strategy()")
)]
addr: PeerSocketAddr,
ping_sent_at: Instant,
},

/// Updates an existing `MetaAddr` when a peer responds with a message.
UpdateResponded {
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "canonical_peer_addr_strategy()")
)]
addr: PeerSocketAddr,
rtt: Option<Duration>,
ping_sent_at: Option<Instant>,
},

/// Updates an existing `MetaAddr` when a peer fails.
Expand Down Expand Up @@ -320,6 +343,8 @@ impl MetaAddr {
services: Some(untrusted_services),
untrusted_last_seen: Some(untrusted_last_seen),
last_response: None,
rtt: None,
ping_sent_at: None,
last_attempt: None,
last_failure: None,
last_connection_state: NeverAttemptedGossiped,
Expand Down Expand Up @@ -369,6 +394,14 @@ impl MetaAddr {
}
}

/// Returns a [`MetaAddrChange::UpdatePingSent`] for a peer that we just sent a ping to.
pub fn new_ping_sent(addr: PeerSocketAddr, ping_sent_at: Instant) -> MetaAddrChange {
UpdatePingSent {
addr: canonical_peer_addr(*addr),
ping_sent_at,
}
}

/// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just
/// sent us a message.
///
Expand All @@ -380,9 +413,15 @@ impl MetaAddr {
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
/// state, or
/// - Zebra could advertise unreachable addresses to its own peers.
pub fn new_responded(addr: PeerSocketAddr) -> MetaAddrChange {
pub fn new_responded(
addr: PeerSocketAddr,
rtt: Option<Duration>,
ping_sent_at: Option<Instant>,
) -> MetaAddrChange {
UpdateResponded {
addr: canonical_peer_addr(*addr),
rtt,
ping_sent_at,
}
}

Expand Down Expand Up @@ -450,6 +489,16 @@ impl MetaAddr {
self.is_inbound
}

/// Returns the round-trip time (RTT) for this peer, if available.
pub fn rtt(&self) -> Option<Duration> {
self.rtt
}

/// Returns the time this peer was last pinged, if available.
pub fn ping_sent_at(&self) -> Option<Instant> {
self.ping_sent_at
}

/// Returns the unverified "last seen time" gossiped by the remote peer that
/// sent us this address.
///
Expand Down Expand Up @@ -691,6 +740,8 @@ impl MetaAddr {
untrusted_last_seen: Some(last_seen),
last_response: None,
// these fields aren't sent to the remote peer, but sanitize them anyway
rtt: None,
ping_sent_at: None,
last_attempt: None,
last_failure: None,
last_connection_state: NeverAttemptedGossiped,
Expand Down Expand Up @@ -719,6 +770,7 @@ impl MetaAddrChange {
| NewLocal { addr, .. }
| UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdatePingSent { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. }
| UpdateMisbehavior { addr, .. } => *addr,
Expand All @@ -736,6 +788,7 @@ impl MetaAddrChange {
| NewLocal { addr, .. }
| UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdatePingSent { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. }
| UpdateMisbehavior { addr, .. } => *addr = new_addr,
Expand All @@ -754,6 +807,7 @@ impl MetaAddrChange {
NewLocal { .. } => Some(PeerServices::NODE_NETWORK),
UpdateAttempt { .. } => None,
UpdateConnected { services, .. } => Some(*services),
UpdatePingSent { .. } => None,
UpdateResponded { .. } => None,
UpdateFailed { services, .. } => *services,
UpdateMisbehavior { .. } => None,
Expand All @@ -772,6 +826,7 @@ impl MetaAddrChange {
NewLocal { .. } => Some(now),
UpdateAttempt { .. }
| UpdateConnected { .. }
| UpdatePingSent { .. }
| UpdateResponded { .. }
| UpdateFailed { .. }
| UpdateMisbehavior { .. } => None,
Expand Down Expand Up @@ -806,6 +861,7 @@ impl MetaAddrChange {
// handshake time.
UpdateAttempt { .. } => Some(now),
UpdateConnected { .. }
| UpdatePingSent { .. }
| UpdateResponded { .. }
| UpdateFailed { .. }
| UpdateMisbehavior { .. } => None,
Expand All @@ -823,6 +879,31 @@ impl MetaAddrChange {
// reconnection attempts.
UpdateConnected { .. } | UpdateResponded { .. } => Some(now),
UpdateFailed { .. } | UpdateMisbehavior { .. } => None,
UpdatePingSent { .. } => None,
}
}

/// Return the timestamp when a ping was last sent, if available.
pub fn ping_sent(&self) -> Option<Instant> {
match self {
UpdatePingSent { ping_sent_at, .. } => Some(*ping_sent_at),
_ => None,
}
}

/// Return the RTT for this change, if available
pub fn rtt(&self) -> Option<Duration> {
match self {
UpdateResponded { rtt, .. } => *rtt,
_ => None,
}
}

/// Returns the timestamp when a ping was last sent, if available.
pub fn ping_sent_at(&self) -> Option<Instant> {
match self {
UpdateResponded { ping_sent_at, .. } => *ping_sent_at,
_ => None,
}
}

Expand All @@ -834,6 +915,7 @@ impl MetaAddrChange {
| NewLocal { .. }
| UpdateAttempt { .. }
| UpdateConnected { .. }
| UpdatePingSent { .. }
| UpdateResponded { .. } => None,
// If there is a large delay applying this change, then:
// - the peer might stay in the `AttemptPending` or `Responded`
Expand All @@ -852,7 +934,12 @@ impl MetaAddrChange {
// local listeners get sanitized, so the state doesn't matter here
NewLocal { .. } => NeverAttemptedGossiped,
UpdateAttempt { .. } => AttemptPending,
UpdateConnected { .. } | UpdateResponded { .. } | UpdateMisbehavior { .. } => Responded,
UpdateConnected { .. }
// Sending a ping is an interaction with a connected peer, but does not indicate new liveness.
// Peers stay in Responded once connected, so we keep them in that state for UpdatePingSent.
| UpdatePingSent { .. }
| UpdateResponded { .. }
| UpdateMisbehavior { .. } => Responded,
UpdateFailed { .. } => Failed,
}
}
Expand All @@ -864,6 +951,8 @@ impl MetaAddrChange {
services: self.untrusted_services(),
untrusted_last_seen: self.untrusted_last_seen(local_now),
last_response: self.last_response(local_now),
rtt: self.rtt(),
ping_sent_at: self.ping_sent_at(),
last_attempt: self.last_attempt(instant_now),
last_failure: self.last_failure(instant_now),
last_connection_state: self.peer_addr_state(),
Expand Down Expand Up @@ -907,6 +996,8 @@ impl MetaAddrChange {
services: self.untrusted_services(),
untrusted_last_seen: self.untrusted_last_seen(local_now),
last_response: self.last_response(local_now),
rtt: None,
ping_sent_at: None,
last_attempt: None,
last_failure: None,
last_connection_state: self.peer_addr_state(),
Expand Down Expand Up @@ -1061,6 +1152,8 @@ impl MetaAddrChange {
.or_else(|| self.untrusted_last_seen(local_now)),
// The peer has not been attempted, so these fields must be None
last_response: None,
rtt: None,
ping_sent_at: None,
last_attempt: None,
last_failure: None,
last_connection_state: self.peer_addr_state(),
Expand All @@ -1083,6 +1176,8 @@ impl MetaAddrChange {
// This is a wall clock time, but we already checked that responses are in order.
// Even if the wall clock time has jumped, we want to use the latest time.
last_response: self.last_response(local_now).or(previous.last_response),
rtt: self.rtt(),
ping_sent_at: self.ping_sent_at(),
// These are monotonic times, we already checked the responses are in order.
last_attempt: self.last_attempt(instant_now).or(previous.last_attempt),
last_failure: self.last_failure(instant_now).or(previous.last_failure),
Expand Down
41 changes: 33 additions & 8 deletions zebra-network/src/meta_addr/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Fixed test cases for MetaAddr and MetaAddrChange.

use std::time::Instant;
use std::time::{Duration, Instant};

use chrono::Utc;

Expand Down Expand Up @@ -33,6 +33,8 @@ fn sanitize_extremes() {
services: Default::default(),
untrusted_last_seen: Some(u32::MIN.into()),
last_response: Some(u32::MIN.into()),
rtt: Some(Duration::ZERO),
ping_sent_at: None,
last_attempt: None,
last_failure: None,
last_connection_state: Default::default(),
Expand All @@ -45,6 +47,8 @@ fn sanitize_extremes() {
services: Default::default(),
untrusted_last_seen: Some(u32::MAX.into()),
last_response: Some(u32::MAX.into()),
rtt: Some(Duration::ZERO),
ping_sent_at: None,
last_attempt: None,
last_failure: None,
last_connection_state: Default::default(),
Expand Down Expand Up @@ -154,7 +158,7 @@ fn recently_responded_peer_is_gossipable() {
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address)
let peer = MetaAddr::new_responded(address, None, None)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand All @@ -174,7 +178,7 @@ fn not_so_recently_responded_peer_is_still_gossipable() {
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let mut peer = MetaAddr::new_responded(address)
let mut peer = MetaAddr::new_responded(address, None, None)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -204,7 +208,7 @@ fn responded_long_ago_peer_is_not_gossipable() {
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let mut peer = MetaAddr::new_responded(address)
let mut peer = MetaAddr::new_responded(address, None, None)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -234,7 +238,7 @@ fn long_delayed_change_is_not_applied() {
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address)
let peer = MetaAddr::new_responded(address, None, None)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -277,7 +281,7 @@ fn later_revert_change_is_applied() {
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address)
let peer = MetaAddr::new_responded(address, None, None)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -319,7 +323,7 @@ fn concurrent_state_revert_change_is_not_applied() {
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address)
let peer = MetaAddr::new_responded(address, None, None)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -378,7 +382,7 @@ fn concurrent_state_progress_change_is_applied() {
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address)
let peer = MetaAddr::new_responded(address, None, None)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -423,3 +427,24 @@ fn concurrent_state_progress_change_is_applied() {
peer: {peer:?}"
);
}

#[test]
fn rtt_is_stored_correctly_in_meta_addr() {
let _init_guard = zebra_test::init();

let instant_now = Instant::now();
let chrono_now = Utc::now();
let local_now: DateTime32 = chrono_now.try_into().expect("will succeed until 2038");

let address = PeerSocketAddr::from(([192, 168, 180, 9], 10_000));
let peer_seed = MetaAddr::new_initial_peer(address).into_new_meta_addr(instant_now, local_now);

let rtt = Duration::from_millis(128);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address, Some(rtt), Some(instant_now))
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

assert_eq!(peer.rtt, Some(rtt));
}
Loading
Loading