
Merge pull request #8 from janhq/dev/bach
Dev/bach
tikikun authored Jul 17, 2024
2 parents e9b62ae + 2d4f077 commit 99fb4e8
Showing 8 changed files with 210 additions and 86 deletions.
51 changes: 51 additions & 0 deletions .github/runners/Dockerfile
@@ -0,0 +1,51 @@
FROM python:3.11-slim

# Install necessary packages
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
build-essential \
cmake \
sudo \
unzip \
curl \
wget \
git \
git-lfs \
jq \
&& rm -rf /var/lib/apt/lists/*


RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
./aws/install

ENV HOME=/home/runner

RUN mkdir -p /home/runner

ARG RUNNER_VERSION=2.317.0

ARG RUNNER_UID=1000
ARG DOCKER_GID=1001

RUN adduser --disabled-password --gecos "" --uid $RUNNER_UID runner \
&& groupadd docker --gid $DOCKER_GID \
&& usermod -aG sudo runner \
&& usermod -aG docker runner \
&& echo "%sudo ALL=(ALL:ALL) NOPASSWD:ALL" > /etc/sudoers \
&& echo "Defaults env_keep += \"DEBIAN_FRONTEND\"" >> /etc/sudoers

# cd into the user directory, download and extract the GitHub Actions runner
RUN cd /home/runner && mkdir actions-runner && cd actions-runner \
&& curl -O -L https://github.com/actions/runner/releases/download/v${RUNNER_VERSION}/actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz \
&& tar xzf ./actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz

RUN chown -R runner:runner /home/runner && /home/runner/actions-runner/bin/installdependencies.sh

ADD ./start.sh /home/runner/start.sh

RUN chmod +x /home/runner/start.sh

ENTRYPOINT ["/bin/bash", "/home/runner/start.sh"]

USER runner
21 changes: 21 additions & 0 deletions .github/runners/start.sh
@@ -0,0 +1,21 @@
#!/bin/bash

RUNNER_REPO=$RUNNER_REPO
RUNNER_PAT=$RUNNER_PAT
RUNNER_GROUP=$RUNNER_GROUP
RUNNER_LABELS=$RUNNER_LABELS
RUNNER_NAME=$(hostname)

cd /home/runner/actions-runner

./config.sh --unattended --replace --url https://github.com/${RUNNER_REPO} --pat ${RUNNER_PAT} --name ${RUNNER_NAME} --runnergroup ${RUNNER_GROUP} --labels ${RUNNER_LABELS} --work /home/runner/actions-runner/_work

cleanup() {
echo "Removing runner..."
./config.sh remove --unattended --pat ${RUNNER_PAT}
}

trap 'cleanup; exit 130' INT
trap 'cleanup; exit 143' TERM

./run.sh & wait $!
27 changes: 27 additions & 0 deletions .github/workflows/test-flow.yml
@@ -0,0 +1,27 @@
name: CI/CD Test

on:
push:
branches: main
pull_request:
branches: main
workflow_dispatch:

jobs:
test:
runs-on: research-utils

steps:
- uses: actions/checkout@v4

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .
- name: Run test_flows.py
run: python test_flows.py
env:
S3_ACCESS_KEY: ${{ secrets.MINIO_ACCESS_KEY_ID }}
S3_SECRET_KEY: ${{ secrets.MINIO_SECRET_ACCESS_KEY }}
S3_ENDPOINT_URL: ${{ secrets.MINIO_ENDPOINT }}
3 changes: 2 additions & 1 deletion example_env.sh
@@ -1,3 +1,4 @@
export S3_ACCESS_KEY=minioadmin
export S3_SECRET_KEY=minioadmin
export S3_ENDPOINT_URL=http://127.0.0.1:9000
export S3_ENDPOINT_URL="http://127.0.0.1:9000"
mc alias set ALIAS $S3_ENDPOINT_URL $S3_ACCESS_KEY $S3_SECRET_KEY
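
For context, a minimal sketch of how these environment variables are typically consumed on the Python side. It assumes S3Helper builds its boto3 client from S3_ACCESS_KEY, S3_SECRET_KEY, and S3_ENDPOINT_URL; the constructor itself is not part of this diff, so treat the snippet as illustrative rather than the library's actual code.

import os
import boto3

# Illustrative only: build an S3 client from the variables exported in
# example_env.sh. S3Helper is assumed to do something equivalent internally.
s3_client = boto3.client(
    "s3",
    aws_access_key_id=os.environ["S3_ACCESS_KEY"],
    aws_secret_access_key=os.environ["S3_SECRET_KEY"],
    endpoint_url=os.environ["S3_ENDPOINT_URL"],
)

# Simple connectivity check, similar in spirit to validate_credentials().
print(s3_client.list_buckets()["Buckets"])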
23 changes: 0 additions & 23 deletions main.py

This file was deleted.

145 changes: 85 additions & 60 deletions s3helper/s3_helper.py
@@ -4,12 +4,23 @@
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig
import sys
import logging
from datasets import load_dataset, Dataset
from typing import Optional, Dict, Any

from datasets import load_dataset, Dataset, load_from_disk
from typing import Optional, Dict, Any, List
# Configure logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(message)s')

def find_files(directory: str, file_format: str):
matching_files = []

# Walk through the directory
for root, _, files in os.walk(directory):
for file in files:
# Check if the file ends with the specified format
if file.endswith(f".{file_format}"):
matching_files.append(os.path.join(root, file))

return matching_files

class S3Helper:
_instance = None

@@ -46,7 +57,7 @@ def validate_credentials(self):
logging.error(f"Invalid S3 credentials: {e}")
raise ValueError("Invalid S3 credentials")

def download_model(self, path_components: list, local_dir: str = './models'):
def download_file(self, path_components: list, local_dir: str):
bucket_name = path_components[0]
model_name = path_components[1]
objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=model_name)
@@ -59,99 +70,113 @@ def download_model(self, path_components: list, local_dir: str = './models'):
self.s3_client.download_file(bucket_name, file_key, file_path)
logging.info(f'Downloaded file: {file_key}')

def ensure_model_local(self, pretrained_model_name_or_path, local_dir):
path_components = pretrained_model_name_or_path.split("/")
def ensure_file_local(self, file_name_or_path: str, local_dir: str):
path_components = file_name_or_path.split("/")
if len(path_components) != 2:
logging.error("Cannot recognize bucket name and model name since having > 2 components")
raise ValueError("Cannot recognize bucket name and model name since having > 2 components")
model_local_path = os.path.join(local_dir, pretrained_model_name_or_path)
if not os.path.exists(model_local_path):
os.makedirs(model_local_path, exist_ok=True)
self.download_model(path_components, local_dir)
logging.error("Cannot recognize bucket name and file name since the components are not 2")
raise ValueError("Cannot recognize bucket name and file name since the components are not 2")
file_local_path = os.path.join(local_dir, file_name_or_path)
if not os.path.exists(file_local_path):
os.makedirs(file_local_path, exist_ok=True)
self.download_file(path_components, local_dir)
else:
logging.info(f"Model existed at: {model_local_path}, read from cache")
return model_local_path
if 'model' in local_dir.lower():

logging.info(f"Model existed at: {file_local_path}, read from cache")
elif 'dataset' in local_dir.lower():
logging.info(f"Dataset existed at: {file_local_path}, read from cache")
return file_local_path

def upload_to_s3(self, local_dir, bucket_name, model_name):
def upload_to_s3(self, local_dir, bucket_name, file_name):
for root, _, files in os.walk(local_dir):
for file in files:
local_file_path = os.path.join(root, file)
s3_key = os.path.relpath(local_file_path, local_dir)
self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(model_name, s3_key))
logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{model_name}/{s3_key}')
def download_dataset(self, path_components: list, local_dir: str = './datasets'):
bucket_name = path_components[0]
dataset_name = path_components[1]
objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
for obj in objects.get('Contents', []):
file_key = obj['Key']
if file_key.endswith('/'):
continue # Skip directories
file_path = os.path.join(local_dir, bucket_name, file_key)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
self.s3_client.download_file(bucket_name, file_key, file_path)
logging.info(f'Downloaded dataset file: {file_key}')
self.s3_client.upload_file(local_file_path, bucket_name, os.path.join(file_name, s3_key))
logging.info(f'Uploaded {local_file_path} to s3://{bucket_name}/{file_name}/{s3_key}')
# def download_dataset(self, path_components: list, local_dir: str = './datasets'):
# bucket_name = path_components[0]
# dataset_name = path_components[1]
# objects = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=dataset_name)
# for obj in objects.get('Contents', []):
# file_key = obj['Key']
# if file_key.endswith('/'):
# continue # Skip directories
# file_path = os.path.join(local_dir, bucket_name, file_key)
# os.makedirs(os.path.dirname(file_path), exist_ok=True)
# self.s3_client.download_file(bucket_name, file_key, file_path)
# logging.info(f'Downloaded dataset file: {file_key}')

class S3HelperAutoModelForCausalLM(AutoModelForCausalLM):
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
s3_helper = S3Helper.get_instance()
model_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
model_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
return super().from_pretrained(model_local_path, *model_args, **kwargs)

class S3HelperAutoTokenizer(AutoTokenizer):
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
s3_helper = S3Helper.get_instance()
tokenizer_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
tokenizer_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
return super().from_pretrained(tokenizer_local_path, *model_args, **kwargs)

class S3HelperAutoConfig(AutoConfig):
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, local_dir: str = './models', **kwargs):
s3_helper = S3Helper.get_instance()
config_local_path = s3_helper.ensure_model_local(pretrained_model_name_or_path, local_dir)
config_local_path = s3_helper.ensure_file_local(pretrained_model_name_or_path, local_dir)
return super().from_pretrained(config_local_path, *model_args, **kwargs)
# Define a custom load_dataset that reads from an S3/MinIO bucket
def s3_load_dataset(
path: str,
local_dir: str = './datasets',
dataset_name_or_path: str,
file_format: str = 'json',
local_dir: str = './datasets',
split: str = None,
*args: Any,
**kwargs: Any
) -> Dataset:
"""
Load a dataset from S3/Minio storage.
Args:
path (str): Path to the dataset in the format 'bucket_name/dataset_name'
local_dir (str): Local directory to store downloaded datasets
file_format (str): Format of the dataset file (e.g., 'json', 'csv', 'parquet')
*args: Additional positional arguments to pass to load_dataset
**kwargs: Additional keyword arguments to pass to load_dataset
dataset_name_or_path (str): Path to the dataset in the format 'bucket_name/dataset_name'
file_format (str): File format of the dataset. Either 'json', 'csv', or 'parquet'.
local_dir (str): Local directory to store downloaded datasets
split (str): Dataset split to load ('train', 'test', or None for all)
*args: Additional positional arguments to pass to load_dataset
**kwargs: Additional keyword arguments to pass to load_dataset
Returns:
Dataset: The loaded dataset
Dataset: The loaded dataset
"""
s3_helper = S3Helper.get_instance()
dataset_local_path = s3_helper.ensure_file_local(dataset_name_or_path, local_dir)

# Split the path into bucket and dataset name
path_components = path.split("/")
if len(path_components) != 2:
raise ValueError("Path should be in the format 'bucket_name/dataset_name'")
def find_files(path: str, extension: str) -> List[str]:
return [os.path.join(root, file) for root, _, files in os.walk(path)
for file in files if file.endswith(f'.{extension}')]

bucket_name, dataset_name = path_components
dataset_local_path = os.path.join(local_dir, bucket_name, dataset_name)

# Download dataset if not exists locally
if not os.path.exists(dataset_local_path):
os.makedirs(dataset_local_path, exist_ok=True)
s3_helper.download_dataset(path_components, local_dir)
else:
logging.info(f"Dataset already exists at: {dataset_local_path}, using cached version")

# Construct the path to the data file
data_file_path = os.path.join(dataset_local_path, f"data.{file_format}")
local_files = find_files(dataset_local_path, file_format)
logging.info(f"Found local files: {local_files}")

data_files: Dict[str, List[str]] = {"train": [], "test": []}
for file in local_files:
if "train" in file:
data_files["train"].append(file)
elif "test" in file:
data_files["test"].append(file)
else:
logging.warning(f"Unclassified file: {file}")

if split:
if split not in data_files:
raise ValueError(f"Invalid split: {split}. Available splits are: {list(data_files.keys())}")
data_files = {split: data_files[split]}

# Remove empty splits
data_files = {k: v for k, v in data_files.items() if v}

if not data_files:
raise ValueError(f"No valid files found for the specified format and split.")

# Load and return the dataset
return load_dataset(file_format, data_files=data_file_path, *args, **kwargs)
logging.info(f"Loading dataset with data_files: {data_files}")
return load_dataset(file_format, data_files=data_files, *args, **kwargs)
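
A usage sketch for the updated loader, based on the calls in test_flows.py. It assumes the object layout is 'bucket_name/dataset_name' and that file names contain "train" or "test" so they can be routed to splits; the bucket, dataset, and model names below are placeholders taken from the test script.

from s3helper import S3Helper, S3HelperAutoTokenizer, s3_load_dataset

# Reads S3_ACCESS_KEY / S3_SECRET_KEY / S3_ENDPOINT_URL from the environment.
S3Helper()

# Parquet files under s3://jan-hq-test/test-dataset whose names contain
# "train" or "test" are assigned to the matching split; file_format picks
# the datasets loader ('json', 'csv', or 'parquet').
train_ds = s3_load_dataset("jan-hq-test/test-dataset", file_format="parquet", split="train")

# Models and tokenizers follow the same 'bucket_name/name' convention and
# are cached under ./models after the first download.
tokenizer = S3HelperAutoTokenizer.from_pretrained("jan-hq-test/tokenizer-tinyllama")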
6 changes: 4 additions & 2 deletions setup.py
@@ -3,7 +3,7 @@

setup(
name='research-utils',
version='0.2.0', # Increment the version number
version='0.2.1', # Increment the version number
description='A helper library for working with S3/Minio, Hugging Face models, and datasets',
long_description='This library provides utilities for downloading and managing machine learning models and datasets from S3-compatible storage services, and loading them using the Hugging Face libraries.',
author='Alan',
@@ -12,8 +12,10 @@
packages=find_packages(),
install_requires=[
'boto3',
# tokenizers >=0.13.3
'tokenizers==0.13.3',
'transformers',
'datasets', # Add the datasets library
'datasets==2.20.0', # Add the datasets library
],
classifiers=[
'Programming Language :: Python :: 3',
20 changes: 20 additions & 0 deletions test_flows.py
@@ -0,0 +1,20 @@
from s3helper import S3Helper,S3HelperAutoConfig,S3HelperAutoTokenizer,S3HelperAutoModelForCausalLM, s3_load_dataset
import os
import logging

# os.environ['S3_ACCESS_KEY'] = 'minioadmin'
# os.environ['S3_SECRET_KEY'] = 'minioadmin'
# os.environ['S3_ENDPOINT_URL'] = 'http://172.17.0.2:9000'
S3Helper()

# # Example usage
model_name = "jan-hq-test/tokenizer-tinyllama"
# model = S3HelperAutoModelForCausalLM.from_pretrained(model_name)
tokenizer = S3HelperAutoTokenizer.from_pretrained(model_name)
logging.info(f"Tokenizer Loading successful: {tokenizer}")
# print(tokenizer)
# config = S3HelperAutoConfig.from_pretrained(model_name)
# Make sure S3Helper is initialized and environment variables are set
# Load a dataset from S3 bucket
dataset = s3_load_dataset("jan-hq-test/test-dataset",file_format='parquet', split='train')
logging.info(f"Dataset Loading successful")
