diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index e5e802a3..a4284591 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -72,38 +72,14 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl eventID := int64(c.EventId) if c.Type.Value != "" { resetBaseRunID, eventID, err = c.getResetEventIDByType(cctx, cl) - if err != nil { - return fmt.Errorf("getting reset event ID by type failed: %w", err) - } - } - - reapplyExcludes := make([]enums.ResetReapplyExcludeType, 0) - for _, exclude := range c.ReapplyExclude.Values { - if strings.ToLower(exclude) == "all" { - for _, excludeType := range enums.ResetReapplyExcludeType_value { - if excludeType == 0 { - continue - } - reapplyExcludes = append(reapplyExcludes, enums.ResetReapplyExcludeType(excludeType)) - } - break - } - excludeType, err := enums.ResetReapplyExcludeTypeFromString(exclude) if err != nil { return err } - reapplyExcludes = append(reapplyExcludes, excludeType) } - reapplyType := enums.RESET_REAPPLY_TYPE_ALL_ELIGIBLE - if c.ReapplyType.Value != "All" { - if len(c.ReapplyExclude.Values) > 0 { - return errors.New("--reapply-type cannot be used with --reapply-exclude. Use --reapply-exclude.") - } - reapplyType, err = enums.ResetReapplyTypeFromString(c.ReapplyType.Value) - if err != nil { - return err - } + reapplyExcludes, reapplyType, err := getResetReapplyAndExcludeTypes(c.ReapplyExclude.Values, c.ReapplyType.Value) + if err != nil { + return err } cctx.Printer.Printlnf("Resetting workflow %s to event ID %d", c.WorkflowId, eventID) @@ -138,10 +114,15 @@ func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl cl VisibilityQuery: c.Query, Reason: c.Reason, } + + batchResetOptions, err := c.batchResetOptions() + if err != nil { + return err + } request.Operation = &workflowservice.StartBatchOperationRequest_ResetOperation{ ResetOperation: &batch.BatchOperationReset{ Identity: clientIdentity(), - Options: c.batchResetOptions(c.Type.Value), + Options: batchResetOptions, }, } count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query}) @@ -160,22 +141,33 @@ func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl cl return startBatchJob(cctx, cl, &request) } -func (c *TemporalWorkflowResetCommand) batchResetOptions(resetType string) *common.ResetOptions { - switch resetType { +func (c *TemporalWorkflowResetCommand) batchResetOptions() (*common.ResetOptions, error) { + reapplyExcludes, reapplyType, err := getResetReapplyAndExcludeTypes(c.ReapplyExclude.Values, c.ReapplyType.Value) + if err != nil { + return nil, err + } + + switch c.Type.Value { case "FirstWorkflowTask": return &common.ResetOptions{ - Target: &common.ResetOptions_FirstWorkflowTask{}, - } + Target: &common.ResetOptions_FirstWorkflowTask{}, + ResetReapplyExcludeTypes: reapplyExcludes, + ResetReapplyType: reapplyType, + }, nil case "LastWorkflowTask": return &common.ResetOptions{ - Target: &common.ResetOptions_LastWorkflowTask{}, - } + Target: &common.ResetOptions_LastWorkflowTask{}, + ResetReapplyExcludeTypes: reapplyExcludes, + ResetReapplyType: reapplyType, + }, nil case "BuildId": return &common.ResetOptions{ Target: &common.ResetOptions_BuildId{ BuildId: c.BuildId, }, - } + ResetReapplyExcludeTypes: reapplyExcludes, + ResetReapplyType: reapplyType, + }, nil default: panic("unsupported operation type was filtered by cli framework") } @@ -314,3 +306,38 @@ func getLastContinueAsNewID(ctx context.Context, namespace, wid, rid string, wfs } return } + +func getResetReapplyAndExcludeTypes(resetReapplyExclude []string, resetReapplyType string) ([]enums.ResetReapplyExcludeType, enums.ResetReapplyType, error) { + var err error + + var reapplyExcludes []enums.ResetReapplyExcludeType + for _, exclude := range resetReapplyExclude { + if strings.ToLower(exclude) == "all" { + for _, excludeType := range enums.ResetReapplyExcludeType_value { + if excludeType == int32(enums.RESET_REAPPLY_EXCLUDE_TYPE_UNSPECIFIED) { + continue + } + reapplyExcludes = append(reapplyExcludes, enums.ResetReapplyExcludeType(excludeType)) + } + break + } + excludeType, err := enums.ResetReapplyExcludeTypeFromString(exclude) + if err != nil { + return nil, enums.RESET_REAPPLY_TYPE_UNSPECIFIED, err + } + reapplyExcludes = append(reapplyExcludes, excludeType) + } + + returnReapplyType := enums.RESET_REAPPLY_TYPE_ALL_ELIGIBLE + if resetReapplyType != "All" { + if len(resetReapplyExclude) > 0 { + return nil, enums.RESET_REAPPLY_TYPE_UNSPECIFIED, errors.New("--reapply-type cannot be used with --reapply-exclude. Use --reapply-exclude") + } + returnReapplyType, err = enums.ResetReapplyTypeFromString(resetReapplyType) + if err != nil { + return nil, enums.RESET_REAPPLY_TYPE_UNSPECIFIED, err + } + } + + return reapplyExcludes, returnReapplyType, nil +} diff --git a/temporalcli/commands.workflow_reset_test.go b/temporalcli/commands.workflow_reset_test.go index e8da74dd..c850a3d7 100644 --- a/temporalcli/commands.workflow_reset_test.go +++ b/temporalcli/commands.workflow_reset_test.go @@ -77,6 +77,48 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ToFirstWorkflowTask() { s.Greater(activityExecutions, 1, "Should have re-executed the workflow from the beginning") } +func (s *SharedServerSuite) TestWorkflow_ResetBatch_ToFirstWorkflowTask() { + var wfExecutions, activityExecutions int + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + activityExecutions++ + return nil, nil + }) + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, nil) + wfExecutions++ + return nil, nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + var junk any + s.NoError(run.Get(s.Context, &junk)) + s.Equal(1, wfExecutions) + + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr), + "-t", "FirstWorkflowTask", + "--reason", "test-reset-FirstWorkflowTask", + ) + require.NoError(s.T(), res.Err) + s.awaitNextWorkflow(searchAttr) + s.Equal(2, wfExecutions, "Should have re-executed the workflow from the beginning") + s.Greater(activityExecutions, 1, "Should have re-executed the workflow from the beginning") +} + func (s *SharedServerSuite) TestWorkflow_Reset_ToLastWorkflowTask() { var wfExecutions, activityExecutions int s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { @@ -119,6 +161,48 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ToLastWorkflowTask() { s.Equal(1, activityExecutions, "Should not have re-executed the activity") } +func (s *SharedServerSuite) TestWorkflow_ResetBatch_ToLastWorkflowTask() { + var wfExecutions, activityExecutions int + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + activityExecutions++ + return nil, nil + }) + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, nil) + wfExecutions++ + return nil, nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + var junk any + s.NoError(run.Get(s.Context, &junk)) + s.Equal(1, wfExecutions) + + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr), + "-t", "LastWorkflowTask", + "--reason", "test-reset-LastWorkflowTask", + ) + require.NoError(s.T(), res.Err) + s.awaitNextWorkflow(searchAttr) + s.Equal(2, wfExecutions, "Should re-executed the workflow") + s.Equal(1, activityExecutions, "Should not have re-executed the activity") +} + func (s *SharedServerSuite) TestWorkflow_Reset_ToEventID() { // We execute two activities and will resume just before the second one. We use the same activity for both // but a unique input so we can check which fake activity is executed @@ -248,6 +332,94 @@ func (s *SharedServerSuite) TestBatchResetByBuildId() { sut.stopWorkerFor("v3") } +func (s *SharedServerSuite) TestWorkflow_ResetBatch_OnlyMatchingQuery() { + var resetWfExecutions, resetActivityExecutions int + var nonResetWfExecutions, nonResetActivityExecutions int + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + isReset, ok := a.(bool) + if !ok { + return nil, fmt.Errorf("expected bool, not %T (%v)", a, a) + } + if isReset { + resetActivityExecutions++ + } else { + nonResetActivityExecutions++ + } + return nil, nil + }) + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + workflow.ExecuteActivity(ctx, DevActivity, a).Get(ctx, nil) + isReset, ok := a.(bool) + if !ok { + return nil, fmt.Errorf("expected bool, not %T (%v)", a, a) + } + if isReset { + resetWfExecutions++ + } else { + nonResetWfExecutions++ + } + return nil, nil + }) + + resetSearchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": resetSearchAttr}, + }, + DevWorkflow, + true, + ) + s.NoError(err) + var junk any + s.NoError(run.Get(s.Context, &junk)) + s.Equal(1, resetWfExecutions) + + nonResetSearchAttr := "keyword-" + uuid.NewString() + nonResetRun, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": nonResetSearchAttr}, + }, + DevWorkflow, + false, + ) + s.NoError(err) + s.NoError(nonResetRun.Get(s.Context, &junk)) + s.Equal(1, nonResetWfExecutions) + + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", resetSearchAttr), + "-t", "FirstWorkflowTask", + "--reason", "test-reset-FirstWorkflowTask", + ) + require.NoError(s.T(), res.Err) + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + resetSearchAttr + "'" + " OR " + "CustomKeywordField = '" + nonResetSearchAttr + "'", + }) + s.NoError(err) + if len(resp.Executions) != 3 { + return false + } + for _, exec := range resp.Executions { + if exec.Status != enums.WORKFLOW_EXECUTION_STATUS_COMPLETED { + return false + } + } + return true + }, 3*time.Second, 100*time.Millisecond) + s.Equal(2, resetWfExecutions, "Should have re-executed the workflow from the beginning") + s.Equal(2, resetActivityExecutions, "Should have re-executed the workflow from the beginning") + s.Equal(1, nonResetWfExecutions, "Should not have re-executed the non-matching workflow") + s.Equal(1, nonResetActivityExecutions, "Should not have re-executed the non-matching workflow") +} + type WorkflowResetTest struct { s *SharedServerSuite reapplyType string @@ -262,7 +434,16 @@ func (s *SharedServerSuite) TestWorkflow_Reset_DefaultReappliesAll() { expectUpdatesReapplied: true, expectSignalsReapplied: true, } - t.run() + t.runSingleReset() +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatch_ReappliesAll() { + t := WorkflowResetTest{ + s: s, + expectUpdatesReapplied: true, + expectSignalsReapplied: true, + } + t.runBatchReset() } func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeUpdate() { @@ -272,7 +453,17 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeUpdate() { expectUpdatesReapplied: false, expectSignalsReapplied: true, } - t.run() + t.runSingleReset() +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatch_ExcludeUpdate() { + t := WorkflowResetTest{ + s: s, + reapplyExclude: []string{"Update"}, + expectUpdatesReapplied: false, + expectSignalsReapplied: true, + } + t.runBatchReset() } func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignal() { @@ -282,7 +473,17 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignal() { expectUpdatesReapplied: true, expectSignalsReapplied: false, } - t.run() + t.runSingleReset() +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatch_ExcludeSignal() { + t := WorkflowResetTest{ + s: s, + reapplyExclude: []string{"Signal"}, + expectUpdatesReapplied: true, + expectSignalsReapplied: false, + } + t.runBatchReset() } func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignalAndUpdate() { @@ -292,7 +493,17 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignalAndUpdate() { expectUpdatesReapplied: false, expectSignalsReapplied: false, } - t.run() + t.runSingleReset() +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatch_ExcludeSignalAndUpdate() { + t := WorkflowResetTest{ + s: s, + reapplyExclude: []string{"Signal", "Update"}, + expectUpdatesReapplied: false, + expectSignalsReapplied: false, + } + t.runBatchReset() } func (s *SharedServerSuite) TestWorkflow_Reset_ReapplySignalOnly() { @@ -302,13 +513,32 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ReapplySignalOnly() { expectUpdatesReapplied: false, expectSignalsReapplied: true, } - t.run() + t.runSingleReset() +} + +func (s *SharedServerSuite) TestWorkflow_ResetBatch_ReapplySignalOnly() { + t := WorkflowResetTest{ + s: s, + reapplyType: "Signal", + expectUpdatesReapplied: false, + expectSignalsReapplied: true, + } + t.runBatchReset() +} + +func (t *WorkflowResetTest) runSingleReset() { + t.run(false) +} + +func (t *WorkflowResetTest) runBatchReset() { + t.run(true) } -func (t *WorkflowResetTest) run() { +func (t *WorkflowResetTest) run(resetBatch bool) { s := t.s var wfExecutions, updateHandlerExecutions, signalHandlerExecutions int s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + // Handle signals sigChan := workflow.GetSignalChannel(ctx, "mySignal") workflow.Go(ctx, func(ctx workflow.Context) { @@ -333,9 +563,14 @@ func (t *WorkflowResetTest) run() { searchAttr := "keyword-" + uuid.NewString() run := t.startWorkflowAndSendTwoSignalsAndTwoUpdates(searchAttr) s.Equal(2, updateHandlerExecutions) + s.Equal(2, signalHandlerExecutions) s.Equal(1, wfExecutions) - t.resetWorkflow(run.GetID()) + if resetBatch { + t.resetBatchWorkflow(searchAttr) + } else { + t.resetWorkflow(run.GetID()) + } s.awaitNextWorkflow(searchAttr) if t.expectUpdatesReapplied { @@ -365,6 +600,19 @@ func (t *WorkflowResetTest) startWorkflowAndSendTwoSignalsAndTwoUpdates(searchAt ) s.NoError(err) + // Wait for the workflow to start before sending signals/updates. + // This has to be done, as batch reset with type `FirstWorkflowTask`, will reset to first workflow task completed, so the first signal + // sent before the workflow starts, will be reapplied, as the reset point is later in the history. + // The same would happen with single reset to eventId 4. + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) > 0 + }, 3*time.Second, 100*time.Millisecond, "Workflow failed to start") + + // before sending signals, we wait for the workflow to execute the activity for i := 1; i <= 2; i++ { s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "mySignal", fmt.Sprintf("%d", i))) updateHandle, err := s.Client.UpdateWorkflow(s.Context, client.UpdateWorkflowOptions{ @@ -405,6 +653,31 @@ func (t *WorkflowResetTest) resetWorkflow(workflowID string) { require.NoError(s.T(), res.Err) } +func (t *WorkflowResetTest) resetBatchWorkflow(searchAttr string) { + s := t.s + args := []string{ + "workflow", "reset", + "--address", s.Address(), + "--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr), + "--type", "FirstWorkflowTask", + "--reason", "test-workflow-reset", + } + if len(t.reapplyExclude) > 0 && t.reapplyType != "" { + panic("--reapply-type cannot be used with --reapply-exclude") + } + if t.reapplyType != "" { + args = append(args, "--reapply-type", t.reapplyType) + } + if len(t.reapplyExclude) > 0 { + for _, exclude := range t.reapplyExclude { + args = append(args, "--reapply-exclude", exclude) + } + } + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute(args...) + require.NoError(s.T(), res.Err) +} + func (s *SharedServerSuite) TestWorkflow_Reset_DoesNotAllowBothReapplyOptions() { res := s.Execute( "workflow", "reset",