Skip to content

Commit 319b1cd

Browse files
committed
0.2.8: background asynchronous batches
1 parent 0b3a84d commit 319b1cd

File tree

7 files changed

+132
-52
lines changed

7 files changed

+132
-52
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,23 @@ cargo add batched
3030
Or add this to your `Cargo.toml`:
3131
```toml
3232
[dependencies]
33-
batched = "0.2.7"
33+
batched = "0.2.8"
3434
```
3535

36+
### Nightly Rust
37+
Due to the use of advanced features, `batched` requires a nightly Rust compiler.
38+
39+
3640
## #[batched]
37-
- **limit**: Maximum amount of items that can be grouped and processed in a single batch.
41+
- **limit**: Maximum amount of items that can be grouped and processed in a single batch. (required)
3842
- **concurrent**: Maximum amount of concurrent batched tasks running (default: `Infinity`)
39-
- **window**: Maximum amount of time (in milliseconds) the background thread waits after the first call before processing a batch.
40-
- **window[x]**: Maximum amount of time (in milliseconds) the background thread waits after the first call before processing a batch, when the buffer size is <= x
43+
- **asynchronous**: If true, the caller does not wait for the batch to complete, and the return value is `()`. (default: `false`).
44+
- **window**: Maximum amount of time (in milliseconds) the background thread waits after the first call before processing a batch. (required)
45+
- **window[x]**: Maximum amount of time (in milliseconds) the background thread waits after the first call before processing a batch, when the buffer size is <= x. (This allows for more granular control of the batching window based on the current load. For example, you might want to use a shorter window when there are fewer items in the buffer to reduce latency, and a longer window when there are more items to maximize batching efficiency.)
46+
47+
4148

42-
The target function must have a single argument, a vector of items (`Vec<T>`).
49+
The target function must have a single input argument, a vector of items (`Vec<T>`).
4350

4451
The return value of the batched function is propagated (cloned) to all async calls of the batch, unless the batched function returns a `Vec<T>`, in which case the return value for each call is pulled from the iterator in the same order of the input.
4552

@@ -48,7 +55,7 @@ If the return value is not an iterator, The target function return type must imp
4855

4956
## Prerequisites
5057
- Built for async environments (tokio), will not work without a tokio async runtime
51-
- Target function must have async
58+
- The target function must be an async function
5259
- Not supported inside structs:
5360
```rust
5461
struct A;
@@ -65,7 +72,7 @@ impl A {
6572
### [`tracing_span`]
6673
This feature automatically adds tracing spans to call functions for batched requests (`x`, `x_multiple`).
6774

68-
## [`tracing_opentelemetry`]
75+
### [`tracing_opentelemetry`]
6976
This feature adds support for linking spans from callers to the inner batched call when using OpenTelemetry. Depending on whether your OpenTelemetry client supports it, you should be able to see the linked span to the batched call.
7077

7178
## Examples

batched/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "batched"
33
description = "rust macro util for batching expensive operations"
4-
version = "0.2.7"
4+
version = "0.2.8"
55
edition = "2024"
66
license = "MIT"
77
readme = "../README.md"
@@ -15,7 +15,7 @@ tracing_opentelemetry = ["opentelemetry", "tracing-opentelemetry"]
1515

1616
[dependencies]
1717
anyhow = "1.0.98"
18-
batched_derive = { version = "0.2.7", path = "../batched_derive" }
18+
batched_derive = { version = "0.2.8", path = "../batched_derive" }
1919
opentelemetry = { version = "0.30.0", optional = true }
2020
tracing = "0.1.41"
2121
tracing-opentelemetry = { version = "0.31.0", optional = true }

batched_derive/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "batched_derive"
33
description = "rust macro util for batching expensive operations"
4-
version = "0.2.7"
4+
version = "0.2.8"
55
edition = "2024"
66
license = "MIT"
77
readme = "../README.md"

batched_derive/src/builder.rs

Lines changed: 81 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,22 @@ fn build_identifiers(call_function: &Function) -> Identifiers {
3333
}
3434
}
3535

36-
pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
37-
let identifiers = build_identifiers(&call_function);
36+
pub fn build_code(function: Function, options: Attributes) -> TokenStream {
37+
let identifiers = build_identifiers(&function);
38+
let executor = build_executor(&identifiers, &function, &options);
39+
let public_interface = build_public_interface(&identifiers, &function, &options);
3840

41+
quote! {
42+
#executor
43+
#public_interface
44+
}
45+
}
46+
47+
fn build_public_interface(
48+
identifiers: &Identifiers,
49+
call_function: &Function,
50+
options: &Attributes,
51+
) -> TokenStream {
3952
let macros = &call_function.macros;
4053
let visibility = &call_function.visibility;
4154
let arg = &call_function.batched_arg;
@@ -45,8 +58,9 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
4558
let returned = &call_function.returned.tokens;
4659

4760
let (is_result, is_vec) = function_flags(&call_function);
61+
let asynchronous = options.asynchronous;
4862

49-
let public_interface_returned_type = match &call_function.returned.result_type {
63+
let return_type = match &call_function.returned.result_type {
5064
FunctionResultType::Raw(token) => token.clone(),
5165
FunctionResultType::VectorRaw(token) => token.clone(),
5266
FunctionResultType::Result(output, error, _) => {
@@ -57,7 +71,7 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
5771
}
5872
}
5973
};
60-
let public_interface_return_result = if is_result {
74+
let return_result = if is_result {
6175
if is_vec {
6276
quote! {
6377
let mut result = result?;
@@ -81,7 +95,7 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
8195
}
8296
};
8397

84-
let public_interface_multiple_returned_type = match &call_function.returned.result_type {
98+
let return_type_multiple = match &call_function.returned.result_type {
8599
FunctionResultType::Raw(token) => token.clone(),
86100
FunctionResultType::VectorRaw(token) => quote! { Vec<#token> },
87101
FunctionResultType::Result(output, error, _) => {
@@ -92,7 +106,7 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
92106
}
93107
}
94108
};
95-
let public_interface_multiple_return_result = if is_result {
109+
let return_result_multiple = if is_result {
96110
quote! {
97111
let result = result?;
98112
Ok(result)
@@ -109,58 +123,82 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
109123
}),
110124
};
111125

112-
let executor = build_executor(&identifiers, &call_function, &options);
113-
let executor_producer_channel = identifiers.executor_producer_channel;
114-
let executor_background_fn = identifiers.executor_background_fn;
115-
let inner_batched = identifiers.inner_batched;
116-
let public_interface = identifiers.public_interface;
117-
let public_interface_multiple = identifiers.public_interface_multiple;
126+
let executor_producer_channel = &identifiers.executor_producer_channel;
127+
let executor_background_fn = &identifiers.executor_background_fn;
128+
let inner_batched = &identifiers.inner_batched;
129+
let public_interface = &identifiers.public_interface;
130+
let public_interface_multiple = &identifiers.public_interface_multiple;
118131

119132
#[cfg(feature = "tracing_span")]
120133
let tracing_span = quote! { #[tracing::instrument(skip_all)] };
121134
#[cfg(not(feature = "tracing_span"))]
122135
let tracing_span = quote! {};
123136

124-
quote! {
125-
#executor
126-
137+
let inner_batched = quote! {
127138
#(#macros)*
128139
async fn #inner_batched(#arg) -> #returned {
129140
let result = async { #inner_body };
130141
let result = result.await;
131142
#cast_result_error
132143
result
133144
}
145+
};
134146

135-
#tracing_span
136-
#visibility async fn #public_interface(#arg_name: #arg_type) -> #public_interface_returned_type {
137-
let mut result = #public_interface_multiple(vec![#arg_name]).await;
138-
#public_interface_return_result
147+
if asynchronous {
148+
quote! {
149+
#inner_batched
150+
151+
#tracing_span
152+
#visibility async fn #public_interface(#arg_name: #arg_type) {
153+
#public_interface_multiple(vec![#arg_name]).await;
154+
}
155+
156+
#tracing_span
157+
#visibility async fn #public_interface_multiple(#arg_name: Vec<#arg_type>) {
158+
let channel = &#executor_producer_channel;
159+
let channel = channel.get_or_init(async || { #executor_background_fn().await }).await;
160+
161+
let span = ::batched::tracing::Span::current();
162+
channel.send((#arg_name, span, None)).await
163+
.expect("batched function panicked (send)");
164+
}
139165
}
166+
} else {
167+
quote! {
168+
#inner_batched
140169

141-
#tracing_span
142-
#visibility async fn #public_interface_multiple(#arg_name: Vec<#arg_type>) -> #public_interface_multiple_returned_type {
143-
let channel = &#executor_producer_channel;
144-
let channel = channel.get_or_init(async || { #executor_background_fn().await }).await;
170+
#tracing_span
171+
#visibility async fn #public_interface(#arg_name: #arg_type) -> #return_type {
172+
let mut result = #public_interface_multiple(vec![#arg_name]).await;
173+
#return_result
174+
}
175+
176+
#tracing_span
177+
#visibility async fn #public_interface_multiple(#arg_name: Vec<#arg_type>) -> #return_type_multiple {
178+
let channel = &#executor_producer_channel;
179+
let channel = channel.get_or_init(async || { #executor_background_fn().await }).await;
145180

146-
let (response_channel_sender, mut response_channel_recv) = ::tokio::sync::mpsc::channel(1);
147-
let span = ::batched::tracing::Span::current();
148-
channel.send((#arg_name, span, response_channel_sender)).await
149-
.expect("batched function panicked (send)");
181+
let (response_channel_sender, mut response_channel_recv) = ::tokio::sync::mpsc::channel(1);
182+
let span = ::batched::tracing::Span::current();
183+
channel.send((#arg_name, span, Some(response_channel_sender))).await
184+
.expect("batched function panicked (send)");
150185

151-
let result = response_channel_recv.recv().await
152-
.expect("batched function panicked (recv)");
153-
#public_interface_multiple_return_result
186+
let result = response_channel_recv.recv().await
187+
.expect("batched function panicked (recv)");
188+
#return_result_multiple
189+
}
154190
}
155191
}
192+
156193
}
157194

158-
const SEMAPHORE_MAX_PERMITS: usize = 2305843009213693951;
159195
fn build_executor(
160196
identifiers: &Identifiers,
161197
call_function: &Function,
162198
options: &Attributes,
163199
) -> TokenStream {
200+
const SEMAPHORE_MAX_PERMITS: usize = 2305843009213693951;
201+
164202
let capacity = options.limit;
165203
let concurrent_limit = options.concurrent_limit.unwrap_or(SEMAPHORE_MAX_PERMITS);
166204
let default_window = options.default_window;
@@ -208,7 +246,13 @@ fn build_executor(
208246
let result = result.clone();
209247
}
210248
};
211-
let channel_type = quote! { (Vec<#arg_type>, ::batched::tracing::Span, ::tokio::sync::mpsc::Sender<#returned_type_plural>) };
249+
let handle_result = if options.asynchronous {
250+
quote! {}
251+
} else {
252+
handle_result
253+
};
254+
255+
let channel_type = quote! { (Vec<#arg_type>, ::batched::tracing::Span, Option<::tokio::sync::mpsc::Sender<#returned_type_plural>>) };
212256

213257
let inner_batched = &identifiers.inner_batched;
214258
let batched_span_name = inner_batched.to_string();
@@ -230,8 +274,8 @@ fn build_executor(
230274
let semaphore = ::std::sync::Arc::new(::tokio::sync::Semaphore::new(#concurrent_limit));
231275

232276
loop {
233-
let mut data_buffer = Vec::with_capacity(capacity);
234-
let mut return_channels: Vec<(::tokio::sync::mpsc::Sender<#returned_type_plural>, usize)> = vec![];
277+
let mut data_buffer = Vec::new();
278+
let mut return_channels: Vec<(Option<::tokio::sync::mpsc::Sender<#returned_type_plural>>, usize)> = vec![];
235279
let mut waiting_spans: Vec<::batched::tracing::Span> = vec![];
236280

237281
let window_start = ::std::time::Instant::now();
@@ -294,7 +338,9 @@ fn build_executor(
294338
let mut result = ::batched::tracing::Instrument::instrument(future, batched_span).await;
295339
for (channel, count) in channels {
296340
#handle_result
297-
let _ = channel.try_send(result);
341+
if let Some(channel) = channel {
342+
let _ = channel.try_send(result);
343+
}
298344
}
299345
});
300346
}

batched_derive/src/parse.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub struct FunctionResult {
3131
pub enum FunctionResultType {
3232
Raw(TokenStream),
3333
VectorRaw(TokenStream),
34-
Result(Box<FunctionResult>, TokenStream, Option<TokenStream>),
34+
Result(Box<FunctionResult>, TokenStream, Option<TokenStream>)
3535
}
3636

3737
fn inner_shared_error(_type: &Type) -> Option<TokenStream> {
@@ -115,7 +115,7 @@ impl Function {
115115
}
116116
let returned = match function.sig.output {
117117
ReturnType::Default => FunctionResult {
118-
tokens: function.sig.output.into_token_stream(),
118+
tokens: syn::parse_str("()").unwrap(),
119119
result_type: FunctionResultType::Raw(syn::parse_str("()").unwrap()),
120120
},
121121
ReturnType::Type(_, _type) => parsed_returned(&_type),
@@ -181,6 +181,7 @@ impl Function {
181181
pub struct Attributes {
182182
pub limit: usize,
183183
pub concurrent_limit: Option<usize>,
184+
pub asynchronous: bool,
184185
pub default_window: u64,
185186
pub windows: BTreeMap<u64, u64>,
186187
}
@@ -189,12 +190,14 @@ impl Attributes {
189190
pub fn parse(tokens: TokenStream) -> Self {
190191
let mut limit: Option<usize> = None;
191192
let mut concurrent_limit: Option<usize> = None;
193+
let mut asynchronous = false;
192194
let mut default_window: Option<u64> = None;
193195
let mut windows = BTreeMap::new();
194196

195197
static WINDOW_ATTR: &str = "window";
196198
static LIMIT_ATTR: &str = "limit";
197199
static CONCURRENT_LIMIT_ATTR: &str = "concurrent";
200+
static ASYNCHRONOUS_ATTR: &str = "asynchronous";
198201

199202
let parser = Punctuated::<Meta, Token![,]>::parse_separated_nonempty;
200203
let attributes = parser.parse(tokens.into()).unwrap();
@@ -216,6 +219,8 @@ impl Attributes {
216219
};
217220

218221
concurrent_limit = expr_to_u64(value).map(|u| u as usize);
222+
} else if path.is_ident(ASYNCHRONOUS_ATTR) {
223+
asynchronous = true;
219224
} else if path.is_ident(WINDOW_ATTR) {
220225
let value = match attr {
221226
Meta::NameValue(attr) => &attr.value,
@@ -253,8 +258,9 @@ impl Attributes {
253258
Self {
254259
limit,
255260
concurrent_limit,
261+
asynchronous,
256262
default_window,
257263
windows,
258264
}
259265
}
260-
}
266+
}

0 commit comments

Comments (0)