trinity.common.workflows.workflow module#

Base Workflow Class

class trinity.common.workflows.workflow.Task(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '', index: dict = <factory>)[源代码]#

基类:dict

A Task class that defines a task and its associated reward function / workflow.

workflow: Type[Workflow] = None#
repeat_times: int | None = None#
format_args: FormatConfig#
rollout_args: GenerationConfig#
workflow_args: dict#
reward_fn_args: dict#
is_eval: bool = False#
reward_fn: Type[RewardFn] | None = None#
raw_task: dict | None = None#
batch_id: int | str = ''#
task_id: int | str = ''#
index: dict#
to_workflow(model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None) Workflow[源代码]#

Convert the task to a workflow.

参数:
  • model (ModelWrapper) -- The rollout model for the workflow.

  • auxiliary_models (List[ModelWrapper]) -- The auxiliary model wrappers. Workflows can access both the ModelWrapper and OpenAI client via self.auxiliary_model_wrappers and self.auxiliary_models respectively.

返回:

The generated workflow object.

返回类型:

Workflow

property task_desc: str | None#
property truth: str | None#
to_dict() dict[源代码]#
__init__(workflow: ~typing.Type[~trinity.common.workflows.workflow.Workflow] = None, repeat_times: int | None = None, format_args: ~trinity.common.config.FormatConfig = <factory>, rollout_args: ~trinity.common.config.GenerationConfig = <factory>, workflow_args: dict = <factory>, reward_fn_args: dict = <factory>, is_eval: bool = False, reward_fn: ~typing.Type[~trinity.common.rewards.reward_fn.RewardFn] | None = None, raw_task: dict | None = None, batch_id: int | str = '', task_id: int | str = '', index: dict = <factory>) None#
class trinity.common.workflows.workflow.Workflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#

基类:object

The base workflow class.

A workflow is a runnable object which generates a list of experiences.

auxiliary_model_wrappers#

List of ModelWrapper instances for auxiliary models.

auxiliary_models#

List of OpenAI clients (sync or async based on is_async) for auxiliary models.

Type:

Optional[Union[List[openai.OpenAI], List[openai.AsyncOpenAI]]]

can_reset: bool = False#
can_repeat: bool = False#
is_async: bool = False#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#
auxiliary_models: List[openai.OpenAI] | List[openai.AsyncOpenAI] | None#
property resettable#

Deprecated, use cls.can_reset instead.

property repeatable#

Deprecated, use cls.can_repeat instead. A workflow is repeatable if it can be run multiple times within the run() or run_async() method.

property asynchronous#

Deprecated, use cls.is_async instead. Whether the workflow runs in async mode.

reset(task: Task)[源代码]#

Reset the workflow.

set_repeat_times(repeat_times: int, run_id_base: int) None[源代码]#

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

run() List[Experience][源代码]#

Run workflow and return a list of experiences.

async run_async() List[Experience][源代码]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.workflow.MultiTurnWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#

基类:Workflow

The base workflow class for concatenated multi-turn tasks.

can_repeat: bool = True#
__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#
set_repeat_times(repeat_times, run_id_base)[源代码]#

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

process_messages_to_experience(messages, reward, info={}, truncate_status=None) Experience[源代码]#
class trinity.common.workflows.workflow.BaseSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#

基类:Workflow

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#
reset(task: Task)[源代码]#

Reset the workflow.

set_repeat_times(repeat_times, run_id_base)[源代码]#

Set the number of times to repeat the workflow. :param repeat_times: number of times to repeat the workflow (if repeatable). :type repeat_times: int :param run_id_base: base run_id for setting run_id in experiences. :type run_id_base: int

property rollout_args#
format_messages()[源代码]#

Format messages for the instruct model.

class trinity.common.workflows.workflow.SimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#

基类:BaseSimpleWorkflow

A workflow for simple single-round task.

can_reset: bool = True#
can_repeat: bool = True#
run() List[Experience][源代码]#

Run workflow and return a list of experiences.

class trinity.common.workflows.workflow.AsyncSimpleWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#

基类:BaseSimpleWorkflow

is_async: bool = True#
async run_async() List[Experience][源代码]#

Run workflow in async and return a list of experiences.

class trinity.common.workflows.workflow.MathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#

基类:SimpleWorkflow

A workflow for math tasks as introduced in DeepSeek-R1.

__init__(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#
reset(task: Task)[源代码]#

Reset the workflow.

class trinity.common.workflows.workflow.AsyncMathWorkflow(*, task: Task, model: ModelWrapper, auxiliary_models: List[ModelWrapper] | None = None)[源代码]#

基类:AsyncSimpleWorkflow, MathWorkflow