Skip to content

Commit b062af3

Browse files
committed
refactor: use RemoteService trait
1 parent 1441871 commit b062af3

File tree

7 files changed

+33
-34
lines changed

7 files changed

+33
-34
lines changed

examples/compute.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use anyhow::bail;
77
use futures_buffered::BufferedStreamExt;
88
use irpc::{
99
channel::{mpsc, oneshot},
10-
rpc::{listen, MessageWithChannels},
10+
rpc::{listen, RemoteService},
1111
rpc_requests,
1212
util::{make_client_endpoint, make_server_endpoint},
1313
Client, LocalSender, Request, WithChannels,
@@ -164,7 +164,7 @@ impl ComputeApi {
164164
let Some(local) = self.inner.local() else {
165165
bail!("cannot listen on a remote service");
166166
};
167-
let handler = ComputeMessage::forwarding_handler(local);
167+
let handler = ComputeProtocol::forwarding_handler(local);
168168
Ok(AbortOnDropHandle::new(task::spawn(listen(
169169
endpoint, handler,
170170
))))

examples/derive.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
use anyhow::{Context, Result};
77
use irpc::{
88
channel::{mpsc, oneshot},
9-
rpc::MessageWithChannels,
9+
rpc::RemoteService,
1010
rpc_requests,
1111
util::{make_client_endpoint, make_server_endpoint},
1212
Client, LocalSender, WithChannels,
@@ -130,7 +130,7 @@ impl StorageApi {
130130
let local = self.inner.local().context("cannot listen on remote API")?;
131131
let join_handle = task::spawn(irpc::rpc::listen(
132132
endpoint,
133-
StorageMessage::forwarding_handler(local),
133+
StorageProtocol::forwarding_handler(local),
134134
));
135135
Ok(AbortOnDropHandle::new(join_handle))
136136
}

irpc-derive/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,16 @@ fn generate_message_from_wire_impl(
134134
});
135135

136136
quote! {
137-
impl ::irpc::rpc::MessageWithChannels for #message_enum_name {
138-
type WireMessage = #proto_enum_name;
137+
impl ::irpc::rpc::RemoteService for #proto_enum_name {
139138
fn from_wire(
140139
msg: Self::WireMessage,
141140
rx: ::irpc::rpc::quinn::RecvStream,
142-
tx: ::irpc::rpc::quinn::SendStream) -> Self {
143-
match msg {
144-
#(#variants),*
145-
}
141+
tx: ::irpc::rpc::quinn::SendStream
142+
) -> Self::Message {
143+
match msg {
144+
#(#variants),*
146145
}
146+
}
147147
}
148148
}
149149
}

irpc-iroh/examples/auth.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ mod storage {
132132
impl ProtocolHandler for StorageServer {
133133
async fn accept(&self, conn: Connection) -> Result<(), AcceptError> {
134134
let mut authed = false;
135-
while let Some(msg) = read_request(&conn).await? {
135+
while let Some(msg) = read_request::<StorageProtocol>(&conn).await? {
136136
match msg {
137137
StorageMessage::Auth(msg) => {
138138
let WithChannels { inner, tx, .. } = msg;

irpc-iroh/examples/derive.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ mod storage {
6262
use iroh::{protocol::ProtocolHandler, Endpoint};
6363
use irpc::{
6464
channel::{mpsc, oneshot},
65-
rpc::MessageWithChannels,
65+
rpc::RemoteService,
6666
rpc_requests, Client, LocalSender, WithChannels,
6767
};
6868
// Import the macro
@@ -171,7 +171,9 @@ mod storage {
171171
.inner
172172
.local()
173173
.context("can not listen on remote service")?;
174-
Ok(IrohProtocol::new(StorageMessage::forwarding_handler(local)))
174+
Ok(IrohProtocol::new(StorageProtocol::forwarding_handler(
175+
local,
176+
)))
175177
}
176178

177179
pub async fn get(&self, key: String) -> irpc::Result<Option<String>> {

irpc-iroh/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use iroh::{
1010
use irpc::{
1111
channel::RecvError,
1212
rpc::{
13-
Handler, MessageWithChannels, RemoteConnection, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED,
13+
Handler, RemoteConnection, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED,
1414
MAX_MESSAGE_SIZE,
1515
},
1616
util::AsyncReadVarintExt,
@@ -142,13 +142,13 @@ pub async fn handle_connection<R: DeserializeOwned + 'static>(
142142
}
143143
}
144144

145-
pub async fn read_request<M: MessageWithChannels>(
145+
pub async fn read_request<S: RemoteService>(
146146
connection: &Connection,
147-
) -> std::io::Result<Option<M>> {
147+
) -> std::io::Result<Option<S::Message>> {
148148
Ok(
149-
match read_request_raw::<M::WireMessage>(connection).await? {
149+
match read_request_raw::<S::WireMessage>(connection).await? {
150150
None => None,
151-
Some((msg, rx, tx)) => Some(M::from_wire(msg, rx, tx)),
151+
Some((msg, rx, tx)) => Some(S::from_wire(msg, rx, tx)),
152152
},
153153
)
154154
}

src/lib.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl<T> RpcMessage for T where
115115
/// message type, and provides some type safety when sending messages.
116116
pub trait Service: Send + Sync + Debug + 'static {
117117
type WireMessage: Serialize + DeserializeOwned + Send + 'static;
118-
type Message: Send + 'static;
118+
type Message: Send + Unpin + 'static;
119119
}
120120

121121
mod sealed {
@@ -1605,17 +1605,14 @@ pub mod rpc {
16051605
+ 'static,
16061606
>;
16071607

1608-
pub trait MessageWithChannels: Send + Unpin + 'static {
1609-
type WireMessage: DeserializeOwned + Send;
1610-
1611-
fn from_wire(msg: Self::WireMessage, rx: quinn::RecvStream, tx: quinn::SendStream) -> Self;
1612-
1613-
fn forwarding_handler<S: Service<Message = Self>>(
1614-
local_sender: LocalSender<S>,
1615-
) -> Handler<Self::WireMessage>
1616-
where
1617-
Self: Sized,
1618-
{
1608+
pub trait RemoteService: Service + Sized {
1609+
/// Creates a message from a wire message and a pair of quic streams.
1610+
fn from_wire(
1611+
msg: Self::WireMessage,
1612+
rx: quinn::RecvStream,
1613+
tx: quinn::SendStream,
1614+
) -> Self::Message;
1615+
fn forwarding_handler(local_sender: LocalSender<Self>) -> Handler<Self::WireMessage> {
16191616
Arc::new(move |msg, rx, tx| {
16201617
let msg = Self::from_wire(msg, rx, tx);
16211618
Box::pin(local_sender.send_raw(msg))
@@ -1661,13 +1658,13 @@ pub mod rpc {
16611658
}
16621659
}
16631660

1664-
pub async fn read_request<M: MessageWithChannels>(
1661+
pub async fn read_request<S: RemoteService>(
16651662
connection: &quinn::Connection,
1666-
) -> std::io::Result<Option<M>> {
1663+
) -> std::io::Result<Option<S::Message>> {
16671664
Ok(
1668-
match read_request_raw::<M::WireMessage>(connection).await? {
1665+
match read_request_raw::<S::WireMessage>(connection).await? {
16691666
None => None,
1670-
Some((msg, rx, tx)) => Some(M::from_wire(msg, rx, tx)),
1667+
Some((msg, rx, tx)) => Some(S::from_wire(msg, rx, tx)),
16711668
},
16721669
)
16731670
}

0 commit comments

Comments
 (0)