Skip to content

Commit 282e8b8

Browse files
authored
feat: add more cmds (#4)
1 parent 930a787 commit 282e8b8

File tree

6 files changed

+322
-115
lines changed

6 files changed

+322
-115
lines changed

examples/client_simple.rs

Lines changed: 14 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,24 @@
1-
// use bytes::Bytes;
2-
// use mini_redis::client;
3-
// use tokio::sync::{mpsc, oneshot};
4-
5-
// type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
6-
7-
// #[derive(Debug)]
8-
// enum Command {
9-
// Get {
10-
// key: String,
11-
// resp: Responder<Option<Bytes>>,
12-
// },
13-
// Set {
14-
// key: String,
15-
// val: Bytes,
16-
// resp: Responder<()>,
17-
// },
18-
// }
1+
use redis_async::{Client, Result};
192

203
#[tokio::main]
21-
async fn main() {
22-
// let (tx, mut rx) = mpsc::channel(32);
23-
// let tx2 = tx.clone();
24-
25-
// // simulate 1 client tosend a Get request to redis server
26-
// let sender_task1 = tokio::spawn(async move {
27-
// let (resp_tx, resp_rx) = oneshot::channel();
28-
29-
// let cmd = Command::Get {
30-
// key: "foo".to_string(),
31-
// resp: resp_tx,
32-
// };
33-
34-
// println!("Sending from first handle: {:?}", cmd);
35-
36-
// tx.send(cmd).await.unwrap();
37-
38-
// let res: Result<
39-
// Result<Option<Bytes>, Box<dyn Error + Send + Sync>>,
40-
// oneshot::error::RecvError,
41-
// > = resp_rx.await;
42-
// println!("Got: {:?}", res);
43-
// });
44-
45-
// // simulate 1 client tosend a Set request to redis server
46-
// let sender_task2 = tokio::spawn(async move {
47-
// let (resp_tx, resp_rx) = oneshot::channel();
48-
49-
// let cmd = Command::Set {
50-
// key: "foo".to_string(),
51-
// val: "bar".into(),
52-
// resp: resp_tx,
53-
// };
4+
async fn main() -> Result<()> {
5+
let mut client = Client::connect("127.0.0.1:6379").await?;
546

55-
// println!("Sending from second handle: {:?}", cmd);
7+
let resp = client.set("mykey", "myvalue").await?.unwrap();
568

57-
// tx2.send(cmd).await.unwrap();
9+
println!("SET command response: {}", resp);
5810

59-
// let res = resp_rx.await;
60-
// println!("Got: {:?}", res);
61-
// });
11+
let resp = client.get("mykey").await?;
6212

63-
// // spawn a task coordinator to proxy between client and server
64-
// let manager = tokio::spawn(async move {
65-
// let mut client = client::connect("127.0.0.1:6379").await.unwrap();
13+
if let Some(data) = resp {
14+
println!("GET command response: {}", data);
15+
} else {
16+
println!("Key not found");
17+
}
6618

67-
// while let Some(msg) = rx.recv().await {
68-
// use Command::*;
19+
let resp = client.del(vec!["mykey"]).await?;
6920

70-
// match msg {
71-
// Get { key, resp } => {
72-
// let res = client.get(&key).await;
73-
// let _ = resp.send(res);
74-
// }
75-
// Set { key, val, resp } => {
76-
// let res = client.set(&key, val).await;
77-
// let _ = resp.send(res);
78-
// }
79-
// };
80-
// }
81-
// });
21+
println!("DEL command response: {}", resp);
8222

83-
// let (res, ..) = tokio::join!(sender_task1, sender_task2, manager);
84-
// res.unwrap();
23+
Ok(())
8524
}

examples/hello_redis.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,17 @@ use tokio::task::JoinHandle;
88
#[tokio::main(worker_threads = 1)]
99
async fn main() -> Result<()> {
1010
let num_clients = 10;
11+
1112
let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(num_clients);
1213

1314
for id in 0..num_clients {
14-
let msg: String = "Some message".into();
15-
1615
let handle = tokio::spawn(async move {
1716
let mut c = Client::connect("127.0.0.1:6379").await.unwrap();
18-
let resp = c.ping(Some(msg.clone())).await.unwrap();
17+
let resp = c.ping(Some("Redis")).await.unwrap();
1918
// sleep for 1 second, this should not block other clients
2019
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
2120

22-
println!(
23-
"From client {id}: Pinged the Redis server with message: {msg}. Got response: {resp}"
24-
);
21+
println!("From client {id} Got response: {resp}");
2522
});
2623

2724
handles.push(handle);

src/client.rs

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
1+
//! Redis client CLI application. A simple command line interface to interact with a Redis server.
2+
//!
3+
//! The clients default to RESP2 unless HELLO 3 is explicitly sent.
4+
//! It can operate in two modes: interactive and single command mode.
5+
//! In interactive mode, the user can send commands to the server and get the response. It starts an REPL loop.
6+
//! In single command mode, the user can send a single command to the server and get the response.
7+
//! Both modes are blocking and synchronous.
8+
19
use crate::Connection;
210
use crate::Frame;
311
use crate::RedisError;
412
use crate::Result;
5-
use crate::cmd::{Command, Ping};
13+
use crate::cmd::*;
614
use crate::error::wrap_error;
715
use bytes::Bytes;
16+
use std::str::from_utf8;
817
use tokio::net::{TcpStream, ToSocketAddrs};
918

1019
/// Redis client implementation.
@@ -39,7 +48,7 @@ impl Client {
3948
///
4049
/// # Arguments
4150
///
42-
/// * `msg` - An optional message to send to the server.
51+
/// * `msg` - An optional message to send to the server
4352
///
4453
/// # Returns
4554
///
@@ -53,11 +62,10 @@ impl Client {
5362
///
5463
/// #[tokio::main]
5564
/// async fn main() {
56-
/// let mut c = Client::connect("127.0.0.1:6379").await.unwrap();
57-
///
58-
/// let resp = c.ping(Some("Hello Redis".to_string())).await.unwrap();
59-
/// }
60-
pub async fn ping(&mut self, msg: Option<String>) -> Result<String> {
65+
/// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
66+
/// let resp = client.ping(Some("Hello Redis".to_string())).await.unwrap();
67+
/// ```
68+
pub async fn ping(&mut self, msg: Option<&str>) -> Result<String> {
6169
let frame: Frame = Ping::new(msg).into_stream();
6270

6371
self.conn.write_frame(&frame).await?;
@@ -71,14 +79,70 @@ impl Client {
7179
}
7280
}
7381

74-
#[allow(dead_code)]
75-
pub async fn get(&self, _: &str) -> Self {
76-
unimplemented!()
82+
/// Sends a GET command to the Redis server, with a key.
83+
///
84+
/// # Arguments
85+
///
86+
/// * `key` - A required key to send to the server
87+
///
88+
/// # Returns
89+
///
90+
/// * `Ok(Some(String))` if the key to GET exists
91+
/// * `Ok(None)` if the key to GET does not exist
92+
/// * `Err(RedisError)` if an error occurs
93+
///
94+
/// # Examples
95+
///
96+
/// ```ignore
97+
/// use async_redis::Client;
98+
///
99+
/// #[tokio::main]
100+
/// async fn main() {
101+
/// let mut client = Client::connect("127.0.0.1:6379").await.unwrap();
102+
/// let resp = client.get("mykey").await?;
103+
/// ```
104+
pub async fn get(&mut self, key: &str) -> Result<Option<String>> {
105+
let frame: Frame = Get::new(key).into_stream();
106+
107+
self.conn.write_frame(&frame).await?;
108+
109+
match self.read_response().await? {
110+
Some(data) => {
111+
let resp = String::from_utf8(data.to_vec()).unwrap();
112+
Ok(Some(resp))
113+
}
114+
// no error, but the key doesn't exist
115+
None => Ok(None),
116+
}
117+
}
118+
119+
// todo: the real SET command has some other options like EX, PX, NX, XX
120+
// we need to add these options to the SET command. Possibly with option pattern
121+
pub async fn set(&mut self, key: &str, val: &str) -> Result<Option<String>> {
122+
let frame: Frame = Set::new(key, val).into_stream();
123+
124+
self.conn.write_frame(&frame).await?;
125+
126+
match self.read_response().await? {
127+
Some(data) => {
128+
let resp = String::from_utf8(data.to_vec()).unwrap();
129+
Ok(Some(resp))
130+
}
131+
// we shouldn't get here, if no key is deleted, we expect an 0
132+
None => Ok(None),
133+
}
77134
}
78135

79-
#[allow(dead_code)]
80-
pub async fn set(&self, _: &str, _: String) -> Self {
81-
unimplemented!()
136+
pub async fn del(&mut self, keys: Vec<&str>) -> Result<i64> {
137+
let frame: Frame = Del::new(keys).into_stream();
138+
139+
self.conn.write_frame(&frame).await?;
140+
141+
match self.read_response().await? {
142+
Some(data) => Ok(from_utf8(&data)?.parse::<i64>()?),
143+
// we shouldn't get here, we always expect a number from the server
144+
None => Err(wrap_error(RedisError::Other("Unknown error".into()))),
145+
}
82146
}
83147

84148
/// Reads the response from the server. The response is a searilzied frame.
@@ -93,7 +157,9 @@ impl Client {
93157
match self.conn.read_frame().await? {
94158
Some(Frame::SimpleString(data)) => Ok(Some(Bytes::from(data))),
95159
Some(Frame::SimpleError(data)) => Err(wrap_error(RedisError::Other(data.into()))),
160+
Some(Frame::Integer(data)) => Ok(Some(Bytes::from(data.to_string()))),
96161
Some(Frame::BulkString(data)) => Ok(Some(data)),
162+
Some(Frame::Null) => Ok(None),
97163
Some(_) => unimplemented!(),
98164
None => Err(wrap_error(RedisError::Other("Unknown error".into()))),
99165
}

0 commit comments

Comments
 (0)