From 7a9b3fcff59641ee9baebbac3a9ec9d5a585aa04 Mon Sep 17 00:00:00 2001 From: Ye Cao Date: Mon, 27 Nov 2023 20:34:23 +0800 Subject: [PATCH] Change the API of kubeflow pipeline from `vineyard.csi.read/writer` to `client.get/put` (#1614) Signed-off-by: Ye Cao --- docs/notes/cloud-native/deploy-kubernetes.rst | 22 +++ docs/notes/developers/build-from-source.rst | 14 -- .../kubernetes/using-vineyard-operator.rst | 9 +- k8s/examples/vineyard-csidriver/Makefile | 2 +- .../pipeline-kfp-v2-with-vineyard.py | 6 +- .../pipeline-kfp-v2-with-vineyard.yaml | 6 +- .../vineyard-csidriver/pipeline-kfp-v2.py | 6 +- .../vineyard-csidriver/pipeline-kfp-v2.yaml | 6 +- .../vineyard-csidriver/prepare-data.yaml | 2 +- k8s/examples/vineyard-kubeflow/Dockerfile | 10 ++ k8s/examples/vineyard-kubeflow/Makefile | 23 ++++ .../pipeline-with-vineyard.py | 88 ++++++++++++ .../pipeline-with-vineyard.yaml | 128 ++++++++++++++++++ k8s/examples/vineyard-kubeflow/pipeline.py | 57 ++++++++ k8s/examples/vineyard-kubeflow/pipeline.yaml | 105 ++++++++++++++ .../vineyard-kubeflow/prepare-data.yaml | 57 ++++++++ .../prepare-data/prepare-data.py | 76 +++++++++++ .../preprocess/preprocess.py | 89 ++++++++++++ k8s/examples/vineyard-kubeflow/rbac.yaml | 31 +++++ k8s/examples/vineyard-kubeflow/readme.md | 60 ++++++++ k8s/examples/vineyard-kubeflow/test/test.py | 46 +++++++ k8s/examples/vineyard-kubeflow/train/train.py | 44 ++++++ 22 files changed, 855 insertions(+), 32 deletions(-) create mode 100644 k8s/examples/vineyard-kubeflow/Dockerfile create mode 100644 k8s/examples/vineyard-kubeflow/Makefile create mode 100644 k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.py create mode 100644 k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.yaml create mode 100644 k8s/examples/vineyard-kubeflow/pipeline.py create mode 100644 k8s/examples/vineyard-kubeflow/pipeline.yaml create mode 100644 k8s/examples/vineyard-kubeflow/prepare-data.yaml create mode 100644 k8s/examples/vineyard-kubeflow/prepare-data/prepare-data.py create mode 100644 k8s/examples/vineyard-kubeflow/preprocess/preprocess.py create mode 100644 k8s/examples/vineyard-kubeflow/rbac.yaml create mode 100644 k8s/examples/vineyard-kubeflow/readme.md create mode 100644 k8s/examples/vineyard-kubeflow/test/test.py create mode 100644 k8s/examples/vineyard-kubeflow/train/train.py diff --git a/docs/notes/cloud-native/deploy-kubernetes.rst b/docs/notes/cloud-native/deploy-kubernetes.rst index 1464343b..8b02ca43 100644 --- a/docs/notes/cloud-native/deploy-kubernetes.rst +++ b/docs/notes/cloud-native/deploy-kubernetes.rst @@ -5,6 +5,27 @@ Deploy on Kubernetes Vineyard is managed by the :ref:`vineyard-operator` on Kubernetes. +Quick start +----------- + +If you want to install vineyard cluster quickly, you can +use the following command. + +Install `vineyardctl`_ as follows. + +.. code:: bash + + pip3 install vineyard + +Use the vineyardctl to install vineyard cluster. + +.. code:: bash + + python3 -m vineyard.ctl install vineyard-cluster --create-namespace + +Also, you could follow the next guide to install vineyard cluster steps +by steps. + Install vineyard-operator ------------------------- @@ -196,5 +217,6 @@ automates much of the boilerplate configuration required when deploying workflow ^^^^^^^^^^^^ :code:`vineyardctl` is the command-line tool for working with the Vineyard Operator. +.. _vineyardctl: https://github.com/v6d-io/v6d/blob/main/k8s/cmd/README.md .. _kind: https://kind.sigs.k8s.io .. _CRD: https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions diff --git a/docs/notes/developers/build-from-source.rst b/docs/notes/developers/build-from-source.rst index c9fd0f3f..030515f9 100644 --- a/docs/notes/developers/build-from-source.rst +++ b/docs/notes/developers/build-from-source.rst @@ -139,20 +139,6 @@ After building the vineyard library successfully, you can package an install whe python3 setup.py bdist_wheel -Install vineyardctl -------------------- - -Vineyardctl is available on the Github release page, you can download the binary as follows: - -.. code:: shell - - export LATEST_TAG=$(curl -s "https://api.github.com/repos/v6d-io/v6d/tags" | jq -r '.[0].name') - export OS=$(uname -s | tr '[:upper:]' '[:lower:]') - export ARCH=${$(uname -m)/x86_64/amd64} - curl -Lo vineyardctl https://github.com/v6d-io/v6d/releases/download/$LATEST_TAG/vineyardctl-$LATEST_TAG-$OS-$ARCH - chmod +x vineyardctl - sudo mv vineyardctl /usr/local/bin/ - Building the documentation -------------------------- diff --git a/docs/tutorials/kubernetes/using-vineyard-operator.rst b/docs/tutorials/kubernetes/using-vineyard-operator.rst index fb7d0e4a..11f04a46 100644 --- a/docs/tutorials/kubernetes/using-vineyard-operator.rst +++ b/docs/tutorials/kubernetes/using-vineyard-operator.rst @@ -252,7 +252,7 @@ Check the status of all relevant resources managed by the ``vineyardd-sample`` c .. code:: bash - $ kubectl get all -l app.kubernetes.io/instance=vineyardd -n vineyard-system + $ kubectl get all -l app.kubernetes.io/instance=vineyard-system-vineyardd-sample -n vineyard-system .. admonition:: Expected output :class: admonition-details @@ -307,11 +307,11 @@ First, let's deploy the Python client on two Vineyard nodes as follows. containers: - name: vineyard-python imagePullPolicy: IfNotPresent - image: vineyardcloudnative/vineyard-python:v0.11.4 + image: python:3.10 command: - /bin/bash - -c - - sleep infinity + - pip3 install vineyard && sleep infinity volumeMounts: - mountPath: /var/run name: vineyard-sock @@ -341,7 +341,8 @@ Wait for the vineyard python client pod ready. .. code:: bash NAME READY STATUS RESTARTS AGE - vineyard-python-client-6fd8c47c98-7btkv 1/1 Running 0 93s + vineyard-python-client-6fd84bc897-27glp 1/1 Running 0 93s + vineyard-python-client-6fd84bc897-tlb22 1/1 Running 0 93s Use the kubectl exec command to enter the first vineyard python client pod. diff --git a/k8s/examples/vineyard-csidriver/Makefile b/k8s/examples/vineyard-csidriver/Makefile index 3ce332c3..ff1d7d19 100644 --- a/k8s/examples/vineyard-csidriver/Makefile +++ b/k8s/examples/vineyard-csidriver/Makefile @@ -1,4 +1,4 @@ -REGISTRY := "ghcr.io/v6d-io/v6d/kubeflow-example" +REGISTRY := "ghcr.io/v6d-io/v6d/csidriver-example" docker-build: docker build prepare-data/ -f Dockerfile \ --build-arg APP=prepare-data.py \ diff --git a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.py b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.py index da961cfc..e636252a 100644 --- a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.py +++ b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.py @@ -4,7 +4,7 @@ @dsl.container_component def PreProcess(data_multiplier: int): return dsl.ContainerSpec( - image = 'ghcr.io/v6d-io/v6d/kubeflow-example/preprocess-data', + image = 'ghcr.io/v6d-io/v6d/csidriver-example/preprocess-data', command = ['python3', 'preprocess.py'], args = [f'--data_multiplier={data_multiplier}', '--with_vineyard=True'], ) @@ -12,7 +12,7 @@ def PreProcess(data_multiplier: int): @dsl.container_component def Train(): return dsl.ContainerSpec( - image = 'ghcr.io/v6d-io/v6d/kubeflow-example/train-data', + image = 'ghcr.io/v6d-io/v6d/csidriver-example/train-data', command = ['python3', 'train.py'], args = ['--with_vineyard=True'], ) @@ -20,7 +20,7 @@ def Train(): @dsl.container_component def Test(): return dsl.ContainerSpec( - image = 'ghcr.io/v6d-io/v6d/kubeflow-example/test-data', + image = 'ghcr.io/v6d-io/v6d/csidriver-example/test-data', command = ['python3', 'test.py'], args = ['--with_vineyard=True'], ) diff --git a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.yaml b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.yaml index 8d0f979b..c6aab390 100644 --- a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.yaml +++ b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2-with-vineyard.yaml @@ -99,7 +99,7 @@ deploymentSpec: command: - python3 - preprocess.py - image: ghcr.io/v6d-io/v6d/kubeflow-example/preprocess-data + image: ghcr.io/v6d-io/v6d/csidriver-example/preprocess-data exec-test: container: args: @@ -107,7 +107,7 @@ deploymentSpec: command: - python3 - test.py - image: ghcr.io/v6d-io/v6d/kubeflow-example/test-data + image: ghcr.io/v6d-io/v6d/csidriver-example/test-data exec-train: container: args: @@ -115,7 +115,7 @@ deploymentSpec: command: - python3 - train.py - image: ghcr.io/v6d-io/v6d/kubeflow-example/train-data + image: ghcr.io/v6d-io/v6d/csidriver-example/train-data pipelineInfo: description: An example pipeline that trains and logs a regression model. name: machine-learning-pipeline-with-vineyard diff --git a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.py b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.py index 993ef4be..88e54839 100644 --- a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.py +++ b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.py @@ -4,7 +4,7 @@ @dsl.container_component def PreProcess(data_multiplier: int): return dsl.ContainerSpec( - image = 'ghcr.io/v6d-io/v6d/kubeflow-example/preprocess-data', + image = 'ghcr.io/v6d-io/v6d/csidriver-example/preprocess-data', command = ['python3', 'preprocess.py'], args=[f'--data_multiplier={data_multiplier}'], ) @@ -12,14 +12,14 @@ def PreProcess(data_multiplier: int): @dsl.container_component def Train(): return dsl.ContainerSpec( - image='ghcr.io/v6d-io/v6d/kubeflow-example/train-data', + image='ghcr.io/v6d-io/v6d/csidriver-example/train-data', command = ['python3', 'train.py'], ) @dsl.container_component def Test(): return dsl.ContainerSpec( - image='ghcr.io/v6d-io/v6d/kubeflow-example/test-data', + image='ghcr.io/v6d-io/v6d/csidriver-example/test-data', command = ['python3', 'test.py'], ) diff --git a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.yaml b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.yaml index ed694403..c9f13e23 100644 --- a/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.yaml +++ b/k8s/examples/vineyard-csidriver/pipeline-kfp-v2.yaml @@ -23,19 +23,19 @@ deploymentSpec: command: - python3 - preprocess.py - image: ghcr.io/v6d-io/v6d/kubeflow-example/preprocess-data + image: ghcr.io/v6d-io/v6d/csidriver-example/preprocess-data exec-test: container: command: - python3 - test.py - image: ghcr.io/v6d-io/v6d/kubeflow-example/test-data + image: ghcr.io/v6d-io/v6d/csidriver-example/test-data exec-train: container: command: - python3 - train.py - image: ghcr.io/v6d-io/v6d/kubeflow-example/train-data + image: ghcr.io/v6d-io/v6d/csidriver-example/train-data pipelineInfo: description: An example pipeline that trains and logs a regression model. name: machine-learning-pipeline diff --git a/k8s/examples/vineyard-csidriver/prepare-data.yaml b/k8s/examples/vineyard-csidriver/prepare-data.yaml index 9475ff9f..c8b051d9 100644 --- a/k8s/examples/vineyard-csidriver/prepare-data.yaml +++ b/k8s/examples/vineyard-csidriver/prepare-data.yaml @@ -15,7 +15,7 @@ spec: spec: containers: - name: prepare-data - image: ghcr.io/v6d-io/v6d/kubeflow-example/prepare-data + image: ghcr.io/v6d-io/v6d/csidriver-example/prepare-data imagePullPolicy: Always command: ["python3", "/prepare-data.py"] volumeMounts: diff --git a/k8s/examples/vineyard-kubeflow/Dockerfile b/k8s/examples/vineyard-kubeflow/Dockerfile new file mode 100644 index 00000000..612e2d22 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.10 + +RUN pip3 install --no-cache-dir pandas requests scikit-learn numpy vineyard + +WORKDIR / + +ARG APP +ENV APP ${APP} + +COPY ${APP} /${APP} diff --git a/k8s/examples/vineyard-kubeflow/Makefile b/k8s/examples/vineyard-kubeflow/Makefile new file mode 100644 index 00000000..3ce332c3 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/Makefile @@ -0,0 +1,23 @@ +REGISTRY := "ghcr.io/v6d-io/v6d/kubeflow-example" +docker-build: + docker build prepare-data/ -f Dockerfile \ + --build-arg APP=prepare-data.py \ + -t $(REGISTRY)/prepare-data + + docker build preprocess/ -f Dockerfile \ + --build-arg APP=preprocess.py \ + -t $(REGISTRY)/preprocess-data + + docker build train/ -f Dockerfile \ + --build-arg APP=train.py \ + -t $(REGISTRY)/train-data + + docker build test/ -f Dockerfile \ + --build-arg APP=test.py \ + -t $(REGISTRY)/test-data + +push-images: + docker push $(REGISTRY)/prepare-data + docker push $(REGISTRY)/preprocess-data + docker push $(REGISTRY)/train-data + docker push $(REGISTRY)/test-data diff --git a/k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.py b/k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.py new file mode 100644 index 00000000..610af034 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.py @@ -0,0 +1,88 @@ +from kfp import dsl +from kubernetes.client.models import V1EnvVar +import kubernetes as k8s + +def PreProcess(data_multiplier: int, registry: str): + vineyard_volume = dsl.PipelineVolume( + volume=k8s.client.V1Volume( + name="vineyard-socket", + host_path=k8s.client.V1HostPathVolumeSource( + path="/var/run/vineyard-kubernetes/vineyard-system/vineyardd-sample" + ) + ) + ) + + op = dsl.ContainerOp( + name='Preprocess Data', + image = f'{registry}/preprocess-data', + container_kwargs={ + 'image_pull_policy': "Always", + 'env': [V1EnvVar('VINEYARD_IPC_SOCKET', '/var/run/vineyard.sock')] + }, + pvolumes={ + "/data": dsl.PipelineVolume(pvc="benchmark-data"), + "/var/run": vineyard_volume, + }, + command = ['python3', 'preprocess.py'], + arguments=[f'--data_multiplier={data_multiplier}', '--with_vineyard=True'], + ) + op.add_pod_label('scheduling.k8s.v6d.io/vineyardd-namespace', 'vineyard-system') + op.add_pod_label('scheduling.k8s.v6d.io/vineyardd', 'vineyardd-sample') + op.add_pod_label('scheduling.k8s.v6d.io/job', 'preprocess-data') + op.add_pod_annotation('scheduling.k8s.v6d.io/required', '') + return op + +def Train(comp1, registry: str): + op = dsl.ContainerOp( + name='Train Data', + image=f'{registry}/train-data', + container_kwargs={ + 'image_pull_policy': "Always", + 'env': [V1EnvVar('VINEYARD_IPC_SOCKET', '/var/run/vineyard.sock')] + }, + pvolumes={ + "/data": comp1.pvolumes['/data'], + "/var/run": comp1.pvolumes['/var/run'], + }, + command = ['python3', 'train.py'], + arguments=['--with_vineyard=True'], + ) + op.add_pod_label('scheduling.k8s.v6d.io/vineyardd-namespace', 'vineyard-system') + op.add_pod_label('scheduling.k8s.v6d.io/vineyardd', 'vineyardd-sample') + op.add_pod_label('scheduling.k8s.v6d.io/job', 'train-data') + op.add_pod_annotation('scheduling.k8s.v6d.io/required', 'preprocess-data') + return op + +def Test(comp2, registry: str): + op = dsl.ContainerOp( + name='Test Data', + image=f'{registry}/test-data', + container_kwargs={ + 'image_pull_policy': "Always", + 'env': [V1EnvVar('VINEYARD_IPC_SOCKET', '/var/run/vineyard.sock')] + }, + pvolumes={ + "/data": comp2.pvolumes['/data'], + "/var/run": comp2.pvolumes['/var/run'] + }, + command = ['python3', 'test.py'], + arguments=['--with_vineyard=True'], + ) + op.add_pod_label('scheduling.k8s.v6d.io/vineyardd-namespace', 'vineyard-system') + op.add_pod_label('scheduling.k8s.v6d.io/vineyardd', 'vineyardd-sample') + op.add_pod_label('scheduling.k8s.v6d.io/job', 'test-data') + op.add_pod_annotation('scheduling.k8s.v6d.io/required', 'train-data') + return op + +@dsl.pipeline( + name='Machine Learning Pipeline', + description='An example pipeline that trains and logs a regression model.' +) +def pipeline(data_multiplier: int, registry: str): + comp1 = PreProcess(data_multiplier=data_multiplier, registry=registry) + comp2 = Train(comp1, registry=registry) + comp3 = Test(comp2, registry=registry) + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile(pipeline, __file__[:-3]+ '.yaml') diff --git a/k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.yaml b/k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.yaml new file mode 100644 index 00000000..acff2db5 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/pipeline-with-vineyard.yaml @@ -0,0 +1,128 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: machine-learning-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.21, pipelines.kubeflow.org/pipeline_compilation_time: '2023-11-17T16:10:29.221000', + pipelines.kubeflow.org/pipeline_spec: '{"description": "An example pipeline that + trains and logs a regression model.", "inputs": [{"name": "data_multiplier", + "type": "Integer"}, {"name": "registry", "type": "String"}], "name": "Machine + learning Pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.21} +spec: + entrypoint: machine-learning-pipeline + templates: + - name: machine-learning-pipeline + inputs: + parameters: + - {name: data_multiplier} + - {name: registry} + dag: + tasks: + - name: preprocess-data + template: preprocess-data + arguments: + parameters: + - {name: data_multiplier, value: '{{inputs.parameters.data_multiplier}}'} + - {name: registry, value: '{{inputs.parameters.registry}}'} + - name: test-data + template: test-data + dependencies: [train-data] + arguments: + parameters: + - {name: registry, value: '{{inputs.parameters.registry}}'} + - name: train-data + template: train-data + dependencies: [preprocess-data] + arguments: + parameters: + - {name: registry, value: '{{inputs.parameters.registry}}'} + - name: preprocess-data + container: + args: ['--data_multiplier={{inputs.parameters.data_multiplier}}', --with_vineyard=True] + command: [python3, preprocess.py] + env: + - {name: VINEYARD_IPC_SOCKET, value: /var/run/vineyard.sock} + image: '{{inputs.parameters.registry}}/preprocess-data' + imagePullPolicy: Always + volumeMounts: + - {mountPath: /data, name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4} + - {mountPath: /var/run, name: vineyard-socket} + inputs: + parameters: + - {name: data_multiplier} + - {name: registry} + metadata: + annotations: {scheduling.k8s.v6d.io/required: ''} + labels: + scheduling.k8s.v6d.io/vineyardd-namespace: vineyard-system + scheduling.k8s.v6d.io/vineyardd: vineyardd-sample + scheduling.k8s.v6d.io/job: preprocess-data + pipelines.kubeflow.org/kfp_sdk_version: 1.8.21 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + volumes: + - name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4 + persistentVolumeClaim: {claimName: benchmark-data} + - hostPath: {path: /var/run/vineyard-kubernetes/vineyard-system/vineyardd-sample} + name: vineyard-socket + - name: test-data + container: + args: [--with_vineyard=True] + command: [python3, test.py] + env: + - {name: VINEYARD_IPC_SOCKET, value: /var/run/vineyard.sock} + image: '{{inputs.parameters.registry}}/test-data' + imagePullPolicy: Always + volumeMounts: + - {mountPath: /data, name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4} + - {mountPath: /var/run, name: vineyard-socket} + inputs: + parameters: + - {name: registry} + metadata: + annotations: {scheduling.k8s.v6d.io/required: train-data} + labels: + scheduling.k8s.v6d.io/vineyardd-namespace: vineyard-system + scheduling.k8s.v6d.io/vineyardd: vineyardd-sample + scheduling.k8s.v6d.io/job: test-data + pipelines.kubeflow.org/kfp_sdk_version: 1.8.21 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + volumes: + - name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4 + persistentVolumeClaim: {claimName: benchmark-data} + - hostPath: {path: /var/run/vineyard-kubernetes/vineyard-system/vineyardd-sample} + name: vineyard-socket + - name: train-data + container: + args: [--with_vineyard=True] + command: [python3, train.py] + env: + - {name: VINEYARD_IPC_SOCKET, value: /var/run/vineyard.sock} + image: '{{inputs.parameters.registry}}/train-data' + imagePullPolicy: Always + volumeMounts: + - {mountPath: /data, name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4} + - {mountPath: /var/run, name: vineyard-socket} + inputs: + parameters: + - {name: registry} + metadata: + annotations: {scheduling.k8s.v6d.io/required: preprocess-data} + labels: + scheduling.k8s.v6d.io/vineyardd-namespace: vineyard-system + scheduling.k8s.v6d.io/vineyardd: vineyardd-sample + scheduling.k8s.v6d.io/job: train-data + pipelines.kubeflow.org/kfp_sdk_version: 1.8.21 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + volumes: + - name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4 + persistentVolumeClaim: {claimName: benchmark-data} + - hostPath: {path: /var/run/vineyard-kubernetes/vineyard-system/vineyardd-sample} + name: vineyard-socket + arguments: + parameters: + - {name: data_multiplier} + - {name: registry} + serviceAccountName: pipeline-runner diff --git a/k8s/examples/vineyard-kubeflow/pipeline.py b/k8s/examples/vineyard-kubeflow/pipeline.py new file mode 100644 index 00000000..db93de6a --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/pipeline.py @@ -0,0 +1,57 @@ +from kfp import dsl + +def PreProcess(data_multiplier: int, registry: str): + op = dsl.ContainerOp( + name='Preprocess Data', + image = f'{registry}/preprocess-data', + container_kwargs={ + 'image_pull_policy': "Always", + }, + pvolumes={ + "/data": dsl.PipelineVolume(pvc="benchmark-data"), + }, + command = ['python3', 'preprocess.py'], + arguments = [f'--data_multiplier={data_multiplier}'], + ) + return op + +def Train(comp1, registry: str): + op = dsl.ContainerOp( + name='Train Data', + image=f'{registry}/train-data', + container_kwargs={ + 'image_pull_policy': "Always", + }, + pvolumes={ + "/data": comp1.pvolumes['/data'], + }, + command = ['python3', 'train.py'], + ) + return op + +def Test(comp2, registry: str): + op = dsl.ContainerOp( + name='Test Data', + image=f'{registry}/test-data', + container_kwargs={ + 'image_pull_policy': "Always", + }, + pvolumes={ + "/data": comp2.pvolumes['/data'], + }, + command = ['python3', 'test.py'], + ) + return op + +@dsl.pipeline( + name='Machine Learning Pipeline', + description='An example pipeline that trains and logs a regression model.' +) +def pipeline(data_multiplier: int, registry: str): + comp1 = PreProcess(data_multiplier=data_multiplier, registry=registry) + comp2 = Train(comp1, registry=registry) + comp3 = Test(comp2, registry=registry) + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile(pipeline, __file__[:-3]+ '.yaml') diff --git a/k8s/examples/vineyard-kubeflow/pipeline.yaml b/k8s/examples/vineyard-kubeflow/pipeline.yaml new file mode 100644 index 00000000..fe94f948 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/pipeline.yaml @@ -0,0 +1,105 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: machine-learning-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.0, pipelines.kubeflow.org/pipeline_compilation_time: '2023-10-10T15:14:12.195049', + pipelines.kubeflow.org/pipeline_spec: '{"description": "An example pipeline that + trains and logs a regression model.", "inputs": [{"name": "data_multiplier", + "type": "Integer"}, {"name": "registry", "type": "String"}], "name": "Machine + Learning Pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.0} +spec: + entrypoint: machine-learning-pipeline + templates: + - name: machine-learning-pipeline + inputs: + parameters: + - {name: data_multiplier} + - {name: registry} + dag: + tasks: + - name: preprocess-data + template: preprocess-data + arguments: + parameters: + - {name: data_multiplier, value: '{{inputs.parameters.data_multiplier}}'} + - {name: registry, value: '{{inputs.parameters.registry}}'} + - name: test-data + template: test-data + dependencies: [train-data] + arguments: + parameters: + - {name: registry, value: '{{inputs.parameters.registry}}'} + - name: train-data + template: train-data + dependencies: [preprocess-data] + arguments: + parameters: + - {name: registry, value: '{{inputs.parameters.registry}}'} + - name: preprocess-data + container: + args: ['--data_multiplier={{inputs.parameters.data_multiplier}}'] + command: [python3, preprocess.py] + image: '{{inputs.parameters.registry}}/preprocess-data' + imagePullPolicy: Always + securityContext: + privileged: true + volumeMounts: + - {mountPath: /data, name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4} + inputs: + parameters: + - {name: data_multiplier} + - {name: registry} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.0 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + volumes: + - name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4 + persistentVolumeClaim: {claimName: benchmark-data} + - name: test-data + container: + command: [python3, test.py] + image: '{{inputs.parameters.registry}}/test-data' + imagePullPolicy: Always + securityContext: + privileged: true + volumeMounts: + - {mountPath: /data, name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4} + inputs: + parameters: + - {name: registry} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.0 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + volumes: + - name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4 + persistentVolumeClaim: {claimName: benchmark-data} + - name: train-data + container: + command: [python3, train.py] + image: '{{inputs.parameters.registry}}/train-data' + imagePullPolicy: Always + securityContext: + privileged: true + volumeMounts: + - {mountPath: /data, name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4} + inputs: + parameters: + - {name: registry} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.0 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + volumes: + - name: pvolume-d9c6725a1237b14c08a2567cb12c489bec539873deeddba7d87f5b4 + persistentVolumeClaim: {claimName: benchmark-data} + arguments: + parameters: + - {name: data_multiplier} + - {name: registry} + serviceAccountName: pipeline-runner diff --git a/k8s/examples/vineyard-kubeflow/prepare-data.yaml b/k8s/examples/vineyard-kubeflow/prepare-data.yaml new file mode 100644 index 00000000..2c3abffa --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/prepare-data.yaml @@ -0,0 +1,57 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: prepare-data + namespace: kubeflow +spec: + selector: + matchLabels: + app: prepare-data + replicas: 1 + template: + metadata: + labels: + app: prepare-data + spec: + containers: + - name: prepare-data + image: ghcr.io/v6d-io/v6d/kubeflow-example/prepare-data + imagePullPolicy: Always + command: ["python3", "/prepare-data.py"] + volumeMounts: + - mountPath: /data + name: data + volumes: + - name: data + persistentVolumeClaim: + claimName: benchmark-data +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: benchmark-data + namespace: kubeflow +spec: + storageClassName: manual + accessModes: + - ReadWriteMany + resources: + requests: + storage: 30Gi +--- +apiVersion: v1 +kind: PersistentVolume +metadata: + name: benchmark-data + namespace: kubeflow + labels: + type: local +spec: + storageClassName: manual + capacity: + storage: 30Gi + accessModes: + - ReadWriteMany + hostPath: + # mount a nfs volume to the kubernetes nodes + path: "/mnt/csi-benchmark" \ No newline at end of file diff --git a/k8s/examples/vineyard-kubeflow/prepare-data/prepare-data.py b/k8s/examples/vineyard-kubeflow/prepare-data/prepare-data.py new file mode 100644 index 00000000..9c26d19d --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/prepare-data/prepare-data.py @@ -0,0 +1,76 @@ +import time + +import numpy as np +import pandas as pd + + +def generate_random_dataframe(num_rows): + return pd.DataFrame({ + 'Id': np.random.randint(1, 100000, num_rows), + 'MSSubClass': np.random.randint(20, 201, size=num_rows), + 'LotFrontage': np.random.randint(50, 151, size=num_rows), + 'LotArea': np.random.randint(5000, 20001, size=num_rows), + 'OverallQual': np.random.randint(1, 11, size=num_rows), + 'OverallCond': np.random.randint(1, 11, size=num_rows), + 'YearBuilt': np.random.randint(1900, 2022, size=num_rows), + 'YearRemodAdd': np.random.randint(1900, 2022, size=num_rows), + 'MasVnrArea': np.random.randint(0, 1001, size=num_rows), + 'BsmtFinSF1': np.random.randint(0, 2001, size=num_rows), + 'BsmtFinSF2': np.random.randint(0, 1001, size=num_rows), + 'BsmtUnfSF': np.random.randint(0, 2001, size=num_rows), + 'TotalBsmtSF': np.random.randint(0, 3001, size=num_rows), + '1stFlrSF': np.random.randint(500, 4001, size=num_rows), + '2ndFlrSF': np.random.randint(0, 2001, size=num_rows), + 'LowQualFinSF': np.random.randint(0, 201, size=num_rows), + 'GrLivArea': np.random.randint(600, 5001, size=num_rows), + 'BsmtFullBath': np.random.randint(0, 4, size=num_rows), + 'BsmtHalfBath': np.random.randint(0, 3, size=num_rows), + 'FullBath': np.random.randint(0, 5, size=num_rows), + 'HalfBath': np.random.randint(0, 3, size=num_rows), + 'BedroomAbvGr': np.random.randint(0, 11, size=num_rows), + 'KitchenAbvGr': np.random.randint(0, 4, size=num_rows), + 'TotRmsAbvGrd': np.random.randint(0, 16, size=num_rows), + 'Fireplaces': np.random.randint(0, 4, size=num_rows), + 'GarageYrBlt': np.random.randint(1900, 2022, size=num_rows), + 'GarageCars': np.random.randint(0, 5, num_rows), + 'GarageArea': np.random.randint(0, 1001, num_rows), + 'WoodDeckSF': np.random.randint(0, 501, num_rows), + 'OpenPorchSF': np.random.randint(0, 301, num_rows), + 'EnclosedPorch': np.random.randint(0, 201, num_rows), + '3SsnPorch': np.random.randint(0, 101, num_rows), + 'ScreenPorch': np.random.randint(0, 201, num_rows), + 'PoolArea': np.random.randint(0, 301, num_rows), + 'MiscVal': np.random.randint(0, 5001, num_rows), + 'TotalRooms': np.random.randint(2, 11, num_rows), + "GarageAge": np.random.randint(1, 31, num_rows), + "RemodAge": np.random.randint(1, 31, num_rows), + "HouseAge": np.random.randint(1, 31, num_rows), + "TotalBath": np.random.randint(1, 5, num_rows), + "TotalPorchSF": np.random.randint(1, 1001, num_rows), + "TotalSF": np.random.randint(1000, 6001, num_rows), + "TotalArea": np.random.randint(1000, 6001, num_rows), + 'MoSold': np.random.randint(1, 13, num_rows), + 'YrSold': np.random.randint(2006, 2022, num_rows), + 'SalePrice': np.random.randint(50000, 800001, num_rows), + }) + +def prepare_data(): + print('Start preparing data....', flush=True) + st = time.time() + for multiplier in 4000, 5000, 6000: + df = generate_random_dataframe(10000*(multiplier)) + df.to_pickle('/data/df_{}.pkl'.format(multiplier)) + del df + ed = time.time() + print('##################################', flush=True) + print('dataframe to_pickle time: ', ed - st, flush=True) + + +if __name__ == '__main__': + st = time.time() + print('Preparing data....', flush=True) + prepare_data() + ed = time.time() + print('##################################') + print('preparing data time: ', ed - st, flush=True) + time.sleep(10000000) diff --git a/k8s/examples/vineyard-kubeflow/preprocess/preprocess.py b/k8s/examples/vineyard-kubeflow/preprocess/preprocess.py new file mode 100644 index 00000000..93ff2f4b --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/preprocess/preprocess.py @@ -0,0 +1,89 @@ +import argparse +import os +import time + +#from sklearn.compose import ColumnTransformer +from sklearn.model_selection import train_test_split +#from sklearn.preprocessing import OneHotEncoder + +import pandas as pd +import vineyard + + +def preprocess_data(data_multiplier, with_vineyard): + os.system('sync; echo 3 > /proc/sys/vm/drop_caches') + st = time.time() + df = pd.read_pickle('/data/df_{0}.pkl'.format(data_multiplier)) + + ed = time.time() + print('##################################') + print('read dataframe pickle time: ', ed - st) + + df = df.drop(df[(df['GrLivArea']>4800)].index) + + """ The following part will need large memory usage, disable for benchmark + del df + + # Define the categorical feature columns + categorical_features = df_preocessed.select_dtypes(include='object').columns + + # Create the column transformer for one-hot encoding + preprocessor = ColumnTransformer( + transformers=[('encoder', OneHotEncoder(sparse=False, handle_unknown='ignore'), categorical_features)], + remainder='passthrough' + ) + + # Preprocess the features using the column transformer + one_hot_df = preprocessor.fit_transform(df_preocessed) + + # Get the column names for the encoded features + encoded_feature_names = preprocessor.named_transformers_['encoder'].get_feature_names_out(categorical_features) + + columns = list(encoded_feature_names) + list(df_preocessed.select_dtypes(exclude='object').columns) + + del df_preocessed + + # Concatenate the encoded features with the original numerical features + df = pd.DataFrame(one_hot_df, columns=columns) + + del one_hot_df + """ + + X = df.drop('SalePrice', axis=1) # Features + y = df['SalePrice'] # Target variable + + del df + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) + + del X, y + + st = time.time() + if with_vineyard: + client = vineyard.connect() + client.put(X_train, name="/data/x_train.pkl", persist=True) + client.put(X_test, name="/data/x_test.pkl", persist=True) + client.put(y_train, name="/data/y_train.pkl", persist=True) + client.put(y_test, name="/data/y_test.pkl", persist=True) + else: + X_train.to_pickle('/data/x_train.pkl') + X_test.to_pickle('/data/x_test.pkl') + y_train.to_pickle('/data/y_train.pkl') + y_test.to_pickle('/data/y_test.pkl') + + ed = time.time() + print('##################################') + print('write training and testing data time: ', ed - st) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--data_multiplier', type=int, default=1, help='Multiplier for data') + parser.add_argument('--with_vineyard', type=bool, default=False, help='Whether to use vineyard') + args = parser.parse_args() + st = time.time() + print('Preprocessing data...') + preprocess_data(args.data_multiplier, args.with_vineyard) + ed = time.time() + print('##################################') + print('Preprocessing data time: ', ed - st) diff --git a/k8s/examples/vineyard-kubeflow/rbac.yaml b/k8s/examples/vineyard-kubeflow/rbac.yaml new file mode 100644 index 00000000..cbd71ef5 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/rbac.yaml @@ -0,0 +1,31 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: pipeline-runner + namespace: kubeflow +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: pipeline-runner-role +rules: + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "create", "update", "list", "delete"] + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "patch", "create", "update", "list", "delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: pipeline-runner-rolebinding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: pipeline-runner-role +subjects: + - kind: ServiceAccount + name: pipeline-runner + namespace: kubeflow \ No newline at end of file diff --git a/k8s/examples/vineyard-kubeflow/readme.md b/k8s/examples/vineyard-kubeflow/readme.md new file mode 100644 index 00000000..a0bc5a58 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/readme.md @@ -0,0 +1,60 @@ +## Use vineyard to accelerate kubeflow pipelines + +Vineyard can accelerate data sharing by utilizing shared memory compared to existing methods such as local files or S3 services. In this doc, we will show you how to use vineyard to accelerate an existing kubeflow pipeline. + + +### Prerequisites + +- Install the argo CLI tool via the [official guide](https://github.com/argoproj/argo-workflows/releases/). + + +### Overview of the pipeline + +The pipeline we use is a simple pipeline that trains a linear regression model on the dummy Boston Housing Dataset. It contains three steps: preprocess, train, and test. + + +### Run the pipeline + +Assume we have installed [kubeflow](https://www.kubeflow.org/docs/components/pipelines/v1/installation/standalone-deployment/#deploying-kubeflow-pipelines) and [vineyard](https://v6d.io/notes/cloud-native/deploy-kubernetes.html#quick-start) in the kubernetes cluster. We can use the following steps to run the pipeline: + +First, we need to prepare the dataset by running the following command: + +```bash +$ kubectl apply -f prepare_dataset.yaml +``` + +The dataset will be stored in the host path. Also, you may need to wait for a while for the dataset to be generated and you can use the following command to check the status: + +```bash +$ kubectl logs -l app=prepare-data -n kubeflow | grep "preparing data time" >/dev/null && echo "dataset ready" || echo "dataset unready" +``` + +After that, you can run the pipeline via the following command: + +```bash +$ argo submit --watch pipeline-with-vineyard.yaml -p data_multiplier=4000 -p registry="ghcr.io/v6d-io/v6d/kubeflow-example" -n kubeflow +``` + + +### Modifications to use vineyard + +Compared to the original kubeflow pipeline, we could use the following command to check the differences: + +```bash +$ git diff --no-index --unified=40 pipeline.py pipeline-with-vineyard.py +``` + +The main modifications are: +- Add a new volume to the pipeline. This volume is used to connect to the vineyard cluster via the IPC socket file in +the host path. +- Add the scheduler annotations and labels to the pipeline. This is used to schedule the pipeline to the node that has vineyardd running. + +Also, you can check the modifications of the source code as +follows. + +- [Save data in the preparation step](https://github.com/v6d-io/v6d/blob/main/k8s/examples/vineyard-kubeflow/preprocess/preprocess.py#L62-L72). +- [Load data in the training step](https://github.com/v6d-io/v6d/blob/main/k8s/examples/vineyard-kubeflow/train/train.py#L15-L24). +- [load data in the testing step](https://github.com/v6d-io/v6d/blob/main/k8s/examples/vineyard-kubeflow/test/test.py#L14-L20). + +The main modification is to use vineyard to load and save data +rather than using local files. diff --git a/k8s/examples/vineyard-kubeflow/test/test.py b/k8s/examples/vineyard-kubeflow/test/test.py new file mode 100644 index 00000000..eacef615 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/test/test.py @@ -0,0 +1,46 @@ +import argparse +import os +import time + +from sklearn.metrics import mean_squared_error + +import joblib +import pandas as pd +import vineyard + +def test_model(with_vineyard): + os.system('sync; echo 3 > /proc/sys/vm/drop_caches') + st = time.time() + if with_vineyard: + client = vineyard.connect() + x_test_data = client.get(name="/data/x_test.pkl", fetch=True) + y_test_data = client.get(name="/data/y_test.pkl", fetch=True) + else: + x_test_data = pd.read_pickle("/data/x_test.pkl") + y_test_data = pd.read_pickle("/data/y_test.pkl") + #delete the x_test.pkl and y_test.pkl + os.remove("/data/x_test.pkl") + os.remove("/data/y_test.pkl") + ed = time.time() + print('##################################') + print('read x_test and y_test execution time: ', ed - st) + + model = joblib.load("/data/model.pkl") + y_pred = model.predict(x_test_data) + + err = mean_squared_error(y_test_data, y_pred) + + with open('/data/output.txt', 'a') as f: + f.write(str(err)) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--with_vineyard', type=bool, default=False, help='Whether to use vineyard') + args = parser.parse_args() + st = time.time() + print('Testing model...') + test_model(args.with_vineyard) + ed = time.time() + print('##################################') + print('Testing model data time: ', ed - st) diff --git a/k8s/examples/vineyard-kubeflow/train/train.py b/k8s/examples/vineyard-kubeflow/train/train.py new file mode 100644 index 00000000..5a534ab0 --- /dev/null +++ b/k8s/examples/vineyard-kubeflow/train/train.py @@ -0,0 +1,44 @@ +import argparse +import os +import time + +from sklearn.linear_model import LinearRegression + +import joblib +import pandas as pd +import vineyard + + +def train_model(with_vineyard): + os.system('sync; echo 3 > /proc/sys/vm/drop_caches') + st = time.time() + if with_vineyard: + client = vineyard.connect() + x_train_data = client.get(name="/data/x_train.pkl", fetch=True) + y_train_data = client.get(name="/data/y_train.pkl", fetch=True) + else: + x_train_data = pd.read_pickle("/data/x_train.pkl") + y_train_data = pd.read_pickle("/data/y_train.pkl") + # delete the x_train.pkl and y_train.pkl + os.remove("/data/x_train.pkl") + os.remove("/data/y_train.pkl") + ed = time.time() + print('##################################') + print('read x_train and y_train data time: ', ed - st) + + model = LinearRegression() + model.fit(x_train_data, y_train_data) + + joblib.dump(model, '/data/model.pkl') + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--with_vineyard', type=bool, default=False, help='Whether to use vineyard') + args = parser.parse_args() + st = time.time() + print('Training model...') + train_model(args.with_vineyard) + ed = time.time() + print('##################################') + print('Training model data time: ', ed - st)