trinity.common.workflows package

Contents

trinity.common.workflows package#

Submodules#

Module contents#

Workflow module

class trinity.common.workflows.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0)[source]#

Bases: dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow] = None#
repeat_times: int | None = None#
format_args: FormatConfig#
rollout_args: GenerationConfig#
workflow_args: dict#
reward_fn_args: dict#
is_eval: bool = False#
reward_fn: Type[RewardFn] | None = None#
raw_task: dict | None = None#
batch_id: int | str = 0#
task_id: int | str = 0#
to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]#

Convert the task to a workflow.

Parameters:
  • model (ModelWrapper) – The rollout model for the workflow.

  • auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.

Note

model_path attribute is added to the auxiliary_models for use within the workflow.

Returns:

The generated workflow object.

Return type:

Workflow

property task_desc: str | None#
property truth: str | None#
to_dict() dict[source]#
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0) None#
class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: ABC

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property asynchronous#

Whether the workflow runs in async mode.

property rollout_args#
reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id for setting run_id in experiences.

run() List[Experience][source]#

Run workflow and return a list of experiences.

async run_async() List[Experience][source]#

Run workflow asynchronously and return a list of experiences.

class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for simple single-round tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id for setting run_id in experiences.

format_messages()[source]#

Format messages for the instruct model.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for webshop task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

get_model_response(messages)[source]#
get_model_response_text(messages)[source]#
generate_env_inference_samples(env, session_id, rollout_num) List[Experience][source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for alfworld task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
get_model_response(messages)[source]#
get_model_response_text(messages)[source]#
generate_env_inference_samples(env, rollout_num) List[Experience][source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#

Bases: RewardPropagationWorkflow

An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.

This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
run() List[Experience][source]#

Run the workflow and return a list of experiences with step-wise rewards.

step(step_num: int) bool[source]#

Run a single step of your agent application.

Parameters:

step_num (int) – The current step number.

Returns:

Whether to continue running the agent application.

Return type:

bool

Tips:

You can use the openai client (self.client) to migrate your existing applications at low cost.

reward(exps: list[Experience]) float[source]#

Calculate the reward for the given experiences of the entire run.

property max_step_num: int#

Return the maximum number of steps allowed in an episode.

class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: Workflow

RAFT workflow for alfworld using trajectory context.

Process:

  1. First exploration with normal experience generation.
  2. Generate SFT data from successful attempt.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow with a new task

create_environment(game_file)[source]#

Create alfworld environment

run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Run a single rollout with RAFT-guided actions

eval_alfworld() List[Experience][source]#

Evaluate a single alfworld trajectory

run() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

resettable() bool[source]#

Indicate that this workflow can be reset to avoid re-initialization

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow.

Parameters:
  • repeat_times (int) – Number of times to repeat the workflow (if repeatable).

  • run_id_base (int) – Base run_id for setting run_id in experiences.

class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: RAFTAlfworldWorkflow

RAFT workflow for alfworld using trajectory context.

Process:

  1. First exploration with normal experience generation.
  2. If failed, re-explore with first trajectory as context.
  3. Generate SFT data from successful attempt.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]][source]#

Generate SFT training data using RAFT learning

re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#

Re-explore with first trajectory as context

run() List[Experience][source]#

Run the RAFT alfworld workflow and return experiences

class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#

Bases: MultiTurnWorkflow

A workflow for sciworld task.

__init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
get_model_response(messages)[source]#
get_model_response_text(messages)[source]#
generate_env_inference_samples(env, rollout_num) List[Experience][source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks that give answers in boxed format.

reset(task: Task)[source]#

Reset the workflow.

format_prompt()[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for toolcall tasks. Prompt construction and reward function are from NVlabs/Tool-N1.

Only Qwen models are supported for now. You can change the prompt construction and reward calculation yourself for other models.

reset(task: Task)[source]#

Reset the workflow.

format_prompt()[source]#
run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for standard math evaluation.

The evaluation standard and prompting style follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: QwenLM/Qwen2.5-Math

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

format_messages()[source]#

Format message for the evaluation of qwen_boxed type.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.AgentScopeReactV2MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

This workflow serves as an example of how to use the agentscope framework within the trinity workflow.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

run()[source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.EmailSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

Multi-turn Email Search workflow (ReAct-style tool use).

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property repeatable: bool#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property resettable#
reset(task: Task)[source]#

Reset the workflow.

run()[source]#

Run workflow and return a list of experiences.

calculate_reward(answer_and_sources: Dict) Dict[str, float][source]#

Ref: calculate_reward in OpenPipe/ART

class trinity.common.workflows.MathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math with RULER reward function.

Modified from MathWorkflow. Adapted from OpenPipe/ART

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Note that in this workflow, MathRewardFn is only used for calculating the ‘golden reward’, whereas the rewards used by RL training are calculated by RULER.

run() List[Experience][source]#

Modified from SimpleWorkflow.run

get_ruler_scores(responses: List[Experience], judger: Any) Tuple[bool, List[float]][source]#

Get RULER scores

class trinity.common.workflows.SimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for simple single-round tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.

run() List[Experience][source]#

Run workflow and return a list of experiences.