Commit

fix: Fix the issue of missing stats log after clustering compaction (#35266)

issue: #35265

Signed-off-by: Cai Zhang <[email protected]>
xiaocai2333 authored Aug 8, 2024
1 parent 626b1b2 commit aaab827
Showing 5 changed files with 249 additions and 44 deletions.
8 changes: 6 additions & 2 deletions internal/datacoord/sync_segments_scheduler.go
@@ -134,12 +134,16 @@ func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64,
 			Level:     seg.GetLevel(),
 			NumOfRows: seg.GetNumOfRows(),
 		}
+		statsLogs := make([]*datapb.Binlog, 0)
 		for _, statsLog := range seg.GetStatslogs() {
 			if statsLog.GetFieldID() == pkFieldID {
-				req.SegmentInfos[seg.ID].PkStatsLog = statsLog
-				break
+				statsLogs = append(statsLogs, statsLog.GetBinlogs()...)
 			}
 		}
+		req.SegmentInfos[seg.ID].PkStatsLog = &datapb.FieldBinlog{
+			FieldID: pkFieldID,
+			Binlogs: statsLogs,
+		}
 	}

 	if err := sss.sessions.SyncSegments(nodeID, req); err != nil {
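
For context: the old datacoord loop kept only the first statslog whose field ID matched the primary key (PkStatsLog = statsLog; break), so any further PK stats binlogs were dropped; the new code gathers the binlogs of every PK statslog into one FieldBinlog. A minimal, self-contained sketch of that aggregation, with local stand-in types rather than the milvus protos:

package main

import "fmt"

// Illustrative stand-ins for datapb.Binlog / datapb.FieldBinlog; not the Milvus protos.
type Binlog struct{ LogPath string }

type FieldBinlog struct {
	FieldID int64
	Binlogs []*Binlog
}

// mergePkStatsLogs mirrors the shape of the datacoord change: every statslog
// belonging to the primary-key field contributes all of its binlogs, instead of
// only the first match being kept.
func mergePkStatsLogs(statslogs []*FieldBinlog, pkFieldID int64) *FieldBinlog {
	merged := make([]*Binlog, 0)
	for _, sl := range statslogs {
		if sl.FieldID == pkFieldID {
			merged = append(merged, sl.Binlogs...)
		}
	}
	return &FieldBinlog{FieldID: pkFieldID, Binlogs: merged}
}

func main() {
	logs := []*FieldBinlog{
		{FieldID: 100, Binlogs: []*Binlog{{LogPath: "stats/a"}}},
		{FieldID: 100, Binlogs: []*Binlog{{LogPath: "stats/b"}}},
	}
	fmt.Println(len(mergePkStatsLogs(logs, 100).Binlogs)) // 2, where the old code kept only 1
}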
118 changes: 92 additions & 26 deletions internal/datanode/compaction/clustering_compactor.go
@@ -42,6 +42,7 @@ import (
 	"github.com/milvus-io/milvus/internal/flushcommon/io"
 	"github.com/milvus-io/milvus/internal/proto/clusteringpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -592,12 +593,12 @@ func (t *clusteringCompactionTask) mappingSegment(
 		remained++

 		if (remained+1)%100 == 0 {
-			currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
-			// trigger flushBinlog
-
 			t.clusterBufferLocks.RLock(clusterBuffer.id)
-			currentBufferWriterFull := clusterBuffer.writer.IsFull()
+			currentBufferWriterFull := clusterBuffer.writer.FlushAndIsFull()
 			t.clusterBufferLocks.RUnlock(clusterBuffer.id)
+
+			currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
+
 			currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
 			if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || currentBufferWriterFull {
 				// reach segment/binlog max size
@@ -823,65 +824,85 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
 	return nil
 }

-func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error {
-	if binlogs, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || len(binlogs) == 0 {
+func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, segmentID int64) error {
+	if binlogs, ok := buffer.flushedBinlogs[segmentID]; !ok || len(binlogs) == 0 {
 		return nil
 	}

+	binlogNum := 0
+	numRows := buffer.flushedRowNum[segmentID]
 	insertLogs := make([]*datapb.FieldBinlog, 0)
-	for _, fieldBinlog := range buffer.flushedBinlogs[writer.GetSegmentID()] {
+	for _, fieldBinlog := range buffer.flushedBinlogs[segmentID] {
 		insertLogs = append(insertLogs, fieldBinlog)
+		binlogNum = len(fieldBinlog.GetBinlogs())
 	}

-	numRows := buffer.flushedRowNum[writer.GetSegmentID()]
-	statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, numRows.Load())
+	fieldBinlogPaths := make([][]string, 0)
+	for idx := 0; idx < binlogNum; idx++ {
+		var ps []string
+		for _, fieldID := range []int64{t.primaryKeyField.GetFieldID(), common.RowIDField, common.TimeStampField} {
+			ps = append(ps, buffer.flushedBinlogs[segmentID][fieldID].GetBinlogs()[idx].GetLogPath())
+		}
+		fieldBinlogPaths = append(fieldBinlogPaths, ps)
+	}
+
+	statsLogs, err := t.generatePkStats(ctx, segmentID, numRows.Load(), fieldBinlogPaths)
 	if err != nil {
 		return err
 	}

 	// pack current flushBinlog data into a segment
 	seg := &datapb.CompactionSegment{
 		PlanID:              t.plan.GetPlanID(),
-		SegmentID:           writer.GetSegmentID(),
+		SegmentID:           segmentID,
 		NumOfRows:           numRows.Load(),
 		InsertLogs:          insertLogs,
-		Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
+		Field2StatslogPaths: []*datapb.FieldBinlog{statsLogs},
 		Channel:             t.plan.GetChannel(),
 	}
 	buffer.uploadedSegments = append(buffer.uploadedSegments, seg)
 	segmentStats := storage.SegmentStats{
 		FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
 		NumRows:    int(numRows.Load()),
 	}
-	buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats
+	buffer.uploadedSegmentStats[segmentID] = segmentStats

 	for _, binlog := range seg.InsertLogs {
-		log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String()))
+		log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID),
+			zap.Int64("segID", segmentID), zap.String("binlog", binlog.String()))
 	}
+	for _, statsLog := range seg.Field2StatslogPaths {
+		log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID),
+			zap.Int64("segID", segmentID), zap.String("binlog", statsLog.String()))
+	}
+
 	log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID),
 		zap.Int64("segID", seg.GetSegmentID()),
 		zap.Int64("row num", seg.GetNumOfRows()))

 	// clear segment binlogs cache
-	delete(buffer.flushedBinlogs, writer.GetSegmentID())
-	// set old writer nil
-	writer = nil
+	delete(buffer.flushedBinlogs, segmentID)
 	return nil
 }

 func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter, pack bool) error {
-	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", writer.GetSegmentID()))
+	segmentID := writer.GetSegmentID()
+	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", segmentID))
 	defer span.End()
 	if writer == nil {
 		log.Warn("buffer writer is nil, please check", zap.Int("buffer id", buffer.id))
 		return fmt.Errorf("buffer: %d writer is nil, please check", buffer.id)
 	}
+	defer func() {
+		// set old writer nil
+		writer = nil
+	}()
 	buffer.flushLock.Lock()
 	defer buffer.flushLock.Unlock()
 	writtenMemorySize := int64(writer.WrittenMemorySize())
 	writtenRowNum := writer.GetRowNum()
 	log := log.With(zap.Int("bufferID", buffer.id),
-		zap.Int64("segmentID", writer.GetSegmentID()),
+		zap.Int64("segmentID", segmentID),
 		zap.Bool("pack", pack),
 		zap.Int64("writerRowNum", writtenRowNum),
 		zap.Int64("writtenMemorySize", writtenMemorySize),
@@ -892,7 +913,7 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
 	if writtenRowNum <= 0 {
 		log.Debug("writerRowNum is zero, skip flush")
 		if pack {
-			return t.packBufferToSegment(ctx, buffer, writer)
+			return t.packBufferToSegment(ctx, buffer, segmentID)
 		}
 		return nil
 	}
@@ -909,29 +930,30 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
 		return err
 	}

-	if info, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || info == nil {
-		buffer.flushedBinlogs[writer.GetSegmentID()] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
+	if info, ok := buffer.flushedBinlogs[segmentID]; !ok || info == nil {
+		buffer.flushedBinlogs[segmentID] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
 	}

 	for fID, path := range partialBinlogs {
-		tmpBinlog, ok := buffer.flushedBinlogs[writer.GetSegmentID()][fID]
+		tmpBinlog, ok := buffer.flushedBinlogs[segmentID][fID]
 		if !ok {
 			tmpBinlog = path
 		} else {
 			tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
 		}
-		buffer.flushedBinlogs[writer.GetSegmentID()][fID] = tmpBinlog
+		buffer.flushedBinlogs[segmentID][fID] = tmpBinlog
 	}
-	curSegFlushedRowNum := buffer.flushedRowNum[writer.GetSegmentID()]
+
+	curSegFlushedRowNum := buffer.flushedRowNum[segmentID]
 	curSegFlushedRowNum.Add(writtenRowNum)
-	buffer.flushedRowNum[writer.GetSegmentID()] = curSegFlushedRowNum
+	buffer.flushedRowNum[segmentID] = curSegFlushedRowNum

 	// clean buffer with writer
 	buffer.bufferMemorySize.Sub(writtenMemorySize)

 	t.flushCount.Inc()
 	if pack {
-		if err := t.packBufferToSegment(ctx, buffer, writer); err != nil {
+		if err := t.packBufferToSegment(ctx, buffer, segmentID); err != nil {
 			return err
 		}
 	}
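
flushBinlog now snapshots segmentID from the writer once and keys all of its bookkeeping on that ID, so packing can run after the writer is released in the new defer. A rough, self-contained sketch of the per-segment binlog cache merge it performs, again with stand-in types:

package main

import "fmt"

// Illustrative stand-ins; not the Milvus protos.
type Binlog struct{ LogPath string }

type FieldBinlog struct{ Binlogs []*Binlog }

// mergePartialBinlogs folds a freshly flushed batch of per-field binlogs into a
// per-segment cache, mirroring the flushedBinlogs bookkeeping above, which is now
// keyed by the captured segmentID rather than by the soon-to-be-released writer.
func mergePartialBinlogs(cache map[int64]map[int64]*FieldBinlog, segmentID int64, partial map[int64]*FieldBinlog) {
	if cache[segmentID] == nil {
		cache[segmentID] = make(map[int64]*FieldBinlog)
	}
	for fieldID, fb := range partial {
		if existing, ok := cache[segmentID][fieldID]; ok {
			existing.Binlogs = append(existing.Binlogs, fb.Binlogs...)
		} else {
			cache[segmentID][fieldID] = fb
		}
	}
}

func main() {
	cache := make(map[int64]map[int64]*FieldBinlog)
	mergePartialBinlogs(cache, 7, map[int64]*FieldBinlog{100: {Binlogs: []*Binlog{{LogPath: "pk/0"}}}})
	mergePartialBinlogs(cache, 7, map[int64]*FieldBinlog{100: {Binlogs: []*Binlog{{LogPath: "pk/1"}}}})
	fmt.Println(len(cache[7][100].Binlogs)) // 2
}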
@@ -1220,3 +1242,47 @@ func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error {
 		}
 	}
 	return nil
 }
+
+func (t *clusteringCompactionTask) generatePkStats(ctx context.Context, segmentID int64,
+	numRows int64, binlogPaths [][]string) (*datapb.FieldBinlog, error) {
+	stats, err := storage.NewPrimaryKeyStats(t.primaryKeyField.GetFieldID(), int64(t.primaryKeyField.GetDataType()), numRows)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, path := range binlogPaths {
+		bytesArr, err := t.binlogIO.Download(ctx, path)
+		if err != nil {
+			log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
+			return nil, err
+		}
+		blobs := make([]*storage.Blob, len(bytesArr))
+		for i := range bytesArr {
+			blobs[i] = &storage.Blob{Value: bytesArr[i]}
+		}
+
+		pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType())
+		if err != nil {
+			log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
+			return nil, err
+		}
+
+		for pkIter.HasNext() {
+			vIter, _ := pkIter.Next()
+			v, ok := vIter.(*storage.Value)
+			if !ok {
+				log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
+				return nil, errors.New("unexpected error")
+			}
+			stats.Update(v.PK)
+		}
+	}
+
+	codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: t.collectionID, Schema: t.plan.GetSchema()})
+	sblob, err := codec.SerializePkStats(stats, numRows)
+	if err != nil {
+		return nil, err
+	}
+
+	return uploadStatsBlobs(ctx, t.collectionID, t.partitionID, segmentID, t.primaryKeyField.GetFieldID(), numRows, t.binlogIO, t.logIDAlloc, sblob)
+}
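
To clarify what the regenerated stats log carries: primary-key stats summarize a segment's PK column (roughly min/max plus a membership filter) and are serialized into the stats binlog that this fix restores for clustering-compacted segments. A deliberately simplified, self-contained sketch of that aggregation, with a plain map standing in for the real filter; none of these names are Milvus APIs:

package main

import (
	"fmt"
	"math"
)

// simplePKStats is a reduced illustration of what a per-segment primary-key
// stats log aggregates; the real storage.PrimaryKeyStats keeps min/max and a
// bloom filter and is serialized into a stats binlog.
type simplePKStats struct {
	min, max int64
	seen     map[int64]struct{}
	rows     int64
}

func newSimplePKStats() *simplePKStats {
	return &simplePKStats{min: math.MaxInt64, max: math.MinInt64, seen: map[int64]struct{}{}}
}

// update folds one primary key into the stats, as stats.Update(v.PK) does above.
func (s *simplePKStats) update(pk int64) {
	if pk < s.min {
		s.min = pk
	}
	if pk > s.max {
		s.max = pk
	}
	s.seen[pk] = struct{}{}
	s.rows++
}

func main() {
	stats := newSimplePKStats()
	for _, pk := range []int64{42, 7, 19} {
		stats.update(pk)
	}
	fmt.Println(stats.min, stats.max, stats.rows) // 7 42 3
}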