Skip to content

harmlessness

BaseHarmlessnessListWiseReward

Bases: BaseListWisePrincipleReward

The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities.

Source code in rm_gallery/gallery/rm/alignment/base.py
61
62
63
64
65
66
67
68
69
70
@RewardRegistry.register("base_harmlessness_listwise")
class BaseHarmlessnessListWiseReward(BaseListWisePrincipleReward):
    """The assistant aims to answer questions, avoiding harmful behaviors such as spreading misinformation, spreading harmful ideas, or engaging in other harmful activities."""

    name: str = Field(default="base_harmlessness_listwise")
    desc: str = Field(default=DEFAULT_HARMLESSNESS_DESC)
    scenario: str = Field(
        default=DEFAULT_HARMLESSNESS_SCENARIO, description="assistant scenario"
    )
    principles: List[str] = Field(default=DEFAULT_HARMLESSNESS_PRINCIPLES)

BasePointWiseReward

Bases: BaseReward

Point-wise reward module for individual response evaluation.

Evaluates each response independently without considering relative ranking.

Source code in rm_gallery/core/reward/base.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
class BasePointWiseReward(BaseReward):
    """
    Point-wise reward module for individual response evaluation.

    Evaluates each response independently without considering relative ranking.
    """

    @abstractmethod
    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Processes a single response to generate reward metrics.

        Parameters:
            sample (DataSample): Single-response data sample
            **kwargs: Evaluation parameters

        Returns:
            RewardResult[RewardDimensionWithScore]: Response-specific reward metrics
        """
        ...

    def _parallel(
        self,
        func: Callable,
        sample: DataSample,
        thread_pool: ThreadPoolExecutor | None = None,
        **kwargs,
    ) -> DataSample:
        """
        Processes responses in a data sample using parallel or sequential execution.

        This method applies the provided function to each response in the sample,
        either in parallel using a thread pool or sequentially. Results are merged
        back into the corresponding response objects.

        Parameters:
            func (Callable): Function to apply to each response. Should accept a
                DataSample and return an object with 'details' and 'extra_data' attributes.
            sample (DataSample): Input sample containing multiple responses to process
            thread_pool (ThreadPoolExecutor | None): Optional thread pool for parallel execution
            **kwargs: Additional arguments passed to func

        Returns:
            DataSample: Modified copy of input sample with reward metrics updated in each response

        The method creates a deep copy of the input sample to avoid modifying original data.
        When using a thread pool, it submits tasks for each response and waits for completion
        before merging results. Response objects are updated with both reward details and
        additional metadata from processing results.
        """
        sample = sample.model_copy(deep=True)
        futures = []
        for i, output in enumerate(sample.output):
            # Create sub-sample for individual response processing
            subsample = DataSample(
                unique_id=sample.unique_id, input=sample.input, output=[output]
            )

            if thread_pool:
                futures.append(
                    (
                        i,
                        thread_pool.submit(func, sample=subsample, **kwargs),
                    )
                )
            else:
                result = func(
                    sample=subsample,
                    **kwargs,
                )
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        # Process parallel execution results
        if thread_pool:
            wait([future[-1] for future in futures], return_when=ALL_COMPLETED)
            # Merge results back into sample outputs
            for i, future in futures:
                result = future.result()
                output = sample.output[i]
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        for output in sample.output:
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        return sample

    async def _async_parallel(
        self,
        func: Callable,
        sample: DataSample,
        semaphore: asyncio.Semaphore,
        **kwargs,
    ) -> DataSample:
        """
        Async version of _parallel method for BasePointWiseReward.

        Processes responses in a data sample using async execution with semaphore control.

        Parameters:
            func (Callable): Function to apply to each response
            sample (DataSample): Input sample containing multiple responses to process
            semaphore (asyncio.Semaphore): Semaphore for async concurrency control
            **kwargs: Additional arguments passed to func

        Returns:
            DataSample: Modified copy of input sample with reward metrics updated in each response
        """
        sample = sample.model_copy(deep=True)

        async def _async_evaluate_output(i: int, output):
            """Async wrapper for individual output evaluation"""
            subsample = DataSample(
                unique_id=sample.unique_id, input=sample.input, output=[output]
            )

            # Use asyncio.to_thread to wrap the sync function
            async with semaphore:
                result = await asyncio.to_thread(func, sample=subsample, **kwargs)

            return i, result

        # Create tasks for all outputs
        tasks = []
        for i, output in enumerate(sample.output):
            task = asyncio.create_task(_async_evaluate_output(i, output))
            tasks.append(task)

        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks)

        # Merge results back into sample outputs
        for i, result in results:
            output = sample.output[i]
            output.answer.reward.details += result.details
            output.answer.additional_kwargs[self.name] = result.extra_data

        # Calculate average score for each output
        for output in sample.output:
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        return sample

DataSample

Bases: BaseModel

Complete data sample structure for reward modeling training and evaluation.

Represents a single interaction with input context, multiple possible outputs, and associated metadata for comprehensive reward model training.

Attributes:

Name Type Description
unique_id str

Unique identifier for tracking and deduplication

input List[ChatMessage]

Conversation context as list of chat messages

output List[DataOutput]

List of possible responses with evaluations

task_category Optional[str]

Optional categorization for task-specific analysis

source Optional[str]

Origin dataset or system that generated this sample

created_at datetime

Timestamp for temporal tracking

metadata Optional[Dict]

Additional context and debugging information

Source code in rm_gallery/core/data/schema.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class DataSample(BaseModel):
    """
    Complete data sample structure for reward modeling training and evaluation.

    Represents a single interaction with input context, multiple possible outputs,
    and associated metadata for comprehensive reward model training.

    Attributes:
        unique_id: Unique identifier for tracking and deduplication
        input: Conversation context as list of chat messages
        output: List of possible responses with evaluations
        task_category: Optional categorization for task-specific analysis
        source: Origin dataset or system that generated this sample
        created_at: Timestamp for temporal tracking
        metadata: Additional context and debugging information
    """

    unique_id: str = Field(..., description="Unique identifier for the data")
    input: List[ChatMessage] = Field(default_factory=list, description="input")
    output: List[DataOutput] = Field(default_factory=list, description="output")
    task_category: Optional[str] = Field(default=None, description="task category")
    source: Optional[str] = Field(default=None, description="source")
    created_at: datetime = Field(default_factory=datetime.now, description="createdAt")
    metadata: Optional[Dict] = Field(default=None, description="metadata")

    def update(self, sample: "DataSample") -> "DataSample":
        """
        Merge another sample's data into this sample for combining evaluations.

        Updates additional_kwargs and reward details from the source sample
        while preserving the original structure.

        Args:
            sample: Source sample to merge data from

        Returns:
            Self with updated data for method chaining
        """
        self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
        for i, output in enumerate(self.output):
            output.answer.additional_kwargs.update(
                sample.output[i].answer.additional_kwargs
            )
            output.answer.reward.details.extend(sample.output[i].answer.reward.details)

            if output.steps:
                for j, step in output.steps:
                    step.additional_kwargs.update(
                        sample.output[i].steps[j].additional_kwargs
                    )
                    step.reward.details.extend(sample.output[i].steps[j].reward.details)
        return self

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {datetime: lambda v: v.isoformat()}

update(sample)

Merge another sample's data into this sample for combining evaluations.

Updates additional_kwargs and reward details from the source sample while preserving the original structure.

Parameters:

Name Type Description Default
sample DataSample

Source sample to merge data from

required

Returns:

Type Description
DataSample

Self with updated data for method chaining

Source code in rm_gallery/core/data/schema.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def update(self, sample: "DataSample") -> "DataSample":
    """
    Merge another sample's data into this sample for combining evaluations.

    Updates additional_kwargs and reward details from the source sample
    while preserving the original structure.

    Args:
        sample: Source sample to merge data from

    Returns:
        Self with updated data for method chaining
    """
    self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
    for i, output in enumerate(self.output):
        output.answer.additional_kwargs.update(
            sample.output[i].answer.additional_kwargs
        )
        output.answer.reward.details.extend(sample.output[i].answer.reward.details)

        if output.steps:
            for j, step in output.steps:
                step.additional_kwargs.update(
                    sample.output[i].steps[j].additional_kwargs
                )
                step.reward.details.extend(sample.output[i].steps[j].reward.details)
    return self

DetoxifyReward

Bases: BasePointWiseReward

Detoxify: Detecting different types of of toxicity like threats, obscenity, insults ans so on.

Source code in rm_gallery/gallery/rm/alignment/harmlessness/detoxification.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@RewardRegistry.register("detoxify_reward")
class DetoxifyReward(BasePointWiseReward):
    """Detoxify: Detecting different types of of toxicity like threats, obscenity, insults ans so on."""

    name: str = Field(default="detoxify", description="Name of the reward module")
    model_name: str = Field(
        default="unbiased", description="Name of the Detoxify model to use"
    )

    @property
    def model(self):
        if not hasattr(self, "_model"):
            from detoxify import Detoxify

            self._model = Detoxify(self.model_name)
        return self._model

    def _evaluate(self, sample: DataSample, **kwargs) -> RewardResult:
        """
        Evaluate text toxicity using Detoxify model.

        Args:
            sample: Input data sample containing text to evaluate
            **kwargs: Additional implementation-specific parameters

        Returns:
            RewardResult: Computed reward metrics and metadata
        """
        try:
            # Get text from sample
            text = sample.output[0] if sample.output else sample.input

            if not text:
                raise ValueError("No text provided for evaluation")

            # Get model predictions
            predictions = self.model.predict(text)

            # Convert toxicity score to reward (higher = less toxic)
            toxicity_score = predictions["toxicity"]
            reward_score = 1.0 - toxicity_score  # Invert score so higher is better

            # Create reward dimension
            reward_dimension = RewardDimensionWithScore(
                name="detoxify",
                score=reward_score,
                reason=f"Text toxicity score: {toxicity_score:.2f}. Higher reward indicates less toxic content.",
            )

            return RewardResult(name=self.name, details=[reward_dimension])

        except Exception as e:
            logger.error(f"Error in Detoxify evaluation: {str(e)}")
            return RewardResult(name=self.name, details=[])

RewardDimensionWithScore

Bases: RewardDimension

Pointwise/Stepwise reward dimension with a numerical score.

Attributes:

Name Type Description
score float

Numerical value representing the reward magnitude

Source code in rm_gallery/core/reward/schema.py
20
21
22
23
24
25
26
27
28
class RewardDimensionWithScore(RewardDimension):
    """
    Pointwise/Stepwise reward dimension with a numerical score.

    Attributes:
        score (float): Numerical value representing the reward magnitude
    """

    score: float = Field(default=..., description="score")

RewardRegistry

A registry management system for reward modules that maps module names to their corresponding implementation classes.

This class provides a centralized repository for registering and retrieving reward modules by string identifiers. Modules can be registered using decorators and later accessed by their string identifiers.

Attributes:

Name Type Description
_registry Dict[str, Type[BaseReward]]

Internal dictionary storing the mapping between reward module names and their classes.

Source code in rm_gallery/core/reward/registry.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class RewardRegistry:
    """A registry management system for reward modules that maps module names to their corresponding implementation classes.

    This class provides a centralized repository for registering and retrieving reward modules by string identifiers.
    Modules can be registered using decorators and later accessed by their string identifiers.

    Attributes:
        _registry: Internal dictionary storing the mapping between reward module names and their classes.
    """

    # Dictionary mapping reward module names to their corresponding classes
    _registry: Dict[str, Type[BaseReward]] = {}

    @classmethod
    def register(cls, name: str):
        """Create a decorator to register a reward module class with a specified identifier.

        The decorator pattern allows classes to be registered while maintaining their original identity.

        Args:
            name: Unique string identifier for the reward module
            module: The BaseReward subclass to be registered

        Returns:
            A decorator function that registers the module when applied to a class
        """

        def _register(module: Type[BaseReward]):
            """Internal registration function that stores the module in the registry.

            Args:
                module: The BaseReward subclass to be registered

            Returns:
                The original module class (unchanged)
            """
            cls._registry[name] = module
            return module

        return _register

    @classmethod
    def get(cls, name: str) -> Type[BaseReward] | None:
        """Retrieve a registered reward module class by its identifier.

        Provides safe access to registered modules without raising errors for missing entries.

        Args:
            name: String identifier of the reward module to retrieve

        Returns:
            The corresponding BaseReward subclass if found, None otherwise
        """
        assert name in cls._registry, f"Reward module '{name}' not found"
        return cls._registry.get(name, None)

    @classmethod
    def list(cls) -> str:
        """
        Returns:
            A list of all registered reward modules
        """
        info = []
        for name, module in cls._registry.items():
            info.append(
                pd.Series(
                    {
                        "Name": name,
                        "Class": module.__name__,
                        "Scenario": module.__doc__.strip(),
                    }
                )
            )

        info_df = pd.concat(info, axis=1).T
        # info_str = info_df.to_markdown(index=False)
        info_str = tabulate(
            info_df,
            headers="keys",
            tablefmt="grid",
            maxcolwidths=[50] * (len(info_df.columns) + 1),
            # showindex=False,
        )
        # info_str = tabulate(info_df, headers='keys', tablefmt='github')
        return info_str

get(name) classmethod

Retrieve a registered reward module class by its identifier.

Provides safe access to registered modules without raising errors for missing entries.

Parameters:

Name Type Description Default
name str

String identifier of the reward module to retrieve

required

Returns:

Type Description
Type[BaseReward] | None

The corresponding BaseReward subclass if found, None otherwise

Source code in rm_gallery/core/reward/registry.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@classmethod
def get(cls, name: str) -> Type[BaseReward] | None:
    """Retrieve a registered reward module class by its identifier.

    Provides safe access to registered modules without raising errors for missing entries.

    Args:
        name: String identifier of the reward module to retrieve

    Returns:
        The corresponding BaseReward subclass if found, None otherwise
    """
    assert name in cls._registry, f"Reward module '{name}' not found"
    return cls._registry.get(name, None)

list() classmethod

Returns:

Type Description
str

A list of all registered reward modules

Source code in rm_gallery/core/reward/registry.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@classmethod
def list(cls) -> str:
    """
    Returns:
        A list of all registered reward modules
    """
    info = []
    for name, module in cls._registry.items():
        info.append(
            pd.Series(
                {
                    "Name": name,
                    "Class": module.__name__,
                    "Scenario": module.__doc__.strip(),
                }
            )
        )

    info_df = pd.concat(info, axis=1).T
    # info_str = info_df.to_markdown(index=False)
    info_str = tabulate(
        info_df,
        headers="keys",
        tablefmt="grid",
        maxcolwidths=[50] * (len(info_df.columns) + 1),
        # showindex=False,
    )
    # info_str = tabulate(info_df, headers='keys', tablefmt='github')
    return info_str

register(name) classmethod

Create a decorator to register a reward module class with a specified identifier.

The decorator pattern allows classes to be registered while maintaining their original identity.

Parameters:

Name Type Description Default
name str

Unique string identifier for the reward module

required
module

The BaseReward subclass to be registered

required

Returns:

Type Description

A decorator function that registers the module when applied to a class

Source code in rm_gallery/core/reward/registry.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@classmethod
def register(cls, name: str):
    """Create a decorator to register a reward module class with a specified identifier.

    The decorator pattern allows classes to be registered while maintaining their original identity.

    Args:
        name: Unique string identifier for the reward module
        module: The BaseReward subclass to be registered

    Returns:
        A decorator function that registers the module when applied to a class
    """

    def _register(module: Type[BaseReward]):
        """Internal registration function that stores the module in the registry.

        Args:
            module: The BaseReward subclass to be registered

        Returns:
            The original module class (unchanged)
        """
        cls._registry[name] = module
        return module

    return _register

RewardResult

Bases: BaseModel, Generic[T]

Container for reward calculation results with generic type support.

Attributes:

Name Type Description
name str

Identifier of the reward module that generated this result

details List[T]

Collection of detailed reward information items

extra_data dict

Additional metadata or context information

Source code in rm_gallery/core/reward/schema.py
65
66
67
68
69
70
71
72
73
74
75
76
77
class RewardResult(BaseModel, Generic[T]):
    """
    Container for reward calculation results with generic type support.

    Attributes:
        name (str): Identifier of the reward module that generated this result
        details (List[T]): Collection of detailed reward information items
        extra_data (dict): Additional metadata or context information
    """

    name: str = Field(default=..., description="reward module name")
    details: List[T] = Field(default_factory=list, description="reward details")
    extra_data: dict = Field(default_factory=dict, description="extra data")

SafetyListWiseReward

Bases: BaseHarmlessnessListWiseReward

Safety: Comply with or refuse prompts related to harmful use cases as well as general compliance behaviors.

Source code in rm_gallery/gallery/rm/alignment/harmlessness/safety.py
19
20
21
22
23
24
25
26
@RewardRegistry.register("safety_listwise_reward")
class SafetyListWiseReward(BaseHarmlessnessListWiseReward):
    """Safety: Comply with or refuse prompts related to harmful use cases as well as general compliance behaviors."""

    name: str = Field(default="safety_listwise_reward")
    desc: str = Field(default=DESC)
    scenario: str = Field(default=SCENARIO, description="assistant scenario")
    principles: List[str] = Field(default=PRINCIPLES)