data_juicer.ops.base_op module

data_juicer.ops.base_op.convert_list_dict_to_dict_list(samples)[source]
data_juicer.ops.base_op.convert_dict_list_to_list_dict(samples)[source]
data_juicer.ops.base_op.convert_arrow_to_python(method)[source]
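The first two helpers convert between the row-oriented layout (a list of per-sample dicts) and the column-oriented layout (a dict of per-field lists). A minimal plain-Python sketch of what these conversions do (illustrative names; the actual implementations may differ):

```python
def list_dict_to_dict_list(samples):
    # Row layout to column layout:
    # [{"text": "a"}, {"text": "b"}] -> {"text": ["a", "b"]}
    keys = samples[0].keys() if samples else []
    return {key: [s[key] for s in samples] for key in keys}


def dict_list_to_list_dict(samples):
    # Column layout to row layout:
    # {"text": ["a", "b"]} -> [{"text": "a"}, {"text": "b"}]
    keys = list(samples.keys())
    num = len(samples[keys[0]]) if keys else 0
    return [{key: samples[key][i] for key in keys} for i in range(num)]
```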
data_juicer.ops.base_op.catch_map_batches_exception(method, skip_op_error=False, op_name=None)[source]

Provides sample-level fault tolerance for batched map operations.

data_juicer.ops.base_op.catch_map_single_exception(method, return_sample=True, skip_op_error=False, op_name=None)[source]

Provides sample-level fault tolerance for single-sample map operations. The input sample is expected to have batch_size = 1.
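Both wrappers catch per-sample exceptions so that a single bad sample does not abort the whole map. A simplified sketch of the single-sample variant (the real decorator additionally handles skip_op_error and uses op_name for logging; those are omitted here):

```python
def catch_single_exception(method, return_sample=True):
    """Wrap a sample-level method; on error, fall back instead of raising."""
    def wrapper(sample, *args, **kwargs):
        try:
            return method(sample, *args, **kwargs)
        except Exception:
            # Fault tolerance: keep the original sample rather than failing
            # the whole run (illustrative fallback policy).
            return sample if return_sample else False
    return wrapper
```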

class data_juicer.ops.base_op.OP(*args, **kwargs)[source]

Bases: object

__init__(*args, **kwargs)[source]

Base class of operators.

Parameters:
  • text_key – the key name of the field that stores the sample texts to be processed.

  • image_key – the key name of the field that stores the sample image list to be processed.

  • audio_key – the key name of the field that stores the sample audio list to be processed.

  • video_key – the key name of the field that stores the sample video list to be processed.

  • image_bytes_key – the key name of the field that stores the sample image bytes list to be processed.

  • query_key – the key name of the field that stores the sample queries.

  • response_key – the key name of the field that stores the responses.

  • history_key – the key name of the field that stores the history of queries and responses.

  • index_key – if not None, index the samples before processing.

  • batch_size – the batch size for processing.

  • work_dir – the working directory for this operator.

is_batched_op()[source]
process(*args, **kwargs)[source]
use_cuda()[source]
runtime_np()[source]
remove_extra_parameters(param_dict, keys=None)[source]

At the beginning of an op's __init__, call self.remove_extra_parameters(locals()) to conveniently obtain the op's init parameter dict.

add_parameters(init_parameter_dict, **extra_param_dict)[source]

Add parameters for each sample; extra_param_dict and init_parameter_dict must be left unchanged.

run(dataset)[source]
empty_history()[source]
class data_juicer.ops.base_op.Mapper(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts data editing.

Parameters:
  • text_key – the key name of the field that stores the sample texts to be processed.

  • image_key – the key name of the field that stores the sample image list to be processed.

  • audio_key – the key name of the field that stores the sample audio list to be processed.

  • video_key – the key name of the field that stores the sample video list to be processed.

  • image_bytes_key – the key name of the field that stores the sample image bytes list to be processed.

  • query_key – the key name of the field that stores the sample queries.

  • response_key – the key name of the field that stores the responses.

  • history_key – the key name of the field that stores the history of queries and responses.

process_batched(samples, *args, **kwargs)[source]
process_single(sample)[source]

For sample level, sample –> sample

Parameters:

sample – sample to process

Returns:

processed sample

run(dataset, *, exporter=None, tracer=None)[source]
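To make the sample –> sample contract concrete, here is a hypothetical mapper sketched as a standalone class (the class name, the lowercasing behavior, and the omission of the data_juicer Mapper base are all assumptions made to keep the example self-contained):

```python
class LowercaseMapper:
    """Hypothetical sample-level mapper: sample --> sample."""

    def __init__(self, text_key="text"):
        # text_key mirrors the constructor parameter documented above.
        self.text_key = text_key

    def process_single(self, sample):
        # Edit the stored text in place and return the same sample.
        sample[self.text_key] = sample[self.text_key].lower()
        return sample
```

A real op would subclass Mapper and inherit batching, fault tolerance, and run() behavior from the base classes.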
class data_juicer.ops.base_op.Filter(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that filters out samples based on computed stats.

Parameters:
  • text_key – the key name of the field that stores the sample texts to be processed.

  • image_key – the key name of the field that stores the sample image list to be processed.

  • audio_key – the key name of the field that stores the sample audio list to be processed.

  • video_key – the key name of the field that stores the sample video list to be processed.

  • image_bytes_key – the key name of the field that stores the sample image bytes list to be processed.

  • query_key – the key name of the field that stores the sample queries.

  • response_key – the key name of the field that stores the responses.

  • history_key – the key name of the field that stores the history of queries and responses.

  • min_closed_interval – whether the lower bound min_val of the filter range is inclusive (closed). True by default.

  • max_closed_interval – whether the upper bound max_val of the filter range is inclusive (closed). True by default.

  • reversed_range – whether to reverse the target range [min_val, max_val] to its complement (-∞, min_val) ∪ (max_val, +∞). False by default.

get_keep_boolean(val, min_val=None, max_val=None)[source]
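The three range parameters above combine into a keep decision roughly as follows; this is a plain-Python sketch of the semantics (mirroring get_keep_boolean under the assumption that the real implementation behaves this way):

```python
def keep_boolean(val, min_val=None, max_val=None,
                 min_closed=True, max_closed=True, reversed_range=False):
    """Return True if val should be kept by the filter range."""
    above_min = True
    if min_val is not None:
        above_min = val >= min_val if min_closed else val > min_val
    below_max = True
    if max_val is not None:
        below_max = val <= max_val if max_closed else val < max_val
    in_range = above_min and below_max
    # reversed_range keeps samples *outside* [min_val, max_val] instead.
    return not in_range if reversed_range else in_range
```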
compute_stats_batched(samples, *args, **kwargs)[source]
process_batched(samples)[source]
compute_stats_single(sample, context=False)[source]

Compute stats for the sample; the stats are used as metrics to decide whether to filter this sample.

Parameters:
  • sample – input sample.

  • context – whether to temporarily store the context information of intermediate variables in the sample.

Returns:

sample with computed stats

process_single(sample)[source]

For sample level, sample –> Boolean.

Parameters:

sample – sample to decide whether to filter

Returns:

True to keep the sample and False to filter it out

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]
class data_juicer.ops.base_op.Deduplicator(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts deduplication.

Parameters:
  • text_key – the key name of the field that stores the sample texts to be processed.

  • image_key – the key name of the field that stores the sample image list to be processed.

  • audio_key – the key name of the field that stores the sample audio list to be processed.

  • video_key – the key name of the field that stores the sample video list to be processed.

  • image_bytes_key – the key name of the field that stores the sample image bytes list to be processed.

  • query_key – the key name of the field that stores the sample queries.

  • response_key – the key name of the field that stores the responses.

  • history_key – the key name of the field that stores the history of queries and responses.

compute_hash(sample)[source]

Compute hash values for the sample.

Parameters:

sample – input sample

Returns:

sample with computed hash value.

process(dataset, show_num=0)[source]

For doc-level, dataset –> dataset.

Parameters:
  • dataset – input dataset

  • show_num – the number of traced samples shown when the tracer is enabled.

Returns:

deduplicated dataset and the sampled duplicate pairs.

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]
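A hypothetical standalone sketch of the compute_hash plus doc-level dedup flow (MD5 over the text field is an illustrative hash choice, not necessarily the library's; function names are assumptions):

```python
import hashlib


def compute_text_hash(sample, text_key="text"):
    """Attach a hash of the sample text to the sample."""
    digest = hashlib.md5(sample[text_key].encode("utf-8")).hexdigest()
    sample["hash"] = digest
    return sample


def deduplicate(samples, text_key="text"):
    """Keep only the first sample for each distinct hash value."""
    seen, kept = set(), []
    for sample in samples:
        h = compute_text_hash(sample, text_key)["hash"]
        if h not in seen:
            seen.add(h)
            kept.append(sample)
    return kept
```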
class data_juicer.ops.base_op.Selector(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts selection at the dataset level.

Parameters:
  • text_key – the key name of the field that stores the sample texts to be processed.

  • image_key – the key name of the field that stores the sample image list to be processed.

  • audio_key – the key name of the field that stores the sample audio list to be processed.

  • video_key – the key name of the field that stores the sample video list to be processed.

  • image_bytes_key – the key name of the field that stores the sample image bytes list to be processed.

  • query_key – the key name of the field that stores the sample queries.

  • response_key – the key name of the field that stores the responses.

  • history_key – the key name of the field that stores the history of queries and responses.

process(dataset)[source]

Dataset –> dataset.

Parameters:

dataset – input dataset

Returns:

selected dataset.

run(dataset, *, exporter=None, tracer=None)[source]
class data_juicer.ops.base_op.Grouper(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that groups samples.

Parameters:
  • text_key – the key name of the field that stores the sample texts to be processed.

  • image_key – the key name of the field that stores the sample image list to be processed.

  • audio_key – the key name of the field that stores the sample audio list to be processed.

  • video_key – the key name of the field that stores the sample video list to be processed.

  • image_bytes_key – the key name of the field that stores the sample image bytes list to be processed.

  • query_key – the key name of the field that stores the sample queries.

  • response_key – the key name of the field that stores the responses.

  • history_key – the key name of the field that stores the history of queries and responses.

process(dataset)[source]

Dataset –> dataset.

Parameters:

dataset – input dataset

Returns:

dataset of batched samples.

run(dataset, *, exporter=None, tracer=None)[source]
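A grouper turns a dataset of individual samples into a dataset of batched samples, where each batched sample stores every field as a list. A hypothetical sketch that groups by a shared key (the group_key criterion is an assumption for illustration):

```python
def group_by_key(dataset, group_key):
    """Hypothetical grouper: merge samples sharing group_key into batched samples."""
    groups = {}
    for sample in dataset:
        groups.setdefault(sample[group_key], []).append(sample)
    batched = []
    for members in groups.values():
        # A batched sample stores each field as a list (column layout).
        keys = members[0].keys()
        batched.append({key: [m[key] for m in members] for key in keys})
    return batched
```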
class data_juicer.ops.base_op.Aggregator(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that aggregates batched samples.

Parameters:
  • text_key – the key name of the field that stores the sample texts to be processed.

  • image_key – the key name of the field that stores the sample image list to be processed.

  • audio_key – the key name of the field that stores the sample audio list to be processed.

  • video_key – the key name of the field that stores the sample video list to be processed.

  • image_bytes_key – the key name of the field that stores the sample image bytes list to be processed.

  • query_key – the key name of the field that stores the sample queries.

  • response_key – the key name of the field that stores the responses.

  • history_key – the key name of the field that stores the history of queries and responses.

process_single(sample)[source]

For sample level, batched sample –> sample. The input must be the output of some Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample

run(dataset, *, exporter=None, tracer=None)[source]
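An aggregator collapses one batched sample (a dict of per-field lists, as produced by a Grouper) back into a single sample. A hypothetical sketch (the concatenation policy and separator are assumptions for illustration):

```python
def aggregate_concat(batched_sample, text_key="text", sep="\n"):
    """Hypothetical aggregator: batched sample --> one sample.

    Joins the batched texts into a single text field.
    """
    return {text_key: sep.join(batched_sample[text_key])}
```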