Skip to content

Commit

Permalink
Allow specifying workflow id when starting the workflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
gnagy committed Nov 26, 2023
1 parent 9cd9dca commit 2d3bc9f
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,24 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return Defaults.defaultValue(method.getReturnType());
}

private static void startWorkflow(WorkflowStub untyped, Object[] args) {
private static void startWorkflow(
POJOWorkflowMethodMetadata methodMetadata, WorkflowStub untyped, Object[] args) {
Optional<WorkflowOptions> options = untyped.getOptions();
if (untyped.getExecution() == null
|| (options.isPresent()
&& options.get().getWorkflowIdReusePolicy()
== WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)) {
try {
untyped.start(args);
if (methodMetadata.getWorkflowIdParameterIndex().isPresent()) {
Object workflowId = args[methodMetadata.getWorkflowIdParameterIndex().get()];
if (workflowId == null) {
untyped.start(args);
} else {
untyped.startWithId(workflowId.toString(), args);
}
} else {
untyped.start(args);
}
} catch (WorkflowExecutionAlreadyStarted e) {
// We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for
// result.
Expand Down Expand Up @@ -264,7 +274,7 @@ public void invoke(
POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method);
WorkflowMethodType type = methodMetadata.getType();
if (type == WorkflowMethodType.WORKFLOW) {
result = startWorkflow(untyped, method, args);
result = startWorkflow(methodMetadata, untyped, method, args);
} else if (type == WorkflowMethodType.QUERY) {
result = queryWorkflow(methodMetadata, untyped, method, args);
} else if (type == WorkflowMethodType.SIGNAL) {
Expand Down Expand Up @@ -318,8 +328,12 @@ private Object updateWorkflow(
}

@SuppressWarnings("FutureReturnValueIgnored")
private Object startWorkflow(WorkflowStub untyped, Method method, Object[] args) {
WorkflowInvocationHandler.startWorkflow(untyped, args);
private Object startWorkflow(
POJOWorkflowMethodMetadata methodMetadata,
WorkflowStub untyped,
Method method,
Object[] args) {
WorkflowInvocationHandler.startWorkflow(methodMetadata, untyped, args);
return untyped.getResult(method.getReturnType(), method.getGenericReturnType());
}
}
Expand All @@ -344,7 +358,8 @@ public void invoke(
throw new IllegalArgumentException(
"WorkflowClient.execute can be called only on a method annotated with @WorkflowMethod");
}
WorkflowInvocationHandler.startWorkflow(untyped, args);
WorkflowInvocationHandler.startWorkflow(
workflowMetadata.getMethodMetadata(method), untyped, args);
result = untyped.getResultAsync(method.getReturnType(), method.getGenericReturnType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ static <T> WorkflowStub fromTyped(T typed) {

WorkflowExecution start(Object... args);

WorkflowExecution startWithId(String workflowId, Object... args);

WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);

Optional<String> getWorkflowType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ public WorkflowExecution start(Object... args) {
return startWithOptions(WorkflowOptions.merge(null, null, options), args);
}

@Override
public WorkflowExecution startWithId(String workflowId, Object... args) {
if (options == null) {
throw new IllegalStateException("Required parameter WorkflowOptions is missing");
}
WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder(options).setWorkflowId(workflowId).build();
return startWithOptions(WorkflowOptions.merge(null, null, workflowOptions), args);
}

private WorkflowExecution signalWithStartWithOptions(
WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) {
checkExecutionIsNotStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.base.Strings;
import io.temporal.workflow.*;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -32,6 +33,8 @@ final class POJOWorkflowMethod {
private final Method method;
private final Optional<String> nameFromAnnotation;

private final Optional<Integer> workflowIdParameterIndex;

POJOWorkflowMethod(Method method) {
this.method = Objects.requireNonNull(method);
WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
Expand Down Expand Up @@ -94,6 +97,12 @@ final class POJOWorkflowMethod {
this.nameFromAnnotation = Optional.of(name);
}
this.type = Objects.requireNonNull(type);

if (this.type == WorkflowMethodType.WORKFLOW) {
this.workflowIdParameterIndex = findWorkflowIdParameterIndex(method);
} else {
this.workflowIdParameterIndex = Optional.empty();
}
}

public WorkflowMethodType getType() {
Expand All @@ -108,6 +117,10 @@ public Optional<String> getNameFromAnnotation() {
return nameFromAnnotation;
}

public Optional<Integer> getWorkflowIdParameterIndex() {
return workflowIdParameterIndex;
}

/** Compare and hash on method only. */
@Override
public boolean equals(Object o) {
Expand All @@ -122,4 +135,16 @@ public boolean equals(Object o) {
public int hashCode() {
return com.google.common.base.Objects.hashCode(method);
}

private Optional<Integer> findWorkflowIdParameterIndex(Method method) {
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
for (int i = 0; i < parameterAnnotations.length; i++) {
for (Annotation annotation : parameterAnnotations[i]) {
if (annotation.annotationType().equals(WorkflowId.class)) {
return Optional.of(i);
}
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public Class<?> getWorkflowInterface() {
return workflowInterface;
}

public Optional<Integer> getWorkflowIdParameterIndex() {
return workflowMethod.getWorkflowIdParameterIndex();
}

/** Compare and hash based on method and the interface type only. */
@Override
public boolean equals(Object o) {
Expand Down
34 changes: 34 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/WorkflowId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Indicates that the method parameter is the workflow id. If such parameter is present in
* a @WorkflowMethod then it will override the value in WorkflowOptions.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface WorkflowId {}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.ChildWorkflowFailure;
Expand All @@ -55,6 +56,7 @@
import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow2;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflowIdArg;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -110,6 +112,49 @@ public void testEmptyWorkflow() {
assertEquals("TestWorkflow1-input1", result);
}

@Test
public void testWorkflowStartWithId() {
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(WorkflowIdImpl.class);
testEnvironment.start();
WorkflowClient client = testEnvironment.getWorkflowClient();
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId("not-workflow-id")
.setTaskQueue(TASK_QUEUE)
.build();
WorkflowStub workflow =
client.newUntypedWorkflowStub(TestWorkflowIdArg.class.getSimpleName(), options);
workflow.startWithId("workflow-id", "input2");
String workflowId = workflow.getResult(String.class);
assertEquals("workflow-id", workflowId);
}

@Test
public void testWorkflowIdParameter() {
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(WorkflowIdImpl.class);
testEnvironment.start();
WorkflowClient client = testEnvironment.getWorkflowClient();
WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build();
TestWorkflowIdArg workflow = client.newWorkflowStub(TestWorkflowIdArg.class, options);
String workflowId = workflow.execute("workflow-id", "input2");
assertEquals("workflow-id", workflowId);
}

@Test
public void testWorkflowIdNullParameter() {
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(WorkflowIdImpl.class);
testEnvironment.start();
WorkflowClient client = testEnvironment.getWorkflowClient();
WorkflowOptions options =
WorkflowOptions.newBuilder().setWorkflowId("workflow-id").setTaskQueue(TASK_QUEUE).build();
TestWorkflowIdArg workflow = client.newWorkflowStub(TestWorkflowIdArg.class, options);
String workflowId = workflow.execute(null, "input2");
assertEquals("workflow-id", workflowId);
}

@Test
public void testFailure() {
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
Expand Down Expand Up @@ -424,6 +469,15 @@ public String execute(String input) {
}
}

public static class WorkflowIdImpl implements TestWorkflowIdArg {

@Override
public String execute(String workflowId, String anotherArg) {
Workflow.sleep(Duration.ofMinutes(5)); // test time skipping
return Workflow.getInfo().getWorkflowId();
}
}

public static class FailingWorkflowImpl implements TestWorkflow1 {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ public interface TestWorkflow4 {
String execute(String arg, boolean arg2);
}

@WorkflowInterface
public interface TestWorkflowIdArg {
@WorkflowMethod
String execute(@WorkflowId String arg1, String arg2);
}

@WorkflowInterface
public interface TestWorkflowWithCronSchedule {
@WorkflowMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public WorkflowExecution start(Object... args) {
return next.start(args);
}

@Override
public WorkflowExecution startWithId(String workflowId, Object... args) {
return next.startWithId(workflowId, args);
}

@Override
public WorkflowExecution signalWithStart(
String signalName, Object[] signalArgs, Object[] startArgs) {
Expand Down

0 comments on commit 2d3bc9f

Please sign in to comment.