format

BasePointWiseReward

Bases: BaseReward

Point-wise reward module for individual response evaluation.

Evaluates each response independently without considering relative ranking.

Source code in rm_gallery/core/reward/base.py
class BasePointWiseReward(BaseReward):
    """
    Point-wise reward module for individual response evaluation.

    Evaluates each response independently without considering relative ranking.
    """

    @abstractmethod
    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Processes a single response to generate reward metrics.

        Parameters:
            sample (DataSample): Single-response data sample
            **kwargs: Evaluation parameters

        Returns:
            RewardResult[RewardDimensionWithScore]: Response-specific reward metrics
        """
        ...

    def _parallel(
        self,
        func: Callable,
        sample: DataSample,
        thread_pool: ThreadPoolExecutor | None = None,
        **kwargs,
    ) -> DataSample:
        """
        Processes responses in a data sample using parallel or sequential execution.

        This method applies the provided function to each response in the sample,
        either in parallel using a thread pool or sequentially. Results are merged
        back into the corresponding response objects.

        Parameters:
            func (Callable): Function to apply to each response. Should accept a
                DataSample and return an object with 'details' and 'extra_data' attributes.
            sample (DataSample): Input sample containing multiple responses to process
            thread_pool (ThreadPoolExecutor | None): Optional thread pool for parallel execution
            **kwargs: Additional arguments passed to func

        Returns:
            DataSample: Modified copy of input sample with reward metrics updated in each response

        The method creates a deep copy of the input sample to avoid modifying original data.
        When using a thread pool, it submits tasks for each response and waits for completion
        before merging results. Response objects are updated with both reward details and
        additional metadata from processing results.
        """
        sample = sample.model_copy(deep=True)
        futures = []
        for i, output in enumerate(sample.output):
            # Create sub-sample for individual response processing
            subsample = DataSample(
                unique_id=sample.unique_id, input=sample.input, output=[output]
            )

            if thread_pool:
                futures.append(
                    (
                        i,
                        thread_pool.submit(
                            func, sample=subsample, thread_pool=thread_pool, **kwargs
                        ),
                    )
                )
            else:
                result = func(
                    sample=subsample,
                    thread_pool=thread_pool,
                    **kwargs,
                )
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        # Process parallel execution results
        if thread_pool:
            wait([future[-1] for future in futures], return_when=ALL_COMPLETED)
            # Merge results back into sample outputs
            for i, future in futures:
                result = future.result()
                output = sample.output[i]
                output.answer.reward.details += result.details
                output.answer.additional_kwargs[self.name] = result.extra_data

        for output in sample.output:
            if len(output.answer.reward.details) > 0:
                output.answer.reward.score = sum(
                    r.score for r in output.answer.reward.details
                ) / len(output.answer.reward.details)

        return sample
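
As a concrete illustration, a custom point-wise reward only needs to implement _evaluate on a single-response sub-sample; _parallel fans out over sample.output and averages the detail scores into answer.reward.score. The sketch below is illustrative only: it assumes the classes can be imported from the module paths given in the "Source code in" notes, and ContainsKeywordReward is a hypothetical example, not part of the gallery.

from pydantic import Field

from rm_gallery.core.data.schema import DataSample
from rm_gallery.core.reward.base import BasePointWiseReward
from rm_gallery.core.reward.schema import RewardDimensionWithScore, RewardResult


class ContainsKeywordReward(BasePointWiseReward):
    """Toy reward: 1.0 if the response mentions a keyword, 0.0 otherwise."""

    name: str = Field(default="contains_keyword", description="Keyword reward")
    keyword: str = Field(default="because", description="Keyword to look for")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        # Point-wise rewards always receive a single-response sub-sample.
        content = sample.output[0].answer.content
        hit = self.keyword.lower() in content.lower()
        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=1.0 if hit else 0.0,
                    reason=f"keyword '{self.keyword}' {'found' if hit else 'missing'}",
                )
            ],
            extra_data={"keyword": self.keyword, "hit": hit},
        )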

DataSample

Bases: BaseModel

Complete data sample structure for reward modeling training and evaluation.

Represents a single interaction with input context, multiple possible outputs, and associated metadata for comprehensive reward model training.

Attributes:

    unique_id (str): Unique identifier for tracking and deduplication
    input (List[ChatMessage]): Conversation context as a list of chat messages
    output (List[DataOutput]): List of possible responses with evaluations
    task_category (Optional[str]): Optional categorization for task-specific analysis
    source (Optional[str]): Origin dataset or system that generated this sample
    created_at (datetime): Timestamp for temporal tracking
    metadata (Optional[Dict]): Additional context and debugging information

Source code in rm_gallery/core/data/schema.py
class DataSample(BaseModel):
    """
    Complete data sample structure for reward modeling training and evaluation.

    Represents a single interaction with input context, multiple possible outputs,
    and associated metadata for comprehensive reward model training.

    Attributes:
        unique_id: Unique identifier for tracking and deduplication
        input: Conversation context as list of chat messages
        output: List of possible responses with evaluations
        task_category: Optional categorization for task-specific analysis
        source: Origin dataset or system that generated this sample
        created_at: Timestamp for temporal tracking
        metadata: Additional context and debugging information
    """

    unique_id: str = Field(..., description="Unique identifier for the data")
    input: List[ChatMessage] = Field(default_factory=list, description="input")
    output: List[DataOutput] = Field(default_factory=list, description="output")
    task_category: Optional[str] = Field(default=None, description="task category")
    source: Optional[str] = Field(default=None, description="source")
    created_at: datetime = Field(default_factory=datetime.now, description="createdAt")
    metadata: Optional[Dict] = Field(default=None, description="metadata")

    def update(self, sample: "DataSample") -> "DataSample":
        """
        Merge another sample's data into this sample for combining evaluations.

        Updates additional_kwargs and reward details from the source sample
        while preserving the original structure.

        Args:
            sample: Source sample to merge data from

        Returns:
            Self with updated data for method chaining
        """
        self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
        for i, output in enumerate(self.output):
            output.answer.additional_kwargs.update(
                sample.output[i].answer.additional_kwargs
            )
            output.answer.reward.details.extend(sample.output[i].answer.reward.details)

            if output.steps:
                for j, step in enumerate(output.steps):
                    step.additional_kwargs.update(
                        sample.output[i].steps[j].additional_kwargs
                    )
                    step.reward.details.extend(sample.output[i].steps[j].reward.details)
        return self

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {datetime: lambda v: v.isoformat()}
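
For orientation, the sketch below constructs a minimal DataSample. Only the DataSample fields themselves are documented in this section; the ChatMessage, DataOutput and Step constructors used here are assumptions inferred from how they are accessed above (e.g. output.answer.content), so treat them as illustrative.

from rm_gallery.core.data.schema import ChatMessage, DataOutput, DataSample, Step  # Step/DataOutput assumed

sample = DataSample(
    unique_id="sample-001",
    input=[ChatMessage(role="user", content="Explain what a reward model is.")],  # assumed fields
    output=[
        DataOutput(  # assumed constructor: a single answer step with text content
            answer=Step(content="A reward model scores candidate responses."),
        )
    ],
    source="docs-example",
    task_category="explanation",
)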

update(sample)

Merge another sample's data into this sample for combining evaluations.

Updates additional_kwargs and reward details from the source sample while preserving the original structure.

Parameters:

    sample (DataSample): Source sample to merge data from (required)

Returns:

    DataSample: Self with updated data for method chaining

Source code in rm_gallery/core/data/schema.py
def update(self, sample: "DataSample") -> "DataSample":
    """
    Merge another sample's data into this sample for combining evaluations.

    Updates additional_kwargs and reward details from the source sample
    while preserving the original structure.

    Args:
        sample: Source sample to merge data from

    Returns:
        Self with updated data for method chaining
    """
    self.input[-1].additional_kwargs.update(sample.input[-1].additional_kwargs)
    for i, output in enumerate(self.output):
        output.answer.additional_kwargs.update(
            sample.output[i].answer.additional_kwargs
        )
        output.answer.reward.details.extend(sample.output[i].answer.reward.details)

        if output.steps:
            for j, step in enumerate(output.steps):
                step.additional_kwargs.update(
                    sample.output[i].steps[j].additional_kwargs
                )
                step.reward.details.extend(sample.output[i].steps[j].reward.details)
    return self

LengthPenaltyReward

Bases: BasePointWiseReward

Text length based penalty for content that is too short or too long.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("length_penalty")
class LengthPenaltyReward(BasePointWiseReward):
    """
    Text length based penalty for content that is too short or too long.
    """

    name: str = Field(default="length_penalty", description="Length penalty reward")
    min_length: int = Field(default=10, description="Minimum length")
    max_length: int = Field(default=1000, description="Maximum length")
    penalty_rate: float = Field(default=0.01, description="Penalty rate")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Apply a length-based penalty for content that is too short or too long.

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing length penalty score
        """
        content = sample.output[0].answer.content
        length = len(content)

        penalty = 0.0
        reason_parts = []

        if length < self.min_length:
            penalty = -(self.min_length - length) * self.penalty_rate
            reason_parts.append(f"Too short: {length} < {self.min_length}")
        elif length > self.max_length:
            penalty = -(length - self.max_length) * self.penalty_rate
            reason_parts.append(f"Too long: {length} > {self.max_length}")
        else:
            reason_parts.append(
                f"Length acceptable: {self.min_length} <= {length} <= {self.max_length}"
            )

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=penalty, reason="; ".join(reason_parts)
                )
            ],
            extra_data={
                "length": length,
                "min_length": self.min_length,
                "max_length": self.max_length,
                "penalty": penalty,
            },
        )
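
To make the scoring concrete, the snippet below mirrors the penalty arithmetic from _evaluate as a standalone function (it does not call the library): responses shorter than min_length or longer than max_length receive a negative score proportional to how far they overshoot, and anything in range scores 0.

def length_penalty(text: str, min_length: int = 10, max_length: int = 1000,
                   penalty_rate: float = 0.01) -> float:
    # Standalone sketch of the formula used by LengthPenaltyReward above.
    n = len(text)
    if n < min_length:
        return -(min_length - n) * penalty_rate
    if n > max_length:
        return -(n - max_length) * penalty_rate
    return 0.0


print(length_penalty("ok"))        # -0.08 (8 characters short of the minimum)
print(length_penalty("x" * 1250))  # -2.5  (250 characters over the maximum)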

NgramRepetitionPenaltyReward

Bases: BasePointWiseReward

Calculate N-gram repetition penalty supporting Chinese processing and multiple penalty strategies.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("ngram_repetition_penalty")
class NgramRepetitionPenaltyReward(BasePointWiseReward):
    """
    Calculate N-gram repetition penalty supporting Chinese processing and multiple penalty strategies.
    """

    name: str = Field(
        default="ngram_repetition_penalty",
        description="N-gram repetition penalty reward",
    )
    n: int = Field(default=3, description="N value for N-gram")

    # Hard threshold penalty parameters
    penalty_threshold: float = Field(
        default=0.3, description="Repetition rate threshold (hard threshold mode)"
    )
    penalty_rate: float = Field(
        default=1.0, description="Penalty multiplier (hard threshold mode)"
    )

    # Soft penalty parameters
    use_soft_penalty: bool = Field(
        default=False, description="Whether to use soft penalty mode"
    )
    max_penalty: float = Field(
        default=-1.0,
        description="Maximum penalty value (soft penalty mode, should be negative)",
    )
    min_scaling: float = Field(
        default=0.0, description="Minimum scaling threshold (soft penalty mode)"
    )

    # Tokenizer parameters
    tokenizer_type: str = Field(
        default="tiktoken",
        description="Tokenizer type: 'tiktoken', 'jieba', or 'simple'",
    )
    encoding_name: str = Field(
        default="cl100k_base",
        description="Tiktoken encoding name (for tiktoken tokenizer)",
    )
    chinese_only: bool = Field(
        default=False,
        description="Whether to keep only Chinese characters (for jieba tokenizer)",
    )

    # Analysis scope parameters
    analyze_scope: str = Field(
        default="full",
        description="Analysis scope: 'full' or 'thought' (thought process only)",
    )

    def __init__(self, **data):
        super().__init__(**data)
        # Initialize tokenizer
        self._tokenizer = get_tokenizer(
            tokenizer_type=self.tokenizer_type,
            encoding_name=self.encoding_name,
            chinese_only=self.chinese_only,
        )

    def _extract_thought_process(self, content: str) -> str:
        """Extract thought process"""
        think_pattern = r"<think>(.*?)</think>"
        matches = re.findall(think_pattern, content, re.DOTALL)
        return " ".join(matches) if matches else ""

    def _generate_ngrams(self, tokens: List[str]) -> List[tuple]:
        """Generate N-grams"""
        if len(tokens) < self.n:
            return []

        # Use unified approach for all tokenizers
        ngrams = []
        for i in range(len(tokens) - self.n + 1):
            ngrams.append(tuple(tokens[i : i + self.n]))
        return ngrams

    def _calculate_penalty(self, repetition_rate: float) -> float:
        """Calculate penalty value"""
        if self.use_soft_penalty:
            # Soft penalty mode
            if self.max_penalty > 0:
                raise ValueError(
                    f"max_penalty {self.max_penalty} should not be positive"
                )

            scaling = repetition_rate
            if scaling < self.min_scaling:
                scaling = 0.0
            elif scaling > self.min_scaling:
                scaling = (scaling - self.min_scaling) / (1 - self.min_scaling)

            return scaling * self.max_penalty
        else:
            # Hard threshold mode (original logic)
            if repetition_rate > self.penalty_threshold:
                return -(repetition_rate - self.penalty_threshold) * self.penalty_rate
            return 0.0

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Calculate N-gram repetition penalty

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing N-gram repetition penalty score
        """
        content = sample.output[0].answer.content

        # Select text based on analysis scope
        if self.analyze_scope == "thought":
            text_to_analyze = self._extract_thought_process(content)
            if not text_to_analyze:
                return RewardResult(
                    name=self.name,
                    details=[
                        RewardDimensionWithScore(
                            name=self.name,
                            score=0.0,
                            reason="No thought process found to analyze",
                        )
                    ],
                    extra_data={
                        "analyze_scope": self.analyze_scope,
                        "text_to_analyze": text_to_analyze,
                    },
                )
        else:
            text_to_analyze = content

        # Tokenization using unified tokenizer
        preprocessed_text = self._tokenizer.preprocess_text(
            text_to_analyze,
            to_lower=(
                self.tokenizer_type != "jieba"
            ),  # Keep case for Chinese tokenization
        )
        tokens = self._tokenizer.tokenize(preprocessed_text)

        if len(tokens) < self.n:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason=f"Text too short for {self.n}-gram analysis",
                    )
                ],
                extra_data={
                    "token_count": len(tokens),
                    "n": self.n,
                    "analyze_scope": self.analyze_scope,
                    "tokenizer_type": self.tokenizer_type,
                },
            )

        # Generate N-grams
        ngrams = self._generate_ngrams(tokens)

        if not ngrams:
            return RewardResult(
                name=self.name,
                details=[
                    RewardDimensionWithScore(
                        name=self.name,
                        score=0.0,
                        reason="No ngrams generated",
                    )
                ],
                extra_data={
                    "token_count": len(tokens),
                    "n": self.n,
                    "analyze_scope": self.analyze_scope,
                    "tokenizer_type": self.tokenizer_type,
                },
            )

        # Calculate repetition rate
        ngram_counts = Counter(ngrams)
        total_ngrams = len(ngrams)
        unique_ngrams = len(ngram_counts)
        repetition_rate = (
            1 - (unique_ngrams / total_ngrams) if total_ngrams > 0 else 0.0
        )

        # Calculate penalty
        penalty = self._calculate_penalty(repetition_rate)

        # Build reason description
        penalty_mode = "soft" if self.use_soft_penalty else "hard"

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name,
                    score=penalty,
                    reason=f"{self.n}-gram repetition rate: {repetition_rate:.3f}, penalty: {penalty:.3f} ({penalty_mode} penalty, {self.tokenizer_type} tokenizer, scope: {self.analyze_scope})",
                )
            ],
            extra_data={
                "repetition_rate": repetition_rate,
                "unique_ngrams": unique_ngrams,
                "total_ngrams": total_ngrams,
                "penalty": penalty,
                "most_common_ngrams": ngram_counts.most_common(5),
                "analyze_scope": self.analyze_scope,
                "tokenizer_type": self.tokenizer_type,
                "use_soft_penalty": self.use_soft_penalty,
                "penalty_mode": penalty_mode,
            },
        )
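
The repetition rate is 1 - unique_ngrams / total_ngrams, and in hard-threshold mode a penalty is applied only above penalty_threshold. The standalone sketch below mirrors that calculation on a whitespace-tokenized string; the reward itself tokenizes with the tokenizer returned by get_tokenizer.

def ngram_repetition_penalty(tokens, n=3, penalty_threshold=0.3, penalty_rate=1.0,
                             use_soft_penalty=False, max_penalty=-1.0, min_scaling=0.0):
    # Sketch of the repetition-rate / penalty logic above (not the library call).
    ngrams = [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]
    if not ngrams:
        return 0.0
    repetition_rate = 1 - len(set(ngrams)) / len(ngrams)
    if use_soft_penalty:
        scaling = 0.0 if repetition_rate < min_scaling else \
            (repetition_rate - min_scaling) / (1 - min_scaling)
        return scaling * max_penalty
    if repetition_rate > penalty_threshold:
        return -(repetition_rate - penalty_threshold) * penalty_rate
    return 0.0


tokens = "the cat sat on the mat the cat sat on the mat".split()
print(ngram_repetition_penalty(tokens))  # repetition rate 0.4 -> penalty ~ -0.1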

PrivacyLeakageReward

Bases: BasePointWiseReward

Privacy information leakage detection for emails, phone numbers, ID cards, credit cards, and IP addresses.

This reward checks for potential privacy leaks in the generated content, including email addresses, phone numbers, ID numbers, credit card numbers, and IP addresses. Applies penalties for each detected leak.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("privacy_leakage")
class PrivacyLeakageReward(BasePointWiseReward):
    """
    Privacy information leakage detection for emails, phone numbers, ID cards, credit cards, and IP addresses.

    This reward checks for potential privacy leaks in the generated content,
    including email addresses, phone numbers, ID numbers, credit card numbers,
    and IP addresses. Applies penalties for each detected leak.
    """

    name: str = Field(
        default="privacy_leakage", description="Privacy leakage detection reward"
    )
    penalty_per_leak: float = Field(default=-0.5, description="Penalty per leak")

    def _detect_privacy_leaks(self, text: str) -> List[Dict[str, str]]:
        """Detect privacy information leaks"""
        leaks = []

        # Email addresses
        email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
        emails = re.findall(email_pattern, text)
        for email in emails:
            leaks.append({"type": "email", "value": email})

        # Phone numbers (simple pattern)
        phone_pattern = (
            r"\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b"
        )
        phones = re.findall(phone_pattern, text)
        for phone in phones:
            leaks.append({"type": "phone", "value": phone})

        # ID numbers (China)
        id_pattern = r"\b[1-9]\d{5}(?:18|19|20)\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])\d{3}[0-9Xx]\b"
        ids = re.findall(id_pattern, text)
        for id_num in ids:
            leaks.append({"type": "id_card", "value": id_num})

        # Credit card numbers (simple detection)
        credit_card_pattern = r"\b(?:\d{4}[-\s]?){3}\d{4}\b"
        cards = re.findall(credit_card_pattern, text)
        for card in cards:
            leaks.append({"type": "credit_card", "value": card})

        # IP addresses
        ip_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
        ips = re.findall(ip_pattern, text)
        for ip in ips:
            # Exclude common non-sensitive IPs (like localhost)
            if not ip.startswith(("127.", "192.168.", "10.", "172.")):
                leaks.append({"type": "ip_address", "value": ip})

        return leaks

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Detect privacy leaks.

        Args:
            sample: Data sample containing text content

        Returns:
            RewardResult: Reward result containing privacy leak penalty score
        """
        content = sample.output[0].answer.content

        leaks = self._detect_privacy_leaks(content)
        penalty = len(leaks) * self.penalty_per_leak

        leak_types = {}
        for leak in leaks:
            leak_type = leak["type"]
            if leak_type not in leak_types:
                leak_types[leak_type] = 0
            leak_types[leak_type] += 1

        if leaks:
            reason = f"Privacy leaks detected: {leak_types}, total penalty: {penalty}"
        else:
            reason = "No privacy leaks detected"

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(name=self.name, score=penalty, reason=reason)
            ],
            extra_data={
                "leaks": leaks,
                "leak_types": leak_types,
                "total_leaks": len(leaks),
                "penalty": penalty,
            },
        )
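
The score is simply the number of detected leaks multiplied by penalty_per_leak (default -0.5). The sketch below reuses two of the regexes above outside the library to show that arithmetic.

import re

EMAIL = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"  # same email pattern as above
IP = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"                       # same IP pattern as above


def privacy_penalty(text: str, penalty_per_leak: float = -0.5) -> float:
    leaks = re.findall(EMAIL, text)
    leaks += [ip for ip in re.findall(IP, text)
              if not ip.startswith(("127.", "192.168.", "10.", "172."))]
    return len(leaks) * penalty_per_leak


print(privacy_penalty("Contact alice@example.com from 8.8.8.8"))  # -1.0 (two leaks)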

ReasoningFormatReward

Bases: BasePointWiseReward

Check format reward for thinking format and answer format with proper tags.

This reward verifies if the generated content follows the required format with proper <think> and <answer> tags.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("reasoning_format")
class ReasoningFormatReward(BasePointWiseReward):
    """
    Check format reward for thinking format and answer format with proper tags.

    This reward verifies if the generated content follows the required format
    with proper <think> and <answer> tags.
    """

    name: str = Field(default="format_reward", description="Reasoning Format reward")
    think_token: str = Field(default="think", description="Think tag name")
    answer_token: str = Field(default="answer", description="Answer tag name")

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check format and calculate reward.

        Args:
            sample: Data sample containing generated content

        Returns:
            RewardResult: Reward result containing format score
        """
        content = sample.output[0].answer.content

        # Check thinking format tags
        think_pattern = f"<{self.think_token}>.*?</{self.think_token}>"
        has_think_tag = bool(re.search(think_pattern, content, re.DOTALL))

        # Check answer format tags
        answer_pattern = f"<{self.answer_token}>.*?</{self.answer_token}>"
        has_answer_tag = bool(re.search(answer_pattern, content, re.DOTALL))

        # Calculate reward
        reward = 1.0 if has_think_tag and has_answer_tag else 0.0
        reasons = []

        if not has_think_tag:
            reasons.append(f"Missing <{self.think_token}></{self.think_token}> tags")

        if not has_answer_tag:
            reasons.append(f"Missing <{self.answer_token}></{self.answer_token}> tags")

        if reward == 1.0:
            reasons.append("All format requirements met")

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=reward, reason="; ".join(reasons)
                )
            ],
            extra_data={
                "has_think_tag": has_think_tag,
                "has_answer_tag": has_answer_tag,
                "total_reward": reward,
                "think_token": self.think_token,
                "answer_token": self.answer_token,
            },
        )
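
Concretely, a response scores 1.0 only when both tag pairs are present. The sketch below applies the same regex checks outside the library.

import re


def reasoning_format_score(content: str, think_token: str = "think",
                           answer_token: str = "answer") -> float:
    # Same checks as _evaluate above: both tag pairs must appear.
    has_think = bool(re.search(f"<{think_token}>.*?</{think_token}>", content, re.DOTALL))
    has_answer = bool(re.search(f"<{answer_token}>.*?</{answer_token}>", content, re.DOTALL))
    return 1.0 if has_think and has_answer else 0.0


print(reasoning_format_score("<think>step by step</think><answer>42</answer>"))  # 1.0
print(reasoning_format_score("<answer>42</answer>"))                             # 0.0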

ReasoningToolCallFormatReward

Bases: BasePointWiseReward

Check tool call format including think, answer and tool_call tags with JSON validation.

This reward verifies if the generated content follows the required format with proper <think>, <answer> and <tool_call> tags, including JSON validation for tool calls.

Source code in rm_gallery/gallery/rm/format/format.py
@RewardRegistry.register("reasoning_tool_call_format")
class ReasoningToolCallFormatReward(BasePointWiseReward):
    """
    Check tool call format including think, answer and tool_call tags with JSON validation.

    This reward verifies if the generated content follows the required format
    with proper <think>, <answer> and <tool_call> tags, including JSON validation
    for tool calls.
    """

    name: str = Field(
        default="tool_call_format", description="Reasoning tool call format reward"
    )

    def _evaluate(
        self, sample: DataSample, **kwargs
    ) -> RewardResult[RewardDimensionWithScore]:
        """
        Check tool call format and calculate reward.

        Args:
            sample: Data sample containing generated content

        Returns:
            RewardResult: Reward result containing format score
        """
        content = sample.output[0].answer.content

        # Extract tag contents
        think_pattern = r"<think>(.*?)</think>"
        answer_pattern = r"<answer>(.*?)</answer>"
        tool_call_pattern = r"<tool_call>(.*?)</tool_call>"

        think_matches = re.search(think_pattern, content, re.DOTALL)
        answer_matches = re.search(answer_pattern, content, re.DOTALL)
        tool_call_matches = re.findall(tool_call_pattern, content, re.DOTALL)

        has_think_tag = think_matches is not None
        has_answer_tag = answer_matches is not None
        has_tool_call_tag = len(tool_call_matches) > 0

        valid_format = False
        valid_tool_call_json = False
        reasons = []

        if has_think_tag:
            # Case 1: <think></think> + <answer></answer>
            if has_answer_tag and not has_tool_call_tag:
                # Check overall format
                format_pattern = r"^\s*<think>.*?</think>\s*<answer>.*?</answer>\s*$"
                valid_format = bool(re.match(format_pattern, content, re.DOTALL))

                # Check tag occurrence count
                if valid_format:
                    valid_format = (
                        content.count("<think>") == 1
                        and content.count("</think>") == 1
                        and content.count("<answer>") == 1
                        and content.count("</answer>") == 1
                    )

                if valid_format:
                    reasons.append("Valid <think></think> + <answer></answer> format")
                else:
                    reasons.append("Invalid <think></think> + <answer></answer> format")

            # Case 2: <think></think> + <tool_call></tool_call>
            elif has_tool_call_tag and not has_answer_tag:
                # Check overall format
                format_pattern = (
                    r"^\s*<think>.*?</think>\s*(?:<tool_call>.*?</tool_call>\s*)+$"
                )
                valid_format = bool(re.match(format_pattern, content, re.DOTALL))

                # Check <think> tag occurrence count
                if valid_format:
                    valid_format = (
                        content.count("<think>") == 1 and content.count("</think>") == 1
                    )

                # Check if <tool_call> and </tool_call> tags appear in pairs
                if valid_format:
                    if content.count("<tool_call>") != content.count("</tool_call>"):
                        valid_format = False

                # Check for consecutive duplicate tags
                if valid_format:
                    if re.search(r"</tool_call>\s*</tool_call>", content) or re.search(
                        r"<tool_call>\s*<tool_call>", content
                    ):
                        valid_format = False

                # Check tool_call JSON format
                valid_tool_call_json = True
                tool_calls = []
                if valid_format:
                    for tool_call_content in tool_call_matches:
                        try:
                            tool_call_json = json.loads(tool_call_content.strip())
                            # Check if JSON contains required fields
                            if not (
                                "name" in tool_call_json
                                and "arguments" in tool_call_json
                            ):
                                valid_tool_call_json = False
                                break
                            tool_calls.append(
                                {
                                    "function": {
                                        "name": tool_call_json["name"],
                                        "arguments": json.dumps(
                                            tool_call_json["arguments"],
                                            ensure_ascii=False,
                                        ),
                                    }
                                }
                            )
                        except json.JSONDecodeError:
                            valid_tool_call_json = False
                            break

                valid_format = valid_format and valid_tool_call_json

                if valid_format:
                    reasons.append(
                        "Valid <think></think> + <tool_call></tool_call> format with valid JSON"
                    )
                else:
                    if not valid_tool_call_json:
                        reasons.append("Invalid JSON format in <tool_call> tags")
                    else:
                        reasons.append(
                            "Invalid <think></think> + <tool_call></tool_call> format"
                        )
            else:
                # Has both answer and tool_call, or neither
                reasons.append(
                    "Invalid combination: should have either <answer> or <tool_call> tags, not both or neither"
                )
        else:
            reasons.append("Missing <think></think> tags")

        # Calculate reward score
        reward = 1.0 if valid_format else 0.0

        return RewardResult(
            name=self.name,
            details=[
                RewardDimensionWithScore(
                    name=self.name, score=reward, reason="; ".join(reasons)
                )
            ],
            extra_data={
                "has_think_tag": has_think_tag,
                "has_answer_tag": has_answer_tag,
                "has_tool_call_tag": has_tool_call_tag,
                "valid_format": valid_format,
                "valid_tool_call_json": valid_tool_call_json,
                "tool_call_count": len(tool_call_matches),
                "reward": reward,
            },
        )
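
Two shapes are accepted: <think>...</think> followed by exactly one <answer>...</answer>, or <think>...</think> followed by one or more <tool_call> blocks whose bodies are JSON objects with "name" and "arguments" fields. The sketch below condenses the tool-call branch of that check outside the library.

import json
import re


def tool_call_format_ok(content: str) -> bool:
    # Condensed sketch of the <think> + <tool_call> branch above.
    if not re.match(r"^\s*<think>.*?</think>\s*(?:<tool_call>.*?</tool_call>\s*)+$",
                    content, re.DOTALL):
        return False
    for body in re.findall(r"<tool_call>(.*?)</tool_call>", content, re.DOTALL):
        try:
            call = json.loads(body.strip())
        except json.JSONDecodeError:
            return False
        if "name" not in call or "arguments" not in call:
            return False
    return True


good = '<think>look it up</think><tool_call>{"name": "search", "arguments": {"q": "rm"}}</tool_call>'
print(tool_call_format_ok(good))                      # True
print(tool_call_format_ok("<think>no call</think>"))  # False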

RewardDimensionWithScore

Bases: RewardDimension

Pointwise/Stepwise reward dimension with a numerical score.

Attributes:

    score (float): Numerical value representing the reward magnitude

Source code in rm_gallery/core/reward/schema.py
class RewardDimensionWithScore(RewardDimension):
    """
    Pointwise/Stepwise reward dimension with a numerical score.

    Attributes:
        score (float): Numerical value representing the reward magnitude
    """

    score: float = Field(default=..., description="score")

RewardRegistry

A registry management system for reward modules that maps module names to their corresponding implementation classes.

This class provides a centralized repository for registering and retrieving reward modules by string identifiers. Modules can be registered using decorators and later accessed by their string identifiers.

Attributes:

    _registry (Dict[str, Type[BaseReward]]): Internal dictionary storing the mapping between reward module names and their classes

Source code in rm_gallery/core/reward/registry.py
class RewardRegistry:
    """A registry management system for reward modules that maps module names to their corresponding implementation classes.

    This class provides a centralized repository for registering and retrieving reward modules by string identifiers.
    Modules can be registered using decorators and later accessed by their string identifiers.

    Attributes:
        _registry: Internal dictionary storing the mapping between reward module names and their classes.
    """

    # Dictionary mapping reward module names to their corresponding classes
    _registry: Dict[str, Type[BaseReward]] = {}

    @classmethod
    def register(cls, name: str):
        """Create a decorator to register a reward module class with a specified identifier.

        The decorator pattern allows classes to be registered while maintaining their original identity.

        Args:
            name: Unique string identifier for the reward module

        Returns:
            A decorator function that registers the module when applied to a class
        """

        def _register(module: Type[BaseReward]):
            """Internal registration function that stores the module in the registry.

            Args:
                module: The BaseReward subclass to be registered

            Returns:
                The original module class (unchanged)
            """
            cls._registry[name] = module
            return module

        return _register

    @classmethod
    def get(cls, name: str) -> Type[BaseReward] | None:
        """Retrieve a registered reward module class by its identifier.

        Provides safe access to registered modules without raising errors for missing entries.

        Args:
            name: String identifier of the reward module to retrieve

        Returns:
            The corresponding BaseReward subclass if found, None otherwise
        """
        assert name in cls._registry, f"Reward module '{name}' not found"
        return cls._registry.get(name, None)

    @classmethod
    def list(cls) -> str:
        """
        Returns:
            A list of all registered reward modules
        """
        info = []
        for name, module in cls._registry.items():
            info.append(
                pd.Series(
                    {
                        "Name": name,
                        "Class": module.__name__,
                        "Scenario": module.__doc__.strip(),
                    }
                )
            )

        info_df = pd.concat(info, axis=1).T
        # info_str = info_df.to_markdown(index=False)
        info_str = tabulate(
            info_df,
            headers="keys",
            tablefmt="grid",
            maxcolwidths=[50] * (len(info_df.columns) + 1),
            # showindex=False,
        )
        # info_str = tabulate(info_df, headers='keys', tablefmt='github')
        return info_str
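
In practice the registry is used exactly as in the gallery classes above: decorate a BaseReward subclass with @RewardRegistry.register("..."), then look it up by name. A minimal sketch, assuming the import paths from the "Source code in" notes and that importing the gallery module executes its register decorators:

from rm_gallery.core.reward.registry import RewardRegistry

import rm_gallery.gallery.rm.format.format  # noqa: F401  (runs the @register decorators)

reward_cls = RewardRegistry.get("length_penalty")   # -> LengthPenaltyReward
reward = reward_cls(min_length=20, max_length=500)  # fields documented above
print(RewardRegistry.list())                        # grid table of registered rewards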

get(name) classmethod

Retrieve a registered reward module class by its identifier.

Note that although the return annotation allows None, the current implementation asserts that the name is registered, so requesting an unknown module raises an AssertionError rather than returning None.

Parameters:

    name (str): String identifier of the reward module to retrieve (required)

Returns:

    Type[BaseReward] | None: The corresponding BaseReward subclass

Source code in rm_gallery/core/reward/registry.py
@classmethod
def get(cls, name: str) -> Type[BaseReward] | None:
    """Retrieve a registered reward module class by its identifier.

    Provides safe access to registered modules without raising errors for missing entries.

    Args:
        name: String identifier of the reward module to retrieve

    Returns:
        The corresponding BaseReward subclass if found, None otherwise
    """
    assert name in cls._registry, f"Reward module '{name}' not found"
    return cls._registry.get(name, None)

list() classmethod

Returns:

    str: A formatted grid table listing all registered reward modules

Source code in rm_gallery/core/reward/registry.py
@classmethod
def list(cls) -> str:
    """
    Returns:
        A list of all registered reward modules
    """
    info = []
    for name, module in cls._registry.items():
        info.append(
            pd.Series(
                {
                    "Name": name,
                    "Class": module.__name__,
                    "Scenario": module.__doc__.strip(),
                }
            )
        )

    info_df = pd.concat(info, axis=1).T
    # info_str = info_df.to_markdown(index=False)
    info_str = tabulate(
        info_df,
        headers="keys",
        tablefmt="grid",
        maxcolwidths=[50] * (len(info_df.columns) + 1),
        # showindex=False,
    )
    # info_str = tabulate(info_df, headers='keys', tablefmt='github')
    return info_str

register(name) classmethod

Create a decorator to register a reward module class with a specified identifier.

The decorator pattern allows classes to be registered while maintaining their original identity.

Parameters:

    name (str): Unique string identifier for the reward module (required)

Returns:

    A decorator function that registers the module when applied to a class

Source code in rm_gallery/core/reward/registry.py
@classmethod
def register(cls, name: str):
    """Create a decorator to register a reward module class with a specified identifier.

    The decorator pattern allows classes to be registered while maintaining their original identity.

    Args:
        name: Unique string identifier for the reward module

    Returns:
        A decorator function that registers the module when applied to a class
    """

    def _register(module: Type[BaseReward]):
        """Internal registration function that stores the module in the registry.

        Args:
            module: The BaseReward subclass to be registered

        Returns:
            The original module class (unchanged)
        """
        cls._registry[name] = module
        return module

    return _register

RewardResult

Bases: BaseModel, Generic[T]

Container for reward calculation results with generic type support.

Attributes:

    name (str): Identifier of the reward module that generated this result
    details (List[T]): Collection of detailed reward information items
    extra_data (dict): Additional metadata or context information

Source code in rm_gallery/core/reward/schema.py
class RewardResult(BaseModel, Generic[T]):
    """
    Container for reward calculation results with generic type support.

    Attributes:
        name (str): Identifier of the reward module that generated this result
        details (List[T]): Collection of detailed reward information items
        extra_data (dict): Additional metadata or context information
    """

    name: str = Field(default=..., description="reward module name")
    details: List[T] = Field(default_factory=list, description="reward details")
    extra_data: dict = Field(default_factory=dict, description="extra data")
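
For reference, this is the object a point-wise _evaluate returns. The reason keyword on RewardDimensionWithScore comes from the RewardDimension base class (not shown in this section) and is used throughout the gallery rewards above.

from rm_gallery.core.reward.schema import RewardDimensionWithScore, RewardResult

result = RewardResult(
    name="length_penalty",
    details=[
        RewardDimensionWithScore(name="length_penalty", score=-0.5,
                                 reason="Too long: 1050 > 1000"),
    ],
    extra_data={"length": 1050, "penalty": -0.5},
)
print(result.details[0].score)  # -0.5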

get_tokenizer(tokenizer_type='tiktoken', encoding_name='cl100k_base', chinese_only=False, **kwargs)

Factory function to create tokenizer instances.

Parameters:

    tokenizer_type (str): Type of tokenizer ("tiktoken", "jieba", "simple"). Default: "tiktoken"
    encoding_name (str): Tiktoken encoding name (for the tiktoken tokenizer). Default: "cl100k_base"
    chinese_only (bool): Whether to keep only Chinese characters (for the jieba tokenizer). Default: False
    **kwargs: Additional arguments for tokenizer initialization

Returns:

    BaseTokenizer: Tokenizer instance

Raises:

    ValueError: If tokenizer_type is not supported

Source code in rm_gallery/core/utils/tokenizer.py
def get_tokenizer(
    tokenizer_type: str = "tiktoken",
    encoding_name: str = "cl100k_base",
    chinese_only: bool = False,
    **kwargs,
) -> BaseTokenizer:
    """
    Factory function to create tokenizer instances.

    Args:
        tokenizer_type: Type of tokenizer ("tiktoken", "jieba", "simple")
        encoding_name: Tiktoken encoding name (for tiktoken tokenizer)
        chinese_only: Whether to keep only Chinese characters (for jieba tokenizer)
        **kwargs: Additional arguments for tokenizer initialization

    Returns:
        BaseTokenizer: Tokenizer instance

    Raises:
        ValueError: If tokenizer_type is not supported
    """
    if tokenizer_type == "tiktoken":
        return TiktokenTokenizer(encoding_name=encoding_name, **kwargs)
    elif tokenizer_type == "jieba":
        return JiebaTokenizer(chinese_only=chinese_only, **kwargs)
    elif tokenizer_type == "simple":
        return SimpleTokenizer(**kwargs)
    else:
        raise ValueError(
            f"Unsupported tokenizer type: {tokenizer_type}. "
            f"Supported types: tiktoken, jieba, simple"
        )
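
A minimal usage sketch: preprocess_text and tokenize are the methods NgramRepetitionPenaltyReward calls above, and they are assumed here to be part of the common BaseTokenizer interface. The tiktoken and jieba backends require the corresponding packages to be installed; the simple backend has no extra dependency.

from rm_gallery.core.utils.tokenizer import get_tokenizer

tok = get_tokenizer(tokenizer_type="simple")
tokens = tok.tokenize(tok.preprocess_text("The cat sat on the mat.", to_lower=True))
print(tokens)

# Chinese-aware tokenization (requires jieba):
# zh_tok = get_tokenizer(tokenizer_type="jieba", chinese_only=True)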