@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoSplitByDurationMapper(Mapper):
    """Mapper to split videos into chunks of a fixed duration.

    Registered under ``OP_NAME`` in both the ``OPERATORS`` and
    ``LOADED_VIDEOS`` registries. Operates on batched samples
    (``_batched_op = True``), so the framework feeds ``process_batched``
    a dict-of-lists batch rather than a single sample.
    """

    # Tell the executing framework that this op consumes/produces batches.
    _batched_op = True
[docs]def__init__(self,split_duration:float=10,min_last_split_duration:float=0,keep_original_sample:bool=True,*args,**kwargs):""" Initialization method. :param split_duration: duration of each video split in seconds. :param min_last_split_duration: The minimum allowable duration in seconds for the last video split. If the duration of the last split is less than this value, it will be discarded. :param keep_original_sample: whether to keep the original sample. If it's set to False, there will be only cut sample in the final datasets and the original sample will be removed. It's True in default. :param args: extra args :param kwargs: extra args """super().__init__(*args,**kwargs)self._init_parameters=self.remove_extra_parameters(locals())self.split_duration=split_durationself.min_last_split_duration=min_last_split_durationself.keep_original_sample=keep_original_sampleself.extra_args=kwargs
def_process_single_sample(self,sample):# there is no video in this sampleifself.video_keynotinsampleorsample[self.video_key]isNoneorlen(sample[self.video_key])==0:sample[Fields.source_file]=[]return[]ifFields.source_filenotinsampleornotsample[Fields.source_file]:sample[Fields.source_file]=sample[self.video_key]# the split resultssplit_sample=copy.deepcopy(sample)split_sample[self.text_key]=''split_sample[Fields.source_file]=[]# load all video(s)loaded_video_keys=sample[self.video_key]videos={}forloaded_video_keyinloaded_video_keys:ifloaded_video_keynotinvideos:# avoid loading the same videosvideo=load_video(loaded_video_key)videos[loaded_video_key]=videosplit_video_keys=[]offset=0# split each video chunk by chunkforchunkinsample[self.text_key].split(SpecialTokens.eoc):# skip empty chunks or contents after the last eoc tokenifnotchunk.strip():continueelse:video_count=chunk.count(SpecialTokens.video)place_holders=[]forvideo_keyinloaded_video_keys[offset:offset+video_count]:video=videos[video_key]new_video_keys=self.split_videos_by_duration(video_key,video)close_video(video)split_video_keys.extend(new_video_keys)place_holders.append(SpecialTokens.video*len(new_video_keys))split_sample[Fields.source_file].extend([video_key]*len(new_video_keys))# insert the generated text according to given modereplacer_function=create_replacer(place_holders)new_split_text_per_chunk=re.sub(SpecialTokens.video,replacer_function,chunk)split_sample[self.text_key]+=f'{new_split_text_per_chunk}{SpecialTokens.eoc}'# noqa: E501offset+=video_countsplit_sample[self.video_key]=split_video_keysreturn[split_sample]
[docs]defprocess_batched(self,samples):# reconstruct samples from "dict of lists" to "list of dicts"reconstructed_samples=[]foriinrange(len(samples[self.text_key])):reconstructed_samples.append({key:samples[key][i]forkeyinsamples})samples_after_split=[]# do split for each sample within the batchforori_sampleinreconstructed_samples:ifself.keep_original_sample:samples_after_split.append(ori_sample)generated_samples=self._process_single_sample(ori_sample)iflen(generated_samples)!=0:samples_after_split.extend(generated_samples)# reconstruct samples from "list of dicts" to "dict of lists"keys=samples_after_split[0].keys()res_samples={}forkeyinkeys:res_samples[key]=[s[key]forsinsamples_after_split]returnres_samples