implement data-model sandbox, with refactoring existing DJ's features and tools #291

Merged · 17 commits · Apr 22, 2024
Changes from 15 commits
16 changes: 15 additions & 1 deletion configs/config_all.yaml
@@ -10,7 +10,7 @@ export_path: '/path/to/result/dataset.jsonl' # path to processed
export_shard_size: 0 # shard size of the exported dataset in Bytes. By default it's 0, which means exporting the whole dataset into a single file. If it's set to a positive number, the exported dataset will be split into several shards, and the max size of each shard won't be larger than export_shard_size
export_in_parallel: false # whether to export the result dataset in parallel to a single file, which usually takes less time. It only works when export_shard_size is 0, and its default number of processes is the same as the argument np. **Notice**: if it's True, exporting in parallel might sometimes take much more time due to IO blocking, especially for very large datasets. When this happens, False is a better choice, although it takes more time.
np: 4 # number of subprocesses to process your dataset
- text_keys: 'content' # the key name of the field storing the sample texts to be processed, e.g., `text`, `instruction`, `output`, ...
+ text_keys: 'text' # the key name of the field storing the sample texts to be processed, e.g., `text`, `instruction`, `output`, ...
# Note: currently, we support specifying only ONE key for each op; for cases requiring multiple keys, users can specify the op multiple times. We will only use the first key of `text_keys` when you set multiple keys.
suffixes: [] # the suffix of files that will be read. For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']
use_cache: true # whether to use the cache management of Hugging Face datasets. It might take up lots of disk space when using cache
@@ -22,6 +22,8 @@ op_list_to_trace: [] # only ops in this l
trace_num: 10 # number of samples to show the differences between datasets before and after each op. Only available when tracer is opened.
op_fusion: false # whether to fuse operators that share the same intermediate variables automatically. Op fusion might slightly reduce memory requirements and speed up the whole process.
cache_compress: null # the compression method of the cache file, which can be one of ['gzip', 'zstd', 'lz4']. If this parameter is None, the cache file will not be compressed. We recommend turning this argument on when your input dataset is larger than tens of GB and your disk space is insufficient.
keep_stats_in_res_ds: false # whether to keep the computed stats in the result dataset. The intermediate fields storing the stats computed by Filters will be removed if it's False. It's False by default.
keep_hashes_in_res_ds: false # whether to keep the computed hashes in the result dataset. The intermediate fields storing the hashes computed by Deduplicators will be removed if it's False. It's False by default.

# for multimodal data processing
image_key: 'images' # key name of field to store the list of sample image paths.
@@ -40,6 +42,18 @@ ray_address: auto # the address of the
# only for data analysis
save_stats_in_one_file: false # whether to store all stats result into one file

# for sandbox or hpo
model_infer_config: null # path or dict to model inference configuration file when calling model executor in sandbox. Related hooks will be disabled if it's not specified.
model_train_config: null # path or dict to model training configuration file when calling model executor in sandbox. Related hooks will be disabled if it's not specified.
model_eval_config: null # path or dict to model evaluation configuration file when calling model executor in sandbox. Related hooks will be disabled if it's not specified.
data_eval_config: null # path or dict to data evaluation configuration file when calling model executor in sandbox. Related hooks will be disabled if it's not specified.
data_probe_algo: 'uniform' # sampling algorithm for the dataset. Should be one of ["uniform", "frequency_specified_field_selector", "topk_specified_field_selector"]. It's "uniform" by default. Only used for dataset sampling.
data_probe_ratio: 1.0 # the sampling ratio relative to the original dataset size. It's 1.0 by default. Only used for dataset sampling.
path_k_sigma_recipe: null # path to save a configuration file when using the k-sigma tool to refine processing recipes
path_model_feedback_recipe: null # path to save a configuration file refined by model feedback
hpo_config: null # path to a configuration file when using the auto-HPO tool.


# process schedule: a list of several process operators with their arguments
process:
# Mapper ops. Most of these ops need no arguments.
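For context, the new keep_stats_in_res_ds / keep_hashes_in_res_ds flags control intermediate per-sample fields. A hedged sketch of what such a sample might look like (field names are assumptions based on DJ's Fields / HashKeys constants; exact names may differ):

# Illustrative only: field names are assumptions based on DJ's constants.
sample = {
    'text': 'some processed text ...',
    '__dj__stats__': {'lang': 'zh', 'lang_score': 0.91},  # written by Filters
    '__dj__hash': '5f2b...',                              # written by Deduplicators
}
# With both flags left false, these intermediate fields are stripped from
# the exported result dataset.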
1 change: 1 addition & 0 deletions configs/demo/sandbox/gpt3_data_quality_eval_config.yaml
@@ -0,0 +1 @@
type: dj_text_quality_classifier
26 changes: 26 additions & 0 deletions configs/demo/sandbox/gpt3_extra_train_config.json
@@ -0,0 +1,26 @@
{
"type": "modelscope",
"model_name": "iic/nlp_gpt3_text-generation_chinese-base",
"trainer_name": "nlp-base-trainer",
"key_remapping": {
"text": "src_txt"
},
"train": {
"max_epochs": 3,
"lr_scheduler": {
"type": "StepLR",
"step_size": 2,
"options": {
"by_epoch": false
}
},
"optimizer": {
"type": "AdamW",
"lr": 3e-4
},
"dataloader": {
"batch_size_per_gpu": 2,
"workers_per_gpu": 0
}
}
}
18 changes: 18 additions & 0 deletions configs/demo/sandbox/gpt3_extra_train_config.yaml
@@ -0,0 +1,18 @@
type: modelscope
model_name: "iic/nlp_gpt3_text-generation_chinese-base"
trainer_name: "nlp-base-trainer"
key_remapping:
text: "src_txt"
train:
max_epochs: 2
lr_scheduler:
type: "StepLR"
step_size: 2
options:
by_epoch: false
optimizer:
type: "AdamW"
lr: 0.0003
dataloader:
batch_size_per_gpu: 2
workers_per_gpu: 0
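The key_remapping block maps DJ's sample fields to the names the ModelScope trainer expects. A minimal sketch of the intended renaming (illustrative only; the actual hook implementation may differ):

# Illustrative sketch of key remapping; not the hook's actual code.
key_remapping = {'text': 'src_txt'}

def remap_keys(sample: dict) -> dict:
    """Rename sample fields according to key_remapping, keeping the rest."""
    return {key_remapping.get(k, k): v for k, v in sample.items()}

print(remap_keys({'text': '今天天气真好', 'meta': {}}))
# -> {'src_txt': '今天天气真好', 'meta': {}}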
27 changes: 27 additions & 0 deletions configs/demo/sandbox/sandbox.yaml
@@ -0,0 +1,27 @@
# Sandbox config example for dataset

# global parameters
project_name: 'demo-sandbox'
dataset_path: './demos/data/demo-dataset.jsonl' # path to your dataset directory or file
np: 4 # number of subprocesses to process your dataset

export_path: './outputs/demo-sandbox/demo-sandbox.jsonl'

# sandbox configs
# for refining recipe using k-sigma rules
path_k_sigma_recipe: './outputs/demo-sandbox/k_sigma_new_recipe.yaml'

# for gpt3 quality classifier as data evaluator
data_eval_config: 'configs/demo/sandbox/gpt3_data_quality_eval_config.yaml'
#data_eval_config:
# type: dj_text_quality_classifier

# for gpt3 model training
model_train_config: 'configs/demo/sandbox/gpt3_extra_train_config.json'

# process schedule
# a list of several process operators with their arguments
process:
- language_id_score_filter:
lang: 'zh'
min_score: 0.5
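A config like this drives one sandbox pipeline run. A minimal launch sketch, assuming a SandBoxExecutor-style entry point in data_juicer.core.sandbox.pipelines (the class name and module path are assumptions based on this PR's layout):

# A minimal sketch; SandBoxExecutor is an assumed name from this PR's layout.
from data_juicer.config import init_configs
from data_juicer.core.sandbox.pipelines import SandBoxExecutor  # assumed

cfg = init_configs(args=['--config', 'configs/demo/sandbox/sandbox.yaml'])
sandbox = SandBoxExecutor(cfg)  # wires probe / refine / train / eval hooks
sandbox.run()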
59 changes: 54 additions & 5 deletions data_juicer/config/config.py
@@ -6,7 +6,7 @@

from jsonargparse import (ActionConfigFile, ArgumentParser, dict_to_namespace,
namespace_to_dict)
-from jsonargparse.typing import NonNegativeInt, PositiveInt
+from jsonargparse.typing import ClosedUnitInterval, NonNegativeInt, PositiveInt
from loguru import logger

from data_juicer.ops.base_op import OPERATORS
@@ -32,7 +32,7 @@ def init_configs(args=None):

parser.add_argument('--config',
action=ActionConfigFile,
- help='Path to a configuration file.',
+ help='Path to a dj basic configuration file.',
required=True)

parser.add_argument(
@@ -41,9 +41,58 @@
help='Path to a configuration file when using auto-HPO tool.',
required=False)
parser.add_argument(
- '--path_3sigma_recipe',
+ '--path_k_sigma_recipe',
type=str,
- help='Path to save a configuration file when using 3-sigma tool.',
+ help='Path to save a configuration file when using k-sigma tool.',
required=False)
parser.add_argument(
'--path_model_feedback_recipe',
type=str,
help='Path to save a configuration file refined by model feedback.',
required=False)
parser.add_argument(
'--model_infer_config',
type=Union[str, dict],
help='Path or a dict to model inference configuration file when '
'calling model executor in sandbox. If not specified, the model '
'inference related hooks will be disabled.',
required=False)
parser.add_argument(
'--model_train_config',
type=Union[str, dict],
help='Path or a dict to model training configuration file when '
'calling model executor in sandbox. If not specified, the model '
'training related hooks will be disabled.',
required=False)
parser.add_argument(
'--data_eval_config',
type=Union[str, dict],
help='Path or a dict to eval configuration file when calling '
'auto-evaluator for data in sandbox. '
'If not specified, the eval related hooks will be disabled.',
required=False)
parser.add_argument(
'--model_eval_config',
type=Union[str, dict],
help='Path or a dict to eval configuration file when calling '
'auto-evaluator for model in sandbox. '
'If not specified, the eval related hooks will be disabled.',
required=False)
parser.add_argument(
'--data_probe_algo',
type=str,
default='uniform',
help='Sampling algorithm to use. Options are "uniform", '
'"frequency_specified_field_selector", or '
'"topk_specified_field_selector". Default is "uniform". Only '
'used for dataset sampling',
required=False)
parser.add_argument(
'--data_probe_ratio',
type=ClosedUnitInterval,
default=1.0,
help='The ratio of the sample size to the original dataset size. '
'Default is 1.0 (no sampling). Only used for dataset sampling',
required=False)

# basic global params with extended type hints
@@ -436,7 +485,7 @@ def _collect_config_info_from_class_docs(configurable_ops, parser):
"""
Add ops and its params to parser for command line.

- :param configurable_ops: a list of ops to be to added, each item is
+ :param configurable_ops: a list of ops to be added, each item is
a pair of op_name and op_class
:param parser: jsonargparse parser need to update
"""
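With these arguments registered, the sandbox options can be set on the command line or programmatically. A small sketch of parsing them via init_configs (paths illustrative):

# Sketch: parse the new sandbox options programmatically.
from data_juicer.config import init_configs

cfg = init_configs(args=[
    '--config', 'configs/demo/sandbox/sandbox.yaml',  # illustrative path
    '--data_probe_algo', 'uniform',
    '--data_probe_ratio', '0.5',  # ClosedUnitInterval: must lie in [0, 1]
])
print(cfg.data_probe_algo, cfg.data_probe_ratio)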
52 changes: 52 additions & 0 deletions data_juicer/core/executor.py
@@ -5,14 +5,20 @@

from data_juicer import use_cuda
from data_juicer.config import init_configs
from data_juicer.core.data import Dataset
from data_juicer.format.load import load_formatter
from data_juicer.format.mixture_formatter import MixtureFormatter
from data_juicer.ops import (OPERATORS, Deduplicator, Filter, Mapper, Selector,
load_ops)
from data_juicer.utils import cache_utils
from data_juicer.utils.ckpt_utils import CheckpointManager
from data_juicer.utils.constant import Fields
from data_juicer.utils.process_utils import calculate_np

from ..ops.selector.frequency_specified_field_selector import \
FrequencySpecifiedFieldSelector
from ..ops.selector.topk_specified_field_selector import \
TopkSpecifiedFieldSelector
from .data import add_same_content_to_new_column
from .exporter import Exporter
from .tracer import Tracer
@@ -86,6 +92,52 @@ def __init__(self, cfg=None):
logger.info('Trace for all ops.')
self.op_list_to_trace = set(OPERATORS.modules.keys())

def sample_data(self,
dataset_to_sample: Dataset = None,
load_data_np=None,
sample_ratio: float = 1.0,
sample_algo: str = 'uniform',
**kwargs):
"""
Sample a subset from the given dataset.

:param dataset_to_sample: Dataset to sample from. If None, will use
the formatter linked by the executor. Default is None.
:param load_data_np: number of workers when loading the dataset.
:param sample_ratio: The ratio of the sample size to the original
dataset size. Default is 1.0 (no sampling).
:param sample_algo: Sampling algorithm to use. Options are "uniform",
"frequency_specified_field_selector", or
"topk_specified_field_selector".
Default is "uniform".
:return: A sampled Dataset.
"""
# Determine the dataset to sample from
if dataset_to_sample is not None:
dataset = dataset_to_sample
elif self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available:
logger.info('Loading dataset from checkpoint...')
dataset = self.ckpt_manager.load_ckpt()
elif hasattr(self, 'formatter'):
logger.info('Loading dataset from data formatter...')
if load_data_np is None:
load_data_np = self.cfg.np
dataset = self.formatter.load_dataset(load_data_np, self.cfg)
else:
raise ValueError('No dataset available to sample from.')

# Perform sampling based on the specified algorithm
if sample_algo == 'uniform':
return MixtureFormatter.random_sample(dataset, sample_ratio)
elif sample_algo == 'frequency_specified_field_selector':
dj_op = FrequencySpecifiedFieldSelector(**kwargs)
return dj_op.process(dataset)
elif sample_algo == 'topk_specified_field_selector':
dj_op = TopkSpecifiedFieldSelector(**kwargs)
return dj_op.process(dataset)
else:
raise ValueError(f'Unsupported sample_algo: {sample_algo}')

def run(self, load_data_np=None):
"""
Running the dataset process pipeline.
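The new sample_data method gives sandbox hooks a single entry point for probing a subset. A usage sketch (extra kwargs are forwarded to the selector ops; the stats field name below is hypothetical):

# Usage sketch for Executor.sample_data; the stats field name is hypothetical.
from data_juicer.config import init_configs
from data_juicer.core.executor import Executor

cfg = init_configs(args=['--config', 'configs/demo/sandbox/sandbox.yaml'])
executor = Executor(cfg)

# Uniform 10% sample, loaded through the executor's formatter.
subset = executor.sample_data(sample_ratio=0.1, sample_algo='uniform')

# Top-k selection instead; kwargs go to TopkSpecifiedFieldSelector.
topk_subset = executor.sample_data(
    sample_algo='topk_specified_field_selector',
    field_key='__dj__stats__.lang_score',  # hypothetical field
    topk=100)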
88 changes: 88 additions & 0 deletions data_juicer/core/sandbox/evaluators.py
@@ -0,0 +1,88 @@
import os
import shutil

# TODO: cannot import tools correctly if DJ is installed from PyPI. We may
# need another importing method.
from tools.quality_classifier.predict import predict_score


class BaseEvaluator(object):

def __init__(self, eval_config: dict):
self.eval_config = eval_config

def run(self, eval_type, eval_obj, **kwargs) -> dict:
"""
Conduct the evaluation with the specified measurement on the
specified target object; return the evaluated results in a
dict: {res_name: res_val}.
"""
raise NotImplementedError


class Gpt3QualityEvaluator(BaseEvaluator):

def run(self, eval_type, eval_obj, **kwargs):
if eval_type == 'data':
# eval_obj is the path to the dataset to be evaluated
assert isinstance(eval_obj, str)
input_data_path = eval_obj
tmp_res_export_path = input_data_path + '.tmp_res.jsonl'
if os.path.exists(tmp_res_export_path):
if os.path.isfile(tmp_res_export_path):
os.remove(tmp_res_export_path)
if os.path.isdir(tmp_res_export_path):
shutil.rmtree(tmp_res_export_path)

overall_quality_stats = predict_score(input_data_path,
tmp_res_export_path,
overall_stats=True)

shutil.rmtree(tmp_res_export_path)

# by default, using the mean quality score of processed data
# as final score
return float(overall_quality_stats.loc['mean'])
else:
raise NotImplementedError(
'Unsupported evaluation type: {}'.format(eval_type))


class HelmEvaluator(BaseEvaluator):

def run(self, eval_type, eval_obj, **kwargs):
raise NotImplementedError("To be refactored from dj's `thirdparty`.")


class GptEvaluator(BaseEvaluator):

def run(self, eval_type, eval_obj, **kwargs):
raise NotImplementedError('To be refactored from `tools.evaluator`.')


class VideoFvdEvaluator(BaseEvaluator):

def run(self, eval_type, eval_obj, **kwargs):
raise NotImplementedError(
'To be refactored from video fvd/isv related tools.')


class Gpt4VEvaluator(BaseEvaluator):

def run(self, eval_type, eval_obj, **kwargs):
raise NotImplementedError(
'To be refactored from gpt4v related operators/tools.')


class LmHarnessEvaluator(BaseEvaluator):

def run(self, eval_type, eval_obj, **kwargs):
raise NotImplementedError(
'To be refactored from lm-harness, as used in the data-juicer competition.')


class ModelscopeEvaluator(BaseEvaluator):

def run(self, eval_type, eval_obj, **kwargs):
raise NotImplementedError(
'To be implemented from https://github.com/modelscope/eval-scope.')
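For the one evaluator implemented here, usage reduces to constructing it with the demo config and pointing it at a dataset file. A short sketch (the dataset path is illustrative):

# Usage sketch for Gpt3QualityEvaluator; the dataset path is illustrative.
from data_juicer.core.sandbox.evaluators import Gpt3QualityEvaluator

eval_config = {'type': 'dj_text_quality_classifier'}  # as in the demo YAML
evaluator = Gpt3QualityEvaluator(eval_config)

# 'data' evaluation takes a dataset path and returns the mean quality score.
mean_score = evaluator.run(eval_type='data',
                           eval_obj='./outputs/demo-sandbox/demo-sandbox.jsonl')
print(f'mean quality score: {mean_score:.4f}')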