Source code for data_juicer.ops.mapper.video_face_blur_mapper

import os

import av
from PIL import ImageFilter

from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import (close_video, detect_faces,
                                        load_data_with_context, load_video,
                                        process_each_frame)
from data_juicer.utils.model_utils import get_model, prepare_model

from ..base_op import OPERATORS, UNFORKABLE, Mapper
from ..op_fusion import LOADED_VIDEOS

# NOTE(review): presumably defers the real OpenCV import until `cv2` is first
# used (e.g. `cv2.data.haarcascades` below) -- confirm against LazyLoader.
cv2 = LazyLoader('cv2', 'cv2')

# Name under which this operator is registered in the module registries; it is
# also passed to `transfer_filename` when naming the processed videos.
OP_NAME = 'video_face_blur_mapper'


@UNFORKABLE.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoFaceBlurMapper(Mapper):
    """Mapper to blur faces detected in videos.

    Every frame of each video referenced by a sample is scanned with an
    OpenCV cascade classifier; each detected face rectangle is replaced by a
    blurred copy of itself.
    """

    # Default keyword arguments forwarded to ``detect_faces``. Instances must
    # never mutate this dict: it is shared by the whole class.
    _default_kwargs = {
        'scaleFactor': 1.1,
        'minNeighbors': 3,
        'minSize': None,
        'maxSize': None,
    }

    def __init__(self,
                 cv_classifier: str = '',
                 blur_type: str = 'gaussian',
                 radius: float = 2,
                 *args,
                 **kwargs):
        """
        Initialization method.

        :param cv_classifier: OpenCV classifier path for face detection.
            By default, we will use 'haarcascade_frontalface_alt.xml'.
        :param blur_type: Type of blur kernel, including
            ['mean', 'box', 'gaussian'].
        :param radius: Radius of blur kernel. Not passed to the filter when
            ``blur_type`` is 'mean'.
        :param args: extra args
        :param kwargs: extra args. Keys that match ``_default_kwargs``
            ('scaleFactor', 'minNeighbors', 'minSize', 'maxSize') override
            the face-detection defaults for this instance only.
        :raises ValueError: if ``blur_type`` is not supported or ``radius``
            is negative.
        """
        super().__init__(*args, **kwargs)
        # Must run before any other local is bound so that ``locals()`` only
        # holds the constructor arguments.
        self._init_parameters = self.remove_extra_parameters(locals())

        if cv_classifier == '':
            cv_classifier = os.path.join(cv2.data.haarcascades,
                                         'haarcascade_frontalface_alt.xml')
        if blur_type not in ['mean', 'box', 'gaussian']:
            raise ValueError(
                f'Blur_type [{blur_type}] is not supported. '
                f'Can only be one of ["mean", "box", "gaussian"]. ')
        if radius < 0:
            raise ValueError('Radius must be >= 0. ')

        if blur_type == 'mean':
            self.blur = ImageFilter.BLUR
        elif blur_type == 'box':
            self.blur = ImageFilter.BoxBlur(radius)
        else:
            self.blur = ImageFilter.GaussianBlur(radius)

        self.blur_type = blur_type
        self.radius = radius

        # Build a per-instance dict instead of aliasing the class-level
        # defaults; otherwise overrides given to one instance would leak into
        # the class attribute and thus into every other instance.
        self.extra_kwargs = {
            key: kwargs.get(key, default)
            for key, default in self._default_kwargs.items()
        }

        self.model_key = prepare_model(model_type='opencv_classifier',
                                       model_path=cv_classifier)

    def process_single(self, sample, context=False):
        """Blur the faces in every video referenced by ``sample``.

        :param sample: sample to process. Its ``self.video_key`` entry is
            read as a list of video keys and is replaced by the keys
            returned from ``process_each_frame``.
        :param context: forwarded to ``load_data_with_context``; when it is
            truthy the loaded videos are left open, otherwise each video is
            closed after it has been processed.
        :return: the updated sample.
        """
        # there is no video in this sample
        if self.video_key not in sample or not sample[self.video_key]:
            sample[Fields.source_file] = []
            return sample

        if Fields.source_file not in sample or not sample[Fields.source_file]:
            sample[Fields.source_file] = sample[self.video_key]

        loaded_video_keys = sample[self.video_key]
        sample, videos = load_data_with_context(sample, context,
                                                loaded_video_keys, load_video)

        model = get_model(self.model_key)

        def _blur_func(frame):
            """Return ``frame`` with every detected face region blurred."""
            image = frame.to_image()
            dets = detect_faces(image, model, **self.extra_kwargs)
            # No face found: hand back the original frame object untouched.
            if len(dets) == 0:
                return frame
            for (x, y, w, h) in dets:
                box = (x, y, x + w, y + h)
                blured_roi = image.crop(box).filter(self.blur)
                image.paste(blured_roi, box)
            return av.VideoFrame.from_image(image)

        # Maps each input video key to the key returned by
        # ``process_each_frame``; also used to process duplicates only once.
        processed_video_keys = {}
        for video_key in loaded_video_keys:
            # skip duplicate
            if video_key in processed_video_keys:
                continue

            video = videos[video_key]
            blured_video_key = transfer_filename(video_key, OP_NAME,
                                                 **self._init_parameters)
            output_video_key = process_each_frame(video, blured_video_key,
                                                  _blur_func)
            processed_video_keys[video_key] = output_video_key

            if not context:
                close_video(video)

        # when the file is modified, its source file needs to be updated.
        source_files = sample[Fields.source_file]
        for i, value in enumerate(loaded_video_keys):
            if source_files[i] != value and processed_video_keys[value] != value:
                source_files[i] = value

        sample[self.video_key] = [
            processed_video_keys[key] for key in loaded_video_keys
        ]
        return sample