Source code for data_juicer.ops.deduplicator.ray_video_deduplicator

import hashlib

from pydantic import PositiveInt

from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
                                        load_video)

from ..base_op import OPERATORS
from ..op_fusion import LOADED_VIDEOS
from .ray_basic_deduplicator import RayBasicDeduplicator

OP_NAME = 'ray_video_deduplicator'


@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class RayVideoDeduplicator(RayBasicDeduplicator):
    """
    Deduplicator to deduplicate samples at document-level using exact matching
    of videos between documents.
    """

    def __init__(self,
                 redis_host: str = 'localhost',
                 redis_port: PositiveInt = 6380,
                 *args,
                 **kwargs):
        """
        Initialization.

        :param redis_host: the hostname of the redis server
        :param redis_port: the port of the redis server
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(redis_host=redis_host,
                         redis_port=redis_port,
                         *args,
                         **kwargs)

    def calculate_hash(self, sample, context=False):
        if self.video_key not in sample or not sample[self.video_key]:
            return RayBasicDeduplicator.EMPTY_HASH_VALUE

        # load videos
        loaded_video_keys = sample[self.video_key]
        sample, videos = load_data_with_context(sample, context,
                                                loaded_video_keys, load_video)
        # compute hash
        md5_hash = hashlib.md5()
        for key in videos:
            # consider the multiple streams of a video in one container
            for packet in videos[key].demux():
                if packet.stream.type == 'video':
                    md5_hash.update(bytes(packet))

        for key in videos:
            close_video(videos[key])

        return md5_hash.hexdigest()
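As a quick illustration (not part of the original source), here is a minimal sketch of calling this operator directly, assuming a Redis server is reachable at localhost:6380 and that the sample's video key (typically ``'videos'``) points at an existing local video file; the sample dict and file path below are illustrative only.

.. code-block:: python

    from data_juicer.ops.deduplicator.ray_video_deduplicator import \
        RayVideoDeduplicator

    # Hypothetical sample: the video path is illustrative; in data-juicer the
    # video paths usually live under the 'videos' key.
    sample = {'videos': ['./demo/video1.mp4']}

    # Assumes a Redis server at localhost:6380, which RayBasicDeduplicator
    # uses to share seen hashes across Ray workers.
    op = RayVideoDeduplicator(redis_host='localhost', redis_port=6380)

    # Hash all video streams in the sample's containers; byte-identical
    # videos produce identical digests, so duplicates map to the same key.
    print(op.calculate_hash(sample))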