Skip to content

Commit 6f1d8eb

Browse files
authored
Merge pull request #7 from n0-computer/mem-rpc-client
feat: add mem rpc client
2 parents 5489d1e + 3cf208a commit 6f1d8eb

File tree

5 files changed

+58
-10
lines changed

5 files changed

+58
-10
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ tracing = "0.1"
5959

6060
# rpc
6161
nested_enum_utils = { version = "0.1.0", optional = true }
62-
quic-rpc = { version = "0.15", optional = true }
62+
quic-rpc = { version = "0.15.1", optional = true }
6363
quic-rpc-derive = { version = "0.15", optional = true }
6464
serde-error = { version = "0.1.3", optional = true }
6565
portable-atomic = { version = "1.9.0", optional = true }

src/engine.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ pub struct Engine<D> {
5656
content_status_cb: ContentStatusCallback,
5757
local_pool_handle: LocalPoolHandle,
5858
blob_store: D,
59+
#[cfg(feature = "rpc")]
60+
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
5961
}
6062

6163
impl<D: iroh_blobs::store::Store> Engine<D> {
@@ -118,6 +120,8 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
118120
default_author: Arc::new(default_author),
119121
local_pool_handle,
120122
blob_store: bao_store,
123+
#[cfg(feature = "rpc")]
124+
rpc_handler: Default::default(),
121125
})
122126
}
123127

src/rpc.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
//! Quic RPC implementation for docs.
22
3-
use proto::RpcService;
4-
use quic_rpc::server::{ChannelTypes, RpcChannel};
3+
use proto::{Request, RpcService};
4+
use quic_rpc::{
5+
server::{ChannelTypes, RpcChannel},
6+
RpcClient, RpcServer,
7+
};
8+
use tokio_util::task::AbortOnDropHandle;
59

610
use crate::engine::Engine;
711

@@ -14,15 +18,22 @@ type RpcError = serde_error::Error;
1418
type RpcResult<T> = std::result::Result<T, RpcError>;
1519

1620
impl<D: iroh_blobs::store::Store> Engine<D> {
21+
/// Get an in memory client to interact with the docs engine.
22+
pub fn client(&self) -> &client::docs::MemClient {
23+
&self
24+
.rpc_handler
25+
.get_or_init(|| RpcHandler::new(self))
26+
.client
27+
}
28+
1729
/// Handle a docs request from the RPC server.
1830
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
19-
&self,
20-
msg: crate::rpc::proto::Request,
31+
self,
32+
msg: Request,
2133
chan: RpcChannel<RpcService, C>,
2234
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
23-
use crate::rpc::proto::Request::*;
24-
25-
let this = self.clone();
35+
use Request::*;
36+
let this = self;
2637
match msg {
2738
Open(msg) => chan.rpc(msg, this, Self::doc_open).await,
2839
Close(msg) => chan.rpc(msg, this, Self::doc_close).await,
@@ -65,3 +76,23 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
6576
}
6677
}
6778
}
79+
80+
#[derive(Debug)]
81+
pub(crate) struct RpcHandler {
82+
/// Client to hand out
83+
client: client::docs::MemClient,
84+
/// Handler task
85+
_handler: AbortOnDropHandle<()>,
86+
}
87+
88+
impl RpcHandler {
89+
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
90+
let engine = engine.clone();
91+
let (listener, connector) = quic_rpc::transport::flume::channel(1);
92+
let listener = RpcServer::new(listener);
93+
let client = client::docs::MemClient::new(RpcClient::new(connector));
94+
let _handler = listener
95+
.spawn_accept_loop(move |req, chan| engine.clone().handle_rpc_request(req, chan));
96+
Self { client, _handler }
97+
}
98+
}

src/rpc/client/authors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::{
1919

2020
/// Iroh docs client.
2121
#[derive(Debug, Clone)]
22+
#[repr(transparent)]
2223
pub struct Client<C = BoxedConnector<RpcService>> {
2324
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
2425
}

src/rpc/client/docs.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ use iroh_base::node_addr::AddrInfoOptions;
1616
use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash};
1717
use iroh_net::NodeAddr;
1818
use portable_atomic::{AtomicBool, Ordering};
19-
use quic_rpc::{client::BoxedConnector, message::RpcMsg, Connector};
19+
use quic_rpc::{
20+
client::BoxedConnector, message::RpcMsg, transport::flume::FlumeConnector, Connector,
21+
};
2022
use serde::{Deserialize, Serialize};
2123

22-
use super::flatten;
24+
use super::{authors, flatten};
2325
use crate::{
2426
actor::OpenState,
2527
rpc::proto::{
@@ -38,8 +40,13 @@ pub use crate::{
3840
Entry,
3941
};
4042

43+
/// Type alias for a memory-backed client.
44+
pub type MemClient =
45+
Client<FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>>;
46+
4147
/// Iroh docs client.
4248
#[derive(Debug, Clone)]
49+
#[repr(transparent)]
4350
pub struct Client<C = BoxedConnector<RpcService>> {
4451
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
4552
}
@@ -50,6 +57,11 @@ impl<C: Connector<RpcService>> Client<C> {
5057
Self { rpc }
5158
}
5259

60+
/// Returns an authors client.
61+
pub fn authors(&self) -> authors::Client<C> {
62+
authors::Client::new(self.rpc.clone())
63+
}
64+
5365
/// Creates a client.
5466
pub async fn create(&self) -> Result<Doc<C>> {
5567
let res = self.rpc.rpc(CreateRequest {}).await??;

0 commit comments

Comments
 (0)