Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 42 additions & 48 deletions iroh/src/magicsock/endpoint_map/endpoint_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
endpoint::DirectAddr,
magicsock::{
DiscoState, HEARTBEAT_INTERVAL, MagicsockMetrics, PATH_MAX_IDLE_TIMEOUT,
mapped_addrs::{AddrMap, MappedAddr, MultipathMappedAddr, RelayMappedAddr},
mapped_addrs::{AddrMap, MappedAddr, RelayMappedAddr},
transports::{self, OwnedTransmit},
},
util::MaybeFuture,
Expand Down Expand Up @@ -341,60 +341,54 @@ impl EndpointStateActor {
let events = BroadcastStream::new(conn.path_events());
let stream = events.map(move |evt| (conn_id, evt));
self.path_events.push(Box::pin(stream));
self.connections.insert(
conn_id,
ConnectionState {
let conn_state = self
.connections
.entry(conn_id)
.insert_entry(ConnectionState {
handle: handle.clone(),
pub_open_paths: paths_watchable,
paths: Default::default(),
open_paths: Default::default(),
path_ids: Default::default(),
},
);
})
.into_mut();

// Store PathId(0), set path_status and select best path, check if holepunching
// is needed.
if let Some(conn) = handle.upgrade() {
if let Some(path) = conn.path(PathId::ZERO) {
if let Some(path_remote) = path
.remote_address()
.map_or(None, |remote| Some(MultipathMappedAddr::from(remote)))
.and_then(|mmaddr| mmaddr.to_transport_addr(&self.relay_mapped_addrs))
{
trace!(?path_remote, "added new connection");
let path_remote_is_ip = path_remote.is_ip();
let status = match path_remote {
transports::Addr::Ip(_) => PathStatus::Available,
transports::Addr::Relay(_, _) => PathStatus::Backup,
};
path.set_status(status).ok();
let conn_state =
self.connections.get_mut(&conn_id).expect("inserted above");
conn_state.add_open_path(path_remote.clone(), PathId::ZERO);
self.paths
.entry(path_remote)
.or_default()
.sources
.insert(Source::Connection, Instant::now());
self.select_path();

if path_remote_is_ip {
// We may have raced this with a relay address. Try and add any
// relay addresses we have back.
let relays = self
.paths
.keys()
.filter(|a| a.is_relay())
.cloned()
.collect::<Vec<_>>();
for remote in relays {
self.open_path(&remote);
}
}
if let Some(path) = conn.path(PathId::ZERO)
&& let Ok(socketaddr) = path.remote_address()
&& let Some(path_remote) = self.relay_mapped_addrs.to_transport_addr(socketaddr)
{
trace!(?path_remote, "added new connection");
let path_remote_is_ip = path_remote.is_ip();
let status = match path_remote {
transports::Addr::Ip(_) => PathStatus::Available,
transports::Addr::Relay(_, _) => PathStatus::Backup,
};
path.set_status(status).ok();
conn_state.add_open_path(path_remote.clone(), PathId::ZERO);
self.paths
.entry(path_remote)
.or_default()
.sources
.insert(Source::Connection, Instant::now());
self.select_path();

if path_remote_is_ip {
// We may have raced this with a relay address. Try and add any
// relay addresses we have back.
let relays = self
.paths
.keys()
.filter(|a| a.is_relay())
.cloned()
.collect::<Vec<_>>();
for remote in relays {
self.open_path(&remote);
}
}
self.trigger_holepunching().await;
}
self.trigger_holepunching().await;
}
}

Expand Down Expand Up @@ -802,10 +796,8 @@ impl EndpointStateActor {
path.set_keep_alive_interval(Some(HEARTBEAT_INTERVAL)).ok();
path.set_max_idle_timeout(Some(PATH_MAX_IDLE_TIMEOUT)).ok();

if let Some(path_remote) = path
.remote_address()
.map_or(None, |remote| Some(MultipathMappedAddr::from(remote)))
.and_then(|mmaddr| mmaddr.to_transport_addr(&self.relay_mapped_addrs))
if let Ok(socketaddr) = path.remote_address()
&& let Some(path_remote) = self.relay_mapped_addrs.to_transport_addr(socketaddr)
{
event!(
target: "iroh::_events::path::open",
Expand Down Expand Up @@ -1156,6 +1148,8 @@ impl ConnectionState {
/// Watchables for the open paths and selected transmission path in a connection.
///
/// This is stored in the [`Connection`], and the watchables are set from within the endpoint state actor.
///
/// [`Connection`]: crate::endpoint::Connection
#[derive(Debug, Default, Clone)]
pub(crate) struct PathsWatchable {
/// Watchable for the open paths (in this connection).
Expand Down
62 changes: 40 additions & 22 deletions iroh/src/magicsock/mapped_addrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,6 @@ impl From<SocketAddr> for MultipathMappedAddr {
}
}

impl MultipathMappedAddr {
pub(super) fn to_transport_addr(
&self,
relay_mapped_addrs: &AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
) -> Option<transports::Addr> {
match self {
Self::Mixed(_) => {
error!("Mixed addr has no transports::Addr");
None
}
Self::Relay(mapped) => match relay_mapped_addrs.lookup(mapped) {
Some(parts) => Some(transports::Addr::from(parts)),
None => {
error!("Unknown RelayMappedAddr");
None
}
},
Self::Ip(addr) => Some(transports::Addr::from(*addr)),
}
}
}

/// An address used to address a endpoint on any or all paths.
///
/// This is only used for initially connecting to a remote endpoint. We instruct Quinn to
Expand Down Expand Up @@ -307,3 +285,43 @@ impl<K, V> Default for AddrMapInner<K, V> {
}
}
}

/// Functions for the relay mapped address map.
impl AddrMap<(RelayUrl, EndpointId), RelayMappedAddr> {
/// Converts a mapped socket address to a transport address.
///
/// This takes a socket address, converts it into a [`MultipathMappedAddr`] and then tries
/// to convert the mapped address into a [`transports::Addr`].
///
/// Returns `Some` with the transport address for IP mapped addresses and for relay mapped
/// addresses if an entry for the mapped address exists in `self`.
///
/// Returns `None` and emits an error log if the mapped address is a [`MultipathMappedAddr::Mixed`],
/// or if the mapped address is a [`MultipathMappedAddr::Relay`] and `self` does not contain the
/// mapped address.
pub(crate) fn to_transport_addr(
&self,
addr: impl Into<MultipathMappedAddr>,
) -> Option<transports::Addr> {
match addr.into() {
MultipathMappedAddr::Mixed(_) => {
error!(
"Failed to convert addr to transport addr: Mixed mapped addr has no transport address"
);
None
}
MultipathMappedAddr::Relay(relay_mapped_addr) => {
match self.lookup(&relay_mapped_addr) {
Some(parts) => Some(transports::Addr::from(parts)),
None => {
error!(
"Failed to convert addr to transport addr: Unknown relay mapped addr"
);
None
}
}
}
MultipathMappedAddr::Ip(addr) => Some(transports::Addr::from(addr)),
}
}
}
Loading