From 4d770bc5c972db15d15e0752ec8fb37d4fd122ff Mon Sep 17 00:00:00 2001
From: Luis Perez
Date: Tue, 25 Mar 2025 10:55:47 -0700
Subject: [PATCH] Handle Watch expiration by resetting the resourceVersion

We've seen an increase in cases where the Watch (for a reason I don't fully
understand yet - some of what I've been reading suggests it may be related to
k8s API caches expiring) starts erroring out with exceptions like:

```
kubernetes.client.exceptions.ApiException: (410)
Reason: Expired: too old resource version: 1642912881 (1644329743
```

While in the past this seems to have generally fixed itself with the
pre-existing restart code, in the aforementioned cases we've seen this get
stuck in a restart loop that we either never recover from until a human
intervenes, OR that takes long enough to recover that we start losing events.

This diff ensures that when we restart the watch loop, we restart as if we
were starting a fresh Watch. This does mean we'll re-process existing events,
since the current behavior of `Watch().stream()` includes returning the
current state, but that's fine: while GH issues mention that this initial
state isn't necessarily ordered, our state machine/transition logic should
ensure that out-of-order events don't actually hurt us - at worst, we'll
simply waste time.
---
 .../plugins/kubernetes/kubernetes_pod_executor.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py
index e965880..5e90516 100644
--- a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py
+++ b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py
@@ -163,14 +163,12 @@ def _pod_event_watch_loop(
         self, kube_client: KubeClient, watch: kube_watch.Watch
     ) -> None:
         logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
-        # TODO(TASKPROC-243): we'll need to correctly handle resourceVersion expiration for the case
-        # where the gap between task_proc shutting down and coming back up is long enough for data
-        # to have expired from etcd as well as actually restarting from the last resourceVersion in
-        # case of an exception
         # TODO: Do LIST + WATCH if we're not starting from a known/good resourceVersion to
         # guarantee that we always get ordered events (starting from a resourceVersion of 0)
         # has no such guarantees that the initial events will be ordered in any meaningful way
         # see: https://github.com/kubernetes/kubernetes/issues/74022
+        # NOTE: we'll also probably want to add some sort of reconciliation if we do ^
+        # since it's possible that we'll have skipped some events
         while not self.stopping:
             try:
                 for pod_event in watch.stream(
@@ -187,6 +185,13 @@
             except ApiException as e:
                 if not self.stopping:
                     if not kube_client.maybe_reload_on_exception(exception=e):
+                        # we reset the resource version since it's possible that the last one is no
+                        # longer able to be resumed from
+                        # NOTE: this means that when we restart the stream, we'll get events for
+                        # every pod in the namespace - so we *shouldn't* miss any events at the
+                        # cost of likely re-processing events we've already handled (which should
+                        # be fine)
+                        watch.resource_version = None
                         logger.exception(
                             "Unhandled API exception while watching Pod events - restarting watch!"
                         )
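
For context, here is a minimal standalone sketch of the restart pattern the second hunk implements, written against the kubernetes Python client directly rather than the executor's wrappers. The namespace, the bare retry loop, and the print are illustrative assumptions, not task_processing code:

```
# Sketch only: illustrates resetting Watch.resource_version after a 410,
# not the executor's actual implementation.
from kubernetes import client, config, watch
from kubernetes.client.exceptions import ApiException

config.load_kube_config()  # assumption: running against a local kubeconfig
v1 = client.CoreV1Api()
w = watch.Watch()

while True:
    try:
        # Resume from the last resourceVersion the Watch recorded; it is None
        # on the first pass, so the stream starts by returning current Pod state.
        for event in w.stream(
            v1.list_namespaced_pod,
            namespace="default",  # assumption: namespace is illustrative
            resource_version=w.resource_version,
        ):
            print(event["type"], event["object"].metadata.name)
    except ApiException as e:
        if e.status == 410:
            # The stored resourceVersion has expired server-side; clear it so
            # the next stream() starts fresh (re-listing current Pods) instead
            # of retrying the stale version indefinitely.
            w.resource_version = None
        else:
            raise
```

The executor does the equivalent reset inside its ApiException handler (after `maybe_reload_on_exception` declines to handle the error) before logging and looping back into `watch.stream()`.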