Skip to content

Commit

Permalink
Apply data converter context in more places (#1896)
Browse files Browse the repository at this point in the history
Add data converter context to memo, lastFailure and schedules
  • Loading branch information
Quinn-With-Two-Ns authored Oct 16, 2023
1 parent 4fe296e commit 538508b
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.payload.context.WorkflowSerializationContext;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.Collections;
Expand Down Expand Up @@ -123,7 +124,11 @@ public <T> T getMemo(String key, Class<T> valueClass, Type genericType) {
if (memo == null) {
return null;
}
return dataConverter.fromPayload(memo, valueClass, genericType);
return dataConverter
.withContext(
new WorkflowSerializationContext(
info.getParentNamespaceId(), info.getExecution().getWorkflowId()))
.fromPayload(memo, valueClass, genericType);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void createSchedule(CreateScheduleInput input) {
.setSchedule(scheduleRequestHeader.scheduleToProto(input.getSchedule()));

if (input.getOptions().getMemo() != null) {
// TODO we don't have a workflow context here, maybe we need a schedule context?
request.setMemo(
Memo.newBuilder()
.putAllFields(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
(input.getOptions().getMemo() != null)
? Memo.newBuilder()
.putAllFields(
intoPayloadMap(clientOptions.getDataConverter(), input.getOptions().getMemo()))
intoPayloadMap(dataConverterWithWorkflowContext, input.getOptions().getMemo()))
.build()
: null;

Expand Down Expand Up @@ -169,7 +169,7 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
? Memo.newBuilder()
.putAllFields(
intoPayloadMap(
clientOptions.getDataConverter(),
dataConverterWithWorkflowContext,
workflowStartInput.getOptions().getMemo()))
.build()
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
import io.temporal.client.WorkflowOptions;
import io.temporal.client.schedules.*;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.RetryOptionsUtils;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.payload.context.WorkflowSerializationContext;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,6 +84,13 @@ private io.temporal.common.interceptors.Header extractContextsAndConvertToBytes(
public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction action) {
if (action instanceof ScheduleActionStartWorkflow) {
ScheduleActionStartWorkflow startWorkflowAction = (ScheduleActionStartWorkflow) action;
DataConverter dataConverterWithWorkflowContext =
clientOptions
.getDataConverter()
.withContext(
new WorkflowSerializationContext(
clientOptions.getNamespace(),
startWorkflowAction.getOptions().getWorkflowId()));

WorkflowOptions wfOptions = startWorkflowAction.getOptions();
// Disallow some options
Expand Down Expand Up @@ -113,7 +122,7 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction
ProtobufTimeUtils.toProtoDuration(wfOptions.getWorkflowTaskTimeout()))
.setTaskQueue(TaskQueue.newBuilder().setName(wfOptions.getTaskQueue()).build());

startWorkflowAction.getArguments().setDataConverter(clientOptions.getDataConverter());
startWorkflowAction.getArguments().setDataConverter(dataConverterWithWorkflowContext);
Optional<Payloads> inputArgs = startWorkflowAction.getArguments().toPayloads();
if (inputArgs.isPresent()) {
workflowRequest.setInput(inputArgs.get());
Expand All @@ -128,7 +137,7 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction
item.getKey(), ((EncodedValues) item.getValue()).toPayloads().get().getPayloads(0));
} else {
memo.put(
item.getKey(), clientOptions.getDataConverter().toPayload(item.getValue()).get());
item.getKey(), dataConverterWithWorkflowContext.toPayload(item.getValue()).get());
}
}
workflowRequest.setMemo(Memo.newBuilder().putAllFields(memo).build());
Expand Down Expand Up @@ -388,12 +397,19 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu
Objects.requireNonNull(action);
if (action.hasStartWorkflow()) {
NewWorkflowExecutionInfo startWfAction = action.getStartWorkflow();
DataConverter dataConverterWithWorkflowContext =
clientOptions
.getDataConverter()
.withContext(
new WorkflowSerializationContext(
clientOptions.getNamespace(), startWfAction.getWorkflowId()));

ScheduleActionStartWorkflow.Builder builder = ScheduleActionStartWorkflow.newBuilder();
builder.setWorkflowType(startWfAction.getWorkflowType().getName());

builder.setRawArguments(
new EncodedValues(
Optional.of(startWfAction.getInput()), clientOptions.getDataConverter()));
Optional.of(startWfAction.getInput()), dataConverterWithWorkflowContext));

WorkflowOptions.Builder wfOptionsBuilder = WorkflowOptions.newBuilder();
// set required options
Expand All @@ -420,7 +436,7 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu
EncodedValues encodedMemo =
new EncodedValues(
Optional.of(Payloads.newBuilder().addPayloads(memo.getValue()).build()),
clientOptions.getDataConverter());
dataConverterWithWorkflowContext);
memos.put(memo.getKey(), encodedMemo);
}
wfOptionsBuilder.setMemo(memos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,10 @@ public DataConverter getDataConverter() {
return dataConverter;
}

public DataConverter getDataConverterWithCurrentWorkflowContext() {
return dataConverterWithCurrentWorkflowContext;
}

boolean isReplaying() {
return replayContext.isReplaying();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public static <T> T getMemo(String key, Class<T> valueClass, Type genericType) {
return null;
}

return getDataConverter().fromPayload(memo, valueClass, genericType);
return getDataConverterWithCurrentWorkflowContext().fromPayload(memo, valueClass, genericType);
}

public static <R> R retry(
Expand Down Expand Up @@ -693,20 +693,24 @@ public static io.temporal.common.SearchAttributes getTypedSearchAttributes() {
}

public static void upsertSearchAttributes(Map<String, ?> searchAttributes) {
assertNotReadOnly("upset search attribute");
assertNotReadOnly("upsert search attribute");
getWorkflowOutboundInterceptor().upsertSearchAttributes(searchAttributes);
}

public static void upsertTypedSearchAttributes(
SearchAttributeUpdate<?>... searchAttributeUpdates) {
assertNotReadOnly("upset search attribute");
assertNotReadOnly("upsert search attribute");
getWorkflowOutboundInterceptor().upsertTypedSearchAttributes(searchAttributeUpdates);
}

public static DataConverter getDataConverter() {
return getRootWorkflowContext().getDataConverter();
}

static DataConverter getDataConverterWithCurrentWorkflowContext() {
return getRootWorkflowContext().getDataConverterWithCurrentWorkflowContext();
}

/**
* Name of the workflow type the interface defines. It is either the interface short name * or
* value of {@link WorkflowMethod#name()} parameter.
Expand All @@ -723,7 +727,7 @@ public static Optional<Exception> getPreviousRunFailure() {
return Optional.ofNullable(getRootWorkflowContext().getReplayContext().getPreviousRunFailure())
// Temporal Failure Values are additional user payload and serialized using user data
// converter
.map(f -> getDataConverter().failureToException(f));
.map(f -> getDataConverterWithCurrentWorkflowContext().failureToException(f));
}

private static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public interface HasWorkflowSerializationContext extends SerializationContext {
* @return workflowId of the Workflow Execution the Serialization Target belongs to. If the Target
* is a Workflow itself, this method will return the Target's Workflow ID (not the ID of the
* parent workflow).
* <p>WARNING: When used in the context of a schedule workflow the workflowId may differ on
* serialization and deserialization.
*/
String getWorkflowId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,39 @@
package io.temporal.functional.serialization;

import static org.junit.Assert.*;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.temporal.activity.*;
import io.temporal.api.common.v1.Payload;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.common.converter.CodecDataConverter;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.EncodingKeys;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.*;
import io.temporal.client.schedules.*;
import io.temporal.common.converter.*;
import io.temporal.failure.CanceledFailure;
import io.temporal.payload.codec.PayloadCodec;
import io.temporal.payload.codec.PayloadCodecException;
import io.temporal.payload.context.ActivitySerializationContext;
import io.temporal.payload.context.HasWorkflowSerializationContext;
import io.temporal.payload.context.SerializationContext;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.ContinueAsNewOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestWorkflowWithCronScheduleImpl;
import io.temporal.workflow.shared.TestWorkflows;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
* This test emulates a scenario when users may be using WorkflowId in their encoding to sign every
Expand All @@ -58,6 +63,9 @@
* explode on decoding.
*/
public class WorkflowIdSignedPayloadsTest {
private static final String MEMO_KEY = "testKey";
private static final String MEMO_VALUE = "testValue";
private static final Map<String, Object> MEMO = ImmutableMap.of(MEMO_KEY, MEMO_VALUE);
private final SimpleActivity heartbeatingActivity = new HeartbeatingIfNotLocalActivityImpl();
private final ManualCompletionActivity manualCompletionActivity =
new ManualCompletionActivityImpl();
Expand All @@ -70,19 +78,90 @@ public class WorkflowIdSignedPayloadsTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(SimpleWorkflowWithAnActivity.class)
.setWorkflowTypes(
SimpleWorkflowWithAnActivity.class, TestWorkflowWithCronScheduleImpl.class)
.setWorkflowClientOptions(
WorkflowClientOptions.newBuilder().setDataConverter(codecDataConverter).build())
.setActivityImplementations(heartbeatingActivity, manualCompletionActivity)
.build();

@Rule public TestName testName = new TestName();

@Test
public void testSimpleWorkflowWithAnActivity() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
assertEquals("result", workflowStub.execute("input"));
}

@Test
public void testSimpleWorkflowWithMemo() throws InterruptedException {
assumeTrue(
"skipping as test server does not support list", SDKTestWorkflowRule.useExternalService);

WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue());
options = WorkflowOptions.newBuilder(options).setMemo(MEMO).build();
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestWorkflows.TestWorkflow1.class, options);
assertEquals("result", workflowStub.execute("input"));
WorkflowExecution execution = WorkflowStub.fromTyped(workflowStub).getExecution();
String workflowId = execution.getWorkflowId();
String runId = execution.getRunId();

// listWorkflowExecutions is Visibility API
// Temporal Visibility has latency and is not transactional with the Server API call
Thread.sleep(4_000);

List<WorkflowExecutionMetadata> executions =
testWorkflowRule
.getWorkflowClient()
.listExecutions("WorkflowId = '" + workflowId + "' AND " + " RunId = '" + runId + "'")
.collect(Collectors.toList());
assertEquals(1, executions.size());
assertEquals(MEMO_VALUE, executions.get(0).getMemo(MEMO_KEY, String.class));
}

@Test
public void testSimpleCronWorkflow() {
assumeFalse("skipping as test will timeout", SDKTestWorkflowRule.useExternalService);

WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue());
options =
WorkflowOptions.newBuilder(options)
.setWorkflowRunTimeout(Duration.ofHours(1))
.setCronSchedule("0 */6 * * *")
.build();
TestWorkflows.TestWorkflowWithCronSchedule workflow =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestWorkflows.TestWorkflowWithCronSchedule.class, options);

testWorkflowRule.registerDelayedCallback(
Duration.ofDays(1), WorkflowStub.fromTyped(workflow)::cancel);
WorkflowClient.start(workflow::execute, testName.getMethodName());

try {
workflow.execute(testName.getMethodName());
fail("unreachable");
} catch (WorkflowFailedException e) {
assertTrue(e.getCause() instanceof CanceledFailure);
}

Map<Integer, String> lastCompletionResults =
TestWorkflowWithCronScheduleImpl.lastCompletionResults.get(testName.getMethodName());
assertEquals(4, lastCompletionResults.size());
// Run 3 failed. So on run 4 we get the last completion result from run 2.
assertEquals("run 2", lastCompletionResults.get(4));
// The last failure ought to be the one from run 3
assertTrue(TestWorkflowWithCronScheduleImpl.lastFail.isPresent());
assertTrue(
TestWorkflowWithCronScheduleImpl.lastFail.get().getMessage().contains("simulated error"));
}

@ActivityInterface
public interface SimpleActivity {
@ActivityMethod(name = "simple")
Expand Down Expand Up @@ -159,14 +238,21 @@ public String execute(String input) {
assertEquals("result", result);
// Child Workflow
if (!Workflow.getInfo().getParentWorkflowId().isPresent()) {
ChildWorkflowOptions childOptions = ChildWorkflowOptions.newBuilder().setMemo(MEMO).build();
TestWorkflows.TestWorkflow1 child =
Workflow.newChildWorkflowStub(TestWorkflows.TestWorkflow1.class);
Workflow.newChildWorkflowStub(TestWorkflows.TestWorkflow1.class, childOptions);
result = child.execute(input);
assertEquals("result", result);
}
// Memo
String memoValue = (String) Workflow.getMemo(MEMO_KEY, String.class);
if (memoValue != null) {
assertEquals(MEMO_VALUE, memoValue);
}
// continueAsNew
if (!Workflow.getInfo().getContinuedExecutionRunId().isPresent()) {
Workflow.continueAsNew(input);
ContinueAsNewOptions casOptions = ContinueAsNewOptions.newBuilder().setMemo(MEMO).build();
Workflow.continueAsNew(casOptions, input);
}
return result;
}
Expand Down

0 comments on commit 538508b

Please sign in to comment.