From 538508be0dd2123c743a4d954fda03932bef5daa Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 16 Oct 2023 08:47:58 -0700 Subject: [PATCH] Apply data converter context in more places (#1896) Add data converter context to memo, lastFailure and schedules --- .../client/WorkflowExecutionMetadata.java | 7 +- .../client/RootScheduleClientInvoker.java | 1 + .../client/RootWorkflowClientInvoker.java | 4 +- .../internal/client/ScheduleProtoUtil.java | 24 +++- .../internal/sync/SyncWorkflowContext.java | 4 + .../internal/sync/WorkflowInternal.java | 12 +- .../HasWorkflowSerializationContext.java | 2 + .../WorkflowIdSignedPayloadsTest.java | 110 ++++++++++++++++-- 8 files changed, 141 insertions(+), 23 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java index 7d896b82a..a5587af79 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java @@ -29,6 +29,7 @@ import io.temporal.common.converter.DataConverter; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SearchAttributesUtil; +import io.temporal.payload.context.WorkflowSerializationContext; import java.lang.reflect.Type; import java.time.Instant; import java.util.Collections; @@ -123,7 +124,11 @@ public T getMemo(String key, Class valueClass, Type genericType) { if (memo == null) { return null; } - return dataConverter.fromPayload(memo, valueClass, genericType); + return dataConverter + .withContext( + new WorkflowSerializationContext( + info.getParentNamespaceId(), info.getExecution().getWorkflowId())) + .fromPayload(memo, valueClass, genericType); } @Nonnull diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootScheduleClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootScheduleClientInvoker.java index 9f114dad7..2f31d8361 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootScheduleClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootScheduleClientInvoker.java @@ -68,6 +68,7 @@ public void createSchedule(CreateScheduleInput input) { .setSchedule(scheduleRequestHeader.scheduleToProto(input.getSchedule())); if (input.getOptions().getMemo() != null) { + // TODO we don't have a workflow context here, maybe we need a schedule context? request.setMemo( Memo.newBuilder() .putAllFields( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 38763bba1..e55778d6a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -83,7 +83,7 @@ public WorkflowStartOutput start(WorkflowStartInput input) { (input.getOptions().getMemo() != null) ? Memo.newBuilder() .putAllFields( - intoPayloadMap(clientOptions.getDataConverter(), input.getOptions().getMemo())) + intoPayloadMap(dataConverterWithWorkflowContext, input.getOptions().getMemo())) .build() : null; @@ -169,7 +169,7 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu ? Memo.newBuilder() .putAllFields( intoPayloadMap( - clientOptions.getDataConverter(), + dataConverterWithWorkflowContext, workflowStartInput.getOptions().getMemo())) .build() : null; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java index ff8a0dde2..364453d90 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java @@ -37,11 +37,13 @@ import io.temporal.client.WorkflowOptions; import io.temporal.client.schedules.*; import io.temporal.common.context.ContextPropagator; +import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; import io.temporal.internal.client.external.GenericWorkflowClient; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.RetryOptionsUtils; import io.temporal.internal.common.SearchAttributesUtil; +import io.temporal.payload.context.WorkflowSerializationContext; import java.time.Instant; import java.util.*; import java.util.stream.Collectors; @@ -82,6 +84,13 @@ private io.temporal.common.interceptors.Header extractContextsAndConvertToBytes( public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction action) { if (action instanceof ScheduleActionStartWorkflow) { ScheduleActionStartWorkflow startWorkflowAction = (ScheduleActionStartWorkflow) action; + DataConverter dataConverterWithWorkflowContext = + clientOptions + .getDataConverter() + .withContext( + new WorkflowSerializationContext( + clientOptions.getNamespace(), + startWorkflowAction.getOptions().getWorkflowId())); WorkflowOptions wfOptions = startWorkflowAction.getOptions(); // Disallow some options @@ -113,7 +122,7 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction ProtobufTimeUtils.toProtoDuration(wfOptions.getWorkflowTaskTimeout())) .setTaskQueue(TaskQueue.newBuilder().setName(wfOptions.getTaskQueue()).build()); - startWorkflowAction.getArguments().setDataConverter(clientOptions.getDataConverter()); + startWorkflowAction.getArguments().setDataConverter(dataConverterWithWorkflowContext); Optional inputArgs = startWorkflowAction.getArguments().toPayloads(); if (inputArgs.isPresent()) { workflowRequest.setInput(inputArgs.get()); @@ -128,7 +137,7 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction item.getKey(), ((EncodedValues) item.getValue()).toPayloads().get().getPayloads(0)); } else { memo.put( - item.getKey(), clientOptions.getDataConverter().toPayload(item.getValue()).get()); + item.getKey(), dataConverterWithWorkflowContext.toPayload(item.getValue()).get()); } } workflowRequest.setMemo(Memo.newBuilder().putAllFields(memo).build()); @@ -388,12 +397,19 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu Objects.requireNonNull(action); if (action.hasStartWorkflow()) { NewWorkflowExecutionInfo startWfAction = action.getStartWorkflow(); + DataConverter dataConverterWithWorkflowContext = + clientOptions + .getDataConverter() + .withContext( + new WorkflowSerializationContext( + clientOptions.getNamespace(), startWfAction.getWorkflowId())); + ScheduleActionStartWorkflow.Builder builder = ScheduleActionStartWorkflow.newBuilder(); builder.setWorkflowType(startWfAction.getWorkflowType().getName()); builder.setRawArguments( new EncodedValues( - Optional.of(startWfAction.getInput()), clientOptions.getDataConverter())); + Optional.of(startWfAction.getInput()), dataConverterWithWorkflowContext)); WorkflowOptions.Builder wfOptionsBuilder = WorkflowOptions.newBuilder(); // set required options @@ -420,7 +436,7 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu EncodedValues encodedMemo = new EncodedValues( Optional.of(Payloads.newBuilder().addPayloads(memo.getValue()).build()), - clientOptions.getDataConverter()); + dataConverterWithWorkflowContext); memos.put(memo.getKey(), encodedMemo); } wfOptionsBuilder.setMemo(memos); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 02dc8a42b..986beb704 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1008,6 +1008,10 @@ public DataConverter getDataConverter() { return dataConverter; } + public DataConverter getDataConverterWithCurrentWorkflowContext() { + return dataConverterWithCurrentWorkflowContext; + } + boolean isReplaying() { return replayContext.isReplaying(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 9750473fd..6be76823c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -555,7 +555,7 @@ public static T getMemo(String key, Class valueClass, Type genericType) { return null; } - return getDataConverter().fromPayload(memo, valueClass, genericType); + return getDataConverterWithCurrentWorkflowContext().fromPayload(memo, valueClass, genericType); } public static R retry( @@ -693,13 +693,13 @@ public static io.temporal.common.SearchAttributes getTypedSearchAttributes() { } public static void upsertSearchAttributes(Map searchAttributes) { - assertNotReadOnly("upset search attribute"); + assertNotReadOnly("upsert search attribute"); getWorkflowOutboundInterceptor().upsertSearchAttributes(searchAttributes); } public static void upsertTypedSearchAttributes( SearchAttributeUpdate... searchAttributeUpdates) { - assertNotReadOnly("upset search attribute"); + assertNotReadOnly("upsert search attribute"); getWorkflowOutboundInterceptor().upsertTypedSearchAttributes(searchAttributeUpdates); } @@ -707,6 +707,10 @@ public static DataConverter getDataConverter() { return getRootWorkflowContext().getDataConverter(); } + static DataConverter getDataConverterWithCurrentWorkflowContext() { + return getRootWorkflowContext().getDataConverterWithCurrentWorkflowContext(); + } + /** * Name of the workflow type the interface defines. It is either the interface short name * or * value of {@link WorkflowMethod#name()} parameter. @@ -723,7 +727,7 @@ public static Optional getPreviousRunFailure() { return Optional.ofNullable(getRootWorkflowContext().getReplayContext().getPreviousRunFailure()) // Temporal Failure Values are additional user payload and serialized using user data // converter - .map(f -> getDataConverter().failureToException(f)); + .map(f -> getDataConverterWithCurrentWorkflowContext().failureToException(f)); } private static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() { diff --git a/temporal-sdk/src/main/java/io/temporal/payload/context/HasWorkflowSerializationContext.java b/temporal-sdk/src/main/java/io/temporal/payload/context/HasWorkflowSerializationContext.java index 810d97fec..11aee2da6 100644 --- a/temporal-sdk/src/main/java/io/temporal/payload/context/HasWorkflowSerializationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/payload/context/HasWorkflowSerializationContext.java @@ -41,6 +41,8 @@ public interface HasWorkflowSerializationContext extends SerializationContext { * @return workflowId of the Workflow Execution the Serialization Target belongs to. If the Target * is a Workflow itself, this method will return the Target's Workflow ID (not the ID of the * parent workflow). + *

WARNING: When used in the context of a schedule workflow the workflowId may differ on + * serialization and deserialization. */ String getWorkflowId(); } diff --git a/temporal-sdk/src/test/java/io/temporal/functional/serialization/WorkflowIdSignedPayloadsTest.java b/temporal-sdk/src/test/java/io/temporal/functional/serialization/WorkflowIdSignedPayloadsTest.java index 7b9147acc..3af2f1f3c 100644 --- a/temporal-sdk/src/test/java/io/temporal/functional/serialization/WorkflowIdSignedPayloadsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/functional/serialization/WorkflowIdSignedPayloadsTest.java @@ -21,34 +21,39 @@ package io.temporal.functional.serialization; import static org.junit.Assert.*; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeTrue; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import io.temporal.activity.*; import io.temporal.api.common.v1.Payload; -import io.temporal.client.WorkflowClientOptions; -import io.temporal.common.converter.CodecDataConverter; -import io.temporal.common.converter.DataConverter; -import io.temporal.common.converter.DefaultDataConverter; -import io.temporal.common.converter.EncodingKeys; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.*; +import io.temporal.client.schedules.*; +import io.temporal.common.converter.*; +import io.temporal.failure.CanceledFailure; import io.temporal.payload.codec.PayloadCodec; import io.temporal.payload.codec.PayloadCodecException; import io.temporal.payload.context.ActivitySerializationContext; import io.temporal.payload.context.HasWorkflowSerializationContext; import io.temporal.payload.context.SerializationContext; +import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.ChildWorkflowOptions; +import io.temporal.workflow.ContinueAsNewOptions; import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflowWithCronScheduleImpl; import io.temporal.workflow.shared.TestWorkflows; import java.io.IOException; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; /** * This test emulates a scenario when users may be using WorkflowId in their encoding to sign every @@ -58,6 +63,9 @@ * explode on decoding. */ public class WorkflowIdSignedPayloadsTest { + private static final String MEMO_KEY = "testKey"; + private static final String MEMO_VALUE = "testValue"; + private static final Map MEMO = ImmutableMap.of(MEMO_KEY, MEMO_VALUE); private final SimpleActivity heartbeatingActivity = new HeartbeatingIfNotLocalActivityImpl(); private final ManualCompletionActivity manualCompletionActivity = new ManualCompletionActivityImpl(); @@ -70,12 +78,15 @@ public class WorkflowIdSignedPayloadsTest { @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes(SimpleWorkflowWithAnActivity.class) + .setWorkflowTypes( + SimpleWorkflowWithAnActivity.class, TestWorkflowWithCronScheduleImpl.class) .setWorkflowClientOptions( WorkflowClientOptions.newBuilder().setDataConverter(codecDataConverter).build()) .setActivityImplementations(heartbeatingActivity, manualCompletionActivity) .build(); + @Rule public TestName testName = new TestName(); + @Test public void testSimpleWorkflowWithAnActivity() { TestWorkflows.TestWorkflow1 workflowStub = @@ -83,6 +94,74 @@ public void testSimpleWorkflowWithAnActivity() { assertEquals("result", workflowStub.execute("input")); } + @Test + public void testSimpleWorkflowWithMemo() throws InterruptedException { + assumeTrue( + "skipping as test server does not support list", SDKTestWorkflowRule.useExternalService); + + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()); + options = WorkflowOptions.newBuilder(options).setMemo(MEMO).build(); + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(TestWorkflows.TestWorkflow1.class, options); + assertEquals("result", workflowStub.execute("input")); + WorkflowExecution execution = WorkflowStub.fromTyped(workflowStub).getExecution(); + String workflowId = execution.getWorkflowId(); + String runId = execution.getRunId(); + + // listWorkflowExecutions is Visibility API + // Temporal Visibility has latency and is not transactional with the Server API call + Thread.sleep(4_000); + + List executions = + testWorkflowRule + .getWorkflowClient() + .listExecutions("WorkflowId = '" + workflowId + "' AND " + " RunId = '" + runId + "'") + .collect(Collectors.toList()); + assertEquals(1, executions.size()); + assertEquals(MEMO_VALUE, executions.get(0).getMemo(MEMO_KEY, String.class)); + } + + @Test + public void testSimpleCronWorkflow() { + assumeFalse("skipping as test will timeout", SDKTestWorkflowRule.useExternalService); + + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()); + options = + WorkflowOptions.newBuilder(options) + .setWorkflowRunTimeout(Duration.ofHours(1)) + .setCronSchedule("0 */6 * * *") + .build(); + TestWorkflows.TestWorkflowWithCronSchedule workflow = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(TestWorkflows.TestWorkflowWithCronSchedule.class, options); + + testWorkflowRule.registerDelayedCallback( + Duration.ofDays(1), WorkflowStub.fromTyped(workflow)::cancel); + WorkflowClient.start(workflow::execute, testName.getMethodName()); + + try { + workflow.execute(testName.getMethodName()); + fail("unreachable"); + } catch (WorkflowFailedException e) { + assertTrue(e.getCause() instanceof CanceledFailure); + } + + Map lastCompletionResults = + TestWorkflowWithCronScheduleImpl.lastCompletionResults.get(testName.getMethodName()); + assertEquals(4, lastCompletionResults.size()); + // Run 3 failed. So on run 4 we get the last completion result from run 2. + assertEquals("run 2", lastCompletionResults.get(4)); + // The last failure ought to be the one from run 3 + assertTrue(TestWorkflowWithCronScheduleImpl.lastFail.isPresent()); + assertTrue( + TestWorkflowWithCronScheduleImpl.lastFail.get().getMessage().contains("simulated error")); + } + @ActivityInterface public interface SimpleActivity { @ActivityMethod(name = "simple") @@ -159,14 +238,21 @@ public String execute(String input) { assertEquals("result", result); // Child Workflow if (!Workflow.getInfo().getParentWorkflowId().isPresent()) { + ChildWorkflowOptions childOptions = ChildWorkflowOptions.newBuilder().setMemo(MEMO).build(); TestWorkflows.TestWorkflow1 child = - Workflow.newChildWorkflowStub(TestWorkflows.TestWorkflow1.class); + Workflow.newChildWorkflowStub(TestWorkflows.TestWorkflow1.class, childOptions); result = child.execute(input); assertEquals("result", result); } + // Memo + String memoValue = (String) Workflow.getMemo(MEMO_KEY, String.class); + if (memoValue != null) { + assertEquals(MEMO_VALUE, memoValue); + } // continueAsNew if (!Workflow.getInfo().getContinuedExecutionRunId().isPresent()) { - Workflow.continueAsNew(input); + ContinueAsNewOptions casOptions = ContinueAsNewOptions.newBuilder().setMemo(MEMO).build(); + Workflow.continueAsNew(casOptions, input); } return result; }