diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 5d6661d53..7b6add447 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -177,11 +177,11 @@ private Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflow } private Tracer.SpanBuilder createContinueAsNewWorkflowStartSpanBuilder(ContinueAsNewInput input) { - WorkflowInfo parentWorkflowInfo = Workflow.getInfo(); + WorkflowInfo continuedWorkflowInfo = Workflow.getInfo(); return spanFactory.createContinueAsNewWorkflowStartSpan( tracer, - MoreObjects.firstNonNull(input.getWorkflowType(), parentWorkflowInfo.getWorkflowType()), - parentWorkflowInfo.getWorkflowId(), - parentWorkflowInfo.getRunId()); + MoreObjects.firstNonNull(input.getWorkflowType(), continuedWorkflowInfo.getWorkflowType()), + continuedWorkflowInfo.getWorkflowId(), + continuedWorkflowInfo.getRunId()); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java index d39705ce2..53e43f5a2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java @@ -21,7 +21,7 @@ package io.temporal.internal.client; import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc; -import static io.temporal.internal.common.SerializerUtils.toRetryPolicy; +import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/RetryOptionsUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/RetryOptionsUtils.java index 4f670016a..11e36aa64 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/RetryOptionsUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/RetryOptionsUtils.java @@ -27,6 +27,7 @@ import io.temporal.failure.ApplicationFailure; import io.temporal.failure.ChildWorkflowFailure; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -95,4 +96,21 @@ public static RetryOptions toRetryOptions(RetryPolicy retryPolicy) { return roBuilder.validateBuildWithDefaults(); } + + public static RetryPolicy.Builder toRetryPolicy(RetryOptions retryOptions) { + RetryPolicy.Builder builder = + RetryPolicy.newBuilder() + .setInitialInterval( + ProtobufTimeUtils.toProtoDuration(retryOptions.getInitialInterval())) + .setMaximumInterval( + ProtobufTimeUtils.toProtoDuration(retryOptions.getMaximumInterval())) + .setBackoffCoefficient(retryOptions.getBackoffCoefficient()) + .setMaximumAttempts(retryOptions.getMaximumAttempts()); + + if (retryOptions.getDoNotRetry() != null) { + builder = builder.addAllNonRetryableErrorTypes(Arrays.asList(retryOptions.getDoNotRetry())); + } + + return builder; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/SerializerUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/SerializerUtils.java deleted file mode 100644 index 085ead46a..000000000 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/SerializerUtils.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this material except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.temporal.internal.common; - -import io.temporal.api.common.v1.RetryPolicy; -import io.temporal.common.RetryOptions; -import java.util.Arrays; - -public class SerializerUtils { - public static RetryPolicy.Builder toRetryPolicy(RetryOptions retryOptions) { - RetryPolicy.Builder builder = - RetryPolicy.newBuilder() - .setInitialInterval( - ProtobufTimeUtils.toProtoDuration(retryOptions.getInitialInterval())) - .setMaximumInterval( - ProtobufTimeUtils.toProtoDuration(retryOptions.getMaximumInterval())) - .setBackoffCoefficient(retryOptions.getBackoffCoefficient()) - .setMaximumAttempts(retryOptions.getMaximumAttempts()); - - if (retryOptions.getDoNotRetry() != null) { - builder = builder.addAllNonRetryableErrorTypes(Arrays.asList(retryOptions.getDoNotRetry())); - } - - return builder; - } -} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java index e6af6c1f5..3e0932b0b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java @@ -20,11 +20,14 @@ package io.temporal.internal.replay; +import static io.temporal.internal.common.RetryOptionsUtils.toRetryOptions; + import com.google.common.base.Preconditions; import com.google.protobuf.util.Timestamps; import io.temporal.api.common.v1.*; import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; +import io.temporal.common.RetryOptions; import io.temporal.internal.common.ProtobufTimeUtils; import java.time.Duration; import java.util.Map; @@ -146,4 +149,12 @@ public Payloads getLastCompletionResult() { public Failure getPreviousRunFailure() { return previousRunFailure; } + + @Nullable + public RetryOptions getRetryOptions() { + if (!startedAttributes.hasRetryPolicy()) { + return null; + } + return toRetryOptions(startedAttributes.getRetryPolicy()); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index d24a0764c..30a3eeb2c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -26,6 +26,7 @@ import io.temporal.api.common.v1.*; import io.temporal.api.failure.v1.Failure; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; +import io.temporal.common.RetryOptions; import io.temporal.internal.common.SdkFlag; import io.temporal.internal.statemachines.ExecuteActivityParameters; import io.temporal.internal.statemachines.ExecuteLocalActivityParameters; @@ -117,6 +118,9 @@ public Functions.Proc1 getCancellationHandle() { /** Workflow task queue name. */ String getTaskQueue(); + @Nullable + RetryOptions getRetryOptions(); + /** Workflow namespace. */ String getNamespace(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index 46cb9aaf1..8d5a99c23 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -30,6 +30,7 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; +import io.temporal.common.RetryOptions; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SdkFlag; @@ -141,6 +142,12 @@ public String getTaskQueue() { return basicWorkflowContext.getTaskQueue(); } + @Nullable + @Override + public RetryOptions getRetryOptions() { + return basicWorkflowContext.getRetryOptions(); + } + @Override public String getNamespace() { return basicWorkflowContext.getNamespace(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 986beb704..fb8ed08d0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -22,7 +22,7 @@ import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc; -import static io.temporal.internal.common.SerializerUtils.toRetryPolicy; +import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy; import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION; import com.google.common.base.MoreObjects; @@ -1108,6 +1108,11 @@ public void continueAsNew(ContinueAsNewInput input) { if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) { attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue())); } + if (options.getRetryOptions() != null) { + attributes.setRetryPolicy(toRetryPolicy(options.getRetryOptions())); + } else if (replayContext.getRetryOptions() != null) { + attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions())); + } Map searchAttributes = options.getSearchAttributes(); if (searchAttributes != null && !searchAttributes.isEmpty()) { if (options.getTypedSearchAttributes() != null) { @@ -1132,6 +1137,9 @@ public void continueAsNew(ContinueAsNewInput input) { .determineUseCompatibleFlag( replayContext.getTaskQueue().equals(options.getTaskQueue()))); } + } else if (replayContext.getRetryOptions() != null) { + // Have to copy retry options as server doesn't copy them. + attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions())); } List propagators = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java index bda3a60b3..fc546c951 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java @@ -22,6 +22,7 @@ import io.temporal.api.common.v1.SearchAttributes; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.common.RetryOptions; import io.temporal.internal.replay.ReplayWorkflowContext; import io.temporal.workflow.WorkflowInfo; import java.time.Duration; @@ -80,6 +81,12 @@ public String getTaskQueue() { return context.getTaskQueue(); } + @Nullable + @Override + public RetryOptions getRetryOptions() { + return context.getRetryOptions(); + } + @Override public Duration getWorkflowRunTimeout() { return context.getWorkflowRunTimeout(); diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java index 84455988c..1908d0846 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java @@ -20,6 +20,7 @@ package io.temporal.workflow; +import io.temporal.common.RetryOptions; import io.temporal.common.SearchAttributes; import io.temporal.common.VersioningIntent; import io.temporal.common.context.ContextPropagator; @@ -56,6 +57,7 @@ public static final class Builder { private Duration workflowRunTimeout; private String taskQueue; + private RetryOptions retryOptions; private Duration workflowTaskTimeout; private Map memo; private Map searchAttributes; @@ -71,6 +73,7 @@ private Builder(ContinueAsNewOptions options) { } this.workflowRunTimeout = options.workflowRunTimeout; this.taskQueue = options.taskQueue; + this.retryOptions = options.retryOptions; this.workflowTaskTimeout = options.workflowTaskTimeout; this.memo = options.getMemo(); this.searchAttributes = options.getSearchAttributes(); @@ -89,6 +92,11 @@ public Builder setTaskQueue(String taskQueue) { return this; } + public Builder setRetryOptions(RetryOptions retryOptions) { + this.retryOptions = retryOptions; + return this; + } + public Builder setWorkflowTaskTimeout(Duration workflowTaskTimeout) { this.workflowTaskTimeout = workflowTaskTimeout; return this; @@ -143,6 +151,7 @@ public ContinueAsNewOptions build() { return new ContinueAsNewOptions( workflowRunTimeout, taskQueue, + retryOptions, workflowTaskTimeout, memo, searchAttributes, @@ -154,6 +163,7 @@ public ContinueAsNewOptions build() { private final @Nullable Duration workflowRunTimeout; private final @Nullable String taskQueue; + private final @Nullable RetryOptions retryOptions; private final @Nullable Duration workflowTaskTimeout; private final @Nullable Map memo; private final @Nullable Map searchAttributes; @@ -164,6 +174,7 @@ public ContinueAsNewOptions build() { public ContinueAsNewOptions( @Nullable Duration workflowRunTimeout, @Nullable String taskQueue, + @Nullable RetryOptions retryOptions, @Nullable Duration workflowTaskTimeout, @Nullable Map memo, @Nullable Map searchAttributes, @@ -172,6 +183,7 @@ public ContinueAsNewOptions( @Nullable VersioningIntent versioningIntent) { this.workflowRunTimeout = workflowRunTimeout; this.taskQueue = taskQueue; + this.retryOptions = retryOptions; this.workflowTaskTimeout = workflowTaskTimeout; this.memo = memo; this.searchAttributes = searchAttributes; @@ -188,6 +200,11 @@ public ContinueAsNewOptions( return taskQueue; } + @Nullable + public RetryOptions getRetryOptions() { + return retryOptions; + } + public @Nullable Duration getWorkflowTaskTimeout() { return workflowTaskTimeout; } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java index 0dea22152..cff44064b 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java @@ -21,6 +21,7 @@ package io.temporal.workflow; import io.temporal.api.common.v1.SearchAttributes; +import io.temporal.common.RetryOptions; import java.time.Duration; import java.util.Optional; import javax.annotation.Nonnull; @@ -89,6 +90,9 @@ public interface WorkflowInfo { */ String getTaskQueue(); + @Nullable + RetryOptions getRetryOptions(); + /** * @return Timeout for a Workflow Run specified during Workflow start in {@link * io.temporal.client.WorkflowOptions.Builder#setWorkflowRunTimeout(Duration)} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/common/RetryOptionsUtilsTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/RetryOptionsUtilsTest.java index 511224e17..c54cdd7ef 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/common/RetryOptionsUtilsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/common/RetryOptionsUtilsTest.java @@ -20,6 +20,7 @@ package io.temporal.internal.common; +import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy; import static org.junit.Assert.assertEquals; import io.temporal.api.common.v1.RetryPolicy; @@ -32,7 +33,7 @@ public class RetryOptionsUtilsTest { public void buildRetryOptions() { Duration initialInterval = Duration.ofSeconds(2); Duration maxInterval = Duration.ofSeconds(5); - RetryPolicy retryPolicy = + RetryPolicy retryPolicy1 = RetryPolicy.newBuilder() .setInitialInterval(ProtobufTimeUtils.toProtoDuration(initialInterval)) .setMaximumInterval(ProtobufTimeUtils.toProtoDuration(maxInterval)) @@ -41,11 +42,33 @@ public void buildRetryOptions() { .addNonRetryableErrorTypes(IllegalStateException.class.getName()) .build(); - RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(retryPolicy); + RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(retryPolicy1); assertEquals(initialInterval, retryOptions.getInitialInterval()); assertEquals(maxInterval, retryOptions.getMaximumInterval()); assertEquals(5, retryOptions.getMaximumAttempts()); assertEquals(2, retryOptions.getBackoffCoefficient(), 0.001); assertEquals(IllegalStateException.class.getName(), retryOptions.getDoNotRetry()[0]); + + assertEquals( + retryPolicy1.getInitialInterval().getSeconds(), + retryOptions.getInitialInterval().getSeconds()); + assertEquals( + retryPolicy1.getMaximumInterval().getSeconds(), + retryOptions.getMaximumInterval().getSeconds()); + assertEquals(retryPolicy1.getMaximumAttempts(), retryOptions.getMaximumAttempts()); + assertEquals(retryPolicy1.getBackoffCoefficient(), retryOptions.getBackoffCoefficient(), 0.001); + assertEquals(retryPolicy1.getNonRetryableErrorTypes(0), retryOptions.getDoNotRetry()[0]); + + RetryPolicy retryPolicy2 = toRetryPolicy(retryOptions).build(); + + assertEquals( + retryPolicy2.getInitialInterval().getSeconds(), + retryOptions.getInitialInterval().getSeconds()); + assertEquals( + retryPolicy2.getMaximumInterval().getSeconds(), + retryOptions.getMaximumInterval().getSeconds()); + assertEquals(retryPolicy2.getMaximumAttempts(), retryOptions.getMaximumAttempts()); + assertEquals(retryPolicy2.getBackoffCoefficient(), retryOptions.getBackoffCoefficient(), 0.001); + assertEquals(retryPolicy2.getNonRetryableErrorTypes(0), retryOptions.getDoNotRetry()[0]); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java index a7c791d24..63615de4a 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java @@ -22,8 +22,11 @@ import static org.junit.Assert.assertEquals; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.RetryOptions; import io.temporal.common.SearchAttributeKey; import io.temporal.common.SearchAttributes; +import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testing.internal.TracingWorkerInterceptor; import java.util.HashMap; @@ -34,15 +37,23 @@ public class ContinueAsNewTest { + public static final int INITIAL_COUNT = 4; + @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestContinueAsNewImpl.class).build(); @Test public void testContinueAsNew() { + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()); + options = + WorkflowOptions.newBuilder(options) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(10).build()) + .build(); TestContinueAsNew client = - testWorkflowRule.newWorkflowStubTimeoutOptions(TestContinueAsNew.class); - int result = client.execute(4, testWorkflowRule.getTaskQueue()); + testWorkflowRule.getWorkflowClient().newWorkflowStub(TestContinueAsNew.class, options); + int result = client.execute(INITIAL_COUNT, testWorkflowRule.getTaskQueue()); Assert.assertEquals(111, result); testWorkflowRule .getInterceptor(TracingWorkerInterceptor.class) @@ -75,12 +86,30 @@ public static class TestContinueAsNewImpl implements TestContinueAsNew { @Override public int execute(int count, String continueAsNewTaskQueue) { String taskQueue = Workflow.getInfo().getTaskQueue(); + if (count >= INITIAL_COUNT - 2) { + assertEquals(10, Workflow.getInfo().getRetryOptions().getMaximumAttempts()); + } else { + assertEquals(5, Workflow.getInfo().getRetryOptions().getMaximumAttempts()); + } if (count == 0) { assertEquals(continueAsNewTaskQueue, taskQueue); return 111; } Map memo = new HashMap<>(); memo.put("myKey", "MyValue"); + RetryOptions retryOptions = null; + // don't specify ContinueAsNewOptions on the first continue-as-new to test that RetryOptions + // are copied from the previous run. + if (count == INITIAL_COUNT) { + TestContinueAsNew next = Workflow.newContinueAsNewStub(TestContinueAsNew.class); + next.execute(count - 1, continueAsNewTaskQueue); + throw new RuntimeException("unreachable"); + } + // don't specify RetryOptions on the second continue-as-new to test that they are copied from + // the previous run. + if (count < INITIAL_COUNT - 1) { + retryOptions = RetryOptions.newBuilder().setMaximumAttempts(5).build(); + } SearchAttributes searchAttributes = SearchAttributes.newBuilder() .set(SearchAttributeKey.forKeyword("CustomKeywordField"), "foo1") @@ -88,6 +117,7 @@ public int execute(int count, String continueAsNewTaskQueue) { ContinueAsNewOptions options = ContinueAsNewOptions.newBuilder() .setTaskQueue(continueAsNewTaskQueue) + .setRetryOptions(retryOptions) .setMemo(memo) .setTypedSearchAttributes(searchAttributes) .build(); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 91438fcc4..f23bcd0ad 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -857,6 +857,9 @@ private static void startWorkflow( .setInput(request.getInput()) .setTaskQueue(request.getTaskQueue()) .setAttempt(1); + if (request.hasRetryPolicy()) { + a.setRetryPolicy(request.getRetryPolicy()); + } data.retryState.ifPresent( testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt())); a.setFirstExecutionRunId(data.firstExecutionRunId); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 479a047a8..a3a88b3ea 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -212,7 +212,7 @@ private interface UpdateProcedure { private StartWorkflowExecutionRequest overrideStartWorkflowExecutionRequest( StartWorkflowExecutionRequest r) { StartWorkflowExecutionRequest.Builder request = - validateStartWorkflowExecutionRequest(r).toBuilder(); + validateStartWorkflowExecutionRequest(r.toBuilder()); long executionTimeoutMillis = Durations.toMillis(request.getWorkflowExecutionTimeout()); if (executionTimeoutMillis == 0) { executionTimeoutMillis = DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS; @@ -248,8 +248,8 @@ private StartWorkflowExecutionRequest overrideStartWorkflowExecutionRequest( } /** Based on validateStartWorkflowExecutionRequest from historyEngine.go */ - private StartWorkflowExecutionRequest validateStartWorkflowExecutionRequest( - StartWorkflowExecutionRequest request) { + private StartWorkflowExecutionRequest.Builder validateStartWorkflowExecutionRequest( + StartWorkflowExecutionRequest.Builder request) { if (request.getRequestId().isEmpty()) { throw Status.INVALID_ARGUMENT.withDescription("Missing request ID.").asRuntimeException(); @@ -276,7 +276,7 @@ private StartWorkflowExecutionRequest validateStartWorkflowExecutionRequest( throw Status.INVALID_ARGUMENT.withDescription("Missing WorkflowType.").asRuntimeException(); } if (request.hasRetryPolicy()) { - validateAndOverrideRetryPolicy(request.getRetryPolicy()); + request.setRetryPolicy(validateAndOverrideRetryPolicy(request.getRetryPolicy())); } return request; } @@ -1331,8 +1331,10 @@ private void processFailWorkflowExecution( if (startRequest.hasMemo()) { continueAsNewAttr.setMemo(startRequest.getMemo()); } - workflow.action( - Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr.build(), workflowTaskCompletedId); + // TODO + ContinueAsNewWorkflowExecutionCommandAttributes coninueAsNewCommand = + continueAsNewAttr.build(); + workflow.action(Action.CONTINUE_AS_NEW, ctx, coninueAsNewCommand, workflowTaskCompletedId); workflowTaskStateMachine.getData().workflowCompleted = true; HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1); WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes = @@ -1342,6 +1344,7 @@ private void processFailWorkflowExecution( Optional.of(rs.getNextAttempt(Optional.of(failure))); service.continueAsNew( startRequest, + coninueAsNewCommand, continuedAsNewEventAttributes, continuedRetryState, identity, @@ -1463,14 +1466,15 @@ private void startNewCronRun( .setRetryPolicy(startRequest.getRetryPolicy()) .setLastCompletionResult(lastCompletionResult); lastFailure.ifPresent(builder::setFailure); - ContinueAsNewWorkflowExecutionCommandAttributes continueAsNewAttr = builder.build(); - workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, workflowTaskCompletedId); + ContinueAsNewWorkflowExecutionCommandAttributes continueAsNewCommandAttr = builder.build(); + workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewCommandAttr, workflowTaskCompletedId); workflowTaskStateMachine.getData().workflowCompleted = true; HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1); WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes = event.getWorkflowExecutionContinuedAsNewEventAttributes(); service.continueAsNew( startRequest, + continueAsNewCommandAttr, continuedAsNewEventAttributes, Optional.empty(), identity, @@ -1526,6 +1530,7 @@ private void processContinueAsNewWorkflowExecution( HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1); service.continueAsNew( startRequest, + d, event.getWorkflowExecutionContinuedAsNewEventAttributes(), workflow.getData().retryState, identity, diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index a7c1a1f66..2d7535532 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -30,6 +30,7 @@ import com.google.protobuf.util.Timestamps; import io.grpc.*; import io.grpc.stub.StreamObserver; +import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.Payloads; @@ -940,7 +941,8 @@ public void signalExternalWorkflowExecution( */ public String continueAsNew( StartWorkflowExecutionRequest previousRunStartRequest, - WorkflowExecutionContinuedAsNewEventAttributes a, + ContinueAsNewWorkflowExecutionCommandAttributes ca, + WorkflowExecutionContinuedAsNewEventAttributes ea, Optional retryState, String identity, ExecutionId continuedExecutionId, @@ -950,40 +952,45 @@ public String continueAsNew( StartWorkflowExecutionRequest.Builder startRequestBuilder = StartWorkflowExecutionRequest.newBuilder() .setRequestId(UUID.randomUUID().toString()) - .setWorkflowType(a.getWorkflowType()) - .setWorkflowRunTimeout(a.getWorkflowRunTimeout()) - .setWorkflowTaskTimeout(a.getWorkflowTaskTimeout()) + .setWorkflowType(ea.getWorkflowType()) + .setWorkflowRunTimeout(ea.getWorkflowRunTimeout()) + .setWorkflowTaskTimeout(ea.getWorkflowTaskTimeout()) .setNamespace(continuedExecutionId.getNamespace()) - .setTaskQueue(a.getTaskQueue()) + .setTaskQueue(ea.getTaskQueue()) .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId()) .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy()) .setIdentity(identity) .setCronSchedule(previousRunStartRequest.getCronSchedule()); - if (previousRunStartRequest.hasRetryPolicy()) { - startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy()); + // TODO: Service doesn't perform this copy. + // See https://github.com/temporalio/temporal/issues/5249 + // if (previousRunStartRequest.hasRetryPolicy()) { + // startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy()); + // } + if (ca.hasRetryPolicy()) { + startRequestBuilder.setRetryPolicy(ca.getRetryPolicy()); } - if (a.hasInput()) { - startRequestBuilder.setInput(a.getInput()); + if (ea.hasInput()) { + startRequestBuilder.setInput(ea.getInput()); } - if (a.hasHeader()) { - startRequestBuilder.setHeader(a.getHeader()); + if (ea.hasHeader()) { + startRequestBuilder.setHeader(ea.getHeader()); } StartWorkflowExecutionRequest startRequest = startRequestBuilder.build(); lock.lock(); Optional lastFail = - a.hasFailure() - ? Optional.of(a.getFailure()) + ea.hasFailure() + ? Optional.of(ea.getFailure()) : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure); try { StartWorkflowExecutionResponse response = startWorkflowExecutionNoRunningCheckLocked( startRequest, - a.getNewExecutionRunId(), + ea.getNewExecutionRunId(), firstExecutionRunId, Optional.of(continuedExecutionId.getExecution().getRunId()), retryState, - ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()), - a.getLastCompletionResult(), + ProtobufTimeUtils.toJavaDuration(ea.getBackoffStartInterval()), + ea.getLastCompletionResult(), lastFail, parent, parentChildInitiatedEventId, diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index da018e4b0..18be08aaf 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -30,6 +30,7 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.common.v1.WorkflowType; import io.temporal.api.failure.v1.Failure; +import io.temporal.common.RetryOptions; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.SdkFlag; @@ -118,6 +119,12 @@ public String getTaskQueue() { return "dummy-task-queue"; } + @Nullable + @Override + public RetryOptions getRetryOptions() { + return null; + } + @Override public String getNamespace() { return "dummy-namespace";