# Source code for data_juicer.analysis.overall_analysis

import os
from multiprocessing import Pool

import pandas as pd
from loguru import logger
from tqdm import tqdm

from data_juicer.utils.constant import Fields


def _single_column_analysis(col, *args, **kwargs):
    col_overall = col.describe(*args, **kwargs)
    return col_overall


class OverallAnalysis:
    """Apply analysis on the overall stats, including mean, std, quantiles,
    etc."""

    def __init__(self, dataset, output_path):
        """
        Initialization method.

        :param dataset: the dataset to be analyzed
        :param output_path: path to store the analysis results.
        """
        self.stats = pd.DataFrame(dataset[Fields.stats])
        self.output_path = output_path
        if not os.path.exists(self.output_path):
            # exist_ok tolerates another process creating the directory
            # between the existence check and this call.
            os.makedirs(self.output_path, exist_ok=True)

        # default percentiles to analyze
        self.default_percentiles = [0.25, 0.5, 0.75]
        # supported dtypes of column to be analyzed
        # Notice: there won't be mixed types in a column because the stats is
        # obtained from Dataset, which doesn't allow mixed types.
        # Notice: for now, stats can only be:
        # {numbers, string, list of one of before}
        self.supported_object_types = {str, list}

    def refine_single_column(self, col):
        """
        Prepare a single column of stats so that it can be described.

        Non-object columns and string columns are returned unchanged. Columns
        of lists are flattened with ``explode`` and their dtype is re-inferred.

        :param col: a pandas Series holding one column of the stats
        :return: the column ready to be described, or None if the type of
            its elements is not in ``self.supported_object_types``.
        """
        if col.dtype != 'object':
            # not an object, return directly
            return col
        if col.empty:
            # no element to inspect, so there is nothing to refine
            return col

        # if the type of this column is object, we can decide the actual type
        # according to the first element. Use positional access so that the
        # lookup does not depend on the index containing the label 0.
        first = col.iloc[0]
        if type(first) not in self.supported_object_types:
            logger.warning(f'There is a column of stats with type '
                           f'[{type(first)}], which is not supported to be '
                           f'analyzed for now.')
            return None
        if type(first) is list:
            # flatten and infer the type
            col = col.explode().infer_objects()
        # strings need no change: describe(include='all') can analyze them
        return col

    def analyze(self, percentiles=None, num_proc=1, skip_export=False):
        """
        Apply overall analysis on the whole dataset based on the describe
        method of pandas.

        :param percentiles: extra percentiles to analyze in addition to the
            default ones; None or an empty sequence means defaults only
        :param num_proc: number of processes to analyze the dataset
        :param skip_export: whether to skip exporting the results to disk
        :return: the overall analysis result.
        """
        # merge default and customized percentiles and get overall information
        custom_percentiles = list(percentiles) if percentiles else []
        percentiles = sorted(
            set(custom_percentiles + self.default_percentiles))

        results = []
        # the context manager releases the workers even if submitting a job
        # raises; on normal exit the pool has already been closed and joined
        with Pool(num_proc) as pool:
            for col_name in self.stats.columns:
                this_col = self.refine_single_column(self.stats[col_name])
                if this_col is None:
                    # unsupported column (a warning was already logged):
                    # skip it instead of failing inside the worker
                    continue
                res = pool.apply_async(_single_column_analysis,
                                       kwds={
                                           'col': this_col,
                                           'percentiles': percentiles,
                                           'include': 'all',
                                       })
                results.append(res)
            pool.close()
            pool.join()
        result_cols = [res.get() for res in tqdm(results)]
        overall = pd.DataFrame(result_cols).T

        # export to result report file
        if not skip_export:
            overall.to_csv(os.path.join(self.output_path, 'overall.csv'))
            overall.to_markdown(os.path.join(self.output_path, 'overall.md'))

        return overall