Changes from all commits (25 commits)

- 981efcd: fc (praateekmahajan, Aug 28, 2025)
- fb1a628: some more changes (praateekmahajan, Aug 28, 2025)
- 39c4fe9: pr review (praateekmahajan, Aug 28, 2025)
- 663d74c: Merge branch 'main' into praateek/add-removal-workflow (praateekmahajan, Aug 28, 2025)
- 13addb6: fc (praateekmahajan, Aug 28, 2025)
- 006a023: Merge branch 'main' into praateek/add-removal-workflow (praateekmahajan, Aug 28, 2025)
- c899f2e: typo missed (praateekmahajan, Aug 28, 2025)
- be63848: Huvu/image dedup removal (#951) (huvunvidia, Aug 28, 2025)
- e568ca2: Remove outdated Dask files and organize new Ray files (#971) (sarahyurick, Aug 28, 2025)
- a8cd862: Merge branch 'main' into praateek/add-removal-workflow (praateekmahajan, Aug 29, 2025)
- 9482b21: delete base (praateekmahajan, Aug 29, 2025)
- 1d16cd4: .. (praateekmahajan, Aug 29, 2025)
- 0e7db8a: fc (praateekmahajan, Aug 29, 2025)
- 3976ddb: use nc (praateekmahajan, Aug 29, 2025)
- 0e1b44c: ... (praateekmahajan, Aug 29, 2025)
- 797dc41: no need for path in kill id gen (praateekmahajan, Aug 29, 2025)
- d4d9dc7: move to finally (praateekmahajan, Aug 29, 2025)
- 18fd779: Merge branch 'praateek/add-removal-workflow' into praateek/nc-benchma… (praateekmahajan, Aug 29, 2025)
- 7ac071f: .. (praateekmahajan, Sep 2, 2025)
- 14e39a6: Merge remote-tracking branch 'upstream/main' into praateek/nc-benchma… (praateekmahajan, Sep 2, 2025)
- 1bfeff8: fc (praateekmahajan, Sep 2, 2025)
- 1102553: Merge branch 'main' into praateek/nc-benchmarking-scripts (praateekmahajan, Sep 3, 2025)
- 195b5f0: common crawl (praateekmahajan, Sep 4, 2025)
- 176119d: Merge branch 'praateek/nc-benchmarking-scripts' of github.com:praatee… (praateekmahajan, Sep 4, 2025)
- 511245e: Merge remote-tracking branch 'upstream/main' into praateek/nc-benchma… (praateekmahajan, Sep 4, 2025)
5 changes: 5 additions & 0 deletions nemo_curator/stages/text/download/base/download.py
```diff
@@ -174,3 +174,8 @@ def xenna_stage_spec(self) -> dict[str, Any]:
         return {
             "num_workers_per_node": self.downloader.num_workers_per_node(),
         }
+
+    def ray_stage_spec(self) -> dict[str, Any]:
+        return {
+            "is_actor_stage": os.environ.get("CAST_AS_ACTOR", "false").lower() == "true",
+        }
```

6 changes: 6 additions & 0 deletions nemo_curator/stages/text/download/base/extract.py
```diff
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from typing import Any
@@ -109,3 +110,8 @@ def process(self, task: DocumentBatch) -> DocumentBatch:
             },
             _stage_perf=task._stage_perf,
         )
+
+    def ray_stage_spec(self) -> dict[str, Any]:
+        return {
+            "is_actor_stage": os.environ.get("CAST_AS_ACTOR", "false").lower() == "true",
+        }
```

5 changes: 5 additions & 0 deletions nemo_curator/stages/text/download/base/iterator.py
```diff
@@ -110,3 +110,8 @@ def process(self, task: FileGroupTask) -> DocumentBatch:
             },
             _stage_perf=task._stage_perf,
         )
+
+    def ray_stage_spec(self) -> dict[str, Any]:
+        return {
+            "is_actor_stage": os.environ.get("CAST_AS_ACTOR", "false").lower() == "true",
+        }
```

2 changes: 2 additions & 0 deletions nemo_curator/stages/text/download/base/url_generation.py
```diff
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from typing import Any
@@ -80,4 +81,5 @@ def process(self, task: _EmptyTask) -> list[FileGroupTask]:
     def ray_stage_spec(self) -> dict[str, Any]:
         return {
             "is_fanout_stage": True,
+            "is_actor_stage": os.environ.get("CAST_AS_ACTOR", "false").lower() == "true",
         }
```
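
All four stages gate Ray-actor placement behind the same `CAST_AS_ACTOR` environment variable. A minimal sketch of how the flag resolves (the variable name and default come from the diff; the surrounding code is illustrative):

```python
import os

def is_actor_stage() -> bool:
    # Same check the diff adds to each stage: only the case-insensitive
    # string "true" opts the stage into running as a Ray actor.
    return os.environ.get("CAST_AS_ACTOR", "false").lower() == "true"

os.environ["CAST_AS_ACTOR"] = "TRUE"
print(is_actor_stage())  # True: the comparison is case-insensitive

os.environ["CAST_AS_ACTOR"] = "1"
print(is_actor_stage())  # False: values like "1" or "yes" do not count
```
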
146 changes: 146 additions & 0 deletions nightly_benchmarking/README.md
## NeMo-Curator Nightly Benchmarking

Run a matrix of benchmark scripts with per-entry Ray isolation, timeouts, environment capture, and pluggable metric sinks.

### Quick start

```bash
# Minimal
python -m nightly_benchmarking.run \
  --matrix nightly_benchmarking/matrix.yaml \
  --datasets nightly_benchmarking/dataset_paths.json

# With MLflow and a custom session name
MLFLOW_TRACKING_URI=http://your-mlflow:8265 \
python -m nightly_benchmarking.run \
  --matrix nightly_benchmarking/matrix.yaml \
  --datasets nightly_benchmarking/dataset_paths.json \
  --sink mlflow \
  --session-name dedup_removal
```

### How it works

- **Isolated runs**: For each matrix entry the driver starts a fresh Ray head, runs the command, then stops Ray (sketched after this list).
- **Placeholders**: Command args can reference `{dataset:name,format}` and `{session}/...` paths.
- **Environment capture**: Saves `pip-freeze.txt`, `conda-explicit.txt` (if available), and `sys-env.json` per entry.
- **Results**: Scripts write standardized results; the driver aggregates and logs params/metrics to the chosen sink.
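
A rough sketch of the per-entry isolation loop described above (`ray start --head` and `ray stop` are the real CLI commands; the surrounding driver logic is illustrative, not the actual implementation):

```python
import shlex
import subprocess

def run_entry(command: str, timeout_s: int) -> int:
    """Run one matrix entry against a fresh local Ray head."""
    subprocess.run(["ray", "start", "--head"], check=True)
    try:
        # The benchmark script connects to the running head and does the work.
        return subprocess.run(shlex.split(command), timeout=timeout_s).returncode
    except subprocess.TimeoutExpired:
        return 1  # treat a timeout as a failed entry
    finally:
        # Always tear the head down so the next entry starts clean.
        subprocess.run(["ray", "stop"], check=False)
```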

### Matrix YAML

Required top-level fields and supported options:

```yaml
results_dir: /path/to/results
default_timeout_s: 7200   # Optional; per-entry timeout overrides this
delete_scratch: true      # Optional; auto-delete {session}/scratch after run

# Optional sinks (passed through to the sink factory)
mlflow:
  tracking_uri: ${MLFLOW_TRACKING_URI}
  experiment: ray-curator-xyz
wandb: {}
slack: {}

entries:
  - name: xyz_id_xenna
    script: some_benchmark.py                       # Resolved under nightly_benchmarking/scripts by default
    script_base_dir: nightly_benchmarking/scripts   # Optional override
    args: >-
      --input-path {dataset:dataset_name,jsonl}
      --output-path {session}/scratch
      --executor xenna
    timeout_s: 1800        # Optional per-entry timeout
    delete_scratch: true   # Optional per-entry override
    ray:
      num_cpus: 128
      num_gpus: 0
      enable_object_spilling: false
```

Notes:
- `${ENV_VAR}` in the YAML is resolved at load time and must be set; otherwise YAML loading fails (see the sketch after these notes).
- Only `script` is supported (no custom `cmd` field). The driver appends `--benchmark-results-path {session}/benchmark_results` to the command.
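
A minimal sketch of the `${ENV_VAR}` expansion rule above (illustrative; the actual loader may differ in details):

```python
import os
import re

def expand_env(text: str) -> str:
    """Replace every ${VAR} with os.environ["VAR"], failing fast if unset."""
    def repl(match: re.Match) -> str:
        name = match.group(1)
        if name not in os.environ:
            raise KeyError(f"environment variable {name} is not set")
        return os.environ[name]
    return re.sub(r"\$\{(\w+)\}", repl, text)

os.environ["MLFLOW_TRACKING_URI"] = "http://localhost:5000"
print(expand_env("tracking_uri: ${MLFLOW_TRACKING_URI}"))
# tracking_uri: http://localhost:5000
```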

### Placeholders and datasets

- **`{dataset:name,format}`**: Resolved via `dataset_paths.json`.
- **`{session}/...`**: Expanded to the entry directory path (a resolver sketch follows the example below).

Example `dataset_paths.json`:

```json
{
  "dataset_name": {
    "jsonl": "/path/to/dataset_json/",
    "parquet": "/path/to/dataset_parquet/"
  }
}
```
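
To tie the two placeholder forms to the example file above, here is a hypothetical resolver (the function name and entry-directory argument are assumptions, not the driver's API):

```python
import json
import re
from pathlib import Path

def resolve_placeholders(args: str, datasets_file: str, entry_dir: str) -> str:
    datasets = json.loads(Path(datasets_file).read_text())

    # {dataset:name,format} -> path looked up in dataset_paths.json
    def dataset_repl(match: re.Match) -> str:
        name, fmt = match.group(1), match.group(2)
        return datasets[name][fmt]

    args = re.sub(r"\{dataset:([\w-]+),(\w+)\}", dataset_repl, args)
    # {session} -> the per-entry directory
    return args.replace("{session}", entry_dir)

print(resolve_placeholders(
    "--input-path {dataset:dataset_name,jsonl} --output-path {session}/scratch",
    "dataset_paths.json",
    "/results/session1/entry1",
))
```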

### Script contract (what your benchmark must do)

- Accept `--benchmark-results-path` (required).
- Write the following files to that directory:
- `params.json`: your input parameters.
- `metrics.json`: include at least `is_success: true|false` plus any metrics.
- `tasks.pkl`: pickled task objects; the driver aggregates per-stage metrics (sum/mean/std). An aggregation sketch follows the skeleton below.
- Exit with code `0` on success, non-zero on failure.

Minimal skeleton:

```python
import argparse
import json
import pickle
from pathlib import Path

def main():
    p = argparse.ArgumentParser()
    p.add_argument("--benchmark-results-path", required=True)
    args = p.parse_args()
    out = Path(args.benchmark_results_path)
    out.mkdir(parents=True, exist_ok=True)
    (out / "params.json").write_text(json.dumps({}))
    (out / "metrics.json").write_text(json.dumps({"is_success": True}))
    with open(out / "tasks.pkl", "wb") as f:
        pickle.dump([], f)
    return 0

if __name__ == "__main__":
    raise SystemExit(main())
```
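
The aggregation side of the contract lives in the driver, not the script; a hedged sketch of sum/mean/std over `tasks.pkl` (the `_stage_perf` record shape is an assumption for illustration):

```python
import pickle
import statistics
from collections import defaultdict

with open("tasks.pkl", "rb") as f:
    tasks = pickle.load(f)

# Assumed shape: each task exposes _stage_perf as (stage_name, seconds) pairs.
per_stage = defaultdict(list)
for task in tasks:
    for stage_name, seconds in getattr(task, "_stage_perf", []):
        per_stage[stage_name].append(seconds)

for stage, values in sorted(per_stage.items()):
    std = statistics.stdev(values) if len(values) > 1 else 0.0
    print(f"{stage}: sum={sum(values):.2f} mean={statistics.mean(values):.2f} std={std:.2f}")
```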

### CLI

```bash
python -m nightly_benchmarking.run \
  --matrix PATH_TO_YAML \
  --datasets PATH_TO_dataset_paths.json \
  [--sink {mlflow,wandb,none}] \
  [--session-name NAME]
```

### Output layout (per entry)

```
<results_dir>/<session_name>/<entry_name>/
├── scratch/              # Temporary workspace (deleted if delete_scratch=true)
├── ray_cluster/          # Ray debug artifacts
├── logs/                 # stdout/stderr and ray startup logs
│   ├── stdout.log
│   ├── stderr.log
│   └── ray_startup.log
├── artifacts/            # Environment snapshots
│   ├── pip-freeze.txt
│   ├── conda-explicit.txt
│   └── sys-env.json
├── benchmark_results/    # Script outputs (required by contract)
│   ├── params.json
│   ├── metrics.json
│   └── tasks.pkl
└── results.json          # Normalized driver result
```

### Troubleshooting

- Set appropriate `timeout_s`/`default_timeout_s` for long runs.
- Ensure every `${ENV_VAR}` referenced in the YAML is exported; otherwise config loading fails.
- If datasets fail to resolve, check names/formats in `dataset_paths.json`.
- Inspect `logs/stderr.log` and `logs/ray_startup.log` for failures.
Empty file.
28 changes: 28 additions & 0 deletions nightly_benchmarking/build_docker.sh
```bash
#!/usr/bin/env bash
set -euo pipefail

# Build the benchmarking image using ray-curator/Dockerfile
# Usage:
#   micromamba run -n ray_curator_2506 bash nightly_benchmarking/build_docker.sh [tag]

REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
TAG="${1:-curator-bench:$(date +%Y%m%d)-$(git -C "$REPO_ROOT" rev-parse --short HEAD)}"

echo "Building Docker image: ${TAG}"
echo "Repo root: ${REPO_ROOT}"

export DOCKER_BUILDKIT=1

micromamba run -n ray_curator_2506 docker build \
  -f "$REPO_ROOT/ray-curator/Dockerfile" \
  -t "$TAG" \
  --build-arg CURATOR_ENV=dev \
  "$REPO_ROOT"

# Print image digest if available
if command -v docker >/dev/null 2>&1; then
  DIGEST=$(micromamba run -n ray_curator_2506 docker inspect --format='{{index .RepoDigests 0}}' "$TAG" || true)
  echo "Built image: $TAG digest: ${DIGEST:-unknown}"
fi

echo "Done."
```

52 changes: 52 additions & 0 deletions nightly_benchmarking/common_crawl_benchmark_matrix.yaml
```yaml
# Matrix configuration for Common Crawl download/extract benchmark

default_timeout_s: 7200
results_dir: /raid/benchmarks/ray-curator/common-crawl-results

# Optional sinks
mlflow:
  tracking_uri: ${MLFLOW_TRACKING_URI}
  experiment: ray-curator-common-crawl

# Whether to delete scratch dirs after each run
delete_scratch: true

entries:
  - name: cc_main_ray_data_cast_as_actor
    script: common_crawl_benchmark.py
    args: >-
      --download_path {session}/scratch/downloads
      --output_path {session}/scratch/output
      --output_format parquet
      --crawl_type main
      --start_snapshot 2023-01
      --end_snapshot 2023-10
      --html_extraction justext
      --url_limit 768
      --add_filename_column
      --executor ray_data
      --ray_data_cast_as_actor
    timeout_s: 30000
    ray:
      num_cpus: 64
      num_gpus: 0
      enable_object_spilling: false

  - name: cc_main_raydata
    script: common_crawl_benchmark.py
    args: >-
      --download_path {session}/scratch/downloads
      --output_path {session}/scratch/output
      --output_format parquet
      --crawl_type main
      --start_snapshot 2023-01
      --end_snapshot 2023-10
      --html_extraction justext
      --url_limit 768
      --add_filename_column
      --executor ray_data
    timeout_s: 20000
    ray:
      num_cpus: 64
      num_gpus: 0
      enable_object_spilling: false
```

14 changes: 14 additions & 0 deletions nightly_benchmarking/dataset_paths.json
```json
{
  "sample": {
    "json": "/tmp/sample_data.json",
    "parquet": "/tmp/sample_data.parquet"
  },
  "tinystories": {
    "json": "/raid/prospector-lm/clean/tinystories_train",
    "parquet": "/raid/prospector-lm/clean/tinystories_train_parquet"
  },
  "cleaned_cc": {
    "jsonl": "/raid/prospector-lm/cleaned_exact_dedup_all_cc/",
    "parquet": "/raid/prospector-lm/cleaned_exact_dedup_all_cc_parquet/"
  }
}
```

6 changes: 6 additions & 0 deletions nightly_benchmarking/dataset_paths.template.json
```json
{
  "tinystories": {
    "json": "/datasets/tinystories_train/",
    "parquet": "/datasets/tinystories_train_parquet/"
  }
}
```

37 changes: 37 additions & 0 deletions nightly_benchmarking/removal_benchmark_matrix.yaml
```yaml
# Matrix configuration for removal benchmark
# Usage: python nightly_benchmarking/bench_driver.py --matrix nightly_benchmarking/removal_benchmark_matrix.yaml --datasets nightly_benchmarking/dataset_paths.json

default_timeout_s: 3600  # 1 hour timeout
results_dir: /raid/benchmarks/ray-curator/removal-results
artifacts_dir: /raid/benchmarks/ray-curator/removal-artifacts

# MLflow configuration
mlflow:
  tracking_uri: ${MLFLOW_TRACKING_URI}
  experiment: ray-curator-removal

# Slack notifications (optional)
slack:
  webhook_env: SLACK_WEBHOOK_URL

entries:
  # Ray Data executor with curator dedup IDs
  - name: removal_curator_dedup_id_ray_data_1fpp_use_file_partitioning_raydata_blocksize_1
    script: removal_benchmark.py
    args: >-
      --input-path {dataset:cleaned_cc,parquet}
      --ids-to-remove-path /raid/praateekm/test_ray_fuzzy/output/FuzzyDuplicateIds/
      --id-generator-path /raid/praateekm/test_ray_fuzzy/output/fuzzy_id_generator.json
      --output-path {session}/scratch
      --executor ray_data
      --input-filetype parquet
      --output-filetype parquet
      --id-field _curator_dedup_id
      --duplicate-id-field _curator_dedup_id
      --files-per-partition 1
      --use-ray-data-settings
    timeout_s: 7200
    ray:
      num_cpus: 64
      num_gpus: 0
      enable_object_spilling: false
```