Skip to content

Commit

Permalink
support truncate collection
Browse files Browse the repository at this point in the history
Signed-off-by: PowderLi <[email protected]>
  • Loading branch information
PowderLi committed Jul 17, 2024
1 parent 88b373b commit ebff870
Show file tree
Hide file tree
Showing 42 changed files with 2,489 additions and 21 deletions.
55 changes: 55 additions & 0 deletions client/mock_milvus_server_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 4 additions & 10 deletions internal/core/thirdparty/jemalloc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ message(STATUS "Building (vendored) jemalloc from source")
# installations.
# find_package(jemalloc)

include(CheckSymbolExists)

macro(detect_aarch64_target_arch)
check_symbol_exists(__aarch64__ "" __AARCH64)
endmacro()
detect_aarch64_target_arch()

set(JEMALLOC_PREFIX "${CMAKE_INSTALL_PREFIX}")
set(JEMALLOC_LIB_DIR "${JEMALLOC_PREFIX}/lib")
set(JEMALLOC_STATIC_LIB "${JEMALLOC_LIB_DIR}/libjemalloc_pic${CMAKE_STATIC_LIBRARY_SUFFIX}")
Expand All @@ -44,9 +37,10 @@ if (CMAKE_OSX_SYSROOT)
list(APPEND JEMALLOC_CONFIGURE_COMMAND "SDKROOT=${CMAKE_OSX_SYSROOT}")
endif ()

if (DEFINED __AARCH64)
#aarch64 platform use 64k pagesize.
list(APPEND JEMALLOC_CONFIGURE_COMMAND "--with-lg-page=16")
if (DEFINED MILVUS_JEMALLOC_LG_PAGE)
# Used for arm64 manylinux wheels in order to make the wheel work on both
# 4k and 64k page arm64 systems.
list(APPEND JEMALLOC_CONFIGURE_COMMAND "--with-lg-page=${MILVUS_JEMALLOC_LG_PAGE}")
endif ()

list(APPEND
Expand Down
22 changes: 22 additions & 0 deletions internal/datacoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,28 @@ func (m *mockRootCoordClient) ListPolicy(ctx context.Context, in *internalpb.Lis
return &internalpb.ListPolicyResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil
}

func (m *mockRootCoordClient) TruncateCollection(ctx context.Context, req *milvuspb.DropCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}

func (m *mockRootCoordClient) DescribeCollectionWithState(ctx context.Context, req *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*rootcoordpb.DescribeCollectionResponse, error) {
// return not exist
if req.CollectionID == -1 {
err := merr.WrapErrCollectionNotFound(req.GetCollectionID())
return &rootcoordpb.DescribeCollectionResponse{
Status: merr.Status(err),
}, nil
}
return &rootcoordpb.DescribeCollectionResponse{
Status: merr.Success(),
Schema: &schemapb.CollectionSchema{
Name: "test",
},
CollectionID: 1314,
VirtualChannelNames: []string{"vchan1"},
}, nil
}

type mockHandler struct {
meta *meta
}
Expand Down
6 changes: 6 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,3 +1242,9 @@ func (s *Server) InvalidateShardLeaderCache(ctx context.Context, req *proxypb.In
func (s *Server) DescribeDatabase(ctx context.Context, req *milvuspb.DescribeDatabaseRequest) (*milvuspb.DescribeDatabaseResponse, error) {
return s.proxy.DescribeDatabase(ctx, req)
}

func (s *Server) TruncateCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
return nil, nil
// todo milvuspb.MilvusServiceServer
// return s.proxy.TruncateCollection(ctx, request)
}
24 changes: 24 additions & 0 deletions internal/distributed/rootcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,27 @@ func (c *Client) AlterDatabase(ctx context.Context, request *rootcoordpb.AlterDa
return client.AlterDatabase(ctx, request)
})
}

// TruncateCollection drop collection
func (c *Client) TruncateCollection(ctx context.Context, in *milvuspb.DropCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
return client.TruncateCollection(ctx, in)
})
}

// DescribeCollectionWithState return collection info
func (c *Client) DescribeCollectionWithState(ctx context.Context, in *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*rootcoordpb.DescribeCollectionResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.DescribeCollectionResponse, error) {
return client.DescribeCollectionWithState(ctx, in)
})
}
10 changes: 10 additions & 0 deletions internal/distributed/rootcoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,3 +528,13 @@ func (s *Server) AlterCollection(ctx context.Context, request *milvuspb.AlterCol
func (s *Server) RenameCollection(ctx context.Context, request *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
return s.rootCoord.RenameCollection(ctx, request)
}

// TruncateCollection drops a collection
func (s *Server) TruncateCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
return s.rootCoord.TruncateCollection(ctx, in)
}

// DescribeCollectionWithState gets meta info of a collection
func (s *Server) DescribeCollectionWithState(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*rootcoordpb.DescribeCollectionResponse, error) {
return s.rootCoord.DescribeCollectionWithState(ctx, in)
}
1 change: 1 addition & 0 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type RootCoordCatalog interface {
// List all user role pair in string for the tenant
// For example []string{"user1/role1"}
ListUserRole(ctx context.Context, tenant string) ([]string, error)
UpdateCollectionAndAlias(ctx context.Context, collections []*model.Collection, aliases []*model.Alias, ts typeutil.Timestamp) error

Close()
}
Expand Down
39 changes: 39 additions & 0 deletions internal/metastore/kv/rootcoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,45 @@ func (kc *Catalog) ListUserRole(ctx context.Context, tenant string) ([]string, e
return userRoles, nil
}

func (kc *Catalog) UpdateCollectionAndAlias(ctx context.Context, collections []*model.Collection, aliases []*model.Alias, ts typeutil.Timestamp) error {
if len(collections) == 0 && len(aliases) == 0 {
log.Ctx(ctx).Debug("there is no collection and alias to update")
return nil
}
kvMap := make(map[string]string, 0)
toRemoveKeys := make([]string, 0)
for i, alias := range aliases {
oldKBefore210 := BuildAliasKey210(alias.Name)
oldKeyWithoutDb := BuildAliasKey(alias.Name)
toRemoveKeys = append(toRemoveKeys, oldKBefore210, oldKeyWithoutDb)
k := BuildAliasKeyWithDB(alias.DbID, alias.Name)
aliasInfo := model.MarshalAliasModel(alias)
v, err := proto.Marshal(aliasInfo)
if err != nil {
return fmt.Errorf("failed to marshal alias(%s) info: %s", alias.Name, err.Error())
}
kvMap[k] = string(v)
log.Ctx(ctx).Debug("update an alias, need to remove old format alias",
zap.String("key", k), zap.String("value", string(v)),
zap.String("oldKBefore210", oldKBefore210), zap.String("oldKeyWithoutDb", oldKeyWithoutDb),
zap.Int("total", len(aliases)), zap.Int("no.", i))
}
for i, collection := range collections {
key := BuildCollectionKey(collection.DBID, collection.CollectionID)
collInfo := model.MarshalCollectionModel(collection)
value, err := proto.Marshal(collInfo)
if err != nil {
return fmt.Errorf("failed to marshal collection(%s) info: %s", collection.Name, err.Error())
}
kvMap[key] = string(value)
log.Ctx(ctx).Debug("update an collection",
zap.String("key", key), zap.String("value", string(value)),
zap.Int("total", len(collections)), zap.Int("no.", i))
}

return kc.Snapshot.MultiSaveAndRemove(kvMap, toRemoveKeys, ts)
}

func (kc *Catalog) Close() {
// do nothing
}
Expand Down
24 changes: 24 additions & 0 deletions internal/metastore/kv/rootcoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2570,3 +2570,27 @@ func TestCatalog_AlterDatabase(t *testing.T) {
err = c.AlterDatabase(ctx, newDB, typeutil.ZeroTimestamp)
assert.ErrorIs(t, err, mockErr)
}

func TestCatalog_UpdateCollectionAndAlias(t *testing.T) {
kvmock := mocks.NewSnapShotKV(t)
kvmock.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
c := &Catalog{Snapshot: kvmock}
ctx := context.Background()
err := c.UpdateCollectionAndAlias(ctx, []*model.Collection{}, []*model.Alias{}, uint64(1))
assert.NoError(t, err)
err = c.UpdateCollectionAndAlias(ctx, []*model.Collection{
{
CollectionID: 1,
Name: "tempColl",
State: pb.CollectionState_CollectionDropping,
},
{
CollectionID: 0,
Name: "coll",
State: pb.CollectionState_CollectionCreated,
},
}, []*model.Alias{
{Name: "collAlias0"},
}, uint64(1))
assert.NoError(t, err)
}
45 changes: 45 additions & 0 deletions internal/metastore/mocks/mock_rootcoord_catalog.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions internal/mocks/mock_proxy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ebff870

Please sign in to comment.