35 changes: 35 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -2279,12 +2279,14 @@ definitions:
- FAIL
- RETRY
- IGNORE
- RESET_PAGINATION
- RATE_LIMITED
examples:
- SUCCESS
- FAIL
- RETRY
- IGNORE
- RESET_PAGINATION
- RATE_LIMITED
failure_type:
title: Failure Type
@@ -3707,6 +3709,9 @@ definitions:
anyOf:
- "$ref": "#/definitions/DefaultPaginator"
- "$ref": "#/definitions/NoPagination"
pagination_reset:
description: Describes what triggers pagination reset and how to handle it.
"$ref": "#/definitions/PaginationReset"
ignore_stream_slicer_parameters_on_paginated_requests:
description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.
type: boolean
@@ -3730,6 +3735,36 @@ definitions:
$parameters:
type: object
additionalProperties: true
PaginationReset:
title: Pagination Reset
description: Describes what triggers pagination reset and how to handle it. If the action is SPLIT_USING_CURSOR, the connector developer is responsible for ensuring that records are returned in ascending cursor order.
type: object
required:
- type
- action
properties:
type:
type: string
enum: [ PaginationReset ]
action:
type: string
enum:
- SPLIT_USING_CURSOR
- RESET
limits:
"$ref": "#/definitions/PaginationResetLimits"
PaginationResetLimits:
title: Pagination Reset Limits
description: Describes the limits that trigger pagination reset
type: object
required:
- type
properties:
type:
type: string
enum: [ PaginationResetLimits ]
number_of_records:
type: integer
GzipDecoder:
title: gzip
description: Select 'gzip' for response data that is compressed with gzip. Requires specifying an inner data type/decoder to parse the decompressed data.
@@ -151,7 +151,7 @@ def __init__(
self._connector_state_converter = connector_state_converter
self._cursor_field = cursor_field

self._cursor_factory = cursor_factory
self._cursor_factory = cursor_factory  # private by convention, but accessed in model_to_component_factory to ease pagination reset instantiation
self._partition_router = partition_router

# The dict is ordered to ensure that once the maximum number of partitions is reached,
@@ -539,6 +539,7 @@ class Action(Enum):
FAIL = "FAIL"
RETRY = "RETRY"
IGNORE = "IGNORE"
RESET_PAGINATION = "RESET_PAGINATION"
RATE_LIMITED = "RATE_LIMITED"


@@ -553,7 +554,14 @@ class HttpResponseFilter(BaseModel):
action: Optional[Action] = Field(
None,
description="Action to execute if a response matches the filter.",
examples=["SUCCESS", "FAIL", "RETRY", "IGNORE", "RATE_LIMITED"],
examples=[
"SUCCESS",
"FAIL",
"RETRY",
"IGNORE",
"RESET_PAGINATION",
"RATE_LIMITED",
],
title="Action",
)
failure_type: Optional[FailureType] = Field(
@@ -1173,6 +1181,16 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class Action1(Enum):
SPLIT_USING_CURSOR = "SPLIT_USING_CURSOR"
RESET = "RESET"


class PaginationResetLimits(BaseModel):
type: Literal["PaginationResetLimits"]
number_of_records: Optional[int] = None


class CsvDecoder(BaseModel):
type: Literal["CsvDecoder"]
encoding: Optional[str] = "utf-8"
@@ -2054,6 +2072,12 @@ class RecordSelector(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class PaginationReset(BaseModel):
type: Literal["PaginationReset"]
action: Action1
limits: Optional[PaginationResetLimits] = None

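A quick sanity check of how these generated models fit together — a minimal sketch (the field values are hypothetical) that mirrors the `pagination_reset` manifest block defined in `declarative_component_schema.yaml` above:

```python
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Action1,
    PaginationReset,
    PaginationResetLimits,
)

# Equivalent manifest fragment (under a SimpleRetriever):
#   pagination_reset:
#     type: PaginationReset
#     action: SPLIT_USING_CURSOR
#     limits:
#       type: PaginationResetLimits
#       number_of_records: 100000
pagination_reset = PaginationReset(
    type="PaginationReset",
    action=Action1.SPLIT_USING_CURSOR,
    limits=PaginationResetLimits(type="PaginationResetLimits", number_of_records=100_000),
)
assert pagination_reset.limits is not None
assert pagination_reset.limits.number_of_records == 100_000
```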

class GzipDecoder(BaseModel):
type: Literal["GzipDecoder"]
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder]
@@ -2822,6 +2846,10 @@ class SimpleRetriever(BaseModel):
None,
description="Paginator component that describes how to navigate through the API's pages.",
)
pagination_reset: Optional[PaginationReset] = Field(
None,
description="Describes what triggers pagination reset and how to handle it.",
)
ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
False,
description="If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.",
@@ -116,6 +116,7 @@
)
from airbyte_cdk.sources.declarative.models import (
CustomStateMigration,
PaginationResetLimits,
)
from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
DEPRECATION_LOGS_TAG,
@@ -358,6 +359,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
PageIncrement as PageIncrementModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
PaginationReset as PaginationResetModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
)
@@ -529,6 +533,7 @@
LocalFileSystemFileWriter,
NoopFileWriter,
)
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
@@ -644,6 +649,8 @@
# this would be a circular import
MAX_SLICES = 5

LOGGER = logging.getLogger(f"airbyte.model_to_component_factory")


class ModelToComponentFactory:
EPOCH_DATETIME_FORMAT = "%s"
@@ -2043,6 +2050,7 @@ def create_default_stream(
if isinstance(concurrent_cursor, FinalStateCursor)
else concurrent_cursor
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
@@ -2051,12 +2059,9 @@
request_options_provider=request_options_provider,
stream_slicer=stream_slicer,
partition_router=partition_router,
stop_condition_cursor=concurrent_cursor
if self._is_stop_condition_on_cursor(model)
else None,
client_side_incremental_sync={"cursor": concurrent_cursor}
if self._is_client_side_filtering_enabled(model)
else None,
has_stop_condition_cursor=self._is_stop_condition_on_cursor(model),
is_client_side_incremental_sync=self._is_client_side_filtering_enabled(model),
cursor=concurrent_cursor,
transformations=transformations,
file_uploader=file_uploader,
incremental_sync=model.incremental_sync,
@@ -3050,7 +3055,7 @@ def create_record_selector(
name: str,
transformations: List[RecordTransformation] | None = None,
decoder: Decoder | None = None,
client_side_incremental_sync: Dict[str, Any] | None = None,
client_side_incremental_sync_cursor: Optional[Cursor] = None,
file_uploader: Optional[DefaultFileUploader] = None,
**kwargs: Any,
) -> RecordSelector:
@@ -3066,14 +3071,14 @@
transform_before_filtering = (
False if model.transform_before_filtering is None else model.transform_before_filtering
)
if client_side_incremental_sync:
if client_side_incremental_sync_cursor:
record_filter = ClientSideIncrementalRecordFilterDecorator(
config=config,
parameters=model.parameters,
condition=model.record_filter.condition
if (model.record_filter and hasattr(model.record_filter, "condition"))
else None,
**client_side_incremental_sync,
cursor=client_side_incremental_sync_cursor,
)
transform_before_filtering = (
True
@@ -3151,8 +3156,9 @@ def create_simple_retriever(
name: str,
primary_key: Optional[Union[str, List[str], List[List[str]]]],
request_options_provider: Optional[RequestOptionsProvider] = None,
stop_condition_cursor: Optional[Cursor] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
cursor: Optional[Cursor] = None,
has_stop_condition_cursor: bool = False,
is_client_side_incremental_sync: bool = False,
transformations: List[RecordTransformation],
file_uploader: Optional[DefaultFileUploader] = None,
incremental_sync: Optional[
@@ -3182,6 +3188,9 @@ def _get_url(req: Requester) -> str:

return _url or _url_base

if cursor is None:
cursor = FinalStateCursor(name, None, self._message_repository)

decoder = (
self._create_component_from_model(model=model.decoder, config=config)
if model.decoder
Expand All @@ -3193,7 +3202,7 @@ def _get_url(req: Requester) -> str:
config=config,
decoder=decoder,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
client_side_incremental_sync_cursor=cursor if is_client_side_incremental_sync else None,
file_uploader=file_uploader,
)

@@ -3270,7 +3279,7 @@ def _get_url(req: Requester) -> str:
url_base=_get_url(requester),
extractor_model=model.record_selector.extractor,
decoder=decoder,
cursor_used_for_stop_condition=stop_condition_cursor or None,
cursor_used_for_stop_condition=cursor if has_stop_condition_cursor else None,
)
if model.paginator
else NoPagination(parameters={})
@@ -3319,6 +3328,13 @@ def _get_url(req: Requester) -> str:
parameters=model.parameters or {},
)

if (
model.record_selector.record_filter
and model.pagination_reset
and model.pagination_reset.limits
):
raise ValueError("PaginationResetLimits are not support while having record filter.")

return SimpleRetriever(
name=name,
paginator=paginator,
@@ -3332,9 +3348,34 @@
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
additional_query_properties=query_properties,
log_formatter=self._get_log_formatter(log_formatter, name),
pagination_tracker_factory=self._create_pagination_tracker_factory(
model.pagination_reset, cursor
),
parameters=model.parameters or {},
)

def _create_pagination_tracker_factory(
self, model: Optional[PaginationResetModel], cursor: Cursor
) -> Callable[[], PaginationTracker]:
if model is None:
return lambda: PaginationTracker()

# Until we figure out a way to use any cursor for PaginationTracker, we need this cursor-selection logic
cursor_for_pagination_tracking = None
if isinstance(cursor, ConcurrentCursor):
cursor_for_pagination_tracking = cursor
elif isinstance(cursor, ConcurrentPerPartitionCursor):
cursor_for_pagination_tracking = cursor._cursor_factory.create( # type: ignore # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here
{}, datetime.timedelta(0)
)
elif not isinstance(cursor, FinalStateCursor):
LOGGER.warning(
"Unknown cursor for PaginationTracker. Pagination resets might not work properly"
)

limit = model.limits.number_of_records if model and model.limits else None
return lambda: PaginationTracker(cursor_for_pagination_tracking, limit)
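
Why a factory rather than a single `PaginationTracker`? Each call produces a fresh tracker, so concurrent partitions never share record counts or same-slice retry state. A minimal sketch (calling the private method directly is purely illustrative, and the limit and stream name are hypothetical):

```python
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Action1,
    PaginationReset,
    PaginationResetLimits,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

factory = ModelToComponentFactory()
tracker_factory = factory._create_pagination_tracker_factory(
    model=PaginationReset(
        type="PaginationReset",
        action=Action1.RESET,
        limits=PaginationResetLimits(type="PaginationResetLimits", number_of_records=1000),
    ),
    # A FinalStateCursor cannot shrink slices, so the tracker will only count records
    cursor=FinalStateCursor("my_stream", None, InMemoryMessageRepository()),
)
assert tracker_factory() is not tracker_factory()  # one independent tracker per call
```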

def _get_log_formatter(
self, log_formatter: Callable[[Response], Any] | None, name: str
) -> Callable[[Response], Any] | None:
@@ -66,14 +66,14 @@ def interpret_response(
if not isinstance(matched_error_resolution, ErrorResolution):
continue

if matched_error_resolution.response_action == ResponseAction.SUCCESS:
if matched_error_resolution.response_action in [
ResponseAction.SUCCESS,
ResponseAction.RETRY,
ResponseAction.IGNORE,
ResponseAction.RESET_PAGINATION,
]:
return matched_error_resolution

if (
matched_error_resolution.response_action == ResponseAction.RETRY
or matched_error_resolution.response_action == ResponseAction.IGNORE
):
return matched_error_resolution
if matched_error_resolution:
return matched_error_resolution
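
With `RESET_PAGINATION` now returning early from `interpret_response`, a declarative stream can map a specific response to a pagination reset. A hedged sketch using the generated model — the status code and matching string are hypothetical; an API that rejects deep page offsets is the typical target:

```python
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Action,
    HttpResponseFilter,
)

# Treat HTTP 400 responses complaining about deep pagination as a signal to
# reset pagination instead of failing the sync.
reset_filter = HttpResponseFilter(
    type="HttpResponseFilter",
    action=Action.RESET_PAGINATION,
    http_codes=[400],
    error_message_contains="cannot page beyond",
)
```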

64 changes: 64 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py
@@ -0,0 +1,64 @@
from typing import Optional

from airbyte_cdk.sources.declarative.models import FailureType
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class PaginationTracker:
_record_count: int
_number_of_attempt_with_same_slice: int

def __init__(
self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
) -> None:
"""
Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor`
switches to global state, we stop keeping track of the state per partition and therefore can't get an accurate
view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only
ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`.
"""
self._cursor = cursor
self._limit = max_number_of_records
self.reset()

"""
Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
always process the same slice.
Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once.
"""
self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2
self._number_of_attempt_with_same_slice = 0

def observe(self, record: Record) -> None:
self._record_count += 1
if self._cursor:
self._cursor.observe(record)

def has_reached_limit(self) -> bool:
return self._limit is not None and self._record_count >= self._limit

def reset(self) -> None:
self._record_count = 0
self._number_of_attempt_with_same_slice = 0

def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice

if new_slice == stream_slice:
self._number_of_attempt_with_same_slice += 1
if (
self._number_of_attempt_with_same_slice
>= self._allowed_number_of_attempt_with_same_slice
):
raise AirbyteTracedException(
internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
failure_type=FailureType.system_error,
)
else:
self._number_of_attempt_with_same_slice = 0

return new_slice
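
A minimal usage sketch of the tracker's contract, assuming no cursor (so a reset is treated as a retry) and a hypothetical two-record limit; `Record` and `StreamSlice` are the declarative types the file imports above:

```python
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.types import Record, StreamSlice

tracker = PaginationTracker(cursor=None, max_number_of_records=2)

for data in ({"id": 1}, {"id": 2}):
    tracker.observe(Record(data=data, stream_name="my_stream"))
assert tracker.has_reached_limit()  # 2 records observed >= limit of 2

# Without a cursor the slice cannot shrink, so the same slice is returned and
# counted as a retry attempt; a second attempt with the same slice would raise
# an AirbyteTracedException.
same_slice = StreamSlice(partition={}, cursor_slice={})
tracker.reset()  # counters are cleared before re-reading the slice
assert tracker.reduce_slice_range_if_possible(same_slice) == same_slice
```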