Skip to content

Commit

Permalink
* use multiprocessing to speed up overall analysis
Browse files Browse the repository at this point in the history
+ support analysing DataFrame columns of list type
  • Loading branch information
HYLcool committed Nov 28, 2023
1 parent e704da2 commit 9c31b84
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
54 changes: 52 additions & 2 deletions data_juicer/analysis/overall_analysis.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import os
from multiprocessing import Pool

import pandas as pd
from loguru import logger
from tqdm import tqdm

from data_juicer.utils.constant import Fields


def _single_column_analysis(col, *args, **kwargs):
    """Summarise a single column through its ``describe`` method.

    This is the per-column task that ``OverallAnalysis.analyse`` submits to
    its process pool.

    :param col: the column to summarise; it must provide ``describe``
    :param args: positional arguments forwarded to ``col.describe``
    :param kwargs: keyword arguments forwarded to ``col.describe``
    :return: whatever ``col.describe`` returns
    """
    return col.describe(*args, **kwargs)


class OverallAnalysis:
"""Apply analysis on the overall stats, including mean, std, quantiles,
etc."""
Expand All @@ -23,18 +31,60 @@ def __init__(self, dataset, output_path):

# default percentiles to analyse
self.default_percentiles = [0.25, 0.5, 0.75]
# supported object types of a column to be analysed
# Notice: there won't be mixed types in a column because the stats are
# obtained from a Dataset, which doesn't allow mixed types.
# Notice: for now, a stats value can only be one of:
# {number, string, list of numbers, list of strings}
self.supported_object_types = {str, list}

def analyse(self, percentiles=[]):
def refine_single_column(self, col):
    """Prepare one column of stats so that ``describe`` can analyse it.

    Columns whose dtype is not ``object`` and columns of strings are
    returned unchanged. A column whose elements are lists is flattened with
    ``explode`` and its dtype is re-inferred. The element type is decided
    from the first element only.

    :param col: a pandas Series holding one column of stats
    :return: the Series to analyse, or None when the element type is not
        in ``self.supported_object_types``
    """
    if col.dtype != 'object':
        # not an object, return directly
        return col
    if col.empty:
        # there is no first element to inspect; hand the column back as is
        # instead of failing on the lookup below
        return col
    # if the type of this column is object, we can decide the actual type
    # according to the first element. Use positional access here: ``col[0]``
    # is a label lookup and raises KeyError when the index has no label 0
    # (e.g. after filtering or with a non-default index).
    first = col.iloc[0]
    if not isinstance(first, tuple(self.supported_object_types)):
        logger.warning(f'There is a column of stats with type '
                       f'[{type(first)}], which is not supported to be '
                       f'analysed for now.')
        return None
    if isinstance(first, str):
        # describe(include = 'all') can analyze the string type
        return col
    if isinstance(first, list):
        # flatten and infer the type
        return col.explode().infer_objects()
    # a supported type without a dedicated handling rule: not analysable
    return None

def analyse(self, percentiles=[], num_proc=1):
"""
Apply overall analysis on the whole dataset based on the describe
method of pandas.
:param percentiles: percentiles to analyse
:param num_proc: number of processes to analyse the dataset
:return: the overall analysis result.
"""
# merge default and customized percentiles and get overall information
percentiles = list(set(percentiles + self.default_percentiles))
overall = self.stats.describe(percentiles=percentiles, include='all')

results = []
pool = Pool(num_proc)
for col_name in self.stats.columns:
this_col = self.refine_single_column(self.stats[col_name])
res = pool.apply_async(_single_column_analysis,
kwds={
'col': this_col,
'percentiles': percentiles,
'include': 'all',
})
results.append(res)
pool.close()
pool.join()
result_cols = [res.get() for res in tqdm(results)]
overall = pd.DataFrame(result_cols).T

# export to result report file
overall.to_csv(os.path.join(self.output_path, 'overall.csv'))
Expand Down
2 changes: 1 addition & 1 deletion data_juicer/core/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def run(self, load_data_np=None):

logger.info('Applying overall analysis on stats...')
overall_analysis = OverallAnalysis(dataset, self.analysis_path)
self.overall_result = overall_analysis.analyse()
self.overall_result = overall_analysis.analyse(num_proc=self.cfg.np)

logger.info('Applying column-wise analysis on stats...')
column_wise_analysis = ColumnWiseAnalysis(
Expand Down

0 comments on commit 9c31b84

Please sign in to comment.