trinity.explorer

Submodules

trinity.explorer.explorer module

The explorer module

class trinity.explorer.explorer.Explorer(config: Config)

Bases: object

Responsible for exploring the taskset.

__init__(config: Config)
async setup_weight_sync_group(master_address: str, master_port: int, state_dict_meta: List | None = None)
async prepare() -> None

Preparation before running.

async get_weight(name: str) -> Tensor

Get the weight with the given name from the loaded model (used when updating weights from a checkpoint).

async explore() -> str

The dreaming loop for the explorer and trainer. One period is laid out as follows:

           <---------------------------------------------------- one period --------------------------------------------------->|
explorer | <-- step_1 --> | <-- step_2 --> |      ...       | <-- step_n --> | <--  eval  --> | <-- [idle] --> | <--  sync  --> |
trainer  | <--  idle  --> | <-- step_1 --> | <-- step_2 --> |      ...       | <-- step_n --> | <-- [idle] --> | <--  sync  --> |
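
Read as a pipeline, the diagram shows the trainer lagging the explorer by one step: it trains on step_1's experiences while the explorer runs step_2, and both sides meet at sync at the end of the period. The toy asyncio model below illustrates only that reading. one_period and its inner coroutines are hypothetical names, the sleeps stand in for real rollout, evaluation, and training work, and this is not Trinity's implementation.

```python
import asyncio
from typing import List, Optional


async def one_period(n_steps: int) -> List[str]:
    """Toy model of one explorer/trainer period (hypothetical, not Trinity's code)."""
    log: List[str] = []
    batches: "asyncio.Queue[Optional[int]]" = asyncio.Queue()

    async def explorer() -> None:
        for step in range(1, n_steps + 1):
            await asyncio.sleep(0.01)  # stand-in for rolling out one step
            log.append(f"explorer step_{step}")
            await batches.put(step)  # hand this step's experiences to the trainer
        await batches.put(None)  # signal: no more batches in this period
        await asyncio.sleep(0.01)  # stand-in for evaluation
        log.append("explorer eval")

    async def trainer() -> None:
        # Starts idle: it can only train on a step after the explorer produced it.
        while (step := await batches.get()) is not None:
            await asyncio.sleep(0.01)  # stand-in for one training step
            log.append(f"trainer step_{step}")

    # Whichever side finishes first simply waits here (the [idle] slot).
    await asyncio.gather(explorer(), trainer())
    log.append("sync")  # both sides reach the weight-sync point together
    return log


print(asyncio.run(one_period(3)))
```

Because batches flow through a FIFO queue, each trainer step always comes after the matching explorer step, which is the one-step lag the diagram draws.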

explore_step() -> bool
need_sync() -> bool
eval(eval_explore_step_num: int)

Run evaluation on all evaluation data samples.

async benchmark() -> bool

Benchmark the model checkpoints.

wait_for_workflow_done() -> None

Wait for the running workflows to finish.

async sync_weight() -> None

Synchronize model weights.

async running_status() -> RunningStatus
flush_log(step: int) -> None

Flush the log of the current step.

shutdown() -> None

trinity.explorer.runner_pool module

Runner pool for running tasks in parallel. Modified from ray.util.actor_pool.ActorPool.

class trinity.explorer.runner_pool.RunnerPool(config: Config, models: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)

Bases: object

A pool of WorkflowRunner actors.

The RunnerPool automatically handles exceptions raised during a workflow and retries the workflow when it fails or times out. The maximum number of retries is set by config.explorer.max_retry_times, and the maximum timeout by config.explorer.max_timeout.
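
The retry policy described above can be sketched in plain asyncio. This is a hypothetical illustration, not RunnerPool's actual code (which dispatches workflows to Ray actors): run_with_retry and its parameters are made-up names chosen to mirror config.explorer.max_retry_times and config.explorer.max_timeout, and the sketch assumes max_retry_times counts retries after the first attempt.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional


@dataclass
class Status:
    # Same fields as the documented trinity.explorer.workflow_runner.Status.
    ok: bool
    metric: Dict[str, float]
    message: Optional[str] = None


async def run_with_retry(
    run_once: Callable[[], Awaitable[Dict[str, float]]],
    max_retry_times: int,
    max_timeout: float,
) -> Status:
    """Hypothetical helper: retry a workflow that raises or times out.

    Assumes 1 + max_retry_times attempts in total, each cancelled after
    max_timeout seconds.
    """
    last_error = ""
    for attempt in range(1, max_retry_times + 2):
        try:
            metric = await asyncio.wait_for(run_once(), timeout=max_timeout)
            return Status(ok=True, metric=metric)
        except asyncio.TimeoutError:
            last_error = f"attempt {attempt} timed out after {max_timeout}s"
        except Exception as exc:  # any other workflow error counts as a failure
            last_error = f"attempt {attempt} failed: {exc!r}"
    # Every attempt failed: report the last error instead of raising.
    return Status(ok=False, metric={}, message=last_error)


# Usage: a workflow that fails once, then succeeds on the first retry.
calls = {"n": 0}


async def flaky_workflow() -> Dict[str, float]:
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient error")
    return {"reward": 1.0}


print(asyncio.run(run_with_retry(flaky_workflow, max_retry_times=2, max_timeout=1.0)))
# Status(ok=True, metric={'reward': 1.0}, message=None)
```

Returning a failed Status after the last attempt, rather than raising, keeps one bad task from interrupting the rest of the batch.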

__init__(config: Config, models: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)
run_tasks(tasks: List[Task] | Task) -> None

Schedule one or more tasks to run in the pool.

Parameters:

tasks – A single task or a list of tasks.

has_next()

Returns whether there are any pending results to return.

Returns:

True if there are any pending results not yet returned.

get_next_unorder() -> List[Status]

Returns the next pending result(s) as they finish, regardless of submission order.

Returns:

A list with the status of each returned task.

get_next() -> Status

Returns the next pending result in order.

This returns the next task result, blocking for up to the specified timeout until it is available.

Returns:

The return status of the next task.
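
The difference between get_next() and get_next_unorder() can be shown with a small thread-pool model. This is a hypothetical sketch built on concurrent.futures rather than Ray actors: ToyPool is a made-up name, each "task" is just a sleep duration, and having get_next_unorder() return every task that has already finished is an assumption consistent with its List[Status] return type. It also mirrors run_tasks accepting either a single task or a list.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import List, Union


class ToyPool:
    """Hypothetical stand-in for RunnerPool's result handling (threads, not Ray)."""

    def __init__(self, num_workers: int) -> None:
        self._executor = ThreadPoolExecutor(max_workers=num_workers)
        self._pending: List[Future] = []  # kept in submission order

    @staticmethod
    def _run(delay: float) -> float:
        time.sleep(delay)  # a "task" here is just a sleep duration
        return delay

    def run_tasks(self, tasks: Union[List[float], float]) -> None:
        if not isinstance(tasks, list):  # accept a single task or a list
            tasks = [tasks]
        for task in tasks:
            self._pending.append(self._executor.submit(self._run, task))

    def has_next(self) -> bool:
        return bool(self._pending)

    def get_next(self) -> float:
        # In order: block on the oldest submitted task, even if others are done.
        return self._pending.pop(0).result()

    def get_next_unorder(self) -> List[float]:
        # Out of order: wait for at least one task, then return all finished ones.
        done, _ = wait(self._pending, return_when=FIRST_COMPLETED)
        self._pending = [f for f in self._pending if f not in done]
        return [f.result() for f in done]


pool = ToyPool(num_workers=2)
pool.run_tasks([0.3, 0.05])
print(pool.get_next_unorder())  # the 0.05 s task finishes first
print(pool.get_next_unorder())  # then the 0.3 s task
```

Ordered retrieval is simpler to reason about, while unordered retrieval lets a caller consume fast tasks without waiting behind a slow one.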

has_free()

Returns whether there are any idle actors available.

Returns:

True if there are any idle actors and no pending submits.

pop_idle()

Removes an idle actor from the pool.

Returns:

An idle actor if one is available. None if no actor was free to be removed.
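
has_free() and pop_idle() describe the idle-actor bookkeeping that RunnerPool inherits in spirit from Ray's ActorPool. A minimal sketch of that logic follows, assuming idle actors and queued submissions are each kept in a list. IdleTracker and its attribute names are hypothetical; push() mirrors the ActorPool method of the same name that returns an actor to the pool.

```python
from typing import Any, List, Optional


class IdleTracker:
    """Hypothetical sketch of idle-actor bookkeeping in the style of Ray's ActorPool."""

    def __init__(self, actors: List[Any]) -> None:
        self._idle_actors: List[Any] = list(actors)
        self._pending_submits: List[Any] = []  # tasks waiting for a free actor

    def has_free(self) -> bool:
        # Free only if an actor is idle AND no queued task is waiting to claim it.
        return len(self._idle_actors) > 0 and len(self._pending_submits) == 0

    def pop_idle(self) -> Optional[Any]:
        # Remove and return an idle actor, or None if none is free.
        if self.has_free():
            return self._idle_actors.pop()
        return None

    def push(self, actor: Any) -> None:
        # Return an actor to the idle set.
        self._idle_actors.append(actor)


tracker = IdleTracker(["runner_0", "runner_1"])
print(tracker.pop_idle())  # runner_1
print(tracker.pop_idle())  # runner_0
print(tracker.pop_idle())  # None
```

The pending-submit check means queued tasks get first claim on an idle actor, so an actor is only reported as free when nothing is already waiting for it.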

trinity.explorer.workflow_runner module

The workflow runner module.

class trinity.explorer.workflow_runner.Status(ok: bool, metric: dict[str, float], message: str | None = None)

Bases: object

The result status of running a task.

ok: bool
metric: dict[str, float]
message: str | None = None
__init__(ok: bool, metric: dict[str, float], message: str | None = None) -> None

class trinity.explorer.workflow_runner.WorkflowRunner(config: Config, model: InferenceModel, auxiliary_models: List[InferenceModel] | None = None)

Bases: object

A Ray remote actor to run the workflow and put the returned experiences into the buffer.

__init__(config: Config, model: InferenceModel, auxiliary_models: List[InferenceModel] | None = None) -> None
is_alive()
run_task(task: Task) -> Status

Run the task and return its status.
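
The documented behavior of run_task (run the workflow, put the returned experiences into the buffer, report a Status) can be sketched as follows. Experience, ListBuffer, the zero-argument workflow callable, and the reward_mean metric are hypothetical placeholders, not Trinity's classes; the real runner is a Ray actor that works with InferenceModel instances.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class Status:
    # Same fields as the documented trinity.explorer.workflow_runner.Status.
    ok: bool
    metric: Dict[str, float]
    message: Optional[str] = None


@dataclass
class Experience:
    # Hypothetical: one rollout produced by a workflow.
    reward: float


class ListBuffer:
    # Hypothetical in-memory stand-in for the experience buffer.
    def __init__(self) -> None:
        self.items: List[Experience] = []

    def write(self, experiences: List[Experience]) -> None:
        self.items.extend(experiences)


def run_task(workflow: Callable[[], List[Experience]], buffer: ListBuffer) -> Status:
    """Hypothetical sketch: run a workflow, buffer its experiences, report a Status."""
    try:
        experiences = workflow()  # run the workflow
        buffer.write(experiences)  # put the returned experiences into the buffer
        rewards = [e.reward for e in experiences]
        metric = {"reward_mean": sum(rewards) / len(rewards)} if rewards else {}
        return Status(ok=True, metric=metric)
    except Exception as exc:  # report the failure to the caller instead of raising
        return Status(ok=False, metric={}, message=repr(exc))


buffer = ListBuffer()
print(run_task(lambda: [Experience(1.0), Experience(0.0)], buffer))
# Status(ok=True, metric={'reward_mean': 0.5}, message=None)
print(run_task(lambda: 1 / 0, buffer).ok)
# False
```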

Module contents

class trinity.explorer.Explorer(config: Config)

Bases: object

Re-exported from trinity.explorer.explorer; documented in full under the explorer module above.

class trinity.explorer.RunnerPool(config: Config, models: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)

Bases: object

Re-exported from trinity.explorer.runner_pool; documented in full under the runner_pool module above.