Skip to content

Commit

Permalink
Refine errors for import (#27379)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 authored Sep 30, 2023
1 parent dbdb9e1 commit 63ac43a
Show file tree
Hide file tree
Showing 17 changed files with 108 additions and 317 deletions.
9 changes: 3 additions & 6 deletions internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sync"
"time"

"github.com/cockroachdb/errors"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand All @@ -50,6 +49,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/interceptor"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
)
Expand Down Expand Up @@ -326,11 +326,8 @@ func (s *Server) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannel
}

func (s *Server) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
if s.datanode.GetStateCode() != commonpb.StateCode_Healthy {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "DataNode isn't healthy.",
}, errors.New("DataNode is not ready yet")
if err := merr.CheckHealthy(s.datanode.GetStateCode()); err != nil {
return merr.Status(err), nil
}
return s.datanode.FlushSegments(ctx, req)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/datanode/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func Test_NewServer(t *testing.T) {
status: &commonpb.Status{},
}
states, err := server.FlushSegments(ctx, nil)
assert.Error(t, err)
assert.NoError(t, err)
assert.NotNil(t, states)
})

Expand Down
20 changes: 4 additions & 16 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,19 +868,13 @@ func (s *Server) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasReq

func (s *Server) DescribeAlias(ctx context.Context, request *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) {
return &milvuspb.DescribeAliasResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "TODO: implement me",
},
Status: merr.Status(merr.WrapErrServiceUnavailable("DescribeAlias unimplemented")),
}, nil
}

func (s *Server) ListAliases(ctx context.Context, request *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) {
return &milvuspb.ListAliasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "TODO: implement me",
},
Status: merr.Status(merr.WrapErrServiceUnavailable("ListAliases unimplemented")),
}, nil
}

Expand Down Expand Up @@ -1070,19 +1064,13 @@ func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou

func (s *Server) ListIndexedSegment(ctx context.Context, req *federpb.ListIndexedSegmentRequest) (*federpb.ListIndexedSegmentResponse, error) {
return &federpb.ListIndexedSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "not implemented",
},
Status: merr.Status(merr.WrapErrServiceUnavailable("ListIndexedSegment unimplemented")),
}, nil
}

func (s *Server) DescribeSegmentIndexData(ctx context.Context, req *federpb.DescribeSegmentIndexDataRequest) (*federpb.DescribeSegmentIndexDataResponse, error) {
return &federpb.DescribeSegmentIndexDataResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "not implemented",
},
Status: merr.Status(merr.WrapErrServiceUnavailable("DescribeSegmentIndexData unimplemented")),
}, nil
}

Expand Down
47 changes: 18 additions & 29 deletions internal/indexnode/indexnode_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,20 @@ import (
)

func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.String("clusterID", req.GetClusterID()),
zap.Int64("indexBuildID", req.GetBuildID()),
)

if !i.lifetime.Add(commonpbutil.IsHealthy) {
stateCode := i.lifetime.GetState()
log.Ctx(ctx).Warn("index node not ready",
log.Warn("index node not ready",
zap.String("state", stateCode.String()),
zap.String("clusterID", req.GetClusterID()),
zap.Int64("indexBuildID", req.GetBuildID()),
)
return merr.Status(merr.WrapErrServiceNotReady(stateCode.String())), nil
}
defer i.lifetime.Done()
log.Ctx(ctx).Info("IndexNode building index ...",
zap.String("clusterID", req.GetClusterID()),
zap.Int64("indexBuildID", req.GetBuildID()),
log.Info("IndexNode building index ...",
zap.Int64("indexID", req.GetIndexID()),
zap.String("indexName", req.GetIndexName()),
zap.String("indexFilePrefix", req.GetIndexFilePrefix()),
Expand All @@ -77,26 +78,20 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
cancel: taskCancel,
state: commonpb.IndexState_InProgress,
}); oldInfo != nil {
log.Ctx(ctx).Warn("duplicated index build task", zap.String("clusterID", req.GetClusterID()), zap.Int64("buildID", req.GetBuildID()))
err := merr.WrapErrIndexDuplicate(req.GetIndexName(), "building index task existed")
log.Warn("duplicated index build task", zap.Error(err))
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc()
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_BuildIndexError,
Reason: "duplicated index build task",
}, nil
return merr.Status(err), nil
}
cm, err := i.storageFactory.NewChunkManager(i.loopCtx, req.GetStorageConfig())
if err != nil {
log.Ctx(ctx).Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()),
log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()),
zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()),
zap.String("clusterID", req.GetClusterID()), zap.Int64("indexBuildID", req.GetBuildID()),
zap.Error(err),
)
i.deleteTaskInfos(ctx, []taskKey{{ClusterID: req.GetClusterID(), BuildID: req.GetBuildID()}})
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc()
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_BuildIndexError,
Reason: "create chunk manager failed, error: " + err.Error(),
}, nil
return merr.Status(err), nil
}
task := &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
Expand All @@ -113,15 +108,15 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
}
ret := merr.Status(nil)
if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", req.GetBuildID()),
zap.String("clusterID", req.GetClusterID()), zap.Error(err))
log.Warn("IndexNode failed to schedule",
zap.Error(err))
ret = merr.Status(err)
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc()
return ret, nil
}
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc()
log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("indexBuildID", req.GetBuildID()),
zap.String("clusterID", req.GetClusterID()), zap.String("indexName", req.GetIndexName()))
log.Info("IndexNode successfully scheduled",
zap.String("indexName", req.GetIndexName()))
return ret, nil
}

Expand Down Expand Up @@ -253,7 +248,6 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgIndexNodeIsUnhealthy(paramtable.GetNodeID()),
},
Response: "",
}, nil
}
defer i.lifetime.Done()
Expand All @@ -266,8 +260,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
zap.Error(err))

return &milvuspb.GetMetricsResponse{
Status: merr.Status(err),
Response: "",
Status: merr.Status(err),
}, nil
}

Expand All @@ -289,10 +282,6 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
zap.String("metricType", metricType))

return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
Status: merr.Status(merr.WrapErrMetricNotFound(metricType)),
}, nil
}
3 changes: 1 addition & 2 deletions internal/indexnode/indexnode_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func TestGetMetricsError(t *testing.T) {
}
resp, err = in.GetMetrics(ctx, unsupportedReq)
assert.NoError(t, err)
assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.GetStatus().GetReason(), metricsinfo.MsgUnimplementedMetric)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrMetricNotFound)
}

func TestMockFieldData(t *testing.T) {
Expand Down
42 changes: 0 additions & 42 deletions internal/proxy/error.go

This file was deleted.

35 changes: 0 additions & 35 deletions internal/proxy/error_test.go

This file was deleted.

38 changes: 18 additions & 20 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,13 +1483,13 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get

getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
log.Warn("fail to get loading progress",
zap.String("collection_name", request.CollectionName),
zap.Strings("partition_name", request.PartitionNames),
zap.String("collectionName", request.CollectionName),
zap.Strings("partitionName", request.PartitionNames),
zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
if errors.Is(err, ErrInsufficientMemory) {
if errors.Is(err, merr.ErrServiceMemoryLimitExceeded) {
return &milvuspb.GetLoadingProgressResponse{
Status: InSufficientMemoryStatus(request.GetCollectionName()),
Status: merr.Status(err),
}
}
return &milvuspb.GetLoadingProgressResponse{
Expand Down Expand Up @@ -1574,14 +1574,6 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt
return getErrResponse(err), nil
}

// TODO(longjiquan): https://github.com/milvus-io/milvus/issues/21485, Remove `GetComponentStates` after error code
// is ready to distinguish case whether the querycoord is not healthy or the collection is not even loaded.
if statesResp, err := node.queryCoord.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}); err != nil {
return getErrResponse(err), nil
} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
}

successResponse := &milvuspb.GetLoadStateResponse{
Status: merr.Status(nil),
}
Expand Down Expand Up @@ -1615,24 +1607,30 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt
var progress int64
if len(request.GetPartitionNames()) == 0 {
if progress, _, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
if errors.Is(err, ErrInsufficientMemory) {
if err != nil {
if errors.Is(err, merr.ErrCollectionNotLoaded) {
successResponse.State = commonpb.LoadState_LoadStateNotLoad
return successResponse, nil
}
return &milvuspb.GetLoadStateResponse{
Status: InSufficientMemoryStatus(request.GetCollectionName()),
Status: merr.Status(err),
}, nil
}
successResponse.State = commonpb.LoadState_LoadStateNotLoad
return successResponse, nil
}
} else {
if progress, _, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
request.GetPartitionNames(), request.GetCollectionName(), collectionID, request.GetDbName()); err != nil {
if errors.Is(err, ErrInsufficientMemory) {
if err != nil {
if errors.IsAny(err,
merr.ErrCollectionNotLoaded,
merr.ErrPartitionNotLoaded) {
successResponse.State = commonpb.LoadState_LoadStateNotLoad
return successResponse, nil
}
return &milvuspb.GetLoadStateResponse{
Status: InSufficientMemoryStatus(request.GetCollectionName()),
Status: merr.Status(err),
}, nil
}
successResponse.State = commonpb.LoadState_LoadStateNotLoad
return successResponse, nil
}
}
if progress >= 100 {
Expand Down
Loading

0 comments on commit 63ac43a

Please sign in to comment.