Skip to content

Commit

Permalink
enhance: Seal by total growing segments size (#34692)
Browse files Browse the repository at this point in the history
Seals the largest growing segment of a shard if the total size of that
shard's growing segments exceeds the size threshold (default 4 GB).
Introducing this policy helps keep the size of growing segments at a
suitable level, alleviating the pressure on the delegator.

issue: #34554

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Jul 17, 2024
1 parent e56ab76 commit 4939f82
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 9 deletions.
3 changes: 3 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ dataCoord:
# The max number of binlog file for one segment, the segment will be sealed if
# the number of binlog file reaches to max value.
maxBinlogFileNumber: 32
# The size threshold in MB, if the total size of growing segments
# exceeds this threshold, the largest growing segment will be sealed.
totalGrowingSizeThresholdInMB: 4096
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
# (smallProportion * segment max # of rows).
# A compaction will happen on small segments if the segment after compaction will have
Expand Down
36 changes: 32 additions & 4 deletions internal/datacoord/segment_allocation_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
Expand Down Expand Up @@ -172,20 +173,47 @@ func sealL1SegmentByIdleTime(idleTimeTolerance time.Duration, minSizeToSealIdleS
}

// channelSealPolicy seal policy applies to channel
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) ([]*SegmentInfo, string)

// getChannelOpenSegCapacityPolicy get channelSealPolicy with channel segment capacity policy
func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo {
return func(channel string, segs []*SegmentInfo, ts Timestamp) ([]*SegmentInfo, string) {
if len(segs) <= limit {
return []*SegmentInfo{}
return []*SegmentInfo{}, ""
}
sortSegmentsByLastExpires(segs)
offLen := len(segs) - limit
if offLen > len(segs) {
offLen = len(segs)
}
return segs[0:offLen]
return segs[0:offLen], fmt.Sprintf("seal by channel segment capacity, len(segs)=%d, limit=%d", len(segs), limit)
}
}

// sealByTotalGrowingSegmentsSize returns a channelSealPolicy that seals the
// largest growing segment once the total size of the growing segments passed
// to it reaches DataCoordCfg.TotalGrowingSizeThresholdInMB.
//
// The policy returns at most one segment together with a reason string meant
// for logging. It returns (nil, "") when there is no growing segment or when
// the total size is below the threshold. The channel and ts arguments are not
// consulted by this policy.
func sealByTotalGrowingSegmentsSize() channelSealPolicy {
	return func(channel string, segments []*SegmentInfo, ts Timestamp) ([]*SegmentInfo, string) {
		growingSegments := lo.Filter(segments, func(segment *SegmentInfo, _ int) bool {
			return segment != nil && segment.GetState() == commonpb.SegmentState_Growing
		})
		// Nothing to seal. Without this guard a threshold of 0 would satisfy
		// totalSize >= threshold with no candidates, and a nil *SegmentInfo
		// would be handed back to the caller.
		if len(growingSegments) == 0 {
			return nil, ""
		}

		// Single pass: accumulate the total and remember the first segment
		// with the largest size (ties keep the earliest one).
		var (
			totalSize  int64
			target     *SegmentInfo
			targetSize int64
		)
		for _, segment := range growingSegments {
			size := segment.getSegmentSize()
			totalSize += size
			if target == nil || size > targetSize {
				target, targetSize = segment, size
			}
		}

		// The configured value is in MB; convert it to bytes before comparing.
		threshold := paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.GetAsInt64() * 1024 * 1024
		if totalSize < threshold {
			return nil, ""
		}
		return []*SegmentInfo{target}, fmt.Sprintf("seal by total growing segments size, "+
			"totalSize=%d, threshold=%d", totalSize, threshold)
	}
}

Expand Down
37 changes: 36 additions & 1 deletion internal/datacoord/segment_allocation_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

Expand Down Expand Up @@ -132,7 +133,7 @@ func TestGetChannelOpenSegCapacityPolicy(t *testing.T) {
},
}
for _, c := range testCases {
result := p(c.channel, c.segments, c.ts)
result, _ := p(c.channel, c.segments, c.ts)
if c.validator != nil {
assert.True(t, c.validator(result))
}
Expand Down Expand Up @@ -195,3 +196,37 @@ func Test_sealLongTimeIdlePolicy(t *testing.T) {
shouldSeal, _ = policy.ShouldSeal(seg3, 100)
assert.True(t, shouldSeal)
}

// Test_sealByTotalGrowingSegmentsSize checks the total-growing-size channel
// seal policy with the threshold overridden to 100 (MB): nothing is returned
// while the growing total stays below the threshold, and only the largest
// growing segment is returned once the total reaches it.
func Test_sealByTotalGrowingSegmentsSize(t *testing.T) {
	thresholdKey := paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.Key
	paramtable.Get().Save(thresholdKey, "100")
	defer paramtable.Get().Reset(thresholdKey)

	// growing builds a Growing segment that carries a single binlog.
	growing := func(id int64, binlog *datapb.Binlog) *SegmentInfo {
		return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
			ID:      id,
			State:   commonpb.SegmentState_Growing,
			Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
		}}
	}

	smallest := growing(0, &datapb.Binlog{MemorySize: 30 * MB})
	middle := growing(1, &datapb.Binlog{MemorySize: 40 * MB})
	largest := growing(2, &datapb.Binlog{MemorySize: 50 * MB})
	// A sealed segment without binlogs; the policy is expected to ignore it.
	sealed := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
		ID:    3,
		State: commonpb.SegmentState_Sealed,
	}}

	policy := sealByTotalGrowingSegmentsSize()

	// size not reach threshold
	belowThreshold, _ := policy("ch-0", []*SegmentInfo{smallest}, 0)
	assert.Equal(t, 0, len(belowThreshold))

	// size reached the threshold
	reachedThreshold, _ := policy("ch-0", []*SegmentInfo{smallest, middle, largest, sealed}, 0)
	assert.Equal(t, 1, len(reachedThreshold))
	assert.Equal(t, largest.GetID(), reachedThreshold[0].GetID())
}
2 changes: 1 addition & 1 deletion internal/datacoord/segment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func SetLevel(level datapb.SegmentLevel) SegmentInfoOption {
}

func (s *SegmentInfo) getSegmentSize() int64 {
if s.size.Load() <= 0 {
if s.size.Load() <= 0 || s.GetState() == commonpb.SegmentState_Growing {
var size int64
for _, binlogs := range s.GetBinlogs() {
for _, l := range binlogs.GetBinlogs() {
Expand Down
20 changes: 17 additions & 3 deletions internal/datacoord/segment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ func defaultSegmentSealPolicy() []SegmentSealPolicy {
}
}

// defaultChannelSealPolicy returns the channel-level seal policies installed
// by default: currently only the total-growing-segments-size policy.
func defaultChannelSealPolicy() []channelSealPolicy {
	policies := make([]channelSealPolicy, 0, 1)
	policies = append(policies, sealByTotalGrowingSegmentsSize())
	return policies
}

func defaultFlushPolicy() flushPolicy {
return flushPolicyL1
}
Expand All @@ -211,8 +217,8 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*S
segments: make([]UniqueID, 0),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAllocatePolicy(),
segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
segmentSealPolicies: defaultSegmentSealPolicy(),
channelSealPolicies: defaultChannelSealPolicy(),
flushPolicy: defaultFlushPolicy(),
}
for _, opt := range opts {
Expand Down Expand Up @@ -644,6 +650,7 @@ func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool {
// tryToSealSegment applies segment & channel seal policies
func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
channelInfo := make(map[string][]*SegmentInfo)
sealedSegments := make(map[int64]struct{})
for _, id := range s.segments {
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
Expand All @@ -660,20 +667,27 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return err
}
sealedSegments[id] = struct{}{}
break
}
}
}
for channel, segmentInfos := range channelInfo {
for _, policy := range s.channelSealPolicies {
vs := policy(channel, segmentInfos, ts)
vs, reason := policy(channel, segmentInfos, ts)
for _, info := range vs {
if _, ok := sealedSegments[info.GetID()]; ok {
continue
}
if info.State != commonpb.SegmentState_Growing {
continue
}
if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil {
return err
}
log.Info("seal segment for channel seal policy matched",
zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason))
sealedSegments[info.GetID()] = struct{}{}
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2881,6 +2881,7 @@ type dataCoordConfig struct {
SegmentMaxIdleTime ParamItem `refreshable:"false"`
SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"`
TotalGrowingSizeThresholdInMB ParamItem `refreshable:"true"`
AutoUpgradeSegmentIndex ParamItem `refreshable:"true"`
SegmentFlushInterval ParamItem `refreshable:"true"`

Expand Down Expand Up @@ -3117,6 +3118,16 @@ the number of binlog file reaches to max value.`,
}
p.SegmentMaxBinlogFileNumber.Init(base.mgr)

p.TotalGrowingSizeThresholdInMB = ParamItem{
Key: "dataCoord.segment.totalGrowingSizeThresholdInMB",
Version: "2.4.6",
DefaultValue: "4096",
Doc: `The size threshold in MB, if the total size of growing segments
exceeds this threshold, the largest growing segment will be sealed.`,
Export: true,
}
p.TotalGrowingSizeThresholdInMB.Init(base.mgr)

p.EnableCompaction = ParamItem{
Key: "dataCoord.enableCompaction",
Version: "2.0.0",
Expand Down
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func TestComponentParam(t *testing.T) {
assert.True(t, Params.EnableGarbageCollection.GetAsBool())
assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false)
t.Logf("dataCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool())
assert.Equal(t, int64(4096), Params.TotalGrowingSizeThresholdInMB.GetAsInt64())

assert.Equal(t, true, Params.AutoBalance.GetAsBool())
assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt())
Expand Down
138 changes: 138 additions & 0 deletions tests/integration/sealpolicies/seal_by_total_growing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sealpolicies

import (
"context"
"time"

"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)

// TestSealByTotalGrowingSegmentsSize overrides the total-growing-size
// threshold to "10" and the DataNode sync period to "5", creates a collection
// with a float-vector field and an int64 partition key, inserts rowNum rows,
// and then polls the meta watcher once per second until at least one segment
// is reported as Flushed (or the 10-minute context expires). The collection is
// released and dropped afterwards.
func (s *SealSuite) TestSealByTotalGrowingSegmentsSize() {
	thresholdKey := paramtable.Get().DataCoordCfg.TotalGrowingSizeThresholdInMB.Key
	paramtable.Get().Save(thresholdKey, "10")
	defer paramtable.Get().Reset(thresholdKey)

	syncPeriodKey := paramtable.Get().DataNodeCfg.SyncPeriod.Key
	paramtable.Get().Save(syncPeriodKey, "5")
	defer paramtable.Get().Reset(syncPeriodKey)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()
	cluster := s.Cluster

	const (
		dim    = 128
		dbName = ""
		rowNum = 100000

		vecType = schemapb.DataType_FloatVector
	)

	collectionName := "TestSealByGrowingSegmentsSize_" + funcutil.GenRandomStr()

	// Build the schema and append an int64 partition-key field to it.
	schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, true, vecType)
	partitionKeyField := &schemapb.FieldSchema{
		FieldID:        102,
		Name:           "pid",
		DataType:       schemapb.DataType_Int64,
		IsPartitionKey: true,
	}
	schema.Fields = append(schema.Fields, partitionKeyField)
	marshaledSchema, err := proto.Marshal(schema)
	s.NoError(err)

	// create collection
	createReq := &milvuspb.CreateCollectionRequest{
		DbName:           dbName,
		CollectionName:   collectionName,
		Schema:           marshaledSchema,
		ShardsNum:        common.DefaultShardsNum,
		ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
	}
	createCollectionStatus, err := cluster.Proxy.CreateCollection(ctx, createReq)
	err = merr.CheckRPCCall(createCollectionStatus, err)
	s.NoError(err)
	log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))

	// show collection
	showCollectionsResp, err := cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
	err = merr.CheckRPCCall(showCollectionsResp, err)
	s.NoError(err)
	log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))

	// insert
	vectorColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
	pidColumn := integration.NewInt64FieldDataWithStart("pid", rowNum, 0)
	insertReq := &milvuspb.InsertRequest{
		DbName:         dbName,
		CollectionName: collectionName,
		FieldsData:     []*schemapb.FieldData{vectorColumn, pidColumn},
		HashKeys:       integration.GenerateHashKeys(rowNum),
		NumRows:        uint32(rowNum),
	}
	insertResult, err := cluster.Proxy.Insert(ctx, insertReq)
	err = merr.CheckRPCCall(insertResult, err)
	s.NoError(err)
	s.Equal(int64(rowNum), insertResult.GetInsertCnt())

	// wait for segment seal and flush
	isFlushed := func(segment *datapb.SegmentInfo, _ int) bool {
		return segment.GetState() == commonpb.SegmentState_Flushed
	}
	// hasFlushedSegment lists all segments and reports whether any is Flushed.
	hasFlushedSegment := func() bool {
		var segments []*datapb.SegmentInfo
		segments, err = cluster.MetaWatcher.ShowSegments()
		s.NoError(err)
		s.NotEmpty(segments)
		flushedSegments := lo.Filter(segments, isFlushed)
		log.Info("ShowSegments result", zap.Int("len(segments)", len(segments)),
			zap.Int("len(flushedSegments)", len(flushedSegments)))
		return len(flushedSegments) >= 1
	}
	for {
		if hasFlushedSegment() {
			break
		}
		select {
		case <-ctx.Done():
			s.Fail("waiting for segment sealed timeout")
			return
		case <-time.After(1 * time.Second):
		}
	}

	// release collection
	status, err := cluster.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
		CollectionName: collectionName,
	})
	err = merr.CheckRPCCall(status, err)
	s.NoError(err)

	// drop collection
	status, err = cluster.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
		CollectionName: collectionName,
	})
	err = merr.CheckRPCCall(status, err)
	s.NoError(err)
}
Loading

0 comments on commit 4939f82

Please sign in to comment.