Skip to content

Commit

Permalink
enhance: Add deltaRowCount in l0 compaction
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Jun 13, 2024
1 parent 62bd51e commit c4e280a
Showing 1 changed file with 43 additions and 53 deletions.
96 changes: 43 additions & 53 deletions internal/datanode/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr

func (t *levelZeroCompactionTask) splitDelta(
ctx context.Context,
allDelta []*storage.DeleteData,
allDelta *storage.DeleteData,
targetSegIDs []int64,
) map[int64]*SegmentDeltaWriter {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
Expand All @@ -257,9 +257,6 @@ func (t *levelZeroCompactionTask) splitDelta(
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits

pks := allDelta[value.DeleteDataIdx].Pks
tss := allDelta[value.DeleteDataIdx].Tss

for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
if hit {
Expand All @@ -269,23 +266,21 @@ func (t *levelZeroCompactionTask) splitDelta(
writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), t.getCollection())
targetSegBuffer[segmentID] = writer
}
writer.Write(pks[startIdx+i], tss[startIdx+i])
writer.Write(allDelta.Pks[startIdx+i], allDelta.Tss[startIdx+i])
}
}
}
return true
})

return targetSegBuffer
}

type BatchApplyRet = struct {
DeleteDataIdx int
StartIdx int
Segment2Hits map[int64][]bool
StartIdx int
Segment2Hits map[int64][]bool
}

func (t *levelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
func (t *levelZeroCompactionTask) applyBFInParallel(deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()

batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
Expand All @@ -302,42 +297,37 @@ func (t *levelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.Delet
retIdx := 0
retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
var futures []*conc.Future[any]
for didx, data := range deleteDatas {
pks := data.Pks
for idx := 0; idx < len(pks); idx += batchSize {
startIdx := idx
endIdx := startIdx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
pks := deltaData.Pks
for idx := 0; idx < len(pks); idx += batchSize {
startIdx := idx
endIdx := startIdx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}

retIdx += 1
tmpRetIndex := retIdx
deleteDataId := didx
future := pool.Submit(func() (any, error) {
ret := batchPredict(pks[startIdx:endIdx])
retMap.Insert(tmpRetIndex, &BatchApplyRet{
DeleteDataIdx: deleteDataId,
StartIdx: startIdx,
Segment2Hits: ret,
})
return nil, nil
retIdx += 1
tmpRetIndex := retIdx
future := pool.Submit(func() (any, error) {
ret := batchPredict(pks[startIdx:endIdx])
retMap.Insert(tmpRetIndex, &BatchApplyRet{
StartIdx: startIdx,
Segment2Hits: ret,
})
futures = append(futures, future)
}
return nil, nil
})
futures = append(futures, future)
}
conc.AwaitAll(futures...)

return retMap
}

func (t *levelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
defer span.End()

results := make([]*datapb.CompactionSegment, 0)
batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
log := log.Ctx(t.ctx).With(
log := log.Ctx(ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("max conc segment counts", batchSize),
zap.Int("total segment counts", len(targetSegments)),
Expand Down Expand Up @@ -365,33 +355,33 @@ func (t *levelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
return nil, err
}

log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults)))
log.Info("L0 compaction finished one batch",
zap.Int("batch no.", i),
zap.Int("total deltaRowCount", int(allDelta.RowCount)),
zap.Int("batch segment count", len(batchResults)))
results = append(results, batchResults...)
}

log.Info("L0 compaction process done")
return results, nil
}

func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*storage.DeleteData, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []string) (*storage.DeleteData, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
defer span.End()
allData := make([]*storage.DeleteData, 0, len(deltaLogs))
for _, paths := range deltaLogs {
blobBytes, err := t.Download(ctx, paths)
if err != nil {
return nil, err
}
blobs := make([]*storage.Blob, 0, len(blobBytes))
for _, blob := range blobBytes {
blobs = append(blobs, &storage.Blob{Value: blob})
}
_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
if err != nil {
return nil, err
}

allData = append(allData, dData)
blobBytes, err := t.Download(ctx, deltaLogs)
if err != nil {
return nil, err
}
return allData, nil
blobs := make([]*storage.Blob, 0, len(blobBytes))
for _, blob := range blobBytes {
blobs = append(blobs, &storage.Blob{Value: blob})
}
_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
if err != nil {
return nil, err
}

return dData, nil
}

0 comments on commit c4e280a

Please sign in to comment.