diff --git a/benchmarking/Dockerfile b/benchmarking/Dockerfile new file mode 100644 index 000000000..d8e521cc9 --- /dev/null +++ b/benchmarking/Dockerfile @@ -0,0 +1,43 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ARG NEMO_CURATOR_IMAGE=nemo_curator +FROM ${NEMO_CURATOR_IMAGE} AS nemo_curator_benchmarking + +# Add system utilities useful for benchmark and debug +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + less \ + openssh-client \ + vim \ + wget \ + && apt-get autoremove -y \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Add dependencies for benchmarking to the Curator Python environment +RUN cd /opt/Curator \ + && uv add \ + GitPython \ + oauth2client \ + pydrive2 \ + pynvml \ + rich \ + && uv cache prune + +# Add the Curator repo to the safe.directory list to avoid GitPython warnings +RUN git config --global --add safe.directory /opt/Curator + + # Set the entrypoint to the main benchmarking runner script +ENTRYPOINT ["python", "/opt/Curator/benchmarking/run.py"] diff --git a/benchmarking/README.md b/benchmarking/README.md new file mode 100644 index 000000000..30c96967c --- /dev/null +++ b/benchmarking/README.md @@ -0,0 +1,581 @@ +# NeMo Curator Benchmarking Framework + +A comprehensive benchmarking framework for measuring and tracking the performance of NeMo Curator. This tool enables developers to ensure quality and performance by running standardized benchmark scripts in reproducible environments. + +## Table of Contents + +- [Quick Start](#quick-start) +- [Concepts](#concepts) +- [Configuration](#configuration) +- [Running benchmarks and using the container](#running-benchmarks-and-using-the-container) +- [Writing Benchmark Scripts](#writing-benchmark-scripts) +- [Sinks: Custom Reporting & Actions](#sinks-custom-reporting--actions) + +--- + +## Quick Start + +**1. Build the Docker image:** + +Assuming the working directory is the NeMo Curator repo root dir: +```bash +./benchmarking/tools/build_docker.sh +``` + +This builds the `curator_benchmarking` image with: +- CUDA support +- Python 3.12 environment +- NeMo Curator from source in repo root dir +- All NeMo Curator dependencies +- Benchmarking framework and scripts + +Note: you may only need to do this periodically when the environment needs to be updated. See the `--use-host-curator` example below. + +**2. Update config:** + +Update `results_path`, `artifacts_path`, and `datasets_path` in the YAML config file based on your preferences. In this example, we'll edit the YAML config `./benchmarking/nightly-benchmark.yaml` + +```yaml +results_path: /path/where/results/are/stored +artifacts_path: /path/where/artifacts/are/stored +datasets_path: /path/to/datasets +``` + +**3. 
Run benchmarks:**
+
+```bash
+./benchmarking/tools/run.sh --config ./benchmarking/nightly-benchmark.yaml
+```
+
+To run using the Curator sources on the host instead of those in the image, pass the `--use-host-curator` option:
+```bash
+./benchmarking/tools/run.sh --config ./benchmarking/nightly-benchmark.yaml --use-host-curator
+```
+This is especially useful during active development and debugging since it avoids a costly rebuild step.
+
+**4. View results:**
+
+Results are written to the `results_path` specified in your configuration, organized by session timestamp.
+
+---
+
+## Concepts
+
+### Session
+
+A **session** represents a single invocation of the benchmarking framework. Each session:
+- Has a unique name with timestamp (e.g., `benchmark-run__2025-01-23__14-30-00`)
+- Contains one or more benchmark entries
+- Produces a session directory with results and artifacts
+- Captures environment metadata (system info, package versions, etc.)
+
+### Scripts
+
+**Benchmark scripts** are Python programs that:
+- Reside in the `scripts/` directory
+- Receive arguments from the framework (paths, parameters, etc.)
+- Execute Curator operations and collect metrics
+- Write standardized output files (params.json, metrics.json, tasks.pkl)
+- Can be run standalone outside of the benchmark framework to debug problems, perform useful work, or serve as examples.
+- Can be written by users to benchmark specific use cases.
+- Are referenced in the YAML configuration as "entries" to be included in benchmark runs with specific options.
+
+See [Writing Benchmark Scripts](#writing-benchmark-scripts) for details.
+
+### Entry
+
+An **entry** is a single benchmark run within a session. Each entry:
+- Runs a specific benchmark script with defined arguments
+- Has its own timeout, Ray configuration, sink configuration, and pass/fail requirements, or can inherit from session-wide defaults
+- Produces metrics, parameters, and run performance data
+- Can reference datasets using template syntax
+- Can pass additional data to sinks to support customized operations unique to the entry. For example, the `slack_sink` can accept additional metrics to report for an entry that other entries may not have.
+- Can specify requirements that must be met in order to return a passing status. For example, an entry can require that a specific throughput metric meet or exceed a minimum value.
+
+### Sinks
+
+**Sinks** are pluggable modules that are called by the framework at various stages to allow for custom processing of benchmark data:
+- Initialize at session start
+- Process each entry's individual benchmark results
+- Finalize at session end
+
+Built-in sinks include:
+- **Slack**: Post results to Slack channels
+- **Google Drive**: Upload results to cloud storage (extensible)
+- **MLflow**: Track experiments and metrics
+
+See [Sinks: Custom Reporting & Actions](#sinks-custom-reporting--actions) for details.
+
+## Configuration
+
+### YAML Configuration Files
+
+The framework uses one or more YAML files to configure benchmark sessions. Multiple configuration files are merged, allowing separation of concerns (e.g., machine-specific paths vs. benchmark definitions).
+
+A useful pattern is to keep configuration that does not typically change in one or more files and user- or machine-specific configuration in others.
For example, `my_paths_and_reports.yaml` could have results / artifacts / datasets paths and personal sink settings (individual slack channel, etc.), and `release-benchmarks.yaml` could have the team-wide configuration containing the individual benchmark entries and performance requirements. + +This can be especially useful during development. During development you'll not only want to use your own paths and report settings, you'll also want to use the standard benchmarking environment (i.e. a container), but cannot afford to rebuild the Docker image for each code change you're evaluating. The `--use-host-curator` flag is intended for this case. This flag will use your Curator source dir on host inside the container via a volume mount (this works because the container has curator installed in editable mode), and no image rebuild step is needed. + +An example of a development scenario using this pattern looks like this: +```bash +./benchmarking/tools/run.sh --use-host-curator --config ~/curator_benchmarking/my_paths_and_reports.yaml --config ./benchmarking/release-benchmarks.yaml +``` + +### Configuration Structure + +```yaml +# Required: Base paths for results, artifacts, and datasets +# These paths must exist on the host machine +# When running in Docker with tools/run.sh, paths are automatically mapped to container volumes +# These base paths can be referenced in other configuration values using {results_path}, {artifacts_path}, {datasets_path} +# NOTE: the current version of the framework does not use artifacts_path +results_path: /path/to/results +artifacts_path: /path/to/artifacts +datasets_path: /path/to/datasets + +# Optional: Global timeout for all entries (seconds) +default_timeout_s: 7200 + +# Optional: Delete scratch directories after each entry completes +# The path {session_entry_dir}/scratch is automatically created when an entry starts and can be used by benchmark +#scripts for writing temp files. This directory is automatically cleaned up on completion of the entry if +# delete_scratch is true. 
+delete_scratch: true + +# Optional: Configure sinks for result processing +sinks: + - name: mlflow + enabled: true + tracking_uri: ${MLFLOW_TRACKING_URI} + experiment: my-experiment + - name: slack + enabled: true + webhook_url: ${SLACK_WEBHOOK_URL} + default_metrics: ["exec_time_s"] # Metrics to report by default for all entries + - name: gdrive + enabled: false + drive_folder_id: ${GDRIVE_FOLDER_ID} + service_account_file: ${GDRIVE_SERVICE_ACCOUNT_FILE} + +# Optional: Define datasets for template substitution +datasets: + - name: common_crawl + formats: + - type: json + path: "{datasets_path}/cc_sample" # Can reference base paths + - type: parquet + path: "{datasets_path}/cc_sample" + +# Required: List of benchmark entries to run +entries: + - name: my_benchmark + enabled: true # Optional: Whether to run this entry (default: true) + script: my_script.py + args: >- + --input {dataset:common_crawl,parquet} + --output {session_entry_dir}/output + timeout_s: 1800 # Optional: Override global timeout + + # Optional: Per-entry sink configuration + sink_data: + - name: slack + additional_metrics: ["throughput_docs_per_sec", "num_documents_processed"] + + # Optional: Ray configuration for this entry + ray: + num_cpus: 32 + num_gpus: 1 + enable_object_spilling: false + + # Optional: Requirements for the benchmark to pass + requirements: + - metric: throughput_docs_per_sec + min_value: 100 + + # Optional: Override global delete_scratch setting + delete_scratch: false +``` + +### Passing Configuration Files + +**Multiple config files:** + +```bash +python benchmarking/run.py \ + --config config.yaml \ + --config paths.yaml \ + --config machine_specific.yaml +``` + +Files are merged in order. Later files override earlier ones for conflicting keys. + +**Session naming:** + +```bash +python benchmarking/run.py \ + --config config.yaml \ + --session-name my-experiment-v2 +``` + +### Environment Variables + +Configuration values can reference environment variables using `${VAR_NAME}` syntax: + +```yaml +results_path: "${HOME}/benchmarks/results" +sinks: + - name: slack + webhook_url: ${SLACK_WEBHOOK_URL} + - name: mlflow + tracking_uri: ${MLFLOW_TRACKING_URI} +``` + +### Template Substitution and Path Resolution + +The framework supports several types of placeholders in configuration values: + +**Base path references** - Reference the configured base paths: + +```yaml +datasets: + - name: my_dataset + formats: + - type: parquet + path: "{datasets_path}/subdir/data.parquet" +``` + +Available base path placeholders: +- `{results_path}` - Resolves to the configured `results_path` +- `{artifacts_path}` - Resolves to the configured `artifacts_path` *Note: unused in current version of the framework* +- `{datasets_path}` - Resolves to the configured `datasets_path` + +**Dataset references** - Reference datasets in entry arguments: + +```yaml +args: --input {dataset:common_crawl,parquet} +``` + +Resolves to the path defined in the `datasets` section for that dataset and format. + +**Session entry directory** - Reference the entry's runtime directory: + +```yaml +args: --output {session_entry_dir}/results +``` + +Resolves to the entry's unique directory within the session (e.g., `/results/session-name__timestamp/entry-name/results`). + +### Entry Configuration Details + +**enabled**: Controls whether an entry is run (default: `true`). Useful for temporarily disabling entries without removing them from the configuration. + +**sink_data**: Provides entry-specific configuration for sinks. 
For example, the Slack sink can accept `additional_metrics` to report metrics beyond the default set: + +```yaml +sink_data: + - name: slack + additional_metrics: ["num_documents_processed", "throughput_docs_per_sec"] +``` + +**requirements**: Defines pass/fail criteria for the benchmark. If any requirement is not met, the entry is marked as failed: + +```yaml +requirements: + - metric: throughput_docs_per_sec + min_value: 100 + - metric: peak_memory_gb + max_value: 64 +``` + +**ray**: Configures Ray resources for the entry: + +```yaml +ray: + num_cpus: 64 + num_gpus: 4 + enable_object_spilling: false # Disable object spilling to local disk +``` + +--- + +## Running benchmarks and using the container + +The `benchmarking/tools/run.sh` script provides a convenient way to run benchmarks in a Docker container with proper volume mounts, GPU access, and environment configuration. + +### Basic Usage + +Run benchmarks using a configuration file: + +```bash +./benchmarking/tools/run.sh --config benchmarking/my-benchmark.yaml +``` + +This command: +- Reads the configuration file and extracts `results_path`, `artifacts_path`, and `datasets_path` +- Automatically creates volume mounts to map these paths into the container +- Runs the benchmarking framework with the Curator code built into the Docker image +- Passes environment variables like `SLACK_WEBHOOK_URL` and `MLFLOW_TRACKING_URI` to the container + +### Using Host Curator Sources + +To run benchmarks using Curator source code from your local repository instead of the version built into the image: + +```bash +./benchmarking/tools/run.sh --use-host-curator --config benchmarking/my-benchmark.yaml +``` + +This mounts your local Curator repository (from `$HOST_CURATOR_DIR`) into the container at `/opt/Curator`, allowing you to: +- Test local changes without rebuilding the Docker image +- Quickly iterate on Curator development +- Debug issues with modified source code + +The `HOST_CURATOR_DIR` environment variable defaults to the repository root but can be overridden: + +```bash +HOST_CURATOR_DIR=/path/to/my/curator/fork ./benchmarking/tools/run.sh --use-host-curator --config my-benchmark.yaml +``` + +### Interactive Shell + +Get an interactive bash shell in the container environment: + +```bash +./benchmarking/tools/run.sh --shell +``` + +This is useful for: +- Exploring the container environment +- Running benchmarks manually for debugging +- Checking installed packages and versions +- Testing commands before adding them to scripts + +### Running Commands in the Container + +Execute a specific command in the container without an interactive shell: + +```bash +./benchmarking/tools/run.sh --shell "uv pip list" +``` + +This runs the command and exits. 
Examples: + +```bash +# Check installed packages +./benchmarking/tools/run.sh --shell "uv pip list | grep curator" + +# Verify Python environment +./benchmarking/tools/run.sh --shell "python -c 'import nemo_curator; print(nemo_curator.__version__)'" + +# List available benchmark scripts +./benchmarking/tools/run.sh --shell "ls -l /opt/Curator/benchmarking/scripts/" +``` + +### Controlling GPU Access + +Use the `GPUS` environment variable to control which GPUs are visible to the container: + +```bash +# Use all GPUs (default) +./benchmarking/tools/run.sh --config my-benchmark.yaml + +# Use specific GPUs +GPUS="device=0,1" ./benchmarking/tools/run.sh --config my-benchmark.yaml + +# Use only GPU 2 +GPUS="device=2" ./benchmarking/tools/run.sh --config my-benchmark.yaml + +# Run without GPU access +GPUS="none" ./benchmarking/tools/run.sh --config my-benchmark.yaml +``` + +The `GPUS` value is passed directly to Docker's `--gpus` flag. + +### More details +For more details, refer to the `--help` output for `run.sh` +```bash +./benchmarking/tools/run.sh --help +``` + +--- + +## Writing Benchmark Scripts + +### Script Location + +Benchmark scripts should be placed in the `benchmarking/scripts/` directory. Scripts are referenced by filename in the YAML configuration. + +### Required Script Interface + +Benchmark scripts must follow these requirements: + +#### 1. Accept Framework Arguments + +Your script must accept the `--benchmark-results-path` argument. This is automatically passed by the framework and specifies the directory where output files should be written. You can add any additional custom arguments your benchmark needs. + +#### 2. Generate Required Output Files + +Your script **must** write three JSON/pickle files to the `--benchmark-results-path` directory: + +**`params.json`** - A JSON file containing all parameters used in the benchmark run (input paths, configuration options, etc.). This allows for reproducibility and tracking of what settings were used. + +**`metrics.json`** - A JSON file containing all measured metrics from the benchmark (execution time, throughput, memory usage, etc.). Metric names used here can be referenced in entry requirements and sink configurations. + +**`tasks.pkl`** - A pickle file containing NeMo Curator `Task` objects that capture detailed performance data. Use `nemo_curator.tasks.Task` with `TaskPerfUtils()` to wrap operations in your script, then save all tasks using `Task.get_all_tasks()`. + +### Reference Implementations + +See existing scripts in `scripts/` for complete examples: +- `domain_classification_benchmark.py` - Domain classification with model inference +- `embedding_generation_benchmark.py` - Embedding generation benchmark +- `removal_benchmark.py` - Data removal operations benchmark + +--- + +## Sinks: Custom Reporting & Actions + +### Overview + +Sinks extend the framework to perform custom actions at various stages of the benchmark lifecycle: + +1. **Initialize**: Called once at session start with session metadata +2. **Process Result**: Called after each entry completes with that entry's results +3. 
**Finalize**: Called once at session end to perform final actions + +### Built-in Sinks + +#### MLflow Sink + +Tracks experiments and metrics in MLflow: + +```yaml +sinks: + - name: mlflow + tracking_uri: http://mlflow-server:5000 + experiment: my-experiment + enabled: true +``` + +#### Slack Sink + +Posts results to Slack channels: + +```yaml +sinks: + - name: slack + webhook_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL + enabled: true +``` + +Results are formatted as interactive Slack messages with environment info and metrics. + +#### Google Drive Sink + +Placeholder for uploading results to Google Drive: + +```yaml +sinks: + - name: gdrive + enabled: false +``` + +### Writing a Custom Sink + +**1. Create a new sink class** in `runner/sinks/`: + +```python +# runner/sinks/my_custom_sink.py +from typing import Any +from loguru import logger +from runner.sinks.sink import Sink + + +class MyCustomSink(Sink): + def __init__(self, config: dict[str, Any]): + super().__init__(config) + self.config = config + self.enabled = config.get("enabled", True) + self.api_endpoint = config.get("api_endpoint") + + # Initialize any resources + if not self.api_endpoint: + raise ValueError("MyCustomSink: api_endpoint is required") + + def initialize(self, session_name: str, env_data: dict[str, Any]) -> None: + """Called at session start.""" + self.session_name = session_name + self.env_data = env_data + + if self.enabled: + logger.info(f"MyCustomSink: Starting session {session_name}") + # Perform initialization (e.g., create remote session) + + def process_result(self, result: dict[str, Any]) -> None: + """Called after each entry completes.""" + if self.enabled: + logger.info(f"MyCustomSink: Processing {result['name']}") + # Send result to your API, database, etc. + self._send_to_api(result) + + def finalize(self) -> None: + """Called at session end.""" + if self.enabled: + logger.info("MyCustomSink: Finalizing session") + # Perform cleanup, send summary, etc. + + def _send_to_api(self, data: dict) -> None: + """Helper method for API calls.""" + # Your implementation + pass +``` + +**2. Register your sink** in `runner/matrix.py`: + +```python +@classmethod +def load_sinks(cls, sink_configs: list[dict]) -> list[Sink]: + sinks = [] + for sink_config in sink_configs: + sink_name = sink_config["name"] + if sink_name == "my_custom": + from runner.sinks.my_custom_sink import MyCustomSink + sinks.append(MyCustomSink(config=sink_config)) + # ... other sinks ... + return sinks +``` + +**3. Use in configuration:** + +```yaml +sinks: + - name: my_custom + api_endpoint: https://api.example.com/benchmarks + enabled: true +``` + +### Result Data Structure + +Results passed to `process_result()` contain: + +```python +{ + "name": "entry_name", + "success": True, + "exec_time_s": 123.45, + "timeout": False, + "script_params": { ... }, # From params.json + "script_metrics": { ... }, # From metrics.json + "tasks": [ ... ], # From tasks.pkl + "command": "python script.py ...", + "returncode": 0, + "stdouterr_file": "/path/to/log.txt" +} +``` + +--- + +## License + +Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. + +Licensed under the Apache License, Version 2.0. See the main repository LICENSE file for details. 
diff --git a/benchmarking/__init__.py b/benchmarking/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarking/config.yaml b/benchmarking/config.yaml new file mode 100644 index 000000000..356cd969f --- /dev/null +++ b/benchmarking/config.yaml @@ -0,0 +1,65 @@ +# See tools/run.sh for the env vars used in this config +results_dir: ${CONTAINER_RESULTS_DIR} +artifacts_dir: ${CONTAINER_ARTIFACTS_DIR} +datasets: + - name: cc + formats: + - type: json + path: ${CONTAINER_DATASETS_DIR}/sample/25de70f6c6a6.jsonl + - type: parquet + path: ${CONTAINER_DATASETS_DIR}/sample/25de70f6c6a6.parquet + +default_timeout_s: 7200 + +# Optional sinks +sinks: + - name: mlflow + tracking_uri: ${MLFLOW_TRACKING_URI} + experiment: ray-curator-common-crawl + enabled: false + - name: slack + webhook_url: ${SLACK_WEBHOOK_URL} + enabled: true + - name: gdrive + enabled: false + +# Whether to delete scratch dirs after each run +delete_scratch: true + +entries: + - name: cc_main_raydata + script: common_crawl_benchmark.py + args: >- + --download_path {session_entry_dir}/scratch/downloads + --output_path {session_entry_dir}/scratch/output + --output_format parquet + --crawl_type main + --start_snapshot 2023-01 + --end_snapshot 2023-10 + --html_extraction justext + --url_limit 10 + --add_filename_column + --executor ray_data + timeout_s: 20000 + ray: + num_cpus: 64 + num_gpus: 0 + enable_object_spilling: false + - name: removal_main_raydata + script: removal_benchmark.py + args: >- + --download_path {session_entry_dir}/scratch/downloads + --output_path {session_entry_dir}/scratch/output + --output_format parquet + --crawl_type main + --start_snapshot 2023-01 + --end_snapshot 2023-10 + --html_extraction justext + --url_limit 10 + --add_filename_column + --executor ray_data + timeout_s: 20000 + ray: + num_cpus: 64 + num_gpus: 0 + enable_object_spilling: false diff --git a/benchmarking/dummy-config.yaml b/benchmarking/dummy-config.yaml new file mode 100644 index 000000000..04677b74f --- /dev/null +++ b/benchmarking/dummy-config.yaml @@ -0,0 +1,53 @@ +# This is a dummy config for testing the benchmarking framework. 
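+# It is intended to be run via tools/run.sh, e.g. (assuming the repo root is the working directory):
+#   ./benchmarking/tools/run.sh --config ./benchmarking/dummy-config.yaml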
+ +# See tools/run.sh for the env vars used in this config +results_dir: "${CONTAINER_RESULTS_DIR}" +artifacts_dir: "${CONTAINER_ARTIFACTS_DIR}" +datasets: + - name: "cc" + formats: + - type: "json" + path: "${CONTAINER_DATASETS_DIR}/sample/25de70f6c6a6.jsonl" + - type: "parquet" + path: "${CONTAINER_DATASETS_DIR}/sample/25de70f6c6a6.parquet" + +default_timeout_s: 7200 + +# Optional sinks +sinks: + - name: mlflow + enabled: false + tracking_uri: ${MLFLOW_TRACKING_URI} + experiment: ray-curator-common-crawl + - name: slack + enabled: true + webhook_url: ${SLACK_WEBHOOK_URL} + - name: gdrive + enabled: false + drive_folder_id: ${GDRIVE_FOLDER_ID} + service_account_file: ${GDRIVE_SERVICE_ACCOUNT_FILE} + +# Whether to delete scratch dirs after each run +delete_scratch: true + +entries: + - name: demo + script: dummy_benchmark.py + args: >- + --input-path {dataset:cc,parquet} + --output-path {session_entry_dir}/scratch/output + timeout_s: 20000 + ray: + num_cpus: 4 + num_gpus: 0 + enable_object_spilling: false + - name: demo2 + script: dummy_benchmark.py + args: >- + --input-path {dataset:cc,parquet} + --output-path {session_entry_dir}/scratch/output + timeout_s: 20000 + ray: + num_cpus: 4 + num_gpus: 0 + enable_object_spilling: false diff --git a/benchmarking/nightly-benchmark.yaml b/benchmarking/nightly-benchmark.yaml new file mode 100644 index 000000000..5c2ba69e0 --- /dev/null +++ b/benchmarking/nightly-benchmark.yaml @@ -0,0 +1,138 @@ +# These three paths must be defined and are used to resolve the paths for +# results, artifacts, and datasets. These must be existing paths on the host. +# If running inside a Docker container started with tools/run.sh, the paths +# will automatically be mapped to the appropriate volume mount. +# If running on the host, the paths will remain as defined below. +# Paths can be used in other values in this file by using their placeholder +# (e.g. {datasets_path}/my/test/dataset.parquet) and will be resolved to the +# appropriate path at runtime. +results_path: /raid/curator-team/nightly/results +artifacts_path: /raid/curator-team/nightly/artifacts +datasets_path: /raid + +datasets: + - name: "tinystories_train" + formats: + - type: "parquet" + path: "{datasets_path}/prospector-lm/clean/tinystories_train_parquet" + +default_timeout_s: 7200 + +# Optional sinks +sinks: +# - name: mlflow +# enabled: false +# tracking_uri: ${MLFLOW_TRACKING_URI} +# experiment: ray-curator-common-crawl + - name: slack + enabled: true + webhook_url: ${SLACK_WEBHOOK_URL} + default_metrics: ["exec_time_s"] +# - name: gdrive +# enabled: false +# drive_folder_id: ${GDRIVE_FOLDER_ID} +# service_account_file: ${GDRIVE_SERVICE_ACCOUNT_FILE} + +# Whether to delete scratch dirs after each run +delete_scratch: true + +entries: + - name: domain_classification_raydata + enabled: true + script: domain_classification_benchmark.py + args: >- + --executor=ray_data + --input-path={dataset:tinystories_train,parquet} + --dataset-size-gb=10 + --model-inference-batch-size=1024 + timeout_s: 20000 + sink_data: + - name: slack + # Additional metrics to include in the Slack report. These must be present in the metrics.json file generated by the script. + additional_metrics: ["num_documents_processed", "throughput_docs_per_sec"] + ray: + num_cpus: 64 + num_gpus: 4 + enable_object_spilling: false + # Optional: Requirements for the benchmark to pass. These will result in the benchmark being marked as failed if not met. 
+ requirements: + - metric: throughput_docs_per_sec + min_value: 0.2 + + - name: domain_classification_xenna + enabled: false + script: domain_classification_benchmark.py + args: >- + --executor=xenna + --input-path={dataset:tinystories_train,parquet} + --dataset-size-gb=10 + --model-inference-batch-size=1024 + timeout_s: 20000 + + - name: embedding_generation_raydata + enabled: true + script: embedding_generation_benchmark.py + args: >- + --executor=ray_data + --input-path={dataset:tinystories_train,parquet} + --dataset-size-gb=10 + --model-identifier=sentence-transformers/all-MiniLM-L6-v2 + --model-inference-batch-size=1024 + timeout_s: 20000 + sink_data: + - name: slack + # Additional metrics to include in the Slack report. These must be present in the metrics.json file generated by the script. + additional_metrics: ["num_documents_processed", "throughput_docs_per_sec"] + ray: + num_cpus: 64 + num_gpus: 4 + enable_object_spilling: false + + - name: embedding_generation_xenna + enabled: false + script: embedding_generation_benchmark.py + args: >- + --executor=xenna + --input-path={dataset:tinystories_train,parquet} + --dataset-size-gb=10 + --model-identifier=sentence-transformers/all-MiniLM-L6-v2 + --model-inference-batch-size=1024 + timeout_s: 20000 + + - name: removal_raydata + enabled: false + script: removal_benchmark.py + args: >- + --executor=ray_data + --input-path={dataset:tinystories_train,parquet} + --ids-to-remove-path=some_path + --id-generator-path=some_path + --output-path={session_entry_dir}/scratch/output + --input-filetype=parquet + --input-fields=id,text + --input-id-field=CURATOR_DEDUP_ID_STR + --input-files-per-partition=1 + --ids-to-remove-fields=id + --output-filetype=parquet + timeout_s: 20000 + ray: + num_cpus: 64 + num_gpus: 4 + enable_object_spilling: false + + - name: removal_xenna + enabled: false + script: removal_benchmark.py + args: >- + --executor=xenna + --input-path={dataset:tinystories_train,parquet} + --ids-to-remove-path=some_path + --id-generator-path=some_path + --output-path={session_entry_dir}/scratch/output + --input-filetype=parquet + --input-fields=id,text + --input-id-field=CURATOR_DEDUP_ID_STR + --input-files-per-partition=1 + --ids-to-remove-fields=id + --output-filetype=parquet + timeout_s: 20000 diff --git a/benchmarking/run.py b/benchmarking/run.py new file mode 100755 index 000000000..e291f8a3c --- /dev/null +++ b/benchmarking/run.py @@ -0,0 +1,318 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import json +import os +import pickle +import shutil +import sys +import time +import traceback +from pathlib import Path +from typing import Any + +import yaml +from loguru import logger + +from nemo_curator.tasks.utils import TaskPerfUtils +from nemo_curator.utils.file_utils import create_or_overwrite_dir + +_this_script_dir = Path(__file__).parent + +# TODO: How do we want to package this tool? Perhaps a package extra for +# nemo-curator, i.e. 
nemo-curator[benchmarking]? +# For now, add this directory to PYTHONPATH to import the runner modules +sys.path.insert(0, _this_script_dir) + +# ruff: noqa: E402 +from runner.datasets import DatasetResolver +from runner.env_capture import dump_env +from runner.matrix import MatrixConfig, MatrixEntry +from runner.path_resolver import PathResolver +from runner.process import run_command_with_timeout +from runner.ray_cluster import ( + setup_ray_cluster_and_env, + teardown_ray_cluster_and_env, +) +from runner.utils import find_result, get_obj_for_json, resolve_env_vars + + +def ensure_dir(dir_path: Path) -> None: + """Ensure dir_path and parents exists, creating them if necessary.""" + dir_path.mkdir(parents=True, exist_ok=True) + + +def get_entry_script_persisted_data(benchmark_results_path: Path) -> dict[str, Any]: + """Read the files that are expected to be generated by the individual benchmark scripts.""" + params_json = benchmark_results_path / "params.json" + if not params_json.exists(): + logger.warning(f"Params JSON file not found at {params_json}") + script_params = {} + else: + with open(params_json) as f: + script_params = json.load(f) + + metrics_json = benchmark_results_path / "metrics.json" + if not metrics_json.exists(): + logger.warning(f"Metrics JSON file not found at {metrics_json}") + script_metrics = {} + else: + with open(metrics_json) as f: + script_metrics = json.load(f) + + tasks_pkl = benchmark_results_path / "tasks.pkl" + if not tasks_pkl.exists(): + logger.warning(f"Tasks pickle file not found at {tasks_pkl}") + script_tasks = [] + else: + with open(tasks_pkl, "rb") as f: + script_tasks = pickle.load(f) # noqa: S301 + if isinstance(script_tasks, list): + script_metrics.update(TaskPerfUtils.aggregate_task_metrics(script_tasks, prefix="task")) + elif isinstance(script_tasks, dict): + for pipeline_name, pipeline_tasks in script_tasks.items(): + script_metrics.update( + TaskPerfUtils.aggregate_task_metrics(pipeline_tasks, prefix=pipeline_name.lower()) + ) + + return {"params": script_params, "metrics": script_metrics} + + +def check_requirements_update_results(result_data: dict[str, Any], requirements: dict[str, Any]) -> bool: + """ + Check if the benchmark meets the requirements. Creates a new "requirements" key in the result_data + dictionary with the results of the requirements checks. + Returns True if the benchmark meets the requirements, False otherwise. 
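+
+    Example `requirements` value (keyed by metric name, as built by MatrixEntry; the numbers are illustrative):
+        {"throughput_docs_per_sec": {"metric": "throughput_docs_per_sec", "min_value": 100}}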
+ """ + meets_requirements = True + requirements_data = {} + + for metric_name, requirement_dict in requirements.items(): + reason_not_met = None + actual_value = find_result(result_data, metric_name) + if actual_value is None: + reason_not_met = f"{metric_name} not found in metrics" + else: + has_min = "min_value" in requirement_dict + has_max = "max_value" in requirement_dict + if has_min: + min_value = requirement_dict["min_value"] + if actual_value < min_value: + reason_not_met = f"{metric_name} < {min_value}" + if has_max: + max_value = requirement_dict["max_value"] + if actual_value > max_value: + reason_not_met = f"{metric_name} > {max_value}" + if not has_min and not has_max: + reason_not_met = f"No min or max value specified for {metric_name}" + + # Update the requirements_data dictionary with the result of the requirements check + meets_requirements &= reason_not_met is None + if reason_not_met is None: + logger.debug(f"\t\t✅ Requirement for {metric_name} was met") + else: + requirements_data[metric_name] = reason_not_met + logger.error(f"\t\t❌ Requirement for {metric_name} was not met: {reason_not_met}") + + result_data["requirements_not_met"] = requirements_data + return meets_requirements + + +def run_entry( + entry: MatrixEntry, + path_resolver: PathResolver, + dataset_resolver: DatasetResolver, + session_path: Path, + result_data: dict[str, Any], +) -> bool: + session_entry_path = session_path / entry.name + + # scratch_path : This is the directory user can use to store scratch data; it'll be cleaned up after the entry is done if delete_scratch is True + # ray_cluster_path : This is the directory where Ray cluster is started; it'll be cleaned up after the entry is done + # logs_path : This is the directory where logs are stored + # benchmark_results_path : This is the directory where benchmark results are stored + scratch_path, ray_cluster_path, logs_path, benchmark_results_path = [ + (session_entry_path / d).absolute() for d in ["scratch", "ray_cluster", "logs", "benchmark_results"] + ] + cmd = entry.get_command_to_run(session_entry_path, benchmark_results_path, path_resolver, dataset_resolver) + run_id = result_data.get("run_id", f"{entry.name}-{int(time.time())}") + + try: + # Create directories individually + for directory in [scratch_path, ray_cluster_path, logs_path, benchmark_results_path]: + create_or_overwrite_dir(directory) + + ray_client, ray_temp_dir, ray_env = setup_ray_cluster_and_env( + num_cpus=entry.ray.get("num_cpus", os.cpu_count() or 1), + num_gpus=entry.ray.get("num_gpus", 0), + enable_object_spilling=bool(entry.ray.get("enable_object_spilling", False)), + ray_log_path=logs_path / "ray.log", + ) + + # Execute command with timeout + logger.info(f"\t\tRunning command {' '.join(cmd) if isinstance(cmd, list) else cmd}") + started_exec = time.time() + run_data = run_command_with_timeout( + command=cmd, + timeout=entry.timeout_s, + stdouterr_path=logs_path / "stdouterr.log", + env=ray_env, + run_id=run_id, + fancy=os.environ.get("CURATOR_BENCHMARKING_DEBUG", "0") == "0", + ) + ended_exec = time.time() + duration = ended_exec - started_exec + + # Update result_data + result_data.update( + { + "cmd": cmd, + "exec_started_at": started_exec, + "exec_time_s": duration, + "exit_code": run_data["returncode"], + "timed_out": run_data["timed_out"], + "logs_dir": logs_path, + } + ) + ray_data = {} + # script_persisted_data is a dictionary with keys "params" and "metrics" + # "params" will contain everything the script wrote to its params.json file + # "metrics" will 
contain everything the script wrote to its metrics.json file plus metrics + # from the Task objects restored from the tasks.pkl file. + script_persisted_data = get_entry_script_persisted_data(benchmark_results_path) + result_data.update( + { + "ray_data": ray_data, + "metrics": script_persisted_data["metrics"], + "params": script_persisted_data["params"], + } + ) + + # Check if the run itself returned a success code, if so, use the updated + # result_data to check if requirements were met. + if run_data["returncode"] == 0: + success = check_requirements_update_results(result_data, entry.requirements) + else: + success = False + logger.error(f"\t\t❌ Run Failed in {duration:.1f} seconds") + if run_data["timed_out"]: + logger.warning(f"\t\t⏰ Timed out after {entry.timeout_s}s") + + result_data["success"] = success + logger.info(f"\t\tLogs found in {logs_path}") + Path(session_entry_path / "results.json").write_text(json.dumps(get_obj_for_json(result_data))) + + return success + + finally: + teardown_ray_cluster_and_env(ray_client, ray_temp_dir, ray_cluster_path) + + # Clean up the scratch dir if configured to delete + if entry.delete_scratch: + shutil.rmtree(scratch_path, ignore_errors=True) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Runs the benchmarking application") + parser.add_argument( + "--config", + type=Path, + action="append", + required=True, + help=( + "Path to YAML config for benchmark matrix, machine paths, etc. Can be " + "specified multiple times to merge configs." + ), + ) + parser.add_argument( + "--session-name", + default=None, + help=("Optional human-readable session name. Default is benchmark-run__."), + ) + args = parser.parse_args() + + # Consolidate the configuration from all YAML files into a single dict + config_dict = {} + for yml_file in args.config: + with open(yml_file) as f: + config_dicts = yaml.full_load_all(f) + for d in config_dicts: + config_dict.update(d) + # Preprocess the config dict prior to creating objects from it + try: + MatrixConfig.assert_valid_config_dict(config_dict) + config_dict = resolve_env_vars(config_dict) + except ValueError as e: + logger.error(f"Invalid configuration: {e}") + return 1 + + config = MatrixConfig.create_from_dict(config_dict) + + # Create session folder under results_dir + session_name = args.session_name or time.strftime("benchmark-run__%Y-%m-%d__%H-%M-%S") + session_path = (config.results_path / session_name).absolute() + ensure_dir(session_path) + + session_overall_success = True + logger.info(f"Started session {session_name}...") + env_dict = dump_env(session_path) + + for sink in config.sinks: + sink.initialize(session_name=session_name, matrix_config=config, env_dict=env_dict) + + for entry in config.entries: + run_success = False + run_id = f"{entry.name}-{int(time.time())}" + result_data = { + "name": entry.name, + "run_id": run_id, + "success": run_success, + } + logger.info(f"\tRunning {entry.name} (run ID: {run_id})") + try: + run_success = run_entry( + entry=entry, + path_resolver=config.path_resolver, + dataset_resolver=config.dataset_resolver, + session_path=session_path, + result_data=result_data, + ) + + except Exception as e: # noqa: BLE001 + run_success = False + error_traceback = traceback.format_exc() + logger.error(f"\t\t❌ Entry failed with exception: {e}") + logger.debug(f"Full traceback:\n{error_traceback}") + result_data.update( + { + "error": str(e), + "traceback": error_traceback, + "success": run_success, + } + ) + + finally: + session_overall_success &= run_success 
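+            # Report this entry's result to every configured sink, even when the entry failed or raised.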
+ for sink in config.sinks: + sink.process_result(result_dict=result_data, matrix_entry=entry) + + for sink in config.sinks: + sink.finalize() + logger.info(f"Session {session_name} completed with overall success: {session_overall_success}") + return 0 if session_overall_success else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/benchmarking/runner/__init__.py b/benchmarking/runner/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarking/runner/datasets.py b/benchmarking/runner/datasets.py new file mode 100644 index 000000000..eafa7b73e --- /dev/null +++ b/benchmarking/runner/datasets.py @@ -0,0 +1,59 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + + +class DatasetResolver: + def __init__(self, data: list[dict]) -> None: + """ + Constructor for a DatasetResolver which accepts a list of dataset dictionaries. + Dataset dictionaries are likely created from reading one or more YAML files. + Example input: + [ { "name": "my_dataset", + "formats": [ + {"type": "parquet", "path": "/path/to/my_dataset.parquet"}, + {"type": "jsonl", "path": "/path/to/my_dataset.jsonl"}, + ] + }, + ...] + """ + self._map = {} + + # Check for duplicate dataset names before proceeding + names = [d["name"] for d in data] if len(data) else [] + if len(names) != len(set(names)): + duplicates = {name for name in names if names.count(name) > 1} + msg = f"Duplicate dataset name(s) found: {', '.join(duplicates)}" + raise ValueError(msg) + + for dataset in data: + formats = dataset["formats"] + if not isinstance(formats, list): + msg = "formats must be a list" + raise TypeError(msg) + format_map = {} + for fmt in formats: + format_map[fmt["type"]] = fmt["path"] + self._map[dataset["name"]] = format_map + + def resolve(self, dataset_name: str, file_format: str) -> str: + if dataset_name not in self._map: + msg = f"Unknown dataset: {dataset_name}" + raise KeyError(msg) + formats = self._map[dataset_name] + if file_format not in formats: + msg = f"Unknown format '{file_format}' for dataset '{dataset_name}'" + raise KeyError(msg) + return formats[file_format] diff --git a/benchmarking/runner/env_capture.py b/benchmarking/runner/env_capture.py new file mode 100644 index 000000000..88244fc0e --- /dev/null +++ b/benchmarking/runner/env_capture.py @@ -0,0 +1,126 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
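+
+"""Helpers for capturing the benchmark execution environment (platform, git commit, installed packages, GPUs)."""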
+ +import json +import os +import platform +import shutil +import subprocess +import sys +from pathlib import Path +from typing import Any + +import git +import pynvml +from loguru import logger +from runner.utils import get_obj_for_json + + +def dump_env(output_path: Path) -> dict[str, Any]: + env_data = get_env() + + # Try package managers in order of preference for capturing the environment + # package_managers = [("uv", "pip freeze"), ("pip", "freeze"), ("micromamba", "list --explicit"), ("conda", "list --explicit")] # noqa: ERA001 + package_managers = [("uv", "pip freeze")] + env_dumped = False + for package_manager, cmd in package_managers: + if shutil.which(package_manager): + cmd_list = [package_manager, *cmd.split(" ")] + exp = subprocess.check_output(cmd_list, text=True, timeout=120) # noqa: S603 + packages_txt_path = output_path / "packages.txt" + packages_txt_path.write_text(exp) + env_data["packages_txt"] = str(packages_txt_path) + logger.info(f"Captured packages from {package_manager} {cmd} to {packages_txt_path}") + env_dumped = True + break + if not env_dumped: + logger.warning( + f"No package manager ({', '.join([pm for pm, _ in package_managers])}) found in PATH, skipping environment capture" + ) + + # Write env data to file as JSON and return the dictionary written + (output_path / "env.json").write_text(json.dumps(get_obj_for_json(env_data))) + return env_data + + +def get_env() -> dict[str, Any]: + try: + import ray + + ray_version = ray.__version__ + except ModuleNotFoundError: + ray_version = "not_installed" + + git_commit_string = get_git_commit_string() + cuda_visible_devices = get_gpu_info_string() + # The image digest is not known at image build time and is not available inside the + # container, so it must be passed in when the container is run. + # Get the image digest via the env var set from tools/run.sh + image_digest = os.getenv("IMAGE_DIGEST", "unknown") + + # Better support container environment usage by looking for the env var HOST_HOSTNAME + # which can be set to the container hostname (see tools/run.sh) since the name of the + # host machine is expected by most users. + return { + "hostname": os.getenv("HOST_HOSTNAME", platform.node()), + "platform": platform.platform(), + "ray_version": ray_version, + "git_commit": git_commit_string, + "image_digest": image_digest, + "python_version": platform.python_version(), + "executable": sys.executable, + "cuda_visible_devices": cuda_visible_devices, + } + + +def get_git_commit_string() -> str: + """Returns the git commit string for Curator.""" + # Use the directory where this script is located (which is assumed to be the Curator repo) + # Another option is to use the file location of the nemo_curator __init__.py file, but that may not be the location of the repo if nemo_curator is installed as a package. + # Note: if the benchmarking tools (i.e. this file and others) eventually become an installable package, this approach may not work if these tools are installed as a package. + try: + repo = git.Repo(Path(__file__).parent, search_parent_directories=True) + commit_str = repo.head.commit.hexsha + except Exception as e: # noqa: BLE001 + logger.warning(f"Failed to get git commit string: {e}") + commit_str = "unknown" + + return commit_str + + +def get_gpu_info_string() -> str: + """Returns a string describing the GPUs visible to the process. + If no GPUs are visible, returns "No GPUs found". + If multiple GPUs are visible, returns a string with the number of each GPU model (e.g. "2X H100"). 
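+    If GPU information cannot be queried (e.g. NVML initialization fails), returns "unknown".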
+ """ + try: + pynvml.nvmlInit() + # Rather than assume all visible GPUs are the same model (which is by + # far the most common case), count the number of each model just in case. + gpu_names_count = {} + for i in range(pynvml.nvmlDeviceGetCount()): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) + name = pynvml.nvmlDeviceGetName(handle) + gpu_names_count[name] = gpu_names_count.get(name, 0) + 1 + pynvml.nvmlShutdown() + if len(gpu_names_count) > 0: + counts = [f"{count}X {name}" for name, count in gpu_names_count.items()] + gpu_info_str = ", ".join(counts) + else: + gpu_info_str = "No GPUs found" + except Exception as e: # noqa: BLE001 + logger.warning(f"Failed to get GPU info: {e}") + gpu_info_str = "unknown" + + return gpu_info_str diff --git a/benchmarking/runner/matrix.py b/benchmarking/runner/matrix.py new file mode 100644 index 000000000..7ee0cdaca --- /dev/null +++ b/benchmarking/runner/matrix.py @@ -0,0 +1,258 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ruff: noqa: ERA001 + +from __future__ import annotations + +import re +from dataclasses import dataclass, field, fields +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from loguru import logger + +if TYPE_CHECKING: + from runner.sinks.sink import Sink +from runner.datasets import DatasetResolver +from runner.path_resolver import PathResolver + + +@dataclass +class MatrixEntry: + name: str + script: str | None = None + args: str | None = None + script_base_dir: Path = Path(__file__).parent.parent / "scripts" + timeout_s: int | None = None + sink_data: list[dict[str, Any]] | dict[str, Any] = field(default_factory=dict) + requirements: list[dict[str, Any]] | dict[str, Any] = field(default_factory=dict) + ray: dict[str, Any] = field(default_factory=dict) # supports only single node: num_cpus,num_gpus,object_store_gb + # If set, overrides the session-level delete_scratch setting for this entry + delete_scratch: bool | None = None + enabled: bool = True + + def __post_init__(self) -> None: # noqa: C901 + """Post-initialization checks and updates for dataclass.""" + # Convert the sink_data list of dicts to a dict of dicts for easier lookup with key from "name". + # sink_data typically starts as a list of dicts from reading YAML, like this: + # sink_data: + # - name: slack + # additional_metrics: ["num_documents_processed", "throughput_docs_per_sec"] + # - name: gdrive + # ... + sink_data = {} + # Will be a list of dicts if reading from YAML, in which case make it a dict of dicts with key + # from "name" for easy lookup based on sink name. + if isinstance(self.sink_data, list): + for data in self.sink_data: + sink_data[data["name"]] = data + # If already a dict, use it directly and assume it is already in the correct format. 
+ elif isinstance(self.sink_data, dict): + sink_data = self.sink_data + else: + msg = f"Invalid sink_data type: {type(self.sink_data)}" + raise TypeError(msg) + self.sink_data = sink_data + + # Convert the requirements list of dicts to a dict of dicts for easier lookup with key from "metric". + # requirements typically starts as a list of dicts from reading YAML, like this: + # requirements: + # - metric: throughput_docs_per_sec + # min_value: 200 + # - metric: num_documents_processed + # ... + requirements = {} + # Will be a list of dicts if reading from YAML, in which case make it a dict of dicts with key + # from "metric" for easy lookup based on metric name. + if isinstance(self.requirements, list): + for data in self.requirements: + requirements[data["metric"]] = data + # If already a dict, use it directly and assume it is already in the correct format. + elif isinstance(self.requirements, dict): + requirements = self.requirements + else: + msg = f"Invalid requirements type: {type(self.requirements)}" + raise TypeError(msg) + # For each requirement dict in requirements, check that if both min_value and max_value are present, + # then max_value >= min_value. Raise ValueError if not. + # Raise TypeError if req is not a dict. + for metric_name, req in requirements.items(): + if not isinstance(req, dict): + msg = f"Requirement for metric '{metric_name}' is not a dict: {type(req)}" + raise TypeError(msg) + has_min = "min_value" in req + has_max = "max_value" in req + if has_min and has_max: + min_value = req["min_value"] + max_value = req["max_value"] + if max_value < min_value: + msg = f"Invalid requirement for metric '{metric_name}': max_value ({max_value}) < min_value ({min_value})" + raise ValueError(msg) + self.requirements = requirements + + def get_command_to_run( + self, + session_entry_path: Path, + benchmark_results_path: Path, + path_resolver: PathResolver, + dataset_resolver: DatasetResolver, + ) -> str: + if self.script: + script_path = self.script_base_dir / self.script + # TODO: should --benchmark-results-path always be passed? + cmd = f"python {script_path} {self.args or ''} --benchmark-results-path={benchmark_results_path}" + + cmd = self.substitute_paths_in_cmd(cmd, path_resolver, dataset_resolver) + cmd = self.substitute_template_placeholders(cmd, session_entry_path) + else: + msg = f"Entry {self.name} must specify either cmd or script" + raise ValueError(msg) + + return cmd + + def get_sink_data(self, sink_name: str) -> dict[str, Any]: + return self.sink_data.get(sink_name, {}) + + @staticmethod + def substitute_paths_in_cmd(cmd: str, path_resolver: PathResolver, dataset_resolver: DatasetResolver) -> str: + dataset_pattern = re.compile(r"\{dataset:([^,}]+),([^}]+)\}") + + def _replace_dataset(match: re.Match[str]) -> str: + dataset_name = match.group(1).strip() + dataset_format = match.group(2).strip() + return str(dataset_resolver.resolve(dataset_name, dataset_format)) + + path_pattern = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}") + + def _replace_path(match: re.Match[str]) -> str: + path_name = match.group(1).strip() + # PathResolver.resolve() only matches specific paths intended to be mapped between host and container. + # ValueError is raised if this is not one of those paths, meaning either the path is meant for template + # substitution instead or possibly should be used as-is, in which case simply return the original string. 
+ try: + return str(path_resolver.resolve(path_name)) + except ValueError: + return match.group(0) + + return path_pattern.sub(_replace_path, dataset_pattern.sub(_replace_dataset, cmd)) + + @staticmethod + def substitute_template_placeholders(cmd: str, session_entry_path: Path) -> str: + """Substitute template placeholders in command. + Example: + - {session_entry_dir}/results.json -> /path/to/session/entry/results.json + """ + session_entry_pattern = re.compile(r"\{session_entry_dir\}") + + def replace_session_entry_path(match: re.Match[str]) -> str: # noqa: ARG001 + return str(session_entry_path) + + return session_entry_pattern.sub(replace_session_entry_path, cmd) + + +@dataclass(frozen=True, kw_only=True) +class MatrixConfig: + results_path: Path + artifacts_path: Path + entries: list[MatrixEntry] = field(default_factory=list) + sinks: list[Sink] = field(default_factory=list) + default_timeout_s: int = 7200 + # Whether to delete the entry's scratch directory after completion by default + delete_scratch: bool = True + path_resolver: PathResolver = None + dataset_resolver: DatasetResolver = None + + def __post_init__(self) -> None: + """Post-initialization checks and updates for dataclass.""" + names = [entry.name for entry in self.entries] + if len(names) != len(set(names)): + duplicates = {name for name in names if names.count(name) > 1} + msg = f"Duplicate entry name(s) found: {', '.join(duplicates)}" + raise ValueError(msg) + + # Update delete_scratch for each entry that has not been set to the session-level delete_scratch setting + for entry in self.entries: + if entry.delete_scratch is None: + entry.delete_scratch = self.delete_scratch + + # Update timeout_s for each entry that has not been set to the session-level default_timeout_s + for entry in self.entries: + if entry.timeout_s is None: + entry.timeout_s = self.default_timeout_s + + @classmethod + def assert_valid_config_dict(cls, data: dict) -> None: + """Assert that the configuration contains the minimum required config values.""" + required_fields = ["results_path", "artifacts_path", "datasets_path", "entries"] + missing_fields = [k for k in required_fields if k not in data] + if missing_fields: + msg = f"Invalid configuration: missing required fields: {missing_fields}" + raise ValueError(msg) + + @classmethod + def create_from_dict(cls, data: dict) -> MatrixConfig: + """ + Factory method to create a MatrixConfig from a dictionary. + + The dictionary is typically created from reading one or more YAML files. + This method resolves environment variables and converts the list of + entry dicts to MatrixEntry objects, and returns a new MatrixConfig + object. + """ + path_resolver = PathResolver(data) + dataset_resolver = DatasetResolver(data.get("datasets", [])) + + # Filter out data not needed for a MatrixConfig object. + mc_field_names = {f.name for f in fields(cls)} + mc_data = {k: v for k, v in data.items() if k in mc_field_names} + sinks = cls.create_sinks_from_dict(mc_data.get("sinks", [])) + # Load entries only if enabled (enabled by default) + # TODO: should entries be created unconditionally and have an "enabled" field instead? 
+ entries = [MatrixEntry(**e) for e in mc_data["entries"] if e.get("enabled", True)] + + mc_data["results_path"] = path_resolver.resolve("results_path") + mc_data["artifacts_path"] = path_resolver.resolve("artifacts_path") + mc_data["entries"] = entries + mc_data["sinks"] = sinks + mc_data["path_resolver"] = path_resolver + mc_data["dataset_resolver"] = dataset_resolver + + return cls(**mc_data) + + @classmethod + def create_sinks_from_dict(cls, sink_configs: list[dict]) -> list[Sink]: + """Load sinks from the list of sink configuration dictionaries.""" + sinks = [] + for sink_config in sink_configs: + sink_name = sink_config["name"] + sink_enabled = sink_config.get("enabled", True) + if not sink_enabled: + logger.warning(f"Sink {sink_name} is not enabled, skipping") + continue + if sink_name == "mlflow": + from runner.sinks.mlflow_sink import MlflowSink + + sinks.append(MlflowSink(sink_config=sink_config)) + elif sink_name == "slack": + from runner.sinks.slack_sink import SlackSink + + sinks.append(SlackSink(sink_config=sink_config)) + elif sink_name == "gdrive": + from runner.sinks.gdrive_sink import GdriveSink + + sinks.append(GdriveSink(sink_config=sink_config)) + else: + logger.warning(f"Unknown sink: {sink_name}, skipping") + return sinks diff --git a/benchmarking/runner/path_resolver.py b/benchmarking/runner/path_resolver.py new file mode 100644 index 000000000..2525dc9df --- /dev/null +++ b/benchmarking/runner/path_resolver.py @@ -0,0 +1,58 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path + +CONTAINER_CURATOR_DIR = "/opt/Curator" + +# Currently all set to /MOUNT to make it obvious in log/error messages from the +# container that these paths are available on the host. The assumption is that +# /MOUNT is simply prepended to the host absolute path, meaning each mounted dir +# will automatically be unique (unless the user intentionally used the same host +# paths). This also makes copy-and-paste easier, since all but "/MOUNT" can be +# copied and used on the host as-is. +CONTAINER_CONFIG_DIR_ROOT = "/MOUNT" +CONTAINER_RESULTS_DIR_ROOT = "/MOUNT" +CONTAINER_ARTIFACTS_DIR_ROOT = "/MOUNT" +CONTAINER_DATASETS_DIR_ROOT = "/MOUNT" + + +class PathResolver: + """ + Resolves host/container paths for results, artifacts, and datasets. + """ + + def __init__(self, data: dict) -> None: + """ + data is a dictionary containing the paths for results, artifacts, and datasets + Resolved paths for a container are simply a root dir (see above) prepended to the host path. + """ + # TODO: Is this the best way to determine if running inside a Docker container? 
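+        # Example (hypothetical host path): with results_path "/home/user/results", resolve("results_path")
+        # returns "/MOUNT/home/user/results" inside the container and "/home/user/results" on the host.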
+ in_docker = Path("/.dockerenv").exists() + (rp, ap, dp) = (Path(data["results_path"]), Path(data["artifacts_path"]), Path(data["datasets_path"])) + self.path_map = { + "results_path": Path(f"{CONTAINER_RESULTS_DIR_ROOT}/{rp}") if in_docker else rp, + "artifacts_path": Path(f"{CONTAINER_ARTIFACTS_DIR_ROOT}/{ap}") if in_docker else ap, + "datasets_path": Path(f"{CONTAINER_DATASETS_DIR_ROOT}/{dp}") if in_docker else dp, + } + + def resolve(self, dir_type: str) -> Path: + """ + Given a directory type (e.g., 'results_path'), return the path. + """ + if dir_type not in self.path_map: + msg = f"Unknown dir_type: {dir_type}" + raise ValueError(msg) + + return self.path_map[dir_type] diff --git a/benchmarking/runner/process.py b/benchmarking/runner/process.py new file mode 100644 index 000000000..bdc4762ac --- /dev/null +++ b/benchmarking/runner/process.py @@ -0,0 +1,316 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import shlex +import subprocess +import sys +import threading +import time +import traceback +import unicodedata +from collections import deque +from pathlib import Path +from typing import Any + +from rich.live import Live +from rich.panel import Panel +from rich.text import Text + +# Create a translation table that maps all control characters to None for deletion in order to safely print subprocess output to the scrolling live window. +# This includes characters in the Unicode category 'Cc' (Control). +_control_chars = {c: None for c in range(sys.maxunicode) if unicodedata.category(chr(c)) == "Cc"} + + +def run_command_with_timeout( # noqa: PLR0913 + command: str, + timeout: int, + stdouterr_path: Path = Path("stdouterr.log"), + env: dict[str, str] | None = None, + run_id: str | None = None, + fancy: bool = True, + collapse_on_success: bool = True, +) -> dict[str, Any]: + """Run a shell command with an optional timeout, streaming output to a log file. + + If running in an interactive terminal, displays subprocess output in a live, scrolling window. + Otherwise, prints output to the console and saves it to a log file. + + Args: + command: The shell command to run (as a string or list of arguments). + timeout: Timeout (in seconds) to terminate the command. + stdouterr_path: Path to the file for writing combined stdout and stderr. + env: Optional dictionary of environment variables. + run_id: Optional run ID to identify the run. + fancy: If True, displays subprocess output in a live, scrolling window. + collapse_on_success: If True and command succeeds, collapses live window output (only for fancy=True and interactive mode). + + Returns: + dict: Contains 'returncode' and 'timed_out' fields. 
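+
+    Example (illustrative command and run ID):
+        result = run_command_with_timeout("python my_benchmark.py", timeout=3600, run_id="demo")
+        if result["timed_out"]:
+            ...  # returncode is 124 in this case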
+ """ + cmd_list = command if isinstance(command, list) else shlex.split(command) + + if sys.stdout.isatty() and fancy: + return display_scrolling_subprocess( + cmd_list, + timeout=timeout, + stdouterr_path=stdouterr_path, + env=env, + window_height=6, + collapse_on_success=collapse_on_success, + run_id=run_id, + ) + else: + return display_simple_subprocess( + cmd_list, timeout=timeout, stdouterr_path=stdouterr_path, env=env, run_id=run_id + ) + + +def display_simple_subprocess( + cmd_list: list[str], + timeout: int, + stdouterr_path: Path = Path("stdouterr.log"), + env: dict[str, str] | None = None, + run_id: str | None = None, +) -> dict[str, Any]: + """Run a shell command with an optional timeout, streaming both stdout and stderr to a log file. + + This function runs the given command using subprocess, writes all combined output (stdout and stderr) + to the provided log file, and streams it live to sys.stdout. Output is processed line by line in a dedicated + thread to ensure real-time updates. If the command does not complete within the specified timeout, it is terminated + and, if necessary, force killed. In case of timeout, a message is written to both the log file and stdout. + + Args: + cmd_list: List of command arguments to execute. + timeout: Maximum allowed time in seconds before the process is terminated. + stdouterr_path: Destination file to save all subprocess output. + env: Optional dictionary of environment variables to use. + run_id: Optional run ID to identify the run. + + Returns: + dict: Contains 'returncode' (process exit code or 124 if timed out) and 'timed_out' (True if killed on timeout). + """ + return_code = 0 + timed_out = False + msg = "" + run_id_msg = f" for run ID: {run_id}" if run_id else "" + + with open(stdouterr_path, "w") as outfile: + start_time = time.time() + try: + process = subprocess.Popen( # noqa: S603 + cmd_list, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + text=True, + bufsize=1, + universal_newlines=True, + ) + + def reader() -> None: + """Reads process output line by line and updates both the file and stdout.""" + for line in process.stdout: + outfile.write(line) + outfile.flush() + sys.stdout.write(line) + sys.stdout.flush() + + reader_thread = threading.Thread(target=reader) + reader_thread.start() + reader_thread.join(timeout=timeout) + + if reader_thread.is_alive(): + # Timeout occurred + process.terminate() + try: + process.wait(timeout=1) # Give it a second to terminate gracefully + except subprocess.TimeoutExpired: + process.kill() # Force kill if it doesn't respond + + reader_thread.join() # Wait for the reader thread to finish + msg = f"\n--- Subprocess TIMED OUT after {timeout}s{run_id_msg} ---\n" + return_code = 124 + timed_out = True + + else: + # If here, the process completed within the timeout + return_code = process.wait() + timed_out = False + # Determine the final message based on success/failure + if return_code == 0: + msg = ( + f"\n--- Subprocess completed successfully in {time.time() - start_time:.2f}s{run_id_msg} ---\n" + ) + else: + msg = f"\n--- Subprocess failed (Exit Code: {return_code}){run_id_msg} ---\n" + + except Exception as e: # noqa: BLE001 + tb = traceback.format_exc() + msg = f"\n--- An error occurred:\n{e}\n{tb}{run_id_msg} ---\n" + + finally: + outfile.write(msg) + outfile.flush() + sys.stdout.write(msg) + sys.stdout.flush() + + return {"returncode": return_code, "timed_out": timed_out} + + +def display_scrolling_subprocess( # noqa: PLR0913,PLR0915 + cmd_list: list[str], + timeout: 
int, + stdouterr_path: Path = Path("stdouterr.log"), + env: dict[str, str] | None = None, + window_height: int = 6, + run_id: str | None = None, + collapse_on_success: bool = True, +) -> dict[str, Any]: + """ + Runs the given shell command in a subprocess, streaming combined stdout and stderr + both to a log file (stdouterr_path) and to a live scrolling window in the terminal. + The process output is displayed in a limited-height ("window_height") panel that updates live. + + If the process runs longer than 'timeout' seconds, it is terminated and the function + returns with a special timeout code. + + Args: + cmd_list (list[str]): Command and arguments to execute. + timeout (int): Timeout in seconds. + stdouterr_path (Path): Log file path to write stdout/stderr. + env (dict[str, str] | None): Environment variables for the subprocess. + window_height (int): Number of output lines to display in the live panel. + run_id (str | None): Optional run ID to identify the run. + collapse_on_success (bool): If True, collapse panel after successful completion. + + Returns: + dict: { + "returncode": int (the exit code, or 124 if timed out), + "timed_out": bool (True if timeout occurred) + } + """ + output_buffer = deque(maxlen=window_height) + return_code = 0 + timed_out = False + msg = "" + run_id_msg = f" for run ID: {run_id}" if run_id else "" + + with ( + Live(auto_refresh=False, vertical_overflow="visible") as live, + open(stdouterr_path, "w") as outfile, + ): + start_time = time.time() + final_panel = None + try: + process = subprocess.Popen( # noqa: S603 + cmd_list, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + text=True, + bufsize=1, + universal_newlines=True, + ) + + def reader() -> None: + """Reads process output line by line and updates both the file and live display.""" + last_line_not_blank = True + for line in process.stdout: + outfile.write(line) + outfile.flush() + # Filter out chars that might break the scrolling live window before adding to the buffer. + line = line.translate(_control_chars).strip() # noqa: PLW2901 + # Do not allow multiple blank lines, it's a waste of already limited space. 
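+                # For example, several consecutive blank output lines collapse to a single blank row
+                # in the panel; the unfiltered output is still written to the log file above.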
+ if line or last_line_not_blank: + output_buffer.append(line) + display_text = Text("\n".join(output_buffer), no_wrap=True) + panel = Panel( + display_text, + title=f"[bold blue]Subprocess Output{run_id_msg}, elapsed time: {time.time() - start_time:.2f}s[/]", + border_style="green", + height=window_height + 2, # +2 for top/bottom borders + ) + live.update(panel) + live.refresh() + last_line_not_blank = True if line else False # noqa: SIM210 + + reader_thread = threading.Thread(target=reader) + reader_thread.start() + reader_thread.join(timeout=timeout) + + if reader_thread.is_alive(): + # Timeout occurred + process.terminate() + try: + process.wait(timeout=1) # Give it a second to terminate gracefully + except subprocess.TimeoutExpired: + process.kill() # Force kill if it doesn't respond + + reader_thread.join() # Wait for the reader thread to finish + msg = f"Subprocess TIMED OUT after {timeout}s{run_id_msg}" + final_panel = Panel( + Text("\n".join(output_buffer), no_wrap=True), + title=f"[bold red]{msg}[/]", + border_style="red", + height=window_height + 2, + ) + timed_out = True + return_code = 124 + + else: + # If here, the process completed within the timeout + return_code = process.wait() + runtime = time.time() - start_time + timed_out = False + + # Determine the final state of the panel based on success/failure + if return_code == 0: + msg = f"Subprocess completed successfully in {runtime:.2f}s{run_id_msg}" + if collapse_on_success: + final_panel = Panel( + Text(msg), + title=f"[bold blue]{msg}[/]", + border_style="green", + height=3, # A smaller height for the collapsed view + ) + else: + final_panel = Panel( + Text("\n".join(output_buffer), no_wrap=True), + title=f"[bold blue]{msg}[/]", + border_style="green", + height=window_height + 2, + ) + else: + msg = f"Subprocess failed (Exit Code: {return_code}){run_id_msg}" + final_panel = Panel( + Text("\n".join(output_buffer), no_wrap=True), + title=f"[bold red]{msg}[/]", + border_style="red", + height=window_height + 2, + ) + + except Exception as e: # noqa: BLE001 + tb = traceback.format_exc() + msg = f"An error occurred:\n{e}\n{tb}{run_id_msg}" + final_panel = Panel(f"[bold red]{msg}[/]", title="[bold red]Error[/]") + + finally: + live.update(final_panel) + live.refresh() + outfile.write(f"\n--- {msg} ---\n") + outfile.flush() + + return {"returncode": return_code, "timed_out": timed_out} diff --git a/benchmarking/runner/ray_cluster.py b/benchmarking/runner/ray_cluster.py new file mode 100644 index 000000000..561456d11 --- /dev/null +++ b/benchmarking/runner/ray_cluster.py @@ -0,0 +1,232 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
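+
+# Helpers for managing a local Ray head node per benchmark entry: start_ray_head() launches it in a
+# short temp dir (to avoid Unix socket path length limits), verify_ray_responsive() polls `ray status`
+# until the cluster answers, and stop_ray_head() copies logs and session info to a destination
+# directory for debugging before deleting the temp dir.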
+ +import os +import shutil +import subprocess +import time +import uuid +from pathlib import Path + +from loguru import logger + +from nemo_curator.core.client import RayClient + +ray_client_start_timeout_s = 30 +ray_client_start_poll_interval_s = 0.5 + + +def setup_ray_cluster_and_env( + num_cpus: int, + num_gpus: int, + enable_object_spilling: bool, + ray_log_path: Path, +) -> tuple[RayClient, Path, dict[str, str]]: + """Setup Ray cluster and environment variables.""" + ray_client, ray_temp_path = start_ray_head( + num_cpus=num_cpus, + num_gpus=num_gpus, + enable_object_spilling=enable_object_spilling, + ray_log_path=ray_log_path, + ) + verify_ray_responsive(ray_client) + + env = os.environ.copy() + ray_address = f"localhost:{ray_client.ray_port}" + env["RAY_ADDRESS"] = ray_address + # Also set globally so Ray Data executor can find it + os.environ["RAY_ADDRESS"] = ray_address + logger.debug(f"Set RAY_ADDRESS={ray_address}") + + return ray_client, ray_temp_path, env + + +def teardown_ray_cluster_and_env( + ray_client: RayClient, + ray_temp_path: Path, + ray_cluster_path: Path, +) -> None: + """Teardown Ray cluster and environment variables.""" + if ray_client is not None: + stop_ray_head(ray_client, ray_temp_path, ray_cluster_path) + + # Clean up RAY_ADDRESS environment variable immediately after stopping cluster + if "RAY_ADDRESS" in os.environ: + del os.environ["RAY_ADDRESS"] + logger.debug("Cleaned up RAY_ADDRESS environment variable") + + +def start_ray_head( + num_cpus: int, + num_gpus: int, + include_dashboard: bool = True, + enable_object_spilling: bool = False, + ray_log_path: Path | None = None, +) -> tuple[RayClient, Path]: + # Create a short temp dir to avoid Unix socket path length limits + short_temp_path = Path(f"/tmp/ray_{uuid.uuid4().hex[:8]}") # noqa: S108 + short_temp_path.mkdir(parents=True, exist_ok=True) + + # Check environment variables that might interfere + ray_address_env = os.environ.get("RAY_ADDRESS") + if ray_address_env: + logger.warning(f"RAY_ADDRESS already set in environment: {ray_address_env}") + client = RayClient( + ray_temp_dir=str(short_temp_path), + include_dashboard=include_dashboard, + num_gpus=num_gpus, + num_cpus=num_cpus, + enable_object_spilling=enable_object_spilling, + ray_dashboard_host="0.0.0.0", # noqa: S104 + ) + # Redirect Ray startup output to log file if provided, otherwise suppress it + import sys + + if ray_log_path: + with open(ray_log_path, "w") as f: + # Save original stdout/stderr + original_stdout = sys.stdout + original_stderr = sys.stderr + try: + # Redirect to log file + sys.stdout = f + sys.stderr = f + client.start() + finally: + # Restore original stdout/stderr + sys.stdout = original_stdout + sys.stderr = original_stderr + else: + # Suppress Ray startup output by redirecting to devnull + with open(os.devnull, "w") as devnull: + original_stdout = sys.stdout + original_stderr = sys.stderr + try: + sys.stdout = devnull + sys.stderr = devnull + client.start() + finally: + sys.stdout = original_stdout + sys.stderr = original_stderr + # Wait for Ray client to start, no longer than timeout + wait_for_ray_client_start(client, ray_client_start_timeout_s, ray_client_start_poll_interval_s) + logger.debug(f"RayClient started successfully: pid={client.ray_process.pid}, port={client.ray_port}") + + return client, short_temp_path + + +def wait_for_ray_client_start(client: RayClient, timeout_s: int, poll_interval_s: float) -> None: + """Wait for Ray client to start, no longer than timeout.""" + elapsed_s = 0 + while client.ray_process 
is None and elapsed_s < timeout_s: + if client.ray_process is not None: + break + time.sleep(poll_interval_s) + elapsed_s += poll_interval_s + if client.ray_process is None: + msg = f"Ray client failed to start in {timeout_s} seconds" + raise RuntimeError(msg) + + +def verify_ray_responsive(client: RayClient, timeout_s: int = 15) -> None: + env = os.environ.copy() + env["RAY_ADDRESS"] = f"localhost:{client.ray_port}" + t0 = time.time() + while time.time() - t0 < timeout_s: + try: + subprocess.run( # noqa: S603 + ["ray", "status"], # noqa: S607 + check=True, + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except Exception: # noqa: BLE001, PERF203 + time.sleep(1) + else: + return + + msg = "Ray cluster did not become responsive in time" + raise TimeoutError(msg) + + +def _stop_ray_client(client: RayClient) -> None: + """Stop the Ray client and clean up environment variables.""" + try: + client.stop() + # Clean up RAY_ADDRESS environment variable to prevent interference + if "RAY_ADDRESS" in os.environ: + del os.environ["RAY_ADDRESS"] + logger.debug("Cleaned up RAY_ADDRESS environment variable") + except Exception: # noqa: BLE001 + logger.exception("Failed to stop Ray client") + + +def _copy_item_safely(src_path: Path, dst_path: Path) -> None: + """Copy a single file or directory, logging warnings on failure.""" + try: + if src_path.is_dir(): + shutil.copytree(src_path, dst_path, dirs_exist_ok=True) + else: + shutil.copy2(src_path, dst_path) + except Exception as e: # noqa: BLE001 + logger.warning(f"Failed to copy {src_path.name}: {e}") + + +def _copy_session_contents(session_src: Path, session_dst: Path) -> None: + """Copy session directory contents, excluding sockets.""" + session_dst.mkdir(parents=True, exist_ok=True) + + for item in session_src.iterdir(): + if item.name == "sockets": # Skip sockets directory + logger.debug("Skipping sockets directory") + continue + + dst_item = session_dst / item.name + _copy_item_safely(item, dst_item) + + +def _copy_ray_debug_artifacts(short_temp_path: Path, ray_destination_path: Path) -> None: + """Copy Ray debugging artifacts to the specified ray destination directory.""" + + if not short_temp_path.exists(): + return + + # Use the provided ray destination directory directly + ray_destination_path.mkdir(parents=True, exist_ok=True) + + # Copy log files from Ray temp dir + logs_src = short_temp_path / "logs" + if logs_src.exists(): + logs_dst = ray_destination_path / "logs" + shutil.copytree(logs_src, logs_dst, dirs_exist_ok=True, ignore_errors=True) + + # Copy session info but skip sockets directory + session_src = short_temp_path / "session_latest" + if session_src.exists(): + session_dst = ray_destination_path / "session_latest" + _copy_session_contents(session_src, session_dst) + + +def stop_ray_head(client: RayClient, ray_temp_path: Path, ray_destination_path: Path) -> None: + """Stop Ray head node and clean up artifacts.""" + # Stop the Ray client + _stop_ray_client(client) + + # Copy debugging artifacts and clean up temp directory + try: + _copy_ray_debug_artifacts(ray_temp_path, ray_destination_path) + shutil.rmtree(ray_temp_path, ignore_errors=True) + except Exception: # noqa: BLE001 + logger.exception("Failed to copy/remove Ray temp dir") diff --git a/benchmarking/runner/sinks/__init__.py b/benchmarking/runner/sinks/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/benchmarking/runner/sinks/__init__.py @@ -0,0 +1 @@ + diff --git a/benchmarking/runner/sinks/gdrive_sink.py 
b/benchmarking/runner/sinks/gdrive_sink.py new file mode 100644 index 000000000..121d839c0 --- /dev/null +++ b/benchmarking/runner/sinks/gdrive_sink.py @@ -0,0 +1,93 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import tarfile +import traceback +from pathlib import Path +from typing import Any + +# If this sink is not enabled this entire file will not be imported, so these +# dependencies are only needed if the user intends to enable/use this sink. +from loguru import logger +from oauth2client.service_account import ServiceAccountCredentials +from pydrive2.auth import GoogleAuth +from pydrive2.drive import GoogleDrive +from runner.matrix import MatrixConfig, MatrixEntry +from runner.sinks.sink import Sink + + +class GdriveSink(Sink): + def __init__(self, sink_config: dict[str, Any]): + super().__init__(sink_config) + self.sink_config = sink_config + self.enabled = self.sink_config.get("enabled", True) + self.results: list[dict[str, Any]] = [] + self.session_name: str = None + self.matrix_config: MatrixConfig = None + self.env_dict: dict[str, Any] = None + self.drive_folder_id: str = None + self.service_account_file: str = None + + def initialize(self, session_name: str, matrix_config: MatrixConfig, env_dict: dict[str, Any]) -> None: + self.session_name = session_name + self.matrix_config = matrix_config + self.env_dict = env_dict + self.drive_folder_id = self.sink_config.get("drive_folder_id") + if not self.drive_folder_id: + msg = "GdriveSink: No drive folder ID configured" + raise ValueError(msg) + self.service_account_file = self.sink_config.get("service_account_file") + if not self.service_account_file: + msg = "GdriveSink: No service account file configured" + raise ValueError(msg) + + def process_result(self, result_dict: dict[str, Any], matrix_entry: MatrixEntry) -> None: + pass + + def finalize(self) -> None: + if self.enabled: + try: + tar_path = self._tar_results_and_artifacts() + self._upload_to_gdrive(tar_path) + except Exception as e: # noqa: BLE001 + tb = traceback.format_exc() + logger.error(f"GdriveSink: Error uploading to Google Drive: {e}\n{tb}") + finally: + self._delete_tar_file(tar_path) + else: + logger.warning("GdriveSink: Not enabled, skipping post.") + + def _tar_results_and_artifacts(self) -> Path: + results_path = Path(self.matrix_config.results_path) + artifacts_path = Path(self.matrix_config.artifacts_dir) + tar_path = results_path / f"{self.session_name}.tar.gz" + with tarfile.open(tar_path, "w:gz") as tar: + tar.add(results_path, arcname=results_path.name) + tar.add(artifacts_path, arcname=artifacts_path.name) + return tar_path + + def _upload_to_gdrive(self, tar_path: Path) -> None: + gauth = GoogleAuth() + scope = ["https://www.googleapis.com/auth/drive"] + gauth.credentials = ServiceAccountCredentials.from_json_keyfile_name(self.service_account_file, scope) + drive = GoogleDrive(gauth) + + drive_file = drive.CreateFile({"parents": [{"id": self.drive_folder_id}], "title": tar_path.name}) + 
drive_file.SetContentFile(tar_path) + drive_file.Upload() + return drive_file["alternateLink"] + + def _delete_tar_file(self, tar_path: Path) -> None: + if tar_path.exists(): + tar_path.unlink() diff --git a/benchmarking/runner/sinks/mlflow_sink.py b/benchmarking/runner/sinks/mlflow_sink.py new file mode 100644 index 000000000..ffd634cf9 --- /dev/null +++ b/benchmarking/runner/sinks/mlflow_sink.py @@ -0,0 +1,66 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import traceback +from typing import Any + +from loguru import logger +from runner.matrix import MatrixConfig, MatrixEntry +from runner.sinks.sink import Sink + + +class MlflowSink(Sink): + def __init__(self, sink_config: dict[str, Any]): + super().__init__(sink_config) + self.sink_config = sink_config + self.tracking_uri = sink_config.get("tracking_uri") + if not self.tracking_uri: + msg = "MlflowSink: No tracking URI configured" + raise ValueError(msg) + self.experiment = sink_config.get("experiment") + if not self.experiment: + msg = "MlflowSink: No experiment configured" + raise ValueError(msg) + self.enabled = self.sink_config.get("enabled", True) + self.results: list[dict[str, Any]] = [] + self.session_name: str = None + self.matrix_config: MatrixConfig = None + self.env_dict: dict[str, Any] = None + + def initialize(self, session_name: str, matrix_config: MatrixConfig, env_dict: dict[str, Any]) -> None: + self.session_name = session_name + self.matrix_config = matrix_config + self.env_dict = env_dict + + def process_result(self, result_dict: dict[str, Any], matrix_entry: MatrixEntry) -> None: + # Use the matrix_entry to get any entry-specific settings for the Slack report + # such as additional metrics to include in the report. + if matrix_entry: + additional_metrics = matrix_entry.get_sink_data(self.name).get("additional_metrics", []) + else: + additional_metrics = [] + self.results.append((additional_metrics, result_dict)) + + def finalize(self) -> None: + if self.enabled: + try: + self._push(self.results) + except Exception as e: # noqa: BLE001 + tb = traceback.format_exc() + logger.error(f"MlflowSink: Error posting to Mlflow: {e}\n{tb}") + else: + logger.warning("MlflowSink: Not enabled, skipping post.") + + def _push(self, results: list[dict[str, Any]]) -> None: + pass diff --git a/benchmarking/runner/sinks/sink.py b/benchmarking/runner/sinks/sink.py new file mode 100644 index 000000000..219ae0cc0 --- /dev/null +++ b/benchmarking/runner/sinks/sink.py @@ -0,0 +1,57 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Any + +from runner.matrix import MatrixConfig, MatrixEntry + + +class Sink(ABC): + """Abstract base class for benchmark result sinks.""" + + @abstractmethod + def __init__(self, sink_config: dict[str, Any]): + """Initialize the sink with configuration. + + Args: + sink_config: Configuration dictionary for the sink. + """ + + @abstractmethod + def initialize( + self, + session_name: str, + matrix_config: MatrixConfig, + env_dict: dict[str, Any], + ) -> None: + """Initialize the sink for a benchmark session. + + Args: + session_name: Name of the benchmark session. + matrix_config: Matrix configuration for the session. + env_dict: Environment dictionary for the session. + """ + + @abstractmethod + def process_result(self, result_dict: dict[str, Any], matrix_entry: MatrixEntry) -> None: + """Process an individual benchmark result. + + Args: + result_dict: Dictionary containing benchmark result data. + matrix_entry: Matrix entry configuration. + """ + + @abstractmethod + def finalize(self) -> None: + """Finalize the sink after all results have been processed.""" diff --git a/benchmarking/runner/sinks/slack_sink.py b/benchmarking/runner/sinks/slack_sink.py new file mode 100644 index 000000000..fac174dc7 --- /dev/null +++ b/benchmarking/runner/sinks/slack_sink.py @@ -0,0 +1,290 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import re +import traceback +from collections.abc import Generator +from typing import Any + +import requests +from loguru import logger +from runner.matrix import MatrixConfig, MatrixEntry +from runner.sinks.sink import Sink +from runner.utils import find_result, get_obj_for_json + +_post_template = """ +{ + "username": "Curator Benchmark Runner", + "icon_emoji": ":robot_face:", + "blocks": [ + { + "type": "header", + "text": { + "type": "plain_text", + "text": "Curator Benchmark Summary" + } + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "$EXECUTIVE_SUMMARY" + } + }, + { + "type": "divider" + }, + $REPORT_JSON_TEXT, + { + "type": "actions", + "elements": [ + { + "type": "button", + "text": { + "type": "plain_text", + "text": "Logs" + }, + "url": "$GOOGLE_DRIVE_LINK" + } + ] + } + ] +} +""" +_blank_row = [ + {"type": "rich_text", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": " "}]}]}, + {"type": "rich_text", "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": " "}]}]}, +] + + +class SlackSink(Sink): + name: str = "slack" + + def __init__(self, sink_config: dict[str, Any]): + super().__init__(sink_config) + self.sink_config = sink_config + self.enabled = self.sink_config.get("enabled", True) + self.session_name: str = None + self.matrix_config: MatrixConfig = None + self.env_dict: dict[str, Any] = None + + self.results_to_report: list[tuple[list[str], dict[str, Any]]] = [] # list of tuples of (metrics, result_dict) + self.webhook_url = sink_config.get("webhook_url") + if not self.webhook_url: + msg = "SlackSink: No webhook URL configured" + raise ValueError(msg) + self.default_metrics = sink_config.get("default_metrics", []) + if not self.default_metrics: + msg = "SlackSink: No default metrics configured" + raise ValueError(msg) + + def initialize(self, session_name: str, matrix_config: MatrixConfig, env_dict: dict[str, Any]) -> None: + # Initializes the sink for the session. + self.session_name = session_name + self.env_dict = env_dict + self.matrix_config = matrix_config + + def process_result(self, result_dict: dict[str, Any], matrix_entry: MatrixEntry) -> None: + # Use the matrix_entry to get any entry-specific settings for the Slack report + # such as additional metrics to include in the report. + if matrix_entry: + additional_metrics = matrix_entry.get_sink_data(self.name).get("additional_metrics", []) + else: + additional_metrics = [] + # Queues the individual result for posting as a final report during finalize. + self.results_to_report.append((self.default_metrics + additional_metrics, result_dict)) + + def finalize(self) -> None: + # Posts the queued results to slack as a final report. + if self.enabled: + try: + self._post() + except Exception as e: # noqa: BLE001 + # Optionally, log or handle posting errors + tb = traceback.format_exc() + logger.error(f"SlackSink: Error posting to Slack: {e}\n{tb}") + else: + logger.warning("SlackSink: Not enabled, skipping post.") + + def _post(self) -> None: # noqa: C901 + message_text_values = { + "REPORT_JSON_TEXT": "REPORT_JSON_TEXT", + "GOOGLE_DRIVE_LINK": "https://google.com", + "EXECUTIVE_SUMMARY": " ", + } + + # Create REPORT_JSON_TEXT: Build the report data as a Python data structure which maps to JSON, + # then call json.dumps() to convert to a string. 
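+        # Each report row is a two-element list of Slack "rich_text" blocks (see _two_column_row and
+        # _two_column_row_bold below), e.g. rendering roughly as | OVERALL STATUS | ✅ success |.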
+ report_data = [] + table_dict = {"type": "table", "rows": []} + rows = [] + # Summary rows - list overall status, each individual entry and its success status + overall_status = ( + "✅ success" + if all(find_result(results, "success") for _, results in self.results_to_report) + else "❌ one or more FAILED" + ) + rows.append(self._two_column_row_bold("OVERALL STATUS", overall_status)) + for _, results in self.results_to_report: + # Name and success icon row + entry_name = find_result(results, "name") + success_str = "✅ success" if find_result(results, "success") else "❌ FAILED" + rows.append(self._two_column_row_bold(entry_name, success_str)) + + rows.append(_blank_row) + + # Environment header row + rows.append(self._two_column_row_bold("ENVIRONMENT", " ")) + # Environment rows + for var, val in self.env_dict.items(): + if var in {"pip_freeze_txt", "conda_explicit_txt"}: + continue + rows.append(self._two_column_row(str(var), str(val))) + + rows.append(_blank_row) + # Results header row + rows.append(self._two_column_row_bold("RESULTS", " ")) + # Results rows + for metrics, results in self.results_to_report: + # Name and success icon row + entry_name = find_result(results, "name") + success_str = "✅ success" if find_result(results, "success") else "❌ FAILED" + rows.append(self._two_column_row_bold(entry_name, success_str)) + + # Remaining rows are metrics and values + data = [] + for metric in metrics: + data.append((metric, find_result(results, metric, 0))) + + # Requirements checks - add a row for each requirement that was not met + if "requirements_not_met" in results: + all_requirements_met = True + for metric_name, reason_not_met in results["requirements_not_met"].items(): + data.append((f"Requirement for {metric_name} was not met", f"{reason_not_met}")) + all_requirements_met = False + if all_requirements_met: + data.append(("All requirements met", "✅")) + else: + data.append(("All requirements met", "❌")) + + for var, val in data: + rows.append(self._two_column_row(str(var), str(val))) + # Add a blank row between entry results + rows.append(_blank_row) + + # Remove the last blank row added in the loop above + if len(self.results_to_report) > 0: + rows.pop(-1) + + table_dict["rows"] = rows + report_data.append(table_dict) + # Add a comma to separate each item to be added to the "blocks" array in the template. + message_text_values["REPORT_JSON_TEXT"] = ",".join( + [json.dumps(get_obj_for_json(item), indent=2, sort_keys=True) for item in report_data] + ) + + payload = self.substitute_template_placeholders(_post_template, message_text_values).strip() + response = requests.post( + self.webhook_url, + data=payload, + headers={"Content-Type": "application/json"}, + timeout=100, + ) + if not response.ok: + logger.error(f"SlackSink: Failed to send Slack message (status={response.status_code}): {response.text}") + + @staticmethod + def substitute_template_placeholders(template_str: str, values: dict[str, str]) -> str: + """ + Substitute variables in template_str of the form $VAR with values from the dictionary { "VAR": ... }. + The variables to substitute are those in _post_template above, and must occur as $VAR in the string. 
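+
+        Example (illustrative link):
+            substitute_template_placeholders("Logs: $GOOGLE_DRIVE_LINK", {"GOOGLE_DRIVE_LINK": "https://example.com/x"})
+            returns "Logs: https://example.com/x"; variables with no value in the dict are left as-is.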
+ """ + + def replacer(match: re.Match[str]) -> str: + var_with_dollar = match.group(0) + varname = var_with_dollar[1:] # strip initial $ + return str(values.get(varname, var_with_dollar)) + + # Substitute variables matching $VAR + return re.sub(r"\$[A-Za-z0-9_]+", replacer, template_str) + + @staticmethod + def _two_column_row(left_text: str, right_text: str) -> list[dict[str, Any]]: + return [ + { + "type": "rich_text", + "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": left_text}]}], + }, + { + "type": "rich_text", + "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": right_text}]}], + }, + ] + + @staticmethod + def _two_column_row_bold(left_text: str, right_text: str) -> list[dict[str, Any]]: + return [ + { + "type": "rich_text", + "elements": [ + { + "type": "rich_text_section", + "elements": [{"type": "text", "text": left_text, "style": {"bold": True}}], + } + ], + }, + { + "type": "rich_text", + "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": right_text}]}], + }, + ] + + +# Run SlackSink from the command line to post a summary of the results to Slack. +if __name__ == "__main__": + import argparse + from pathlib import Path + + parser = argparse.ArgumentParser(description="Post benchmark results to Slack via webhook.") + parser.add_argument("--webhook-url", help="Slack webhook URL") + parser.add_argument("--results-root-dir", help="Path to the directory containing result subdirectories") + parser.add_argument("--additional-metrics", help="Additional metrics to include in the report", nargs="+") + args = parser.parse_args() + + webhook_url = args.webhook_url + results_root_path = Path(args.results_root_dir) + + def collect_results_from_dir(results_root_path: Path) -> Generator[dict[str, Any], None, None]: + """Generator: yields dicts loaded from results.json files in subdirectories.""" + for subdir in results_root_path.iterdir(): + if (subdir / "results.json").exists(): + results_json_path = subdir / "results.json" + with open(results_json_path) as f: + yield json.load(f) + + sink_config = {"webhook_url": webhook_url, "default_metrics": ["exec_time_s"]} + matrix_config = MatrixConfig(results_path=results_root_path, artifacts_path=results_root_path) + env_json_path = results_root_path / "env.json" + with open(env_json_path) as f: + env_data = json.load(f) + + slack_sink = SlackSink(sink_config=sink_config) + slack_sink.initialize(session_name="test", matrix_config=matrix_config, env_dict=env_data) + + matrix_entry = MatrixEntry( + name="test", sink_data=[{"name": "slack", "additional_metrics": args.additional_metrics}] + ) + for result in collect_results_from_dir(results_root_path): + slack_sink.process_result(result_dict=result, matrix_entry=matrix_entry) + slack_sink.finalize() diff --git a/benchmarking/runner/utils.py b/benchmarking/runner/utils.py new file mode 100644 index 000000000..2b2737b89 --- /dev/null +++ b/benchmarking/runner/utils.py @@ -0,0 +1,80 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +from typing import Any + + +def get_obj_for_json(obj: object) -> str | int | float | bool | list | dict: + """ + Recursively convert objects to Python primitives for JSON serialization. + Useful for objects like Path, sets, bytes, etc. + """ + if isinstance(obj, dict): + retval = {get_obj_for_json(k): get_obj_for_json(v) for k, v in obj.items()} + elif isinstance(obj, (list, tuple, set)): + retval = [get_obj_for_json(item) for item in obj] + elif hasattr(obj, "as_posix"): # Path objects + retval = obj.as_posix() + elif isinstance(obj, bytes): + retval = obj.decode("utf-8", errors="replace") + elif hasattr(obj, "to_json") and callable(obj.to_json): + retval = obj.to_json() + elif hasattr(obj, "__dict__"): + retval = get_obj_for_json(vars(obj)) + elif obj is None: + retval = "null" + elif isinstance(obj, str) and len(obj) == 0: # special case for Slack, empty strings not allowed + retval = " " + else: + retval = obj + return retval + + +_env_var_pattern = re.compile(r"\$\{([^}]+)\}") # Pattern to match ${VAR_NAME} + + +def _replace_env_var(match: re.Match[str]) -> str: + env_var_name = match.group(1) + env_value = os.getenv(env_var_name) + if env_value is not None and env_value != "": + return env_value + else: + msg = f"Environment variable {env_var_name} not found in the environment or is empty" + raise ValueError(msg) + + +def resolve_env_vars(data: dict | list | str | object) -> dict | list | str | object: + """Recursively resolve environment variables in strings in/from various objects. + + Environment variables are identified in strings when specified using the ${VAR_NAME} + syntax. If the environment variable is not found, ValueError is raised. + """ + if isinstance(data, dict): + return {key: resolve_env_vars(value) for key, value in data.items()} + elif isinstance(data, list): + return [resolve_env_vars(item) for item in data] + elif isinstance(data, str): + return _env_var_pattern.sub(_replace_env_var, data) + else: + return data + + +def find_result(results: dict[str, Any], key: str, default_value: Any = None) -> Any: # noqa: ANN401 + """Find a value in the results dictionary by key, checking both the metrics sub-dict and then the results itself.""" + if "metrics" in results: + return results["metrics"].get(key, results.get(key, default_value)) + else: + return results.get(key, default_value) diff --git a/benchmarking/scripts/__init__.py b/benchmarking/scripts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarking/scripts/common_crawl_benchmark.py b/benchmarking/scripts/common_crawl_benchmark.py new file mode 100755 index 000000000..14fca5ead --- /dev/null +++ b/benchmarking/scripts/common_crawl_benchmark.py @@ -0,0 +1,205 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Common Crawl download+extract benchmark for nightly benchmarking. 
+ +Runs the text Common Crawl pipeline and writes params/metrics/tasks to the +benchmark results directory, compatible with the nightly driver. +""" + +import argparse +import json +import os +import pickle +import time +from pathlib import Path +from typing import Literal + +from loguru import logger + +from nemo_curator.pipeline.pipeline import Pipeline +from nemo_curator.stages.text.download.common_crawl.stage import CommonCrawlDownloadExtractStage +from nemo_curator.stages.text.io.writer import JsonlWriter, ParquetWriter +from nemo_curator.tasks.tasks import _EmptyTask + + +def create_common_crawl_pipeline( # noqa: PLR0913 + download_dir: Path, + output_dir: Path, + output_format: Literal["parquet", "jsonl"], + crawl_type: Literal["main", "news"], + start_snapshot: str, + end_snapshot: str, + html_extraction_algorithm: str = "justext", + use_aws_to_download: bool = False, + verbose: bool = False, + url_limit: int | None = None, + record_limit: int | None = None, + add_filename_column: bool = False, + ray_data_cast_as_actor: bool = False, +) -> Pipeline: + if ray_data_cast_as_actor: + os.environ["CAST_AS_ACTOR"] = "true" + + pipeline = Pipeline(name="common_crawl_processing", description="Download and process Common Crawl data") + + pipeline.add_stage( + CommonCrawlDownloadExtractStage( + start_snapshot=start_snapshot, + end_snapshot=end_snapshot, + download_dir=str(download_dir), + crawl_type=crawl_type, + html_extraction=html_extraction_algorithm, + use_aws_to_download=use_aws_to_download, + verbose=verbose, + url_limit=url_limit, + record_limit=record_limit, + add_filename_column=add_filename_column, + ) + ) + + if output_format == "jsonl": + writer = JsonlWriter(path=str(output_dir)) + elif output_format == "parquet": + writer = ParquetWriter(path=str(output_dir)) + else: + msg = f"Invalid output format: {output_format}" + raise ValueError(msg) + + pipeline.add_stage(writer) + + return pipeline + + +def run_benchmark(args: argparse.Namespace) -> dict: + download_dir = Path(args.download_path).resolve() + download_dir.mkdir(exist_ok=True, parents=True) + + output_dir = Path(args.output_path).resolve() + output_dir.mkdir(exist_ok=True, parents=True) + + pipeline = create_common_crawl_pipeline( + download_dir=download_dir, + output_dir=output_dir, + output_format=args.output_format, + crawl_type=args.crawl_type, + start_snapshot=args.start_snapshot, + end_snapshot=args.end_snapshot, + html_extraction_algorithm=args.html_extraction, + use_aws_to_download=args.aws, + verbose=args.verbose, + url_limit=args.url_limit, + record_limit=args.record_limit, + add_filename_column=args.add_filename_column, + ray_data_cast_as_actor=args.ray_data_cast_as_actor, + ) + + if args.executor == "xenna": + from nemo_curator.backends.xenna.executor import XennaExecutor + + executor = XennaExecutor() + elif args.executor == "ray_data": + from nemo_curator.backends.experimental.ray_data.executor import RayDataExecutor + + executor = RayDataExecutor() + elif args.executor == "ray_actors": + from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor + + executor = RayActorPoolExecutor() + else: + msg = f"Invalid executor type: {args.executor}" + raise ValueError(msg) + + initial_task = _EmptyTask(task_id="common_crawl_task", dataset_name="common_crawl", data=None) + + logger.info("Starting Common Crawl pipeline execution...") + start = time.perf_counter() + try: + results = pipeline.run(executor, initial_tasks=[initial_task]) + success = True + except Exception as e: # noqa: 
BLE001 + logger.error(f"Pipeline failed: {e}") + results = [] + success = False + elapsed = time.perf_counter() - start + + total_documents = sum(task.num_items for task in results) if results else 0 + + return { + "params": { + "download_path": str(download_dir), + "output_path": str(output_dir), + "output_format": args.output_format, + "crawl_type": args.crawl_type, + "start_snapshot": args.start_snapshot, + "end_snapshot": args.end_snapshot, + "html_extraction": args.html_extraction, + "aws": args.aws, + "verbose": args.verbose, + "url_limit": args.url_limit, + "record_limit": args.record_limit, + "add_filename_column": args.add_filename_column, + "ray_data_cast_as_actor": args.ray_data_cast_as_actor, + "executor": args.executor, + }, + "metrics": { + "is_success": success, + "time_taken_s": elapsed, + "num_output_tasks": len(results) if results else 0, + "total_documents": total_documents, + }, + "tasks": results or [], + } + + +def write_results(benchmark_results_path: str, results: dict) -> None: + out = Path(benchmark_results_path) + out.mkdir(parents=True, exist_ok=True) + with open(out / "params.json", "w") as f: + json.dump(results["params"], f, indent=2) + with open(out / "metrics.json", "w") as f: + json.dump(results["metrics"], f, indent=2) + with open(out / "tasks.pkl", "wb") as f: + pickle.dump(results["tasks"], f) + + +def main() -> int: + p = argparse.ArgumentParser(description="Common Crawl download/extract benchmark") + # Contract arg for nightly driver + p.add_argument("--benchmark-results-path", required=True, help="Directory to write benchmark artifacts") + # Pipeline configuration + p.add_argument("--download_path", type=str, default="./common_crawl_downloads") + p.add_argument("--output_path", type=str, default="./common_crawl_output") + p.add_argument("--output_format", type=str, default="parquet", choices=["parquet", "jsonl"]) + p.add_argument("--crawl_type", type=str, default="main", choices=["main", "news"]) + p.add_argument("--start_snapshot", type=str, default="2023-01") + p.add_argument("--end_snapshot", type=str, default="2023-10") + p.add_argument("--html_extraction", type=str, default="justext", choices=["justext", "resiliparse", "trafilatura"]) + p.add_argument("--aws", action="store_true") + p.add_argument("--verbose", action="store_true") + p.add_argument("--url_limit", type=int, default=5) + p.add_argument("--record_limit", type=int, default=None) + p.add_argument("--add_filename_column", action="store_true") + # Executor selection + p.add_argument("--executor", type=str, default="xenna", choices=["xenna", "ray_data", "ray_actors"]) + p.add_argument("--ray_data_cast_as_actor", action="store_true") + + args = p.parse_args() + results = run_benchmark(args) + write_results(args.benchmark_results_path, results) + return 0 if results["metrics"]["is_success"] else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/benchmarking/scripts/domain_classification_benchmark.py b/benchmarking/scripts/domain_classification_benchmark.py new file mode 100755 index 000000000..76a17a728 --- /dev/null +++ b/benchmarking/scripts/domain_classification_benchmark.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Domain classification benchmarking script. + +This script runs domain classification benchmarks with comprehensive metrics collection +using various executors and logs results to configured sinks. +""" +# ruff: noqa: ERA001 + +import argparse +import json +import pickle +import time +import traceback +from pathlib import Path +from typing import Any + +from loguru import logger + +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.backends.xenna import XennaExecutor +from nemo_curator.pipeline import Pipeline +from nemo_curator.stages.text.classifiers import DomainClassifier +from nemo_curator.stages.text.io.reader import ParquetReader +from nemo_curator.stages.text.io.writer import ParquetWriter +from nemo_curator.utils.file_utils import get_all_file_paths_and_size_under + +_executor_map = {"ray_data": RayDataExecutor, "xenna": XennaExecutor} + + +def run_domain_classification_benchmark( # noqa: PLR0913 + input_path: Path, + output_path: Path, + executor_name: str, + dataset_size_gb: float, + model_inference_batch_size: int, + benchmark_results_path: Path, +) -> dict[str, Any]: + """Run the domain classification benchmark and collect comprehensive metrics.""" + + # Setup executor + try: + executor = _executor_map[executor_name]() + except KeyError: + msg = f"Executor {executor_name} not supported" + raise ValueError(msg) from None + + # Ensure output directory + output_path = output_path.absolute() + output_path.mkdir(parents=True, exist_ok=True) + + logger.info(f"Input path: {input_path}") + logger.info(f"Output path: {output_path}") + logger.info(f"Dataset size: {dataset_size_gb} GB") + logger.info(f"Batch size: {model_inference_batch_size}") + logger.debug(f"Executor: {executor}") + + run_start_time = time.perf_counter() + + try: + logger.info("Running domain classification pipeline...") + + input_files = load_dataset_files(input_path, dataset_size_gb) + + executor = RayDataExecutor() if executor_name == "ray_data" else XennaExecutor() + + pipeline = Pipeline( + name="domain_classification_pipeline", + stages=[ + ParquetReader(file_paths=input_files, files_per_partition=1, fields=["text"], _generate_ids=False), + DomainClassifier( + text_field="text", + model_inference_batch_size=model_inference_batch_size, + ), + ParquetWriter(path=str(output_path), fields=["domain_pred"]), + ], + ) + output_tasks = pipeline.run(executor) + run_time_taken = time.perf_counter() - run_start_time + + # task._metadata is a dictionary of metadata for the task, but will not be used here. + # Instead simply use the num_items property of the task to get the number of documents processed. + # TODO: can we get the number of domains classified? 
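+        # num_items gives the document count per output task; throughput_docs_per_sec below is derived
+        # from it (documents / run_time_taken), which is the kind of metric a YAML requirements entry
+        # (e.g. "- metric: throughput_docs_per_sec" with "min_value: ...") can gate on.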
+ num_documents_processed = sum(task.num_items for task in output_tasks) + # num_domains_classified = 0 + + run_time_taken = time.perf_counter() - run_start_time + logger.success(f"Benchmark completed in {run_time_taken:.2f}s") + logger.success(f"Processed {num_documents_processed} documents") + success = True + + except Exception as e: # noqa: BLE001 + error_traceback = traceback.format_exc() + logger.error(f"Benchmark failed: {e}") + logger.debug(f"Full traceback:\n{error_traceback}") + output_tasks = [] + run_time_taken = time.perf_counter() - run_start_time + num_documents_processed = 0 + # num_domains_classified = 0 + success = False + + return { + "params": { + "executor": executor_name, + "input_path": str(input_path), + "output_path": str(output_path), + "dataset_size_gb": dataset_size_gb, + "model_inference_batch_size": model_inference_batch_size, + "benchmark_results_path": str(benchmark_results_path), + }, + "metrics": { + "is_success": success, + "time_taken": run_time_taken, + "num_documents_processed": num_documents_processed, + # "num_domains_classified": num_domains_classified, + "num_output_tasks": len(output_tasks), + "throughput_docs_per_sec": num_documents_processed / run_time_taken if run_time_taken > 0 else 0, + }, + "tasks": output_tasks, + } + + +def write_results(results: dict[str, Any], output_path: Path) -> None: + """Write results to files required by the benchmarking framework at the given path.""" + output_path.mkdir(parents=True, exist_ok=True) + (output_path / "params.json").write_text(json.dumps(results["params"], indent=2)) + (output_path / "metrics.json").write_text(json.dumps(results["metrics"], indent=2)) + (output_path / "tasks.pkl").write_bytes(pickle.dumps(results["tasks"])) + + +def load_dataset_files(dataset_path: Path, dataset_size_gb: float) -> list[str]: + """Load the dataset files at the given path and return a subset of the files whose combined size is approximately the given size in GB.""" + input_files = get_all_file_paths_and_size_under( + dataset_path, recurse_subdirectories=True, keep_extensions="parquet" + ) + desired_size_bytes = (1024**3) * dataset_size_gb + total_size = 0 + subset_files = [] + for file, size in input_files: + if size + total_size > desired_size_bytes: + break + else: + subset_files.append(file) + total_size += size + + return subset_files + + +def main() -> int: + parser = argparse.ArgumentParser(description="Domain classification benchmark") + # Paths + parser.add_argument("--benchmark-results-path", type=Path, required=True, help="Path to benchmark results") + parser.add_argument("--input-path", required=True, type=Path, help="Path to input data") + parser.add_argument( + "--output-path", default=Path("./domain_classification_output"), type=Path, help="Output directory for results" + ) + # Executor + parser.add_argument("--executor", default="ray_data", choices=["xenna", "ray_data"], help="Executor to use") + # Pipeline Specific + parser.add_argument("--dataset-size-gb", type=float, required=True, help="Size of dataset to process in GB") + parser.add_argument("--model-inference-batch-size", type=int, default=1024, help="Batch size for model inference") + + args = parser.parse_args() + + logger.info("=== Domain Classification Benchmark Starting ===") + logger.info(f"Arguments: {vars(args)}") + + try: + results = run_domain_classification_benchmark( + input_path=args.input_path, + output_path=args.output_path, + executor_name=args.executor, + dataset_size_gb=args.dataset_size_gb, + 
            model_inference_batch_size=args.model_inference_batch_size,
+            benchmark_results_path=args.benchmark_results_path,
+        )
+
+    except Exception as e:  # noqa: BLE001
+        error_traceback = traceback.format_exc()
+        print(f"Benchmark failed: {e}")
+        logger.debug(f"Full traceback:\n{error_traceback}")
+        results = {
+            "params": vars(args),
+            "metrics": {
+                "is_success": False,
+            },
+            "tasks": [],
+        }
+    finally:
+        write_results(results, args.benchmark_results_path)
+
+    # Return proper exit code based on success
+    return 0 if results["metrics"]["is_success"] else 1
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/benchmarking/scripts/dummy_benchmark.py b/benchmarking/scripts/dummy_benchmark.py
new file mode 100755
index 000000000..28d7e71e0
--- /dev/null
+++ b/benchmarking/scripts/dummy_benchmark.py
@@ -0,0 +1,129 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Dummy benchmarking script for testing the benchmarking framework.
+
+This script runs a dummy benchmark with comprehensive metrics collection
+and logs results to configured sinks.
+"""
+
+import argparse
+import json
+import os
+import pickle
+import random
+import time
+from pathlib import Path
+from typing import Any
+
+from loguru import logger
+
+
+def run_demo_benchmark(
+    input_path: str,
+    output_path: str,
+    benchmark_results_path: str,
+) -> dict[str, Any]:
+    """Run the demo benchmark and collect comprehensive metrics."""
+
+    # Ensure output directory
+    Path(output_path).mkdir(parents=True, exist_ok=True)
+
+    logger.info("Starting dummy benchmark")
+    run_start_time = time.perf_counter()
+
+    try:
+        for _ in range(11):
+            print("Output: " + "*" * random.randint(1, 100), flush=True)  # noqa: S311
+            time.sleep(1)
+        output_tasks = []
+        run_time_taken = time.perf_counter() - run_start_time
+        num_removed = 0
+        logger.success(f"Benchmark completed in {run_time_taken:.2f}s")
+        success = True
+    except Exception as e:  # noqa: BLE001
+        logger.error(f"Benchmark failed: {e}")
+        output_tasks = []
+        run_time_taken = time.perf_counter() - run_start_time
+        num_removed = 0
+        success = False
+
+    return {
+        "params": {
+            "input_path": input_path,
+            "output_path": output_path,
+            "benchmark_results_path": benchmark_results_path,
+        },
+        "metrics": {
+            "is_success": success,
+            "time_taken": run_time_taken,
+            "num_removed": num_removed,
+            "num_output_tasks": len(output_tasks),
+        },
+        "tasks": output_tasks,
+    }
+
+
+# TODO: since the benchmarking framework depends on these files being present for all benchmark scripts,
+# the framework should provide a utility (essentially, this function) to ensure they are written correctly
+# with the correct names, paths, etc.
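A framework-provided helper along the lines of the `write_results` function below could standardize this. The following is only a sketch: `write_standard_results` is a hypothetical name, and the helper is not part of this patch; the three file names come from the scripts in this directory.

```python
# Hypothetical framework helper (name assumed, not part of this patch): every
# benchmark script writes the same three files, so the framework could own this.
import json
import pickle
from pathlib import Path
from typing import Any


def write_standard_results(results: dict[str, Any], benchmark_results_path: str | Path) -> None:
    """Write params.json, metrics.json, and tasks.pkl where the framework expects them."""
    out = Path(benchmark_results_path)
    out.mkdir(parents=True, exist_ok=True)
    (out / "params.json").write_text(json.dumps(results.get("params", {}), indent=2))
    (out / "metrics.json").write_text(json.dumps(results.get("metrics", {}), indent=2))
    (out / "tasks.pkl").write_bytes(pickle.dumps(results.get("tasks", [])))
```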
+def write_results(results: dict[str, Any], output_path: str) -> None:
+    """Write results to files required by the benchmarking framework at the given path."""
+    Path(output_path).mkdir(parents=True, exist_ok=True)
+    with open(os.path.join(output_path, "params.json"), "w") as f:
+        json.dump(results["params"], f, indent=2)
+    with open(os.path.join(output_path, "metrics.json"), "w") as f:
+        json.dump(results["metrics"], f, indent=2)
+    with open(os.path.join(output_path, "tasks.pkl"), "wb") as f:
+        pickle.dump(results["tasks"], f)
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description="Demo benchmark for nightly benchmarking")
+    # Paths
+    parser.add_argument("--input-path", required=True, help="Path to input data")
+    parser.add_argument("--output-path", required=True, help="Output directory for results")
+    # TODO: the framework will always add this! Look into if this policy should be removed.
+    parser.add_argument("--benchmark-results-path", required=True, help="Path to benchmark results")
+
+    args = parser.parse_args()
+
+    logger.info("=== Dummy Benchmark Starting ===")
+    logger.info(f"Arguments: {vars(args)}")
+
+    try:
+        results = run_demo_benchmark(
+            input_path=args.input_path,
+            output_path=args.output_path,
+            benchmark_results_path=args.benchmark_results_path,
+        )
+
+    except Exception as e:  # noqa: BLE001
+        print(f"Benchmark failed: {e}")
+        results = {
+            "params": vars(args),
+            "metrics": {
+                "is_success": False,
+            },
+            "tasks": [],
+        }
+    finally:
+        write_results(results, args.benchmark_results_path)
+
+    # Return proper exit code based on success
+    return 0 if results["metrics"]["is_success"] else 1
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/benchmarking/scripts/embedding_generation_benchmark.py b/benchmarking/scripts/embedding_generation_benchmark.py
new file mode 100755
index 000000000..3b2cc6635
--- /dev/null
+++ b/benchmarking/scripts/embedding_generation_benchmark.py
@@ -0,0 +1,228 @@
+#!/usr/bin/env python3
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Embedding generation benchmarking script.
+
+This script runs embedding generation benchmarks with comprehensive metrics collection
+using various executors and logs results to configured sinks.
+""" +# ruff: noqa: ERA001 + +import argparse +import json +import pickle +import time +import traceback +from pathlib import Path +from typing import Any + +from loguru import logger + +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.backends.xenna import XennaExecutor +from nemo_curator.pipeline import Pipeline +from nemo_curator.stages.text.embedders import EmbeddingCreatorStage +from nemo_curator.stages.text.io.reader import ParquetReader +from nemo_curator.stages.text.io.writer import ParquetWriter +from nemo_curator.utils.file_utils import get_all_file_paths_and_size_under + +_executor_map = {"ray_data": RayDataExecutor, "xenna": XennaExecutor} + + +def run_embedding_generation_benchmark( # noqa: PLR0913 + input_path: Path, + output_path: Path, + executor_name: str, + dataset_size_gb: float, + model_identifier: str, + model_inference_batch_size: int, + benchmark_results_path: Path, +) -> dict[str, Any]: + """Run the embedding generation benchmark and collect comprehensive metrics.""" + + # Setup executor + try: + executor = _executor_map[executor_name]() + except KeyError: + msg = f"Executor {executor_name} not supported" + raise ValueError(msg) from None + + # Ensure output directory + output_path = output_path.absolute() + output_path.mkdir(parents=True, exist_ok=True) + + logger.info("Starting embedding generation benchmark") + logger.info(f"Input path: {input_path}") + logger.info(f"Output path: {output_path}") + logger.info(f"Dataset size: {dataset_size_gb} GB") + logger.info(f"Model: {model_identifier}") + logger.info(f"Batch size: {model_inference_batch_size}") + logger.debug(f"Executor: {executor}") + + run_start_time = time.perf_counter() + + try: + logger.info("Running embedding generation pipeline...") + + input_files = load_dataset_files(input_path, dataset_size_gb) + + executor = RayDataExecutor() if executor_name == "ray_data" else XennaExecutor() + + pipeline = Pipeline( + name="embedding_generation_pipeline", + stages=[ + ParquetReader(file_paths=input_files, files_per_partition=1, fields=["text"], _generate_ids=False), + EmbeddingCreatorStage( + model_identifier=model_identifier, + text_field="text", + max_seq_length=None, + max_chars=None, + embedding_pooling="mean_pooling", + model_inference_batch_size=model_inference_batch_size, + ), + ParquetWriter(path=str(output_path), fields=["embeddings"]), + ], + ) + + output_tasks = pipeline.run(executor) + run_time_taken = time.perf_counter() - run_start_time + + # task._metadata is a dictionary of metadata for the task, but will not be used here. + # Instead simply use the num_items property of the task to get the number of documents processed. + # TODO: can we get the number of embeddings generated? 
+ num_documents_processed = sum(task.num_items for task in output_tasks) + + logger.success(f"Benchmark completed in {run_time_taken:.2f}s") + logger.success(f"Processed {num_documents_processed} documents") + # logger.success(f"Generated {num_embeddings_generated} embeddings") + success = True + + except Exception as e: # noqa: BLE001 + error_traceback = traceback.format_exc() + logger.error(f"Benchmark failed: {e}") + logger.debug(f"Full traceback:\n{error_traceback}") + output_tasks = [] + run_time_taken = time.perf_counter() - run_start_time + num_documents_processed = 0 + # num_embeddings_generated = 0 + # embedding_dimension = 0 + success = False + + return { + "params": { + "executor": executor_name, + "input_path": str(input_path), + "output_path": str(output_path), + "dataset_size_gb": dataset_size_gb, + "model_identifier": model_identifier, + "model_inference_batch_size": model_inference_batch_size, + "benchmark_results_path": str(benchmark_results_path), + }, + "metrics": { + "is_success": success, + "time_taken": run_time_taken, + "num_documents_processed": num_documents_processed, + # "num_embeddings_generated": num_embeddings_generated, + # "embedding_dimension": embedding_dimension, + "num_output_tasks": len(output_tasks), + "throughput_docs_per_sec": num_documents_processed / run_time_taken if run_time_taken > 0 else 0, + # "throughput_embeddings_per_sec": num_embeddings_generated / run_time_taken if run_time_taken > 0 else 0, + }, + "tasks": output_tasks, + } + + +def write_results(results: dict[str, Any], output_path: Path) -> None: + """Write results to files required by the benchmarking framework at the given path.""" + output_path.mkdir(parents=True, exist_ok=True) + (output_path / "params.json").write_text(json.dumps(results["params"], indent=2)) + (output_path / "metrics.json").write_text(json.dumps(results["metrics"], indent=2)) + (output_path / "tasks.pkl").write_bytes(pickle.dumps(results["tasks"])) + + +def load_dataset_files(dataset_path: Path, dataset_size_gb: float) -> list[str]: + """Load the dataset files at the given path and return a subset of the files whose combined size is approximately the given size in GB.""" + input_files = get_all_file_paths_and_size_under( + dataset_path, recurse_subdirectories=True, keep_extensions="parquet" + ) + desired_size_bytes = (1024**3) * dataset_size_gb + total_size = 0 + subset_files = [] + for file, size in input_files: + if size + total_size > desired_size_bytes: + break + else: + subset_files.append(file) + total_size += size + + return subset_files + + +def main() -> int: + parser = argparse.ArgumentParser(description="Embedding generation benchmark") + # Paths + parser.add_argument("--benchmark-results-path", type=Path, required=True, help="Path to benchmark results") + parser.add_argument("--input-path", required=True, type=Path, help="Path to input data") + parser.add_argument( + "--output-path", default=Path("./embedding_generation_output"), type=Path, help="Output directory for results" + ) + # Executor + parser.add_argument("--executor", default="ray_data", choices=["xenna", "ray_data"], help="Executor to use") + # Pipeline Specific + parser.add_argument("--dataset-size-gb", type=float, required=True, help="Size of dataset to process in GB") + parser.add_argument( + "--model-identifier", + type=str, + required=True, + help="Model identifier (e.g., sentence-transformers/all-MiniLM-L6-v2)", + ) + parser.add_argument("--model-inference-batch-size", type=int, default=1024, help="Batch size for model inference") + + 
args = parser.parse_args() + + logger.info("=== Embedding Generation Benchmark Starting ===") + logger.info(f"Arguments: {vars(args)}") + + try: + results = run_embedding_generation_benchmark( + input_path=args.input_path, + output_path=args.output_path, + executor_name=args.executor, + dataset_size_gb=args.dataset_size_gb, + model_identifier=args.model_identifier, + model_inference_batch_size=args.model_inference_batch_size, + benchmark_results_path=args.benchmark_results_path, + ) + + except Exception as e: # noqa: BLE001 + error_traceback = traceback.format_exc() + print(f"Benchmark failed: {e}") + logger.debug(f"Full traceback:\n{error_traceback}") + results = { + "params": vars(args), + "metrics": { + "is_success": False, + }, + "tasks": [], + } + finally: + write_results(results, args.benchmark_results_path) + + # Return proper exit code based on success + return 0 if results["metrics"]["is_success"] else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/benchmarking/scripts/removal_benchmark.py b/benchmarking/scripts/removal_benchmark.py new file mode 100755 index 000000000..3ac20d26b --- /dev/null +++ b/benchmarking/scripts/removal_benchmark.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Removal logic benchmarking script for nightly benchmarking framework. + +This script runs removal benchmarks with comprehensive metrics collection +using TaskPerfUtils and logs results to configured sinks. 
+""" + +import argparse +import json +import os +import pickle +import time +from pathlib import Path +from typing import Any + +from loguru import logger + +from nemo_curator.stages.file_partitioning import FilePartitioningStage +from nemo_curator.stages.text.deduplication.removal_workflow import TextDuplicatesRemovalWorkflow +from nemo_curator.tasks import EmptyTask + + +def run_removal_benchmark( # noqa: PLR0913 + input_path: str, + ids_to_remove_path: str, + output_path: str, + executor_name: str, + input_filetype: str = "jsonl", + output_filetype: str = "parquet", + id_field: str = "_curator_dedup_id", + duplicate_id_field: str = "id", + files_per_partition: int | None = None, + blocksize: str | None = None, + id_generator_path: str | None = None, + use_initial_tasks: bool = False, + limit: int | None = None, + use_ray_data_settings: bool = False, +) -> dict[str, Any]: + """Run the removal benchmark and collect comprehensive metrics.""" + + # Setup executor + if executor_name == "ray_data": + from nemo_curator.backends.experimental.ray_data import RayDataExecutor + + executor = RayDataExecutor() + if use_ray_data_settings: + from ray.data import DataContext + + DataContext.get_current().target_max_block_size = 1 + + elif executor_name == "xenna": + from nemo_curator.backends.xenna import XennaExecutor + + executor = XennaExecutor() + else: + msg = f"Executor {executor_name} not supported" + raise ValueError(msg) + + # Ensure output directory + Path(output_path).mkdir(parents=True, exist_ok=True) + + logger.info("Starting removal benchmark") + run_start_time = time.perf_counter() + + try: + # Validate partitioning: exactly one of files_per_partition or blocksize must be provided + if (files_per_partition is None) == (blocksize is None): + msg = "Exactly one of --files-per-partition or --blocksize must be provided" + raise ValueError(msg) # noqa: TRY301 + + # Create and run workflow-backed pipeline + workflow = TextDuplicatesRemovalWorkflow( + input_path=input_path, + ids_to_remove_path=ids_to_remove_path, + output_path=output_path, + input_filetype=input_filetype, # jsonl or parquet + input_id_field=id_field, + input_files_per_partition=files_per_partition, + input_blocksize=blocksize, + input_task_limit=limit, + ids_to_remove_duplicate_id_field=duplicate_id_field, + output_filetype=output_filetype, + id_generator_path=id_generator_path, + input_kwargs={}, + output_kwargs={}, + ) + + initial_tasks = None + if use_initial_tasks: + logger.info("Using initial tasks produced by FilePartitioningStage on driver") + partitioner = FilePartitioningStage( + file_paths=input_path, + files_per_partition=files_per_partition, + blocksize=blocksize, + file_extensions=[".jsonl", ".json", ".parquet"], + storage_options=None, + ) + initial_tasks = partitioner.process(EmptyTask) + log_msg = f"Initial tasks: {len(initial_tasks)}" + if limit: + initial_tasks = initial_tasks[:limit] + log_msg += f" (limited to {limit})" + logger.info(log_msg) + + output_tasks = workflow.run(executor, initial_tasks=initial_tasks) + run_time_taken = time.perf_counter() - run_start_time + + # Calculate removal statistics + num_removed = sum(task._metadata.get("num_removed", 0) for task in output_tasks if hasattr(task, "_metadata")) + logger.success(f"Benchmark completed in {run_time_taken:.2f}s, removed {num_removed} documents") + success = True + except Exception as e: # noqa: BLE001 + logger.error(f"Benchmark failed: {e}") + output_tasks = [] + run_time_taken = time.perf_counter() - run_start_time + num_removed = 0 + success 
= False
+
+    return {
+        "params": {
+            "executor": executor_name,
+            "input_path": input_path,
+            "input_filetype": input_filetype,
+            "ids_to_remove_path": ids_to_remove_path,
+            "output_filetype": output_filetype,
+            "id_field": id_field,
+            "duplicate_id_field": duplicate_id_field,
+            "files_per_partition": files_per_partition,
+            "blocksize": blocksize,
+            "id_generator_path": id_generator_path,
+            "use_initial_tasks": use_initial_tasks,
+            "limit": limit,
+        },
+        "metrics": {
+            "is_success": success,
+            "time_taken": run_time_taken,
+            "num_removed": num_removed,
+            "num_output_tasks": len(output_tasks),
+        },
+        "tasks": output_tasks,
+    }
+
+
+def write_results(results: dict[str, Any], output_path: str) -> None:
+    """Write results to files required by the benchmarking framework at the given path."""
+    Path(output_path).mkdir(parents=True, exist_ok=True)
+    with open(os.path.join(output_path, "params.json"), "w") as f:
+        json.dump(results["params"], f, indent=2)
+    with open(os.path.join(output_path, "metrics.json"), "w") as f:
+        json.dump(results["metrics"], f, indent=2)
+    with open(os.path.join(output_path, "tasks.pkl"), "wb") as f:
+        pickle.dump(results["tasks"], f)
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description="Removal logic benchmark for nightly benchmarking")
+    # Paths
+    parser.add_argument(
+        "--benchmark-results-path", required=True, help="Path to benchmark results"
+    )  # we should write params.json / metrics.json / tasks.pkl here
+    parser.add_argument("--input-path", required=True, help="Path to input data")
+    parser.add_argument("--ids-to-remove-path", required=True, help="Path to parquet file with IDs to remove")
+    parser.add_argument("--output-path", required=True, help="Output directory for results")
+    # Executor
+    parser.add_argument("--executor", default="xenna", choices=["xenna", "ray_data"], help="Executor to use")
+    # Pipeline Specific
+    parser.add_argument("--input-filetype", default="jsonl", choices=["jsonl", "parquet"], help="Input filetype")
+    parser.add_argument("--output-filetype", default="parquet", choices=["parquet", "jsonl"], help="Output filetype")
+    parser.add_argument("--id-field", default="_curator_dedup_id", help="ID field in input data")
+    parser.add_argument("--duplicate-id-field", default="id", help="ID field in removal file")
+    parser.add_argument(
+        "--files-per-partition",
+        type=int,
+        default=None,
+        help="Files per partition (mutually exclusive with --blocksize)",
+    )
+    parser.add_argument(
+        "--blocksize",
+        type=str,
+        default=None,
+        help="Target partition size (e.g.
'512MB', '1GiB'); mutually exclusive with --files-per-partition", + ) + parser.add_argument("--id-generator-path", type=str, default=None, help="Path to ID generator JSON (optional)") + parser.add_argument( + "--use-initial-tasks", + action="store_true", + help="If set, pre-compute initial FileGroupTasks via FilePartitioningStage and pass to workflow", + ) + parser.add_argument("--use-ray-data-settings", action="store_true", help="If set, use ray data settings") + parser.add_argument("--limit", type=int, default=None, help="Limit the number of tasks to process") + + args = parser.parse_args() + + logger.info("=== Removal Benchmark Starting ===") + logger.info(f"Arguments: {vars(args)}") + + try: + results = run_removal_benchmark( + input_path=args.input_path, + ids_to_remove_path=args.ids_to_remove_path, + output_path=args.output_path, + executor_name=args.executor, + input_filetype=args.input_filetype, + output_filetype=args.output_filetype, + id_field=args.id_field, + duplicate_id_field=args.duplicate_id_field, + files_per_partition=args.files_per_partition, + blocksize=args.blocksize, + id_generator_path=args.id_generator_path, + use_initial_tasks=args.use_initial_tasks, + limit=args.limit, + use_ray_data_settings=args.use_ray_data_settings, + ) + + except Exception as e: # noqa: BLE001 + print(f"Benchmark failed: {e}") + results = { + "params": vars(args), + "metrics": { + "is_success": False, + }, + "tasks": [], + } + finally: + write_results(results, args.benchmark_results_path) + + # Return proper exit code based on success + return 0 if results["metrics"]["is_success"] else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/benchmarking/tools/build_docker.sh b/benchmarking/tools/build_docker.sh new file mode 100755 index 000000000..05b300e18 --- /dev/null +++ b/benchmarking/tools/build_docker.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Exit immediately on error, unset vars are errors, pipeline errors are errors +set -euo pipefail + +# Make the image tags unique based on the current timestamp +# Use UTC to avoid confusion between users/build machines in different timezones +UTC_TIMESTAMP=$(date --utc "+%Y%m%d%H%M%SUTC") +NEMO_CURATOR_TAG="nemo_curator:${UTC_TIMESTAMP}" +NEMO_CURATOR_BENCHMARKING_TAG="nemo_curator_benchmarking:${UTC_TIMESTAMP}" + +# Assume this script is in the benchmarking/tools directory +THIS_SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CURATOR_DIR="$(cd ${THIS_SCRIPT_DIR}/../.. 
&& pwd)" + +# Build the standard NeMo Curator image +docker build \ + -f ${CURATOR_DIR}/docker/Dockerfile \ + --target nemo_curator \ + --tag=${NEMO_CURATOR_TAG} \ + --tag=nemo_curator:latest \ + ${CURATOR_DIR} + +# Build the benchmarking image which extends the standard NeMo Curator image +docker build \ + -f ${CURATOR_DIR}/benchmarking/Dockerfile \ + --target nemo_curator_benchmarking \ + --tag=${NEMO_CURATOR_BENCHMARKING_TAG} \ + --tag=nemo_curator_benchmarking:latest \ + --build-arg NEMO_CURATOR_IMAGE=${NEMO_CURATOR_TAG} \ + ${CURATOR_DIR} diff --git a/benchmarking/tools/gen_runscript_vars.py b/benchmarking/tools/gen_runscript_vars.py new file mode 100755 index 000000000..1acc34e3d --- /dev/null +++ b/benchmarking/tools/gen_runscript_vars.py @@ -0,0 +1,183 @@ +#!/bin/env python +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import sys +from pathlib import Path + +import yaml + +this_script_path = Path(__file__).parent.absolute() +# Add the parent directory to PYTHONPATH to import the runner modules +sys.path.insert(0, str(this_script_path.parent)) +from runner.path_resolver import ( # noqa: E402 + CONTAINER_ARTIFACTS_DIR_ROOT, + CONTAINER_CONFIG_DIR_ROOT, + CONTAINER_CURATOR_DIR, + CONTAINER_DATASETS_DIR_ROOT, + CONTAINER_RESULTS_DIR_ROOT, +) + +DOCKER_IMAGE = os.environ.get("DOCKER_IMAGE", "nemo_curator_benchmarking:latest") +GPUS = os.environ.get("GPUS", "all") +HOST_CURATOR_DIR = os.environ.get("HOST_CURATOR_DIR", str(this_script_path.parent.parent.absolute())) +CURATOR_BENCHMARKING_DEBUG = os.environ.get("CURATOR_BENCHMARKING_DEBUG", "0") + +BASH_ENTRYPOINT_OVERRIDE = "" +ENTRYPOINT_ARGS = [] +VOLUME_MOUNTS = [] + + +def print_help(script_name: str) -> None: + """Print usage and help message for the run script (not this script)to stderr.""" + sys.stderr.write(f""" + Usage: {script_name} [OPTIONS] [ARGS ...] + + Options: + --use-host-curator Mount $HOST_CURATOR_DIR into the container for benchmarking/debugging curator sources without rebuilding the image. + --shell Start an interactive bash shell instead of running benchmarks. ARGS, if specified, will be passed to 'bash -c'. + For example: '--shell uv pip list | grep cugraph' will run 'uv pip list | grep cugraph' to display the version of cugraph installed in the container. + --config Path to a YAML config file. Can be specified multiple times to merge configs. This arg is required if not using --shell. + -h, --help Show this help message and exit. + + ARGS, if specified, are passed to the container entrypoint, either the default benchmarking entrypoint or the --shell bash entrypoint. + + Optional environment variables to override config and defaults: + GPUS Value for --gpus option to docker run (using: {GPUS}). + DOCKER_IMAGE Docker image to use (using: {DOCKER_IMAGE}). + HOST_CURATOR_DIR Curator repo path used with --use-host-curator (see above) (using: {HOST_CURATOR_DIR}). 
+ CURATOR_BENCHMARKING_DEBUG Set to 1 for debug mode (regular output, possibly more in the future) (using: {CURATOR_BENCHMARKING_DEBUG}). + """) + + +def combine_dir_paths(patha: str | Path, pathb: str | Path) -> Path: + """Combine two paths, ensuring the result is an absolute path.""" + return Path(f"{patha}/{pathb}").absolute().expanduser().resolve() + + +def get_runscript_eval_str(argv: list[str]) -> str: # noqa: C901, PLR0912, PLR0915 + """ + Parse CLI args and output env variables for bash integration. + argv is a list of strings to be treated like sys.argv, where argv[0] is the name of this script. + argv[1] must be the name of the run.sh bash script, then all other args follow. + Example: ["this_script.py", "run.sh", "--use-host-curator", "--shell", "--config", "config.yaml"] + Returns a string of env variables to eval in bash. + """ + # Make a copy of argv to avoid modifying the caller's list. + argv = argv.copy() + # Initialize with defaults, these will be modified in this function. + bash_entrypoint_override = BASH_ENTRYPOINT_OVERRIDE + entrypoint_args = ENTRYPOINT_ARGS.copy() + volume_mounts = VOLUME_MOUNTS.copy() + + # This script must be called with the run.sh bash script as the first arg. + # It will be removed before parsing the rest of the args. + if len(argv) > 1: + script_name = Path(argv[1]).name + # Show help and exit if -h|--help is passed as first arg. All other options are passed to the + # container entrypoint including -h|--help if other args are present. This provides a way for + # -h|--help to be passed to the container entrypoint while still allowing for -h|--help output + # for the run.sh script. + if len(argv) > 2 and argv[2] in ("-h", "--help"): # noqa: PLR2004 + print_help(script_name) + sys.exit(1) + argv.pop(1) + else: + msg = "Internal error: script name not provided" + raise ValueError(msg) + + parser = argparse.ArgumentParser( + description="Parse benchmarking tool options and output env variables for bash integration.", + add_help=False, + ) + parser.add_argument("--use-host-curator", action="store_true") + parser.add_argument("--shell", action="store_true") + parser.add_argument("--config", action="append", type=Path, default=[]) + + args, unknown_args = parser.parse_known_args(argv[1:]) + + # Set volume mount for host curator directory. + if args.use_host_curator: + # Do not use combine_dir_paths here since CONTAINER_CURATOR_DIR is assumed to be a unique absolute + # path (e.g., /opt/Curator from Dockerfile). + volume_mounts.append(f"--volume {Path(HOST_CURATOR_DIR).absolute()}:{CONTAINER_CURATOR_DIR}") + + # Set entrypoint to bash if --shell is passed. + if args.shell: + bash_entrypoint_override = "--entrypoint=bash" + if len(unknown_args) > 0: + entrypoint_args.extend(["-c", " ".join(unknown_args)]) + else: + entrypoint_args.extend(unknown_args) + + # Parse config files and set volume mounts for results, artifacts, and datasets. + if args.config: + # consolidate all config files passed in into a single dict - last one wins. + config_data = {} + for config_file in args.config: + if not config_file.exists(): + msg = f"Config file not found: {config_file}." + raise FileNotFoundError(msg) + with open(config_file) as f: + new_config = yaml.safe_load(f) + if not isinstance(new_config, dict): + continue + config_data.update(new_config) + + # process the final path settings into the list of volume mounts. 
+ for path_type, container_dir in [ + ("results_path", CONTAINER_RESULTS_DIR_ROOT), + ("artifacts_path", CONTAINER_ARTIFACTS_DIR_ROOT), + ("datasets_path", CONTAINER_DATASETS_DIR_ROOT), + ]: + if path_type in config_data: + path_value = config_data[path_type] + if path_value.startswith("/"): + container_dir_path = combine_dir_paths(container_dir, path_value) + volume_mounts.append(f"--volume {path_value}:{container_dir_path}") + else: + msg = f"Path value {path_value} for {path_type} must be an absolute path." + raise ValueError(msg) + else: + msg = f"Path value {path_type} not found in config file(s)." + raise ValueError(msg) + + # Add volume mounts for each config file so the script in the container can read each one + # and add each to ENTRYPOINT_ARGS. + for config_file in args.config: + config_file_host = config_file.absolute().expanduser().resolve() + container_dir_path = combine_dir_paths(CONTAINER_CONFIG_DIR_ROOT, config_file_host) + volume_mounts.append(f"--volume {config_file_host}:{container_dir_path}") + # Only add modified --config args if running the benchmark tool entrypoint, not the + # bash shell entrypoint. + if not args.shell: + entrypoint_args.append(f"--config={container_dir_path}") + + # Build and return the string to eval in bash. + eval_str = "" + eval_str += f"BASH_ENTRYPOINT_OVERRIDE={bash_entrypoint_override}\n" + eval_str += f"DOCKER_IMAGE={DOCKER_IMAGE}\n" + eval_str += f"GPUS={GPUS}\n" + eval_str += f"HOST_CURATOR_DIR={HOST_CURATOR_DIR}\n" + eval_str += f"CURATOR_BENCHMARKING_DEBUG={CURATOR_BENCHMARKING_DEBUG}\n" + vms = f'"{" ".join(volume_mounts)}"' if volume_mounts else "" + eval_str += f"VOLUME_MOUNTS={vms}\n" + eval_str += "ENTRYPOINT_ARGS=(" + " ".join([f'"{arg}"' for arg in entrypoint_args]) + ")\n" + return eval_str + + +if __name__ == "__main__": + print(get_runscript_eval_str(sys.argv)) diff --git a/benchmarking/tools/run.sh b/benchmarking/tools/run.sh new file mode 100755 index 000000000..b057da526 --- /dev/null +++ b/benchmarking/tools/run.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +# Assume this script is in the /benchmarking/tools directory +THIS_SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +MLFLOW_TRACKING_URI=${MLFLOW_TRACKING_URI:-""} +SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL:-""} +GDRIVE_FOLDER_ID=${GDRIVE_FOLDER_ID:-""} +GDRIVE_SERVICE_ACCOUNT_FILE=${GDRIVE_SERVICE_ACCOUNT_FILE:-""} + +# get the following vars from the command line, config file(s), etc. and +# set them in this environment: +# BASH_ENTRYPOINT_OVERRIDE +# DOCKER_IMAGE +# GPUS +# HOST_CURATOR_DIR +# CURATOR_BENCHMARKING_DEBUG +# VOLUME_MOUNTS +# ENTRYPOINT_ARGS +eval_str=$(python ${THIS_SCRIPT_DIR}/gen_runscript_vars.py "${BASH_SOURCE[0]}" "$@") +eval "$eval_str" + +# Get the image digest/ID for benchmark reports. This is not known at image build time. 
+IMAGE_DIGEST=$(docker image inspect ${DOCKER_IMAGE} --format '{{.Digest}}' 2>/dev/null) || true +if [ -z "${IMAGE_DIGEST}" ] || [ "${IMAGE_DIGEST}" = "" ]; then + # Use the image ID as a fallback + IMAGE_DIGEST=$(docker image inspect ${DOCKER_IMAGE} --format '{{.ID}}' 2>/dev/null) || true +fi +if [ -z "${IMAGE_DIGEST}" ] || [ "${IMAGE_DIGEST}" = "" ]; then + IMAGE_DIGEST="" +fi + +################################################################################################################ +docker run \ + --rm \ + --net=host \ + --interactive \ + --tty \ + \ + --gpus="\"${GPUS}\"" \ + \ + ${VOLUME_MOUNTS} \ + \ + --env=IMAGE_DIGEST=${IMAGE_DIGEST} \ + --env=MLFLOW_TRACKING_URI=${MLFLOW_TRACKING_URI} \ + --env=SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL} \ + --env=GDRIVE_FOLDER_ID=${GDRIVE_FOLDER_ID} \ + --env=GDRIVE_SERVICE_ACCOUNT_FILE=${GDRIVE_SERVICE_ACCOUNT_FILE} \ + --env=CURATOR_BENCHMARKING_DEBUG=${CURATOR_BENCHMARKING_DEBUG} \ + --env=HOST_HOSTNAME=$(hostname) \ + \ + ${BASH_ENTRYPOINT_OVERRIDE} \ + ${DOCKER_IMAGE} \ + "${ENTRYPOINT_ARGS[@]}" + +exit $? diff --git a/nemo_curator/tasks/utils.py b/nemo_curator/tasks/utils.py index b78e4a694..b99426be1 100644 --- a/nemo_curator/tasks/utils.py +++ b/nemo_curator/tasks/utils.py @@ -13,6 +13,7 @@ # limitations under the License. from collections import defaultdict +from typing import Any import numpy as np @@ -63,3 +64,19 @@ def collect_stage_metrics(tasks: list[Task]) -> dict[str, dict[str, np.ndarray[f stage: {m: np.asarray(vals, dtype=float) for m, vals in metrics.items()} for stage, metrics in stage_to_metrics.items() } + + @staticmethod + def aggregate_task_metrics(tasks: list[Task], prefix: str | None = None) -> dict[str, Any]: + """Aggregate task metrics by computing mean/std/sum.""" + metrics = {} + tasks_metrics = TaskPerfUtils.collect_stage_metrics(tasks) + # For each of the metric compute mean/std/sum and flatten the dict + for stage_name, stage_data in tasks_metrics.items(): + for metric_name, values in stage_data.items(): + for agg_name, agg_func in [("sum", np.sum), ("mean", np.mean), ("std", np.std)]: + stage_key = stage_name if prefix is None else f"{prefix}_{stage_name}" + if len(values) > 0: + metrics[f"{stage_key}_{metric_name}_{agg_name}"] = float(agg_func(values)) + else: + metrics[f"{stage_key}_{metric_name}_{agg_name}"] = 0.0 + return metrics
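As a usage note for the new `aggregate_task_metrics` helper: a benchmark script could fold the per-stage aggregates into the metrics it writes to `metrics.json`. The snippet below is only a sketch; it assumes `TaskPerfUtils` is importable from `nemo_curator.tasks.utils` (the file this diff modifies) and that `results` is shaped like the dicts returned by the benchmark scripts above.

```python
# Sketch (assumed usage, not part of this patch): merge per-stage task aggregates
# into the "metrics" dict a benchmark script writes to metrics.json.
from nemo_curator.tasks.utils import TaskPerfUtils  # import path assumed from this diff

# `results` is the dict a benchmark script builds; results["tasks"] holds the
# output tasks returned by pipeline.run(...) / workflow.run(...).
results["metrics"].update(
    TaskPerfUtils.aggregate_task_metrics(results["tasks"], prefix="stage")
)
# Keys are flattened as "<prefix>_<stage_name>_<metric_name>_<sum|mean|std>".
```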