From 94f5e0317080cccb3f18a5e66b980b07f37351f4 Mon Sep 17 00:00:00 2001 From: PowderLi Date: Thu, 25 Jul 2024 17:08:37 +0800 Subject: [PATCH] feat: support alter collection field properties Signed-off-by: PowderLi --- client/mock_milvus_server_test.go | 55 +++++++++++++++ internal/datacoord/mock_test.go | 4 ++ internal/datacoord/services.go | 1 + internal/distributed/proxy/service.go | 5 ++ .../distributed/rootcoord/client/client.go | 11 +++ internal/distributed/rootcoord/service.go | 4 ++ internal/mocks/mock_proxy.go | 55 +++++++++++++++ internal/mocks/mock_rootcoord.go | 55 +++++++++++++++ internal/mocks/mock_rootcoord_client.go | 70 +++++++++++++++++++ internal/proto/root_coord.proto | 2 + internal/proxy/impl.go | 16 ++++- internal/proxy/rootcoord_mock_test.go | 4 ++ internal/proxy/task.go | 22 +----- internal/rootcoord/alter_collection_task.go | 38 +++++++--- .../rootcoord/alter_collection_task_test.go | 39 ++++++++++- internal/rootcoord/broker.go | 15 ++-- internal/rootcoord/broker_test.go | 13 ++-- internal/rootcoord/mock_test.go | 6 +- internal/rootcoord/root_coord.go | 11 +++ internal/rootcoord/step.go | 11 +-- internal/util/mock/grpc_rootcoord_client.go | 4 ++ 21 files changed, 384 insertions(+), 57 deletions(-) diff --git a/client/mock_milvus_server_test.go b/client/mock_milvus_server_test.go index 2ef4927e9a8c4..0444661bd2346 100644 --- a/client/mock_milvus_server_test.go +++ b/client/mock_milvus_server_test.go @@ -192,6 +192,61 @@ func (_c *MilvusServiceServer_AlterCollection_Call) RunAndReturn(run func(contex return _c } +// AlterCollection provides a mock function with given fields: _a0, _a1 +func (_m *MilvusServiceServer) AlterCollectionField(_a0 context.Context, _a1 *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MilvusServiceServer_AlterCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollection' +type MilvusServiceServer_AlterCollectionField_Call struct { + *mock.Call +} + +// AlterCollection is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.AlterCollectionRequest +func (_e *MilvusServiceServer_Expecter) AlterCollectionField(_a0 interface{}, _a1 interface{}) *MilvusServiceServer_AlterCollectionField_Call { + return &MilvusServiceServer_AlterCollectionField_Call{Call: _e.mock.On("AlterCollection", _a0, _a1)} +} + +func (_c *MilvusServiceServer_AlterCollectionField_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterCollectionRequest)) *MilvusServiceServer_AlterCollectionField_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionRequest)) + }) + return _c +} + +func (_c *MilvusServiceServer_AlterCollectionField_Call) Return(_a0 *commonpb.Status, _a1 error) *MilvusServiceServer_AlterCollectionField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MilvusServiceServer_AlterCollectionField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)) *MilvusServiceServer_AlterCollectionField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: _a0, _a1 func (_m *MilvusServiceServer) AlterDatabase(_a0 context.Context, _a1 *milvuspb.AlterDatabaseRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 0a79599937dbd..9eb3c96b57b94 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -489,6 +489,10 @@ func (m *mockRootCoordClient) AlterCollection(ctx context.Context, request *milv panic("not implemented") // TODO: Implement } +func (m *mockRootCoordClient) AlterCollectionOrField(ctx context.Context, request *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + func (m *mockRootCoordClient) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { panic("not implemented") // TODO: Implement } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 6407e97f05801..fd0cce6737aa1 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1510,6 +1510,7 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt return merr.Success(), nil } + clonedColl.Schema = req.GetSchema() clonedColl.Properties = properties s.meta.AddCollection(clonedColl) return merr.Success(), nil diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index e132fa6fa10ed..c6e77fea45984 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -818,6 +818,11 @@ func (s *Server) AlterCollection(ctx context.Context, request *milvuspb.AlterCol return s.proxy.AlterCollection(ctx, request) } +func (s *Server) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + // todo + return s.proxy.AlterCollection(ctx, request) +} + // CreatePartition notifies Proxy to create a partition func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { return s.proxy.CreatePartition(ctx, request) diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index ece48aafddeb0..bada37fb2afb3 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -229,6 +229,17 @@ func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCol }) } +func (c *Client) AlterCollectionOrField(ctx context.Context, request *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + request = typeutil.Clone(request) + commonpbutil.UpdateMsgBase( + request.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) { + return client.AlterCollectionOrField(ctx, request) + }) +} + // CreatePartition create partition func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { in = typeutil.Clone(in) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 8e4edd795b84e..3ddfc8410545c 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -530,6 +530,10 @@ func (s *Server) AlterCollection(ctx context.Context, request *milvuspb.AlterCol return s.rootCoord.AlterCollection(ctx, request) } +func (s *Server) AlterCollectionOrField(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + return s.rootCoord.AlterCollectionOrField(ctx, request) +} + func (s *Server) RenameCollection(ctx context.Context, request *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { return s.rootCoord.RenameCollection(ctx, request) } diff --git a/internal/mocks/mock_proxy.go b/internal/mocks/mock_proxy.go index ab28fd9f87ac4..47d6e6e0858f8 100644 --- a/internal/mocks/mock_proxy.go +++ b/internal/mocks/mock_proxy.go @@ -199,6 +199,61 @@ func (_c *MockProxy_AlterCollection_Call) RunAndReturn(run func(context.Context, return _c } +// AlterCollection provides a mock function with given fields: _a0, _a1 +func (_m *MockProxy) AlterCollectionField(_a0 context.Context, _a1 *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockProxy_AlterCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollection' +type MockProxy_AlterCollectionField_Call struct { + *mock.Call +} + +// AlterCollection is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.AlterCollectionRequest +func (_e *MockProxy_Expecter) AlterCollectionField(_a0 interface{}, _a1 interface{}) *MockProxy_AlterCollectionField_Call { + return &MockProxy_AlterCollectionField_Call{Call: _e.mock.On("AlterCollection", _a0, _a1)} +} + +func (_c *MockProxy_AlterCollectionField_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterCollectionRequest)) *MockProxy_AlterCollectionField_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionRequest)) + }) + return _c +} + +func (_c *MockProxy_AlterCollectionField_Call) Return(_a0 *commonpb.Status, _a1 error) *MockProxy_AlterCollectionField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockProxy_AlterCollectionField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)) *MockProxy_AlterCollectionField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: _a0, _a1 func (_m *MockProxy) AlterDatabase(_a0 context.Context, _a1 *milvuspb.AlterDatabaseRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord.go b/internal/mocks/mock_rootcoord.go index a025150083aa5..bca0070e9630c 100644 --- a/internal/mocks/mock_rootcoord.go +++ b/internal/mocks/mock_rootcoord.go @@ -256,6 +256,61 @@ func (_c *RootCoord_AlterCollection_Call) RunAndReturn(run func(context.Context, return _c } +// AlterCollection provides a mock function with given fields: _a0, _a1 +func (_m *RootCoord) AlterCollectionOrField(_a0 context.Context, _a1 *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RootCoord_AlterCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollection' +type RootCoord_AlterCollectionOrField_Call struct { + *mock.Call +} + +// AlterCollection is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.AlterCollectionRequest +func (_e *RootCoord_Expecter) AlterCollectionOrField(_a0 interface{}, _a1 interface{}) *RootCoord_AlterCollectionOrField_Call { + return &RootCoord_AlterCollectionOrField_Call{Call: _e.mock.On("AlterCollection", _a0, _a1)} +} + +func (_c *RootCoord_AlterCollectionOrField_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterCollectionRequest)) *RootCoord_AlterCollectionOrField_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionRequest)) + }) + return _c +} + +func (_c *RootCoord_AlterCollectionOrField_Call) Return(_a0 *commonpb.Status, _a1 error) *RootCoord_AlterCollectionOrField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *RootCoord_AlterCollectionOrField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)) *RootCoord_AlterCollectionOrField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: _a0, _a1 func (_m *RootCoord) AlterDatabase(_a0 context.Context, _a1 *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord_client.go b/internal/mocks/mock_rootcoord_client.go index 987571b6c024e..b1234739f2690 100644 --- a/internal/mocks/mock_rootcoord_client.go +++ b/internal/mocks/mock_rootcoord_client.go @@ -313,6 +313,76 @@ func (_c *MockRootCoordClient_AlterCollection_Call) RunAndReturn(run func(contex return _c } +// AlterCollectionOrField provides a mock function with given fields: ctx, in, opts +func (_m *MockRootCoordClient) AlterCollectionOrField(ctx context.Context, in *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRootCoordClient_AlterCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterCollection' +type MockRootCoordClient_AlterCollectionOrField_Call struct { + *mock.Call +} + +// AlterCollection is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.AlterCollectionRequest +// - opts ...grpc.CallOption +func (_e *MockRootCoordClient_Expecter) AlterCollectionOrField(ctx interface{}, in interface{}, opts ...interface{}) *MockRootCoordClient_AlterCollectionOrField_Call { + return &MockRootCoordClient_AlterCollectionOrField_Call{Call: _e.mock.On("AlterCollectionOrField", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockRootCoordClient_AlterCollectionOrField_Call) Run(run func(ctx context.Context, in *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption)) *MockRootCoordClient_AlterCollectionOrField_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockRootCoordClient_AlterCollectionOrField_Call) Return(_a0 *commonpb.Status, _a1 error) *MockRootCoordClient_AlterCollectionOrField_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockRootCoordClient_AlterCollectionOrField_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterCollectionRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockRootCoordClient_AlterCollectionOrField_Call { + _c.Call.Return(run) + return _c +} + // AlterDatabase provides a mock function with given fields: ctx, in, opts func (_m *MockRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/root_coord.proto b/internal/proto/root_coord.proto index 4fc19061cad4b..402a4712e5dfc 100644 --- a/internal/proto/root_coord.proto +++ b/internal/proto/root_coord.proto @@ -65,6 +65,8 @@ service RootCoord { rpc AlterCollection(milvus.AlterCollectionRequest) returns (common.Status) {} + rpc AlterCollectionOrField(milvus.AlterCollectionRequest) returns (common.Status) {} + /** * @brief This method is used to create partition * diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index f989cd922ab21..ef2f1d2dfbc62 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1218,13 +1218,25 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo } func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + return node.alterCollectionOrField(ctx, "AlterCollection", &milvuspb.AlterCollectionRequest{ + Base: request.Base, + DbName: request.DbName, + CollectionName: request.CollectionName, + Properties: request.Properties, + }) +} + +func (node *Proxy) AlterCollectionField(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + return node.alterCollectionOrField(ctx, "AlterCollectionField", request) +} + +func (node *Proxy) alterCollectionOrField(ctx context.Context, method string, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return merr.Status(err), nil } - ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterCollection") + ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-"+method) defer sp.End() - method := "AlterCollection" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go index 9fc565163abdd..cb94c1b03d87a 100644 --- a/internal/proxy/rootcoord_mock_test.go +++ b/internal/proxy/rootcoord_mock_test.go @@ -1095,6 +1095,10 @@ func (coord *RootCoordMock) AlterCollection(ctx context.Context, request *milvus return &commonpb.Status{}, nil } +func (coord *RootCoordMock) AlterCollectionOrField(ctx context.Context, request *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, nil +} + func (coord *RootCoordMock) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, nil } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 7b64ce7d8cf50..6a7c743b4ff82 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -459,7 +459,6 @@ func (t *dropCollectionTask) OnEnqueue() error { } func (t *dropCollectionTask) PreExecute(ctx context.Context) error { - if err := validateCollectionName(t.CollectionName); err != nil { return err } @@ -527,7 +526,6 @@ func (t *hasCollectionTask) OnEnqueue() error { } func (t *hasCollectionTask) PreExecute(ctx context.Context) error { - if err := validateCollectionName(t.CollectionName); err != nil { return err } @@ -595,7 +593,6 @@ func (t *describeCollectionTask) OnEnqueue() error { } func (t *describeCollectionTask) PreExecute(ctx context.Context) error { - if t.CollectionID != 0 && len(t.CollectionName) == 0 { return nil } @@ -935,12 +932,13 @@ func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, p } func (t *alterCollectionTask) PreExecute(ctx context.Context) error { - collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) if err != nil { return err } + // todo: if field != "": check dataType / elementDataType, max_length > origin maxlength + t.CollectionID = collectionID if hasMmapProp(t.Properties...) || hasLazyLoadProp(t.Properties...) { loaded, err := isCollectionLoaded(ctx, t.queryCoord, t.CollectionID) @@ -1013,7 +1011,7 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error { func (t *alterCollectionTask) Execute(ctx context.Context) error { var err error - t.result, err = t.rootCoord.AlterCollection(ctx, t.AlterCollectionRequest) + t.result, err = t.rootCoord.AlterCollectionOrField(ctx, t.AlterCollectionRequest) return merr.CheckRPCCall(t.result, err) } @@ -1072,7 +1070,6 @@ func (t *createPartitionTask) OnEnqueue() error { } func (t *createPartitionTask) PreExecute(ctx context.Context) error { - collName, partitionTag := t.CollectionName, t.PartitionName if err := validateCollectionName(collName); err != nil { @@ -1155,7 +1152,6 @@ func (t *dropPartitionTask) OnEnqueue() error { } func (t *dropPartitionTask) PreExecute(ctx context.Context) error { - collName, partitionTag := t.CollectionName, t.PartitionName if err := validateCollectionName(collName); err != nil { @@ -1263,7 +1259,6 @@ func (t *hasPartitionTask) OnEnqueue() error { } func (t *hasPartitionTask) PreExecute(ctx context.Context) error { - collName, partitionTag := t.CollectionName, t.PartitionName if err := validateCollectionName(collName); err != nil { @@ -1337,7 +1332,6 @@ func (t *showPartitionsTask) OnEnqueue() error { } func (t *showPartitionsTask) PreExecute(ctx context.Context) error { - if err := validateCollectionName(t.CollectionName); err != nil { return err } @@ -1742,7 +1736,6 @@ func (t *releaseCollectionTask) OnEnqueue() error { } func (t *releaseCollectionTask) PreExecute(ctx context.Context) error { - collName := t.CollectionName if err := validateCollectionName(collName); err != nil { @@ -1836,7 +1829,6 @@ func (t *loadPartitionsTask) OnEnqueue() error { } func (t *loadPartitionsTask) PreExecute(ctx context.Context) error { - collName := t.CollectionName if err := validateCollectionName(collName); err != nil { @@ -1986,7 +1978,6 @@ func (t *releasePartitionsTask) OnEnqueue() error { } func (t *releasePartitionsTask) PreExecute(ctx context.Context) error { - collName := t.CollectionName if err := validateCollectionName(collName); err != nil { @@ -2091,7 +2082,6 @@ func (t *CreateResourceGroupTask) OnEnqueue() error { } func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error { - return nil } @@ -2156,7 +2146,6 @@ func (t *UpdateResourceGroupsTask) OnEnqueue() error { } func (t *UpdateResourceGroupsTask) PreExecute(ctx context.Context) error { - return nil } @@ -2224,7 +2213,6 @@ func (t *DropResourceGroupTask) OnEnqueue() error { } func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error { - return nil } @@ -2289,7 +2277,6 @@ func (t *DescribeResourceGroupTask) OnEnqueue() error { } func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error { - return nil } @@ -2415,7 +2402,6 @@ func (t *TransferNodeTask) OnEnqueue() error { } func (t *TransferNodeTask) PreExecute(ctx context.Context) error { - return nil } @@ -2480,7 +2466,6 @@ func (t *TransferReplicaTask) OnEnqueue() error { } func (t *TransferReplicaTask) PreExecute(ctx context.Context) error { - return nil } @@ -2554,7 +2539,6 @@ func (t *ListResourceGroupsTask) OnEnqueue() error { } func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error { - return nil } diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 779e631bcf501..afb6579385dab 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -33,6 +33,7 @@ import ( type alterCollectionTask struct { baseTask Req *milvuspb.AlterCollectionRequest + // todo AlterCollectionFieldRequest } func (a *alterCollectionTask) Prepare(ctx context.Context) error { @@ -46,7 +47,7 @@ func (a *alterCollectionTask) Prepare(ctx context.Context) error { func (a *alterCollectionTask) Execute(ctx context.Context) error { // Now we only support alter properties of collection if a.Req.GetProperties() == nil { - return errors.New("only support alter collection properties, but collection properties is empty") + return errors.New("only support alter collection/field properties, but collection/field properties is empty") } oldColl, err := a.core.meta.GetCollectionByName(ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), a.ts) @@ -57,7 +58,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { } newColl := oldColl.Clone() - updateCollectionProperties(newColl, a.Req.GetProperties()) + // todo a.Req.FieldName + if "" == "" { + updateCollectionProperties(newColl, a.Req.GetProperties()) + } else { + updateFieldProperties(newColl, "", a.Req.GetProperties()) + } ts := a.GetTs() redoTask := newBaseRedoTask(a.core.stepExecutor) @@ -68,11 +74,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { ts: ts, }) - a.Req.CollectionID = oldColl.CollectionID redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{ - baseStep: baseStep{core: a.core}, - req: a.Req, - core: a.core, + baseStep: baseStep{core: a.core}, + dbName: a.Req.GetDbName(), + collectionName: oldColl.Name, + collectionID: oldColl.CollectionID, + core: a.core, }) // properties needs to be refreshed in the cache @@ -80,7 +87,7 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { redoTask.AddSyncStep(&expireCacheStep{ baseStep: baseStep{core: a.core}, dbName: a.Req.GetDbName(), - collectionNames: append(aliases, a.Req.GetCollectionName()), + collectionNames: append(aliases, oldColl.Name), collectionID: oldColl.CollectionID, opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)}, }) @@ -89,8 +96,21 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { } func updateCollectionProperties(coll *model.Collection, updatedProps []*commonpb.KeyValuePair) { + coll.Properties = newProperties(coll.Properties, updatedProps) +} + +func updateFieldProperties(coll *model.Collection, fieldName string, updatedProps []*commonpb.KeyValuePair) { + for i, field := range coll.Fields { + if field.Name == fieldName { + coll.Fields[i].TypeParams = newProperties(field.TypeParams, updatedProps) + return + } + } +} + +func newProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { props := make(map[string]string) - for _, prop := range coll.Properties { + for _, prop := range oldProps { props[prop.Key] = prop.Value } @@ -107,5 +127,5 @@ func updateCollectionProperties(coll *model.Collection, updatedProps []*commonpb }) } - coll.Properties = propKV + return propKV } diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index 32525349864f4..5bb891e29c7de 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/model" mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/pkg/common" @@ -125,7 +126,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { meta.On("ListAliasesByID", mock.Anything).Return([]string{}) broker := newMockBroker() - broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { + broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, dbName, collectionName string, collectionID UniqueID) error { return errors.New("err") } @@ -160,7 +161,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { meta.On("ListAliasesByID", mock.Anything).Return([]string{}) broker := newMockBroker() - broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { + broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, dbName, collectionName string, collectionID UniqueID) error { return errors.New("err") } @@ -195,7 +196,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { meta.On("ListAliasesByID", mock.Anything).Return([]string{}) broker := newMockBroker() - broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { + broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, dbName, collectionName string, collectionID UniqueID) error { return nil } @@ -271,4 +272,36 @@ func Test_alterCollectionTask_Execute(t *testing.T) { Value: "true", }) }) + + t.Run("test update collection field props", func(t *testing.T) { + fieldName := "VarCharField" + coll := &model.Collection{ + Fields: []*model.Field{ + { + Name: fieldName, + IsPrimaryKey: false, + DataType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "100", + }, + }, + }, + }, + } + + updateProps1 := []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "128", + }, + } + updateFieldProperties(coll, fieldName, updateProps1) + assert.Len(t, coll.Fields[0].TypeParams, 1) + assert.Contains(t, coll.Fields[0].TypeParams, &commonpb.KeyValuePair{ + Key: common.MaxLengthKey, + Value: "128", + }) + }) } diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index edd3bc0525faf..47941e0ffb206 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -24,7 +24,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -59,7 +58,7 @@ type Broker interface { DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error // notify observer to clean their meta cache - BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error + BroadcastAlteredCollection(ctx context.Context, dbName, collectionName string, collectionID UniqueID) error } type ServerBroker struct { @@ -224,15 +223,15 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID return resp.GetStates(), nil } -func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { - log.Info("broadcasting request to alter collection", zap.String("collectionName", req.GetCollectionName()), zap.Int64("collectionID", req.GetCollectionID()), zap.Any("props", req.GetProperties())) +func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, dbName, collectionName string, collectionID UniqueID) error { + log.Info("broadcasting request to alter collection", zap.String("collectionName", collectionName), zap.Int64("collectionID", collectionID)) - colMeta, err := b.s.meta.GetCollectionByID(ctx, req.GetDbName(), req.GetCollectionID(), typeutil.MaxTimestamp, false) + colMeta, err := b.s.meta.GetCollectionByID(ctx, dbName, collectionID, typeutil.MaxTimestamp, false) if err != nil { return err } - db, err := b.s.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp) + db, err := b.s.meta.GetDatabaseByName(ctx, dbName, typeutil.MaxTimestamp) if err != nil { return err } @@ -242,7 +241,7 @@ func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milv partitionIDs = append(partitionIDs, p.PartitionID) } dcReq := &datapb.AlterCollectionRequest{ - CollectionID: req.GetCollectionID(), + CollectionID: collectionID, Schema: &schemapb.CollectionSchema{ Name: colMeta.Name, Description: colMeta.Description, @@ -264,7 +263,7 @@ func (b *ServerBroker) BroadcastAlteredCollection(ctx context.Context, req *milv if resp.ErrorCode != commonpb.ErrorCode_Success { return errors.New(resp.Reason) } - log.Info("done to broadcast request to alter collection", zap.String("collectionName", req.GetCollectionName()), zap.Int64("collectionID", req.GetCollectionID()), zap.Any("props", req.GetProperties())) + log.Info("done to broadcast request to alter collection", zap.String("collectionName", collectionName), zap.Int64("collectionID", collectionID), zap.Any("props", colMeta.Properties)) return nil } diff --git a/internal/rootcoord/broker_test.go b/internal/rootcoord/broker_test.go index 4d4102d560d43..1f7791bb39f9f 100644 --- a/internal/rootcoord/broker_test.go +++ b/internal/rootcoord/broker_test.go @@ -25,13 +25,13 @@ import ( "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/indexpb" mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" + "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -226,7 +226,7 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) { c.meta = meta b := newServerBroker(c) ctx := context.Background() - err := b.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{}) + err := b.BroadcastAlteredCollection(ctx, util.DefaultDBName, "", int64(1)) assert.Error(t, err) }) @@ -244,7 +244,7 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) { c.meta = meta b := newServerBroker(c) ctx := context.Background() - err := b.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{}) + err := b.BroadcastAlteredCollection(ctx, util.DefaultDBName, "", int64(1)) assert.Error(t, err) }) @@ -262,7 +262,7 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) { c.meta = meta b := newServerBroker(c) ctx := context.Background() - err := b.BroadcastAlteredCollection(ctx, &milvuspb.AlterCollectionRequest{}) + err := b.BroadcastAlteredCollection(ctx, util.DefaultDBName, "", int64(1)) assert.Error(t, err) }) @@ -281,10 +281,7 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) { b := newServerBroker(c) ctx := context.Background() - req := &milvuspb.AlterCollectionRequest{ - CollectionID: 1, - } - err := b.BroadcastAlteredCollection(ctx, req) + err := b.BroadcastAlteredCollection(ctx, util.DefaultDBName, "", int64(1)) assert.NoError(t, err) }) } diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index da77ef67c3cac..a88926ca90933 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -890,7 +890,7 @@ type mockBroker struct { DropCollectionIndexFunc func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) - BroadcastAlteredCollectionFunc func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error + BroadcastAlteredCollectionFunc func(ctx context.Context, dbName, collectionName string, collectionID UniqueID) error GCConfirmFunc func(ctx context.Context, collectionID, partitionID UniqueID) bool } @@ -927,8 +927,8 @@ func (b mockBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID, i return b.GetSegmentIndexStateFunc(ctx, collID, indexName, segIDs) } -func (b mockBroker) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { - return b.BroadcastAlteredCollectionFunc(ctx, req) +func (b mockBroker) BroadcastAlteredCollection(ctx context.Context, dbName, collectionName string, collectionID UniqueID) error { + return b.BroadcastAlteredCollectionFunc(ctx, dbName, collectionName, collectionID) } func (b mockBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 7cf27432076ee..1ebc1a30687ac 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1250,6 +1250,17 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections } func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { + return c.AlterCollectionOrField(ctx, &milvuspb.AlterCollectionRequest{ + Base: in.Base, + DbName: in.DbName, + CollectionName: in.CollectionName, + // todo FieldName: "" + Properties: in.Properties, + }) +} + +// todo AlterCollectionOrFieldRequest +func (c *Core) AlterCollectionOrField(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return merr.Status(err), nil } diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index 7c76715029ba4..950e559029f3d 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -21,7 +21,6 @@ import ( "fmt" "time" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/util/proxyutil" @@ -444,19 +443,21 @@ func (a *AlterCollectionStep) Desc() string { type BroadcastAlteredCollectionStep struct { baseStep - req *milvuspb.AlterCollectionRequest - core *Core + dbName string + collectionName string + collectionID UniqueID + core *Core } func (b *BroadcastAlteredCollectionStep) Execute(ctx context.Context) ([]nestedStep, error) { // TODO: support online schema change mechanism // It only broadcast collection properties to DataCoord service - err := b.core.broker.BroadcastAlteredCollection(ctx, b.req) + err := b.core.broker.BroadcastAlteredCollection(ctx, b.dbName, b.collectionName, b.collectionID) return nil, err } func (b *BroadcastAlteredCollectionStep) Desc() string { - return fmt.Sprintf("broadcast altered collection, collectionID: %d", b.req.CollectionID) + return fmt.Sprintf("broadcast altered collection, collectionID: %d", b.collectionID) } type AlterDatabaseStep struct { diff --git a/internal/util/mock/grpc_rootcoord_client.go b/internal/util/mock/grpc_rootcoord_client.go index ac597c961a390..c0d51d08e686e 100644 --- a/internal/util/mock/grpc_rootcoord_client.go +++ b/internal/util/mock/grpc_rootcoord_client.go @@ -262,6 +262,10 @@ func (m *GrpcRootCoordClient) AlterCollection(ctx context.Context, in *milvuspb. return &commonpb.Status{}, m.Err } +func (m *GrpcRootCoordClient) AlterCollectionOrField(ctx context.Context, in *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +} + func (m *GrpcRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err }