Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2208,6 +2208,14 @@ def _build_concurrent_cursor(
and stream_slicer
and not isinstance(stream_slicer, SinglePartitionRouter)
):
if isinstance(model.incremental_sync, IncrementingCountCursorModel):
# We don't currently support usage of partition routing and IncrementingCountCursor at the
# same time because we didn't solve for design questions like what the lookback window would
# be as well as global cursor fall backs. We have not seen customers that have needed both
# at the same time yet and are currently punting on this until we need to solve it.
raise ValueError(
f"The low-code framework does not currently support usage of a PartitionRouter and an IncrementingCountCursor at the same time. Please specify only one of these options for stream {stream_name}."
)
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3669,6 +3669,58 @@ def test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations
)


def test_incrementing_count_cursor_with_partition_router_raises_error():
content = """
type: DeclarativeStream
primary_key: "id"
name: test
schema_loader:
type: InlineSchemaLoader
schema:
$schema: "http://json-schema.org/draft-07/schema"
type: object
properties:
id:
type: string
incremental_sync:
type: "IncrementingCountCursor"
cursor_field: "mid"
start_value: "0"
retriever:
type: SimpleRetriever
name: test
requester:
type: HttpRequester
name: "test"
url_base: "https://api.test.com/v3/"
http_method: "GET"
authenticator:
type: NoAuth
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
partition_router:
type: ListPartitionRouter
cursor_field: arbitrary
values:
- item_1
- item_2
"""

factory = ModelToComponentFactory(
emit_connector_builder_messages=True, connector_state_manager=ConnectorStateManager()
)

with pytest.raises(ValueError):
factory.create_component(
model_type=DeclarativeStreamModel,
component_definition=YamlDeclarativeSource._parse(content),
config=input_config,
)


def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
"""
Validates a special case for when the start_time.datetime_format and end_time.datetime_format are defined, the date to
Expand Down
Loading