trinity.buffer.storage.queue module#
Ray Queue storage
- class trinity.buffer.storage.queue.PriorityFunction[source]#
Bases:
ABC
Each priority_fn computes a priority for an item and decides whether the item should be placed into the queue.
- Args:
item (List[Experience]) – assume that all experiences in it have the same model_version and use_count
priority_fn_args (Dict) – the arguments for priority_fn
- Returns:
priority (float)
put_into_queue (bool) – whether to put the item into the queue
Note that put_into_queue takes effect both for new items from the explorer and for items sampled from the buffer.
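For illustration, a minimal sketch of this contract is shown below. The exact abstract method defined on PriorityFunction is not shown in this reference, so the sketch uses a plain function; the Experience fields model_version and use_count follow the description above.

```python
from typing import Dict, List, Tuple


def example_priority_fn(item: List["Experience"], priority_fn_args: Dict) -> Tuple[float, bool]:
    """Return (priority, put_into_queue) for a batch sharing model_version/use_count."""
    decay = priority_fn_args.get("decay", 2.0)
    exp = item[0]  # all experiences in `item` share model_version and use_count
    priority = exp.model_version - decay * exp.use_count
    put_into_queue = True  # applies both to new items and to items sampled from the buffer
    return priority, put_into_queue
```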
- class trinity.buffer.storage.queue.LinearDecayPriority(decay: float = 2.0)[source]#
Bases:
PriorityFunction
Calculate priority by linear decay.
Priority is calculated as model_version - decay * use_count. The item is always put back into the queue for reuse (as long as reuse_cooldown_time is not None).
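A quick worked example of the formula, using illustrative numbers rather than values from this module:

```python
decay = 2.0                       # default decay
model_version, use_count = 10, 3  # hypothetical item metadata
priority = model_version - decay * use_count  # 10 - 2.0 * 3 = 4.0
```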
- class trinity.buffer.storage.queue.LinearDecayUseCountControlPriority(decay: float = 2.0, use_count_limit: int = 3, sigma: float = 0.0)[source]#
Bases:
PriorityFunction
Calculate priority by linear decay, use count control, and randomization.
Priority is calculated as model_version - decay * use_count; if sigma is non-zero, priority is further perturbed by random Gaussian noise with standard deviation sigma. The item will be put back into the queue only if use count does not exceed use_count_limit.
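The scoring rule described above can be sketched as follows; the values are illustrative and the class's actual internals are not reproduced here.

```python
import random

decay, use_count_limit, sigma = 2.0, 3, 0.5    # sigma > 0 enables randomization
model_version, use_count = 12, 2               # hypothetical item metadata

priority = model_version - decay * use_count
if sigma != 0.0:
    priority += random.gauss(0.0, sigma)       # Gaussian noise with standard deviation sigma
put_into_queue = use_count <= use_count_limit  # reinsert only if use count does not exceed the limit
```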
- class trinity.buffer.storage.queue.QueueBuffer[source]#
Bases:
ABC
- abstract async put(exps: List[Experience]) → None[source]#
Put a list of experiences into the queue.
- abstract async get() → List[Experience][source]#
Get a list of experiences from the queue.
- classmethod get_queue(config: StorageConfig) → QueueBuffer[source]#
Get a queue instance based on the storage configuration.
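A hypothetical usage sketch of the abstract interface; how a StorageConfig instance is built is not covered in this reference, so config is treated as an opaque value passed in from elsewhere.

```python
from trinity.buffer.storage.queue import QueueBuffer


async def roundtrip(config) -> None:
    # `config` is a StorageConfig constructed elsewhere in your setup.
    queue = QueueBuffer.get_queue(config)  # factory returns a concrete QueueBuffer
    exps = await queue.get()               # List[Experience] read from the queue
    await queue.put(exps)                  # write a list of experiences back
```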
- class trinity.buffer.storage.queue.AsyncQueue(capacity: int)[source]#
Bases:
Queue, QueueBuffer
- class trinity.buffer.storage.queue.AsyncPriorityQueue(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', priority_fn_args: dict | None = None)[source]#
Bases:
QueueBuffer
An asynchronous priority queue that manages a fixed-size buffer of experience items. Items are prioritized using a user-defined function and reinserted after a cooldown period.
- capacity#
Maximum number of items the queue can hold. This value is automatically adjusted to be at most twice the read batch size.
- Type:
int
- reuse_cooldown_time#
Delay before reusing an item (set to infinity to disable).
- Type:
float
- priority_fn#
Function used to determine the priority of an item.
- Type:
callable
- priority_groups#
Maps priorities to deques of items with the same priority.
- Type:
SortedDict
- __init__(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', priority_fn_args: dict | None = None)[source]#
Initialize the async priority queue.
- Parameters:
capacity (int) – The maximum number of items the queue can store.
reuse_cooldown_time (float) – Time to wait before reusing an item. Set to None to disable reuse.
priority_fn (str) – Name of the function to use for determining item priority.
priority_fn_args (dict) – Additional keyword arguments for the priority function.
- async put(item: List[Experience]) → None[source]#
Put a list of experiences into the queue.
- async get() → List[Experience][source]#
Retrieve the highest-priority item from the queue.
- Returns:
The highest-priority item (list of experiences).
- Return type:
List[Experience]
Notes
After retrieval, the item is optionally reinserted after a cooldown period.
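Putting the pieces together, a usage sketch under stated assumptions: how Experience objects are constructed is not described in this reference, so the experiences are passed in from outside.

```python
from trinity.buffer.storage.queue import AsyncPriorityQueue


async def demo(exps) -> None:
    # `exps` is a List[Experience]; building experiences is not shown here.
    queue = AsyncPriorityQueue(
        capacity=8,
        reuse_cooldown_time=1.0,        # cooldown before a retrieved item is reused; None disables reuse
        priority_fn="linear_decay",     # default priority function
        priority_fn_args={"decay": 2.0},
    )
    await queue.put(exps)
    batch = await queue.get()           # highest-priority list of experiences
    print(f"got {len(batch)} experiences")
```

Once a list of Experience objects is available, the sketch could be driven with asyncio.run(demo(exps)).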
- class trinity.buffer.storage.queue.QueueStorage(config: StorageConfig)[source]#
Bases:
object
A wrapper of an async queue.
- __init__(config: StorageConfig) → None[source]#
- classmethod get_wrapper(config: StorageConfig)[source]#
Get the queue actor.