data_juicer.ops.op_fusion module

data_juicer.ops.op_fusion.fuse_operators(ops, probe_res=None)

Fuse the input ops list and return the fused ops list.

Parameters:
  • ops -- the list of op objects to fuse.

  • probe_res -- the probed speed for each OP from Monitor.

Returns:

a list of fused op objects.
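To make the idea of fusing an op list concrete, the following is a minimal sketch that collapses each run of adjacent filters into a single fused op. It is not the library's implementation: `ToyOp`, `ToyFusedFilter`, and `fuse_consecutive_filters` are hypothetical names invented for illustration, the grouping rule (two or more adjacent filters) is an assumption, and `probe_res` is not modeled at all.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyOp:
    """Hypothetical stand-in for an op object; not a Data-Juicer class."""
    name: str
    kind: str  # "mapper" or "filter"


@dataclass
class ToyFusedFilter:
    """Hypothetical container for a run of adjacent filters."""
    members: List[ToyOp] = field(default_factory=list)
    kind: str = "filter"

    @property
    def name(self) -> str:
        return "fused(" + ",".join(op.name for op in self.members) + ")"


def fuse_consecutive_filters(ops):
    """Collapse each run of two or more adjacent filters into one fused op."""
    fused, run = [], []

    def flush():
        # A single filter gains nothing from fusion, so keep it unchanged.
        if len(run) >= 2:
            fused.append(ToyFusedFilter(members=list(run)))
        else:
            fused.extend(run)
        run.clear()

    for op in ops:
        if op.kind == "filter":
            run.append(op)
        else:
            # A non-filter op ends the current run of filters.
            flush()
            fused.append(op)
    flush()
    return fused


pipeline = [
    ToyOp("clean_text", "mapper"),
    ToyOp("word_count", "filter"),
    ToyOp("stopword_ratio", "filter"),
    ToyOp("dedup_lines", "mapper"),
    ToyOp("length", "filter"),
]
fused_names = [op.name for op in fuse_consecutive_filters(pipeline)]
print(fused_names)
```

In this toy pipeline the two adjacent filters become one fused entry, while the trailing lone filter is left as it was.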

data_juicer.ops.op_fusion.fuse_filter_group(original_filter_group)

Fuse a single filter group and return the fused filter group.

Parameters:

original_filter_group -- the original filter group, including op definitions and objects.

Returns:

the fused definitions and objects of the input filter group.

class data_juicer.ops.op_fusion.FusedFilter(name: str, fused_filters: List)

Bases: Filter

A fused operator for filters.

__init__(name: str, fused_filters: List)

Initialization method.

Parameters:

fused_filters -- a list of filters to be fused.

compute_stats_batched(samples, rank=None)
process_batched(samples)
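The two methods above suggest a two-phase pattern: compute statistics for a batch, then decide which samples to keep. The sketch below illustrates that pattern with toy classes. Everything in it is an assumption made for illustration: the `Toy*` classes are hypothetical, the batch layout (a dict with `"text"` and `"stats"` lists) is invented here, and the rule that a sample is kept only when every member filter accepts it is one plausible fusion rule rather than a confirmed description of FusedFilter.

```python
from typing import Dict, List

Batch = Dict[str, List]


class ToyMinLengthFilter:
    """Hypothetical filter: keeps texts with at least `min_len` characters."""

    def __init__(self, min_len: int):
        self.min_len = min_len

    def compute_stats_batched(self, samples: Batch) -> Batch:
        for text, stats in zip(samples["text"], samples["stats"]):
            stats["length"] = len(text)
        return samples

    def process_batched(self, samples: Batch) -> List[bool]:
        return [s["length"] >= self.min_len for s in samples["stats"]]


class ToyMaxWordsFilter:
    """Hypothetical filter: keeps texts with at most `max_words` words."""

    def __init__(self, max_words: int):
        self.max_words = max_words

    def compute_stats_batched(self, samples: Batch) -> Batch:
        for text, stats in zip(samples["text"], samples["stats"]):
            stats["num_words"] = len(text.split())
        return samples

    def process_batched(self, samples: Batch) -> List[bool]:
        return [s["num_words"] <= self.max_words for s in samples["stats"]]


class ToyFusedFilter:
    """Hypothetical fused filter mirroring the (name, fused_filters) signature."""

    def __init__(self, name: str, fused_filters: List):
        self.name = name
        self.fused_filters = fused_filters

    def compute_stats_batched(self, samples: Batch) -> Batch:
        # One pass over the batch per member, all before any sample is dropped.
        for member in self.fused_filters:
            samples = member.compute_stats_batched(samples)
        return samples

    def process_batched(self, samples: Batch) -> List[bool]:
        # Assumed rule: keep a sample only if every member filter keeps it.
        keep = [True] * len(samples["text"])
        for member in self.fused_filters:
            decisions = member.process_batched(samples)
            keep = [k and d for k, d in zip(keep, decisions)]
        return keep


batch = {
    "text": ["hi", "a reasonably sized sentence",
             "one two three four five six seven eight"],
    "stats": [{}, {}, {}],
}
fused = ToyFusedFilter("toy_fused", [ToyMinLengthFilter(5), ToyMaxWordsFilter(5)])
batch = fused.compute_stats_batched(batch)
keep_mask = fused.process_batched(batch)
print(keep_mask)
```

Only the second text passes both toy filters: the first is too short and the third has too many words.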
class data_juicer.ops.op_fusion.GeneralFusedOP(batch_size: int = 1, fused_op_list: List | None = None, *args, **kwargs)

Bases: Mapper

An explicitly fused operator designed to execute multiple sequential operations (OPs) on the same batch, enabling fine-grained control over data processing.

__init__(batch_size: int = 1, fused_op_list: List | None = None, *args, **kwargs)

Initialization.

Parameters:
  • batch_size -- the batch size of the input samples.

  • fused_op_list -- a list of OPs to be fused.

process_batched(samples, rank=None)
run(dataset, *, exporter=None, tracer=None)
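As a rough illustration of running several sequential operations on the same batch, the sketch below chains plain functions over a batch before moving on to the next one. It is a simplified model under stated assumptions, not GeneralFusedOP itself: `ToySequentialOp`, `strip_whitespace`, and `lowercase` are hypothetical, each op is modeled as a function from batch to batch, and the toy `run` takes a plain list of strings instead of a dataset and ignores `exporter` and `tracer`.

```python
from typing import Callable, Dict, List, Optional

Batch = Dict[str, List[str]]


def strip_whitespace(batch: Batch) -> Batch:
    """Hypothetical mapper-like op: trim surrounding whitespace."""
    return {**batch, "text": [t.strip() for t in batch["text"]]}


def lowercase(batch: Batch) -> Batch:
    """Hypothetical mapper-like op: lowercase every text."""
    return {**batch, "text": [t.lower() for t in batch["text"]]}


class ToySequentialOp:
    """Hypothetical op that applies a list of ops, in order, to each batch."""

    def __init__(self, batch_size: int = 1,
                 fused_op_list: Optional[List[Callable[[Batch], Batch]]] = None):
        self.batch_size = batch_size
        self.fused_op_list = fused_op_list or []

    def process_batched(self, samples: Batch) -> Batch:
        # Every op sees the output of the previous op on the same batch.
        for op in self.fused_op_list:
            samples = op(samples)
        return samples

    def run(self, texts: List[str]) -> List[str]:
        # Simplified stand-in for a dataset: slice the list into batches.
        out: List[str] = []
        for start in range(0, len(texts), self.batch_size):
            batch = {"text": texts[start:start + self.batch_size]}
            out.extend(self.process_batched(batch)["text"])
        return out


op = ToySequentialOp(batch_size=2, fused_op_list=[strip_whitespace, lowercase])
result = op.run(["  Hello ", "WORLD", " Data "])
print(result)
```

Because each batch passes through the whole op list before the next batch is read, intermediate results never need to be written out between ops, which is the main appeal of this kind of explicit fusion.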