Skip to content

Commit 02fd086

Browse files
committed
Fix service Info to include endpoints info
Signed-off-by: Tomasz Pietrek <[email protected]>
1 parent 740b415 commit 02fd086

File tree

3 files changed

+70
-9
lines changed

3 files changed

+70
-9
lines changed

async-nats/src/service/endpoint.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,15 @@ pub struct Stats {
183183
/// Queue group to which this endpoint is assigned to.
184184
pub queue_group: String,
185185
}
186+
187+
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
188+
pub struct Info {
189+
/// Name of the endpoint.
190+
pub name: String,
191+
/// Endpoint subject.
192+
pub subject: String,
193+
/// Queue group to which this endpoint is assigned.
194+
pub queue_group: String,
195+
/// Endpoint-specific metadata.
196+
pub metadata: HashMap<String, String>,
197+
}

async-nats/src/service/mod.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ pub struct Info {
9393
pub description: Option<String>,
9494
/// Service version.
9595
pub version: String,
96-
/// All service endpoints.
97-
pub subjects: Vec<String>,
9896
/// Additional metadata
9997
pub metadata: HashMap<String, String>,
98+
/// Info about all service endpoints.
99+
pub endpoints: Vec<endpoint::Info>,
100100
}
101101

102102
/// Configuration of the [Service].
@@ -322,6 +322,10 @@ impl Service {
322322
"service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
323323
)));
324324
}
325+
let endpoints_state = Arc::new(Mutex::new(Endpoints {
326+
endpoints: HashMap::new(),
327+
}));
328+
325329
let queue_group = config
326330
.queue_group
327331
.unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
@@ -334,15 +338,12 @@ impl Service {
334338
id: id.clone(),
335339
description: config.description.clone(),
336340
version: config.version.clone(),
337-
subjects: Vec::default(),
338341
metadata: config.metadata.clone().unwrap_or_default(),
342+
endpoints: Vec::new(),
339343
};
340344

341345
let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
342346

343-
let endpoints = HashMap::new();
344-
let endpoints_state = Arc::new(Mutex::new(Endpoints { endpoints }));
345-
346347
// create subscriptions for all verbs.
347348
let mut pings =
348349
verb_subscription(client.clone(), Verb::Ping, config.name.clone(), id.clone()).await?;
@@ -355,7 +356,6 @@ impl Service {
355356
let handle = tokio::task::spawn({
356357
let mut stats_callback = config.stats_handler;
357358
let info = info.clone();
358-
let subjects = subjects.clone();
359359
let endpoints_state = endpoints_state.clone();
360360
let client = client.clone();
361361
async move {
@@ -371,10 +371,20 @@ impl Service {
371371
client.publish(ping.reply.unwrap(), pong.into()).await?;
372372
},
373373
Some(info_request) = infos.next() => {
374-
let subjects = subjects.clone();
375374
let info = info.clone();
375+
376+
let endpoints: Vec<endpoint::Info> = {
377+
endpoints_state.lock().unwrap().endpoints.values().map(|value| {
378+
endpoint::Info {
379+
name: value.name.to_owned(),
380+
subject: value.subject.to_owned(),
381+
queue_group: value.queue_group.to_owned(),
382+
metadata: value.metadata.to_owned()
383+
}
384+
}).collect()
385+
};
376386
let info = Info {
377-
subjects: subjects.lock().unwrap().to_vec(),
387+
endpoints,
378388
..info
379389
};
380390
let info_json = serde_json::to_vec(&info).map(Bytes::from)?;

async-nats/tests/service_tests.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,45 @@ mod service {
469469
assert_eq!(responses.take(2).count().await, 2);
470470
}
471471

472+
#[tokio::test]
473+
async fn info() {
474+
let server = nats_server::run_basic_server();
475+
let client = async_nats::connect(server.client_url()).await.unwrap();
476+
477+
let service = client
478+
.service_builder()
479+
.start("service", "1.0.0")
480+
.await
481+
.unwrap();
482+
483+
let endpoint_info = service::endpoint::Info {
484+
name: "endpoint_1".to_string(),
485+
subject: "subject".to_string(),
486+
queue_group: "queue".to_string(),
487+
metadata: HashMap::from([("key".to_string(), "value".to_string())]),
488+
};
489+
490+
service
491+
.endpoint_builder()
492+
.name(&endpoint_info.name)
493+
.metadata(endpoint_info.metadata.clone())
494+
.queue_group(&endpoint_info.queue_group)
495+
.add(&endpoint_info.subject)
496+
.await
497+
.unwrap();
498+
499+
let info: service::Info = serde_json::from_slice(
500+
&client
501+
.request("$SRV.INFO".into(), "".into())
502+
.await
503+
.unwrap()
504+
.payload,
505+
)
506+
.unwrap();
507+
508+
assert_eq!(&endpoint_info, info.endpoints.first().unwrap());
509+
}
510+
472511
#[tokio::test]
473512
#[cfg(not(target_os = "windows"))]
474513
async fn cross_clients_tests() {

0 commit comments

Comments
 (0)