From 41138daeca9331acdedaf30e7ec768bebc8498fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Hynek=20Kydl=C3=AD=C4=8Dek?=
Date: Mon, 25 Mar 2024 15:40:20 +0100
Subject: [PATCH 1/5] nits for url_dedup

---
 src/datatrove/pipeline/dedup/url_dedup.py | 394 ++++++++++++++++++++++
 tests/pipeline/test_url_deduplication.py  | 145 ++++++++
 2 files changed, 539 insertions(+)
 create mode 100644 src/datatrove/pipeline/dedup/url_dedup.py
 create mode 100644 tests/pipeline/test_url_deduplication.py

diff --git a/src/datatrove/pipeline/dedup/url_dedup.py b/src/datatrove/pipeline/dedup/url_dedup.py
new file mode 100644
index 00000000..957a11ad
--- /dev/null
+++ b/src/datatrove/pipeline/dedup/url_dedup.py
@@ -0,0 +1,394 @@
+"""
+URL-based deduplication.
+"""
+
+import contextlib
+import dataclasses
+import heapq
+import struct
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass, field
+from typing import Callable, Generator
+
+import numpy as np
+from fsspec.spec import AbstractBufferedFile
+from loguru import logger
+from tqdm import tqdm
+
+from datatrove.data import Document, DocumentsPipeline
+from datatrove.io import DataFolderLike, get_datafolder
+from datatrove.pipeline.base import PipelineStep
+from datatrove.utils.binaryio import read_tuples_from_file
+from datatrove.utils.text import sha1_hash64
+from datatrove.utils.typeshelper import ExtensionHelperSD, StatHints
+
+from ..writers.disk_base import DiskWriter
+
+
+@dataclass
+class UrlDedupConfig:
+    """
+    Args:
+        url_normalizer: Callable[[str], str] Normalizes the url, e.g. removes query parameters
+        document_priority: Callable[[Document], int]
+            Function for determining the priority of a document.
+            Out of a group of duplicates, only the document with the highest priority is preserved.
+            The document priority must be in the range [0, 65535]
+    """
+
+    url_normalizer: Callable[[str], str] = (
+        lambda x: x
+    )  # Normalize the url, e.g. remove query parameters
+    document_priority: Callable[[Document], int] = (
+        lambda x: 0
+    )  # Urls with higher priority will be preserved; saved as an unsigned short!
+
+
+DEFAULT_URL_DEDUP_CONFIG = UrlDedupConfig()
+
+
+@dataclass(order=False)
+class HashSig:
+    hash_value: int
+    priority: int
+    doc_id: int
+    file_id: int
+
+    def is_from_index(self):
+        return self.doc_id == -1 and self.priority == 1
+
+    def __lt__(self, other: "HashSig") -> bool:
+        # Ensure that the highest priority always comes first among equal hashes
+        return (self.hash_value, -self.priority, self.doc_id) < (
+            other.hash_value,
+            -other.priority,
+            other.doc_id,
+        )
+
+
+class UrlDedupSignature(PipelineStep):
+    """UrlDedup: First pipeline step
+
+    Creates a signature for the url of each document. Each HashSig stores the url hash, the priority and the doc id.
+    The hashes are sorted before being saved. We use negative priority in the comparison so that the
+    highest-priority urls come first in the priority queue.
+
+    Args:
+        output_folder: folder where signatures are saved
+    """
+
+    type = "🫂 - DEDUPS"
+    name = "💥 url-deduplication stage 1"
+    _requires_dependencies = ["nltk"]
+
+    def __init__(
+        self,
+        output_folder: DataFolderLike,
+        finder_workers: int = 1,
+        config: UrlDedupConfig = DEFAULT_URL_DEDUP_CONFIG,
+        language: str = "english",
+    ):
+        super().__init__()
+        self.output_folder = get_datafolder(output_folder)
+        if finder_workers <= 0:
+            raise ValueError("finder_workers must be >= 1")
+        elif finder_workers > 1:
+            logger.warning(
+                f"Remember to also set the number of tasks of the finder block to {finder_workers=}!"
+            )
+        self.finder_workers = finder_workers
+        self.config = config
+        self.language = language
+
+    def save_hashes(self, rank: int, signatures):
+        # explicitly define little endianness
+
+        priority_max = np.iinfo(np.dtype("<u2")).max
+        assert all(
+            sig[1] >= 0 and sig[1] < priority_max for sig in signatures
+        ), f"priority must be between 1 and {priority_max}"
+        signatures = np.array(
+            signatures, dtype=[("hash", "<u8"), ("priority", "<u2"), ("doc", "<u4")]
+        )
+        signatures = np.sort(signatures, order=["hash", "priority"])
+
+        hashes_per_worker = np.iinfo(np.uint64).max // self.finder_workers
+        left_idx = 0
+        for hash_i in range(self.finder_workers):
+            with self.output_folder.open(
+                f"{hash_i:04d}/{rank:05d}{ExtensionHelperSD.stage_1_signature}",
+                mode="wb",
+            ) as f:
+                # last bucket needs to have everything
+                right_hash = (
+                    (hash_i + 1) * hashes_per_worker
+                    if hash_i != self.finder_workers - 1
+                    else np.iinfo(np.uint64).max
+                )
+                # find the last hash that goes into this bucket
+                right_idx = left_idx + signatures["hash"][left_idx:].searchsorted(
+                    right_hash, side="right"
+                )
+                # save to file
+                if right_idx > left_idx:
+                    signatures[left_idx:right_idx].tofile(f)
+                left_idx = right_idx
+                # we've reached the end of our data
+                if right_idx >= len(signatures):
+                    break
+
+    def get_hashes(
+        self, doc: Document, doc_idx: int
+    ) -> list[None] | list[tuple[int, int, int]]:
+        normalized_url = self.config.url_normalizer(doc.metadata["url"])
+        priority = self.config.document_priority(doc)
+        hashes = [(sha1_hash64(normalized_url.encode("utf-8")), priority, doc_idx)]
+
+        return hashes
+
+    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1):
+        """Args:
+            data
+            rank
+            world_size
+
+        Returns:
+
+        UrlDedupSignature creates a signature for each document. Each signature stores the url hash, the priority
+        and the doc id. The hashes are sorted before being saved.
+        """
+        signatures = []
+        for doc_idx, doc in enumerate(data):
+            with self.stats.time_stats:
+                self.stat_update(StatHints.total)
+                signatures.extend(self.get_hashes(doc, doc_idx))
+        self.save_hashes(rank, signatures)
+
+
+def read_sigs(
+    file: AbstractBufferedFile,
+    file_id: int,
+    index_file: bool = False,
+    lines_to_buffer: int = 5,
+) -> Generator[HashSig, None, None]:
+    line_format = "QHI" if not index_file else "Q"
+    last = None
+    with file as f:
+        for data in read_tuples_from_file(
+            f, line_format, lines_to_buffer=lines_to_buffer
+        ):
+            assert (
+                last is None or data[0] >= last
+            ), f"Hash order error. {f.tell()=}, {data[0]=}, {last=}"
+            last = data[0]
+            yield (
+                HashSig(hash_value=data[0], doc_id=-1, file_id=file_id, priority=-1)
+                if index_file
+                else HashSig(
+                    file_id=file_id,
+                    hash_value=data[0],
+                    priority=data[1],
+                    doc_id=data[2],
+                )
+            )
+
+
+class UrlFindDedups(PipelineStep):
+    """UrlDedup: Second pipeline step
+
+    UrlFindDedups runs on a single worker. It reads all the signatures from the previous step and loads them
+    into a priority queue to check for duplicates. If a duplicate is found, its document id is saved.
+    Out of a group of duplicates, the document with the highest priority is the one that is kept.
+ + Args: + data_folder: data folder where signatures are saved + output_folder: folder where duplicates are saved + index_folder: folder where index files are saved + only_dedup_in_index: only dedup in index + """ + + type = "🫂 - DEDUPS" + name = "💥 url-deduplication stage 2" + + def __init__( + self, + data_folder: DataFolderLike, + output_folder: DataFolderLike, + index_folder: DataFolderLike = None, + config: UrlDedupConfig = DEFAULT_URL_DEDUP_CONFIG, + lines_to_buffer: int = 5, + ): + super().__init__() + self.data_folder = get_datafolder(data_folder) + self.output_folder = get_datafolder(output_folder) + self.index_folder = get_datafolder(index_folder) if index_folder else None + + self.config = config + self.lines_to_buffer = lines_to_buffer + + def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1): + with self.stats.time_stats: + if world_size == 1: + # check that there was not a mistake in setting this values + sig_files = self.data_folder.list_files( + glob_pattern="*/*" + ExtensionHelperSD.stage_1_signature + ) + if any(not sig_file.startswith("0000/") for sig_file in sig_files): + raise ValueError( + f"{world_size=} but found sig files for different hash buckets. Set tasks=finder_workers" + ) + else: + sig_files = self.data_folder.list_files( + subdirectory=f"{rank:04d}", + glob_pattern=ExtensionHelperSD.stage_1_signature, + ) + sig_readers = [ + read_sigs(file, file_i, lines_to_buffer=self.lines_to_buffer) + for file_i, file in enumerate(self.data_folder.open_files(sig_files)) + ] + index_files = self.index_folder.list_files() if self.index_folder else None + if index_files: + logger.info(f"Found index file(s): {', '.join(index_files)}") + sig_readers.extend( + [ + read_sigs( + file, + len(sig_readers) + file_i, + index_file=True, + lines_to_buffer=self.lines_to_buffer, + ) + for file_i, file in enumerate( + self.data_folder.open_files(index_files) + ) + ] + ) + + logger.info(f"Initializing pq with {len(sig_readers)} files.") + with ThreadPoolExecutor() as executor: + pq = [ + x + for x in tqdm( + executor.map(lambda x: next(x, None), sig_readers), + total=len(sig_readers), + desc="Initializing pq...", + ) + if x + ] + heapq.heapify(pq) + logger.info("PQ initialized.") + + output_mg = self.output_folder.get_output_file_manager(mode="wb") + + packer = struct.Struct(" np.ndarray: + """Helper function to read duplicates from a binary file storing (doc_id) as created by the second stage.""" + with file as f: + return np.fromfile(f, dtype=" DocumentsPipeline: + """step method for Filters. 
+ Drops documents that if .filter() is False + + UrlDedupFilter reads a DocumentPipeline and removes duplicated urls found at stage 2 + """ + folders = self.data_folder.list_files(include_directories=True, recursive=False) + # for performance reasons when having for instance 12k*10k files + files = [ + f + for f in [ + f"{folder}/{rank:05d}{ExtensionHelperSD.stage_2_duplicates}" + for folder in folders + ] + if self.data_folder.exists(f) + ] + + logger.info(f"Loading duplicate indexes from {len(files)} results files.") + + all_dups = np.array([], dtype=" Date: Mon, 22 Apr 2024 17:39:22 +0200 Subject: [PATCH 2/5] add pypi release action --- .github/workflows/ci.yml | 1 + .github/workflows/pypi-release.yml | 67 ++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 .github/workflows/pypi-release.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 675e7897..2149b38c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,7 @@ on: push: branches: - main + workflow_call: jobs: check_code_quality: diff --git a/.github/workflows/pypi-release.yml b/.github/workflows/pypi-release.yml new file mode 100644 index 00000000..432f961b --- /dev/null +++ b/.github/workflows/pypi-release.yml @@ -0,0 +1,67 @@ +name: PyPI release +on: + workflow_dispatch: + +jobs: + ci: + uses: ./.github/workflows/ci.yml + release: + needs: ci + runs-on: ubuntu-latest + env: + TWINE_USERNAME: __token__ + GITHUB_TOKEN: ${{ secrets.GH_TOKEN }} + + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: "3.10" + + - name: Install build dependencies + run: | + python -m pip install --upgrade pip + pip install -U twine build + + - name: Build the dist files + run: python -m build . 
+ + - name: Publish to the test PyPI + env: + TWINE_PASSWORD: ${{ secrets.TEST_PYPI_TOKEN }} + run: twine upload dist/* --repository=testpypi + + - name: Test installing from test PyPI and running tests + env: + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} + run: | + pip install -i https://testpypi.python.org/pypi --extra-index-url https://pypi.org/simple testing-datatrove[terting] + python -m nltk.downloader punkt + + - name: Get tag name + id: get_tag_name + run: | + echo TAG_NAME=$(grep '^version' pyproject.toml | head -1 | cut -d '"' -f 2) >> $GITHUB_OUTPUT + echo ::notice + + + - name: Tag the release + uses: actions/github-script@v7 + with: + github-token: ${{ env.GITHUB_TOKEN }} + script: | + github.rest.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: 'refs/tags/${{ steps.get_tag_name.outputs.TAG_NAME }}', + sha: context.sha + }) + + + - name: Publish to PyPI + env: + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} + run: twine upload dist/* --repository=pypi From 4ce37b8adf00fefbda8c2e2a0362558890ece6eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hynek=20Kydl=C3=AD=C4=8Dek?= Date: Mon, 22 Apr 2024 17:51:09 +0200 Subject: [PATCH 3/5] Update PyPI release workflow --- .github/workflows/pypi-release.yml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.github/workflows/pypi-release.yml b/.github/workflows/pypi-release.yml index 432f961b..d463bd35 100644 --- a/.github/workflows/pypi-release.yml +++ b/.github/workflows/pypi-release.yml @@ -10,7 +10,6 @@ jobs: runs-on: ubuntu-latest env: TWINE_USERNAME: __token__ - GITHUB_TOKEN: ${{ secrets.GH_TOKEN }} steps: - name: Checkout Repo @@ -38,20 +37,17 @@ jobs: env: TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} run: | - pip install -i https://testpypi.python.org/pypi --extra-index-url https://pypi.org/simple testing-datatrove[terting] + pip install -i https://testpypi.python.org/pypi --extra-index-url https://pypi.org/simple datatrove[testing] python -m nltk.downloader punkt - name: Get tag name id: get_tag_name run: | echo TAG_NAME=$(grep '^version' pyproject.toml | head -1 | cut -d '"' -f 2) >> $GITHUB_OUTPUT - echo ::notice - - name: Tag the release uses: actions/github-script@v7 with: - github-token: ${{ env.GITHUB_TOKEN }} script: | github.rest.git.createRef({ owner: context.repo.owner, From 48b1b2187f931a0fe73e2e7dda0f35d59d21e8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hynek=20Kydl=C3=AD=C4=8Dek?= Date: Mon, 22 Apr 2024 17:55:01 +0200 Subject: [PATCH 4/5] remove teh unwanted files --- src/datatrove/pipeline/dedup/url_dedup.py | 394 ---------------------- tests/pipeline/test_url_deduplication.py | 145 -------- 2 files changed, 539 deletions(-) delete mode 100644 src/datatrove/pipeline/dedup/url_dedup.py delete mode 100644 tests/pipeline/test_url_deduplication.py diff --git a/src/datatrove/pipeline/dedup/url_dedup.py b/src/datatrove/pipeline/dedup/url_dedup.py deleted file mode 100644 index 957a11ad..00000000 --- a/src/datatrove/pipeline/dedup/url_dedup.py +++ /dev/null @@ -1,394 +0,0 @@ -""" -URL based deduplication. 
-""" - -import contextlib -import dataclasses -import heapq -import struct -from concurrent.futures import ThreadPoolExecutor -from dataclasses import dataclass, field -from typing import Callable, Generator - -import numpy as np -from fsspec.spec import AbstractBufferedFile -from loguru import logger -from tqdm import tqdm - -from datatrove.data import Document, DocumentsPipeline -from datatrove.io import DataFolderLike, get_datafolder -from datatrove.pipeline.base import PipelineStep -from datatrove.utils.binaryio import read_tuples_from_file -from datatrove.utils.text import sha1_hash64 -from datatrove.utils.typeshelper import ExtensionHelperSD, StatHints - -from ..writers.disk_base import DiskWriter - - -@dataclass -class UrlDedupConfig: - """ - Args: - url_normalizer: Callable[[str], str] Normalize the url, e.g. remove query parameters - document_priority: Callable[[Document], int] - Function for determining the priority of a document. - Only the document with the highest priority will be preserved, out of duplicates. - The document priority must be in range [0, 65535] - """ - - url_normalizer: Callable[[str], str] = ( - lambda x: x - ) # Normalize the url, e.g. remove query parameters - document_priority: Callable[[Document], int] = ( - lambda x: 0 - ) # Urls with higher will be preserved, will be saved as unsigned short! - - -DEFAULT_URL_DEDUP_CONFIG = UrlDedupConfig() - - -@dataclass(order=False) -class HashSig: - hash_value: int - priority: int - doc_id: int - file_id: int - - def is_from_index(self): - return self.doc_id == -1 and self.priority == 1 - - def __lt__(self, other: "HashSig") -> bool: - # Ensure that highest priority is always first of the hashes - return (self.hash_value, -self.priority, self.doc_id) < ( - other.hash_value, - -other.priority, - other.doc_id, - ) - - -class UrlDedupSignature(PipelineStep): - """UrlDedup: First pipeline step - - Creates a signature for url in each document. Each HashSig has n hash, the -priority the doc id. Before saving - them the hashes are sorted. We use negative priority as we want to the highest priority urls to be first in priority queue. - - Args: - output_folder: folder where signatures are saved - """ - - type = "🫂 - DEDUPS" - name = "💥 url-deduplication stage 1" - _requires_dependencies = ["nltk"] - - def __init__( - self, - output_folder: DataFolderLike, - finder_workers: int = 1, - config: UrlDedupConfig = DEFAULT_URL_DEDUP_CONFIG, - language: str = "english", - ): - super().__init__() - self.output_folder = get_datafolder(output_folder) - if finder_workers <= 0: - raise ValueError("finder_workers must be >= 1") - elif finder_workers > 1: - logger.warning( - f"Remember to also set the name of tasks of the finder block to {finder_workers=}!" 
- ) - self.finder_workers = finder_workers - self.config = config - self.language = language - - def save_hashes(self, rank: int, signatures): - # explicitly define little endiannes - - priority_max = np.iinfo(np.dtype("= 0 and sig[1] < priority_max for sig in signatures - ), f"priority must be between 1 and {priority_max}" - signatures = np.array( - signatures, dtype=[("hash", " left_idx: - signatures[left_idx:right_idx].tofile(f) - left_idx = right_idx - # we've reached the end of our data - if right_idx >= len(signatures): - break - - def get_hashes( - self, doc: Document, doc_idx: int - ) -> list[None] | list[tuple[int, int, int]]: - normalized_url = self.config.url_normalizer(doc.metadata["url"]) - priority = self.config.document_priority(doc) - hashes = [(sha1_hash64(normalized_url.encode("utf-8")), priority, doc_idx)] - - return hashes - - def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1): - """Args: - data - rank - world_size - - Returns: - - UrlDedupSignature creates a signature for each document. Each HashSig has n hash, the priority the doc id. - Before saving them the hashes are sorted. - """ - signatures = [] - for doc_idx, doc in enumerate(data): - with self.stats.time_stats: - self.stat_update(StatHints.total) - signatures.extend(self.get_hashes(doc, doc_idx)) - self.save_hashes(rank, signatures) - - -def read_sigs( - file: AbstractBufferedFile, - file_id: int, - index_file: bool = False, - lines_to_buffer: int = 5, -) -> Generator[HashSig, None, None]: - line_format = "QHI" if not index_file else "Q" - last = None - with file as f: - for data in read_tuples_from_file( - f, line_format, lines_to_buffer=lines_to_buffer - ): - assert ( - last is None or data[0] >= last - ), f"Hash order error. {f.tell()=}, {data[0]=}, {last=}" - last = data[0] - yield ( - HashSig(hash_value=data[0], doc_id=-1, file_id=file_id, priority=-1) - if index_file - else HashSig( - file_id=file_id, - hash_value=data[0], - priority=data[1], - doc_id=data[2], - ) - ) - - -class UrlFindDedups(PipelineStep): - """UrlDedup: Second pipeline step - - UrlFindDedups runs on a single worker. It reads all the signatures from the previous step and loads them - in a priority queue to check for duplicates. If a duplicate is found its document id is saved. - The document with the highest priority is the one that will be saved out of the duplicates . 
- - Args: - data_folder: data folder where signatures are saved - output_folder: folder where duplicates are saved - index_folder: folder where index files are saved - only_dedup_in_index: only dedup in index - """ - - type = "🫂 - DEDUPS" - name = "💥 url-deduplication stage 2" - - def __init__( - self, - data_folder: DataFolderLike, - output_folder: DataFolderLike, - index_folder: DataFolderLike = None, - config: UrlDedupConfig = DEFAULT_URL_DEDUP_CONFIG, - lines_to_buffer: int = 5, - ): - super().__init__() - self.data_folder = get_datafolder(data_folder) - self.output_folder = get_datafolder(output_folder) - self.index_folder = get_datafolder(index_folder) if index_folder else None - - self.config = config - self.lines_to_buffer = lines_to_buffer - - def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1): - with self.stats.time_stats: - if world_size == 1: - # check that there was not a mistake in setting this values - sig_files = self.data_folder.list_files( - glob_pattern="*/*" + ExtensionHelperSD.stage_1_signature - ) - if any(not sig_file.startswith("0000/") for sig_file in sig_files): - raise ValueError( - f"{world_size=} but found sig files for different hash buckets. Set tasks=finder_workers" - ) - else: - sig_files = self.data_folder.list_files( - subdirectory=f"{rank:04d}", - glob_pattern=ExtensionHelperSD.stage_1_signature, - ) - sig_readers = [ - read_sigs(file, file_i, lines_to_buffer=self.lines_to_buffer) - for file_i, file in enumerate(self.data_folder.open_files(sig_files)) - ] - index_files = self.index_folder.list_files() if self.index_folder else None - if index_files: - logger.info(f"Found index file(s): {', '.join(index_files)}") - sig_readers.extend( - [ - read_sigs( - file, - len(sig_readers) + file_i, - index_file=True, - lines_to_buffer=self.lines_to_buffer, - ) - for file_i, file in enumerate( - self.data_folder.open_files(index_files) - ) - ] - ) - - logger.info(f"Initializing pq with {len(sig_readers)} files.") - with ThreadPoolExecutor() as executor: - pq = [ - x - for x in tqdm( - executor.map(lambda x: next(x, None), sig_readers), - total=len(sig_readers), - desc="Initializing pq...", - ) - if x - ] - heapq.heapify(pq) - logger.info("PQ initialized.") - - output_mg = self.output_folder.get_output_file_manager(mode="wb") - - packer = struct.Struct(" np.ndarray: - """Helper function to read duplicates from a binary file storing (doc_id) as created by the second stage.""" - with file as f: - return np.fromfile(f, dtype=" DocumentsPipeline: - """step method for Filters. 
- Drops documents that if .filter() is False - - UrlDedupFilter reads a DocumentPipeline and removes duplicated urls found at stage 2 - """ - folders = self.data_folder.list_files(include_directories=True, recursive=False) - # for performance reasons when having for instance 12k*10k files - files = [ - f - for f in [ - f"{folder}/{rank:05d}{ExtensionHelperSD.stage_2_duplicates}" - for folder in folders - ] - if self.data_folder.exists(f) - ] - - logger.info(f"Loading duplicate indexes from {len(files)} results files.") - - all_dups = np.array([], dtype=" Date: Mon, 22 Apr 2024 18:43:31 +0200 Subject: [PATCH 5/5] fixed workflow erros --- .github/workflows/pypi-release.yml | 8 +++----- .github/workflows/{ci.yml => testing.yml} | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) rename .github/workflows/{ci.yml => testing.yml} (97%) diff --git a/.github/workflows/pypi-release.yml b/.github/workflows/pypi-release.yml index d463bd35..ddf596b7 100644 --- a/.github/workflows/pypi-release.yml +++ b/.github/workflows/pypi-release.yml @@ -3,7 +3,7 @@ on: workflow_dispatch: jobs: - ci: + testing: uses: ./.github/workflows/ci.yml release: needs: ci @@ -34,11 +34,10 @@ jobs: run: twine upload dist/* --repository=testpypi - name: Test installing from test PyPI and running tests - env: - TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} run: | pip install -i https://testpypi.python.org/pypi --extra-index-url https://pypi.org/simple datatrove[testing] python -m nltk.downloader punkt + make test - name: Get tag name id: get_tag_name @@ -52,11 +51,10 @@ jobs: github.rest.git.createRef({ owner: context.repo.owner, repo: context.repo.repo, - ref: 'refs/tags/${{ steps.get_tag_name.outputs.TAG_NAME }}', + ref: 'refs/tags/v${{ steps.get_tag_name.outputs.TAG_NAME }}', sha: context.sha }) - - name: Publish to PyPI env: TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} diff --git a/.github/workflows/ci.yml b/.github/workflows/testing.yml similarity index 97% rename from .github/workflows/ci.yml rename to .github/workflows/testing.yml index 2149b38c..a0fb9920 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/testing.yml @@ -1,4 +1,4 @@ -name: CI +name: Test & Check Code Quality on: pull_request:
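
A minimal usage sketch of the three stages added in PATCH 1/5, for reviewers who want to try the block. It is not part of the patch: the folder paths, the normalizer and priority functions, and the UrlDedupFilter arguments are illustrative assumptions (the filter's constructor is not visible in this extract); only UrlDedupConfig, UrlDedupSignature, UrlFindDedups and UrlDedupFilter come from the new module. As the stage 1 warning notes, stage 2 must then be launched with exactly finder_workers tasks.

# Illustrative sketch only -- paths, priority function and UrlDedupFilter arguments are assumptions.
from datatrove.pipeline.dedup.url_dedup import (
    UrlDedupConfig,
    UrlDedupFilter,
    UrlDedupSignature,
    UrlFindDedups,
)

# Keep the longest document among those sharing a normalized url.
# The priority is stored as an unsigned short, so keep it well inside [0, 65535].
url_dedup_config = UrlDedupConfig(
    url_normalizer=lambda url: url.split("?")[0].lower(),
    document_priority=lambda doc: min(len(doc.text) // 4, 60_000),
)

FINDER_WORKERS = 4  # stage 2 has to be run with this many tasks

stage_1 = UrlDedupSignature(
    output_folder="url_dedup/sigs",
    finder_workers=FINDER_WORKERS,
    config=url_dedup_config,
)
stage_2 = UrlFindDedups(
    data_folder="url_dedup/sigs",
    output_folder="url_dedup/dups",
    config=url_dedup_config,
)
# Assumed constructor: the filter reads the duplicate doc ids written by stage 2 and drops those documents.
stage_3 = UrlDedupFilter(data_folder="url_dedup/dups", config=url_dedup_config)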