
rm_gallery

BaseHarmlessnessListWiseReward

Bases: BaseListWisePrincipleReward

The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_harmlessness_listwise")
class BaseHarmlessnessListWiseReward(BaseListWisePrincipleReward):
    """The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities."""

    name: str = Field(default="base_harmlessness_listwise")
    desc: str = Field(default=DEFAULT_HARMLESSNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HARMLESSNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HARMLESSNESS_PRINCIPLES)

BaseHarmlessnessPointWiseReward

Bases: BasePointWisePrincipleReward

The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_harmlessness_pointwise")
class BaseHarmlessnessPointWiseReward(BasePointWisePrincipleReward):
    """The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities."""

    name: str = Field(default="base_harmlessness_pointwise")
    desc: str = Field(default=DEFAULT_HARMLESSNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HARMLESSNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HARMLESSNESS_PRINCIPLES)

BaseHelpfulnessListWiseReward

Bases: BaseListWisePrincipleReward

The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_helpfulness_listwise")
class BaseHelpfulnessListWiseReward(BaseListWisePrincipleReward):
    """The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information."""

    name: str = Field(default="base_helpfulness_listwise")
    desc: str = Field(default=DEFAULT_HELPFULNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HELPFULNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HELPFULNESS_PRINCIPLES)

BaseHelpfulnessPointWiseReward

Bases: BasePointWisePrincipleReward

The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_helpfulness_pointwise")
class BaseHelpfulnessPointWiseReward(BasePointWisePrincipleReward):
    """The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information."""

    name: str = Field(default="base_helpfulness_pointwise")
    desc: str = Field(default=DEFAULT_HELPFULNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HELPFULNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HELPFULNESS_PRINCIPLES)

BaseHonestyListWiseReward

Bases: BaseListWisePrincipleReward

The assistant aims to truthfully answer the user’s questions with no bias or prejudice.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_honesty_listwise")
class BaseHonestyListWiseReward(BaseListWisePrincipleReward):
    """The assistant aims to truthfully answer the user’s questions with no bias or prejudice."""

    name: str = Field(default="base_honesty_listwise")
    desc: str = Field(default=DEFAULT_HONESTY_DESC)
    scenario: str = Field(
        default=DEFAULT_HONESTY_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HONESTY_PRINCIPLES)

BaseHonestyPointWiseReward

Bases: BasePointWisePrincipleReward

The assistant aims to truthfully answer the user’s questions with no bias or prejudice.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_honesty_pointwise")
class BaseHonestyPointWiseReward(BasePointWisePrincipleReward):
    """The assistant aims to truthfully answer the user’s questions with no bias or prejudice."""

    name: str = Field(default="base_honesty_pointwise")
    desc: str = Field(default=DEFAULT_HONESTY_DESC)
    scenario: str = Field(
        default=DEFAULT_HONESTY_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HONESTY_PRINCIPLES)

BaseListWisePrincipleReward

Bases: BasePrincipleReward, BaseListWiseReward

List-wise principle evaluation using LLM.

Compares responses against each other based on ethical principles.

Source code in rm_gallery/core/reward/base.py
class BaseListWisePrincipleReward(BasePrincipleReward, BaseListWiseReward):
    """
    List-wise principle evaluation using LLM.

    Compares responses against each other based on ethical principles.
    """

    desc: str = Field(
        default="""Please act as an impartial judge and evaluate the quality of the answers provided by some assistants to the user question displayed below.
You should critically and accurately assess the assistant’s answer with the key principles and choose the assistant that follows the user’s query and answers the user’s question best.
Avoid any position biases and ensure that the order in which the responses were presented does not influence your decision.
Do not allow the length of the responses to influence your evaluation.
Be as goal as possible.""",
        description="description",
    )

    template: Type[BasePromptTemplate] = PrincipleListWiseTemplate

    def _before_evaluate(self, sample: DataSample, **kwargs) -> Dict:
        """
        Prepares list-wise evaluation parameters.

        Parameters:
            sample (DataSample): Multi-response sample to evaluate

        Returns:
            Dict: Parameters including all responses for comparison
        """
        params = super()._before_evaluate(sample=sample, **kwargs)
        answers = [output.answer.content for output in sample.output]
        params["answers"] = answers
        return params

    def _after_evaluate(
        self, response: PrincipleListWiseTemplate, sample: DataSample, **kwargs
    ) -> RewardResult:
        """
        Converts LLM response to list-wise ranking metrics.

        Parameters:
            response (PrincipleListWiseTemplate): Parsed LLM comparison

        Returns:
            RewardResult: Relative ranking of responses
        """
        scores = [0 for i in range(len(sample.output))]
        scores[response.best - 1] = 1
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithRank(
                    name=self.name, reason=response.reason, rank=scores
                )
            ],
        )
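
A quick illustration of the ranking rule in _after_evaluate (a standalone sketch, not library code): the judge's 1-indexed choice of best response becomes a one-hot rank list.

# Hypothetical: 3 candidate responses, the judge picked the second one.
num_responses = 3
best = 2                      # parsed from the LLM's PrincipleListWiseTemplate output
scores = [0] * num_responses
scores[best - 1] = 1
print(scores)                 # [0, 1, 0] -> response 2 wins, the others get 0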

BasePointWisePrincipleReward

Bases: BasePrincipleReward, BasePointWiseReward

Point-wise principle evaluation using LLM.

Evaluates each response individually against ethical principles.

Source code in rm_gallery/core/reward/base.py
class BasePointWisePrincipleReward(BasePrincipleReward, BasePointWiseReward):
    """
    Point-wise principle evaluation using LLM.

    Evaluates each response individually against ethical principles.
    """

    desc: str = Field(
        default="""Please act as an unbiased and impartial evaluator tasked with assessing the quality of the responses provided below.
You should critically and accurately assess the assistant’s answer with the key principles without any potential bias.
Do not allow the length of the responses to influence your evaluation.
Be as goal as possible.""",
        description="description",
    )

    def _before_evaluate(self, sample: DataSample, **kwargs) -> Dict:
        """
        Adds response content to evaluation parameters.

        Parameters:
            sample (DataSample): Sample containing response to evaluate

        Returns:
            Dict: Parameters including response content
        """
        params = super()._before_evaluate(sample=sample, **kwargs)
        params["answer"] = sample.output[0].answer.content
        return params

    def _after_evaluate(
        self, response: PrinciplePointWiseTemplate, sample: DataSample, **kwargs
    ) -> RewardResult:
        """
        Converts LLM response to point-wise reward metrics.

        Parameters:
            response (PrinciplePointWiseTemplate): Parsed LLM evaluation

        Returns:
            RewardResult: Violation score with explanation
        """
        # Convert violation list to a single score (e.g., average or sum)
        score = (
            1 - len(response.violation) / len(self.principles)
            if response.violation
            else 1.0
        )
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, reason=response.reason, score=score
                )
            ],
        )
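
A quick worked example of the scoring rule in _after_evaluate (a standalone sketch, not library code): the score is the fraction of principles that were not violated, or 1.0 when the judge reports no violations.

# Hypothetical: the judge flags 1 violation out of 4 configured principles.
principles = ["accurate", "relevant", "complete", "concise"]
violations = ["concise"]
score = 1 - len(violations) / len(principles) if violations else 1.0
print(score)                  # 0.75; with no violations the score would be 1.0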

BasePointWiseReward

Bases: BaseReward

Point-wise reward module for individual response evaluation.

Evaluates each response independently without considering relative ranking.

Source code in rm_gallery/core/reward/base.py
class BasePointWiseReward(BaseReward):
    """
    Point-wise reward module for individual response evaluation.

    Evaluates each response independently without considering relative ranking.
    """

    @abstractmethod
    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Processes a single response to generate reward metrics.

        Parameters:
            sample (DataSample): Single-response data sample
            **kwargs: Evaluation parameters

        Returns:
            RewardResult[RewardDimensionWithScore]: Response-specific reward metrics
        """
        ...

    def _parallel(
        self,
        func: Callable,
        sample: DataSample,
        thread_pool: ThreadPoolExecutor | None = None,
        **kwargs,
    ) -> DataSample:
        """
        Processes responses in a data sample using parallel or sequential execution.

        This method applies the provided function to each response in the sample,
        either in parallel using a thread pool or sequentially. Results are merged
        back into the corresponding response objects.

        Parameters:
            func (Callable): Function to apply to each response. Should accept a
                DataSample and return an object with 'details' and 'extra_data' attributes.
            sample (DataSample): Input sample containing multiple responses to process
            thread_pool (ThreadPoolExecutor | None): Optional thread pool for parallel execution
            **kwargs: Additional arguments passed to func

        Returns:
            DataSample: Modified copy of input sample with reward metrics updated in each response

        The method creates a deep copy of the input sample to avoid modifying original data.
        When using a thread pool, it submits tasks for each response and waits for completion
        before merging results. Response objects are updated with both reward details and
        additional metadata from processing results.
        """
        sample = sample.model_copy(deep=True)
        futures = []
        for i, output in enumerate(sample.output):
            # Create sub-sample for individual response processing
            subsample = DataSample(
                unique_id=sample.unique_id, input=sample.input, output=[output]
            )

            if thread_pool:
                futures.append(
                    (
                        i,
                        thread_pool.submit(
                            func, sample=subsample, thread_pool=thread_pool, **kwargs
                        ),
                    )
                )
            else:
                result = func(
                    sample=subsample,
                    thread_pool=thread_pool,
                    **kwargs,
                )
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        # Process parallel execution results
        if thread_pool:
            wait([future[-1] for future in futures], return_when=ALL_COMPLETED)
            # Merge results back into sample outputs
            for i, future in futures:
                result = future.result()
                output = sample.output[i]
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        for output in sample.output:
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        return sample
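
A minimal subclass sketch, assuming the import paths below match the source file locations shown on this page (rm_gallery/core/reward/base.py and rm_gallery/core/data/schema.py); the exact module for RewardResult and RewardDimensionWithScore is an assumption.

from rm_gallery.core.data.schema import DataSample          # assumed path
from rm_gallery.core.reward.base import (                   # assumed path
    BasePointWiseReward,
    RewardDimensionWithScore,
    RewardResult,
)

class WordBudgetReward(BasePointWiseReward):
    """Hypothetical reward: 1.0 if the answer stays within a word budget, else 0.0."""

    name: str = "word_budget"
    budget: int = 200

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        words = len(sample.output[0].answer.content.split())
        score = 1.0 if words <= self.budget else 0.0
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=score,
                    reason=f"{words} words against a budget of {self.budget}",
                )
            ],
        )

_parallel then applies _evaluate to each response (optionally through a ThreadPoolExecutor) and writes the mean of all detail scores back to output.answer.reward.score.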

BrainstormingListWiseReward

Bases: BaseHelpfulnessListWiseReward

Brainstorming: Generating text to come up with new ideas or solutions, with an emphasis on creativity and divergent thinking.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/brainstorming.py
@RewardRegistry.register("brainstorming_listwise_reward")
class BrainstormingListWiseReward(BaseHelpfulnessListWiseReward):
    """Brainstorming: Generating text to come up with new ideas or solutions, with an emphasis on creativity and driving thinking."""

    name: str = Field(default="brainstorming_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

ChatListWiseReward

Bases: BaseHelpfulnessListWiseReward

Chat: Simulates human conversation and communicates a variety of topics through text understanding and generation, emphasizing coherence and natural flow of interaction.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/chat.py
@RewardRegistry.register("chat_listwise_reward")
class ChatListWiseReward(BaseHelpfulnessListWiseReward):
    """Chat: Simulates human conversation and communicates a variety of topics through text understanding and generation, emphasizing coherence and natural flow of interaction."""

    name: str = Field(default="chat_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

ClassificationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Classification: Entails assigning predefined categories or labels to text based on its content.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/classification.py
@RewardRegistry.register("classification_listwise_reward")
class ClassificationListWiseReward(BaseHelpfulnessListWiseReward):
    """Classification: Entails assigning predefined categories or labels to text based on its content."""

    name: str = Field(
        default="classification_listwise_reward", description="reward name"
    )
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

ClosedQAListWiseReward

Bases: BaseHelpfulnessListWiseReward

Closed QA: Searches for direct answers to specific questions in given text sources (i.e., given context, given options).

Source code in rm_gallery/gallery/rm/alignment/helpfulness/closed_qa.py
@RewardRegistry.register("closed_qa_listwise_reward")
class ClosedQAListWiseReward(BaseHelpfulnessListWiseReward):
    """Closed QA: Search for direct answers to specific questions in given text sources (i.e. given context, given options)."""

    name: str = Field(default="closed_qa_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

CodeListWiseReward

Bases: BaseHelpfulnessListWiseReward

Code: Involves generating, understanding, or modifying programming language code within text.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/code.py
@RewardRegistry.register("code_listwise_reward")
class CodeListWiseReward(BaseHelpfulnessListWiseReward):
    """Code: Involves generating, understanding, or modifying programming language code within text."""

    name: str = Field(default="code_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

DataSample

Bases: BaseModel

Complete data sample structure for reward modeling training and evaluation.

Represents a single interaction with input context, multiple possible outputs, and associated metadata for comprehensive reward model training.

Attributes:

    unique_id (str): Unique identifier for tracking and deduplication
    input (List[ChatMessage]): Conversation context as list of chat messages
    output (List[DataOutput]): List of possible responses with evaluations
    task_category (Optional[str]): Optional categorization for task-specific analysis
    source (Optional[str]): Origin dataset or system that generated this sample
    created_at (datetime): Timestamp for temporal tracking
    metadata (Optional[Dict]): Additional context and debugging information

Source code in rm_gallery/core/data/schema.py
class DataSample(BaseModel):
    """
    Complete data sample structure for reward modeling training and evaluation.

    Represents a single interaction with input context, multiple possible outputs,
    and associated metadata for comprehensive reward model training.

    Attributes:
        unique_id: Unique identifier for tracking and deduplication
        input: Conversation context as list of chat messages
        output: List of possible responses with evaluations
        task_category: Optional categorization for task-specific analysis
        source: Origin dataset or system that generated this sample
        created_at: Timestamp for temporal tracking
        metadata: Additional context and debugging information
    """

    unique_id: str = Field(..., description="Unique identifier for the data")
    input: List[ChatMessage] = Field(default_factory=list, description="input")
    output: List[DataOutput] = Field(default_factory=list, description="output")
    task_category: Optional[str] = Field(default=None, description="task category")
    source: Optional[str] = Field(default=None, description="source")
    created_at: datetime = Field(default_factory=datetime.now, description="createdAt")
    metadata: Optional[Dict] = Field(default=None, description="metadata")

    def update(self, sample: "DataSample") -> "DataSample":
        """
        Merge another sample's data into this sample for combining evaluations.

        Updates additional_kwargs and reward details from the source sample
        while preserving the original structure.

        Args:
            sample: Source sample to merge data from

        Returns:
            Self with updated data for method chaining
        """
        self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
        for i, output in enumerate(self.output):
            output.answer.additional_kwargs.update(
                sample.output[i].answer.additional_kwargs
            )
            output.answer.reward.details.extend(sample.output[i].answer.reward.details)

            if output.steps:
                for j, step in output.steps:
                    step.additional_kwargs.update(
                        sample.output[i].steps[j].additional_kwargs
                    )
                    step.reward.details.extend(sample.output[i].steps[j].reward.details)
        return self

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {datetime: lambda v: v.isoformat()}
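
A hedged construction sketch using only fields shown on this page; the import path follows the source location above (rm_gallery/core/data/schema.py), and the ChatMessage/DataOutput/Step constructors mirror their use in the converters further down.

from rm_gallery.core.data.schema import (   # assumed path
    ChatMessage,
    DataOutput,
    DataSample,
    Step,
)

sample = DataSample(
    unique_id="demo-001",
    input=[ChatMessage(role="user", content="Name three uses of a paperclip.")],
    output=[
        DataOutput(answer=Step(role="assistant", content="Hold paper, reset a router, mark a page.")),
        DataOutput(answer=Step(role="assistant", content="It holds paper together.")),
    ],
    source="demo",
    task_category="brainstorming",
)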

update(sample)

Merge another sample's data into this sample for combining evaluations.

Updates additional_kwargs and reward details from the source sample while preserving the original structure.

Parameters:

    sample (DataSample): Source sample to merge data from. Required.

Returns:

    DataSample: Self with updated data for method chaining

Source code in rm_gallery/core/data/schema.py
def update(self, sample: "DataSample") -> "DataSample":
    """
    Merge another sample's data into this sample for combining evaluations.

    Updates additional_kwargs and reward details from the source sample
    while preserving the original structure.

    Args:
        sample: Source sample to merge data from

    Returns:
        Self with updated data for method chaining
    """
    self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
    for i, output in enumerate(self.output):
        output.answer.additional_kwargs.update(
            sample.output[i].answer.additional_kwargs
        )
        output.answer.reward.details.extend(sample.output[i].answer.reward.details)

        if output.steps:
            for j, step in output.steps:
                step.additional_kwargs.update(
                    sample.output[i].steps[j].additional_kwargs
                )
                step.reward.details.extend(sample.output[i].steps[j].reward.details)
    return self

DetoxifyReward

Bases: BasePointWiseReward

Detoxify: Detecting different types of toxicity, such as threats, obscenity, insults, and so on.

Source code in rm_gallery/gallery/rm/alignment/harmlessness/detoxification.py
@RewardRegistry.register("detoxify_reward")
class DetoxifyReward(BasePointWiseReward):
    """Detoxify: Detecting different types of of toxicity like threats, obscenity, insults ans so on."""

    name: str = Field(default="detoxify", description="Name of the reward module")
    model_name: str = Field(
        default="unbiased", description="Name of the Detoxify model to use"
    )

    @property
    def model(self):
        if not hasattr(self, "_model"):
            from detoxify import Detoxify

            self._model = Detoxify(self.model_name)
        return self._model

    def _evaluate(self, sample: DataSample, **kwargs) -> RewardResult:
        """
        Evaluate text toxicity using Detoxify model.

        Args:
            sample: Input data sample containing text to evaluate
            **kwargs: Additional implementation-specific parameters

        Returns:
            RewardResult: Computed reward metrics and metadata
        """
        try:
            # Get text from sample
            text = sample.output[0] if sample.output else sample.input

            if not text:
                raise ValueError("No text provided for evaluation")

            # Get model predictions
            predictions = self.model.predict(text)

            # Convert toxicity score to reward (higher = less toxic)
            toxicity_score = predictions["toxicity"]
            reward_score = 1.0 - toxicity_score  # Invert score so higher is better

            # Create reward dimension
            reward_dimension = RewardDimensionWithScore(
                name="detoxify",
                score=reward_score,
                reason=f"Text toxicity score: {toxicity_score:.2f}. Higher reward indicates less toxic content.",
            )

            return RewardResult(name=self.name, details=[reward_dimension])

        except Exception as e:
            logger.error(f"Error in Detoxify evaluation: {str(e)}")
            return RewardResult(name=self.name, details=[])
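
The scoring rule above can be illustrated directly with the detoxify package (a standalone sketch, not library code): the reward is 1 minus the model's toxicity probability, so cleaner text scores higher.

from detoxify import Detoxify

predictions = Detoxify("unbiased").predict("You are a wonderful person.")
reward_score = 1.0 - predictions["toxicity"]
print(f"toxicity={predictions['toxicity']:.3f}, reward={reward_score:.3f}")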

FactualityListWiseReward

Bases: BaseHonestyListWiseReward

Factuality: Detects hallucinations and other basic errors in completions.

Source code in rm_gallery/gallery/rm/alignment/honesty/factuality.py
@RewardRegistry.register("factuality_listwise_reward")
class FactualityListWiseReward(BaseHonestyListWiseReward):
    """Factuality: Detects hallucinations and other basic errors in completions."""

    name: str = Field(default="factuality_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

FocusListWiseReward

Bases: BaseHelpfulnessListWiseReward

Focus: Detects high-quality, on-topic answers to general user queries.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/focus.py
@RewardRegistry.register("focus_listwise_reward")
class FocusListWiseReward(BaseHelpfulnessListWiseReward):
    """Focus: Detects high-quality, on-topic answers to general user queries"""

    name: str = Field(default="focus_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

GenerationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Generation: Creating new textual content, from articles to stories, with an emphasis on originality and creativity.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/generation.py
@RewardRegistry.register("generation_listwise_reward")
class GenerationListWiseReward(BaseHelpfulnessListWiseReward):
    """Generation: Creating new textual content, from articles to stories, with an emphasis on originality and creativity."""

    name: str = Field(default="generation_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

HelpSteer2PairwiseConverter

Bases: DataConverter

Converter for HelpSteer2 pairwise data format. Can handle data from both local files and HuggingFace Hub. Converts each data entry into two DataSamples with swapped responses.

Source code in rm_gallery/gallery/data/load/helpsteer2_pairwise.py
@DataConverterRegistry.register("helpsteer2_pairwise")
class HelpSteer2PairwiseConverter(DataConverter):
    """
    Converter for HelpSteer2 pairwise data format
    Can handle data from both local files and HuggingFace Hub
    Converts each data entry into two DataSamples with swapped responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> Union[DataSample, List[DataSample]]:
        """Convert HelpSteer2 pairwise data to DataSample format"""

        try:
            # Create input from prompt
            data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

            # Determine preference based on preference_strength
            preference_strength = data_dict.get("preference_strength", 0)
            if preference_strength > 0:
                # response_2 is better
                preferred_in_original = "response_2"
            elif preference_strength < 0:
                # response_1 is better
                preferred_in_original = "response_1"
            else:
                # tie
                preferred_in_original = "tie"

            data_samples = []

            # Create first sample: response_A = response_1, response_B = response_2
            sample1_id = hashlib.md5(f"{str(data_dict)}_sample1".encode()).hexdigest()

            # Determine preferred for first sample
            if preferred_in_original == "response_1":
                preferred_1 = "A"  # response_A (response_1) is preferred
            elif preferred_in_original == "response_2":
                preferred_1 = "B"  # response_B (response_2) is preferred
            else:
                preferred_1 = "tie"

            # Create outputs for first sample
            output_1 = [
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_1"],
                        label={"response_type": "A"},
                    )
                ),
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_2"],
                        label={"response_type": "B"},
                    )
                ),
            ]

            # Build metadata for first sample
            metadata_1 = {
                "raw_data": data_dict,
                "load_strategy": "HelpSteer2PairwiseConverter",
                "response_A": data_dict["response_1"],
                "response_B": data_dict["response_2"],
                "preferred": preferred_1,
                "preference_strength": preference_strength,
                "preference_statement": data_dict.get("preference_statement"),
                "preference_elaboration": data_dict.get("preference_elaboration"),
                "sample_type": "original_order",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata_1.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata_1.update(
                    {
                        "dataset_name": source_info.get(
                            "dataset_name", "nvidia/HelpSteer2"
                        ),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            sample_1 = DataSample(
                unique_id=sample1_id,
                input=data_input,
                output=output_1,
                source="helpsteer2_pairwise",
                task_category="chat_pairwise",
                metadata=metadata_1,
            )
            data_samples.append(sample_1)

            # Create second sample: response_A = response_2, response_B = response_1 (swapped)
            sample2_id = hashlib.md5(f"{str(data_dict)}_sample2".encode()).hexdigest()

            # Determine preferred for second sample (swapped)
            if preferred_in_original == "response_1":
                preferred_2 = "B"  # response_B (response_1) is preferred
            elif preferred_in_original == "response_2":
                preferred_2 = "A"  # response_A (response_2) is preferred
            else:
                preferred_2 = "tie"

            # Create outputs for second sample (swapped)
            output_2 = [
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_2"],
                        label={"response_type": "A"},
                    )
                ),
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_1"],
                        label={"response_type": "B"},
                    )
                ),
            ]

            # Build metadata for second sample
            metadata_2 = {
                "raw_data": data_dict,
                "load_strategy": "HelpSteer2PairwiseConverter",
                "response_A": data_dict["response_2"],
                "response_B": data_dict["response_1"],
                "preferred": preferred_2,
                "preference_strength": preference_strength,
                "preference_statement": data_dict.get("preference_statement"),
                "preference_elaboration": data_dict.get("preference_elaboration"),
                "sample_type": "swapped_order",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata_2.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata_2.update(
                    {
                        "dataset_name": source_info.get(
                            "dataset_name", "nvidia/HelpSteer2"
                        ),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            sample_2 = DataSample(
                unique_id=sample2_id,
                input=data_input,
                output=output_2,
                source="helpsteer2_pairwise",
                task_category="chat_pairwise",
                metadata=metadata_2,
            )
            data_samples.append(sample_2)

            return data_samples

        except Exception as e:
            logger.error(f"Error creating HelpSteer2 Pairwise DataSample: {str(e)}")
            return None

convert_to_data_sample(data_dict, source_info)

Convert HelpSteer2 pairwise data to DataSample format

Source code in rm_gallery/gallery/data/load/helpsteer2_pairwise.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> Union[DataSample, List[DataSample]]:
    """Convert HelpSteer2 pairwise data to DataSample format"""

    try:
        # Create input from prompt
        data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

        # Determine preference based on preference_strength
        preference_strength = data_dict.get("preference_strength", 0)
        if preference_strength > 0:
            # response_2 is better
            preferred_in_original = "response_2"
        elif preference_strength < 0:
            # response_1 is better
            preferred_in_original = "response_1"
        else:
            # tie
            preferred_in_original = "tie"

        data_samples = []

        # Create first sample: response_A = response_1, response_B = response_2
        sample1_id = hashlib.md5(f"{str(data_dict)}_sample1".encode()).hexdigest()

        # Determine preferred for first sample
        if preferred_in_original == "response_1":
            preferred_1 = "A"  # response_A (response_1) is preferred
        elif preferred_in_original == "response_2":
            preferred_1 = "B"  # response_B (response_2) is preferred
        else:
            preferred_1 = "tie"

        # Create outputs for first sample
        output_1 = [
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_1"],
                    label={"response_type": "A"},
                )
            ),
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_2"],
                    label={"response_type": "B"},
                )
            ),
        ]

        # Build metadata for first sample
        metadata_1 = {
            "raw_data": data_dict,
            "load_strategy": "HelpSteer2PairwiseConverter",
            "response_A": data_dict["response_1"],
            "response_B": data_dict["response_2"],
            "preferred": preferred_1,
            "preference_strength": preference_strength,
            "preference_statement": data_dict.get("preference_statement"),
            "preference_elaboration": data_dict.get("preference_elaboration"),
            "sample_type": "original_order",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata_1.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata_1.update(
                {
                    "dataset_name": source_info.get(
                        "dataset_name", "nvidia/HelpSteer2"
                    ),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        sample_1 = DataSample(
            unique_id=sample1_id,
            input=data_input,
            output=output_1,
            source="helpsteer2_pairwise",
            task_category="chat_pairwise",
            metadata=metadata_1,
        )
        data_samples.append(sample_1)

        # Create second sample: response_A = response_2, response_B = response_1 (swapped)
        sample2_id = hashlib.md5(f"{str(data_dict)}_sample2".encode()).hexdigest()

        # Determine preferred for second sample (swapped)
        if preferred_in_original == "response_1":
            preferred_2 = "B"  # response_B (response_1) is preferred
        elif preferred_in_original == "response_2":
            preferred_2 = "A"  # response_A (response_2) is preferred
        else:
            preferred_2 = "tie"

        # Create outputs for second sample (swapped)
        output_2 = [
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_2"],
                    label={"response_type": "A"},
                )
            ),
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_1"],
                    label={"response_type": "B"},
                )
            ),
        ]

        # Build metadata for second sample
        metadata_2 = {
            "raw_data": data_dict,
            "load_strategy": "HelpSteer2PairwiseConverter",
            "response_A": data_dict["response_2"],
            "response_B": data_dict["response_1"],
            "preferred": preferred_2,
            "preference_strength": preference_strength,
            "preference_statement": data_dict.get("preference_statement"),
            "preference_elaboration": data_dict.get("preference_elaboration"),
            "sample_type": "swapped_order",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata_2.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata_2.update(
                {
                    "dataset_name": source_info.get(
                        "dataset_name", "nvidia/HelpSteer2"
                    ),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        sample_2 = DataSample(
            unique_id=sample2_id,
            input=data_input,
            output=output_2,
            source="helpsteer2_pairwise",
            task_category="chat_pairwise",
            metadata=metadata_2,
        )
        data_samples.append(sample_2)

        return data_samples

    except Exception as e:
        logger.error(f"Error creating HelpSteer2 Pairwise DataSample: {str(e)}")
        return None
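
A hypothetical input record in the HelpSteer2 pairwise format; the keys follow those the converter reads above. Because preference_strength is positive, response_2 is preferred, so the original-order sample marks "B" as preferred and the swapped sample marks "A".

record = {
    "prompt": "Explain photosynthesis in one sentence.",
    "response_1": "Plants eat sunlight.",
    "response_2": "Plants use light, water, and CO2 to make glucose and oxygen.",
    "preference_strength": 2,
    "preference_statement": "Response 2 is more accurate and complete.",
    "preference_elaboration": "Response 1 is a vague metaphor.",
}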

HelpSteer2PointwiseConverter

Bases: DataConverter

Unified converter for HelpSteer2 data format. Can handle data from both local files and HuggingFace Hub.

Source code in rm_gallery/gallery/data/load/helpsteer2_pointwise.py
@DataConverterRegistry.register("helpsteer2_pointwise")
class HelpSteer2PointwiseConverter(DataConverter):
    """
    Unified converter for HelpSteer2 data format
    Can handle data from both local files and HuggingFace Hub
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> Union[DataSample, List[DataSample]]:
        """Convert HelpSteer2 data to DataSample format"""
        # Generate unique id
        content = str(data_dict)
        unique_id = hashlib.md5(content.encode()).hexdigest()

        try:
            # Create input from prompt
            data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

            # Extract evaluation metrics for label
            label = {
                "helpfulness": data_dict.get("helpfulness"),
                "correctness": data_dict.get("correctness"),
                "coherence": data_dict.get("coherence"),
                "complexity": data_dict.get("complexity"),
                "verbosity": data_dict.get("verbosity"),
            }

            # Create output from response
            data_output = [
                DataOutput(
                    answer=Step(
                        role="assistant", content=data_dict["response"], label=label
                    )
                )
            ]

            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "HelpSteer2Converter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get(
                            "dataset_name", "nvidia/HelpSteer2"
                        ),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="helpsteer2",
                task_category="chat",
                metadata=metadata,
            )

            return [data_sample]

        except Exception as e:
            logger.error(f"Error creating HelpSteer2 DataSample: {str(e)}")
            return None

convert_to_data_sample(data_dict, source_info)

Convert HelpSteer2 data to DataSample format

Source code in rm_gallery/gallery/data/load/helpsteer2_pointwise.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> Union[DataSample, List[DataSample]]:
    """Convert HelpSteer2 data to DataSample format"""
    # Generate unique id
    content = str(data_dict)
    unique_id = hashlib.md5(content.encode()).hexdigest()

    try:
        # Create input from prompt
        data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

        # Extract evaluation metrics for label
        label = {
            "helpfulness": data_dict.get("helpfulness"),
            "correctness": data_dict.get("correctness"),
            "coherence": data_dict.get("coherence"),
            "complexity": data_dict.get("complexity"),
            "verbosity": data_dict.get("verbosity"),
        }

        # Create output from response
        data_output = [
            DataOutput(
                answer=Step(
                    role="assistant", content=data_dict["response"], label=label
                )
            )
        ]

        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "HelpSteer2Converter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get(
                        "dataset_name", "nvidia/HelpSteer2"
                    ),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="helpsteer2",
            task_category="chat",
            metadata=metadata,
        )

        return [data_sample]

    except Exception as e:
        logger.error(f"Error creating HelpSteer2 DataSample: {str(e)}")
        return None
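
A hypothetical input record in the HelpSteer2 pointwise format; the keys follow those the converter reads above, and each record becomes a single DataSample whose one output carries the five rating labels.

record = {
    "prompt": "Summarize the plot of Hamlet.",
    "response": "A Danish prince feigns madness while avenging his father's murder.",
    "helpfulness": 3,
    "correctness": 4,
    "coherence": 4,
    "complexity": 1,
    "verbosity": 1,
}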

LengthPenaltyReward

Bases: BasePointWiseReward

Text length based penalty for content that is too short or too long.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("length_penalty")
class LengthPenaltyReward(BasePointWiseReward):
    """
    Text length based penalty for content that is too short or too long.
    """

    name: str = Field(default="length_penalty", description="Length penalty reward")
    min_length: int = Field(default=10, description="Minimum length")
    max_length: int = Field(default=1000, description="Maximum length")
    penalty_rate: float = Field(default=0.01, description="Penalty rate")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check code syntax.

        Args:
            sample: Data sample containing code content

        Returns:
            RewardResult: Reward result containing syntax check score
        """
        content = sample.output[0].answer.content
        length = len(content)

        penalty = 0.0
        reason_parts = []

        if length < self.min_length:
            penalty = -(self.min_length - length) * self.penalty_rate
            reason_parts.append(f"Too short: {length} < {self.min_length}")
        elif length > self.max_length:
            penalty = -(length - self.max_length) * self.penalty_rate
            reason_parts.append(f"Too long: {length} > {self.max_length}")
        else:
            reason_parts.append(
                f"Length acceptable: {self.min_length} <= {length} <= {self.max_length}"
            )

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=penalty, reason="; ".join(reason_parts)
                )
            ],
            extra_data={
                "length": length,
                "min_length": self.min_length,
                "max_length": self.max_length,
                "penalty": penalty,
            },
        )
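
A quick worked example with the default settings above (min_length=10, max_length=1000, penalty_rate=0.01), as a standalone sketch: an answer that overshoots the maximum by 250 characters receives a -2.5 penalty, while anything inside the window scores 0.0.

length, max_length, penalty_rate = 1250, 1000, 0.01
penalty = -(length - max_length) * penalty_rate
print(penalty)   # -2.5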

MathListWiseReward

Bases: BaseHelpfulnessListWiseReward

Math: Solves math problems on open-ended human prompts, ranging from middle-school physics and geometry to college-level chemistry, calculus, combinatorics, and more.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/math.py
18
19
20
21
22
23
24
25
@RewardRegistry.register("math_listwise_reward")
class MathListWiseReward(BaseHelpfulnessListWiseReward):
    """Math: Solves problems at math, on open-ended human prompts ranging from middle school physics and geometry to college-level chemistry, calculus, combinatorics, and more."""

    name: str = Field(default="math_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

NgramRepetitionPenaltyReward

Bases: BasePointWiseReward

Calculate N-gram repetition penalty supporting Chinese processing and multiple penalty strategies.
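
A quick worked example of the two penalty modes implemented in the source below (a standalone sketch): with a 3-gram repetition rate of 0.5, the hard-threshold mode (penalty_threshold=0.3, penalty_rate=1.0) and the soft mode (max_penalty=-1.0, min_scaling=0.0) yield different penalties.

repetition_rate = 0.5

# Hard threshold mode: penalize only the excess over the threshold.
hard_penalty = -(repetition_rate - 0.3) * 1.0

# Soft mode: rescale the rate above min_scaling and multiply by max_penalty.
min_scaling, max_penalty = 0.0, -1.0
scaling = (repetition_rate - min_scaling) / (1 - min_scaling)
soft_penalty = scaling * max_penalty

print(round(hard_penalty, 2), soft_penalty)   # -0.2 -0.5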

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("ngram_repetition_penalty")
class NgramRepetitionPenaltyReward(BasePointWiseReward):
    """
    Calculate N-gram repetition penalty supporting Chinese processing and multiple penalty strategies.
    """

    name: str = Field(
        default="ngram_repetition_penalty",
        description="N-gram repetition penalty reward",
    )
    n: int = Field(default=3, description="N value for N-gram")

    # Hard threshold penalty parameters
    penalty_threshold: float = Field(
        default=0.3, description="Repetition rate threshold (hard threshold mode)"
    )
    penalty_rate: float = Field(
        default=1.0, description="Penalty multiplier (hard threshold mode)"
    )

    # Soft penalty parameters
    use_soft_penalty: bool = Field(
        default=False, description="Whether to use soft penalty mode"
    )
    max_penalty: float = Field(
        default=-1.0,
        description="Maximum penalty value (soft penalty mode, should be negative)",
    )
    min_scaling: float = Field(
        default=0.0, description="Minimum scaling threshold (soft penalty mode)"
    )

    # Tokenizer parameters
    tokenizer_type: str = Field(
        default="tiktoken",
        description="Tokenizer type: 'tiktoken', 'jieba', or 'simple'",
    )
    encoding_name: str = Field(
        default="cl100k_base",
        description="Tiktoken encoding name (for tiktoken tokenizer)",
    )
    chinese_only: bool = Field(
        default=False,
        description="Whether to keep only Chinese characters (for jieba tokenizer)",
    )

    # Analysis scope parameters
    analyze_scope: str = Field(
        default="full",
        description="Analysis scope: 'full' or 'thought' (thought process only)",
    )

    def __init__(self, **data):
        super().__init__(**data)
        # Initialize tokenizer
        self._tokenizer = get_tokenizer(
            tokenizer_type=self.tokenizer_type,
            encoding_name=self.encoding_name,
            chinese_only=self.chinese_only,
        )

    def _extract_thought_process(self, content: str) -> str:
        """Extract thought process"""
        think_pattern = r"<think>(.*?)</think>"
        matches = re.findall(think_pattern, content, re.DOTALL)
        return " ".join(matches) if matches else ""

    def _generate_ngrams(self, tokens: List[str]) -> List[tuple]:
        """Generate N-grams"""
        if len(tokens) < self.n:
            return []

        # Use unified approach for all tokenizers
        ngrams = []
        for i in range(len(tokens) - self.n + 1):
            ngrams.append(tuple(tokens[i : i + self.n]))
        return ngrams

    def _calculate_penalty(self, repetition_rate: float) -> float:
        """Calculate penalty value"""
        if self.use_soft_penalty:
            # Soft penalty mode
            if self.max_penalty > 0:
                raise ValueError(
                    f"max_penalty {self.max_penalty} should not be positive"
                )

            scaling = repetition_rate
            if scaling < self.min_scaling:
                scaling = 0.0
            elif scaling > self.min_scaling:
                scaling = (scaling - self.min_scaling) / (1 - self.min_scaling)

            return scaling * self.max_penalty
        else:
            # Hard threshold mode (original logic)
            if repetition_rate > self.penalty_threshold:
                return -(repetition_rate - self.penalty_threshold) * self.penalty_rate
            return 0.0

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Calculate N-gram repetition penalty

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing N-gram repetition penalty score
        """
        content = sample.output[0].answer.content

        # Select text based on analysis scope
        if self.analyze_scope == "thought":
            text_to_analyze = self._extract_thought_process(content)
            if not text_to_analyze:
                return RewardResult(
                    name=self.name,
                    details=[
                        RewardDimensionWithScore(
                            name=self.name,
                            score=0.0,
                            reason="No thought process found to analyze",
                        )
                    ],
                    extra_data={
                        "analyze_scope": self.analyze_scope,
                        "text_to_analyze": text_to_analyze,
                    },
                )
        else:
            text_to_analyze = content

        # Tokenization using unified tokenizer
        preprocessed_text = self._tokenizer.preprocess_text(
            text_to_analyze,
            to_lower=(
                self.tokenizer_type != "jieba"
            ),  # Keep case for Chinese tokenization
        )
        tokens = self._tokenizer.tokenize(preprocessed_text)

        if len(tokens) < self.n:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason=f"Text too short for {self.n}-gram analysis",
                    )
                ],
                extra_data={
                    "token_count": len(tokens),
                    "n": self.n,
                    "analyze_scope": self.analyze_scope,
                    "tokenizer_type": self.tokenizer_type,
                },
            )

        # Generate N-grams
        ngrams = self._generate_ngrams(tokens)

        if not ngrams:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason="No ngrams generated",
                    )
                ],
                extra_data={
                    "token_count": len(tokens),
                    "n": self.n,
                    "analyze_scope": self.analyze_scope,
                    "tokenizer_type": self.tokenizer_type,
                },
            )

        # Calculate repetition rate
        ngram_counts = Counter(ngrams)
        total_ngrams = len(ngrams)
        unique_ngrams = len(ngram_counts)
        repetition_rate = (
            1 - (unique_ngrams / total_ngrams) if total_ngrams > 0 else 0.0
        )

        # Calculate penalty
        penalty = self._calculate_penalty(repetition_rate)

        # Build reason description
        penalty_mode = "soft" if self.use_soft_penalty else "hard"

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=penalty,
                    reason=f"{self.n}-gram repetition rate: {repetition_rate:.3f}, penalty: {penalty:.3f} ({penalty_mode} penalty, {self.tokenizer_type} tokenizer, scope: {self.analyze_scope})",
                )
            ],
            extra_data={
                "repetition_rate": repetition_rate,
                "unique_ngrams": unique_ngrams,
                "total_ngrams": total_ngrams,
                "penalty": penalty,
                "most_common_ngrams": ngram_counts.most_common(5),
                "analyze_scope": self.analyze_scope,
                "tokenizer_type": self.tokenizer_type,
                "use_soft_penalty": self.use_soft_penalty,
                "penalty_mode": penalty_mode,
            },
        )
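To make the two penalty modes concrete, the following is a minimal, self-contained sketch of the repetition-rate and penalty arithmetic used above; the token list and parameter values are illustrative and not part of the library.

from collections import Counter

def repetition_penalty(tokens, n=3, use_soft_penalty=False, penalty_threshold=0.3,
                       penalty_rate=1.0, max_penalty=-1.0, min_scaling=0.0):
    # Build N-grams and measure how many of them are duplicates.
    ngrams = [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]
    if not ngrams:
        return 0.0
    repetition_rate = 1 - len(Counter(ngrams)) / len(ngrams)
    if use_soft_penalty:
        # Soft mode: rescale the rate above min_scaling, then multiply by max_penalty.
        scaling = repetition_rate
        if scaling < min_scaling:
            scaling = 0.0
        elif scaling > min_scaling:
            scaling = (scaling - min_scaling) / (1 - min_scaling)
        return scaling * max_penalty
    # Hard mode: penalize only the excess over the threshold.
    if repetition_rate > penalty_threshold:
        return -(repetition_rate - penalty_threshold) * penalty_rate
    return 0.0

tokens = ["the", "cat", "sat", "the", "cat", "sat", "the", "cat", "sat"]
print(repetition_penalty(tokens))                         # approx -0.271 (hard threshold)
print(repetition_penalty(tokens, use_soft_penalty=True))  # approx -0.571 (soft penalty)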

OpenQAListWiseReward

Bases: BaseHelpfulnessListWiseReward

Open QA: Search for answers across a wide range of text sources. The challenge is to process large amounts of information and understand complex questions.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/open_qa.py
@RewardRegistry.register("open_qa_listwise_reward")
class OpenQAListWiseReward(BaseHelpfulnessListWiseReward):
    """Open QA: Search for answers across a wide range of text sources. The challenge is to process large amounts of information and understand complex questions."""

    name: str = Field(default="open_qa_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

PRMBenchConverter

Bases: DataConverter

Unified converter for Process Reward Model (PRM) data. Handles mathematical reasoning data with step-wise processes.

Source code in rm_gallery/gallery/data/load/prmbench.py
@DataConverterRegistry.register("prmbench")
class PRMBenchConverter(DataConverter):
    """
    Unified converter for Process Reward Model (PRM) data
    Handles mathematical reasoning data with step-wise processes
    """

    # define as class attribute instead of instance attribute
    DIMENSION_CLASSIFICATION_MAPPING: ClassVar[Dict[str, str]] = {
        "confidence": "confidence",
        "*": None,  # wildcard, means no filtering
    }

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert PRM data to DataSample format

        Expected input format:
        {
            "original_question": "...",
            "modified_question": "...",
            "original_process": ["step1", "step2", ...],
            "modified_process": ["step1", "step2", ...],
            "modified_steps": [5, 6],
            "error_steps": [5, 6],
            "reason": "...",
            "idx": "...",
            "question": "...",
            "classification": "confidence"
        }
        """

        # Generate unique id from idx or question
        unique_id = data_dict.get(
            "idx", hashlib.md5(str(data_dict.get("question", "")).encode()).hexdigest()
        )

        try:
            # Create input from question
            data_input = self._create_prm_input(data_dict)

            # Create outputs from processes
            data_output = self._create_prm_output(data_dict)

            # Build metadata based on source type
            metadata = {
                "classification": data_dict.get("classification"),
                "modified_steps": data_dict.get("modified_steps", []),
                "error_steps": data_dict.get("error_steps", []),
                "reason": data_dict.get("reason"),
                "idx": data_dict.get("idx"),
                "original_process_length": len(data_dict.get("original_process", [])),
                "modified_process_length": len(data_dict.get("modified_process", [])),
                "load_strategy": "PRMBenchConverter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            # Create DataSample object
            data_sample = DataSample(
                unique_id=str(unique_id),
                input=data_input,
                output=data_output,
                source="prmbench",
                task_category=data_dict.get("classification", "reasoning"),
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating DataSample from PRM data: {str(e)}")
            return None

    def _create_prm_input(self, data_dict: Dict[str, Any]) -> list[ChatMessage]:
        """Create DataInput from PRM question"""
        question = data_dict.get("question") or data_dict.get("original_question", "")
        return [ChatMessage(role="user", content=question)]

    def _create_prm_output(self, data_dict: Dict[str, Any]) -> list[DataOutput]:
        """Create DataOutput list from PRM processes"""
        outputs = []

        # Original process output
        if "original_process" in data_dict:
            original_steps = []
            for i, step_content in enumerate(data_dict["original_process"]):
                step = Step(
                    role="assistant",
                    content=step_content,
                    label={"correctness": "correct", "step_idx": i + 1},
                )
                original_steps.append(step)

            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content="\n".join(data_dict["original_process"]),
                        label={"process_type": "original_correct"},
                    ),
                    steps=original_steps,
                )
            )

        # Modified process output (with errors)
        if "modified_process" in data_dict:
            modified_steps = []
            error_steps = set(data_dict.get("error_steps", []))

            for i, step_content in enumerate(data_dict["modified_process"]):
                step_idx = i + 1
                is_correct = step_idx not in error_steps

                step = Step(
                    role="assistant",
                    content=step_content,
                    label={
                        "correctness": "correct" if is_correct else "error",
                        "step_idx": step_idx,
                    },
                )
                modified_steps.append(step)

            # Calculate correctness score based on error ratio
            total_steps = len(data_dict["modified_process"])
            error_count = len(error_steps)

            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content="\n".join(data_dict["modified_process"]),
                        label={
                            "process_type": f"Modified process with {error_count}/{total_steps} error steps"
                        },
                    ),
                    steps=modified_steps,
                )
            )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert PRM data to DataSample format

Expected input format: { "original_question": "...", "modified_question": "...", "original_process": ["step1", "step2", ...], "modified_process": ["step1", "step2", ...], "modified_steps": [5, 6], "error_steps": [5, 6], "reason": "...", "idx": "...", "question": "...", "classification": "confidence" }

Source code in rm_gallery/gallery/data/load/prmbench.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert PRM data to DataSample format

    Expected input format:
    {
        "original_question": "...",
        "modified_question": "...",
        "original_process": ["step1", "step2", ...],
        "modified_process": ["step1", "step2", ...],
        "modified_steps": [5, 6],
        "error_steps": [5, 6],
        "reason": "...",
        "idx": "...",
        "question": "...",
        "classification": "confidence"
    }
    """

    # Generate unique id from idx or question
    unique_id = data_dict.get(
        "idx", hashlib.md5(str(data_dict.get("question", "")).encode()).hexdigest()
    )

    try:
        # Create input from question
        data_input = self._create_prm_input(data_dict)

        # Create outputs from processes
        data_output = self._create_prm_output(data_dict)

        # Build metadata based on source type
        metadata = {
            "classification": data_dict.get("classification"),
            "modified_steps": data_dict.get("modified_steps", []),
            "error_steps": data_dict.get("error_steps", []),
            "reason": data_dict.get("reason"),
            "idx": data_dict.get("idx"),
            "original_process_length": len(data_dict.get("original_process", [])),
            "modified_process_length": len(data_dict.get("modified_process", [])),
            "load_strategy": "PRMBenchConverter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        # Create DataSample object
        data_sample = DataSample(
            unique_id=str(unique_id),
            input=data_input,
            output=data_output,
            source="prmbench",
            task_category=data_dict.get("classification", "reasoning"),
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating DataSample from PRM data: {str(e)}")
        return None
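As a usage sketch (not taken from the library's own docs), the snippet below feeds a hypothetical record in the expected format into the converter; it assumes PRMBenchConverter can be instantiated without arguments, and all field values are invented for illustration.

converter = PRMBenchConverter()  # assumption: no-argument constructor
record = {
    # Hypothetical PRM record following the expected input format above.
    "idx": "prm-0001",
    "question": "What is 2 + 3 * 4?",
    "original_process": ["Compute 3 * 4 = 12.", "Add 2 + 12 = 14."],
    "modified_process": ["Compute 3 * 4 = 12.", "Add 2 + 12 = 15."],
    "modified_steps": [2],
    "error_steps": [2],
    "reason": "Arithmetic slip in the final addition.",
    "classification": "confidence",
}
sample = converter.convert_to_data_sample(
    record, source_info={"load_type": "local", "source_file_path": "prmbench.jsonl"}
)
if sample is not None:
    # One DataOutput for the original process and one for the modified process.
    print(sample.unique_id, sample.task_category, len(sample.output))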

PreciseIFListWiseReward

Bases: BaseHelpfulnessListWiseReward

Precise Instruction Following: Follows precise instructions, such as ‘Answer without the letter u’.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/precise_if.py
@RewardRegistry.register("precise_if_listwise_reward")
class PreciseIFListWiseReward(BaseHelpfulnessListWiseReward):
    """Precise Instruction Following : Follows precise instructions, such as ‘Answer without the letter u’."""

    name: str = Field(default="precise_if_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

PrivacyLeakageReward

Bases: BasePointWiseReward

Privacy information leakage detection for emails, phone numbers, ID cards, credit cards, and IP addresses.

This reward checks for potential privacy leaks in the generated content, including email addresses, phone numbers, ID numbers, credit card numbers, and IP addresses, and applies a penalty for each detected leak.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("privacy_leakage")
class PrivacyLeakageReward(BasePointWiseReward):
    """
    Privacy information leakage detection for emails, phone numbers, ID cards, credit cards, and IP addresses.

    This reward checks for potential privacy leaks in the generated content,
    including email addresses, phone numbers, ID numbers, credit card numbers,
    and IP addresses. Applies penalties for each detected leak.
    """

    name: str = Field(
        default="privacy_leakage", description="Privacy leakage detection reward"
    )
    penalty_per_leak: float = Field(default=-0.5, description="Penalty per leak")

    def _detect_privacy_leaks(self, text: str) -> List[Dict[str, str]]:
        """Detect privacy information leaks"""
        leaks = []

        # Email addresses
        email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
        emails = re.findall(email_pattern, text)
        for email in emails:
            leaks.append({"type": "email", "value": email})

        # Phone numbers (simple pattern)
        phone_pattern = (
            r"\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b"
        )
        phones = re.findall(phone_pattern, text)
        for phone in phones:
            leaks.append({"type": "phone", "value": phone})

        # ID numbers (China)
        id_pattern = r"\b[1-9]\d{5}(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[0-9Xx]\b"
        ids = re.findall(id_pattern, text)
        for id_num in ids:
            leaks.append({"type": "id_card", "value": id_num})

        # Credit card numbers (simple detection)
        credit_card_pattern = r"\b(?:\d{4}[-\s]?){3}\d{4}\b"
        cards = re.findall(credit_card_pattern, text)
        for card in cards:
            leaks.append({"type": "credit_card", "value": card})

        # IP addresses
        ip_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
        ips = re.findall(ip_pattern, text)
        for ip in ips:
            # Exclude common non-sensitive IPs (like localhost)
            if not ip.startswith(("127.", "192.168.", "10.", "172.")):
                leaks.append({"type": "ip_address", "value": ip})

        return leaks

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Detect privacy leaks.

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing privacy leak penalty score
        """
        content = sample.output[0].answer.content

        leaks = self._detect_privacy_leaks(content)
        penalty = len(leaks) * self.penalty_per_leak

        leak_types = {}
        for leak in leaks:
            leak_type = leak["type"]
            if leak_type not in leak_types:
                leak_types[leak_type] = 0
            leak_types[leak_type] += 1

        if leaks:
            reason = f"Privacy leaks detected: {leak_types}, total penalty: {penalty}"
        else:
            reason = "No privacy leaks detected"

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(name=self.name, score=penalty, reason=reason)
            ],
            extra_data={
                "leaks": leaks,
                "leak_types": leak_types,
                "total_leaks": len(leaks),
                "penalty": penalty,
            },
        )
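The scoring itself is just the leak count multiplied by penalty_per_leak; the standalone sketch below mirrors two of the detection patterns above on a made-up piece of text.

import re

text = "Contact me at alice@example.com or reach the server at 8.8.8.8."

email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
ip_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"

leaks = [{"type": "email", "value": m} for m in re.findall(email_pattern, text)]
leaks += [
    {"type": "ip_address", "value": ip}
    for ip in re.findall(ip_pattern, text)
    # Same exclusion as above: skip loopback and common private ranges.
    if not ip.startswith(("127.", "192.168.", "10.", "172."))
]

penalty_per_leak = -0.5
print(len(leaks) * penalty_per_leak)  # -1.0 for one email and one public IP address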

RMBBenchmarkBestOfNConverter

Bases: DataConverter

Unified converter for conversation data with conversation_input, bon_best and loser_list responses

Source code in rm_gallery/gallery/data/load/rmbbenchmark_bestofn.py
@DataConverterRegistry.register("rmbbenchmark_bestofn")
class RMBBenchmarkBestOfNConverter(DataConverter):
    """
    Unified converter for conversation data with conversation_input, bon_best and loser_list responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # Generate unique id using bon_uid
        if "bon_uid" in data_dict:
            unique_id = str(data_dict["bon_uid"])
        else:
            # Use conversation_input content for generating hash
            conversation_input = data_dict.get("conversation_input", [])
            if (
                conversation_input
                and isinstance(conversation_input, list)
                and len(conversation_input) > 0
            ):
                content = str(conversation_input[0].get("content", ""))
            else:
                content = ""
            unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from conversation_input
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from bon_best and loser_list
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RMBBenchmarkBestOfNConverter",
                "category_path": data_dict.get("category_path"),
                "bon_uid": data_dict.get("bon_uid"),
                "bon_best_model": data_dict.get("bon_best", {}).get("llm_name")
                if data_dict.get("bon_best")
                else None,
                "loser_models": [
                    item.get("llm_name")
                    for item in data_dict.get("loser_list", [])
                    if isinstance(item, dict)
                ],
                "num_losers": len(data_dict.get("loser_list", [])),
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation_input"""
        conversation_input = data_dict.get("conversation_input", [])
        if isinstance(conversation_input, list):
            history = []
            for message in conversation_input:
                if isinstance(message, dict):
                    role = message.get("role", "user")
                    content = message.get("content", "")
                    history.append(ChatMessage(role=role, content=content))
                else:
                    history.append(ChatMessage(role="user", content=str(message)))
            return history
        else:
            return [ChatMessage(role="user", content=str(conversation_input))]

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from bon_best and loser_list"""
        outputs = []

        # Handle bon_best
        if "bon_best" in data_dict:
            bon_best = data_dict["bon_best"]
            if isinstance(bon_best, dict):
                answer_content = bon_best.get("answer", "")
                llm_name = bon_best.get("llm_name", "unknown")
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(answer_content),
                            label={
                                "preference": "chosen",
                                "model": llm_name,
                                "type": "bon_best",
                            },
                        ),
                    )
                )

        # Handle loser_list
        if "loser_list" in data_dict:
            loser_list = data_dict["loser_list"]
            if isinstance(loser_list, list):
                for loser in loser_list:
                    if isinstance(loser, dict):
                        answer_content = loser.get("answer", "")
                        llm_name = loser.get("llm_name", "unknown")
                        outputs.append(
                            DataOutput(
                                answer=Step(
                                    role="assistant",
                                    content=str(answer_content),
                                    label={
                                        "preference": "rejected",
                                        "model": llm_name,
                                        "type": "loser",
                                    },
                                ),
                            )
                        )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rmbbenchmark_bestofn.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # Generate unique id using bon_uid
    if "bon_uid" in data_dict:
        unique_id = str(data_dict["bon_uid"])
    else:
        # Use conversation_input content for generating hash
        conversation_input = data_dict.get("conversation_input", [])
        if (
            conversation_input
            and isinstance(conversation_input, list)
            and len(conversation_input) > 0
        ):
            content = str(conversation_input[0].get("content", ""))
        else:
            content = ""
        unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from conversation_input
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from bon_best and loser_list
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RMBBenchmarkBestOfNConverter",
            "category_path": data_dict.get("category_path"),
            "bon_uid": data_dict.get("bon_uid"),
            "bon_best_model": data_dict.get("bon_best", {}).get("llm_name")
            if data_dict.get("bon_best")
            else None,
            "loser_models": [
                item.get("llm_name")
                for item in data_dict.get("loser_list", [])
                if isinstance(item, dict)
            ],
            "num_losers": len(data_dict.get("loser_list", [])),
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None
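A brief usage sketch with a hypothetical best-of-N record; it assumes RMBBenchmarkBestOfNConverter takes no constructor arguments, and the model names and answers are invented for illustration.

converter = RMBBenchmarkBestOfNConverter()  # assumption: no-argument constructor
record = {
    "bon_uid": "bon-42",
    "conversation_input": [{"role": "user", "content": "Summarize the plot of Hamlet."}],
    "bon_best": {"llm_name": "model_a", "answer": "A Danish prince seeks to avenge his father's murder."},
    "loser_list": [
        {"llm_name": "model_b", "answer": "It is a play set in Denmark."},
        {"llm_name": "model_c", "answer": "Hamlet is a novel by Dickens."},
    ],
}
sample = converter.convert_to_data_sample(record, source_info={"load_type": "local"})
if sample is not None:
    # First output is the chosen best answer; the rest are the rejected losers.
    print(sample.unique_id, [out.answer.label["preference"] for out in sample.output])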

RMBBenchmarkPairwiseConverter

Bases: DataConverter

Unified converter for conversation data with conversation_input, chosen and reject responses

Source code in rm_gallery/gallery/data/load/rmbbenchmark_pairwise.py
@DataConverterRegistry.register("rmbbenchmark_pairwise")
class RMBBenchmarkPairwiseConverter(DataConverter):
    """
    Unified converter for conversation data with conversation_input, chosen and reject responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # Generate unique id using pair_uid
        if "pair_uid" in data_dict:
            unique_id = str(data_dict["pair_uid"])
        else:
            # Use conversation_input content for generating hash
            conversation_input = data_dict.get("conversation_input", [])
            if (
                conversation_input
                and isinstance(conversation_input, list)
                and len(conversation_input) > 0
            ):
                content = str(conversation_input[0].get("content", ""))
            else:
                content = ""
            unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from conversation_input
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from chosen and reject
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RMBBenchmarkPairwiseConverter",
                "category_path": data_dict.get("category_path"),
                "pair_uid": data_dict.get("pair_uid"),
                "chosen_model": data_dict.get("chosen", {}).get("llm_name")
                if data_dict.get("chosen")
                else None,
                "reject_model": data_dict.get("reject", {}).get("llm_name")
                if data_dict.get("reject")
                else None,
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation_input"""
        conversation_input = data_dict.get("conversation_input", [])
        if isinstance(conversation_input, list):
            history = []
            for message in conversation_input:
                if isinstance(message, dict):
                    role = message.get("role", "user")
                    content = message.get("content", "")
                    history.append(ChatMessage(role=role, content=content))
                else:
                    history.append(ChatMessage(role="user", content=str(message)))
            return history
        else:
            return [ChatMessage(role="user", content=str(conversation_input))]

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from chosen and reject"""
        outputs = []

        # Handle chosen
        if "chosen" in data_dict:
            chosen = data_dict["chosen"]
            if isinstance(chosen, dict):
                answer_content = chosen.get("answer", "")
                llm_name = chosen.get("llm_name", "unknown")
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(answer_content),
                            label={
                                "preference": "chosen",
                                "model": llm_name,
                                "type": "chosen",
                            },
                        ),
                    )
                )

        # Handle reject
        if "reject" in data_dict:
            reject = data_dict["reject"]
            if isinstance(reject, dict):
                answer_content = reject.get("answer", "")
                llm_name = reject.get("llm_name", "unknown")
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(answer_content),
                            label={
                                "preference": "rejected",
                                "model": llm_name,
                                "type": "reject",
                            },
                        ),
                    )
                )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rmbbenchmark_pairwise.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # Generate unique id using pair_uid
    if "pair_uid" in data_dict:
        unique_id = str(data_dict["pair_uid"])
    else:
        # Use conversation_input content for generating hash
        conversation_input = data_dict.get("conversation_input", [])
        if (
            conversation_input
            and isinstance(conversation_input, list)
            and len(conversation_input) > 0
        ):
            content = str(conversation_input[0].get("content", ""))
        else:
            content = ""
        unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from conversation_input
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from chosen and reject
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RMBBenchmarkPairwiseConverter",
            "category_path": data_dict.get("category_path"),
            "pair_uid": data_dict.get("pair_uid"),
            "chosen_model": data_dict.get("chosen", {}).get("llm_name")
            if data_dict.get("chosen")
            else None,
            "reject_model": data_dict.get("reject", {}).get("llm_name")
            if data_dict.get("reject")
            else None,
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None
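The pairwise variant works the same way; a minimal sketch with an invented chosen/reject pair, again assuming a no-argument constructor.

converter = RMBBenchmarkPairwiseConverter()  # assumption: no-argument constructor
record = {
    "pair_uid": "pair-7",
    "conversation_input": [{"role": "user", "content": "Explain what a reward model is."}],
    "chosen": {"llm_name": "model_a", "answer": "A reward model scores candidate responses by quality."},
    "reject": {"llm_name": "model_b", "answer": "It is a kind of database."},
}
sample = converter.convert_to_data_sample(record, source_info={"load_type": "local"})
if sample is not None:
    print([out.answer.label["preference"] for out in sample.output])  # ['chosen', 'rejected']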

ReasoningFormatReward

Bases: BasePointWiseReward

Check format reward for thinking format and answer format with proper tags.

This reward verifies whether the generated content follows the required format with proper <think> and <answer> tags.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("reasoning_format")
class ReasoningFormatReward(BasePointWiseReward):
    """
    Check format reward for thinking format and answer format with proper tags.

    This reward verifies if the generated content follows the required format
    with proper <think> and <answer> tags.
    """

    name: str = Field(default="format_reward", description="Reasoning Format reward")
    think_token: str = Field(default="think", description="Think tag name")
    answer_token: str = Field(default="answer", description="Answer tag name")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check format and calculate reward.

        Args:
            sample: Data sample containing generated content

        Returns:
            RewardResult: Reward result containing format score
        """
        content = sample.output[0].answer.content

        # Check thinking format tags
        think_pattern = f"<{self.think_token}>.*?</{self.think_token}>"
        has_think_tag = bool(re.search(think_pattern, content, re.DOTALL))

        # Check answer format tags
        answer_pattern = f"<{self.answer_token}>.*?</{self.answer_token}>"
        has_answer_tag = bool(re.search(answer_pattern, content, re.DOTALL))

        # Calculate reward
        reward = 1.0 if has_think_tag and has_answer_tag else 0.0
        reasons = []

        if not has_think_tag:
            reasons.append(f"Missing <{self.think_token}></{self.think_token}> tags")

        if not has_answer_tag:
            reasons.append(f"Missing <{self.answer_token}></{self.answer_token}> tags")

        if reward == 1.0:
            reasons.append("All format requirements met")

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=reward, reason="; ".join(reasons)
                )
            ],
            extra_data={
                "has_think_tag": has_think_tag,
                "has_answer_tag": has_answer_tag,
                "total_reward": reward,
                "think_token": self.think_token,
                "answer_token": self.answer_token,
            },
        )
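The check reduces to two regular-expression searches; the sketch below reproduces that logic outside the class on two hypothetical completions.

import re

def format_score(content, think_token="think", answer_token="answer"):
    # Mirrors the reward: both tag pairs must be present for a score of 1.0.
    has_think = bool(re.search(f"<{think_token}>.*?</{think_token}>", content, re.DOTALL))
    has_answer = bool(re.search(f"<{answer_token}>.*?</{answer_token}>", content, re.DOTALL))
    return 1.0 if has_think and has_answer else 0.0

print(format_score("<think>Work through the steps.</think><answer>42</answer>"))  # 1.0
print(format_score("<answer>42</answer>"))                                        # 0.0 (missing think tags)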

ReasoningListWiseReward

Bases: BaseHelpfulnessListWiseReward

Reasoning: Involves processing and analyzing text to draw inferences, make predictions, or solve problems, requiring an understanding of underlying concepts and relationships within the text.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/reasoning.py
@RewardRegistry.register("reasoning_listwise_reward")
class ReasoningListWiseReward(BaseHelpfulnessListWiseReward):
    """Reasoning: Involves processing and analyzing text to draw inferences, make predictions, or solve problems, requiring an understanding of underlying concepts and relationships within the text."""

    name: str = Field(default="reasoning_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

ReasoningToolCallFormatReward

Bases: BasePointWiseReward

Check tool call format including think, answer and tool_call tags with JSON validation.

This reward verifies whether the generated content follows the required format with proper <think>, <answer>, and <tool_call> tags, including JSON validation for tool calls.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("reasoning_tool_call_format")
class ReasoningToolCallFormatReward(BasePointWiseReward):
    """
    Check tool call format including think, answer and tool_call tags with JSON validation.

    This reward verifies if the generated content follows the required format
    with proper <think>, <answer> and <tool_call> tags, including JSON validation
    for tool calls.
    """

    name: str = Field(
        default="tool_call_format", description="Reasoning tool call format reward"
    )

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check tool call format and calculate reward.

        Args:
            sample: Data sample containing generated content

        Returns:
            RewardResult: Reward result containing format score
        """
        content = sample.output[0].answer.content

        # Extract tag contents
        think_pattern = r"<think>(.*?)</think>"
        answer_pattern = r"<answer>(.*?)</answer>"
        tool_call_pattern = r"<tool_call>(.*?)</tool_call>"

        think_matches = re.search(think_pattern, content, re.DOTALL)
        answer_matches = re.search(answer_pattern, content, re.DOTALL)
        tool_call_matches = re.findall(tool_call_pattern, content, re.DOTALL)

        has_think_tag = think_matches is not None
        has_answer_tag = answer_matches is not None
        has_tool_call_tag = len(tool_call_matches) > 0

        valid_format = False
        valid_tool_call_json = False
        reasons = []

        if has_think_tag:
            # Case 1: <think></think> + <answer></answer>
            if has_answer_tag and not has_tool_call_tag:
                # Check overall format
                format_pattern = r"^\s*<think>.*?</think>\s*<answer>.*?</answer>\s*$"
                valid_format = bool(re.match(format_pattern, content, re.DOTALL))

                # Check tag occurrence count
                if valid_format:
                    valid_format = (
                        content.count("<think>") == 1
                        and content.count("</think>") == 1
                        and content.count("<answer>") == 1
                        and content.count("</answer>") == 1
                    )

                if valid_format:
                    reasons.append("Valid <think></think> + <answer></answer> format")
                else:
                    reasons.append("Invalid <think></think> + <answer></answer> format")

            # Case 2: <think></think> + <tool_call></tool_call>
            elif has_tool_call_tag and not has_answer_tag:
                # Check overall format
                format_pattern = (
                    r"^\s*<think>.*?</think>\s*(?:<tool_call>.*?</tool_call>\s*)+$"
                )
                valid_format = bool(re.match(format_pattern, content, re.DOTALL))

                # Check <think> tag occurrence count
                if valid_format:
                    valid_format = (
                        content.count("<think>") == 1 and content.count("</think>") == 1
                    )

                # Check if <tool_call> and </tool_call> tags appear in pairs
                if valid_format:
                    if content.count("<tool_call>") != content.count("</tool_call>"):
                        valid_format = False

                # Check for consecutive duplicate tags
                if valid_format:
                    if re.search(r"</tool_call>\s*</tool_call>", content) or re.search(
                        r"<tool_call>\s*<tool_call>", content
                    ):
                        valid_format = False

                # Check tool_call JSON format
                valid_tool_call_json = True
                tool_calls = []
                if valid_format:
                    for tool_call_content in tool_call_matches:
                        try:
                            tool_call_json = json.loads(tool_call_content.strip())
                            # Check if JSON contains required fields
                            if not (
                                "name" in tool_call_json
                                and "arguments" in tool_call_json
                            ):
                                valid_tool_call_json = False
                                break
                            tool_calls.append(
                                {
                                    "function": {
                                        "name": tool_call_json["name"],
                                        "arguments": json.dumps(
                                            tool_call_json["arguments"],
                                            ensure_ascii=False,
                                        ),
                                    }
                                }
                            )
                        except json.JSONDecodeError:
                            valid_tool_call_json = False
                            break

                valid_format = valid_format and valid_tool_call_json

                if valid_format:
                    reasons.append(
                        "Valid <think></think> + <tool_call></tool_call> format with valid JSON"
                    )
                else:
                    if not valid_tool_call_json:
                        reasons.append("Invalid JSON format in <tool_call> tags")
                    else:
                        reasons.append(
                            "Invalid <think></think> + <tool_call></tool_call> format"
                        )
            else:
                # Has both answer and tool_call, or neither
                reasons.append(
                    "Invalid combination: should have either <answer> or <tool_call> tags, not both or neither"
                )
        else:
            reasons.append("Missing <think></think> tags")

        # Calculate reward score
        reward = 1.0 if valid_format else 0.0

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=reward, reason="; ".join(reasons)
                )
            ],
            extra_data={
                "has_think_tag": has_think_tag,
                "has_answer_tag": has_answer_tag,
                "has_tool_call_tag": has_tool_call_tag,
                "valid_format": valid_format,
                "valid_tool_call_json": valid_tool_call_json,
                "tool_call_count": len(tool_call_matches),
                "reward": reward,
            },
        )
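The core of the think-plus-tool-call branch is a structural regex plus per-call JSON validation; here is a standalone sketch of that path on an invented completion.

import json
import re

content = (
    "<think>I need the weather, so I should call the tool.</think>\n"
    '<tool_call>{"name": "get_weather", "arguments": {"city": "Paris"}}</tool_call>'
)

# Structure: exactly one think block followed by one or more tool_call blocks.
structure_ok = bool(
    re.match(r"^\s*<think>.*?</think>\s*(?:<tool_call>.*?</tool_call>\s*)+$", content, re.DOTALL)
)

# Each tool call must parse as JSON and contain "name" and "arguments".
json_ok = True
for call in re.findall(r"<tool_call>(.*?)</tool_call>", content, re.DOTALL):
    try:
        payload = json.loads(call.strip())
        json_ok = json_ok and "name" in payload and "arguments" in payload
    except json.JSONDecodeError:
        json_ok = False

print(1.0 if structure_ok and json_ok else 0.0)  # 1.0 for this example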

RewardBench2AnnotationTemplate

Bases: BaseAnnotationTemplate

Reward Bench 2 annotation template implementation for 4-way comparison

Source code in rm_gallery/gallery/data/annotation/rewardbench2.py
@AnnotationTemplateRegistry.register("rewardbench2")
class RewardBench2AnnotationTemplate(BaseAnnotationTemplate):
    """Reward Bench 2 annotation template implementation for 4-way comparison"""

    def __init__(self, name: str):
        super().__init__(name)

    @property
    def label_config(self) -> str:
        """Return the Label Studio XML configuration for reward bench 2 evaluation (4-way comparison)"""
        return """
<View>
  <!-- Sample Information -->
  <Header value="Sample Information"/>
  <Text name="unique_id" value="$unique_id" title="Unique ID"/>
  <Text name="source" value="$source" title="Source"/>
  <Text name="task_category" value="$task_category" title="task_category"/>
  <Text name="created_at" value="$created_at" title="Created At"/>
  <Text name="answer_count" value="$answer_count" title="Number of Answers"/>

  <!-- Input Messages -->
  <Header value="Input Messages"/>
  <Paragraphs name="input_dialogue" value="$input_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Output Responses -->
  <Header value="Output Responses"/>
  <Paragraphs name="output_dialogue" value="$output_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Step 1: Best Answer Selection -->
  <View>
    <Text name="step1_title" value="Step 1: Best Answer Selection" />
    <Text name="step1_desc1" value="Please select the best answer among the 4 options" />
    <Choices name="best_answer" toName="output_dialogue" choice="single" title="🏆 Best Answer">
      <Choice value="answer_1" showIf="$answer_count>=1"/>
      <Choice value="answer_2" showIf="$answer_count>=2"/>
      <Choice value="answer_3" showIf="$answer_count>=3"/>
      <Choice value="answer_4" showIf="$answer_count>=4"/>
      <Choice value="all_equal" showIf="$answer_count=4"/>
    </Choices>
  </View>

  <!-- Step 2: Answer Ranking -->
  <View>
    <Text name="step2_spacer" value="" />
    <Text name="step2_title" value="Step 2: Answer Ranking" />
    <Text name="step2_desc" value="Please rank all answers from best to worst (1=best, 4=worst)" />

    <Text name="answer1_rank_label" value="📝 Answer 1 Rank:" />
    <Choices name="answer1_rank" toName="output_dialogue" choice="single" title="Answer 1 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>

    <Text name="answer2_rank_label" value="📝 Answer 2 Rank:" />
    <Choices name="answer2_rank" toName="output_dialogue" choice="single" title="Answer 2 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>

    <Text name="answer3_rank_label" value="📝 Answer 3 Rank:" />
    <Choices name="answer3_rank" toName="output_dialogue" choice="single" title="Answer 3 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>

    <Text name="answer4_rank_label" value="📝 Answer 4 Rank:" />
    <Choices name="answer4_rank" toName="output_dialogue" choice="single" title="Answer 4 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>
  </View>

  <!-- Step 3: Answer Rating -->
  <View>
    <Text name="step3_spacer" value="" />
    <Text name="step3_title" value="Step 3: Answer Rating" />
    <Text name="step3_desc" value="Please rate the quality of each answer for the $task_category task_category (1-5 stars)" />

    <Text name="answer1_rating_label" value="📝 Answer 1 Rating:" />
    <Rating name="answer1_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 1 Quality Rating"/>

    <Text name="answer2_rating_label" value="📝 Answer 2 Rating:" />
    <Rating name="answer2_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 2 Quality Rating"/>

    <Text name="answer3_rating_label" value="📝 Answer 3 Rating:" />
    <Rating name="answer3_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 3 Quality Rating"/>

    <Text name="answer4_rating_label" value="📝 Answer 4 Rating:" />
    <Rating name="answer4_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 4 Quality Rating"/>

    <Text name="rating_criteria" value="💡 Rating Criteria: 5 stars = excellent, 4 stars = good, 3 stars = average, 2 stars = poor, 1 star = very poor" />
  </View>

  <!-- Step 4: Additional Comments -->
  <View>
    <Text name="step4_spacer" value="" />
    <Text name="step4_title" value="Step 4: Additional Comments" />
    <Text name="step4_desc" value="Please provide any additional comments or feedback" />
    <TextArea name="additional_comments" toName="output_dialogue" placeholder="[x] The x-th answer has the following issues..." title="Additional Comments"/>
  </View>

</View>
"""

    def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process annotation data specific to reward bench 2 evaluation (4-way comparison)

        Args:
            annotation_data: Generic annotation data with ratings, choices, text_areas

        Returns:
            Processed data structured for reward bench 2 evaluation
        """
        processed = {
            "best_answer": None,
            "answer_rankings": {},
            "answer_ratings": {},
            "ranking_order": [],
            "quality_comparison": {},
            "comments": "",
            "preference": None,
        }

        # Extract best answer selection (Step 1)
        if "best_answer" in annotation_data.get("choices", {}):
            best_answer_choices = annotation_data["choices"]["best_answer"]["choices"]
            if best_answer_choices:
                processed["best_answer"] = best_answer_choices[0]
                processed["preference"] = best_answer_choices[0]

        # Extract answer rankings (Step 2)
        choices = annotation_data.get("choices", {})
        rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]

        for i, rank_key in enumerate(rank_keys, 1):
            if rank_key in choices:
                rank_choices = choices[rank_key]["choices"]
                if rank_choices:
                    processed["answer_rankings"][f"answer_{i}"] = int(rank_choices[0])

        # Create ranking order based on ranks
        if processed["answer_rankings"]:
            # Sort answers by their rank (1=best, 4=worst)
            sorted_answers = sorted(
                processed["answer_rankings"].items(), key=lambda x: x[1]
            )
            processed["ranking_order"] = [answer for answer, rank in sorted_answers]

        # Extract answer ratings (Step 3)
        ratings = annotation_data.get("ratings", {})
        rating_keys = [
            "answer1_rating",
            "answer2_rating",
            "answer3_rating",
            "answer4_rating",
        ]

        for i, rating_key in enumerate(rating_keys, 1):
            if rating_key in ratings:
                processed["answer_ratings"][f"answer_{i}"] = ratings[rating_key][
                    "rating"
                ]

        # Calculate quality comparison
        if processed["answer_ratings"]:
            # Find the highest rated answer
            best_rated_answer = max(
                processed["answer_ratings"].items(), key=lambda x: x[1]
            )

            # Calculate average rating
            avg_rating = sum(processed["answer_ratings"].values()) / len(
                processed["answer_ratings"]
            )

            processed["quality_comparison"] = {
                "best_rated_answer": best_rated_answer[0],
                "best_rating": best_rated_answer[1],
                "average_rating": avg_rating,
                "rating_spread": max(processed["answer_ratings"].values())
                - min(processed["answer_ratings"].values()),
                "consistency_check": {
                    "best_answer_matches_best_rating": processed["best_answer"]
                    == best_rated_answer[0],
                    "best_answer_matches_rank_1": processed["best_answer"]
                    in [
                        answer
                        for answer, rank in processed["answer_rankings"].items()
                        if rank == 1
                    ]
                    if processed["answer_rankings"]
                    else False,
                },
            }

        # Extract additional comments (Step 4)
        if "additional_comments" in annotation_data.get("text_areas", {}):
            processed["comments"] = annotation_data["text_areas"][
                "additional_comments"
            ]["text"]

        return processed

    def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
        """
        Validate annotation data for reward bench 2 evaluation

        Args:
            annotation_data: Annotation data to validate

        Returns:
            True if valid, False otherwise
        """
        # Check if required fields are present
        required_sections = ["choices", "ratings"]
        for section in required_sections:
            if section not in annotation_data:
                return False

        # Check if best answer is selected
        if "best_answer" not in annotation_data.get("choices", {}):
            return False

        # Check if at least some rankings are provided
        choices = annotation_data.get("choices", {})
        rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]
        if not any(key in choices for key in rank_keys):
            return False

        # Check if at least some ratings are provided
        ratings = annotation_data.get("ratings", {})
        rating_keys = [
            "answer1_rating",
            "answer2_rating",
            "answer3_rating",
            "answer4_rating",
        ]
        if not any(key in ratings for key in rating_keys):
            return False

        # Validate ranking consistency (each rank should be unique)
        provided_ranks = []
        for rank_key in rank_keys:
            if rank_key in choices:
                rank_choices = choices[rank_key]["choices"]
                if rank_choices:
                    rank = int(rank_choices[0])
                    if rank in provided_ranks:
                        return False  # Duplicate rank
                    provided_ranks.append(rank)

        return True

label_config property

Return the Label Studio XML configuration for reward bench 2 evaluation (4-way comparison)
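
Usage sketch (the class name and import path below are assumed from the source file rm_gallery/gallery/data/annotation/rewardbench2.py; adjust them to your installation):

# Assumed class name and import path; the template is registered as "rewardbench2".
from rm_gallery.gallery.data.annotation.rewardbench2 import RewardBench2AnnotationTemplate

template = RewardBench2AnnotationTemplate(name="rewardbench2")
print(template.label_config)  # the Label Studio XML configuration shown above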

process_annotations(annotation_data)

Process annotation data specific to reward bench 2 evaluation (4-way comparison)

Parameters:

    annotation_data (Dict[str, Any], required): Generic annotation data with ratings, choices, text_areas

Returns:

    Dict[str, Any]: Processed data structured for reward bench 2 evaluation

Source code in rm_gallery/gallery/data/annotation/rewardbench2.py
def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process annotation data specific to reward bench 2 evaluation (4-way comparison)

    Args:
        annotation_data: Generic annotation data with ratings, choices, text_areas

    Returns:
        Processed data structured for reward bench 2 evaluation
    """
    processed = {
        "best_answer": None,
        "answer_rankings": {},
        "answer_ratings": {},
        "ranking_order": [],
        "quality_comparison": {},
        "comments": "",
        "preference": None,
    }

    # Extract best answer selection (Step 1)
    if "best_answer" in annotation_data.get("choices", {}):
        best_answer_choices = annotation_data["choices"]["best_answer"]["choices"]
        if best_answer_choices:
            processed["best_answer"] = best_answer_choices[0]
            processed["preference"] = best_answer_choices[0]

    # Extract answer rankings (Step 2)
    choices = annotation_data.get("choices", {})
    rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]

    for i, rank_key in enumerate(rank_keys, 1):
        if rank_key in choices:
            rank_choices = choices[rank_key]["choices"]
            if rank_choices:
                processed["answer_rankings"][f"answer_{i}"] = int(rank_choices[0])

    # Create ranking order based on ranks
    if processed["answer_rankings"]:
        # Sort answers by their rank (1=best, 4=worst)
        sorted_answers = sorted(
            processed["answer_rankings"].items(), key=lambda x: x[1]
        )
        processed["ranking_order"] = [answer for answer, rank in sorted_answers]

    # Extract answer ratings (Step 3)
    ratings = annotation_data.get("ratings", {})
    rating_keys = [
        "answer1_rating",
        "answer2_rating",
        "answer3_rating",
        "answer4_rating",
    ]

    for i, rating_key in enumerate(rating_keys, 1):
        if rating_key in ratings:
            processed["answer_ratings"][f"answer_{i}"] = ratings[rating_key][
                "rating"
            ]

    # Calculate quality comparison
    if processed["answer_ratings"]:
        # Find the highest rated answer
        best_rated_answer = max(
            processed["answer_ratings"].items(), key=lambda x: x[1]
        )

        # Calculate average rating
        avg_rating = sum(processed["answer_ratings"].values()) / len(
            processed["answer_ratings"]
        )

        processed["quality_comparison"] = {
            "best_rated_answer": best_rated_answer[0],
            "best_rating": best_rated_answer[1],
            "average_rating": avg_rating,
            "rating_spread": max(processed["answer_ratings"].values())
            - min(processed["answer_ratings"].values()),
            "consistency_check": {
                "best_answer_matches_best_rating": processed["best_answer"]
                == best_rated_answer[0],
                "best_answer_matches_rank_1": processed["best_answer"]
                in [
                    answer
                    for answer, rank in processed["answer_rankings"].items()
                    if rank == 1
                ]
                if processed["answer_rankings"]
                else False,
            },
        }

    # Extract additional comments (Step 4)
    if "additional_comments" in annotation_data.get("text_areas", {}):
        processed["comments"] = annotation_data["text_areas"][
            "additional_comments"
        ]["text"]

    return processed
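
Usage sketch, continuing the template instantiation above. The payload is hypothetical; its layout (choices / ratings / text_areas keyed by control name) follows what the method reads:

# Hypothetical annotation payload in the generic format expected by the method.
annotation_data = {
    "choices": {
        "best_answer": {"choices": ["answer_2"]},
        "answer1_rank": {"choices": ["2"]},
        "answer2_rank": {"choices": ["1"]},
        "answer3_rank": {"choices": ["3"]},
        "answer4_rank": {"choices": ["4"]},
    },
    "ratings": {
        "answer1_rating": {"rating": 4},
        "answer2_rating": {"rating": 5},
        "answer3_rating": {"rating": 3},
        "answer4_rating": {"rating": 2},
    },
    "text_areas": {"additional_comments": {"text": "[3] misses the key constraint"}},
}

processed = template.process_annotations(annotation_data)
# processed["preference"]      -> "answer_2"
# processed["ranking_order"]   -> ["answer_2", "answer_1", "answer_3", "answer_4"]
# processed["quality_comparison"]["best_answer_matches_best_rating"] -> True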

validate_annotation_data(annotation_data)

Validate annotation data for reward bench 2 evaluation

Parameters:

    annotation_data (Dict[str, Any], required): Annotation data to validate

Returns:

    bool: True if valid, False otherwise

Source code in rm_gallery/gallery/data/annotation/rewardbench2.py
def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
    """
    Validate annotation data for reward bench 2 evaluation

    Args:
        annotation_data: Annotation data to validate

    Returns:
        True if valid, False otherwise
    """
    # Check if required fields are present
    required_sections = ["choices", "ratings"]
    for section in required_sections:
        if section not in annotation_data:
            return False

    # Check if best answer is selected
    if "best_answer" not in annotation_data.get("choices", {}):
        return False

    # Check if at least some rankings are provided
    choices = annotation_data.get("choices", {})
    rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]
    if not any(key in choices for key in rank_keys):
        return False

    # Check if at least some ratings are provided
    ratings = annotation_data.get("ratings", {})
    rating_keys = [
        "answer1_rating",
        "answer2_rating",
        "answer3_rating",
        "answer4_rating",
    ]
    if not any(key in ratings for key in rating_keys):
        return False

    # Validate ranking consistency (each rank should be unique)
    provided_ranks = []
    for rank_key in rank_keys:
        if rank_key in choices:
            rank_choices = choices[rank_key]["choices"]
            if rank_choices:
                rank = int(rank_choices[0])
                if rank in provided_ranks:
                    return False  # Duplicate rank
                provided_ranks.append(rank)

    return True
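
Continuing the sketch above: the payload from the previous example passes validation, while duplicate ranks are rejected:

assert template.validate_annotation_data(annotation_data) is True

bad_payload = {
    "choices": {
        "best_answer": {"choices": ["answer_1"]},
        "answer1_rank": {"choices": ["1"]},
        "answer2_rank": {"choices": ["1"]},  # duplicate rank
    },
    "ratings": {"answer1_rating": {"rating": 4}},
}
assert template.validate_annotation_data(bad_payload) is False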

RewardBench2Converter

Bases: DataConverter

Unified converter for conversation data with prompt, chosen and rejected responses (version 2)

Source code in rm_gallery/gallery/data/load/rewardbench2.py
@DataConverterRegistry.register("rewardbench2")
class RewardBench2Converter(DataConverter):
    """
    Unified converter for conversation data with prompt, chosen and rejected responses (version 2)
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # generate unique id using id field if available, otherwise use prompt content
        if "id" in data_dict:
            unique_id = str(data_dict["id"])
        else:
            content = str(data_dict.get("prompt", ""))
            unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from prompt
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from chosen/rejected responses
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RewardBench2Converter",
                "subset": data_dict.get("subset"),
                "num_correct": data_dict.get("num_correct"),
                "num_rejected": data_dict.get("num_rejected"),
                "total_completions": data_dict.get("total_completions"),
                "models": data_dict.get("models"),
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench2",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation prompt"""
        prompt = data_dict.get("prompt", "")

        # Since prompt is now a string, create a single user message
        if isinstance(prompt, str):
            return [ChatMessage(role="user", content=prompt)]
        else:
            # Fallback for backwards compatibility
            history = []
            if isinstance(prompt, list):
                for turn in prompt:
                    if isinstance(turn, dict):
                        role = turn.get("role", "user")
                        content = turn.get("content", str(turn))
                        history.append(ChatMessage(role=role, content=content))
                    else:
                        history.append(ChatMessage(role="user", content=str(turn)))
            else:
                history.append(ChatMessage(role="user", content=str(prompt)))

            return history

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from conversation responses"""
        outputs = []

        # Handle chosen responses (now a list of strings)
        chosen_responses = data_dict.get("chosen", [])
        if isinstance(chosen_responses, list):
            for chosen_content in chosen_responses:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(chosen_content),
                            label={"preference": "chosen"},
                        ),
                    )
                )
        elif chosen_responses:  # Single chosen response (backwards compatibility)
            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=str(chosen_responses),
                        label={"preference": "chosen"},
                    ),
                )
            )

        # Handle rejected responses (now a list of strings)
        rejected_responses = data_dict.get("rejected", [])
        if isinstance(rejected_responses, list):
            for rejected_content in rejected_responses:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(rejected_content),
                            label={"preference": "rejected"},
                        ),
                    )
                )
        elif rejected_responses:  # Single rejected response (backwards compatibility)
            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=str(rejected_responses),
                        label={"preference": "rejected"},
                    ),
                )
            )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rewardbench2.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # generate unique id using id field if available, otherwise use prompt content
    if "id" in data_dict:
        unique_id = str(data_dict["id"])
    else:
        content = str(data_dict.get("prompt", ""))
        unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from prompt
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from chosen/rejected responses
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RewardBench2Converter",
            "subset": data_dict.get("subset"),
            "num_correct": data_dict.get("num_correct"),
            "num_rejected": data_dict.get("num_rejected"),
            "total_completions": data_dict.get("total_completions"),
            "models": data_dict.get("models"),
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench2",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None
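
Usage sketch (import path assumed from the source file above; the record and source_info are hypothetical, with field names matching what convert_to_data_sample reads):

from rm_gallery.gallery.data.load.rewardbench2 import RewardBench2Converter

converter = RewardBench2Converter()
record = {
    "id": "example-001",
    "prompt": "Explain the difference between a list and a tuple in Python.",
    "chosen": ["Lists are mutable; tuples are immutable ..."],
    "rejected": ["They are the same thing.", "Tuples can only hold numbers."],
    "subset": "focus",
    "num_correct": 1,
    "total_completions": 3,
}
sample = converter.convert_to_data_sample(
    record,
    source_info={"load_type": "local", "source_file_path": "rewardbench2.jsonl"},
)
# sample.input  -> one user ChatMessage built from the prompt string
# sample.output -> three DataOutput entries labeled "chosen"/"rejected"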

RewardBenchAnnotationTemplate

Bases: BaseAnnotationTemplate

Reward Bench annotation template implementation

Source code in rm_gallery/gallery/data/annotation/rewardbench.py
@AnnotationTemplateRegistry.register("rewardbench")
class RewardBenchAnnotationTemplate(BaseAnnotationTemplate):
    """Reward Bench annotation template implementation"""

    def __init__(self, name: str):
        super().__init__(name)

    @property
    def label_config(self) -> str:
        """Return the Label Studio XML configuration for reward bench evaluation"""
        return """
<View>
  <!-- Sample Information -->
  <Header value="Sample Information"/>
  <Text name="unique_id" value="$unique_id" title="Unique ID"/>
  <Text name="source" value="$source" title="Source"/>
  <Text name="task_category" value="$task_category" title="task_category"/>
  <Text name="created_at" value="$created_at" title="Created At"/>
  <Text name="answer_count" value="$answer_count" title="Number of Answers"/>

  <!-- Input Messages -->
  <Header value="Input Messages"/>
  <Paragraphs name="input_dialogue" value="$input_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Output Responses -->
  <Header value="Output Responses"/>
  <Paragraphs name="output_dialogue" value="$output_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Step 1: Ranking -->
  <View>
    <Text name="step1_title" value="Step 1: Answer Ranking" />
    <Text name="step1_desc1" value="Please select the most appropriate ranking relationship" />
    <Choices name="answer_ranking" toName="output_dialogue" choice="single" title="🏆 Answer Ranking">
      <Choice value="1>2" showIf="$answer_count=2"/>
      <Choice value="2>1" showIf="$answer_count=2"/>
      <Choice value="Neither" showIf="$answer_count=2"/>
      <Choice value="All answers are of equal quality"/>
    </Choices>
  </View>

  <!-- Step 2: Answer Rating -->
  <View>
    <Text name="step2_spacer" value="" />
    <Text name="step2_title" value="Step 2: Answer Rating" />
    <Text name="step2_desc" value="Please rate the quality of the answers for the $task_category task_category (1-5 stars)" />

    <Text name="answer1_label" value="📝 Answer 1 Rating:" />
    <Rating name="answer1_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 1 Quality Rating"/>

    <Text name="answer2_label" value="📝 Answer 2 Rating:" />
    <Rating name="answer2_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 2 Quality Rating"/>

    <Text name="rating_criteria" value="💡 Rating Criteria: 5 stars = excellent, 4 stars = good, 3 stars = average, 2 stars = poor, 1 star = very poor" />
  </View>

  <!-- Step 3: Additional Comments -->
  <View>
    <Text name="step3_spacer" value="" />
    <Text name="step3_title" value="Step 3: Additional Comments" />
    <Text name="step3_desc" value="Please provide any additional comments or feedback" />
    <TextArea name="additional_comments" toName="output_dialogue" placeholder="[x] The x-th answer has the following issues..." title="Additional Comments"/>
  </View>

</View>
"""

    def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process annotation data specific to reward bench evaluation

        Args:
            annotation_data: Generic annotation data with ratings, choices, text_areas

        Returns:
            Processed data structured for reward bench evaluation
        """
        processed = {
            "ranking_result": None,
            "answer_ratings": {},
            "quality_comparison": {},
            "comments": "",
            "preference": None,
        }

        # Extract answer ranking (Step 1)
        if "answer_ranking" in annotation_data.get("choices", {}):
            ranking_choices = annotation_data["choices"]["answer_ranking"]["choices"]
            if ranking_choices:
                processed["ranking_result"] = ranking_choices[0]

                # Determine preference based on ranking
                if "1>2" in ranking_choices[0]:
                    processed["preference"] = "answer_1"
                elif "2>1" in ranking_choices[0]:
                    processed["preference"] = "answer_2"
                elif "Neither" in ranking_choices[0]:
                    processed["preference"] = "neither"
                else:
                    processed["preference"] = "tie"

        # Extract answer ratings (Step 2)
        ratings = annotation_data.get("ratings", {})

        if "answer1_rating" in ratings:
            processed["answer_ratings"]["answer_1"] = ratings["answer1_rating"][
                "rating"
            ]

        if "answer2_rating" in ratings:
            processed["answer_ratings"]["answer_2"] = ratings["answer2_rating"][
                "rating"
            ]

        # Calculate quality comparison
        if len(processed["answer_ratings"]) == 2:
            rating1 = processed["answer_ratings"]["answer_1"]
            rating2 = processed["answer_ratings"]["answer_2"]

            processed["quality_comparison"] = {
                "rating_difference": rating1 - rating2,
                "better_answer": "answer_1"
                if rating1 > rating2
                else "answer_2"
                if rating2 > rating1
                else "tie",
                "rating_consistency": processed["preference"]
                == processed["quality_comparison"].get("better_answer", "unknown"),
            }

        # Extract additional comments (Step 3)
        if "additional_comments" in annotation_data.get("text_areas", {}):
            processed["comments"] = annotation_data["text_areas"][
                "additional_comments"
            ]["text"]

        return processed

    def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
        """
        Validate annotation data for reward bench evaluation

        Args:
            annotation_data: Annotation data to validate

        Returns:
            True if valid, False otherwise
        """
        # Check if required fields are present
        required_sections = ["choices", "ratings"]
        for section in required_sections:
            if section not in annotation_data:
                return False

        # Check if answer ranking is provided
        if "answer_ranking" not in annotation_data.get("choices", {}):
            return False

        # Check if at least one rating is provided
        ratings = annotation_data.get("ratings", {})
        if not any(key in ratings for key in ["answer1_rating", "answer2_rating"]):
            return False

        return True

label_config property

Return the Label Studio XML configuration for reward bench evaluation

process_annotations(annotation_data)

Process annotation data specific to reward bench evaluation

Parameters:

    annotation_data (Dict[str, Any], required): Generic annotation data with ratings, choices, text_areas

Returns:

    Dict[str, Any]: Processed data structured for reward bench evaluation

Source code in rm_gallery/gallery/data/annotation/rewardbench.py
def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process annotation data specific to reward bench evaluation

    Args:
        annotation_data: Generic annotation data with ratings, choices, text_areas

    Returns:
        Processed data structured for reward bench evaluation
    """
    processed = {
        "ranking_result": None,
        "answer_ratings": {},
        "quality_comparison": {},
        "comments": "",
        "preference": None,
    }

    # Extract answer ranking (Step 1)
    if "answer_ranking" in annotation_data.get("choices", {}):
        ranking_choices = annotation_data["choices"]["answer_ranking"]["choices"]
        if ranking_choices:
            processed["ranking_result"] = ranking_choices[0]

            # Determine preference based on ranking
            if "1>2" in ranking_choices[0]:
                processed["preference"] = "answer_1"
            elif "2>1" in ranking_choices[0]:
                processed["preference"] = "answer_2"
            elif "Neither" in ranking_choices[0]:
                processed["preference"] = "neither"
            else:
                processed["preference"] = "tie"

    # Extract answer ratings (Step 2)
    ratings = annotation_data.get("ratings", {})

    if "answer1_rating" in ratings:
        processed["answer_ratings"]["answer_1"] = ratings["answer1_rating"][
            "rating"
        ]

    if "answer2_rating" in ratings:
        processed["answer_ratings"]["answer_2"] = ratings["answer2_rating"][
            "rating"
        ]

    # Calculate quality comparison
    if len(processed["answer_ratings"]) == 2:
        rating1 = processed["answer_ratings"]["answer_1"]
        rating2 = processed["answer_ratings"]["answer_2"]

        processed["quality_comparison"] = {
            "rating_difference": rating1 - rating2,
            "better_answer": "answer_1"
            if rating1 > rating2
            else "answer_2"
            if rating2 > rating1
            else "tie",
            "rating_consistency": processed["preference"]
            == processed["quality_comparison"].get("better_answer", "unknown"),
        }

    # Extract additional comments (Step 3)
    if "additional_comments" in annotation_data.get("text_areas", {}):
        processed["comments"] = annotation_data["text_areas"][
            "additional_comments"
        ]["text"]

    return processed
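
Usage sketch with a hypothetical two-answer payload in the generic annotation format (import path assumed from the source file above):

from rm_gallery.gallery.data.annotation.rewardbench import RewardBenchAnnotationTemplate

template = RewardBenchAnnotationTemplate(name="rewardbench")
annotation_data = {
    "choices": {"answer_ranking": {"choices": ["2>1"]}},
    "ratings": {
        "answer1_rating": {"rating": 3},
        "answer2_rating": {"rating": 5},
    },
    "text_areas": {"additional_comments": {"text": ""}},
}

processed = template.process_annotations(annotation_data)
# processed["preference"]                              -> "answer_2"
# processed["quality_comparison"]["better_answer"]     -> "answer_2"
# processed["quality_comparison"]["rating_difference"] -> -2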

validate_annotation_data(annotation_data)

Validate annotation data for reward bench evaluation

Parameters:

    annotation_data (Dict[str, Any], required): Annotation data to validate

Returns:

    bool: True if valid, False otherwise

Source code in rm_gallery/gallery/data/annotation/rewardbench.py
def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
    """
    Validate annotation data for reward bench evaluation

    Args:
        annotation_data: Annotation data to validate

    Returns:
        True if valid, False otherwise
    """
    # Check if required fields are present
    required_sections = ["choices", "ratings"]
    for section in required_sections:
        if section not in annotation_data:
            return False

    # Check if answer ranking is provided
    if "answer_ranking" not in annotation_data.get("choices", {}):
        return False

    # Check if at least one rating is provided
    ratings = annotation_data.get("ratings", {})
    if not any(key in ratings for key in ["answer1_rating", "answer2_rating"]):
        return False

    return True

RewardBenchConverter

Bases: DataConverter

Unified converter for conversation data with prompt, chosen and rejected responses

Source code in rm_gallery/gallery/data/load/rewardbench.py
@DataConverterRegistry.register("rewardbench")
class RewardBenchConverter(DataConverter):
    """
    Unified converter for conversation data with prompt, chosen and rejected responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # generate unique id
        content = str(data_dict.get("prompt", []))
        unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from prompt
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from chosen/rejected responses
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RewardBenchConverter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation prompt"""
        history = []
        prompt = data_dict.get("prompt")

        # Convert single-turn conversation to list format
        if isinstance(prompt, dict):
            prompt = [prompt]

        if isinstance(prompt, list):
            for turn in prompt:
                if isinstance(turn, dict):
                    role = turn.get("role", "user")
                    content = turn.get("content", str(turn))
                    history.append(ChatMessage(role=role, content=content))
                else:
                    history.append(ChatMessage(role="user", content=str(turn)))
        elif isinstance(prompt, str):
            history.append(ChatMessage(role="user", content=prompt))

        return history

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from conversation responses"""
        outputs = []

        # Handle chosen response
        if "chosen" in data_dict:
            chosen_content = data_dict["chosen"]
            if isinstance(chosen_content, list):
                # Multi-turn chosen response
                for turn in chosen_content:
                    if isinstance(turn, dict):
                        content = turn.get("content", str(turn))
                    else:
                        content = str(turn)
                    outputs.append(
                        DataOutput(
                            answer=Step(
                                role="assistant",
                                content=content,
                                label={"preference": "chosen"},
                            ),
                        )
                    )
            else:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(chosen_content),
                            label={"preference": "chosen"},
                        ),
                    )
                )

        # Handle rejected response
        if "rejected" in data_dict:
            rejected_content = data_dict["rejected"]
            if isinstance(rejected_content, list):
                # Multi-turn rejected response
                for turn in rejected_content:
                    if isinstance(turn, dict):
                        content = turn.get("content", str(turn))
                    else:
                        content = str(turn)
                    outputs.append(
                        DataOutput(
                            answer=Step(
                                role="assistant",
                                content=content,
                                label={"preference": "rejected"},
                            ),
                        )
                    )
            else:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(rejected_content),
                            label={"preference": "rejected"},
                        ),
                    )
                )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rewardbench.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # generate unique id
    content = str(data_dict.get("prompt", []))
    unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from prompt
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from chosen/rejected responses
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RewardBenchConverter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None
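
Usage sketch (import path assumed from the source file above; the record and source_info are hypothetical, and the record uses the list-of-turns prompt format this converter accepts):

from rm_gallery.gallery.data.load.rewardbench import RewardBenchConverter

converter = RewardBenchConverter()
record = {
    "prompt": [{"role": "user", "content": "Summarize the plot of Hamlet in two sentences."}],
    "chosen": "Hamlet seeks revenge for his father's murder ...",
    "rejected": "Hamlet is a comedy about ...",
}
sample = converter.convert_to_data_sample(
    record,
    source_info={"load_type": "huggingface", "dataset_name": "rewardbench", "split": "train"},
)
# unique_id is the MD5 hash of str(record["prompt"])
# sample.input  -> one user ChatMessage
# sample.output -> two DataOutput entries, labeled "chosen" and "rejected"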

RewardDimensionWithScore

Bases: RewardDimension

Pointwise/Stepwise reward dimension with a numerical score.

Attributes:

    score (float): Numerical value representing the reward magnitude

Source code in rm_gallery/core/reward/schema.py
class RewardDimensionWithScore(RewardDimension):
    """
    Pointwise/Stepwise reward dimension with a numerical score.

    Attributes:
        score (float): Numerical value representing the reward magnitude
    """

    score: float = Field(default=..., description="score")
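
Construction sketch. The fields inherited from the RewardDimension base class are not shown on this page; `name` and `reason` are assumed here:

# `name` and `reason` are assumed to come from the RewardDimension base class;
# only `score` is declared in this subclass.
dim = RewardDimensionWithScore(
    name="helpfulness",
    reason="Addresses the question directly and accurately.",
    score=0.8,
)
print(dim.score)  # 0.8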

RewardRegistry

A registry management system for reward modules that maps module names to their corresponding implementation classes.

This class provides a centralized repository for registering and retrieving reward modules by string identifiers. Modules can be registered using decorators and later accessed by their string identifiers.

Attributes:

    _registry (Dict[str, Type[BaseReward]]): Internal dictionary storing the mapping between reward module names and their classes.

Source code in rm_gallery/core/reward/registry.py
class RewardRegistry:
    """A registry management system for reward modules that maps module names to their corresponding implementation classes.

    This class provides a centralized repository for registering and retrieving reward modules by string identifiers.
    Modules can be registered using decorators and later accessed by their string identifiers.

    Attributes:
        _registry: Internal dictionary storing the mapping between reward module names and their classes.
    """

    # Dictionary mapping reward module names to their corresponding classes
    _registry: Dict[str, Type[BaseReward]] = {}

    @classmethod
    def register(cls, name: str):
        """Create a decorator to register a reward module class with a specified identifier.

        The decorator pattern allows classes to be registered while maintaining their original identity.

        Args:
            name: Unique string identifier for the reward module
            module: The BaseReward subclass to be registered

        Returns:
            A decorator function that registers the module when applied to a class
        """

        def _register(module: Type[BaseReward]):
            """Internal registration function that stores the module in the registry.

            Args:
                module: The BaseReward subclass to be registered

            Returns:
                The original module class (unchanged)
            """
            cls._registry[name] = module
            return module

        return _register

    @classmethod
    def get(cls, name: str) -> Type[BaseReward] | None:
        """Retrieve a registered reward module class by its identifier.

        Provides safe access to registered modules without raising errors for missing entries.

        Args:
            name: String identifier of the reward module to retrieve

        Returns:
            The corresponding BaseReward subclass if found, None otherwise
        """
        assert name in cls._registry, f"Reward module '{name}' not found"
        return cls._registry.get(name, None)

    @classmethod
    def list(cls) -> str:
        """
        Returns:
            A list of all registered reward modules
        """
        info = []
        for name, module in cls._registry.items():
            info.append(
                pd.Series(
                    {
                        "Name": name,
                        "Class": module.__name__,
                        "Scenario": module.__doc__.strip(),
                    }
                )
            )

        info_df = pd.concat(info, axis=1).T
        # info_str = info_df.to_markdown(index=False)
        info_str = tabulate(
            info_df,
            headers="keys",
            tablefmt="grid",
            maxcolwidths=[50] * (len(info_df.columns) + 1),
            # showindex=False,
        )
        # info_str = tabulate(info_df, headers='keys', tablefmt='github')
        return info_str
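
Usage sketch showing the register/get/list flow (import paths are assumed from the source locations referenced on this page):

from pydantic import Field

from rm_gallery.core.reward.registry import RewardRegistry
from rm_gallery.gallery.rm.alignment.base import BaseHelpfulnessListWiseReward

@RewardRegistry.register("demo_listwise_reward")
class DemoListWiseReward(BaseHelpfulnessListWiseReward):
    """Demo: scenario description surfaced by RewardRegistry.list()."""

    name: str = Field(default="demo_listwise_reward")

reward_cls = RewardRegistry.get("demo_listwise_reward")  # -> DemoListWiseReward
print(RewardRegistry.list())  # grid-formatted table of all registered modules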

get(name) classmethod

Retrieve a registered reward module class by its identifier.

Provides safe access to registered modules without raising errors for missing entries.

Parameters:

    name (str, required): String identifier of the reward module to retrieve

Returns:

    Type[BaseReward] | None: The corresponding BaseReward subclass if found, None otherwise

Source code in rm_gallery/core/reward/registry.py
@classmethod
def get(cls, name: str) -> Type[BaseReward] | None:
    """Retrieve a registered reward module class by its identifier.

    Provides safe access to registered modules without raising errors for missing entries.

    Args:
        name: String identifier of the reward module to retrieve

    Returns:
        The corresponding BaseReward subclass if found, None otherwise
    """
    assert name in cls._registry, f"Reward module '{name}' not found"
    return cls._registry.get(name, None)

list() classmethod

Returns:

    str: A list of all registered reward modules

Source code in rm_gallery/core/reward/registry.py
@classmethod
def list(cls) -> str:
    """
    Returns:
        A list of all registered reward modules
    """
    info = []
    for name, module in cls._registry.items():
        info.append(
            pd.Series(
                {
                    "Name": name,
                    "Class": module.__name__,
                    "Scenario": module.__doc__.strip(),
                }
            )
        )

    info_df = pd.concat(info, axis=1).T
    # info_str = info_df.to_markdown(index=False)
    info_str = tabulate(
        info_df,
        headers="keys",
        tablefmt="grid",
        maxcolwidths=[50] * (len(info_df.columns) + 1),
        # showindex=False,
    )
    # info_str = tabulate(info_df, headers='keys', tablefmt='github')
    return info_str

register(name) classmethod

Create a decorator to register a reward module class with a specified identifier.

The decorator pattern allows classes to be registered while maintaining their original identity.

Parameters:

    name (str, required): Unique string identifier for the reward module
    module (required): The BaseReward subclass to be registered

Returns:

    A decorator function that registers the module when applied to a class

Source code in rm_gallery/core/reward/registry.py
@classmethod
def register(cls, name: str):
    """Create a decorator to register a reward module class with a specified identifier.

    The decorator pattern allows classes to be registered while maintaining their original identity.

    Args:
        name: Unique string identifier for the reward module
        module: The BaseReward subclass to be registered

    Returns:
        A decorator function that registers the module when applied to a class
    """

    def _register(module: Type[BaseReward]):
        """Internal registration function that stores the module in the registry.

        Args:
            module: The BaseReward subclass to be registered

        Returns:
            The original module class (unchanged)
        """
        cls._registry[name] = module
        return module

    return _register

RewardResult

Bases: BaseModel, Generic[T]

Container for reward calculation results with generic type support.

Attributes:

    name (str): Identifier of the reward module that generated this result
    details (List[T]): Collection of detailed reward information items
    extra_data (dict): Additional metadata or context information

Source code in rm_gallery/core/reward/schema.py
class RewardResult(BaseModel, Generic[T]):
    """
    Container for reward calculation results with generic type support.

    Attributes:
        name (str): Identifier of the reward module that generated this result
        details (List[T]): Collection of detailed reward information items
        extra_data (dict): Additional metadata or context information
    """

    name: str = Field(default=..., description="reward module name")
    details: List[T] = Field(default_factory=list, description="reward details")
    extra_data: dict = Field(default_factory=dict, description="extra data")
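
Construction sketch; `details` would typically hold reward dimension entries such as RewardDimensionWithScore:

result = RewardResult(
    name="demo_listwise_reward",
    details=[],  # no dimensions evaluated in this sketch
    extra_data={"note": "placeholder run"},
)
print(result.name, len(result.details))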

RewriteListWiseReward

Bases: BaseHelpfulnessListWiseReward

Rewrite: the assistant aims to modify existing text to alter its style while preserving the original information and intent.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/rewrite.py
@RewardRegistry.register("rewrite_listwise_reward")
class RewriteListWiseReward(BaseHelpfulnessListWiseReward):
    """Rewrite: the assistant aims to modifies existing text to alter its style while preserving the original information and intent."""

    name: str = Field(default="rewrite_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

RolePlayingListWiseReward

Bases: BaseHelpfulnessListWiseReward

Role Playing: Entails adopting specific characters or personas within text-based scenarios, engaging in dialogues or actions that reflect the assigned roles.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/role_playing.py
@RewardRegistry.register("role_playing_listwise_reward")
class RolePlayingListWiseReward(BaseHelpfulnessListWiseReward):
    """Role Playing: Entails adopting specific characters or personas within text-based scenarios, engaging in dialogues or actions that reflect the assigned roles."""

    name: str = Field(default="role_playing_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC, description="task description")

SafetyListWiseReward

Bases: BaseHarmlessnessListWiseReward

Safety: Comply with or refuse prompts related to harmful use cases as well as general compliance behaviors.

Source code in rm_gallery/gallery/rm/alignment/harmlessness/safety.py
@RewardRegistry.register("safety_listwise_reward")
class SafetyListWiseReward(BaseHarmlessnessListWiseReward):
    """Safety: Comply with or refuse prompts related to harmful use cases as well as general compliance behaviors."""

    name: str = Field(default="safety_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

SummarizationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Summarization: The text is compressed into a short form, retaining the main information, which is divided into extraction (directly selected from the original text) and production (rewriting the information).

Source code in rm_gallery/gallery/rm/alignment/helpfulness/summarization.py
@RewardRegistry.register("summarization_listwise_reward")
class SummarizationListWiseReward(BaseHelpfulnessListWiseReward):
    """Summarization: The text is compressed into a short form, retaining the main information, which is divided into extraction (directly selected from the original text) and production (rewriting the information)."""

    name: str = Field(
        default="summarization_listwise_reward", description="reward name"
    )
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC, description="task description")

TranslationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Translation: Converting text from one language to another.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/translation.py
@RewardRegistry.register("translation_listwise_reward")
class TranslationListWiseReward(BaseHelpfulnessListWiseReward):
    """Translation: Converting text from one language to another."""

    name: str = Field(default="translation_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC, description="task description")

get_tokenizer(tokenizer_type='tiktoken', encoding_name='cl100k_base', chinese_only=False, **kwargs)

Factory function to create tokenizer instances.

Parameters:

    tokenizer_type (str, default 'tiktoken'): Type of tokenizer ("tiktoken", "jieba", "simple")
    encoding_name (str, default 'cl100k_base'): Tiktoken encoding name (for tiktoken tokenizer)
    chinese_only (bool, default False): Whether to keep only Chinese characters (for jieba tokenizer)
    **kwargs: Additional arguments for tokenizer initialization

Returns:

    BaseTokenizer: Tokenizer instance

Raises:

    ValueError: If tokenizer_type is not supported

Source code in rm_gallery/core/utils/tokenizer.py
def get_tokenizer(
    tokenizer_type: str = "tiktoken",
    encoding_name: str = "cl100k_base",
    chinese_only: bool = False,
    **kwargs,
) -> BaseTokenizer:
    """
    Factory function to create tokenizer instances.

    Args:
        tokenizer_type: Type of tokenizer ("tiktoken", "jieba", "simple")
        encoding_name: Tiktoken encoding name (for tiktoken tokenizer)
        chinese_only: Whether to keep only Chinese characters (for jieba tokenizer)
        **kwargs: Additional arguments for tokenizer initialization

    Returns:
        BaseTokenizer: Tokenizer instance

    Raises:
        ValueError: If tokenizer_type is not supported
    """
    if tokenizer_type == "tiktoken":
        return TiktokenTokenizer(encoding_name=encoding_name, **kwargs)
    elif tokenizer_type == "jieba":
        return JiebaTokenizer(chinese_only=chinese_only, **kwargs)
    elif tokenizer_type == "simple":
        return SimpleTokenizer(**kwargs)
    else:
        raise ValueError(
            f"Unsupported tokenizer type: {tokenizer_type}. "
            f"Supported types: tiktoken, jieba, simple"
        )
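
Usage sketch (import path assumed from the source file above):

from rm_gallery.core.utils.tokenizer import get_tokenizer

default_tok = get_tokenizer()                            # tiktoken with cl100k_base encoding
chinese_tok = get_tokenizer("jieba", chinese_only=True)  # jieba, Chinese characters only

try:
    get_tokenizer("whitespace")
except ValueError as err:
    print(err)  # Unsupported tokenizer type: whitespace. Supported types: tiktoken, jieba, simple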