Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion python/distrdf/backends/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ endif()
# Register the Dask backend: record it as active and reserve its resource
# lock so Dask-based tests do not run concurrently with each other.
# NOTE(review): unlike the Spark branch below, no cluster fixture is set up
# for Dask; append one to DISTRDF_FIXTURES here once such a fixture exists.
if(ROOT_test_distrdf_dask_FOUND)
  list(APPEND DISTRDF_BACKENDS_IN_USE "dask")
  list(APPEND DISTRDF_RESOURCE_LOCKS "dask_resource_lock")
endif()

if(ROOT_test_distrdf_pyspark_FOUND)
Expand All @@ -27,6 +28,37 @@ if(ROOT_test_distrdf_pyspark_FOUND)
list(APPEND DISTRDF_ENVIRONMENT_VARS ${PYSPARK_ENV_VARS})
list(APPEND DISTRDF_BACKENDS_IN_USE "spark")
list(APPEND DISTRDF_RESOURCE_LOCKS "spark_resource_lock")

# Locate the spark-class launcher; it is required by the scripts that start
# the standalone Spark master/worker fixtures below.
find_program(SPARK_CLASS_CMD NAMES "spark-class")
if(NOT SPARK_CLASS_CMD)
  # find_program stores "<VAR>-NOTFOUND" in the variable on failure, so the
  # canonical test is "if(NOT <var>)". The previous check referenced a
  # literal variable named SPARK_CLASS_CMD-NOTFOUND, which is never defined,
  # and used message(FATAL ...), which is not a valid mode and would not
  # abort the configure step.
  message(FATAL_ERROR "Cannot find spark-class.")
else()
  # Plain progress output; WARNING level is reserved for real problems.
  message(STATUS "Found spark-class: ${SPARK_CLASS_CMD}")
endif()

# Start a standalone Spark master as a CTest fixture: any test declaring
# FIXTURES_REQUIRED spark_master causes this setup test to run first.
# The script is handed the spark-class launcher found above.
ROOTTEST_ADD_TEST(start_spark_master
EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/start_spark_master.sh
OPTS ${SPARK_CLASS_CMD}
ENVIRONMENT "${PYSPARK_ENV_VARS}"
FIXTURES_SETUP spark_master)
# Tear the master down after every test requiring the fixture has run.
# kill_pid.sh is given the log file of the setup step; presumably it reads
# the master's PID from that file — confirm against the script itself.
ROOTTEST_ADD_TEST(stop_spark_master
EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/kill_pid.sh
OPTS ${CMAKE_CURRENT_BINARY_DIR}/start_spark_master.log
FIXTURES_CLEANUP spark_master)

# Start a Spark worker attached to the master fixture. FIXTURES_REQUIRED
# spark_master guarantees the master is up before the worker launches; the
# binary dir is passed so the script can write its log/work files there.
ROOTTEST_ADD_TEST(start_spark_worker
EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/start_spark_worker.sh
OPTS ${SPARK_CLASS_CMD} ${CMAKE_CURRENT_BINARY_DIR}
ENVIRONMENT "${PYSPARK_ENV_VARS}"
FIXTURES_REQUIRED spark_master
FIXTURES_SETUP spark_worker)
# Cleanup counterpart: kill the worker (PID presumably recorded in the
# setup step's log file — confirm against kill_pid.sh) while the master
# fixture is still alive.
ROOTTEST_ADD_TEST(stop_spark_worker
EXEC ${CMAKE_CURRENT_SOURCE_DIR}/scripts/kill_pid.sh
OPTS ${CMAKE_CURRENT_BINARY_DIR}/start_spark_worker.log
FIXTURES_REQUIRED spark_master
FIXTURES_CLEANUP spark_worker)

# Tests only need to require the worker: it transitively requires the master.
list(APPEND DISTRDF_FIXTURES "spark_worker")
endif()


Expand All @@ -41,11 +73,13 @@ list(APPEND DISTRDF_ENVIRONMENT_VARS DISTRDF_BACKENDS_IN_USE=${DISTRDF_BACKENDS_
# setting the property to 4. The test also locks a resource for the creation of
# the clusters, depending on how many backends are active. The resource lock
# is shared with the "common" folder and the tutorials of the main repository.
# Report which cluster fixtures the distributed tests will require.
# (Was a leftover debug message(WARNING ...); STATUS is the right level,
# and the list is quoted so it prints as one argument.)
message(STATUS "DistRDF fixtures in use: ${DISTRDF_FIXTURES}")
# Run the full distributed test suite. PROCESSORS 4 reserves the slots the
# local test clusters need; FIXTURES_REQUIRED brings up any backend
# fixtures (e.g. the Spark master/worker above) before this test runs.
ROOTTEST_ADD_TEST(test_all
                  MACRO test_all.py
                  ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}"
                  TIMEOUT 1200
                  PROPERTIES PROCESSORS 4
                  FIXTURES_REQUIRED ${DISTRDF_FIXTURES})

# This test has to take multiple resource locks. This means that they should
# be passed as a cmake list (semi-colon separated strings) after the
Expand All @@ -56,3 +90,105 @@ ROOTTEST_ADD_TEST(test_all
# function directly here, so we can be sure that the PROPERTIES argument
# will be properly parsed.
set_tests_properties(roottest-python-distrdf-backends-test_all PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}")

# Every distributed-RDataFrame feature test follows the same naming pattern:
#   test name : test_distrdf_<feature>_mytestflag
#   macro     : check_<feature>.py
# and shares the backend environment, the same timeout, the Spark resource
# lock and the backend fixtures. Register them in one loop instead of
# repeating the identical ROOTTEST_ADD_TEST stanza fourteen times.
# NOTE(review): RESOURCE_LOCK is hardcoded to spark_resource_lock even when
# only the Dask backend is enabled — confirm whether these should use
# "${DISTRDF_RESOURCE_LOCKS}" (via set_tests_properties, as done for
# test_all above) instead.
foreach(feature
        backend
        cloned_actions
        distribute_cppcode
        distribute_headers_sharedlibs_files
        explicit_api
        friend_trees_alignment
        friend_trees
        histo_write
        inv_mass
        live_visualize
        missing_values
        reducer_merge
        rungraphs
        variations)
  ROOTTEST_ADD_TEST(test_distrdf_${feature}_mytestflag
                    MACRO check_${feature}.py
                    ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}"
                    TIMEOUT 1200
                    PROPERTIES RESOURCE_LOCK spark_resource_lock
                    FIXTURES_REQUIRED ${DISTRDF_FIXTURES})
endforeach()
11 changes: 7 additions & 4 deletions python/distrdf/backends/check_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import DistRDF
import ROOT

from pathlib import Path

# Absolute path to the shared TTree test data. Anchoring on the parent of
# the current working directory (instead of a relative "../" string) keeps
# the path valid when it is shipped to distributed workers.
DATA_DIR = str(Path.cwd().parent / "data" / "ttree")


class TestBackendInit:
"""
Expand Down Expand Up @@ -132,7 +135,7 @@ def test_histo_from_empty_root_file(self, payload):
connection, _ = payload
# Create an RDataFrame from a file with an empty tree
rdf = ROOT.RDataFrame(
"empty", "../data/ttree/empty.root", executor=connection)
"empty", f"{DATA_DIR}/empty.root", executor=connection)
histo = rdf.Histo1D(("empty", "empty", 10, 0, 10), "mybranch")

# Get entries in the histogram, raises error
Expand All @@ -149,10 +152,10 @@ def test_count_with_some_empty_trees(self, payload):
connection, _ = payload
treenames = [f"tree_{i}" for i in range(3)]
filenames = [
f"../data/ttree/distrdf_roottest_check_backend_{i}.root" for i in range(3)]
f"{DATA_DIR}/distrdf_roottest_check_backend_{i}.root" for i in range(3)]

empty_treename = "empty"
empty_filename = "../data/ttree/empty.root"
empty_filename = f"{DATA_DIR}/empty.root"

# Create the final dataset with some empty trees
final_treenames = []
Expand Down Expand Up @@ -185,7 +188,7 @@ def test_count_with_same_tree_repeated(self, payload):
"""
connection, _ = payload
treename = "tree_0"
filename = "../data/ttree/distrdf_roottest_check_backend_0.root"
filename = f"{DATA_DIR}/distrdf_roottest_check_backend_0.root"
filenames = [filename] * 3

rdf = ROOT.RDataFrame(treename, filenames, executor=connection)
Expand Down
5 changes: 4 additions & 1 deletion python/distrdf/backends/check_cloned_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

import ROOT

from pathlib import Path

# Absolute location of the TTree test data directory; an absolute path is
# required so distributed workers resolve the same files as the driver.
DATA_DIR = str(Path.cwd().parent / "data" / "ttree")


class TestAsNumpy:
"""
Expand All @@ -18,7 +21,7 @@ def test_clone_asnumpyresult(self, payload, nparts):
"""

datasetname = "Events"
filename = "../data/ttree/distrdf_roottest_check_cloned_actions_asnumpy.root"
filename = f"{DATA_DIR}/distrdf_roottest_check_cloned_actions_asnumpy.root"
connection, _ = payload
distrdf = ROOT.RDataFrame(datasetname, filename, executor=connection, npartitions=nparts)

Expand Down
6 changes: 5 additions & 1 deletion python/distrdf/backends/check_definepersample.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@

import DistRDF

from pathlib import Path

# Absolute path of the shared TTree data directory (parent dir of the
# current working directory + "data/ttree"), safe to hand to remote workers.
DATA_DIR = str(Path.cwd().parent / "data" / "ttree")


class TestDefinePerSample:
"""Check the working of merge operations in the reducer function."""

samples = ["sample1", "sample2", "sample3"]
filenames = [
f"../data/ttree/distrdf_roottest_definepersample_{sample}.root" for sample in samples]
f"{DATA_DIR}/distrdf_roottest_definepersample_{sample}.root" for sample in samples]
maintreename = "Events"

def test_definepersample_simple(self, payload):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
import pytest
import math

from pathlib import Path

# Absolute locations of the auxiliary test inputs (plain files, headers and
# shared-library sources), resolved against the parent of the current
# working directory so they remain valid on distributed workers.
_PARENT = Path.cwd().parent
FILES_DIR = str(_PARENT / "test_files")
HEADERS_DIR = str(_PARENT / "test_headers")
LIBS_DIR = str(_PARENT / "test_shared_libs")


class TestInterfaceHeadersLibrariesFiles:
"""
Expand All @@ -17,26 +22,26 @@ def _create_shared_libs(self):
[
"g++",
"-fPIC",
"../test_shared_libs/mysource6.cpp",
f"{LIBS_DIR}/mysource6.cpp",
"-shared",
"-o",
"../test_shared_libs/mylib6.so",
f"{LIBS_DIR}/mylib6.so",
]
)
subprocess.run(
[
"g++",
"-fPIC",
"../test_shared_libs/mysource7.cpp",
f"{LIBS_DIR}/mysource7.cpp",
"-shared",
"-o",
"../test_shared_libs/mylib7.so",
f"{LIBS_DIR}/mylib7.so",
]
)

def _remove_shared_libs(self):
    """Delete the shared libraries produced by ``_create_shared_libs``.

    Cleanup counterpart of the compilation step: removes the two ``.so``
    files from the shared-libs test directory. Raises ``FileNotFoundError``
    if they were never built. (The pasted diff residue listed both the old
    relative-path removals and the new ``LIBS_DIR`` ones; only the latter
    pair is intended — removing each file twice would always fail.)
    """
    os.remove(f"{LIBS_DIR}/mylib6.so")
    os.remove(f"{LIBS_DIR}/mylib7.so")

def _check_rdf_histos_5(self, rdf):
# This filters out all numbers less than 5
Expand Down Expand Up @@ -92,7 +97,7 @@ def _distribute_header_check_filter_and_histo(self, connection):
rdf = ROOT.RDataFrame(10, executor=connection)

ROOT.RDF.Experimental.Distributed.DistributeHeaders(
"../test_headers/header1.hxx"
f"{HEADERS_DIR}/header1.hxx"
)

self._check_rdf_histos_5(rdf)
Expand All @@ -103,7 +108,7 @@ def _extend_ROOT_include_path(self, connection):
specified in `DistRDF.include_headers()` so references between headers
are correctly solved.
"""
header_folder = "../test_headers/headers_folder"
header_folder = f"{HEADERS_DIR}/headers_folder"

# Create an RDataFrame with 100 integers from 0 to 99
rdf = ROOT.RDataFrame(100, executor=connection)
Expand Down Expand Up @@ -135,10 +140,10 @@ def _distribute_shared_lib_check_filter_and_histo(self, connection):
rdf = ROOT.RDataFrame(15, executor=connection)

ROOT.RDF.Experimental.Distributed.DistributeHeaders(
"../test_shared_libs/myheader7.h"
f"{LIBS_DIR}/myheader7.h"
)
ROOT.RDF.Experimental.Distributed.DistributeSharedLibs(
"../test_shared_libs/mylib7.so"
f"{LIBS_DIR}/mylib7.so"
)
self._check_rdf_histos_7(rdf)

Expand All @@ -150,10 +155,10 @@ def _distribute_shared_lib_folder_check_filter_and_histo(self, connection):
rdf = ROOT.RDataFrame(15, executor=connection)

ROOT.RDF.Experimental.Distributed.DistributeHeaders(
"../test_shared_libs/myheader6.h"
f"{LIBS_DIR}/myheader6.h"
)
ROOT.RDF.Experimental.Distributed.DistributeSharedLibs(
"../test_shared_libs/")
f"{LIBS_DIR}/")
self._check_rdf_histos_6(rdf)

def _distribute_multiple_shared_lib_check_filter_and_histo(
Expand All @@ -166,10 +171,10 @@ def _distribute_multiple_shared_lib_check_filter_and_histo(
rdf = ROOT.RDataFrame(15, executor=connection)

ROOT.RDF.Experimental.Distributed.DistributeHeaders(
["../test_shared_libs/myheader7.h", "../test_shared_libs/myheader6.h"]
[f"{LIBS_DIR}/myheader7.h", f"{LIBS_DIR}/myheader6.h"]
)
ROOT.RDF.Experimental.Distributed.DistributeSharedLibs(
["../test_shared_libs/mylib7.so", "../test_shared_libs/mylib6.so"]
[f"{LIBS_DIR}/mylib7.so", f"{LIBS_DIR}/mylib6.so"]
)
self._check_rdf_histos_6(rdf)
self._check_rdf_histos_7(rdf)
Expand All @@ -184,10 +189,10 @@ def _distribute_multiple_shared_lib_folder_check_filter_and_histo(
rdf = ROOT.RDataFrame(15, executor=connection)

ROOT.RDF.Experimental.Distributed.DistributeHeaders(
["../test_shared_libs/myheader7.h", "../test_shared_libs/myheader6.h"]
[f"{LIBS_DIR}/myheader7.h", f"{LIBS_DIR}/myheader6.h"]
)
ROOT.RDF.Experimental.Distributed.DistributeSharedLibs(
"../test_shared_libs/")
f"{LIBS_DIR}/")
self._check_rdf_histos_6(rdf)
self._check_rdf_histos_7(rdf)

Expand All @@ -198,7 +203,7 @@ def _distribute_single_file(self, connection, backend):
rdf = ROOT.RDataFrame(10, executor=connection)

ROOT.RDF.Experimental.Distributed.DistributeFiles(
"../test_files/file.txt")
f"{FILES_DIR}/file.txt")

if backend == "dask":

Expand Down Expand Up @@ -233,7 +238,7 @@ def _distribute_multiple_files(self, connection, backend):
rdf = ROOT.RDataFrame(10, executor=connection)

ROOT.RDF.Experimental.Distributed.DistributeFiles(
["../test_files/file.txt", "../test_files/file_1.txt"]
[f"{FILES_DIR}/file.txt", f"{FILES_DIR}/file_1.txt"]
)

if backend == "dask":
Expand Down
4 changes: 3 additions & 1 deletion python/distrdf/backends/check_explicit_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import DistRDF

from pathlib import Path

# Absolute path to the TTree test data; built from the parent of the
# current working directory so remote executors can open the same files.
DATA_DIR = str(Path.cwd().parent / "data" / "ttree")

class TestExplicitAPI:
"""
Expand Down Expand Up @@ -38,7 +40,7 @@ def test_rungraphs_3histos(self, payload):
"""
# Create a test file for processing
treename = "tree"
filename = "../data/ttree/distrdf_roottest_check_rungraphs.root"
filename = f"{DATA_DIR}/distrdf_roottest_check_rungraphs.root"
nentries = 10000
connection, backend = payload
if backend == "dask":
Expand Down
Loading