data_juicer.utils package

Submodules

data_juicer.utils.asset_utils module

data_juicer.utils.asset_utils.load_words_asset(words_dir: str, words_type: str)[source]

Load words from an asset file named words_type. If no valid asset file is found, download it from the ASSET_LINKS cached by the data_juicer team.

Parameters:
  • words_dir – directory that stores asset file(s)

  • words_type – name of target words assets

Returns:

a dict that stores words assets, whose keys are language names, and the values are lists of words
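Example (a minimal usage sketch; the asset name "stopwords" is illustrative):

>>> from data_juicer.utils.asset_utils import load_words_asset
>>> # the asset is downloaded first if no valid local asset file is found
>>> words = load_words_asset(words_dir='./assets', words_type='stopwords')
>>> list(words.keys())  # language names; each value is a list of words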

data_juicer.utils.auto_install_mapping module

data_juicer.utils.auto_install_utils module

class data_juicer.utils.auto_install_utils.AutoInstaller(require_f_paths=[])[source]

Bases: object

This class is used to install the required package automatically.

__init__(require_f_paths=[])[source]

Initialization method.

Parameters:

require_f_paths – paths to the requirement files used for version limitation

check(check_pkgs, param=None)[source]

Install the packages if they are not installed yet.

Parameters:
  • check_pkgs – packages to be checked; install them if they are not installed

  • param – extra install params for pip if necessary

install(module)[source]

Install the package for the given module.

Parameters:

module – module to be installed
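Example (a minimal usage sketch; the requirement file path is illustrative):

>>> from data_juicer.utils.auto_install_utils import AutoInstaller
>>> installer = AutoInstaller(require_f_paths=['environments/minimal_requires.txt'])
>>> installer.check(['torch'])  # installs torch via pip only if it is missing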

data_juicer.utils.availability_utils module

data_juicer.utils.cache_utils module

class data_juicer.utils.cache_utils.DatasetCacheControl(on: bool = False)[source]

Bases: object

Define a context that changes the cache state temporarily.

__init__(on: bool = False)[source]
data_juicer.utils.cache_utils.dataset_cache_control(on)[source]

An easier-to-use decorator for functions that need to control the cache state temporarily.
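Example (a minimal sketch, assuming DatasetCacheControl is used as a context manager and dataset_cache_control wraps a function in the same context):

>>> from data_juicer.utils.cache_utils import DatasetCacheControl, dataset_cache_control
>>> with DatasetCacheControl(on=True):
...     pass  # dataset operations here run with caching enabled
>>> @dataset_cache_control(on=True)
... def process_with_cache():
...     pass  # caching is enabled for the duration of this call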

data_juicer.utils.ckpt_utils module

class data_juicer.utils.ckpt_utils.CheckpointManager(ckpt_dir, original_process_list, num_proc=1)[source]

Bases: object

This class is used to save the latest version of a dataset to the checkpoint directory, or to load it from the checkpoint directory, somewhat like cache management. Rerunning the same config will reload the checkpoint and skip the ops before it.

If any argument of any operator in the process list is changed, all ops will be rerun from the beginning.

__init__(ckpt_dir, original_process_list, num_proc=1)[source]

Initialization method.

Parameters:
  • ckpt_dir – path to save and load checkpoint

  • original_process_list – process list in config

  • num_proc – number of process workers when saving dataset

get_left_process_list()[source]

Get the remaining process list of ops for processing the dataset. When a checkpoint is available, the already-finished ops are removed from the process list; otherwise it is kept unchanged.

Returns:

process list of the remaining ops

check_ckpt()[source]

Check if checkpoint is available.

Returns:

True when checkpoint is available, else False

record(op_cfg: dict)[source]

Save the op name and args to the op record, which is used to compare with the process list from the config to decide whether a checkpoint is available.

check_ops_to_skip()[source]

Check which ops need to be skipped in the process list.

If the op record list from the checkpoint is the same as the prefix of the process list, skip these ops and start processing from the checkpoint. Otherwise, process the original dataset from scratch.

Returns:

whether to skip some ops or not

save_ckpt(ds)[source]

Save dataset to checkpoint directory and dump processed ops list.

Parameters:

ds – input dataset to save

load_ckpt()[source]

Load dataset from a checkpoint file.

Returns:

a dataset stored in checkpoint file.
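Example (a minimal sketch of the resume workflow; the op config is hypothetical):

>>> from data_juicer.utils.ckpt_utils import CheckpointManager
>>> process_list = [{'language_id_score_filter': {'lang': 'en'}}]  # hypothetical op
>>> mgr = CheckpointManager('./ckpt', process_list, num_proc=4)
>>> if mgr.check_ckpt():
...     ds = mgr.load_ckpt()               # resume from the checkpoint
...     ops = mgr.get_left_process_list()  # only the remaining ops
... else:
...     ops = process_list                 # process from scratch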

data_juicer.utils.common_utils module

data_juicer.utils.common_utils.stats_to_number(s, reverse=True)[source]

Convert a stats value, which can be a string or a list, to a float.

data_juicer.utils.common_utils.dict_to_hash(input_dict: dict, hash_length=None)[source]

Hash a dict to a string of length hash_length.

Parameters:

input_dict – the given dict

data_juicer.utils.common_utils.nested_access(data, path, digit_allowed=True)[source]

Access nested data using a dot-separated path.

Parameters:
  • data – A dictionary or a list to access the nested data from.

  • path – A dot-separated string representing the path to access. This can include numeric indices when accessing list elements.

  • digit_allowed – Allow converting strings to digits.

Returns:

The value located at the specified path, or raises a KeyError or IndexError if the path does not exist.

data_juicer.utils.common_utils.nested_set(data: dict, path: str, val)[source]

Set val into the nested data at the dot-separated path.

Parameters:
  • data – A dictionary with nested format.

  • path – A dot-separated string representing the path to set. This can include numeric indices when setting list elements.

Returns:

The nested data after val is set.
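Example (a minimal sketch of both helpers):

>>> from data_juicer.utils.common_utils import nested_access, nested_set
>>> data = {'meta': {'tags': ['news', 'sports']}}
>>> nested_access(data, 'meta.tags.1')  # numeric index into the list
'sports'
>>> nested_set(data, 'meta.tags', ['news'])  # returns the updated data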

data_juicer.utils.common_utils.is_string_list(var)[source]

Return whether the var is a list of strings.

Parameters:

var – input variable

data_juicer.utils.common_utils.avg_split_string_list_under_limit(str_list: list, token_nums: list, max_token_num=None)[source]

Split the string list into several sub-lists, such that the total token num of each sub-list is less than max_token_num, while keeping the total token nums of the sub-lists similar.

Parameters:
  • str_list – input string list.

  • token_nums – token num of each string in the list.

  • max_token_num – max token num of each sub string list.
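Example (a minimal sketch; the token counts are illustrative):

>>> from data_juicer.utils.common_utils import avg_split_string_list_under_limit
>>> strs = ['a b c', 'd e', 'f g h i', 'j']
>>> token_nums = [3, 2, 4, 1]  # token num of each string
>>> # each sub string list stays under 5 tokens, with totals kept similar
>>> avg_split_string_list_under_limit(strs, token_nums, max_token_num=5)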

data_juicer.utils.common_utils.is_float(s)[source]

data_juicer.utils.compress module

class data_juicer.utils.compress.FileLock(lock_file: str | os.PathLike[str], timeout: float = -1, mode: int = 420, thread_local: bool = True, *, blocking: bool = True, is_singleton: bool = False, **kwargs: Any)[source]

Bases: FileLock

File lock for compression or decompression, which removes the lock file automatically.

class data_juicer.utils.compress.Extractor[source]

Bases: Extractor

Extract content from a compressed file.

classmethod extract(input_path: Path | str, output_path: Path | str, extractor_format: str)[source]

Extract content from a compressed file.

Parameters:
  • input_path – path to compressed file.

  • output_path – path to uncompressed file.

  • extractor_format – extraction format, see supported algorithm in Extractor of huggingface dataset.

class data_juicer.utils.compress.BaseCompressor[source]

Bases: ABC

Base class that compresses a file.

abstract static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

class data_juicer.utils.compress.ZstdCompressor[source]

Bases: BaseCompressor

This class compresses a file using the zstd algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

class data_juicer.utils.compress.Lz4Compressor[source]

Bases: BaseCompressor

This class compresses a file using the lz4 algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

class data_juicer.utils.compress.GzipCompressor[source]

Bases: BaseCompressor

This class compresses a file using the gzip algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

class data_juicer.utils.compress.Compressor[source]

Bases: object

This class contains multiple compressors.

compressors: Dict[str, Type[BaseCompressor]] = {'gzip': <class 'data_juicer.utils.compress.GzipCompressor'>, 'lz4': <class 'data_juicer.utils.compress.Lz4Compressor'>, 'zstd': <class 'data_juicer.utils.compress.ZstdCompressor'>}
classmethod compress(input_path: Path | str, output_path: Path | str, compressor_format: str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

  • compressor_format – compression format, see supported algorithm in compressors.

class data_juicer.utils.compress.CompressManager(compressor_format: str = 'zstd')[source]

Bases: object

This class is used to compress or decompress an input file using compression format algorithms.

__init__(compressor_format: str = 'zstd')[source]

Initialization method.

Parameters:

compressor_format – compression format algorithm, default zstd.

compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

decompress(input_path: Path | str, output_path: Path | str)[source]

Decompress input file and save to output file.

Parameters:
  • input_path – path to compressed file.

  • output_path – path to uncompressed file.
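Example (a minimal usage sketch; the file paths are illustrative):

>>> from data_juicer.utils.compress import CompressManager
>>> mgr = CompressManager(compressor_format='zstd')
>>> mgr.compress('cache-train.arrow', 'cache-train.arrow.zst')
>>> mgr.decompress('cache-train.arrow.zst', 'cache-train.arrow')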

class data_juicer.utils.compress.CacheCompressManager(compressor_format: str = 'zstd')[source]

Bases: object

This class is used to compress or decompress huggingface cache files using compression format algorithms.

__init__(compressor_format: str = 'zstd')[source]

Initialization method.

Parameters:

compressor_format – compression format algorithm, default zstd.

compress(prev_ds: Dataset, this_ds: Dataset | None = None, num_proc: int = 1)[source]

Compress cache files with fingerprint in dataset cache directory.

Parameters:
  • prev_ds – previous dataset whose cache files need to be compressed here.

  • this_ds – Current dataset that is computed from the previous dataset. There might be overlaps between their cache files, so we must not compress cache files that will be used again in the current dataset. If it's None, all cache files of the previous dataset should be compressed.

  • num_proc – number of processes to compress cache files.

decompress(ds: Dataset, fingerprints: str | List[str] | None = None, num_proc: int = 1)[source]

Decompress compressed cache files with fingerprint in dataset cache directory.

Parameters:
  • ds – input dataset.

  • fingerprints – fingerprints of cache files. A string or a list is accepted. If None, we will find all cache files that start with cache- and end with the compression format.

  • num_proc – number of processes to decompress cache files.

format_cache_file_name(cache_file_name: str | None) str | None[source]

Use * to replace the sub rank in a cache file name.

Parameters:

cache_file_name – a cache file name.

cleanup_cache_files(ds)[source]

Clean up all compressed cache files in the dataset cache directory, which start with cache- and end with the compression format.

Parameters:

ds – input dataset.

class data_juicer.utils.compress.CompressionOff[source]

Bases: object

Define a context that turns off the cache compression temporarily.

data_juicer.utils.compress.compress(prev_ds, this_ds=None, num_proc=1)[source]
data_juicer.utils.compress.decompress(ds, fingerprints=None, num_proc=1)[source]
data_juicer.utils.compress.cleanup_compressed_cache_files(ds)[source]

data_juicer.utils.constant module

class data_juicer.utils.constant.Fields[source]

Bases: object

stats = '__dj__stats__'
meta = '__dj__meta__'
context = '__dj__context__'
suffix = '__dj__suffix__'
video_frame_tags = '__dj__video_frame_tags__'
video_audio_tags = '__dj__video_audio_tags__'
image_tags = '__dj__image_tags__'
video_frames = '__dj__video_frames__'
source_file = '__dj__source_file__'
multimodal_data_output_dir = '__dj__produced_data__'
event_description = '__dj__event_description__'
relevant_characters = '__dj__relevant_characters__'
main_entities = '__dj__main_entities__'
attributes = '__dj__attributes__'
attribute_descriptions = '__dj__attribute_descriptions__'
attribute_support_texts = '__dj__attribute_support_texts__'
nickname = '__dj__nickname__'
entity = '__dj__entity__'
entity_name = '__dj__entity_name__'
entity_type = '__dj__entity_type__'
entity_description = '__dj__entity_entity_description__'
relation = '__dj__relation__'
source_entity = '__dj__relation_source_entity__'
target_entity = '__dj__relation_target_entity__'
relation_description = '__dj__relation_description__'
relation_keywords = '__dj__relation_keywords__'
relation_strength = '__dj__relation_strength__'
keyword = '__dj__keyword__'
support_text = '__dj__support_text__'
class data_juicer.utils.constant.StatsKeysMeta[source]

Bases: type

A helper class to track the mapping from an OP's name to its used stats_keys.

E.g., once the AlphanumericFilter's compute_stats method has been called:

>>> res = TrackingDescriptor.get_access_log()
>>> print(res)  # {"AlphanumericFilter": ["alnum_ratio", "alpha_token_ratio"]}

get_access_log(dj_cfg=None)[source]
class data_juicer.utils.constant.StatsKeysConstant[source]

Bases: object

alpha_token_ratio = 'alpha_token_ratio'
alnum_ratio = 'alnum_ratio'
avg_line_length = 'avg_line_length'
char_rep_ratio = 'char_rep_ratio'
flagged_words_ratio = 'flagged_words_ratio'
lang = 'lang'
lang_score = 'lang_score'
max_line_length = 'max_line_length'
perplexity = 'perplexity'
special_char_ratio = 'special_char_ratio'
stopwords_ratio = 'stopwords_ratio'
text_len = 'text_len'
num_action = 'num_action'
num_dependency_edges = 'num_dependency_edges'
num_token = 'num_token'
num_words = 'num_words'
word_rep_ratio = 'word_rep_ratio'
aspect_ratios = 'aspect_ratios'
image_width = 'image_width'
image_height = 'image_height'
image_sizes = 'image_sizes'
face_ratios = 'face_ratios'
face_detections = 'face_detections'
face_counts = 'face_counts'
image_aesthetics_scores = 'image_aesthetics_scores'
image_nsfw_score = 'image_nsfw_score'
image_watermark_prob = 'image_watermark_prob'
image_pair_similarity = 'image_pair_similarity'
audio_duration = 'audio_duration'
audio_nmf_snr = 'audio_nmf_snr'
audio_sizes = 'audio_sizes'
video_duration = 'video_duration'
video_aspect_ratios = 'video_aspect_ratios'
video_width = 'video_width'
video_height = 'video_height'
video_ocr_area_ratio = 'video_ocr_area_ratio'
video_aesthetic_score = 'video_aesthetic_score'
video_frames_aesthetics_score = 'video_frames_aesthetics_score'
video_motion_score = 'video_motion_score'
video_nsfw_score = 'video_nsfw_score'
video_watermark_prob = 'video_watermark_prob'
image_text_similarity = 'image_text_similarity'
image_text_matching_score = 'image_text_matching_score'
phrase_grounding_recall = 'phrase_grounding_recall'
video_frames_text_similarity = 'video_frames_text_similarity'
class data_juicer.utils.constant.StatsKeys[source]

Bases: object

class data_juicer.utils.constant.HashKeys[source]

Bases: object

hash = '__dj__hash'
minhash = '__dj__minhash'
simhash = '__dj__simhash'
imagehash = '__dj__imagehash'
videohash = '__dj__videohash'
is_duplicate = '__dj__is_duplicate'
class data_juicer.utils.constant.InterVars[source]

Bases: object

lines = '__dj__lines'
words = '__dj__words'
refined_words = '__dj__refined_words'
loaded_images = '__dj__loaded_images'
loaded_audios = '__dj__loaded_audios'
loaded_videos = '__dj__loaded_videos'
sampled_frames = '__dj__sampled_frames'
class data_juicer.utils.constant.JobRequiredKeys(value)[source]

Bases: Enum

An enumeration.

hook = 'hook'
dj_configs = 'dj_configs'
meta_name = 'meta_name'
extra_configs = 'extra_configs'

data_juicer.utils.file_utils module

async data_juicer.utils.file_utils.follow_read(logfile_path: str, skip_existing_content: bool = False) AsyncGenerator[source]

Read a file in an online and iterative manner.

Parameters:
  • logfile_path (str) – The file path to be read.

  • skip_existing_content (bool, defaults to False) – If True, read from the end; otherwise read from the beginning.

Returns:

One line string of the file content.
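Example (a minimal sketch of tailing a log file, assuming the generator keeps yielding new lines as they are appended; the log path is illustrative):

>>> import asyncio
>>> from data_juicer.utils.file_utils import follow_read
>>> async def tail(path):
...     async for line in follow_read(path, skip_existing_content=True):
...         print(line, end='')
>>> asyncio.run(tail('run.log'))  # runs until interrupted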

data_juicer.utils.file_utils.find_files_with_suffix(path: str | Path, suffixes: str | List[str] | None = None) List[str][source]

Traverse a path to find all files with the specified suffixes.

Parameters:
  • path – source path (str/Path)

  • suffixes – specified file suffixes, ‘.txt’ or [‘.txt’, ‘.md’] etc

Returns:

list of all files with the specified suffixes

data_juicer.utils.file_utils.is_absolute_path(path: str | Path) bool[source]

Check whether the input path is an absolute path.

Parameters:

path – input path

Returns:

True if the input path is an absolute path; False if it is a relative path.

data_juicer.utils.file_utils.add_suffix_to_filename(filename, suffix)[source]

Add a suffix to the filename. Only regard the content after the last dot as the file extension. E.g.

  1. abc.jpg + "_resized" –> abc_resized.jpg

  2. edf.xyz.csv + "_processed" –> edf.xyz_processed.csv

  3. /path/to/file.json + "_suf" –> /path/to/file_suf.json

  4. ds.tar.gz + "_whoops" –> ds.tar_whoops.gz (maybe unexpected)

Parameters:
  • filename – input filename

  • suffix – suffix string to be added
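Example (mirrors case 3 above):

>>> from data_juicer.utils.file_utils import add_suffix_to_filename
>>> add_suffix_to_filename('/path/to/file.json', '_suf')
'/path/to/file_suf.json'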

data_juicer.utils.file_utils.create_directory_if_not_exists(directory_path)[source]

Create a directory if it does not exist. This function is process-safe.

Parameters:

directory_path – directory path to be created

data_juicer.utils.file_utils.transfer_filename(original_filepath: str | Path, op_name, **op_kwargs)[source]

According to the op name and a 'hash_val' computed by hashing its parameters 'op_kwargs' together with the process id and the current time, map the original_filepath to another unique file path. E.g.

  1. abc.jpg –>

    __dj__produced_data__/{op_name}/ abc__dj_hash_#{hash_val}#.jpg

  2. ./abc.jpg –>

    ./__dj__produced_data__/{op_name}/ abc__dj_hash_#{hash_val}#.jpg

  3. /path/to/abc.jpg –>

    /path/to/__dj__produced_data__/{op_name}/ abc__dj_hash_#{hash_val}#.jpg

  4. /path/to/__dj__produced_data__/{op_name}/

    abc__dj_hash_#{hash_val1}#.jpg –> /path/to/__dj__produced_data__/{op_name}/ abc__dj_hash_#{hash_val2}#.jpg

data_juicer.utils.file_utils.copy_data(from_dir, to_dir, data_path)[source]

Copy data from from_dir/data_path to to_dir/data_path. Return True if success.

data_juicer.utils.fingerprint_utils module

class data_juicer.utils.fingerprint_utils.Hasher[source]

Bases: object

Hasher that accepts python objects as inputs.

dispatch: Dict = {}
__init__()[source]
classmethod hash_bytes(value: bytes | List[bytes]) str[source]
classmethod hash_default(value: Any) str[source]

Use dill to serialize objects to avoid serialization failures.

classmethod hash(value: Any) str[source]
update(value: Any) None[source]
hexdigest() str[source]
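Example (a minimal sketch of one-shot and incremental hashing):

>>> from data_juicer.utils.fingerprint_utils import Hasher
>>> Hasher.hash({'op': 'some_op', 'lang': 'en'})  # one-shot, dill-serialized
>>> h = Hasher()
>>> h.update('part-1')
>>> h.update('part-2')
>>> h.hexdigest()  # digest over both updates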
data_juicer.utils.fingerprint_utils.update_fingerprint(fingerprint, transform, transform_args)[source]

Combine various objects to update the fingerprint.

data_juicer.utils.fingerprint_utils.generate_fingerprint(ds, *args, **kwargs)[source]

Generate new fingerprints by using various kwargs of the dataset.

data_juicer.utils.lazy_loader module

A LazyLoader class.

class data_juicer.utils.lazy_loader.LazyLoader(local_name, name, auto_install=True)[source]

Bases: ModuleType

Lazily import a module, mainly to avoid pulling in large dependencies. contrib and ffmpeg are examples of modules that are large and not always needed, and this allows them to only be loaded when they are used.

__init__(local_name, name, auto_install=True)[source]
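Example (a minimal sketch; torch stands in for any heavy dependency):

>>> from data_juicer.utils.lazy_loader import LazyLoader
>>> torch = LazyLoader('torch', 'torch')  # nothing is imported yet
>>> torch.__version__  # first attribute access triggers the real import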

data_juicer.utils.logger_utils module

data_juicer.utils.logger_utils.get_caller_name(depth=0)[source]

Get caller name by depth.

Parameters:

depth – depth of the caller context; use 0 for the direct caller.

Returns:

module name of the caller

class data_juicer.utils.logger_utils.StreamToLoguru(level='INFO', caller_names=('datasets', 'logging'))[source]

Bases: object

Stream object that redirects writes to a logger instance.

__init__(level='INFO', caller_names=('datasets', 'logging'))[source]

Initialization method.

Parameters:
  • level – log level string of loguru. Default value: “INFO”.

  • caller_names – caller names of redirected modules. Default value: ('datasets', 'logging').

write(buf)[source]
getvalue()[source]
flush()[source]
data_juicer.utils.logger_utils.redirect_sys_output(log_level='INFO')[source]

Redirect stdout/stderr to loguru with log level.

Parameters:

log_level – log level string of loguru. Default value: “INFO”.

data_juicer.utils.logger_utils.get_log_file_path()[source]

Get the path to the location of the log file.

Returns:

the location of the log file.

data_juicer.utils.logger_utils.setup_logger(save_dir, distributed_rank=0, filename='log.txt', mode='o', level='INFO', redirect=True)[source]

Setup logger for training and testing.

Parameters:
  • save_dir – location to save log file

  • distributed_rank – device rank when multi-gpu environment

  • filename – log file name to save

  • mode – log file write mode, append ("a") or override ("o"). Default is "o".

  • level – log severity level. It's "INFO" by default.

  • redirect – whether to redirect system output

Returns:

logger instance.
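Example (a minimal usage sketch; the save directory is illustrative):

>>> from data_juicer.utils.logger_utils import setup_logger
>>> setup_logger('./logs', distributed_rank=0, filename='log.txt',
...              mode='o', level='INFO', redirect=True)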

class data_juicer.utils.logger_utils.HiddenPrints[source]

Bases: object

Define a context that hides the outputs within its range.

data_juicer.utils.mm_utils module

class data_juicer.utils.mm_utils.SpecialTokens[source]

Bases: object

image = '<__dj__image>'
audio = '<__dj__audio>'
video = '<__dj__video>'
eoc = '<|__dj__eoc|>'
data_juicer.utils.mm_utils.AV_STREAM_THREAD_TYPE = 'AUTO'

The av stream thread type supports "SLICE", "FRAME", and "AUTO".

"SLICE": Decode more than one part of a single frame at once

"FRAME": Decode more than one frame at once

"AUTO": Use both "FRAME" and "SLICE". "AUTO" is faster when there is no video latency.

data_juicer.utils.mm_utils.get_special_tokens()[source]
data_juicer.utils.mm_utils.remove_special_tokens(text)[source]
data_juicer.utils.mm_utils.remove_non_special_tokens(text)[source]
data_juicer.utils.mm_utils.load_data_with_context(sample, context, loaded_data_keys, load_func)[source]

The unified loading function with contexts for multimodal data.

data_juicer.utils.mm_utils.load_images(paths)[source]
data_juicer.utils.mm_utils.load_images_byte(paths)[source]
data_juicer.utils.mm_utils.load_image(path)[source]
data_juicer.utils.mm_utils.load_image_byte(path)[source]
data_juicer.utils.mm_utils.image_path_to_base64(image_path)[source]
data_juicer.utils.mm_utils.image_byte_to_base64(image_byte)[source]
data_juicer.utils.mm_utils.pil_to_opencv(pil_image)[source]
data_juicer.utils.mm_utils.detect_faces(image, detector, **extra_kwargs)[source]
data_juicer.utils.mm_utils.get_file_size(path)[source]
data_juicer.utils.mm_utils.iou(box1, box2)[source]
data_juicer.utils.mm_utils.calculate_resized_dimensions(original_size: Tuple[Annotated[int, Gt(gt=0)], Annotated[int, Gt(gt=0)]], target_size: Annotated[int, Gt(gt=0)] | Tuple[Annotated[int, Gt(gt=0)], Annotated[int, Gt(gt=0)]], max_length: int | None = None, divisible: Annotated[int, Gt(gt=0)] = 1) Tuple[int, int][source]

Resize dimensions based on specified constraints.

Parameters:
  • original_size – The original dimensions as (height, width).

  • target_size – Desired target size; can be a single integer (short edge) or a tuple (height, width).

  • max_length – Maximum allowed length for the longer edge.

  • divisible – The number that the dimensions must be divisible by.

Returns:

Resized dimensions as (height, width).
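Example (a minimal sketch: resize a 720x1280 frame so its short edge is 480, capping the long edge at 1024 and keeping both sides divisible by 16):

>>> from data_juicer.utils.mm_utils import calculate_resized_dimensions
>>> calculate_resized_dimensions((720, 1280), 480, max_length=1024, divisible=16)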

data_juicer.utils.mm_utils.load_audios(paths)[source]
data_juicer.utils.mm_utils.load_audio(path, sampling_rate=None)[source]
data_juicer.utils.mm_utils.load_videos(paths)[source]
data_juicer.utils.mm_utils.load_video(path, mode='r')[source]

Load a video using its path.

Parameters:
  • path – the path to this video.

  • mode – the loading mode. It’s “r” in default.

Returns:

a container object from the PyAv library, which contains all streams in this video (video/audio/…) and can be used to decode these streams to frames.

data_juicer.utils.mm_utils.get_video_duration(input_video: str | InputContainer, video_stream_index: int = 0)[source]

Get the video’s duration from the container

Parameters:
  • input_video – the container object from the PyAv library, which contains all streams in this video (video/audio/…) and can be used to decode these streams to frames.

  • video_stream_index – the video stream index to decode, default set to 0.

Returns:

duration of the video in seconds

data_juicer.utils.mm_utils.get_decoded_frames_from_video(input_video: str | InputContainer, video_stream_index: int = 0)[source]

Get the video’s frames from the container

Parameters:
  • input_video – the container object from the PyAv library, which contains all streams in this video (video/audio/…) and can be used to decode these streams to frames.

  • video_stream_index – the video stream index to decode, default set to 0.

Returns:

an iterator of all the frames of the video

data_juicer.utils.mm_utils.cut_video_by_seconds(input_video: str | InputContainer, output_video: str, start_seconds: float, end_seconds: float | None = None)[source]

Cut out a segment of a video by start/end times in seconds.

Parameters:
  • input_video – the path to input video or the video container.

  • output_video – the path to output video.

  • start_seconds – the start time in second.

  • end_seconds – the end time in second. If it’s None, this function will cut the video from the start_seconds to the end of the video.

Returns:

a boolean flag indicating whether the video was successfully cut or not.
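Example (a minimal usage sketch; the paths are illustrative):

>>> from data_juicer.utils.mm_utils import cut_video_by_seconds
>>> ok = cut_video_by_seconds('input.mp4', 'clip.mp4', 5.0, 12.5)
>>> # ok is True if the 5.0s-12.5s segment was written to clip.mp4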

data_juicer.utils.mm_utils.process_each_frame(input_video: str | InputContainer, output_video: str, frame_func)[source]

Process each frame in the video by replacing each frame with frame_func(frame).

Parameters:
  • input_video – the path to input video or the video container.

  • output_video – the path to output video.

  • frame_func – a function which inputs a frame and outputs another frame.

data_juicer.utils.mm_utils.extract_key_frames_by_seconds(input_video: str | InputContainer, duration: float = 1)[source]

Extract key frames by seconds.

Parameters:
  • input_video – input video path or av.container.InputContainer.

  • duration – duration of each video split in seconds.

data_juicer.utils.mm_utils.extract_key_frames(input_video: str | InputContainer)[source]

Extract key frames from the input video. If there are no keyframes in the video, return the first frame.

Parameters:

input_video – input video path or container.

Returns:

a list of key frames.

data_juicer.utils.mm_utils.get_key_frame_seconds(input_video: str | InputContainer)[source]

Get seconds of key frames in the input video.

data_juicer.utils.mm_utils.extract_video_frames_uniformly_by_seconds(input_video: str | InputContainer, frame_num: Annotated[int, Gt(gt=0)], duration: float = 1)[source]

Extract video frames uniformly by seconds.

Parameters:
  • input_video – input video path or av.container.InputContainer.

  • frame_num – the number of frames to be extracted uniformly from each video split by duration.

  • duration – duration of each video split in seconds.

data_juicer.utils.mm_utils.extract_video_frames_uniformly(input_video: str | InputContainer, frame_num: Annotated[int, Gt(gt=0)])[source]

Extract a number of video frames uniformly within the video duration.

Parameters:
  • input_video – input video path or container.

  • frame_num – The number of frames to be extracted. If it’s 1, only the middle frame will be extracted. If it’s 2, only the first and the last frames will be extracted. If it’s larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration.

Returns:

a list of extracted frames.
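Example (a minimal sketch, assuming the returned frames are PyAV video frames; the path is illustrative):

>>> from data_juicer.utils.mm_utils import (
...     close_video, extract_video_frames_uniformly, load_video)
>>> container = load_video('input.mp4')
>>> frames = extract_video_frames_uniformly(container, frame_num=3)
>>> images = [f.to_image() for f in frames]  # one PIL image per frame
>>> close_video(container)  # release the container to avoid leaks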

data_juicer.utils.mm_utils.extract_audio_from_video(input_video: str | InputContainer, output_audio: str | None = None, start_seconds: int = 0, end_seconds: int | None = None, stream_indexes: int | List[int] | None = None)[source]

Extract audio data for the given video.

Parameters:
  • input_video – input video. Can be a video path or an av.container.InputContainer.

  • output_audio – output audio path. If it’s None, the audio data won’t be written to file. If stream_indexes is not None, it will output multiple audio files with original filename and the stream indexes. Default: None.

  • start_seconds – the start seconds to extract audio data. Default: 0, which means extract from the start of the video.

  • end_seconds – the end seconds to stop extracting audio data. If it’s None, the extraction won’t stop until the end of the video. Default: None.

  • stream_indexes – there might be multiple audio streams in the video, so we need to decide which audio streams with stream_indexes will be extracted. It can be a single index or a list of indexes. If it’s None, all audio streams will be extracted. Default: None.

data_juicer.utils.mm_utils.size_to_bytes(size)[source]
data_juicer.utils.mm_utils.insert_texts_after_placeholders(original_string, placeholders, new_texts, delimiter_in_insert_pos=' ')[source]
data_juicer.utils.mm_utils.timecode_string_to_seconds(timecode: str)[source]

Convert a timecode string to seconds as a float.

Parameters:

timecode – the input timecode string. Must be in "HH:MM:SS.fff(fff)" format.

data_juicer.utils.mm_utils.parse_string_to_roi(roi_string, roi_type='pixel')[source]

Convert a roi string to four numbers x1, y1, x2, y2 that stand for the region. When the roi_type is 'pixel', (x1, y1) and (x2, y2) are the locations of the pixels in the top left corner and the bottom right corner respectively. If the roi_type is 'ratio', the coordinates are normalized by widths and heights.

Parameters:
  • roi_string – the roi string

  • roi_type – the roi string type

Returns:

a tuple of (x1, y1, x2, y2) if roi_string is valid, else None

data_juicer.utils.mm_utils.close_video(container: InputContainer)[source]

Close the video stream and container to avoid memory leak.

Parameters:

container – the video container.

data_juicer.utils.model_utils module

data_juicer.utils.model_utils.check_model(model_name, force=False)[source]

Check whether a model exists in DATA_JUICER_MODELS_CACHE. If it exists, return its full path; otherwise, download it from the cached model links.

Parameters:
  • model_name – a specified model name

  • force – Whether to download the model forcefully or not. Sometimes the model file may be incomplete for some reason, so it needs to be downloaded again forcefully.

class data_juicer.utils.model_utils.APIModel(model, endpoint=None, response_path=None, **kwargs)[source]

Bases: object

__init__(model, endpoint=None, response_path=None, **kwargs)[source]

Initializes an instance of the APIModel class.

Parameters:
  • model – The name of the model to be used for making API calls. This should correspond to a valid model identifier recognized by the API server.

  • endpoint – The URL endpoint for the API. If provided as a relative path, it will be appended to the base URL (defined by the OPENAI_BASE_URL environment variable or through an additional base_url parameter). Defaults to ‘/chat/completions’ for OpenAI compatibility.

  • response_path – A dot-separated string specifying the path to extract the desired content from the API response. The default value is ‘choices.0.message.content’, which corresponds to the typical structure of an OpenAI API response.

  • kwargs – Additional keyword arguments for configuring the internal OpenAI client.

data_juicer.utils.model_utils.prepare_api_model(model, *, endpoint=None, response_path=None, return_processor=False, processor_config=None, **model_params)[source]

Creates a callable API model for interacting with OpenAI-compatible API. The callable supports custom response parsing and works with proxy servers that may be incompatible.

Parameters:
  • model – The name of the model to interact with.

  • endpoint – The URL endpoint for the API. If provided as a relative path, it will be appended to the base URL (defined by the OPENAI_BASE_URL environment variable or through an additional base_url parameter). By default, it is set to ‘/chat/completions’ for OpenAI compatibility.

  • response_path – The dot-separated path to extract desired content from the API response. Defaults to ‘choices.0.message.content’.

  • return_processor – A boolean flag indicating whether to return a processor along with the model. The processor can be used for tasks like tokenization or encoding. Defaults to False.

  • processor_config – A dictionary containing configuration parameters for initializing a Hugging Face processor. It is only relevant if return_processor is set to True.

  • model_params – Additional parameters for configuring the API model.

Returns:

A callable APIModel instance, and optionally a processor if return_processor is True.
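Example (a minimal sketch, assuming OPENAI_BASE_URL and OPENAI_API_KEY are set in the environment and that the callable accepts a list of chat messages; the model name is illustrative):

>>> from data_juicer.utils.model_utils import prepare_api_model
>>> api_model = prepare_api_model('gpt-4o-mini')
>>> reply = api_model([{'role': 'user', 'content': 'Hello!'}])
>>> # reply holds the content extracted via the default response_path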

data_juicer.utils.model_utils.prepare_diffusion_model(pretrained_model_name_or_path, diffusion_type, **model_params)[source]

Prepare and load a Diffusion model from HuggingFace.

Parameters:
  • pretrained_model_name_or_path – input Diffusion model name or local path to the model

  • diffusion_type – the use of the diffusion model. It can be ‘image2image’, ‘text2image’, ‘inpainting’

Returns:

a Diffusion model.

data_juicer.utils.model_utils.prepare_fasttext_model(model_name='lid.176.bin', **model_params)[source]

Prepare and load a fasttext model.

Parameters:

model_name – input model name

Returns:

model instance.

data_juicer.utils.model_utils.prepare_huggingface_model(pretrained_model_name_or_path, *, return_model=True, return_pipe=False, pipe_task='text-generation', **model_params)[source]

Prepare and load a HuggingFace model with the corresponding processor.

Parameters:
  • pretrained_model_name_or_path – model name or path

  • return_model – return model or not

  • return_pipe – whether to wrap model into pipeline

  • model_params – model initialization parameters.

Returns:

a tuple of (model, input processor) if return_model is True; otherwise, only the processor is returned.
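Example (a minimal sketch, assuming a text model whose processor tokenizes plain strings; the model name is illustrative):

>>> from data_juicer.utils.model_utils import prepare_huggingface_model
>>> model, processor = prepare_huggingface_model('bert-base-uncased')
>>> inputs = processor('hello world', return_tensors='pt')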

data_juicer.utils.model_utils.prepare_kenlm_model(lang, name_pattern='{}.arpa.bin', **model_params)[source]

Prepare and load a kenlm model.

Parameters:
  • lang – language to render the model name

  • name_pattern – model name pattern in formatting syntax

Returns:

model instance.

data_juicer.utils.model_utils.prepare_nltk_model(lang, name_pattern='punkt.{}.pickle', **model_params)[source]

Prepare and load a nltk punkt model.

Parameters:
  • lang – language to render the model name

  • name_pattern – model name pattern in formatting syntax

Returns:

model instance.

data_juicer.utils.model_utils.prepare_opencv_classifier(model_path, **model_params)[source]
data_juicer.utils.model_utils.prepare_recognizeAnything_model(pretrained_model_name_or_path='ram_plus_swin_large_14m.pth', input_size=384, **model_params)[source]

Prepare and load a recognizeAnything model.

Parameters:
  • pretrained_model_name_or_path – input model name or path.

  • input_size – the input size of the model.

data_juicer.utils.model_utils.prepare_sentencepiece_model(model_path, **model_params)[source]

Prepare and load a sentencepiece model.

Parameters:

model_path – input model path

Returns:

model instance

data_juicer.utils.model_utils.prepare_sentencepiece_for_lang(lang, name_pattern='{}.sp.model', **model_params)[source]

Prepare and load a sentencepiece model for a specific language.

Parameters:
  • lang – language to render model name

  • name_pattern – pattern to render the model name

Returns:

model instance.

data_juicer.utils.model_utils.prepare_simple_aesthetics_model(pretrained_model_name_or_path, *, return_model=True, **model_params)[source]

Prepare and load a simple aesthetics model.

Parameters:
  • pretrained_model_name_or_path – model name or path

  • return_model – return model or not

Returns:

a tuple (model, input processor) if return_model is True; otherwise, only the processor is returned.

data_juicer.utils.model_utils.prepare_spacy_model(lang, name_pattern='{}_core_web_md-3.7.0', **model_params)[source]

Prepare a spacy model for a specific language.

Parameters:

lang – language of the spacy model. Should be one of ["zh", "en"]

Returns:

corresponding spacy model

data_juicer.utils.model_utils.prepare_video_blip_model(pretrained_model_name_or_path, *, return_model=True, **model_params)[source]

Prepare and load a video-clip model with the corresponding processor.

Parameters:
  • pretrained_model_name_or_path – model name or path

  • return_model – return model or not

  • trust_remote_code – passed to transformers

Returns:

a tuple (model, input processor) if return_model is True; otherwise, only the processor is returned.

data_juicer.utils.model_utils.prepare_vllm_model(pretrained_model_name_or_path, **model_params)[source]

Prepare and load a vllm model with the corresponding tokenizer.

Parameters:
  • pretrained_model_name_or_path – model name or path

  • model_params – LLM initialization parameters.

Returns:

a tuple of (model, tokenizer)

data_juicer.utils.model_utils.prepare_model(model_type, **model_kwargs)[source]
data_juicer.utils.model_utils.get_model(model_key=None, rank=None, use_cuda=False)[source]
data_juicer.utils.model_utils.free_models()[source]

data_juicer.utils.process_utils module

data_juicer.utils.process_utils.setup_mp(method=None)[source]
data_juicer.utils.process_utils.get_min_cuda_memory()[source]
data_juicer.utils.process_utils.calculate_np(name, mem_required, cpu_required, num_proc=None, use_cuda=False)[source]

Calculate the optimum number of processes for the given OP.

data_juicer.utils.registry module

class data_juicer.utils.registry.Registry(name: str)[source]

Bases: object

This class is used to register modules to a registry by repo name.

__init__(name: str)[source]

Initialization method.

Parameters:

name – a registry repo name

property name

Get name of current registry.

Returns:

name of current registry.

property modules

Get all modules in current registry.

Returns:

a dict storing modules in current registry.

list()[source]

Log the list of modules in the current registry.

get(module_key)[source]

Get the module named module_key from the current registry. If not found, return None.

Parameters:

module_key – specified module name

Returns:

module named module_key

register_module(module_name: str | None = None, module_cls: type | None = None, force=False)[source]

Register a module class object to the registry with the specified module name.

Parameters:
  • module_name – module name

  • module_cls – module class object

  • force – Whether to override an existing class with the same name. Default: False.

Example

>>> registry = Registry('formatter')  # a registry needs a repo name
>>> @registry.register_module()
... class TextFormatter:
...     pass
>>> class TextFormatter2:
...     pass
>>> registry.register_module(module_name='text_formatter2',
...                          module_cls=TextFormatter2)

data_juicer.utils.resource_utils module

data_juicer.utils.resource_utils.query_cuda_info(query_key)[source]
data_juicer.utils.resource_utils.get_cpu_count()[source]
data_juicer.utils.resource_utils.get_cpu_utilization()[source]
data_juicer.utils.resource_utils.query_mem_info(query_key)[source]

data_juicer.utils.unittest_utils module

data_juicer.utils.unittest_utils.TEST_TAG(*tags)[source]

Tags for test cases. Currently, standalone and ray are supported.

data_juicer.utils.unittest_utils.set_clear_model_flag(flag)[source]
class data_juicer.utils.unittest_utils.DataJuicerTestCaseBase(methodName='runTest')[source]

Bases: TestCase

classmethod setUpClass()[source]

Hook method for setting up class fixture before running tests in the class.

classmethod tearDownClass(hf_model_name=None) None[source]

Hook method for deconstructing the class fixture after running all tests in the class.

classmethod tearDown() None[source]

Hook method for deconstructing the test fixture after testing it.

generate_dataset(data) DJDataset[source]

Generate dataset for a specific executor.

Parameters:
  • type (str, optional) – "standalone" or "ray". Defaults to "standalone".

run_single_op(dataset: DJDataset, op, column_names)[source]

Run operator in the specific executor.

assertDatasetEqual(first, second)[source]

Module contents