Skip to content

Commit

Permalink
enhance: Optimize the performance of stats task (#37374)
Browse files Browse the repository at this point in the history
1. Increase the writer's `batchSize` to avoid multiple serialization
operations.
2. Perform asynchronous upload of binlog files to prevent blocking the
data processing flow.
3. Reduce multiple calls to `writer.Flush()`.

issue: #37373

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Nov 8, 2024
1 parent bc9562f commit 8187942
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 65 deletions.
4 changes: 2 additions & 2 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@ func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBu
buffer.currentSegmentRowNum.Store(0)
}

writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
if err != nil {
return pack, err
}
Expand All @@ -1295,7 +1295,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) er
segmentID = buffer.writer.Load().(*SegmentWriter).GetSegmentID()
buffer.bufferMemorySize.Add(int64(buffer.writer.Load().(*SegmentWriter).WrittenMemorySize()))

writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/datanode/compaction/clustering_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
schema := genCollectionSchema()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := 0; i < 10240; i++ {
v := storage.Value{
Expand Down Expand Up @@ -240,7 +240,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
schema := genCollectionSchemaWithBM25()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID, []int64{102})
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{102})
s.Require().NoError(err)

for i := 0; i < 10240; i++ {
Expand Down Expand Up @@ -453,7 +453,7 @@ func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() {

s.Run("upload failed", func() {
schema := genCollectionSchema()
segWriter, err := NewSegmentWriter(schema, 1000, SegmentID, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, SegmentID, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := 0; i < 2000; i++ {
v := storage.Value{
Expand Down
2 changes: 2 additions & 0 deletions internal/datanode/compaction/compactor_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// compactionBatchSize is the batchSize argument that compaction code hands to
// NewSegmentWriter when it creates a segment writer.
// NOTE(review): presumably the number of rows buffered per serialization batch — confirm against the storage serialize writer.
const compactionBatchSize = 100

func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
// entity expire is not enabled if duration <= 0
if ttl <= 0 {
Expand Down
6 changes: 3 additions & 3 deletions internal/datanode/compaction/mix_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func getRow(magic int64) map[int64]interface{} {
}

func (s *MixCompactionTaskSuite) initMultiRowsSegBuffer(magic, numRows, step int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 65535, magic, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 65535, compactionBatchSize, magic, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)

for i := int64(0); i < numRows; i++ {
Expand All @@ -670,7 +670,7 @@ func (s *MixCompactionTaskSuite) initMultiRowsSegBuffer(magic, numRows, step int
}

func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, magic, PartitionID, CollectionID, []int64{102})
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, compactionBatchSize, magic, PartitionID, CollectionID, []int64{102})
s.Require().NoError(err)

v := storage.Value{
Expand All @@ -686,7 +686,7 @@ func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) {
}

func (s *MixCompactionTaskSuite) initSegBuffer(size int, seed int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, seed, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, compactionBatchSize, seed, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)

for i := 0; i < size; i++ {
Expand Down
35 changes: 24 additions & 11 deletions internal/datanode/compaction/segment_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
// SegmentInsertBuffer can be reused to buffer all insert data of one segment
// buffer.Serialize will serialize the InsertBuffer and clear it
// pkstats keeps tracking pkstats of the segment until Finish
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compaction

Expand Down Expand Up @@ -155,7 +167,7 @@ func (w *MultiSegmentWriter) addNewWriter() error {
if err != nil {
return err
}
writer, err := NewSegmentWriter(w.schema, w.maxRows, newSegmentID, w.partitionID, w.collectionID, w.bm25Fields)
writer, err := NewSegmentWriter(w.schema, w.maxRows, compactionBatchSize, newSegmentID, w.partitionID, w.collectionID, w.bm25Fields)
if err != nil {
return err
}
Expand Down Expand Up @@ -351,6 +363,7 @@ type SegmentWriter struct {
rowCount *atomic.Int64
syncedSize *atomic.Int64

batchSize int
maxBinlogSize uint64
}

Expand Down Expand Up @@ -484,8 +497,7 @@ func (w *SegmentWriter) FlushAndIsFull() bool {
return w.writer.WrittenMemorySize() > w.maxBinlogSize
}

func (w *SegmentWriter) FlushAndIsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
w.writer.Flush()
func (w *SegmentWriter) IsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
return w.writer.WrittenMemorySize() > binLogMaxSize
}

Expand Down Expand Up @@ -528,15 +540,15 @@ func (w *SegmentWriter) GetTotalSize() int64 {
func (w *SegmentWriter) clear() {
w.syncedSize.Add(int64(w.writer.WrittenMemorySize()))

writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch)
writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch, w.batchSize)
w.writer = writer
w.closers = closers
w.tsFrom = math.MaxUint64
w.tsTo = 0
}

func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) {
writer, closers, err := newBinlogWriter(collID, partID, segID, sch)
func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize int, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) {
writer, closers, err := newBinlogWriter(collID, partID, segID, sch, batchSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -567,6 +579,7 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par
rowCount: atomic.NewInt64(0),
syncedSize: atomic.NewInt64(0),

batchSize: batchSize,
maxBinlogSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
}

Expand All @@ -576,13 +589,13 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par
return &segWriter, nil
}

func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema,
func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, batchSize int,
) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*storage.Blob, error), err error) {
fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields)
closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters))
for _, w := range fieldWriters {
closers = append(closers, w.Finalize)
}
writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, 100)
writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, batchSize)
return
}
119 changes: 119 additions & 0 deletions internal/datanode/compaction/segment_writer_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compaction

import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// testSegmentWriterBatchSize benchmarks SegmentWriter.Write for the given
// batchSize. It builds numRows rows in memory once, before the timer is reset,
// and then writes the whole row set into a single writer on every benchmark
// iteration.
//
// The iteration count is owned by the testing framework and must not be
// assigned here; to force a fixed number of rounds, run the benchmark with
// a count-based benchtime such as `-benchtime=10x`.
func testSegmentWriterBatchSize(b *testing.B, batchSize int) {
	orgLevel := log.GetLevel()
	log.SetLevel(zapcore.InfoLevel)
	defer log.SetLevel(orgLevel)
	paramtable.Init()

	const (
		dim     = 128
		numRows = 1000000
	)

	var (
		rId  = &schemapb.FieldSchema{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64}
		ts   = &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64}
		pk   = &schemapb.FieldSchema{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "100"}}}
		f    = &schemapb.FieldSchema{FieldID: 101, Name: "random", DataType: schemapb.DataType_Double}
		fVec = &schemapb.FieldSchema{FieldID: 102, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: strconv.Itoa(dim)}}}
	)
	schema := &schemapb.CollectionSchema{Name: "test-aaa", Fields: []*schemapb.FieldSchema{rId, ts, pk, f, fVec}}

	// prepare data values
	start := time.Now()
	// One random vector is generated and shared by every row.
	vec := make([]float32, dim)
	for j := range vec {
		vec[j] = rand.Float32()
	}
	values := make([]*storage.Value, numRows)
	for i := 0; i < numRows; i++ {
		// Field values keyed by field ID; filled according to each field's data type.
		m := make(map[int64]interface{}, len(schema.GetFields()))
		value := &storage.Value{}
		for _, field := range schema.GetFields() {
			switch field.GetDataType() {
			case schemapb.DataType_Int64:
				m[field.GetFieldID()] = int64(i)
			case schemapb.DataType_VarChar:
				k := fmt.Sprintf("test_pk_%d", i)
				m[field.GetFieldID()] = k
				value.PK = &storage.VarCharPrimaryKey{
					Value: k,
				}
			case schemapb.DataType_Double:
				m[field.GetFieldID()] = float64(i)
			case schemapb.DataType_FloatVector:
				m[field.GetFieldID()] = vec
			}
		}
		value.ID = int64(i)
		value.Timestamp = int64(0)
		value.IsDeleted = false
		value.Value = m
		values[i] = value
	}
	log.Info("prepare data done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start)))

	writer, err := NewSegmentWriter(schema, numRows, batchSize, 1, 2, 3, nil)
	if !assert.NoError(b, err) {
		// Without a writer the loop below cannot run; stop this benchmark now.
		b.FailNow()
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		start = time.Now()
		for _, v := range values {
			// Abort on the first failed write instead of recording one failure per row.
			if err := writer.Write(v); err != nil {
				b.Fatalf("write row: %v", err)
			}
		}
		log.Info("write done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start)))
	}
	b.StopTimer()
}

// Benchmark_SegmentWriter_BatchSize_100 runs the segment writer benchmark
// with a batch size of 100.
func Benchmark_SegmentWriter_BatchSize_100(b *testing.B) {
	const batchSize = 100
	testSegmentWriterBatchSize(b, batchSize)
}

// Benchmark_SegmentWriter_BatchSize_1000 runs the segment writer benchmark
// with a batch size of 1000.
func Benchmark_SegmentWriter_BatchSize_1000(b *testing.B) {
	const batchSize = 1000
	testSegmentWriterBatchSize(b, batchSize)
}

// Benchmark_SegmentWriter_BatchSize_10000 runs the segment writer benchmark
// with a batch size of 10000.
func Benchmark_SegmentWriter_BatchSize_10000(b *testing.B) {
	const batchSize = 10000
	testSegmentWriterBatchSize(b, batchSize)
}
4 changes: 2 additions & 2 deletions internal/datanode/compaction/segment_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *SegmentWriteSuite) TestWriteFailed() {
s.Run("get bm25 field failed", func() {
schema := genCollectionSchemaWithBM25()
// init segment writer with invalid bm25 fieldID
writer, err := NewSegmentWriter(schema, 1024, 1, s.parititonID, s.collectionID, []int64{1000})
writer, err := NewSegmentWriter(schema, 1024, compactionBatchSize, 1, s.parititonID, s.collectionID, []int64{1000})
s.Require().NoError(err)

v := storage.Value{
Expand All @@ -59,7 +59,7 @@ func (s *SegmentWriteSuite) TestWriteFailed() {
s.Run("parse bm25 field data failed", func() {
schema := genCollectionSchemaWithBM25()
// init segment writer with wrong field as bm25 sparse field
writer, err := NewSegmentWriter(schema, 1024, 1, s.parititonID, s.collectionID, []int64{101})
writer, err := NewSegmentWriter(schema, 1024, compactionBatchSize, 1, s.parititonID, s.collectionID, []int64{101})
s.Require().NoError(err)

v := storage.Value{
Expand Down
30 changes: 21 additions & 9 deletions internal/flushcommon/io/binlog_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (

// BinlogIO moves binlog payloads to and from storage by path. Each operation
// has a blocking form and an Async form that returns futures for the caller
// to await.
type BinlogIO interface {
// Download blocks until the content for paths has been fetched and returns it as byte slices.
Download(ctx context.Context, paths []string) ([][]byte, error)
// AsyncDownload returns the download futures without awaiting them.
// BinlogIoImpl.Download reads each future's value as a []byte.
AsyncDownload(ctx context.Context, paths []string) []*conc.Future[any]
// Upload blocks until every entry of kvs has been written, or returns the first error.
Upload(ctx context.Context, kvs map[string][]byte) error
// AsyncUpload returns the upload futures without awaiting them.
AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[any]
}

type BinlogIoImpl struct {
Expand All @@ -46,6 +48,18 @@ func NewBinlogIO(cm storage.ChunkManager) BinlogIO {
}

// Download is the blocking counterpart of AsyncDownload: it waits for all of
// the returned futures and collects their values as byte slices. If awaiting
// fails, the error is returned and no data is handed back.
func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) {
	pending := b.AsyncDownload(ctx, paths)
	if err := conc.AwaitAll(pending...); err != nil {
		return nil, err
	}

	// Every future has completed successfully here; unwrap each value.
	blobs := lo.Map(pending, func(f *conc.Future[any], _ int) []byte {
		return f.Value().([]byte)
	})
	return blobs, nil
}

func (b *BinlogIoImpl) AsyncDownload(ctx context.Context, paths []string) []*conc.Future[any] {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Download")
defer span.End()

Expand Down Expand Up @@ -74,17 +88,15 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte,
futures = append(futures, future)
}

err := conc.AwaitAll(futures...)
if err != nil {
return nil, err
}

return lo.Map(futures, func(future *conc.Future[any], _ int) []byte {
return future.Value().([]byte)
}), nil
return futures
}

// Upload is the blocking counterpart of AsyncUpload: it waits for all of the
// returned futures and reports the error from awaiting them, if any.
func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error {
	return conc.AwaitAll(b.AsyncUpload(ctx, kvs)...)
}

func (b *BinlogIoImpl) AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[any] {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Upload")
defer span.End()

Expand All @@ -108,5 +120,5 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error
futures = append(futures, future)
}

return conc.AwaitAll(futures...)
return futures
}
Loading

0 comments on commit 8187942

Please sign in to comment.