Skip to content

Commit

Permalink
enhance: Optimize DescribeIndex to reduce lock contention (milvus-io#…
Browse files Browse the repository at this point in the history
…30939)

issue: milvus-io#29313 
issue: milvus-io#30443

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Mar 3, 2024
1 parent e39f46a commit f6ff258
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 38 deletions.
100 changes: 63 additions & 37 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,12 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
}

indexInfo := &indexpb.IndexInfo{}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false, indexes[0].CreateTime)
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
ret.State = indexInfo.State
ret.FailReason = indexInfo.IndexStateFailReason

Expand Down Expand Up @@ -415,35 +418,38 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
return ret, nil
}

func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*SegmentInfo) int64 {
func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 {
unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]()
for _, seg := range segments {
segIdx, ok := seg.segmentIndexes[indexInfo.IndexID]
for segID, seg := range segments {
if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing {
continue
}
segIdx, ok := seg.indexStates[indexInfo.IndexID]
if !ok {
unIndexed.Insert(seg.GetID())
unIndexed.Insert(segID)
continue
}
switch segIdx.IndexState {
switch segIdx.GetState() {
case commonpb.IndexState_Finished:
indexed.Insert(seg.GetID())
indexed.Insert(segID)
default:
unIndexed.Insert(seg.GetID())
unIndexed.Insert(segID)
}
}
retrieveContinue := len(unIndexed) != 0
for retrieveContinue {
for segID := range unIndexed {
unIndexed.Remove(segID)
segment := s.meta.GetSegment(segID)
if segment == nil || len(segment.CompactionFrom) == 0 {
segment := segments[segID]
if segment == nil || len(segment.compactionFrom) == 0 {
continue
}
for _, fromID := range segment.CompactionFrom {
fromSeg := s.meta.GetSegment(fromID)
for _, fromID := range segment.compactionFrom {
fromSeg := segments[fromID]
if fromSeg == nil {
continue
}
if segIndex, ok := fromSeg.segmentIndexes[indexInfo.IndexID]; ok && segIndex.IndexState == commonpb.IndexState_Finished {
if segIndex, ok := fromSeg.indexStates[indexInfo.IndexID]; ok && segIndex.GetState() == commonpb.IndexState_Finished {
indexed.Insert(fromID)
continue
}
Expand All @@ -454,9 +460,9 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm
}
indexedRows := int64(0)
for segID := range indexed {
segment := s.meta.GetSegment(segID)
segment := segments[segID]
if segment != nil {
indexedRows += segment.GetNumOfRows()
indexedRows += segment.numRows
}
}
return indexedRows
Expand All @@ -465,7 +471,7 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm
// completeIndexInfo fills in the index row counts and the index task state.
// If realTime, it calculates the current statistics.
// If not realTime, it reports the info of the prior `CreateIndex` action and skips segments created after the index's create time.
func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments []*SegmentInfo, realTime bool, ts Timestamp) {
func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments map[int64]*indexStats, realTime bool, ts Timestamp) {
var (
cntNone = 0
cntUnissued = 0
Expand All @@ -478,42 +484,45 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
pendingIndexRows = int64(0)
)

for _, seg := range segments {
totalRows += seg.NumOfRows
segIdx, ok := seg.segmentIndexes[index.IndexID]
for segID, seg := range segments {
if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing {
continue
}
totalRows += seg.numRows
segIdx, ok := seg.indexStates[index.IndexID]

if !ok {
if seg.GetLastExpireTime() <= ts {
if seg.lastExpireTime <= ts {
cntUnissued++
}
pendingIndexRows += seg.GetNumOfRows()
pendingIndexRows += seg.numRows
continue
}
if segIdx.IndexState != commonpb.IndexState_Finished {
pendingIndexRows += seg.GetNumOfRows()
if segIdx.GetState() != commonpb.IndexState_Finished {
pendingIndexRows += seg.numRows
}

// if realTime, calculate current statistics
// if not realTime, skip segments created after index create
if !realTime && seg.GetLastExpireTime() > ts {
if !realTime && seg.lastExpireTime > ts {
continue
}

switch segIdx.IndexState {
switch segIdx.GetState() {
case commonpb.IndexState_IndexStateNone:
// should never get here
log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segIdx.SegmentID))
log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segID))
cntNone++
case commonpb.IndexState_Unissued:
cntUnissued++
case commonpb.IndexState_InProgress:
cntInProgress++
case commonpb.IndexState_Finished:
cntFinished++
indexedRows += seg.NumOfRows
indexedRows += seg.numRows
case commonpb.IndexState_Failed:
cntFailed++
failReason += fmt.Sprintf("%d: %s;", segIdx.SegmentID, segIdx.FailReason)
failReason += fmt.Sprintf("%d: %s;", segID, segIdx.FailReason)
}
}

Expand Down Expand Up @@ -581,9 +590,13 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
PendingIndexRows: 0,
State: 0,
}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false, indexes[0].CreateTime)

// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
return &indexpb.GetIndexBuildProgressResponse{
Expand All @@ -594,6 +607,17 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
}, nil
}

// indexStats is a per-segment snapshot of the fields needed to compute
// indexing statistics. It is produced by meta.SelectSegmentIndexes so that
// the statistics can be computed from the snapshot instead of going back to
// the segment metadata for every lookup.
// It is intended for indexing statistics only; please use it judiciously.
type indexStats struct {
// ID is the segment ID, taken from the segment's GetID().
ID int64
// numRows is the segment's row count, taken from GetNumOfRows().
numRows int64
// compactionFrom lists the IDs of the segments this segment was compacted
// from, taken from GetCompactionFrom(); empty when there are none.
compactionFrom []int64
// indexStates maps an index ID to this segment's state for that index
// (segment ID, index state and fail reason).
indexStates map[int64]*indexpb.SegmentIndexState
// state is the segment state, taken from GetState().
state commonpb.SegmentState
// lastExpireTime is taken from GetLastExpireTime(); the statistics code
// compares it against an index's create timestamp.
lastExpireTime uint64
}

// DescribeIndex describe the index info of the collection.
func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log := log.Ctx(ctx).With(
Expand Down Expand Up @@ -621,9 +645,10 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
}

// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {
indexInfo := &indexpb.IndexInfo{
Expand Down Expand Up @@ -679,9 +704,10 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
}

// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})

indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {
indexInfo := &indexpb.IndexInfo{
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,7 @@ func TestServer_DescribeIndex(t *testing.T) {
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
ID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
Expand Down
28 changes: 28 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/common"
Expand Down Expand Up @@ -1050,6 +1051,33 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
return ret
}

// SelectSegmentIndexes returns a snapshot of the indexing statistics of every
// segment accepted by selector, keyed by segment ID.
//
// The snapshot is built under the meta read lock and contains only copies of
// the fields needed for index statistics, so callers can work on the result
// after the lock is released without touching the live segment metadata.
// The returned map is never nil; it is empty when no segment matches.
func (m *meta) SelectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats {
	m.RLock()
	defer m.RUnlock()

	ret := make(map[int64]*indexStats)
	for _, info := range m.segments.segments {
		if !selector(info) {
			continue
		}

		// compactionFrom is copied rather than shared so the snapshot does not
		// alias the backing array of the lock-protected segment info once the
		// read lock is dropped. The per-index map is pre-sized because its
		// final length is known up front.
		stats := &indexStats{
			ID:             info.GetID(),
			numRows:        info.GetNumOfRows(),
			compactionFrom: append([]int64(nil), info.GetCompactionFrom()...),
			indexStates:    make(map[int64]*indexpb.SegmentIndexState, len(info.segmentIndexes)),
			state:          info.GetState(),
			lastExpireTime: info.GetLastExpireTime(),
		}
		for indexID, segIndex := range info.segmentIndexes {
			stats.indexStates[indexID] = &indexpb.SegmentIndexState{
				SegmentID:  segIndex.SegmentID,
				State:      segIndex.IndexState,
				FailReason: segIndex.FailReason,
			}
		}
		ret[info.GetID()] = stats
	}
	return ret
}

// AddAllocation add allocation in segment
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
log.Debug("meta update: add allocation",
Expand Down

0 comments on commit f6ff258

Please sign in to comment.