diff --git a/.github/workflows/test-flow.yml b/.github/workflows/test-flow.yml
new file mode 100644
index 0000000..c39ceef
--- /dev/null
+++ b/.github/workflows/test-flow.yml
@@ -0,0 +1,27 @@
+name: CI/CD Test
+
+on:
+  push:
+    branches: [main]
+  pull_request:
+    branches: [main]
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.10'
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e .
+
+      - name: Run test_flows.py
+        run: python test_flows.py
\ No newline at end of file
diff --git a/example_env.sh b/example_env.sh
index 2eeb355..9633801 100755
--- a/example_env.sh
+++ b/example_env.sh
@@ -1,3 +1,4 @@
 export S3_ACCESS_KEY=minioadmin
 export S3_SECRET_KEY=minioadmin
-export S3_ENDPOINT_URL=http://127.0.0.1:9000
+export S3_ENDPOINT_URL="http://127.0.0.1:9000"
+mc alias set ALIAS $S3_ENDPOINT_URL $S3_ACCESS_KEY $S3_SECRET_KEY
\ No newline at end of file
diff --git a/s3helper/s3_helper.py b/s3helper/s3_helper.py
index f8955e9..a493c14 100644
--- a/s3helper/s3_helper.py
+++ b/s3helper/s3_helper.py
@@ -4,12 +4,23 @@
 from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
 import sys
 import logging
-from datasets import load_dataset, Dataset
-from typing import Optional, Dict, Any
-
+from datasets import load_dataset, Dataset, load_from_disk
+from typing import Optional, Dict, Any, List
 # Configure logging
 logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(message)s')
 
+def find_files(directory: str, file_format: str):
+    matching_files = []
+
+    # Walk through the directory
+    for root, _, files in os.walk(directory):
+        for file in files:
+            # Check if the file ends with the specified format
+            if file.endswith(f".{file_format}"):
+                matching_files.append(os.path.join(root, file))
+
+    return matching_files
+
 class S3Helper:
     _instance = None
 
@@ -46,7 +57,7 @@ def validate_credentials(self):
             logging.error(f"Invalid S3 credentials: {e}")
             raise ValueError("Invalid S3 credentials")
 
-    def download_model(self, path_components: list, local_dir: str = './models'):
+    def download_file(self, path_components: list, local_dir: str):
         bucket_name = path_components[0]
         model_name = path_components[1]
         objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=model_name)
@@ -59,99 +70,113 @@ def download_model(self, path_components: list, local_dir: str = './models'):
             self.s3_client.download_file(bucket_name, file_key, file_path)
             logging.info(f'Downloaded file: {file_key}')
 
-    def ensure_model_local(self, pretrained_model_name_or_path, local_dir):
-        path_components = pretrained_model_name_or_path.split("/")
+    def ensure_file_local(self, file_name_or_path: str, local_dir: str):
+        path_components = file_name_or_path.split("/")
         if len(path_components) != 2:
-            logging.error("Cannot recognize bucket name and model name since having > 2 components")
-            raise ValueError("Cannot recognize bucket name and model name since having > 2 components")
-        model_local_path = os.path.join(local_dir, pretrained_model_name_or_path)
-        if not os.path.exists(model_local_path):
-            os.makedirs(model_local_path, exist_ok=True)
-            self.download_model(path_components, local_dir)
+            logging.error("Cannot recognize bucket name and file name: expected exactly 2 path components ('bucket_name/file_name')")
+            raise ValueError("Cannot recognize bucket name and file name: expected exactly 2 path components ('bucket_name/file_name')")
+        file_local_path = os.path.join(local_dir, file_name_or_path)
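+        # Only reach out to S3 when there is no cached copy under local_dir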
+        if not os.path.exists(file_local_path):
+            os.makedirs(file_local_path, exist_ok=True)
+            self.download_file(path_components, local_dir)
         else:
-            logging.info(f"Model existed at: {model_local_path}, read from cache")
-        return model_local_path
+            if 'model' in local_dir.lower():
+                logging.info(f"Model already exists at: {file_local_path}, reading from cache")
+            elif 'dataset' in local_dir.lower():
+                logging.info(f"Dataset already exists at: {file_local_path}, reading from cache")
+        return file_local_path
 
-    def upload_to_s3(self, local_dir, bucket_name, model_name):
+    def upload_to_s3(self, local_dir, bucket_name, file_name):
         for root, _, files in os.walk(local_dir):
             for file in files:
                 local_file_path = os.path.join(root, file)
                 s3_key = os.path.relpath(local_file_path, local_dir)
-                self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(model_name, s3_key))
-                logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{model_name}/{s3_key}')
-    def download_dataset(self, path_components: list, local_dir: str = './datasets'):
-        bucket_name = path_components[0]
-        dataset_name = path_components[1]
-        objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
-        for obj in objects.get('Contents', []):
-            file_key = obj['Key']
-            if file_key.endswith('/'):
-                continue # Skip directories
-            file_path = os.path.join(local_dir, bucket_name, file_key)
-            os.makedirs(os.path.dirname(file_path), exist_ok=True)
-            self.s3_client.download_file(bucket_name, file_key, file_path)
-            logging.info(f'Downloaded dataset file: {file_key}')
+                self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(file_name, s3_key))
+                logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{file_name}/{s3_key}')
+    # def download_dataset(self, path_components: list, local_dir: str = './datasets'):
+    #     bucket_name = path_components[0]
+    #     dataset_name = path_components[1]
+    #     objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
+    #     for obj in objects.get('Contents', []):
+    #         file_key = obj['Key']
+    #         if file_key.endswith('/'):
+    #             continue # Skip directories
+    #         file_path = os.path.join(local_dir, bucket_name, file_key)
+    #         os.makedirs(os.path.dirname(file_path), exist_ok=True)
+    #         self.s3_client.download_file(bucket_name, file_key, file_path)
+    #         logging.info(f'Downloaded dataset file: {file_key}')
 
 class S3HelperAutoModelForCausalLM(AutoModelForCausalLM):
     @classmethod
     def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
         s3_helper = S3Helper.get_instance()
-        model_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
+        model_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
         return super().from_pretrained(model_local_path, *model_args, **kwargs)
 
 class S3HelperAutoTokenizer(AutoTokenizer):
     @classmethod
     def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
         s3_helper = S3Helper.get_instance()
-        tokenizer_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
+        tokenizer_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
         return super().from_pretrained(tokenizer_local_path, *model_args, **kwargs)
 
 class S3HelperAutoConfig(AutoConfig):
     @classmethod
     def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
         s3_helper = S3Helper.get_instance()
-        config_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
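+        # Resolve the 'bucket_name/model_name' path to a local cache before loading the config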
+        config_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
         return super().from_pretrained(config_local_path, *model_args, **kwargs)
 
 # defined a custom load_dataset from S3 bucket
 def s3_load_dataset(
-    path: str,
-    local_dir: str = './datasets',
+    dataset_name_or_path: str,
     file_format: str = 'json',
+    local_dir: str = './datasets',
+    split: Optional[str] = None,
     *args: Any,
     **kwargs: Any
 ) -> Dataset:
     """
     Load a dataset from S3/Minio storage.
 
-    Args:
-        path (str): Path to the dataset in the format 'bucket_name/dataset_name'
-        local_dir (str): Local directory to store downloaded datasets
-        file_format (str): Format of the dataset file (e.g., 'json', 'csv', 'parquet')
-        *args: Additional positional arguments to pass to load_dataset
-        **kwargs: Additional keyword arguments to pass to load_dataset
-
+    Args:
+        dataset_name_or_path (str): Path to the dataset in the format 'bucket_name/dataset_name'
+        file_format (str): File format of the dataset. Either 'json', 'csv', or 'parquet'.
+        local_dir (str): Local directory to store downloaded datasets
+        split (str, optional): Dataset split to load ('train', 'test', or None for all)
+        *args: Additional positional arguments to pass to load_dataset
+        **kwargs: Additional keyword arguments to pass to load_dataset
     Returns:
-        Dataset: The loaded dataset
+        Dataset: The loaded dataset
     """
     s3_helper = S3Helper.get_instance()
+    dataset_local_path = s3_helper.ensure_file_local(dataset_name_or_path, local_dir)
 
-    # Split the path into bucket and dataset name
-    path_components = path.split("/")
-    if len(path_components) != 2:
-        raise ValueError("Path should be in the format 'bucket_name/dataset_name'")
+    def find_files(path: str, extension: str) -> List[str]:
+        return [os.path.join(root, file) for root, _, files in os.walk(path)
+                for file in files if file.endswith(f'.{extension}')]
 
-    bucket_name, dataset_name = path_components
-    dataset_local_path = os.path.join(local_dir, bucket_name, dataset_name)
-
-    # Download dataset if not exists locally
-    if not os.path.exists(dataset_local_path):
-        os.makedirs(dataset_local_path, exist_ok=True)
-        s3_helper.download_dataset(path_components, local_dir)
-    else:
-        logging.info(f"Dataset already exists at: {dataset_local_path}, using cached version")
-
-    # Construct the path to the data file
-    data_file_path = os.path.join(dataset_local_path, f"data.{file_format}")
+    local_files = find_files(dataset_local_path, file_format)
+    logging.info(f"Found local files: {local_files}")
+
+    data_files: Dict[str, List[str]] = {"train": [], "test": []}
+    for file in local_files:
+        if "train" in file:
+            data_files["train"].append(file)
+        elif "test" in file:
+            data_files["test"].append(file)
+        else:
+            logging.warning(f"Unclassified file: {file}")
+
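+    # When a specific split is requested, keep only that split's files (and fail fast on unknown names)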
+    if split:
+        if split not in data_files:
+            raise ValueError(f"Invalid split: {split}. Available splits are: {list(data_files.keys())}")
+        data_files = {split: data_files[split]}
+
+    # Remove empty splits
+    data_files = {k: v for k, v in data_files.items() if v}
+
+    if not data_files:
+        raise ValueError("No valid files found for the specified format and split.")
 
-    # Load and return the dataset
-    return load_dataset(file_format, data_files=data_file_path, *args, **kwargs)
\ No newline at end of file
+    logging.info(f"Loading dataset with data_files: {data_files}")
+    return load_dataset(file_format, data_files=data_files, *args, **kwargs)
\ No newline at end of file
diff --git a/setup.py b/setup.py
index eb072c1..818a845 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 setup(
     name='research-utils',
-    version='0.2.0', # Increment the version number
+    version='0.2.1', # Increment the version number
     description='A helper library for working with S3/Minio, Hugging Face models, and datasets',
     long_description='This library provides utilities for downloading and managing machine learning models and datasets from S3-compatible storage services, and loading them using the Hugging Face libraries.',
     author='Alan',
@@ -12,8 +12,10 @@
     packages=find_packages(),
     install_requires=[
         'boto3',
+        # pin tokenizers to 0.13.3
+        'tokenizers==0.13.3',
         'transformers',
-        'datasets', # Add the datasets library
+        'datasets==2.20.0', # Add the datasets library
     ],
     classifiers=[
         'Programming Language :: Python :: 3',
diff --git a/main.py b/test_flows.py
similarity index 50%
rename from main.py
rename to test_flows.py
index 1ba6bad..dcb987b 100644
--- a/main.py
+++ b/test_flows.py
@@ -1,23 +1,20 @@
 from s3helper import S3Helper,S3HelperAutoConfig,S3HelperAutoTokenizer,S3HelperAutoModelForCausalLM, s3_load_dataset
 import os
+import logging
 
 os.environ['S3_ACCESS_KEY'] = 'minioadmin'
 os.environ['S3_SECRET_KEY'] = 'minioadmin'
-os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9001'
+os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9000'
 
 S3Helper()
 # # Example usage
-# model_name = "thunghiem/tinyllama"
+model_name = "jan-hq/tokenizer-tinyllama"
 # model = S3HelperAutoModelForCausalLM.from_pretrained(model_name)
-# tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
+tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
+logging.info(f"Tokenizer loading successful: {tokenizer}")
+# print(tokenizer)
 # config = S3HelperAutoConfig.from_pretrained(model_name)
 # Make sure S3Helper is initialized and environment variables are set
-# Load a dataset
-dataset = s3_load_dataset("modelhubjan/test_dataset")
-
-# Use the dataset
-for item in dataset:
-    print(item)
-
-# You can also pass additional arguments to load_dataset
-dataset = s3_load_dataset("modelhubjan/test_dataset", file_format='parquet', split='train')
\ No newline at end of file
+# Load a dataset from S3 bucket
+dataset = s3_load_dataset("jan-hq/test-dataset", file_format='parquet', split='train')
+logging.info("Dataset loading successful")
\ No newline at end of file