feat: add LazyPartition trait for LazyMemoryExec, migrate generate_series to partitions#20369

Open
ethan-tyler wants to merge 6 commits into apache:main from ethan-tyler:lazy-partition-migration

Conversation

@ethan-tyler

Which issue does this PR close?

Rationale for this change

LazyMemoryExec currently uses generator closures (LazyBatchGenerator) as its partition interface. This PR replaces that interface with a LazyPartition trait, which gives partitions a proper abstraction while keeping the old generator path working through a legacy adapter.

generate_series/range are migrated to native LazyPartition as the first consumer. Proto roundtrip is updated to handle both native and legacy forms.

What changes are included in this PR?

  • LazyPartition trait as the native partition interface for LazyMemoryExec
  • LazyBatchGeneratorPartition adapter allowing existing generators to keep working
  • Deprecated compatibility APIs preserved: LazyMemoryExec::try_new, LazyMemoryExec::generators, GenerateSeriesTable::as_generator.
  • generate_series/range migrated to native GenerateSeriesPartition
  • Shared helper for generate series state construction (dedupes native and deprecated paths)
  • Proto roundtrip updated for native GenerateSeriesPartition with legacy adapter fallback
  • Core coop test helper updated to use LazyPartition
  • Proto regression test coverage added
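
As a rough illustration of the adapter idea in the list above, here is a minimal std-only sketch. It is not the code in this PR: `Batch`, `BatchGenerator`, `Partition`, `GeneratorPartition`, and `Counter` are hypothetical stand-ins for RecordBatch, LazyBatchGenerator, LazyPartition, LazyBatchGeneratorPartition, and a real generator, and the actual trait signatures may well differ.

```rust
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

/// Stand-in for Arrow's RecordBatch (assumption, keeps the sketch std-only).
type Batch = Vec<i64>;

/// Legacy-style interface: a stateful generator polled for one batch at a time.
/// Hypothetical simplification of LazyBatchGenerator.
trait BatchGenerator: Debug + Send {
    fn next_batch(&mut self) -> Option<Batch>;
}

/// Partition-style interface: `execute` hands back a stream of batches.
/// Hypothetical simplification of LazyPartition.
trait Partition: Debug + Send + Sync {
    fn execute(&self) -> Box<dyn Iterator<Item = Batch> + Send>;
}

/// Adapter that exposes a legacy generator through the partition interface.
#[derive(Debug)]
struct GeneratorPartition {
    generator: Arc<Mutex<dyn BatchGenerator>>,
}

impl Partition for GeneratorPartition {
    fn execute(&self) -> Box<dyn Iterator<Item = Batch> + Send> {
        let generator = Arc::clone(&self.generator);
        // Each pull locks the shared generator and asks it for the next batch.
        Box::new(std::iter::from_fn(move || {
            generator.lock().unwrap().next_batch()
        }))
    }
}

/// Toy generator producing the integers `next..=end` in fixed-size batches.
#[derive(Debug)]
struct Counter {
    next: i64,
    end: i64,
    batch_size: usize,
}

impl BatchGenerator for Counter {
    fn next_batch(&mut self) -> Option<Batch> {
        if self.next > self.end {
            return None;
        }
        let batch: Batch = (self.next..=self.end).take(self.batch_size).collect();
        self.next += batch.len() as i64;
        Some(batch)
    }
}

/// Drain every batch a partition produces.
fn collect_batches(partition: &dyn Partition) -> Vec<Batch> {
    partition.execute().collect()
}

fn main() {
    let partition = GeneratorPartition {
        generator: Arc::new(Mutex::new(Counter { next: 1, end: 5, batch_size: 2 })),
    };
    println!("{:?}", collect_batches(&partition));
}
```

In this sketch the adapter shares the generator's state, so a second `execute` call continues from wherever the generator stopped rather than restarting the series. Whether the real adapter behaves that way is not stated here.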

Are these changes tested?

Yes, with the following commands:

  • cargo test -p datafusion-physical-plan lazy_memory_tests --lib
  • cargo test -p datafusion-functions-table generate_series_tests --lib
  • cargo test -p datafusion-proto roundtrip_generate_series --test proto_integration
  • cargo test -p datafusion execution::coop --test core_integration
  • cargo check -p datafusion-physical-plan -p datafusion-functions-table -p datafusion-proto -p datafusion

Are there any user-facing changes?

No. LazyBatchGenerator and the existing public APIs remain available (deprecated).

@github-actions github-actions bot added core Core DataFusion crate proto Related to proto crate functions Changes to functions implementation physical-plan Changes to the physical-plan crate labels Feb 15, 2026
@ethan-tyler

@2010YOUY01 @Jefffrey - this might be of interest to you.

@kosiew kosiew left a comment

@ethan-tyler

Thanks for working on this.

Self { schema, args }
}

pub fn as_partition(&self, batch_size: usize) -> Result<Arc<dyn LazyPartition>> {

GenerateSeriesTable::as_partition currently constructs GenerateSeriesPartition without validating timestamp timezone input.
Previously as_generator parsed timezone eagerly and could fail during planning. Consider validating in GenerateSeriesPartition::new (or a try_new) to keep error timing consistent and fail earlier.
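
A minimal sketch of what eager validation in a fallible constructor could look like, assuming a hypothetical `parse_fixed_offset` helper in place of the real timezone parser. `SeriesArgs` and `SeriesPartition` are illustrative stand-ins, not the types in this PR.

```rust
/// Hypothetical stand-in for the series arguments.
#[derive(Debug, Clone, PartialEq)]
struct SeriesArgs {
    start: i64,
    end: i64,
    step: i64,
    tz: Option<String>,
}

/// Hypothetical stand-in for GenerateSeriesPartition.
#[derive(Debug)]
struct SeriesPartition {
    args: SeriesArgs,
    batch_size: usize,
    offset_secs: i32,
}

/// Parse exactly two ASCII digits.
fn two_digits(s: &str) -> Option<i32> {
    if s.len() == 2 && s.bytes().all(|b| b.is_ascii_digit()) {
        s.parse().ok()
    } else {
        None
    }
}

/// Hypothetical parser: accepts "UTC" or a fixed offset such as "+05:30".
fn parse_fixed_offset(tz: &str) -> Result<i32, String> {
    if tz == "UTC" {
        return Ok(0);
    }
    let invalid = || format!("invalid timezone: {tz}");
    let (sign, rest) = match tz.as_bytes().first() {
        Some(b'+') => (1, &tz[1..]),
        Some(b'-') => (-1, &tz[1..]),
        _ => return Err(invalid()),
    };
    let (hh, mm) = rest.split_once(':').ok_or_else(invalid)?;
    let hours = two_digits(hh).ok_or_else(invalid)?;
    let minutes = two_digits(mm).ok_or_else(invalid)?;
    if hours > 23 || minutes > 59 {
        return Err(invalid());
    }
    Ok(sign * (hours * 3600 + minutes * 60))
}

impl SeriesPartition {
    /// Validate everything up front so bad input fails at construction
    /// (planning) time instead of when the first batch is produced.
    fn try_new(args: SeriesArgs, batch_size: usize) -> Result<Self, String> {
        if batch_size == 0 {
            return Err("batch_size must be greater than 0".to_string());
        }
        let offset_secs = match args.tz.as_deref() {
            Some(tz) => parse_fixed_offset(tz)?,
            None => 0,
        };
        Ok(Self { args, batch_size, offset_secs })
    }
}

fn main() {
    let args = SeriesArgs { start: 0, end: 10, step: 1, tz: Some("Mars/Olympus".into()) };
    // The invalid timezone is rejected here, before any batch is generated.
    println!("{:?}", SeriesPartition::try_new(args, 8192));
}
```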

@@ -268,10 +408,10 @@ impl DisplayAs for LazyMemoryExec {
write!(
f,
"LazyMemoryExec: partitions={}, batch_generators=[{}]",

DisplayAs still prints batch_generators= even though the primary abstraction is now partitions. Should the label be updated to match?

/// Get the batch generators
#[deprecated(note = "Use LazyMemoryExec::partitions instead")]
#[expect(deprecated)]
pub fn generators(&self) -> &Vec<Arc<RwLock<dyn LazyBatchGenerator>>> {

Not the fault of this PR, but generators() returns &Vec<_>. Since this is a compatibility accessor, returning a slice (&[_]) would avoid exposing the container type and align with the style of the other accessors.
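
For illustration only, a slice-returning accessor could look like the sketch below. `Generator`, `Named`, and `Exec` are hypothetical stand-ins for LazyBatchGenerator, a concrete generator, and LazyMemoryExec.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for LazyBatchGenerator.
trait Generator: Send + Sync {
    fn name(&self) -> &str;
}

struct Named(&'static str);

impl Generator for Named {
    fn name(&self) -> &str {
        self.0
    }
}

/// Hypothetical stand-in for LazyMemoryExec.
struct Exec {
    generators: Vec<Arc<RwLock<dyn Generator>>>,
}

impl Exec {
    /// Returns a slice, so callers do not depend on the field being a Vec.
    fn generators(&self) -> &[Arc<RwLock<dyn Generator>>] {
        &self.generators
    }
}

/// Helper that builds a shared, lockable generator.
fn named(name: &'static str) -> Arc<RwLock<dyn Generator>> {
    Arc::new(RwLock::new(Named(name)))
}

fn main() {
    let exec = Exec { generators: vec![named("a"), named("b")] };
    for generator in exec.generators() {
        println!("{}", generator.read().unwrap().name());
    }
}
```

Iteration, indexing, and `len()` behave the same through the slice. Callers that rely on Vec-only methods such as `capacity()` would need to change, which is worth weighing for a deprecated accessor.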

}

#[allow(deprecated)]
fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {

The native GenerateSeriesPartition path uses serialize_generate_series_args, but legacy generator downcasts still manually reconstruct protobuf args.

Could we extract a helper that converts legacy generator variants into GenSeriesArgs first, then reuse serialize_generate_series_args to reduce duplicated conversion logic?
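
A rough sketch of the shape being suggested, assuming hypothetical types throughout: `SeriesArgs` stands in for GenSeriesArgs, `serialize_args` for serialize_generate_series_args (it returns a string here instead of a protobuf message), and the two `Legacy*` structs for the legacy generator variants.

```rust
use std::any::Any;

/// Hypothetical stand-in for GenSeriesArgs.
#[derive(Debug, Clone, PartialEq)]
enum SeriesArgs {
    Int64 { start: i64, end: i64, step: i64 },
    Timestamp { start: i64, end: i64, step_nanos: i64, tz: Option<String> },
}

/// Hypothetical legacy generator variants that are recovered by downcasting.
struct LegacyIntSeries { start: i64, end: i64, step: i64 }
struct LegacyTimestampSeries { start: i64, end: i64, step_nanos: i64, tz: Option<String> }

/// Hypothetical native partition that already stores normalized args.
struct NativePartition { args: SeriesArgs }

/// Single serializer shared by both paths (a string stands in for protobuf).
fn serialize_args(args: &SeriesArgs) -> String {
    match args {
        SeriesArgs::Int64 { start, end, step } => format!("int64:{start}:{end}:{step}"),
        SeriesArgs::Timestamp { start, end, step_nanos, tz } => format!(
            "timestamp:{start}:{end}:{step_nanos}:{}",
            tz.as_deref().unwrap_or("")
        ),
    }
}

/// Normalize a legacy generator into SeriesArgs; None if the type is unknown.
fn legacy_to_args(generator: &dyn Any) -> Option<SeriesArgs> {
    if let Some(g) = generator.downcast_ref::<LegacyIntSeries>() {
        Some(SeriesArgs::Int64 { start: g.start, end: g.end, step: g.step })
    } else if let Some(g) = generator.downcast_ref::<LegacyTimestampSeries>() {
        Some(SeriesArgs::Timestamp {
            start: g.start,
            end: g.end,
            step_nanos: g.step_nanos,
            tz: g.tz.clone(),
        })
    } else {
        None
    }
}

fn serialize_native(partition: &NativePartition) -> String {
    serialize_args(&partition.args)
}

/// The legacy path only normalizes, then reuses the shared serializer.
fn serialize_legacy(generator: &dyn Any) -> Option<String> {
    legacy_to_args(generator).map(|args| serialize_args(&args))
}

fn main() {
    let legacy = LegacyIntSeries { start: 1, end: 10, step: 2 };
    println!("{:?}", serialize_legacy(&legacy));
}
```

With this split, a new series variant needs one normalization arm and one serializer arm, and the native and legacy paths cannot drift apart in how they encode the same arguments.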

pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> {
let partition_count = partitioning.partition_count();
let generator_count = self.batch_generators.len();
let generator_count = self.partitions.len();

The variable is still named generator_count, although it now counts partitions?

ethan-tyler and others added 5 commits March 11, 2026 12:18
…tions

Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
Signed-off-by: Ethan Urbanski <ethanurbanski@gmail.com>
@ethan-tyler ethan-tyler force-pushed the lazy-partition-migration branch from 5bbb386 to 3253f6c Compare March 11, 2026 16:20
@ethan-tyler

@ethan-tyler

Thanks for working on this.

@kosiew I must have forgotten to reply! Thanks for the review, I have addressed your feedback. Let me know if there is anything more, thanks!

@kosiew

kosiew commented Mar 18, 2026

@ethan-tyler

Just want to confirm that you have pushed the changes already.

I noted a few outstanding points:

  1. GenerateSeriesPartition::try_new does not eagerly validate timestamp timezones

    The fail-fast behavior called out in the review remains unresolved.

    • generate_series.rs (line 246)
    • generate_series.rs (line 274)

    These locations only validate:

    batch_size == 0

    Timezone parsing still happens later in:

    build_generate_series_state
    • generate_series.rs (line 362)

  2. Naming cleanup in try_set_partitioning was not completed

    The local variable is still named:

    generator_count

    even though it now counts partitions.

    • memory.rs (line 372)

  3. DisplayAs still emits batch_generators=

    • memory.rs (line 418)

  4. Proto code still duplicates the legacy generator-to-protobuf conversion

    The legacy path does not yet normalize legacy variants into GenSeriesArgs and reuse:

    serialize_generate_series_args

    The native partition path uses the helper, but the legacy adapter path still manually reconstructs protobuf args.

Labels

core Core DataFusion crate functions Changes to functions implementation physical-plan Changes to the physical-plan crate proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Simplify LazyMemoryExec with SendableRecordBatchStream

2 participants