Data-Juicer 分布式数据处理¶
概览¶
Data-Juicer 支持基于 Ray 和阿里巴巴 PAI 的大规模分布式数据处理。
经过专门的设计后,几乎所有在单机模式下实现的 Data-Juicer 算子都可以无缝地运行在 Ray 的分布式模式下。对于大规模场景,我们继续进行了针对计算引擎的特定优化,例如用于平衡文件和进程数目的数据子集分割策略,针对 Ray 和 Apache Arrow的 JSON 文件流式 I/O 补丁等。
作为参考,我们在 25 到 100 个阿里云节点上进行了实验,使用 Ray 模式下的 Data-Juicer 处理不同的数据集。在 6,400 个 CPU 核上处理包含 700 亿条样本的数据集只需要花费 2 小时,在 3,200 个 CPU 核上处理包含 70 亿条样本的数据集只需要花费 0.45 小时。此外,在 Ray 模式下,对 TB 大小级别的数据集,Data-Juicer 的 MinHash-LSH 去重算子在 1,280 个 CPU 核的 8 节点集群上进行去重只需 3 小时。
更多细节请参考我们的论文:Data-Juicer 2.0: Cloud-Scale Adaptive Data Processing for Foundation Models 。

实现与优化¶
Data-Juicer 的 Ray 处理模式¶
对于 Data-Juicer 的大部分算子实现,其核心处理函数是引擎无关的。RayDataset 和 RayExecutor 封装了与Ray引擎的具体互操作,它们分别是基类
DJDataset
和BaseExecutor
的子类,并且都支持 Ray Tasks 和 Actors 。其中,去重算子是例外。它们在单机模式下很难规模化。因此我们提供了针对它们的 Ray 优化版本算子,并以特殊前缀开头:
ray_xx_deduplicator
。
数据子集分割¶
当在上万个节点中处理仅有若干个文件的数据集时, Ray 会根据可用资源分割数据集文件,并将它们分发到所有节点上,这可能带来极大的网络通信开销并减少 CPU 利用率。更多细节可以参考文档 Ray's autodetect_parallelism 和 tuning output blocks for Ray 。
这种默认执行计划可能非常低效,尤其是在节点数量较多的情况下。为了优化此类情况的性能,我们考虑到 Ray 和 Arrow 的特性,提前将原始数据集自动拆分为较小的文件。当用户遇到此类性能问题时,他们可以利用此功能或根据偏好自己拆分数据集。在我们的自动拆分策略中,单个文件大小设置为 128MB,且结果应确保 拆分后的子文件数量 至少是 集群中可用CPU核心总数 的两倍。对应工具可在tools/data_resplit.py获取。
JSON 文件的流式读取¶
为了解决 Ray Dataset 类底层框架 Arrow 对流式读取 JSON 数据的原生支持的缺失,我们开发了一个流式载入的接口并贡献到了一个针对 Apache Arrow 的内部 补丁( 相关 PR ) 。这个补丁可以缓解内存不够的问题。
流式读取 JSON 文件是基础模型数据处理中的常见要求,因为许多数据集都以 JSONL 格式存储,并且尺寸巨大。 但是,Ray Datasets 中当前的实现不支持流式读取 JSON 文件,根因来源于其底层 Arrow 库(截至 Ray 版本 2.40 和 Arrow 版本 18.1.0)。
为了解决不支持流式 JSON 数据的原生读取问题,我们开发了一个流式加载接口,并为 Apache Arrow 贡献了一个第三方 补丁(PR 到 repo)。这将有助于缓解内存不足问题。使用此补丁后, Data-Juicer 的Ray模式将默认使用流式加载接口加载 JSON 文件。此外,如果输入变为 CSV 和 Parquet 文件,Ray模式下流式读取已经会自动开启。
去重¶
在 Ray 模式下,我们提供了一个优化过的基于 MinHash-LSH 的去重算子。我们使用 Ray Actors 实现了一个多进程的并查集和一个负载均衡的分布式算法 BTS 来完成等价类合并操作。这个算子在 1,280 个CPU核上对 TB 大小级别的数据集去重只需要 3 个小时。我们的消融实验还表明相比于这个去重算子的初始实现版本,这些专门的优化项可以带来 2-3 倍的提速。
性能结果¶
不同数据规模的数据处理¶
我们在十亿样本规模的数据集上进行了实验。我们先准备了一个 56 万条样本的多模态数据集,并用不同的倍数(1-125,000倍)将其扩展来创建不同大小的数据集。下图的实验结果展示出了 Data-Juicer 的高扩展性。
大规模数据集分布式去重¶
我们在 200GB、1TB、5TB 的数据集上测试了我们的基于 MinHash 的 Ray 去重算子,测试机器的 CPU 核数从 640 核到 1280 核。如下表所示,当数据集大小增长 5 倍,处理时间增长 4.02 到 5.62 倍。当 CPU 核数翻倍,处理时间较原来减少了 58.9% 到 67.1%。
CPU 核数 |
200GB 耗时 |
1TB 耗时 |
5TB 耗时 |
---|---|---|---|
4 * 160 |
11.13 分钟 |
50.83 分钟 |
285.43 分钟 |
8 * 160 |
7.47 分钟 |
30.08 分钟 |
168.10 分钟 |
快速开始¶
在开始前,你应该安装 Data-Juicer 以及它的 dist
依赖需求:
pip install -v -e . # 安装 Data-Juicer 的最小依赖需求
pip install -v -e ".[dist]" # 包括 Ray 以及其他分布式相关的依赖库
然后启动一个 Ray 集群(参考 Ray 文档 ):
# 启动一个集群并作为头节点
ray start --head
# (可选)在其他节点或机器上连接集群
ray start --address='{head_ip}:6379'
我们在目录 demos/process_on_ray/
中准备了简单的例子,包括 2 个配置文件和 2 个测试数据集。
demos/process_on_ray
├── configs
│ ├── demo.yaml
│ └── dedup.yaml
└── data
├── demo-dataset.json
└── demo-dataset.jsonl
[!Important] 如果你要在多个节点上运行这些例子,你需要将示例数据集放置与一个共享磁盘(如 NAS)上,并且将结果数据集导出到那里。你可以通过修改配置文件中的
dataset_path
和export_path
参数来实现。
运行 Ray 模式样例¶
在配置文件 demo.yaml
中,我们将执行器类型设置为 "ray" 并且指定了自动的 Ray 地址。
...
dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl'
export_path: './outputs/demo/demo-processed'
executor_type: 'ray' # 将执行器类型设置为 "ray"
ray_address: 'auto' # 设置为自动 Ray 地址
...
运行这个例子,以使用 12 个常规算子处理测试数据集:
# 从源码运行处理工具
python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml
# 使用命令行工具
dj-process --config demos/process_on_ray/configs/demo.yaml
Data-Juicer 会使用示例配置文件处理示例数据集,并将结果数据集导出到配置文件中 export_path
参数指定的目录中。
运行分布式去重样例¶
在配置文件 dedup.yaml
中,我们将执行器类型设置为 "ray" 并且指定了自动的 Ray 地址。我们使用了 MinHash 去重算子专门的分布式版本来对数据集去重。
project_name: 'demo-dedup'
dataset_path: './demos/process_on_ray/data/'
export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed'
executor_type: 'ray' # 将执行器类型设置为 "ray"
ray_address: 'auto' # 设置为自动 Ray 地址
# process schedule
# a list of several process operators with their arguments
process:
- ray_bts_minhash_deduplicator: # minhash 去重算子的分布式版本
tokenization: 'character'
运行该实例来对数据集去重:
# 从源码运行处理工具
python tools/process_data.py --config demos/process_on_ray/configs/dedup.yaml
# 使用命令行工具
dj-process --config demos/process_on_ray/configs/dedup.yaml
Data-Juicer 会使用示例配置文件对示例数据集去重,并将结果数据集导出到配置文件中 export_path
参数指定的目录中。