Skip to content

Commit

Permalink
Support user metadata on activities and child/scheduled workflows (#1719
Browse files Browse the repository at this point in the history
)

Support user metadata on activities and child/scheduled workflows
  • Loading branch information
Quinn-With-Two-Ns authored Nov 21, 2024
1 parent c31c2f2 commit 643c803
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 29 deletions.
8 changes: 8 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ type (
// build ID or not. See temporal.VersioningIntent.
// WARNING: Worker versioning is currently experimental
VersioningIntent VersioningIntent

// Summary is a single-line summary for this activity that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string
}

// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
Expand Down
2 changes: 1 addition & 1 deletion internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType enumspb.TimeoutType) {
context.commandsHelper.scheduledEventIDToActivityID[5] = activityID
di := h.newActivityCommandStateMachine(
5,
&commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID})
&commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID}, nil)
di.state = commandStateInitiated
di.setData(&scheduledActivity{
callback: func(r *commonpb.Payloads, e error) {
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type (
RetryPolicy *commonpb.RetryPolicy
DisableEagerExecution bool
VersioningIntent VersioningIntent
Summary string
}

// ExecuteLocalActivityOptions options for executing a local activity
Expand Down
11 changes: 8 additions & 3 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ type (

activityCommandStateMachine struct {
*commandStateMachineBase
scheduleID int64
attributes *commandpb.ScheduleActivityTaskCommandAttributes
scheduleID int64
attributes *commandpb.ScheduleActivityTaskCommandAttributes
startMetadata *sdk.UserMetadata
}

cancelActivityStateMachine struct {
Expand Down Expand Up @@ -348,12 +349,14 @@ func (h *commandsHelper) newCommandStateMachineBase(commandType commandType, id
func (h *commandsHelper) newActivityCommandStateMachine(
scheduleID int64,
attributes *commandpb.ScheduleActivityTaskCommandAttributes,
startMetadata *sdk.UserMetadata,
) *activityCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeActivity, attributes.GetActivityId())
return &activityCommandStateMachine{
commandStateMachineBase: base,
scheduleID: scheduleID,
attributes: attributes,
startMetadata: startMetadata,
}
}

Expand Down Expand Up @@ -618,6 +621,7 @@ func (d *activityCommandStateMachine) getCommand() *commandpb.Command {
case commandStateCreated, commandStateCanceledBeforeSent:
command := createNewCommand(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK)
command.Attributes = &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: d.attributes}
command.UserMetadata = d.startMetadata
return command
default:
return nil
Expand Down Expand Up @@ -1118,9 +1122,10 @@ func (h *commandsHelper) moveCommandToBack(command commandStateMachine) {
func (h *commandsHelper) scheduleActivityTask(
scheduleID int64,
attributes *commandpb.ScheduleActivityTaskCommandAttributes,
metadata *sdk.UserMetadata,
) commandStateMachine {
h.scheduledEventIDToActivityID[scheduleID] = attributes.GetActivityId()
command := h.newActivityCommandStateMachine(scheduleID, attributes)
command := h.newActivityCommandStateMachine(scheduleID, attributes, metadata)
h.addCommand(command)
return command
}
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_command_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func Test_ActivityStateMachine_CompleteWithoutCancel(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand All @@ -192,7 +192,7 @@ func Test_ActivityStateMachine_CancelBeforeSent(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())

// Cancel before command sent. We will send the command and the cancellation.
Expand All @@ -215,7 +215,7 @@ func Test_ActivityStateMachine_CancelAfterSent(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, 1, len(commands))
Expand Down Expand Up @@ -251,7 +251,7 @@ func Test_ActivityStateMachine_CompletedAfterCancel(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, 1, len(commands))
Expand Down Expand Up @@ -287,7 +287,7 @@ func Test_ActivityStateMachine_CancelInitiated_After_CanceledBeforeSent(t *testi

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())

// cancel activity before sent
Expand Down Expand Up @@ -324,7 +324,7 @@ func Test_ActivityStateMachine_PanicInvalidStateTransition(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
h.scheduleActivityTask(scheduleID, attributes)
h.scheduleActivityTask(scheduleID, attributes, nil)

// verify that using invalid activity id will panic
err := runAndCatchPanic(func() {
Expand Down
10 changes: 8 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
attributes.InheritBuildId = determineInheritBuildIdFlagForCommand(
params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName)

startMetadata, err := buildUserMetadata(params.staticSummary, params.staticDetails, wc.dataConverter)
startMetadata, err := buildUserMetadata(params.StaticSummary, params.StaticDetails, wc.dataConverter)
if err != nil {
callback(nil, err)
return
Expand Down Expand Up @@ -759,7 +759,13 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar
scheduleTaskAttr.UseWorkflowBuildId = determineInheritBuildIdFlagForCommand(
parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName)

command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr)
startMetadata, err := buildUserMetadata(parameters.Summary, "", wc.dataConverter)
if err != nil {
callback(nil, err)
return ActivityID{}
}

command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr, startMetadata)
command.setData(&scheduledActivity{
callback: callback,
waitForCancelRequest: parameters.WaitForCancellation,
Expand Down
5 changes: 2 additions & 3 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,15 @@ type (
SearchAttributes map[string]interface{}
TypedSearchAttributes SearchAttributes
ParentClosePolicy enumspb.ParentClosePolicy
StaticSummary string
StaticDetails string
signalChannels map[string]Channel
requestedSignalChannels map[string]*requestedSignalChannel
queryHandlers map[string]*queryHandler
updateHandlers map[string]*updateHandler
// runningUpdatesHandles is a map of update handlers that are currently running.
runningUpdatesHandles map[string]UpdateInfo
VersioningIntent VersioningIntent
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary string
staticDetails string
// currentDetails is the user-set string returned on metadata query as
// WorkflowMetadata.current_details
currentDetails string
Expand Down
34 changes: 24 additions & 10 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,22 @@ type (
// WARNING: Worker versioning is currently experimental
VersioningIntent VersioningIntent

// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary string
staticDetails string
// StaticSummary is a single-line fixed summary for this child workflow execution that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
StaticSummary string

// Details - General fixed details for this child workflow execution that will appear in UI/CLI. This can be in
// Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be
// updated. For details that can be updated, use SetCurrentDetails within the workflow.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
StaticDetails string
}

// RegisterWorkflowOptions consists of options for registering a workflow
Expand Down Expand Up @@ -1083,7 +1096,8 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
options.SearchAttributes = workflowOptionsFromCtx.SearchAttributes
options.TypedSearchAttributes = workflowOptionsFromCtx.TypedSearchAttributes
options.VersioningIntent = workflowOptionsFromCtx.VersioningIntent

options.StaticDetails = workflowOptionsFromCtx.StaticDetails
options.StaticSummary = workflowOptionsFromCtx.StaticSummary
header, err := workflowHeaderPropagated(ctx, options.ContextPropagators)
if err != nil {
executionSettable.Set(nil, err)
Expand Down Expand Up @@ -1609,9 +1623,8 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context {
wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes
wfOptions.ParentClosePolicy = cwo.ParentClosePolicy
wfOptions.VersioningIntent = cwo.VersioningIntent
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
wfOptions.staticSummary = cwo.staticSummary
wfOptions.staticDetails = cwo.staticDetails
wfOptions.StaticSummary = cwo.StaticSummary
wfOptions.StaticDetails = cwo.StaticDetails

return ctx1
}
Expand All @@ -1638,9 +1651,8 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions {
TypedSearchAttributes: opts.TypedSearchAttributes,
ParentClosePolicy: opts.ParentClosePolicy,
VersioningIntent: opts.VersioningIntent,
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary: opts.staticSummary,
staticDetails: opts.staticDetails,
StaticSummary: opts.StaticSummary,
StaticDetails: opts.StaticDetails,
}
}

Expand Down Expand Up @@ -2163,6 +2175,7 @@ func WithActivityOptions(ctx Context, options ActivityOptions) Context {
eap.RetryPolicy = convertToPBRetryPolicy(options.RetryPolicy)
eap.DisableEagerExecution = options.DisableEagerExecution
eap.VersioningIntent = options.VersioningIntent
eap.Summary = options.Summary
return ctx1
}

Expand Down Expand Up @@ -2219,6 +2232,7 @@ func GetActivityOptions(ctx Context) ActivityOptions {
RetryPolicy: convertFromPBRetryPolicy(opts.RetryPolicy),
DisableEagerExecution: opts.DisableEagerExecution,
VersioningIntent: opts.VersioningIntent,
Summary: opts.Summary,
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func TestGetChildWorkflowOptions(t *testing.T) {
},
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
VersioningIntent: VersioningIntentDefault,
StaticSummary: "child workflow summary",
StaticDetails: "child workflow details",
}

// Require test options to have non-zero value for each field. This ensures that we update tests (and the
Expand All @@ -82,6 +84,7 @@ func TestGetActivityOptions(t *testing.T) {
RetryPolicy: newTestRetryPolicy(),
DisableEagerExecution: true,
VersioningIntent: VersioningIntentDefault,
Summary: "activity summary",
}

assertNonZero(t, opts)
Expand Down
30 changes: 26 additions & 4 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6241,10 +6241,6 @@ func (ts *IntegrationTestSuite) TestRequestFailureMetric() {
}

func (ts *IntegrationTestSuite) TestUserMetadata() {
// Skip this test if disabled
if os.Getenv("DISABLE_USER_METADATA_TESTS") != "" {
ts.T().SkipNow()
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand Down Expand Up @@ -6301,18 +6297,44 @@ func (ts *IntegrationTestSuite) TestUserMetadata() {
// Confirm that the history has a timer with the proper summary
iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), "", false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
var timerEvent *historypb.HistoryEvent
var activityEvent *historypb.HistoryEvent
var childWorkflowEvent *historypb.HistoryEvent
for iter.HasNext() {
event, err := iter.Next()
ts.NoError(err)
if event.GetTimerStartedEventAttributes() != nil {
ts.Nil(timerEvent)
timerEvent = event
}

if event.GetActivityTaskScheduledEventAttributes() != nil {
ts.Nil(activityEvent)
activityEvent = event
}

if event.GetStartChildWorkflowExecutionInitiatedEventAttributes() != nil {
ts.Nil(childWorkflowEvent)
childWorkflowEvent = event
}
}
ts.NotNil(timerEvent)
ts.NoError(converter.GetDefaultDataConverter().FromPayload(
timerEvent.UserMetadata.Summary, &str))
ts.Equal("my-timer", str)

ts.NotNil(activityEvent)
ts.NoError(converter.GetDefaultDataConverter().FromPayload(
activityEvent.UserMetadata.Summary, &str))
ts.Equal("my-activity", str)

ts.NotNil(childWorkflowEvent)
fmt.Printf("childWorkflowEvent: %v\n", childWorkflowEvent.UserMetadata)
ts.NoError(converter.GetDefaultDataConverter().FromPayload(
childWorkflowEvent.UserMetadata.Summary, &str))
ts.Equal("my-child-wf-summary", str)
ts.NoError(converter.GetDefaultDataConverter().FromPayload(
childWorkflowEvent.UserMetadata.Details, &str))
ts.Equal("my-child-wf-details", str)
}

func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() {
Expand Down
24 changes: 24 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3092,6 +3092,7 @@ func (w *Workflows) UpsertMemo(ctx workflow.Context, memo map[string]interface{}
}

func (w *Workflows) UserMetadata(ctx workflow.Context) error {
var activities *Activities
// Define an update and query handler
err := workflow.SetQueryHandlerWithOptions(
ctx,
Expand Down Expand Up @@ -3123,6 +3124,29 @@ func (w *Workflows) UserMetadata(ctx workflow.Context) error {
).Receive(ctx, nil)
workflow.SetCurrentDetails(ctx, "current-details-2")

// Start an activity with a description
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
Summary: "my-activity",
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err = workflow.ExecuteActivity(ctx, activities.EmptyActivity).Get(ctx, &result)
if err != nil {
return err
}

// Start a child workflow with a description
cwo := workflow.ChildWorkflowOptions{
StaticSummary: "my-child-wf-summary",
StaticDetails: "my-child-wf-details",
}
ctx = workflow.WithChildOptions(ctx, cwo)
err = workflow.ExecuteChildWorkflow(ctx, w.SimplestWorkflow).Get(ctx, nil)
if err != nil {
return err
}

// Run a short timer with a summary and return
return workflow.NewTimerWithOptions(
ctx,
Expand Down

0 comments on commit 643c803

Please sign in to comment.