Support of partitioning/checkpointing/event-logging #748
Open
cyruszhang wants to merge 92 commits into main from feat/cyrusz/partition
Changes from 67 commits
Commits
bd5e115 docs and utility code (cyruszhang)
db3f470 add checkpointing strategy support (cyruszhang)
d6aa8c3 use enum for strategy; use every_ops for default (cyruszhang)
98eba4b support event_log and checkpoint directories, with proper naming and … (cyruszhang)
8668ae9 add ray_partitioned mode in process_data (cyruszhang)
d7a1275 add necessary configs for partition/checkpoint (cyruszhang)
86c6522 update event_logging_mixin for proper formatting (cyruszhang)
20e146c add README.md (cyruszhang)
17e0d2c update demo yaml (cyruszhang)
c277435 add partition and intermediate_storage related config logic (cyruszhang)
20b1c06 remove export_shard_size for single file output (cyruszhang)
bda3033 fix export/logging logic in PartitionedRayExecutor (cyruszhang)
f98d7f8 add auto partition size logic; ignore F541 (cyruszhang)
116ff0e switch back to ray_partitioned mode (cyruszhang)
0c9373a Update data_juicer/core/executor/partition_size_optimizer.py (cyruszhang)
552527d remove duplicate code (cyruszhang)
9d20177 rename demo config (cyruszhang)
488126c add partition_dir; resolution logic of job_id and work_dir (cyruszhang)
3796c20 consolidate directory resolution logic (cyruszhang)
6685a3b add demo code; consolidate docs (cyruszhang)
8b7d0be restore accidentally deleted images (cyruszhang)
56e562b use every_op for checkpointing (cyruszhang)
ab3a27e use every_op for checkpointing (cyruszhang)
e5be57a complete event logs (cyruszhang)
b7e731c add utility for counting different file formats; for debugging (cyruszhang)
6cfcbf9 add support for tallying directories (cyruszhang)
d51e7d8 bugfix: handle last partition (cyruszhang)
d21b690 fix parquet writing error during re-partitioning (cyruszhang)
ad0ecd5 make arrow intermediate work (cyruszhang)
f8d5138 parquet intermediate storage size fix; get rid of fallback config logic (cyruszhang)
2930f44 defaults to 10K for parquet file size (cyruszhang)
2da9183 remove dead arrow configs; use compression (cyruszhang)
3d92ad7 add job monitoring utility (cyruszhang)
f970d1d update count_rows utility; auto detect between directory or file (cyruszhang)
5796eaf add monitor and stop; update event log and executor for proper event … (cyruszhang)
4b40898 update README for job monitor/stopper (cyruszhang)
4b86ce1 remove info/warn from event logs (cyruszhang)
71ddd15 remove performance/resources related event logs (cyruszhang)
205ed70 backup config logic; don't resume when configs don't match (cyruszhang)
cc18176 no resumption without job_id arg (cyruszhang)
f70aca7 add re-partition log entries (cyruszhang)
5e8526e fix event_id issue (cyruszhang)
f076708 fix: jsonl with .json ext (cyruszhang)
fdf692f use ray_exporter for data exporting (cyruszhang)
2ecdf02 bugfix: tracer related config (cyruszhang)
bd55dba proper handling of already done job (cyruszhang)
474ebf9 add ast support (cyruszhang)
98cc115 add dag support (cyruszhang)
83ae339 enforce job_id as last part of work path (cyruszhang)
c923357 add test case for dir resolutions (cyruszhang)
089499c dag_execution_plan location fix (cyruszhang)
b9f7e5e dag related events (cyruszhang)
4365f06 enable DAG in ray_executor_partitioned; update partition sizing logic (cyruszhang)
bc72ba5 enable dag and event pipeline in DefaultExecutor (cyruszhang)
e2b5720 enable DAG/event-logging in ray executor (cyruszhang)
b276c90 add test for dag (cyruszhang)
bd849bb bugfix: missing checkpoints in manual mode (cyruszhang)
7a350bc update config (cyruszhang)
0938676 add restart related events (cyruszhang)
39c76bb remove enable_fault_tolerance config (cyruszhang)
30b8122 remove retries and backoff strategy configs (cyruszhang)
a2ea06e visualize dj job (cyruszhang)
9889cab complete resource awareness logic; fix logs and events (cyruszhang)
1a5209b remove visualization (cyruszhang)
8491be4 add job snapshot utility (cyruszhang)
4bed148 update documentation and demos (cyruszhang)
ba11d08 update readme (cyruszhang)
2c556c0 use ray_dataset.take instead of to_pandas.to_dict for more efficiency (cyruszhang)
7b82c13 bugfix: process by ops and proper checkpointing (cyruszhang)
0b08bcd use ray dataset repartitioning (cyruszhang)
4a2b0bf update config for partition and resource_optimization (cyruszhang)
a7fb5e2 handle config logic (cyruszhang)
414ee63 use split for partitioning data; support size_in_mb and rows options … (cyruszhang)
d483245 more changes (cyruszhang)
b977017 temp fix for filter_with_union_find stuck (cyruszhang)
a5579f1 Add materialize prior to global op convergence to avoid high backpres… (cyruszhang)
5c33e26 remove final checkpoint (cyruszhang)
aed499d back to barebone for deadlock issue resolution (cyruszhang)
e283461 checkpointing support with auto-resumption (cyruszhang)
4720be6 job resumption logic (cyruszhang)
fb29fa7 add manual/auto partition sizing mode; fix subtle bug with job resump… (cyruszhang)
e195d98 merge master (cyruszhang)
469f10a fix config bug (cyruszhang)
24463e2 re-enable DAG (cyruszhang)
40bcba2 add tests (cyruszhang)
2742b24 enable DAG monitoring (cyruszhang)
1f6ae68 temp directory cleanup (cyruszhang)
3ede5d7 temp directory logic (cyruszhang)
ff09274 Merge branch 'main' into feat/cyrusz/partition (cyruszhang)
939f5c4 fix config for artificially introduced namespace fields (cyruszhang)
b055cf1 fix factory return types (cyruszhang)
3e28558 Merge branch 'main' into feat/cyrusz/partition (cyruszhang)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | | @@ -0,0 +1,154 @@ |
| | | # ============================================================================= |
| | | # COMPREHENSIVE DATAJUICER DEMO: Checkpointing, Event Logging & Job Management |
| | | # ============================================================================= |
| | | # This demo showcases: |
| | | # 1. Configurable checkpointing strategies |
| | | # 2. Event logging with job-specific directories |
| | | # 3. Flexible storage architecture |
| | | # 4. Job resumption capabilities |
| | | # 5. Real DataJuicer operations |
| | | # ============================================================================= |
| | | |
| | | # Data location configuration (Mandatory) |
| | | dataset_path: './demos/data/demo-dataset.jsonl' |
| | | |
| | | # Work directory configuration |
| | | # IMPORTANT: If using {job_id} placeholder, it MUST be the last part of the path |
| | | # Examples: |
| | | # ✅ work_dir: "./outputs/my_project/{job_id}" # Valid |
| | | # ✅ work_dir: "/data/experiments/{job_id}" # Valid |
| | | # ❌ work_dir: "./outputs/{job_id}/results" # Invalid - {job_id} not at end |
| | | # ❌ work_dir: "./{job_id}/outputs/data" # Invalid - {job_id} not at end |
| | | # |
| | | # If no {job_id} is specified, job_id will be automatically appended: |
| | | # work_dir: "./outputs/my_project" → job_dir: "./outputs/my_project/20250804_143022_abc123" |
| | | work_dir: "./outputs/partition-checkpoint-eventlog/{job_id}" |
| | | export_path: '{work_dir}/processed.jsonl' |
| | | |
| | | # Executor configuration |
| | | executor_type: "ray_partitioned" # Use our enhanced partitioned executor |
| | | ray_address: "auto" |
| | | # np will be auto-configured based on available cluster resources when resource_optimization.auto_configure: true |
| | | # np: 2 # Number of Ray workers (auto-configured when resource_optimization.auto_configure: true) |
| | | |
| | | # Separate storage configuration |
| | | # Partition directory (Optional): stores the dataset partitions when using the ray_partitioned executor |
| | | partition_dir: "{work_dir}/partitions" |
| | | |
| | | # Event logs: Fast storage (SSD, local disk) - small files, frequent writes (Optional) |
| | | event_log_dir: "{work_dir}/event_logs" # Optional: separate fast storage for event logs |
| | | |
| | | # Checkpoints: Large storage (HDD, network storage) - large files, infrequent writes (Optional) |
| | | checkpoint_dir: "{work_dir}/checkpoints" # Optional: separate large storage for checkpoints |
| | | |
| | | |
| | | # Resource optimization configuration |
| | | resource_optimization: |
| | |   auto_configure: true # Enable automatic optimization of partition size, worker count, and other resource-dependent settings |
| | |   # Manual configuration (used when auto_configure: false) |
| | |   # partition: |
| | |   #   size: 10000 # Number of samples per partition |
| | |   #   max_size_mb: 128 # Maximum partition size in MB |
| | |   # np: 2 # Number of Ray workers |
| | | |
| | | # Partition configuration (used when resource_optimization.auto_configure: false) |
| | | partition: |
| | |   # size: 10000 # Number of samples per partition |
| | |   # max_size_mb: 128 # Maximum partition size in MB |
| | | |
| | | # Checkpoint configuration |
| | | checkpoint: |
| | |   enabled: false |
| | |   # strategy: "every_op" # every_op, every_partition, every_n_ops, manual, disabled |
| | |   # n_ops: 1 # Number of operations between checkpoints (for every_n_ops strategy) |
| | |   # op_names: [] # Specific operation names to checkpoint after (for manual strategy) |
| | | |
| | | # Intermediate storage configuration (includes file lifecycle management) |
| | | intermediate_storage: |
| | |   format: "parquet" # parquet, arrow, jsonl |
| | |   parquet_batch_size: 10000 # Number of rows per parquet file |
| | | |
| | | # Event logging configuration |
| | | event_logging: |
| | |   enabled: true |
| | | |
| | | # Process pipeline with real DataJuicer operations |
| | | process: |
| | |   # Text cleaning operations |
| | |   - clean_links_mapper: |
| | |       text_key: "text" |
| | |       min_links: 0 |
| | |       max_links: 10 |
| | | |
| | |   - clean_email_mapper: |
| | |       text_key: "text" |
| | |       min_emails: 0 |
| | |       max_emails: 5 |
| | | |
| | |   - whitespace_normalization_mapper: |
| | |       text_key: "text" |
| | | |
| | |   - fix_unicode_mapper: |
| | |       text_key: "text" |
| | | |
| | |   # Text filtering operations |
| | |   - text_length_filter: |
| | |       text_key: "text" |
| | |       min_len: 5 |
| | |       max_len: 10000 |
| | | |
| | |   - alphanumeric_filter: |
| | |       text_key: "text" |
| | |       min_ratio: 0.1 |
| | | |
| | |   # Quality filtering |
| | |   - character_repetition_filter: |
| | |       text_key: "text" |
| | |       min_ratio: 0.0 |
| | |       max_ratio: 0.5 |
| | | |
| | |   - word_repetition_filter: |
| | |       text_key: "text" |
| | |       min_ratio: 0.0 |
| | |       max_ratio: 0.5 |
| | | |
| | | # Export configuration |
| | | export_in_parallel: true |
| | | keep_stats_in_res_ds: true |
| | | keep_hashes_in_res_ds: true |
| | | |
| | | # ============================================================================= |
| | | # COMPLETE USER EXPERIENCE: |
| | | # ============================================================================= |
| | | # 1. Start job: |
| | | #    dj-process --config configs/demo/partition-checkpoint-eventlog.yaml |
| | | #    # Output shows: Job ID (timestamp_configname_suffix), job directory, resumption command |
| | | #    # Example: 20241201_143022_partition-checkpoint-eventlog_abc123 |
| | | # |
| | | # 2. If job fails, resume with: |
| | | #    dj-process --config configs/demo/partition-checkpoint-eventlog.yaml --job_id <job_id> |
| | | #    # System validates job_id and shows previous status |
| | | # |
| | | # 3. Directory structure (flexible storage): |
| | | #    outputs/partition-checkpoint-eventlog/{job_id}/ |
| | | #    ├── partitions/ # Dataset partitions (large files) |
| | | #    ├── checkpoints/ # Operation checkpoints (large files) |
| | | #    ├── event_logs/ # Event logs (small files, frequent writes) |
| | | #    ├── metadata/ # Job metadata and mapping |
| | | #    ├── results/ # Final processed dataset |
| | | #    └── processed.jsonl # Final output file |
| | | # |
| | | # 4. Resource Optimization: |
| | | #    - resource_optimization.auto_configure: true automatically optimizes: |
| | | #      * Partition size based on data characteristics and available memory |
| | | #      * Worker count (np) based on available CPU cores |
| | | #      * Processing efficiency based on data modality (text, image, audio, video) |
| | | #    - No manual tuning required - system adapts to your hardware and data |
| | | # |
| | | # 5. Monitoring and Debugging: |
| | | #    - Real-time event logs in event_logs/ directory |
| | | #    - Processing summary with statistics and timing |
| | | #    - Checkpoint recovery for fault tolerance |
| | | #    - Detailed resource utilization analysis |
| | | # |
| | | # ============================================================================= |
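
The `work_dir` rules stated in the config comments ({job_id} only as the last path component, appended automatically when absent, and `{work_dir}` reused by the other directory settings) can be sketched in Python. This is only an illustration under stated assumptions, not the resolution code in this PR: `generate_job_id`, `resolve_work_dir`, and `expand_placeholders` are hypothetical names, and the job ID format (timestamp plus a six-character hex suffix) is guessed from the `20250804_143022_abc123` example in the comments.

```python
import secrets
from datetime import datetime

JOB_ID_PLACEHOLDER = "{job_id}"


def generate_job_id(now=None):
    """Hypothetical job ID: timestamp plus a 6-character random hex suffix."""
    now = now or datetime.now()
    return f"{now:%Y%m%d_%H%M%S}_{secrets.token_hex(3)}"


def resolve_work_dir(work_dir, job_id):
    """Return the job directory for a work_dir template.

    Assumed rules, taken from the config comments:
    - "{job_id}" may appear only as the last path component;
    - without the placeholder, job_id is appended to work_dir.
    """
    trimmed = work_dir.rstrip("/")
    if JOB_ID_PLACEHOLDER not in trimmed:
        return f"{trimmed}/{job_id}"
    head, _, last = trimmed.rpartition("/")
    if last != JOB_ID_PLACEHOLDER or JOB_ID_PLACEHOLDER in head:
        raise ValueError(
            f"{JOB_ID_PLACEHOLDER} must be the last part of work_dir: {work_dir!r}"
        )
    # Swap the trailing placeholder for the concrete job ID.
    return trimmed[: -len(JOB_ID_PLACEHOLDER)] + job_id


def expand_placeholders(template, work_dir):
    """Substitute the resolved directory for "{work_dir}" in another path."""
    return template.replace("{work_dir}", work_dir)


if __name__ == "__main__":
    job_dir = resolve_work_dir(
        "./outputs/partition-checkpoint-eventlog/{job_id}", generate_job_id()
    )
    print(job_dir)
    print(expand_placeholders("{work_dir}/checkpoints", job_dir))
```

Plain string handling is used instead of `pathlib` so that a leading `./` in the template survives unchanged; the two invalid examples from the config comments both raise `ValueError` here.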
DataJuicer --> Data-Juicer
Need to check other docs as well.