Skip to content

Commit e422335

Browse files
authored
Merge pull request #7 from n0-computer/stream-util
feat: add conversions to stream and sink
2 parents 60994d9 + ad82340 commit e422335

File tree

4 files changed

+28
-2
lines changed

4 files changed

+28
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ anyhow = { workspace = true, optional = true }
4040
futures-buffered ={ version = "0.2.9", optional = true }
4141
# for AbortOnDropHandle
4242
n0-future = { workspace = true, optional = true }
43+
futures-util = { workspace = true, optional = true }
4344

4445
[target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies]
4546
quinn = { workspace = true, optional = true, features = ["runtime-tokio"] }
@@ -64,7 +65,8 @@ rpc = ["dep:quinn", "dep:postcard", "dep:anyhow", "dep:smallvec", "dep:tracing",
6465
quinn_endpoint_setup = ["rpc", "dep:rustls", "dep:rcgen", "dep:anyhow", "dep:futures-buffered", "quinn/rustls-ring"]
6566
# pick up parent span when creating channel messages
6667
message_spans = []
67-
default = ["rpc", "quinn_endpoint_setup", "message_spans"]
68+
stream = ["dep:futures-util"]
69+
default = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"]
6870

6971
[workspace]
7072
members = ["irpc-derive", "irpc-iroh"]
@@ -86,4 +88,4 @@ n0-future = { version = "0.1.2", default-features = false }
8688
tracing-subscriber = { version = "0.3.19" }
8789
iroh = { version = "0.34" }
8890
quinn = { package = "iroh-quinn", version = "0.13.0", default-features = false }
89-
91+
futures-util = { version = "0.3", features = ["sink"] }

irpc-derive/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
301301
.iter()
302302
.map(|(variant_name, inner_type)| {
303303
quote! {
304+
#[allow(missing_docs)]
304305
#variant_name(::irpc::WithChannels<#inner_type, #service_name>)
305306
}
306307
})
@@ -311,6 +312,7 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
311312

312313
// Create the message enum definition
313314
let message_enum = quote! {
315+
#[allow(missing_docs)]
314316
#[derive(Debug)]
315317
pub enum #message_enum_name {
316318
#(#message_variants),*

src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,17 @@ pub mod channel {
362362
Sender::Boxed(x) => x.is_rpc(),
363363
}
364364
}
365+
366+
#[cfg(feature = "stream")]
367+
pub fn into_sink(self) -> impl n0_future::Sink<T, Error = SendError> + Send + 'static
368+
where
369+
T: RpcMessage,
370+
{
371+
futures_util::sink::unfold(self, |mut sink, value| async move {
372+
sink.send(value).await?;
373+
Ok(sink)
374+
})
375+
}
365376
}
366377

367378
impl<T> From<tokio::sync::mpsc::Sender<T>> for Sender<T> {
@@ -489,6 +500,16 @@ pub mod channel {
489500
Self::Boxed(rx) => Ok(rx.recv().await?),
490501
}
491502
}
503+
504+
#[cfg(feature = "stream")]
505+
pub fn into_stream(
506+
self,
507+
) -> impl n0_future::Stream<Item = std::result::Result<T, RecvError>> + Send + 'static
508+
{
509+
n0_future::stream::unfold(self, |mut recv| async move {
510+
recv.recv().await.transpose().map(|msg| (msg, recv))
511+
})
512+
}
492513
}
493514

494515
impl<T> From<tokio::sync::mpsc::Receiver<T>> for Receiver<T> {

0 commit comments

Comments
 (0)