Skip to content

Commit

Permalink
fix: predict inverted index resource usage more reasonably (milvus-io…
Browse files Browse the repository at this point in the history
…#31615)

/kind improvement
issue: milvus-io#31617

---------

Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan authored Mar 27, 2024
1 parent 655097f commit 4eb4df1
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 11 deletions.
3 changes: 2 additions & 1 deletion internal/core/src/segcore/load_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@

bool
IsLoadWithDisk(const char* index_type, int index_engine_version) {
return knowhere::UseDiskLoad(index_type, index_engine_version);
return knowhere::UseDiskLoad(index_type, index_engine_version) ||
strcmp(index_type, milvus::index::INVERTED_INDEX_TYPE) == 0;
}

CStatus
Expand Down
7 changes: 6 additions & 1 deletion internal/core/unittest/test_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "expr/ITypeExpr.h"
#include "plan/PlanNode.h"
#include "exec/expression/Expr.h"
#include "segcore/load_index_c.h"

namespace chrono = std::chrono;

Expand Down Expand Up @@ -5272,4 +5273,8 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) {
DeleteSearchResult(search_result);
DeleteCollection(c_collection);
DeleteSegment(segment);
}
}

// Checks that IsLoadWithDisk reports true for the inverted index type
// (queried here with index engine version 0).
TEST(CApiTest, IsLoadWithDisk) {
    const bool load_with_disk = IsLoadWithDisk(INVERTED_INDEX_TYPE, 0);
    ASSERT_TRUE(load_with_disk);
}
5 changes: 3 additions & 2 deletions internal/proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand All @@ -65,10 +66,10 @@ const (
defaultMaxSearchRequest = 1024

// DefaultArithmeticIndexType name of default index type for scalar field
DefaultArithmeticIndexType = "INVERTED"
DefaultArithmeticIndexType = indexparamcheck.IndexINVERTED

// DefaultStringIndexType name of default index type for varChar/string field
DefaultStringIndexType = "INVERTED"
DefaultStringIndexType = indexparamcheck.IndexINVERTED

defaultRRFParamsValue = 60
maxRRFParamsValue = 16384
Expand Down
9 changes: 8 additions & 1 deletion internal/querynodev2/segments/index_attr_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
"unsafe"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/conc"
Expand Down Expand Up @@ -54,7 +55,7 @@ func NewIndexAttrCache() *IndexAttrCache {
}
}

func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64) (memory uint64, disk uint64, err error) {
func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64, fieldBinlog *datapb.FieldBinlog) (memory uint64, disk uint64, err error) {
indexType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.IndexTypeKey, indexInfo.IndexParams)
if err != nil {
return 0, 0, fmt.Errorf("index type not exist in index params")
Expand All @@ -64,6 +65,12 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo
neededDiskSize := indexInfo.IndexSize - neededMemSize
return uint64(neededMemSize), uint64(neededDiskSize), nil
}
if indexType == indexparamcheck.IndexINVERTED {
neededMemSize := 0
// For an inverted index the field's binlog is mmapped, so its size is added to the disk estimate on top of the index size.
neededDiskSize := indexInfo.IndexSize + getBinlogDataSize(fieldBinlog)
return uint64(neededMemSize), uint64(neededDiskSize), nil
}

engineVersion := indexInfo.GetCurrentIndexVersion()
isLoadWithDisk, has := c.loadWithDisk.Get(typeutil.NewPair(indexType, engineVersion))
Expand Down
31 changes: 26 additions & 5 deletions internal/querynodev2/segments/index_attr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
Expand Down Expand Up @@ -51,7 +52,7 @@ func (s *IndexAttrCacheSuite) TestCacheMissing() {
CurrentIndexVersion: 0,
}

_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)

_, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32]("test", 0))
Expand All @@ -67,7 +68,7 @@ func (s *IndexAttrCacheSuite) TestDiskANN() {
IndexSize: 100,
}

memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)

_, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32](indexparamcheck.IndexDISKANN, 0))
Expand All @@ -77,6 +78,26 @@ func (s *IndexAttrCacheSuite) TestDiskANN() {
s.EqualValues(75, disk)
}

// TestInvertedIndex exercises GetIndexResourceUsage with an INVERTED index
// and a field binlog. It asserts that no memory is predicted and that the
// predicted disk usage is 110, i.e. the index size (50) plus the binlog
// log size (60).
func (s *IndexAttrCacheSuite) TestInvertedIndex() {
	const (
		indexSize  = 50
		binlogSize = 60
	)

	indexParams := []*commonpb.KeyValuePair{
		{Key: common.IndexTypeKey, Value: indexparamcheck.IndexINVERTED},
	}
	indexInfo := &querypb.FieldIndexInfo{
		IndexParams:         indexParams,
		CurrentIndexVersion: 0,
		IndexSize:           indexSize,
	}
	fieldBinlog := &datapb.FieldBinlog{
		Binlogs: []*datapb.Binlog{{LogSize: binlogSize}},
	}

	factor := paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()
	memUsage, diskUsage, err := s.c.GetIndexResourceUsage(indexInfo, factor, fieldBinlog)
	s.Require().NoError(err)

	// Everything is expected on disk, nothing in memory.
	s.EqualValues(uint64(0), memUsage)
	s.EqualValues(uint64(indexSize+binlogSize), diskUsage)
}

func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
info := &querypb.FieldIndexInfo{
IndexParams: []*commonpb.KeyValuePair{
Expand All @@ -88,7 +109,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {

s.Run("load_with_disk", func() {
s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), true)
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)

s.EqualValues(100, memory)
Expand All @@ -97,7 +118,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {

s.Run("load_with_disk", func() {
s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), false)
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)

s.Equal(uint64(250), memory)
Expand All @@ -109,7 +130,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
IndexParams: []*commonpb.KeyValuePair{},
}

_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Error(err)
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
var mmapEnabled bool
if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok {
mmapEnabled = isIndexMmapEnable(fieldIndexInfo)
neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor)
neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor, fieldBinlog)
if err != nil {
return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d",
loadInfo.GetCollectionID(),
Expand Down
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2469,6 +2469,7 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
Doc: "memory usage prediction factor for memory index loaded",
}
p.MemoryIndexLoadPredictMemoryUsageFactor.Init(base.mgr)

p.EnableSegmentPrune = ParamItem{
Key: "queryNode.enableSegmentPrune",
Version: "2.3.4",
Expand Down

0 comments on commit 4eb4df1

Please sign in to comment.