
Commit 02e6540

Add some exponential backoff to k8s watch restarts (#225)
We're still seeing task_proc/tron get stuck in pretty hot restart loops for expired resource versions - hopefully backing off a bit will help here, since one current theory is that hitting the apiserver so hard creates extra load and further exacerbates the issue. If this doesn't work, we'll likely want to switch to a pattern where a reconciliation thread/process periodically reconciles our state with k8s' on top of having the watch always restart from a resourceVersion of 0 (which skips the initial pod listing and starts the watch "now").
1 parent e7bb15c commit 02e6540
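
For context, here is a minimal sketch of the capped backoff pattern this change applies to the watch-restart loop. The constants mirror MAX_WATCH_BACKOFF_S and RETRY_BACKOFF_EXPONENT from the diff below; restart_watch() is a hypothetical stand-in for re-establishing the Kubernetes watch, and the explicit attempt increment is assumed here for illustration rather than copied from the commit.

import time

# constants mirror those added in the diff below - the values are arbitrary per the commit comments
MAX_WATCH_BACKOFF_S = 30
RETRY_BACKOFF_EXPONENT = 1.5


def run_watch_with_backoff(restart_watch) -> None:
    """Sketch only: sleep between watch restarts, capping the delay at MAX_WATCH_BACKOFF_S."""
    retry_attempt = 1
    while True:
        try:
            # hypothetical stand-in for streaming Pod events until the watch dies
            restart_watch()
            # reset once events are flowing again, as the real loop does per event
            retry_attempt = 1
        except Exception:
            # same capped delay the diff computes before restarting the watch
            backoff_time = min(retry_attempt * RETRY_BACKOFF_EXPONENT, MAX_WATCH_BACKOFF_S)
            time.sleep(backoff_time)
            retry_attempt += 1  # assumed increment between consecutive failures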

task_processing/plugins/kubernetes/kubernetes_pod_executor.py

Lines changed: 31 additions & 1 deletion
@@ -61,6 +61,12 @@
     "Succeeded",
     "Unknown",
 }
+# arbitrarily chosen - we may honestly want to consider failing loudly if we actually get to
+# this amount of backoff since something is likely very wrong (this would require ~10 retries)
+MAX_WATCH_BACKOFF_S = 30
+# arbitrarily chosen - 1.5 seemed like a good compromise between multiple retries and giving the
+# control plane some breathing room
+RETRY_BACKOFF_EXPONENT = 1.5
 
 
 class KubernetesPodExecutor(TaskExecutor):
@@ -162,6 +168,11 @@ def _initialize_existing_task(self, task_config: KubernetesTaskConfig) -> None:
     def _pod_event_watch_loop(
         self, kube_client: KubeClient, watch: kube_watch.Watch
     ) -> None:
+        # this will generally only be used for recovering from expired resourceVersions exceptions
+        # we've seen the restart get stuck in a loop - even when we restart the watch anew - so
+        # let's add a small backoff time to avoid hammering the API server and hopefully avoid this
+        # since we're not sure if this restart loop is further hampering recovery
+        retry_attempt = 1
         logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
         # TODO: Do LIST + WATCH if we're not starting from a known/good resourceVersion to
         # guarantee that we always get ordered events (starting from a resourceVersion of 0)
@@ -180,6 +191,10 @@ def _pod_event_watch_loop(
                     if not self.stopping:
                         logger.debug("Adding Pod event to pending event queue.")
                         self.pending_events.put(pod_event)
+
+                        # we reset the retry count unconditionally since this is simpler
+                        # than checking if we had any exponential backoff
+                        retry_attempt = 1
                     else:
                         break
             except ApiException as e:
@@ -191,10 +206,25 @@
                         # every pod in the namespace - so we *shouldn't* miss any events at the
                         # cost of likely re-processing events we've already handled (which should
                         # be fine)
-                        watch.resource_version = None
                         logger.exception(
                             "Unhandled API exception while watching Pod events - restarting watch!"
                         )
+                        watch.resource_version = None
+                        # this should be safe since this function runs in its own thread so this
+                        # sleep shouldn't block the workload using this plugin or the other
+                        # task_proc threads - additionally, we don't take any locks in this thread
+                        # so we shouldn't have to worry about any sort of deadlocks :)
+                        backoff_time = min(
+                            retry_attempt * RETRY_BACKOFF_EXPONENT,
+                            MAX_WATCH_BACKOFF_S,
+                        )
+                        logger.info(
+                            "Sleeping for %d seconds on attempt %d before retrying...",
+                            backoff_time,
+                            retry_attempt,
+                        )
+                        time.sleep(backoff_time)
+
             except Exception:
                 # we want to avoid a potentially misleading log message should we encounter
                 # an exception when we want to shutdown this thread since nothing of value