Skip to content

Commit 91d734f

Browse files
bug: Add controller readiness barrier (#307)
Introduce a `tokio::sync::Barrier` to the shared controller state. Each main controller waits on this barrier after completing its initial setup, including dynamic watch registration and initial reconciliation. This allows external components or health checks to synchronize and wait for all core controllers to be ready before proceeding. Signed-off-by: Danil-Grigorev <[email protected]>
1 parent bd12a70 commit 91d734f

File tree

3 files changed

+18
-12
lines changed

3 files changed

+18
-12
lines changed

src/controller.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use kube::{
2929
},
3030
};
3131
use kube::{Resource, ResourceExt};
32+
use tokio::sync::Barrier;
3233

3334
use std::collections::BTreeMap;
3435

@@ -56,6 +57,9 @@ pub struct State {
5657

5758
// k8s api server minor version
5859
pub version: u32,
60+
61+
// Controller readiness barrier
62+
pub barrier: Arc<Barrier>,
5963
}
6064

6165
#[derive(Parser, Debug, Clone, Default)]
@@ -76,6 +80,7 @@ impl State {
7680
diagnostics: Default::default(),
7781
stream: BroadcastStream::new(Default::default()),
7882
version,
83+
barrier: Arc::new(Barrier::new(3)),
7984
}
8085
}
8186

@@ -98,6 +103,7 @@ impl State {
98103
dispatcher: self.dispatcher.clone(),
99104
stream: self.stream.clone(),
100105
version: self.version,
106+
barrier: self.barrier.clone(),
101107
})
102108
}
103109
}
@@ -181,6 +187,9 @@ pub async fn run_fleet_addon_config_controller(state: State) {
181187
.await
182188
.expect("Initial dynamic watches setup to succeed");
183189

190+
// Signal that this controller is ready
191+
state.barrier.wait().await;
192+
184193
tokio::select! {
185194
_ = watcher => {panic!("This should not happen before controllers exit")},
186195
_ = futures::future::join(dynamic_watches_controller, config_controller) => {}
@@ -324,6 +333,9 @@ pub async fn run_cluster_controller(state: State) {
324333
.default_backoff()
325334
.for_each(|_| futures::future::ready(()));
326335

336+
// Signal that this controller is ready
337+
state.barrier.wait().await;
338+
327339
tokio::join!(clusters, ns_controller);
328340
}
329341

@@ -374,6 +386,9 @@ pub async fn run_cluster_class_controller(state: State) {
374386
.default_backoff()
375387
.for_each(|_| futures::future::ready(()));
376388

389+
// Signal that this controller is ready
390+
state.barrier.wait().await;
391+
377392
tokio::join!(group_controller, cluster_class_controller);
378393
}
379394

src/controllers/controller.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use tracing::field::display;
2323
use std::fmt::Debug;
2424
use std::pin::Pin;
2525
use std::sync::Arc;
26-
use tokio::sync::RwLock;
26+
use tokio::sync::{Barrier, RwLock};
2727
use tracing::{self, debug, info, instrument, Span};
2828

2929
use super::{
@@ -51,6 +51,8 @@ pub struct Context {
5151
pub stream: BroadcastStream<DynamicStream>,
5252
// k8s minor version
5353
pub version: u32,
54+
// Controller readiness barrier
55+
pub barrier: Arc<Barrier>,
5456
}
5557

5658
#[instrument(skip_all, fields(name = res.name_any(), namespace = res.namespace(), api_version = typed_gvk::<R>(()).api_version(), kind = R::kind(&()).to_string()), err)]

src/multi_dispatcher.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::{
33
pin::Pin,
44
sync::Arc,
55
task::{Context, Poll},
6-
time::Duration,
76
};
87

98
use async_broadcast::{InactiveReceiver, Receiver, Sender};
@@ -19,7 +18,6 @@ use kube::{
1918
};
2019
use pin_project::pin_project;
2120
use serde::de::DeserializeOwned;
22-
use tokio::time::sleep;
2321

2422
#[derive(Clone)]
2523
pub struct MultiDispatcher {
@@ -72,11 +70,6 @@ impl MultiDispatcher {
7270
}
7371
}
7472
}
75-
76-
// Subscribers count returns the number of current receiving streams
77-
pub(crate) fn subscribers_count(&self) -> usize {
78-
self.dispatch_tx.receiver_count()
79-
}
8073
}
8174

8275
/// `BroadcastStream` allows to stream shared list of dynamic objects,
@@ -230,10 +223,6 @@ where
230223
W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
231224
{
232225
stream! {
233-
while writer.subscribers_count() == 0 {
234-
sleep(Duration::from_millis(100)).await;
235-
}
236-
237226
while let Some(event) = broadcast.next().await {
238227
match event {
239228
Ok(ev) => {

0 commit comments

Comments
 (0)