Skip to content

Commit 45b1d7e

Browse files
authored
Add bounds check for heartbeat interval (#1068)
* add bounds check for heartbeat * lint * Fix integ-tests * Add logs * Revert add logs, change assert to account for rounding to 0.0
1 parent 67c986a commit 45b1d7e

File tree

2 files changed

+32
-16
lines changed

2 files changed

+32
-16
lines changed

crates/sdk-core/src/lib.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,19 +241,35 @@ pub struct CoreRuntime {
241241
}
242242

243243
/// Holds telemetry options, as well as worker heartbeat_interval. Construct with [RuntimeOptionsBuilder]
244-
#[derive(derive_builder::Builder)]
244+
#[derive(Default, derive_builder::Builder)]
245+
#[builder(build_fn(validate = "Self::validate"))]
245246
#[non_exhaustive]
246-
#[derive(Default)]
247247
pub struct RuntimeOptions {
248248
/// Telemetry configuration options.
249249
#[builder(default)]
250250
telemetry_options: TelemetryOptions,
251251
/// Optional worker heartbeat interval - This configures the heartbeat setting of all
252252
/// workers created using this runtime.
253+
///
254+
/// Interval must be between 1s and 60s, inclusive.
253255
#[builder(default = "Some(Duration::from_secs(60))")]
254256
heartbeat_interval: Option<Duration>,
255257
}
256258

259+
impl RuntimeOptionsBuilder {
260+
fn validate(&self) -> Result<(), String> {
261+
if let Some(Some(interval)) = self.heartbeat_interval
262+
&& (interval < Duration::from_secs(1) || interval > Duration::from_secs(60))
263+
{
264+
return Err(format!(
265+
"heartbeat_interval ({interval:?}) must be between 1s and 60s",
266+
));
267+
}
268+
269+
Ok(())
270+
}
271+
}
272+
257273
/// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions
258274
pub struct TokioRuntimeBuilder<F> {
259275
/// The underlying tokio runtime builder

crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ fn within_duration(dur: PbDuration, threshold: Duration) -> bool {
5656
fn new_no_metrics_starter(wf_name: &str) -> CoreWfStarter {
5757
let runtimeopts = RuntimeOptionsBuilder::default()
5858
.telemetry_options(TelemetryOptionsBuilder::default().build().unwrap())
59-
.heartbeat_interval(Some(Duration::from_millis(100)))
59+
.heartbeat_interval(Some(Duration::from_secs(1)))
6060
.build()
6161
.unwrap();
6262
CoreWfStarter::new_with_runtime(wf_name, CoreRuntime::new_assume_tokio(runtimeopts).unwrap())
@@ -105,7 +105,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b
105105
};
106106
let runtimeopts = RuntimeOptionsBuilder::default()
107107
.telemetry_options(telemopts)
108-
.heartbeat_interval(Some(Duration::from_millis(100)))
108+
.heartbeat_interval(Some(Duration::from_secs(1)))
109109
.build()
110110
.unwrap();
111111
let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
@@ -153,7 +153,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b
153153
ctx.activity(ActivityOptions {
154154
activity_type: "pass_fail_act".to_string(),
155155
input: "pass".as_json_payload().expect("serializes fine"),
156-
start_to_close_timeout: Some(Duration::from_secs(1)),
156+
start_to_close_timeout: Some(Duration::from_secs(5)),
157157
..Default::default()
158158
})
159159
.await;
@@ -184,7 +184,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b
184184

185185
let test_fut = async {
186186
// Give enough time to ensure heartbeat interval has been hit
187-
tokio::time::sleep(Duration::from_millis(110)).await;
187+
tokio::time::sleep(Duration::from_millis(1500)).await;
188188
acts_started.notified().await;
189189
let client = starter.get_client().await;
190190
let mut raw_client = (*client).clone();
@@ -267,7 +267,7 @@ async fn docker_worker_heartbeat_tuner() {
267267
}
268268
let runtimeopts = RuntimeOptionsBuilder::default()
269269
.telemetry_options(get_integ_telem_options())
270-
.heartbeat_interval(Some(Duration::from_millis(100)))
270+
.heartbeat_interval(Some(Duration::from_secs(1)))
271271
.build()
272272
.unwrap();
273273
let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
@@ -454,8 +454,8 @@ fn after_shutdown_checks(
454454
assert!(!host_info.host_name.is_empty());
455455
assert!(!host_info.process_key.is_empty());
456456
assert!(!host_info.process_id.is_empty());
457-
assert_ne!(host_info.current_host_cpu_usage, 0.0);
458-
assert_ne!(host_info.current_host_mem_usage, 0.0);
457+
assert!(host_info.current_host_cpu_usage >= 0.0);
458+
assert!(host_info.current_host_mem_usage >= 0.0);
459459

460460
assert!(heartbeat.task_queue.starts_with(wf_name));
461461
assert_eq!(
@@ -479,7 +479,7 @@ fn after_shutdown_checks(
479479
);
480480
assert!(within_duration(
481481
heartbeat.elapsed_since_last_heartbeat.unwrap(),
482-
Duration::from_millis(200)
482+
Duration::from_millis(2000)
483483
));
484484

485485
let workflow_task_slots = heartbeat.workflow_task_slots_info.clone().unwrap();
@@ -647,7 +647,7 @@ async fn worker_heartbeat_sticky_cache_miss() {
647647

648648
tokio::join!(orchestrator, runner);
649649

650-
sleep(Duration::from_millis(200)).await;
650+
sleep(Duration::from_secs(2)).await;
651651
let mut heartbeats =
652652
list_worker_heartbeats(&client, format!("WorkerInstanceKey=\"{worker_key}\"")).await;
653653
assert_eq!(heartbeats.len(), 1);
@@ -694,7 +694,7 @@ async fn worker_heartbeat_multiple_workers() {
694694
let _ = starter_b.start_with_worker(wf_name, &mut worker_b).await;
695695
worker_b.run_until_done().await.unwrap();
696696

697-
sleep(Duration::from_millis(200)).await;
697+
sleep(Duration::from_secs(2)).await;
698698

699699
let all = list_worker_heartbeats(&client, String::new()).await;
700700
let keys: HashSet<_> = all
@@ -776,7 +776,7 @@ async fn worker_heartbeat_failure_metrics() {
776776
.activity(ActivityOptions {
777777
activity_type: "failing_act".to_string(),
778778
input: "boom".as_json_payload().expect("serialize"),
779-
start_to_close_timeout: Some(Duration::from_secs(1)),
779+
start_to_close_timeout: Some(Duration::from_secs(5)),
780780
retry_policy: Some(RetryPolicy {
781781
initial_interval: Some(prost_dur!(from_millis(10))),
782782
backoff_coefficient: 1.0,
@@ -859,7 +859,7 @@ async fn worker_heartbeat_failure_metrics() {
859859
}
860860
Err("activity_slots.last_interval_failure_tasks still 0, retrying")
861861
},
862-
Duration::from_millis(150),
862+
Duration::from_millis(1500),
863863
)
864864
.await
865865
.unwrap();
@@ -901,7 +901,7 @@ async fn worker_heartbeat_failure_metrics() {
901901
}
902902
Err("workflow_slots.last_interval_failure_tasks still 0, retrying")
903903
},
904-
Duration::from_millis(150),
904+
Duration::from_millis(1500),
905905
)
906906
.await
907907
.unwrap();
@@ -1000,7 +1000,7 @@ async fn worker_heartbeat_skip_client_worker_set_check() {
10001000
let wf_name = "worker_heartbeat_skip_client_worker_set_check";
10011001
let runtimeopts = RuntimeOptionsBuilder::default()
10021002
.telemetry_options(get_integ_telem_options())
1003-
.heartbeat_interval(Some(Duration::from_millis(100)))
1003+
.heartbeat_interval(Some(Duration::from_secs(1)))
10041004
.build()
10051005
.unwrap();
10061006
let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();

0 commit comments

Comments
 (0)