Skip to content

Commit

Permalink
enhance: Enable database level replica num and resource groups for lo…
Browse files Browse the repository at this point in the history
…ading collection (#33052) (#33981)

pr: #33052

issue: #30040

This PR introduce two database level props:
1. database.replica.number
2. database.resource_groups

User can set those two database props by AlterDatabase API, then can
load collection without specified replica_num and resource groups. then
it will use database level load param when try to load collections.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Jun 21, 2024
1 parent 89461db commit 061a00c
Show file tree
Hide file tree
Showing 10 changed files with 575 additions and 10 deletions.
5 changes: 0 additions & 5 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1548,11 +1548,6 @@ func (t *loadCollectionTask) PreExecute(ctx context.Context) error {
return err
}

// To compat with LoadCollcetion before [email protected]
if t.ReplicaNumber == 0 {
t.ReplicaNumber = 1
}

return nil
}

Expand Down
46 changes: 46 additions & 0 deletions internal/querycoordv2/meta/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
Expand All @@ -47,6 +49,8 @@ type Broker interface {
GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) (*datapb.GetSegmentInfoResponse, error)
GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error)
GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)
DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error)
GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error)
}

type CoordinatorBroker struct {
Expand Down Expand Up @@ -83,6 +87,48 @@ func (broker *CoordinatorBroker) DescribeCollection(ctx context.Context, collect
return resp, nil
}

func (broker *CoordinatorBroker) DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()

req := &rootcoordpb.DescribeDatabaseRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
),
DbName: dbName,
}
resp, err := broker.rootCoord.DescribeDatabase(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Ctx(ctx).Warn("failed to describe database", zap.Error(err))
return nil, err
}
return resp, nil
}

// try to get database level replica_num and resource groups, return (resource_groups, replica_num, error)
func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) {
// to do by weiliu1031: querycoord should cache mappings: collectionID->dbName
collectionInfo, err := broker.DescribeCollection(ctx, collectionID)
if err != nil {
return nil, 0, err
}

dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
if err != nil {
return nil, 0, err
}
replicaNum, err := common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
if err != nil {
return nil, 0, err
}
rgs, err := common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
return nil, 0, err
}

return rgs, replicaNum, nil
}

func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
Expand Down
87 changes: 87 additions & 0 deletions internal/querycoordv2/meta/coordinator_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package meta

import (
"context"
"strings"
"testing"

"github.com/cockroachdb/errors"
Expand All @@ -32,6 +33,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down Expand Up @@ -490,6 +493,90 @@ func (s *CoordinatorBrokerDataCoordSuite) TestGetIndexInfo() {
})
}

func (s *CoordinatorBrokerRootCoordSuite) TestDescribeDatabase() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("normal_case", func() {
s.rootcoord.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).
Return(&rootcoordpb.DescribeDatabaseResponse{
Status: merr.Success(),
}, nil)
_, err := s.broker.DescribeDatabase(ctx, "fake_db1")
s.NoError(err)
s.resetMock()
})

s.Run("rootcoord_return_error", func() {
s.rootcoord.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).Return(nil, errors.New("fake error"))
_, err := s.broker.DescribeDatabase(ctx, "fake_db1")
s.Error(err)
s.resetMock()
})

s.Run("rootcoord_return_failure_status", func() {
s.rootcoord.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).
Return(&rootcoordpb.DescribeDatabaseResponse{
Status: merr.Status(errors.New("fake error")),
}, nil)
_, err := s.broker.DescribeDatabase(ctx, "fake_db1")
s.Error(err)
s.resetMock()
})

s.Run("rootcoord_return_unimplemented", func() {
s.rootcoord.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).Return(nil, merr.ErrServiceUnimplemented)
_, err := s.broker.DescribeDatabase(ctx, "fake_db1")
s.Error(err)
s.resetMock()
})
}

func (s *CoordinatorBrokerRootCoordSuite) TestGetCollectionLoadInfo() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("normal_case", func() {
s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
DbName: "fake_db1",
}, nil)
s.rootcoord.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).
Return(&rootcoordpb.DescribeDatabaseResponse{
Status: merr.Success(),
Properties: []*commonpb.KeyValuePair{
{
Key: common.DatabaseReplicaNumber,
Value: "3",
},
{
Key: common.DatabaseResourceGroups,
Value: strings.Join([]string{"rg1", "rg2"}, ","),
},
},
}, nil)
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, 1)
s.NoError(err)
s.Equal(int64(3), replicas)
s.Contains(rgs, "rg1")
s.Contains(rgs, "rg2")
s.resetMock()
})

s.Run("props not set", func() {
s.rootcoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
DbName: "fake_db1",
}, nil)
s.rootcoord.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).
Return(&rootcoordpb.DescribeDatabaseResponse{
Status: merr.Success(),
Properties: []*commonpb.KeyValuePair{},
}, nil)
_, _, err := s.broker.GetCollectionLoadInfo(ctx, 1)
s.Error(err)
s.resetMock()
})
}

func TestCoordinatorBroker(t *testing.T) {
suite.Run(t, new(CoordinatorBrokerRootCoordSuite))
suite.Run(t, new(CoordinatorBrokerDataCoordSuite))
Expand Down
119 changes: 119 additions & 0 deletions internal/querycoordv2/meta/mock_broker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,17 +436,19 @@ func (suite *ServerSuite) loadAll() {
for _, collection := range suite.collections {
if suite.loadTypes[collection] == querypb.LoadType_LoadCollection {
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
ReplicaNumber: suite.replicaNumber[collection],
CollectionID: collection,
ReplicaNumber: suite.replicaNumber[collection],
ResourceGroups: []string{meta.DefaultResourceGroupName},
}
resp, err := suite.server.LoadCollection(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
} else {
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
ReplicaNumber: suite.replicaNumber[collection],
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
ReplicaNumber: suite.replicaNumber[collection],
ResourceGroups: []string{meta.DefaultResourceGroupName},
}
resp, err := suite.server.LoadPartitions(ctx, req)
suite.NoError(err)
Expand Down
36 changes: 36 additions & 0 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,24 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
return merr.Status(err), nil
}

if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use database level config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to get data base level load info", zap.Error(err))
}

if req.GetReplicaNumber() <= 0 {
log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas))
req.ReplicaNumber = int32(replicas)
}

if len(req.GetResourceGroups()) == 0 {
log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs))
req.ResourceGroups = rgs
}
}

if err := s.checkResourceGroup(req.GetCollectionID(), req.GetResourceGroups()); err != nil {
msg := "failed to load collection"
log.Warn(msg, zap.Error(err))
Expand Down Expand Up @@ -316,6 +334,24 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
return merr.Status(err), nil
}

if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use database level config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to get data base level load info", zap.Error(err))
}

if req.GetReplicaNumber() <= 0 {
log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas))
req.ReplicaNumber = int32(replicas)
}

if len(req.GetResourceGroups()) == 0 {
log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs))
req.ResourceGroups = rgs
}
}

if err := s.checkResourceGroup(req.GetCollectionID(), req.GetResourceGroups()); err != nil {
msg := "failed to load partitions"
log.Warn(msg, zap.Error(err))
Expand Down
Loading

0 comments on commit 061a00c

Please sign in to comment.