From 2cfec31aab565a914927ee352c5974e555ce4211 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 10 Jun 2024 11:08:44 -0700 Subject: [PATCH] Update server & add --reapply-exclude flag to reset (#588) --- temporalcli/commands.gen.go | 26 ++++---- temporalcli/commands.workflow_reset.go | 24 ++++++++ temporalcli/commands.workflow_reset_test.go | 67 +++++++++++++++++++++ temporalcli/commands_test.go | 1 + temporalcli/commandsmd/commands.md | 3 +- temporalcli/devserver/server.go | 4 +- 6 files changed, 111 insertions(+), 14 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index edacff17..9ba527d2 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -1901,17 +1901,18 @@ func NewTemporalWorkflowQueryCommand(cctx *CommandContext, parent *TemporalWorkf } type TemporalWorkflowResetCommand struct { - Parent *TemporalWorkflowCommand - Command cobra.Command - WorkflowId string - RunId string - EventId int - Reason string - ReapplyType StringEnum - Type StringEnum - BuildId string - Query string - Yes bool + Parent *TemporalWorkflowCommand + Command cobra.Command + WorkflowId string + RunId string + EventId int + Reason string + ReapplyType StringEnum + ReapplyExclude []string + Type StringEnum + BuildId string + Query string + Yes bool } func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetCommand { @@ -1932,7 +1933,8 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf s.Command.Flags().StringVar(&s.Reason, "reason", "", "The reason why this workflow is being reset. Required.") _ = cobra.MarkFlagRequired(s.Command.Flags(), "reason") s.ReapplyType = NewStringEnum([]string{"All", "Signal", "None"}, "All") - s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "Event types to reapply after the reset point. Accepted values: All, Signal, None.") + s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "*DEPRECATED* Use --reapply-exclude. Event types to reapply after the reset point. Accepted values: All, Signal, None.") + s.Command.Flags().StringArrayVar(&s.ReapplyExclude, "reapply-exclude", nil, "Event types to exclude from reapplication. Accepted values: All, Signal, Update.") s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew", "BuildId"}, "") s.Command.Flags().VarP(&s.Type, "type", "t", "Event type to which you want to reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.") s.Command.Flags().StringVar(&s.BuildId, "build-id", "", "Only used if type is BuildId. Reset the first workflow task processed by this build id. Note that by default, this reset is allowed to be to a prior run in a chain of continue-as-new.") diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index 39dbe130..10a5ebb7 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "github.com/google/uuid" "go.temporal.io/api/batch/v1" @@ -75,8 +76,30 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl return fmt.Errorf("getting reset event ID by type failed: %w", err) } } + + reapplyExcludes := make([]enums.ResetReapplyExcludeType, 0) + for _, exclude := range c.ReapplyExclude { + if strings.ToLower(exclude) == "all" { + for _, excludeType := range enums.ResetReapplyExcludeType_value { + if excludeType == 0 { + continue + } + reapplyExcludes = append(reapplyExcludes, enums.ResetReapplyExcludeType(excludeType)) + } + break + } + excludeType, err := enums.ResetReapplyExcludeTypeFromString(exclude) + if err != nil { + return err + } + reapplyExcludes = append(reapplyExcludes, excludeType) + } + reapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL if c.ReapplyType.Value != "All" { + if len(c.ReapplyExclude) > 0 { + return errors.New("cannot specify --reapply-type and --reapply-exclude at the same time") + } reapplyType, err = enums.ResetReapplyTypeFromString(c.ReapplyType.Value) if err != nil { return err @@ -94,6 +117,7 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl Reason: fmt.Sprintf("%s: %s", username(), c.Reason), WorkflowTaskFinishEventId: eventID, ResetReapplyType: reapplyType, + ResetReapplyExcludeTypes: reapplyExcludes, }) if err != nil { return fmt.Errorf("failed to reset workflow: %w", err) diff --git a/temporalcli/commands.workflow_reset_test.go b/temporalcli/commands.workflow_reset_test.go index 86adff9e..af980cd1 100644 --- a/temporalcli/commands.workflow_reset_test.go +++ b/temporalcli/commands.workflow_reset_test.go @@ -248,6 +248,73 @@ func (s *SharedServerSuite) TestBatchResetByBuildId() { sut.stopWorkerFor("v3") } +func (s *SharedServerSuite) TestWorkflow_Reset_ReapplyExclude() { + var wfExecutions, timesSignalSeen int + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + sigChan := workflow.GetSignalChannel(ctx, "sig") + workflow.Go(ctx, func(ctx workflow.Context) { + for { + sigChan.Receive(ctx, nil) + fmt.Println("saw signal", workflow.GetInfo(ctx).WorkflowExecution) + timesSignalSeen++ + } + }) + err := workflow.Sleep(ctx, 1*time.Second) + if err != nil { + return nil, err + } + wfExecutions++ + return nil, nil + }) + + // Start the workflow + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + + // Send a couple signals + s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "sig", "1")) + s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "sig", "2")) + + s.NoError(run.Get(s.Context, nil)) + s.Equal(1, wfExecutions) + + // Reset to the beginning, exclude signals + res := s.Execute( + "workflow", "reset", + "--address", s.Address(), + "-w", run.GetID(), + "--event-id", "3", + "--reason", "test-reset-FirstWorkflowTask", + "--reapply-exclude", "Signal", + ) + require.NoError(s.T(), res.Err) + s.awaitNextWorkflow(searchAttr) + s.Equal(2, timesSignalSeen, "Should only see original signals and not after reset") +} + +func (s *SharedServerSuite) TestWorkflow_Reset_DoesNotAllowBothApplyKinds() { + res := s.Execute( + "workflow", "reset", + "--address", s.Address(), + "-w", "whatever", + "--event-id", "3", + "--reason", "test-reset-FirstWorkflowTask", + "--reapply-exclude", "Signal", + "--reapply-type", "Signal", + ) + require.Error(s.T(), res.Err) + s.Contains(res.Err.Error(), "cannot specify --reapply-type and --reapply-exclude") +} + const ( badActivity = iota firstActivity diff --git a/temporalcli/commands_test.go b/temporalcli/commands_test.go index 1c573dd8..7cdf6bfd 100644 --- a/temporalcli/commands_test.go +++ b/temporalcli/commands_test.go @@ -333,6 +333,7 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer { d.Options.DynamicConfigValues["worker.buildIdScavengerEnabled"] = true d.Options.DynamicConfigValues["frontend.enableUpdateWorkflowExecution"] = true d.Options.DynamicConfigValues["frontend.MaxConcurrentBatchOperationPerNamespace"] = 1000 + d.Options.DynamicConfigValues["frontend.namespaceRPS.visibility"] = 100 d.Options.GRPCInterceptors = append( d.Options.GRPCInterceptors, diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index 357102f0..e858f04c 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -911,7 +911,8 @@ Use the options listed below to change reset behavior. * `--run-id`, `-r` (string) - Run Id. * `--event-id`, `-e` (int) - The Event Id for any Event after `WorkflowTaskStarted` you want to reset to (exclusive). It can be `WorkflowTaskCompleted`, `WorkflowTaskFailed` or others. * `--reason` (string) - The reason why this workflow is being reset. Required. -* `--reapply-type` (string-enum) - Event types to reapply after the reset point. Options: All, Signal, None. Default: All. +* `--reapply-type` (string-enum) - *DEPRECATED* Use --reapply-exclude. Event types to reapply after the reset point. Options: All, Signal, None. Default: All. +* `--reapply-exclude` (string[]) - Event types to exclude from reapplication. Options: All, Signal, Update. * `--type`, `-t` (string-enum) - Event type to which you want to reset. Options: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId. * `--build-id` (string) - Only used if type is BuildId. Reset the first workflow task processed by this build id. Note that by default, this reset is allowed to be to a prior run in a chain of continue-as-new. * `--query`, `-q` (string) - Start a batch reset to operate on Workflow Executions with given List Filter. diff --git a/temporalcli/devserver/server.go b/temporalcli/devserver/server.go index aef28f88..55a23d7b 100644 --- a/temporalcli/devserver/server.go +++ b/temporalcli/devserver/server.go @@ -200,9 +200,11 @@ func (s *StartOptions) buildServerOptions() ([]temporal.ServerOption, error) { temporal.WithClaimMapper(func(*config.Config) authorization.ClaimMapper { return claimMapper }), } - // Setting host level mutable state cache size to 8k. dynConf := make(dynamicconfig.StaticClient, len(s.DynamicConfigValues)+1) + // Setting host level mutable state cache size to 8k. dynConf[dynamicconfig.HistoryCacheHostLevelMaxSize] = 8096 + // Up default visibility RPS + dynConf[dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance] = 100 // Dynamic config if set for k, v := range s.DynamicConfigValues {