diff --git a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py
index e965880..5e90516 100644
--- a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py
+++ b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py
@@ -163,14 +163,12 @@ def _pod_event_watch_loop(
         self, kube_client: KubeClient, watch: kube_watch.Watch
     ) -> None:
         logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
-        # TODO(TASKPROC-243): we'll need to correctly handle resourceVersion expiration for the case
-        # where the gap between task_proc shutting down and coming back up is long enough for data
-        # to have expired from etcd as well as actually restarting from the last resourceVersion in
-        # case of an exception
         # TODO: Do LIST + WATCH if we're not starting from a known/good resourceVersion to
         # guarantee that we always get ordered events (starting from a resourceVersion of 0)
         # has no such guarantees that the initial events will be ordered in any meaningful way
         # see: https://github.com/kubernetes/kubernetes/issues/74022
+        # NOTE: we'll also probably want to add some sort of reconciliation if we do ^
+        # since it's possible that we'll have skipped some events
         while not self.stopping:
             try:
                 for pod_event in watch.stream(
@@ -187,6 +185,13 @@ def _pod_event_watch_loop(
             except ApiException as e:
                 if not self.stopping:
                     if not kube_client.maybe_reload_on_exception(exception=e):
+                        # we reset the resource version since it's possible that the last one is no
+                        # longer able to be resumed from
+                        # NOTE: this means that when we restart the stream, we'll get events for
+                        # every pod in the namespace - so we *shouldn't* miss any events at the
+                        # cost of likely re-processing events we've already handled (which should
+                        # be fine)
+                        watch.resource_version = None
                         logger.exception(
                             "Unhandled API exception while watching Pod events - restarting watch!"
                         )
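
For reference, below is a minimal sketch of the LIST + WATCH recovery pattern the remaining TODO describes, written against the upstream `kubernetes` Python client directly rather than task_processing's `KubeClient`/`kube_watch` wrappers. The `namespace` value, the `handle_pod_event` helper, and the explicit 410 ("Gone") check are assumptions for illustration only; the patch above instead resets `watch.resource_version` on any unhandled `ApiException`.

```python
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException


def handle_pod_event(event: dict) -> None:
    # Placeholder for real event handling (the executor enqueues events instead).
    print(event["type"], event["object"].metadata.name)


def watch_pods(namespace: str = "example") -> None:
    config.load_kube_config()  # or config.load_incluster_config() when running in-cluster
    v1 = client.CoreV1Api()
    resource_version = None

    while True:
        try:
            if resource_version is None:
                # LIST first so we resume from a known-good resourceVersion and can
                # reconcile any pods whose events we may have missed.
                pod_list = v1.list_namespaced_pod(namespace=namespace)
                for pod in pod_list.items:
                    handle_pod_event({"type": "EXISTING", "object": pod})
                resource_version = pod_list.metadata.resource_version

            for event in watch.Watch().stream(
                v1.list_namespaced_pod,
                namespace=namespace,
                resource_version=resource_version,
            ):
                handle_pod_event(event)
                # Track the latest resourceVersion so a reconnect can resume in order.
                resource_version = event["object"].metadata.resource_version
        except ApiException as e:
            if e.status == 410:
                # resourceVersion expired ("Gone"): force a fresh LIST + WATCH, which is
                # effectively what clearing watch.resource_version does in the patch above.
                resource_version = None
            else:
                raise
```

Checking specifically for a 410 status is one possible refinement over resetting on every unhandled exception, at the cost of needing to decide separately how other API errors should be handled.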