Skip to content

Commit 85a2628

Browse files
committed
Add utils for writing after all...
1 parent 53940d1 commit 85a2628

File tree

1 file changed

+59
-3
lines changed

1 file changed

+59
-3
lines changed

src/util.rs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,9 @@ mod varint_util {
174174
io::{self, Error},
175175
};
176176

177-
use serde::Serialize;
178-
use tokio::io::{AsyncRead, AsyncReadExt};
177+
use serde::{de::DeserializeOwned, Serialize};
178+
use smallvec::SmallVec;
179+
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
179180

180181
/// Reads a u64 varint from an AsyncRead source, using the Postcard/LEB128 format.
181182
///
@@ -291,12 +292,38 @@ mod varint_util {
291292
///
292293
/// If the stream is at the end, this returns `Ok(None)`.
293294
fn read_varint_u64(&mut self) -> impl Future<Output = io::Result<Option<u64>>>;
295+
296+
fn read_length_prefixed<T: DeserializeOwned>(
297+
&mut self,
298+
max_size: usize,
299+
) -> impl Future<Output = io::Result<T>>;
294300
}
295301

296302
impl<T: AsyncRead + Unpin> AsyncReadVarintExt for T {
297303
fn read_varint_u64(&mut self) -> impl Future<Output = io::Result<Option<u64>>> {
298304
read_varint_u64(self)
299305
}
306+
307+
async fn read_length_prefixed<I: DeserializeOwned>(
308+
&mut self,
309+
max_size: usize,
310+
) -> io::Result<I> {
311+
let size = match self.read_varint_u64().await? {
312+
Some(size) => size,
313+
None => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "EOF reached")),
314+
};
315+
316+
if size > max_size as u64 {
317+
return Err(io::Error::new(
318+
io::ErrorKind::InvalidData,
319+
"Length-prefixed value too large",
320+
));
321+
}
322+
323+
let mut buf = vec![0; size as usize];
324+
self.read_exact(&mut buf).await?;
325+
postcard::from_bytes(&buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
326+
}
300327
}
301328

302329
/// Provides a fn to write a varint to an [`io::Write`] target, as well as a
@@ -318,9 +345,38 @@ mod varint_util {
318345
write_length_prefixed(self, value)
319346
}
320347
}
348+
349+
/// Provides a fn to write a varint to an [`io::Write`] target, as well as a
350+
/// helper to write a length-prefixed value.
351+
pub trait AsyncWriteVarintExt: AsyncWrite + Unpin {
352+
/// Write a varint
353+
fn write_varint_u64(&mut self, value: u64) -> impl Future<Output = io::Result<usize>>;
354+
/// Write a value with a varint enoded length prefix.
355+
fn write_length_prefixed<T: Serialize>(
356+
&mut self,
357+
value: T,
358+
) -> impl Future<Output = io::Result<usize>>;
359+
}
360+
361+
impl<T: AsyncWrite + Unpin> AsyncWriteVarintExt for T {
362+
async fn write_varint_u64(&mut self, value: u64) -> io::Result<usize> {
363+
let mut buf: SmallVec<[u8; 10]> = Default::default();
364+
write_varint_u64_sync(&mut buf, value).unwrap();
365+
self.write_all(&buf[..]).await?;
366+
Ok(buf.len())
367+
}
368+
369+
async fn write_length_prefixed<V: Serialize>(&mut self, value: V) -> io::Result<usize> {
370+
let mut buf = Vec::new();
371+
write_length_prefixed(&mut buf, value)?;
372+
let size = buf.len();
373+
self.write_all(&buf).await?;
374+
Ok(size)
375+
}
376+
}
321377
}
322378
#[cfg(feature = "rpc")]
323-
pub use varint_util::{AsyncReadVarintExt, WriteVarintExt};
379+
pub use varint_util::{AsyncReadVarintExt, AsyncWriteVarintExt, WriteVarintExt};
324380

325381
mod fuse_wrapper {
326382
use std::{

0 commit comments

Comments
 (0)