252 changes: 252 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
@@ -40,6 +40,7 @@ use vortex::expr::select;
use vortex::layout::LayoutReader;
use vortex::metrics::VortexMetrics;
use vortex::scan::ScanBuilder;
use vortex::scan::Selection;
use vortex::session::VortexSession;
use vortex_utils::aliases::dash_map::DashMap;
use vortex_utils::aliases::dash_map::Entry;
@@ -174,6 +175,7 @@ impl FileOpener for VortexOpener {
let metrics = self.metrics.clone();
let layout_reader = self.layout_readers.clone();
let has_output_ordering = self.has_output_ordering;
let extensions = file.extensions.clone();

let projected_schema = match projection.as_ref() {
None => logical_schema.clone(),
@@ -305,6 +307,9 @@ impl FileOpener for VortexOpener {
};

let mut scan_builder = ScanBuilder::new(session, layout_reader);
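// Seed the scan with any row selection the upstream source attached to this file.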
if let Some(initial_plan) = create_initial_plan(extensions) {
scan_builder = scan_builder.with_selection(initial_plan);
}
if let Some(file_range) = file_meta.range {
scan_builder = apply_byte_range(
file_range,
@@ -407,6 +412,17 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
start_row..u64::min(row_count, end_row)
}

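/// Recovers an initial row `Selection` from the file's opaque `extensions` field,
/// if the upstream source attached one; returns `None` otherwise.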
fn create_initial_plan(
extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
) -> Option<Selection> {
if let Some(extensions) = extensions
&& let Some(selection) = extensions.downcast_ref::<Selection>()
{
return Some(selection.clone());
}
None
}

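// Not part of this diff: a minimal sketch of how a caller seeds the opener with
// a row selection. `PartitionedFile::extensions` is an opaque
// `Arc<dyn std::any::Any + Send + Sync>`, so the opener recovers the `Selection`
// by downcasting in `create_initial_plan` above. The function name is
// hypothetical, and `PartitionedFile` and `Buffer` are assumed to be in scope.
#[allow(dead_code)]
fn attach_row_selection(mut file: PartitionedFile) -> PartitionedFile {
    // Scan only rows 1, 3, 5, and 7 of this file; all other rows are skipped
    // before filters and projections run.
    file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
        vec![1, 3, 5, 7],
    ))));
    file
}
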
#[cfg(test)]
mod tests {
use std::sync::LazyLock;
@@ -433,6 +449,7 @@ mod tests {
use rstest::rstest;
use vortex::VortexSessionDefault;
use vortex::arrow::FromArrowArray;
use vortex::buffer::Buffer;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::ObjectStoreWriter;
use vortex::io::VortexWrite;
@@ -517,6 +534,44 @@ mod tests {
}
}

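/// Builds the two-column test batch used by the selection tests:
/// "a" (Int32, values 0 through 9) and "b" (Utf8, "row0" through "row9").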
fn make_test_batch_with_10_rows() -> RecordBatch {
record_batch!(
(
"a",
Int32,
vec![
Some(0),
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
Some(6),
Some(7),
Some(8),
Some(9)
]
),
(
"b",
Utf8,
vec![
Some("row0"),
Some("row1"),
Some("row2"),
Some("row3"),
Some("row4"),
Some("row5"),
Some("row6"),
Some("row7"),
Some("row8"),
Some("row9")
]
)
)
.unwrap()
}

#[rstest]
#[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))]
// If we don't have a physical expr adapter, we just drop filters on partition values
@@ -913,4 +968,201 @@ mod tests {

Ok(())
}

#[tokio::test]
// Test that Selection::IncludeByIndex filters to specific row indices.
async fn test_selection_include_by_index() -> anyhow::Result<()> {
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;

let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let file_path = "/path/file.vortex";

let batch = make_test_batch_with_10_rows();
let data_size =
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;

let table_schema = batch.schema();
let file_meta = make_meta(file_path, data_size);
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
vec![1, 3, 5, 7],
))));

let opener = VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
projection: Some(vec![0, 1].into()),
filter: None,
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![],
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
logical_schema: table_schema.clone(),
batch_size: 100,
limit: None,
metrics: Default::default(),
layout_readers: Default::default(),
has_output_ordering: false,
};

let stream = opener.open(file_meta, file)?.await?;
let data = stream.try_collect::<Vec<_>>().await?;
let format_opts = FormatOptions::new().with_types_info(true);

assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+-------+------+
| a | b |
| Int32 | Utf8 |
+-------+------+
| 1 | row1 |
| 3 | row3 |
| 5 | row5 |
| 7 | row7 |
+-------+------+
");

Ok(())
}

#[tokio::test]
// Test that Selection::ExcludeByIndex excludes specific row indices.
async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;

let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let file_path = "/path/file.vortex";

let batch = make_test_batch_with_10_rows();
let data_size =
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;

let table_schema = batch.schema();
let file_meta = make_meta(file_path, data_size);
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
file.extensions = Some(Arc::new(Selection::ExcludeByIndex(Buffer::from_iter(
vec![0, 2, 4, 6, 8],
))));

let opener = VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
projection: Some(vec![0, 1].into()),
filter: None,
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![],
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
logical_schema: table_schema.clone(),
batch_size: 100,
limit: None,
metrics: Default::default(),
layout_readers: Default::default(),
has_output_ordering: false,
};

let stream = opener.open(file_meta, file)?.await?;
let data = stream.try_collect::<Vec<_>>().await?;
let format_opts = FormatOptions::new().with_types_info(true);

assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+-------+------+
| a | b |
| Int32 | Utf8 |
+-------+------+
| 1 | row1 |
| 3 | row3 |
| 5 | row5 |
| 7 | row7 |
| 9 | row9 |
+-------+------+
");

Ok(())
}

#[tokio::test]
// Test that Selection::All returns all rows.
async fn test_selection_all() -> anyhow::Result<()> {
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let file_path = "/path/file.vortex";

let batch = make_test_batch_with_10_rows();
let data_size =
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;

let table_schema = batch.schema();
let file_meta = make_meta(file_path, data_size);
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
file.extensions = Some(Arc::new(Selection::All));

let opener = VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
projection: Some(vec![0].into()),
filter: None,
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![],
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
logical_schema: table_schema.clone(),
batch_size: 100,
limit: None,
metrics: Default::default(),
layout_readers: Default::default(),
has_output_ordering: false,
};

let stream = opener.open(file_meta, file)?.await?;
let data = stream.try_collect::<Vec<_>>().await?;

let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 10);

Ok(())
}

#[tokio::test]
// Test that when no extensions are provided, all rows are returned (backward compatibility).
async fn test_selection_no_extensions() -> anyhow::Result<()> {
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let file_path = "/path/file.vortex";

let batch = make_test_batch_with_10_rows();
let data_size =
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;

let table_schema = batch.schema();
let file_meta = make_meta(file_path, data_size);
let file = PartitionedFile::new(file_path.to_string(), data_size);
// file.extensions is None by default

let opener = VortexOpener {
session: SESSION.clone(),
object_store: object_store.clone(),
projection: Some(vec![0].into()),
filter: None,
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![],
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
logical_schema: table_schema.clone(),
batch_size: 100,
limit: None,
metrics: Default::default(),
layout_readers: Default::default(),
has_output_ordering: false,
};

let stream = opener.open(file_meta, file)?.await?;
let data = stream.try_collect::<Vec<_>>().await?;

let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 10);

Ok(())
}
}