diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java index 87e359ebd..299162f22 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java @@ -38,6 +38,7 @@ import io.temporal.internal.statemachines.WorkflowStateMachines; import io.temporal.internal.worker.WorkflowExecutionException; import io.temporal.worker.MetricsType; +import io.temporal.worker.NonDeterministicException; import java.util.Optional; import javax.annotation.Nullable; @@ -140,7 +141,8 @@ public void handleWorkflowExecutionSignaled(HistoryEvent event) { WorkflowExecutionSignaledEventAttributes signalAttributes = event.getWorkflowExecutionSignaledEventAttributes(); if (context.isWorkflowMethodCompleted()) { - throw new IllegalStateException("Signal received after workflow is closed."); + throw new NonDeterministicException( + "Signal received after workflow is completed. Typically this is caused by a nondeterministic code change in a workflow or a change is what payloads data converters can handle"); } Optional input = signalAttributes.hasInput() ? Optional.of(signalAttributes.getInput()) : Optional.empty(); @@ -150,7 +152,7 @@ public void handleWorkflowExecutionSignaled(HistoryEvent event) { public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) { if (context.isWorkflowMethodCompleted()) { - throw new IllegalStateException("Update received after workflow is closed."); + throw new NonDeterministicException("Update received after workflow is completed."); } try { Message protocolMessage = updateMessage.getMessage(); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalContinueAsNewNonDeterminism.java b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalContinueAsNewNonDeterminism.java new file mode 100644 index 000000000..737d36448 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalContinueAsNewNonDeterminism.java @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.signalTests; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.NonDeterministicException; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class SignalContinueAsNewNonDeterminism { + private static final Semaphore workflowTaskProcessed = new Semaphore(1); + + private static final CompletableFuture continueAsNew = new CompletableFuture<>(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + WorkflowImplementationOptions.newBuilder() + .setFailWorkflowExceptionTypes(Throwable.class) + .build(), + TestSignaledWorkflowImpl.class) + .build(); + + @Test + public void testSignalContinueAsNewNonDeterminism() + throws ExecutionException, InterruptedException { + // Verify we report nondeterminism when a signal handler is nondeterministic and calls continue + // as new on replay + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .build(); + TestSignaledWorkflow client = + workflowClient.newWorkflowStub(TestSignaledWorkflow.class, options); + + WorkflowClient.start(client::execute, false); + for (int i = 0; i < 5; i++) { + workflowTaskProcessed.acquire(); + client.signal(); + } + continueAsNew.complete(true); + // Force replay, expected to fail with NonDeterministicException + testWorkflowRule.invalidateWorkflowCache(); + client.signal(); + WorkflowFailedException e = + Assert.assertThrows(WorkflowFailedException.class, () -> client.execute(false)); + assertThat(e.getCause(), is(instanceOf(ApplicationFailure.class))); + assertEquals( + NonDeterministicException.class.getName(), ((ApplicationFailure) e.getCause()).getType()); + } + + @WorkflowInterface + public interface TestSignaledWorkflow { + + @WorkflowMethod + String execute(boolean finish); + + @SignalMethod + void signal() throws ExecutionException, InterruptedException; + } + + public static class TestSignaledWorkflowImpl implements TestSignaledWorkflow { + + @Override + public String execute(boolean finish) { + Workflow.await(() -> finish); + return "finished"; + } + + @Override + public void signal() throws ExecutionException, InterruptedException { + // Intentionally introduce non determinism + if (continueAsNew.getNow(false)) { + Workflow.continueAsNew(true); + } + workflowTaskProcessed.release(); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalContinueAsNewWFTFailure.java b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalContinueAsNewWFTFailure.java new file mode 100644 index 000000000..5ebfb4819 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalContinueAsNewWFTFailure.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.signalTests; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class SignalContinueAsNewWFTFailure { + private static final Semaphore workflowTaskProcessed = new Semaphore(1); + + private static final CompletableFuture continueAsNew = new CompletableFuture<>(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestSignaledWorkflowImpl.class).build(); + + @Test + public void testSignalContinueAsNewAfterWFTFailure() + throws ExecutionException, InterruptedException { + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .build(); + TestSignaledWorkflow client = + workflowClient.newWorkflowStub(TestSignaledWorkflow.class, options); + + WorkflowClient.start(client::execute, false); + for (int i = 0; i < 5; i++) { + workflowTaskProcessed.acquire(); + client.signal(); + } + continueAsNew.complete(true); + + Assert.assertEquals("finished", client.execute(false)); + } + + @WorkflowInterface + public interface TestSignaledWorkflow { + + @WorkflowMethod + String execute(boolean finish); + + @SignalMethod + void signal() throws ExecutionException, InterruptedException; + } + + public static class TestSignaledWorkflowImpl implements TestSignaledWorkflow { + + @Override + public String execute(boolean finish) { + Workflow.await(() -> finish); + return "finished"; + } + + @Override + public void signal() throws ExecutionException, InterruptedException { + if (continueAsNew.getNow(false)) { + Workflow.continueAsNew(true); + } + workflowTaskProcessed.release(); + throw new RuntimeException("fail workflow task"); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateContinueAsNewNonDeterminism.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateContinueAsNewNonDeterminism.java new file mode 100644 index 000000000..49751c7f4 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateContinueAsNewNonDeterminism.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.updateTest; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.NonDeterministicException; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class UpdateContinueAsNewNonDeterminism { + private static final CompletableFuture continueAsNew = new CompletableFuture<>(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + WorkflowImplementationOptions.newBuilder() + .setFailWorkflowExceptionTypes(Throwable.class) + .build(), + TestUpdateWorkflowImpl.class) + .build(); + + @Test + public void testUpdateContinueAsNewNonDeterminism() + throws ExecutionException, InterruptedException { + // Verify we report nondeterminism when an update handler is nondeterministic and calls continue + // as new on replay + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .build(); + TestUpdateWorkflow client = workflowClient.newWorkflowStub(TestUpdateWorkflow.class, options); + + WorkflowClient.start(client::execute, false); + for (int i = 0; i < 5; i++) { + client.update(); + } + continueAsNew.complete(true); + // Force replay, expected to fail with NonDeterministicException + testWorkflowRule.invalidateWorkflowCache(); + // Use a signal here because an update would block + client.signal(); + WorkflowFailedException e = + Assert.assertThrows(WorkflowFailedException.class, () -> client.execute(false)); + assertThat(e.getCause(), is(instanceOf(ApplicationFailure.class))); + assertEquals( + NonDeterministicException.class.getName(), ((ApplicationFailure) e.getCause()).getType()); + } + + @WorkflowInterface + public interface TestUpdateWorkflow { + + @WorkflowMethod + String execute(boolean finish); + + @UpdateMethod + void update() throws ExecutionException, InterruptedException; + + @SignalMethod + void signal(); + } + + public static class TestUpdateWorkflowImpl implements TestUpdateWorkflow { + + @Override + public String execute(boolean finish) { + Workflow.await(() -> finish); + return "finished"; + } + + @Override + public void update() throws ExecutionException, InterruptedException { + // Intentionally introduce non determinism + if (continueAsNew.getNow(false)) { + Workflow.continueAsNew(true); + } + } + + @Override + public void signal() {} + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateContinueAsNewWFTFailure.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateContinueAsNewWFTFailure.java new file mode 100644 index 000000000..0f560810e --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateContinueAsNewWFTFailure.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.updateTest; + +import static org.junit.Assume.assumeTrue; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class UpdateContinueAsNewWFTFailure { + private static final Semaphore workflowTaskProcessed = new Semaphore(1); + + private static final CompletableFuture continueAsNew = new CompletableFuture<>(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestUpdateWorkflowImpl.class).build(); + + @Test + public void testUpdateContinueAsNewAfterWFTFailure() throws InterruptedException { + // TODO(https://github.com/temporalio/sdk-java/issues/1903) + assumeTrue("Test Server hangs here", SDKTestWorkflowRule.useExternalService); + + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .setWorkflowTaskTimeout(Duration.ofSeconds(1)) + .build(); + TestUpdateWorkflow client = workflowClient.newWorkflowStub(TestUpdateWorkflow.class, options); + + WorkflowClient.start(client::execute, false); + for (int i = 0; i < 5; i++) { + workflowTaskProcessed.acquire(); + // Start update in a separate thread to avoid blocking since admitted is not supported. + Thread asyncUpdate = + new Thread( + () -> { + try { + client.update(); + } catch (Exception e) { + } + }); + asyncUpdate.start(); + } + continueAsNew.complete(true); + + Assert.assertEquals("finished", client.execute(false)); + } + + @WorkflowInterface + public interface TestUpdateWorkflow { + + @WorkflowMethod + String execute(boolean finish); + + @UpdateMethod + void update() throws ExecutionException, InterruptedException; + } + + public static class TestUpdateWorkflowImpl implements TestUpdateWorkflow { + + @Override + public String execute(boolean finish) { + Workflow.await(() -> finish); + return "finished"; + } + + @Override + public void update() throws ExecutionException, InterruptedException { + if (continueAsNew.getNow(false)) { + Workflow.continueAsNew(true); + } + workflowTaskProcessed.release(); + throw new RuntimeException("fail workflow task"); + } + } +}