Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/bach #3

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/test-flow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: CI/CD Test

on:
push:
branches: main
pull_request:
branches: main

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .

- name: Run test_flow.py
run: python test_flows.py
3 changes: 2 additions & 1 deletion example_env.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export S3_ACCESS_KEY=minioadmin
export S3_SECRET_KEY=minioadmin
export S3_ENDPOINT_URL=http://127.0.0.1:9000
export S3_ENDPOINT_URL="http://127.0.0.1:9000"
mc alias set ALIAS $S3_ENDPOINT_URL $S3_ACCESS_KEY $S3_SECRET_KEY
145 changes: 85 additions & 60 deletions s3helper/s3_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
import sys
import logging
from datasets import load_dataset, Dataset
from typing import Optional, Dict, Any

from datasets import load_dataset, Dataset, load_from_disk
from typing import Optional, Dict, Any, List
# Configure logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(message)s')

def find_files(directory: str, file_format: str):
matching_files = []

# Walk through the directory
for root, _, files in os.walk(directory):
for file in files:
# Check if the file ends with the specified format
if file.endswith(f".{file_format}"):
matching_files.append(os.path.join(root, file))

return matching_files

class S3Helper:
_instance = None

Expand Down Expand Up @@ -46,7 +57,7 @@ def validate_credentials(self):
logging.error(f"Invalid S3 credentials: {e}")
raise ValueError("Invalid S3 credentials")

def download_model(self, path_components: list, local_dir: str = './models'):
def download_file(self, path_components: list, local_dir: str):
bucket_name = path_components[0]
model_name = path_components[1]
objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=model_name)
Expand All @@ -59,99 +70,113 @@ def download_model(self, path_components: list, local_dir: str = './models'):
self.s3_client.download_file(bucket_name, file_key, file_path)
logging.info(f'Downloaded file: {file_key}')

def ensure_model_local(self, pretrained_model_name_or_path, local_dir):
path_components = pretrained_model_name_or_path.split("/")
def ensure_file_local(self, file_name_or_path: str, local_dir: str):
path_components = file_name_or_path.split("/")
if len(path_components) != 2:
logging.error("Cannot recognize bucket name and model name since having > 2 components")
raise ValueError("Cannot recognize bucket name and model name since having > 2 components")
model_local_path = os.path.join(local_dir, pretrained_model_name_or_path)
if not os.path.exists(model_local_path):
os.makedirs(model_local_path, exist_ok=True)
self.download_model(path_components, local_dir)
logging.error("Cannot recognize bucket name and file name since the components are not 2")
raise ValueError("Cannot recognize bucket name and file name since the components are not 2")
file_local_path = os.path.join(local_dir, file_name_or_path)
if not os.path.exists(file_local_path):
os.makedirs(file_local_path, exist_ok=True)
self.download_file(path_components, local_dir)
else:
logging.info(f"Model existed at: {model_local_path}, read from cache")
return model_local_path
if 'model' in local_dir.lower():

logging.info(f"Model existed at: {file_local_path}, read from cache")
elif 'dataset' in local_dir.lower():
logging.info(f"Dataset existed at: {file_local_path}, read from cache")
return file_local_path

def upload_to_s3(self, local_dir, bucket_name, model_name):
def upload_to_s3(self, local_dir, bucket_name, file_name):
for root, _, files in os.walk(local_dir):
for file in files:
local_file_path = os.path.join(root, file)
s3_key = os.path.relpath(local_file_path, local_dir)
self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(model_name, s3_key))
logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{model_name}/{s3_key}')
def download_dataset(self, path_components: list, local_dir: str = './datasets'):
bucket_name = path_components[0]
dataset_name = path_components[1]
objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
for obj in objects.get('Contents', []):
file_key = obj['Key']
if file_key.endswith('/'):
continue # Skip directories
file_path = os.path.join(local_dir, bucket_name, file_key)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
self.s3_client.download_file(bucket_name, file_key, file_path)
logging.info(f'Downloaded dataset file: {file_key}')
self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(file_name, s3_key))
logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{file_name}/{s3_key}')
# def download_dataset(self, path_components: list, local_dir: str = './datasets'):
# bucket_name = path_components[0]
# dataset_name = path_components[1]
# objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
# for obj in objects.get('Contents', []):
# file_key = obj['Key']
# if file_key.endswith('/'):
# continue # Skip directories
# file_path = os.path.join(local_dir, bucket_name, file_key)
# os.makedirs(os.path.dirname(file_path), exist_ok=True)
# self.s3_client.download_file(bucket_name, file_key, file_path)
# logging.info(f'Downloaded dataset file: {file_key}')

class S3HelperAutoModelForCausalLM(AutoModelForCausalLM):
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
s3_helper = S3Helper.get_instance()
model_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
model_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
return super().from_pretrained(model_local_path, *model_args, **kwargs)

class S3HelperAutoTokenizer(AutoTokenizer):
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
s3_helper = S3Helper.get_instance()
tokenizer_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
tokenizer_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
return super().from_pretrained(tokenizer_local_path, *model_args, **kwargs)

class S3HelperAutoConfig(AutoConfig):
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
s3_helper = S3Helper.get_instance()
config_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
config_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
return super().from_pretrained(config_local_path, *model_args, **kwargs)
# defined a custom load_dataset from S3 bucket
def s3_load_dataset(
path: str,
local_dir: str = './datasets',
dataset_name_or_path: str,
file_format: str = 'json',
local_dir: str = './datasets',
split: str = None,
*args: Any,
**kwargs: Any
) -> Dataset:
"""
Load a dataset from S3/Minio storage.

Args:
path (str): Path to the dataset in the format 'bucket_name/dataset_name'
local_dir (str): Local directory to store downloaded datasets
file_format (str): Format of the dataset file (e.g., 'json', 'csv', 'parquet')
*args: Additional positional arguments to pass to load_dataset
**kwargs: Additional keyword arguments to pass to load_dataset

dataset_name_or_path (str): Path to the dataset in the format 'bucket_name/dataset_name'
file_format (str): File format of the dataset. Either 'json', 'csv', or 'parquet'.
local_dir (str): Local directory to store downloaded datasets
split (str): Dataset split to load ('train', 'test', or None for all)
*args: Additional positional arguments to pass to load_dataset
**kwargs: Additional keyword arguments to pass to load_dataset
Returns:
Dataset: The loaded dataset
Dataset: The loaded dataset
"""
s3_helper = S3Helper.get_instance()
dataset_local_path = s3_helper.ensure_file_local(dataset_name_or_path, local_dir)

# Split the path into bucket and dataset name
path_components = path.split("/")
if len(path_components) != 2:
raise ValueError("Path should be in the format 'bucket_name/dataset_name'")
def find_files(path: str, extension: str) -> List[str]:
return [os.path.join(root, file) for root, _, files in os.walk(path)
for file in files if file.endswith(f'.{extension}')]

bucket_name, dataset_name = path_components
dataset_local_path = os.path.join(local_dir, bucket_name, dataset_name)

# Download dataset if not exists locally
if not os.path.exists(dataset_local_path):
os.makedirs(dataset_local_path, exist_ok=True)
s3_helper.download_dataset(path_components, local_dir)
else:
logging.info(f"Dataset already exists at: {dataset_local_path}, using cached version")

# Construct the path to the data file
data_file_path = os.path.join(dataset_local_path, f"data.{file_format}")
local_files = find_files(dataset_local_path, file_format)
logging.info(f"Found local files: {local_files}")

data_files: Dict[str, List[str]] = {"train": [], "test": []}
for file in local_files:
if "train" in file:
data_files["train"].append(file)
elif "test" in file:
data_files["test"].append(file)
else:
logging.warning(f"Unclassified file: {file}")

if split:
if split not in data_files:
raise ValueError(f"Invalid split: {split}. Available splits are: {list(data_files.keys())}")
data_files = {split: data_files[split]}

# Remove empty splits
data_files = {k: v for k, v in data_files.items() if v}

if not data_files:
raise ValueError(f"No valid files found for the specified format and split.")

# Load and return the dataset
return load_dataset(file_format, data_files=data_file_path, *args, **kwargs)
logging.info(f"Loading dataset with data_files: {data_files}")
return load_dataset(file_format, data_files=data_files, *args, **kwargs)
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name='research-utils',
version='0.2.0', # Increment the version number
version='0.2.1', # Increment the version number
description='A helper library for working with S3/Minio, Hugging Face models, and datasets',
long_description='This library provides utilities for downloading and managing machine learning models and datasets from S3-compatible storage services, and loading them using the Hugging Face libraries.',
author='Alan',
Expand All @@ -12,8 +12,10 @@
packages=find_packages(),
install_requires=[
'boto3',
# tokenizers >=0.13.3
'tokenizers==0.13.3',
'transformers',
'datasets', # Add the datasets library
'datasets==2.20.0', # Add the datasets library
],
classifiers=[
'Programming Language :: Python :: 3',
Expand Down
21 changes: 9 additions & 12 deletions main.py → test_flows.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
from s3helper import S3Helper,S3HelperAutoConfig,S3HelperAutoTokenizer,S3HelperAutoModelForCausalLM, s3_load_dataset
import os
import logging

os.environ['S3_ACCESS_KEY'] = 'minioadmin'
os.environ['S3_SECRET_KEY'] = 'minioadmin'
os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9001'
os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9000'
S3Helper()

# # Example usage
# model_name = "thunghiem/tinyllama"
model_name = "jan-hq/tokenizer-tinyllama"
# model = S3HelperAutoModelForCausalLM.from_pretrained(model_name)
# tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
logging.info(f"Tokenizer Loading successful: {tokenizer}")
# print(tokenizer)
# config = S3HelperAutoConfig.from_pretrained(model_name)
# Make sure S3Helper is initialized and environment variables are set
# Load a dataset
dataset = s3_load_dataset("modelhubjan/test_dataset")

# Use the dataset
for item in dataset:
print(item)

# You can also pass additional arguments to load_dataset
dataset = s3_load_dataset("modelhubjan/test_dataset", file_format='parquet', split='train')
# Load a dataset from S3 bucket
dataset = s3_load_dataset("jan-hq/test-dataset",file_format='parquet', split='train')
logging.info(f"Dataset Loading successful")
Loading