import hashlib
from collections import defaultdict
from typing import Dict, Set, Tuple

# imports reconstructed; project-internal paths follow data-juicer's layout
from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
                                        load_video)

from ..base_op import OPERATORS, Deduplicator
from ..op_fusion import LOADED_VIDEOS
from .document_deduplicator import DocumentDeduplicator

OP_NAME = 'video_deduplicator'


@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoDeduplicator(Deduplicator):
    """Deduplicates samples at the document level using exact matching of
    videos.

    This operator computes a hash for each video in the sample and uses it
    to identify and remove duplicate documents. If `consider_text` is set to
    True, it also considers the text hash alongside the video hash for
    deduplication. The video hash is computed by hashing the video data,
    including all video streams in the container. The operator supports
    sampling and tracing of duplicate pairs when the `show_num` parameter is
    greater than 0. Important fields used for caching include 'videohash'
    and optionally 'hash' if text is considered.
    """
    def __init__(self, consider_text: bool = False, *args, **kwargs):
        """
        Initialization.

        :param consider_text: whether to consider text hash together with
            video hash when applying deduplication.
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(*args, **kwargs)
        self.consider_text = consider_text
        self.text_dedup_op = None
        if self.consider_text:
            self.text_dedup_op = DocumentDeduplicator(**kwargs)
    def compute_hash(self, sample, context=False):
        # get the hash of the text first
        if self.consider_text:
            sample = self.text_dedup_op.compute_hash(sample)
        # check if it's been computed already
        if HashKeys.videohash in sample:
            return sample

        # there is no video in this sample
        sample[HashKeys.videohash] = ''
        if self.video_key not in sample or not sample[self.video_key]:
            return sample

        # load videos
        loaded_video_keys = sample[self.video_key]
        sample, videos = load_data_with_context(sample, context,
                                                loaded_video_keys, load_video)

        # compute the hash
        md5_hash = hashlib.md5()
        for key in videos:
            # consider all video streams in one container
            for packet in videos[key].demux():
                if packet.stream.type == 'video':
                    md5_hash.update(bytes(packet))

        for key in videos:
            close_video(videos[key])

        sample[HashKeys.videohash] = md5_hash.hexdigest()
        return sample
    def process(self, dataset, show_num=0):
        """
        For doc-level, dataset --> dataset.

        :param dataset: input dataset
        :param show_num: number of traced samples used when the tracer is
            open.
        :return: deduplicated dataset and the sampled duplicate pairs.
        """
        # no need to deduplicate because there are too few samples
        if len(dataset) <= 1:
            return dataset, {}

        dup_hashes = None
        if show_num > 0:
            # sample duplicate pairs
            if self.consider_text:
                hash2ids: Dict[Tuple[int, int], Set[int]] = defaultdict(set)
                hashes = zip(dataset[HashKeys.videohash],
                             dataset[HashKeys.hash])
            else:
                hash2ids: Dict[int, Set[int]] = defaultdict(set)
                hashes = dataset[HashKeys.videohash]
            for sid, hash_val in enumerate(hashes):
                if hash_val:
                    hash2ids[hash_val].add(sid)
            dup_samples = sorted(list(hash2ids.items()),
                                 key=lambda x: len(x[1]),
                                 reverse=True)
            dup_hashes = set([
                item[0] for item in dup_samples if len(item[1]) > 1
            ][:show_num])

        def _filter_dup_helper(sample, hashes):
            if self.consider_text:
                hash = (sample[HashKeys.videohash], sample[HashKeys.hash])
            else:
                hash = sample[HashKeys.videohash]
            if not hash:
                return True
            if show_num > 0 and hash in dup_hashes \
                    and len(dup_pairs[hash]) < 2:
                # the tracer is open and there are not enough duplicate
                # sample pairs yet
                dup_pairs[hash].append(sample)
            if hash in hashes:
                return False
            else:
                hashes.add(hash)
                return True

        hashes = set()
        dup_pairs = {hash_v: [] for hash_v in dup_hashes} if dup_hashes else {}
        dataset = dataset.filter(
            _filter_dup_helper,
            fn_kwargs=dict(hashes=hashes),
            load_from_cache_file=False if show_num > 0 else True)  # num_proc=1
        return dataset, dup_pairs
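
# For reference, the packet-hashing scheme used in `compute_hash` above can
# be reproduced with PyAV alone. This is a minimal illustrative sketch, not
# part of the operator: the helper name `video_md5` and its input path are
# hypothetical. It demuxes every stream in the container and feeds only the
# video packets to MD5, mirroring the inner loop of `compute_hash`.
import av


def video_md5(path):
    md5 = hashlib.md5()
    with av.open(path) as container:
        # hash the raw packets of all video streams in the container
        for packet in container.demux():
            if packet.stream.type == 'video':
                md5.update(bytes(packet))
    return md5.hexdigest()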
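
# A minimal end-to-end usage sketch, assuming a HuggingFace `datasets`
# Dataset, default construction of the operator, and data-juicer's default
# sample keys ('text' for text, 'videos' for video paths). The file names
# below are hypothetical and must point to real videos on disk for
# `compute_hash` to open them.
from datasets import Dataset

ds = Dataset.from_dict({
    'text': ['a', 'b', 'c'],
    'videos': [['v1.mp4'], ['v1.mp4'], ['v2.mp4']],  # hypothetical paths
})

op = VideoDeduplicator()                    # or consider_text=True to also
                                            # require matching text hashes
ds = ds.map(op.compute_hash)                # fills the 'videohash' field
ds, dup_pairs = op.process(ds, show_num=1)  # drops exact duplicates
print(len(ds))                              # 2 samples remain
print(list(dup_pairs))                      # traced duplicate hash(es)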