Source code for trinity.buffer.writer.file_writer

from typing import List

import ray

from trinity.buffer.buffer_writer import BufferWriter
from trinity.buffer.ray_wrapper import FileWrapper
from trinity.common.config import BufferConfig, StorageConfig
from trinity.common.constants import StorageType


[docs] class JSONWriter(BufferWriter):
[docs] def __init__(self, meta: StorageConfig, config: BufferConfig): assert meta.storage_type == StorageType.FILE self.writer = FileWrapper.get_wrapper(meta, config) self.wrap_in_ray = meta.wrap_in_ray
[docs] def write(self, data: List) -> None: if self.wrap_in_ray: ray.get(self.writer.write.remote(data)) else: self.writer.write(data)
[docs] def acquire(self) -> int: if self.wrap_in_ray: return ray.get(self.writer.acquire()) else: return 0
[docs] def release(self) -> int: if self.wrap_in_ray: return ray.get(self.writer.release.remote()) else: self.writer.release() return 0