
Dev/bach #8

Merged: 15 commits, Jul 17, 2024
51 changes: 51 additions & 0 deletions .github/runners/Dockerfile
@@ -0,0 +1,51 @@
FROM python:3.11-slim

# Install necessary packages
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        build-essential \
        cmake \
        sudo \
        unzip \
        curl \
        wget \
        git \
        git-lfs \
        jq \
    && rm -rf /var/lib/apt/lists/*


RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
    unzip awscliv2.zip && \
    ./aws/install

ENV HOME=/home/runner

RUN mkdir -p /home/runner

ARG RUNNER_VERSION=2.317.0

ARG RUNNER_UID=1000
ARG DOCKER_GID=1001

RUN adduser --disabled-password --gecos "" --uid $RUNNER_UID runner \
    && groupadd docker --gid $DOCKER_GID \
    && usermod -aG sudo runner \
    && usermod -aG docker runner \
    && echo "%sudo ALL=(ALL:ALL) NOPASSWD:ALL" > /etc/sudoers \
    && echo "Defaults env_keep += \"DEBIAN_FRONTEND\"" >> /etc/sudoers

# cd into the user directory, download and unzip the github actions runner
RUN cd /home/runner && mkdir actions-runner && cd actions-runner \
    && curl -O -L https://github.com/actions/runner/releases/download/v${RUNNER_VERSION}/actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz \
    && tar xzf ./actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz

RUN chown -R runner:runner /home/runner && /home/runner/actions-runner/bin/installdependencies.sh

ADD ./start.sh /home/runner/start.sh

RUN chmod +x /home/runner/start.sh

ENTRYPOINT ["/bin/bash", "/home/runner/start.sh"]

USER runner
21 changes: 21 additions & 0 deletions .github/runners/start.sh
@@ -0,0 +1,21 @@
#!/bin/bash

RUNNER_REPO=$RUNNER_REPO
RUNNER_PAT=$RUNNER_PAT
RUNNER_GROUP=$RUNNER_GROUP
RUNNER_LABELS=$RUNNER_LABELS
RUNNER_NAME=$(hostname)

cd /home/runner/actions-runner

./config.sh --unattended --replace --url https://github.com/${RUNNER_REPO} --pat ${RUNNER_PAT} --name ${RUNNER_NAME} --runnergroup ${RUNNER_GROUP} --labels ${RUNNER_LABELS} --work /home/runner/actions-runner/_work

cleanup() {
    echo "Removing runner..."
    ./config.sh remove --unattended --pat ${RUNNER_PAT}
}

trap 'cleanup; exit 130' INT
trap 'cleanup; exit 143' TERM

./run.sh & wait $!
27 changes: 27 additions & 0 deletions .github/workflows/test-flow.yml
@@ -0,0 +1,27 @@
name: CI/CD Test

on:
  push:
    branches: main
  pull_request:
    branches: main
  workflow_dispatch:

jobs:
  test:
    runs-on: research-utils

    steps:
      - uses: actions/checkout@v4

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e .

      - name: Run test_flows.py
        run: python test_flows.py
        env:
          S3_ACCESS_KEY: ${{ secrets.MINIO_ACCESS_KEY_ID }}
          S3_SECRET_KEY: ${{ secrets.MINIO_SECRET_ACCESS_KEY }}
          S3_ENDPOINT_URL: ${{ secrets.MINIO_ENDPOINT }}
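The workflow passes the MinIO credentials to test_flows.py through the same S3_* environment variables that example_env.sh exports below. For a local run outside CI, a minimal sketch along these lines should work; the endpoint and credentials are placeholders for a local MinIO instance.

```python
import os

# Placeholder credentials for a local MinIO instance; adjust to your deployment.
os.environ.setdefault("S3_ACCESS_KEY", "minioadmin")
os.environ.setdefault("S3_SECRET_KEY", "minioadmin")
os.environ.setdefault("S3_ENDPOINT_URL", "http://127.0.0.1:9000")

from s3helper import S3Helper

# Initialize the helper singleton, as test_flows.py does before loading anything.
S3Helper()
```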
3 changes: 2 additions & 1 deletion example_env.sh
@@ -1,3 +1,4 @@
export S3_ACCESS_KEY=minioadmin
export S3_SECRET_KEY=minioadmin
export S3_ENDPOINT_URL=http://127.0.0.1:9000
export S3_ENDPOINT_URL="http://127.0.0.1:9000"
mc alias set ALIAS $S3_ENDPOINT_URL $S3_ACCESS_KEY $S3_SECRET_KEY
23 changes: 0 additions & 23 deletions main.py

This file was deleted.

145 changes: 85 additions & 60 deletions s3helper/s3_helper.py
@@ -4,12 +4,23 @@
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
import sys
import logging
from datasets import load_dataset, Dataset
from typing import Optional, Dict, Any

from datasets import load_dataset, Dataset, load_from_disk
from typing import Optional, Dict, Any, List
# Configure logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(message)s')

def find_files(directory: str, file_format: str):
    matching_files = []

    # Walk through the directory
    for root, _, files in os.walk(directory):
        for file in files:
            # Check if the file ends with the specified format
            if file.endswith(f".{file_format}"):
                matching_files.append(os.path.join(root, file))

    return matching_files

class S3Helper:
    _instance = None

@@ -46,7 +57,7 @@ def validate_credentials(self):
logging.error(f"Invalid S3 credentials: {e}")
raise ValueError("Invalid S3 credentials")

def download_model(self, path_components: list, local_dir: str = './models'):
def download_file(self, path_components: list, local_dir: str):
bucket_name = path_components[0]
model_name = path_components[1]
objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=model_name)
Expand All @@ -59,99 +70,113 @@ def download_model(self, path_components: list, local_dir: str = './models'):
self.s3_client.download_file(bucket_name, file_key, file_path)
logging.info(f'Downloaded file: {file_key}')

    def ensure_model_local(self, pretrained_model_name_or_path, local_dir):
        path_components = pretrained_model_name_or_path.split("/")
    def ensure_file_local(self, file_name_or_path: str, local_dir: str):
        path_components = file_name_or_path.split("/")
        if len(path_components) != 2:
            logging.error("Cannot recognize bucket name and model name since having > 2 components")
            raise ValueError("Cannot recognize bucket name and model name since having > 2 components")
        model_local_path = os.path.join(local_dir, pretrained_model_name_or_path)
        if not os.path.exists(model_local_path):
            os.makedirs(model_local_path, exist_ok=True)
            self.download_model(path_components, local_dir)
            logging.error("Cannot recognize bucket name and file name since the components are not 2")
            raise ValueError("Cannot recognize bucket name and file name since the components are not 2")
        file_local_path = os.path.join(local_dir, file_name_or_path)
        if not os.path.exists(file_local_path):
            os.makedirs(file_local_path, exist_ok=True)
            self.download_file(path_components, local_dir)
        else:
            logging.info(f"Model existed at: {model_local_path}, read from cache")
            return model_local_path
            if 'model' in local_dir.lower():

                logging.info(f"Model existed at: {file_local_path}, read from cache")
            elif 'dataset' in local_dir.lower():
                logging.info(f"Dataset existed at: {file_local_path}, read from cache")
        return file_local_path

    def upload_to_s3(self, local_dir, bucket_name, model_name):
    def upload_to_s3(self, local_dir, bucket_name, file_name):
        for root, _, files in os.walk(local_dir):
            for file in files:
                local_file_path = os.path.join(root, file)
                s3_key = os.path.relpath(local_file_path, local_dir)
                self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(model_name, s3_key))
                logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{model_name}/{s3_key}')
    def download_dataset(self, path_components: list, local_dir: str = './datasets'):
        bucket_name = path_components[0]
        dataset_name = path_components[1]
        objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
        for obj in objects.get('Contents', []):
            file_key = obj['Key']
            if file_key.endswith('/'):
                continue # Skip directories
            file_path = os.path.join(local_dir, bucket_name, file_key)
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            self.s3_client.download_file(bucket_name, file_key, file_path)
            logging.info(f'Downloaded dataset file: {file_key}')
                self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(file_name, s3_key))
                logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{file_name}/{s3_key}')
    # def download_dataset(self, path_components: list, local_dir: str = './datasets'):
    #     bucket_name = path_components[0]
    #     dataset_name = path_components[1]
    #     objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
    #     for obj in objects.get('Contents', []):
    #         file_key = obj['Key']
    #         if file_key.endswith('/'):
    #             continue # Skip directories
    #         file_path = os.path.join(local_dir, bucket_name, file_key)
    #         os.makedirs(os.path.dirname(file_path), exist_ok=True)
    #         self.s3_client.download_file(bucket_name, file_key, file_path)
    #         logging.info(f'Downloaded dataset file: {file_key}')

class S3HelperAutoModelForCausalLM(AutoModelForCausalLM):
    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
        s3_helper = S3Helper.get_instance()
        model_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
        model_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
        return super().from_pretrained(model_local_path, *model_args, **kwargs)

class S3HelperAutoTokenizer(AutoTokenizer):
    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
        s3_helper = S3Helper.get_instance()
        tokenizer_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
        tokenizer_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
        return super().from_pretrained(tokenizer_local_path, *model_args, **kwargs)

class S3HelperAutoConfig(AutoConfig):
    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
        s3_helper = S3Helper.get_instance()
        config_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
        config_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
        return super().from_pretrained(config_local_path, *model_args, **kwargs)
# defined a custom load_dataset from S3 bucket
def s3_load_dataset(
    path: str,
    local_dir: str = './datasets',
    dataset_name_or_path: str,
    file_format: str = 'json',
    local_dir: str = './datasets',
    split: str = None,
    *args: Any,
    **kwargs: Any
) -> Dataset:
    """
    Load a dataset from S3/Minio storage.

    Args:
        path (str): Path to the dataset in the format 'bucket_name/dataset_name'
        local_dir (str): Local directory to store downloaded datasets
        file_format (str): Format of the dataset file (e.g., 'json', 'csv', 'parquet')
        *args: Additional positional arguments to pass to load_dataset
        **kwargs: Additional keyword arguments to pass to load_dataset

        dataset_name_or_path (str): Path to the dataset in the format 'bucket_name/dataset_name'
        file_format (str): File format of the dataset. Either 'json', 'csv', or 'parquet'.
        local_dir (str): Local directory to store downloaded datasets
        split (str): Dataset split to load ('train', 'test', or None for all)
        *args: Additional positional arguments to pass to load_dataset
        **kwargs: Additional keyword arguments to pass to load_dataset
    Returns:
        Dataset: The loaded dataset
        Dataset: The loaded dataset
    """
    s3_helper = S3Helper.get_instance()
    dataset_local_path = s3_helper.ensure_file_local(dataset_name_or_path, local_dir)

    # Split the path into bucket and dataset name
    path_components = path.split("/")
    if len(path_components) != 2:
        raise ValueError("Path should be in the format 'bucket_name/dataset_name'")
    def find_files(path: str, extension: str) -> List[str]:
        return [os.path.join(root, file) for root, _, files in os.walk(path)
                for file in files if file.endswith(f'.{extension}')]

    bucket_name, dataset_name = path_components
    dataset_local_path = os.path.join(local_dir, bucket_name, dataset_name)

    # Download dataset if not exists locally
    if not os.path.exists(dataset_local_path):
        os.makedirs(dataset_local_path, exist_ok=True)
        s3_helper.download_dataset(path_components, local_dir)
    else:
        logging.info(f"Dataset already exists at: {dataset_local_path}, using cached version")

    # Construct the path to the data file
    data_file_path = os.path.join(dataset_local_path, f"data.{file_format}")
    local_files = find_files(dataset_local_path, file_format)
    logging.info(f"Found local files: {local_files}")

    data_files: Dict[str, List[str]] = {"train": [], "test": []}
    for file in local_files:
        if "train" in file:
            data_files["train"].append(file)
        elif "test" in file:
            data_files["test"].append(file)
        else:
            logging.warning(f"Unclassified file: {file}")

    if split:
        if split not in data_files:
            raise ValueError(f"Invalid split: {split}. Available splits are: {list(data_files.keys())}")
        data_files = {split: data_files[split]}

    # Remove empty splits
    data_files = {k: v for k, v in data_files.items() if v}

    if not data_files:
        raise ValueError(f"No valid files found for the specified format and split.")

    # Load and return the dataset
    return load_dataset(file_format, data_files=data_file_path, *args, **kwargs)
    logging.info(f"Loading dataset with data_files: {data_files}")
    return load_dataset(file_format, data_files=data_files, *args, **kwargs)
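To make the new split handling above concrete, here is a small standalone sketch (not part of the PR) that mirrors the grouping logic in s3_load_dataset: files whose paths contain "train" or "test" are bucketed into those splits, an optional split argument narrows the result, and empty splits are dropped before load_dataset is called. The file paths below are hypothetical.

```python
import os

# Hypothetical local layout after ensure_file_local() has mirrored
# s3://<bucket>/<dataset> into ./datasets/<bucket>/<dataset>.
local_files = [
    "./datasets/jan-hq-test/test-dataset/train-00000-of-00001.parquet",
    "./datasets/jan-hq-test/test-dataset/test-00000-of-00001.parquet",
]

def group_by_split(files, split=None):
    """Mirror of the grouping step in s3_load_dataset."""
    data_files = {"train": [], "test": []}
    for f in files:
        if "train" in f:
            data_files["train"].append(f)
        elif "test" in f:
            data_files["test"].append(f)
    if split:
        data_files = {split: data_files[split]}
    # Drop empty splits, as the new code does before calling load_dataset().
    return {k: v for k, v in data_files.items() if v}

print(group_by_split(local_files, split="train"))
# -> {'train': ['./datasets/jan-hq-test/test-dataset/train-00000-of-00001.parquet']}
```

Note that the membership test runs on the full path, so a bucket or dataset name that itself contains "train" or "test" can affect how files are classified.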
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@

setup(
    name='research-utils',
    version='0.2.0', # Increment the version number
    version='0.2.1', # Increment the version number
    description='A helper library for working with S3/Minio, Hugging Face models, and datasets',
    long_description='This library provides utilities for downloading and managing machine learning models and datasets from S3-compatible storage services, and loading them using the Hugging Face libraries.',
    author='Alan',
@@ -12,8 +12,10 @@
    packages=find_packages(),
    install_requires=[
        'boto3',
        # tokenizers >=0.13.3
        'tokenizers==0.13.3',
        'transformers',
        'datasets', # Add the datasets library
        'datasets==2.20.0', # Add the datasets library
    ],
    classifiers=[
        'Programming Language :: Python :: 3',
20 changes: 20 additions & 0 deletions test_flows.py
@@ -0,0 +1,20 @@
from s3helper import S3Helper,S3HelperAutoConfig,S3HelperAutoTokenizer,S3HelperAutoModelForCausalLM, s3_load_dataset
import os
import logging

# os.environ['S3_ACCESS_KEY'] = 'minioadmin'
# os.environ['S3_SECRET_KEY'] = 'minioadmin'
# os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9000'
S3Helper()

# # Example usage
model_name = "jan-hq-test/tokenizer-tinyllama"
# model = S3HelperAutoModelForCausalLM.from_pretrained(model_name)
tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
logging.info(f"Tokenizer Loading successful: {tokenizer}")
# print(tokenizer)
# config = S3HelperAutoConfig.from_pretrained(model_name)
# Make sure S3Helper is initialized and environment variables are set
# Load a dataset from S3 bucket
dataset = s3_load_dataset("jan-hq-test/test-dataset",file_format='parquet', split='train')
logging.info(f"Dataset Loading successful")