# Source code for data_juicer.core.ray_executor

import time

from loguru import logger

from data_juicer.config import init_configs
from data_juicer.core.ray_data import RayDataset
from data_juicer.ops import load_ops
from data_juicer.ops.op_fusion import fuse_operators
from data_juicer.utils.lazy_loader import LazyLoader

from .adapter import Adapter

# Bind `ray` and `ray.data` through LazyLoader — presumably this defers the
# actual import until first attribute access, so the module can be imported
# without ray installed (TODO confirm against LazyLoader's implementation).
# NOTE(review): `rd` is not referenced in the visible code; it may be kept
# for API parity or used elsewhere in the package.
ray = LazyLoader('ray', 'ray')
rd = LazyLoader('rd', 'ray.data')


class RayExecutor:
    """
    Executor based on Ray.

    Run Data-Juicer data processing in a distributed cluster.

    1. Support Filter, Mapper and Exact Deduplicator operators for now.
    2. Only support loading `.json` files.
    3. Advanced functions such as checkpoint, tracer are not supported.
    """

    def __init__(self, cfg=None):
        """
        Initialization method.

        :param cfg: optional config dict. If ``None``, the config is parsed
            from the command line / config files via ``init_configs()``.
        """
        self.cfg = init_configs() if cfg is None else cfg
        self.work_dir = self.cfg.work_dir
        # Adapter is used later to probe OP speed for fusion/reordering.
        self.adapter = Adapter(self.cfg)

        # init ray
        logger.info('Initing Ray ...')
        ray.init(self.cfg.ray_address)

    def run(self, load_data_np=None):
        """
        Running the dataset process pipeline.

        :param load_data_np: number of workers when loading the dataset.
        :return: processed dataset.
        """
        # 1. load data
        logger.info('Loading dataset with Ray...')
        if self.cfg.get('generated_dataset_config', None):
            # Build the dataset from a synthetic/generated-data formatter
            # instead of reading files from disk.
            generated_dataset_config = self.cfg.generated_dataset_config
            assert isinstance(generated_dataset_config,
                              dict) and 'type' in generated_dataset_config
            args = generated_dataset_config.copy()
            obj_name = args.pop('type')
            # Imported lazily to avoid a module-level import cycle with
            # the formatter registry.
            from data_juicer.format.formatter import FORMATTERS
            dataset = FORMATTERS.modules[obj_name](**args).load_dataset()
        else:
            dataset = RayDataset.read_json(self.cfg.dataset_path)
        # convert all the path in dataset to absolute path
        dataset = RayDataset(dataset, self.cfg.dataset_path, self.cfg)

        # 2. extract processes
        logger.info('Preparing process operators...')
        ops = load_ops(self.cfg.process)

        if self.cfg.op_fusion:
            probe_res = None
            if self.cfg.fusion_strategy == 'probe':
                # Run the ops on a small batch first so fusion can reorder
                # them by measured speed.
                logger.info('Probe the OP speed for OP reordering...')
                probe_res, _ = self.adapter.probe_small_batch(dataset, ops)

            logger.info(f'Start OP fusion and reordering with strategy '
                        f'[{self.cfg.fusion_strategy}]...')
            ops = fuse_operators(ops, probe_res)

        # 3. data process
        logger.info('Processing data...')
        tstart = time.time()
        dataset.process(ops)
        tend = time.time()
        logger.info(f'All Ops are done in {tend - tstart:.3f}s.')

        # 4. data export
        logger.info('Exporting dataset to disk...')
        dataset.data.write_json(self.cfg.export_path, force_ascii=False)
        return dataset