1. Overview

The Data Process Module provides a unified and flexible data processing solution. Built around an Operator Pipeline design, it lets users construct complex data processing workflows by flexibly combining multiple operators.

2. Architecture Design

Core Components

2.1. DataProcess - Data Processing Engine

  • Inherits from BaseDataModule, providing standardized data processing interfaces
  • Manages and orchestrates the execution order of operator sequences
  • Supports both batch data processing and real-time data stream processing

2.2. BaseOperator - Abstract Base Class for Operators

  • Defines standard interface specifications for operators
  • Supports generic types for type safety
  • Provides extensible data processing abstract methods
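
For orientation, here is a minimal sketch of an operator subclass. The process_dataset signature follows the custom-operator example in Section 5.1; the actual base class may require additional methods or constructor arguments:

from typing import List

from rm_gallery.core.data.process.ops.base import BaseOperator

class PassthroughOperator(BaseOperator):
    """Illustrative operator: returns every item unchanged."""

    def process_dataset(self, items: List) -> List:
        # A real operator would transform or filter `items` here.
        return items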

2.3. OperatorFactory - Operator Factory

  • Implements unified registration and dynamic creation mechanisms for operators
  • Seamlessly integrates operators from the data-juicer ecosystem
  • Supports configuration-based operator instantiation
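
As a preview of the configuration-driven workflow shown in full in Section 4 (Method 2), an operator can be instantiated from a plain dictionary:

from rm_gallery.core.data.process.ops.base import OperatorFactory

# The type/name/config schema below is the one used throughout Section 4.
text_filter = OperatorFactory.create_operator({
    "type": "filter",
    "name": "text_length_filter",
    "config": {"min_length": 50, "max_length": 2000},
})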

3. Core Features

3.1. Pipeline-based Data Processing

  • Chain Operations: Supports seamless serial execution of multiple operators
  • Metadata Preservation: Fully preserves metadata from the original dataset
  • Full Tracking: Provides detailed processing logs, performance statistics, and data flow tracking

3.2. Rich Operator Ecosystem

  • Built-in Operators:
      • TextLengthFilter - filters samples by text length
      • ConversationTurnFilter - filters samples by conversation turn count
  • External Integration:
      • Full support for the data-juicer operator library
      • Support for custom operator extensions

3.3. Configuration-driven Design

  • Declarative Configuration: Flexibly define data processing flows through configuration files
  • Parameterized Control: All operator parameters can be adjusted through configuration files
  • Dynamic Adjustment: Supports modifying processing parameters at runtime
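
As an illustration, here is a minimal sketch of building a pipeline from a configuration file. The pipeline.json filename is hypothetical; the file is assumed to contain a list of the same type/name/config dictionaries used in Section 4:

import json

from rm_gallery.core.data.process.ops.base import OperatorFactory
from rm_gallery.core.data.process.process import create_processor

# Load operator definitions from a file (hypothetical filename)
with open("pipeline.json") as f:
    operator_configs = json.load(f)

# Build the entire pipeline from configuration alone; no operator
# classes are imported directly.
operators = [OperatorFactory.create_operator(cfg) for cfg in operator_configs]
processor = create_processor(name="file_driven_processor", operators=operators)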

4. Quick Start

Method 1: Direct Operator Creation

from rm_gallery.core.data.process.process import create_processor
from rm_gallery.core.data.process.ops.filter.text_length_filter import TextLengthFilter
from rm_gallery.core.data.process.ops.filter.conversation_turn_filter import ConversationTurnFilter
from rm_gallery.core.data.load.base import create_loader
import rm_gallery.core.data     # Core strategy registration
import rm_gallery.gallery.data  # Extension strategy registration

# Configure local file loading parameters
config = {
    "path": "../../../data/reward-bench-2/data/test-00000-of-00001.parquet",
    "limit": 1000,  # Limit the number of data entries to load
}

# Create data loader
loader = create_loader(
    name="rewardbench2",           # Dataset name
    load_strategy_type="local",    # Use local file loading strategy
    data_source="rewardbench2",    # Specify data source format converter
    config=config                  # Pass configuration parameters
)

# Execute data loading
dataset = loader.run()

# Create operators
text_filter = TextLengthFilter(
    name="text_length_filter",
    config={"min_length": 50, "max_length": 2000}
)

turn_filter = ConversationTurnFilter(
    name="conversation_turn_filter",
    config={"min_turns": 1, "max_turns": 10}
)

# Create data processing module
processor = create_processor(
    name="data_processor",
    operators=[text_filter, turn_filter]
)

# Process data
result = processor.run(dataset)
print(f"Before processing: {len(dataset.datasamples)} data entries")
print(f"After processing: {len(result.datasamples)} data entries")
Example output:

2025-07-02 13:24:02.722 | INFO     | rm_gallery.core.data.load.base:_load_data_impl:392 - Loaded 1865 samples from file: ../../../data/reward-bench-2/data/test-00000-of-00001.parquet
2025-07-02 13:24:02.723 | INFO     | rm_gallery.core.data.load.base:run:262 - Applied limit of 1000, final count: 1000
2025-07-02 13:24:02.724 | INFO     | rm_gallery.core.data.load.base:run:276 - Successfully loaded 1000 items from rewardbench2
2025-07-02 13:24:02.729 | INFO     | rm_gallery.core.data.process.process:run:92 - Processing 1000 items with 2 operators
2025-07-02 13:24:02.729 | INFO     | rm_gallery.core.data.process.process:run:99 - Applying operator 1/2: text_length_filter
2025-07-02 13:24:02.734 | INFO     | rm_gallery.core.data.process.process:run:103 - Operator text_length_filter completed: 168 items remaining
2025-07-02 13:24:02.734 | INFO     | rm_gallery.core.data.process.process:run:99 - Applying operator 2/2: conversation_turn_filter
2025-07-02 13:24:02.734 | INFO     | rm_gallery.core.data.process.process:run:103 - Operator conversation_turn_filter completed: 168 items remaining
2025-07-02 13:24:02.734 | INFO     | rm_gallery.core.data.process.process:run:127 - Processing completed: 1000 -> 168 items
Before processing: 1000 data entries
After processing: 168 data entries

Method 2: Configuration-based Batch Processing

Configuration files provide a more flexible way to define data processing workflows and are especially suitable for complex, multi-step scenarios.

# Create operators through configuration
from rm_gallery.core.data.process.process import create_processor
from rm_gallery.core.data.load.base import create_loader
from rm_gallery.core.data.process.ops.base import OperatorFactory
import rm_gallery.core.data     # Core strategy registration
import rm_gallery.gallery.data  # Extension strategy registration

# Configure local file loading parameters
config = {
    "path": "../../../data/reward-bench-2/data/test-00000-of-00001.parquet",
    "limit": 1000,  # Limit the number of data entries to load
}

# Create data loader
loader = create_loader(
    name="rewardbench2",           # Dataset name
    load_strategy_type="local",    # Use local file loading strategy
    data_source="rewardbench2",    # Specify data source format converter
    config=config                  # Pass configuration parameters
)

# Execute data loading
dataset = loader.run()

# Configure multiple operators
operator_configs = [
    {
        "type": "filter",
        "name": "conversation_turn_filter",
        "config": {"min_turns": 1, "max_turns": 8}
    },
    {
        "type": "filter",
        "name": "text_length_filter",
        "config": {"min_length": 100, "max_length": 2000}
    },
    {
        "type": "data_juicer",
        "name": "character_repetition_filter",
        "config": {
            "rep_len": 10,
            "min_ratio": 0.0,
            "max_ratio": 0.5
        }
    }
]

# Batch create operators
operators = [OperatorFactory.create_operator(config) for config in operator_configs]

# Create processor
processor = create_processor(
    name="batch_processor",
    operators=operators
)

result = processor.run(dataset)
print(f"Before processing: {len(dataset.datasamples)} data entries")
print(f"After processing: {len(result.datasamples)} data entries")

5. Advanced Features

5.1. Custom Operator Development

When built-in operators cannot meet specific requirements, you can easily create custom operators. Here's the complete development workflow:

Step 1: Implement Operator Class

Create custom operators in the rm_gallery/gallery/data/process/ directory:

from rm_gallery.core.data.process.ops.base import BaseOperator, OperatorFactory

@OperatorFactory.register("custom_filter")
class CustomFilter(BaseOperator):
    """Custom data filter example"""

    def process_dataset(self, items):
        """
        Core method for processing datasets

        Args:
            items: List of input data items

        Returns:
            List of filtered data items
        """
        filtered_items = []
        for item in items:
            if self._custom_condition(item):
                filtered_items.append(item)
        return filtered_items

    def _custom_condition(self, item):
        """
        Custom filtering condition

        Args:
            item: Single data item

        Returns:
            bool: Whether to keep this data item
        """
        # Implement your filtering logic here
        return True

Step 2: Register Operator

Import the operator in rm_gallery/gallery/data/__init__.py to complete registration:

from rm_gallery.gallery.data.process.custom_filter import CustomFilter
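
After registration, the custom operator can be used like any built-in one. A usage sketch, assuming BaseOperator accepts the same name/config constructor arguments as the built-in filters in Section 4:

from rm_gallery.core.data.process.process import create_processor
from rm_gallery.gallery.data.process.custom_filter import CustomFilter

# Constructor arguments assumed to mirror the built-in filters
custom_filter = CustomFilter(name="custom_filter", config={})

processor = create_processor(name="custom_pipeline", operators=[custom_filter])
result = processor.run(dataset)  # `dataset` loaded as in Section 4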

5.2. Data-Juicer Operator Integration

RM-Gallery seamlessly integrates with the data-juicer ecosystem, allowing you to use its rich collection of data processing operators:

# Configuration example using data-juicer operators
from rm_gallery.core.data.process.ops.base import OperatorFactory

config = {
    "type": "data_juicer",
    "name": "text_length_filter",
    "config": {
        "min_len": 10,
        "max_len": 20
    }
}

operator = OperatorFactory.create_operator(config)
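
The returned operator can then be combined with built-in operators in a single pipeline:

from rm_gallery.core.data.process.process import create_processor

processor = create_processor(
    name="data_juicer_pipeline",
    operators=[operator],  # the data-juicer operator created above
)
result = processor.run(dataset)  # `dataset` loaded as in Section 4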

6. Supported Operators

Built-in Operators

| Operator Name | Functionality | Configuration Parameters |
|---|---|---|
| TextLengthFilter | Filter data samples based on text length | min_length, max_length |
| ConversationTurnFilter | Filter samples based on conversation turn count | min_turns, max_turns |

Data-Juicer Integrated Operators

| Operator Name | Functionality | Status |
|---|---|---|
| text_length_filter | Text length filtering | ✅ Tested |
| character_repetition_filter | Character repetition filtering | ✅ Tested |
| word_repetition_filter | Word repetition filtering | 🔄 Testing |

Tip: We are continuously adding and testing new operators; stay tuned for more features!