From 9fde735855c89fc96c0fe2696a499f35695f8bb1 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 2 Dec 2024 12:01:50 +0800 Subject: [PATCH] enhance: Reduce memory usage of BF in DataNode and QueryNode Signed-off-by: bigsheeper --- .../datanode/writebuffer/l0_write_buffer.go | 22 ++++++++++--------- .../writebuffer/l0_write_buffer_test.go | 2 -- internal/querynodev2/segments/segment.go | 15 +++++++++++++ pkg/util/paramtable/component_param.go | 10 +++++++++ pkg/util/paramtable/component_param_test.go | 2 ++ 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index 1a9966020f5a4..732867993dd95 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -165,23 +165,25 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() { // In Skip BF mode, datanode no longer maintains bloom filters. - // So, here we skip filtering delete entries. + // So, here we skip generating BF (growing segment's BF will be regenerated during the sync phase) + // and also skip filtering delete entries by bf. 
wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos) } else { // distribute delete msg // bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos) + + // update pk oracle + for _, inData := range groups { + // segment shall always exist after buffer insert + segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) + for _, segment := range segments { + for _, fieldData := range inData.pkField { + err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) + if err != nil { + return err } - // update pk oracle - for _, inData := range groups { - // segment shall always exists after buffer insert - segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) - for _, segment := range segments { - for _, fieldData := range inData.pkField { - err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) - if err != nil { - return err } } } diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go b/internal/datanode/writebuffer/l0_write_buffer_test.go index 5d79a25b40203..7c39fabf926f3 100644 --- a/internal/datanode/writebuffer/l0_write_buffer_test.go +++ b/internal/datanode/writebuffer/l0_write_buffer_test.go @@ -181,8 +181,6 @@ func (s *L0WriteBufferSuite) TestBufferData() { pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once() s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything,
mock.Anything).Return() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 00509135cb8d2..24a4e41db6f70 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -94,6 +94,7 @@ type baseSegment struct { bloomFilterSet *pkoracle.BloomFilterSet loadInfo *atomic.Pointer[querypb.SegmentLoadInfo] isLazyLoad bool + skipGrowingBF bool // Skip generating or maintaining BF for growing segments; deletion checks will be handled in segcore. channel metautil.Channel resourceUsageCache *atomic.Pointer[ResourceUsage] @@ -114,6 +115,7 @@ func newBaseSegment(collection *Collection, segmentType SegmentType, version int bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType), channel: channel, isLazyLoad: isLazyLoad(collection, segmentType), + skipGrowingBF: segmentType == SegmentTypeGrowing && paramtable.Get().QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool(), resourceUsageCache: atomic.NewPointer[ResourceUsage](nil), needUpdatedVersion: atomic.NewInt64(0), @@ -183,6 +185,9 @@ func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo { } func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { + if s.skipGrowingBF { + return + } s.bloomFilterSet.UpdateBloomFilter(pks) } @@ -190,10 +195,20 @@ func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { // false otherwise, // may returns true even the PK doesn't exist actually func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool { + if s.skipGrowingBF { + return true + } return s.bloomFilterSet.MayPkExist(pk) } func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + if s.skipGrowingBF { + allPositive := make([]bool, lc.Size()) + for i := 0; i < lc.Size(); i++ { + allPositive[i] = true + } + return allPositive + } return s.bloomFilterSet.BatchPkExist(lc) } diff --git a/pkg/util/paramtable/component_param.go 
b/pkg/util/paramtable/component_param.go index 86467ac47488c..2d9742a189d9d 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2448,6 +2448,8 @@ type queryNodeConfig struct { DefaultSegmentFilterRatio ParamItem `refreshable:"false"` UseStreamComputing ParamItem `refreshable:"false"` + // BF + SkipGrowingSegmentBF ParamItem `refreshable:"true"` BloomFilterApplyParallelFactor ParamItem `refreshable:"true"` QueryStreamBatchSize ParamItem `refreshable:"false"` @@ -3144,6 +3146,14 @@ user-task-polling: } p.QueryStreamBatchSize.Init(base.mgr) + p.SkipGrowingSegmentBF = ParamItem{ + Key: "queryNode.skipGrowingSegmentBF", + Version: "2.5", + DefaultValue: "true", + Doc: "indicates whether to skip the creation, maintenance, or checking of Bloom Filters for growing segments", + } + p.SkipGrowingSegmentBF.Init(base.mgr) + p.WorkerPoolingSize = ParamItem{ Key: "queryNode.workerPooling.size", Version: "2.4.7", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 3e5b77504e6f1..4b625c42be26c 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -456,6 +456,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond)) assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) + assert.Equal(t, true, Params.SkipGrowingSegmentBF.GetAsBool()) + assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue()) assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())