Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ impl DomainMetadata {
}
}

// Create a new DomainMetadata action to remove a domain.
/// Create a new DomainMetadata action to remove a domain.
pub(crate) fn remove(domain: String, configuration: String) -> Self {
Self {
domain,
Expand All @@ -992,10 +992,6 @@ impl DomainMetadata {
pub(crate) fn configuration(&self) -> &str {
&self.configuration
}

pub(crate) fn is_removed(&self) -> bool {
self.removed
}
}

#[cfg(test)]
Expand Down
142 changes: 85 additions & 57 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::iter;
use std::ops::Deref;
use std::sync::{Arc, LazyLock};
Expand All @@ -8,6 +8,7 @@ use url::Url;
use crate::actions::{
as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema,
get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
INTERNAL_DOMAIN_PREFIX,
};
#[cfg(feature = "catalog-managed")]
use crate::committer::FileSystemCommitter;
Expand Down Expand Up @@ -134,7 +135,11 @@ pub struct Transaction {
// commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to
// keep all timestamps within the same commit consistent.
commit_timestamp: i64,
domain_metadatas: Vec<DomainMetadata>,
// Domain metadata additions for this transaction.
domain_metadata_additions: Vec<DomainMetadata>,
// Domain names to remove in this transaction. The configuration values are fetched during
// commit from the log to preserve the pre-image in tombstones.
domain_removals: Vec<String>,
// Whether this transaction contains any logical data changes.
data_change: bool,
}
Expand Down Expand Up @@ -177,7 +182,8 @@ impl Transaction {
add_files_metadata: vec![],
set_transactions: vec![],
commit_timestamp,
domain_metadatas: vec![],
domain_metadata_additions: vec![],
domain_removals: vec![],
data_change: true,
})
}
Expand Down Expand Up @@ -347,7 +353,7 @@ impl Transaction {
/// fail (that is, we don't eagerly check domain validity here).
/// Setting metadata for multiple distinct domains is allowed.
pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self {
self.domain_metadatas
self.domain_metadata_additions
.push(DomainMetadata::new(domain, configuration));
self
}
Expand All @@ -362,12 +368,50 @@ impl Transaction {
/// fail (that is, we don't eagerly check domain validity here).
/// Removing metadata for multiple distinct domains is allowed.
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
// actual configuration value determined during commit
self.domain_metadatas
.push(DomainMetadata::remove(domain, String::new()));
self.domain_removals.push(domain);
self
}

/// Validate that user domains don't conflict with system domains or each other.
fn validate_user_domain_operations(&self) -> DeltaResult<()> {
let mut seen_domains = HashSet::new();

// Validate domain additions
for dm in &self.domain_metadata_additions {
let domain = dm.domain();
if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
return Err(Error::generic(
"Cannot modify domains that start with 'delta.' as those are system controlled",
));
}

if !seen_domains.insert(domain) {
return Err(Error::generic(format!(
"Metadata for domain {} already specified in this transaction",
domain
)));
}
}

// Validate domain removals
for domain in &self.domain_removals {
if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
return Err(Error::generic(
"Cannot modify domains that start with 'delta.' as those are system controlled",
));
}

if !seen_domains.insert(domain.as_str()) {
return Err(Error::generic(format!(
"Metadata for domain {} already specified in this transaction",
domain
)));
}
}

Ok(())
}

/// Generate domain metadata actions with validation. Handle both user and system domains.
///
/// This function may perform an expensive log replay operation if there are any domain removals.
Expand All @@ -378,71 +422,55 @@ impl Transaction {
engine: &'a dyn Engine,
row_tracking_high_watermark: Option<RowTrackingDomainMetadata>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> {
// if there are domain metadata actions, the table must support it
if !self.domain_metadatas.is_empty()
// Validate feature support for user domain operations
if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty())
&& !self
.read_snapshot
.table_configuration()
.is_domain_metadata_supported()
{
return Err(Error::unsupported(
"Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"
));
return Err(Error::generic("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"));
}

// validate user domain metadata and check if we have removals
let mut seen_domains = HashSet::new();
let mut has_removals = false;
for dm in &self.domain_metadatas {
if dm.is_internal() {
return Err(Error::Generic(
"Cannot modify domains that start with 'delta.' as those are system controlled"
.to_string(),
));
}

if !seen_domains.insert(dm.domain()) {
return Err(Error::Generic(format!(
"Metadata for domain {} already specified in this transaction",
dm.domain()
)));
}

if dm.is_removed() {
has_removals = true;
}
}
// Validate user domain operations
self.validate_user_domain_operations()?;

// Generate user domain removals via log replay (expensive if non-empty)
let removal_actions = if !self.domain_removals.is_empty() {
// Scan log to fetch existing configurations for tombstones
let existing_domains =
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?;

// Create removal tombstones with pre-image configurations
let removals: Vec<_> = self
.domain_removals
.iter()
.filter_map(|domain| {
// If domain doesn't exist in the log, this is a no-op (filter it out)
existing_domains.get(domain).map(|existing| {
DomainMetadata::remove(domain.clone(), existing.configuration().to_owned())
})
})
.collect();

// fetch previous configuration values (requires log replay)
let existing_domains = if has_removals {
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?
removals
} else {
HashMap::new()
vec![]
};

let user_domains = self
.domain_metadatas
.iter()
.filter_map(move |dm: &DomainMetadata| {
if dm.is_removed() {
existing_domains.get(dm.domain()).map(|existing| {
DomainMetadata::remove(
dm.domain().to_string(),
existing.configuration().to_string(),
)
})
} else {
Some(dm.clone())
}
});

let system_domains = row_tracking_high_watermark
// Generate system domain actions (row tracking)
let system_domain_actions = row_tracking_high_watermark
.map(DomainMetadata::try_from)
.transpose()?
.into_iter();

Ok(user_domains
.chain(system_domains)
// Chain all domain actions and convert to EngineData
Ok(self
.domain_metadata_additions
.clone()
.into_iter()
Comment on lines +469 to +471
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.. cloning everything since this method takes &self? is it only used in commit (which takes self)? if so should we just make this take self too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit uses self later in the function, so we'd need to use &mut self + std::mem::take(), but it gets annoying because:

  • commit would need to be mut self
  • We'd leave the transaction in a partially-moved state mid-function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will leave as is, with room for optimization later

.chain(removal_actions)
.chain(system_domain_actions)
.map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
}

Expand Down
Loading