From aae7e627298e58f1c65f75630ea846b37308883a Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 30 Nov 2023 14:30:28 +0800 Subject: [PATCH] feat: Add levelzero compaction in DN (#28470) See also: #27606 --------- Signed-off-by: yangxuan --- Makefile | 1 + internal/datanode/compactor.go | 24 +- internal/datanode/compactor_test.go | 4 +- internal/datanode/io/binlog_io.go | 80 ++++ internal/datanode/io/binlog_io_test.go | 73 +++ internal/datanode/io/mock_binlogio.go | 189 ++++++++ .../datanode/iterators/binlog_iterator.go | 2 +- .../iterators/binlog_iterator_test.go | 8 +- .../iterators/deltalog_iterator_test.go | 10 +- internal/datanode/iterators/iterator.go | 39 +- internal/datanode/l0_compactor.go | 346 ++++++++++++++ internal/datanode/l0_compactor_test.go | 428 ++++++++++++++++++ internal/datanode/services.go | 39 +- internal/storage/data_codec.go | 9 + 14 files changed, 1215 insertions(+), 37 deletions(-) create mode 100644 internal/datanode/io/binlog_io.go create mode 100644 internal/datanode/io/binlog_io_test.go create mode 100644 internal/datanode/io/mock_binlogio.go create mode 100644 internal/datanode/l0_compactor.go create mode 100644 internal/datanode/l0_compactor_test.go diff --git a/Makefile b/Makefile index 8ad4e63e52e98..b2545385f117c 100644 --- a/Makefile +++ b/Makefile @@ -437,6 +437,7 @@ generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage + $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage generate-mockery-metastore: getdeps $(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index ab137c9b05222..9ea535079b782 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -54,7 +54,6 @@ type iterator = storage.Iterator type compactor interface { complete() - // compact() (*datapb.CompactionResult, error) compact() (*datapb.CompactionPlanResult, error) injectDone() stop() @@ -79,9 +78,8 @@ type compactionTask struct { ctx context.Context cancel context.CancelFunc - done chan struct{} - tr *timerecord.TimeRecorder - chunkManager storage.ChunkManager + done chan struct{} + tr *timerecord.TimeRecorder } func newCompactionTask( @@ -92,22 +90,20 @@ func newCompactionTask( syncMgr syncmgr.SyncManager, alloc allocator.Allocator, plan *datapb.CompactionPlan, - chunkManager storage.ChunkManager, ) *compactionTask { ctx1, cancel := context.WithCancel(ctx) return &compactionTask{ ctx: ctx1, cancel: cancel, - downloader: dl, - uploader: ul, - syncMgr: syncMgr, - metaCache: metaCache, - 
Allocator: alloc, - plan: plan, - tr: timerecord.NewTimeRecorder("compactionTask"), - chunkManager: chunkManager, - done: make(chan struct{}, 1), + downloader: dl, + uploader: ul, + syncMgr: syncMgr, + metaCache: metaCache, + Allocator: alloc, + plan: plan, + tr: timerecord.NewTimeRecorder("levelone compaction"), + done: make(chan struct{}, 1), } } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 6ee4a3b44cde9..e99dadfe3db85 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -965,7 +965,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { Channel: "channelname", } - task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan, nil) + task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan) result, err := task.compact() assert.NoError(t, err) assert.NotNil(t, result) @@ -1103,7 +1103,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { Channel: "channelname", } - task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan, nil) + task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan) result, err := task.compact() assert.NoError(t, err) assert.NotNil(t, result) diff --git a/internal/datanode/io/binlog_io.go b/internal/datanode/io/binlog_io.go new file mode 100644 index 0000000000000..518071d67e186 --- /dev/null +++ b/internal/datanode/io/binlog_io.go @@ -0,0 +1,80 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package io
+
+import (
+	"context"
+	"path"
+
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/util/conc"
+	"github.com/milvus-io/milvus/pkg/util/retry"
+)
+
+type BinlogIO interface {
+	Download(ctx context.Context, paths []string) ([][]byte, error)
+	Upload(ctx context.Context, kvs map[string][]byte) error
+	// JoinFullPath returns the full path by joining the given paths with the chunkmanager's rootpath
+	JoinFullPath(paths ...string) string
+}
+
+type BinlogIoImpl struct {
+	storage.ChunkManager
+	pool *conc.Pool[any]
+}
+
+func NewBinlogIO(cm storage.ChunkManager, ioPool *conc.Pool[any]) BinlogIO {
+	return &BinlogIoImpl{cm, ioPool}
+}
+
+func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) {
+	future := b.pool.Submit(func() (any, error) {
+		var vs [][]byte
+		var err error
+
+		err = retry.Do(ctx, func() error {
+			vs, err = b.MultiRead(ctx, paths)
+			return err
+		})
+
+		return vs, err
+	})
+
+	vs, err := future.Await()
+	if err != nil {
+		return nil, err
+	}
+
+	return vs.([][]byte), nil
+}
+
+func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error {
+	future := b.pool.Submit(func() (any, error) {
+		err := retry.Do(ctx, func() error {
+			return b.MultiWrite(ctx, kvs)
+		})
+
+		return nil, err
+	})
+
+	_, err := future.Await()
+	return err
+}
+
+func (b *BinlogIoImpl) JoinFullPath(paths ...string) string {
+	return path.Join(b.ChunkManager.RootPath(), path.Join(paths...))
+}
diff --git a/internal/datanode/io/binlog_io_test.go b/internal/datanode/io/binlog_io_test.go
new file mode 100644
index 0000000000000..70ad89b69b5fc
--- /dev/null
+++ b/internal/datanode/io/binlog_io_test.go
@@ -0,0 +1,73 @@
+package io
+
+import (
+	"path"
+	"testing"
+
+	"github.com/samber/lo"
+	"github.com/stretchr/testify/suite"
+	"golang.org/x/net/context"
+
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/util/conc"
+)
+
+const binlogIOTestDir = "/tmp/milvus_test/binlog_io"
+
+func TestBinlogIO(t *testing.T) {
+	suite.Run(t, new(BinlogIOSuite))
+}
+
+type BinlogIOSuite struct {
+	suite.Suite
+
+	cm storage.ChunkManager
+	b  BinlogIO
+}
+
+func (s *BinlogIOSuite) SetupTest() {
+	pool := conc.NewDefaultPool[any]()
+
+	s.cm = storage.NewLocalChunkManager(storage.RootPath(binlogIOTestDir))
+
+	s.b = NewBinlogIO(s.cm, pool)
+}
+
+func (s *BinlogIOSuite) TearDownTest() {
+	ctx := context.Background()
+	s.cm.RemoveWithPrefix(ctx, s.cm.RootPath())
+}
+
+func (s *BinlogIOSuite) TestUploadDownload() {
+	kvs := map[string][]byte{
+		path.Join(binlogIOTestDir, "a/b/c"): {1, 255, 255},
+		path.Join(binlogIOTestDir, "a/b/d"): {1, 255, 255},
+	}
+
+	ctx := context.Background()
+	err := s.b.Upload(ctx, kvs)
+	s.NoError(err)
+
+	vs, err := s.b.Download(ctx, lo.Keys(kvs))
+	s.NoError(err)
+	s.ElementsMatch(lo.Values(kvs), vs)
+}
+
+func (s *BinlogIOSuite) TestJoinFullPath() {
+	tests := []struct {
+		description string
+		inPaths     []string
+		outPath     string
+	}{
+		{"no input", nil, path.Join(binlogIOTestDir)},
+		{"input one", []string{"a"}, path.Join(binlogIOTestDir, "a")},
+		{"input two", []string{"a", "b"}, path.Join(binlogIOTestDir, "a/b")},
+	}
+
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			out := s.b.JoinFullPath(test.inPaths...)
+ s.Equal(test.outPath, out) + }) + } +} diff --git a/internal/datanode/io/mock_binlogio.go b/internal/datanode/io/mock_binlogio.go new file mode 100644 index 0000000000000..4202a7ed55679 --- /dev/null +++ b/internal/datanode/io/mock_binlogio.go @@ -0,0 +1,189 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package io + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockBinlogIO is an autogenerated mock type for the BinlogIO type +type MockBinlogIO struct { + mock.Mock +} + +type MockBinlogIO_Expecter struct { + mock *mock.Mock +} + +func (_m *MockBinlogIO) EXPECT() *MockBinlogIO_Expecter { + return &MockBinlogIO_Expecter{mock: &_m.Mock} +} + +// Download provides a mock function with given fields: ctx, paths +func (_m *MockBinlogIO) Download(ctx context.Context, paths []string) ([][]byte, error) { + ret := _m.Called(ctx, paths) + + var r0 [][]byte + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok { + return rf(ctx, paths) + } + if rf, ok := ret.Get(0).(func(context.Context, []string) [][]byte); ok { + r0 = rf(ctx, paths) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([][]byte) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []string) error); ok { + r1 = rf(ctx, paths) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockBinlogIO_Download_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Download' +type MockBinlogIO_Download_Call struct { + *mock.Call +} + +// Download is a helper method to define mock.On call +// - ctx context.Context +// - paths []string +func (_e *MockBinlogIO_Expecter) Download(ctx interface{}, paths interface{}) *MockBinlogIO_Download_Call { + return &MockBinlogIO_Download_Call{Call: _e.mock.On("Download", ctx, paths)} +} + +func (_c *MockBinlogIO_Download_Call) Run(run func(ctx context.Context, paths []string)) *MockBinlogIO_Download_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]string)) + }) + return _c +} + +func (_c *MockBinlogIO_Download_Call) Return(_a0 [][]byte, _a1 error) *MockBinlogIO_Download_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockBinlogIO_Download_Call) RunAndReturn(run func(context.Context, []string) ([][]byte, error)) *MockBinlogIO_Download_Call { + _c.Call.Return(run) + return _c +} + +// JoinFullPath provides a mock function with given fields: paths +func (_m *MockBinlogIO) JoinFullPath(paths ...string) string { + _va := make([]interface{}, len(paths)) + for _i := range paths { + _va[_i] = paths[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 string + if rf, ok := ret.Get(0).(func(...string) string); ok { + r0 = rf(paths...) 
+ } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockBinlogIO_JoinFullPath_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'JoinFullPath' +type MockBinlogIO_JoinFullPath_Call struct { + *mock.Call +} + +// JoinFullPath is a helper method to define mock.On call +// - paths ...string +func (_e *MockBinlogIO_Expecter) JoinFullPath(paths ...interface{}) *MockBinlogIO_JoinFullPath_Call { + return &MockBinlogIO_JoinFullPath_Call{Call: _e.mock.On("JoinFullPath", + append([]interface{}{}, paths...)...)} +} + +func (_c *MockBinlogIO_JoinFullPath_Call) Run(run func(paths ...string)) *MockBinlogIO_JoinFullPath_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]string, len(args)-0) + for i, a := range args[0:] { + if a != nil { + variadicArgs[i] = a.(string) + } + } + run(variadicArgs...) + }) + return _c +} + +func (_c *MockBinlogIO_JoinFullPath_Call) Return(_a0 string) *MockBinlogIO_JoinFullPath_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBinlogIO_JoinFullPath_Call) RunAndReturn(run func(...string) string) *MockBinlogIO_JoinFullPath_Call { + _c.Call.Return(run) + return _c +} + +// Upload provides a mock function with given fields: ctx, kvs +func (_m *MockBinlogIO) Upload(ctx context.Context, kvs map[string][]byte) error { + ret := _m.Called(ctx, kvs) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok { + r0 = rf(ctx, kvs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockBinlogIO_Upload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Upload' +type MockBinlogIO_Upload_Call struct { + *mock.Call +} + +// Upload is a helper method to define mock.On call +// - ctx context.Context +// - kvs map[string][]byte +func (_e *MockBinlogIO_Expecter) Upload(ctx interface{}, kvs interface{}) *MockBinlogIO_Upload_Call { + return &MockBinlogIO_Upload_Call{Call: _e.mock.On("Upload", ctx, kvs)} +} + +func (_c *MockBinlogIO_Upload_Call) Run(run func(ctx context.Context, kvs map[string][]byte)) *MockBinlogIO_Upload_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(map[string][]byte)) + }) + return _c +} + +func (_c *MockBinlogIO_Upload_Call) Return(_a0 error) *MockBinlogIO_Upload_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBinlogIO_Upload_Call) RunAndReturn(run func(context.Context, map[string][]byte) error) *MockBinlogIO_Upload_Call { + _c.Call.Return(run) + return _c +} + +// NewMockBinlogIO creates a new instance of MockBinlogIO. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewMockBinlogIO(t interface { + mock.TestingT + Cleanup(func()) +}) *MockBinlogIO { + mock := &MockBinlogIO{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/iterators/binlog_iterator.go b/internal/datanode/iterators/binlog_iterator.go index 884efaf14e2c4..8f3d371d5439c 100644 --- a/internal/datanode/iterators/binlog_iterator.go +++ b/internal/datanode/iterators/binlog_iterator.go @@ -74,7 +74,7 @@ func (i *BinlogIterator) Next() (*LabeledRowData, error) { row := &InsertRow{ ID: i.data.Data[common.RowIDField].GetRow(i.pos).(int64), Timestamp: uint64(i.data.Data[common.TimeStampField].GetRow(i.pos).(int64)), - PK: pk, + Pk: pk, Value: fields, } i.pos++ diff --git a/internal/datanode/iterators/binlog_iterator_test.go b/internal/datanode/iterators/binlog_iterator_test.go index e9e95ba44290b..79a395b61ea58 100644 --- a/internal/datanode/iterators/binlog_iterator_test.go +++ b/internal/datanode/iterators/binlog_iterator_test.go @@ -79,10 +79,16 @@ func (s *InsertBinlogIteratorSuite) TestBinlogIterator() { rows = append(rows, labeled.data) + label := labeled.GetLabel() + s.NotNil(label) + s.EqualValues(19530, label.segmentID) + s.EqualValues(19530, labeled.GetSegmentID()) + insertRow, ok := labeled.data.(*InsertRow) s.True(ok) - s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.PK.GetValue().(int64)) + s.EqualValues(insertData.Data[TimestampField].GetRow(idx).(int64), labeled.GetTimestamp()) + s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), labeled.GetPk().GetValue().(int64)) s.Equal(insertData.Data[RowIDField].GetRow(idx).(int64), insertRow.ID) s.Equal(insertData.Data[BoolField].GetRow(idx).(bool), insertRow.Value[BoolField].(bool)) s.Equal(insertData.Data[Int8Field].GetRow(idx).(int8), insertRow.Value[Int8Field].(int8)) diff --git a/internal/datanode/iterators/deltalog_iterator_test.go b/internal/datanode/iterators/deltalog_iterator_test.go index 930b3f0f17fa5..8fc1bd412e326 100644 --- a/internal/datanode/iterators/deltalog_iterator_test.go +++ b/internal/datanode/iterators/deltalog_iterator_test.go @@ -17,6 +17,12 @@ type DeltalogIteratorSuite struct { } func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() { + s.Run("invalid blobs", func() { + iter, err := NewDeltalogIterator([][]byte{}, nil) + s.Error(err) + s.Nil(iter) + }) + testpks := []int64{1, 2, 3, 4} testtss := []uint64{43757345, 43757346, 43757347, 43757348} @@ -43,8 +49,8 @@ func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() { s.NoError(err) s.Equal(labeled.GetSegmentID(), int64(100)) - gotpks = append(gotpks, labeled.data.(*DeltalogRow).Pk.GetValue().(int64)) - gottss = append(gottss, labeled.data.(*DeltalogRow).Timestamp) + gotpks = append(gotpks, labeled.GetPk().GetValue().(int64)) + gottss = append(gottss, labeled.GetTimestamp()) } s.ElementsMatch(gotpks, testpks) diff --git a/internal/datanode/iterators/iterator.go b/internal/datanode/iterators/iterator.go index d7d9c26c3ccfc..158db4ad80116 100644 --- a/internal/datanode/iterators/iterator.go +++ b/internal/datanode/iterators/iterator.go @@ -16,20 +16,39 @@ var ( const InvalidID int64 = -1 -type Row interface{} +type Row interface { + GetPk() storage.PrimaryKey + GetTimestamp() uint64 +} type InsertRow struct { ID int64 - PK storage.PrimaryKey + Pk storage.PrimaryKey Timestamp typeutil.Timestamp Value map[storage.FieldID]interface{} } +func (r *InsertRow) GetPk() storage.PrimaryKey { + return r.Pk +} + +func (r *InsertRow) GetTimestamp() uint64 { + return 
r.Timestamp +} + type DeltalogRow struct { Pk storage.PrimaryKey Timestamp typeutil.Timestamp } +func (r *DeltalogRow) GetPk() storage.PrimaryKey { + return r.Pk +} + +func (r *DeltalogRow) GetTimestamp() uint64 { + return r.Timestamp +} + type Label struct { segmentID typeutil.UniqueID } @@ -39,11 +58,19 @@ type LabeledRowData struct { data Row } -func (l *LabeledRowData) GetSegmentID() typeutil.UniqueID { - if l.label == nil { - return InvalidID - } +func (l *LabeledRowData) GetLabel() *Label { + return l.label +} +func (l *LabeledRowData) GetPk() storage.PrimaryKey { + return l.data.GetPk() +} + +func (l *LabeledRowData) GetTimestamp() uint64 { + return l.data.GetTimestamp() +} + +func (l *LabeledRowData) GetSegmentID() typeutil.UniqueID { return l.label.segmentID } diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go new file mode 100644 index 0000000000000..2f1bcd6831552 --- /dev/null +++ b/internal/datanode/l0_compactor.go @@ -0,0 +1,346 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package datanode
+
+import (
+	"context"
+	"time"
+
+	"github.com/samber/lo"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus/internal/datanode/allocator"
+	"github.com/milvus-io/milvus/internal/datanode/io"
+	iter "github.com/milvus-io/milvus/internal/datanode/iterators"
+	"github.com/milvus-io/milvus/internal/datanode/metacache"
+	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metautil"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
+)
+
+type levelZeroCompactionTask struct {
+	compactor
+	io.BinlogIO
+
+	allocator allocator.Allocator
+	metacache metacache.MetaCache
+	syncmgr   syncmgr.SyncManager
+
+	plan *datapb.CompactionPlan
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	done chan struct{}
+	tr   *timerecord.TimeRecorder
+}
+
+func newLevelZeroCompactionTask(
+	ctx context.Context,
+	binlogIO io.BinlogIO,
+	alloc allocator.Allocator,
+	metaCache metacache.MetaCache,
+	syncmgr syncmgr.SyncManager,
+	plan *datapb.CompactionPlan,
+) *levelZeroCompactionTask {
+	ctx, cancel := context.WithCancel(ctx)
+	return &levelZeroCompactionTask{
+		ctx:    ctx,
+		cancel: cancel,
+
+		BinlogIO:  binlogIO,
+		allocator: alloc,
+		metacache: metaCache,
+		syncmgr:   syncmgr,
+		plan:      plan,
+		tr:        timerecord.NewTimeRecorder("levelzero compaction"),
+		done:      make(chan struct{}, 1),
+	}
+}
+
+func (t *levelZeroCompactionTask) complete() {
+	t.done <- struct{}{}
+}
+
+func (t *levelZeroCompactionTask) stop() {
+	t.cancel()
+	<-t.done
+}
+
+func (t *levelZeroCompactionTask) getPlanID() UniqueID {
+	return t.plan.GetPlanID()
+}
+
+func (t *levelZeroCompactionTask) getChannelName() string {
+	return t.plan.GetChannel()
+}
+
+func (t *levelZeroCompactionTask) getCollection() int64 {
+	return t.metacache.Collection()
+}
+
+// Do nothing for levelzero compaction
+func (t *levelZeroCompactionTask) injectDone() {}
+
+func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error) {
+	log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))
+	log.Info("L0 compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
+
+	if !funcutil.CheckCtxValid(t.ctx) {
+		log.Warn("compact wrong, task context done or timeout")
+		return nil, errContext
+	}
+
+	ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
+	defer cancelAll()
+
+	l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
+		return s.Level == datapb.SegmentLevel_L0
+	})
+
+	targetSegIDs := lo.FilterMap(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) (int64, bool) {
+		if s.Level == datapb.SegmentLevel_L1 {
+			return s.GetSegmentID(), true
+		}
+		return 0, false
+	})
+	if len(targetSegIDs) == 0 {
+		log.Warn("compact wrong, no target sealed segments")
+		return nil, errIllegalCompactionPlan
+	}
+
+	var (
+		totalSize      int64
+		totalDeltalogs = make(map[UniqueID][]string)
+	)
+	for _, s := range l0Segments {
+		paths := []string{}
+		for _, d := range s.GetDeltalogs() {
+			for _, l := range d.GetBinlogs() {
+				paths = append(paths, l.GetLogPath())
+				totalSize += l.GetLogSize()
+			}
+		}
+		if len(paths) > 0 {
+			totalDeltalogs[s.GetSegmentID()] = paths
+		}
+	}
+
+	// TODO
+	// batchProcess := func() ([]*datapb.CompactionSegment, error) {
+	// 	resultSegments := make(map[int64]*datapb.CompactionSegment)
+	//
+	// 	iters, err := t.loadDelta(ctxTimeout, lo.Values(totalDeltalogs)...)
+	// 	if err != nil {
+	// 		return nil, err
+	// 	}
+	// 	log.Info("Batch L0 compaction load delta into memory", zap.Duration("elapse", t.tr.RecordSpan()))
+	//
+	// 	alteredSegments := make(map[int64]*storage.DeleteData)
+	// 	err = t.splitDelta(iters, alteredSegments, targetSegIDs)
+	// 	if err != nil {
+	// 		return nil, err
+	// 	}
+	// 	log.Info("Batch L0 compaction split delta into segments", zap.Duration("elapse", t.tr.RecordSpan()))
+	//
+	// 	err = t.uploadByCheck(ctxTimeout, false, alteredSegments, resultSegments)
+	// 	log.Info("Batch L0 compaction upload all", zap.Duration("elapse", t.tr.RecordSpan()))
+	//
+	// 	return lo.Values(resultSegments), nil
+	// }
+
+	linearProcess := func() ([]*datapb.CompactionSegment, error) {
+		var (
+			resultSegments  = make(map[int64]*datapb.CompactionSegment)
+			alteredSegments = make(map[int64]*storage.DeleteData)
+		)
+		for segID, deltaLogs := range totalDeltalogs {
+			log := log.With(zap.Int64("levelzero segment", segID))
+			log.Info("Linear L0 compaction processing segment", zap.Int64s("target segmentIDs", targetSegIDs))
+
+			allIters, err := t.loadDelta(ctxTimeout, deltaLogs)
+			if err != nil {
+				log.Warn("Linear L0 compaction loadDelta fail", zap.Error(err))
+				return nil, err
+			}
+
+			err = t.splitDelta(allIters, alteredSegments, targetSegIDs)
+			if err != nil {
+				log.Warn("Linear L0 compaction splitDelta fail", zap.Error(err))
+				return nil, err
+			}
+
+			err = t.uploadByCheck(ctxTimeout, true, alteredSegments, resultSegments)
+			if err != nil {
+				log.Warn("Linear L0 compaction upload buffer fail", zap.Error(err))
+				return nil, err
+			}
+		}
+
+		err := t.uploadByCheck(ctxTimeout, false, alteredSegments, resultSegments)
+		if err != nil {
+			log.Warn("Linear L0 compaction upload all buffer fail", zap.Error(err))
+			return nil, err
+		}
+		log.Info("Linear L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
+		return lo.Values(resultSegments), nil
+	}
+
+	var (
+		resultSegments []*datapb.CompactionSegment
+		err            error
+	)
+	// if totalSize*3 < int64(hardware.GetFreeMemoryCount()) {
+	// 	resultSegments, err = batchProcess()
+	// }
+	resultSegments, err = linearProcess()
+	if err != nil {
+		return nil, err
+	}
+
+	result := &datapb.CompactionPlanResult{
+		PlanID:   t.plan.GetPlanID(),
+		State:    commonpb.CompactionState_Completed,
+		Segments: resultSegments,
+		Channel:  t.plan.GetChannel(),
+	}
+
+	log.Info("L0 compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()))
+
+	return result, nil
+}
+
+func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*iter.DeltalogIterator, error) {
+	allIters := make([]*iter.DeltalogIterator, 0)
+	for _, paths := range deltaLogs {
+		blobs, err := t.Download(ctx, paths)
+		if err != nil {
+			return nil, err
+		}
+
+		deltaIter, err := iter.NewDeltalogIterator(blobs, nil)
+		if err != nil {
+			return nil, err
+		}
+
+		allIters = append(allIters, deltaIter)
+	}
+	return allIters, nil
+}
+
+func (t *levelZeroCompactionTask) splitDelta(
+	allIters []*iter.DeltalogIterator,
+	targetSegBuffer map[int64]*storage.DeleteData,
+	targetSegIDs []int64,
+) error {
+	// split all delete data into segments
+	for _, deltaIter := range allIters {
+		for deltaIter.HasNext() {
+ labeled, err := deltaIter.Next() + if err != nil { + return err + } + + predicted, found := t.metacache.PredictSegments(labeled.GetPk(), metacache.WithSegmentIDs(targetSegIDs...)) + if !found { + continue + } + + for _, gotSeg := range predicted { + delBuffer, ok := targetSegBuffer[gotSeg] + if !ok { + delBuffer = &storage.DeleteData{} + targetSegBuffer[gotSeg] = delBuffer + } + + delBuffer.Append(labeled.GetPk(), labeled.GetTimestamp()) + } + } + } + return nil +} + +func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storage.DeleteData) (map[string][]byte, *datapb.Binlog, error) { + var ( + collID = t.metacache.Collection() + uploadKv = make(map[string][]byte) + ) + + seg, ok := t.metacache.GetSegmentByID(segmentID) + if !ok { + return nil, nil, merr.WrapErrSegmentLack(segmentID) + } + blob, err := storage.NewDeleteCodec().Serialize(collID, seg.PartitionID(), segmentID, dData) + if err != nil { + return nil, nil, err + } + + logID, err := t.allocator.AllocOne() + if err != nil { + return nil, nil, err + } + + blobKey := metautil.JoinIDPath(collID, seg.PartitionID(), segmentID, logID) + blobPath := t.BinlogIO.JoinFullPath(common.SegmentDeltaLogPath, blobKey) + + uploadKv[blobPath] = blob.GetValue() + + // TODO Timestamp? + deltalog := &datapb.Binlog{ + LogSize: int64(len(blob.GetValue())), + LogPath: blobPath, + LogID: logID, + } + + return uploadKv, deltalog, nil +} + +func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error { + for segID, dData := range alteredSegments { + if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) { + blobs, binlog, err := t.composeDeltalog(segID, dData) + if err != nil { + return err + } + err = t.Upload(ctx, blobs) + if err != nil { + return err + } + + if _, ok := resultSegments[segID]; !ok { + resultSegments[segID] = &datapb.CompactionSegment{ + SegmentID: segID, + Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}}, + Channel: t.plan.GetChannel(), + } + } else { + resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog) + } + + delete(alteredSegments, segID) + } + } + return nil +} diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go new file mode 100644 index 0000000000000..2dc36d6b4a7d5 --- /dev/null +++ b/internal/datanode/l0_compactor_test.go @@ -0,0 +1,428 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package datanode + +import ( + "context" + "path" + "testing" + + "github.com/cockroachdb/errors" + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datanode/allocator" + "github.com/milvus-io/milvus/internal/datanode/io" + iter "github.com/milvus-io/milvus/internal/datanode/iterators" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/metautil" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" +) + +func TestLevelZeroCompactionTaskSuite(t *testing.T) { + suite.Run(t, new(LevelZeroCompactionTaskSuite)) +} + +type LevelZeroCompactionTaskSuite struct { + suite.Suite + + mockBinlogIO *io.MockBinlogIO + mockAlloc *allocator.MockAllocator + mockMeta *metacache.MockMetaCache + task *levelZeroCompactionTask + + dData *storage.DeleteData + dBlob []byte +} + +func (s *LevelZeroCompactionTaskSuite) SetupTest() { + s.mockAlloc = allocator.NewMockAllocator(s.T()) + s.mockBinlogIO = io.NewMockBinlogIO(s.T()) + s.mockMeta = metacache.NewMockMetaCache(s.T()) + // plan of the task is unset + s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, s.mockMeta, nil, nil) + + pk2ts := map[int64]uint64{ + 1: 20000, + 2: 20001, + 3: 20002, + } + + s.dData = storage.NewDeleteData([]storage.PrimaryKey{}, []Timestamp{}) + for pk, ts := range pk2ts { + s.dData.Append(storage.NewInt64PrimaryKey(pk), ts) + } + + dataCodec := storage.NewDeleteCodec() + blob, err := dataCodec.Serialize(0, 0, 0, s.dData) + s.Require().NoError(err) + s.dBlob = blob.GetValue() +} + +func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { + plan := &datapb.CompactionPlan{ + PlanID: 19530, + Type: datapb.CompactionType_Level0DeleteCompaction, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogPath: "a/b/c1", LogSize: 100}, + {LogPath: "a/b/c2", LogSize: 100}, + {LogPath: "a/b/c3", LogSize: 100}, + {LogPath: "a/b/c4", LogSize: 100}, + }, + }, + }, + }, + { + SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogPath: "a/d/c1", LogSize: 100}, + {LogPath: "a/d/c2", LogSize: 100}, + {LogPath: "a/d/c3", LogSize: 100}, + {LogPath: "a/d/c4", LogSize: 100}, + }, + }, + }, + }, + {SegmentID: 200, Level: datapb.SegmentLevel_L1}, + {SegmentID: 201, Level: datapb.SegmentLevel_L1}, + }, + } + + s.task.plan = plan + s.task.tr = timerecord.NewTimeRecorder("test") + + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2) + s.mockMeta.EXPECT().PredictSegments(mock.Anything, mock.Anything).Return([]int64{200, 201}, true) + s.mockMeta.EXPECT().Collection().Return(1) + s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything). 
+ RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { + return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true + }) + + s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) + s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything). + RunAndReturn(func(paths ...string) string { + return path.Join(paths...) + }).Times(2) + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2) + + s.Require().Equal(plan.GetPlanID(), s.task.getPlanID()) + s.Require().Equal(plan.GetChannel(), s.task.getChannelName()) + s.Require().EqualValues(1, s.task.getCollection()) + + result, err := s.task.compact() + s.NoError(err) + s.NotNil(result) + s.Equal(commonpb.CompactionState_Completed, result.GetState()) + s.Equal(plan.GetChannel(), result.GetChannel()) + s.Equal(2, len(result.GetSegments())) + s.ElementsMatch([]int64{200, 201}, + lo.Map(result.GetSegments(), func(seg *datapb.CompactionSegment, _ int) int64 { + return seg.GetSegmentID() + })) + + s.EqualValues(plan.GetPlanID(), result.GetPlanID()) + log.Info("test segment results", zap.Any("result", result)) + + s.task.complete() + s.task.stop() +} + +func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { + s.T().Skip() + plan := &datapb.CompactionPlan{ + PlanID: 19530, + Type: datapb.CompactionType_Level0DeleteCompaction, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogPath: "a/b/c1", LogSize: 100}, + {LogPath: "a/b/c2", LogSize: 100}, + {LogPath: "a/b/c3", LogSize: 100}, + {LogPath: "a/b/c4", LogSize: 100}, + }, + }, + }, + }, + { + SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogPath: "a/d/c1", LogSize: 100}, + {LogPath: "a/d/c2", LogSize: 100}, + {LogPath: "a/d/c3", LogSize: 100}, + {LogPath: "a/d/c4", LogSize: 100}, + }, + }, + }, + }, + {SegmentID: 200, Level: datapb.SegmentLevel_L1}, + {SegmentID: 201, Level: datapb.SegmentLevel_L1}, + }, + } + + s.task.plan = plan + s.task.tr = timerecord.NewTimeRecorder("test") + + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2) + s.mockMeta.EXPECT().PredictSegments(mock.Anything, mock.Anything).Return([]int64{200, 201}, true) + s.mockMeta.EXPECT().Collection().Return(1) + s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything). + RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { + return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true + }) + + s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) + s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything). + RunAndReturn(func(paths ...string) string { + return path.Join(paths...) 
+ }).Times(2) + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2) + + result, err := s.task.compact() + s.NoError(err) + s.NotNil(result) + s.Equal(commonpb.CompactionState_Completed, result.GetState()) + s.Equal(plan.GetChannel(), result.GetChannel()) + s.Equal(2, len(result.GetSegments())) + s.ElementsMatch([]int64{200, 201}, + lo.Map(result.GetSegments(), func(seg *datapb.CompactionSegment, _ int) int64 { + return seg.GetSegmentID() + })) + + s.EqualValues(plan.GetPlanID(), result.GetPlanID()) + log.Info("test segment results", zap.Any("result", result)) +} + +func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { + ctx := context.Background() + + s.Run("upload directly", func() { + s.SetupTest() + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().Collection().Return(1) + s.mockMeta.EXPECT().GetSegmentByID( + mock.MatchedBy(func(ID int64) bool { + return ID == 100 + }), mock.Anything). + Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true) + + s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) + blobKey := metautil.JoinIDPath(1, 10, 100, 19530) + blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) + s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) + segments := map[int64]*storage.DeleteData{100: s.dData} + results := make(map[int64]*datapb.CompactionSegment) + err := s.task.uploadByCheck(ctx, false, segments, results) + s.NoError(err) + s.Equal(1, len(results)) + + seg1, ok := results[100] + s.True(ok) + s.EqualValues(100, seg1.GetSegmentID()) + s.Equal(1, len(seg1.GetDeltalogs())) + s.Equal(1, len(seg1.GetDeltalogs()[0].GetBinlogs())) + }) + + s.Run("check without upload", func() { + s.SetupTest() + segments := map[int64]*storage.DeleteData{100: s.dData} + results := make(map[int64]*datapb.CompactionSegment) + s.Require().Empty(results) + + err := s.task.uploadByCheck(ctx, true, segments, results) + s.NoError(err) + s.Empty(results) + }) + + s.Run("check with upload", func() { + blobKey := metautil.JoinIDPath(1, 10, 100, 19530) + blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) + + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().Collection().Return(1) + s.mockMeta.EXPECT().GetSegmentByID( + mock.MatchedBy(func(ID int64) bool { + return ID == 100 + }), mock.Anything). + Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true) + + s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) + s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) + + segments := map[int64]*storage.DeleteData{100: s.dData} + results := map[int64]*datapb.CompactionSegment{ + 100: {SegmentID: 100, Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{LogID: 1}}}}}, + } + s.Require().Equal(1, len(results)) + + paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key) + err := s.task.uploadByCheck(ctx, true, segments, results) + s.NoError(err) + s.NotEmpty(results) + s.Equal(1, len(results)) + + seg1, ok := results[100] + s.True(ok) + s.EqualValues(100, seg1.GetSegmentID()) + s.Equal(1, len(seg1.GetDeltalogs())) + s.Equal(2, len(seg1.GetDeltalogs()[0].GetBinlogs())) + }) +} + +func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() { + s.mockMeta.EXPECT().Collection().Return(1) + s.mockMeta.EXPECT(). 
+ GetSegmentByID( + mock.MatchedBy(func(ID int64) bool { + return ID == 100 + }), mock.Anything). + Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true) + + s.mockMeta.EXPECT(). + GetSegmentByID( + mock.MatchedBy(func(ID int64) bool { + return ID == 101 + }), mock.Anything). + Return(nil, false) + + s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) + + blobKey := metautil.JoinIDPath(1, 10, 100, 19530) + blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) + s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) + + kvs, binlog, err := s.task.composeDeltalog(100, s.dData) + s.NoError(err) + s.Equal(1, len(kvs)) + v, ok := kvs[blobPath] + s.True(ok) + s.NotNil(v) + s.Equal(blobPath, binlog.LogPath) + + _, _, err = s.task.composeDeltalog(101, s.dData) + s.Error(err) +} + +func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() { + predicted := []int64{100, 101, 102} + s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool { + return pk.GetValue().(int64) == 1 + }), mock.Anything).Return([]int64{100}, true) + s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool { + return pk.GetValue().(int64) == 2 + }), mock.Anything).Return(nil, false) + s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool { + return pk.GetValue().(int64) == 3 + }), mock.Anything).Return([]int64{100, 101, 102}, true) + + diter, err := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil) + s.Require().NoError(err) + s.Require().NotNil(diter) + + targetSegBuffer := make(map[int64]*storage.DeleteData) + targetSegIDs := predicted + err = s.task.splitDelta([]*iter.DeltalogIterator{diter}, targetSegBuffer, targetSegIDs) + s.NoError(err) + + s.NotEmpty(targetSegBuffer) + s.ElementsMatch(predicted, lo.Keys(targetSegBuffer)) + s.EqualValues(2, targetSegBuffer[100].RowCount) + s.EqualValues(1, targetSegBuffer[101].RowCount) + s.EqualValues(1, targetSegBuffer[102].RowCount) + + s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, targetSegBuffer[100].Pks) + s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[101].Pks[0]) + s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[102].Pks[0]) +} + +func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() { + ctx := context.TODO() + + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy( + func(paths []string) bool { + return len(paths) > 0 && paths[0] == "correct" + })).Return([][]byte{s.dBlob}, nil).Once() + + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy( + func(paths []string) bool { + return len(paths) > 0 && paths[0] == "error" + })).Return(nil, errors.New("mock err")).Once() + + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy( + func(paths []string) bool { + return len(paths) > 0 && paths[0] == "invalid-blobs" + })).Return([][]byte{{1}}, nil).Once() + + tests := []struct { + description string + paths []string + + expectNilIter bool + expectError bool + }{ + {"no error", []string{"correct"}, false, false}, + {"download error", []string{"error"}, true, true}, + {"new iter error", []string{"invalid-blobs"}, true, true}, + } + + for _, test := range tests { + iters, err := s.task.loadDelta(ctx, test.paths) + if test.expectNilIter { + s.Nil(iters) + } else { + s.NotNil(iters) + s.Equal(1, len(iters)) + s.True(iters[0].HasNext()) + + iter := iters[0] + var pks []storage.PrimaryKey + var tss []storage.Timestamp + for iter.HasNext() { + 
labeled, err := iter.Next() + s.NoError(err) + pks = append(pks, labeled.GetPk()) + tss = append(tss, labeled.GetTimestamp()) + } + + s.ElementsMatch(pks, s.dData.Pks) + s.ElementsMatch(tss, s.dData.Tss) + } + + if test.expectError { + s.Error(err) + } else { + s.NoError(err) + } + } +} diff --git a/internal/datanode/services.go b/internal/datanode/services.go index a048ccb6b4035..fbf666eee6a9d 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -250,19 +251,35 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil } - binlogIO := &binlogIO{node.chunkManager, ds.idAllocator} - task := newCompactionTask( - node.ctx, - binlogIO, binlogIO, - ds.metacache, - ds.syncMgr, - ds.idAllocator, - req, - node.chunkManager, - ) + var task compactor + switch req.GetType() { + case datapb.CompactionType_Level0DeleteCompaction: + binlogIO := io.NewBinlogIO(node.chunkManager, getOrCreateIOPool()) + task = newLevelZeroCompactionTask( + node.ctx, + binlogIO, + node.allocator, + ds.metacache, + node.syncMgr, + req, + ) + case datapb.CompactionType_MixCompaction, datapb.CompactionType_MinorCompaction: + // TODO, replace this binlogIO with io.BinlogIO + binlogIO := &binlogIO{node.chunkManager, ds.idAllocator} + task = newCompactionTask( + node.ctx, + binlogIO, binlogIO, + ds.metacache, + node.syncMgr, + node.allocator, + req, + ) + default: + log.Warn("Unknown compaction type", zap.String("type", req.GetType().String())) + return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil + } node.compactionExecutor.execute(task) - return merr.Success(), nil } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 77630e41f227e..00cfb5c9d1061 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -874,6 +874,15 @@ func (data *DeleteData) Merge(other *DeleteData) { other.RowCount = 0 } +func (data *DeleteData) Size() int64 { + var size int64 + for _, pk := range data.Pks { + size += pk.Size() + } + + return size +} + // DeleteCodec serializes and deserializes the delete data type DeleteCodec struct{}
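--
Reviewer sketches (Go, illustrative only; none of the code below is part of the diff):

1) Smoke-testing the new io.BinlogIO surface, mirroring what binlog_io_test.go exercises. This assumes a local chunk manager rooted at a made-up scratch directory; since internal/datanode/io is an internal package, it only compiles from inside the milvus module.

package main

import (
	"context"
	"fmt"

	binlogio "github.com/milvus-io/milvus/internal/datanode/io"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	ctx := context.Background()

	// Any storage.ChunkManager works; the test suite uses a local one.
	cm := storage.NewLocalChunkManager(storage.RootPath("/tmp/binlog_io_demo"))
	b := binlogio.NewBinlogIO(cm, conc.NewDefaultPool[any]())

	// JoinFullPath prefixes the chunk manager's root path onto the joined parts.
	key := b.JoinFullPath("delta_log", "demo")

	// Upload and Download run on the shared IO pool with retry.
	if err := b.Upload(ctx, map[string][]byte{key: {1, 255, 255}}); err != nil {
		panic(err)
	}
	vs, err := b.Download(ctx, []string{key})
	if err != nil {
		panic(err)
	}
	fmt.Println(vs) // [[1 255 255]]
}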
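2) Both BinlogIoImpl methods share one shape: bound concurrency by submitting a closure to the shared conc.Pool, and wrap the ChunkManager call in retry.Do. A sketch of that shape factored into a single generic helper; runRetried is hypothetical and not in the patch:

package io

import (
	"context"

	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/retry"
)

// runRetried schedules fn on the shared IO pool and retries transient
// failures, the way Download and Upload do inline.
func runRetried[T any](ctx context.Context, pool *conc.Pool[any], fn func() (T, error)) (T, error) {
	future := pool.Submit(func() (any, error) {
		var val T
		err := retry.Do(ctx, func() error {
			var inner error
			val, inner = fn()
			return inner
		})
		return val, err
	})

	res, err := future.Await()
	if err != nil {
		var zero T
		return zero, err
	}
	return res.(T), nil
}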
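3) The iterator change turns Row into an interface, so insert rows and deltalog rows expose GetPk/GetTimestamp uniformly, which is what splitDelta relies on through LabeledRowData. Hand-built rows for illustration:

package main

import (
	"fmt"

	iter "github.com/milvus-io/milvus/internal/datanode/iterators"
	"github.com/milvus-io/milvus/internal/storage"
)

func main() {
	// Both row kinds satisfy the same Row interface after this patch.
	rows := []iter.Row{
		&iter.InsertRow{ID: 1, Pk: storage.NewInt64PrimaryKey(10), Timestamp: 20000},
		&iter.DeltalogRow{Pk: storage.NewInt64PrimaryKey(10), Timestamp: 20001},
	}
	for _, r := range rows {
		fmt.Println(r.GetPk().GetValue(), r.GetTimestamp()) // same accessors for both kinds
	}
}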
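4) The routing step inside splitDelta: each delete record from an L0 deltalog fans out to every sealed L1 segment the metacache predicts may hold that primary key (a bloom-filter style test, so several candidates per key are possible and misses are skipped), buffered per segment. A self-contained sketch with plain int64 keys; deleteBuffer and predict are simplified stand-ins for storage.DeleteData and metacache.PredictSegments:

package main

import "fmt"

// deleteBuffer is a stand-in for storage.DeleteData.
type deleteBuffer struct {
	pks []int64
	tss []uint64
}

// splitDeletes mirrors levelZeroCompactionTask.splitDelta: route each (pk, ts)
// pair to every target segment that might contain pk.
func splitDeletes(pks []int64, tss []uint64, predict func(pk int64) []int64) map[int64]*deleteBuffer {
	buffers := make(map[int64]*deleteBuffer)
	for i, pk := range pks {
		for _, segID := range predict(pk) { // empty result: the delete touches no target
			buf, ok := buffers[segID]
			if !ok {
				buf = &deleteBuffer{}
				buffers[segID] = buf
			}
			buf.pks = append(buf.pks, pk)
			buf.tss = append(buf.tss, tss[i])
		}
	}
	return buffers
}

func main() {
	// pk 1 hits segment 100, pk 2 hits nothing, pk 3 hits all three targets,
	// matching the expectations in TestSplitDelta.
	predict := func(pk int64) []int64 {
		switch pk {
		case 1:
			return []int64{100}
		case 3:
			return []int64{100, 101, 102}
		}
		return nil
	}
	buffers := splitDeletes([]int64{1, 2, 3}, []uint64{20000, 20001, 20002}, predict)
	fmt.Println(len(buffers[100].pks), len(buffers[101].pks)) // 2 1
}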
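5) uploadByCheck gates eager flushes on the new DeleteData.Size(), which sums only the serialized primary keys (timestamps are not counted). The condition, isolated into a hypothetical helper using only APIs that appear in this patch:

// shouldFlush reproduces the test in uploadByCheck: flush unconditionally on
// the final pass (requireCheck == false), otherwise only once the buffered PK
// payload reaches DataNodeCfg.FlushDeleteBufferBytes.
func shouldFlush(dData *storage.DeleteData, requireCheck bool) bool {
	limit := paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
	return !requireCheck || dData.Size() >= limit
}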