From ab5f0f6c8b88ddd8b843d788595bcba88e9d2f53 Mon Sep 17 00:00:00 2001 From: Yilun Huang Date: Tue, 16 Apr 2024 11:38:24 +0800 Subject: [PATCH 1/2] Fix: avoid extracting empty frame list (#295) * - remove wechat group qr code * * fix unittest error * * fix unittest error * install ffmpeg * * install ffmpeg --- .github/workflows/unit-test.yml | 1 + data_juicer/utils/mm_utils.py | 15 +++++-- tests/config/test_config_funcs.py | 43 +++++++++++++++++++ .../ops/mapper/test_image_diffusion_mapper.py | 2 +- 4 files changed, 57 insertions(+), 4 deletions(-) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 9809449e0..3eb21202d 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -27,6 +27,7 @@ jobs: df -h - name: Install dependencies run: | + sudo apt-get install ffmpeg python -m pip install --upgrade pip pip install -v -e .[all] - name: Increase swapfile diff --git a/data_juicer/utils/mm_utils.py b/data_juicer/utils/mm_utils.py index 05fc1d9f5..84b4e3caa 100644 --- a/data_juicer/utils/mm_utils.py +++ b/data_juicer/utils/mm_utils.py @@ -394,7 +394,8 @@ def process_each_frame(input_video: Union[str, av.container.InputContainer], def extract_key_frames(input_video: Union[str, av.container.InputContainer]): """ - Extract key frames from the input video. + Extract key frames from the input video. If there is no keyframes in the + video, return the first frame. :param input_video: input video path or container. :return: a list of key frames. @@ -414,12 +415,20 @@ def extract_key_frames(input_video: Union[str, av.container.InputContainer]): ori_skip_method = input_video_stream.codec_context.skip_frame input_video_stream.codec_context.skip_frame = 'NONKEY' # restore to the beginning of the video - container.seek(0, backward=False, any_frame=False) + container.seek(0) for frame in container.decode(input_video_stream): key_frames.append(frame) # restore to the original skip_type input_video_stream.codec_context.skip_frame = ori_skip_method + if len(key_frames) == 0: + logger.warning(f'No keyframes in this video [{input_video}]. Return ' + f'the first frame instead.') + container.seek(0) + for frame in container.decode(input_video_stream): + key_frames.append(frame) + break + if isinstance(input_video, str): container.close() return key_frames @@ -514,7 +523,7 @@ def extract_video_frames_uniformly( continue if key_frame_second == 0.0: # search from the beginning - container.seek(0, backward=False, any_frame=True) + container.seek(0) search_idx = 0 curr_pts = second_group[search_idx] / time_base for frame in container.decode(input_video_stream): diff --git a/tests/config/test_config_funcs.py b/tests/config/test_config_funcs.py index a2748ed7f..bd2d6e7d8 100644 --- a/tests/config/test_config_funcs.py +++ b/tests/config/test_config_funcs.py @@ -41,6 +41,13 @@ def test_yaml_cfg_file(self): 'text_key': 'text', 'image_key': 'images', 'audio_key': 'audios', + 'video_key': 'videos', + 'accelerator': 'cpu', + 'spec_numprocs': 0, + 'cpu_required': 1, + 'mem_required': 0, + 'use_actor': False, + 'batched_op': False, } }, 'nested dict load fail, for nonparametric op') self.assertDictEqual( @@ -51,6 +58,12 @@ def test_yaml_cfg_file(self): 'text_key': 'text', 'image_key': 'images', 'audio_key': 'audios', + 'video_key': 'videos', + 'accelerator': 'cpu', + 'spec_numprocs': 0, + 'cpu_required': 1, + 'mem_required': 0, + 'use_actor': False, } }, 'nested dict load fail, un-expected internal value') @@ -83,6 +96,12 @@ def test_mixture_cfg(self): 'text_key': 'text', 'image_key': 'images', 'audio_key': 'audios', + 'video_key': 'videos', + 'accelerator': 'cpu', + 'spec_numprocs': 0, + 'cpu_required': 1, + 'mem_required': 0, + 'use_actor': False, } }) self.assertDictEqual( @@ -93,6 +112,12 @@ def test_mixture_cfg(self): 'text_key': 'text', 'image_key': 'images', 'audio_key': 'audios', + 'video_key': 'videos', + 'accelerator': 'cpu', + 'spec_numprocs': 0, + 'cpu_required': 1, + 'mem_required': 0, + 'use_actor': False, } }) self.assertDictEqual( @@ -103,6 +128,12 @@ def test_mixture_cfg(self): 'text_key': 'text', 'image_key': 'images', 'audio_key': 'audios', + 'video_key': 'videos', + 'accelerator': 'cpu', + 'spec_numprocs': 0, + 'cpu_required': 1, + 'mem_required': 0, + 'use_actor': False, } }) self.assertDictEqual( @@ -113,6 +144,12 @@ def test_mixture_cfg(self): 'text_key': 'text', 'image_key': 'images', 'audio_key': 'audios', + 'video_key': 'videos', + 'accelerator': 'cpu', + 'spec_numprocs': 0, + 'cpu_required': 1, + 'mem_required': 0, + 'use_actor': False, } }) self.assertDictEqual( @@ -123,6 +160,12 @@ def test_mixture_cfg(self): 'text_key': 'text', 'image_key': 'images', 'audio_key': 'audios', + 'video_key': 'videos', + 'accelerator': 'cpu', + 'spec_numprocs': 0, + 'cpu_required': 1, + 'mem_required': 0, + 'use_actor': False, } }) diff --git a/tests/ops/mapper/test_image_diffusion_mapper.py b/tests/ops/mapper/test_image_diffusion_mapper.py index 87b0f51ca..77db34a99 100644 --- a/tests/ops/mapper/test_image_diffusion_mapper.py +++ b/tests/ops/mapper/test_image_diffusion_mapper.py @@ -150,7 +150,7 @@ def test_for_fp16_given_caption_string(self): dataset = NestedDataset.from_list(ds_list) op = ImageDiffusionMapper(hf_diffusion=self.hf_diffusion, torch_dtype='fp16', - revision='fp16' + revision='fp16', aug_num=aug_num, keep_original_sample=False, caption_key='text') From c1a8aa808ea12cf298c177130b6d5854cf08fbb7 Mon Sep 17 00:00:00 2001 From: garyzhang99 <46197280+garyzhang99@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:18:00 +0800 Subject: [PATCH 2/2] Document deduplication tools using pyspark. (#290) * init spark dedup * add doc strings and Readme.md * remove commented debug line * fix typo * fix comments * add readme_zh, add more config * fix typos * fix typo * fix for efficiency --------- Tested on larger scale --- tools/distributed_deduplication/README.md | 44 ++++++++ tools/distributed_deduplication/README_ZH.md | 35 ++++++ tools/distributed_deduplication/__init__.py | 0 .../distributed_deduplication/dedup_utils.py | 103 ++++++++++++++++++ .../distributed_deduplication/spark_dedup.py | 94 ++++++++++++++++ 5 files changed, 276 insertions(+) create mode 100644 tools/distributed_deduplication/README.md create mode 100644 tools/distributed_deduplication/README_ZH.md create mode 100644 tools/distributed_deduplication/__init__.py create mode 100644 tools/distributed_deduplication/dedup_utils.py create mode 100644 tools/distributed_deduplication/spark_dedup.py diff --git a/tools/distributed_deduplication/README.md b/tools/distributed_deduplication/README.md new file mode 100644 index 000000000..23464af32 --- /dev/null +++ b/tools/distributed_deduplication/README.md @@ -0,0 +1,44 @@ +# Distributed Fuzzy Deduplication Tools + +Help you reproduce and apply fuzzy deduplication to your web datasets similar to GPT-3 paper. + +**The General Description about Fuzzy Deduplication**: + +The fuzzy deduplication method here mainly refer to the fuzzy deduplication method mentioned in the Appendix A of [GPT-3 paper](https://arxiv.org/pdf/2005.14165.pdf). + +> To further improve model quality and prevent overfitting (which becomes increasingly important as model capacity increases), we fuzzily deduplicated documents (i.e. removed documents with high overlap with other documents) within each dataset using Spark’s MinHashLSH implementation with 10 hashes, using **the same features as were used for classification above**. We also fuzzily removed WebText from Common Crawl. Overall this decreased dataset size by an average of 10%. + +As the paper mentioned, the features used are the same as were used for quality classification, as described in [quality_classifier tools](../quality_classifier/README.md). + +The whole toolkit is based on PySpark. + +- tokenizer: Since the standard tokenizer of pyspark have trouble tokenizing text in languages such as Chinese, the [standard Tokenizer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer) of PySpark or [sentencepiece](https://github.com/google/sentencepiece) model are used. +- feature extractor: [HashingTF](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.HashingTF.html) +- minhashLSH: [MinHashLSH](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinHashLSH.html) + + +## Usage + +Use `spark_dedup.py` to fuzzily deduplicate documents. + +```shell +python spark_dedup.py \ + \ + \ + [--tokenizer ] \ + [--num_features ] \ + [--num_hashtables ] \ + [--text_key ] \ + [--master_url ] + +# print the usage message +python spark_dedup.py --help +``` + +- `dataset_path`: the input dataset path. The suffix of the path should be one of the `[json, jsonl, parquet]`. +- `result_path`: the path to store the dataset with prediction results. The suffix of the path should be one of the `[json, jsonl, parquet]`. +- `tokenizer`: (Optional. Default: None) the tokenizer to tokenize texts to be classified. If it's None, the [standard Tokenizer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer) of PySpark will be used. Besides, you can use one of the tokenizers we provide `[zh.sp.model, code.sp.model]`. Or you can set it to a path to your own [sentencepiece](https://github.com/google/sentencepiece) model. +- `num_features`: the number of features that HashingTF generates. Default with 1047576 as mentioned in megatron-turing-nlg paper. +- `num_hashtables`: (Optional. Default: 10) the number of hashes used in MinHashLSH. Default with 10 hashes as mentioned in the GPT3 paper. +- `text_key`: (Optional. Default: "text") the field name to store texts to be classified in the input dataset. +- `master_url`: (Optional. Default: None) the master url for spark config. If None, then run with "local[*]" diff --git a/tools/distributed_deduplication/README_ZH.md b/tools/distributed_deduplication/README_ZH.md new file mode 100644 index 000000000..6a74006e7 --- /dev/null +++ b/tools/distributed_deduplication/README_ZH.md @@ -0,0 +1,35 @@ +# 分布式模糊去重工具 +复现与GPT-3论文相似的模糊去重方法并应用到您的Web数据集。 + +**模糊去重的一般描述**: +这里的模糊去重方法主要指的是 [GPT-3论文](https://arxiv.org/pdf/2005.14165.pdf)附录A中提到的模糊去重方法。 +> 为了进一步提高模型质量并防止过拟合(随着模型容量的增加越来越重要),我们使用Spark的MinHashLSH实现对每个数据集中的文档进行了模糊去重(即移除了与其他文档高度重合的文档),使用了10个哈希,使用的**特征与上面用于分类的特征相同**。我们还从Common Crawl中模糊移除了WebText。总体而言,这使数据集的大小平均减少了10%。 +正如论文中提到的,使用的特征与前文描述的质量分类器([quality_classifier tools](../quality_classifier/README.md))中所用的一致。 +整个工具包基于PySpark。 +- 分词器:由于pyspark的标准分词器无法很好地处理中文等语言的文本,所以使用了PySpark的[标准分词器](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer)或[sentencepiece](https://github.com/google/sentencepiece)模型。 +- 特征提取器:[HashingTF](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.HashingTF.html) +- minhashLSH:[MinHashLSH](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinHashLSH.html) + +## 使用方法 +使用`spark_dedup.py`对文档进行模糊去重。 +```shell +python spark_dedup.py \ + \ + \ + [--tokenizer ] \ + [--num_features ] \ + [--num_hashtables ] \ + [--text_key ] \ + [--master_url ] +# 打印使用信息 +python spark_dedup.py --help + +``` + +- `dataset_path`:输入数据集路径。路径的后缀应该是`[json, jsonl, parquet]`中的一个。 +- `result_path`:存储带有预测结果数据集的路径。路径的后缀应该是`[json, jsonl, parquet]`中的一个。 +- `tokenizer`:(可选。默认值:None)用于对将要分类的文本进行分词的分词器。如果为None,将使用PySpark的[标准分词器](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer)。此外,你可以使用我们提供的分词器`[zh.sp.model, code.sp.model]`中的一个,或者你可以将其设置为你自己的[sentencepiece](https://github.com/google/sentencepiece)模型的路径。 +- `num_features`:HashingTF生成的特征数量。默认值为1047576,如megatron-turing-nlg论文中所述。 +- `num_hashtables`:(可选。默认值:10)MinHashLSH中使用的哈希数量。默认使用10个哈希,如GPT-3论文中所述。 +- `text_key`:(可选。默认值:"text")输入数据集中用于存储待分类文本的字段名称。 +- `master_url`:(可选。默认值:None)用于Spark配置的master URL。如果为空,则默认运行在"local[*]"模式下。 diff --git a/tools/distributed_deduplication/__init__.py b/tools/distributed_deduplication/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tools/distributed_deduplication/dedup_utils.py b/tools/distributed_deduplication/dedup_utils.py new file mode 100644 index 000000000..25c46d4ed --- /dev/null +++ b/tools/distributed_deduplication/dedup_utils.py @@ -0,0 +1,103 @@ +# The Star-Graph-Connected-Components (SGCC) algorithm here referenced from: +# https://github.com/bigcode-project/bigcode-dataset/blob/main/near_deduplication/minhash_deduplication_spark.py +# -------------------------------------------------------- + +from typing import List, Tuple, Union + +from loguru import logger +from pyspark import SparkConf +from pyspark.sql import SparkSession + + +def init_spark(master_url: Union[str, None] = None, + spark_executor_memory=None, + spark_driver_memory=None, + spark_executor_memoryOverhead=None): + if not spark_executor_memory: + spark_executor_memory = '64g' + if not spark_driver_memory: + spark_driver_memory = '64g' + if not spark_executor_memoryOverhead: + spark_executor_memoryOverhead = '20000' + if not master_url: + master_url = 'local[*]' + conf = SparkConf() + conf.set('spark.app.name', 'MinHashLSH') + conf.set('spark.debug.maxToStringFields', '100') + conf.set('spark.master', master_url) + conf.set('spark.executor.memory', spark_executor_memory) + conf.set('spark.driver.memory', spark_driver_memory) + conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true') + conf.set('spark.executor.memoryOverhead', spark_executor_memoryOverhead) + spark = SparkSession.builder.config(conf=conf).getOrCreate() + logger.info('Spark initialization done.') + return spark + + +def generate_edges(nodes: List[int]) -> List[Tuple[int, int]]: + """ + Generate edges from a cluster. Instead of generating N^2 edges, + we only need all nodes align to a single node, + since we will be running connected components on the edges later. + + Parameters + ---------- + nodes : List[int] + The list of nodes in the cluster. + + Returns + ------- + List[Tuple[int, int]] + The list of edges. + """ + if len(nodes) <= 1: + return [] + + min_node = min(nodes) + return [(n, min_node) for n in nodes if n != min_node] + + +# Connected Components in MapReduce and Beyond +def large_star_map(edge): + return [(edge[0], edge[1]), (edge[1], edge[0])] + + +def large_star_reduce(group): + x, neighbors = group + nodes = [x] + list(neighbors) + minimum = min(nodes) + return [(n, minimum) for n in nodes if n > x] + + +def small_star_map(edge): + x, y = edge + if y <= x: + return (x, y) + else: + return (y, x) + + +def small_star_reduce(group): + x, neighbors = group + nodes = [x] + list(neighbors) + minimum = min(nodes) + return [(n, minimum) for n in nodes if n != minimum] + + +def find_components(edges): + """ + Star-Graph-Connected-Components (SGCC) algorithm + """ + + a = edges + while True: + b = a.flatMap(large_star_map).groupByKey().flatMap( + large_star_reduce).distinct().cache() + a = b.map(small_star_map).groupByKey().flatMap( + small_star_reduce).distinct().cache() + changes = a.subtract(b).union(b.subtract(a)).collect() + if len(changes) == 0: + break + + results = a.collect() + return results diff --git a/tools/distributed_deduplication/spark_dedup.py b/tools/distributed_deduplication/spark_dedup.py new file mode 100644 index 000000000..871f1811b --- /dev/null +++ b/tools/distributed_deduplication/spark_dedup.py @@ -0,0 +1,94 @@ +import sys +import time +from typing import Union + +import fire +from loguru import logger +from pyspark.ml.feature import HashingTF, MinHashLSH, Tokenizer +from pyspark.sql import functions as F +from pyspark.sql.functions import posexplode + +from tools.distributed_deduplication.dedup_utils import (find_components, + generate_edges, + init_spark) +from tools.quality_classifier.qc_utils import (export_result, load_dataset, + tokenize_dataset) + + +@logger.catch +def dedup_dataset(dataset_path: str, + result_path: str, + tokenizer: Union[str, None] = None, + num_features: int = 1047576, + num_hashtables: int = 10, + text_key: str = 'text', + master_url: Union[str, None] = None): + """ + Perform fuzzy text deduplication on the given dataset. + :param dataset_path: the path to the dataset to perform deduplication, + The suffix of the path should be one of the json, jsonl, parquet. + :param result_path: the path to store the predicted result dataset. + :param tokenizer: what tokenizer to use to tokenize texts. It's None in + default, which means using the standard Tokenizer of PySpark. You can + use one of ["zh.sp.model", "code.sp.model"] we provided, or you can set + it to the path to your own sentencepiece model. + :param num_features: the number of features that HashingTF generates. + Default with 1047576 as mentioned in megatron-turing-nlg paper. + :param num_hashtables: the number of hashes used in MinHashLSH. + Default with 10 hashes as mentioned in the GPT3 paper. + :param text_key: the field key name to hold texts to be classified. It's + "text" in default. + :param master_url: the master url for spark config. Default is None. + If None, then run with local[*]. + """ + # for inited cluster, + # provide master url such as "spark://master:7077" + spark = init_spark(master_url=master_url) + ds = load_dataset(spark, dataset_path, text_key=text_key) + ds = ds.withColumn('id', F.monotonically_increasing_id()).cache() + df = ds + + if tokenizer: + ds = tokenize_dataset(ds, tokenizer) + else: + ds = Tokenizer(inputCol='text', outputCol='words').transform(ds) + + hashingTF = HashingTF(inputCol='words', + outputCol='features', + numFeatures=num_features) + ds = hashingTF.transform(ds) + + minHash = MinHashLSH(inputCol='features', + outputCol='hashes', + numHashTables=num_hashtables) + model = minHash.fit(ds) + + ds = model.transform(ds) + + ds = ds.select('id', posexplode('hashes').alias('band_idx', 'hash_vector')) + + record = ds.rdd.map(lambda x: + (x['band_idx'], int(x['hash_vector'][0]), x['id'])) + + edges = (record.groupBy(lambda x: (x[0], x[1])).flatMap( + lambda x: generate_edges([i[2] for i in x[1]])).distinct().cache()) + + results = find_components(edges) + if len(results) == 0: + logger.info('No components found.') + sys.exit(0) + + components = spark.createDataFrame(results, + schema=['id', 'component' + ]).sort(['component', 'id']) + components.show() + df = df.join(components, on='id', how='left') + df = df.filter(F.col('component').isNull()).drop('id', 'component').cache() + export_result(df, result_path) + + +if __name__ == '__main__': + stime = time.time() + fire.Fire(dedup_dataset) + etime = time.time() + logger.info(f'Execution Done, Total time {etime - stime}')