Skip to content

Commit 255a992

Browse files
committed
Extemd example so it can be run on a remote node.
1 parent 3a1fca9 commit 255a992

File tree

5 files changed

+114
-33
lines changed

5 files changed

+114
-33
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,6 @@ tracing = { version = "0.1.41", default-features = false }
105105
n0-future = { version = "0.1.2", default-features = false }
106106
tracing-subscriber = { version = "0.3.19" }
107107
iroh = { version = "0.91" }
108+
iroh-base = { version = "0.91" }
108109
quinn = { package = "iroh-quinn", version = "0.14.0", default-features = false }
109110
futures-util = { version = "0.3", features = ["sink"] }

irpc-iroh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ serde = { workspace = true }
1818
postcard = { workspace = true, features = ["alloc", "use-std"] }
1919
n0-future = { workspace = true }
2020
irpc = { version = "0.7.0", path = ".." }
21+
iroh-base.workspace = true
2122

2223
[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies]
2324
getrandom = { version = "0.3", features = ["wasm_js"] }

irpc-iroh/examples/0rtt.rs

Lines changed: 108 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,114 @@
1+
use std::{env::Args, time::Instant, usize};
2+
13
use anyhow::Result;
2-
use iroh::{protocol::Router, Endpoint, Watcher};
4+
use clap::Parser;
5+
use iroh::{protocol::Router, Endpoint, NodeAddr, Watcher};
6+
use iroh_base::ticket::NodeTicket;
37
use ping::EchoApi;
48

59
#[tokio::main]
610
async fn main() -> Result<()> {
7-
// tracing_subscriber::fmt().init();
8-
println!("Local use");
9-
local().await?;
10-
println!("Remote use");
11-
remote().await?;
12-
Ok(())
13-
}
14-
15-
async fn local() -> Result<()> {
16-
let api = EchoApi::spawn();
17-
let res = api.echo(b"hello".to_vec()).await?;
18-
println!("value = {}", String::from_utf8_lossy(&res));
11+
tracing_subscriber::fmt().init();
12+
let args = cli::Args::parse();
13+
match args {
14+
cli::Args::Listen { use_0rtt } => {
15+
let (server_router, server_addr) = {
16+
let endpoint = Endpoint::builder().bind().await?;
17+
endpoint.home_relay().initialized().await;
18+
let addr = endpoint.node_addr().initialized().await;
19+
let api = EchoApi::spawn();
20+
let router = Router::builder(endpoint.clone());
21+
let router = if use_0rtt {
22+
router.accept(EchoApi::ALPN, api.expose_0rtt()?)
23+
} else {
24+
router.accept(EchoApi::ALPN, api.expose()?)
25+
};
26+
let router = router.spawn();
27+
(router, addr)
28+
};
29+
println!("NodeId: {}", server_addr.node_id);
30+
println!("Accepting 0rtt connections: {}", use_0rtt);
31+
let ticket = NodeTicket::from(server_addr);
32+
println!("Connect using:\n\ncargo run --example 0rtt connect {ticket}\n");
33+
println!("Control-C to stop");
34+
tokio::signal::ctrl_c()
35+
.await
36+
.expect("failed to listen for ctrl_c");
37+
server_router.shutdown().await?;
38+
}
39+
cli::Args::Connect {
40+
ticket,
41+
n,
42+
delay_ms,
43+
use_0rtt,
44+
} => {
45+
let n = n
46+
.iter()
47+
.filter_map(|x| u64::try_from(*x).ok())
48+
.next()
49+
.unwrap_or(u64::MAX);
50+
let delay = std::time::Duration::from_millis(delay_ms);
51+
let endpoint = Endpoint::builder().bind().await?;
52+
let addr: NodeAddr = ticket.into();
53+
for i in 0..n {
54+
if use_0rtt {
55+
let api = EchoApi::connect_0rtt(endpoint.clone(), addr.clone()).await?;
56+
let msg = i.to_be_bytes();
57+
let t0 = Instant::now();
58+
let res = api.echo_0rtt(msg.to_vec()).await;
59+
drop(api);
60+
match res {
61+
Ok(data) => {
62+
let elapsed = t0.elapsed();
63+
assert!(data == msg);
64+
println!("{}ms", elapsed.as_micros() as f64 / 1000.0);
65+
}
66+
Err(err) => {
67+
eprintln!("RPC error: {err}");
68+
}
69+
}
70+
tokio::time::sleep(delay).await;
71+
} else {
72+
}
73+
}
74+
}
75+
}
1976
Ok(())
2077
}
2178

22-
async fn remote() -> Result<()> {
23-
let (server_router, server_addr) = {
24-
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
25-
let api = EchoApi::spawn();
26-
let router = Router::builder(endpoint.clone())
27-
.accept(EchoApi::ALPN, api.expose_0rtt()?)
28-
.spawn();
29-
let addr = endpoint.node_addr().initialized().await;
30-
(router, addr)
31-
};
79+
mod cli {
80+
use anyhow::Result;
81+
use clap::Parser;
82+
use iroh::NodeId;
83+
use iroh_base::ticket::NodeTicket;
3284

33-
let client_endpoint = Endpoint::builder().bind().await?;
34-
for i in 0..10 {
35-
let api = EchoApi::connect(client_endpoint.clone(), server_addr.clone()).await?;
36-
let res = api.echo_0rtt(b"hello".to_vec()).await?;
37-
println!("value = {}", String::from_utf8_lossy(&res));
85+
#[derive(Debug, Parser)]
86+
pub enum Args {
87+
Listen {
88+
#[clap(long, default_value = "true")]
89+
use_0rtt: bool,
90+
},
91+
Connect {
92+
ticket: NodeTicket,
93+
#[clap(short)]
94+
n: Option<usize>,
95+
#[clap(long, default_value = "true")]
96+
use_0rtt: bool,
97+
#[clap(long, default_value = "1000")]
98+
delay_ms: u64,
99+
},
38100
}
39-
drop(server_router);
40-
Ok(())
41101
}
42102

43103
mod ping {
44104
use anyhow::{Context, Result};
45105
use futures_util::FutureExt;
46106
use iroh::{
47-
endpoint::{Connection, RecvStream, SendStream, ZeroRttAccepted},
107+
endpoint::{Connection, RecvStream, SendStream},
48108
Endpoint,
49109
};
50110
use irpc::{channel::oneshot, rpc::RemoteService, rpc_requests, Client, WithChannels};
51-
use irpc_iroh::{Iroh0RttProtocol, IrohProtocol, IrohRemoteConnection};
111+
use irpc_iroh::{Iroh0RttProtocol, IrohProtocol};
52112
use n0_future::future;
53113
use serde::{Deserialize, Serialize};
54114
use tracing::info;
@@ -95,7 +155,22 @@ mod ping {
95155
Ok(IrohProtocol::new(EchoProtocol::remote_handler(local)))
96156
}
97157

98-
pub async fn connect(
158+
// pub async fn connect(
159+
// endpoint: Endpoint,
160+
// addr: impl Into<iroh::NodeAddr>,
161+
// ) -> Result<EchoApi> {
162+
// let conn = endpoint
163+
// .connect(addr, Self::ALPN)
164+
// .await
165+
// .context("failed to connect to remote service")?;
166+
// let fut: future::Boxed<bool> = Box::pin(async { true });
167+
// Ok(EchoApi {
168+
// inner: Client::boxed(IrohConnection(conn)),
169+
// zero_rtt_accepted: fut.shared(),
170+
// })
171+
// }
172+
173+
pub async fn connect_0rtt(
99174
endpoint: Endpoint,
100175
addr: impl Into<iroh::NodeAddr>,
101176
) -> Result<EchoApi> {

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,9 +1336,11 @@ impl<S: Service> Client<S> {
13361336
let buf = rpc::prepare_write::<S>(msg)?;
13371337
let (_tx, rx) = request.write_raw(&buf).await?;
13381338
if zero_rtt_accepted.await {
1339+
println!("0-RTT accepted");
13391340
rx
13401341
} else {
13411342
// 0rtt was not accepted, the data is lost, send it again!
1343+
println!("0-RTT not accepted");
13421344
let Request::Remote(request) = this.request().await? else {
13431345
unreachable!()
13441346
};

0 commit comments

Comments
 (0)