整体基于Huggingface Datasets设计,Huggingface datasets集成了丰富的基础功能包括一些常见文件的解析(json、csv等),多进程处理数据等功能,由于我们部分数据集暂时未开源,为了做统一的管理,所以选择在datasets上再包一层,封装了处理、加载、地址管理等逻辑。
其中一个核心的思想是,利用datasets.load_dataset将原始文件自动转换成PyArrow的格式,利用datasets.save_to_dick将PyArrow格式的数据集作为Cache缓存,在之后的使用中,只需要使用datasets.load_from_disk即可利用PyArrow的特性快速读取、处理数据。
import fs_datasets
fs_datasets.list_dataset()
fs_datasets.load_dataset('afqmc')
huggingface datasets已知问题:
- 处理大文件效果极其低下,因为需要将原始文件转换成PyArrow的形式,这个过程很慢,所以我们单个文件建议切分至500M左右,切分后的数据集采用多进程的方式加载,加载后再合并,可以显著提升加载性能
- 大数据集的情况下,做shuffle十分耗时,建议在生成缓存时就做shuffle
悟道数据集是目前中文领域中比较大的数据集,我们以处理悟道180G数据为例展示一个比较简单但完整的流程。完整代码位于目录wudao_180g下 悟道数据集包含将近60M个文本,每一行数据格式如下:
{"title": "朝阳县羊山高级中学", "content": "朝阳县羊山高级中学成立于1956年,是辽宁普通中学学校,从事中学教育,学校位于朝阳县羊山镇羊山村委会。基本信息 学校名称:朝阳县羊山高级中学 成立时间:1956 学校类型:普通中学中学教育 所在区域:辽宁 学校地址:朝阳县羊山镇羊山村委会 邮 编:122614"}
由于我们首先需要对文件进行切分,在linux环境下,split方法可以快速的对文本进行切分。
split -100000 wudao.json
切分后的文件夹长这样
total 187G
-rw-r--r-- 1 gaoxinyu ai4cogsys 361M Apr 2 18:05 aa
-rw-r--r-- 1 gaoxinyu ai4cogsys 333M Apr 2 18:05 ab
-rw-r--r-- 1 gaoxinyu ai4cogsys 344M Apr 2 18:05 ac
-rw-r--r-- 1 gaoxinyu ai4cogsys 337M Apr 2 18:05 ad
-rw-r--r-- 1 gaoxinyu ai4cogsys 367M Apr 2 18:05 ae
-rw-r--r-- 1 gaoxinyu ai4cogsys 311M Apr 2 18:05 af
-rw-r--r-- 1 gaoxinyu ai4cogsys 311M Apr 2 18:05 ag
-rw-r--r-- 1 gaoxinyu ai4cogsys 383M Apr 2 18:05 ah
...
...
...
在fs_datasets的目录下,我们需要先新建一个目录,目录名即dataset的名字,之后我们通过fs_datasets.load_dataset('dir_name')即可通过目录名加载数据集了。
我们新建一个目录wudao_180g,并且在目录下新建一个load.py,在里面包含了路径管理、处理逻辑、加载逻辑。
处理逻辑指原始文件到PyArrow的过程,为什么要放load.py中呢? 我们希望使用这个数据集的同学能清楚的知道这个数据集是怎么处理的,所以强烈建议放在同一个文件中做管理
在load.py中,首先是需要定义路径管理
# Step0中切分好的数据目录
_SPLIT_DATA_PATH = '/path/WuDaoCorpus180G_split_100k/*'
# PyArrow存储路径,也是我们之后加载数据集时的路径
_CACHE_TRAIN_DATA_PATH = '/path/hf_cache_split_100k/'
然后需要将原始文件转换成pyarrow的格式,这部分因为我们之前切分成了N份,所以这里建议使用多进程的方式处理。出于个人习惯,我通常会定义个主函数generate_cache_arrow作为生成缓存的总入口。
def generate_cache_arrow(num_proc=1) -> None:
'''
生成HF支持的缓存文件,加速后续的加载
'''
# 遍历切分目录下的所有文件
data_dict_paths = glob.glob(_SPLIT_DATA_PATH)
# 起多进程
p = ProcessPoolExecutor(max_workers=num_proc)
res = []
for index, path in enumerate(data_dict_paths):
# _generate_cache_arrow 为实际的转换逻辑
res.append(p.submit(_generate_cache_arrow, index, path))
p.shutdown(wait=True)
for future in res:
print(future.result(), flush=True)
在**_generate_cache_arrow**中我定义了一个原始文件到PyArrow的逻辑
def _generate_cache_arrow(index, path):
print('saving dataset shard {}'.format(index))
# datasets.load_dataset 会将实际的原始文件隐式的转换成PyArrow
# 单个文件的话默认会生成一个‘train’
ds = (datasets.load_dataset('json', data_files=path)['train'])
# 将这个PyArrow文件保存到Cache目录下
ds.save_to_disk(os.path.join(_CACHE_TRAIN_DATA_PATH, os.path.basename(path)))
return 'saving dataset shard {} done'.format(index)
当然这里的**_generate_cache_arrow**逻辑也比较简单,在实际使用中,可以加入分词、分句、train_test_split等等特殊逻辑,wudao_180g_spbpe_tokenized就是基于wudao_180g数据做了更进一步处理的例子,如有需要可以参考
最后仅需要运行一次generate_cache_arrow生成一次Cache后就好,之后不需要再调用,所以在load.py下实现一个main函数即可
if __name__ == '__main__':
generate_cache_arrow(num_proc=100)
python load.py
这样我们简单的文件处理过程就已经由原始文件转换成PyArrow,其中,一个原始文件经过转换后会变成一个文件,包含以下三个文件
total 360M
-rwxrwxrwx 1 gaoxinyu ai4cogsys 360M Apr 2 23:01 dataset.arrow
-rwxrwxrwx 1 gaoxinyu ai4cogsys 930 Apr 2 23:01 dataset_info.json
-rwxrwxrwx 1 gaoxinyu ai4cogsys 256 Apr 2 23:01 state.json
最后一步也是最重要的一步,数据集的读取逻辑,数据集读取统一由load_dataset函数作为入口,因为我们切分多分数据集,所以这里我们也需要使用多进程来加载
def load_dataset(num_proc=1, **kargs):
cache_dict_paths = glob.glob(os.path.join(_CACHE_TRAIN_DATA_PATH, '*'))
ds = []
res = []
p = ProcessPoolExecutor(max_workers=num_proc)
for path in cache_dict_paths:
# 使用load_from_disk加载PyArrow类型的数据集
res.append(p.submit(datasets.load_from_disk,
path, **kargs))
p.shutdown(wait=True)
for future in res:
ds.append(future.result())
# 对切分数据集做拼接合并成一个完整的数据集,跟一个文件的数据集使用起来就没有任何区别啦
return datasets.DatasetDict({"train": datasets.concatenate_datasets(ds)})
一个完整的数据集就被我们处理成fs_datasets的格式了,其他同学通过名字就可以调用你的数据集了~