diff --git a/.github/runners/Dockerfile b/.github/runners/Dockerfile
new file mode 100644
index 0000000..493489d
--- /dev/null
+++ b/.github/runners/Dockerfile
@@ -0,0 +1,51 @@
+FROM python:3.11-slim
+
+# Install necessary packages
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends \
+    build-essential \
+    cmake \
+    sudo \
+    unzip \
+    curl \
+    wget \
+    git \
+    git-lfs \
+    jq \
+    && rm -rf /var/lib/apt/lists/*
+
+
+RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
+    unzip awscliv2.zip && \
+    ./aws/install
+
+ENV HOME=/home/runner
+
+RUN mkdir -p /home/runner
+
+ARG RUNNER_VERSION=2.317.0
+
+ARG RUNNER_UID=1000
+ARG DOCKER_GID=1001
+
+RUN adduser --disabled-password --gecos "" --uid $RUNNER_UID runner \
+    && groupadd docker --gid $DOCKER_GID \
+    && usermod -aG sudo runner \
+    && usermod -aG docker runner \
+    && echo "%sudo ALL=(ALL:ALL) NOPASSWD:ALL" > /etc/sudoers \
+    && echo "Defaults env_keep += \"DEBIAN_FRONTEND\"" >> /etc/sudoers
+
+# cd into the user directory, download and unzip the github actions runner
+RUN cd /home/runner && mkdir actions-runner && cd actions-runner \
+    && curl -O -L https://github.com/actions/runner/releases/download/v${RUNNER_VERSION}/actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz \
+    && tar xzf ./actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz
+
+RUN chown -R runner:runner /home/runner && /home/runner/actions-runner/bin/installdependencies.sh
+
+ADD ./start.sh /home/runner/start.sh
+
+RUN chmod +x /home/runner/start.sh
+
+ENTRYPOINT ["/bin/bash", "/home/runner/start.sh"]
+
+USER runner
\ No newline at end of file
diff --git a/.github/runners/start.sh b/.github/runners/start.sh
new file mode 100644
index 0000000..84d3c3d
--- /dev/null
+++ b/.github/runners/start.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+RUNNER_REPO=$RUNNER_REPO
+RUNNER_PAT=$RUNNER_PAT
+RUNNER_GROUP=$RUNNER_GROUP
+RUNNER_LABELS=$RUNNER_LABELS
+RUNNER_NAME=$(hostname)
+
+cd /home/runner/actions-runner
+
+./config.sh --unattended --replace --url https://github.com/${RUNNER_REPO} --pat ${RUNNER_PAT} --name ${RUNNER_NAME} --runnergroup ${RUNNER_GROUP} --labels ${RUNNER_LABELS} --work /home/runner/actions-runner/_work
+
+cleanup() {
+    echo "Removing runner..."
+    ./config.sh remove --unattended --pat ${RUNNER_PAT}
+}
+
+trap 'cleanup; exit 130' INT
+trap 'cleanup; exit 143' TERM
+
+./run.sh & wait $!
\ No newline at end of file
diff --git a/.github/workflows/test-flow.yml b/.github/workflows/test-flow.yml
new file mode 100644
index 0000000..f2ff92e
--- /dev/null
+++ b/.github/workflows/test-flow.yml
@@ -0,0 +1,27 @@
+name: CI/CD Test
+
+on:
+  push:
+    branches: main
+  pull_request:
+    branches: main
+  workflow_dispatch:
+
+jobs:
+  test:
+    runs-on: research-utils
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e .
+
+      - name: Run test_flows.py
+        run: python test_flows.py
+        env:
+          S3_ACCESS_KEY: ${{ secrets.MINIO_ACCESS_KEY_ID }}
+          S3_SECRET_KEY: ${{ secrets.MINIO_SECRET_ACCESS_KEY }}
+          S3_ENDPOINT_URL: ${{ secrets.MINIO_ENDPOINT }}
\ No newline at end of file
diff --git a/example_env.sh b/example_env.sh
index 2eeb355..9633801 100755
--- a/example_env.sh
+++ b/example_env.sh
@@ -1,3 +1,4 @@
 export S3_ACCESS_KEY=minioadmin
 export S3_SECRET_KEY=minioadmin
-export S3_ENDPOINT_URL=http://127.0.0.1:9000
+export S3_ENDPOINT_URL="http://127.0.0.1:9000"
+mc alias set ALIAS $S3_ENDPOINT_URL $S3_ACCESS_KEY $S3_SECRET_KEY
\ No newline at end of file
diff --git a/main.py b/main.py
deleted file mode 100644
index 1ba6bad..0000000
--- a/main.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from s3helper import S3Helper,S3HelperAutoConfig,S3HelperAutoTokenizer,S3HelperAutoModelForCausalLM, s3_load_dataset
-import os
-
-os.environ['S3_ACCESS_KEY'] = 'minioadmin'
-os.environ['S3_SECRET_KEY'] = 'minioadmin'
-os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9001'
-S3Helper()
-
-# # Example usage
-# model_name = "thunghiem/tinyllama"
-# model = S3HelperAutoModelForCausalLM.from_pretrained(model_name)
-# tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
-# config = S3HelperAutoConfig.from_pretrained(model_name)
-# Make sure S3Helper is initialized and environment variables are set
-# Load a dataset
-dataset = s3_load_dataset("modelhubjan/test_dataset")
-
-# Use the dataset
-for item in dataset:
-    print(item)
-
-# You can also pass additional arguments to load_dataset
-dataset = s3_load_dataset("modelhubjan/test_dataset", file_format='parquet', split='train')
\ No newline at end of file
diff --git a/s3helper/s3_helper.py b/s3helper/s3_helper.py
index f8955e9..a493c14 100644
--- a/s3helper/s3_helper.py
+++ b/s3helper/s3_helper.py
@@ -4,12 +4,23 @@
 from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
 import sys
 import logging
-from datasets import load_dataset, Dataset
-from typing import Optional, Dict, Any
-
+from datasets import load_dataset, Dataset, load_from_disk
+from typing import Optional, Dict, Any, List
 # Configure logging
 logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(message)s')
+def find_files(directory: str, file_format: str):
+    matching_files = []
+
+    # Walk through the directory
+    for root, _, files in os.walk(directory):
+        for file in files:
+            # Check if the file ends with the specified format
+            if file.endswith(f".{file_format}"):
+                matching_files.append(os.path.join(root, file))
+
+    return matching_files
+
 class S3Helper:
     _instance = None
@@ -46,7 +57,7 @@ def validate_credentials(self):
             logging.error(f"Invalid S3 credentials: {e}")
             raise ValueError("Invalid S3 credentials")
 
-    def download_model(self, path_components: list, local_dir: str = './models'):
+    def download_file(self, path_components: list, local_dir: str):
         bucket_name = path_components[0]
         model_name = path_components[1]
         objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=model_name)
@@ -59,99 +70,113 @@
             self.s3_client.download_file(bucket_name, file_key, file_path)
             logging.info(f'Downloaded file: {file_key}')
 
-    def ensure_model_local(self, pretrained_model_name_or_path, local_dir):
-        path_components = pretrained_model_name_or_path.split("/")
+    def ensure_file_local(self, file_name_or_path: str, local_dir: str):
+        path_components = file_name_or_path.split("/")
         if len(path_components) != 2:
logging.error("Cannot recognize bucket name and model name since having > 2 components") - raise ValueError("Cannot recognize bucket name and model name since having > 2 components") - model_local_path = os.path.join(local_dir, pretrained_model_name_or_path) - if not os.path.exists(model_local_path): - os.makedirs(model_local_path, exist_ok=True) - self.download_model(path_components, local_dir) + logging.error("Cannot recognize bucket name and file name since the components are not 2") + raise ValueError("Cannot recognize bucket name and file name since the components are not 2") + file_local_path = os.path.join(local_dir, file_name_or_path) + if not os.path.exists(file_local_path): + os.makedirs(file_local_path, exist_ok=True) + self.download_file(path_components, local_dir) else: - logging.info(f"Model existed at: {model_local_path}, read from cache") - return model_local_path + if 'model' in local_dir.lower(): + + logging.info(f"Model existed at: {file_local_path}, read from cache") + elif 'dataset' in local_dir.lower(): + logging.info(f"Dataset existed at: {file_local_path}, read from cache") + return file_local_path - def upload_to_s3(self, local_dir, bucket_name, model_name): + def upload_to_s3(self, local_dir, bucket_name, file_name): for root, _, files in os.walk(local_dir): for file in files: local_file_path = os.path.join(root, file) s3_key = os.path.relpath(local_file_path, local_dir) - self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(model_name, s3_key)) - logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{model_name}/{s3_key}') - def download_dataset(self, path_components: list, local_dir: str = './datasets'): - bucket_name = path_components[0] - dataset_name = path_components[1] - objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name) - for obj in objects.get('Contents', []): - file_key = obj['Key'] - if file_key.endswith('/'): - continue # Skip directories - file_path = os.path.join(local_dir, bucket_name, file_key) - os.makedirs(os.path.dirname(file_path), exist_ok=True) - self.s3_client.download_file(bucket_name, file_key, file_path) - logging.info(f'Downloaded dataset file: {file_key}') + self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(file_name, s3_key)) + logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{file_name}/{s3_key}') + # def download_dataset(self, path_components: list, local_dir: str = './datasets'): + # bucket_name = path_components[0] + # dataset_name = path_components[1] + # objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name) + # for obj in objects.get('Contents', []): + # file_key = obj['Key'] + # if file_key.endswith('/'): + # continue # Skip directories + # file_path = os.path.join(local_dir, bucket_name, file_key) + # os.makedirs(os.path.dirname(file_path), exist_ok=True) + # self.s3_client.download_file(bucket_name, file_key, file_path) + # logging.info(f'Downloaded dataset file: {file_key}') class S3HelperAutoModelForCausalLM(AutoModelForCausalLM): @classmethod def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs): s3_helper = S3Helper.get_instance() - model_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir) + model_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir) return super().from_pretrained(model_local_path, *model_args, **kwargs) class S3HelperAutoTokenizer(AutoTokenizer): @classmethod def 
     def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
         s3_helper = S3Helper.get_instance()
-        tokenizer_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
+        tokenizer_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
         return super().from_pretrained(tokenizer_local_path, *model_args, **kwargs)
 
 class S3HelperAutoConfig(AutoConfig):
     @classmethod
     def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
         s3_helper = S3Helper.get_instance()
-        config_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
+        config_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
         return super().from_pretrained(config_local_path, *model_args, **kwargs)
 
 # defined a custom load_dataset from S3 bucket
 def s3_load_dataset(
-    path: str,
-    local_dir: str = './datasets',
+    dataset_name_or_path: str,
     file_format: str = 'json',
+    local_dir: str = './datasets',
+    split: str = None,
     *args: Any,
     **kwargs: Any
 ) -> Dataset:
     """
     Load a dataset from S3/Minio storage.
-
     Args:
-        path (str): Path to the dataset in the format 'bucket_name/dataset_name'
-        local_dir (str): Local directory to store downloaded datasets
-        file_format (str): Format of the dataset file (e.g., 'json', 'csv', 'parquet')
-        *args: Additional positional arguments to pass to load_dataset
-        **kwargs: Additional keyword arguments to pass to load_dataset
-
+        dataset_name_or_path (str): Path to the dataset in the format 'bucket_name/dataset_name'
+        file_format (str): File format of the dataset. Either 'json', 'csv', or 'parquet'.
+        local_dir (str): Local directory to store downloaded datasets
+        split (str): Dataset split to load ('train', 'test', or None for all)
+        *args: Additional positional arguments to pass to load_dataset
+        **kwargs: Additional keyword arguments to pass to load_dataset
     Returns:
-        Dataset: The loaded dataset
+        Dataset: The loaded dataset
     """
     s3_helper = S3Helper.get_instance()
+    dataset_local_path = s3_helper.ensure_file_local(dataset_name_or_path, local_dir)
 
-    # Split the path into bucket and dataset name
-    path_components = path.split("/")
-    if len(path_components) != 2:
-        raise ValueError("Path should be in the format 'bucket_name/dataset_name'")
+    def find_files(path: str, extension: str) -> List[str]:
+        return [os.path.join(root, file) for root, _, files in os.walk(path)
+                for file in files if file.endswith(f'.{extension}')]
 
-    bucket_name, dataset_name = path_components
-    dataset_local_path = os.path.join(local_dir, bucket_name, dataset_name)
-
-    # Download dataset if not exists locally
-    if not os.path.exists(dataset_local_path):
-        os.makedirs(dataset_local_path, exist_ok=True)
-        s3_helper.download_dataset(path_components, local_dir)
-    else:
-        logging.info(f"Dataset already exists at: {dataset_local_path}, using cached version")
-
-    # Construct the path to the data file
-    data_file_path = os.path.join(dataset_local_path, f"data.{file_format}")
+    local_files = find_files(dataset_local_path, file_format)
+    logging.info(f"Found local files: {local_files}")
+
+    data_files: Dict[str, List[str]] = {"train": [], "test": []}
+    for file in local_files:
+        if "train" in file:
+            data_files["train"].append(file)
+        elif "test" in file:
+            data_files["test"].append(file)
+        else:
+            logging.warning(f"Unclassified file: {file}")
+
+    if split:
+        if split not in data_files:
+            raise ValueError(f"Invalid split: {split}. Available splits are: {list(data_files.keys())}")
+        data_files = {split: data_files[split]}
+
+    # Remove empty splits
+    data_files = {k: v for k, v in data_files.items() if v}
+
+    if not data_files:
+        raise ValueError(f"No valid files found for the specified format and split.")
 
-    # Load and return the dataset
-    return load_dataset(file_format, data_files=data_file_path, *args, **kwargs)
\ No newline at end of file
+    logging.info(f"Loading dataset with data_files: {data_files}")
+    return load_dataset(file_format, data_files=data_files, *args, **kwargs)
\ No newline at end of file
diff --git a/setup.py b/setup.py
index eb072c1..818a845 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 
 setup(
     name='research-utils',
-    version='0.2.0', # Increment the version number
+    version='0.2.1', # Increment the version number
     description='A helper library for working with S3/Minio, Hugging Face models, and datasets',
     long_description='This library provides utilities for downloading and managing machine learning models and datasets from S3-compatible storage services, and loading them using the Hugging Face libraries.',
     author='Alan',
@@ -12,8 +12,10 @@
     packages=find_packages(),
     install_requires=[
         'boto3',
+        # tokenizers >=0.13.3
+        'tokenizers==0.13.3',
         'transformers',
-        'datasets', # Add the datasets library
+        'datasets==2.20.0', # Add the datasets library
     ],
     classifiers=[
         'Programming Language :: Python :: 3',
diff --git a/test_flows.py b/test_flows.py
new file mode 100644
index 0000000..aeea043
--- /dev/null
+++ b/test_flows.py
@@ -0,0 +1,20 @@
+from s3helper import S3Helper,S3HelperAutoConfig,S3HelperAutoTokenizer,S3HelperAutoModelForCausalLM, s3_load_dataset
+import os
+import logging
+
+# os.environ['S3_ACCESS_KEY'] = 'minioadmin'
+# os.environ['S3_SECRET_KEY'] = 'minioadmin'
+# os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9000'
+S3Helper()
+
+# # Example usage
+model_name = "jan-hq-test/tokenizer-tinyllama"
+# model = S3HelperAutoModelForCausalLM.from_pretrained(model_name)
+tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
+logging.info(f"Tokenizer Loading successful: {tokenizer}")
+# print(tokenizer)
+# config = S3HelperAutoConfig.from_pretrained(model_name)
+# Make sure S3Helper is initialized and environment variables are set
+# Load a dataset from S3 bucket
+dataset = s3_load_dataset("jan-hq-test/test-dataset",file_format='parquet', split='train')
+logging.info(f"Dataset Loading successful")
\ No newline at end of file