@OPERATORS.register_module(OP_NAME)
class VideoFFmpegWrappedMapper(Mapper):
    """Simple wrapper for FFmpeg video filters.

    Each video path listed under ``self.video_key`` in a sample is run
    through a single ffmpeg filter, and the sample is updated to point at
    the generated output files.
    """

    def __init__(
        self,
        filter_name: Optional[str] = None,
        filter_kwargs: Optional[Dict] = None,
        global_args: Optional[List[str]] = None,
        capture_stderr: bool = True,
        overwrite_output: bool = True,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param filter_name: ffmpeg video filter name. When None, samples
            are returned without running ffmpeg.
        :param filter_kwargs: keyword-arguments passed to ffmpeg filter.
            None means the filter is applied without keyword arguments.
        :param global_args: list-arguments passed to ffmpeg command-line.
        :param capture_stderr: whether to capture stderr.
        :param overwrite_output: whether to overwrite output file.
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(*args, **kwargs)
        # locals() has to be read before any additional local name is
        # bound, so that only the constructor arguments are captured. The
        # result is forwarded to transfer_filename() in process_single().
        self._init_parameters = self.remove_extra_parameters(locals())

        self.filter_name = filter_name
        self.filter_kwargs = filter_kwargs
        self.global_args = global_args
        self.capture_stderr = capture_stderr
        self.overwrite_output = overwrite_output

    def _apply_filter(self, video_key, output_key):
        """Run the configured ffmpeg filter on one file.

        :param video_key: path of the input video.
        :param output_key: path the filtered video is written to.
        """
        # filter_kwargs defaults to None; unpacking None with ** raises
        # TypeError, so fall back to an empty mapping.
        filter_kwargs = self.filter_kwargs or {}
        stream = (
            ffmpeg.input(video_key)
            .filter(self.filter_name, **filter_kwargs)
            .output(output_key)
        )
        if self.global_args is not None:
            stream = stream.global_args(*self.global_args)
        stream.run(
            capture_stderr=self.capture_stderr,
            overwrite_output=self.overwrite_output,
        )

    def process_single(self, sample):
        """Apply the configured filter to every video of ``sample``.

        :param sample: mapping holding the video paths under
            ``self.video_key``.
        :return: the same sample object, with ``self.video_key`` replaced
            by the output paths and ``Fields.source_file`` populated.
        """
        # there is no video in this sample
        if self.video_key not in sample or not sample[self.video_key]:
            sample[Fields.source_file] = []
            return sample

        # Seed the source-file record with the current video paths when it
        # is missing or empty (this binds the same list object).
        if Fields.source_file not in sample or not sample[Fields.source_file]:
            sample[Fields.source_file] = sample[self.video_key]

        # Without a filter there is nothing to run.
        if self.filter_name is None:
            return sample

        loaded_video_keys = sample[self.video_key]
        # Maps input path -> output path; a path that appears several times
        # in the sample is only processed once.
        processed = {}
        for video_key in loaded_video_keys:
            if video_key in processed:
                continue

            output_key = transfer_filename(
                video_key, OP_NAME, **self._init_parameters
            )
            self._apply_filter(video_key, output_key)
            processed[video_key] = output_key

        # when the file is modified, its source file needs to be updated.
        source_files = sample[Fields.source_file]
        for i, value in enumerate(loaded_video_keys):
            if source_files[i] != value and processed[value] != value:
                source_files[i] = value

        sample[self.video_key] = [processed[key] for key in loaded_video_keys]
        return sample