Skip to content

Commit 9155ff7

Browse files
committed
feat!: Make expression and predicate evaluator constructors fallible
This PR refactors the `new_expression_evaluator` and `new_predicate_evaluator` methods in the `EvaluationHandler` trait to return `DeltaResult` instead of being infallible. This is a breaking API change that enables: - Better error reporting at evaluator construction time rather than evaluation - Early validation of expression/predicate compatibility with input schemas - More idiomatic Rust error handling patterns **Changes:** - Updated `EvaluationHandler` trait signatures to return `DeltaResult<Arc<dyn Evaluator>>` - Updated `ArrowEvaluationHandler` implementation to wrap returns in `Ok(...)` - Made `ScanLogReplayProcessor::new` fallible - Updated `scan_action_iter` to return `DeltaResult<impl Iterator<...>>` - Updated 12 callsites across the codebase to propagate errors with `?` operator - Updated FFI layer to handle errors (`.unwrap()` for backward compatibility) - All 630 existing kernel tests pass - All 18 FFI tests pass - Verified compilation with `cargo check --all-features` - No functional changes - purely refactoring error handling patterns
1 parent 2931ebb commit 9155ff7

File tree

12 files changed

+63
-48
lines changed

12 files changed

+63
-48
lines changed

ffi/src/engine_funcs.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ fn new_expression_evaluator_impl(
171171
output_type: DataType,
172172
) -> Handle<SharedExpressionEvaluator> {
173173
let engine = extern_engine.engine();
174-
let evaluator =
175-
engine
176-
.evaluation_handler()
177-
.new_expression_evaluator(input_schema, expression, output_type);
174+
let evaluator = engine
175+
.evaluation_handler()
176+
.new_expression_evaluator(input_schema, expression, output_type)
177+
.unwrap();
178178
evaluator.into()
179179
}
180180

kernel/src/actions/visitors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ mod tests {
10911091
expression.into(),
10921092
InCommitTimestampVisitor::schema().into(),
10931093
)
1094+
.unwrap()
10941095
.evaluate(batch.as_ref())
10951096
.unwrap()
10961097
}

kernel/src/engine/arrow_expression/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,23 +238,23 @@ impl EvaluationHandler for ArrowEvaluationHandler {
238238
schema: SchemaRef,
239239
expression: ExpressionRef,
240240
output_type: DataType,
241-
) -> Arc<dyn ExpressionEvaluator> {
242-
Arc::new(DefaultExpressionEvaluator {
241+
) -> DeltaResult<Arc<dyn ExpressionEvaluator>> {
242+
Ok(Arc::new(DefaultExpressionEvaluator {
243243
input_schema: schema,
244244
expression,
245245
output_type,
246-
})
246+
}))
247247
}
248248

249249
fn new_predicate_evaluator(
250250
&self,
251251
schema: SchemaRef,
252252
predicate: PredicateRef,
253-
) -> Arc<dyn PredicateEvaluator> {
254-
Arc::new(DefaultPredicateEvaluator {
253+
) -> DeltaResult<Arc<dyn PredicateEvaluator>> {
254+
Ok(Arc::new(DefaultPredicateEvaluator {
255255
input_schema: schema,
256256
predicate,
257-
})
257+
}))
258258
}
259259

260260
/// Create a single-row array with all-null leaf values. Note that if a nested struct is

kernel/src/engine/default/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
106106
input_schema.into(),
107107
transform.clone(),
108108
output_schema.clone().into(),
109-
);
109+
)?;
110110
let physical_data = logical_to_physical_expr.evaluate(data)?;
111111
self.parquet
112112
.write_parquet_file(write_context.target_dir(), physical_data, partition_values)

kernel/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ pub trait EvaluationHandler: AsAny {
426426
input_schema: SchemaRef,
427427
expression: ExpressionRef,
428428
output_type: DataType,
429-
) -> Arc<dyn ExpressionEvaluator>;
429+
) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;
430430

431431
/// Create a [`PredicateEvaluator`] that can evaluate the given [`Predicate`] on columnar
432432
/// batches with the given [`Schema`] to produce a column of boolean results.
@@ -443,7 +443,7 @@ pub trait EvaluationHandler: AsAny {
443443
&self,
444444
input_schema: SchemaRef,
445445
predicate: PredicateRef,
446-
) -> Arc<dyn PredicateEvaluator>;
446+
) -> DeltaResult<Arc<dyn PredicateEvaluator>>;
447447

448448
/// Create a single-row all-null-value [`EngineData`] with the schema specified by
449449
/// `output_schema`.
@@ -474,7 +474,8 @@ trait EvaluationHandlerExtension: EvaluationHandler {
474474
schema_transform.transform_struct(schema.as_ref());
475475
let row_expr = schema_transform.try_into_expr()?;
476476

477-
let eval = self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into());
477+
let eval =
478+
self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into())?;
478479
eval.evaluate(null_row.as_ref())
479480
}
480481
}

kernel/src/scan/data_skipping.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,21 +134,28 @@ impl DataSkippingFilter {
134134
//
135135
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
136136
// the predicate is true/null and false (= skip) when the predicate is false.
137-
let select_stats_evaluator = engine.evaluation_handler().new_expression_evaluator(
138-
// safety: kernel is very broken if we don't have the schema for Add actions
139-
get_log_add_schema().clone(),
140-
STATS_EXPR.clone(),
141-
DataType::STRING,
142-
);
143-
144-
let skipping_evaluator = engine.evaluation_handler().new_predicate_evaluator(
145-
stats_schema.clone(),
146-
Arc::new(as_sql_data_skipping_predicate(&predicate)?),
147-
);
137+
let select_stats_evaluator = engine
138+
.evaluation_handler()
139+
.new_expression_evaluator(
140+
// safety: kernel is very broken if we don't have the schema for Add actions
141+
get_log_add_schema().clone(),
142+
STATS_EXPR.clone(),
143+
DataType::STRING,
144+
)
145+
.ok()?;
146+
147+
let skipping_evaluator = engine
148+
.evaluation_handler()
149+
.new_predicate_evaluator(
150+
stats_schema.clone(),
151+
Arc::new(as_sql_data_skipping_predicate(&predicate)?),
152+
)
153+
.ok()?;
148154

149155
let filter_evaluator = engine
150156
.evaluation_handler()
151-
.new_predicate_evaluator(stats_schema.clone(), FILTER_PRED.clone());
157+
.new_predicate_evaluator(stats_schema.clone(), FILTER_PRED.clone())
158+
.ok()?;
152159

153160
Some(Self {
154161
stats_schema,

kernel/src/scan/log_replay.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub(crate) struct ScanLogReplayProcessor {
5353

5454
impl ScanLogReplayProcessor {
5555
/// Create a new [`ScanLogReplayProcessor`] instance
56-
fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> Self {
56+
fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> DeltaResult<Self> {
5757
// Extract the physical predicate from StateInfo's PhysicalPredicate enum.
5858
// The DataSkippingFilter and partition_filter components expect the predicate
5959
// in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from
@@ -72,17 +72,17 @@ impl ScanLogReplayProcessor {
7272
None
7373
}
7474
};
75-
Self {
75+
Ok(Self {
7676
partition_filter: physical_predicate.as_ref().map(|(e, _)| e.clone()),
7777
data_skipping_filter: DataSkippingFilter::new(engine, physical_predicate),
7878
add_transform: engine.evaluation_handler().new_expression_evaluator(
7979
get_log_add_schema().clone(),
8080
get_add_transform_expr(),
8181
SCAN_ROW_DATATYPE.clone(),
82-
),
82+
)?,
8383
seen_file_keys: Default::default(),
8484
state_info,
85-
}
85+
})
8686
}
8787
}
8888

@@ -385,8 +385,8 @@ pub(crate) fn scan_action_iter(
385385
engine: &dyn Engine,
386386
action_iter: impl Iterator<Item = DeltaResult<ActionsBatch>>,
387387
state_info: Arc<StateInfo>,
388-
) -> impl Iterator<Item = DeltaResult<ScanMetadata>> {
389-
ScanLogReplayProcessor::new(engine, state_info).process_actions_iter(action_iter)
388+
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadata>>> {
389+
Ok(ScanLogReplayProcessor::new(engine, state_info)?.process_actions_iter(action_iter))
390390
}
391391

392392
#[cfg(test)]
@@ -478,7 +478,8 @@ mod tests {
478478
.into_iter()
479479
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
480480
state_info,
481-
);
481+
)
482+
.unwrap();
482483
for res in iter {
483484
let scan_metadata = res.unwrap();
484485
assert!(
@@ -503,7 +504,8 @@ mod tests {
503504
.into_iter()
504505
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
505506
Arc::new(state_info),
506-
);
507+
)
508+
.unwrap();
507509

508510
fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
509511
assert!(transform.is_some());
@@ -580,7 +582,8 @@ mod tests {
580582
.into_iter()
581583
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
582584
Arc::new(state_info),
583-
);
585+
)
586+
.unwrap();
584587

585588
for res in iter {
586589
let scan_metadata = res.unwrap();

kernel/src/scan/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ impl Scan {
478478
scan_row_schema(),
479479
get_scan_metadata_transform_expr(),
480480
RESTORED_ADD_SCHEMA.clone(),
481-
);
481+
)?;
482482
let apply_transform = move |data: Box<dyn EngineData>| {
483483
Ok(ActionsBatch::new(transform.evaluate(data.as_ref())?, false))
484484
};
@@ -537,7 +537,7 @@ impl Scan {
537537
if let PhysicalPredicate::StaticSkipAll = self.state_info.physical_predicate {
538538
return Ok(None.into_iter().flatten());
539539
}
540-
let it = scan_action_iter(engine, action_batch_iter, self.state_info.clone());
540+
let it = scan_action_iter(engine, action_batch_iter, self.state_info.clone())?;
541541
Ok(Some(it).into_iter().flatten())
542542
}
543543

@@ -850,7 +850,8 @@ pub(crate) mod test_utils {
850850
.into_iter()
851851
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
852852
state_info,
853-
);
853+
)
854+
.unwrap();
854855
let mut batch_count = 0;
855856
for res in iter {
856857
let scan_metadata = res.unwrap();

kernel/src/scan/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pub fn transform_to_logical(
107107
physical_schema.clone(),
108108
transform,
109109
logical_schema.clone().into(), // TODO: expensive deep clone!
110-
)
110+
)?
111111
.evaluate(physical_data.as_ref()),
112112
None => Ok(physical_data),
113113
}

kernel/src/table_changes/log_replay.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ impl LogReplayScanner {
243243
get_log_add_schema().clone(),
244244
Arc::new(cdf_scan_row_expression(timestamp, commit_version)),
245245
cdf_scan_row_schema().into(),
246-
);
246+
)?;
247247

248248
let result = action_iter.map(move |actions| -> DeltaResult<_> {
249249
let actions = actions?;

0 commit comments

Comments
 (0)