Skip to content

Commit 974720b

Browse files
OtaKJarema
authored andcommitted
Add double_ack_with method to JetStream Messages
This will allow consumers of the library to obtain exactly-once delivery semantics across a wider range of valid usecases that include NAKing entire ranges of Messages.
1 parent e92dc53 commit 974720b

File tree

2 files changed

+90
-17
lines changed

2 files changed

+90
-17
lines changed

async-nats/src/jetstream/message.rs

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ impl Message {
338338
}
339339
}
340340

341-
/// Acknowledges a message delivery by sending `+ACK` to the server
341+
/// Acknowledges a message delivery by sending a chosen [AckKind] to the server
342342
/// and awaits for confirmation for the server that it received the message.
343343
/// Useful if user wants to ensure `exactly once` semantics.
344344
///
@@ -350,6 +350,7 @@ impl Message {
350350
/// ```no_run
351351
/// # #[tokio::main]
352352
/// # async fn main() -> Result<(), async_nats::Error> {
353+
/// use async_nats::jetstream::AckKind;
353354
/// use futures_util::StreamExt;
354355
/// let client = async_nats::connect("localhost:4222").await?;
355356
/// let jetstream = async_nats::jetstream::new(client);
@@ -363,18 +364,18 @@ impl Message {
363364
/// let mut messages = consumer.fetch().max_messages(100).messages().await?;
364365
///
365366
/// while let Some(message) = messages.next().await {
366-
/// message?.double_ack().await?;
367+
/// message?.double_ack_with(AckKind::Ack).await?;
367368
/// }
368369
/// # Ok(())
369370
/// # }
370371
/// ```
371-
pub async fn double_ack(&self) -> Result<(), Error> {
372+
pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
372373
if let Some(ref reply) = self.reply {
373374
let inbox = self.context.client.new_inbox();
374375
let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
375376
self.context
376377
.client
377-
.publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
378+
.publish_with_reply(reply.clone(), inbox, ack_kind.into())
378379
.await?;
379380
match tokio::time::timeout(self.context.timeout, subscription.next())
380381
.await
@@ -394,6 +395,40 @@ impl Message {
394395
}
395396
}
396397

398+
/// Acknowledges a message delivery by sending `+ACK` to the server
399+
/// and awaits for confirmation for the server that it received the message.
400+
/// Useful if user wants to ensure `exactly once` semantics.
401+
///
402+
/// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
403+
/// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
404+
///
405+
/// # Examples
406+
///
407+
/// ```no_run
408+
/// # #[tokio::main]
409+
/// # async fn main() -> Result<(), async_nats::Error> {
410+
/// use futures_util::StreamExt;
411+
/// let client = async_nats::connect("localhost:4222").await?;
412+
/// let jetstream = async_nats::jetstream::new(client);
413+
///
414+
/// let consumer = jetstream
415+
/// .get_stream("events")
416+
/// .await?
417+
/// .get_consumer("pull")
418+
/// .await?;
419+
///
420+
/// let mut messages = consumer.fetch().max_messages(100).messages().await?;
421+
///
422+
/// while let Some(message) = messages.next().await {
423+
/// message?.double_ack().await?;
424+
/// }
425+
/// # Ok(())
426+
/// # }
427+
/// ```
428+
pub async fn double_ack(&self) -> Result<(), Error> {
429+
self.double_ack_with(AckKind::Ack).await
430+
}
431+
397432
/// Returns the `JetStream` message ID
398433
/// if this is a `JetStream` message.
399434
#[allow(clippy::mixed_read_write_in_expression)]
@@ -535,8 +570,7 @@ impl Acker {
535570
/// ```no_run
536571
/// # #[tokio::main]
537572
/// # async fn main() -> Result<(), async_nats::Error> {
538-
/// use async_nats::jetstream::consumer::PullConsumer;
539-
/// use async_nats::jetstream::Message;
573+
/// use async_nats::jetstream::{consumer::PullConsumer, Message};
540574
/// use futures_util::StreamExt;
541575
/// let client = async_nats::connect("localhost:4222").await?;
542576
/// let jetstream = async_nats::jetstream::new(client);
@@ -581,9 +615,7 @@ impl Acker {
581615
/// ```no_run
582616
/// # #[tokio::main]
583617
/// # async fn main() -> Result<(), async_nats::Error> {
584-
/// use async_nats::jetstream::consumer::PullConsumer;
585-
/// use async_nats::jetstream::AckKind;
586-
/// use async_nats::jetstream::Message;
618+
/// use async_nats::jetstream::{consumer::PullConsumer, AckKind, Message};
587619
/// use futures_util::StreamExt;
588620
/// let client = async_nats::connect("localhost:4222").await?;
589621
/// let jetstream = async_nats::jetstream::new(client);
@@ -621,7 +653,7 @@ impl Acker {
621653
}
622654
}
623655

624-
/// Acknowledges a message delivery by sending `+ACK` to the server
656+
/// Acknowledges a message delivery by sending the chosen [AckKind] to the server
625657
/// and awaits for confirmation for the server that it received the message.
626658
/// Useful if user wants to ensure `exactly once` semantics.
627659
///
@@ -633,7 +665,7 @@ impl Acker {
633665
/// ```no_run
634666
/// # #[tokio::main]
635667
/// # async fn main() -> Result<(), async_nats::Error> {
636-
/// use async_nats::jetstream::Message;
668+
/// use async_nats::jetstream::{AckKind, Message};
637669
/// use futures_util::StreamExt;
638670
/// let client = async_nats::connect("localhost:4222").await?;
639671
/// let jetstream = async_nats::jetstream::new(client);
@@ -652,18 +684,18 @@ impl Acker {
652684
/// // while retaining ability to ack later.
653685
/// println!("message: {:?}", message);
654686
/// // Ack it. `Message` may be dropped already.
655-
/// acker.double_ack().await?;
687+
/// acker.double_ack_with(AckKind::Ack).await?;
656688
/// }
657689
/// # Ok(())
658690
/// # }
659691
/// ```
660-
pub async fn double_ack(&self) -> Result<(), Error> {
692+
pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
661693
if let Some(ref reply) = self.reply {
662694
let inbox = self.context.client.new_inbox();
663695
let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
664696
self.context
665697
.client
666-
.publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
698+
.publish_with_reply(reply.to_owned(), inbox, ack_kind.into())
667699
.await?;
668700
match tokio::time::timeout(self.context.timeout, subscription.next())
669701
.await
@@ -682,6 +714,46 @@ impl Acker {
682714
)))
683715
}
684716
}
717+
718+
/// Acknowledges a message delivery by sending `+ACK` to the server
719+
/// and awaits for confirmation for the server that it received the message.
720+
/// Useful if user wants to ensure `exactly once` semantics.
721+
///
722+
/// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
723+
/// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
724+
///
725+
/// # Examples
726+
///
727+
/// ```no_run
728+
/// # #[tokio::main]
729+
/// # async fn main() -> Result<(), async_nats::Error> {
730+
/// use async_nats::jetstream::Message;
731+
/// use futures_util::StreamExt;
732+
/// let client = async_nats::connect("localhost:4222").await?;
733+
/// let jetstream = async_nats::jetstream::new(client);
734+
///
735+
/// let consumer = jetstream
736+
/// .get_stream("events")
737+
/// .await?
738+
/// .get_consumer("pull")
739+
/// .await?;
740+
///
741+
/// let mut messages = consumer.fetch().max_messages(100).messages().await?;
742+
///
743+
/// while let Some(message) = messages.next().await {
744+
/// let (message, acker) = message.map(Message::split)?;
745+
/// // Do something with the message. Ownership can be taken over `Message`.
746+
/// // while retaining ability to ack later.
747+
/// println!("message: {:?}", message);
748+
/// // Ack it. `Message` may be dropped already.
749+
/// acker.double_ack().await?;
750+
/// }
751+
/// # Ok(())
752+
/// # }
753+
/// ```
754+
pub async fn double_ack(&self) -> Result<(), Error> {
755+
self.double_ack_with(AckKind::Ack).await
756+
}
685757
}
686758
/// The kinds of response used for acknowledging a processed message.
687759
#[derive(Debug, Clone, Copy)]

async-nats/tests/jetstream_tests.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2457,9 +2457,9 @@ mod jetstream {
24572457
.unwrap();
24582458
let mut consumer = stream.get_consumer("pull").await.unwrap();
24592459

2460-
for _ in 0..10 {
2460+
for i in 0..10 {
24612461
context
2462-
.publish("events", "dat".into())
2462+
.publish("events", format!("dat-{i}").into())
24632463
.await
24642464
.unwrap()
24652465
.await
@@ -2523,10 +2523,11 @@ mod jetstream {
25232523
if let Some(message) = iter.next().await {
25242524
message
25252525
.unwrap()
2526-
.ack_with(async_nats::jetstream::AckKind::Nak(None))
2526+
.double_ack_with(async_nats::jetstream::AckKind::Nak(None))
25272527
.await
25282528
.unwrap();
25292529
}
2530+
25302531
client.flush().await.unwrap();
25312532

25322533
tokio::time::sleep(Duration::from_millis(100)).await;

0 commit comments

Comments
 (0)