Skip to content

Commit

Permalink
Merge branch 'master' into workflow-update-duplicate-id
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew authored Nov 25, 2024
2 parents df5146f + c7fa7e8 commit 8d66fb2
Show file tree
Hide file tree
Showing 17 changed files with 230 additions and 129 deletions.
14 changes: 14 additions & 0 deletions contrib/datadog/tracing/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ func (t *tracerImpl) ContextWithSpan(ctx context.Context, span interceptor.Trace
return tracer.ContextWithSpan(ctx, span.(*tracerSpan).Span)
}

// SpanFromWorkflowContext extracts the DataDog Span object from the workflow context.
func SpanFromWorkflowContext(ctx workflow.Context) (ddtrace.Span, bool) {
val := ctx.Value(activeSpanContextKey)
if val == nil {
return tracer.SpanFromContext(nil)
}

if span, ok := val.(*tracerSpan); ok {
return span.Span, true
}

return tracer.SpanFromContext(nil)
}

func genSpanID(idempotencyKey string) uint64 {
h := fnv.New64()
// Write() always writes all bytes and never fails; the count and error result are for implementing io.Writer.
Expand Down
43 changes: 42 additions & 1 deletion contrib/datadog/tracing/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package tracing

import (
"errors"
"strings"
"testing"

Expand All @@ -31,6 +32,9 @@ import (

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/internal/interceptortest"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

type testTracer struct {
Expand Down Expand Up @@ -133,11 +137,48 @@ func Test_OnFinishOption(t *testing.T) {
Tracer: impl,
mt: mt,
}

interceptortest.RunTestWorkflowWithError(t, trc)

spans := trc.FinishedSpans()

require.Len(t, spans, 1)
require.Equal(t, "temporal.RunWorkflow", spans[0].Name)
}

func setCustomSpanTagWorkflow(ctx workflow.Context) error {
span, ok := SpanFromWorkflowContext(ctx)

if !ok {
return errors.New("Did not find span in workflow context")
}

span.SetTag("testTag", "testValue")
return nil
}

func Test_SpanFromWorkflowContext(t *testing.T) {
// Start the mock tracer.
mt := mocktracer.Start()
defer mt.Stop()

var suite testsuite.WorkflowTestSuite
env := suite.NewTestWorkflowEnvironment()
env.RegisterWorkflow(setCustomSpanTagWorkflow)

impl := NewTracer(TracerOptions{})
testTracer := testTracer{
Tracer: impl,
mt: mt,
}

// Set tracer interceptor
env.SetWorkerOptions(worker.Options{
Interceptors: []interceptor.WorkerInterceptor{interceptor.NewTracingInterceptor(testTracer)},
})

env.ExecuteWorkflow(setCustomSpanTagWorkflow)

require.True(t, env.IsWorkflowCompleted())
testSpan := mt.FinishedSpans()[0]
require.Equal(t, "testValue", testSpan.Tag("testTag"))
}
8 changes: 8 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ type (
// build ID or not. See temporal.VersioningIntent.
// WARNING: Worker versioning is currently experimental
VersioningIntent VersioningIntent

// Summary is a single-line summary for this activity that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
Summary string
}

// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
Expand Down
2 changes: 1 addition & 1 deletion internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType enumspb.TimeoutType) {
context.commandsHelper.scheduledEventIDToActivityID[5] = activityID
di := h.newActivityCommandStateMachine(
5,
&commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID})
&commandpb.ScheduleActivityTaskCommandAttributes{ActivityId: activityID}, nil)
di.state = commandStateInitiated
di.setData(&scheduledActivity{
callback: func(r *commonpb.Payloads, e error) {
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type (
RetryPolicy *commonpb.RetryPolicy
DisableEagerExecution bool
VersioningIntent VersioningIntent
Summary string
}

// ExecuteLocalActivityOptions options for executing a local activity
Expand Down
11 changes: 8 additions & 3 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ type (

activityCommandStateMachine struct {
*commandStateMachineBase
scheduleID int64
attributes *commandpb.ScheduleActivityTaskCommandAttributes
scheduleID int64
attributes *commandpb.ScheduleActivityTaskCommandAttributes
startMetadata *sdk.UserMetadata
}

cancelActivityStateMachine struct {
Expand Down Expand Up @@ -348,12 +349,14 @@ func (h *commandsHelper) newCommandStateMachineBase(commandType commandType, id
func (h *commandsHelper) newActivityCommandStateMachine(
scheduleID int64,
attributes *commandpb.ScheduleActivityTaskCommandAttributes,
startMetadata *sdk.UserMetadata,
) *activityCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeActivity, attributes.GetActivityId())
return &activityCommandStateMachine{
commandStateMachineBase: base,
scheduleID: scheduleID,
attributes: attributes,
startMetadata: startMetadata,
}
}

Expand Down Expand Up @@ -618,6 +621,7 @@ func (d *activityCommandStateMachine) getCommand() *commandpb.Command {
case commandStateCreated, commandStateCanceledBeforeSent:
command := createNewCommand(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK)
command.Attributes = &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: d.attributes}
command.UserMetadata = d.startMetadata
return command
default:
return nil
Expand Down Expand Up @@ -1118,9 +1122,10 @@ func (h *commandsHelper) moveCommandToBack(command commandStateMachine) {
func (h *commandsHelper) scheduleActivityTask(
scheduleID int64,
attributes *commandpb.ScheduleActivityTaskCommandAttributes,
metadata *sdk.UserMetadata,
) commandStateMachine {
h.scheduledEventIDToActivityID[scheduleID] = attributes.GetActivityId()
command := h.newActivityCommandStateMachine(scheduleID, attributes)
command := h.newActivityCommandStateMachine(scheduleID, attributes, metadata)
h.addCommand(command)
return command
}
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_command_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func Test_ActivityStateMachine_CompleteWithoutCancel(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand All @@ -192,7 +192,7 @@ func Test_ActivityStateMachine_CancelBeforeSent(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())

// Cancel before command sent. We will send the command and the cancellation.
Expand All @@ -215,7 +215,7 @@ func Test_ActivityStateMachine_CancelAfterSent(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, 1, len(commands))
Expand Down Expand Up @@ -251,7 +251,7 @@ func Test_ActivityStateMachine_CompletedAfterCancel(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, 1, len(commands))
Expand Down Expand Up @@ -287,7 +287,7 @@ func Test_ActivityStateMachine_CancelInitiated_After_CanceledBeforeSent(t *testi

// schedule activity
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
d := h.scheduleActivityTask(scheduleID, attributes, nil)
require.Equal(t, commandStateCreated, d.getState())

// cancel activity before sent
Expand Down Expand Up @@ -324,7 +324,7 @@ func Test_ActivityStateMachine_PanicInvalidStateTransition(t *testing.T) {

// schedule activity
scheduleID := h.getNextID()
h.scheduleActivityTask(scheduleID, attributes)
h.scheduleActivityTask(scheduleID, attributes, nil)

// verify that using invalid activity id will panic
err := runAndCatchPanic(func() {
Expand Down
10 changes: 8 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
attributes.InheritBuildId = determineInheritBuildIdFlagForCommand(
params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName)

startMetadata, err := buildUserMetadata(params.staticSummary, params.staticDetails, wc.dataConverter)
startMetadata, err := buildUserMetadata(params.StaticSummary, params.StaticDetails, wc.dataConverter)
if err != nil {
callback(nil, err)
return
Expand Down Expand Up @@ -759,7 +759,13 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar
scheduleTaskAttr.UseWorkflowBuildId = determineInheritBuildIdFlagForCommand(
parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName)

command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr)
startMetadata, err := buildUserMetadata(parameters.Summary, "", wc.dataConverter)
if err != nil {
callback(nil, err)
return ActivityID{}
}

command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr, startMetadata)
command.setData(&scheduledActivity{
callback: callback,
waitForCancelRequest: parameters.WaitForCancellation,
Expand Down
5 changes: 2 additions & 3 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,15 @@ type (
SearchAttributes map[string]interface{}
TypedSearchAttributes SearchAttributes
ParentClosePolicy enumspb.ParentClosePolicy
StaticSummary string
StaticDetails string
signalChannels map[string]Channel
requestedSignalChannels map[string]*requestedSignalChannel
queryHandlers map[string]*queryHandler
updateHandlers map[string]*updateHandler
// runningUpdatesHandles is a map of update handlers that are currently running.
runningUpdatesHandles map[string]UpdateInfo
VersioningIntent VersioningIntent
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary string
staticDetails string
// currentDetails is the user-set string returned on metadata query as
// WorkflowMetadata.current_details
currentDetails string
Expand Down
20 changes: 1 addition & 19 deletions internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1402,24 +1402,6 @@ func (s *WorkflowUnitTest) Test_MutatingFunctionsInQueries() {
s.NoError(env.GetWorkflowError())
}

type updateCallback struct {
accept func()
reject func(error)
complete func(interface{}, error)
}

func (uc *updateCallback) Accept() {
uc.accept()
}

func (uc *updateCallback) Reject(err error) {
uc.reject(err)
}

func (uc *updateCallback) Complete(success interface{}, err error) {
uc.complete(success, err)
}

func (s *WorkflowUnitTest) Test_MutatingFunctionsInUpdateValidator() {
env := s.NewTestWorkflowEnvironment()

Expand All @@ -1438,7 +1420,7 @@ func (s *WorkflowUnitTest) Test_MutatingFunctionsInUpdateValidator() {
}
env.RegisterWorkflow(wf)
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(updateType, "testID", &updateCallback{})
env.UpdateWorkflow(updateType, "testID", &TestUpdateCallback{})
}, time.Second)
env.ExecuteWorkflow(wf)
s.True(env.IsWorkflowCompleted())
Expand Down
34 changes: 24 additions & 10 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,22 @@ type (
// WARNING: Worker versioning is currently experimental
VersioningIntent VersioningIntent

// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary string
staticDetails string
// StaticSummary is a single-line fixed summary for this child workflow execution that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
StaticSummary string

// Details - General fixed details for this child workflow execution that will appear in UI/CLI. This can be in
// Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be
// updated. For details that can be updated, use SetCurrentDetails within the workflow.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
StaticDetails string
}

// RegisterWorkflowOptions consists of options for registering a workflow
Expand Down Expand Up @@ -1083,7 +1096,8 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
options.SearchAttributes = workflowOptionsFromCtx.SearchAttributes
options.TypedSearchAttributes = workflowOptionsFromCtx.TypedSearchAttributes
options.VersioningIntent = workflowOptionsFromCtx.VersioningIntent

options.StaticDetails = workflowOptionsFromCtx.StaticDetails
options.StaticSummary = workflowOptionsFromCtx.StaticSummary
header, err := workflowHeaderPropagated(ctx, options.ContextPropagators)
if err != nil {
executionSettable.Set(nil, err)
Expand Down Expand Up @@ -1609,9 +1623,8 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context {
wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes
wfOptions.ParentClosePolicy = cwo.ParentClosePolicy
wfOptions.VersioningIntent = cwo.VersioningIntent
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
wfOptions.staticSummary = cwo.staticSummary
wfOptions.staticDetails = cwo.staticDetails
wfOptions.StaticSummary = cwo.StaticSummary
wfOptions.StaticDetails = cwo.StaticDetails

return ctx1
}
Expand All @@ -1638,9 +1651,8 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions {
TypedSearchAttributes: opts.TypedSearchAttributes,
ParentClosePolicy: opts.ParentClosePolicy,
VersioningIntent: opts.VersioningIntent,
// TODO(cretz): Expose once https://github.com/temporalio/temporal/issues/6412 is fixed
staticSummary: opts.staticSummary,
staticDetails: opts.staticDetails,
StaticSummary: opts.StaticSummary,
StaticDetails: opts.StaticDetails,
}
}

Expand Down Expand Up @@ -2163,6 +2175,7 @@ func WithActivityOptions(ctx Context, options ActivityOptions) Context {
eap.RetryPolicy = convertToPBRetryPolicy(options.RetryPolicy)
eap.DisableEagerExecution = options.DisableEagerExecution
eap.VersioningIntent = options.VersioningIntent
eap.Summary = options.Summary
return ctx1
}

Expand Down Expand Up @@ -2219,6 +2232,7 @@ func GetActivityOptions(ctx Context) ActivityOptions {
RetryPolicy: convertFromPBRetryPolicy(opts.RetryPolicy),
DisableEagerExecution: opts.DisableEagerExecution,
VersioningIntent: opts.VersioningIntent,
Summary: opts.Summary,
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func TestGetChildWorkflowOptions(t *testing.T) {
},
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
VersioningIntent: VersioningIntentDefault,
StaticSummary: "child workflow summary",
StaticDetails: "child workflow details",
}

// Require test options to have non-zero value for each field. This ensures that we update tests (and the
Expand All @@ -82,6 +84,7 @@ func TestGetActivityOptions(t *testing.T) {
RetryPolicy: newTestRetryPolicy(),
DisableEagerExecution: true,
VersioningIntent: VersioningIntentDefault,
Summary: "activity summary",
}

assertNonZero(t, opts)
Expand Down
Loading

0 comments on commit 8d66fb2

Please sign in to comment.