From 7fe58da9d4c6c17547bdd6d3e25d57e2f25c9a5e Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 25 Sep 2023 15:08:10 -0700 Subject: [PATCH] Release WF slot on UnableToAcquireLockException --- .../internal/worker/WorkflowWorker.java | 55 ++++++++++--------- .../internal/worker/WorkflowWorkerTest.java | 2 +- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index f3087407a..413da1284 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -281,37 +281,38 @@ public void handle(WorkflowTask task) throws Exception { MDC.put(LoggerTag.RUN_ID, runId); boolean locked = false; - if (!Strings.isNullOrEmpty(stickyTaskQueueName)) { - // Serialize workflow task processing for a particular workflow run. - // This is used to make sure that query tasks and real workflow tasks - // are serialized when sticky is on. - // - // Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same run - // id waiting for a lock and consuming threads in case if lock is unavailable. - // - // Throws interrupted exception which is propagated. It's a correct way to handle it here. - // - // TODO 1: 5 seconds is chosen as a half of normal workflow task timeout. - // This value should be dynamically configured. - // TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes - // any sense? - // This MAYBE makes sense only if a previous workflow task timed out, it's still in - // progress on the worker and the next workflow task got picked up by the same exact - // worker from the general non-sticky task queue. - // Even in this case, this advice looks misleading, something else is going on - // (like an extreme network latency). - locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS); - - if (!locked) { - throw new UnableToAcquireLockException( - "Workflow lock for the run id hasn't been released by one of previous execution attempts, " - + "consider increasing workflow task timeout."); - } - } Stopwatch swTotal = workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start(); try { + if (!Strings.isNullOrEmpty(stickyTaskQueueName)) { + // Serialize workflow task processing for a particular workflow run. + // This is used to make sure that query tasks and real workflow tasks + // are serialized when sticky is on. + // + // Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same run + // id waiting for a lock and consuming threads in case if lock is unavailable. + // + // Throws interrupted exception which is propagated. It's a correct way to handle it here. + // + // TODO 1: 5 seconds is chosen as a half of normal workflow task timeout. + // This value should be dynamically configured. + // TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes + // any sense? + // This MAYBE makes sense only if a previous workflow task timed out, it's still in + // progress on the worker and the next workflow task got picked up by the same exact + // worker from the general non-sticky task queue. + // Even in this case, this advice looks misleading, something else is going on + // (like an extreme network latency). + locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS); + + if (!locked) { + throw new UnableToAcquireLockException( + "Workflow lock for the run id hasn't been released by one of previous execution attempts, " + + "consider increasing workflow task timeout."); + } + } + Optional nextWFTResponse = Optional.of(workflowTaskResponse); do { PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get(); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index 87de87d9b..b6c2bfe7f 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -189,7 +189,7 @@ public void concurrentPollRequestLockTest() throws Exception { ImmutableMap.of("worker_type", "WorkflowWorker"), 100.0); // Cleanup - worker.shutdown(new ShutdownManager(), true).get(); + worker.shutdown(new ShutdownManager(), false).get(); // Verify we only handled two tasks verify(taskHandler, times(2)).handleWorkflowTask(any()); }