@OPERATORS.register_module(NAME)
class VideoCaptioningFromAudioMapper(Mapper):
    """Mapper to caption a video according to its audio streams based on
    Qwen-Audio model.
    """

    _accelerator = 'cuda'
    _batched_op = True
    def __init__(self, keep_original_sample: bool = True, *args, **kwargs):
        """
        Initialization method.

        :param keep_original_sample: whether to keep the original sample. If
            it's set to False, only captioned samples will be kept in the
            final dataset and the original samples will be removed. It's True
            by default.
        :param args: extra args
        :param kwargs: extra args
        """
        kwargs.setdefault('mem_required', '30GB')
        super().__init__(*args, **kwargs)
        LazyLoader.check_packages([
            'transformers', 'transformers_stream_generator', 'einops',
            'accelerate', 'tiktoken'
        ])

        self.keep_original_sample = keep_original_sample
        self.extra_args = kwargs

        self._hf_qwen_audio = 'Qwen/Qwen-Audio'
        self.model_key = prepare_model(
            model_type='huggingface',
            pretrained_model_name_or_path=self._hf_qwen_audio,
            trust_remote_code=True,
        )
        self.prompt = '<|startoftranscription|><|unknown|><|caption|>' \
                      '<|unknown|><|notimestamps|><|wo_itn|>'
        self.response_remove_pattern = re.compile(r'<\|.*?\|>')
    def _process_single_sample(self, sample, rank=None):
        # there is no video in this sample
        if self.video_key not in sample or not sample[self.video_key]:
            return []

        # get paths of all video(s)
        loaded_video_keys = sample[self.video_key]
        # get models
        model, processor = get_model(self.model_key, rank, self.use_cuda())

        offset = 0
        captioned_sample = copy.deepcopy(sample)
        # generate captions for each video, chunk by chunk
        captioned_texts = ''
        left_video_keys = []
        for chunk in sample[self.text_key].split(SpecialTokens.eoc):
            # skip empty chunks
            if not chunk.strip():
                continue

            vid_count = chunk.count(SpecialTokens.video)
            captioned_text_list = []
            for video in loaded_video_keys[offset:offset + vid_count]:
                # only extract audio for stream index 0 for now
                _, _, valid_indexes = extract_audio_from_video(
                    video, video + '.mp3', stream_indexes=[0])
                if len(valid_indexes) == 0:
                    # there are no valid audio streams. Skip!
                    continue
                extracted_audio_path = video + '_0.mp3'
                query = f'<audio>{extracted_audio_path}</audio>{self.prompt}'

                # start inference
                audio_info = processor.process_audio(query)
                inputs = processor(query,
                                   return_tensors='pt',
                                   audio_info=audio_info).to(model.device)
                outputs = model.generate(**inputs, audio_info=audio_info)
                response = processor.decode(outputs[0],
                                            skip_special_tokens=True,
                                            audio_info=audio_info)
                # remove the audio path from the response
                response = response.replace(extracted_audio_path, '').replace(
                    '<audio>', '').replace('</audio>', '')
                response = self.response_remove_pattern.sub('',
                                                            response).strip()
                if response == '':
                    # generation failure. Skip!
                    continue
                captioned_text_list.append(f'{SpecialTokens.video} {response}')
                left_video_keys.append(video)
                # remove extracted audio files
                os.remove(extracted_audio_path)
            offset += vid_count
            captioned_text = ''.join(captioned_text_list)

            # add special tokens
            captioned_texts += f'{captioned_text}{SpecialTokens.eoc}'

        captioned_sample[self.text_key] = captioned_texts
        captioned_sample[self.video_key] = left_video_keys
        return [captioned_sample]
    def process_batched(self, samples, rank=None):
        # reconstruct samples from "dict of lists" to "list of dicts"
        reconstructed_samples = []
        for i in range(len(samples[self.text_key])):
            reconstructed_samples.append(
                {key: samples[key][i]
                 for key in samples})

        samples_after_split = []
        # do captioning for each sample within the batch
        for ori_sample in reconstructed_samples:
            if self.keep_original_sample:
                samples_after_split.append(ori_sample)
            generated_samples = self._process_single_sample(ori_sample,
                                                            rank=rank)
            if len(generated_samples) != 0:
                samples_after_split.extend(generated_samples)

        # reconstruct samples from "list of dicts" to "dict of lists"
        keys = samples_after_split[0].keys()
        res_samples = {}
        for key in keys:
            res_samples[key] = [s[key] for s in samples_after_split]
        return res_samples
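The sketch below is not part of the operator above; it is a minimal, hedged example of how the mapper could be driven directly on a small "dict of lists" batch. It assumes the usual data-juicer defaults text_key='text' and video_key='videos', and that 'video1.mp4' is a locally available clip with an audio stream; both are illustrative assumptions, not values taken from this module.

# Usage sketch (illustrative only, assumes default text/video keys and a
# local test video; the model weights are downloaded on first use).
if __name__ == '__main__':
    op = VideoCaptioningFromAudioMapper(keep_original_sample=True)

    # one sample whose text chunk references exactly one video token
    batch = {
        'text': [f'{SpecialTokens.video} a clip to be captioned'
                 f'{SpecialTokens.eoc}'],
        'videos': [['video1.mp4']],
    }

    # with keep_original_sample=True the result contains the original sample
    # followed by its captioned counterpart, still as a "dict of lists"
    result = op.process_batched(batch)
    for text, videos in zip(result['text'], result['videos']):
        print(videos, '->', text)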