import hashlib
from collections import defaultdict
from typing import Dict, Set, Tuple

from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
                                        load_video)

from ..base_op import OPERATORS, Deduplicator
from ..op_fusion import LOADED_VIDEOS
from .document_deduplicator import DocumentDeduplicator

OP_NAME = 'video_deduplicator'


@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoDeduplicator(Deduplicator):
    """
    Deduplicator to deduplicate samples at document-level using exact matching
    of videos between documents.
    """
    def __init__(self, consider_text: bool = False, *args, **kwargs):
        """
        Initialization.

        :param consider_text: whether to consider the text hash together with
            the video hash when applying deduplication.
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(*args, **kwargs)
        self.consider_text = consider_text
        self.text_dedup_op = None
        if self.consider_text:
            self.text_dedup_op = DocumentDeduplicator(**kwargs)
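    # Note: with `consider_text=True`, the dedup key built in `process` below
    # becomes a (video hash, text hash) tuple rather than the video hash
    # alone, so a sample is dropped only when BOTH hashes collide with an
    # earlier sample. For example (illustrative values only):
    #
    #   ('3f2b9c0e...', '9a1f4b77...')   # (video hash, text hash)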
    def compute_hash(self, sample, context=False):
        # get the text hash first if it's also considered
        if self.consider_text:
            sample = self.text_dedup_op.compute_hash(sample)
        # check if it's computed already
        if HashKeys.videohash in sample:
            return sample

        # there is no video in this sample
        sample[HashKeys.videohash] = ''
        if self.video_key not in sample or not sample[self.video_key]:
            return sample

        # load videos
        loaded_video_keys = sample[self.video_key]
        sample, videos = load_data_with_context(sample, context,
                                                loaded_video_keys, load_video)

        # compute the hash over the raw encoded packets, so two samples match
        # only if their video data is byte-identical
        md5_hash = hashlib.md5()
        for key in videos:
            # consider multiple video streams in one container
            for packet in videos[key].demux():
                if packet.stream.type == 'video':
                    md5_hash.update(bytes(packet))

        for key in videos:
            close_video(videos[key])

        sample[HashKeys.videohash] = md5_hash.hexdigest()
        return sample
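    # A minimal standalone sketch of the hashing scheme above, assuming PyAV
    # (`av`) containers like those returned by `load_video`; the helper name
    # `exact_video_hash` is hypothetical, not part of this operator:
    #
    #   import hashlib
    #   import av
    #
    #   def exact_video_hash(path):
    #       h = hashlib.md5()
    #       with av.open(path) as container:
    #           for packet in container.demux():
    #               # only encoded video packets count; audio and subtitle
    #               # streams are ignored, matching compute_hash above
    #               if packet.stream.type == 'video':
    #                   h.update(bytes(packet))
    #       return h.hexdigest()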
    def process(self, dataset, show_num=0):
        """
        For doc-level, dataset --> dataset.

        :param dataset: input dataset
        :param show_num: number of traced samples used when the tracer is
            open.
        :return: deduplicated dataset and the sampled duplicate pairs.
        """
        # no need to deduplicate because there are too few samples
        if len(dataset) <= 1:
            return dataset, {}

        dup_hashes = None
        if show_num > 0:
            # sample duplicate pairs
            if self.consider_text:
                hash2ids: Dict[Tuple[int, int], Set[int]] = defaultdict(set)
                hashes = zip(dataset[HashKeys.videohash],
                             dataset[HashKeys.hash])
            else:
                hash2ids: Dict[int, Set[int]] = defaultdict(set)
                hashes = dataset[HashKeys.videohash]
            for sid, hash_val in enumerate(hashes):
                if hash_val:
                    hash2ids[hash_val].add(sid)
            dup_samples = sorted(list(hash2ids.items()),
                                 key=lambda x: len(x[1]),
                                 reverse=True)
            dup_hashes = set([
                item[0] for item in dup_samples if len(item[1]) > 1
            ][:show_num])

        def _filter_dup_helper(sample, hashes):
            if self.consider_text:
                hash = (sample[HashKeys.videohash], sample[HashKeys.hash])
            else:
                hash = sample[HashKeys.videohash]
            if not hash:
                return True
            if show_num > 0 and hash in dup_hashes \
                    and len(dup_pairs[hash]) < 2:
                # tracer is open and there are not enough duplicate sample
                # pairs yet
                dup_pairs[hash].append(sample)
            if hash in hashes:
                return False
            else:
                hashes.add(hash)
                return True

        hashes = set()
        dup_pairs = {hash_v: [] for hash_v in dup_hashes} if dup_hashes else {}
        dataset = dataset.filter(
            _filter_dup_helper,
            fn_kwargs=dict(hashes=hashes),
            load_from_cache_file=False if show_num > 0 else True)  # num_proc=1
        return dataset, dup_pairs
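# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal end-to-end flow, assuming a Hugging Face `datasets.Dataset` with
# video paths under the operator's `video_key` column ('videos' by default in
# Data-Juicer); the file names below are placeholders:
#
#   from datasets import Dataset
#
#   ds = Dataset.from_list([
#       {'text': 'a', 'videos': ['v1.mp4']},
#       {'text': 'b', 'videos': ['v1_copy.mp4']},  # byte-identical to v1.mp4
#       {'text': 'c', 'videos': ['v2.mp4']},
#   ])
#
#   op = VideoDeduplicator(consider_text=False)
#   ds = ds.map(op.compute_hash)  # adds HashKeys.videohash to each sample
#   deduped, dup_pairs = op.process(ds, show_num=1)
#   # `deduped` keeps one sample per distinct video hash; `dup_pairs` holds
#   # up to `show_num` sampled duplicate pairs for the tracer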