s3 as dataset source code review changes
deepanker13 committed Apr 5, 2024
1 parent b0600ce commit 7342ee5
Showing 4 changed files with 171 additions and 96 deletions.
145 changes: 145 additions & 0 deletions examples/pytorch/language-modeling/train_api_hf_dataset.ipynb
@@ -0,0 +1,145 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# install kubeflow-training extra 'huggingface'\n",
"!pip install -U 'kubeflow-training[huggingface]'"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# import the libraries\n",
"from kubeflow.training.api.training_client import TrainingClient\n",
"from kubeflow.storage_initializer.s3 import S3DatasetParams\n",
"from kubeflow.storage_initializer.hugging_face import (\n",
" HuggingFaceModelParams,\n",
" HuggingFaceTrainParams,\n",
" HfDatasetParams,\n",
")\n",
"from kubeflow.storage_initializer.constants import INIT_CONTAINER_MOUNT_PATH\n",
"from peft import LoraConfig\n",
"import transformers\n",
"from transformers import TrainingArguments\n",
"from kubeflow.training import constants"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"# create a training client, pass config_file parameter if you want to use kubeconfig other than \"~/.kube/config\"\n",
"client = TrainingClient()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"USING HUGGING FACE HUB AS THE DATASET STORE"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# mention the model, datasets and training parameters\n",
"client.train(\n",
" name=\"huggingface-test\",\n",
" num_workers=2,\n",
" num_procs_per_worker=1,\n",
" # specify the storage class if you don't want to use the default one for the storage-initializer PVC\n",
" # storage_config={\n",
" # \"size\": \"10Gi\",\n",
" # \"storage_class\": \"<your storage class>\",\n",
" # },\n",
" model_provider_parameters=HuggingFaceModelParams(\n",
" model_uri=\"hf://TinyLlama/TinyLlama-1.1B-Chat-v1.0\",\n",
" transformer_type=transformers.AutoModelForCausalLM,\n",
" ),\n",
" # it is assumed for text related tasks, you have 'text' column in the dataset.\n",
" # for more info on how dataset is loaded check load_and_preprocess_data function in sdk/python/kubeflow/trainer/hf_llm_training.py\n",
" dataset_provider_parameters=HfDatasetParams(repo_id=\"imdatta0/ultrachat_1k\"),\n",
" train_parameters=HuggingFaceTrainParams(\n",
" lora_config=LoraConfig(\n",
" r=8,\n",
" lora_alpha=8,\n",
" lora_dropout=0.1,\n",
" bias=\"none\",\n",
" task_type=\"CAUSAL_LM\",\n",
" ),\n",
" training_parameters=TrainingArguments(\n",
" num_train_epochs=1,\n",
" per_device_train_batch_size=1,\n",
" gradient_accumulation_steps=1,\n",
" gradient_checkpointing=True,\n",
" gradient_checkpointing_kwargs={\n",
" \"use_reentrant\": False\n",
" }, # this is mandatory if checkpointng is enabled\n",
" warmup_steps=0.02,\n",
" learning_rate=1,\n",
" lr_scheduler_type=\"cosine\",\n",
" bf16=False,\n",
" logging_steps=0.01,\n",
" output_dir=INIT_CONTAINER_MOUNT_PATH,\n",
" optim=f\"sgd\",\n",
" save_steps=0.01,\n",
" save_total_limit=3,\n",
" disable_tqdm=False,\n",
" resume_from_checkpoint=True,\n",
" remove_unused_columns=True,\n",
" ddp_backend=\"nccl\", # change the backend to gloo if you want cpu based training and remove the gpu key in resources_per_worker\n",
" ),\n",
" ),\n",
" resources_per_worker={\n",
" \"gpu\": 1,\n",
" \"cpu\": 8,\n",
" \"memory\": \"8Gi\",\n",
" }, # remove the gpu key if you don't want to attach gpus to the pods\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# check the logs of the job\n",
"client.get_job_logs(name=\"huggingface-test\", job_kind=constants.PYTORCHJOB_KIND)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "myenv3.11",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
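After the training cell above submits the PyTorchJob, the same client can also be used to check on the job and clean it up once it finishes. The short sketch below is illustrative only and is not part of this commit; is_job_succeeded and delete_job are assumed to exist on TrainingClient in the installed kubeflow-training release.

# Hypothetical follow-up cell (not in this commit): poll the job and delete it once it has succeeded.
# is_job_succeeded / delete_job are assumed TrainingClient methods; verify against your SDK version.
if client.is_job_succeeded(name="huggingface-test", job_kind=constants.PYTORCHJOB_KIND):
    client.delete_job(name="huggingface-test", job_kind=constants.PYTORCHJOB_KIND)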
Diff of the companion notebook that uses S3 as the dataset source:
@@ -12,7 +12,7 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
@@ -33,7 +33,7 @@
},
{
"cell_type": "code",
"execution_count": 16,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
@@ -45,7 +45,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"1. USING HUGGING FACE AS THE DATASET STORE"
"USING S3 AS THE DATASET SOURCE"
]
},
{
@@ -56,78 +56,7 @@
"source": [
"# mention the model, datasets and training parameters\n",
"client.train(\n",
" name=\"test\",\n",
" num_workers=2,\n",
" num_procs_per_worker=1,\n",
" # specify the storage class if you don't want to use the default one for the storage-initializer PVC\n",
" # storage_config={\n",
" # \"size\": \"10Gi\",\n",
" # \"storage_class\": \"<your storage class>\",\n",
" # },\n",
" model_provider_parameters=HuggingFaceModelParams(\n",
" model_uri=\"hf://TinyLlama/TinyLlama-1.1B-Chat-v1.0\",\n",
" transformer_type=transformers.AutoModelForCausalLM,\n",
" ),\n",
" # it is assumed for text related tasks, you have 'text' column in the dataset.\n",
" # for more info on how dataset is loaded check load_and_preprocess_data function in sdk/python/kubeflow/trainer/hf_llm_training.py\n",
" dataset_provider_parameters=HfDatasetParams(repo_id=\"imdatta0/ultrachat_1k\"),\n",
" train_parameters=HuggingFaceTrainParams(\n",
" lora_config=LoraConfig(\n",
" r=8,\n",
" lora_alpha=8,\n",
" lora_dropout=0.1,\n",
" bias=\"none\",\n",
" task_type=\"CAUSAL_LM\",\n",
" ),\n",
" training_parameters=TrainingArguments(\n",
" num_train_epochs=1,\n",
" per_device_train_batch_size=1,\n",
" gradient_accumulation_steps=1,\n",
" gradient_checkpointing=True,\n",
" gradient_checkpointing_kwargs={\n",
" \"use_reentrant\": False\n",
" }, # this is mandatory if checkpointng is enabled\n",
" warmup_steps=0.02,\n",
" learning_rate=1,\n",
" lr_scheduler_type=\"cosine\",\n",
" bf16=False,\n",
" logging_steps=0.01,\n",
" output_dir=INIT_CONTAINER_MOUNT_PATH,\n",
" optim=f\"sgd\",\n",
" save_steps=0.01,\n",
" save_total_limit=3,\n",
" disable_tqdm=False,\n",
" resume_from_checkpoint=True,\n",
" remove_unused_columns=True,\n",
" ddp_backend=\"nccl\", # change the backend to gloo if you want cpu based training and remove the gpu key in resources_per_worker\n",
" ),\n",
" ),\n",
" resources_per_worker={\n",
" \"gpu\": 1,\n",
" \"cpu\": 8,\n",
" \"memory\": \"8Gi\",\n",
" }, # remove the gpu key if you don't want to attach gpus to the pods\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"2. USING S3 AS THE DATASET \n",
"Note - The dataset folder inside the bucket has to be named similar to how hugging face names its folder with the downloaded dataset, i.e (huggingface username or any name without a - or _ ) + triple underscore + dataset name\n",
"For example -> imdatta0___ultrachat_1k can be one possible name"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# mention the model, datasets and training parameters\n",
"client.train(\n",
" name=\"test\",\n",
" name=\"s3-test\",\n",
" num_workers=2,\n",
" num_procs_per_worker=1,\n",
" # specify the storage class if you don't want to use the default one for the storage-initializer PVC\n",
@@ -197,7 +126,7 @@
"outputs": [],
"source": [
"# check the logs of the job\n",
"client.get_job_logs(name=\"test\", job_kind=constants.PYTORCHJOB_KIND)"
"client.get_job_logs(name=\"s3-test\", job_kind=constants.PYTORCHJOB_KIND)"
]
}
],
@@ -217,7 +146,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
"version": "3.11.9"
}
},
"nbformat": 4,
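The collapsed hunk above hides the body of the S3 train() call. As a rough sketch, not copied verbatim from this commit, the dataset provider simply switches from HfDatasetParams to S3DatasetParams, whose fields mirror what sdk/python/kubeflow/storage_initializer/s3.py reads from its config; the endpoint, bucket, key, and credential values below are placeholders, and the dict-style constructor is an assumption about the SDK.

# Hypothetical sketch of the S3-backed dataset parameters passed to client.train(...).
# All values are placeholders; only the field names are taken from s3.py.
from kubeflow.storage_initializer.s3 import S3DatasetParams

s3_dataset = S3DatasetParams(
    {
        "endpoint_url": "<object store endpoint>",
        "bucket_name": "<bucket>",
        # per the naming note above: (user or any name without - or _) + "___" + dataset name
        "file_key": "imdatta0___ultrachat_1k",
        "region_name": "<region>",
        "access_key": "<access key>",
        "secret_key": "<secret key>",
    }
)

# then, in the cell above: client.train(name="s3-test", ..., dataset_provider_parameters=s3_dataset, ...)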
37 changes: 20 additions & 17 deletions sdk/python/kubeflow/storage_initializer/s3.py
@@ -42,30 +42,33 @@ def download_dataset(self):
import boto3

# Create an S3 client for Nutanix Object Store/S3
s3_client = boto3.client(
"s3",
s3_client = boto3.Session(
aws_access_key_id=self.config.access_key,
aws_secret_access_key=self.config.secret_key,
endpoint_url=self.config.endpoint_url,
region_name=self.config.region_name,
)

response = s3_client.list_objects_v2(
Bucket=self.config.bucket_name, Prefix=self.config.file_key
)
# Download the file
for obj in response.get("Contents", []):
s3_resource = s3_client.resource('s3', endpoint_url=self.config.endpoint_url)
# Get the bucket object
bucket = s3_resource.Bucket(self.config.bucket_name)

# Filter objects with the specified prefix
objects = bucket.objects.filter(Prefix=self.config.file_key)
# Iterate over filtered objects
for obj in objects:
# Extract the object key (filename)
obj_key = obj["Key"]
obj_key = obj.key
path_components = obj_key.split(os.path.sep)
path_excluded_first_last_parts = os.path.sep.join(path_components[1:-1])

# Create directories if they don't exist
os.makedirs(
os.path.join(VOLUME_PATH_DATASET, self.config.file_key), exist_ok=True
os.path.join(VOLUME_PATH_DATASET, path_excluded_first_last_parts), exist_ok=True
)
s3_client.download_file(
self.config.bucket_name,

# Download the file
file_path = os.path.sep.join(path_components[1:])
bucket.download_file(
obj_key,
os.path.join(
os.path.join(VOLUME_PATH_DATASET, self.config.file_key),
os.path.basename(obj_key),
),
os.path.join(VOLUME_PATH_DATASET, file_path)
)
print(f"Files downloaded")
2 changes: 0 additions & 2 deletions sdk/python/kubeflow/training/api/training_client.py
@@ -172,10 +172,8 @@ def train(

if isinstance(dataset_provider_parameters, S3DatasetParams):
dp = "s3"
dataset_name = dataset_provider_parameters.file_key.replace("_" * 3, "/")
elif isinstance(dataset_provider_parameters, HfDatasetParams):
dp = "hf"
dataset_name = dataset_provider_parameters.repo_id
else:
raise ValueError(
f"Invalid dataset provider parameters {dataset_provider_parameters}"
