diff --git a/internal/datacoord/sync_segments_scheduler.go b/internal/datacoord/sync_segments_scheduler.go index 94b029ed481c2..b0f616984d207 100644 --- a/internal/datacoord/sync_segments_scheduler.go +++ b/internal/datacoord/sync_segments_scheduler.go @@ -134,12 +134,16 @@ func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, Level: seg.GetLevel(), NumOfRows: seg.GetNumOfRows(), } + statsLogs := make([]*datapb.Binlog, 0) for _, statsLog := range seg.GetStatslogs() { if statsLog.GetFieldID() == pkFieldID { - req.SegmentInfos[seg.ID].PkStatsLog = statsLog - break + statsLogs = append(statsLogs, statsLog.GetBinlogs()...) } } + req.SegmentInfos[seg.ID].PkStatsLog = &datapb.FieldBinlog{ + FieldID: pkFieldID, + Binlogs: statsLogs, + } } if err := sss.sessions.SyncSegments(nodeID, req); err != nil { diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index dda212cc6a95e..242cc27d44a10 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/flushcommon/io" "github.com/milvus-io/milvus/internal/proto/clusteringpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -592,12 +593,12 @@ func (t *clusteringCompactionTask) mappingSegment( remained++ if (remained+1)%100 == 0 { - currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize() - // trigger flushBinlog - t.clusterBufferLocks.RLock(clusterBuffer.id) - currentBufferWriterFull := clusterBuffer.writer.IsFull() + currentBufferWriterFull := clusterBuffer.writer.FlushAndIsFull() t.clusterBufferLocks.RUnlock(clusterBuffer.id) + + currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize() + currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load() if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || currentBufferWriterFull { // reach segment/binlog max size @@ -823,18 +824,29 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error { return nil } -func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error { - if binlogs, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || len(binlogs) == 0 { +func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, segmentID int64) error { + if binlogs, ok := buffer.flushedBinlogs[segmentID]; !ok || len(binlogs) == 0 { return nil } + binlogNum := 0 + numRows := buffer.flushedRowNum[segmentID] insertLogs := make([]*datapb.FieldBinlog, 0) - for _, fieldBinlog := range buffer.flushedBinlogs[writer.GetSegmentID()] { + for _, fieldBinlog := range buffer.flushedBinlogs[segmentID] { insertLogs = append(insertLogs, fieldBinlog) + binlogNum = len(fieldBinlog.GetBinlogs()) } - numRows := buffer.flushedRowNum[writer.GetSegmentID()] - statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, numRows.Load()) + fieldBinlogPaths := make([][]string, 0) + for idx := 0; idx < binlogNum; idx++ { + var ps []string + for _, fieldID := range []int64{t.primaryKeyField.GetFieldID(), common.RowIDField, common.TimeStampField} { + ps = append(ps, buffer.flushedBinlogs[segmentID][fieldID].GetBinlogs()[idx].GetLogPath()) + } + fieldBinlogPaths = append(fieldBinlogPaths, ps) + } + + statsLogs, err := t.generatePkStats(ctx, segmentID, numRows.Load(), fieldBinlogPaths) if err != nil { return err } @@ -842,10 +854,10 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff // pack current flushBinlog data into a segment seg := &datapb.CompactionSegment{ PlanID: t.plan.GetPlanID(), - SegmentID: writer.GetSegmentID(), + SegmentID: segmentID, NumOfRows: numRows.Load(), InsertLogs: insertLogs, - Field2StatslogPaths: []*datapb.FieldBinlog{statPaths}, + Field2StatslogPaths: []*datapb.FieldBinlog{statsLogs}, Channel: t.plan.GetChannel(), } buffer.uploadedSegments = append(buffer.uploadedSegments, seg) @@ -853,35 +865,44 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()}, NumRows: int(numRows.Load()), } - buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats + buffer.uploadedSegmentStats[segmentID] = segmentStats for _, binlog := range seg.InsertLogs { - log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String())) + log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), + zap.Int64("segID", segmentID), zap.String("binlog", binlog.String())) + } + for _, statsLog := range seg.Field2StatslogPaths { + log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), + zap.Int64("segID", segmentID), zap.String("binlog", statsLog.String())) } + log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", seg.GetSegmentID()), zap.Int64("row num", seg.GetNumOfRows())) // clear segment binlogs cache - delete(buffer.flushedBinlogs, writer.GetSegmentID()) - // set old writer nil - writer = nil + delete(buffer.flushedBinlogs, segmentID) return nil } func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter, pack bool) error { - _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", writer.GetSegmentID())) + segmentID := writer.GetSegmentID() + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", segmentID)) defer span.End() if writer == nil { log.Warn("buffer writer is nil, please check", zap.Int("buffer id", buffer.id)) return fmt.Errorf("buffer: %d writer is nil, please check", buffer.id) } + defer func() { + // set old writer nil + writer = nil + }() buffer.flushLock.Lock() defer buffer.flushLock.Unlock() writtenMemorySize := int64(writer.WrittenMemorySize()) writtenRowNum := writer.GetRowNum() log := log.With(zap.Int("bufferID", buffer.id), - zap.Int64("segmentID", writer.GetSegmentID()), + zap.Int64("segmentID", segmentID), zap.Bool("pack", pack), zap.Int64("writerRowNum", writtenRowNum), zap.Int64("writtenMemorySize", writtenMemorySize), @@ -892,7 +913,7 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus if writtenRowNum <= 0 { log.Debug("writerRowNum is zero, skip flush") if pack { - return t.packBufferToSegment(ctx, buffer, writer) + return t.packBufferToSegment(ctx, buffer, segmentID) } return nil } @@ -909,29 +930,30 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus return err } - if info, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || info == nil { - buffer.flushedBinlogs[writer.GetSegmentID()] = make(map[typeutil.UniqueID]*datapb.FieldBinlog) + if info, ok := buffer.flushedBinlogs[segmentID]; !ok || info == nil { + buffer.flushedBinlogs[segmentID] = make(map[typeutil.UniqueID]*datapb.FieldBinlog) } for fID, path := range partialBinlogs { - tmpBinlog, ok := buffer.flushedBinlogs[writer.GetSegmentID()][fID] + tmpBinlog, ok := buffer.flushedBinlogs[segmentID][fID] if !ok { tmpBinlog = path } else { tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) } - buffer.flushedBinlogs[writer.GetSegmentID()][fID] = tmpBinlog + buffer.flushedBinlogs[segmentID][fID] = tmpBinlog } - curSegFlushedRowNum := buffer.flushedRowNum[writer.GetSegmentID()] + + curSegFlushedRowNum := buffer.flushedRowNum[segmentID] curSegFlushedRowNum.Add(writtenRowNum) - buffer.flushedRowNum[writer.GetSegmentID()] = curSegFlushedRowNum + buffer.flushedRowNum[segmentID] = curSegFlushedRowNum // clean buffer with writer buffer.bufferMemorySize.Sub(writtenMemorySize) t.flushCount.Inc() if pack { - if err := t.packBufferToSegment(ctx, buffer, writer); err != nil { + if err := t.packBufferToSegment(ctx, buffer, segmentID); err != nil { return err } } @@ -1220,3 +1242,47 @@ func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error { } return nil } + +func (t *clusteringCompactionTask) generatePkStats(ctx context.Context, segmentID int64, + numRows int64, binlogPaths [][]string) (*datapb.FieldBinlog, error) { + stats, err := storage.NewPrimaryKeyStats(t.primaryKeyField.GetFieldID(), int64(t.primaryKeyField.GetDataType()), numRows) + if err != nil { + return nil, err + } + + for _, path := range binlogPaths { + bytesArr, err := t.binlogIO.Download(ctx, path) + if err != nil { + log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err)) + return nil, err + } + blobs := make([]*storage.Blob, len(bytesArr)) + for i := range bytesArr { + blobs[i] = &storage.Blob{Value: bytesArr[i]} + } + + pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType()) + if err != nil { + log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err)) + return nil, err + } + + for pkIter.HasNext() { + vIter, _ := pkIter.Next() + v, ok := vIter.(*storage.Value) + if !ok { + log.Warn("transfer interface to Value wrong", zap.Strings("path", path)) + return nil, errors.New("unexpected error") + } + stats.Update(v.PK) + } + } + + codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: t.collectionID, Schema: t.plan.GetSchema()}) + sblob, err := codec.SerializePkStats(stats, numRows) + if err != nil { + return nil, err + } + + return uploadStatsBlobs(ctx, t.collectionID, t.partitionID, segmentID, t.primaryKeyField.GetFieldID(), numRows, t.binlogIO, t.logIDAlloc, sblob) +} diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go index 05f79f5a8d304..0c73ce350b33a 100644 --- a/internal/datanode/compaction/clustering_compactor_test.go +++ b/internal/datanode/compaction/clustering_compactor_test.go @@ -18,6 +18,7 @@ package compaction import ( "context" + "fmt" "testing" "time" @@ -37,6 +38,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestClusteringCompactionTaskSuite(t *testing.T) { @@ -172,8 +174,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { var segmentID int64 = 1001 segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID) s.Require().NoError(err) - - for i := 0; i < 1000; i++ { + for i := 0; i < 10240; i++ { v := storage.Value{ PK: storage.NewInt64PrimaryKey(int64(i)), Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)), @@ -189,24 +190,153 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{ { - SegmentID: 100, + SegmentID: segmentID, FieldBinlogs: lo.Values(fBinlogs), }, } s.task.plan.Schema = genCollectionSchema() s.task.plan.ClusteringKeyField = 100 - s.task.plan.PreferSegmentRows = 100 - s.task.plan.MaxSegmentRows = 200 + s.task.plan.PreferSegmentRows = 2048 + s.task.plan.MaxSegmentRows = 2048 s.task.plan.PreAllocatedSegments = &datapb.IDRange{ Begin: time.Now().UnixMilli(), End: time.Now().UnixMilli() + 1000, } + // 8+8+8+4+7+4*4=51 + // 51*1024 = 52224 + // writer will automatically flush after 1024 rows. + paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223") + defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key) + compactionResult, err := s.task.Compact() s.Require().NoError(err) - s.Equal(10, len(s.task.clusterBuffers)) - s.Equal(10, len(compactionResult.GetSegments())) + s.Equal(5, len(s.task.clusterBuffers)) + s.Equal(5, len(compactionResult.GetSegments())) + totalBinlogNum := 0 + totalRowNum := int64(0) + for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() { + for _, b := range fb.GetBinlogs() { + totalBinlogNum++ + if fb.GetFieldID() == 100 { + totalRowNum += b.GetEntriesNum() + } + } + } + statsBinlogNum := 0 + statsRowNum := int64(0) + for _, sb := range compactionResult.GetSegments()[0].GetField2StatslogPaths() { + for _, b := range sb.GetBinlogs() { + statsBinlogNum++ + statsRowNum += b.GetEntriesNum() + } + } + s.Equal(2, totalBinlogNum/len(schema.GetFields())) + s.Equal(1, statsBinlogNum) + s.Equal(totalRowNum, statsRowNum) +} + +func (s *ClusteringCompactionTaskSuite) TestCheckBuffersAfterCompaction() { + s.Run("no leak", func() { + task := &clusteringCompactionTask{clusterBuffers: []*ClusterBuffer{{}}} + + s.NoError(task.checkBuffersAfterCompaction()) + }) + + s.Run("leak binlog", func() { + task := &clusteringCompactionTask{ + clusterBuffers: []*ClusterBuffer{ + { + flushedBinlogs: map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog{ + 1: { + 101: { + FieldID: 101, + Binlogs: []*datapb.Binlog{{LogID: 1000}}, + }, + }, + }, + }, + }, + } + s.Error(task.checkBuffersAfterCompaction()) + }) +} + +func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() { + pkField := &schemapb.FieldSchema{ + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + Description: "", + DataType: schemapb.DataType_Int64, + } + s.Run("num rows zero", func() { + task := &clusteringCompactionTask{ + primaryKeyField: pkField, + } + binlogs, err := task.generatePkStats(context.Background(), 1, 0, nil) + s.Error(err) + s.Nil(binlogs) + }) + + s.Run("download binlogs failed", func() { + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")) + task := &clusteringCompactionTask{ + binlogIO: s.mockBinlogIO, + primaryKeyField: pkField, + } + binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}}) + s.Error(err) + s.Nil(binlogs) + }) + + s.Run("NewInsertBinlogIterator failed", func() { + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{[]byte("mock")}, nil) + task := &clusteringCompactionTask{ + binlogIO: s.mockBinlogIO, + primaryKeyField: pkField, + } + binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}}) + s.Error(err) + s.Nil(binlogs) + }) + + s.Run("upload failed", func() { + schema := genCollectionSchema() + segWriter, err := NewSegmentWriter(schema, 1000, SegmentID, PartitionID, CollectionID) + s.Require().NoError(err) + for i := 0; i < 2000; i++ { + v := storage.Value{ + PK: storage.NewInt64PrimaryKey(int64(i)), + Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)), + Value: genRow(int64(i)), + } + err = segWriter.Write(&v) + s.Require().NoError(err) + } + segWriter.writer.Flush() + + kvs, _, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter) + s.NoError(err) + mockBinlogIO := io.NewMockBinlogIO(s.T()) + mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil) + mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")) + task := &clusteringCompactionTask{ + collectionID: CollectionID, + partitionID: PartitionID, + plan: &datapb.CompactionPlan{ + Schema: genCollectionSchema(), + }, + binlogIO: mockBinlogIO, + primaryKeyField: pkField, + logIDAlloc: s.mockAlloc, + } + + binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}}) + s.Error(err) + s.Nil(binlogs) + }) } func genRow(magic int64) map[int64]interface{} { diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go index 0fe750542538b..8e57d2204741a 100644 --- a/internal/datanode/compaction/compactor_common.go +++ b/internal/datanode/compaction/compactor_common.go @@ -166,29 +166,34 @@ func serializeWrite(ctx context.Context, allocator allocator.Interface, writer * return } -func statSerializeWrite(ctx context.Context, io io.BinlogIO, allocator allocator.Interface, writer *SegmentWriter, finalRowCount int64) (*datapb.FieldBinlog, error) { +func statSerializeWrite(ctx context.Context, io io.BinlogIO, allocator allocator.Interface, writer *SegmentWriter) (*datapb.FieldBinlog, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "statslog serializeWrite") defer span.End() - sblob, err := writer.Finish(finalRowCount) + sblob, err := writer.Finish(writer.GetRowNum()) if err != nil { return nil, err } + return uploadStatsBlobs(ctx, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), writer.GetRowNum(), io, allocator, sblob) +} + +func uploadStatsBlobs(ctx context.Context, collectionID, partitionID, segmentID, pkID, numRows int64, + io io.BinlogIO, allocator allocator.Interface, blob *storage.Blob) (*datapb.FieldBinlog, error) { logID, err := allocator.AllocOne() if err != nil { return nil, err } - key, _ := binlog.BuildLogPath(storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), logID) - kvs := map[string][]byte{key: sblob.GetValue()} + key, _ := binlog.BuildLogPath(storage.StatsBinlog, collectionID, partitionID, segmentID, pkID, logID) + kvs := map[string][]byte{key: blob.GetValue()} statFieldLog := &datapb.FieldBinlog{ - FieldID: writer.GetPkID(), + FieldID: pkID, Binlogs: []*datapb.Binlog{ { - LogSize: int64(len(sblob.GetValue())), - MemorySize: int64(len(sblob.GetValue())), + LogSize: int64(len(blob.GetValue())), + MemorySize: int64(len(blob.GetValue())), LogPath: key, - EntriesNum: finalRowCount, + EntriesNum: numRows, }, }, } diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 196cb3da8aa5b..0c91099c70667 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -244,7 +244,7 @@ func (t *mixCompactionTask) merge( } serWriteStart := time.Now() - sPath, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer, remainingRowCount) + sPath, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer) if err != nil { log.Warn("compact wrong, failed to serialize write segment stats", zap.Int64("remaining row count", remainingRowCount), zap.Error(err))