trinity.buffer.ray_wrapper module

Ray actor wrapper for different buffers.

class trinity.buffer.ray_wrapper.DBWrapper(storage_config: StorageConfig, config: BufferConfig)

Bases: object

A wrapper around a SQL database.

If wrap_in_ray in StorageConfig is True, this class runs as a Ray actor and provides a remote interface to the local database.

For databases that do not support reads and writes from multiple processes (e.g. SQLite, DuckDB), we recommend setting wrap_in_ray to True.
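The recommendation above comes down to giving the database a single owner that serializes every read and write. The following is a minimal sketch of that idea using only the standard library. It does not use Ray or Trinity: SingleOwnerDB, the experiences table, and the JSON payload column are hypothetical names chosen for illustration, a thread lock stands in for the one-call-at-a-time ordering that a Ray actor would normally provide, and treating acquire/release as a reference counter is an assumption based only on their int return type.

```python
import json
import sqlite3
import threading
from typing import Any, List, Optional


class SingleOwnerDB:
    """Hypothetical stand-in for a database wrapper with one owner.

    Not the Trinity implementation: it only illustrates funnelling all
    access to a SQLite file through a single object.
    """

    def __init__(self, path: str = ":memory:") -> None:
        # check_same_thread=False lets several threads share the connection;
        # the lock below guarantees that only one of them uses it at a time.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()
        self._ref_count = 0
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS experiences ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
        )
        self._conn.commit()

    def write(self, data: List[Any]) -> None:
        """Store each item as one JSON-encoded row."""
        rows = [(json.dumps(item),) for item in data]
        with self._lock:
            self._conn.executemany(
                "INSERT INTO experiences (payload) VALUES (?)", rows
            )
            self._conn.commit()

    def read(self, batch_size: Optional[int] = None) -> List[Any]:
        """Return rows in insertion order, optionally limited to batch_size."""
        sql = "SELECT payload FROM experiences ORDER BY id"
        params: tuple = ()
        if batch_size is not None:
            sql += " LIMIT ?"
            params = (batch_size,)
        with self._lock:
            rows = self._conn.execute(sql, params).fetchall()
        return [json.loads(row[0]) for row in rows]

    def acquire(self) -> int:
        """Assumed semantics: register a user and return the new count."""
        with self._lock:
            self._ref_count += 1
            return self._ref_count

    def release(self) -> int:
        """Assumed semantics: unregister a user and return the new count."""
        with self._lock:
            self._ref_count -= 1
            return self._ref_count
```

With Ray, the same effect would typically come from running one such object as an actor, so that callers in other processes reach the database only through remote method calls.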

__init__(storage_config: StorageConfig, config: BufferConfig) → None
classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)
write(data: list) → None
read(batch_size: int | None = None, strategy: ReadStrategy | None = None) → List
acquire() → int
release() → int
class trinity.buffer.ray_wrapper.FileWrapper(storage_config: StorageConfig, config: BufferConfig)

Bases: object

A wrapper around a local JSONL file.

If wrap_in_ray in StorageConfig is True, this class runs as a Ray actor and provides a remote interface to the local file.

This wrapper is write-only. To read from the file, use StorageType.QUEUE instead.
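As a rough illustration of what a write-only JSONL wrapper does, the sketch below appends each item as one JSON object per line. JsonlWriter is a hypothetical class written for this example, not the Trinity implementation, and it assumes every item is JSON-serializable.

```python
import json
from pathlib import Path
from typing import Any, List


class JsonlWriter:
    """Hypothetical append-only writer for a local JSONL file."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)
        # Create missing parent directories so the first write cannot fail
        # on a missing folder.
        self._path.parent.mkdir(parents=True, exist_ok=True)

    def write(self, data: List[Any]) -> None:
        """Append every item in data as a single JSON line."""
        with self._path.open("a", encoding="utf-8") as f:
            for item in data:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")
```

Opening the file in append mode for every call keeps earlier batches intact. The sketch deliberately has no read method, mirroring the write-only role described above.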

__init__(storage_config: StorageConfig, config: BufferConfig) → None
classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)
write(data: List) → None
read() → List
acquire() → int
release() → int
trinity.buffer.ray_wrapper.is_database_url(path: str) → bool
trinity.buffer.ray_wrapper.is_json_file(path: str) → bool
class trinity.buffer.ray_wrapper.QueueWrapper(storage_config: StorageConfig, config: BufferConfig)

Bases: object

A wrapper around an async queue.

__init__(storage_config: StorageConfig, config: BufferConfig) → None
async acquire() → int
async release() → int

Release the queue.

length() → int

The length of the queue.

async put_batch(exp_list: List) → None

Put a batch of experiences.

async get_batch(batch_size: int, timeout: float) → List

Get a batch of experiences.

classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)

Get the queue actor.
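To make the put_batch / get_batch pair concrete, here is a minimal sketch built on asyncio.Queue. BatchQueue is a hypothetical class, not the Trinity actor. Two behaviours are assumptions made for this example because the documentation above does not specify them: the timeout applies to each item rather than to the whole batch, and a timeout returns the items collected so far instead of raising.

```python
import asyncio
from typing import Any, List


class BatchQueue:
    """Hypothetical async queue with batch put/get helpers.

    Create instances inside a running event loop (for example, within a
    coroutine) so the underlying asyncio.Queue is bound to that loop.
    """

    def __init__(self, capacity: int = 0) -> None:
        # maxsize=0 means the queue is unbounded.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)

    def length(self) -> int:
        """Number of items currently waiting in the queue."""
        return self._queue.qsize()

    async def put_batch(self, exp_list: List[Any]) -> None:
        """Enqueue items one by one; waits if a bounded queue is full."""
        for exp in exp_list:
            await self._queue.put(exp)

    async def get_batch(self, batch_size: int, timeout: float) -> List[Any]:
        """Collect up to batch_size items, waiting at most timeout per item."""
        batch: List[Any] = []
        for _ in range(batch_size):
            try:
                item = await asyncio.wait_for(self._queue.get(), timeout=timeout)
            except asyncio.TimeoutError:
                # Assumed behaviour: stop and return a partial batch.
                break
            batch.append(item)
        return batch
```

A caller would await put_batch from the producer side and get_batch from the consumer side, both inside coroutines running on the same event loop.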