data_juicer.core package

Submodules

data_juicer.core.adapter module

class data_juicer.core.adapter.Adapter(cfg: dict)[source]

Bases: object

MAX_BATCH_SIZE = 10000
__init__(cfg: dict)[source]
static execute_and_probe(dataset, operators, sample_interval=0.5)[source]

Process the input dataset and probe related information for each OP in the specified operator list.

For now, we support the following probe targets: "resource" (resource utilization for each OP) and "speed" (average processing speed for each OP).

The probe result is a list and each item in the list is the probe result for each OP.
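
A minimal usage sketch (the toy dataset and the single filter OP below are placeholders chosen for illustration; real runs obtain both from the dataset builder and the OPs declared in a data recipe):

```python
from data_juicer.core.adapter import Adapter
from data_juicer.core.data import NestedDataset
from data_juicer.ops.filter.text_length_filter import TextLengthFilter

# Toy dataset and OP list for illustration only.
dataset = NestedDataset.from_dict(
    {"text": ["short", "a considerably longer sample of text"]}
)
operators = [TextLengthFilter(min_len=1, max_len=100)]

# Probe resource utilization and processing speed for each OP in the list.
probe_results = Adapter.execute_and_probe(dataset, operators, sample_interval=0.5)
print(len(probe_results))  # one probe result per OP
```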

static take_batch(dataset, config)[source]

Split the dataset into batches based on configuration and load factor.

Parameters:
  • dataset – The dataset to be split

  • config – Configuration settings, including batch size

Returns:

An iterator of batches

adapt_workloads(dataset, operators)[source]

Manage the scheduling and load balancing for the dataset processing.

Parameters:
  • dataset – The dataset that needs to be processed

  • operators – Operators in the data recipe

probe_small_batch(dataset, operators)[source]

Perform small batch pre-execution to probe available resources, current load and estimated OP speed, returning load factors and speed ranks for each OP.

Notice: the probe should be run with cache enabled to avoid removing the cache files of the input dataset.

Parameters:
  • dataset – The dataset to pre-execute small batch on

  • operators – The list of OPs to be pre-executed and probed

Returns:

A list of probe results for each OP and the length of the data batch used for probing.

batch_size_strategy(load_analysis_res, base_bs=1, util_th=0.9)[source]

Decide the batch size for each OP according to its workload analysis result and the expected utilization threshold. We need to guarantee that resource utilization won't exceed the threshold. For now, we only consider the bucket effect, which means the max batch size is decided by the max utilization across all resource types except GPU util (which is decided by num_proc).
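
The scaling idea can be sketched with a small hypothetical helper (the real method works on the full workload-analysis structure, so the inputs and formula here are simplifications):

```python
# Hypothetical simplification of the headroom idea described above: scale the
# base batch size by the remaining headroom under the utilization threshold,
# ignoring GPU util (which is governed by num_proc).
def sketch_batch_size(base_bs: int, max_util: float, util_th: float = 0.9) -> int:
    if max_util >= util_th:
        return base_bs  # no headroom left: keep the base batch size
    return int(base_bs * util_th / max(max_util, 1e-6))

print(sketch_batch_size(base_bs=1000, max_util=0.3))  # -> 3000
```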

analyze_small_batch(dataset, current_state)[source]

Perform small batch analysis to probe the current OP-wise stats/meta distributions. The analyzed results will be stored in the directory {work_dir}/insight_mining.

Notice: the probe should be run with cache enabled to avoid removing the cache files of the input dataset.

Parameters:
  • dataset – The dataset to analyze small batch on

  • current_state – A string indicating the current state of the input dataset. It usually consists of the index of the OP just processed and that OP's name, e.g. "1_text_length_filter".

insight_mining(pval_th=0.05)[source]

Mine insights from the OP-wise analysis results. For now, we use a t-test to check the significance of stats/meta changes before and after each OP. If the p-value is less than the given threshold (usually 0.05), the stats/meta changes are considered significant. The insight mining results will be stored in the file {work_dir}/insight_mining/insight_mining.json.

Parameters:

pval_th – the p-value threshold.
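
For intuition, the significance check corresponds to a two-sample t-test like the illustrative snippet below (made-up numbers; scipy is used only for the example, while the real implementation works on the analyzed stats/meta distributions):

```python
from scipy import stats

# Stats values for one field before and after an OP (made-up numbers).
before = [120, 98, 143, 110, 105, 131]
after = [118, 97, 140, 108, 104, 129]

t_stat, p_value = stats.ttest_ind(before, after)
significant = p_value < 0.05  # mirrors the pval_th check described above
```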

data_juicer.core.analyzer module

class data_juicer.core.analyzer.Analyzer(cfg: Namespace | None = None)[source]

Bases: object

This Analyzer class is used to analyze a specific dataset.

It will compute stats for all filter ops in the config file, apply multiple analyses (e.g. OverallAnalysis, ColumnWiseAnalysis, etc.) to these stats, and generate the analysis results (stats tables, distribution figures, etc.) to help users better understand the input dataset.

__init__(cfg: Namespace | None = None)[source]

Initialization method.

Parameters:

cfg – optional jsonargparse Namespace dict.

run(dataset: Dataset | NestedDataset | None = None, load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_export: bool = False, skip_return: bool = False)[source]

Running the dataset analysis pipeline.

Parameters:
  • dataset – a Dataset object to be analyzed.

  • load_data_np – number of workers when loading the dataset.

  • skip_export – whether to skip exporting the results to disk.

  • skip_return – whether to skip returning the result (useful when called via API).

Returns:

analyzed dataset.
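
A minimal usage sketch, assuming a Data-Juicer config/recipe is available (the config path below is illustrative):

```python
from data_juicer.config import init_configs
from data_juicer.core.analyzer import Analyzer

# Parse a Data-Juicer recipe/config; the path is a placeholder.
cfg = init_configs(args=["--config", "configs/demo/analyzer.yaml"])

analyzer = Analyzer(cfg)
analyzed_ds = analyzer.run()  # writes stats tables and figures to the work dir
```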

data_juicer.core.data module

class data_juicer.core.data.DJDataset[source]

Bases: ABC

Base dataset of DJ

abstract process(operators, *, exporter=None, checkpointer=None, tracer=None) DJDataset[source]

process a list of operators on the dataset.

data_juicer.core.data.wrap_func_with_nested_access(f)[source]

Before calling the actual function f, wrap its args and kwargs into nested ones.

Parameters:

f – function to be wrapped.

Returns:

wrapped function

data_juicer.core.data.nested_obj_factory(obj)[source]

Use nested classes to wrap the input object.

Parameters:

obj – object to be nested.

Returns:

nested object

class data_juicer.core.data.NestedQueryDict(*args, **kargs)[source]

Bases: dict

Enhanced dict for better usability.

__init__(*args, **kargs)[source]
class data_juicer.core.data.NestedDatasetDict(*args, **kargs)[source]

Bases: DatasetDict

Enhanced HuggingFace-DatasetDict for better usability and efficiency.

__init__(*args, **kargs)[source]
map(**args)[source]

Override the map func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.

class data_juicer.core.data.NestedDataset(*args, **kargs)[source]

Bases: Dataset, DJDataset

Enhanced HuggingFace-Dataset for better usability and efficiency.

__init__(*args, **kargs)[source]
process(operators, *, work_dir=None, exporter=None, checkpointer=None, tracer=None, adapter=None, open_monitor=True)[source]

process a list of operators on the dataset.
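
A minimal sketch of processing a NestedDataset with a single OP (the OP and its threshold are chosen for illustration; real pipelines build the OP list from a recipe):

```python
from data_juicer.core.data import NestedDataset
from data_juicer.ops.filter.text_length_filter import TextLengthFilter

ds = NestedDataset.from_dict(
    {"text": ["hi", "a sample that is long enough to pass the length filter"]}
)

# Keep only samples whose text length is at least 10 characters.
ops = [TextLengthFilter(min_len=10)]
processed = ds.process(ops)
print(len(processed))  # expected: 1
```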

update_args(args, kargs, is_filter=False)[source]
map(*args, **kargs)[source]

Override the map func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.

filter(*args, **kargs)[source]

Override the filter func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.

select(*args, **kargs)[source]

Override the select func, such that the selected samples can be accessed in a nested manner.

classmethod from_dict(*args, **kargs)[source]

Override the from_dict func, which is called by most from_xx constructors, such that the constructed dataset object is NestedDataset.

add_column(*args, **kargs)[source]

Override the add column func, such that the processed samples can be accessed in a nested manner.

select_columns(*args, **kargs)[source]

Override the select columns func, such that the processed samples can be accessed in a nested manner.

remove_columns(*args, **kargs)[source]

Override the remove columns func, such that the processed samples can be accessed in a nested manner.

cleanup_cache_files()[source]

Override the cleanup_cache_files func to clear both raw and compressed cache files.

static load_from_disk(*args, **kargs)[source]

Loads a dataset that was previously saved using save_to_disk from a dataset directory, or from a filesystem using any implementation of fsspec.spec.AbstractFileSystem.

Parameters:
  • dataset_path (path-like) – Path (e.g. "dataset/train") or remote URI (e.g. "s3://my-bucket/dataset/train") of the dataset directory where the dataset will be loaded from.

  • keep_in_memory (bool, defaults to None) – Whether to copy the dataset in-memory. If None, the dataset will not be copied in-memory unless explicitly enabled by setting datasets.config.IN_MEMORY_MAX_SIZE to a nonzero value. See more details in the "improve performance" section of the datasets documentation.

  • storage_options (dict, optional) –

    Key/value pairs to be passed on to the file-system backend, if any.

    Added in version 2.8.0.

Returns:

  • If dataset_path is a path of a dataset directory, the dataset requested.

  • If dataset_path is a path of a dataset dict directory, a datasets.DatasetDict with each split.

Return type:

Dataset or DatasetDict

Example:

>>> ds = load_from_disk("path/to/dataset/directory")

data_juicer.core.data.nested_query(root_obj: NestedDatasetDict | NestedDataset | NestedQueryDict, key)[source]

Find an item in a given object by first checking the flat layer, then checking the nested layers.

Parameters:
  • root_obj – the object

  • key – the stored item to be queried, e.g., “meta” or “meta.date”

Returns:
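
Example (a sketch): query a nested field with a dotted key.

```python
from data_juicer.core.data import NestedQueryDict, nested_query

sample = NestedQueryDict({"text": "hello", "meta": {"date": "2024-01-01"}})

# The flat layer ("meta") is checked first, then the nested layers ("meta.date").
value = nested_query(sample, "meta.date")  # expected: "2024-01-01"
```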

data_juicer.core.data.add_same_content_to_new_column(sample, new_column_name, initial_value=None)[source]

A helper function to speed up the add_column function. Apply map with this function in parallel instead of using add_column.

Parameters:
  • sample – a single sample to add the new column/field to.

  • new_column_name – the name of the new column/field.

  • initial_value – the initial value of the new column/field.
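
A short usage sketch: add the new field via map instead of add_column (the field name and initial value are illustrative):

```python
from functools import partial
from data_juicer.core.data import NestedDataset, add_same_content_to_new_column

ds = NestedDataset.from_dict({"text": ["a", "b", "c"]})

# Every sample gets a new "stats" field initialized to an empty dict.
ds = ds.map(
    partial(add_same_content_to_new_column, new_column_name="stats", initial_value={})
)
```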

data_juicer.core.executor module

class data_juicer.core.executor.Executor(cfg: Namespace | None = None)[source]

Bases: object

This Executor class is used to process a specific dataset.

It will load the dataset and unify the format, then apply all the ops in the config file in order and generate a processed dataset.

__init__(cfg: Namespace | None = None)[source]

Initialization method.

Parameters:

cfg – optional jsonargparse Namespace.

sample_data(dataset_to_sample: Dataset | None = None, load_data_np=None, sample_ratio: float = 1.0, sample_algo: str = 'uniform', **kwargs)[source]

Sample a subset from the given dataset.

Parameters:
  • dataset_to_sample – Dataset to sample from. If None, will use the formatter linked by the executor. Default is None.

  • load_data_np – number of workers when loading the dataset.

  • sample_ratio – The ratio of the sample size to the original dataset size. Default is 1.0 (no sampling).

  • sample_algo – Sampling algorithm to use. Options are “uniform”, “frequency_specified_field_selector”, or “topk_specified_field_selector”. Default is “uniform”.

Returns:

A sampled Dataset.

run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)[source]

Running the dataset process pipeline.

Parameters:
  • load_data_np – number of workers when loading the dataset.

  • skip_return – whether to skip returning the result (useful when called via API).

Returns:

processed dataset.
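
A minimal end-to-end sketch, assuming a process recipe exists at the illustrative path below:

```python
from data_juicer.config import init_configs
from data_juicer.core.executor import Executor

# Parse a Data-Juicer recipe/config; the path is a placeholder.
cfg = init_configs(args=["--config", "configs/demo/process.yaml"])

executor = Executor(cfg)
processed_ds = executor.run()  # loads, processes, and exports per the recipe
```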

data_juicer.core.exporter module

class data_juicer.core.exporter.Exporter(export_path, export_shard_size=0, export_in_parallel=True, num_proc=1, export_ds=True, keep_stats_in_res_ds=False, keep_hashes_in_res_ds=False, export_stats=True)[source]

Bases: object

The Exporter class is used to export a dataset to files of specific format.

KiB = 1024
MiB = 1048576
GiB = 1073741824
TiB = 1099511627776
__init__(export_path, export_shard_size=0, export_in_parallel=True, num_proc=1, export_ds=True, keep_stats_in_res_ds=False, keep_hashes_in_res_ds=False, export_stats=True)[source]

Initialization method.

Parameters:
  • export_path – the path to export datasets.

  • export_shard_size – the size of each shard of the exported dataset. By default it's 0, which means the dataset is exported to a single file.

  • num_proc – number of processes used to export the dataset.

  • export_ds – whether to export the dataset contents.

  • keep_stats_in_res_ds – whether to keep stats in the result dataset.

  • keep_hashes_in_res_ds – whether to keep hashes in the result dataset.

  • export_stats – whether to export the stats of dataset.
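
A usage sketch of the exporter together with the export method documented below (paths and settings are illustrative):

```python
from data_juicer.core.data import NestedDataset
from data_juicer.core.exporter import Exporter

ds = NestedDataset.from_dict({"text": ["hello", "world"]})

exporter = Exporter(
    export_path="outputs/processed.jsonl",  # the suffix is expected to pick the export format
    export_shard_size=0,                    # 0 = a single file; e.g. Exporter.GiB for ~1 GB shards
    num_proc=1,
    export_stats=False,                     # this toy dataset has no stats column
)
exporter.export(ds)
```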

export(dataset)[source]

Export method for a dataset.

Parameters:

dataset – the dataset to export.

Returns:

export_compute_stats(dataset, export_path)[source]

Export method for saving compute status in filters

static to_jsonl(dataset, export_path, num_proc=1, **kwargs)[source]

Export method for jsonl target files.

Parameters:
  • dataset – the dataset to export.

  • export_path – the path to store the exported dataset.

  • num_proc – the number of processes used to export the dataset.

  • kwargs – extra arguments.

Returns:

static to_json(dataset, export_path, num_proc=1, **kwargs)[source]

Export method for json target files.

Parameters:
  • dataset – the dataset to export.

  • export_path – the path to store the exported dataset.

  • num_proc – the number of processes used to export the dataset.

  • kwargs – extra arguments.

Returns:

static to_parquet(dataset, export_path, **kwargs)[source]

Export method for parquet target files.

Parameters:
  • dataset – the dataset to export.

  • export_path – the path to store the exported dataset.

  • kwargs – extra arguments.

Returns:

data_juicer.core.monitor module

data_juicer.core.monitor.resource_monitor(mdict, interval)[source]
class data_juicer.core.monitor.Monitor[source]

Bases: object

Monitor resource utilization and other information during the data processing.

Resource utilization dict (for each func):

```python
{
    'time': 10,
    'sampling interval': 0.5,
    'resource': [
        {
            'timestamp': xxx,
            'CPU count': xxx,
            'GPU free mem.': xxx,
            ...
        },
        {
            'timestamp': xxx,
            'CPU count': xxx,
            'GPU free mem.': xxx,
            ...
        },
    ]
}
```

Based on the structure above, the resource utilization analysis result will add several extra fields on the first level:

```python
{
    'time': 10,
    'sampling interval': 0.5,
    'resource': [...],
    'resource_analysis': {
        'GPU free mem.': {
            'max': xxx,
            'min': xxx,
            'avg': xxx,
        }
    }
}
```

Only those fields in DYNAMIC_FIELDS will be analyzed.

DYNAMIC_FIELDS = {'Available mem.', 'CPU util.', 'Free mem.', 'GPU free mem.', 'GPU used mem.', 'GPU util.', 'Mem. util.', 'Used mem.'}
__init__()[source]
monitor_all_resources()[source]

Detect the resource utilization of all distributed nodes.

static monitor_current_resources()[source]

Detect the resource utilization of the current environment/machine. All "util." values are ratios in the range [0.0, 1.0]. All "mem." values are in MB.

static draw_resource_util_graph(resource_util_list, store_dir)[source]
static analyze_resource_util_list(resource_util_list)[source]

Analyze the resource utilization for a given resource util list. Compute {‘max’, ‘min’, ‘avg’} of resource metrics for each dict item.

static analyze_single_resource_util(resource_util_dict)[source]

Analyze the resource utilization for a single resource util dict. Compute {‘max’, ‘min’, ‘avg’} of each resource metrics.

static monitor_func(func, args=None, sample_interval=0.5)[source]

Process the input dataset and probe related information for each OP in the specified operator list.

For now, we support the following probe targets: "resource" (resource utilization for each OP) and "speed" (average processing speed for each OP).

The probe result is a list and each item in the list is the probe result for each OP.
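
A sketch of monitoring a single function, assuming monitor_func returns the function's result together with the resource utilization dict shown in the class docstring:

```python
from data_juicer.core.monitor import Monitor

def heavy_job(n):
    return sum(i * i for i in range(n))

# Sample resource utilization every 0.5 s while heavy_job runs.
result, resource_util = Monitor.monitor_func(
    heavy_job, args=(10_000_000,), sample_interval=0.5
)

# Add 'max'/'min'/'avg' summaries for the DYNAMIC_FIELDS metrics.
analysis = Monitor.analyze_single_resource_util(resource_util)
```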

data_juicer.core.ray_data module

data_juicer.core.ray_data.get_abs_path(path, dataset_dir)[source]
data_juicer.core.ray_data.convert_to_absolute_paths(samples, dataset_dir, path_keys)[source]
data_juicer.core.ray_data.set_dataset_to_absolute_path(dataset, dataset_path, cfg)[source]

Set all paths in the input data to absolute paths. Checks dataset_dir and project_dir for valid paths.

data_juicer.core.ray_data.preprocess_dataset(dataset: Dataset, dataset_path, cfg) Dataset[source]
data_juicer.core.ray_data.get_num_gpus(op, op_proc)[source]
data_juicer.core.ray_data.filter_batch(batch, filter_func)[source]
class data_juicer.core.ray_data.RayDataset(dataset: Dataset, dataset_path: str | None = None, cfg=None)[source]

Bases: DJDataset

__init__(dataset: Dataset, dataset_path: str | None = None, cfg=None) None[source]
process(operators, *, exporter=None, checkpointer=None, tracer=None) DJDataset[source]

process a list of operators on the dataset.
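
A sketch of wrapping a Ray dataset for Data-Juicer processing (the path and the single OP are placeholders):

```python
import ray
from data_juicer.core.ray_data import RayDataset
from data_juicer.ops.filter.text_length_filter import TextLengthFilter

ray.init(ignore_reinit_error=True)

# Read jsonl files into a Ray Dataset, then wrap it as a DJDataset
# (the path is a placeholder).
ray_ds = ray.data.read_json("data/train.jsonl")
dj_ds = RayDataset(ray_ds, dataset_path="data/train.jsonl", cfg=None)

processed = dj_ds.process([TextLengthFilter(min_len=10)])
```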

classmethod read_json(paths: str | List[str]) RayDataset[source]
class data_juicer.core.ray_data.JSONStreamDatasource(paths: str | List[str], *, arrow_json_args: Dict[str, Any] | None = None, **file_based_datasource_kwargs)[source]

Bases: JSONDatasource

A temporary Datasource for reading JSON streams.

Note

Depends on a customized pyarrow with open_json method.

data_juicer.core.ray_data.read_json_stream(paths: str | List[str], *, filesystem: FileSystem | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] | None = None, arrow_open_stream_args: Dict[str, Any] | None = None, meta_provider=None, partition_filter=None, partitioning=Partitioning(style='hive', base_dir='', field_names=None, filesystem=None), include_paths: bool = False, ignore_missing_paths: bool = False, shuffle: Literal['files'] | None = None, file_extensions: List[str] | None = ['json', 'jsonl'], concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_json_args) Dataset[source]

data_juicer.core.ray_executor module

class data_juicer.core.ray_executor.RayExecutor(cfg=None)[source]

Bases: object

Executor based on Ray.

Run Data-Juicer data processing in a distributed cluster.

  1. Supports Filter, Mapper, and Exact Deduplicator operators for now.

  2. Only supports loading .json files.

  3. Advanced functions such as checkpointing and tracing are not supported.

__init__(cfg=None)[source]

Initialization method.

Parameters:

cfg – optional config dict.

run(load_data_np=None)[source]

Running the dataset process pipeline.

Parameters:

load_data_np – number of workers when loading the dataset.

Returns:

processed dataset.
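
A distributed-run sketch, assuming a recipe whose dataset_path points at .json/.jsonl files; the config path below is illustrative and the executor_type field follows the standard Data-Juicer configuration:

```python
from data_juicer.config import init_configs
from data_juicer.core.ray_executor import RayExecutor

cfg = init_configs(args=["--config", "configs/demo/process.yaml"])
cfg.executor_type = "ray"  # run the pipeline on a Ray cluster

executor = RayExecutor(cfg)
processed_ds = executor.run()
```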

data_juicer.core.tracer module

class data_juicer.core.tracer.Tracer(work_dir, show_num=10)[source]

Bases: object

The tracer to trace the sample changes before and after an operator process.

The comparison results will be stored in the work directory.

__init__(work_dir, show_num=10)[source]

Initialization method.

Parameters:
  • work_dir – the work directory to store the comparison results

  • show_num – the maximum number of samples to show in the comparison result files.
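
A usage sketch using the trace_mapper method documented below (the OP name and datasets are illustrative; trace files are written under work_dir):

```python
from data_juicer.core.data import NestedDataset
from data_juicer.core.tracer import Tracer

tracer = Tracer(work_dir="outputs/trace", show_num=10)

previous_ds = NestedDataset.from_dict({"text": ["visit http://example.com now", "plain text"]})
processed_ds = NestedDataset.from_dict({"text": ["visit  now", "plain text"]})

# Record the samples changed by a (hypothetical) mapper OP, keyed by "text".
tracer.trace_mapper("clean_links_mapper", previous_ds, processed_ds, text_key="text")
```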

trace_mapper(op_name: str, previous_ds: Dataset, processed_ds: Dataset, text_key: str)[source]

Compare datasets before and after a Mapper.

This will mainly show the sample pairs that differ due to the Mapper's modifications.

Parameters:
  • op_name – the op name of mapper

  • previous_ds – dataset before the mapper process

  • processed_ds – dataset processed by the mapper

  • text_key – which text_key to trace

Returns:

trace_batch_mapper(op_name: str, previous_ds: Dataset, processed_ds: Dataset, text_key: str)[source]

Compare datasets before and after a BatchMapper.

This will mainly show the new samples augmented by the BatchMapper.

Parameters:
  • op_name – the op name of mapper

  • previous_ds – dataset before the mapper process

  • processed_ds – dataset processed by the mapper

  • text_key – which text_key to trace

Returns:

trace_filter(op_name: str, previous_ds: Dataset, processed_ds: Dataset)[source]

Compare datasets before and after a Filter.

This will mainly show the samples filtered out by the Filter.

Parameters:
  • op_name – the op name of filter

  • previous_ds – dataset before the filter process

  • processed_ds – dataset processed by the filter

Returns:

trace_deduplicator(op_name: str, dup_pairs: list)[source]

Compare datasets before and after a Deduplicator.

This will mainly show the near-duplicate sample pairs extracted by the Deduplicator. Unlike the other two trace methods, the trace process for the deduplicator is embedded into the deduplicator's process method, whereas the trace methods for mappers and filters are independent of those operators' process methods.

Parameters:
  • op_name – the op name of deduplicator

  • dup_pairs – duplicate sample pairs obtained from deduplicator

Returns:

Module contents

class data_juicer.core.Adapter(cfg: dict)[source]

Bases: object

MAX_BATCH_SIZE = 10000
__init__(cfg: dict)[source]
static execute_and_probe(dataset, operators, sample_interval=0.5)[source]

Process the input dataset and probe related information for each OP in the specified operator list.

For now, we support the following probe targets: "resource" (resource utilization for each OP) and "speed" (average processing speed for each OP).

The probe result is a list and each item in the list is the probe result for each OP.

static take_batch(dataset, config)[source]

Split the dataset into batches based on configuration and load factor.

Parameters:
  • dataset – The dataset to be split

  • config – Configuration settings, including batch size

Returns:

An iterator of batches

adapt_workloads(dataset, operators)[source]

Manage the scheduling and load balancing for the dataset processing.

Parameters:
  • dataset – The dataset that needs to be processed

  • operators – Operators in the data recipe

probe_small_batch(dataset, operators)[source]

Perform small batch pre-execution to probe available resources, current load and estimated OP speed, returning load factors and speed ranks for each OP.

Notice: the probe should be run with cache enabled to avoid removing the cache files of the input dataset.

Parameters:
  • dataset – The dataset to pre-execute small batch on

  • operators – The list of OPs to be pre-executed and probed

Returns:

A list of probe results for each OP and the length of the data batch used for probing.

batch_size_strategy(load_analysis_res, base_bs=1, util_th=0.9)[source]

Decide the batch size for each OP according to its workload analysis result and the expected utilization threshold. We need to guarantee that resource utilization won't exceed the threshold. For now, we only consider the bucket effect, which means the max batch size is decided by the max utilization across all resource types except GPU util (which is decided by num_proc).

analyze_small_batch(dataset, current_state)[source]

Perform small batch analysis to probe the current OP-wise stats/meta distributions. The analyzed results will be stored in the directory {work_dir}/insight_mining.

Notice: the probe should be run with cache enabled to avoid removing the cache files of the input dataset.

Parameters:
  • dataset – The dataset to analyze small batch on

  • current_state – A string indicating the current state of the input dataset. It usually consists of the index of the OP just processed and that OP's name, e.g. "1_text_length_filter".

insight_mining(pval_th=0.05)[source]

Mine insights from the OP-wise analysis results. For now, we use a t-test to check the significance of stats/meta changes before and after each OP. If the p-value is less than the given threshold (usually 0.05), the stats/meta changes are considered significant. The insight mining results will be stored in the file {work_dir}/insight_mining/insight_mining.json.

Parameters:

pval_th – the p-value threshold.

class data_juicer.core.Analyzer(cfg: Namespace | None = None)[source]

Bases: object

This Analyzer class is used to analyze a specific dataset.

It will compute stats for all filter ops in the config file, apply multiple analyses (e.g. OverallAnalysis, ColumnWiseAnalysis, etc.) to these stats, and generate the analysis results (stats tables, distribution figures, etc.) to help users better understand the input dataset.

__init__(cfg: Namespace | None = None)[source]

Initialization method.

Parameters:

cfg – optional jsonargparse Namespace dict.

run(dataset: Dataset | NestedDataset | None = None, load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_export: bool = False, skip_return: bool = False)[source]

Running the dataset analysis pipeline.

Parameters:
  • dataset – a Dataset object to be analyzed.

  • load_data_np – number of workers when loading the dataset.

  • skip_export – whether to skip exporting the results to disk.

  • skip_return – whether to skip returning the result (useful when called via API).

Returns:

analyzed dataset.

class data_juicer.core.NestedDataset(*args, **kargs)[source]

Bases: Dataset, DJDataset

Enhanced HuggingFace-Dataset for better usability and efficiency.

__init__(*args, **kargs)[source]
process(operators, *, work_dir=None, exporter=None, checkpointer=None, tracer=None, adapter=None, open_monitor=True)[source]

process a list of operators on the dataset.

update_args(args, kargs, is_filter=False)[source]
map(*args, **kargs)[source]

Override the map func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.

filter(*args, **kargs)[source]

Override the filter func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.

select(*args, **kargs)[source]

Override the select func, such that the selected samples can be accessed in a nested manner.

classmethod from_dict(*args, **kargs)[source]

Override the from_dict func, which is called by most from_xx constructors, such that the constructed dataset object is NestedDataset.

add_column(*args, **kargs)[source]

Override the add column func, such that the processed samples can be accessed in a nested manner.

select_columns(*args, **kargs)[source]

Override the select columns func, such that the processed samples can be accessed in a nested manner.

remove_columns(*args, **kargs)[source]

Override the remove columns func, such that the processed samples can be accessed in a nested manner.

cleanup_cache_files()[source]

Override the cleanup_cache_files func to clear both raw and compressed cache files.

static load_from_disk(*args, **kargs)[source]

Loads a dataset that was previously saved using save_to_disk from a dataset directory, or from a filesystem using any implementation of fsspec.spec.AbstractFileSystem.

Parameters:
  • dataset_path (path-like) – Path (e.g. "dataset/train") or remote URI (e.g. "s3://my-bucket/dataset/train") of the dataset directory where the dataset will be loaded from.

  • keep_in_memory (bool, defaults to None) – Whether to copy the dataset in-memory. If None, the dataset will not be copied in-memory unless explicitly enabled by setting datasets.config.IN_MEMORY_MAX_SIZE to a nonzero value. See more details in the "improve performance" section of the datasets documentation.

  • storage_options (dict, optional) –

    Key/value pairs to be passed on to the file-system backend, if any.

    Added in version 2.8.0.

Returns:

  • If dataset_path is a path of a dataset directory, the dataset requested.

  • If dataset_path is a path of a dataset dict directory, a datasets.DatasetDict with each split.

Return type:

Dataset or DatasetDict

Example:

>>> ds = load_from_disk("path/to/dataset/directory")

class data_juicer.core.Executor(cfg: Namespace | None = None)[source]

Bases: object

This Executor class is used to process a specific dataset.

It will load the dataset and unify the format, then apply all the ops in the config file in order and generate a processed dataset.

__init__(cfg: Namespace | None = None)[source]

Initialization method.

Parameters:

cfg – optional jsonargparse Namespace.

sample_data(dataset_to_sample: Dataset | None = None, load_data_np=None, sample_ratio: float = 1.0, sample_algo: str = 'uniform', **kwargs)[source]

Sample a subset from the given dataset.

Parameters:
  • dataset_to_sample – Dataset to sample from. If None, will use the formatter linked by the executor. Default is None.

  • load_data_np – number of workers when loading the dataset.

  • sample_ratio – The ratio of the sample size to the original dataset size. Default is 1.0 (no sampling).

  • sample_algo – Sampling algorithm to use. Options are “uniform”, “frequency_specified_field_selector”, or “topk_specified_field_selector”. Default is “uniform”.

Returns:

A sampled Dataset.

run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)[source]

Running the dataset process pipeline.

Parameters:
  • load_data_np – number of workers when loading the dataset.

  • skip_return – whether to skip returning the result (useful when called via API).

Returns:

processed dataset.

class data_juicer.core.Exporter(export_path, export_shard_size=0, export_in_parallel=True, num_proc=1, export_ds=True, keep_stats_in_res_ds=False, keep_hashes_in_res_ds=False, export_stats=True)[source]

Bases: object

The Exporter class is used to export a dataset to files of specific format.

KiB = 1024
MiB = 1048576
GiB = 1073741824
TiB = 1099511627776
__init__(export_path, export_shard_size=0, export_in_parallel=True, num_proc=1, export_ds=True, keep_stats_in_res_ds=False, keep_hashes_in_res_ds=False, export_stats=True)[source]

Initialization method.

Parameters:
  • export_path – the path to export datasets.

  • export_shard_size – the size of each shard of the exported dataset. By default it's 0, which means the dataset is exported to a single file.

  • num_proc – number of processes used to export the dataset.

  • export_ds – whether to export the dataset contents.

  • keep_stats_in_res_ds – whether to keep stats in the result dataset.

  • keep_hashes_in_res_ds – whether to keep hashes in the result dataset.

  • export_stats – whether to export the stats of dataset.

export(dataset)[source]

Export method for a dataset.

Parameters:

dataset – the dataset to export.

Returns:

export_compute_stats(dataset, export_path)[source]

Export method for saving compute status in filters

static to_jsonl(dataset, export_path, num_proc=1, **kwargs)[source]

Export method for jsonl target files.

Parameters:
  • dataset – the dataset to export.

  • export_path – the path to store the exported dataset.

  • num_proc – the number of processes used to export the dataset.

  • kwargs – extra arguments.

Returns:

static to_json(dataset, export_path, num_proc=1, **kwargs)[source]

Export method for json target files.

Parameters:
  • dataset – the dataset to export.

  • export_path – the path to store the exported dataset.

  • num_proc – the number of processes used to export the dataset.

  • kwargs – extra arguments.

Returns:

static to_parquet(dataset, export_path, **kwargs)[source]

Export method for parquet target files.

Parameters:
  • dataset – the dataset to export.

  • export_path – the path to store the exported dataset.

  • kwargs – extra arguments.

Returns:

class data_juicer.core.Monitor[source]

Bases: object

Monitor resource utilization and other information during the data processing.

Resource utilization dict (for each func):

```python
{
    'time': 10,
    'sampling interval': 0.5,
    'resource': [
        {
            'timestamp': xxx,
            'CPU count': xxx,
            'GPU free mem.': xxx,
            ...
        },
        {
            'timestamp': xxx,
            'CPU count': xxx,
            'GPU free mem.': xxx,
            ...
        },
    ]
}
```

Based on the structure above, the resource utilization analysis result will add several extra fields on the first level:

```python
{
    'time': 10,
    'sampling interval': 0.5,
    'resource': [...],
    'resource_analysis': {
        'GPU free mem.': {
            'max': xxx,
            'min': xxx,
            'avg': xxx,
        }
    }
}
```

Only those fields in DYNAMIC_FIELDS will be analyzed.

DYNAMIC_FIELDS = {'Available mem.', 'CPU util.', 'Free mem.', 'GPU free mem.', 'GPU used mem.', 'GPU util.', 'Mem. util.', 'Used mem.'}
__init__()[source]
monitor_all_resources()[source]

Detect the resource utilization of all distributed nodes.

static monitor_current_resources()[source]

Detect the resource utilization of the current environment/machine. All "util." values are ratios in the range [0.0, 1.0]. All "mem." values are in MB.

static draw_resource_util_graph(resource_util_list, store_dir)[source]
static analyze_resource_util_list(resource_util_list)[source]

Analyze the resource utilization for a given resource util list. Compute {‘max’, ‘min’, ‘avg’} of resource metrics for each dict item.

static analyze_single_resource_util(resource_util_dict)[source]

Analyze the resource utilization for a single resource util dict. Compute {‘max’, ‘min’, ‘avg’} of each resource metrics.

static monitor_func(func, args=None, sample_interval=0.5)[source]

Process the input dataset and probe related information for each OP in the specified operator list.

For now, we support the following probe targets: "resource" (resource utilization for each OP) and "speed" (average processing speed for each OP).

The probe result is a list and each item in the list is the probe result for each OP.

class data_juicer.core.Tracer(work_dir, show_num=10)[source]

Bases: object

The tracer to trace the sample changes before and after an operator process.

The comparison results will be stored in the work directory.

__init__(work_dir, show_num=10)[source]

Initialization method.

Parameters:
  • work_dir – the work directory to store the comparison results

  • show_num – the maximum number of samples to show in the comparison result files.

trace_mapper(op_name: str, previous_ds: Dataset, processed_ds: Dataset, text_key: str)[source]

Compare datasets before and after a Mapper.

This will mainly show the sample pairs that differ due to the Mapper's modifications.

Parameters:
  • op_name – the op name of mapper

  • previous_ds – dataset before the mapper process

  • processed_ds – dataset processed by the mapper

  • text_key – which text_key to trace

Returns:

trace_batch_mapper(op_name: str, previous_ds: Dataset, processed_ds: Dataset, text_key: str)[source]

Compare datasets before and after a BatchMapper.

This will mainly show the new samples augmented by the BatchMapper.

Parameters:
  • op_name – the op name of mapper

  • previous_ds – dataset before the mapper process

  • processed_ds – dataset processed by the mapper

  • text_key – which text_key to trace

Returns:

trace_filter(op_name: str, previous_ds: Dataset, processed_ds: Dataset)[source]

Compare datasets before and after a Filter.

This will mainly show the samples filtered out by the Filter.

Parameters:
  • op_name – the op name of filter

  • previous_ds – dataset before the filter process

  • processed_ds – dataset processed by the filter

Returns:

trace_deduplicator(op_name: str, dup_pairs: list)[source]

Compare datasets before and after a Deduplicator.

This will mainly show the near-duplicate sample pairs extracted by the Deduplicator. Unlike the other two trace methods, the trace process for the deduplicator is embedded into the deduplicator's process method, whereas the trace methods for mappers and filters are independent of those operators' process methods.

Parameters:
  • op_name – the op name of deduplicator

  • dup_pairs – duplicate sample pairs obtained from deduplicator

Returns: