
Commit bf2f62c

Add WriteBuffer to provide abstraction for delta policy (#27874)
Signed-off-by: Congqi Xia <[email protected]>
congqixia authored Nov 4, 2023
1 parent 8011054 commit bf2f62c
Showing 31 changed files with 2,337 additions and 23 deletions.

Makefile (2 additions, 0 deletions)

@@ -430,6 +430,8 @@ generate-mockery-datanode: getdeps
 	$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
 	$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage
 	$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
+	$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
+	$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --output=writebuffer --inpackage

 generate-mockery-metastore: getdeps
 	$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks

internal/datanode/metacache/actions.go (4 additions, 2 deletions)

@@ -19,6 +19,7 @@ package metacache
 import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )

 type SegmentFilter func(info *SegmentInfo) bool
@@ -29,9 +30,10 @@ func WithPartitionID(partitionID int64) SegmentFilter {
 	}
 }

-func WithSegmentID(segmentID int64) SegmentFilter {
+func WithSegmentIDs(segmentIDs ...int64) SegmentFilter {
+	set := typeutil.NewSet[int64](segmentIDs...)
 	return func(info *SegmentInfo) bool {
-		return info.segmentID == segmentID
+		return set.Contain(info.segmentID)
 	}
 }

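The filter is now variadic, so a single predicate can match several segments at once. Below is a minimal sketch of how it composes with MetaCache.GetSegmentsBy; the helper name, the IDs, and the pre-initialized cache are illustrative only and not part of this commit.

```go
package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/datanode/metacache"
)

// flushedAmong is a hypothetical helper: it returns the flushed segments
// among the given IDs using the new multi-ID filter in one lookup.
func flushedAmong(cache metacache.MetaCache, ids ...int64) []*metacache.SegmentInfo {
	var flushed []*metacache.SegmentInfo
	for _, seg := range cache.GetSegmentsBy(metacache.WithSegmentIDs(ids...)) {
		if seg.State() == commonpb.SegmentState_Flushed {
			flushed = append(flushed, seg)
		}
	}
	return flushed
}
```
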
internal/datanode/metacache/actions_test.go (1 addition, 1 deletion)

@@ -40,7 +40,7 @@ func (s *SegmentFilterSuite) TestFilters() {
 	s.True(filter(info))

 	segmentID := int64(10001)
-	filter = WithSegmentID(segmentID)
+	filter = WithSegmentIDs(segmentID)
 	info.segmentID = segmentID + 1
 	s.False(filter(info))
 	info.segmentID = segmentID

internal/datanode/metacache/meta_cache.go (8 additions, 3 deletions)

@@ -23,12 +23,13 @@ import (
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 )

 type MetaCache interface {
-	NewSegment(segmentID, partitionID int64)
+	NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction)
 	UpdateSegments(action SegmentAction, filters ...SegmentFilter)
 	CompactSegments(newSegmentID, partitionID int64, oldSegmentIDs ...int64)
 	GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo
@@ -67,17 +68,21 @@ func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFacto
 	}
 }

-func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64) {
+func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64, startPos *msgpb.MsgPosition, actions ...SegmentAction) {
 	c.mu.Lock()
 	defer c.mu.Unlock()

 	if _, ok := c.segmentInfos[segmentID]; !ok {
-		c.segmentInfos[segmentID] = &SegmentInfo{
+		info := &SegmentInfo{
 			segmentID: segmentID,
 			partitionID: partitionID,
 			state: commonpb.SegmentState_Growing,
 			startPosRecorded: false,
 		}
+		for _, action := range actions {
+			action(info)
+		}
+		c.segmentInfos[segmentID] = info
 	}
 }

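NewSegment now also accepts the segment's start position and optional SegmentAction hooks that are applied while the cache lock is held, so a caller can register and seed a segment in one step. A hedged sketch of such a caller, assuming an already-built metacache.MetaCache; the function name, channel name, and numeric values are placeholders.

```go
package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/datanode/metacache"
)

// registerGrowingSegment registers a brand-new growing segment and seeds its
// row count in one call, instead of NewSegment followed by a separate
// UpdateSegments round-trip.
func registerGrowingSegment(cache metacache.MetaCache, segmentID, partitionID int64) {
	startPos := &msgpb.MsgPosition{
		ChannelName: "by-dev-dml_0_100v0", // placeholder vchannel name
		Timestamp:   0,                    // placeholder MsgStream timestamp
	}
	// UpdateNumOfRows is one of the SegmentAction helpers exercised in the tests.
	cache.NewSegment(segmentID, partitionID, startPos, metacache.UpdateNumOfRows(0))
}
```
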
internal/datanode/metacache/meta_cache_test.go (3 additions, 3 deletions)

@@ -81,7 +81,7 @@ func (s *MetaCacheSuite) SetupTest() {

 func (s *MetaCacheSuite) TestNewSegment() {
 	for i, seg := range s.newSegments {
-		s.cache.NewSegment(seg, s.partitionIDs[i])
+		s.cache.NewSegment(seg, s.partitionIDs[i], nil, UpdateNumOfRows(100))
 	}

 	for id, partitionID := range s.partitionIDs {
@@ -110,8 +110,8 @@ func (s *MetaCacheSuite) TestCompactSegments() {
 }

 func (s *MetaCacheSuite) TestUpdateSegments() {
-	s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentID(5))
-	segments := s.cache.GetSegmentsBy(WithSegmentID(5))
+	s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentIDs(5))
+	segments := s.cache.GetSegmentsBy(WithSegmentIDs(5))
 	s.Require().Equal(1, len(segments))
 	segment := segments[0]
 	s.Equal(commonpb.SegmentState_Flushed, segment.State())

internal/datanode/metacache/mock_meta_cache.go (28 additions, 9 deletions)

Generated mock (mockery); diff not rendered.

internal/datanode/metacache/segment.go (4 additions, 0 deletions)

@@ -67,6 +67,10 @@ func (s *SegmentInfo) CompactTo() int64 {
 	return s.compactTo
 }

+func (s *SegmentInfo) GetBloomFilterSet() *BloomFilterSet {
+	return s.bfs
+}
+
 func (s *SegmentInfo) Clone() *SegmentInfo {
 	return &SegmentInfo{
 		segmentID: s.segmentID,

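GetBloomFilterSet exposes the segment's primary-key bloom filter set outside the cache, presumably for the delta-policy handling this commit introduces. A small illustrative helper, assuming an initialized MetaCache; the function name and its purpose comment are assumptions, only the called APIs come from this diff.

```go
package example

import (
	"github.com/milvus-io/milvus/internal/datanode/metacache"
)

// collectBloomFilterSets gathers the PK bloom filter set of each selected
// segment, e.g. so delete records can be checked against candidate segments
// without reaching into metacache internals.
func collectBloomFilterSets(cache metacache.MetaCache, segmentIDs ...int64) []*metacache.BloomFilterSet {
	var sets []*metacache.BloomFilterSet
	for _, seg := range cache.GetSegmentsBy(metacache.WithSegmentIDs(segmentIDs...)) {
		sets = append(sets, seg.GetBloomFilterSet())
	}
	return sets
}
```
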
internal/datanode/syncmgr/meta_writer.go (3 additions, 3 deletions)

@@ -51,7 +51,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
 	deltaInfos[0] = &datapb.FieldBinlog{Binlogs: []*datapb.Binlog{pack.deltaBinlog}}

 	// only current segment checkpoint info,
-	segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentID(pack.segmentID))
+	segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentIDs(pack.segmentID))
 	if len(segments) == 0 {
 		return merr.WrapErrSegmentNotFound(pack.segmentID)
 	}
@@ -96,8 +96,8 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {

 		StartPositions: startPos,
 		Flushed: pack.isFlush,
-		// Dropped: pack.option.isDrop,
-		Channel: pack.channelName,
+		Dropped: pack.isDrop,
+		Channel: pack.channelName,
 	}
 	err := retry.Do(context.Background(), func() error {
 		err := b.broker.SaveBinlogPaths(context.Background(), req)

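With the previously commented-out field now wired up, the request sent to DataCoord reports whether the sync was a final drop in addition to a flush. A hedged fragment limited to the fields this hunk touches, assuming the request type is datapb.SaveBinlogPathsRequest (the type of req is not shown in this excerpt); a real request also carries IDs, binlog paths, and checkpoints.

```go
package example

import "github.com/milvus-io/milvus/internal/proto/datapb"

// dropSyncRequest sketches only the fields changed in this hunk.
func dropSyncRequest(channel string, isFlush, isDrop bool) *datapb.SaveBinlogPathsRequest {
	return &datapb.SaveBinlogPathsRequest{
		Flushed: isFlush, // set via SyncTask.WithFlush()
		Dropped: isDrop,  // set via the new SyncTask.WithDrop()
		Channel: channel,
	}
}
```
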
internal/datanode/syncmgr/mock_sync_manager.go (145 additions, 0 deletions)

New generated mock (mockery); diff not rendered.

internal/datanode/syncmgr/options.go (5 additions, 0 deletions)

@@ -80,6 +80,11 @@ func (t *SyncTask) WithFlush() *SyncTask {
 	return t
 }

+func (t *SyncTask) WithDrop() *SyncTask {
+	t.isDrop = true
+	return t
+}
+
 func (t *SyncTask) WithMetaCache(metacache metacache.MetaCache) *SyncTask {
 	t.metacache = metacache
 	return t

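WithDrop follows the same builder pattern as the other SyncTask options and feeds the Dropped flag set in meta_writer.go above. A sketch of decorating an already-constructed task, mirroring the updated test; the function name and checkpoint value are illustrative, and it assumes BrokerMetaWriter accepts a broker.Broker as the test usage suggests.

```go
package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/datanode/broker"
	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
)

// prepareDropSync marks a prepared SyncTask so its SaveBinlogPaths call
// reports the segment as both flushed and dropped.
func prepareDropSync(task *syncmgr.SyncTask, b broker.Broker, channel string) *syncmgr.SyncTask {
	task.WithFlush()
	task.WithDrop()
	task.WithMetaWriter(syncmgr.BrokerMetaWriter(b))
	task.WithCheckpoint(&msgpb.MsgPosition{ChannelName: channel})
	return task
}
```
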
internal/datanode/syncmgr/task.go (3 additions, 2 deletions)

@@ -41,6 +41,7 @@ type SyncTask struct {
 	tsTo typeutil.Timestamp

 	isFlush bool
+	isDrop  bool

 	metacache metacache.MetaCache
 	metaWriter MetaWriter
@@ -75,7 +76,7 @@ func (t *SyncTask) Run() error {
 	log := t.getLogger()
 	var err error

-	infos := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID))
+	infos := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID))
 	if len(infos) == 0 {
 		log.Warn("failed to sync data, segment not found in metacache")
 		t.handleError(err)
@@ -245,7 +246,7 @@ func (t *SyncTask) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryK
 }

 func (t *SyncTask) serializeMergedPkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error {
-	segments := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID))
+	segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID))
 	var statsList []*storage.PrimaryKeyStats
 	var oldRowNum int64
 	for _, segment := range segments {

internal/datanode/syncmgr/task_test.go (1 addition, 0 deletions)

@@ -201,6 +201,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
 	task := s.getSuiteSyncTask()
 	task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer())
 	task.WithFlush()
+	task.WithDrop()
 	task.WithMetaWriter(BrokerMetaWriter(s.broker))
 	task.WithCheckpoint(&msgpb.MsgPosition{
 		ChannelName: s.channelName,

(Diffs for the remaining changed files were not loaded on this page.)