Source code for data_juicer.utils.constant

import copy
import inspect
import io
import os
from enum import Enum

import zstandard as zstd
from loguru import logger

DEFAULT_PREFIX = "__dj__"


class Fields(object):
    # for storing stats generated by filter op
    stats = DEFAULT_PREFIX + "stats__"
    # for storing metas generated by mapper op
    meta = DEFAULT_PREFIX + "meta__"
    # for storing metas of batch samples generated by aggregator op
    batch_meta = DEFAULT_PREFIX + "batch_meta__"

    context = DEFAULT_PREFIX + "context__"
    suffix = DEFAULT_PREFIX + "suffix__"

    # the name of the original file from which this sample was derived
    source_file = DEFAULT_PREFIX + "source_file__"

    # the name of the directory to store the produced multimodal data
    multimodal_data_output_dir = DEFAULT_PREFIX + "produced_data__"


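# Illustrative sketch (not part of the original module), assuming a toy sample:
# filter ops store their stats under Fields.stats and mapper ops store metas
# under Fields.meta, so a processed sample might look roughly like this. The
# text, stat value, tag and file path below are hypothetical.
_example_sample = {
    "text": "A toy sample.",
    Fields.stats: {"alnum_ratio": 0.87},
    Fields.meta: {"image_tags": ["cat"]},
    Fields.source_file: "data/part-00000.jsonl",
}

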
class BatchMetaKeys(object):
    entity_attribute = "entity_attribute"
    most_relevant_entities = "most_relevant_entities"


class MetaKeys(object):
    # === text related tags === #
    # # sentiment
    dialog_sentiment_intensity = "dialog_sentiment_intensity"
    dialog_sentiment_intensity_analysis = "dialog_sentiment_intensity_analysis"
    query_sentiment_label = "query_sentiment_label"
    query_sentiment_score = "query_sentiment_label_score"
    dialog_sentiment_labels = "dialog_sentiment_labels"
    dialog_sentiment_labels_analysis = "dialog_sentiment_labels_analysis"
    # # intent
    dialog_intent_labels = "dialog_intent_labels"
    dialog_intent_labels_analysis = "dialog_intent_labels_analysis"
    query_intent_label = "query_intent_label"
    query_intent_score = "query_intent_label_score"
    # # topic
    dialog_topic_labels = "dialog_topic_labels"
    dialog_topic_labels_analysis = "dialog_topic_labels_analysis"
    query_topic_label = "query_topic_label"
    query_topic_score = "query_topic_label_score"

    # === multi-modal related tags === #
    # # video-frame tags
    video_frame_tags = "video_frame_tags"
    # # video-audio tags
    video_audio_tags = "video_audio_tags"
    # # video frames
    video_frames = "video_frames"
    # # image tags
    image_tags = "image_tags"
    # bounding box tag
    bbox_tag = DEFAULT_PREFIX + "bbox__"

    # === info extraction related tags === #
    # # for event extraction
    event_description = "event_description"
    # # a list of characters relevant to the event
    relevant_characters = "relevant_characters"
    # # the given main entities for attribute extraction
    main_entities = "main_entities"
    # # the given attributes to be extracted
    attributes = "attributes"
    # # the extracted attribute descriptions
    attribute_descriptions = "attribute_descriptions"
    # # texts extracted from the raw data to support the attribute
    attribute_support_texts = "attribute_support_texts"
    # # the nickname relationship
    nickname = "nickname"
    # # the entity for knowledge graph
    entity = "entity"
    # # # the name of the entity
    entity_name = "entity_name"
    # # # the type of the entity
    entity_type = "entity_type"
    # # # the description of the entity
    entity_description = "entity_entity_description"
    # # the relationship for knowledge graph
    relation = "relation"
    # # # the source entity of the relation
    source_entity = "relation_source_entity"
    # # # the target entity of the relation
    target_entity = "relation_target_entity"
    # # # the description of the relation
    relation_description = "relation_description"
    # # # the keywords of the relation
    relation_keywords = "relation_keywords"
    # # # the strength of the relation
    relation_strength = "relation_strength"
    # # the keyword in a text
    keyword = "keyword"
    # # support text
    support_text = "support_text"
    # # role relation
    role_relation = "role_relation"
    # # html tables
    html_tables = "html_tables"


class StatsKeysMeta(type):
    """
    A helper class to track the mapping from an OP's name to its used stats_keys,
    e.g.,

    # once the AlphanumericFilter's compute_stats method has been called
    res = StatsKeys.get_access_log()
    print(res)  # {"AlphanumericFilter": ["alnum_ratio", "alpha_token_ratio"]}
    """

    _accessed_by = {}

    def __getattr__(cls, attr):
        caller_class = inspect.currentframe().f_back.f_globals["__name__"]
        # no need to track the parent classes
        caller_class = caller_class.split(".")[-1]
        stat_key = getattr(cls._constants_class, attr)
        if caller_class not in cls._accessed_by:
            cls._accessed_by[caller_class] = set()
        if stat_key not in cls._accessed_by[caller_class]:
            cls._accessed_by[caller_class].add(stat_key)
        return stat_key

    def get_access_log(cls, dj_cfg=None, dataset=None):
        if cls._accessed_by:
            return cls._accessed_by
        elif dj_cfg and dataset:
            tmp_dj_cfg = copy.deepcopy(dj_cfg)
            tmp_dj_cfg.use_cache = False
            tmp_dj_cfg.use_checkpoint = False

            from data_juicer.config import get_init_configs
            from data_juicer.core import Analyzer

            tmp_analyzer = Analyzer(get_init_configs(tmp_dj_cfg))
            dataset = dataset.take(1)
            # do not overwrite the true analysis results
            tmp_analyzer.run(dataset=dataset, skip_export=True)
        elif dj_cfg:
            tmp_dj_cfg = copy.deepcopy(dj_cfg)

            # the access has been skipped due to the use of cache,
            # so we will use a temp data sample to get the access log
            if os.path.exists(dj_cfg.dataset_path) and (
                "jsonl" in dj_cfg.dataset_path or "jsonl.zst" in dj_cfg.dataset_path
            ):
                logger.info("Begin to track the usage of ops with a dummy data sample")

                # load the first line as tmp_data
                tmp_f_name = None
                first_line = None
                if "jsonl.zst" in dj_cfg.dataset_path:
                    tmp_f_name = dj_cfg.dataset_path.replace(".jsonl.zst", ".tmp.jsonl")
                    # open the file in binary mode and
                    # create a Zstandard decompression context
                    with open(dj_cfg.dataset_path, "rb") as compressed_file:
                        dctx = zstd.ZstdDecompressor()
                        # create a stream reader for the file and decode the
                        # first line
                        with dctx.stream_reader(compressed_file) as reader:
                            text_stream = io.TextIOWrapper(reader, encoding="utf-8")
                            first_line = text_stream.readline()
                elif "jsonl" in dj_cfg.dataset_path:
                    tmp_f_name = dj_cfg.dataset_path.replace(".jsonl", ".tmp.jsonl")
                    with open(dj_cfg.dataset_path, "r", encoding="utf-8") as orig_file:
                        first_line = orig_file.readline()

                assert tmp_f_name is not None and first_line is not None, (
                    "error when loading the first line, when "
                    f"dj_cfg.dataset_path={dj_cfg.dataset_path}"
                )

                with open(tmp_f_name, "w", encoding="utf-8") as tmp_file:
                    tmp_file.write(first_line)

                tmp_dj_cfg.dataset_path = tmp_f_name
                tmp_dj_cfg.use_cache = False
                tmp_dj_cfg.use_checkpoint = False

                from data_juicer.config import get_init_configs
                from data_juicer.core import Analyzer

                tmp_analyzer = Analyzer(get_init_configs(tmp_dj_cfg))
                # do not overwrite the true analysis results
                tmp_analyzer.run(skip_export=True)

                os.remove(tmp_f_name)
            else:
                raise NotImplementedError(
                    "For now, the dummy data sample is only supported for the "
                    f"jsonl type. Please check your config: {dj_cfg.dataset_path} "
                    "either does not exist or is not a jsonl file."
                )
        return cls._accessed_by


class StatsKeysConstant(object):
    # === text ===
    alpha_token_ratio = "alpha_token_ratio"
    alnum_ratio = "alnum_ratio"
    avg_line_length = "avg_line_length"
    char_rep_ratio = "char_rep_ratio"
    flagged_words_ratio = "flagged_words_ratio"
    lang = "lang"
    lang_score = "lang_score"
    max_line_length = "max_line_length"
    perplexity = "perplexity"
    special_char_ratio = "special_char_ratio"
    stopwords_ratio = "stopwords_ratio"
    text_len = "text_len"
    text_pair_similarity = "text_pair_similarity"
    num_action = "num_action"
    num_dependency_edges = "num_dependency_edges"
    num_token = "num_token"
    num_words = "num_words"
    word_rep_ratio = "word_rep_ratio"
    llm_analysis_score = "llm_analysis_score"
    llm_analysis_record = "llm_analysis_record"
    llm_quality_score = "llm_quality_score"
    llm_quality_record = "llm_quality_record"
    llm_difficulty_score = "llm_difficulty_score"
    llm_difficulty_record = "llm_difficulty_record"

    # === image ===
    aspect_ratios = "aspect_ratios"
    image_width = "image_width"
    image_height = "image_height"
    image_sizes = "image_sizes"
    face_ratios = "face_ratios"
    face_detections = "face_detections"
    face_counts = "face_counts"
    image_aesthetics_scores = "image_aesthetics_scores"
    image_nsfw_score = "image_nsfw_score"
    image_watermark_prob = "image_watermark_prob"
    image_pair_similarity = "image_pair_similarity"

    # === audios ===
    audio_duration = "audio_duration"
    audio_nmf_snr = "audio_nmf_snr"
    audio_sizes = "audio_sizes"

    # === videos ===
    video_duration = "video_duration"
    video_aspect_ratios = "video_aspect_ratios"
    video_width = "video_width"
    video_height = "video_height"
    video_ocr_area_ratio = "video_ocr_area_ratio"
    video_aesthetic_score = "video_aesthetic_score"
    video_frames_aesthetics_score = "video_frames_aesthetics_score"
    video_motion_score = "video_motion_score"
    video_nsfw_score = "video_nsfw_score"
    video_watermark_prob = "video_watermark_prob"

    # === multimodal ===
    # image-text
    image_text_similarity = "image_text_similarity"
    image_text_matching_score = "image_text_matching_score"
    phrase_grounding_recall = "phrase_grounding_recall"
    # video-text
    video_frames_text_similarity = "video_frames_text_similarity"

    # general-field-filter
    general_field_filter_condition = "general_field_filter_condition"


class StatsKeys(object, metaclass=StatsKeysMeta):
    _constants_class = StatsKeysConstant


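# Illustrative sketch (not part of the original module): attribute access on
# StatsKeys is resolved by StatsKeysMeta.__getattr__, which looks the name up in
# StatsKeysConstant and records which calling module used it. The key in the
# access log depends on the caller's module name; "constant" is what it would be
# for an access made from within this file.
_ratio_key = StatsKeys.alnum_ratio        # returns "alnum_ratio" and records the access
_access_log = StatsKeys.get_access_log()  # e.g., {"constant": {"alnum_ratio"}}

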
class HashKeys(object):
    uid = DEFAULT_PREFIX + "uid"
    hash = DEFAULT_PREFIX + "hash"
    minhash = DEFAULT_PREFIX + "minhash"
    simhash = DEFAULT_PREFIX + "simhash"

    # image
    imagehash = DEFAULT_PREFIX + "imagehash"
    # video
    videohash = DEFAULT_PREFIX + "videohash"
    # duplicate flag
    is_unique = DEFAULT_PREFIX + "is_unique"


class InterVars(object):
    # === text ===
    lines = DEFAULT_PREFIX + "lines"
    words = DEFAULT_PREFIX + "words"
    refined_words = DEFAULT_PREFIX + "refined_words"

    # === image ===
    loaded_images = DEFAULT_PREFIX + "loaded_images"  # Image

    # === audios ===
    loaded_audios = DEFAULT_PREFIX + "loaded_audios"  # (data, sampling_rate)

    # === videos ===
    # # InputContainer from av.
    # # Key: {video_path}
    loaded_videos = DEFAULT_PREFIX + "loaded_videos"
    # # sampled frames.
    # # Key: {video_path}-{frame_sampling_method}[-{frame_num}]
    # # {frame_num} is only used when {frame_sampling_method} is "uniform"
    sampled_frames = DEFAULT_PREFIX + "sampled_frames"


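# Illustrative sketch (not part of the original module): following the key format
# described in the comments above, a cache key for 3 uniformly sampled frames of
# a hypothetical video could be composed like this. How ops nest such keys inside
# the sample context is not shown here and may differ.
_video_path = "videos/demo.mp4"                 # hypothetical path
_frames_cache_key = f"{_video_path}-uniform-3"  # uniform sampling with frame_num=3

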
class JobRequiredKeys(Enum):
    hook = "hook"
    meta_name = "meta_name"
    input = "input"
    output = "output"
    local = "local"
    dj_configs = "dj_configs"
    extra_configs = "extra_configs"
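

# Illustrative sketch (not part of the original module): a hypothetical job
# description checked against the required keys. The hook name, paths and
# values below are made up for demonstration.
_job_desc = {
    "hook": "ProcessDataHook",
    "meta_name": "demo_job",
    "input": "input.jsonl",
    "output": "output.jsonl",
    "local": True,
    "dj_configs": None,
    "extra_configs": None,
}
_missing = [key.value for key in JobRequiredKeys if key.value not in _job_desc]
assert not _missing, f"missing required job keys: {_missing}"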