trinity.explorer.scheduler module

Scheduler for rollout tasks.

class trinity.explorer.scheduler.TaskWrapper(task: Task, batch_id: int | str, run_id_base: int = 0, repeat_times: int = 1)[source]

Bases: object

A wrapper for a task.

task: Task
batch_id: int | str
run_id_base: int = 0
repeat_times: int = 1
__init__(task: Task, batch_id: int | str, run_id_base: int = 0, repeat_times: int = 1) → None
class trinity.explorer.scheduler.RunnerWrapper(runner_id: int, rollout_model: InferenceModel, auxiliary_models: List[InferenceModel], config: Config)[source]

Bases: object

A wrapper for a WorkflowRunner.

__init__(runner_id: int, rollout_model: InferenceModel, auxiliary_models: List[InferenceModel], config: Config)[source]
async run_with_retry(task: TaskWrapper) → Tuple[Status, List, int][source]
Returns:

Status: The return status of the task. List: The experiences generated by the task. int: The runner_id of the current runner.

Return type:

Tuple[Status, List, int]

restart_runner()[source]
trinity.explorer.scheduler.sort_batch_id(batch_id: int | str)[source]

Return the priority of a batch_id, for use when ordering batches.

class trinity.explorer.scheduler.Scheduler(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)[source]

Bases: object

Scheduler for rollout tasks.

__init__(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)[source]
task_done_callback(async_task: Task)[source]
async start() → None[source]
async stop() → None[source]
schedule(tasks: List[Task], batch_id: int | str) → None[source]

Schedule the provided tasks.

Parameters:
  • tasks (List[Task]) – The tasks to schedule.

  • batch_id (Union[int, str]) – The id of the provided tasks. It should be an integer or a string starting with an integer (e.g., 123, "123/my_task").
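
The accepted batch_id formats can be illustrated with a small sketch. The helper name `leading_int` is hypothetical and is not part of this module; it only shows one way to read the leading integer out of an id such as "123/my_task", assuming that integer is what determines ordering.

```python
import re
from typing import Union


def leading_int(batch_id: Union[int, str]) -> int:
    """Return the integer a batch_id is, or starts with (hypothetical helper)."""
    if isinstance(batch_id, int):
        return batch_id
    match = re.match(r"\d+", batch_id)
    if match is None:
        # Ids such as "my_task" do not start with an integer and are rejected.
        raise ValueError(f"batch_id must start with an integer: {batch_id!r}")
    return int(match.group())


# Mixed int and str ids, ordered by their leading integer (assumed ordering rule).
ids = ["10/eval", 2, "2/my_task", 123]
ordered = sorted(ids, key=leading_int)
```

Because `sorted` is stable, ids that share a leading integer keep their original relative order.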

async get_results(batch_id: int | str, min_num: int | None = None, timeout: float | None = None, clear_timeout_tasks: bool = True) → Tuple[List[Status], List[Experience]][source]

Get the results of the tasks in the specified batch_id.

Parameters:
  • batch_id (Union[int, str]) – Only wait for tasks in this batch.

  • min_num (int) – The minimum number of tasks to wait for. If None, wait for all tasks in the batch.

  • timeout (float) – The timeout for waiting for tasks to finish. If None, the default timeout is used.

  • clear_timeout_tasks (bool) – Whether to clear timed-out tasks.
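
A possible call pattern for start, schedule, get_results and stop is sketched below. `StubScheduler` is a stand-in written for this example: it copies only the method names and parameters documented here, completes every task immediately, and uses plain strings in place of Task, Status and Experience. It says nothing about how the real Scheduler is implemented.

```python
import asyncio
from typing import Dict, List, Optional, Tuple, Union

BatchId = Union[int, str]


class StubScheduler:
    """Stand-in mimicking the documented signatures only (not Trinity code)."""

    def __init__(self) -> None:
        self._results: Dict[BatchId, List[Tuple[str, List[str]]]] = {}

    async def start(self) -> None:
        pass  # The real scheduler would start its runners here.

    async def stop(self) -> None:
        pass  # The real scheduler would shut its runners down here.

    def schedule(self, tasks: List[str], batch_id: BatchId) -> None:
        # The stub "finishes" each task at once with a single fake experience.
        self._results.setdefault(batch_id, []).extend(
            ("ok", [f"exp:{task}"]) for task in tasks
        )

    async def get_results(
        self,
        batch_id: BatchId,
        min_num: Optional[int] = None,
        timeout: Optional[float] = None,
        clear_timeout_tasks: bool = True,
    ) -> Tuple[List[str], List[str]]:
        # timeout and clear_timeout_tasks are accepted but unused by the stub.
        done = self._results.get(batch_id, [])
        count = len(done) if min_num is None else min(min_num, len(done))
        taken, self._results[batch_id] = done[:count], done[count:]
        statuses = [status for status, _ in taken]
        experiences = [exp for _, exps in taken for exp in exps]
        return statuses, experiences


async def main() -> Tuple[List[str], List[str]]:
    scheduler = StubScheduler()
    await scheduler.start()
    try:
        scheduler.schedule(["t0", "t1", "t2"], batch_id=0)
        # Ask for at least two finished tasks from batch 0.
        return await scheduler.get_results(batch_id=0, min_num=2, timeout=10.0)
    finally:
        await scheduler.stop()


statuses, experiences = asyncio.run(main())
```

Wrapping the calls in try/finally keeps stop() from being skipped if get_results raises.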

has_step(batch_id: int | str) → bool[source]
async wait_all(timeout: float | None = None, clear_timeout_tasks: bool = True) → None[source]

Wait for all tasks to complete without popping results. Raises TimeoutError if the timeout is reached.

Parameters:
  • timeout (float) – Timeout in seconds. TimeoutError is raised when no new task completes within the timeout.

  • clear_timeout_tasks (bool) – Whether to clear timed-out tasks.
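
The timeout described above is measured from the most recent completion, not from the start of the wait. A minimal sketch of that behavior with plain asyncio follows. The names `wait_all_idle` and `work` are hypothetical, and cancelling the remaining tasks on timeout is an assumption of this sketch; it is not taken from the Scheduler source.

```python
import asyncio
from typing import Coroutine, Iterable, Optional


async def wait_all_idle(
    coros: Iterable[Coroutine], timeout: Optional[float] = None
) -> None:
    """Wait for all coroutines; fail if none completes within `timeout` seconds."""
    pending = {asyncio.ensure_future(coro) for coro in coros}
    while pending:
        # Each pass waits for the next completion, so the timer restarts
        # every time a task finishes.
        done, pending = await asyncio.wait(
            pending, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        if not done:
            # Assumption: remaining tasks are cancelled before raising.
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)
            raise TimeoutError(f"no task completed within {timeout} seconds")


async def work(delay: float) -> None:
    await asyncio.sleep(delay)


async def demo() -> str:
    # Completions arrive 0.2 s apart, so a 0.5 s timeout never fires even
    # though the whole batch takes 0.6 s.
    await wait_all_idle([work(0.2), work(0.4), work(0.6)], timeout=0.5)
    try:
        # A single slow task with no completion inside the window times out.
        await wait_all_idle([work(2.0)], timeout=0.1)
    except TimeoutError:
        return "timed out"
    return "finished"


outcome = asyncio.run(demo())
```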