Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,12 @@ pub(crate) struct Remove {
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) size: Option<i64>,

/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file encoded as a JSON string.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub stats: Option<String>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) tags: Option<HashMap<String, String>>,
Expand Down Expand Up @@ -1237,6 +1243,7 @@ mod tests {
StructField::nullable("extendedFileMetadata", DataType::BOOLEAN),
partition_values_field(),
StructField::nullable("size", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
tags_field(),
deletion_vector_field(),
StructField::nullable("baseRowId", DataType::LONG),
Expand Down
92 changes: 85 additions & 7 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl RemoveVisitor {
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Remove> {
require!(
getters.len() == 14,
getters.len() == 15,
Error::InternalError(format!(
"Wrong number of RemoveVisitor getters: {}",
getters.len()
Expand All @@ -194,14 +194,14 @@ impl RemoveVisitor {
getters[4].get_opt(row_index, "remove.partitionValues")?;

let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?;
let stats: Option<String> = getters[6].get_opt(row_index, "remove.stats")?;
// TODO(nick) tags are skipped in getters[7]

// TODO(nick) tags are skipped in getters[6]

let deletion_vector = visit_deletion_vector_at(row_index, &getters[7..])?;
let deletion_vector = visit_deletion_vector_at(row_index, &getters[8..])?;

let base_row_id: Option<i64> = getters[12].get_opt(row_index, "remove.baseRowId")?;
let base_row_id: Option<i64> = getters[13].get_opt(row_index, "remove.baseRowId")?;
let default_row_commit_version: Option<i64> =
getters[13].get_opt(row_index, "remove.defaultRowCommitVersion")?;
getters[14].get_opt(row_index, "remove.defaultRowCommitVersion")?;

Ok(Remove {
path,
Expand All @@ -210,6 +210,7 @@ impl RemoveVisitor {
extended_file_metadata,
partition_values,
size,
stats,
tags: None,
deletion_vector,
base_row_id,
Expand Down Expand Up @@ -834,7 +835,7 @@ mod tests {
let json_strings: StringArray = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#,
r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452,"stats":"{\"numRecords\":1}"}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
Expand All @@ -850,6 +851,7 @@ mod tests {
("c2".to_string(), "c".to_string()),
])),
size: Some(452),
stats: Some(r#"{"numRecords":1}"#.to_string()),
..Default::default()
};
assert_eq!(
Expand All @@ -863,6 +865,82 @@ mod tests {
);
}

#[test]
fn test_parse_remove_all_fields_unique() {
    // Parses a single `remove` action in which every populated field carries a
    // distinct value, so a field read from the wrong getter position shows up
    // as a mismatch instead of passing by coincidence.
    let actions = StringArray::from(vec![
        r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
        r#"{"metaData":{"id":"test-id","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1670892997849}}"#,
        r#"{"remove":{"path":"test-path.parquet","deletionTimestamp":1234567890,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"part":"value"},"size":9999,"stats":"{\"numRecords\":42}","deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":3},"baseRowId":100,"defaultRowCommitVersion":5}}"#,
    ]);
    let parsed_batch = parse_json_batch(actions);

    let mut visitor = RemoveVisitor::default();
    visitor.visit_rows_of(parsed_batch.as_ref()).unwrap();

    // Only the third JSON line is a remove action.
    assert_eq!(
        visitor.removes.len(),
        1,
        "Expected exactly one remove action"
    );
    let parsed = &visitor.removes[0];

    // Top-level scalar and map fields, checked in the order they are declared.
    assert_eq!(parsed.path, "test-path.parquet", "path mismatch");
    assert_eq!(
        parsed.deletion_timestamp,
        Some(1234567890),
        "deletion_timestamp mismatch"
    );
    assert!(!parsed.data_change, "data_change mismatch");
    assert_eq!(
        parsed.extended_file_metadata,
        Some(true),
        "extended_file_metadata mismatch"
    );

    let expected_partitions = HashMap::from([(String::from("part"), String::from("value"))]);
    assert_eq!(
        parsed.partition_values,
        Some(expected_partitions),
        "partition_values mismatch"
    );

    assert_eq!(parsed.size, Some(9999), "size mismatch");

    // `stats` is carried through as the raw JSON string, not parsed further.
    assert_eq!(
        parsed.stats.as_deref(),
        Some(r#"{"numRecords":42}"#),
        "stats mismatch"
    );

    // Nested deletion-vector descriptor.
    let descriptor = parsed
        .deletion_vector
        .as_ref()
        .expect("deletion_vector should be present");
    assert_eq!(
        descriptor.path_or_inline_dv, "vBn[lx{q8@P<9BNH/isA",
        "deletion_vector.path_or_inline_dv mismatch"
    );
    assert_eq!(
        descriptor.offset,
        Some(1),
        "deletion_vector.offset mismatch"
    );
    assert_eq!(
        descriptor.size_in_bytes, 36,
        "deletion_vector.size_in_bytes mismatch"
    );
    assert_eq!(
        descriptor.cardinality, 3,
        "deletion_vector.cardinality mismatch"
    );

    // Row-tracking fields: a wrong getter index in the visitor would surface
    // as a mismatch on one of these two values.
    assert_eq!(
        parsed.base_row_id,
        Some(100),
        "base_row_id mismatch - check getter index"
    );
    assert_eq!(
        parsed.default_row_commit_version,
        Some(5),
        "default_row_commit_version mismatch - check getter index"
    );
}

#[test]
fn test_parse_txn() {
let json_strings: StringArray = vec![
Expand Down
Loading