diff --git a/python/distrdf/backends/CMakeLists.txt b/python/distrdf/backends/CMakeLists.txt index f4a242cc09..8744062a08 100644 --- a/python/distrdf/backends/CMakeLists.txt +++ b/python/distrdf/backends/CMakeLists.txt @@ -5,6 +5,7 @@ endif() if(ROOT_test_distrdf_dask_FOUND) list(APPEND DISTRDF_BACKENDS_IN_USE "dask") list(APPEND DISTRDF_RESOURCE_LOCKS "dask_resource_lock") + #list(APPEND DISTRDF_FIXTURES "dask_cluster") endif() if(ROOT_test_distrdf_pyspark_FOUND) @@ -27,6 +28,37 @@ if(ROOT_test_distrdf_pyspark_FOUND) list(APPEND DISTRDF_ENVIRONMENT_VARS ${PYSPARK_ENV_VARS}) list(APPEND DISTRDF_BACKENDS_IN_USE "spark") list(APPEND DISTRDF_RESOURCE_LOCKS "spark_resource_lock") + + find_program(SPARK_CLASS_CMD NAMES "spark-class") + if(NOT SPARK_CLASS_CMD) + message(FATAL_ERROR "Cannot find spark-class.") + else() + message(STATUS "Found spark-class: ${SPARK_CLASS_CMD}") + endif() + + ROOTTEST_ADD_TEST(start_spark_master + EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/start_spark_master.sh + OPTS ${SPARK_CLASS_CMD} + ENVIRONMENT "${PYSPARK_ENV_VARS}" + FIXTURES_SETUP spark_master) + ROOTTEST_ADD_TEST(stop_spark_master + EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/kill_pid.sh + OPTS ${CMAKE_CURRENT_BINARY_DIR}/start_spark_master.log + FIXTURES_CLEANUP spark_master) + + ROOTTEST_ADD_TEST(start_spark_worker + EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/start_spark_worker.sh + OPTS ${SPARK_CLASS_CMD} ${CMAKE_CURRENT_BINARY_DIR} + ENVIRONMENT "${PYSPARK_ENV_VARS}" + FIXTURES_REQUIRED spark_master + FIXTURES_SETUP spark_worker) + ROOTTEST_ADD_TEST(stop_spark_worker + EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/kill_pid.sh + OPTS ${CMAKE_CURRENT_BINARY_DIR}/start_spark_worker.log + FIXTURES_REQUIRED spark_master + FIXTURES_CLEANUP spark_worker) + + list(APPEND DISTRDF_FIXTURES "spark_worker") endif() @@ -41,11 +73,13 @@ list(APPEND DISTRDF_ENVIRONMENT_VARS DISTRDF_BACKENDS_IN_USE=${DISTRDF_BACKENDS_ # setting the property to 4. The test also locks a resource for the creation of # the clusters, depending on how many backends are active. The resource lock # is shared with the "common" folder and the tutorials of the main repository. +message(STATUS "DistRDF fixtures: ${DISTRDF_FIXTURES}") ROOTTEST_ADD_TEST(test_all MACRO test_all.py ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" TIMEOUT 1200 - PROPERTIES PROCESSORS 4) + PROPERTIES PROCESSORS 4 + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) # This test has to take multiple resource locks. This means that they should # be passed as a cmake list (semi-colon separated strings) after the @@ -56,3 +90,105 @@ ROOTTEST_ADD_TEST(test_all # function directly here, so we can be sure that the PROPERTIES argument # will be properly parsed.
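# (With both the Dask and Spark backends enabled, DISTRDF_RESOURCE_LOCKS expands to the # list "dask_resource_lock;spark_resource_lock", so the call below locks both resources # for this test.)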
set_tests_properties(roottest-python-distrdf-backends-test_all PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}") + +ROOTTEST_ADD_TEST(test_distrdf_backend_mytestflag + MACRO check_backend.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_cloned_actions_mytestflag + MACRO check_cloned_actions.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_distribute_cppcode_mytestflag + MACRO check_distribute_cppcode.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_distribute_headers_sharedlibs_files_mytestflag + MACRO check_distribute_headers_sharedlibs_files.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + + +ROOTTEST_ADD_TEST(test_distrdf_explicit_api_mytestflag + MACRO check_explicit_api.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_friend_trees_alignment_mytestflag + MACRO check_friend_trees_alignment.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_friend_trees_mytestflag + MACRO check_friend_trees.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + + +ROOTTEST_ADD_TEST(test_distrdf_histo_write_mytestflag + MACRO check_histo_write.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + + +ROOTTEST_ADD_TEST(test_distrdf_inv_mass_mytestflag + MACRO check_inv_mass.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + + +ROOTTEST_ADD_TEST(test_distrdf_live_visualize_mytestflag + MACRO check_live_visualize.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_missing_values_mytestflag + MACRO check_missing_values.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_reducer_merge_mytestflag + MACRO check_reducer_merge.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_rungraphs_mytestflag + MACRO check_rungraphs.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) + +ROOTTEST_ADD_TEST(test_distrdf_variations_mytestflag + MACRO check_variations.py + ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}" + TIMEOUT 1200 + PROPERTIES RESOURCE_LOCK spark_resource_lock + FIXTURES_REQUIRED ${DISTRDF_FIXTURES}) \ No newline at end of file diff --git 
a/python/distrdf/backends/check_backend.py b/python/distrdf/backends/check_backend.py index 6f9fcbda82..fa90a9082a 100644 --- a/python/distrdf/backends/check_backend.py +++ b/python/distrdf/backends/check_backend.py @@ -4,6 +4,9 @@ import DistRDF import ROOT +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") + class TestBackendInit: """ @@ -132,7 +135,7 @@ def test_histo_from_empty_root_file(self, payload): connection, _ = payload # Create an RDataFrame from a file with an empty tree rdf = ROOT.RDataFrame( - "empty", "../data/ttree/empty.root", executor=connection) + "empty", f"{DATA_DIR}/empty.root", executor=connection) histo = rdf.Histo1D(("empty", "empty", 10, 0, 10), "mybranch") # Get entries in the histogram, raises error @@ -149,10 +152,10 @@ def test_count_with_some_empty_trees(self, payload): connection, _ = payload treenames = [f"tree_{i}" for i in range(3)] filenames = [ - f"../data/ttree/distrdf_roottest_check_backend_{i}.root" for i in range(3)] + f"{DATA_DIR}/distrdf_roottest_check_backend_{i}.root" for i in range(3)] empty_treename = "empty" - empty_filename = "../data/ttree/empty.root" + empty_filename = f"{DATA_DIR}/empty.root" # Create the final dataset with some empty trees final_treenames = [] @@ -185,7 +188,7 @@ def test_count_with_same_tree_repeated(self, payload): """ connection, _ = payload treename = "tree_0" - filename = "../data/ttree/distrdf_roottest_check_backend_0.root" + filename = f"{DATA_DIR}/distrdf_roottest_check_backend_0.root" filenames = [filename] * 3 rdf = ROOT.RDataFrame(treename, filenames, executor=connection) diff --git a/python/distrdf/backends/check_cloned_actions.py b/python/distrdf/backends/check_cloned_actions.py index 734ffbef64..c5fba8fa90 100644 --- a/python/distrdf/backends/check_cloned_actions.py +++ b/python/distrdf/backends/check_cloned_actions.py @@ -3,6 +3,9 @@ import ROOT +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") + class TestAsNumpy: """ @@ -18,7 +21,7 @@ def test_clone_asnumpyresult(self, payload, nparts): """ datasetname = "Events" - filename = "../data/ttree/distrdf_roottest_check_cloned_actions_asnumpy.root" + filename = f"{DATA_DIR}/distrdf_roottest_check_cloned_actions_asnumpy.root" connection, _ = payload distrdf = ROOT.RDataFrame(datasetname, filename, executor=connection, npartitions=nparts) diff --git a/python/distrdf/backends/check_definepersample.py b/python/distrdf/backends/check_definepersample.py index 85fa7740f8..0d724dd791 100644 --- a/python/distrdf/backends/check_definepersample.py +++ b/python/distrdf/backends/check_definepersample.py @@ -4,12 +4,16 @@ import DistRDF +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") + + class TestDefinePerSample: """Check the working of merge operations in the reducer function.""" samples = ["sample1", "sample2", "sample3"] filenames = [ - f"../data/ttree/distrdf_roottest_definepersample_{sample}.root" for sample in samples] + f"{DATA_DIR}/distrdf_roottest_definepersample_{sample}.root" for sample in samples] maintreename = "Events" def test_definepersample_simple(self, payload): diff --git a/python/distrdf/backends/check_distribute_headers_sharedlibs_files.py b/python/distrdf/backends/check_distribute_headers_sharedlibs_files.py index d92d44935f..73540aebdb 100644 --- a/python/distrdf/backends/check_distribute_headers_sharedlibs_files.py +++ b/python/distrdf/backends/check_distribute_headers_sharedlibs_files.py @@ -6,6 +6,11 @@ import pytest import math +from pathlib import 
Path +FILES_DIR = str(Path().absolute().parent / "test_files") +HEADERS_DIR = str(Path().absolute().parent / "test_headers") +LIBS_DIR = str(Path().absolute().parent / "test_shared_libs") + class TestInterfaceHeadersLibrariesFiles: """ @@ -17,26 +22,26 @@ def _create_shared_libs(self): [ "g++", "-fPIC", - "../test_shared_libs/mysource6.cpp", + f"{LIBS_DIR}/mysource6.cpp", "-shared", "-o", - "../test_shared_libs/mylib6.so", + f"{LIBS_DIR}/mylib6.so", ] ) subprocess.run( [ "g++", "-fPIC", - "../test_shared_libs/mysource7.cpp", + f"{LIBS_DIR}/mysource7.cpp", "-shared", "-o", - "../test_shared_libs/mylib7.so", + f"{LIBS_DIR}/mylib7.so", ] ) def _remove_shared_libs(self): - os.remove("../test_shared_libs/mylib6.so") - os.remove("../test_shared_libs/mylib7.so") + os.remove(f"{LIBS_DIR}/mylib6.so") + os.remove(f"{LIBS_DIR}/mylib7.so") def _check_rdf_histos_5(self, rdf): # This filters out all numbers less than 5 @@ -92,7 +97,7 @@ def _distribute_header_check_filter_and_histo(self, connection): rdf = ROOT.RDataFrame(10, executor=connection) ROOT.RDF.Experimental.Distributed.DistributeHeaders( - "../test_headers/header1.hxx" + f"{HEADERS_DIR}/header1.hxx" ) self._check_rdf_histos_5(rdf) @@ -103,7 +108,7 @@ def _extend_ROOT_include_path(self, connection): specified in `DistRDF.include_headers()` so references between headers are correctly solved. """ - header_folder = "../test_headers/headers_folder" + header_folder = f"{HEADERS_DIR}/headers_folder" # Create an RDataFrame with 100 integers from 0 to 99 rdf = ROOT.RDataFrame(100, executor=connection) @@ -135,10 +140,10 @@ def _distribute_shared_lib_check_filter_and_histo(self, connection): rdf = ROOT.RDataFrame(15, executor=connection) ROOT.RDF.Experimental.Distributed.DistributeHeaders( - "../test_shared_libs/myheader7.h" + f"{LIBS_DIR}/myheader7.h" ) ROOT.RDF.Experimental.Distributed.DistributeSharedLibs( - "../test_shared_libs/mylib7.so" + f"{LIBS_DIR}/mylib7.so" ) self._check_rdf_histos_7(rdf) @@ -150,10 +155,10 @@ def _distribute_shared_lib_folder_check_filter_and_histo(self, connection): rdf = ROOT.RDataFrame(15, executor=connection) ROOT.RDF.Experimental.Distributed.DistributeHeaders( - "../test_shared_libs/myheader6.h" + f"{LIBS_DIR}/myheader6.h" ) ROOT.RDF.Experimental.Distributed.DistributeSharedLibs( - "../test_shared_libs/") + f"{LIBS_DIR}/") self._check_rdf_histos_6(rdf) def _distribute_multiple_shared_lib_check_filter_and_histo( @@ -166,10 +171,10 @@ def _distribute_multiple_shared_lib_check_filter_and_histo( rdf = ROOT.RDataFrame(15, executor=connection) ROOT.RDF.Experimental.Distributed.DistributeHeaders( - ["../test_shared_libs/myheader7.h", "../test_shared_libs/myheader6.h"] + [f"{LIBS_DIR}/myheader7.h", f"{LIBS_DIR}/myheader6.h"] ) ROOT.RDF.Experimental.Distributed.DistributeSharedLibs( - ["../test_shared_libs/mylib7.so", "../test_shared_libs/mylib6.so"] + [f"{LIBS_DIR}/mylib7.so", f"{LIBS_DIR}/mylib6.so"] ) self._check_rdf_histos_6(rdf) self._check_rdf_histos_7(rdf) @@ -184,10 +189,10 @@ def _distribute_multiple_shared_lib_folder_check_filter_and_histo( rdf = ROOT.RDataFrame(15, executor=connection) ROOT.RDF.Experimental.Distributed.DistributeHeaders( - ["../test_shared_libs/myheader7.h", "../test_shared_libs/myheader6.h"] + [f"{LIBS_DIR}/myheader7.h", f"{LIBS_DIR}/myheader6.h"] ) ROOT.RDF.Experimental.Distributed.DistributeSharedLibs( - "../test_shared_libs/") + f"{LIBS_DIR}/") self._check_rdf_histos_6(rdf) self._check_rdf_histos_7(rdf) @@ -198,7 +203,7 @@ def _distribute_single_file(self, connection, backend): rdf = 
ROOT.RDataFrame(10, executor=connection) ROOT.RDF.Experimental.Distributed.DistributeFiles( - "../test_files/file.txt") + f"{FILES_DIR}/file.txt") if backend == "dask": @@ -233,7 +238,7 @@ def _distribute_multiple_files(self, connection, backend): rdf = ROOT.RDataFrame(10, executor=connection) ROOT.RDF.Experimental.Distributed.DistributeFiles( - ["../test_files/file.txt", "../test_files/file_1.txt"] + [f"{FILES_DIR}/file.txt", f"{FILES_DIR}/file_1.txt"] ) if backend == "dask": diff --git a/python/distrdf/backends/check_explicit_api.py b/python/distrdf/backends/check_explicit_api.py index ba47ec2315..ef4bfb7b94 100644 --- a/python/distrdf/backends/check_explicit_api.py +++ b/python/distrdf/backends/check_explicit_api.py @@ -4,6 +4,8 @@ import DistRDF +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") class TestExplicitAPI: """ @@ -38,7 +40,7 @@ def test_rungraphs_3histos(self, payload): """ # Create a test file for processing treename = "tree" - filename = "../data/ttree/distrdf_roottest_check_rungraphs.root" + filename = f"{DATA_DIR}/distrdf_roottest_check_rungraphs.root" nentries = 10000 connection, backend = payload if backend == "dask": diff --git a/python/distrdf/backends/check_friend_trees.py b/python/distrdf/backends/check_friend_trees.py index d91ca144fa..357c6e6544 100644 --- a/python/distrdf/backends/check_friend_trees.py +++ b/python/distrdf/backends/check_friend_trees.py @@ -4,6 +4,9 @@ from DistRDF.Backends import Dask +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") + def check_histograms(h_parent, h_friend): """Check equality of histograms in tests""" # Both trees have the same number of entries, i.e. 10000 @@ -22,8 +25,8 @@ def check_histograms(h_parent, h_friend): class TestDaskFriendTrees: """Integration tests to check the working of DistRDF with friend trees""" - main_filename = "../data/ttree/distrdf_roottest_check_friend_trees_main.root" - friend_filename = "../data/ttree/distrdf_roottest_check_friend_trees_friend.root" + main_filename = f"{DATA_DIR}/distrdf_roottest_check_friend_trees_main.root" + friend_filename = f"{DATA_DIR}/distrdf_roottest_check_friend_trees_friend.root" def test_tchain_with_friend_tchain_histo(self, payload): """ @@ -84,9 +87,9 @@ def test_friends_tchain_noname_add_fullpath_addfriend_alias(self, payload): chainFriend = ROOT.TChain() chain.Add( - "../data/ttree/distrdf_roottest_check_friend_trees_7584.root/randomNumbers") + f"{DATA_DIR}/distrdf_roottest_check_friend_trees_7584.root/randomNumbers") chainFriend.Add( - "../data/ttree/distrdf_roottest_check_friend_trees_7584.root/randomNumbersBis") + f"{DATA_DIR}/distrdf_roottest_check_friend_trees_7584.root/randomNumbersBis") chain.AddFriend(chainFriend, "myfriend") diff --git a/python/distrdf/backends/check_friend_trees_alignment.py b/python/distrdf/backends/check_friend_trees_alignment.py index f4d98591a9..d51c4346a3 100644 --- a/python/distrdf/backends/check_friend_trees_alignment.py +++ b/python/distrdf/backends/check_friend_trees_alignment.py @@ -4,10 +4,13 @@ from DistRDF.Backends import Dask +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") + TREENAMES = [ f"distrdf_roottest_check_friend_trees_alignment_{i}" for i in range(1, 7)] FILENAMES = [ - f"../data/ttree/distrdf_roottest_check_friend_trees_alignment_{i}.root" for i in range(1, 7)] + f"{DATA_DIR}/distrdf_roottest_check_friend_trees_alignment_{i}.root" for i in range(1, 7)] def create_chain(): diff --git 
a/python/distrdf/backends/check_histo_write.py b/python/distrdf/backends/check_histo_write.py index 45681662cb..e45f3ce4c8 100644 --- a/python/distrdf/backends/check_histo_write.py +++ b/python/distrdf/backends/check_histo_write.py @@ -6,6 +6,9 @@ from DistRDF.Backends import Dask +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") + class TestDaskHistoWrite: """ Integration tests to check writing histograms to a `TFile` distributedly. @@ -26,7 +29,7 @@ def test_write_histo(self, payload): with ROOT.TFile("out_file.root", "recreate") as outfile: # We can reuse the same dataset from another test treename = "T" - filename = "../data/ttree/distrdf_roottest_check_friend_trees_main.root" + filename = f"{DATA_DIR}/distrdf_roottest_check_friend_trees_main.root" # Create a DistRDF RDataFrame with the parent and the friend trees connection, _ = payload df = ROOT.RDataFrame(treename, filename, executor=connection) diff --git a/python/distrdf/backends/check_missing_values.py b/python/distrdf/backends/check_missing_values.py index 1033096e3f..082162ce6e 100644 --- a/python/distrdf/backends/check_missing_values.py +++ b/python/distrdf/backends/check_missing_values.py @@ -2,6 +2,8 @@ import ROOT +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") class TestMissingValues: """Tests of dealing with missing values in the input dataset.""" @@ -13,9 +15,9 @@ def test_defaults_and_missing(self, payload): """ filenames = [ # 10k entries, defining b1, b2, b3 (Int_t), all always equal to 42 - f"../data/ttree/distrdf_roottest_check_rungraphs.root", + f"{DATA_DIR}/distrdf_roottest_check_rungraphs.root", # 100 entries defining 'v' (Double_t) - f"../data/ttree/distrdf_roottest_check_reducer_merge_1.root", + f"{DATA_DIR}/distrdf_roottest_check_reducer_merge_1.root", ] connection, _ = payload df = ROOT.RDataFrame("tree", filenames, executor=connection) diff --git a/python/distrdf/backends/check_reducer_merge.py b/python/distrdf/backends/check_reducer_merge.py index b42a7ee770..c7c7e1be79 100644 --- a/python/distrdf/backends/check_reducer_merge.py +++ b/python/distrdf/backends/check_reducer_merge.py @@ -9,6 +9,17 @@ from DistRDF.Backends import Dask +from pathlib import Path +DATA_DIR = str(Path().absolute().parent / "data/ttree") + +def get_root_dir(): + try: + from pyspark.files import SparkFiles + return SparkFiles.getRootDirectory() + except ImportError: + return None + + class TestReducerMerge: """Check the working of merge operations in the reducer function.""" @@ -312,7 +323,7 @@ def test_distributed_asnumpy_lazy(self, payload): npy = npy_lazy.GetValue() self.check_npy_dict(npy) - def check_snapshot_df(self, snapdf, snapfilename): + def check_snapshot_df(self, snapdf, snapfilename, final_dir): # Count the rows in the snapshotted dataframe snapcount = snapdf.Count() @@ -321,9 +332,9 @@ def check_snapshot_df(self, snapdf, snapfilename): # Retrieve list of file from the snapshotted dataframe input_files = snapdf.proxied_node.inputfiles # Create list of supposed filenames for the intermediary files - tmp_files = [f"{snapfilename}_0.root", f"{snapfilename}_1.root"] + tmp_files = [f"{final_dir}/{snapfilename}_0.root", f"{final_dir}/{snapfilename}_1.root"] # Check that the two lists are the same - assert input_files == tmp_files + assert input_files == tmp_files, f"{input_files}!={tmp_files}" # Check that the intermediary .root files were created with the right # names, then remove them because they are not necessary for filename in tmp_files: @@ -334,6 +345,9 @@
def test_distributed_snapshot(self, payload): """Test support for `Snapshot` in distributed backend""" # A simple dataframe with ten sequential numbers from 0 to 9 connection, _ = payload + spark_local_dir = connection.getConf().get("spark.local.dir") + spark_app_id = connection.getConf().get("spark.app.id") + final_dir = os.path.join(spark_local_dir, spark_app_id, "0") df = ROOT.RDataFrame(10, executor=connection) df = df.Define("x", "rdfentry_") @@ -341,7 +355,7 @@ def test_distributed_snapshot(self, payload): # Snapshot to two files, build a ROOT.TChain with them and retrieve a # Dask.RDataFrame snapdf = df.Snapshot("snapTree", "snapFile.root") - self.check_snapshot_df(snapdf, "snapFile") + self.check_snapshot_df(snapdf, "snapFile", final_dir) def test_distributed_snapshot_columnlist(self, payload): """ @@ -360,10 +374,12 @@ def test_distributed_snapshot_columnlist(self, payload): ) expectedcolumns = ["a", "b"] df.Snapshot("snapTree_columnlist", "distrdf_dask_snapfile_columnlist.root", expectedcolumns) - + spark_local_dir = connection.getConf().get("spark.local.dir") + spark_app_id = connection.getConf().get("spark.app.id") + final_dir = os.path.join(spark_local_dir, spark_app_id, "0") # Create a traditional RDF from the snapshotted files to retrieve the # list of columns - tmp_files = ["distrdf_dask_snapfile_columnlist_0.root", "distrdf_dask_snapfile_columnlist_1.root"] + tmp_files = [f"{final_dir}/distrdf_dask_snapfile_columnlist_0.root", f"{final_dir}/distrdf_dask_snapfile_columnlist_1.root"] rdf = ROOT.RDataFrame("snapTree_columnlist", tmp_files) snapcolumns = [str(column) for column in rdf.GetColumnNames()] @@ -376,6 +392,9 @@ def test_distributed_snapshot_lazy(self, payload): """Test that `Snapshot` can be still called lazily in distributed mode""" # A simple dataframe with ten sequential numbers from 0 to 9 connection, _ = payload + spark_local_dir = connection.getConf().get("spark.local.dir") + spark_app_id = connection.getConf().get("spark.app.id") + final_dir = os.path.join(spark_local_dir, spark_app_id, "0") df = ROOT.RDataFrame(10, executor=connection) df = df.Define("x", "rdfentry_") @@ -386,7 +405,7 @@ def test_distributed_snapshot_lazy(self, payload): assert snap_lazy.proxied_node.value is None snapdf = snap_lazy.GetValue() - self.check_snapshot_df(snapdf, "snapFile_lazy") + self.check_snapshot_df(snapdf, "snapFile_lazy", final_dir) def test_redefine_one_column(self, payload): """Test that values of one column can be properly redefined.""" @@ -409,7 +428,7 @@ def test_distributed_stddev(self, payload): # Create dataset with fixed series of entries treename = "tree" - filename = "../data/ttree/distrdf_roottest_check_reducer_merge_1.root" + filename = f"{DATA_DIR}/distrdf_roottest_check_reducer_merge_1.root" connection, _ = payload df = ROOT.RDataFrame(treename, filename, executor=connection) @@ -424,7 +443,7 @@ def test_distributed_stats(self, payload): """Test support for the Stats action.""" # Create dataset with fixed series of entries treename = "tree" - filename = "../data/ttree/distrdf_roottest_check_reducer_merge_1.root" + filename = f"{DATA_DIR}/distrdf_roottest_check_reducer_merge_1.root" connection, _ = payload df = ROOT.RDataFrame(treename, filename, executor=connection) diff --git a/python/distrdf/backends/check_rungraphs.py b/python/distrdf/backends/check_rungraphs.py index 025d259bb1..1787a86811 100644 --- a/python/distrdf/backends/check_rungraphs.py +++ b/python/distrdf/backends/check_rungraphs.py @@ -4,6 +4,8 @@ import DistRDF +from pathlib import Path 
+DATA_DIR = str(Path().absolute().parent / "data/ttree") class TestRunGraphs: """Tests usage of RunGraphs function with Dask backend""" @@ -14,7 +16,7 @@ def test_rungraphs_dask_3histos(self, payload): """ # Create a test file for processing treename = "tree" - filename = "../data/ttree/distrdf_roottest_check_rungraphs.root" + filename = f"{DATA_DIR}/distrdf_roottest_check_rungraphs.root" nentries = 10000 connection, _ = payload df = ROOT.RDataFrame(treename, filename, executor=connection, npartitions=2) diff --git a/python/distrdf/backends/conftest.py b/python/distrdf/backends/conftest.py index 2b3bcf7c5d..d974538026 100644 --- a/python/distrdf/backends/conftest.py +++ b/python/distrdf/backends/conftest.py @@ -11,6 +11,7 @@ import pyspark import ROOT +from pathlib import Path def create_dask_connection(): connection = Client(LocalCluster( @@ -23,8 +24,10 @@ def cleanup_dask_connection(connection): def create_spark_connection(): - conf = {"spark.master": "local[2]", "spark.driver.memory": "4g", - "spark.app.name": "roottest-distrdf-spark"} + conf = {"spark.app.name": "roottest-distrdf-spark", + "spark.master": "spark://127.0.0.1:7077", + "spark.cores.max": "2", + "spark.local.dir": str(Path().absolute())} sparkconf = pyspark.SparkConf().setAll(conf.items()) connection = pyspark.SparkContext(conf=sparkconf) return connection diff --git a/python/distrdf/backends/scripts/kill_pid.sh b/python/distrdf/backends/scripts/kill_pid.sh new file mode 100755 index 0000000000..e2a040375a --- /dev/null +++ b/python/distrdf/backends/scripts/kill_pid.sh @@ -0,0 +1,10 @@ +#! /usr/bin/env bash + +logfile=$1 +if [ ! -f "$logfile" ]; then + echo "log file not found, cannot retrieve PID!" + exit 1 +fi + +PID=$(< "$logfile") +kill -9 "$PID" diff --git a/python/distrdf/backends/scripts/start_spark_master.sh b/python/distrdf/backends/scripts/start_spark_master.sh new file mode 100755 index 0000000000..b4eb01b3dd --- /dev/null +++ b/python/distrdf/backends/scripts/start_spark_master.sh @@ -0,0 +1,7 @@ +#! /usr/bin/env bash + +spark_class_cmd=$1 +$spark_class_cmd org.apache.spark.deploy.master.Master -h 127.0.0.1 2>~/spark_master.err 1>~/spark_master.out 0</dev/null & [...] diff --git a/python/distrdf/backends/scripts/start_spark_worker.sh b/python/distrdf/backends/scripts/start_spark_worker.sh new file mode 100755 --- /dev/null +++ b/python/distrdf/backends/scripts/start_spark_worker.sh +[...] 2>~/spark_worker.err 1>~/spark_worker.out 0</dev/null &
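For reference, the standalone cluster that the new fixtures bring up can be reproduced by hand roughly as follows. This is a sketch rather than the content of the scripts above: it assumes spark-class is on PATH, and the worker options and log-file names are illustrative choices that mirror what CMakeLists.txt passes to the scripts and what kill_pid.sh expects.

#!/usr/bin/env bash
# Sketch only: bring up a local Spark standalone master and worker the way the
# CTest fixtures do, then tear them down from the recorded PIDs.
# Assumptions: spark-class is on PATH; the file names here are illustrative.

# Master on 127.0.0.1:7077, the URL used by create_spark_connection() in conftest.py.
spark-class org.apache.spark.deploy.master.Master -h 127.0.0.1 \
    2>master.err 1>master.out 0</dev/null &
echo $! > start_spark_master.log

# Worker registering with that master; the work dir stands in for CMAKE_CURRENT_BINARY_DIR.
spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:7077 \
    --work-dir "$PWD" 2>worker.err 1>worker.out 0</dev/null &
echo $! > start_spark_worker.log

# Teardown, equivalent to what kill_pid.sh does with each PID log file.
kill -9 "$(< start_spark_worker.log)" "$(< start_spark_master.log)"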