trinity.common.workflows package#
Submodules#
Module contents#
Workflow module
- class trinity.common.workflows.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '', index: dict = <factory>)[source]#
Bases: dict
A Task class that defines a task and its associated reward function / workflow.
- repeat_times: int | None = None#
- format_args: FormatConfig#
- rollout_args: GenerationConfig#
- workflow_args: dict#
- reward_fn_args: dict#
- is_eval: bool = False#
- raw_task: dict | None = None#
- batch_id: int | str = ''#
- task_id: int | str = ''#
- index: dict#
- to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]#
Convert the task to a workflow.
- Parameters:
model (ModelWrapper) – The rollout model for the workflow.
auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.
Note
A model_path attribute is added to each of the auxiliary_models for use within the workflow.
- Returns:
The generated workflow object.
- Return type:
Workflow
- property task_desc: str | None#
- property truth: str | None#
- __init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '', index: dict = <factory>) None#
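Example: a minimal sketch of building a Task and turning it into a runnable workflow. The raw_task fields and the pre-built ModelWrapper instance named model are illustrative assumptions, not part of this reference:

    from trinity.common.workflows import MathWorkflow, Task

    # `model` is assumed to be an already-constructed ModelWrapper;
    # its construction is documented elsewhere.
    task = Task(
        workflow=MathWorkflow,  # a workflow class, not an instance
        raw_task={"question": "1 + 1 = ?", "answer": "2"},  # illustrative fields
        batch_id=0,
        task_id=0,
    )
    workflow = task.to_workflow(model)  # returns a MathWorkflow instance
    experiences = workflow.run()  # -> List[Experience]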
- class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: object
The base workflow class.
A workflow is a runnable object which generates a list of experiences.
- can_reset: bool = False#
- can_repeat: bool = False#
- is_async: bool = False#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
Deprecated, use cls.can_reset instead.
- property repeatable#
Deprecated, use cls.can_repeat instead. A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- property asynchronous#
Deprecated, use cls.is_async instead. Whether the workflow runs in async mode.
- set_repeat_times(repeat_times: int, run_id_base: int) None[source]#
Set the number of times to repeat the workflow.
- Parameters:
repeat_times (int) – number of times to repeat the workflow (if repeatable).
run_id_base (int) – base run_id for setting run_id in experiences.
- run() List[Experience][source]#
Run workflow and return a list of experiences.
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
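Example: a structural sketch of a custom synchronous subclass. The attributes self.task and self.model are assumptions about what the base __init__ stores, and the rollout call is hypothetical:

    from typing import List

    from trinity.common.workflows import Workflow


    class EchoWorkflow(Workflow):
        """Toy single-round workflow; structure only, not a real implementation."""

        can_repeat: bool = True
        is_async: bool = False

        def run(self) -> List["Experience"]:
            prompt = self.task.task_desc or ""  # task_desc is a documented Task property
            # Hypothetical rollout call; the real ModelWrapper API may differ.
            return self.model.chat(prompt)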
- class trinity.common.workflows.AsyncSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
- is_async: bool = True#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
A workflow for simple single-round tasks.
- can_reset: bool = True#
- can_repeat: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- set_repeat_times(repeat_times, run_id_base)[source]#
Set the number of times to repeat the workflow.
- Parameters:
repeat_times (int) – number of times to repeat the workflow (if repeatable).
run_id_base (int) – base run_id for setting run_id in experiences.
- property rollout_args#
- run() List[Experience][source]#
Run workflow and return a list of experiences.
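Example: because SimpleWorkflow sets can_repeat = True, one run() call can produce several rollouts; a short usage sketch (task and model as in the Task example above):

    workflow = task.to_workflow(model)
    if workflow.can_repeat:
        # Produce 4 rollouts in a single run(); run ids in the experiences start at 0.
        workflow.set_repeat_times(4, run_id_base=0)
    experiences = workflow.run()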
- class trinity.common.workflows.AsyncMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: AsyncSimpleWorkflow, MathWorkflow
- class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow for math tasks as introduced in DeepSeek-R1.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases: MultiTurnWorkflow
A workflow for the webshop task.
- can_reset: bool = True#
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async generate_env_inference_samples(env, session_id, rollout_num) List[Experience][source]#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases: MultiTurnWorkflow
A workflow for the alfworld task.
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async generate_env_inference_samples(env, rollout_num) List[Experience][source]#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
Bases: RewardPropagationWorkflow
An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.
This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
- run() List[Experience][source]#
Run the workflow and return a list of experiences with step-wise rewards.
- step(step_num: int) bool[source]#
Run a single step of your agent application.
- Parameters:
step_num (int) – The current step number.
- Returns:
Whether to continue running the agent application.
- Return type:
bool
- Tip:
You can use the OpenAI client (self.client) to migrate your existing applications at low cost (see the sketch below).
- reward(exps: list[Experience]) float[source]#
Calculate the reward for the given experiences of the entire run.
- property max_step_num: int#
Return the maximum number of steps allowed in an episode.
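Example: a sketch of the step/reward contract that RewardPropagationWorkflow subclasses such as this one implement. The import path, the self.client usage, and the helper are assumptions:

    from trinity.common.workflows.workflow import RewardPropagationWorkflow  # assumed path


    class MyStepWiseWorkflow(RewardPropagationWorkflow):
        def step(self, step_num: int) -> bool:
            # One model/environment interaction; self.client is an
            # OpenAI-compatible client per the tip above.
            done = self.interact_once(step_num)  # hypothetical helper
            return not done  # returning False ends the episode

        def reward(self, exps) -> float:
            # One scalar for the whole run, propagated across its steps.
            return 1.0 if getattr(self, "episode_succeeded", False) else 0.0

        @property
        def max_step_num(self) -> int:
            return 30  # cap on steps per episode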
- class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases: Workflow
RAFT workflow for alfworld using trajectory context.
Process:
1. First exploration with normal experience generation.
2. Generate SFT data from the successful attempt.
- can_reset: bool = True#
- can_repeat: bool = True#
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#
Run a single rollout with RAFT-guided actions.
- async eval_alfworld() List[Experience][source]#
Evaluate a single alfworld trajectory.
- async run_async() List[Experience][source]#
Run the RAFT alfworld workflow and return experiences.
- class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases: RAFTAlfworldWorkflow
RAFT workflow for alfworld using trajectory context.
Process:
1. First exploration with normal experience generation.
2. If it fails, re-explore with the first trajectory as context.
3. Generate SFT data from the successful attempt.
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]][source]#
Generate SFT training data using RAFT learning.
- async re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]][source]#
Re-explore with the first trajectory as context.
- async run_async() List[Experience][source]#
Run the RAFT alfworld workflow and return experiences.
- class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases: MultiTurnWorkflow
A workflow for the sciworld task.
- is_async: bool = True#
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- async generate_env_inference_samples(env, rollout_num) List[Experience][source]#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AsyncMathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: MathBoxedWorkflow
- is_async: bool = True#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow for math tasks that give answers in boxed format.
- run() List[Experience][source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.AsyncMathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: MathRMWorkflow
- is_async: bool = True#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow for math tasks as introduced in DeepSeek-R1.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience][source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow for toolcall tasks. Prompt construction and reward function are from NVlabs/Tool-N1.
Only Qwen models are supported for now; you can adapt the prompt construction and reward calculation for other models.
- is_async: bool = True#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AsyncMathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: MathEvalWorkflow
- is_async: bool = True#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
A workflow for standard math evaluation.
The evaluation standard and prompting style follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: QwenLM/Qwen2.5-Math.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience][source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.AgentScopeV0ReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V0 version here. The code will be deprecated soon.
- can_reset: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.AgentScopeReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V1 version here.
- can_reset: bool = True#
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.AgentScopeV1ReactSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow.
- can_reset: bool = True#
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.AgentScopeReActWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- async calculate_reward(response) float | Dict[str, float][source]#
Calculate the reward for the workflow.
- Returns:
The reward value or a dictionary of reward values.
- Return type:
Union[float, Dict[str, float]]
- construct_experiences(reward: float | Dict[str, float]) List[Experience][source]#
Construct experiences from the agent’s interaction history.
- Parameters:
reward (Union[float, Dict[str, float]]) – The reward value to assign to each experience.
- Returns:
A list of Experience objects.
- Return type:
List[Experience]
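Example: a sketch of customizing the reward by overriding calculate_reward in a subclass; the structure of response is an assumption:

    from typing import Dict

    from trinity.common.workflows import AgentScopeReActWorkflow


    class MyReActWorkflow(AgentScopeReActWorkflow):
        async def calculate_reward(self, response) -> Dict[str, float]:
            # Return named reward components instead of a single float.
            correct = 1.0 if getattr(response, "is_correct", False) else 0.0
            return {"accuracy": correct, "format": 1.0}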
- class trinity.common.workflows.EmailSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
Multi-turn Email Search workflow (ReAct-style tool use).
- can_reset: bool = True#
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- async calculate_reward(answer_and_sources: Dict) Dict[str, float][source]#
Ref: calculate_reward in OpenPipe/ART
- class trinity.common.workflows.AsyncMathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: MathRULERWorkflow
- is_async: bool = True#
- async run_async() List[Experience][source]#
Modified from SimpleWorkflow.run
- class trinity.common.workflows.MathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow for math with the RULER reward function.
Modified from MathWorkflow. Adapted from OpenPipe/ART.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- reset(task: Task)[source]#
Note that in this workflow, MathRewardFn is only used for calculating the ‘golden reward’, whereas the rewards used by RL training are calculated by RULER.
- run() List[Experience][source]#
Modified from SimpleWorkflow.run
- get_ruler_scores(responses: List[Experience], judger: Any) Tuple[bool, List[float]][source]#
Get RULER scores.
- class trinity.common.workflows.MathTrainableRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow for math, where the policy model itself serves as a RULER reward model. Modified from MathRULERWorkflow. RULER is adapted from OpenPipe/ART.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience][source]#
Modified from MathRULERWorkflow.run
- get_ruler_responses(responses: List[Experience], judger: Any, ruler_rollout_args: Any, gold_scores: List[float] | None = None) Tuple[float, List[Experience], List[float]][source]#
Get RULER scores.
- Returns:
A tuple of judge_success_rate (float), ruler_responses (List[Experience]), and ruler_scores (List[float]).
- Return type:
Tuple[float, List[Experience], List[float]]
- class trinity.common.workflows.AsyncSimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleMMWorkflow
- is_async: bool = True#
- async run_async() List[Experience][source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.SimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow for simple single-round multi-modal tasks.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience][source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.RubricJudgeWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: SimpleWorkflow
A workflow using LLM-as-a-judge and rubrics to get the reward.
Adapted from https://arxiv.org/pdf/2507.17746
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience][source]#
Modified from SimpleWorkflow.run
- get_judge_reward(response: str, judger: OpenAI) Tuple[bool, float][source]#
Get rewards with LLM-as-a-judge. The prompts are adapted from the RAR-IMPLICIT method in https://arxiv.org/pdf/2507.17746.
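Example: auxiliary_models are plain openai.OpenAI clients, so a judge endpoint can be wired in directly. The endpoint, API key, and raw_task fields are placeholders:

    from openai import OpenAI

    from trinity.common.workflows import RubricJudgeWorkflow, Task

    judge = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")  # placeholder endpoint
    task = Task(
        workflow=RubricJudgeWorkflow,
        raw_task={"question": "...", "rubric": "..."},  # illustrative fields
    )
    workflow = task.to_workflow(model, auxiliary_models=[judge])  # `model` as above
    experiences = workflow.run()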
- class trinity.common.workflows.AgentScopeWorkflowAdapter(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases: Workflow
Adapter to wrap an agentscope trainable workflow function into a Trinity Workflow.
- is_async: bool = True#
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Initialize the adapter with the task and model.
- construct_experiences(reward: float) List[Experience][source]#
Construct experiences from the agent’s interaction history.
- Parameters:
reward (float) – The reward value to assign to each experience.
- Returns:
A list of Experience objects.
- Return type:
List[Experience]
- async run_async() List[Experience][source]#
Run the workflow asynchronously and return experiences.