Source code for trinity.common.workflows.math_trainable_ruler_workflow

# -*- coding: utf-8 -*-
"""Math workflow with trainable RULER."""
import ast
from copy import deepcopy
from typing import Any, List, Optional, Tuple

import numpy as np
import openai

from trinity.common.experience import Experience
from trinity.common.models.model import ModelWrapper
from trinity.common.rewards.math_reward import MathRewardFn
from trinity.common.workflows.workflow import WORKFLOWS, SimpleWorkflow, Task

# the probability that the ground truth is assumed to be available for RL
PROBABILITY_GROUND_TRUTH_AVAILABLE = 0.2


[docs] @WORKFLOWS.register_module("math_trainable_ruler_workflow") class MathTrainableRULERWorkflow(SimpleWorkflow): """A workflow for math, where the policy model itself serves as a RULER reward model. Modified from `MathRULERWorkflow`. RULER is adapted from https://github.com/OpenPipe/ART/blob/main/src/art/rewards/ruler.py """
[docs] def __init__( self, *, task: Task, model: ModelWrapper, auxiliary_models: Optional[List[openai.OpenAI]] = None, ): super().__init__( task=task, model=model, auxiliary_models=auxiliary_models, )
[docs] def reset(self, task: Task): if task.reward_fn is None: task.reward_fn = MathRewardFn if task.reward_fn == MathRewardFn and task.format_args.system_prompt is None: task.format_args.system_prompt = """A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>. """ # call the SimpleWorkflow.reset super().reset(task)
[docs] def run(self) -> List[Experience]: """Modified from MathRULERWorkflow.run""" # Part 1: generate responses to the original task (as in usual workflows) messages = self.format_messages() self.logger.debug("start chat") responses = self.model.chat(messages, **self.rollout_args) gold_rewards = [] gold_scores_scaled = [] for i, response in enumerate(responses): gold_reward_dict = self.reward_fn( # type: ignore [misc] response=response.response_text, # type: ignore [arg-type] truth=self.truth, ) if response.metrics is None: response.metrics = {} response.metrics.update(gold_reward_dict) gold_reward = sum(gold_reward_dict.values()) response.metrics.update({"gold_reward": gold_reward}) # set task_id explicitly within workflow! response.eid.task = str(self.task.task_id) response.eid.run = i + self.run_id_base gold_rewards.append(gold_reward) gold_scores_scaled.append( (gold_reward + 0.1) / 1.2 ) # scale from range [-0.1, 1.1] to [0, 1] # Part 2: get and use RULER scores ruler_rollout_args = deepcopy(self.rollout_args) ground_truth_is_available = np.random.rand() < PROBABILITY_GROUND_TRUTH_AVAILABLE if ground_truth_is_available: # Assuming that ground truth is accessible to RL: # - set exp's reward to gold reward # - generate RULER scores for repeat_times, construct ruler_responses # - return responses + ruler_responses judge_success_rate, ruler_responses, ruler_scores = self.get_ruler_responses( responses=responses, judger=self.model, # use the policy model itself as judger! ruler_rollout_args=ruler_rollout_args, gold_scores=gold_scores_scaled, ) for i, response in enumerate(responses): response.reward = gold_rewards[i] response.metrics.update({"judge_success": judge_success_rate}) for i, ruler_response in enumerate(ruler_responses): # set task_id explicitly, to distinguish two types of experiences! ruler_response.eid.task = str(self.task.task_id) + "-ruler" ruler_response.eid.run = i + self.run_id_base return responses + ruler_responses else: # Assuming that ground truth is not accessible to RL: # - generate RULER scores only once # - set exp's reward to RULER score # - return responses ruler_rollout_args["n"] = 1 judge_success_rate, ruler_responses, ruler_scores = self.get_ruler_responses( responses=responses, judger=self.model, # use the policy model itself as judger! ruler_rollout_args=ruler_rollout_args, gold_scores=None, ) for i, response in enumerate(responses): response.reward = ruler_scores[i] response.metrics.update({"judge_success": judge_success_rate}) return responses
[docs] def get_ruler_responses( self, responses: List[Experience], judger: Any, ruler_rollout_args: Any, gold_scores: Optional[List[float]] = None, ) -> Tuple[float, List[Experience], List[float]]: """Get RULER scores Returns: judge_success_rate: float ruler_responses: List[Experience] ruler_scores: List[float] """ num_responses = len(responses) # Step 1: format prompt for judge ruler_system_prompt = f"You are a fair judge. The user will provide a question and {num_responses} candidate solutions to it. Your task is to compare the solutions, see how well they resolve the question, and assign a score within the range [0, 1] for each solution." question_prompt = ( f"Question: {self.task_desc}\n\n" f"""Solution format requirement: first thinks about the reasoning process in the mind and then provides the final answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>.""" ) solutions_prompt_parts = [ f"Candidate solution {i + 1}: {response.response_text}" for i, response in enumerate(responses) ] solutions_prompt = "\n\n".join(solutions_prompt_parts) ruler_user_prompt = f""" Below is a question and several candidate solutions. {question_prompt} {solutions_prompt} Please assign a score within the range [0, 1] for each of them, reflecting how well they solve the question. You may compare them against each other and think step by step before returning your final scores, but keep your reasoning process brief and concise when possible. Conclude your response with a list of scores, in the following format: [score for solution 1, score for solution 2, ..., score for solution {num_responses}] """ # Step 2: invoke judger LLM (actually self.model), get ruler_responses: List[Experience] messages = [ {"role": "system", "content": ruler_system_prompt}, {"role": "user", "content": ruler_user_prompt}, ] ruler_responses = judger.chat(messages, **ruler_rollout_args) # Step 3: extract scores from each ruler_response, and update its reward if needed ruler_scores = [0.0 for _ in range(num_responses)] judge_success_count = 0 for ruler_response in ruler_responses: # default reward is 0; update if gold_scores is provided & judger returns valid scores ruler_response.reward = 0.0 ruler_response_text = ruler_response.response_text idx1, idx2 = ruler_response_text.rfind("["), ruler_response_text.rfind("]") if (idx1 == -1) or (idx2 == -1) or (idx1 > idx2): self.logger.warning("Unable to extract a list from judger response.") continue lst_as_str = ruler_response_text[idx1 : (idx2 + 1)] try: scores = ast.literal_eval(lst_as_str) scores = [max(0.0, min(1.0, score)) for score in scores] # clip to range [0, 1] if len(scores) == num_responses: judge_success_count += 1 ruler_scores = [ruler_scores[i] + scores[i] for i in range(len(ruler_scores))] if gold_scores: mae_error = np.abs(np.array(scores) - np.array(gold_scores)).mean() ruler_response.reward = 1.0 - mae_error else: self.logger.warning( "The length of list in judger response does not match num_responses." ) except Exception: self.logger.warning("Unable to parse the list in judger response.") if judge_success_count > 0: ruler_scores = [score / judge_success_count for score in ruler_scores] if len(ruler_responses) > 0: judge_success_rate = 1.0 * judge_success_count / len(ruler_responses) else: judge_success_rate = 0.0 for ruler_response in ruler_responses: if ruler_response.metrics is None: ruler_response.metrics = {} ruler_response.metrics.update({"judge_success": judge_success_rate}) ruler_response.metrics.update({"reward_for_judger": ruler_response.reward}) return judge_success_rate, ruler_responses, ruler_scores