From 6bcbd2585198d85de90cbd7ffce89cc8bd7358cb Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Mon, 16 Jan 2023 16:40:59 +0000 Subject: [PATCH] [SDK] Use Katib SDK for E2E Tests (#2075) * [SDK] Use Katib SDK for E2E tests * Fix pvc deletion * Add list_suggestions API * Remove wait from edit Experiment function * Add shell to GitHub action * Add protobuf package to Katib SDK * Add Experiment Timeout to 40 min * Modify SDK Examples * Fix example text * Change to custom_api * Enable verbose logging for Katib E2E * Use expected condition arg * Add timeout and delete options * Modify logging to debug * Use read API to check resource status --- .../workflows/template-e2e-test/action.yaml | 10 +- .../template-setup-e2e-test/action.yaml | 18 +- .../sdk/cmaes-and-resume-policies.ipynb | 753 ++++++------ examples/v1beta1/sdk/nas-with-darts.ipynb | 1 + .../v1beta1/sdk/tune-train-from-func.ipynb | 61 +- .../kubeflow/katib/api/katib_client.py | 1021 ++++++++++++----- .../kubeflow/katib/constants/constants.py | 20 +- .../v1beta1/kubeflow/katib/utils/utils.py | 33 + sdk/python/v1beta1/setup.py | 1 + .../hack/gh-actions/run-e2e-experiment.go | 370 ------ .../scripts/gh-actions/run-e2e-experiment.py | 279 +++++ .../scripts/gh-actions/run-e2e-experiment.sh | 2 +- .../v1beta1/scripts/gh-actions/setup-katib.sh | 6 - 13 files changed, 1406 insertions(+), 1169 deletions(-) delete mode 100644 test/e2e/v1beta1/hack/gh-actions/run-e2e-experiment.go create mode 100644 test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py diff --git a/.github/workflows/template-e2e-test/action.yaml b/.github/workflows/template-e2e-test/action.yaml index ce483e7a795..ef1ca26064d 100644 --- a/.github/workflows/template-e2e-test/action.yaml +++ b/.github/workflows/template-e2e-test/action.yaml @@ -9,14 +9,14 @@ inputs: training-operator: required: false description: whether to deploy training-operator or not - default: "false" + default: false trial-images: required: true description: comma 
delimited trial image name katib-ui: required: true - description: whether to deploy katib-ui or not - default: "false" + description: whether to deploy katib-ui or not + default: false database-type: required: false description: mysql or postgres @@ -25,11 +25,11 @@ inputs: runs: using: composite steps: - - name: Set Up Minikube Cluster + - name: Setup Minikube Cluster shell: bash run: ./test/e2e/v1beta1/scripts/gh-actions/setup-minikube.sh ${{ inputs.katib-ui }} ${{ inputs.trial-images }} ${{ inputs.experiments }} - - name: Set Up Katib + - name: Setup Katib shell: bash run: ./test/e2e/v1beta1/scripts/gh-actions/setup-katib.sh ${{ inputs.katib-ui }} ${{ inputs.training-operator }} ${{ inputs.database-type }} diff --git a/.github/workflows/template-setup-e2e-test/action.yaml b/.github/workflows/template-setup-e2e-test/action.yaml index 34e5b87dc17..f9cbd08b106 100644 --- a/.github/workflows/template-setup-e2e-test/action.yaml +++ b/.github/workflows/template-setup-e2e-test/action.yaml @@ -10,19 +10,23 @@ inputs: runs: using: composite steps: - - name: Set Up Minikube Cluster + - name: Setup Minikube Cluster uses: manusa/actions-setup-minikube@v2.7.2 with: - minikube version: 'v1.28.0' + minikube version: v1.28.0 kubernetes version: ${{ inputs.kubernetes-version }} start args: --wait-timeout=60s - driver: 'none' + driver: none github token: ${{ env.GITHUB_TOKEN }} - - name: Set Up Docker Buildx + - name: Setup Docker Buildx uses: docker/setup-buildx-action@v2 - - name: Set Up Go env - uses: actions/setup-go@v3 + - name: Setup Python + uses: actions/setup-python@v4 with: - go-version-file: go.mod + python-version: 3.9 + + - name: Install Katib SDK + shell: bash + run: pip install -e sdk/python/v1beta1 diff --git a/examples/v1beta1/sdk/cmaes-and-resume-policies.ipynb b/examples/v1beta1/sdk/cmaes-and-resume-policies.ipynb index 0864dc1559a..c901086c1af 100644 --- a/examples/v1beta1/sdk/cmaes-and-resume-policies.ipynb +++ 
b/examples/v1beta1/sdk/cmaes-and-resume-policies.ipynb @@ -15,40 +15,23 @@ "The notebook shows how to create, get, check status and delete an Experiment." ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Install Katib SDK\n", + "\n", + "You need to install Katib SDK to run this Notebook." + ] + }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Requirement already satisfied: kubeflow-katib==0.13.0 in /opt/conda/lib/python3.8/site-packages (0.13.0)\n", - "Requirement already satisfied: kubernetes>=12.0.0 in /opt/conda/lib/python3.8/site-packages (from kubeflow-katib==0.13.0) (12.0.1)\n", - "Requirement already satisfied: setuptools>=21.0.0 in /opt/conda/lib/python3.8/site-packages (from kubeflow-katib==0.13.0) (49.6.0.post20210108)\n", - "Requirement already satisfied: certifi>=14.05.14 in /opt/conda/lib/python3.8/site-packages (from kubeflow-katib==0.13.0) (2021.5.30)\n", - "Requirement already satisfied: six>=1.10 in /opt/conda/lib/python3.8/site-packages (from kubeflow-katib==0.13.0) (1.16.0)\n", - "Requirement already satisfied: urllib3>=1.15.1 in /opt/conda/lib/python3.8/site-packages (from kubeflow-katib==0.13.0) (1.26.5)\n", - "Requirement already satisfied: google-auth>=1.0.1 in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-katib==0.13.0) (1.35.0)\n", - "Requirement already satisfied: pyyaml>=3.12 in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-katib==0.13.0) (5.4.1)\n", - "Requirement already satisfied: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-katib==0.13.0) (1.0.1)\n", - "Requirement already satisfied: python-dateutil>=2.5.3 in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-katib==0.13.0) (2.8.1)\n", - "Requirement already satisfied: 
requests-oauthlib in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-katib==0.13.0) (1.3.1)\n", - "Requirement already satisfied: requests in /opt/conda/lib/python3.8/site-packages (from kubernetes>=12.0.0->kubeflow-katib==0.13.0) (2.25.1)\n", - "Requirement already satisfied: cachetools<5.0,>=2.0.0 in /opt/conda/lib/python3.8/site-packages (from google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-katib==0.13.0) (4.2.4)\n", - "Requirement already satisfied: rsa<5,>=3.1.4 in /opt/conda/lib/python3.8/site-packages (from google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-katib==0.13.0) (4.8)\n", - "Requirement already satisfied: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.8/site-packages (from google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-katib==0.13.0) (0.2.8)\n", - "Requirement already satisfied: pyasn1<0.5.0,>=0.4.6 in /opt/conda/lib/python3.8/site-packages (from pyasn1-modules>=0.2.1->google-auth>=1.0.1->kubernetes>=12.0.0->kubeflow-katib==0.13.0) (0.4.8)\n", - "Requirement already satisfied: chardet<5,>=3.0.2 in /opt/conda/lib/python3.8/site-packages (from requests->kubernetes>=12.0.0->kubeflow-katib==0.13.0) (4.0.0)\n", - "Requirement already satisfied: idna<3,>=2.5 in /opt/conda/lib/python3.8/site-packages (from requests->kubernetes>=12.0.0->kubeflow-katib==0.13.0) (2.10)\n", - "Requirement already satisfied: oauthlib>=3.0.0 in /opt/conda/lib/python3.8/site-packages (from requests-oauthlib->kubernetes>=12.0.0->kubeflow-katib==0.13.0) (3.2.0)\n" - ] - } - ], + "outputs": [], "source": [ - "# Install required package (Katib SDK).\n", - "!pip install kubeflow-katib==0.13.0" + "# TODO (andreyvelich): Change to release version when SDK with the new APIs is published.\n", + "!pip install git+https://github.com/kubeflow/katib.git#subdirectory=sdk/python/v1beta1" ] }, { @@ -85,12 +68,12 @@ "source": [ "## Define your Experiment\n", "\n", - "You have to create your Experiment object before deploying it. 
This Experiment is similar to [this](https://github.com/kubeflow/katib/blob/master/examples/v1beta1/hp-tuning/cmaes.yaml) example." + "You have to create your Experiment object before deploying it. This Experiment is similar to [this](https://github.com/kubeflow/katib/blob/master/examples/v1beta1/hp-tuning/cma-es.yaml) example." ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 46, "metadata": {}, "outputs": [], "source": [ @@ -160,11 +143,12 @@ " \"containers\": [\n", " {\n", " \"name\": \"training-container\",\n", - " \"image\": \"docker.io/kubeflowkatib/mxnet-mnist:v0.13.0\",\n", + " \"image\": \"docker.io/kubeflowkatib/mxnet-mnist:v0.14.0\",\n", " \"command\": [\n", " \"python3\",\n", " \"/opt/mxnet-mnist/mnist.py\",\n", " \"--batch-size=64\",\n", + " \"--num-epochs=1\",\n", " \"--lr=${trialParameters.learningRate}\",\n", " \"--num-layers=${trialParameters.numberLayers}\",\n", " \"--optimizer=${trialParameters.optimizer}\"\n", @@ -207,9 +191,9 @@ " kind=\"Experiment\",\n", " metadata=metadata,\n", " spec=V1beta1ExperimentSpec(\n", - " max_trial_count=7,\n", - " parallel_trial_count=3,\n", - " max_failed_trial_count=3,\n", + " max_trial_count=3,\n", + " parallel_trial_count=2,\n", + " max_failed_trial_count=1,\n", " algorithm=algorithm_spec,\n", " objective=objective_spec,\n", " parameters=parameters,\n", @@ -233,7 +217,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 47, "metadata": {}, "outputs": [], "source": [ @@ -241,13 +225,13 @@ "experiment_from_volume_resume_name = \"from-volume-resume-cmaes\"\n", "\n", "# Create new Experiments from the previous Experiment info.\n", - "# Define Experiment with never resume.\n", + "# Define Experiment with Never resume.\n", "experiment_never_resume = copy.deepcopy(experiment)\n", "experiment_never_resume.metadata.name = experiment_never_resume_name\n", "experiment_never_resume.spec.resume_policy = \"Never\"\n", "experiment_never_resume.spec.max_trial_count = 4\n", "\n", - 
"# Define Experiment with from volume resume.\n", + "# Define Experiment with FromVolume resume.\n", "experiment_from_volume_resume = copy.deepcopy(experiment)\n", "experiment_from_volume_resume.metadata.name = experiment_from_volume_resume_name\n", "experiment_from_volume_resume.spec.resume_policy = \"FromVolume\"\n", @@ -263,7 +247,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 48, "metadata": { "scrolled": true }, @@ -305,9 +289,16 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 49, "metadata": {}, "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Experiment kubeflow-user-example-com/cmaes-example has been created\n" + ] + }, { "data": { "text/html": [ @@ -319,99 +310,10 @@ }, "metadata": {}, "output_type": "display_data" - }, - { - "data": { - "text/plain": [ - "{'apiVersion': 'kubeflow.org/v1beta1',\n", - " 'kind': 'Experiment',\n", - " 'metadata': {'creationTimestamp': '2021-10-05T23:40:19Z',\n", - " 'generation': 1,\n", - " 'managedFields': [{'apiVersion': 'kubeflow.org/v1beta1',\n", - " 'fieldsType': 'FieldsV1',\n", - " 'fieldsV1': {'f:spec': {'.': {},\n", - " 'f:algorithm': {'.': {}, 'f:algorithmName': {}},\n", - " 'f:maxFailedTrialCount': {},\n", - " 'f:maxTrialCount': {},\n", - " 'f:objective': {'.': {},\n", - " 'f:additionalMetricNames': {},\n", - " 'f:goal': {},\n", - " 'f:objectiveMetricName': {},\n", - " 'f:type': {}},\n", - " 'f:parallelTrialCount': {},\n", - " 'f:parameters': {},\n", - " 'f:trialTemplate': {'.': {},\n", - " 'f:primaryContainerName': {},\n", - " 'f:trialParameters': {},\n", - " 'f:trialSpec': {'.': {},\n", - " 'f:apiVersion': {},\n", - " 'f:kind': {},\n", - " 'f:spec': {'.': {},\n", - " 'f:template': {'.': {},\n", - " 'f:metadata': {'.': {},\n", - " 'f:annotations': {'.': {}, 'f:sidecar.istio.io/inject': {}}},\n", - " 'f:spec': {'.': {}, 'f:containers': {}, 'f:restartPolicy': {}}}}}}}},\n", - " 'manager': 'OpenAPI-Generator',\n", - " 'operation': 
'Update',\n", - " 'time': '2021-10-05T23:40:19Z'}],\n", - " 'name': 'cmaes-example',\n", - " 'namespace': 'kubeflow-user-example-com',\n", - " 'resourceVersion': '393930965',\n", - " 'uid': 'ccf6bb73-6768-4e6e-9a23-742d0953ecf1'},\n", - " 'spec': {'algorithm': {'algorithmName': 'cmaes'},\n", - " 'maxFailedTrialCount': 3,\n", - " 'maxTrialCount': 7,\n", - " 'metricsCollectorSpec': {'collector': {'kind': 'StdOut'}},\n", - " 'objective': {'additionalMetricNames': ['Train-accuracy'],\n", - " 'goal': 0.99,\n", - " 'metricStrategies': [{'name': 'Validation-accuracy', 'value': 'max'},\n", - " {'name': 'Train-accuracy', 'value': 'max'}],\n", - " 'objectiveMetricName': 'Validation-accuracy',\n", - " 'type': 'maximize'},\n", - " 'parallelTrialCount': 3,\n", - " 'parameters': [{'feasibleSpace': {'max': '0.06', 'min': '0.01'},\n", - " 'name': 'lr',\n", - " 'parameterType': 'double'},\n", - " {'feasibleSpace': {'max': '5', 'min': '2'},\n", - " 'name': 'num-layers',\n", - " 'parameterType': 'int'},\n", - " {'feasibleSpace': {'list': ['sgd', 'adam', 'ftrl']},\n", - " 'name': 'optimizer',\n", - " 'parameterType': 'categorical'}],\n", - " 'resumePolicy': 'LongRunning',\n", - " 'trialTemplate': {'failureCondition': 'status.conditions.#(type==\"Failed\")#|#(status==\"True\")#',\n", - " 'primaryContainerName': 'training-container',\n", - " 'successCondition': 'status.conditions.#(type==\"Complete\")#|#(status==\"True\")#',\n", - " 'trialParameters': [{'description': 'Learning rate for the training model',\n", - " 'name': 'learningRate',\n", - " 'reference': 'lr'},\n", - " {'description': 'Number of training model layers',\n", - " 'name': 'numberLayers',\n", - " 'reference': 'num-layers'},\n", - " {'description': 'Training model optimizer (sdg, adam or ftrl)',\n", - " 'name': 'optimizer',\n", - " 'reference': 'optimizer'}],\n", - " 'trialSpec': {'apiVersion': 'batch/v1',\n", - " 'kind': 'Job',\n", - " 'spec': {'template': {'metadata': {'annotations': {'sidecar.istio.io/inject': 
'false'}},\n", - " 'spec': {'containers': [{'command': ['python3',\n", - " '/opt/mxnet-mnist/mnist.py',\n", - " '--batch-size=64',\n", - " '--lr=${trialParameters.learningRate}',\n", - " '--num-layers=${trialParameters.numberLayers}',\n", - " '--optimizer=${trialParameters.optimizer}'],\n", - " 'image': 'docker.io/kubeflowkatib/mxnet-mnist:v0.13.0',\n", - " 'name': 'training-container'}],\n", - " 'restartPolicy': 'Never'}}}}}}}" - ] - }, - "execution_count": 6, - "metadata": {}, - "output_type": "execute_result" } ], "source": [ - "# Create client.\n", + "# Create Katib client.\n", "kclient = KatibClient()\n", "\n", "# Create your Experiment.\n", @@ -422,14 +324,21 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Create other Experiments." + "### Create other Experiments" ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 50, "metadata": {}, "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Experiment kubeflow-user-example-com/never-resume-cmaes has been created\n" + ] + }, { "data": { "text/html": [ @@ -442,6 +351,13 @@ "metadata": {}, "output_type": "display_data" }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Experiment kubeflow-user-example-com/from-volume-resume-cmaes has been created\n" + ] + }, { "data": { "text/html": [ @@ -453,96 +369,6 @@ }, "metadata": {}, "output_type": "display_data" - }, - { - "data": { - "text/plain": [ - "{'apiVersion': 'kubeflow.org/v1beta1',\n", - " 'kind': 'Experiment',\n", - " 'metadata': {'creationTimestamp': '2021-10-05T23:40:34Z',\n", - " 'generation': 1,\n", - " 'managedFields': [{'apiVersion': 'kubeflow.org/v1beta1',\n", - " 'fieldsType': 'FieldsV1',\n", - " 'fieldsV1': {'f:spec': {'.': {},\n", - " 'f:algorithm': {'.': {}, 'f:algorithmName': {}},\n", - " 'f:maxFailedTrialCount': {},\n", - " 'f:maxTrialCount': {},\n", - " 'f:objective': {'.': {},\n", - " 'f:additionalMetricNames': {},\n", - " 'f:goal': {},\n", - " 'f:objectiveMetricName': 
{},\n", - " 'f:type': {}},\n", - " 'f:parallelTrialCount': {},\n", - " 'f:parameters': {},\n", - " 'f:resumePolicy': {},\n", - " 'f:trialTemplate': {'.': {},\n", - " 'f:primaryContainerName': {},\n", - " 'f:trialParameters': {},\n", - " 'f:trialSpec': {'.': {},\n", - " 'f:apiVersion': {},\n", - " 'f:kind': {},\n", - " 'f:spec': {'.': {},\n", - " 'f:template': {'.': {},\n", - " 'f:metadata': {'.': {},\n", - " 'f:annotations': {'.': {}, 'f:sidecar.istio.io/inject': {}}},\n", - " 'f:spec': {'.': {}, 'f:containers': {}, 'f:restartPolicy': {}}}}}}}},\n", - " 'manager': 'OpenAPI-Generator',\n", - " 'operation': 'Update',\n", - " 'time': '2021-10-05T23:40:34Z'}],\n", - " 'name': 'from-volume-resume-cmaes',\n", - " 'namespace': 'kubeflow-user-example-com',\n", - " 'resourceVersion': '393931453',\n", - " 'uid': '028034f1-3b42-4a1e-9f2c-cf24e845e4e0'},\n", - " 'spec': {'algorithm': {'algorithmName': 'cmaes'},\n", - " 'maxFailedTrialCount': 3,\n", - " 'maxTrialCount': 4,\n", - " 'metricsCollectorSpec': {'collector': {'kind': 'StdOut'}},\n", - " 'objective': {'additionalMetricNames': ['Train-accuracy'],\n", - " 'goal': 0.99,\n", - " 'metricStrategies': [{'name': 'Validation-accuracy', 'value': 'max'},\n", - " {'name': 'Train-accuracy', 'value': 'max'}],\n", - " 'objectiveMetricName': 'Validation-accuracy',\n", - " 'type': 'maximize'},\n", - " 'parallelTrialCount': 3,\n", - " 'parameters': [{'feasibleSpace': {'max': '0.06', 'min': '0.01'},\n", - " 'name': 'lr',\n", - " 'parameterType': 'double'},\n", - " {'feasibleSpace': {'max': '5', 'min': '2'},\n", - " 'name': 'num-layers',\n", - " 'parameterType': 'int'},\n", - " {'feasibleSpace': {'list': ['sgd', 'adam', 'ftrl']},\n", - " 'name': 'optimizer',\n", - " 'parameterType': 'categorical'}],\n", - " 'resumePolicy': 'FromVolume',\n", - " 'trialTemplate': {'failureCondition': 'status.conditions.#(type==\"Failed\")#|#(status==\"True\")#',\n", - " 'primaryContainerName': 'training-container',\n", - " 'successCondition': 
'status.conditions.#(type==\"Complete\")#|#(status==\"True\")#',\n", - " 'trialParameters': [{'description': 'Learning rate for the training model',\n", - " 'name': 'learningRate',\n", - " 'reference': 'lr'},\n", - " {'description': 'Number of training model layers',\n", - " 'name': 'numberLayers',\n", - " 'reference': 'num-layers'},\n", - " {'description': 'Training model optimizer (sdg, adam or ftrl)',\n", - " 'name': 'optimizer',\n", - " 'reference': 'optimizer'}],\n", - " 'trialSpec': {'apiVersion': 'batch/v1',\n", - " 'kind': 'Job',\n", - " 'spec': {'template': {'metadata': {'annotations': {'sidecar.istio.io/inject': 'false'}},\n", - " 'spec': {'containers': [{'command': ['python3',\n", - " '/opt/mxnet-mnist/mnist.py',\n", - " '--batch-size=64',\n", - " '--lr=${trialParameters.learningRate}',\n", - " '--num-layers=${trialParameters.numberLayers}',\n", - " '--optimizer=${trialParameters.optimizer}'],\n", - " 'image': 'docker.io/kubeflowkatib/mxnet-mnist:v0.13.0',\n", - " 'name': 'training-container'}],\n", - " 'restartPolicy': 'Never'}}}}}}}" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" } ], "source": [ @@ -563,7 +389,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 51, "metadata": { "scrolled": true }, @@ -572,11 +398,194 @@ "name": "stdout", "output_type": "stream", "text": [ - "{'apiVersion': 'kubeflow.org/v1beta1', 'kind': 'Experiment', 'metadata': {'creationTimestamp': '2021-10-05T23:40:19Z', 'finalizers': ['update-prometheus-metrics'], 'generation': 1, 'managedFields': [{'apiVersion': 'kubeflow.org/v1beta1', 'fieldsType': 'FieldsV1', 'fieldsV1': {'f:spec': {'.': {}, 'f:algorithm': {'.': {}, 'f:algorithmName': {}}, 'f:maxFailedTrialCount': {}, 'f:maxTrialCount': {}, 'f:objective': {'.': {}, 'f:additionalMetricNames': {}, 'f:goal': {}, 'f:objectiveMetricName': {}, 'f:type': {}}, 'f:parallelTrialCount': {}, 'f:parameters': {}, 'f:trialTemplate': {'.': {}, 'f:primaryContainerName': {}, 
'f:trialParameters': {}, 'f:trialSpec': {'.': {}, 'f:apiVersion': {}, 'f:kind': {}, 'f:spec': {'.': {}, 'f:template': {'.': {}, 'f:metadata': {'.': {}, 'f:annotations': {'.': {}, 'f:sidecar.istio.io/inject': {}}}, 'f:spec': {'.': {}, 'f:containers': {}, 'f:restartPolicy': {}}}}}}}}, 'manager': 'OpenAPI-Generator', 'operation': 'Update', 'time': '2021-10-05T23:40:19Z'}, {'apiVersion': 'kubeflow.org/v1beta1', 'fieldsType': 'FieldsV1', 'fieldsV1': {'f:metadata': {'f:finalizers': {'.': {}, 'v:\"update-prometheus-metrics\"': {}}}, 'f:status': {'.': {}, 'f:conditions': {}, 'f:currentOptimalTrial': {'.': {}, 'f:bestTrialName': {}, 'f:observation': {'.': {}, 'f:metrics': {}}, 'f:parameterAssignments': {}}, 'f:runningTrialList': {}, 'f:startTime': {}, 'f:trials': {}, 'f:trialsRunning': {}}}, 'manager': 'katib-controller', 'operation': 'Update', 'time': '2021-10-05T23:40:54Z'}], 'name': 'cmaes-example', 'namespace': 'kubeflow-user-example-com', 'resourceVersion': '393932086', 'uid': 'ccf6bb73-6768-4e6e-9a23-742d0953ecf1'}, 'spec': {'algorithm': {'algorithmName': 'cmaes'}, 'maxFailedTrialCount': 3, 'maxTrialCount': 7, 'metricsCollectorSpec': {'collector': {'kind': 'StdOut'}}, 'objective': {'additionalMetricNames': ['Train-accuracy'], 'goal': 0.99, 'metricStrategies': [{'name': 'Validation-accuracy', 'value': 'max'}, {'name': 'Train-accuracy', 'value': 'max'}], 'objectiveMetricName': 'Validation-accuracy', 'type': 'maximize'}, 'parallelTrialCount': 3, 'parameters': [{'feasibleSpace': {'max': '0.06', 'min': '0.01'}, 'name': 'lr', 'parameterType': 'double'}, {'feasibleSpace': {'max': '5', 'min': '2'}, 'name': 'num-layers', 'parameterType': 'int'}, {'feasibleSpace': {'list': ['sgd', 'adam', 'ftrl']}, 'name': 'optimizer', 'parameterType': 'categorical'}], 'resumePolicy': 'LongRunning', 'trialTemplate': {'failureCondition': 'status.conditions.#(type==\"Failed\")#|#(status==\"True\")#', 'primaryContainerName': 'training-container', 'successCondition': 
'status.conditions.#(type==\"Complete\")#|#(status==\"True\")#', 'trialParameters': [{'description': 'Learning rate for the training model', 'name': 'learningRate', 'reference': 'lr'}, {'description': 'Number of training model layers', 'name': 'numberLayers', 'reference': 'num-layers'}, {'description': 'Training model optimizer (sdg, adam or ftrl)', 'name': 'optimizer', 'reference': 'optimizer'}], 'trialSpec': {'apiVersion': 'batch/v1', 'kind': 'Job', 'spec': {'template': {'metadata': {'annotations': {'sidecar.istio.io/inject': 'false'}}, 'spec': {'containers': [{'command': ['python3', '/opt/mxnet-mnist/mnist.py', '--batch-size=64', '--lr=${trialParameters.learningRate}', '--num-layers=${trialParameters.numberLayers}', '--optimizer=${trialParameters.optimizer}'], 'image': 'docker.io/kubeflowkatib/mxnet-mnist:v0.13.0', 'name': 'training-container'}], 'restartPolicy': 'Never'}}}}}}, 'status': {'conditions': [{'lastTransitionTime': '2021-10-05T23:40:19Z', 'lastUpdateTime': '2021-10-05T23:40:19Z', 'message': 'Experiment is created', 'reason': 'ExperimentCreated', 'status': 'True', 'type': 'Created'}, {'lastTransitionTime': '2021-10-05T23:40:54Z', 'lastUpdateTime': '2021-10-05T23:40:54Z', 'message': 'Experiment is running', 'reason': 'ExperimentRunning', 'status': 'True', 'type': 'Running'}], 'currentOptimalTrial': {'bestTrialName': '', 'observation': {'metrics': None}, 'parameterAssignments': None}, 'runningTrialList': ['cmaes-example-xn6txwtw', 'cmaes-example-8crn89vg', 'cmaes-example-4q8hmt9r'], 'startTime': '2021-10-05T23:40:19Z', 'trials': 3, 'trialsRunning': 3}}\n", + "{'api_version': 'kubeflow.org/v1beta1',\n", + " 'kind': 'Experiment',\n", + " 'metadata': {'annotations': None,\n", + " 'creation_timestamp': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal()),\n", + " 'deletion_grace_period_seconds': None,\n", + " 'deletion_timestamp': None,\n", + " 'finalizers': ['update-prometheus-metrics'],\n", + " 'generate_name': None,\n", + " 'generation': 1,\n", + " 
'labels': None,\n", + " 'managed_fields': [{'api_version': 'kubeflow.org/v1beta1',\n", + " 'fields_type': 'FieldsV1',\n", + " 'fields_v1': {'f:spec': {'.': {},\n", + " 'f:algorithm': {'.': {},\n", + " 'f:algorithmName': {}},\n", + " 'f:maxFailedTrialCount': {},\n", + " 'f:maxTrialCount': {},\n", + " 'f:objective': {'.': {},\n", + " 'f:additionalMetricNames': {},\n", + " 'f:goal': {},\n", + " 'f:objectiveMetricName': {},\n", + " 'f:type': {}},\n", + " 'f:parallelTrialCount': {},\n", + " 'f:parameters': {},\n", + " 'f:trialTemplate': {'.': {},\n", + " 'f:primaryContainerName': {},\n", + " 'f:trialParameters': {},\n", + " 'f:trialSpec': {'.': {},\n", + " 'f:apiVersion': {},\n", + " 'f:kind': {},\n", + " 'f:spec': {'.': {},\n", + " 'f:template': {'.': {},\n", + " 'f:metadata': {'.': {},\n", + " 'f:annotations': {'.': {},\n", + " 'f:sidecar.istio.io/inject': {}}},\n", + " 'f:spec': {'.': {},\n", + " 'f:containers': {},\n", + " 'f:restartPolicy': {}}}}}}}},\n", + " 'manager': 'OpenAPI-Generator',\n", + " 'operation': 'Update',\n", + " 'subresource': None,\n", + " 'time': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal())},\n", + " {'api_version': 'kubeflow.org/v1beta1',\n", + " 'fields_type': 'FieldsV1',\n", + " 'fields_v1': {'f:metadata': {'f:finalizers': {'.': {},\n", + " 'v:\"update-prometheus-metrics\"': {}}}},\n", + " 'manager': 'katib-controller',\n", + " 'operation': 'Update',\n", + " 'subresource': None,\n", + " 'time': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal())},\n", + " {'api_version': 'kubeflow.org/v1beta1',\n", + " 'fields_type': 'FieldsV1',\n", + " 'fields_v1': {'f:status': {'.': {},\n", + " 'f:conditions': {},\n", + " 'f:currentOptimalTrial': {'.': {},\n", + " 'f:observation': {}},\n", + " 'f:runningTrialList': {},\n", + " 'f:startTime': {},\n", + " 'f:trials': {},\n", + " 'f:trialsRunning': {}}},\n", + " 'manager': 'katib-controller',\n", + " 'operation': 'Update',\n", + " 'subresource': 'status',\n", + " 'time': 
datetime.datetime(2023, 1, 6, 14, 28, 52, tzinfo=tzlocal())}],\n", + " 'name': 'cmaes-example',\n", + " 'namespace': 'kubeflow-user-example-com',\n", + " 'owner_references': None,\n", + " 'resource_version': '26516',\n", + " 'self_link': None,\n", + " 'uid': '1d59819e-4e5f-4adc-90cc-62c2ee867f72'},\n", + " 'spec': {'algorithm': {'algorithm_name': 'cmaes', 'algorithm_settings': None},\n", + " 'early_stopping': None,\n", + " 'max_failed_trial_count': 1,\n", + " 'max_trial_count': 3,\n", + " 'metrics_collector_spec': {'collector': {'custom_collector': None,\n", + " 'kind': 'StdOut'},\n", + " 'source': None},\n", + " 'nas_config': None,\n", + " 'objective': {'additional_metric_names': ['Train-accuracy'],\n", + " 'goal': 0.99,\n", + " 'metric_strategies': [{'name': 'Validation-accuracy',\n", + " 'value': 'max'},\n", + " {'name': 'Train-accuracy',\n", + " 'value': 'max'}],\n", + " 'objective_metric_name': 'Validation-accuracy',\n", + " 'type': 'maximize'},\n", + " 'parallel_trial_count': 2,\n", + " 'parameters': [{'feasible_space': {'list': None,\n", + " 'max': '0.06',\n", + " 'min': '0.01',\n", + " 'step': None},\n", + " 'name': 'lr',\n", + " 'parameter_type': 'double'},\n", + " {'feasible_space': {'list': None,\n", + " 'max': '5',\n", + " 'min': '2',\n", + " 'step': None},\n", + " 'name': 'num-layers',\n", + " 'parameter_type': 'int'},\n", + " {'feasible_space': {'list': ['sgd', 'adam', 'ftrl'],\n", + " 'max': None,\n", + " 'min': None,\n", + " 'step': None},\n", + " 'name': 'optimizer',\n", + " 'parameter_type': 'categorical'}],\n", + " 'resume_policy': 'LongRunning',\n", + " 'trial_template': {'config_map': None,\n", + " 'failure_condition': 'status.conditions.#(type==\"Failed\")#|#(status==\"True\")#',\n", + " 'primary_container_name': 'training-container',\n", + " 'primary_pod_labels': None,\n", + " 'retain': None,\n", + " 'success_condition': 'status.conditions.#(type==\"Complete\")#|#(status==\"True\")#',\n", + " 'trial_parameters': [{'description': 'Learning 
'\n", + " 'rate for '\n", + " 'the '\n", + " 'training '\n", + " 'model',\n", + " 'name': 'learningRate',\n", + " 'reference': 'lr'},\n", + " {'description': 'Number of '\n", + " 'training '\n", + " 'model '\n", + " 'layers',\n", + " 'name': 'numberLayers',\n", + " 'reference': 'num-layers'},\n", + " {'description': 'Training '\n", + " 'model '\n", + " 'optimizer '\n", + " '(sdg, adam '\n", + " 'or ftrl)',\n", + " 'name': 'optimizer',\n", + " 'reference': 'optimizer'}],\n", + " 'trial_spec': {'apiVersion': 'batch/v1',\n", + " 'kind': 'Job',\n", + " 'spec': {'template': {'metadata': {'annotations': {'sidecar.istio.io/inject': 'false'}},\n", + " 'spec': {'containers': [{'command': ['python3',\n", + " '/opt/mxnet-mnist/mnist.py',\n", + " '--batch-size=64',\n", + " '--num-epochs=1',\n", + " '--lr=${trialParameters.learningRate}',\n", + " '--num-layers=${trialParameters.numberLayers}',\n", + " '--optimizer=${trialParameters.optimizer}'],\n", + " 'image': 'docker.io/kubeflowkatib/mxnet-mnist:v0.14.0',\n", + " 'name': 'training-container'}],\n", + " 'restartPolicy': 'Never'}}}}}},\n", + " 'status': {'completion_time': None,\n", + " 'conditions': [{'last_transition_time': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal()),\n", + " 'message': 'Experiment is created',\n", + " 'reason': 'ExperimentCreated',\n", + " 'status': 'True',\n", + " 'type': 'Created'},\n", + " {'last_transition_time': datetime.datetime(2023, 1, 6, 14, 28, 52, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 28, 52, tzinfo=tzlocal()),\n", + " 'message': 'Experiment is running',\n", + " 'reason': 'ExperimentRunning',\n", + " 'status': 'True',\n", + " 'type': 'Running'}],\n", + " 'current_optimal_trial': {'best_trial_name': None,\n", + " 'observation': {'metrics': None},\n", + " 'parameter_assignments': None},\n", + " 'early_stopped_trial_list': None,\n", + " 
'failed_trial_list': None,\n", + " 'killed_trial_list': None,\n", + " 'last_reconcile_time': None,\n", + " 'metrics_unavailable_trial_list': None,\n", + " 'pending_trial_list': None,\n", + " 'running_trial_list': ['cmaes-example-f64n8vb5',\n", + " 'cmaes-example-l6zkx5jx'],\n", + " 'start_time': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal()),\n", + " 'succeeded_trial_list': None,\n", + " 'trial_metrics_unavailable': None,\n", + " 'trials': 2,\n", + " 'trials_early_stopped': None,\n", + " 'trials_failed': None,\n", + " 'trials_killed': None,\n", + " 'trials_pending': None,\n", + " 'trials_running': 2,\n", + " 'trials_succeeded': None}}\n", "-----------------\n", "\n", - "7\n", - "{'lastTransitionTime': '2021-10-05T23:40:54Z', 'lastUpdateTime': '2021-10-05T23:40:54Z', 'message': 'Experiment is running', 'reason': 'ExperimentRunning', 'status': 'True', 'type': 'Running'}\n" + "3\n", + "{'last_transition_time': datetime.datetime(2023, 1, 6, 14, 28, 52, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 28, 52, tzinfo=tzlocal()),\n", + " 'message': 'Experiment is running',\n", + " 'reason': 'ExperimentRunning',\n", + " 'status': 'True',\n", + " 'type': 'Running'}\n" ] } ], @@ -586,8 +595,8 @@ "print(\"-----------------\\n\")\n", "\n", "# Get the max trial count and latest status.\n", - "print(exp[\"spec\"][\"maxTrialCount\"])\n", - "print(exp[\"status\"][\"conditions\"][-1])" + "print(exp.spec.max_trial_count)\n", + "print(exp.status.conditions[-1])" ] }, { @@ -601,7 +610,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 52, "metadata": {}, "outputs": [ { @@ -616,51 +625,55 @@ ], "source": [ "# Get names from the running Experiments.\n", - "exp_list = kclient.get_experiment(namespace=namespace)\n", + "exp_list = kclient.list_experiments(namespace=namespace)\n", "\n", - "for exp in exp_list[\"items\"]:\n", - " print(exp[\"metadata\"][\"name\"])" + "for exp in exp_list:\n", + " print(exp.metadata.name)" 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Get the current Experiment status\n", + "## Get the current Experiment conditions\n", "\n", - "You can check the current Experiment status." + "You can check the current Experiment conditions and check if Experiment is Succeeded." ] }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 53, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "'Running'" + "[{'last_transition_time': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 28, 28, tzinfo=tzlocal()),\n", + " 'message': 'Experiment is created',\n", + " 'reason': 'ExperimentCreated',\n", + " 'status': 'True',\n", + " 'type': 'Created'},\n", + " {'last_transition_time': datetime.datetime(2023, 1, 6, 14, 28, 52, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 28, 52, tzinfo=tzlocal()),\n", + " 'message': 'Experiment is running',\n", + " 'reason': 'ExperimentRunning',\n", + " 'status': 'True',\n", + " 'type': 'Running'}]" ] }, - "execution_count": 10, + "execution_count": 53, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "kclient.get_experiment_status(name=experiment_name, namespace=namespace)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "You can check if your Experiment is succeeded." + "kclient.get_experiment_conditions(name=experiment_name, namespace=namespace)\n" ] }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 54, "metadata": { "scrolled": true }, @@ -671,7 +684,7 @@ "False" ] }, - "execution_count": 11, + "execution_count": 54, "metadata": {}, "output_type": "execute_result" } @@ -686,35 +699,53 @@ "source": [ "## List of the current Trials\n", "\n", - "You can get list of the current trials with the latest status." + "You can get list of the current Trials with the latest status." 
] }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 55, "metadata": { "scrolled": true }, "outputs": [ { - "data": { - "text/plain": [ - "[{'name': 'cmaes-example-4q8hmt9r', 'status': 'Succeeded'},\n", - " {'name': 'cmaes-example-8crn89vg', 'status': 'Succeeded'},\n", - " {'name': 'cmaes-example-8m84klwm', 'status': 'Running'},\n", - " {'name': 'cmaes-example-gd9k79lg', 'status': 'Running'},\n", - " {'name': 'cmaes-example-jsh9pljq', 'status': 'Running'},\n", - " {'name': 'cmaes-example-xn6txwtw', 'status': 'Succeeded'}]" - ] - }, - "execution_count": 12, - "metadata": {}, - "output_type": "execute_result" + "name": "stdout", + "output_type": "stream", + "text": [ + "Trial Name: cmaes-example-dd4x6tsh\n", + "Trial Status: {'last_transition_time': datetime.datetime(2023, 1, 6, 14, 30, 43, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 30, 43, tzinfo=tzlocal()),\n", + " 'message': 'Trial is running',\n", + " 'reason': 'TrialRunning',\n", + " 'status': 'True',\n", + " 'type': 'Running'}\n", + "\n", + "Trial Name: cmaes-example-f64n8vb5\n", + "Trial Status: {'last_transition_time': datetime.datetime(2023, 1, 6, 14, 30, 43, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 30, 43, tzinfo=tzlocal()),\n", + " 'message': 'Trial has succeeded',\n", + " 'reason': 'TrialSucceeded',\n", + " 'status': 'True',\n", + " 'type': 'Succeeded'}\n", + "\n", + "Trial Name: cmaes-example-l6zkx5jx\n", + "Trial Status: {'last_transition_time': datetime.datetime(2023, 1, 6, 14, 30, 45, tzinfo=tzlocal()),\n", + " 'last_update_time': datetime.datetime(2023, 1, 6, 14, 30, 45, tzinfo=tzlocal()),\n", + " 'message': 'Trial has succeeded',\n", + " 'reason': 'TrialSucceeded',\n", + " 'status': 'True',\n", + " 'type': 'Succeeded'}\n", + "\n" + ] } ], "source": [ "# Trial list.\n", - "kclient.list_trials(name=experiment_name, namespace=namespace)" + "trial_list = kclient.list_trials(experiment_name=experiment_name, 
namespace=namespace)\n", + "for trial in trial_list:\n", + " print(f\"Trial Name: {trial.metadata.name}\")\n", + " print(f\"Trial Status: {trial.status.conditions[-1]}\\n\")" ] }, { @@ -728,27 +759,27 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 56, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "{'currentOptimalTrial': {'bestTrialName': 'cmaes-example-8crn89vg',\n", - " 'observation': {'metrics': [{'latest': '0.991588',\n", - " 'max': '0.991588',\n", - " 'min': '0.925057',\n", - " 'name': 'Train-accuracy'},\n", - " {'latest': '0.978205',\n", - " 'max': '0.980096',\n", - " 'min': '0.954817',\n", - " 'name': 'Validation-accuracy'}]},\n", - " 'parameterAssignments': [{'name': 'lr', 'value': '0.04511033252270099'},\n", - " {'name': 'num-layers', 'value': '3'},\n", - " {'name': 'optimizer', 'value': 'sgd'}]}}" + "{'best_trial_name': 'cmaes-example-l6zkx5jx',\n", + " 'observation': {'metrics': [{'latest': '0.955613',\n", + " 'max': '0.955613',\n", + " 'min': '0.955613',\n", + " 'name': 'Validation-accuracy'},\n", + " {'latest': '0.922775',\n", + " 'max': '0.922775',\n", + " 'min': '0.922775',\n", + " 'name': 'Train-accuracy'}]},\n", + " 'parameter_assignments': [{'name': 'lr', 'value': '0.04511033252270099'},\n", + " {'name': 'num-layers', 'value': '3'},\n", + " {'name': 'optimizer', 'value': 'sgd'}]}" ] }, - "execution_count": 13, + "execution_count": 56, "metadata": {}, "output_type": "execute_result" } @@ -764,14 +795,14 @@ "source": [ "## Status for the Suggestion objects\n", "\n", - "You can check the Suggestion object status for more information about resume status.\n", + "Once Experiment is Succeeded, you can check the Suggestion object status for more information about resume status.\n", "\n", "For Experiment with FromVolume you should be able to check created PVC." 
] }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 59, "metadata": {}, "outputs": [ { @@ -788,13 +819,13 @@ "# Get the current Suggestion status for the never resume Experiment.\n", "suggestion = kclient.get_suggestion(name=experiment_never_resume_name, namespace=namespace)\n", "\n", - "print(suggestion[\"status\"][\"conditions\"][-1][\"message\"])\n", + "print(suggestion.status.conditions[-1].message)\n", "print(\"-----------------\")\n", "\n", "# Get the current Suggestion status for the from volume Experiment.\n", "suggestion = kclient.get_suggestion(name=experiment_from_volume_resume_name, namespace=namespace)\n", "\n", - "print(suggestion[\"status\"][\"conditions\"][-1][\"message\"])" + "print(suggestion.status.conditions[-1].message)" ] }, { @@ -808,157 +839,17 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 61, "metadata": {}, "outputs": [ { - "data": { - "text/plain": [ - "{'apiVersion': 'kubeflow.org/v1beta1',\n", - " 'kind': 'Experiment',\n", - " 'metadata': {'creationTimestamp': '2021-10-05T23:40:34Z',\n", - " 'deletionGracePeriodSeconds': 0,\n", - " 'deletionTimestamp': '2021-10-05T23:48:36Z',\n", - " 'finalizers': ['update-prometheus-metrics'],\n", - " 'generation': 2,\n", - " 'managedFields': [{'apiVersion': 'kubeflow.org/v1beta1',\n", - " 'fieldsType': 'FieldsV1',\n", - " 'fieldsV1': {'f:spec': {'.': {},\n", - " 'f:algorithm': {'.': {}, 'f:algorithmName': {}},\n", - " 'f:maxFailedTrialCount': {},\n", - " 'f:maxTrialCount': {},\n", - " 'f:objective': {'.': {},\n", - " 'f:additionalMetricNames': {},\n", - " 'f:goal': {},\n", - " 'f:objectiveMetricName': {},\n", - " 'f:type': {}},\n", - " 'f:parallelTrialCount': {},\n", - " 'f:parameters': {},\n", - " 'f:resumePolicy': {},\n", - " 'f:trialTemplate': {'.': {},\n", - " 'f:primaryContainerName': {},\n", - " 'f:trialParameters': {},\n", - " 'f:trialSpec': {'.': {},\n", - " 'f:apiVersion': {},\n", - " 'f:kind': {},\n", - " 'f:spec': {'.': {},\n", - " 
'f:template': {'.': {},\n", - " 'f:metadata': {'.': {},\n", - " 'f:annotations': {'.': {}, 'f:sidecar.istio.io/inject': {}}},\n", - " 'f:spec': {'.': {}, 'f:containers': {}, 'f:restartPolicy': {}}}}}}}},\n", - " 'manager': 'OpenAPI-Generator',\n", - " 'operation': 'Update',\n", - " 'time': '2021-10-05T23:40:34Z'},\n", - " {'apiVersion': 'kubeflow.org/v1beta1',\n", - " 'fieldsType': 'FieldsV1',\n", - " 'fieldsV1': {'f:metadata': {'f:finalizers': {'.': {},\n", - " 'v:\"update-prometheus-metrics\"': {}}},\n", - " 'f:status': {'.': {},\n", - " 'f:completionTime': {},\n", - " 'f:conditions': {},\n", - " 'f:currentOptimalTrial': {'.': {},\n", - " 'f:bestTrialName': {},\n", - " 'f:observation': {'.': {}, 'f:metrics': {}},\n", - " 'f:parameterAssignments': {}},\n", - " 'f:startTime': {},\n", - " 'f:succeededTrialList': {},\n", - " 'f:trials': {},\n", - " 'f:trialsSucceeded': {}}},\n", - " 'manager': 'katib-controller',\n", - " 'operation': 'Update',\n", - " 'time': '2021-10-05T23:44:26Z'}],\n", - " 'name': 'from-volume-resume-cmaes',\n", - " 'namespace': 'kubeflow-user-example-com',\n", - " 'resourceVersion': '393945413',\n", - " 'uid': '028034f1-3b42-4a1e-9f2c-cf24e845e4e0'},\n", - " 'spec': {'algorithm': {'algorithmName': 'cmaes'},\n", - " 'maxFailedTrialCount': 3,\n", - " 'maxTrialCount': 4,\n", - " 'metricsCollectorSpec': {'collector': {'kind': 'StdOut'}},\n", - " 'objective': {'additionalMetricNames': ['Train-accuracy'],\n", - " 'goal': 0.99,\n", - " 'metricStrategies': [{'name': 'Validation-accuracy', 'value': 'max'},\n", - " {'name': 'Train-accuracy', 'value': 'max'}],\n", - " 'objectiveMetricName': 'Validation-accuracy',\n", - " 'type': 'maximize'},\n", - " 'parallelTrialCount': 3,\n", - " 'parameters': [{'feasibleSpace': {'max': '0.06', 'min': '0.01'},\n", - " 'name': 'lr',\n", - " 'parameterType': 'double'},\n", - " {'feasibleSpace': {'max': '5', 'min': '2'},\n", - " 'name': 'num-layers',\n", - " 'parameterType': 'int'},\n", - " {'feasibleSpace': {'list': ['sgd', 
'adam', 'ftrl']},\n", - " 'name': 'optimizer',\n", - " 'parameterType': 'categorical'}],\n", - " 'resumePolicy': 'FromVolume',\n", - " 'trialTemplate': {'failureCondition': 'status.conditions.#(type==\"Failed\")#|#(status==\"True\")#',\n", - " 'primaryContainerName': 'training-container',\n", - " 'successCondition': 'status.conditions.#(type==\"Complete\")#|#(status==\"True\")#',\n", - " 'trialParameters': [{'description': 'Learning rate for the training model',\n", - " 'name': 'learningRate',\n", - " 'reference': 'lr'},\n", - " {'description': 'Number of training model layers',\n", - " 'name': 'numberLayers',\n", - " 'reference': 'num-layers'},\n", - " {'description': 'Training model optimizer (sdg, adam or ftrl)',\n", - " 'name': 'optimizer',\n", - " 'reference': 'optimizer'}],\n", - " 'trialSpec': {'apiVersion': 'batch/v1',\n", - " 'kind': 'Job',\n", - " 'spec': {'template': {'metadata': {'annotations': {'sidecar.istio.io/inject': 'false'}},\n", - " 'spec': {'containers': [{'command': ['python3',\n", - " '/opt/mxnet-mnist/mnist.py',\n", - " '--batch-size=64',\n", - " '--lr=${trialParameters.learningRate}',\n", - " '--num-layers=${trialParameters.numberLayers}',\n", - " '--optimizer=${trialParameters.optimizer}'],\n", - " 'image': 'docker.io/kubeflowkatib/mxnet-mnist:v0.13.0',\n", - " 'name': 'training-container'}],\n", - " 'restartPolicy': 'Never'}}}}}},\n", - " 'status': {'completionTime': '2021-10-05T23:44:26Z',\n", - " 'conditions': [{'lastTransitionTime': '2021-10-05T23:40:34Z',\n", - " 'lastUpdateTime': '2021-10-05T23:40:34Z',\n", - " 'message': 'Experiment is created',\n", - " 'reason': 'ExperimentCreated',\n", - " 'status': 'True',\n", - " 'type': 'Created'},\n", - " {'lastTransitionTime': '2021-10-05T23:44:26Z',\n", - " 'lastUpdateTime': '2021-10-05T23:44:26Z',\n", - " 'message': 'Experiment is running',\n", - " 'reason': 'ExperimentRunning',\n", - " 'status': 'False',\n", - " 'type': 'Running'},\n", - " {'lastTransitionTime': '2021-10-05T23:44:26Z',\n", 
- " 'lastUpdateTime': '2021-10-05T23:44:26Z',\n", - " 'message': 'Experiment has succeeded because max trial count has reached',\n", - " 'reason': 'ExperimentMaxTrialsReached',\n", - " 'status': 'True',\n", - " 'type': 'Succeeded'}],\n", - " 'currentOptimalTrial': {'bestTrialName': 'from-volume-resume-cmaes-4vkwxqkk',\n", - " 'observation': {'metrics': [{'latest': '0.977309',\n", - " 'max': '0.977309',\n", - " 'min': '0.957604',\n", - " 'name': 'Validation-accuracy'},\n", - " {'latest': '0.991021',\n", - " 'max': '0.991021',\n", - " 'min': '0.923807',\n", - " 'name': 'Train-accuracy'}]},\n", - " 'parameterAssignments': [{'name': 'num-layers', 'value': '3'},\n", - " {'name': 'optimizer', 'value': 'sgd'},\n", - " {'name': 'lr', 'value': '0.04511033252270099'}]},\n", - " 'startTime': '2021-10-05T23:40:34Z',\n", - " 'succeededTrialList': ['from-volume-resume-cmaes-67cs2x4s',\n", - " 'from-volume-resume-cmaes-zfxl6ltf',\n", - " 'from-volume-resume-cmaes-tzzh9zkd',\n", - " 'from-volume-resume-cmaes-4vkwxqkk'],\n", - " 'trials': 4,\n", - " 'trialsSucceeded': 4}}" - ] - }, - "execution_count": 15, - "metadata": {}, - "output_type": "execute_result" + "name": "stdout", + "output_type": "stream", + "text": [ + "Experiment kubeflow-user-example-com/cmaes-example has been deleted\n", + "Experiment kubeflow-user-example-com/never-resume-cmaes has been deleted\n", + "Experiment kubeflow-user-example-com/from-volume-resume-cmaes has been deleted\n" + ] } ], "source": [ @@ -977,7 +868,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, diff --git a/examples/v1beta1/sdk/nas-with-darts.ipynb b/examples/v1beta1/sdk/nas-with-darts.ipynb index 7c45bd0baf1..b05f6db5ecf 100644 --- a/examples/v1beta1/sdk/nas-with-darts.ipynb +++ b/examples/v1beta1/sdk/nas-with-darts.ipynb @@ -46,6 +46,7 @@ ], "source": [ "# Install required package (Katib SDK).\n", + "# TODO (andreyvelich): Update this 
example with the latest SDK.\n", "!pip install kubeflow-katib==0.13.0" ] }, diff --git a/examples/v1beta1/sdk/tune-train-from-func.ipynb b/examples/v1beta1/sdk/tune-train-from-func.ipynb index e98d11ff7a9..e47ed47d0e7 100644 --- a/examples/v1beta1/sdk/tune-train-from-func.ipynb +++ b/examples/v1beta1/sdk/tune-train-from-func.ipynb @@ -383,60 +383,39 @@ "name": "stdout", "output_type": "stream", "text": [ - "Katib Experiment status: Succeeded\n", + "Katib Experiment is Succeeded: True\n", "\n", "Current Optimal Trial\n", "\n", - "{\n", - " \"currentOptimalTrial\": {\n", - " \"bestTrialName\": \"tune-mnist-svnwmmmv\",\n", - " \"observation\": {\n", - " \"metrics\": [\n", - " {\n", - " \"latest\": \"0.0992\",\n", - " \"max\": \"0.8749\",\n", - " \"min\": \"0.0992\",\n", - " \"name\": \"loss\"\n", - " },\n", - " {\n", - " \"latest\": \"0.9714\",\n", - " \"max\": \"0.9714\",\n", - " \"min\": \"0.7176\",\n", - " \"name\": \"accuracy\"\n", - " }\n", - " ]\n", - " },\n", - " \"parameterAssignments\": [\n", - " {\n", - " \"name\": \"lr\",\n", - " \"value\": \"0.17016692449867332\"\n", - " },\n", - " {\n", - " \"name\": \"num_epoch\",\n", - " \"value\": \"13\"\n", - " }\n", - " ]\n", - " }\n", - "}\n" + "{'best_trial_name': 'tune-mnist-l8xvxzvj',\n", + " 'observation': {'metrics': [{'latest': '0.9694',\n", + " 'max': '0.9694',\n", + " 'min': '0.7203',\n", + " 'name': 'accuracy'},\n", + " {'latest': '0.1007',\n", + " 'max': '0.9082',\n", + " 'min': '0.1007',\n", + " 'name': 'loss'}]},\n", + " 'parameter_assignments': [{'name': 'num_epoch', 'value': '13'},\n", + " {'name': 'lr', 'value': '0.16377224201308005'}]}\n" ] } ], "source": [ - "status = katib_client.get_experiment_status(exp_name)\n", - "print(f\"Katib Experiment status: {status}\\n\")\n", + "status = katib_client.is_experiment_succeeded(exp_name)\n", + "print(f\"Katib Experiment is Succeeded: {status}\\n\")\n", "\n", "best_hps = katib_client.get_optimal_hyperparameters(exp_name)\n", "\n", - "if best_hps != {}:\n", - 
" import json\n", + "if best_hps != None:\n", " print(\"Current Optimal Trial\\n\")\n", - " print(json.dumps(best_hps, indent=4))\n", + " print(best_hps)\n", " \n", - " for hp in best_hps[\"currentOptimalTrial\"][\"parameterAssignments\"]:\n", - " if hp[\"name\"] == \"lr\":\n", - " best_lr = hp[\"value\"]\n", + " for hp in best_hps.parameter_assignments:\n", + " if hp.name == \"lr\":\n", + " best_lr = hp.value\n", " else:\n", - " best_num_epoch = hp[\"value\"]" + " best_num_epoch = hp.value" ] }, { diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py index d6289b82ef5..631f82a9ba4 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py @@ -17,6 +17,7 @@ import inspect import textwrap import grpc +import time from kubernetes import client, config from kubeflow.katib import models @@ -36,10 +37,11 @@ def __init__( ): """KatibClient constructor. - :param config_file: Name of the kube-config file. Defaults to ~/.kube/config. - :param context: Set the active context. Defaults to current_context from the kube-config. - :param client_configuration: The kubernetes.client.Configuration to set configs to. - :param persist_config: If True, config file will be updated when changed. + Args: + config_file: Name of the kube-config file. Defaults to ~/.kube/config. + context: Set the active context. Defaults to current_context from the kube-config. + client_configuration: The kubernetes.client.Configuration to set configs to. + persist_config: If True, config file will be updated when changed. 
""" self.in_cluster = None @@ -55,7 +57,7 @@ def __init__( config.load_incluster_config() self.in_cluster = True - self.api_instance = client.CustomObjectsApi() + self.custom_api = client.CustomObjectsApi() self.api_client = ApiClient() def _is_ipython(self): @@ -71,32 +73,40 @@ def _is_ipython(self): return True def create_experiment( - self, exp_object, namespace=utils.get_default_target_namespace() + self, + experiment: models.V1beta1Experiment, + namespace: str = utils.get_default_target_namespace(), ): """Create the Katib Experiment. - :param exp_object: Experiment object. - :param namespace: Experiment namespace. - If the namespace is None, it takes namespace from the Experiment or "default". + Args: + experiment: Experiment object of type V1beta1Experiment. + namespace: Namespace for the Experiment. + + Raises: + TimeoutError: Timeout to create Katib Experiment. + RuntimeError: Failed to create Katib Experiment. """ try: - self.api_instance.create_namespaced_custom_object( + self.custom_api.create_namespaced_custom_object( constants.KUBEFLOW_GROUP, constants.KATIB_VERSION, namespace, constants.EXPERIMENT_PLURAL, - exp_object, + experiment, + ) + except multiprocessing.TimeoutError: + raise TimeoutError( + f"Timeout to create Katib Experiment: {namespace}/{experiment.metadata.name}" ) - except client.rest.ApiException as e: + except Exception: raise RuntimeError( - "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\ - %s\n" - % e + f"Failed to create Katib Experiment: {namespace}/{experiment.metadata.name}" ) # TODO (andreyvelich): Use proper logger. 
- print("Experiment {} has been created".format(exp_object.metadata.name)) + print(f"Experiment {namespace}/{experiment.metadata.name} has been created") if self._is_ipython(): if self.in_cluster: @@ -106,9 +116,9 @@ def create_experiment( IPython.display.HTML( "Katib Experiment {} " 'link here'.format( - exp_object.metadata.name, + experiment.metadata.name, namespace, - exp_object.metadata.name, + experiment.metadata.name, ) ) ) @@ -167,6 +177,11 @@ def tune( to the base image packages. These packages are installed before executing the objective function. pip_index_url: The PyPI url from which to install Python packages. + + Raises: + ValueError: Objective function has invalid arguments. + TimeoutError: Timeout to create Katib Experiment. + RuntimeError: Failed to create Katib Experiment. """ # Create Katib Experiment template. @@ -297,26 +312,32 @@ def tune( experiment.spec.trial_template = trial_template # Create the Katib Experiment. - self.create_experiment(exp_object=experiment, namespace=namespace) + self.create_experiment(experiment, namespace) - # TODO (andreyvelich): Get Experiment should always return one Experiment. - # Use list_experiments to return Experiment list. - # That function should return Experiment object. - def get_experiment(self, name=None, namespace=utils.get_default_target_namespace()): + def get_experiment( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, + ): """Get the Katib Experiment. - :param name: Experiment name. - If the name is None returns all Experiments in the namespace. - :param namespace: Experiment namespace. - If the namespace is `None`, it takes namespace from the Experiment object or "default". + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + Returns: + V1beta1Experiment: Katib Experiment object. 
- :return: Experiment object. - :rtype: dict + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. """ - if name: - thread = self.api_instance.get_namespaced_custom_object( + try: + thread = self.custom_api.get_namespaced_custom_object( constants.KUBEFLOW_GROUP, constants.KATIB_VERSION, namespace, @@ -324,378 +345,758 @@ def get_experiment(self, name=None, namespace=utils.get_default_target_namespace name, async_req=True, ) + response = utils.FakeResponse(thread.get(timeout)) + experiment = self.api_client.deserialize(response, models.V1beta1Experiment) + return experiment - katibexp = None - try: - katibexp = thread.get(constants.APISERVER_TIMEOUT) - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get katib experiment.") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" - % e - ) - except Exception as e: - raise RuntimeError( - "There was a problem to get experiment {0} in namespace {1}. Exception: \ - {2} ".format( - name, namespace, e - ) - ) + except multiprocessing.TimeoutError: + raise TimeoutError(f"Timeout to get Katib Experiment: {namespace}/{name}") + except Exception: + raise RuntimeError(f"Failed to get Katib Experiment: {namespace}/{name}") - else: - thread = self.api_instance.list_namespaced_custom_object( + def list_experiments( + self, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """List of all Katib Experiments in namespace. + + Args: + namespace: Namespace to list the Experiments. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + list[V1beta1Experiment]: List of Katib Experiment objects. It returns + empty list if Experiments cannot be found. + + Raises: + TimeoutError: Timeout to list Katib Experiments. + RuntimeError: Failed to list Katib Experiments. 
+ """ + + result = [] + try: + thread = self.custom_api.list_namespaced_custom_object( constants.KUBEFLOW_GROUP, constants.KATIB_VERSION, - namespace, - constants.EXPERIMENT_PLURAL, + namespace=namespace, + plural=constants.EXPERIMENT_PLURAL, async_req=True, ) + response = thread.get(timeout) + result = [ + self.api_client.deserialize( + utils.FakeResponse(item), models.V1beta1Experiment + ) + for item in response.get("items") + ] + except multiprocessing.TimeoutError: + raise TimeoutError( + f"Timeout to list Katib Experiments in namespace: {namespace}" + ) + except Exception: + raise RuntimeError( + f"Failed to list Katib Experiments in namespace: {namespace}" + ) + return result - katibexp = None - try: - katibexp = thread.get(constants.APISERVER_TIMEOUT) - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get Experiment.") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" - % e + def get_experiment_conditions( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + experiment: models.V1beta1Experiment = None, + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Get the Experiment conditions. Experiment is in the condition when + `status` is True for the appropriate condition `type`. + + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + experiment: Optionally, Experiment object can be set to get the conditions. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + list[V1beta1ExperimentCondition]: List of Experiment conditions with + last transition time, last update time, message, reason, type, and + status. It returns empty list if Experiment does not have any + conditions yet. + + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. 
+ """ + + if experiment is None: + experiment = self.get_experiment(name, namespace, timeout) + + if ( + experiment.status + and experiment.status.conditions + and len(experiment.status.conditions) > 0 + ): + return experiment.status.conditions + + return [] + + def is_experiment_created( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + experiment: models.V1beta1Experiment = None, + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Check if Experiment is Created. + + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + experiment: Optionally, Experiment object can be set to check the status. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + bool: True is Experiment is Created, else False. + + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. + """ + + return utils.has_condition( + self.get_experiment_conditions(name, namespace, experiment, timeout), + constants.EXPERIMENT_CONDITION_CREATED, + ) + + def is_experiment_running( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + experiment: models.V1beta1Experiment = None, + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Check if Experiment is Running. + + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + experiment: Optionally, Experiment object can be set to check the status. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + bool: True is Experiment is Running, else False. + + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. 
+ """ + + return utils.has_condition( + self.get_experiment_conditions(name, namespace, experiment, timeout), + constants.EXPERIMENT_CONDITION_RUNNING, + ) + + def is_experiment_restarting( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + experiment: models.V1beta1Experiment = None, + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Check if Experiment is Restarting. + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + experiment: Optionally, Experiment object can be set to check the status. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + bool: True is Experiment is Resting, else False. + + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. + """ + + return utils.has_condition( + self.get_experiment_conditions(name, namespace, experiment, timeout), + constants.EXPERIMENT_CONDITION_RESTARTING, + ) + + def is_experiment_succeeded( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + experiment: models.V1beta1Experiment = None, + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Check if Experiment is Succeeded. + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + experiment: Optionally, Experiment object can be set to check the status. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + bool: True is Experiment is Succeeded, else False. + + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. 
+ """ + + return utils.has_condition( + self.get_experiment_conditions(name, namespace, experiment, timeout), + constants.EXPERIMENT_CONDITION_SUCCEEDED, + ) + + def is_experiment_failed( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + experiment: models.V1beta1Experiment = None, + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Check if Experiment is Failed. + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + experiment: Optionally, Experiment object can be set to check the status. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + bool: True is Experiment is Failed, else False. + + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. + """ + + return utils.has_condition( + self.get_experiment_conditions(name, namespace, experiment, timeout), + constants.EXPERIMENT_CONDITION_FAILED, + ) + + def wait_for_experiment_condition( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + expected_condition: str = constants.EXPERIMENT_CONDITION_SUCCEEDED, + timeout: int = 600, + polling_interval: int = 15, + apiserver_timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Wait until Experiment reaches specific condition. By default it waits + for the Succeeded condition. + + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + expected_condition: Which condition Experiment should reach. + timeout: How many seconds to wait until Experiment reaches condition. + polling_interval: The polling interval in seconds to get Experiment status. + apiserver_timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + V1beta1Experiment: Katib Experiment object. + + Raises: + RuntimeError: Failed to get Katib Experiment or Experiment reaches + Failed state if it does not wait for this condition. 
+ TimeoutError: Timeout waiting for Experiment to reach required condition + or timeout to get Katib Experiment. + """ + + for _ in range(round(timeout / polling_interval)): + + # We should get Experiment only once per cycle and check the statuses. + experiment = self.get_experiment(name, namespace, apiserver_timeout) + + # Wait for Failed condition. + if ( + expected_condition == constants.EXPERIMENT_CONDITION_FAILED + and self.is_experiment_failed( + name, namespace, experiment, apiserver_timeout ) - except Exception as e: + ): + utils.print_experiment_status(experiment) + print(f"Experiment: {namespace}/{name} is {expected_condition}\n\n\n") + return experiment + + # Raise exception if Experiment is Failed. + elif self.is_experiment_failed( + name, namespace, experiment, apiserver_timeout + ): raise RuntimeError( - "There was a problem to get experiment in namespace {0}. \ - Exception: {1} ".format( - namespace, e - ) + f"Experiment: {namespace}/{name} is Failed. " + f"Experiment conditions: {experiment.status.conditions}" ) - return katibexp + # Check if Experiment reaches Created condition. + elif ( + expected_condition == constants.EXPERIMENT_CONDITION_CREATED + and self.is_experiment_created( + name, namespace, experiment, apiserver_timeout + ) + ): + utils.print_experiment_status(experiment) + print(f"Experiment: {namespace}/{name} is {expected_condition}\n\n\n") + return experiment + + # Check if Experiment reaches Running condition. + elif ( + expected_condition == constants.EXPERIMENT_CONDITION_RUNNING + and self.is_experiment_running( + name, namespace, experiment, apiserver_timeout + ) + ): + utils.print_experiment_status(experiment) + print(f"Experiment: {namespace}/{name} is {expected_condition}\n\n\n") + return experiment + + # Check if Experiment reaches Restarting condition. 
+ elif ( + expected_condition == constants.EXPERIMENT_CONDITION_RESTARTING + and self.is_experiment_restarting( + name, namespace, experiment, apiserver_timeout + ) + ): + utils.print_experiment_status(experiment) + print(f"Experiment: {namespace}/{name} is {expected_condition}\n\n\n") + return experiment + + # Check if Experiment reaches Succeeded condition. + elif ( + expected_condition == constants.EXPERIMENT_CONDITION_SUCCEEDED + and self.is_experiment_succeeded( + name, namespace, experiment, apiserver_timeout + ) + ): + utils.print_experiment_status(experiment) + print(f"Experiment: {namespace}/{name} is {expected_condition}\n\n\n") + return experiment + + # Otherwise, print the current Experiment results and sleep for the pooling interval. + utils.print_experiment_status(experiment) + print( + f"Waiting for Experiment: {namespace}/{name} to reach {expected_condition} condition\n\n\n" + ) + time.sleep(polling_interval) - def get_suggestion(self, name=None, namespace=utils.get_default_target_namespace()): - """Get the Katib Suggestion. + raise TimeoutError( + f"Timeout waiting for Experiment: {namespace}/{name} to reach {expected_condition} state" + ) - :param name: Suggestion name. - If the name is None returns all Suggestion in the namespace. - :param namespace: Suggestion namespace. - If the namespace is None, it takes namespace from the Suggestion object or "default". + def edit_experiment_budget( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + max_trial_count: int = None, + parallel_trial_count: int = None, + max_failed_trial_count: int = None, + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Update Experiment budget for the running Trials. You can modify Trial + budget to resume Succeeded Experiments with `LongRunning` and `FromVolume` + resume policies. - :return: Suggestion object. 
- :rtype: dict + Learn about resuming Experiments here: https://www.kubeflow.org/docs/components/katib/resume-experiment/ + + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + max_trial_count: The new maximum number of Trials. + parallel_trial_count: The new number of Trials that Experiment runs in parallel. + max_failed_trial_count: The new maximum number of Trials allowed to fail. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Raises: + ValueError: The new Trial budget is not set. + TimeoutError: Timeout to edit/get Katib Experiment or timeout to wait + until Experiment reaches Restarting condition. + RuntimeError: Failed to edit/get Katib Experiment or Experiment + reaches Failed condition. """ - if name: - thread = self.api_instance.get_namespaced_custom_object( - constants.KUBEFLOW_GROUP, - constants.KATIB_VERSION, - namespace, - constants.SUGGESTION_PLURAL, - name, - async_req=True, + # The new Trial budget must be set. + if ( + max_trial_count is None + and parallel_trial_count is None + and max_failed_trial_count is None + ): + raise ValueError( + "Invalid input arguments. " + "You have to set max_trial_count, parallel_trial_count, or max_failed_trial_count " + "to modify Experiment Trial budget." ) - katib_suggestion = None - try: - katib_suggestion = thread.get(constants.APISERVER_TIMEOUT) - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get Katib suggestion") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" - % e - ) - except Exception as e: - raise RuntimeError( - "There was a problem to get suggestion {0} in namespace {1}. Exception: \ - {2} ".format( - name, namespace, e - ) - ) + # Modify the Experiment Trial budget. 
+ experiment = self.get_experiment(name, namespace, timeout) + if max_trial_count is not None: + experiment.spec.max_trial_count = max_trial_count + if parallel_trial_count is not None: + experiment.spec.parallel_trial_count = parallel_trial_count + if max_failed_trial_count is not None: + experiment.spec.max_failed_trial_count = max_failed_trial_count - else: - thread = self.api_instance.list_namespaced_custom_object( + # Update Experiment with the new Trial budget. + try: + self.custom_api.patch_namespaced_custom_object( constants.KUBEFLOW_GROUP, constants.KATIB_VERSION, namespace, - constants.SUGGESTION_PLURAL, - async_req=True, + constants.EXPERIMENT_PLURAL, + name, + experiment, ) + except multiprocessing.TimeoutError: + raise TimeoutError(f"Timeout to edit Katib Experiment: {namespace}/{name}") + except Exception: + raise RuntimeError(f"Failed to edit Katib Experiment: {namespace}/{name}") - katib_suggestion = None - try: - katib_suggestion = thread.get(constants.APISERVER_TIMEOUT) - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get Katib suggestion") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" - % e - ) - except Exception as e: - raise RuntimeError( - "There was a problem to get suggestions in namespace {0}. \ - Exception: {1} ".format( - namespace, e - ) - ) - - return katib_suggestion + print(f"Experiment {namespace}/{name} has been updated") - def delete_experiment(self, name, namespace=utils.get_default_target_namespace()): + def delete_experiment( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + delete_options: client.V1DeleteOptions = None, + ): """Delete the Katib Experiment. - :param name: Experiment name. - :param namespace: Experiment namespace. - If the namespace is None, it takes namespace from the Experiment object or "default". + Args: + name: Name for the Experiment. 
+ namespace: Namespace for the Experiment. + delete_options: Optional, V1DeleteOptions to set while deleting + Katib Experiment. For example, grace period seconds. + + Raises: + TimeoutError: Timeout to delete Katib Experiment. + RuntimeError: Failed to delete Katib Experiment. """ try: - self.api_instance.delete_namespaced_custom_object( + self.custom_api.delete_namespaced_custom_object( constants.KUBEFLOW_GROUP, constants.KATIB_VERSION, namespace, constants.EXPERIMENT_PLURAL, name, - body=client.V1DeleteOptions(), + body=delete_options, ) - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\ - %s\n" - % e + except multiprocessing.TimeoutError: + raise TimeoutError( + f"Timeout to delete Katib Experiment: {namespace}/{name}" ) + except Exception: + raise RuntimeError(f"Failed to delete Katib Experiment: {namespace}/{name}") # TODO (andreyvelich): Use proper logger. - print("Experiment {} has been deleted".format(name)) + print(f"Experiment {namespace}/{name} has been deleted") + + def get_suggestion( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """Get the Katib Suggestion. - def list_experiments(self, namespace=utils.get_default_target_namespace()): - """List all Katib Experiments. + Args: + name: Name for the Suggestion. + namespace: Namespace for the Suggestion. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. - :param namespace: Experiments namespace. - If the namespace is None, it takes "default" namespace. + Returns: + V1beta1Suggestion: Katib Suggestion object. - :return: List of Experiment objects. - :rtype: list[V1beta1Experiment] + Raises: + TimeoutError: Timeout to get Katib Suggestion. + RuntimeError: Failed to get Katib Suggestion. 
""" - thread = self.api_instance.list_namespaced_custom_object( - constants.KUBEFLOW_GROUP, - constants.KATIB_VERSION, - namespace=namespace, - plural=constants.EXPERIMENT_PLURAL, - async_req=True, - ) + try: + thread = self.custom_api.get_namespaced_custom_object( + constants.KUBEFLOW_GROUP, + constants.KATIB_VERSION, + namespace, + constants.SUGGESTION_PLURAL, + name, + async_req=True, + ) + response = utils.FakeResponse(thread.get(timeout)) + suggestion = self.api_client.deserialize(response, models.V1beta1Suggestion) + return suggestion + + except multiprocessing.TimeoutError: + raise TimeoutError(f"Timeout to get Katib Suggestion: {namespace}/{name}") + except Exception: + raise RuntimeError(f"Failed to get Katib Suggestion: {namespace}/{name}") + + def list_suggestions( + self, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """List of all Katib Suggestion in namespace. + + Args: + namespace: Namespace to list the Suggestions. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + list[V1beta1Suggestion]: List of Katib Suggestions objects. It returns + empty list if Suggestions cannot be found. + + Raises: + TimeoutError: Timeout to list Katib Suggestions. + RuntimeError: Failed to list Katib Suggestions. 
+ """ - katibexp = None result = [] try: - katibexp = thread.get(constants.APISERVER_TIMEOUT) + thread = self.custom_api.list_namespaced_custom_object( + constants.KUBEFLOW_GROUP, + constants.KATIB_VERSION, + namespace=namespace, + plural=constants.EXPERIMENT_PLURAL, + async_req=True, + ) + response = thread.get(timeout) result = [ self.api_client.deserialize( - utils.FakeResponse(item), models.V1beta1Experiment + utils.FakeResponse(item), models.V1beta1Suggestion ) - for item in katibexp.get("items") + for item in response.get("items") ] - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get katib experiment.") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" - % e + raise TimeoutError( + f"Timeout to list Katib Suggestions in namespace: {namespace}" ) - except Exception as e: + except Exception: raise RuntimeError( - "There was a problem to get experiments in namespace {0}. Exception: \ - {1} ".format( - namespace, e - ) + f"Failed to list Katib Suggestions in namespace: {namespace}" ) return result - def get_experiment_status( - self, name, namespace=utils.get_default_target_namespace() + def get_trial( + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, ): - """Get the Experiment current status. - - :param name: Experiment name. - :param namespace: Experiment namespace. - If the namespace is None, it takes "default" namespace. - - :return: Current Experiment status. - :rtype: str - """ - - katibexp = self.get_experiment(name, namespace=namespace) - last_condition = katibexp.get("status", {}).get("conditions", [])[-1] - return last_condition.get("type", "") + """Get the Katib Trial. - def is_experiment_succeeded( - self, name, namespace=utils.get_default_target_namespace() - ): - """Check if Experiment has succeeded. + Args: + name: Name for the Trial. 
+ namespace: Namespace for the Trial. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. - :param name: Experiment name. - :param namespace: Experiment namespace. - If the namespace is None, it takes "default" namespace. + Returns: + V1beta1Trial: Katib Trial object. - :return: Whether Experiment has succeeded or not. - :rtype: bool + Raises: + TimeoutError: Timeout to get Katib Trial. + RuntimeError: Failed to get Katib Trial. """ - experiment_status = self.get_experiment_status(name, namespace=namespace) - return experiment_status.lower() == "succeeded" - def list_trials(self, name=None, namespace=utils.get_default_target_namespace()): - """List all Experiment's Trials. + try: + thread = self.custom_api.get_namespaced_custom_object( + constants.KUBEFLOW_GROUP, + constants.KATIB_VERSION, + namespace, + constants.TRIAL_PLURAL, + name, + async_req=True, + ) + response = utils.FakeResponse(thread.get(timeout)) + trial = self.api_client.deserialize(response, models.V1beta1Trial) + return trial + + except multiprocessing.TimeoutError: + raise TimeoutError(f"Timeout to get Katib Trial: {namespace}/{name}") + except Exception: + raise RuntimeError(f"Failed to get Katib Trial: {namespace}/{name}") - :param name: Experiment name. - :param namespace: Experiments namespace. - If the namespace is None, it takes "default" namespace. + def list_trials( + self, + experiment_name: str = None, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, + ): + """List of all Trials in namespace. If Experiment name is set, + it returns all Trials belong to the Experiment. - :return: List of Trial objects - :rtype: list[V1beta1Trial] + Args: + experiment_name: Optional name for the Experiment. + namespace: Namespace to list the Trials. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + list[V1beta1Trial]: List of Katib Trial objects. 
It returns + empty list if Trials cannot be found. + + Raises: + TimeoutError: Timeout to list Katib Trials. + RuntimeError: Failed to list Katib Trials. """ - thread = self.api_instance.list_namespaced_custom_object( - constants.KUBEFLOW_GROUP, - constants.KATIB_VERSION, - namespace=namespace, - plural=constants.TRIAL_PLURAL, - async_req=True, - ) - - katibtrial = None result = [] try: - katibtrial = thread.get(constants.APISERVER_TIMEOUT) - - for item in katibtrial.get("items"): - if ( - name is not None - and item.get("metadata", {}).get("ownerReferences")[0].get("name") - != name - ): - continue - - result.append( - self.api_client.deserialize( - utils.FakeResponse(item), models.V1beta1Trial - ) + if experiment_name is None: + thread = self.custom_api.list_namespaced_custom_object( + constants.KUBEFLOW_GROUP, + constants.KATIB_VERSION, + namespace=namespace, + plural=constants.TRIAL_PLURAL, + async_req=True, ) - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get katib experiment.") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" - % e - ) - except Exception as e: - raise RuntimeError( - "There was a problem to get experiment {0} in namespace {1}. 
Exception: \ - {2} ".format( - name, namespace, e + else: + thread = self.custom_api.list_namespaced_custom_object( + constants.KUBEFLOW_GROUP, + constants.KATIB_VERSION, + namespace=namespace, + plural=constants.TRIAL_PLURAL, + label_selector=f"{constants.EXPERIMENT_LABEL}={experiment_name}", + async_req=True, ) + response = thread.get(timeout) + result = [ + self.api_client.deserialize( + utils.FakeResponse(item), models.V1beta1Trial + ) + for item in response.get("items") + ] + except multiprocessing.TimeoutError: + raise TimeoutError( + f"Timeout to list Katib Trials in namespace: {namespace}" ) + except Exception: + raise RuntimeError(f"Failed to list Katib Trials in namespace: {namespace}") return result def get_success_trial_details( - self, name=None, namespace=utils.get_default_target_namespace() + self, + experiment_name: str = None, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, ): - """Get the Trial details that have succeeded for an Experiment. - - :param name: Experiment name. - :param namespace: Experiment namespace. - If the namespace is None, it takes namespace from the Experiment or "default". + """Get the Succeeded Trial details. If Experiment name is set, + it returns Succeeded Trials details belong to the Experiment. - :return: Trial names with the hyperparameters and metrics. - :type: list[dict] + Args: + experiment_name: Optional name for the Experiment. + namespace: Namespace to list the Trials. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. + + Returns: + list[dict]: Trial names with hyperparameters and metrics. + It returns empty list if Succeeded Trials cannot be found. + + Raises: + TimeoutError: Timeout to list Katib Trials. + RuntimeError: Failed to list Katib Trials. 
""" - thread = self.api_instance.list_namespaced_custom_object( - constants.KUBEFLOW_GROUP, - constants.KATIB_VERSION, - namespace=namespace, - plural=constants.TRIAL_PLURAL, - async_req=True, - ) - - katibtrial = None result = [] try: - katibtrial = thread.get(constants.APISERVER_TIMEOUT) - - for item in katibtrial.get("items"): - status = item.get("status", {}).get("conditions", [])[-1].get("type") - if status != "Succeeded": - continue - - if ( - name is not None - and item.get("metadata", {}).get("ownerReferences")[0].get("name") - != name - ): - continue - - output = {} - output["name"] = item.get("metadata", {}).get("name") - output["hyperparameters"] = item.get("spec", {}).get( - "parameterAssignments", [] + if experiment_name is None: + thread = self.custom_api.list_namespaced_custom_object( + constants.KUBEFLOW_GROUP, + constants.KATIB_VERSION, + namespace=namespace, + plural=constants.TRIAL_PLURAL, + async_req=True, ) - output["metrics"] = ( - item.get("status", {}).get("observation", {}).get("metrics", []) + else: + thread = self.custom_api.list_namespaced_custom_object( + constants.KUBEFLOW_GROUP, + constants.KATIB_VERSION, + namespace=namespace, + plural=constants.TRIAL_PLURAL, + label_selector=f"{constants.EXPERIMENT_LABEL}={experiment_name}", + async_req=True, ) - result.append(output) - except multiprocessing.TimeoutError: - raise RuntimeError( - "Timeout trying to get succeeded trials of the katib experiment." - ) - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" - % e - ) - except Exception as e: - raise RuntimeError( - "There was a problem to get experiment {0} in namespace {1}. 
Exception: \ - {2} ".format( - name, namespace, e + response = thread.get(timeout) + for item in response.get("items"): + trial = self.api_client.deserialize( + utils.FakeResponse(item), models.V1beta1Trial ) + if ( + trial.status + and trial.status.conditions + and len(trial.status.conditions) > 0 + ): + if utils.has_condition( + trial.status.conditions, constants.TRIAL_CONDITION_SUCCEEDED + ): + output = {} + output["name"] = trial.metadata.name + output[ + "parameter_assignments" + ] = trial.spec.parameter_assignments + output["metrics"] = trial.status.observation.metrics + result.append(output) + except multiprocessing.TimeoutError: + raise TimeoutError( + f"Timeout to list Katib Trials in namespace: {namespace}" ) - + except Exception: + raise RuntimeError(f"Failed to list Katib Trials in namespace: {namespace}") return result def get_optimal_hyperparameters( - self, name=None, namespace=utils.get_default_target_namespace() + self, + name: str, + namespace: str = utils.get_default_target_namespace(), + timeout: int = constants.DEFAULT_TIMEOUT, ): """Get the current optimal Trial from the Experiment. - :param name: Experiment name. - :param namespace: Experiment namespace. + Args: + name: Name for the Experiment. + namespace: Namespace for the Experiment. + timeout: Optional, Kubernetes API server timeout in seconds + to execute the request. - :return: Current optimal Trial for the Experiment. - :rtype: dict - """ + Returns: + V1beta1OptimalTrial: The most optimal Trial for the Experiment. + It returns `None` if Experiment does not have optimal Trial yet. - katibexp = self.get_experiment(name, namespace=namespace) - result = {} - result["currentOptimalTrial"] = katibexp.get("status", {}).get( - "currentOptimalTrial" - ) + Raises: + TimeoutError: Timeout to get Katib Experiment. + RuntimeError: Failed to get Katib Experiment. 
+ """ - return result + experiment = self.get_experiment(name, namespace, timeout) + if ( + experiment.status + and experiment.status.current_optimal_trial + and experiment.status.current_optimal_trial.observation.metrics + ): + return experiment.status.current_optimal_trial + else: + return None def get_trial_metrics( self, name: str, namespace: str = utils.get_default_target_namespace(), - db_manager_address=constants.DEFAULT_DB_MANAGER_ADDRESS, + db_manager_address: str = constants.DEFAULT_DB_MANAGER_ADDRESS, + timeout: str = constants.DEFAULT_TIMEOUT, ): """Get the Trial Metric Results from the Katib DB. Katib DB Manager service should be accessible while calling this API. @@ -713,10 +1114,16 @@ def get_trial_metrics( name: Name for the Trial. namespace: Namespace for the Trial. db-manager-address: Address for the Katib DB Manager in this format: `ip-address:port`. + timeout: Optional, gRPC API Server timeout in seconds to get metrics. - Returns: List of MetricLog objects (https://github.com/kubeflow/katib/blob/4a2db414d85f29f17bc8ec6ff3462beef29585da/pkg/apis/manager/v1beta1/gen-doc/api.md#api-v1-beta1-MetricLog). + Returns: + List of MetricLog objects + (https://github.com/kubeflow/katib/blob/4a2db414d85f29f17bc8ec6ff3462beef29585da/pkg/apis/manager/v1beta1/gen-doc/api.md#api-v1-beta1-MetricLog). For example, to get the first metric value run the following: `get_trial_metrics(...)[0].metric.value + + Raises: + RuntimeError: Unable to get Trial metrics. """ db_manager_address = db_manager_address.split(":") @@ -730,11 +1137,11 @@ def get_trial_metrics( # When metric name is empty, we select all logs from the Katib DB. observation_logs = client.GetObservationLog( katib_api_pb2.GetObservationLogRequest(trial_name=name), - timeout=constants.APISERVER_TIMEOUT, + timeout=timeout, ) except Exception as e: raise RuntimeError( - f"Unable to get metrics for Trial {name} in namespace {namespace}. Exception: {e}" + f"Unable to get metrics for Trial {namespace}/{name}. 
Exception: {e}" ) return observation_logs.observation_log.metric_logs diff --git a/sdk/python/v1beta1/kubeflow/katib/constants/constants.py b/sdk/python/v1beta1/kubeflow/katib/constants/constants.py index d1178be4d2d..62ac68571d6 100644 --- a/sdk/python/v1beta1/kubeflow/katib/constants/constants.py +++ b/sdk/python/v1beta1/kubeflow/katib/constants/constants.py @@ -15,7 +15,7 @@ import os # How long to wait in seconds for requests to the Kubernetes or gRPC API Server. -APISERVER_TIMEOUT = 120 +DEFAULT_TIMEOUT = 120 # Global CRD version KATIB_VERSION = os.environ.get("EXPERIMENT_VERSION", "v1beta1") @@ -30,6 +30,24 @@ DEFAULT_PRIMARY_CONTAINER_NAME = "training-container" +# Label to identify Experiment's resources. +EXPERIMENT_LABEL = "katib.kubeflow.org/experiment" + +# True means that Katib CR is in this condition. +CONDITION_STATUS_TRUE = "True" + +# Experiment conditions. +# TODO (andreyvelich): Use API enums when Katib SDK supports it. +# Ref: https://github.com/kubeflow/katib/issues/1969. +EXPERIMENT_CONDITION_CREATED = "Created" +EXPERIMENT_CONDITION_RUNNING = "Running" +EXPERIMENT_CONDITION_RESTARTING = "Restarting" +EXPERIMENT_CONDITION_SUCCEEDED = "Succeeded" +EXPERIMENT_CONDITION_FAILED = "Failed" + +# Trial conditions. +TRIAL_CONDITION_SUCCEEDED = "Succeeded" + # Supported base images for the Katib Trials. # TODO (andreyvelich): Implement list_base_images function to get each image description. 
BASE_IMAGE_TENSORFLOW = "docker.io/tensorflow/tensorflow:2.11.0" diff --git a/sdk/python/v1beta1/kubeflow/katib/utils/utils.py b/sdk/python/v1beta1/kubeflow/katib/utils/utils.py index 3946f381332..2a709c55002 100644 --- a/sdk/python/v1beta1/kubeflow/katib/utils/utils.py +++ b/sdk/python/v1beta1/kubeflow/katib/utils/utils.py @@ -18,6 +18,9 @@ from typing import Callable import inspect +from kubeflow.katib import models +from kubeflow.katib.constants import constants + def is_running_in_k8s(): return os.path.isdir("/var/run/secrets/kubernetes.io/") @@ -40,6 +43,36 @@ def set_katib_namespace(katib): return namespace +def has_condition(conditions, condition_type): + """Verify if the condition list has the required condition. + Condition should be valid object with `type` and `status`. + """ + + for c in conditions: + if c.type == condition_type and c.status == constants.CONDITION_STATUS_TRUE: + return True + return False + + +def print_experiment_status(experiment: models.V1beta1Experiment): + if experiment.status: + print( + "Experiment Trials status: {} Trials, {} Pending Trials, " + "{} Running Trials, {} Succeeded Trials, {} Failed Trials, " + "{} EarlyStopped Trials, {} MetricsUnavailable Trials".format( + experiment.status.trials or 0, + experiment.status.trials_pending or 0, + experiment.status.trials_running or 0, + experiment.status.trials_succeeded or 0, + experiment.status.trials_failed or 0, + experiment.status.trials_early_stopped or 0, + experiment.status.trial_metrics_unavailable or 0, + ) + ) + print(f"Current Optimal Trial:\n {experiment.status.current_optimal_trial}") + print(f"Experiment conditions:\n {experiment.status.conditions}") + + def validate_objective_function(objective: Callable): # Check if objective function is callable. 
diff --git a/sdk/python/v1beta1/setup.py b/sdk/python/v1beta1/setup.py index 31ddbb28aaf..8b8a16546ee 100644 --- a/sdk/python/v1beta1/setup.py +++ b/sdk/python/v1beta1/setup.py @@ -23,6 +23,7 @@ "urllib3>=1.15.1", "kubernetes>=23.6.0", "grpcio==1.41.1", + "protobuf==3.19.5", ] katib_grpc_api_file = "../../../pkg/apis/manager/v1beta1/python/api_pb2.py" diff --git a/test/e2e/v1beta1/hack/gh-actions/run-e2e-experiment.go b/test/e2e/v1beta1/hack/gh-actions/run-e2e-experiment.go deleted file mode 100644 index d3021f6ed98..00000000000 --- a/test/e2e/v1beta1/hack/gh-actions/run-e2e-experiment.go +++ /dev/null @@ -1,370 +0,0 @@ -/* -Copyright 2022 The Kubeflow Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "bytes" - "context" - "fmt" - "log" - "os" - "os/exec" - "strconv" - "time" - - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - k8syaml "k8s.io/apimachinery/pkg/util/yaml" - "sigs.k8s.io/controller-runtime/pkg/client" - - commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" - experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" - controllerUtil "github.com/kubeflow/katib/pkg/controller.v1beta1/util" - "github.com/kubeflow/katib/pkg/util/v1beta1/katibclient" -) - -func main() { - // First argument should be Experiment yaml path. 
- if len(os.Args) != 2 { - log.Fatal("Path to Experiment yaml is missing") - } - expPath := os.Args[1] - byteExp, err := os.ReadFile(expPath) - if err != nil { - log.Fatalf("Error in reading file: %v", err) - } - - exp := &experimentsv1beta1.Experiment{} - buf := bytes.NewBufferString(string(byteExp)) - if err := k8syaml.NewYAMLOrJSONDecoder(buf, 1024).Decode(exp); err != nil { - log.Fatal("Yaml decode error ", err) - } - - kclient, err := katibclient.NewClient(client.Options{}) - if err != nil { - log.Fatalf("Create NewClient for Katib failed: %v", err) - } - - var maxTrials int32 = 2 - var parallelTrials int32 = 1 - var maxFailedTrial int32 = 0 - // For random we test 2 parallel execution. - if exp.Name == "random" { - maxTrials = 3 - parallelTrials = 2 - maxFailedTrial = 3 - } - if exp.Spec.Algorithm.AlgorithmName != "hyperband" && exp.Spec.Algorithm.AlgorithmName != "darts" { - // Hyperband will validate the parallel trial count, - // thus we should not change it. - // Not necessary to test parallel Trials for Darts. - exp.Spec.MaxTrialCount = &maxTrials - exp.Spec.ParallelTrialCount = ¶llelTrials - } - exp.Spec.TrialTemplate.Retain = true - exp.Spec.MaxFailedTrialCount = &maxFailedTrial - - log.Printf("Creating Experiment %v with MaxTrialCount: %v, ParallelTrialCount: %v", exp.Name, maxTrials, parallelTrials) - err = kclient.CreateRuntimeObject(exp) - if err != nil { - log.Fatalf("CreateRuntimeObject failed: %v", err) - } - - // Wait until Experiment is finished. - exp, err = waitExperimentFinish(kclient, exp) - if err != nil { - // Delete Experiment in case of error. - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - log.Fatalf("Wait Experiment finish failed: %v", err) - } - - // For random example and from volume we restart Experiment. 
- if exp.Name == "random" || exp.Name == "from-volume-resume" { - // Increase parallel Trials and max Trials counts. - parallelTrials++ - maxTrials += parallelTrials + 1 - exp.Spec.MaxTrialCount = &maxTrials - exp.Spec.ParallelTrialCount = ¶llelTrials - - // Update Experiment with new info. - err := kclient.UpdateRuntimeObject(exp) - if err != nil { - // Delete Experiment in case of error. - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - log.Fatalf("UpdateRuntimeObject failed: %v", err) - } - - log.Printf("Restarting Experiment %v with MaxTrialCount: %v, ParallelTrialCount: %v\n\n", - exp.Name, maxTrials, parallelTrials) - - // Wait until Experiment is restarted. - timeout := 60 * time.Second - endTime := time.Now().Add(timeout) - for time.Now().Before(endTime) { - exp, err = kclient.GetExperiment(exp.Name, exp.Namespace) - if err != nil { - // Delete Experiment in case of error - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - log.Fatalf("Get Experiment error: %v", err) - } - // Once Experiment is restarted stop the waiting loop. - if exp.IsRestarting() { - break - } - time.Sleep(1 * time.Second) - } - // Check if Experiment is not restarting and is not running. - if !exp.IsRestarting() && !exp.IsRunning() { - // Delete experiment in case of error. - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - log.Fatalf("Unable to restart Experiment %v, Experiment conditions: %v", exp.Name, exp.Status.Conditions) - } - // Wait until Experiment is finished. 
- exp, err = waitExperimentFinish(kclient, exp) - if err != nil { - // Delete experiment in case of error - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - log.Fatalf("Wait Experiment finish failed: %v", err) - } - } - - // Verify Experiment results - err = verifyExperimentResults(kclient, exp) - if err != nil { - // Delete Experiment in case of error - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - log.Fatalf("Verify Experiment results failed: %v", err) - } - - // Print results. - err = printResults(exp) - if err != nil { - // Delete Experiment in case of error. - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - log.Fatalf("Print Experiment results failed: %v", err) - } - - // Delete Experiment. - log.Printf("Deleting Experiment %v\n", exp.Name) - if kclient.DeleteRuntimeObject(exp) != nil { - log.Fatalf("Unable to delete Experiment %v, error: %v", exp.Name, err) - } - -} - -func waitExperimentFinish(kclient katibclient.Client, exp *experimentsv1beta1.Experiment) (*experimentsv1beta1.Experiment, error) { - // Experiment should be completed before the timeout. 
- timeout := 50 * time.Minute - for endTime := time.Now().Add(timeout); time.Now().Before(endTime); { - exp, err := kclient.GetExperiment(exp.Name, exp.Namespace) - if err != nil { - return exp, fmt.Errorf("Get Experiment error: %v", err) - } - - log.Printf("Waiting for Experiment %s to finish", exp.Name) - log.Printf("Experiment is running: %v Trials, %v Pending Trials, %v Running Trials, %v Succeeded Trials,"+ - " %v Failed Trials, %v EarlyStopped Trials, %v MetricsUnavailable Trials", - exp.Status.Trials, exp.Status.TrialsPending, exp.Status.TrialsRunning, exp.Status.TrialsSucceeded, - exp.Status.TrialsFailed, exp.Status.TrialsEarlyStopped, exp.Status.TrialMetricsUnavailable) - log.Printf("Current optimal Trial: %v", exp.Status.CurrentOptimalTrial) - log.Printf("Experiment conditions: %v\n\n\n", exp.Status.Conditions) - - // Check if Experiment is completed. - if exp.IsCompleted() { - log.Printf("Experiment %v is finished", exp.Name) - if exp.IsFailed() { - return exp, fmt.Errorf("Experiment %v is failed", exp.Name) - } - // Print latest condition message. - log.Printf("%v\n\n", exp.Status.Conditions[len(exp.Status.Conditions)-1].Message) - // Print Suggestion conditions. - suggestion, err := kclient.GetSuggestion(exp.Name, exp.Namespace) - if err != nil { - return exp, fmt.Errorf("Get Suggestion error: %v", err) - } - log.Printf("Suggestion %v. Conditions: %v", suggestion.Name, suggestion.Status.Conditions) - log.Printf("Suggestion %v. Suggestions: %v\n\n", suggestion.Name, suggestion.Status.Suggestions) - - // Return succeeded Experiment. - return exp, nil - } - time.Sleep(20 * time.Second) - } - - // If loop is end, Experiment is not finished. - return exp, fmt.Errorf("Experiment run timed out") -} - -func verifyExperimentResults(kclient katibclient.Client, exp *experimentsv1beta1.Experiment) error { - - // Current optimal Trial should be set. 
- if equality.Semantic.DeepEqual(exp.Status.CurrentOptimalTrial, experimentsv1beta1.OptimalTrial{}) { - return fmt.Errorf("Current optimal Trial is empty. Experiment status: %v", exp.Status) - } - - // Best objective metric should be set. - var bestObjectiveMetric *commonv1beta1.Metric - for _, metric := range exp.Status.CurrentOptimalTrial.Observation.Metrics { - if metric.Name == exp.Spec.Objective.ObjectiveMetricName { - bestObjectiveMetric = &metric - break - } - } - if bestObjectiveMetric == nil { - return fmt.Errorf("Unable to get best metrics for objective: %v", exp.Spec.Objective.ObjectiveMetricName) - } - - // Verify objective metric. - objectiveType := exp.Spec.Objective.Type - goal := exp.Spec.Objective.Goal - // If min metric is set, max be set also. - minMetric, err := strconv.ParseFloat(bestObjectiveMetric.Min, 64) - maxMetric, _ := strconv.ParseFloat(bestObjectiveMetric.Max, 64) - - // If metrics can't be parsed to float or goal is empty, succeeded Trials should be equal to MaxTrialCount. - if (err != nil || goal == nil) && exp.Status.TrialsSucceeded != *exp.Spec.MaxTrialCount { - return fmt.Errorf("All trials are not successful. MaxTrialCount: %v, TrialsSucceeded: %v", - *exp.Spec.MaxTrialCount, exp.Status.TrialsSucceeded) - } - - trialsCompleted := exp.Status.TrialsSucceeded - if exp.Spec.EarlyStopping != nil { - trialsCompleted += exp.Status.TrialsEarlyStopped - } - - // Otherwise, Goal should be reached. - if trialsCompleted != *exp.Spec.MaxTrialCount && - ((objectiveType == commonv1beta1.ObjectiveTypeMinimize && minMetric > *goal) || - (objectiveType == commonv1beta1.ObjectiveTypeMaximize && maxMetric < *goal)) { - return fmt.Errorf(`Objective Goal is not reached and Succeeded Trials: %v != %v MaxTrialCount. 
- ObjectiveType: %v, Goal: %v, MinMetric: %v, MaxMetric: %v`, - exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount, - objectiveType, *goal, minMetric, maxMetric) - } - - err = verifySuggestion(kclient, exp) - if err != nil { - return fmt.Errorf("Verify Suggestion failed: %v", err) - } - return nil -} - -func verifySuggestion(kclient katibclient.Client, exp *experimentsv1beta1.Experiment) error { - - // Verify Suggestion's resources. - sug, err := kclient.GetSuggestion(exp.Name, exp.Namespace) - if err != nil { - return fmt.Errorf("GetSuggestion failed: %v", err) - } - - // When Suggestion is LongRunning, it can't be succeeded. - if exp.Spec.ResumePolicy == experimentsv1beta1.LongRunning && sug.IsSucceeded() { - return fmt.Errorf("Suggestion is succeeded while ResumePolicy = %v", experimentsv1beta1.LongRunning) - } - - // Verify Suggestion with resume policy Never and FromVolume. - if exp.Spec.ResumePolicy == experimentsv1beta1.NeverResume || exp.Spec.ResumePolicy == experimentsv1beta1.FromVolume { - - // Give controller time to delete Suggestion resources and change Suggestion status. - // TODO (andreyvelich): Think about better way to handle this. - time.Sleep(10 * time.Second) - - // When Suggestion has resume policy Never or FromVolume, it should be not running. - if sug.IsRunning() { - return fmt.Errorf("Suggestion is still running while ResumePolicy = %v", exp.Spec.ResumePolicy) - } - - // Suggestion service should be deleted. - serviceName := controllerUtil.GetSuggestionServiceName(sug) - namespacedName := types.NamespacedName{Name: serviceName, Namespace: sug.Namespace} - err = kclient.GetClient().Get(context.TODO(), namespacedName, &corev1.Service{}) - if errors.IsNotFound(err) { - log.Printf("Suggestion service %v has been deleted", serviceName) - } else { - return fmt.Errorf("Suggestion service: %v is still alive while ResumePolicy: %v, error: %v", serviceName, exp.Spec.ResumePolicy, err) - } - - // Suggestion deployment should be deleted. 
- deploymentName := controllerUtil.GetSuggestionDeploymentName(sug) - namespacedName = types.NamespacedName{Name: deploymentName, Namespace: sug.Namespace} - err = kclient.GetClient().Get(context.TODO(), namespacedName, &appsv1.Deployment{}) - if errors.IsNotFound(err) { - log.Printf("Suggestion deployment %v has been deleted", deploymentName) - } else { - return fmt.Errorf("Suggestion deployment: %v is still alive while ResumePolicy: %v, error: %v", deploymentName, exp.Spec.ResumePolicy, err) - } - - // PVC should not be deleted for Suggestion with resume policy FromVolume. - if exp.Spec.ResumePolicy == experimentsv1beta1.FromVolume { - pvcName := controllerUtil.GetSuggestionPersistentVolumeClaimName(sug) - namespacedName = types.NamespacedName{Name: pvcName, Namespace: sug.Namespace} - err = kclient.GetClient().Get(context.TODO(), namespacedName, &corev1.PersistentVolumeClaim{}) - if errors.IsNotFound(err) { - return fmt.Errorf("suggestion PVC: %v is not alive while ResumePolicy: %v", pvcName, exp.Spec.ResumePolicy) - } - } - } - return nil -} - -func printResults(exp *experimentsv1beta1.Experiment) error { - log.Printf("Experiment has recorded best current Optimal Trial %v\n\n", exp.Status.CurrentOptimalTrial) - - // Describe the Experiment. - cmd := exec.Command("kubectl", "describe", "experiment", exp.Name, "-n", exp.Namespace) - out, err := cmd.Output() - if err != nil { - return fmt.Errorf("Execute \"kubectl describe suggestion\" failed: %v", err) - } - log.Println(cmd.String()) - log.Printf("\n%v\n\n", string(out)) - - // Describe the Suggestion. 
import argparse
import yaml
import time
import logging
from kubernetes import client, config
from kubeflow.katib import ApiClient, KatibClient, models
from kubeflow.katib.utils.utils import FakeResponse
from kubeflow.katib.constants import constants

# Experiment timeout is 40 min.
EXPERIMENT_TIMEOUT = 60 * 40

# The default logging config.
logging.basicConfig(level=logging.INFO)


def verify_experiment_results(
    katib_client: KatibClient,
    experiment: models.V1beta1Experiment,
    exp_name: str,
    exp_namespace: str,
):
    """Verify the results of a finished Katib Experiment.

    Checks that the best objective metric exists and is consistent with the
    Experiment's Succeeded reason, and that the Suggestion's Deployment,
    Service, and PVC match the Experiment's resume policy.

    Args:
        katib_client: Katib client used to read the Suggestion.
        experiment: The finished Experiment object to verify.
        exp_name: Experiment name (equal to the Suggestion name).
        exp_namespace: Namespace the Experiment runs in.

    Raises:
        Exception: if any of the verification checks fails.
    """

    # Get the best objective metric.
    best_objective_metric = None
    for metric in experiment.status.current_optimal_trial.observation.metrics:
        if metric.name == experiment.spec.objective.objective_metric_name:
            best_objective_metric = metric
            break

    if best_objective_metric is None:
        raise Exception(
            "Unable to get the best metrics for objective: {}. Current Optimal Trial: {}".format(
                experiment.spec.objective.objective_metric_name,
                experiment.status.current_optimal_trial,
            )
        )

    # Get Experiment Succeeded reason.
    # Fix: initialize to None so a missing Succeeded condition raises a clear
    # error instead of an UnboundLocalError below.
    succeeded_reason = None
    for c in experiment.status.conditions:
        if (
            c.type == constants.EXPERIMENT_CONDITION_SUCCEEDED
            and c.status == constants.CONDITION_STATUS_TRUE
        ):
            succeeded_reason = c.reason
            break
    if succeeded_reason is None:
        raise Exception(
            "Experiment has no Succeeded condition. Experiment status: {}".format(
                experiment.status
            )
        )

    trials_completed = experiment.status.trials_succeeded or 0
    trials_completed += experiment.status.trials_early_stopped or 0
    max_trial_count = experiment.spec.max_trial_count

    # If Experiment is Succeeded because of Max Trial Reached, all Trials must be completed.
    if (
        succeeded_reason == "ExperimentMaxTrialsReached"
        and trials_completed != max_trial_count
    ):
        raise Exception(
            "All Trials must be Completed. Max Trial count: {}, Experiment status: {}".format(
                max_trial_count, experiment.status
            )
        )

    # If Experiment is Succeeded because of Goal reached, the metrics must be correct.
    if succeeded_reason == "ExperimentGoalReached" and (
        (
            experiment.spec.objective.type == "minimize"
            and float(best_objective_metric.min) > float(experiment.spec.objective.goal)
        )
        or (
            experiment.spec.objective.type == "maximize"
            and float(best_objective_metric.max) < float(experiment.spec.objective.goal)
        )
    ):
        raise Exception(
            "Experiment goal is reached, but metrics are incorrect. "
            f"Experiment objective: {experiment.spec.objective}. "
            f"Experiment best objective metric: {best_objective_metric}"
        )

    # Verify Suggestion's resources. Suggestion name = Experiment name.
    suggestion = katib_client.get_suggestion(exp_name, exp_namespace)

    # For the Never or FromVolume resume policies Suggestion must be Succeeded.
    # For the LongRunning resume policy Suggestion must be always Running.
    for c in suggestion.status.conditions:
        if (
            c.type == constants.EXPERIMENT_CONDITION_SUCCEEDED
            and c.status == constants.CONDITION_STATUS_TRUE
            and experiment.spec.resume_policy == "LongRunning"
        ):
            raise Exception(
                f"Suggestion is Succeeded while Resume Policy is {experiment.spec.resume_policy}."
                f"Suggestion conditions: {suggestion.status.conditions}"
            )
        elif (
            c.type == constants.EXPERIMENT_CONDITION_RUNNING
            and c.status == constants.CONDITION_STATUS_TRUE
            and experiment.spec.resume_policy != "LongRunning"
        ):
            raise Exception(
                f"Suggestion is Running while Resume Policy is {experiment.spec.resume_policy}."
                f"Suggestion conditions: {suggestion.status.conditions}"
            )

    # For Never and FromVolume resume policies verify Suggestion's resources.
    if (
        experiment.spec.resume_policy == "Never"
        or experiment.spec.resume_policy == "FromVolume"
    ):
        resource_name = exp_name + "-" + experiment.spec.algorithm.algorithm_name

        config.load_kube_config()
        # Suggestion's Service and Deployment should be deleted.
        # Fix: the original `if i == 10` check after `for i in range(10)` was
        # unreachable (i never exceeds 9), so the "Deployment still alive"
        # failure could never fire. `for...else` runs exactly when the loop
        # finishes without `break`, which is the "never deleted" case.
        for _ in range(10):
            try:
                client.AppsV1Api().read_namespaced_deployment(
                    resource_name, exp_namespace
                )
            except client.ApiException as e:
                if e.status == 404:
                    break
                else:
                    raise e
            # Deployment deletion might take some time.
            time.sleep(1)
        else:
            raise Exception(
                "Suggestion Deployment is still alive for Resume Policy: {}".format(
                    experiment.spec.resume_policy
                )
            )

        try:
            client.CoreV1Api().read_namespaced_service(resource_name, exp_namespace)
        except client.ApiException as e:
            if e.status != 404:
                raise e
        else:
            raise Exception(
                "Suggestion Service is still alive for Resume Policy: {}".format(
                    experiment.spec.resume_policy
                )
            )

        # For FromVolume resume policy PVC should not be deleted.
        if experiment.spec.resume_policy == "FromVolume":
            try:
                client.CoreV1Api().read_namespaced_persistent_volume_claim(
                    resource_name, exp_namespace
                )
            except client.ApiException as e:
                # Fix: only a 404 means the PVC is gone; any other API error
                # (403, 500, ...) must propagate instead of masquerading as
                # a deleted PVC.
                if e.status == 404:
                    raise Exception("PVC is deleted for FromVolume Resume Policy")
                raise e


def run_e2e_experiment(
    katib_client: KatibClient,
    experiment: models.V1beta1Experiment,
    exp_name: str,
    exp_namespace: str,
):
    """Run one E2E test: create the Experiment, wait for it, and verify results.

    For the "from-volume-resume" and "random" Experiments the resume feature is
    also exercised by increasing the Trial budget and waiting for a restart.

    Args:
        katib_client: Katib client used to manage the Experiment.
        experiment: The Experiment object to create.
        exp_name: Experiment name.
        exp_namespace: Namespace to create the Experiment in.
    """

    # Create Katib Experiment and wait until it is finished.
    logging.debug(
        "Creating Experiment: {}/{} with MaxTrialCount: {}, ParallelTrialCount: {}".format(
            exp_namespace,
            exp_name,
            experiment.spec.max_trial_count,
            experiment.spec.parallel_trial_count,
        )
    )

    # Wait until Experiment reaches Succeeded condition.
    katib_client.create_experiment(experiment, exp_namespace)
    experiment = katib_client.wait_for_experiment_condition(
        exp_name, exp_namespace, timeout=EXPERIMENT_TIMEOUT
    )

    # Test resume feature for "FromVolume" and "LongRunning" Experiments.
    # TODO (andreyvelich): Once we change the default resume policy to "Never",
    # also test restart for "LongRunning" Experiment here instead of "random".
    # Ref: https://github.com/kubeflow/katib/issues/2055
    if exp_name == "from-volume-resume" or exp_name == "random":
        max_trial_count = experiment.spec.max_trial_count + 1
        parallel_trial_count = experiment.spec.parallel_trial_count + 1
        logging.debug(
            f"Restarting Experiment {exp_namespace}/{exp_name} "
            f"with MaxTrialCount: {max_trial_count} and ParallelTrialCount: {parallel_trial_count}"
        )

        # Modify Experiment budget.
        katib_client.edit_experiment_budget(
            exp_name, exp_namespace, max_trial_count, parallel_trial_count
        )
        # Wait until Experiment is Restarted.
        katib_client.wait_for_experiment_condition(
            exp_name,
            exp_namespace,
            constants.EXPERIMENT_CONDITION_RESTARTING,
            EXPERIMENT_TIMEOUT,
        )
        # Wait until Experiment is Succeeded.
        experiment = katib_client.wait_for_experiment_condition(
            exp_name, exp_namespace, timeout=EXPERIMENT_TIMEOUT
        )

    # Verify the Experiment results.
    verify_experiment_results(katib_client, experiment, exp_name, exp_namespace)

    # Print the Experiment and Suggestion.
    logging.debug(katib_client.get_experiment(exp_name, exp_namespace))
    logging.debug(katib_client.get_suggestion(exp_name, exp_namespace))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--experiment-path",
        type=str,
        required=True,
        help="Path to the Katib Experiment.",
    )
    parser.add_argument(
        "--verbose", action="store_true", help="Verbose output for the Katib E2E test",
    )
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    logging.info("---------------------------------------------------------------")
    logging.info("---------------------------------------------------------------")
    logging.info(f"Start E2E test for the Katib Experiment: {args.experiment_path}")

    # Read Experiment YAML to Fake Response object.
    with open(args.experiment_path, "r") as file:
        experiment = FakeResponse(yaml.safe_load(file))

    # Replace batch size to number of epochs for faster execution.
    experiment.data = experiment.data.replace("--batch-size=64", "--num-epochs=2")

    # Convert to the Katib Experiment object.
    experiment = ApiClient().deserialize(experiment, "V1beta1Experiment")
    exp_name = experiment.metadata.name
    exp_namespace = experiment.metadata.namespace

    # Set Trial threshold for Katib Experiments.
    MAX_TRIAL_COUNT = 2
    PARALLEL_TRIAL_COUNT = 1
    MAX_FAILED_TRIAL_COUNT = 0

    # For one random search Experiment we test parallel execution.
    if experiment.metadata.name == "random":
        MAX_TRIAL_COUNT += 1
        PARALLEL_TRIAL_COUNT += 1

    # Hyperband will validate the parallel trial count, thus we should not change it.
    # We don't need to test parallel Trials for Darts.
    if (
        experiment.spec.algorithm.algorithm_name != "hyperband"
        and experiment.spec.algorithm.algorithm_name != "darts"
    ):
        experiment.spec.max_trial_count = MAX_TRIAL_COUNT
        experiment.spec.parallel_trial_count = PARALLEL_TRIAL_COUNT
        experiment.spec.max_failed_trial_count = MAX_FAILED_TRIAL_COUNT

    katib_client = KatibClient()
    try:
        run_e2e_experiment(katib_client, experiment, exp_name, exp_namespace)
        logging.info("---------------------------------------------------------------")
        logging.info(f"E2E is succeeded for Experiment: {exp_namespace}/{exp_name}")
    except Exception as e:
        logging.info("---------------------------------------------------------------")
        logging.info(f"E2E is failed for Experiment: {exp_namespace}/{exp_name}")
        raise e
    finally:
        # Delete the Experiment.
        logging.info("---------------------------------------------------------------")
        logging.info("---------------------------------------------------------------")
        katib_client.delete_experiment(exp_name, exp_namespace)
b/test/e2e/v1beta1/scripts/gh-actions/setup-katib.sh @@ -93,10 +93,4 @@ if [ $? -ne 1 ]; then fi set -o errexit -# Build the binary for e2e test -echo "Building run-e2e-experiment for e2e test cases" -mkdir -p ../../bin -go build -o ../../bin/run-e2e-experiment ../../hack/gh-actions/run-e2e-experiment.go -chmod +x ../../bin/run-e2e-experiment - exit 0