From 94bfec19ad88d0953050fa49d9ce4c0032ea6e05 Mon Sep 17 00:00:00 2001
From: Huaijin
Date: Mon, 24 Nov 2025 19:22:07 +0800
Subject: [PATCH 1/4] feat: enhance VortexOpener with selection support in ScanBuilder

Signed-off-by: Huaijin
---
 vortex-datafusion/src/persistent/opener.rs | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index a0fa396b0c4..531a34b8671 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -39,7 +39,7 @@ use vortex::expr::root;
 use vortex::expr::select;
 use vortex::layout::LayoutReader;
 use vortex::metrics::VortexMetrics;
-use vortex::scan::ScanBuilder;
+use vortex::scan::{ScanBuilder, Selection};
 use vortex::session::VortexSession;
 use vortex_utils::aliases::dash_map::DashMap;
 use vortex_utils::aliases::dash_map::Entry;
@@ -174,6 +174,7 @@ impl FileOpener for VortexOpener {
         let metrics = self.metrics.clone();
         let layout_reader = self.layout_readers.clone();
         let has_output_ordering = self.has_output_ordering;
+        let extensions = file_meta.extensions.clone();
 
         let projected_schema = match projection.as_ref() {
             None => logical_schema.clone(),
@@ -305,6 +306,9 @@ impl FileOpener for VortexOpener {
         };
 
         let mut scan_builder = ScanBuilder::new(session, layout_reader);
+        if let Some(initial_plan) = create_initial_plan(extensions) {
+            scan_builder = scan_builder.with_selection(initial_plan);
+        }
         if let Some(file_range) = file_meta.range {
             scan_builder = apply_byte_range(
                 file_range,
@@ -407,6 +411,17 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
     start_row..u64::min(row_count, end_row)
 }
 
+fn create_initial_plan(
+    extensions: Option<Arc<dyn Any + Send + Sync>>,
+) -> Option<Selection> {
+    if let Some(extensions) = extensions
+        && let Some(selection) = extensions.downcast_ref::<Selection>()
+    {
+        return Some(selection.clone());
+    }
+    None
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::LazyLock;

From 9f4cacd384c6e018930ea114956481f6c3741f73 Mon Sep 17 00:00:00 2001
From: Huaijin
Date: Thu, 27 Nov 2025 14:36:56 +0800
Subject: [PATCH 2/4] add test case

Signed-off-by: Huaijin
---
 vortex-datafusion/src/persistent/opener.rs | 241 ++++++++++++++++++++-
 1 file changed, 239 insertions(+), 2 deletions(-)

diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 531a34b8671..afb92891506 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -39,7 +39,8 @@ use vortex::expr::root;
 use vortex::expr::select;
 use vortex::layout::LayoutReader;
 use vortex::metrics::VortexMetrics;
-use vortex::scan::{ScanBuilder, Selection};
+use vortex::scan::ScanBuilder;
+use vortex::scan::Selection;
 use vortex::session::VortexSession;
 use vortex_utils::aliases::dash_map::DashMap;
 use vortex_utils::aliases::dash_map::Entry;
@@ -174,7 +175,7 @@ impl FileOpener for VortexOpener {
         let metrics = self.metrics.clone();
         let layout_reader = self.layout_readers.clone();
         let has_output_ordering = self.has_output_ordering;
-        let extensions = file_meta.extensions.clone();
+        let extensions = file.extensions.clone();
 
         let projected_schema = match projection.as_ref() {
             None => logical_schema.clone(),
@@ -448,6 +449,7 @@ mod tests {
     use rstest::rstest;
     use vortex::VortexSessionDefault;
     use vortex::arrow::FromArrowArray;
+    use vortex::buffer::Buffer;
     use vortex::file::WriteOptionsSessionExt;
     use vortex::io::ObjectStoreWriter;
     use vortex::io::VortexWrite;
@@ -532,6 +534,44 @@ mod tests {
         }
     }
 
+    fn make_test_batch_with_10_rows() -> RecordBatch {
+        record_batch!(
+            (
+                "a",
+                Int32,
+                vec![
+                    Some(0),
+                    Some(1),
+                    Some(2),
+                    Some(3),
+                    Some(4),
+                    Some(5),
+                    Some(6),
+                    Some(7),
+                    Some(8),
+                    Some(9)
+                ]
+            ),
+            (
+                "b",
+                Utf8,
+                vec![
+                    Some("row0"),
+                    Some("row1"),
+                    Some("row2"),
+                    Some("row3"),
+                    Some("row4"),
+                    Some("row5"),
+                    Some("row6"),
+                    Some("row7"),
+                    Some("row8"),
+                    Some("row9")
+                ]
+            )
+        )
+        .unwrap()
+    }
+
     #[rstest]
     #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))]
     // If we don't have a physical expr adapter, we just drop filters on partition values
@@ -928,4 +968,201 @@ mod tests {
         Ok(())
     }
+
+    #[tokio::test]
+    // Test that Selection::IncludeByIndex filters to specific row indices.
+    async fn test_selection_include_by_index() -> anyhow::Result<()> {
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let file_meta = make_meta(file_path, data_size);
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
+            vec![1, 3, 5, 7],
+        ))));
+
+        let opener = VortexOpener {
+            session: SESSION.clone(),
+            object_store: object_store.clone(),
+            projection: Some(vec![0, 1].into()),
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            partition_fields: vec![],
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            logical_schema: table_schema.clone(),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        };
+
+        let stream = opener.open(file_meta, file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+        let format_opts = FormatOptions::new().with_types_info(true);
+
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+------+
+        | a     | b    |
+        | Int32 | Utf8 |
+        +-------+------+
+        | 1     | row1 |
+        | 3     | row3 |
+        | 5     | row5 |
+        | 7     | row7 |
+        +-------+------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that Selection::ExcludeByIndex excludes specific row indices.
+    async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let file_meta = make_meta(file_path, data_size);
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(Selection::ExcludeByIndex(Buffer::from_iter(
+            vec![0, 2, 4, 6, 8],
+        ))));
+
+        let opener = VortexOpener {
+            session: SESSION.clone(),
+            object_store: object_store.clone(),
+            projection: Some(vec![0, 1].into()),
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            partition_fields: vec![],
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            logical_schema: table_schema.clone(),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        };
+
+        let stream = opener.open(file_meta, file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+        let format_opts = FormatOptions::new().with_types_info(true);
+
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+------+
+        | a     | b    |
+        | Int32 | Utf8 |
+        +-------+------+
+        | 1     | row1 |
+        | 3     | row3 |
+        | 5     | row5 |
+        | 7     | row7 |
+        | 9     | row9 |
+        +-------+------+
+        ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that Selection::All returns all rows.
+    async fn test_selection_all() -> anyhow::Result<()> {
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let file_meta = make_meta(file_path, data_size);
+        let mut file = PartitionedFile::new(file_path.to_string(), data_size);
+        file.extensions = Some(Arc::new(Selection::All));
+
+        let opener = VortexOpener {
+            session: SESSION.clone(),
+            object_store: object_store.clone(),
+            projection: Some(vec![0].into()),
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            partition_fields: vec![],
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            logical_schema: table_schema.clone(),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        };
+
+        let stream = opener.open(file_meta, file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+
+        let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
+        assert_eq!(total_rows, 10);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    // Test that when no extensions are provided, all rows are returned (backward compatibility).
+    async fn test_selection_no_extensions() -> anyhow::Result<()> {
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        let batch = make_test_batch_with_10_rows();
+        let data_size =
+            write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
+
+        let table_schema = batch.schema();
+        let file_meta = make_meta(file_path, data_size);
+        let file = PartitionedFile::new(file_path.to_string(), data_size);
+        // file.extensions is None by default
+
+        let opener = VortexOpener {
+            session: SESSION.clone(),
+            object_store: object_store.clone(),
+            projection: Some(vec![0].into()),
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            partition_fields: vec![],
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            logical_schema: table_schema.clone(),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        };
+
+        let stream = opener.open(file_meta, file)?.await?;
+        let data = stream.try_collect::<Vec<_>>().await?;
+
+        let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
+        assert_eq!(total_rows, 10);
+
+        Ok(())
+    }
 
 }

From 6b49b97ac6a924ca43bc0c3e2e0d1d97bbe073c0 Mon Sep 17 00:00:00 2001
From: Huaijin
Date: Thu, 27 Nov 2025 19:00:10 +0800
Subject: [PATCH 3/4] apply suggestion

Signed-off-by: Huaijin
---
 vortex-datafusion/src/persistent/opener.rs | 48 +++++-----------------
 1 file changed, 11 insertions(+), 37 deletions(-)

diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index afb92891506..28674d9e419 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -536,37 +536,11 @@ mod tests {
 
     fn make_test_batch_with_10_rows() -> RecordBatch {
         record_batch!(
-            (
-                "a",
-                Int32,
-                vec![
-                    Some(0),
-                    Some(1),
-                    Some(2),
-                    Some(3),
-                    Some(4),
-                    Some(5),
-                    Some(6),
-                    Some(7),
-                    Some(8),
-                    Some(9)
-                ]
-            ),
+            ("a", Int32, (0..=9).map(Some).collect::<Vec<_>>()),
             (
                 "b",
                 Utf8,
-                vec![
-                    Some("row0"),
-                    Some("row1"),
-                    Some("row2"),
-                    Some("row3"),
-                    Some("row4"),
-                    Some("row5"),
-                    Some("row6"),
-                    Some("row7"),
-                    Some("row8"),
-                    Some("row9")
-                ]
+                (0..=9).map(|i| Some(format!("r{}", i))).collect::<Vec<_>>()
             )
         )
         .unwrap()
@@ -1015,10 +989,10 @@ mod tests {
         | a     | b    |
         | Int32 | Utf8 |
         +-------+------+
-        | 1     | row1 |
-        | 3     | row3 |
-        | 5     | row5 |
-        | 7     | row7 |
+        | 1     | r1   |
+        | 3     | r3   |
+        | 5     | r5   |
+        | 7     | r7   |
         +-------+------+
         ");
@@ -1071,11 +1045,11 @@ mod tests {
         | a     | b    |
         | Int32 | Utf8 |
         +-------+------+
-        | 1     | row1 |
-        | 3     | row3 |
-        | 5     | row5 |
-        | 7     | row7 |
-        | 9     | row9 |
+        | 1     | r1   |
+        | 3     | r3   |
+        | 5     | r5   |
+        | 7     | r7   |
+        | 9     | r9   |
         +-------+------+
         ");

From f26674e5f80fb4aed71e9f5ed52185e331ab30af Mon Sep 17 00:00:00 2001
From: Huaijin
Date: Sat, 29 Nov 2025 19:33:40 +0800
Subject: [PATCH 4/4] add doc

Signed-off-by: Huaijin
---
 vortex-datafusion/src/persistent/opener.rs | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 2a204669876..f6f6b0a454c 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -307,8 +307,8 @@ impl FileOpener for VortexOpener {
         };
 
         let mut scan_builder = ScanBuilder::new(session, layout_reader);
-        if let Some(initial_plan) = create_initial_plan(extensions) {
-            scan_builder = scan_builder.with_selection(initial_plan);
+        if let Some(selection) = get_selection_from_extensions(extensions) {
+            scan_builder = scan_builder.with_selection(selection);
         }
         if let Some(file_range) = file_meta.range {
             scan_builder = apply_byte_range(
@@ -412,7 +412,19 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
     start_row..u64::min(row_count, end_row)
 }
 
-fn create_initial_plan(
+/// Attempts to extract a `Selection` from the extensions object, if present.
+///
+/// This function is used to retrieve the row selection plan that may have been
+/// attached to a `PartitionedFile` via its `extensions` field.
+///
+/// # Arguments
+///
+/// * `extensions` - Optional type-erased extensions object that may contain a `Selection`
+///
+/// # Returns
+///
+/// Returns `Some(Selection)` if the extensions contain a valid `Selection`, otherwise `None`.
+fn get_selection_from_extensions(
     extensions: Option<Arc<dyn Any + Send + Sync>>,
 ) -> Option<Selection> {
     if let Some(extensions) = extensions