Skip to content

Commit

Permalink
fix: Fix bug where binlogs already flushed with new segment during pa…
Browse files Browse the repository at this point in the history
…ck (milvus-io#34762)

issue: milvus-io#34703

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Jul 18, 2024
1 parent 4939f82 commit 0c0ca4c
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ type ClusterBuffer struct {

bufferMemorySize atomic.Int64

flushedRowNum atomic.Int64
flushedRowNum map[typeutil.UniqueID]atomic.Int64
currentSegmentRowNum atomic.Int64
flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog
// segID -> fieldID -> binlogs
flushedBinlogs map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog

uploadedSegments []*datapb.CompactionSegment
uploadedSegmentStats map[typeutil.UniqueID]storage.SegmentStats
Expand Down Expand Up @@ -293,7 +294,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
}
buffer := &ClusterBuffer{
id: id,
flushedBinlogs: make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
flushedRowNum: map[typeutil.UniqueID]atomic.Int64{},
flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
Expand Down Expand Up @@ -346,7 +348,8 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
clusterBuffer := &ClusterBuffer{
id: id,
flushedBinlogs: make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
flushedRowNum: map[typeutil.UniqueID]atomic.Int64{},
flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
Expand Down Expand Up @@ -587,16 +590,18 @@ func (t *clusteringCompactionTask) mappingSegment(
zap.Int64("currentBufferWrittenMemorySize", currentBufferWrittenMemorySize))

// trigger flushBinlog
currentBufferNum := clusterBuffer.writer.GetRowNum()
if clusterBuffer.currentSegmentRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() ||
currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
if currentSegmentNumRows > t.plan.GetMaxSegmentRows() ||
clusterBuffer.writer.IsFull() {
// reach segment/binlog max size
t.clusterBufferLocks.Lock(clusterBuffer.id)
writer := clusterBuffer.writer
pack, _ := t.refreshBufferWriter(clusterBuffer)
log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum),
zap.Int64("clusterBuffer.flushedRowNum.Load()", clusterBuffer.flushedRowNum.Load()))
zap.Bool("pack", pack),
zap.Int64("current segment", writer.GetSegmentID()),
zap.Int64("current segment num rows", currentSegmentNumRows),
zap.Int64("writer num", writer.GetRowNum()))
t.clusterBufferLocks.Unlock(clusterBuffer.id)

t.flushChan <- FlushSignal{
Expand Down Expand Up @@ -805,14 +810,17 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
}

func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error {
if len(buffer.flushedBinlogs) == 0 {
if binlogs, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || len(binlogs) == 0 {
return nil
}

insertLogs := make([]*datapb.FieldBinlog, 0)
for _, fieldBinlog := range buffer.flushedBinlogs {
for _, fieldBinlog := range buffer.flushedBinlogs[writer.GetSegmentID()] {
insertLogs = append(insertLogs, fieldBinlog)
}
statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, buffer.flushedRowNum.Load())

numRows := buffer.flushedRowNum[writer.GetSegmentID()]
statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, numRows.Load())
if err != nil {
return err
}
Expand All @@ -821,29 +829,28 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
seg := &datapb.CompactionSegment{
PlanID: t.plan.GetPlanID(),
SegmentID: writer.GetSegmentID(),
NumOfRows: buffer.flushedRowNum.Load(),
NumOfRows: numRows.Load(),
InsertLogs: insertLogs,
Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
Channel: t.plan.GetChannel(),
}
buffer.uploadedSegments = append(buffer.uploadedSegments, seg)
segmentStats := storage.SegmentStats{
FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
NumRows: int(buffer.flushedRowNum.Load()),
NumRows: int(numRows.Load()),
}
buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats

buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0)
for _, binlog := range seg.InsertLogs {
log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String()))
}
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID),
zap.Int64("segID", seg.GetSegmentID()),
zap.Int64("row num", seg.GetNumOfRows()))

// reset
buffer.flushedRowNum.Store(0)
// set old writer nil
// clear segment binlogs cache
buffer.flushedBinlogs[writer.GetSegmentID()] = nil
//set old writer nil
writer = nil
return nil
}
Expand Down Expand Up @@ -888,16 +895,22 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
return err
}

if info, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || info == nil {
buffer.flushedBinlogs[writer.GetSegmentID()] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
}

for fID, path := range partialBinlogs {
tmpBinlog, ok := buffer.flushedBinlogs[fID]
tmpBinlog, ok := buffer.flushedBinlogs[writer.GetSegmentID()][fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
buffer.flushedBinlogs[fID] = tmpBinlog
buffer.flushedBinlogs[writer.GetSegmentID()][fID] = tmpBinlog
}
buffer.flushedRowNum.Add(writtenRowNum)
curSegFlushedRowNum := buffer.flushedRowNum[writer.GetSegmentID()]
curSegFlushedRowNum.Add(writtenRowNum)
buffer.flushedRowNum[writer.GetSegmentID()] = curSegFlushedRowNum

// clean buffer with writer
buffer.bufferMemorySize.Sub(writtenMemorySize)
Expand Down

0 comments on commit 0c0ca4c

Please sign in to comment.