diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java index 858c8be3a6d..f2fefa4ca3b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java @@ -4,6 +4,7 @@ package io.airbyte.workers; +import com.google.common.annotations.VisibleForTesting; import datadog.trace.api.GlobalTracer; import datadog.trace.api.Tracer; import io.airbyte.commons.logging.LogClientManager; @@ -273,9 +274,19 @@ private void registerSync(final WorkerFactory factory, final MaxWorkersConfig ma private WorkerOptions getWorkerOptions(final int max) { return WorkerOptions.newBuilder() .setMaxConcurrentActivityExecutionSize(max) + .setMaxConcurrentWorkflowTaskExecutionSize(inferWorkflowExecSizeFromActivityExecutionSize(max)) .build(); } + @VisibleForTesting + static int inferWorkflowExecSizeFromActivityExecutionSize(final int max) { + // Divide by 5 seems to be a good ratio given current empirical observations + // Keeping floor at 2 to ensure we keep always return a valid value + final int floor = 2; + final int maxWorkflowSize = max / 5; + return Math.max(maxWorkflowSize, floor); + } + /** * Performs additional configuration of the Temporal service/connection. * diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/ApplicationInitializerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/ApplicationInitializerTest.java new file mode 100644 index 00000000000..0a535a46b38 --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/ApplicationInitializerTest.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +class ApplicationInitializerTest { + + @Test + void testInferWorkflowExecSizeFromActivityExecutionSize() { + assertEquals(2, ApplicationInitializer.inferWorkflowExecSizeFromActivityExecutionSize(10)); + + assertEquals(2, ApplicationInitializer.inferWorkflowExecSizeFromActivityExecutionSize(5)); + + assertEquals(2, ApplicationInitializer.inferWorkflowExecSizeFromActivityExecutionSize(1)); + + assertEquals(20, ApplicationInitializer.inferWorkflowExecSizeFromActivityExecutionSize(100)); + } + +}