Source code for data_juicer.ops.deduplicator.ray_basic_deduplicator

from pydantic import PositiveInt

from data_juicer.utils.constant import HashKeys
from data_juicer.utils.lazy_loader import LazyLoader

from ..base_op import Filter

redis = LazyLoader('redis', 'redis')


class RayBasicDeduplicator(Filter):
    """
    A basic exact-matching deduplicator for RAY.
    Although its functionality is deduplication,
    it is implemented as a Filter sub-class.
    """

    # TODO: Set a more reasonable value
    EMPTY_HASH_VALUE = 'EMPTY'
    def __init__(self,
                 redis_host: str = 'localhost',
                 redis_port: PositiveInt = 6380,
                 *args,
                 **kwargs):
        """
        Initialization.

        :param redis_host: the hostname of redis server
        :param redis_port: the port of redis server
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(*args, **kwargs)
        self.redis_host = redis_host
        self.redis_port = redis_port
        # TODO: add a barrier to ensure that flushdb is performed before
        # the operator is called
        r = redis.StrictRedis(host=self.redis_host,
                              port=self.redis_port,
                              db=0)
        r.flushdb(0)
    def calculate_hash(self, sample, context=False):
        """Calculate hash value for the sample."""
        raise NotImplementedError
    def compute_stats_single(self, sample, context=False):
        # init redis client
        r = redis.StrictRedis(host=self.redis_host,
                              port=self.redis_port,
                              db=0)
        # compute hash
        md5_value = self.calculate_hash(sample, context)
        # check existing: setnx returns True only for the first sample
        # carrying this hash, so later duplicates get False and are
        # filtered out in process_single
        sample[HashKeys.is_duplicate] = r.setnx(md5_value, 1)
        return sample
    def process_single(self, sample):
        return sample[HashKeys.is_duplicate]
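
As a usage illustration (not part of the module above), here is a minimal sketch of how a concrete subclass might implement ``calculate_hash``. The subclass name ``ExactTextDeduplicator`` and the ``'text'`` field are assumptions for the example; the base class only requires that ``calculate_hash`` return a stable key per sample.

# Hypothetical example subclass, for illustration only; assumes each
# sample stores its content under a 'text' key.
import hashlib


class ExactTextDeduplicator(RayBasicDeduplicator):

    def calculate_hash(self, sample, context=False):
        text = sample.get('text', '')
        if not text:
            # fall back to the shared placeholder for empty content
            return RayBasicDeduplicator.EMPTY_HASH_VALUE
        # exact match on normalized text via MD5
        return hashlib.md5(text.strip().encode('utf-8')).hexdigest()

With such a subclass, ``compute_stats_single`` records in Redis whether the hash was newly inserted, and ``process_single`` keeps only the first sample seen for each hash.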