Skip to content

Commit 34fb289

Browse files
committed
feat: watch connections
1 parent 212e13a commit 34fb289

File tree

9 files changed

+304
-31
lines changed

9 files changed

+304
-31
lines changed

Cargo.lock

Lines changed: 13 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,11 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l
4141
[workspace.lints.clippy]
4242
unused-async = "warn"
4343

44-
4544
[patch.crates-io]
46-
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" }
47-
portmapper = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" }
45+
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "Frando/on_closed" }
46+
portmapper = { git = "https://github.com/n0-computer/net-tools", branch = "Frando/on_closed" }
4847

49-
[patch."https://github.com/n0-computer/quinn"]
48+
# [patch."https://github.com/n0-computer/quinn"]
5049
# iroh-quinn = { path = "../iroh-quinn/quinn" }
5150
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
5251
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }

iroh-relay/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ postcard = { version = "1", default-features = false, features = [
4242
"use-std",
4343
"experimental-derive",
4444
] }
45-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["rustls-ring"] }
46-
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
45+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["rustls-ring"] }
46+
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
4747
rand = "0.9.2"
4848
reqwest = { version = "0.12", default-features = false, features = [
4949
"rustls-tls",

iroh/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ n0-watcher = "0.5"
4444
netwatch = { version = "0.12" }
4545
pin-project = "1"
4646
pkarr = { version = "5", default-features = false, features = ["relays"] }
47-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["rustls-ring"] }
48-
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
49-
quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
47+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["rustls-ring"] }
48+
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
49+
quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
5050
rand = "0.9.2"
5151
reqwest = { version = "0.12", default-features = false, features = [
5252
"rustls-tls",
@@ -90,7 +90,7 @@ hickory-resolver = "0.25.1"
9090
igd-next = { version = "0.16", features = ["aio_tokio"] }
9191
netdev = { version = "0.38.1" }
9292
portmapper = { version = "0.12", default-features = false }
93-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
93+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
9494
tokio = { version = "1", features = [
9595
"io-util",
9696
"macros",

iroh/bench/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ iroh = { path = ".." }
1212
iroh-metrics = "0.37"
1313
n0-future = "0.3.0"
1414
n0-error = "0.1.0"
15-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
15+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
1616
rand = "0.9.2"
1717
rcgen = "0.14"
1818
rustls = { version = "0.23.33", default-features = false, features = ["ring"] }
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use iroh::{
4+
Endpoint, RelayMode,
5+
endpoint::{ConnectionInfo, ConnectionMonitor},
6+
};
7+
use n0_error::{Result, StackResultExt, StdResultExt};
8+
use n0_future::task::AbortOnDropHandle;
9+
use tokio::{
10+
sync::mpsc::{UnboundedReceiver, UnboundedSender},
11+
task::JoinSet,
12+
};
13+
use tracing::{Instrument, info, info_span};
14+
15+
const ALPN: &[u8] = b"iroh/test";
16+
17+
#[tokio::main]
18+
async fn main() -> Result {
19+
tracing_subscriber::fmt()
20+
.with_env_filter(
21+
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
22+
)
23+
.init();
24+
25+
let monitor = Monitor::new();
26+
let server = Endpoint::empty_builder(RelayMode::Disabled)
27+
.alpns(vec![ALPN.to_vec()])
28+
.monitor_connections(monitor.clone())
29+
.bind()
30+
.instrument(info_span!("server"))
31+
.await?;
32+
let server_addr = server.addr();
33+
34+
let client_task = tokio::spawn(
35+
async move {
36+
let client = Endpoint::empty_builder(RelayMode::Disabled)
37+
.bind()
38+
.instrument(info_span!("client"))
39+
.await?;
40+
let conn1 = client.connect(server_addr.clone(), ALPN).await?;
41+
info!("conn1");
42+
conn1.close(23u32.into(), b"bye");
43+
let conn2 = client.connect(server_addr.clone(), ALPN).await?;
44+
info!("conn2");
45+
conn2.close(23u32.into(), b"bye");
46+
tokio::time::sleep(Duration::from_secs(1)).await;
47+
client.close().await;
48+
n0_error::Ok(client)
49+
}
50+
.instrument(info_span!("client")),
51+
);
52+
53+
let server_task = tokio::spawn(
54+
async move {
55+
let mut i = 0;
56+
while let Some(conn) = server.accept().await {
57+
info!("accepted");
58+
let conn = conn.await?;
59+
info!(remote=%conn.remote_id().fmt_short(), "established");
60+
conn.closed().await;
61+
info!(remote=%conn.remote_id().fmt_short(), "closed");
62+
i += 1;
63+
if i == 2 {
64+
break;
65+
}
66+
}
67+
info!("done");
68+
n0_error::Ok(server)
69+
}
70+
.instrument(info_span!("server")),
71+
);
72+
let client = client_task.await.std_context("client")?.context("client")?;
73+
let server = server_task.await.std_context("server")?.context("server")?;
74+
client.close().await;
75+
server.close().await;
76+
77+
tokio::time::sleep(Duration::from_secs(1)).await;
78+
drop(monitor);
79+
80+
Ok(())
81+
}
82+
83+
/// Our connection monitor impl.
84+
///
85+
/// This here only logs connection open and close events via tracing.
86+
/// It could also maintain a datastructure of all connections, or send the stats to some metrics service.
87+
#[derive(Clone)]
88+
struct Monitor {
89+
tx: UnboundedSender<ConnectionInfo>,
90+
_task: Arc<AbortOnDropHandle<()>>,
91+
}
92+
93+
impl ConnectionMonitor for Monitor {
94+
fn on_connection(&self, connection: ConnectionInfo) {
95+
self.tx.send(connection).ok();
96+
}
97+
}
98+
99+
impl Monitor {
100+
fn new() -> Self {
101+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
102+
let task = tokio::spawn(Self::run(rx).instrument(info_span!("watcher")));
103+
Self {
104+
tx,
105+
_task: Arc::new(AbortOnDropHandle::new(task)),
106+
}
107+
}
108+
109+
async fn run(mut rx: UnboundedReceiver<ConnectionInfo>) {
110+
let mut tasks = JoinSet::new();
111+
loop {
112+
tokio::select! {
113+
Some(conn) = rx.recv() => {
114+
let alpn = String::from_utf8_lossy(conn.alpn()).to_string();
115+
let remote = conn.remote_id().fmt_short();
116+
info!(%remote, %alpn, rtt=?conn.latency(), "new connection");
117+
tasks.spawn(async move {
118+
match conn.closed().await {
119+
Some((close_reason, stats)) => {
120+
// We have access to the final stats of the connection!
121+
info!(%remote, %alpn, ?close_reason, udp_rx=stats.udp_rx.bytes, udp_tx=stats.udp_tx.bytes, "connection closed");
122+
}
123+
None => {
124+
// The connection was closed before we could register our stats-on-close listener.
125+
info!(%remote, %alpn, "connection closed before tracking started");
126+
}
127+
}
128+
}.instrument(tracing::Span::current()));
129+
}
130+
Some(res) = tasks.join_next(), if !tasks.is_empty() => res.expect("conn close task panicked"),
131+
else => break,
132+
}
133+
while let Some(res) = tasks.join_next().await {
134+
res.expect("conn close task panicked");
135+
}
136+
}
137+
}
138+
}

iroh/src/endpoint.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ pub use quinn_proto::{
6969

7070
pub use self::connection::{
7171
Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
72-
Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection, RemoteEndpointIdError,
73-
ZeroRttStatus,
72+
ConnectionInfo, Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection,
73+
RemoteEndpointIdError, ZeroRttStatus,
7474
};
7575

7676
/// The delay to fall back to discovery when direct addresses fail.
@@ -98,7 +98,7 @@ pub enum PathSelection {
9898
/// new [`EndpointId`].
9999
///
100100
/// To create the [`Endpoint`] call [`Builder::bind`].
101-
#[derive(Debug)]
101+
#[derive(derive_more::Debug)]
102102
pub struct Builder {
103103
secret_key: Option<SecretKey>,
104104
relay_mode: RelayMode,
@@ -117,6 +117,8 @@ pub struct Builder {
117117
#[cfg(any(test, feature = "test-utils"))]
118118
path_selection: PathSelection,
119119
max_tls_tickets: usize,
120+
#[debug("{}", connection_monitor.as_ref().map(|_| "Some(Box<dyn ConnectionMonitor>)").unwrap_or("None"))]
121+
connection_monitor: Option<Box<dyn ConnectionMonitor>>,
120122
}
121123

122124
impl Builder {
@@ -158,6 +160,7 @@ impl Builder {
158160
#[cfg(any(test, feature = "test-utils"))]
159161
path_selection: PathSelection::default(),
160162
max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
163+
connection_monitor: None,
161164
}
162165
}
163166

@@ -208,6 +211,7 @@ impl Builder {
208211
#[cfg(any(test, feature = "test-utils"))]
209212
path_selection: self.path_selection,
210213
metrics,
214+
connection_monitor: self.connection_monitor,
211215
};
212216

213217
let msock = magicsock::MagicSock::spawn(msock_opts).await?;
@@ -432,6 +436,44 @@ impl Builder {
432436
self.max_tls_tickets = n;
433437
self
434438
}
439+
440+
// TODO docs
441+
/// Register a handler that is invoked for each connection the endpoint accepts or initiates.
442+
///
443+
/// The [`ConnectionMonitor::on_connection`] method is invoked synchronosuly, from within a tokio
444+
/// context. So you can spawn tasks if needed.
445+
/// Make sure that whatever you do with the connection info here is non-blocking.
446+
/// Usually you'd want to send the info over a broadcast or unbounded channel,
447+
/// or insert it into some persistent datastructure.
448+
///
449+
/// The `ConnectionInfo` internally contains a weak reference to the connection,
450+
/// so keeping the struct alive does not keep the connection alive.
451+
/// Note however that `ConnectionInfo` does keep an allocation per connection alive
452+
/// so to not leak memory you should drop the `ConnectionInfo` eventually
453+
///
454+
/// [`ConnectionMonitor`] is implemented for `Fn(ConnectionInfo)`, so you can
455+
/// also pass a closure that takes [`ConnectionInfo`] to this function.
456+
pub fn monitor_connections(mut self, monitor: impl ConnectionMonitor) -> Self {
457+
self.connection_monitor = Some(Box::new(monitor));
458+
self
459+
}
460+
}
461+
462+
/// Monitor each connection accepted or initiated by the endpoint.
463+
pub trait ConnectionMonitor: Send + Sync + 'static {
464+
/// Called for each new connection the endpoint accepts or initiates.
465+
///
466+
/// This is only called when a connection is fully established.
467+
fn on_connection(&self, connection: ConnectionInfo);
468+
}
469+
470+
impl<T> ConnectionMonitor for T
471+
where
472+
T: Fn(ConnectionInfo) + Send + Sync + 'static,
473+
{
474+
fn on_connection(&self, connection: ConnectionInfo) {
475+
(self)(connection)
476+
}
435477
}
436478

437479
/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.

0 commit comments

Comments
 (0)