diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTestContinueAsNew.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTestContinueAsNew.java new file mode 100644 index 000000000..2bc9ddf42 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTestContinueAsNew.java @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.updateTest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.*; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.CompletablePromise; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateTestContinueAsNew { + + private static final Logger log = LoggerFactory.getLogger(UpdateTestContinueAsNew.class); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkerOptions(WorkerOptions.newBuilder().build()) + .setWorkflowTypes(TestUpdateWorkflowImpl.class) + .setActivityImplementations(new ActivityImpl()) + .build(); + + @Test + public void testContinueAsNewInAUpdate() { + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .build(); + WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options); + // To execute workflow client.execute() would do. But we want to start workflow and immediately + // return. + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + SDKTestWorkflowRule.waitForOKQuery(workflow); + assertEquals("initial", workflow.getState()); + + assertEquals(workflowId, execution.getWorkflowId()); + + assertEquals("Execute-Hello Update", workflow.update(0, "Hello Update")); + assertEquals("Execute-Hello Update 2", workflow.update(0, "Hello Update 2")); + // Complete should fail since we have not continued as new yet + assertThrows(WorkflowUpdateException.class, () -> workflow.complete()); + + // Send an update to continue as new, must be async since the update won't complete + WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); + workflowStub.startUpdate("update", String.class, 0, ""); + + testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId()); + testWorkflowRule.invalidateWorkflowCache(); + + assertEquals("Execute-Hello Update", workflow.update(0, "Hello Update")); + assertEquals("Execute-Hello Update 2", workflow.update(0, "Hello Update 2")); + + workflow.complete(); + + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(execution, Optional.empty()) + .getResult(String.class); + assertEquals("Execute-Hello Update Execute-Hello Update 2", result); + } + + public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate { + String state = "initial"; + List updates = new ArrayList<>(); + CompletablePromise promise = Workflow.newPromise(); + private final TestActivities.TestActivity1 activity = + Workflow.newActivityStub( + TestActivities.TestActivity1.class, + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofHours(1)).build()); + + @Override + public String execute() { + promise.get(); + return updates.get(0) + " " + updates.get(1); + } + + @Override + public String getState() { + return state; + } + + @Override + public String update(Integer index, String value) { + if (value.isEmpty()) { + Workflow.newContinueAsNewStub(WorkflowWithUpdate.class).execute(); + } + String result = activity.execute(value); + updates.add(result); + return result; + } + + @Override + public void updateValidator(Integer index, String value) {} + + @Override + public void complete() { + promise.complete(null); + } + + @Override + public void completeValidator() { + if (updates.size() < 2 || !Workflow.getInfo().getContinuedExecutionRunId().isPresent()) { + throw new RuntimeException("Workflow not ready to complete"); + } + } + } + + @ActivityInterface + public interface GreetingActivities { + @ActivityMethod + String hello(String input); + } + + public static class ActivityImpl implements TestActivities.TestActivity1 { + @Override + public String execute(String input) { + return Activity.getExecutionContext().getInfo().getActivityType() + "-" + input; + } + } +}