diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index 65dd813169..dd18849e33 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -163,9 +163,8 @@ public WorkflowTaskResult handleWorkflowTask( } handleWorkflowTaskImpl(workflowTask, historyIterator); - if (!context.isWorkflowMethodCompleted()) { - processLocalActivityRequests(wftHearbeatDeadline); - } + processLocalActivityRequests(wftHearbeatDeadline); + List commands = workflowStateMachines.takeCommands(); List messages = workflowStateMachines.takeMessages(); EnumSet newFlags = workflowStateMachines.takeNewSdkFlags(); @@ -313,6 +312,12 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline) throws InterruptedException, Throwable { while (true) { + // Scheduling or handling any local activities after the workflow method has returned + // can result in commands being generated after the CompleteWorkflowExecution command + // which the server does not allow. + if (context.isWorkflowMethodCompleted()) { + break; + } List laRequests = workflowStateMachines.takeLocalActivityRequests(); localActivityTaskCount += laRequests.size(); @@ -363,7 +368,8 @@ private void processLocalActivityRequests(Deadline wftHeartbeatDeadline) // it's safe to call and discard the result of takeLocalActivityRequests() here, because if it's // not empty - we are in trouble anyway Preconditions.checkState( - workflowStateMachines.takeLocalActivityRequests().isEmpty(), + workflowStateMachines.takeLocalActivityRequests().isEmpty() + || context.isWorkflowMethodCompleted(), "[BUG] Local activities requests from the last event loop were not drained " + "and accounted in the outstanding local activities counter"); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/CommandInTheLastWorkflowTaskTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/CommandInTheLastWorkflowTaskTest.java index 5cf9bb651d..2973a512f5 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/CommandInTheLastWorkflowTaskTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/CommandInTheLastWorkflowTaskTest.java @@ -53,7 +53,6 @@ public String execute() { Workflow.mutableSideEffect( "id1", Integer.class, (o, n) -> Objects.equals(n, o), () -> 0); }); - return "done"; } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityInTheLastWorkflowTaskTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityInTheLastWorkflowTaskTest.java index 66b0db8409..7e68084dcd 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityInTheLastWorkflowTaskTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityInTheLastWorkflowTaskTest.java @@ -24,14 +24,16 @@ import io.temporal.activity.LocalActivityOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.workflow.Async; -import io.temporal.workflow.Workflow; +import io.temporal.workflow.*; import io.temporal.workflow.shared.TestActivities; -import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(JUnitParamsRunner.class) public class LocalActivityInTheLastWorkflowTaskTest { @Rule public SDKTestWorkflowRule testWorkflowRule = @@ -41,13 +43,19 @@ public class LocalActivityInTheLastWorkflowTaskTest { .build(); @Test - public void testLocalActivityInTheLastWorkflowTask() { - TestWorkflows.TestWorkflowReturnString client = - testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class); - assertEquals("done", client.execute()); + @Parameters({"true", "false"}) + public void testLocalActivityInTheLastWorkflowTask(boolean blockOnLA) { + TestWorkflow client = testWorkflowRule.newWorkflowStub(TestWorkflow.class); + assertEquals("done", client.execute(blockOnLA)); } - public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString { + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String execute(boolean blockOnLA); + } + + public static class TestWorkflowImpl implements TestWorkflow { private final TestActivities.VariousTestActivities activities = Workflow.newLocalActivityStub( @@ -57,7 +65,12 @@ public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturn .build()); @Override - public String execute() { + public String execute(boolean blockOnLA) { + if (blockOnLA) { + Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0); + Async.procedure(activities::sleepActivity, (long) 1000, 0); + promise.get(); + } Async.procedure(activities::sleepActivity, (long) 1000, 0); return "done"; } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalWithLocalActivityInTheLastWorkflowTaskTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalWithLocalActivityInTheLastWorkflowTaskTest.java index 069935029f..fce2a3ca0d 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalWithLocalActivityInTheLastWorkflowTaskTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalWithLocalActivityInTheLastWorkflowTaskTest.java @@ -25,13 +25,16 @@ import io.temporal.activity.LocalActivityOptions; import io.temporal.client.WorkflowStub; import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.workflow.Workflow; +import io.temporal.workflow.*; import io.temporal.workflow.shared.TestActivities; -import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(JUnitParamsRunner.class) public class SignalWithLocalActivityInTheLastWorkflowTaskTest { @Rule public SDKTestWorkflowRule testWorkflowRule = @@ -41,15 +44,25 @@ public class SignalWithLocalActivityInTheLastWorkflowTaskTest { .build(); @Test - public void testSignalWithLocalActivityInTheLastWorkflowTask() { - TestWorkflows.TestSignaledWorkflow client = - testWorkflowRule.newWorkflowStub(TestWorkflows.TestSignaledWorkflow.class); + @Parameters({"true", "false"}) + public void testSignalWithLocalActivityInTheLastWorkflowTask(Boolean waitOnLA) { + TestSignaledWorkflow client = testWorkflowRule.newWorkflowStub(TestSignaledWorkflow.class); WorkflowStub.fromTyped(client) - .signalWithStart("testSignal", new String[] {"signalValue"}, new String[] {}); + .signalWithStart("testSignal", new String[] {"signalValue"}, new Boolean[] {waitOnLA}); assertEquals("done", client.execute()); } - public static class TestSignalWorkflowImpl implements TestWorkflows.TestSignaledWorkflow { + @WorkflowInterface + public interface TestSignaledWorkflow { + + @WorkflowMethod + String execute(); + + @SignalMethod + void signal(boolean waitOnLA); + } + + public static class TestSignalWorkflowImpl implements TestSignaledWorkflow { private final TestActivities.VariousTestActivities activities = Workflow.newLocalActivityStub( @@ -64,7 +77,13 @@ public String execute() { } @Override - public void signal(String arg) { + public void signal(boolean waitOnLA) { + if (waitOnLA) { + Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0); + Async.procedure(activities::sleepActivity, (long) 1000, 0); + promise.get(); + } + activities.sleepActivity(1000, 0); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityInTheLastWorkflowTaskTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityInTheLastWorkflowTaskTest.java index f3c084c2aa..f27861cdb9 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityInTheLastWorkflowTaskTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityInTheLastWorkflowTaskTest.java @@ -25,33 +25,36 @@ import io.temporal.activity.LocalActivityOptions; import io.temporal.client.WorkflowStub; import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.workflow.Workflow; +import io.temporal.workflow.*; import io.temporal.workflow.shared.TestActivities; -import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(JUnitParamsRunner.class) public class UpdateWithLocalActivityInTheLastWorkflowTaskTest { @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes(TestSimpleWorkflowWithUpdateImpl.class) + .setWorkflowTypes(WorkflowWithUpdateImpl.class) .setActivityImplementations(new TestActivities.TestActivitiesImpl()) .build(); @Test - public void testUpdateWithLocalActivityInTheLastWorkflowTask() throws InterruptedException { - TestWorkflows.SimpleWorkflowWithUpdate client = - testWorkflowRule.newWorkflowStub(TestWorkflows.SimpleWorkflowWithUpdate.class); + @Parameters({"true", "false"}) + public void testUpdateWithLocalActivityInTheLastWorkflowTask(Boolean waitOnLA) + throws InterruptedException { + WorkflowWithUpdate client = testWorkflowRule.newWorkflowStub(WorkflowWithUpdate.class); WorkflowStub.fromTyped(client).start(); Thread asyncUpdate = new Thread( () -> { try { - System.out.println("Sending update"); - client.update("Update"); + client.update(waitOnLA); } catch (Exception e) { } }); @@ -60,8 +63,17 @@ public void testUpdateWithLocalActivityInTheLastWorkflowTask() throws Interrupte asyncUpdate.interrupt(); } - public static class TestSimpleWorkflowWithUpdateImpl - implements TestWorkflows.SimpleWorkflowWithUpdate { + @WorkflowInterface + public interface WorkflowWithUpdate { + + @WorkflowMethod + String execute(); + + @UpdateMethod + String update(Boolean waitOnLA); + } + + public static class WorkflowWithUpdateImpl implements WorkflowWithUpdate { Boolean finish = false; private final TestActivities.VariousTestActivities activities = @@ -78,7 +90,12 @@ public String execute() { } @Override - public String update(String value) { + public String update(Boolean waitOnLA) { + if (waitOnLA) { + Promise promise = Async.procedure(activities::sleepActivity, (long) 100, 0); + Async.procedure(activities::sleepActivity, (long) 1000, 0); + promise.get(); + } finish = true; activities.sleepActivity(1000, 0); return "update";