data_juicer.utils package

Submodules

data_juicer.utils.asset_utils module

data_juicer.utils.asset_utils.load_words_asset(words_dir: str, words_type: str)[source]

Load words from an asset file named words_type. If no valid asset file is found, download it from the ASSET_LINKS cached by the data_juicer team.

Parameters:
  • words_dir -- directory that stores asset file(s)

  • words_type -- name of the target words asset

Returns:

a dict that stores the words asset, whose keys are language names and whose values are lists of words
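Example: a minimal usage sketch, assuming a local ./assets directory and the stopwords asset name shipped with data_juicer:

    from data_juicer.utils.asset_utils import load_words_asset

    # Load the asset from ./assets, downloading it on first use.
    words = load_words_asset(words_dir='./assets', words_type='stopwords')
    print(list(words.keys()))  # language names, e.g. 'en'
    print(words['en'][:5])     # the first few English stopwords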

data_juicer.utils.auto_install_mapping module

data_juicer.utils.auto_install_utils module

data_juicer.utils.availability_utils module

data_juicer.utils.cache_utils module

class data_juicer.utils.cache_utils.DatasetCacheControl(on: bool = False)[source]

Bases: object

Define a scope that changes the cache state temporarily.

__init__(on: bool = False)[source]
data_juicer.utils.cache_utils.dataset_cache_control(on)[source]

An easier-to-use decorator for functions that need to control the cache state temporarily.
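Example: a sketch of the two usage styles, assuming DatasetCacheControl implements the context manager protocol as its description suggests:

    from data_juicer.utils.cache_utils import (DatasetCacheControl,
                                               dataset_cache_control)

    # Scoped control: caching is disabled only inside this block.
    with DatasetCacheControl(on=False):
        ...  # dataset operations here run with caching turned off

    # Decorator: the cache state is forced for the whole function.
    @dataset_cache_control(on=False)
    def build_dataset():
        ...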

data_juicer.utils.ckpt_utils module

class data_juicer.utils.ckpt_utils.CheckpointManager(ckpt_dir, original_process_list, num_proc=1)[source]

Bases: object

This class is used to save the latest version of a dataset to a checkpoint directory, or to load it back from there, a bit like cache management. Rerunning the same config will reload the checkpoint and skip the ops that come before it.

If any args of an operator in the process list are changed, all ops will be rerun from the beginning.

__init__(ckpt_dir, original_process_list, num_proc=1)[source]

Initialization method.

Parameters:
  • ckpt_dir -- path to save and load the checkpoint

  • original_process_list -- process list in the config

  • num_proc -- number of process workers when saving the dataset

get_left_process_list()[source]

Get the list of remaining ops for processing the dataset. When a checkpoint is available, some ops are removed from the process list; otherwise, it is kept unchanged.

Returns:

process list of remaining ops

check_ckpt()[source]

Check if a checkpoint is available.

Returns:

True when a checkpoint is available, else False

record(op_cfg: dict)[source]

Save the op name and args to the op record, which is used to compare with the process list from the config to decide whether a checkpoint is available.

check_ops_to_skip()[source]

Check which ops need to be skipped in the process list.

If the op record list from the checkpoint is the same as the prefix of the process list, skip these ops and start processing from the checkpoint. Otherwise, process the original dataset from scratch.

Returns:

whether to skip some ops or not

save_ckpt(ds)[source]

Save the dataset to the checkpoint directory and dump the processed ops list.

Parameters:

ds -- input dataset to save

load_ckpt()[source]

Load a dataset from a checkpoint file.

Returns:

a dataset stored in the checkpoint file.
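Example: a sketch of the resume-from-checkpoint flow described above; the {op_name: args} entries and the paths are illustrative:

    from data_juicer.utils.ckpt_utils import CheckpointManager

    process_list = [
        {'language_id_score_filter': {'lang': 'en', 'min_score': 0.8}},
        {'text_length_filter': {'min_len': 10}},
    ]
    mgr = CheckpointManager('./outputs/ckpt', process_list, num_proc=4)

    if mgr.check_ckpt():
        # A valid checkpoint exists: reload it and run only the leftover ops.
        ds = mgr.load_ckpt()
        ops_to_run = mgr.get_left_process_list()
    else:
        # No usable checkpoint: process the original dataset from scratch.
        ops_to_run = process_list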

data_juicer.utils.common_utils module

data_juicer.utils.common_utils.stats_to_number(s, reverse=True)[source]

Convert a stats value, which can be a string or a list, to a float.

data_juicer.utils.common_utils.dict_to_hash(input_dict: dict, hash_length=None)[source]

Hash a dict to a string of length hash_length.

Parameters:

input_dict -- the given dict

data_juicer.utils.common_utils.nested_access(data, path, digit_allowed=True)[source]

Access nested data using a dot-separated path.

Parameters:
  • data -- A dictionary or a list to access the nested data from.

  • path -- A dot-separated string representing the path to access. This can include numeric indices when accessing list elements.

  • digit_allowed -- Whether to allow converting string keys to numeric indices.

Returns:

The value located at the specified path; raises a KeyError or IndexError if the path does not exist.
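Example: a small illustration of the dot-separated path convention (the sample data is made up):

    from data_juicer.utils.common_utils import nested_access

    sample = {'meta': {'tags': ['news', 'sports']}}

    # Dict keys and list indices are both addressed with dots.
    print(nested_access(sample, 'meta.tags.1'))  # -> 'sports'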

data_juicer.utils.common_utils.is_string_list(var)[source]

Return whether the variable is a list of strings.

Parameters:

var -- input variable

data_juicer.utils.common_utils.avg_split_string_list_under_limit(str_list: list, token_nums: list, max_token_num=None)[source]

Split the string list into several sub-lists, such that the total token num of each sub-list is less than max_token_num, while keeping the total token nums of the sub-lists similar.

Parameters:
  • str_list -- input string list.

  • token_nums -- token num of each string in the list.

  • max_token_num -- max token num of each sub string list.

data_juicer.utils.common_utils.is_float(s)[source]

data_juicer.utils.compress module

class data_juicer.utils.compress.FileLock(lock_file: str | PathLike[str], timeout: float = -1, mode: int = 420, thread_local: bool = True, *, blocking: bool = True, is_singleton: bool = False, **kwargs: Any)[source]

Bases: FileLock

File lock for compression or decompression; the lock file is removed automatically.

class data_juicer.utils.compress.Extractor[source]

Bases: Extractor

Extract content from a compressed file.

classmethod extract(input_path: Path | str, output_path: Path | str, extractor_format: str)[source]

Extract content from a compressed file.

Parameters:
  • input_path -- path to the compressed file.

  • output_path -- path to the uncompressed file.

  • extractor_format -- extraction format; see the supported algorithms in the Extractor of the huggingface datasets library.

class data_juicer.utils.compress.BaseCompressor[source]

Bases: ABC

Base class that compresses a file.

abstract static compress(input_path: Path | str, output_path: Path | str)[source]

Compress the input file and save it to the output file.

Parameters:
  • input_path -- path to the uncompressed file.

  • output_path -- path to the compressed file.

class data_juicer.utils.compress.ZstdCompressor[source]

Bases: BaseCompressor

This class compresses a file using the zstd algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress the input file and save it to the output file.

Parameters:
  • input_path -- path to the uncompressed file.

  • output_path -- path to the compressed file.

class data_juicer.utils.compress.Lz4Compressor[source]

Bases: BaseCompressor

This class compresses a file using the lz4 algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress the input file and save it to the output file.

Parameters:
  • input_path -- path to the uncompressed file.

  • output_path -- path to the compressed file.

class data_juicer.utils.compress.GzipCompressor[source]

Bases: BaseCompressor

This class compresses a file using the gzip algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress the input file and save it to the output file.

Parameters:
  • input_path -- path to the uncompressed file.

  • output_path -- path to the compressed file.

class data_juicer.utils.compress.Compressor[source]

Bases: object

This class contains multiple compressors.

compressors: Dict[str, Type[BaseCompressor]] = {'gzip': <class 'data_juicer.utils.compress.GzipCompressor'>, 'lz4': <class 'data_juicer.utils.compress.Lz4Compressor'>, 'zstd': <class 'data_juicer.utils.compress.ZstdCompressor'>}
classmethod compress(input_path: Path | str, output_path: Path | str, compressor_format: str)[source]

Compress the input file and save it to the output file.

Parameters:
  • input_path -- path to the uncompressed file.

  • output_path -- path to the compressed file.

  • compressor_format -- compression format; see the supported algorithms in compressors.

class data_juicer.utils.compress.CompressManager(compressor_format: str = 'zstd')[source]

Bases: object

This class is used to compress or decompress an input file using the configured compression format algorithm.

__init__(compressor_format: str = 'zstd')[source]

Initialization method.

Parameters:

compressor_format -- the compression format algorithm; defaults to zstd.

compress(input_path: Path | str, output_path: Path | str)[source]

Compress the input file and save it to the output file.

Parameters:
  • input_path -- path to the uncompressed file.

  • output_path -- path to the compressed file.

decompress(input_path: Path | str, output_path: Path | str)[source]

Decompress the input file and save it to the output file.

Parameters:
  • input_path -- path to the compressed file.

  • output_path -- path to the uncompressed file.
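Example: a minimal round-trip sketch (the file names are illustrative):

    from data_juicer.utils.compress import CompressManager

    manager = CompressManager(compressor_format='zstd')

    # Compress a cache file, then restore it later.
    manager.compress('cache-train.arrow', 'cache-train.arrow.zst')
    manager.decompress('cache-train.arrow.zst', 'cache-train.arrow')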

class data_juicer.utils.compress.CacheCompressManager(compressor_format: str = 'zstd')[source]

Bases: object

This class is used to compress or decompress huggingface cache files using the configured compression format algorithm.

__init__(compressor_format: str = 'zstd')[source]

Initialization method.

Parameters:

compressor_format -- the compression format algorithm; defaults to zstd.

compress(prev_ds: Dataset, this_ds: Dataset | None = None, num_proc: int = 1)[source]

Compress the cache files with fingerprints in the dataset cache directory.

Parameters:
  • prev_ds -- previous dataset whose cache files need to be compressed here.

  • this_ds -- current dataset that is computed from the previous dataset. There might be overlaps between their cache files, so we must not compress cache files that will be used again by the current dataset. If it's None, all cache files of the previous dataset will be compressed.

  • num_proc -- number of processes to compress cache files.

decompress(ds: Dataset, fingerprints: str | List[str] | None = None, num_proc: int = 1)[source]

Decompress the compressed cache files with fingerprints in the dataset cache directory.

Parameters:
  • ds -- input dataset.

  • fingerprints -- fingerprints of cache files. A string or a list is accepted. If None, all cache files which start with cache- and end with the compression format suffix will be found.

  • num_proc -- number of processes to decompress cache files.

format_cache_file_name(cache_file_name: str | None) str | None[source]

Use * to replace the sub rank in a cache file name.

Parameters:

cache_file_name -- a cache file name.

cleanup_cache_files(ds)[source]

Clean up all compressed cache files in the dataset cache directory, which start with cache- and end with the compression format suffix.

Parameters:

ds -- input dataset.

class data_juicer.utils.compress.CompressionOff[source]

Bases: object

Define a scope that turns off cache compression temporarily.

data_juicer.utils.compress.compress(prev_ds, this_ds=None, num_proc=1)[source]
data_juicer.utils.compress.decompress(ds, fingerprints=None, num_proc=1)[source]
data_juicer.utils.compress.cleanup_compressed_cache_files(ds)[source]

data_juicer.utils.constant module

class data_juicer.utils.constant.Fields[source]

Bases: object

stats = '__dj__stats__'
meta = '__dj__meta__'
batch_meta = '__dj__batch_meta__'
context = '__dj__context__'
suffix = '__dj__suffix__'
source_file = '__dj__source_file__'
multimodal_data_output_dir = '__dj__produced_data__'
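Example: a short sketch of how these field constants are used to read per-sample information (the sample dict is made up; 'text_len' matches StatsKeysConstant.text_len below):

    from data_juicer.utils.constant import Fields

    # A processed sample carries its computed stats under Fields.stats.
    sample = {'text': 'Hello world.', Fields.stats: {'text_len': 12}}
    print(sample[Fields.stats]['text_len'])  # -> 12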
class data_juicer.utils.constant.BatchMetaKeys[source]

Bases: object

entity_attribute = 'entity_attribute'
most_relevant_entities = 'most_relevant_entities'
class data_juicer.utils.constant.MetaKeys[source]

Bases: object

dialog_sentiment_intensity = 'dialog_sentiment_intensity'
dialog_sentiment_intensity_analysis = 'dialog_sentiment_intensity_analysis'
query_sentiment_label = 'query_sentiment_label'
query_sentiment_score = 'query_sentiment_label_score'
dialog_sentiment_labels = 'dialog_sentiment_labels'
dialog_sentiment_labels_analysis = 'dialog_sentiment_labels_analysis'
dialog_intent_labels = 'dialog_intent_labels'
dialog_intent_labels_analysis = 'dialog_intent_labels_analysis'
query_intent_label = 'query_intent_label'
query_intent_score = 'query_intent_label_score'
dialog_topic_labels = 'dialog_topic_labels'
dialog_topic_labels_analysis = 'dialog_topic_labels_analysis'
query_topic_label = 'query_topic_label'
query_topic_score = 'query_topic_label_score'
video_frame_tags = 'video_frame_tags'
video_audio_tags = 'video_audio_tags'
video_frames = 'video_frames'
image_tags = 'image_tags'
bbox_tag = '__dj__bbox__'
event_description = 'event_description'
relevant_characters = 'relevant_characters'
main_entities = 'main_entities'
attributes = 'attributes'
attribute_descriptions = 'attribute_descriptions'
attribute_support_texts = 'attribute_support_texts'
nickname = 'nickname'
entity = 'entity'
entity_name = 'entity_name'
entity_type = 'entity_type'
entity_description = 'entity_entity_description'
relation = 'relation'
source_entity = 'relation_source_entity'
target_entity = 'relation_target_entity'
relation_description = 'relation_description'
relation_keywords = 'relation_keywords'
relation_strength = 'relation_strength'
keyword = 'keyword'
support_text = 'support_text'
role_relation = 'role_relation'
html_tables = 'html_tables'
class data_juicer.utils.constant.StatsKeysMeta[source]

Bases: type

A helper class to track the mapping from an OP's name to its used stats_keys.

E.g., once the AlphanumericFilter's compute_stats method has been called:

    res = TrackingDescriptor.get_access_log()
    print(res)  # {"AlphanumericFilter": ["alnum_ratio", "alpha_token_ratio"]}

get_access_log(dj_cfg=None, dataset=None)[source]
class data_juicer.utils.constant.StatsKeysConstant[source]

Bases: object

alpha_token_ratio = 'alpha_token_ratio'
alnum_ratio = 'alnum_ratio'
avg_line_length = 'avg_line_length'
char_rep_ratio = 'char_rep_ratio'
flagged_words_ratio = 'flagged_words_ratio'
lang = 'lang'
lang_score = 'lang_score'
max_line_length = 'max_line_length'
perplexity = 'perplexity'
special_char_ratio = 'special_char_ratio'
stopwords_ratio = 'stopwords_ratio'
text_len = 'text_len'
text_pair_similarity = 'text_pair_similarity'
num_action = 'num_action'
num_dependency_edges = 'num_dependency_edges'
num_token = 'num_token'
num_words = 'num_words'
word_rep_ratio = 'word_rep_ratio'
llm_quality_score = 'llm_quality_score'
llm_quality_record = 'llm_quality_record'
llm_difficulty_score = 'llm_difficulty_score'
llm_difficulty_record = 'llm_difficulty_record'
aspect_ratios = 'aspect_ratios'
image_width = 'image_width'
image_height = 'image_height'
image_sizes = 'image_sizes'
face_ratios = 'face_ratios'
face_detections = 'face_detections'
face_counts = 'face_counts'
image_aesthetics_scores = 'image_aesthetics_scores'
image_nsfw_score = 'image_nsfw_score'
image_watermark_prob = 'image_watermark_prob'
image_pair_similarity = 'image_pair_similarity'
audio_duration = 'audio_duration'
audio_nmf_snr = 'audio_nmf_snr'
audio_sizes = 'audio_sizes'
video_duration = 'video_duration'
video_aspect_ratios = 'video_aspect_ratios'
video_width = 'video_width'
video_height = 'video_height'
video_ocr_area_ratio = 'video_ocr_area_ratio'
video_aesthetic_score = 'video_aesthetic_score'
video_frames_aesthetics_score = 'video_frames_aesthetics_score'
video_motion_score = 'video_motion_score'
video_nsfw_score = 'video_nsfw_score'
video_watermark_prob = 'video_watermark_prob'
image_text_similarity = 'image_text_similarity'
image_text_matching_score = 'image_text_matching_score'
phrase_grounding_recall = 'phrase_grounding_recall'
video_frames_text_similarity = 'video_frames_text_similarity'
general_field_filter_condition = 'general_field_filter_condition'
class data_juicer.utils.constant.StatsKeys[source]

Bases: object

class data_juicer.utils.constant.HashKeys[source]

Bases: object

uid = '__dj__uid'
hash = '__dj__hash'
minhash = '__dj__minhash'
simhash = '__dj__simhash'
imagehash = '__dj__imagehash'
videohash = '__dj__videohash'
is_unique = '__dj__is_unique'
class data_juicer.utils.constant.InterVars[source]

Bases: object

lines = '__dj__lines'
words = '__dj__words'
refined_words = '__dj__refined_words'
loaded_images = '__dj__loaded_images'
loaded_audios = '__dj__loaded_audios'
loaded_videos = '__dj__loaded_videos'
sampled_frames = '__dj__sampled_frames'
class data_juicer.utils.constant.JobRequiredKeys(value)[source]

Bases: Enum

An enumeration.

hook = 'hook'
dj_configs = 'dj_configs'
meta_name = 'meta_name'
extra_configs = 'extra_configs'

data_juicer.utils.file_utils module

async data_juicer.utils.file_utils.follow_read(logfile_path: str, skip_existing_content: bool = False) AsyncGenerator[source]

Read a file in an online, iterative manner.

Parameters:
  • logfile_path (str) -- The file path to be read.

  • skip_existing_content (bool, defaults to False) -- If True, read from the end; otherwise read from the beginning.

Returns:

One line string of the file content.
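Example: a sketch of tailing a log file with this async generator (the path is illustrative):

    import asyncio

    from data_juicer.utils.file_utils import follow_read

    async def tail_log():
        # Only yield lines appended after we start watching.
        async for line in follow_read('./outputs/log.txt',
                                      skip_existing_content=True):
            print(line, end='')

    asyncio.run(tail_log())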

data_juicer.utils.file_utils.find_files_with_suffix(path: str | Path, suffixes: str | List[str] | None = None) Dict[str, List[str]][source]

Traverse a path to find all files with the specified suffixes.

Parameters:
  • path -- source path (str/Path)

  • suffixes -- specified file suffixes, '.txt' or ['.txt', '.md'] etc.

Returns:

a dict of all files with the specified suffixes, grouped by suffix

data_juicer.utils.file_utils.is_absolute_path(path: str | Path) bool[source]

Check whether the input path is an absolute path.

Parameters:

path -- input path

Returns:

True if the input path is an absolute path, False if it is a relative path.

data_juicer.utils.file_utils.add_suffix_to_filename(filename, suffix)[source]

Add a suffix to the filename. Only the content after the last dot is regarded as the file extension. E.g.

  1. abc.jpg + "_resized" --> abc_resized.jpg

  2. edf.xyz.csv + "_processed" --> edf.xyz_processed.csv

  3. /path/to/file.json + "_suf" --> /path/to/file_suf.json

  4. ds.tar.gz + "_whoops" --> ds.tar_whoops.gz (maybe unexpected)

Parameters:
  • filename -- input filename

  • suffix -- suffix string to be added

data_juicer.utils.file_utils.create_directory_if_not_exists(directory_path)[source]

Create a directory if it does not exist; this function is process-safe.

Parameters:

directory_path -- directory path to be created

data_juicer.utils.file_utils.transfer_data_dir(original_dir, op_name)[source]

Transfer the original multimodal data dir to a new dir that stores the newly generated multimodal data. The pattern is {original_dir}/__dj__produced_data__/{op_name}

data_juicer.utils.file_utils.transfer_filename(original_filepath: str | Path, op_name, **op_kwargs)[source]

Map the original_filepath to another unique file path, according to the op and a 'hash_val' computed by hashing its parameters 'op_kwargs' together with the process id and the current time. E.g.

  1. abc.jpg --> __dj__produced_data__/{op_name}/abc__dj_hash_#{hash_val}#.jpg

  2. ./abc.jpg --> ./__dj__produced_data__/{op_name}/abc__dj_hash_#{hash_val}#.jpg

  3. /path/to/abc.jpg --> /path/to/__dj__produced_data__/{op_name}/abc__dj_hash_#{hash_val}#.jpg

  4. /path/to/__dj__produced_data__/{op_name}/abc__dj_hash_#{hash_val1}#.jpg --> /path/to/__dj__produced_data__/{op_name}/abc__dj_hash_#{hash_val2}#.jpg

data_juicer.utils.file_utils.copy_data(from_dir, to_dir, data_path)[source]

Copy data from from_dir/data_path to to_dir/data_path. Returns True on success.

data_juicer.utils.file_utils.expand_outdir_and_mkdir(outdir)[source]
data_juicer.utils.file_utils.single_partition_write_with_filename(df: DataFrame, output_file_dir: str, keep_filename_column: bool = False, output_type: str = 'jsonl') Series[source]

This function processes a DataFrame and writes it to disk.

Parameters:
  • df -- A DataFrame.

  • output_file_dir -- The output file path.

  • keep_filename_column -- Whether to keep or drop the "filename" column, if it exists.

  • output_type -- The type of output file to write. Defaults to "jsonl".

Returns:

If the DataFrame is non-empty, return a Series containing a single element, True. If the DataFrame is empty, return a Series containing a single element, False.

data_juicer.utils.file_utils.read_single_partition(files, filetype='jsonl', add_filename=False, input_meta: str | dict | None = None, columns: List[str] | None = None, **kwargs) DataFrame[source]

This function reads a file with cuDF, sorts the columns of the DataFrame, and adds a "filename" column.

Parameters:
  • files -- The path to the jsonl files to read.

  • add_filename -- Whether to add a "filename" column to the DataFrame.

  • input_meta -- A dictionary, or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file.

  • columns -- If not None, only these columns will be read from the file. There is a significant performance gain when specifying columns for Parquet files.

Returns:

A pandas DataFrame.

data_juicer.utils.file_utils.get_all_files_paths_under(root, recurse_subdirectories=True, followlinks=False)[source]

This function returns a list of all the files under a specified directory. Please note that this can be slow for a large number of files.

Parameters:
  • root -- The path to the directory to read.

  • recurse_subdirectories -- Whether to recurse into subdirectories.

  • followlinks -- Whether to follow symbolic links.

data_juicer.utils.fingerprint_utils module

class data_juicer.utils.fingerprint_utils.Hasher[source]

Bases: object

Hasher that accepts python objects as inputs.

dispatch: Dict = {}
__init__()[source]
classmethod hash_bytes(value: bytes | List[bytes]) str[source]
classmethod hash_default(value: Any) str[source]

Use dill to serialize objects to avoid serialization failures.

classmethod hash(value: Any) str[source]
update(value: Any) None[source]
hexdigest() str[source]
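Example: a minimal sketch of hashing an arbitrary Python object:

    from data_juicer.utils.fingerprint_utils import Hasher

    # Deterministic hash string for any dill-serializable object.
    print(Hasher.hash({'op': 'text_length_filter', 'min_len': 10}))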
data_juicer.utils.fingerprint_utils.update_fingerprint(fingerprint, transform, transform_args)[source]

Combine various objects to update the fingerprint.

data_juicer.utils.fingerprint_utils.generate_fingerprint(ds, *args, **kwargs)[source]

Generate a new fingerprint using the dataset and the various args and kwargs.

data_juicer.utils.lazy_loader module

A LazyLoader class for on-demand module loading with uv integration.

data_juicer.utils.lazy_loader.get_toml_file_path()[source]

Get the path to the pyproject.toml file.

data_juicer.utils.lazy_loader.get_uv_lock_path()[source]

Get the path to the uv.lock file.

class data_juicer.utils.lazy_loader.LazyLoader(module_name: str, package_name: str | None = None, package_url: str | None = None, auto_install: bool = True)[source]

Bases: ModuleType

Lazily import a module, mainly to avoid pulling in large dependencies. Uses uv for fast dependency installation when available.

classmethod get_package_name(module_name: str) str[source]

Convert a module name to its corresponding package name.

Parameters:

module_name -- The name of the module (e.g., 'cv2', 'PIL')

Returns:

The corresponding package name (e.g., 'opencv-python', 'Pillow')

Return type:

str

classmethod reset_dependencies_cache()[source]

Reset the dependencies cache.

classmethod get_all_dependencies()[source]

Get all dependencies, prioritizing uv.lock if available. Falls back to pyproject.toml if uv.lock is not found or fails to parse.

Returns:

A dictionary mapping module names to their full package specifications,

e.g. {'numpy': 'numpy>=1.26.4,<2.0.0', 'pandas': 'pandas>=2.0.0'}

Return type:

dict

classmethod check_packages(package_specs, pip_args=None)[source]

Check if packages are installed and install them if needed.

Parameters:
  • package_specs -- A list of package specifications to check/install. Can be package names or URLs (e.g., 'torch' or 'git+https://github.com/...')

  • pip_args -- Optional list of additional arguments to pass to the pip install command (e.g., ['--no-deps', '--upgrade'])

__init__(module_name: str, package_name: str | None = None, package_url: str | None = None, auto_install: bool = True)[source]

Initialize the LazyLoader.

Parameters:
  • module_name -- The name of the module to import (e.g., 'cv2', 'ray.data', 'torchvision.models')

  • package_name -- The name of the pip package to install (e.g., 'opencv-python', 'ray', 'torchvision'). If None, the base module name will be used (e.g., 'ray' for 'ray.data')

  • package_url -- The URL to install the package from (e.g., git+https://github.com/...)

  • auto_install -- Whether to automatically install missing dependencies
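Example: a minimal sketch of the lazy-import pattern, reusing the 'cv2' / 'opencv-python' mapping from the docstrings above:

    from data_juicer.utils.lazy_loader import LazyLoader

    # Nothing is imported (or installed) at this point.
    cv2 = LazyLoader('cv2', package_name='opencv-python')

    # The real import -- and, if auto_install is enabled, the
    # installation -- happens on first attribute access.
    image = cv2.imread('example.jpg')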

data_juicer.utils.logger_utils module

data_juicer.utils.logger_utils.get_caller_name(depth=0)[source]

Get the caller name by depth.

Parameters:

depth -- depth of the caller context; use 0 for the caller depth.

Returns:

module name of the caller

class data_juicer.utils.logger_utils.StreamToLoguru(level='INFO', caller_names=('datasets', 'logging'))[source]

Bases: object

Stream object that redirects writes to a logger instance.

__init__(level='INFO', caller_names=('datasets', 'logging'))[source]

Initialization method.

Parameters:
  • level -- log level string of loguru. Default value: "INFO".

  • caller_names -- caller names of the redirected modules. Default value: ('datasets', 'logging').

fileno()[source]
write(buf)[source]
getvalue()[source]
flush()[source]
isatty()[source]
data_juicer.utils.logger_utils.redirect_sys_output(log_level='INFO')[source]

Redirect stdout/stderr to loguru with the given log level.

Parameters:

log_level -- log level string of loguru. Default value: "INFO".

data_juicer.utils.logger_utils.get_log_file_path()[source]

Get the path to the location of the log file.

Returns:

the location of the log file.

data_juicer.utils.logger_utils.setup_logger(save_dir, distributed_rank=0, filename='log.txt', mode='o', level='INFO', redirect=True)[source]

Set up the logger for training and testing.

Parameters:
  • save_dir -- location to save the log file

  • distributed_rank -- device rank in a multi-gpu environment

  • filename -- log file name to save

  • mode -- log file write mode, append ("a") or override ("o"). Defaults to "o".

  • level -- log severity level. "INFO" by default.

  • redirect -- whether to redirect system output

Returns:

logger instance.
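Example: a usage sketch (the directory is illustrative):

    from loguru import logger

    from data_juicer.utils.logger_utils import setup_logger

    # Write logs to ./outputs/log.txt and redirect stdout/stderr to loguru.
    setup_logger(save_dir='./outputs', filename='log.txt', level='INFO')
    logger.info('logger is ready')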

data_juicer.utils.logger_utils.make_log_summarization(max_show_item=10)[source]
class data_juicer.utils.logger_utils.HiddenPrints[source]

Bases: object

Define a scope that hides the outputs produced within it.

data_juicer.utils.mm_utils module

class data_juicer.utils.mm_utils.SpecialTokens[source]

Bases: object

image = '<__dj__image>'
audio = '<__dj__audio>'
video = '<__dj__video>'
eoc = '<|__dj__eoc|>'
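Example: a small sketch of how the special tokens appear in multimodal text (the sample text is made up):

    from data_juicer.utils.mm_utils import SpecialTokens, remove_special_tokens

    text = f'{SpecialTokens.image} a photo of a cat {SpecialTokens.eoc}'
    print(remove_special_tokens(text))  # the text with special tokens stripped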
data_juicer.utils.mm_utils.AV_STREAM_THREAD_TYPE = 'AUTO'

The av stream thread type. Supported values are "SLICE", "FRAME", and "AUTO".

"SLICE": decode more than one part of a single frame at once

"FRAME": decode more than one frame at once

"AUTO": use both "FRAME" and "SLICE". AUTO is faster when there is no video latency.

data_juicer.utils.mm_utils.get_special_tokens()[source]
data_juicer.utils.mm_utils.remove_special_tokens(text)[source]
data_juicer.utils.mm_utils.remove_non_special_tokens(text)[source]
data_juicer.utils.mm_utils.load_data_with_context(sample, context, loaded_data_keys, load_func)[source]

The unified loading function with contexts for multimodal data.

data_juicer.utils.mm_utils.load_images(paths)[source]
data_juicer.utils.mm_utils.load_images_byte(paths)[source]
data_juicer.utils.mm_utils.load_image(path)[source]
data_juicer.utils.mm_utils.load_image_byte(path)[source]
data_juicer.utils.mm_utils.image_path_to_base64(image_path)[source]
data_juicer.utils.mm_utils.image_byte_to_base64(image_byte)[source]
data_juicer.utils.mm_utils.pil_to_opencv(pil_image)[source]
data_juicer.utils.mm_utils.detect_faces(image, detector, **extra_kwargs)[source]
data_juicer.utils.mm_utils.get_file_size(path)[source]
data_juicer.utils.mm_utils.iou(box1, box2)[source]
data_juicer.utils.mm_utils.calculate_resized_dimensions(original_size: Tuple[Annotated[int, Gt(gt=0)], Annotated[int, Gt(gt=0)]], target_size: Annotated[int, Gt(gt=0)] | Tuple[Annotated[int, Gt(gt=0)], Annotated[int, Gt(gt=0)]], max_length: int | None = None, divisible: Annotated[int, Gt(gt=0)] = 1) Tuple[int, int][source]

Resize dimensions based on specified constraints.

Parameters:
  • original_size -- The original dimensions as (height, width).

  • target_size -- Desired target size; can be a single integer (short edge) or a tuple (height, width).

  • max_length -- Maximum allowed length for the longer edge.

  • divisible -- The number that the dimensions must be divisible by.

Returns:

Resized dimensions as (height, width).
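Example: a quick numeric illustration of the constraints:

    from data_juicer.utils.mm_utils import calculate_resized_dimensions

    # Resize a 720x1280 (height, width) frame so its short edge becomes 448,
    # with both dimensions rounded to multiples of 32.
    print(calculate_resized_dimensions((720, 1280), 448, divisible=32))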

data_juicer.utils.mm_utils.load_audios(paths)[source]
data_juicer.utils.mm_utils.load_audio(path, sampling_rate=None)[source]
data_juicer.utils.mm_utils.load_videos(paths)[source]
data_juicer.utils.mm_utils.load_video(path, mode='r')[source]

Load a video using its path.

Parameters:
  • path -- the path to this video.

  • mode -- the loading mode. It's "r" by default.

Returns:

a container object from the PyAV library, which contains all streams in this video (video/audio/...) and can be used to decode these streams to frames.

data_juicer.utils.mm_utils.get_video_duration(input_video: str | InputContainer, video_stream_index: int = 0)[source]

Get the video's duration from the container.

Parameters:
  • input_video -- the container object from the PyAV library, which contains all streams in this video (video/audio/...) and can be used to decode these streams to frames.

  • video_stream_index -- the video stream index to decode; defaults to 0.

Returns:

duration of the video in seconds

data_juicer.utils.mm_utils.get_decoded_frames_from_video(input_video: str | InputContainer, video_stream_index: int = 0)[source]

Get the video's frames from the container.

Parameters:
  • input_video -- the container object from the PyAV library, which contains all streams in this video (video/audio/...) and can be used to decode these streams to frames.

  • video_stream_index -- the video stream index to decode; defaults to 0.

Returns:

an iterator over all the frames of the video

data_juicer.utils.mm_utils.cut_video_by_seconds(input_video: str | InputContainer, output_video: str, start_seconds: float, end_seconds: float | None = None)[source]

Cut a segment out of a video by times in seconds.

Parameters:
  • input_video -- the path to the input video or the video container.

  • output_video -- the path to the output video.

  • start_seconds -- the start time in seconds.

  • end_seconds -- the end time in seconds. If it's None, this function will cut the video from start_seconds to the end of the video.

Returns:

a boolean flag indicating whether the video was successfully cut or not.

data_juicer.utils.mm_utils.process_each_frame(input_video: str | InputContainer, output_video: str, frame_func)[source]

Process each frame in the video by replacing it with frame_func(frame).

Parameters:
  • input_video -- the path to the input video or the video container.

  • output_video -- the path to the output video.

  • frame_func -- a function that takes a frame as input and outputs another frame.

data_juicer.utils.mm_utils.extract_key_frames_by_seconds(input_video: str | InputContainer, duration: float = 1)[source]

Extract key frames by seconds.

Parameters:
  • input_video -- input video path or av.container.InputContainer.

  • duration -- duration of each video split in seconds.

data_juicer.utils.mm_utils.extract_key_frames(input_video: str | InputContainer)[source]

Extract key frames from the input video. If there are no key frames in the video, return the first frame.

Parameters:

input_video -- input video path or container.

Returns:

a list of key frames.

data_juicer.utils.mm_utils.get_key_frame_seconds(input_video: str | InputContainer)[source]

Get the seconds of the key frames in the input video.

data_juicer.utils.mm_utils.extract_video_frames_uniformly_by_seconds(input_video: str | InputContainer, frame_num: Annotated[int, Gt(gt=0)], duration: float = 1)[source]

Extract video frames uniformly by seconds.

Parameters:
  • input_video -- input video path or av.container.InputContainer.

  • frame_num -- the number of frames to be extracted uniformly from each video split by duration.

  • duration -- duration of each video split in seconds.

data_juicer.utils.mm_utils.extract_video_frames_uniformly(input_video: str | InputContainer, frame_num: Annotated[int, Gt(gt=0)])[source]

Extract a number of video frames uniformly within the video duration.

Parameters:
  • input_video -- input video path or container.

  • frame_num -- The number of frames to be extracted. If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, the other frames will be extracted uniformly within the video duration.

Returns:

a list of extracted frames.
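Example: a sketch tying the loading, extraction, and cleanup helpers together (the video path is illustrative):

    from data_juicer.utils.mm_utils import (close_video,
                                            extract_video_frames_uniformly,
                                            load_video)

    container = load_video('example.mp4')
    try:
        # First, middle, and last frames, following the rule described above.
        frames = extract_video_frames_uniformly(container, frame_num=3)
        print(len(frames))
    finally:
        # Always release the container to avoid memory leaks.
        close_video(container)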

data_juicer.utils.mm_utils.extract_audio_from_video(input_video: str | InputContainer, output_audio: str | None = None, start_seconds: int = 0, end_seconds: int | None = None, stream_indexes: int | List[int] | None = None)[source]

Extract audio data for the given video.

Parameters:
  • input_video -- input video. Can be a video path or an av.container.InputContainer.

  • output_audio -- output audio path. If it's None, the audio data won't be written to a file. If stream_indexes is not None, multiple audio files will be output, named with the original filename and the stream indexes. Default: None.

  • start_seconds -- the start seconds to extract audio data. Default: 0, which means extract from the start of the video.

  • end_seconds -- the end seconds to stop extracting audio data. If it's None, the extraction won't stop until the end of the video. Default: None.

  • stream_indexes -- there might be multiple audio streams in the video, so we need to decide which audio streams (given by stream_indexes) will be extracted. It can be a single index or a list of indexes. If it's None, all audio streams will be extracted. Default: None.
data_juicer.utils.mm_utils.size_to_bytes(size)[source]
data_juicer.utils.mm_utils.insert_texts_after_placeholders(original_string, placeholders, new_texts, delimiter_in_insert_pos=' ')[source]
data_juicer.utils.mm_utils.timecode_string_to_seconds(timecode: str)[source]

Convert a timecode string to float seconds.

Parameters:

timecode -- the input timecode string. Must be in "HH:MM:SS.fff(fff)" format.

data_juicer.utils.mm_utils.parse_string_to_roi(roi_string, roi_type='pixel')[source]

Convert a roi string to four numbers x1, y1, x2, y2 that stand for the region. When the type is 'pixel', (x1, y1) and (x2, y2) are the locations of the pixels in the top-left and bottom-right corners respectively. If the roi_type is 'ratio', the coordinates are normalized by the widths and heights.

Parameters:
  • roi_string -- the roi string

  • roi_type -- the roi string type

Returns:

a tuple of (x1, y1, x2, y2) if roi_string is valid, else None

data_juicer.utils.mm_utils.close_video(container: InputContainer)[source]

Close the video stream and container to avoid memory leaks.

Parameters:

container -- the video container.

data_juicer.utils.model_utils module

data_juicer.utils.model_utils.check_model(model_name, force=False)[source]

Check whether a model exists in DATA_JUICER_MODELS_CACHE. If it exists, return its full path. Otherwise, download it from the cached model links.

Parameters:
  • model_name -- a specified model name

  • force -- whether to download the model forcefully. Sometimes the model file may be incomplete for some reason, so it needs to be downloaded again forcefully.

class data_juicer.utils.model_utils.APIModel(model, endpoint=None, response_path=None, **kwargs)[source]

Bases: object

__init__(model, endpoint=None, response_path=None, **kwargs)[source]

Initializes an instance of the APIModel class.

Parameters:
  • model -- The name of the model to be used for making API calls. This should correspond to a valid model identifier recognized by the API server.

  • endpoint -- The URL endpoint for the API. If provided as a relative path, it will be appended to the base URL (defined by the OPENAI_BASE_URL environment variable or through an additional base_url parameter). Defaults to '/chat/completions' for OpenAI compatibility.

  • response_path -- A dot-separated string specifying the path to extract the desired content from the API response. The default value is 'choices.0.message.content', which corresponds to the typical structure of an OpenAI API response.

  • kwargs -- Additional keyword arguments for configuring the internal OpenAI client.

data_juicer.utils.model_utils.prepare_api_model(model, *, endpoint=None, response_path=None, return_processor=False, processor_config=None, **model_params)[source]

Creates a callable API model for interacting with an OpenAI-compatible API. The callable supports custom response parsing and works with proxy servers that may be incompatible.

Parameters:
  • model -- The name of the model to interact with.

  • endpoint -- The URL endpoint for the API. If provided as a relative path, it will be appended to the base URL (defined by the OPENAI_BASE_URL environment variable or through an additional base_url parameter). By default, it is set to '/chat/completions' for OpenAI compatibility.

  • response_path -- The dot-separated path to extract the desired content from the API response. Defaults to 'choices.0.message.content'.

  • return_processor -- A boolean flag indicating whether to return a processor along with the model. The processor can be used for tasks like tokenization or encoding. Defaults to False.

  • processor_config -- A dictionary containing configuration parameters for initializing a Hugging Face processor. It is only relevant if return_processor is set to True.

  • model_params -- Additional parameters for configuring the API model.

Returns:

A callable APIModel instance, and optionally a processor if return_processor is True.
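Example: a sketch assuming OPENAI_BASE_URL / OPENAI_API_KEY are set in the environment and that the returned callable accepts an OpenAI-style messages list (the model name is illustrative):

    from data_juicer.utils.model_utils import prepare_api_model

    api_model = prepare_api_model('gpt-4o-mini')
    reply = api_model([{'role': 'user', 'content': 'Say hi in one word.'}])
    print(reply)  # parsed via the default 'choices.0.message.content' path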

data_juicer.utils.model_utils.prepare_diffusion_model(pretrained_model_name_or_path, diffusion_type, **model_params)[source]

Prepare and load a Diffusion model from HuggingFace.

Parameters:
  • pretrained_model_name_or_path -- input Diffusion model name or local path to the model

  • diffusion_type -- the use of the diffusion model. It can be 'image2image', 'text2image', 'inpainting'

Returns:

a Diffusion model.

data_juicer.utils.model_utils.prepare_fastsam_model(model_path, **model_params)[source]
data_juicer.utils.model_utils.prepare_fasttext_model(model_name='lid.176.bin', **model_params)[source]

Prepare and load a fasttext model.

Parameters:

model_name -- input model name

Returns:

model instance.

data_juicer.utils.model_utils.prepare_huggingface_model(pretrained_model_name_or_path, *, return_model=True, return_pipe=False, pipe_task='text-generation', **model_params)[source]

Prepare and load a huggingface model.

Parameters:
  • pretrained_model_name_or_path -- model name or path

  • return_model -- return the model or not

  • return_pipe -- return the pipeline or not

  • pipe_task -- task for the pipeline

Returns:

a tuple (model, processor) if return_model is True; otherwise, only the processor is returned.

data_juicer.utils.model_utils.prepare_kenlm_model(lang, name_pattern='{}.arpa.bin', **model_params)[source]

Prepare and load a kenlm model.

Parameters:
  • name_pattern -- model name pattern in formatting syntax.

  • lang -- language used to render the model name

Returns:

model instance.

data_juicer.utils.model_utils.prepare_nltk_model(lang, name_pattern='punkt.{}.pickle', **model_params)[source]

Prepare and load an nltk punkt model with enhanced resource handling.

Parameters:
  • name_pattern -- model name pattern in formatting syntax

  • lang -- language used to render the model name

Returns:

model instance.

data_juicer.utils.model_utils.prepare_nltk_pos_tagger(**model_params)[source]

Prepare and load NLTK's part-of-speech tagger with enhanced resource handling.

Returns:

The POS tagger model

data_juicer.utils.model_utils.prepare_opencv_classifier(model_path, **model_params)[source]
data_juicer.utils.model_utils.prepare_recognizeAnything_model(pretrained_model_name_or_path='ram_plus_swin_large_14m.pth', input_size=384, **model_params)[source]

Prepare and load the recognizeAnything model.

Parameters:
  • pretrained_model_name_or_path -- input model name or path.

  • input_size -- the input size of the model.
data_juicer.utils.model_utils.prepare_sdxl_prompt2prompt(pretrained_model_name_or_path, pipe_func, torch_dtype='fp32', device='cpu')[source]
data_juicer.utils.model_utils.prepare_sentencepiece_model(model_path, **model_params)[source]

Prepare and load a sentencepiece model.

Parameters:

model_path -- input model path

Returns:

model instance

data_juicer.utils.model_utils.prepare_sentencepiece_for_lang(lang, name_pattern='{}.sp.model', **model_params)[source]

Prepare and load a sentencepiece model for a specific language.

Parameters:
  • lang -- language used to render the model name

  • name_pattern -- pattern used to render the model name

Returns:

model instance.

data_juicer.utils.model_utils.prepare_simple_aesthetics_model(pretrained_model_name_or_path, *, return_model=True, **model_params)[source]

Prepare and load a simple aesthetics model.

Parameters:
  • pretrained_model_name_or_path -- model name or path

  • return_model -- return the model or not

Returns:

a tuple (model, input processor) if return_model is True; otherwise, only the processor is returned.

data_juicer.utils.model_utils.prepare_spacy_model(lang, name_pattern='{}_core_web_md-3.7.0', **model_params)[source]

Prepare a spacy model for a specific language.

Parameters:

lang -- language of the spacy model. Should be one of ["zh", "en"]

Returns:

corresponding spacy model

data_juicer.utils.model_utils.prepare_video_blip_model(pretrained_model_name_or_path, *, return_model=True, **model_params)[source]

Prepare and load a video-blip model with the corresponding processor.

Parameters:
  • pretrained_model_name_or_path -- model name or path

  • return_model -- return the model or not

  • trust_remote_code -- passed to transformers

Returns:

a tuple (model, input processor) if return_model is True; otherwise, only the processor is returned.

data_juicer.utils.model_utils.prepare_vllm_model(pretrained_model_name_or_path, **model_params)[source]

Prepare and load a vllm model with the corresponding tokenizer.

Parameters:
  • pretrained_model_name_or_path -- model name or path

  • model_params -- LLM initialization parameters.

Returns:

a tuple of (model, tokenizer)

data_juicer.utils.model_utils.update_sampling_params(sampling_params, pretrained_model_name_or_path, enable_vllm=False)[source]
data_juicer.utils.model_utils.prepare_model(model_type, **model_kwargs)[source]
data_juicer.utils.model_utils.get_model(model_key=None, rank=None, use_cuda=False)[source]
data_juicer.utils.model_utils.free_models(clear_model_zoo=True)[source]
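Example: a sketch of the prepare-then-get pattern suggested by the API above; the 'huggingface' model type and the key-based retrieval flow are assumptions based on the prepare_huggingface_model entry:

    from data_juicer.utils.model_utils import (free_models, get_model,
                                               prepare_model)

    # Register a model loader and keep the returned key for later retrieval.
    model_key = prepare_model(
        model_type='huggingface',
        pretrained_model_name_or_path='bert-base-uncased')

    # Instantiate (or fetch the cached) model; see prepare_huggingface_model
    # for the (model, processor) return convention.
    model, processor = get_model(model_key, rank=0, use_cuda=False)

    free_models()  # release everything when done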

data_juicer.utils.nltk_utils module

Utilities for working with NLTK in Data-Juicer.

This module provides utility functions for handling NLTK-specific operations, including pickle security patches and data downloading.

data_juicer.utils.nltk_utils.ensure_nltk_resource(resource_path, fallback_package=None)[source]

Ensure a specific NLTK resource is available and accessible.

This function attempts to find and load a resource, and if it fails, downloads the specified fallback package.

Parameters:
  • resource_path -- The path to the resource to check

  • fallback_package -- The package to download if the resource isn't found

Returns:

True if the resource is available, False otherwise

Return type:

bool

data_juicer.utils.nltk_utils.clean_nltk_cache(packages=None, complete_reset=False)[source]

Clean the NLTK model cache.

Parameters:
  • packages (list, optional) -- List of package names to clean. If None, cleans all package caches.

  • complete_reset (bool, optional) -- If True, deletes all NLTK data. Default is False.

data_juicer.utils.nltk_utils.patch_nltk_pickle_security()[source]

Patch NLTK's pickle security restrictions to allow loading models.

NLTK 3.9+ introduced strict pickle security that prevents loading some models. This function patches NLTK to bypass those restrictions while maintaining security.

This should be called once during initialization, before any NLTK functions are used.

data_juicer.utils.nltk_utils.create_physical_resource_alias(source_path, alias_path)[source]

Create a physical file alias for NLTK resources.

This function creates a hard link, symlink, or copy of a source resource at a target alias path. This is useful for problematic resources that might be requested with a path that doesn't match NLTK's structure.

Parameters:
  • source_path -- The full path to the source file

  • alias_path -- The full path where the alias should be created

Returns:

True if the alias was created successfully, False otherwise

Return type:

bool

data_juicer.utils.nltk_utils.setup_resource_aliases()[source]

Create physical file aliases for common problematic NLTK resources.

This function creates aliases/copies of resources that have known problematic paths, to ensure they can be found regardless of how they're requested.

data_juicer.utils.process_utils module

data_juicer.utils.process_utils.setup_mp(method=None)[source]
data_juicer.utils.process_utils.get_min_cuda_memory()[source]
data_juicer.utils.process_utils.calculate_np(name, mem_required, cpu_required, num_proc=None, use_cuda=False)[source]

Calculate the optimum number of processes for the given OP.

data_juicer.utils.registry module

class data_juicer.utils.registry.Registry(name: str)[source]

Bases: object

This class is used to register modules to a registry under a repo name.

__init__(name: str)[source]

Initialization method.

Parameters:

name -- a registry repo name

property name

Get the name of the current registry.

Returns:

name of the current registry.

property modules

Get all modules in the current registry.

Returns:

a dict storing the modules in the current registry.

list()[source]

Log the list of modules in the current registry.

get(module_key)[source]

Get the module named module_key from the current registry. If not found, return None.

Parameters:

module_key -- specified module name

Returns:

module named module_key

register_module(module_name: str | None = None, module_cls: type | None = None, force=False)[source]

Register a module class object to the registry with the specified module name.

Parameters:
  • module_name -- module name

  • module_cls -- module class object

  • force -- Whether to override an existing class with the same name. Default: False.

Example

>>> registry = Registry('formatter')
>>> @registry.register_module()
>>> class TextFormatter:
>>>     pass
>>> class TextFormatter2:
>>>     pass
>>> registry.register_module(module_name='text_formatter2',
>>>                          module_cls=TextFormatter2)

data_juicer.utils.resource_utils module

data_juicer.utils.resource_utils.query_cuda_info(query_key)[source]
data_juicer.utils.resource_utils.get_cpu_count()[source]
data_juicer.utils.resource_utils.get_cpu_utilization()[source]
data_juicer.utils.resource_utils.query_mem_info(query_key)[source]

data_juicer.utils.sample module

data_juicer.utils.sample.random_sample(dataset, weight=1.0, sample_number=0, seed=None)[source]

Randomly sample a subset from a dataset by weight or by number. If the sample number is bigger than 0, it is used instead of the weight.

Parameters:
  • dataset -- a HuggingFace dataset

  • weight -- sample ratio of the dataset

  • sample_number -- sample number of the dataset

  • seed -- random sample seed; if None, 42 is used as the default

Returns:

a subset of the dataset
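Example: a minimal sketch (the toy dataset is made up):

    from datasets import Dataset

    from data_juicer.utils.sample import random_sample

    ds = Dataset.from_dict({'text': [f'sample {i}' for i in range(100)]})

    subset = random_sample(ds, weight=0.1)     # ~10% of the rows, by ratio
    ten = random_sample(ds, sample_number=10)  # exactly 10 rows, by count
    print(len(ten))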

data_juicer.utils.unittest_utils module

data_juicer.utils.unittest_utils.TEST_TAG(*tags)[source]

Tags for test cases. Currently, standalone and ray are supported.

data_juicer.utils.unittest_utils.set_clear_model_flag(flag)[source]
class data_juicer.utils.unittest_utils.DataJuicerTestCaseBase(methodName='runTest')[source]

Bases: TestCase

classmethod setUpClass()[source]

Hook method for setting up the class fixture before running the tests in the class.

classmethod tearDownClass(hf_model_name=None) None[source]

Hook method for deconstructing the class fixture after running all tests in the class.

classmethod tearDown() None[source]

Hook method for deconstructing the test fixture after testing it.

generate_dataset(data) DJDataset[source]

Generate a dataset for a specific executor.

Parameters:

type (str, optional) -- "standalone" or "ray". Defaults to "standalone".

run_single_op(dataset: DJDataset, op, column_names)[source]

Run an operator in the specific executor.

assertDatasetEqual(first, second)[source]
data_juicer.utils.unittest_utils.get_diff_files(prefix_filter=['data_juicer/', 'tests/'])[source]

Get git diff files in the target dirs, except the __init__.py files.

data_juicer.utils.unittest_utils.find_corresponding_test_file(file_path)[source]
data_juicer.utils.unittest_utils.get_partial_test_cases()[source]

Module contents