Source code for trinity.common.workflows.step_wise_workflow

from abc import abstractmethod

import openai

from trinity.common.experience import Experience
from trinity.common.models.model import ModelWrapper
from trinity.common.workflows.workflow import Task, Workflow


class StepWiseRewardWorkflow(Workflow):
    """A workflow that implements step-wise rewards for tasks.

    ``run`` drives the agent application one step at a time, for at most
    ``max_step_num`` steps. After every step it pulls the experiences from
    the rollout model's history, scores them with ``reward`` and tags them
    with the index of the step that produced them.

    Subclasses must implement ``step``, ``reward`` and ``max_step_num``.
    """

    def __init__(
        self, *, task: Task, model: ModelWrapper, auxiliary_models=None, use_openai_client=True
    ):
        """Initialize the workflow and, optionally, an OpenAI client.

        Args:
            task (Task): The task to run; forwarded to the base ``Workflow``.
            model (ModelWrapper): The rollout model. Its ``enable_history``
                flag must be truthy.
            auxiliary_models: Forwarded unchanged to the base ``Workflow``.
            use_openai_client (bool): When true, ``self.client`` is set to
                ``model.get_openai_client()``; otherwise it is ``None``.

        Raises:
            AssertionError: If ``model.enable_history`` is falsy.
        """
        super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
        # Raised explicitly instead of via `assert` so the check is not stripped
        # when Python runs with -O. The exception type is kept as AssertionError
        # so existing callers that catch it keep working.
        if not model.enable_history:
            raise AssertionError(
                "Rollout Model must have history enabled for step-wise rewards, please "
                "set `explorer.rollout_model.enable_history` to `True` in your config."
            )
        # use the rollout model's OpenAI client to write your agent application
        if use_openai_client:
            self.client: openai.OpenAI = model.get_openai_client()
        else:
            self.client = None

    def run(self) -> list[Experience]:
        """Run the workflow and return a list of experiences with step-wise rewards.

        Returns:
            list[Experience]: The experiences of every executed step, in step
            order. Each one has ``reward`` set to that step's reward and
            ``eid.step`` set to the step index.
        """
        experiences = []
        for step in range(self.max_step_num):
            # Run a single step of the agent application
            continue_run = self.step(step_num=step)
            # Collect experiences data of the current step
            exps = self.model.extract_experience_from_history()
            # Calculate the reward for the current step
            reward = self.reward(exps, step_num=step)
            for exp in exps:
                exp.reward = reward
                # set the step number in each experience
                exp.eid.step = step
            # Store the step experiences
            experiences.extend(exps)
            # The experiences of the final step are recorded before stopping.
            if not continue_run:
                break
        return experiences

    @abstractmethod
    def step(self, step_num: int) -> bool:
        """Run a single step of your agent application.

        Args:
            step_num (int): The current step number.

        Returns:
            bool: Whether to continue running the agent application.

        Tips:
            You can use the openai client (`self.client`) to migrate your
            existing applications at low cost.
        """
        pass

    @abstractmethod
    def reward(self, exps: list[Experience], step_num: int) -> float:
        """Calculate the reward for the given experiences at the specified step.

        Args:
            exps (list[Experience]): The experiences collected for this step.
            step_num (int): The index of the step being scored.

        Returns:
            float: The reward assigned to every experience in ``exps``.
        """
        pass

    @property
    @abstractmethod
    def max_step_num(self) -> int:
        """Return the maximum number of steps in the task."""

    @property
    def repeatable(self) -> bool:
        """Return ``False``: this workflow reports itself as not repeatable."""
        return False
class RewardPropagationWorkflow(Workflow):
    """A workflow that propagates rewards across multiple turns.

    ``run`` drives the agent application one step at a time, for at most
    ``max_step_num`` steps, and gathers the experiences of every step. One
    reward is then computed for the whole run and assigned to all of them.

    Subclasses must implement ``step``, ``reward`` and ``max_step_num``.
    """

    def __init__(
        self, *, task: Task, model: ModelWrapper, auxiliary_models=None, use_openai_client=True
    ):
        """Initialize the workflow and, optionally, an OpenAI client.

        Args:
            task (Task): The task to run; forwarded to the base ``Workflow``.
            model (ModelWrapper): The rollout model. Its ``enable_history``
                flag must be truthy.
            auxiliary_models: Forwarded unchanged to the base ``Workflow``.
            use_openai_client (bool): When true, ``self.client`` is set to
                ``model.get_openai_client()``; otherwise it is ``None``.

        Raises:
            AssertionError: If ``model.enable_history`` is falsy.
        """
        super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
        # Raised explicitly instead of via `assert` so the check is not stripped
        # when Python runs with -O. The exception type is kept as AssertionError
        # so existing callers that catch it keep working.
        if not model.enable_history:
            raise AssertionError(
                "Rollout Model must have history enabled for step-wise rewards, please "
                "set `explorer.rollout_model.enable_history` to `True` in your config."
            )
        # use the rollout model's OpenAI client to write your agent application
        if use_openai_client:
            self.client: openai.OpenAI = model.get_openai_client()
        else:
            self.client = None

    def run(self) -> list[Experience]:
        """Run the workflow and return experiences sharing one run-level reward.

        Returns:
            list[Experience]: The experiences of every executed step, in step
            order. Each one has ``eid.step`` set to its step index, ``reward``
            set to the single value returned by ``reward``, and
            ``metrics["actual_env_steps"]`` set to the number of steps run.
        """
        experiences = []
        # Tracked explicitly so the value does not depend on the loop variable
        # still being bound after the loop (it is not when max_step_num is 0).
        steps_taken = 0
        for step in range(self.max_step_num):
            # Run a single step of the agent application
            continue_run = self.step(step_num=step)
            steps_taken = step + 1  # +1 because step starts from 0
            # Collect experiences data of the current step
            exps = self.model.extract_experience_from_history()
            # set the step number in each experience
            for exp in exps:
                exp.eid.step = step
            # Store the step experiences
            experiences.extend(exps)
            # The experiences of the final step are recorded before stopping.
            if not continue_run:
                break
        # One reward for the entire run, propagated to every experience.
        reward = self.reward(experiences)
        for exp in experiences:
            exp.reward = reward
            if exp.metrics is None:
                exp.metrics = {}
            exp.metrics["actual_env_steps"] = steps_taken
        return experiences

    @abstractmethod
    def step(self, step_num: int) -> bool:
        """Run a single step of your agent application.

        Args:
            step_num (int): The current step number.

        Returns:
            bool: Whether to continue running the agent application.

        Tips:
            You can use the openai client (`self.client`) to migrate your
            existing applications at low cost.
        """
        pass

    @abstractmethod
    def reward(self, exps: list[Experience]) -> float:
        """Calculate the reward for the given experiences of the entire run.

        Args:
            exps (list[Experience]): All experiences collected during the run.

        Returns:
            float: The reward assigned to every experience in ``exps``.
        """
        pass

    @property
    @abstractmethod
    def max_step_num(self) -> int:
        """Return the maximum number of steps in the task."""

    @property
    def repeatable(self) -> bool:
        """Return ``False``: this workflow reports itself as not repeatable."""
        return False