def get_remote_dedup_set():
    """
    Build the Ray-remote variant of ``DedupSet``.

    The ``ray.remote`` decorator is applied here, at call time, with the
    "SPREAD" scheduling strategy.

    :return: ``DedupSet`` wrapped by ``ray.remote(scheduling_strategy="SPREAD")``
    """
    spread_remote = ray.remote(scheduling_strategy="SPREAD")
    return spread_remote(DedupSet)
class Backend(ABC):
    """
    Abstract base class for the backends used by the deduplicator.
    """
class RayBasicDeduplicator(Filter):
    """
    A basic exact matching deduplicator for RAY.

    Although its functionality is deduplication, it is implemented as a
    Filter sub-class. Sub-classes must override :meth:`calculate_hash`,
    which raises ``NotImplementedError`` here.
    """

    # TODO: Set a more reasonable value
    EMPTY_HASH_VALUE = "EMPTY"

    def __init__(self, backend: str = "ray_actor", redis_address: str = "redis://localhost:6379", *args, **kwargs):
        """
        Initialization.

        :param backend: the backend for dedup, either 'ray_actor' or 'redis'
        :param redis_address: the address of redis server
        :param args: extra args
        :param kwargs: extra args
        :raises ValueError: if ``backend`` is neither 'ray_actor' nor 'redis'
        """
        super().__init__(*args, **kwargs)
        self.redis_address = redis_address
        if backend == "ray_actor":
            # Use one dedup set per two cluster CPUs, but never fewer than
            # one: on a single-CPU cluster int(1 / 2) would be 0 and the
            # backend would be created without any dedup set. A missing
            # "CPU" entry is treated as 0 instead of raising TypeError.
            cpu_count = ray.cluster_resources().get("CPU") or 0
            dedup_set_num = max(1, int(cpu_count / 2))
            self.backend = ActorBackend(dedup_set_num)
        elif backend == "redis":
            # TODO: add a barrier to ensure that flushdb is performed before
            # the operator is called
            self.backend = RedisBackend(redis_address)
        else:
            raise ValueError(f"Unknown backend: {backend}")

    def calculate_hash(self, sample, context=False):
        """
        Calculate hash value for the sample.

        :param sample: the sample to hash
        :param context: context flag; unused in this base implementation
        :raises NotImplementedError: always; sub-classes must override this
        """
        raise NotImplementedError