From 6940694e17832b465d27c454dd51b2d25a3435d7 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Wed, 22 Oct 2025 23:15:38 +0000 Subject: [PATCH 1/4] followup --- kernel/src/transaction/mod.rs | 67 +++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index f647ba0e80..da624e4a42 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -8,6 +8,7 @@ use url::Url; use crate::actions::{ as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, + INTERNAL_DOMAIN_PREFIX, }; #[cfg(feature = "catalog-managed")] use crate::committer::FileSystemCommitter; @@ -134,7 +135,12 @@ pub struct Transaction { // commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to // keep all timestamps within the same commit consistent. commit_timestamp: i64, + // Domain metadata additions for this transaction. domain_metadatas: Vec, + // Domain names to remove in this transaction. The configuration values are fetched during + // commit from the log to preserve the pre-image in tombstones. We use Vec to preserve + // duplicate detection during validation (duplicates are a user error). + domains_to_remove: Vec, // Whether this transaction contains any logical data changes. data_change: bool, } @@ -178,6 +184,7 @@ impl Transaction { set_transactions: vec![], commit_timestamp, domain_metadatas: vec![], + domains_to_remove: vec![], data_change: true, }) } @@ -362,9 +369,7 @@ impl Transaction { /// fail (that is, we don't eagerly check domain validity here). /// Removing metadata for multiple distinct domains is allowed. pub fn with_domain_metadata_removed(mut self, domain: String) -> Self { - // actual configuration value determined during commit - self.domain_metadatas - .push(DomainMetadata::remove(domain, String::new())); + self.domains_to_remove.push(domain); self } @@ -378,8 +383,8 @@ impl Transaction { engine: &'a dyn Engine, row_tracking_high_watermark: Option, ) -> DeltaResult>> + 'a> { - // if there are domain metadata actions, the table must support it - if !self.domain_metadatas.is_empty() + // if there are domain metadata actions (additions or removals), the table must support it + if (!self.domain_metadatas.is_empty() || !self.domains_to_remove.is_empty()) && !self .read_snapshot .table_configuration() @@ -390,9 +395,8 @@ impl Transaction { )); } - // validate user domain metadata and check if we have removals + // validate domain metadata additions let mut seen_domains = HashSet::new(); - let mut has_removals = false; for dm in &self.domain_metadatas { if dm.is_internal() { return Err(Error::Generic( @@ -407,41 +411,50 @@ impl Transaction { dm.domain() ))); } + } - if dm.is_removed() { - has_removals = true; + // validate domain metadata removals + for domain in &self.domains_to_remove { + if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { + return Err(Error::Generic( + "Cannot modify domains that start with 'delta.' as those are system controlled" + .to_string(), + )); + } + + if !seen_domains.insert(domain) { + return Err(Error::Generic(format!( + "Metadata for domain {} already specified in this transaction", + domain + ))); } } - // fetch previous configuration values (requires log replay) - let existing_domains = if has_removals { + // fetch previous configuration values if we have removals (requires log replay) + let existing_domains = if !self.domains_to_remove.is_empty() { scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)? } else { HashMap::new() }; - let user_domains = self - .domain_metadatas - .iter() - .filter_map(move |dm: &DomainMetadata| { - if dm.is_removed() { - existing_domains.get(dm.domain()).map(|existing| { - DomainMetadata::remove( - dm.domain().to_string(), - existing.configuration().to_string(), - ) - }) - } else { - Some(dm.clone()) - } - }); + // process domain additions - clone directly since domain_metadatas now only contains additions + let domain_additions = self.domain_metadatas.iter().cloned(); + + // process domain removals - fetch configuration from existing domains and create tombstones + let domain_removals = self.domains_to_remove.iter().filter_map(move |domain| { + // if domain doesn't exist in the log, this is a no-op (filter it out) + existing_domains.get(domain).map(|existing| { + DomainMetadata::remove(domain.clone(), existing.configuration().to_string()) + }) + }); let system_domains = row_tracking_high_watermark .map(DomainMetadata::try_from) .transpose()? .into_iter(); - Ok(user_domains + Ok(domain_additions + .chain(domain_removals) .chain(system_domains) .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine))) } From 279ddd69759140a8ab5ac2f2ee7ab6e628dabe24 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Wed, 22 Oct 2025 23:52:17 +0000 Subject: [PATCH 2/4] fix2 --- kernel/src/actions/mod.rs | 17 ++------------ kernel/src/row_tracking.rs | 1 + kernel/src/transaction/mod.rs | 44 ++++++++++++++--------------------- 3 files changed, 20 insertions(+), 42 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index ae7669912b..4c0a36f0c0 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -961,20 +961,11 @@ pub(crate) struct DomainMetadata { impl DomainMetadata { /// Create a new DomainMetadata action. - pub(crate) fn new(domain: String, configuration: String) -> Self { + pub(crate) fn new(domain: String, configuration: String, removed: bool) -> Self { Self { domain, configuration, - removed: false, - } - } - - // Create a new DomainMetadata action to remove a domain. - pub(crate) fn remove(domain: String, configuration: String) -> Self { - Self { - domain, - configuration, - removed: true, + removed, } } @@ -992,10 +983,6 @@ impl DomainMetadata { pub(crate) fn configuration(&self) -> &str { &self.configuration } - - pub(crate) fn is_removed(&self) -> bool { - self.removed - } } #[cfg(test)] diff --git a/kernel/src/row_tracking.rs b/kernel/src/row_tracking.rs index d4f56b3dd3..cc6b199d60 100644 --- a/kernel/src/row_tracking.rs +++ b/kernel/src/row_tracking.rs @@ -63,6 +63,7 @@ impl TryFrom for DomainMetadata { Ok(DomainMetadata::new( RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(), serde_json::to_string(&metadata)?, + false, )) } } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index da624e4a42..2eb778392c 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -136,10 +136,9 @@ pub struct Transaction { // keep all timestamps within the same commit consistent. commit_timestamp: i64, // Domain metadata additions for this transaction. - domain_metadatas: Vec, + domain_metadatas_to_add: Vec, // Domain names to remove in this transaction. The configuration values are fetched during - // commit from the log to preserve the pre-image in tombstones. We use Vec to preserve - // duplicate detection during validation (duplicates are a user error). + // commit from the log to preserve the pre-image in tombstones. domains_to_remove: Vec, // Whether this transaction contains any logical data changes. data_change: bool, @@ -183,7 +182,7 @@ impl Transaction { add_files_metadata: vec![], set_transactions: vec![], commit_timestamp, - domain_metadatas: vec![], + domain_metadatas_to_add: vec![], domains_to_remove: vec![], data_change: true, }) @@ -354,8 +353,8 @@ impl Transaction { /// fail (that is, we don't eagerly check domain validity here). /// Setting metadata for multiple distinct domains is allowed. pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self { - self.domain_metadatas - .push(DomainMetadata::new(domain, configuration)); + self.domain_metadatas_to_add + .push(DomainMetadata::new(domain, configuration, false)); self } @@ -384,7 +383,7 @@ impl Transaction { row_tracking_high_watermark: Option, ) -> DeltaResult>> + 'a> { // if there are domain metadata actions (additions or removals), the table must support it - if (!self.domain_metadatas.is_empty() || !self.domains_to_remove.is_empty()) + if (!self.domain_metadatas_to_add.is_empty() || !self.domains_to_remove.is_empty()) && !self .read_snapshot .table_configuration() @@ -395,26 +394,17 @@ impl Transaction { )); } - // validate domain metadata additions + // validate all domain metadata operations (additions and removals) let mut seen_domains = HashSet::new(); - for dm in &self.domain_metadatas { - if dm.is_internal() { - return Err(Error::Generic( - "Cannot modify domains that start with 'delta.' as those are system controlled" - .to_string(), - )); - } - if !seen_domains.insert(dm.domain()) { - return Err(Error::Generic(format!( - "Metadata for domain {} already specified in this transaction", - dm.domain() - ))); - } - } + // chain both additions and removals into a single iterator of domain names + let all_domains = self + .domain_metadatas_to_add + .iter() + .map(|dm| dm.domain()) + .chain(self.domains_to_remove.iter().map(String::as_str)); - // validate domain metadata removals - for domain in &self.domains_to_remove { + for domain in all_domains { if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { return Err(Error::Generic( "Cannot modify domains that start with 'delta.' as those are system controlled" @@ -437,14 +427,14 @@ impl Transaction { HashMap::new() }; - // process domain additions - clone directly since domain_metadatas now only contains additions - let domain_additions = self.domain_metadatas.iter().cloned(); + // process domain additions - clone directly since domain_metadatas_to_add only contains additions + let domain_additions = self.domain_metadatas_to_add.iter().cloned(); // process domain removals - fetch configuration from existing domains and create tombstones let domain_removals = self.domains_to_remove.iter().filter_map(move |domain| { // if domain doesn't exist in the log, this is a no-op (filter it out) existing_domains.get(domain).map(|existing| { - DomainMetadata::remove(domain.clone(), existing.configuration().to_string()) + DomainMetadata::new(domain.clone(), existing.configuration().to_owned(), true) }) }); From 88c0c2fb3bc686d1d5db6ab0fc1d596e1519a079 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Thu, 23 Oct 2025 00:41:52 +0000 Subject: [PATCH 3/4] comments --- kernel/src/actions/mod.rs | 13 +++- kernel/src/row_tracking.rs | 1 - kernel/src/transaction/mod.rs | 141 ++++++++++++++++++++-------------- 3 files changed, 94 insertions(+), 61 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 4c0a36f0c0..5540b478b1 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -961,11 +961,20 @@ pub(crate) struct DomainMetadata { impl DomainMetadata { /// Create a new DomainMetadata action. - pub(crate) fn new(domain: String, configuration: String, removed: bool) -> Self { + pub(crate) fn new(domain: String, configuration: String) -> Self { Self { domain, configuration, - removed, + removed: false, + } + } + + /// Create a new DomainMetadata action to remove a domain. + pub(crate) fn remove(domain: String, configuration: String) -> Self { + Self { + domain, + configuration, + removed: true, } } diff --git a/kernel/src/row_tracking.rs b/kernel/src/row_tracking.rs index cc6b199d60..d4f56b3dd3 100644 --- a/kernel/src/row_tracking.rs +++ b/kernel/src/row_tracking.rs @@ -63,7 +63,6 @@ impl TryFrom for DomainMetadata { Ok(DomainMetadata::new( RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(), serde_json::to_string(&metadata)?, - false, )) } } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 2eb778392c..0382cc9874 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::iter; use std::ops::Deref; use std::sync::{Arc, LazyLock}; @@ -136,10 +136,10 @@ pub struct Transaction { // keep all timestamps within the same commit consistent. commit_timestamp: i64, // Domain metadata additions for this transaction. - domain_metadatas_to_add: Vec, + domain_metadata_additions: Vec, // Domain names to remove in this transaction. The configuration values are fetched during // commit from the log to preserve the pre-image in tombstones. - domains_to_remove: Vec, + domain_removals: Vec, // Whether this transaction contains any logical data changes. data_change: bool, } @@ -182,8 +182,8 @@ impl Transaction { add_files_metadata: vec![], set_transactions: vec![], commit_timestamp, - domain_metadatas_to_add: vec![], - domains_to_remove: vec![], + domain_metadata_additions: vec![], + domain_removals: vec![], data_change: true, }) } @@ -353,8 +353,8 @@ impl Transaction { /// fail (that is, we don't eagerly check domain validity here). /// Setting metadata for multiple distinct domains is allowed. pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self { - self.domain_metadatas_to_add - .push(DomainMetadata::new(domain, configuration, false)); + self.domain_metadata_additions + .push(DomainMetadata::new(domain, configuration)); self } @@ -368,10 +368,50 @@ impl Transaction { /// fail (that is, we don't eagerly check domain validity here). /// Removing metadata for multiple distinct domains is allowed. pub fn with_domain_metadata_removed(mut self, domain: String) -> Self { - self.domains_to_remove.push(domain); + self.domain_removals.push(domain); self } + /// Validate that user domains don't conflict with system domains or each other. + fn validate_user_domain_operations(&self) -> DeltaResult<()> { + let mut seen_domains = HashSet::new(); + + // Validate domain additions + for dm in &self.domain_metadata_additions { + let domain = dm.domain(); + if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { + return Err(Error::generic( + "Cannot modify domains that start with 'delta.' as those are system controlled", + )); + } + + if !seen_domains.insert(domain) { + return Err(Error::generic(format!( + "Metadata for domain {} already specified in this transaction", + domain + ))); + } + } + + // Validate domain removals + for domain in &self.domain_removals { + if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { + return Err(Error::generic( + "Cannot modify domains that start with 'delta.' as those are system controlled", + )); + } + + if !seen_domains.insert(domain.as_str()) { + return Err(Error::generic(format!( + "Metadata for domain {} already specified in this transaction", + domain + ))); + } + } + + Ok(()) + } + /// Generate domain metadata actions with validation. Handle both user and system domains. /// /// This function may perform an expensive log replay operation if there are any domain removals. @@ -382,70 +422,55 @@ impl Transaction { engine: &'a dyn Engine, row_tracking_high_watermark: Option, ) -> DeltaResult>> + 'a> { - // if there are domain metadata actions (additions or removals), the table must support it - if (!self.domain_metadatas_to_add.is_empty() || !self.domains_to_remove.is_empty()) + // Validate feature support for user domain operations + if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty()) && !self .read_snapshot .table_configuration() .is_domain_metadata_supported() { - return Err(Error::unsupported( - "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature" - )); + return Err(Error::generic("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature")); } - // validate all domain metadata operations (additions and removals) - let mut seen_domains = HashSet::new(); - - // chain both additions and removals into a single iterator of domain names - let all_domains = self - .domain_metadatas_to_add - .iter() - .map(|dm| dm.domain()) - .chain(self.domains_to_remove.iter().map(String::as_str)); - - for domain in all_domains { - if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { - return Err(Error::Generic( - "Cannot modify domains that start with 'delta.' as those are system controlled" - .to_string(), - )); - } - - if !seen_domains.insert(domain) { - return Err(Error::Generic(format!( - "Metadata for domain {} already specified in this transaction", - domain - ))); - } - } - - // fetch previous configuration values if we have removals (requires log replay) - let existing_domains = if !self.domains_to_remove.is_empty() { - scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)? + // Validate user domain operations + self.validate_user_domain_operations()?; + + // Generate user domain removals via log replay (expensive if non-empty) + let removal_actions = if !self.domain_removals.is_empty() { + // Scan log to fetch existing configurations for tombstones + let existing_domains = + scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?; + + // Create removal tombstones with pre-image configurations + let removals: Vec<_> = self + .domain_removals + .iter() + .filter_map(|domain| { + // If domain doesn't exist in the log, this is a no-op (filter it out) + existing_domains.get(domain).map(|existing| { + DomainMetadata::remove(domain.clone(), existing.configuration().to_owned()) + }) + }) + .collect(); + + removals } else { - HashMap::new() + vec![] }; - // process domain additions - clone directly since domain_metadatas_to_add only contains additions - let domain_additions = self.domain_metadatas_to_add.iter().cloned(); - - // process domain removals - fetch configuration from existing domains and create tombstones - let domain_removals = self.domains_to_remove.iter().filter_map(move |domain| { - // if domain doesn't exist in the log, this is a no-op (filter it out) - existing_domains.get(domain).map(|existing| { - DomainMetadata::new(domain.clone(), existing.configuration().to_owned(), true) - }) - }); - - let system_domains = row_tracking_high_watermark + // Generate system domain actions (row tracking) + let system_domain_actions = row_tracking_high_watermark .map(DomainMetadata::try_from) .transpose()? .into_iter(); - Ok(domain_additions - .chain(domain_removals) - .chain(system_domains) + // Chain all domain actions and convert to EngineData + Ok(self + .domain_metadata_additions + .clone() + .into_iter() + .chain(removal_actions) + .chain(system_domain_actions) .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine))) } From eb3a3575c95f74729c5dbd76fe0a1be004187310 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Mon, 27 Oct 2025 19:01:30 +0000 Subject: [PATCH 4/4] test --- kernel/src/transaction/mod.rs | 2 +- kernel/tests/write.rs | 36 +++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 0382cc9874..1e8c2592a3 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -429,7 +429,7 @@ impl Transaction { .table_configuration() .is_domain_metadata_supported() { - return Err(Error::generic("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature")); + return Err(Error::unsupported("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature")); } // Validate user domain operations diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index cc534b36d1..53c6efc819 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1409,6 +1409,42 @@ async fn test_set_domain_metadata_unsupported_writer_feature( Ok(()) } +#[tokio::test] +async fn test_remove_domain_metadata_unsupported_writer_feature( +) -> Result<(), Box> { + let _ = tracing_subscriber::fmt::try_init(); + + let schema = Arc::new(StructType::try_new(vec![StructField::nullable( + "number", + DataType::INTEGER, + )])?); + + let table_name = "test_remove_domain_metadata_unsupported"; + + // Create table WITHOUT domain metadata writer feature support + let (store, engine, table_location) = engine_store_setup(table_name, None); + let table_url = create_table( + store.clone(), + table_location, + schema.clone(), + &[], + true, + vec![], + vec![], + ) + .await?; + + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let res = snapshot + .transaction(Box::new(FileSystemCommitter::new()))? + .with_domain_metadata_removed("app.config".to_string()) + .commit(&engine); + + assert_result_error_with_message(res, "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"); + + Ok(()) +} + #[tokio::test] async fn test_remove_domain_metadata_non_existent_domain() -> Result<(), Box> {