From 0c0ca4cf0e0132eb43c9bdbf837cc3c1938fa47d Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 18 Jul 2024 09:15:48 +0800 Subject: [PATCH] fix: Fix bug where binlogs already flushed with new segment during pack (#34762) issue: #34703 Signed-off-by: Cai Zhang --- .../compaction/clustering_compactor.go | 53 ++++++++++++------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index f348f7ba21398..01e77ada5df9a 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -108,9 +108,10 @@ type ClusterBuffer struct { bufferMemorySize atomic.Int64 - flushedRowNum atomic.Int64 + flushedRowNum map[typeutil.UniqueID]atomic.Int64 currentSegmentRowNum atomic.Int64 - flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog + // segID -> fieldID -> binlogs + flushedBinlogs map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog uploadedSegments []*datapb.CompactionSegment uploadedSegmentStats map[typeutil.UniqueID]storage.SegmentStats @@ -293,7 +294,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e } buffer := &ClusterBuffer{ id: id, - flushedBinlogs: make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0), + flushedRowNum: map[typeutil.UniqueID]atomic.Int64{}, + flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0), uploadedSegments: make([]*datapb.CompactionSegment, 0), uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), clusteringKeyFieldStats: fieldStats, @@ -346,7 +348,8 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid)) clusterBuffer := &ClusterBuffer{ id: id, - flushedBinlogs: make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0), + flushedRowNum: map[typeutil.UniqueID]atomic.Int64{}, + flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0), uploadedSegments: make([]*datapb.CompactionSegment, 0), uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), clusteringKeyFieldStats: fieldStats, @@ -587,16 +590,18 @@ func (t *clusteringCompactionTask) mappingSegment( zap.Int64("currentBufferWrittenMemorySize", currentBufferWrittenMemorySize)) // trigger flushBinlog - currentBufferNum := clusterBuffer.writer.GetRowNum() - if clusterBuffer.currentSegmentRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() || + currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load() + if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() { // reach segment/binlog max size t.clusterBufferLocks.Lock(clusterBuffer.id) writer := clusterBuffer.writer pack, _ := t.refreshBufferWriter(clusterBuffer) log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id), - zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum), - zap.Int64("clusterBuffer.flushedRowNum.Load()", clusterBuffer.flushedRowNum.Load())) + zap.Bool("pack", pack), + zap.Int64("current segment", writer.GetSegmentID()), + zap.Int64("current segment num rows", currentSegmentNumRows), + zap.Int64("writer num", writer.GetRowNum())) t.clusterBufferLocks.Unlock(clusterBuffer.id) t.flushChan <- FlushSignal{ @@ -805,14 +810,17 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error { } func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error { - if len(buffer.flushedBinlogs) == 0 { + if binlogs, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || len(binlogs) == 0 { return nil } + insertLogs := make([]*datapb.FieldBinlog, 0) - for _, fieldBinlog := range buffer.flushedBinlogs { + for _, fieldBinlog := range buffer.flushedBinlogs[writer.GetSegmentID()] { insertLogs = append(insertLogs, fieldBinlog) } - statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, buffer.flushedRowNum.Load()) + + numRows := buffer.flushedRowNum[writer.GetSegmentID()] + statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, numRows.Load()) if err != nil { return err } @@ -821,7 +829,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff seg := &datapb.CompactionSegment{ PlanID: t.plan.GetPlanID(), SegmentID: writer.GetSegmentID(), - NumOfRows: buffer.flushedRowNum.Load(), + NumOfRows: numRows.Load(), InsertLogs: insertLogs, Field2StatslogPaths: []*datapb.FieldBinlog{statPaths}, Channel: t.plan.GetChannel(), @@ -829,11 +837,10 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff buffer.uploadedSegments = append(buffer.uploadedSegments, seg) segmentStats := storage.SegmentStats{ FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()}, - NumRows: int(buffer.flushedRowNum.Load()), + NumRows: int(numRows.Load()), } buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats - buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0) for _, binlog := range seg.InsertLogs { log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String())) } @@ -841,9 +848,9 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff zap.Int64("segID", seg.GetSegmentID()), zap.Int64("row num", seg.GetNumOfRows())) - // reset - buffer.flushedRowNum.Store(0) - // set old writer nil + // clear segment binlogs cache + buffer.flushedBinlogs[writer.GetSegmentID()] = nil + //set old writer nil writer = nil return nil } @@ -888,16 +895,22 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus return err } + if info, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || info == nil { + buffer.flushedBinlogs[writer.GetSegmentID()] = make(map[typeutil.UniqueID]*datapb.FieldBinlog) + } + for fID, path := range partialBinlogs { - tmpBinlog, ok := buffer.flushedBinlogs[fID] + tmpBinlog, ok := buffer.flushedBinlogs[writer.GetSegmentID()][fID] if !ok { tmpBinlog = path } else { tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) } - buffer.flushedBinlogs[fID] = tmpBinlog + buffer.flushedBinlogs[writer.GetSegmentID()][fID] = tmpBinlog } - buffer.flushedRowNum.Add(writtenRowNum) + curSegFlushedRowNum := buffer.flushedRowNum[writer.GetSegmentID()] + curSegFlushedRowNum.Add(writtenRowNum) + buffer.flushedRowNum[writer.GetSegmentID()] = curSegFlushedRowNum // clean buffer with writer buffer.bufferMemorySize.Sub(writtenMemorySize)