from pydantic import PositiveInt
from data_juicer.utils.constant import HashKeys
from data_juicer.utils.lazy_loader import LazyLoader
from ..base_op import Filter
redis = LazyLoader('redis', 'redis')
[docs]
class RayBasicDeduplicator(Filter):
"""
A basic exact matching deduplicator for RAY.
Although its functionality is deduplication,
it is implemented as Filter sub-class.
"""
# TODO: Set a more reasonable value
EMPTY_HASH_VALUE = 'EMPTY'
[docs]
def __init__(self,
redis_host: str = 'localhost',
redis_port: PositiveInt = 6380,
*args,
**kwargs):
"""
Initialization.
:param redis_host: the hostname of redis server
:param redis_port: the port of redis server
:param args: extra args
:param kwargs: extra args
"""
super().__init__(*args, **kwargs)
self.redis_host = redis_host
self.redis_port = redis_port
# TODO: add a barrier to ensure that flushdb is performed before
# the operator is called
r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
r.flushdb(0)
[docs]
def calculate_hash(self, sample, context=False):
"""Calculate hash value for the sample."""
raise NotImplementedError
[docs]
def compute_stats_single(self, sample, context=False):
# init redis client
r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
# compute hash
md5_value = self.calculate_hash(sample, context)
# check existing
sample[HashKeys.is_duplicate] = r.setnx(md5_value, 1)
return sample
[docs]
def process_single(self, sample):
return sample[HashKeys.is_duplicate]