trinity.buffer

Subpackages

Submodules

trinity.buffer.buffer module

The buffer module

trinity.buffer.buffer.get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig) → BufferReader

Get a buffer reader for the dataset specified by storage_config.

trinity.buffer.buffer.get_buffer_writer(storage_config: StorageConfig, buffer_config: BufferConfig) → BufferWriter

Get a buffer writer for the dataset specified by storage_config.

trinity.buffer.buffer_reader module

Reader of the buffer.

class trinity.buffer.buffer_reader.BufferReader

Bases: ABC

Interface of the buffer reader.

abstract read(batch_size: int | None = None, strategy: ReadStrategy | None = None) → List

Read from buffer.

abstract async read_async(batch_size: int | None = None, strategy: ReadStrategy | None = None) → List

Read from buffer asynchronously.
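To make the reader contract concrete, here is a minimal in-memory sketch. It defines its own stand-in ABC mirroring the documented read/read_async signatures so that it runs without Trinity installed. The names BufferReaderSketch and ListBufferReader, the default batch size, and returning an empty list once the data is exhausted are all assumptions; the strategy argument is accepted but ignored.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class BufferReaderSketch(ABC):
    """Hypothetical stand-in mirroring the documented BufferReader interface."""

    @abstractmethod
    def read(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        ...

    @abstractmethod
    async def read_async(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        ...


class ListBufferReader(BufferReaderSketch):
    """Hypothetical reader that serves items from an in-memory list."""

    def __init__(self, items: List, default_batch_size: int = 2):
        self._items = list(items)
        self._pos = 0
        self._default_batch_size = default_batch_size

    def read(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        # strategy is accepted for interface compatibility but ignored in this sketch.
        size = batch_size or self._default_batch_size
        batch = self._items[self._pos : self._pos + size]
        self._pos += len(batch)
        return batch  # an empty list signals exhaustion (assumed behavior)

    async def read_async(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        # Delegates to the synchronous path; a real buffer would await I/O here.
        return self.read(batch_size, strategy)


reader = ListBufferReader(["a", "b", "c"])
print(reader.read())                      # ['a', 'b']
print(asyncio.run(reader.read_async(5)))  # ['c']
```

Keeping read_async as a thin wrapper over read is only a convenience for the sketch; a storage backend with real I/O would implement the asynchronous path natively.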

trinity.buffer.buffer_writer module

Writer of the buffer.

class trinity.buffer.buffer_writer.BufferWriter

Bases: ABC

Interface of the buffer writer.

abstract write(data: List) → None

Write to buffer.

abstract async write_async(data: List) → None

Write to buffer asynchronously.

abstract async acquire() → int

Acquire the buffer writer.

Returns:

The reference count of the buffer after acquiring.

Return type:

int

abstract async release() → int

Release the buffer writer. After release, the buffer writer cannot be used again.

Returns:

The reference count of the buffer after releasing.

Return type:

int
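The acquire/release pair reads as reference counting: each user acquires the writer, each user releases it when done, and both calls return the updated count. A minimal sketch under that reading follows. The class name, the in-memory list, and the choice to mark the writer closed when the count returns to zero are assumptions, not Trinity behavior.

```python
import asyncio
from typing import List


class InMemoryWriterSketch:
    """Hypothetical writer illustrating acquire/release reference counting."""

    def __init__(self) -> None:
        self.data: List = []
        self.ref_count = 0
        self.closed = False

    def write(self, data: List) -> None:
        if self.closed:
            raise RuntimeError("writer has been released")
        self.data.extend(data)

    async def write_async(self, data: List) -> None:
        self.write(data)

    async def acquire(self) -> int:
        self.ref_count += 1
        return self.ref_count  # reference count after acquiring

    async def release(self) -> int:
        self.ref_count -= 1
        if self.ref_count == 0:
            # Assumed behavior: the last release finalizes the buffer.
            self.closed = True
        return self.ref_count  # reference count after releasing


async def demo() -> List[int]:
    writer = InMemoryWriterSketch()
    counts = [await writer.acquire(), await writer.acquire()]
    await writer.write_async([1, 2])
    counts += [await writer.release(), await writer.release()]
    return counts


print(asyncio.run(demo()))  # [1, 2, 1, 0]
```

Returning the count from both calls lets the last holder detect that it was the one to drop the count to zero, which is a common trigger for flushing or closing shared storage.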

trinity.buffer.queue module

Implementation of async queue buffers.

trinity.buffer.queue.linear_decay_priority(item: List[Experience], decay: float = 0.1)

class trinity.buffer.queue.QueueBuffer

Bases: ABC

abstract async put(exps: List[Experience]) → None

Put a list of experiences into the queue.

abstract async get() → List[Experience]

Get a list of experiences from the queue.

abstract qsize() → int

Get the current size of the queue.

abstract async close() → None

Close the queue.

abstract stopped() → bool

Check if there is no more data to read.

classmethod get_queue(storage_config: StorageConfig, config: BufferConfig) → QueueBuffer

Get a queue instance based on the storage configuration.

class trinity.buffer.queue.AsyncQueue(capacity: int)

Bases: Queue, QueueBuffer

__init__(capacity: int)

Initialize the async queue with a specified capacity.

Parameters:

capacity (int) – The maximum number of items the queue can hold.

async close() → None

Close the queue.

stopped() → bool

Check if there is no more data to read.
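AsyncQueue combines a standard queue with the QueueBuffer interface. Assuming the Queue base is asyncio.Queue, one plausible sketch adds a closed flag so that stopped() becomes true once the queue has been closed and drained. The class name ClosableAsyncQueue and the "closed and empty" definition of stopped() are assumptions.

```python
import asyncio
from typing import List


class ClosableAsyncQueue(asyncio.Queue):
    """Hypothetical AsyncQueue-like class: an asyncio.Queue with close()/stopped()."""

    def __init__(self, capacity: int):
        super().__init__(maxsize=capacity)  # capacity bounds the number of queued items
        self._closed = False

    async def close(self) -> None:
        # After closing, producers should stop putting; consumers drain what is left.
        self._closed = True

    def stopped(self) -> bool:
        # "No more data to read": closed and nothing left in the queue.
        return self._closed and self.empty()


async def demo() -> List[bool]:
    q = ClosableAsyncQueue(capacity=4)
    await q.put(["exp-1", "exp-2"])  # each queued item is a list of experiences
    await q.close()
    before = q.stopped()  # False: one item still queued
    await q.get()
    after = q.stopped()   # True: closed and drained
    return [before, after]


print(asyncio.run(demo()))  # [False, True]
```

Inheriting from asyncio.Queue also supplies qsize(), put(), and get() for free, which is presumably why the class lists Queue as its first base.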

class trinity.buffer.queue.AsyncPriorityQueue(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', **kwargs)

Bases: QueueBuffer

An asynchronous priority queue that manages a fixed-size buffer of experience items. Items are prioritized using a user-defined function and reinserted after a cooldown period.

capacity

Maximum number of items the queue can hold. This value is automatically adjusted to be at most twice the read batch size.

Type:

int

priority_groups

Maps priorities to deques of items with the same priority.

Type:

SortedDict

priority_fn

Function used to determine the priority of an item.

Type:

callable

reuse_cooldown_time

Delay before reusing an item (None disables reuse).

Type:

float

__init__(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', **kwargs)

Initialize the async priority queue.

Parameters:
  • capacity (int) – The maximum number of items the queue can store.

  • reuse_cooldown_time (float) – Time to wait before reusing an item. Set to None to disable reuse.

  • priority_fn (str) – Name of the function to use for determining item priority.

  • kwargs – Additional keyword arguments for the priority function.

async put(item: List[Experience]) → None

Put a list of experiences into the queue.

async get() → List[Experience]

Retrieve the highest-priority item from the queue.

Returns:

The highest-priority item (list of experiences).

Return type:

List[Experience]

Notes

  • After retrieval, the item is reinserted into the queue once reuse_cooldown_time has elapsed, unless reuse is disabled.

qsize()

Get the current size of the queue.

async close() → None

Close the queue.

stopped() → bool

Check if there is no more data to read.
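The behavior described above (group items by priority, serve the highest priority first, and reinsert a served item after reuse_cooldown_time) can be sketched with the standard library alone. This is not Trinity's implementation. A plain dict of deques with max()/min() stands in for the documented SortedDict; the priority function is passed as a callable rather than by name; and both the "drop or evict the lowest priority when full" policy and FIFO order within a priority group are assumptions.

```python
import asyncio
from collections import deque
from typing import Callable, Deque, Dict, List, Optional


class PriorityQueueSketch:
    """Hypothetical stdlib-only sketch; Trinity's AsyncPriorityQueue uses a SortedDict."""

    def __init__(self, capacity: int, priority_fn: Callable[[List], float],
                 reuse_cooldown_time: Optional[float] = None):
        self.capacity = capacity
        self.priority_fn = priority_fn
        self.reuse_cooldown_time = reuse_cooldown_time  # None disables reuse
        self.priority_groups: Dict[float, Deque[List]] = {}  # priority -> items
        self._size = 0
        self._closed = False
        self._cond = asyncio.Condition()  # construct the queue inside a running loop

    async def put(self, item: List) -> None:
        priority = self.priority_fn(item)
        async with self._cond:
            if self._size >= self.capacity:
                lowest = min(self.priority_groups)
                if priority < lowest:
                    return  # assumed: drop items ranked below everything stored
                self._pop_from(lowest)  # assumed: evict the oldest lowest-priority item
            self.priority_groups.setdefault(priority, deque()).append(item)
            self._size += 1
            self._cond.notify()

    async def get(self) -> List:
        async with self._cond:
            await self._cond.wait_for(lambda: self._size > 0)
            item = self._pop_from(max(self.priority_groups))
        if self.reuse_cooldown_time is not None:
            # Schedule reinsertion so the item can be sampled again after the cooldown.
            asyncio.create_task(self._reinsert(item))
        return item

    async def _reinsert(self, item: List) -> None:
        await asyncio.sleep(self.reuse_cooldown_time)
        await self.put(item)

    def _pop_from(self, priority: float) -> List:
        group = self.priority_groups[priority]
        item = group.popleft()  # FIFO within one priority level
        if not group:
            del self.priority_groups[priority]
        self._size -= 1
        return item

    def qsize(self) -> int:
        return self._size

    async def close(self) -> None:
        self._closed = True

    def stopped(self) -> bool:
        return self._closed and self._size == 0


def by_reward(item: List) -> float:
    # Hypothetical priority: the reward of the first experience in the item.
    return item[0]["reward"]


async def demo() -> List[float]:
    q = PriorityQueueSketch(capacity=2, priority_fn=by_reward, reuse_cooldown_time=0.01)
    for r in (0.1, 0.9, 0.5):  # the third put evicts the 0.1 item (queue is full)
        await q.put([{"reward": r}])
    first = await q.get()      # highest priority: 0.9
    await asyncio.sleep(0.05)  # cooldown elapses; 0.9 is reinserted
    second = await q.get()     # 0.9 again, since it was reused
    return [first[0]["reward"], second[0]["reward"], q.qsize()]


print(asyncio.run(demo()))  # [0.9, 0.9, 1]
```

Reinsertion runs as a background task so that get() returns immediately instead of blocking for the cooldown.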

trinity.buffer.ray_wrapper module

Ray actor wrapper for different buffers.

class trinity.buffer.ray_wrapper.DBWrapper(storage_config: StorageConfig, config: BufferConfig)

Bases: object

A wrapper of a SQL database.

If wrap_in_ray in StorageConfig is True, this class runs as a Ray actor and provides a remote interface to the local database.

For databases that do not support multi-process reads and writes (e.g., SQLite, DuckDB), we recommend setting wrap_in_ray to True.

__init__(storage_config: StorageConfig, config: BufferConfig) → None
classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)
write(data: list) → None
read(batch_size: int | None = None, strategy: ReadStrategy | None = None) → List
acquire() → int
release() → int
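The wrap_in_ray recommendation rests on a single-owner idea: if every read and write goes through one actor, access to a database that handles concurrent writers poorly is serialized. The sketch below illustrates the same idea within a single process, substituting sqlite3 plus a threading.Lock for a Ray actor. The class name, table schema, JSON serialization, and cursor-based read are assumptions for illustration; the strategy argument of the real read() is omitted.

```python
import json
import sqlite3
import threading
from typing import List, Optional


class SQLiteWrapperSketch:
    """Hypothetical single-owner wrapper: one connection, one lock, serialized access."""

    def __init__(self, path: str = ":memory:"):
        # check_same_thread=False lets threads share the connection; the lock serializes use.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._conn.execute("CREATE TABLE IF NOT EXISTS exps (id INTEGER PRIMARY KEY, body TEXT)")
        self._lock = threading.Lock()
        self._cursor = 0  # id of the last row handed out by read()
        self._ref_count = 0

    def write(self, data: list) -> None:
        with self._lock, self._conn:  # the connection context commits on success
            self._conn.executemany(
                "INSERT INTO exps (body) VALUES (?)", [(json.dumps(d),) for d in data]
            )

    def read(self, batch_size: Optional[int] = None) -> List:
        with self._lock:
            rows = self._conn.execute(
                "SELECT id, body FROM exps WHERE id > ? ORDER BY id LIMIT ?",
                (self._cursor, batch_size or -1),  # LIMIT -1 means no limit in SQLite
            ).fetchall()
            if rows:
                self._cursor = rows[-1][0]
        return [json.loads(body) for _, body in rows]

    def acquire(self) -> int:
        self._ref_count += 1
        return self._ref_count

    def release(self) -> int:
        self._ref_count -= 1
        return self._ref_count


db = SQLiteWrapperSketch()
threads = [threading.Thread(target=db.write, args=([{"i": i}],)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(db.read()))  # 4
```

A Ray actor gives the same guarantee across processes, since an actor handles its method calls one at a time by default; a threading.Lock only protects callers within one process.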
class trinity.buffer.ray_wrapper.FileWrapper(storage_config: StorageConfig, config: BufferConfig)

Bases: object

A wrapper of a local JSONL file.

If wrap_in_ray in StorageConfig is True, this class runs as a Ray actor and provides a remote interface to the local file.

This wrapper is intended for writing only; to read from the file, use StorageType.QUEUE instead.

__init__(storage_config: StorageConfig, config: BufferConfig) → None
classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)
write(data: List) → None
read() → List
acquire() → int
release() → int
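A JSONL file stores one JSON object per line, which makes appending a batch cheap. Below is a minimal sketch of an append-only writer in that format, assuming each item is already JSON-serializable (Trinity's Experience objects would need converting first). The class and method names are hypothetical.

```python
import json
import os
import tempfile
from typing import List


class JsonlWriterSketch:
    """Hypothetical append-only JSONL writer, one JSON object per line."""

    def __init__(self, path: str):
        self.path = path

    def write(self, data: List) -> None:
        # Append mode keeps earlier batches; ensure_ascii=False preserves non-ASCII text.
        with open(self.path, "a", encoding="utf-8") as f:
            for item in data:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")

    def read(self) -> List:
        with open(self.path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]


path = os.path.join(tempfile.mkdtemp(), "exps.jsonl")
writer = JsonlWriterSketch(path)
writer.write([{"prompt": "1+1", "reward": 1.0}])
writer.write([{"prompt": "2+2", "reward": 0.0}])
print(len(writer.read()))  # 2
```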
trinity.buffer.ray_wrapper.is_database_url(path: str) → bool
trinity.buffer.ray_wrapper.is_json_file(path: str) → bool
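is_database_url and is_json_file are undocumented. Their names suggest that they classify a storage path so it can be routed to DBWrapper or FileWrapper. The following is a guess at such a classification, not Trinity's code: the list of URL schemes, the handling of "dialect+driver" schemes, and the accepted file extensions are all assumptions.

```python
from urllib.parse import urlparse

# Assumed set of SQLAlchemy-style database URL dialects.
DB_SCHEMES = ("sqlite", "postgresql", "mysql", "duckdb")


def is_database_url_sketch(path: str) -> bool:
    # "postgresql+psycopg2://..." has scheme "postgresql+psycopg2"; keep the dialect part.
    dialect = urlparse(path).scheme.split("+")[0]
    return dialect in DB_SCHEMES


def is_json_file_sketch(path: str) -> bool:
    # Case-insensitive extension check for .json and .jsonl files.
    return path.lower().endswith((".json", ".jsonl"))


print(is_database_url_sketch("sqlite:///buffer.db"), is_json_file_sketch("exps.jsonl"))  # True True
```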
class trinity.buffer.ray_wrapper.QueueWrapper(storage_config: StorageConfig, config: BufferConfig)

Bases: object

A wrapper of an async queue.

__init__(storage_config: StorageConfig, config: BufferConfig) → None
async acquire() → int
async release() → int

Release the queue.

length() → int

The length of the queue.

async put_batch(exp_list: List) → None

Put a batch of experiences into the queue.

async get_batch(batch_size: int, timeout: float) → List

Get a batch of experiences from the queue.

classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)

Get the queue actor.
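The get_batch(batch_size, timeout) signature suggests collecting up to batch_size experiences while bounding the total wait. One way to do that with asyncio is shown below. It assumes that timeout is a total deadline in seconds and that a partial batch is returned when the deadline expires; Trinity may instead raise or keep waiting. The function name is hypothetical.

```python
import asyncio
from typing import List


async def get_batch_sketch(queue: asyncio.Queue, batch_size: int, timeout: float) -> List:
    """Hypothetical: gather up to batch_size items, returning early at the deadline."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    batch: List = []
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break  # deadline reached; return whatever was collected
    return batch


async def demo() -> List[int]:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    # Only 3 items are available, so this returns after roughly 0.05 s with a partial batch.
    return await get_batch_sketch(q, batch_size=5, timeout=0.05)


print(asyncio.run(demo()))  # [0, 1, 2]
```

Tracking a single deadline, rather than applying timeout to each get, keeps the worst-case wait bounded by timeout regardless of batch_size.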

trinity.buffer.utils module

trinity.buffer.utils.retry_session(session_maker, max_retry_times: int, max_retry_interval: float)

A context manager for retrying a session.
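A generator-based context manager cannot re-run the body of a with block. A plausible reading of retry_session is therefore that it retries opening the session, then commits, rolls back, and closes around the body. The sketch below follows that reading using a fake session. Retrying only on ConnectionError and sleeping a fixed max_retry_interval between attempts are assumptions, as are the names retry_session_sketch, FakeSession, and flaky_maker.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def retry_session_sketch(session_maker: Callable, max_retry_times: int,
                         max_retry_interval: float) -> Iterator:
    """Hypothetical: retry creating a session, then commit/rollback/close around the body."""
    if max_retry_times < 1:
        raise ValueError("max_retry_times must be >= 1")
    for attempt in range(1, max_retry_times + 1):
        try:
            session = session_maker()
            break
        except ConnectionError:
            if attempt == max_retry_times:
                raise  # give up after the last attempt
            time.sleep(max_retry_interval)
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


class FakeSession:
    """Hypothetical session that records which lifecycle methods were called."""

    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")

    def close(self):
        self.events.append("close")


attempts = {"n": 0}


def flaky_maker() -> FakeSession:
    # Fails twice, then succeeds: simulates a database that is briefly unavailable.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("database busy")
    return FakeSession()


with retry_session_sketch(flaky_maker, max_retry_times=5, max_retry_interval=0.01) as s:
    s.events.append("query")
print(attempts["n"], s.events)  # 3 ['query', 'commit', 'close']
```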

trinity.buffer.utils.default_storage_path(storage_config: StorageConfig, buffer_config: BufferConfig) → str

Module contents

class trinity.buffer.BufferReader

Bases: ABC

Interface of the buffer reader.

abstract read(batch_size: int | None = None, strategy: ReadStrategy | None = None) → List

Read from buffer.

abstract async read_async(batch_size: int | None = None, strategy: ReadStrategy | None = None) → List

Read from buffer asynchronously.

class trinity.buffer.BufferWriter

Bases: ABC

Interface of the buffer writer.

abstract write(data: List) → None

Write to buffer.

abstract async write_async(data: List) → None

Write to buffer asynchronously.

abstract async acquire() → int

Acquire the buffer writer.

Returns:

The reference count of the buffer after acquiring.

Return type:

int

abstract async release() → int

Release the buffer writer. After release, the buffer writer cannot be used again.

Returns:

The reference count of the buffer after releasing.

Return type:

int

trinity.buffer.get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig) → BufferReader

Get a buffer reader for the dataset specified by storage_config.

trinity.buffer.get_buffer_writer(storage_config: StorageConfig, buffer_config: BufferConfig) → BufferWriter

Get a buffer writer for the dataset specified by storage_config.