Skip to content

Commit 1bbdbf1

Browse files
committed
feat: add remote info and connection info
1 parent 0659765 commit 1bbdbf1

File tree

6 files changed

+256
-103
lines changed

6 files changed

+256
-103
lines changed

iroh/examples/transfer.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -522,13 +522,21 @@ fn parse_byte_size(s: &str) -> std::result::Result<u64, parse_size::Error> {
522522
}
523523

524524
fn watch_conn_type(endpoint: &Endpoint, endpoint_id: EndpointId) -> AbortOnDropHandle<()> {
525-
let mut stream = endpoint.conn_type(endpoint_id).unwrap().stream();
525+
let info = endpoint.remote_info(endpoint_id).unwrap();
526+
let mut stream = info.selected_path().stream();
526527
let task = tokio::task::spawn(async move {
527-
while let Some(conn_type) = stream.next().await {
528-
println!(
529-
"[{}] Connection type changed to: {conn_type}",
530-
endpoint_id.fmt_short()
531-
);
528+
while let Some(selected_path) = stream.next().await {
529+
if let Some(selected_path) = selected_path {
530+
let label = match selected_path {
531+
TransportAddr::Ip(addr) => format!("direct ({addr})"),
532+
TransportAddr::Relay(url) => format!("relay ({url})"),
533+
_ => format!("unknown transport"),
534+
};
535+
println!(
536+
"[{}] Connection type changed to: {label}",
537+
endpoint_id.fmt_short()
538+
);
539+
}
532540
}
533541
});
534542
AbortOnDropHandle::new(task)

iroh/src/endpoint.rs

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use url::Url;
2626

2727
pub use super::magicsock::{
2828
AddEndpointAddrError, ConnectionType, DirectAddr, DirectAddrType, PathInfo, PathsInfo,
29-
endpoint_map::Source,
29+
RemoteInfo, endpoint_map::Source,
3030
};
3131
#[cfg(wasm_browser)]
3232
use crate::discovery::pkarr::PkarrResolver;
@@ -55,8 +55,8 @@ pub use quinn::{
5555
AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream,
5656
ConnectionClose, ConnectionError, ConnectionStats, MtuDiscoveryConfig, OpenBi, OpenUni,
5757
ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError, RetryError,
58-
SendDatagramError, SendStream, ServerConfig, StoppedError, StreamId, TransportConfig, VarInt,
59-
WeakConnectionHandle, WriteError,
58+
SendDatagramError, SendStream, ServerConfig, Side, StoppedError, StreamId, TransportConfig,
59+
VarInt, WeakConnectionHandle, WriteError,
6060
};
6161
pub use quinn_proto::{
6262
FrameStats, PathStats, TransportError, TransportErrorCode, UdpStats, Written,
@@ -69,8 +69,8 @@ pub use quinn_proto::{
6969

7070
pub use self::connection::{
7171
Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
72-
Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection, RemoteEndpointIdError,
73-
ZeroRttStatus,
72+
ConnectionInfo, Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection,
73+
RemoteEndpointIdError, ZeroRttStatus,
7474
};
7575

7676
/// The delay to fall back to discovery when direct addresses fail.
@@ -98,7 +98,7 @@ pub enum PathSelection {
9898
/// new [`EndpointId`].
9999
///
100100
/// To create the [`Endpoint`] call [`Builder::bind`].
101-
#[derive(Debug)]
101+
#[derive(derive_more::Debug)]
102102
pub struct Builder {
103103
secret_key: Option<SecretKey>,
104104
relay_mode: RelayMode,
@@ -963,38 +963,24 @@ impl Endpoint {
963963
//
964964
// Partially they return things passed into the builder.
965965

966-
/// Returns a [`Watcher`] that reports the current connection type and any changes for
967-
/// given remote endpoint.
968-
///
969-
/// This watcher allows observing a stream of [`ConnectionType`] items by calling
970-
/// [`Watcher::stream()`]. If the underlying connection to a remote endpoint changes, it will
971-
/// yield a new item. These connection changes are when the connection switches between
972-
/// using the Relay server and a direct connection.
973-
///
974-
/// Note that this does not guarantee each connection change is yielded in the stream.
975-
/// If the connection type changes several times before this stream is polled, only the
976-
/// last recorded state is returned. This can be observed e.g. right at the start of a
977-
/// connection when the switch from a relayed to a direct connection can be so fast that
978-
/// the relayed state is never exposed.
966+
/// Information about a remote endpoint.
979967
///
980-
/// If there is currently a connection with the remote endpoint, then using [`Watcher::get`]
981-
/// will immediately return either [`ConnectionType::Relay`], [`ConnectionType::Direct`]
982-
/// or [`ConnectionType::Mixed`].
968+
/// From the [`RemoteInfo`] you can watch which path is selected, get the current
969+
/// round-trip time (latency), and get a list of [`ConnectionInfo`].
983970
///
984-
/// It is possible for the connection type to be [`ConnectionType::None`] if you've
985-
/// recently connected to this endpoint id but previous methods of reaching the endpoint have
986-
/// become inaccessible.
987-
///
988-
/// Will return `None` if we do not have any address information for the given `endpoint_id`.
989-
pub fn conn_type(&self, endpoint_id: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
990-
self.msock.conn_type(endpoint_id)
971+
/// Returns `None` if we don't have any state for this remote.
972+
pub fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
973+
self.msock.endpoint_map.remote_info(endpoint_id)
991974
}
992975

993-
/// Returns the currently lowest latency for this endpoint.
976+
/// Returns a list of all remote endpoints that this endpoint is dealing with.
977+
///
978+
/// This includes all endpoints to which we have active connections. It also may include endpoints
979+
/// to which we are in the process of connecting, or have recently been connected to.
994980
///
995-
/// Will return `None` if we do not have any address information for the given `endpoint_id`.
996-
pub async fn latency(&self, endpoint_id: EndpointId) -> Option<Duration> {
997-
self.msock.latency(endpoint_id).await
981+
/// TODO: Expand docs.
982+
pub fn remotes(&self) -> Vec<RemoteInfo> {
983+
self.msock.endpoint_map.remotes()
998984
}
999985

1000986
/// Returns the DNS resolver used in this [`Endpoint`].

iroh/src/endpoint/connection.rs

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use n0_watcher::{Watchable, Watcher};
3535
use pin_project::pin_project;
3636
use quinn::{
3737
AcceptBi, AcceptUni, ConnectionError, ConnectionStats, OpenBi, OpenUni, ReadDatagram,
38-
RetryError, SendDatagramError, ServerConfig, VarInt,
38+
RetryError, SendDatagramError, ServerConfig, Side, VarInt, WeakConnectionHandle,
3939
};
4040
use quinn_proto::PathId;
4141
use tracing::warn;
@@ -241,15 +241,16 @@ fn conn_from_quinn_conn(
241241
let alpn = alpn_from_quinn_conn(&conn).ok_or_else(|| e!(AuthenticationError::NoAlpn))?;
242242
let paths_info_watchable = init_paths_info_watcher(&conn, ep);
243243
let paths_info = paths_info_watchable.watch();
244-
// Register this connection with the magicsock.
245-
ep.msock
246-
.register_connection(remote_id, &conn, paths_info_watchable.clone());
247-
Ok(Connection {
244+
let conn = Connection {
248245
remote_id,
249246
alpn,
250247
inner: conn,
251248
paths_info,
252-
})
249+
};
250+
let info = conn.to_info();
251+
// Register this connection with the magicsock.
252+
ep.msock.register_connection(info, paths_info_watchable);
253+
Ok(conn)
253254
}
254255

255256
fn init_paths_info_watcher(conn: &quinn::Connection, ep: &Endpoint) -> Watchable<PathsInfo> {
@@ -1530,6 +1531,83 @@ impl Connection {
15301531
pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
15311532
self.inner.set_max_concurrent_bi_streams(count)
15321533
}
1534+
1535+
/// Returns the side of the connection (client or server).
1536+
pub fn side(&self) -> Side {
1537+
self.inner.side()
1538+
}
1539+
1540+
/// Returns a [`ConnectionInfo`], which is a weak handle to the connection
1541+
/// that does not keep the connection alive, but does allow to access some information
1542+
/// about the connection, and allows to wait for the connection to be closed.
1543+
pub fn to_info(&self) -> ConnectionInfo {
1544+
ConnectionInfo {
1545+
alpn: self.alpn.clone(),
1546+
remote_id: self.remote_id,
1547+
inner: self.inner.weak_handle(),
1548+
paths_info: self.paths_info.clone(),
1549+
side: self.side(),
1550+
}
1551+
}
1552+
}
1553+
1554+
/// A [`ConnectionInfo`] is a weak handle to a connection that exposes some information about the connection,
1555+
/// but does not keep the connection alive.
1556+
#[derive(Debug, Clone)]
1557+
pub struct ConnectionInfo {
1558+
pub(crate) side: Side,
1559+
pub(crate) alpn: Vec<u8>,
1560+
pub(crate) remote_id: EndpointId,
1561+
pub(crate) inner: WeakConnectionHandle,
1562+
pub(crate) paths_info: n0_watcher::Direct<PathsInfo>,
1563+
}
1564+
1565+
#[allow(missing_docs)]
1566+
impl ConnectionInfo {
1567+
pub fn alpn(&self) -> &[u8] {
1568+
&self.alpn
1569+
}
1570+
1571+
pub fn remote_id(&self) -> &EndpointId {
1572+
&self.remote_id
1573+
}
1574+
1575+
pub fn is_alive(&self) -> bool {
1576+
self.inner.upgrade().is_some()
1577+
}
1578+
1579+
/// Returns information about the network paths in use by this connection.
1580+
///
1581+
/// A connection can have several network paths to the remote endpoint, commonly there
1582+
/// will be a path via the relay server and a holepunched path. This returns all the
1583+
/// paths in use by this connection.
1584+
pub fn paths_info(&self) -> &impl Watcher<Value = PathsInfo> {
1585+
&self.paths_info
1586+
}
1587+
1588+
// We could add such util methods here, not sure.
1589+
pub fn has_direct_path(&mut self) -> bool {
1590+
self.paths_info.get().keys().any(|addr| addr.is_ip())
1591+
}
1592+
1593+
/// Current best estimate of this connection's latency (round-trip-time)
1594+
///
1595+
/// Returns `None` if the connection has been dropped.
1596+
pub fn rtt(&self) -> Option<Duration> {
1597+
self.inner.upgrade().map(|conn| conn.rtt())
1598+
}
1599+
1600+
/// Returns connection statistics.
1601+
///
1602+
/// Returns `None` if the connection has been dropped.
1603+
pub fn stats(&self) -> Option<ConnectionStats> {
1604+
self.inner.upgrade().map(|conn| conn.stats())
1605+
}
1606+
1607+
/// Returns the side of the connection (client or server).
1608+
pub fn side(&self) -> Side {
1609+
self.side
1610+
}
15331611
}
15341612

15351613
#[cfg(test)]

iroh/src/magicsock.rs

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ use crate::{
6363
defaults::timeouts::NET_REPORT_TIMEOUT,
6464
disco::{self, SendAddr},
6565
discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData},
66+
endpoint::ConnectionInfo,
6667
key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box},
6768
metrics::EndpointMetrics,
6869
net_report::{self, IfStateDetails, Report},
@@ -77,7 +78,7 @@ pub(crate) mod transports;
7778
use mapped_addrs::{EndpointIdMappedAddr, MappedAddr};
7879

7980
pub use self::{
80-
endpoint_map::{ConnectionType, PathInfo, PathsInfo},
81+
endpoint_map::{ConnectionType, PathInfo, PathsInfo, RemoteInfo},
8182
metrics::Metrics,
8283
};
8384

@@ -174,7 +175,7 @@ pub(crate) struct Handle {
174175
/// It is usually only necessary to use a single [`MagicSock`] instance in an application, it
175176
/// means any QUIC endpoints on top will be sharing as much information about endpoints as
176177
/// possible.
177-
#[derive(Debug)]
178+
#[derive(derive_more::Debug)]
178179
pub(crate) struct MagicSock {
179180
/// Channel to send to the internal actor.
180181
actor_sender: mpsc::Sender<ActorMessage>,
@@ -270,8 +271,7 @@ impl MagicSock {
270271
/// connection.
271272
pub(crate) fn register_connection(
272273
&self,
273-
remote: EndpointId,
274-
conn: &quinn::Connection,
274+
conn: ConnectionInfo,
275275
paths_info: n0_watcher::Watchable<PathsInfo>,
276276
) {
277277
// TODO: Spawning tasks like this is obviously bad. But it is solvable:
@@ -289,9 +289,8 @@ impl MagicSock {
289289
// have a ZrttConnection::into_connection() function which can be async and actually
290290
// send this. Before the handshake has completed we don't have anything useful to
291291
// do with this connection inside of the EndpointStateActor anyway.
292-
let weak_handle = conn.weak_handle();
293-
let endpoint_state = self.endpoint_map.endpoint_state_actor(remote);
294-
let msg = EndpointStateMessage::AddConnection(weak_handle, paths_info);
292+
let endpoint_state = self.endpoint_map.endpoint_state_actor(conn.remote_id);
293+
let msg = EndpointStateMessage::AddConnection(conn, paths_info);
295294

296295
task::spawn(async move {
297296
endpoint_state.send(msg).await.ok();
@@ -397,32 +396,6 @@ impl MagicSock {
397396
})
398397
}
399398

400-
/// Returns a [`n0_watcher::Direct`] that reports the [`ConnectionType`] we have to the
401-
/// given `endpoint_id`.
402-
///
403-
/// This gets us a copy of the [`n0_watcher::Direct`] for the [`Watchable`] with a
404-
/// [`ConnectionType`] that the `EndpointMap` stores for each `endpoint_id`'s endpoint.
405-
///
406-
/// # Errors
407-
///
408-
/// Will return `None` if there is no address information known about the
409-
/// given `endpoint_id`.
410-
pub(crate) fn conn_type(&self, eid: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
411-
self.endpoint_map.conn_type(eid)
412-
}
413-
414-
// TODO: Build better info to expose to the user about remote nodes. We probably want
415-
// to expose this as part of path information instead.
416-
pub(crate) async fn latency(&self, eid: EndpointId) -> Option<Duration> {
417-
let endpoint_state = self.endpoint_map.endpoint_state_actor(eid);
418-
let (tx, rx) = oneshot::channel();
419-
endpoint_state
420-
.send(EndpointStateMessage::Latency(tx))
421-
.await
422-
.ok();
423-
rx.await.unwrap_or_default()
424-
}
425-
426399
/// Returns the socket address which can be used by the QUIC layer to dial this endpoint.
427400
pub(crate) fn get_endpoint_mapped_addr(&self, eid: EndpointId) -> EndpointIdMappedAddr {
428401
self.endpoint_map.endpoint_mapped_addr(eid)

iroh/src/magicsock/endpoint_map.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ use super::{
1717
mapped_addrs::{AddrMap, EndpointIdMappedAddr, MultipathMappedAddr, RelayMappedAddr},
1818
transports::{self, OwnedTransmit, TransportsSender},
1919
};
20-
use crate::disco::{self};
20+
use crate::disco;
2121
// #[cfg(any(test, feature = "test-utils"))]
2222
// use crate::endpoint::PathSelection;
2323

2424
mod endpoint_state;
2525
mod path_state;
2626

2727
pub(super) use endpoint_state::EndpointStateMessage;
28-
pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo};
28+
pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo, RemoteInfo};
2929
use endpoint_state::{EndpointStateActor, EndpointStateHandle};
3030

3131
// TODO: use this
@@ -122,17 +122,23 @@ impl EndpointMap {
122122
}
123123
}
124124

125-
/// Returns a [`n0_watcher::Direct`] for given endpoint's [`ConnectionType`].
126-
///
127-
/// # Errors
128-
///
129-
/// Will return `None` if there is not an entry in the [`EndpointMap`] for
130-
/// the `endpoint_id`
131-
pub(super) fn conn_type(
132-
&self,
133-
_endpoint_id: EndpointId,
134-
) -> Option<n0_watcher::Direct<ConnectionType>> {
135-
todo!();
125+
pub(crate) fn remote_info(&self, eid: EndpointId) -> Option<RemoteInfo> {
126+
self.actor_handles
127+
.lock()
128+
.expect("poisoned")
129+
.get(&eid)
130+
.map(|handle| RemoteInfo::new(eid, handle.sender.clone(), handle.selected_path.watch()))
131+
}
132+
133+
pub(crate) fn remotes(&self) -> Vec<RemoteInfo> {
134+
self.actor_handles
135+
.lock()
136+
.expect("poisoned")
137+
.iter()
138+
.map(|(eid, handle)| {
139+
RemoteInfo::new(*eid, handle.sender.clone(), handle.selected_path.watch())
140+
})
141+
.collect()
136142
}
137143

138144
/// Returns the sender for the [`EndpointStateActor`].

0 commit comments

Comments
 (0)