diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 82a408d34da8d..2a09d58436402 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -1280,7 +1280,7 @@ func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBu buffer.currentSegmentRowNum.Store(0) } - writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds) + writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds) if err != nil { return pack, err } @@ -1295,7 +1295,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) er segmentID = buffer.writer.Load().(*SegmentWriter).GetSegmentID() buffer.bufferMemorySize.Add(int64(buffer.writer.Load().(*SegmentWriter).WrittenMemorySize())) - writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds) + writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds) if err != nil { return err } diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go index a1486e820f3a0..ebfe20de34381 100644 --- a/internal/datanode/compaction/clustering_compactor_test.go +++ b/internal/datanode/compaction/clustering_compactor_test.go @@ -171,7 +171,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() { func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { schema := genCollectionSchema() var segmentID int64 = 1001 - segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID, []int64{}) + segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{}) s.Require().NoError(err) for i := 0; i < 10240; i++ { v := storage.Value{ @@ -240,7 +240,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() { schema := genCollectionSchemaWithBM25() var segmentID int64 = 1001 - segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID, []int64{102}) + segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{102}) s.Require().NoError(err) for i := 0; i < 10240; i++ { @@ -453,7 +453,7 @@ func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() { s.Run("upload failed", func() { schema := genCollectionSchema() - segWriter, err := NewSegmentWriter(schema, 1000, SegmentID, PartitionID, CollectionID, []int64{}) + segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, SegmentID, PartitionID, CollectionID, []int64{}) s.Require().NoError(err) for i := 0; i < 2000; i++ { v := storage.Value{ diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go index 3839865f82f2f..560e761adfa5c 100644 --- a/internal/datanode/compaction/compactor_common.go +++ b/internal/datanode/compaction/compactor_common.go @@ -35,6 +35,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +const compactionBatchSize = 100 + func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool { // entity expire is not enabled if duration <= 0 if ttl <= 0 { diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 51cfee8fa4923..59d912c45def0 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -651,7 +651,7 @@ func getRow(magic int64) map[int64]interface{} { } func (s *MixCompactionTaskSuite) initMultiRowsSegBuffer(magic, numRows, step int64) { - segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 65535, magic, PartitionID, CollectionID, []int64{}) + segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 65535, compactionBatchSize, magic, PartitionID, CollectionID, []int64{}) s.Require().NoError(err) for i := int64(0); i < numRows; i++ { @@ -670,7 +670,7 @@ func (s *MixCompactionTaskSuite) initMultiRowsSegBuffer(magic, numRows, step int } func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) { - segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, magic, PartitionID, CollectionID, []int64{102}) + segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, compactionBatchSize, magic, PartitionID, CollectionID, []int64{102}) s.Require().NoError(err) v := storage.Value{ @@ -686,7 +686,7 @@ func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) { } func (s *MixCompactionTaskSuite) initSegBuffer(size int, seed int64) { - segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, seed, PartitionID, CollectionID, []int64{}) + segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, compactionBatchSize, seed, PartitionID, CollectionID, []int64{}) s.Require().NoError(err) for i := 0; i < size; i++ { diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go index b6171b2b6f829..e9eac72332f3c 100644 --- a/internal/datanode/compaction/segment_writer.go +++ b/internal/datanode/compaction/segment_writer.go @@ -1,6 +1,18 @@ -// SegmentInsertBuffer can be reused to buffer all insert data of one segment -// buffer.Serialize will serialize the InsertBuffer and clear it -// pkstats keeps tracking pkstats of the segment until Finish +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package compaction @@ -153,7 +165,7 @@ func (w *MultiSegmentWriter) addNewWriter() error { if err != nil { return err } - writer, err := NewSegmentWriter(w.schema, w.maxRows, newSegmentID, w.partitionID, w.collectionID, w.bm25Fields) + writer, err := NewSegmentWriter(w.schema, w.maxRows, compactionBatchSize, newSegmentID, w.partitionID, w.collectionID, w.bm25Fields) if err != nil { return err } @@ -331,6 +343,7 @@ type SegmentWriter struct { rowCount *atomic.Int64 syncedSize *atomic.Int64 + batchSize int maxBinlogSize uint64 } @@ -422,8 +435,7 @@ func (w *SegmentWriter) FlushAndIsFull() bool { return w.writer.WrittenMemorySize() > w.maxBinlogSize } -func (w *SegmentWriter) FlushAndIsFullWithBinlogMaxSize(binLogMaxSize uint64) bool { - w.writer.Flush() +func (w *SegmentWriter) IsFullWithBinlogMaxSize(binLogMaxSize uint64) bool { return w.writer.WrittenMemorySize() > binLogMaxSize } @@ -466,15 +478,15 @@ func (w *SegmentWriter) GetTotalSize() int64 { func (w *SegmentWriter) clear() { w.syncedSize.Add(int64(w.writer.WrittenMemorySize())) - writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch) + writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch, w.batchSize) w.writer = writer w.closers = closers w.tsFrom = math.MaxUint64 w.tsTo = 0 } -func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) { - writer, closers, err := newBinlogWriter(collID, partID, segID, sch) +func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize int, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) { + writer, closers, err := newBinlogWriter(collID, partID, segID, sch, batchSize) if err != nil { return nil, err } @@ -505,6 +517,7 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par rowCount: atomic.NewInt64(0), syncedSize: atomic.NewInt64(0), + batchSize: batchSize, maxBinlogSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(), } @@ -514,13 +527,13 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par return &segWriter, nil } -func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, +func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, batchSize int, ) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*storage.Blob, error), err error) { fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields) closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters)) for _, w := range fieldWriters { closers = append(closers, w.Finalize) } - writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, 100) + writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, batchSize) return } diff --git a/internal/datanode/compaction/segment_writer_bench_test.go b/internal/datanode/compaction/segment_writer_bench_test.go new file mode 100644 index 0000000000000..2c9df3daeb456 --- /dev/null +++ b/internal/datanode/compaction/segment_writer_bench_test.go @@ -0,0 +1,119 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compaction + +import ( + "fmt" + "math/rand" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func testSegmentWriterBatchSize(b *testing.B, batchSize int) { + orgLevel := log.GetLevel() + log.SetLevel(zapcore.InfoLevel) + defer log.SetLevel(orgLevel) + paramtable.Init() + + const ( + dim = 128 + numRows = 1000000 + ) + + var ( + rId = &schemapb.FieldSchema{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64} + ts = &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64} + pk = &schemapb.FieldSchema{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "100"}}} + f = &schemapb.FieldSchema{FieldID: 101, Name: "random", DataType: schemapb.DataType_Double} + fVec = &schemapb.FieldSchema{FieldID: 102, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: strconv.Itoa(dim)}}} + ) + schema := &schemapb.CollectionSchema{Name: "test-aaa", Fields: []*schemapb.FieldSchema{rId, ts, pk, f, fVec}} + + // prepare data values + start := time.Now() + vec := make([]float32, dim) + for j := 0; j < dim; j++ { + vec[j] = rand.Float32() + } + values := make([]*storage.Value, numRows) + for i := 0; i < numRows; i++ { + value := &storage.Value{} + value.Value = make(map[int64]interface{}, len(schema.GetFields())) + m := value.Value.(map[int64]interface{}) + for _, field := range schema.GetFields() { + switch field.GetDataType() { + case schemapb.DataType_Int64: + m[field.GetFieldID()] = int64(i) + case schemapb.DataType_VarChar: + k := fmt.Sprintf("test_pk_%d", i) + m[field.GetFieldID()] = k + value.PK = &storage.VarCharPrimaryKey{ + Value: k, + } + case schemapb.DataType_Double: + m[field.GetFieldID()] = float64(i) + case schemapb.DataType_FloatVector: + m[field.GetFieldID()] = vec + } + } + value.ID = int64(i) + value.Timestamp = int64(0) + value.IsDeleted = false + value.Value = m + values[i] = value + } + log.Info("prepare data done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start))) + + writer, err := NewSegmentWriter(schema, numRows, batchSize, 1, 2, 3, nil) + assert.NoError(b, err) + + b.N = 10 + b.ResetTimer() + for i := 0; i < b.N; i++ { + start = time.Now() + for _, v := range values { + err = writer.Write(v) + assert.NoError(b, err) + } + log.Info("write done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start))) + } + b.StopTimer() +} + +func Benchmark_SegmentWriter_BatchSize_100(b *testing.B) { + testSegmentWriterBatchSize(b, 100) +} + +func Benchmark_SegmentWriter_BatchSize_1000(b *testing.B) { + testSegmentWriterBatchSize(b, 1000) +} + +func Benchmark_SegmentWriter_BatchSize_10000(b *testing.B) { + testSegmentWriterBatchSize(b, 10000) +} diff --git a/internal/datanode/compaction/segment_writer_test.go b/internal/datanode/compaction/segment_writer_test.go index c93062ab214b1..647f125e5fcc8 100644 --- a/internal/datanode/compaction/segment_writer_test.go +++ b/internal/datanode/compaction/segment_writer_test.go @@ -44,7 +44,7 @@ func (s *SegmentWriteSuite) TestWriteFailed() { s.Run("get bm25 field failed", func() { schema := genCollectionSchemaWithBM25() // init segment writer with invalid bm25 fieldID - writer, err := NewSegmentWriter(schema, 1024, 1, s.parititonID, s.collectionID, []int64{1000}) + writer, err := NewSegmentWriter(schema, 1024, compactionBatchSize, 1, s.parititonID, s.collectionID, []int64{1000}) s.Require().NoError(err) v := storage.Value{ @@ -59,7 +59,7 @@ func (s *SegmentWriteSuite) TestWriteFailed() { s.Run("parse bm25 field data failed", func() { schema := genCollectionSchemaWithBM25() // init segment writer with wrong field as bm25 sparse field - writer, err := NewSegmentWriter(schema, 1024, 1, s.parititonID, s.collectionID, []int64{101}) + writer, err := NewSegmentWriter(schema, 1024, compactionBatchSize, 1, s.parititonID, s.collectionID, []int64{101}) s.Require().NoError(err) v := storage.Value{ diff --git a/internal/flushcommon/io/binlog_io.go b/internal/flushcommon/io/binlog_io.go index 55274e8327e8e..e1fed730b3061 100644 --- a/internal/flushcommon/io/binlog_io.go +++ b/internal/flushcommon/io/binlog_io.go @@ -33,7 +33,9 @@ import ( type BinlogIO interface { Download(ctx context.Context, paths []string) ([][]byte, error) + AsyncDownload(ctx context.Context, paths []string) []*conc.Future[any] Upload(ctx context.Context, kvs map[string][]byte) error + AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[any] } type BinlogIoImpl struct { @@ -46,6 +48,18 @@ func NewBinlogIO(cm storage.ChunkManager) BinlogIO { } func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) { + futures := b.AsyncDownload(ctx, paths) + err := conc.AwaitAll(futures...) + if err != nil { + return nil, err + } + + return lo.Map(futures, func(future *conc.Future[any], _ int) []byte { + return future.Value().([]byte) + }), nil +} + +func (b *BinlogIoImpl) AsyncDownload(ctx context.Context, paths []string) []*conc.Future[any] { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Download") defer span.End() @@ -74,17 +88,15 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, futures = append(futures, future) } - err := conc.AwaitAll(futures...) - if err != nil { - return nil, err - } - - return lo.Map(futures, func(future *conc.Future[any], _ int) []byte { - return future.Value().([]byte) - }), nil + return futures } func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error { + futures := b.AsyncUpload(ctx, kvs) + return conc.AwaitAll(futures...) +} + +func (b *BinlogIoImpl) AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[any] { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Upload") defer span.End() @@ -108,5 +120,5 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error futures = append(futures, future) } - return conc.AwaitAll(futures...) + return futures } diff --git a/internal/flushcommon/io/mock_binlogio.go b/internal/flushcommon/io/mock_binlogio.go index b0132f16299a7..7e041f5740709 100644 --- a/internal/flushcommon/io/mock_binlogio.go +++ b/internal/flushcommon/io/mock_binlogio.go @@ -1,10 +1,12 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package io import ( context "context" + conc "github.com/milvus-io/milvus/pkg/util/conc" + mock "github.com/stretchr/testify/mock" ) @@ -21,10 +23,112 @@ func (_m *MockBinlogIO) EXPECT() *MockBinlogIO_Expecter { return &MockBinlogIO_Expecter{mock: &_m.Mock} } +// AsyncDownload provides a mock function with given fields: ctx, paths +func (_m *MockBinlogIO) AsyncDownload(ctx context.Context, paths []string) []*conc.Future[interface{}] { + ret := _m.Called(ctx, paths) + + if len(ret) == 0 { + panic("no return value specified for AsyncDownload") + } + + var r0 []*conc.Future[interface{}] + if rf, ok := ret.Get(0).(func(context.Context, []string) []*conc.Future[interface{}]); ok { + r0 = rf(ctx, paths) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*conc.Future[interface{}]) + } + } + + return r0 +} + +// MockBinlogIO_AsyncDownload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsyncDownload' +type MockBinlogIO_AsyncDownload_Call struct { + *mock.Call +} + +// AsyncDownload is a helper method to define mock.On call +// - ctx context.Context +// - paths []string +func (_e *MockBinlogIO_Expecter) AsyncDownload(ctx interface{}, paths interface{}) *MockBinlogIO_AsyncDownload_Call { + return &MockBinlogIO_AsyncDownload_Call{Call: _e.mock.On("AsyncDownload", ctx, paths)} +} + +func (_c *MockBinlogIO_AsyncDownload_Call) Run(run func(ctx context.Context, paths []string)) *MockBinlogIO_AsyncDownload_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]string)) + }) + return _c +} + +func (_c *MockBinlogIO_AsyncDownload_Call) Return(_a0 []*conc.Future[interface{}]) *MockBinlogIO_AsyncDownload_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBinlogIO_AsyncDownload_Call) RunAndReturn(run func(context.Context, []string) []*conc.Future[interface{}]) *MockBinlogIO_AsyncDownload_Call { + _c.Call.Return(run) + return _c +} + +// AsyncUpload provides a mock function with given fields: ctx, kvs +func (_m *MockBinlogIO) AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[interface{}] { + ret := _m.Called(ctx, kvs) + + if len(ret) == 0 { + panic("no return value specified for AsyncUpload") + } + + var r0 []*conc.Future[interface{}] + if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) []*conc.Future[interface{}]); ok { + r0 = rf(ctx, kvs) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*conc.Future[interface{}]) + } + } + + return r0 +} + +// MockBinlogIO_AsyncUpload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsyncUpload' +type MockBinlogIO_AsyncUpload_Call struct { + *mock.Call +} + +// AsyncUpload is a helper method to define mock.On call +// - ctx context.Context +// - kvs map[string][]byte +func (_e *MockBinlogIO_Expecter) AsyncUpload(ctx interface{}, kvs interface{}) *MockBinlogIO_AsyncUpload_Call { + return &MockBinlogIO_AsyncUpload_Call{Call: _e.mock.On("AsyncUpload", ctx, kvs)} +} + +func (_c *MockBinlogIO_AsyncUpload_Call) Run(run func(ctx context.Context, kvs map[string][]byte)) *MockBinlogIO_AsyncUpload_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(map[string][]byte)) + }) + return _c +} + +func (_c *MockBinlogIO_AsyncUpload_Call) Return(_a0 []*conc.Future[interface{}]) *MockBinlogIO_AsyncUpload_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBinlogIO_AsyncUpload_Call) RunAndReturn(run func(context.Context, map[string][]byte) []*conc.Future[interface{}]) *MockBinlogIO_AsyncUpload_Call { + _c.Call.Return(run) + return _c +} + // Download provides a mock function with given fields: ctx, paths func (_m *MockBinlogIO) Download(ctx context.Context, paths []string) ([][]byte, error) { ret := _m.Called(ctx, paths) + if len(ret) == 0 { + panic("no return value specified for Download") + } + var r0 [][]byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok { @@ -80,6 +184,10 @@ func (_c *MockBinlogIO_Download_Call) RunAndReturn(run func(context.Context, []s func (_m *MockBinlogIO) Upload(ctx context.Context, kvs map[string][]byte) error { ret := _m.Called(ctx, kvs) + if len(ret) == 0 { + panic("no return value specified for Upload") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok { r0 = rf(ctx, kvs) diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go index 6945e21ff271f..2dafe7d0a450b 100644 --- a/internal/flushcommon/pipeline/mock_fgmanager.go +++ b/internal/flushcommon/pipeline/mock_fgmanager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package pipeline @@ -118,6 +118,10 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCollectionIDs") + } + var r0 []int64 if rf, ok := ret.Get(0).(func() []int64); ok { r0 = rf() @@ -161,6 +165,10 @@ func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() [] func (_m *MockFlowgraphManager) GetFlowgraphCount() int { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetFlowgraphCount") + } + var r0 int if rf, ok := ret.Get(0).(func() int); ok { r0 = rf() @@ -202,6 +210,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() i func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*DataSyncService, bool) { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for GetFlowgraphService") + } + var r0 *DataSyncService var r1 bool if rf, ok := ret.Get(0).(func(string) (*DataSyncService, bool)); ok { @@ -256,6 +268,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for HasFlowgraph") + } + var r0 bool if rf, ok := ret.Get(0).(func(string) bool); ok { r0 = rf(channel) @@ -298,6 +314,10 @@ func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string) func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool { ret := _m.Called(channel, opID) + if len(ret) == 0 { + panic("no return value specified for HasFlowgraphWithOpID") + } + var r0 bool if rf, ok := ret.Get(0).(func(string, int64) bool); ok { r0 = rf(channel, opID) diff --git a/internal/flushcommon/syncmgr/mock_meta_writer.go b/internal/flushcommon/syncmgr/mock_meta_writer.go index bacc91649a397..a5857cd3a0275 100644 --- a/internal/flushcommon/syncmgr/mock_meta_writer.go +++ b/internal/flushcommon/syncmgr/mock_meta_writer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncmgr @@ -25,6 +25,10 @@ func (_m *MockMetaWriter) EXPECT() *MockMetaWriter_Expecter { func (_m *MockMetaWriter) DropChannel(_a0 context.Context, _a1 string) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for DropChannel") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(_a0, _a1) @@ -68,6 +72,10 @@ func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(context.Context func (_m *MockMetaWriter) UpdateSync(_a0 context.Context, _a1 *SyncTask) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for UpdateSync") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *SyncTask) error); ok { r0 = rf(_a0, _a1) diff --git a/internal/flushcommon/syncmgr/mock_serializer.go b/internal/flushcommon/syncmgr/mock_serializer.go index fdbf8236994c5..03d05aa85574f 100644 --- a/internal/flushcommon/syncmgr/mock_serializer.go +++ b/internal/flushcommon/syncmgr/mock_serializer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncmgr @@ -25,6 +25,10 @@ func (_m *MockSerializer) EXPECT() *MockSerializer_Expecter { func (_m *MockSerializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { ret := _m.Called(ctx, pack) + if len(ret) == 0 { + panic("no return value specified for EncodeBuffer") + } + var r0 Task var r1 error if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) (Task, error)); ok { diff --git a/internal/flushcommon/syncmgr/mock_task.go b/internal/flushcommon/syncmgr/mock_task.go index 01a80a59c58a4..cc7494c0032b4 100644 --- a/internal/flushcommon/syncmgr/mock_task.go +++ b/internal/flushcommon/syncmgr/mock_task.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncmgr @@ -26,6 +26,10 @@ func (_m *MockTask) EXPECT() *MockTask_Expecter { func (_m *MockTask) ChannelName() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for ChannelName") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -67,6 +71,10 @@ func (_c *MockTask_ChannelName_Call) RunAndReturn(run func() string) *MockTask_C func (_m *MockTask) Checkpoint() *msgpb.MsgPosition { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Checkpoint") + } + var r0 *msgpb.MsgPosition if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { r0 = rf() @@ -143,6 +151,10 @@ func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_Han func (_m *MockTask) IsFlush() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsFlush") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -184,6 +196,10 @@ func (_c *MockTask_IsFlush_Call) RunAndReturn(run func() bool) *MockTask_IsFlush func (_m *MockTask) Run(_a0 context.Context) error { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for Run") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(_a0) @@ -226,6 +242,10 @@ func (_c *MockTask_Run_Call) RunAndReturn(run func(context.Context) error) *Mock func (_m *MockTask) SegmentID() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for SegmentID") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -267,6 +287,10 @@ func (_c *MockTask_SegmentID_Call) RunAndReturn(run func() int64) *MockTask_Segm func (_m *MockTask) StartPosition() *msgpb.MsgPosition { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for StartPosition") + } + var r0 *msgpb.MsgPosition if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { r0 = rf() diff --git a/internal/flushcommon/writebuffer/mock_manager.go b/internal/flushcommon/writebuffer/mock_manager.go index 4b9bde855779a..2a08bd5d96706 100644 --- a/internal/flushcommon/writebuffer/mock_manager.go +++ b/internal/flushcommon/writebuffer/mock_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package writebuffer @@ -30,6 +30,10 @@ func (_m *MockBufferManager) EXPECT() *MockBufferManager_Expecter { func (_m *MockBufferManager) BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error { ret := _m.Called(channel, insertData, deleteMsgs, startPos, endPos) + if len(ret) == 0 { + panic("no return value specified for BufferData") + } + var r0 error if rf, ok := ret.Get(0).(func(string, []*InsertData, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok { r0 = rf(channel, insertData, deleteMsgs, startPos, endPos) @@ -143,6 +147,10 @@ func (_c *MockBufferManager_DropPartitions_Call) RunAndReturn(run func(string, [ func (_m *MockBufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { ret := _m.Called(ctx, channel, flushTs) + if len(ret) == 0 { + panic("no return value specified for FlushChannel") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, uint64) error); ok { r0 = rf(ctx, channel, flushTs) @@ -187,6 +195,10 @@ func (_c *MockBufferManager_FlushChannel_Call) RunAndReturn(run func(context.Con func (_m *MockBufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for GetCheckpoint") + } + var r0 *msgpb.MsgPosition var r1 bool var r2 error @@ -289,6 +301,10 @@ func (_m *MockBufferManager) Register(channel string, _a1 metacache.MetaCache, o _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for Register") + } + var r0 error if rf, ok := ret.Get(0).(func(string, metacache.MetaCache, ...WriteBufferOption) error); ok { r0 = rf(channel, _a1, opts...) @@ -373,6 +389,10 @@ func (_c *MockBufferManager_RemoveChannel_Call) RunAndReturn(run func(string)) * func (_m *MockBufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error { ret := _m.Called(ctx, channel, segmentIDs) + if len(ret) == 0 { + panic("no return value specified for SealSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, []int64) error); ok { r0 = rf(ctx, channel, segmentIDs) diff --git a/internal/flushcommon/writebuffer/mock_write_buffer.go b/internal/flushcommon/writebuffer/mock_write_buffer.go index 93635c4178c28..53fc0ff21a46d 100644 --- a/internal/flushcommon/writebuffer/mock_write_buffer.go +++ b/internal/flushcommon/writebuffer/mock_write_buffer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package writebuffer @@ -28,6 +28,10 @@ func (_m *MockWriteBuffer) EXPECT() *MockWriteBuffer_Expecter { func (_m *MockWriteBuffer) BufferData(insertMsgs []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error { ret := _m.Called(insertMsgs, deleteMsgs, startPos, endPos) + if len(ret) == 0 { + panic("no return value specified for BufferData") + } + var r0 error if rf, ok := ret.Get(0).(func([]*InsertData, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok { r0 = rf(insertMsgs, deleteMsgs, startPos, endPos) @@ -186,6 +190,10 @@ func (_c *MockWriteBuffer_EvictBuffer_Call) RunAndReturn(run func(...SyncPolicy) func (_m *MockWriteBuffer) GetCheckpoint() *msgpb.MsgPosition { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCheckpoint") + } + var r0 *msgpb.MsgPosition if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { r0 = rf() @@ -229,6 +237,10 @@ func (_c *MockWriteBuffer_GetCheckpoint_Call) RunAndReturn(run func() *msgpb.Msg func (_m *MockWriteBuffer) GetFlushTimestamp() uint64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetFlushTimestamp") + } + var r0 uint64 if rf, ok := ret.Get(0).(func() uint64); ok { r0 = rf() @@ -270,6 +282,10 @@ func (_c *MockWriteBuffer_GetFlushTimestamp_Call) RunAndReturn(run func() uint64 func (_m *MockWriteBuffer) HasSegment(segmentID int64) bool { ret := _m.Called(segmentID) + if len(ret) == 0 { + panic("no return value specified for HasSegment") + } + var r0 bool if rf, ok := ret.Get(0).(func(int64) bool); ok { r0 = rf(segmentID) @@ -312,6 +328,10 @@ func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *M func (_m *MockWriteBuffer) MemorySize() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for MemorySize") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -353,6 +373,10 @@ func (_c *MockWriteBuffer_MemorySize_Call) RunAndReturn(run func() int64) *MockW func (_m *MockWriteBuffer) SealSegments(ctx context.Context, segmentIDs []int64) error { ret := _m.Called(ctx, segmentIDs) + if len(ret) == 0 { + panic("no return value specified for SealSegments") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { r0 = rf(ctx, segmentIDs) diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go index ac340db78e5bd..d24475fc95405 100644 --- a/internal/indexnode/task_stats.go +++ b/internal/indexnode/task_stats.go @@ -41,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/util/indexcgowrapper" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/conc" _ "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/timerecord" @@ -50,6 +51,8 @@ import ( var _ task = (*statsTask)(nil) +const statsBatchSize = 10000 + type statsTask struct { ident string ctx context.Context @@ -157,7 +160,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er numRows := st.req.GetNumRows() bm25FieldIds := compaction.GetBM25FieldIDs(st.req.GetSchema()) - writer, err := compaction.NewSegmentWriter(st.req.GetSchema(), numRows, st.req.GetTargetSegmentID(), st.req.GetPartitionID(), st.req.GetCollectionID(), bm25FieldIds) + writer, err := compaction.NewSegmentWriter(st.req.GetSchema(), numRows, statsBatchSize, st.req.GetTargetSegmentID(), st.req.GetPartitionID(), st.req.GetCollectionID(), bm25FieldIds) if err != nil { log.Warn("sort segment wrong, unable to init segment writer", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) @@ -165,22 +168,24 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er } var ( - flushBatchCount int // binlog batch count - unFlushedRowCount int64 = 0 + flushBatchCount int // binlog batch count - // All binlog meta of a segment - allBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog) - ) + allBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog) // All binlog meta of a segment + uploadFutures = make([]*conc.Future[any], 0) - serWriteTimeCost := time.Duration(0) - uploadTimeCost := time.Duration(0) - sortTimeCost := time.Duration(0) + downloadCost = time.Duration(0) + serWriteTimeCost = time.Duration(0) + sortTimeCost = time.Duration(0) + //writeCost = time.Duration(0) + ) + downloadStart := time.Now() values, err := st.downloadData(ctx, numRows, writer.GetPkID(), bm25FieldIds) if err != nil { log.Warn("download data failed", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) return nil, err } + downloadCost = time.Since(downloadStart) sortStart := time.Now() sort.Slice(values, func(i, j int) bool { @@ -188,15 +193,16 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er }) sortTimeCost += time.Since(sortStart) - for _, v := range values { + for i, v := range values { + //writeStart := time.Now() err := writer.Write(v) if err != nil { log.Warn("write value wrong, failed to writer row", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) return nil, err } - unFlushedRowCount++ + //writeCost += time.Since(writeStart) - if (unFlushedRowCount+1)%100 == 0 && writer.FlushAndIsFullWithBinlogMaxSize(st.req.GetBinlogMaxSize()) { + if (i+1)%statsBatchSize == 0 && writer.IsFullWithBinlogMaxSize(st.req.GetBinlogMaxSize()) { serWriteStart := time.Now() binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer) if err != nil { @@ -205,17 +211,10 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er } serWriteTimeCost += time.Since(serWriteStart) - uploadStart := time.Now() - if err := st.binlogIO.Upload(ctx, kvs); err != nil { - log.Warn("stats wrong, failed to upload kvs", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) - return nil, err - } - uploadTimeCost += time.Since(uploadStart) - + uploadFutures = append(uploadFutures, st.binlogIO.AsyncUpload(ctx, kvs)...) mergeFieldBinlogs(allBinlogs, partialBinlogs) flushBatchCount++ - unFlushedRowCount = 0 st.logIDOffset += binlogNum if st.req.GetStartLogID()+st.logIDOffset >= st.req.GetEndLogID() { log.Warn("binlog files too much, log is not enough", zap.Int64("taskID", st.req.GetTaskID()), @@ -236,16 +235,17 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er serWriteTimeCost += time.Since(serWriteStart) st.logIDOffset += binlogNum - uploadStart := time.Now() - if err := st.binlogIO.Upload(ctx, kvs); err != nil { - return nil, err - } - uploadTimeCost += time.Since(uploadStart) - + uploadFutures = append(uploadFutures, st.binlogIO.AsyncUpload(ctx, kvs)...) mergeFieldBinlogs(allBinlogs, partialBinlogs) flushBatchCount++ } + err = conc.AwaitAll(uploadFutures...) + if err != nil { + log.Warn("stats wrong, failed to upload kvs", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) + return nil, err + } + serWriteStart := time.Now() binlogNums, sPath, err := statSerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows) if err != nil { @@ -302,9 +302,10 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er zap.Int64("old rows", numRows), zap.Int("valid rows", len(values)), zap.Int("binlog batch count", flushBatchCount), - zap.Duration("upload binlogs elapse", uploadTimeCost), + zap.Duration("download elapse", downloadCost), zap.Duration("sort elapse", sortTimeCost), zap.Duration("serWrite elapse", serWriteTimeCost), + //zap.Duration("write elapse", writeCost), zap.Duration("total elapse", totalElapse)) return insertLogs, nil } diff --git a/internal/indexnode/task_stats_test.go b/internal/indexnode/task_stats_test.go index 81144676226c3..2d8989b9f79c7 100644 --- a/internal/indexnode/task_stats_test.go +++ b/internal/indexnode/task_stats_test.go @@ -66,7 +66,7 @@ func (s *TaskStatsSuite) SetupSubTest() { } func (s *TaskStatsSuite) GenSegmentWriterWithBM25(magic int64) { - segWriter, err := compaction.NewSegmentWriter(s.schema, 100, magic, s.partitionID, s.collectionID, []int64{102}) + segWriter, err := compaction.NewSegmentWriter(s.schema, 100, statsBatchSize, magic, s.partitionID, s.collectionID, []int64{102}) s.Require().NoError(err) v := storage.Value{