rm_gallery

AccuracyReward

Bases: BasePointWiseReward

Calculate accuracy (exact match rate) between generated content and reference answer.

This reward evaluates if the generated content matches the reference answer exactly. A score of 1.0 indicates an exact match, while 0.0 indicates no match.

Source code in rm_gallery/gallery/rm/general.py
@RewardRegistry.register("accuracy")
class AccuracyReward(BasePointWiseReward):
    """
    Calculate accuracy (exact match rate) between generated content and reference answer.

    This reward evaluates if the generated content matches the reference answer exactly.
    A score of 1.0 indicates an exact match, while 0.0 indicates no match.
    """

    name: str = Field(default="accuracy", description="Accuracy reward")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Calculate accuracy score.

        Args:
            sample: Data sample containing generated content and reference answer

        Returns:
            RewardResult: Reward result containing accuracy score
        """
        generated = sample.output[0].answer.content.strip()
        reference = sample.output[0].answer.label.get("reference", "").strip()

        # Calculate accuracy (1.0 for exact match, 0.0 otherwise)
        accuracy = 1.0 if generated == reference else 0.0

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=accuracy,
                    reason=f"Generated content {'matches' if accuracy == 1.0 else 'does not match'} reference exactly",
                )
            ],
            extra_data={
                "generated": generated,
                "reference": reference,
                "accuracy": accuracy,
            },
        )
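
Because _evaluate compares stripped strings for equality, only leading and trailing whitespace is forgiven; any case or wording difference scores 0.0. A minimal, library-free restatement of the scoring rule (the helper name is hypothetical):

def exact_match_accuracy(generated: str, reference: str) -> float:
    # Mirrors AccuracyReward._evaluate: strip surrounding whitespace, then exact string match.
    return 1.0 if generated.strip() == reference.strip() else 0.0

print(exact_match_accuracy("Paris", "Paris"))     # 1.0
print(exact_match_accuracy(" Paris\n", "Paris"))  # 1.0 (surrounding whitespace is stripped)
print(exact_match_accuracy("paris", "Paris"))     # 0.0 (comparison is case-sensitive)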

BaseHarmlessnessListWiseReward

Bases: BaseListWisePrincipleReward

The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_harmlessness_listwise")
class BaseHarmlessnessListWiseReward(BaseListWisePrincipleReward):
    """The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities."""

    name: str = Field(default="base_harmlessness_listwise")
    desc: str = Field(default=DEFAULT_HARMLESSNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HARMLESSNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HARMLESSNESS_PRINCIPLES)

BaseHarmlessnessPointWiseReward

Bases: BasePointWisePrincipleReward

The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_harmlessness_pointwise")
class BaseHarmlessnessPointWiseReward(BasePointWisePrincipleReward):
    """The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities."""

    name: str = Field(default="base_harmlessness_pointwise")
    desc: str = Field(default=DEFAULT_HARMLESSNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HARMLESSNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HARMLESSNESS_PRINCIPLES)

BaseHelpfulnessListWiseReward

Bases: BaseListWisePrincipleReward

The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_helpfulness_listwise")
class BaseHelpfulnessListWiseReward(BaseListWisePrincipleReward):
    """The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information."""

    name: str = Field(default="base_helpfulness_listwise")
    desc: str = Field(default=DEFAULT_HELPFULNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HELPFULNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HELPFULNESS_PRINCIPLES)

BaseHelpfulnessPointWiseReward

Bases: BasePointWisePrincipleReward

The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_helpfulness_pointwise")
class BaseHelpfulnessPointWiseReward(BasePointWisePrincipleReward):
    """The assistant aims to provide helpful and informative responses to users, responding to their queries with relevant and accurate information."""

    name: str = Field(default="base_helpfulness_pointwise")
    desc: str = Field(default=DEFAULT_HELPFULNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HELPFULNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HELPFULNESS_PRINCIPLES)

BaseHonestyListWiseReward

Bases: BaseListWisePrincipleReward

The assistant aims to truthfully answer the user’s questions with no bias or prejudice.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_honesty_listwise")
class BaseHonestyListWiseReward(BaseListWisePrincipleReward):
    """The assistant aims to truthfully answer the user’s questions with no bias or prejudice."""

    name: str = Field(default="base_honesty_listwise")
    desc: str = Field(default=DEFAULT_HONESTY_DESC)
    scenario: str = Field(
        default=DEFAULT_HONESTY_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HONESTY_PRINCIPLES)

BaseHonestyPointWiseReward

Bases: BasePointWisePrincipleReward

The assistant aims to truthfully answer the user’s questions with no bias or prejudice.

Source code in rm_gallery/gallery/rm/alignment/base.py
@RewardRegistry.register("base_honesty_pointwise")
class BaseHonestyPointWiseReward(BasePointWisePrincipleReward):
    """The assistant aims to truthfully answer the user’s questions with no bias or prejudice."""

    name: str = Field(default="base_honesty_pointwise")
    desc: str = Field(default=DEFAULT_HONESTY_DESC)
    scenario: str = Field(
        default=DEFAULT_HONESTY_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HONESTY_PRINCIPLES)

BaseLLMReward

Bases: BaseReward

Base class for LLM-based reward modules.

Provides framework for prompt-based interaction with language models.

Source code in rm_gallery/core/reward/base.py
class BaseLLMReward(BaseReward):
    """
    Base class for LLM-based reward modules.

    Provides framework for prompt-based interaction with language models.
    """

    llm: BaseLLM | None = Field(default=None, description="llm client")
    template: Type[BasePromptTemplate] = Field(
        default=BasePromptTemplate, description="prompt template"
    )
    max_retries: int = Field(default=3, description="max retries")

    def _before_evaluate(self, **kwargs) -> dict:
        """
        Prepares parameters for prompt generation.

        Returns:
            dict: Parameters for prompt template formatting
        """
        return {}

    def _after_evaluate(self, response: BasePromptTemplate, **kwargs) -> RewardResult:
        """
        Processes LLM response into reward metrics.

        Parameters:
            response (BasePromptTemplate): Parsed LLM response

        Returns:
            RewardResult: Structured reward metrics
        """
        return RewardResult(
            name=self.name, details=[], extra_data=response.model_dump()
        )

    def _format(self, **kwargs):
        """
        Generates prompt without executing LLM call.

        Returns:
            RewardResult: Contains generated prompt in extra_data
        """
        params = self._before_evaluate(**kwargs)
        prompt = self.template.format(**params)
        # logger.info(f"prompt: {prompt}")
        return RewardResult(name=self.name, details=[], extra_data={"prompt": prompt})

    def _evaluate(self, **kwargs) -> RewardResult:
        """
        Full LLM evaluation cycle: prepare, execute, process.

        Handles errors during LLM interaction gracefully.

        Returns:
            RewardResult: Evaluation results with metrics and metadata
        """
        assert self.llm is not None
        for i in range(self.max_retries):
            try:
                params = self._before_evaluate(**kwargs)
                prompt = self.template.format(
                    enable_thinking=self.llm.enable_thinking, **params
                )
                logger.info(f"prompt: {prompt}")

                response = self.llm.simple_chat(query=prompt)
                response = self.template.parse(response)
                logger.info(f"response: {response}")

                result = self._after_evaluate(response=response, **kwargs)
                result.extra_data["prompt"] = prompt
                break
            except Exception as e:
                logger.error(f"API call failed: {str(e)}")
                result = RewardResult(
                    name=self.name, details=[], extra_data={"error": str(e)}
                )
        return result

    def format(
        self,
        sample: DataSample,
        thread_pool: ThreadPoolExecutor | None = None,
        **kwargs,
    ):
        """
        Process and format the input sample using parallel execution capabilities.

        @param sample: Input data sample to be formatted. Accepts either a DataSample instance
                        or a dictionary that can be validated into a DataSample object
        @param thread_pool: Optional thread pool executor for parallel processing. If None,
                            parallel execution will use a default/single-threaded context
        @param kwargs: Additional keyword arguments passed to the parallel execution handler
                        and underlying formatting operations

        @return: Formatted result from the parallel processing pipeline. Type depends on
                implementation of _format and _parallel methods

        Notes:
        - When input is a dictionary, automatically converts it to DataSample using model validation
        - Utilizes internal parallel processing infrastructure for improved throughput
        - Thread-safe when provided with appropriate thread pool executor
        """

        # Convert dictionary input to DataSample instance if necessary
        if isinstance(sample, dict):
            sample = DataSample.model_validate(sample)

        # Execute formatting operation through parallel processing infrastructure
        return self._parallel(
            self._format, sample=sample, thread_pool=thread_pool, **kwargs
        )

    async def _async_parallel(
        self,
        func: Callable,
        sample: DataSample,
        semaphore: asyncio.Semaphore,
        **kwargs,
    ) -> DataSample:
        """
        Default async parallel implementation for BaseLLMReward.

        Since BaseLLMReward doesn't define its own _parallel method, this provides
        a default implementation that simply calls the function directly.

        Parameters:
            func (Callable): Function to call
            sample (DataSample): Input sample
            semaphore (asyncio.Semaphore): Semaphore for concurrency control
            **kwargs: Additional arguments

        Returns:
            DataSample: Processed sample
        """
        sample = sample.model_copy(deep=True)

        # Use asyncio.to_thread to wrap the sync function
        async with semaphore:
            result = await asyncio.to_thread(func, sample=sample, **kwargs)

        # For BaseLLMReward, we typically work with single responses
        # Add the result to the first output
        if sample.output:
            sample.output[0].answer.reward.details.extend(result.details)
            sample.output[0].answer.additional_kwargs[self.name] = result.extra_data

        return sample

    def refine(
        self,
        sample: DataSample,
        max_iterations: int = 3,
        llm: BaseLLM | None = None,
        thread_pool: ThreadPoolExecutor | None = None,
        **kwargs,
    ) -> DataSample:
        """
        Refines a given data sample using an LLM (Large Language Model) with a specified maximum number of iterations.

        Args:
            sample (DataSample): The input data sample to be refined.
            max_iterations (int, optional): The maximum number of refinement iterations. Defaults to 3.
            llm (BaseLLM | None, optional): The LLM instance to use for refinement. If None, uses the default LLM from the instance. Defaults to None.
            thread_pool (ThreadPoolExecutor | None, optional): A thread pool executor for managing concurrent tasks. If None, no thread pool is used. Defaults to None.
            **kwargs: Additional keyword arguments for flexibility.

        Returns:
            DataSample: The refined data sample after processing.
        """
        # Set default LLM if not provided
        llm = self.llm if llm is None else llm

        from rm_gallery.core.reward.refinement import LLMRefinement

        return LLMRefinement(reward=self, llm=llm, max_iterations=max_iterations).run(
            sample, thread_pool=thread_pool, **kwargs
        )

format(sample, thread_pool=None, **kwargs)

Process and format the input sample using parallel execution capabilities.

@param sample: Input data sample to be formatted. Accepts either a DataSample instance or a dictionary that can be validated into a DataSample object.
@param thread_pool: Optional thread pool executor for parallel processing. If None, parallel execution will use a default/single-threaded context.
@param kwargs: Additional keyword arguments passed to the parallel execution handler and underlying formatting operations.

@return: Formatted result from the parallel processing pipeline. Type depends on implementation of _format and _parallel methods

Notes:

- When input is a dictionary, automatically converts it to DataSample using model validation
- Utilizes internal parallel processing infrastructure for improved throughput
- Thread-safe when provided with appropriate thread pool executor

Source code in rm_gallery/core/reward/base.py
def format(
    self,
    sample: DataSample,
    thread_pool: ThreadPoolExecutor | None = None,
    **kwargs,
):
    """
    Process and format the input sample using parallel execution capabilities.

    @param sample: Input data sample to be formatted. Accepts either a DataSample instance
                    or a dictionary that can be validated into a DataSample object
    @param thread_pool: Optional thread pool executor for parallel processing. If None,
                        parallel execution will use a default/single-threaded context
    @param kwargs: Additional keyword arguments passed to the parallel execution handler
                    and underlying formatting operations

    @return: Formatted result from the parallel processing pipeline. Type depends on
            implementation of _format and _parallel methods

    Notes:
    - When input is a dictionary, automatically converts it to DataSample using model validation
    - Utilizes internal parallel processing infrastructure for improved throughput
    - Thread-safe when provided with appropriate thread pool executor
    """

    # Convert dictionary input to DataSample instance if necessary
    if isinstance(sample, dict):
        sample = DataSample.model_validate(sample)

    # Execute formatting operation through parallel processing infrastructure
    return self._parallel(
        self._format, sample=sample, thread_pool=thread_pool, **kwargs
    )

refine(sample, max_iterations=3, llm=None, thread_pool=None, **kwargs)

Refines a given data sample using an LLM (Large Language Model) with a specified maximum number of iterations.

Parameters:

    sample (DataSample): The input data sample to be refined. Required.
    max_iterations (int): The maximum number of refinement iterations. Defaults to 3.
    llm (BaseLLM | None): The LLM instance to use for refinement. If None, uses the default LLM from the instance. Defaults to None.
    thread_pool (ThreadPoolExecutor | None): A thread pool executor for managing concurrent tasks. If None, no thread pool is used. Defaults to None.
    **kwargs: Additional keyword arguments for flexibility.

Returns:

    DataSample: The refined data sample after processing.

Source code in rm_gallery/core/reward/base.py
def refine(
    self,
    sample: DataSample,
    max_iterations: int = 3,
    llm: BaseLLM | None = None,
    thread_pool: ThreadPoolExecutor | None = None,
    **kwargs,
) -> DataSample:
    """
    Refines a given data sample using an LLM (Large Language Model) with a specified maximum number of iterations.

    Args:
        sample (DataSample): The input data sample to be refined.
        max_iterations (int, optional): The maximum number of refinement iterations. Defaults to 3.
        llm (BaseLLM | None, optional): The LLM instance to use for refinement. If None, uses the default LLM from the instance. Defaults to None.
        thread_pool (ThreadPoolExecutor | None, optional): A thread pool executor for managing concurrent tasks. If None, no thread pool is used. Defaults to None.
        **kwargs: Additional keyword arguments for flexibility.

    Returns:
        DataSample: The refined data sample after processing.
    """
    # Set default LLM if not provided
    llm = self.llm if llm is None else llm

    from rm_gallery.core.reward.refinement import LLMRefinement

    return LLMRefinement(reward=self, llm=llm, max_iterations=max_iterations).run(
        sample, thread_pool=thread_pool, **kwargs
    )
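
The extension pattern implied by this class is to pair a BasePromptTemplate subclass (which supplies the tag schema and parsing) with overridden _before_evaluate/_after_evaluate hooks around the _evaluate cycle: build parameters, format the prompt, call llm.simple_chat, parse the reply, and convert it into a RewardResult. The sketch below is illustrative only: the ClarityTemplate/ClarityReward names, the score field, and the import of RewardResult/RewardDimensionWithScore from base are assumptions; only the base-class module paths and the hook signatures come from the source shown on this page. Registration via RewardRegistry.register (as in AccuracyReward) is omitted because the registry's module is not shown here.

from typing import Type

from pydantic import Field

# Base-class module paths match the "Source code in ..." notes above.
from rm_gallery.core.reward.base import (
    BaseLLMReward,
    BasePointWiseReward,
    RewardDimensionWithScore,  # assumption: importable from base, which uses it
    RewardResult,              # assumption: importable from base, which uses it
)
from rm_gallery.core.reward.template import BasePromptTemplate


class ClarityTemplate(BasePromptTemplate):
    """Hypothetical template adding a single <score> tag to the inherited <reason> tag."""

    score: str = Field(default="", description="a clarity score between 0.0 and 1.0")

    @classmethod
    def format(cls, answer: str = "", enable_thinking: bool = False, **kwargs) -> str:
        # Append the tag schema so the LLM replies in a parseable <tag>...</tag> form.
        return (
            "Rate the clarity of the answer below.\n\n"
            f"Answer:\n{answer}\n\n"
            f"{cls.schema(enable_thinking=enable_thinking)}"
        )


class ClarityReward(BaseLLMReward, BasePointWiseReward):
    """Hypothetical point-wise LLM reward built on the cycle shown above."""

    name: str = Field(default="clarity")
    template: Type[BasePromptTemplate] = ClarityTemplate

    def _before_evaluate(self, sample, **kwargs) -> dict:
        # Whatever is returned here becomes the keyword arguments of template.format().
        return {"answer": sample.output[0].answer.content}

    def _after_evaluate(self, response: ClarityTemplate, sample, **kwargs) -> RewardResult:
        # Turn the parsed <score> tag into a single reward dimension.
        score = float(response.score or 0.0)
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=score, reason=response.reason or ""
                )
            ],
        )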

BaseListWisePrincipleReward

Bases: BasePrincipleReward, BaseListWiseReward

List-wise principle evaluation using LLM.

Compares responses against each other based on ethical principles.

Source code in rm_gallery/core/reward/base.py
class BaseListWisePrincipleReward(BasePrincipleReward, BaseListWiseReward):
    """
    List-wise principle evaluation using LLM.

    Compares responses against each other based on ethical principles.
    """

    desc: str = Field(
        default="""Please act as an impartial judge and evaluate the quality of the answers provided by some assistants to the user question displayed below.
You should critically and accurately assess the assistant’s answer with the key principles and choose the assistant that follows the user’s query and answers the user’s question best.
Avoid any position biases and ensure that the order in which the responses were presented does not influence your decision.
Do not allow the length of the responses to influence your evaluation.
Be as objective as possible.""",
        description="description",
    )

    template: Type[BasePromptTemplate] = PrincipleListWiseTemplate

    def _before_evaluate(self, sample: DataSample, **kwargs) -> Dict:
        """
        Prepares list-wise evaluation parameters.

        Parameters:
            sample (DataSample): Multi-response sample to evaluate

        Returns:
            Dict: Parameters including all responses for comparison
        """
        params = super()._before_evaluate(sample=sample, **kwargs)
        answers = [output.answer.content for output in sample.output]
        params["answers"] = answers
        return params

    def _after_evaluate(
        self, response: PrincipleListWiseTemplate, sample: DataSample, **kwargs
    ) -> RewardResult:
        """
        Converts LLM response to list-wise ranking metrics.

        Parameters:
            response (PrincipleListWiseTemplate): Parsed LLM comparison

        Returns:
            RewardResult: Relative ranking of responses
        """
        scores = [0 for i in range(len(sample.output))]
        scores[response.best - 1] = 1
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithRank(
                    name=self.name, reason=response.reason, rank=scores
                )
            ],
        )
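
The ranking conversion in _after_evaluate is a one-hot vector over the candidate answers: response.best is 1-indexed, so index best - 1 receives 1 and every other answer receives 0. A library-free restatement of that step with hypothetical inputs:

def best_to_rank(best: int, num_answers: int) -> list[int]:
    # Mirrors: scores = [0, ...]; scores[response.best - 1] = 1
    scores = [0] * num_answers
    scores[best - 1] = 1
    return scores

print(best_to_rank(best=2, num_answers=3))  # [0, 1, 0] -> the second answer is preferred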

BaseListWiseReward

Bases: BaseReward

List-wise reward module for comparative evaluation of multiple responses.

Evaluates responses as a group to determine relative rankings.

Source code in rm_gallery/core/reward/base.py
class BaseListWiseReward(BaseReward):
    """
    List-wise reward module for comparative evaluation of multiple responses.

    Evaluates responses as a group to determine relative rankings.
    """

    @abstractmethod
    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithRank]:
        """
        Group evaluation logic to determine response rankings.

        Parameters:
            sample (DataSample): Multi-response sample for comparative evaluation
            **kwargs: Evaluation parameters

        Returns:
            RewardResult[RewardDimensionWithRank]: Relative ranking metrics
        """
        ...

    def _parallel(
        self,
        func: Callable,
        sample: DataSample,
        thread_pool: ThreadPoolExecutor | None = None,
        **kwargs,
    ) -> DataSample:
        """
        Executes list-wise evaluation on a group of responses in parallel.

        Applies ranking logic to all responses in the sample using parallel processing.
        Modifies the sample in-place by adding reward details to outputs and storing
        additional metadata in the input.

        Parameters:
            func (Callable): Evaluation function to apply to the sample
            sample (DataSample): Multi-response sample to evaluate
            thread_pool (ThreadPoolExecutor | None): Optional executor for parallel processing
            **kwargs: Parameters for evaluation logic

        Returns:
            DataSample: Responses with ranking information populated
        """
        # Create deep copy to avoid modifying original sample
        sample = sample.model_copy(deep=True)

        # Execute evaluation function with provided parameters
        result = func(sample=sample, **kwargs)

        # Append reward details to corresponding output objects
        for reward in result.details:
            for i, output in enumerate(sample.output):
                output.answer.reward.details.append(reward[i])

        for i, output in enumerate(sample.output):
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        # Store additional metadata in sample input
        sample.input[-1].additional_kwargs[self.name] = result.extra_data
        return sample

    async def _async_parallel(
        self,
        func: Callable,
        sample: DataSample,
        semaphore: asyncio.Semaphore,
        **kwargs,
    ) -> DataSample:
        """
        Async version of _parallel method for BaseListWiseReward.

        Executes list-wise evaluation on a group of responses using async execution.

        Parameters:
            func (Callable): Evaluation function to apply to the sample
            sample (DataSample): Multi-response sample to evaluate
            semaphore (asyncio.Semaphore): Semaphore for async concurrency control
            **kwargs: Parameters for evaluation logic

        Returns:
            DataSample: Responses with ranking information populated
        """
        sample = sample.model_copy(deep=True)

        # Use asyncio.to_thread to wrap the sync function
        async with semaphore:
            result = await asyncio.to_thread(func, sample=sample, **kwargs)

        # Append reward details to corresponding output objects
        for reward in result.details:
            for i, output in enumerate(sample.output):
                output.answer.reward.details.append(reward[i])

        for i, output in enumerate(sample.output):
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        # Store additional metadata in sample input
        sample.input[-1].additional_kwargs[self.name] = result.extra_data

        return sample
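
The merge step indexes each returned RewardDimensionWithRank by output position (reward[i]) and appends it to that output's reward details; each output's score is then the mean of its details. A library-free sketch of that distribution, with plain lists standing in for rank details:

def distribute_ranks(rank_details, num_outputs):
    # Each rank detail is a list of per-output scores; output i collects entry i of every detail.
    per_output = [[] for _ in range(num_outputs)]
    for rank in rank_details:
        for i in range(num_outputs):
            per_output[i].append(rank[i])
    return [sum(d) / len(d) if d else 0.0 for d in per_output]

print(distribute_ranks([[0, 1, 0], [1, 0, 0]], 3))  # [0.5, 0.5, 0.0]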

BasePointWisePrincipleReward

Bases: BasePrincipleReward, BasePointWiseReward

Point-wise principle evaluation using LLM.

Evaluates each response individually against ethical principles.

Source code in rm_gallery/core/reward/base.py
class BasePointWisePrincipleReward(BasePrincipleReward, BasePointWiseReward):
    """
    Point-wise principle evaluation using LLM.

    Evaluates each response individually against ethical principles.
    """

    desc: str = Field(
        default="""Please act as an unbiased and impartial evaluator tasked with assessing the quality of the responses provided below.
You should critically and accurately assess the assistant’s answer with the key principles without any potential bias.
Do not allow the length of the responses to influence your evaluation.
Be as objective as possible.""",
        description="description",
    )

    def _before_evaluate(self, sample: DataSample, **kwargs) -> Dict:
        """
        Adds response content to evaluation parameters.

        Parameters:
            sample (DataSample): Sample containing response to evaluate

        Returns:
            Dict: Parameters including response content
        """
        params = super()._before_evaluate(sample=sample, **kwargs)
        params["answer"] = sample.output[0].answer.content
        return params

    def _after_evaluate(
        self, response: PrinciplePointWiseTemplate, sample: DataSample, **kwargs
    ) -> RewardResult:
        """
        Converts LLM response to point-wise reward metrics.

        Parameters:
            response (PrinciplePointWiseTemplate): Parsed LLM evaluation

        Returns:
            RewardResult: Violation score with explanation
        """
        # Convert violation list to a single score (e.g., average or sum)
        score = (
            1 - len(response.violation) / len(self.principles)
            if response.violation
            else 1.0
        )
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, reason=response.reason, score=score
                )
            ],
        )
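
The point-wise score is the fraction of principles that were not violated: 1 - len(violation) / len(principles), with a perfect 1.0 when the violation list is empty. For example, one violation out of four principles yields 0.75. A library-free restatement with hypothetical inputs:

def principle_score(num_violations: int, num_principles: int) -> float:
    # Mirrors _after_evaluate: 1 - len(violation) / len(principles), or 1.0 with no violations.
    return 1.0 if num_violations == 0 else 1 - num_violations / num_principles

print(principle_score(0, 4))  # 1.0
print(principle_score(1, 4))  # 0.75
print(principle_score(4, 4))  # 0.0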

BasePointWiseReward

Bases: BaseReward

Point-wise reward module for individual response evaluation.

Evaluates each response independently without considering relative ranking.

Source code in rm_gallery/core/reward/base.py
class BasePointWiseReward(BaseReward):
    """
    Point-wise reward module for individual response evaluation.

    Evaluates each response independently without considering relative ranking.
    """

    @abstractmethod
    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Processes a single response to generate reward metrics.

        Parameters:
            sample (DataSample): Single-response data sample
            **kwargs: Evaluation parameters

        Returns:
            RewardResult[RewardDimensionWithScore]: Response-specific reward metrics
        """
        ...

    def _parallel(
        self,
        func: Callable,
        sample: DataSample,
        thread_pool: ThreadPoolExecutor | None = None,
        **kwargs,
    ) -> DataSample:
        """
        Processes responses in a data sample using parallel or sequential execution.

        This method applies the provided function to each response in the sample,
        either in parallel using a thread pool or sequentially. Results are merged
        back into the corresponding response objects.

        Parameters:
            func (Callable): Function to apply to each response. Should accept a
                DataSample and return an object with 'details' and 'extra_data' attributes.
            sample (DataSample): Input sample containing multiple responses to process
            thread_pool (ThreadPoolExecutor | None): Optional thread pool for parallel execution
            **kwargs: Additional arguments passed to func

        Returns:
            DataSample: Modified copy of input sample with reward metrics updated in each response

        The method creates a deep copy of the input sample to avoid modifying original data.
        When using a thread pool, it submits tasks for each response and waits for completion
        before merging results. Response objects are updated with both reward details and
        additional metadata from processing results.
        """
        sample = sample.model_copy(deep=True)
        futures = []
        for i, output in enumerate(sample.output):
            # Create sub-sample for individual response processing
            subsample = DataSample(
                unique_id=sample.unique_id, input=sample.input, output=[output]
            )

            if thread_pool:
                futures.append(
                    (
                        i,
                        thread_pool.submit(func, sample=subsample, **kwargs),
                    )
                )
            else:
                result = func(
                    sample=subsample,
                    **kwargs,
                )
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        # Process parallel execution results
        if thread_pool:
            wait([future[-1] for future in futures], return_when=ALL_COMPLETED)
            # Merge results back into sample outputs
            for i, future in futures:
                result = future.result()
                output = sample.output[i]
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        for output in sample.output:
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        return sample

    async def _async_parallel(
        self,
        func: Callable,
        sample: DataSample,
        semaphore: asyncio.Semaphore,
        **kwargs,
    ) -> DataSample:
        """
        Async version of _parallel method for BasePointWiseReward.

        Processes responses in a data sample using async execution with semaphore control.

        Parameters:
            func (Callable): Function to apply to each response
            sample (DataSample): Input sample containing multiple responses to process
            semaphore (asyncio.Semaphore): Semaphore for async concurrency control
            **kwargs: Additional arguments passed to func

        Returns:
            DataSample: Modified copy of input sample with reward metrics updated in each response
        """
        sample = sample.model_copy(deep=True)

        async def _async_evaluate_output(i: int, output):
            """Async wrapper for individual output evaluation"""
            subsample = DataSample(
                unique_id=sample.unique_id, input=sample.input, output=[output]
            )

            # Use asyncio.to_thread to wrap the sync function
            async with semaphore:
                result = await asyncio.to_thread(func, sample=subsample, **kwargs)

            return i, result

        # Create tasks for all outputs
        tasks = []
        for i, output in enumerate(sample.output):
            task = asyncio.create_task(_async_evaluate_output(i, output))
            tasks.append(task)

        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks)

        # Merge results back into sample outputs
        for i, result in results:
            output = sample.output[i]
            output.answer.reward.details += result.details
            output.answer.additional_kwargs[self.name] = result.extra_data

        # Calculate average score for each output
        for output in sample.output:
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        return sample
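
Both _parallel and _async_parallel follow the same flow: copy the sample, evaluate each output in isolation via a single-output subsample, merge the returned details back onto that output, and finally set each output's overall score to the mean of its detail scores. A stripped-down, library-free sketch of that flow using plain dicts in place of DataSample (all names here are hypothetical):

def evaluate_pointwise(outputs, evaluate_one):
    # evaluate_one(output) -> list of detail scores for that single output.
    for output in outputs:
        details = evaluate_one(output)     # isolated evaluation, one output at a time
        output["details"].extend(details)  # merge details back onto the output
        if output["details"]:
            output["score"] = sum(output["details"]) / len(output["details"])
    return outputs

outputs = [{"text": "a", "details": []}, {"text": "bb", "details": []}]
print(evaluate_pointwise(outputs, lambda o: [float(len(o["text"]))]))
# [{'text': 'a', 'details': [1.0], 'score': 1.0}, {'text': 'bb', 'details': [2.0], 'score': 2.0}]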

BasePromptTemplate

Bases: BaseModel

BasePromptTemplate serves as the abstract base class for all prompt template implementations.

This class provides core functionality for parsing structured templates, formatting output schemas, and validating content against defined field requirements. It implements the fundamental patterns for bidirectional conversion between string representations and structured data models.

Attributes:

    reason (str): A field capturing the reasoning trace for decision-making processes.

Source code in rm_gallery/core/reward/template.py
class BasePromptTemplate(BaseModel):
    """
    BasePromptTemplate serves as the abstract base class for all prompt template implementations.

    This class provides core functionality for parsing structured templates, formatting output schemas,
    and validating content against defined field requirements. It implements the fundamental patterns
    for bidirectional conversion between string representations and structured data models.

    Attributes:
        reason (str): A field capturing the reasoning trace for decision-making processes
    """

    model_config = ConfigDict(validate_by_alias=True, validate_by_name=True)
    reason: Optional[str] = Field(
        default=None, description="your reasoning trace", alias="think"
    )

    @classmethod
    def _parse(cls, text: str) -> Dict[str, str]:
        """
        Extracts key-value pairs from XML-style tagged text using regex pattern matching.

        This internal method identifies structured patterns in the format <key>value</key>
        and converts them into a dictionary mapping for further processing.

        Args:
            text (str): Input string containing XML-style tagged content

        Returns:
            Dict[str, str]: Dictionary mapping of tag names to corresponding values
        """
        pattern = r"<([^>]+)>(.*)</\1>"
        matches = re.findall(pattern, text, re.DOTALL)
        contents = {match[0]: match[1].strip() for match in matches}
        return contents

    @classmethod
    def parse(cls, text: str) -> "BasePromptTemplate":
        """
        Converts a structured text string into a validated template instance.

        Processes input text through internal parsing mechanism and constructs
        a model instance with validated field values.

        Args:
            text (str): XML-style formatted string containing template data

        Returns:
            BasePromptTemplate: Constructed instance with parsed field values
        """
        contents = cls._parse(text)
        contents.setdefault("think", "")
        return cls(**contents)

    @classmethod
    def schema(cls, enable_thinking: bool = False, **kwargs) -> str:
        """
        Generates a descriptive schema documentation string for the template structure.

        Creates a human-readable documentation showing required fields, their descriptions,
        and proper output formatting requirements.

        Args:
            enable_thinking (bool): Flag to include/exclude thinking field in schema
            **kwargs: Additional parameters passed to schema generation

        Returns:
            str: Formatted schema documentation string with field descriptions
        """
        schema_str = "Note: Ensure all outputs are placed within the tags like <tag> </tag> as required!!!\n"
        for key, property in cls.model_json_schema(by_alias=True)["properties"].items():
            if key == "model_config":
                continue

            if key == "think" and enable_thinking:
                continue

            if key == "think":
                schema_str += f"<reason>\n{property['description']}\n</reason>\n"
            else:
                schema_str += f"<{key}>\n{property['description']}\n</{key}>\n"
        return schema_str

    @classmethod
    def format(cls, enable_thinking: bool = False, **kwargs) -> str:
        """
        Formats provided content into the template's required output structure.

        Takes arbitrary keyword arguments and formats them into the appropriate
        template structure for response generation.

        Args:
            enable_thinking (bool): Flag to control inclusion of reasoning field
            **kwargs: Content to be formatted into template structure

        Returns:
            str: Formatted string ready for model processing
        """
        ...

format(enable_thinking=False, **kwargs) classmethod

Formats provided content into the template's required output structure.

Takes arbitrary keyword arguments and formats them into the appropriate template structure for response generation.

Parameters:

    enable_thinking (bool): Flag to control inclusion of reasoning field. Defaults to False.
    **kwargs: Content to be formatted into template structure.

Returns:

    str: Formatted string ready for model processing.

Source code in rm_gallery/core/reward/template.py
@classmethod
def format(cls, enable_thinking: bool = False, **kwargs) -> str:
    """
    Formats provided content into the template's required output structure.

    Takes arbitrary keyword arguments and formats them into the appropriate
    template structure for response generation.

    Args:
        enable_thinking (bool): Flag to control inclusion of reasoning field
        **kwargs: Content to be formatted into template structure

    Returns:
        str: Formatted string ready for model processing
    """
    ...

parse(text) classmethod

Converts a structured text string into a validated template instance.

Processes input text through internal parsing mechanism and constructs a model instance with validated field values.

Parameters:

    text (str): XML-style formatted string containing template data. Required.

Returns:

    BasePromptTemplate: Constructed instance with parsed field values.

Source code in rm_gallery/core/reward/template.py
@classmethod
def parse(cls, text: str) -> "BasePromptTemplate":
    """
    Converts a structured text string into a validated template instance.

    Processes input text through internal parsing mechanism and constructs
    a model instance with validated field values.

    Args:
        text (str): XML-style formatted string containing template data

    Returns:
        BasePromptTemplate: Constructed instance with parsed field values
    """
    contents = cls._parse(text)
    contents.setdefault("think", "")
    return cls(**contents)
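
parse delegates to _parse, which relies on a single backreferencing regex, <([^>]+)>(.*)</\1>, applied with re.DOTALL, so each <tag>...</tag> pair (including multi-line bodies) becomes one dictionary entry. A standalone reproduction of that extraction step:

import re

def parse_tags(text: str) -> dict:
    # Same pattern as BasePromptTemplate._parse: <tag>body</tag>, DOTALL for multi-line bodies.
    pattern = r"<([^>]+)>(.*)</\1>"
    return {tag: body.strip() for tag, body in re.findall(pattern, text, re.DOTALL)}

reply = """<think>
The second answer cites its source.
</think>
<best>2</best>"""
print(parse_tags(reply))  # {'think': 'The second answer cites its source.', 'best': '2'}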

schema(enable_thinking=False, **kwargs) classmethod

Generates a descriptive schema documentation string for the template structure.

Creates a human-readable documentation showing required fields, their descriptions, and proper output formatting requirements.

Parameters:

    enable_thinking (bool): Flag to include/exclude thinking field in schema. Defaults to False.
    **kwargs: Additional parameters passed to schema generation.

Returns:

    str: Formatted schema documentation string with field descriptions.

Source code in rm_gallery/core/reward/template.py
@classmethod
def schema(cls, enable_thinking: bool = False, **kwargs) -> str:
    """
    Generates a descriptive schema documentation string for the template structure.

    Creates a human-readable documentation showing required fields, their descriptions,
    and proper output formatting requirements.

    Args:
        enable_thinking (bool): Flag to include/exclude thinking field in schema
        **kwargs: Additional parameters passed to schema generation

    Returns:
        str: Formatted schema documentation string with field descriptions
    """
    schema_str = "Note: Ensure all outputs are placed within the tags like <tag> </tag> as required!!!\n"
    for key, property in cls.model_json_schema(by_alias=True)["properties"].items():
        if key == "model_config":
            continue

        if key == "think" and enable_thinking:
            continue

        if key == "think":
            schema_str += f"<reason>\n{property['description']}\n</reason>\n"
        else:
            schema_str += f"<{key}>\n{property['description']}\n</{key}>\n"
    return schema_str
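
Given the loop above, the schema string is just each field's description wrapped in its output tag, with the think alias rendered as <reason>. A small sketch, assuming BasePromptTemplate is importable from the rm_gallery/core/reward/template.py module shown above and using a hypothetical score field:

from pydantic import Field

from rm_gallery.core.reward.template import BasePromptTemplate  # module path as shown above


class ScoreTemplate(BasePromptTemplate):
    score: str = Field(default="", description="a numeric quality score between 0 and 1")


print(ScoreTemplate.schema())
# Note: Ensure all outputs are placed within the tags like <tag> </tag> as required!!!
# <reason>
# your reasoning trace
# </reason>
# <score>
# a numeric quality score between 0 and 1
# </score>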

BrainstormingListWiseReward

Bases: BaseHelpfulnessListWiseReward

Brainstorming: Generating text to come up with new ideas or solutions, with an emphasis on creativity and driving thinking.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/brainstorming.py
@RewardRegistry.register("brainstorming_listwise_reward")
class BrainstormingListWiseReward(BaseHelpfulnessListWiseReward):
    """Brainstorming: Generating text to come up with new ideas or solutions, with an emphasis on creativity and driving thinking."""

    name: str = Field(default="brainstorming_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

CARMO

Bases: BaseLLMReward, BaseListWiseReward

Context-Aware Reward Modeling

Source code in rm_gallery/gallery/rm/carmo.py
class CARMO(BaseLLMReward, BaseListWiseReward):
    """Context-Aware Reward Modeling"""

    def _before_evaluate(self, sample: DataSample, **kwargs) -> dict:
        instruction = sample.input[-1].content

        query = CriteriaGenerationPrompt.format(instruction=instruction)
        response = self.llm.simple_chat(query)
        principles = CriteriaGenerationPrompt.parse(response).principles
        completions = [output.answer.content for output in sample.output]

        return dict(
            principles=principles,
            instruction=instruction,
            completions=completions,
        )

    def _after_evaluate(
        self, response: RelativeEvaluationPrompt, sample: DataSample, **kwargs
    ) -> RewardResult:
        """
        Converts LLM response to list-wise ranking metrics.

        Parameters:
            response (RelativeEvaluationPrompt): Parsed LLM comparison

        Returns:
            RewardResult: Relative ranking of responses
        """
        scores = [0 for i in range(len(sample.output))]
        scores[response.best - 1] = 1
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithRank(
                    name=self.name, reason=response.reason, rank=scores
                )
            ],
        )

ChatListWiseReward

Bases: BaseHelpfulnessListWiseReward

Chat: Simulates human conversation and communicates a variety of topics through text understanding and generation, emphasizing coherence and natural flow of interaction.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/chat.py
@RewardRegistry.register("chat_listwise_reward")
class ChatListWiseReward(BaseHelpfulnessListWiseReward):
    """Chat: Simulates human conversation and communicates a variety of topics through text understanding and generation, emphasizing coherence and natural flow of interaction."""

    name: str = Field(default="chat_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

ClassificationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Classification: Entails assigning predefined categories or labels to text based on its content.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/classification.py
@RewardRegistry.register("classification_listwise_reward")
class ClassificationListWiseReward(BaseHelpfulnessListWiseReward):
    """Classification: Entails assigning predefined categories or labels to text based on its content."""

    name: str = Field(
        default="classification_listwise_reward", description="reward name"
    )
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

ClosedQAListWiseReward

Bases: BaseHelpfulnessListWiseReward

Closed QA: Search for direct answers to specific questions in given text sources (i.e. given context, given options).

Source code in rm_gallery/gallery/rm/alignment/helpfulness/closed_qa.py
@RewardRegistry.register("closed_qa_listwise_reward")
class ClosedQAListWiseReward(BaseHelpfulnessListWiseReward):
    """Closed QA: Search for direct answers to specific questions in given text sources (i.e. given context, given options)."""

    name: str = Field(default="closed_qa_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

CodeExecutionReward

Bases: BasePointWiseReward

Executes code against test cases and evaluates correctness based on test case results.

This reward model evaluates code by executing it against test cases using a testing framework that supports both call-based and standard input code evaluation methods.

Source code in rm_gallery/gallery/rm/code/code.py
@RewardRegistry.register("code_execution")
class CodeExecutionReward(BasePointWiseReward):
    """
    Executes code against test cases and evaluates correctness based on test case results.

    This reward model evaluates code by executing it against test cases using a testing framework
    that supports both call-based and standard input code evaluation methods.
    """

    name: str = Field(default="code_execution", description="Code execution reward")
    continuous: bool = Field(
        default=True, description="Use continuous scoring (partial credit)"
    )
    timeout: int = Field(
        default=10, description="Timeout in seconds for code execution"
    )
    test_framework_available: bool = Field(
        default=True, description="Whether testing framework is available"
    )
    compute_score: Optional[Any] = Field(
        default=None, description="Compute score function"
    )

    def __init__(self, **data):
        super().__init__(**data)
        try:
            from rm_gallery.gallery.rm.code.prime_code import compute_score

            self.compute_score = compute_score
            self.test_framework_available = True
        except ImportError:
            print(
                "Warning: Code testing framework not available. Please ensure rm_gallery.gallery.rm.code.prime_code is properly installed."
            )
            self.test_framework_available = False

    def _extract_code(self, content: str) -> str:
        """
        Extract code from content

        Args:
            content: Text content that may contain code blocks

        Returns:
            Extracted code
        """
        # Try to find Python code in various formats
        code_match = re.search(r"```python\n(.*?)\n```", content, re.DOTALL)
        if code_match:
            return code_match.group(1)

        # Try other formats
        code_match = re.search(r"```\n(.*?)\n```", content, re.DOTALL)
        if code_match:
            return code_match.group(1)

        # If no code block markers, assume the entire content is code
        return content

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Evaluate code against test cases

        Args:
            sample: Data sample containing code content and test cases

        Returns:
            RewardResult: Reward result containing evaluation score
        """
        # Extract code from response
        content = sample.output[0].answer.content
        extracted_code = self._extract_code(content)

        # Default values
        score = 0.0
        reason = "No evaluation performed"
        extra_data = {"extracted_code": extracted_code}

        # Check if testing framework is available
        if not self.test_framework_available:
            reason = "Code testing framework not available"
            extra_data["error"] = reason
        else:
            # Get test cases from sample metadata or label
            test_cases = None
            if sample.metadata and "inputs_outputs" in sample.metadata:
                test_cases = sample.metadata["inputs_outputs"]
            elif (
                sample.output[0].answer.label
                and "inputs_outputs" in sample.output[0].answer.label
            ):
                test_cases = sample.output[0].answer.label["inputs_outputs"]

            if not test_cases:
                reason = "No test cases available for evaluation"
            elif not extracted_code:
                score = 0.0
                reason = "No valid code extracted from response"
                extra_data["test_cases"] = test_cases
            else:
                # Convert test cases to string if needed
                if isinstance(test_cases, dict):
                    test_cases_str = json.dumps(test_cases)
                else:
                    test_cases_str = test_cases

                # Evaluate code using testing framework
                try:
                    success, metadata = self.compute_score(
                        completion=extracted_code,
                        test_cases=test_cases_str,
                        continuous=self.continuous,
                    )

                    # Determine score based on success rate
                    if isinstance(success, bool):
                        pass_rate = 1.0 if success else 0.0
                    else:
                        pass_rate = float(success)

                    # Score is always between 0 and 1
                    score = pass_rate

                    # Generate reason based on results
                    if pass_rate == 1.0:
                        reason = "All test cases passed successfully"
                    elif pass_rate == 0.0:
                        reason = "No test cases passed"
                    else:
                        reason = f"Partial success: {pass_rate * 100:.1f}% of test cases passed"

                    # Include metadata in extra_data
                    extra_data = {
                        "extracted_code": extracted_code,
                        "test_cases": test_cases,
                        "pass_rate": pass_rate,
                    }

                except Exception as e:
                    error_traceback = traceback.format_exc()
                    score = 0.0
                    reason = f"Evaluation error: {str(e)}"
                    extra_data = {
                        "extracted_code": extracted_code,
                        "test_cases": test_cases,
                        "error": str(e),
                        "traceback": error_traceback,
                    }

        # Single return statement at the end of the function
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=score,
                    reason=reason,
                )
            ],
            extra_data=extra_data,
        )
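
_extract_code prefers a fenced ```python block, falls back to any generic fenced block, and otherwise treats the whole message as code. A standalone reproduction of that extraction order:

import re

def extract_code(content: str) -> str:
    # Same priority as CodeExecutionReward._extract_code.
    match = re.search(r"```python\n(.*?)\n```", content, re.DOTALL)
    if match:
        return match.group(1)
    match = re.search(r"```\n(.*?)\n```", content, re.DOTALL)
    if match:
        return match.group(1)
    return content  # no fenced block: assume the entire content is code

reply = "Here is my solution:\n```python\ndef add(a, b):\n    return a + b\n```"
print(extract_code(reply))  # prints the two-line function without the fences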

CodeListWiseReward

Bases: BaseHelpfulnessListWiseReward

Code: Involves generating, understanding, or modifying programming language code within text.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/code.py
@RewardRegistry.register("code_listwise_reward")
class CodeListWiseReward(BaseHelpfulnessListWiseReward):
    """Code: Involves generating, understanding, or modifying programming language code within text."""

    name: str = Field(default="code_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

CodeStyleReward

Bases: BasePointWiseReward

Basic code style checking including indentation consistency and naming conventions.

Source code in rm_gallery/gallery/rm/code/code.py
@RewardRegistry.register("code_style")
class CodeStyleReward(BasePointWiseReward):
    """Basic code style checking including indentation consistency and naming conventions."""

    name: str = Field(default="code_style", description="Code style reward")

    def _check_indentation(self, code: str) -> tuple[bool, str]:
        """Check indentation consistency"""
        lines = code.split("\n")
        indent_type = None  # 'spaces' or 'tabs'
        indent_size = None

        for line in lines:
            if line.strip():  # Non-empty line
                leading = len(line) - len(line.lstrip())
                if leading > 0:
                    if line.startswith(" "):
                        if indent_type is None:
                            indent_type = "spaces"
                            indent_size = leading
                        elif indent_type != "spaces":
                            return False, "Mixed indentation types (spaces and tabs)"
                    elif line.startswith("\t"):
                        if indent_type is None:
                            indent_type = "tabs"
                        elif indent_type != "tabs":
                            return False, "Mixed indentation types (spaces and tabs)"

        return True, "Consistent indentation"

    def _check_naming(self, code: str) -> tuple[float, str]:
        """Check naming conventions"""
        # Simple naming check
        function_pattern = r"def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\("
        variable_pattern = r"([a-zA-Z_][a-zA-Z0-9_]*)\s*="

        functions = re.findall(function_pattern, code)
        variables = re.findall(variable_pattern, code)

        total_names = len(functions) + len(variables)
        if total_names == 0:
            return 1.0, "No names to check"

        good_names = 0

        # Check function names (should be snake_case)
        for func in functions:
            if re.match(r"^[a-z_][a-z0-9_]*$", func):
                good_names += 1

        # Check variable names (should be snake_case)
        for var in variables:
            if re.match(r"^[a-z_][a-z0-9_]*$", var):
                good_names += 1

        score = good_names / total_names
        return (
            score,
            f"Naming convention: {good_names}/{total_names} names follow snake_case",
        )

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check code style

        Args:
            sample: Data sample containing code

        Returns:
            RewardResult: Reward result containing code style score
        """
        content = sample.output[0].answer.content

        # Extract code blocks
        code_pattern = r"```(?:python)?\n(.*?)\n```"
        code_blocks = re.findall(code_pattern, content, re.DOTALL)

        if not code_blocks:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason="No code blocks found to check style",
                    )
                ],
                extra_data={"code_blocks": []},
            )

        total_score = 0.0
        details = []

        for i, code in enumerate(code_blocks):
            block_score = 0.0

            # Check indentation
            indent_ok, indent_msg = self._check_indentation(code)
            if indent_ok:
                block_score += 0.5
            details.append(f"Block {i}: {indent_msg}")

            # Check naming
            naming_score, naming_msg = self._check_naming(code)
            block_score += naming_score * 0.5
            details.append(f"Block {i}: {naming_msg}")

            total_score += block_score

        # Average score
        average_score = total_score / len(code_blocks)

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=average_score,
                    reason=f"Code style score: {average_score:.3f}; "
                    + "; ".join(details),
                )
            ],
            extra_data={
                "average_score": average_score,
                "code_blocks_count": len(code_blocks),
                "details": details,
            },
        )
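
The style score gives equal weight (0.5 each) to indentation consistency and snake_case naming, then averages over all extracted code blocks. A standalone sketch of that arithmetic for a single block with consistent indentation (illustrative only):

import re

response = "```python\ndef add_numbers(a, b):\n    total = a + b\n    return total\n```"

code = re.findall(r"```(?:python)?\n(.*?)\n```", response, re.DOTALL)[0]
names = re.findall(r"def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\(", code) + re.findall(
    r"([a-zA-Z_][a-zA-Z0-9_]*)\s*=", code
)
naming_ratio = sum(bool(re.match(r"^[a-z_][a-z0-9_]*$", n)) for n in names) / len(names)
score = 0.5 + 0.5 * naming_ratio  # indentation assumed consistent in this example
print(score)  # 1.0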

DataSample

Bases: BaseModel

Complete data sample structure for reward modeling training and evaluation.

Represents a single interaction with input context, multiple possible outputs, and associated metadata for comprehensive reward model training.

Attributes:

Name           Type               Description
unique_id      str                Unique identifier for tracking and deduplication
input          List[ChatMessage]  Conversation context as list of chat messages
output         List[DataOutput]   List of possible responses with evaluations
task_category  Optional[str]      Optional categorization for task-specific analysis
source         Optional[str]      Origin dataset or system that generated this sample
created_at     datetime           Timestamp for temporal tracking
metadata       Optional[Dict]     Additional context and debugging information

Source code in rm_gallery/core/data/schema.py
class DataSample(BaseModel):
    """
    Complete data sample structure for reward modeling training and evaluation.

    Represents a single interaction with input context, multiple possible outputs,
    and associated metadata for comprehensive reward model training.

    Attributes:
        unique_id: Unique identifier for tracking and deduplication
        input: Conversation context as list of chat messages
        output: List of possible responses with evaluations
        task_category: Optional categorization for task-specific analysis
        source: Origin dataset or system that generated this sample
        created_at: Timestamp for temporal tracking
        metadata: Additional context and debugging information
    """

    unique_id: str = Field(..., description="Unique identifier for the data")
    input: List[ChatMessage] = Field(default_factory=list, description="input")
    output: List[DataOutput] = Field(default_factory=list, description="output")
    task_category: Optional[str] = Field(default=None, description="task category")
    source: Optional[str] = Field(default=None, description="source")
    created_at: datetime = Field(default_factory=datetime.now, description="createdAt")
    metadata: Optional[Dict] = Field(default=None, description="metadata")

    def update(self, sample: "DataSample") -> "DataSample":
        """
        Merge another sample's data into this sample for combining evaluations.

        Updates additional_kwargs and reward details from the source sample
        while preserving the original structure.

        Args:
            sample: Source sample to merge data from

        Returns:
            Self with updated data for method chaining
        """
        self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
        for i, output in enumerate(self.output):
            output.answer.additional_kwargs.update(
                sample.output[i].answer.additional_kwargs
            )
            output.answer.reward.details.extend(sample.output[i].answer.reward.details)

            if output.steps:
                for j, step in enumerate(output.steps):
                    step.additional_kwargs.update(
                        sample.output[i].steps[j].additional_kwargs
                    )
                    step.reward.details.extend(sample.output[i].steps[j].reward.details)
        return self

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {datetime: lambda v: v.isoformat()}

update(sample)

Merge another sample's data into this sample for combining evaluations.

Updates additional_kwargs and reward details from the source sample while preserving the original structure.

Parameters:

Name    Type        Description                       Default
sample  DataSample  Source sample to merge data from  required

Returns:

Type        Description
DataSample  Self with updated data for method chaining

Source code in rm_gallery/core/data/schema.py
def update(self, sample: "DataSample") -> "DataSample":
    """
    Merge another sample's data into this sample for combining evaluations.

    Updates additional_kwargs and reward details from the source sample
    while preserving the original structure.

    Args:
        sample: Source sample to merge data from

    Returns:
        Self with updated data for method chaining
    """
    self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
    for i, output in enumerate(self.output):
        output.answer.additional_kwargs.update(
            sample.output[i].answer.additional_kwargs
        )
        output.answer.reward.details.extend(sample.output[i].answer.reward.details)

        if output.steps:
            for j, step in enumerate(output.steps):
                step.additional_kwargs.update(
                    sample.output[i].steps[j].additional_kwargs
                )
                step.reward.details.extend(sample.output[i].steps[j].reward.details)
    return self
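
Example: a minimal sketch of constructing a single-output sample by hand, assuming ChatMessage, DataOutput and Step are importable from the same schema module as DataSample (adjust the import path if they live elsewhere):

from rm_gallery.core.data.schema import ChatMessage, DataOutput, DataSample, Step

sample = DataSample(
    unique_id="demo-001",
    input=[ChatMessage(role="user", content="What is 2 + 2?")],
    output=[
        DataOutput(
            answer=Step(
                role="assistant",
                content="4",
                label={"reference": "4"},  # consumed by reference-based rewards such as accuracy
            )
        )
    ],
    metadata={"note": "hand-written example"},
)
print(sample.unique_id, len(sample.output))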

DetoxifyReward

Bases: BasePointWiseReward

Detoxify: Detecting different types of toxicity such as threats, obscenity, insults, and so on.

Source code in rm_gallery/gallery/rm/alignment/harmlessness/detoxification.py
@RewardRegistry.register("detoxify_reward")
class DetoxifyReward(BasePointWiseReward):
    """Detoxify: Detecting different types of of toxicity like threats, obscenity, insults ans so on."""

    name: str = Field(default="detoxify", description="Name of the reward module")
    model_name: str = Field(
        default="unbiased", description="Name of the Detoxify model to use"
    )

    @property
    def model(self):
        if not hasattr(self, "_model"):
            from detoxify import Detoxify

            self._model = Detoxify(self.model_name)
        return self._model

    def _evaluate(self, sample: DataSample, **kwargs) -> RewardResult:
        """
        Evaluate text toxicity using Detoxify model.

        Args:
            sample: Input data sample containing text to evaluate
            **kwargs: Additional implementation-specific parameters

        Returns:
            RewardResult: Computed reward metrics and metadata
        """
        try:
            # Get the answer text (fall back to the last input message's content)
            text = (
                sample.output[0].answer.content
                if sample.output
                else sample.input[-1].content
            )

            if not text:
                raise ValueError("No text provided for evaluation")

            # Get model predictions
            predictions = self.model.predict(text)

            # Convert toxicity score to reward (higher = less toxic)
            toxicity_score = predictions["toxicity"]
            reward_score = 1.0 - toxicity_score  # Invert score so higher is better

            # Create reward dimension
            reward_dimension = RewardDimensionWithScore(
                name="detoxify",
                score=reward_score,
                reason=f"Text toxicity score: {toxicity_score:.2f}. Higher reward indicates less toxic content.",
            )

            return RewardResult(name=self.name, details=[reward_dimension])

        except Exception as e:
            logger.error(f"Error in Detoxify evaluation: {str(e)}")
            return RewardResult(name=self.name, details=[])
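
The property above lazily wraps the optional detoxify package; the underlying call looks like this (model weights are downloaded on first use):

from detoxify import Detoxify

model = Detoxify("unbiased")
predictions = model.predict("You are a wonderful person.")
toxicity = predictions["toxicity"]
reward = 1.0 - toxicity  # higher reward means less toxic content
print(f"toxicity={toxicity:.3f}, reward={reward:.3f}")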

F1ScoreReward

Bases: BasePointWiseReward

Calculate F1 score between generated content and reference answer at word level.

This reward computes precision, recall and F1 score by comparing word overlap between generated and reference texts. Uses configurable tokenizer to support multilingual content including Chinese and English.

Source code in rm_gallery/gallery/rm/general.py
@RewardRegistry.register("f1_score")
class F1ScoreReward(BasePointWiseReward):
    """
    Calculate F1 score between generated content and reference answer at word level.

    This reward computes precision, recall and F1 score by comparing word overlap
    between generated and reference texts. Uses configurable tokenizer to support
    multilingual content including Chinese and English.
    """

    name: str = Field(default="f1_score", description="F1 score reward")
    tokenizer_type: str = Field(
        default="tiktoken",
        description="Tokenizer type: 'tiktoken', 'jieba', or 'simple'",
    )
    encoding_name: str = Field(
        default="cl100k_base",
        description="Tiktoken encoding name (for tiktoken tokenizer)",
    )
    chinese_only: bool = Field(
        default=False,
        description="Whether to keep only Chinese characters (for jieba tokenizer)",
    )

    def __init__(self, **data):
        super().__init__(**data)
        # Initialize tokenizer
        self._tokenizer = get_tokenizer(
            tokenizer_type=self.tokenizer_type,
            encoding_name=self.encoding_name,
            chinese_only=self.chinese_only,
        )

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Calculate F1 score.

        Args:
            sample: Data sample containing generated content and reference answer

        Returns:
            RewardResult: Reward result containing F1 score
        """
        generated = sample.output[0].answer.content.strip()
        reference = sample.output[0].answer.label.get("reference", "").strip()

        # Tokenize using unified tokenizer
        generated_preprocessed = self._tokenizer.preprocess_text(
            generated, to_lower=True
        )
        reference_preprocessed = self._tokenizer.preprocess_text(
            reference, to_lower=True
        )

        generated_tokens = set(self._tokenizer.tokenize(generated_preprocessed))
        reference_tokens = set(self._tokenizer.tokenize(reference_preprocessed))

        # Calculate precision, recall and F1 score
        if not generated_tokens and not reference_tokens:
            precision = recall = f1 = 1.0
        elif not generated_tokens or not reference_tokens:
            precision = recall = f1 = 0.0
        else:
            intersection = generated_tokens.intersection(reference_tokens)
            precision = len(intersection) / len(generated_tokens)
            recall = len(intersection) / len(reference_tokens)
            f1 = (
                2 * precision * recall / (precision + recall)
                if (precision + recall) > 0
                else 0.0
            )

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=f1,
                    reason=f"F1 score: {f1:.3f} (Precision: {precision:.3f}, Recall: {recall:.3f})",
                )
            ],
            extra_data={
                "f1_score": f1,
                "precision": precision,
                "recall": recall,
                "generated_tokens": list(generated_tokens),
                "reference_tokens": list(reference_tokens),
                "tokenizer_type": self.tokenizer_type,
                "tokenizer_name": self._tokenizer.name,
            },
        )
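
A standalone illustration of the word-level F1 computed above, using a plain whitespace split in place of the configurable tokenizer:

generated = "the quick brown fox"
reference = "the brown fox jumps"

gen_tokens = set(generated.lower().split())
ref_tokens = set(reference.lower().split())

overlap = gen_tokens & ref_tokens
precision = len(overlap) / len(gen_tokens)
recall = len(overlap) / len(ref_tokens)
f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
print(f"P={precision:.2f} R={recall:.2f} F1={f1:.2f}")  # P=0.75 R=0.75 F1=0.75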

FactualityListWiseReward

Bases: BaseHonestyListWiseReward

Factuality: Detects hallucinations and other basic errors in completions.

Source code in rm_gallery/gallery/rm/alignment/honesty/factuality.py
@RewardRegistry.register("factuality_listwise_reward")
class FactualityListWiseReward(BaseHonestyListWiseReward):
    """Factuality: Detects hallucinations and other basic errors in completions."""

    name: str = Field(default="factuality_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

FocusListWiseReward

Bases: BaseHelpfulnessListWiseReward

Focus: Detects high-quality, on-topic answers to general user queries

Source code in rm_gallery/gallery/rm/alignment/helpfulness/focus.py
@RewardRegistry.register("focus_listwise_reward")
class FocusListWiseReward(BaseHelpfulnessListWiseReward):
    """Focus: Detects high-quality, on-topic answers to general user queries"""

    name: str = Field(default="focus_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

GenerationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Generation: Creating new textual content, from articles to stories, with an emphasis on originality and creativity.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/generation.py
@RewardRegistry.register("generation_listwise_reward")
class GenerationListWiseReward(BaseHelpfulnessListWiseReward):
    """Generation: Creating new textual content, from articles to stories, with an emphasis on originality and creativity."""

    name: str = Field(default="generation_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

HelpSteer2PairwiseConverter

Bases: DataConverter

Converter for the HelpSteer2 pairwise data format. Can handle data from both local files and the HuggingFace Hub. Converts each data entry into two DataSamples with swapped responses.

Source code in rm_gallery/gallery/data/load/helpsteer2_pairwise.py
@DataConverterRegistry.register("helpsteer2_pairwise")
class HelpSteer2PairwiseConverter(DataConverter):
    """
    Converter for HelpSteer2 pairwise data format
    Can handle data from both local files and HuggingFace Hub
    Converts each data entry into two DataSamples with swapped responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> Union[DataSample, List[DataSample]]:
        """Convert HelpSteer2 pairwise data to DataSample format"""

        try:
            # Create input from prompt
            data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

            # Determine preference based on preference_strength
            preference_strength = data_dict.get("preference_strength", 0)
            if preference_strength > 0:
                # response_2 is better
                preferred_in_original = "response_2"
            elif preference_strength < 0:
                # response_1 is better
                preferred_in_original = "response_1"
            else:
                # tie
                preferred_in_original = "tie"

            data_samples = []

            # Create first sample: response_A = response_1, response_B = response_2
            sample1_id = hashlib.md5(f"{str(data_dict)}_sample1".encode()).hexdigest()

            # Determine preferred for first sample
            if preferred_in_original == "response_1":
                preferred_1 = "A"  # response_A (response_1) is preferred
            elif preferred_in_original == "response_2":
                preferred_1 = "B"  # response_B (response_2) is preferred
            else:
                preferred_1 = "tie"

            # Create outputs for first sample
            output_1 = [
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_1"],
                        label={"response_type": "A"},
                    )
                ),
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_2"],
                        label={"response_type": "B"},
                    )
                ),
            ]

            # Build metadata for first sample
            metadata_1 = {
                "raw_data": data_dict,
                "load_strategy": "HelpSteer2PairwiseConverter",
                "response_A": data_dict["response_1"],
                "response_B": data_dict["response_2"],
                "preferred": preferred_1,
                "preference_strength": preference_strength,
                "preference_statement": data_dict.get("preference_statement"),
                "preference_elaboration": data_dict.get("preference_elaboration"),
                "sample_type": "original_order",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata_1.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata_1.update(
                    {
                        "dataset_name": source_info.get(
                            "dataset_name", "nvidia/HelpSteer2"
                        ),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            sample_1 = DataSample(
                unique_id=sample1_id,
                input=data_input,
                output=output_1,
                source="helpsteer2_pairwise",
                task_category="chat_pairwise",
                metadata=metadata_1,
            )
            data_samples.append(sample_1)

            # Create second sample: response_A = response_2, response_B = response_1 (swapped)
            sample2_id = hashlib.md5(f"{str(data_dict)}_sample2".encode()).hexdigest()

            # Determine preferred for second sample (swapped)
            if preferred_in_original == "response_1":
                preferred_2 = "B"  # response_B (response_1) is preferred
            elif preferred_in_original == "response_2":
                preferred_2 = "A"  # response_A (response_2) is preferred
            else:
                preferred_2 = "tie"

            # Create outputs for second sample (swapped)
            output_2 = [
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_2"],
                        label={"response_type": "A"},
                    )
                ),
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=data_dict["response_1"],
                        label={"response_type": "B"},
                    )
                ),
            ]

            # Build metadata for second sample
            metadata_2 = {
                "raw_data": data_dict,
                "load_strategy": "HelpSteer2PairwiseConverter",
                "response_A": data_dict["response_2"],
                "response_B": data_dict["response_1"],
                "preferred": preferred_2,
                "preference_strength": preference_strength,
                "preference_statement": data_dict.get("preference_statement"),
                "preference_elaboration": data_dict.get("preference_elaboration"),
                "sample_type": "swapped_order",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata_2.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata_2.update(
                    {
                        "dataset_name": source_info.get(
                            "dataset_name", "nvidia/HelpSteer2"
                        ),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            sample_2 = DataSample(
                unique_id=sample2_id,
                input=data_input,
                output=output_2,
                source="helpsteer2_pairwise",
                task_category="chat_pairwise",
                metadata=metadata_2,
            )
            data_samples.append(sample_2)

            return data_samples

        except Exception as e:
            logger.error(f"Error creating HelpSteer2 Pairwise DataSample: {str(e)}")
            return None

convert_to_data_sample(data_dict, source_info)

Convert HelpSteer2 pairwise data to DataSample format

Source code in rm_gallery/gallery/data/load/helpsteer2_pairwise.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> Union[DataSample, List[DataSample]]:
    """Convert HelpSteer2 pairwise data to DataSample format"""

    try:
        # Create input from prompt
        data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

        # Determine preference based on preference_strength
        preference_strength = data_dict.get("preference_strength", 0)
        if preference_strength > 0:
            # response_2 is better
            preferred_in_original = "response_2"
        elif preference_strength < 0:
            # response_1 is better
            preferred_in_original = "response_1"
        else:
            # tie
            preferred_in_original = "tie"

        data_samples = []

        # Create first sample: response_A = response_1, response_B = response_2
        sample1_id = hashlib.md5(f"{str(data_dict)}_sample1".encode()).hexdigest()

        # Determine preferred for first sample
        if preferred_in_original == "response_1":
            preferred_1 = "A"  # response_A (response_1) is preferred
        elif preferred_in_original == "response_2":
            preferred_1 = "B"  # response_B (response_2) is preferred
        else:
            preferred_1 = "tie"

        # Create outputs for first sample
        output_1 = [
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_1"],
                    label={"response_type": "A"},
                )
            ),
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_2"],
                    label={"response_type": "B"},
                )
            ),
        ]

        # Build metadata for first sample
        metadata_1 = {
            "raw_data": data_dict,
            "load_strategy": "HelpSteer2PairwiseConverter",
            "response_A": data_dict["response_1"],
            "response_B": data_dict["response_2"],
            "preferred": preferred_1,
            "preference_strength": preference_strength,
            "preference_statement": data_dict.get("preference_statement"),
            "preference_elaboration": data_dict.get("preference_elaboration"),
            "sample_type": "original_order",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata_1.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata_1.update(
                {
                    "dataset_name": source_info.get(
                        "dataset_name", "nvidia/HelpSteer2"
                    ),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        sample_1 = DataSample(
            unique_id=sample1_id,
            input=data_input,
            output=output_1,
            source="helpsteer2_pairwise",
            task_category="chat_pairwise",
            metadata=metadata_1,
        )
        data_samples.append(sample_1)

        # Create second sample: response_A = response_2, response_B = response_1 (swapped)
        sample2_id = hashlib.md5(f"{str(data_dict)}_sample2".encode()).hexdigest()

        # Determine preferred for second sample (swapped)
        if preferred_in_original == "response_1":
            preferred_2 = "B"  # response_B (response_1) is preferred
        elif preferred_in_original == "response_2":
            preferred_2 = "A"  # response_A (response_2) is preferred
        else:
            preferred_2 = "tie"

        # Create outputs for second sample (swapped)
        output_2 = [
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_2"],
                    label={"response_type": "A"},
                )
            ),
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=data_dict["response_1"],
                    label={"response_type": "B"},
                )
            ),
        ]

        # Build metadata for second sample
        metadata_2 = {
            "raw_data": data_dict,
            "load_strategy": "HelpSteer2PairwiseConverter",
            "response_A": data_dict["response_2"],
            "response_B": data_dict["response_1"],
            "preferred": preferred_2,
            "preference_strength": preference_strength,
            "preference_statement": data_dict.get("preference_statement"),
            "preference_elaboration": data_dict.get("preference_elaboration"),
            "sample_type": "swapped_order",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata_2.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata_2.update(
                {
                    "dataset_name": source_info.get(
                        "dataset_name", "nvidia/HelpSteer2"
                    ),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        sample_2 = DataSample(
            unique_id=sample2_id,
            input=data_input,
            output=output_2,
            source="helpsteer2_pairwise",
            task_category="chat_pairwise",
            metadata=metadata_2,
        )
        data_samples.append(sample_2)

        return data_samples

    except Exception as e:
        logger.error(f"Error creating HelpSteer2 Pairwise DataSample: {str(e)}")
        return None
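
The sign of preference_strength decides which response is preferred, and the swapped sample flips the A/B label accordingly. A compact sketch of just that mapping (illustrative only):

def preferred_labels(preference_strength: int) -> tuple[str, str]:
    # Returns the preferred label for (original order, swapped order).
    if preference_strength > 0:   # response_2 is better
        return "B", "A"
    if preference_strength < 0:   # response_1 is better
        return "A", "B"
    return "tie", "tie"

print(preferred_labels(2))   # ('B', 'A')
print(preferred_labels(-1))  # ('A', 'B')
print(preferred_labels(0))   # ('tie', 'tie')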

HelpSteer2PointwiseConverter

Bases: DataConverter

Unified converter for the HelpSteer2 data format. Can handle data from both local files and the HuggingFace Hub.

Source code in rm_gallery/gallery/data/load/helpsteer2_pointwise.py
@DataConverterRegistry.register("helpsteer2_pointwise")
class HelpSteer2PointwiseConverter(DataConverter):
    """
    Unified converter for HelpSteer2 data format
    Can handle data from both local files and HuggingFace Hub
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> Union[DataSample, List[DataSample]]:
        """Convert HelpSteer2 data to DataSample format"""
        # Generate unique id
        content = str(data_dict)
        unique_id = hashlib.md5(content.encode()).hexdigest()

        try:
            # Create input from prompt
            data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

            # Extract evaluation metrics for label
            label = {
                "helpfulness": data_dict.get("helpfulness"),
                "correctness": data_dict.get("correctness"),
                "coherence": data_dict.get("coherence"),
                "complexity": data_dict.get("complexity"),
                "verbosity": data_dict.get("verbosity"),
            }

            # Create output from response
            data_output = [
                DataOutput(
                    answer=Step(
                        role="assistant", content=data_dict["response"], label=label
                    )
                )
            ]

            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "HelpSteer2Converter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get(
                            "dataset_name", "nvidia/HelpSteer2"
                        ),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="helpsteer2",
                task_category="chat",
                metadata=metadata,
            )

            return [data_sample]

        except Exception as e:
            logger.error(f"Error creating HelpSteer2 DataSample: {str(e)}")
            return None

convert_to_data_sample(data_dict, source_info)

Convert HelpSteer2 data to DataSample format

Source code in rm_gallery/gallery/data/load/helpsteer2_pointwise.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> Union[DataSample, List[DataSample]]:
    """Convert HelpSteer2 data to DataSample format"""
    # Generate unique id
    content = str(data_dict)
    unique_id = hashlib.md5(content.encode()).hexdigest()

    try:
        # Create input from prompt
        data_input = [ChatMessage(role="user", content=data_dict["prompt"])]

        # Extract evaluation metrics for label
        label = {
            "helpfulness": data_dict.get("helpfulness"),
            "correctness": data_dict.get("correctness"),
            "coherence": data_dict.get("coherence"),
            "complexity": data_dict.get("complexity"),
            "verbosity": data_dict.get("verbosity"),
        }

        # Create output from response
        data_output = [
            DataOutput(
                answer=Step(
                    role="assistant", content=data_dict["response"], label=label
                )
            )
        ]

        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "HelpSteer2Converter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get(
                        "dataset_name", "nvidia/HelpSteer2"
                    ),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="helpsteer2",
            task_category="chat",
            metadata=metadata,
        )

        return [data_sample]

    except Exception as e:
        logger.error(f"Error creating HelpSteer2 DataSample: {str(e)}")
        return None
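
For reference, the per-dimension HelpSteer2 ratings end up on the single output's label, so pointwise rewards can read them directly; a small illustrative record:

raw = {
    "prompt": "Explain photosynthesis briefly.",
    "response": "Plants convert light, water and CO2 into sugar and oxygen.",
    "helpfulness": 4, "correctness": 4, "coherence": 4, "complexity": 1, "verbosity": 1,
}
label = {k: raw.get(k) for k in ("helpfulness", "correctness", "coherence", "complexity", "verbosity")}
print(label)  # {'helpfulness': 4, 'correctness': 4, 'coherence': 4, 'complexity': 1, 'verbosity': 1}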

LengthPenaltyReward

Bases: BasePointWiseReward

Text length based penalty for content that is too short or too long.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("length_penalty")
class LengthPenaltyReward(BasePointWiseReward):
    """
    Text length based penalty for content that is too short or too long.
    """

    name: str = Field(default="length_penalty", description="Length penalty reward")
    min_length: int = Field(default=10, description="Minimum length")
    max_length: int = Field(default=1000, description="Maximum length")
    penalty_rate: float = Field(default=0.01, description="Penalty rate")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Apply length penalty.

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing length penalty score
        """
        content = sample.output[0].answer.content
        length = len(content)

        penalty = 0.0
        reason_parts = []

        if length < self.min_length:
            penalty = -(self.min_length - length) * self.penalty_rate
            reason_parts.append(f"Too short: {length} < {self.min_length}")
        elif length > self.max_length:
            penalty = -(length - self.max_length) * self.penalty_rate
            reason_parts.append(f"Too long: {length} > {self.max_length}")
        else:
            reason_parts.append(
                f"Length acceptable: {self.min_length} <= {length} <= {self.max_length}"
            )

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=penalty, reason="; ".join(reason_parts)
                )
            ],
            extra_data={
                "length": length,
                "min_length": self.min_length,
                "max_length": self.max_length,
                "penalty": penalty,
            },
        )
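
Worked example of the penalty arithmetic with the default settings (min_length=10, max_length=1000, penalty_rate=0.01):

min_length, max_length, penalty_rate = 10, 1000, 0.01

def length_penalty(text: str) -> float:
    n = len(text)
    if n < min_length:
        return -(min_length - n) * penalty_rate
    if n > max_length:
        return -(n - max_length) * penalty_rate
    return 0.0

print(length_penalty("hi"))        # -0.08 (8 characters short)
print(length_penalty("x" * 1100))  # -1.0  (100 characters over)
print(length_penalty("x" * 500))   # 0.0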

MathListWiseReward

Bases: BaseHelpfulnessListWiseReward

Math: Solves math problems on open-ended human prompts, ranging from middle school physics and geometry to college-level chemistry, calculus, combinatorics, and more.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/math.py
@RewardRegistry.register("math_listwise_reward")
class MathListWiseReward(BaseHelpfulnessListWiseReward):
    """Math: Solves problems at math, on open-ended human prompts ranging from middle school physics and geometry to college-level chemistry, calculus, combinatorics, and more."""

    name: str = Field(default="math_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

MathVerifyReward

Bases: BasePointWiseReward

Verifies mathematical expressions using the math_verify library, supporting both LaTeX and plain expressions

Source code in rm_gallery/gallery/rm/math/math.py
@RewardRegistry.register("math_verify_reward")
class MathVerifyReward(BasePointWiseReward):
    """
    Verifies mathematical expressions using the math_verify library, supporting both LaTeX and plain expressions
    """

    name: str = Field(default="math_verify", description="Math verification reward")
    timeout_score: float = Field(default=0.0, description="Score to assign on timeout")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Verify mathematical expressions

        Args:
            sample: Data sample containing mathematical content

        Returns:
            RewardResult: Reward result containing verification score
        """
        generated = sample.output[0].answer.content.strip()
        reference = sample.output[0].answer.label.get("reference", "").strip()

        score = 0.0
        reason = "Verification failed or timed out"

        try:
            # Parse the reference (gold) answer
            # Use both LatexExtractionConfig and ExprExtractionConfig for maximum flexibility
            gold_parsed = parse(
                reference,
                extraction_config=[LatexExtractionConfig(), ExprExtractionConfig()],
            )

            # Parse the generated answer
            pred_parsed = parse(
                generated,
                extraction_config=[LatexExtractionConfig(), ExprExtractionConfig()],
            )

            # If both parsing succeeded and we have results
            if gold_parsed and pred_parsed:
                # Use the first parsed result from each
                gold_expr = gold_parsed[0]
                pred_expr = pred_parsed[0]

                # Verify if they match
                if verify(gold_expr, pred_expr):
                    score = 1.0
                    reason = f"Expressions match: ({gold_parsed}, {pred_parsed})"
                else:
                    score = 0.0
                    reason = f"Expressions do not match: ({gold_parsed}, {pred_parsed})"
            else:
                score = 0.0
                reason = f"Parsing failed - gold: {gold_parsed}, pred: {pred_parsed}"

        except Exception as e:
            score = self.timeout_score
            reason = f"Exception occurred: {str(e)}"

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=score,
                    reason=str(reason),
                )
            ],
            extra_data={
                "generated": generated,
                "reference": reference,
                "score": score,
            },
        )
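
Direct use of the underlying math_verify helpers, assuming the package exports parse, verify and the extraction configs at top level (as the imports in this module suggest):

from math_verify import ExprExtractionConfig, LatexExtractionConfig, parse, verify

configs = [LatexExtractionConfig(), ExprExtractionConfig()]
gold = parse(r"\frac{1}{2}", extraction_config=configs)
pred = parse("0.5", extraction_config=configs)

if gold and pred and verify(gold[0], pred[0]):
    print("equivalent")      # 1/2 and 0.5 are treated as the same value
else:
    print("not equivalent")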

NgramRepetitionPenaltyReward

Bases: BasePointWiseReward

Calculate N-gram repetition penalty supporting Chinese processing and multiple penalty strategies.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("ngram_repetition_penalty")
class NgramRepetitionPenaltyReward(BasePointWiseReward):
    """
    Calculate N-gram repetition penalty supporting Chinese processing and multiple penalty strategies.
    """

    name: str = Field(
        default="ngram_repetition_penalty",
        description="N-gram repetition penalty reward",
    )
    n: int = Field(default=3, description="N value for N-gram")

    # Hard threshold penalty parameters
    penalty_threshold: float = Field(
        default=0.3, description="Repetition rate threshold (hard threshold mode)"
    )
    penalty_rate: float = Field(
        default=1.0, description="Penalty multiplier (hard threshold mode)"
    )

    # Soft penalty parameters
    use_soft_penalty: bool = Field(
        default=False, description="Whether to use soft penalty mode"
    )
    max_penalty: float = Field(
        default=-1.0,
        description="Maximum penalty value (soft penalty mode, should be negative)",
    )
    min_scaling: float = Field(
        default=0.0, description="Minimum scaling threshold (soft penalty mode)"
    )

    # Tokenizer parameters
    tokenizer_type: str = Field(
        default="tiktoken",
        description="Tokenizer type: 'tiktoken', 'jieba', or 'simple'",
    )
    encoding_name: str = Field(
        default="cl100k_base",
        description="Tiktoken encoding name (for tiktoken tokenizer)",
    )
    chinese_only: bool = Field(
        default=False,
        description="Whether to keep only Chinese characters (for jieba tokenizer)",
    )

    # Analysis scope parameters
    analyze_scope: str = Field(
        default="full",
        description="Analysis scope: 'full' or 'thought' (thought process only)",
    )

    def __init__(self, **data):
        super().__init__(**data)
        # Initialize tokenizer
        self._tokenizer = get_tokenizer(
            tokenizer_type=self.tokenizer_type,
            encoding_name=self.encoding_name,
            chinese_only=self.chinese_only,
        )

    def _extract_thought_process(self, content: str) -> str:
        """Extract thought process"""
        think_pattern = r"<think>(.*?)</think>"
        matches = re.findall(think_pattern, content, re.DOTALL)
        return " ".join(matches) if matches else ""

    def _generate_ngrams(self, tokens: List[str]) -> List[tuple]:
        """Generate N-grams"""
        if len(tokens) < self.n:
            return []

        # Use unified approach for all tokenizers
        ngrams = []
        for i in range(len(tokens) - self.n + 1):
            ngrams.append(tuple(tokens[i : i + self.n]))
        return ngrams

    def _calculate_penalty(self, repetition_rate: float) -> float:
        """Calculate penalty value"""
        if self.use_soft_penalty:
            # Soft penalty mode
            if self.max_penalty > 0:
                raise ValueError(
                    f"max_penalty {self.max_penalty} should not be positive"
                )

            scaling = repetition_rate
            if scaling < self.min_scaling:
                scaling = 0.0
            elif scaling > self.min_scaling:
                scaling = (scaling - self.min_scaling) / (1 - self.min_scaling)

            return scaling * self.max_penalty
        else:
            # Hard threshold mode (original logic)
            if repetition_rate > self.penalty_threshold:
                return -(repetition_rate - self.penalty_threshold) * self.penalty_rate
            return 0.0

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Calculate N-gram repetition penalty

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing N-gram repetition penalty score
        """
        content = sample.output[0].answer.content

        # Select text based on analysis scope
        if self.analyze_scope == "thought":
            text_to_analyze = self._extract_thought_process(content)
            if not text_to_analyze:
                return RewardResult(
                    name=self.name,
                    details=[
                        RewardDimensionWithScore(
                            name=self.name,
                            score=0.0,
                            reason="No thought process found to analyze",
                        )
                    ],
                    extra_data={
                        "analyze_scope": self.analyze_scope,
                        "text_to_analyze": text_to_analyze,
                    },
                )
        else:
            text_to_analyze = content

        # Tokenization using unified tokenizer
        preprocessed_text = self._tokenizer.preprocess_text(
            text_to_analyze,
            to_lower=(
                self.tokenizer_type != "jieba"
            ),  # Keep case for Chinese tokenization
        )
        tokens = self._tokenizer.tokenize(preprocessed_text)

        if len(tokens) < self.n:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason=f"Text too short for {self.n}-gram analysis",
                    )
                ],
                extra_data={
                    "token_count": len(tokens),
                    "n": self.n,
                    "analyze_scope": self.analyze_scope,
                    "tokenizer_type": self.tokenizer_type,
                },
            )

        # Generate N-grams
        ngrams = self._generate_ngrams(tokens)

        if not ngrams:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason="No ngrams generated",
                    )
                ],
                extra_data={
                    "token_count": len(tokens),
                    "n": self.n,
                    "analyze_scope": self.analyze_scope,
                    "tokenizer_type": self.tokenizer_type,
                },
            )

        # Calculate repetition rate
        ngram_counts = Counter(ngrams)
        total_ngrams = len(ngrams)
        unique_ngrams = len(ngram_counts)
        repetition_rate = (
            1 - (unique_ngrams / total_ngrams) if total_ngrams > 0 else 0.0
        )

        # Calculate penalty
        penalty = self._calculate_penalty(repetition_rate)

        # Build reason description
        penalty_mode = "soft" if self.use_soft_penalty else "hard"

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=penalty,
                    reason=f"{self.n}-gram repetition rate: {repetition_rate:.3f}, penalty: {penalty:.3f} ({penalty_mode} penalty, {self.tokenizer_type} tokenizer, scope: {self.analyze_scope})",
                )
            ],
            extra_data={
                "repetition_rate": repetition_rate,
                "unique_ngrams": unique_ngrams,
                "total_ngrams": total_ngrams,
                "penalty": penalty,
                "most_common_ngrams": ngram_counts.most_common(5),
                "analyze_scope": self.analyze_scope,
                "tokenizer_type": self.tokenizer_type,
                "use_soft_penalty": self.use_soft_penalty,
                "penalty_mode": penalty_mode,
            },
        )
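
A standalone illustration of the repetition rate and the two penalty modes, using a plain whitespace tokenizer in place of the configurable one:

from collections import Counter

n = 3
tokens = "the cat sat on the mat the cat sat on the mat".split()
ngrams = [tuple(tokens[i : i + n]) for i in range(len(tokens) - n + 1)]

repetition_rate = 1 - len(Counter(ngrams)) / len(ngrams)

# Hard threshold mode (penalty_threshold=0.3, penalty_rate=1.0)
hard = -(repetition_rate - 0.3) * 1.0 if repetition_rate > 0.3 else 0.0
# Soft penalty mode (max_penalty=-1.0, min_scaling=0.0)
soft = repetition_rate * -1.0

print(f"rate={repetition_rate:.2f} hard={hard:.2f} soft={soft:.2f}")  # rate=0.40 hard=-0.10 soft=-0.40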

NumberAccuracyReward

Bases: BasePointWiseReward

Check numerical calculation accuracy by comparing numbers in generated vs reference content.

This reward verifies if the numbers in the generated content match the numbers in the reference content within a specified tolerance.

Source code in rm_gallery/gallery/rm/general.py
@RewardRegistry.register("number_accuracy")
class NumberAccuracyReward(BasePointWiseReward):
    """
    Check numerical calculation accuracy by comparing numbers in generated vs reference content.

    This reward verifies if the numbers in the generated content match
    the numbers in the reference content within a specified tolerance.
    """

    name: str = Field(default="number_accuracy", description="Number accuracy reward")
    tolerance: float = Field(default=1e-6, description="Numerical comparison tolerance")

    def _extract_numbers(self, text: str) -> List[float]:
        """Extract numbers from text"""
        # Match integers and floating point numbers
        number_pattern = r"-?\d+\.?\d*"
        numbers = re.findall(number_pattern, text)
        return [float(n) for n in numbers if n]

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check numerical accuracy.

        Args:
            sample: Data sample containing numerical values

        Returns:
            RewardResult: Reward result containing numerical accuracy score
        """
        generated = sample.output[0].answer.content
        reference = sample.output[0].answer.label.get("reference", "")

        generated_numbers = self._extract_numbers(generated)
        reference_numbers = self._extract_numbers(reference)

        if not reference_numbers:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason="No reference numbers to compare",
                    )
                ],
                extra_data={
                    "generated_numbers": generated_numbers,
                    "reference_numbers": reference_numbers,
                },
            )

        if not generated_numbers:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason="No numbers found in generated content",
                    )
                ],
                extra_data={
                    "generated_numbers": generated_numbers,
                    "reference_numbers": reference_numbers,
                },
            )

        # Compare numbers (match in order)
        correct = 0
        total = min(len(generated_numbers), len(reference_numbers))

        for i in range(total):
            if abs(generated_numbers[i] - reference_numbers[i]) <= self.tolerance:
                correct += 1

        accuracy = correct / len(reference_numbers) if reference_numbers else 0.0

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=accuracy,
                    reason=f"Number accuracy: {correct}/{len(reference_numbers)} numbers correct",
                )
            ],
            extra_data={
                "accuracy": accuracy,
                "correct_numbers": correct,
                "total_reference_numbers": len(reference_numbers),
                "generated_numbers": generated_numbers,
                "reference_numbers": reference_numbers,
            },
        )

OpenQAListWiseReward

Bases: BaseHelpfulnessListWiseReward

Open QA: Search for answers across a wide range of text sources. The challenge is to process large amounts of information and understand complex questions.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/open_qa.py
@RewardRegistry.register("open_qa_listwise_reward")
class OpenQAListWiseReward(BaseHelpfulnessListWiseReward):
    """Open QA: Search for answers across a wide range of text sources. The challenge is to process large amounts of information and understand complex questions."""

    name: str = Field(default="open_qa_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

PRMBenchConverter

Bases: DataConverter

Unified converter for Process Reward Model (PRM) data. Handles mathematical reasoning data with step-wise processes.
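
The converter labels each step of the modified process by its 1-based index: indices listed in error_steps are marked as errors, all others as correct. A minimal sketch of that labeling logic with toy data (not the converter itself):

modified_process = ["Expand the bracket", "Combine like terms", "Divide both sides by zero"]
error_steps = {3}  # 1-based indices of the injected error steps

labels = [
    {"step_idx": i + 1, "correctness": "error" if (i + 1) in error_steps else "correct"}
    for i in range(len(modified_process))
]
print(labels)
# [{'step_idx': 1, 'correctness': 'correct'},
#  {'step_idx': 2, 'correctness': 'correct'},
#  {'step_idx': 3, 'correctness': 'error'}]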

Source code in rm_gallery/gallery/data/load/prmbench.py
@DataConverterRegistry.register("prmbench")
class PRMBenchConverter(DataConverter):
    """
    Unified converter for Process Reward Model (PRM) data
    Handles mathematical reasoning data with step-wise processes
    """

    # define as class attribute instead of instance attribute
    DIMENSION_CLASSIFICATION_MAPPING: ClassVar[Dict[str, str]] = {
        "confidence": "confidence",
        "*": None,  # wildcard, means no filtering
    }

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert PRM data to DataSample format

        Expected input format:
        {
            "original_question": "...",
            "modified_question": "...",
            "original_process": ["step1", "step2", ...],
            "modified_process": ["step1", "step2", ...],
            "modified_steps": [5, 6],
            "error_steps": [5, 6],
            "reason": "...",
            "idx": "...",
            "question": "...",
            "classification": "confidence"
        }
        """

        # Generate unique id from idx or question
        unique_id = data_dict.get(
            "idx", hashlib.md5(str(data_dict.get("question", "")).encode()).hexdigest()
        )

        try:
            # Create input from question
            data_input = self._create_prm_input(data_dict)

            # Create outputs from processes
            data_output = self._create_prm_output(data_dict)

            # Build metadata based on source type
            metadata = {
                "classification": data_dict.get("classification"),
                "modified_steps": data_dict.get("modified_steps", []),
                "error_steps": data_dict.get("error_steps", []),
                "reason": data_dict.get("reason"),
                "idx": data_dict.get("idx"),
                "original_process_length": len(data_dict.get("original_process", [])),
                "modified_process_length": len(data_dict.get("modified_process", [])),
                "load_strategy": "PRMBenchConverter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            # Create DataSample object
            data_sample = DataSample(
                unique_id=str(unique_id),
                input=data_input,
                output=data_output,
                source="prmbench",
                task_category=data_dict.get("classification", "reasoning"),
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating DataSample from PRM data: {str(e)}")
            return None

    def _create_prm_input(self, data_dict: Dict[str, Any]) -> list[ChatMessage]:
        """Create DataInput from PRM question"""
        question = data_dict.get("question") or data_dict.get("original_question", "")
        return [ChatMessage(role="user", content=question)]

    def _create_prm_output(self, data_dict: Dict[str, Any]) -> list[DataOutput]:
        """Create DataOutput list from PRM processes"""
        outputs = []

        # Original process output
        if "original_process" in data_dict:
            original_steps = []
            for i, step_content in enumerate(data_dict["original_process"]):
                step = Step(
                    role="assistant",
                    content=step_content,
                    label={"correctness": "correct", "step_idx": i + 1},
                )
                original_steps.append(step)

            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content="\n".join(data_dict["original_process"]),
                        label={"process_type": "original_correct"},
                    ),
                    steps=original_steps,
                )
            )

        # Modified process output (with errors)
        if "modified_process" in data_dict:
            modified_steps = []
            error_steps = set(data_dict.get("error_steps", []))

            for i, step_content in enumerate(data_dict["modified_process"]):
                step_idx = i + 1
                is_correct = step_idx not in error_steps

                step = Step(
                    role="assistant",
                    content=step_content,
                    label={
                        "correctness": "correct" if is_correct else "error",
                        "step_idx": step_idx,
                    },
                )
                modified_steps.append(step)

            # Calculate correctness score based on error ratio
            total_steps = len(data_dict["modified_process"])
            error_count = len(error_steps)

            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content="\n".join(data_dict["modified_process"]),
                        label={
                            "process_type": f"Modified process with {error_count}/{total_steps} error steps"
                        },
                    ),
                    steps=modified_steps,
                )
            )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert PRM data to DataSample format

Expected input format:

{
    "original_question": "...",
    "modified_question": "...",
    "original_process": ["step1", "step2", ...],
    "modified_process": ["step1", "step2", ...],
    "modified_steps": [5, 6],
    "error_steps": [5, 6],
    "reason": "...",
    "idx": "...",
    "question": "...",
    "classification": "confidence"
}

Source code in rm_gallery/gallery/data/load/prmbench.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert PRM data to DataSample format

    Expected input format:
    {
        "original_question": "...",
        "modified_question": "...",
        "original_process": ["step1", "step2", ...],
        "modified_process": ["step1", "step2", ...],
        "modified_steps": [5, 6],
        "error_steps": [5, 6],
        "reason": "...",
        "idx": "...",
        "question": "...",
        "classification": "confidence"
    }
    """

    # Generate unique id from idx or question
    unique_id = data_dict.get(
        "idx", hashlib.md5(str(data_dict.get("question", "")).encode()).hexdigest()
    )

    try:
        # Create input from question
        data_input = self._create_prm_input(data_dict)

        # Create outputs from processes
        data_output = self._create_prm_output(data_dict)

        # Build metadata based on source type
        metadata = {
            "classification": data_dict.get("classification"),
            "modified_steps": data_dict.get("modified_steps", []),
            "error_steps": data_dict.get("error_steps", []),
            "reason": data_dict.get("reason"),
            "idx": data_dict.get("idx"),
            "original_process_length": len(data_dict.get("original_process", [])),
            "modified_process_length": len(data_dict.get("modified_process", [])),
            "load_strategy": "PRMBenchConverter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        # Create DataSample object
        data_sample = DataSample(
            unique_id=str(unique_id),
            input=data_input,
            output=data_output,
            source="prmbench",
            task_category=data_dict.get("classification", "reasoning"),
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating DataSample from PRM data: {str(e)}")
        return None

PatchSimilarityReward

Bases: BasePointWiseReward

Calculate similarity between generated patch and oracle patch using difflib.SequenceMatcher.

This reward measures how similar the generated patch is to the reference patch, providing a similarity score and detailed diff information.
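
The score is simply difflib.SequenceMatcher.ratio() over the two patch strings, with the opcodes kept as diff detail. A quick standalone preview with illustrative patch strings:

import difflib

generated = "-    return a+b\n+    return a + b"
reference = "-    return a+b\n+    return a + b  # add spacing"

matcher = difflib.SequenceMatcher(None, generated, reference)
print(round(matcher.ratio(), 3))  # similarity score in [0, 1]
print(matcher.get_opcodes())      # detailed diff operations, stored in extra_data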

Source code in rm_gallery/gallery/rm/code/code.py
@RewardRegistry.register("code_patch_similarity")
class PatchSimilarityReward(BasePointWiseReward):
    """
    Calculate similarity between generated patch and oracle patch using difflib.SequenceMatcher.

    This reward measures how similar the generated patch is to the reference patch,
    providing a similarity score and detailed diff information.
    """

    name: str = Field(default="patch_similarity", description="Patch similarity reward")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Calculate patch similarity.

        Args:
            sample: Data sample containing generated patch

        Returns:
            RewardResult: Reward result containing similarity score
        """
        generated = sample.output[0].answer.content.strip()
        reference = sample.output[0].answer.label.get("reference", "").strip()

        # Use SequenceMatcher to calculate similarity
        matcher = difflib.SequenceMatcher(None, generated, reference)
        similarity = matcher.ratio()

        # Get detailed diff information
        opcodes = list(matcher.get_opcodes())

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=similarity,
                    reason=f"Patch similarity: {similarity:.3f} based on sequence matching",
                )
            ],
            extra_data={
                "similarity": similarity,
                "generated": generated,
                "reference": reference,
                "opcodes": opcodes,
            },
        )

PreciseIFListWiseReward

Bases: BaseHelpfulnessListWiseReward

Precise Instruction Following: Follows precise instructions, such as ‘Answer without the letter u’.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/precise_if.py
@RewardRegistry.register("precise_if_listwise_reward")
class PreciseIFListWiseReward(BaseHelpfulnessListWiseReward):
    """Precise Instruction Following : Follows precise instructions, such as ‘Answer without the letter u’."""

    name: str = Field(default="precise_if_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

PrincipleListWiseTemplate

Bases: BasePromptTemplate

Template implementation for principle-based list-wise evaluation tasks.

Designed for comparative evaluation scenarios where multiple answers need to be assessed against defined principles to determine the optimal choice.

Attributes:

    best (int): Index of the best-performing answer according to principles.
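
A hedged usage sketch (the import path below is assumed from the source location, and the output-requirement block appended by cls.schema() depends on the installed package; all string values are placeholders):

from rm_gallery.core.reward.template import PrincipleListWiseTemplate

prompt = PrincipleListWiseTemplate.format(
    desc="Pick the most helpful answer.",
    scenario="Open-domain question answering.",
    principles="1. Be factual.\n2. Be concise.",
    examples="",
    query="What causes tides?",
    context="",
    answers=["The moon's gravity.", "Ocean temperatures."],
)
# The prompt interleaves the sections above and ends with the schema-based
# output requirement; the model reply is later read back with
# PrincipleListWiseTemplate.parse(text), which extracts the integer `best` index.
print(prompt)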

Source code in rm_gallery/core/reward/template.py
class PrincipleListWiseTemplate(BasePromptTemplate):
    """
    Template implementation for principle-based list-wise evaluation tasks.

    Designed for comparative evaluation scenarios where multiple answers need
    to be assessed against defined principles to determine the optimal choice.

    Attributes:
        best (int): Index of the best-performing answer according to principles
    """

    best: int = Field(
        default=...,
        description="which answer is the best? just give the number here!!!",
    )

    @classmethod
    def parse(cls, text: str):
        """
        Parses text input containing list-wise evaluation results.

        Converts best answer index from string to integer format
        during template instantiation.

        Args:
            text (str): Input string containing XML-style tagged content

        Returns:
            PrincipleListWiseTemplate: Constructed instance with parsed values
        """
        contents = cls._parse(text)
        contents["best"] = int(contents["best"])
        return cls(**contents)

    @classmethod
    def format(
        cls,
        desc: str,
        scenario: str,
        principles: str,
        examples: str,
        query: str,
        context: str,
        answers: List[str],
        **kwargs,
    ) -> str:
        """
        Formats comparative evaluation components into structured prompt template.

        Combines task description, scenario context, principles, and multiple
        candidate answers into standardized prompt format for list-wise evaluation.

        Args:
            desc (str): Task description text
            scenario (str): Scenario context description
            principles (str): List of relevant principles
            examples (str): Example-based guidance
            query (str): Evaluation query text
            context (str): Additional contextual information
            answers (List[str]): List of candidate answers for comparison
            **kwargs: Additional formatting parameters

        Returns:
            str: Formatted prompt string following template requirements
        """
        answer_str = ""
        for i, answer in enumerate(answers):
            answer_str += f"## Answer {i + 1}\n{answer}\n\n"

        if examples:
            examples = f"# Examples\n{examples}\n"

        if scenario:
            scenario = f"\n# Scenario\n{scenario}\n"

        if context:
            context = f"\n# Context\n{context}\n"

        if principles:
            principles = f"# Principles\n{principles}\n"

        return f"""# Task Description
{desc}
{scenario}

{principles}
{examples}

# Query
{query}
{context}

# Answers
{answer_str}

# Output Requirement
{cls.schema(**kwargs)}
"""

format(desc, scenario, principles, examples, query, context, answers, **kwargs) classmethod

Formats comparative evaluation components into structured prompt template.

Combines task description, scenario context, principles, and multiple candidate answers into standardized prompt format for list-wise evaluation.

Parameters:

    desc (str): Task description text. Required.
    scenario (str): Scenario context description. Required.
    principles (str): List of relevant principles. Required.
    examples (str): Example-based guidance. Required.
    query (str): Evaluation query text. Required.
    context (str): Additional contextual information. Required.
    answers (List[str]): List of candidate answers for comparison. Required.
    **kwargs: Additional formatting parameters. Default: {}.

Returns:

    str: Formatted prompt string following template requirements.

Source code in rm_gallery/core/reward/template.py
    @classmethod
    def format(
        cls,
        desc: str,
        scenario: str,
        principles: str,
        examples: str,
        query: str,
        context: str,
        answers: List[str],
        **kwargs,
    ) -> str:
        """
        Formats comparative evaluation components into structured prompt template.

        Combines task description, scenario context, principles, and multiple
        candidate answers into standardized prompt format for list-wise evaluation.

        Args:
            desc (str): Task description text
            scenario (str): Scenario context description
            principles (str): List of relevant principles
            examples (str): Example-based guidance
            query (str): Evaluation query text
            context (str): Additional contextual information
            answers (List[str]): List of candidate answers for comparison
            **kwargs: Additional formatting parameters

        Returns:
            str: Formatted prompt string following template requirements
        """
        answer_str = ""
        for i, answer in enumerate(answers):
            answer_str += f"## Answer {i + 1}\n{answer}\n\n"

        if examples:
            examples = f"# Examples\n{examples}\n"

        if scenario:
            scenario = f"\n# Scenario\n{scenario}\n"

        if context:
            context = f"\n# Context\n{context}\n"

        if principles:
            principles = f"# Principles\n{principles}\n"

        return f"""# Task Description
{desc}
{scenario}

{principles}
{examples}

# Query
{query}
{context}

# Answers
{answer_str}

# Output Requirement
{cls.schema(**kwargs)}
"""

parse(text) classmethod

Parses text input containing list-wise evaluation results.

Converts best answer index from string to integer format during template instantiation.

Parameters:

    text (str): Input string containing XML-style tagged content. Required.

Returns:

    PrincipleListWiseTemplate: Constructed instance with parsed values.

Source code in rm_gallery/core/reward/template.py
@classmethod
def parse(cls, text: str):
    """
    Parses text input containing list-wise evaluation results.

    Converts best answer index from string to integer format
    during template instantiation.

    Args:
        text (str): Input string containing XML-style tagged content

    Returns:
        PrincipleListWiseTemplate: Constructed instance with parsed values
    """
    contents = cls._parse(text)
    contents["best"] = int(contents["best"])
    return cls(**contents)

PrivacyLeakageReward

Bases: BasePointWiseReward

Privacy information leakage detection for emails, phone numbers, ID cards, credit cards, and IP addresses.

This reward checks for potential privacy leaks in the generated content, including email addresses, phone numbers, ID numbers, credit card numbers, and IP addresses. Applies penalties for each detected leak.
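
The detection is a handful of regular expressions, and the score is simply the leak count times penalty_per_leak (negative by default). A simplified standalone sketch using only the email and phone patterns:

import re

PENALTY_PER_LEAK = -0.5  # default penalty applied per detected item

text = "Contact me at alice@example.com or 555-123-4567."
leaks = [{"type": "email", "value": m}
         for m in re.findall(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", text)]
leaks += [{"type": "phone", "value": m}
          for m in re.findall(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b", text)]

penalty = len(leaks) * PENALTY_PER_LEAK
print(leaks)    # one email and one phone number detected
print(penalty)  # -1.0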

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("privacy_leakage")
class PrivacyLeakageReward(BasePointWiseReward):
    """
    Privacy information leakage detection for emails, phone numbers, ID cards, credit cards, and IP addresses.

    This reward checks for potential privacy leaks in the generated content,
    including email addresses, phone numbers, ID numbers, credit card numbers,
    and IP addresses. Applies penalties for each detected leak.
    """

    name: str = Field(
        default="privacy_leakage", description="Privacy leakage detection reward"
    )
    penalty_per_leak: float = Field(default=-0.5, description="Penalty per leak")

    def _detect_privacy_leaks(self, text: str) -> List[Dict[str, str]]:
        """Detect privacy information leaks"""
        leaks = []

        # Email addresses
        email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
        emails = re.findall(email_pattern, text)
        for email in emails:
            leaks.append({"type": "email", "value": email})

        # Phone numbers (simple pattern)
        phone_pattern = (
            r"\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b"
        )
        phones = re.findall(phone_pattern, text)
        for phone in phones:
            leaks.append({"type": "phone", "value": phone})

        # ID numbers (China)
        id_pattern = r"\b[1-9]\d{5}(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[0-9Xx]\b"
        ids = re.findall(id_pattern, text)
        for id_num in ids:
            leaks.append({"type": "id_card", "value": id_num})

        # Credit card numbers (simple detection)
        credit_card_pattern = r"\b(?:\d{4}[-\s]?){3}\d{4}\b"
        cards = re.findall(credit_card_pattern, text)
        for card in cards:
            leaks.append({"type": "credit_card", "value": card})

        # IP addresses
        ip_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
        ips = re.findall(ip_pattern, text)
        for ip in ips:
            # Exclude common non-sensitive IPs (like localhost)
            if not ip.startswith(("127.", "192.168.", "10.", "172.")):
                leaks.append({"type": "ip_address", "value": ip})

        return leaks

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Detect privacy leaks.

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing privacy leak penalty score
        """
        content = sample.output[0].answer.content

        leaks = self._detect_privacy_leaks(content)
        penalty = len(leaks) * self.penalty_per_leak

        leak_types = {}
        for leak in leaks:
            leak_type = leak["type"]
            if leak_type not in leak_types:
                leak_types[leak_type] = 0
            leak_types[leak_type] += 1

        if leaks:
            reason = f"Privacy leaks detected: {leak_types}, total penalty: {penalty}"
        else:
            reason = "No privacy leaks detected"

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(name=self.name, score=penalty, reason=reason)
            ],
            extra_data={
                "leaks": leaks,
                "leak_types": leak_types,
                "total_leaks": len(leaks),
                "penalty": penalty,
            },
        )

RMBBenchmarkBestOfNConverter

Bases: DataConverter

Unified converter for conversation data with conversation_input, bon_best and loser_list responses
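
A hypothetical input record illustrating the keys the converter reads (all values are placeholders): bon_best becomes the output labeled preference="chosen", and each loser_list entry becomes an output labeled preference="rejected".

record = {
    "bon_uid": "bon-0001",
    "category_path": "chat/helpfulness",
    "conversation_input": [{"role": "user", "content": "Summarize this article."}],
    "bon_best": {"llm_name": "model-a", "answer": "A concise summary..."},
    "loser_list": [
        {"llm_name": "model-b", "answer": "An off-topic reply..."},
        {"llm_name": "model-c", "answer": "A verbose reply..."},
    ],
}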

Source code in rm_gallery/gallery/data/load/rmbbenchmark_bestofn.py
@DataConverterRegistry.register("rmbbenchmark_bestofn")
class RMBBenchmarkBestOfNConverter(DataConverter):
    """
    Unified converter for conversation data with conversation_input, bon_best and loser_list responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # Generate unique id using bon_uid
        if "bon_uid" in data_dict:
            unique_id = str(data_dict["bon_uid"])
        else:
            # Use conversation_input content for generating hash
            conversation_input = data_dict.get("conversation_input", [])
            if (
                conversation_input
                and isinstance(conversation_input, list)
                and len(conversation_input) > 0
            ):
                content = str(conversation_input[0].get("content", ""))
            else:
                content = ""
            unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from conversation_input
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from bon_best and loser_list
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RMBBenchmarkBestOfNConverter",
                "category_path": data_dict.get("category_path"),
                "bon_uid": data_dict.get("bon_uid"),
                "bon_best_model": data_dict.get("bon_best", {}).get("llm_name")
                if data_dict.get("bon_best")
                else None,
                "loser_models": [
                    item.get("llm_name")
                    for item in data_dict.get("loser_list", [])
                    if isinstance(item, dict)
                ],
                "num_losers": len(data_dict.get("loser_list", [])),
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation_input"""
        conversation_input = data_dict.get("conversation_input", [])
        if isinstance(conversation_input, list):
            history = []
            for message in conversation_input:
                if isinstance(message, dict):
                    role = message.get("role", "user")
                    content = message.get("content", "")
                    history.append(ChatMessage(role=role, content=content))
                else:
                    history.append(ChatMessage(role="user", content=str(message)))
            return history
        else:
            return [ChatMessage(role="user", content=str(conversation_input))]

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from bon_best and loser_list"""
        outputs = []

        # Handle bon_best
        if "bon_best" in data_dict:
            bon_best = data_dict["bon_best"]
            if isinstance(bon_best, dict):
                answer_content = bon_best.get("answer", "")
                llm_name = bon_best.get("llm_name", "unknown")
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(answer_content),
                            label={
                                "preference": "chosen",
                                "model": llm_name,
                                "type": "bon_best",
                            },
                        ),
                    )
                )

        # Handle loser_list
        if "loser_list" in data_dict:
            loser_list = data_dict["loser_list"]
            if isinstance(loser_list, list):
                for loser in loser_list:
                    if isinstance(loser, dict):
                        answer_content = loser.get("answer", "")
                        llm_name = loser.get("llm_name", "unknown")
                        outputs.append(
                            DataOutput(
                                answer=Step(
                                    role="assistant",
                                    content=str(answer_content),
                                    label={
                                        "preference": "rejected",
                                        "model": llm_name,
                                        "type": "loser",
                                    },
                                ),
                            )
                        )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rmbbenchmark_bestofn.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # Generate unique id using bon_uid
    if "bon_uid" in data_dict:
        unique_id = str(data_dict["bon_uid"])
    else:
        # Use conversation_input content for generating hash
        conversation_input = data_dict.get("conversation_input", [])
        if (
            conversation_input
            and isinstance(conversation_input, list)
            and len(conversation_input) > 0
        ):
            content = str(conversation_input[0].get("content", ""))
        else:
            content = ""
        unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from conversation_input
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from bon_best and loser_list
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RMBBenchmarkBestOfNConverter",
            "category_path": data_dict.get("category_path"),
            "bon_uid": data_dict.get("bon_uid"),
            "bon_best_model": data_dict.get("bon_best", {}).get("llm_name")
            if data_dict.get("bon_best")
            else None,
            "loser_models": [
                item.get("llm_name")
                for item in data_dict.get("loser_list", [])
                if isinstance(item, dict)
            ],
            "num_losers": len(data_dict.get("loser_list", [])),
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None

RMBBenchmarkPairwiseConverter

Bases: DataConverter

Unified converter for conversation data with conversation_input, chosen and reject responses
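
A hypothetical input record (placeholder values); the chosen entry is labeled preference="chosen" and the reject entry preference="rejected":

record = {
    "pair_uid": "pair-0001",
    "category_path": "chat/helpfulness",
    "conversation_input": [{"role": "user", "content": "Explain recursion briefly."}],
    "chosen": {"llm_name": "model-a", "answer": "A function that calls itself..."},
    "reject": {"llm_name": "model-b", "answer": "Recursion means repeating loops..."},
}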

Source code in rm_gallery/gallery/data/load/rmbbenchmark_pairwise.py
@DataConverterRegistry.register("rmbbenchmark_pairwise")
class RMBBenchmarkPairwiseConverter(DataConverter):
    """
    Unified converter for conversation data with conversation_input, chosen and reject responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # Generate unique id using pair_uid
        if "pair_uid" in data_dict:
            unique_id = str(data_dict["pair_uid"])
        else:
            # Use conversation_input content for generating hash
            conversation_input = data_dict.get("conversation_input", [])
            if (
                conversation_input
                and isinstance(conversation_input, list)
                and len(conversation_input) > 0
            ):
                content = str(conversation_input[0].get("content", ""))
            else:
                content = ""
            unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from conversation_input
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from chosen and reject
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RMBBenchmarkPairwiseConverter",
                "category_path": data_dict.get("category_path"),
                "pair_uid": data_dict.get("pair_uid"),
                "chosen_model": data_dict.get("chosen", {}).get("llm_name")
                if data_dict.get("chosen")
                else None,
                "reject_model": data_dict.get("reject", {}).get("llm_name")
                if data_dict.get("reject")
                else None,
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation_input"""
        conversation_input = data_dict.get("conversation_input", [])
        if isinstance(conversation_input, list):
            history = []
            for message in conversation_input:
                if isinstance(message, dict):
                    role = message.get("role", "user")
                    content = message.get("content", "")
                    history.append(ChatMessage(role=role, content=content))
                else:
                    history.append(ChatMessage(role="user", content=str(message)))
            return history
        else:
            return [ChatMessage(role="user", content=str(conversation_input))]

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from chosen and reject"""
        outputs = []

        # Handle chosen
        if "chosen" in data_dict:
            chosen = data_dict["chosen"]
            if isinstance(chosen, dict):
                answer_content = chosen.get("answer", "")
                llm_name = chosen.get("llm_name", "unknown")
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(answer_content),
                            label={
                                "preference": "chosen",
                                "model": llm_name,
                                "type": "chosen",
                            },
                        ),
                    )
                )

        # Handle reject
        if "reject" in data_dict:
            reject = data_dict["reject"]
            if isinstance(reject, dict):
                answer_content = reject.get("answer", "")
                llm_name = reject.get("llm_name", "unknown")
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(answer_content),
                            label={
                                "preference": "rejected",
                                "model": llm_name,
                                "type": "reject",
                            },
                        ),
                    )
                )

        return outputs

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rmbbenchmark_pairwise.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # Generate unique id using pair_uid
    if "pair_uid" in data_dict:
        unique_id = str(data_dict["pair_uid"])
    else:
        # Use conversation_input content for generating hash
        conversation_input = data_dict.get("conversation_input", [])
        if (
            conversation_input
            and isinstance(conversation_input, list)
            and len(conversation_input) > 0
        ):
            content = str(conversation_input[0].get("content", ""))
        else:
            content = ""
        unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from conversation_input
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from chosen and reject
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RMBBenchmarkPairwiseConverter",
            "category_path": data_dict.get("category_path"),
            "pair_uid": data_dict.get("pair_uid"),
            "chosen_model": data_dict.get("chosen", {}).get("llm_name")
            if data_dict.get("chosen")
            else None,
            "reject_model": data_dict.get("reject", {}).get("llm_name")
            if data_dict.get("reject")
            else None,
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None

ReasoningFormatReward

Bases: BasePointWiseReward

Check format reward for thinking format and answer format with proper tags.

This reward verifies if the generated content follows the required format with proper <think> and <answer> tags.
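
The check is two regular-expression searches over the answer content; a minimal standalone sketch of the same logic with an illustrative string:

import re

content = "<think>Work through the steps.</think><answer>42</answer>"

has_think = bool(re.search(r"<think>.*?</think>", content, re.DOTALL))
has_answer = bool(re.search(r"<answer>.*?</answer>", content, re.DOTALL))
reward = 1.0 if has_think and has_answer else 0.0
print(reward)  # 1.0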

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("reasoning_format")
class ReasoningFormatReward(BasePointWiseReward):
    """
    Check format reward for thinking format and answer format with proper tags.

    This reward verifies if the generated content follows the required format
    with proper <think> and <answer> tags.
    """

    name: str = Field(default="format_reward", description="Reasoning Format reward")
    think_token: str = Field(default="think", description="Think tag name")
    answer_token: str = Field(default="answer", description="Answer tag name")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check format and calculate reward.

        Args:
            sample: Data sample containing generated content

        Returns:
            RewardResult: Reward result containing format score
        """
        content = sample.output[0].answer.content

        # Check thinking format tags
        think_pattern = f"<{self.think_token}>.*?</{self.think_token}>"
        has_think_tag = bool(re.search(think_pattern, content, re.DOTALL))

        # Check answer format tags
        answer_pattern = f"<{self.answer_token}>.*?</{self.answer_token}>"
        has_answer_tag = bool(re.search(answer_pattern, content, re.DOTALL))

        # Calculate reward
        reward = 1.0 if has_think_tag and has_answer_tag else 0.0
        reasons = []

        if not has_think_tag:
            reasons.append(f"Missing <{self.think_token}></{self.think_token}> tags")

        if not has_answer_tag:
            reasons.append(f"Missing <{self.answer_token}></{self.answer_token}> tags")

        if reward == 1.0:
            reasons.append("All format requirements met")

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=reward, reason="; ".join(reasons)
                )
            ],
            extra_data={
                "has_think_tag": has_think_tag,
                "has_answer_tag": has_answer_tag,
                "total_reward": reward,
                "think_token": self.think_token,
                "answer_token": self.answer_token,
            },
        )

ReasoningListWiseReward

Bases: BaseHelpfulnessListWiseReward

Reasoning: Involves processing and analyzing text to draw inferences, make predictions, or solve problems, requiring an understanding of underlying concepts and relationships within the text.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/reasoning.py
@RewardRegistry.register("reasoning_listwise_reward")
class ReasoningListWiseReward(BaseHelpfulnessListWiseReward):
    """Reasoning: Involves processing and analyzing text to draw inferences, make predictions, or solve problems, requiring an understanding of underlying concepts and relationships within the text."""

    name: str = Field(default="reasoning_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

ReasoningToolCallFormatReward

Bases: BasePointWiseReward

Check tool call format including think, answer and tool_call tags with JSON validation.

This reward verifies if the generated content follows the required format with proper <think>, <answer> and <tool_call> tags, including JSON validation for tool calls.
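
In the tool-call branch, every <tool_call> block must parse as JSON containing "name" and "arguments" fields. A minimal standalone sketch of that validation step with an illustrative message:

import json
import re

content = (
    "<think>Need the weather, so call the tool.</think>"
    '<tool_call>{"name": "get_weather", "arguments": {"city": "Paris"}}</tool_call>'
)

valid = True
for block in re.findall(r"<tool_call>(.*?)</tool_call>", content, re.DOTALL):
    try:
        call = json.loads(block.strip())
        if "name" not in call or "arguments" not in call:
            valid = False
    except json.JSONDecodeError:
        valid = False
print(valid)  # True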

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("reasoning_tool_call_format")
class ReasoningToolCallFormatReward(BasePointWiseReward):
    """
    Check tool call format including think, answer and tool_call tags with JSON validation.

    This reward verifies if the generated content follows the required format
    with proper <think>, <answer> and <tool_call> tags, including JSON validation
    for tool calls.
    """

    name: str = Field(
        default="tool_call_format", description="Reasoning tool call format reward"
    )

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check tool call format and calculate reward.

        Args:
            sample: Data sample containing generated content

        Returns:
            RewardResult: Reward result containing format score
        """
        content = sample.output[0].answer.content

        # Extract tag contents
        think_pattern = r"<think>(.*?)</think>"
        answer_pattern = r"<answer>(.*?)</answer>"
        tool_call_pattern = r"<tool_call>(.*?)</tool_call>"

        think_matches = re.search(think_pattern, content, re.DOTALL)
        answer_matches = re.search(answer_pattern, content, re.DOTALL)
        tool_call_matches = re.findall(tool_call_pattern, content, re.DOTALL)

        has_think_tag = think_matches is not None
        has_answer_tag = answer_matches is not None
        has_tool_call_tag = len(tool_call_matches) > 0

        valid_format = False
        valid_tool_call_json = False
        reasons = []

        if has_think_tag:
            # Case 1: <think></think> + <answer></answer>
            if has_answer_tag and not has_tool_call_tag:
                # Check overall format
                format_pattern = r"^\s*<think>.*?</think>\s*<answer>.*?</answer>\s*$"
                valid_format = bool(re.match(format_pattern, content, re.DOTALL))

                # Check tag occurrence count
                if valid_format:
                    valid_format = (
                        content.count("<think>") == 1
                        and content.count("</think>") == 1
                        and content.count("<answer>") == 1
                        and content.count("</answer>") == 1
                    )

                if valid_format:
                    reasons.append("Valid <think></think> + <answer></answer> format")
                else:
                    reasons.append("Invalid <think></think> + <answer></answer> format")

            # Case 2: <think></think> + <tool_call></tool_call>
            elif has_tool_call_tag and not has_answer_tag:
                # Check overall format
                format_pattern = (
                    r"^\s*<think>.*?</think>\s*(?:<tool_call>.*?</tool_call>\s*)+$"
                )
                valid_format = bool(re.match(format_pattern, content, re.DOTALL))

                # Check <think> tag occurrence count
                if valid_format:
                    valid_format = (
                        content.count("<think>") == 1 and content.count("</think>") == 1
                    )

                # Check if <tool_call> and </tool_call> tags appear in pairs
                if valid_format:
                    if content.count("<tool_call>") != content.count("</tool_call>"):
                        valid_format = False

                # Check for consecutive duplicate tags
                if valid_format:
                    if re.search(r"</tool_call>\s*</tool_call>", content) or re.search(
                        r"<tool_call>\s*<tool_call>", content
                    ):
                        valid_format = False

                # Check tool_call JSON format
                valid_tool_call_json = True
                tool_calls = []
                if valid_format:
                    for tool_call_content in tool_call_matches:
                        try:
                            tool_call_json = json.loads(tool_call_content.strip())
                            # Check if JSON contains required fields
                            if not (
                                "name" in tool_call_json
                                and "arguments" in tool_call_json
                            ):
                                valid_tool_call_json = False
                                break
                            tool_calls.append(
                                {
                                    "function": {
                                        "name": tool_call_json["name"],
                                        "arguments": json.dumps(
                                            tool_call_json["arguments"],
                                            ensure_ascii=False,
                                        ),
                                    }
                                }
                            )
                        except json.JSONDecodeError:
                            valid_tool_call_json = False
                            break

                valid_format = valid_format and valid_tool_call_json

                if valid_format:
                    reasons.append(
                        "Valid <think></think> + <tool_call></tool_call> format with valid JSON"
                    )
                else:
                    if not valid_tool_call_json:
                        reasons.append("Invalid JSON format in <tool_call> tags")
                    else:
                        reasons.append(
                            "Invalid <think></think> + <tool_call></tool_call> format"
                        )
            else:
                # Has both answer and tool_call, or neither
                reasons.append(
                    "Invalid combination: should have either <answer> or <tool_call> tags, not both or neither"
                )
        else:
            reasons.append("Missing <think></think> tags")

        # Calculate reward score
        reward = 1.0 if valid_format else 0.0

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=reward, reason="; ".join(reasons)
                )
            ],
            extra_data={
                "has_think_tag": has_think_tag,
                "has_answer_tag": has_answer_tag,
                "has_tool_call_tag": has_tool_call_tag,
                "valid_format": valid_format,
                "valid_tool_call_json": valid_tool_call_json,
                "tool_call_count": len(tool_call_matches),
                "reward": reward,
            },
        )
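
The strings below give a minimal sketch of what this check accepts and rejects. The overall pattern is copied from the `<think>` + `<tool_call>` branch above; the example outputs and the `get_weather` tool name are purely illustrative and not taken from the library.

```python
import re

# Illustrative model outputs (hypothetical, not from the library's data).
valid_output = (
    "<think>I need the current weather, so I should call a tool.</think>\n"
    '<tool_call>{"name": "get_weather", "arguments": {"city": "Beijing"}}</tool_call>'
)
invalid_output = '<tool_call>{"name": "get_weather", "arguments": {}}</tool_call>'  # missing <think> block

# Same overall pattern as the <think> + <tool_call> branch above.
pattern = r"^\s*<think>.*?</think>\s*(?:<tool_call>.*?</tool_call>\s*)+$"
print(bool(re.match(pattern, valid_output, re.DOTALL)))    # True  -> reward 1.0
print(bool(re.match(pattern, invalid_output, re.DOTALL)))  # False -> reward 0.0
```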

RewardBench2AnnotationTemplate

Bases: BaseAnnotationTemplate

Reward Bench 2 annotation template implementation for 4-way comparison

Source code in rm_gallery/gallery/data/annotation/rewardbench2.py
@AnnotationTemplateRegistry.register("rewardbench2")
class RewardBench2AnnotationTemplate(BaseAnnotationTemplate):
    """Reward Bench 2 annotation template implementation for 4-way comparison"""

    def __init__(self, name: str):
        super().__init__(name)

    @property
    def label_config(self) -> str:
        """Return the Label Studio XML configuration for reward bench 2 evaluation (4-way comparison)"""
        return """
<View>
  <!-- Sample Information -->
  <Header value="Sample Information"/>
  <Text name="unique_id" value="$unique_id" title="Unique ID"/>
  <Text name="source" value="$source" title="Source"/>
  <Text name="task_category" value="$task_category" title="task_category"/>
  <Text name="created_at" value="$created_at" title="Created At"/>
  <Text name="answer_count" value="$answer_count" title="Number of Answers"/>

  <!-- Input Messages -->
  <Header value="Input Messages"/>
  <Paragraphs name="input_dialogue" value="$input_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Output Responses -->
  <Header value="Output Responses"/>
  <Paragraphs name="output_dialogue" value="$output_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Step 1: Best Answer Selection -->
  <View>
    <Text name="step1_title" value="Step 1: Best Answer Selection" />
    <Text name="step1_desc1" value="Please select the best answer among the 4 options" />
    <Choices name="best_answer" toName="output_dialogue" choice="single" title="🏆 Best Answer">
      <Choice value="answer_1" showIf="$answer_count>=1"/>
      <Choice value="answer_2" showIf="$answer_count>=2"/>
      <Choice value="answer_3" showIf="$answer_count>=3"/>
      <Choice value="answer_4" showIf="$answer_count>=4"/>
      <Choice value="all_equal" showIf="$answer_count=4"/>
    </Choices>
  </View>

  <!-- Step 2: Answer Ranking -->
  <View>
    <Text name="step2_spacer" value="" />
    <Text name="step2_title" value="Step 2: Answer Ranking" />
    <Text name="step2_desc" value="Please rank all answers from best to worst (1=best, 4=worst)" />

    <Text name="answer1_rank_label" value="📝 Answer 1 Rank:" />
    <Choices name="answer1_rank" toName="output_dialogue" choice="single" title="Answer 1 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>

    <Text name="answer2_rank_label" value="📝 Answer 2 Rank:" />
    <Choices name="answer2_rank" toName="output_dialogue" choice="single" title="Answer 2 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>

    <Text name="answer3_rank_label" value="📝 Answer 3 Rank:" />
    <Choices name="answer3_rank" toName="output_dialogue" choice="single" title="Answer 3 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>

    <Text name="answer4_rank_label" value="📝 Answer 4 Rank:" />
    <Choices name="answer4_rank" toName="output_dialogue" choice="single" title="Answer 4 Rank">
      <Choice value="1"/>
      <Choice value="2"/>
      <Choice value="3"/>
      <Choice value="4"/>
    </Choices>
  </View>

  <!-- Step 3: Answer Rating -->
  <View>
    <Text name="step3_spacer" value="" />
    <Text name="step3_title" value="Step 3: Answer Rating" />
    <Text name="step3_desc" value="Please rate the quality of each answer for the $task_category task_category (1-5 stars)" />

    <Text name="answer1_rating_label" value="📝 Answer 1 Rating:" />
    <Rating name="answer1_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 1 Quality Rating"/>

    <Text name="answer2_rating_label" value="📝 Answer 2 Rating:" />
    <Rating name="answer2_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 2 Quality Rating"/>

    <Text name="answer3_rating_label" value="📝 Answer 3 Rating:" />
    <Rating name="answer3_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 3 Quality Rating"/>

    <Text name="answer4_rating_label" value="📝 Answer 4 Rating:" />
    <Rating name="answer4_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 4 Quality Rating"/>

    <Text name="rating_criteria" value="💡 Rating Criteria: 5 stars = excellent, 4 stars = good, 3 stars = average, 2 stars = poor, 1 star = very poor" />
  </View>

  <!-- Step 4: Additional Comments -->
  <View>
    <Text name="step4_spacer" value="" />
    <Text name="step4_title" value="Step 4: Additional Comments" />
    <Text name="step4_desc" value="Please provide any additional comments or feedback" />
    <TextArea name="additional_comments" toName="output_dialogue" placeholder="[x] The x-th answer has the following issues..." title="Additional Comments"/>
  </View>

</View>
"""

    def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process annotation data specific to reward bench 2 evaluation (4-way comparison)

        Args:
            annotation_data: Generic annotation data with ratings, choices, text_areas

        Returns:
            Processed data structured for reward bench 2 evaluation
        """
        processed = {
            "best_answer": None,
            "answer_rankings": {},
            "answer_ratings": {},
            "ranking_order": [],
            "quality_comparison": {},
            "comments": "",
            "preference": None,
        }

        # Extract best answer selection (Step 1)
        if "best_answer" in annotation_data.get("choices", {}):
            best_answer_choices = annotation_data["choices"]["best_answer"]["choices"]
            if best_answer_choices:
                processed["best_answer"] = best_answer_choices[0]
                processed["preference"] = best_answer_choices[0]

        # Extract answer rankings (Step 2)
        choices = annotation_data.get("choices", {})
        rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]

        for i, rank_key in enumerate(rank_keys, 1):
            if rank_key in choices:
                rank_choices = choices[rank_key]["choices"]
                if rank_choices:
                    processed["answer_rankings"][f"answer_{i}"] = int(rank_choices[0])

        # Create ranking order based on ranks
        if processed["answer_rankings"]:
            # Sort answers by their rank (1=best, 4=worst)
            sorted_answers = sorted(
                processed["answer_rankings"].items(), key=lambda x: x[1]
            )
            processed["ranking_order"] = [answer for answer, rank in sorted_answers]

        # Extract answer ratings (Step 3)
        ratings = annotation_data.get("ratings", {})
        rating_keys = [
            "answer1_rating",
            "answer2_rating",
            "answer3_rating",
            "answer4_rating",
        ]

        for i, rating_key in enumerate(rating_keys, 1):
            if rating_key in ratings:
                processed["answer_ratings"][f"answer_{i}"] = ratings[rating_key][
                    "rating"
                ]

        # Calculate quality comparison
        if processed["answer_ratings"]:
            # Find the highest rated answer
            best_rated_answer = max(
                processed["answer_ratings"].items(), key=lambda x: x[1]
            )

            # Calculate average rating
            avg_rating = sum(processed["answer_ratings"].values()) / len(
                processed["answer_ratings"]
            )

            processed["quality_comparison"] = {
                "best_rated_answer": best_rated_answer[0],
                "best_rating": best_rated_answer[1],
                "average_rating": avg_rating,
                "rating_spread": max(processed["answer_ratings"].values())
                - min(processed["answer_ratings"].values()),
                "consistency_check": {
                    "best_answer_matches_best_rating": processed["best_answer"]
                    == best_rated_answer[0],
                    "best_answer_matches_rank_1": processed["best_answer"]
                    in [
                        answer
                        for answer, rank in processed["answer_rankings"].items()
                        if rank == 1
                    ]
                    if processed["answer_rankings"]
                    else False,
                },
            }

        # Extract additional comments (Step 4)
        if "additional_comments" in annotation_data.get("text_areas", {}):
            processed["comments"] = annotation_data["text_areas"][
                "additional_comments"
            ]["text"]

        return processed

    def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
        """
        Validate annotation data for reward bench 2 evaluation

        Args:
            annotation_data: Annotation data to validate

        Returns:
            True if valid, False otherwise
        """
        # Check if required fields are present
        required_sections = ["choices", "ratings"]
        for section in required_sections:
            if section not in annotation_data:
                return False

        # Check if best answer is selected
        if "best_answer" not in annotation_data.get("choices", {}):
            return False

        # Check if at least some rankings are provided
        choices = annotation_data.get("choices", {})
        rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]
        if not any(key in choices for key in rank_keys):
            return False

        # Check if at least some ratings are provided
        ratings = annotation_data.get("ratings", {})
        rating_keys = [
            "answer1_rating",
            "answer2_rating",
            "answer3_rating",
            "answer4_rating",
        ]
        if not any(key in ratings for key in rating_keys):
            return False

        # Validate ranking consistency (each rank should be unique)
        provided_ranks = []
        for rank_key in rank_keys:
            if rank_key in choices:
                rank_choices = choices[rank_key]["choices"]
                if rank_choices:
                    rank = int(rank_choices[0])
                    if rank in provided_ranks:
                        return False  # Duplicate rank
                    provided_ranks.append(rank)

        return True
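
For a concrete picture of the expected payload, here is a hypothetical end-to-end call of `validate_annotation_data` and `process_annotations`; the `choices`/`ratings`/`text_areas` structure mirrors what the methods above read, all concrete values are made up, and the import path is assumed from the source listing.

```python
from rm_gallery.gallery.data.annotation.rewardbench2 import (  # import path assumed from the source listing
    RewardBench2AnnotationTemplate,
)

# Made-up annotation payload in the generic choices/ratings/text_areas structure.
annotation_data = {
    "choices": {
        "best_answer": {"choices": ["answer_2"]},
        "answer1_rank": {"choices": ["2"]},
        "answer2_rank": {"choices": ["1"]},
        "answer3_rank": {"choices": ["4"]},
        "answer4_rank": {"choices": ["3"]},
    },
    "ratings": {
        "answer1_rating": {"rating": 4},
        "answer2_rating": {"rating": 5},
        "answer3_rating": {"rating": 2},
        "answer4_rating": {"rating": 3},
    },
    "text_areas": {"additional_comments": {"text": "[3] The third answer is off-topic."}},
}

template = RewardBench2AnnotationTemplate(name="rewardbench2")
assert template.validate_annotation_data(annotation_data)
result = template.process_annotations(annotation_data)
# result["preference"]    -> "answer_2"
# result["ranking_order"] -> ["answer_2", "answer_1", "answer_4", "answer_3"]
# result["quality_comparison"]["best_answer_matches_best_rating"] -> True
```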

label_config property

Return the Label Studio XML configuration for reward bench 2 evaluation (4-way comparison)

process_annotations(annotation_data)

Process annotation data specific to reward bench 2 evaluation (4-way comparison)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| annotation_data | Dict[str, Any] | Generic annotation data with ratings, choices, text_areas | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Processed data structured for reward bench 2 evaluation |

Source code in rm_gallery/gallery/data/annotation/rewardbench2.py
def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process annotation data specific to reward bench 2 evaluation (4-way comparison)

    Args:
        annotation_data: Generic annotation data with ratings, choices, text_areas

    Returns:
        Processed data structured for reward bench 2 evaluation
    """
    processed = {
        "best_answer": None,
        "answer_rankings": {},
        "answer_ratings": {},
        "ranking_order": [],
        "quality_comparison": {},
        "comments": "",
        "preference": None,
    }

    # Extract best answer selection (Step 1)
    if "best_answer" in annotation_data.get("choices", {}):
        best_answer_choices = annotation_data["choices"]["best_answer"]["choices"]
        if best_answer_choices:
            processed["best_answer"] = best_answer_choices[0]
            processed["preference"] = best_answer_choices[0]

    # Extract answer rankings (Step 2)
    choices = annotation_data.get("choices", {})
    rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]

    for i, rank_key in enumerate(rank_keys, 1):
        if rank_key in choices:
            rank_choices = choices[rank_key]["choices"]
            if rank_choices:
                processed["answer_rankings"][f"answer_{i}"] = int(rank_choices[0])

    # Create ranking order based on ranks
    if processed["answer_rankings"]:
        # Sort answers by their rank (1=best, 4=worst)
        sorted_answers = sorted(
            processed["answer_rankings"].items(), key=lambda x: x[1]
        )
        processed["ranking_order"] = [answer for answer, rank in sorted_answers]

    # Extract answer ratings (Step 3)
    ratings = annotation_data.get("ratings", {})
    rating_keys = [
        "answer1_rating",
        "answer2_rating",
        "answer3_rating",
        "answer4_rating",
    ]

    for i, rating_key in enumerate(rating_keys, 1):
        if rating_key in ratings:
            processed["answer_ratings"][f"answer_{i}"] = ratings[rating_key][
                "rating"
            ]

    # Calculate quality comparison
    if processed["answer_ratings"]:
        # Find the highest rated answer
        best_rated_answer = max(
            processed["answer_ratings"].items(), key=lambda x: x[1]
        )

        # Calculate average rating
        avg_rating = sum(processed["answer_ratings"].values()) / len(
            processed["answer_ratings"]
        )

        processed["quality_comparison"] = {
            "best_rated_answer": best_rated_answer[0],
            "best_rating": best_rated_answer[1],
            "average_rating": avg_rating,
            "rating_spread": max(processed["answer_ratings"].values())
            - min(processed["answer_ratings"].values()),
            "consistency_check": {
                "best_answer_matches_best_rating": processed["best_answer"]
                == best_rated_answer[0],
                "best_answer_matches_rank_1": processed["best_answer"]
                in [
                    answer
                    for answer, rank in processed["answer_rankings"].items()
                    if rank == 1
                ]
                if processed["answer_rankings"]
                else False,
            },
        }

    # Extract additional comments (Step 4)
    if "additional_comments" in annotation_data.get("text_areas", {}):
        processed["comments"] = annotation_data["text_areas"][
            "additional_comments"
        ]["text"]

    return processed

validate_annotation_data(annotation_data)

Validate annotation data for reward bench 2 evaluation

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| annotation_data | Dict[str, Any] | Annotation data to validate | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if valid, False otherwise |

Source code in rm_gallery/gallery/data/annotation/rewardbench2.py
def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
    """
    Validate annotation data for reward bench 2 evaluation

    Args:
        annotation_data: Annotation data to validate

    Returns:
        True if valid, False otherwise
    """
    # Check if required fields are present
    required_sections = ["choices", "ratings"]
    for section in required_sections:
        if section not in annotation_data:
            return False

    # Check if best answer is selected
    if "best_answer" not in annotation_data.get("choices", {}):
        return False

    # Check if at least some rankings are provided
    choices = annotation_data.get("choices", {})
    rank_keys = ["answer1_rank", "answer2_rank", "answer3_rank", "answer4_rank"]
    if not any(key in choices for key in rank_keys):
        return False

    # Check if at least some ratings are provided
    ratings = annotation_data.get("ratings", {})
    rating_keys = [
        "answer1_rating",
        "answer2_rating",
        "answer3_rating",
        "answer4_rating",
    ]
    if not any(key in ratings for key in rating_keys):
        return False

    # Validate ranking consistency (each rank should be unique)
    provided_ranks = []
    for rank_key in rank_keys:
        if rank_key in choices:
            rank_choices = choices[rank_key]["choices"]
            if rank_choices:
                rank = int(rank_choices[0])
                if rank in provided_ranks:
                    return False  # Duplicate rank
                provided_ranks.append(rank)

    return True

RewardBench2Converter

Bases: DataConverter

Unified converter for conversation data with prompt, chosen and rejected responses (version 2)

Source code in rm_gallery/gallery/data/load/rewardbench2.py
@DataConverterRegistry.register("rewardbench2")
class RewardBench2Converter(DataConverter):
    """
    Unified converter for conversation data with prompt, chosen and rejected responses (version 2)
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # generate unique id using id field if available, otherwise use prompt content
        if "id" in data_dict:
            unique_id = str(data_dict["id"])
        else:
            content = str(data_dict.get("prompt", ""))
            unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from prompt
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from chosen/rejected responses
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RewardBench2Converter",
                "subset": data_dict.get("subset"),
                "num_correct": data_dict.get("num_correct"),
                "num_rejected": data_dict.get("num_rejected"),
                "total_completions": data_dict.get("total_completions"),
                "models": data_dict.get("models"),
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench2",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation prompt"""
        prompt = data_dict.get("prompt", "")

        # Since prompt is now a string, create a single user message
        if isinstance(prompt, str):
            return [ChatMessage(role="user", content=prompt)]
        else:
            # Fallback for backwards compatibility
            history = []
            if isinstance(prompt, list):
                for turn in prompt:
                    if isinstance(turn, dict):
                        role = turn.get("role", "user")
                        content = turn.get("content", str(turn))
                        history.append(ChatMessage(role=role, content=content))
                    else:
                        history.append(ChatMessage(role="user", content=str(turn)))
            else:
                history.append(ChatMessage(role="user", content=str(prompt)))

            return history

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from conversation responses"""
        outputs = []

        # Handle chosen responses (now a list of strings)
        chosen_responses = data_dict.get("chosen", [])
        if isinstance(chosen_responses, list):
            for chosen_content in chosen_responses:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(chosen_content),
                            label={"preference": "chosen"},
                        ),
                    )
                )
        elif chosen_responses:  # Single chosen response (backwards compatibility)
            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=str(chosen_responses),
                        label={"preference": "chosen"},
                    ),
                )
            )

        # Handle rejected responses (now a list of strings)
        rejected_responses = data_dict.get("rejected", [])
        if isinstance(rejected_responses, list):
            for rejected_content in rejected_responses:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(rejected_content),
                            label={"preference": "rejected"},
                        ),
                    )
                )
        elif rejected_responses:  # Single rejected response (backwards compatibility)
            outputs.append(
                DataOutput(
                    answer=Step(
                        role="assistant",
                        content=str(rejected_responses),
                        label={"preference": "rejected"},
                    ),
                )
            )

        return outputs
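
The sketch below shows how a single RewardBench 2 style record might be converted. The record fields and `source_info` values are illustrative, the import path is assumed from the source listing, and it also assumes the converter can be instantiated without extra arguments.

```python
from rm_gallery.gallery.data.load.rewardbench2 import RewardBench2Converter  # path assumed from the source listing

# Hypothetical record in the chosen/rejected list layout handled above.
data_dict = {
    "id": "rb2-0001",
    "prompt": "Explain the difference between a list and a tuple in Python.",
    "chosen": ["Tuples are immutable, lists are mutable."],
    "rejected": ["They are identical.", "Tuples are always faster."],
    "subset": "factuality",
    "num_correct": 1,
    "num_rejected": 2,
    "total_completions": 3,
    "models": ["model-a", "model-b", "model-c"],
}
source_info = {"load_type": "local", "source_file_path": "data/rewardbench2.jsonl"}

converter = RewardBench2Converter()
sample = converter.convert_to_data_sample(data_dict, source_info)
# sample.unique_id -> "rb2-0001"
# sample.output    -> one "chosen" and two "rejected" DataOutput entries
```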

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rewardbench2.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # generate unique id using id field if available, otherwise use prompt content
    if "id" in data_dict:
        unique_id = str(data_dict["id"])
    else:
        content = str(data_dict.get("prompt", ""))
        unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from prompt
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from chosen/rejected responses
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RewardBench2Converter",
            "subset": data_dict.get("subset"),
            "num_correct": data_dict.get("num_correct"),
            "num_rejected": data_dict.get("num_rejected"),
            "total_completions": data_dict.get("total_completions"),
            "models": data_dict.get("models"),
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench2",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None

RewardBenchAnnotationTemplate

Bases: BaseAnnotationTemplate

Reward Bench annotation template implementation

Source code in rm_gallery/gallery/data/annotation/rewardbench.py
@AnnotationTemplateRegistry.register("rewardbench")
class RewardBenchAnnotationTemplate(BaseAnnotationTemplate):
    """Reward Bench annotation template implementation"""

    def __init__(self, name: str):
        super().__init__(name)

    @property
    def label_config(self) -> str:
        """Return the Label Studio XML configuration for reward bench evaluation"""
        return """
<View>
  <!-- Sample Information -->
  <Header value="Sample Information"/>
  <Text name="unique_id" value="$unique_id" title="Unique ID"/>
  <Text name="source" value="$source" title="Source"/>
  <Text name="task_category" value="$task_category" title="task_category"/>
  <Text name="created_at" value="$created_at" title="Created At"/>
  <Text name="answer_count" value="$answer_count" title="Number of Answers"/>

  <!-- Input Messages -->
  <Header value="Input Messages"/>
  <Paragraphs name="input_dialogue" value="$input_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Output Responses -->
  <Header value="Output Responses"/>
  <Paragraphs name="output_dialogue" value="$output_messages" layout="dialogue" nameKey="role" textKey="content" />

  <!-- Step 1: Ranking -->
  <View>
    <Text name="step1_title" value="Step 1: Answer Ranking" />
    <Text name="step1_desc1" value="Please select the most appropriate ranking relationship" />
    <Choices name="answer_ranking" toName="output_dialogue" choice="single" title="🏆 Answer Ranking">
      <Choice value="1>2" showIf="$answer_count=2"/>
      <Choice value="2>1" showIf="$answer_count=2"/>
      <Choice value="Neither" showIf="$answer_count=2"/>
      <Choice value="All answers are of equal quality"/>
    </Choices>
  </View>

  <!-- Step 2: Answer Rating -->
  <View>
    <Text name="step2_spacer" value="" />
    <Text name="step2_title" value="Step 2: Answer Rating" />
    <Text name="step2_desc" value="Please rate the quality of the answers for the $task_category task_category (1-5 stars)" />

    <Text name="answer1_label" value="📝 Answer 1 Rating:" />
    <Rating name="answer1_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 1 Quality Rating"/>

    <Text name="answer2_label" value="📝 Answer 2 Rating:" />
    <Rating name="answer2_rating" toName="output_dialogue" maxRating="5" icon="star" size="medium" title="Answer 2 Quality Rating"/>

    <Text name="rating_criteria" value="💡 Rating Criteria: 5 stars = excellent, 4 stars = good, 3 stars = average, 2 stars = poor, 1 star = very poor" />
  </View>

  <!-- Step 3: Additional Comments -->
  <View>
    <Text name="step3_spacer" value="" />
    <Text name="step3_title" value="Step 3: Additional Comments" />
    <Text name="step3_desc" value="Please provide any additional comments or feedback" />
    <TextArea name="additional_comments" toName="output_dialogue" placeholder="[x] The x-th answer has the following issues..." title="Additional Comments"/>
  </View>

</View>
"""

    def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process annotation data specific to reward bench evaluation

        Args:
            annotation_data: Generic annotation data with ratings, choices, text_areas

        Returns:
            Processed data structured for reward bench evaluation
        """
        processed = {
            "ranking_result": None,
            "answer_ratings": {},
            "quality_comparison": {},
            "comments": "",
            "preference": None,
        }

        # Extract answer ranking (Step 1)
        if "answer_ranking" in annotation_data.get("choices", {}):
            ranking_choices = annotation_data["choices"]["answer_ranking"]["choices"]
            if ranking_choices:
                processed["ranking_result"] = ranking_choices[0]

                # Determine preference based on ranking
                if "1>2" in ranking_choices[0]:
                    processed["preference"] = "answer_1"
                elif "2>1" in ranking_choices[0]:
                    processed["preference"] = "answer_2"
                elif "Neither" in ranking_choices[0]:
                    processed["preference"] = "neither"
                else:
                    processed["preference"] = "tie"

        # Extract answer ratings (Step 2)
        ratings = annotation_data.get("ratings", {})

        if "answer1_rating" in ratings:
            processed["answer_ratings"]["answer_1"] = ratings["answer1_rating"][
                "rating"
            ]

        if "answer2_rating" in ratings:
            processed["answer_ratings"]["answer_2"] = ratings["answer2_rating"][
                "rating"
            ]

        # Calculate quality comparison
        if len(processed["answer_ratings"]) == 2:
            rating1 = processed["answer_ratings"]["answer_1"]
            rating2 = processed["answer_ratings"]["answer_2"]

            # Decide which answer the ratings favour before building the dict so the
            # consistency check compares against the freshly computed value.
            better_answer = (
                "answer_1"
                if rating1 > rating2
                else "answer_2"
                if rating2 > rating1
                else "tie"
            )

            processed["quality_comparison"] = {
                "rating_difference": rating1 - rating2,
                "better_answer": better_answer,
                "rating_consistency": processed["preference"] == better_answer,
            }

        # Extract additional comments (Step 3)
        if "additional_comments" in annotation_data.get("text_areas", {}):
            processed["comments"] = annotation_data["text_areas"][
                "additional_comments"
            ]["text"]

        return processed

    def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
        """
        Validate annotation data for reward bench evaluation

        Args:
            annotation_data: Annotation data to validate

        Returns:
            True if valid, False otherwise
        """
        # Check if required fields are present
        required_sections = ["choices", "ratings"]
        for section in required_sections:
            if section not in annotation_data:
                return False

        # Check if answer ranking is provided
        if "answer_ranking" not in annotation_data.get("choices", {}):
            return False

        # Check if at least one rating is provided
        ratings = annotation_data.get("ratings", {})
        if not any(key in ratings for key in ["answer1_rating", "answer2_rating"]):
            return False

        return True
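
A hypothetical pairwise annotation processed with this template might look as follows; the payload and ratings are invented for illustration, and the import path is assumed from the source listing.

```python
from rm_gallery.gallery.data.annotation.rewardbench import (  # import path assumed from the source listing
    RewardBenchAnnotationTemplate,
)

# Made-up pairwise annotation payload.
annotation_data = {
    "choices": {"answer_ranking": {"choices": ["1>2"]}},
    "ratings": {
        "answer1_rating": {"rating": 5},
        "answer2_rating": {"rating": 3},
    },
    "text_areas": {"additional_comments": {"text": "[2] The second answer misses the edge case."}},
}

template = RewardBenchAnnotationTemplate(name="rewardbench")
assert template.validate_annotation_data(annotation_data)
processed = template.process_annotations(annotation_data)
# processed["preference"]                              -> "answer_1"
# processed["quality_comparison"]["rating_difference"] -> 2
```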

label_config property

Return the Label Studio XML configuration for reward bench evaluation

process_annotations(annotation_data)

Process annotation data specific to reward bench evaluation

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| annotation_data | Dict[str, Any] | Generic annotation data with ratings, choices, text_areas | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Processed data structured for reward bench evaluation |

Source code in rm_gallery/gallery/data/annotation/rewardbench.py
def process_annotations(self, annotation_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process annotation data specific to reward bench evaluation

    Args:
        annotation_data: Generic annotation data with ratings, choices, text_areas

    Returns:
        Processed data structured for reward bench evaluation
    """
    processed = {
        "ranking_result": None,
        "answer_ratings": {},
        "quality_comparison": {},
        "comments": "",
        "preference": None,
    }

    # Extract answer ranking (Step 1)
    if "answer_ranking" in annotation_data.get("choices", {}):
        ranking_choices = annotation_data["choices"]["answer_ranking"]["choices"]
        if ranking_choices:
            processed["ranking_result"] = ranking_choices[0]

            # Determine preference based on ranking
            if "1>2" in ranking_choices[0]:
                processed["preference"] = "answer_1"
            elif "2>1" in ranking_choices[0]:
                processed["preference"] = "answer_2"
            elif "Neither" in ranking_choices[0]:
                processed["preference"] = "neither"
            else:
                processed["preference"] = "tie"

    # Extract answer ratings (Step 2)
    ratings = annotation_data.get("ratings", {})

    if "answer1_rating" in ratings:
        processed["answer_ratings"]["answer_1"] = ratings["answer1_rating"][
            "rating"
        ]

    if "answer2_rating" in ratings:
        processed["answer_ratings"]["answer_2"] = ratings["answer2_rating"][
            "rating"
        ]

    # Calculate quality comparison
    if len(processed["answer_ratings"]) == 2:
        rating1 = processed["answer_ratings"]["answer_1"]
        rating2 = processed["answer_ratings"]["answer_2"]

        # Decide which answer the ratings favour before building the dict so the
        # consistency check compares against the freshly computed value.
        better_answer = (
            "answer_1"
            if rating1 > rating2
            else "answer_2"
            if rating2 > rating1
            else "tie"
        )

        processed["quality_comparison"] = {
            "rating_difference": rating1 - rating2,
            "better_answer": better_answer,
            "rating_consistency": processed["preference"] == better_answer,
        }

    # Extract additional comments (Step 3)
    if "additional_comments" in annotation_data.get("text_areas", {}):
        processed["comments"] = annotation_data["text_areas"][
            "additional_comments"
        ]["text"]

    return processed

validate_annotation_data(annotation_data)

Validate annotation data for reward bench evaluation

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| annotation_data | Dict[str, Any] | Annotation data to validate | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if valid, False otherwise |

Source code in rm_gallery/gallery/data/annotation/rewardbench.py
def validate_annotation_data(self, annotation_data: Dict[str, Any]) -> bool:
    """
    Validate annotation data for reward bench evaluation

    Args:
        annotation_data: Annotation data to validate

    Returns:
        True if valid, False otherwise
    """
    # Check if required fields are present
    required_sections = ["choices", "ratings"]
    for section in required_sections:
        if section not in annotation_data:
            return False

    # Check if answer ranking is provided
    if "answer_ranking" not in annotation_data.get("choices", {}):
        return False

    # Check if at least one rating is provided
    ratings = annotation_data.get("ratings", {})
    if not any(key in ratings for key in ["answer1_rating", "answer2_rating"]):
        return False

    return True

RewardBenchConverter

Bases: DataConverter

Unified converter for conversation data with prompt, chosen and rejected responses

Source code in rm_gallery/gallery/data/load/rewardbench.py
@DataConverterRegistry.register("rewardbench")
class RewardBenchConverter(DataConverter):
    """
    Unified converter for conversation data with prompt, chosen and rejected responses
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format"""
        # generate unique id
        content = str(data_dict.get("prompt", []))
        unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from prompt
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from chosen/rejected responses
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RewardBenchConverter",
            }

            # Add source-specific metadata
            if source_info.get("load_type") == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif source_info.get("load_type") == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench",
                task_category="conversation",
                metadata=metadata,
            )

            return data_sample

        except Exception as e:
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create DataInput from conversation prompt"""
        history = []
        prompt = data_dict.get("prompt")

        # Convert single-turn conversation to list format
        if isinstance(prompt, dict):
            prompt = [prompt]

        if isinstance(prompt, list):
            for turn in prompt:
                if isinstance(turn, dict):
                    role = turn.get("role", "user")
                    content = turn.get("content", str(turn))
                    history.append(ChatMessage(role=role, content=content))
                else:
                    history.append(ChatMessage(role="user", content=str(turn)))
        elif isinstance(prompt, str):
            history.append(ChatMessage(role="user", content=prompt))

        return history

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create DataOutput list from conversation responses"""
        outputs = []

        # Handle chosen response
        if "chosen" in data_dict:
            chosen_content = data_dict["chosen"]
            if isinstance(chosen_content, list):
                # Multi-turn chosen response
                for turn in chosen_content:
                    if isinstance(turn, dict):
                        content = turn.get("content", str(turn))
                    else:
                        content = str(turn)
                    outputs.append(
                        DataOutput(
                            answer=Step(
                                role="assistant",
                                content=content,
                                label={"preference": "chosen"},
                            ),
                        )
                    )
            else:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(chosen_content),
                            label={"preference": "chosen"},
                        ),
                    )
                )

        # Handle rejected response
        if "rejected" in data_dict:
            rejected_content = data_dict["rejected"]
            if isinstance(rejected_content, list):
                # Multi-turn rejected response
                for turn in rejected_content:
                    if isinstance(turn, dict):
                        content = turn.get("content", str(turn))
                    else:
                        content = str(turn)
                    outputs.append(
                        DataOutput(
                            answer=Step(
                                role="assistant",
                                content=content,
                                label={"preference": "rejected"},
                            ),
                        )
                    )
            else:
                outputs.append(
                    DataOutput(
                        answer=Step(
                            role="assistant",
                            content=str(rejected_content),
                            label={"preference": "rejected"},
                        ),
                    )
                )

        return outputs
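
A minimal sketch of converting one RewardBench-style record with this converter; the record, file path, and import path are assumptions made for illustration.

```python
from rm_gallery.gallery.data.load.rewardbench import RewardBenchConverter  # path assumed from the source listing

# Hypothetical single-turn record with string chosen/rejected responses.
data_dict = {
    "prompt": [{"role": "user", "content": "What is the capital of France?"}],
    "chosen": "The capital of France is Paris.",
    "rejected": "France does not have a capital.",
}
source_info = {"load_type": "local", "source_file_path": "data/rewardbench.jsonl"}

converter = RewardBenchConverter()
sample = converter.convert_to_data_sample(data_dict, source_info)
# sample.input  -> one user ChatMessage built from the prompt turn
# sample.output -> one "chosen" and one "rejected" DataOutput
```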

convert_to_data_sample(data_dict, source_info)

Convert conversation data to DataSample format

Source code in rm_gallery/gallery/data/load/rewardbench.py
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> DataSample:
    """Convert conversation data to DataSample format"""
    # generate unique id
    content = str(data_dict.get("prompt", []))
    unique_id = hashlib.md5(content.encode()).hexdigest()

    # Create input from prompt
    data_input = self._create_conversation_input(data_dict)

    # Create outputs from chosen/rejected responses
    data_output = self._create_conversation_output(data_dict)

    try:
        # Build metadata based on source type
        metadata = {
            "raw_data": data_dict,
            "load_strategy": "RewardBenchConverter",
        }

        # Add source-specific metadata
        if source_info.get("load_type") == "local":
            metadata.update(
                {
                    "source_file_path": source_info.get("source_file_path"),
                    "load_type": "local",
                }
            )
        elif source_info.get("load_type") == "huggingface":
            metadata.update(
                {
                    "dataset_name": source_info.get("dataset_name"),
                    "dataset_config": source_info.get("dataset_config"),
                    "split": source_info.get("split", "train"),
                    "load_type": "huggingface",
                }
            )

        data_sample = DataSample(
            unique_id=unique_id,
            input=data_input,
            output=data_output,
            source="rewardbench",
            task_category="conversation",
            metadata=metadata,
        )

        return data_sample

    except Exception as e:
        logger.error(f"Error creating conversation DataSample: {str(e)}")
        return None

RewardDimensionWithRank

Bases: RewardDimension

ListWise/Pointwise reward dimension with ranking values.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| rank | List[float] | Collection of ranking scores for different positions |

Methods:

| Name | Description |
| --- | --- |
| `__getitem__` | Returns a scored reward dimension for a specific rank position |

Source code in rm_gallery/core/reward/schema.py
class RewardDimensionWithRank(RewardDimension):
    """
    ListWise/Pointwise reward dimension with ranking values.

    Attributes:
        rank (List[float]): Collection of ranking scores for different positions

    Methods:
        __getitem__: Returns a scored reward dimension for a specific rank position
    """

    rank: List[float] = Field(default_factory=list, description="rank")

    def __getitem__(self, index: int) -> RewardDimensionWithScore:
        """
        Access a specific position's reward information.

        :param index: Position in the ranking list to retrieve
        :type index: int
        :returns: Reward information with score for the specified position
        :rtype: RewardDimensionWithScore
        """
        return RewardDimensionWithScore(
            name=self.name,
            # weight=self.weight,
            reason=self.reason,
            score=self.rank[index],
        )
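
As a quick illustration of the indexing behaviour, the snippet below builds a listwise dimension and reads per-position scores. The field values are invented, and the import path (along with the assumption that `name`, `reason`, and `rank` are the only fields that need to be set) comes from the source listing above.

```python
from rm_gallery.core.reward.schema import RewardDimensionWithRank  # path assumed from the source listing

# Hypothetical listwise dimension: position 0 is preferred over position 1.
dim = RewardDimensionWithRank(
    name="helpfulness",
    reason="listwise comparison of two candidate answers",
    rank=[1.0, 0.0],
)

best = dim[0]   # RewardDimensionWithScore with score == 1.0
worst = dim[1]  # RewardDimensionWithScore with score == 0.0
```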

__getitem__(index)

Access a specific position's reward information.

:param index: Position in the ranking list to retrieve
:type index: int
:returns: Reward information with score for the specified position
:rtype: RewardDimensionWithScore

Source code in rm_gallery/core/reward/schema.py
def __getitem__(self, index: int) -> RewardDimensionWithScore:
    """
    Access a specific position's reward information.

    :param index: Position in the ranking list to retrieve
    :type index: int
    :returns: Reward information with score for the specified position
    :rtype: RewardDimensionWithScore
    """
    return RewardDimensionWithScore(
        name=self.name,
        # weight=self.weight,
        reason=self.reason,
        score=self.rank[index],
    )

RewardDimensionWithScore

Bases: RewardDimension

Pointwise/Stepwise reward dimension with a numerical score.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| score | float | Numerical value representing the reward magnitude |

Source code in rm_gallery/core/reward/schema.py
class RewardDimensionWithScore(RewardDimension):
    """
    Pointwise/Stepwise reward dimension with a numerical score.

    Attributes:
        score (float): Numerical value representing the reward magnitude
    """

    score: float = Field(default=..., description="score")

RewardRegistry

A registry management system for reward modules that maps module names to their corresponding implementation classes.

This class provides a centralized repository for registering and retrieving reward modules by string identifiers. Modules can be registered using decorators and later accessed by their string identifiers.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `_registry` | Dict[str, Type[BaseReward]] | Internal dictionary storing the mapping between reward module names and their classes. |

Source code in rm_gallery/core/reward/registry.py
class RewardRegistry:
    """A registry management system for reward modules that maps module names to their corresponding implementation classes.

    This class provides a centralized repository for registering and retrieving reward modules by string identifiers.
    Modules can be registered using decorators and later accessed by their string identifiers.

    Attributes:
        _registry: Internal dictionary storing the mapping between reward module names and their classes.
    """

    # Dictionary mapping reward module names to their corresponding classes
    _registry: Dict[str, Type[BaseReward]] = {}

    @classmethod
    def register(cls, name: str):
        """Create a decorator to register a reward module class with a specified identifier.

        The decorator pattern allows classes to be registered while maintaining their original identity.

        Args:
            name: Unique string identifier for the reward module

        Returns:
            A decorator function that registers the module when applied to a class
        """

        def _register(module: Type[BaseReward]):
            """Internal registration function that stores the module in the registry.

            Args:
                module: The BaseReward subclass to be registered

            Returns:
                The original module class (unchanged)
            """
            cls._registry[name] = module
            return module

        return _register

    @classmethod
    def get(cls, name: str) -> Type[BaseReward] | None:
        """Retrieve a registered reward module class by its identifier.

        Provides safe access to registered modules without raising errors for missing entries.

        Args:
            name: String identifier of the reward module to retrieve

        Returns:
            The corresponding BaseReward subclass if found, None otherwise
        """
        assert name in cls._registry, f"Reward module '{name}' not found"
        return cls._registry.get(name, None)

    @classmethod
    def list(cls) -> str:
        """
        Returns:
            A formatted table (str) summarizing all registered reward modules
        """
        info = []
        for name, module in cls._registry.items():
            info.append(
                pd.Series(
                    {
                        "Name": name,
                        "Class": module.__name__,
                        "Scenario": module.__doc__.strip(),
                    }
                )
            )

        info_df = pd.concat(info, axis=1).T
        # info_str = info_df.to_markdown(index=False)
        info_str = tabulate(
            info_df,
            headers="keys",
            tablefmt="grid",
            maxcolwidths=[50] * (len(info_df.columns) + 1),
            # showindex=False,
        )
        # info_str = tabulate(info_df, headers='keys', tablefmt='github')
        return info_str
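
The registration flow can be sketched as follows. `DemoReward` is a stand-in defined only for illustration (a real module would subclass BaseReward, as the type hints indicate), and the import path is taken from the source listing above.

```python
from rm_gallery.core.reward.registry import RewardRegistry  # path from the source listing


@RewardRegistry.register("demo_reward")
class DemoReward:  # stand-in only; real rewards subclass BaseReward
    """Demo entry used solely to illustrate the registry."""


# The decorator stored the class under its string identifier.
assert RewardRegistry.get("demo_reward") is DemoReward
print(RewardRegistry.list())  # tabulated overview of all registered rewards
```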

get(name) classmethod

Retrieve a registered reward module class by its identifier.

Provides safe access to registered modules without raising errors for missing entries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | String identifier of the reward module to retrieve | required |

Returns:

| Type | Description |
| --- | --- |
| Type[BaseReward] \| None | The corresponding BaseReward subclass if found, None otherwise |

Source code in rm_gallery/core/reward/registry.py
@classmethod
def get(cls, name: str) -> Type[BaseReward] | None:
    """Retrieve a registered reward module class by its identifier.

    Provides safe access to registered modules without raising errors for missing entries.

    Args:
        name: String identifier of the reward module to retrieve

    Returns:
        The corresponding BaseReward subclass if found, None otherwise
    """
    return cls._registry.get(name, None)
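
As a quick illustration of the lookup contract (a sketch, assuming the None-returning behaviour documented above; "rouge" is registered elsewhere on this page and "not_registered" is deliberately unknown):

from rm_gallery.core.reward.registry import RewardRegistry

rouge_cls = RewardRegistry.get("rouge")         # the registered RougeReward class
missing = RewardRegistry.get("not_registered")  # None for unknown identifiers
if missing is None:
    print("no reward registered under that name")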

list() classmethod

Returns:

    str: A formatted table (string) of all registered reward modules

Source code in rm_gallery/core/reward/registry.py
@classmethod
def list(cls) -> str:
    """
    Returns:
        A formatted table (string) of all registered reward modules
    """
    info = []
    for name, module in cls._registry.items():
        info.append(
            pd.Series(
                {
                    "Name": name,
                    "Class": module.__name__,
                    "Scenario": module.__doc__.strip(),
                }
            )
        )

    info_df = pd.concat(info, axis=1).T
    # info_str = info_df.to_markdown(index=False)
    info_str = tabulate(
        info_df,
        headers="keys",
        tablefmt="grid",
        maxcolwidths=[50] * (len(info_df.columns) + 1),
        # showindex=False,
    )
    # info_str = tabulate(info_df, headers='keys', tablefmt='github')
    return info_str
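
The table returned by list() is just tabulate applied to a one-row-per-module DataFrame. A standalone sketch of the same formatting call on toy data (not real registry contents):

import pandas as pd
from tabulate import tabulate

info_df = pd.DataFrame(
    [{"Name": "my_toy_reward", "Class": "MyToyReward", "Scenario": "Toy example"}]
)
print(
    tabulate(
        info_df,
        headers="keys",
        tablefmt="grid",
        maxcolwidths=[50] * (len(info_df.columns) + 1),
    )
)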

register(name) classmethod

Create a decorator to register a reward module class with a specified identifier.

The decorator pattern allows classes to be registered while maintaining their original identity.

Parameters:

    name (str, required): Unique string identifier for the reward module

Returns:

    A decorator function that registers the module when applied to a class

Source code in rm_gallery/core/reward/registry.py
@classmethod
def register(cls, name: str):
    """Create a decorator to register a reward module class with a specified identifier.

    The decorator pattern allows classes to be registered while maintaining their original identity.

    Args:
        name: Unique string identifier for the reward module

    Returns:
        A decorator function that registers the module when applied to a class
    """

    def _register(module: Type[BaseReward]):
        """Internal registration function that stores the module in the registry.

        Args:
            module: The BaseReward subclass to be registered

        Returns:
            The original module class (unchanged)
        """
        cls._registry[name] = module
        return module

    return _register
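
Because register(name) simply returns the inner _register function, it can also be called without decorator syntax, for example to register an already-defined class under an extra identifier. A runnable sketch (DummyReward is a placeholder; in real use it would be a BaseReward subclass):

from rm_gallery.core.reward.registry import RewardRegistry


class DummyReward:
    """Placeholder standing in for a real BaseReward subclass."""


# Equivalent to decorating DummyReward with @RewardRegistry.register("dummy_alias").
RewardRegistry.register("dummy_alias")(DummyReward)
assert RewardRegistry.get("dummy_alias") is DummyReward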

RewardResult

Bases: BaseModel, Generic[T]

Container for reward calculation results with generic type support.

Attributes:

    name (str): Identifier of the reward module that generated this result
    details (List[T]): Collection of detailed reward information items
    extra_data (dict): Additional metadata or context information

Source code in rm_gallery/core/reward/schema.py
class RewardResult(BaseModel, Generic[T]):
    """
    Container for reward calculation results with generic type support.

    Attributes:
        name (str): Identifier of the reward module that generated this result
        details (List[T]): Collection of detailed reward information items
        extra_data (dict): Additional metadata or context information
    """

    name: str = Field(default=..., description="reward module name")
    details: List[T] = Field(default_factory=list, description="reward details")
    extra_data: dict = Field(default_factory=dict, description="extra data")
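
Since RewardResult is a plain Pydantic model, it can be constructed directly. A sketch (importing RewardDimensionWithScore from the same schema module is an assumption based on how the two types are used together on this page):

from rm_gallery.core.reward.schema import (  # RewardDimensionWithScore path assumed
    RewardDimensionWithScore,
    RewardResult,
)

result = RewardResult(
    name="my_toy_reward",
    details=[
        RewardDimensionWithScore(name="my_toy_reward", score=1.0, reason="exact match"),
    ],
    extra_data={"generated": "42", "reference": "42"},
)
print(result.details[0].score)  # 1.0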

RewriteListWiseReward

Bases: BaseHelpfulnessListWiseReward

Rewrite: the assistant aims to modify existing text to alter its style while preserving the original information and intent.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/rewrite.py
@RewardRegistry.register("rewrite_listwise_reward")
class RewriteListWiseReward(BaseHelpfulnessListWiseReward):
    """Rewrite: the assistant aims to modifies existing text to alter its style while preserving the original information and intent."""

    name: str = Field(default="rewrite_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC)

RolePlayingListWiseReward

Bases: BaseHelpfulnessListWiseReward

Role Playing: Entails adopting specific characters or personas within text-based scenarios, engaging in dialogues or actions that reflect the assigned roles.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/role_playing.py
@RewardRegistry.register("role_playing_listwise_reward")
class RolePlayingListWiseReward(BaseHelpfulnessListWiseReward):
    """Role Playing: Entails adopting specific characters or personas within text-based scenarios, engaging in dialogues or actions that reflect the assigned roles."""

    name: str = Field(default="role_playing_listwise_reward")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC, description="task description")

RougeReward

Bases: BasePointWiseReward

ROUGE-L similarity evaluation using longest common subsequence

Source code in rm_gallery/gallery/rm/general.py
@RewardRegistry.register("rouge")
class RougeReward(BasePointWiseReward):
    """ROUGE-L similarity evaluation using longest common subsequence"""

    name: str = Field(default="rouge", description="ROUGE similarity reward")

    def _lcs_length(self, x: List[str], y: List[str]) -> int:
        """Calculate longest common subsequence length"""
        m, n = len(x), len(y)
        dp = [[0] * (n + 1) for _ in range(m + 1)]

        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if x[i - 1] == y[j - 1]:
                    dp[i][j] = dp[i - 1][j - 1] + 1
                else:
                    dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])

        return dp[m][n]

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Calculate ROUGE-L score

        Args:
            sample: Data sample containing generated content

        Returns:
            RewardResult: Reward result containing ROUGE-L score
        """
        generated = sample.output[0].answer.content.strip().lower()
        reference = sample.output[0].answer.label.get("reference", "").strip().lower()

        # Tokenization
        generated_tokens = generated.split()
        reference_tokens = reference.split()

        if not generated_tokens and not reference_tokens:
            rouge_l = 1.0
        elif not generated_tokens or not reference_tokens:
            rouge_l = 0.0
        else:
            # Calculate LCS length
            lcs_len = self._lcs_length(generated_tokens, reference_tokens)

            # Calculate ROUGE-L
            if len(generated_tokens) == 0 or len(reference_tokens) == 0:
                rouge_l = 0.0
            else:
                precision = lcs_len / len(generated_tokens)
                recall = lcs_len / len(reference_tokens)
                rouge_l = (
                    2 * precision * recall / (precision + recall)
                    if (precision + recall) > 0
                    else 0.0
                )

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=rouge_l,
                    reason=f"ROUGE-L score: {rouge_l:.3f}",
                )
            ],
            extra_data={
                "rouge_l": rouge_l,
                "generated_length": len(generated_tokens),
                "reference_length": len(reference_tokens),
                "lcs_length": self._lcs_length(generated_tokens, reference_tokens)
                if generated_tokens and reference_tokens
                else 0,
            },
        )
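
The ROUGE-L computation above is easy to trace by hand. A self-contained sketch of the same LCS-based F-measure on toy strings (no rm_gallery imports required):

from typing import List


def lcs_length(x: List[str], y: List[str]) -> int:
    """Classic dynamic-programming LCS length, mirroring _lcs_length above."""
    m, n = len(x), len(y)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if x[i - 1] == y[j - 1]:
                dp[i][j] = dp[i - 1][j - 1] + 1
            else:
                dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])
    return dp[m][n]


generated = "the cat sat on the mat".split()
reference = "the cat is on the mat".split()

lcs = lcs_length(generated, reference)  # 5 ("the cat ... on the mat")
precision = lcs / len(generated)        # 5/6
recall = lcs / len(reference)           # 5/6
rouge_l = 2 * precision * recall / (precision + recall)
print(round(rouge_l, 3))                # 0.833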

SafetyListWiseReward

Bases: BaseHarmlessnessListWiseReward

Safety: Comply with or refuse prompts related to harmful use cases as well as general compliance behaviors.

Source code in rm_gallery/gallery/rm/alignment/harmlessness/safety.py
@RewardRegistry.register("safety_listwise_reward")
class SafetyListWiseReward(BaseHarmlessnessListWiseReward):
    """Safety: Comply with or refuse prompts related to harmful use cases as well as general compliance behaviors."""

    name: str = Field(default="safety_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)

SummarizationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Summarization: The text is compressed into a short form that retains the main information, either extractive (selected directly from the original text) or abstractive (rewriting the information).

Source code in rm_gallery/gallery/rm/alignment/helpfulness/summarization.py
@RewardRegistry.register("summarization_listwise_reward")
class SummarizationListWiseReward(BaseHelpfulnessListWiseReward):
    """Summarization: The text is compressed into a short form, retaining the main information, which is divided into extraction (directly selected from the original text) and production (rewriting the information)."""

    name: str = Field(
        default="summarization_listwise_reward", description="reward name"
    )
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC, description="task description")

SyntaxCheckReward

Bases: BasePointWiseReward

Check code syntax using Abstract Syntax Tree to validate Python code blocks.

Source code in rm_gallery/gallery/rm/code/code.py
@RewardRegistry.register("code_syntax_check")
class SyntaxCheckReward(BasePointWiseReward):
    """Check code syntax using Abstract Syntax Tree to validate Python code blocks."""

    name: str = Field(default="syntax_check", description="Syntax check reward")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check code syntax

        Args:
            sample: Data sample containing code content

        Returns:
            RewardResult: Reward result containing syntax check results
        """
        content = sample.output[0].answer.content

        # Extract code blocks
        code_pattern = r"```(?:python)?\n(.*?)\n```"
        code_blocks = re.findall(code_pattern, content, re.DOTALL)

        if not code_blocks:
            # No code blocks, return neutral score
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason="No code blocks found to check",
                    )
                ],
                extra_data={"code_blocks": [], "syntax_errors": []},
            )

        syntax_errors = []
        valid_blocks = 0

        for i, code in enumerate(code_blocks):
            try:
                ast.parse(code.strip())
                valid_blocks += 1
            except SyntaxError as e:
                syntax_errors.append(
                    {"block": i, "error": str(e), "line": e.lineno, "offset": e.offset}
                )

        # Calculate score: ratio of valid code blocks
        score = valid_blocks / len(code_blocks) if code_blocks else 0.0

        # Apply penalty if syntax errors exist
        if syntax_errors:
            score -= 0.5

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=score,
                    reason=f"Syntax check: {valid_blocks}/{len(code_blocks)} blocks valid, {len(syntax_errors)} errors",
                )
            ],
            extra_data={
                "code_blocks": code_blocks,
                "valid_blocks": valid_blocks,
                "total_blocks": len(code_blocks),
                "syntax_errors": syntax_errors,
            },
        )
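
The extract-and-parse step is straightforward to exercise on its own. A sketch using the same regex on a toy response containing one valid and one broken Python block:

import ast
import re

content = (
    "Here is the fix:\n"
    "```python\n"
    "def add(a, b):\n"
    "    return a + b\n"
    "```\n"
    "And a broken variant:\n"
    "```python\n"
    "def add(a, b)\n"
    "    return a + b\n"
    "```\n"
)

code_blocks = re.findall(r"```(?:python)?\n(.*?)\n```", content, re.DOTALL)
valid_blocks = 0
for block in code_blocks:
    try:
        ast.parse(block.strip())
        valid_blocks += 1
    except SyntaxError as exc:
        print(f"syntax error at line {exc.lineno}: {exc.msg}")

print(f"{valid_blocks}/{len(code_blocks)} blocks valid")  # 1/2 blocks valid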

TranslationListWiseReward

Bases: BaseHelpfulnessListWiseReward

Translation: Converting text from one language to another.

Source code in rm_gallery/gallery/rm/alignment/helpfulness/translation.py
@RewardRegistry.register("translation_listwise_reward")
class TranslationListWiseReward(BaseHelpfulnessListWiseReward):
    """Translation: Converting text from one language to another."""

    name: str = Field(default="translation_listwise_reward", description="reward name")
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)
    desc: str = Field(default=DESC, description="task description")

get_tokenizer(tokenizer_type='tiktoken', encoding_name='cl100k_base', chinese_only=False, **kwargs)

Factory function to create tokenizer instances.

Parameters:

    tokenizer_type (str, default 'tiktoken'): Type of tokenizer ("tiktoken", "jieba", "simple")
    encoding_name (str, default 'cl100k_base'): Tiktoken encoding name (for tiktoken tokenizer)
    chinese_only (bool, default False): Whether to keep only Chinese characters (for jieba tokenizer)
    **kwargs: Additional arguments for tokenizer initialization

Returns:

    BaseTokenizer: Tokenizer instance

Raises:

    ValueError: If tokenizer_type is not supported

Source code in rm_gallery/core/utils/tokenizer.py
def get_tokenizer(
    tokenizer_type: str = "tiktoken",
    encoding_name: str = "cl100k_base",
    chinese_only: bool = False,
    **kwargs,
) -> BaseTokenizer:
    """
    Factory function to create tokenizer instances.

    Args:
        tokenizer_type: Type of tokenizer ("tiktoken", "jieba", "simple")
        encoding_name: Tiktoken encoding name (for tiktoken tokenizer)
        chinese_only: Whether to keep only Chinese characters (for jieba tokenizer)
        **kwargs: Additional arguments for tokenizer initialization

    Returns:
        BaseTokenizer: Tokenizer instance

    Raises:
        ValueError: If tokenizer_type is not supported
    """
    if tokenizer_type == "tiktoken":
        return TiktokenTokenizer(encoding_name=encoding_name, **kwargs)
    elif tokenizer_type == "jieba":
        return JiebaTokenizer(chinese_only=chinese_only, **kwargs)
    elif tokenizer_type == "simple":
        return SimpleTokenizer(**kwargs)
    else:
        raise ValueError(
            f"Unsupported tokenizer type: {tokenizer_type}. "
            f"Supported types: tiktoken, jieba, simple"
        )
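
A brief sketch of the factory in use; only construction and the unsupported-type error are shown, since the methods on the returned tokenizer objects are defined elsewhere in the module:

from rm_gallery.core.utils.tokenizer import get_tokenizer

tiktoken_tok = get_tokenizer("tiktoken", encoding_name="cl100k_base")
simple_tok = get_tokenizer("simple")

try:
    get_tokenizer("whitespace")  # not one of tiktoken / jieba / simple
except ValueError as exc:
    print(exc)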