Skip to content

Commit

Permalink
fix: BinlogDeserializeReader leak in mix_compactor.go (#36270)
Browse files Browse the repository at this point in the history
#36269

Signed-off-by: fengjun2016 <[email protected]>
  • Loading branch information
fengjun2016 authored Oct 11, 2024
1 parent 621dbc9 commit 7c8b71e
Showing 1 changed file with 66 additions and 53 deletions.
119 changes: 66 additions & 53 deletions internal/datanode/compaction/mix_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -141,17 +142,6 @@ func (t *mixCompactionTask) mergeSplit(
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(t.binlogIO, compAlloc, t.plan, t.maxRows, t.partitionID, t.collectionID)

isValueDeleted := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}

deletedRowCount := int64(0)
expiredRowCount := int64(0)

Expand All @@ -161,51 +151,10 @@ func (t *mixCompactionTask) mergeSplit(
return nil, err
}
for _, paths := range binlogPaths {
log := log.With(zap.Strings("paths", paths))
allValues, err := t.binlogIO.Download(ctx, paths)
err := t.dealBinlogPaths(ctx, delta, mWriter, pkField, paths, &deletedRowCount, &expiredRowCount)
if err != nil {
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
return nil, err
}

blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
return &storage.Blob{Key: paths[i], Value: v}
})

iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
if err != nil {
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
return nil, err
}

for {
err := iter.Next()
if err != nil {
if err == sio.EOF {
break
} else {
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
return nil, err
}
}
v := iter.Value()
if isValueDeleted(v) {
deletedRowCount++
continue
}

// Filtering expired entity
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(v.Timestamp)) {
expiredRowCount++
continue
}

err = mWriter.Write(v)
if err != nil {
log.Warn("compact wrong, failed to writer row", zap.Error(err))
return nil, err
}
}
}
res, err := mWriter.Finish()
if err != nil {
Expand All @@ -223,6 +172,70 @@ func (t *mixCompactionTask) mergeSplit(
return res, nil
}

// isValueDeleted reports whether delta contains an entry for v's primary key
// whose timestamp is strictly greater than v's own timestamp.
func isValueDeleted(v *storage.Value, delta map[interface{}]typeutil.Timestamp) bool {
	deleteTs, found := delta[v.PK.GetValue()]
	if !found {
		return false
	}
	// An upsert produces an insert and a delete carrying the same timestamp.
	// The comparison must stay strict (<, not <=) so the upserted row is not
	// treated as deleted after compaction.
	return uint64(v.Timestamp) < deleteTs
}

// dealBinlogPaths downloads the binlogs at paths, iterates over every row they
// contain and forwards the surviving rows to mWriter.
//
// A row is dropped, and the matching counter incremented, when isValueDeleted
// reports it as deleted according to delta (deletedRowCount) or when
// isExpiredEntity reports it as expired for the plan's collection TTL
// (expiredRowCount). The deletion check runs first, so a row is never counted
// in both.
//
// The first error from downloading, creating the reader, iterating, or writing
// is logged and returned. Reaching sio.EOF ends the iteration and yields nil.
// The reader is closed when the function returns.
func (t *mixCompactionTask) dealBinlogPaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp, mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string, deletedRowCount, expiredRowCount *int64) error {
	log := log.With(zap.Strings("paths", paths))

	allValues, err := t.binlogIO.Download(ctx, paths)
	if err != nil {
		log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
		return err
	}

	// Pair each downloaded payload with the path it was fetched from.
	blobs := lo.Map(allValues, func(data []byte, idx int) *storage.Blob {
		return &storage.Blob{Key: paths[idx], Value: data}
	})

	iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
	if err != nil {
		log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
		return err
	}
	defer iter.Close()

	for {
		if err := iter.Next(); err != nil {
			if err == sio.EOF {
				// All rows consumed.
				return nil
			}
			log.Warn("compact wrong, failed to iter through data", zap.Error(err))
			return err
		}

		row := iter.Value()
		switch {
		case isValueDeleted(row, delta):
			*deletedRowCount++
		case isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(row.Timestamp)):
			// Filtering expired entity
			*expiredRowCount++
		default:
			if err := mWriter.Write(row); err != nil {
				log.Warn("compact wrong, failed to writer row", zap.Error(err))
				return err
			}
		}
	}
}

func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
durInQueue := t.tr.RecordSpan()
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("MixCompact-%d", t.GetPlanID()))
Expand Down

0 comments on commit 7c8b71e

Please sign in to comment.