From 2d3bc9f64ae3dffbc39c8752968acbcd79abf592 Mon Sep 17 00:00:00 2001 From: Gergely Nagy Date: Sun, 26 Nov 2023 19:02:31 +0100 Subject: [PATCH] Allow specifying workflow id when starting the workflow. --- .../client/WorkflowInvocationHandler.java | 27 +++++++--- .../java/io/temporal/client/WorkflowStub.java | 2 + .../io/temporal/client/WorkflowStubImpl.java | 10 ++++ .../common/metadata/POJOWorkflowMethod.java | 25 +++++++++ .../metadata/POJOWorkflowMethodMetadata.java | 4 ++ .../java/io/temporal/workflow/WorkflowId.java | 34 ++++++++++++ .../internal/testing/WorkflowTestingTest.java | 54 +++++++++++++++++++ .../workflow/shared/TestWorkflows.java | 6 +++ .../testing/TimeLockingInterceptor.java | 5 ++ 9 files changed, 161 insertions(+), 6 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/workflow/WorkflowId.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java index a9c001f49..a3e935b59 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java @@ -179,14 +179,24 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl return Defaults.defaultValue(method.getReturnType()); } - private static void startWorkflow(WorkflowStub untyped, Object[] args) { + private static void startWorkflow( + POJOWorkflowMethodMetadata methodMetadata, WorkflowStub untyped, Object[] args) { Optional options = untyped.getOptions(); if (untyped.getExecution() == null || (options.isPresent() && options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)) { try { - untyped.start(args); + if (methodMetadata.getWorkflowIdParameterIndex().isPresent()) { + Object workflowId = args[methodMetadata.getWorkflowIdParameterIndex().get()]; + if (workflowId == null) { + untyped.start(args); + } else { + untyped.startWithId(workflowId.toString(), args); + } + } else { + untyped.start(args); + } } catch (WorkflowExecutionAlreadyStarted e) { // We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for // result. @@ -264,7 +274,7 @@ public void invoke( POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method); WorkflowMethodType type = methodMetadata.getType(); if (type == WorkflowMethodType.WORKFLOW) { - result = startWorkflow(untyped, method, args); + result = startWorkflow(methodMetadata, untyped, method, args); } else if (type == WorkflowMethodType.QUERY) { result = queryWorkflow(methodMetadata, untyped, method, args); } else if (type == WorkflowMethodType.SIGNAL) { @@ -318,8 +328,12 @@ private Object updateWorkflow( } @SuppressWarnings("FutureReturnValueIgnored") - private Object startWorkflow(WorkflowStub untyped, Method method, Object[] args) { - WorkflowInvocationHandler.startWorkflow(untyped, args); + private Object startWorkflow( + POJOWorkflowMethodMetadata methodMetadata, + WorkflowStub untyped, + Method method, + Object[] args) { + WorkflowInvocationHandler.startWorkflow(methodMetadata, untyped, args); return untyped.getResult(method.getReturnType(), method.getGenericReturnType()); } } @@ -344,7 +358,8 @@ public void invoke( throw new IllegalArgumentException( "WorkflowClient.execute can be called only on a method annotated with @WorkflowMethod"); } - WorkflowInvocationHandler.startWorkflow(untyped, args); + WorkflowInvocationHandler.startWorkflow( + workflowMetadata.getMethodMetadata(method), untyped, args); result = untyped.getResultAsync(method.getReturnType(), method.getGenericReturnType()); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java index 8d37a81e4..e843f8eef 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java @@ -144,6 +144,8 @@ static WorkflowStub fromTyped(T typed) { WorkflowExecution start(Object... args); + WorkflowExecution startWithId(String workflowId, Object... args); + WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs); Optional getWorkflowType(); diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 9cd54b6c9..5f5808a1d 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -129,6 +129,16 @@ public WorkflowExecution start(Object... args) { return startWithOptions(WorkflowOptions.merge(null, null, options), args); } + @Override + public WorkflowExecution startWithId(String workflowId, Object... args) { + if (options == null) { + throw new IllegalStateException("Required parameter WorkflowOptions is missing"); + } + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder(options).setWorkflowId(workflowId).build(); + return startWithOptions(WorkflowOptions.merge(null, null, workflowOptions), args); + } + private WorkflowExecution signalWithStartWithOptions( WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) { checkExecutionIsNotStarted(); diff --git a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java index f6cfba9af..ae5d443eb 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java +++ b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java @@ -22,6 +22,7 @@ import com.google.common.base.Strings; import io.temporal.workflow.*; +import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.Objects; import java.util.Optional; @@ -32,6 +33,8 @@ final class POJOWorkflowMethod { private final Method method; private final Optional nameFromAnnotation; + private final Optional workflowIdParameterIndex; + POJOWorkflowMethod(Method method) { this.method = Objects.requireNonNull(method); WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class); @@ -94,6 +97,12 @@ final class POJOWorkflowMethod { this.nameFromAnnotation = Optional.of(name); } this.type = Objects.requireNonNull(type); + + if (this.type == WorkflowMethodType.WORKFLOW) { + this.workflowIdParameterIndex = findWorkflowIdParameterIndex(method); + } else { + this.workflowIdParameterIndex = Optional.empty(); + } } public WorkflowMethodType getType() { @@ -108,6 +117,10 @@ public Optional getNameFromAnnotation() { return nameFromAnnotation; } + public Optional getWorkflowIdParameterIndex() { + return workflowIdParameterIndex; + } + /** Compare and hash on method only. */ @Override public boolean equals(Object o) { @@ -122,4 +135,16 @@ public boolean equals(Object o) { public int hashCode() { return com.google.common.base.Objects.hashCode(method); } + + private Optional findWorkflowIdParameterIndex(Method method) { + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + for (int i = 0; i < parameterAnnotations.length; i++) { + for (Annotation annotation : parameterAnnotations[i]) { + if (annotation.annotationType().equals(WorkflowId.class)) { + return Optional.of(i); + } + } + } + return Optional.empty(); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java index a412a444f..b846e1c1d 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java +++ b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java @@ -70,6 +70,10 @@ public Class getWorkflowInterface() { return workflowInterface; } + public Optional getWorkflowIdParameterIndex() { + return workflowMethod.getWorkflowIdParameterIndex(); + } + /** Compare and hash based on method and the interface type only. */ @Override public boolean equals(Object o) { diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowId.java b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowId.java new file mode 100644 index 000000000..872114aac --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowId.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Indicates that the method parameter is the workflow id. If such parameter is present in + * a @WorkflowMethod then it will override the value in WorkflowOptions. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.PARAMETER) +public @interface WorkflowId {} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java b/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java index 3eac25eb2..e25d20caf 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/testing/WorkflowTestingTest.java @@ -38,6 +38,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowException; import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; import io.temporal.failure.ActivityFailure; import io.temporal.failure.ApplicationFailure; import io.temporal.failure.ChildWorkflowFailure; @@ -55,6 +56,7 @@ import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow; import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; import io.temporal.workflow.shared.TestWorkflows.TestWorkflow2; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflowIdArg; import java.time.Duration; import java.util.List; import java.util.UUID; @@ -110,6 +112,49 @@ public void testEmptyWorkflow() { assertEquals("TestWorkflow1-input1", result); } + @Test + public void testWorkflowStartWithId() { + Worker worker = testEnvironment.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(WorkflowIdImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.getWorkflowClient(); + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowId("not-workflow-id") + .setTaskQueue(TASK_QUEUE) + .build(); + WorkflowStub workflow = + client.newUntypedWorkflowStub(TestWorkflowIdArg.class.getSimpleName(), options); + workflow.startWithId("workflow-id", "input2"); + String workflowId = workflow.getResult(String.class); + assertEquals("workflow-id", workflowId); + } + + @Test + public void testWorkflowIdParameter() { + Worker worker = testEnvironment.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(WorkflowIdImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.getWorkflowClient(); + WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build(); + TestWorkflowIdArg workflow = client.newWorkflowStub(TestWorkflowIdArg.class, options); + String workflowId = workflow.execute("workflow-id", "input2"); + assertEquals("workflow-id", workflowId); + } + + @Test + public void testWorkflowIdNullParameter() { + Worker worker = testEnvironment.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(WorkflowIdImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.getWorkflowClient(); + WorkflowOptions options = + WorkflowOptions.newBuilder().setWorkflowId("workflow-id").setTaskQueue(TASK_QUEUE).build(); + TestWorkflowIdArg workflow = client.newWorkflowStub(TestWorkflowIdArg.class, options); + String workflowId = workflow.execute(null, "input2"); + assertEquals("workflow-id", workflowId); + } + @Test public void testFailure() { Worker worker = testEnvironment.newWorker(TASK_QUEUE); @@ -424,6 +469,15 @@ public String execute(String input) { } } + public static class WorkflowIdImpl implements TestWorkflowIdArg { + + @Override + public String execute(String workflowId, String anotherArg) { + Workflow.sleep(Duration.ofMinutes(5)); // test time skipping + return Workflow.getInfo().getWorkflowId(); + } + } + public static class FailingWorkflowImpl implements TestWorkflow1 { @Override diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java index 3d9688c98..ac6621be5 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java @@ -94,6 +94,12 @@ public interface TestWorkflow4 { String execute(String arg, boolean arg2); } + @WorkflowInterface + public interface TestWorkflowIdArg { + @WorkflowMethod + String execute(@WorkflowId String arg1, String arg2); + } + @WorkflowInterface public interface TestWorkflowWithCronSchedule { @WorkflowMethod diff --git a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java index a75a72883..7292483f6 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java @@ -77,6 +77,11 @@ public WorkflowExecution start(Object... args) { return next.start(args); } + @Override + public WorkflowExecution startWithId(String workflowId, Object... args) { + return next.startWithId(workflowId, args); + } + @Override public WorkflowExecution signalWithStart( String signalName, Object[] signalArgs, Object[] startArgs) {