trinity.common.workflows package

Contents

trinity.common.workflows package#

Subpackages#

Submodules#

Module contents#

Workflow module

class trinity.common.workflows.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '')[source]#

Bases: dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow] | None = None#
repeat_times: int | None = None#
format_args: FormatConfig#
rollout_args: GenerationConfig#
workflow_args: dict#
reward_fn_args: dict#
is_eval: bool = False#
reward_fn: Type[RewardFn] | None = None#
raw_task: dict | None = None#
batch_id: int | str = ''#
task_id: int | str = ''#
to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]#

Convert the task to a workflow.

Parameters:
  • model (ModelWrapper) – The rollout model for the workflow.

  • auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.

Note

model_path attribute is added to the auxiliary_models for use within the workflow.

Returns:

The generated workflow object.

Return type:

Workflow

property task_desc: str | None#
property truth: str | None#
to_dict() dict[source]#
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '') None#
class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: object

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

can_reset: bool = False#
can_repeat: bool = False#
is_async: bool = False#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#

Deprecated, use cls.can_reset instead.

property repeatable#

Deprecated, use cls.can_repeat instead. A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property asynchronous#

Deprecated, use cls.is_async instead. Whether the workflow runs in async mode.

reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – The number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – The base run_id used for setting run_id in experiences.

run() List[Experience][source]#

Run workflow and return a list of experiences.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for simple single-round tasks.

can_reset: bool = True#
can_repeat: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – The number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – The base run_id used for setting run_id in experiences.

property rollout_args#
format_messages()[source]#

Format messages for the instruct model.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AsyncMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: AsyncSimpleWorkflow, MathWorkflow

class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for webshop task.

can_reset: bool = True#
is_async: bool = True#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, session_id, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for alfworld task.

is_async: bool = True#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#

Bases: RewardPropagationWorkflow

An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.

This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
run() List[Experience][source]#

Run the workflow and return a list of experiences with step-wise rewards.

step(step_num: int) bool[source]#

Run a single step of your agent application.

Parameters:

step_num (int) – The current step number.

Returns:

Whether to continue running the agent application.

Return type:

bool

Tips:

You can use the openai client (self.client) to migrate your existing applications at low cost.

reward(exps: list[Experience]) float[source]#

Calculate the reward for the given experiences of the entire run.

property max_step_num: int#

Return the maximum number of steps allowed in an episode.

class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: Workflow

RAFT workflow for alfworld using trajectory context.

Process:
  1. First exploration with normal experience generation.
  2. Generate SFT data from the successful attempt.

can_reset: bool = True#
can_repeat: bool = True#
is_async: bool = True#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow with a new task

create_environment(game_file)[source]#

Create alfworld environment

async run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Run a single rollout with RAFT-guided actions

async eval_alfworld() List[Experience][source]#

Evaluate a single alfworld trajectory

async run_async() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – The number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – The base run_id used for setting run_id in experiences.

class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: RAFTAlfworldWorkflow

RAFT workflow for alfworld using trajectory context.

Process:
  1. First exploration with normal experience generation.
  2. If it fails, re-explore with the first trajectory as context.
  3. Generate SFT data from the successful attempt.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
async construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]][source]#

Generate SFT training data using RAFT learning

async re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Re-explore with first trajectory as context

async run_async() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for sciworld task.

is_async: bool = True#
__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncMathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathBoxedWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks that give answers in boxed format.

reset(task: Task)[source]#

Reset the workflow.

format_prompt()[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AsyncMathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathRMWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for tool-call tasks. The prompt construction and reward function are adapted from NVlabs/Tool-N1.

Only Qwen models are supported for now. For other models, you can adapt the prompt construction and reward calculation yourself.

is_async: bool = True#
reset(task: Task)[source]#

Reset the workflow.

format_prompt()[source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncMathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathEvalWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for standard math evaluation.

The evaluation standard and prompting style follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see QwenLM/Qwen2.5-Math.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
format_messages()[source]#

Format message for the evaluation of qwen_boxed type.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeV0ReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V0 version here. The code will be deprecated soon.

can_reset: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run()[source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V1 version here.

can_reset: bool = True#
is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

async run_async()[source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AgentScopeV1ReactSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow.

can_reset: bool = True#
is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

judge_result(result, question, correct_answer, judge_model=None) bool[source]#

Use LLM to judge whether the answer is correct or not.

async run_async()[source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AgentScopeReActWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
async run_async()[source]#

Run the workflow asynchronously.

async calculate_reward(response) float | Dict[str, float][source]#

Calculate the reward for the workflow.

Returns:

The reward value or a dictionary of reward values.

Return type:

Union[float, Dict[str, float]]

construct_experiences(reward: float | Dict[str, float]) List[Experience][source]#

Construct experiences from the agent’s interaction history.

Parameters:

reward (Union[float, Dict[str, float]]) – The reward value to assign to each experience.

Returns:

A list of Experience objects.

Return type:

List

class trinity.common.workflows.EmailSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

Multi-turn Email Search workflow (ReAct-style tool use).

can_reset: bool = True#
is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

async run_async()[source]#

Run workflow in async and return a list of experiences.

calculate_reward(answer_and_sources: Dict) Dict[str, float][source]#

Ref: calculate_reward in OpenPipe/ART

class trinity.common.workflows.AsyncMathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathRULERWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Modified from SimpleWorkflow.run

class trinity.common.workflows.MathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math with RULER reward function.

Modified from MathWorkflow. Adapted from OpenPipe/ART

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Note that in this workflow, MathRewardFn is only used for calculating the ‘golden reward’, whereas the rewards used by RL training are calculated by RULER.

run() List[Experience][source]#

Modified from SimpleWorkflow.run

get_ruler_scores(responses: List[Experience], judger: Any) Tuple[bool, List[float]][source]#

Get RULER scores

class trinity.common.workflows.MathTrainableRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math, where the policy model itself serves as a RULER reward model. Modified from MathRULERWorkflow. RULER is adapted from OpenPipe/ART

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run() List[Experience][source]#

Modified from MathRULERWorkflow.run

get_ruler_responses(responses: List[Experience], judger: Any, ruler_rollout_args: Any, gold_scores: List[float] | None = None) Tuple[float, List[Experience], List[float]][source]#

Get RULER scores.

Returns:

judge_success_rate (float), ruler_responses (List[Experience]), and ruler_scores (List[float]).

Return type:

Tuple[float, List[Experience], List[float]]

class trinity.common.workflows.AsyncSimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleMMWorkflow

is_async: bool = True#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.SimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for simple single-round tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.RubricJudgeWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow using LLM-as-a-judge and rubrics to get the reward.

Adapted from https://arxiv.org/pdf/2507.17746

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Modified from SimpleWorkflow.reset

run() List[Experience][source]#

Modified from SimpleWorkflow.run

get_judge_reward(response: str, judger: OpenAI) Tuple[bool, float][source]#

Get rewards with LLM-as-a-judge. The prompts are adapted from the RAR-IMPLICIT method in https://arxiv.org/pdf/2507.17746

class trinity.common.workflows.AgentScopeWorkflowAdapter(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

Adapter to wrap an AgentScope trainable workflow function into a Trinity Workflow.

is_async: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Initialize the adapter with the task and model.

construct_experiences(reward: float) List[Experience][source]#

Construct experiences from the agent’s interaction history.

Parameters:

reward (float) – The reward value to assign to each experience.

Returns:

A list of Experience objects.

Return type:

List

async run_async() List[Experience][source]#

Run the workflow asynchronously and return experiences.