data_juicer.ops.op_fusion module

data_juicer.ops.op_fusion.fuse_operators(ops, probe_res=None)[source]

Fuse the input ops list and return the fused ops list.

Parameters:
  • ops – the list of op objects to fuse.

  • probe_res – the probed speed of each OP, as measured by Monitor.

Returns:

a list of fused op objects.
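
The following is a minimal, self-contained sketch of one plausible reading of this function; it is not the library's implementation. It assumes that fusion means merging each run of consecutive filters into a single op, and that probe_res can be treated as a mapping from op name to speed. `ToyOp`, `toy_fuse_operators`, and that mapping shape are all hypothetical names introduced for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToyOp:
    """Hypothetical stand-in for an op object."""
    name: str
    kind: str  # "mapper" or "filter"
    fused: List["ToyOp"] = field(default_factory=list)


def toy_fuse_operators(ops: List[ToyOp],
                       probe_res: Optional[Dict[str, float]] = None) -> List[ToyOp]:
    """Merge each run of consecutive filters into one fused op (toy version)."""
    speeds = probe_res or {}  # assumption: op name -> probed speed
    fused_ops: List[ToyOp] = []
    group: List[ToyOp] = []

    def flush() -> None:
        if len(group) > 1:
            # Assumption: faster filters run first. The sort is stable, so
            # the original order is kept when no speeds are available.
            ordered = sorted(group, key=lambda op: -speeds.get(op.name, 0.0))
            fused_name = "fused:" + ",".join(op.name for op in ordered)
            fused_ops.append(ToyOp(fused_name, "filter", ordered))
        else:
            fused_ops.extend(group)
        group.clear()

    for op in ops:
        if op.kind == "filter":
            group.append(op)
        else:
            flush()  # a mapper ends the current run of filters
            fused_ops.append(op)
    flush()
    return fused_ops


toy_ops = [ToyOp("m1", "mapper"), ToyOp("f1", "filter"), ToyOp("f2", "filter"),
           ToyOp("m2", "mapper"), ToyOp("f3", "filter")]
plain_names = [op.name for op in toy_fuse_operators(toy_ops)]
probed_names = [op.name for op in toy_fuse_operators(toy_ops, {"f1": 1.0, "f2": 5.0})]
```

In this sketch a lone filter such as f3 is passed through unchanged, since there is nothing to fuse it with.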

data_juicer.ops.op_fusion.fuse_filter_group(original_filter_group)[source]

Fuse a single filter group and return the fused filter group.

Parameters:

original_filter_group – the original filter group, including op definitions and objects.

Returns:

the fused definitions and objects of the input filter group.

class data_juicer.ops.op_fusion.FusedFilter(name: str, fused_filters: List)[source]

Bases: Filter

A fused operator for filters.
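
To illustrate the idea, here is a small sketch that does not import data_juicer. It assumes a fused filter first lets every member filter compute its statistics on the batch, then keeps a sample only if all member filters would keep it. `ToyFilter`, `ToyFusedFilter`, and the "stats" key are hypothetical and chosen for illustration only.

```python
from typing import Callable, Dict, List


class ToyFilter:
    """Hypothetical filter: writes one statistic, then decides on it."""

    def __init__(self, stat_key: str, compute: Callable[[str], int],
                 keep: Callable[[int], bool]):
        self.stat_key = stat_key
        self.compute = compute
        self.keep = keep

    def compute_stats_batched(self, samples: Dict[str, list]) -> Dict[str, list]:
        for text, stats in zip(samples["text"], samples["stats"]):
            stats[self.stat_key] = self.compute(text)
        return samples

    def process_batched(self, samples: Dict[str, list]) -> List[bool]:
        return [self.keep(stats[self.stat_key]) for stats in samples["stats"]]


class ToyFusedFilter:
    """Runs several filters over one batch and combines their decisions."""

    def __init__(self, name: str, fused_filters: List[ToyFilter]):
        self.name = name
        self.fused_filters = fused_filters

    def compute_stats_batched(self, samples: Dict[str, list]) -> Dict[str, list]:
        for op in self.fused_filters:
            samples = op.compute_stats_batched(samples)
        return samples

    def process_batched(self, samples: Dict[str, list]) -> List[bool]:
        keep = [True] * len(samples["stats"])
        for op in self.fused_filters:
            # Logical AND: a sample survives only if every filter keeps it.
            keep = [a and b for a, b in zip(keep, op.process_batched(samples))]
        return keep


min_chars = ToyFilter("num_chars", len, lambda n: n >= 5)
max_words = ToyFilter("num_words", lambda t: len(t.split()), lambda n: n <= 3)
toy_fused = ToyFusedFilter("toy_fused_filter", [min_chars, max_words])

batch = {"text": ["hi", "hello world", "a much longer piece of text"],
         "stats": [{}, {}, {}]}
batch = toy_fused.compute_stats_batched(batch)
keep_flags = toy_fused.process_batched(batch)
```

Here "hi" fails the character minimum and the last sample exceeds the word limit, so only the middle sample is kept.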

__init__(name: str, fused_filters: List)[source]

Initialization method.

Parameters:

fused_filters – a list of filters to be fused.

compute_stats_batched(samples, rank=None)[source]

process_batched(samples)[source]

class data_juicer.ops.op_fusion.GeneralFusedOP(batch_size: int = 1, fused_op_list: List | None = None, *args, **kwargs)[source]

Bases: Mapper

An explicitly fused operator designed to execute multiple sequential operations (OPs) on the same batch, enabling fine-grained control over data processing.

This operator chains multiple data processing steps, such as mappers and filters, into a single pass. Each batch of samples is passed through the fused operations in the order they are defined, so all specified transformations are applied in sequence. Both mappers, which transform data, and filters, which keep or remove samples based on computed statistics, are supported. Context variables can be passed between operations if needed.

The accelerator is set to 'cuda' if any of the fused operations uses it. The number of processes is the minimum of the values configured for the fused operations. After processing, any temporary context variables, such as those used for video containers, are cleaned up.
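
The behaviour described above can be sketched without the library as follows. This is an illustration under stated assumptions, not the actual implementation: `ToyMapper`, `ToyFilter`, and `ToyGeneralFusedOP` are hypothetical classes, each op is assumed to expose `accelerator` and `num_proc` attributes, and the toy filter drops samples directly instead of computing statistics first.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

Batch = Dict[str, List[str]]


@dataclass
class ToyMapper:
    """Hypothetical mapper: transforms every text in the batch."""
    fn: Callable[[str], str]
    accelerator: str = "cpu"
    num_proc: int = 4

    def process_batched(self, samples: Batch) -> Batch:
        return {"text": [self.fn(t) for t in samples["text"]]}


@dataclass
class ToyFilter:
    """Hypothetical filter: keeps only texts that satisfy the predicate."""
    keep: Callable[[str], bool]
    accelerator: str = "cpu"
    num_proc: int = 4

    def process_batched(self, samples: Batch) -> Batch:
        return {"text": [t for t in samples["text"] if self.keep(t)]}


class ToyGeneralFusedOP:
    def __init__(self, batch_size: int = 1, fused_op_list: Optional[list] = None):
        self.batch_size = batch_size
        self.fused_ops = list(fused_op_list or [])
        # 'cuda' if any fused op uses it, otherwise 'cpu'.
        uses_cuda = any(op.accelerator == "cuda" for op in self.fused_ops)
        self.accelerator = "cuda" if uses_cuda else "cpu"
        # Minimum number of processes among the fused ops (1 if none given).
        self.num_proc = min((op.num_proc for op in self.fused_ops), default=1)

    def process_batched(self, samples: Batch) -> Batch:
        # Apply every op to the same batch, in the order it was listed.
        for op in self.fused_ops:
            samples = op.process_batched(samples)
        return samples


toy_general = ToyGeneralFusedOP(
    batch_size=2,
    fused_op_list=[
        ToyMapper(str.strip),
        ToyFilter(lambda t: len(t) > 2, num_proc=2),
        ToyMapper(str.upper, accelerator="cuda", num_proc=8),
    ],
)
general_out = toy_general.process_batched({"text": ["  hi ", " hello "]})
```

Because the filter runs between the two mappers, the second mapper only sees the samples that survived, which is the main practical effect of processing a batch through the ops in order.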

__init__(batch_size: int = 1, fused_op_list: List | None = None, *args, **kwargs)[source]

Initialization.

Parameters:
  • batch_size – the batch size of the input samples.

  • fused_op_list – a list of OPs to be fused.

process_batched(samples, rank=None)[source]

run(dataset, *, exporter=None, tracer=None)[source]