import os
from loguru import logger
from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis
from data_juicer.config import init_configs
from data_juicer.format import load_formatter
from data_juicer.ops import Filter, load_ops
from data_juicer.utils import cache_utils
from .exporter import Exporter
[docs]class Analyzer:
"""
This Analyzer class is used to analyze a specific dataset.
It will compute stats for all filter ops in the config file, apply
multiple analysis (e.g. OverallAnalysis, ColumnWiseAnalysis, etc.)
on these stats, and generate the analysis results (stats tables,
distribution figures, etc.) to help users understand the input
dataset better.
"""
[docs] def __init__(self, cfg=None):
"""
Initialization method.
:param cfg: optional config dict.
"""
self.cfg = init_configs() if cfg is None else cfg
self.work_dir = self.cfg.work_dir
if self.cfg.use_cache:
logger.info(f'Using cache compression method: '
f'[{self.cfg.cache_compress}]')
cache_utils.CACHE_COMPRESS = self.cfg.cache_compress
# setup formatter
logger.info('Setting up data formatter...')
self.formatter = load_formatter(
dataset_path=self.cfg.dataset_path,
generated_dataset_config=self.cfg.generated_dataset_config,
text_keys=self.cfg.text_keys,
suffixes=self.cfg.suffixes,
add_suffix=self.cfg.add_suffix)
# prepare exporter and check export path suffix
# NOTICE: no need to export dataset texts for analyzer
# (export_ds=False). Instead, only need to export stats
# (export_stats=True).
logger.info('Preparing exporter...')
self.exporter = Exporter(
self.cfg.export_path,
self.cfg.export_shard_size,
self.cfg.export_in_parallel,
self.cfg.np,
export_ds=self.cfg.export_original_dataset,
keep_stats_in_res_ds=self.cfg.export_original_dataset,
export_stats=True)
# parsed_res
self.overall_result = None
self.overall_single_plot_path = None
self.analysis_path = os.path.join(self.cfg.work_dir, 'analysis')
[docs] def run(self, load_data_np=None, skip_export=False):
"""
Running the dataset analysis pipeline.
:param load_data_np: number of workers when loading the dataset.
:param skip_export: whether export the results into disk
:return: analyzed dataset.
"""
# 1. format data
logger.info('Loading dataset from data formatter...')
if load_data_np is None:
load_data_np = self.cfg.np
dataset = self.formatter.load_dataset(load_data_np, self.cfg)
# extract processes
logger.info('Preparing process operators...')
ops = load_ops(self.cfg.process, self.cfg.op_fusion)
# 2. stats precompute only for filter ops
logger.info('Computing the stats of dataset...')
stats_collected = False
for op in ops:
if isinstance(op, Filter):
original_process = op.process
op.process = None
dataset = dataset.process(op, work_dir=self.work_dir)
op.process = original_process
stats_collected = True
if not stats_collected:
logger.warning('No stats collected. Please add some Filter ops to '
'the process list in configs.')
return dataset
# 3. data export
logger.info('Exporting dataset to disk...')
self.exporter.export(dataset)
if self.cfg.use_cache and self.cfg.cache_compress:
from data_juicer.utils.compress import compress
compress(dataset)
# 4. analysis and output result to the export path
# 4.1. Only consider fields in Fields.stats
# 4.2. For string fields, only consider its histogram
# 4.3. For numeric fields, consider its histogram and box
# 4.4. Otherwise, DO NOT analyze
logger.info('Applying overall analysis on stats...')
overall_analysis = OverallAnalysis(dataset, self.analysis_path)
self.overall_result = overall_analysis.analyze(
percentiles=self.cfg.percentiles,
num_proc=self.cfg.np,
skip_export=skip_export)
logger.info(f'The overall analysis results are: {self.overall_result}')
logger.info('Applying column-wise analysis on stats...')
column_wise_analysis = ColumnWiseAnalysis(
dataset,
self.analysis_path,
overall_result=self.overall_result,
save_stats_in_one_file=self.cfg.save_stats_in_one_file,
)
column_wise_analysis.analyze(skip_export=skip_export)
return dataset