
Conversation

@cyruszhang
Collaborator

design doc (internal): https://aliyuque.antfin.com/ah7ri9/zdesop/qw3tm08a5wcqx446

Overview
The Data-Juicer partitioning, checkpointing, and event logging system provides a comprehensive solution for processing large datasets, with fault tolerance, scalability, and full observability.

Motivation
Ray already offers some fault tolerance (actor persistence and task-level retry logic), and Ray-DLC adds better failure handling and self-healing, but several systemic problems remain:
● Whole-dataset execution: Ray processes the entire dataset as a single unit; if a small portion fails, the whole OP stage, or even the whole pipeline, fails.
● No progress recovery: because the pipeline runs as one unit, a single failure forces a complete re-run.
● No user-configurable, fine-grained fault-tolerance options, so there is little flexibility.
● Data persistence and mapping: currently missing; actors could provide an entry point, but the DJ framework does not yet support it.
● Limited observability: Ray only exposes cluster state; there is still no good view into the status of DJ jobs.
We therefore want to address all of these problems with an integrated set of partitioning, checkpointing, and event-logging mechanisms.

Key Features
● Fault tolerance: automatic recovery from failures via checkpoints
● Scalability: partition-based processing for datasets of any size
● Observability: comprehensive event logging and real-time monitoring
● Performance: optimized storage formats and parallel processing
● Flexibility: configurable partitioning and checkpointing strategies
DJ will provide the partitioning and checkpointing logic, giving users an explicit entry point into fault tolerance, flexible restart options, lineage support at the level of small partitions, and more complete, fine-grained job observability, laying the groundwork for enterprise-grade DJ services (a config sketch follows below). This does not conflict with the fault tolerance and compute scalability that Ray and Ray-DLC focus on.
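To make the configuration surface concrete, here is a minimal sketch of what such a config could look like. The executor type (ray_partitioned), the checkpoint strategies (every_op / every_partition / every_n_ops), and the intermediate storage formats (Parquet / Arrow / JSONL) are taken from this PR; the remaining key names and values are illustrative assumptions, not the final API.

    # Hypothetical config sketch; keys other than executor_type, the checkpoint
    # strategies, and the storage formats named in this PR are assumed for illustration.
    project_name: 'demo-partitioned'
    dataset_path: 'data/demo-dataset.jsonl'
    export_path: 'outputs/demo/processed.jsonl'

    executor_type: 'ray_partitioned'   # new partitioned Ray executor

    partition:
      num_partitions: 64               # or let auto-configuration pick a size per modality

    checkpoint:
      strategy: 'every_op'             # every_op / every_partition / every_n_ops
      storage_format: 'parquet'        # parquet / arrow / jsonl, optionally compressed

    event_logging:
      enabled: true                    # real-time progress events and audit trail

    process:
      - language_id_score_filter:
          min_score: 0.8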

Contributor

@gemini-code-assist bot left a comment


Summary of Changes

Hello @cyruszhang, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades Data-Juicer's capabilities for large-scale data processing by implementing a comprehensive system for partitioning, checkpointing, and event logging. These changes are designed to enhance the platform's fault tolerance, improve scalability, provide in-depth observability, and offer greater flexibility in managing data processing workflows, especially within the Ray execution environment.

Highlights

  • Enhanced Fault Tolerance and Resumption: This PR introduces a robust partitioning and checkpointing system for Data-Juicer, enabling jobs to automatically resume from failures. It prevents the need to re-run entire pipelines by saving intermediate states, significantly improving reliability for large dataset processing.
  • Comprehensive Observability with Event Logging: A new, universal event logging system has been integrated, providing real-time monitoring, detailed progress tracking, and comprehensive audit trails. This system logs various events, including job, partition, operation, and checkpoint activities, enhancing debugging and operational visibility across all executor types.
  • Optimized Data Handling and Storage: The system now supports configurable intermediate data storage formats (Parquet, Arrow, JSONL) with compression, allowing users to balance performance, storage efficiency, and human readability. It also includes an intelligent auto-configuration system for optimal partition sizing based on data modality and processing complexity.
  • Flexible Job Management and Configuration: New configuration options and command-line arguments are added for fine-grained control over partitioning, checkpointing strategies (e.g., every_op, every_partition, every_n_ops), and file retention policies. It also introduces job-specific directories and automatic generation of resumption commands for easier job management.
  • New Partitioned Ray Executor: A ray_partitioned executor type is introduced, leveraging Ray's distributed capabilities with the new partitioning, checkpointing, and event logging features to provide a highly scalable and fault-tolerant processing solution for large datasets.
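Given the JSONL event log described above, a minimal consumer could look like the sketch below; the event file location and the field names being printed are assumptions based on this PR's description rather than a documented schema.

    # Minimal sketch of tailing the machine-readable event log (one JSON object per line).
    # Field names such as event_type / partition_id / status are assumed for illustration.
    import json
    import sys

    def iter_events(path):
        """Yield events from a JSONL event log, one dict per line."""
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    yield json.loads(line)

    if __name__ == "__main__":
        # e.g. python tail_events.py outputs/<job_dir>/events.jsonl
        for event in iter_events(sys.argv[1]):
            print(event.get("event_type"), event.get("partition_id"), event.get("status"))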

Contributor

@gemini-code-assist bot left a comment


Code Review

This pull request introduces a major new feature: a partitioned Ray executor with checkpointing and event logging capabilities. This is a significant step towards making Data-Juicer more robust, scalable, and observable for large-scale data processing. The changes are extensive, including new core components, configuration options, and comprehensive documentation.

The overall design is solid, with a clear separation of concerns for partitioning, checkpointing, and event logging. The use of a mixin for event logging is a good pattern for reusability across different executors. The automatic partition size optimization is a great usability improvement.

However, the review identified several critical and high-severity issues that need to be addressed:

  • There is significant code duplication and conflicting implementations for event logging between PartitionedRayExecutor and EventLoggingMixin.
  • A new file partitioned_executor_base.py appears to be dead code and should be removed.
  • There are a couple of correctness bugs in the event logging and partition size optimizer that could lead to runtime errors or incorrect behavior.

I've provided detailed comments and suggestions for each issue. Addressing these will greatly improve the maintainability, correctness, and clarity of the new implementation. Once these issues are resolved, this will be a fantastic addition to Data-Juicer.

@cyruszhang changed the title from "[WIP] support of partitioning/checkpointing/event-logging" to "Support of partitioning/checkpointing/event-logging" on Oct 1, 2025
Collaborator

@HYLcool left a comment


BTW, please pull the latest main branch, resolve the conflicts, and merge it into this dev branch. The latest main resolves the OOM and other issues in the distributed unit tests.

- [Data Scoring](tools/quality_classifier/README.md)
- Job Management & Monitoring
- [Processing Snapshot Utility](data_juicer/utils/job/snapshot.py) - Comprehensive job status analysis with JSON output
- [Job Management Tools](data_juicer/utils/job/) - Monitor and manage DataJuicer processing jobs
Collaborator


DataJuicer --> Data-Juicer

Need to check other docs as well.

ExecutorBase,
ExecutorFactory,
PartitionedRayExecutor,
RayExecutor,
Collaborator


It's not recommended to import the Ray-related executors here, because doing so makes data-juicer try to import ray whenever data_juicer is imported, which would require ray as a general dependency.

Instead, when using these two executors, import them from a deeper path (see tools/process_data.py) rather than from data_juicer.core.
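For illustration, a deeper-path, lazy import along the lines of this comment might look like the sketch below; the module paths are inferred from the diffs in this PR and may not match the final package layout.

    # Hypothetical sketch: resolve Ray-based executors lazily so that `import data_juicer`
    # does not pull in ray. Module paths here are inferred from the diffs above.
    def load_executor_cls(executor_type: str):
        if executor_type == "ray":
            from data_juicer.core.executor.ray_executor import RayExecutor
            return RayExecutor
        if executor_type == "ray_partitioned":
            from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor
            return PartitionedRayExecutor
        from data_juicer.core.executor.default_executor import DefaultExecutor
        return DefaultExecutor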

from .default_executor import DefaultExecutor
from .factory import ExecutorFactory
from .ray_executor import RayExecutor
from .ray_executor_partitioned import PartitionedRayExecutor
Collaborator


Same reason as the comment in data_juicer/core/__init__.py

timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
short_hash = uuid.uuid4().hex[:6]
job_id = f"{timestamp}_{short_hash}"
setattr(cfg, "job_id", job_id)
Collaborator


Lines 1594-1597 and lines 1600-1603 are identical. Maybe this branch can be merged and simplified with the following code:

    if not job_id:
        # Generate job_id: timestamp + short hash
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        short_hash = uuid.uuid4().hex[:6]
        job_id = f"{timestamp}_{short_hash}"
        setattr(cfg, "job_id", job_id)

return

# Create cli.yaml in work directory
cli_path = os.path.join(cfg.work_dir, "cli.yaml")
Collaborator


Maybe the path should be concatenated with cfg.job_dir instead of cfg.work_dir, because on line 1237 the cli_file path is constructed as cli_file = Path(job_dir) / "cli.yaml".

A test run shows that cli.yaml ends up in the work_dir instead of the job_dir.
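A sketch of the suggested change, mirroring the quoted code above (assuming cfg.job_dir is already populated at this point):

    import os

    # Write cli.yaml under the job directory so it matches where it is read back
    # (line 1237: cli_file = Path(job_dir) / "cli.yaml")
    cli_path = os.path.join(cfg.job_dir, "cli.yaml")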

Comment on lines +38 to +39
│ ├── job_summary.json # Job metadata and resumption info
│ ├── events.jsonl # Machine-readable events (JSONL format)
Collaborator


I ran the demo config file partition-checkpoint-eventlog.yaml; the output contains a file named events_{timestamp}.jsonl and no job_summary.json.

Collaborator


Maybe it's better to put these two docs in the docs/ directory, so they can be integrated into the HTML documentation pages automatically.

9. Comprehensive job management
Usage:
python run_demo.py
Collaborator


This script must be run from the Data-Juicer root directory; the usage note needs updating.

Collaborator


"Events file not found" and "Job summary file not found" occur when running this demo. Maybe it's the same issue as the comment in data_juicer/utils/job/monitor.py. Please double check and resolve it.


self.assertIn('language_id_score_filter.min_score', out_str)
self.assertIn('float', out_str)

def test_auto_mode(self):
Collaborator


What's the reason to remove these test cases? Are they not compatible with the current config arch?
