# Source code for data_juicer.utils.constant
import copy
import inspect
import io
import os
from enum import Enum
import zstandard as zstd
from loguru import logger
# Common prefix marking every Data-Juicer-internal field/column name, so they
# cannot collide with user-provided keys in the dataset.
DEFAULT_PREFIX = "__dj__"
class Fields(object):
    """Names of the internal columns Data-Juicer attaches to samples."""

    # stats produced by filter OPs
    stats = f"{DEFAULT_PREFIX}stats__"
    # metas produced by mapper OPs
    meta = f"{DEFAULT_PREFIX}meta__"
    # metas of batched samples produced by aggregator OPs
    batch_meta = f"{DEFAULT_PREFIX}batch_meta__"
    context = f"{DEFAULT_PREFIX}context__"
    suffix = f"{DEFAULT_PREFIX}suffix__"
    # name of the original file this sample was derived from
    source_file = f"{DEFAULT_PREFIX}source_file__"
    # name of the directory storing the produced multimodal data
    multimodal_data_output_dir = f"{DEFAULT_PREFIX}produced_data__"
class BatchMetaKeys(object):
    """Keys of batch-level metas produced by aggregator OPs."""

    entity_attribute = "entity_attribute"
    most_relevant_entities = "most_relevant_entities"
# [docs]  (Sphinx viewcode artifact)
class MetaKeys(object):
    """Keys of per-sample meta tags produced by mapper OPs."""

    # === text related tags ===
    # # sentiment
    dialog_sentiment_intensity = "dialog_sentiment_intensity"
    dialog_sentiment_intensity_analysis = "dialog_sentiment_intensity_analysis"
    query_sentiment_label = "query_sentiment_label"
    query_sentiment_score = "query_sentiment_label_score"
    dialog_sentiment_labels = "dialog_sentiment_labels"
    dialog_sentiment_labels_analysis = "dialog_sentiment_labels_analysis"
    # # intent
    dialog_intent_labels = "dialog_intent_labels"
    dialog_intent_labels_analysis = "dialog_intent_labels_analysis"
    query_intent_label = "query_intent_label"
    query_intent_score = "query_intent_label_score"
    # # topic
    dialog_topic_labels = "dialog_topic_labels"
    dialog_topic_labels_analysis = "dialog_topic_labels_analysis"
    query_topic_label = "query_topic_label"
    query_topic_score = "query_topic_label_score"
    # === multi-modal related tags ===
    # # video-frame tags
    video_frame_tags = "video_frame_tags"
    # # video-audio tags
    video_audio_tags = "video_audio_tags"
    # # video frames
    video_frames = "video_frames"
    # # image tags
    image_tags = "image_tags"
    # bounding box tag (internal column, hence the DEFAULT_PREFIX)
    bbox_tag = DEFAULT_PREFIX + "bbox__"
    # === info extraction related tags ===
    # # for event extraction
    event_description = "event_description"
    # # a list of characters relevant to the event
    relevant_characters = "relevant_characters"
    # # the given main entities for attribute extraction
    main_entities = "main_entities"
    # # the given attributes to be extracted
    attributes = "attributes"
    # # the extracted attribute descriptions
    attribute_descriptions = "attribute_descriptions"
    # # texts extracted from the raw data that support the attribute
    attribute_support_texts = "attribute_support_texts"
    # # the nickname relationship
    nickname = "nickname"
    # # the entity for knowledge graph
    entity = "entity"
    # # # the name of entity
    entity_name = "entity_name"
    # # # the type of entity
    entity_type = "entity_type"
    # # # the description of entity
    # NOTE(review): this value duplicates the "entity_" prefix, unlike the
    # sibling keys entity_name/entity_type above. It is a persisted data key,
    # so confirm whether "entity_entity_description" is intentional before
    # renaming it.
    entity_description = "entity_entity_description"
    # # the relationship for knowledge graph
    relation = "relation"
    # # # the source entity of the relation
    source_entity = "relation_source_entity"
    # # # the target entity of the relation
    target_entity = "relation_target_entity"
    # # # the description of the relation
    relation_description = "relation_description"
    # # # the keywords of the relation
    relation_keywords = "relation_keywords"
    # # # the strength of the relation
    relation_strength = "relation_strength"
    # # the keyword in a text
    keyword = "keyword"
    # # support text
    support_text = "support_text"
    # # role relation
    role_relation = "role_relation"
    # # html tables
    html_tables = "html_tables"
class StatsKeysMeta(type):
    """
    A helper metaclass that tracks the mapping from each OP's name to the
    stats keys it accesses.

    Subclasses are expected to set ``_constants_class`` to the class holding
    the actual string constants; every attribute lookup on the subclass is
    resolved from there and recorded per calling module.

    e.g., once the AlphanumericFilter's compute_stats method has been called::

        res = StatsKeys.get_access_log()
        print(res)  # {"AlphanumericFilter": {"alnum_ratio", "alpha_token_ratio"}}
    """

    # maps caller module name -> set of stats keys it accessed
    _accessed_by = {}

    def __getattr__(cls, attr):
        """Resolve ``attr`` from ``_constants_class`` and record the access."""
        caller_class = inspect.currentframe().f_back.f_globals["__name__"]
        # no need to track the parent packages, keep the last path component
        caller_class = caller_class.split(".")[-1]
        stat_key = getattr(cls._constants_class, attr)
        # set.add is idempotent, so no membership pre-check is needed
        cls._accessed_by.setdefault(caller_class, set()).add(stat_key)
        return stat_key

    def get_access_log(cls, dj_cfg=None, dataset=None):
        """
        Return the {caller_name: accessed stats keys} mapping, running a
        throw-away one-sample analysis first when the log is still empty.

        :param dj_cfg: Data-Juicer config. When given (and the log is empty),
            a temporary Analyzer is run to replay the OPs and record accesses.
        :param dataset: optional dataset; when given together with ``dj_cfg``,
            its first sample is analyzed instead of re-loading from disk.
        :return: the access-log dict.
        :raises NotImplementedError: if ``dj_cfg.dataset_path`` does not exist
            or is not a jsonl/jsonl.zst file (dataset-less path only).
        """
        if cls._accessed_by:
            return cls._accessed_by
        elif dj_cfg and dataset:
            tmp_dj_cfg = copy.deepcopy(dj_cfg)
            tmp_dj_cfg.use_cache = False
            tmp_dj_cfg.use_checkpoint = False

            from data_juicer.config import get_init_configs
            from data_juicer.core import Analyzer

            tmp_analyzer = Analyzer(get_init_configs(tmp_dj_cfg))
            dataset = dataset.take(1)
            # do not overwrite the true analysis results
            tmp_analyzer.run(dataset=dataset, skip_export=True)
        elif dj_cfg:
            tmp_dj_cfg = copy.deepcopy(dj_cfg)
            # the access has been skipped due to the use of cache; replay the
            # OPs on a temporary one-sample dataset to populate the log.
            # NOTE: "jsonl" is a substring of "jsonl.zst", so this single
            # check covers both supported suffixes.
            if os.path.exists(dj_cfg.dataset_path) and "jsonl" in dj_cfg.dataset_path:
                logger.info("Begin to track the usage of ops with a dummy data sample")

                # load the first line as tmp_data
                tmp_f_name = None
                first_line = None
                if "jsonl.zst" in dj_cfg.dataset_path:
                    tmp_f_name = dj_cfg.dataset_path.replace(".jsonl.zst", ".tmp.jsonl")
                    # open the file in binary mode and decode the first line
                    # through a Zstandard streaming decompressor
                    with open(dj_cfg.dataset_path, "rb") as compressed_file:
                        dctx = zstd.ZstdDecompressor()
                        with dctx.stream_reader(compressed_file) as reader:
                            text_stream = io.TextIOWrapper(reader, encoding="utf-8")
                            first_line = text_stream.readline()
                elif "jsonl" in dj_cfg.dataset_path:
                    tmp_f_name = dj_cfg.dataset_path.replace(".jsonl", ".tmp.jsonl")
                    with open(dj_cfg.dataset_path, "r", encoding="utf-8") as orig_file:
                        first_line = orig_file.readline()

                assert tmp_f_name is not None and first_line is not None, (
                    "error when loading the first line, when " f"dj_cfg.dataset_path={dj_cfg.dataset_path}"
                )

                with open(tmp_f_name, "w", encoding="utf-8") as tmp_file:
                    tmp_file.write(first_line)

                tmp_dj_cfg.dataset_path = tmp_f_name
                tmp_dj_cfg.use_cache = False
                tmp_dj_cfg.use_checkpoint = False

                from data_juicer.config import get_init_configs
                from data_juicer.core import Analyzer

                tmp_analyzer = Analyzer(get_init_configs(tmp_dj_cfg))
                try:
                    # do not overwrite the true analysis results
                    tmp_analyzer.run(skip_export=True)
                finally:
                    # always remove the temporary one-sample file, even if the
                    # analysis run fails
                    os.remove(tmp_f_name)
            else:
                raise NotImplementedError(
                    f"For now, the dummy data is supported for only jsonl "
                    f"type. Please check your config as {dj_cfg.dataset_path} "
                    f"either does not exist or is not in jsonl type."
                )

        return cls._accessed_by
# [docs]  (Sphinx viewcode artifact)
class StatsKeysConstant(object):
    """String constants naming the stats computed by filter OPs; looked up
    through the StatsKeysMeta metaclass so accesses can be tracked."""

    # === text ===
    alpha_token_ratio = "alpha_token_ratio"
    alnum_ratio = "alnum_ratio"
    avg_line_length = "avg_line_length"
    char_rep_ratio = "char_rep_ratio"
    flagged_words_ratio = "flagged_words_ratio"
    lang = "lang"
    lang_score = "lang_score"
    max_line_length = "max_line_length"
    perplexity = "perplexity"
    special_char_ratio = "special_char_ratio"
    stopwords_ratio = "stopwords_ratio"
    text_len = "text_len"
    text_pair_similarity = "text_pair_similarity"
    num_action = "num_action"
    num_dependency_edges = "num_dependency_edges"
    num_token = "num_token"
    num_words = "num_words"
    word_rep_ratio = "word_rep_ratio"
    # # LLM-based scoring results and their raw records
    llm_analysis_score = "llm_analysis_score"
    llm_analysis_record = "llm_analysis_record"
    llm_quality_score = "llm_quality_score"
    llm_quality_record = "llm_quality_record"
    llm_difficulty_score = "llm_difficulty_score"
    llm_difficulty_record = "llm_difficulty_record"
    # === image ===
    aspect_ratios = "aspect_ratios"
    image_width = "image_width"
    image_height = "image_height"
    image_sizes = "image_sizes"
    face_ratios = "face_ratios"
    face_detections = "face_detections"
    face_counts = "face_counts"
    image_aesthetics_scores = "image_aesthetics_scores"
    image_nsfw_score = "image_nsfw_score"
    image_watermark_prob = "image_watermark_prob"
    image_pair_similarity = "image_pair_similarity"
    # === audios ===
    audio_duration = "audio_duration"
    audio_nmf_snr = "audio_nmf_snr"
    audio_sizes = "audio_sizes"
    # === videos ===
    video_duration = "video_duration"
    video_aspect_ratios = "video_aspect_ratios"
    video_width = "video_width"
    video_height = "video_height"
    video_ocr_area_ratio = "video_ocr_area_ratio"
    video_aesthetic_score = "video_aesthetic_score"
    video_frames_aesthetics_score = "video_frames_aesthetics_score"
    video_motion_score = "video_motion_score"
    video_nsfw_score = "video_nsfw_score"
    video_watermark_prob = "video_watermark_prob"
    # === multimodal ===
    # image-text
    image_text_similarity = "image_text_similarity"
    image_text_matching_score = "image_text_matching_score"
    phrase_grounding_recall = "phrase_grounding_recall"
    # video-text
    video_frames_text_similarity = "video_frames_text_similarity"
    # general-field-filter
    general_field_filter_condition = "general_field_filter_condition"
class HashKeys(object):
    """Internal column names used by the deduplication-related OPs."""

    uid = f"{DEFAULT_PREFIX}uid"
    hash = f"{DEFAULT_PREFIX}hash"
    minhash = f"{DEFAULT_PREFIX}minhash"
    simhash = f"{DEFAULT_PREFIX}simhash"
    # image
    imagehash = f"{DEFAULT_PREFIX}imagehash"
    # video
    videohash = f"{DEFAULT_PREFIX}videohash"
    # duplicate flag
    is_unique = f"{DEFAULT_PREFIX}is_unique"
class InterVars(object):
    """Keys of intermediate variables cached and shared between OPs."""

    # === text ===
    lines = f"{DEFAULT_PREFIX}lines"
    words = f"{DEFAULT_PREFIX}words"
    refined_words = f"{DEFAULT_PREFIX}refined_words"
    # === image ===
    loaded_images = f"{DEFAULT_PREFIX}loaded_images"  # Image
    # === audios ===
    loaded_audios = f"{DEFAULT_PREFIX}loaded_audios"  # (data, sampling_rate)
    # === videos ===
    # # InputContainer from av.
    # # Key: {video_path}
    loaded_videos = f"{DEFAULT_PREFIX}loaded_videos"
    # sampled frames.
    # # Key: {video_path}-{frame_sampling_method}[-{frame_num}]
    # # {frame_num} is only used when {frame_sampling_method} is "uniform"
    sampled_frames = f"{DEFAULT_PREFIX}sampled_frames"
# Required keys of a job entry; each member's value equals its name.
JobRequiredKeys = Enum(
    "JobRequiredKeys",
    {
        key: key
        for key in (
            "hook",
            "meta_name",
            "input",
            "output",
            "local",
            "dj_configs",
            "extra_configs",
        )
    },
)