Skip to content

Commit

Permalink
[Feat] OP-wise Insight Mining (#516)
Browse files Browse the repository at this point in the history
* + add auto mode for analyzer: load all filters that produce stats to analyze the target dataset

* + add default mem_required for those model-based OPs

* - support wordcloud drawing for str or str list fields in stats
- support set the number of samples to be analyzed in auto mode. It's 1k in default.

* - take the minimum one of dataset length and auto num

* * update default export path

* * set version limit for wandb to avoid exception

* + add docs for auto mode

* + support t-test for Measure

* * fix some bugs

* - support analyze a dataset object
- optimize the logics of loading filters that produce stats and updating attributes of OPs

* - support analysis on tags in meta

* - support analysis with tagging OPs

* - move tags into the meta field

* - do not tell tags using their suffix
- suppress the error/exceptions in Monitor due to the termination of the main process
- exported stats file includes meta field in exporter

* - add insight mining

* * resolve the bugs when running insight mining in multiprocessing mode

* * update unittests

* * update unittests

* * update unittests

* * update readme for analyzer

* * use more detailed key

* + add reference
  • Loading branch information
HYLcool authored Dec 20, 2024
1 parent 2fdf484 commit b6f89a9
Show file tree
Hide file tree
Showing 27 changed files with 680 additions and 244 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,9 @@ dj-analyze --config configs/demo/analyzer.yaml
dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
```
- **Note:** Analyzer only compute stats of Filter ops. So extra Mapper or Deduplicator ops will be ignored in the analysis process.
- **Note:** Analyzer only computes stats for Filters that produce stats, or for other OPs that produce tags/categories in the meta field. All other OPs will be ignored in the analysis process. We use the following registries to decorate OPs:
- `NON_STATS_FILTERS`: decorate Filters that **DO NOT** produce any stats.
- `TAGGING_OPS`: decorate OPs that **DO** produce tags/categories in meta field.
### Data Visualization
Expand Down
4 changes: 3 additions & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ dj-analyze --config configs/demo/analyzer.yaml
dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
```

* **注意**:Analyzer 只计算 Filter 算子的状态,其他的算子(例如 Mapper 和 Deduplicator)会在分析过程中被忽略。
* **注意**:Analyzer 只作用于能在 stats 字段里产出统计信息的 Filter 算子,以及能在 meta 字段里产出 tags 或类别标签的其他算子。除此之外的算子会在分析过程中被忽略。我们使用以下两种注册器来装饰相关的算子:
* `NON_STATS_FILTERS`:装饰那些**不能**产出任何统计信息的 Filter 算子。
* `TAGGING_OPS`:装饰那些能在 meta 字段中产出 tags 或类别标签的算子。

### 数据可视化

Expand Down
24 changes: 14 additions & 10 deletions data_juicer/analysis/column_wise_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tqdm import tqdm
from wordcloud import WordCloud

from data_juicer.utils.constant import Fields
from data_juicer.utils.constant import DEFAULT_PREFIX, Fields

from .overall_analysis import OverallAnalysis

Expand Down Expand Up @@ -70,6 +70,12 @@ def __init__(self,
stats into one image file
"""
self.stats = pd.DataFrame(dataset[Fields.stats])
self.meta = pd.DataFrame(dataset[Fields.meta])
# remove non-tag columns
meta_columns = self.meta.columns
for col_name in meta_columns:
if not col_name.startswith(DEFAULT_PREFIX):
self.meta = self.meta.drop(col_name, axis=1)
self.output_path = output_path
if not os.path.exists(self.output_path):
os.makedirs(self.output_path)
Expand Down Expand Up @@ -101,8 +107,9 @@ def analyze(self, show_percentiles=False, show=False, skip_export=False):
width_unit = 4
height_unit = 6

columns = self.stats.columns
num = len(columns)
stats_and_meta = pd.concat([self.stats, self.meta], axis=1)
all_columns = stats_and_meta.columns
num = len(all_columns)

# get the recommended "best" number of columns and rows
rec_row, rec_col, grid_indexes = get_row_col(num, num_subcol)
Expand All @@ -115,9 +122,9 @@ def analyze(self, show_percentiles=False, show=False, skip_export=False):
fig = plt.figure(figsize=(rec_width, rec_height),
layout='constrained')
subfigs = fig.subfigures(rec_row, rec_col, wspace=0.01)
for i, column_name in enumerate(tqdm(columns.to_list(),
desc='Column')):
data = self.stats[column_name]
for i, column_name in enumerate(
tqdm(all_columns.to_list(), desc='Column')):
data = stats_and_meta[column_name]
# explode data to flatten inner list
data = data.explode().infer_objects()
grid = grid_indexes[i]
Expand Down Expand Up @@ -210,10 +217,7 @@ def draw_hist(self, ax, data, save_path, percentiles=None, show=False):
"""
# recommended number of bins
data_num = len(data)
if data_num >= 100:
rec_bins = int(math.sqrt(len(data)))
else:
rec_bins = None
rec_bins = max(int(math.sqrt(data_num)), 10)

# if ax is None, using plot method in pandas
if ax is None:
Expand Down
111 changes: 111 additions & 0 deletions data_juicer/analysis/measure.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import numpy as np

from data_juicer.utils.lazy_loader import LazyLoader

torch = LazyLoader('torch', 'torch')
td = LazyLoader('td', 'torch.distributions')
F = LazyLoader('F', 'torch.nn.functional')

stats = LazyLoader('stats', 'scipy.stats')


class Measure(object):
"""Base class for Measure distribution.
Expand Down Expand Up @@ -48,6 +52,15 @@ def _convert_to_categorical(self, p):
else:
return td.Categorical(torch.tensor(p))

def _convert_to_ndarray(self, p):
    """
    Convert input data to a numpy ndarray.

    Internally converts via ``_convert_to_tensor`` first, then calls
    ``.numpy()`` on the resulting torch tensor.

    :param p: input data, now support
        [`scalar`,`list`, `tuple`, `torch binary file`, and `Categorical`].
    :return: numpy ndarray converted from the intermediate torch tensor
    """
    return self._convert_to_tensor(p).numpy()


class KLDivMeasure(Measure):
"""
Expand Down Expand Up @@ -108,3 +121,101 @@ class EntropyMeasure(Measure):
def measure(self, p):
p = self._convert_to_categorical(p)
return p.entropy()


class RelatedTTestMeasure(Measure):
    """
    Measure T-Test for two related distributions on their histograms over
    the same bins.

    Ref:
    https://en.wikipedia.org/wiki/Student%27s_t-test

    For continuous features or distributions, the input could be dataset
    stats lists.
    For discrete features or distributions, the input could be the tags or
    the categories lists.
    """
    name = 't-test'

    @staticmethod
    def stats_to_hist(p, q):
        """
        Bin two lists of continuous stats into histograms that share the
        same bin edges, so the resulting counts are pairwise comparable.

        :param p: the first list of numeric stats.
        :param q: the second list of numeric stats.
        :return: a tuple (hist_p, hist_q, bin_edges).
        """
        p = np.array(p)
        q = np.array(q)

        # get common maximum number of data samples, and max/min values
        max_data_num = max(len(p), len(q))
        min_val = min(min(p), min(q))
        max_val = max(max(p), max(q))

        # get a recommended number of bins (at least 10)
        rec_bins = max(int(np.sqrt(max_data_num)), 10)

        if min_val == max_val:
            # degenerate case: every value in both inputs is identical.
            # Let numpy pick its default range; forcing a zero-width range
            # (or the former min/max-sentinel trick, which subtracted 1
            # from the first and last bins) would corrupt the counts here.
            hist_p, bin_edges = np.histogram(p, bins=rec_bins)
        else:
            # bin p over the common [min_val, max_val] range so both
            # histograms share the same bin edges. Equivalent to the
            # append-sentinels-then-subtract approach in the normal case,
            # without ever producing negative counts.
            hist_p, bin_edges = np.histogram(p,
                                             bins=rec_bins,
                                             range=(min_val, max_val))
        # get the hist of the original q using the common bin edges
        hist_q, _ = np.histogram(q, bins=bin_edges)
        return hist_p, hist_q, bin_edges

    @staticmethod
    def category_to_hist(p, q):
        """
        Convert two (possibly nested) lists of discrete tags/categories
        into aligned count histograms over their common category set.

        :param p: the first list of tags/categories.
        :param q: the second list of tags/categories.
        :return: a tuple (hist_p, hist_q, count_p, count_q, sorted_cat),
            where both hists are aligned on sorted_cat, which is ordered
            by descending counts in p.
        """

        def flatten_list(lst):
            # recursively flatten nested lists of tags into one flat list
            res = []
            for s in lst:
                if isinstance(s, list):
                    res.extend(flatten_list(s))
                else:
                    res.append(s)
            return res

        # flatten the list
        p = flatten_list(p)
        q = flatten_list(q)

        # get the common categories
        cat_p = set(p)
        cat_q = set(q)
        cat_common = cat_p.union(cat_q)

        # get category distributions over the common category set
        count_p = {cat: 0 for cat in cat_common}
        count_q = {cat: 0 for cat in cat_common}
        for cat in p:
            count_p[cat] += 1
        for cat in q:
            count_q[cat] += 1

        # keep distribution values sorted by counts in p (descending)
        sorted_cat = list(count_p.items())
        sorted_cat.sort(key=lambda it: it[1], reverse=True)
        sorted_cat = [it[0] for it in sorted_cat]
        # get the value dist aligned on the shared category order
        hist_p = [count_p[cat] for cat in sorted_cat]
        hist_q = [count_q[cat] for cat in sorted_cat]

        return hist_p, hist_q, count_p, count_q, sorted_cat

    def measure(self, p, q):
        """
        Run a related (paired) T-Test on the histograms of p and q.

        :param p: the first feature or distribution. (stats/tags/categories)
        :param q: the second feature or distribution. (stats/tags/categories)
        :return: the T-Test results object -- ([ref](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats._result_classes.TtestResult.html#scipy.stats._result_classes.TtestResult)) # noqa: E501
        """
        # inspect the first leaf element to decide whether the inputs are
        # discrete (str tags) or continuous (numeric stats)
        ele = p[0]
        while isinstance(ele, list):
            ele = ele[0]
        if isinstance(ele, str):
            # discrete tags or categories
            hist_p, hist_q = self.category_to_hist(p, q)[:2]
        else:
            # continuous stats
            hist_p, hist_q = self.stats_to_hist(p, q)[:2]

        # compute the t-test and pval for hist_p and hist_q
        ttest_res = stats.ttest_rel(hist_p, hist_q)
        return ttest_res
16 changes: 13 additions & 3 deletions data_juicer/analysis/overall_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from loguru import logger
from tqdm import tqdm

from data_juicer.utils.constant import Fields
from data_juicer.utils.constant import DEFAULT_PREFIX, Fields


def _single_column_analysis(col, *args, **kwargs):
Expand All @@ -25,6 +25,12 @@ def __init__(self, dataset, output_path):
:param output_path: path to store the analysis results.
"""
self.stats = pd.DataFrame(dataset[Fields.stats])
self.meta = pd.DataFrame(dataset[Fields.meta])
# remove non-tag columns
meta_columns = self.meta.columns
for col_name in meta_columns:
if not col_name.startswith(DEFAULT_PREFIX):
self.meta = self.meta.drop(col_name, axis=1)
self.output_path = output_path
if not os.path.exists(self.output_path):
os.makedirs(self.output_path)
Expand Down Expand Up @@ -71,10 +77,14 @@ def analyze(self, percentiles=[], num_proc=1, skip_export=False):
# merge default and customized percentiles and get overall information
percentiles = list(set(percentiles + self.default_percentiles))

# merge stats and meta
stats_and_meta = pd.concat([self.stats, self.meta], axis=1)
all_columns = stats_and_meta.columns

results = []
pool = Pool(num_proc)
for col_name in self.stats.columns:
this_col = self.refine_single_column(self.stats[col_name])
for col_name in all_columns:
this_col = self.refine_single_column(stats_and_meta[col_name])
res = pool.apply_async(_single_column_analysis,
kwds={
'col': this_col,
Expand Down
84 changes: 54 additions & 30 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None):
help='Number of samples extracted by tracer to show the dataset '
'difference before and after a op. Only available when '
'open_tracer is true.')
parser.add_argument(
'--open_insight_mining',
type=bool,
default=False,
help='Whether to open insight mining to trace the OP-wise stats/tags '
'changes during process. It might take more time when opening '
'insight mining.')
parser.add_argument(
'--op_list_to_mine',
type=List[str],
default=[],
help='Which OPs will be applied on the dataset to mine the insights '
'in their stats changes. Only those OPs that produce stats or '
'meta are valid. If it\'s empty, all OPs that produce stats and '
'meta will be involved. Only available when filter_list_to_mine '
'is true.')
parser.add_argument(
'--op_fusion',
type=bool,
Expand Down Expand Up @@ -513,13 +529,7 @@ def init_setup_from_cfg(cfg: Namespace):

# add all filters that produce stats
if cfg.auto:
import pkgutil

import data_juicer.ops.filter as djfilters
cfg.process = [{
filter_name: {}
} for _, filter_name, _ in pkgutil.iter_modules(djfilters.__path__)
if filter_name not in djfilters.NON_STATS_FILTERS]
cfg.process = load_ops_with_stats_meta()

# Apply text_key modification during initializing configs
# users can freely specify text_key for different ops using `text_key`
Expand All @@ -528,34 +538,48 @@ def init_setup_from_cfg(cfg: Namespace):
text_key = cfg.text_keys[0]
else:
text_key = cfg.text_keys
for op in cfg.process:
op_attrs = {
'text_key': text_key,
'image_key': cfg.image_key,
'audio_key': cfg.audio_key,
'video_key': cfg.video_key,
'num_proc': cfg.np,
'turbo': cfg.turbo,
}
cfg.process = update_op_attr(cfg.process, op_attrs)

return cfg


def load_ops_with_stats_meta():
    """
    Build the default process list for auto-analysis mode.

    Collects every Filter module that produces stats plus every OP that
    produces tags/categories in the meta field.

    :return: a list of ``{op_name: {}}`` dicts, one per qualified OP.
    """
    import pkgutil

    import data_juicer.ops.filter as djfilter
    from data_juicer.ops import NON_STATS_FILTERS, TAGGING_OPS

    # every Filter module, except those registered as producing no stats
    stats_filters = []
    for _, filter_name, _ in pkgutil.iter_modules(djfilter.__path__):
        if filter_name not in NON_STATS_FILTERS.modules:
            stats_filters.append({filter_name: {}})

    # OPs registered as producing tags/categories in the meta field
    meta_ops = [{op_name: {}} for op_name in TAGGING_OPS.modules]

    return stats_filters + meta_ops


def update_op_attr(op_list: list, attr_dict: dict = None):
    """
    Fill in missing or None attributes of each OP config with defaults.

    :param op_list: list of ``{op_name: args}`` dicts, where ``args`` is a
        dict of OP arguments or None.
    :param attr_dict: default attribute values to apply. If it's empty or
        None, op_list is returned unchanged.
    :return: the updated op list. The args dicts inside op_list are
        updated in place; each OP gets its own independent args dict.
    """
    if not attr_dict:
        return op_list
    updated_op_list = []
    for op in op_list:
        for op_name in op:
            args = op[op_name]
            if args is None:
                # copy the defaults so that multiple OPs never alias one
                # shared mutable dict (mutating one op's args must not
                # silently change another's)
                args = dict(attr_dict)
            else:
                # only fill attributes that are missing or None; values
                # explicitly set by the user are preserved
                for key in attr_dict:
                    if args.get(key) is None:
                        args[key] = attr_dict[key]
            op[op_name] = args
        updated_op_list.append(op)
    return updated_op_list


def _collect_config_info_from_class_docs(configurable_ops, parser):
Expand Down
Loading

0 comments on commit b6f89a9

Please sign in to comment.