Skip to content

Commit 9db7372

Browse files
authored
Merge pull request #10 from n0-computer/static-sugar-futures
feat: Make the futures returned by the rpc sugar functions 'static
2 parents c797453 + 1168865 commit 9db7372

File tree

1 file changed

+107
-85
lines changed

1 file changed

+107
-85
lines changed

src/lib.rs

Lines changed: 107 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -813,73 +813,84 @@ impl<M, R, S> Client<M, R, S> {
813813
}
814814

815815
/// Performs a request for which the server returns a oneshot receiver.
816-
pub async fn rpc<Req, Res>(&self, msg: Req) -> Result<Res, Error>
816+
pub fn rpc<Req, Res>(
817+
&self,
818+
msg: Req,
819+
) -> impl Future<Output = Result<Res, Error>> + Send + 'static
817820
where
818821
S: Service,
819822
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
820-
R: From<Req> + Serialize + 'static,
821-
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = NoReceiver>,
823+
R: From<Req> + Serialize + Send + Sync + 'static,
824+
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = NoReceiver> + Send + 'static,
822825
Res: RpcMessage,
823826
{
824-
let recv: channel::oneshot::Receiver<Res> = match self.request().await? {
825-
Request::Local(request) => {
826-
let (tx, rx) = channel::oneshot::channel();
827-
request.send((msg, tx)).await?;
828-
rx
829-
}
830-
#[cfg(not(feature = "rpc"))]
831-
Request::Remote(_request) => unreachable!(),
832-
#[cfg(feature = "rpc")]
833-
Request::Remote(request) => {
834-
let (_tx, rx) = request.write(msg).await?;
835-
rx.into()
836-
}
837-
};
838-
let res = recv.await?;
839-
Ok(res)
827+
let request = self.request();
828+
async move {
829+
let recv: channel::oneshot::Receiver<Res> = match request.await? {
830+
Request::Local(request) => {
831+
let (tx, rx) = channel::oneshot::channel();
832+
request.send((msg, tx)).await?;
833+
rx
834+
}
835+
#[cfg(not(feature = "rpc"))]
836+
Request::Remote(_request) => unreachable!(),
837+
#[cfg(feature = "rpc")]
838+
Request::Remote(request) => {
839+
let (_tx, rx) = request.write(msg).await?;
840+
rx.into()
841+
}
842+
};
843+
let res = recv.await?;
844+
Ok(res)
845+
}
840846
}
841847

842848
/// Performs a request for which the server returns a spsc receiver.
843-
pub async fn server_streaming<Req, Res>(
849+
pub fn server_streaming<Req, Res>(
844850
&self,
845851
msg: Req,
846852
local_response_cap: usize,
847-
) -> Result<channel::spsc::Receiver<Res>, Error>
853+
) -> impl Future<Output = Result<channel::spsc::Receiver<Res>, Error>> + Send + 'static
848854
where
849855
S: Service,
850856
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
851-
R: From<Req> + Serialize + 'static,
852-
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = NoReceiver>,
857+
R: From<Req> + Serialize + Send + Sync + 'static,
858+
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = NoReceiver> + Send + 'static,
853859
Res: RpcMessage,
854860
{
855-
let recv: channel::spsc::Receiver<Res> = match self.request().await? {
856-
Request::Local(request) => {
857-
let (tx, rx) = channel::spsc::channel(local_response_cap);
858-
request.send((msg, tx)).await?;
859-
rx
860-
}
861-
#[cfg(not(feature = "rpc"))]
862-
Request::Remote(_request) => unreachable!(),
863-
#[cfg(feature = "rpc")]
864-
Request::Remote(request) => {
865-
let (_tx, rx) = request.write(msg).await?;
866-
rx.into()
867-
}
868-
};
869-
Ok(recv)
861+
let request = self.request();
862+
async move {
863+
let recv: channel::spsc::Receiver<Res> = match request.await? {
864+
Request::Local(request) => {
865+
let (tx, rx) = channel::spsc::channel(local_response_cap);
866+
request.send((msg, tx)).await?;
867+
rx
868+
}
869+
#[cfg(not(feature = "rpc"))]
870+
Request::Remote(_request) => unreachable!(),
871+
#[cfg(feature = "rpc")]
872+
Request::Remote(request) => {
873+
let (_tx, rx) = request.write(msg).await?;
874+
rx.into()
875+
}
876+
};
877+
Ok(recv)
878+
}
870879
}
871880

872881
/// Performs a request for which the client can send updates.
873-
pub async fn client_streaming<Req, Update, Res>(
882+
pub fn client_streaming<Req, Update, Res>(
874883
&self,
875884
msg: Req,
876885
local_update_cap: usize,
877-
) -> Result<
878-
(
879-
channel::spsc::Sender<Update>,
880-
channel::oneshot::Receiver<Res>,
881-
),
882-
Error,
886+
) -> impl Future<
887+
Output = Result<
888+
(
889+
channel::spsc::Sender<Update>,
890+
channel::oneshot::Receiver<Res>,
891+
),
892+
Error,
893+
>,
883894
>
884895
where
885896
S: Service,
@@ -889,59 +900,70 @@ impl<M, R, S> Client<M, R, S> {
889900
Update: RpcMessage,
890901
Res: RpcMessage,
891902
{
892-
let (update_tx, res_rx): (
893-
channel::spsc::Sender<Update>,
894-
channel::oneshot::Receiver<Res>,
895-
) = match self.request().await? {
896-
Request::Local(request) => {
897-
let (req_tx, req_rx) = channel::spsc::channel(local_update_cap);
898-
let (res_tx, res_rx) = channel::oneshot::channel();
899-
request.send((msg, res_tx, req_rx)).await?;
900-
(req_tx, res_rx)
901-
}
902-
#[cfg(not(feature = "rpc"))]
903-
Request::Remote(_request) => unreachable!(),
904-
#[cfg(feature = "rpc")]
905-
Request::Remote(request) => {
906-
let (tx, rx) = request.write(msg).await?;
907-
(tx.into(), rx.into())
908-
}
909-
};
910-
Ok((update_tx, res_rx))
903+
let request = self.request();
904+
async move {
905+
let (update_tx, res_rx): (
906+
channel::spsc::Sender<Update>,
907+
channel::oneshot::Receiver<Res>,
908+
) = match request.await? {
909+
Request::Local(request) => {
910+
let (req_tx, req_rx) = channel::spsc::channel(local_update_cap);
911+
let (res_tx, res_rx) = channel::oneshot::channel();
912+
request.send((msg, res_tx, req_rx)).await?;
913+
(req_tx, res_rx)
914+
}
915+
#[cfg(not(feature = "rpc"))]
916+
Request::Remote(_request) => unreachable!(),
917+
#[cfg(feature = "rpc")]
918+
Request::Remote(request) => {
919+
let (tx, rx) = request.write(msg).await?;
920+
(tx.into(), rx.into())
921+
}
922+
};
923+
Ok((update_tx, res_rx))
924+
}
911925
}
912926

913927
/// Performs a request for which the client can send updates, and the server returns a spsc receiver.
914-
pub async fn bidi_streaming<Req, Update, Res>(
928+
pub fn bidi_streaming<Req, Update, Res>(
915929
&self,
916930
msg: Req,
917931
local_update_cap: usize,
918932
local_response_cap: usize,
919-
) -> Result<(channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>), Error>
933+
) -> impl Future<
934+
Output = Result<(channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>), Error>,
935+
> + Send
936+
+ 'static
920937
where
921938
S: Service,
922939
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
923-
R: From<Req> + Serialize + 'static,
924-
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = channel::spsc::Receiver<Update>>,
940+
R: From<Req> + Serialize + Send + 'static,
941+
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = channel::spsc::Receiver<Update>>
942+
+ Send
943+
+ 'static,
925944
Update: RpcMessage,
926945
Res: RpcMessage,
927946
{
928-
let (update_tx, res_rx): (channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>) =
929-
match self.request().await? {
930-
Request::Local(request) => {
931-
let (update_tx, update_rx) = channel::spsc::channel(local_update_cap);
932-
let (res_tx, res_rx) = channel::spsc::channel(local_response_cap);
933-
request.send((msg, res_tx, update_rx)).await?;
934-
(update_tx, res_rx)
935-
}
936-
#[cfg(not(feature = "rpc"))]
937-
Request::Remote(_request) => unreachable!(),
938-
#[cfg(feature = "rpc")]
939-
Request::Remote(request) => {
940-
let (tx, rx) = request.write(msg).await?;
941-
(tx.into(), rx.into())
942-
}
943-
};
944-
Ok((update_tx, res_rx))
947+
let request = self.request();
948+
async move {
949+
let (update_tx, res_rx): (channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>) =
950+
match request.await? {
951+
Request::Local(request) => {
952+
let (update_tx, update_rx) = channel::spsc::channel(local_update_cap);
953+
let (res_tx, res_rx) = channel::spsc::channel(local_response_cap);
954+
request.send((msg, res_tx, update_rx)).await?;
955+
(update_tx, res_rx)
956+
}
957+
#[cfg(not(feature = "rpc"))]
958+
Request::Remote(_request) => unreachable!(),
959+
#[cfg(feature = "rpc")]
960+
Request::Remote(request) => {
961+
let (tx, rx) = request.write(msg).await?;
962+
(tx.into(), rx.into())
963+
}
964+
};
965+
Ok((update_tx, res_rx))
966+
}
945967
}
946968
}
947969

0 commit comments

Comments
 (0)