Source code for data_juicer.ops.mapper.video_split_by_scene_mapper

import math
import re
from itertools import chain

from pydantic import NonNegativeFloat, NonNegativeInt

from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import add_suffix_to_filename, transfer_filename
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import SpecialTokens

from ..base_op import OPERATORS, Mapper

# Handle to the third-party `scenedetect` package, obtained through the
# project's LazyLoader (presumably deferring the real import until first
# attribute access -- confirm against data_juicer.utils.lazy_loader).
# It is used below via `scenedetect.detectors`, `scenedetect.detect` and
# `scenedetect.split_video_ffmpeg`.
scenedetect = LazyLoader("scenedetect")

# Name under which the operator class is registered in OPERATORS; it is also
# passed to `transfer_filename` when deriving the output clip paths.
OP_NAME = "video_split_by_scene_mapper"


def replace_func(match, scene_counts_iter):
    """Substitution callback that expands one video token per scene count.

    Intended to be called from ``re.sub`` once per matched video token.

    :param match: the regex match object for the token being replaced.
    :param scene_counts_iter: iterator yielding, in order, how many times
        the video token should be repeated for each successive match.
    :return: the video special token repeated ``count`` times, or the
        originally matched text when the iterator has no counts left.
    """
    exhausted = object()
    repeat = next(scene_counts_iter, exhausted)
    if repeat is exhausted:
        # More tokens in the text than counts available: leave it untouched.
        return match.group(0)
    return SpecialTokens.video * repeat
@OPERATORS.register_module(OP_NAME)
class VideoSplitBySceneMapper(Mapper):
    """Mapper to cut videos into scene clips.

    For every video referenced by a sample, scenes are detected with the
    configured ``scenedetect`` detector. A video with more than one detected
    scene is split into one clip per scene via ffmpeg; otherwise the original
    video is kept as is. The sample's video list, its source-file list and
    the video tokens in its text are updated to reflect the resulting clips.
    """

    # Define shared detector keys and their properties: for each supported
    # detector name, the extra keyword arguments that are forwarded from
    # ``**kwargs`` to the detector constructor.
    avaliable_detectors = {
        "ContentDetector": ["weights", "luma_only", "kernel_size"],
        "AdaptiveDetector": [
            "window_width",
            "min_content_val",
            "weights",
            "luma_only",
            "kernel_size",
            "video_manager",
            "min_delta_hsv",
        ],
        "ThresholdDetector": ["fade_bias", "add_final_scene", "method", "block_size"],
    }

    def __init__(
        self,
        detector: str = "ContentDetector",
        threshold: NonNegativeFloat = 27.0,
        min_scene_len: NonNegativeInt = 15,
        show_progress: bool = False,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param detector: Algorithm from `scenedetect.detectors`. Should be one
            of ['ContentDetector', 'ThresholdDetector', 'AdaptiveDetector'].
        :param threshold: Threshold passed to the detector.
        :param min_scene_len: Minimum length of any scene.
        :param show_progress: Whether to show progress from scenedetect.
        :param args: extra args
        :param kwargs: extra args. Keys listed for the chosen detector in
            ``avaliable_detectors`` are additionally forwarded to the
            detector constructor.
        :raises ValueError: if ``detector`` is not a supported detector name.
        """
        super().__init__(*args, **kwargs)
        # NOTE: must run before any further local variable is bound, because
        # the snapshot is taken from locals().
        self._init_parameters = self.remove_extra_parameters(locals())

        if detector not in self.avaliable_detectors:
            raise ValueError(
                f"Scene detector {detector} is not supported. "
                f"Can only be one of {list(self.avaliable_detectors.keys())}"
            )

        self.detector = detector
        self.threshold = threshold
        self.min_scene_len = min_scene_len
        self.show_progress = show_progress

        # prepare detector args
        avaliable_kwargs = self.avaliable_detectors[self.detector]

        self.detector_class = getattr(scenedetect.detectors, self.detector)
        self.detector_kwargs = {key: kwargs[key] for key in avaliable_kwargs if key in kwargs}

    def process_single(self, sample, context=False):
        """Split every video of ``sample`` into scene clips.

        :param sample: the sample to process; modified in place.
        :param context: accepted for interface compatibility; not used here.
        :return: the same ``sample`` with its video list replaced by the clip
            paths, ``Fields.source_file`` listing the originating video once
            per produced clip, and (when a text field is present) each video
            token repeated once per clip of the corresponding video.
        """
        # there is no video in this sample
        if self.video_key not in sample or not sample[self.video_key]:
            sample[Fields.source_file] = []
            return sample

        # load videos
        loaded_video_keys = sample[self.video_key]
        output_video_keys = {}
        scene_counts = {}

        for video_key in loaded_video_keys:
            # skip duplicate
            if video_key in output_video_keys:
                continue

            redirected_video_key = transfer_filename(video_key, OP_NAME, **self._init_parameters)
            output_template = add_suffix_to_filename(redirected_video_key, "_$SCENE_NUMBER")

            # detect scenes (a new detector instance is built for each video)
            detector = self.detector_class(self.threshold, self.min_scene_len, **self.detector_kwargs)
            scene_list = scenedetect.detect(video_key, detector, show_progress=self.show_progress, start_in_scene=True)
            num_scenes = len(scene_list)

            if num_scenes > 1:
                # sync with split_video_ffmpeg internal: the zero-padding
                # width is computed with the same expression so the paths
                # recorded here match the files ffmpeg writes. Keep this
                # expression as is rather than "simplifying" it.
                scene_num_format = f"%0{max(3, math.floor(math.log(num_scenes, 10)) + 1)}d"  # noqa: E501
                output_video_keys[video_key] = [
                    output_template.replace("$SCENE_NUMBER", scene_num_format % (i + 1))
                    for i in range(num_scenes)
                ]
                # split video into clips
                scenedetect.split_video_ffmpeg(
                    input_video_path=video_key,
                    scene_list=scene_list,
                    output_file_template=output_template,
                    show_progress=self.show_progress,
                )
            else:
                # zero or one scene: keep the original video untouched
                output_video_keys[video_key] = [video_key]

            # The token multiplier must equal the number of videos kept for
            # this key. Using len(scene_list) directly would drop the token
            # from the text when detection yields no scene at all, even
            # though the original video is still kept above.
            scene_counts[video_key] = len(output_video_keys[video_key])

        # replace split video tokens
        if self.text_key in sample:
            scene_counts_iter = iter([scene_counts[key] for key in loaded_video_keys])
            updated_text = re.sub(
                re.escape(SpecialTokens.video),
                lambda match: replace_func(match, scene_counts_iter),
                sample[self.text_key],
            )
            sample[self.text_key] = updated_text

        # when the file is modified, its source file needs to be updated.
        sample[Fields.source_file] = []
        for value in loaded_video_keys:
            sample[Fields.source_file].extend([value] * len(output_video_keys[value]))

        sample[self.video_key] = list(chain.from_iterable([output_video_keys[key] for key in loaded_video_keys]))
        return sample