Skip to content

Commit 9f4cacd

Browse files
committed
add test case
Signed-off-by: Huaijin <[email protected]>
1 parent 94bfec1 commit 9f4cacd

File tree

1 file changed: +239 −2 lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 239 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ use vortex::expr::root;
3939
use vortex::expr::select;
4040
use vortex::layout::LayoutReader;
4141
use vortex::metrics::VortexMetrics;
42-
use vortex::scan::{ScanBuilder, Selection};
42+
use vortex::scan::ScanBuilder;
43+
use vortex::scan::Selection;
4344
use vortex::session::VortexSession;
4445
use vortex_utils::aliases::dash_map::DashMap;
4546
use vortex_utils::aliases::dash_map::Entry;
@@ -174,7 +175,7 @@ impl FileOpener for VortexOpener {
174175
let metrics = self.metrics.clone();
175176
let layout_reader = self.layout_readers.clone();
176177
let has_output_ordering = self.has_output_ordering;
177-
let extensions = file_meta.extensions.clone();
178+
let extensions = file.extensions.clone();
178179

179180
let projected_schema = match projection.as_ref() {
180181
None => logical_schema.clone(),
@@ -448,6 +449,7 @@ mod tests {
448449
use rstest::rstest;
449450
use vortex::VortexSessionDefault;
450451
use vortex::arrow::FromArrowArray;
452+
use vortex::buffer::Buffer;
451453
use vortex::file::WriteOptionsSessionExt;
452454
use vortex::io::ObjectStoreWriter;
453455
use vortex::io::VortexWrite;
@@ -532,6 +534,44 @@ mod tests {
532534
}
533535
}
534536

537+
fn make_test_batch_with_10_rows() -> RecordBatch {
538+
record_batch!(
539+
(
540+
"a",
541+
Int32,
542+
vec![
543+
Some(0),
544+
Some(1),
545+
Some(2),
546+
Some(3),
547+
Some(4),
548+
Some(5),
549+
Some(6),
550+
Some(7),
551+
Some(8),
552+
Some(9)
553+
]
554+
),
555+
(
556+
"b",
557+
Utf8,
558+
vec![
559+
Some("row0"),
560+
Some("row1"),
561+
Some("row2"),
562+
Some("row3"),
563+
Some("row4"),
564+
Some("row5"),
565+
Some("row6"),
566+
Some("row7"),
567+
Some("row8"),
568+
Some("row9")
569+
]
570+
)
571+
)
572+
.unwrap()
573+
}
574+
535575
#[rstest]
536576
#[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))]
537577
// If we don't have a physical expr adapter, we just drop filters on partition values
@@ -928,4 +968,201 @@ mod tests {
928968

929969
Ok(())
930970
}
971+
972+
#[tokio::test]
973+
// Test that Selection::IncludeByIndex filters to specific row indices.
974+
async fn test_selection_include_by_index() -> anyhow::Result<()> {
975+
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
976+
977+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
978+
let file_path = "/path/file.vortex";
979+
980+
let batch = make_test_batch_with_10_rows();
981+
let data_size =
982+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
983+
984+
let table_schema = batch.schema();
985+
let file_meta = make_meta(file_path, data_size);
986+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
987+
file.extensions = Some(Arc::new(Selection::IncludeByIndex(Buffer::from_iter(
988+
vec![1, 3, 5, 7],
989+
))));
990+
991+
let opener = VortexOpener {
992+
session: SESSION.clone(),
993+
object_store: object_store.clone(),
994+
projection: Some(vec![0, 1].into()),
995+
filter: None,
996+
file_pruning_predicate: None,
997+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
998+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
999+
partition_fields: vec![],
1000+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1001+
logical_schema: table_schema.clone(),
1002+
batch_size: 100,
1003+
limit: None,
1004+
metrics: Default::default(),
1005+
layout_readers: Default::default(),
1006+
has_output_ordering: false,
1007+
};
1008+
1009+
let stream = opener.open(file_meta, file)?.await?;
1010+
let data = stream.try_collect::<Vec<_>>().await?;
1011+
let format_opts = FormatOptions::new().with_types_info(true);
1012+
1013+
assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
1014+
+-------+------+
1015+
| a | b |
1016+
| Int32 | Utf8 |
1017+
+-------+------+
1018+
| 1 | row1 |
1019+
| 3 | row3 |
1020+
| 5 | row5 |
1021+
| 7 | row7 |
1022+
+-------+------+
1023+
");
1024+
1025+
Ok(())
1026+
}
1027+
1028+
#[tokio::test]
1029+
// Test that Selection::ExcludeByIndex excludes specific row indices.
1030+
async fn test_selection_exclude_by_index() -> anyhow::Result<()> {
1031+
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
1032+
1033+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1034+
let file_path = "/path/file.vortex";
1035+
1036+
let batch = make_test_batch_with_10_rows();
1037+
let data_size =
1038+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1039+
1040+
let table_schema = batch.schema();
1041+
let file_meta = make_meta(file_path, data_size);
1042+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
1043+
file.extensions = Some(Arc::new(Selection::ExcludeByIndex(Buffer::from_iter(
1044+
vec![0, 2, 4, 6, 8],
1045+
))));
1046+
1047+
let opener = VortexOpener {
1048+
session: SESSION.clone(),
1049+
object_store: object_store.clone(),
1050+
projection: Some(vec![0, 1].into()),
1051+
filter: None,
1052+
file_pruning_predicate: None,
1053+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
1054+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1055+
partition_fields: vec![],
1056+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1057+
logical_schema: table_schema.clone(),
1058+
batch_size: 100,
1059+
limit: None,
1060+
metrics: Default::default(),
1061+
layout_readers: Default::default(),
1062+
has_output_ordering: false,
1063+
};
1064+
1065+
let stream = opener.open(file_meta, file)?.await?;
1066+
let data = stream.try_collect::<Vec<_>>().await?;
1067+
let format_opts = FormatOptions::new().with_types_info(true);
1068+
1069+
assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
1070+
+-------+------+
1071+
| a | b |
1072+
| Int32 | Utf8 |
1073+
+-------+------+
1074+
| 1 | row1 |
1075+
| 3 | row3 |
1076+
| 5 | row5 |
1077+
| 7 | row7 |
1078+
| 9 | row9 |
1079+
+-------+------+
1080+
");
1081+
1082+
Ok(())
1083+
}
1084+
1085+
#[tokio::test]
1086+
// Test that Selection::All returns all rows.
1087+
async fn test_selection_all() -> anyhow::Result<()> {
1088+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1089+
let file_path = "/path/file.vortex";
1090+
1091+
let batch = make_test_batch_with_10_rows();
1092+
let data_size =
1093+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1094+
1095+
let table_schema = batch.schema();
1096+
let file_meta = make_meta(file_path, data_size);
1097+
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
1098+
file.extensions = Some(Arc::new(Selection::All));
1099+
1100+
let opener = VortexOpener {
1101+
session: SESSION.clone(),
1102+
object_store: object_store.clone(),
1103+
projection: Some(vec![0].into()),
1104+
filter: None,
1105+
file_pruning_predicate: None,
1106+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
1107+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1108+
partition_fields: vec![],
1109+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1110+
logical_schema: table_schema.clone(),
1111+
batch_size: 100,
1112+
limit: None,
1113+
metrics: Default::default(),
1114+
layout_readers: Default::default(),
1115+
has_output_ordering: false,
1116+
};
1117+
1118+
let stream = opener.open(file_meta, file)?.await?;
1119+
let data = stream.try_collect::<Vec<_>>().await?;
1120+
1121+
let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
1122+
assert_eq!(total_rows, 10);
1123+
1124+
Ok(())
1125+
}
1126+
1127+
#[tokio::test]
1128+
// Test that when no extensions are provided, all rows are returned (backward compatibility).
1129+
async fn test_selection_no_extensions() -> anyhow::Result<()> {
1130+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1131+
let file_path = "/path/file.vortex";
1132+
1133+
let batch = make_test_batch_with_10_rows();
1134+
let data_size =
1135+
write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?;
1136+
1137+
let table_schema = batch.schema();
1138+
let file_meta = make_meta(file_path, data_size);
1139+
let file = PartitionedFile::new(file_path.to_string(), data_size);
1140+
// file.extensions is None by default
1141+
1142+
let opener = VortexOpener {
1143+
session: SESSION.clone(),
1144+
object_store: object_store.clone(),
1145+
projection: Some(vec![0].into()),
1146+
filter: None,
1147+
file_pruning_predicate: None,
1148+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
1149+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1150+
partition_fields: vec![],
1151+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
1152+
logical_schema: table_schema.clone(),
1153+
batch_size: 100,
1154+
limit: None,
1155+
metrics: Default::default(),
1156+
layout_readers: Default::default(),
1157+
has_output_ordering: false,
1158+
};
1159+
1160+
let stream = opener.open(file_meta, file)?.await?;
1161+
let data = stream.try_collect::<Vec<_>>().await?;
1162+
1163+
let total_rows: usize = data.iter().map(|rb| rb.num_rows()).sum();
1164+
assert_eq!(total_rows, 10);
1165+
1166+
Ok(())
1167+
}
9311168
}

0 commit comments

Comments
 (0)