From e160db8e8bfba0fdeab9f6a78ff84b4db095c6f3 Mon Sep 17 00:00:00 2001 From: Leonid Kozhinov Date: Sun, 17 Mar 2024 01:18:53 +0100 Subject: [PATCH 1/3] Use merge policy instead of fallbacks for KubeResourceConfig --- .../workers/config/KubeResourceConfig.java | 54 +++++++++++--- .../workers/config/WorkerConfigsProvider.java | 74 +++++++++---------- .../WorkerConfigProviderMicronautTest.java | 8 +- .../resources/application-config-test.yaml | 10 ++- .../src/main/resources/application.yml | 1 + 5 files changed, 93 insertions(+), 54 deletions(-) diff --git a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java index 49fbddcdc1b..07a3f5a2501 100644 --- a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java +++ b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java @@ -7,13 +7,20 @@ import io.micronaut.context.annotation.EachProperty; import io.micronaut.context.annotation.Parameter; +import java.util.Objects; +import java.util.Optional; + /** * Encapsulates the configuration that is specific to Kubernetes. This is meant for the - * WorkerConfigsProvider to be reading configs, not for direct use as fallback logic isn't + * WorkerConfigsProvider to be reading configs, not for direct use as merge logic isn't * implemented here. + * Important: we cannot distinguish between empty and non-existent environment variables + * in this context, so we treat empty and non-existing strings as the same for our update + * logic. We use the literal to represent an empty string. */ @EachProperty("airbyte.worker.kube-job-configs") -public final class KubeResourceConfig { +public final class KubeResourceConfig implements Cloneable { + public static final String EMPTY_VALUE = ""; private final String name; private String annotations; @@ -28,36 +35,57 @@ public KubeResourceConfig(@Parameter final String name) { this.name = name; } + public KubeResourceConfig clone() { + try { + return (KubeResourceConfig) super.clone(); + } catch (final CloneNotSupportedException e) { + // Unlikely, but in the worst case, we will get this error when running tests. + throw new RuntimeException(e); + } + } + + public KubeResourceConfig update(KubeResourceConfig other) { + annotations = useOtherIfEmpty(other.annotations, annotations); + labels = useOtherIfEmpty(other.labels, labels); + nodeSelectors = useOtherIfEmpty(other.nodeSelectors, nodeSelectors); + cpuLimit = useOtherIfEmpty(other.cpuLimit, cpuLimit); + cpuRequest = useOtherIfEmpty(other.cpuRequest, cpuRequest); + memoryLimit = useOtherIfEmpty(other.memoryLimit, memoryLimit); + memoryRequest = useOtherIfEmpty(other.memoryRequest, memoryRequest); + + return this; + } + public String getName() { return name; } public String getAnnotations() { - return annotations; + return resolveEmpty(annotations); } public String getLabels() { - return labels; + return resolveEmpty(labels); } public String getNodeSelectors() { - return nodeSelectors; + return resolveEmpty(nodeSelectors); } public String getCpuLimit() { - return cpuLimit; + return resolveEmpty(cpuLimit); } public String getCpuRequest() { - return cpuRequest; + return resolveEmpty(cpuRequest); } public String getMemoryLimit() { - return memoryLimit; + return resolveEmpty(memoryLimit); } public String getMemoryRequest() { - return memoryRequest; + return resolveEmpty(memoryRequest); } public void setAnnotations(final String annotations) { @@ -88,4 +116,12 @@ public void setMemoryRequest(final String memoryRequest) { this.memoryRequest = memoryRequest; } + private static String useOtherIfEmpty(final String value, final String defaultValue) { + return (value == null || value.isBlank()) ? defaultValue : value; + } + + private static String resolveEmpty(final String value) { + // Let's no return null values as it can be ambiguous + return (Objects.equals(value, EMPTY_VALUE)) ? "" : Optional.ofNullable(value).orElse(""); + } } diff --git a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java index 0f52a0e55fe..ecdeaf4c606 100644 --- a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java +++ b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java @@ -165,7 +165,6 @@ public static ResourceSubType fromValue(final String value) { @Singleton record WorkerConfigsDefaults(WorkerEnvironment workerEnvironment, - @Named("default") KubeResourceConfig defaultKubeResourceConfig, List jobKubeTolerations, @Value("${airbyte.worker.isolated.kube.node-selectors}") String isolatedNodeSelectors, @Value("${airbyte.worker.isolated.kube.use-custom-node-selector}") boolean useCustomNodeSelector, @@ -231,22 +230,13 @@ private WorkerConfigs getConfig(final KubeResourceKey key) { final Map isolatedNodeSelectors = splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors); validateIsolatedPoolConfigInitialization(workerConfigsDefaults.useCustomNodeSelector(), isolatedNodeSelectors); - // if annotations are not defined for this specific resource, then fallback to the default - // resource's annotations - final Map annotations; - if (Strings.isNullOrEmpty(kubeResourceConfig.getAnnotations())) { - annotations = splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations()); - } else { - annotations = splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations()); - } - return new WorkerConfigs( workerConfigsDefaults.workerEnvironment(), - getResourceRequirementsFrom(kubeResourceConfig, workerConfigsDefaults.defaultKubeResourceConfig()), + getResourceRequirementsFrom(kubeResourceConfig), workerConfigsDefaults.jobKubeTolerations(), splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()), workerConfigsDefaults.useCustomNodeSelector() ? Optional.of(isolatedNodeSelectors) : Optional.empty(), - annotations, + splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations()), splitKVPairsFromEnvString(kubeResourceConfig.getLabels()), workerConfigsDefaults.mainContainerImagePullSecret(), workerConfigsDefaults.mainContainerImagePullPolicy(), @@ -279,18 +269,20 @@ public ResourceRequirements getResourceRequirements(final ResourceRequirementsTy * Look up resource configs given a key. *

* We are storing configs in a tree like structure. Look up should be handled as such. Keeping in - * mind that we have defaults we want to fallback to, we should perform a complete scan of the - * configs until we find a match to make sure we do not overlook a match. + * mind that we have defaults we want to merge with, we should perform a complete scan of the + * configs, so we can update our result config. */ private Optional getKubeResourceConfig(final KubeResourceKey key) { - // Look up by actual variant - final var resultWithVariant = getKubeResourceConfigByType(kubeResourceConfigs.get(key.variant), key); - if (resultWithVariant.isPresent()) { - return resultWithVariant; + var defaultConfig = getKubeResourceConfigByType(kubeResourceConfigs.get(DEFAULT_VARIANT), key); + if (Objects.equals(key.variant, DEFAULT_VARIANT)) { // fast track + return defaultConfig; } - // no match with exact variant found, try again with the default. - return getKubeResourceConfigByType(kubeResourceConfigs.get(DEFAULT_VARIANT), key); + final var variantConfig = getKubeResourceConfigByType(kubeResourceConfigs.get(key.variant), key); + if (defaultConfig.isEmpty()) { + return variantConfig; + } + return variantConfig.map(kubeResourceConfig -> defaultConfig.get().clone().update(kubeResourceConfig)).or(() -> defaultConfig); } private static Optional getKubeResourceConfigByType( @@ -300,14 +292,16 @@ private static Optional getKubeResourceConfigByType( return Optional.empty(); } - // Look up by actual type - final var resultWithType = getKubeResourceConfigBySubType(configs.get(key.type), key); - if (resultWithType.isPresent()) { - return resultWithType; + var defaultConfig = getKubeResourceConfigBySubType(configs.get(ResourceType.DEFAULT), key); + if (Objects.equals(key.type, ResourceType.DEFAULT)) { // fast track + return defaultConfig; } - // no match with exact type found, try again with the default. - return getKubeResourceConfigBySubType(configs.get(ResourceType.DEFAULT), key); + final var typeConfig = getKubeResourceConfigBySubType(configs.get(key.type), key); + if (defaultConfig.isEmpty()) { + return typeConfig; + } + return typeConfig.map(kubeResourceConfig -> defaultConfig.get().clone().update(kubeResourceConfig)).or(() -> defaultConfig); } private static Optional getKubeResourceConfigBySubType(final Map configBySubType, @@ -316,10 +310,16 @@ private static Optional getKubeResourceConfigBySubType(final return Optional.empty(); } - // Lookup by actual sub type - final var config = configBySubType.get(key.subType); - // if we didn't find a match, try again with the default - return Optional.ofNullable(config != null ? config : configBySubType.get(ResourceSubType.DEFAULT)); + var defaultConfig = Optional.ofNullable(configBySubType.get(ResourceSubType.DEFAULT)); + if (Objects.equals(key.subType, ResourceSubType.DEFAULT)) { // fast track + return defaultConfig; + } + + final var subTypeConfig = Optional.ofNullable(configBySubType.get(key.subType)); + if (defaultConfig.isEmpty()) { + return subTypeConfig; + } + return subTypeConfig.map(kubeResourceConfig -> defaultConfig.get().clone().update(kubeResourceConfig)).or(() -> defaultConfig); } private void validateIsolatedPoolConfigInitialization(final boolean useCustomNodeSelector, final Map isolatedNodeSelectors) { @@ -369,16 +369,12 @@ private Map splitKVPairsFromEnvString(final String input) { .collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim())); } - private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig, final KubeResourceConfig defaultConfig) { + private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig) { return new ResourceRequirements() - .withCpuLimit(useDefaultIfEmpty(kubeResourceConfig.getCpuLimit(), defaultConfig.getCpuLimit())) - .withCpuRequest(useDefaultIfEmpty(kubeResourceConfig.getCpuRequest(), defaultConfig.getCpuRequest())) - .withMemoryLimit(useDefaultIfEmpty(kubeResourceConfig.getMemoryLimit(), defaultConfig.getMemoryLimit())) - .withMemoryRequest(useDefaultIfEmpty(kubeResourceConfig.getMemoryRequest(), defaultConfig.getMemoryRequest())); - } - - private static String useDefaultIfEmpty(final String value, final String defaultValue) { - return (value == null || value.isBlank()) ? defaultValue : value; + .withCpuLimit(kubeResourceConfig.getCpuLimit()) + .withCpuRequest(kubeResourceConfig.getCpuRequest()) + .withMemoryLimit(kubeResourceConfig.getMemoryLimit()) + .withMemoryRequest(kubeResourceConfig.getMemoryRequest()); } } diff --git a/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java b/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java index 9c15cce824f..7daa1633c10 100644 --- a/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java +++ b/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java @@ -85,9 +85,9 @@ void checkWorkerConfigProvider() { final WorkerConfigs specKubeConfig = workerConfigsProvider.getConfig(ResourceType.SPEC); assertEquals("default cpu limit", specKubeConfig.getResourceRequirements().getCpuLimit()); - assertEquals("", specKubeConfig.getResourceRequirements().getCpuRequest()); + assertEquals("default cpu request", specKubeConfig.getResourceRequirements().getCpuRequest()); assertEquals("spec memory limit", specKubeConfig.getResourceRequirements().getMemoryLimit()); - assertEquals("", specKubeConfig.getResourceRequirements().getMemoryRequest()); + assertEquals("default memory request", specKubeConfig.getResourceRequirements().getMemoryRequest()); } @Test @@ -120,18 +120,22 @@ void testVariantLookups() { testVariant); final ResourceRequirements testSourceDatabase = workerConfigsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE, Optional.of( "database"), testVariant); + final WorkerConfigs checkConfig = workerConfigsProvider.getConfig(ResourceType.CHECK); + final WorkerConfigs specConfig = workerConfigsProvider.getConfig(ResourceType.SPEC); // Testing the variant override lookup assertEquals("5", testSourceApi.getCpuLimit()); assertEquals("10", testSourceDatabase.getCpuLimit()); assertEquals("default cpu limit", sourceApi.getCpuLimit()); assertEquals("default cpu limit", sourceDatabase.getCpuLimit()); + assertEquals(Map.of("check_annotation_key", "check_annotation_value"), checkConfig.getWorkerKubeAnnotations()); // Verifying the default inheritance assertEquals("0.5", sourceApi.getCpuRequest()); assertEquals("1", sourceDatabase.getCpuRequest()); assertEquals("", testSourceApi.getCpuRequest()); assertEquals("", testSourceDatabase.getCpuRequest()); + assertEquals(Map.of("default_annotation_key", "default_annotation_value"), specConfig.getWorkerKubeAnnotations()); } @Test diff --git a/airbyte-commons-with-dependencies/src/test/resources/application-config-test.yaml b/airbyte-commons-with-dependencies/src/test/resources/application-config-test.yaml index d46fb66cb0c..c567950e1db 100644 --- a/airbyte-commons-with-dependencies/src/test/resources/application-config-test.yaml +++ b/airbyte-commons-with-dependencies/src/test/resources/application-config-test.yaml @@ -5,11 +5,12 @@ airbyte: worker: kube-job-configs: default: + annotations: default_annotation_key=default_annotation_value cpu-limit: default cpu limit - cpu-request: ${JOB_MAIN_CONTAINER_CPU_REQUEST:} - memory-request: ${JOB_MAIN_CONTAINER_MEMORY_REQUEST:} + cpu-request: default cpu request + memory-request: default memory request check: - annotations: ${CHECK_JOB_KUBE_ANNOTATIONS:check annotations} + annotations: ${CHECK_JOB_KUBE_ANNOTATIONS:check_annotation_key=check_annotation_value} labels: ${CHECK_JOB_KUBE_LABELS:check labels} node-selectors: ${CHECK_JOB_KUBE_NODE_SELECTORS:check node-selectors} cpu-limit: ${CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT:check cpu limit} @@ -17,7 +18,7 @@ airbyte: memory-limit: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT:check mem limit} memory-request: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST:check mem request} spec: - annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:spec annotations} + annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:} labels: ${SPEC_JOB_KUBE_LABELS:spec labels} node-selectors: ${SPEC_JOB_KUBE_NODE_SELECTORS:spec node selectors} memory-limit: spec memory limit @@ -38,6 +39,7 @@ airbyte: cpu-request: 42 micronauttest-source: cpu-limit: 5 + cpu-request: # Disable inheritance, force empty value micronauttest-source-database: cpu-limit: 10 mappingtest-source-stderr: diff --git a/airbyte-container-orchestrator/src/main/resources/application.yml b/airbyte-container-orchestrator/src/main/resources/application.yml index 29d16fffe1e..b0db424bac4 100644 --- a/airbyte-container-orchestrator/src/main/resources/application.yml +++ b/airbyte-container-orchestrator/src/main/resources/application.yml @@ -105,6 +105,7 @@ airbyte: kube-job-configs: default: annotations: ${JOB_KUBE_ANNOTATIONS:} + labels: ${JOB_KUBE_LABELS:} node-selectors: ${JOB_KUBE_NODE_SELECTORS:} cpu-limit: ${JOB_MAIN_CONTAINER_CPU_LIMIT:} cpu-request: ${JOB_MAIN_CONTAINER_CPU_REQUEST:} From d0ca46387d9f3820c2b97f1fd78e060dec7e03e7 Mon Sep 17 00:00:00 2001 From: Leonid Kozhinov Date: Sun, 17 Mar 2024 02:42:20 +0100 Subject: [PATCH 2/3] Fix tests --- .../commons/workers/config/KubeResourceConfig.java | 12 ++++++------ .../config/WorkerConfigProviderMicronautTest.java | 9 ++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java index 07a3f5a2501..2c00bfcde10 100644 --- a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java +++ b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java @@ -6,20 +6,19 @@ import io.micronaut.context.annotation.EachProperty; import io.micronaut.context.annotation.Parameter; - import java.util.Objects; import java.util.Optional; /** * Encapsulates the configuration that is specific to Kubernetes. This is meant for the - * WorkerConfigsProvider to be reading configs, not for direct use as merge logic isn't - * implemented here. - * Important: we cannot distinguish between empty and non-existent environment variables - * in this context, so we treat empty and non-existing strings as the same for our update - * logic. We use the literal to represent an empty string. + * WorkerConfigsProvider to be reading configs, not for direct use as merge logic isn't implemented + * here. Important: we cannot distinguish between empty and non-existent environment variables in + * this context, so we treat empty and non-existing strings as the same for our update logic. We use + * the "<EMPTY>" literal to represent an empty string. */ @EachProperty("airbyte.worker.kube-job-configs") public final class KubeResourceConfig implements Cloneable { + public static final String EMPTY_VALUE = ""; private final String name; @@ -124,4 +123,5 @@ private static String resolveEmpty(final String value) { // Let's no return null values as it can be ambiguous return (Objects.equals(value, EMPTY_VALUE)) ? "" : Optional.ofNullable(value).orElse(""); } + } diff --git a/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java b/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java index 7daa1633c10..4917b85ce64 100644 --- a/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java +++ b/airbyte-commons-with-dependencies/src/test/java/io/airbyte/commons/workers/config/WorkerConfigProviderMicronautTest.java @@ -5,7 +5,6 @@ package io.airbyte.commons.workers.config; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import io.airbyte.commons.workers.config.WorkerConfigsProvider.ResourceType; import io.airbyte.config.ResourceRequirements; @@ -59,7 +58,7 @@ void verifyTestConfigIsLoaded() { @Test void testKubeConfigIsReadingAllTheFields() { assertEquals("check", checkKubeResourceConfig.getName()); - assertEquals("check annotations", checkKubeResourceConfig.getAnnotations()); + assertEquals("check_annotation_key=check_annotation_value", checkKubeResourceConfig.getAnnotations()); assertEquals("check labels", checkKubeResourceConfig.getLabels()); assertEquals("check node-selectors", checkKubeResourceConfig.getNodeSelectors()); assertEquals("check cpu limit", checkKubeResourceConfig.getCpuLimit()); @@ -71,11 +70,11 @@ void testKubeConfigIsReadingAllTheFields() { @Test void testDefaultFieldBehavior() { assertEquals("spec", specKubeResourceConfig.getName()); - assertEquals("spec annotations", specKubeResourceConfig.getAnnotations()); + assertEquals("", specKubeResourceConfig.getAnnotations()); assertEquals("spec labels", specKubeResourceConfig.getLabels()); assertEquals("spec node selectors", specKubeResourceConfig.getNodeSelectors()); - assertNull(specKubeResourceConfig.getCpuLimit()); - assertNull(specKubeResourceConfig.getCpuRequest()); + assertEquals("", specKubeResourceConfig.getCpuLimit()); + assertEquals("", specKubeResourceConfig.getCpuRequest()); assertEquals("spec memory limit", specKubeResourceConfig.getMemoryLimit()); assertEquals("", specKubeResourceConfig.getMemoryRequest()); } From 147f247216c6f863b2ad99e89625bc1ea18633a5 Mon Sep 17 00:00:00 2001 From: Leonid Kozhinov Date: Sun, 17 Mar 2024 03:06:59 +0100 Subject: [PATCH 3/3] Remove usage of clone --- .../workers/config/KubeResourceConfig.java | 29 +++++++------------ .../workers/config/WorkerConfigsProvider.java | 6 ++-- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java index 2c00bfcde10..b4039d745c5 100644 --- a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java +++ b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/KubeResourceConfig.java @@ -17,7 +17,7 @@ * the "<EMPTY>" literal to represent an empty string. */ @EachProperty("airbyte.worker.kube-job-configs") -public final class KubeResourceConfig implements Cloneable { +public final class KubeResourceConfig { public static final String EMPTY_VALUE = ""; @@ -34,25 +34,18 @@ public KubeResourceConfig(@Parameter final String name) { this.name = name; } - public KubeResourceConfig clone() { - try { - return (KubeResourceConfig) super.clone(); - } catch (final CloneNotSupportedException e) { - // Unlikely, but in the worst case, we will get this error when running tests. - throw new RuntimeException(e); - } - } + public KubeResourceConfig merge(KubeResourceConfig other) { + var merged = new KubeResourceConfig(name); - public KubeResourceConfig update(KubeResourceConfig other) { - annotations = useOtherIfEmpty(other.annotations, annotations); - labels = useOtherIfEmpty(other.labels, labels); - nodeSelectors = useOtherIfEmpty(other.nodeSelectors, nodeSelectors); - cpuLimit = useOtherIfEmpty(other.cpuLimit, cpuLimit); - cpuRequest = useOtherIfEmpty(other.cpuRequest, cpuRequest); - memoryLimit = useOtherIfEmpty(other.memoryLimit, memoryLimit); - memoryRequest = useOtherIfEmpty(other.memoryRequest, memoryRequest); + merged.setAnnotations(useOtherIfEmpty(annotations, other.annotations)); + merged.setLabels(useOtherIfEmpty(labels, other.labels)); + merged.setNodeSelectors(useOtherIfEmpty(nodeSelectors, other.nodeSelectors)); + merged.setCpuLimit(useOtherIfEmpty(cpuLimit, other.cpuLimit)); + merged.setCpuRequest(useOtherIfEmpty(cpuRequest, other.cpuRequest)); + merged.setMemoryLimit(useOtherIfEmpty(memoryLimit, other.memoryLimit)); + merged.setMemoryRequest(useOtherIfEmpty(memoryRequest, other.memoryRequest)); - return this; + return merged; } public String getName() { diff --git a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java index ecdeaf4c606..9ea6a88f720 100644 --- a/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java +++ b/airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java @@ -282,7 +282,7 @@ private Optional getKubeResourceConfig(final KubeResourceKey if (defaultConfig.isEmpty()) { return variantConfig; } - return variantConfig.map(kubeResourceConfig -> defaultConfig.get().clone().update(kubeResourceConfig)).or(() -> defaultConfig); + return variantConfig.map(kubeResourceConfig -> kubeResourceConfig.merge(defaultConfig.get())).or(() -> defaultConfig); } private static Optional getKubeResourceConfigByType( @@ -301,7 +301,7 @@ private static Optional getKubeResourceConfigByType( if (defaultConfig.isEmpty()) { return typeConfig; } - return typeConfig.map(kubeResourceConfig -> defaultConfig.get().clone().update(kubeResourceConfig)).or(() -> defaultConfig); + return typeConfig.map(kubeResourceConfig -> kubeResourceConfig.merge(defaultConfig.get())).or(() -> defaultConfig); } private static Optional getKubeResourceConfigBySubType(final Map configBySubType, @@ -319,7 +319,7 @@ private static Optional getKubeResourceConfigBySubType(final if (defaultConfig.isEmpty()) { return subTypeConfig; } - return subTypeConfig.map(kubeResourceConfig -> defaultConfig.get().clone().update(kubeResourceConfig)).or(() -> defaultConfig); + return subTypeConfig.map(kubeResourceConfig -> kubeResourceConfig.merge(defaultConfig.get())).or(() -> defaultConfig); } private void validateIsolatedPoolConfigInitialization(final boolean useCustomNodeSelector, final Map isolatedNodeSelectors) {