1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -37,6 +37,7 @@ object_store = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
tokio-stream = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
vortex = { workspace = true, features = ["object_store", "tokio"] }
vortex-utils = { workspace = true, features = ["dashmap"] }

82 changes: 2 additions & 80 deletions vortex-datafusion/src/convert/exprs.rs
@@ -244,8 +244,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) ->
} else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() {
can_be_pushed_down(in_list.expr(), schema)
&& in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
can_scalar_fn_be_pushed_down(scalar_fn, schema)
Contributor Author:
As discussed in the other PR, this check was never needed; in fact it is impossible to perform at the source, because we don't know which fields are present in a given file.

By the time we reach this point, the DataFusion planner has already verified that all column references are valid; we only need to ensure that the only scalar function being called is GetFieldFunc.
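To make that contract concrete, here is a minimal sketch, adapted from the test removed further down in this diff, of an expression that now passes the check even though the named field is absent from the struct:

// Sketch only; assumes the same imports as this file's test module.
let struct_fields = Fields::from(vec![
    Field::new("field1", DataType::Utf8, true),
    Field::new("field2", DataType::Int32, true),
]);
let schema = Schema::new(vec![Field::new(
    "my_struct",
    DataType::Struct(struct_fields),
    true,
)]);

let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc<dyn PhysicalExpr>;
// Before this change, "nonexistent_field" failed the schema lookup and was
// rejected; now any get_field call is pushed down regardless of field presence.
let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
    "nonexistent_field".to_string(),
)))) as Arc<dyn PhysicalExpr>;

let get_field_expr = Arc::new(ScalarFunctionExpr::new(
    "get_field",
    Arc::new(ScalarUDF::from(GetFieldFunc::new())),
    vec![struct_col, field_name_lit],
    Arc::new(Field::new("nonexistent_field", DataType::Utf8, true)),
    Arc::new(ConfigOptions::new()),
)) as Arc<dyn PhysicalExpr>;

assert!(can_be_pushed_down(&get_field_expr, &schema));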

} else if ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(df_expr.as_ref()).is_some() {
true
Contributor:

Will returning true here prevent the planning of a downstream filter? We've talked about edge cases before, e.g. IS NOT NULL on a column that is absent from one file and is therefore filled with nulls. I don't have a good answer for this, and to be honest I care more about performance than correctness for these edge cases.

One thing we've mentioned is passing the list of file schemas to the source so it can check for these cases, giving us the best of both worlds; I'm not sure how you view that.

I'm also not very familiar with the try_pushdown_filters interface, so this might not be a problem. Maybe the issue is that we can only return PushedDown::Yes or PushedDown::No, and we might need to upstream an equivalent of the Inexact variant that table providers return for filters.
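For reference, a minimal sketch of the table-provider analogue mentioned above, using the public DataFusion TableProvider API (TableProviderFilterPushDown) rather than Vortex's try_pushdown_filters interface, which this diff does not show: returning Inexact pushes a filter to the source while DataFusion still re-applies it in a filter node above the scan, so correctness does not depend on the source evaluating it exactly.

use datafusion::common::Result;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

// Shape of TableProvider::supports_filters_pushdown (minus &self): mark every
// filter Inexact so DataFusion keeps a downstream filter even after pushdown.
fn supports_filters_pushdown(
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(filters
        .iter()
        .map(|_| TableProviderFilterPushDown::Inexact)
        .collect())
}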

} else {
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
false
@@ -292,60 +292,12 @@ fn supported_data_types(dt: &DataType) -> bool {
is_supported
}

/// Checks if a GetField scalar function can be pushed down.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool {
let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
else {
// Only get_field pushdown is supported.
return false;
};

let args = get_field_fn.args();
if args.len() != 2 {
tracing::debug!(
"Expected 2 arguments for GetField, not pushing down {} arguments",
args.len()
);
return false;
}
let source_expr = &args[0];
let field_name_expr = &args[1];
let Some(field_name) = field_name_expr
.as_any()
.downcast_ref::<df_expr::Literal>()
.and_then(|lit| lit.value().try_as_str().flatten())
else {
return false;
};

let Ok(source_dt) = source_expr.data_type(schema) else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type for GetField, not pushing down"
);
return false;
};
let DataType::Struct(fields) = source_dt else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type as struct for GetField, not pushing down"
);
return false;
};
fields.find(field_name).is_some()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Fields;
use arrow_schema::Schema;
use arrow_schema::TimeUnit as ArrowTimeUnit;
use datafusion::functions::core::getfield::GetFieldFunc;
@@ -673,34 +625,4 @@ mod tests {
└── input: vortex.root
"#);
}

#[rstest]
#[case::valid_field("field1", true)]
#[case::missing_field("nonexistent_field", false)]
fn test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) {
let struct_fields = Fields::from(vec![
Field::new("field1", DataType::Utf8, true),
Field::new("field2", DataType::Int32, true),
]);
let schema = Schema::new(vec![Field::new(
"my_struct",
DataType::Struct(struct_fields),
true,
)]);

let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc<dyn PhysicalExpr>;
let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
field_name.to_string(),
)))) as Arc<dyn PhysicalExpr>;

let get_field_expr = Arc::new(ScalarFunctionExpr::new(
"get_field",
Arc::new(ScalarUDF::from(GetFieldFunc::new())),
vec![struct_col, field_name_lit],
Arc::new(Field::new(field_name, DataType::Utf8, true)),
Arc::new(ConfigOptions::new()),
)) as Arc<dyn PhysicalExpr>;

assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected);
}
}