[Feat] OP-wise Insight Mining #516

Merged: 24 commits, Dec 20, 2024
Changes from 22 commits
Commits (24)
083b665
+ add auto mode for analyzer: load all filters that produce stats to …
HYLcool Dec 12, 2024
662df5e
+ add default mem_required for those model-based OPs
HYLcool Dec 13, 2024
926c3da
- support wordcloud drawing for str or str list fields in stats
HYLcool Dec 13, 2024
27347c0
- take the minimum one of dataset length and auto num
HYLcool Dec 13, 2024
d19f92f
* update default export path
HYLcool Dec 13, 2024
fbd6726
* set version limit for wandb to avoid exception
HYLcool Dec 13, 2024
9f9f85b
+ add docs for auto mode
HYLcool Dec 13, 2024
566eb5b
+ support t-test for Measure
HYLcool Dec 16, 2024
7b8ee5c
* fix some bugs
HYLcool Dec 16, 2024
601d9a2
- support analyze a dataset object
HYLcool Dec 17, 2024
34f2ab6
- support analysis on tags in meta
HYLcool Dec 17, 2024
8531a01
- support analysis with tagging OPs
HYLcool Dec 17, 2024
4d6b701
- move tags into the meta field
HYLcool Dec 18, 2024
35aa6bd
- do not tell tags using their suffix
HYLcool Dec 18, 2024
85e1392
- add insight mining
HYLcool Dec 18, 2024
e3d7b8b
* resolve the bugs when running insight mining in multiprocessing mode
HYLcool Dec 19, 2024
3ca9994
Merge branch 'main' into feat/insight_mining
HYLcool Dec 19, 2024
16ca358
* update unittests
HYLcool Dec 20, 2024
dfb0bca
* update unittests
HYLcool Dec 20, 2024
f8b9539
* update unittests
HYLcool Dec 20, 2024
45259e5
* update readme for analyzer
HYLcool Dec 20, 2024
174ee05
Merge branch 'main' into feat/insight_mining
HYLcool Dec 20, 2024
51f53dc
* use more detailed key
HYLcool Dec 20, 2024
58001ca
+ add reference
HYLcool Dec 20, 2024
README.md: 4 changes (3 additions & 1 deletion)
@@ -340,7 +340,9 @@ dj-analyze --config configs/demo/analyzer.yaml
dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
```

- **Note:** Analyzer only compute stats of Filter ops. So extra Mapper or Deduplicator ops will be ignored in the analysis process.
- **Note:** Analyzer only computes stats for Filters that produce stats and for other OPs that produce tags/categories in meta, so all other OPs are ignored during analysis. We use the following registries to decorate these OPs (a minimal sketch follows this list):
- `NON_STATS_FILTERS`: decorate Filters that **DO NOT** produce any stats.
- `TAGGING_OPS`: decorate OPs that **DO** produce tags/categories in meta field.
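Below is a minimal sketch of how auto analysis could collect these OPs, assuming the two registries expose a `modules` mapping as they do in `data_juicer/config/config.py` later in this PR (the helper name `analyzable_op_names` is hypothetical):

```python
import pkgutil

import data_juicer.ops.filter as djfilter
from data_juicer.ops import NON_STATS_FILTERS, TAGGING_OPS


def analyzable_op_names():
    # Filters are analyzable unless registered as producing no stats.
    stats_filters = [
        name for _, name, _ in pkgutil.iter_modules(djfilter.__path__)
        if name not in NON_STATS_FILTERS.modules
    ]
    # Tagging OPs are analyzable through the tags they write into meta.
    return stats_filters + list(TAGGING_OPS.modules)
```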

### Data Visualization

README_ZH.md: 4 changes (3 additions & 1 deletion)
@@ -316,7 +316,9 @@ dj-analyze --config configs/demo/analyzer.yaml
dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
```

* **Note**: Analyzer only computes the stats of Filter OPs; other OPs (e.g. Mapper and Deduplicator) are ignored during analysis.
* **Note**: Analyzer only works with Filter OPs that produce stats in the stats field and with other OPs that produce tags or category labels in the meta field; all other OPs are ignored during analysis. We use the following two registries to decorate the relevant OPs:
* `NON_STATS_FILTERS`: decorates Filter OPs that **cannot** produce any stats.
* `TAGGING_OPS`: decorates OPs that produce tags or category labels in the meta field.

### 数据可视化

data_juicer/analysis/column_wise_analysis.py: 24 changes (14 additions & 10 deletions)
@@ -6,7 +6,7 @@
from tqdm import tqdm
from wordcloud import WordCloud

from data_juicer.utils.constant import Fields
from data_juicer.utils.constant import DEFAULT_PREFIX, Fields

from .overall_analysis import OverallAnalysis

@@ -70,6 +70,12 @@ def __init__(self,
stats into one image file
"""
self.stats = pd.DataFrame(dataset[Fields.stats])
self.meta = pd.DataFrame(dataset[Fields.meta])
# remove non-tag columns
meta_columns = self.meta.columns
for col_name in meta_columns:
if not col_name.startswith(DEFAULT_PREFIX):
self.meta = self.meta.drop(col_name, axis=1)
self.output_path = output_path
if not os.path.exists(self.output_path):
os.makedirs(self.output_path)
@@ -101,8 +107,9 @@ def analyze(self, show_percentiles=False, show=False, skip_export=False):
width_unit = 4
height_unit = 6

columns = self.stats.columns
num = len(columns)
stats_and_meta = pd.concat([self.stats, self.meta], axis=1)
all_columns = stats_and_meta.columns
num = len(all_columns)

# get the recommended "best" number of columns and rows
rec_row, rec_col, grid_indexes = get_row_col(num, num_subcol)
@@ -115,9 +122,9 @@
fig = plt.figure(figsize=(rec_width, rec_height),
layout='constrained')
subfigs = fig.subfigures(rec_row, rec_col, wspace=0.01)
for i, column_name in enumerate(tqdm(columns.to_list(),
desc='Column')):
data = self.stats[column_name]
for i, column_name in enumerate(
tqdm(all_columns.to_list(), desc='Column')):
data = stats_and_meta[column_name]
# explode data to flatten inner list
data = data.explode().infer_objects()
grid = grid_indexes[i]
@@ -210,10 +217,7 @@ def draw_hist(self, ax, data, save_path, percentiles=None, show=False):
"""
# recommended number of bins
data_num = len(data)
if data_num >= 100:
rec_bins = int(math.sqrt(len(data)))
else:
rec_bins = None
rec_bins = max(int(math.sqrt(data_num)), 10)

# if ax is None, using plot method in pandas
if ax is None:
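The hunks above merge tag columns from the meta field into the stats frame before analysis, keeping only meta columns whose names start with `DEFAULT_PREFIX`. A minimal sketch of that column selection on toy data (the column and field names below are made up for illustration):

```python
import pandas as pd

from data_juicer.utils.constant import DEFAULT_PREFIX

# toy meta records: only columns starting with DEFAULT_PREFIX count as tags
meta = pd.DataFrame([
    {f'{DEFAULT_PREFIX}language': 'en', 'source_file': 'a.jsonl'},
    {f'{DEFAULT_PREFIX}language': 'zh', 'source_file': 'b.jsonl'},
])
meta = meta[[c for c in meta.columns if c.startswith(DEFAULT_PREFIX)]]

# toy stats records
stats = pd.DataFrame([{'text_len': 120}, {'text_len': 87}])

# stats and tag columns are analyzed together, as in analyze() above
stats_and_meta = pd.concat([stats, meta], axis=1)
print(stats_and_meta.columns.tolist())
```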
data_juicer/analysis/measure.py: 108 changes (108 additions & 0 deletions)
@@ -1,9 +1,13 @@
import numpy as np

from data_juicer.utils.lazy_loader import LazyLoader

torch = LazyLoader('torch', 'torch')
td = LazyLoader('td', 'torch.distributions')
F = LazyLoader('F', 'torch.nn.functional')

stats = LazyLoader('stats', 'scipy.stats')


class Measure(object):
"""Base class for Measure distribution.
@@ -48,6 +52,15 @@ def _convert_to_categorical(self, p):
else:
return td.Categorical(torch.tensor(p))

def _convert_to_ndarray(self, p):
"""
Convert input data to a numpy ndarray.
:param p: input data, now supporting
[`scalar`,`list`, `tuple`, `torch binary file`, and `Categorical`].
:return: numpy ndarray
"""
return self._convert_to_tensor(p).numpy()


class KLDivMeasure(Measure):
"""
@@ -108,3 +121,98 @@ class EntropyMeasure(Measure):
def measure(self, p):
p = self._convert_to_categorical(p)
return p.entropy()


class RelatedTTestMeasure(Measure):
"""
Measure the t-test for two related distributions on their histograms over the
same bins.

For continuous features or distributions, the input can be a list of dataset
stats.
For discrete features or distributions, the input can be a list of tags or
categories.
"""
name = 't-test'

@staticmethod
def stats_to_hist(p, q):
p = np.array(p)
q = np.array(q)

# get common maximum number of data samples, and max/min values
max_data_num = max(len(p), len(q))
min_val = min(min(p), min(q))
max_val = max(max(p), max(q))

# get a recommended number of bins
rec_bins = max(int(np.sqrt(max_data_num)), 10)

# get the common bin edges
common_p = np.append(p, [min_val, max_val])
hist_p, bin_edges = np.histogram(common_p, bins=rec_bins)
# restore the hist of the original p
hist_p[0] -= 1
hist_p[-1] -= 1
# get the hist of the original q using the common bin edges
hist_q, _ = np.histogram(q, bins=bin_edges)
return hist_p, hist_q, bin_edges

@staticmethod
def category_to_hist(p, q):

def flatten_list(lst):
res = []
for s in lst:
if isinstance(s, list):
res.extend(flatten_list(s))
else:
res.append(s)
return res

# flatten the list
p = flatten_list(p)
q = flatten_list(q)

# get the common categories
cat_p = set(p)
cat_q = set(q)
cat_common = cat_p.union(cat_q)

# get category distributions
count_p = {cat: 0 for cat in cat_common}
count_q = {cat: 0 for cat in cat_common}
for cat in p:
count_p[cat] += 1
for cat in q:
count_q[cat] += 1

# only keep distribution values sorted by counts
sorted_cat = list(count_p.items())
sorted_cat.sort(key=lambda it: it[1], reverse=True)
sorted_cat = [it[0] for it in sorted_cat]
# get the value dist
hist_p = [count_p[cat] for cat in sorted_cat]
hist_q = [count_q[cat] for cat in sorted_cat]

return hist_p, hist_q, count_p, count_q, sorted_cat

def measure(self, p, q):
"""
:param p: the first feature or distribution. (stats/tags/categories)
:param q: the second feature or distribution. (stats/tags/categories)
:return: the T-Test results object -- ([ref](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats._result_classes.TtestResult.html#scipy.stats._result_classes.TtestResult)) # noqa: E501
"""
ele = p[0]
while isinstance(ele, list):
ele = ele[0]
if isinstance(ele, str):
# discrete tags or categories
hist_p, hist_q = self.category_to_hist(p, q)[:2]
else:
# continuous stats
hist_p, hist_q = self.stats_to_hist(p, q)[:2]

# compute the t-test and pval for hist_p and hist_q
ttest_res = stats.ttest_rel(hist_p, hist_q)
return ttest_res
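A hypothetical usage sketch of `RelatedTTestMeasure` on two lists of continuous stats (the values are made up; `statistic` and `pvalue` are attributes of scipy's `TtestResult` linked above):

```python
from data_juicer.analysis.measure import RelatedTTestMeasure

measure = RelatedTTestMeasure()

# toy stats of the same feature before and after applying an OP
before = [0.35, 0.42, 0.58, 0.61, 0.72, 0.44, 0.39, 0.55]
after = [0.36, 0.40, 0.57, 0.66, 0.70, 0.41, 0.38, 0.59]

res = measure.measure(before, after)
# a small pvalue suggests the two histograms differ significantly
print(res.statistic, res.pvalue)
```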
data_juicer/analysis/overall_analysis.py: 16 changes (13 additions & 3 deletions)
@@ -5,7 +5,7 @@
from loguru import logger
from tqdm import tqdm

from data_juicer.utils.constant import Fields
from data_juicer.utils.constant import DEFAULT_PREFIX, Fields


def _single_column_analysis(col, *args, **kwargs):
@@ -25,6 +25,12 @@ def __init__(self, dataset, output_path):
:param output_path: path to store the analysis results.
"""
self.stats = pd.DataFrame(dataset[Fields.stats])
self.meta = pd.DataFrame(dataset[Fields.meta])
# remove non-tag columns
meta_columns = self.meta.columns
for col_name in meta_columns:
if not col_name.startswith(DEFAULT_PREFIX):
self.meta = self.meta.drop(col_name, axis=1)
self.output_path = output_path
if not os.path.exists(self.output_path):
os.makedirs(self.output_path)
@@ -71,10 +77,14 @@ def analyze(self, percentiles=[], num_proc=1, skip_export=False):
# merge default and customized percentiles and get overall information
percentiles = list(set(percentiles + self.default_percentiles))

# merge stats and meta
stats_and_meta = pd.concat([self.stats, self.meta], axis=1)
all_columns = stats_and_meta.columns

results = []
pool = Pool(num_proc)
for col_name in self.stats.columns:
this_col = self.refine_single_column(self.stats[col_name])
for col_name in all_columns:
this_col = self.refine_single_column(stats_and_meta[col_name])
res = pool.apply_async(_single_column_analysis,
kwds={
'col': this_col,
data_juicer/config/config.py: 84 changes (54 additions & 30 deletions)
@@ -290,6 +290,22 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None):
help='Number of samples extracted by tracer to show the dataset '
'difference before and after a op. Only available when '
'open_tracer is true.')
parser.add_argument(
'--open_insight_mining',
type=bool,
default=False,
help='Whether to open insight mining to trace the OP-wise stats/tags '
'changes during processing. It might take more time when insight '
'mining is enabled.')
parser.add_argument(
'--op_list_to_mine',
type=List[str],
default=[],
help='Which OPs will be applied on the dataset to mine the insights '
'in their stats changes. Only those OPs that produce stats or '
'meta are valid. If it\'s empty, all OPs that produce stats and '
'meta will be involved. Only available when open_insight_mining '
'is true.')
parser.add_argument(
'--op_fusion',
type=bool,
@@ -513,13 +529,7 @@ def init_setup_from_cfg(cfg: Namespace):

# add all filters that produce stats
if cfg.auto:
import pkgutil

import data_juicer.ops.filter as djfilters
cfg.process = [{
filter_name: {}
} for _, filter_name, _ in pkgutil.iter_modules(djfilters.__path__)
if filter_name not in djfilters.NON_STATS_FILTERS]
cfg.process = load_ops_with_stats_meta()

# Apply text_key modification during initializing configs
# users can freely specify text_key for different ops using `text_key`
@@ -528,34 +538,48 @@
text_key = cfg.text_keys[0]
else:
text_key = cfg.text_keys
for op in cfg.process:
op_attrs = {
'text_key': text_key,
'image_key': cfg.image_key,
'audio_key': cfg.audio_key,
'video_key': cfg.video_key,
'num_proc': cfg.np,
'turbo': cfg.turbo,
}
cfg.process = update_op_attr(cfg.process, op_attrs)

return cfg


def load_ops_with_stats_meta():
import pkgutil

import data_juicer.ops.filter as djfilter
from data_juicer.ops import NON_STATS_FILTERS, TAGGING_OPS
stats_filters = [{
filter_name: {}
} for _, filter_name, _ in pkgutil.iter_modules(djfilter.__path__)
if filter_name not in NON_STATS_FILTERS.modules]
meta_ops = [{op_name: {}} for op_name in TAGGING_OPS.modules]
return stats_filters + meta_ops


def update_op_attr(op_list: list, attr_dict: dict = None):
if not attr_dict:
return op_list
updated_op_list = []
for op in op_list:
for op_name in op:
args = op[op_name]
if args is None:
args = {
'text_key': text_key,
'image_key': cfg.image_key,
'audio_key': cfg.audio_key,
'video_key': cfg.video_key,
'num_proc': cfg.np,
'turbo': cfg.turbo,
}
args = attr_dict
else:
if 'text_key' not in args or args['text_key'] is None:
args['text_key'] = text_key
if 'image_key' not in args or args['image_key'] is None:
args['image_key'] = cfg.image_key
if 'audio_key' not in args or args['audio_key'] is None:
args['audio_key'] = cfg.audio_key
if 'video_key' not in args or args['video_key'] is None:
args['video_key'] = cfg.video_key
if 'num_proc' not in args or args['num_proc'] is None:
args['num_proc'] = cfg.np
if 'turbo' not in args or args['turbo'] is None:
args['turbo'] = cfg.turbo
for key in attr_dict:
if key not in args or args[key] is None:
args[key] = attr_dict[key]
op[op_name] = args

return cfg
updated_op_list.append(op)
return updated_op_list


def _collect_config_info_from_class_docs(configurable_ops, parser):
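A hypothetical sketch of how the two helpers above compose in auto mode, assuming they are importable from `data_juicer.config.config` (the default values and example OP names are illustrative only):

```python
from data_juicer.config.config import load_ops_with_stats_meta, update_op_attr

# 1. collect every Filter that produces stats plus every tagging OP
process = load_ops_with_stats_meta()
# e.g. [{'alphanumeric_filter': {}}, ..., {'language_id_score_filter': {}}, ...]

# 2. push shared dataset keys and runtime attrs into each OP's args
defaults = {
    'text_key': 'text',
    'image_key': 'images',
    'audio_key': 'audios',
    'video_key': 'videos',
    'num_proc': 4,
    'turbo': False,
}
process = update_op_attr(process, defaults)
# OPs with empty args receive all defaults; explicitly set values are kept
```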