@@ -20,6 +20,7 @@ use crate::table_features::{
2020 ColumnMappingMode , TableFeature ,
2121} ;
2222use crate :: table_properties:: TableProperties ;
23+ use crate :: utils:: require;
2324use crate :: { DeltaResult , Error , Version } ;
2425use delta_kernel_derive:: internal_api;
2526
@@ -182,6 +183,27 @@ impl TableConfiguration {
182183 pub ( crate ) fn ensure_write_supported ( & self ) -> DeltaResult < ( ) > {
183184 self . protocol . ensure_write_supported ( ) ?;
184185
186+ // We allow Change Data Feed to be enabled only if AppendOnly is enabled.
187+ // This is because kernel does not yet support writing `.cdc` files for DML operations.
188+ if self
189+ . table_properties ( )
190+ . enable_change_data_feed
191+ . unwrap_or ( false )
192+ {
193+ require ! (
194+ self . is_append_only_enabled( ) ,
195+ Error :: unsupported( "Writing to table with Change Data Feed is only supported if append only mode is enabled" )
196+ ) ;
197+ require ! (
198+ self . is_cdf_read_supported( ) ,
199+ Error :: unsupported(
200+ "Change data feed is enabled on this table, but found invalid table
201+ table_configuration. Ensure that column mapping is disabled and ensure correct
202+ protocol reader/writer features"
203+ )
204+ ) ;
205+ }
206+
185207 // for now we don't allow invariants so although we support writer version 2 and the
186208 // ColumnInvariant TableFeature we _must_ check here that they are not actually in use
187209 if self . is_invariants_supported ( )
@@ -423,13 +445,65 @@ mod test {
423445
424446 use crate :: actions:: { Metadata , Protocol } ;
425447 use crate :: schema:: { DataType , StructField , StructType } ;
426- use crate :: table_features:: TableFeature ;
448+ use crate :: table_features:: { FeatureType , TableFeature } ;
427449 use crate :: table_properties:: TableProperties ;
428450 use crate :: utils:: test_utils:: assert_result_error_with_message;
429451 use crate :: Error ;
430452
431453 use super :: { InCommitTimestampEnablement , TableConfiguration } ;
432454
455+ fn create_mock_table_config (
456+ props_to_enable : & [ & str ] ,
457+ features : & [ TableFeature ] ,
458+ ) -> TableConfiguration {
459+ create_mock_table_config_with_version ( props_to_enable, features. into ( ) , 3 , 7 )
460+ }
461+
462+ fn create_mock_table_config_with_version (
463+ props_to_enable : & [ & str ] ,
464+ features_opt : Option < & [ TableFeature ] > ,
465+ min_reader_version : i32 ,
466+ min_writer_version : i32 ,
467+ ) -> TableConfiguration {
468+ let schema = StructType :: new_unchecked ( [ StructField :: nullable ( "value" , DataType :: INTEGER ) ] ) ;
469+ let metadata = Metadata :: try_new (
470+ None ,
471+ None ,
472+ schema,
473+ vec ! [ ] ,
474+ 0 ,
475+ HashMap :: from_iter (
476+ props_to_enable
477+ . iter ( )
478+ . map ( |key| ( key. to_string ( ) , "true" . to_string ( ) ) ) ,
479+ ) ,
480+ )
481+ . unwrap ( ) ;
482+ let reader_features = features_opt. map ( |features| {
483+ features
484+ . iter ( )
485+ . filter ( |feature| matches ! ( feature. feature_type( ) , FeatureType :: ReaderWriter ) )
486+ } ) ;
487+ let writer_features = features_opt. map ( |features| {
488+ features. iter ( ) . filter ( |feature| {
489+ matches ! (
490+ feature. feature_type( ) ,
491+ FeatureType :: Writer | FeatureType :: ReaderWriter
492+ )
493+ } )
494+ } ) ;
495+
496+ let protocol = Protocol :: try_new (
497+ min_reader_version,
498+ min_writer_version,
499+ reader_features,
500+ writer_features,
501+ )
502+ . unwrap ( ) ;
503+ let table_root = Url :: try_from ( "file:///" ) . unwrap ( ) ;
504+ TableConfiguration :: try_new ( metadata, protocol, table_root, 0 ) . unwrap ( )
505+ }
506+
433507 #[ test]
434508 fn dv_supported_not_enabled ( ) {
435509 let schema = StructType :: new_unchecked ( [ StructField :: nullable ( "value" , DataType :: INTEGER ) ] ) ;
@@ -454,6 +528,7 @@ mod test {
454528 assert ! ( table_config. is_deletion_vector_supported( ) ) ;
455529 assert ! ( !table_config. is_deletion_vector_enabled( ) ) ;
456530 }
531+
457532 #[ test]
458533 fn dv_enabled ( ) {
459534 let schema = StructType :: new_unchecked ( [ StructField :: nullable ( "value" , DataType :: INTEGER ) ] ) ;
@@ -484,6 +559,79 @@ mod test {
484559 assert ! ( table_config. is_deletion_vector_supported( ) ) ;
485560 assert ! ( table_config. is_deletion_vector_enabled( ) ) ;
486561 }
562+
563+ #[ test]
564+ fn write_with_cdf ( ) {
565+ use TableFeature :: * ;
566+ let cases = [
567+ (
568+ // Should fail since AppendOnly is not supported
569+ create_mock_table_config ( & [ "delta.enableChangeDataFeed" ] , & [ ChangeDataFeed ] ) ,
570+ Err ( Error :: unsupported ( "Writing to table with Change Data Feed is only supported if append only mode is enabled" ) )
571+ ) ,
572+ (
573+ // Should fail since AppendOnly is supported but not enabled
574+ create_mock_table_config (
575+ & [ "delta.enableChangeDataFeed" ] ,
576+ & [ ChangeDataFeed , AppendOnly ] ,
577+ ) ,
578+ Err ( Error :: unsupported ( "Writing to table with Change Data Feed is only supported if append only mode is enabled" ) )
579+ ) ,
580+ (
581+ // Should succeed since AppendOnly is enabled
582+ create_mock_table_config (
583+ & [ "delta.delta.enableChangeDataFeed" , "delta.appendOnly" ] ,
584+ & [ ChangeDataFeed , AppendOnly ] ,
585+ ) ,
586+ Ok ( ( ) ) ,
587+ ) ,
588+
589+ (
590+ // Fails since writes are not supported on min_writer_version=4. Once version 4 is
591+ // supported, ensure that this still fails since ChangeDataFeed is enabled while
592+ // append only is not enabled.
593+ create_mock_table_config_with_version ( & [ "delta.enableChangeDataFeed" ] , None , 1 , 4 ) ,
594+ Err ( Error :: unsupported ( "Currently delta-kernel-rs can only write to tables with protocol.minWriterVersion = 1, 2, or 7" ) )
595+
596+ ) ,
597+ // NOTE: The following cases should be updated if column mapping for writes is
598+ // supported before cdc is.
599+ (
600+ // Should fail since change data feed and column mapping features cannot both be
601+ // present.
602+ create_mock_table_config (
603+ & [ "delta.enableChangeDataFeed" , "delta.appendOnly" ] ,
604+ & [ ChangeDataFeed , ColumnMapping , AppendOnly ] ,
605+ ) ,
606+ Err ( Error :: unsupported ( r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""# ) ) ,
607+ ) ,
608+ (
609+ // The table does not require writing CDC files, so it is safe to write to it.
610+ create_mock_table_config (
611+ & [ "delta.appendOnly" ] ,
612+ & [ ChangeDataFeed , ColumnMapping , AppendOnly ] ,
613+ ) ,
614+ Err ( Error :: unsupported ( r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""# ) ) ,
615+ ) ,
616+ (
617+ // Should succeed since change data feed is not enabled
618+ create_mock_table_config ( & [ "delta.appendOnly" ] , & [ AppendOnly ] ) ,
619+ Ok ( ( ) ) ,
620+ ) ,
621+ ] ;
622+
623+ for ( table_configuration, result) in cases {
624+ match ( table_configuration. ensure_write_supported ( ) , result) {
625+ ( Ok ( ( ) ) , Ok ( ( ) ) ) => { /* Correct result */ }
626+ ( actual_result, Err ( expected) ) => {
627+ assert_result_error_with_message ( actual_result, & expected. to_string ( ) ) ;
628+ }
629+ ( Err ( actual_result) , Ok ( ( ) ) ) => {
630+ panic ! ( "Expected Ok but got error: {actual_result}" ) ;
631+ }
632+ }
633+ }
634+ }
487635 #[ test]
488636 fn ict_enabled_from_table_creation ( ) {
489637 let schema = StructType :: new_unchecked ( [ StructField :: nullable ( "value" , DataType :: INTEGER ) ] ) ;
0 commit comments