feat: pagination reset #781
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/pagination_reset#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/pagination_reset
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
- /autofix
📝 Walkthrough
Adds pagination-reset support: new PaginationReset and PaginationResetLimits schemas/models, RESET_PAGINATION response action and exception, PaginationTracker with slice-reduction and attempt tracking, wiring into SimpleRetriever and ModelToComponentFactory, HTTP client raising on reset action, RootPath support for list responses, and unit tests for reset/limit flows.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant SR as SimpleRetriever
participant PT as PaginationTracker
participant P as Paginator
participant HC as HttpClient
participant EH as ErrorHandler
participant CC as ConcurrentCursor
Note over SR,PT: create tracker per read-run
SR->>PT: tracker = pagination_tracker_factory()
SR->>P: compute initial next page token
loop pages for a slice
SR->>HC: request(page token)
HC->>EH: evaluate response
alt action == RESET_PAGINATION
EH-->>HC: RESET_PAGINATION
HC-->>SR: raise PaginationResetRequiredException
SR->>PT: reset()
SR-->>SR: reinitialize paginator and retry same slice
else SUCCESS
HC-->>SR: records + next token
SR->>PT: observe(record)*
alt PT.has_reached_limit()
SR->>CC: reduce_slice_range(slice)
CC-->>SR: reduced slice
SR->>PT: reset()
SR-->>SR: restart with reduced slice
else
SR-->>SR: continue pagination
end
end
end
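The flow in the diagram above can be sketched in plain Python. This is a minimal illustration, not the CDK implementation: `Tracker`, `PaginationResetRequired`, `read_slice`, the `fetch_page` callable, the `updated_at` field and the `max_resets` bound are all hypothetical stand-ins, and the slice is reduced to a single lower bound.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

Page = Tuple[List[Dict[str, Any]], Optional[int]]


class PaginationResetRequired(Exception):
    """Hypothetical stand-in for PaginationResetRequiredException."""


class Tracker:
    """Hypothetical, simplified stand-in for PaginationTracker."""

    def __init__(self, limit: Optional[int] = None) -> None:
        self._limit = limit
        self._count = 0

    def observe(self, record: Dict[str, Any]) -> None:
        self._count += 1

    def has_reached_limit(self) -> bool:
        return self._limit is not None and self._count >= self._limit

    def reset(self) -> None:
        self._count = 0


def read_slice(
    fetch_page: Callable[[int, Optional[int]], Page],
    tracker: Tracker,
    slice_start: int,
    max_resets: int = 2,
) -> Iterator[Dict[str, Any]]:
    """Yield the records of one slice, restarting pagination when required."""
    token: Optional[int] = None  # None means "first page"
    resets = 0
    while True:
        try:
            records, token = fetch_page(slice_start, token)
        except PaginationResetRequired:
            resets += 1
            if resets > max_resets:
                raise
            tracker.reset()
            token = None  # retry the same slice from its first page
            continue
        for record in records:
            tracker.observe(record)
            yield record
        if records and tracker.has_reached_limit():
            # Assumed reduction: restart after the last record seen.
            slice_start = records[-1]["updated_at"]
            tracker.reset()
            token = None
            continue
        if token is None:
            return
```

In this sketch, records emitted before a reset are emitted again after it, so a consumer would need to tolerate duplicates; whether the real retriever behaves the same way is not stated in the walkthrough.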
sequenceDiagram
autonumber
participant MF as ModelToComponentFactory
participant CUR as Cursor
participant SR as SimpleRetriever
participant PT as PaginationTracker
MF->>CUR: resolve/create cursor
MF->>MF: _create_pagination_tracker_factory(model.pagination_reset, cursor)
MF-->>SR: instantiate SimpleRetriever(..., cursor, pagination_tracker_factory, ...)
SR->>PT: create tracker per run via factory
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Want a brief checklist of real-connector scenarios to validate PaginationReset end-to-end, wdyt?
Pre-merge checks
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
374-465: Restore typing to satisfy mypy
The CI failures stem from the new _get_initial_next_page_token helper lacking type hints and being called from typed contexts. Could we add a precise signature (e.g. returning Optional[Mapping[str, Any]]), adjust the local next_page_token variable annotations, and ensure _next_page_token continues to receive a non-optional response (maybe by guarding the call or annotating response)? This should clear the no-untyped-call and arg-type errors flagged by mypy, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1 hunks)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5 hunks)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4 hunks)
- airbyte_cdk/sources/streams/http/error_handlers/response_models.py (1 hunks)
- airbyte_cdk/sources/streams/http/http_client.py (2 hunks)
- airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1 hunks)
- unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (1 hunks)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3 hunks)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): FailureType (546-549)
- airbyte_cdk/sources/types.py (2): Record (21-72), StreamSlice (75-169)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): observe (57-61), observe (112-113), observe (232-252), reduce_slice_range (523-544)
- airbyte_cdk/utils/traced_exception.py (1): AirbyteTracedException (25-145)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1): observe (510-543)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (5)
- airbyte_cdk/sources/declarative/extractors/http_selector.py (2): HttpSelector (13-37), select_records (20-37)
- airbyte_cdk/sources/declarative/requesters/paginators/paginator.py (3): Paginator (18-65), get_initial_token (27-30), next_page_token (33-49)
- airbyte_cdk/sources/declarative/requesters/requester.py (7): HttpMethod (18-26), Requester (29-156), send_request (138-156), get_request_params (80-91), get_request_headers (94-103), get_request_body_data (106-121), get_request_body_json (124-135)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4): PaginationTracker (9-61), has_reached_limit (41-42), reset (44-45), reduce_slice_range_if_possible (47-61)
- airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1): PaginationResetRequiredException (1-2)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
- airbyte_cdk/sources/types.py (5): Record (21-72), StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117)
- airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1): output_format (133-137)
- airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py (4): output_format (46-46), output_format (133-134), output_format (170-182), output_format (214-215)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5): PaginationTracker (9-61), observe (36-39), has_reached_limit (41-42), reset (44-45), reduce_slice_range_if_possible (47-61)
- airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1): PaginationResetRequiredException (1-2)
- airbyte_cdk/sources/types.py (7): Record (21-72), StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117), data (35-36), associated_slice (39-40)
- airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py (2): get_initial_token (127-134), get_initial_token (247-249)
unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (5)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): FailureType (546-549)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5): PaginationTracker (9-61), observe (36-39), has_reached_limit (41-42), reset (44-45), reduce_slice_range_if_possible (47-61)
- airbyte_cdk/sources/types.py (6): Record (21-72), StreamSlice (75-169), data (35-36), associated_slice (39-40), partition (99-104), cursor_slice (107-112)
- airbyte_cdk/sources/streams/concurrent/cursor.py (5): ConcurrentCursor (135-544), observe (57-61), observe (112-113), observe (232-252), reduce_slice_range (523-544)
- airbyte_cdk/utils/traced_exception.py (1): AirbyteTracedException (25-145)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (4)
- unit_tests/sources/test_source.py (2): catalog (70-93), source (65-66)
- airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1): read (365-380)
- airbyte_cdk/test/mock_http/mocker.py (2): HttpMocker (25-185), get (94-95)
- airbyte_cdk/test/mock_http/request.py (1): HttpRequest (14-103)
airbyte_cdk/sources/streams/http/http_client.py (2)
- airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1): PaginationResetRequiredException (1-2)
- airbyte_cdk/sources/streams/http/error_handlers/response_models.py (1): ResponseAction (14-20)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2): PaginationResetLimits (1189-1191), PaginationReset (2075-2078)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1): PaginationTracker (9-61)
- airbyte_cdk/sources/streams/concurrent/cursor.py (3): Cursor (51-87), FinalStateCursor (90-132), ConcurrentCursor (135-544)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2): ConcurrentPerPartitionCursor (106-660), create (98-103)
- airbyte_cdk/sources/concurrent_source/concurrent_source.py (1): create (41-71)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
[error] 376-376: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk
[error] 438-438: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk
[error] 450-450: Argument "response" to "_next_page_token" of "SimpleRetriever" has incompatible type "Response | None"; expected "Response" [arg-type]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk
[error] 461-461: Function is missing a return type annotation [no-untyped-def]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 3352-3352: Argument 1 to "_create_pagination_tracker_factory" of "ModelToComponentFactory" has incompatible type "PaginationReset | None"; expected "PaginationReset" [arg-type]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
154-154: Thanks for capturing the intent here
Really appreciate the context on why _cursor_factory stays accessible; it'll save future spelunking when we revisit pagination reset wiring. Nicely done!
Actionable comments posted: 2
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
387-425: Preserve the original stream_slice to avoid incorrect reset logic.
The property chunking branch overwrites stream_slice at lines 394-398, which means when a reset occurs (line 443), reduce_slice_range_if_possible operates on the last chunk rather than the original slice. Could we preserve the original by introducing a chunked_slice variable and passing that to _fetch_next_page, wdyt?
Based on past review comments.
Apply this diff:
 if (
     self.additional_query_properties
     and self.additional_query_properties.property_chunking
 ):
+    original_slice = stream_slice
     for properties in self.additional_query_properties.get_request_property_chunks(
         stream_slice=stream_slice
     ):
-        stream_slice = StreamSlice(
+        chunked_slice = StreamSlice(
             partition=stream_slice.partition or {},
             cursor_slice=stream_slice.cursor_slice or {},
             extra_fields={"query_properties": properties},
         )
         response = self._fetch_next_page(
-            stream_state, stream_slice, next_page_token
+            stream_state, chunked_slice, next_page_token
         )
         for current_record in records_generator_fn(response):
             merge_key = (
                 self.additional_query_properties.property_chunking.get_merge_key(
                     current_record
                 )
             )
             if merge_key:
                 _deep_merge(merged_records[merge_key], current_record)
             else:
                 # We should still emit records even if the record did not have a merge key
                 pagination_tracker.observe(current_record)
                 last_page_size += 1
                 last_record = current_record
                 yield current_record
     for merged_record in merged_records.values():
         record = Record(
-            data=merged_record, stream_name=self.name, associated_slice=stream_slice
+            data=merged_record, stream_name=self.name, associated_slice=original_slice
         )
Then use original_slice (instead of the mutated stream_slice) at line 443:
- stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice)
+ stream_slice = pagination_tracker.reduce_slice_range_if_possible(original_slice)
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5): PaginationTracker (9-61), observe (36-39), has_reached_limit (41-42), reset (44-45), reduce_slice_range_if_possible (47-61)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): Cursor (51-87), observe (57-61), observe (112-113), observe (232-252)
- airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1): PaginationResetRequiredException (1-2)
- airbyte_cdk/sources/types.py (7): Record (21-72), StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117), data (35-36), associated_slice (39-40)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
[error] 379-379: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]
[error] 441-441: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]
[error] 453-453: Argument "response" to "_next_page_token" of "SimpleRetriever" has incompatible type "Response | None"; expected "Response" [arg-type]
[error] 464-464: Function is missing a return type annotation [no-untyped-def]
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
6-6: LGTM!
The new imports support the pagination reset functionality and are all used appropriately in the refactored code.
Also applies to: 42-42, 46-46, 48-50, 55-55
101-103: LGTM!
The factory pattern enables per-call tracker creation and supports dependency injection for testing, which is a solid approach.
106-108: Verify that nullifying concurrent cursors is the intended behavior.
This logic sets self.cursor = None for Cursor instances to prevent SimpleRetriever from managing ConcurrentCursor state. While this appears intentional for backward compatibility with CustomRetriever implementations, ensure this doesn't silently break cursor functionality for consumers expecting the cursor to be present, wdyt?
433-447: Clarify the reset vs. limit-reached flow.
Both PaginationResetRequiredException and has_reached_limit() trigger the same reset logic. Is the intent that they're treated identically, or should limit-reached have slightly different handling (e.g., not reducing the slice)? The current implementation seems correct if both cases require a retry with a narrowed slice, wdyt?
Actionable comments posted: 0
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
377-425: Preserve the original stream_slice before property chunking.
Inside the property chunking loop (line 394), stream_slice is reassigned for each chunk. After the loop completes, stream_slice holds the last chunk's slice instead of the original. When a reset occurs (line 443), reduce_slice_range_if_possible operates on this mutated slice rather than the original one, and the log message (lines 444-446) also references the wrong slice.
Could we introduce a local variable like original_stream_slice to preserve the slice passed into _read_pages, then use that for reset logic while keeping stream_slice (or a new current_chunk_slice) for the chunking loop, wdyt?
Based on past review comments.
464-467: Add return type annotation to _get_initial_next_page_token.
This helper is missing a return type annotation, which mypy flags as an error in typed contexts (lines 379, 441).
Could we annotate it as -> Optional[Mapping[str, Any]] to match the return value structure and resolve the type-checking errors, wdyt?
Apply this diff to add the missing return type:
- def _get_initial_next_page_token(self):
+ def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]:
      initial_token = self._paginator.get_initial_token()
      next_page_token = {"next_page_token": initial_token} if initial_token is not None else None
      return next_page_token
Based on past review comments.
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
106-108: Clarify the cursor validation logic.
The comment explains this is for CustomRetriever compatibility, but setting self.cursor = None if isinstance(self.cursor, Cursor) else self.cursor reads as "nullify Cursor instances but keep everything else." Is this the intended behavior? It seems counterintuitive – typically we'd want to keep Cursor instances and reject incompatible types, wdyt?
Could we add more context about why Cursor instances need to be nullified for CustomRetriever, or would it be cleaner to handle this in the CustomRetriever class itself?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3331-3336: Fix typo in error message.
The error message on line 3336 has a grammatical error: "are not support" should be "are not supported".
Apply this diff to correct the typo:
- raise ValueError("PaginationResetLimits are not support while having record filter.")
+ raise ValueError("PaginationResetLimits are not supported while having a record filter.")
📜 Review details
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5): PaginationTracker (9-61), observe (36-39), has_reached_limit (41-42), reset (44-45), reduce_slice_range_if_possible (47-61)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): Cursor (51-87), observe (57-61), observe (112-113), observe (232-252)
- airbyte_cdk/sources/types.py (8): Record (21-72), StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117), data (35-36), associated_slice (39-40), get (146-147)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2): PaginationResetLimits (1189-1191), PaginationReset (2075-2078)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1): PaginationTracker (9-61)
- airbyte_cdk/sources/streams/concurrent/cursor.py (3): Cursor (51-87), FinalStateCursor (90-132), ConcurrentCursor (135-544)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2): ConcurrentPerPartitionCursor (106-660), create (98-103)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3357-3377: LGTM on the pagination tracker factory logic!
The method correctly handles the Optional[PaginationResetModel] parameter (addressing the past review concern) and safely extracts the cursor for pagination tracking based on its type. The fallback to PaginationTracker() when no model is provided is sensible.
Nice work ensuring the cursor selection logic handles ConcurrentCursor, ConcurrentPerPartitionCursor, and the FinalStateCursor fallback with appropriate logging!
Based on past review comments.
Actionable comments posted: 2
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
48-64: Slice-reduction attempt logic looks correct and addresses prior feedback
The attempt counter only increments on unchanged slices and resets on reduction; the allowed-attempts policy (1 with cursor, 2 without) is clear. This resolves the earlier “reset attempt counter per slice” issue noted previously. Nice cleanup.
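The bookkeeping described here can be sketched as a small guard object. This is an interpretation rather than the PR's code: `SameSliceGuard` and `next_slice` are hypothetical names, the counter is assumed to include the attempt in progress, and RuntimeError stands in for AirbyteTracedException.

```python
from typing import Any


class SameSliceGuard:
    """Hypothetical sketch of the attempt policy described in the review."""

    def __init__(self, has_cursor: bool) -> None:
        # Policy from the review: 1 attempt with a cursor, 2 without.
        self._allowed_attempts = 1 if has_cursor else 2
        # Assumption: the count includes the attempt currently in progress.
        self._attempts = 1

    def next_slice(self, current: Any, reduced: Any) -> Any:
        """Return the slice to retry with, or raise if no progress is possible."""
        if reduced != current:
            # The slice shrank, so progress was made: start counting afresh.
            self._attempts = 1
            return reduced
        if self._attempts >= self._allowed_attempts:
            raise RuntimeError(
                f"There were {self._attempts} attempts with the same slice "
                f"while the max allowed is {self._allowed_attempts}"
            )
        self._attempts += 1
        return current
```

Under these assumptions, a tracker with a cursor fails on the first reset that leaves the slice unchanged, while a tracker without a cursor tolerates exactly one such retry.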
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
27-33: Replace no-op triple-quoted string with comments or move into the class docstring
The standalone triple-quoted string inside __init__ is a no-op. Would you switch it to comments or incorporate it into the class docstring for clarity, wdyt?
- """
- Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
- always process the same slice.
-
- Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once.
- """
+ # Given we have a cursor, we do not allow the same slice to be processed twice because we assume
+ # we will always process the same slice.
+ #
+ # Given no cursor, we assume that the pagination reset is for retrying purposes and we allow
+ # one retry for the same slice.
36-40: Confirm tracker instance scope vs concurrency
_record_count increments aren’t synchronized. If a single PaginationTracker instance is shared across threads/partitions, increments could race. Are we guaranteed a per-partition/serialized use of the tracker (similar to the cursor’s assumptions)? If not, shall we add a light lock around observe to guard _record_count, wdyt?
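If the tracker did turn out to be shared across threads, the "light lock" mentioned above could look roughly like the following. This is a sketch under that assumption only; `LockedRecordCounter` and its `record_count` property are hypothetical, and the PR may well guarantee per-partition use instead, in which case no lock is needed.

```python
import threading
from typing import Any, Optional


class LockedRecordCounter:
    """Hypothetical thread-safe variant of the tracker's record counting."""

    def __init__(self, max_number_of_records: Optional[int] = None) -> None:
        self._limit = max_number_of_records
        self._record_count = 0
        self._lock = threading.Lock()

    def observe(self, record: Any) -> None:
        # The read-modify-write of the counter happens under the lock.
        with self._lock:
            self._record_count += 1

    def has_reached_limit(self) -> bool:
        with self._lock:
            return self._limit is not None and self._record_count >= self._limit

    def reset(self) -> None:
        with self._lock:
            self._record_count = 0

    @property
    def record_count(self) -> int:
        with self._lock:
            return self._record_count
```

The lock only protects the counter itself; it does not make a sequence such as "check the limit, then reset" atomic across threads.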
41-43: Clarify non-positive limit behavior
If max_number_of_records is 0 or negative, has_reached_limit() immediately returns True at start. Should we treat non-positive values as “no limit” (None) or raise in __init__? I can patch it to normalize to None, wdyt?
- self._limit = max_number_of_records
+ self._limit = max_number_of_records
+ if self._limit is not None and self._limit <= 0:
+     # Treat non-positive limits as unlimited
+     self._limit = None
57-60: Add a user-friendly exception message for UI consumers
Would you add a message= for AirbyteTracedException so users see a clearer error in the UI (while keeping internal_message intact), wdyt?
- raise AirbyteTracedException(
-     internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
-     failure_type=FailureType.system_error,
- )
+ raise AirbyteTracedException(
+     internal_message=(
+         f"There were {self._number_of_attempt_with_same_slice} attempts with the same "
+         f"slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}"
+     ),
+     message="Pagination reset failed: retried the same slice too many times.",
+     failure_type=FailureType.system_error,
+ )
📜 Review details
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py (1 hunks)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1 hunks)
- airbyte_cdk/test/mock_http/response_builder.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): FailureType (546-549)
- airbyte_cdk/sources/types.py (2): Record (21-72), StreamSlice (75-169)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): observe (57-61), observe (112-113), observe (232-252), reduce_slice_range (523-544)
- airbyte_cdk/utils/traced_exception.py (1): AirbyteTracedException (25-145)
airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py (1)
- airbyte_cdk/sources/streams/http/error_handlers/response_models.py (1): ResponseAction (14-20)
🪛 GitHub Actions: Linters
airbyte_cdk/test/mock_http/response_builder.py
[error] 83-83: mypy: Argument 1 of 'write' is incompatible with supertype 'Path' (override).
[error] 86-86: mypy: Argument 1 of 'update' is incompatible with supertype 'Path' (override).
[error] 90-90: mypy: Argument 1 of 'extract' is incompatible with supertype 'Path' (override).
[error] 199-199: mypy: Argument 1 to 'update' of 'RootPath' has incompatible type 'dict[str, Any]'; expected 'list[dict[str, Any]]'.
[error] 238-238: mypy: Argument 1 to 'extract' of 'RootPath' has incompatible type 'dict[str, Any]'; expected 'list[dict[str, Any]]'.
airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py
[error] 66-73: Ruff formatting issue detected. The file requires formatting changes. Run 'poetry run ruff format' to apply formatting.
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py (1)
69-70: Verify that RESET_PAGINATION should terminate the handler chain.
By including RESET_PAGINATION in this check, any error handler that resolves to RESET_PAGINATION will cause an immediate return, preventing subsequent handlers from being consulted. This seems like the correct behavior for a pagination reset signal, similar to how SUCCESS, RETRY, and IGNORE are handled. Can you confirm this is the intended flow, wdyt?
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
3-6: Keep Record/StreamSlice imports from declarative.types
The airbyte_cdk.sources.declarative.types module re-exports both types, as confirmed by existing code and tests.
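To make the handler-chain question above concrete, here is a simplified model of a composite handler that returns as soon as one handler resolves to a terminal action. It is an assumption-laden sketch: `Action`, `TERMINAL_ACTIONS` and `resolve` are hypothetical, handlers are plain callables, and the real CompositeErrorHandler works with error resolution objects rather than bare enum values.

```python
from enum import Enum
from typing import Any, Callable, Iterable, Optional


class Action(Enum):
    """Hypothetical stand-in for ResponseAction."""

    SUCCESS = "SUCCESS"
    RETRY = "RETRY"
    IGNORE = "IGNORE"
    RESET_PAGINATION = "RESET_PAGINATION"
    FAIL = "FAIL"


# Actions that stop the chain; RESET_PAGINATION is the one added by this PR.
TERMINAL_ACTIONS = {Action.SUCCESS, Action.RETRY, Action.IGNORE, Action.RESET_PAGINATION}

Handler = Callable[[Any], Optional[Action]]


def resolve(handlers: Iterable[Handler], response: Any) -> Action:
    """Consult handlers in order and stop at the first terminal action."""
    last_match: Optional[Action] = None
    for handler in handlers:
        action = handler(response)
        if action is None:
            continue  # this handler had no opinion
        if action in TERMINAL_ACTIONS:
            return action
        last_match = action
    # Assumed fallback: the last non-terminal match, else FAIL.
    return last_match or Action.FAIL
```

In this model, a chain of FAIL, RESET_PAGINATION, RETRY resolves to RESET_PAGINATION, so the RETRY handler is never consulted.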
Actionable comments posted: 1
📜 Review details
📒 Files selected for processing (1)
- airbyte_cdk/test/mock_http/response_builder.py (6 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/test/mock_http/response_builder.py
[error] 1-1: ruff format check failed. 1 file would be reformatted; formatting changes shown in diff for airbyte_cdk/test/mock_http/response_builder.py.
Actionable comments posted: 0
♻️ Duplicate comments (2)
airbyte_cdk/test/mock_http/response_builder.py (2)
193-195: Prefer the built-in list over typing.List in isinstance.
A bare typing.List is accepted by isinstance at runtime, but subscripted generics such as List[int] raise TypeError in instance checks, so relying on typing aliases here is fragile. Could we switch to the concrete list type instead, wdyt?
Apply this diff:
- elif isinstance(self._response, List):
+ elif isinstance(self._response, list):
      raise ValueError("pagination_strategy requires the response to be a dict but was list")
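A quick check of how isinstance treats these types on Python 3.9 or later shows why the built-in is the safer choice. The helper name `instance_check` is only for illustration; note that the bare `typing.List` alias passes, while both subscripted forms raise TypeError.

```python
from typing import Any, List


def instance_check(value: Any, candidate: Any) -> str:
    """Return the isinstance result as text, or 'TypeError' if the check is rejected."""
    try:
        return str(isinstance(value, candidate))
    except TypeError:
        return "TypeError"


results = {
    "list": instance_check([], list),
    "typing.List": instance_check([], List),
    "typing.List[int]": instance_check([], List[int]),
    "list[int]": instance_check([], list[int]),
}
```

Using `list` and `dict` directly avoids the question entirely and also matches current typing guidance, which treats `typing.List` and `typing.Dict` as deprecated aliases.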
265-274: Prefer the built-in list and dict over typing aliases in isinstance.
Lines 269 and 271 use typing.List and typing.Dict with isinstance. The bare aliases are accepted at runtime, but subscripted generics raise TypeError in instance checks, so the concrete types are the more robust choice. Could we switch to the concrete list and dict types instead, wdyt?
Apply this diff:
 def _validate_path_with_response(
     records_path: Union[FieldPath, NestedPath, RootPath],
     response_template: Union[Dict[str, Any], List[Dict[str, Any]]],
 ) -> None:
-    if isinstance(response_template, List) and not isinstance(records_path, RootPath):
+    if isinstance(response_template, list) and not isinstance(records_path, RootPath):
         raise ValueError("templates of type lists require RootPath")
-    elif isinstance(response_template, Dict) and not isinstance(
+    elif isinstance(response_template, dict) and not isinstance(
         records_path, (FieldPath, NestedPath)
     ):
         raise ValueError("templates of type dict either require FieldPath or NestedPath")
🧹 Nitpick comments (1)
airbyte_cdk/test/mock_http/response_builder.py (1)
78-95: Design decision: RootPath as standalone class.
The choice to keep RootPath separate from the Path ABC hierarchy avoids the Liskov Substitution Principle violation flagged in previous reviews. The Union type makes the non-substitutability explicit, which is a reasonable approach.
One small note: line 86 declares value: List[Any], while line 83 uses List[Dict[str, Any]]. Since the actual usage (line 204) passes List[Dict[str, Any]], would it make sense to align these signatures for consistency, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/test/mock_http/response_builder.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte_cdk/test/mock_http/response_builder.py (4)
171-175: LGTM!
The parameter updates properly support both dict and list templates, and the validation call ensures compatibility between the template shape and path type at initialization time.
204-204: LGTM!
The type: ignore comment is well-justified: the validation in __init__ ensures type safety at runtime, even if the type checker can't prove it statically.
235-244: LGTM!
The parameter updates and validation ensure that RootPath is properly supported for record extraction. The type: ignore on line 244 is appropriately justified by the preceding validation.
258-262: LGTM!
The signature updates properly forward the support for list templates and RootPath to the HttpResponseBuilder.
Actionable comments posted: 2
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3357-3377: Confirm private factory usage is safe but suggest public API
- Verified ConcurrentPerPartitionCursor sets self._cursor_factory (line 154) and its .create() produces a fresh ConcurrentCursor, satisfying PaginationTracker's expectations.
- Consider exposing a public helper (e.g. cursor.create_leaf_cursor()) instead of accessing _cursor_factory directly to avoid brittle private-member reliance, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
- airbyte_cdk/sources/streams/concurrent/cursor.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
- airbyte_cdk/sources/message/repository.py (2): MessageRepository (45-60), NoopMessageRepository (63-71)
- airbyte_cdk/sources/types.py (5): Record (21-72), StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117)
- airbyte_cdk/sources/connector_state_manager.py (1): ConnectorStateManager (32-161)
- airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1): output_format (133-137)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2): PaginationResetLimits (1189-1191), PaginationReset (2075-2078)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1): PaginationTracker (9-64)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): Cursor (51-87), FinalStateCursor (90-132), ConcurrentCursor (135-562), copy_without_state (139-155)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
🔇 Additional comments (5)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
139-155: LGTM!
The factory method correctly creates a stateless cursor copy while preserving all configuration parameters. Using NoopMessageRepository and an empty state is appropriate for this use case.
44-44: LGTM!
Broadening the return type to Any is appropriate since record data can contain various types, and the value is subsequently parsed by _connector_state_converter.parse_value().
195-195: LGTM!
The _is_ascending_order flag initialization is correct. Note that this is a cursor-level flag (not per-partition), and once set to False, it remains so for the cursor's lifetime, which appears intentional for conservative non-ascending detection.
267-268: LGTM!
The non-ascending order detection correctly identifies when records within a partition are out of order. The cursor-level flag means that if any partition exhibits non-ascending behavior, all subsequent reduce_slice_range calls will log warnings, a conservative approach that seems intentional.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3191-3192: LGTM, defensive cursor fallback is sensible.
Creating a FinalStateCursor when cursor is None ensures the stream emits at least one state message even if no cursor was provided upstream, which is a solid fallback for full-refresh or edge cases.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
5-5: Run ruff format to unblock CI.
CI is red because Ruff wants to reformat this file. Could you run ruff format airbyte_cdk/sources/streams/concurrent/cursor.py (or ruff format .) and include the result so the linter passes, wdyt?
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3331-3337: Should we also check for client-side incremental sync in this validation, wdyt?
The validation correctly prevents using PaginationResetLimits with a record filter. However, when is_client_side_incremental_sync is True, a ClientSideIncrementalRecordFilterDecorator is also added (lines 3074-3082 in create_record_selector). Should we extend this validation to also reject PaginationResetLimits when client-side incremental sync is enabled?
Apply this diff to broaden the validation:
         if (
-            model.record_selector.record_filter
+            (model.record_selector.record_filter or is_client_side_incremental_sync)
             and model.pagination_reset
             and model.pagination_reset.limits
         ):
             raise ValueError("PaginationResetLimits are not supported while having record filter.")
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3357-3378: Consider exposing a public API for cursor factory access, wdyt?
The method correctly handles different cursor types and creates appropriate PaginationTracker factories. However, line 3368 accesses the private attribute cursor._cursor_factory from ConcurrentPerPartitionCursor. While the comment in the ConcurrentPerPartitionCursor class acknowledges this usage, accessing private attributes can lead to maintenance issues if the internal implementation changes.
Would it make sense to add a public method to ConcurrentPerPartitionCursor to retrieve the cursor factory, reducing coupling to internal implementation details?
Additionally, the two # type: ignore comments on lines 3366 and 3368 suggest type checking issues. Could we refine the types to eliminate these suppressions and improve type safety?
Example approach for the public API:
Add to ConcurrentPerPartitionCursor:
    def get_cursor_factory(self) -> ConcurrentCursorFactory:
        """Return the cursor factory for creating new cursor instances."""
        return self._cursor_factory
Then update line 3368:
    cursor_factory = lambda: cursor.get_cursor_factory().create(
        {}, datetime.timedelta(0)
    )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
- airbyte_cdk/sources/streams/concurrent/cursor.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2): PaginationResetLimits (1189-1191), PaginationReset (2075-2078)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1): PaginationTracker (9-64)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): Cursor (51-87), FinalStateCursor (90-132), ConcurrentCursor (135-565), copy_without_state (139-155)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2): ConcurrentPerPartitionCursor (106-660), create (98-103)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
- airbyte_cdk/sources/message/repository.py (2): MessageRepository (45-60), NoopMessageRepository (63-71)
- airbyte_cdk/sources/types.py (5): Record (21-72), StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117)
- airbyte_cdk/sources/connector_state_manager.py (1): ConnectorStateManager (32-161)
- airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1): output_format (133-137)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/streams/concurrent/cursor.py
[error] 555-555: Ruff format check failed. 1 file would be reformatted. Run 'ruff format' to fix style issues.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
119-120: LGTM on imports and logger setup!
The new imports for pagination reset models and the module-level logger are correctly placed and will support the pagination tracker functionality introduced in this PR.
Also applies to: 362-364, 536-536, 652-652
2062-2065: LGTM on cursor-based parameter passing!
The new parameters (has_stop_condition_cursor, is_client_side_incremental_sync, cursor) correctly wire the cursor context through to the retriever, supporting the pagination reset feature.
3058-3058: LGTM on record selector cursor integration!
The refactor to pass client_side_incremental_sync_cursor instead of a boolean flag is cleaner and allows the ClientSideIncrementalRecordFilterDecorator to operate directly with the cursor instance.
Also applies to: 3074-3082
3191-3193: LGTM on default cursor creation!
Creating a FinalStateCursor when no cursor is provided ensures the retriever always has a valid cursor instance, preventing null pointer issues downstream.
3351-3353: LGTM on pagination tracker factory wiring!
The pagination_tracker_factory parameter correctly integrates the pagination reset functionality into the SimpleRetriever.
Actionable comments posted: 0
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
439-457: Guard _next_page_token with a non-None response to drop the type ignore
This removes the need for the type: ignore and avoids accidental calls with None, wdyt?
-                else:
+                elif response:
                     last_page_token_value = (
                         next_page_token.get("next_page_token") if next_page_token else None
                     )
-                    next_page_token = self._next_page_token(
-                        response=response,  # type:ignore # we are breaking from the loop on the try/else if there are no response so this should be fine
+                    next_page_token = self._next_page_token(
+                        response=response,
                         last_page_size=last_page_size,
                         last_record=last_record,
                         last_page_token_value=last_page_token_value,
                     )
                     if not next_page_token:
                         break
+                else:
+                    break
391-401: Don't mutate the original stream_slice inside the chunking loop
Overwriting stream_slice per chunk causes resets to operate on the last chunk (and leak query_properties into subsequent attempts). Can we keep the original slice and use a local chunked_slice for requests, wdyt?
-                for properties in self.additional_query_properties.get_request_property_chunks(
+                for properties in self.additional_query_properties.get_request_property_chunks(
                     stream_slice=stream_slice
                 ):
-                    stream_slice = StreamSlice(
-                        partition=stream_slice.partition or {},
-                        cursor_slice=stream_slice.cursor_slice or {},
-                        extra_fields={"query_properties": properties},
-                    )
-                    response = self._fetch_next_page(
-                        stream_state, stream_slice, next_page_token
-                    )
+                    chunked_slice = StreamSlice(
+                        partition=stream_slice.partition or {},
+                        cursor_slice=stream_slice.cursor_slice or {},
+                        extra_fields={"query_properties": properties},
+                    )
+                    response = self._fetch_next_page(
+                        stream_state, chunked_slice, next_page_token
+                    )
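The concern about rebinding stream_slice can be shown in isolation. This is a minimal sketch under stated assumptions: SliceSketch is a hypothetical stand-in for StreamSlice, and build_chunked_slices is an invented helper, not retriever code.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List


@dataclass(frozen=True)
class SliceSketch:
    """Hypothetical stand-in for StreamSlice."""

    partition: Dict[str, Any]
    cursor_slice: Dict[str, Any]
    extra_fields: Dict[str, Any] = field(default_factory=dict)


def build_chunked_slices(
    original: SliceSketch, chunks: Iterable[List[str]]
) -> List[SliceSketch]:
    """Derive one slice per property chunk without rebinding `original`."""
    result: List[SliceSketch] = []
    for properties in chunks:
        # A local name keeps `original` free of query_properties, so a later
        # reset still starts from the slice the caller passed in.
        chunked_slice = SliceSketch(
            partition=original.partition,
            cursor_slice=original.cursor_slice,
            extra_fields={"query_properties": properties},
        )
        result.append(chunked_slice)
    return result
```

After the loop, original still has empty extra_fields, whereas rebinding the loop's input name would leave it pointing at the last chunk.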
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3334-3340: Extend limits validation to client-side incremental filtering too
If client-side incremental is enabled, a RecordFilter decorator is applied implicitly; we should also reject PaginationResetLimits in that case to avoid filtered counts. Add this guard, wdyt?
-        if (
-            model.record_selector.record_filter
-            and model.pagination_reset
-            and model.pagination_reset.limits
-        ):
+        if (
+            (model.record_selector.record_filter or self._is_client_side_filtering_enabled(model))
+            and model.pagination_reset
+            and model.pagination_reset.limits
+        ):
             raise ValueError("PaginationResetLimits are not supported while having record filter.")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1 hunks)
- airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4 hunks)
- unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (1 hunks)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/test_concurrent_declarative_source.py
🧰 Additional context used
🧬 Code graph analysis (4)
unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (5)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): FailureType (546-549)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4): PaginationTracker (9-64), observe (36-39), has_reached_limit (41-42), reduce_slice_range_if_possible (47-64)
- airbyte_cdk/sources/types.py (6): Record (21-72), StreamSlice (75-169), data (35-36), associated_slice (39-40), partition (99-104), cursor_slice (107-112)
- airbyte_cdk/sources/streams/concurrent/cursor.py (5): ConcurrentCursor (135-565), observe (57-61), observe (112-113), observe (250-270), reduce_slice_range (541-565)
- airbyte_cdk/utils/traced_exception.py (1): AirbyteTracedException (25-145)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3): PaginationResetLimits (1189-1191), PaginationReset (2075-2078), Action1 (1184-1186)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1): PaginationTracker (9-64)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): Cursor (51-87), FinalStateCursor (90-132), ConcurrentCursor (135-565), copy_without_state (139-155)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2): ConcurrentPerPartitionCursor (106-660), create (98-103)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): FailureType (546-549)
- airbyte_cdk/sources/types.py (2): Record (21-72), StreamSlice (75-169)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): observe (57-61), observe (112-113), observe (250-270), reduce_slice_range (541-565)
- airbyte_cdk/utils/traced_exception.py (1): AirbyteTracedException (25-145)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (6)
- airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4): PaginationTracker (9-64), observe (36-39), has_reached_limit (41-42), reduce_slice_range_if_possible (47-64)
- airbyte_cdk/sources/streams/concurrent/cursor.py (4): Cursor (51-87), observe (57-61), observe (112-113), observe (250-270)
- airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1): PaginationResetRequiredException (1-2)
- airbyte_cdk/sources/types.py (8): Record (21-72), StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117), data (35-36), associated_slice (39-40), get (146-147)
- airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py (4): next_page_token (136-152), next_page_token (251-264), get_initial_token (127-134), get_initial_token (247-249)
- airbyte_cdk/sources/declarative/requesters/paginators/paginator.py (2): next_page_token (33-49), get_initial_token (27-30)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 5-641: I001 Import block is unsorted or un-formatted. Found 1 fixable with the --fix option.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (3)
unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (1)
23-87: Good coverage of tracker behaviors
Tests exercise observe, limits, and slice reduction (with/without cursor) well. No issues spotted.
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
44-64: Tracker logic looks consistent
Attempt counting and record-count reset per reduce are coherent with the intended flow and tests.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
5-11: Fix import sorting (linters failing I001)
Imports are unsorted/unformatted. Could you run the formatter (e.g., ruff/isort) to resolve I001, wdyt?
Re-run linters locally:
- ruff check . --fix
- or isort . --profile black
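As a rough illustration of the tracker behavior noted above for pagination_tracker.py (attempt counting and a record count that resets on each reduce), here is a simplified sketch. It is not the CDK's PaginationTracker: the class TrackerSketch, the on_reset method, the slice_changed flag, and the attempt threshold of 2 are all assumptions made for the example.

```python
from typing import Optional


class TrackerSketch:
    """Hypothetical, simplified tracker; names and threshold are assumptions."""

    _MAX_ATTEMPTS_WITHOUT_PROGRESS = 2  # assumed value, for illustration only

    def __init__(self, max_records: Optional[int] = None) -> None:
        self._max_records = max_records
        self._record_count = 0
        self._attempts_without_progress = 0

    def observe(self) -> None:
        # Count every record seen since the last reset.
        self._record_count += 1

    def has_reached_limit(self) -> bool:
        return self._max_records is not None and self._record_count >= self._max_records

    def on_reset(self, slice_changed: bool) -> None:
        # The record count always restarts after a pagination reset.
        self._record_count = 0
        if slice_changed:
            # Progress was made, so the attempt counter starts over.
            self._attempts_without_progress = 0
            return
        self._attempts_without_progress += 1
        if self._attempts_without_progress >= self._MAX_ATTEMPTS_WITHOUT_PROGRESS:
            raise RuntimeError("pagination reset repeated without reducing the slice")
```

Failing after repeated resets on an unchanged slice is what prevents an endless reset loop in this sketch.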
## What
Blocked by airbytehq/airbyte-python-cdk#781
## How
Supporting pagination reset using cursor
## Can this PR be safely reverted and rolled back?
- [x] YES 💚
- [ ] NO ❌
---------
Co-authored-by: Octavia Squidington III <[email protected]>
What
Detailed use cases defined here
source-freshdesk PR: airbytehq/airbyte#67109
How
Update SimpleRetriever to have a pagination tracker and handle PaginationResetRequiredException. I also refactored the SimpleRetriever to group all the logic for query properties in the same code block, which I find better in terms of readability, but I can revert that if needed.
Summary by CodeRabbit
New Features
Tests
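The flow described under How (paginate, and restart when a reset is required) can be sketched as below. This is a simplified illustration, not the SimpleRetriever code: ResetRequired stands in for PaginationResetRequiredException, and read_pages, fetch, and max_resets are hypothetical names. The sketch restarts from the first page and may therefore re-read records, whereas the PR describes reducing the slice range through the cursor where possible.

```python
from typing import Any, Callable, Dict, Iterator, Optional


class ResetRequired(Exception):
    """Hypothetical stand-in for PaginationResetRequiredException."""


def read_pages(
    fetch: Callable[[Optional[int]], Dict[str, Any]],
    max_resets: int = 3,
) -> Iterator[Any]:
    """Yield records page by page, restarting pagination when a reset is signalled."""
    token: Optional[int] = None
    resets = 0
    while True:
        try:
            page = fetch(token)
        except ResetRequired:
            resets += 1
            if resets > max_resets:
                # Give up instead of looping forever.
                raise
            token = None  # restart from the initial page token
            continue
        yield from page["records"]
        token = page.get("next")
        if token is None:
            return
```

Bounding the number of resets plays the same role here as the attempt tracking mentioned in the walkthrough.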