enhance: [10kcp] Reduce mutex contention in datacoord meta (#38229)
1. Use a secondary index to avoid retrieving all segments in
GetSegmentsChanPart.
2. Batch SetAllocations calls to reduce the number of times the meta
lock is acquired.

(Standalone sketches of both ideas are included between the diffs below.)

issue: #37630

pr: #38219

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Dec 5, 2024
1 parent 3219b86 commit d75fb5b
Showing 7 changed files with 152 additions and 99 deletions.
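
Before the diffs, here is a minimal, self-contained sketch of the first idea: a secondary index keyed by collection, so collection-scoped queries no longer scan every segment. All names here (metaStore, selectByCollection) are illustrative stand-ins, not the Milvus code; the commit's segmentInfoIndexes.coll2Segments plays the role of coll2Segments below, and most of the test-file churn in this commit exists to keep that index populated.

package main

import "fmt"

// SegmentInfo is a stand-in for datacoord's segment metadata.
type SegmentInfo struct {
	ID           int64
	CollectionID int64
}

// metaStore keeps the primary segment map plus a secondary index keyed by
// collection, so collection-scoped lookups touch only that collection's segments.
type metaStore struct {
	segments      map[int64]*SegmentInfo           // primary: segmentID -> segment
	coll2Segments map[int64]map[int64]*SegmentInfo // secondary: collectionID -> segmentID -> segment
}

// add keeps the primary map and the secondary index in sync.
func (m *metaStore) add(s *SegmentInfo) {
	m.segments[s.ID] = s
	byColl, ok := m.coll2Segments[s.CollectionID]
	if !ok {
		byColl = make(map[int64]*SegmentInfo)
		m.coll2Segments[s.CollectionID] = byColl
	}
	byColl[s.ID] = s
}

// selectByCollection is proportional to one collection's size,
// not to the total number of segments.
func (m *metaStore) selectByCollection(collID int64, keep func(*SegmentInfo) bool) []*SegmentInfo {
	var out []*SegmentInfo
	for _, s := range m.coll2Segments[collID] {
		if keep(s) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	m := &metaStore{
		segments:      make(map[int64]*SegmentInfo),
		coll2Segments: make(map[int64]map[int64]*SegmentInfo),
	}
	m.add(&SegmentInfo{ID: 1, CollectionID: 2})
	m.add(&SegmentInfo{ID: 3, CollectionID: 1111})
	got := m.selectByCollection(2, func(*SegmentInfo) bool { return true })
	fmt.Println(len(got)) // 1
}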
7 changes: 3 additions & 4 deletions internal/datacoord/compaction_policy_clustering.go
@@ -119,14 +119,13 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() != datapb.SegmentLevel_L0 // ignore level zero segments
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	// partSegments is list of chanPartSegments, which is channel-partition organized segments
7 changes: 3 additions & 4 deletions internal/datacoord/compaction_policy_single.go
@@ -86,14 +86,13 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() == datapb.SegmentLevel_L2 // only support L2 for now
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	for _, group := range partSegments {
13 changes: 9 additions & 4 deletions internal/datacoord/compaction_trigger.go
@@ -307,15 +307,20 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 		zap.Int64("signal.collectionID", signal.collectionID),
 		zap.Int64("signal.partitionID", signal.partitionID),
 		zap.Int64("signal.segmentID", signal.segmentID))
-	partSegments := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
-			isSegmentHealthy(segment) &&
+	filters := []SegmentFilter{SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
 			segment.GetLevel() != datapb.SegmentLevel_L2 // ignore l2 segment
-	}) // partSegments is list of chanPartSegments, which is channel-partition organized segments
+	})} // partSegments is list of chanPartSegments, which is channel-partition organized segments
+
+	// get all segments if signal.collection == 0, otherwise get collection segments
+	if signal.collectionID != 0 {
+		filters = append(filters, WithCollection(signal.collectionID))
+	}
+	partSegments := GetSegmentsChanPart(t.meta, filters...)
 
 	if len(partSegments) == 0 {
 		log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
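
A design note on the hunk above: filters are composed as a slice, and the collection filter is appended only when the signal is collection-scoped, which lets the meta layer use the secondary index when it can. Below is a standalone sketch of that adapter-plus-filters pattern with simplified types — the shape of the interface (a Match method) is an assumption for illustration; the real datacoord SegmentFilter may differ:

package main

import "fmt"

type Segment struct {
	ID           int64
	CollectionID int64
	Flushed      bool
}

// SegmentFilter is the assumed shape of a composable segment predicate.
type SegmentFilter interface {
	Match(*Segment) bool
}

// SegmentFilterFunc adapts a plain predicate to the interface,
// in the spirit of http.HandlerFunc.
type SegmentFilterFunc func(*Segment) bool

func (f SegmentFilterFunc) Match(s *Segment) bool { return f(s) }

// WithCollection scopes a selection to one collection; in the real code this
// is the filter that can be answered from the coll2Segments index.
func WithCollection(collID int64) SegmentFilter {
	return SegmentFilterFunc(func(s *Segment) bool { return s.CollectionID == collID })
}

// selectSegments keeps only segments matching every filter.
func selectSegments(all []*Segment, filters ...SegmentFilter) []*Segment {
	var out []*Segment
next:
	for _, s := range all {
		for _, f := range filters {
			if !f.Match(s) {
				continue next
			}
		}
		out = append(out, s)
	}
	return out
}

func main() {
	all := []*Segment{
		{ID: 1, CollectionID: 2, Flushed: true},
		{ID: 3, CollectionID: 1111, Flushed: false},
	}
	// Mirror handleGlobalSignal: append the collection filter only when scoped.
	filters := []SegmentFilter{SegmentFilterFunc(func(s *Segment) bool { return s.Flushed })}
	collectionID := int64(2)
	if collectionID != 0 {
		filters = append(filters, WithCollection(collectionID))
	}
	fmt.Println(len(selectSegments(all, filters...))) // 1
}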
182 changes: 110 additions & 72 deletions internal/datacoord/compaction_trigger_test.go
@@ -126,23 +126,33 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
 		},
 	}
 
+	segInfo := &datapb.SegmentInfo{
+		ID:             1,
+		CollectionID:   collectionID,
+		PartitionID:    1,
+		LastExpireTime: 100,
+		NumOfRows:      100,
+		MaxRowNum:      300,
+		InsertChannel:  "ch1",
+		State:          commonpb.SegmentState_Flushed,
+		Binlogs:        binlogs,
+		Deltalogs:      deltaLogs,
+	}
 	m := &meta{
 		catalog:    catalog,
 		channelCPs: newChannelCps(),
 		segments: &SegmentsInfo{
 			segments: map[int64]*SegmentInfo{
 				1: {
-					SegmentInfo: &datapb.SegmentInfo{
-						ID:             1,
-						CollectionID:   collectionID,
-						PartitionID:    1,
-						LastExpireTime: 100,
-						NumOfRows:      100,
-						MaxRowNum:      300,
-						InsertChannel:  "ch1",
-						State:          commonpb.SegmentState_Flushed,
-						Binlogs:        binlogs,
-						Deltalogs:      deltaLogs,
-					},
+					SegmentInfo: segInfo,
 				},
 			},
+			secondaryIndexes: segmentInfoIndexes{
+				coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+					collectionID: {
+						1: {
+							SegmentInfo: segInfo,
+						},
+					},
+				},
+			},
 		},
@@ -215,6 +225,73 @@ func Test_compactionTrigger_force(t *testing.T) {
 		},
 	}
 
+	seg1 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             1,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+		},
+	}
+
+	seg2 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             2,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+		},
+	}
+
+	seg3 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             3,
+			CollectionID:   1111,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+		},
+	}
+
 	tests := []struct {
 		name   string
 		fields fields
@@ -231,68 +308,18 @@ func Test_compactionTrigger_force(t *testing.T) {
 				channelCPs: newChannelCps(),
 				segments: &SegmentsInfo{
 					segments: map[int64]*SegmentInfo{
-						1: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             1,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-							},
-						},
-						2: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             2,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-							},
-						},
-						3: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             3,
-								CollectionID:   1111,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-							},
-						},
+						1: seg1,
+						2: seg2,
+						3: seg3,
 					},
+					secondaryIndexes: segmentInfoIndexes{
+						coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+							2: {
+								seg1.GetID(): seg1,
+								seg2.GetID(): seg2,
+							},
+							1111: {
+								seg3.GetID(): seg3,
+							},
+						},
+					},
 				},
@@ -605,7 +632,13 @@
 		t.Run(tt.name+" with DiskANN index", func(t *testing.T) {
 			for _, segment := range tt.fields.meta.segments.GetSegments() {
 				// Collection 1000 means it has DiskANN index
+				delete(tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()], segment.GetID())
 				segment.CollectionID = 1000
+				_, ok := tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()]
+				if !ok {
+					tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()] = make(map[UniqueID]*SegmentInfo)
+				}
+				tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()][segment.GetID()] = segment
 			}
 			tr := &compactionTrigger{
 				meta: tt.fields.meta,
@@ -706,6 +739,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 	vecFieldID := int64(201)
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
+		},
 	}
 
 	indexMeta := newSegmentIndexMeta(nil)
@@ -732,6 +768,7 @@
 		},
 	}
 
+	segmentInfos.secondaryIndexes.coll2Segments[2] = make(map[UniqueID]*SegmentInfo)
 	for i := UniqueID(0); i < 50; i++ {
 		info := &SegmentInfo{
 			SegmentInfo: &datapb.SegmentInfo{
@@ -773,6 +810,7 @@
 		})
 
 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
 	}
 
 	tests := []struct {
34 changes: 22 additions & 12 deletions internal/datacoord/meta.go
@@ -336,28 +336,28 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
 }
 
 // GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied
-func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegments {
-	m.RLock()
-	defer m.RUnlock()
-	mDimEntry := make(map[string]*chanPartSegments)
+// TODO: Move this function to the compaction module after reorganizing the DataCoord modules.
+func GetSegmentsChanPart(m *meta, filters ...SegmentFilter) []*chanPartSegments {
+	type dim struct {
+		partitionID int64
+		channelName string
+	}
 
-	log.Debug("GetSegmentsChanPart segment number", zap.Int("length", len(m.segments.GetSegments())))
-	for _, segmentInfo := range m.segments.segments {
-		if !selector(segmentInfo) {
-			continue
-		}
+	mDimEntry := make(map[dim]*chanPartSegments)
+
+	candidates := m.SelectSegments(filters...)
+	for _, segmentInfo := range candidates {
 		cloned := segmentInfo.Clone()
 
-		dim := fmt.Sprintf("%d-%s", cloned.PartitionID, cloned.InsertChannel)
-		entry, ok := mDimEntry[dim]
+		d := dim{cloned.PartitionID, cloned.InsertChannel}
+		entry, ok := mDimEntry[d]
 		if !ok {
 			entry = &chanPartSegments{
 				collectionID: cloned.CollectionID,
 				partitionID:  cloned.PartitionID,
 				channelName:  cloned.InsertChannel,
 			}
-			mDimEntry[dim] = entry
+			mDimEntry[d] = entry
 		}
 		entry.segments = append(entry.segments, cloned)
 	}
@@ -1295,6 +1295,16 @@ func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
 	m.segments.SetAllocations(segmentID, allocations)
 }
 
+// SetSegmentsAllocations set Segments allocations, will overwrite ALL original allocations
+// Note that allocations is not persisted in KV store
+func (m *meta) SetSegmentsAllocations(segmentsAllocations map[int64][]*Allocation) {
+	m.Lock()
+	defer m.Unlock()
+	for segmentID, allocations := range segmentsAllocations {
+		m.segments.SetAllocations(segmentID, allocations)
+	}
+}
+
 // SetCurrentRows set current row count for segment with provided `segmentID`
 // Note that currRows is not persisted in KV store
 func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {
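
A small detail worth noting in the rewritten GetSegmentsChanPart above: the grouping key changed from a fmt.Sprintf("%d-%s", ...) string to a comparable struct, which drops a per-segment string allocation and is type-safe. A tiny standalone illustration of that map-key choice:

package main

import "fmt"

// dim is comparable, so it can key a map directly —
// no fmt.Sprintf("%d-%s", ...) string building per lookup.
type dim struct {
	partitionID int64
	channelName string
}

func main() {
	counts := make(map[dim]int)
	counts[dim{1, "ch1"}]++
	counts[dim{1, "ch1"}]++
	counts[dim{2, "ch1"}]++
	fmt.Println(counts[dim{1, "ch1"}], counts[dim{2, "ch1"}]) // 2 1
}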
4 changes: 2 additions & 2 deletions internal/datacoord/meta_test.go
@@ -671,7 +671,7 @@ func TestMeta_Basic(t *testing.T) {
 	})
 
 	t.Run("Test GetSegmentsChanPart", func(t *testing.T) {
-		result := meta.GetSegmentsChanPart(func(*SegmentInfo) bool { return true })
+		result := GetSegmentsChanPart(meta, SegmentFilterFunc(func(segment *SegmentInfo) bool { return true }))
 		assert.Equal(t, 2, len(result))
 		for _, entry := range result {
 			assert.Equal(t, "c1", entry.channelName)
@@ -682,7 +682,7 @@
 			assert.Equal(t, 1, len(entry.segments))
 		}
 	}
-	result = meta.GetSegmentsChanPart(func(seg *SegmentInfo) bool { return seg.GetCollectionID() == 10 })
+	result = GetSegmentsChanPart(meta, WithCollection(10))
 	assert.Equal(t, 0, len(result))
 })
4 changes: 3 additions & 1 deletion internal/datacoord/segment_manager.go
@@ -530,6 +530,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) {
 		return
 	}
 
+	segmentsAllocations := make(map[int64][]*Allocation)
 	growing.Range(func(id int64) bool {
 		segment := s.meta.GetHealthySegment(id)
 		if segment == nil {
@@ -546,9 +547,10 @@
 				allocations = append(allocations, segment.allocations[i])
 			}
 		}
-		s.meta.SetAllocations(segment.GetID(), allocations)
+		segmentsAllocations[id] = allocations
 		return true
 	})
+	s.meta.SetSegmentsAllocations(segmentsAllocations)
 }
 
 func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {
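
The ExpireAllocations change above is the second idea from the commit message: collect the per-segment updates inside the Range callback, then commit them all under a single lock acquisition via SetSegmentsAllocations, instead of locking once per segment. A minimal sketch of that collect-then-commit pattern, with hypothetical types (the real meta guards far more state than one map):

package main

import (
	"fmt"
	"sync"
)

type Allocation struct{ NumOfRows int64 }

type meta struct {
	sync.RWMutex
	allocations map[int64][]*Allocation // segmentID -> allocations
}

// SetAllocations locks once per segment: N calls cost N lock acquisitions.
func (m *meta) SetAllocations(segmentID int64, allocs []*Allocation) {
	m.Lock()
	defer m.Unlock()
	m.allocations[segmentID] = allocs
}

// SetSegmentsAllocations applies a whole batch under one lock acquisition,
// shrinking contention on a hot mutex.
func (m *meta) SetSegmentsAllocations(batch map[int64][]*Allocation) {
	m.Lock()
	defer m.Unlock()
	for segmentID, allocs := range batch {
		m.allocations[segmentID] = allocs
	}
}

func main() {
	m := &meta{allocations: make(map[int64][]*Allocation)}
	// Build the batch outside the lock, then commit it in one shot.
	batch := map[int64][]*Allocation{
		1: {{NumOfRows: 10}},
		2: {{NumOfRows: 20}},
	}
	m.SetSegmentsAllocations(batch)
	fmt.Println(len(m.allocations)) // 2
}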
