Commit
Deliver L0 segments delete records (#27722)
Signed-off-by: yah01 <[email protected]>
yah01 authored Nov 6, 2023
1 parent b1df3ea commit ece592a
Showing 27 changed files with 1,006 additions and 431 deletions.
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
@@ -258,6 +258,7 @@ message SegmentLoadInfo {
msg.MsgPosition start_position = 14;
msg.MsgPosition delta_position = 15;
int64 readableVersion = 16;
data.SegmentLevel level = 17;
}

message FieldIndexInfo {
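
Reader's note: SegmentLevel is the enum from the data-coord proto that distinguishes ordinary segments from delete-only L0 segments; the Go side of this diff references datapb.SegmentLevel_Legacy and datapb.SegmentLevel_L0. A minimal, hedged sketch of how a query node can branch on the new field follows; the branch body is illustrative, and the real checks appear in the delegator and handler changes further down.

// Sketch only: read the new level field from a *querypb.SegmentLoadInfo.
// GetLevel is the generated getter for the field added above.
if info.GetLevel() == datapb.SegmentLevel_L0 {
	// L0 segments carry only delete records, so the usual stream-delete
	// replay and distribution update can be skipped for them.
}
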
614 changes: 312 additions & 302 deletions internal/proto/querypb/query_coord.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions internal/querycoordv2/task/executor.go
@@ -381,14 +381,14 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
switch task.Actions()[step].Type() {
case ActionTypeGrow:
ex.subDmChannel(task, step)
ex.subscribeChannel(task, step)

case ActionTypeReduce:
ex.unsubDmChannel(task, step)
ex.unsubscribeChannel(task, step)
}
}

func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error {
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
@@ -478,7 +478,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
return nil
}

func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
func (ex *Executor) unsubscribeChannel(task *ChannelTask, step int) error {
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
80 changes: 71 additions & 9 deletions internal/querynodev2/delegator/delegator_data.go
@@ -28,6 +28,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
@@ -85,8 +86,18 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
growing := sd.segmentManager.GetGrowing(segmentID)
if growing == nil {
var err error
growing, err = segments.NewSegment(sd.collection, segmentID, insertData.PartitionID, sd.collectionID, sd.vchannelName,
segments.SegmentTypeGrowing, 0, insertData.StartPosition, insertData.StartPosition)
growing, err = segments.NewSegment(
sd.collection,
segmentID,
insertData.PartitionID,
sd.collectionID,
sd.vchannelName,
segments.SegmentTypeGrowing,
0,
insertData.StartPosition,
insertData.StartPosition,
datapb.SegmentLevel_Legacy,
)
if err != nil {
log.Error("failed to create new segment",
zap.Int64("segmentID", segmentID),
@@ -311,6 +322,23 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
return err
}

deletedPks, deletedTss := sd.segmentManager.GetL0DeleteRecords()
for _, segment := range loaded {
err = segment.Delete(deletedPks, deletedTss)
if err != nil {
log.Warn("failed to forward L0 deletions to growing segment",
zap.Int64("segmentID", segment.ID()),
zap.Error(err),
)

// clear loaded growing segments
for _, segment := range loaded {
segment.Release()
}
return err
}
}

segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
log.Info("load growing segments done", zap.Int64s("segmentIDs", segmentIDs))

@@ -331,6 +359,10 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm

// LoadSegments load segments local or remotely depends on the target node.
func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
if len(req.GetInfos()) == 0 {
return nil
}

log := sd.getLogger(ctx)

targetNodeID := req.GetDstNodeID()
@@ -396,8 +428,9 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
}
log.Debug("work loads segments done")

// load index need no stream delete and distribution change
if req.GetLoadScope() == querypb.LoadScope_Index {
// load index and L0 segment need no stream delete and distribution change
if req.GetLoadScope() == querypb.LoadScope_Index ||
req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
return nil
}

@@ -433,11 +466,16 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
return candidate.ID(), candidate
})

level0DeletePks, level0DeleteTss := sd.segmentManager.GetL0DeleteRecords()

sd.deleteMut.Lock()
defer sd.deleteMut.Unlock()
// apply buffered delete for new segments
// no goroutines here since qnv2 has no load merging logic
for _, info := range infos {
log := log.With(
zap.Int64("segmentID", info.GetSegmentID()),
)
candidate := idCandidates[info.GetSegmentID()]
position := info.GetDeltaPosition()
if position == nil { // for compatibility of rolling upgrade from 2.2.x to 2.3
@@ -450,16 +488,42 @@
}

deleteData := &storage.DeleteData{}
for i, pk := range level0DeletePks {
if candidate.MayPkExist(pk) {
deleteData.Append(pk, level0DeleteTss[i])
}
}

if deleteData.RowCount > 0 {
log.Info("forward L0 delete to worker...",
zap.Int64("deleteRowNum", deleteData.RowCount),
)
err := worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
}

deleteData = &storage.DeleteData{}
// start position is dml position for segment
// if this position is before deleteBuffer's safe ts, it means some delete shall be read from msgstream
if position.GetTimestamp() < sd.deleteBuffer.SafeTs() {
log.Info("load delete from stream...")
var err error
deleteData, err = sd.readDeleteFromMsgstream(ctx, position, sd.deleteBuffer.SafeTs(), candidate)
streamDeleteData, err := sd.readDeleteFromMsgstream(ctx, position, sd.deleteBuffer.SafeTs(), candidate)
if err != nil {
log.Warn("failed to read delete data from msgstream", zap.Error(err))
return err
}

deleteData.Merge(streamDeleteData)
log.Info("load delete from stream done")
}

@@ -472,9 +536,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
}
for i, pk := range record.DeleteData.Pks {
if candidate.MayPkExist(pk) {
deleteData.Pks = append(deleteData.Pks, pk)
deleteData.Tss = append(deleteData.Tss, record.DeleteData.Tss[i])
deleteData.RowCount++
deleteData.Append(pk, record.DeleteData.Tss[i])
}
}
}
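
Reader's note: the hunk above interleaves removed and added lines, so here is a condensed, hedged restatement of the new L0 handling in loadStreamDelete. For each sealed segment being loaded, the pooled L0 delete records are filtered through the candidate's bloom filter, and any survivors are pushed to the worker hosting the segment before buffered and msgstream deletes are merged. Logging and some error handling are omitted; this is a sketch, not a drop-in replacement.

// Condensed sketch of the L0 forwarding added to loadStreamDelete.
l0Pks, l0Tss := sd.segmentManager.GetL0DeleteRecords()
for _, info := range infos {
	candidate := idCandidates[info.GetSegmentID()]

	deleteData := &storage.DeleteData{}
	for i, pk := range l0Pks {
		if candidate.MayPkExist(pk) { // skip PKs the segment cannot contain
			deleteData.Append(pk, l0Tss[i])
		}
	}

	if deleteData.RowCount > 0 {
		err := worker.Delete(ctx, &querypb.DeleteRequest{
			Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
			CollectionId: info.GetCollectionID(),
			PartitionId:  info.GetPartitionID(),
			SegmentId:    info.GetSegmentID(),
			PrimaryKeys:  storage.ParsePrimaryKeys2IDs(deleteData.Pks),
			Timestamps:   deleteData.Tss,
		})
		if err != nil {
			return err
		}
	}
	// Buffered and msgstream deletes are then appended to a fresh DeleteData,
	// exactly as in the hunk above.
}
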
21 changes: 21 additions & 0 deletions internal/querynodev2/delegator/delegator_data_test.go
@@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
@@ -232,6 +233,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
ms.EXPECT().Partition().Return(info.GetPartitionID())
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
@@ -447,7 +449,25 @@ func (s *DelegatorDataSuite) TestLoadSegments() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: 1,
CollectionID: s.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 200,
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
Deltalogs: []*datapb.FieldBinlog{},
Level: datapb.SegmentLevel_L0,
},
},
})
s.NoError(err)

err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: 1,
CollectionID: s.collectionID,
@@ -676,6 +696,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
ms.EXPECT().Collection().Return(info.GetCollectionID())
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
1 change: 1 addition & 0 deletions internal/querynodev2/delegator/delegator_test.go
@@ -94,6 +94,7 @@ func (s *DelegatorSuite) SetupTest() {
ms.EXPECT().Collection().Return(info.GetCollectionID())
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
return ms
})
}, nil)
40 changes: 40 additions & 0 deletions internal/querynodev2/handlers.go
@@ -27,6 +27,7 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@@ -43,6 +44,45 @@ import (
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

func loadL0Segments(ctx context.Context, delegator delegator.ShardDelegator, req *querypb.WatchDmChannelsRequest) error {
l0Segments := make([]*querypb.SegmentLoadInfo, 0)
for _, channel := range req.GetInfos() {
for _, segmentID := range channel.GetFlushedSegmentIds() {
segmentInfo, ok := req.GetSegmentInfos()[segmentID]
if !ok ||
segmentInfo.GetLevel() != datapb.SegmentLevel_L0 {
continue
}

l0Segments = append(l0Segments, &querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: segmentInfo.PartitionID,
CollectionID: segmentInfo.CollectionID,
BinlogPaths: segmentInfo.Binlogs,
NumOfRows: segmentInfo.NumOfRows,
Statslogs: segmentInfo.Statslogs,
Deltalogs: segmentInfo.Deltalogs,
InsertChannel: segmentInfo.InsertChannel,
StartPosition: segmentInfo.GetStartPosition(),
Level: segmentInfo.GetLevel(),
})
}
}

return delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: req.GetBase(),
DstNodeID: req.GetNodeID(),
Infos: l0Segments,
Schema: req.GetSchema(),
CollectionID: req.GetCollectionID(),
LoadMeta: req.GetLoadMeta(),
ReplicaID: req.GetReplicaID(),
Version: req.GetVersion(),
NeedTransfer: false,
IndexInfoList: req.GetIndexInfoList(),
})
}

func loadGrowingSegments(ctx context.Context, delegator delegator.ShardDelegator, req *querypb.WatchDmChannelsRequest) error {
// load growing segments
growingSegments := make([]*querypb.SegmentLoadInfo, 0, len(req.Infos))
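
Reader's note: loadL0Segments gathers the flushed L0 segments listed in a WatchDmChannelsRequest and loads them through the delegator with NeedTransfer set to false. Since LoadGrowing pulls delete records via GetL0DeleteRecords, the L0 segments must be in place before growing segments are loaded. The call site is not shown on this page; the wiring below is an assumed sketch of how the watch path would use it, not code from this commit.

// Hypothetical wiring in the WatchDmChannels flow (assumed): deliver L0
// delete records first, then load growing segments so the new growing
// segments can have those deletes applied on load.
if err := loadL0Segments(ctx, shardDelegator, req); err != nil {
	log.Warn("failed to load l0 segments", zap.Error(err))
	return err
}
if err := loadGrowingSegments(ctx, shardDelegator, req); err != nil {
	log.Warn("failed to load growing segments", zap.Error(err))
	return err
}
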
27 changes: 24 additions & 3 deletions internal/querynodev2/segments/manager.go
@@ -31,7 +31,9 @@ import (

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@@ -116,6 +118,7 @@ type SegmentManager interface {
// and increases the ref count of the corresponding collection,
// dup segments will not increase the ref count
Put(segmentType SegmentType, segments ...Segment)
UpdateBy(action SegmentAction, filters ...SegmentFilter) int
Get(segmentID UniqueID) Segment
GetWithType(segmentID UniqueID, typ SegmentType) Segment
GetBy(filters ...SegmentFilter) []Segment
@@ -124,10 +127,9 @@ type SegmentManager interface {
GetAndPin(segments []int64, filters ...SegmentFilter) ([]Segment, error)
Unpin(segments []Segment)

UpdateSegmentBy(action SegmentAction, filters ...SegmentFilter) int

GetSealed(segmentID UniqueID) Segment
GetGrowing(segmentID UniqueID) Segment
GetL0DeleteRecords() ([]storage.PrimaryKey, []uint64)
Empty() bool

// Remove removes the given segment,
@@ -217,7 +219,7 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) {
}
}

func (mgr *segmentManager) UpdateSegmentBy(action SegmentAction, filters ...SegmentFilter) int {
func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilter) int {
mgr.mu.RLock()
defer mgr.mu.RUnlock()

@@ -407,6 +409,25 @@ func (mgr *segmentManager) GetGrowing(segmentID UniqueID) Segment {
return nil
}

func (mgr *segmentManager) GetL0DeleteRecords() ([]storage.PrimaryKey, []uint64) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()

pks := make([]storage.PrimaryKey, 0)
tss := make([]uint64, 0)
for _, segment := range mgr.sealedSegments {
if segment.Level() != datapb.SegmentLevel_L0 {
continue
}

deletePks, deleteTss := segment.(*L0Segment).DeleteRecords()
pks = append(pks, deletePks...)
tss = append(tss, deleteTss...)
}

return pks, tss
}

func (mgr *segmentManager) Empty() bool {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
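
Reader's note: GetL0DeleteRecords type-asserts sealed segments to *L0Segment, a type whose definition is not part of this page. The sketch below is a plausible minimal shape, consistent with how this diff uses it (Delete(pks, tss) to buffer records, DeleteRecords() to read them back); the field names and locking are assumptions rather than the actual implementation.

// Assumed sketch of the delete-only L0 segment type (within the segments package).
type L0Segment struct {
	// Identity fields (collection, partition, segment ID, level) omitted.

	dataGuard sync.RWMutex
	pks       []storage.PrimaryKey
	tss       []uint64
}

// Delete only buffers the records; L0 segments hold no insert data.
func (s *L0Segment) Delete(pks []storage.PrimaryKey, tss []uint64) error {
	s.dataGuard.Lock()
	defer s.dataGuard.Unlock()
	s.pks = append(s.pks, pks...)
	s.tss = append(s.tss, tss...)
	return nil
}

// DeleteRecords exposes the buffered records for fan-out by the manager above.
func (s *L0Segment) DeleteRecords() ([]storage.PrimaryKey, []uint64) {
	s.dataGuard.RLock()
	defer s.dataGuard.RUnlock()
	return s.pks, s.tss
}
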
8 changes: 5 additions & 3 deletions internal/querynodev2/segments/manager_test.go
@@ -7,6 +7,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@@ -20,7 +21,7 @@
partitionIDs []int64
channels []string
types []SegmentType
segments []*LocalSegment
segments []Segment

mgr *segmentManager
}
@@ -49,6 +50,7 @@ func (s *ManagerSuite) SetupTest() {
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
)
s.Require().NoError(err)
s.segments = append(s.segments, segment)
@@ -115,8 +117,8 @@ func (s *ManagerSuite) TestRemoveBy() {
func (s *ManagerSuite) TestUpdateBy() {
action := IncreaseVersion(1)

s.Equal(2, s.mgr.UpdateSegmentBy(action, WithType(SegmentTypeSealed)))
s.Equal(1, s.mgr.UpdateSegmentBy(action, WithType(SegmentTypeGrowing)))
s.Equal(2, s.mgr.UpdateBy(action, WithType(SegmentTypeSealed)))
s.Equal(1, s.mgr.UpdateBy(action, WithType(SegmentTypeGrowing)))

segments := s.mgr.GetBy()
for _, segment := range segments {
(The remaining changed files of the 27 in this commit are not shown here.)
