From 5a6ac664d1aba1bbacb136a6102007877c332431 Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 27 Sep 2023 10:19:57 +0800 Subject: [PATCH] Improve the error message for getting all indexes of collection Signed-off-by: yah01 --- internal/datacoord/index_service.go | 27 +- internal/proxy/default_limit_reducer.go | 49 +- internal/proxy/task.go | 1180 +++++++++++---------- internal/proxy/task_index.go | 12 +- pkg/util/merr/errors_test.go | 3 + pkg/util/merr/utils.go | 8 + pkg/util/typeutil/gen_empty_field_data.go | 33 +- pkg/util/typeutil/schema.go | 7 + 8 files changed, 676 insertions(+), 643 deletions(-) diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 04fde946c770e..7973960813e4c 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -213,9 +213,9 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) { log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), + zap.String("indexName", req.GetIndexName()), ) - log.Info("receive GetIndexState request", - zap.String("indexName", req.GetIndexName())) + log.Info("receive GetIndexState request") if err := merr.CheckHealthy(s.GetStateCode()); err != nil { log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) @@ -227,8 +227,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) if len(indexes) == 0 { err := merr.WrapErrIndexNotFound(req.GetIndexName()) - log.Warn("GetIndexState fail", - zap.String("indexName", req.GetIndexName()), zap.Error(err)) + log.Warn("GetIndexState fail", zap.Error(err)) return &indexpb.GetIndexStateResponse{ Status: merr.Status(err), }, nil @@ -245,12 +244,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe State: commonpb.IndexState_Finished, } - indexInfo := &indexpb.IndexInfo{ - IndexedRows: 0, - TotalRows: 0, - State: 0, - IndexStateFailReason: "", - } + indexInfo := &indexpb.IndexInfo{} s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool { return isFlush(info) && info.CollectionID == req.GetCollectionID() }), false, indexes[0].CreateTime) @@ -258,7 +252,8 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe ret.FailReason = indexInfo.IndexStateFailReason log.Info("GetIndexState success", - zap.String("IndexName", req.GetIndexName()), zap.String("state", ret.GetState().String())) + zap.String("state", ret.GetState().String()), + ) return ret, nil } @@ -485,9 +480,11 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) { log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), + zap.String("indexName", req.GetIndexName()), + ) + log.Info("receive DescribeIndex request", + zap.Uint64("timestamp", req.GetTimestamp()), ) - log.Info("receive DescribeIndex request", zap.String("indexName", req.GetIndexName()), - zap.Uint64("timestamp", req.GetTimestamp())) if err := merr.CheckHealthy(s.GetStateCode()); err != nil { log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) @@ -499,7 +496,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) if len(indexes) == 0 { err := merr.WrapErrIndexNotFound(req.GetIndexName()) - log.Warn("DescribeIndex fail", zap.String("indexName", req.GetIndexName()), zap.Error(err)) + log.Warn("DescribeIndex fail", zap.Error(err)) return &indexpb.DescribeIndexResponse{ Status: merr.Status(err), }, nil @@ -532,7 +529,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe s.completeIndexInfo(indexInfo, index, segments, false, createTs) indexInfos = append(indexInfos, indexInfo) } - log.Info("DescribeIndex success", zap.String("indexName", req.GetIndexName())) + log.Info("DescribeIndex success") return &indexpb.DescribeIndexResponse{ Status: merr.Status(nil), IndexInfos: indexInfos, diff --git a/internal/proxy/default_limit_reducer.go b/internal/proxy/default_limit_reducer.go index d74440567bfc3..0f70f49bd1ccb 100644 --- a/internal/proxy/default_limit_reducer.go +++ b/internal/proxy/default_limit_reducer.go @@ -3,7 +3,6 @@ package proxy import ( "context" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -39,42 +38,38 @@ func (r *defaultLimitReducer) afterReduce(result *milvuspb.QueryResults) error { outputFieldsID := r.req.GetOutputFieldsId() result.CollectionName = collectionName + var err error - if len(result.FieldsData) > 0 { - result.Status = merr.Status(nil) - } else { - result.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_EmptyCollection, - Reason: "empty collection", // TODO - } - return nil - } - - for i := 0; i < len(result.FieldsData); i++ { + for i := 0; i < len(result.GetFieldsData()); i++ { + // drop ts column if outputFieldsID[i] == common.TimeStampField { result.FieldsData = append(result.FieldsData[:i], result.FieldsData[(i+1):]...) + outputFieldsID = append(outputFieldsID[:i], outputFieldsID[i+1:]...) i-- continue } - for _, field := range schema.Fields { - if field.FieldID == outputFieldsID[i] { - // deal with the situation that offset equal to or greater than the number of entities - if result.FieldsData[i] == nil { - var err error - result.FieldsData[i], err = typeutil.GenEmptyFieldData(field) - if err != nil { - return err - } - } - result.FieldsData[i].FieldName = field.Name - result.FieldsData[i].FieldId = field.FieldID - result.FieldsData[i].Type = field.DataType - result.FieldsData[i].IsDynamic = field.IsDynamic + field := typeutil.GetField(schema, outputFieldsID[i]) + if field == nil { + err = merr.WrapErrFieldNotFound(outputFieldsID[i]) + break + } + + if result.FieldsData[i] == nil { + result.FieldsData[i], err = typeutil.GenEmptyFieldData(field) + if err != nil { + break } + continue } + + result.FieldsData[i].FieldName = field.GetName() + result.FieldsData[i].FieldId = field.GetFieldID() + result.FieldsData[i].Type = field.GetDataType() + result.FieldsData[i].IsDynamic = field.GetIsDynamic() } - return nil + result.Status = merr.Status(err) + return err } func newDefaultLimitReducer(ctx context.Context, params *queryParams, req *internalpb.RetrieveRequest, schema *schemapb.CollectionSchema, collectionName string) *defaultLimitReducer { diff --git a/internal/proxy/task.go b/internal/proxy/task.go index bdb6b45bd793c..56e3971681bbe 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -123,51 +123,51 @@ type createCollectionTask struct { schema *schemapb.CollectionSchema } -func (cct *createCollectionTask) TraceCtx() context.Context { - return cct.ctx +func (t *createCollectionTask) TraceCtx() context.Context { + return t.ctx } -func (cct *createCollectionTask) ID() UniqueID { - return cct.Base.MsgID +func (t *createCollectionTask) ID() UniqueID { + return t.Base.MsgID } -func (cct *createCollectionTask) SetID(uid UniqueID) { - cct.Base.MsgID = uid +func (t *createCollectionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (cct *createCollectionTask) Name() string { +func (t *createCollectionTask) Name() string { return CreateCollectionTaskName } -func (cct *createCollectionTask) Type() commonpb.MsgType { - return cct.Base.MsgType +func (t *createCollectionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (cct *createCollectionTask) BeginTs() Timestamp { - return cct.Base.Timestamp +func (t *createCollectionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (cct *createCollectionTask) EndTs() Timestamp { - return cct.Base.Timestamp +func (t *createCollectionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (cct *createCollectionTask) SetTs(ts Timestamp) { - cct.Base.Timestamp = ts +func (t *createCollectionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (cct *createCollectionTask) OnEnqueue() error { - cct.Base = commonpbutil.NewMsgBase() - cct.Base.MsgType = commonpb.MsgType_CreateCollection - cct.Base.SourceID = paramtable.GetNodeID() +func (t *createCollectionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_CreateCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } -func (cct *createCollectionTask) validatePartitionKey() error { +func (t *createCollectionTask) validatePartitionKey() error { idx := -1 - for i, field := range cct.schema.Fields { + for i, field := range t.schema.Fields { if field.GetIsPartitionKey() { if idx != -1 { - return fmt.Errorf("there are more than one partition key, field name = %s, %s", cct.schema.Fields[idx].Name, field.Name) + return fmt.Errorf("there are more than one partition key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name) } if field.GetIsPrimaryKey() { @@ -179,13 +179,13 @@ func (cct *createCollectionTask) validatePartitionKey() error { return errors.New("the data type of partition key should be Int64 or VarChar") } - if cct.GetNumPartitions() < 0 { + if t.GetNumPartitions() < 0 { return errors.New("the specified partitions should be greater than 0 if partition key is used") } // set default physical partitions num if enable partition key mode - if cct.GetNumPartitions() == 0 { - cct.NumPartitions = common.DefaultPartitionsWithPartitionKey + if t.GetNumPartitions() == 0 { + t.NumPartitions = common.DefaultPartitionsWithPartitionKey } idx = i @@ -193,73 +193,73 @@ func (cct *createCollectionTask) validatePartitionKey() error { } if idx == -1 { - if cct.GetNumPartitions() != 0 { + if t.GetNumPartitions() != 0 { return fmt.Errorf("num_partitions should only be specified with partition key field enabled") } } else { log.Info("create collection with partition key mode", - zap.String("collectionName", cct.CollectionName), - zap.Int64("numDefaultPartitions", cct.GetNumPartitions())) + zap.String("collectionName", t.CollectionName), + zap.Int64("numDefaultPartitions", t.GetNumPartitions())) } return nil } -func (cct *createCollectionTask) PreExecute(ctx context.Context) error { - cct.Base.MsgType = commonpb.MsgType_CreateCollection - cct.Base.SourceID = paramtable.GetNodeID() +func (t *createCollectionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_CreateCollection + t.Base.SourceID = paramtable.GetNodeID() - cct.schema = &schemapb.CollectionSchema{} - err := proto.Unmarshal(cct.Schema, cct.schema) + t.schema = &schemapb.CollectionSchema{} + err := proto.Unmarshal(t.Schema, t.schema) if err != nil { return err } - cct.schema.AutoID = false + t.schema.AutoID = false - if cct.ShardsNum > Params.ProxyCfg.MaxShardNum.GetAsInt32() { + if t.ShardsNum > Params.ProxyCfg.MaxShardNum.GetAsInt32() { return fmt.Errorf("maximum shards's number should be limited to %d", Params.ProxyCfg.MaxShardNum.GetAsInt()) } - if len(cct.schema.Fields) > Params.ProxyCfg.MaxFieldNum.GetAsInt() { + if len(t.schema.Fields) > Params.ProxyCfg.MaxFieldNum.GetAsInt() { return fmt.Errorf("maximum field's number should be limited to %d", Params.ProxyCfg.MaxFieldNum.GetAsInt()) } // validate collection name - if err := validateCollectionName(cct.schema.Name); err != nil { + if err := validateCollectionName(t.schema.Name); err != nil { return err } // validate whether field names duplicates - if err := validateDuplicatedFieldName(cct.schema.Fields); err != nil { + if err := validateDuplicatedFieldName(t.schema.Fields); err != nil { return err } // validate primary key definition - if err := validatePrimaryKey(cct.schema); err != nil { + if err := validatePrimaryKey(t.schema); err != nil { return err } // validate dynamic field - if err := validateDynamicField(cct.schema); err != nil { + if err := validateDynamicField(t.schema); err != nil { return err } // validate auto id definition - if err := ValidateFieldAutoID(cct.schema); err != nil { + if err := ValidateFieldAutoID(t.schema); err != nil { return err } // validate field type definition - if err := validateFieldType(cct.schema); err != nil { + if err := validateFieldType(t.schema); err != nil { return err } // validate partition key mode - if err := cct.validatePartitionKey(); err != nil { + if err := t.validatePartitionKey(); err != nil { return err } - for _, field := range cct.schema.Fields { + for _, field := range t.schema.Fields { // validate field name if err := validateFieldName(field.Name); err != nil { return err @@ -275,7 +275,7 @@ func (cct *createCollectionTask) PreExecute(ctx context.Context) error { // if max_length not specified, return error if field.DataType == schemapb.DataType_VarChar || (field.GetDataType() == schemapb.DataType_Array && field.GetElementType() == schemapb.DataType_VarChar) { - err = validateMaxLengthPerRow(cct.schema.Name, field) + err = validateMaxLengthPerRow(t.schema.Name, field) if err != nil { return err } @@ -283,17 +283,17 @@ func (cct *createCollectionTask) PreExecute(ctx context.Context) error { // valid max capacity for array per row parameters // if max_capacity not specified, return error if field.DataType == schemapb.DataType_Array { - if err = validateMaxCapacityPerRow(cct.schema.Name, field); err != nil { + if err = validateMaxCapacityPerRow(t.schema.Name, field); err != nil { return err } } } - if err := validateMultipleVectorFields(cct.schema); err != nil { + if err := validateMultipleVectorFields(t.schema); err != nil { return err } - cct.CreateCollectionRequest.Schema, err = proto.Marshal(cct.schema) + t.CreateCollectionRequest.Schema, err = proto.Marshal(t.schema) if err != nil { return err } @@ -301,13 +301,13 @@ func (cct *createCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (cct *createCollectionTask) Execute(ctx context.Context) error { +func (t *createCollectionTask) Execute(ctx context.Context) error { var err error - cct.result, err = cct.rootCoord.CreateCollection(ctx, cct.CreateCollectionRequest) + t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest) return err } -func (cct *createCollectionTask) PostExecute(ctx context.Context) error { +func (t *createCollectionTask) PostExecute(ctx context.Context) error { return nil } @@ -321,60 +321,60 @@ type dropCollectionTask struct { chTicker channelsTimeTicker } -func (dct *dropCollectionTask) TraceCtx() context.Context { - return dct.ctx +func (t *dropCollectionTask) TraceCtx() context.Context { + return t.ctx } -func (dct *dropCollectionTask) ID() UniqueID { - return dct.Base.MsgID +func (t *dropCollectionTask) ID() UniqueID { + return t.Base.MsgID } -func (dct *dropCollectionTask) SetID(uid UniqueID) { - dct.Base.MsgID = uid +func (t *dropCollectionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (dct *dropCollectionTask) Name() string { +func (t *dropCollectionTask) Name() string { return DropCollectionTaskName } -func (dct *dropCollectionTask) Type() commonpb.MsgType { - return dct.Base.MsgType +func (t *dropCollectionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (dct *dropCollectionTask) BeginTs() Timestamp { - return dct.Base.Timestamp +func (t *dropCollectionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (dct *dropCollectionTask) EndTs() Timestamp { - return dct.Base.Timestamp +func (t *dropCollectionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (dct *dropCollectionTask) SetTs(ts Timestamp) { - dct.Base.Timestamp = ts +func (t *dropCollectionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (dct *dropCollectionTask) OnEnqueue() error { - dct.Base = commonpbutil.NewMsgBase() +func (t *dropCollectionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (dct *dropCollectionTask) PreExecute(ctx context.Context) error { - dct.Base.MsgType = commonpb.MsgType_DropCollection - dct.Base.SourceID = paramtable.GetNodeID() +func (t *dropCollectionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_DropCollection + t.Base.SourceID = paramtable.GetNodeID() - if err := validateCollectionName(dct.CollectionName); err != nil { + if err := validateCollectionName(t.CollectionName); err != nil { return err } return nil } -func (dct *dropCollectionTask) Execute(ctx context.Context) error { +func (t *dropCollectionTask) Execute(ctx context.Context) error { var err error - dct.result, err = dct.rootCoord.DropCollection(ctx, dct.DropCollectionRequest) + t.result, err = t.rootCoord.DropCollection(ctx, t.DropCollectionRequest) return err } -func (dct *dropCollectionTask) PostExecute(ctx context.Context) error { +func (t *dropCollectionTask) PostExecute(ctx context.Context) error { return nil } @@ -386,69 +386,69 @@ type hasCollectionTask struct { result *milvuspb.BoolResponse } -func (hct *hasCollectionTask) TraceCtx() context.Context { - return hct.ctx +func (t *hasCollectionTask) TraceCtx() context.Context { + return t.ctx } -func (hct *hasCollectionTask) ID() UniqueID { - return hct.Base.MsgID +func (t *hasCollectionTask) ID() UniqueID { + return t.Base.MsgID } -func (hct *hasCollectionTask) SetID(uid UniqueID) { - hct.Base.MsgID = uid +func (t *hasCollectionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (hct *hasCollectionTask) Name() string { +func (t *hasCollectionTask) Name() string { return HasCollectionTaskName } -func (hct *hasCollectionTask) Type() commonpb.MsgType { - return hct.Base.MsgType +func (t *hasCollectionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (hct *hasCollectionTask) BeginTs() Timestamp { - return hct.Base.Timestamp +func (t *hasCollectionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (hct *hasCollectionTask) EndTs() Timestamp { - return hct.Base.Timestamp +func (t *hasCollectionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (hct *hasCollectionTask) SetTs(ts Timestamp) { - hct.Base.Timestamp = ts +func (t *hasCollectionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (hct *hasCollectionTask) OnEnqueue() error { - hct.Base = commonpbutil.NewMsgBase() +func (t *hasCollectionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (hct *hasCollectionTask) PreExecute(ctx context.Context) error { - hct.Base.MsgType = commonpb.MsgType_HasCollection - hct.Base.SourceID = paramtable.GetNodeID() +func (t *hasCollectionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_HasCollection + t.Base.SourceID = paramtable.GetNodeID() - if err := validateCollectionName(hct.CollectionName); err != nil { + if err := validateCollectionName(t.CollectionName); err != nil { return err } return nil } -func (hct *hasCollectionTask) Execute(ctx context.Context) error { +func (t *hasCollectionTask) Execute(ctx context.Context) error { var err error - hct.result, err = hct.rootCoord.HasCollection(ctx, hct.HasCollectionRequest) + t.result, err = t.rootCoord.HasCollection(ctx, t.HasCollectionRequest) if err != nil { return err } - if hct.result == nil { + if t.result == nil { return errors.New("has collection resp is nil") } - if hct.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return merr.Error(hct.result.GetStatus()) + if t.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + return merr.Error(t.result.GetStatus()) } return nil } -func (hct *hasCollectionTask) PostExecute(ctx context.Context) error { +func (t *hasCollectionTask) PostExecute(ctx context.Context) error { return nil } @@ -460,57 +460,57 @@ type describeCollectionTask struct { result *milvuspb.DescribeCollectionResponse } -func (dct *describeCollectionTask) TraceCtx() context.Context { - return dct.ctx +func (t *describeCollectionTask) TraceCtx() context.Context { + return t.ctx } -func (dct *describeCollectionTask) ID() UniqueID { - return dct.Base.MsgID +func (t *describeCollectionTask) ID() UniqueID { + return t.Base.MsgID } -func (dct *describeCollectionTask) SetID(uid UniqueID) { - dct.Base.MsgID = uid +func (t *describeCollectionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (dct *describeCollectionTask) Name() string { +func (t *describeCollectionTask) Name() string { return DescribeCollectionTaskName } -func (dct *describeCollectionTask) Type() commonpb.MsgType { - return dct.Base.MsgType +func (t *describeCollectionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (dct *describeCollectionTask) BeginTs() Timestamp { - return dct.Base.Timestamp +func (t *describeCollectionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (dct *describeCollectionTask) EndTs() Timestamp { - return dct.Base.Timestamp +func (t *describeCollectionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (dct *describeCollectionTask) SetTs(ts Timestamp) { - dct.Base.Timestamp = ts +func (t *describeCollectionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (dct *describeCollectionTask) OnEnqueue() error { - dct.Base = commonpbutil.NewMsgBase() +func (t *describeCollectionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (dct *describeCollectionTask) PreExecute(ctx context.Context) error { - dct.Base.MsgType = commonpb.MsgType_DescribeCollection - dct.Base.SourceID = paramtable.GetNodeID() +func (t *describeCollectionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_DescribeCollection + t.Base.SourceID = paramtable.GetNodeID() - if dct.CollectionID != 0 && len(dct.CollectionName) == 0 { + if t.CollectionID != 0 && len(t.CollectionName) == 0 { return nil } - return validateCollectionName(dct.CollectionName) + return validateCollectionName(t.CollectionName) } -func (dct *describeCollectionTask) Execute(ctx context.Context) error { +func (t *describeCollectionTask) Execute(ctx context.Context) error { var err error - dct.result = &milvuspb.DescribeCollectionResponse{ + t.result = &milvuspb.DescribeCollectionResponse{ Status: merr.Status(nil), Schema: &schemapb.CollectionSchema{ Name: "", @@ -521,48 +521,48 @@ func (dct *describeCollectionTask) Execute(ctx context.Context) error { CollectionID: 0, VirtualChannelNames: nil, PhysicalChannelNames: nil, - CollectionName: dct.GetCollectionName(), - DbName: dct.GetDbName(), + CollectionName: t.GetCollectionName(), + DbName: t.GetDbName(), } - result, err := dct.rootCoord.DescribeCollection(ctx, dct.DescribeCollectionRequest) + result, err := t.rootCoord.DescribeCollection(ctx, t.DescribeCollectionRequest) if err != nil { return err } if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - dct.result.Status = result.Status + t.result.Status = result.Status // compatibility with PyMilvus existing implementation - err := merr.Error(dct.result.GetStatus()) + err := merr.Error(t.result.GetStatus()) if errors.Is(err, merr.ErrCollectionNotFound) { // nolint - dct.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + t.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError // nolint - dct.result.Status.Reason = "can't find collection " + dct.result.GetStatus().GetReason() + t.result.Status.Reason = "can't find collection " + t.result.GetStatus().GetReason() } } else { - dct.result.Schema.Name = result.Schema.Name - dct.result.Schema.Description = result.Schema.Description - dct.result.Schema.AutoID = result.Schema.AutoID - dct.result.Schema.EnableDynamicField = result.Schema.EnableDynamicField - dct.result.CollectionID = result.CollectionID - dct.result.VirtualChannelNames = result.VirtualChannelNames - dct.result.PhysicalChannelNames = result.PhysicalChannelNames - dct.result.CreatedTimestamp = result.CreatedTimestamp - dct.result.CreatedUtcTimestamp = result.CreatedUtcTimestamp - dct.result.ShardsNum = result.ShardsNum - dct.result.ConsistencyLevel = result.ConsistencyLevel - dct.result.Aliases = result.Aliases - dct.result.Properties = result.Properties - dct.result.DbName = result.GetDbName() - dct.result.NumPartitions = result.NumPartitions + t.result.Schema.Name = result.Schema.Name + t.result.Schema.Description = result.Schema.Description + t.result.Schema.AutoID = result.Schema.AutoID + t.result.Schema.EnableDynamicField = result.Schema.EnableDynamicField + t.result.CollectionID = result.CollectionID + t.result.VirtualChannelNames = result.VirtualChannelNames + t.result.PhysicalChannelNames = result.PhysicalChannelNames + t.result.CreatedTimestamp = result.CreatedTimestamp + t.result.CreatedUtcTimestamp = result.CreatedUtcTimestamp + t.result.ShardsNum = result.ShardsNum + t.result.ConsistencyLevel = result.ConsistencyLevel + t.result.Aliases = result.Aliases + t.result.Properties = result.Properties + t.result.DbName = result.GetDbName() + t.result.NumPartitions = result.NumPartitions for _, field := range result.Schema.Fields { if field.IsDynamic { continue } if field.FieldID >= common.StartOfUserFieldID { - dct.result.Schema.Fields = append(dct.result.Schema.Fields, &schemapb.FieldSchema{ + t.result.Schema.Fields = append(t.result.Schema.Fields, &schemapb.FieldSchema{ FieldID: field.FieldID, Name: field.Name, IsPrimaryKey: field.IsPrimaryKey, @@ -582,7 +582,7 @@ func (dct *describeCollectionTask) Execute(ctx context.Context) error { return nil } -func (dct *describeCollectionTask) PostExecute(ctx context.Context) error { +func (t *describeCollectionTask) PostExecute(ctx context.Context) error { return nil } @@ -595,48 +595,48 @@ type showCollectionsTask struct { result *milvuspb.ShowCollectionsResponse } -func (sct *showCollectionsTask) TraceCtx() context.Context { - return sct.ctx +func (t *showCollectionsTask) TraceCtx() context.Context { + return t.ctx } -func (sct *showCollectionsTask) ID() UniqueID { - return sct.Base.MsgID +func (t *showCollectionsTask) ID() UniqueID { + return t.Base.MsgID } -func (sct *showCollectionsTask) SetID(uid UniqueID) { - sct.Base.MsgID = uid +func (t *showCollectionsTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (sct *showCollectionsTask) Name() string { +func (t *showCollectionsTask) Name() string { return ShowCollectionTaskName } -func (sct *showCollectionsTask) Type() commonpb.MsgType { - return sct.Base.MsgType +func (t *showCollectionsTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (sct *showCollectionsTask) BeginTs() Timestamp { - return sct.Base.Timestamp +func (t *showCollectionsTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (sct *showCollectionsTask) EndTs() Timestamp { - return sct.Base.Timestamp +func (t *showCollectionsTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (sct *showCollectionsTask) SetTs(ts Timestamp) { - sct.Base.Timestamp = ts +func (t *showCollectionsTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (sct *showCollectionsTask) OnEnqueue() error { - sct.Base = commonpbutil.NewMsgBase() +func (t *showCollectionsTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (sct *showCollectionsTask) PreExecute(ctx context.Context) error { - sct.Base.MsgType = commonpb.MsgType_ShowCollections - sct.Base.SourceID = paramtable.GetNodeID() - if sct.GetType() == milvuspb.ShowType_InMemory { - for _, collectionName := range sct.CollectionNames { +func (t *showCollectionsTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_ShowCollections + t.Base.SourceID = paramtable.GetNodeID() + if t.GetType() == milvuspb.ShowType_InMemory { + for _, collectionName := range t.CollectionNames { if err := validateCollectionName(collectionName); err != nil { return err } @@ -646,8 +646,8 @@ func (sct *showCollectionsTask) PreExecute(ctx context.Context) error { return nil } -func (sct *showCollectionsTask) Execute(ctx context.Context) error { - respFromRootCoord, err := sct.rootCoord.ShowCollections(ctx, sct.ShowCollectionsRequest) +func (t *showCollectionsTask) Execute(ctx context.Context) error { + respFromRootCoord, err := t.rootCoord.ShowCollections(ctx, t.ShowCollectionsRequest) if err != nil { return err } @@ -660,30 +660,30 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error { return merr.Error(respFromRootCoord.GetStatus()) } - if sct.GetType() == milvuspb.ShowType_InMemory { + if t.GetType() == milvuspb.ShowType_InMemory { IDs2Names := make(map[UniqueID]string) for offset, collectionName := range respFromRootCoord.CollectionNames { collectionID := respFromRootCoord.CollectionIds[offset] IDs2Names[collectionID] = collectionName } collectionIDs := make([]UniqueID, 0) - for _, collectionName := range sct.CollectionNames { - collectionID, err := globalMetaCache.GetCollectionID(ctx, sct.GetDbName(), collectionName) + for _, collectionName := range t.CollectionNames { + collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collectionName) if err != nil { log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName), - zap.Any("requestID", sct.Base.MsgID), zap.Any("requestType", "showCollections")) + zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections")) return err } collectionIDs = append(collectionIDs, collectionID) IDs2Names[collectionID] = collectionName } - resp, err := sct.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{ + resp, err := t.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{ Base: commonpbutil.UpdateMsgBase( - sct.Base, + t.Base, commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), ), - // DbID: sct.ShowCollectionsRequest.DbName, + // DbID: t.ShowCollectionsRequest.DbName, CollectionIDs: collectionIDs, }) if err != nil { @@ -703,7 +703,7 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error { return errors.New(newErrorReason) } - sct.result = &milvuspb.ShowCollectionsResponse{ + t.result = &milvuspb.ShowCollectionsResponse{ Status: resp.Status, CollectionNames: make([]string, 0, len(resp.CollectionIDs)), CollectionIds: make([]int64, 0, len(resp.CollectionIDs)), @@ -718,30 +718,30 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error { if !ok { log.Debug("Failed to get collection info. This collection may be not released", zap.Any("collectionID", id), - zap.Any("requestID", sct.Base.MsgID), zap.Any("requestType", "showCollections")) + zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections")) continue } - collectionInfo, err := globalMetaCache.GetCollectionInfo(ctx, sct.GetDbName(), collectionName, id) + collectionInfo, err := globalMetaCache.GetCollectionInfo(ctx, t.GetDbName(), collectionName, id) if err != nil { log.Debug("Failed to get collection info.", zap.Any("collectionName", collectionName), - zap.Any("requestID", sct.Base.MsgID), zap.Any("requestType", "showCollections")) + zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections")) return err } - sct.result.CollectionIds = append(sct.result.CollectionIds, id) - sct.result.CollectionNames = append(sct.result.CollectionNames, collectionName) - sct.result.CreatedTimestamps = append(sct.result.CreatedTimestamps, collectionInfo.createdTimestamp) - sct.result.CreatedUtcTimestamps = append(sct.result.CreatedUtcTimestamps, collectionInfo.createdUtcTimestamp) - sct.result.InMemoryPercentages = append(sct.result.InMemoryPercentages, resp.InMemoryPercentages[offset]) - sct.result.QueryServiceAvailable = append(sct.result.QueryServiceAvailable, resp.QueryServiceAvailable[offset]) + t.result.CollectionIds = append(t.result.CollectionIds, id) + t.result.CollectionNames = append(t.result.CollectionNames, collectionName) + t.result.CreatedTimestamps = append(t.result.CreatedTimestamps, collectionInfo.createdTimestamp) + t.result.CreatedUtcTimestamps = append(t.result.CreatedUtcTimestamps, collectionInfo.createdUtcTimestamp) + t.result.InMemoryPercentages = append(t.result.InMemoryPercentages, resp.InMemoryPercentages[offset]) + t.result.QueryServiceAvailable = append(t.result.QueryServiceAvailable, resp.QueryServiceAvailable[offset]) } } else { - sct.result = respFromRootCoord + t.result = respFromRootCoord } return nil } -func (sct *showCollectionsTask) PostExecute(ctx context.Context) error { +func (t *showCollectionsTask) PostExecute(ctx context.Context) error { return nil } @@ -753,57 +753,57 @@ type alterCollectionTask struct { result *commonpb.Status } -func (act *alterCollectionTask) TraceCtx() context.Context { - return act.ctx +func (t *alterCollectionTask) TraceCtx() context.Context { + return t.ctx } -func (act *alterCollectionTask) ID() UniqueID { - return act.Base.MsgID +func (t *alterCollectionTask) ID() UniqueID { + return t.Base.MsgID } -func (act *alterCollectionTask) SetID(uid UniqueID) { - act.Base.MsgID = uid +func (t *alterCollectionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (act *alterCollectionTask) Name() string { +func (t *alterCollectionTask) Name() string { return AlterCollectionTaskName } -func (act *alterCollectionTask) Type() commonpb.MsgType { - return act.Base.MsgType +func (t *alterCollectionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (act *alterCollectionTask) BeginTs() Timestamp { - return act.Base.Timestamp +func (t *alterCollectionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (act *alterCollectionTask) EndTs() Timestamp { - return act.Base.Timestamp +func (t *alterCollectionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (act *alterCollectionTask) SetTs(ts Timestamp) { - act.Base.Timestamp = ts +func (t *alterCollectionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (act *alterCollectionTask) OnEnqueue() error { - act.Base = commonpbutil.NewMsgBase() +func (t *alterCollectionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (act *alterCollectionTask) PreExecute(ctx context.Context) error { - act.Base.MsgType = commonpb.MsgType_AlterCollection - act.Base.SourceID = paramtable.GetNodeID() +func (t *alterCollectionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_AlterCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } -func (act *alterCollectionTask) Execute(ctx context.Context) error { +func (t *alterCollectionTask) Execute(ctx context.Context) error { var err error - act.result, err = act.rootCoord.AlterCollection(ctx, act.AlterCollectionRequest) + t.result, err = t.rootCoord.AlterCollection(ctx, t.AlterCollectionRequest) return err } -func (act *alterCollectionTask) PostExecute(ctx context.Context) error { +func (t *alterCollectionTask) PostExecute(ctx context.Context) error { return nil } @@ -815,54 +815,54 @@ type createPartitionTask struct { result *commonpb.Status } -func (cpt *createPartitionTask) TraceCtx() context.Context { - return cpt.ctx +func (t *createPartitionTask) TraceCtx() context.Context { + return t.ctx } -func (cpt *createPartitionTask) ID() UniqueID { - return cpt.Base.MsgID +func (t *createPartitionTask) ID() UniqueID { + return t.Base.MsgID } -func (cpt *createPartitionTask) SetID(uid UniqueID) { - cpt.Base.MsgID = uid +func (t *createPartitionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (cpt *createPartitionTask) Name() string { +func (t *createPartitionTask) Name() string { return CreatePartitionTaskName } -func (cpt *createPartitionTask) Type() commonpb.MsgType { - return cpt.Base.MsgType +func (t *createPartitionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (cpt *createPartitionTask) BeginTs() Timestamp { - return cpt.Base.Timestamp +func (t *createPartitionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (cpt *createPartitionTask) EndTs() Timestamp { - return cpt.Base.Timestamp +func (t *createPartitionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (cpt *createPartitionTask) SetTs(ts Timestamp) { - cpt.Base.Timestamp = ts +func (t *createPartitionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (cpt *createPartitionTask) OnEnqueue() error { - cpt.Base = commonpbutil.NewMsgBase() +func (t *createPartitionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (cpt *createPartitionTask) PreExecute(ctx context.Context) error { - cpt.Base.MsgType = commonpb.MsgType_CreatePartition - cpt.Base.SourceID = paramtable.GetNodeID() +func (t *createPartitionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_CreatePartition + t.Base.SourceID = paramtable.GetNodeID() - collName, partitionTag := cpt.CollectionName, cpt.PartitionName + collName, partitionTag := t.CollectionName, t.PartitionName if err := validateCollectionName(collName); err != nil { return err } - partitionKeyMode, err := isPartitionKeyMode(ctx, cpt.GetDbName(), collName) + partitionKeyMode, err := isPartitionKeyMode(ctx, t.GetDbName(), collName) if err != nil { return err } @@ -877,18 +877,18 @@ func (cpt *createPartitionTask) PreExecute(ctx context.Context) error { return nil } -func (cpt *createPartitionTask) Execute(ctx context.Context) (err error) { - cpt.result, err = cpt.rootCoord.CreatePartition(ctx, cpt.CreatePartitionRequest) +func (t *createPartitionTask) Execute(ctx context.Context) (err error) { + t.result, err = t.rootCoord.CreatePartition(ctx, t.CreatePartitionRequest) if err != nil { return err } - if cpt.result.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(cpt.result.Reason) + if t.result.ErrorCode != commonpb.ErrorCode_Success { + return errors.New(t.result.Reason) } return err } -func (cpt *createPartitionTask) PostExecute(ctx context.Context) error { +func (t *createPartitionTask) PostExecute(ctx context.Context) error { return nil } @@ -901,54 +901,54 @@ type dropPartitionTask struct { result *commonpb.Status } -func (dpt *dropPartitionTask) TraceCtx() context.Context { - return dpt.ctx +func (t *dropPartitionTask) TraceCtx() context.Context { + return t.ctx } -func (dpt *dropPartitionTask) ID() UniqueID { - return dpt.Base.MsgID +func (t *dropPartitionTask) ID() UniqueID { + return t.Base.MsgID } -func (dpt *dropPartitionTask) SetID(uid UniqueID) { - dpt.Base.MsgID = uid +func (t *dropPartitionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (dpt *dropPartitionTask) Name() string { +func (t *dropPartitionTask) Name() string { return DropPartitionTaskName } -func (dpt *dropPartitionTask) Type() commonpb.MsgType { - return dpt.Base.MsgType +func (t *dropPartitionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (dpt *dropPartitionTask) BeginTs() Timestamp { - return dpt.Base.Timestamp +func (t *dropPartitionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (dpt *dropPartitionTask) EndTs() Timestamp { - return dpt.Base.Timestamp +func (t *dropPartitionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (dpt *dropPartitionTask) SetTs(ts Timestamp) { - dpt.Base.Timestamp = ts +func (t *dropPartitionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (dpt *dropPartitionTask) OnEnqueue() error { - dpt.Base = commonpbutil.NewMsgBase() +func (t *dropPartitionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error { - dpt.Base.MsgType = commonpb.MsgType_DropPartition - dpt.Base.SourceID = paramtable.GetNodeID() +func (t *dropPartitionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_DropPartition + t.Base.SourceID = paramtable.GetNodeID() - collName, partitionTag := dpt.CollectionName, dpt.PartitionName + collName, partitionTag := t.CollectionName, t.PartitionName if err := validateCollectionName(collName); err != nil { return err } - partitionKeyMode, err := isPartitionKeyMode(ctx, dpt.GetDbName(), collName) + partitionKeyMode, err := isPartitionKeyMode(ctx, t.GetDbName(), collName) if err != nil { return err } @@ -960,11 +960,11 @@ func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error { return err } - collID, err := globalMetaCache.GetCollectionID(ctx, dpt.GetDbName(), dpt.GetCollectionName()) + collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.GetCollectionName()) if err != nil { return err } - partID, err := globalMetaCache.GetPartitionID(ctx, dpt.GetDbName(), dpt.GetCollectionName(), dpt.GetPartitionName()) + partID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), t.GetCollectionName(), t.GetPartitionName()) if err != nil { if errors.Is(merr.ErrPartitionNotFound, err) { return nil @@ -972,12 +972,12 @@ func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error { return err } - collLoaded, err := isCollectionLoaded(ctx, dpt.queryCoord, collID) + collLoaded, err := isCollectionLoaded(ctx, t.queryCoord, collID) if err != nil { return err } if collLoaded { - loaded, err := isPartitionLoaded(ctx, dpt.queryCoord, collID, []int64{partID}) + loaded, err := isPartitionLoaded(ctx, t.queryCoord, collID, []int64{partID}) if err != nil { return err } @@ -989,18 +989,18 @@ func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error { return nil } -func (dpt *dropPartitionTask) Execute(ctx context.Context) (err error) { - dpt.result, err = dpt.rootCoord.DropPartition(ctx, dpt.DropPartitionRequest) +func (t *dropPartitionTask) Execute(ctx context.Context) (err error) { + t.result, err = t.rootCoord.DropPartition(ctx, t.DropPartitionRequest) if err != nil { return err } - if dpt.result.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(dpt.result.Reason) + if t.result.ErrorCode != commonpb.ErrorCode_Success { + return errors.New(t.result.Reason) } return err } -func (dpt *dropPartitionTask) PostExecute(ctx context.Context) error { +func (t *dropPartitionTask) PostExecute(ctx context.Context) error { return nil } @@ -1012,48 +1012,48 @@ type hasPartitionTask struct { result *milvuspb.BoolResponse } -func (hpt *hasPartitionTask) TraceCtx() context.Context { - return hpt.ctx +func (t *hasPartitionTask) TraceCtx() context.Context { + return t.ctx } -func (hpt *hasPartitionTask) ID() UniqueID { - return hpt.Base.MsgID +func (t *hasPartitionTask) ID() UniqueID { + return t.Base.MsgID } -func (hpt *hasPartitionTask) SetID(uid UniqueID) { - hpt.Base.MsgID = uid +func (t *hasPartitionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (hpt *hasPartitionTask) Name() string { +func (t *hasPartitionTask) Name() string { return HasPartitionTaskName } -func (hpt *hasPartitionTask) Type() commonpb.MsgType { - return hpt.Base.MsgType +func (t *hasPartitionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (hpt *hasPartitionTask) BeginTs() Timestamp { - return hpt.Base.Timestamp +func (t *hasPartitionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (hpt *hasPartitionTask) EndTs() Timestamp { - return hpt.Base.Timestamp +func (t *hasPartitionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (hpt *hasPartitionTask) SetTs(ts Timestamp) { - hpt.Base.Timestamp = ts +func (t *hasPartitionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (hpt *hasPartitionTask) OnEnqueue() error { - hpt.Base = commonpbutil.NewMsgBase() +func (t *hasPartitionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (hpt *hasPartitionTask) PreExecute(ctx context.Context) error { - hpt.Base.MsgType = commonpb.MsgType_HasPartition - hpt.Base.SourceID = paramtable.GetNodeID() +func (t *hasPartitionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_HasPartition + t.Base.SourceID = paramtable.GetNodeID() - collName, partitionTag := hpt.CollectionName, hpt.PartitionName + collName, partitionTag := t.CollectionName, t.PartitionName if err := validateCollectionName(collName); err != nil { return err @@ -1065,18 +1065,18 @@ func (hpt *hasPartitionTask) PreExecute(ctx context.Context) error { return nil } -func (hpt *hasPartitionTask) Execute(ctx context.Context) (err error) { - hpt.result, err = hpt.rootCoord.HasPartition(ctx, hpt.HasPartitionRequest) +func (t *hasPartitionTask) Execute(ctx context.Context) (err error) { + t.result, err = t.rootCoord.HasPartition(ctx, t.HasPartitionRequest) if err != nil { return err } - if hpt.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return merr.Error(hpt.result.GetStatus()) + if t.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + return merr.Error(t.result.GetStatus()) } return err } -func (hpt *hasPartitionTask) PostExecute(ctx context.Context) error { +func (t *hasPartitionTask) PostExecute(ctx context.Context) error { return nil } @@ -1089,53 +1089,53 @@ type showPartitionsTask struct { result *milvuspb.ShowPartitionsResponse } -func (spt *showPartitionsTask) TraceCtx() context.Context { - return spt.ctx +func (t *showPartitionsTask) TraceCtx() context.Context { + return t.ctx } -func (spt *showPartitionsTask) ID() UniqueID { - return spt.Base.MsgID +func (t *showPartitionsTask) ID() UniqueID { + return t.Base.MsgID } -func (spt *showPartitionsTask) SetID(uid UniqueID) { - spt.Base.MsgID = uid +func (t *showPartitionsTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (spt *showPartitionsTask) Name() string { +func (t *showPartitionsTask) Name() string { return ShowPartitionTaskName } -func (spt *showPartitionsTask) Type() commonpb.MsgType { - return spt.Base.MsgType +func (t *showPartitionsTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (spt *showPartitionsTask) BeginTs() Timestamp { - return spt.Base.Timestamp +func (t *showPartitionsTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (spt *showPartitionsTask) EndTs() Timestamp { - return spt.Base.Timestamp +func (t *showPartitionsTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (spt *showPartitionsTask) SetTs(ts Timestamp) { - spt.Base.Timestamp = ts +func (t *showPartitionsTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (spt *showPartitionsTask) OnEnqueue() error { - spt.Base = commonpbutil.NewMsgBase() +func (t *showPartitionsTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (spt *showPartitionsTask) PreExecute(ctx context.Context) error { - spt.Base.MsgType = commonpb.MsgType_ShowPartitions - spt.Base.SourceID = paramtable.GetNodeID() +func (t *showPartitionsTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_ShowPartitions + t.Base.SourceID = paramtable.GetNodeID() - if err := validateCollectionName(spt.CollectionName); err != nil { + if err := validateCollectionName(t.CollectionName); err != nil { return err } - if spt.GetType() == milvuspb.ShowType_InMemory { - for _, partitionName := range spt.PartitionNames { + if t.GetType() == milvuspb.ShowType_InMemory { + for _, partitionName := range t.PartitionNames { if err := validatePartitionTag(partitionName, true); err != nil { return err } @@ -1145,8 +1145,8 @@ func (spt *showPartitionsTask) PreExecute(ctx context.Context) error { return nil } -func (spt *showPartitionsTask) Execute(ctx context.Context) error { - respFromRootCoord, err := spt.rootCoord.ShowPartitions(ctx, spt.ShowPartitionsRequest) +func (t *showPartitionsTask) Execute(ctx context.Context) error { + respFromRootCoord, err := t.rootCoord.ShowPartitions(ctx, t.ShowPartitionsRequest) if err != nil { return err } @@ -1159,12 +1159,12 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { return merr.Error(respFromRootCoord.GetStatus()) } - if spt.GetType() == milvuspb.ShowType_InMemory { - collectionName := spt.CollectionName - collectionID, err := globalMetaCache.GetCollectionID(ctx, spt.GetDbName(), collectionName) + if t.GetType() == milvuspb.ShowType_InMemory { + collectionName := t.CollectionName + collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collectionName) if err != nil { log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName), - zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions")) + zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) return err } IDs2Names := make(map[UniqueID]string) @@ -1173,19 +1173,19 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { IDs2Names[partitionID] = partitionName } partitionIDs := make([]UniqueID, 0) - for _, partitionName := range spt.PartitionNames { - partitionID, err := globalMetaCache.GetPartitionID(ctx, spt.GetDbName(), collectionName, partitionName) + for _, partitionName := range t.PartitionNames { + partitionID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), collectionName, partitionName) if err != nil { log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName), - zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions")) + zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) return err } partitionIDs = append(partitionIDs, partitionID) IDs2Names[partitionID] = partitionName } - resp, err := spt.queryCoord.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ + resp, err := t.queryCoord.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ Base: commonpbutil.UpdateMsgBase( - spt.Base, + t.Base, commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), ), CollectionID: collectionID, @@ -1203,7 +1203,7 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { return merr.Error(resp.GetStatus()) } - spt.result = &milvuspb.ShowPartitionsResponse{ + t.result = &milvuspb.ShowPartitionsResponse{ Status: resp.Status, PartitionNames: make([]string, 0, len(resp.PartitionIDs)), PartitionIDs: make([]int64, 0, len(resp.PartitionIDs)), @@ -1216,29 +1216,29 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { partitionName, ok := IDs2Names[id] if !ok { log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName), - zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions")) + zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) return errors.New("failed to show partitions") } - partitionInfo, err := globalMetaCache.GetPartitionInfo(ctx, spt.GetDbName(), collectionName, partitionName) + partitionInfo, err := globalMetaCache.GetPartitionInfo(ctx, t.GetDbName(), collectionName, partitionName) if err != nil { log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName), - zap.Any("requestID", spt.Base.MsgID), zap.Any("requestType", "showPartitions")) + zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions")) return err } - spt.result.PartitionIDs = append(spt.result.PartitionIDs, id) - spt.result.PartitionNames = append(spt.result.PartitionNames, partitionName) - spt.result.CreatedTimestamps = append(spt.result.CreatedTimestamps, partitionInfo.createdTimestamp) - spt.result.CreatedUtcTimestamps = append(spt.result.CreatedUtcTimestamps, partitionInfo.createdUtcTimestamp) - spt.result.InMemoryPercentages = append(spt.result.InMemoryPercentages, resp.InMemoryPercentages[offset]) + t.result.PartitionIDs = append(t.result.PartitionIDs, id) + t.result.PartitionNames = append(t.result.PartitionNames, partitionName) + t.result.CreatedTimestamps = append(t.result.CreatedTimestamps, partitionInfo.createdTimestamp) + t.result.CreatedUtcTimestamps = append(t.result.CreatedUtcTimestamps, partitionInfo.createdUtcTimestamp) + t.result.InMemoryPercentages = append(t.result.InMemoryPercentages, resp.InMemoryPercentages[offset]) } } else { - spt.result = respFromRootCoord + t.result = respFromRootCoord } return nil } -func (spt *showPartitionsTask) PostExecute(ctx context.Context) error { +func (t *showPartitionsTask) PostExecute(ctx context.Context) error { return nil } @@ -1250,68 +1250,68 @@ type flushTask struct { result *milvuspb.FlushResponse } -func (ft *flushTask) TraceCtx() context.Context { - return ft.ctx +func (t *flushTask) TraceCtx() context.Context { + return t.ctx } -func (ft *flushTask) ID() UniqueID { - return ft.Base.MsgID +func (t *flushTask) ID() UniqueID { + return t.Base.MsgID } -func (ft *flushTask) SetID(uid UniqueID) { - ft.Base.MsgID = uid +func (t *flushTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (ft *flushTask) Name() string { +func (t *flushTask) Name() string { return FlushTaskName } -func (ft *flushTask) Type() commonpb.MsgType { - return ft.Base.MsgType +func (t *flushTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (ft *flushTask) BeginTs() Timestamp { - return ft.Base.Timestamp +func (t *flushTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (ft *flushTask) EndTs() Timestamp { - return ft.Base.Timestamp +func (t *flushTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (ft *flushTask) SetTs(ts Timestamp) { - ft.Base.Timestamp = ts +func (t *flushTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (ft *flushTask) OnEnqueue() error { - ft.Base = commonpbutil.NewMsgBase() +func (t *flushTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (ft *flushTask) PreExecute(ctx context.Context) error { - ft.Base.MsgType = commonpb.MsgType_Flush - ft.Base.SourceID = paramtable.GetNodeID() +func (t *flushTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_Flush + t.Base.SourceID = paramtable.GetNodeID() return nil } -func (ft *flushTask) Execute(ctx context.Context) error { +func (t *flushTask) Execute(ctx context.Context) error { coll2Segments := make(map[string]*schemapb.LongArray) flushColl2Segments := make(map[string]*schemapb.LongArray) coll2SealTimes := make(map[string]int64) coll2FlushTs := make(map[string]Timestamp) - for _, collName := range ft.CollectionNames { - collID, err := globalMetaCache.GetCollectionID(ctx, ft.GetDbName(), collName) + for _, collName := range t.CollectionNames { + collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collName) if err != nil { return err } flushReq := &datapb.FlushRequest{ Base: commonpbutil.UpdateMsgBase( - ft.Base, + t.Base, commonpbutil.WithMsgType(commonpb.MsgType_Flush), ), CollectionID: collID, IsImport: false, } - resp, err := ft.dataCoord.Flush(ctx, flushReq) + resp, err := t.dataCoord.Flush(ctx, flushReq) if err != nil { return fmt.Errorf("failed to call flush to data coordinator: %s", err.Error()) } @@ -1323,9 +1323,9 @@ func (ft *flushTask) Execute(ctx context.Context) error { coll2SealTimes[collName] = resp.GetTimeOfSeal() coll2FlushTs[collName] = resp.GetFlushTs() } - ft.result = &milvuspb.FlushResponse{ + t.result = &milvuspb.FlushResponse{ Status: merr.Status(nil), - DbName: ft.GetDbName(), + DbName: t.GetDbName(), CollSegIDs: coll2Segments, FlushCollSegIDs: flushColl2Segments, CollSealTimes: coll2SealTimes, @@ -1334,7 +1334,7 @@ func (ft *flushTask) Execute(ctx context.Context) error { return nil } -func (ft *flushTask) PostExecute(ctx context.Context) error { +func (t *flushTask) PostExecute(ctx context.Context) error { return nil } @@ -1349,65 +1349,65 @@ type loadCollectionTask struct { collectionID UniqueID } -func (lct *loadCollectionTask) TraceCtx() context.Context { - return lct.ctx +func (t *loadCollectionTask) TraceCtx() context.Context { + return t.ctx } -func (lct *loadCollectionTask) ID() UniqueID { - return lct.Base.MsgID +func (t *loadCollectionTask) ID() UniqueID { + return t.Base.MsgID } -func (lct *loadCollectionTask) SetID(uid UniqueID) { - lct.Base.MsgID = uid +func (t *loadCollectionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (lct *loadCollectionTask) Name() string { +func (t *loadCollectionTask) Name() string { return LoadCollectionTaskName } -func (lct *loadCollectionTask) Type() commonpb.MsgType { - return lct.Base.MsgType +func (t *loadCollectionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (lct *loadCollectionTask) BeginTs() Timestamp { - return lct.Base.Timestamp +func (t *loadCollectionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (lct *loadCollectionTask) EndTs() Timestamp { - return lct.Base.Timestamp +func (t *loadCollectionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (lct *loadCollectionTask) SetTs(ts Timestamp) { - lct.Base.Timestamp = ts +func (t *loadCollectionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (lct *loadCollectionTask) OnEnqueue() error { - lct.Base = commonpbutil.NewMsgBase() +func (t *loadCollectionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (lct *loadCollectionTask) PreExecute(ctx context.Context) error { +func (t *loadCollectionTask) PreExecute(ctx context.Context) error { log.Ctx(ctx).Debug("loadCollectionTask PreExecute", zap.String("role", typeutil.ProxyRole)) - lct.Base.MsgType = commonpb.MsgType_LoadCollection - lct.Base.SourceID = paramtable.GetNodeID() + t.Base.MsgType = commonpb.MsgType_LoadCollection + t.Base.SourceID = paramtable.GetNodeID() - collName := lct.CollectionName + collName := t.CollectionName if err := validateCollectionName(collName); err != nil { return err } // To compat with LoadCollcetion before Milvus@2.1 - if lct.ReplicaNumber == 0 { - lct.ReplicaNumber = 1 + if t.ReplicaNumber == 0 { + t.ReplicaNumber = 1 } return nil } -func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) { - collID, err := globalMetaCache.GetCollectionID(ctx, lct.GetDbName(), lct.CollectionName) +func (t *loadCollectionTask) Execute(ctx context.Context) (err error) { + collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1418,22 +1418,25 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) { return err } - lct.collectionID = collID - collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lct.GetDbName(), lct.CollectionName) + t.collectionID = collID + collSchema, err := globalMetaCache.GetCollectionSchema(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } // check index - indexResponse, err := lct.datacoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ + indexResponse, err := t.datacoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ CollectionID: collID, IndexName: "", }) + if err == nil { + err = merr.Error(indexResponse.GetStatus()) + } if err != nil { + if errors.Is(err, merr.ErrIndexNotFound) { + err = merr.WrapErrIndexNotFoundForCollection(t.GetCollectionName()) + } return err } - if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return merr.Error(indexResponse.GetStatus()) - } hasVecIndex := false fieldIndexIDs := make(map[int64]int64) @@ -1446,34 +1449,34 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) { } } if !hasVecIndex { - errMsg := fmt.Sprintf("there is no vector index on collection: %s, please create index firstly", lct.LoadCollectionRequest.CollectionName) + errMsg := fmt.Sprintf("there is no vector index on collection: %s, please create index firstly", t.LoadCollectionRequest.CollectionName) log.Error(errMsg) return errors.New(errMsg) } request := &querypb.LoadCollectionRequest{ Base: commonpbutil.UpdateMsgBase( - lct.Base, + t.Base, commonpbutil.WithMsgType(commonpb.MsgType_LoadCollection), ), DbID: 0, CollectionID: collID, Schema: collSchema, - ReplicaNumber: lct.ReplicaNumber, + ReplicaNumber: t.ReplicaNumber, FieldIndexID: fieldIndexIDs, - Refresh: lct.Refresh, - ResourceGroups: lct.ResourceGroups, + Refresh: t.Refresh, + ResourceGroups: t.ResourceGroups, } log.Debug("send LoadCollectionRequest to query coordinator", zap.Any("schema", request.Schema)) - lct.result, err = lct.queryCoord.LoadCollection(ctx, request) + t.result, err = t.queryCoord.LoadCollection(ctx, request) if err != nil { return fmt.Errorf("call query coordinator LoadCollection: %s", err) } return nil } -func (lct *loadCollectionTask) PostExecute(ctx context.Context) error { - collID, err := globalMetaCache.GetCollectionID(ctx, lct.GetDbName(), lct.CollectionName) +func (t *loadCollectionTask) PostExecute(ctx context.Context) error { + collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) log.Ctx(ctx).Debug("loadCollectionTask PostExecute", zap.String("role", typeutil.ProxyRole), zap.Int64("collectionID", collID)) @@ -1494,48 +1497,48 @@ type releaseCollectionTask struct { collectionID UniqueID } -func (rct *releaseCollectionTask) TraceCtx() context.Context { - return rct.ctx +func (t *releaseCollectionTask) TraceCtx() context.Context { + return t.ctx } -func (rct *releaseCollectionTask) ID() UniqueID { - return rct.Base.MsgID +func (t *releaseCollectionTask) ID() UniqueID { + return t.Base.MsgID } -func (rct *releaseCollectionTask) SetID(uid UniqueID) { - rct.Base.MsgID = uid +func (t *releaseCollectionTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (rct *releaseCollectionTask) Name() string { +func (t *releaseCollectionTask) Name() string { return ReleaseCollectionTaskName } -func (rct *releaseCollectionTask) Type() commonpb.MsgType { - return rct.Base.MsgType +func (t *releaseCollectionTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (rct *releaseCollectionTask) BeginTs() Timestamp { - return rct.Base.Timestamp +func (t *releaseCollectionTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (rct *releaseCollectionTask) EndTs() Timestamp { - return rct.Base.Timestamp +func (t *releaseCollectionTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (rct *releaseCollectionTask) SetTs(ts Timestamp) { - rct.Base.Timestamp = ts +func (t *releaseCollectionTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (rct *releaseCollectionTask) OnEnqueue() error { - rct.Base = commonpbutil.NewMsgBase() +func (t *releaseCollectionTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (rct *releaseCollectionTask) PreExecute(ctx context.Context) error { - rct.Base.MsgType = commonpb.MsgType_ReleaseCollection - rct.Base.SourceID = paramtable.GetNodeID() +func (t *releaseCollectionTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_ReleaseCollection + t.Base.SourceID = paramtable.GetNodeID() - collName := rct.CollectionName + collName := t.CollectionName if err := validateCollectionName(collName); err != nil { return err @@ -1544,30 +1547,30 @@ func (rct *releaseCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (rct *releaseCollectionTask) Execute(ctx context.Context) (err error) { - collID, err := globalMetaCache.GetCollectionID(ctx, rct.GetDbName(), rct.CollectionName) +func (t *releaseCollectionTask) Execute(ctx context.Context) (err error) { + collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } - rct.collectionID = collID + t.collectionID = collID request := &querypb.ReleaseCollectionRequest{ Base: commonpbutil.UpdateMsgBase( - rct.Base, + t.Base, commonpbutil.WithMsgType(commonpb.MsgType_ReleaseCollection), ), DbID: 0, CollectionID: collID, } - rct.result, err = rct.queryCoord.ReleaseCollection(ctx, request) + t.result, err = t.queryCoord.ReleaseCollection(ctx, request) - globalMetaCache.RemoveCollection(ctx, rct.GetDbName(), rct.CollectionName) + globalMetaCache.RemoveCollection(ctx, t.GetDbName(), t.CollectionName) return err } -func (rct *releaseCollectionTask) PostExecute(ctx context.Context) error { - globalMetaCache.DeprecateShardCache(rct.GetDbName(), rct.CollectionName) +func (t *releaseCollectionTask) PostExecute(ctx context.Context) error { + globalMetaCache.DeprecateShardCache(t.GetDbName(), t.CollectionName) return nil } @@ -1582,54 +1585,54 @@ type loadPartitionsTask struct { collectionID UniqueID } -func (lpt *loadPartitionsTask) TraceCtx() context.Context { - return lpt.ctx +func (t *loadPartitionsTask) TraceCtx() context.Context { + return t.ctx } -func (lpt *loadPartitionsTask) ID() UniqueID { - return lpt.Base.MsgID +func (t *loadPartitionsTask) ID() UniqueID { + return t.Base.MsgID } -func (lpt *loadPartitionsTask) SetID(uid UniqueID) { - lpt.Base.MsgID = uid +func (t *loadPartitionsTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (lpt *loadPartitionsTask) Name() string { +func (t *loadPartitionsTask) Name() string { return LoadPartitionTaskName } -func (lpt *loadPartitionsTask) Type() commonpb.MsgType { - return lpt.Base.MsgType +func (t *loadPartitionsTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (lpt *loadPartitionsTask) BeginTs() Timestamp { - return lpt.Base.Timestamp +func (t *loadPartitionsTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (lpt *loadPartitionsTask) EndTs() Timestamp { - return lpt.Base.Timestamp +func (t *loadPartitionsTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (lpt *loadPartitionsTask) SetTs(ts Timestamp) { - lpt.Base.Timestamp = ts +func (t *loadPartitionsTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (lpt *loadPartitionsTask) OnEnqueue() error { - lpt.Base = commonpbutil.NewMsgBase() +func (t *loadPartitionsTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (lpt *loadPartitionsTask) PreExecute(ctx context.Context) error { - lpt.Base.MsgType = commonpb.MsgType_LoadPartitions - lpt.Base.SourceID = paramtable.GetNodeID() +func (t *loadPartitionsTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_LoadPartitions + t.Base.SourceID = paramtable.GetNodeID() - collName := lpt.CollectionName + collName := t.CollectionName if err := validateCollectionName(collName); err != nil { return err } - partitionKeyMode, err := isPartitionKeyMode(ctx, lpt.GetDbName(), collName) + partitionKeyMode, err := isPartitionKeyMode(ctx, t.GetDbName(), collName) if err != nil { return err } @@ -1640,28 +1643,31 @@ func (lpt *loadPartitionsTask) PreExecute(ctx context.Context) error { return nil } -func (lpt *loadPartitionsTask) Execute(ctx context.Context) error { +func (t *loadPartitionsTask) Execute(ctx context.Context) error { var partitionIDs []int64 - collID, err := globalMetaCache.GetCollectionID(ctx, lpt.GetDbName(), lpt.CollectionName) + collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } - lpt.collectionID = collID - collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lpt.GetDbName(), lpt.CollectionName) + t.collectionID = collID + collSchema, err := globalMetaCache.GetCollectionSchema(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } // check index - indexResponse, err := lpt.datacoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ + indexResponse, err := t.datacoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ CollectionID: collID, IndexName: "", }) + if err == nil { + err = merr.Error(indexResponse.GetStatus()) + } if err != nil { + if errors.Is(err, merr.ErrIndexNotFound) { + err = merr.WrapErrIndexNotFoundForCollection(t.GetCollectionName()) + } return err } - if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return merr.Error(indexResponse.GetStatus()) - } hasVecIndex := false fieldIndexIDs := make(map[int64]int64) @@ -1674,12 +1680,12 @@ func (lpt *loadPartitionsTask) Execute(ctx context.Context) error { } } if !hasVecIndex { - errMsg := fmt.Sprintf("there is no vector index on collection: %s, please create index firstly", lpt.LoadPartitionsRequest.CollectionName) + errMsg := fmt.Sprintf("there is no vector index on collection: %s, please create index firstly", t.LoadPartitionsRequest.CollectionName) log.Ctx(ctx).Error(errMsg) return errors.New(errMsg) } - for _, partitionName := range lpt.PartitionNames { - partitionID, err := globalMetaCache.GetPartitionID(ctx, lpt.GetDbName(), lpt.CollectionName, partitionName) + for _, partitionName := range t.PartitionNames { + partitionID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), t.CollectionName, partitionName) if err != nil { return err } @@ -1690,23 +1696,23 @@ func (lpt *loadPartitionsTask) Execute(ctx context.Context) error { } request := &querypb.LoadPartitionsRequest{ Base: commonpbutil.UpdateMsgBase( - lpt.Base, + t.Base, commonpbutil.WithMsgType(commonpb.MsgType_LoadPartitions), ), DbID: 0, CollectionID: collID, PartitionIDs: partitionIDs, Schema: collSchema, - ReplicaNumber: lpt.ReplicaNumber, + ReplicaNumber: t.ReplicaNumber, FieldIndexID: fieldIndexIDs, - Refresh: lpt.Refresh, - ResourceGroups: lpt.ResourceGroups, + Refresh: t.Refresh, + ResourceGroups: t.ResourceGroups, } - lpt.result, err = lpt.queryCoord.LoadPartitions(ctx, request) + t.result, err = t.queryCoord.LoadPartitions(ctx, request) return err } -func (lpt *loadPartitionsTask) PostExecute(ctx context.Context) error { +func (t *loadPartitionsTask) PostExecute(ctx context.Context) error { return nil } @@ -1720,54 +1726,54 @@ type releasePartitionsTask struct { collectionID UniqueID } -func (rpt *releasePartitionsTask) TraceCtx() context.Context { - return rpt.ctx +func (t *releasePartitionsTask) TraceCtx() context.Context { + return t.ctx } -func (rpt *releasePartitionsTask) ID() UniqueID { - return rpt.Base.MsgID +func (t *releasePartitionsTask) ID() UniqueID { + return t.Base.MsgID } -func (rpt *releasePartitionsTask) SetID(uid UniqueID) { - rpt.Base.MsgID = uid +func (t *releasePartitionsTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (rpt *releasePartitionsTask) Type() commonpb.MsgType { - return rpt.Base.MsgType +func (t *releasePartitionsTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (rpt *releasePartitionsTask) Name() string { +func (t *releasePartitionsTask) Name() string { return ReleasePartitionTaskName } -func (rpt *releasePartitionsTask) BeginTs() Timestamp { - return rpt.Base.Timestamp +func (t *releasePartitionsTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (rpt *releasePartitionsTask) EndTs() Timestamp { - return rpt.Base.Timestamp +func (t *releasePartitionsTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (rpt *releasePartitionsTask) SetTs(ts Timestamp) { - rpt.Base.Timestamp = ts +func (t *releasePartitionsTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (rpt *releasePartitionsTask) OnEnqueue() error { - rpt.Base = commonpbutil.NewMsgBase() +func (t *releasePartitionsTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (rpt *releasePartitionsTask) PreExecute(ctx context.Context) error { - rpt.Base.MsgType = commonpb.MsgType_ReleasePartitions - rpt.Base.SourceID = paramtable.GetNodeID() +func (t *releasePartitionsTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_ReleasePartitions + t.Base.SourceID = paramtable.GetNodeID() - collName := rpt.CollectionName + collName := t.CollectionName if err := validateCollectionName(collName); err != nil { return err } - partitionKeyMode, err := isPartitionKeyMode(ctx, rpt.GetDbName(), collName) + partitionKeyMode, err := isPartitionKeyMode(ctx, t.GetDbName(), collName) if err != nil { return err } @@ -1778,15 +1784,15 @@ func (rpt *releasePartitionsTask) PreExecute(ctx context.Context) error { return nil } -func (rpt *releasePartitionsTask) Execute(ctx context.Context) (err error) { +func (t *releasePartitionsTask) Execute(ctx context.Context) (err error) { var partitionIDs []int64 - collID, err := globalMetaCache.GetCollectionID(ctx, rpt.GetDbName(), rpt.CollectionName) + collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } - rpt.collectionID = collID - for _, partitionName := range rpt.PartitionNames { - partitionID, err := globalMetaCache.GetPartitionID(ctx, rpt.GetDbName(), rpt.CollectionName, partitionName) + t.collectionID = collID + for _, partitionName := range t.PartitionNames { + partitionID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), t.CollectionName, partitionName) if err != nil { return err } @@ -1794,19 +1800,19 @@ func (rpt *releasePartitionsTask) Execute(ctx context.Context) (err error) { } request := &querypb.ReleasePartitionsRequest{ Base: commonpbutil.UpdateMsgBase( - rpt.Base, + t.Base, commonpbutil.WithMsgType(commonpb.MsgType_ReleasePartitions), ), DbID: 0, CollectionID: collID, PartitionIDs: partitionIDs, } - rpt.result, err = rpt.queryCoord.ReleasePartitions(ctx, request) + t.result, err = t.queryCoord.ReleasePartitions(ctx, request) return err } -func (rpt *releasePartitionsTask) PostExecute(ctx context.Context) error { - globalMetaCache.DeprecateShardCache(rpt.GetDbName(), rpt.CollectionName) +func (t *releasePartitionsTask) PostExecute(ctx context.Context) error { + globalMetaCache.DeprecateShardCache(t.GetDbName(), t.CollectionName) return nil } @@ -1820,78 +1826,78 @@ type CreateAliasTask struct { } // TraceCtx returns the trace context of the task. -func (c *CreateAliasTask) TraceCtx() context.Context { - return c.ctx +func (t *CreateAliasTask) TraceCtx() context.Context { + return t.ctx } // ID return the id of the task -func (c *CreateAliasTask) ID() UniqueID { - return c.Base.MsgID +func (t *CreateAliasTask) ID() UniqueID { + return t.Base.MsgID } // SetID sets the id of the task -func (c *CreateAliasTask) SetID(uid UniqueID) { - c.Base.MsgID = uid +func (t *CreateAliasTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } // Name returns the name of the task -func (c *CreateAliasTask) Name() string { +func (t *CreateAliasTask) Name() string { return CreateAliasTaskName } // Type returns the type of the task -func (c *CreateAliasTask) Type() commonpb.MsgType { - return c.Base.MsgType +func (t *CreateAliasTask) Type() commonpb.MsgType { + return t.Base.MsgType } // BeginTs returns the ts -func (c *CreateAliasTask) BeginTs() Timestamp { - return c.Base.Timestamp +func (t *CreateAliasTask) BeginTs() Timestamp { + return t.Base.Timestamp } // EndTs returns the ts -func (c *CreateAliasTask) EndTs() Timestamp { - return c.Base.Timestamp +func (t *CreateAliasTask) EndTs() Timestamp { + return t.Base.Timestamp } // SetTs sets the ts -func (c *CreateAliasTask) SetTs(ts Timestamp) { - c.Base.Timestamp = ts +func (t *CreateAliasTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } // OnEnqueue defines the behavior task enqueued -func (c *CreateAliasTask) OnEnqueue() error { - c.Base = commonpbutil.NewMsgBase() +func (t *CreateAliasTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -// PreExecute defines the action before task execution -func (c *CreateAliasTask) PreExecute(ctx context.Context) error { - c.Base.MsgType = commonpb.MsgType_CreateAlias - c.Base.SourceID = paramtable.GetNodeID() +// PreExecute defines the tion before task execution +func (t *CreateAliasTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_CreateAlias + t.Base.SourceID = paramtable.GetNodeID() - collAlias := c.Alias + collAlias := t.Alias // collection alias uses the same format as collection name if err := ValidateCollectionAlias(collAlias); err != nil { return err } - collName := c.CollectionName + collName := t.CollectionName if err := validateCollectionName(collName); err != nil { return err } return nil } -// Execute defines the actual execution of create alias -func (c *CreateAliasTask) Execute(ctx context.Context) error { +// Execute defines the tual execution of create alias +func (t *CreateAliasTask) Execute(ctx context.Context) error { var err error - c.result, err = c.rootCoord.CreateAlias(ctx, c.CreateAliasRequest) + t.result, err = t.rootCoord.CreateAlias(ctx, t.CreateAliasRequest) return err } // PostExecute defines the post execution, do nothing for create alias -func (c *CreateAliasTask) PostExecute(ctx context.Context) error { +func (t *CreateAliasTask) PostExecute(ctx context.Context) error { return nil } @@ -1905,63 +1911,63 @@ type DropAliasTask struct { } // TraceCtx returns the context for trace -func (d *DropAliasTask) TraceCtx() context.Context { - return d.ctx +func (t *DropAliasTask) TraceCtx() context.Context { + return t.ctx } // ID returns the MsgID -func (d *DropAliasTask) ID() UniqueID { - return d.Base.MsgID +func (t *DropAliasTask) ID() UniqueID { + return t.Base.MsgID } // SetID sets the MsgID -func (d *DropAliasTask) SetID(uid UniqueID) { - d.Base.MsgID = uid +func (t *DropAliasTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } // Name returns the name of the task -func (d *DropAliasTask) Name() string { +func (t *DropAliasTask) Name() string { return DropAliasTaskName } -func (d *DropAliasTask) Type() commonpb.MsgType { - return d.Base.MsgType +func (t *DropAliasTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (d *DropAliasTask) BeginTs() Timestamp { - return d.Base.Timestamp +func (t *DropAliasTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (d *DropAliasTask) EndTs() Timestamp { - return d.Base.Timestamp +func (t *DropAliasTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (d *DropAliasTask) SetTs(ts Timestamp) { - d.Base.Timestamp = ts +func (t *DropAliasTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (d *DropAliasTask) OnEnqueue() error { - d.Base = commonpbutil.NewMsgBase() +func (t *DropAliasTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (d *DropAliasTask) PreExecute(ctx context.Context) error { - d.Base.MsgType = commonpb.MsgType_DropAlias - d.Base.SourceID = paramtable.GetNodeID() - collAlias := d.Alias +func (t *DropAliasTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_DropAlias + t.Base.SourceID = paramtable.GetNodeID() + collAlias := t.Alias if err := ValidateCollectionAlias(collAlias); err != nil { return err } return nil } -func (d *DropAliasTask) Execute(ctx context.Context) error { +func (t *DropAliasTask) Execute(ctx context.Context) error { var err error - d.result, err = d.rootCoord.DropAlias(ctx, d.DropAliasRequest) + t.result, err = t.rootCoord.DropAlias(ctx, t.DropAliasRequest) return err } -func (d *DropAliasTask) PostExecute(ctx context.Context) error { +func (t *DropAliasTask) PostExecute(ctx context.Context) error { return nil } @@ -1974,54 +1980,54 @@ type AlterAliasTask struct { result *commonpb.Status } -func (a *AlterAliasTask) TraceCtx() context.Context { - return a.ctx +func (t *AlterAliasTask) TraceCtx() context.Context { + return t.ctx } -func (a *AlterAliasTask) ID() UniqueID { - return a.Base.MsgID +func (t *AlterAliasTask) ID() UniqueID { + return t.Base.MsgID } -func (a *AlterAliasTask) SetID(uid UniqueID) { - a.Base.MsgID = uid +func (t *AlterAliasTask) SetID(uid UniqueID) { + t.Base.MsgID = uid } -func (a *AlterAliasTask) Name() string { +func (t *AlterAliasTask) Name() string { return AlterAliasTaskName } -func (a *AlterAliasTask) Type() commonpb.MsgType { - return a.Base.MsgType +func (t *AlterAliasTask) Type() commonpb.MsgType { + return t.Base.MsgType } -func (a *AlterAliasTask) BeginTs() Timestamp { - return a.Base.Timestamp +func (t *AlterAliasTask) BeginTs() Timestamp { + return t.Base.Timestamp } -func (a *AlterAliasTask) EndTs() Timestamp { - return a.Base.Timestamp +func (t *AlterAliasTask) EndTs() Timestamp { + return t.Base.Timestamp } -func (a *AlterAliasTask) SetTs(ts Timestamp) { - a.Base.Timestamp = ts +func (t *AlterAliasTask) SetTs(ts Timestamp) { + t.Base.Timestamp = ts } -func (a *AlterAliasTask) OnEnqueue() error { - a.Base = commonpbutil.NewMsgBase() +func (t *AlterAliasTask) OnEnqueue() error { + t.Base = commonpbutil.NewMsgBase() return nil } -func (a *AlterAliasTask) PreExecute(ctx context.Context) error { - a.Base.MsgType = commonpb.MsgType_AlterAlias - a.Base.SourceID = paramtable.GetNodeID() +func (t *AlterAliasTask) PreExecute(ctx context.Context) error { + t.Base.MsgType = commonpb.MsgType_AlterAlias + t.Base.SourceID = paramtable.GetNodeID() - collAlias := a.Alias + collAlias := t.Alias // collection alias uses the same format as collection name if err := ValidateCollectionAlias(collAlias); err != nil { return err } - collName := a.CollectionName + collName := t.CollectionName if err := validateCollectionName(collName); err != nil { return err } @@ -2029,13 +2035,13 @@ func (a *AlterAliasTask) PreExecute(ctx context.Context) error { return nil } -func (a *AlterAliasTask) Execute(ctx context.Context) error { +func (t *AlterAliasTask) Execute(ctx context.Context) error { var err error - a.result, err = a.rootCoord.AlterAlias(ctx, a.AlterAliasRequest) + t.result, err = t.rootCoord.AlterAlias(ctx, t.AlterAliasRequest) return err } -func (a *AlterAliasTask) PostExecute(ctx context.Context) error { +func (t *AlterAliasTask) PostExecute(ctx context.Context) error { return nil } diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index cc0229bb07a95..65f4406eeff29 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -491,13 +491,19 @@ func (dit *describeIndexTask) Execute(ctx context.Context) error { } resp, err := dit.datacoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{CollectionID: dit.collectionID, IndexName: dit.IndexName, Timestamp: dit.Timestamp}) - if err != nil || resp == nil { + if err != nil { return err } + dit.result = &milvuspb.DescribeIndexResponse{} dit.result.Status = resp.GetStatus() - if dit.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return merr.Error(dit.result.GetStatus()) + err = merr.Error(resp.GetStatus()) + if err != nil { + if errors.Is(err, merr.ErrIndexNotFound) && len(dit.GetIndexName()) == 0 { + err = merr.WrapErrIndexNotFoundForCollection(dit.GetCollectionName()) + dit.result.Status = merr.Status(err) + } + return err } for _, indexInfo := range resp.IndexInfos { field, err := schemaHelper.GetFieldFromID(indexInfo.FieldID) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 40c74ba907711..e578016c8bca4 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -109,6 +109,9 @@ func (s *ErrSuite) TestWrap() { // Index related s.ErrorIs(WrapErrIndexNotFound("failed to get Index"), ErrIndexNotFound) + s.ErrorIs(WrapErrIndexNotFoundForCollection("milvus_hello", "failed to get collection index"), ErrIndexNotFound) + s.ErrorIs(WrapErrIndexNotFoundForSegment(100, "failed to get collection index"), ErrIndexNotFound) + s.ErrorIs(WrapErrIndexNotSupported("wsnh", "failed to create index"), ErrIndexNotSupported) // Node related s.ErrorIs(WrapErrNodeNotFound(1, "failed to get node"), ErrNodeNotFound) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 8195670f0ac8d..7f4bb6b49a859 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -489,6 +489,14 @@ func WrapErrIndexNotFoundForSegment(segmentID int64, msg ...string) error { return err } +func WrapErrIndexNotFoundForCollection(collection string, msg ...string) error { + err := wrapWithField(ErrIndexNotFound, "collection", collection) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + func WrapErrIndexNotSupported(indexType string, msg ...string) error { err := wrapWithField(ErrIndexNotSupported, "indexType", indexType) if len(msg) > 0 { diff --git a/pkg/util/typeutil/gen_empty_field_data.go b/pkg/util/typeutil/gen_empty_field_data.go index 2292c295331ba..818a8ec25f883 100644 --- a/pkg/util/typeutil/gen_empty_field_data.go +++ b/pkg/util/typeutil/gen_empty_field_data.go @@ -15,7 +15,8 @@ func genEmptyBoolFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_BoolData{BoolData: &schemapb.BoolArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -28,7 +29,8 @@ func genEmptyIntFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -41,7 +43,8 @@ func genEmptyLongFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_LongData{LongData: &schemapb.LongArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -54,7 +57,8 @@ func genEmptyFloatFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_FloatData{FloatData: &schemapb.FloatArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -67,7 +71,8 @@ func genEmptyDoubleFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_DoubleData{DoubleData: &schemapb.DoubleArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -80,7 +85,8 @@ func genEmptyVarCharFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_StringData{StringData: &schemapb.StringArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -93,7 +99,8 @@ func genEmptyArrayFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_ArrayData{ArrayData: &schemapb.ArrayArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -106,7 +113,8 @@ func genEmptyJSONFieldData(field *schemapb.FieldSchema) *schemapb.FieldData { Data: &schemapb.ScalarField_JsonData{JsonData: &schemapb.JSONArray{Data: nil}}, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), } } @@ -126,7 +134,8 @@ func genEmptyBinaryVectorFieldData(field *schemapb.FieldSchema) (*schemapb.Field }, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), }, nil } @@ -146,7 +155,8 @@ func genEmptyFloatVectorFieldData(field *schemapb.FieldSchema) (*schemapb.FieldD }, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), }, nil } @@ -166,7 +176,8 @@ func genEmptyFloat16VectorFieldData(field *schemapb.FieldSchema) (*schemapb.Fiel }, }, }, - FieldId: field.GetFieldID(), + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), }, nil } diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index c0bd98b360d1c..16f12fa40f688 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -23,6 +23,7 @@ import ( "unsafe" "github.com/cockroachdb/errors" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -833,6 +834,12 @@ func GetPrimaryFieldData(datas []*schemapb.FieldData, primaryFieldSchema *schema return primaryFieldData, nil } +func GetField(schema *schemapb.CollectionSchema, fieldID int64) *schemapb.FieldSchema { + return lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { + return field.GetFieldID() == fieldID + }) +} + func IsPrimaryFieldDataExist(datas []*schemapb.FieldData, primaryFieldSchema *schemapb.FieldSchema) bool { primaryFieldID := primaryFieldSchema.FieldID primaryFieldName := primaryFieldSchema.Name