diff --git a/kernel/src/kernel_predicates/mod.rs b/kernel/src/kernel_predicates/mod.rs index db4c8850e..99f8c39e8 100644 --- a/kernel/src/kernel_predicates/mod.rs +++ b/kernel/src/kernel_predicates/mod.rs @@ -842,6 +842,33 @@ pub trait DataSkippingPredicateEvaluator { inverted: bool, ) -> Option { let max = self.get_max_stat(col, &val.data_type())?; + + // Delta Lake min/max stats are stored with millisecond precision (truncated, not rounded up), so we can't use direct comparison. + // For max stats comparison, we adjust timestamp values by subtracting 999 microseconds from the value to ensure + // that comparisons against max stats are correct. Any rows that pass this filter will be re-evaluated later for exact matches. + // See: + // - https://github.com/delta-io/delta-kernel-rs/pull/1003 + if matches!(val, Scalar::Timestamp(_) | Scalar::TimestampNtz(_)) { + match (ord, inverted) { + // col > val => stats.max.col > val + // NOT(col < val) => NOT(stats.max.col < val) + (Ordering::Greater, false) | (Ordering::Less, true) => { + let max_ts_adjusted = timestamp_subtract(val, 999); + tracing::debug!( + "Adjusted timestamp value for col {col} for max stat comparison from {val:?} to {max_ts_adjusted:?}" + ); + return self.eval_partial_cmp(ord, max, &max_ts_adjusted, inverted); + } + // // The following case is not currently used but included for completeness and to ensure correctness in the future if logic is changed to use it. + // !(max > val) or max < val + (Ordering::Greater, true) | (Ordering::Less, false) => { + return self.eval_partial_cmp(ord, max, &val, inverted); + } + // Equality comparison can't be applied as max stats is truncated to milliseconds, so actual microsecond value is unknown. + (Ordering::Equal, _) => return None, + } + } + self.eval_partial_cmp(ord, max, val, inverted) } @@ -986,3 +1013,12 @@ impl KernelPredicateEvaluator for T self.finish_eval_pred_junction(op, preds, inverted) } } + +/// Adjust timestamp value by subtracting the given interval in microseconds. +fn timestamp_subtract(val: &Scalar, interval_micros: i64) -> Scalar { + match val { + Scalar::Timestamp(ts) => Scalar::Timestamp(*ts - interval_micros), + Scalar::TimestampNtz(ts) => Scalar::TimestampNtz(*ts - interval_micros), + _ => val.clone(), + } +} diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index a0398a0ec..6cc4a7920 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -218,14 +218,8 @@ impl DataSkippingPredicateEvaluator for DataSkippingPredicateCreator { Some(joined_column_expr!("minValues", col)) } - /// Retrieves the maximum value of a column, if it exists and has the requested type. - // TODO(#1002): we currently don't support file skipping on timestamp columns' max stat since - // they are truncated to milliseconds in add.stats. - fn get_max_stat(&self, col: &ColumnName, data_type: &DataType) -> Option { - match data_type { - &DataType::TIMESTAMP | &DataType::TIMESTAMP_NTZ => None, - _ => Some(joined_column_expr!("maxValues", col)), - } + fn get_max_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option { + Some(joined_column_expr!("maxValues", col)) } /// Retrieves the null count of a column, if it exists. diff --git a/kernel/src/scan/data_skipping/tests.rs b/kernel/src/scan/data_skipping/tests.rs index f492b8fab..6e6ee3007 100644 --- a/kernel/src/scan/data_skipping/tests.rs +++ b/kernel/src/scan/data_skipping/tests.rs @@ -340,42 +340,11 @@ fn test_sql_where() { do_test(ALL_NULL, pred, MISSING, None, None); } -// TODO(#1002): we currently don't support file skipping on timestamp columns' max stat since they -// are truncated to milliseconds in add.stats. #[test] -fn test_timestamp_skipping_disabled() { - let creator = DataSkippingPredicateCreator; - let col = &column_name!("timestamp_col"); - - assert!( - creator.get_min_stat(col, &DataType::TIMESTAMP).is_some(), - "get_min_stat should return Some: allow data skipping on timestamp minValues" - ); - assert_eq!( - creator.get_max_stat(col, &DataType::TIMESTAMP), - None, - "get_max_stat should return None: no data skipping on timestamp maxValues" - ); - assert!( - creator - .get_min_stat(col, &DataType::TIMESTAMP_NTZ) - .is_some(), - "get_min_stat should return Some: allow data skipping on timestamp_ntz minValues" - ); - assert_eq!( - creator.get_max_stat(col, &DataType::TIMESTAMP_NTZ), - None, - "get_max_stat should return None: no data skipping on timestamp_ntz maxValues" - ); -} - -// TODO(#1002): we currently don't support file skipping on timestamp columns' max stat since they -// are truncated to milliseconds in add.stats. -#[test] -fn test_timestamp_predicates_dont_data_skip() { +fn test_timestamp_predicates_data_skip() { let col = &column_expr!("ts_col"); for timestamp in [&Scalar::Timestamp(1000000), &Scalar::TimestampNtz(1000000)] { - // LT will do minValues -> OK + // LT will do minValues let pred = Pred::lt(col.clone(), timestamp.clone()); let skipping_pred = as_data_skipping_predicate(&pred); assert_eq!( @@ -383,19 +352,19 @@ fn test_timestamp_predicates_dont_data_skip() { "Column(minValues.ts_col) < 1000000" ); - // GT will do maxValues -> BLOCKED + // GT will adjust predicate value for maxValues comparison let pred = Pred::gt(col.clone(), timestamp.clone()); let skipping_pred = as_data_skipping_predicate(&pred); - assert!( - skipping_pred.is_none(), - "Expected no data skipping for timestamp predicate: {pred:#?}, got {skipping_pred:#?}" + assert_eq!( + skipping_pred.unwrap().to_string(), + "Column(maxValues.ts_col) > 999001" ); let pred = Pred::eq(col.clone(), timestamp.clone()); let skipping_pred = as_data_skipping_predicate(&pred); assert_eq!( skipping_pred.unwrap().to_string(), - "AND(NOT(Column(minValues.ts_col) > 1000000), null)" + "AND(NOT(Column(minValues.ts_col) > 1000000), NOT(Column(maxValues.ts_col) < 999001))" ); let pred = Pred::ne(col.clone(), timestamp.clone());