diff --git a/acceptance/Cargo.toml b/acceptance/Cargo.toml index b9926441a..263fa2179 100644 --- a/acceptance/Cargo.toml +++ b/acceptance/Cargo.toml @@ -36,6 +36,7 @@ tar = "0.4" [dev-dependencies] datatest-stable = "0.3" test-log = { version = "0.2", default-features = false, features = ["trace"] } +test_utils = { path = "../test-utils" } tempfile = "3" test-case = { version = "3.3.1" } tokio = { version = "1.47" } diff --git a/acceptance/tests/dat_reader.rs b/acceptance/tests/dat_reader.rs index 0d8d283e7..596af4cf1 100644 --- a/acceptance/tests/dat_reader.rs +++ b/acceptance/tests/dat_reader.rs @@ -1,9 +1,6 @@ use std::path::Path; -use std::sync::Arc; use acceptance::read_dat_case; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; -use delta_kernel::engine::default::DefaultEngine; // TODO(zach): skip iceberg_compat_v1 test until DAT is fixed static SKIPPED_TESTS: &[&str; 1] = &["iceberg_compat_v1"]; @@ -27,14 +24,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> { .block_on(async { let case = read_dat_case(root_dir).unwrap(); let table_root = case.table_root().unwrap(); - let engine = Arc::new( - DefaultEngine::try_new( - &table_root, - std::iter::empty::<(&str, &str)>(), - Arc::new(TokioBackgroundExecutor::new()), - ) - .unwrap(), - ); + let engine = test_utils::create_default_engine(&table_root).unwrap(); case.assert_metadata(engine.clone()).await.unwrap(); acceptance::data::assert_scan_metadata(engine.clone(), &case) diff --git a/acceptance/tests/other.rs b/acceptance/tests/other.rs index b6e9db681..09d316371 100644 --- a/acceptance/tests/other.rs +++ b/acceptance/tests/other.rs @@ -38,9 +38,7 @@ async fn test_read_table_with_checkpoint() { )) .unwrap(); let location = url::Url::from_directory_path(path).unwrap(); - let engine = Arc::new( - DefaultEngine::try_new(&location, HashMap::<String, String>::new()).unwrap(), - ); + let engine = test_utils::create_default_engine(&location).unwrap(); let snapshot = Snapshot::try_new(location, engine, None) .await .unwrap(); diff --git a/ffi/src/domain_metadata.rs b/ffi/src/domain_metadata.rs index 7f51dd607..82ed29a93 100644 --- a/ffi/src/domain_metadata.rs +++ b/ffi/src/domain_metadata.rs @@ -46,7 +46,6 @@ mod tests { recover_string, }; use crate::{engine_to_handle, free_engine, free_snapshot, kernel_string_slice, snapshot}; - use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::DeltaResult; use object_store::memory::InMemory; @@ -58,7 +57,7 @@ mod tests { async fn test_domain_metadata() -> DeltaResult<()> { let storage = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone()); let engine = engine_to_handle(Arc::new(engine), allocate_err); let path = "memory:///"; diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 746b3ed88..60cb101e0 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -560,13 +560,11 @@ fn get_default_engine_impl( allocate_error: AllocateErrorFn, ) -> DeltaResult<Handle<SharedExternEngine>> { use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; + use delta_kernel::engine::default::storage::store_from_url_opts; use delta_kernel::engine::default::DefaultEngine; - let engine = DefaultEngine::<TokioBackgroundExecutor>::try_new( - &url, - options, - Arc::new(TokioBackgroundExecutor::new()), - ); - Ok(engine_to_handle(Arc::new(engine?), allocate_error)) + let store = store_from_url_opts(&url, options)?; + let engine = 
DefaultEngine::<TokioBackgroundExecutor>::new(store); + Ok(engine_to_handle(Arc::new(engine), allocate_error)) } /// # Safety @@ -823,7 +821,7 @@ mod tests { allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic, recover_string, }; - use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; + use delta_kernel::engine::default::DefaultEngine; use object_store::memory::InMemory; use test_utils::{actions_to_string, actions_to_string_partitioned, add_commit, TestAction}; @@ -870,7 +868,7 @@ mod tests { actions_to_string(vec![TestAction::Metadata]), ) .await?; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone()); let engine = engine_to_handle(Arc::new(engine), allocate_err); let path = "memory:///"; @@ -916,7 +914,7 @@ mod tests { actions_to_string_partitioned(vec![TestAction::Metadata]), ) .await?; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone()); let engine = engine_to_handle(Arc::new(engine), allocate_err); let path = "memory:///"; @@ -952,7 +950,7 @@ mod tests { actions_to_string(vec![TestAction::Metadata]), ) .await?; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone()); let engine = engine_to_handle(Arc::new(engine), allocate_null_err); let path = "memory:///"; diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index 1df3a682e..450f0ef07 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -351,7 +351,7 @@ mod tests { use delta_kernel::arrow::record_batch::RecordBatch; use delta_kernel::arrow::util::pretty::pretty_format_batches; use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; - use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; + use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, StructField, StructType}; use delta_kernel::Engine; use delta_kernel_ffi::engine_data::get_engine_data; @@ -545,7 +545,7 @@ mod tests { put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; let path = "memory:///"; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage); let engine = engine_to_handle(Arc::new(engine), allocate_err); let table_changes = ok_or_panic(unsafe { @@ -632,7 +632,7 @@ mod tests { put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; let path = "memory:///"; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage); let engine = engine_to_handle(Arc::new(engine), allocate_err); let table_changes = ok_or_panic(unsafe { @@ -688,7 +688,7 @@ mod tests { put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; let path = "memory:///"; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage); let engine = engine_to_handle(Arc::new(engine), allocate_err); let table_changes = ok_or_panic(unsafe { @@ -768,7 +768,7 @@ mod tests { put_file(storage.as_ref(), PARQUET_FILE2.to_string(), &batch).await?; let path = "memory:///"; - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage); let engine = 
engine_to_handle(Arc::new(engine), allocate_err); let table_changes = ok_or_panic(unsafe { diff --git a/kernel/benches/metadata_bench.rs b/kernel/benches/metadata_bench.rs index 86168baf1..65676675e 100644 --- a/kernel/benches/metadata_bench.rs +++ b/kernel/benches/metadata_bench.rs @@ -17,7 +17,6 @@ //! //! Follow-ups: -use std::collections::HashMap; use std::sync::Arc; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; @@ -41,9 +40,9 @@ fn setup() -> (TempDir, Url, Arc<DefaultEngine<TokioBackgroundExecutor>>) { let table_path = tempdir.path().join(table); let url = try_parse_uri(table_path.to_str().unwrap()).expect("Failed to parse table path"); // TODO: use multi-threaded executor - let executor = Arc::new(TokioBackgroundExecutor::new()); - let engine = DefaultEngine::try_new(&url, HashMap::<String, String>::new(), executor) - .expect("Failed to create engine"); + use delta_kernel::engine::default::storage::store_from_url; + let store = store_from_url(&url).expect("Failed to create store"); + let engine = DefaultEngine::new(store); (tempdir, url, Arc::new(engine)) } diff --git a/kernel/examples/common/src/lib.rs b/kernel/examples/common/src/lib.rs index 7023f2632..e21e12033 100644 --- a/kernel/examples/common/src/lib.rs +++ b/kernel/examples/common/src/lib.rs @@ -4,11 +4,9 @@ use std::{collections::HashMap, sync::Arc}; use clap::{Args, CommandFactory, FromArgMatches}; use delta_kernel::{ - arrow::array::RecordBatch, - engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}, - scan::Scan, - schema::MetadataColumnSpec, - DeltaResult, SnapshotRef, + arrow::array::RecordBatch, engine::default::executor::tokio::TokioBackgroundExecutor, + engine::default::storage::store_from_url_opts, engine::default::DefaultEngine, scan::Scan, + schema::MetadataColumnSpec, DeltaResult, SnapshotRef, }; use object_store::{ @@ -158,16 +156,13 @@ pub fn get_engine( ))); } }; - Ok(DefaultEngine::new( - store, - Arc::new(TokioBackgroundExecutor::new()), - )) + Ok(DefaultEngine::new(Arc::new(store))) } else if !args.option.is_empty() { let opts = args.option.iter().map(|option| { let parts: Vec<&str> = option.split("=").collect(); (parts[0].to_ascii_lowercase(), parts[1]) }); - DefaultEngine::try_new(url, opts, Arc::new(TokioBackgroundExecutor::new())) + Ok(DefaultEngine::new(store_from_url_opts(url, opts)?)) } else { let mut options = if let Some(ref region) = args.region { HashMap::from([("region", region.clone())]) @@ -177,7 +172,7 @@ pub fn get_engine( if args.public { options.insert("skip_signature", "true".to_string()); } - DefaultEngine::try_new(url, options, Arc::new(TokioBackgroundExecutor::new())) + Ok(DefaultEngine::new(store_from_url_opts(url, options)?)) } } diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index a1335be7a..d561b4e01 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -76,11 +76,8 @@ async fn try_main() -> DeltaResult<()> { println!("Using Delta table at: {url}"); // Get the engine for local filesystem - let engine = DefaultEngine::try_new( - &url, - HashMap::<String, String>::new(), - Arc::new(TokioBackgroundExecutor::new()), - )?; + use delta_kernel::engine::default::storage::store_from_url; + let engine = DefaultEngine::new(store_from_url(&url)?); // Create or get the table let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?; diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index 64fca3d2c..abf9507b5 100644 --- a/kernel/src/checkpoint/tests.rs +++ 
b/kernel/src/checkpoint/tests.rs @@ -8,7 +8,7 @@ use crate::arrow::array::{ArrayRef, StructArray}; use crate::arrow::datatypes::{DataType, Schema}; use crate::checkpoint::create_last_checkpoint_data; use crate::engine::arrow_data::ArrowEngineData; -use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; +use crate::engine::default::DefaultEngine; use crate::log_replay::HasSelectionVector; use crate::schema::{DataType as KernelDataType, StructField, StructType}; use crate::utils::test_utils::Action; @@ -60,7 +60,7 @@ fn test_deleted_file_retention_timestamp() -> DeltaResult<()> { #[test] fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // 1st commit (version 0) - metadata and protocol actions // Protocol action does not include the v2Checkpoint reader/writer feature. @@ -116,7 +116,7 @@ fn test_create_last_checkpoint_data() -> DeltaResult<()> { let add_actions_counter = 75; let size_in_bytes: i64 = 1024 * 1024; // 1MB let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create last checkpoint metadata let last_checkpoint_batch = create_last_checkpoint_data( @@ -277,7 +277,7 @@ fn read_last_checkpoint_file(store: &Arc<InMemory>) -> DeltaResult { #[test] fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // 1st commit: adds `fake_path_1` write_commit_to_store(&store, vec![create_add_action("fake_path_1")], 0)?; @@ -347,7 +347,7 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { #[test] fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // 1st commit (version 0) - metadata and protocol actions // Protocol action does not include the v2Checkpoint reader/writer feature. 
@@ -409,7 +409,7 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { #[test] fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // 1st commit (version 0) - metadata and protocol actions write_commit_to_store( @@ -451,7 +451,7 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR #[test] fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // 1st commit: adds `fake_path_2` & removes `fake_path_1` write_commit_to_store( @@ -523,7 +523,7 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { #[test] fn test_no_checkpoint_staged_commits() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // normal commit write_commit_to_store( diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs index cd61ca7e6..a5e58db94 100644 --- a/kernel/src/committer.rs +++ b/kernel/src/committer.rs @@ -159,7 +159,6 @@ mod tests { use std::sync::Arc; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::DefaultEngine; use crate::path::LogRoot; @@ -212,7 +211,7 @@ mod tests { async fn catalog_managed_tables_block_transactions() { let storage = Arc::new(InMemory::new()); let table_root = Url::parse("memory:///").unwrap(); - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone()); let actions = [ r#"{"commitInfo":{"timestamp":12345678900,"inCommitTimestamp":12345678900}}"#, diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 73d06088c..b524639a7 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -274,7 +274,7 @@ mod tests { store.put(&name, data.clone().into()).await.unwrap(); let table_root = Url::parse("memory:///").expect("valid url"); - let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store); let files: Vec<_> = engine .storage_handler() .list_from(&table_root.join("_delta_log").unwrap().join("0").unwrap()) @@ -304,7 +304,7 @@ mod tests { let url = Url::from_directory_path(tmp.path()).unwrap(); let store = Arc::new(LocalFileSystem::new()); - let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store); let files = engine .storage_handler() .list_from(&url.join("_delta_log").unwrap().join("0").unwrap()) diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 73331fdf9..2f2f5db59 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -9,7 +9,6 @@ use std::collections::HashMap; use std::sync::Arc; -use self::storage::parse_url_opts; use object_store::DynObjectStore; use url::Url; @@ -42,35 +41,34 @@ pub struct DefaultEngine<E: TaskExecutor> { evaluation: Arc<ArrowEvaluationHandler>, } -impl<E: TaskExecutor> DefaultEngine<E> { - /// Create a new [`DefaultEngine`] instance +impl DefaultEngine<TokioBackgroundExecutor> { + /// Create a new [`DefaultEngine`] instance with the 
default executor. + /// + /// Uses `TokioBackgroundExecutor` as the default executor. + /// For custom executors, use [`DefaultEngine::new_with_executor`]. /// /// # Parameters /// - /// - `table_root`: The URL of the table within storage. - /// - `options`: key/value pairs of options to pass to the object store. - /// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor]. - pub fn try_new<K, V>( - table_root: &Url, - options: impl IntoIterator<Item = (K, V)>, - task_executor: Arc<E>, - ) -> DeltaResult<Self> - where - K: AsRef<str>, - V: Into<String>, - { - // table root is the path of the table in the ObjectStore - let (object_store, _table_root) = parse_url_opts(table_root, options)?; - Ok(Self::new(Arc::new(object_store), task_executor)) + /// - `object_store`: The object store to use. + pub fn new(object_store: Arc<DynObjectStore>) -> Self { + Self::new_with_executor( + object_store, + Arc::new(executor::tokio::TokioBackgroundExecutor::new()), + ) } +} - /// Create a new [`DefaultEngine`] instance +impl<E: TaskExecutor> DefaultEngine<E> { + /// Create a new [`DefaultEngine`] instance with a custom executor. + /// + /// Most users should use [`DefaultEngine::new`] instead. This method is only + /// needed for specialized testing scenarios (e.g., multi-threaded executors). /// /// # Parameters /// /// - `object_store`: The object store to use. /// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor]. - pub fn new(object_store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self { + pub fn new_with_executor(object_store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self { Self { storage: Arc::new(ObjectStoreStorageHandler::new( object_store.clone(), @@ -163,7 +161,6 @@ impl UrlExt for Url { #[cfg(test)] mod tests { - use super::executor::tokio::TokioBackgroundExecutor; use super::*; use crate::engine::tests::test_arrow_engine; use object_store::local::LocalFileSystem; @@ -173,7 +170,7 @@ mod tests { let tmp = tempfile::tempdir().unwrap(); let url = Url::from_directory_path(tmp.path()).unwrap(); let object_store = Arc::new(LocalFileSystem::new()); - let engine = DefaultEngine::new(object_store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(object_store); test_arrow_engine(&engine, &url); } diff --git a/kernel/src/engine/default/storage.rs b/kernel/src/engine/default/storage.rs index d5fd93238..dcb06ad18 100644 --- a/kernel/src/engine/default/storage.rs +++ b/kernel/src/engine/default/storage.rs @@ -1,6 +1,5 @@ -use object_store::parse_url_opts as parse_url_opts_object_store; use object_store::path::Path; -use object_store::{Error, ObjectStore}; +use object_store::{self, Error, ObjectStore}; use url::Url; use crate::Error as DeltaError; @@ -11,8 +10,8 @@ use std::sync::{Arc, LazyLock, RwLock}; type ClosureReturn = Result<(Box<dyn ObjectStore>, Path), Error>; /// This type alias makes it easier to reference the handler closure(s) /// -/// It uses a HashMap which _must_ be converted in our [parse_url_opts] because we -/// cannot use generics in this scenario. +/// It uses a HashMap which _must_ be converted in [store_from_url_opts] +/// because we cannot use generics in this scenario. 
type HandlerClosure = Arc<dyn Fn(&Url, HashMap<String, String>) -> ClosureReturn + Send + Sync>; /// hashmap containing scheme => handler fn mappings to allow consumers of delta-kernel-rs provide /// their own url opts parsers for different schemes type Handlers = HashMap<String, HandlerClosure>; /// The URL_REGISTRY contains the custom URL scheme handlers that will parse URL options static URL_REGISTRY: LazyLock<RwLock<Handlers>> = LazyLock::new(|| RwLock::new(HashMap::default())); -/// Insert a new URL handler for [parse_url_opts] with the given `scheme`. This allows users to -/// provide their own custom URL handler to plug new [object_store::ObjectStore] instances into -/// delta-kernel +/// Insert a new URL handler for [store_from_url_opts] with the given `scheme`. This allows +/// users to provide their own custom URL handler to plug new [object_store::ObjectStore] +/// instances into delta-kernel, which is used by [store_from_url_opts] to parse the URL. pub fn insert_url_handler( scheme: impl AsRef<str>, handler_closure: HandlerClosure, @@ -36,28 +35,76 @@ pub fn insert_url_handler( Ok(()) } -/// Parse the given URL options to produce a valid and configured [ObjectStore] +/// Create an [`ObjectStore`] from a URL. /// -/// This function will first attempt to use any schemes registered via [insert_url_handler], -/// falling back to the default behavior of [object_store::parse_url_opts] -pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<(Box<dyn ObjectStore>, Path), Error> +/// Returns an `Arc<dyn ObjectStore>` ready to use with [`crate::engine::default::DefaultEngine`]. +/// +/// This function checks for custom URL handlers registered via [`insert_url_handler`] +/// before falling back to [`object_store`]'s default behavior. +/// +/// # Example +/// +/// ```rust +/// # use url::Url; +/// # use delta_kernel::engine::default::storage::store_from_url; +/// # use delta_kernel::DeltaResult; +/// # fn example() -> DeltaResult<()> { +/// let url = Url::parse("file:///path/to/table")?; +/// let store = store_from_url(&url)?; +/// # Ok(()) +/// # } +/// ``` +pub fn store_from_url(url: &Url) -> crate::DeltaResult<Arc<dyn ObjectStore>> { + store_from_url_opts(url, std::iter::empty::<(&str, &str)>()) +} + +/// Create an [`ObjectStore`] from a URL with custom options. +/// +/// Returns an `Arc<dyn ObjectStore>` ready to use with [`crate::engine::default::DefaultEngine`]. +/// +/// This function checks for custom URL handlers registered via [`insert_url_handler`] +/// before falling back to [`object_store`]'s default behavior. 
+/// +/// # Example +/// +/// ```rust +/// # use url::Url; +/// # use std::collections::HashMap; +/// # use delta_kernel::engine::default::storage::store_from_url_opts; +/// # use delta_kernel::DeltaResult; +/// # fn example() -> DeltaResult<()> { +/// let url = Url::parse("s3://my-bucket/path/to/table")?; +/// let options = HashMap::from([("region", "us-west-2")]); +/// let store = store_from_url_opts(&url, options)?; +/// # Ok(()) +/// # } +/// ``` +pub fn store_from_url_opts<I, K, V>( + url: &Url, + options: I, +) -> crate::DeltaResult<Arc<dyn ObjectStore>> where I: IntoIterator<Item = (K, V)>, K: AsRef<str>, V: Into<String>, { - if let Ok(handlers) = URL_REGISTRY.read() { + // First attempt to use any schemes registered via insert_url_handler, + // falling back to the default behavior of object_store::parse_url_opts + let (store, _path) = if let Ok(handlers) = URL_REGISTRY.read() { if let Some(handler) = handlers.get(url.scheme()) { - let options: HashMap<String, String> = HashMap::from_iter( - options - .into_iter() - .map(|(k, v)| (k.as_ref().to_string(), v.into())), - ); - - return handler(url, options); + let options = options + .into_iter() + .map(|(k, v)| (k.as_ref().to_string(), v.into())) + .collect(); + handler(url, options)? + } else { + object_store::parse_url_opts(url, options)? } - } - parse_url_opts_object_store(url, options) + } else { + object_store::parse_url_opts(url, options)? + }; + + Ok(Arc::new(store)) } #[cfg(test)] @@ -110,15 +157,14 @@ mod tests { // Currently constructing an [HdfsObjectStore] won't work if there isn't an actual HDFS // to connect to, so the only way to really verify that we got the object store we // expected is to inspect the `store` on the error v_v - if let Err(store_error) = parse_url_opts(&url, options) { - match store_error { - object_store::Error::Generic { store, source: _ } => { - assert_eq!(store, "HdfsObjectStore"); - } - unexpected => panic!("Unexpected error happened: {unexpected:?}"), + match store_from_url_opts(&url, options) { + Err(crate::Error::ObjectStore(object_store::Error::Generic { store, source: _ })) => { + assert_eq!(store, "HdfsObjectStore"); + } + Err(unexpected) => panic!("Unexpected error happened: {unexpected:?}"), + Ok(_) => { + panic!("Expected to get an error when constructing an HdfsObjectStore, but something didn't work as expected! Either the parse_url_opts_hdfs_native function didn't get called, or the hdfs-native-object-store no longer errors when it cannot connect to HDFS"); } - } - } else { - panic!("Expected to get an error when constructing an HdfsObjectStore, but something didn't work as expected! 
Either the parse_url_opts_hdfs_native function didn't get called, or the hdfs-native-object-store no longer errors when it cannot connect to HDFS"); } } } diff --git a/kernel/src/log_compaction/tests.rs b/kernel/src/log_compaction/tests.rs index 3c1940c54..02cdf6998 100644 --- a/kernel/src/log_compaction/tests.rs +++ b/kernel/src/log_compaction/tests.rs @@ -231,13 +231,13 @@ fn test_version_filtering() { #[test] fn test_no_compaction_staged_commits() { use crate::actions::Add; - use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; + use crate::engine::default::DefaultEngine; use object_store::{memory::InMemory, path::Path, ObjectStore}; use std::sync::Arc; // Set up in-memory store let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create basic commits with proper metadata and protocol use crate::actions::{Metadata, Protocol}; diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 6986fc426..ce5d0618c 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -959,7 +959,7 @@ fn test_checkpoint_batch_with_no_sidecars_returns_none() -> DeltaResult<()> { #[test] fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let read_schema = get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?; add_sidecar_to_store( @@ -999,7 +999,7 @@ fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult< #[test] fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let checkpoint_batch = sidecar_batch_with_given_paths( vec!["sidecarfile1.parquet", "sidecarfile2.parquet"], @@ -1026,7 +1026,7 @@ fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<( #[test] fn test_reading_sidecar_files_with_predicate() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let read_schema = get_all_actions_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?; let checkpoint_batch = @@ -1066,7 +1066,7 @@ fn test_reading_sidecar_files_with_predicate() -> DeltaResult<()> { fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions( ) -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); add_checkpoint_to_store( &store, // Create a checkpoint batch with sidecar actions to verify that the sidecar actions are not read. 
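Not part of the patch itself: a minimal sketch of registering a custom scheme through the `insert_url_handler` registry that the `storage.rs` hunks above expose. The `memcache` scheme and the in-memory handler are hypothetical illustrations, and the `DeltaResult<()>` return of `insert_url_handler` is an assumption inferred from the `Ok(())` visible at the end of its body above.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use delta_kernel::engine::default::storage::{insert_url_handler, store_from_url};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use url::Url;

fn register_memcache_scheme() -> delta_kernel::DeltaResult<()> {
    // The closure must match HandlerClosure: it receives the URL plus the
    // already-collected String options and returns (Box<dyn ObjectStore>, Path).
    insert_url_handler(
        "memcache", // hypothetical scheme for illustration
        Arc::new(|_url: &Url, _opts: HashMap<String, String>| {
            Ok((
                Box::new(InMemory::new()) as Box<dyn ObjectStore>,
                Path::default(),
            ))
        }),
    )?; // assumed to return DeltaResult<()>, per the Ok(()) shown above

    // store_from_url now routes memcache:// URLs through the custom handler
    // before falling back to object_store::parse_url_opts.
    let url = Url::parse("memcache://table").expect("valid url");
    let _store = store_from_url(&url)?;
    Ok(())
}
```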
@@ -1113,7 +1113,7 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_ fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part( ) -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Multi-part checkpoints should never contain sidecar actions. // This test intentionally includes batches with sidecar actions in multi-part checkpoints @@ -1181,7 +1181,7 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_mul fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); add_checkpoint_to_store( &store, @@ -1224,7 +1224,7 @@ fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars #[test] fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars() -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let filename = "00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json"; @@ -1282,7 +1282,7 @@ fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars() fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batches( ) -> DeltaResult<()> { let (store, log_root) = new_in_memory_store(); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); add_checkpoint_to_store( &store, diff --git a/kernel/src/path.rs b/kernel/src/path.rs index e3c094ef1..6cf472459 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -394,7 +394,6 @@ mod tests { use std::sync::Arc; use super::*; - use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::DefaultEngine; use crate::engine::sync::SyncEngine; use crate::utils::test_utils::assert_result_error_with_message; @@ -904,7 +903,7 @@ mod tests { #[tokio::test] async fn test_read_in_commit_timestamp_success() { let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let table_url = url::Url::parse("memory://test/").unwrap(); // Create a commit file with ICT using add_commit @@ -933,7 +932,7 @@ mod tests { #[tokio::test] async fn test_read_in_commit_timestamp_missing_ict() { let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let table_url = url::Url::parse("memory://test/").unwrap(); // Create a commit file without ICT diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index de6c3e362..4b176a69e 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -627,7 +627,7 @@ mod tests { // in each test we will modify versions 1 and 2 to test different scenarios fn test_new_from(store: Arc<InMemory>) -> DeltaResult<()> { let url = Url::parse("memory:///")?; let engine = 
DefaultEngine::new(store); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(&engine)?; @@ -677,10 +677,7 @@ mod tests { // 3. new version > existing version // a. no new log segment let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new( - Arc::new(store.fork()), - Arc::new(TokioBackgroundExecutor::new()), - ); + let engine = DefaultEngine::new(Arc::new(store.fork())); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(&engine)?; @@ -753,7 +750,7 @@ mod tests { // new commits AND request version > end of log let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store_3c_i, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store_3c_i); let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(&engine)?; @@ -797,7 +794,7 @@ mod tests { async fn test_snapshot_new_from_crc() -> Result<(), Box<dyn std::error::Error>> { let store = Arc::new(InMemory::new()); let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let protocol = |reader_version, writer_version| { json!({ "protocol": { @@ -1041,7 +1038,7 @@ mod tests { async fn test_domain_metadata() -> DeltaResult<()> { let url = Url::parse("memory:///")?; let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // commit0 // - domain1: not removed @@ -1169,7 +1166,7 @@ mod tests { async fn test_timestamp_with_ict_disabled() -> Result<(), Box<dyn std::error::Error>> { let store = Arc::new(InMemory::new()); let url = url::Url::parse("memory://test/")?; - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create a basic commit without ICT enabled let commit0 = create_basic_commit(false, None); @@ -1189,7 +1186,7 @@ mod tests { { let store = Arc::new(InMemory::new()); let url = url::Url::parse("memory://test/")?; - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create initial commit without ICT let commit0 = create_basic_commit(false, None); @@ -1229,7 +1226,7 @@ mod tests { // Test invalid state where snapshot has enablement version in the future - should error let url = Url::parse("memory:///table2")?; let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let commit_data = [ json!({ @@ -1278,7 +1275,7 @@ mod tests { // Test missing ICT when it should be present - should error let url = Url::parse("memory:///table3")?; let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); let commit_data = [ create_protocol(true, Some(3)), @@ -1310,7 +1307,7 @@ mod tests { let url = Url::parse("memory:///missing_commit_test")?; let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create initial commit with ICT enabled let commit_data = [ @@ -1365,7 +1362,7 @@ mod tests { // Test the scenario where both checkpoint and commit exist at the same version with ICT enabled. 
let url = Url::parse("memory:///checkpoint_commit_test")?; let store = Arc::new(InMemory::new()); - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create 00000000000000000000.json with ICT enabled let commit0_data = [ @@ -1434,7 +1431,7 @@ mod tests { async fn test_try_new_from_empty_log_tail() -> DeltaResult<()> { let store = Arc::new(InMemory::new()); let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create initial commit let commit0 = vec![ @@ -1472,7 +1469,7 @@ mod tests { async fn test_try_new_from_latest_commit_preservation() -> DeltaResult<()> { let store = Arc::new(InMemory::new()); let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create commits 0-2 let base_commit = vec![ @@ -1549,7 +1546,7 @@ mod tests { async fn test_try_new_from_version_boundary_cases() -> DeltaResult<()> { let store = Arc::new(InMemory::new()); let url = Url::parse("memory:///")?; - let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.clone()); // Create commits let base_commit = vec![ diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 860ed443c..c66e8ba4e 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -121,10 +121,7 @@ mod tests { ) { let table_root = Url::parse("memory:///test_table").unwrap(); let store = Arc::new(InMemory::new()); - let engine = Arc::new(DefaultEngine::new( - store.clone(), - Arc::new(TokioBackgroundExecutor::new()), - )); + let engine = Arc::new(DefaultEngine::new(store.clone())); (engine, store, table_root) } diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 65815538a..ab5478af9 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -3,14 +3,14 @@ //! # Example //! ```rust //! # use std::sync::Arc; -//! # use test_utils::DefaultEngineExtension; //! # use delta_kernel::engine::default::DefaultEngine; //! # use delta_kernel::expressions::{column_expr, Scalar}; //! # use delta_kernel::{Predicate, Snapshot, SnapshotRef, Error, Engine}; //! # use delta_kernel::table_changes::TableChanges; //! # let path = "./tests/data/table-with-cdf"; -//! # let engine = DefaultEngine::new_local(); //! let url = delta_kernel::try_parse_uri(path)?; +//! # use delta_kernel::engine::default::storage::store_from_url; +//! # let engine = std::sync::Arc::new(DefaultEngine::new(store_from_url(&url)?)); //! // Get the table changes (change data feed) between version 0 and 1 //! let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1))?; //! 
@@ -95,14 +95,13 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// # Examples /// Get `TableChanges` for versions 0 to 1 (inclusive) /// ```rust -/// # use delta_kernel::engine::default::DefaultEngine; -/// # use test_utils::DefaultEngineExtension; +/// # use delta_kernel::engine::default::{storage::store_from_url, DefaultEngine}; /// # use delta_kernel::{SnapshotRef, Error}; /// # use delta_kernel::table_changes::TableChanges; -/// # let engine = DefaultEngine::new_local(); /// # let path = "./tests/data/table-with-cdf"; -/// let url = delta_kernel::try_parse_uri(path).unwrap(); -/// let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1))?; +/// let url = delta_kernel::try_parse_uri(path)?; +/// # let engine = DefaultEngine::new(store_from_url(&url)?); +/// let table_changes = TableChanges::try_new(url, &engine, 0, Some(1))?; /// # Ok::<(), Error>(()) /// ```` /// For more details, see the following sections of the protocol: diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index fa537e4a5..d5d504cf1 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -42,15 +42,14 @@ pub struct TableChangesScan { /// Construct a [`TableChangesScan`] from `table_changes` with a given schema and predicate /// ```rust /// # use std::sync::Arc; -/// # use test_utils::DefaultEngineExtension; -/// # use delta_kernel::engine::default::DefaultEngine; /// # use delta_kernel::expressions::{column_expr, Scalar}; /// # use delta_kernel::Predicate; /// # use delta_kernel::table_changes::TableChanges; /// # let path = "./tests/data/table-with-cdf"; -/// # let engine = DefaultEngine::new_local(); /// # let url = delta_kernel::try_parse_uri(path).unwrap(); -/// # let table_changes = TableChanges::try_new(url, engine.as_ref(), 0, Some(1)).unwrap(); +/// # use delta_kernel::engine::default::{storage::store_from_url, DefaultEngine}; +/// # let engine = DefaultEngine::new(store_from_url(&url).unwrap()); +/// # let table_changes = TableChanges::try_new(url, &engine, 0, Some(1)).unwrap(); /// let schema = table_changes /// .schema() /// .project(&["id", "_commit_version"]) diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 07e1fb34f..15a8dd082 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -6,13 +6,11 @@ use delta_kernel::arrow::datatypes::Schema as ArrowSchema; use itertools::Itertools; use delta_kernel::engine::arrow_conversion::TryFromKernel as _; -use delta_kernel::engine::default::DefaultEngine; use delta_kernel::table_changes::TableChanges; use delta_kernel::{DeltaResult, Error, PredicateRef, Version}; mod common; -use test_utils::DefaultEngineExtension; use test_utils::{load_test_data, to_arrow}; fn read_cdf_for_table( @@ -24,7 +22,7 @@ fn read_cdf_for_table( let test_dir = load_test_data("tests/data", test_name.as_ref()).unwrap(); let test_path = test_dir.path().join(test_name.as_ref()); let test_path = delta_kernel::try_parse_uri(test_path.to_str().expect("table path to string"))?; - let engine = DefaultEngine::new_local(); + let engine = test_utils::create_default_engine(&test_path)?; let table_changes = TableChanges::try_new( test_path, engine.as_ref(), diff --git a/kernel/tests/dv.rs b/kernel/tests/dv.rs index a8def7b29..46c2b9e83 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -3,10 +3,8 @@ use std::ops::Add; use std::path::PathBuf; -use delta_kernel::engine::default::DefaultEngine; use delta_kernel::scan::ScanResult; use delta_kernel::{DeltaResult, Snapshot}; -use 
test_utils::DefaultEngineExtension; use itertools::Itertools; use test_log::test; @@ -30,7 +28,7 @@ fn count_total_scan_rows( fn dv_table() -> Result<(), Box<dyn std::error::Error>> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); - let engine = DefaultEngine::new_local(); + let engine = test_utils::create_default_engine(&url)?; let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; let scan = snapshot.scan_builder().build()?; @@ -45,7 +43,7 @@ fn dv_table() -> Result<(), Box<dyn std::error::Error>> { fn non_dv_table() -> Result<(), Box<dyn std::error::Error>> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); - let engine = DefaultEngine::new_local(); + let engine = test_utils::create_default_engine(&url)?; let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; let scan = snapshot.scan_builder().build()?; diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 758902e7f..c4a7c94fd 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -212,12 +212,8 @@ fn setup_golden_table( let table_path = test_path.join("delta"); let url = delta_kernel::try_parse_uri(table_path.to_str().expect("table path to string")) .expect("table from uri"); - let engine = DefaultEngine::try_new( - &url, - std::iter::empty::<(&str, &str)>(), - Arc::new(TokioBackgroundExecutor::new()), - ) - .unwrap(); + let engine = Arc::try_unwrap(test_utils::create_default_engine(&url).unwrap()) + .expect("Arc should have single reference"); let expected_path = test_path.join("expected"); let expected_path = expected_path.exists().then_some(expected_path); (engine, url, expected_path, test_dir) diff --git a/kernel/tests/hdfs.rs b/kernel/tests/hdfs.rs index da39c9c77..334a59d40 100644 --- a/kernel/tests/hdfs.rs +++ b/kernel/tests/hdfs.rs @@ -7,15 +7,12 @@ // cargo test --features integration-test --test hdfs #![cfg(all(feature = "integration-test", not(target_os = "windows")))] -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; -use delta_kernel::engine::default::DefaultEngine; use delta_kernel::Snapshot; use hdfs_native::{Client, WriteOptions}; use hdfs_native_object_store::minidfs::MiniDfs; use std::collections::HashSet; use std::fs; use std::path::Path; -use std::sync::Arc; extern crate walkdir; use walkdir::WalkDir; @@ -67,13 +64,9 @@ async fn read_table_version_hdfs() -> Result<(), Box<dyn std::error::Error>> { let url_str = format!("{}/my-delta-table", minidfs.url); let url = url::Url::parse(&url_str).unwrap(); - let engine = DefaultEngine::try_new( - &url, - std::iter::empty::<(&str, &str)>(), - Arc::new(TokioBackgroundExecutor::new()), - )?; + let engine = test_utils::create_default_engine(&url)?; - let snapshot = Snapshot::builder_for(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; assert_eq!(snapshot.version(), 1); Ok(()) } diff --git a/kernel/tests/log_tail.rs b/kernel/tests/log_tail.rs index c4f602fd4..1d91d626d 100644 --- a/kernel/tests/log_tail.rs +++ b/kernel/tests/log_tail.rs @@ -34,10 +34,7 @@ fn setup_test() -> ( ) { let storage = Arc::new(InMemory::new()); let table_root = Url::parse("memory:///").unwrap(); - let engine = Arc::new(DefaultEngine::new( - storage.clone(), - Arc::new(TokioBackgroundExecutor::new()), - )); + let engine = Arc::new(DefaultEngine::new(storage.clone())); (storage, engine, table_root) } diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 
0a8c474a3..b96a13647 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -8,7 +8,6 @@ use delta_kernel::arrow::compute::{concat_batches, filter_record_batch}; use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema}; use delta_kernel::arrow::record_batch::RecordBatch; use delta_kernel::engine::arrow_conversion::TryFromKernel as _; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::expressions::{ column_expr, column_pred, Expression as Expr, ExpressionRef, Predicate as Pred, @@ -79,10 +78,7 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> .await?; let location = Url::parse("memory:///")?; - let engine = Arc::new(DefaultEngine::new( - storage.clone(), - Arc::new(TokioBackgroundExecutor::new()), - )); + let engine = Arc::new(DefaultEngine::new(storage.clone())); let expected_data = vec![batch.clone(), batch]; @@ -134,7 +130,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> { .await?; let location = Url::parse("memory:///").unwrap(); - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone()); let expected_data = vec![batch.clone(), batch]; @@ -187,7 +183,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> { .await?; let location = Url::parse("memory:///").unwrap(); - let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(storage.clone()); let expected_data = vec![batch]; @@ -258,10 +254,7 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> { .await?; let location = Url::parse("memory:///").unwrap(); - let engine = Arc::new(DefaultEngine::new( - storage.clone(), - Arc::new(TokioBackgroundExecutor::new()), - )); + let engine = Arc::new(DefaultEngine::new(storage.clone())); let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?; // The first file has id between 1 and 3; the second has id between 5 and 7. 
For each operator, @@ -447,11 +440,7 @@ fn read_table_data( let path = std::fs::canonicalize(PathBuf::from(path))?; let predicate = predicate.map(Arc::new); let url = url::Url::from_directory_path(path).unwrap(); - let engine = Arc::new(DefaultEngine::try_new( - &url, - std::iter::empty::<(&str, &str)>(), - Arc::new(TokioBackgroundExecutor::new()), - )?); + let engine = test_utils::create_default_engine(&url)?; let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; @@ -1073,10 +1062,7 @@ async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> { // Step 3: Create engine and snapshot let store = Arc::new(LocalFileSystem::new()); let url = Url::from_directory_path(table_path).unwrap(); - let engine = Arc::new(DefaultEngine::new( - store, - Arc::new(TokioBackgroundExecutor::new()), - )); + let engine = Arc::new(DefaultEngine::new(store)); let snapshot = Snapshot::builder_for(url) .build(engine.as_ref()) diff --git a/mem-test/tests/dhat_large_table_log.rs b/mem-test/tests/dhat_large_table_log.rs index a1186c8bf..20be2e2d8 100644 --- a/mem-test/tests/dhat_large_table_log.rs +++ b/mem-test/tests/dhat_large_table_log.rs @@ -7,7 +7,6 @@ use std::io::Write; use std::path::Path; use std::sync::Arc; -use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::Snapshot; use object_store::local::LocalFileSystem; @@ -111,7 +110,7 @@ fn test_dhat_large_table_log() -> Result<(), Box<dyn std::error::Error>> { let _profiler = dhat::Profiler::builder().testing().build(); let store = Arc::new(LocalFileSystem::new()); let url = Url::from_directory_path(table_path).unwrap(); - let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store); let snapshot = Snapshot::builder_for(url) .build(&engine) diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 2283b61d5..99e96e2fc 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -11,7 +11,7 @@ use delta_kernel::arrow::error::ArrowError; use delta_kernel::arrow::util::pretty::pretty_format_batches; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; -use delta_kernel::engine::default::executor::TaskExecutor; +use delta_kernel::engine::default::storage::store_from_url; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter; use delta_kernel::parquet::file::properties::WriterProperties; @@ -194,27 +194,14 @@ pub fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch { .into() } -/// Simple extension trait with helpful methods (just constuctor for now) for creating/using -/// DefaultEngine in our tests. +/// Helper to create a DefaultEngine with the default executor for tests. 
/// -/// Note: we implment this extension trait here so that we can import this trait (from test-utils -/// crate) and get to use all these test-only helper methods from places where we don't have access -pub trait DefaultEngineExtension { - type Executor: TaskExecutor; - - fn new_local() -> Arc<DefaultEngine<Self::Executor>>; -} - -impl DefaultEngineExtension for DefaultEngine<TokioBackgroundExecutor> { - type Executor = TokioBackgroundExecutor; - - fn new_local() -> Arc<DefaultEngine<TokioBackgroundExecutor>> { - let object_store = Arc::new(LocalFileSystem::new()); - Arc::new(DefaultEngine::new( - object_store, - TokioBackgroundExecutor::new().into(), - )) - } +/// Uses `TokioBackgroundExecutor` as the default executor. +pub fn create_default_engine( + table_root: &url::Url, +) -> DeltaResult<Arc<DefaultEngine<TokioBackgroundExecutor>>> { + let store = store_from_url(table_root)?; + Ok(Arc::new(DefaultEngine::new(store))) } // setup default engine with in-memory (local_directory=None) or local fs (local_directory=Some(Url)) @@ -236,8 +223,7 @@ pub fn engine_store_setup( Url::parse(format!("{dir}{table_name}/").as_str()).expect("valid url"), ), }; - let executor = Arc::new(TokioBackgroundExecutor::new()); - let engine = DefaultEngine::new(Arc::clone(&storage), executor); + let engine = DefaultEngine::new(Arc::clone(&storage)); (storage, engine, url) } diff --git a/uc-catalog/src/lib.rs b/uc-catalog/src/lib.rs index f41cd9293..6faeaac9c 100644 --- a/uc-catalog/src/lib.rs +++ b/uc-catalog/src/lib.rs @@ -118,7 +118,6 @@ impl<'a> UCCatalog<'a> { mod tests { use std::env; - use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use super::*; @@ -175,11 +174,10 @@ mod tests { let table_url = Url::parse(&table_uri)?; let (store, path) = object_store::parse_url_opts(&table_url, options)?; - let store: Arc<_> = store.into(); info!("created object store: {:?}\npath: {:?}\n", store, path); - let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let engine = DefaultEngine::new(store.into()); // read table let snapshot = catalog
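Finally, a hedged sketch of how a downstream test might consume the new `test_utils::create_default_engine` helper, mirroring the `dv.rs` and `hdfs.rs` call sites above; the test name and fixture path are hypothetical placeholders.

```rust
use delta_kernel::{DeltaResult, Snapshot};

#[test]
fn smoke_read_with_default_engine() -> DeltaResult<()> {
    // Placeholder fixture path; any local Delta table works here.
    let url = delta_kernel::try_parse_uri("./tests/data/table-without-dv-small/")?;

    // The helper returns Arc<DefaultEngine<TokioBackgroundExecutor>>, so call
    // sites pass engine.as_ref() exactly like the updated tests in this diff.
    let engine = test_utils::create_default_engine(&url)?;
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
    let _version = snapshot.version();
    Ok(())
}
```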