trinity.common.workflows package#
Submodules#
Module contents#
Workflow module
- class trinity.common.workflows.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '')[source]#
Bases:
dict
A Task class that defines a task and its associated reward function / workflow.
- repeat_times: int | None = None#
- format_args: FormatConfig#
- rollout_args: GenerationConfig#
- workflow_args: dict#
- reward_fn_args: dict#
- is_eval: bool = False#
- raw_task: dict | None = None#
- batch_id: int | str = ''#
- task_id: int | str = ''#
- to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow [source]#
Convert the task to a workflow.
- Parameters:
model (ModelWrapper) – The rollout model for the workflow.
auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.
Note
model_path attribute is added to the auxiliary_models for use within the workflow.
- Returns:
The generated workflow object.
- Return type:
- property task_desc: str | None#
- property truth: str | None#
- __init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '') None #
- class trinity.common.workflows.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
ABC
The base workflow class.
A workflow is a runnable object which generates a list of experiences.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
- property repeatable#
A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- property asynchronous#
Whether the workflow runs in async mode.
- property rollout_args#
- set_repeat_times(repeat_times: int, run_id_base: int) None [source]#
Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AsyncSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
- property asynchronous#
Whether the workflow runs in async mode.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
A workflow for simple single-round task.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
- set_repeat_times(repeat_times, run_id_base)[source]#
Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.AsyncMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
AsyncSimpleWorkflow
,MathWorkflow
- class trinity.common.workflows.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math tasks as introduced in DeepSeek-R1.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- class trinity.common.workflows.WebShopWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
MultiTurnWorkflow
A workflow for webshop task.
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- property resettable#
- async generate_env_inference_samples(env, session_id, rollout_num) List[Experience] [source]#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
MultiTurnWorkflow
A workflow for alfworld task.
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- property asynchronous#
Whether the workflow runs in async mode.
- async generate_env_inference_samples(env, rollout_num) List[Experience] [source]#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.StepWiseAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
Bases:
RewardPropagationWorkflow
An Alfworld workflow refactored to use the RewardPropagationWorkflow base class.
This workflow manages an Alfworld environment, interacts with it step-by-step using a model, and calculates a final reward based on the episode’s outcome.
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None, use_openai_client: bool = False)[source]#
- run() List[Experience] [source]#
Run the workflow and return a list of experiences with step-wise rewards.
- step(step_num: int) bool [source]#
Run a single step of your agent application.
- Parameters:
step_num (int) – The current step number.
- Returns:
Whether to continue running the agent application.
- Return type:
bool
- Tips:
You can use the openai client (self.client) to migrate your existing applications at low cost.
- reward(exps: list[Experience]) float [source]#
Calculate the reward for the given experiences of the entire run.
- property max_step_num: int#
Return the maximum number of steps allowed in an episode.
- class trinity.common.workflows.RAFTAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
Workflow
RAFT workflow for alfworld using trajectory context.
Process: 1. First exploration with normal experience generation 2. Generate SFT data from successful attempt
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- property asynchronous#
Whether the workflow runs in async mode.
- async run_single_rollout(env) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]] [source]#
Run a single rollout with RAFT-guided actions
- async eval_alfworld() List[Experience] [source]#
Evaluate a single alfworld trajectory
- async run_async() List[Experience] [source]#
Run the RAFT alfworld workflow and return experiences
- class trinity.common.workflows.RAFTReflectAlfworldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
RAFTAlfworldWorkflow
RAFT workflow for alfworld using trajectory context.
Process: 1. First exploration with normal experience generation 2. If failed, re-explore with first trajectory as context 3. Generate SFT data from successful attempt
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- property asynchronous#
Whether the workflow runs in async mode.
- async construct_sft_data(first_trajectory: List[Dict[str, str]], success: bool, reward: float, original_steps: int) tuple[List[Dict[str, str]], Dict[str, Any], List[Dict[str, str]]] [source]#
Generate SFT training data using RAFT learning
- async re_explore_with_context(first_trajectory: List[Dict[str, str]], original_reward: float, original_success: bool, original_steps: int) tuple[List[Dict[str, str]], float, bool, int, List[Dict[str, str]]] [source]#
Re-explore with first trajectory as context
- async run_async() List[Experience] [source]#
Run the RAFT alfworld workflow and return experiences
- class trinity.common.workflows.SciWorldWorkflow(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
Bases:
MultiTurnWorkflow
A workflow for sciworld task.
- __init__(model: ModelWrapper, task: Task, auxiliary_models: List | None = None)[source]#
- property asynchronous#
Whether the workflow runs in async mode.
- async generate_env_inference_samples(env, rollout_num) List[Experience] [source]#
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AsyncMathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathBoxedWorkflow
- property asynchronous#
Whether the workflow runs in async mode.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathBoxedWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math tasks that give answers in boxed format.
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.AsyncMathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathRMWorkflow
- property asynchronous#
Whether the workflow runs in async mode.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathRMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math tasks as introduced in DeepSeek-R1.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.ToolCallWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for toolcall tasks. Prompt construction and reward function from NVlabs/Tool-N1
Only support qwen model for now. You can change the prompt construction and reward calculation by yourself for other models.
- property asynchronous#
Whether the workflow runs in async mode.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.AsyncMathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathEvalWorkflow
- property asynchronous#
Whether the workflow runs in async mode.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.MathEvalWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
A workflow for standard math evaluation.
The evaluation standard and prompting style are follow the Qwen2.5-Math model’s evaluation methodology. For more details on their approach, see: QwenLM/Qwen2.5-Math
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
- property repeatable#
A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- run() List[Experience] [source]#
Run workflow and return a list of experiences.
- class trinity.common.workflows.AgentScopeV0ReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V0 version here. The code will be deprecated soon.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
- property repeatable#
A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- class trinity.common.workflows.AgentScopeReactMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow. We use the AgentScope V1 version here.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
- property repeatable#
A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- property asynchronous#
Whether the workflow runs in async mode.
- class trinity.common.workflows.AgentScopeV1ReactSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
This workflow serves as an example of how to use the agentscope framework within the trinity workflow.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property resettable#
- property asynchronous#
Whether the workflow runs in async mode.
- property repeatable#
A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- class trinity.common.workflows.EmailSearchWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
Workflow
Multi-turn Email Search workflow (ReAct-style tool use).
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- property repeatable: bool#
A workflow is repeatable if it can be run multiple times within the run() or run_async() method.
- property resettable#
- calculate_reward(answer_and_sources: Dict) Dict[str, float] [source]#
Ref: calculate_reward in OpenPipe/ART
- class trinity.common.workflows.AsyncMathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
MathRULERWorkflow
- property asynchronous#
Whether the workflow runs in async mode.
- async run_async() List[Experience] [source]#
Modified from SimpleWorkflow.run
- class trinity.common.workflows.MathRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math with RULER reward function.
Modified from MathWorkflow. Adapted from OpenPipe/ART
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- reset(task: Task)[source]#
Note that in this workflow, MathRewardFn is only used for calculating the ‘golden reward’, whereasa the rewards used by RL training are calculated by RULER.
- run() List[Experience] [source]#
Modified from SimpleWorkflow.run
- get_ruler_scores(responses: List[Experience], judger: Any) Tuple[bool, List[float]] [source]#
Get RULER scores
- class trinity.common.workflows.MathTrainableRULERWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for math, where the policy model itself serves as a RULER reward model. Modified from MathRULERWorkflow. RULER is adapted from OpenPipe/ART
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Modified from MathRULERWorkflow.run
- get_ruler_responses(responses: List[Experience], judger: Any, ruler_rollout_args: Any, gold_scores: List[float] | None = None) Tuple[float, List[Experience], List[float]] [source]#
Get RULER scores :returns: float
ruler_responses: List[Experience] ruler_scores: List[float]
- Return type:
judge_success_rate
- class trinity.common.workflows.AsyncSimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleMMWorkflow
- property asynchronous#
Whether the workflow runs in async mode.
- async run_async() List[Experience] [source]#
Run workflow in async and return a list of experiences.
- class trinity.common.workflows.SimpleMMWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
Bases:
SimpleWorkflow
A workflow for simple single-round task.
- __init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
- run() List[Experience] [source]#
Run workflow and return a list of experiences.