-
Notifications
You must be signed in to change notification settings - Fork 24
Add standard retry mode #545
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: standard-retries
Are you sure you want to change the base?
Changes from all commits
989a343
006c5e2
0e5e2eb
1cc6631
fe4b444
a99562e
9199b8e
3dae830
d96c0db
21cde20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| { | ||
| "type": "feature", | ||
| "description": "Added support for `standard` retry mode." | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| import random | ||
| import threading | ||
| from collections.abc import Callable | ||
| from dataclasses import dataclass | ||
| from enum import Enum | ||
|
|
@@ -207,7 +208,7 @@ def __init__( | |
| def acquire_initial_retry_token( | ||
| self, *, token_scope: str | None = None | ||
| ) -> SimpleRetryToken: | ||
| """Called before any retries (for the first attempt at the operation). | ||
| """Create a base retry token for the start of a request. | ||
|
|
||
| :param token_scope: This argument is ignored by this retry strategy. | ||
| """ | ||
|
|
@@ -242,3 +243,174 @@ def refresh_retry_token_for_retry( | |
|
|
||
def record_success(self, *, token: retries_interface.RetryToken) -> None:
    """Do nothing; this retry strategy takes no action on success.

    :param token: Ignored by this retry strategy.
    """
    return None
|
|
||
|
|
||
class StandardRetryQuota:
    """Retry quota used by :py:class:`StandardRetryStrategy`.

    The quota is a bounded pool of retry capacity. :py:meth:`acquire` withdraws
    capacity for a retry attempt and :py:meth:`release` returns capacity to the
    pool. Every mutation of the pool happens while holding an internal lock.
    """

    # Default value for both the initial and the maximum capacity.
    INITIAL_RETRY_TOKENS: int = 500
    # Capacity withdrawn by each successful call to ``acquire``.
    RETRY_COST: int = 5
    # Capacity returned by ``release`` when called with ``release_amount=0``.
    NO_RETRY_INCREMENT: int = 1
    # Not referenced by this class.
    # NOTE(review): presumably the cost intended for retrying timeout errors;
    # confirm before relying on it.
    TIMEOUT_RETRY_COST: int = 10

    def __init__(self, initial_capacity: int = INITIAL_RETRY_TOKENS):
        """Initialize retry quota with configurable capacity.

        :param initial_capacity: The initial and maximum capacity for the retry quota.
        """
        self._max_capacity = initial_capacity
        self._available_capacity = initial_capacity
        self._lock = threading.Lock()

    def acquire(self, *, error: Exception) -> int:
        """Attempt to acquire capacity for a retry attempt.

        If there's insufficient capacity available, raise an exception.
        Otherwise, return the amount of capacity successfully allocated.

        :param error: The error that triggered the retry. It is currently not
            inspected; every retry costs ``RETRY_COST``.
        :raises RetryError: If less than ``RETRY_COST`` capacity is available.
        """
        capacity_amount = self.RETRY_COST

        with self._lock:
            if capacity_amount > self._available_capacity:
                raise RetryError("Retry quota exceeded")
            self._available_capacity -= capacity_amount
            return capacity_amount

    def release(self, *, release_amount: int) -> None:
        """Release capacity back to the retry quota.

        The capacity being released will be truncated if necessary to ensure the max
        capacity is never exceeded.

        :param release_amount: The amount of capacity to give back. A value of
            ``0`` releases ``NO_RETRY_INCREMENT`` instead.
        """
        increment = self.NO_RETRY_INCREMENT if release_amount == 0 else release_amount

        with self._lock:
            # The "already full" check must happen under the lock. Checking it
            # before locking lets a concurrent acquire() drain capacity right
            # after the check, so this release would be silently dropped.
            if self._available_capacity == self._max_capacity:
                return
            self._available_capacity = min(
                self._available_capacity + increment, self._max_capacity
            )

    @property
    def available_capacity(self) -> int:
        """Return the amount of capacity available."""
        return self._available_capacity
|
|
||
|
|
||
| @dataclass(kw_only=True) | ||
| class StandardRetryToken: | ||
| retry_count: int | ||
| """Retry count is the total number of attempts minus the initial attempt.""" | ||
|
|
||
| retry_delay: float | ||
| """Delay in seconds to wait before the retry attempt.""" | ||
|
|
||
| quota_consumed: int = 0 | ||
| """The total amount of quota consumed.""" | ||
|
|
||
| last_quota_acquired: int = 0 | ||
| """The amount of last quota acquired.""" | ||
|
|
||
|
|
||
class StandardRetryStrategy(retries_interface.RetryStrategy):
    """Retry strategy that bounds attempts, applies backoff, and draws on a quota.

    Each retry must pass three gates: the error must be marked retry-safe, the
    attempt limit must not be reached, and the retry quota must have capacity.
    """

    def __init__(
        self,
        *,
        backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
        max_attempts: int = 3,
        retry_quota: StandardRetryQuota | None = None,
    ):
        """Standard retry strategy using truncated binary exponential backoff with full
        jitter.

        :param backoff_strategy: The backoff strategy used by returned tokens to compute
            the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`.

        :param max_attempts: Upper limit on total number of attempts made, including
            initial attempt and retries.

        :param retry_quota: The retry quota to use for managing retry capacity. Defaults
            to a new :py:class:`StandardRetryQuota` instance.

        :raises ValueError: If ``max_attempts`` is negative.
        """
        if max_attempts < 0:
            raise ValueError(
                f"max_attempts must be a non-negative integer, got {max_attempts}"
            )

        # Fall back to the default backoff only when the caller supplied none.
        if not backoff_strategy:
            backoff_strategy = ExponentialRetryBackoffStrategy(
                backoff_scale_value=1,
                max_backoff=20,
                jitter_type=ExponentialBackoffJitterType.FULL,
            )
        self.backoff_strategy = backoff_strategy
        self.max_attempts = max_attempts

        # Likewise, build a fresh quota when the caller supplied none.
        if not retry_quota:
            retry_quota = StandardRetryQuota()
        self._retry_quota = retry_quota

    def acquire_initial_retry_token(
        self, *, token_scope: str | None = None
    ) -> StandardRetryToken:
        """Create a base retry token for the start of a request.

        :param token_scope: This argument is ignored by this retry strategy.
        """
        # NOTE(review): token_scope appears to be accepted only to match the
        # retry strategy interface; its long-term intent was still an open
        # question in review -- confirm before relying on it.
        initial_delay = self.backoff_strategy.compute_next_backoff_delay(0)
        return StandardRetryToken(retry_count=0, retry_delay=initial_delay)

    def refresh_retry_token_for_retry(
        self,
        *,
        token_to_renew: retries_interface.RetryToken,
        error: Exception,
    ) -> StandardRetryToken:
        """Replace an existing retry token from a failed attempt with a new token.

        A new token is returned only when the error is marked retry-safe, the
        incremented retry count is still below ``max_attempts``, and the retry
        quota grants capacity for the attempt.

        :param token_to_renew: The token used for the previous failed attempt.
        :param error: The error that triggered the need for a retry.
        :raises TypeError: If ``token_to_renew`` is not a
            :py:class:`StandardRetryToken`.
        :raises RetryError: If no further retry attempts are allowed.
        """
        if not isinstance(token_to_renew, StandardRetryToken):
            raise TypeError(
                f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}"
            )

        # Guard: only errors that carry retry info and are flagged safe qualify.
        if not (
            isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe
        ):
            raise RetryError(f"Error is not retryable: {error}") from error

        next_retry_count = token_to_renew.retry_count + 1
        if next_retry_count >= self.max_attempts:
            raise RetryError(
                f"Reached maximum number of allowed attempts: {self.max_attempts}"
            ) from error

        # Acquire additional quota for this retry attempt
        # (may raise a RetryError if none is available)
        acquired = self._retry_quota.acquire(error=error)

        # An explicit retry-after hint on the error overrides computed backoff.
        delay = (
            error.retry_after
            if error.retry_after is not None
            else self.backoff_strategy.compute_next_backoff_delay(next_retry_count)
        )

        return StandardRetryToken(
            retry_count=next_retry_count,
            retry_delay=delay,
            quota_consumed=token_to_renew.quota_consumed + acquired,
            last_quota_acquired=acquired,
        )

    def record_success(self, *, token: retries_interface.RetryToken) -> None:
        """Release retry quota back based on the amount consumed by the last retry.

        :param token: The token used for the previous successful attempt.
        :raises TypeError: If ``token`` is not a :py:class:`StandardRetryToken`.
        """
        if not isinstance(token, StandardRetryToken):
            raise TypeError(
                f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}"
            )
        amount_to_release = token.last_quota_acquired
        self._retry_quota.release(release_amount=amount_to_release)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are we doing with this? It looks like we're tracking it on the token but I don't actually see it used elsewhere?