trinity.common.workflows

Submodules

trinity.common.workflows.customized_math_workflows module

We include the customized math workflows in this file.

class trinity.common.workflows.customized_math_workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for math tasks that give answers in boxed format.

reset(task: Task)[source]

Reset the workflow.

format_prompt()[source]
run() List[Experience][source]

Run workflow and return a list of experiences.

trinity.common.workflows.customized_toolcall_workflows module

We include the customized toolcall workflows in this file. Code adapted from https://github.com/NVlabs/Tool-N1 Reference Paper https://arxiv.org/pdf/2505.00024 for further details.

trinity.common.workflows.customized_toolcall_workflows.construct_prompt(dp)[source]
trinity.common.workflows.customized_toolcall_workflows.validate_result(result, answer)[source]
trinity.common.workflows.customized_toolcall_workflows.validate_format(tool_call_list)[source]
trinity.common.workflows.customized_toolcall_workflows.extract_solution_v0(tool_call_str)[source]
trinity.common.workflows.customized_toolcall_workflows.compute_score_v0(solution_str, ground_truth, do_print=False)[source]
trinity.common.workflows.customized_toolcall_workflows.compute_toolcall_reward(solution_str: str, ground_truth: str) float[source]
class trinity.common.workflows.customized_toolcall_workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for toolcall tasks. Prompt construction and reward function from https://github.com/NVlabs/Tool-N1

Only support qwen model for now. You can change the prompt construction and reward calculation by yourself for other models.

reset(task: Task)[source]

Reset the workflow.

format_prompt()[source]
run() List[Experience][source]

Run workflow and return a list of experiences.

trinity.common.workflows.eval_workflow module

Evaluation Workflow Class

class trinity.common.workflows.eval_workflow.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: Workflow

A workflow for standard math evaluation.

The evaluation standard and prompting style are follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: https://github.com/QwenLM/Qwen2.5-Math

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
property resettable
property repeatable

A workflow is repeatable if it can be run multiple times within the run() method.

format_messages()[source]

Format message for the evaluation of qwen_boxed type.

run() List[Experience][source]

Run workflow and return a list of experiences.

trinity.common.workflows.math_rm_workflow module

We include the math workflow with rm-gallery reward in this file.

class trinity.common.workflows.math_rm_workflow.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
run() List[Experience][source]

Run workflow and return a list of experiences.

trinity.common.workflows.step_wise_workflow module

class trinity.common.workflows.step_wise_workflow.StepWiseRewardWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models=None, use_openai_client=True)[source]

Bases: Workflow

A workflow that implements step-wise rewards for tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models=None, use_openai_client=True)[source]
run() list[Experience][source]

Run the workflow and return a list of experiences with step-wise rewards.

abstract step(step_num: int) bool[source]

Run a single step of your agent application.

Parameters:

step_num (int) – The current step number.

Returns:

Whether to continue running the agent application.

Return type:

bool

Tips:

You can use the openai client (self.client) to migrate your existing applications at low cost.

abstract reward(exps: list[Experience], step_num: int) float[source]

Calculate the reward for the given experiences at the specified step.

abstract property max_step_num

Return the maximum number of steps in the task.

property repeatable

A workflow is repeatable if it can be run multiple times within the run() method.

class trinity.common.workflows.step_wise_workflow.RewardPropagationWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models=None, use_openai_client=True)[source]

Bases: Workflow

A workflow that propagates rewards across multiple turns.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models=None, use_openai_client=True)[source]
run() list[Experience][source]

Run the workflow and return a list of experiences with step-wise rewards.

abstract step(step_num: int) bool[source]

Run a single step of your agent application.

Parameters:

step_num (int) – The current step number.

Returns:

Whether to continue running the agent application.

Return type:

bool

Tips:

You can use the openai client (self.client) to migrate your existing applications at low cost.

abstract reward(exps: list[Experience]) float[source]

Calculate the reward for the given experiences of the entire run.

abstract property max_step_num

Return the maximum number of steps in the task.

property repeatable

A workflow is repeatable if it can be run multiple times within the run() method.

trinity.common.workflows.workflow module

Base Workflow Class

class trinity.common.workflows.workflow.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow], repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0)[source]

Bases: dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow]
repeat_times: int | None = None
format_args: FormatConfig
rollout_args: GenerationConfig
workflow_args: dict
reward_fn_args: dict
is_eval: bool = False
reward_fn: Type[RewardFn] | None = None
raw_task: dict | None = None
batch_id: int | str = 0
task_id: int | str = 0
to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]

Convert the task to a workflow.

Parameters:
  • model (ModelWrapper) – The rollout model for the workflow.

  • auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.

Note

model_path attribute is added to the auxiliary_models for use within the workflow.

Returns:

The generated workflow object.

Return type:

Workflow

property task_desc: str | None
property truth: str | None
to_dict() dict[source]
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow], repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0) None
class trinity.common.workflows.workflow.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: ABC

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
property resettable
property repeatable

A workflow is repeatable if it can be run multiple times within the run() method.

property rollout_args
reset(task: Task)[source]

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[source]

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

abstract run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.workflow.MultiTurnWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: Workflow

The base workflow class for concatenated multi-turn tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
set_repeat_times(repeat_times, run_id_base)[source]

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

abstract run() List[Experience][source]

Run workflow and return a list of experiences.

process_messages_to_experience(messages, reward, info={}) Experience[source]
class trinity.common.workflows.workflow.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: Workflow

A workflow for simple single-round task.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
property resettable
reset(task: Task)[source]

Reset the workflow.

set_repeat_times(repeat_times, run_id_base)[source]

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

format_messages()[source]

Format messages for the instruct model.

run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.workflow.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
reset(task: Task)[source]

Reset the workflow.

Module contents

Workflow module

class trinity.common.workflows.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow], repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0)[source]

Bases: dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow]
repeat_times: int | None = None
format_args: FormatConfig
rollout_args: GenerationConfig
workflow_args: dict
reward_fn_args: dict
is_eval: bool = False
reward_fn: Type[RewardFn] | None = None
raw_task: dict | None = None
batch_id: int | str = 0
task_id: int | str = 0
to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]

Convert the task to a workflow.

Parameters:
  • model (ModelWrapper) – The rollout model for the workflow.

  • auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.

Note

model_path attribute is added to the auxiliary_models for use within the workflow.

Returns:

The generated workflow object.

Return type:

Workflow

property task_desc: str | None
property truth: str | None
to_dict() dict[source]
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow], repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0) None
class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: ABC

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
property resettable
property repeatable

A workflow is repeatable if it can be run multiple times within the run() method.

property rollout_args
reset(task: Task)[source]

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[source]

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

abstract run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: Workflow

A workflow for simple single-round task.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
property resettable
reset(task: Task)[source]

Reset the workflow.

set_repeat_times(repeat_times, run_id_base)[source]

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

format_messages()[source]

Format messages for the instruct model.

run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
reset(task: Task)[source]

Reset the workflow.

class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]

Bases: MultiTurnWorkflow

A workflow for webshop task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]
property resettable
reset(task: Task)[source]

Reset the workflow.

get_model_response(messages)[source]
get_model_response_text(messages)[source]
generate_env_inference_samples(env, session_id, rollout_num) List[Experience][source]
run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]

Bases: MultiTurnWorkflow

A workflow for alfworld task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]
get_model_response(messages)[source]
get_model_response_text(messages)[source]
generate_env_inference_samples(env, rollout_num) List[Experience][source]
run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]

Bases: RewardPropagationWorkflow

An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.

This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]
run() List[Experience][source]

Run the workflow and return a list of experiences with step-wise rewards.

step(step_num: int) bool[source]

Run a single step of your agent application.

Parameters:

step_num (int) – The current step number.

Returns:

Whether to continue running the agent application.

Return type:

bool

Tips:

You can use the openai client (self.client) to migrate your existing applications at low cost.

reward(exps: list[Experience]) float[source]

Calculate the reward for the given experiences of the entire run.

property max_step_num: int

Return the maximum number of steps allowed in an episode.

class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]

Bases: Workflow

RAFT workflow for alfworld using trajectory context.

Process: 1. First exploration with normal experience generation 2. Generate SFT data from successful attempt

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]
reset(task: Task)[source]

Reset the workflow with a new task

create_environment(game_file)[source]

Create alfworld environment

run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]

Run a single rollout with RAFT-guided actions

eval_alfworld() List[Experience][source]

Evaluate a single alfworld trajectory

run() List[Experience][source]

Run the RAFT alfworld workflow and return experiences

resettable() bool[source]

Indicate that this workflow can be reset to avoid re-initialization

set_repeat_times(repeat_times, run_id_base)[source]

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]

Bases: RAFTAlfworldWorkflow

RAFT workflow for alfworld using trajectory context.

Process: 1. First exploration with normal experience generation 2. If failed, re-explore with first trajectory as context 3. Generate SFT data from successful attempt

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]
construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]][source]

Generate SFT training data using RAFT learning

re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]

Re-explore with first trajectory as context

run() List[Experience][source]

Run the RAFT alfworld workflow and return experiences

class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]

Bases: MultiTurnWorkflow

A workflow for sciworld task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]
get_model_response(messages)[source]
get_model_response_text(messages)[source]
generate_env_inference_samples(env, rollout_num) List[Experience][source]
run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for math tasks that give answers in boxed format.

reset(task: Task)[source]

Reset the workflow.

format_prompt()[source]
run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: SimpleWorkflow

A workflow for toolcall tasks. Prompt construction and reward function from https://github.com/NVlabs/Tool-N1

Only support qwen model for now. You can change the prompt construction and reward calculation by yourself for other models.

reset(task: Task)[source]

Reset the workflow.

format_prompt()[source]
run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: Workflow

A workflow for standard math evaluation.

The evaluation standard and prompting style are follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: https://github.com/QwenLM/Qwen2.5-Math

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
property resettable
property repeatable

A workflow is repeatable if it can be run multiple times within the run() method.

format_messages()[source]

Format message for the evaluation of qwen_boxed type.

run() List[Experience][source]

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeReactV2MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]
property resettable
reset(task: Task)[source]

Reset the workflow.

property repeatable

A workflow is repeatable if it can be run multiple times within the run() method.

run()[source]

Run workflow and return a list of experiences.