Skip to content

Conversation

maxi297
Copy link
Contributor

@maxi297 maxi297 commented Oct 6, 2025

What

Detailed use cases defined here

source-freshdesk PR: airbytehq/airbyte#67109

How

Update SimpleRetriever to have a pagination tracker and handle PaginationResetRequiredException. I also refactored the SimpleRetriever to group all the logic for query properties in the same code block which I find else in terms of readability but I can revert that if needed

Summary by CodeRabbit

  • New Features

    • Added a pagination-reset flow: new response action to trigger pagination resets, per-retriever pagination_reset configuration with optional record-count limits, a PaginationTracker to manage counts/retries and slice reduction, HTTP client support for triggering resets, and cursor improvements for slice reduction and out-of-order detection.
    • Response builder now supports root-level array templates.
  • Tests

    • Added tests covering pagination tracker, retriever reset behavior, slice reduction, and reset-triggering responses.

Copy link

github-actions bot commented Oct 6, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/pagination_reset#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/pagination_reset

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

@github-actions github-actions bot added the enhancement New feature or request label Oct 6, 2025
@maxi297 maxi297 requested review from brianjlai and tolik0 October 6, 2025 13:14
@maxi297
Copy link
Contributor Author

maxi297 commented Oct 6, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).

Copy link

github-actions bot commented Oct 6, 2025

PyTest Results (Fast)

3 796 tests  +17   3 784 ✅ +17   6m 32s ⏱️ +4s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit 92b9dff. ± Comparison against base commit c67570b.

♻️ This comment has been updated with latest results.

Copy link
Contributor

coderabbitai bot commented Oct 6, 2025

📝 Walkthrough

Walkthrough

Adds pagination-reset support: new PaginationReset and PaginationResetLimits schemas/models, RESET_PAGINATION response action and exception, PaginationTracker with slice-reduction and attempt tracking, wiring into SimpleRetriever and ModelToComponentFactory, HTTP client raising on reset action, RootPath support for list responses, and unit tests for reset/limit flows.

Changes

Cohort / File(s) Summary
Schema
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Adds PaginationReset and PaginationResetLimits; extends HttpResponseFilter enum with RESET_PAGINATION; adds pagination_reset on SimpleRetriever.
Models / Schema binding
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Adds PaginationReset, PaginationResetLimits, supporting enum/class (Action1 / new enum members), and RESET_PAGINATION; adds pagination_reset field to SimpleRetriever.
Factory / parser
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Cursor-centric signature changes; adds _create_pagination_tracker_factory; wires pagination_tracker_factory into retriever construction; validates incompatible configurations; adds module LOGGER.
Retriever logic
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Adds pagination_tracker_factory field and _get_initial_next_page_token; integrates PaginationTracker per read, handles PaginationResetRequiredException, slice-reduction, reset/retry flows and logging.
Pagination tracker
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py
New PaginationTracker class: observes records, enforces optional max records, delegates slice reduction to cursor, tracks same-slice attempts, raises traced exception on repeated non-reduction.
Concurrent cursor
airbyte_cdk/sources/streams/concurrent/cursor.py
Adds _is_ascending_order tracking, ordering detection, reduce_slice_range to shrink slices using most-recent cursor values, copy_without_state helper; relaxes extract_value return type.
HTTP layer & exception
airbyte_cdk/sources/streams/http/error_handlers/response_models.py
airbyte_cdk/sources/streams/http/http_client.py
airbyte_cdk/sources/streams/http/pagination_reset_exception.py
Adds RESET_PAGINATION response action; http_client raises PaginationResetRequiredException when encountered; new exception class added.
Request error handling
airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py
interpret_response treats RESET_PAGINATION as a terminal action (early return) alongside SUCCESS/RETRY/IGNORE.
Test utilities
airbyte_cdk/test/mock_http/response_builder.py
Adds RootPath to support root-level array responses; extends response/record/cursor path types to accept RootPath; runtime validation of path vs template shape; updates pagination/record builder signatures.
Minor / comment-only
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Adds inline comment only; no behavioral change.
Tests
unit_tests/sources/declarative/retrievers/test_pagination_tracker.py
unit_tests/sources/declarative/retrievers/test_simple_retriever.py
unit_tests/sources/declarative/test_concurrent_declarative_source.py
Adds unit tests for PaginationTracker, SimpleRetriever reset/limit flows, and an end-to-end RESET_PAGINATION scenario in concurrent declarative source tests (note: duplicate test function appears).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant SR as SimpleRetriever
  participant PT as PaginationTracker
  participant P as Paginator
  participant HC as HttpClient
  participant EH as ErrorHandler
  participant CC as ConcurrentCursor

  Note over SR,PT: create tracker per read-run
  SR->>PT: tracker = pagination_tracker_factory()
  SR->>P: compute initial next page token

  loop pages for a slice
    SR->>HC: request(page token)
    HC->>EH: evaluate response
    alt action == RESET_PAGINATION
      EH-->>HC: RESET_PAGINATION
      HC-->>SR: raise PaginationResetRequiredException
      SR->>PT: reset()
      SR-->>SR: reinitialize paginator and retry same slice
    else SUCCESS
      HC-->>SR: records + next token
      SR->>PT: observe(record)*
      alt PT.has_reached_limit()
        SR->>CC: reduce_slice_range(slice)
        CC-->>SR: reduced slice
        SR->>PT: reset()
        SR-->>SR: restart with reduced slice
      else
        SR-->>SR: continue pagination
      end
    end
  end
Loading
sequenceDiagram
  autonumber
  participant MF as ModelToComponentFactory
  participant CUR as Cursor
  participant SR as SimpleRetriever
  participant PT as PaginationTracker

  MF->>CUR: resolve/create cursor
  MF->>MF: _create_pagination_tracker_factory(model.pagination_reset, cursor)
  MF-->>SR: instantiate SimpleRetriever(..., cursor, pagination_tracker_factory, ...)
  SR->>PT: create tracker per run via factory
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • tolik0
  • brianjlai

Want a brief checklist of real-connector scenarios to validate PaginationReset end-to-end, wdyt?

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 10.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title “feat: pagination reset” concisely summarises the primary enhancement of adding pagination reset behavior, aligning with the introduction of the PaginationTracker and exception handling. It uses a clear conventional prefix and highlights the key feature without extraneous detail. This ensures that anyone scanning the history understands the focus of the changeset.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch maxi297/pagination_reset

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

374-465: Restore typing to satisfy mypy

The CI failures stem from the new _get_initial_next_page_token helper lacking type hints and being called from typed contexts. Could we add a precise signature (e.g. returning Optional[Mapping[str, Any]]), adjust the local next_page_token variable annotations, and ensure _next_page_token continues to receive a non-optional response (maybe by guarding the call or annotating response)? This should clear the no-untyped-call and arg-type errors flagged by mypy, wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c67570b and a2bc95d.

📒 Files selected for processing (13)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
  • airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5 hunks)
  • airbyte_cdk/sources/streams/concurrent/cursor.py (4 hunks)
  • airbyte_cdk/sources/streams/http/error_handlers/response_models.py (1 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (2 hunks)
  • airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1 hunks)
  • unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (1 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • FailureType (546-549)
airbyte_cdk/sources/types.py (2)
  • Record (21-72)
  • StreamSlice (75-169)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • observe (57-61)
  • observe (112-113)
  • observe (232-252)
  • reduce_slice_range (523-544)
airbyte_cdk/utils/traced_exception.py (1)
  • AirbyteTracedException (25-145)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • observe (510-543)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (5)
airbyte_cdk/sources/declarative/extractors/http_selector.py (2)
  • HttpSelector (13-37)
  • select_records (20-37)
airbyte_cdk/sources/declarative/requesters/paginators/paginator.py (3)
  • Paginator (18-65)
  • get_initial_token (27-30)
  • next_page_token (33-49)
airbyte_cdk/sources/declarative/requesters/requester.py (7)
  • HttpMethod (18-26)
  • Requester (29-156)
  • send_request (138-156)
  • get_request_params (80-91)
  • get_request_headers (94-103)
  • get_request_body_data (106-121)
  • get_request_body_json (124-135)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
  • PaginationTracker (9-61)
  • has_reached_limit (41-42)
  • reset (44-45)
  • reduce_slice_range_if_possible (47-61)
airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1)
  • PaginationResetRequiredException (1-2)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
airbyte_cdk/sources/types.py (5)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • extra_fields (115-117)
airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1)
  • output_format (133-137)
airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py (4)
  • output_format (46-46)
  • output_format (133-134)
  • output_format (170-182)
  • output_format (214-215)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5)
  • PaginationTracker (9-61)
  • observe (36-39)
  • has_reached_limit (41-42)
  • reset (44-45)
  • reduce_slice_range_if_possible (47-61)
airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1)
  • PaginationResetRequiredException (1-2)
airbyte_cdk/sources/types.py (7)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • extra_fields (115-117)
  • data (35-36)
  • associated_slice (39-40)
airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py (2)
  • get_initial_token (127-134)
  • get_initial_token (247-249)
unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (5)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • FailureType (546-549)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5)
  • PaginationTracker (9-61)
  • observe (36-39)
  • has_reached_limit (41-42)
  • reset (44-45)
  • reduce_slice_range_if_possible (47-61)
airbyte_cdk/sources/types.py (6)
  • Record (21-72)
  • StreamSlice (75-169)
  • data (35-36)
  • associated_slice (39-40)
  • partition (99-104)
  • cursor_slice (107-112)
airbyte_cdk/sources/streams/concurrent/cursor.py (5)
  • ConcurrentCursor (135-544)
  • observe (57-61)
  • observe (112-113)
  • observe (232-252)
  • reduce_slice_range (523-544)
airbyte_cdk/utils/traced_exception.py (1)
  • AirbyteTracedException (25-145)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (4)
unit_tests/sources/test_source.py (2)
  • catalog (70-93)
  • source (65-66)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • read (365-380)
airbyte_cdk/test/mock_http/mocker.py (2)
  • HttpMocker (25-185)
  • get (94-95)
airbyte_cdk/test/mock_http/request.py (1)
  • HttpRequest (14-103)
airbyte_cdk/sources/streams/http/http_client.py (2)
airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1)
  • PaginationResetRequiredException (1-2)
airbyte_cdk/sources/streams/http/error_handlers/response_models.py (1)
  • ResponseAction (14-20)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
  • PaginationResetLimits (1189-1191)
  • PaginationReset (2075-2078)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
  • PaginationTracker (9-61)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • Cursor (51-87)
  • FinalStateCursor (90-132)
  • ConcurrentCursor (135-544)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
  • ConcurrentPerPartitionCursor (106-660)
  • create (98-103)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)
  • create (41-71)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

[error] 376-376: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk


[error] 438-438: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk


[error] 450-450: Argument "response" to "_next_page_token" of "SimpleRetriever" has incompatible type "Response | None"; expected "Response" [arg-type]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk


[error] 461-461: Function is missing a return type annotation [no-untyped-def]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 3352-3352: Argument 1 to "_create_pagination_tracker_factory" of "ModelToComponentFactory" has incompatible type "PaginationReset | None"; expected "PaginationReset" [arg-type]. Command: poetry run mypy --config-file mypy.ini airbyte_cdk

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

154-154: Thanks for capturing the intent here

Really appreciate the context on why _cursor_factory stays accessible—it’ll save future spelunking when we revisit pagination reset wiring. Nicely done!

Copy link

github-actions bot commented Oct 6, 2025

PyTest Results (Full)

3 799 tests  +17   3 787 ✅ +17   11m 3s ⏱️ +9s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit 92b9dff. ± Comparison against base commit c67570b.

♻️ This comment has been updated with latest results.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

387-425: Preserve the original stream_slice to avoid incorrect reset logic.

The property chunking branch overwrites stream_slice at lines 394-398, which means when a reset occurs (line 443), reduce_slice_range_if_possible operates on the last chunk rather than the original slice. Could we preserve the original by introducing a chunked_slice variable and passing that to _fetch_next_page, wdyt?

Based on past review comments.

Apply this diff:

                 if (
                     self.additional_query_properties
                     and self.additional_query_properties.property_chunking
                 ):
+                    original_slice = stream_slice
                     for properties in self.additional_query_properties.get_request_property_chunks(
                         stream_slice=stream_slice
                     ):
-                        stream_slice = StreamSlice(
+                        chunked_slice = StreamSlice(
                             partition=stream_slice.partition or {},
                             cursor_slice=stream_slice.cursor_slice or {},
                             extra_fields={"query_properties": properties},
                         )
                         response = self._fetch_next_page(
-                            stream_state, stream_slice, next_page_token
+                            stream_state, chunked_slice, next_page_token
                         )
 
                         for current_record in records_generator_fn(response):
                             merge_key = (
                                 self.additional_query_properties.property_chunking.get_merge_key(
                                     current_record
                                 )
                             )
                             if merge_key:
                                 _deep_merge(merged_records[merge_key], current_record)
                             else:
                                 # We should still emit records even if the record did not have a merge key
                                 pagination_tracker.observe(current_record)
                                 last_page_size += 1
                                 last_record = current_record
                                 yield current_record
 
                     for merged_record in merged_records.values():
                         record = Record(
-                            data=merged_record, stream_name=self.name, associated_slice=stream_slice
+                            data=merged_record, stream_name=self.name, associated_slice=original_slice
                         )

Then use original_slice (instead of the mutated stream_slice) at line 443:

-                stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice)
+                stream_slice = pagination_tracker.reduce_slice_range_if_possible(original_slice)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a2bc95d and f90b110.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5)
  • PaginationTracker (9-61)
  • observe (36-39)
  • has_reached_limit (41-42)
  • reset (44-45)
  • reduce_slice_range_if_possible (47-61)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • Cursor (51-87)
  • observe (57-61)
  • observe (112-113)
  • observe (232-252)
airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1)
  • PaginationResetRequiredException (1-2)
airbyte_cdk/sources/types.py (7)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • extra_fields (115-117)
  • data (35-36)
  • associated_slice (39-40)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

[error] 379-379: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]


[error] 441-441: Call to untyped function "_get_initial_next_page_token" in typed context [no-untyped-call]


[error] 453-453: Argument "response" to "_next_page_token" of "SimpleRetriever" has incompatible type "Response | None"; expected "Response" [arg-type]


[error] 464-464: Function is missing a return type annotation [no-untyped-def]

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)

6-6: LGTM!

The new imports support the pagination reset functionality and are all used appropriately in the refactored code.

Also applies to: 42-42, 46-46, 48-50, 55-55


101-103: LGTM!

The factory pattern enables per-call tracker creation and supports dependency injection for testing, which is a solid approach.


106-108: Verify that nullifying concurrent cursors is the intended behavior.

This logic sets self.cursor = None for Cursor instances to prevent SimpleRetriever from managing ConcurrentCursor state. While this appears intentional for backward compatibility with CustomRetriever implementations, ensure this doesn't silently break cursor functionality for consumers expecting the cursor to be present, wdyt?


433-447: Clarify the reset vs. limit-reached flow.

Both PaginationResetRequiredException and has_reached_limit() trigger the same reset logic. Is the intent that they're treated identically, or should limit-reached have slightly different handling (e.g., not reducing the slice)? The current implementation seems correct if both cases require a retry with a narrowed slice, wdyt?

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

377-425: Preserve the original stream_slice before property chunking.

Inside the property chunking loop (line 394), stream_slice is reassigned for each chunk. After the loop completes, stream_slice holds the last chunk's slice instead of the original. When a reset occurs (line 443), reduce_slice_range_if_possible operates on this mutated slice rather than the original one, and the log message (line 444-446) also references the wrong slice.

Could we introduce a local variable like original_stream_slice to preserve the slice passed into _read_pages, then use that for reset logic while keeping stream_slice (or a new current_chunk_slice) for the chunking loop, wdyt?

Based on past review comments.


464-467: Add return type annotation to _get_initial_next_page_token.

This helper is missing a return type annotation, which mypy flags as an error in typed contexts (lines 379, 441).

Could we annotate it as -> Optional[Mapping[str, Any]] to match the return value structure and resolve the type-checking errors, wdyt?

Apply this diff to add the missing return type:

-    def _get_initial_next_page_token(self):
+    def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]:
         initial_token = self._paginator.get_initial_token()
         next_page_token = {"next_page_token": initial_token} if initial_token is not None else None
         return next_page_token

Based on past review comments.

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)

106-108: Clarify the cursor validation logic.

The comment explains this is for CustomRetriever compatibility, but setting self.cursor = None if isinstance(self.cursor, Cursor) else self.cursor reads as "nullify Cursor instances but keep everything else." Is this the intended behavior? It seems counterintuitive – typically we'd want to keep Cursor instances and reject incompatible types, wdyt?

Could we add more context about why Cursor instances need to be nullified for CustomRetriever, or would it be cleaner to handle this in the CustomRetriever class itself?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3331-3336: Fix typo in error message.

The error message on line 3336 has a grammatical error: "are not support" should be "are not supported".

Apply this diff to correct the typo:

-            raise ValueError("PaginationResetLimits are not support while having record filter.")
+            raise ValueError("PaginationResetLimits are not supported while having a record filter.")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f90b110 and 3fb450c.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (5)
  • PaginationTracker (9-61)
  • observe (36-39)
  • has_reached_limit (41-42)
  • reset (44-45)
  • reduce_slice_range_if_possible (47-61)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • Cursor (51-87)
  • observe (57-61)
  • observe (112-113)
  • observe (232-252)
airbyte_cdk/sources/types.py (8)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • extra_fields (115-117)
  • data (35-36)
  • associated_slice (39-40)
  • get (146-147)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
  • PaginationResetLimits (1189-1191)
  • PaginationReset (2075-2078)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
  • PaginationTracker (9-61)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • Cursor (51-87)
  • FinalStateCursor (90-132)
  • ConcurrentCursor (135-544)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
  • ConcurrentPerPartitionCursor (106-660)
  • create (98-103)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3357-3377: LGTM on the pagination tracker factory logic!

The method correctly handles the Optional[PaginationResetModel] parameter (addressing the past review concern) and safely extracts the cursor for pagination tracking based on its type. The fallback to PaginationTracker() when no model is provided is sensible.

Nice work ensuring the cursor selection logic handles ConcurrentCursor, ConcurrentPerPartitionCursor, and the FinalStateCursor fallback with appropriate logging!

Based on past review comments.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)

48-64: Slice-reduction attempt logic looks correct and addresses prior feedback

The attempt counter only increments on unchanged slices and resets on reduction; the allowed-attempts policy (1 with cursor, 2 without) is clear. This resolves the earlier “reset attempt counter per slice” issue noted previously—nice cleanup.

🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)

27-33: Replace no-op triple-quoted string with comments or move into the class docstring

The standalone triple-quoted string inside __init__ is a no-op. Would you switch it to comments or incorporate it into the class docstring for clarity, wdyt?

-        """
-        Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will
-        always process the same slice.
-
-        Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once.
-        """
+        # Given we have a cursor, we do not allow the same slice to be processed twice because we assume
+        # we will always process the same slice.
+        #
+        # Given no cursor, we assume that the pagination reset is for retrying purposes and we allow
+        # one retry for the same slice.

36-40: Confirm tracker instance scope vs concurrency

_record_count increments aren’t synchronized. If a single PaginationTracker instance is shared across threads/partitions, increments could race. Are we guaranteed a per-partition/serialized use of the tracker (similar to the cursor’s assumptions)? If not, shall we add a light lock around observe to guard _record_count, wdyt?


41-43: Clarify non-positive limit behavior

If max_number_of_records is 0 or negative, has_reached_limit() immediately returns True at start. Should we treat non-positive values as “no limit” (None) or raise in __init__? I can patch it to normalize to None, wdyt?

-        self._limit = max_number_of_records
+        self._limit = max_number_of_records
+        if self._limit is not None and self._limit <= 0:
+            # Treat non-positive limits as unlimited
+            self._limit = None

57-60: Add a user-friendly exception message for UI consumers

Would you add a message= for AirbyteTracedException so users see a clearer error in the UI (while keeping internal_message intact), wdyt?

-                raise AirbyteTracedException(
-                    internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
-                    failure_type=FailureType.system_error,
-                )
+                raise AirbyteTracedException(
+                    internal_message=(
+                        f"There were {self._number_of_attempt_with_same_slice} attempts with the same "
+                        f"slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}"
+                    ),
+                    message="Pagination reset failed: retried the same slice too many times.",
+                    failure_type=FailureType.system_error,
+                )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3fb450c and 2a50950.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1 hunks)
  • airbyte_cdk/test/mock_http/response_builder.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • FailureType (546-549)
airbyte_cdk/sources/types.py (2)
  • Record (21-72)
  • StreamSlice (75-169)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • observe (57-61)
  • observe (112-113)
  • observe (232-252)
  • reduce_slice_range (523-544)
airbyte_cdk/utils/traced_exception.py (1)
  • AirbyteTracedException (25-145)
airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py (1)
airbyte_cdk/sources/streams/http/error_handlers/response_models.py (1)
  • ResponseAction (14-20)
🪛 GitHub Actions: Linters
airbyte_cdk/test/mock_http/response_builder.py

[error] 83-83: mypy: Argument 1 of 'write' is incompatible with supertype 'Path' (override).


[error] 86-86: mypy: Argument 1 of 'update' is incompatible with supertype 'Path' (override).


[error] 90-90: mypy: Argument 1 of 'extract' is incompatible with supertype 'Path' (override).


[error] 199-199: mypy: Argument 1 to 'update' of 'RootPath' has incompatible type 'dict[str, Any]'; expected 'list[dict[str, Any]]'.


[error] 238-238: mypy: Argument 1 to 'extract' of 'RootPath' has incompatible type 'dict[str, Any]'; expected 'list[dict[str, Any]]'.

airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py

[error] 66-73: Ruff formatting issue detected. The file requires formatting changes. Run 'poetry run ruff format' to apply formatting.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py (1)

69-70: Verify that RESET_PAGINATION should terminate the handler chain.

By including RESET_PAGINATION in this check, any error handler that resolves to RESET_PAGINATION will cause an immediate return, preventing subsequent handlers from being consulted. This seems like the correct behavior for a pagination reset signal, similar to how SUCCESS, RETRY, and IGNORE are handled. Can you confirm this is the intended flow, wdyt?

airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)

3-6: Keep Record/StreamSlice imports from declarative.types
The airbyte_cdk.sources.declarative.types module re-exports both types, as confirmed by existing code and tests.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2029415 and ab5d315.

📒 Files selected for processing (1)
  • airbyte_cdk/test/mock_http/response_builder.py (6 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/test/mock_http/response_builder.py

[error] 1-1: ruff format check failed. 1 file would be reformatted; formatting changes shown in diff for airbyte_cdk/test/mock_http/response_builder.py.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/test/mock_http/response_builder.py (2)

193-195: Fix isinstance with typing.List - will raise TypeError at runtime.

typing.List cannot be used with isinstance - it will raise TypeError: isinstance() arg 2 cannot be a parameterized generic at runtime. Could we switch to the concrete list type instead, wdyt?

Apply this diff:

-        elif isinstance(self._response, List):
+        elif isinstance(self._response, list):
             raise ValueError("pagination_strategy requires the response to be a dict but was list")

265-274: Fix isinstance with typing types - will raise TypeError at runtime.

Lines 269 and 271 use typing.List and typing.Dict with isinstance, which will raise TypeError: isinstance() arg 2 cannot be a parameterized generic at runtime. Could we switch to the concrete list and dict types instead, wdyt?

Apply this diff:

 def _validate_path_with_response(
     records_path: Union[FieldPath, NestedPath, RootPath],
     response_template: Union[Dict[str, Any], List[Dict[str, Any]]],
 ) -> None:
-    if isinstance(response_template, List) and not isinstance(records_path, RootPath):
+    if isinstance(response_template, list) and not isinstance(records_path, RootPath):
         raise ValueError("templates of type lists require RootPath")
-    elif isinstance(response_template, Dict) and not isinstance(
+    elif isinstance(response_template, dict) and not isinstance(
         records_path, (FieldPath, NestedPath)
     ):
         raise ValueError("templates of type dict either require FieldPath or NestedPath")
🧹 Nitpick comments (1)
airbyte_cdk/test/mock_http/response_builder.py (1)

78-95: Design decision: RootPath as standalone class.

The choice to keep RootPath separate from the Path ABC hierarchy avoids the Liskov Substitution Principle violation flagged in previous reviews. The Union type makes the non-substitutability explicit, which is a reasonable approach.

One small note: Line 86 declares value: List[Any], while line 83 uses List[Dict[str, Any]]. Since the actual usage (line 204) passes List[Dict[str, Any]], would it make sense to align these signatures for consistency, wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ab5d315 and cc4da11.

📒 Files selected for processing (1)
  • airbyte_cdk/test/mock_http/response_builder.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte_cdk/test/mock_http/response_builder.py (4)

171-175: LGTM!

The parameter updates properly support both dict and list templates, and the validation call ensures compatibility between the template shape and path type at initialization time.


204-204: LGTM!

The type: ignore comment is well-justified - the validation in __init__ ensures type safety at runtime, even if the type checker can't prove it statically.


235-244: LGTM!

The parameter updates and validation ensure that RootPath is properly supported for record extraction. The type: ignore on line 244 is appropriately justified by the preceding validation.


258-262: LGTM!

The signature updates properly forward the support for list templates and RootPath to the HttpResponseBuilder.

@maxi297 maxi297 requested a review from tolik0 October 8, 2025 21:23
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3357-3377: Confirm private factory usage is safe but suggest public API

  • Verified ConcurrentPerPartitionCursor sets self._cursor_factory (line 154) and its .create() produces a fresh ConcurrentCursor, satisfying PaginationTracker’s expectations.
  • Consider exposing a public helper (e.g. cursor.create_leaf_cursor()) instead of accessing _cursor_factory directly to avoid brittle private‐member reliance—wdyt?
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc4da11 and f247e93.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
  • airbyte_cdk/sources/streams/concurrent/cursor.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
airbyte_cdk/sources/message/repository.py (2)
  • MessageRepository (45-60)
  • NoopMessageRepository (63-71)
airbyte_cdk/sources/types.py (5)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • extra_fields (115-117)
airbyte_cdk/sources/connector_state_manager.py (1)
  • ConnectorStateManager (32-161)
airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1)
  • output_format (133-137)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
  • PaginationResetLimits (1189-1191)
  • PaginationReset (2075-2078)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
  • PaginationTracker (9-64)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • Cursor (51-87)
  • FinalStateCursor (90-132)
  • ConcurrentCursor (135-562)
  • copy_without_state (139-155)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
🔇 Additional comments (5)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)

139-155: LGTM!

The factory method correctly creates a stateless cursor copy while preserving all configuration parameters. Using NoopMessageRepository and an empty state is appropriate for this use case.


44-44: LGTM!

Broadening the return type to Any is appropriate since record data can contain various types, and the value is subsequently parsed by _connector_state_converter.parse_value().


195-195: LGTM!

The _is_ascending_order flag initialization is correct. Note that this is a cursor-level flag (not per-partition), and once set to False, it remains so for the cursor's lifetime—which appears intentional for conservative non-ascending detection.


267-268: LGTM!

The non-ascending order detection correctly identifies when records within a partition are out of order. The cursor-level flag means that if any partition exhibits non-ascending behavior, all subsequent reduce_slice_range calls will log warnings—a conservative approach that seems intentional.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3191-3192: LGTM—defensive cursor fallback is sensible.

Creating a FinalStateCursor when cursor is None ensures the stream emits at least one state message even if no cursor was provided upstream, which is a solid fallback for full-refresh or edge cases.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)

5-5: Run ruff format to unblock CI.

CI is red because Ruff wants to reformat this file. Could you run ruff format airbyte_cdk/sources/streams/concurrent/cursor.py (or ruff format .) and include the result so the linter passes, wdyt?

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3331-3337: Should we also check for client-side incremental sync in this validation, wdyt?

The validation correctly prevents using PaginationResetLimits with a record filter. However, when is_client_side_incremental_sync is True, a ClientSideIncrementalRecordFilterDecorator is also added (lines 3074-3082 in create_record_selector). Should we extend this validation to also reject PaginationResetLimits when client-side incremental sync is enabled?

Apply this diff to broaden the validation:

 if (
-    model.record_selector.record_filter
+    (model.record_selector.record_filter or is_client_side_incremental_sync)
     and model.pagination_reset
     and model.pagination_reset.limits
 ):
     raise ValueError("PaginationResetLimits are not supported while having record filter.")
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3357-3378: Consider exposing a public API for cursor factory access, wdyt?

The method correctly handles different cursor types and creates appropriate PaginationTracker factories. However, line 3368 accesses the private attribute cursor._cursor_factory from ConcurrentPerPartitionCursor. While the comment in the ConcurrentPerPartitionCursor class acknowledges this usage, accessing private attributes can lead to maintenance issues if the internal implementation changes.

Would it make sense to add a public method to ConcurrentPerPartitionCursor to retrieve the cursor factory, reducing coupling to internal implementation details?

Additionally, the two # type: ignore comments on lines 3366 and 3368 suggest type checking issues. Could we refine the types to eliminate these suppressions and improve type safety?

Example approach for the public API:

Add to ConcurrentPerPartitionCursor:

def get_cursor_factory(self) -> ConcurrentCursorFactory:
    """Return the cursor factory for creating new cursor instances."""
    return self._cursor_factory

Then update line 3368:

cursor_factory = lambda: cursor.get_cursor_factory().create(
    {}, datetime.timedelta(0)
)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f247e93 and 71065f0.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
  • airbyte_cdk/sources/streams/concurrent/cursor.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
  • PaginationResetLimits (1189-1191)
  • PaginationReset (2075-2078)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
  • PaginationTracker (9-64)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • Cursor (51-87)
  • FinalStateCursor (90-132)
  • ConcurrentCursor (135-565)
  • copy_without_state (139-155)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
  • ConcurrentPerPartitionCursor (106-660)
  • create (98-103)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
airbyte_cdk/sources/message/repository.py (2)
  • MessageRepository (45-60)
  • NoopMessageRepository (63-71)
airbyte_cdk/sources/types.py (5)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • extra_fields (115-117)
airbyte_cdk/sources/connector_state_manager.py (1)
  • ConnectorStateManager (32-161)
airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py (1)
  • output_format (133-137)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/streams/concurrent/cursor.py

[error] 555-555: Ruff format check failed. 1 file would be reformatted. Run 'ruff format' to fix style issues.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

119-120: LGTM on imports and logger setup!

The new imports for pagination reset models and the module-level logger are correctly placed and will support the pagination tracker functionality introduced in this PR.

Also applies to: 362-364, 536-536, 652-652


2062-2065: LGTM on cursor-based parameter passing!

The new parameters (has_stop_condition_cursor, is_client_side_incremental_sync, cursor) correctly wire the cursor context through to the retriever, supporting the pagination reset feature.


3058-3058: LGTM on record selector cursor integration!

The refactor to pass client_side_incremental_sync_cursor instead of a boolean flag is cleaner and allows the ClientSideIncrementalRecordFilterDecorator to operate directly with the cursor instance.

Also applies to: 3074-3082


3191-3193: LGTM on default cursor creation!

Creating a FinalStateCursor when no cursor is provided ensures the retriever always has a valid cursor instance, preventing null pointer issues downstream.


3351-3353: LGTM on pagination tracker factory wiring!

The pagination_tracker_factory parameter correctly integrates the pagination reset functionality into the SimpleRetriever.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

439-457: Guard _next_page_token with a non-None response to drop the type ignore

This removes the need for the type: ignore and avoids accidental calls with None, wdyt?

-            else:
+            elif response:
                 last_page_token_value = (
                     next_page_token.get("next_page_token") if next_page_token else None
                 )
-                next_page_token = self._next_page_token(
-                    response=response,  # type:ignore # we are breaking from the loop on the try/else if there are no response so this should be fine
+                next_page_token = self._next_page_token(
+                    response=response,
                     last_page_size=last_page_size,
                     last_record=last_record,
                     last_page_token_value=last_page_token_value,
                 )
                 if not next_page_token:
                     break
+            else:
+                break

391-401: Don’t mutate the original stream_slice inside the chunking loop

Overwriting stream_slice per chunk causes resets to operate on the last chunk (and leak query_properties into subsequent attempts). Can we keep the original slice and use a local chunked_slice for requests, wdyt?

-                    for properties in self.additional_query_properties.get_request_property_chunks(
+                    for properties in self.additional_query_properties.get_request_property_chunks(
                         stream_slice=stream_slice
                     ):
-                        stream_slice = StreamSlice(
-                            partition=stream_slice.partition or {},
-                            cursor_slice=stream_slice.cursor_slice or {},
-                            extra_fields={"query_properties": properties},
-                        )
-                        response = self._fetch_next_page(
-                            stream_state, stream_slice, next_page_token
-                        )
+                        chunked_slice = StreamSlice(
+                            partition=stream_slice.partition or {},
+                            cursor_slice=stream_slice.cursor_slice or {},
+                            extra_fields={"query_properties": properties},
+                        )
+                        response = self._fetch_next_page(
+                            stream_state, chunked_slice, next_page_token
+                        )
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3334-3340: Extend limits validation to client-side incremental filtering too

If client-side incremental is enabled, a RecordFilter decorator is applied implicitly; we should also reject PaginationResetLimits in that case to avoid filtered counts. Add this guard, wdyt?

-        if (
-            model.record_selector.record_filter
-            and model.pagination_reset
-            and model.pagination_reset.limits
-        ):
+        if (
+            (model.record_selector.record_filter or self._is_client_side_filtering_enabled(model))
+            and model.pagination_reset
+            and model.pagination_reset.limits
+        ):
             raise ValueError("PaginationResetLimits are not supported while having record filter.")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 71065f0 and 2ae3f57.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
  • airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4 hunks)
  • unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (1 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py
🧰 Additional context used
🧬 Code graph analysis (4)
unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (5)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • FailureType (546-549)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
  • PaginationTracker (9-64)
  • observe (36-39)
  • has_reached_limit (41-42)
  • reduce_slice_range_if_possible (47-64)
airbyte_cdk/sources/types.py (6)
  • Record (21-72)
  • StreamSlice (75-169)
  • data (35-36)
  • associated_slice (39-40)
  • partition (99-104)
  • cursor_slice (107-112)
airbyte_cdk/sources/streams/concurrent/cursor.py (5)
  • ConcurrentCursor (135-565)
  • observe (57-61)
  • observe (112-113)
  • observe (250-270)
  • reduce_slice_range (541-565)
airbyte_cdk/utils/traced_exception.py (1)
  • AirbyteTracedException (25-145)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
  • PaginationResetLimits (1189-1191)
  • PaginationReset (2075-2078)
  • Action1 (1184-1186)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)
  • PaginationTracker (9-64)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • Cursor (51-87)
  • FinalStateCursor (90-132)
  • ConcurrentCursor (135-565)
  • copy_without_state (139-155)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
  • ConcurrentPerPartitionCursor (106-660)
  • create (98-103)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • FailureType (546-549)
airbyte_cdk/sources/types.py (2)
  • Record (21-72)
  • StreamSlice (75-169)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • observe (57-61)
  • observe (112-113)
  • observe (250-270)
  • reduce_slice_range (541-565)
airbyte_cdk/utils/traced_exception.py (1)
  • AirbyteTracedException (25-145)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (6)
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (4)
  • PaginationTracker (9-64)
  • observe (36-39)
  • has_reached_limit (41-42)
  • reduce_slice_range_if_possible (47-64)
airbyte_cdk/sources/streams/concurrent/cursor.py (4)
  • Cursor (51-87)
  • observe (57-61)
  • observe (112-113)
  • observe (250-270)
airbyte_cdk/sources/streams/http/pagination_reset_exception.py (1)
  • PaginationResetRequiredException (1-2)
airbyte_cdk/sources/types.py (8)
  • Record (21-72)
  • StreamSlice (75-169)
  • partition (99-104)
  • cursor_slice (107-112)
  • extra_fields (115-117)
  • data (35-36)
  • associated_slice (39-40)
  • get (146-147)
airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py (4)
  • next_page_token (136-152)
  • next_page_token (251-264)
  • get_initial_token (127-134)
  • get_initial_token (247-249)
airbyte_cdk/sources/declarative/requesters/paginators/paginator.py (2)
  • next_page_token (33-49)
  • get_initial_token (27-30)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 5-641: I001 Import block is unsorted or un-formatted. Found 1 fixable with the --fix option.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-shopify
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (3)
unit_tests/sources/declarative/retrievers/test_pagination_tracker.py (1)

23-87: Good coverage of tracker behaviors

Tests exercise observe, limits, and slice reduction (with/without cursor) well. No issues spotted.

airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py (1)

44-64: Tracker logic looks consistent

Attempt counting and record-count reset per reduce are coherent with the intended flow and tests.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

5-11: Fix import sorting (linters failing I001)

Imports are unsorted/unformatted. Could you run the formatter (e.g., ruff/isort) to resolve I001, wdyt?

Re-run linters locally:

  • ruff check . --fix
  • or isort . --profile black

@maxi297 maxi297 merged commit 7a35833 into main Oct 9, 2025
26 of 27 checks passed
@maxi297 maxi297 deleted the maxi297/pagination_reset branch October 9, 2025 15:49
maxi297 added a commit to airbytehq/airbyte that referenced this pull request Oct 9, 2025
## What
Blocked by airbytehq/airbyte-python-cdk#781

## How
Supporting pagination reset using cursor

## Review guide
<!--
1. `x.py`
2. `y.py`
-->

## User Impact
<!--
* What is the end result perceived by the user?
* If there are negative side effects, please list them. 
-->

## Can this PR be safely reverted and rolled back?
<!--
* If unsure, leave it blank.
-->
- [x] YES 💚
- [ ] NO ❌

---------

Co-authored-by: Octavia Squidington III <[email protected]>
matteogp pushed a commit to airbytehq/airbyte that referenced this pull request Oct 10, 2025
## What
Blocked by airbytehq/airbyte-python-cdk#781

## How
Supporting pagination reset using cursor

## Review guide
<!--
1. `x.py`
2. `y.py`
-->

## User Impact
<!--
* What is the end result perceived by the user?
* If there are negative side effects, please list them. 
-->

## Can this PR be safely reverted and rolled back?
<!--
* If unsure, leave it blank.
-->
- [x] YES 💚
- [ ] NO ❌

---------

Co-authored-by: Octavia Squidington III <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants