From 8ea2606e8fdb8f068cdf1d53f45c0307c8d10a25 Mon Sep 17 00:00:00 2001 From: "Ricardo M. Oliveira" Date: Tue, 19 Nov 2024 18:11:32 -0300 Subject: [PATCH] Backport fixes in kubeflow/pipelines#11075 Introduced back the functions to convert k8s size values to float, but moved to kfp.dsl.utils Signed-off-by: Ricardo M. Oliveira --- sdk/RELEASE.md | 1 + sdk/python/kfp/compiler/compiler_test.py | 18 +++++ sdk/python/kfp/compiler/compiler_utils.py | 78 +++++++++++++++++++ .../kfp/compiler/pipeline_spec_builder.py | 12 +++ .../pipeline_with_resource_spec.yaml | 12 ++- 5 files changed, 117 insertions(+), 4 deletions(-) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 171d83548eb..8aaea8e8e60 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -7,6 +7,7 @@ ## Deprecations ## Bug fixes and other changes +* Backport fixes in kubeflow/pipelines#11075. [\#11392])(https://github.com/kubeflow/pipelines/pull/11392) ## Documentation updates diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 2433f09bc6d..1097c06d521 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -3437,6 +3437,9 @@ def simple_pipeline(): self.assertEqual( '5', dict_format['deploymentSpec']['executors']['exec-return-1-2'] ['container']['resources']['resourceCpuLimit']) + self.assertEqual( + 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-2'] + ['container']['resources']['cpuLimit']) self.assertNotIn( 'memoryLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-2']['container']['resources']) @@ -3444,6 +3447,9 @@ def simple_pipeline(): self.assertEqual( '50G', dict_format['deploymentSpec']['executors']['exec-return-1-3'] ['container']['resources']['resourceMemoryLimit']) + self.assertEqual( + 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-3'] + ['container']['resources']['memoryLimit']) self.assertNotIn( 'cpuLimit', dict_format['deploymentSpec']['executors'] ['exec-return-1-3']['container']['resources']) @@ -3451,15 +3457,27 @@ def simple_pipeline(): self.assertEqual( '2', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceCpuRequest']) + self.assertEqual( + 2.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['cpuRequest']) self.assertEqual( '5', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceCpuLimit']) + self.assertEqual( + 5.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['cpuLimit']) self.assertEqual( '4G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceMemoryRequest']) + self.assertEqual( + 4.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['memoryRequest']) self.assertEqual( '50G', dict_format['deploymentSpec']['executors']['exec-return-1-4'] ['container']['resources']['resourceMemoryLimit']) + self.assertEqual( + 50.0, dict_format['deploymentSpec']['executors']['exec-return-1-4'] + ['container']['resources']['memoryLimit']) class TestPlatformConfig(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 421bef1ad2c..dc10665944f 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -15,9 +15,11 @@ import collections import copy +import re from typing import DefaultDict, Dict, List, Mapping, Set, Tuple, Union from kfp import dsl +from kfp.dsl import constants from kfp.dsl import for_loop from kfp.dsl import pipeline_channel from kfp.dsl import pipeline_context @@ -803,3 +805,79 @@ def recursive_replace_placeholders(data: Union[Dict, List], old_value: str, if isinstance(data, pipeline_channel.PipelineChannel): data = str(data) return new_value if data == old_value else data + + +def validate_cpu_request_limit_to_float(cpu: str) -> float: + """Validates cpu request/limit string and converts to its numeric float + value. + + Args: + cpu: CPU requests or limits. This string should be a number or a + number followed by an "m" to indicate millicores (1/1000). For + more information, see `Specify a CPU Request and a CPU Limit + `_. + + Raises: + ValueError if the cpu request/limit string value is invalid. + + Returns: + The numeric value (float) of the cpu request/limit. + """ + if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None: + raise ValueError( + 'Invalid cpu string. Should be float or integer, or integer' + ' followed by "m".') + + return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu) + + +def validate_memory_request_limit_to_float(memory: str) -> float: + """Validates memory request/limit string and converts to its numeric value. + + Args: + memory: Memory requests or limits. This string should be a number or + a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", + "Gi", "M", "Mi", "K", or "Ki". + + Raises: + ValueError if the memory request/limit string value is invalid. + + Returns: + The numeric value (float) of the memory request/limit. + """ + if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', + memory) is None: + raise ValueError( + 'Invalid memory string. Should be a number or a number ' + 'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", ' + '"Gi", "M", "Mi", "K", "Ki".') + + if memory.endswith('E'): + memory = float(memory[:-1]) * constants._E / constants._G + elif memory.endswith('Ei'): + memory = float(memory[:-2]) * constants._EI / constants._G + elif memory.endswith('P'): + memory = float(memory[:-1]) * constants._P / constants._G + elif memory.endswith('Pi'): + memory = float(memory[:-2]) * constants._PI / constants._G + elif memory.endswith('T'): + memory = float(memory[:-1]) * constants._T / constants._G + elif memory.endswith('Ti'): + memory = float(memory[:-2]) * constants._TI / constants._G + elif memory.endswith('G'): + memory = float(memory[:-1]) + elif memory.endswith('Gi'): + memory = float(memory[:-2]) * constants._GI / constants._G + elif memory.endswith('M'): + memory = float(memory[:-1]) * constants._M / constants._G + elif memory.endswith('Mi'): + memory = float(memory[:-2]) * constants._MI / constants._G + elif memory.endswith('K'): + memory = float(memory[:-1]) * constants._K / constants._G + elif memory.endswith('Ki'): + memory = float(memory[:-2]) * constants._KI / constants._G + else: + # By default interpret as a plain integer, in the unit of Bytes. + memory = float(memory) / constants._G + + return memory diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index afc014530fa..cef5f1e8794 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -648,17 +648,29 @@ def convert_to_placeholder(input_value: str) -> str: container_spec.resources.resource_cpu_request = ( convert_to_placeholder( task.container_spec.resources.cpu_request)) + container_spec.resources.cpu_request = compiler_utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( + task.container_spec.resources.cpu_request)) if task.container_spec.resources.cpu_limit is not None: container_spec.resources.resource_cpu_limit = ( convert_to_placeholder(task.container_spec.resources.cpu_limit)) + container_spec.resources.cpu_limit = compiler_utils.validate_cpu_request_limit_to_float( + cpu=convert_to_placeholder( + task.container_spec.resources.cpu_limit)) if task.container_spec.resources.memory_request is not None: container_spec.resources.resource_memory_request = ( convert_to_placeholder( task.container_spec.resources.memory_request)) + container_spec.resources.memory_request = compiler_utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( + task.container_spec.resources.memory_request)) if task.container_spec.resources.memory_limit is not None: container_spec.resources.resource_memory_limit = ( convert_to_placeholder( task.container_spec.resources.memory_limit)) + container_spec.resources.memory_limit = compiler_utils.validate_memory_request_limit_to_float( + memory=convert_to_placeholder( + task.container_spec.resources.memory_limit)) if task.container_spec.resources.accelerator_count is not None: container_spec.resources.accelerator.CopyFrom( pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec diff --git a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml index 380151586e9..81595db1803 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml @@ -62,13 +62,17 @@ deploymentSpec: resources: accelerator: count: '1' - type: tpu-v3 + type: 'tpu-v3' resourceCount: '1' - resourceType: tpu-v3 + resourceType: 'tpu-v3' + cpuLimit: 4.0 + cpuRequest: 2.0 + memoryLimit: 15.032385536 + memoryRequest: 4.294967296 resourceCpuLimit: '4' resourceCpuRequest: '2' - resourceMemoryLimit: 14Gi - resourceMemoryRequest: 4Gi + resourceMemoryLimit: '14Gi' + resourceMemoryRequest: '4Gi' pipelineInfo: description: A linear two-step pipeline with resource specification. name: two-step-pipeline-with-resource-spec