From d1cedc8a8e8de9aae5a71df3859c9df28609a446 Mon Sep 17 00:00:00 2001 From: Sergey Sklarovs Date: Wed, 6 Nov 2024 17:50:06 +0100 Subject: [PATCH] fix: ENV VAR kv parsing to handle json values - JOB_KUBE_ANNOTATIONS can have JSON as value - This fixes the parsin of such values --- .../build.gradle.kts | 1 + .../commons/workers/config/EnvUtils.java | 66 +++++++++++++++++++ .../workers/config/WorkerConfigsProvider.java | 34 ++-------- .../commons/workers/config/EnvUtilsTest.java | 57 ++++++++++++++++ 4 files changed, 129 insertions(+), 29 deletions(-) create mode 100644 airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/EnvUtils.java create mode 100644 airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/EnvUtilsTest.java diff --git a/airbyte-commons-with-dependencies/build.gradle.kts b/airbyte-commons-with-dependencies/build.gradle.kts index 51b21cdac7e..c2932ffd4ad 100644 --- a/airbyte-commons-with-dependencies/build.gradle.kts +++ b/airbyte-commons-with-dependencies/build.gradle.kts @@ -18,4 +18,5 @@ dependencies { testImplementation(libs.mockito.core) testImplementation(libs.bundles.micronaut.test) + testImplementation(libs.assertj.core) } diff --git a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/EnvUtils.java b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/EnvUtils.java new file mode 100644 index 00000000000..71c53dbd9d5 --- /dev/null +++ b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/EnvUtils.java @@ -0,0 +1,66 @@ +package io.airbyte.commons.workers.config; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +class EnvUtils { + + private static final Pattern KEY_JSON_VALUE_PATTERN = Pattern.compile("(?[^=]+)\\s*?=\\s*?(?\\{[^=]+})\\s*,?\\s*"); + private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(?[^=]+)=(?[^=,]+),?"); + private static final String KEY_GROUP_ALIAS = "key"; + private static final String VALUE_GROUP_ALIAS = "value"; + + /** + * Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. + *
+ * The key and the value are separated by '='. + *

+ * For example:- The following represents two map entries + *

+ * - key1=value1,key2=value2 + *
+ * - key1={key11: value1},key2={key22: value2} + *
+ * - key1={key11: value11, key12: value12},key2={key21: value21, key22: value22} + * + * @param input string + * @return map containing kv pairs + */ + public static Map splitKVPairsFromEnvString(final String input) { + if (input == null || input.isBlank()) { + return Map.of(); + } + final Map jsonValuesMatchResult = match(input, KEY_JSON_VALUE_PATTERN); + return jsonValuesMatchResult.isEmpty() + ? getKVPairsMatchedWithSimplePattern(input) + : jsonValuesMatchResult; + } + + private static Map match(final String input, final Pattern pattern) { + final Matcher matcher = pattern.matcher(input); + final Map kvResult = new HashMap<>(); + while (matcher.find()) { + kvResult.put(matcher.group(KEY_GROUP_ALIAS).trim(), matcher.group(VALUE_GROUP_ALIAS).trim()); + } + return kvResult; + } + + private static Map getKVPairsMatchedWithSimplePattern(final String input) { + final Map stringMatchResult = match(input, KEY_VALUE_PATTERN); + if (stringMatchResult.isEmpty()) { + log.warn("No valid key value pairs found in the input string: {}", input); + return Collections.emptyMap(); + } + return stringMatchResult; + } + +} diff --git a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java index f54ed6f0eb6..65c40bcd6c4 100644 --- a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java +++ b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java @@ -4,7 +4,6 @@ package io.airbyte.commons.workers.config; -import com.google.common.base.Splitter; import com.google.common.base.Strings; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.ResourceRequirementsType; @@ -24,7 +23,6 @@ import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; /** @@ -210,25 +208,25 @@ private WorkerConfigs getConfig(final KubeResourceKey key) { .orElseThrow(() -> new NoSuchElementException(String.format("Unable to find config: {variant:%s, type:%s, subtype:%s}", key.variant, key.type, key.subType))); - final Map isolatedNodeSelectors = splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors); + final Map isolatedNodeSelectors = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors); validateIsolatedPoolConfigInitialization(workerConfigsDefaults.useCustomNodeSelector(), isolatedNodeSelectors); // if annotations are not defined for this specific resource, then fallback to the default // resource's annotations final Map annotations; if (Strings.isNullOrEmpty(kubeResourceConfig.getAnnotations())) { - annotations = splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations()); + annotations = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations()); } else { - annotations = splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations()); + annotations = EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations()); } return new WorkerConfigs( getResourceRequirementsFrom(kubeResourceConfig, workerConfigsDefaults.defaultKubeResourceConfig()), TolerationPOJO.getJobKubeTolerations(workerConfigsDefaults.jobKubeTolerations()), - splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()), + EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()), workerConfigsDefaults.useCustomNodeSelector() ? Optional.of(isolatedNodeSelectors) : Optional.empty(), annotations, - splitKVPairsFromEnvString(kubeResourceConfig.getLabels()), + EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getLabels()), workerConfigsDefaults.mainContainerImagePullSecret(), workerConfigsDefaults.mainContainerImagePullPolicy()); } @@ -323,28 +321,6 @@ private Optional parseKubeResourceKey(final String value) { return Optional.empty(); } - /** - * Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. The - * key and the value are separated by '='. - *

- * For example:- The following represents two map entries - *

- * key1=value1,key2=value2 - * - * @param input string - * @return map containing kv pairs - */ - private Map splitKVPairsFromEnvString(final String input) { - if (input == null || input.isBlank()) { - return Map.of(); - } - return Splitter.on(",") - .splitToStream(input) - .filter(s -> !Strings.isNullOrEmpty(s) && s.contains("=")) - .map(s -> s.split("=")) - .collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim())); - } - private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig, final KubeResourceConfig defaultConfig) { return new ResourceRequirements() .withCpuLimit(useDefaultIfEmpty(kubeResourceConfig.getCpuLimit(), defaultConfig.getCpuLimit())) diff --git a/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/EnvUtilsTest.java b/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/EnvUtilsTest.java new file mode 100644 index 00000000000..cb29292d630 --- /dev/null +++ b/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/EnvUtilsTest.java @@ -0,0 +1,57 @@ +package io.airbyte.commons.workers.config; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +class EnvUtilsTest { + + @ParameterizedTest + @MethodSource("splitKVPairsFromEnvString") + void splitKVPairsFromEnvString(String input, Map expected) { + final Map result = EnvUtils.splitKVPairsFromEnvString(input); + assertThat(result).isEqualTo(expected); + } + + private static Stream splitKVPairsFromEnvString() { + return Stream.of( + // unmatched + Arguments.of("key1", Collections.emptyMap()), + Arguments.of("key1,value", Collections.emptyMap()), + Arguments.of("key1-value", Collections.emptyMap()), + Arguments.of("key1:value", Collections.emptyMap()), + // matched k:v pairs + Arguments.of("key1=value1", Map.of("key1", "value1")), + Arguments.of("key1 = value1", Map.of("key1", "value1")), + Arguments.of("key1=value1,key2=value2", Map.of("key1", "value1", "key2", "value2")), + Arguments.of("key1 = value1, key2 = value2", Map.of("key1", "value1", "key2", "value2")), + // matched k:jsonV pairs + Arguments.of("key1={value1}", Map.of("key1", "{value1}")), + Arguments.of("key1={ value1 }", Map.of("key1", "{ value1 }")), + Arguments.of("key1 = { value1 }", Map.of("key1", "{ value1 }")), + Arguments.of("key1={value1},key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")), + Arguments.of("key1= {value1} , key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")), + Arguments.of("key1= {value1 } , key2= { value2}", Map.of("key1", "{value1 }", "key2", "{ value2}")), + Arguments.of("key1={key11: value11},key2={key22: value22}", Map.of( + "key1", "{key11: value11}", + "key2", "{key22: value22}" + )), + Arguments.of("key1={key11: value11, key12: value12},key2={key21: value21, key22: value22}", Map.of( + "key1", "{key11: value11, key12: value12}", + "key2", "{key21: value21, key22: value22}" + )), + Arguments.of("key1={key11: value11, key12: value12, key13: {key131: value131}}," + + "key2={key21: value21, key22: value22, key23: {key231: value231}}", Map.of( + "key1", "{key11: value11, key12: value12, key13: {key131: value131}}", + "key2", "{key21: value21, key22: value22, key23: {key231: value231}}" + )) + ); + } + +}