Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ where
.filter(|feature| !supported_features.contains(*feature));

Err(Error::Unsupported(format!(
"Unknown {}s: \"{}\". Supported {}s: \"{}\"",
"Found unsupported {}s: \"{}\". Supported {}s: \"{}\"",
features_type,
unsupported.join("\", \""),
features_type,
Expand Down Expand Up @@ -1595,7 +1595,7 @@ mod tests {
.unwrap();
assert_result_error_with_message(
protocol.ensure_write_supported(),
r#"Unsupported: Unknown TableFeatures: "identityColumns". Supported TableFeatures: "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
r#"Unsupported: Found unsupported TableFeatures: "identityColumns". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
);

// Unknown writer features are allowed during creation for forward compatibility,
Expand All @@ -1609,7 +1609,7 @@ mod tests {
.unwrap();
assert_result_error_with_message(
protocol.ensure_write_supported(),
r#"Unsupported: Unknown TableFeatures: "unsupported writer". Supported TableFeatures: "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
r#"Unsupported: Found unsupported TableFeatures: "unsupported writer". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
);
}

Expand Down Expand Up @@ -1643,7 +1643,7 @@ mod tests {
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
match error {
Error::Unsupported(e) if e ==
"Unknown TableFeatures: \"idk\". Supported TableFeatures: \"columnMapping\", \"deletionVectors\""
"Found unsupported TableFeatures: \"idk\". Supported TableFeatures: \"columnMapping\", \"deletionVectors\""
=> {},
_ => panic!("Expected unsupported error, got: {error}"),
}
Expand Down
150 changes: 149 additions & 1 deletion kernel/src/table_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::table_features::{
ColumnMappingMode, TableFeature,
};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, Error, Version};
use delta_kernel_derive::internal_api;

Expand Down Expand Up @@ -182,6 +183,27 @@ impl TableConfiguration {
pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
self.protocol.ensure_write_supported()?;

// We allow Change Data Feed to be enabled only if AppendOnly is enabled.
// This is because kernel does not yet support writing `.cdc` files for DML operations.
if self
.table_properties()
.enable_change_data_feed
.unwrap_or(false)
{
require!(
self.is_append_only_enabled(),
Error::unsupported("Writing to table with Change Data Feed is only supported if append only mode is enabled")
);
require!(
self.is_cdf_read_supported(),
Error::unsupported(
"Change data feed is enabled on this table, but found invalid table
table_configuration. Ensure that column mapping is disabled and ensure correct
protocol reader/writer features"
)
);
}

// for now we don't allow invariants so although we support writer version 2 and the
// ColumnInvariant TableFeature we _must_ check here that they are not actually in use
if self.is_invariants_supported()
Expand Down Expand Up @@ -423,13 +445,65 @@ mod test {

use crate::actions::{Metadata, Protocol};
use crate::schema::{DataType, StructField, StructType};
use crate::table_features::TableFeature;
use crate::table_features::{FeatureType, TableFeature};
use crate::table_properties::TableProperties;
use crate::utils::test_utils::assert_result_error_with_message;
use crate::Error;

use super::{InCommitTimestampEnablement, TableConfiguration};

fn create_mock_table_config(
props_to_enable: &[&str],
features: &[TableFeature],
) -> TableConfiguration {
create_mock_table_config_with_version(props_to_enable, features.into(), 3, 7)
}

fn create_mock_table_config_with_version(
props_to_enable: &[&str],
features_opt: Option<&[TableFeature]>,
min_reader_version: i32,
min_writer_version: i32,
) -> TableConfiguration {
let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]);
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0,
HashMap::from_iter(
props_to_enable
.iter()
.map(|key| (key.to_string(), "true".to_string())),
),
)
.unwrap();
let reader_features = features_opt.map(|features| {
features
.iter()
.filter(|feature| matches!(feature.feature_type(), FeatureType::ReaderWriter))
});
let writer_features = features_opt.map(|features| {
features.iter().filter(|feature| {
matches!(
feature.feature_type(),
FeatureType::Writer | FeatureType::ReaderWriter
)
})
});

let protocol = Protocol::try_new(
min_reader_version,
min_writer_version,
reader_features,
writer_features,
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
}

#[test]
fn dv_supported_not_enabled() {
let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]);
Expand All @@ -454,6 +528,7 @@ mod test {
assert!(table_config.is_deletion_vector_supported());
assert!(!table_config.is_deletion_vector_enabled());
}

#[test]
fn dv_enabled() {
let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]);
Expand Down Expand Up @@ -484,6 +559,79 @@ mod test {
assert!(table_config.is_deletion_vector_supported());
assert!(table_config.is_deletion_vector_enabled());
}

#[test]
fn write_with_cdf() {
use TableFeature::*;
let cases = [
(
// Should fail since AppendOnly is not supported
create_mock_table_config(&["delta.enableChangeDataFeed"], &[ChangeDataFeed]),
Err(Error::unsupported("Writing to table with Change Data Feed is only supported if append only mode is enabled"))
),
(
// Should fail since AppendOnly is supported but not enabled
create_mock_table_config(
&["delta.enableChangeDataFeed"],
&[ChangeDataFeed, AppendOnly],
),
Err(Error::unsupported("Writing to table with Change Data Feed is only supported if append only mode is enabled"))
),
(
// Should succeed since AppendOnly is enabled
create_mock_table_config(
&["delta.delta.enableChangeDataFeed", "delta.appendOnly"],
&[ChangeDataFeed, AppendOnly],
),
Ok(()),
),

(
// Fails since writes are not supported on min_writer_version=4. Once version 4 is
// supported, ensure that this still fails since ChangeDataFeed is enabled while
// append only is not enabled.
create_mock_table_config_with_version(&["delta.enableChangeDataFeed"],None, 1, 4),
Err(Error::unsupported("Currently delta-kernel-rs can only write to tables with protocol.minWriterVersion = 1, 2, or 7"))

),
// NOTE: The following cases should be updated if column mapping for writes is
// supported before cdc is.
(
// Should fail since change data feed and column mapping features cannot both be
// present.
create_mock_table_config(
&["delta.enableChangeDataFeed", "delta.appendOnly"],
&[ChangeDataFeed, ColumnMapping, AppendOnly],
),
Err(Error::unsupported(r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#)),
),
(
// The table does not require writing CDC files, so it is safe to write to it.
create_mock_table_config(
&["delta.appendOnly"],
&[ChangeDataFeed, ColumnMapping, AppendOnly],
),
Err(Error::unsupported(r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#)),
),
(
// Should succeed since change data feed is not enabled
create_mock_table_config(&["delta.appendOnly"], &[AppendOnly]),
Ok(()),
),
];

for (table_configuration, result) in cases {
match (table_configuration.ensure_write_supported(), result) {
(Ok(()), Ok(())) => { /* Correct result */ }
(actual_result, Err(expected)) => {
assert_result_error_with_message(actual_result, &expected.to_string());
}
(Err(actual_result), Ok(())) => {
panic!("Expected Ok but got error: {actual_result}");
}
}
}
}
#[test]
fn ict_enabled_from_table_creation() {
let schema = StructType::new_unchecked([StructField::nullable("value", DataType::INTEGER)]);
Expand Down
1 change: 1 addition & 0 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<Vec<TableFeature>> = LazyL
/// tables with row tracking yet.
pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock<Vec<TableFeature>> = LazyLock::new(|| {
vec![
TableFeature::ChangeDataFeed,
TableFeature::AppendOnly,
TableFeature::DeletionVectors,
TableFeature::DomainMetadata,
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ pub(crate) mod test_utils {
message: &str,
) {
match res {
Ok(_) => panic!("Expected error, but got Ok result"),
Ok(_) => panic!("Expected error with message {message}, but got Ok result"),
Err(error) => {
let error_str = error.to_string();
assert!(
Expand Down
Loading