trinity.buffer.storage.queue module#

Ray Queue storage

trinity.buffer.storage.queue.is_database_url(path: str) bool[源代码]#
trinity.buffer.storage.queue.is_json_file(path: str) bool[源代码]#
class trinity.buffer.storage.queue.PriorityFunction[源代码]#

基类:ABC

Each priority_fn takes the following arguments and returns the following values:
Args:

item: List[Experience], assume that all experiences in it have the same model_version and use_count

priority_fn_args: Dict, the arguments for priority_fn

Returns:

priority: float

put_into_queue: bool, decide whether to put item into queue

Note that put_into_queue takes effect both for new item from the explorer and for item sampled from the buffer.

abstractmethod classmethod default_config() Dict[源代码]#

Return the default config.

class trinity.buffer.storage.queue.LinearDecayPriority(decay: float = 2.0)[源代码]#

基类:PriorityFunction

Calculate priority by linear decay.

Priority is calculated as model_version - decay * use_count. The item is always put back into the queue for reuse (as long as `reuse_cooldown_time` is not None).

__init__(decay: float = 2.0)[源代码]#
classmethod default_config() Dict[源代码]#

Return the default config.

class trinity.buffer.storage.queue.LinearDecayUseCountControlPriority(decay: float = 2.0, use_count_limit: int = 3, sigma: float = 0.0)[源代码]#

基类:PriorityFunction

Calculate priority by linear decay, use count control, and randomization.

Priority is calculated as model_version - decay * use_count; if sigma is non-zero, priority is further perturbed by random Gaussian noise with standard deviation sigma. The item will be put back into the queue only if use count does not exceed use_count_limit.

__init__(decay: float = 2.0, use_count_limit: int = 3, sigma: float = 0.0)[源代码]#
classmethod default_config() Dict[源代码]#

Return the default config.

class trinity.buffer.storage.queue.QueueBuffer[源代码]#

基类:ABC

async set_min_model_version(min_model_version: int)[源代码]#
abstractmethod async put(exps: List[Experience]) None[源代码]#

Put a list of experiences into the queue.

abstractmethod async get() List[Experience][源代码]#

Get a list of experiences from the queue.

abstractmethod qsize() int[源代码]#

Get the current size of the queue.

abstractmethod async close() None[源代码]#

Close the queue.

abstractmethod stopped() bool[源代码]#

Check if there is no more data to read.

classmethod get_queue(config: StorageConfig) QueueBuffer[源代码]#

Get a queue instance based on the storage configuration.

class trinity.buffer.storage.queue.AsyncQueue(capacity: int)[源代码]#

基类:Queue, QueueBuffer

__init__(capacity: int)[源代码]#

Initialize the async queue with a specified capacity.

参数:

capacity (int) -- The maximum number of items the queue can hold.

async put(item: List[Experience])[源代码]#

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

async get()[源代码]#

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

async close() None[源代码]#

Close the queue.

stopped() bool[源代码]#

Check if there is no more data to read.

class trinity.buffer.storage.queue.AsyncPriorityQueue(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', priority_fn_args: dict | None = None)[源代码]#

基类:QueueBuffer

An asynchronous priority queue that manages a fixed-size buffer of experience items. Items are prioritized using a user-defined function and reinserted after a cooldown period.

capacity#

Maximum number of items the queue can hold. This value is automatically adjusted to be at most twice the read batch size.

Type:

int

reuse_cooldown_time#

Delay before reusing an item (set to infinity to disable).

Type:

float

priority_fn#

Function used to determine the priority of an item.

Type:

callable

priority_groups#

Maps priorities to deques of items with the same priority.

Type:

SortedDict

__init__(capacity: int, reuse_cooldown_time: float | None = None, priority_fn: str = 'linear_decay', priority_fn_args: dict | None = None)[源代码]#

Initialize the async priority queue.

参数:
  • capacity (int) -- The maximum number of items the queue can store.

  • reuse_cooldown_time (float) -- Time to wait before reusing an item. Set to None to disable reuse.

  • priority_fn (str) -- Name of the function to use for determining item priority.

  • priority_fn_args (dict) -- Additional keyword arguments for the priority function.

async put(item: List[Experience]) None[源代码]#

Put a list of experiences into the queue.

async get() List[Experience][源代码]#

Retrieve the highest-priority item from the queue.

返回:

The highest-priority item (list of experiences).

返回类型:

List[Experience]

备注

  • After retrieval, the item is optionally reinserted after a cooldown period.

qsize()[源代码]#

Get the current size of the queue.

async close() None[源代码]#

Close the queue.

stopped() bool[源代码]#

Check if there is no more data to read.

class trinity.buffer.storage.queue.QueueStorage(config: StorageConfig)[源代码]#

基类:object

A wrapper around an async queue.

__init__(config: StorageConfig) None[源代码]#
async acquire() int[源代码]#
async release() int[源代码]#

Release the queue.

length() int[源代码]#

The length of the queue.

async put_batch(exp_list: List) None[源代码]#

Put a batch of experiences.

async get_batch(batch_size: int, timeout: float, min_model_version: int = 0) List[源代码]#

Get a batch of experiences.

classmethod get_wrapper(config: StorageConfig)[源代码]#

Get the queue actor.