@@ -3,6 +3,7 @@ use std::{
33 net:: { Ipv4Addr , SocketAddr , SocketAddrV4 } ,
44} ;
55
6+ use anyhow:: bail;
67use futures_buffered:: BufferedStreamExt ;
78use irpc:: {
89 channel:: { mpsc, oneshot} ,
@@ -11,7 +12,6 @@ use irpc::{
1112 util:: { make_client_endpoint, make_server_endpoint} ,
1213 Client , Request , WithChannels ,
1314} ;
14- use n0_error:: { bail_any, Result , StdResultExt } ;
1515use n0_future:: {
1616 stream:: StreamExt ,
1717 task:: { self , AbortOnDropHandle } ,
@@ -144,23 +144,23 @@ struct ComputeApi {
144144}
145145
146146impl ComputeApi {
147- pub fn connect ( endpoint : quinn:: Endpoint , addr : SocketAddr ) -> Result < ComputeApi > {
147+ pub fn connect ( endpoint : quinn:: Endpoint , addr : SocketAddr ) -> anyhow :: Result < ComputeApi > {
148148 Ok ( ComputeApi {
149149 inner : Client :: quinn ( endpoint, addr) ,
150150 } )
151151 }
152152
153- pub fn listen ( & self , endpoint : quinn:: Endpoint ) -> Result < AbortOnDropHandle < ( ) > > {
153+ pub fn listen ( & self , endpoint : quinn:: Endpoint ) -> anyhow :: Result < AbortOnDropHandle < ( ) > > {
154154 let Some ( local) = self . inner . as_local ( ) else {
155- bail_any ! ( "cannot listen on a remote service" ) ;
155+ bail ! ( "cannot listen on a remote service" ) ;
156156 } ;
157157 let handler = ComputeProtocol :: remote_handler ( local) ;
158158 Ok ( AbortOnDropHandle :: new ( task:: spawn ( listen (
159159 endpoint, handler,
160160 ) ) ) )
161161 }
162162
163- pub async fn sqr ( & self , num : u64 ) -> Result < oneshot:: Receiver < u128 > > {
163+ pub async fn sqr ( & self , num : u64 ) -> anyhow :: Result < oneshot:: Receiver < u128 > > {
164164 let msg = Sqr { num } ;
165165 match self . inner . request ( ) . await ? {
166166 Request :: Local ( request) => {
@@ -175,7 +175,7 @@ impl ComputeApi {
175175 }
176176 }
177177
178- pub async fn sum ( & self ) -> Result < ( mpsc:: Sender < i64 > , oneshot:: Receiver < i64 > ) > {
178+ pub async fn sum ( & self ) -> anyhow :: Result < ( mpsc:: Sender < i64 > , oneshot:: Receiver < i64 > ) > {
179179 let msg = Sum ;
180180 match self . inner . request ( ) . await ? {
181181 Request :: Local ( request) => {
@@ -191,7 +191,7 @@ impl ComputeApi {
191191 }
192192 }
193193
194- pub async fn fibonacci ( & self , max : u64 ) -> Result < mpsc:: Receiver < u64 > > {
194+ pub async fn fibonacci ( & self , max : u64 ) -> anyhow :: Result < mpsc:: Receiver < u64 > > {
195195 let msg = Fibonacci { max } ;
196196 match self . inner . request ( ) . await ? {
197197 Request :: Local ( request) => {
@@ -206,7 +206,10 @@ impl ComputeApi {
206206 }
207207 }
208208
209- pub async fn multiply ( & self , initial : u64 ) -> Result < ( mpsc:: Sender < u64 > , mpsc:: Receiver < u64 > ) > {
209+ pub async fn multiply (
210+ & self ,
211+ initial : u64 ,
212+ ) -> anyhow:: Result < ( mpsc:: Sender < u64 > , mpsc:: Receiver < u64 > ) > {
210213 let msg = Multiply { initial } ;
211214 match self . inner . request ( ) . await ? {
212215 Request :: Local ( request) => {
@@ -224,7 +227,7 @@ impl ComputeApi {
224227}
225228
226229// Local usage example
227- async fn local ( ) -> Result < ( ) > {
230+ async fn local ( ) -> anyhow :: Result < ( ) > {
228231 let api = ComputeActor :: local ( ) ;
229232
230233 // Test Sqr
@@ -262,7 +265,7 @@ async fn local() -> Result<()> {
262265 Ok ( ( ) )
263266}
264267
265- fn remote_api ( ) -> Result < ( ComputeApi , AbortOnDropHandle < ( ) > ) > {
268+ fn remote_api ( ) -> anyhow :: Result < ( ComputeApi , AbortOnDropHandle < ( ) > ) > {
266269 let port = 10114 ;
267270 let ( server, cert) =
268271 make_server_endpoint ( SocketAddrV4 :: new ( Ipv4Addr :: UNSPECIFIED , port) . into ( ) ) ?;
@@ -275,7 +278,7 @@ fn remote_api() -> Result<(ComputeApi, AbortOnDropHandle<()>)> {
275278}
276279
277280// Remote usage example
278- async fn remote ( ) -> Result < ( ) > {
281+ async fn remote ( ) -> anyhow :: Result < ( ) > {
279282 let ( api, handle) = remote_api ( ) ?;
280283
281284 // Test Sqr
@@ -315,7 +318,7 @@ async fn remote() -> Result<()> {
315318}
316319
317320// Benchmark function using the new ComputeApi
318- async fn bench ( api : ComputeApi , n : u64 ) -> Result < ( ) > {
321+ async fn bench ( api : ComputeApi , n : u64 ) -> anyhow :: Result < ( ) > {
319322 // Individual RPCs (sequential)
320323 {
321324 let mut sum = 0 ;
@@ -339,7 +342,7 @@ async fn bench(api: ComputeApi, n: u64) -> Result<()> {
339342 let api = api. clone ( ) ;
340343 let reqs = n0_future:: stream:: iter ( ( 0 ..n) . map ( move |i| {
341344 let api = api. clone ( ) ;
342- async move { n0_error :: Ok ( api. sqr ( i) . await ?. await ?) }
345+ async move { anyhow :: Ok ( api. sqr ( i) . await ?. await ?) }
343346 } ) ) ;
344347 let resp: Vec < _ > = reqs. buffered_unordered ( 32 ) . try_collect ( ) . await ?;
345348 let sum = resp. into_iter ( ) . sum :: < u128 > ( ) ;
@@ -373,7 +376,7 @@ async fn bench(api: ComputeApi, n: u64) -> Result<()> {
373376 assert_eq ! ( sum, ( 0 ..n) . map( |x| x * 2 ) . sum:: <u64 >( ) ) ;
374377 clear_line ( ) ?;
375378 println ! ( "bidi seq {} rps" , rps. separate_with_underscores( ) ) ;
376- handle. await . std_context ( "panic in task" ) ??;
379+ handle. await ??;
377380 }
378381
379382 Ok ( ( ) )
@@ -392,7 +395,7 @@ fn clear_line() -> io::Result<()> {
392395}
393396
394397// Simple benchmark sending oneshot senders via an mpsc channel
395- pub async fn reference_bench ( n : u64 ) -> Result < ( ) > {
398+ pub async fn reference_bench ( n : u64 ) -> anyhow :: Result < ( ) > {
396399 // Create an mpsc channel to send oneshot senders
397400 let ( tx, mut rx) = tokio:: sync:: mpsc:: channel :: < tokio:: sync:: oneshot:: Sender < u64 > > ( 32 ) ;
398401
@@ -411,8 +414,8 @@ pub async fn reference_bench(n: u64) -> Result<()> {
411414 let t0 = std:: time:: Instant :: now ( ) ;
412415 for i in 0 ..n {
413416 let ( send, recv) = tokio:: sync:: oneshot:: channel ( ) ;
414- tx. send ( send) . await . anyerr ( ) ?;
415- sum += recv. await . anyerr ( ) ?;
417+ tx. send ( send) . await ?;
418+ sum += recv. await ?;
416419 if i % 10000 == 0 {
417420 print ! ( "." ) ;
418421 io:: stdout ( ) . flush ( ) ?;
@@ -429,8 +432,8 @@ pub async fn reference_bench(n: u64) -> Result<()> {
429432 let t0 = std:: time:: Instant :: now ( ) ;
430433 let reqs = n0_future:: stream:: iter ( ( 0 ..n) . map ( |_| async {
431434 let ( send, recv) = tokio:: sync:: oneshot:: channel ( ) ;
432- tx. send ( send) . await . anyerr ( ) ?;
433- n0_error :: Ok ( recv. await . anyerr ( ) ?)
435+ tx. send ( send) . await ?;
436+ anyhow :: Ok ( recv. await ?)
434437 } ) ) ;
435438 let resp: Vec < _ > = reqs. buffered_unordered ( 32 ) . try_collect ( ) . await ?;
436439 let sum = resp. into_iter ( ) . sum :: < u64 > ( ) ;
@@ -444,7 +447,7 @@ pub async fn reference_bench(n: u64) -> Result<()> {
444447}
445448
446449#[ tokio:: main]
447- async fn main ( ) -> Result < ( ) > {
450+ async fn main ( ) -> anyhow :: Result < ( ) > {
448451 tracing_subscriber:: fmt:: init ( ) ;
449452 println ! ( "Local use" ) ;
450453 local ( ) . await ?;
0 commit comments