Skip to content

core

ChatMessageConverter

Bases: DataConverter

Specialized converter for chat message data format with conversation structure.

Processes data containing message arrays with role/content pairs for chat-based reward modeling and conversation training.

Input Data Format Expected

{ "messages": [ {"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi there!"} ] }

Output: DataSample with structured input messages and empty output for inference

Source code in rm_gallery/core/data/load/chat_message.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@DataConverterRegistry.register("chat_message")
class ChatMessageConverter(DataConverter):
    """
    Specialized converter for chat message data format with conversation structure.

    Processes data containing message arrays with role/content pairs for
    chat-based reward modeling and conversation training.

    Input Data Format Expected:
        {
            "messages": [
                {"role": "user", "content": "Hello"},
                {"role": "assistant", "content": "Hi there!"}
            ]
        }

    Output: DataSample with structured input messages and empty output for inference
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """
        Convert chat message data dictionary to standardized DataSample format.

        Extracts conversation messages from input data and creates a DataSample
        with structured input for chat-based processing pipelines.

        Args:
            data_dict: Raw data containing messages array with role/content pairs
            source_info: Metadata about data source (file path, dataset name, etc.)

        Returns:
            DataSample with structured conversation input and metadata
            Returns None if conversion fails
        """
        # generate unique id
        content = str(data_dict)
        unique_id = hashlib.md5(content.encode()).hexdigest()

        try:
            # Create input from messages
            data_input = []
            data_output = []
            messages = data_dict.get("messages", [])

            if isinstance(messages, list) and len(messages) > 0:
                # check if the conversation is paired
                is_paired_conversation = True
                if len(messages) % 2 != 0:
                    is_paired_conversation = False
                else:
                    for i in range(0, len(messages), 2):
                        if (
                            i + 1 < len(messages)
                            and messages[i].get("role") == "user"
                            and messages[i + 1].get("role") == "assistant"
                        ):
                            continue
                        else:
                            is_paired_conversation = False
                            break

                if is_paired_conversation and len(messages) >= 2:
                    # if the conversation is paired, the last assistant message is the output, others are the input
                    for i, msg in enumerate(messages):
                        if isinstance(msg, dict):
                            role = msg.get("role", "user")
                            content = msg.get("content", "")

                            # the last assistant message is the output
                            if i == len(messages) - 1 and role == "assistant":
                                # Convert to DataOutput format
                                answer_step = Step(
                                    role=role,
                                    content=content,
                                    label={},
                                    reward=Reward(),
                                )
                                data_output.append(
                                    DataOutput(answer=answer_step, steps=None)
                                )
                            else:
                                data_input.append(
                                    ChatMessage(role=role, content=content)
                                )
                else:
                    # if the conversation is not paired, all messages are the input
                    for msg in messages:
                        if isinstance(msg, dict):
                            role = msg.get("role", "user")
                            content = msg.get("content", "")
                            data_input.append(ChatMessage(role=role, content=content))

            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "ChatMessageConverter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="chat_message",
                task_category="chat",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating ChatMessage DataSample: {str(e)}")
            return None

convert_to_data_sample(data_dict, source_info)

Convert chat message data dictionary to standardized DataSample format.

Extracts conversation messages from input data and creates a DataSample with structured input for chat-based processing pipelines.

Parameters:

Name Type Description Default
data_dict Dict[str, Any]

Raw data containing messages array with role/content pairs

required
source_info Dict[str, Any]

Metadata about data source (file path, dataset name, etc.)

required

Returns:

Type Description
DataSample

DataSample with structured conversation input and metadata

DataSample

Returns None if conversion fails

Source code in rm_gallery/core/data/load/chat_message.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """
    Convert chat message data dictionary to standardized DataSample format.

    Extracts conversation messages from input data and creates a DataSample
    with structured input for chat-based processing pipelines.

    Args:
        data_dict: Raw data containing messages array with role/content pairs
        source_info: Metadata about data source (file path, dataset name, etc.)

    Returns:
        DataSample with structured conversation input and metadata
        Returns None if conversion fails
    """
    # generate unique id
    content = str(data_dict)
    unique_id = hashlib.md5(content.encode()).hexdigest()

    try:
        # Create input from messages
        data_input = []
        data_output = []
        messages = data_dict.get("messages", [])

        if isinstance(messages, list) and len(messages) > 0:
            # check if the conversation is paired
            is_paired_conversation = True
            if len(messages) % 2 != 0:
                is_paired_conversation = False
            else:
                for i in range(0, len(messages), 2):
                    if (
                        i + 1 < len(messages)
                        and messages[i].get("role") == "user"
                        and messages[i + 1].get("role") == "assistant"
                    ):
                        continue
                    else:
                        is_paired_conversation = False
                        break

            if is_paired_conversation and len(messages) >= 2:
                # if the conversation is paired, the last assistant message is the output, others are the input
                for i, msg in enumerate(messages):
                    if isinstance(msg, dict):
                        role = msg.get("role", "user")
                        content = msg.get("content", "")

                        # the last assistant message is the output
                        if i == len(messages) - 1 and role == "assistant":
                            # Convert to DataOutput format
                            answer_step = Step(
                                role=role,
                                content=content,
                                label={},
                                reward=Reward(),
                            )
                            data_output.append(
                                DataOutput(answer=answer_step, steps=None)
                            )
                        else:
                            data_input.append(
                                ChatMessage(role=role, content=content)
                            )
            else:
                # if the conversation is not paired, all messages are the input
                for msg in messages:
                    if isinstance(msg, dict):
                        role = msg.get("role", "user")
                        content = msg.get("content", "")
                        data_input.append(ChatMessage(role=role, content=content))

        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "ChatMessageConverter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="chat_message",
            task_category="chat",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating ChatMessage DataSample: {str(e)}")
        return None

ConversationTurnFilter

Bases: BaseOperator

Filter conversations based on the number of turns in the input. A turn is defined as a single message in the conversation.

Source code in rm_gallery/core/data/process/ops/filter/conversation_turn_filter.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@OperatorFactory.register("conversation_turn_filter")
class ConversationTurnFilter(BaseOperator):
    """
    Filter conversations based on the number of turns in the input.
    A turn is defined as a single message in the conversation.
    """

    def __init__(
        self,
        name: str,
        config: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the conversation turn filter.

        Args:
            name: Name of the operator
            min_turns: Minimum number of turns required (inclusive)
            max_turns: Maximum number of turns allowed (inclusive)
            config: Additional configuration parameters
        """
        super().__init__(name=name, config=config)

    def process_dataset(self, items: List[DataSample]) -> List[DataSample]:
        """
        Filter conversations based on the number of turns.

        Args:
            items: List of DataSample items to process

        Returns:
            List of DataSample items that meet the turn count criteria
        """
        try:
            filtered_items = []
            for item in items:
                # Count the number of user turns in the input
                num_turns = (
                    sum(1 for input_item in item.input if input_item.role == "user")
                    if item.input
                    else 0
                )

                # Check if the number of turns is within the specified range
                if (
                    self.config.get("min_turns", 1)
                    <= num_turns
                    <= self.config.get("max_turns", 100)
                ):
                    filtered_items.append(item)
                else:
                    pass
                    # logger.debug(f"Filtered out conversation with {num_turns} user turns "
                    #            f"(min: {self.min_turns}, max: {self.max_turns})")

            return filtered_items
        except Exception as e:
            logger.error(f"Error in conversation turn filtering: {str(e)}")
            return items

__init__(name, config=None)

Initialize the conversation turn filter.

Parameters:

Name Type Description Default
name str

Name of the operator

required
min_turns

Minimum number of turns required (inclusive)

required
max_turns

Maximum number of turns allowed (inclusive)

required
config Optional[Dict[str, Any]]

Additional configuration parameters

None
Source code in rm_gallery/core/data/process/ops/filter/conversation_turn_filter.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def __init__(
    self,
    name: str,
    config: Optional[Dict[str, Any]] = None,
):
    """
    Initialize the conversation turn filter.

    Args:
        name: Name of the operator
        min_turns: Minimum number of turns required (inclusive)
        max_turns: Maximum number of turns allowed (inclusive)
        config: Additional configuration parameters
    """
    super().__init__(name=name, config=config)

process_dataset(items)

Filter conversations based on the number of turns.

Parameters:

Name Type Description Default
items List[DataSample]

List of DataSample items to process

required

Returns:

Type Description
List[DataSample]

List of DataSample items that meet the turn count criteria

Source code in rm_gallery/core/data/process/ops/filter/conversation_turn_filter.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def process_dataset(self, items: List[DataSample]) -> List[DataSample]:
    """
    Filter conversations based on the number of turns.

    Args:
        items: List of DataSample items to process

    Returns:
        List of DataSample items that meet the turn count criteria
    """
    try:
        filtered_items = []
        for item in items:
            # Count the number of user turns in the input
            num_turns = (
                sum(1 for input_item in item.input if input_item.role == "user")
                if item.input
                else 0
            )

            # Check if the number of turns is within the specified range
            if (
                self.config.get("min_turns", 1)
                <= num_turns
                <= self.config.get("max_turns", 100)
            ):
                filtered_items.append(item)
            else:
                pass
                # logger.debug(f"Filtered out conversation with {num_turns} user turns "
                #            f"(min: {self.min_turns}, max: {self.max_turns})")

        return filtered_items
    except Exception as e:
        logger.error(f"Error in conversation turn filtering: {str(e)}")
        return items

GenericConverter

Bases: DataConverter

Generic converter that automatically handles diverse HuggingFace dataset formats.

Acts as a fallback converter when no specific format converter is available. Intelligently extracts input/output pairs from common field names and structures.

Supported Input Patterns
  • Fields: prompt, question, input, text, instruction (for input)
  • Fields: response, answer, output, completion (for output)
  • Messages: array of role/content objects for conversations

Output: DataSample with auto-detected task category and structured data

Source code in rm_gallery/core/data/load/huggingface.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@DataConverterRegistry.register("*")
class GenericConverter(DataConverter):
    """
    Generic converter that automatically handles diverse HuggingFace dataset formats.

    Acts as a fallback converter when no specific format converter is available.
    Intelligently extracts input/output pairs from common field names and structures.

    Supported Input Patterns:
        - Fields: prompt, question, input, text, instruction (for input)
        - Fields: response, answer, output, completion (for output)
        - Messages: array of role/content objects for conversations

    Output: DataSample with auto-detected task category and structured data
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """
        Convert generic HuggingFace data dictionary to standardized DataSample format.

        Automatically detects input/output patterns from common field names,
        determines task category, and creates appropriate data structure.

        Args:
            data_dict: Raw data dictionary from HuggingFace dataset
            source_info: Source metadata including dataset name, config, split info

        Returns:
            DataSample with auto-detected structure and task category
            Returns None if input/output extraction fails
        """
        # Generate unique id
        content = str(data_dict)
        unique_id = hashlib.md5(content.encode()).hexdigest()

        try:
            # Try to extract input from common field names
            input_data = self._extract_input(data_dict)
            if not input_data:
                logger.warning(f"Could not extract input from data: {data_dict}")
                return None

            # Try to extract output from common field names
            output_data = self._extract_output(data_dict)
            if not output_data:
                logger.warning(f"Could not extract output from data: {data_dict}")
                return None

            # Determine task category
            task_category = self._determine_task_category(data_dict)

            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "GenericConverter",
                "task_category": task_category,
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=input_data,
                output=output_data,
                source=source_info.get("dataset_name", "generic"),
                task_category=task_category,
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating generic DataSample: {str(e)}")
            return None

    def _extract_input(self, data_dict: Dict[str, Any]) -> list[ChatMessage]:
        """
        Extract input messages from data using common field name patterns.

        Searches for standard input field names and converts to ChatMessage format.
        Handles both single-field inputs and conversation message arrays.

        Args:
            data_dict: Raw data dictionary to extract input from

        Returns:
            List of ChatMessage objects representing the input context
        """
        input_data = []

        # Common input field names
        for field in ["prompt", "question", "input", "text", "instruction"]:
            if field in data_dict and data_dict[field]:
                input_data.append(
                    ChatMessage(role="user", content=str(data_dict[field]))
                )
                break

        # Handle conversation/messages format
        if "messages" in data_dict:
            messages = data_dict["messages"]
            if isinstance(messages, list):
                for msg in messages:
                    if isinstance(msg, dict):
                        role = msg.get("role", "user")
                        content = msg.get("content", str(msg))
                        if role in ["user", "system"]:  # Only include input messages
                            input_data.append(ChatMessage(role=role, content=content))

        return input_data

    def _extract_output(self, data_dict: Dict[str, Any]) -> list[DataOutput]:
        """
        Extract output responses from data using common field name patterns.

        Searches for standard output field names and creates DataOutput objects
        with Step components for response evaluation.

        Args:
            data_dict: Raw data dictionary to extract output from

        Returns:
            List of DataOutput objects representing expected responses
        """
        outputs = []

        # Common output field names
        for field in ["response", "answer", "output", "completion"]:
            if field in data_dict and data_dict[field]:
                outputs.append(
                    DataOutput(
                        answer=Step(role="assistant", content=str(data_dict[field]))
                    )
                )
                break

        # Handle messages format for assistant responses
        if "messages" in data_dict and not outputs:
            messages = data_dict["messages"]
            if isinstance(messages, list):
                for msg in messages:
                    if isinstance(msg, dict) and msg.get("role") == "assistant":
                        outputs.append(
                            DataOutput(
                                answer=Step(
                                    role="assistant",
                                    content=str(msg.get("content", "")),
                                )
                            )
                        )

        return outputs

    def _determine_task_category(self, data_dict: Dict[str, Any]) -> str:
        """
        Automatically determine task category from data field patterns.

        Analyzes field names and structure to classify the type of task
        for appropriate processing and evaluation strategies.

        Args:
            data_dict: Raw data dictionary to analyze

        Returns:
            String identifier for the detected task category
        """
        # Check for explicit task category
        if "task_category" in data_dict:
            return str(data_dict["task_category"])

        # Infer from field names
        if any(field in data_dict for field in ["messages", "conversation"]):
            return "chat"
        elif any(field in data_dict for field in ["question", "answer"]):
            return "qa"
        elif any(field in data_dict for field in ["instruction", "completion"]):
            return "instruction_following"
        else:
            return "general"

convert_to_data_sample(data_dict, source_info)

Convert generic HuggingFace data dictionary to standardized DataSample format.

Automatically detects input/output patterns from common field names, determines task category, and creates appropriate data structure.

Parameters:

Name Type Description Default
data_dict Dict[str, Any]

Raw data dictionary from HuggingFace dataset

required
source_info Dict[str, Any]

Source metadata including dataset name, config, split info

required

Returns:

Type Description
DataSample

DataSample with auto-detected structure and task category

DataSample

Returns None if input/output extraction fails

Source code in rm_gallery/core/data/load/huggingface.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """
    Convert generic HuggingFace data dictionary to standardized DataSample format.

    Automatically detects input/output patterns from common field names,
    determines task category, and creates appropriate data structure.

    Args:
        data_dict: Raw data dictionary from HuggingFace dataset
        source_info: Source metadata including dataset name, config, split info

    Returns:
        DataSample with auto-detected structure and task category
        Returns None if input/output extraction fails
    """
    # Generate unique id
    content = str(data_dict)
    unique_id = hashlib.md5(content.encode()).hexdigest()

    try:
        # Try to extract input from common field names
        input_data = self._extract_input(data_dict)
        if not input_data:
            logger.warning(f"Could not extract input from data: {data_dict}")
            return None

        # Try to extract output from common field names
        output_data = self._extract_output(data_dict)
        if not output_data:
            logger.warning(f"Could not extract output from data: {data_dict}")
            return None

        # Determine task category
        task_category = self._determine_task_category(data_dict)

        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "GenericConverter",
            "task_category": task_category,
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=input_data,
            output=output_data,
            source=source_info.get("dataset_name", "generic"),
            task_category=task_category,
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating generic DataSample: {str(e)}")
        return None

TextLengthFilter

Bases: BaseOperator

Filter texts based on their length.

Source code in rm_gallery/core/data/process/ops/filter/text_length_filter.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@OperatorFactory.register("text_length_filter")
class TextLengthFilter(BaseOperator):
    """
    Filter texts based on their length.
    """

    def __init__(
        self,
        name: str,
        config: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the text length filter.

        Args:
            name: Name of the operator
            min_length: Minimum text length required (inclusive)
            max_length: Maximum text length allowed (inclusive)
            config: Additional configuration parameters
        """
        super().__init__(name=name, config=config)

    def process_dataset(self, items: List[DataSample]) -> List[DataSample]:
        """
        Filter items based on text length.

        Args:
            items: List of data items to process

        Returns:
            Filtered list of items
        """
        filtered_items = []
        for item in items:
            # get all input and output texts
            texts = []

            # process input from history
            if item.input:
                for input_item in item.input:
                    if input_item.content:
                        texts.append(input_item.content)

            # process output from answers
            if item.output:
                for output_item in item.output:
                    if (
                        hasattr(output_item, "answer")
                        and output_item.answer
                        and output_item.answer.content
                    ):
                        texts.append(output_item.answer.content)

            # calculate total length
            total_length = sum(len(text) for text in texts)

            if (
                self.config.get("min_length", 10)
                <= total_length
                <= self.config.get("max_length", 1000)
            ):
                filtered_items.append(item)
            else:
                pass
                # logger.debug(f"Filtered out item with total length {total_length}")
        return filtered_items

__init__(name, config=None)

Initialize the text length filter.

Parameters:

Name Type Description Default
name str

Name of the operator

required
min_length

Minimum text length required (inclusive)

required
max_length

Maximum text length allowed (inclusive)

required
config Optional[Dict[str, Any]]

Additional configuration parameters

None
Source code in rm_gallery/core/data/process/ops/filter/text_length_filter.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(
    self,
    name: str,
    config: Optional[Dict[str, Any]] = None,
):
    """
    Initialize the text length filter.

    Args:
        name: Name of the operator
        min_length: Minimum text length required (inclusive)
        max_length: Maximum text length allowed (inclusive)
        config: Additional configuration parameters
    """
    super().__init__(name=name, config=config)

process_dataset(items)

Filter items based on text length.

Parameters:

Name Type Description Default
items List[DataSample]

List of data items to process

required

Returns:

Type Description
List[DataSample]

Filtered list of items

Source code in rm_gallery/core/data/process/ops/filter/text_length_filter.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def process_dataset(self, items: List[DataSample]) -> List[DataSample]:
    """
    Filter items based on text length.

    Args:
        items: List of data items to process

    Returns:
        Filtered list of items
    """
    filtered_items = []
    for item in items:
        # get all input and output texts
        texts = []

        # process input from history
        if item.input:
            for input_item in item.input:
                if input_item.content:
                    texts.append(input_item.content)

        # process output from answers
        if item.output:
            for output_item in item.output:
                if (
                    hasattr(output_item, "answer")
                    and output_item.answer
                    and output_item.answer.content
                ):
                    texts.append(output_item.answer.content)

        # calculate total length
        total_length = sum(len(text) for text in texts)

        if (
            self.config.get("min_length", 10)
            <= total_length
            <= self.config.get("max_length", 1000)
        ):
            filtered_items.append(item)
        else:
            pass
            # logger.debug(f"Filtered out item with total length {total_length}")
    return filtered_items