
composition

Module for composing multiple reward evaluation modules with weighting and routing strategies. Implements base classes and concrete compositions for handling complex reward calculations.

BaseComposition

Bases: BaseReward

Base class for reward compositions that provides shared configuration parameters.

Attributes:

params (Dict[str, Any]): General parameters dictionary containing shared configurations like LLM settings.

Source code in rm_gallery/core/reward/composition.py
class BaseComposition(BaseReward):
    """
    Base class for reward compositions that provides shared configuration parameters.

    Attributes:
        params: General parameters dictionary containing shared configurations like LLM settings
    """

    params: Dict[str, Any] = Field(
        default={}, description="general parameters like llm"
    )
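
Since BaseComposition only adds the shared params field on top of BaseReward, subclasses typically read shared configuration (such as LLM settings) from it. A minimal sketch, assuming BaseReward accepts a name field (as the SimpleComposition constructor below suggests); the "llm" key is illustrative, not part of the API:

class EchoComposition(BaseComposition):
    """Toy subclass that surfaces a shared setting from ``params``."""

    def describe(self) -> str:
        # "llm" is a hypothetical key; store whatever shared config you need.
        return f"shared llm config: {self.params.get('llm')}"

comp = EchoComposition(name="demo", params={"llm": {"model": "my-model"}})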

RouterComposition

Bases: SimpleComposition

Base class for conditional reward routing that selects different reward compositions based on input sample characteristics.

Attributes:

router: Dictionary mapping condition keys to reward composition instances.

Source code in rm_gallery/core/reward/composition.py
class RouterComposition(SimpleComposition):
    """
    Base class for conditional reward routing that selects different reward compositions
    based on input sample characteristics.

    Attributes:
        router: Dictionary mapping condition keys to reward composition instances
    """

    @abstractmethod
    def _condition(self, sample: DataSample) -> str:
        """
        Determine routing condition based on input sample.
        Must be implemented by subclasses to return a router key.

        Args:
            sample: Input data sample to evaluate

        Returns:
            str: Key identifying which reward composition to use
        """
        ...

    def evaluate(
        self, sample: DataSample, thread_pool: ThreadPoolExecutor | None = None
    ) -> DataSample:
        """
        Route sample to appropriate reward composition based on condition.

        Args:
            sample: Input data sample to evaluate
            thread_pool: Optional thread pool executor for parallel execution

        Returns:
            DataSample with updated reward information
        """
        condition = self._condition(sample)
        sample = self.rewards[condition].evaluate(sample, thread_pool)
        return sample

    async def async_evaluate(
        self, sample: DataSample, semaphore: asyncio.Semaphore | None = None
    ) -> DataSample:
        """
        Async version of evaluate method that routes sample to appropriate reward composition.

        Args:
            sample: Input data sample to evaluate
            semaphore: Optional semaphore for concurrency control

        Returns:
            DataSample with updated reward information
        """
        if semaphore is None:
            semaphore = asyncio.Semaphore(self.max_workers)

        condition = self._condition(sample)
        sample = await self.rewards[condition].async_evaluate(sample, semaphore)
        return sample
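
A minimal routing sketch. The registry keys ("math_reward", "general_reward") and the sample.metadata field are hypothetical, used only to illustrate the contract: _condition must return a key present in self.rewards.

class TaskRouter(RouterComposition):
    """Routes math samples to one composition, everything else to another."""

    def _condition(self, sample: DataSample) -> str:
        # Both branches return keys defined in ``rewards`` below.
        return "math" if sample.metadata.get("task") == "math" else "default"

router = TaskRouter(
    name="task_router",
    rewards={
        "math": {"cls": "math_reward", "params": {}},        # hypothetical registry key
        "default": {"cls": "general_reward", "params": {}},  # hypothetical registry key
    },
)

Because RouterComposition inherits SimpleComposition's constructor, these dict configs are resolved through RewardRegistry at initialization time.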

async_evaluate(sample, semaphore=None) async

Async version of evaluate method that routes sample to appropriate reward composition.

Parameters:

sample (DataSample): Input data sample to evaluate. Required.
semaphore (asyncio.Semaphore | None): Optional semaphore for concurrency control. Default: None.

Returns:

DataSample: The input sample with updated reward information.

Source code in rm_gallery/core/reward/composition.py
async def async_evaluate(
    self, sample: DataSample, semaphore: asyncio.Semaphore | None = None
) -> DataSample:
    """
    Async version of evaluate method that routes sample to appropriate reward composition.

    Args:
        sample: Input data sample to evaluate
        semaphore: Optional semaphore for concurrency control

    Returns:
        DataSample with updated reward information
    """
    if semaphore is None:
        semaphore = asyncio.Semaphore(self.max_workers)

    condition = self._condition(sample)
    sample = await self.rewards[condition].async_evaluate(sample, semaphore)
    return sample
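
When no semaphore is passed, the method creates one sized by self.max_workers (a field not defined in this module, presumably on BaseReward). To bound concurrency across many samples, share one semaphore explicitly; a usage sketch with the hypothetical router above:

import asyncio

async def score_all(samples):
    semaphore = asyncio.Semaphore(8)  # one shared bound across all samples
    return await asyncio.gather(
        *(router.async_evaluate(s, semaphore) for s in samples)
    )

scored = asyncio.run(score_all([sample]))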

evaluate(sample, thread_pool=None)

Route sample to appropriate reward composition based on condition.

Parameters:

sample (DataSample): Input data sample to evaluate. Required.
thread_pool (ThreadPoolExecutor | None): Optional thread pool executor for parallel execution. Default: None.

Returns:

DataSample: The input sample with updated reward information.

Source code in rm_gallery/core/reward/composition.py
def evaluate(
    self, sample: DataSample, thread_pool: ThreadPoolExecutor | None = None
) -> DataSample:
    """
    Route sample to appropriate reward composition based on condition.

    Args:
        sample: Input data sample to evaluate
        thread_pool: Optional thread pool executor for parallel execution

    Returns:
        DataSample with updated reward information
    """
    condition = self._condition(sample)
    sample = self.rewards[condition].evaluate(sample, thread_pool)
    return sample
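
The thread_pool argument is simply forwarded to whichever composition the condition selects; the routing step itself is not parallelized. A usage sketch with the hypothetical router above:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=8) as pool:
    scored = router.evaluate(sample, thread_pool=pool)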

SimpleComposition

Bases: BaseComposition

Composite reward module that combines multiple reward modules with weighted averaging. Supports both sequential and parallel execution modes for reward evaluation.

Attributes:

weights (Dict[str, float]): Dictionary mapping reward dimension names to their respective weights.
rewards (Dict[str, Dict[str, Any] | BaseReward]): Dictionary of reward module configurations or instances.
is_parallel (bool): Flag indicating whether to evaluate modules in parallel.

Source code in rm_gallery/core/reward/composition.py
class SimpleComposition(BaseComposition):
    """
    Composite reward module that combines multiple reward modules with weighted averaging.
    Supports both sequential and parallel execution modes for reward evaluation.

    Attributes:
        weights: Dictionary mapping reward dimension names to their respective weights
        rewards: Dict of reward module configurations or instances
        is_parallel: Flag indicating whether to evaluate modules in parallel
    """

    weights: Dict[str, float] = Field(default={}, description="weight for each reward")
    rewards: Dict[str, Dict[str, Any] | BaseReward] = Field(
        default_factory=dict, description="reward modules"
    )
    is_parallel: bool = Field(default=False, description="parallel or not")

    def __init__(self, *args, **kwargs):
        """
        Initialize reward modules from configurations.
        Converts dictionary configurations to actual reward module instances using the registry.

        Args:
            *args: Variable length argument list passed to parent constructor
            **kwargs: Arbitrary keyword arguments passed to parent constructor
        """
        super().__init__(*args, **kwargs)

        for name, reward in self.rewards.items():
            if isinstance(reward, dict):
                params = {k: v for k, v in self.params.items()}
                params.update(reward.get("params", {}))
                params["name"] = name

                if isinstance(reward["cls"], str):
                    self.rewards[name] = RewardRegistry.get(reward["cls"])(**params)

                elif issubclass(reward["cls"], BaseReward):
                    self.rewards[name] = reward["cls"](
                        **params,
                    )
                else:
                    raise ValueError(f"Invalid dimension: {reward}")
            elif isinstance(reward, str):
                self.rewards[name] = RewardRegistry.get(reward)(**self.params)
            elif isinstance(reward, BaseReward):
                self.rewards[name] = reward
            elif issubclass(reward, BaseReward):
                self.rewards[name] = reward(**self.params)
            else:
                raise NotImplementedError(f"Invalid dimension: {reward}")

    def evaluate(
        self, sample: DataSample, thread_pool: ThreadPoolExecutor | None = None
    ) -> DataSample:
        """
        Evaluate rewards using configured modules with optional parallel execution.

        Args:
            sample: Input data sample to evaluate
            thread_pool: Optional thread pool executor for parallel execution

        Returns:
            DataSample with updated reward information
        """
        # Parallel evaluation using thread pool
        if self.is_parallel and thread_pool is not None:
            sample = deepcopy(sample)
            futures = []
            for name, reward in self.rewards.items():
                futures.append(
                    thread_pool.submit(
                        reward.evaluate, sample=sample, thread_pool=thread_pool
                    )
                )

            wait(futures, return_when=ALL_COMPLETED)
            samples = [future.result() for future in futures]

            # Merge results from parallel evaluations
            for s in samples:
                sample.update(s)

        # Sequential evaluation mode
        else:
            for name, reward in self.rewards.items():
                sample = reward.evaluate(sample, thread_pool)

        # Weighted reward calculation function (executed for both parallel and sequential modes)
        def weight(reward: Reward):
            """Calculate weighted average based on configured weights"""
            w_sum = 0
            d_sum = 0
            for d in reward.details:
                w = self.weights.get(d.name, 1.0)
                w_sum += w
                d_sum += w * d.score
            if w_sum != 0:
                reward.score = d_sum / w_sum

        # Apply weighting to all output rewards
        for output in sample.output:
            weight(output.answer.reward)
            if output.steps:
                for step in output.steps:
                    weight(step.reward)

        return sample

    async def async_evaluate(
        self, sample: DataSample, semaphore: asyncio.Semaphore | None = None
    ) -> DataSample:
        """
        Async version of evaluate method that supports async parallel execution.

        Args:
            sample: Input data sample to evaluate
            semaphore: Optional semaphore for concurrency control

        Returns:
            DataSample with updated reward information
        """
        if semaphore is None:
            semaphore = asyncio.Semaphore(self.max_workers)

        sample = deepcopy(sample)

        async def _async_evaluate_reward(name: str, reward: BaseReward):
            """Async wrapper for individual reward evaluation"""
            return await reward.async_evaluate(sample, semaphore)

        # Create tasks for all reward modules
        tasks = []
        for name, reward in self.rewards.items():
            task = asyncio.create_task(_async_evaluate_reward(name, reward))
            tasks.append(task)

        # Wait for all tasks to complete
        samples = await asyncio.gather(*tasks)

        # Merge results from parallel evaluations
        for s in samples:
            sample.update(s)

        # Weighted reward calculation function (executed for both parallel and sequential modes)
        def weight(reward: Reward):
            """Calculate weighted average based on configured weights"""
            w_sum = 0
            d_sum = 0
            for d in reward.details:
                w = self.weights.get(d.name, 1.0)
                w_sum += w
                d_sum += w * d.score
            if w_sum != 0:
                reward.score = d_sum / w_sum

        # Apply weighting to all output rewards
        for output in sample.output:
            weight(output.answer.reward)
            if output.steps:
                for step in output.steps:
                    weight(step.reward)

        return sample
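
Per the weight function above, each answer's (and step's) final score is the weighted mean of its detail scores, sum(w_d * s_d) / sum(w_d), with unlisted dimensions defaulting to weight 1.0. For example, with weights {"relevance": 2.0, "safety": 1.0} and detail scores 0.9 and 0.6, the combined score is (2.0 * 0.9 + 1.0 * 0.6) / 3.0 = 0.8. A construction sketch; the registry keys are hypothetical, and matching weight keys to detail names assumes each reward module emits a detail named after its entry (that linkage depends on the reward implementations, not on the composition):

composition = SimpleComposition(
    name="quality",
    is_parallel=True,
    weights={"relevance": 2.0, "safety": 1.0},  # unlisted dimensions default to 1.0
    rewards={
        "relevance": {"cls": "relevance_reward", "params": {}},  # hypothetical registry key
        "safety": {"cls": "safety_reward", "params": {}},        # hypothetical registry key
    },
)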

__init__(*args, **kwargs)

Initialize reward modules from configurations. Converts dictionary configurations to actual reward module instances using the registry.

Parameters:

*args: Variable length argument list passed to the parent constructor. Default: ().
**kwargs: Arbitrary keyword arguments passed to the parent constructor. Default: {}.

Source code in rm_gallery/core/reward/composition.py
def __init__(self, *args, **kwargs):
    """
    Initialize reward modules from configurations.
    Converts dictionary configurations to actual reward module instances using the registry.

    Args:
        *args: Variable length argument list passed to parent constructor
        **kwargs: Arbitrary keyword arguments passed to parent constructor
    """
    super().__init__(*args, **kwargs)

    for name, reward in self.rewards.items():
        if isinstance(reward, dict):
            params = {k: v for k, v in self.params.items()}
            params.update(reward.get("params", {}))
            params["name"] = name

            if isinstance(reward["cls"], str):
                self.rewards[name] = RewardRegistry.get(reward["cls"])(**params)

            elif issubclass(reward["cls"], BaseReward):
                self.rewards[name] = reward["cls"](
                    **params,
                )
            else:
                raise ValueError(f"Invalid dimension: {reward}")
        elif isinstance(reward, str):
            self.rewards[name] = RewardRegistry.get(reward)(**self.params)
        elif isinstance(reward, BaseReward):
            self.rewards[name] = reward
        elif issubclass(reward, BaseReward):
            self.rewards[name] = reward(**self.params)
        else:
            raise NotImplementedError(f"Invalid dimension: {reward}")
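
The loop above accepts four shapes for each value in rewards. A sketch of all four; the registry keys, my_reward_instance, and MyReward are placeholders:

composition = SimpleComposition(
    name="mixed",
    rewards={
        # 1) dict config: "cls" is a registry key string or a BaseReward subclass
        "a": {"cls": "registered_reward", "params": {"threshold": 0.5}},
        # 2) bare registry key string, constructed with the shared ``params``
        "b": "another_registered_reward",
        # 3) pre-built BaseReward instance, used as-is
        "c": my_reward_instance,
        # 4) BaseReward subclass, constructed with the shared ``params``
        "d": MyReward,
    },
)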

async_evaluate(sample, semaphore=None) async

Async version of evaluate method that supports async parallel execution.

Parameters:

sample (DataSample): Input data sample to evaluate. Required.
semaphore (asyncio.Semaphore | None): Optional semaphore for concurrency control. Default: None.

Returns:

DataSample: The input sample with updated reward information.

Source code in rm_gallery/core/reward/composition.py
async def async_evaluate(
    self, sample: DataSample, semaphore: asyncio.Semaphore | None = None
) -> DataSample:
    """
    Async version of evaluate method that supports async parallel execution.

    Args:
        sample: Input data sample to evaluate
        semaphore: Optional semaphore for concurrency control

    Returns:
        DataSample with updated reward information
    """
    if semaphore is None:
        semaphore = asyncio.Semaphore(self.max_workers)

    sample = deepcopy(sample)

    async def _async_evaluate_reward(name: str, reward: BaseReward):
        """Async wrapper for individual reward evaluation"""
        return await reward.async_evaluate(sample, semaphore)

    # Create tasks for all reward modules
    tasks = []
    for name, reward in self.rewards.items():
        task = asyncio.create_task(_async_evaluate_reward(name, reward))
        tasks.append(task)

    # Wait for all tasks to complete
    samples = await asyncio.gather(*tasks)

    # Merge results from parallel evaluations
    for s in samples:
        sample.update(s)

    # Weighted reward calculation function (executed for both parallel and sequential modes)
    def weight(reward: Reward):
        """Calculate weighted average based on configured weights"""
        w_sum = 0
        d_sum = 0
        for d in reward.details:
            w = self.weights.get(d.name, 1.0)
            w_sum += w
            d_sum += w * d.score
        if w_sum != 0:
            reward.score = d_sum / w_sum

    # Apply weighting to all output rewards
    for output in sample.output:
        weight(output.answer.reward)
        if output.steps:
            for step in output.steps:
                weight(step.reward)

    return sample
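
Note that, unlike the synchronous path, this method always fans all reward modules out concurrently via asyncio.gather, regardless of is_parallel. A usage sketch with the composition from the earlier example:

import asyncio

scored = asyncio.run(composition.async_evaluate(sample))
# Or bound concurrency explicitly:
scored = asyncio.run(composition.async_evaluate(sample, asyncio.Semaphore(4)))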

evaluate(sample, thread_pool=None)

Evaluate rewards using configured modules with optional parallel execution.

Parameters:

sample (DataSample): Input data sample to evaluate. Required.
thread_pool (ThreadPoolExecutor | None): Optional thread pool executor for parallel execution. Default: None.

Returns:

DataSample: The input sample with updated reward information.

Source code in rm_gallery/core/reward/composition.py
def evaluate(
    self, sample: DataSample, thread_pool: ThreadPoolExecutor | None = None
) -> DataSample:
    """
    Evaluate rewards using configured modules with optional parallel execution.

    Args:
        sample: Input data sample to evaluate
        thread_pool: Optional thread pool executor for parallel execution

    Returns:
        DataSample with updated reward information
    """
    # Parallel evaluation using thread pool
    if self.is_parallel and thread_pool is not None:
        sample = deepcopy(sample)
        futures = []
        for name, reward in self.rewards.items():
            futures.append(
                thread_pool.submit(
                    reward.evaluate, sample=sample, thread_pool=thread_pool
                )
            )

        wait(futures, return_when=ALL_COMPLETED)
        samples = [future.result() for future in futures]

        # Merge results from parallel evaluations
        for s in samples:
            sample.update(s)

    # Sequential evaluation mode
    else:
        for name, reward in self.rewards.items():
            sample = reward.evaluate(sample, thread_pool)

    # Weighted reward calculation function (executed for both parallel and sequential modes)
    def weight(reward: Reward):
        """Calculate weighted average based on configured weights"""
        w_sum = 0
        d_sum = 0
        for d in reward.details:
            w = self.weights.get(d.name, 1.0)
            w_sum += w
            d_sum += w * d.score
        if w_sum != 0:
            reward.score = d_sum / w_sum

    # Apply weighting to all output rewards
    for output in sample.output:
        weight(output.answer.reward)
        if output.steps:
            for step in output.steps:
                weight(step.reward)

    return sample
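
Parallel evaluation happens only when both is_parallel is true and a thread_pool is supplied; otherwise modules run sequentially, each seeing the previous module's updates. A usage sketch with the composition from the earlier example:

from concurrent.futures import ThreadPoolExecutor

# Sequential: modules run one after another on the same sample.
scored = composition.evaluate(sample)

# Parallel: each module scores the deep copy; results are merged back via sample.update().
with ThreadPoolExecutor(max_workers=4) as pool:
    scored = composition.evaluate(sample, thread_pool=pool)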