
Commit 7f9b7d8

Merge remote-tracking branch 'origin/main' into dj_agents
2 parents: 2ab9c61 + c07c2d7


43 files changed (+1580, -151 lines)

README.md
Lines changed: 1 addition & 1 deletion

@@ -107,7 +107,7 @@ Besides, our paper is also updated to [v3](https://arxiv.org/abs/2309.02033).
 through the [sandbox laboratory](docs/Sandbox.md), and providing features such as feedback loops and visualization, so that you can better understand and improve your data and models. Many effect-proven datasets and models have been derived from DJ, in scenarios such as pre-training, text-to-video and image-to-text generation.
 ![Data-in-the-loop](https://img.alicdn.com/imgextra/i2/O1CN017U7Zz31Y7XtCJ5GOz_!!6000000003012-0-tps-3640-1567.jpg)
 
-## Doucmentation
+## Documentation
 
 - Tutorial
 - [DJ-Cookbook](docs/tutorial/DJ-Cookbook.md)

configs/data_juicer_recipes/sandbox/auto_prompt_optimization/sandbox_auto_prompt_optimization.yaml
Lines changed: 3 additions & 2 deletions

@@ -10,7 +10,7 @@ resume: true # allo
 # iteration related parameters
 max_iter_num: 5
 iter_targets:
-  - "grader_model_prompt_optimization.grades_evaluation.min_mse <= 0.26"
+  - "grader_model_prompt_optimization.grades_evaluation.min_mse <= 0.2"
 iter_updater:
   select_and_merge_data_pools.merge_single_prompt_data_pools.merged_top_prompt_dataset: grader_model_prompt_optimization.generate_new_prompts.dj_configs.dataset_path
 
@@ -53,11 +53,12 @@ pipelines:
       type: 'api'
       model: 'qwen2.5-32b-instruct'
      max_retry_num: 5
-      build_messages_func: 'build_messages'
+      build_messages_func: 'build_messages_for_math_qa'
       parse_output_func: 'parse_output'
       func_kwargs:
         system_key: "prompt"
         query_key: "question"
+        response_key: "answer"
       # data related
       dataset_path: '<replaced_by_the_input>'
       export_path: './outputs/auto-prompt-optimization-for-grader-model/infer_results/'
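Each iter_targets entry is a threshold expression over a dotted path into the pipeline results. Below is a minimal sketch of how such a string could be evaluated, assuming results live in a nested dict; the parser, the OPS table, and the target_met name are illustrative, not Data-Juicer's actual implementation:

import operator

# Comparison operators; two-char symbols first so "<=" is not split as "<".
OPS = {"<=": operator.le, ">=": operator.ge, "<": operator.lt, ">": operator.gt}

def target_met(target: str, results: dict) -> bool:
    """Evaluate e.g. 'a.b.min_mse <= 0.2' against nested results."""
    for sym, fn in OPS.items():
        if sym in target:
            path, threshold = target.split(sym)
            value = results
            for key in path.strip().split("."):  # walk the dotted path
                value = value[key]
            return fn(float(value), float(threshold))
    raise ValueError(f"unsupported target: {target}")

# With min_mse = 0.18, the tightened 0.2 target would be satisfied:
results = {"grader_model_prompt_optimization": {"grades_evaluation": {"min_mse": 0.18}}}
assert target_met("grader_model_prompt_optimization.grades_evaluation.min_mse <= 0.2", results)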

data_juicer/config/config.py
Lines changed: 3 additions & 1 deletion

@@ -28,6 +28,7 @@
 from data_juicer.utils.constant import RAY_JOB_ENV_VAR
 from data_juicer.utils.logger_utils import setup_logger
 from data_juicer.utils.mm_utils import SpecialTokens
+from data_juicer.utils.ray_utils import is_ray_mode
 
 global_cfg = None
 global_parser = None
@@ -749,7 +750,6 @@ def init_setup_from_cfg(cfg: Namespace, load_configs_only=False):
         "audio_key": cfg.get("audio_key", "audios"),
         "video_key": cfg.get("video_key", "videos"),
         "image_bytes_key": cfg.get("image_bytes_key", "image_bytes"),
-        "num_proc": cfg.get("np", None),
         "turbo": cfg.get("turbo", False),
         "skip_op_error": cfg.get("skip_op_error", True),
         "work_dir": cfg.work_dir,
@@ -758,6 +758,8 @@
         "video_special_token": cfg.get("video_special_token", SpecialTokens.video),
         "eoc_special_token": cfg.get("eoc_special_token", SpecialTokens.eoc),
     }
+    if not is_ray_mode():
+        op_attrs.update({"num_proc": cfg.get("np", None)})
    cfg.process = update_op_attr(cfg.process, op_attrs)
 
     return cfg
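The new guard keeps num_proc out of op_attrs when running on Ray, so the Ray-side concurrency logic (see ray_dataset.py below) takes over. How is_ray_mode() decides is not shown in this commit; a plausible sketch, assuming it keys off the RAY_JOB_ENV_VAR environment variable imported at the top of this file (the real implementation in data_juicer/utils/ray_utils.py may differ):

import os

from data_juicer.utils.constant import RAY_JOB_ENV_VAR

def is_ray_mode() -> bool:
    # Assumption: running under Ray is signalled by an environment
    # variable set when the job is launched in ray/distributed mode.
    return os.environ.get(RAY_JOB_ENV_VAR, "").lower() in ("1", "true")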

data_juicer/core/data/ray_dataset.py
Lines changed: 11 additions & 28 deletions

@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import os
-import sys
 from functools import partial
 from typing import Any, Dict, List, Literal, Optional, Union
 
@@ -16,7 +15,6 @@
 from data_juicer.utils.constant import Fields
 from data_juicer.utils.file_utils import is_remote_path
 from data_juicer.utils.lazy_loader import LazyLoader
-from data_juicer.utils.process_utils import calculate_np
 from data_juicer.utils.resource_utils import cuda_device_count
 from data_juicer.utils.webdataset_utils import _custom_default_decoder
 
@@ -148,27 +146,16 @@ def process(self, operators, *, exporter=None, checkpointer=None, tracer=None) -
             return self
         if not isinstance(operators, list):
             operators = [operators]
+
+        from data_juicer.utils.process_utils import calculate_ray_np
+
+        calculate_ray_np(operators)
+
         for op in operators:
             self._run_single_op(op)
         return self
 
     def _run_single_op(self, op):
-        # TODO: optimize auto proc
-        auto_parallel = False
-        if op.num_proc:
-            op_proc = op.num_proc
-        else:
-            auto_parallel = True
-            op_proc = sys.maxsize
-        auto_op_proc = calculate_np(op._name, op.mem_required, op.cpu_required, op.use_cuda(), op.gpu_required)
-        op_proc = min(op_proc, auto_op_proc)
-
-        # use ray default parallelism in cpu mode if op.num_proc is not specified
-        if op.use_cuda() or not auto_parallel:
-            logger.info(f"Op [{op._name}] running with number of procs:{op_proc}")
-
-        num_gpus = op.gpu_required if op.gpu_required else get_num_gpus(op, op_proc)
-
         if op._name in TAGGING_OPS.modules and Fields.meta not in self.data.columns():
 
             def process_batch_arrow(table: pyarrow.Table):
@@ -193,8 +180,8 @@ def process_batch_arrow(table: pyarrow.Table):
                     fn_constructor_kwargs=op_kwargs,
                     batch_size=batch_size,
                     num_cpus=op.cpu_required,
-                    num_gpus=num_gpus,
-                    concurrency=op_proc,
+                    num_gpus=op.gpu_required,
+                    concurrency=op.num_proc,
                     batch_format="pyarrow",
                 )
             else:
@@ -203,9 +190,7 @@ def process_batch_arrow(table: pyarrow.Table):
                     batch_size=batch_size,
                     batch_format="pyarrow",
                     num_cpus=op.cpu_required,
-                    concurrency=(
-                        None if auto_parallel else op_proc
-                    ),  # use ray default parallelism in cpu mode if num_proc is not specified
+                    concurrency=op.num_proc,
                 )
         elif isinstance(op, Filter):
             columns = self.data.columns()
@@ -229,8 +214,8 @@ def process_batch_arrow(table: pyarrow.Table):
                     fn_constructor_kwargs=op_kwargs,
                     batch_size=batch_size,
                     num_cpus=op.cpu_required,
-                    num_gpus=num_gpus,
-                    concurrency=op_proc,
+                    num_gpus=op.gpu_required,
+                    concurrency=op.num_proc,
                     batch_format="pyarrow",
                 )
             else:
@@ -239,9 +224,7 @@ def process_batch_arrow(table: pyarrow.Table):
                     batch_size=batch_size,
                     batch_format="pyarrow",
                     num_cpus=op.cpu_required,
-                    concurrency=(
-                        None if auto_parallel else op_proc
-                    ),  # use ray default parallelism in cpu mode if num_proc is not specified
+                    concurrency=op.num_proc,
                 )
             if op.stats_export_path is not None:
                 self.data.write_json(op.stats_export_path, force_ascii=False)
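Since _run_single_op now hands op.num_proc, op.cpu_required, and op.gpu_required to Ray verbatim, calculate_ray_np(operators) must resolve any auto settings up front. The real function lives in data_juicer/utils/process_utils.py and is not part of this diff; the sketch below only illustrates that contract, and its resource math is purely an assumption:

import ray

def calculate_ray_np(operators):
    """Illustrative sketch: resolve auto resource settings in place
    so _run_single_op can pass them straight through to Ray."""
    cluster = ray.cluster_resources()
    total_gpus = int(cluster.get("GPU", 0))
    for op in operators:
        if op.use_cuda():
            # GPU ops need a concrete per-actor GPU share and concurrency.
            op.gpu_required = op.gpu_required or 1
            if op.use_auto_proc():
                op.num_proc = max(1, total_gpus // op.gpu_required)
        elif op.use_auto_proc():
            # CPU ops: concurrency=None lets Ray pick its default parallelism,
            # matching the removed "use ray default parallelism" comment.
            op.num_proc = None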

data_juicer/core/sandbox/helper_funcs.py
Lines changed: 20 additions & 0 deletions

@@ -27,3 +27,23 @@ def parse_output(output: str, item: dict, **kwargs):
     A simple implementation.
     """
     return output
+
+
+# Math QA grader
+@ALL_FUNCS.register_module("build_messages_for_math_qa")
+def build_messages_for_math_qa(item: dict, **kwargs):
+    """
+    Build message for math QA grader.
+    """
+    system_key = kwargs.get("system_key", "system")
+    query_key = kwargs.get("query_key", "query")
+    response_key = kwargs.get("response_key", "response")
+
+    system_prompt = item.get(system_key, "")
+    question = item[query_key]
+    answer = item[response_key]
+    messages = []
+    if system_prompt:
+        messages.append({"role": "system", "content": system_prompt})
+    messages.append({"role": "user", "content": f"Question: {question}\nAnswer: {answer}"})
+    return messages
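With the func_kwargs from the sandbox YAML above (system_key: "prompt", query_key: "question", response_key: "answer"), the builder yields a system message followed by a combined question/answer user turn. A quick usage check, with a sample item invented for illustration:

from data_juicer.core.sandbox.helper_funcs import build_messages_for_math_qa

item = {
    "prompt": "You are a strict math grader.",  # invented sample
    "question": "What is 2 + 2?",
    "answer": "4",
}
messages = build_messages_for_math_qa(
    item, system_key="prompt", query_key="question", response_key="answer"
)
assert messages == [
    {"role": "system", "content": "You are a strict math grader."},
    {"role": "user", "content": "Question: What is 2 + 2?\nAnswer: 4"},
]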

data_juicer/ops/base_op.py
Lines changed: 15 additions & 6 deletions

@@ -8,6 +8,7 @@
 from data_juicer.utils.mm_utils import SpecialTokens, size_to_bytes
 from data_juicer.utils.model_utils import free_models
 from data_juicer.utils.process_utils import calculate_np
+from data_juicer.utils.ray_utils import is_ray_mode
 from data_juicer.utils.registry import Registry
 from data_juicer.utils.resource_utils import is_cuda_available
 
@@ -191,10 +192,10 @@ def __init__(self, *args, **kwargs):
         self.accelerator = self._accelerator
 
         # parameters to determine the number of procs for this op
-        self.num_proc = kwargs.get("num_proc", None)
-        self.cpu_required = kwargs.get("cpu_required", 1)
-        self.gpu_required = kwargs.get("gpu_required", 0)
-        self.mem_required = kwargs.get("mem_required", 0)
+        self.num_proc = kwargs.get("num_proc", -1)  # -1 means automatic calculation of concurrency
+        self.cpu_required = kwargs.get("cpu_required", None)
+        self.gpu_required = kwargs.get("gpu_required", None)
+        self.mem_required = kwargs.get("mem_required", None)
         if isinstance(self.mem_required, str):
             self.mem_required = size_to_bytes(self.mem_required) / 1024**3
 
@@ -215,6 +216,12 @@ def __init__(self, *args, **kwargs):
             method = wrap_func_with_nested_access(method)
             setattr(self, name, method)
 
+    def use_auto_proc(self):
+        if is_ray_mode() and not self.use_cuda():  # ray task
+            return self.num_proc == -1
+        else:
+            return not self.num_proc or self.num_proc == -1
+
     def is_batched_op(self):
         return self._batched_op
 
@@ -228,8 +235,10 @@ def runtime_np(self):
         # Local import to avoid logger being serialized in multiprocessing
         from loguru import logger
 
-        op_proc = calculate_np(self._name, self.mem_required, self.cpu_required, self.use_cuda(), self.gpu_required)
-        if self.num_proc is not None:
+        op_proc = calculate_np(
+            self._name, self.mem_required, self.cpu_required or 1, self.use_cuda(), self.gpu_required
+        )
+        if not self.use_auto_proc():
             op_proc = min(op_proc, self.num_proc)
         logger.debug(f"Op [{self._name}] running with number of procs:{op_proc}")
         return op_proc
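The -1 sentinel separates "decide automatically" from an explicit cap: in standalone mode any falsy or -1 num_proc triggers auto calculation, while for non-CUDA ops in Ray mode only -1 does, so an explicit None can still fall through to Ray's default parallelism. A small illustration, assuming a concrete op such as CleanHtmlMapper forwards its kwargs to this base __init__:

from data_juicer.ops.mapper import CleanHtmlMapper

op_auto = CleanHtmlMapper()              # num_proc defaults to -1 -> auto
assert op_auto.use_auto_proc()

op_capped = CleanHtmlMapper(num_proc=8)  # explicit cap
assert not op_capped.use_auto_proc()
# runtime_np() then returns min(calculate_np(...), 8)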

data_juicer/ops/filter/flagged_words_filter.py
Lines changed: 3 additions & 0 deletions

@@ -1,5 +1,8 @@
 # Some code here has been modified from:
 # https://huggingface.co/spaces/huggingface/text-data-filtering
+#
+# The flagged words list comes from https://huggingface.co/spaces/huggingface/text-data-filtering
+# and https://github.com/LDNOOBW/List-of-Dirty-Naughty-Obscene-and-Otherwise-Bad-Words
 # --------------------------------------------------------
 
 from typing import List

data_juicer/ops/mapper/__init__.py
Lines changed: 6 additions & 0 deletions

@@ -12,6 +12,9 @@
 from .clean_html_mapper import CleanHtmlMapper
 from .clean_ip_mapper import CleanIpMapper
 from .clean_links_mapper import CleanLinksMapper
+from .detect_character_attributes_mapper import DetectCharacterAttributesMapper
+from .detect_character_locations_mapper import DetectCharacterLocationsMapper
+from .detect_main_character_mapper import DetectMainCharacterMapper
 from .dialog_intent_detection_mapper import DialogIntentDetectionMapper
 from .dialog_sentiment_detection_mapper import DialogSentimentDetectionMapper
 from .dialog_sentiment_intensity_mapper import DialogSentimentIntensityMapper
@@ -101,6 +104,9 @@
     "CleanHtmlMapper",
     "CleanIpMapper",
     "CleanLinksMapper",
+    "DetectCharacterAttributesMapper",
+    "DetectCharacterLocationsMapper",
+    "DetectMainCharacterMapper",
     "DialogIntentDetectionMapper",
     "DialogSentimentDetectionMapper",
     "DialogSentimentIntensityMapper",
