Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion task_processing/plugins/kubernetes/kubernetes_pod_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
"Succeeded",
"Unknown",
}
# arbitrarily chosen - we may honestly want to consider failing loudly if we actually get to
# this amount of backoff since something is likely very wrong (this would require ~10 retries)
MAX_WATCH_BACKOFF_S = 30
# arbitrarily chosen - 1.5 seemed like a good compromise between multiple retries and giving the
# control plane some breathing room
RETRY_BACKOFF_EXPONENT = 1.5


class KubernetesPodExecutor(TaskExecutor):
Expand Down Expand Up @@ -162,6 +168,11 @@ def _initialize_existing_task(self, task_config: KubernetesTaskConfig) -> None:
def _pod_event_watch_loop(
self, kube_client: KubeClient, watch: kube_watch.Watch
) -> None:
# this will generally only be used for recovering from expired resourceVersions exceptions
# we've seen the restart get stuck in a loop - even when we restart the watch anew - so
# let's add a small backoff time to avoid hammering the API server and hopefully avoid this
# since we're not sure if this restart loop is further hampering recovery
retry_attempt = 1
logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
# TODO: Do LIST + WATCH if we're not starting from a known/good resourceVersion to
# guarantee that we always get ordered events (starting from a resourceVersion of 0)
Expand All @@ -180,6 +191,10 @@ def _pod_event_watch_loop(
if not self.stopping:
logger.debug("Adding Pod event to pending event queue.")
self.pending_events.put(pod_event)

# we reset the retry count unconditionally since this is simpler
# than checking if we had any exponential backoff
retry_attempt = 1
else:
break
except ApiException as e:
Expand All @@ -191,10 +206,25 @@ def _pod_event_watch_loop(
# every pod in the namespace - so we *shouldn't* miss any events at the
# cost of likely re-processing events we've already handled (which should
# be fine)
watch.resource_version = None
logger.exception(
"Unhandled API exception while watching Pod events - restarting watch!"
)
watch.resource_version = None
# this should be safe since this function runs in its own thread so this
# sleep shouldn't block the workload using this plugin or the other
# task_proc threads - additionally, we don't take any locks in this thread
# so we shouldn't have to worry about any sort of deadlocks :)
backoff_time = min(
retry_attempt * RETRY_BACKOFF_EXPONENT,
MAX_WATCH_BACKOFF_S,
)
logger.info(
"Sleeping for %d seconds on attempt %d before retrying...",
backoff_time,
retry_attempt,
)
time.sleep(backoff_time)

except Exception:
# we want to avoid a potentially misleading log message should we encounter
# an exception when we want to shutdown this thread since nothing of value
Expand Down