Source code for data_juicer.ops.deduplicator.ray_image_deduplicator

import numpy as np
from pydantic import PositiveInt

from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import load_data_with_context, load_image

from ..base_op import OPERATORS
from ..op_fusion import LOADED_IMAGES
from .ray_basic_deduplicator import RayBasicDeduplicator

imgdedup_methods = LazyLoader('imgdedup_methods', 'imagededup.methods')

OP_NAME = 'ray_image_deduplicator'

HASH_METHOD = {'phash', 'dhash', 'whash', 'ahash'}


[docs] def get_hash_method(method_name): mapping = { 'phash': imgdedup_methods.PHash, 'dhash': imgdedup_methods.DHash, 'whash': imgdedup_methods.WHash, 'ahash': imgdedup_methods.AHash } return mapping[method_name]
[docs] @OPERATORS.register_module(OP_NAME) @LOADED_IMAGES.register_module(OP_NAME) class RayImageDeduplicator(RayBasicDeduplicator): """ Deduplicator to deduplicate samples at document-level using exact matching of images between documents. """
[docs] def __init__(self, redis_host: str = 'localhost', redis_port: PositiveInt = 6380, method: str = 'phash', *args, **kwargs): """ Initialization. :param redis_host: the hostname of redis server :param redis_port: the port of redis server :param args: extra args :param kwargs: extra args """ super().__init__(redis_host=redis_host, redis_port=redis_port, *args, **kwargs) if method not in HASH_METHOD: raise ValueError(f'Keep strategy [{method}] is not supported. ' f'Can only be one of {HASH_METHOD}.') self.hasher = get_hash_method(method)()
[docs] def calculate_hash(self, sample, context=False): if self.image_key not in sample or not sample[self.image_key]: return RayBasicDeduplicator.EMPTY_HASH_VALUE # load images loaded_image_keys = sample[self.image_key] sample, images = load_data_with_context(sample, context, loaded_image_keys, load_image) # compute hash hash_value = '' for key in images: hash_value += self.hasher.encode_image( image_array=np.array(images[key])) return hash_value