# Source code for data_juicer.ops.mapper.video_split_by_scene_mapper

import math
import re
from itertools import chain

from pydantic import NonNegativeFloat, NonNegativeInt

from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import add_suffix_to_filename, transfer_filename
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import SpecialTokens

from ..base_op import OPERATORS, Mapper

scenedetect = LazyLoader("scenedetect")

OP_NAME = "video_split_by_scene_mapper"


def replace_func(match, scene_counts_iter):
    """
    Produce the replacement text for one matched video special token.

    :param match: regex match object for a single occurrence of the video
        special token.
    :param scene_counts_iter: iterator that yields, for each successive
        match, how many times the video special token should be repeated.
    :return: the video special token repeated by the next count from the
        iterator, or the matched text unchanged once the iterator has no
        more counts.
    """
    # A unique sentinel distinguishes "iterator exhausted" from any value
    # the iterator could legitimately yield.
    exhausted = object()
    count = next(scene_counts_iter, exhausted)
    if count is exhausted:
        return match.group(0)
    return SpecialTokens.video * count
@OPERATORS.register_module(OP_NAME)
class VideoSplitBySceneMapper(Mapper):
    """Mapper to cut videos into scene clips."""

    # Supported detector names mapped to the extra keyword arguments that may
    # be forwarded to each of them from `**kwargs` (in addition to the
    # `threshold` and `min_scene_len` values every detector receives).
    # NOTE: the attribute name keeps its historical spelling because it is
    # part of the class's public surface.
    avaliable_detectors = {
        "ContentDetector": ["weights", "luma_only", "kernel_size"],
        "AdaptiveDetector": [
            "window_width",
            "min_content_val",
            "weights",
            "luma_only",
            "kernel_size",
            "video_manager",
            "min_delta_hsv",
        ],
        "ThresholdDetector": ["fade_bias", "add_final_scene", "method", "block_size"],
    }

    def __init__(
        self,
        detector: str = "ContentDetector",
        threshold: NonNegativeFloat = 27.0,
        min_scene_len: NonNegativeInt = 15,
        show_progress: bool = False,
        save_dir: str = None,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param detector: Algorithm from `scenedetect.detectors`. Should be one
            of ['ContentDetector', 'ThresholdDetector', 'AdaptiveDetector'].
        :param threshold: Threshold passed to the detector.
        :param min_scene_len: Minimum length of any scene.
        :param show_progress: Whether to show progress from scenedetect.
        :param save_dir: The directory where generated video files will be
            stored. If not specified, outputs will be saved in the same
            directory as their corresponding input files. This path can
            alternatively be defined by setting the `DJ_PRODUCED_DATA_DIR`
            environment variable.
        :param args: extra args
        :param kwargs: extra args; keys listed for the chosen detector in
            `avaliable_detectors` are forwarded to the detector constructor.
        :raises ValueError: if `detector` is not a key of
            `avaliable_detectors`.
        """
        super().__init__(*args, **kwargs)
        # Must be captured before any other local is created so that
        # `locals()` only holds the constructor arguments.
        self._init_parameters = self.remove_extra_parameters(locals())
        self._init_parameters.pop("save_dir", None)

        if detector not in self.avaliable_detectors:
            raise ValueError(
                f"Scene detector {detector} is not supported. "
                f"Can only be one of {list(self.avaliable_detectors.keys())}"
            )

        self.detector = detector
        self.threshold = threshold
        self.min_scene_len = min_scene_len
        self.show_progress = show_progress
        self.save_dir = save_dir

        # prepare detector args: keep only the kwargs the chosen detector
        # is declared to accept
        avaliable_kwargs = self.avaliable_detectors[self.detector]
        self.detector_class = getattr(scenedetect.detectors, self.detector)
        self.detector_kwargs = {key: kwargs[key] for key in avaliable_kwargs if key in kwargs}

    def process_single(self, sample, context=False):
        """
        Split every video of the sample into one clip per detected scene.

        Videos for which more than one scene is detected are split with
        `scenedetect.split_video_ffmpeg` and replaced by the generated clip
        paths; other videos are kept as they are. Video special tokens in the
        text field are repeated so that there is one token per resulting
        video, and `Fields.source_file` records the original video of every
        resulting entry.

        :param sample: the sample to process; modified in place.
        :param context: unused by this method.
        :return: the updated sample.
        """
        # there is no video in this sample
        if self.video_key not in sample or not sample[self.video_key]:
            sample[Fields.source_file] = []
            return sample

        # load videos
        loaded_video_keys = sample[self.video_key]
        # original video path -> list of resulting video paths
        output_video_keys = {}

        for video_key in loaded_video_keys:
            # skip duplicate
            if video_key in output_video_keys:
                continue

            redirected_video_key = transfer_filename(video_key, OP_NAME, self.save_dir, **self._init_parameters)
            output_template = add_suffix_to_filename(redirected_video_key, "_$SCENE_NUMBER")

            # detect scenes
            detector = self.detector_class(self.threshold, self.min_scene_len, **self.detector_kwargs)
            scene_list = scenedetect.detect(
                video_key,
                detector,
                show_progress=self.show_progress,
                start_in_scene=True,
            )
            num_scenes = len(scene_list)

            if num_scenes > 1:
                # sync with split_video_ffmpeg internal
                # NOTE(review): this digit-count formula is kept as-is on
                # purpose so the predicted file names match the ones
                # scenedetect writes - confirm against the installed version
                # before changing it.
                scene_num_format = f"%0{max(3, math.floor(math.log(num_scenes, 10)) + 1)}d"  # noqa: E501
                output_video_keys[video_key] = [
                    output_template.replace("$SCENE_NUMBER", scene_num_format % (i + 1)) for i in range(num_scenes)
                ]
                # split video into clips
                scenedetect.split_video_ffmpeg(
                    input_video_path=video_key,
                    scene_list=scene_list,
                    output_file_template=output_template,
                    show_progress=self.show_progress,
                )
            else:
                # nothing to split: keep the original video untouched
                output_video_keys[video_key] = [video_key]

        # replace split video tokens
        if self.text_key in sample:
            # Repeat each token by the number of videos actually emitted for
            # it (not by the raw detected scene count), so that a video kept
            # unsplit always retains exactly one token even if the detector
            # reported no scenes at all.
            scene_counts_iter = iter([len(output_video_keys[key]) for key in loaded_video_keys])
            sample[self.text_key] = re.sub(
                re.escape(SpecialTokens.video),
                lambda match: replace_func(match, scene_counts_iter),
                sample[self.text_key],
            )

        # when the file is modified, its source file needs to be updated.
        sample[Fields.source_file] = []
        for value in loaded_video_keys:
            sample[Fields.source_file].extend([value] * len(output_video_keys[value]))

        sample[self.video_key] = list(chain.from_iterable(output_video_keys[key] for key in loaded_video_keys))
        return sample