diff --git a/client/history/client.go b/client/history/client.go index 709a5209e52..fbc9a06b9fe 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -22,6 +22,7 @@ package history import ( "context" + "errors" "math/rand" "sync" "time" @@ -755,7 +756,7 @@ func (c *clientImpl) GetReplicationMessages( tag.ShardReplicationToken(req), ) // Returns service busy error to notify replication - if _, ok := err.(*types.ServiceBusyError); ok { + if errors.As(err, new(*types.ServiceBusyError)) { return err } return nil @@ -1096,7 +1097,8 @@ redirectLoop: } err = op(ctx, peer) if err != nil { - if s, ok := err.(*types.ShardOwnershipLostError); ok { + var s *types.ShardOwnershipLostError + if errors.As(err, &s) { // TODO: consider emitting a metric for number of redirects peer, err = c.peerResolver.FromHostAddress(s.GetOwner()) if err != nil { diff --git a/common/dynamicconfig/config.go b/common/dynamicconfig/config.go index 4ef3654a60f..b06b0f1020c 100644 --- a/common/dynamicconfig/config.go +++ b/common/dynamicconfig/config.go @@ -21,6 +21,7 @@ package dynamicconfig import ( + "errors" "fmt" "reflect" "strings" @@ -73,7 +74,7 @@ func (c *Collection) logError( if errCount%errCountLogThreshold == 0 { // log only every 'x' errors to reduce mem allocs and to avoid log noise filteredKey := getFilteredKeyAsString(key, filters) - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { c.logger.Debug("dynamic config not set, use default value", tag.Key(filteredKey)) } else { c.logger.Warn("Failed to fetch key from dynamic config", tag.Key(filteredKey), tag.Error(err)) diff --git a/common/dynamicconfig/configstore/config_store_client.go b/common/dynamicconfig/configstore/config_store_client.go index 4f18fbf0d29..d03a77cb3b0 100644 --- a/common/dynamicconfig/configstore/config_store_client.go +++ b/common/dynamicconfig/configstore/config_store_client.go @@ -469,7 +469,7 @@ func (csc *configStoreClient) updateValue(name dc.Key, dcValues []*types.Dynamic return errors.New("timeout error on update") default: if err != nil { - if _, ok := err.(*persistence.ConditionFailedError); ok && retryAttempts > 0 { + if errors.As(err, new(*persistence.ConditionFailedError)) && retryAttempts > 0 { // fetch new config and retry err := csc.update() if err != nil { diff --git a/common/ndc/history_resender.go b/common/ndc/history_resender.go index 231b428f244..291fe92c944 100644 --- a/common/ndc/history_resender.go +++ b/common/ndc/history_resender.go @@ -156,20 +156,18 @@ func (n *HistoryResenderImpl) SendSingleWorkflowHistory( historyBatch.versionHistory.GetItems()) err = n.sendReplicationRawRequest(ctx, replicationRequest) - switch err.(type) { - case nil: - // continue to process the events - break - case *types.EntityNotExistsError: + if err == nil { + continue + } + if errors.As(err, new(*types.EntityNotExistsError)) { // Case 1: the workflow pass the retention period // Case 2: the workflow is corrupted if skipTask := n.fixCurrentExecution(ctx, domainID, workflowID, runID); skipTask { return ErrSkipTask } return err - default: - return fmt.Errorf("sending replication request: %w", err) } + return fmt.Errorf("sending replication request: %w", err) } return nil } diff --git a/common/util.go b/common/util.go index 18e6f4d01fa..d94f28be6b6 100644 --- a/common/util.go +++ b/common/util.go @@ -286,24 +286,19 @@ func IsServiceTransientError(err error) bool { // IsEntityNotExistsError checks if the error is an entity not exists error. func IsEntityNotExistsError(err error) bool { - _, ok := err.(*types.EntityNotExistsError) - return ok + return errors.As(err, new(*types.EntityNotExistsError)) } // IsServiceBusyError checks if the error is a service busy error. func IsServiceBusyError(err error) bool { - switch err.(type) { - case *types.ServiceBusyError: - return true - } - return false + return errors.As(err, new(*types.ServiceBusyError)) } // IsContextTimeoutError checks if the error is context timeout error func IsContextTimeoutError(err error) bool { - switch err := err.(type) { - case *types.InternalServiceError: - return err.Message == context.DeadlineExceeded.Error() + var internalErr *types.InternalServiceError + if errors.As(err, &internalErr) { + return internalErr.Message == context.DeadlineExceeded.Error() } return err == context.DeadlineExceeded || yarpcerrors.IsDeadlineExceeded(err) } @@ -933,7 +928,8 @@ func ConvertDynamicConfigMapPropertyToIntMap(dcValue map[string]interface{}) (ma // IsStickyTaskConditionError is error from matching engine func IsStickyTaskConditionError(err error) bool { - if e, ok := err.(*types.InternalServiceError); ok { + var e *types.InternalServiceError + if errors.As(err, &e) { return e.GetMessage() == StickyTaskConditionFailedErrorMsg } return false @@ -977,7 +973,7 @@ func ConvertErrToGetTaskFailedCause(err error) types.GetTaskFailedCause { if IsServiceBusyError(err) { return types.GetTaskFailedCauseServiceBusy } - if _, ok := err.(*types.ShardOwnershipLostError); ok { + if errors.As(err, new(*types.ShardOwnershipLostError)) { return types.GetTaskFailedCauseShardOwnershipLost } return types.GetTaskFailedCauseUncategorized diff --git a/host/archival_test.go b/host/archival_test.go index 437374a137c..4c869d63d07 100644 --- a/host/archival_test.go +++ b/host/archival_test.go @@ -24,6 +24,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "strconv" "time" @@ -208,7 +209,7 @@ func (s *IntegrationSuite) isMutableStateDeleted(domainID string, execution *typ ctx, cancel := context.WithTimeout(context.Background(), defaultTestPersistenceTimeout) _, err := s.testCluster.testBase.ExecutionManager.GetWorkflowExecution(ctx, request) cancel() - if _, ok := err.(*types.EntityNotExistsError); ok { + if errors.As(err, new(*types.EntityNotExistsError)) { return true } time.Sleep(retryBackoffTime) diff --git a/host/onebox.go b/host/onebox.go index 5ee3143e011..c89ce69d1f0 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -23,6 +23,7 @@ package host import ( "context" "encoding/json" + "errors" "fmt" "sync" "time" @@ -812,7 +813,7 @@ func (c *cadenceImpl) createSystemDomain() error { FailoverVersion: common.EmptyVersion, }) if err != nil { - if _, ok := err.(*types.DomainAlreadyExistsError); ok { + if errors.As(err, new(*types.DomainAlreadyExistsError)) { return nil } return fmt.Errorf("failed to create cadence-system domain: %v", err) diff --git a/host/query_workflow_test.go b/host/query_workflow_test.go index cdf568c6bf5..a91af3744ef 100644 --- a/host/query_workflow_test.go +++ b/host/query_workflow_test.go @@ -210,8 +210,8 @@ func (s *IntegrationSuite) TestQueryWorkflow_Sticky() { } queryResult = <-queryResultCh s.NotNil(queryResult.Err) - queryFailError, ok := queryResult.Err.(*types.QueryFailedError) - s.True(ok) + var queryFailError *types.QueryFailedError + s.True(errors.As(err, &queryFailError)) s.Equal("unknown-query-type", queryFailError.Message) } @@ -523,8 +523,8 @@ func (s *IntegrationSuite) TestQueryWorkflow_NonSticky() { } queryResult = <-queryResultCh s.NotNil(queryResult.Err) - queryFailError, ok := queryResult.Err.(*types.QueryFailedError) - s.True(ok) + var queryFailError *types.QueryFailedError + s.True(errors.As(err, &queryFailError)) s.Equal("unknown-query-type", queryFailError.Message) // advance the state of the decider @@ -1416,6 +1416,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_BeforeFirstDecision() { }, }) s.Nil(queryResp) - s.IsType(&types.QueryFailedError{}, err) - s.Equal("workflow must handle at least one decision task before it can be queried", err.(*types.QueryFailedError).Message) + var queryErr *types.QueryFailedError + s.True(errors.As(err, &queryErr), "wrong error: %v", err) + s.Equal("workflow must handle at least one decision task before it can be queried", queryErr.Message) } diff --git a/host/signal_workflow_test.go b/host/signal_workflow_test.go index 865c27e67b5..8aa55807958 100644 --- a/host/signal_workflow_test.go +++ b/host/signal_workflow_test.go @@ -24,6 +24,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "strconv" "strings" @@ -1574,7 +1575,9 @@ func (s *IntegrationSuite) TestSignalWithStartWorkflow_IDReusePolicy() { cancel() s.Nil(resp) s.Error(err) - errMsg := err.(*types.WorkflowExecutionAlreadyStartedError).GetMessage() + var alreadyStartedErr *types.WorkflowExecutionAlreadyStartedError + s.True(errors.As(err, &alreadyStartedErr)) + errMsg := alreadyStartedErr.GetMessage() s.True(strings.Contains(errMsg, "reject duplicate workflow ID")) // test policy WorkflowIDReusePolicyAllowDuplicateFailedOnly @@ -1585,7 +1588,8 @@ func (s *IntegrationSuite) TestSignalWithStartWorkflow_IDReusePolicy() { cancel() s.Nil(resp) s.Error(err) - errMsg = err.(*types.WorkflowExecutionAlreadyStartedError).GetMessage() + s.True(errors.As(err, &alreadyStartedErr)) + errMsg = alreadyStartedErr.GetMessage() s.True(strings.Contains(errMsg, "allow duplicate workflow ID if last run failed")) // test policy WorkflowIDReusePolicyAllowDuplicate diff --git a/service/frontend/wrappers/clusterredirection/policy.go b/service/frontend/wrappers/clusterredirection/policy.go index cf6a2974554..d18f51d9d06 100644 --- a/service/frontend/wrappers/clusterredirection/policy.go +++ b/service/frontend/wrappers/clusterredirection/policy.go @@ -22,6 +22,7 @@ package clusterredirection import ( "context" + "errors" "fmt" "github.com/uber/cadence/common/cache" @@ -235,8 +236,8 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) withRedirect(ctx con } func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) isDomainNotActiveError(err error) (string, bool) { - domainNotActiveErr, ok := err.(*types.DomainNotActiveError) - if !ok { + var domainNotActiveErr *types.DomainNotActiveError + if !errors.As(err, &domainNotActiveErr) { return "", false } return domainNotActiveErr.ActiveCluster, true diff --git a/service/worker/parentclosepolicy/workflow.go b/service/worker/parentclosepolicy/workflow.go index 60876fd3f9f..c10c15e0fb1 100644 --- a/service/worker/parentclosepolicy/workflow.go +++ b/service/worker/parentclosepolicy/workflow.go @@ -22,6 +22,7 @@ package parentclosepolicy import ( "context" + "errors" "fmt" "math/rand" "time" @@ -197,12 +198,12 @@ func ProcessorActivity(ctx context.Context, request Request) error { err = fmt.Errorf("unknown parent close policy: %v", execution.Policy) } if err != nil { - switch err.(type) { - case *types.EntityNotExistsError, - *types.WorkflowExecutionAlreadyCompletedError, - *types.CancellationAlreadyRequestedError: + switch { + case errors.As(err, new(*types.EntityNotExistsError)), + errors.As(err, new(*types.WorkflowExecutionAlreadyCompletedError)), + errors.As(err, new(*types.CancellationAlreadyRequestedError)): err = nil - case *types.DomainNotActiveError: + case errors.As(err, new(*types.DomainNotActiveError)): var domainEntry *cache.DomainCacheEntry if domainEntry, err = domainCache.GetDomainByID(domainID); err == nil { cluster := domainEntry.GetReplicationConfig().ActiveClusterName