Source code for trinity.buffer.reader.sql_reader
"""Reader of the SQL buffer."""
from typing import List, Optional
import ray
from trinity.buffer.buffer_reader import BufferReader
from trinity.buffer.storage.sql import SQLStorage
from trinity.common.config import BufferConfig, StorageConfig
from trinity.common.constants import StorageType
[docs]
class SQLReader(BufferReader):
    """Reader of the SQL buffer.

    Wraps the storage object returned by ``SQLStorage.get_wrapper``. When
    ``meta.wrap_in_ray`` is true the storage's ``read`` is invoked through
    ``.remote(...)`` and resolved with ``ray.get``; otherwise it is called
    directly.
    """

    def __init__(self, meta: StorageConfig, config: BufferConfig) -> None:
        """Create a reader for the SQL storage described by ``meta``.

        Args:
            meta: Storage configuration; its ``storage_type`` must be
                ``StorageType.SQL``.
            config: Buffer configuration forwarded to
                ``SQLStorage.get_wrapper``.

        Raises:
            AssertionError: If ``meta.storage_type`` is not
                ``StorageType.SQL``.
        """
        assert (
            meta.storage_type == StorageType.SQL
        ), f"SQLReader requires StorageType.SQL, got {meta.storage_type!r}"
        self.wrap_in_ray = meta.wrap_in_ray
        self.storage = SQLStorage.get_wrapper(meta, config)

    def read(self, batch_size: Optional[int] = None) -> List:
        """Read from the storage, blocking until the result is available.

        Args:
            batch_size: Passed through unchanged to the storage's ``read``.

        Returns:
            Whatever the storage's ``read`` returns.

        Raises:
            StopIteration: Propagated from the storage's ``read``
                (presumably when there is nothing left to read -- confirm
                against ``SQLStorage``).
        """
        if self.wrap_in_ray:
            # The storage is used through ``.remote``; block on the result.
            return ray.get(self.storage.read.remote(batch_size))
        return self.storage.read(batch_size)

    async def read_async(self, batch_size: Optional[int] = None) -> List:
        """Coroutine counterpart of :meth:`read`.

        Args:
            batch_size: Passed through unchanged to the storage's ``read``.

        Returns:
            Whatever the storage's ``read`` returns.

        Raises:
            StopAsyncIteration: If the underlying read raises
                ``StopIteration``, in either the Ray-wrapped or the local
                mode.
        """
        # A StopIteration escaping a coroutine body is re-raised by Python as
        # RuntimeError, so it must be translated for BOTH storage modes, not
        # only the Ray-wrapped one.
        # NOTE(review): the underlying read is still synchronous (``ray.get``
        # blocks the calling thread); it is kept that way on purpose because
        # StopIteration cannot be carried safely through an asyncio Future.
        try:
            return self.read(batch_size)
        except StopIteration:
            raise StopAsyncIteration from None