Skip to content

Commit 8db0017

Browse files
authored
refactor!: Make Sender clone (#25)
This makes the `Sender` be `Clone` and take `&self` in the methods. For the remote sender, this is achieved by putting the actual send stream and buffer into a `tokio::sync::Mutex`. We also ensure that if sending failed on one sender, subsequent sends on cloned senders will fail as well. And if a `send` future is dropped before completion, further sends will fail as well, as we cannot guarantee integrity of the send stream anymore if a send future was dropped. ## Breaking changes The methods from `Sender` all take `&self` instead of `&mut self`. ## Notes and open questions My quick benchmark shows some weird numbers: ``` Local bench RPC seq 128_996 rps RPC par 1_093_166 rps bidi seq 4_281_373 rps Remote bench RPC seq 11_393 rps RPC par 60_073 rps bidi seq 1_480_826 rps Reference bench Reference seq 139_889 rps Reference par 2_784_308 rps ``` Compared to #24 the `bidi seq` for remote got *much* faster. I can't really explain that? But I ran quite a few times, without much change on either `Frando/improve-bench` or `Frando/clone-sender`. Fixes #23
1 parent c35c97f commit 8db0017

File tree

9 files changed

+260
-55
lines changed

9 files changed

+260
-55
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ tokio = { workspace = true, features = ["full"] }
5757
thousands = "0.2.0"
5858
# macro tests
5959
trybuild = "1.0.104"
60+
testresult = "0.4.1"
6061

6162
[features]
6263
# enable the remote transport

examples/compute.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl ComputeActor {
123123
tx, inner, span, ..
124124
} = fib;
125125
let _entered = span.enter();
126-
let mut sender = tx;
126+
let sender = tx;
127127
let mut a = 0u64;
128128
let mut b = 1u64;
129129
while a <= inner.max {
@@ -144,7 +144,7 @@ impl ComputeActor {
144144
} = mult;
145145
let _entered = span.enter();
146146
let mut receiver = rx;
147-
let mut sender = tx;
147+
let sender = tx;
148148
let multiplier = inner.initial;
149149
while let Some(num) = receiver.recv().await? {
150150
sender.send(multiplier * num).await?;
@@ -260,7 +260,7 @@ async fn local() -> anyhow::Result<()> {
260260
println!("Local: 5^2 = {}", rx.await?);
261261

262262
// Test Sum
263-
let (mut tx, rx) = api.sum().await?;
263+
let (tx, rx) = api.sum().await?;
264264
tx.send(1).await?;
265265
tx.send(2).await?;
266266
tx.send(3).await?;
@@ -276,7 +276,7 @@ async fn local() -> anyhow::Result<()> {
276276
println!();
277277

278278
// Test Multiply
279-
let (mut in_tx, mut out_rx) = api.multiply(3).await?;
279+
let (in_tx, mut out_rx) = api.multiply(3).await?;
280280
in_tx.send(2).await?;
281281
in_tx.send(4).await?;
282282
in_tx.send(6).await?;
@@ -311,7 +311,7 @@ async fn remote() -> anyhow::Result<()> {
311311
println!("Remote: 4^2 = {}", rx.await?);
312312

313313
// Test Sum
314-
let (mut tx, rx) = api.sum().await?;
314+
let (tx, rx) = api.sum().await?;
315315
tx.send(4).await?;
316316
tx.send(5).await?;
317317
tx.send(6).await?;
@@ -327,7 +327,7 @@ async fn remote() -> anyhow::Result<()> {
327327
println!();
328328

329329
// Test Multiply
330-
let (mut in_tx, mut out_rx) = api.multiply(5).await?;
330+
let (in_tx, mut out_rx) = api.multiply(5).await?;
331331
in_tx.send(1).await?;
332332
in_tx.send(2).await?;
333333
in_tx.send(3).await?;
@@ -380,7 +380,7 @@ async fn bench(api: ComputeApi, n: u64) -> anyhow::Result<()> {
380380
// Sequential streaming (using Multiply instead of MultiplyUpdate)
381381
{
382382
let t0 = std::time::Instant::now();
383-
let (mut send, mut recv) = api.multiply(2).await?;
383+
let (send, mut recv) = api.multiply(2).await?;
384384
let handle = tokio::task::spawn(async move {
385385
for i in 0..n {
386386
send.send(i).await?;

examples/derive.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl StorageActor {
111111
}
112112
StorageMessage::List(list) => {
113113
info!("list {:?}", list);
114-
let WithChannels { mut tx, .. } = list;
114+
let WithChannels { tx, .. } = list;
115115
for (key, value) in &self.state {
116116
if tx.send(format!("{key}={value}")).await.is_err() {
117117
break;
@@ -172,7 +172,7 @@ async fn client_demo(api: StorageApi) -> Result<()> {
172172
let value = api.get("hello".to_string()).await?;
173173
println!("get: hello = {:?}", value);
174174

175-
let (mut tx, rx) = api.set_many().await?;
175+
let (tx, rx) = api.set_many().await?;
176176
for i in 0..3 {
177177
tx.send((format!("key{i}"), format!("value{i}"))).await?;
178178
}

examples/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl StorageActor {
104104
}
105105
StorageMessage::List(list) => {
106106
info!("list {:?}", list);
107-
let WithChannels { mut tx, .. } = list;
107+
let WithChannels { tx, .. } = list;
108108
for (key, value) in &self.state {
109109
if tx.send(format!("{key}={value}")).await.is_err() {
110110
break;

irpc-iroh/examples/auth.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ mod storage {
218218
}
219219
StorageMessage::List(list) => {
220220
info!("list {:?}", list);
221-
let WithChannels { mut tx, .. } = list;
221+
let WithChannels { tx, .. } = list;
222222
let values = {
223223
let state = self.state.lock().unwrap();
224224
// TODO: use async lock to not clone here.

irpc-iroh/examples/derive.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ mod storage {
141141
}
142142
StorageMessage::List(list) => {
143143
info!("list {:?}", list);
144-
let WithChannels { mut tx, .. } = list;
144+
let WithChannels { tx, .. } = list;
145145
for (key, value) in &self.state {
146146
if tx.send(format!("{key}={value}")).await.is_err() {
147147
break;

0 commit comments

Comments
 (0)