From d832437f02f09c7455af812406c179c35814e8f2 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 19 Jan 2024 13:26:42 -0800 Subject: [PATCH] Change thread pool to a ScalingExecutorBuilder Signed-off-by: Daniel Widdis --- .../opensearch/flowframework/FlowFrameworkPlugin.java | 9 +++++---- .../DeprovisionWorkflowTransportActionTests.java | 8 ++++---- .../flowframework/workflow/DeployModelStepTests.java | 8 ++++---- .../flowframework/workflow/ProcessNodeTests.java | 8 ++++---- .../workflow/RegisterLocalCustomModelStepTests.java | 8 ++++---- .../workflow/RegisterLocalPretrainedModelStepTests.java | 8 ++++---- .../RegisterLocalSparseEncodingModelStepTests.java | 8 ++++---- .../workflow/WorkflowProcessSorterTests.java | 8 ++++---- 8 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 0fe7b724d..b646cf3bf 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -18,6 +18,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -64,7 +65,7 @@ import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -176,11 +177,11 @@ public List> getSettings() { @Override public List> getExecutorBuilders(Settings settings) { return List.of( - new FixedExecutorBuilder( - settings, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(settings), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 7841ed193..4d29fa827 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -25,7 +25,7 @@ import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -56,11 +56,11 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase private static ThreadPool threadPool = new TestThreadPool( DeprovisionWorkflowTransportActionTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java index 2d17da062..92d5be388 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java @@ -26,7 +26,7 @@ import org.opensearch.ml.common.MLTaskType; import org.opensearch.ml.common.transport.deploy.MLDeployModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -82,11 +82,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( DeployModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java index 56a8e715e..1f67d2b0b 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java @@ -12,7 +12,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -42,11 +42,11 @@ public class ProcessNodeTests extends OpenSearchTestCase { public static void setup() { testThreadPool = new TestThreadPool( ProcessNodeTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java index 942279e6a..010abcf2d 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java @@ -25,7 +25,7 @@ import org.opensearch.ml.common.transport.register.MLRegisterModelInput; import org.opensearch.ml.common.transport.register.MLRegisterModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -78,11 +78,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( RegisterLocalCustomModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java index afb76d92a..031967713 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java @@ -25,7 +25,7 @@ import org.opensearch.ml.common.transport.register.MLRegisterModelInput; import org.opensearch.ml.common.transport.register.MLRegisterModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -78,11 +78,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( RegisterLocalCustomModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java index 6756913ec..6cedf632b 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java @@ -25,7 +25,7 @@ import org.opensearch.ml.common.transport.register.MLRegisterModelInput; import org.opensearch.ml.common.transport.register.MLRegisterModelResponse; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -78,11 +78,11 @@ public void setUp() throws Exception { testThreadPool = new TestThreadPool( RegisterLocalCustomModelStepTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) ); diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 531283483..ef9dd5dcf 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -27,7 +27,7 @@ import org.opensearch.flowframework.model.WorkflowValidator; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.FixedExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -111,11 +111,11 @@ public static void setup() throws IOException { testThreadPool = new TestThreadPool( WorkflowProcessSorterTests.class.getName(), - new FixedExecutorBuilder( - Settings.EMPTY, + new ScalingExecutorBuilder( WORKFLOW_THREAD_POOL, + 1, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY), - 100, + TimeValue.timeValueMinutes(5), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ) );