base

Data Processing Operator Framework: an extensible system for data transformation and filtering operations.

Provides base classes, registry, and factory system for creating modular data processing operators that can be combined into flexible processing pipelines.

BaseOperator

Bases: BaseModule, Generic[T]

Abstract base class for all data processing operators in the pipeline framework.

Operators are modular processing units that transform, filter, or modify datasets in a standardized way. Each operator processes a list of data samples and returns a modified list, enabling flexible composition into processing pipelines.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `name` | `str` | Unique identifier for the operator instance |
| `config` | `Dict[str, Any]` | Configuration parameters specific to the operator |

Source code in rm_gallery/core/data/process/ops/base.py
class BaseOperator(BaseModule, Generic[T]):
    """
    Abstract base class for all data processing operators in the pipeline framework.

    Operators are modular processing units that transform, filter, or modify datasets
    in a standardized way. Each operator processes a list of data samples and returns
    a modified list, enabling flexible composition into processing pipelines.

    Attributes:
        name: Unique identifier for the operator instance
        config: Configuration parameters specific to the operator
    """

    name: str = Field(..., description="operator name")
    config: Dict[str, Any] = Field(default_factory=dict, description="operator config")

    def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, **kwargs):
        """
        Initialize operator with name and configuration.

        Args:
            name: Unique identifier for the operator
            config: Operator-specific configuration parameters
            **kwargs: Additional initialization parameters
        """
        super().__init__(name=name, config=config or {}, **kwargs)

    @abstractmethod
    def process_dataset(self, items: List[T]) -> List[T]:
        """
        Process the entire dataset with operator-specific logic.

        This is the main processing method that must be implemented by all
        concrete operators. It receives a list of data samples and returns
        a modified list after applying the operator's transformation or filtering.

        Args:
            items: List of data samples to process

        Returns:
            List of processed data samples (may be filtered or transformed)
        """
        pass

    def run(self, **kwargs):
        """
        Run method implementation for operator interface compatibility.

        Args:
            **kwargs: Runtime parameters including 'items' list

        Returns:
            Result of process_dataset method
        """
        items = kwargs.get("items", [])
        return self.process_dataset(items)

    def __str__(self) -> str:
        """
        String representation for debugging and logging.

        Returns:
            Human-readable operator description
        """
        return f"{self.__class__.__name__}({self.name})"

__init__(name, config=None, **kwargs)

Initialize operator with name and configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Unique identifier for the operator | required |
| `config` | `Optional[Dict[str, Any]]` | Operator-specific configuration parameters | `None` |
| `**kwargs` | | Additional initialization parameters | `{}` |
Source code in rm_gallery/core/data/process/ops/base.py
def __init__(self, name: str, config: Optional[Dict[str, Any]] = None, **kwargs):
    """
    Initialize operator with name and configuration.

    Args:
        name: Unique identifier for the operator
        config: Operator-specific configuration parameters
        **kwargs: Additional initialization parameters
    """
    super().__init__(name=name, config=config or {}, **kwargs)

__str__()

String representation for debugging and logging.

Returns:

| Type | Description |
|------|-------------|
| `str` | Human-readable operator description |

Source code in rm_gallery/core/data/process/ops/base.py
def __str__(self) -> str:
    """
    String representation for debugging and logging.

    Returns:
        Human-readable operator description
    """
    return f"{self.__class__.__name__}({self.name})"

process_dataset(items) abstractmethod

Process the entire dataset with operator-specific logic.

This is the main processing method that must be implemented by all concrete operators. It receives a list of data samples and returns a modified list after applying the operator's transformation or filtering.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `items` | `List[T]` | List of data samples to process | required |

Returns:

| Type | Description |
|------|-------------|
| `List[T]` | List of processed data samples (may be filtered or transformed) |

Source code in rm_gallery/core/data/process/ops/base.py
@abstractmethod
def process_dataset(self, items: List[T]) -> List[T]:
    """
    Process the entire dataset with operator-specific logic.

    This is the main processing method that must be implemented by all
    concrete operators. It receives a list of data samples and returns
    a modified list after applying the operator's transformation or filtering.

    Args:
        items: List of data samples to process

    Returns:
        List of processed data samples (may be filtered or transformed)
    """
    pass

run(**kwargs)

Run method implementation for operator interface compatibility.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `**kwargs` | | Runtime parameters including the `items` list | `{}` |

Returns:

| Type | Description |
|------|-------------|
| | Result of the `process_dataset` method |

Source code in rm_gallery/core/data/process/ops/base.py
def run(self, **kwargs):
    """
    Run method implementation for operator interface compatibility.

    Args:
        **kwargs: Runtime parameters including 'items' list

    Returns:
        Result of process_dataset method
    """
    items = kwargs.get("items", [])
    return self.process_dataset(items)

DataJuicerOperator

Bases: BaseOperator[T]

Adapter class for integrating data-juicer library operators into the pipeline framework.

Wraps data-juicer operators to provide standardized interface and automatic text extraction/processing for compatibility with DataSample structures.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `juicer_op` | `Any` | Instantiated data-juicer operator for actual processing |

Source code in rm_gallery/core/data/process/ops/base.py
class DataJuicerOperator(BaseOperator[T]):
    """
    Adapter class for integrating data-juicer library operators into the pipeline framework.

    Wraps data-juicer operators to provide standardized interface and automatic
    text extraction/processing for compatibility with DataSample structures.

    Attributes:
        juicer_op: Instantiated data-juicer operator for actual processing
    """

    juicer_op: Any = Field(..., description="Data Juicer operator instance")

    def __init__(
        self,
        name: str,
        juicer_op_class: Any,
        config: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize data-juicer operator adapter.

        Args:
            name: Operator instance name
            juicer_op_class: data-juicer operator class to wrap
            config: Configuration parameters for the juicer operator
            **kwargs: Additional initialization parameters
        """
        juicer_op = juicer_op_class(**config) if config else juicer_op_class()
        super().__init__(name=name, config=config, juicer_op=juicer_op, **kwargs)

    def process_dataset(self, items: List[T]) -> List[T]:
        """
        Process dataset using data-juicer operators with automatic text extraction.

        Extracts text content from DataSample structures, applies data-juicer
        filtering, and returns samples that pass the filter criteria.

        Args:
            items: List of DataSample objects to process

        Returns:
            Filtered list of DataSample objects that pass data-juicer criteria
        """
        try:
            all_texts = []
            text_to_item_indices = {}

            for i, item in enumerate(items):
                # Extract texts from input history
                if item.input:
                    for input_item in item.input:
                        if input_item.content:
                            all_texts.append(input_item.content)
                            text_to_item_indices.setdefault(
                                input_item.content, []
                            ).append(i)

                # Extract texts from output answers
                if item.output:
                    for output_item in item.output:
                        if output_item.answer and output_item.answer.content:
                            all_texts.append(output_item.answer.content)
                            text_to_item_indices.setdefault(
                                output_item.answer.content, []
                            ).append(i)

            if not all_texts:
                return items

            # Process with data-juicer
            sample = {
                "text": all_texts,
                "__dj__stats__": [{} for _ in range(len(all_texts))],
            }

            processed_sample = self.juicer_op.compute_stats_batched(sample)
            keep_indices = list(self.juicer_op.process_batched(processed_sample))

            # Determine which items to keep
            items_to_keep = set()
            for text, keep in zip(all_texts, keep_indices):
                if keep:
                    items_to_keep.update(text_to_item_indices[text])

            return [items[i] for i in range(len(items)) if i in items_to_keep]

        except Exception as e:
            logger.error(
                f"Error in dataset-level processing with operator {self.name}: {str(e)}"
            )
            return items
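The keep/drop bookkeeping inside `process_dataset` can be illustrated in isolation. Here `keep_flags` is a hand-written stand-in for the output of `juicer_op.process_batched`: each extracted text maps back to the sample indices it came from, and a sample survives if any of its texts passes the filter.

```python
# Stand-in data: three texts drawn from three samples.
all_texts = ["keep me", "drop", "keep me too"]
# "drop" appears in both sample 0 and sample 1, mirroring the
# setdefault(...).append(i) mapping built during extraction.
text_to_item_indices = {"keep me": [0], "drop": [0, 1], "keep me too": [2]}
keep_flags = [True, False, True]  # fake juicer_op.process_batched output

items_to_keep = set()
for text, keep in zip(all_texts, keep_flags):
    if keep:
        # A passing text rescues every sample it belongs to.
        items_to_keep.update(text_to_item_indices[text])

kept = sorted(items_to_keep)
```

Sample 1 contributes only the rejected text "drop", so it is filtered out, while sample 0 survives through its passing text.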

__init__(name, juicer_op_class, config=None, **kwargs)

Initialize data-juicer operator adapter.

Parameters:

Name Type Description Default
name str

Operator instance name

required
juicer_op_class Any

data-juicer operator class to wrap

required
config Optional[Dict[str, Any]]

Configuration parameters for the juicer operator

None
**kwargs

Additional initialization parameters

{}
Source code in rm_gallery/core/data/process/ops/base.py
def __init__(
    self,
    name: str,
    juicer_op_class: Any,
    config: Optional[Dict[str, Any]] = None,
    **kwargs,
):
    """
    Initialize data-juicer operator adapter.

    Args:
        name: Operator instance name
        juicer_op_class: data-juicer operator class to wrap
        config: Configuration parameters for the juicer operator
        **kwargs: Additional initialization parameters
    """
    juicer_op = juicer_op_class(**config) if config else juicer_op_class()
    super().__init__(name=name, config=config, juicer_op=juicer_op, **kwargs)

process_dataset(items)

Process dataset using data-juicer operators with automatic text extraction.

Extracts text content from DataSample structures, applies data-juicer filtering, and returns samples that pass the filter criteria.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `items` | `List[T]` | List of DataSample objects to process | required |

Returns:

| Type | Description |
|------|-------------|
| `List[T]` | Filtered list of DataSample objects that pass data-juicer criteria |

Source code in rm_gallery/core/data/process/ops/base.py
def process_dataset(self, items: List[T]) -> List[T]:
    """
    Process dataset using data-juicer operators with automatic text extraction.

    Extracts text content from DataSample structures, applies data-juicer
    filtering, and returns samples that pass the filter criteria.

    Args:
        items: List of DataSample objects to process

    Returns:
        Filtered list of DataSample objects that pass data-juicer criteria
    """
    try:
        all_texts = []
        text_to_item_indices = {}

        for i, item in enumerate(items):
            # Extract texts from input history
            if item.input:
                for input_item in item.input:
                    if input_item.content:
                        all_texts.append(input_item.content)
                        text_to_item_indices.setdefault(
                            input_item.content, []
                        ).append(i)

            # Extract texts from output answers
            if item.output:
                for output_item in item.output:
                    if output_item.answer and output_item.answer.content:
                        all_texts.append(output_item.answer.content)
                        text_to_item_indices.setdefault(
                            output_item.answer.content, []
                        ).append(i)

        if not all_texts:
            return items

        # Process with data-juicer
        sample = {
            "text": all_texts,
            "__dj__stats__": [{} for _ in range(len(all_texts))],
        }

        processed_sample = self.juicer_op.compute_stats_batched(sample)
        keep_indices = list(self.juicer_op.process_batched(processed_sample))

        # Determine which items to keep
        items_to_keep = set()
        for text, keep in zip(all_texts, keep_indices):
            if keep:
                items_to_keep.update(text_to_item_indices[text])

        return [items[i] for i in range(len(items)) if i in items_to_keep]

    except Exception as e:
        logger.error(
            f"Error in dataset-level processing with operator {self.name}: {str(e)}"
        )
        return items

OperatorFactory

Factory class for creating and registering data processing operators.

Provides centralized operator creation from configuration dictionaries, supports built-in operator types, and enables registration of custom operators through decorator pattern or direct registration.

Source code in rm_gallery/core/data/process/ops/base.py
class OperatorFactory:
    """
    Factory class for creating and registering data processing operators.

    Provides centralized operator creation from configuration dictionaries,
    supports built-in operator types, and enables registration of custom operators
    through decorator pattern or direct registration.
    """

    _operator_registry: Dict[str, Callable[[Dict[str, Any]], BaseOperator]] = {}
    _external_operators: Dict[str, type] = {}

    # Operator type mapping
    _operator_types = {"filter": "filter", "group": "group", "map": "map"}

    @classmethod
    def register(cls, name: str) -> Callable:
        """
        Decorator for registering operator creation functions or classes.

        Supports both function-based and class-based operator registration.
        For classes, automatically creates a factory function.

        Args:
            name: Unique operator identifier for registry lookup

        Returns:
            Decorator function that registers and returns the original object

        Example:
            @OperatorFactory.register("my_filter")
            class MyFilterOperator(BaseOperator):
                ...
        """

        def decorator(func_or_class):
            # Check if it's a class (subclass of BaseOperator)
            if isinstance(func_or_class, type) and issubclass(
                func_or_class, BaseOperator
            ):
                # Create a factory function for the class
                def class_factory(operator_config: Dict[str, Any]) -> BaseOperator:
                    op_name = operator_config.get("name", name)
                    config = operator_config.get("config", {})
                    return func_or_class(name=op_name, config=config)

                cls._operator_registry[name] = class_factory
                return func_or_class
            else:
                # It's a function, register as-is
                cls._operator_registry[name] = func_or_class
                return func_or_class

        return decorator

    @classmethod
    def create_operator(cls, operator_config: Dict[str, Any]) -> BaseOperator:
        """
        Create operator instance from configuration dictionary.

        Supports registered operators, built-in types, and external library
        operators (like data_juicer) through automatic discovery and instantiation.

        Args:
            operator_config: Configuration dictionary containing:
                - type: Operator type identifier
                - name: Operator instance name
                - config: Operator-specific parameters

        Returns:
            Configured operator instance ready for pipeline integration

        Raises:
            ValueError: If operator type is unknown or unsupported
            ImportError: If external operator dependencies are missing
        """
        op_type = operator_config.get("type")
        name = operator_config.get("name", op_type)
        config = operator_config.get("config", {})

        # Check registry first
        if name in cls._operator_registry:
            return cls._operator_registry[name](operator_config)

        # Handle built-in operator types
        if op_type in cls._operator_types:
            return RegisteredOperator(name=name, operator_type=op_type, config=config)
        elif op_type == "data_juicer":
            return cls._create_data_juicer_filter_operator(name, config)
        else:
            raise ValueError(f"Unknown operator type: {op_type}")

    @classmethod
    def _create_data_juicer_filter_operator(
        cls, name: str, config: Dict[str, Any]
    ) -> BaseOperator:
        """
        Create operator adapter for data_juicer library operators.

        Automatically discovers and wraps data_juicer filter operators for
        integration into the processing pipeline framework.

        Args:
            name: data_juicer operator name (snake_case)
            config: Operator configuration parameters

        Returns:
            DataJuicerOperator wrapper instance

        Raises:
            ImportError: If data_juicer library is not installed
            AttributeError: If specified operator class is not found
        """
        try:
            # Import data_juicer filter module
            import data_juicer.ops.filter as dj_filters

            # Convert snake_case name to PascalCase class name
            class_name = "".join(word.capitalize() for word in name.split("_"))

            # Try to get the operator class from data_juicer.ops.filter
            if hasattr(dj_filters, class_name):
                operator_class = getattr(dj_filters, class_name)
                return DataJuicerOperator(
                    name=class_name, juicer_op_class=operator_class, config=config
                )
            else:
                # Fallback: try to import from specific module (for backward compatibility)
                module_path = "data_juicer.ops.filter"
                operator_module = importlib.import_module(
                    f"{module_path}.{name.lower()}"
                )
                operator_class = getattr(operator_module, class_name)
                return DataJuicerOperator(
                    name=class_name, juicer_op_class=operator_class, config=config
                )

        except ImportError as e:
            raise ImportError(
                f"Failed to import data_juicer operator '{name}': {e}. "
                f"Please ensure py-data-juicer is installed: pip install py-data-juicer"
            )
        except AttributeError as e:
            raise AttributeError(
                f"Data_juicer operator '{class_name}' not found. "
                f"Available operators can be found in data_juicer.ops.filter module. Error: {e}"
            )
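`_create_data_juicer_filter_operator` resolves operator classes by converting the configured snake_case name to PascalCase before looking it up in `data_juicer.ops.filter`. The helper below reproduces that one-line transformation for illustration; whether the resulting class actually exists depends on the installed data-juicer version.

```python
def to_pascal_case(name: str) -> str:
    # Same transformation the factory applies: split on underscores and
    # capitalize each word, so "text_length_filter" -> "TextLengthFilter".
    return "".join(word.capitalize() for word in name.split("_"))


cls_name = to_pascal_case("text_length_filter")
```

Note that `str.capitalize` lowercases everything after the first letter, so names with embedded acronyms (e.g. an `ID` segment) only resolve if the data-juicer class uses the same capitalization.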

create_operator(operator_config) classmethod

Create operator instance from configuration dictionary.

Supports registered operators, built-in types, and external library operators (like data_juicer) through automatic discovery and instantiation.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `operator_config` | `Dict[str, Any]` | Configuration dictionary containing `type` (operator type identifier), `name` (operator instance name), and `config` (operator-specific parameters) | required |

Returns:

| Type | Description |
|------|-------------|
| `BaseOperator` | Configured operator instance ready for pipeline integration |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If operator type is unknown or unsupported |
| `ImportError` | If external operator dependencies are missing |

Source code in rm_gallery/core/data/process/ops/base.py
@classmethod
def create_operator(cls, operator_config: Dict[str, Any]) -> BaseOperator:
    """
    Create operator instance from configuration dictionary.

    Supports registered operators, built-in types, and external library
    operators (like data_juicer) through automatic discovery and instantiation.

    Args:
        operator_config: Configuration dictionary containing:
            - type: Operator type identifier
            - name: Operator instance name
            - config: Operator-specific parameters

    Returns:
        Configured operator instance ready for pipeline integration

    Raises:
        ValueError: If operator type is unknown or unsupported
        ImportError: If external operator dependencies are missing
    """
    op_type = operator_config.get("type")
    name = operator_config.get("name", op_type)
    config = operator_config.get("config", {})

    # Check registry first
    if name in cls._operator_registry:
        return cls._operator_registry[name](operator_config)

    # Handle built-in operator types
    if op_type in cls._operator_types:
        return RegisteredOperator(name=name, operator_type=op_type, config=config)
    elif op_type == "data_juicer":
        return cls._create_data_juicer_filter_operator(name, config)
    else:
        raise ValueError(f"Unknown operator type: {op_type}")

register(name) classmethod

Decorator for registering operator creation functions or classes.

Supports both function-based and class-based operator registration. For classes, automatically creates a factory function.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Unique operator identifier for registry lookup | required |

Returns:

| Type | Description |
|------|-------------|
| `Callable` | Decorator function that registers and returns the original object |

Example:

    @OperatorFactory.register("my_filter")
    class MyFilterOperator(BaseOperator):
        ...

Source code in rm_gallery/core/data/process/ops/base.py
@classmethod
def register(cls, name: str) -> Callable:
    """
    Decorator for registering operator creation functions or classes.

    Supports both function-based and class-based operator registration.
    For classes, automatically creates a factory function.

    Args:
        name: Unique operator identifier for registry lookup

    Returns:
        Decorator function that registers and returns the original object

    Example:
        @OperatorFactory.register("my_filter")
        class MyFilterOperator(BaseOperator):
            ...
    """

    def decorator(func_or_class):
        # Check if it's a class (subclass of BaseOperator)
        if isinstance(func_or_class, type) and issubclass(
            func_or_class, BaseOperator
        ):
            # Create a factory function for the class
            def class_factory(operator_config: Dict[str, Any]) -> BaseOperator:
                op_name = operator_config.get("name", name)
                config = operator_config.get("config", {})
                return func_or_class(name=op_name, config=config)

            cls._operator_registry[name] = class_factory
            return func_or_class
        else:
            # It's a function, register as-is
            cls._operator_registry[name] = func_or_class
            return func_or_class

    return decorator

RegisteredOperator

Bases: BaseOperator[T]

Generic operator wrapper that delegates to registry-based implementations.

Used for operators registered in the factory registry, providing a uniform interface while delegating actual processing to registered functions or classes.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `operator_type` | `str` | Type classification of the operator |

Source code in rm_gallery/core/data/process/ops/base.py
class RegisteredOperator(BaseOperator[T]):
    """
    Generic operator wrapper that delegates to registry-based implementations.

    Used for operators registered in the factory registry, providing a uniform
    interface while delegating actual processing to registered functions or classes.

    Attributes:
        operator_type: Type classification of the operator
    """

    operator_type: str = Field(..., description="operator type")

    def __init__(
        self,
        name: str,
        operator_type: str,
        config: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize registered operator wrapper.

        Args:
            name: Operator instance name
            operator_type: Type classification for the operator
            config: Operator configuration parameters
            **kwargs: Additional initialization parameters
        """
        super().__init__(
            name=name, config=config, operator_type=operator_type, **kwargs
        )

    def process_dataset(self, items: List[T]) -> List[T]:
        """
        Process dataset by delegating to registered operator implementation.

        Args:
            items: List of data samples to process

        Returns:
            Processed data samples from registered operator
        """
        try:
            if self.name in OperatorFactory._operator_registry:
                operator = OperatorFactory._operator_registry[self.name](
                    {
                        "type": self.operator_type,
                        "name": self.name,
                        "config": self.config,
                    }
                )
                return operator.process_dataset(items)

            logger.warning(f"No registered operator found for name: {self.name}")
            return items
        except Exception as e:
            logger.error(
                f"Error in {self.operator_type} operation {self.name}: {str(e)}"
            )
            return items
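The delegation pattern above can be sketched without the framework: a registry maps names to factory callables, and a lookup either builds the registered operator and forwards `process_dataset` to it, or falls through and returns the items unchanged, exactly as `RegisteredOperator` does on a miss. All names here are illustrative.

```python
registry = {}


def register(name):
    # Simplified analogue of OperatorFactory.register for plain factories.
    def decorator(factory):
        registry[name] = factory
        return factory
    return decorator


@register("upper_map")
def make_upper(operator_config):
    class UpperOp:
        def process_dataset(self, items):
            return [s.upper() for s in items]
    return UpperOp()


def delegate(name, items):
    # Mirror of RegisteredOperator.process_dataset: build from the registry
    # config dict and forward, or pass items through untouched on a miss.
    if name in registry:
        op = registry[name]({"type": "map", "name": name, "config": {}})
        return op.process_dataset(items)
    return items


out = delegate("upper_map", ["a", "b"])
```

Note the pass-through on a missing name: like the real class, a lookup failure degrades to a no-op rather than an exception.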

__init__(name, operator_type, config=None, **kwargs)

Initialize registered operator wrapper.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Operator instance name | required |
| `operator_type` | `str` | Type classification for the operator | required |
| `config` | `Optional[Dict[str, Any]]` | Operator configuration parameters | `None` |
| `**kwargs` | | Additional initialization parameters | `{}` |
Source code in rm_gallery/core/data/process/ops/base.py
def __init__(
    self,
    name: str,
    operator_type: str,
    config: Optional[Dict[str, Any]] = None,
    **kwargs,
):
    """
    Initialize registered operator wrapper.

    Args:
        name: Operator instance name
        operator_type: Type classification for the operator
        config: Operator configuration parameters
        **kwargs: Additional initialization parameters
    """
    super().__init__(
        name=name, config=config, operator_type=operator_type, **kwargs
    )

process_dataset(items)

Process dataset by delegating to registered operator implementation.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `items` | `List[T]` | List of data samples to process | required |

Returns:

| Type | Description |
|------|-------------|
| `List[T]` | Processed data samples from registered operator |

Source code in rm_gallery/core/data/process/ops/base.py
def process_dataset(self, items: List[T]) -> List[T]:
    """
    Process dataset by delegating to registered operator implementation.

    Args:
        items: List of data samples to process

    Returns:
        Processed data samples from registered operator
    """
    try:
        if self.name in OperatorFactory._operator_registry:
            operator = OperatorFactory._operator_registry[self.name](
                {
                    "type": self.operator_type,
                    "name": self.name,
                    "config": self.config,
                }
            )
            return operator.process_dataset(items)

        logger.warning(f"No registered operator found for name: {self.name}")
        return items
    except Exception as e:
        logger.error(
            f"Error in {self.operator_type} operation {self.name}: {str(e)}"
        )
        return items