From 26393687684f7ef0d60e0890b355172ab6405717 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sun, 15 Oct 2023 15:09:44 -0700 Subject: [PATCH] Add start delay --- .../io/temporal/client/WorkflowOptions.java | 41 ++++++++++++++++--- .../client/WorkflowClientRequestFactory.java | 8 ++++ .../client/functional/SignalTest.java | 40 ++++++++++++++++++ .../temporal/client/functional/StartTest.java | 34 +++++++++++++++ .../internal/testservice/StateMachines.java | 8 ++++ .../testservice/TestWorkflowService.java | 4 ++ 6 files changed, 130 insertions(+), 5 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java index 578168ac4..c601a5daa 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java @@ -23,6 +23,7 @@ import com.google.common.base.Objects; import io.temporal.api.enums.v1.WorkflowIdReusePolicy; import io.temporal.common.CronSchedule; +import io.temporal.common.Experimental; import io.temporal.common.MethodRetry; import io.temporal.common.RetryOptions; import io.temporal.common.SearchAttributes; @@ -76,6 +77,7 @@ public static WorkflowOptions merge( .setTypedSearchAttributes(o.getTypedSearchAttributes()) .setContextPropagators(o.getContextPropagators()) .setDisableEagerExecution(o.isDisableEagerExecution()) + .setStartDelay(o.getStartDelay()) .validateBuildWithDefaults(); } @@ -107,6 +109,8 @@ public static final class Builder { private boolean disableEagerExecution = true; + private Duration startDelay; + private Builder() {} private Builder(WorkflowOptions options) { @@ -126,6 +130,7 @@ private Builder(WorkflowOptions options) { this.typedSearchAttributes = options.typedSearchAttributes; this.contextPropagators = options.contextPropagators; this.disableEagerExecution = options.disableEagerExecution; + this.startDelay = options.startDelay; } /** @@ -344,6 +349,18 @@ public Builder setDisableEagerExecution(boolean disableEagerExecution) { return this; } + /** + * Time to wait before dispatching the first workflow task. If the workflow gets a signal before + * the delay, a workflow task will be dispatched and the rest of the delay will be ignored. A + * signal from signal with start will not trigger a workflow task. Cannot be set the same time + * as a CronSchedule. + */ + @Experimental + public Builder setStartDelay(Duration startDelay) { + this.startDelay = startDelay; + return this; + } + public WorkflowOptions build() { return new WorkflowOptions( workflowId, @@ -358,7 +375,8 @@ public WorkflowOptions build() { searchAttributes, typedSearchAttributes, contextPropagators, - disableEagerExecution); + disableEagerExecution, + startDelay); } /** @@ -378,7 +396,8 @@ public WorkflowOptions validateBuildWithDefaults() { searchAttributes, typedSearchAttributes, contextPropagators, - disableEagerExecution); + disableEagerExecution, + startDelay); } } @@ -408,6 +427,8 @@ public WorkflowOptions validateBuildWithDefaults() { private final boolean disableEagerExecution; + private final Duration startDelay; + private WorkflowOptions( String workflowId, WorkflowIdReusePolicy workflowIdReusePolicy, @@ -421,7 +442,8 @@ private WorkflowOptions( Map searchAttributes, SearchAttributes typedSearchAttributes, List contextPropagators, - boolean disableEagerExecution) { + boolean disableEagerExecution, + Duration startDelay) { this.workflowId = workflowId; this.workflowIdReusePolicy = workflowIdReusePolicy; this.workflowRunTimeout = workflowRunTimeout; @@ -435,6 +457,7 @@ private WorkflowOptions( this.typedSearchAttributes = typedSearchAttributes; this.contextPropagators = contextPropagators; this.disableEagerExecution = disableEagerExecution; + this.startDelay = startDelay; } public String getWorkflowId() { @@ -498,6 +521,10 @@ public boolean isDisableEagerExecution() { return disableEagerExecution; } + public @Nullable Duration getStartDelay() { + return startDelay; + } + public Builder toBuilder() { return new Builder(this); } @@ -519,7 +546,8 @@ public boolean equals(Object o) { && Objects.equal(searchAttributes, that.searchAttributes) && Objects.equal(typedSearchAttributes, that.typedSearchAttributes) && Objects.equal(contextPropagators, that.contextPropagators) - && Objects.equal(disableEagerExecution, that.disableEagerExecution); + && Objects.equal(disableEagerExecution, that.disableEagerExecution) + && Objects.equal(startDelay, that.startDelay); } @Override @@ -537,7 +565,8 @@ public int hashCode() { searchAttributes, typedSearchAttributes, contextPropagators, - disableEagerExecution); + disableEagerExecution, + startDelay); } @Override @@ -572,6 +601,8 @@ public String toString() { + contextPropagators + ", disableEagerExecution=" + disableEagerExecution + + ", startDelay=" + + startDelay + '}'; } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java index 7545790ac..d39705ce2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java @@ -100,6 +100,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest( request.setMemo(memo); } + if (options.getStartDelay() != null) { + request.setWorkflowStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay())); + } + if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) { if (options.getTypedSearchAttributes() != null) { throw new IllegalArgumentException( @@ -170,6 +174,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut request.setHeader(startParameters.getHeader()); } + if (startParameters.hasWorkflowStartDelay()) { + request.setWorkflowStartDelay(startParameters.getWorkflowStartDelay()); + } + return request; } diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/SignalTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/SignalTest.java index 1bbe7ac95..75df89480 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/SignalTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/SignalTest.java @@ -20,12 +20,23 @@ package io.temporal.client.functional; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; import io.temporal.client.WorkflowNotFoundException; import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.internal.common.ProtobufTimeUtils; +import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; import org.junit.Rule; import org.junit.Test; @@ -56,6 +67,35 @@ public void signalCompletedWorkflow() { assertThrows(WorkflowNotFoundException.class, () -> workflow.signal("some-value")); } + @Test(timeout = 50000) + public void signalWithStartWithDelay() { + WorkflowOptions workflowOptions = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setStartDelay(Duration.ofSeconds(5)) + .build(); + WorkflowStub stubF = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestSignaledWorkflow", workflowOptions); + + WorkflowExecution workflowExecution = + stubF.signalWithStart("testSignal", new Object[] {"testArg"}, new Object[] {}); + + assertEquals("done", stubF.getResult(String.class)); + WorkflowExecutionHistory workflowExecutionHistory = + testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId()); + List workflowExecutionStartedEvents = + workflowExecutionHistory.getEvents().stream() + .filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes) + .map(x -> x.getWorkflowExecutionStartedEventAttributes()) + .collect(Collectors.toList()); + assertEquals(1, workflowExecutionStartedEvents.size()); + assertEquals( + Duration.ofSeconds(5), + ProtobufTimeUtils.toJavaDuration( + workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff())); + } + public static class QuickWorkflowWithSignalImpl implements TestWorkflows.TestSignaledWorkflow { @Override diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java index 4f837b732..094f9799f 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java @@ -24,12 +24,20 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowIdReusePolicy; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*; +import java.time.Duration; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -40,6 +48,32 @@ public class StartTest { public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestMultiArgWorkflowImpl.class).build(); + @Test + public void startWithDelay() { + WorkflowOptions workflowOptions = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setStartDelay(Duration.ofSeconds(5)) + .build(); + TestNoArgsWorkflowFunc stubF = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions); + assertResult("func", WorkflowClient.start(stubF::func)); + WorkflowExecution workflowExecution = WorkflowStub.fromTyped(stubF).getExecution(); + WorkflowExecutionHistory workflowExecutionHistory = + testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId()); + List workflowExecutionStartedEvents = + workflowExecutionHistory.getEvents().stream() + .filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes) + .map(x -> x.getWorkflowExecutionStartedEventAttributes()) + .collect(Collectors.toList()); + assertEquals(1, workflowExecutionStartedEvents.size()); + assertEquals( + Duration.ofSeconds(5), + ProtobufTimeUtils.toJavaDuration( + workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff())); + } + @Test public void startNoArgFuncWithRejectDuplicate() { WorkflowOptions workflowOptions = diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 58e0963ec..91438fcc4 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -841,6 +841,11 @@ private static void startWorkflow( .withDescription("negative workflowTaskTimeoutSeconds") .asRuntimeException(); } + if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription("CronSchedule and WorkflowStartDelay may not be used together.") + .asRuntimeException(); + } WorkflowExecutionStartedEventAttributes.Builder a = WorkflowExecutionStartedEventAttributes.newBuilder() @@ -860,6 +865,9 @@ private static void startWorkflow( if (data.lastCompletionResult != null) { a.setLastCompletionResult(data.lastCompletionResult); } + if (request.hasWorkflowStartDelay()) { + a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay()); + } data.lastFailure.ifPresent(a::setContinuedFailure); if (request.hasMemo()) { a.setMemo(request.getMemo()); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index bb00cead2..a7c1a1f66 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -883,6 +883,10 @@ public void signalWithStartWorkflowExecution( if (r.hasSearchAttributes()) { startRequest.setSearchAttributes(r.getSearchAttributes()); } + if (r.hasWorkflowStartDelay()) { + startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay()); + } + StartWorkflowExecutionResponse startResult = startWorkflowExecutionImpl( startRequest.build(),