Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
bd5e115
docs and utility code
cyruszhang Jul 16, 2025
db3f470
add checkpointing strategy support
cyruszhang Jul 23, 2025
d6aa8c3
use enum for strategy; use every_ops for default
cyruszhang Jul 23, 2025
98eba4b
support event_log and checkpoint directories, with proper naming and …
cyruszhang Jul 23, 2025
8668ae9
add ray_partitioned mode in process_data
cyruszhang Jul 24, 2025
d7a1275
add necessary configs for partition/checkpoint
cyruszhang Jul 24, 2025
86c6522
update event_loggin_mixin for proper formatting
cyruszhang Jul 24, 2025
20e146c
add README.md
cyruszhang Jul 24, 2025
17e0d2c
update demo yaml
cyruszhang Jul 24, 2025
c277435
add parition and intermediate_storage related config logic
cyruszhang Jul 24, 2025
20b1c06
remove export_shard_size for single file output
cyruszhang Jul 24, 2025
bda3033
fix export/logging logic in PartitionedRayExecutor
cyruszhang Jul 24, 2025
f98d7f8
add atuo partition size logic; ignore F541
cyruszhang Jul 24, 2025
116ff0e
switch back to ray_partitioned mode
cyruszhang Jul 24, 2025
0c9373a
Update data_juicer/core/executor/partition_size_optimizer.py
cyruszhang Jul 24, 2025
552527d
remove duplcate code
cyruszhang Jul 24, 2025
9d20177
rename demo config
cyruszhang Jul 24, 2025
488126c
add partition_dir; resolution logic of job_id and work_dir
cyruszhang Jul 24, 2025
3796c20
consolidate direcotry resolution logic
cyruszhang Jul 25, 2025
6685a3b
add demo code; consolidate docs
cyruszhang Jul 25, 2025
8b7d0be
restore accidentally deleted images
cyruszhang Jul 25, 2025
56e562b
use every_op for checkpointing
cyruszhang Jul 25, 2025
ab3a27e
use every_op for checkpointing
cyruszhang Jul 25, 2025
e5be57a
complete event logs
cyruszhang Jul 25, 2025
b7e731c
add utility for counting different file formats; for debugging
cyruszhang Jul 26, 2025
6cfcbf9
add support for tallying directories
cyruszhang Jul 26, 2025
d51e7d8
bugfix: handle last partition
cyruszhang Jul 26, 2025
d21b690
fix parquet writing error during re-partitioning
cyruszhang Jul 28, 2025
ad0ecd5
make arrow intermediate work
cyruszhang Jul 28, 2025
f8d5138
parquet intermediate storage size fix; get rid of fallback config logic
cyruszhang Jul 28, 2025
2930f44
defaults to 10K for parquet file size
cyruszhang Jul 28, 2025
2da9183
remove dead arrow configs; use compression
cyruszhang Jul 28, 2025
3d92ad7
add job monitoring utility
cyruszhang Jul 29, 2025
f970d1d
update count_rows utility; auto detect between directory or file
cyruszhang Jul 29, 2025
5796eaf
add monitor and stop; update event log and executor for proper event …
cyruszhang Jul 29, 2025
4b40898
update README for job monitor/stopper
cyruszhang Jul 29, 2025
4b86ce1
remove info/warn from event logs
cyruszhang Jul 30, 2025
71ddd15
remove performance/resources related event logs
cyruszhang Jul 30, 2025
205ed70
backup config logic; don't resume when config don't match
cyruszhang Jul 30, 2025
cc18176
no resumption without job_id arg
cyruszhang Jul 30, 2025
f70aca7
add re-partition log entries
cyruszhang Jul 30, 2025
5e8526e
fix event_id issue
cyruszhang Jul 31, 2025
f076708
fix: jsonl with .json ext
cyruszhang Jul 31, 2025
fdf692f
use ray_exporter for data exporting
cyruszhang Jul 31, 2025
2ecdf02
bugfix: tracer related config
cyruszhang Jul 31, 2025
bd55dba
proper handling of already done job
cyruszhang Jul 31, 2025
474ebf9
add ast support
cyruszhang Aug 1, 2025
98cc115
add dag support
cyruszhang Aug 1, 2025
83ae339
enforce job_id as last part of work path
cyruszhang Aug 4, 2025
c923357
add test case for dir resolutions
cyruszhang Aug 4, 2025
089499c
dag_execution_plan location fix
cyruszhang Aug 4, 2025
b9f7e5e
dag related events
cyruszhang Aug 4, 2025
4365f06
enable DAG in ray_executor_partitioned; update partition sizing logic
cyruszhang Aug 4, 2025
bc72ba5
enable dag and event pipeline in DefaultExecutor
cyruszhang Aug 4, 2025
e2b5720
enable DAG/event-logging in ray executor
cyruszhang Aug 4, 2025
b276c90
add test for dag
cyruszhang Aug 4, 2025
bd849bb
bugfix: missing checkpoints in manual mode
cyruszhang Aug 4, 2025
7a350bc
update config
cyruszhang Aug 4, 2025
0938676
add restart related events
cyruszhang Aug 5, 2025
39c76bb
remove enable_fault_tolerance config
cyruszhang Aug 8, 2025
30b8122
remove retries and backoff strategy configs
cyruszhang Aug 8, 2025
a2ea06e
visualize dj job
cyruszhang Aug 8, 2025
9889cab
complete resource awareness logic; fix logs and events
cyruszhang Aug 9, 2025
1a5209b
remove visualization
cyruszhang Aug 9, 2025
8491be4
add job snapshot utility
cyruszhang Aug 9, 2025
4bed148
update documentations and demos
cyruszhang Aug 9, 2025
ba11d08
update readme
cyruszhang Aug 10, 2025
2c556c0
use ray_dataset.take instead of to_pandas.to_dict for more efficiency
cyruszhang Sep 2, 2025
7b82c13
bugfix: process by ops and proper checkpointing
cyruszhang Sep 2, 2025
0b08bcd
use ray dataset repartitioning
cyruszhang Sep 2, 2025
4a2b0bf
update config for partition and resource_optimization
cyruszhang Sep 5, 2025
a7fb5e2
handle config logic
cyruszhang Sep 5, 2025
414ee63
use split for partitioning data; support size_in_mb and rows options …
cyruszhang Sep 5, 2025
d483245
more changes
cyruszhang Sep 15, 2025
b977017
temp fix for filter_with_union_find stuck
cyruszhang Sep 16, 2025
a5579f1
Add materialize prior to global op convergence to avoid high backpres…
cyruszhang Sep 16, 2025
5c33e26
remove final checkpoint
cyruszhang Sep 16, 2025
aed499d
back to barebone for deadlock issue resolution
cyruszhang Sep 17, 2025
e283461
checkpointing support with auto-resumption
cyruszhang Sep 17, 2025
4720be6
job resumption logic
cyruszhang Sep 17, 2025
fb29fa7
add manual/auto partition sizing mode; fix subtle bug with job resump…
cyruszhang Sep 17, 2025
e195d98
merge master
cyruszhang Sep 18, 2025
469f10a
fix config bug
cyruszhang Sep 18, 2025
24463e2
re-enable DAG
cyruszhang Sep 18, 2025
40bcba2
add tests
cyruszhang Sep 18, 2025
2742b24
enable DAG monitoring
cyruszhang Sep 18, 2025
1f6ae68
temp directory cleanup
cyruszhang Sep 19, 2025
3ede5d7
temp directory logic
cyruszhang Sep 22, 2025
ff09274
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Sep 22, 2025
939f5c4
fix config for artifically introduced namespace fields
cyruszhang Oct 1, 2025
b055cf1
fix factory return types
cyruszhang Oct 2, 2025
3e28558
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Oct 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions configs/demo/checkpoint_config_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# =============================================================================
# COMPREHENSIVE DATAJUICER DEMO: Checkpointing, Event Logging & Job Management
# =============================================================================
# This demo showcases:
# 1. Configurable checkpointing strategies
# 2. Event logging with job-specific directories
# 3. Flexible storage architecture
# 4. Job resumption capabilities
# 5. Real DataJuicer operations
# =============================================================================

# Global parameters
work_dir: "./outputs/demo-checkpoint-strategies"

# Separate storage configuration (optional)
# Event logs: Fast storage (SSD, local disk) - small files, frequent writes
event_log_dir: "/tmp/fast_event_logs" # Optional: separate fast storage for event logs

# Checkpoints: Large storage (HDD, network storage) - large files, infrequent writes
checkpoint_dir: "/tmp/large_checkpoints" # Optional: separate large storage for checkpoints

# Executor configuration
executor_type: "ray_partitioned" # Use our enhanced partitioned executor


# Intermediate storage configuration for partition and checkpoint data (format, compression, and lifecycle management)
intermediate_storage:
# File format and compression
format: "parquet" # parquet, arrow, jsonl
compression: "snappy" # snappy, gzip, none
use_arrow_batches: true
arrow_batch_size: 500
arrow_memory_mapping: false

# File lifecycle management
preserve_intermediate_data: true # Keep temporary files for debugging/resumption
cleanup_temp_files: true
cleanup_on_success: false
retention_policy: "keep_all" # keep_all, keep_failed_only, cleanup_all
max_retention_days: 7


# Partitioning configuration
partition:
# Auto-configuration (recommended for most use cases)
auto_configure: false # Disable auto-configuration to use manual settings

# Manual partitioning settings (used when auto_configure: false)
# Recommended partition sizes:
# - 50-100: For debugging, quick iterations, small datasets
# - 100-300: For production, good balance of fault tolerance and efficiency
# - 300-500: For large datasets with stable processing
# - 500+: Only for very large datasets with minimal failure risk
size: 50000 # Number of samples per partition (smaller for better fault tolerance)
max_size_mb: 128 # Maximum partition size in MB (reduced for faster processing)

# Fault tolerance settings
enable_fault_tolerance: true
max_retries: 3
retry_backoff: "exponential" # exponential, linear, fixed

# Checkpoint configuration
checkpoint:
enabled: true
strategy: "every_op" # every_op, every_partition, every_n_ops, manual, disabled
n_ops: 2 # For every_n_ops strategy
op_names: ["clean_links_mapper", "whitespace_normalization_mapper"] # For manual strategy

# Event logging configuration
event_logging:
enabled: true
max_log_size_mb: 100
backup_count: 5

# Ray configuration
ray_address: "auto"
np: 2 # Number of Ray workers

# Dataset configuration
dataset_path: './demos/data/demo-dataset.jsonl'
export_path: './outputs/demo-checkpoint-strategies/processed.jsonl'

# Process pipeline with real DataJuicer operations
process:
# Text cleaning operations
- clean_links_mapper:
text_key: "text"
min_links: 0
max_links: 10

- clean_email_mapper:
text_key: "text"
min_emails: 0
max_emails: 5

- whitespace_normalization_mapper:
text_key: "text"

- fix_unicode_mapper:
text_key: "text"

# Text filtering operations
- text_length_filter:
text_key: "text"
min_len: 10
max_len: 10000

- alphanumeric_filter:
text_key: "text"
min_ratio: 0.3

# Quality filtering
- character_repetition_filter:
text_key: "text"
min_ratio: 0.0
max_ratio: 0.3

- word_repetition_filter:
text_key: "text"
min_ratio: 0.0
max_ratio: 0.3

# Export configuration
export_in_parallel: true
keep_stats_in_res_ds: true
keep_hashes_in_res_ds: true

# =============================================================================
# COMPLETE USER EXPERIENCE:
# =============================================================================
# 1. Start job:
# dj-process --config configs/demo/checkpoint_config_example.yaml
# # Output shows: Job ID (timestamp_configname_suffix), job directory, resumption command
# # Example: 20241201_143022_checkpoint_config_example_abc123
#
# 2. If job fails, resume with:
# dj-process --config configs/demo/checkpoint_config_example.yaml --job_id <job_id>
# # System validates job_id and shows previous status
#
# 3. Directory structure (flexible storage):
# Option A: All in work_dir (default)
# {work_dir}/
# ├── 20241201_143022_checkpoint_config_example_abc123/ # Job-specific directory
# │ ├── job_summary.json # Job metadata and status
# │ ├── event_logs/
# │ │ ├── events.log # Human-readable logs
# │ │ └── events.jsonl # Machine-readable for resumption
# │ ├── checkpoints/ # Job-specific checkpoint data
# │ │ ├── partition_000000/
# │ │ │ ├── op_000_clean_links_mapper.parquet
# │ │ │ └── op_001_clean_email_mapper.parquet
# │ │ └── checkpoint_1701432000.json
# │ └── metadata/ # Job-specific metadata
# │ └── dataset_mapping.json
#
# Option B: Separate storage (configured)
# {work_dir}/
# ├── 20241201_143022_checkpoint_config_example_abc123/ # Job metadata only
# │ └── job_summary.json
# /tmp/fast_event_logs/ # Fast storage for event logs
# ├── 20241201_143022_checkpoint_config_example_abc123/
# │ └── event_logs/
# │ ├── events.log
# │ └── events.jsonl
# /tmp/large_checkpoints/ # Large storage for checkpoints
# ├── 20241201_143022_checkpoint_config_example_abc123/
# │ ├── partition_000000/
# │ │ ├── op_000_clean_links_mapper.parquet
# │ │ └── op_001_clean_email_mapper.parquet
# │ └── checkpoint_1701432000.json
# └── results/ # Shared final results
# =============================================================================
Loading
Loading