1- use std:: collections:: { HashMap , HashSet } ;
1+ use std:: collections:: HashSet ;
22use std:: iter;
33use std:: ops:: Deref ;
44use std:: sync:: { Arc , LazyLock } ;
@@ -8,6 +8,7 @@ use url::Url;
88use crate :: actions:: {
99 as_log_add_schema, domain_metadata:: scan_domain_metadatas, get_log_commit_info_schema,
1010 get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo , DomainMetadata , SetTransaction ,
11+ INTERNAL_DOMAIN_PREFIX ,
1112} ;
1213#[ cfg( feature = "catalog-managed" ) ]
1314use crate :: committer:: FileSystemCommitter ;
@@ -134,7 +135,11 @@ pub struct Transaction {
134135 // commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to
135136 // keep all timestamps within the same commit consistent.
136137 commit_timestamp : i64 ,
137- domain_metadatas : Vec < DomainMetadata > ,
138+ // Domain metadata additions for this transaction.
139+ domain_metadata_additions : Vec < DomainMetadata > ,
140+ // Domain names to remove in this transaction. The configuration values are fetched during
141+ // commit from the log to preserve the pre-image in tombstones.
142+ domain_removals : Vec < String > ,
138143 // Whether this transaction contains any logical data changes.
139144 data_change : bool ,
140145}
@@ -177,7 +182,8 @@ impl Transaction {
177182 add_files_metadata : vec ! [ ] ,
178183 set_transactions : vec ! [ ] ,
179184 commit_timestamp,
180- domain_metadatas : vec ! [ ] ,
185+ domain_metadata_additions : vec ! [ ] ,
186+ domain_removals : vec ! [ ] ,
181187 data_change : true ,
182188 } )
183189 }
@@ -347,7 +353,7 @@ impl Transaction {
347353 /// fail (that is, we don't eagerly check domain validity here).
348354 /// Setting metadata for multiple distinct domains is allowed.
349355 pub fn with_domain_metadata ( mut self , domain : String , configuration : String ) -> Self {
350- self . domain_metadatas
356+ self . domain_metadata_additions
351357 . push ( DomainMetadata :: new ( domain, configuration) ) ;
352358 self
353359 }
@@ -362,12 +368,50 @@ impl Transaction {
362368 /// fail (that is, we don't eagerly check domain validity here).
363369 /// Removing metadata for multiple distinct domains is allowed.
364370 pub fn with_domain_metadata_removed ( mut self , domain : String ) -> Self {
365- // actual configuration value determined during commit
366- self . domain_metadatas
367- . push ( DomainMetadata :: remove ( domain, String :: new ( ) ) ) ;
371+ self . domain_removals . push ( domain) ;
368372 self
369373 }
370374
375+ /// Validate that user domains don't conflict with system domains or each other.
376+ fn validate_user_domain_operations ( & self ) -> DeltaResult < ( ) > {
377+ let mut seen_domains = HashSet :: new ( ) ;
378+
379+ // Validate domain additions
380+ for dm in & self . domain_metadata_additions {
381+ let domain = dm. domain ( ) ;
382+ if domain. starts_with ( INTERNAL_DOMAIN_PREFIX ) {
383+ return Err ( Error :: generic (
384+ "Cannot modify domains that start with 'delta.' as those are system controlled" ,
385+ ) ) ;
386+ }
387+
388+ if !seen_domains. insert ( domain) {
389+ return Err ( Error :: generic ( format ! (
390+ "Metadata for domain {} already specified in this transaction" ,
391+ domain
392+ ) ) ) ;
393+ }
394+ }
395+
396+ // Validate domain removals
397+ for domain in & self . domain_removals {
398+ if domain. starts_with ( INTERNAL_DOMAIN_PREFIX ) {
399+ return Err ( Error :: generic (
400+ "Cannot modify domains that start with 'delta.' as those are system controlled" ,
401+ ) ) ;
402+ }
403+
404+ if !seen_domains. insert ( domain. as_str ( ) ) {
405+ return Err ( Error :: generic ( format ! (
406+ "Metadata for domain {} already specified in this transaction" ,
407+ domain
408+ ) ) ) ;
409+ }
410+ }
411+
412+ Ok ( ( ) )
413+ }
414+
371415 /// Generate domain metadata actions with validation. Handle both user and system domains.
372416 ///
373417 /// This function may perform an expensive log replay operation if there are any domain removals.
@@ -378,71 +422,55 @@ impl Transaction {
378422 engine : & ' a dyn Engine ,
379423 row_tracking_high_watermark : Option < RowTrackingDomainMetadata > ,
380424 ) -> DeltaResult < impl Iterator < Item = DeltaResult < Box < dyn EngineData > > > + ' a > {
381- // if there are domain metadata actions, the table must support it
382- if !self . domain_metadatas . is_empty ( )
425+ // Validate feature support for user domain operations
426+ if ( !self . domain_metadata_additions . is_empty ( ) || ! self . domain_removals . is_empty ( ) )
383427 && !self
384428 . read_snapshot
385429 . table_configuration ( )
386430 . is_domain_metadata_supported ( )
387431 {
388- return Err ( Error :: unsupported (
389- "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"
390- ) ) ;
432+ return Err ( Error :: unsupported ( "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature" ) ) ;
391433 }
392434
393- // validate user domain metadata and check if we have removals
394- let mut seen_domains = HashSet :: new ( ) ;
395- let mut has_removals = false ;
396- for dm in & self . domain_metadatas {
397- if dm. is_internal ( ) {
398- return Err ( Error :: Generic (
399- "Cannot modify domains that start with 'delta.' as those are system controlled"
400- . to_string ( ) ,
401- ) ) ;
402- }
403-
404- if !seen_domains. insert ( dm. domain ( ) ) {
405- return Err ( Error :: Generic ( format ! (
406- "Metadata for domain {} already specified in this transaction" ,
407- dm. domain( )
408- ) ) ) ;
409- }
410-
411- if dm. is_removed ( ) {
412- has_removals = true ;
413- }
414- }
435+ // Validate user domain operations
436+ self . validate_user_domain_operations ( ) ?;
437+
438+ // Generate user domain removals via log replay (expensive if non-empty)
439+ let removal_actions = if !self . domain_removals . is_empty ( ) {
440+ // Scan log to fetch existing configurations for tombstones
441+ let existing_domains =
442+ scan_domain_metadatas ( self . read_snapshot . log_segment ( ) , None , engine) ?;
443+
444+ // Create removal tombstones with pre-image configurations
445+ let removals: Vec < _ > = self
446+ . domain_removals
447+ . iter ( )
448+ . filter_map ( |domain| {
449+ // If domain doesn't exist in the log, this is a no-op (filter it out)
450+ existing_domains. get ( domain) . map ( |existing| {
451+ DomainMetadata :: remove ( domain. clone ( ) , existing. configuration ( ) . to_owned ( ) )
452+ } )
453+ } )
454+ . collect ( ) ;
415455
416- // fetch previous configuration values (requires log replay)
417- let existing_domains = if has_removals {
418- scan_domain_metadatas ( self . read_snapshot . log_segment ( ) , None , engine) ?
456+ removals
419457 } else {
420- HashMap :: new ( )
458+ vec ! [ ]
421459 } ;
422460
423- let user_domains = self
424- . domain_metadatas
425- . iter ( )
426- . filter_map ( move |dm : & DomainMetadata | {
427- if dm. is_removed ( ) {
428- existing_domains. get ( dm. domain ( ) ) . map ( |existing| {
429- DomainMetadata :: remove (
430- dm. domain ( ) . to_string ( ) ,
431- existing. configuration ( ) . to_string ( ) ,
432- )
433- } )
434- } else {
435- Some ( dm. clone ( ) )
436- }
437- } ) ;
438-
439- let system_domains = row_tracking_high_watermark
461+ // Generate system domain actions (row tracking)
462+ let system_domain_actions = row_tracking_high_watermark
440463 . map ( DomainMetadata :: try_from)
441464 . transpose ( ) ?
442465 . into_iter ( ) ;
443466
444- Ok ( user_domains
445- . chain ( system_domains)
467+ // Chain all domain actions and convert to EngineData
468+ Ok ( self
469+ . domain_metadata_additions
470+ . clone ( )
471+ . into_iter ( )
472+ . chain ( removal_actions)
473+ . chain ( system_domain_actions)
446474 . map ( |dm| dm. into_engine_data ( get_log_domain_metadata_schema ( ) . clone ( ) , engine) ) )
447475 }
448476
0 commit comments