maxi297
Contributor

@maxi297 maxi297 commented Sep 22, 2025

What

Maybe this is a bug fix, maybe this is a new feature. Basically, we wanted to migrate source-slack to CDK v7, but the connector was reimplementing SubstreamPartitionRouter to make changes to the parent state.

The problem is that we were building the partition router before doing state migrations.

How

The first thing create_default_stream does when it is called is run the state migrations and update the state in the ConnectorStateManager.

Breaking change: callers can no longer invoke the cursor-creation methods directly and assume the state will be migrated.
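To illustrate the ordering problem described above, here is a minimal, self-contained sketch. `ToyStateManager`, `ToyRouter`, and `rename_parent_cursor` are hypothetical stand-ins rather than CDK classes; the only assumption is that a router snapshots the parent state at construction time.

```python
from typing import Any, Callable, Dict

State = Dict[str, Any]


class ToyStateManager:
    """Hypothetical stand-in for a connector state manager keyed by stream name."""

    def __init__(self, states: Dict[str, State]) -> None:
        self._states = {name: dict(state) for name, state in states.items()}

    def get(self, stream: str) -> State:
        return dict(self._states.get(stream, {}))

    def update(self, stream: str, state: State) -> None:
        self._states[stream] = dict(state)


class ToyRouter:
    """Hypothetical router that snapshots parent state when it is constructed."""

    def __init__(self, stream_state: State) -> None:
        self.parent_state = dict(stream_state.get("parent_state", {}))


def rename_parent_cursor(state: State) -> State:
    """Hypothetical migration: rename the parent's cursor key from 'ts' to 'updated_at'."""
    parent = dict(state.get("parent_state", {}))
    if "ts" in parent:
        parent["updated_at"] = parent.pop("ts")
    return {**state, "parent_state": parent}


def build_router_then_migrate(
    manager: ToyStateManager, stream: str, migrate: Callable[[State], State]
) -> ToyRouter:
    # Old ordering: the router sees the state before the migration runs.
    router = ToyRouter(manager.get(stream))
    manager.update(stream, migrate(manager.get(stream)))
    return router


def migrate_then_build_router(
    manager: ToyStateManager, stream: str, migrate: Callable[[State], State]
) -> ToyRouter:
    # New ordering: the migrated state is persisted first, then the router is built.
    manager.update(stream, migrate(manager.get(stream)))
    return ToyRouter(manager.get(stream))


legacy = {"channels": {"parent_state": {"ts": "2024-01-01"}}}
old_order = build_router_then_migrate(ToyStateManager(legacy), "channels", rename_parent_cursor)
new_order = migrate_then_build_router(ToyStateManager(legacy), "channels", rename_parent_cursor)
```

With the old ordering the router keeps the legacy `ts` key; with the new ordering it sees `updated_at`.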

Summary by CodeRabbit

  • New Features

    • Stream state is now migrated and persisted at stream creation for more consistent startup behavior.
  • Refactor

    • Centralized state migration and explicit propagation of stream state to concurrent cursors, improving reliability for datetime, incrementing, and partitioned streams.
    • Factory constructors accept injected state or an optional state manager to make state flows explicit.
  • Tests

    • Expanded tests and added test migrations and helpers to validate migrated state and cursor behavior.
  • Chores

    • Internal wiring streamlined to reduce duplication and improve maintainability.

Important

Auto-merge enabled.

This PR is set to merge automatically when all requirements are met.

Note

Auto-merge may have been disabled. Please check the PR status to confirm.

@maxi297 maxi297 requested review from brianjlai and darynaishchenko and removed request for brianjlai September 22, 2025 16:47
@github-actions github-actions bot added the enhancement New feature or request label Sep 22, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/support_state_migrations_affecting_parent_streams#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/support_state_migrations_affecting_parent_streams

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


@maxi297 maxi297 requested a review from brianjlai September 22, 2025 16:47
@maxi297
Contributor Author

maxi297 commented Sep 22, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

coderabbitai bot commented Sep 22, 2025

📝 Walkthrough

Walkthrough

Centralized stream-state migration into a new _migrate_state method invoked during stream creation; concurrent cursor factories now accept an explicit stream_state and no longer perform inline migrations. _build_concurrent_cursor fetches stream state once and forwards it. Tests and test helpers updated accordingly.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Factory refactor: centralized state migrations and API changes / airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Added private _migrate_state(self, model, config) to fetch/apply/persist stream state via the connector state manager. create_default_stream calls it at creation time. _build_concurrent_cursor reads stream_state once and propagates it into cursor factories. Signatures for concurrent cursor factories updated to accept stream_state; inline migration plumbing removed. |
| Unit tests: adapt to new state flow and signatures / unit_tests/sources/declarative/parsers/test_model_to_component_factory.py | Tests updated to pass stream_state (often _NO_STATE) into factory cursor creation calls, optionally supply connector_state_manager to ModelToComponentFactory ctor when needed, and adjust assertions to reflect centralized migration/persistence. Removed or changed in-test state manager construction where not required. |
| Test helpers: migration implementations / unit_tests/sources/declarative/parsers/testing_components.py | Added TestingStateMigration and TestingStateMigrationWithParentState (implementing StateMigration) to exercise partitioned-state and parent-state migration scenarios used by tests. |
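As a rough illustration of what a partitioned-state test migration might look like, here is a minimal sketch. It does not import the CDK: `StateMigrationLike` is an assumed shape (a `should_migrate` guard plus a `migrate` transformation), and `GlobalToPartitionedMigration`, `apply_migrations`, and the `states`/`partition`/`cursor` layout are hypothetical names chosen for this example, not the contents of testing_components.py.

```python
from typing import Any, Iterable, Mapping, Protocol


class StateMigrationLike(Protocol):
    """Assumed shape of a state migration: a guard plus a transformation."""

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: ...

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: ...


class GlobalToPartitionedMigration:
    """Hypothetical migration turning a flat cursor value into per-partition state."""

    def __init__(self, partition: Mapping[str, Any]) -> None:
        self._partition = dict(partition)

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        # Only legacy (flat) state is migrated; already-partitioned state is left alone.
        return "states" not in stream_state and "updated_at" in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        if "updated_at" not in stream_state:
            raise KeyError("GlobalToPartitionedMigration expects 'updated_at' in stream_state")
        return {
            "states": [
                {
                    "partition": dict(self._partition),
                    "cursor": {"updated_at": stream_state["updated_at"]},
                }
            ]
        }


def apply_migrations(
    migrations: Iterable[StateMigrationLike], stream_state: Mapping[str, Any]
) -> Mapping[str, Any]:
    """Apply each migration in order, skipping those whose guard returns False."""
    for migration in migrations:
        if migration.should_migrate(stream_state):
            stream_state = migration.migrate(stream_state)
    return stream_state


migration = GlobalToPartitionedMigration({"parent_id": "1"})
once = apply_migrations([migration], {"updated_at": "2024-01-01T00:00:00Z"})
twice = apply_migrations([migration], once)
```

Because the guard rejects state that already has a `states` key, running the migration a second time leaves the state unchanged, which is the property that makes repeated stream construction safe.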

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Caller as Caller
  participant Factory as ModelToComponentFactory
  participant StateMgr as ConnectorStateManager
  participant Migrators as StateMigration(s)
  participant Cursor as ConcurrentCursor

  Caller->>Factory: create_default_stream(model, config)
  Factory->>StateMgr: get_stream_state(stream)
  StateMgr-->>Factory: current_stream_state
  alt model.state_migrations present
    Factory->>Migrators: apply migrations(current_stream_state)
    Migrators-->>Factory: migrated_state
    Factory->>StateMgr: write_stream_state(stream, migrated_state)
    StateMgr-->>Factory: ack
    Factory->>Factory: use migrated_state
  else
    Factory->>Factory: use current_stream_state
  end
  Factory->>Factory: _build_concurrent_cursor(..., stream_state)
  Factory->>Cursor: create_concurrent_cursor_...(stream_state, ...)
  Cursor-->>Factory: cursor
  Factory-->>Caller: stream (with cursor)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • brianjlai
  • tolik0

Would you like a short migration checklist added to the PR description to remind integrators about the new stream_state parameter in custom cursor factories, wdyt?

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title Check | ✅ Passed | The title concisely targets the primary change: enabling state migrations that impact parent streams, which matches the PR objective to run migrations before building partition routers and to centralize migration logic and stream_state propagation; it correctly reflects the changes in the code and tests. The phrasing is specific enough for a reviewer scanning history to understand the main intent and is not misleading. The only minor issue is a grammar slip ("affects" → "affect") but that does not obscure the meaning. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch maxi297/support_state_migrations_affecting_parent_streams

Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (6)
unit_tests/sources/declarative/parsers/testing_components.py (1)

59-66: Harden access to 'updated_at' in the test migration.

If a caller omits 'updated_at', this will KeyError with a vague message. Want to raise a more explicit error, wdyt?

-        updated_at = stream_state["updated_at"]
+        try:
+            updated_at = stream_state["updated_at"]
+        except KeyError as e:
+            raise KeyError("TestingStateMigration expects 'updated_at' in stream_state") from e
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

3651-3653: Optional: make the timestamp assertion format-agnostic.

Since timezone offsets can appear as “+0000” vs “+00:00”, would you prefer parsing both sides with ab_datetime_parse (or comparing to a parsed constant) to reduce brittleness, wdyt?
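One way to make such an assertion format-agnostic is to parse both sides before comparing. The sketch below uses the standard library's `datetime.strptime` rather than ab_datetime_parse, so it is a swapped-in technique and not the CDK helper; `parse_timestamp` and `_FORMAT` are names assumed for this example.

```python
from datetime import datetime

# %z accepts both "+0000" and "+00:00" when parsing (Python 3.7+).
_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def parse_timestamp(value: str) -> datetime:
    """Parse a timestamp whose UTC offset may be written '+0000' or '+00:00'."""
    return datetime.strptime(value, _FORMAT)


compact = parse_timestamp("2024-02-01T00:00:00.000000+0000")
with_colon = parse_timestamp("2024-02-01T00:00:00.000000+00:00")
```

Comparing the parsed values instead of the raw strings means the test no longer depends on which offset spelling the state converter emits.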

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

2116-2129: Add type annotations to the new private method

The _migrate_state method is missing type annotations for its return type and parameters. This causes the linter to flag it as no-untyped-def. Would you consider adding proper type hints like this?

-    def _migrate_state(self, config, model):
+    def _migrate_state(self, config: Config, model: DeclarativeStreamModel) -> None:

2117-2119: Handle potential None value for model.name

The get_stream_state call uses model.name which could be None according to the type definition, but the method expects a str. Should we add a check or default value here?

+        stream_name = model.name or ""
         stream_state = self._connector_state_manager.get_stream_state(
-            stream_name=model.name, namespace=None
+            stream_name=stream_name, namespace=None
         )

2201-2201: Potential None value in get_stream_state call

Similar to the issue in _migrate_state, this call to get_stream_state uses model.name which could be None. Should we use the stream_name variable that's already defined above (line 2200)?

-        stream_state = self._connector_state_manager.get_stream_state(model.name, None)
+        stream_state = self._connector_state_manager.get_stream_state(stream_name, None)

2120-2129: Consider extracting state migration creation logic

The state migration creation logic (lines 2120-2126) follows the same pattern as other component creations. Would it be cleaner to extract this into a helper method for consistency and potential reuse?

+    def _create_state_migrations(self, model: DeclarativeStreamModel, config: Config) -> List[Any]:
+        if model.state_migrations:
+            return [
+                self._create_component_from_model(state_migration, config, declarative_stream=model)
+                for state_migration in model.state_migrations
+            ]
+        return []
+
     def _migrate_state(self, config: Config, model: DeclarativeStreamModel) -> None:
         stream_state = self._connector_state_manager.get_stream_state(
             stream_name=model.name, namespace=None
         )
-        if model.state_migrations:
-            state_transformations = [
-                self._create_component_from_model(state_migration, config, declarative_stream=model)
-                for state_migration in model.state_migrations
-            ]
-        else:
-            state_transformations = []
+        state_transformations = self._create_state_migrations(model, config)
         stream_state = self.apply_stream_state_migrations(state_transformations, stream_state)
         self._connector_state_manager.update_state_for_stream(stream_name=model.name, namespace=None, value=stream_state)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c67c556 and 568c50a.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3 hunks)
  • unit_tests/sources/declarative/parsers/testing_components.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
  • ModelToComponentFactory (648-4168)
  • create_component (789-822)
airbyte_cdk/sources/connector_state_manager.py (1)
  • ConnectorStateManager (32-161)
airbyte_cdk/sources/declarative/yaml_declarative_source.py (2)
  • YamlDeclarativeSource (17-69)
  • _parse (62-69)
unit_tests/sources/declarative/parsers/testing_components.py (1)
airbyte_cdk/sources/declarative/migrations/state_migration.py (1)
  • StateMigration (7-24)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
  • get_stream_state (143-145)
airbyte_cdk/sources/connector_state_manager.py (2)
  • get_stream_state (53-67)
  • update_state_for_stream (69-79)
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

[error] 1-1: ruff format would reformat this file. Run 'ruff format' to fix code style issues.

unit_tests/sources/declarative/parsers/testing_components.py

[error] 1-1: ruff format would reformat this file. Run 'ruff format' to fix code style issues.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1954-1954: Call to untyped function "_migrate_state" in typed context [no-untyped-call]


[error] 2116-2116: Function is missing a type annotation [no-untyped-def]


[error] 2201-2201: Argument 1 to "get_stream_state" of "ConnectorStateManager" has incompatible type "str | None"; expected "str" [arg-type]

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (7)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)

3328-3351: LGTM: passing stream_state explicitly into the cursor factory.

This aligns with the new “migrate-before-create” flow and keeps state handling explicit. Nice.


3462-3472: Confirm API surface: is state_manager still a supported kwarg here?

Earlier you used create_concurrent_cursor_from_datetime_based_cursor without state_manager (Line 3344), but here you pass it. Should we drop this arg for consistency, or is the method intentionally accepting an override manager in some paths, wdyt?


3490-3531: Great coverage for migration-before-stream-creation.

This test clearly validates that state_migrations are applied and persisted prior to cursor construction.


3288-3289: Ruff format applied — linter clean.
Ran ruff format and ruff check --fix on unit_tests/sources/declarative/parsers/test_model_to_component_factory.py: 1 file reformatted; All checks passed — wdyt?

unit_tests/sources/declarative/parsers/testing_components.py (1)

6-10: Ruff format applied — one file reformatted and error fixed.
unit_tests/sources/declarative/parsers/testing_components.py was reformatted (1 file reformatted, 1 error fixed). Could you commit and push the reformatted file so CI is satisfied, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1267-1267: Good addition of stream_state parameter!

The addition of the explicit stream_state parameter to the concurrent cursor factory methods is a clean approach that ensures state migrations happen before cursor creation. This aligns well with the PR's breaking change note about state migration timing.

Also applies to: 1491-1491


1568-1568: Good consistent addition of stream_state parameter!

The addition of stream_state parameter to create_concurrent_cursor_from_perpartition_cursor maintains consistency with the other cursor creation methods. The implementation properly passes this through to the cursor construction.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1619-1631: Remove unexpected kwarg in partial: passing state_manager will raise at runtime.

create_concurrent_cursor_from_datetime_based_cursor doesn’t accept state_manager; keeping it in the partial(...) will cause a TypeError when the factory is invoked. Can we drop it, since state is now supplied via stream_state, wdyt?

-        cursor_factory = ConcurrentCursorFactory(
-            partial(
-                self.create_concurrent_cursor_from_datetime_based_cursor,
-                state_manager=state_manager,
-                model_type=model_type,
-                component_definition=component_definition,
-                stream_name=stream_name,
-                stream_namespace=stream_namespace,
-                config=config,
-                message_repository=NoopMessageRepository(),
-            )
-        )
+        cursor_factory = ConcurrentCursorFactory(
+            partial(
+                self.create_concurrent_cursor_from_datetime_based_cursor,
+                model_type=model_type,
+                component_definition=component_definition,
+                stream_name=stream_name,
+                stream_namespace=stream_namespace,
+                config=config,
+                message_repository=NoopMessageRepository(),
+            )
+        )

1261-1273: Update remaining call sites to pass stream_state (replace state_manager)?

rg found lingering call sites that still use the old parameter or omit stream_state — unit tests and an internal call need updating; no partial(...) matches were found.

  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py: lines ~3344, 3464, 3475, 3705, 3805, 3817
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: def at ~1261, def at ~1485, callsite at ~2232 (missing/incorrect arg)

Can you update these call sites to pass the new stream_state (MutableMapping) instead of state_manager and ensure the internal call at line ~2232 forwards stream_state? wdyt?

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1346-1354: Avoid mutating the caller-provided stream_state in place.

Shall we shallow-copy before applying the runtime lookback adjustment to prevent surprising side effects for callers that pass a shared mapping, wdyt?

-        if runtime_lookback_window and stream_state_value:
-            new_stream_state = (
+        if runtime_lookback_window and stream_state_value:
+            stream_state = dict(stream_state)  # avoid mutating input mapping
+            new_stream_state = (
                 connector_state_converter.parse_timestamp(stream_state_value)
                 - runtime_lookback_window
             )
             stream_state[cursor_field.cursor_field_key] = connector_state_converter.output_format(
                 new_stream_state
             )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 568c50a and 197a071.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3 hunks)
  • unit_tests/sources/declarative/parsers/testing_components.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • unit_tests/sources/declarative/parsers/testing_components.py
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (5)
airbyte_cdk/sources/connector_state_manager.py (1)
  • ConnectorStateManager (32-161)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • state (199-219)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
  • state (579-580)
  • state (583-586)
  • name (106-114)
  • name (117-119)
airbyte_cdk/models/airbyte_protocol.py (3)
  • AirbyteStateMessage (67-75)
  • AirbyteStreamState (55-57)
  • AirbyteStateBlob (15-50)
airbyte_cdk/sources/declarative/yaml_declarative_source.py (2)
  • YamlDeclarativeSource (17-69)
  • _parse (62-69)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1954-1954: poetry run mypy --config-file mypy.ini airbyte_cdk: Call to untyped function "_migrate_state" in typed context [no-untyped-call]


[error] 2116-2116: poetry run mypy --config-file mypy.ini airbyte_cdk: Function is missing a type annotation [no-untyped-def]


[error] 2203-2203: poetry run mypy --config-file mypy.ini airbyte_cdk: Argument 1 to "get_stream_state" of "ConnectorStateManager" has incompatible type "str | None"; expected "str" [arg-type]

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
🔇 Additional comments (8)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)

3328-3328: LGTM! Factory initialization with connector state manager looks good.

The initialization correctly includes the connector_state_manager parameter, aligning with the new API changes.


3349-3349: LGTM! Stream state parameter correctly passed to cursor creation.

The stream_state parameter is being passed directly to the cursor creation method, which aligns with the updated API that no longer performs inline state migrations.


3489-3557: Excellent test coverage for state migrations during stream creation!

This test validates that state migrations run before partition router construction, which addresses the core issue mentioned in the PR objectives. The test correctly:

  1. Creates a connector state manager with initial state
  2. Uses a factory with the state manager
  3. Verifies that the cursor state contains the expected migrated partitions

The assertion confirms that the TestingStateMigration component successfully transformed the state into the expected partition format.


3559-3660: Great test for concurrent cursor with parent-child state migrations!

This test effectively validates that state migrations work correctly for both child and parent streams in a SubstreamPartitionRouter scenario. The test demonstrates:

  1. Proper migration of the child stream state (lookback_window set to 20)
  2. Correct parent stream state migration (updated_at changed to "2024-02-01T00:00:00.000000+0000")

This thoroughly covers the use case mentioned in the PR description about custom SubstreamPartitionRouter implementations that modify parent state.
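A migration that touches both the child and the parent state could look roughly like the following. This is a sketch only: `ParentStateMigration` is a hypothetical class, and the `lookback_window` and `parent_state` keys (with parent state keyed by parent stream name) are assumptions about the state layout, not taken from the test itself.

```python
from typing import Any, Dict, Mapping


class ParentStateMigration:
    """Hypothetical migration that rewrites both child-level and parent-level state."""

    def __init__(self, parent_stream: str, parent_cursor_value: str, lookback_window: int) -> None:
        self._parent_stream = parent_stream
        self._parent_cursor_value = parent_cursor_value
        self._lookback_window = lookback_window

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        # Skip state that already carries the target lookback window.
        return stream_state.get("lookback_window") != self._lookback_window

    def migrate(self, stream_state: Mapping[str, Any]) -> Dict[str, Any]:
        migrated = dict(stream_state)
        migrated["lookback_window"] = self._lookback_window  # child-level change
        parent_state = dict(migrated.get("parent_state", {}))
        parent_state[self._parent_stream] = {"updated_at": self._parent_cursor_value}
        migrated["parent_state"] = parent_state  # parent-level change
        return migrated


migration = ParentStateMigration(
    parent_stream="parent",
    parent_cursor_value="2024-02-01T00:00:00.000000+0000",
    lookback_window=20,
)
before = {"lookback_window": 0, "parent_state": {"parent": {"updated_at": "2024-01-01T00:00:00.000000+0000"}}}
after = migration.migrate(before) if migration.should_migrate(before) else before
```

Since the migrated state is persisted before the partition router is built, the router would be constructed from `after["parent_state"]` rather than from the legacy value.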

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

1954-1962: State migration before partition-router construction looks good.

This aligns with the PR objective and should prevent pre‑migration mutations by custom routers. Can we confirm _migrate_state is idempotent and cheap to call when a stream is constructed multiple times, wdyt?


1485-1540: Incrementing-count concurrent cursor wiring looks correct.

The added stream_state is plumbed through and the converter is sequential as required. All good from my side, wdyt?


1561-1572: Per-partition factory signature consistent with new state flow.

Accepting stream_state here aligns with _build_concurrent_cursor and ConcurrentPerPartitionCursor. Looks good.


2210-2221: Callers updated to pass stream_state: good coverage.

All three branches in _build_concurrent_cursor now supply stream_state, which should keep behavior consistent across cursor types. Thanks!

Also applies to: 2223-2231, 2232-2241


github-actions bot commented Sep 22, 2025

PyTest Results (Fast)

3 757 tests  ±0   3 745 ✅ ±0   6m 27s ⏱️ +22s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 0b159f7. ± Comparison against base commit c67c556.

This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_concurrent_cursor_from_datetime_based_cursor_runs_state_migrations
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations_on_both_child_and_parent
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_default_stream_with_datetime_cursor_then_runs_state_migrations

♻️ This comment has been updated with latest results.


github-actions bot commented Sep 22, 2025

PyTest Results (Full)

3 760 tests  ±0   3 748 ✅ ±0   11m 4s ⏱️ -2s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 0b159f7. ± Comparison against base commit c67c556.

This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_concurrent_cursor_from_datetime_based_cursor_runs_state_migrations
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations_on_both_child_and_parent
unit_tests.sources.declarative.parsers.test_model_to_component_factory ‑ test_create_default_stream_with_datetime_cursor_then_runs_state_migrations

♻️ This comment has been updated with latest results.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1491-1540: Add stream_state to remaining create_concurrent_cursor_from_incrementing_count_cursor call sites

Verification found these call sites missing stream_state:

  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1485
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1486
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1487
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1488
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1489
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1490
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1491
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1492
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1493
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1494
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1495

Can you update these call sites to pass/forward stream_state (or confirm why they should not), wdyt?

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)

1950-1962: Migrating state before building the stream is the right sequencing

This ensures partition routers are instantiated with migrated state and addresses the Slack v7 issue described in the PR. Looks solid. Do you want to add a brief comment here to document that ordering guarantee, wdyt?


2116-2132: Typed, centralized _migrate_state is clean; consider no-op early return

The implementation is clear and typed. Do we want to skip update_state_for_stream if there are no migrations or if the state didn't change to avoid unnecessary writes, wdyt?

 def _migrate_state(self, model: DeclarativeStreamModel, config: Config) -> None:
     stream_name = model.name or ""
-    stream_state = self._connector_state_manager.get_stream_state(
+    original_state = self._connector_state_manager.get_stream_state(
         stream_name=stream_name, namespace=None
     )
-    if model.state_migrations:
-        state_transformations = [
+    if not model.state_migrations:
+        return
+    state_transformations = [
         self._create_component_from_model(state_migration, config, declarative_stream=model)
         for state_migration in model.state_migrations
-        ]
-    else:
-        state_transformations = []
-    stream_state = self.apply_stream_state_migrations(state_transformations, stream_state)
+    ]
+    stream_state = self.apply_stream_state_migrations(state_transformations, dict(original_state))
+    if stream_state == original_state:
+        return
     self._connector_state_manager.update_state_for_stream(
         stream_name=stream_name, namespace=None, value=stream_state
     )
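The effect of the suggested early returns can be checked with a toy state manager that counts writes. `CountingStateManager`, `migrate_state`, and `add_version` are hypothetical names for this sketch, and migrations are modeled as plain callables rather than CDK components.

```python
from typing import Any, Callable, Dict, List, Mapping

Migration = Callable[[Mapping[str, Any]], Mapping[str, Any]]


class CountingStateManager:
    """Hypothetical single-stream state manager that counts how often it is written."""

    def __init__(self, state: Mapping[str, Any]) -> None:
        self._state = dict(state)
        self.writes = 0

    def get_stream_state(self) -> Dict[str, Any]:
        return dict(self._state)

    def update_state_for_stream(self, value: Mapping[str, Any]) -> None:
        self._state = dict(value)
        self.writes += 1


def migrate_state(manager: CountingStateManager, migrations: List[Migration]) -> None:
    """Apply migrations and persist the result only when the state actually changed."""
    if not migrations:
        return  # nothing to apply, so nothing to write
    original = manager.get_stream_state()
    migrated: Dict[str, Any] = dict(original)
    for migration in migrations:
        migrated = dict(migration(migrated))
    if migrated == original:
        return  # the migrations were a no-op for this state
    manager.update_state_for_stream(migrated)


def add_version(state: Mapping[str, Any]) -> Mapping[str, Any]:
    return {**state, "version": 2}


manager = CountingStateManager({"updated_at": "2024-01-01"})
migrate_state(manager, [])
writes_without_migrations = manager.writes
migrate_state(manager, [add_version])
writes_after_first_run = manager.writes
migrate_state(manager, [add_version])
writes_after_second_run = manager.writes
```

The second run with the same migration produces an identical state, so no additional write happens.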

1618-1631: Drop unused kwarg in partial to avoid confusion

The partial passes state_manager to create_concurrent_cursor_from_datetime_based_cursor, but that function doesn't consume it (it only accepts **kwargs). Shall we remove it to prevent future misreads, wdyt?

-        cursor_factory = ConcurrentCursorFactory(
-            partial(
-                self.create_concurrent_cursor_from_datetime_based_cursor,
-                state_manager=state_manager,
-                model_type=model_type,
-                component_definition=component_definition,
-                stream_name=stream_name,
-                stream_namespace=stream_namespace,
-                config=config,
-                message_repository=NoopMessageRepository(),
-            )
-        )
+        cursor_factory = ConcurrentCursorFactory(
+            partial(
+                self.create_concurrent_cursor_from_datetime_based_cursor,
+                model_type=model_type,
+                component_definition=component_definition,
+                stream_name=stream_name,
+                stream_namespace=stream_namespace,
+                config=config,
+                message_repository=NoopMessageRepository(),
+            )
+        )
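Whether a leftover keyword argument in a partial(...) raises or is silently ignored depends on the target's signature. The sketch below shows both cases with two hypothetical functions, `strict_factory` and `lenient_factory`; neither is the CDK method.

```python
from functools import partial
from typing import Any, Dict, Optional


def strict_factory(stream_state: Dict[str, Any], stream_name: str) -> str:
    """Accepts only the parameters it declares."""
    return stream_name


def lenient_factory(stream_state: Dict[str, Any], stream_name: str, **kwargs: Any) -> str:
    """Swallows unknown keyword arguments through **kwargs."""
    return stream_name


# partial() itself never validates keywords; any error surfaces only at call time.
lenient = partial(lenient_factory, stream_name="users", state_manager=object())
strict = partial(strict_factory, stream_name="users", state_manager=object())

lenient_result = lenient(stream_state={"updated_at": "2024-01-01"})

strict_error: Optional[str] = None
try:
    strict(stream_state={"updated_at": "2024-01-01"})
except TypeError as error:
    strict_error = str(error)
```

With a `**kwargs` signature the stale `state_manager` argument is accepted and ignored, which is why removing it is a readability fix in that case rather than a crash fix.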
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 197a071 and 9f9ac58.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
  • get_stream_state (130-141)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
  • get_stream_state (170-186)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
  • get_stream_state (143-145)
airbyte_cdk/sources/declarative/partition_routers/partition_router.py (1)
  • get_stream_state (22-40)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)
  • get_stream_state (376-399)
airbyte_cdk/sources/connector_state_manager.py (2)
  • get_stream_state (53-67)
  • update_state_for_stream (69-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

2203-2217: Fetching stream_state once in _build_concurrent_cursor is correct

Grabbing the state up-front and plumbing it through ensures all cursor types get the migrated snapshot. Looks good.


2216-2241: Forwarding stream_state to all cursor builders is consistent

All three branches (per-partition, incrementing-count, datetime-based) now forward the same stream_state. Nice. Do we also want to thread through a namespace if/when the model supports it later, to future-proof this path, wdyt?
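
The "fetch once, forward everywhere" pattern described in the two comments above can be sketched as follows. All names here (`build_concurrent_cursor`, the three `build_*` functions, `read_state`) are hypothetical and chosen for illustration; the real factory methods take more arguments.

```python
from typing import Any, Callable, Dict, Mapping

State = Mapping[str, Any]
Cursor = Dict[str, Any]


def build_per_partition_cursor(stream_state: State) -> Cursor:
    return {"kind": "per_partition", "state": dict(stream_state)}


def build_incrementing_count_cursor(stream_state: State) -> Cursor:
    return {"kind": "incrementing_count", "state": dict(stream_state)}


def build_datetime_cursor(stream_state: State) -> Cursor:
    return {"kind": "datetime", "state": dict(stream_state)}


# Hypothetical dispatch table standing in for the three branches.
BUILDERS: Dict[str, Callable[[State], Cursor]] = {
    "per_partition": build_per_partition_cursor,
    "incrementing_count": build_incrementing_count_cursor,
    "datetime": build_datetime_cursor,
}


def build_concurrent_cursor(kind: str, read_state: Callable[[], State]) -> Cursor:
    # Read the (already migrated) state exactly once per build,
    # then hand the same snapshot to whichever builder is selected.
    stream_state = read_state()
    return BUILDERS[kind](stream_state)
```

Because each builder receives the state as an argument, none of them needs to reach back into a state manager, which keeps the data flow explicit.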

coderabbitai bot previously requested changes Sep 22, 2025

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

3345-3352: LGTM on explicit stream_state propagation — migrate remaining call sites

Found several remaining call sites still using the legacy state_manager parameter or omitting stream_state; these should be migrated to the new API.

  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py: 3465, 3476, 3706, 3806, 3818 (calls include state_manager=connector_state_manager)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: 2233 (call to create_concurrent_cursor_from_datetime_based_cursor without stream_state)

Could you update these to pass stream_state (and remove legacy state_manager usage) for consistency? Wdyt?

🧹 Nitpick comments (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)

3329-3329: Nit: avoid enabling connector‑builder message emission where unused

Here we don't assert on emitted messages. Dropping emit_connector_builder_messages=True trims overhead and keeps tests focused. Shall we simplify, wdyt?

-connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
+connector_builder_factory = ModelToComponentFactory()

3490-3558: Great coverage: verifies state migrations run before stream construction

This test clearly exercises “migrate before build” for a datetime cursor. One small enhancement: do we also want to assert the state manager was updated with the migrated state (ensuring update_state_for_stream ran)? Adding this makes the contract explicit, wdyt?

     stream = factory.create_component(
         model_type=DeclarativeStreamModel,
         component_definition=YamlDeclarativeSource._parse(content),
         config=input_config,
     )
     assert stream.cursor.state["states"] == [
         {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_1"}},
         {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_2"}},
     ]
+    # Verify ConnectorStateManager holds the migrated state as well
+    assert connector_state_manager.get_stream_state("test", None) == stream.cursor.state

3561-3660: Solid: validates migrations affecting both child and parent states

Nice end‑to‑end on parent/child migration. For extra confidence in the ordering guarantee (migrations strictly before partition router instantiation), would you consider a tiny test double partition router that snapshots parent state in its init and exposes it for assertion? That would catch regressions in construction order directly, wdyt?
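
One way the suggested test double could look, as a sketch only. `FakeStateManager`, `SnapshotPartitionRouter`, `wrap_legacy_parent_state`, and `create_stream` are hypothetical names that do not exist in the CDK; the point is that the router records what it sees at construction time, so a test can assert that migrations ran first.

```python
import copy
from typing import Any, Callable, Dict, List

State = Dict[str, Any]


class FakeStateManager:
    """Hypothetical in-memory state manager used only for this sketch."""

    def __init__(self, states: Dict[str, State]) -> None:
        self._states = copy.deepcopy(states)

    def get_stream_state(self, stream_name: str) -> State:
        return copy.deepcopy(self._states.get(stream_name, {}))

    def update_state_for_stream(self, stream_name: str, value: State) -> None:
        self._states[stream_name] = copy.deepcopy(value)


class SnapshotPartitionRouter:
    """Test double: snapshots the stream state visible at construction time."""

    def __init__(self, state_manager: FakeStateManager, stream_name: str) -> None:
        self.state_at_init = state_manager.get_stream_state(stream_name)


def wrap_legacy_parent_state(state: State) -> State:
    """Hypothetical migration: nest a flat legacy state under 'parent_state'."""
    if "parent_state" in state:
        return state
    return {"parent_state": {"parent": dict(state)}}


def create_stream(
    state_manager: FakeStateManager,
    stream_name: str,
    migrations: List[Callable[[State], State]],
) -> SnapshotPartitionRouter:
    # Migrate and persist first, then build the router, mirroring the intended order.
    state = state_manager.get_stream_state(stream_name)
    for migrate in migrations:
        state = migrate(state)
    state_manager.update_state_for_stream(stream_name, state)
    return SnapshotPartitionRouter(state_manager, stream_name)
```

If the router were built before the migration loop, `state_at_init` would still hold the flat legacy state and the assertion on it would fail, which is the regression this kind of double is meant to catch.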

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 9f9ac58 and 0b159f7.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
  • ModelToComponentFactory (648-4171)
  • create_component (789-822)
airbyte_cdk/sources/declarative/yaml_declarative_source.py (1)
  • _parse (62-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)

@maxi297 maxi297 dismissed coderabbitai[bot]’s stale review September 22, 2025 19:00

I don't care too much about the problem of re-using state in those tests as they don't do anything with it

@maxi297 maxi297 enabled auto-merge (squash) September 22, 2025 19:08
@maxi297 maxi297 disabled auto-merge September 22, 2025 19:08
@maxi297 maxi297 merged commit a1428bf into main Sep 22, 2025
30 of 31 checks passed
@maxi297 maxi297 deleted the maxi297/support_state_migrations_affecting_parent_streams branch September 22, 2025 19:09