@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class RayVideoDeduplicator(RayBasicDeduplicator):
    """Deduplicates samples at document-level using exact matching of videos
    in Ray distributed mode.

    This operator computes the MD5 hash of the video streams in each sample
    and compares them to identify duplicates. It uses Ray distributed mode
    for parallel processing. The hash is computed by demuxing the video
    containers and updating the MD5 hash with every packet that belongs to a
    video stream. If a sample has no video entry (the video field is missing
    or empty), it is assigned the empty hash value. The operator supports
    'ray_actor' or 'redis' backends for deduplication.
    """

    def __init__(
        self,
        backend: str = "ray_actor",
        redis_address: str = "redis://localhost:6379",
        *args,
        **kwargs,
    ):
        """
        Initialization.

        :param backend: the backend for dedup, either 'ray_actor' or 'redis'
        :param redis_address: the address of redis server
        :param args: extra args
        :param kwargs: extra args
        """
        # Positional extras are unpacked before the keyword arguments; the
        # resulting binding is the same as the keyword-first spelling, but
        # the call reads in the order Python actually applies it.
        super().__init__(
            *args,
            backend=backend,
            redis_address=redis_address,
            **kwargs,
        )

    def calculate_hash(self, sample, context=False):
        """
        Compute a single MD5 digest over the video packets of a sample.

        :param sample: the sample to hash; its ``self.video_key`` field is
            passed to ``load_data_with_context`` as the videos to load
        :param context: forwarded unchanged to ``load_data_with_context``
        :return: ``RayBasicDeduplicator.EMPTY_HASH_VALUE`` when the sample
            has no (or an empty) video field, otherwise the hex digest of
            the MD5 hash fed with every video-stream packet of every
            loaded video
        """
        if self.video_key not in sample or not sample[self.video_key]:
            return RayBasicDeduplicator.EMPTY_HASH_VALUE

        # load videos
        loaded_video_keys = sample[self.video_key]
        _, videos = load_data_with_context(
            sample, context, loaded_video_keys, load_video
        )

        # compute hash
        md5_hash = hashlib.md5()
        try:
            for key in videos:
                # consider the multi stream of video in one container
                for packet in videos[key].demux():
                    if packet.stream.type == "video":
                        md5_hash.update(bytes(packet))
        finally:
            # Release every opened container even when demuxing fails part
            # way through (e.g. on a corrupt stream); otherwise the handles
            # would leak.
            for key in videos:
                close_video(videos[key])

        return md5_hash.hexdigest()