From e585f6d2b40d76774ebbc98e0e3ccb1eb3e56ff6 Mon Sep 17 00:00:00 2001
From: Ted Xu
Date: Thu, 31 Oct 2024 14:40:20 +0800
Subject: [PATCH] enhance: generally improve the performance of mix compactions
 #37163 (#37268)

See #37234

pr: #37163

Signed-off-by: Ted Xu
---
 .../compaction/clustering_compactor_test.go   |  3 +-
 internal/datanode/compaction/mix_compactor.go | 28 ++++---
 .../datanode/compaction/mix_compactor_test.go | 81 +++++++++++++++----
 .../datanode/compaction/segment_writer.go     |  8 +-
 internal/storage/binlog_iterator_test.go      | 31 +++++--
 internal/storage/serde.go                     | 69 ++++------------
 internal/storage/serde_events.go              |  9 ++-
 7 files changed, 136 insertions(+), 93 deletions(-)

diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go
index ba7bc2b650fa2..a1486e820f3a0 100644
--- a/internal/datanode/compaction/clustering_compactor_test.go
+++ b/internal/datanode/compaction/clustering_compactor_test.go
@@ -277,8 +277,9 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 
 	// 8 + 8 + 8 + 7 + 8 = 39
 	// 39*1024 = 39936
+	// plus buffer on null bitsets etc., let's make it 45000
 	// writer will automatically flush after 1024 rows.
-	paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "39935")
+	paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000")
 	defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
 
 	compactionResult, err := s.task.Compact()
diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go
index 3949ee27a2820..08e50da608445 100644
--- a/internal/datanode/compaction/mix_compactor.go
+++ b/internal/datanode/compaction/mix_compactor.go
@@ -154,10 +154,12 @@ func (t *mixCompactionTask) mergeSplit(
 		return nil, err
 	}
 	for _, paths := range binlogPaths {
-		err := t.dealBinlogPaths(ctx, delta, mWriter, pkField, paths, &deletedRowCount, &expiredRowCount)
+		del, exp, err := t.writePaths(ctx, delta, mWriter, pkField, paths)
 		if err != nil {
 			return nil, err
 		}
+		deletedRowCount += del
+		expiredRowCount += exp
 	}
 	res, err := mWriter.Finish()
 	if err != nil {
@@ -186,12 +188,14 @@ func isValueDeleted(v *storage.Value, delta map[interface{}]typeutil.Timestamp)
 	return false
 }
 
-func (t *mixCompactionTask) dealBinlogPaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp, mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string, deletedRowCount, expiredRowCount *int64) error {
+func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp,
+	mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string,
+) (deletedRowCount, expiredRowCount int64, err error) {
 	log := log.With(zap.Strings("paths", paths))
 	allValues, err := t.binlogIO.Download(ctx, paths)
 	if err != nil {
 		log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
-		return err
+		return
 	}
 
 	blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
@@ -201,42 +205,40 @@ func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[inter
 	iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
 	if err != nil {
 		log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
-		return err
+		return
 	}
 	defer iter.Close()
 
 	for {
-		err := iter.Next()
+		err = iter.Next()
 		if err != nil {
 			if err == sio.EOF {
+				err = nil
 				break
 			} else {
 				log.Warn("compact wrong, failed to iter through data", zap.Error(err))
-				return err
+				return
 			}
 		}
 		v := iter.Value()
 		if isValueDeleted(v, delta) {
-			oldDeletedRowCount := *deletedRowCount
-			*deletedRowCount = oldDeletedRowCount + 1
+			deletedRowCount++
 			continue
 		}
 
 		// Filtering expired entity
 		if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(v.Timestamp)) {
-			oldExpiredRowCount := *expiredRowCount
-			*expiredRowCount = oldExpiredRowCount + 1
+			expiredRowCount++
 			continue
 		}
 
 		err = mWriter.Write(v)
		if err != nil {
 			log.Warn("compact wrong, failed to writer row", zap.Error(err))
-			return err
+			return
 		}
 	}
-
-	return nil
+	return
 }
 
 func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
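The writePaths rewrite above swaps dealBinlogPaths' two *int64 out-parameters for named return values, so the per-batch counters become ordinary locals, err can be cleared on sio.EOF before the loop breaks, and every early exit is a bare return. A minimal sketch of that control-flow shape, using a hypothetical iterator and row type rather than the Milvus storage API:

// Sketch only: the iterator and row types are illustrative, not Milvus code.
package main

import (
	"fmt"
	"io"
)

type row struct{ deleted, expired bool }

type iterator struct {
	rows []row
	pos  int
}

// Next returns io.EOF once the rows are exhausted, like the binlog reader.
func (it *iterator) Next() (row, error) {
	if it.pos >= len(it.rows) {
		return row{}, io.EOF
	}
	r := it.rows[it.pos]
	it.pos++
	return r, nil
}

// writeRows mirrors writePaths: the counters are named results, io.EOF is
// swallowed by resetting err to nil before break, and every failure path
// is a bare return carrying whatever was counted so far.
func writeRows(it *iterator, write func(row) error) (deleted, expired int64, err error) {
	for {
		var r row
		r, err = it.Next()
		if err != nil {
			if err == io.EOF {
				err = nil
				break
			}
			return // real error: propagate via the named result
		}
		if r.deleted {
			deleted++
			continue
		}
		if r.expired {
			expired++
			continue
		}
		if err = write(r); err != nil {
			return
		}
	}
	return
}

func main() {
	it := &iterator{rows: []row{{deleted: true}, {}, {expired: true}, {}}}
	del, exp, err := writeRows(it, func(row) error { return nil })
	fmt.Println(del, exp, err) // 1 1 <nil>
}

The caller then accumulates the returned counts per batch, which is exactly what mergeSplit does in the hunk above with deletedRowCount += del and expiredRowCount += exp.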
diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go
index 3ba2e2a39b100..5e98d2be0a44d 100644
--- a/internal/datanode/compaction/mix_compactor_test.go
+++ b/internal/datanode/compaction/mix_compactor_test.go
@@ -42,8 +42,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
-var compactTestDir = "/tmp/milvus_test/compact"
-
 func TestMixCompactionTaskSuite(t *testing.T) {
 	suite.Run(t, new(MixCompactionTaskSuite))
 }
@@ -146,7 +144,7 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
 	// clear origial segments
 	s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
 	for _, segID := range segments {
-		s.initSegBuffer(segID)
+		s.initSegBuffer(1, segID)
 		row := getRow(100)
 		v := &storage.Value{
 			PK:        storage.NewInt64PrimaryKey(100),
@@ -193,7 +191,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
 	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
 	s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
 	for _, segID := range segments {
-		s.initSegBuffer(segID)
+		s.initSegBuffer(1, segID)
 		kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
 		s.Require().NoError(err)
 		s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool {
@@ -220,7 +218,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
 	})
 
 	result, err := s.task.Compact()
-	s.NoError(err)
+	s.Require().NoError(err)
 	s.NotNil(result)
 
 	s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
@@ -322,7 +320,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
 }
 
 func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
-	s.initSegBuffer(3)
+	s.initSegBuffer(1, 3)
 	collTTL := 864000 // 10 days
 	currTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second*(time.Duration(collTTL)+1)), 0)
 	s.task.currentTs = currTs
@@ -353,7 +351,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
 }
 
 func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
-	s.initSegBuffer(4)
+	s.initSegBuffer(1, 4)
 	deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
 	tests := []struct {
 		description string
@@ -657,17 +655,19 @@ func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) {
 	s.segWriter = segWriter
 }
 
-func (s *MixCompactionTaskSuite) initSegBuffer(magic int64) {
-	segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, magic, PartitionID, CollectionID, []int64{})
+func (s *MixCompactionTaskSuite) initSegBuffer(size int, seed int64) {
+	segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, seed, PartitionID, CollectionID, []int64{})
 	s.Require().NoError(err)
 
-	v := storage.Value{
-		PK:        storage.NewInt64PrimaryKey(magic),
-		Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
-		Value:     getRow(magic),
+	for i := 0; i < size; i++ {
+		v := storage.Value{
+			PK:        storage.NewInt64PrimaryKey(seed),
+			Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
+			Value:     getRow(seed),
+		}
+		err = segWriter.Write(&v)
+		s.Require().NoError(err)
 	}
-	err = segWriter.Write(&v)
-	s.Require().NoError(err)
 	segWriter.FlushAndIsFull()
 
 	s.segWriter = segWriter
@@ -858,3 +858,54 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta {
 		},
 	}
 }
+
+func BenchmarkMixCompactor(b *testing.B) {
+	// Setup
+	s := new(MixCompactionTaskSuite)
+
+	s.SetT(&testing.T{})
+	s.SetupSuite()
+	s.SetupTest()
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		seq := int64(i * 100000)
+		segments := []int64{seq, seq + 1, seq + 2}
+		alloc := allocator.NewLocalAllocator(seq+3, math.MaxInt64)
+		s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
+		s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
+		for _, segID := range segments {
+			s.initSegBuffer(100000, segID)
+			kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
+			s.Require().NoError(err)
+			s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool {
+				left, right := lo.Difference(keys, lo.Keys(kvs))
+				return len(left) == 0 && len(right) == 0
+			})).Return(lo.Values(kvs), nil).Once()
+
+			s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+				SegmentID:    segID,
+				FieldBinlogs: lo.Values(fBinlogs),
+			})
+		}
+
+		b.StartTimer()
+
+		result, err := s.task.Compact()
+		s.NoError(err)
+		s.NotNil(result)
+		s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
+		s.Equal(1, len(result.GetSegments()))
+		segment := result.GetSegments()[0]
+		s.EqualValues(19531, segment.GetSegmentID())
+		s.EqualValues(3, segment.GetNumOfRows())
+		s.NotEmpty(segment.InsertLogs)
+		s.NotEmpty(segment.Field2StatslogPaths)
+		s.Empty(segment.Deltalogs)
+	}
+
+	s.TearDownTest()
+}
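BenchmarkMixCompactor above brackets its per-iteration setup in b.StopTimer()/b.StartTimer() so that building three 100000-row segments and wiring the mocks is excluded from the reported time, and only Compact() is measured. The same timer discipline in isolation, with placeholder expensiveSetup and workUnderTest functions standing in for the segment preparation and the compaction call:

// Sketch only: a benchmark skeleton, not Milvus code; place in a _test.go file.
package compaction_test

import "testing"

func expensiveSetup(n int) []int {
	data := make([]int, n)
	for i := range data {
		data[i] = i
	}
	return data
}

func workUnderTest(data []int) (sum int) {
	for _, v := range data {
		sum += v
	}
	return
}

func BenchmarkWork(b *testing.B) {
	for i := 0; i < b.N; i++ {
		b.StopTimer() // setup is excluded from the reported ns/op
		data := expensiveSetup(100000)
		b.StartTimer() // only the work below is measured
		_ = workUnderTest(data)
	}
}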
diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go
index 6ae6a6aa6a75d..b6171b2b6f829 100644
--- a/internal/datanode/compaction/segment_writer.go
+++ b/internal/datanode/compaction/segment_writer.go
@@ -330,6 +330,8 @@ type SegmentWriter struct {
 	sch        *schemapb.CollectionSchema
 	rowCount   *atomic.Int64
 	syncedSize *atomic.Int64
+
+	maxBinlogSize uint64
 }
 
 func (w *SegmentWriter) GetRowNum() int64 {
@@ -412,12 +414,12 @@ func (w *SegmentWriter) GetBm25StatsBlob() (map[int64]*storage.Blob, error) {
 }
 
 func (w *SegmentWriter) IsFull() bool {
-	return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
+	return w.writer.WrittenMemorySize() > w.maxBinlogSize
 }
 
 func (w *SegmentWriter) FlushAndIsFull() bool {
 	w.writer.Flush()
-	return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
+	return w.writer.WrittenMemorySize() > w.maxBinlogSize
 }
 
 func (w *SegmentWriter) FlushAndIsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
@@ -502,6 +504,8 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par
 		collectionID: collID,
 		rowCount:     atomic.NewInt64(0),
 		syncedSize:   atomic.NewInt64(0),
+
+		maxBinlogSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
 	}
 
 	for _, fieldID := range Bm25Fields {
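Caching BinLogMaxSize in the SegmentWriter struct turns IsFull and FlushAndIsFull from a paramtable lookup per call into a plain field read; the trade-off is that a writer created before a config change keeps the old limit for its lifetime. A reduced sketch of the pattern, where loadConfigUint64 is an illustrative stand-in for the paramtable call:

// Sketch only: names are illustrative, not the Milvus API.
package main

import "fmt"

func loadConfigUint64(key string) uint64 {
	return 64 << 20 // pretend the configured binlog cap is 64 MiB
}

type segmentWriter struct {
	written       uint64
	maxBinlogSize uint64 // snapshotted once at construction
}

func newSegmentWriter() *segmentWriter {
	// One config read here instead of one per IsFull() call on the hot path.
	return &segmentWriter{maxBinlogSize: loadConfigUint64("dataNode.binlogMaxSize")}
}

func (w *segmentWriter) Write(n uint64) { w.written += n }

func (w *segmentWriter) IsFull() bool { return w.written > w.maxBinlogSize }

func main() {
	w := newSegmentWriter()
	w.Write(100 << 20)
	fmt.Println(w.IsFull()) // true
}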
"github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -174,8 +175,28 @@ func generateTestData(num int) ([]*Blob, error) { return blobs, err } -// Verify value of index i (1-based numbering) in data generated by generateTestData func assertTestData(t *testing.T, i int, value *Value) { + assertTestDataInternal(t, i, value, true) +} + +// Verify value of index i (1-based numbering) in data generated by generateTestData +func assertTestDataInternal(t *testing.T, i int, value *Value, lazy bool) { + getf18 := func() any { + f18 := &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: []int32{int32(i), int32(i), int32(i)}, + }, + }, + } + if lazy { + f18b, err := proto.Marshal(f18) + assert.Nil(t, err) + return f18b + } + return f18 + } + f102 := make([]float32, 8) for j := range f102 { f102[j] = float32(i) @@ -205,7 +226,7 @@ func assertTestData(t *testing.T, i int, value *Value) { 15: float64(i), 16: fmt.Sprint(i), 17: fmt.Sprint(i), - 18: &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}}}}, + 18: getf18(), 19: []byte{byte(i)}, 101: int32(i), 102: f102, @@ -250,7 +271,7 @@ func TestInsertlogIterator(t *testing.T) { v, err := itr.Next() assert.NoError(t, err) value := v.(*Value) - assertTestData(t, i, value) + assertTestDataInternal(t, i, value, false) } assert.False(t, itr.HasNext()) @@ -290,7 +311,7 @@ func TestMergeIterator(t *testing.T) { v, err := itr.Next() assert.NoError(t, err) value := v.(*Value) - assertTestData(t, i, value) + assertTestDataInternal(t, i, value, false) } assert.False(t, itr.HasNext()) _, err = itr.Next() @@ -313,7 +334,7 @@ func TestMergeIterator(t *testing.T) { v, err := itr.Next() assert.NoError(t, err) value := v.(*Value) - assertTestData(t, i, value) + assertTestDataInternal(t, i, value, false) } } diff --git a/internal/storage/serde.go b/internal/storage/serde.go index 55cd416ee7d74..fc6f6468b2265 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -102,8 +102,6 @@ type serdeEntry struct { // serialize serializes the value to the builder, returns ok. // nil is serialized to null without checking the type nullability. 
diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index 55cd416ee7d74..fc6f6468b2265 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -102,8 +102,6 @@ type serdeEntry struct {
 	// serialize serializes the value to the builder, returns ok.
 	// nil is serialized to null without checking the type nullability.
 	serialize func(array.Builder, any) bool
-	// sizeof returns the size in bytes of the value
-	sizeof func(any) uint64
 }
 
 var serdeMap = func() map[schemapb.DataType]serdeEntry {
@@ -134,9 +132,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(any) uint64 {
-			return 1
-		},
 	}
 	m[schemapb.DataType_Int8] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -164,9 +159,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(any) uint64 {
-			return 1
-		},
 	}
 	m[schemapb.DataType_Int16] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -194,9 +186,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(any) uint64 {
-			return 2
-		},
 	}
 	m[schemapb.DataType_Int32] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -224,9 +213,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(any) uint64 {
-			return 4
-		},
 	}
 	m[schemapb.DataType_Int64] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -254,9 +240,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(any) uint64 {
-			return 8
-		},
 	}
 	m[schemapb.DataType_Float] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -284,9 +267,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(any) uint64 {
-			return 4
-		},
 	}
 	m[schemapb.DataType_Double] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -314,9 +294,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(any) uint64 {
-			return 8
-		},
 	}
 	stringEntry := serdeEntry{
 		func(i int) arrow.DataType {
@@ -344,17 +321,14 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		}
 		return false
 	},
-		func(v any) uint64 {
-			if v == nil {
-				return 8
-			}
-			return uint64(len(v.(string)))
-		},
 	}
 
 	m[schemapb.DataType_VarChar] = stringEntry
 	m[schemapb.DataType_String] = stringEntry
-	m[schemapb.DataType_Array] = serdeEntry{
+
+	// We're not using the deserialized data in go, so we can skip the heavy pb serde.
+	// If there is need in the future, just assign it to m[schemapb.DataType_Array]
+	eagerArrayEntry := serdeEntry{
 		func(i int) arrow.DataType {
 			return arrow.BinaryTypes.Binary
 		},
@@ -385,20 +359,8 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 			}
 			return false
 		},
-		func(v any) uint64 {
-			if v == nil {
-				return 8
-			}
-			return uint64(proto.Size(v.(*schemapb.ScalarField)))
-		},
-	}
-
-	sizeOfBytes := func(v any) uint64 {
-		if v == nil {
-			return 8
-		}
-		return uint64(len(v.([]byte)))
 	}
+	_ = eagerArrayEntry
 
 	byteEntry := serdeEntry{
 		func(i int) arrow.DataType {
@@ -419,16 +381,22 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 				return true
 			}
 			if builder, ok := b.(*array.BinaryBuilder); ok {
-				if v, ok := v.([]byte); ok {
-					builder.Append(v)
+				if vv, ok := v.([]byte); ok {
+					builder.Append(vv)
 					return true
 				}
+				if vv, ok := v.(*schemapb.ScalarField); ok {
+					if bytes, err := proto.Marshal(vv); err == nil {
+						builder.Append(bytes)
+						return true
+					}
+				}
 			}
 			return false
 		},
-		sizeOfBytes,
 	}
+	m[schemapb.DataType_Array] = byteEntry
 	m[schemapb.DataType_JSON] = byteEntry
 
 	fixedSizeDeserializer := func(a arrow.Array, i int) (any, bool) {
@@ -460,7 +428,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		},
 		fixedSizeDeserializer,
 		fixedSizeSerializer,
-		sizeOfBytes,
 	}
 	m[schemapb.DataType_Float16Vector] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -468,7 +435,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		},
 		fixedSizeDeserializer,
 		fixedSizeSerializer,
-		sizeOfBytes,
 	}
 	m[schemapb.DataType_BFloat16Vector] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -476,7 +442,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		},
 		fixedSizeDeserializer,
 		fixedSizeSerializer,
-		sizeOfBytes,
 	}
 	m[schemapb.DataType_FloatVector] = serdeEntry{
 		func(i int) arrow.DataType {
@@ -511,12 +476,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		}
 		return false
 	},
-		func(v any) uint64 {
-			if v == nil {
-				return 8
-			}
-			return uint64(len(v.([]float32)) * 4)
-		},
 	}
 	m[schemapb.DataType_SparseFloatVector] = byteEntry
 	return m
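With the Array type routed to byteEntry, the serializer has to accept two shapes: raw []byte coming from a lazy reader, and a still-deserialized *schemapb.ScalarField coming from writers that hold in-memory structs, which it marshals on the way in. A toy version of that dual-path accept, with proto.Message standing in for ScalarField and a byte-slice column standing in for the Arrow binary builder:

// Sketch only: illustrates the dual-type accept, not the Milvus serdeEntry.
package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"
)

// appendBinary keeps the serialize contract of the entries above: report
// true only when the value had a supported type and was actually appended.
func appendBinary(col *[][]byte, v any) bool {
	switch vv := v.(type) {
	case []byte: // lazy path: pre-marshaled bytes pass straight through
		*col = append(*col, vv)
		return true
	case proto.Message: // eager path: marshal in-line
		b, err := proto.Marshal(vv)
		if err != nil {
			return false
		}
		*col = append(*col, b)
		return true
	default:
		return false
	}
}

func main() {
	var col [][]byte
	msg, _ := structpb.NewList([]any{1, 2, 3})
	raw, _ := proto.Marshal(msg)

	fmt.Println(appendBinary(&col, raw)) // true  (lazy bytes)
	fmt.Println(appendBinary(&col, msg)) // true  (eager message)
	fmt.Println(len(col))                // 2
}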
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index a53196d1a7dd7..255bc1cef8042 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
 	"github.com/apache/arrow/go/v12/arrow/memory"
+	"github.com/samber/lo"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/pkg/common"
@@ -390,8 +391,6 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se
 				if !ok {
 					return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("serialize error on type %s", types[fid]))
 				}
-				eventWriters[fid].memorySize += int(typeEntry.sizeof(e))
-				memorySize += typeEntry.sizeof(e)
 			}
 		}
 		arrays := make([]arrow.Array, len(types))
@@ -400,6 +399,12 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se
 		i := 0
 		for fid, builder := range builders {
 			arrays[i] = builder.NewArray()
+			size := lo.SumBy[*memory.Buffer, int](arrays[i].Data().Buffers(), func(b *memory.Buffer) int {
+				return b.Len()
+			})
+			eventWriters[fid].memorySize += size
+			memorySize += uint64(size)
+
 			builder.Release()
 			fields[i] = arrow.Field{
 				Name: strconv.Itoa(int(fid)),
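The lo.SumBy call above replaces the removed per-type sizeof functions: instead of estimating bytes per value, the writer measures the Arrow buffers actually allocated once a column is finalized. A standalone sketch of the same measurement with arrow/go v12; note the nil guard, since an array with no nulls can report a nil validity buffer:

// Sketch only: bufferedSize is a hypothetical helper, not a Milvus function.
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

// bufferedSize sums the backing buffers of an arrow array: validity
// bitmap, offsets, and values, whichever this array type carries.
func bufferedSize(arr arrow.Array) int {
	size := 0
	for _, buf := range arr.Data().Buffers() {
		if buf != nil { // the validity buffer may be nil when no value is null
			size += buf.Len()
		}
	}
	return size
}

func main() {
	bld := array.NewInt64Builder(memory.NewGoAllocator())
	defer bld.Release()

	bld.AppendValues([]int64{1, 2, 3, 4}, nil)
	arr := bld.NewArray()
	defer arr.Release()

	// Reports allocated buffer bytes, which arrow pads, so expect >= 4*8.
	fmt.Println(bufferedSize(arr))
}

This measures what was really allocated rather than a per-value estimate, which is also why the clustering-compactor test at the top of the patch raises BinLogMaxSize from 39935 to 45000: the buffer-based accounting now sees padding and null bitsets that the old sizeof functions ignored.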