feat: pagination reset #781
Merged
Commits (15, all by maxi297; this view shows changes from 7 commits)
a2bc95d pagination reset
f90b110 fix case with custom retriever
3fb450c mypy
37698f5 fix warnings
2a50950 fixes while testing
2029415 mypy + format
ab5d315 mypy
26c2735 mypy
cc4da11 format
c8e5c1e Merge branch 'main' into maxi297/pagination_reset
f247e93 One cursor per PaginationTracker
71065f0 coderabbitai
2ae3f57 code review
fbf907c fix test
92b9dff format/lint
64 changes: 64 additions & 0 deletions
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py
@@ -0,0 +1,64 @@
from typing import Optional

from airbyte_cdk.sources.declarative.models import FailureType
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class PaginationTracker:
    _record_count: int
    _number_of_attempt_with_same_slice: int

    def __init__(
        self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
    ) -> None:
        """
        Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor`, but not all
        implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
        switches to global state, we stop keeping track of the state per partition and therefore can't get an
        accurate view for a specific stream_slice. To solve that, we decided to scope this feature to use only
        `ConcurrentCursor`, which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
        """
        self._cursor = cursor
        self._limit = max_number_of_records
        self.reset()

        # Given we have a cursor, we do not allow the same slice to be processed twice, because we assume
        # that retrying it would process the same slice again.
        # Given no cursor, we assume that the pagination reset is for retrying purposes and we allow one retry.
        self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
        self._number_of_attempt_with_same_slice = 0

    def observe(self, record: Record) -> None:
        # Count every record, and forward it to the cursor (if any) so it can track progress.
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        # Without a configured limit, the limit is never reached.
        return self._limit is not None and self._record_count >= self._limit

    def reset(self) -> None:
        self._record_count = 0
        self._number_of_attempt_with_same_slice = 0

    def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
        new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice

        if new_slice == stream_slice:
            # No progress was made on this slice: count the attempt and fail once the allowance is used up.
            self._number_of_attempt_with_same_slice += 1
            if (
                self._number_of_attempt_with_same_slice
                >= self._allowed_number_of_attempt_with_same_slice
            ):
                raise AirbyteTracedException(
                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
                    failure_type=FailureType.system_error,
                )
        else:
            self._number_of_attempt_with_same_slice = 0

        return new_slice
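The attempt-counting rule in `reduce_slice_range_if_possible` is easier to follow with a small runnable sketch. The block below does not use `airbyte_cdk`: `Slice`, `FakeCursor`, and `SketchTracker` are hypothetical stand-ins invented for illustration, `RuntimeError` stands in for `AirbyteTracedException`, and records are plain integers. It only mirrors the logic visible in this diff, assuming that a cursor which has made no progress returns the slice unchanged.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Slice:
    """Hypothetical stand-in for StreamSlice: a simple [start, end) range."""
    start: int
    end: int


class FakeCursor:
    """Hypothetical stand-in for ConcurrentCursor: remembers the highest value seen."""

    def __init__(self) -> None:
        self._latest: Optional[int] = None

    def observe(self, record: int) -> None:
        self._latest = record if self._latest is None else max(self._latest, record)

    def reduce_slice_range(self, stream_slice: Slice) -> Slice:
        # Assumption: with no progress past the slice start, the slice is returned unchanged.
        if self._latest is None or self._latest <= stream_slice.start:
            return stream_slice
        return Slice(self._latest, stream_slice.end)


class SketchTracker:
    """Simplified restatement of the tracker logic shown in the diff."""

    def __init__(
        self, cursor: Optional[FakeCursor] = None, max_number_of_records: Optional[int] = None
    ) -> None:
        self._cursor = cursor
        self._limit = max_number_of_records
        self._record_count = 0
        self._attempts = 0
        # With a cursor, an unchanged slice fails immediately; without one, one retry is allowed.
        self._allowed = 1 if cursor else 2

    def observe(self, record: int) -> None:
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        return self._limit is not None and self._record_count >= self._limit

    def reduce_slice_range_if_possible(self, stream_slice: Slice) -> Slice:
        new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
        if new_slice == stream_slice:
            self._attempts += 1
            if self._attempts >= self._allowed:
                raise RuntimeError(f"{self._attempts} attempts with the same slice")
        else:
            self._attempts = 0
        return new_slice


def raises(tracker: SketchTracker, stream_slice: Slice) -> bool:
    """Return True if reducing the slice raises, False otherwise."""
    try:
        tracker.reduce_slice_range_if_possible(stream_slice)
    except RuntimeError:
        return True
    return False


def demo() -> dict:
    results = {}

    # Cursor that made progress: the limit is hit and the slice is narrowed.
    tracker = SketchTracker(FakeCursor(), max_number_of_records=2)
    tracker.observe(3)
    tracker.observe(5)
    results["limit_reached"] = tracker.has_reached_limit()
    results["narrowed"] = tracker.reduce_slice_range_if_possible(Slice(0, 10))

    # Cursor that saw nothing: the slice is unchanged, so the first attempt already fails.
    results["stalled_raises"] = raises(SketchTracker(FakeCursor()), Slice(0, 10))

    # No cursor: the first unchanged slice is tolerated, the second one fails.
    no_cursor = SketchTracker()
    results["no_cursor_first_ok"] = not raises(no_cursor, Slice(0, 10))
    results["no_cursor_second_raises"] = raises(no_cursor, Slice(0, 10))
    return results
```

Calling `demo()` exercises the three cases described in the comments inside `__init__`: progress with a cursor, no progress with a cursor, and the single retry allowed without a cursor. Note that in the diff as shown, `reset()` also clears the attempt counter, so a caller that resets between attempts would never reach the limit; the sketch avoids calling a reset between attempts for that reason.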