trinity.common.workflows.workflow module#

Base Workflow Class

class trinity.common.workflows.workflow.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0)[source]#

Bases: dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow] = None#
repeat_times: int | None = None#
format_args: FormatConfig#
rollout_args: GenerationConfig#
workflow_args: dict#
reward_fn_args: dict#
is_eval: bool = False#
reward_fn: Type[RewardFn] | None = None#
raw_task: dict | None = None#
batch_id: int | str = 0#
task_id: int | str = 0#
to_workflow(model: Any, auxiliary_models: List[OpenAI] | None = None) Workflow[source]#

Convert the task to a workflow.

Parameters:
  • model (ModelWrapper) – The rollout model for the workflow.

  • auxiliary_models (List[openai.OpenAI]) – The auxiliary models for the workflow.

Note

model_path attribute is added to the auxiliary_models for use within the workflow.

Returns:

The generated workflow object.

Return type:

Workflow

property task_desc: str | None#
property truth: str | None#
to_dict() dict[source]#
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] | None = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = 0, task_id: int | str = 0) None#
class trinity.common.workflows.workflow.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: ABC

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
property repeatable#

A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property asynchronous#

Whether the workflow runs in async mode.

property rollout_args#
reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[source]#

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

run() List[Experience][source]#

Run workflow and return a list of experiences.

async run_async() List[Experience][source]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.workflow.MultiTurnWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

The base workflow class for concatenated multi-turn tasks.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

abstract run() List[Experience][source]#

Run workflow and return a list of experiences.

process_messages_to_experience(messages, reward, info={}) Experience[source]#
class trinity.common.workflows.workflow.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: Workflow

A workflow for simple single-round task.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
property resettable#
reset(task: Task)[source]#

Reset the workflow.

set_repeat_times(repeat_times, run_id_base)[source]#

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

format_messages()[source]#

Format messages for the instruct model.

run() List[Experience][source]#

Run workflow and return a list of experiences.

class trinity.common.workflows.workflow.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#

Bases: SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[OpenAI] | None = None)[source]#
reset(task: Task)[source]#

Reset the workflow.