Skip to content

Commit

Permalink
enhance: Periodically synchronize segments to datanode watcher (#33420)
Browse files Browse the repository at this point in the history
issue: #32809

---------

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored May 30, 2024
1 parent 589d4df commit 7763718
Show file tree
Hide file tree
Showing 19 changed files with 827 additions and 171 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ dataCoord:
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
syncSegmentsInterval: 300

dataNode:
dataSync:
Expand Down
32 changes: 4 additions & 28 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,18 +487,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
// Apply metrics after successful meta update.
metricMutation.commit()
}

nodeID := c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
}

log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("handleCompactionResult: fail to sync segments with node",
zap.Int64("nodeID", nodeID), zap.Error(err))
return err
}
// TODO @xiaocai2333: drop compaction plan on datanode

log.Info("handleCompactionResult: success to handle merge compaction result")
return nil
Expand Down Expand Up @@ -546,13 +535,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// task.dataNodeID not match with channel
// Mark this compaction as failure and skip processing the meta
if !c.chManager.Match(task.dataNodeID, task.plan.GetChannel()) {
// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
// TODO @xiaocai2333: drop compaction plan on datanode
log.Warn("compaction failed for channel nodeID not match")
if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
continue
}
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
Expand Down Expand Up @@ -617,16 +601,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
if nodeUnkonwnPlan, ok := completedPlans[planID]; ok {
nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))

// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
log.Info("compaction syncing unknown plan with node")
if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
PlanID: planID,
}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err
}
// TODO @xiaocai2333: drop compaction plan on datanode
log.Info("drop unknown plan with node")
}
}

Expand Down
14 changes: 1 addition & 13 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}},
}, nil)

s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once()
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc)
Expand Down Expand Up @@ -475,7 +474,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
}
return nil
}).Once()
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()

handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
Expand Down Expand Up @@ -517,7 +515,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{segment},
&segMetricMutation{}, nil).Once()
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()

handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
Expand All @@ -529,7 +526,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
}

err := handler.handleMergeCompactionResult(plan, compactionResult)
s.Error(err)
s.NoError(err)
})
}

Expand All @@ -549,7 +546,6 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
})

s.Run("test complete merge compaction task", func() {
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
// mock for handleMergeCompactionResult
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
Expand Down Expand Up @@ -702,14 +698,6 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
},
}

s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
s.EqualValues(nodeID, 222)
s.NotNil(req)
s.Empty(req.GetCompactedFrom())
s.EqualValues(5, req.GetPlanID())
return nil
}).Once()
s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true)
s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once()

Expand Down
7 changes: 7 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1575,3 +1575,10 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetNumOfRows())
segToUpdate.State = targetState
}

func (m *meta) ListCollections() []int64 {
m.RLock()
defer m.RUnlock()

return lo.Keys(m.collections)
}
1 change: 1 addition & 0 deletions internal/datacoord/segment_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func SetMaxRowCount(maxRow int64) SegmentOperator {
type segmentCriterion struct {
collectionID int64
channel string
partitionID int64
others []SegmentFilter
}

Expand Down
5 changes: 5 additions & 0 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type Server struct {
compactionTrigger trigger
compactionHandler compactionPlanContext
compactionViewManager *CompactionViewManager
syncSegmentsScheduler *SyncSegmentsScheduler

metricsCacheManager *metricsinfo.MetricsCacheManager

Expand Down Expand Up @@ -393,6 +394,8 @@ func (s *Server) initDataCoord() error {
s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh)
s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta)

s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager)

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
Expand Down Expand Up @@ -712,6 +715,7 @@ func (s *Server) startServerLoop() {
go s.importScheduler.Start()
go s.importChecker.Start()
s.garbageCollector.start()
s.syncSegmentsScheduler.Start()
}

// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
Expand Down Expand Up @@ -1104,6 +1108,7 @@ func (s *Server) Stop() error {

s.importScheduler.Close()
s.importChecker.Close()
s.syncSegmentsScheduler.Stop()

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.stopCompactionTrigger()
Expand Down
149 changes: 149 additions & 0 deletions internal/datacoord/sync_segments_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
"sync"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type SyncSegmentsScheduler struct {
quit chan struct{}
wg sync.WaitGroup

meta *meta
channelManager ChannelManager
sessions SessionManager
}

func newSyncSegmentsScheduler(m *meta, channelManager ChannelManager, sessions SessionManager) *SyncSegmentsScheduler {
return &SyncSegmentsScheduler{
quit: make(chan struct{}),
wg: sync.WaitGroup{},
meta: m,
channelManager: channelManager,
sessions: sessions,
}
}

func (sss *SyncSegmentsScheduler) Start() {
sss.quit = make(chan struct{})
sss.wg.Add(1)

go func() {
defer logutil.LogPanic()
ticker := time.NewTicker(Params.DataCoordCfg.SyncSegmentsInterval.GetAsDuration(time.Second))
defer sss.wg.Done()

for {
select {
case <-sss.quit:
log.Info("sync segments scheduler quit")
ticker.Stop()
return
case <-ticker.C:
sss.SyncSegmentsForCollections()
}
}
}()
log.Info("SyncSegmentsScheduler started...")
}

func (sss *SyncSegmentsScheduler) Stop() {
close(sss.quit)
sss.wg.Wait()
}

func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
collIDs := sss.meta.ListCollections()
for _, collID := range collIDs {
collInfo := sss.meta.GetCollection(collID)
if collInfo == nil {
log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
continue
}
pkField, err := typeutil.GetPrimaryFieldSchema(collInfo.Schema)
if err != nil {
log.Warn("get primary field from schema failed", zap.Int64("collectionID", collID),
zap.Error(err))
continue
}
for _, channelName := range collInfo.VChannelNames {
nodeID, err := sss.channelManager.FindWatcher(channelName)
if err != nil {
log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
zap.String("channelName", channelName), zap.Error(err))
continue
}
for _, partitionID := range collInfo.Partitions {
if err := sss.SyncSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
log.Warn("sync segment with channel failed, retry next ticker",
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partitionID),
zap.String("channel", channelName),
zap.Error(err))
continue
}
}
}
}
}

func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
}))
req := &datapb.SyncSegmentsRequest{
ChannelName: channelName,
PartitionId: partitionID,
CollectionId: collectionID,
SegmentInfos: make(map[int64]*datapb.SyncSegmentInfo),
}

for _, seg := range segments {
for _, statsLog := range seg.GetStatslogs() {
if statsLog.GetFieldID() == pkFieldID {
req.SegmentInfos[seg.ID] = &datapb.SyncSegmentInfo{
SegmentId: seg.GetID(),
PkStatsLog: statsLog,
State: seg.GetState(),
Level: seg.GetLevel(),
NumOfRows: seg.GetNumOfRows(),
}
}
}
}

if err := sss.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("fail to sync segments with node", zap.Error(err))
return err
}
log.Info("sync segments success", zap.Int64s("segments", lo.Map(segments, func(t *SegmentInfo, i int) int64 {
return t.GetID()
})))
return nil
}
Loading

0 comments on commit 7763718

Please sign in to comment.