
Commit c07c2d7

Optimize the auto num_proc calculation of operators in ray mode (#789)
* Optimize the auto num_proc calculation of operators in Ray mode.
* Set concurrency to None for CPU operators, relying on Ray's default autoscaler to preserve performance. (A minimal illustration follows the change summary below.)
1 parent: c08e110

29 files changed: +733 −142 lines
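In short, operator parallelism in Ray mode is now resolved once for the whole pipeline (via calculate_ray_np) instead of per operator at run time, and CPU-only operators hand the decision to Ray Data by passing concurrency=None, which lets Ray's default autoscaler pick the number of concurrent tasks. A minimal, self-contained sketch of that behaviour with plain Ray Data (the dataset and the add_len function are illustrative, not part of this commit; the concurrency argument assumes a recent Ray release, roughly 2.9+):

    import pyarrow as pa
    import ray

    def add_len(table: pa.Table) -> pa.Table:
        # Toy "CPU operator": append a text-length column to each batch.
        lengths = pa.array([len(t) for t in table.column("text").to_pylist()])
        return table.append_column("text_len", lengths)

    ray.init(ignore_reinit_error=True)
    ds = ray.data.from_items([{"text": "hello"}, {"text": "ray data"}])

    # CPU op: concurrency=None -> Ray Data's autoscaler decides how many tasks run.
    out = ds.map_batches(add_len, batch_format="pyarrow", num_cpus=1, concurrency=None)
    print(out.take_all())

    # A GPU op would instead pin resources explicitly, e.g.:
    #   ds.map_batches(SomeGpuUDF, batch_format="pyarrow", num_gpus=1, concurrency=4)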

data_juicer/config/config.py

Lines changed: 3 additions & 1 deletion
@@ -28,6 +28,7 @@
 from data_juicer.utils.constant import RAY_JOB_ENV_VAR
 from data_juicer.utils.logger_utils import setup_logger
 from data_juicer.utils.mm_utils import SpecialTokens
+from data_juicer.utils.ray_utils import is_ray_mode
 
 global_cfg = None
 global_parser = None
@@ -749,7 +750,6 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False):
         "audio_key": cfg.get("audio_key", "audios"),
         "video_key": cfg.get("video_key", "videos"),
         "image_bytes_key": cfg.get("image_bytes_key", "image_bytes"),
-        "num_proc": cfg.get("np", None),
         "turbo": cfg.get("turbo", False),
         "skip_op_error": cfg.get("skip_op_error", True),
         "work_dir": cfg.work_dir,
@@ -758,6 +758,8 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False):
         "video_special_token": cfg.get("video_special_token", SpecialTokens.video),
         "eoc_special_token": cfg.get("eoc_special_token", SpecialTokens.eoc),
     }
+    if not is_ray_mode():
+        op_attrs.update({"num_proc": cfg.get("np", None)})
     cfg.process = update_op_attr(cfg.process, op_attrs)
 
     return cfg
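Effect of the config change above: the global `np` setting is only injected into each operator's attributes in standalone mode; in Ray mode each operator keeps its own num_proc (default -1, meaning "compute automatically") and it is resolved later. A standalone restatement of that gating, with stand-in names (build_op_attrs is not a data-juicer function):

    # Illustrative only: mirrors the is_ray_mode() gating above with stand-in names.
    def build_op_attrs(cfg: dict, ray_mode: bool) -> dict:
        op_attrs = {
            "turbo": cfg.get("turbo", False),
            "skip_op_error": cfg.get("skip_op_error", True),
        }
        # Only standalone mode propagates the global `np` value; in Ray mode each
        # operator keeps its own num_proc (default -1, i.e. "decide automatically").
        if not ray_mode:
            op_attrs["num_proc"] = cfg.get("np", None)
        return op_attrs

    assert "num_proc" not in build_op_attrs({"np": 8}, ray_mode=True)
    assert build_op_attrs({"np": 8}, ray_mode=False)["num_proc"] == 8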

data_juicer/core/data/ray_dataset.py

Lines changed: 11 additions & 28 deletions
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import os
-import sys
 from functools import partial
 from typing import Any, Dict, List, Literal, Optional, Union
 
@@ -16,7 +15,6 @@
 from data_juicer.utils.constant import Fields
 from data_juicer.utils.file_utils import is_remote_path
 from data_juicer.utils.lazy_loader import LazyLoader
-from data_juicer.utils.process_utils import calculate_np
 from data_juicer.utils.resource_utils import cuda_device_count
 from data_juicer.utils.webdataset_utils import _custom_default_decoder
 
@@ -148,27 +146,16 @@ def process(self, operators, *, exporter=None, checkpointer=None, tracer=None) -
             return self
         if not isinstance(operators, list):
             operators = [operators]
+
+        from data_juicer.utils.process_utils import calculate_ray_np
+
+        calculate_ray_np(operators)
+
         for op in operators:
             self._run_single_op(op)
         return self
 
     def _run_single_op(self, op):
-        # TODO: optimize auto proc
-        auto_parallel = False
-        if op.num_proc:
-            op_proc = op.num_proc
-        else:
-            auto_parallel = True
-            op_proc = sys.maxsize
-        auto_op_proc = calculate_np(op._name, op.mem_required, op.cpu_required, op.use_cuda(), op.gpu_required)
-        op_proc = min(op_proc, auto_op_proc)
-
-        # use ray default parallelism in cpu mode if op.num_proc is not specified
-        if op.use_cuda() or not auto_parallel:
-            logger.info(f"Op [{op._name}] running with number of procs:{op_proc}")
-
-        num_gpus = op.gpu_required if op.gpu_required else get_num_gpus(op, op_proc)
-
         if op._name in TAGGING_OPS.modules and Fields.meta not in self.data.columns():
 
             def process_batch_arrow(table: pyarrow.Table):
@@ -193,8 +180,8 @@ def process_batch_arrow(table: pyarrow.Table):
                     fn_constructor_kwargs=op_kwargs,
                     batch_size=batch_size,
                     num_cpus=op.cpu_required,
-                    num_gpus=num_gpus,
-                    concurrency=op_proc,
+                    num_gpus=op.gpu_required,
+                    concurrency=op.num_proc,
                     batch_format="pyarrow",
                 )
             else:
@@ -203,9 +190,7 @@ def process_batch_arrow(table: pyarrow.Table):
                     batch_size=batch_size,
                     batch_format="pyarrow",
                     num_cpus=op.cpu_required,
-                    concurrency=(
-                        None if auto_parallel else op_proc
-                    ),  # use ray default parallelism in cpu mode if num_proc is not specified
+                    concurrency=op.num_proc,
                 )
         elif isinstance(op, Filter):
             columns = self.data.columns()
@@ -229,8 +214,8 @@ def process_batch_arrow(table: pyarrow.Table):
                     fn_constructor_kwargs=op_kwargs,
                     batch_size=batch_size,
                     num_cpus=op.cpu_required,
-                    num_gpus=num_gpus,
-                    concurrency=op_proc,
+                    num_gpus=op.gpu_required,
+                    concurrency=op.num_proc,
                     batch_format="pyarrow",
                 )
             else:
@@ -239,9 +224,7 @@ def process_batch_arrow(table: pyarrow.Table):
                     batch_size=batch_size,
                     batch_format="pyarrow",
                     num_cpus=op.cpu_required,
-                    concurrency=(
-                        None if auto_parallel else op_proc
-                    ),  # use ray default parallelism in cpu mode if num_proc is not specified
+                    concurrency=op.num_proc,
                 )
         if op.stats_export_path is not None:
             self.data.write_json(op.stats_export_path, force_ascii=False)
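With the pre-computation moved into calculate_ray_np, _run_single_op simply forwards the operator's resolved cpu_required, gpu_required, and num_proc to Ray Data. The shape of the two branches, sketched outside data-juicer (the op interface here, in particular op_kwargs and process_batch, stands in for the real adapter code in ray_dataset.py):

    # Sketch only, assuming an op object exposing the attributes used in the diff above.
    import ray

    def run_op(ds: "ray.data.Dataset", op) -> "ray.data.Dataset":
        if op.use_cuda():
            # Class-based UDF: long-lived actors, explicit GPU share and concurrency
            # (both resolved beforehand by calculate_ray_np).
            return ds.map_batches(
                op.__class__,                    # stand-in for the adapter class used in the real code
                fn_constructor_kwargs=op.op_kwargs,
                batch_format="pyarrow",
                num_cpus=op.cpu_required,
                num_gpus=op.gpu_required,
                concurrency=op.num_proc,
            )
        # Function-based UDF: num_proc may be None here, so Ray's autoscaler decides.
        return ds.map_batches(
            op.process_batch,                    # stand-in for the real batch function
            batch_format="pyarrow",
            num_cpus=op.cpu_required,
            concurrency=op.num_proc,
        )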

data_juicer/ops/base_op.py

Lines changed: 15 additions & 6 deletions
@@ -8,6 +8,7 @@
 from data_juicer.utils.mm_utils import SpecialTokens, size_to_bytes
 from data_juicer.utils.model_utils import free_models
 from data_juicer.utils.process_utils import calculate_np
+from data_juicer.utils.ray_utils import is_ray_mode
 from data_juicer.utils.registry import Registry
 from data_juicer.utils.resource_utils import is_cuda_available
 
@@ -191,10 +192,10 @@ def __init__(self, *args, **kwargs):
         self.accelerator = self._accelerator
 
         # parameters to determine the number of procs for this op
-        self.num_proc = kwargs.get("num_proc", None)
-        self.cpu_required = kwargs.get("cpu_required", 1)
-        self.gpu_required = kwargs.get("gpu_required", 0)
-        self.mem_required = kwargs.get("mem_required", 0)
+        self.num_proc = kwargs.get("num_proc", -1)  # -1 means automatic calculation of concurrency
+        self.cpu_required = kwargs.get("cpu_required", None)
+        self.gpu_required = kwargs.get("gpu_required", None)
+        self.mem_required = kwargs.get("mem_required", None)
         if isinstance(self.mem_required, str):
             self.mem_required = size_to_bytes(self.mem_required) / 1024**3
 
@@ -215,6 +216,12 @@ def __init__(self, *args, **kwargs):
             method = wrap_func_with_nested_access(method)
             setattr(self, name, method)
 
+    def use_auto_proc(self):
+        if is_ray_mode() and not self.use_cuda():  # ray task
+            return self.num_proc == -1
+        else:
+            return not self.num_proc or self.num_proc == -1
+
     def is_batched_op(self):
         return self._batched_op
 
@@ -228,8 +235,10 @@ def runtime_np(self):
         # Local import to avoid logger being serialized in multiprocessing
         from loguru import logger
 
-        op_proc = calculate_np(self._name, self.mem_required, self.cpu_required, self.use_cuda(), self.gpu_required)
-        if self.num_proc is not None:
+        op_proc = calculate_np(
+            self._name, self.mem_required, self.cpu_required or 1, self.use_cuda(), self.gpu_required
+        )
+        if not self.use_auto_proc():
             op_proc = min(op_proc, self.num_proc)
         logger.debug(f"Op [{self._name}] running with number of procs:{op_proc}")
         return op_proc
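The new -1 sentinel and use_auto_proc() draw the line between "compute num_proc automatically" and "the user pinned it". For a CPU operator in Ray mode, only -1 requests the automatic calculation; an explicit None is preserved so Ray Data's autoscaler takes over (concurrency=None above). Everywhere else, an unset, zero, or -1 value means auto. A standalone restatement of the same logic:

    # Standalone restatement of the use_auto_proc() decision above (illustrative only).
    def use_auto_proc(num_proc, ray_mode: bool, uses_cuda: bool) -> bool:
        if ray_mode and not uses_cuda:
            # Ray CPU op ("ray task"): only the -1 sentinel requests auto calculation;
            # None is kept as-is so Ray Data's autoscaler can decide downstream.
            return num_proc == -1
        # Standalone mode, or GPU op in Ray mode: unset, 0, or -1 all mean "auto".
        return not num_proc or num_proc == -1

    assert use_auto_proc(-1, ray_mode=True, uses_cuda=False) is True
    assert use_auto_proc(None, ray_mode=True, uses_cuda=False) is False
    assert use_auto_proc(None, ray_mode=False, uses_cuda=False) is True
    assert use_auto_proc(4, ray_mode=True, uses_cuda=True) is False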
