Skip to content

Commit

Permalink
Test continue as new in an update
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 30, 2023
1 parent 8b3be3b commit 7a62d03
Showing 1 changed file with 162 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.updateTest;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.*;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.CompletablePromise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UpdateTestContinueAsNew {

private static final Logger log = LoggerFactory.getLogger(UpdateTestContinueAsNew.class);

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkerOptions(WorkerOptions.newBuilder().build())
.setWorkflowTypes(TestUpdateWorkflowImpl.class)
.setActivityImplementations(new ActivityImpl())
.build();

@Test
public void testContinueAsNewInAUpdate() {
String workflowId = UUID.randomUUID().toString();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setWorkflowId(workflowId)
.build();
WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options);
// To execute workflow client.execute() would do. But we want to start workflow and immediately
// return.
WorkflowExecution execution = WorkflowClient.start(workflow::execute);

SDKTestWorkflowRule.waitForOKQuery(workflow);
assertEquals("initial", workflow.getState());

assertEquals(workflowId, execution.getWorkflowId());

assertEquals("Execute-Hello Update", workflow.update(0, "Hello Update"));
assertEquals("Execute-Hello Update 2", workflow.update(0, "Hello Update 2"));
// Complete should fail since we have not continued as new yet
assertThrows(WorkflowUpdateException.class, () -> workflow.complete());

// Send an update to continue as new, must be async since the update won't complete
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
workflowStub.startUpdate("update", String.class, 0, "");

testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId());
testWorkflowRule.invalidateWorkflowCache();

assertEquals("Execute-Hello Update", workflow.update(0, "Hello Update"));
assertEquals("Execute-Hello Update 2", workflow.update(0, "Hello Update 2"));

workflow.complete();

String result =
testWorkflowRule
.getWorkflowClient()
.newUntypedWorkflowStub(execution, Optional.empty())
.getResult(String.class);
assertEquals("Execute-Hello Update Execute-Hello Update 2", result);
}

public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate {
String state = "initial";
List<String> updates = new ArrayList<>();
CompletablePromise<Void> promise = Workflow.newPromise();
private final TestActivities.TestActivity1 activity =
Workflow.newActivityStub(
TestActivities.TestActivity1.class,
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofHours(1)).build());

@Override
public String execute() {
promise.get();
return updates.get(0) + " " + updates.get(1);
}

@Override
public String getState() {
return state;
}

@Override
public String update(Integer index, String value) {
if (value.isEmpty()) {
Workflow.newContinueAsNewStub(WorkflowWithUpdate.class).execute();
}
String result = activity.execute(value);
updates.add(result);
return result;
}

@Override
public void updateValidator(Integer index, String value) {}

@Override
public void complete() {
promise.complete(null);
}

@Override
public void completeValidator() {
if (updates.size() < 2 || !Workflow.getInfo().getContinuedExecutionRunId().isPresent()) {
throw new RuntimeException("Workflow not ready to complete");
}
}
}

@ActivityInterface
public interface GreetingActivities {
@ActivityMethod
String hello(String input);
}

public static class ActivityImpl implements TestActivities.TestActivity1 {
@Override
public String execute(String input) {
return Activity.getExecutionContext().getInfo().getActivityType() + "-" + input;
}
}
}

0 comments on commit 7a62d03

Please sign in to comment.