From 4a311d058b4f59efd7209ecf62ec9708580e70a3 Mon Sep 17 00:00:00 2001 From: bachvudinh Date: Tue, 16 Jul 2024 09:50:33 +0000 Subject: [PATCH 1/3] refactor the code --- .github/workflows/test-flow.yml | 0 example_env.sh | 3 +- s3helper/s3_helper.py | 107 +++++++++++++++++--------------- main.py => test_flows.py | 13 +--- 4 files changed, 63 insertions(+), 60 deletions(-) create mode 100644 .github/workflows/test-flow.yml rename main.py => test_flows.py (61%) diff --git a/.github/workflows/test-flow.yml b/.github/workflows/test-flow.yml new file mode 100644 index 0000000..e69de29 diff --git a/example_env.sh b/example_env.sh index 2eeb355..9633801 100755 --- a/example_env.sh +++ b/example_env.sh @@ -1,3 +1,4 @@ export S3_ACCESS_KEY=minioadmin export S3_SECRET_KEY=minioadmin -export S3_ENDPOINT_URL=http://127.0.0.1:9000 +export S3_ENDPOINT_URL="http://127.0.0.1:9000" +mc alias set ALIAS $S3_ENDPOINT_URL $S3_ACCESS_KEY $S3_SECRET_KEY \ No newline at end of file diff --git a/s3helper/s3_helper.py b/s3helper/s3_helper.py index f8955e9..10ffcf6 100644 --- a/s3helper/s3_helper.py +++ b/s3helper/s3_helper.py @@ -4,12 +4,24 @@ from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig import sys import logging -from datasets import load_dataset, Dataset +from datasets import load_dataset, Dataset, load_from_disk from typing import Optional, Dict, Any # Configure logging logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(message)s') +def find_files(directory: str, file_format: str): + matching_files = [] + + # Walk through the directory + for root, _, files in os.walk(directory): + for file in files: + # Check if the file ends with the specified format + if file.endswith(f".{file_format}"): + matching_files.append(os.path.join(root, file)) + + return matching_files + class S3Helper: _instance = None @@ -46,7 +58,7 @@ def validate_credentials(self): logging.error(f"Invalid S3 credentials: {e}") raise ValueError("Invalid S3 credentials") - def download_model(self, path_components: list, local_dir: str = './models'): + def download_file(self, path_components: list, local_dir: str): bucket_name = path_components[0] model_name = path_components[1] objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=model_name) @@ -59,38 +71,41 @@ def download_model(self, path_components: list, local_dir: str = './models'): self.s3_client.download_file(bucket_name, file_key, file_path) logging.info(f'Downloaded file: {file_key}') - def ensure_model_local(self, pretrained_model_name_or_path, local_dir): - path_components = pretrained_model_name_or_path.split("/") + def ensure_file_local(self, file_name_or_path: str, local_dir: str): + path_components = file_name_or_path.split("/") if len(path_components) != 2: - logging.error("Cannot recognize bucket name and model name since having > 2 components") - raise ValueError("Cannot recognize bucket name and model name since having > 2 components") - model_local_path = os.path.join(local_dir, pretrained_model_name_or_path) - if not os.path.exists(model_local_path): - os.makedirs(model_local_path, exist_ok=True) - self.download_model(path_components, local_dir) + logging.error("Cannot recognize bucket name and file name since the components are not 2") + raise ValueError("Cannot recognize bucket name and file name since the components are not 2") + file_local_path = os.path.join(local_dir, file_name_or_path) + if not os.path.exists(file_local_path): + os.makedirs(file_local_path, exist_ok=True) + 
self.download_file(path_components, local_dir) else: - logging.info(f"Model existed at: {model_local_path}, read from cache") - return model_local_path + if 'model' in file_name_or_path: + logging.info(f"Model existed at: {file_local_path}, read from cache") + elif 'dataset' in file_name_or_path: + logging.info(f"Dataset existed at: {file_local_path}, read from cache") + return file_local_path - def upload_to_s3(self, local_dir, bucket_name, model_name): + def upload_to_s3(self, local_dir, bucket_name, file_name): for root, _, files in os.walk(local_dir): for file in files: local_file_path = os.path.join(root, file) s3_key = os.path.relpath(local_file_path, local_dir) - self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(model_name, s3_key)) - logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{model_name}/{s3_key}') - def download_dataset(self, path_components: list, local_dir: str = './datasets'): - bucket_name = path_components[0] - dataset_name = path_components[1] - objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name) - for obj in objects.get('Contents', []): - file_key = obj['Key'] - if file_key.endswith('/'): - continue # Skip directories - file_path = os.path.join(local_dir, bucket_name, file_key) - os.makedirs(os.path.dirname(file_path), exist_ok=True) - self.s3_client.download_file(bucket_name, file_key, file_path) - logging.info(f'Downloaded dataset file: {file_key}') + self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(file_name, s3_key)) + logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{file_name}/{s3_key}') + # def download_dataset(self, path_components: list, local_dir: str = './datasets'): + # bucket_name = path_components[0] + # dataset_name = path_components[1] + # objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name) + # for obj in objects.get('Contents', []): + # file_key = obj['Key'] + # if file_key.endswith('/'): + # continue # Skip directories + # file_path = os.path.join(local_dir, bucket_name, file_key) + # os.makedirs(os.path.dirname(file_path), exist_ok=True) + # self.s3_client.download_file(bucket_name, file_key, file_path) + # logging.info(f'Downloaded dataset file: {file_key}') class S3HelperAutoModelForCausalLM(AutoModelForCausalLM): @classmethod @@ -114,9 +129,9 @@ def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: return super().from_pretrained(config_local_path, *model_args, **kwargs) # defined a custom load_dataset from S3 bucket def s3_load_dataset( - path: str, - local_dir: str = './datasets', + dataset_name_or_path: str, file_format: str = 'json', + local_dir: str = './datasets', *args: Any, **kwargs: Any ) -> Dataset: @@ -125,8 +140,8 @@ def s3_load_dataset( Args: path (str): Path to the dataset in the format 'bucket_name/dataset_name' + file_format: File format of the dataset. Either 'json' or 'csv' or 'parquet'. 
local_dir (str): Local directory to store downloaded datasets - file_format (str): Format of the dataset file (e.g., 'json', 'csv', 'parquet') *args: Additional positional arguments to pass to load_dataset **kwargs: Additional keyword arguments to pass to load_dataset @@ -134,24 +149,18 @@ def s3_load_dataset( Dataset: The loaded dataset """ s3_helper = S3Helper.get_instance() - # Split the path into bucket and dataset name - path_components = path.split("/") - if len(path_components) != 2: - raise ValueError("Path should be in the format 'bucket_name/dataset_name'") - - bucket_name, dataset_name = path_components - dataset_local_path = os.path.join(local_dir, bucket_name, dataset_name) - - # Download dataset if not exists locally - if not os.path.exists(dataset_local_path): - os.makedirs(dataset_local_path, exist_ok=True) - s3_helper.download_dataset(path_components, local_dir) - else: - logging.info(f"Dataset already exists at: {dataset_local_path}, using cached version") - - # Construct the path to the data file - data_file_path = os.path.join(dataset_local_path, f"data.{file_format}") - + dataset_local_path = ensure_file_local(dataset_name_or_path, local_dir) + local_files = find_files(dataset_local_path, file_format) + dataset_local_paths = [os.path.join(dataset_local_path, file) for file in local_files] + train_local_paths = [] + test_local_paths = [] + for file in dataset_local_paths: + if "train" in file: + train_local_paths.append(file) + elif "test" in file: + test_local_paths.append(file) + else: + raise ValueError("Not Implemented") # Load and return the dataset - return load_dataset(file_format, data_files=data_file_path, *args, **kwargs) \ No newline at end of file + return load_dataset(file_format, data_files={'train': train_local_paths, "test": test_local_paths}, *args, **kwargs) \ No newline at end of file diff --git a/main.py b/test_flows.py similarity index 61% rename from main.py rename to test_flows.py index 1ba6bad..cbd8dbe 100644 --- a/main.py +++ b/test_flows.py @@ -3,7 +3,7 @@ os.environ['S3_ACCESS_KEY'] = 'minioadmin' os.environ['S3_SECRET_KEY'] = 'minioadmin' -os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9001' +os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9000' S3Helper() # # Example usage @@ -12,12 +12,5 @@ # tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name) # config = S3HelperAutoConfig.from_pretrained(model_name) # Make sure S3Helper is initialized and environment variables are set -# Load a dataset -dataset = s3_load_dataset("modelhubjan/test_dataset") - -# Use the dataset -for item in dataset: - print(item) - -# You can also pass additional arguments to load_dataset -dataset = s3_load_dataset("modelhubjan/test_dataset", file_format='parquet', split='train') \ No newline at end of file +# Load a dataset from S3 bucket +dataset = s3_load_dataset("jan-hq/test_dataset",file_format='parquet', split='train') From b3cad27e10e1faa9b3237ce0eeac756422d9fd60 Mon Sep 17 00:00:00 2001 From: bachvudinh Date: Tue, 16 Jul 2024 17:24:46 +0700 Subject: [PATCH 2/3] debug and correct version's dependency --- s3helper/s3_helper.py | 68 ++++++++++++++++++++++++++----------------- setup.py | 6 ++-- test_flows.py | 7 +++-- 3 files changed, 50 insertions(+), 31 deletions(-) diff --git a/s3helper/s3_helper.py b/s3helper/s3_helper.py index 10ffcf6..a493c14 100644 --- a/s3helper/s3_helper.py +++ b/s3helper/s3_helper.py @@ -5,8 +5,7 @@ import sys import logging from datasets import load_dataset, Dataset, load_from_disk -from typing import Optional, Dict, Any - 
+from typing import Optional, Dict, Any, List # Configure logging logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(message)s') @@ -81,9 +80,10 @@ def ensure_file_local(self, file_name_or_path: str, local_dir: str): os.makedirs(file_local_path, exist_ok=True) self.download_file(path_components, local_dir) else: - if 'model' in file_name_or_path: + if 'model' in local_dir.lower(): + logging.info(f"Model existed at: {file_local_path}, read from cache") - elif 'dataset' in file_name_or_path: + elif 'dataset' in local_dir.lower(): logging.info(f"Dataset existed at: {file_local_path}, read from cache") return file_local_path @@ -111,56 +111,72 @@ class S3HelperAutoModelForCausalLM(AutoModelForCausalLM): @classmethod def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs): s3_helper = S3Helper.get_instance() - model_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir) + model_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir) return super().from_pretrained(model_local_path, *model_args, **kwargs) class S3HelperAutoTokenizer(AutoTokenizer): @classmethod def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs): s3_helper = S3Helper.get_instance() - tokenizer_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir) + tokenizer_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir) return super().from_pretrained(tokenizer_local_path, *model_args, **kwargs) class S3HelperAutoConfig(AutoConfig): @classmethod def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs): s3_helper = S3Helper.get_instance() - config_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir) + config_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir) return super().from_pretrained(config_local_path, *model_args, **kwargs) # defined a custom load_dataset from S3 bucket def s3_load_dataset( dataset_name_or_path: str, file_format: str = 'json', local_dir: str = './datasets', + split: str = None, *args: Any, **kwargs: Any ) -> Dataset: """ Load a dataset from S3/Minio storage. - Args: - path (str): Path to the dataset in the format 'bucket_name/dataset_name' - file_format: File format of the dataset. Either 'json' or 'csv' or 'parquet'. - local_dir (str): Local directory to store downloaded datasets - *args: Additional positional arguments to pass to load_dataset - **kwargs: Additional keyword arguments to pass to load_dataset - + dataset_name_or_path (str): Path to the dataset in the format 'bucket_name/dataset_name' + file_format (str): File format of the dataset. Either 'json', 'csv', or 'parquet'. 
+ local_dir (str): Local directory to store downloaded datasets + split (str): Dataset split to load ('train', 'test', or None for all) + *args: Additional positional arguments to pass to load_dataset + **kwargs: Additional keyword arguments to pass to load_dataset Returns: - Dataset: The loaded dataset + Dataset: The loaded dataset """ s3_helper = S3Helper.get_instance() - # Split the path into bucket and dataset name - dataset_local_path = ensure_file_local(dataset_name_or_path, local_dir) + dataset_local_path = s3_helper.ensure_file_local(dataset_name_or_path, local_dir) + + def find_files(path: str, extension: str) -> List[str]: + return [os.path.join(root, file) for root, _, files in os.walk(path) + for file in files if file.endswith(f'.{extension}')] + local_files = find_files(dataset_local_path, file_format) - dataset_local_paths = [os.path.join(dataset_local_path, file) for file in local_files] - train_local_paths = [] - test_local_paths = [] - for file in dataset_local_paths: + logging.info(f"Found local files: {local_files}") + + data_files: Dict[str, List[str]] = {"train": [], "test": []} + for file in local_files: if "train" in file: - train_local_paths.append(file) + data_files["train"].append(file) elif "test" in file: - test_local_paths.append(file) + data_files["test"].append(file) else: - raise ValueError("Not Implemented") - # Load and return the dataset - return load_dataset(file_format, data_files={'train': train_local_paths, "test": test_local_paths}, *args, **kwargs) \ No newline at end of file + logging.warning(f"Unclassified file: {file}") + + if split: + if split not in data_files: + raise ValueError(f"Invalid split: {split}. Available splits are: {list(data_files.keys())}") + data_files = {split: data_files[split]} + + # Remove empty splits + data_files = {k: v for k, v in data_files.items() if v} + + if not data_files: + raise ValueError(f"No valid files found for the specified format and split.") + + logging.info(f"Loading dataset with data_files: {data_files}") + return load_dataset(file_format, data_files=data_files, *args, **kwargs) \ No newline at end of file diff --git a/setup.py b/setup.py index eb072c1..818a845 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name='research-utils', - version='0.2.0', # Increment the version number + version='0.2.1', # Increment the version number description='A helper library for working with S3/Minio, Hugging Face models, and datasets', long_description='This library provides utilities for downloading and managing machine learning models and datasets from S3-compatible storage services, and loading them using the Hugging Face libraries.', author='Alan', @@ -12,8 +12,10 @@ packages=find_packages(), install_requires=[ 'boto3', + # tokenizers >=0.13.3 + 'tokenizers==0.13.3', 'transformers', - 'datasets', # Add the datasets library + 'datasets==2.20.0', # Add the datasets library ], classifiers=[ 'Programming Language :: Python :: 3', diff --git a/test_flows.py b/test_flows.py index cbd8dbe..2031fc7 100644 --- a/test_flows.py +++ b/test_flows.py @@ -7,10 +7,11 @@ S3Helper() # # Example usage -# model_name = "thunghiem/tinyllama" +model_name = "jan-hq/tokenizer-tinyllama" # model = S3HelperAutoModelForCausalLM.from_pretrained(model_name) -# tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name) +tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name) +# print(tokenizer) # config = S3HelperAutoConfig.from_pretrained(model_name) # Make sure S3Helper is initialized and environment variables are set # Load 
a dataset from S3 bucket -dataset = s3_load_dataset("jan-hq/test_dataset",file_format='parquet', split='train') +dataset = s3_load_dataset("jan-hq/test-dataset",file_format='parquet', split='train') From a8e99246ab383871dad998640d9de9f461a4af41 Mon Sep 17 00:00:00 2001 From: bachvudinh Date: Tue, 16 Jul 2024 17:32:21 +0700 Subject: [PATCH 3/3] add github CI-CD and Minio host script --- .github/workflows/test-flow.yml | 27 +++++++++++++++++++++++++++ Minio_host.sh | 9 +++++++++ test_flows.py | 3 +++ 3 files changed, 39 insertions(+) create mode 100644 Minio_host.sh diff --git a/.github/workflows/test-flow.yml b/.github/workflows/test-flow.yml index e69de29..c39ceef 100644 --- a/.github/workflows/test-flow.yml +++ b/.github/workflows/test-flow.yml @@ -0,0 +1,27 @@ +name: CI/CD Test + +on: + push: + branches: main + pull_request: + branches: main + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e . + + - name: Run test_flow.py + run: python test_flows.py \ No newline at end of file diff --git a/Minio_host.sh b/Minio_host.sh new file mode 100644 index 0000000..214f378 --- /dev/null +++ b/Minio_host.sh @@ -0,0 +1,9 @@ +# mkdir -p ~/minio/data + +docker run \ + -p 9000:9000 \ + -p 9001:9001 \ + --name minio \ + -e "MINIO_ROOT_USER=minioadmin" \ + -e "MINIO_ROOT_PASSWORD=minioadmin" \ + quay.io/minio/minio server /data --console-address ":9001" \ No newline at end of file diff --git a/test_flows.py b/test_flows.py index 2031fc7..dcb987b 100644 --- a/test_flows.py +++ b/test_flows.py @@ -1,5 +1,6 @@ from s3helper import S3Helper,S3HelperAutoConfig,S3HelperAutoTokenizer,S3HelperAutoModelForCausalLM, s3_load_dataset import os +import logging os.environ['S3_ACCESS_KEY'] = 'minioadmin' os.environ['S3_SECRET_KEY'] = 'minioadmin' @@ -10,8 +11,10 @@ model_name = "jan-hq/tokenizer-tinyllama" # model = S3HelperAutoModelForCausalLM.from_pretrained(model_name) tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name) +logging.info(f"Tokenizer Loading successful: {tokenizer}") # print(tokenizer) # config = S3HelperAutoConfig.from_pretrained(model_name) # Make sure S3Helper is initialized and environment variables are set # Load a dataset from S3 bucket dataset = s3_load_dataset("jan-hq/test-dataset",file_format='parquet', split='train') +logging.info(f"Dataset Loading successful") \ No newline at end of file
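
Usage note: the flow exercised by this series can be reproduced locally with a sketch along the following lines. It assumes a MinIO instance started via Minio_host.sh and that the jan-hq bucket already holds the tokenizer and parquet dataset referenced in test_flows.py; the endpoint may need adjusting (test_flows.py points at the Docker bridge address 172.17.0.2:9000, example_env.sh at 127.0.0.1:9000).

    import os
    from s3helper import S3Helper, S3HelperAutoTokenizer, s3_load_dataset

    # Credentials/endpoint for the local MinIO started by Minio_host.sh (adjust as needed)
    os.environ['S3_ACCESS_KEY'] = 'minioadmin'
    os.environ['S3_SECRET_KEY'] = 'minioadmin'
    os.environ['S3_ENDPOINT_URL'] = 'http://127.0.0.1:9000'

    S3Helper()  # validates the credentials and initializes the S3 client singleton

    # First call downloads to ./models/jan-hq/tokenizer-tinyllama; later calls read from the local cache
    tokenizer = S3HelperAutoTokenizer.from_pretrained("jan-hq/tokenizer-tinyllama")

    # Parquet files whose names contain "train"/"test" are routed to the matching split
    dataset = s3_load_dataset("jan-hq/test-dataset", file_format='parquet', split='train')
    print(tokenizer, dataset)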