trinity.buffer
Subpackages
- trinity.buffer.reader
- trinity.buffer.schema
- trinity.buffer.writer
Submodules
trinity.buffer.buffer module
The buffer module
- trinity.buffer.buffer.get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig) -> BufferReader
Get a buffer reader for the given storage configuration.
- trinity.buffer.buffer.get_buffer_writer(storage_config: StorageConfig, buffer_config: BufferConfig) -> BufferWriter
Get a buffer writer for the given storage configuration.
trinity.buffer.buffer_reader module
Reader of the buffer.
- class trinity.buffer.buffer_reader.BufferReader
Bases: ABC
Interface of the buffer reader.
- abstract read(batch_size: int | None = None, strategy: ReadStrategy | None = None) -> List
Read from buffer.
- abstract async read_async(batch_size: int | None = None, strategy: ReadStrategy | None = None) -> List
Read from buffer asynchronously.
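To illustrate how a concrete reader might satisfy this interface, here is a minimal sketch. It declares a local stand-in for `BufferReader` instead of importing from `trinity`, and `ListReader` is a hypothetical in-memory implementation. The `strategy` argument is accepted but ignored, since its semantics are not described here.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class BufferReader(ABC):
    """Local stand-in mirroring the documented interface (not the real class)."""

    @abstractmethod
    def read(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        """Read from buffer."""

    @abstractmethod
    async def read_async(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        """Read from buffer asynchronously."""


class ListReader(BufferReader):
    """Hypothetical reader that serves items from an in-memory list."""

    def __init__(self, items: List, default_batch_size: int = 2) -> None:
        self._items = list(items)
        self._default_batch_size = default_batch_size
        self._offset = 0

    def read(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        # Assumption: a missing batch_size falls back to a configured default.
        size = batch_size if batch_size is not None else self._default_batch_size
        batch = self._items[self._offset : self._offset + size]
        self._offset += len(batch)
        # An exhausted reader returns an empty list in this sketch.
        return batch

    async def read_async(self, batch_size: Optional[int] = None, strategy: Any = None) -> List:
        # The in-memory read never blocks, so the async variant just delegates.
        return self.read(batch_size, strategy)


reader = ListReader(["a", "b", "c"])
first = reader.read()                                  # uses the default batch size
second = asyncio.run(reader.read_async(batch_size=5))  # returns whatever is left
```

Because both methods are abstract, a subclass that omits either one cannot be instantiated, which is what makes the ABC useful as an interface.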
trinity.buffer.buffer_writer module
Writer of the buffer.
trinity.buffer.queue module
Implementation of async queue buffers.
- trinity.buffer.queue.linear_decay_priority(item: List[Experience], decay: float = 0.1)
- class trinity.buffer.queue.QueueBuffer
Bases: ABC
- abstract async put(exps: List[Experience]) -> None
Put a list of experiences into the queue.
- abstract async get() -> List[Experience]
Get a list of experiences from the queue.
- classmethod get_queue(storage_config: StorageConfig, config: BufferConfig) -> QueueBuffer
Get a queue instance based on the storage configuration.
- class trinity.buffer.queue.AsyncQueue(capacity: int)
Bases: Queue, QueueBuffer
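The `put`/`get` contract above can be sketched with `asyncio.Queue`. This is a simplified, self-contained illustration: `SimpleAsyncQueue` is a hypothetical name, plain dicts stand in for `Experience` objects, and nothing here is taken from the library's actual implementation.

```python
import asyncio
from typing import Any, List


class SimpleAsyncQueue(asyncio.Queue):
    """Hypothetical bounded queue whose items are whole batches (lists)."""

    def __init__(self, capacity: int) -> None:
        super().__init__(maxsize=capacity)


async def demo() -> List[List[Any]]:
    queue = SimpleAsyncQueue(capacity=2)
    # Each put stores one list of experiences as a single queue item.
    await queue.put([{"id": 1}, {"id": 2}])
    await queue.put([{"id": 3}])
    # Items come back in FIFO order, one list per get.
    return [await queue.get(), await queue.get()]


batches = asyncio.run(demo())
```

With a bounded queue, a third `put` before any `get` would wait until space is available, which gives the producer natural backpressure.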
- class trinity.buffer.queue.AsyncPriorityQueue(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', **kwargs)
Bases: QueueBuffer
An asynchronous priority queue that manages a fixed-size buffer of experience items. Items are prioritized using a user-defined function and reinserted after a cooldown period.
- capacity
Maximum number of items the queue can hold. This value is automatically adjusted to be at most twice the read batch size.
Type: int
- priority_groups
Maps priorities to deques of items with the same priority.
Type: SortedDict
- priority_fn
Function used to determine the priority of an item.
Type: callable
- reuse_cooldown_time
Delay before reusing an item (set to infinity to disable).
Type: float
- __init__(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', **kwargs)
Initialize the async priority queue.
- Parameters:
capacity (int) – The maximum number of items the queue can store.
reuse_cooldown_time (float) – Time to wait before reusing an item. Set to None to disable reuse.
priority_fn (str) – Name of the function to use for determining item priority.
kwargs – Additional keyword arguments for the priority function.
- async put(item: List[Experience]) -> None
Put a list of experiences into the queue.
- async get() -> List[Experience]
Retrieve the highest-priority item from the queue.
- Returns:
The highest-priority item (list of experiences).
- Return type: List[Experience]
Notes
After retrieval, the item is reinserted once the cooldown period has elapsed, unless reuse is disabled.
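The behaviour described above (priority groups, highest-priority retrieval, delayed reinsertion) can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `TinyPriorityQueue` and `demo_priority` are hypothetical, a plain `dict` replaces `SortedDict`, dicts stand in for `Experience` objects, and the eviction rule when the queue is full is a guess.

```python
import asyncio
from collections import deque
from typing import Callable, Deque, Dict, List, Optional, Set


class TinyPriorityQueue:
    """Hypothetical sketch of a priority buffer with delayed reuse."""

    def __init__(self, capacity: int, priority_fn: Callable[[List[dict]], float],
                 reuse_cooldown_time: Optional[float] = None) -> None:
        self.capacity = capacity
        self.priority_fn = priority_fn
        self.reuse_cooldown_time = reuse_cooldown_time
        # Maps a priority to the batches that share it (FIFO within a group).
        self.priority_groups: Dict[float, Deque[List[dict]]] = {}
        self._size = 0
        self._cond = asyncio.Condition()
        self._pending: Set[asyncio.Task] = set()  # keeps reinsertion tasks alive

    def _pop(self, priority: float) -> List[dict]:
        group = self.priority_groups[priority]
        item = group.popleft()
        if not group:
            del self.priority_groups[priority]
        self._size -= 1
        return item

    async def put(self, item: List[dict]) -> None:
        async with self._cond:
            if self._size >= self.capacity:
                # Assumption: when full, evict one batch from the lowest-priority group.
                self._pop(min(self.priority_groups))
            self.priority_groups.setdefault(self.priority_fn(item), deque()).append(item)
            self._size += 1
            self._cond.notify()

    async def get(self) -> List[dict]:
        async with self._cond:
            await self._cond.wait_for(lambda: self._size > 0)
            item = self._pop(max(self.priority_groups))
        if self.reuse_cooldown_time is not None:
            # Reinsert in the background once the cooldown has elapsed.
            task = asyncio.create_task(self._reinsert_later(item))
            self._pending.add(task)
            task.add_done_callback(self._pending.discard)
        return item

    async def _reinsert_later(self, item: List[dict]) -> None:
        await asyncio.sleep(self.reuse_cooldown_time)
        for exp in item:
            exp["use_count"] += 1
        await self.put(item)


def demo_priority(item: List[dict]) -> float:
    # Hypothetical priority: newer versions first, lowered by each reuse.
    return item[0]["version"] - 0.1 * item[0]["use_count"]


async def demo():
    queue = TinyPriorityQueue(capacity=4, priority_fn=demo_priority, reuse_cooldown_time=0.01)
    await queue.put([{"version": 1, "use_count": 0}])
    await queue.put([{"version": 2, "use_count": 0}])
    first = await queue.get()
    second = await queue.get()
    await asyncio.sleep(0.2)  # give both cooldown reinsertions time to finish
    third = await queue.get()
    return first[0]["version"], second[0]["version"], dict(third[0])


result = asyncio.run(demo())
```

In this sketch the version-2 batch is returned first, then the version-1 batch; after the cooldown both are back in the queue with a lower priority, so the third `get` returns the version-2 batch again with `use_count` set to 1.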
trinity.buffer.ray_wrapper module
Ray actor wrapper for different buffers.
- class trinity.buffer.ray_wrapper.DBWrapper(storage_config: StorageConfig, config: BufferConfig)
Bases: object
A wrapper of a SQL database.
If wrap_in_ray in StorageConfig is True, this class runs as a Ray Actor and provides a remote interface to the local database.
For databases that do not support multi-process reads and writes (e.g. sqlite, duckdb), we recommend setting wrap_in_ray to True.
- __init__(storage_config: StorageConfig, config: BufferConfig) -> None
- classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)
- read(batch_size: int | None = None, strategy: ReadStrategy | None = None) -> List
- class trinity.buffer.ray_wrapper.FileWrapper(storage_config: StorageConfig, config: BufferConfig)
Bases: object
A wrapper of a local jsonl file.
If wrap_in_ray in StorageConfig is True, this class runs as a Ray Actor and provides a remote interface to the local file.
This wrapper is only for writing. To read from the file, use StorageType.QUEUE instead.
- __init__(storage_config: StorageConfig, config: BufferConfig) -> None
- classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)
- class trinity.buffer.ray_wrapper.QueueWrapper(storage_config: StorageConfig, config: BufferConfig)
Bases: object
A wrapper of an async queue.
- __init__(storage_config: StorageConfig, config: BufferConfig) -> None
- classmethod get_wrapper(storage_config: StorageConfig, config: BufferConfig)
Get the queue actor.
trinity.buffer.utils module
- trinity.buffer.utils.retry_session(session_maker, max_retry_times: int, max_retry_interval: float)
A context manager for retrying a session.
- trinity.buffer.utils.default_storage_path(storage_config: StorageConfig, buffer_config: BufferConfig) -> str
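The entry for `retry_session` above does not say which step is retried, so the following is only one plausible reading, sketched under assumptions: it retries opening the session with a capped exponential backoff, then commits on success or rolls back on error. `retry_session_sketch`, `FakeSession`, and `flaky_maker` are hypothetical names, and `max_retry_times` is assumed to be at least 1.

```python
import time
from contextlib import contextmanager


@contextmanager
def retry_session_sketch(session_maker, max_retry_times: int, max_retry_interval: float):
    """Hypothetical: retry opening a session, then commit or roll back once."""
    session = None
    for attempt in range(max_retry_times):
        try:
            session = session_maker()
            break
        except Exception:
            if attempt == max_retry_times - 1:
                raise  # out of retries: surface the last error
            # Assumption: exponential backoff capped at max_retry_interval.
            time.sleep(min(0.01 * 2 ** attempt, max_retry_interval))
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


class FakeSession:
    """Stand-in for a database session that records what was called."""

    def __init__(self) -> None:
        self.events = []

    def commit(self) -> None:
        self.events.append("commit")

    def rollback(self) -> None:
        self.events.append("rollback")

    def close(self) -> None:
        self.events.append("close")


attempts = {"count": 0}


def flaky_maker() -> FakeSession:
    # Fails twice before succeeding, to exercise the retry loop.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("database not ready")
    return FakeSession()


with retry_session_sketch(flaky_maker, max_retry_times=5, max_retry_interval=0.05) as session:
    session.events.append("work")
```

A generator-based context manager can only yield once, so the body of the `with` block itself is not re-run here; retrying the body would need a loop around the `with` statement.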
Module contents
- class trinity.buffer.BufferReader
Bases: ABC
Interface of the buffer reader.
- abstract read(batch_size: int | None = None, strategy: ReadStrategy | None = None) -> List
Read from buffer.
- abstract async read_async(batch_size: int | None = None, strategy: ReadStrategy | None = None) -> List
Read from buffer asynchronously.
- class trinity.buffer.BufferWriter
Bases: ABC
Interface of the buffer writer.
- trinity.buffer.get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig) -> BufferReader
Get a buffer reader for the given storage configuration.
- trinity.buffer.get_buffer_writer(storage_config: StorageConfig, buffer_config: BufferConfig) -> BufferWriter
Get a buffer writer for the given storage configuration.