trinity.explorer.workflow_runner module#

The Workflow Runner Module.

class trinity.explorer.workflow_runner.Status(ok: bool, metrics: List[Dict[str, float]], message: str | None = None)[source]#

Bases: object

Status of the task running result.

ok: bool#
metrics: List[Dict[str, float]]#
message: str | None = None#
__init__(ok: bool, metrics: List[Dict[str, float]], message: str | None = None) None#
trinity.explorer.workflow_runner.calculate_run_level_metrics(experiences: List[Experience]) Dict[str, float][source]#

Calculate metrics from experiences.

For non-repeatable workflows, this function will average the metrics from experiences generated by each run, which is equivalent to calculating run level metrics.

For repeatable workflows, please do not use this function.

class trinity.explorer.workflow_runner.WorkflowRunner(config: Config, model: InferenceModel, auxiliary_models: List[InferenceModel] | None = None, runner_id: int | None = None)[source]#

Bases: object

A Ray remote actor to run the workflow and generate experiences.

__init__(config: Config, model: InferenceModel, auxiliary_models: List[InferenceModel] | None = None, runner_id: int | None = None) None[source]#
async prepare() None[source]#

Prepare the runner.

is_alive()[source]#
async get_runner_state() Dict[source]#

Get the runner state.

async run_task(task: Task, repeat_times: int = 1, run_id_base: int = 0) Tuple[Status, List[Experience]][source]#

Run the task and return the states.

class trinity.explorer.workflow_runner.DebugWorkflowRunner(config: Config, output_file: str)[source]#

Bases: WorkflowRunner

A WorkflowRunner for debugging.

__init__(config: Config, output_file: str) None[source]#
async debug() None[source]#

Run the debug workflow.