Skip to content

Commit cd13b9e

Browse files
committed
0.2.7: tracing support, dynamic call window
1 parent e143b5e commit cd13b9e

File tree

10 files changed

+589
-49
lines changed

10 files changed

+589
-49
lines changed

Cargo.lock

Lines changed: 433 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ cargo add batched
99
Or add this to your `Cargo.toml`:
1010
```toml
1111
[dependencies]
12-
batched = "0.2.3"
12+
batched = "0.2.7"
1313
```
1414

1515
## #[batched]
16-
- **window**: Minimum amount of time (in milliseconds) the background thread waits before processing a batch.
1716
- **limit**: Maximum amount of items that can be grouped and processed in a single batch.
1817
- **concurrent**: Maximum amount of concurrent batched tasks running (default: `Infinity`)
18+
- **window**: Minimum amount of time (in milliseconds) the background thread waits before processing a batch.
19+
- **window[x]**: Minimum amount of time (in milliseconds) the background thread waits before processing a batch when latest buffer size is <= x
1920

2021
The target function must have a single argument, a vector of items (`Vec<T>`).
2122

@@ -39,6 +40,12 @@ impl A {
3940
}
4041
```
4142

43+
## Tracing
44+
### [`tracing_span`]
45+
This feature automatically adds tracing spans to call functions for batched requests (`x`, `x_multiple`).
46+
47+
## [`tracing_opentelemetry`]
48+
This feature adds support for linking spans from callers to the inner batched call when using OpenTelemetry. Depending on whether your OpenTelemetry client supports it, you should be able to see the linked span to the batched call.
4249

4350
## Examples
4451

batched/Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
[package]
22
name = "batched"
33
description = "rust macro util for batching expensive operations"
4-
version = "0.2.5"
4+
version = "0.2.7"
55
edition = "2024"
66
license = "MIT"
77
readme = "../README.md"
88
repository = "https://github.com/hackermondev/batched"
99
keywords = ["batch", "performance", "efficient"]
1010

11+
[features]
12+
default = []
13+
tracing_span = ["batched_derive/tracing_span"]
14+
tracing_opentelemetry = ["opentelemetry", "tracing-opentelemetry"]
1115

1216
[dependencies]
1317
anyhow = "1.0.98"
14-
batched_derive = { version = "0.2.5", path = "../batched_derive" }
18+
batched_derive = { version = "0.2.7", path = "../batched_derive" }
19+
opentelemetry = { version = "0.30.0", optional = true }
20+
tracing = "0.1.41"
21+
tracing-opentelemetry = { version = "0.31.0", optional = true }

batched/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#![feature(try_trait_v2)]
22
#![feature(negative_impls)]
3+
#![feature(specialization)]
34

45
pub mod error;
56
pub use batched_derive::batched;
7+
pub mod tracing;

batched/src/tracing.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
pub use tracing::*;
2+
3+
pub trait TracingSpan {
4+
fn link_span(&mut self, span: &Span);
5+
}
6+
7+
impl<N> TracingSpan for N {
8+
default fn link_span(&mut self, _span: &Span) {}
9+
}
10+
11+
#[cfg(feature = "tracing_opentelemetry")]
12+
impl<T: tracing_opentelemetry::OpenTelemetrySpanExt> TracingSpan for T {
13+
fn link_span(&mut self, span: &Span) {
14+
self.add_link(
15+
opentelemetry::trace::TraceContextExt::span(
16+
&tracing_opentelemetry::OpenTelemetrySpanExt::context(span),
17+
)
18+
.span_context()
19+
.clone(),
20+
);
21+
}
22+
}

batched_derive/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
[package]
22
name = "batched_derive"
33
description = "rust macro util for batching expensive operations"
4-
version = "0.2.5"
4+
version = "0.2.7"
55
edition = "2024"
66
license = "MIT"
77
readme = "../README.md"
88
repository = "https://github.com/hackermondev/batched"
99
keywords = ["batch", "performance", "efficient"]
1010

11+
[features]
12+
default = []
13+
tracing_span = []
14+
1115
[lib]
1216
proc-macro = true
1317

batched_derive/src/builder.rs

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ struct Identifiers {
1111
public_interface_multiple: Ident,
1212
inner_batched: Ident,
1313
executor_producer_channel: Ident,
14-
executor_background_fn: Ident
14+
executor_background_fn: Ident,
1515
}
1616

1717
fn build_identifiers(call_function: &Function) -> Identifiers {
1818
let id = &call_function.identifier;
1919

2020
let public_interface = format_ident!("{id}");
2121
let public_interface_multiple = format_ident!("{id}_multiple");
22-
let inner_batched = format_ident!("__{id}__");
22+
let inner_batched = format_ident!("{id}__batched");
2323

2424
let executor_producer_channel = format_ident!("BATCHED_{}", id.to_uppercase());
2525
let executor_background_fn = format_ident!("spawn_executor_{id}");
@@ -29,7 +29,7 @@ fn build_identifiers(call_function: &Function) -> Identifiers {
2929
public_interface_multiple,
3030
inner_batched,
3131
executor_producer_channel,
32-
executor_background_fn
32+
executor_background_fn,
3333
}
3434
}
3535

@@ -104,14 +104,9 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
104104
let cast_result_error = match &call_function.returned.result_type {
105105
FunctionResultType::Raw(_) => None,
106106
FunctionResultType::VectorRaw(_) => None,
107-
FunctionResultType::Result(_, _, inner_shared_error) => {
108-
match inner_shared_error {
109-
Some(inner_shared_error) => Some(quote! {
110-
let result = result.map_err(|e: #inner_shared_error| e.into());
111-
}),
112-
_ => None
113-
}
114-
}
107+
FunctionResultType::Result(_, _, inner_shared_error) => inner_shared_error.as_ref().map(|inner_shared_error| quote! {
108+
let result = result.map_err(|e: #inner_shared_error| e.into());
109+
}),
115110
};
116111

117112
let executor = build_executor(&identifiers, &call_function, &options);
@@ -121,6 +116,11 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
121116
let public_interface = identifiers.public_interface;
122117
let public_interface_multiple = identifiers.public_interface_multiple;
123118

119+
#[cfg(feature = "tracing_span")]
120+
let tracing_span = quote! { #[tracing::instrument(skip_all)] };
121+
#[cfg(not(feature = "tracing_span"))]
122+
let tracing_span = quote! {};
123+
124124
quote! {
125125
#executor
126126

@@ -132,17 +132,20 @@ pub fn build_code(call_function: Function, options: Attributes) -> TokenStream {
132132
result
133133
}
134134

135+
#tracing_span
135136
#visibility async fn #public_interface(#arg_name: #arg_type) -> #public_interface_returned_type {
136137
let mut result = #public_interface_multiple(vec![#arg_name]).await;
137138
#public_interface_return_result
138139
}
139140

141+
#tracing_span
140142
#visibility async fn #public_interface_multiple(#arg_name: Vec<#arg_type>) -> #public_interface_multiple_returned_type {
141143
let channel = &#executor_producer_channel;
142144
let channel = channel.get_or_init(async || { #executor_background_fn().await }).await;
143145

144146
let (response_channel_sender, mut response_channel_recv) = ::tokio::sync::mpsc::channel(1);
145-
channel.send((#arg_name, response_channel_sender)).await
147+
let span = ::batched::tracing::Span::current();
148+
channel.send((#arg_name, span, response_channel_sender)).await
146149
.expect("batched function panicked (send)");
147150

148151
let result = response_channel_recv.recv().await
@@ -159,8 +162,17 @@ fn build_executor(
159162
options: &Attributes,
160163
) -> TokenStream {
161164
let capacity = options.limit;
162-
let window = options.window;
163165
let concurrent_limit = options.concurrent_limit.unwrap_or(SEMAPHORE_MAX_PERMITS);
166+
let default_window = options.default_window;
167+
168+
let windows = options.windows.iter();
169+
let windows = windows.map(|(call_size, call_window)| {
170+
quote! { windows.insert(#call_size, #call_window); }
171+
});
172+
let windows = quote! {
173+
let mut windows = ::std::collections::BTreeMap::<u64, u64>::new();
174+
#(#windows)*
175+
};
164176

165177
let arg_type = &call_function.batched_arg_type;
166178
let returned_type_plural = match &call_function.returned.result_type {
@@ -196,19 +208,22 @@ fn build_executor(
196208
let result = result.clone();
197209
}
198210
};
199-
let channel_type =
200-
quote! { (Vec<#arg_type>, ::tokio::sync::mpsc::Sender<#returned_type_plural>) };
211+
let channel_type = quote! { (Vec<#arg_type>, ::batched::tracing::Span, ::tokio::sync::mpsc::Sender<#returned_type_plural>) };
201212

202213
let inner_batched = &identifiers.inner_batched;
214+
let batched_span_name = inner_batched.to_string();
203215
let executor_producer_channel = &identifiers.executor_producer_channel;
204216
let executor_background_fn = &identifiers.executor_background_fn;
217+
205218
quote! {
206219
static #executor_producer_channel:
207220
::tokio::sync::OnceCell<::tokio::sync::mpsc::Sender<#channel_type>> = ::tokio::sync::OnceCell::const_new();
208221

209222
async fn #executor_background_fn() -> ::tokio::sync::mpsc::Sender<#channel_type> {
210223
let capacity = #capacity;
211-
let window = ::tokio::time::Duration::from_millis(#window);
224+
let default_window = #default_window;
225+
#windows
226+
212227

213228
let (sender, mut receiver) = tokio::sync::mpsc::channel(capacity);
214229
tokio::task::spawn(async move {
@@ -217,31 +232,39 @@ fn build_executor(
217232
loop {
218233
let mut data_buffer = Vec::with_capacity(capacity);
219234
let mut return_channels: Vec<(::tokio::sync::mpsc::Sender<#returned_type_plural>, usize)> = vec![];
220-
let mut timer = tokio::time::interval(window);
235+
let mut waiting_spans: Vec<::batched::tracing::Span> = vec![];
236+
237+
let window_start = ::std::time::Instant::now();
221238

222239
loop {
240+
let window = windows.iter()
241+
.find(|(max_calls, _)| **max_calls >= data_buffer.len() as u64)
242+
.map(|(_, window)| window);
243+
let window = window.unwrap_or(&default_window);
244+
let window = ::std::time::Duration::from_millis(*window as u64);
245+
246+
let window_end = window_start + window;
247+
let remaining_duration = window_end.duration_since(std::time::Instant::now());
248+
223249
tokio::select! {
224250
event = receiver.recv() => {
225251
if event.is_none() {
226252
return;
227253
}
228254

229-
if data_buffer.is_empty() {
230-
timer.reset();
231-
}
232-
233255
let event: #channel_type = event.unwrap();
234-
let (mut data, channel) = event;
256+
let (mut data, span, channel) = event;
235257

236258
return_channels.push((channel, data.len()));
259+
waiting_spans.push(span);
237260
data_buffer.append(&mut data);
238261

239262
if data_buffer.len() >= capacity {
240263
break;
241264
}
242265
}
243266

244-
_ = timer.tick() => {
267+
_ = ::tokio::time::sleep(remaining_duration) => {
245268
break;
246269
}
247270
}
@@ -252,15 +275,23 @@ fn build_executor(
252275
}
253276

254277
let mut data = vec![];
278+
let mut spans = vec![];
255279
let mut channels = vec![];
256280

257281
std::mem::swap(&mut data, &mut data_buffer);
282+
std::mem::swap(&mut spans, &mut waiting_spans);
258283
std::mem::swap(&mut channels, &mut return_channels);
259284

260285
let permit = semaphore.clone().acquire_owned().await.unwrap();
261286
tokio::task::spawn(async move {
262287
let _permit = permit;
263-
let mut result = #inner_batched(data).await;
288+
let batched_span = ::batched::tracing::info_span!(#batched_span_name, count = data.len());
289+
for mut span in spans {
290+
::batched::tracing::TracingSpan::link_span(&mut span, &batched_span);
291+
}
292+
293+
let future = #inner_batched(data);
294+
let mut result = ::batched::tracing::Instrument::instrument(future, batched_span).await;
264295
for (channel, count) in channels {
265296
#handle_result
266297
let _ = channel.try_send(result);
@@ -282,7 +313,9 @@ fn function_flags(function: &Function) -> (bool, bool) {
282313
match &function.returned.result_type {
283314
FunctionResultType::Result(result, _, _) => {
284315
is_result = true;
285-
if let FunctionResultType::VectorRaw(_) = result.result_type { is_vec = true };
316+
if let FunctionResultType::VectorRaw(_) = result.result_type {
317+
is_vec = true
318+
};
286319
}
287320
FunctionResultType::VectorRaw(_) => is_vec = true,
288321
_ => {}

0 commit comments

Comments
 (0)