Skip to content

Commit

Permalink
Update server & add --reapply-exclude flag to reset (#588)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jun 10, 2024
1 parent da1e0aa commit 2cfec31
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 14 deletions.
26 changes: 14 additions & 12 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1901,17 +1901,18 @@ func NewTemporalWorkflowQueryCommand(cctx *CommandContext, parent *TemporalWorkf
}

type TemporalWorkflowResetCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowId string
RunId string
EventId int
Reason string
ReapplyType StringEnum
Type StringEnum
BuildId string
Query string
Yes bool
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowId string
RunId string
EventId int
Reason string
ReapplyType StringEnum
ReapplyExclude []string
Type StringEnum
BuildId string
Query string
Yes bool
}

func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetCommand {
Expand All @@ -1932,7 +1933,8 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf
s.Command.Flags().StringVar(&s.Reason, "reason", "", "The reason why this workflow is being reset. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "reason")
s.ReapplyType = NewStringEnum([]string{"All", "Signal", "None"}, "All")
s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "Event types to reapply after the reset point. Accepted values: All, Signal, None.")
s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "*DEPRECATED* Use --reapply-exclude. Event types to reapply after the reset point. Accepted values: All, Signal, None.")
s.Command.Flags().StringArrayVar(&s.ReapplyExclude, "reapply-exclude", nil, "Event types to exclude from reapplication. Accepted values: All, Signal, Update.")
s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew", "BuildId"}, "")
s.Command.Flags().VarP(&s.Type, "type", "t", "Event type to which you want to reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.")
s.Command.Flags().StringVar(&s.BuildId, "build-id", "", "Only used if type is BuildId. Reset the first workflow task processed by this build id. Note that by default, this reset is allowed to be to a prior run in a chain of continue-as-new.")
Expand Down
24 changes: 24 additions & 0 deletions temporalcli/commands.workflow_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/google/uuid"
"go.temporal.io/api/batch/v1"
Expand Down Expand Up @@ -75,8 +76,30 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl
return fmt.Errorf("getting reset event ID by type failed: %w", err)
}
}

reapplyExcludes := make([]enums.ResetReapplyExcludeType, 0)
for _, exclude := range c.ReapplyExclude {
if strings.ToLower(exclude) == "all" {
for _, excludeType := range enums.ResetReapplyExcludeType_value {
if excludeType == 0 {
continue
}
reapplyExcludes = append(reapplyExcludes, enums.ResetReapplyExcludeType(excludeType))
}
break
}
excludeType, err := enums.ResetReapplyExcludeTypeFromString(exclude)
if err != nil {
return err
}
reapplyExcludes = append(reapplyExcludes, excludeType)
}

reapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL
if c.ReapplyType.Value != "All" {
if len(c.ReapplyExclude) > 0 {
return errors.New("cannot specify --reapply-type and --reapply-exclude at the same time")
}
reapplyType, err = enums.ResetReapplyTypeFromString(c.ReapplyType.Value)
if err != nil {
return err
Expand All @@ -94,6 +117,7 @@ func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl
Reason: fmt.Sprintf("%s: %s", username(), c.Reason),
WorkflowTaskFinishEventId: eventID,
ResetReapplyType: reapplyType,
ResetReapplyExcludeTypes: reapplyExcludes,
})
if err != nil {
return fmt.Errorf("failed to reset workflow: %w", err)
Expand Down
67 changes: 67 additions & 0 deletions temporalcli/commands.workflow_reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,73 @@ func (s *SharedServerSuite) TestBatchResetByBuildId() {
sut.stopWorkerFor("v3")
}

func (s *SharedServerSuite) TestWorkflow_Reset_ReapplyExclude() {
var wfExecutions, timesSignalSeen int
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
sigChan := workflow.GetSignalChannel(ctx, "sig")
workflow.Go(ctx, func(ctx workflow.Context) {
for {
sigChan.Receive(ctx, nil)
fmt.Println("saw signal", workflow.GetInfo(ctx).WorkflowExecution)
timesSignalSeen++
}
})
err := workflow.Sleep(ctx, 1*time.Second)
if err != nil {
return nil, err
}
wfExecutions++
return nil, nil
})

// Start the workflow
searchAttr := "keyword-" + uuid.NewString()
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker().Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)

// Send a couple signals
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "sig", "1"))
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "sig", "2"))

s.NoError(run.Get(s.Context, nil))
s.Equal(1, wfExecutions)

// Reset to the beginning, exclude signals
res := s.Execute(
"workflow", "reset",
"--address", s.Address(),
"-w", run.GetID(),
"--event-id", "3",
"--reason", "test-reset-FirstWorkflowTask",
"--reapply-exclude", "Signal",
)
require.NoError(s.T(), res.Err)
s.awaitNextWorkflow(searchAttr)
s.Equal(2, timesSignalSeen, "Should only see original signals and not after reset")
}

func (s *SharedServerSuite) TestWorkflow_Reset_DoesNotAllowBothApplyKinds() {
res := s.Execute(
"workflow", "reset",
"--address", s.Address(),
"-w", "whatever",
"--event-id", "3",
"--reason", "test-reset-FirstWorkflowTask",
"--reapply-exclude", "Signal",
"--reapply-type", "Signal",
)
require.Error(s.T(), res.Err)
s.Contains(res.Err.Error(), "cannot specify --reapply-type and --reapply-exclude")
}

const (
badActivity = iota
firstActivity
Expand Down
1 change: 1 addition & 0 deletions temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer {
d.Options.DynamicConfigValues["worker.buildIdScavengerEnabled"] = true
d.Options.DynamicConfigValues["frontend.enableUpdateWorkflowExecution"] = true
d.Options.DynamicConfigValues["frontend.MaxConcurrentBatchOperationPerNamespace"] = 1000
d.Options.DynamicConfigValues["frontend.namespaceRPS.visibility"] = 100

d.Options.GRPCInterceptors = append(
d.Options.GRPCInterceptors,
Expand Down
3 changes: 2 additions & 1 deletion temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,8 @@ Use the options listed below to change reset behavior.
* `--run-id`, `-r` (string) - Run Id.
* `--event-id`, `-e` (int) - The Event Id for any Event after `WorkflowTaskStarted` you want to reset to (exclusive). It can be `WorkflowTaskCompleted`, `WorkflowTaskFailed` or others.
* `--reason` (string) - The reason why this workflow is being reset. Required.
* `--reapply-type` (string-enum) - Event types to reapply after the reset point. Options: All, Signal, None. Default: All.
* `--reapply-type` (string-enum) - *DEPRECATED* Use --reapply-exclude. Event types to reapply after the reset point. Options: All, Signal, None. Default: All.
* `--reapply-exclude` (string[]) - Event types to exclude from reapplication. Options: All, Signal, Update.
* `--type`, `-t` (string-enum) - Event type to which you want to reset. Options: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.
* `--build-id` (string) - Only used if type is BuildId. Reset the first workflow task processed by this build id. Note that by default, this reset is allowed to be to a prior run in a chain of continue-as-new.
* `--query`, `-q` (string) - Start a batch reset to operate on Workflow Executions with given List Filter.
Expand Down
4 changes: 3 additions & 1 deletion temporalcli/devserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ func (s *StartOptions) buildServerOptions() ([]temporal.ServerOption, error) {
temporal.WithClaimMapper(func(*config.Config) authorization.ClaimMapper { return claimMapper }),
}

// Setting host level mutable state cache size to 8k.
dynConf := make(dynamicconfig.StaticClient, len(s.DynamicConfigValues)+1)
// Setting host level mutable state cache size to 8k.
dynConf[dynamicconfig.HistoryCacheHostLevelMaxSize] = 8096
// Up default visibility RPS
dynConf[dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance] = 100

// Dynamic config if set
for k, v := range s.DynamicConfigValues {
Expand Down

0 comments on commit 2cfec31

Please sign in to comment.