Skip to content

Commit

Permalink
- implemented an auto-hpo tool for data-recipes based on 3-sigma tool
Browse files Browse the repository at this point in the history
- added a helper class to build the mapping from OPs' op_name to their used stats_key
- added a config exporter
  • Loading branch information
yxdyc committed Dec 18, 2023
1 parent 48c081e commit 74e7236
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 14 deletions.
45 changes: 44 additions & 1 deletion data_juicer/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from data_juicer.utils.logger_utils import setup_logger
from data_juicer.utils.mm_utils import SpecialTokens

# Module-level singletons: populated by init_configs() on a successful parse
# and read back by export_config() to reuse the customized type parser.
global_cfg = None
global_parser = None


def init_configs(args=None):
"""
Expand All @@ -37,6 +40,11 @@ def init_configs(args=None):
type=str,
help='Path to a configuration file when using auto-HPO tool.',
required=False)
parser.add_argument(
'--path_3sigma_recipe',
type=str,
help='Path to save a configuration file when using 3-sigma tool.',
required=False)

# basic global paras with extended type hints
# e.g., files can be mode include flags
Expand Down Expand Up @@ -294,6 +302,10 @@ def init_configs(args=None):
# show the final config tables before the process started
display_config(cfg)

global global_cfg, global_parser
global_cfg = cfg
global_parser = parser

return cfg
except ArgumentError:
logger.error('Config initialization failed')
Expand Down Expand Up @@ -371,7 +383,7 @@ def init_setup_from_cfg(cfg):
f'variable HF_DATASETS_CACHE.')
config.HF_DATASETS_CACHE = cfg.ds_cache_dir
else:
cfg.ds_cache_dir = config.HF_DATASETS_CACHE
cfg.ds_cache_dir = str(config.HF_DATASETS_CACHE)

# if there is suffix_filter op, turn on the add_suffix flag
cfg.add_suffix = False
Expand Down Expand Up @@ -478,6 +490,37 @@ def display_config(cfg):
print(table)


def export_config(cfg, path, format='yaml', skip_none=True, skip_check=True,
                  overwrite=False, multifile=True, branch=None):
    """
    Save the config object to the given path via the global parser.

    Some params are forwarded to jsonargparse's ``ArgumentParser.save``.

    :param cfg: cfg object to save (Namespace type)
    :param path: the save path
    :param format: 'yaml', 'json', 'json_indented', 'parser_mode'
    :param skip_none: Whether to exclude entries whose value is None.
    :param skip_check: Whether to skip parser checking.
    :param overwrite: Whether to overwrite existing files.
    :param multifile: Whether to save multiple config files
        by using the __path__ metas.
    :param branch: Config branch to save; passed through to
        ``ArgumentParser.save``. None saves the whole config.
    :return: None
    """
    # remove ops outside the process list for better displaying
    cfg_to_export = cfg.clone()
    for op in OPERATORS.modules.keys():
        # use a default so an op key absent from this cfg doesn't raise
        cfg_to_export.pop(op, None)

    global global_parser
    if not global_parser:
        init_configs()  # enable the customized type parser
    global_parser.save(
        cfg=cfg_to_export, path=path, format=format, skip_none=skip_none,
        skip_check=skip_check, overwrite=overwrite, multifile=multifile,
        branch=branch)

    logger.info(f'Saved the configuration in {path}')


def merge_config(ori_cfg, new_cfg: Dict):
"""
Merge configuration from new_cfg into ori_cfg
Expand Down
71 changes: 70 additions & 1 deletion data_juicer/utils/constant.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import copy
import inspect
import os

from loguru import logger

DEFAULT_PREFIX = '__dj__'


Expand All @@ -8,7 +14,66 @@ class Fields(object):
suffix = DEFAULT_PREFIX + 'suffix__'


class StatsKeys(object):
class StatsKeysMeta(type):
    """
    A helper metaclass to track the mapping from an OP's name to the
    stats_keys it uses.

    e.g., once the AlphanumericFilter's compute_stats method has been called:

        res = StatsKeys.get_access_log()
        print(res)  # {"alphanumeric_filter": {"alnum_ratio", ...}}
    """

    # mapping from the accessing module's name to the set of stats keys
    # it has looked up through this metaclass
    _accessed_by = {}

    def __getattr__(cls, attr):
        # resolve the immediate caller's module name so the access can be
        # attributed to the OP that performed it
        caller_class = inspect.currentframe().f_back.f_globals['__name__']
        # no need to track the parent classes
        caller_class = caller_class.split('.')[-1]
        stat_key = getattr(cls._constants_class, attr)
        # set.add is idempotent, so no membership check is needed
        cls._accessed_by.setdefault(caller_class, set()).add(stat_key)
        return stat_key

    def get_access_log(cls, dj_cfg=None):
        """
        Return the accumulated {caller_name: {stats_key, ...}} access log.

        If nothing has been recorded yet (e.g. the compute_stats calls were
        skipped due to caching) and a config is given, run a dry-run
        analysis over a one-sample copy of the dataset to populate the log.

        :param dj_cfg: optional Data-Juicer config used for the dry run.
        :return: dict mapping caller names to sets of accessed stats keys.
        """
        if cls._accessed_by:
            return cls._accessed_by
        elif dj_cfg:
            tmp_dj_cfg = copy.deepcopy(dj_cfg)
            # the access has been skipped due to the use of cache
            # we will use a temp data sample to get the access log
            if dj_cfg.dataset_path and \
                    os.path.exists(dj_cfg.dataset_path) and \
                    'jsonl' in dj_cfg.dataset_path:
                logger.info(
                    'Begin to track the usage of ops with a dummy data sample')

                # load the first line as tmp_data
                tmp_f_name = dj_cfg.dataset_path.\
                    replace('.jsonl', '.tmp.jsonl')
                with open(dj_cfg.dataset_path, 'r') as orig_file, \
                        open(tmp_f_name, 'w') as tmp_file:
                    first_line = orig_file.readline()
                    tmp_file.write(first_line)

                tmp_dj_cfg.dataset_path = tmp_f_name
                tmp_dj_cfg.use_cache = False
                tmp_dj_cfg.use_checkpoint = False

                from data_juicer.core import Analyser
                try:
                    tmp_analyzer = Analyser(tmp_dj_cfg)
                    tmp_analyzer.run()
                finally:
                    # always clean up the temporary one-sample file,
                    # even if the dry-run analysis fails
                    os.remove(tmp_f_name)
            else:
                raise NotImplementedError(
                    f'For now, the dummy data is supported for only jsonl '
                    f'type. Please check your config as '
                    f'{dj_cfg.dataset_path} either does not exist or is '
                    f'not in jsonl format.')

        return cls._accessed_by


class StatsKeysConstant(object):
# text
alpha_token_ratio = 'alpha_token_ratio'
alnum_ratio = 'alnum_ratio'
Expand Down Expand Up @@ -41,6 +106,10 @@ class StatsKeys(object):
image_text_matching_score = 'image_text_matching_score'


# Public accessor for stats keys: attribute reads are proxied through
# StatsKeysMeta.__getattr__, so every access is recorded per caller.
class StatsKeys(object, metaclass=StatsKeysMeta):
    # the plain constants container whose attributes are looked up
    _constants_class = StatsKeysConstant


class HashKeys(object):
hash = DEFAULT_PREFIX + 'hash'
minhash = DEFAULT_PREFIX + 'minhash'
Expand Down
29 changes: 23 additions & 6 deletions tools/hpo/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
# Hyper-parameter Optimization for Data Recipe

## Auto-HPO
## Auto-HPO based on 3-Sigma principles
A simple automatic hyper-parameter optimization method for data recipes is to assume that outlier data is harmful to training.
We can thus introduce the 3-sigma principle to automatically determine the hyper-parameters and filter the data.

Specifically, assuming that a certain analysis dimension of the original data obeys a normal distribution and has random errors, we can set the upper and lower bounds of the filtering OP in this dimension to within three standard deviations of the mean, based on the statistics produced by Data-Juicer's Analyzer.

$$P(|x-\mu| > 3\sigma) \leq 0.003$$

To automate this process, we provide a tool that can be used as follows:
```shell
# cd tools/hpo
python execute_hpo_3sigma.py --config <data-process-cfg-file-path>

#e.g.,
python execute_hpo_3sigma.py --config configs/process.yaml
```

## Auto-HPO with WandB

We incorporate an automated HPO tool, WandB [Sweep](https://docs.wandb.ai/guides/sweeps), into Data-Juicer to streamline the finding of good data processing hyper-parameters.
With this tool, users can investigate correlations and importance scores of
Expand All @@ -11,7 +28,7 @@ a large room to explore. Feel free to provide more suggestions, discussion,
and contribution via new PRs!


## Prerequisite
### Prerequisite
You need to install data-juicer first.
Besides, the tool leverages WandB, install it via `pip install wandb`.
Before using this tool, you need to run
Expand All @@ -26,17 +43,17 @@ wandb login --host <URL of your wandb instance>



## Usage and Customization
### Usage and Customization

Given a data recipe, characterized by specified configuration file
`<data-process-cfg-file-path>`, you can use `execute_hpo.py` to search the
`<data-process-cfg-file-path>`, you can use `execute_hpo_wandb.py` to search the
hyper-parameter space defined by `<hpo-cfg-file-path>`.
```shell
# cd tools/hpo
python execute_hpo.py --config <data-process-cfg-file-path> --hpo_config <hpo-cfg-file-path>
python execute_hpo_wandb.py --config <data-process-cfg-file-path> --hpo_config <hpo-cfg-file-path>

# e.g.,
python execute_hpo.py --config configs/process.yaml --hpo_config configs/quality_score_hpo.yaml
python execute_hpo_wandb.py --config configs/process.yaml --hpo_config configs/quality_score_hpo.yaml
```

For the configuration for data recipe, i.e., `<data-process-cfg-file-path>`,
Expand Down
33 changes: 27 additions & 6 deletions tools/hpo/README_ZH.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
# 数据菜谱的自动化超参优化

## Auto-HPO
## 基于3-Sigma原则进行Auto-HPO
一种简单的数据菜谱自动调参方法是假设outlier数据对训练有害,那么我们可以引入3-sigma原则来自动确定超参,过滤数据。具体来说,假设原始数据的某个分析维度服从正态分布且存在随机误差,我们可以基于Analyzer产出的stats,在该维度上
把算子过滤的上下界设为三倍标准差。

$$P(|x-\mu| > 3\sigma) \leq 0.003$$

为了自动化该过程,我们提供了相应工具:
```shell
# cd tools/hpo
# usage 1: do not save the refined recipe
python execute_hpo_3sigma.py --config <data-process-cfg-file-path>
# usage 2: save the refined recipe at the given path
python execute_hpo_3sigma.py --config <data-process-cfg-file-path> --path_3sigma_recipe <data-process-cfg-file-after-refined-path>

# e.g., usage 1
python execute_hpo_3sigma.py --config configs/process.yaml
# e.g., usage 2
python execute_hpo_3sigma.py --config configs/process.yaml --path_3sigma_recipe configs/process_3sigma.yaml
```


## 基于WandB进行Auto-HPO

我们将自动化 HPO (hyper-parameters optimization) 工具 WandB [Sweep](https://docs.wandb.ai/guides/sweeps) 结合到
Data-Juicer 中,以简化改良数据处理超参数的过程。
Expand All @@ -11,7 +32,7 @@ Data-Juicer 中,以简化改良数据处理超参数的过程。
并通过新的 PR 做出贡献!


## 前置条件
### 前置条件
您需要先安装 data-juicer。
此外,该工具利用了 WandB,通过`pip install wandb`安装它。
在使用此工具之前,您需要运行`wandb login`并输入您的 WandB
Expand All @@ -25,17 +46,17 @@ wandb login --host <URL of your wandb instance>



## 使用和定制化
### 使用和定制化

给定一个数据配方,以指定的配置文件所定义`<data-process-cfg-file-path>`,您可以使用 `execute_hpo.py` 来搜索
给定一个数据配方,以指定的配置文件所定义`<data-process-cfg-file-path>`,您可以使用 `execute_hpo_wandb.py` 来搜索
`<hpo-cfg-file-path>`定义的超参数空间。

```shell
# cd tools/hpo
python execute_hpo.py --config <data-process-cfg-file-path> --hpo_config <hpo-cfg-file-path>
python execute_hpo_wandb.py --config <data-process-cfg-file-path> --hpo_config <hpo-cfg-file-path>

# e.g.,
python execute_hpo.py --config configs/process.yaml --hpo_config configs/quality_score_hpo.yaml
python execute_hpo_wandb.py --config configs/process.yaml --hpo_config configs/quality_score_hpo.yaml
```

对于数据菜谱的配置,即`<data-process-cfg-file-path>`
Expand Down
69 changes: 69 additions & 0 deletions tools/hpo/execute_hpo_3sigma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import copy
import sys

from loguru import logger

from data_juicer.config import export_config, init_configs
from data_juicer.core import Analyser, Executor
from data_juicer.utils.constant import StatsKeys


@logger.catch
def main():
    """
    Refine a data recipe with the 3-sigma rule, then process the data.

    Workflow:
        1. analyze the dataset with the given initial recipe;
        2. for each filter OP, set its 'min'/'max' hyper-parameters to
           mean -/+ 3 * std of the corresponding stats from the Analyser;
        3. optionally export the refined recipe, then run the processing.
    """
    # --path_3sigma_recipe is also registered in init_configs; scan argv
    # directly here so the export path is known regardless of parse order
    path_3sigma_recipe = None
    for i in range(len(sys.argv) - 1):
        if sys.argv[i] == '--path_3sigma_recipe':
            path_3sigma_recipe = sys.argv[i + 1]

    # 1. analyze using the given initial recipe
    cfg = init_configs()
    logger.info('Begin to analyze data using the given initial recipe')

    analyser = Analyser(cfg)
    analyser.run()
    df = analyser.overall_result
    # get the mapping from stats keys to their mu and sigma
    mean_series = df[df.index == 'mean']
    stats_key_to_mean = mean_series.iloc[0, :].to_dict()
    std_series = df[df.index == 'std']
    stats_key_to_std = std_series.iloc[0, :].to_dict()

    # 2. adjust the hyper-parameters of the given recipe with 3-sigma rule
    logger.info('Begin to modify the recipe with 3-sigma rule')
    op_name_to_stats_key = StatsKeys.get_access_log(dj_cfg=cfg)
    for process in cfg.process:
        op_name, args = list(process.items())[0]
        temp_args = copy.deepcopy(args)
        # OPs that produce no stats (e.g. mappers) have no entry in the
        # access log and are left unchanged
        stats_keys = op_name_to_stats_key.get(op_name, set())
        for stats_key in stats_keys:
            if stats_key in stats_key_to_mean:
                for arg_name in temp_args.keys():
                    new_val = None
                    if 'min' in arg_name:
                        new_val = stats_key_to_mean[stats_key] - \
                            3 * stats_key_to_std[stats_key]
                    if 'max' in arg_name:
                        new_val = stats_key_to_mean[stats_key] + \
                            3 * stats_key_to_std[stats_key]
                    if new_val is not None:
                        logger.info(f'Using 3-sigma rule, changed para '
                                    f'{arg_name}={args[arg_name]} into '
                                    f'{arg_name}={new_val}')
                        args[arg_name] = new_val

    if path_3sigma_recipe:
        export_config(cfg, path_3sigma_recipe)

    # 3. process the data using the refined recipe
    logger.info('Begin to process the data with refined recipe')
    if cfg.executor_type == 'default':
        executor = Executor(cfg)
    elif cfg.executor_type == 'ray':
        from data_juicer.core.ray_executor import RayExecutor
        executor = RayExecutor(cfg)
    else:
        # fail fast with a clear error instead of an UnboundLocalError on
        # the executor.run() call below
        raise NotImplementedError(
            f'Unsupported executor_type: {cfg.executor_type}. '
            f'Expected "default" or "ray".')
    executor.run()


if __name__ == '__main__':
    main()
File renamed without changes.

0 comments on commit 74e7236

Please sign in to comment.