@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoSplitByDurationMapper(Mapper):
    """Splits videos into segments based on a specified duration.

    This operator splits each video in the dataset into smaller segments,
    each with a fixed duration measured in seconds. The last segment is
    discarded if its duration is less than the specified minimum last split
    duration. The original sample can be kept or removed based on the
    ``keep_original_sample`` parameter. The generated video files are saved
    in the specified directory or, if not provided, in the same directory as
    the input files.

    - Splits videos into segments of a specified duration (in seconds).
    - Discards the last segment if it is shorter than the minimum allowed
      duration.
    - Keeps or removes the original sample based on the
      ``keep_original_sample`` parameter.
    - Saves the generated video files in the specified directory or the
      input file's directory.
    - Uses the duration in seconds to determine the segment boundaries.
    """

    # This op consumes/produces batched samples (dict of lists).
    _batched_op = True
[docs]def__init__(self,split_duration:float=10,min_last_split_duration:float=0,keep_original_sample:bool=True,save_dir:str=None,*args,**kwargs,):""" Initialization method. :param split_duration: duration of each video split in seconds. :param min_last_split_duration: The minimum allowable duration in seconds for the last video split. If the duration of the last split is less than this value, it will be discarded. :param keep_original_sample: whether to keep the original sample. If it's set to False, there will be only cut sample in the final datasets and the original sample will be removed. It's True in default. :param save_dir: The directory where generated video files will be stored. If not specified, outputs will be saved in the same directory as their corresponding input files. This path can alternatively be defined by setting the `DJ_PRODUCED_DATA_DIR` environment variable. :param args: extra args :param kwargs: extra args """super().__init__(*args,**kwargs)self._init_parameters=self.remove_extra_parameters(locals())self._init_parameters.pop("save_dir",None)self.split_duration=split_durationself.min_last_split_duration=min_last_split_durationself.keep_original_sample=keep_original_sampleself.extra_args=kwargsself.save_dir=save_dir
def_process_single_sample(self,sample):# there is no video in this sampleifself.video_keynotinsampleorsample[self.video_key]isNoneorlen(sample[self.video_key])==0:sample[Fields.source_file]=[]return[]ifFields.source_filenotinsampleornotsample[Fields.source_file]:sample[Fields.source_file]=sample[self.video_key]# the split resultssplit_sample=copy.deepcopy(sample)split_sample[self.text_key]=""split_sample[Fields.source_file]=[]# load all video(s)loaded_video_keys=sample[self.video_key]videos={}forloaded_video_keyinloaded_video_keys:ifloaded_video_keynotinvideos:# avoid loading the same videosvideo=load_video(loaded_video_key)videos[loaded_video_key]=videosplit_video_keys=[]offset=0# split each video chunk by chunkforchunkinsample[self.text_key].split(SpecialTokens.eoc):# skip empty chunks or contents after the last eoc tokenifnotchunk.strip():continueelse:video_count=chunk.count(SpecialTokens.video)place_holders=[]forvideo_keyinloaded_video_keys[offset:offset+video_count]:video=videos[video_key]new_video_keys=self.split_videos_by_duration(video_key,video)close_video(video)split_video_keys.extend(new_video_keys)place_holders.append(SpecialTokens.video*len(new_video_keys))split_sample[Fields.source_file].extend([video_key]*len(new_video_keys))# insert the generated text according to given modereplacer_function=create_replacer(place_holders)new_split_text_per_chunk=re.sub(SpecialTokens.video,replacer_function,chunk)split_sample[self.text_key]+=f"{new_split_text_per_chunk}{SpecialTokens.eoc}"# noqa: E501offset+=video_countsplit_sample[self.video_key]=split_video_keysreturn[split_sample]
[docs]defprocess_batched(self,samples):# reconstruct samples from "dict of lists" to "list of dicts"reconstructed_samples=[]foriinrange(len(samples[self.text_key])):reconstructed_samples.append({key:samples[key][i]forkeyinsamples})samples_after_split=[]# do split for each sample within the batchforori_sampleinreconstructed_samples:ifself.keep_original_sample:samples_after_split.append(ori_sample)generated_samples=self._process_single_sample(ori_sample)iflen(generated_samples)!=0:samples_after_split.extend(generated_samples)# reconstruct samples from "list of dicts" to "dict of lists"keys=samples_after_split[0].keys()res_samples={}forkeyinkeys:res_samples[key]=[s[key]forsinsamples_after_split]returnres_samples