trinity.common.workflows package

Contents

trinity.common.workflows package#

Submodules#

Module contents#

Workflow module

class trinity.common.workflows.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '')[source]#

Bases: dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow] = None#
repeat_times: int | None = None#
format_args: FormatConfig#
rollout_args: GenerationConfig#
workflow_args: dict#
reward_fn_args: dict#
is_eval: bool = False#
reward_fn: Type[RewardFn] | None = None#
raw_task: dict | None = None#
batch_id: int | str = ''#
task_id: int | str = ''#
to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]#

Convert the task to a workflow.

Parameters:
  • model (ModelWrapper) – The rollout model for the workflow.

  • auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.

Note

model_path attribute is added to the auxiliary_models for use within the workflow.

Returns:

The generated workflow object.

Return type:

Workflow

property task_desc: str | None#
property truth: str | None#
to_dict() dict[source]#
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '') None#
class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: ABC

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property asynchronous#

Whether the workflow runs in async mode.

property rollout_args#
reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id for setting run_id in experiences.

run() List[Experience][source]#

Run workflow and return a list of experiences.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

property asynchronous#

Whether the workflow runs in async mode.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for simple single-round tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id for setting run_id in experiences.

format_messages()[source]#

Format messages for the instruct model.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AsyncMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: AsyncSimpleWorkflow, MathWorkflow

class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for webshop task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, session_id, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for alfworld task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
property asynchronous#

Whether the workflow runs in async mode.

async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#

Bases: RewardPropagationWorkflow

An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.

This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
run() List[Experience][source]#

Run the workflow and return a list of experiences with step-wise rewards.

step(step_num: int) bool[source]#

Run a single step of your agent application.

Parameters:

step_num (int) – The current step number.

Returns:

Whether to continue running the agent application.

Return type:

bool

Tips:

You can use the openai client (self.client) to migrate your existing applications at low cost.

reward(exps: list[Experience]) float[source]#

Calculate the reward for the given experiences of the entire run.

property max_step_num: int#

Return the maximum number of steps allowed in an episode.

class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: Workflow

RAFT workflow for alfworld using trajectory context.

Process:

  1. First exploration with normal experience generation.

  2. Generate SFT data from the successful attempt.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
property asynchronous#

Whether the workflow runs in async mode.

reset(task: Task)[source]#

Reset the workflow with a new task

create_environment(game_file)[source]#

Create alfworld environment

async run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Run a single rollout with RAFT-guided actions

async eval_alfworld() List[Experience][source]#

Evaluate a single alfworld trajectory

async run_async() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

resettable() bool[source]#

Indicate that this workflow can be reset to avoid re-initialization

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id for setting run_id in experiences.

class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: RAFTAlfworldWorkflow

RAFT workflow for alfworld using trajectory context.

Process:

  1. First exploration with normal experience generation.

  2. If failed, re-explore with the first trajectory as context.

  3. Generate SFT data from the successful attempt.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
property asynchronous#

Whether the workflow runs in async mode.

async construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]][source]#

Generate SFT training data using RAFT learning

async re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Re-explore with first trajectory as context

async run_async() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for sciworld task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
property asynchronous#

Whether the workflow runs in async mode.

async get_model_response(messages)[source]#
async get_model_response_text(messages)[source]#
async generate_env_inference_samples(env, rollout_num) List[Experience][source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncMathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathBoxedWorkflow

property asynchronous#

Whether the workflow runs in async mode.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks that give answers in boxed format.

reset(task: Task)[source]#

Reset the workflow.

format_prompt()[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AsyncMathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathRMWorkflow

property asynchronous#

Whether the workflow runs in async mode.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for toolcall tasks. Prompt construction and reward function from NVlabs/Tool-N1

Only Qwen models are supported for now. For other models, you can change the prompt construction and reward calculation yourself.

reset(task: Task)[source]#

Reset the workflow.

property asynchronous#

Whether the workflow runs in async mode.

format_prompt()[source]#
async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AsyncMathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathEvalWorkflow

property asynchronous#

Whether the workflow runs in async mode.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for standard math evaluation.

The evaluation standard and prompting style follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: QwenLM/Qwen2.5-Math

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

format_messages()[source]#

Format message for the evaluation of qwen_boxed type.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeV0ReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V0 version here. The code will be deprecated soon.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

run()[source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V1 version here.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property asynchronous#

Whether the workflow runs in async mode.

async run_async()[source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.AgentScopeV1ReactSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
property asynchronous#

Whether the workflow runs in async mode.

property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

reset(task: Task)[source]#

Reset the workflow.

judge_result(result, question, correct_answer, judge_model=None) bool[source]#

Use LLM to judge whether the answer is correct or not.

async run_async()[source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.EmailSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

Multi-turn Email Search workflow (ReAct-style tool use).

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property repeatable: bool#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property resettable#
reset(task: Task)[source]#

Reset the workflow.

run()[source]#

Run workflow and return a list of experiences.

calculate_reward(answer_and_sources: Dict) Dict[str, float][source]#

Ref: calculate_reward in OpenPipe/ART

class trinity.common.workflows.AsyncMathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: MathRULERWorkflow

property asynchronous#

Whether the workflow runs in async mode.

async run_async() List[Experience][source]#

Modified from SimpleWorkflow.run

class trinity.common.workflows.MathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math with RULER reward function.

Modified from MathWorkflow. Adapted from OpenPipe/ART

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Note that in this workflow, MathRewardFn is only used for calculating the ‘golden reward’, whereas the rewards used by RL training are calculated by RULER.

run() List[Experience][source]#

Modified from SimpleWorkflow.run

get_ruler_scores(responses: List[Experience], judger: Any) Tuple[bool, List[float]][source]#

Get RULER scores

class trinity.common.workflows.MathTrainableRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math, where the policy model itself serves as a RULER reward model. Modified from MathRULERWorkflow. RULER is adapted from OpenPipe/ART

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run() List[Experience][source]#

Modified from MathRULERWorkflow.run

get_ruler_responses(responses: List[Experience], judger: Any, ruler_rollout_args: Any, gold_scores: List[float] | None = None) Tuple[float, List[Experience], List[float]][source]#

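Get RULER scores.

Returns:

A tuple (judge_success_rate, ruler_responses, ruler_scores):

  • judge_success_rate (float)

  • ruler_responses (List[Experience])

  • ruler_scores (List[float])

Return type:

Tuple[float, List[Experience], List[float]]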

class trinity.common.workflows.AsyncSimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleMMWorkflow

property asynchronous#

Whether the workflow runs in async mode.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.SimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for simple single-round tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run() List[Experience][source]#

Run workflow and return a list of experiences.