Commit

Merge branch 'main' into swissdox

jderiu authored Jun 19, 2024
2 parents 9fbab55 + d7a798a commit f423321
Showing 97 changed files with 5,131 additions and 935 deletions.
61 changes: 61 additions & 0 deletions .github/workflows/pypi-release.yml
@@ -0,0 +1,61 @@
name: PyPI release
on:
  workflow_dispatch:

jobs:
  testing:
    uses: ./.github/workflows/testing.yml
  release:
    needs: testing
    runs-on: ubuntu-latest
    env:
      TWINE_USERNAME: __token__

    steps:
      - name: Checkout Repo
        uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10"

      - name: Install build dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -U twine build
      - name: Build the dist files
        run: python -m build .

      - name: Publish to the test PyPI
        env:
          TWINE_PASSWORD: ${{ secrets.TEST_PYPI_TOKEN }}
        run: twine upload dist/* --repository=testpypi

      - name: Test installing from test PyPI and running tests
        run: |
          pip install -i https://testpypi.python.org/pypi --extra-index-url https://pypi.org/simple datatrove[testing]
          python -m nltk.downloader punkt
          make test
      - name: Get tag name
        id: get_tag_name
        run: |
          echo TAG_NAME=$(grep '^version' pyproject.toml | head -1 | cut -d '"' -f 2) >> $GITHUB_OUTPUT
      - name: Tag the release
        uses: actions/github-script@v7
        with:
          script: |
            github.rest.git.createRef({
              owner: context.repo.owner,
              repo: context.repo.repo,
              ref: 'refs/tags/v${{ steps.get_tag_name.outputs.TAG_NAME }}',
              sha: context.sha
            })
      - name: Publish to PyPI
        env:
          TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
        run: twine upload dist/* --repository=pypi
15 changes: 8 additions & 7 deletions .github/workflows/ci.yml → .github/workflows/testing.yml
@@ -1,4 +1,4 @@
-name: CI
+name: Test & Check Code Quality

on:
  pull_request:
@@ -7,6 +7,7 @@ on:
  push:
    branches:
      - main
+  workflow_call:

jobs:
  check_code_quality:
@@ -19,12 +20,12 @@ jobs:
          python-version: "3.10"
      - name: Install dependencies
        run: |
-          python -m pip install --upgrade pip
-          pip install .[quality]
+          python -m pip install uv
+          uv pip install --system .[quality]
      - name: Check quality
        run: |
-          ruff check tests src # linter
-          ruff format --check tests src # formatter
+          ruff check tests src examples # linter
+          ruff format --check tests src examples # formatter

  test:
    runs-on: ubuntu-latest
@@ -40,8 +41,8 @@
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
-          python -m pip install --upgrade pip
-          pip install .[testing]
+          python -m pip install uv
+          uv pip install --system .[testing]
          python -m nltk.downloader punkt
      - name: Test with pytest
        run: |
15 changes: 15 additions & 0 deletions .github/workflows/trufflehog.yml
@@ -0,0 +1,15 @@
on:
  push:

name: Secret Leaks

jobs:
  trufflehog:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Secret Scanning
        uses: trufflesecurity/trufflehog@main
2 changes: 2 additions & 0 deletions CITATION.cff
@@ -6,6 +6,8 @@ type: software
authors:
  - given-names: Guilherme
    family-names: Penedo
+ - given-names: Hynek
+   family-names: Kydlíček
  - given-names: Alessandro
    family-names: Cappelli
  - given-names: Thomas
51 changes: 49 additions & 2 deletions README.md
@@ -27,6 +27,7 @@ Local, remote and other file systems are supported through [fsspec](https://file
* [Filtering data](#filtering-data)
* [Saving data](#saving-data)
* [Deduplicating data](#deduplicating-data)
* [Summary Statistics](#summary-statistics)
* [Custom blocks](#custom-blocks)
+ [Simple data](#simple-data)
+ [Custom function](#custom-function)
@@ -96,6 +97,7 @@ Some options common to all executors:
- `pipeline` a list consisting of the pipeline steps that should be run
- `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
- `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
- `randomize_start_duration` (_int_, `0` by default) the maximum number of seconds to delay the start of each task to prevent all tasks from starting simultaneously and potentially overloading the system.

Call an executor's `run` method to execute its pipeline.
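
As a quick reference, here is a minimal local sketch combining the options above (the input, output and logging paths are placeholders):

```python
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.filters import LambdaFilter
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers.jsonl import JsonlWriter

executor = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("my_input_data/"),  # placeholder input folder
        LambdaFilter(lambda doc: "hugging" in doc.text),
        JsonlWriter("my_output_data/"),  # placeholder output folder
    ],
    logging_dir="my_logs/exp1",      # logs, stats and completions are saved here
    tasks=10,                        # number of tasks the input files are split into
    skip_completed=True,             # relaunching will skip already finished tasks
    randomize_start_duration=60,     # delay each task start by up to 60 seconds
)
executor.run()
```
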

Expand Down Expand Up @@ -223,6 +225,12 @@ For a pipeline with `logging_dir` **mylogspath/exp1**, the following folder stru
```
</details>

### Colorization
Log messages support colorization. By default, colorization is auto-detected for console messages and disabled for log files (`logs/task_XXXXX.log`).
To explicitly enable or disable colorization, you may set the following environment variables:
- `DATATROVE_COLORIZE_LOGS`: set to "1" to add ANSI colors to console log messages or "0" to disable colorization.
- `DATATROVE_COLORIZE_LOG_FILES`: set to "1" to add ANSI colors to log messages saved to `logs/task_XXXXX.log`.
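
For example, a quick sketch (assuming these variables are read when logging is first set up, before the pipeline starts):

```python
import os

# force-disable colors on the console and enable them in the per-task log files
os.environ["DATATROVE_COLORIZE_LOGS"] = "0"
os.environ["DATATROVE_COLORIZE_LOG_FILES"] = "1"
```
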

## DataFolder / paths
Datatrove supports a wide variety of input/output sources through [fsspec](https://filesystem-spec.readthedocs.io/en/latest/).

@@ -279,6 +287,45 @@ JsonlWriter(
### Deduplicating data
For deduplication check the examples [minhash_deduplication.py](examples/minhash_deduplication.py), [sentence_deduplication.py](examples/sentence_deduplication.py) and [exact_substrings.py](examples/exact_substrings.py).

### Summary Statistics
For summary statistics on your data you can use the [Stats](src/datatrove/pipeline/stats/summary_stats/) blocks. These blocks provide an easy way to collect data profiles on your dataset in a distributed manner. It's a two-step process:
1) For each shard, iterate over documents and collect stats into one of the following groupings: `summary` (all docs counted under a single "summary" key), `fqdn` (grouping by fully qualified domain name), `suffix` (grouping by the last part of the URL path) or `histogram` (value-based grouping).
2) Merge the stats from the different shards into a single file.
See [summary_stats.py](examples/summary_stats.py) for more details.
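
To illustrate the two steps, a hypothetical sketch follows; the block names (`DocStats`, `LineStats`, `StatsMerger`), their import path and their parameters are assumptions based on the module layout above rather than the verified API:

```python
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.stats.summary_stats import DocStats, LineStats, StatsMerger  # assumed names/path

STATS_FOLDER = "my_stats/"  # placeholder folder for the per-shard stats

# step 1: each task iterates over its shard of documents and collects stats
collect = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("my_input_data/"),          # placeholder input
        DocStats(output_folder=STATS_FOLDER),   # length, white_space_ratio, ...
        LineStats(output_folder=STATS_FOLDER),  # n_lines, avg_line_length, ...
    ],
    tasks=10,
    logging_dir="my_logs/stats_collect",
)

# step 2: merge the per-shard stats into a single metric.json per stat and grouping
merge = LocalPipelineExecutor(
    pipeline=[StatsMerger(input_folder=STATS_FOLDER, output_folder="my_stats_merged/")],
    tasks=1,
    logging_dir="my_logs/stats_merge",
)

collect.run()
merge.run()
```
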

Each resulting stat is saved in a separate file with the following structure: `output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json`

Each such file is a `MetricStatsDict` object, which you can easily load using:
```python
from datatrove.pipeline.stats.summary_stats import MetricStatsDict
import json
stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))

# E.g. the total length of nytimes.com docs
stats["nytimes.com"].total

# Or the mean length of cnn.com docs
stats["cnn.com"].mean
```

The following stats are available:
- `contamination_stats.py`: `word_contamination_{words[0]}`: Frequency of word contamination in the document.
- `doc_stats.py`: `length`: Length of the document, `white_space_ratio`: Ratio of whitespace characters, `non_alpha_digit_ratio`: Ratio of non-alphabetic and non-digit characters, `digit_ratio`: Ratio of digits, `uppercase_ratio`: Ratio of uppercase letters, `elipsis_ratio`: Ratio of ellipsis characters, `punctuation_ratio`: Punctuation ratio
- `lang_stats.py`: `fasttext_{language}`: Language of the document using fastText
- `line_stats.py`: `n_lines`: Number of lines per doc, `avg_line_length`: Average line length per doc, `long_line_ratio_words`: Ratio of lines with more than k chars, `short_line_ratio_chars`: Ratio of lines with fewer than k chars, `bullet_point_lines_ratio`: Ratio of bullet-point lines, `line_duplicates`: Ratio of lines that are duplicates, `line_char_duplicates`: Ratio of chars in duplicated lines
- `paragraph_stats.py`: `n_paragraphs`: Number of paragraphs, `avg_paragraph_length`: Average paragraph length, `short_paragraph_ratio_{chars}`: Ratio of short paragraphs (<{chars} chars), `long_paragraph_ratio_{chars}`: Ratio of long paragraphs (>{chars} chars)
- `perplexity_stats.py`: `ccnet_perplexity_{model_dataset}_{language}`: Perplexity of the document using the CCNet model for {model} on {dataset} in {language}
- `sentence_stats.py`: `n_sentences`: Number of sentences, `avg_sentence_length`: Average sentence length, `short_sentence_ratio_{chars}`: Ratio of short sentences (<{chars} chars), `long_sentence_ratio_{chars}`: Ratio of long sentences (>{chars} chars)
- `token_stats.py`: `token_count`: Number of tokens in the document
- `word_stats.py`: `n_words`: Number of words in the document, `avg_word_length`: Average length of words in the document, `avg_words_per_line`: Average number of words per line in the document, `short_word_ratio_{chars}`: Ratio of words shorter than {chars} characters, `stop_word_ratio`: Ratio of stop words, `long_word_ratio_{chars}`: Ratio of words longer than {chars} characters, `type_token_ratio`: Number of unique words / Number of tokens, `capitalized_word_ratio`: Ratio of capitalized words, `uppercase_word_ratio`: Ratio of uppercase words

### Custom blocks

#### Simple data
@@ -405,11 +452,11 @@ pytest -sv ./tests/

```bibtex
@misc{penedo2024datatrove,
-author = {Penedo, Guilherme and Cappelli, Alessandro and Wolf, Thomas and Sasko, Mario},
+author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
title = {DataTrove: large scale data processing},
year = {2024},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/huggingface/datatrove}
}
```
176 changes: 176 additions & 0 deletions examples/fineweb.py
@@ -0,0 +1,176 @@
"""
This file contains the code used to process and create the
FineWeb dataset (https://huggingface.co/datasets/HuggingFaceFW/fineweb)
"""

from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashDedupFilter, MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import MinhashConfig, MinhashDedupBuckets
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
    C4QualityFilter,
    FineWebQualityFilter,
    GopherQualityFilter,
    GopherRepetitionFilter,
    LanguageFilter,
    URLFilter,
)
from datatrove.pipeline.formatters import PIIFormatter
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter


"""
we first ran the following pipeline for each dump
"""
DUMP_TO_PROCESS = "CC-MAIN-2023-50" # example

MAIN_OUTPUT_PATH = "s3://some_s3_bucket"
FILTERING_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/base_processing"

main_processing_executor = SlurmPipelineExecutor(
    job_name=f"cc_{DUMP_TO_PROCESS}",
    pipeline=[
        WarcReader(
            f"s3://commoncrawl/crawl-data/{DUMP_TO_PROCESS}/segments/",
            glob_pattern="*/warc/*",  # we want the warc files
            default_metadata={"dump": DUMP_TO_PROCESS},
        ),
        URLFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/1_url/{DUMP_TO_PROCESS}")),
        Trafilatura(favour_precision=True),
        LanguageFilter(
            exclusion_writer=JsonlWriter(
                f"{FILTERING_OUTPUT_PATH}/2_non_english/",
                output_filename="${language}/" + DUMP_TO_PROCESS + "/${rank}.jsonl.gz",
                # folder structure: language/dump/file
            )
        ),
        GopherRepetitionFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/3_gopher_rep/{DUMP_TO_PROCESS}")
        ),
        GopherQualityFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/4_gopher_qual/{DUMP_TO_PROCESS}")
        ),
        C4QualityFilter(
            filter_no_terminal_punct=False,
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/5_c4/{DUMP_TO_PROCESS}"),
        ),
        FineWebQualityFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/6_fineweb_qual/{DUMP_TO_PROCESS}")
        ),
        JsonlWriter(f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"),
    ],
    tasks=8000,
    time="10:00:00",
    logging_dir=f"{MAIN_OUTPUT_PATH}/logs/base_processing/{DUMP_TO_PROCESS}",
    slurm_logs_folder=f"logs/base_processing/{DUMP_TO_PROCESS}/slurm_logs",  # must be local
    randomize_start_duration=180,  # don't hit the bucket all at once with the list requests
    mem_per_cpu_gb=2,
    partition="hopper-cpu",
)
main_processing_executor.run()

"""
we then applied minhash deduplication to each individual dump,
"""

# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
    use_64bit_hashes=True,  # better precision -> fewer false positives (collisions)
    num_buckets=14,
    hashes_per_bucket=8,
    n_grams=5,
)

S3_MINHASH_BASE_PATH = f"{MAIN_OUTPUT_PATH}/minhash"

S3_LOGS_FOLDER = f"{MAIN_OUTPUT_PATH}/logs/minhash"
LOCAL_LOGS_FOLDER = "logs/minhash"

TOTAL_TASKS = 1000

# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(
    f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"
)  # this is the output from the first part

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = SlurmPipelineExecutor(
    job_name=f"mh1_{DUMP_TO_PROCESS}",
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures", config=minhash_config
        ),
    ],
    tasks=TOTAL_TASKS,
    time="5:00:00",
    partition="hopper-cpu",
    logging_dir=f"{S3_LOGS_FOLDER}/signatures",
    slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/signatures/slurm_logs",
    randomize_start_duration=180,
    depends=main_processing_executor,  # only start after the first one completes
)

stage2 = SlurmPipelineExecutor(
    job_name=f"mh2_{DUMP_TO_PROCESS}",
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures",
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
            config=MinhashConfig(use_64bit_hashes=True),
        ),
    ],
    tasks=minhash_config.num_buckets * 50,  # the code supports parallelizing each bucket. here we run 50 workers per bucket
    randomize_start_duration=180,
    logging_dir=f"{S3_LOGS_FOLDER}/buckets",
    partition="hopper-cpu",
    time="02:00:00",
    mem_per_cpu_gb=4,
    cpus_per_task=3,  # you can run more (smaller) tasks if you do not have a lot of memory
    depends=stage1,
)


stage3 = SlurmPipelineExecutor(
    job_name=f"mh3_{DUMP_TO_PROCESS}",
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids",
            config=minhash_config,
        ),
    ],
    tasks=1,  # this step runs on a single task
    logging_dir=f"{S3_LOGS_FOLDER}/clustering",
    partition="hopper-cpu",
    time="30:00:00",  # and can also be quite slow. Usually not this slow though
    mem_per_cpu_gb=25,
    cpus_per_task=8,  # if you dedup a full dump, you do need a lot of memory for this one
    depends=stage2,
)


stage4 = SlurmPipelineExecutor(
    job_name=f"mh4_{DUMP_TO_PROCESS}",
    pipeline=[
        INPUT_READER,
        TokensCounter(),  # you can remove this one, it's just a nice way to know how many tokens we have
        # before and after dedup
        MinhashDedupFilter(input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids"),
        # run the PII removal
        PIIFormatter(),
        JsonlWriter(f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/deduped_output"),
    ],
    tasks=TOTAL_TASKS,
    logging_dir=f"{S3_LOGS_FOLDER}/filtering",
    partition="hopper-cpu",
    time="5:00:00",
    mem_per_cpu_gb=4,
    depends=stage3,
)

# launch dedup pipelines
stage4.run()