Source code for data_juicer.ops.mapper.video_extract_frames_mapper

import json
import os
import os.path as osp

from pydantic import PositiveInt

from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import dict_to_hash
from data_juicer.utils.mm_utils import (
    SpecialTokens, close_video, extract_key_frames,
    extract_key_frames_by_seconds, extract_video_frames_uniformly,
    extract_video_frames_uniformly_by_seconds, load_data_with_context,
    load_video)

from ..base_op import OPERATORS, Mapper
from ..op_fusion import LOADED_VIDEOS

OP_NAME = 'video_extract_frames_mapper'


[docs] @OPERATORS.register_module(OP_NAME) @LOADED_VIDEOS.register_module(OP_NAME) class VideoExtractFramesMapper(Mapper): """Mapper to extract frames from video files according to specified methods. Extracted Frames Data Format: The data format for the extracted frames is a dictionary mapping video key to extracted frames directory where the extracted frames are saved. The dictionary follows the structure: { "video_key_1": "/${frame_dir}/video_key_1_filename/", "video_key_2": "/${frame_dir}/video_key_2_filename/", ... } """ _batched_op = True
[docs] def __init__( self, frame_sampling_method: str = 'all_keyframes', frame_num: PositiveInt = 3, duration: float = 0, frame_dir: str = None, frame_key=Fields.video_frames, *args, **kwargs, ): """ Initialization method. :param frame_sampling_method: sampling method of extracting frame videos from the videos. Should be one of ["all_keyframes", "uniform"]. The former one extracts all key frames (the number of which depends on the duration of the video) and the latter one extract specified number of frames uniformly from the video. If "duration" > 0, frame_sampling_method acts on every segment. Default: "all_keyframes". :param frame_num: the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration. If "duration" > 0, frame_num is the number of frames per segment. :param duration: The duration of each segment in seconds. If 0, frames are extracted from the entire video. If duration > 0, the video is segmented into multiple segments based on duration, and frames are extracted from each segment. :param frame_dir: Output directory to save extracted frames. If None, a default directory based on the video file path is used. :param frame_key: The name of field to save generated frames info. :param args: extra args :param kwargs: extra args """ super().__init__(*args, **kwargs) self._init_parameters = self.remove_extra_parameters(locals()) if frame_sampling_method not in ['all_keyframes', 'uniform']: raise ValueError( f'Frame sampling method ' f'[{frame_sampling_method}] is not supported. ' f'Can only be one of ["all_keyframes", "uniform"].') self.frame_dir = frame_dir self.frame_sampling_method = frame_sampling_method self.frame_num = frame_num self.duration = duration self.frame_key = frame_key self.frame_fname_template = 'frame_{}.jpg'
def _get_default_frame_dir(self, original_filepath): original_dir = os.path.dirname(original_filepath) dir_token = f'/{Fields.multimodal_data_output_dir}/' if dir_token in original_dir: original_dir = original_dir.split(dir_token)[0] saved_dir = os.path.join( original_dir, f'{Fields.multimodal_data_output_dir}/{OP_NAME}') original_filename = osp.splitext(osp.basename(original_filepath))[0] hash_val = dict_to_hash(self._init_parameters) return osp.join(saved_dir, f'{original_filename}__dj_hash_#{hash_val}#')
[docs] def process_single(self, sample, context=False): # check if it's generated already if self.frame_key in sample: return sample # there is no videos in this sample if self.video_key not in sample or not sample[self.video_key]: return [] # load videos loaded_video_keys = sample[self.video_key] sample, videos = load_data_with_context(sample, context, loaded_video_keys, load_video) video_to_frame_dir = {} text = sample[self.text_key] offset = 0 for chunk in text.split(SpecialTokens.eoc): video_count = chunk.count(SpecialTokens.video) # no video or no text if video_count == 0 or len(chunk) == 0: continue else: for video_key in loaded_video_keys[offset:offset + video_count]: video = videos[video_key] # extract frame videos if self.frame_sampling_method == 'all_keyframes': if self.duration: frames = extract_key_frames_by_seconds( video, self.duration) else: frames = extract_key_frames(video) elif self.frame_sampling_method == 'uniform': if self.duration: frames = extract_video_frames_uniformly_by_seconds( video, self.frame_num, duration=self.duration) else: frames = extract_video_frames_uniformly( video, self.frame_num) else: raise ValueError(f'Not support sampling method \ `{self.frame_sampling_method}`.') frames = [frame.to_image() for frame in frames] if self.frame_dir: frame_dir = osp.join( self.frame_dir, osp.splitext(osp.basename(video_key))[0]) else: # video path as frames directory frame_dir = self._get_default_frame_dir(video_key) os.makedirs(frame_dir, exist_ok=True) video_to_frame_dir[video_key] = frame_dir for i, frame in enumerate(frames): frame_path = osp.join( frame_dir, self.frame_fname_template.format(i)) if not os.path.exists(frame_path): frame.save(frame_path) offset += video_count if not context: for vid_key in videos: close_video(videos[vid_key]) sample[self.frame_key] = json.dumps(video_to_frame_dir) return sample