Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,21 @@ fn parse_byte_size(s: &str) -> std::result::Result<u64, parse_size::Error> {
}

fn watch_conn_type(endpoint: &Endpoint, endpoint_id: EndpointId) -> AbortOnDropHandle<()> {
let mut stream = endpoint.conn_type(endpoint_id).unwrap().stream();
let info = endpoint.remote_info(endpoint_id).unwrap();
let mut stream = info.selected_path().stream();
let task = tokio::task::spawn(async move {
while let Some(conn_type) = stream.next().await {
println!(
"[{}] Connection type changed to: {conn_type}",
endpoint_id.fmt_short()
);
// Each stream item is an optional selected path; items without a path are skipped
// so that only changes carrying an actual transport address are printed.
while let Some(selected_path) = stream.next().await {
    if let Some(path) = selected_path {
        let label = match path {
            TransportAddr::Ip(addr) => format!("direct ({addr})"),
            TransportAddr::Relay(url) => format!("relay ({url})"),
            // No interpolation needed here, so avoid the formatting machinery.
            _ => String::from("unknown transport"),
        };
        println!(
            "[{}] Connection type changed to: {label}",
            endpoint_id.fmt_short()
        );
    }
}
});
AbortOnDropHandle::new(task)
Expand Down
52 changes: 19 additions & 33 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use url::Url;

pub use super::magicsock::{
AddEndpointAddrError, ConnectionType, DirectAddr, DirectAddrType, PathInfo, PathsInfo,
endpoint_map::Source,
RemoteInfo, endpoint_map::Source,
};
#[cfg(wasm_browser)]
use crate::discovery::pkarr::PkarrResolver;
Expand Down Expand Up @@ -55,8 +55,8 @@ pub use quinn::{
AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream,
ConnectionClose, ConnectionError, ConnectionStats, MtuDiscoveryConfig, OpenBi, OpenUni,
ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError, RetryError,
SendDatagramError, SendStream, ServerConfig, StoppedError, StreamId, TransportConfig, VarInt,
WeakConnectionHandle, WriteError,
SendDatagramError, SendStream, ServerConfig, Side, StoppedError, StreamId, TransportConfig,
VarInt, WeakConnectionHandle, WriteError,
};
pub use quinn_proto::{
FrameStats, PathStats, TransportError, TransportErrorCode, UdpStats, Written,
Expand All @@ -69,8 +69,8 @@ pub use quinn_proto::{

pub use self::connection::{
Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection, RemoteEndpointIdError,
ZeroRttStatus,
ConnectionInfo, Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection,
RemoteEndpointIdError, ZeroRttStatus,
};

/// The delay to fall back to discovery when direct addresses fail.
Expand Down Expand Up @@ -98,7 +98,7 @@ pub enum PathSelection {
/// new [`EndpointId`].
///
/// To create the [`Endpoint`] call [`Builder::bind`].
#[derive(Debug)]
#[derive(derive_more::Debug)]
pub struct Builder {
secret_key: Option<SecretKey>,
relay_mode: RelayMode,
Expand Down Expand Up @@ -963,38 +963,24 @@ impl Endpoint {
//
// Partially they return things passed into the builder.

/// Returns a [`Watcher`] that reports the current connection type and any changes for
/// given remote endpoint.
///
/// This watcher allows observing a stream of [`ConnectionType`] items by calling
/// [`Watcher::stream()`]. If the underlying connection to a remote endpoint changes, it will
/// yield a new item. These connection changes are when the connection switches between
/// using the Relay server and a direct connection.
///
/// Note that this does not guarantee each connection change is yielded in the stream.
/// If the connection type changes several times before this stream is polled, only the
/// last recorded state is returned. This can be observed e.g. right at the start of a
/// connection when the switch from a relayed to a direct connection can be so fast that
/// the relayed state is never exposed.
/// Information about a remote endpoint.
///
/// If there is currently a connection with the remote endpoint, then using [`Watcher::get`]
/// will immediately return either [`ConnectionType::Relay`], [`ConnectionType::Direct`]
/// or [`ConnectionType::Mixed`].
/// From the [`RemoteInfo`] you can watch which path is selected, get the current
/// round-trip time (latency), and get a list of [`ConnectionInfo`].
///
/// It is possible for the connection type to be [`ConnectionType::None`] if you've
/// recently connected to this endpoint id but previous methods of reaching the endpoint have
/// become inaccessible.
///
/// Will return `None` if we do not have any address information for the given `endpoint_id`.
pub fn conn_type(&self, endpoint_id: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
self.msock.conn_type(endpoint_id)
/// Returns `None` if we don't have any state for this remote.
pub fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
self.msock.endpoint_map.remote_info(endpoint_id)
}

/// Returns the currently lowest latency for this endpoint.
/// Returns a list of all remote endpoints that this endpoint is dealing with.
///
/// This includes all endpoints to which we have active connections. It also may include endpoints
/// to which we are in the process of connecting, or have recently been connected to.
///
/// Will return `None` if we do not have any address information for the given `endpoint_id`.
pub async fn latency(&self, endpoint_id: EndpointId) -> Option<Duration> {
self.msock.latency(endpoint_id).await
/// TODO: Expand docs.
pub fn remotes(&self) -> Vec<RemoteInfo> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider making this impl Iterator, it means we don't have to allocate a vec (even if we currently do internally). E.g. if you have a busy server this could be thousands really.

self.msock.endpoint_map.remotes()
}

/// Returns the DNS resolver used in this [`Endpoint`].
Expand Down
90 changes: 84 additions & 6 deletions iroh/src/endpoint/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use n0_watcher::{Watchable, Watcher};
use pin_project::pin_project;
use quinn::{
AcceptBi, AcceptUni, ConnectionError, ConnectionStats, OpenBi, OpenUni, ReadDatagram,
RetryError, SendDatagramError, ServerConfig, VarInt,
RetryError, SendDatagramError, ServerConfig, Side, VarInt, WeakConnectionHandle,
};
use quinn_proto::PathId;
use tracing::warn;
Expand Down Expand Up @@ -241,15 +241,16 @@ fn conn_from_quinn_conn(
let alpn = alpn_from_quinn_conn(&conn).ok_or_else(|| e!(AuthenticationError::NoAlpn))?;
let paths_info_watchable = init_paths_info_watcher(&conn, ep);
let paths_info = paths_info_watchable.watch();
// Register this connection with the magicsock.
ep.msock
.register_connection(remote_id, &conn, paths_info_watchable.clone());
Ok(Connection {
let conn = Connection {
remote_id,
alpn,
inner: conn,
paths_info,
})
};
let info = conn.to_info();
// Register this connection with the magicsock.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this comment now, so we look a bit less like an AI 😅

ep.msock.register_connection(info, paths_info_watchable);
Ok(conn)
}

fn init_paths_info_watcher(conn: &quinn::Connection, ep: &Endpoint) -> Watchable<PathsInfo> {
Expand Down Expand Up @@ -1530,6 +1531,83 @@ impl Connection {
pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
self.inner.set_max_concurrent_bi_streams(count)
}

/// Returns the side of the connection (client or server).
///
/// The value is read from the underlying QUIC connection on every call.
// NOTE(review): presumably `Side::Client` is the endpoint that initiated the connection
// and `Side::Server` the one that accepted it -- confirm against the quinn docs before
// expanding the user-facing documentation.
pub fn side(&self) -> Side {
self.inner.side()
}
Comment on lines +1535 to +1538
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks for adding this!

I'd write a lot more docs, like explaining why on the QUIC-level this is a thing and why you might want to know for a protocol despite that iroh always allows you to be both. Basically assume you get a user landing from hn or reddit and exploring iroh for the first time without having a clue.


/// Returns a [`ConnectionInfo`], a weak handle to this connection.
///
/// The handle does not keep the connection alive, but it still gives access to some
/// information about the connection and allows waiting for the connection to be closed.
Comment on lines +1540 to +1542
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a stickler for following the rust doc style guide strictly :)

So re-phrasing this so you have a 1-line summary followed by a blank line and longer explanation will focus you on writing meaningful words in that one line, because that's the line you'll see on rustdoc summary pages and will most often read from ide help.

(I'll only leave this comment once)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully. Haven't done docs properly at all yet, wanna settle the approach first.

pub fn to_info(&self) -> ConnectionInfo {
    // Only a weak handle to the underlying connection is stored in the info.
    let inner = self.inner.weak_handle();
    let side = self.side();
    let alpn = self.alpn.clone();
    let paths_info = self.paths_info.clone();
    ConnectionInfo {
        side,
        alpn,
        remote_id: self.remote_id,
        inner,
        paths_info,
    }
}
}

/// A weak handle to a connection.
///
/// A [`ConnectionInfo`] exposes some information about the connection, but does not keep
/// the connection alive.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
/// Side of the connection (client or server), stored when the info is created.
pub(crate) side: Side,
/// ALPN bytes of the connection.
pub(crate) alpn: Vec<u8>,
/// Id of the remote endpoint of the connection.
pub(crate) remote_id: EndpointId,
/// Weak handle to the underlying connection; upgrading it yields `None` once the
/// connection has been dropped.
pub(crate) inner: WeakConnectionHandle,
/// Watcher for the [`PathsInfo`] of the connection.
pub(crate) paths_info: n0_watcher::Direct<PathsInfo>,
}

#[allow(missing_docs)]
impl ConnectionInfo {
/// Returns the ALPN bytes stored in this info.
pub fn alpn(&self) -> &[u8] {
&self.alpn
}

/// Returns the [`EndpointId`] of the remote endpoint stored in this info.
pub fn remote_id(&self) -> &EndpointId {
&self.remote_id
}

/// Returns `true` if the weak handle to the connection can still be upgraded.
///
/// The upgraded handle is only used for this check and is dropped before returning, so
/// calling this does not keep the connection alive.
pub fn is_alive(&self) -> bool {
self.inner.upgrade().is_some()
}

/// Returns a [`Watcher`] for the network paths in use by this connection.
///
/// A connection can have several network paths to the remote endpoint, commonly there
/// will be a path via the relay server and a holepunched path. The watched
/// [`PathsInfo`] value contains all the paths in use by this connection.
pub fn paths_info(&self) -> &impl Watcher<Value = PathsInfo> {
&self.paths_info
}

/// Returns `true` if any path in the current [`PathsInfo`] is keyed by an IP address.
///
/// The current value is read from the paths watcher on every call.
// We could add such util methods here, not sure.
// NOTE(review): `&mut self` is presumably required by the watcher's `get` -- confirm.
pub fn has_direct_path(&mut self) -> bool {
self.paths_info.get().keys().any(|addr| addr.is_ip())
}
Comment on lines +1588 to +1591
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think selected_path like RemoteInfo has is a better idea for now.


/// Current best estimate of this connection's latency (round-trip-time)
///
/// Returns `None` if the connection has been dropped.
pub fn rtt(&self) -> Option<Duration> {
self.inner.upgrade().map(|conn| conn.rtt())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where you are getting this value from is problematic currently. That's a known issue on the multipath branch that this is kinda bogus.

Probably should take the rtt of the selected path. If getting that isn't too expensive here. It would be nice to expose probably.

}

/// Returns connection statistics.
///
/// Returns `None` if the connection has been dropped.
pub fn stats(&self) -> Option<ConnectionStats> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue I have with ConnectionStats is that this includes PathStats indexed by PathId. I'm not really sure I want to expose PathIds to our users. We also need to newtype this anyway... so perhaps the first version can just drop PathStats? The more complete version would be to change that ConnectionStats::paths to be a map keyed by TransportAddr, but that might again be too much/expensive. Maybe if you want those you should go to the paths themselves? Not sure.

self.inner.upgrade().map(|conn| conn.stats())
}

/// Returns the side of the connection (client or server).
///
/// This returns the value stored in this info and does not need to upgrade the weak
/// connection handle.
pub fn side(&self) -> Side {
self.side
}
}

#[cfg(test)]
Expand Down
39 changes: 6 additions & 33 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use crate::{
defaults::timeouts::NET_REPORT_TIMEOUT,
disco::{self, SendAddr},
discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData},
endpoint::ConnectionInfo,
key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box},
metrics::EndpointMetrics,
net_report::{self, IfStateDetails, Report},
Expand All @@ -77,7 +78,7 @@ pub(crate) mod transports;
use mapped_addrs::{EndpointIdMappedAddr, MappedAddr};

pub use self::{
endpoint_map::{ConnectionType, PathInfo, PathsInfo},
endpoint_map::{ConnectionType, PathInfo, PathsInfo, RemoteInfo},
metrics::Metrics,
};

Expand Down Expand Up @@ -174,7 +175,7 @@ pub(crate) struct Handle {
/// It is usually only necessary to use a single [`MagicSock`] instance in an application, it
/// means any QUIC endpoints on top will be sharing as much information about endpoints as
/// possible.
#[derive(Debug)]
#[derive(derive_more::Debug)]
pub(crate) struct MagicSock {
/// Channel to send to the internal actor.
actor_sender: mpsc::Sender<ActorMessage>,
Expand Down Expand Up @@ -270,8 +271,7 @@ impl MagicSock {
/// connection.
pub(crate) fn register_connection(
&self,
remote: EndpointId,
conn: &quinn::Connection,
conn: ConnectionInfo,
paths_info: n0_watcher::Watchable<PathsInfo>,
) {
// TODO: Spawning tasks like this is obviously bad. But it is solvable:
Expand All @@ -289,9 +289,8 @@ impl MagicSock {
// have a ZrttConnection::into_connection() function which can be async and actually
// send this. Before the handshake has completed we don't have anything useful to
// do with this connection inside of the EndpointStateActor anyway.
let weak_handle = conn.weak_handle();
let endpoint_state = self.endpoint_map.endpoint_state_actor(remote);
let msg = EndpointStateMessage::AddConnection(weak_handle, paths_info);
let endpoint_state = self.endpoint_map.endpoint_state_actor(conn.remote_id);
let msg = EndpointStateMessage::AddConnection(conn, paths_info);

task::spawn(async move {
endpoint_state.send(msg).await.ok();
Expand Down Expand Up @@ -397,32 +396,6 @@ impl MagicSock {
})
}

/// Returns a [`n0_watcher::Direct`] that reports the [`ConnectionType`] we have to the
/// given `endpoint_id`.
///
/// This gets us a copy of the [`n0_watcher::Direct`] for the [`Watchable`] with a
/// [`ConnectionType`] that the `EndpointMap` stores for each `endpoint_id`'s endpoint.
///
/// # Errors
///
/// Will return `None` if there is no address information known about the
/// given `endpoint_id`.
pub(crate) fn conn_type(&self, eid: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
self.endpoint_map.conn_type(eid)
}

// TODO: Build better info to expose to the user about remote nodes. We probably want
// to expose this as part of path information instead.
pub(crate) async fn latency(&self, eid: EndpointId) -> Option<Duration> {
let endpoint_state = self.endpoint_map.endpoint_state_actor(eid);
let (tx, rx) = oneshot::channel();
endpoint_state
.send(EndpointStateMessage::Latency(tx))
.await
.ok();
rx.await.unwrap_or_default()
}

/// Returns the socket address which can be used by the QUIC layer to dial this endpoint.
pub(crate) fn get_endpoint_mapped_addr(&self, eid: EndpointId) -> EndpointIdMappedAddr {
self.endpoint_map.endpoint_mapped_addr(eid)
Expand Down
32 changes: 19 additions & 13 deletions iroh/src/magicsock/endpoint_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ use super::{
mapped_addrs::{AddrMap, EndpointIdMappedAddr, MultipathMappedAddr, RelayMappedAddr},
transports::{self, OwnedTransmit, TransportsSender},
};
use crate::disco::{self};
use crate::disco;
// #[cfg(any(test, feature = "test-utils"))]
// use crate::endpoint::PathSelection;

mod endpoint_state;
mod path_state;

pub(super) use endpoint_state::EndpointStateMessage;
pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo};
pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo, RemoteInfo};
use endpoint_state::{EndpointStateActor, EndpointStateHandle};

// TODO: use this
Expand Down Expand Up @@ -122,17 +122,23 @@ impl EndpointMap {
}
}

/// Returns a [`n0_watcher::Direct`] for given endpoint's [`ConnectionType`].
///
/// # Errors
///
/// Will return `None` if there is not an entry in the [`EndpointMap`] for
/// the `endpoint_id`
pub(super) fn conn_type(
&self,
_endpoint_id: EndpointId,
) -> Option<n0_watcher::Direct<ConnectionType>> {
todo!();
/// Returns a [`RemoteInfo`] for the given endpoint.
///
/// Returns `None` if there is no actor handle stored for `eid`.
///
/// # Panics
///
/// Panics if the lock around the actor handles is poisoned.
pub(crate) fn remote_info(&self, eid: EndpointId) -> Option<RemoteInfo> {
    let handles = self.actor_handles.lock().expect("poisoned");
    let handle = handles.get(&eid)?;
    let info = RemoteInfo::new(eid, handle.sender.clone(), handle.selected_path.watch());
    Some(info)
}

pub(crate) fn remotes(&self) -> Vec<RemoteInfo> {
self.actor_handles
.lock()
.expect("poisoned")
.iter()
.map(|(eid, handle)| {
RemoteInfo::new(*eid, handle.sender.clone(), handle.selected_path.watch())
})
.collect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right. It's the lock inside again that forces you to collect. This is kind of bad because of the many-connections thing.

I can think of wrapping the mutex in an Arc and that would allow you to clone it out and pass it to a custom iterator struct which could then take the lock on each .next() call... Not sure if that's much better. It was kind of nice not having an Arc here...

}

/// Returns the sender for the [`EndpointStateActor`].
Expand Down
Loading
Loading