Skip to content

Commit 2d30586

Browse files
KubernetesPodOperator PodManager retries during create pod on too many requests error (#58033)
* Also retry pod creation on 429 (Too Many Requests) errors * Fix unit test * Fix static checks --------- Co-authored-by: AutomationDev85 <AutomationDev85>
1 parent 5b54ebf commit 2d30586

File tree

3 files changed

+135
-61
lines changed

3 files changed

+135
-61
lines changed

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 75 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import asyncio
2222
import enum
23+
import functools
2324
import json
2425
import math
2526
import time
@@ -38,6 +39,7 @@
3839
from pendulum.parsing.exceptions import ParserError
3940
from urllib3.exceptions import HTTPError, TimeoutError
4041

42+
from airflow.configuration import conf
4143
from airflow.exceptions import AirflowException
4244
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback
4345
from airflow.providers.cncf.kubernetes.utils.container import (
@@ -71,17 +73,76 @@
7173
"""
7274

7375

74-
class PodLaunchFailedException(AirflowException):
75-
"""When pod launching fails in KubernetesPodOperator."""
76+
# Retry tuning for Kubernetes API calls, read once at import time from the
# ``[workers]`` config section; each ``fallback`` is used when the option is unset.
API_RETRIES = conf.getint("workers", "api_retries", fallback=5)
API_RETRY_WAIT_MIN = conf.getfloat("workers", "api_retry_wait_min", fallback=1)
API_RETRY_WAIT_MAX = conf.getfloat("workers", "api_retry_wait_max", fallback=15)

# Exponential backoff, bounded by the min/max above, used whenever the server
# response does not carry a usable ``Retry-After`` value.
_default_wait = tenacity.wait_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)
81+
82+
83+
def get_retry_after_seconds(retry_state) -> int:
    """
    Compute how many seconds tenacity should wait before the next attempt.

    When the last attempt failed with an ``ApiException`` whose status is 429 and whose
    ``Retry-After`` header holds an integer, that value is used (clamped to zero).
    In every other case (no exception, another status, missing header, or a
    non-integer header such as an HTTP date) the module's default exponential
    backoff is used instead.

    :param retry_state: tenacity retry state of the call being retried.
    :return: Non-negative number of whole seconds to wait.
    """
    exception = retry_state.outcome.exception() if retry_state.outcome else None
    if isinstance(exception, ApiException) and str(exception.status) == "429":
        retry_after = exception.headers.get("Retry-After") if exception.headers else None
        if retry_after:
            try:
                # Clamp at zero: a negative delay would make the sleep between attempts fail.
                return max(0, int(retry_after))
            except ValueError:
                # Not an integer number of seconds; fall back to exponential backoff below.
                pass
    # Default exponential backoff, truncated to whole seconds.
    return int(_default_wait(retry_state))
97+
98+
99+
def generic_api_retry(func):
    """
    Wrap ``func`` so that each call goes through a tenacity retry policy.

    The policy stops after ``API_RETRIES`` attempts, takes the pause between attempts
    from ``get_retry_after_seconds`` and is created with ``reraise=True``. No ``retry=``
    predicate is given, so tenacity's default retry condition decides which failures
    are retried.

    :param func: Callable performing a Kubernetes API call.
    :return: Wrapper carrying the metadata of ``func``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # The policy is assembled on every call, so API_RETRIES is read at call time.
        policy = tenacity.retry(
            reraise=True,
            wait=get_retry_after_seconds,
            stop=tenacity.stop_after_attempt(API_RETRIES),
        )
        retrying_func = policy(func)
        return retrying_func(*args, **kwargs)

    return wrapper
76112

77113

78114
def should_retry_start_pod(exception: BaseException) -> bool:
    """
    Tell whether an exception indicates a transient error that warrants retrying.

    Only an ``ApiException`` with status 409 (conflict) or 429 (too many requests)
    qualifies; anything else is reported as not retryable.
    """
    if not isinstance(exception, ApiException):
        return False
    # Status is compared as a string, so both int and str status values match.
    return str(exception.status) in ("409", "429")
83120

84121

122+
def create_pod_api_retry(func):
    """
    Wrap a pod-creation callable in a tenacity retry policy.

    Only failures accepted by ``should_retry_start_pod`` (409 and 429 responses) are
    retried. The policy stops after ``API_RETRIES`` attempts, is created with
    ``reraise=True``, and takes the pause between attempts from
    ``get_retry_after_seconds``, which honours the ``Retry-After`` header on 429.

    :param func: Callable that creates the pod.
    :return: Wrapper carrying the metadata of ``func``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # The policy is assembled on every call, so API_RETRIES is read at call time.
        policy = tenacity.retry(
            retry=tenacity.retry_if_exception(should_retry_start_pod),
            reraise=True,
            wait=get_retry_after_seconds,
            stop=tenacity.stop_after_attempt(API_RETRIES),
        )
        retrying_func = policy(func)
        return retrying_func(*args, **kwargs)

    return wrapper
140+
141+
142+
class PodLaunchFailedException(AirflowException):
    """Raised when launching a pod fails in KubernetesPodOperator."""
144+
145+
85146
class PodPhase:
86147
"""
87148
Possible pod phases.
@@ -355,12 +416,7 @@ def delete_pod(self, pod: V1Pod) -> None:
355416
if str(e.status) != "404":
356417
raise
357418

358-
@tenacity.retry(
359-
stop=tenacity.stop_after_attempt(3),
360-
wait=tenacity.wait_random_exponential(),
361-
reraise=True,
362-
retry=tenacity.retry_if_exception(should_retry_start_pod),
363-
)
419+
@create_pod_api_retry
364420
def create_pod(self, pod: V1Pod) -> V1Pod:
365421
"""Launch the pod asynchronously."""
366422
return self.run_pod_async(pod)
@@ -718,7 +774,7 @@ def container_is_terminated(self, pod: V1Pod, container_name: str) -> bool:
718774
remote_pod = self.read_pod(pod)
719775
return container_is_terminated(pod=remote_pod, container_name=container_name)
720776

721-
@tenacity.retry(stop=tenacity.stop_after_attempt(6), wait=tenacity.wait_exponential(max=15), reraise=True)
777+
@generic_api_retry
722778
def read_pod_logs(
723779
self,
724780
pod: V1Pod,
@@ -761,7 +817,7 @@ def read_pod_logs(
761817
post_termination_timeout=post_termination_timeout,
762818
)
763819

764-
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
820+
@generic_api_retry
765821
def get_init_container_names(self, pod: V1Pod) -> list[str]:
766822
"""
767823
Return container names from the POD except for the airflow-xcom-sidecar container.
@@ -770,7 +826,7 @@ def get_init_container_names(self, pod: V1Pod) -> list[str]:
770826
"""
771827
return [container_spec.name for container_spec in pod.spec.init_containers]
772828

773-
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
829+
@generic_api_retry
774830
def get_container_names(self, pod: V1Pod) -> list[str]:
775831
"""
776832
Return container names from the POD except for the airflow-xcom-sidecar container.
@@ -784,7 +840,7 @@ def get_container_names(self, pod: V1Pod) -> list[str]:
784840
if container_spec.name != PodDefaults.SIDECAR_CONTAINER_NAME
785841
]
786842

787-
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
843+
@generic_api_retry
788844
def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
789845
"""Read events from the POD."""
790846
try:
@@ -794,7 +850,7 @@ def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
794850
except HTTPError as e:
795851
raise AirflowException(f"There was an error reading the kubernetes API: {e}")
796852

797-
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
853+
@generic_api_retry
798854
def read_pod(self, pod: V1Pod) -> V1Pod:
799855
"""Read POD information."""
800856
try:
@@ -839,11 +895,7 @@ def extract_xcom(self, pod: V1Pod) -> str:
839895
finally:
840896
self.extract_xcom_kill(pod)
841897

842-
@tenacity.retry(
843-
stop=tenacity.stop_after_attempt(5),
844-
wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
845-
reraise=True,
846-
)
898+
@generic_api_retry
847899
def extract_xcom_json(self, pod: V1Pod) -> str:
848900
"""Retrieve XCom value and also check if xcom json is valid."""
849901
command = (
@@ -884,11 +936,7 @@ def extract_xcom_json(self, pod: V1Pod) -> str:
884936
raise AirflowException(f"Failed to extract xcom from pod: {pod.metadata.name}")
885937
return result
886938

887-
@tenacity.retry(
888-
stop=tenacity.stop_after_attempt(5),
889-
wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
890-
reraise=True,
891-
)
939+
@generic_api_retry
892940
def extract_xcom_kill(self, pod: V1Pod):
893941
"""Kill xcom sidecar container."""
894942
with closing(
@@ -992,15 +1040,15 @@ def __init__(
9921040
self._callbacks = callbacks or []
9931041
self.stop_watching_events = False
9941042

995-
@tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
1043+
@generic_api_retry
9961044
async def read_pod(self, pod: V1Pod) -> V1Pod:
9971045
"""Read POD information."""
9981046
return await self._hook.get_pod(
9991047
pod.metadata.name,
10001048
pod.metadata.namespace,
10011049
)
10021050

1003-
@tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
1051+
@generic_api_retry
10041052
async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
10051053
"""Get pod's events."""
10061054
return await self._hook.get_pod_events(
@@ -1034,7 +1082,7 @@ async def await_pod_start(
10341082
check_interval=check_interval,
10351083
)
10361084

1037-
@tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
1085+
@generic_api_retry
10381086
async def fetch_container_logs_before_current_sec(
10391087
self, pod: V1Pod, container_name: str, since_time: DateTime | None = None
10401088
) -> DateTime | None:

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,11 @@ async def test_running_log_interval(
297297

298298
mock_datetime.timedelta = datetime.timedelta
299299
mock_datetime.timezone = datetime.timezone
300-
mock_fetch_container_logs_before_current_sec.return_value = DateTime(2022, 1, 1)
300+
301+
async def async_datetime_return(*args, **kwargs):
302+
return DateTime(2022, 1, 1)
303+
304+
mock_fetch_container_logs_before_current_sec.side_effect = async_datetime_return
301305
define_container_state.side_effect = ["running", "running", "terminated"]
302306
trigger = KubernetesPodTrigger(
303307
pod_name=POD_NAME,

0 commit comments

Comments
 (0)