composition

Composes multiple reward evaluation modules with weighting and routing strategies. Implements the base classes and concrete compositions used for complex reward calculations.

BaseComposition

Bases: BaseReward

Base class for reward compositions that provides shared configuration parameters.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| params | Dict[str, Any] | General parameters dictionary containing shared configurations like LLM settings |
Source code in rm_gallery/core/reward/composition.py
class BaseComposition(BaseReward):
    """
    Base class for reward compositions that provides shared configuration parameters.

    Attributes:
        params: General parameters dictionary containing shared configurations like LLM settings
    """

    params: Dict[str, Any] = Field(
        default={}, description="general parameters like llm"
    )
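
Since every composition inherits `params`, shared configuration (for example, LLM settings) can be declared once on the composition and merged into each child reward's constructor arguments, as `SimpleComposition.__init__` below shows. A minimal sketch; the `llm` key and its contents are hypothetical, and the import path follows the source location above:

```python
from rm_gallery.core.reward.composition import SimpleComposition

# Hypothetical shared settings; any keys your reward modules accept will work.
shared = {"llm": {"model": "my-model", "temperature": 0.0}}

# `name` is assumed to be a BaseReward field, mirroring the
# `params["name"] = name` assignment in SimpleComposition.__init__ below.
composition = SimpleComposition(name="demo", params=shared, rewards={})
assert composition.params["llm"]["model"] == "my-model"
```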

RouterComposition

Bases: SimpleComposition

Base class for conditional reward routing that selects different reward compositions based on input sample characteristics.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| router | | Dictionary mapping condition keys to reward composition instances |

Source code in rm_gallery/core/reward/composition.py
class RouterComposition(SimpleComposition):
    """
    Base class for conditional reward routing that selects different reward compositions
    based on input sample characteristics.

    Attributes:
        router: Dictionary mapping condition keys to reward composition instances
    """

    @abstractmethod
    def _condition(self, sample: DataSample) -> str:
        """
        Determine routing condition based on input sample.
        Must be implemented by subclasses to return a router key.

        Args:
            sample: Input data sample to evaluate

        Returns:
            str: Key identifying which reward composition to use
        """
        ...

    def evaluate(
        self, sample: DataSample, thread_pool: ThreadPoolExecutor | None = None
    ) -> DataSample:
        """
        Route sample to appropriate reward composition based on condition.

        Args:
            sample: Input data sample to evaluate
            thread_pool: Optional thread pool executor for parallel execution

        Returns:
            DataSample with updated reward information
        """
        condition = self._condition(sample)
        sample = self.rewards[condition].evaluate(sample, thread_pool)
        return sample
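
Note that routing goes through the inherited `rewards` mapping: `evaluate` calls `self.rewards[condition]`, so every key that `_condition` can return must exist in `rewards`. A minimal subclass sketch; the `metadata` lookup and the two composition instances are hypothetical placeholders for whatever your `DataSample` schema and reward setup provide:

```python
from concurrent.futures import ThreadPoolExecutor

from rm_gallery.core.reward.composition import RouterComposition

class TaskRouter(RouterComposition):
    """Route samples to a task-specific reward composition."""

    def _condition(self, sample) -> str:
        # Hypothetical field access; adapt the lookup to your DataSample schema.
        meta = getattr(sample, "metadata", None) or {}
        return "math" if meta.get("task") == "math" else "default"

router = TaskRouter(
    name="task_router",
    rewards={
        # Placeholders: any BaseReward instances, e.g. SimpleComposition objects.
        "math": math_composition,
        "default": general_composition,
    },
)

with ThreadPoolExecutor(max_workers=4) as pool:
    scored = router.evaluate(sample, thread_pool=pool)
```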

evaluate(sample, thread_pool=None)

Route sample to appropriate reward composition based on condition.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sample | DataSample | Input data sample to evaluate | required |
| thread_pool | ThreadPoolExecutor \| None | Optional thread pool executor for parallel execution | None |

Returns:

| Type | Description |
| --- | --- |
| DataSample | DataSample with updated reward information |


SimpleComposition

Bases: BaseComposition

Composite reward module that combines multiple reward modules with weighted averaging. Supports both sequential and parallel execution modes for reward evaluation.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| weights | Dict[str, float] | Dictionary mapping reward dimension names to their respective weights |
| rewards | Dict[str, Dict[str, Any] \| BaseReward] | Dict of reward module configurations or instances |
| is_parallel | bool | Flag indicating whether to evaluate modules in parallel |

Source code in rm_gallery/core/reward/composition.py
class SimpleComposition(BaseComposition):
    """
    Composite reward module that combines multiple reward modules with weighted averaging.
    Supports both sequential and parallel execution modes for reward evaluation.

    Attributes:
        weights: Dictionary mapping reward dimension names to their respective weights
        rewards: Dict of reward module configurations or instances
        is_parallel: Flag indicating whether to evaluate modules in parallel
    """

    weights: Dict[str, float] = Field(default={}, description="weight for each reward")
    rewards: Dict[str, Dict[str, Any] | BaseReward] = Field(
        default_factory=dict, description="reward modules"
    )
    is_parallel: bool = Field(default=False, description="parallel or not")

    def __init__(self, *args, **kwargs):
        """
        Initialize reward modules from configurations.
        Converts dictionary configurations to actual reward module instances using the registry.

        Args:
            *args: Variable length argument list passed to parent constructor
            **kwargs: Arbitrary keyword arguments passed to parent constructor
        """
        super().__init__(*args, **kwargs)
        for name, reward in self.rewards.items():
            if isinstance(reward, dict):
                params = {k: v for k, v in self.params.items()}
                params.update(reward.get("params", {}))
                params["name"] = name

                if isinstance(reward["cls"], str):
                    self.rewards[name] = RewardRegistry.get(reward["cls"])(**params)

                elif issubclass(reward["cls"], BaseReward):
                    self.rewards[name] = reward["cls"](
                        **params,
                    )
                else:
                    raise ValueError(f"Invalid dimension: {reward}")
            elif isinstance(reward, str):
                self.rewards[name] = RewardRegistry.get(reward)(**self.params)
            elif isinstance(reward, BaseReward):
                self.rewards[name] = reward
            elif issubclass(reward, BaseReward):
                self.rewards[name] = reward(**self.params)
            else:
                raise NotImplementedError(f"Invalid dimension: {reward}")

    def evaluate(
        self, sample: DataSample, thread_pool: ThreadPoolExecutor | None = None
    ) -> DataSample:
        """
        Evaluate rewards using configured modules with optional parallel execution.

        Args:
            sample: Input data sample to evaluate
            thread_pool: Optional thread pool executor for parallel execution

        Returns:
            DataSample with updated reward information
        """
        # Parallel evaluation using thread pool
        if self.is_parallel and thread_pool is not None:
            sample = deepcopy(sample)
            futures = []
            for name, reward in self.rewards.items():
                futures.append(
                    thread_pool.submit(
                        reward.evaluate, sample=sample, thread_pool=thread_pool
                    )
                )

            wait(futures, return_when=ALL_COMPLETED)
            samples = [future.result() for future in futures]

            # Merge results from parallel evaluations
            for s in samples:
                sample.update(s)

        # Sequential evaluation mode
        else:
            for name, reward in self.rewards.items():
                sample = reward.evaluate(sample, thread_pool)

        # Weighted reward calculation function (executed for both parallel and sequential modes)
        def weight(reward: Reward):
            """Calculate weighted average based on configured weights"""
            w_sum = 0
            d_sum = 0
            for d in reward.details:
                w = self.weights.get(d.name, 1.0)
                w_sum += w
                d_sum += w * d.score
            if w_sum != 0:
                reward.score = d_sum / w_sum

        # Apply weighting to all output rewards
        for output in sample.output:
            weight(output.answer.reward)
            if output.steps:
                for step in output.steps:
                    weight(step.reward)

        return sample
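
Putting the pieces together: `rewards` entries may be registry names, `{"cls": ..., "params": ...}` dicts, classes, or instances, and `weights` rescales each named dimension (unlisted dimensions default to 1.0). For example, with weights 2.0 and 1.0 and detail scores 0.9 and 0.5, the combined score is (2.0 · 0.9 + 1.0 · 0.5) / (2.0 + 1.0) ≈ 0.77. A sketch; the registry keys "relevance_reward" and "fluency_reward" are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor

from rm_gallery.core.reward.composition import SimpleComposition

composition = SimpleComposition(
    name="quality",
    rewards={
        # String form: resolved through RewardRegistry.get(...).
        "relevance": "relevance_reward",
        # Dict form: explicit class key plus per-reward params.
        "fluency": {"cls": "fluency_reward", "params": {"threshold": 0.5}},
    },
    weights={"relevance": 2.0, "fluency": 1.0},
    is_parallel=True,
)

# Parallel mode only takes effect when a thread pool is actually supplied;
# without one, evaluation falls back to the sequential branch.
with ThreadPoolExecutor(max_workers=4) as pool:
    scored = composition.evaluate(sample, thread_pool=pool)  # sample: a DataSample
```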

__init__(*args, **kwargs)

Initialize reward modules from configurations. Converts dictionary configurations to actual reward module instances using the registry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*args` | | Variable length argument list passed to parent constructor | () |
| `**kwargs` | | Arbitrary keyword arguments passed to parent constructor | {} |

evaluate(sample, thread_pool=None)

Evaluate rewards using configured modules with optional parallel execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sample | DataSample | Input data sample to evaluate | required |
| thread_pool | ThreadPoolExecutor \| None | Optional thread pool executor for parallel execution | None |

Returns:

| Type | Description |
| --- | --- |
| DataSample | DataSample with updated reward information |
