trinity.common.workflows package

Contents

trinity.common.workflows package#

Subpackages#

Submodules#

Module contents#

Workflow module

class trinity.common.workflows.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '', index: dict = <factory>)[source]#

Bases: dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow] | None = None#
repeat_times: int | None = None#
format_args: FormatConfig#
rollout_args: GenerationConfig#
workflow_args: dict#
reward_fn_args: dict#
is_eval: bool = False#
reward_fn: Type[RewardFn] | None = None#
raw_task: dict | None = None#
batch_id: int | str = ''#
task_id: int | str = ''#
index: dict#
to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]#

Convert the task to a workflow.

Parameters:
  • model (ModelWrapper) – The rollout model for the workflow.

  • auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.

Note

model_path attribute is added to the auxiliary_models for use within the workflow.

Returns:

The generated workflow object.

Return type:

Workflow

property task_desc: str | None#
property truth: str | None#
to_dict() dict[source]#
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '', index: dict = <factory>) None#
class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: object

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

can_reset: bool = False#
can_repeat: bool = False#
is_async: bool = False#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#

Deprecated, use cls.can_reset instead.

property repeatable#

Deprecated, use cls.can_repeat instead. A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property asynchronous#

Deprecated, use cls.is_async instead. Whether the workflow runs in async mode.

reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id used when setting run_id in experiences.

run() List[Experience][source]#

Run workflow and return a list of experiences.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: BaseSimpleWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: BaseSimpleWorkflow

A workflow for simple single-round tasks.

can_reset: bool = True#
can_repeat: bool = True#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AsyncMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: AsyncSimpleWorkflow, MathWorkflow

class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for webshop task.

can_reset: bool = True#
is_async: bool = True#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, session_id, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for alfworld task.

is_async: bool = True#
can_repeat: bool = False#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#

Bases: RewardPropagationWorkflow

An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.

This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
run() List[Experience][source]#

Run the workflow and return a list of experiences with step-wise rewards.

step(step_num: int) bool[source]#

Run a single step of your agent application.

Parameters:

step_num (int) – The current step number.

Returns:

Whether to continue running the agent application.

Return type:

bool

Tips:

You can use the openai client (self.client) to migrate your existing applications at low cost.

reward(exps: list[Experience]) float[source]#

Calculate the reward for the given experiences of the entire run.

property max_step_num: int#

Return the maximum number of steps allowed in an episode.

class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: Workflow

RAFT workflow for alfworld using trajectory context.

Process:
  1. First exploration with normal experience generation.
  2. Generate SFT data from a successful attempt.

can_reset: bool = True#
can_repeat: bool = True#
is_async: bool = True#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow with a new task

create_environment(game_file)[source]#

Create alfworld environment

async run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Run a single rollout with RAFT-guided actions

async eval_alfworld() List[Experience][source]#

Evaluate a single alfworld trajectory

async run_async() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id used when setting run_id in experiences.

class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: RAFTAlfworldWorkflow

RAFT workflow for alfworld using trajectory context.

Process:
  1. First exploration with normal experience generation.
  2. If it fails, re-explore with the first trajectory as context.
  3. Generate SFT data from a successful attempt.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
async construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]][source]#

Generate SFT training data using RAFT learning

async re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Re-explore with first trajectory as context

async run_async() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for sciworld task.

is_async: bool = True#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncMathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathBoxedWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks that give answers in boxed format.

reset(task: Task)[source]#

Reset the workflow.

format_prompt()[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AsyncMathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathRMWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for tool-call tasks. Prompt construction and the reward function are taken from NVlabs/Tool-N1.

Only Qwen models are supported for now. For other models, you can adapt the prompt construction and reward calculation yourself.

is_async: bool = True#
reset(task: Task)[source]#

Reset the workflow.

format_prompt()[source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncMathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathEvalWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for standard math evaluation.

The evaluation standard and prompting style follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: QwenLM/Qwen2.5-Math

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
format_messages()[source]#

Format messages for the evaluation of the qwen_boxed type.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeV0ReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V0 version here. The code will be deprecated soon.

can_reset: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run()[source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V1 version here.

can_reset: bool = True#
is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

async run_async()[source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AgentScopeV1ReactSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow.

can_reset: bool = True#
is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

judge_result(result, question, correct_answer, judge_model=None) bool[source]#

Use LLM to judge whether the answer is correct or not.

async run_async()[source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AgentScopeReActWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
async run_async()[source]#

Run the workflow asynchronously.

async calculate_reward(response) float | Dict[str, float][source]#

Calculate the reward for the workflow.

Returns:

The reward value or a dictionary of reward value.

Return type:

Union[float, Dict[str, float]]

construct_experiences(reward: float | Dict[str, float]) List[Experience][source]#

Construct experiences from the agent’s interaction history.

Parameters:

reward (Union[float, Dict[str, float]]) – The reward value to assign to each experience.

Returns:

A list of Experience objects.

Return type:

List

class trinity.common.workflows.EmailSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

Multi-turn Email Search workflow (ReAct-style tool use).

can_reset: bool = True#
is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

async run_async()[source]#

Run workflow in async and return a list of experiences.

async calculate_reward(answer_and_sources: Dict) Dict[str, float][source]#

Ref: calculate_reward in OpenPipe/ART

class trinity.common.workflows.AsyncMathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathRULERWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Modified from SimpleWorkflow.run

class trinity.common.workflows.MathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math with RULER reward function.

Modified from MathWorkflow. Adapted from OpenPipe/ART

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Note that in this workflow, MathRewardFn is only used for calculating the ‘golden reward’, whereas the rewards used by RL training are calculated by RULER.

run() List[Experience][source]#

Modified from SimpleWorkflow.run

get_ruler_scores(responses: List[Experience], judger: Any) Tuple[bool, List[float]][source]#

Get RULER scores

class trinity.common.workflows.MathTrainableRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math, where the policy model itself serves as a RULER reward model. Modified from MathRULERWorkflow. RULER is adapted from OpenPipe/ART

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run() List[Experience][source]#

Modified from MathRULERWorkflow.run

get_ruler_responses(responses: List[Experience], judger: Any, ruler_rollout_args: Any, gold_scores: List[float] | None = None) Tuple[float, List[Experience], List[float]][source]#

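Get RULER scores.

Returns:

A tuple (judge_success_rate, ruler_responses, ruler_scores), where judge_success_rate is a float, ruler_responses is a List[Experience], and ruler_scores is a List[float].

Return type:

Tuple[float, List[Experience], List[float]]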

class trinity.common.workflows.AsyncSimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleMMWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.SimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for simple single-round tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.RubricJudgeWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow using LLM-as-a-judge and rubrics to get the reward.

Adapted from https://arxiv.org/pdf/2507.17746

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Modified from SimpleWorkflow.reset

run() List[Experience][source]#

Modified from SimpleWorkflow.run

get_judge_reward(response: str, judger: OpenAI) Tuple[bool, float][source]#

Get rewards with LLM-as-a-judge. The prompts are adapted from the RAR-IMPLICIT method in https://arxiv.org/pdf/2507.17746

class trinity.common.workflows.AgentScopeWorkflowAdapter(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

Adapter to wrap an AgentScope trainable workflow function into a Trinity Workflow.

is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Initialize the adapter with the task and model.

construct_experiences(reward: float) List[Experience][source]#

Construct experiences from the agent’s interaction history.

Parameters:

reward (float) – The reward value to assign to each experience.

Returns:

A list of Experience objects.

Return type:

List

async run_async() List[Experience][source]#

Run the workflow asynchronously and return experiences.

class trinity.common.workflows.FrozenLakeWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

FrozenLake environment for multi-step workflows.

## Description
The game starts with the player at a random location on the frozen lake grid world, with the goal located at another random location (for the 4x4 environment).

## Action Space
The action shape is (1,), with values in the range {0, ..., 4} indicating which direction to move the player. NOTE: this action space differs from gymnasium.envs.toy_text.frozen_lake.FrozenLakeEnv: the movement actions start from 1, and action_map is used to map each custom action to the corresponding action defined in gymnasium's FrozenLakeEnv.
  - 0: Still
  - 1: Left
  - 2: Down
  - 3: Right
  - 4: Up

## Starting State
The episode starts with the player at a random location.

## Rewards Reward schedule: - Reach goal: +1 - Reach hole: 0 - Reach frozen: 0

## Arguments
is_slippery: if True, the movement is stochastic. For example, if the action is Left, then:
  - P(move left) = 1/3
  - P(move up) = 1/3
  - P(move down) = 1/3

## Example
P _ _ _
_ _ _ O
O _ O _
O _ _ G

can_reset: bool = False#
is_async: bool = True#
can_repeat: bool = False#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Initialize the FrozenLake workflow.

Parameters:
  • model – The model wrapper to use for generating actions.

  • task – The task configuration containing workflow-specific arguments.

  • auxiliary_models – Optional list of auxiliary models.

property rollout_args#
finished() bool[source]#

Check if the episode is finished.

Returns:

True if the player is on goal (G) or hole (H), False otherwise.

success() bool[source]#

Check if the agent has reached the goal (G).

Returns:

True if the player is on goal (G), False otherwise.

env_step(action: int)[source]#

Execute a step in the environment.

Maps custom action to gymnasium FrozenLakeEnv action and takes the step. Checks if the action is effective (whether player moves in the env).

Parameters:

action – The action to take.

Returns:

Tuple of (observation, reward, done, info).

render(mode='tiny_rgb_array')[source]#

Render the environment.

Parameters:

mode – Rendering mode. Options: “tiny_rgb_array”, “list”, “state”, “rgb_array”, “ansi”.

Returns:

Rendered observation based on the mode.

async run_async() List[Experience][source]#

Run the workflow and return a list of experiences.

Returns:

List of Experience objects, one for each rollout.