From 528e29c864eae52b943d56b42781378537614a2f Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Wed, 22 May 2024 14:45:59 +0200 Subject: [PATCH] fix direct error type assertion and a few mishandled cases with !ok instead of ok --- bench/lib/client.go | 2 +- canary/canary.go | 2 +- canary/client.go | 2 +- canary/timeout.go | 2 +- .../persistence/sql/sqlplugin/postgres/db.go | 9 +++++---- service/frontend/admin/handler.go | 2 +- service/history/decision/task_handler.go | 3 ++- .../engine/engineimpl/history_engine2_test.go | 12 +++--------- .../engine/engineimpl/poll_mutable_state.go | 4 ++-- .../engineimpl/start_workflow_execution.go | 4 +++- service/history/execution/context.go | 4 ++-- service/history/ndc/activity_replicator.go | 3 ++- service/history/ndc/workflow_resetter_test.go | 4 ++-- service/history/queue/task_allocator.go | 6 +++--- .../queue/timer_queue_standby_processor.go | 4 +++- .../history/queue/transfer_queue_processor.go | 2 +- service/history/replication/task_executor.go | 8 ++++++-- service/history/replication/task_fetcher.go | 5 +++-- service/history/shard/context.go | 7 +++---- service/history/task/cross_cluster_task.go | 10 +++++----- service/history/task/priority_assigner.go | 3 ++- service/history/task/task.go | 18 +++++++++--------- service/history/task/task_util.go | 7 ++++--- .../task/transfer_active_task_executor.go | 12 ++++++------ .../task/transfer_task_executor_base.go | 8 ++++---- service/history/workflow/util.go | 3 ++- service/matching/tasklist/forwarder.go | 2 +- service/worker/batcher/workflow.go | 5 +++-- service/worker/scanner/history/scavenger.go | 3 ++- service/worker/scanner/tasklist/db.go | 4 ++-- service/worker/service.go | 3 ++- tools/cli/domain_commands.go | 10 +++++----- 32 files changed, 92 insertions(+), 81 deletions(-) diff --git a/bench/lib/client.go b/bench/lib/client.go index 0d978cb4460..72be948a53f 100644 --- a/bench/lib/client.go +++ b/bench/lib/client.go @@ -65,7 +65,7 @@ func (client CadenceClient) CreateDomain(name string, desc string, owner string) defer cancel() err := client.Register(ctx, req) if err != nil { - if errors.As(err, new(*shared.DomainAlreadyExistsError)) { + if !errors.As(err, new(*shared.DomainAlreadyExistsError)) { return err } } diff --git a/canary/canary.go b/canary/canary.go index 25658c49723..7db3bfd3c2a 100644 --- a/canary/canary.go +++ b/canary/canary.go @@ -159,7 +159,7 @@ func (c *canaryImpl) startCronWorkflow() { if err != nil { // TODO: improvement: compare the cron schedule to decide whether or not terminating the current one // https://github.com/uber/cadence/issues/4469 - if errors.As(err, new(*shared.WorkflowExecutionAlreadyStartedError)) { + if !errors.As(err, new(*shared.WorkflowExecutionAlreadyStartedError)) { c.runtime.logger.Error("error starting cron workflow", zap.Error(err)) } else { c.runtime.logger.Info("cron workflow already started, you may need to terminate and restart if cron schedule is changed...") diff --git a/canary/client.go b/canary/client.go index 926cbcf4a7d..88151150d40 100644 --- a/canary/client.go +++ b/canary/client.go @@ -75,7 +75,7 @@ func (client *cadenceClient) createDomain(name string, desc string, owner string } err := client.Register(context.Background(), req) if err != nil { - if errors.As(err, new(*shared.DomainAlreadyExistsError)) { + if !errors.As(err, new(*shared.DomainAlreadyExistsError)) { return err } } diff --git a/canary/timeout.go b/canary/timeout.go index bbb015b38a1..a4d4abf152f 100644 --- a/canary/timeout.go +++ b/canary/timeout.go @@ -62,7 +62,7 @@ func timeoutWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error activityErr := activityFuture.Get(ctx, nil) if activityErr != nil { - if errors.As(err, new(*workflow.TimeoutError)) { + if !errors.As(err, new(*workflow.TimeoutError)) { workflow.GetLogger(ctx).Info("activity timeout failed", zap.Error(activityErr)) } else { activityErr = nil diff --git a/common/persistence/sql/sqlplugin/postgres/db.go b/common/persistence/sql/sqlplugin/postgres/db.go index 27bb029537c..f4f213d7035 100644 --- a/common/persistence/sql/sqlplugin/postgres/db.go +++ b/common/persistence/sql/sqlplugin/postgres/db.go @@ -23,6 +23,7 @@ package postgres import ( "context" "database/sql" + "errors" "time" "github.com/jmoiron/sqlx" @@ -56,8 +57,8 @@ const ErrInsufficientResources = "53000" const ErrTooManyConnections = "53300" func (pdb *db) IsDupEntryError(err error) bool { - sqlErr, ok := err.(*pq.Error) - return ok && sqlErr.Code == ErrDupEntry + var sqlErr *pq.Error + return errors.As(err, &sqlErr) && sqlErr.Code == ErrDupEntry } func (pdb *db) IsNotFoundError(err error) bool { @@ -69,8 +70,8 @@ func (pdb *db) IsTimeoutError(err error) bool { } func (pdb *db) IsThrottlingError(err error) bool { - sqlErr, ok := err.(*pq.Error) - if ok { + var sqlErr *pq.Error + if errors.As(err, &sqlErr) { if sqlErr.Code == ErrTooManyConnections || sqlErr.Code == ErrInsufficientResources { return true diff --git a/service/frontend/admin/handler.go b/service/frontend/admin/handler.go index b3f180b4667..1041adb02b0 100644 --- a/service/frontend/admin/handler.go +++ b/service/frontend/admin/handler.go @@ -800,7 +800,7 @@ func (adh *adminHandlerImpl) GetWorkflowExecutionRawHistoryV2( DomainName: request.GetDomain(), }) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { // when no events can be returned from DB, DB layer will return // EntityNotExistsError, this API shall return empty response return &types.GetWorkflowExecutionRawHistoryV2Response{ diff --git a/service/history/decision/task_handler.go b/service/history/decision/task_handler.go index d5cf742334c..7a7aa8808f2 100644 --- a/service/history/decision/task_handler.go +++ b/service/history/decision/task_handler.go @@ -22,6 +22,7 @@ package decision import ( "context" + "errors" "fmt" "github.com/pborman/uuid" @@ -1074,7 +1075,7 @@ func (handler *taskHandlerImpl) validateDecisionAttr( ) error { if err := validationFn(); err != nil { - if _, ok := err.(*types.BadRequestError); ok { + if errors.As(err, new(*types.BadRequestError)) { return handler.handlerFailDecision(failedCause, err.Error()) } return err diff --git a/service/history/engine/engineimpl/history_engine2_test.go b/service/history/engine/engineimpl/history_engine2_test.go index 6aab47487f6..1ca46de2723 100644 --- a/service/history/engine/engineimpl/history_engine2_test.go +++ b/service/history/engine/engineimpl/history_engine2_test.go @@ -1225,9 +1225,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { RequestID: "newRequestID", }, }) - if _, ok := err.(*types.WorkflowExecutionAlreadyStartedError); !ok { - s.Fail("return err is not *types.WorkflowExecutionAlreadyStartedError") - } + s.ErrorAs(err, new(*types.WorkflowExecutionAlreadyStartedError)) s.Nil(resp) } @@ -1425,9 +1423,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() { }) if expectedErrs[index] { - if _, ok := err.(*types.WorkflowExecutionAlreadyStartedError); !ok { - s.Fail("return err is not *types.WorkflowExecutionAlreadyStartedError") - } + s.ErrorAs(err, new(*types.WorkflowExecutionAlreadyStartedError)) s.Nil(resp) } else { s.Nil(err) @@ -1515,9 +1511,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() { }) if expectedErrs[j] { - if _, ok := err.(*types.WorkflowExecutionAlreadyStartedError); !ok { - s.Fail("return err is not *types.WorkflowExecutionAlreadyStartedError") - } + s.ErrorAs(err, new(*types.WorkflowExecutionAlreadyStartedError)) s.Nil(resp) } else { s.Nil(err) diff --git a/service/history/engine/engineimpl/poll_mutable_state.go b/service/history/engine/engineimpl/poll_mutable_state.go index 535da71ea96..1f09842bb98 100644 --- a/service/history/engine/engineimpl/poll_mutable_state.go +++ b/service/history/engine/engineimpl/poll_mutable_state.go @@ -24,6 +24,7 @@ package engineimpl import ( "bytes" "context" + "errors" "time" "github.com/uber/cadence/common" @@ -120,8 +121,7 @@ func (e *historyEngineImpl) getMutableState( } func (e *historyEngineImpl) updateEntityNotExistsErrorOnPassiveCluster(err error, domainID string) error { - switch err.(type) { - case *types.EntityNotExistsError: + if errors.As(err, new(*types.EntityNotExistsError)) { domainEntry, domainCacheErr := e.shard.GetDomainCache().GetDomainByID(domainID) if domainCacheErr != nil { return err // if could not access domain cache simply return original error diff --git a/service/history/engine/engineimpl/start_workflow_execution.go b/service/history/engine/engineimpl/start_workflow_execution.go index 06e58209fe9..0dfb7dd1f50 100644 --- a/service/history/engine/engineimpl/start_workflow_execution.go +++ b/service/history/engine/engineimpl/start_workflow_execution.go @@ -23,6 +23,7 @@ package engineimpl import ( "context" + "errors" "fmt" "time" @@ -232,7 +233,8 @@ func (e *historyEngineImpl) startWorkflowHelper( return nil, t } // handle already started error - if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { + var t *persistence.WorkflowExecutionAlreadyStartedError + if errors.As(err, &t) { if t.StartRequestID == request.GetRequestID() { return &types.StartWorkflowExecutionResponse{ diff --git a/service/history/execution/context.go b/service/history/execution/context.go index c093474c79c..cd11d46a3c1 100644 --- a/service/history/execution/context.go +++ b/service/history/execution/context.go @@ -1154,7 +1154,7 @@ func createWorkflowExecutionWithRetry( return err } isRetryable := func(err error) bool { - if _, ok := err.(*persistence.TimeoutError); ok { + if errors.As(err, new(*persistence.TimeoutError)) { // TODO: is timeout error retryable for create workflow? // if we treat it as retryable, user may receive workflowAlreadyRunning error // on the first start workflow execution request. @@ -1239,7 +1239,7 @@ func updateWorkflowExecutionWithRetry( // checker, _ := taskvalidator.NewWfChecker(zapLogger, metricsClient, domainCache, executionManager, historymanager) isRetryable := func(err error) bool { - if _, ok := err.(*persistence.TimeoutError); ok { + if errors.As(err, new(*persistence.TimeoutError)) { // timeout error is not retryable for update workflow execution return false } diff --git a/service/history/ndc/activity_replicator.go b/service/history/ndc/activity_replicator.go index b68903de79c..9e904ea437a 100644 --- a/service/history/ndc/activity_replicator.go +++ b/service/history/ndc/activity_replicator.go @@ -24,6 +24,7 @@ package ndc import ( ctx "context" + "errors" "time" "github.com/uber/cadence/common" @@ -100,7 +101,7 @@ func (r *activityReplicatorImpl) SyncActivity( mutableState, err := context.LoadWorkflowExecution(ctx) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { return err } diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index d8fcbfc4336..0e4c903258d 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -251,8 +251,8 @@ func (s *workflowResetterSuite) TestResetWorkflow_Error() { s.IsType(&types.RetryTaskV2Error{}, err) s.Nil(rebuiltMutableState) - retryErr, isRetryError := err.(*types.RetryTaskV2Error) - s.True(isRetryError) + var retryErr *types.RetryTaskV2Error + s.ErrorAs(err, &retryErr) expectedErr := &types.RetryTaskV2Error{ Message: resendOnResetWorkflowMessage, DomainID: s.domainID, diff --git a/service/history/queue/task_allocator.go b/service/history/queue/task_allocator.go index e636d958cb9..671bce113e0 100644 --- a/service/history/queue/task_allocator.go +++ b/service/history/queue/task_allocator.go @@ -72,7 +72,7 @@ func (t *taskAllocatorImpl) VerifyActiveTask(taskDomainID string, task interface if err != nil { // it is possible that the domain is deleted // we should treat that domain as active - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID)) return false, err } @@ -108,7 +108,7 @@ func (t *taskAllocatorImpl) VerifyFailoverActiveTask(targetDomainIDs map[string] if err != nil { // it is possible that the domain is deleted // we should treat that domain as not active - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID)) return false, err } @@ -139,7 +139,7 @@ func (t *taskAllocatorImpl) VerifyStandbyTask(standbyCluster string, taskDomainI if err != nil { // it is possible that the domain is deleted // we should treat that domain as not active - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID)) return false, err } diff --git a/service/history/queue/timer_queue_standby_processor.go b/service/history/queue/timer_queue_standby_processor.go index 3ed59858925..595017a68ce 100644 --- a/service/history/queue/timer_queue_standby_processor.go +++ b/service/history/queue/timer_queue_standby_processor.go @@ -21,6 +21,8 @@ package queue import ( + "errors" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" @@ -62,7 +64,7 @@ func newTimerQueueStandbyProcessor( return true, nil } } else { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { // retry the task if failed to find the domain logger.Warn("Cannot find domain", tag.WorkflowDomainID(timer.DomainID)) return false, err diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index 02af6cf2e2e..69b2cc5bf19 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -555,7 +555,7 @@ func newTransferQueueStandbyProcessor( return true, nil } } else { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { // retry the task if failed to find the domain logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID)) return false, err diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index 85904488c9e..f07dbeaf853 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -22,6 +22,7 @@ package replication import ( "context" + "errors" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" @@ -306,6 +307,9 @@ func (e *taskExecutorImpl) filterTask( } func toRetryTaskV2Error(err error) (*types.RetryTaskV2Error, bool) { - retError, ok := err.(*types.RetryTaskV2Error) - return retError, ok + var retError *types.RetryTaskV2Error + if errors.As(err, &retError) { + return retError, true + } + return nil, false } diff --git a/service/history/replication/task_fetcher.go b/service/history/replication/task_fetcher.go index f8540553651..12444eb62bc 100644 --- a/service/history/replication/task_fetcher.go +++ b/service/history/replication/task_fetcher.go @@ -22,6 +22,7 @@ package replication import ( "context" + "errors" "sync" "sync/atomic" "time" @@ -235,7 +236,7 @@ func (f *taskFetcherImpl) fetchTasks() { // When timer fires, we collect all the requests we have so far and attempt to send them to remote. err := f.fetchAndDistributeTasksFn(requestByShard) if err != nil { - if _, ok := err.(*types.ServiceBusyError); ok { + if errors.As(err, new(*types.ServiceBusyError)) { // slow down replication when source cluster is busy timer.Reset(f.config.ReplicationTaskFetcherServiceBusyWait()) } else { @@ -265,7 +266,7 @@ func (f *taskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*requ messagesByShard, err := f.getMessages(requestByShard) if err != nil { - if _, ok := err.(*types.ServiceBusyError); !ok { + if !errors.As(err, new(*types.ServiceBusyError)) { f.logger.Error("Failed to get replication tasks", tag.Error(err)) } else { f.logger.Debug("Failed to get replication tasks because service busy") diff --git a/service/history/shard/context.go b/service/history/shard/context.go index cdf0fe6dd9d..30c3b0bd0dd 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -1146,7 +1146,7 @@ func (s *contextImpl) persistShardInfoLocked( if err != nil { // Shard is stolen, trigger history engine shutdown - if _, ok := err.(*persistence.ShardOwnershipLostError); ok { + if errors.As(err, new(*persistence.ShardOwnershipLostError)) { s.logger.Warn( "Closing shard: updateShardInfoLocked failed due to stolen shard.", tag.Error(err), @@ -1515,8 +1515,7 @@ func acquireShard( if persistence.IsTransientError(err) { return true } - _, ok := err.(*persistence.ShardAlreadyExistError) - return ok + return errors.As(err, new(*persistence.ShardAlreadyExistError)) } getShard := func() error { @@ -1527,7 +1526,7 @@ func acquireShard( shardInfo = resp.ShardInfo return nil } - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { return err } diff --git a/service/history/task/cross_cluster_task.go b/service/history/task/cross_cluster_task.go index 5886c8eb4a9..4e082a4ae8d 100644 --- a/service/history/task/cross_cluster_task.go +++ b/service/history/task/cross_cluster_task.go @@ -374,18 +374,18 @@ func (t *crossClusterSourceTask) HandleErr( logEvent(t.eventLogger, "Handling task processing error", err) - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { return nil } - if _, ok := err.(*types.WorkflowExecutionAlreadyCompletedError); ok { + if errors.As(err, new(*types.WorkflowExecutionAlreadyCompletedError)) { return nil } - if err == errWorkflowBusy { + if errors.Is(err, errWorkflowBusy) { t.scope.IncCounter(metrics.TaskWorkflowBusyPerDomain) return err } - if err == ErrTaskPendingActive { + if errors.Is(err, ErrTaskPendingActive) { t.scope.IncCounter(metrics.TaskPendingActiveCounterPerDomain) return err } @@ -393,7 +393,7 @@ func (t *crossClusterSourceTask) HandleErr( // convert to a (passive) transfer task t.scope.IncCounter(metrics.TaskFailuresPerDomain) - if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok { + if errors.As(err, new(*persistence.CurrentWorkflowConditionFailedError)) { t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed) return nil } diff --git a/service/history/task/priority_assigner.go b/service/history/task/priority_assigner.go index db81f30247c..2c88a8711de 100644 --- a/service/history/task/priority_assigner.go +++ b/service/history/task/priority_assigner.go @@ -21,6 +21,7 @@ package task import ( + "errors" "sync" "github.com/uber/cadence/common" @@ -134,7 +135,7 @@ func (a *priorityAssignerImpl) Assign(queueTask Task) error { func (a *priorityAssignerImpl) getDomainInfo(domainID string) (string, bool, error) { domainEntry, err := a.domainCache.GetDomainByID(domainID) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { a.logger.Warn("Cannot find domain", tag.WorkflowDomainID(domainID)) return "", false, err } diff --git a/service/history/task/task.go b/service/history/task/task.go index 27edc7527cc..1dcbb88073f 100644 --- a/service/history/task/task.go +++ b/service/history/task/task.go @@ -240,27 +240,27 @@ func (t *taskImpl) HandleErr(err error) (retErr error) { logEvent(t.eventLogger, "Handling task processing error", err) - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { return nil - } else if _, ok := err.(*types.WorkflowExecutionAlreadyCompletedError); ok { + } else if errors.As(err, new(*types.WorkflowExecutionAlreadyCompletedError)) { return nil } if transferTask, ok := t.Info.(*persistence.TransferTaskInfo); ok && transferTask.TaskType == persistence.TransferTaskTypeCloseExecution && - err == execution.ErrMissingWorkflowStartEvent && + errors.Is(err, execution.ErrMissingWorkflowStartEvent) && t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache t.scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain) t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed) return nil } - if err == errWorkflowBusy { + if errors.Is(err, errWorkflowBusy) { t.scope.IncCounter(metrics.TaskWorkflowBusyPerDomain) return err } - if err == errWorkflowRateLimited { + if errors.Is(err, errWorkflowRateLimited) { // metrics are emitted within the rate limiter return err } @@ -278,17 +278,17 @@ func (t *taskImpl) HandleErr(err error) (retErr error) { } // this is a transient error during graceful failover - if err == ErrTaskPendingActive { + if errors.Is(err, ErrTaskPendingActive) { t.scope.IncCounter(metrics.TaskPendingActiveCounterPerDomain) return err } - if err == ErrTaskDiscarded { + if errors.Is(err, ErrTaskDiscarded) { t.scope.IncCounter(metrics.TaskDiscardedPerDomain) err = nil } - if err == execution.ErrMissingVersionHistories { + if errors.Is(err, execution.ErrMissingVersionHistories) { t.logger.Error("Encounter 2DC workflow during task processing.") t.scope.IncCounter(metrics.TaskUnsupportedPerDomain) err = nil @@ -310,7 +310,7 @@ func (t *taskImpl) HandleErr(err error) (retErr error) { t.scope.IncCounter(metrics.TaskFailuresPerDomain) - if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok { + if errors.As(err, new(*persistence.CurrentWorkflowConditionFailedError)) { t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed) return nil } diff --git a/service/history/task/task_util.go b/service/history/task/task_util.go index 6506377cece..1dc0e844c65 100644 --- a/service/history/task/task_util.go +++ b/service/history/task/task_util.go @@ -22,6 +22,7 @@ package task import ( "context" + "errors" "fmt" "github.com/golang/mock/gomock" @@ -273,7 +274,7 @@ func loadMutableStateForTimerTask( ) (execution.MutableState, error) { msBuilder, err := wfContext.LoadWorkflowExecution(ctx) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { // this could happen if this is a duplicate processing of the task, and the execution has already completed. return nil, nil } @@ -328,7 +329,7 @@ func loadMutableStateForTransferTask( msBuilder, err := wfContext.LoadWorkflowExecution(ctx) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { // this could happen if this is a duplicate processing of the task, and the execution has already completed. return nil, nil } @@ -384,7 +385,7 @@ func loadMutableStateForCrossClusterTask( msBuilder, err := wfContext.LoadWorkflowExecution(ctx) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { // this could happen if this is a duplicate processing of the task, and the execution has already completed. return nil, nil } diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 77b2ff9a98b..7ae789c2e4a 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -286,7 +286,7 @@ func (t *transferActiveTaskExecutor) processDecisionTask( } err = t.pushDecision(ctx, task, taskList, decisionTimeout, mutableState.GetExecutionInfo().PartitionConfig) - if _, ok := err.(*types.StickyWorkerUnavailableError); ok { + if errors.As(err, new(*types.StickyWorkerUnavailableError)) { // sticky worker is unavailable, switch to non-sticky task list taskList = &types.TaskList{ Name: mutableState.GetExecutionInfo().TaskList, @@ -867,7 +867,7 @@ func (t *transferActiveTaskExecutor) processStartChildExecution( var targetDomainName string var targetDomainEntry *cache.DomainCacheEntry if targetDomainEntry, err = t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID); err != nil { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { return err } // TODO: handle the case where target domain does not exist @@ -1697,7 +1697,7 @@ func signalExternalExecutionWithRetry( backoff.WithRetryableError(common.IsServiceTransientError), ) err := throttleRetry.Do(context.Background(), op) - if _, ok := err.(*types.DomainNotActiveError); ok { + if errors.As(err, new(*types.DomainNotActiveError)) { err = errTargetDomainNotActive } return err @@ -1757,7 +1757,7 @@ func startWorkflowWithRetry( // Get parent domain name domainName, err := domainCache.GetDomainName(task.DomainID) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { return "", err } // it is possible that the domain got deleted. Use domainID instead as this is only needed for the history event @@ -1811,7 +1811,7 @@ func startWorkflowWithRetry( backoff.WithRetryableError(common.IsServiceTransientError), ) if err := throttleRetry.Do(context.Background(), op); err != nil { - if _, ok := err.(*types.DomainNotActiveError); ok { + if errors.As(err, new(*types.DomainNotActiveError)) { err = errTargetDomainNotActive } return "", err @@ -2117,7 +2117,7 @@ func applyParentClosePolicy( } } - if _, ok := err.(*types.DomainNotActiveError); ok { + if errors.As(err, new(*types.DomainNotActiveError)) { err = errTargetDomainNotActive } return err diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index 94c8fda52a0..79d636b254c 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -22,6 +22,7 @@ package task import ( "context" + "errors" "time" "github.com/uber/cadence/client/matching" @@ -161,7 +162,7 @@ func (t *transferTaskExecutorBase) recordWorkflowStarted( domain := defaultDomainName if domainEntry, err := t.shard.GetDomainCache().GetDomainByID(domainID); err != nil { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { return err } } else { @@ -234,7 +235,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution( domain, err := t.shard.GetDomainCache().GetDomainName(domainID) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); !ok { + if !errors.As(err, new(*types.EntityNotExistsError)) { return err } domain = defaultDomainName @@ -415,6 +416,5 @@ func copySearchAttributes( } func isWorkflowNotExistError(err error) bool { - _, ok := err.(*types.EntityNotExistsError) - return ok + return errors.As(err, new(*types.EntityNotExistsError)) } diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index f853b1bdd5b..0a06729b2ad 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -22,6 +22,7 @@ package workflow import ( "context" + "errors" "time" "github.com/uber/cadence/common/cache" @@ -261,7 +262,7 @@ UpdateHistoryLoop: } err = workflowContext.GetContext().UpdateWorkflowExecutionAsActive(ctx, now) - if _, ok := err.(*persistence.DuplicateRequestError); ok { + if errors.As(err, new(*persistence.DuplicateRequestError)) { return nil } if execution.IsConflictError(err) { diff --git a/service/matching/tasklist/forwarder.go b/service/matching/tasklist/forwarder.go index eb6ecb341a1..b02e2828967 100644 --- a/service/matching/tasklist/forwarder.go +++ b/service/matching/tasklist/forwarder.go @@ -283,7 +283,7 @@ func (fwdr *Forwarder) refreshTokenC(value *atomic.Value, curr *int32, maxLimit } func (fwdr *Forwarder) handleErr(err error) error { - if _, ok := err.(*types.ServiceBusyError); ok { + if errors.As(err, new(*types.ServiceBusyError)) { return errForwarderSlowDown } return err diff --git a/service/worker/batcher/workflow.go b/service/worker/batcher/workflow.go index a9be88846df..5ebc3a46154 100644 --- a/service/worker/batcher/workflow.go +++ b/service/worker/batcher/workflow.go @@ -22,6 +22,7 @@ package batcher import ( "context" + "errors" "fmt" "time" @@ -491,7 +492,7 @@ func processTask( err = procFn(wf.GetWorkflowID(), wf.GetRunID()) if err != nil { // EntityNotExistsError means wf is not running or deleted - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { continue } return err @@ -505,7 +506,7 @@ func processTask( }) if err != nil { // EntityNotExistsError means wf is deleted - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { continue } return err diff --git a/service/worker/scanner/history/scavenger.go b/service/worker/scanner/history/scavenger.go index 391efaa9a62..c7ca1c4e0b6 100644 --- a/service/worker/scanner/history/scavenger.go +++ b/service/worker/scanner/history/scavenger.go @@ -22,6 +22,7 @@ package history import ( "context" + "errors" "time" "go.uber.org/cadence/activity" @@ -250,7 +251,7 @@ func (s *Scavenger) startTaskProcessor( }) if err != nil { - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { // deleting history branch var branchToken []byte branchToken, err = p.NewHistoryBranchTokenByBranchID(task.treeID, task.branchID) diff --git a/service/worker/scanner/tasklist/db.go b/service/worker/scanner/tasklist/db.go index d0a10884a3e..41a92794aa6 100644 --- a/service/worker/scanner/tasklist/db.go +++ b/service/worker/scanner/tasklist/db.go @@ -22,6 +22,7 @@ package tasklist import ( "context" + "errors" "time" "github.com/uber/cadence/common/backoff" @@ -136,8 +137,7 @@ func (s *Scavenger) deleteTaskList(info *p.TaskListInfo) error { throttleRetry := backoff.NewThrottleRetry( backoff.WithRetryPolicy(retryForeverPolicy), backoff.WithRetryableError(func(err error) bool { - _, ok := err.(*types.ServiceBusyError) - return ok + return errors.As(err, new(*types.ServiceBusyError)) }), ) return throttleRetry.Do(context.Background(), op) diff --git a/service/worker/service.go b/service/worker/service.go index 7f8afc4dc10..7baff3e291b 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -24,6 +24,7 @@ package worker import ( "context" + "errors" "fmt" "sync/atomic" @@ -445,7 +446,7 @@ func (s *Service) registerSystemDomain(domain string) { FailoverVersion: common.EmptyVersion, }) if err != nil { - if _, ok := err.(*types.DomainAlreadyExistsError); ok { + if errors.As(err, new(*types.DomainAlreadyExistsError)) { return } s.GetLogger().Fatal("failed to register system domain", tag.Error(err)) diff --git a/tools/cli/domain_commands.go b/tools/cli/domain_commands.go index 2c185ddfeb7..4dbcba2aafb 100644 --- a/tools/cli/domain_commands.go +++ b/tools/cli/domain_commands.go @@ -140,7 +140,7 @@ func (d *domainCLIImpl) RegisterDomain(c *cli.Context) { defer cancel() err = d.registerDomain(ctx, request) if err != nil { - if errors.As(err, new(*types.DomainAlreadyExistsError)) { + if !errors.As(err, new(*types.DomainAlreadyExistsError)) { ErrorAndExit("Register Domain operation failed.", err) } else { ErrorAndExit(fmt.Sprintf("Domain %s already registered.", domainName), err) @@ -178,7 +178,7 @@ func (d *domainCLIImpl) UpdateDomain(c *cli.Context) { Name: common.StringPtr(domainName), }) if err != nil { - if errors.As(err, new(*types.EntityNotExistsError)) { + if !errors.As(err, new(*types.EntityNotExistsError)) { ErrorAndExit("Operation UpdateDomain failed.", err) } else { ErrorAndExit(fmt.Sprintf("Domain %s does not exist.", domainName), err) @@ -261,7 +261,7 @@ func (d *domainCLIImpl) UpdateDomain(c *cli.Context) { updateRequest.SecurityToken = securityToken _, err := d.updateDomain(ctx, updateRequest) if err != nil { - if errors.As(err, new(*types.EntityNotExistsError)) { + if !errors.As(err, new(*types.EntityNotExistsError)) { ErrorAndExit("Operation UpdateDomain failed.", err) } else { ErrorAndExit(fmt.Sprintf("Domain %s does not exist.", domainName), err) @@ -297,7 +297,7 @@ func (d *domainCLIImpl) DeprecateDomain(c *cli.Context) { SecurityToken: securityToken, }) if err != nil { - if errors.As(err, new(*types.EntityNotExistsError)) { + if !errors.As(err, new(*types.EntityNotExistsError)) { ErrorAndExit("Operation DeprecateDomain failed.", err) } else { ErrorAndExit(fmt.Sprintf("Domain %s does not exist.", domainName), err) @@ -420,7 +420,7 @@ func (d *domainCLIImpl) DescribeDomain(c *cli.Context) { defer cancel() resp, err := d.describeDomain(ctx, &request) if err != nil { - if errors.As(err, new(*types.EntityNotExistsError)) { + if !errors.As(err, new(*types.EntityNotExistsError)) { ErrorAndExit("Operation DescribeDomain failed.", err) } ErrorAndExit(fmt.Sprintf("Domain %s does not exist.", domainName), err)