trinity.buffer.queue module
Implementation of async queue buffers.
- trinity.buffer.queue.linear_decay_priority(item: List[Experience], decay: float = 0.1)[source]
- class trinity.buffer.queue.QueueBuffer[source]
Bases:
ABC
- abstract async put(exps: List[Experience]) None [source]
Put a list of experiences into the queue.
- abstract async get() List[Experience] [source]
Get a list of experience from the queue.
- classmethod get_queue(storage_config: StorageConfig, config: BufferConfig) QueueBuffer [source]
Get a queue instance based on the storage configuration.
- class trinity.buffer.queue.AsyncQueue(capacity: int)[source]
Bases:
Queue
,QueueBuffer
- class trinity.buffer.queue.AsyncPriorityQueue(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', **kwargs)[source]
Bases:
QueueBuffer
An asynchronous priority queue that manages a fixed-size buffer of experience items. Items are prioritized using a user-defined function and reinserted after a cooldown period.
- capacity
Maximum number of items the queue can hold. This value is automatically adjusted to be at most twice the read batch size.
- Type:
int
- priority_groups
Maps priorities to deques of items with the same priority.
- Type:
SortedDict
- priority_fn
Function used to determine the priority of an item.
- Type:
callable
- reuse_cooldown_time
Delay before reusing an item (set to infinity to disable).
- Type:
float
- __init__(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', **kwargs)[source]
Initialize the async priority queue.
- Parameters:
capacity (int) – The maximum number of items the queue can store.
reuse_cooldown_time (float) – Time to wait before reusing an item. Set to None to disable reuse.
priority_fn (str) – Name of the function to use for determining item priority.
kwargs – Additional keyword arguments for the priority function.
- async put(item: List[Experience]) None [source]
Put a list of experiences into the queue.
- async get() List[Experience] [source]
Retrieve the highest-priority item from the queue.
- Returns:
The highest-priority item (list of experiences).
- Return type:
List[Experience]
Notes
After retrieval, the item is optionally reinserted after a cooldown period.