@OPERATORS.register_module(OP_NAME)
class VideoFFmpegWrappedMapper(Mapper):
    """Simple wrapper for FFmpeg video filters."""

    def __init__(
        self,
        filter_name: Optional[str] = None,
        filter_kwargs: Optional[Dict] = None,
        global_args: Optional[List[str]] = None,
        capture_stderr: bool = True,
        overwrite_output: bool = True,
        save_dir: Optional[str] = None,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param filter_name: ffmpeg video filter name. When it is None,
            `process_single` returns samples without running ffmpeg.
        :param filter_kwargs: keyword-arguments passed to ffmpeg filter.
            None is treated as "no keyword-arguments".
        :param global_args: list-arguments passed to ffmpeg command-line.
        :param capture_stderr: whether to capture stderr.
        :param overwrite_output: whether to overwrite output file.
        :param save_dir: The directory where generated video files will be
            stored. If not specified, outputs will be saved in the same
            directory as their corresponding input files. This path can
            alternatively be defined by setting the `DJ_PRODUCED_DATA_DIR`
            environment variable.
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(*args, **kwargs)
        # Snapshot the constructor arguments right away: `locals()` must be
        # read before any additional local variable is introduced here.
        self._init_parameters = self.remove_extra_parameters(locals())
        # `save_dir` is handed to `transfer_filename` as its own argument in
        # `process_single`, so it is dropped from the forwarded parameters.
        self._init_parameters.pop("save_dir", None)

        self.filter_name = filter_name
        self.filter_kwargs = filter_kwargs
        self.global_args = global_args
        self.capture_stderr = capture_stderr
        self.overwrite_output = overwrite_output
        self.save_dir = save_dir

    def process_single(self, sample):
        """
        Run the configured ffmpeg filter on every video of one sample.

        :param sample: the sample to process; its videos are read from
            `sample[self.video_key]`.
        :return: the same sample object. `sample[self.video_key]` is replaced
            by the list of output paths (same order as the inputs) and
            `sample[Fields.source_file]` is filled in or updated.
        """
        # there is no video in this sample
        if self.video_key not in sample or not sample[self.video_key]:
            sample[Fields.source_file] = []
            return sample

        if Fields.source_file not in sample or not sample[Fields.source_file]:
            sample[Fields.source_file] = sample[self.video_key]

        if self.filter_name is None:
            return sample

        # `filter_kwargs` defaults to None, and unpacking None with `**`
        # raises TypeError, so fall back to an empty mapping.
        filter_kwargs = self.filter_kwargs or {}

        loaded_video_keys = sample[self.video_key]
        # input path -> output path; a video listed several times in the
        # sample is only run through ffmpeg once.
        processed = {}
        for video_key in loaded_video_keys:
            if video_key in processed:
                continue

            output_key = transfer_filename(video_key, OP_NAME, self.save_dir, **self._init_parameters)
            stream = ffmpeg.input(video_key).filter(self.filter_name, **filter_kwargs).output(output_key)
            if self.global_args is not None:
                stream = stream.global_args(*self.global_args)
            stream.run(capture_stderr=self.capture_stderr, overwrite_output=self.overwrite_output)
            processed[video_key] = output_key

        # when the file is modified, its source file needs to be updated.
        source_files = sample[Fields.source_file]
        for i, value in enumerate(loaded_video_keys):
            if source_files[i] != value and processed[value] != value:
                source_files[i] = value

        sample[self.video_key] = [processed[key] for key in loaded_video_keys]
        return sample