import math
import re
from itertools import chain
from pydantic import NonNegativeFloat, NonNegativeInt
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import (add_suffix_to_filename,
transfer_filename)
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import SpecialTokens
from ..base_op import OPERATORS, Mapper
scenedetect = LazyLoader('scenedetect', 'scenedetect')
OP_NAME = 'video_split_by_scene_mapper'
[docs]
def replace_func(match, scene_counts_iter):
try:
count = next(scene_counts_iter)
return SpecialTokens.video * count
except StopIteration:
return match.group(0)
[docs]
@OPERATORS.register_module(OP_NAME)
class VideoSplitBySceneMapper(Mapper):
"""Mapper to cut videos into scene clips.
"""
# Define shared detector keys and their properties
avaliable_detectors = {
'ContentDetector': ['weights', 'luma_only', 'kernel_size'],
'AdaptiveDetector': [
'window_width', 'min_content_val', 'weights', 'luma_only',
'kernel_size', 'video_manager', 'min_delta_hsv'
],
'ThresholdDetector':
['fade_bias', 'add_final_scene', 'method', 'block_size']
}
[docs]
def __init__(self,
detector: str = 'ContentDetector',
threshold: NonNegativeFloat = 27.0,
min_scene_len: NonNegativeInt = 15,
show_progress: bool = False,
*args,
**kwargs):
"""
Initialization method.
:param detector: Algorithm from `scenedetect.detectors`. Should be one
of ['ContentDetector', 'ThresholdDetector', 'AdaptiveDetector`].
:param threshold: Threshold passed to the detector.
:param min_scene_len: Minimum length of any scene.
:param show_progress: Whether to show progress from scenedetect.
:param args: extra args
:param kwargs: extra args
"""
super().__init__(*args, **kwargs)
self._init_parameters = self.remove_extra_parameters(locals())
if detector not in self.avaliable_detectors:
raise ValueError(
f'Scene detector {detector} is not supported. '
f'Can only be one of {list(self.avaliable_detectors.keys())}')
self.detector = detector
self.threshold = threshold
self.min_scene_len = min_scene_len
self.show_progress = show_progress
# prepare detector args
avaliable_kwargs = self.avaliable_detectors[self.detector]
self.detector_class = getattr(scenedetect.detectors, self.detector)
self.detector_kwargs = {
key: kwargs[key]
for key in avaliable_kwargs if key in kwargs
}
[docs]
def process_single(self, sample, context=False):
# there is no video in this sample
if self.video_key not in sample or not sample[self.video_key]:
sample[Fields.source_file] = []
return sample
# load videos
loaded_video_keys = sample[self.video_key]
output_video_keys = {}
scene_counts = {}
for video_key in loaded_video_keys:
# skip duplicate
if video_key in output_video_keys:
continue
redirected_video_key = transfer_filename(video_key, OP_NAME,
**self._init_parameters)
output_template = add_suffix_to_filename(redirected_video_key,
'_$SCENE_NUMBER')
# detect scenes
detector = self.detector_class(self.threshold, self.min_scene_len,
**self.detector_kwargs)
scene_list = scenedetect.detect(video_key,
detector,
show_progress=self.show_progress,
start_in_scene=True)
scene_counts[video_key] = len(scene_list)
if len(scene_list) > 1:
# sync with split_video_ffmpeg internal
scene_num_format = f'%0{max(3, math.floor(math.log(len(scene_list), 10)) + 1)}d' # noqa: E501
output_video_keys[video_key] = [
output_template.replace('$SCENE_NUMBER',
scene_num_format % (i + 1))
for i in range(len(scene_list))
]
# split video into clips
scenedetect.split_video_ffmpeg(
input_video_path=video_key,
scene_list=scene_list,
output_file_template=output_template,
show_progress=self.show_progress)
else:
output_video_keys[video_key] = [video_key]
# replace splited video tokens
if self.text_key in sample:
scene_counts_iter = iter(
[scene_counts[key] for key in loaded_video_keys])
updated_text = re.sub(
re.escape(SpecialTokens.video),
lambda match: replace_func(match, scene_counts_iter),
sample[self.text_key])
sample[self.text_key] = updated_text
# when the file is modified, its source file needs to be updated.
sample[Fields.source_file] = []
for value in loaded_video_keys:
sample[Fields.source_file].extend([value] *
len(output_video_keys[value]))
sample[self.video_key] = list(
chain.from_iterable(
[output_video_keys[key] for key in loaded_video_keys]))
return sample