From c4e280a5afc3fdf70a42517577fedc5c95ab1d10 Mon Sep 17 00:00:00 2001
From: yangxuan
Date: Thu, 13 Jun 2024 19:33:57 +0800
Subject: [PATCH] enhance: Add deltaRowCount in l0 compaction

Signed-off-by: yangxuan
---
 internal/datanode/l0_compactor.go | 96 ++++++++++++++-----------------
 1 file changed, 43 insertions(+), 53 deletions(-)

diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go
index 169238ae5e742..8c395fe5757a9 100644
--- a/internal/datanode/l0_compactor.go
+++ b/internal/datanode/l0_compactor.go
@@ -237,7 +237,7 @@ func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr
 
 func (t *levelZeroCompactionTask) splitDelta(
 	ctx context.Context,
-	allDelta []*storage.DeleteData,
+	allDelta *storage.DeleteData,
 	targetSegIDs []int64,
 ) map[int64]*SegmentDeltaWriter {
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
@@ -257,9 +257,6 @@ func (t *levelZeroCompactionTask) splitDelta(
 		startIdx := value.StartIdx
 		pk2SegmentIDs := value.Segment2Hits
 
-		pks := allDelta[value.DeleteDataIdx].Pks
-		tss := allDelta[value.DeleteDataIdx].Tss
-
 		for segmentID, hits := range pk2SegmentIDs {
 			for i, hit := range hits {
 				if hit {
@@ -269,23 +266,21 @@
 						writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), t.getCollection())
 						targetSegBuffer[segmentID] = writer
 					}
-					writer.Write(pks[startIdx+i], tss[startIdx+i])
+					writer.Write(allDelta.Pks[startIdx+i], allDelta.Tss[startIdx+i])
 				}
 			}
 		}
 		return true
 	})
-
 	return targetSegBuffer
 }
 
 type BatchApplyRet = struct {
-	DeleteDataIdx int
-	StartIdx      int
-	Segment2Hits  map[int64][]bool
+	StartIdx     int
+	Segment2Hits map[int64][]bool
 }
 
-func (t *levelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
+func (t *levelZeroCompactionTask) applyBFInParallel(deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
 	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
 
 	batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
@@ -302,42 +297,37 @@ func (t *levelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.Delet
 	retIdx := 0
 	retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
 	var futures []*conc.Future[any]
-	for didx, data := range deleteDatas {
-		pks := data.Pks
-		for idx := 0; idx < len(pks); idx += batchSize {
-			startIdx := idx
-			endIdx := startIdx + batchSize
-			if endIdx > len(pks) {
-				endIdx = len(pks)
-			}
+	pks := deltaData.Pks
+	for idx := 0; idx < len(pks); idx += batchSize {
+		startIdx := idx
+		endIdx := startIdx + batchSize
+		if endIdx > len(pks) {
+			endIdx = len(pks)
+		}
 
-			retIdx += 1
-			tmpRetIndex := retIdx
-			deleteDataId := didx
-			future := pool.Submit(func() (any, error) {
-				ret := batchPredict(pks[startIdx:endIdx])
-				retMap.Insert(tmpRetIndex, &BatchApplyRet{
-					DeleteDataIdx: deleteDataId,
-					StartIdx:      startIdx,
-					Segment2Hits:  ret,
-				})
-				return nil, nil
+		retIdx += 1
+		tmpRetIndex := retIdx
+		future := pool.Submit(func() (any, error) {
+			ret := batchPredict(pks[startIdx:endIdx])
+			retMap.Insert(tmpRetIndex, &BatchApplyRet{
+				StartIdx:     startIdx,
+				Segment2Hits: ret,
 			})
-			futures = append(futures, future)
-		}
+			return nil, nil
+		})
+		futures = append(futures, future)
 	}
 	conc.AwaitAll(futures...)
-
 	return retMap
 }
 
 func (t *levelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
-	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
+	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
 	defer span.End()
 
 	results := make([]*datapb.CompactionSegment, 0)
 	batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
-	log := log.Ctx(t.ctx).With(
+	log := log.Ctx(ctx).With(
 		zap.Int64("planID", t.plan.GetPlanID()),
 		zap.Int("max conc segment counts", batchSize),
 		zap.Int("total segment counts", len(targetSegments)),
@@ -365,7 +355,10 @@ func (t *levelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
 			return nil, err
 		}
 
-		log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults)))
+		log.Info("L0 compaction finished one batch",
+			zap.Int("batch no.", i),
+			zap.Int("total deltaRowCount", int(allDelta.RowCount)),
+			zap.Int("batch segment count", len(batchResults)))
 		results = append(results, batchResults...)
 	}
 
@@ -373,25 +366,22 @@ func (t *levelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
 	return results, nil
 }
 
-func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*storage.DeleteData, error) {
-	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
+func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []string) (*storage.DeleteData, error) {
+	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
 	defer span.End()
 
-	allData := make([]*storage.DeleteData, 0, len(deltaLogs))
-	for _, paths := range deltaLogs {
-		blobBytes, err := t.Download(ctx, paths)
-		if err != nil {
-			return nil, err
-		}
-		blobs := make([]*storage.Blob, 0, len(blobBytes))
-		for _, blob := range blobBytes {
-			blobs = append(blobs, &storage.Blob{Value: blob})
-		}
-		_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
-		if err != nil {
-			return nil, err
-		}
-		allData = append(allData, dData)
+	blobBytes, err := t.Download(ctx, deltaLogs)
+	if err != nil {
+		return nil, err
 	}
-	return allData, nil
+	blobs := make([]*storage.Blob, 0, len(blobBytes))
+	for _, blob := range blobBytes {
+		blobs = append(blobs, &storage.Blob{Value: blob})
+	}
+	_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
+	if err != nil {
+		return nil, err
+	}
+
+	return dData, nil
 }
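Note (not part of the patch): the diff above replaces the per-file []*storage.DeleteData slice with a single merged *storage.DeleteData, which is what lets applyBFInParallel batch over one flat Pks/Tss buffer (dropping DeleteDataIdx from BatchApplyRet) and lets process() log the total deltaRowCount alongside each batch. The standalone Go sketch below only illustrates that idea with simplified stand-in types; deleteData and merge here are hypothetical helpers, not the actual milvus storage.DeleteData API.

package main

import "fmt"

// deleteData is a simplified stand-in for a merged delete buffer: parallel
// slices of primary keys and timestamps plus a running row count.
type deleteData struct {
	Pks      []int64
	Tss      []uint64
	RowCount int64
}

// merge folds one delta log's worth of deletes into the combined buffer.
func (d *deleteData) merge(pks []int64, tss []uint64) {
	d.Pks = append(d.Pks, pks...)
	d.Tss = append(d.Tss, tss...)
	d.RowCount += int64(len(pks))
}

func main() {
	all := &deleteData{}
	// Two hypothetical delta log files' deletes merged into one buffer.
	all.merge([]int64{1, 2, 3}, []uint64{100, 101, 102})
	all.merge([]int64{4, 5}, []uint64{103, 104})

	// With a single flat buffer, each bloom-filter batch is just the window
	// [startIdx, endIdx) over Pks/Tss, so a batch result only needs StartIdx.
	batchSize := 2
	for startIdx := 0; startIdx < len(all.Pks); startIdx += batchSize {
		endIdx := startIdx + batchSize
		if endIdx > len(all.Pks) {
			endIdx = len(all.Pks)
		}
		fmt.Printf("batch pks=%v tss=%v\n", all.Pks[startIdx:endIdx], all.Tss[startIdx:endIdx])
	}

	// The per-batch log line can then report the total delta row count directly.
	fmt.Println("total deltaRowCount:", all.RowCount)
}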