Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add start delay #1897

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Objects;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.common.CronSchedule;
import io.temporal.common.Experimental;
import io.temporal.common.MethodRetry;
import io.temporal.common.RetryOptions;
import io.temporal.common.SearchAttributes;
Expand Down Expand Up @@ -76,6 +77,7 @@ public static WorkflowOptions merge(
.setTypedSearchAttributes(o.getTypedSearchAttributes())
.setContextPropagators(o.getContextPropagators())
.setDisableEagerExecution(o.isDisableEagerExecution())
.setStartDelay(o.getStartDelay())
.validateBuildWithDefaults();
}

Expand Down Expand Up @@ -107,6 +109,8 @@ public static final class Builder {

private boolean disableEagerExecution = true;

private Duration startDelay;

private Builder() {}

private Builder(WorkflowOptions options) {
Expand All @@ -126,6 +130,7 @@ private Builder(WorkflowOptions options) {
this.typedSearchAttributes = options.typedSearchAttributes;
this.contextPropagators = options.contextPropagators;
this.disableEagerExecution = options.disableEagerExecution;
this.startDelay = options.startDelay;
}

/**
Expand Down Expand Up @@ -344,6 +349,18 @@ public Builder setDisableEagerExecution(boolean disableEagerExecution) {
return this;
}

/**
* Time to wait before dispatching the first workflow task. If the workflow gets a signal before
* the delay, a workflow task will be dispatched and the rest of the delay will be ignored. A
* signal from signal with start will not trigger a workflow task. Cannot be set the same time
* as a CronSchedule.
*/
@Experimental
public Builder setStartDelay(Duration startDelay) {
this.startDelay = startDelay;
return this;
}

public WorkflowOptions build() {
return new WorkflowOptions(
workflowId,
Expand All @@ -358,7 +375,8 @@ public WorkflowOptions build() {
searchAttributes,
typedSearchAttributes,
contextPropagators,
disableEagerExecution);
disableEagerExecution,
startDelay);
}

/**
Expand All @@ -378,7 +396,8 @@ public WorkflowOptions validateBuildWithDefaults() {
searchAttributes,
typedSearchAttributes,
contextPropagators,
disableEagerExecution);
disableEagerExecution,
startDelay);
}
}

Expand Down Expand Up @@ -408,6 +427,8 @@ public WorkflowOptions validateBuildWithDefaults() {

private final boolean disableEagerExecution;

private final Duration startDelay;

private WorkflowOptions(
String workflowId,
WorkflowIdReusePolicy workflowIdReusePolicy,
Expand All @@ -421,7 +442,8 @@ private WorkflowOptions(
Map<String, ?> searchAttributes,
SearchAttributes typedSearchAttributes,
List<ContextPropagator> contextPropagators,
boolean disableEagerExecution) {
boolean disableEagerExecution,
Duration startDelay) {
this.workflowId = workflowId;
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.workflowRunTimeout = workflowRunTimeout;
Expand All @@ -435,6 +457,7 @@ private WorkflowOptions(
this.typedSearchAttributes = typedSearchAttributes;
this.contextPropagators = contextPropagators;
this.disableEagerExecution = disableEagerExecution;
this.startDelay = startDelay;
}

public String getWorkflowId() {
Expand Down Expand Up @@ -498,6 +521,10 @@ public boolean isDisableEagerExecution() {
return disableEagerExecution;
}

public @Nullable Duration getStartDelay() {
return startDelay;
}

public Builder toBuilder() {
return new Builder(this);
}
Expand All @@ -519,7 +546,8 @@ public boolean equals(Object o) {
&& Objects.equal(searchAttributes, that.searchAttributes)
&& Objects.equal(typedSearchAttributes, that.typedSearchAttributes)
&& Objects.equal(contextPropagators, that.contextPropagators)
&& Objects.equal(disableEagerExecution, that.disableEagerExecution);
&& Objects.equal(disableEagerExecution, that.disableEagerExecution)
&& Objects.equal(startDelay, that.startDelay);
}

@Override
Expand All @@ -537,7 +565,8 @@ public int hashCode() {
searchAttributes,
typedSearchAttributes,
contextPropagators,
disableEagerExecution);
disableEagerExecution,
startDelay);
}

@Override
Expand Down Expand Up @@ -572,6 +601,8 @@ public String toString() {
+ contextPropagators
+ ", disableEagerExecution="
+ disableEagerExecution
+ ", startDelay="
+ startDelay
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
request.setMemo(memo);
}

if (options.getStartDelay() != null) {
request.setWorkflowStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay()));
}

if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) {
if (options.getTypedSearchAttributes() != null) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -170,6 +174,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut
request.setHeader(startParameters.getHeader());
}

if (startParameters.hasWorkflowStartDelay()) {
request.setWorkflowStartDelay(startParameters.getWorkflowStartDelay());
}

return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@

package io.temporal.client.functional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.client.WorkflowNotFoundException;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -56,6 +67,35 @@ public void signalCompletedWorkflow() {
assertThrows(WorkflowNotFoundException.class, () -> workflow.signal("some-value"));
}

@Test(timeout = 50000)
public void signalWithStartWithDelay() {
WorkflowOptions workflowOptions =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setStartDelay(Duration.ofSeconds(5))
.build();
WorkflowStub stubF =
testWorkflowRule
.getWorkflowClient()
.newUntypedWorkflowStub("TestSignaledWorkflow", workflowOptions);

WorkflowExecution workflowExecution =
stubF.signalWithStart("testSignal", new Object[] {"testArg"}, new Object[] {});

assertEquals("done", stubF.getResult(String.class));
WorkflowExecutionHistory workflowExecutionHistory =
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
workflowExecutionHistory.getEvents().stream()
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
.collect(Collectors.toList());
assertEquals(1, workflowExecutionStartedEvents.size());
assertEquals(
Duration.ofSeconds(5),
ProtobufTimeUtils.toJavaDuration(
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
}

public static class QuickWorkflowWithSignalImpl implements TestWorkflows.TestSignaledWorkflow {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -40,6 +48,32 @@ public class StartTest {
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestMultiArgWorkflowImpl.class).build();

@Test
public void startWithDelay() {
WorkflowOptions workflowOptions =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setStartDelay(Duration.ofSeconds(5))
.build();
TestNoArgsWorkflowFunc stubF =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestNoArgsWorkflowFunc.class, workflowOptions);
assertResult("func", WorkflowClient.start(stubF::func));
WorkflowExecution workflowExecution = WorkflowStub.fromTyped(stubF).getExecution();
WorkflowExecutionHistory workflowExecutionHistory =
testWorkflowRule.getWorkflowClient().fetchHistory(workflowExecution.getWorkflowId());
List<WorkflowExecutionStartedEventAttributes> workflowExecutionStartedEvents =
workflowExecutionHistory.getEvents().stream()
.filter(HistoryEvent::hasWorkflowExecutionStartedEventAttributes)
.map(x -> x.getWorkflowExecutionStartedEventAttributes())
.collect(Collectors.toList());
assertEquals(1, workflowExecutionStartedEvents.size());
assertEquals(
Duration.ofSeconds(5),
ProtobufTimeUtils.toJavaDuration(
workflowExecutionStartedEvents.get(0).getFirstWorkflowTaskBackoff()));
}

@Test
public void startNoArgFuncWithRejectDuplicate() {
WorkflowOptions workflowOptions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,11 @@ private static void startWorkflow(
.withDescription("negative workflowTaskTimeoutSeconds")
.asRuntimeException();
}
if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) {
throw Status.INVALID_ARGUMENT
.withDescription("CronSchedule and WorkflowStartDelay may not be used together.")
.asRuntimeException();
}

WorkflowExecutionStartedEventAttributes.Builder a =
WorkflowExecutionStartedEventAttributes.newBuilder()
Expand All @@ -860,6 +865,9 @@ private static void startWorkflow(
if (data.lastCompletionResult != null) {
a.setLastCompletionResult(data.lastCompletionResult);
}
if (request.hasWorkflowStartDelay()) {
a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
}
data.lastFailure.ifPresent(a::setContinuedFailure);
if (request.hasMemo()) {
a.setMemo(request.getMemo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,10 @@ public void signalWithStartWorkflowExecution(
if (r.hasSearchAttributes()) {
startRequest.setSearchAttributes(r.getSearchAttributes());
}
if (r.hasWorkflowStartDelay()) {
startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
}

StartWorkflowExecutionResponse startResult =
startWorkflowExecutionImpl(
startRequest.build(),
Expand Down
Loading