def get_remote_dedup_set():
    """Return ``DedupSet`` wrapped as a Ray remote actor class.

    The Ray decorator is applied when this function is called rather than
    when the module is imported. It uses Ray's ``'SPREAD'`` scheduling
    strategy for actors created from the returned class.
    """
    spread_decorator = ray.remote(scheduling_strategy='SPREAD')
    return spread_decorator(DedupSet)
class Backend(ABC):
    """Abstract base class for the deduplicator's storage backends."""
class RayBasicDeduplicator(Filter):
    """
    A basic exact matching deduplicator for RAY.
    Although its functionality is deduplication,
    it is implemented as Filter sub-class.
    """

    # TODO: Set a more reasonable value
    # NOTE(review): presumably the hash used for empty samples by
    # subclasses -- confirm against calculate_hash implementations.
    EMPTY_HASH_VALUE = 'EMPTY'

    def __init__(self,
                 backend: str = 'ray_actor',
                 redis_address: str = 'redis://localhost:6379',
                 *args,
                 **kwargs):
        """
        Initialization.

        :param backend: the backend for dedup, either 'ray_actor' or 'redis'
        :param redis_address: the address of redis server; it is always
            stored on the instance but only passed to the backend when
            ``backend == 'redis'``
        :param args: extra args
        :param kwargs: extra args
        :raises ValueError: if ``backend`` is not 'ray_actor' or 'redis'
        """
        super().__init__(*args, **kwargs)
        self.redis_address = redis_address
        if backend == 'ray_actor':
            # One dedup set per two cluster CPUs, but never fewer than one:
            # a single-CPU cluster would otherwise get int(1 / 2) == 0 sets,
            # and a cluster reporting no 'CPU' resource would fail on
            # ``None / 2``.
            cpu_count = ray.cluster_resources().get('CPU', 0)
            dedup_set_num = max(1, int(cpu_count / 2))
            self.backend = ActorBackend(dedup_set_num)
        elif backend == 'redis':
            # TODO: add a barrier to ensure that flushdb is performed before
            # the operator is called
            self.backend = RedisBackend(redis_address)
        else:
            raise ValueError(f'Unknown backend: {backend}')

    def calculate_hash(self, sample, context=False):
        """Calculate hash value for the sample.

        This base implementation is a placeholder that subclasses must
        override.

        :param sample: the sample to hash
        :param context: flag accepted for interface compatibility; unused
            by this base implementation
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError