Skip to content

Commit 930a787

Browse files
authored
feat: add more Frame types (#3)
1 parent f337f0b commit 930a787

File tree

7 files changed

+363
-154
lines changed

7 files changed

+363
-154
lines changed

src/bin/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! A Redis CLI application to interact with the Redis server.
1+
//! A Redis CLI application.
22
33
use tokio::sync::mpsc;
44

src/client.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,12 @@ impl Client {
6262

6363
self.conn.write_frame(&frame).await?;
6464

65-
// todo: read response from the server and return to the client
6665
match self.read_response().await? {
6766
Some(data) => {
6867
let resp = String::from_utf8(data.to_vec()).unwrap();
6968
Ok(resp)
7069
}
71-
None => Err(wrap_error(RedisError::Other("Unknown error".to_string()))),
70+
None => Err(wrap_error(RedisError::Other("Unknown error".into()))),
7271
}
7372
}
7473

@@ -93,14 +92,10 @@ impl Client {
9392
async fn read_response(&mut self) -> Result<Option<Bytes>> {
9493
match self.conn.read_frame().await? {
9594
Some(Frame::SimpleString(data)) => Ok(Some(Bytes::from(data))),
96-
Some(Frame::SimpleError(data)) => Err(wrap_error(RedisError::Other(data))),
97-
Some(Frame::BulkString(data)) => Ok(Some(Bytes::from(data))),
98-
Some(_) => Err(wrap_error(RedisError::Other(
99-
"Unknown frame type: not implemented".to_string(),
100-
))),
101-
None => Err(wrap_error(RedisError::Other(
102-
"Error reading frame".to_string(),
103-
))),
95+
Some(Frame::SimpleError(data)) => Err(wrap_error(RedisError::Other(data.into()))),
96+
Some(Frame::BulkString(data)) => Ok(Some(data)),
97+
Some(_) => unimplemented!(),
98+
None => Err(wrap_error(RedisError::Other("Unknown error".into()))),
10499
}
105100
}
106101
}

src/cmd.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Redis commands.
22
3+
use bytes::Bytes;
4+
35
use crate::Frame;
46

57
/// A trait for all Redis commands.
@@ -8,11 +10,13 @@ pub trait Command {
810
fn into_stream(self) -> Frame;
911
}
1012

13+
/// A Redis PING command.
14+
///
15+
/// Useful for testing whether a connection is still alive, or to measure latency.
1116
pub struct Ping {
1217
msg: Option<String>,
1318
}
1419

15-
/// Implements the Redis Ping command.
1620
impl Ping {
1721
/// Creates a new Ping command.
1822
///
@@ -23,6 +27,12 @@ impl Ping {
2327
/// # Returns
2428
///
2529
/// A new Ping command
30+
///
31+
/// # Examples
32+
///
33+
/// ```ignore
34+
/// let ping = Ping::new(Some("hello".into()));
35+
/// ```
2636
pub fn new(msg: Option<String>) -> Self {
2737
Self { msg }
2838
}
@@ -32,10 +42,11 @@ impl Command for Ping {
3242
/// Converts the ping command into a Frame to be transimitted over the stream.
3343
fn into_stream(self) -> Frame {
3444
let mut frame: Frame = Frame::array();
35-
frame.push_bulk_str("ping".into());
45+
frame.push_frame_to_array(Frame::BulkString("ping".into()));
3646

47+
// do not push the message if it is None
3748
if let Some(msg) = self.msg {
38-
frame.push_bulk_str(msg);
49+
frame.push_frame_to_array(Frame::BulkString(Bytes::from(msg)));
3950
}
4051

4152
frame

src/connection.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::Frame;
22
use crate::RedisError;
33
use crate::Result;
44
use crate::error::wrap_error;
5+
use bytes::Buf;
56
use bytes::{Bytes, BytesMut};
67
use std::io::Cursor;
78
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
@@ -57,7 +58,9 @@ impl Connection {
5758
if self.buffer.is_empty() {
5859
return Ok(None);
5960
} else {
60-
return Err(wrap_error(RedisError::Other("Unknown error".to_string())));
61+
return Err(wrap_error(RedisError::Other(
62+
"Connection reset by peer".into(),
63+
)));
6164
}
6265
}
6366
}
@@ -97,25 +100,20 @@ impl Connection {
97100
/// None if the Frame is incomplete and more data is needed.
98101
/// An error if the Frame is invalid.
99102
async fn try_parse_frame(&mut self) -> Result<Option<Frame>> {
100-
let mut buf: Cursor<&[u8]> = Cursor::new(&self.buffer[..]);
103+
let mut cursor: Cursor<&[u8]> = Cursor::new(&self.buffer[..]);
101104

102-
match Frame::check(&mut buf).await {
103-
// Ok means we can parse a complete frame
104-
Ok(()) => {
105-
let len = buf.position() as usize;
106-
107-
let bytes = self.buffer.split_to(len).freeze();
108-
109-
// once we have read the frame, we can advance the buffer
110-
println!("try_parse_frame: len={len}, bytes={bytes:?}");
111-
112-
Ok(Some(Frame::deserialize(bytes).await?))
105+
match Frame::try_parse(&mut cursor) {
106+
Ok(frame) => {
107+
self.buffer.advance(cursor.position() as usize);
108+
Ok(Some(frame))
109+
}
110+
Err(err) => {
111+
if let Some(RedisError::IncompleteFrame) = err.downcast_ref::<RedisError>() {
112+
Ok(None)
113+
} else {
114+
Err(err)
115+
}
113116
}
114-
Err(err) => match &*err {
115-
// IncompleteFrame means we need to read more data
116-
RedisError::IncompleteFrame => Ok(None),
117-
_ => Err(err),
118-
},
119117
}
120118
}
121119
}

src/error.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,24 @@
11
//! Custom error handling for Redis client and a specialized Result type
22
//! used as the return type for Redis operations.
3+
//!
4+
//! todo: implement From trait for RedisError so that we can capture more built in e
35
4-
use std::{error, fmt, io, result, sync};
6+
use std::{error, fmt, io, result};
57

68
/// Represents errors that can occur when working with Redis.
79
#[derive(Debug)]
810
pub enum RedisError {
9-
/// An I/O error that occurred while working with a Redis connection.
10-
Io(io::Error),
1111
/// An incomplete frame was received when reading from the socket.
1212
IncompleteFrame,
1313
/// An invalid frame was received when reading from the socket. According to RESP3 spec.
1414
InvalidFrame,
15-
Other(String),
16-
}
17-
18-
impl From<io::Error> for RedisError {
19-
fn from(err: io::Error) -> Self {
20-
RedisError::Io(err)
21-
}
15+
/// Generic error type.
16+
Other(Error),
2217
}
2318

2419
impl fmt::Display for RedisError {
2520
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2621
match self {
27-
RedisError::Io(err) => write!(f, "IO error: {}", err),
2822
RedisError::IncompleteFrame => write!(f, "incomplete frame"),
2923
RedisError::InvalidFrame => write!(f, "invalid frame"),
3024
RedisError::Other(s) => write!(f, "other error: {}", s),
@@ -35,12 +29,31 @@ impl fmt::Display for RedisError {
3529
// Implement std::error::Error for RedisError.
3630
impl error::Error for RedisError {}
3731

38-
type Error = sync::Arc<RedisError>;
32+
impl From<io::Error> for RedisError {
33+
fn from(err: io::Error) -> Self {
34+
RedisError::Other(err.into())
35+
}
36+
}
37+
38+
impl From<String> for RedisError {
39+
fn from(val: String) -> Self {
40+
RedisError::Other(val.into())
41+
}
42+
}
3943

40-
/// Helper function to wrap errors into Arc.
41-
pub fn wrap_error<E: Into<RedisError>>(err: E) -> Error {
42-
sync::Arc::new(err.into())
44+
impl From<&str> for RedisError {
45+
fn from(val: &str) -> Self {
46+
RedisError::Other(val.into())
47+
}
4348
}
4449

50+
/// Boxed generic error types.
51+
type Error = Box<dyn std::error::Error + Send + Sync>;
52+
4553
/// A specialized `Result` type for Redis operations.
4654
pub type Result<T> = result::Result<T, Error>;
55+
56+
/// Helper function to wrap errors into Box.
57+
pub fn wrap_error<E: Into<RedisError>>(err: E) -> Error {
58+
Box::new(err.into())
59+
}

0 commit comments

Comments
 (0)