-
Notifications
You must be signed in to change notification settings - Fork 30
feat: support state migrations that affects parent streams #770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support state migrations that affects parent streams #770
Conversation
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/support_state_migrations_affecting_parent_streams#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/support_state_migrations_affecting_parent_streams Helpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
/autofix
|
📝 WalkthroughWalkthroughCentralized stream-state migration into a new _migrate_state method invoked during stream creation; concurrent cursor factories now accept an explicit Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Caller as Caller
participant Factory as ModelToComponentFactory
participant StateMgr as ConnectorStateManager
participant Migrators as StateMigration(s)
participant Cursor as ConcurrentCursor
Caller->>Factory: create_default_stream(model, config)
Factory->>StateMgr: get_stream_state(stream)
StateMgr-->>Factory: current_stream_state
alt model.state_migrations present
Factory->>Migrators: apply migrations(current_stream_state)
Migrators-->>Factory: migrated_state
Factory->>StateMgr: write_stream_state(stream, migrated_state)
StateMgr-->>Factory: ack
Factory->>Factory: use migrated_state
else
Factory->>Factory: use current_stream_state
end
Factory->>Factory: _build_concurrent_cursor(..., stream_state)
Factory->>Cursor: create_concurrent_cursor_...(stream_state, ...)
Cursor-->>Factory: cursor
Factory-->>Caller: stream (with cursor)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Would you like a short migration checklist added to the PR description to remind integrators about the new Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (6)
unit_tests/sources/declarative/parsers/testing_components.py (1)
59-66
: Harden access to 'updated_at' in the test migration.If a caller omits 'updated_at', this will KeyError with a vague message. Want to raise a more explicit error, wdyt?
- updated_at = stream_state["updated_at"] + try: + updated_at = stream_state["updated_at"] + except KeyError as e: + raise KeyError("TestingStateMigration expects 'updated_at' in stream_state") from eunit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3651-3653
: Optional: make the timestamp assertion format-agnostic.Since timezone offsets can appear as “+0000” vs “+00:00”, would you prefer parsing both sides with ab_datetime_parse (or comparing to a parsed constant) to reduce brittleness, wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
2116-2129
: Add type annotations to the new private methodThe
_migrate_state
method is missing type annotations for its return type and parameters. This causes the linter to flag it asno-untyped-def
. Would you consider adding proper type hints like this?- def _migrate_state(self, config, model): + def _migrate_state(self, config: Config, model: DeclarativeStreamModel) -> None:
2117-2119
: Handle potential None value for model.nameThe
get_stream_state
call usesmodel.name
which could beNone
according to the type definition, but the method expects astr
. Should we add a check or default value here?+ stream_name = model.name or "" stream_state = self._connector_state_manager.get_stream_state( - stream_name=model.name, namespace=None + stream_name=stream_name, namespace=None )
2201-2201
: Potential None value in get_stream_state callSimilar to the issue in
_migrate_state
, this call toget_stream_state
usesmodel.name
which could beNone
. Should we use thestream_name
variable that's already defined above (line 2200)?- stream_state = self._connector_state_manager.get_stream_state(model.name, None) + stream_state = self._connector_state_manager.get_stream_state(stream_name, None)
2120-2129
: Consider extracting state migration creation logicThe state migration creation logic (lines 2120-2126) follows the same pattern as other component creations. Would it be cleaner to extract this into a helper method for consistency and potential reuse?
+ def _create_state_migrations(self, model: DeclarativeStreamModel, config: Config) -> List[Any]: + if model.state_migrations: + return [ + self._create_component_from_model(state_migration, config, declarative_stream=model) + for state_migration in model.state_migrations + ] + return [] + def _migrate_state(self, config: Config, model: DeclarativeStreamModel) -> None: stream_state = self._connector_state_manager.get_stream_state( stream_name=model.name, namespace=None ) - if model.state_migrations: - state_transformations = [ - self._create_component_from_model(state_migration, config, declarative_stream=model) - for state_migration in model.state_migrations - ] - else: - state_transformations = [] + state_transformations = self._create_state_migrations(model, config) stream_state = self.apply_stream_state_migrations(state_transformations, stream_state) self._connector_state_manager.update_state_for_stream(stream_name=model.name, namespace=None, value=stream_state)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(7 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(3 hunks)unit_tests/sources/declarative/parsers/testing_components.py
(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
ModelToComponentFactory
(648-4168)create_component
(789-822)airbyte_cdk/sources/connector_state_manager.py (1)
ConnectorStateManager
(32-161)airbyte_cdk/sources/declarative/yaml_declarative_source.py (2)
YamlDeclarativeSource
(17-69)_parse
(62-69)
unit_tests/sources/declarative/parsers/testing_components.py (1)
airbyte_cdk/sources/declarative/migrations/state_migration.py (1)
StateMigration
(7-24)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
get_stream_state
(143-145)airbyte_cdk/sources/connector_state_manager.py (2)
get_stream_state
(53-67)update_state_for_stream
(69-79)
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
[error] 1-1: ruff format would reformat this file. Run 'ruff format' to fix code style issues.
unit_tests/sources/declarative/parsers/testing_components.py
[error] 1-1: ruff format would reformat this file. Run 'ruff format' to fix code style issues.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1954-1954: Call to untyped function "_migrate_state" in typed context [no-untyped-call]
[error] 2116-2116: Function is missing a type annotation [no-untyped-def]
[error] 2201-2201: Argument 1 to "get_stream_state" of "ConnectorStateManager" has incompatible type "str | None"; expected "str" [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (7)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)
3328-3351
: LGTM: passing stream_state explicitly into the cursor factory.This aligns with the new “migrate-before-create” flow and keeps state handling explicit. Nice.
3462-3472
: Confirm API surface: is state_manager still a supported kwarg here?Earlier you used create_concurrent_cursor_from_datetime_based_cursor without state_manager (Line 3344), but here you pass it. Should we drop this arg for consistency, or is the method intentionally accepting an override manager in some paths, wdyt?
3490-3531
: Great coverage for migration-before-stream-creation.This test clearly validates that state_migrations are applied and persisted prior to cursor construction.
3288-3289
: Ruff format applied — linter clean.
Ranruff format
andruff check --fix
on unit_tests/sources/declarative/parsers/test_model_to_component_factory.py: 1 file reformatted; All checks passed — wdyt?unit_tests/sources/declarative/parsers/testing_components.py (1)
6-10
: Ruff format applied — one file reformatted and error fixed.
unit_tests/sources/declarative/parsers/testing_components.py was reformatted (1 file reformatted, 1 error fixed). Could you commit and push the reformatted file so CI is satisfied, wdyt?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1267-1267
: Good addition of stream_state parameter!The addition of the explicit
stream_state
parameter to the concurrent cursor factory methods is a clean approach that ensures state migrations happen before cursor creation. This aligns well with the PR's breaking change note about state migration timing.Also applies to: 1491-1491
1568-1568
: Good consistent addition of stream_state parameter!The addition of
stream_state
parameter tocreate_concurrent_cursor_from_perpartition_cursor
maintains consistency with the other cursor creation methods. The implementation properly passes this through to the cursor construction.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1619-1631
: Remove unexpected kwarg in partial: passingstate_manager
will raise at runtime.
create_concurrent_cursor_from_datetime_based_cursor
doesn’t acceptstate_manager
; keeping it in thepartial(...)
will cause aTypeError
when the factory is invoked. Can we drop it, since state is now supplied viastream_state
, wdyt?- cursor_factory = ConcurrentCursorFactory( - partial( - self.create_concurrent_cursor_from_datetime_based_cursor, - state_manager=state_manager, - model_type=model_type, - component_definition=component_definition, - stream_name=stream_name, - stream_namespace=stream_namespace, - config=config, - message_repository=NoopMessageRepository(), - ) - ) + cursor_factory = ConcurrentCursorFactory( + partial( + self.create_concurrent_cursor_from_datetime_based_cursor, + model_type=model_type, + component_definition=component_definition, + stream_name=stream_name, + stream_namespace=stream_namespace, + config=config, + message_repository=NoopMessageRepository(), + ) + )
1261-1273
: Update remaining call sites to pass stream_state (replace state_manager)?rg found lingering call sites that still use the old parameter or omit stream_state — unit tests and an internal call need updating; no partial(...) matches were found.
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py: lines ~3344, 3464, 3475, 3705, 3805, 3817
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: def at ~1261, def at ~1485, callsite at ~2232 (missing/incorrect arg)
Can you update these call sites to pass the new stream_state (MutableMapping) instead of state_manager and ensure the internal call at line ~2232 forwards stream_state? wdyt?
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1346-1354
: Avoid mutating the caller-providedstream_state
in place.Shall we shallow-copy before applying the runtime lookback adjustment to prevent surprising side effects for callers that pass a shared mapping, wdyt?
- if runtime_lookback_window and stream_state_value: - new_stream_state = ( + if runtime_lookback_window and stream_state_value: + stream_state = dict(stream_state) # avoid mutating input mapping + new_stream_state = ( connector_state_converter.parse_timestamp(stream_state_value) - runtime_lookback_window ) stream_state[cursor_field.cursor_field_key] = connector_state_converter.output_format( new_stream_state )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(7 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(3 hunks)unit_tests/sources/declarative/parsers/testing_components.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/parsers/testing_components.py
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (5)
airbyte_cdk/sources/connector_state_manager.py (1)
ConnectorStateManager
(32-161)airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
state
(199-219)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
state
(579-580)state
(583-586)name
(106-114)name
(117-119)airbyte_cdk/models/airbyte_protocol.py (3)
AirbyteStateMessage
(67-75)AirbyteStreamState
(55-57)AirbyteStateBlob
(15-50)airbyte_cdk/sources/declarative/yaml_declarative_source.py (2)
YamlDeclarativeSource
(17-69)_parse
(62-69)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1954-1954: poetry run mypy --config-file mypy.ini airbyte_cdk: Call to untyped function "_migrate_state" in typed context [no-untyped-call]
[error] 2116-2116: poetry run mypy --config-file mypy.ini airbyte_cdk: Function is missing a type annotation [no-untyped-def]
[error] 2203-2203: poetry run mypy --config-file mypy.ini airbyte_cdk: Argument 1 to "get_stream_state" of "ConnectorStateManager" has incompatible type "str | None"; expected "str" [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
🔇 Additional comments (8)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)
3328-3328
: LGTM! Factory initialization with connector state manager looks good.The initialization correctly includes the
connector_state_manager
parameter, aligning with the new API changes.
3349-3349
: LGTM! Stream state parameter correctly passed to cursor creation.The
stream_state
parameter is being passed directly to the cursor creation method, which aligns with the updated API that no longer performs inline state migrations.
3489-3557
: Excellent test coverage for state migrations during stream creation!This test validates that state migrations run before partition router construction, which addresses the core issue mentioned in the PR objectives. The test correctly:
- Creates a connector state manager with initial state
- Uses a factory with the state manager
- Verifies that the cursor state contains the expected migrated partitions
The assertion confirms that the
TestingStateMigration
component successfully transformed the state into the expected partition format.
3559-3660
: Great test for concurrent cursor with parent-child state migrations!This test effectively validates that state migrations work correctly for both child and parent streams in a SubstreamPartitionRouter scenario. The test demonstrates:
- Proper migration of the child stream state (lookback_window set to 20)
- Correct parent stream state migration (updated_at changed to "2024-02-01T00:00:00.000000+0000")
This thoroughly covers the use case mentioned in the PR description about custom SubstreamPartitionRouter implementations that modify parent state.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
1954-1962
: State migration before partition-router construction looks good.This aligns with the PR objective and should prevent pre‑migration mutations by custom routers. Can we confirm
_migrate_state
is idempotent and cheap to call when a stream is constructed multiple times, wdyt?
1485-1540
: Incrementing-count concurrent cursor wiring looks correct.The added
stream_state
is plumbed through and the converter is sequential as required. All good from my side, wdyt?
1561-1572
: Per-partition factory signature consistent with new state flow.Accepting
stream_state
here aligns with_build_concurrent_cursor
andConcurrentPerPartitionCursor
. Looks good.
2210-2221
: Callers updated to passstream_state
: good coverage.All three branches in
_build_concurrent_cursor
now supplystream_state
, which should keep behavior consistent across cursor types. Thanks!Also applies to: 2223-2231, 2232-2241
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
PyTest Results (Fast)3 757 tests ±0 3 745 ✅ ±0 6m 27s ⏱️ +22s Results for commit 0b159f7. ± Comparison against base commit c67c556. This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
PyTest Results (Full)3 760 tests ±0 3 748 ✅ ±0 11m 4s ⏱️ -2s Results for commit 0b159f7. ± Comparison against base commit c67c556. This pull request removes 2 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1491-1540
: Add stream_state to remaining create_concurrent_cursor_from_incrementing_count_cursor call sitesVerification found these call sites missing stream_state:
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1485
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1486
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1487
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1488
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1489
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1490
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1491
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1492
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1493
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1494
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1495
Can you update these call sites to pass/forward stream_state (or confirm why they should not), wdyt?
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
1950-1962
: Migrating state before building the stream is the right sequencingThis ensures partition routers are instantiated with migrated state and addresses the Slack v7 issue described in the PR. Looks solid. Do you want to add a brief comment here to document that ordering guarantee, wdyt?
2116-2132
: Typed, centralized _migrate_state is clean; consider no-op early returnThe implementation is clear and typed. Do we want to skip update_state_for_stream if there are no migrations or if the state didn't change to avoid unnecessary writes, wdyt?
def _migrate_state(self, model: DeclarativeStreamModel, config: Config) -> None: stream_name = model.name or "" - stream_state = self._connector_state_manager.get_stream_state( + original_state = self._connector_state_manager.get_stream_state( stream_name=stream_name, namespace=None ) - if model.state_migrations: - state_transformations = [ + if not model.state_migrations: + return + state_transformations = [ self._create_component_from_model(state_migration, config, declarative_stream=model) for state_migration in model.state_migrations - ] - else: - state_transformations = [] - stream_state = self.apply_stream_state_migrations(state_transformations, stream_state) + ] + stream_state = self.apply_stream_state_migrations(state_transformations, dict(original_state)) + if stream_state == original_state: + return self._connector_state_manager.update_state_for_stream( stream_name=stream_name, namespace=None, value=stream_state )
1618-1631
: Drop unused kwarg in partial to avoid confusionThe partial passes state_manager to create_concurrent_cursor_from_datetime_based_cursor, but that function doesn't consume it (it only accepts **kwargs). Shall we remove it to prevent future misreads, wdyt?
- cursor_factory = ConcurrentCursorFactory( - partial( - self.create_concurrent_cursor_from_datetime_based_cursor, - state_manager=state_manager, - model_type=model_type, - component_definition=component_definition, - stream_name=stream_name, - stream_namespace=stream_namespace, - config=config, - message_repository=NoopMessageRepository(), - ) - ) + cursor_factory = ConcurrentCursorFactory( + partial( + self.create_concurrent_cursor_from_datetime_based_cursor, + model_type=model_type, + component_definition=component_definition, + stream_name=stream_name, + stream_namespace=stream_namespace, + config=config, + message_repository=NoopMessageRepository(), + ) + )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
get_stream_state
(130-141)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
get_stream_state
(170-186)airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
get_stream_state
(143-145)airbyte_cdk/sources/declarative/partition_routers/partition_router.py (1)
get_stream_state
(22-40)airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)
get_stream_state
(376-399)airbyte_cdk/sources/connector_state_manager.py (2)
get_stream_state
(53-67)update_state_for_stream
(69-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
2203-2217
: Fetching stream_state once in _build_concurrent_cursor is correctGrabbing the state up-front and plumbing it through ensures all cursor types get the migrated snapshot. Looks good.
2216-2241
: Forwarding stream_state to all cursor builders is consistentAll three branches (per-partition, incrementing-count, datetime-based) now forward the same stream_state. Nice. Do we also want to thread through a namespace if/when the model supports it later, to future-proof this path, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3345-3352
: LGTM on explicit stream_state propagation — migrate remaining call sitesFound several remaining call sites still using the legacy state_manager parameter or omitting stream_state; these should be migrated to the new API.
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py: 3465, 3476, 3706, 3806, 3818 (calls include state_manager=connector_state_manager)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: 2233 (call to create_concurrent_cursor_from_datetime_based_cursor without stream_state)
Could you update these to pass stream_state (and remove legacy state_manager usage) for consistency? Wdyt?
🧹 Nitpick comments (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
3329-3329
: Nit: avoid enabling connector‑builder message emission where unusedHere we don't assert on emitted messages. Dropping
emit_connector_builder_messages=True
trims overhead and keeps tests focused. Shall we simplify, wdyt?-connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) +connector_builder_factory = ModelToComponentFactory()
3490-3558
: Great coverage: verifies state migrations run before stream constructionThis test clearly exercises “migrate before build” for a datetime cursor. One small enhancement: do we also want to assert the state manager was updated with the migrated state (ensuring
update_state_for_stream
ran)? Adding this makes the contract explicit, wdyt?stream = factory.create_component( model_type=DeclarativeStreamModel, component_definition=YamlDeclarativeSource._parse(content), config=input_config, ) assert stream.cursor.state["states"] == [ {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_1"}}, {"cursor": {"updated_at": stream_state["updated_at"]}, "partition": {"type": "type_2"}}, ] + # Verify ConnectorStateManager holds the migrated state as well + assert connector_state_manager.get_stream_state("test", None) == stream.cursor.state
3561-3660
: Solid: validates migrations affecting both child and parent statesNice end‑to‑end on parent/child migration. For extra confidence in the ordering guarantee (migrations strictly before partition router instantiation), would you consider a tiny test double partition router that snapshots parent state in its init and exposes it for assertion? That would catch regressions in construction order directly, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
ModelToComponentFactory
(648-4171)create_component
(789-822)airbyte_cdk/sources/declarative/yaml_declarative_source.py (1)
_parse
(62-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
Show resolved
Hide resolved
I don't care too much about the problem of re-using state in those tests as they don't do anything with it
What
Maybe this is a bug fix, maybe this is a new feature. Basically, we wanted to migration source-slack on CDK v7 but it was reimplementing SubstreamPartitionRouter to make changes to the parent state.
The problem is that we were building the partition router before doing state migrations.
How
The first thing when
create_default_stream
is called is to update the state in the ConnectorStateManager.Breaking changes: This means that people can't call the create cursor methods and assume the state will be migrated.
Summary by CodeRabbit
New Features
Refactor
Tests
Chores
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.
Note
Auto-merge may have been disabled. Please check the PR status to confirm.