diff --git a/test/nexus_test.go b/test/nexus_test.go index dee4333dd..89ac977ca 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -42,6 +42,7 @@ import ( nexuspb "go.temporal.io/api/nexus/v1" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/serviceerror" + "google.golang.org/protobuf/proto" "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" @@ -385,10 +386,22 @@ func TestNexusWorkflowRunOperation(t *testing.T) { nc := tc.newNexusClient(t, service.Name) + link := &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: "caller-wf-id", + RunId: "caller-run-id", + Reference: &common.Link_WorkflowEvent_EventRef{ + EventRef: &common.Link_WorkflowEvent_EventReference{ + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }, + }, + } + workflowID := "nexus-handler-workflow-" + uuid.NewString() result, err := nexus.StartOperation(ctx, nc, workflowOp, workflowID, nexus.StartOperationOptions{ CallbackURL: "http://localhost/test", CallbackHeader: nexus.Header{"test": "ok"}, + Links: []nexus.Link{temporalnexus.ConvertLinkWorkflowEventToNexusLink(link)}, }) require.NoError(t, err) require.NotNil(t, result.Pending) @@ -403,6 +416,17 @@ func TestNexusWorkflowRunOperation(t *testing.T) { require.Equal(t, "http://localhost/test", callback.Nexus.Url) require.Subset(t, callback.Nexus.Header, map[string]string{"test": "ok"}) + iter := tc.client.GetWorkflowHistory(ctx, workflowID, "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { + require.Len(t, event.GetLinks(), 1) + require.True(t, proto.Equal(link, event.GetLinks()[0].GetWorkflowEvent())) + break + } + } + run := tc.client.GetWorkflow(ctx, workflowID, "") require.NoError(t, handle.Cancel(ctx, nexus.CancelOperationOptions{})) require.ErrorContains(t, run.Get(ctx, nil), "canceled") @@ -545,19 +569,26 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { panic(fmt.Errorf("unexpected outcome: %s", action)) } } - op := temporalnexus.NewWorkflowRunOperation("op", handlerWorkflow, func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { - require.NotPanicsf(t, func() { - temporalnexus.GetMetricsHandler(ctx) - temporalnexus.GetLogger(ctx) - }, "Failed to get metrics handler or logger from operation context.") + handlerWfID := "" + op := temporalnexus.NewWorkflowRunOperation( + "op", + handlerWorkflow, + func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + require.NotPanicsf(t, func() { + temporalnexus.GetMetricsHandler(ctx) + temporalnexus.GetLogger(ctx) + }, "Failed to get metrics handler or logger from operation context.") - if action == "fail-to-start" { - return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error") - } - return client.StartWorkflowOptions{ - ID: soo.RequestID, - }, nil - }) + handlerWfID = "" + if action == "fail-to-start" { + return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error") + } + handlerWfID = soo.RequestID + return client.StartWorkflowOptions{ + ID: soo.RequestID, + }, nil + }, + ) callerWorkflow := func(ctx workflow.Context, action string) error { c := workflow.NewNexusClient(tc.endpoint, "test") ctx, cancel := workflow.WithCancel(ctx) @@ -611,6 +642,74 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { }, callerWorkflow, "succeed") require.NoError(t, err) require.NoError(t, run.Get(ctx, nil)) + + // Check the link is added in the caller workflow. + iter := tc.client.GetWorkflowHistory( + ctx, + run.GetID(), + run.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + var nexusOperationScheduleEventID int64 + var targetEvent *historypb.HistoryEvent + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED { + nexusOperationScheduleEventID = event.GetEventId() + } else if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_STARTED { + targetEvent = event + break + } + } + require.NotNil(t, targetEvent) + require.Len(t, targetEvent.GetLinks(), 1) + link := targetEvent.GetLinks()[0] + require.Equal(t, tc.testConfig.Namespace, link.GetWorkflowEvent().GetNamespace()) + require.Equal(t, handlerWfID, link.GetWorkflowEvent().GetWorkflowId()) + require.NotEmpty(t, link.GetWorkflowEvent().GetRunId()) + require.True(t, proto.Equal( + &common.Link_WorkflowEvent_EventReference{ + EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + }, + link.GetWorkflowEvent().GetEventRef(), + )) + handlerRunID := link.GetWorkflowEvent().GetRunId() + + // Check the link is added in the handler workflow. + iter = tc.client.GetWorkflowHistory( + ctx, + handlerWfID, + handlerRunID, + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + targetEvent = nil + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED { + targetEvent = event + break + } + } + require.NotNil(t, targetEvent) + require.Len(t, targetEvent.GetLinks(), 1) + require.True(t, proto.Equal( + &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + Reference: &common.Link_WorkflowEvent_EventRef{ + EventRef: &common.Link_WorkflowEvent_EventReference{ + EventId: nexusOperationScheduleEventID, + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }, + }, + }, + targetEvent.GetLinks()[0].GetWorkflowEvent(), + )) }) t.Run("OpFailed", func(t *testing.T) {