trinity.common.workflows package
Subpackages
Submodules
- trinity.common.workflows.agentscope_workflow module
- trinity.common.workflows.customized_math_workflows module
- trinity.common.workflows.customized_toolcall_workflows module
- trinity.common.workflows.eval_workflow module
- trinity.common.workflows.math_rm_workflow module
- trinity.common.workflows.math_ruler_workflow module
- trinity.common.workflows.math_trainable_ruler_workflow module
- trinity.common.workflows.rubric_judge_workflow module
- trinity.common.workflows.simple_mm_workflow module
- trinity.common.workflows.step_wise_workflow module
- trinity.common.workflows.workflow module
Module contents
Workflow module
- class trinity.common.workflows.Task(workflow: Type[Workflow] | None = None, repeat_times: int | None = None, format_args: FormatConfig = <factory>, rollout_args: GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: Type[RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '')[source]#
Bases:
dict
A Task class that defines a task and its associated reward function / workflow.
- repeat_times: int | None = None#
- format_args: FormatConfig#
- rollout_args: GenerationConfig#
- workflow_args: dict#
- reward_fn_args: dict#
- is_eval: bool = False#
- raw_task: dict | None = None#
- batch_id: int | str = ''#
- task_id: int | str = ''#
- to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow [source]#
Convert the task to a workflow.
- Parameters:
model (ModelWrapper) – The rollout model for the workflow.
auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.
Note
model_path attribute is added to the auxiliary_models for use within the workflow.
- Returns:
The generated workflow object.
- Return type:
Workflow
- property task_desc: str | None#
- property truth: str | None#
- __init__(workflow: Type[Workflow] | None = None, repeat_times: int | None = None, format_args: FormatConfig = <factory>, rollout_args: GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: Type[RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '') None #
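The fields and to_workflow() above can be illustrated with a small stand-in. This is a sketch under assumptions, not Trinity's implementation: TaskSketch and EchoWorkflow are hypothetical names, only a subset of the documented fields is reproduced, and the body of to_workflow() is a guess based on the keyword-only Workflow constructor documented on this page.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Type, Union


class EchoWorkflow:
    """Hypothetical workflow taking the keyword-only arguments documented here."""

    def __init__(self, *, task: "TaskSketch", model: Any,
                 auxiliary_models: Optional[List[Any]] = None) -> None:
        self.task = task
        self.model = model
        self.auxiliary_models = auxiliary_models or []


@dataclass
class TaskSketch:
    """Stand-in carrying a subset of the documented Task fields."""

    workflow: Optional[Type[EchoWorkflow]] = None
    repeat_times: Optional[int] = None
    # "<factory>" in the signature means a fresh object per instance.
    workflow_args: dict = field(default_factory=dict)
    raw_task: Optional[dict] = None
    is_eval: bool = False
    batch_id: Union[int, str] = ""
    task_id: Union[int, str] = ""

    def to_workflow(self, model: Any,
                    auxiliary_models: Optional[List[Any]] = None) -> EchoWorkflow:
        # Assumption: the task instantiates its workflow class and attaches itself.
        if self.workflow is None:
            raise ValueError("task has no workflow class")
        return self.workflow(task=self, model=model, auxiliary_models=auxiliary_models)


task = TaskSketch(workflow=EchoWorkflow, repeat_times=4,
                  raw_task={"question": "1+1", "answer": "2"})
wf = task.to_workflow(model="rollout-model-stub")
```

Because workflow_args uses a default factory, two tasks created without arguments do not share the same dict.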
- class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
object
The base workflow class.
A workflow is a runnable object which generates a list of experiences.
- can_reset: bool = False#
- can_repeat: bool = False#
- is_async: bool = False#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
Deprecated, use cls.can_reset instead.
- property repeatable#
Deprecated, use cls.can_repeat instead. A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- property asynchronous#
Deprecated, use cls.is_async instead. Whether the workflow runs in async mode.
- set_repeat_times(repeat_times: int, run_id_base: int) None [source]#
Set the number of times to repeat the workflow.
- Parameters:
repeat_times (int) – number of times to repeat the workflow (if repeatable).
run_id_base (int) – base run_id for setting run_id in experiences.
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
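The is_async flag and the run() / run_async() pair suggest that a caller chooses the entry point from the flag. A minimal sketch of that dispatch follows, using stand-in classes rather than Trinity's own: WorkflowSketch, SyncEcho, AsyncEcho and execute are hypothetical names, and experiences are plain dicts here.

```python
import asyncio
from typing import List


class WorkflowSketch:
    """Stand-in mirroring the documented class attributes; not the real base class."""

    can_reset: bool = False
    can_repeat: bool = False
    is_async: bool = False

    def run(self) -> List[dict]:
        raise NotImplementedError

    async def run_async(self) -> List[dict]:
        raise NotImplementedError


class SyncEcho(WorkflowSketch):
    def run(self) -> List[dict]:
        return [{"response": "sync", "reward": 1.0}]


class AsyncEcho(WorkflowSketch):
    is_async = True

    async def run_async(self) -> List[dict]:
        await asyncio.sleep(0)  # placeholder for an awaited model call
        return [{"response": "async", "reward": 0.5}]


def execute(workflow: WorkflowSketch) -> List[dict]:
    """Hypothetical runner: choose the entry point from the is_async flag."""
    if workflow.is_async:
        return asyncio.run(workflow.run_async())
    return workflow.run()
```

A subclass only needs to override the method that matches its flag; the other keeps raising NotImplementedError.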
- class trinity.common.workflows.AsyncSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
- is_async: bool = True#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
A workflow for simple single-round tasks.
- can_reset: bool = True#
- can_repeat: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- set_repeat_times(repeat_times, run_id_base)[source]#
Set the number of times to repeat the workflow.
- Parameters:
repeat_times (int) – number of times to repeat the workflow (if repeatable).
run_id_base (int) – base run_id for setting run_id in experiences.
- property rollout_args#
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
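To make the roles of repeat_times and run_id_base concrete, here is a small sketch of a repeatable workflow. It assumes, without confirmation from the source, that the i-th sample of one run() call receives run id run_id_base + i; RepeatableSketch is a hypothetical stand-in and experiences are plain dicts.

```python
from typing import List


class RepeatableSketch:
    """Stand-in for a workflow with can_repeat = True; not Trinity code."""

    can_repeat: bool = True

    def __init__(self) -> None:
        self.repeat_times = 1
        self.run_id_base = 0

    def set_repeat_times(self, repeat_times: int, run_id_base: int) -> None:
        self.repeat_times = repeat_times
        self.run_id_base = run_id_base

    def run(self) -> List[dict]:
        # Assumption: the i-th sample gets run id run_id_base + i.
        return [
            {"run_id": self.run_id_base + i, "response": f"sample-{i}"}
            for i in range(self.repeat_times)
        ]


wf = RepeatableSketch()
wf.set_repeat_times(repeat_times=3, run_id_base=8)
run_ids = [exp["run_id"] for exp in wf.run()]
```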
- class trinity.common.workflows.AsyncMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
AsyncSimpleWorkflow, MathWorkflow
- class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math tasks as introduced in DeepSeek-R1.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
MultiTurnWorkflow
A workflow for the webshop task.
- can_reset: bool = True#
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async generate_env_inference_samples(env, session_id, rollout_num) List[Experience] [source]#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
MultiTurnWorkflow
A workflow for the alfworld task.
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async generate_env_inference_samples(env, rollout_num) List[Experience] [source]#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
Bases:
RewardPropagationWorkflow
An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.
This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
- run() List[Experience] [source]#
Run the workflow and return a list of experiences with step-wise rewards.
- step(step_num: int) bool [source]#
Run a single step of your agent application.
- Parameters:
step_num (int) – The current step number.
- Returns:
Whether to continue running the agent application.
- Return type:
bool
- Tips:
You can use the openai client (self.client) to migrate your existing applications at low cost.
- reward(exps: list[Experience]) float [source]#
Calculate the reward for the given experiences of the entire run.
- property max_step_num: int#
Return the maximum number of steps allowed in an episode.
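The step(), reward() and max_step_num members describe a loop: step until the agent asks to stop or the step limit is reached, then compute one reward for the whole run. The sketch below shows one way such a loop could look. StepWiseSketch is hypothetical, experiences are plain dicts, max_step_num is a class attribute rather than a property, and copying the final reward onto every step is an assumption suggested by the base class name, not confirmed by the source.

```python
from typing import List


class StepWiseSketch:
    """Toy episode that succeeds when step number goal_step is reached."""

    max_step_num = 5

    def __init__(self, goal_step: int) -> None:
        self.goal_step = goal_step
        self.exps: List[dict] = []
        self.done = False

    def step(self, step_num: int) -> bool:
        # Record one experience per step; return whether to continue.
        self.exps.append({"step": step_num, "reward": None})
        self.done = step_num == self.goal_step
        return not self.done

    def reward(self, exps: List[dict]) -> float:
        # One reward for the entire run, based on the episode outcome.
        return 1.0 if self.done else 0.0

    def run(self) -> List[dict]:
        for step_num in range(self.max_step_num):
            if not self.step(step_num):
                break
        final = self.reward(self.exps)
        for exp in self.exps:  # assumption: propagate the final reward to every step
            exp["reward"] = final
        return self.exps
```

With goal_step=2 the loop stops after three steps; with an unreachable goal it stops at max_step_num and every step receives a reward of 0.0.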
- class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
Workflow
RAFT workflow for alfworld using trajectory context.
Process:
1. First exploration with normal experience generation
2. Generate SFT data from successful attempt
- can_reset: bool = True#
- can_repeat: bool = True#
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]] [source]#
Run a single rollout with RAFT-guided actions
- async eval_alfworld() List[Experience] [source]#
Evaluate a single alfworld trajectory
- async run_async() List[Experience] [source]#
Run the RAFT alfworld workflow and return experiences
- class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
RAFTAlfworldWorkflow
RAFT workflow for alfworld using trajectory context.
Process:
1. First exploration with normal experience generation
2. If failed, re-explore with first trajectory as context
3. Generate SFT data from successful attempt
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]] [source]#
Generate SFT training data using RAFT learning
- async re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]] [source]#
Re-explore with first trajectory as context
- async run_async() List[Experience] [source]#
Run the RAFT alfworld workflow and return experiences
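The three-step process documented for RAFTReflectAlfworldWorkflow can be sketched as plain control flow. Everything here is illustrative: raft_reflect and toy_rollout are hypothetical names, a rollout is reduced to a function from context to (trajectory, success), and returning no SFT data when both attempts fail is an assumption.

```python
from typing import Callable, Dict, List, Tuple

Trajectory = List[Dict[str, str]]
Rollout = Callable[[Trajectory], Tuple[Trajectory, bool]]


def raft_reflect(rollout: Rollout) -> Tuple[Trajectory, bool, int]:
    """Return (sft_data, success, attempts) following the three documented steps."""
    # 1. First exploration with no extra context.
    first, success = rollout([])
    if success:
        return first, True, 1
    # 2. The first attempt failed: re-explore with the first trajectory as context.
    second, success = rollout(first)
    # 3. Keep SFT data only from a successful attempt (assumption: else nothing).
    return (second if success else []), success, 2


def toy_rollout(context: Trajectory) -> Tuple[Trajectory, bool]:
    """Toy environment: the agent only succeeds when it has prior context."""
    success = len(context) > 0
    action = "open drawer" if success else "look"
    return [{"role": "assistant", "content": action}], success


sft_data, ok, attempts = raft_reflect(toy_rollout)
```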
- class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
MultiTurnWorkflow
A workflow for the sciworld task.
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async generate_env_inference_samples(env, rollout_num) List[Experience] [source]#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AsyncMathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathBoxedWorkflow
- is_async: bool = True#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math tasks that give answers in boxed format.
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.AsyncMathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathRMWorkflow
- is_async: bool = True#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math tasks as introduced in DeepSeek-R1.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for toolcall tasks. Prompt construction and reward function are taken from NVlabs/Tool-N1.
Only Qwen models are supported for now. For other models, adapt the prompt construction and reward calculation yourself.
- is_async: bool = True#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AsyncMathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathEvalWorkflow
- is_async: bool = True#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
A workflow for standard math evaluation.
The evaluation standard and prompting style follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: QwenLM/Qwen2.5-Math
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.AgentScopeV0ReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V0 version here. The code will be deprecated soon.
- can_reset: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.AgentScopeReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V1 version here.
- can_reset: bool = True#
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.AgentScopeV1ReactSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow.
- can_reset: bool = True#
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.AgentScopeReActWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- async calculate_reward(response) float | Dict[str, float] [source]#
Calculate the reward for the workflow.
- Returns:
The reward value or a dictionary of reward values.
- Return type:
Union[float, Dict[str, float]]
- construct_experiences(reward: float | Dict[str, float]) List[Experience] [source]#
Construct experiences from the agent’s interaction history.
- Parameters:
reward (Union[float, Dict[str, float]]) – The reward value to assign to each experience.
- Returns:
A list of Experience objects.
- Return type:
List
- class trinity.common.workflows.EmailSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
Multi-turn Email Search workflow (ReAct-style tool use).
- can_reset: bool = True#
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- calculate_reward(answer_and_sources: Dict) Dict[str, float] [source]#
Ref: calculate_reward in OpenPipe/ART
- class trinity.common.workflows.AsyncMathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathRULERWorkflow
- is_async: bool = True#
- async run_async() List[Experience] [source]#
Modified from SimpleWorkflow.run
- class trinity.common.workflows.MathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math with RULER reward function.
Modified from MathWorkflow. Adapted from OpenPipe/ART
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- reset(task: Task)[source]#
Note that in this workflow, MathRewardFn is only used for calculating the ‘golden reward’, whereas the rewards used by RL training are calculated by RULER.
- run() List[Experience] [source]#
Modified from SimpleWorkflow.run
- get_ruler_scores(responses: List[Experience], judger: Any) Tuple[bool, List[float]] [source]#
Get RULER scores
- class trinity.common.workflows.MathTrainableRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math, where the policy model itself serves as a RULER reward model. Modified from MathRULERWorkflow. RULER is adapted from OpenPipe/ART
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Modified from MathRULERWorkflow.run
- get_ruler_responses(responses: List[Experience], judger: Any, ruler_rollout_args: Any, gold_scores: List[float] | None = None) Tuple[float, List[Experience], List[float]] [source]#
Get RULER scores.
- Returns:
judge_success_rate (float), ruler_responses (List[Experience]), ruler_scores (List[float])
- Return type:
Tuple[float, List[Experience], List[float]]
- class trinity.common.workflows.AsyncSimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleMMWorkflow
- is_async: bool = True#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.SimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for simple single-round tasks.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.RubricJudgeWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow using LLM-as-a-judge and rubrics to get the reward.
Adapted from https://arxiv.org/pdf/2507.17746
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Modified from SimpleWorkflow.run
- get_judge_reward(response: str, judger: OpenAI) Tuple[bool, float] [source]#
Get rewards with LLM-as-a-judge. The prompts are adapted from the RAR-IMPLICIT method in https://arxiv.org/pdf/2507.17746
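get_judge_reward returns a Tuple[bool, float], which reads as a success flag plus a score. As a rough illustration of that return shape only, the sketch below turns a judge's text reply into such a pair. parse_judge_reply is a hypothetical helper, and the 0 to 10 rating scale, the regular expression and the normalisation are all assumptions; the actual prompts and parsing live in the workflow source.

```python
import re
from typing import Tuple


def parse_judge_reply(reply: str, max_score: float = 10.0) -> Tuple[bool, float]:
    """Return (success, reward); success is False when no valid score is found."""
    # Take the first number in the reply as the judge's rating (assumption).
    match = re.search(r"-?\d+(?:\.\d+)?", reply)
    if match is None:
        return False, 0.0
    score = float(match.group())
    if not 0.0 <= score <= max_score:
        return False, 0.0
    # Normalise to [0, 1] so rewards are comparable across scales.
    return True, score / max_score
```

Returning a flag alongside the score lets the caller tell a genuinely low rating apart from a reply that could not be parsed.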
- class trinity.common.workflows.AgentScopeWorkflowAdapter(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
Adapter to wrap an agentscope trainable workflow function into a Trinity Workflow.
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Initialize the adapter with the task and model.
- construct_experiences(reward: float) List[Experience] [source]#
Construct experiences from the agent’s interaction history.
- Parameters:
reward (float) – The reward value to assign to each experience.
- Returns:
A list of Experience objects.
- Return type:
List
- async run_async() List[Experience] [source]#
Run the workflow asynchronously and return experiences.