From 8d939417b3c5658e2a4a68f8904286c1fc0c4bef Mon Sep 17 00:00:00 2001
From: Ye Cao
Date: Wed, 13 Mar 2024 18:29:26 +0800
Subject: [PATCH] Replace the vineyard+fluid example with a tutorial.

Signed-off-by: Ye Cao
---
 docs/tutorials/kubernetes.rst                 |   1 +
 .../kubernetes/vineyard-on-fluid.rst          | 315 ++++++++++++++++++
 .../linear-regression-with-vineyard.py        | 163 ---------
 .../vineyard-on-fluid/prepare-dataset.py      |  65 ----
 4 files changed, 316 insertions(+), 228 deletions(-)
 create mode 100644 docs/tutorials/kubernetes/vineyard-on-fluid.rst
 delete mode 100644 k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py
 delete mode 100644 k8s/examples/vineyard-on-fluid/prepare-dataset.py

diff --git a/docs/tutorials/kubernetes.rst b/docs/tutorials/kubernetes.rst
index a7146e3d..fa450720 100644
--- a/docs/tutorials/kubernetes.rst
+++ b/docs/tutorials/kubernetes.rst
@@ -10,6 +10,7 @@ Vineyard on Kubernetes
    ./kubernetes/ml-pipeline-mars-pytorch.rst
    ./kubernetes/data-sharing-with-vineyard-on-kubernetes.rst
    ./kubernetes/efficient-data-sharing-in-kubeflow-with-vineyard-csi-driver.rst
+   ./kubernetes/vineyard-on-fluid.rst

 Vineyard can be seamlessly deployed on Kubernetes, managed by the :ref:`vineyard-operator`,
 to enhance big-data workflows through its data-aware scheduling policy. This policy
diff --git a/docs/tutorials/kubernetes/vineyard-on-fluid.rst b/docs/tutorials/kubernetes/vineyard-on-fluid.rst
new file mode 100644
index 00000000..8250dfc2
--- /dev/null
+++ b/docs/tutorials/kubernetes/vineyard-on-fluid.rst
@@ -0,0 +1,315 @@
Vineyard + Fluid in Action: Train a Linear Regression Model on ACK
==================================================================

In this tutorial, we demonstrate how to train a linear regression model on
ACK (Alibaba Cloud Container Service for Kubernetes) using the
VineyardRuntime. Please follow the steps below.

Step 1: Create a dataset and upload it to OSS
---------------------------------------------

The ``prepare-dataset.py`` code introduced below shows how to use the numpy
and pandas libraries in Python to generate a dataset of about 22 GB and
upload it to the OSS service through `ossutil`_. The whole process is
divided into three main parts:

.. note::

   If your ACK nodes do not have enough memory to generate a 22 GB dataset,
   reduce the number of rows (``num_rows``) in the dataset.

1. **Dataset creation**: The code below uses the pandas library to create a
   DataFrame containing multiple columns of random numbers that simulate
   various attributes of the real estate market.

2. **Dataset serialization**: The last line of the code serializes the
   dataset to the local file ``df.pkl``.
.. code:: python

    import numpy as np
    import pandas as pd

    # Generate a dataframe with a size of about 22 GB
    num_rows = 6000 * 10000
    df = pd.DataFrame({
        'Id': np.random.randint(1, 100000, num_rows),
        'MSSubClass': np.random.randint(20, 201, size=num_rows),
        'LotFrontage': np.random.randint(50, 151, size=num_rows),
        'LotArea': np.random.randint(5000, 20001, size=num_rows),
        'OverallQual': np.random.randint(1, 11, size=num_rows),
        'OverallCond': np.random.randint(1, 11, size=num_rows),
        'YearBuilt': np.random.randint(1900, 2022, size=num_rows),
        'YearRemodAdd': np.random.randint(1900, 2022, size=num_rows),
        'MasVnrArea': np.random.randint(0, 1001, size=num_rows),
        'BsmtFinSF1': np.random.randint(0, 2001, size=num_rows),
        'BsmtFinSF2': np.random.randint(0, 1001, size=num_rows),
        'BsmtUnfSF': np.random.randint(0, 2001, size=num_rows),
        'TotalBsmtSF': np.random.randint(0, 3001, size=num_rows),
        '1stFlrSF': np.random.randint(500, 4001, size=num_rows),
        '2ndFlrSF': np.random.randint(0, 2001, size=num_rows),
        'LowQualFinSF': np.random.randint(0, 201, size=num_rows),
        'GrLivArea': np.random.randint(600, 5001, size=num_rows),
        'BsmtFullBath': np.random.randint(0, 4, size=num_rows),
        'BsmtHalfBath': np.random.randint(0, 3, size=num_rows),
        'FullBath': np.random.randint(0, 5, size=num_rows),
        'HalfBath': np.random.randint(0, 3, size=num_rows),
        'BedroomAbvGr': np.random.randint(0, 11, size=num_rows),
        'KitchenAbvGr': np.random.randint(0, 4, size=num_rows),
        'TotRmsAbvGrd': np.random.randint(0, 16, size=num_rows),
        'Fireplaces': np.random.randint(0, 4, size=num_rows),
        'GarageYrBlt': np.random.randint(1900, 2022, size=num_rows),
        'GarageCars': np.random.randint(0, 5, num_rows),
        'GarageArea': np.random.randint(0, 1001, num_rows),
        'WoodDeckSF': np.random.randint(0, 501, num_rows),
        'OpenPorchSF': np.random.randint(0, 301, num_rows),
        'EnclosedPorch': np.random.randint(0, 201, num_rows),
        '3SsnPorch': np.random.randint(0, 101, num_rows),
        'ScreenPorch': np.random.randint(0, 201, num_rows),
        'PoolArea': np.random.randint(0, 301, num_rows),
        'MiscVal': np.random.randint(0, 5001, num_rows),
        'TotalRooms': np.random.randint(2, 11, num_rows),
        "GarageAge": np.random.randint(1, 31, num_rows),
        "RemodAge": np.random.randint(1, 31, num_rows),
        "HouseAge": np.random.randint(1, 31, num_rows),
        "TotalBath": np.random.randint(1, 5, num_rows),
        "TotalPorchSF": np.random.randint(1, 1001, num_rows),
        "TotalSF": np.random.randint(1000, 6001, num_rows),
        "TotalArea": np.random.randint(1000, 6001, num_rows),
        'MoSold': np.random.randint(1, 13, num_rows),
        'YrSold': np.random.randint(2006, 2022, num_rows),
        'SalePrice': np.random.randint(50000, 800001, num_rows),
    })

    # Save the dataframe to the current directory
    df.to_pickle("df.pkl")

3. **Upload the dataset to OSS**: Run the following command to upload the
   generated ``df.pkl`` to the OSS service with ossutil.

.. code:: bash

    # Upload the dataset df.pkl to the OSS service through the ossutil cp command.
    # Refer to https://help.aliyun.com/zh/oss/developer-reference/upload-objects-6?spm=a2c4g.11186623.0.i3
    $ ossutil cp ./df.pkl oss://your-bucket-name/your-path
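Alternatively, you can upload the dataframe to OSS directly from Python with
the ``oss2`` SDK, as the deleted ``prepare-dataset.py`` example did. The
following is a minimal sketch, assuming your credentials are exported as the
``OSS_ACCESS_KEY_ID`` and ``OSS_ACCESS_KEY_SECRET`` environment variables and
that you substitute your own endpoint and bucket name:

.. code:: python

    import io

    import oss2
    import pandas as pd
    from oss2.credentials import EnvironmentVariableCredentialsProvider

    # Read the credentials from OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    # Replace OSS_ENDPOINT and BUCKET_NAME with your OSS endpoint and bucket
    bucket = oss2.Bucket(auth, 'OSS_ENDPOINT', 'BUCKET_NAME')

    # Serialize the dataframe into an in-memory buffer and upload it
    df = pd.read_pickle("df.pkl")
    bytes_buffer = io.BytesIO()
    df.to_pickle(bytes_buffer)
    bucket.put_object("df.pkl", bytes_buffer.getvalue())

Note that this approach buffers the whole pickle in memory, so it needs
roughly as much free memory as the dataset itself; ossutil streams from disk
and is the safer choice for a 22 GB file.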
Step 2: Install the Fluid control plane and Fluid Python SDK in the ACK cluster
--------------------------------------------------------------------------------

Option 1: Install ack-fluid. Reference document: Installing the
`cloud native AI suite`_.

Option 2: Use the open source version. We first use kubectl to create a
namespace named ``fluid-system``, and then use Helm to install Fluid. The
whole process only requires the following few shell commands.

.. code:: bash

    # Create the fluid-system namespace
    $ kubectl create ns fluid-system

    # Add the Fluid repository to the Helm repositories
    $ helm repo add fluid https://fluid-cloudnative.github.io/charts
    # Update the Fluid repository to the latest version
    $ helm repo update
    # Find the development version in the Fluid repository
    $ helm search repo fluid --devel
    # Deploy the corresponding version of the Fluid chart on ACK
    $ helm install fluid fluid/fluid --devel

After deploying the Fluid platform on ACK, run the following pip command to
install the Fluid Python SDK.

.. code:: bash

    $ pip install git+https://github.com/fluid-cloudnative/fluid-client-python.git
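Before moving on, you can verify that the SDK is importable and can build a
client against your kubeconfig. This is a minimal sketch that simply reuses
the client construction calls from the script in Step 4:

.. code:: python

    import fluid

    # Build a Fluid client from the default kubeconfig file; construction
    # fails if the kubeconfig is missing or invalid
    client_config = fluid.ClientConfig()
    fluid_client = fluid.FluidClient(client_config)
    print("Successfully created a Fluid client")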
Step 3: Enable collaborative scheduling of data and tasks (optional)
---------------------------------------------------------------------

In cloud environments, end-to-end data operation pipelines often contain
multiple subtasks. When Kubernetes schedules these tasks, it only considers
the required resource constraints and cannot guarantee that two consecutively
executed tasks run on the same node. When the two tasks share intermediate
results through Vineyard, the data migration between nodes incurs additional
network overhead.

If you want tasks and vineyard to be scheduled onto the same node for the
best performance, you can modify the ConfigMap as follows to enable fuse
affinity scheduling. The scheduler will then preferentially place associated
tasks on the node that already holds the shared data in memory, reducing the
network overhead incurred by data migration.

.. code:: bash

    # Update the webhook-plugins configuration according to the following command
    # and enable fuse affinity scheduling
    $ kubectl edit configmap webhook-plugins -n fluid-system
    data:
      pluginsProfile: |
        pluginConfig:
        - args: |
            preferred:
            # Enable fuse affinity scheduling
            - name: fluid.io/fuse
              weight: 100
    ...

    # Restart the fluid-webhook pod
    $ kubectl delete pod -lcontrol-plane=fluid-webhook -n fluid-system

Step 4: Use Fluid Python SDK to build and deploy linear regression data operation pipeline
-------------------------------------------------------------------------------------------

In the ``linear-regression-with-vineyard.py`` script below, we explore how to
build and deploy a machine learning workflow using Python and the Fluid
library. The dataset is the one generated in Step 1. The workflow covers the
whole process of data preprocessing, model training, and model testing.

.. code:: python

    import fluid

    from fluid import constants
    from fluid import models

    # Create a Fluid client instance by connecting to the Fluid control plane
    # using the default kubeconfig file
    client_config = fluid.ClientConfig()
    fluid_client = fluid.FluidClient(client_config)

    # Create a dataset named vineyard in the default namespace
    fluid_client.create_dataset(
        dataset_name="vineyard",
    )

    # Get the vineyard dataset instance
    dataset = fluid_client.get_dataset(dataset_name="vineyard")

    # Initialize the vineyard runtime configuration and bind the vineyard
    # dataset instance to the runtime: 2 replicas with 30Gi of memory each
    dataset.bind_runtime(
        runtime_type=constants.VINEYARD_RUNTIME_KIND,
        replicas=2,
        cache_capacity_GiB=30,
        cache_medium="MEM",
        wait=True
    )

    # Define the data preprocessing task
    def preprocess():
        from sklearn.model_selection import train_test_split

        import pandas as pd
        import vineyard

        df = pd.read_pickle('/data/df.pkl')

        # Preprocess data: drop outliers and split features from the target
        df = df.drop(df[(df['GrLivArea'] > 4800)].index)
        X = df.drop('SalePrice', axis=1)  # Features
        y = df['SalePrice']               # Target variable

        del df

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

        del X, y

        # Persist the intermediate results in vineyard for the following tasks
        vineyard.put(X_train, name="x_train", persist=True)
        vineyard.put(X_test, name="x_test", persist=True)
        vineyard.put(y_train, name="y_train", persist=True)
        vineyard.put(y_test, name="y_test", persist=True)

    # Define the model training task
    def train():
        from sklearn.linear_model import LinearRegression

        import joblib
        import vineyard

        x_train_data = vineyard.get(name="x_train", fetch=True)
        y_train_data = vineyard.get(name="y_train", fetch=True)

        model = LinearRegression()
        model.fit(x_train_data, y_train_data)

        joblib.dump(model, '/data/model.pkl')

    # Define the model testing task
    def test():
        from sklearn.metrics import mean_squared_error

        import joblib
        import vineyard

        x_test_data = vineyard.get(name="x_test", fetch=True)
        y_test_data = vineyard.get(name="y_test", fetch=True)

        model = joblib.load("/data/model.pkl")
        y_pred = model.predict(x_test_data)

        err = mean_squared_error(y_test_data, y_pred)

        with open('/data/output.txt', 'a') as f:
            f.write(str(err))

    packages_to_install = ["numpy", "pandas", "pyarrow", "requests", "vineyard", "scikit-learn==1.4.0", "joblib==1.3.2"]
    pip_index_url = "https://pypi.tuna.tsinghua.edu.cn/simple"

    # Wrap each Python function into a Fluid task processor; see the
    # create_processor sketch after this code block
    preprocess_processor = create_processor(preprocess, packages_to_install, pip_index_url)
    train_processor = create_processor(train, packages_to_install, pip_index_url)
    test_processor = create_processor(test, packages_to_install, pip_index_url)

    # Create the linear regression model task workflow:
    # data preprocessing -> model training -> model testing.
    # The mount path "/var/run/vineyard" is the default path of the vineyard
    # configuration file
    flow = dataset.process(processor=preprocess_processor, dataset_mountpath="/var/run/vineyard") \
                  .process(processor=train_processor, dataset_mountpath="/var/run/vineyard") \
                  .process(processor=test_processor, dataset_mountpath="/var/run/vineyard")

    # Submit the workflow and wait for it to run to completion
    run = flow.run(run_id="linear-regression-with-vineyard")
    run.wait()

    # Clean up all resources created on the Fluid platform
    dataset.clean_up(wait=True)
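The script above relies on a ``create_processor`` helper that turns each
Python function into a Fluid task processor; the tutorial does not define it
inline. The sketch below is one plausible implementation, modeled on the
``models.Processor``/``models.ScriptProcessor`` template used by the deleted
``linear-regression-with-vineyard.py`` example; generating the script with
``inspect.getsource`` is our own assumption here, not a documented Fluid API:

.. code:: python

    import inspect
    import textwrap

    from fluid import models

    def create_processor(func, packages_to_install, pip_index_url):
        # Assumption: turn the function into a standalone script that first
        # installs its dependencies and then calls the function
        install = "pip3 install -i {} {}".format(
            pip_index_url, " ".join(packages_to_install))
        source = "\n".join([
            "import subprocess",
            "subprocess.run({!r}, shell=True, check=True)".format(install),
            textwrap.dedent(inspect.getsource(func)),
            "{}()".format(func.__name__),
        ])
        return models.Processor(
            # With fuse affinity scheduling enabled (Step 3), uncomment this
            # label so that related tasks are preferentially co-located:
            # pod_metadata=models.PodMetadata(
            #     labels={"fuse.serverful.fluid.io/inject": "true"},
            # ),
            script=models.ScriptProcessor(
                command=["python"],
                source=source,
                image="python",
                image_tag="3.10",
            ),
        )

The functions read and write files under ``/data``, so the processor also
needs the OSS-backed volume mounted there; the deleted example did this by
passing ``volumes`` and ``volume_mounts`` (a PVC named ``pvc-oss`` mounted at
``/data``) to ``models.ScriptProcessor``.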
Here's an overview of each part of the code:

1. **Create the Fluid client**: This code establishes a connection to the
   Fluid control plane using the default kubeconfig file and creates a Fluid
   client instance.

2. **Create and configure the vineyard dataset and runtime**: Next, the code
   creates a dataset named ``vineyard``, obtains the dataset instance,
   initializes the vineyard runtime configuration, and sets the number of
   replicas and the memory size to bind the dataset to the runtime
   environment.

3. **Define the data preprocessing function**: This part defines a Python
   function for data preprocessing, which filters the data and splits it
   into a training set and a test set.

4. **Define the model training function**: As the name suggests, this code
   defines another Python function that trains a linear regression model.

5. **Define the model testing function**: This section contains the model
   testing logic for evaluating the trained model.

6. **Create task templates and define the task workflow**: The code
   encapsulates a task template function named ``create_processor``, which
   wraps the previously defined Python functions into the data preprocessing,
   model training, and model testing steps respectively. These steps execute
   sequentially, forming a complete workflow in which data preprocessing
   comes first, followed by model training, and finally model testing. This
   serial execution ensures that the output of each stage can be used as the
   input of the next stage, resulting in a coherent and orderly machine
   learning process.

7. **[Optional] Enable data affinity scheduling**: After enabling fuse
   affinity scheduling, add the label ``"fuse.serverful.fluid.io/inject": "true"``
   so that the scheduler preferentially places related tasks on the same
   physical node, achieving the best data processing performance.

8. **Submit and execute the task workflow**: The ``run`` command submits the
   entire linear regression task workflow to the Fluid platform for
   execution.

9. **Resource cleanup**: Finally, ``dataset.clean_up`` removes all resources
   created on the Fluid platform.

.. _cloud native AI suite: https://help.aliyun.com/zh/ack/cloud-native-ai-suite/user-guide/deploy-the-cloud-native-ai-suite?spm=a2c4g.11186623.0.i14#task-2038811
.. _ossutil: https://help.aliyun.com/zh/oss/developer-reference/ossutil
\ No newline at end of file
diff --git a/k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py b/k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py
deleted file mode 100644
index 6350ac42..00000000
--- a/k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py
+++ /dev/null
@@ -1,163 +0,0 @@
-import fluid
-
-from fluid import constants
-from fluid import models
-
-# Use the default kubeconfig file to connect to the Fluid control plane
-# and create a Fluid client instance
-client_config = fluid.ClientConfig()
-fluid_client = fluid.FluidClient(client_config)
-
-# Create a dataset named "vineyard" in the default namespace
-fluid_client.create_dataset(
-    dataset_name="vineyard",
-    mount_name="dummy-mount-name",
-    mount_point="dummy-mount-point"
-)
-
-# Get the dataset instance of the "vineyard" dataset
-dataset = fluid_client.get_dataset(dataset_name="vineyard")
-
-# Init vineyard runtime configuration and bind the vineyard dataset instance to the runtime.
-# Replicas is 2, and the memory is 30Gi
-dataset.bind_runtime(
-    runtime_type=constants.VINEYARD_RUNTIME_KIND,
-    replicas=2,
-    cache_capacity_GiB=30,
-    cache_medium="MEM",
-    wait=True
-)
-
-# define the script of data preprocessing
-preprocess_data_script = """
-pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
-#!/bin/bash
-set -ex
-
-cat << EOF > ./preprocess.py
-from sklearn.model_selection import train_test_split
-
-import pandas as pd
-import vineyard
-
-df = pd.read_pickle('/data/df.pkl')
-
-# Preprocess Data
-df = df.drop(df[(df['GrLivArea']>4800)].index)
-X = df.drop('SalePrice', axis=1)  # Features
-y = df['SalePrice']  # Target variable
-
-del df
-
-X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
-
-del X, y
-
-vineyard.put(X_train, name="x_train", persist=True)
-vineyard.put(X_test, name="x_test", persist=True)
-vineyard.put(y_train, name="y_train", persist=True)
-vineyard.put(y_test, name="y_test", persist=True)
-
-EOF
-
-python3 ./preprocess.py
-"""
-
-# define the script of model training
-train_data_script = """
-pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
-#!/bin/bash
-set -ex
-
-cat << EOF > ./train.py
-from sklearn.linear_model import LinearRegression
-
-import joblib
-import pandas as pd
-import vineyard
-
-x_train_data = vineyard.get(name="x_train", fetch=True)
-y_train_data = vineyard.get(name="y_train", fetch=True)
-
-model = LinearRegression()
-model.fit(x_train_data, y_train_data)
-
-joblib.dump(model, '/data/model.pkl')
-
-EOF
-python3 ./train.py
-"""
-
-# define the script of model testing
-test_data_script = """
-pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
-#!/bin/bash
-set -ex
-
-cat << EOF > ./test.py
-from sklearn.linear_model import LinearRegression
-from sklearn.metrics import mean_squared_error
-
-import vineyard
-import joblib
-import pandas as pd
-
-x_test_data = vineyard.get(name="x_test", fetch=True)
-y_test_data = vineyard.get(name="y_test", fetch=True)
-
-model = joblib.load("/data/model.pkl")
-y_pred = model.predict(x_test_data)
-
-err = mean_squared_error(y_test_data, y_pred)
-
-with open('/data/output.txt', 'a') as f:
-    f.write(str(err))
-
-EOF
-
-python3 ./test.py
-"""
-
-from kubernetes.client import models as k8s_models
-# define the template of the task processor and mount the OSS Volume
-def create_processor(script):
-    return models.Processor(
-        # When enabling fuse affinity scheduling, add the following label
-        # to achieve the best performance of data processing
-        # pod_metadata=models.PodMetadata(
-        #     labels={"fuse.serverful.fluid.io/inject": "true"},
-        # ),
-        script=models.ScriptProcessor(
-            command=["bash"],
-            source=script,
-            image="python",
-            image_tag="3.10",
-            volumes=[k8s_models.V1Volume(
-                name="data",
-                persistent_volume_claim=k8s_models.V1PersistentVolumeClaimVolumeSource(
-                    claim_name="pvc-oss"
-                )
-            )],
-            volume_mounts=[k8s_models.V1VolumeMount(
-                name="data",
-                mount_path="/data"
-            )],
-        )
-    )
-
-preprocess_processor = create_processor(preprocess_data_script)
-train_processor = create_processor(train_data_script)
-test_processor = create_processor(test_data_script)
-
-# Create a linear regression model task workflow: data preprocessing -> model training -> model testing
-# The following mount path "/var/run" is the default path of the vineyard configuration file
-flow = dataset.process(processor=preprocess_processor, dataset_mountpath="/var/run") \
-    .process(processor=train_processor, dataset_mountpath="/var/run") \
-    .process(processor=test_processor, dataset_mountpath="/var/run")
-
-# Submit the linear regression model task workflow to the Fluid platform and start execution
-run = flow.run(run_id="linear-regression-with-vineyard")
-run.wait()
-
-# Clean up all resources
-dataset.clean_up(wait=True)
diff --git a/k8s/examples/vineyard-on-fluid/prepare-dataset.py b/k8s/examples/vineyard-on-fluid/prepare-dataset.py
deleted file mode 100644
index d6cad2c1..00000000
--- a/k8s/examples/vineyard-on-fluid/prepare-dataset.py
+++ /dev/null
@@ -1,65 +0,0 @@
-import numpy as np
-import pandas as pd
-
-# generate a dataframe with size around 22G
-num_rows = 6000 * 10000
-df = pd.DataFrame({
-    'Id': np.random.randint(1, 100000, num_rows),
-    'MSSubClass': np.random.randint(20, 201, size=num_rows),
-    'LotFrontage': np.random.randint(50, 151, size=num_rows),
-    'LotArea': np.random.randint(5000, 20001, size=num_rows),
-    'OverallQual': np.random.randint(1, 11, size=num_rows),
-    'OverallCond': np.random.randint(1, 11, size=num_rows),
-    'YearBuilt': np.random.randint(1900, 2022, size=num_rows),
-    'YearRemodAdd': np.random.randint(1900, 2022, size=num_rows),
-    'MasVnrArea': np.random.randint(0, 1001, size=num_rows),
-    'BsmtFinSF1': np.random.randint(0, 2001, size=num_rows),
-    'BsmtFinSF2': np.random.randint(0, 1001, size=num_rows),
-    'BsmtUnfSF': np.random.randint(0, 2001, size=num_rows),
-    'TotalBsmtSF': np.random.randint(0, 3001, size=num_rows),
-    '1stFlrSF': np.random.randint(500, 4001, size=num_rows),
-    '2andFlrSF': np.random.randint(0, 2001, size=num_rows),
-    'LowQualFinSF': np.random.randint(0, 201, size=num_rows),
-    'GrLivArea': np.random.randint(600, 5001, size=num_rows),
-    'BsmtFullBath': np.random.randint(0, 4, size=num_rows),
-    'BsmtHalfBath': np.random.randint(0, 3, size=num_rows),
-    'FullBath': np.random.randint(0, 5, size=num_rows),
-    'HalfBath': np.random.randint(0, 3, size=num_rows),
-    'BedroomAbvGr': np.random.randint(0, 11, size=num_rows),
-    'KitchenAbvGr': np.random.randint(0, 4, size=num_rows),
-    'TotRmsAbvGrd': np.random.randint(0, 16, size=num_rows),
-    'Fireplaces': np.random.randint(0, 4, size=num_rows),
-    'GarageYrBlt': np.random.randint(1900, 2022, size=num_rows),
-    'GarageCars': np.random.randint(0, 5, num_rows),
-    'GarageArea': np.random.randint(0, 1001, num_rows),
-    'WoodDeckSF': np.random.randint(0, 501, num_rows),
-    'OpenPorchSF': np.random.randint(0, 301, num_rows),
-    'EnclosedPorch': np.random.randint(0, 201, num_rows),
-    '3SsnPorch': np.random.randint(0, 101, num_rows),
-    'ScreenPorch': np.random.randint(0, 201, num_rows),
-    'PoolArea': np.random.randint(0, 301, num_rows),
-    'MiscVal': np.random.randint(0, 5001, num_rows),
-    'TotalRooms': np.random.randint(2, 11, num_rows),
-    "GarageAge": np.random.randint(1, 31, num_rows),
-    "RemodAge": np.random.randint(1, 31, num_rows),
-    "HouseAge": np.random.randint(1, 31, num_rows),
-    "TotalBath": np.random.randint(1, 5, num_rows),
-    "TotalPorchSF": np.random.randint(1, 1001, num_rows),
-    "TotalSF": np.random.randint(1000, 6001, num_rows),
-    "TotalArea": np.random.randint(1000, 6001, num_rows),
-    'MoSold': np.random.randint(1, 13, num_rows),
-    'YrSold': np.random.randint(2006, 2022, num_rows),
-    'SalePrice': np.random.randint(50000, 800001, num_rows),
-})
-
-import oss2
-import io
-from oss2.credentials import EnvironmentVariableCredentialsProvider
-# Please set your OSS accessKeyID and accessKeySecret as environment variables OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET
-auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-# Please replace OSS_ENDPOINT and BUCKET_NAME with your OSS Endpoint and Bucket
-bucket = oss2.Bucket(auth, 'OSS_ENDPOINT', 'BUCKET_NAME')
-
-bytes_buffer = io.BytesIO()
-df.to_pickle(bytes_buffer)
-bucket.put_object("df.pkl", bytes_buffer.getvalue())