Merge branch 'main' into fix_availablity_checking

garyzhang99 committed Apr 17, 2024
2 parents 9ae437d + c1a8aa8 commit 03d6c9b
Showing 9 changed files with 333 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/workflows/unit-test.yml
@@ -27,6 +27,7 @@ jobs:
df -h
- name: Install dependencies
run: |
sudo apt-get install ffmpeg
python -m pip install --upgrade pip
pip install -v -e .[all]
- name: Increase swapfile
15 changes: 12 additions & 3 deletions data_juicer/utils/mm_utils.py
@@ -394,7 +394,8 @@ def process_each_frame(input_video: Union[str, av.container.InputContainer],

def extract_key_frames(input_video: Union[str, av.container.InputContainer]):
"""
Extract key frames from the input video.
Extract key frames from the input video. If there are no keyframes in the
video, return the first frame.
:param input_video: input video path or container.
:return: a list of key frames.
@@ -414,12 +415,20 @@ def extract_key_frames(input_video: Union[str, av.container.InputContainer]):
ori_skip_method = input_video_stream.codec_context.skip_frame
input_video_stream.codec_context.skip_frame = 'NONKEY'
# restore to the beginning of the video
container.seek(0, backward=False, any_frame=False)
container.seek(0)
for frame in container.decode(input_video_stream):
key_frames.append(frame)
# restore to the original skip_type
input_video_stream.codec_context.skip_frame = ori_skip_method

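# Fallback: if the NONKEY pass above yields no frames (the video reports no
# keyframes), decode from the beginning and keep only the first frame.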
if len(key_frames) == 0:
logger.warning(f'No keyframes in this video [{input_video}]. Returning '
f'the first frame instead.')
container.seek(0)
for frame in container.decode(input_video_stream):
key_frames.append(frame)
break

if isinstance(input_video, str):
container.close()
return key_frames
@@ -514,7 +523,7 @@ def extract_video_frames_uniformly(
continue
if key_frame_second == 0.0:
# search from the beginning
container.seek(0, backward=False, any_frame=True)
container.seek(0)
search_idx = 0
curr_pts = second_group[search_idx] / time_base
for frame in container.decode(input_video_stream):
43 changes: 43 changes: 43 additions & 0 deletions tests/config/test_config_funcs.py
@@ -41,6 +41,13 @@ def test_yaml_cfg_file(self):
'text_key': 'text',
'image_key': 'images',
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
'batched_op': False,
}
}, 'nested dict load fail, for nonparametric op')
self.assertDictEqual(
@@ -51,6 +58,12 @@ def test_yaml_cfg_file(self):
'text_key': 'text',
'image_key': 'images',
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
}
}, 'nested dict load fail, un-expected internal value')

@@ -83,6 +96,12 @@ def test_mixture_cfg(self):
'text_key': 'text',
'image_key': 'images',
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
}
})
self.assertDictEqual(
@@ -93,6 +112,12 @@ def test_mixture_cfg(self):
'text_key': 'text',
'image_key': 'images',
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
}
})
self.assertDictEqual(
@@ -103,6 +128,12 @@ def test_mixture_cfg(self):
'text_key': 'text',
'image_key': 'images',
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
}
})
self.assertDictEqual(
@@ -113,6 +144,12 @@ def test_mixture_cfg(self):
'text_key': 'text',
'image_key': 'images',
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
}
})
self.assertDictEqual(
@@ -123,6 +160,12 @@ def test_mixture_cfg(self):
'text_key': 'text',
'image_key': 'images',
'audio_key': 'audios',
'video_key': 'videos',
'accelerator': 'cpu',
'spec_numprocs': 0,
'cpu_required': 1,
'mem_required': 0,
'use_actor': False,
}
})

2 changes: 1 addition & 1 deletion tests/ops/mapper/test_image_diffusion_mapper.py
@@ -150,7 +150,7 @@ def test_for_fp16_given_caption_string(self):
dataset = NestedDataset.from_list(ds_list)
op = ImageDiffusionMapper(hf_diffusion=self.hf_diffusion,
torch_dtype='fp16',
revision='fp16'
revision='fp16',
aug_num=aug_num,
keep_original_sample=False,
caption_key='text')
44 changes: 44 additions & 0 deletions tools/distributed_deduplication/README.md
@@ -0,0 +1,44 @@
# Distributed Fuzzy Deduplication Tools

These tools help you reproduce the fuzzy deduplication method described in the GPT-3 paper and apply it to your own web datasets.

**General Description of Fuzzy Deduplication**:

The fuzzy deduplication method here mainly refers to the one described in Appendix A of the [GPT-3 paper](https://arxiv.org/pdf/2005.14165.pdf).

> To further improve model quality and prevent overfitting (which becomes increasingly important as model capacity increases), we fuzzily deduplicated documents (i.e. removed documents with high overlap with other documents) within each dataset using Spark’s MinHashLSH implementation with 10 hashes, using **the same features as were used for classification above**. We also fuzzily removed WebText from Common Crawl. Overall this decreased dataset size by an average of 10%.

As the paper mentions, the features used are the same as those used for quality classification, as described in the [quality_classifier tools](../quality_classifier/README.md).

The whole toolkit is based on PySpark. A minimal PySpark sketch of how the components below fit together is shown right after the list.

- tokenizer: since PySpark's standard tokenizer has trouble tokenizing text in languages such as Chinese, either the [standard Tokenizer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer) of PySpark or a [sentencepiece](https://github.com/google/sentencepiece) model is used.
- feature extractor: [HashingTF](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.HashingTF.html)
- minhashLSH: [MinHashLSH](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinHashLSH.html)
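
As a rough illustration of how these pieces fit together, here is a minimal PySpark sketch (not the exact code in `spark_dedup.py`; the input path, the `text` column name, and the 0.6 distance threshold are assumptions made for the example):

```python
from pyspark.ml.feature import HashingTF, MinHashLSH, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('MinHashLSH').getOrCreate()
df = spark.read.json('demo.jsonl')  # assumes a 'text' column

# tokenize -> hash token counts into a sparse vector -> MinHash signatures
tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
hashing_tf = HashingTF(inputCol='tokens', outputCol='features', numFeatures=1047576)
minhash = MinHashLSH(inputCol='features', outputCol='hashes', numHashTables=10)

featurized = hashing_tf.transform(tokenizer.transform(df))
model = minhash.fit(featurized)

# Document pairs whose estimated Jaccard distance falls below the (assumed)
# 0.6 threshold are treated as near-duplicates; clustering those pairs and
# keeping one document per cluster yields the deduplicated dataset.
pairs = model.approxSimilarityJoin(featurized, featurized, 0.6, distCol='distance')
```

The actual `spark_dedup.py` script additionally supports the sentencepiece tokenizers mentioned above and comes with the connected-components helpers in `dedup_utils.py` for resolving clusters of near-duplicates.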


## Usage

Use `spark_dedup.py` to fuzzily deduplicate documents.

```shell
python spark_dedup.py \
<dataset_path> \
<result_path> \
[--tokenizer <tokenizer_type>] \
[--num_features <num_features>] \
[--num_hashtables <num_hashtables>] \
[--text_key <text_key>] \
[--master_url <master_url>]

# print the usage message
python spark_dedup.py --help
```

- `dataset_path`: the input dataset path. The suffix of the path should be one of `[json, jsonl, parquet]`.
- `result_path`: the path to store the dataset with prediction results. The suffix of the path should be one of `[json, jsonl, parquet]`.
- `tokenizer`: (Optional. Default: None) the tokenizer used to tokenize the texts to be deduplicated. If it is None, the [standard Tokenizer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer) of PySpark will be used. You can also use one of the tokenizers we provide, `[zh.sp.model, code.sp.model]`, or set it to the path of your own [sentencepiece](https://github.com/google/sentencepiece) model.
- `num_features`: (Optional. Default: 1047576) the number of features that HashingTF generates. The default of 1047576 follows the Megatron-Turing NLG paper.
- `num_hashtables`: (Optional. Default: 10) the number of hashes used in MinHashLSH. The default of 10 hashes follows the GPT-3 paper.
- `text_key`: (Optional. Default: "text") the field name storing the texts to be deduplicated in the input dataset.
- `master_url`: (Optional. Default: None) the master URL for the Spark config. If None, the job runs with "local[*]".
35 changes: 35 additions & 0 deletions tools/distributed_deduplication/README_ZH.md
@@ -0,0 +1,35 @@
# Distributed Fuzzy Deduplication Tools

Reproduce a fuzzy deduplication method similar to the one in the GPT-3 paper and apply it to your web datasets.

**General Description of Fuzzy Deduplication**

The fuzzy deduplication method here mainly refers to the one described in Appendix A of the [GPT-3 paper](https://arxiv.org/pdf/2005.14165.pdf).

> To further improve model quality and prevent overfitting (which becomes increasingly important as model capacity increases), we fuzzily deduplicated documents (i.e. removed documents with high overlap with other documents) within each dataset using Spark's MinHashLSH implementation with 10 hashes, using **the same features as were used for classification above**. We also fuzzily removed WebText from Common Crawl. Overall this decreased dataset size by an average of 10%.

As the paper mentions, the features used are the same as those used by the quality classifier described in the [quality_classifier tools](../quality_classifier/README.md).

The whole toolkit is based on PySpark.

- Tokenizer: since PySpark's standard tokenizer cannot handle text in languages such as Chinese well, either the [standard Tokenizer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer) of PySpark or a [sentencepiece](https://github.com/google/sentencepiece) model is used.
- Feature extractor: [HashingTF](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.HashingTF.html)
- minhashLSH: [MinHashLSH](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinHashLSH.html)

## Usage

Use `spark_dedup.py` to fuzzily deduplicate documents.

```shell
python spark_dedup.py \
<dataset_path> \
<result_path> \
[--tokenizer <tokenizer_type>] \
[--num_features <num_features>] \
[--num_hashtables <num_hashtables>] \
[--text_key <text_key>] \
[--master_url <master_url>]

# print the usage message
python spark_dedup.py --help
```

- `dataset_path`: the input dataset path. The suffix of the path should be one of `[json, jsonl, parquet]`.
- `result_path`: the path to store the dataset with prediction results. The suffix of the path should be one of `[json, jsonl, parquet]`.
- `tokenizer`: (Optional. Default: None) the tokenizer used to tokenize the texts to be deduplicated. If it is None, the [standard Tokenizer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html#tokenizer) of PySpark will be used. You can also use one of the tokenizers we provide, `[zh.sp.model, code.sp.model]`, or set it to the path of your own [sentencepiece](https://github.com/google/sentencepiece) model.
- `num_features`: the number of features that HashingTF generates. The default of 1047576 follows the Megatron-Turing NLG paper.
- `num_hashtables`: (Optional. Default: 10) the number of hashes used in MinHashLSH. The default of 10 hashes follows the GPT-3 paper.
- `text_key`: (Optional. Default: "text") the field name storing the texts to be deduplicated in the input dataset.
- `master_url`: (Optional. Default: None) the master URL for the Spark config. If None, the job runs in "local[*]" mode.
Empty file.
103 changes: 103 additions & 0 deletions tools/distributed_deduplication/dedup_utils.py
@@ -0,0 +1,103 @@
# The Star-Graph-Connected-Components (SGCC) algorithm here referenced from:
# https://github.com/bigcode-project/bigcode-dataset/blob/main/near_deduplication/minhash_deduplication_spark.py
# --------------------------------------------------------

from typing import List, Tuple, Union

from loguru import logger
from pyspark import SparkConf
from pyspark.sql import SparkSession


def init_spark(master_url: Union[str, None] = None,
spark_executor_memory=None,
spark_driver_memory=None,
spark_executor_memoryOverhead=None):
if not spark_executor_memory:
spark_executor_memory = '64g'
if not spark_driver_memory:
spark_driver_memory = '64g'
if not spark_executor_memoryOverhead:
spark_executor_memoryOverhead = '20000'
if not master_url:
master_url = 'local[*]'
conf = SparkConf()
conf.set('spark.app.name', 'MinHashLSH')
conf.set('spark.debug.maxToStringFields', '100')
conf.set('spark.master', master_url)
conf.set('spark.executor.memory', spark_executor_memory)
conf.set('spark.driver.memory', spark_driver_memory)
conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
conf.set('spark.executor.memoryOverhead', spark_executor_memoryOverhead)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
logger.info('Spark initialization done.')
return spark
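# Typical use (illustrative): spark = init_spark() starts a local[*] session
# named 'MinHashLSH' with 64g driver/executor memory.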


def generate_edges(nodes: List[int]) -> List[Tuple[int, int]]:
"""
Generate edges from a cluster. Instead of generating N^2 edges,
we only need to align all nodes to a single node,
since we will be running connected components on the edges later.
Parameters
----------
nodes : List[int]
The list of nodes in the cluster.
Returns
-------
List[Tuple[int, int]]
The list of edges.
"""
if len(nodes) <= 1:
return []

min_node = min(nodes)
return [(n, min_node) for n in nodes if n != min_node]
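# Illustrative example: generate_edges([3, 1, 2]) -> [(3, 1), (2, 1)], i.e.
# every node in a cluster is linked to the cluster's smallest node id.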


# Connected Components in MapReduce and Beyond
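# large-star: link each neighbor larger than the key node to the smallest
# node in its neighborhood; small-star: link the key node and its smaller
# neighbors to that minimum. Alternating the two converges to per-component
# minimum representatives.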
def large_star_map(edge):
return [(edge[0], edge[1]), (edge[1], edge[0])]


def large_star_reduce(group):
x, neighbors = group
nodes = [x] + list(neighbors)
minimum = min(nodes)
return [(n, minimum) for n in nodes if n > x]


def small_star_map(edge):
x, y = edge
if y <= x:
return (x, y)
else:
return (y, x)


def small_star_reduce(group):
x, neighbors = group
nodes = [x] + list(neighbors)
minimum = min(nodes)
return [(n, minimum) for n in nodes if n != minimum]


def find_components(edges):
"""
Star-Graph-Connected-Components (SGCC) algorithm
"""

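# Alternate large-star and small-star rounds on the edge RDD; stop once the
# small-star result no longer differs from the preceding large-star result.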
a = edges
while True:
b = a.flatMap(large_star_map).groupByKey().flatMap(
large_star_reduce).distinct().cache()
a = b.map(small_star_map).groupByKey().flatMap(
small_star_reduce).distinct().cache()
changes = a.subtract(b).union(b.subtract(a)).collect()
if len(changes) == 0:
break

results = a.collect()
return results