Skip to content

Commit

Permalink
Improve the error message for getting all indexes of collection
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 committed Oct 8, 2023
1 parent fc429ca commit 5a6ac66
Show file tree
Hide file tree
Showing 8 changed files with 676 additions and 643 deletions.
27 changes: 12 additions & 15 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()),
)
log.Info("receive GetIndexState request",
zap.String("indexName", req.GetIndexName()))
log.Info("receive GetIndexState request")

if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
Expand All @@ -227,8 +227,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
log.Warn("GetIndexState fail",
zap.String("indexName", req.GetIndexName()), zap.Error(err))
log.Warn("GetIndexState fail", zap.Error(err))
return &indexpb.GetIndexStateResponse{
Status: merr.Status(err),
}, nil
Expand All @@ -245,20 +244,16 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
State: commonpb.IndexState_Finished,
}

indexInfo := &indexpb.IndexInfo{
IndexedRows: 0,
TotalRows: 0,
State: 0,
IndexStateFailReason: "",
}
indexInfo := &indexpb.IndexInfo{}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false, indexes[0].CreateTime)
ret.State = indexInfo.State
ret.FailReason = indexInfo.IndexStateFailReason

log.Info("GetIndexState success",
zap.String("IndexName", req.GetIndexName()), zap.String("state", ret.GetState().String()))
zap.String("state", ret.GetState().String()),
)
return ret, nil
}

Expand Down Expand Up @@ -485,9 +480,11 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()),
)
log.Info("receive DescribeIndex request",
zap.Uint64("timestamp", req.GetTimestamp()),
)
log.Info("receive DescribeIndex request", zap.String("indexName", req.GetIndexName()),
zap.Uint64("timestamp", req.GetTimestamp()))

if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
Expand All @@ -499,7 +496,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
log.Warn("DescribeIndex fail", zap.String("indexName", req.GetIndexName()), zap.Error(err))
log.Warn("DescribeIndex fail", zap.Error(err))
return &indexpb.DescribeIndexResponse{
Status: merr.Status(err),
}, nil
Expand Down Expand Up @@ -532,7 +529,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
s.completeIndexInfo(indexInfo, index, segments, false, createTs)
indexInfos = append(indexInfos, indexInfo)
}
log.Info("DescribeIndex success", zap.String("indexName", req.GetIndexName()))
log.Info("DescribeIndex success")
return &indexpb.DescribeIndexResponse{
Status: merr.Status(nil),
IndexInfos: indexInfos,
Expand Down
49 changes: 22 additions & 27 deletions internal/proxy/default_limit_reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package proxy
import (
"context"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
Expand Down Expand Up @@ -39,42 +38,38 @@ func (r *defaultLimitReducer) afterReduce(result *milvuspb.QueryResults) error {
outputFieldsID := r.req.GetOutputFieldsId()

result.CollectionName = collectionName
var err error

if len(result.FieldsData) > 0 {
result.Status = merr.Status(nil)
} else {
result.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_EmptyCollection,
Reason: "empty collection", // TODO
}
return nil
}

for i := 0; i < len(result.FieldsData); i++ {
for i := 0; i < len(result.GetFieldsData()); i++ {
// drop ts column
if outputFieldsID[i] == common.TimeStampField {
result.FieldsData = append(result.FieldsData[:i], result.FieldsData[(i+1):]...)
outputFieldsID = append(outputFieldsID[:i], outputFieldsID[i+1:]...)
i--
continue
}
for _, field := range schema.Fields {
if field.FieldID == outputFieldsID[i] {
// deal with the situation that offset equal to or greater than the number of entities
if result.FieldsData[i] == nil {
var err error
result.FieldsData[i], err = typeutil.GenEmptyFieldData(field)
if err != nil {
return err
}
}
result.FieldsData[i].FieldName = field.Name
result.FieldsData[i].FieldId = field.FieldID
result.FieldsData[i].Type = field.DataType
result.FieldsData[i].IsDynamic = field.IsDynamic
field := typeutil.GetField(schema, outputFieldsID[i])
if field == nil {
err = merr.WrapErrFieldNotFound(outputFieldsID[i])
break

Check warning on line 54 in internal/proxy/default_limit_reducer.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/default_limit_reducer.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

if result.FieldsData[i] == nil {
result.FieldsData[i], err = typeutil.GenEmptyFieldData(field)
if err != nil {
break

Check warning on line 60 in internal/proxy/default_limit_reducer.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/default_limit_reducer.go#L60

Added line #L60 was not covered by tests
}
continue
}

result.FieldsData[i].FieldName = field.GetName()
result.FieldsData[i].FieldId = field.GetFieldID()
result.FieldsData[i].Type = field.GetDataType()
result.FieldsData[i].IsDynamic = field.GetIsDynamic()

Check warning on line 68 in internal/proxy/default_limit_reducer.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/default_limit_reducer.go#L65-L68

Added lines #L65 - L68 were not covered by tests
}

return nil
result.Status = merr.Status(err)
return err
}

func newDefaultLimitReducer(ctx context.Context, params *queryParams, req *internalpb.RetrieveRequest, schema *schemapb.CollectionSchema, collectionName string) *defaultLimitReducer {
Expand Down
Loading

0 comments on commit 5a6ac66

Please sign in to comment.