From 4eaeb9edce429e14e9f615ae0e719c2aa02f1440 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 20 Aug 2024 13:13:04 -0700 Subject: [PATCH] Fix shutdown behavior of unstarted LA slot queue (#2196) --- .../LocalActivitySlotSupplierQueue.java | 57 ++++++++++--- .../internal/worker/LocalActivityWorker.java | 16 ++-- ...ityWorkerNoneRegisteredNotStartedTest.java | 75 +++++++++++++++++ .../LocalActivityWorkerNotStartedTest.java | 82 +++++++++++++++++++ 4 files changed, 213 insertions(+), 17 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNoneRegisteredNotStartedTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java index 4cf18e090..e2da9d832 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java @@ -23,14 +23,12 @@ import io.temporal.worker.tuning.LocalActivitySlotInfo; import io.temporal.worker.tuning.SlotPermit; import io.temporal.workflow.Functions; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class LocalActivitySlotSupplierQueue { +class LocalActivitySlotSupplierQueue implements Shutdownable { static final class QueuedLARequest { final boolean isRetry; final SlotReservationData data; @@ -47,10 +45,11 @@ static final class QueuedLARequest { private final Semaphore newExecutionsBackpressureSemaphore; private final TrackingSlotSupplier slotSupplier; private final Functions.Proc1 afterReservedCallback; - private final Thread queueThread; + private final ExecutorService queueThreadService; private static final Logger log = LoggerFactory.getLogger(LocalActivitySlotSupplierQueue.class.getName()); private volatile boolean running = true; + private volatile boolean wasEverStarted = false; LocalActivitySlotSupplierQueue( TrackingSlotSupplier slotSupplier, @@ -73,13 +72,13 @@ static final class QueuedLARequest { return 0; }); this.slotSupplier = slotSupplier; - this.queueThread = new Thread(this::processQueue, "LocalActivitySlotSupplierQueue"); - this.queueThread.start(); + this.queueThreadService = + Executors.newSingleThreadExecutor(r -> new Thread(r, "LocalActivitySlotSupplierQueue")); } private void processQueue() { try { - while (running) { + while (running || !requestQueue.isEmpty()) { QueuedLARequest request = requestQueue.take(); SlotPermit slotPermit; try { @@ -102,9 +101,9 @@ private void processQueue() { } } - void shutdown() { - running = false; - queueThread.interrupt(); + void start() { + wasEverStarted = true; + this.queueThreadService.submit(this::processQueue); } boolean waitOnBackpressure(@Nullable Long acceptanceTimeoutMs) throws InterruptedException { @@ -134,4 +133,40 @@ void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttem newExecutionsBackpressureSemaphore.release(); } } + + @Override + public boolean isShutdown() { + return queueThreadService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return queueThreadService.isTerminated(); + } + + @Override + public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) { + running = false; + if (requestQueue.isEmpty()) { + // Just interrupt the thread, so that if we're waiting on blocking take the thread will + // be interrupted and exit. Otherwise the loop will exit once the queue is empty. + queueThreadService.shutdownNow(); + } + + return interruptTasks + ? shutdownManager.shutdownExecutorNowUntimed( + queueThreadService, "LocalActivitySlotSupplierQueue") + : shutdownManager.shutdownExecutorUntimed( + queueThreadService, "LocalActivitySlotSupplierQueue"); + } + + @Override + public void awaitTermination(long timeout, TimeUnit unit) { + if (!wasEverStarted) { + // Not entirely clear why this is necessary, but await termination will hang the whole + // timeout duration if no task was ever submitted. + return; + } + ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout)); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index a15e0c876..106be05be 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -689,6 +689,7 @@ public boolean start() { false); this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); + this.slotQueue.start(); return true; } else { return false; @@ -698,9 +699,9 @@ public boolean start() { @Override public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) { if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) { - slotQueue.shutdown(); - return activityAttemptTaskExecutor + return slotQueue .shutdown(shutdownManager, interruptTasks) + .thenCompose(r -> activityAttemptTaskExecutor.shutdown(shutdownManager, interruptTasks)) .thenCompose( r -> shutdownManager.shutdownExecutor( @@ -717,21 +718,24 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean @Override public void awaitTermination(long timeout, TimeUnit unit) { - slotQueue.shutdown(); long timeoutMillis = unit.toMillis(timeout); - ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis); + long remainingTimeout = ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis); + ShutdownManager.awaitTermination(slotQueue, remainingTimeout); } @Override public boolean isShutdown() { - return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isShutdown(); + return activityAttemptTaskExecutor != null + && activityAttemptTaskExecutor.isShutdown() + && slotQueue.isShutdown(); } @Override public boolean isTerminated() { return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isTerminated() - && scheduledExecutor.isTerminated(); + && scheduledExecutor.isTerminated() + && slotQueue.isTerminated(); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNoneRegisteredNotStartedTest.java b/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNoneRegisteredNotStartedTest.java new file mode 100644 index 000000000..c0e5d91a5 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNoneRegisteredNotStartedTest.java @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker; + +import static org.junit.Assert.assertTrue; + +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.time.Instant; +import java.util.Set; +import org.junit.Rule; +import org.junit.Test; + +public class LocalActivityWorkerNoneRegisteredNotStartedTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(NothingWorkflowImpl.class) + .setWorkerOptions(WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build()) + // Don't start the worker + .setDoNotStart(true) + .build(); + + @Test + public void canShutDownProperlyWhenNotStarted() { + // Shut down the (never started) worker + Instant shutdownTime = Instant.now(); + testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown(); + testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS); + Set threadSet = Thread.getAllStackTraces().keySet(); + for (Thread thread : threadSet) { + if (thread.getName().contains("LocalActivitySlotSupplierQueue")) { + throw new RuntimeException("Thread should be terminated"); + } + } + Duration elapsed = Duration.between(shutdownTime, Instant.now()); + // Shutdown should not have taken long + assertTrue(elapsed.getSeconds() < 2); + } + + @WorkflowInterface + public interface NothingWorkflow { + @WorkflowMethod + void execute(); + } + + public static class NothingWorkflowImpl implements NothingWorkflow { + @Override + public void execute() { + Workflow.sleep(500); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java b/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java new file mode 100644 index 000000000..09032ba36 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/LocalActivityWorkerNotStartedTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker; + +import static org.junit.Assert.assertTrue; + +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.shared.TestActivities; +import java.time.Duration; +import java.time.Instant; +import java.util.Set; +import org.junit.Rule; +import org.junit.Test; + +public class LocalActivityWorkerNotStartedTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(NothingWorkflowImpl.class) + .setActivityImplementations(new NothingActivityImpl()) + .setWorkerOptions(WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build()) + // Don't start the worker + .setDoNotStart(true) + .build(); + + @Test + public void canShutDownProperlyWhenNotStarted() { + // Shut down the (never started) worker + Instant shutdownTime = Instant.now(); + testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown(); + testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS); + Set threadSet = Thread.getAllStackTraces().keySet(); + for (Thread thread : threadSet) { + if (thread.getName().contains("LocalActivitySlotSupplierQueue")) { + throw new RuntimeException("Thread should be terminated"); + } + } + Duration elapsed = Duration.between(shutdownTime, Instant.now()); + // Shutdown should not have taken long + assertTrue(elapsed.getSeconds() < 2); + } + + @WorkflowInterface + public interface NothingWorkflow { + @WorkflowMethod + void execute(); + } + + public static class NothingWorkflowImpl implements NothingWorkflow { + @Override + public void execute() { + Workflow.sleep(500); + } + } + + public static class NothingActivityImpl implements TestActivities.NoArgsActivity { + @Override + public void execute() {} + } +}