Skip to content

Commit 6f0dec7

Browse files
authored
feat: Make it easier to use existing connections with irpc. (#79)
E.g. to use connections out of a connection pool without having to build a irpc client pool. - rename the lazy, reconnecting connections to ...Lazy... - introduce IrohRemoteConnection to just wrap an iroh connection - implement RemoteConnection directly for quinn::Connection
1 parent 428988e commit 6f0dec7

File tree

5 files changed

+69
-43
lines changed

5 files changed

+69
-43
lines changed

irpc-iroh/examples/0rtt.rs

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,9 @@ mod cli {
188188
mod ping {
189189
use anyhow::{Context, Result};
190190
use futures_util::FutureExt;
191-
use iroh::{
192-
endpoint::{Connection, RecvStream, SendStream},
193-
Endpoint,
194-
};
191+
use iroh::Endpoint;
195192
use irpc::{channel::oneshot, rpc::RemoteService, rpc_requests, Client, WithChannels};
196-
use irpc_iroh::{Iroh0RttProtocol, IrohProtocol};
193+
use irpc_iroh::{Iroh0RttProtocol, IrohProtocol, IrohRemoteConnection};
197194
use n0_future::future;
198195
use serde::{Deserialize, Serialize};
199196
use tracing::info;
@@ -250,7 +247,7 @@ mod ping {
250247
.context("failed to connect to remote service")?;
251248
let fut: future::Boxed<bool> = Box::pin(async { true });
252249
Ok(EchoApi {
253-
inner: Client::boxed(IrohConnection(conn)),
250+
inner: Client::boxed(IrohRemoteConnection::new(conn)),
254251
zero_rtt_accepted: fut.shared(),
255252
})
256253
}
@@ -268,7 +265,7 @@ mod ping {
268265
info!("0-RTT possible from our side");
269266
let fut: future::Boxed<bool> = Box::pin(zero_rtt_accepted);
270267
Ok(EchoApi {
271-
inner: Client::boxed(IrohConnection(conn)),
268+
inner: Client::boxed(IrohRemoteConnection::new(conn)),
272269
zero_rtt_accepted: fut.shared(),
273270
})
274271
}
@@ -277,7 +274,7 @@ mod ping {
277274
let fut: future::Boxed<bool> = Box::pin(async { true });
278275
let conn = connecting.await?;
279276
Ok(EchoApi {
280-
inner: Client::boxed(IrohConnection(conn)),
277+
inner: Client::boxed(IrohRemoteConnection::new(conn)),
281278
zero_rtt_accepted: fut.shared(),
282279
})
283280
}
@@ -321,25 +318,4 @@ mod ping {
321318
}
322319
}
323320
}
324-
325-
#[derive(Debug, Clone)]
326-
struct IrohConnection(Connection);
327-
328-
impl irpc::rpc::RemoteConnection for IrohConnection {
329-
fn clone_boxed(&self) -> Box<dyn irpc::rpc::RemoteConnection> {
330-
Box::new(self.clone())
331-
}
332-
333-
fn open_bi(
334-
&self,
335-
) -> n0_future::future::Boxed<
336-
std::result::Result<(SendStream, RecvStream), irpc::RequestError>,
337-
> {
338-
let conn = self.0.clone();
339-
Box::pin(async move {
340-
let (send, recv) = conn.open_bi().await?;
341-
Ok((send, recv))
342-
})
343-
}
344-
}
345321
}

irpc-iroh/examples/auth.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ mod storage {
7676
rpc_requests, Client, WithChannels,
7777
};
7878
// Import the macro
79-
use irpc_iroh::{read_request, IrohRemoteConnection};
79+
use irpc_iroh::{read_request, IrohLazyRemoteConnection};
8080
use serde::{Deserialize, Serialize};
8181
use tracing::info;
8282

@@ -225,7 +225,7 @@ mod storage {
225225
pub const ALPN: &[u8] = ALPN;
226226

227227
pub fn connect(endpoint: Endpoint, addr: impl Into<iroh::EndpointAddr>) -> StorageClient {
228-
let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
228+
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
229229
StorageClient {
230230
inner: Client::boxed(conn),
231231
}

irpc-iroh/examples/derive.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ mod storage {
6565
rpc_requests, Client, WithChannels,
6666
};
6767
// Import the macro
68-
use irpc_iroh::{IrohProtocol, IrohRemoteConnection};
68+
use irpc_iroh::{IrohLazyRemoteConnection, IrohProtocol};
6969
use serde::{Deserialize, Serialize};
7070
use tracing::info;
7171

@@ -161,7 +161,7 @@ mod storage {
161161
endpoint: Endpoint,
162162
addr: impl Into<iroh::EndpointAddr>,
163163
) -> Result<StorageApi> {
164-
let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
164+
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
165165
Ok(StorageApi {
166166
inner: Client::boxed(conn),
167167
})

irpc-iroh/src/lib.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,48 @@ pub fn client<S: irpc::Service>(
2626
addr: impl Into<iroh::EndpointAddr>,
2727
alpn: impl AsRef<[u8]>,
2828
) -> irpc::Client<S> {
29-
let conn = IrohRemoteConnection::new(endpoint, addr.into(), alpn.as_ref().to_vec());
29+
let conn = IrohLazyRemoteConnection::new(endpoint, addr.into(), alpn.as_ref().to_vec());
3030
irpc::Client::boxed(conn)
3131
}
32+
33+
/// Wrap an existing iroh connection as an irpc remote connection.
34+
///
35+
/// This will stop working as soon as the underlying iroh connection is closed.
36+
/// If you need to support reconnects, use [`IrohLazyRemoteConnection`] instead.
37+
// TODO: remove this and provide a From instance as soon as iroh is 1.0 and
38+
// we can move irpc-iroh into irpc?
39+
#[derive(Debug, Clone)]
40+
pub struct IrohRemoteConnection(Connection);
41+
42+
impl IrohRemoteConnection {
43+
pub fn new(connection: Connection) -> Self {
44+
Self(connection)
45+
}
46+
}
47+
48+
impl irpc::rpc::RemoteConnection for IrohRemoteConnection {
49+
fn clone_boxed(&self) -> Box<dyn irpc::rpc::RemoteConnection> {
50+
Box::new(self.clone())
51+
}
52+
53+
fn open_bi(
54+
&self,
55+
) -> n0_future::future::Boxed<std::result::Result<(SendStream, RecvStream), irpc::RequestError>>
56+
{
57+
let conn = self.0.clone();
58+
Box::pin(async move {
59+
let (send, recv) = conn.open_bi().await?;
60+
Ok((send, recv))
61+
})
62+
}
63+
}
64+
3265
/// A connection to a remote service.
3366
///
3467
/// Initially this does just have the endpoint and the address. Once a
3568
/// connection is established, it will be stored.
3669
#[derive(Debug, Clone)]
37-
pub struct IrohRemoteConnection(Arc<IrohRemoteConnectionInner>);
70+
pub struct IrohLazyRemoteConnection(Arc<IrohRemoteConnectionInner>);
3871

3972
#[derive(Debug)]
4073
struct IrohRemoteConnectionInner {
@@ -44,7 +77,7 @@ struct IrohRemoteConnectionInner {
4477
alpn: Vec<u8>,
4578
}
4679

47-
impl IrohRemoteConnection {
80+
impl IrohLazyRemoteConnection {
4881
pub fn new(endpoint: iroh::Endpoint, addr: iroh::EndpointAddr, alpn: Vec<u8>) -> Self {
4982
Self(Arc::new(IrohRemoteConnectionInner {
5083
endpoint,
@@ -55,7 +88,7 @@ impl IrohRemoteConnection {
5588
}
5689
}
5790

58-
impl RemoteConnection for IrohRemoteConnection {
91+
impl RemoteConnection for IrohLazyRemoteConnection {
5992
fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
6093
Box::new(self.clone())
6194
}

src/lib.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,7 +1285,7 @@ impl<S: Service> Client<S> {
12851285
/// and a socket `addr` of the remote service.
12861286
#[cfg(feature = "rpc")]
12871287
pub fn quinn(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
1288-
Self::boxed(rpc::QuinnRemoteConnection::new(endpoint, addr))
1288+
Self::boxed(rpc::QuinnLazyRemoteConnection::new(endpoint, addr))
12891289
}
12901290

12911291
/// Create a new client from a `rpc::RemoteConnection` trait object.
@@ -1899,26 +1899,43 @@ pub mod rpc {
18991899
/// Initially this does just have the endpoint and the address. Once a
19001900
/// connection is established, it will be stored.
19011901
#[derive(Debug, Clone)]
1902-
pub(crate) struct QuinnRemoteConnection(Arc<QuinnRemoteConnectionInner>);
1902+
pub(crate) struct QuinnLazyRemoteConnection(Arc<QuinnLazyRemoteConnectionInner>);
19031903

19041904
#[derive(Debug)]
1905-
struct QuinnRemoteConnectionInner {
1905+
struct QuinnLazyRemoteConnectionInner {
19061906
pub endpoint: quinn::Endpoint,
19071907
pub addr: std::net::SocketAddr,
19081908
pub connection: tokio::sync::Mutex<Option<quinn::Connection>>,
19091909
}
19101910

1911-
impl QuinnRemoteConnection {
1911+
impl RemoteConnection for quinn::Connection {
1912+
fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
1913+
Box::new(self.clone())
1914+
}
1915+
1916+
fn open_bi(
1917+
&self,
1918+
) -> BoxFuture<std::result::Result<(quinn::SendStream, quinn::RecvStream), RequestError>>
1919+
{
1920+
let conn = self.clone();
1921+
Box::pin(async move {
1922+
let pair = conn.open_bi().await?;
1923+
Ok(pair)
1924+
})
1925+
}
1926+
}
1927+
1928+
impl QuinnLazyRemoteConnection {
19121929
pub fn new(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
1913-
Self(Arc::new(QuinnRemoteConnectionInner {
1930+
Self(Arc::new(QuinnLazyRemoteConnectionInner {
19141931
endpoint,
19151932
addr,
19161933
connection: Default::default(),
19171934
}))
19181935
}
19191936
}
19201937

1921-
impl RemoteConnection for QuinnRemoteConnection {
1938+
impl RemoteConnection for QuinnLazyRemoteConnection {
19221939
fn clone_boxed(&self) -> Box<dyn RemoteConnection> {
19231940
Box::new(self.clone())
19241941
}

0 commit comments

Comments
 (0)