Skip to content

Commit

Permalink
Add support for user meta data (#2218)
Browse files Browse the repository at this point in the history
Add support for user metadata on certain events
  • Loading branch information
Quinn-With-Two-Ns authored Sep 16, 2024
1 parent 2163b8f commit 03f7182
Show file tree
Hide file tree
Showing 29 changed files with 621 additions and 41 deletions.
76 changes: 67 additions & 9 deletions temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import com.google.common.base.Objects;
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.common.CronSchedule;
import io.temporal.common.MethodRetry;
import io.temporal.common.RetryOptions;
import io.temporal.common.SearchAttributes;
import io.temporal.common.*;
import io.temporal.common.context.ContextPropagator;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.worker.WorkerFactory;
Expand Down Expand Up @@ -79,6 +76,8 @@ public static WorkflowOptions merge(
.setDisableEagerExecution(o.isDisableEagerExecution())
.setStartDelay(o.getStartDelay())
.setWorkflowIdConflictPolicy(o.getWorkflowIdConflictPolicy())
.setStaticSummary(o.getStaticSummary())
.setStaticDetails(o.getStaticDetails())
.validateBuildWithDefaults();
}

Expand Down Expand Up @@ -114,6 +113,10 @@ public static final class Builder {

private WorkflowIdConflictPolicy workflowIdConflictpolicy;

private String staticSummary;

private String staticDetails;

private Builder() {}

private Builder(WorkflowOptions options) {
Expand All @@ -135,6 +138,8 @@ private Builder(WorkflowOptions options) {
this.disableEagerExecution = options.disableEagerExecution;
this.startDelay = options.startDelay;
this.workflowIdConflictpolicy = options.workflowIdConflictpolicy;
this.staticSummary = options.staticSummary;
this.staticDetails = options.staticDetails;
}

/**
Expand Down Expand Up @@ -382,6 +387,31 @@ public Builder setStartDelay(Duration startDelay) {
return this;
}

/**
* Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be
* in single-line Temporal Markdown format.
*
* <p>Default is none/empty.
*/
@Experimental
public Builder setStaticSummary(String staticSummary) {
this.staticSummary = staticSummary;
return this;
}

/**
* General fixed details for this workflow execution that will appear in UI/CLI. This can be in
* Temporal Markdown format and can span multiple lines. This is a fixed value on the workflow
* that cannot be updated.
*
* <p>Default is none/empty.
*/
@Experimental
public Builder setStaticDetails(String staticDetails) {
this.staticDetails = staticDetails;
return this;
}

public WorkflowOptions build() {
return new WorkflowOptions(
workflowId,
Expand All @@ -398,7 +428,9 @@ public WorkflowOptions build() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy);
workflowIdConflictpolicy,
staticSummary,
staticDetails);
}

/**
Expand All @@ -420,7 +452,9 @@ public WorkflowOptions validateBuildWithDefaults() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy);
workflowIdConflictpolicy,
staticSummary,
staticDetails);
}
}

Expand Down Expand Up @@ -454,6 +488,10 @@ public WorkflowOptions validateBuildWithDefaults() {

private final WorkflowIdConflictPolicy workflowIdConflictpolicy;

private final String staticSummary;

private final String staticDetails;

private WorkflowOptions(
String workflowId,
WorkflowIdReusePolicy workflowIdReusePolicy,
Expand All @@ -469,7 +507,9 @@ private WorkflowOptions(
List<ContextPropagator> contextPropagators,
boolean disableEagerExecution,
Duration startDelay,
WorkflowIdConflictPolicy workflowIdConflictpolicy) {
WorkflowIdConflictPolicy workflowIdConflictpolicy,
String staticSummary,
String staticDetails) {
this.workflowId = workflowId;
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.workflowRunTimeout = workflowRunTimeout;
Expand All @@ -485,6 +525,8 @@ private WorkflowOptions(
this.disableEagerExecution = disableEagerExecution;
this.startDelay = startDelay;
this.workflowIdConflictpolicy = workflowIdConflictpolicy;
this.staticSummary = staticSummary;
this.staticDetails = staticDetails;
}

public String getWorkflowId() {
Expand Down Expand Up @@ -556,6 +598,14 @@ public WorkflowIdConflictPolicy getWorkflowIdConflictPolicy() {
return workflowIdConflictpolicy;
}

public String getStaticSummary() {
return staticSummary;
}

public String getStaticDetails() {
return staticDetails;
}

public Builder toBuilder() {
return new Builder(this);
}
Expand All @@ -579,7 +629,9 @@ public boolean equals(Object o) {
&& Objects.equal(contextPropagators, that.contextPropagators)
&& Objects.equal(disableEagerExecution, that.disableEagerExecution)
&& Objects.equal(startDelay, that.startDelay)
&& Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy);
&& Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy)
&& Objects.equal(staticSummary, that.staticSummary)
&& Objects.equal(staticDetails, that.staticDetails);
}

@Override
Expand All @@ -599,7 +651,9 @@ public int hashCode() {
contextPropagators,
disableEagerExecution,
startDelay,
workflowIdConflictpolicy);
workflowIdConflictpolicy,
staticSummary,
staticDetails);
}

@Override
Expand Down Expand Up @@ -638,6 +692,10 @@ public String toString() {
+ startDelay
+ ", workflowIdConflictpolicy="
+ workflowIdConflictpolicy
+ ", staticSummary="
+ staticSummary
+ ", staticDetails="
+ staticDetails
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ public DynamicUpdateHandler getHandler() {

Promise<Void> newTimer(Duration duration);

Promise<Void> newTimer(Duration duration, TimerOptions options);

<R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func);

<R> R mutableSideEffect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.common.SearchAttributeUpdate;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Promise;
import io.temporal.workflow.TimerOptions;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -90,6 +91,11 @@ public Promise<Void> newTimer(Duration duration) {
return next.newTimer(duration);
}

@Override
public Promise<Void> newTimer(Duration duration, TimerOptions options) {
return next.newTimer(duration, options);
}

@Override
public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
return next.sideEffect(resultClass, resultType, func);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.client;

import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;

import io.grpc.Deadline;
import io.grpc.Status;
Expand All @@ -29,6 +30,7 @@
import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.update.v1.*;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.client.*;
Expand Down Expand Up @@ -86,14 +88,22 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
.build()
: null;

@Nullable
UserMetadata userMetadata =
makeUserMetaData(
input.getOptions().getStaticSummary(),
input.getOptions().getStaticDetails(),
dataConverterWithWorkflowContext);

StartWorkflowExecutionRequest.Builder request =
requestsHelper.newStartWorkflowExecutionRequest(
input.getWorkflowId(),
input.getWorkflowType(),
input.getHeader(),
input.getOptions(),
inputArgs.orElse(null),
memo);
memo,
userMetadata);
try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
boolean requestEagerExecution = eagerDispatchHandle != null;
request.setRequestEagerExecution(requestEagerExecution);
Expand Down Expand Up @@ -173,14 +183,22 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
.build()
: null;

@Nullable
UserMetadata userMetadata =
makeUserMetaData(
workflowStartInput.getOptions().getStaticSummary(),
workflowStartInput.getOptions().getStaticDetails(),
dataConverterWithWorkflowContext);

StartWorkflowExecutionRequestOrBuilder startRequest =
requestsHelper.newStartWorkflowExecutionRequest(
workflowStartInput.getWorkflowId(),
workflowStartInput.getWorkflowType(),
workflowStartInput.getHeader(),
workflowStartInput.getOptions(),
workflowInput.orElse(null),
memo);
memo,
userMetadata);

Optional<Payloads> signalInput =
dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy;
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand All @@ -33,6 +34,7 @@
import io.temporal.api.schedule.v1.ScheduleInfo;
import io.temporal.api.schedule.v1.ScheduleSpec;
import io.temporal.api.schedule.v1.ScheduleState;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflow.v1.NewWorkflowExecutionInfo;
import io.temporal.client.WorkflowOptions;
Expand Down Expand Up @@ -160,6 +162,16 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction
SearchAttributesUtil.encodeTyped(wfOptions.getTypedSearchAttributes()));
}

@Nullable
UserMetadata userMetadata =
makeUserMetaData(
wfOptions.getStaticSummary(),
wfOptions.getStaticDetails(),
dataConverterWithWorkflowContext);
if (userMetadata != null) {
workflowRequest.setUserMetadata(userMetadata);
}

Header grpcHeader =
toHeaderGrpc(
startWorkflowAction.getHeader(),
Expand Down Expand Up @@ -460,6 +472,15 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu
SearchAttributesUtil.decodeTyped(startWfAction.getSearchAttributes()));
}

if (startWfAction.hasUserMetadata()) {
wfOptionsBuilder.setStaticSummary(
dataConverterWithWorkflowContext.fromPayload(
startWfAction.getUserMetadata().getSummary(), String.class, String.class));
wfOptionsBuilder.setStaticDetails(
dataConverterWithWorkflowContext.fromPayload(
startWfAction.getUserMetadata().getDetails(), String.class, String.class));
}

builder.setOptions(wfOptionsBuilder.build());
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.protobuf.ByteString;
import io.temporal.api.common.v1.*;
import io.temporal.api.enums.v1.HistoryEventFilterType;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest;
Expand Down Expand Up @@ -59,7 +60,8 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
@Nonnull io.temporal.common.interceptors.Header header,
@Nonnull WorkflowOptions options,
@Nullable Payloads inputArgs,
@Nullable Memo memo) {
@Nullable Memo memo,
@Nullable UserMetadata userMetadata) {
StartWorkflowExecutionRequest.Builder request =
StartWorkflowExecutionRequest.newBuilder()
.setNamespace(clientOptions.getNamespace())
Expand Down Expand Up @@ -108,6 +110,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
request.setWorkflowStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay()));
}

if (userMetadata != null) {
request.setUserMetadata(userMetadata);
}

if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) {
if (options.getTypedSearchAttributes() != null) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -183,6 +189,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut
request.setWorkflowStartDelay(startParameters.getWorkflowStartDelay());
}

if (startParameters.hasUserMetadata()) {
request.setUserMetadata(startParameters.getUserMetadata());
}

return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.history.v1.*;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.client.WorkflowFailedException;
import io.temporal.common.converter.DataConverter;
Expand Down Expand Up @@ -240,6 +241,21 @@ public static WorkflowExecutionStatus getCloseStatus(HistoryEvent event) {
}
}

public static UserMetadata makeUserMetaData(String summary, String details, DataConverter dc) {
if (summary == null && details == null) {
return null;
}

UserMetadata.Builder builder = UserMetadata.newBuilder();
if (summary != null) {
builder.setSummary(dc.toPayload(summary).get());
}
if (details != null) {
builder.setDetails(dc.toPayload(details).get());
}
return builder.build();
}

public static String prettyPrintCommands(Iterable<Command> commands) {
StringBuilder result = new StringBuilder();
for (Command command : commands) {
Expand Down
Loading

0 comments on commit 03f7182

Please sign in to comment.