Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion ffi/examples/read-table/arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,18 @@ static ExclusiveEngineData* apply_transform(
return data;
}
print_diag(" Applying transform\n");
SharedExpressionEvaluator* evaluator = new_expression_evaluator(
ExternResultHandleSharedExpressionEvaluator evaluator_res = new_expression_evaluator(
context->engine,
context->physical_schema, // input schema
context->arrow_context->cur_transform,
context->logical_schema); // output schema
if (evaluator_res.tag != OkHandleSharedExpressionEvaluator) {
print_error("Failed to create expression evaluator.", (Error*)evaluator_res.err);
free_error((Error*)evaluator_res.err);
free_engine_data(data);
return NULL;
}
SharedExpressionEvaluator* evaluator = evaluator_res.ok;
ExternResultHandleExclusiveEngineData transformed_res = evaluate_expression(
context->engine,
&data,
Expand Down
22 changes: 13 additions & 9 deletions ffi/src/engine_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,26 +156,28 @@ pub unsafe extern "C" fn new_expression_evaluator(
expression: &Expression,
// TODO: Make this a data_type, and give a way for c code to go between schema <-> datatype
output_type: Handle<SharedSchema>,
) -> Handle<SharedExpressionEvaluator> {
) -> ExternResult<Handle<SharedExpressionEvaluator>> {
let engine = unsafe { engine.clone_as_arc() };
let input_schema = unsafe { input_schema.clone_as_arc() };
let output_type: DataType = output_type.as_ref().clone().into();
let expression = Arc::new(expression.clone());
new_expression_evaluator_impl(engine, input_schema, expression, output_type)
let res = new_expression_evaluator_impl(engine.clone(), input_schema, expression, output_type);
res.into_extern_result(&engine.as_ref())
}

fn new_expression_evaluator_impl(
extern_engine: Arc<dyn ExternEngine>,
input_schema: SchemaRef,
expression: ExpressionRef,
output_type: DataType,
) -> Handle<SharedExpressionEvaluator> {
) -> DeltaResult<Handle<SharedExpressionEvaluator>> {
let engine = extern_engine.engine();
let evaluator =
engine
.evaluation_handler()
.new_expression_evaluator(input_schema, expression, output_type);
evaluator.into()
let evaluator = engine.evaluation_handler().new_expression_evaluator(
input_schema,
expression,
output_type,
)?;
Ok(evaluator.into())
}

/// Free an expression evaluator
Expand Down Expand Up @@ -215,6 +217,7 @@ fn evaluate_expression_impl(
#[cfg(test)]
mod tests {
use super::{free_expression_evaluator, new_expression_evaluator};
use crate::ffi_test_utils::ok_or_panic;
use crate::{free_engine, handle::Handle, tests::get_default_engine, SharedSchema};
use delta_kernel::{
schema::{DataType, StructField, StructType},
Expand All @@ -232,12 +235,13 @@ mod tests {
let output_type: Handle<SharedSchema> = in_schema.clone().into();
let in_schema_handle: Handle<SharedSchema> = in_schema.into();
unsafe {
let evaluator = new_expression_evaluator(
let result = new_expression_evaluator(
engine.shallow_copy(),
in_schema_handle.shallow_copy(),
&expr,
output_type.shallow_copy(),
);
let evaluator = ok_or_panic(result);
in_schema_handle.drop_handle();
output_type.drop_handle();
free_engine(engine);
Expand Down
1 change: 1 addition & 0 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ mod tests {
expression.into(),
InCommitTimestampVisitor::schema().into(),
)
.unwrap()
.evaluate(batch.as_ref())
.unwrap()
}
Expand Down
12 changes: 6 additions & 6 deletions kernel/src/engine/arrow_expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,23 +238,23 @@ impl EvaluationHandler for ArrowEvaluationHandler {
schema: SchemaRef,
expression: ExpressionRef,
output_type: DataType,
) -> Arc<dyn ExpressionEvaluator> {
Arc::new(DefaultExpressionEvaluator {
) -> DeltaResult<Arc<dyn ExpressionEvaluator>> {
Ok(Arc::new(DefaultExpressionEvaluator {
input_schema: schema,
expression,
output_type,
})
}))
}

fn new_predicate_evaluator(
&self,
schema: SchemaRef,
predicate: PredicateRef,
) -> Arc<dyn PredicateEvaluator> {
Arc::new(DefaultPredicateEvaluator {
) -> DeltaResult<Arc<dyn PredicateEvaluator>> {
Ok(Arc::new(DefaultPredicateEvaluator {
input_schema: schema,
predicate,
})
}))
}

/// Create a single-row array with all-null leaf values. Note that if a nested struct is
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
input_schema.into(),
transform.clone(),
output_schema.clone().into(),
);
)?;
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(write_context.target_dir(), physical_data, partition_values)
Expand Down
7 changes: 4 additions & 3 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ pub trait EvaluationHandler: AsAny {
input_schema: SchemaRef,
expression: ExpressionRef,
output_type: DataType,
) -> Arc<dyn ExpressionEvaluator>;
) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;

/// Create a [`PredicateEvaluator`] that can evaluate the given [`Predicate`] on columnar
/// batches with the given [`Schema`] to produce a column of boolean results.
Expand All @@ -443,7 +443,7 @@ pub trait EvaluationHandler: AsAny {
&self,
input_schema: SchemaRef,
predicate: PredicateRef,
) -> Arc<dyn PredicateEvaluator>;
) -> DeltaResult<Arc<dyn PredicateEvaluator>>;

/// Create a single-row all-null-value [`EngineData`] with the schema specified by
/// `output_schema`.
Expand Down Expand Up @@ -474,7 +474,8 @@ trait EvaluationHandlerExtension: EvaluationHandler {
schema_transform.transform_struct(schema.as_ref());
let row_expr = schema_transform.try_into_expr()?;

let eval = self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into());
let eval =
self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into())?;
eval.evaluate(null_row.as_ref())
}
}
Expand Down
42 changes: 29 additions & 13 deletions kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::cmp::Ordering;
use std::sync::{Arc, LazyLock};

use tracing::debug;
use tracing::{debug, error};

use crate::actions::get_log_add_schema;
use crate::actions::visitors::SelectionVectorVisitor;
Expand Down Expand Up @@ -134,21 +134,37 @@ impl DataSkippingFilter {
//
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
// the predicate is true/null and false (= skip) when the predicate is false.
let select_stats_evaluator = engine.evaluation_handler().new_expression_evaluator(
// safety: kernel is very broken if we don't have the schema for Add actions
get_log_add_schema().clone(),
STATS_EXPR.clone(),
DataType::STRING,
);

let skipping_evaluator = engine.evaluation_handler().new_predicate_evaluator(
stats_schema.clone(),
Arc::new(as_sql_data_skipping_predicate(&predicate)?),
);
let select_stats_evaluator = engine
.evaluation_handler()
.new_expression_evaluator(
// safety: kernel is very broken if we don't have the schema for Add actions
get_log_add_schema().clone(),
STATS_EXPR.clone(),
DataType::STRING,
)
// A stats expression failure here doesn't affect correctness,
// as it's a performance optimization, so we log the error and continue.
.inspect_err(|e| error!("Failed to create select stats evaluator: {e}"))
.ok()?;

let skipping_evaluator = engine
.evaluation_handler()
.new_predicate_evaluator(
stats_schema.clone(),
Arc::new(as_sql_data_skipping_predicate(&predicate)?),
)
// A skipping predicate expression failure here doesn't affect correctness,
// as it's a performance optimization, so we log the error and continue.
.inspect_err(|e| error!("Failed to create skipping evaluator: {e}"))
.ok()?;

let filter_evaluator = engine
.evaluation_handler()
.new_predicate_evaluator(stats_schema.clone(), FILTER_PRED.clone());
.new_predicate_evaluator(stats_schema.clone(), FILTER_PRED.clone())
// A filter predicate expression failure here doesn't affect correctness,
// as it's a performance optimization, so we log the error and continue.
.inspect_err(|e| error!("Failed to create filter evaluator: {e}"))
.ok()?;

Some(Self {
stats_schema,
Expand Down
21 changes: 12 additions & 9 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(crate) struct ScanLogReplayProcessor {

impl ScanLogReplayProcessor {
/// Create a new [`ScanLogReplayProcessor`] instance
fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> Self {
fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> DeltaResult<Self> {
// Extract the physical predicate from StateInfo's PhysicalPredicate enum.
// The DataSkippingFilter and partition_filter components expect the predicate
// in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from
Expand All @@ -72,17 +72,17 @@ impl ScanLogReplayProcessor {
None
}
};
Self {
Ok(Self {
partition_filter: physical_predicate.as_ref().map(|(e, _)| e.clone()),
data_skipping_filter: DataSkippingFilter::new(engine, physical_predicate),
add_transform: engine.evaluation_handler().new_expression_evaluator(
get_log_add_schema().clone(),
get_add_transform_expr(),
SCAN_ROW_DATATYPE.clone(),
),
)?,
seen_file_keys: Default::default(),
state_info,
}
})
}
}

Expand Down Expand Up @@ -385,8 +385,8 @@ pub(crate) fn scan_action_iter(
engine: &dyn Engine,
action_iter: impl Iterator<Item = DeltaResult<ActionsBatch>>,
state_info: Arc<StateInfo>,
) -> impl Iterator<Item = DeltaResult<ScanMetadata>> {
ScanLogReplayProcessor::new(engine, state_info).process_actions_iter(action_iter)
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadata>>> {
Ok(ScanLogReplayProcessor::new(engine, state_info)?.process_actions_iter(action_iter))
}

#[cfg(test)]
Expand Down Expand Up @@ -478,7 +478,8 @@ mod tests {
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
state_info,
);
)
.unwrap();
for res in iter {
let scan_metadata = res.unwrap();
assert!(
Expand All @@ -503,7 +504,8 @@ mod tests {
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
Arc::new(state_info),
);
)
.unwrap();

fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
assert!(transform.is_some());
Expand Down Expand Up @@ -580,7 +582,8 @@ mod tests {
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
Arc::new(state_info),
);
)
.unwrap();

for res in iter {
let scan_metadata = res.unwrap();
Expand Down
7 changes: 4 additions & 3 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl Scan {
scan_row_schema(),
get_scan_metadata_transform_expr(),
RESTORED_ADD_SCHEMA.clone(),
);
)?;
let apply_transform = move |data: Box<dyn EngineData>| {
Ok(ActionsBatch::new(transform.evaluate(data.as_ref())?, false))
};
Expand Down Expand Up @@ -537,7 +537,7 @@ impl Scan {
if let PhysicalPredicate::StaticSkipAll = self.state_info.physical_predicate {
return Ok(None.into_iter().flatten());
}
let it = scan_action_iter(engine, action_batch_iter, self.state_info.clone());
let it = scan_action_iter(engine, action_batch_iter, self.state_info.clone())?;
Ok(Some(it).into_iter().flatten())
}

Expand Down Expand Up @@ -850,7 +850,8 @@ pub(crate) mod test_utils {
.into_iter()
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
state_info,
);
)
.unwrap();
let mut batch_count = 0;
for res in iter {
let scan_metadata = res.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub fn transform_to_logical(
physical_schema.clone(),
transform,
logical_schema.clone().into(), // TODO: expensive deep clone!
)
)?
.evaluate(physical_data.as_ref()),
None => Ok(physical_data),
}
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl LogReplayScanner {
get_log_add_schema().clone(),
Arc::new(cdf_scan_row_expression(timestamp, commit_version)),
cdf_scan_row_schema().into(),
);
)?;

let result = action_iter.map(move |actions| -> DeltaResult<_> {
let actions = actions?;
Expand Down
16 changes: 9 additions & 7 deletions kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,15 @@ fn read_scan_file(
let transform_expr = get_cdf_transform_expr(&scan_file, state_info, physical_schema.as_ref())?;

// Only create an evaluator if transformation is needed
let phys_to_logical_eval = transform_expr.map(|expr| {
engine.evaluation_handler().new_expression_evaluator(
physical_schema.clone(),
expr,
state_info.logical_schema.clone().into(),
)
});
let phys_to_logical_eval = transform_expr
.map(|expr| {
engine.evaluation_handler().new_expression_evaluator(
physical_schema.clone(),
expr,
state_info.logical_schema.clone().into(),
)
})
.transpose()?;
// Determine if the scan file was derived from a deletion vector pair
let is_dv_resolved_pair = scan_file.remove_dv.is_some();

Expand Down
2 changes: 1 addition & 1 deletion kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ impl Transaction {
input_schema.clone(),
Arc::new(adds_expr),
as_log_add_schema(output_schema.clone()).into(),
);
)?;
adds_evaluator.evaluate(add_files_batch?.deref())
})
}
Expand Down
Loading