Skip to content

chat_message

Chat Message Data Converter - specialized converter for chat message format data. Handles conversation data with multiple messages and roles for chat-based training.

ChatMessageConverter

Bases: DataConverter

Specialized converter for chat message data format with conversation structure.

Processes data containing message arrays with role/content pairs for chat-based reward modeling and conversation training.

Input Data Format Expected

{ "messages": [ {"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi there!"} ] }

Output: DataSample with structured input messages and empty output for inference

Source code in rm_gallery/core/data/load/chat_message.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@DataConverterRegistry.register("chat_message")
class ChatMessageConverter(DataConverter):
    """
    Specialized converter for chat message data format with conversation structure.

    Processes data containing message arrays with role/content pairs for
    chat-based reward modeling and conversation training.

    Input Data Format Expected:
        {
            "messages": [
                {"role": "user", "content": "Hello"},
                {"role": "assistant", "content": "Hi there!"}
            ]
        }

    Output: DataSample with structured input messages and empty output for inference
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """
        Convert chat message data dictionary to standardized DataSample format.

        Extracts conversation messages from input data and creates a DataSample
        with structured input for chat-based processing pipelines.

        Args:
            data_dict: Raw data containing messages array with role/content pairs
            source_info: Metadata about data source (file path, dataset name, etc.)

        Returns:
            DataSample with structured conversation input and metadata
            Returns None if conversion fails
        """
        # generate unique id
        content = str(data_dict)
        unique_id = hashlib.md5(content.encode()).hexdigest()

        try:
            # Create input from messages
            data_input = []
            data_output = []
            messages = data_dict.get("messages", [])

            if isinstance(messages, list) and len(messages) > 0:
                # check if the conversation is paired
                is_paired_conversation = True
                if len(messages) % 2 != 0:
                    is_paired_conversation = False
                else:
                    for i in range(0, len(messages), 2):
                        if (
                            i + 1 < len(messages)
                            and messages[i].get("role") == "user"
                            and messages[i + 1].get("role") == "assistant"
                        ):
                            continue
                        else:
                            is_paired_conversation = False
                            break

                if is_paired_conversation and len(messages) >= 2:
                    # if the conversation is paired, the last assistant message is the output, others are the input
                    for i, msg in enumerate(messages):
                        if isinstance(msg, dict):
                            role = msg.get("role", "user")
                            content = msg.get("content", "")

                            # the last assistant message is the output
                            if i == len(messages) - 1 and role == "assistant":
                                # Convert to DataOutput format
                                answer_step = Step(
                                    role=role,
                                    content=content,
                                    label={},
                                    reward=Reward(),
                                )
                                data_output.append(
                                    DataOutput(answer=answer_step, steps=None)
                                )
                            else:
                                data_input.append(
                                    ChatMessage(role=role, content=content)
                                )
                else:
                    # if the conversation is not paired, all messages are the input
                    for msg in messages:
                        if isinstance(msg, dict):
                            role = msg.get("role", "user")
                            content = msg.get("content", "")
                            data_input.append(ChatMessage(role=role, content=content))

            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "ChatMessageConverter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="chat_message",
                task_category="chat",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating ChatMessage DataSample: {str(e)}")
            return None

convert_to_data_sample(data_dict, source_info)

Convert chat message data dictionary to standardized DataSample format.

Extracts conversation messages from input data and creates a DataSample with structured input for chat-based processing pipelines.

Parameters:

Name Type Description Default
data_dict Dict[str, Any]

Raw data containing messages array with role/content pairs

required
source_info Dict[str, Any]

Metadata about data source (file path, dataset name, etc.)

required

Returns:

Type Description
DataSample

DataSample with structured conversation input and metadata

DataSample

Returns None if conversion fails

Source code in rm_gallery/core/data/load/chat_message.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """
    Convert chat message data dictionary to standardized DataSample format.

    Extracts conversation messages from input data and creates a DataSample
    with structured input for chat-based processing pipelines.

    Args:
        data_dict: Raw data containing messages array with role/content pairs
        source_info: Metadata about data source (file path, dataset name, etc.)

    Returns:
        DataSample with structured conversation input and metadata
        Returns None if conversion fails
    """
    # generate unique id
    content = str(data_dict)
    unique_id = hashlib.md5(content.encode()).hexdigest()

    try:
        # Create input from messages
        data_input = []
        data_output = []
        messages = data_dict.get("messages", [])

        if isinstance(messages, list) and len(messages) > 0:
            # check if the conversation is paired
            is_paired_conversation = True
            if len(messages) % 2 != 0:
                is_paired_conversation = False
            else:
                for i in range(0, len(messages), 2):
                    if (
                        i + 1 < len(messages)
                        and messages[i].get("role") == "user"
                        and messages[i + 1].get("role") == "assistant"
                    ):
                        continue
                    else:
                        is_paired_conversation = False
                        break

            if is_paired_conversation and len(messages) >= 2:
                # if the conversation is paired, the last assistant message is the output, others are the input
                for i, msg in enumerate(messages):
                    if isinstance(msg, dict):
                        role = msg.get("role", "user")
                        content = msg.get("content", "")

                        # the last assistant message is the output
                        if i == len(messages) - 1 and role == "assistant":
                            # Convert to DataOutput format
                            answer_step = Step(
                                role=role,
                                content=content,
                                label={},
                                reward=Reward(),
                            )
                            data_output.append(
                                DataOutput(answer=answer_step, steps=None)
                            )
                        else:
                            data_input.append(
                                ChatMessage(role=role, content=content)
                            )
            else:
                # if the conversation is not paired, all messages are the input
                for msg in messages:
                    if isinstance(msg, dict):
                        role = msg.get("role", "user")
                        content = msg.get("content", "")
                        data_input.append(ChatMessage(role=role, content=content))

        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "ChatMessageConverter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="chat_message",
            task_category="chat",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating ChatMessage DataSample: {str(e)}")
        return None