Skip to content

Commit 6782d24

Browse files
committed
editoast: mq_client: use a dashmap for correlation ids
Signed-off-by: Younes Khoudli <[email protected]>
1 parent 3f6bc58 commit 6782d24

File tree

4 files changed

+9
-8
lines changed

4 files changed

+9
-8
lines changed

editoast/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

editoast/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ chrono = { version = "0.4.42", default-features = false, features = ["serde"] }
4040
clap = { version = "4.5", features = ["derive", "env"] }
4141
common = { path = "./common" }
4242
core_client = { path = "./core_client" }
43+
dashmap = "6.1.0"
4344
darling = "0.21"
4445
database = { path = "./database" }
4546
deadpool = { version = "0.12.3", default-features = false, features = [
@@ -158,7 +159,7 @@ clap.workspace = true
158159
colored = "3.0.0"
159160
common = { workspace = true }
160161
core_client = { workspace = true, features = ["mocking_client"] }
161-
dashmap = "6.1.0"
162+
dashmap.workspace = true
162163
database.workspace = true
163164
deadpool.workspace = true
164165
deadpool-redis.workspace = true

editoast/core_client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mocking_client = []
1111
approx.workspace = true
1212
chrono.workspace = true
1313
common = { workspace = true }
14+
dashmap.workspace = true
1415
deadpool.workspace = true
1516
editoast_derive.workspace = true
1617
educe.workspace = true

editoast/core_client/src/mq_client.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use dashmap::DashMap;
12
use deadpool::managed::Manager;
23
use deadpool::managed::Metrics;
34
use deadpool::managed::Pool;
@@ -18,7 +19,6 @@ use lapin::types::FieldTable;
1819
use lapin::types::ShortString;
1920
use serde::Serialize;
2021
use serde_json::to_vec;
21-
use std::collections::HashMap;
2222
use std::fmt::Debug;
2323
use std::sync::Arc;
2424
use thiserror::Error;
@@ -93,15 +93,15 @@ impl Manager for ChannelManager {
9393
#[derive(Debug)]
9494
pub struct ChannelWorker {
9595
channel: Arc<Channel>,
96-
response_tracker: Arc<RwLock<HashMap<String, oneshot::Sender<Delivery>>>>,
96+
response_tracker: Arc<DashMap<String, oneshot::Sender<Delivery>>>,
9797
consumer_tag: String,
9898
}
9999

100100
impl ChannelWorker {
101101
pub async fn new(channel: Arc<Channel>, hostname: String) -> Self {
102102
let worker = ChannelWorker {
103103
channel,
104-
response_tracker: Arc::new(RwLock::new(HashMap::new())),
104+
response_tracker: Arc::new(DashMap::new()),
105105
consumer_tag: format!("{}-{}", hostname, Uuid::new_v4()),
106106
};
107107
worker.dispatching_loop().await;
@@ -117,8 +117,7 @@ impl ChannelWorker {
117117
correlation_id: String,
118118
tx: oneshot::Sender<Delivery>,
119119
) {
120-
let mut response_tracker = self.response_tracker.write().await;
121-
response_tracker.insert(correlation_id, tx);
120+
self.response_tracker.insert(correlation_id, tx);
122121
}
123122

124123
pub fn should_reuse(&self) -> bool {
@@ -147,8 +146,7 @@ impl ChannelWorker {
147146
while let Some(delivery) = consumer.next().await {
148147
let delivery = delivery.expect("Error in receiving message");
149148
if let Some(correlation_id) = delivery.properties.correlation_id().as_ref() {
150-
let mut tracker = response_tracker.write().await;
151-
if let Some(sender) = tracker.remove(correlation_id.as_str()) {
149+
if let Some((_, sender)) = response_tracker.remove(correlation_id.as_str()) {
152150
let _ = sender.send(delivery);
153151
}
154152
} else {

0 commit comments

Comments
 (0)