Source code for data_juicer.core.analyzer

import os
from typing import Optional, Union

from datasets import Dataset
from jsonargparse import Namespace
from loguru import logger
from pydantic import PositiveInt

from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis
from data_juicer.config import init_configs
from data_juicer.format import load_formatter
from data_juicer.ops import NON_STATS_FILTERS, TAGGING_OPS, Filter, load_ops
from data_juicer.ops.op_fusion import fuse_operators
from data_juicer.utils import cache_utils

from .adapter import Adapter
from .data import NestedDataset
from .exporter import Exporter


[docs] class Analyzer: """ This Analyzer class is used to analyze a specific dataset. It will compute stats for all filter ops in the config file, apply multiple analysis (e.g. OverallAnalysis, ColumnWiseAnalysis, etc.) on these stats, and generate the analysis results (stats tables, distribution figures, etc.) to help users understand the input dataset better. """
[docs] def __init__(self, cfg: Optional[Namespace] = None): """ Initialization method. :param cfg: optional jsonargparse Namespace dict. """ self.cfg = init_configs(which_entry=self) if cfg is None else cfg self.work_dir = self.cfg.work_dir if self.cfg.use_cache: logger.info(f'Using cache compression method: ' f'[{self.cfg.cache_compress}]') cache_utils.CACHE_COMPRESS = self.cfg.cache_compress # setup formatter logger.info('Setting up data formatter...') self.formatter = load_formatter( dataset_path=self.cfg.dataset_path, generated_dataset_config=self.cfg.generated_dataset_config, text_keys=self.cfg.text_keys, suffixes=self.cfg.suffixes, add_suffix=self.cfg.add_suffix) # prepare exporter and check export path suffix # NOTICE: no need to export dataset texts for analyzer # (export_ds=False). Instead, only need to export stats # (export_stats=True). logger.info('Preparing exporter...') self.exporter = Exporter( self.cfg.export_path, self.cfg.export_shard_size, self.cfg.export_in_parallel, self.cfg.np, export_ds=self.cfg.export_original_dataset, keep_stats_in_res_ds=self.cfg.export_original_dataset, export_stats=True) # parsed_res self.overall_result = None self.overall_single_plot_path = None self.analysis_path = os.path.join(self.cfg.work_dir, 'analysis')
[docs] def run(self, dataset: Union[Dataset, NestedDataset] = None, load_data_np: Optional[PositiveInt] = None, skip_export: bool = False, skip_return: bool = False): """ Running the dataset analysis pipeline. :param dataset: a Dataset object to be analyzed. :param load_data_np: number of workers when loading the dataset. :param skip_export: whether export the results into disk :param skip_return: skip return for API called. :return: analyzed dataset. """ # 1. format data if load_data_np is None: load_data_np = self.cfg.np if dataset is None: logger.info('Loading dataset from data formatter...') dataset = self.formatter.load_dataset(load_data_np, self.cfg) else: logger.info(f'Using existing dataset {dataset}') if self.cfg.auto: # if it's auto analysis, only analyze for a minor part of the input # dataset to save time and computing resource dataset = dataset.take(min(len(dataset), self.cfg.auto_num)) # extract processes logger.info('Preparing process operators...') ops = load_ops(self.cfg.process) if self.cfg.op_fusion: probe_res = None if self.cfg.fusion_strategy == 'probe': logger.info('Probe the OP speed for OP reordering...') adapter = Adapter(self.cfg) probe_res, _ = adapter.probe_small_batch(dataset, ops) logger.info(f'Start OP fusion and reordering with strategy ' f'[{self.cfg.fusion_strategy}]...') ops = fuse_operators(ops, probe_res) # 2. stats precompute only for filter ops logger.info('Computing the stats of dataset...') stats_collected = False for op in ops: if isinstance(op, Filter) \ and op._name not in NON_STATS_FILTERS.modules: original_process = op.process op.process = None dataset = dataset.process(op, work_dir=self.work_dir, open_monitor=self.cfg.open_monitor) op.process = original_process stats_collected = True elif op._name in TAGGING_OPS.modules: dataset = dataset.process(op, work_dir=self.work_dir, open_monitor=self.cfg.open_monitor) stats_collected = True if not stats_collected: logger.warning( 'No stats/meta collected. Please add some Filter OPs or ' 'Tagging OPs to the process list in configs.') if not skip_return: return dataset # 3. data export logger.info('Exporting dataset to disk...') self.exporter.export(dataset) if self.cfg.use_cache and self.cfg.cache_compress: from data_juicer.utils.compress import compress compress(dataset) # 4. analysis and output result to the export path # 4.1. Only consider fields in Fields.stats # 4.2. For string fields, only consider its histogram # 4.3. For numeric fields, consider its histogram and box # 4.4. Otherwise, DO NOT analyze logger.info('Applying overall analysis on stats...') overall_analysis = OverallAnalysis(dataset, self.analysis_path) self.overall_result = overall_analysis.analyze( percentiles=self.cfg.percentiles, num_proc=self.cfg.np, skip_export=skip_export) logger.info(f'The overall analysis results are: {self.overall_result}') logger.info('Applying column-wise analysis on stats...') column_wise_analysis = ColumnWiseAnalysis( dataset, self.analysis_path, overall_result=self.overall_result, save_stats_in_one_file=self.cfg.save_stats_in_one_file, ) column_wise_analysis.analyze(skip_export=skip_export) if not skip_return: return dataset