
Commit

enhance: make delegator delete buffer holding all delete from cp (milvus-io#29626) (milvus-io#35074)

See also milvus-io#29625
pr: milvus-io#29626 

This PR:
- Add a new implementation of `DeleteBuffer`: listDeleteBuffer
  - holds a slice of cacheBlocks
  - the `Put` method appends new delete data to the last block
  - when a block is full, a new block is appended to the list
- Add a `TryDiscard` method to the `DeleteBuffer` interface
  - for doubleCacheBuffer, this is a no-op
  - for listDeleteBuffer, it evicts "old" blocks, i.e. blocks before the first block whose start ts is behind the provided ts
- Add a checkpoint field to the `UpdateVersion` sync action, which shall be used to discard old cached delete blocks (see the sketch below)
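The pieces above meet in the query node's sync-action handler, which is not part of this diff. The sketch below shows the intended hand-off in simplified form: `applyUpdateVersion` and the one-method `versionSyncer` interface are illustrative stand-ins, while the proto getters follow from the field names added below.

package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
)

// versionSyncer is a stand-in for the ShardDelegator method extended by this PR.
type versionSyncer interface {
	SyncTargetVersion(newVersion int64, growingInTarget, sealedInTarget, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
}

// applyUpdateVersion forwards an UpdateVersion sync action to the delegator; the
// new checkpoint rides along and ends up as the ts handed to DeleteBuffer.TryDiscard.
func applyUpdateVersion(sd versionSyncer, action *querypb.SyncAction) {
	if action.GetType() != querypb.SyncType_UpdateVersion {
		return
	}
	sd.SyncTargetVersion(
		action.GetTargetVersion(),
		action.GetGrowingInTarget(),
		action.GetSealedInTarget(),
		action.GetDroppedInTarget(),
		action.GetCheckpoint(), // may be nil when the channel is not found in the next target
	)
}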

---------

Signed-off-by: Wei Liu <[email protected]>
Co-authored-by: congqixia <[email protected]>
weiliu1031 and congqixia authored Aug 9, 2024
1 parent d8bfc07 commit e5681e5
Showing 12 changed files with 263 additions and 31 deletions.
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
@@ -586,6 +586,7 @@ message SyncAction {
repeated int64 sealedInTarget = 8;
int64 TargetVersion = 9;
repeated int64 droppedInTarget = 10;
msg.MsgPosition checkpoint = 11;
}

message SyncDistributionRequest {
9 changes: 8 additions & 1 deletion internal/querycoordv2/observers/target_observer.go
@@ -466,14 +466,21 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead
sealedSegments := ob.targetMgr.GetSealedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
growingSegments := ob.targetMgr.GetGrowingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
droppedSegments := ob.targetMgr.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
channel := ob.targetMgr.GetDmChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTargetFirst)

return &querypb.SyncAction{
action := &querypb.SyncAction{
Type: querypb.SyncType_UpdateVersion,
GrowingInTarget: growingSegments.Collect(),
SealedInTarget: lo.Keys(sealedSegments),
DroppedInTarget: droppedSegments,
TargetVersion: targetVersion,
}

if channel != nil {
action.Checkpoint = channel.GetSeekPosition()
}

return action
}

func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
12 changes: 7 additions & 5 deletions internal/querynodev2/delegator/delegator.go
@@ -31,6 +31,7 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
@@ -70,7 +71,7 @@ type ShardDelegator interface {
LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error
LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64)
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
GetTargetVersion() int64

// control
@@ -505,7 +506,8 @@ func organizeSubTask[T any](ctx context.Context, req T, sealed []SnapshotItem, g

func executeSubTasks[T any, R interface {
GetStatus() *commonpb.Status
}](ctx context.Context, tasks []subTask[T], execute func(context.Context, T, cluster.Worker) (R, error), taskType string, log *log.MLogger) ([]R, error) {
}](ctx context.Context, tasks []subTask[T], execute func(context.Context, T, cluster.Worker) (R, error), taskType string, log *log.MLogger,
) ([]R, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -674,8 +676,8 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
return nil, fmt.Errorf("collection(%d) not found in manager", collectionID)
}

maxSegmentDeleteBuffer := paramtable.Get().QueryNodeCfg.MaxSegmentDeleteBuffer.GetAsInt64()
log.Info("Init delta cache", zap.Int64("maxSegmentCacheBuffer", maxSegmentDeleteBuffer), zap.Time("startTime", tsoutil.PhysicalTime(startTs)))
sizePerBlock := paramtable.Get().QueryNodeCfg.DeleteBufferBlockSize.GetAsInt64()
log.Info("Init delete cache with list delete buffer", zap.Int64("sizePerBlock", sizePerBlock), zap.Time("startTime", tsoutil.PhysicalTime(startTs)))

sd := &shardDelegator{
collectionID: collectionID,
@@ -687,7 +689,7 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
workerManager: workerManager,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
distribution: NewDistribution(),
deleteBuffer: deletebuffer.NewDoubleCacheDeleteBuffer[*deletebuffer.Item](startTs, maxSegmentDeleteBuffer),
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
latestTsafe: atomic.NewUint64(startTs),
3 changes: 2 additions & 1 deletion internal/querynodev2/delegator/delegator_data.go
@@ -742,7 +742,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
}

func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64,
sealedInTarget []int64, droppedInTarget []int64,
sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition,
) {
growings := sd.segmentManager.GetBy(
segments.WithType(segments.SegmentTypeGrowing),
@@ -774,6 +774,7 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []
zap.Int64s("growingSegments", redundantGrowingIDs))
}
sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowingIDs)
sd.deleteBuffer.TryDiscard(checkpoint.GetTimestamp())
}

func (sd *shardDelegator) GetTargetVersion() int64 {
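One subtlety worth noting here: the checkpoint can be nil or empty (the observer only sets it when the DM channel is found, and the tests below pass a bare `&msgpb.MsgPosition{}`). Because generated proto getters are nil-safe, the discard simply degenerates to a no-op in that case. A minimal illustration, not code from the PR:

package example

import "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"

// discardTs shows why a missing checkpoint is harmless: GetTimestamp on a nil
// *msgpb.MsgPosition returns 0, and TryDiscard(0) matches no block whose head
// timestamp is a real (non-zero) TSO, so nothing gets evicted.
func discardTs(checkpoint *msgpb.MsgPosition) uint64 {
	return checkpoint.GetTimestamp() // safe even when checkpoint == nil
}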
2 changes: 1 addition & 1 deletion internal/querynodev2/delegator/delegator_data_test.go
@@ -849,7 +849,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
s.manager.Segment.Put(segments.SegmentTypeGrowing, ms)
}

s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{2}, []int64{3, 4})
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{})
s.Equal(int64(5), s.delegator.GetTargetVersion())
}

3 changes: 2 additions & 1 deletion internal/querynodev2/delegator/delegator_test.go
@@ -32,6 +32,7 @@ import (
"go.uber.org/atomic"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@@ -243,7 +244,7 @@ func (s *DelegatorSuite) initSegments() {
Version: 2001,
},
)
s.delegator.SyncTargetVersion(2001, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{})
s.delegator.SyncTargetVersion(2001, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{})
}

func (s *DelegatorSuite) TestSearch() {
26 changes: 13 additions & 13 deletions internal/querynodev2/delegator/deletebuffer/delete_buffer.go
@@ -35,11 +35,12 @@ type DeleteBuffer[T timed] interface {
Put(T)
ListAfter(uint64) []T
SafeTs() uint64
TryDiscard(uint64)
}

func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
return &doubleCacheBuffer[T]{
head: newDoubleCacheItem[T](startTs, maxSize),
head: newCacheBlock[T](startTs, maxSize),
maxSize: maxSize,
ts: startTs,
}
@@ -48,7 +49,7 @@ func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBu
// doubleCacheBuffer implements DeleteBuffer with fixed sized double cache.
type doubleCacheBuffer[T timed] struct {
mut sync.RWMutex
head, tail *doubleCacheItem[T]
head, tail *cacheBlock[T]
maxSize int64
ts uint64
}
@@ -57,6 +58,9 @@ func (c *doubleCacheBuffer[T]) SafeTs() uint64 {
return c.ts
}

func (c *doubleCacheBuffer[T]) TryDiscard(_ uint64) {
}

// Put implements DeleteBuffer.
func (c *doubleCacheBuffer[T]) Put(entry T) {
c.mut.Lock()
@@ -85,23 +89,19 @@ func (c *doubleCacheBuffer[T]) ListAfter(ts uint64) []T {
// evict sets head as tail and evicts tail.
func (c *doubleCacheBuffer[T]) evict(newTs uint64, entry T) {
c.tail = c.head
c.head = &doubleCacheItem[T]{
headTs: newTs,
maxSize: c.maxSize / 2,
size: entry.Size(),
data: []T{entry},
}
c.head = newCacheBlock[T](newTs, c.maxSize/2, entry)
c.ts = c.tail.headTs
}

func newDoubleCacheItem[T timed](ts uint64, maxSize int64) *doubleCacheItem[T] {
return &doubleCacheItem[T]{
func newCacheBlock[T timed](ts uint64, maxSize int64, elements ...T) *cacheBlock[T] {
return &cacheBlock[T]{
headTs: ts,
maxSize: maxSize,
data: elements,
}
}

type doubleCacheItem[T timed] struct {
type cacheBlock[T timed] struct {
mut sync.RWMutex
headTs uint64
size int64
@@ -112,7 +112,7 @@ type doubleCacheItem[T timed] struct {

// Cache adds entry into cache item.
// returns error if item is full
func (c *doubleCacheItem[T]) Put(entry T) error {
func (c *cacheBlock[T]) Put(entry T) error {
c.mut.Lock()
defer c.mut.Unlock()

@@ -126,7 +126,7 @@ func (c *doubleCacheItem[T]) ListAfter(ts uint64) []T {
}

// ListAfter returns entries of which ts after provided value.
func (c *doubleCacheItem[T]) ListAfter(ts uint64) []T {
func (c *cacheBlock[T]) ListAfter(ts uint64) []T {
c.mut.RLock()
defer c.mut.RUnlock()
idx := sort.Search(len(c.data), func(idx int) bool {
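Renaming doubleCacheItem to cacheBlock is what lets the new list buffer (next file) reuse the same block type, and the variadic elements parameter exists so the entry that overflowed one block can seed the next. A compressed sketch of that hand-off, assuming the elided Put body returns errBufferFull once a block's size budget is exceeded:

package deletebuffer

// rollOver sketches the block hand-off pattern both buffers rely on: try the
// current block first, and on a full-buffer error start a fresh block seeded
// with the entry that did not fit, using that entry's timestamp as the new
// block's head ts. (Illustrative helper, not part of the PR.)
func rollOver[T timed](current *cacheBlock[T], sizePerBlock int64, entry T) *cacheBlock[T] {
	if err := current.Put(entry); err != nil { // errBufferFull in practice
		return newCacheBlock[T](entry.Timestamp(), sizePerBlock, entry)
	}
	return current
}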
94 changes: 94 additions & 0 deletions internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go
@@ -0,0 +1,94 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deletebuffer

import (
"sync"

"github.com/cockroachdb/errors"
)

func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64) DeleteBuffer[T] {
return &listDeleteBuffer[T]{
safeTs: startTs,
sizePerBlock: sizePerBlock,
list: []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)},
}
}

// listDeleteBuffer implements DeleteBuffer with a list.
// head points to the earliest block.
// tail points to the latest block which shall be written into.
type listDeleteBuffer[T timed] struct {
mut sync.RWMutex

list []*cacheBlock[T]

safeTs uint64
sizePerBlock int64
}

func (b *listDeleteBuffer[T]) Put(entry T) {
b.mut.Lock()
defer b.mut.Unlock()

tail := b.list[len(b.list)-1]
err := tail.Put(entry)
if errors.Is(err, errBufferFull) {
b.list = append(b.list, newCacheBlock[T](entry.Timestamp(), b.sizePerBlock, entry))
}
}

func (b *listDeleteBuffer[T]) ListAfter(ts uint64) []T {
b.mut.RLock()
defer b.mut.RUnlock()

var result []T
for _, block := range b.list {
result = append(result, block.ListAfter(ts)...)
}
return result
}

func (b *listDeleteBuffer[T]) SafeTs() uint64 {
b.mut.RLock()
defer b.mut.RUnlock()
return b.safeTs
}

func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
b.mut.Lock()
defer b.mut.Unlock()
if len(b.list) == 1 {
return
}
var nextHead int
for idx := len(b.list) - 1; idx >= 0; idx-- {
block := b.list[idx]
if block.headTs <= ts {
nextHead = idx
break
}
}

if nextHead > 0 {
for idx := 0; idx < nextHead; idx++ {
b.list[idx] = nil
}
b.list = b.list[nextHead:]
}
}
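To make the eviction rule concrete, here is a worked example in the style of a _test.go sketch. It assumes the unexported timed constraint only requires Timestamp() and Size() (the two methods the buffer code uses) and that cacheBlock.Put rejects an entry once the block's size budget would be exceeded; neither the helper type nor the function below is part of the PR.

package deletebuffer

import "fmt"

// sketchEntry is an illustrative item; real callers use *deletebuffer.Item.
type sketchEntry struct {
	ts   uint64
	size int64
}

func (e sketchEntry) Timestamp() uint64 { return e.ts }
func (e sketchEntry) Size() int64       { return e.size }

// sketchTryDiscard builds three blocks with head timestamps 100, 200 and 300
// (each entry nearly fills a block, so every overflow starts a new one), then
// discards up to ts=250. The block starting at 200 is kept, because it is the
// first block that may still hold data relevant at ts 250; only the block
// starting at 100 is dropped.
func sketchTryDiscard() {
	buf := NewListDeleteBuffer[sketchEntry](100, 10)
	buf.Put(sketchEntry{ts: 100, size: 8}) // fits into the initial block (headTs=100)
	buf.Put(sketchEntry{ts: 200, size: 8}) // overflows, starts a block with headTs=200
	buf.Put(sketchEntry{ts: 300, size: 8}) // overflows again, block with headTs=300

	buf.TryDiscard(250)
	fmt.Println(len(buf.ListAfter(0))) // prints 2: the entries at ts 200 and 300 remain
}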
