enhance: Add deltaRowCount in l0 compaction (#33997)
See also: #33998

Signed-off-by: yangxuan <[email protected]>
XuanYang-cn authored Jun 20, 2024
1 parent dc4437f commit 04edb07
Showing 5 changed files with 95 additions and 85 deletions.
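For orientation, a minimal sketch of the idea behind this change: each segment view gains a DeltaRowCount, summed from the entry counts of its deltalogs, so L0 compaction can log and reason about delete rows rather than only deltalog file counts and sizes. The types below are simplified stand-ins for illustration, not the actual Milvus structs.

package main

import "fmt"

// binlog is a simplified stand-in for datapb.Binlog.
type binlog struct{ entriesNum int64 }

// segmentView is a simplified stand-in for the datacoord SegmentView.
type segmentView struct {
	id            int64
	deltalogCount int
	deltaRowCount int // new in this commit: total delete rows across deltalogs
}

// newSegmentView mirrors how GetViewsByInfo derives DeltaRowCount:
// sum the per-binlog entry counts.
func newSegmentView(id int64, deltalogs []binlog) segmentView {
	v := segmentView{id: id, deltalogCount: len(deltalogs)}
	for _, b := range deltalogs {
		v.deltaRowCount += int(b.entriesNum)
	}
	return v
}

func main() {
	v := newSegmentView(100, []binlog{{entriesNum: 3}, {entriesNum: 7}})
	// Prints: ID=100, deltalogCount=2, deltaRowCount=10
	fmt.Printf("ID=%d, deltalogCount=%d, deltaRowCount=%d\n", v.id, v.deltalogCount, v.deltaRowCount)
}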
44 changes: 23 additions & 21 deletions internal/datacoord/compaction_l0_view.go
@@ -22,7 +22,13 @@ func (v *LevelZeroSegmentsView) String() string {
l0strings := lo.Map(v.segments, func(v *SegmentView, _ int) string {
return v.LevelZeroString()
})
return fmt.Sprintf("label=<%s>, posT=<%v>, l0 segments=%v",

count := lo.SumBy(v.segments, func(v *SegmentView) int {
return v.DeltaRowCount
})
return fmt.Sprintf("L0SegCount=%d, DeltaRowCount=%d, label=<%s>, posT=<%v>, L0 segments=%v",
len(v.segments),
count,
v.label.String(),
v.earliestGrowingSegmentPos.GetTimestamp(),
l0strings)
@@ -116,19 +122,20 @@ func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (pi
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
)

curSize := float64(0)
pickedSize := float64(0)
pickedCount := 0

// count >= minDeltaCount
if lo.SumBy(segments, func(view *SegmentView) int { return view.DeltalogCount }) >= minDeltaCount {
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero segments count reaches minForceTriggerCountLimit=%d, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaCount, curSize, len(segments))
picked, pickedSize, pickedCount = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero segments count reaches minForceTriggerCountLimit=%d, pickedSize=%.2fB, pickedCount=%d", minDeltaCount, pickedSize, pickedCount)
return
}

// size >= minDeltaSize
if lo.SumBy(segments, func(view *SegmentView) float64 { return view.DeltaSize }) >= minDeltaSize {
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero segments size reaches minForceTriggerSizeLimit=%.2f, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaSize, curSize, len(segments))
picked, pickedSize, pickedCount = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero segments size reaches minForceTriggerSizeLimit=%.2fB, pickedSize=%.2fB, pickedCount=%d", minDeltaSize, pickedSize, pickedCount)
return
}

@@ -143,30 +150,25 @@ func (v *LevelZeroSegmentsView) forceTrigger(segments []*SegmentView) (picked []
maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
)

curSize := float64(0)
picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero views force to trigger, curDeltaSize=%.2f, curDeltaCount=%d", curSize, len(segments))
return
picked, pickedSize, pickedCount := pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
reason = fmt.Sprintf("level zero views force to trigger, pickedSize=%.2fB, pickedCount=%d", pickedSize, pickedCount)
return picked, reason
}

// pickByMaxCountSize greedily picks segments in order, stopping before either maxSize or maxCount would be exceeded; it always picks at least the first segment
func pickByMaxCountSize(segments []*SegmentView, maxSize float64, maxCount int) ([]*SegmentView, float64) {
var (
curDeltaCount = 0
curDeltaSize = float64(0)
)
func pickByMaxCountSize(segments []*SegmentView, maxSize float64, maxCount int) (picked []*SegmentView, pickedSize float64, pickedCount int) {
idx := 0
for _, view := range segments {
targetCount := view.DeltalogCount + curDeltaCount
targetSize := view.DeltaSize + curDeltaSize
targetCount := view.DeltalogCount + pickedCount
targetSize := view.DeltaSize + pickedSize

if (curDeltaCount != 0 && curDeltaSize != float64(0)) && (targetSize > maxSize || targetCount > maxCount) {
if (pickedCount != 0 && pickedSize != float64(0)) && (targetSize > maxSize || targetCount > maxCount) {
break
}

curDeltaCount = targetCount
curDeltaSize = targetSize
pickedCount = targetCount
pickedSize = targetSize
idx += 1
}
return segments[:idx], curDeltaSize
return segments[:idx], pickedSize, pickedCount
}
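As a usage illustration, a self-contained sketch of the greedy picking rule implemented above: segments are taken in order, and the loop stops once at least one segment is picked and adding the next would exceed either cap. Types and cap values here are simplified for the example.

package main

import "fmt"

type segView struct {
	deltalogCount int
	deltaSize     float64
}

// pickByMaxCountSize mirrors the picker above: always pick at least the
// first segment, then stop before exceeding maxSize or maxCount.
func pickByMaxCountSize(segments []segView, maxSize float64, maxCount int) (picked []segView, pickedSize float64, pickedCount int) {
	idx := 0
	for _, view := range segments {
		targetCount := view.deltalogCount + pickedCount
		targetSize := view.deltaSize + pickedSize
		if (pickedCount != 0 && pickedSize != 0) && (targetSize > maxSize || targetCount > maxCount) {
			break
		}
		pickedCount = targetCount
		pickedSize = targetSize
		idx++
	}
	return segments[:idx], pickedSize, pickedCount
}

func main() {
	segs := []segView{{10, 100}, {10, 100}, {10, 100}}
	// Caps allow two segments: picking the third would push size to 300 > 250.
	picked, size, count := pickByMaxCountSize(segs, 250, 30)
	fmt.Println(len(picked), size, count) // 2 200 20
}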
1 change: 1 addition & 0 deletions internal/datacoord/compaction_l0_view_test.go
@@ -150,6 +150,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
if view.dmlPos.Timestamp < test.prepEarliestT {
view.DeltalogCount = test.prepCountEach
view.DeltaSize = test.prepSizeEach
view.DeltaRowCount = 1
}
}
log.Info("LevelZeroSegmentsView", zap.String("view", s.v.String()))
31 changes: 24 additions & 7 deletions internal/datacoord/compaction_view.go
@@ -107,6 +107,9 @@ type SegmentView struct {
BinlogCount int
StatslogCount int
DeltalogCount int

// total delete row count across the segment's deltalogs
DeltaRowCount int
}

func (s *SegmentView) Clone() *SegmentView {
@@ -123,6 +126,7 @@ func (s *SegmentView) Clone() *SegmentView {
BinlogCount: s.BinlogCount,
StatslogCount: s.StatslogCount,
DeltalogCount: s.DeltalogCount,
DeltaRowCount: s.DeltaRowCount,
NumOfRows: s.NumOfRows,
MaxRowNum: s.MaxRowNum,
}
@@ -147,6 +151,7 @@ func GetViewsByInfo(segments ...*SegmentInfo) []*SegmentView {

DeltaSize: GetBinlogSizeAsBytes(segment.GetDeltalogs()),
DeltalogCount: GetBinlogCount(segment.GetDeltalogs()),
DeltaRowCount: GetBinlogEntriesNum(segment.GetDeltalogs()),

Size: GetBinlogSizeAsBytes(segment.GetBinlogs()),
BinlogCount: GetBinlogCount(segment.GetBinlogs()),
@@ -166,17 +171,19 @@ func (v *SegmentView) Equal(other *SegmentView) bool {
v.DeltaSize == other.DeltaSize &&
v.BinlogCount == other.BinlogCount &&
v.StatslogCount == other.StatslogCount &&
v.DeltalogCount == other.DeltalogCount
v.DeltalogCount == other.DeltalogCount &&
v.NumOfRows == other.NumOfRows &&
v.DeltaRowCount == other.DeltaRowCount
}

func (v *SegmentView) String() string {
return fmt.Sprintf("ID=%d, label=<%s>, state=%s, level=%s, binlogSize=%.2f, binlogCount=%d, deltaSize=%.2f, deltaCount=%d, expireSize=%.2f",
v.ID, v.label, v.State.String(), v.Level.String(), v.Size, v.BinlogCount, v.DeltaSize, v.DeltalogCount, v.ExpireSize)
return fmt.Sprintf("ID=%d, label=<%s>, state=%s, level=%s, binlogSize=%.2f, binlogCount=%d, deltaSize=%.2f, deltalogCount=%d, deltaRowCount=%d, expireSize=%.2f",
v.ID, v.label, v.State.String(), v.Level.String(), v.Size, v.BinlogCount, v.DeltaSize, v.DeltalogCount, v.DeltaRowCount, v.ExpireSize)
}

func (v *SegmentView) LevelZeroString() string {
return fmt.Sprintf("<ID=%d, level=%s, deltaSize=%.2f, deltaCount=%d>",
v.ID, v.Level.String(), v.DeltaSize, v.DeltalogCount)
return fmt.Sprintf("<ID=%d, level=%s, deltaSize=%.2f, deltaLogCount=%d, deltaRowCount=%d>",
v.ID, v.Level.String(), v.DeltaSize, v.DeltalogCount, v.DeltaRowCount)
}

func GetBinlogCount(fieldBinlogs []*datapb.FieldBinlog) int {
@@ -187,9 +194,19 @@ func GetBinlogCount(fieldBinlogs []*datapb.FieldBinlog) int {
return num
}

func GetBinlogSizeAsBytes(deltaBinlogs []*datapb.FieldBinlog) float64 {
func GetBinlogEntriesNum(fieldBinlogs []*datapb.FieldBinlog) int {
var num int
for _, fbinlog := range fieldBinlogs {
for _, binlog := range fbinlog.GetBinlogs() {
num += int(binlog.GetEntriesNum())
}
}
return num
}

func GetBinlogSizeAsBytes(fieldBinlogs []*datapb.FieldBinlog) float64 {
var deltaSize float64
for _, deltaLogs := range deltaBinlogs {
for _, deltaLogs := range fieldBinlogs {
for _, l := range deltaLogs.GetBinlogs() {
deltaSize += float64(l.GetMemorySize())
}
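A quick sketch of what the two helpers above compute, with simplified stand-ins for the datapb.FieldBinlog and datapb.Binlog protobuf types: GetBinlogEntriesNum totals the per-binlog entry counts, and GetBinlogSizeAsBytes totals the in-memory sizes in bytes.

package main

import "fmt"

// Simplified stand-ins for the datapb protobuf types.
type binlog struct {
	entriesNum int64
	memorySize int64
}

type fieldBinlog struct{ binlogs []binlog }

// getBinlogEntriesNum mirrors GetBinlogEntriesNum: total rows across all binlogs.
func getBinlogEntriesNum(fieldBinlogs []fieldBinlog) int {
	var num int
	for _, fb := range fieldBinlogs {
		for _, b := range fb.binlogs {
			num += int(b.entriesNum)
		}
	}
	return num
}

// getBinlogSizeAsBytes mirrors GetBinlogSizeAsBytes: total in-memory bytes.
func getBinlogSizeAsBytes(fieldBinlogs []fieldBinlog) float64 {
	var size float64
	for _, fb := range fieldBinlogs {
		for _, b := range fb.binlogs {
			size += float64(b.memorySize)
		}
	}
	return size
}

func main() {
	logs := []fieldBinlog{{binlogs: []binlog{{5, 64}, {3, 32}}}}
	fmt.Println(getBinlogEntriesNum(logs), getBinlogSizeAsBytes(logs)) // 8 96
}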
92 changes: 41 additions & 51 deletions internal/datanode/compaction/l0_compactor.go
@@ -233,7 +233,7 @@ func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr

func (t *LevelZeroCompactionTask) splitDelta(
ctx context.Context,
allDelta []*storage.DeleteData,
allDelta *storage.DeleteData,
segmentBfs map[int64]*metacache.BloomFilterSet,
) map[int64]*SegmentDeltaWriter {
traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
@@ -252,9 +252,6 @@
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits

pks := allDelta[value.DeleteDataIdx].Pks
tss := allDelta[value.DeleteDataIdx].Tss

for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
if hit {
Expand All @@ -264,23 +261,21 @@ func (t *LevelZeroCompactionTask) splitDelta(
writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), segment.GetCollectionID())
targetSegBuffer[segmentID] = writer
}
writer.Write(pks[startIdx+i], tss[startIdx+i])
writer.Write(allDelta.Pks[startIdx+i], allDelta.Tss[startIdx+i])
}
}
}
return true
})

return targetSegBuffer
}

type BatchApplyRet = struct {
DeleteDataIdx int
StartIdx int
Segment2Hits map[int64][]bool
StartIdx int
Segment2Hits map[int64][]bool
}

func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
defer span.End()
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
@@ -298,32 +293,27 @@ func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteD
retIdx := 0
retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
var futures []*conc.Future[any]
for didx, data := range deleteDatas {
pks := data.Pks
for idx := 0; idx < len(pks); idx += batchSize {
startIdx := idx
endIdx := startIdx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
pks := deltaData.Pks
for idx := 0; idx < len(pks); idx += batchSize {
startIdx := idx
endIdx := startIdx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}

retIdx += 1
tmpRetIndex := retIdx
deleteDataId := didx
future := pool.Submit(func() (any, error) {
ret := batchPredict(pks[startIdx:endIdx])
retMap.Insert(tmpRetIndex, &BatchApplyRet{
DeleteDataIdx: deleteDataId,
StartIdx: startIdx,
Segment2Hits: ret,
})
return nil, nil
retIdx += 1
tmpRetIndex := retIdx
future := pool.Submit(func() (any, error) {
ret := batchPredict(pks[startIdx:endIdx])
retMap.Insert(tmpRetIndex, &BatchApplyRet{
StartIdx: startIdx,
Segment2Hits: ret,
})
futures = append(futures, future)
}
return nil, nil
})
futures = append(futures, future)
}
conc.AwaitAll(futures...)

return retMap
}

@@ -333,7 +323,7 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta

results := make([]*datapb.CompactionSegment, 0)
batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
log := log.Ctx(t.ctx).With(
log := log.Ctx(ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("max conc segment counts", batchSize),
zap.Int("total segment counts", len(targetSegments)),
@@ -366,35 +356,35 @@
return nil, err
}

log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults)))
log.Info("L0 compaction finished one batch",
zap.Int("batch no.", i),
zap.Int("total deltaRowCount", int(allDelta.RowCount)),
zap.Int("batch segment count", len(batchResults)))
results = append(results, batchResults...)
}

log.Info("L0 compaction process done")
return results, nil
}

func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*storage.DeleteData, error) {
func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []string) (*storage.DeleteData, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
defer span.End()
allData := make([]*storage.DeleteData, 0, len(deltaLogs))
for _, paths := range deltaLogs {
blobBytes, err := t.Download(ctx, paths)
if err != nil {
return nil, err
}
blobs := make([]*storage.Blob, 0, len(blobBytes))
for _, blob := range blobBytes {
blobs = append(blobs, &storage.Blob{Value: blob})
}
_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
if err != nil {
return nil, err
}

allData = append(allData, dData)
blobBytes, err := t.Download(ctx, deltaLogs)
if err != nil {
return nil, err
}
blobs := make([]*storage.Blob, 0, len(blobBytes))
for _, blob := range blobBytes {
blobs = append(blobs, &storage.Blob{Value: blob})
}
_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
if err != nil {
return nil, err
}
return allData, nil

return dData, nil
}

func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
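The refactor above collapses []*storage.DeleteData into a single *storage.DeleteData, so the bloom-filter pass and the split step no longer carry a DeleteDataIdx. Below is a much-simplified, single-threaded sketch of that flow under stated assumptions: the hit predicate stands in for the per-segment bloom-filter sets, and all names are illustrative rather than the real Milvus API.

package main

import "fmt"

// deleteData is a simplified stand-in for *storage.DeleteData.
type deleteData struct {
	pks []int64
	tss []uint64
}

// batchRet mirrors BatchApplyRet: a batch start offset plus, per target
// segment, which pks in the batch hit that segment's bloom filter.
type batchRet struct {
	startIdx     int
	segment2Hits map[int64][]bool
}

// applyBF mirrors applyBFInParallel minus the worker pool: test each batch
// of pks against every segment. hit stands in for the bloom-filter check.
func applyBF(d *deleteData, batchSize int, segIDs []int64, hit func(segID, pk int64) bool) []batchRet {
	var rets []batchRet
	for start := 0; start < len(d.pks); start += batchSize {
		end := start + batchSize
		if end > len(d.pks) {
			end = len(d.pks)
		}
		seg2hits := make(map[int64][]bool, len(segIDs))
		for _, segID := range segIDs {
			hits := make([]bool, end-start)
			for i, pk := range d.pks[start:end] {
				hits[i] = hit(segID, pk)
			}
			seg2hits[segID] = hits
		}
		rets = append(rets, batchRet{startIdx: start, segment2Hits: seg2hits})
	}
	return rets
}

// splitDelta mirrors the split step: route every hit (pk, ts) pair into a
// per-segment buffer, indexing the flat pks/tss slices with startIdx+i.
func splitDelta(d *deleteData, rets []batchRet) map[int64][][2]uint64 {
	out := make(map[int64][][2]uint64)
	for _, ret := range rets {
		for segID, hits := range ret.segment2Hits {
			for i, h := range hits {
				if h {
					out[segID] = append(out[segID], [2]uint64{uint64(d.pks[ret.startIdx+i]), d.tss[ret.startIdx+i]})
				}
			}
		}
	}
	return out
}

func main() {
	d := &deleteData{pks: []int64{1, 2, 3, 4}, tss: []uint64{10, 20, 30, 40}}
	rets := applyBF(d, 2, []int64{100, 101}, func(segID, pk int64) bool {
		return pk%2 == segID%2 // toy routing rule in place of real bloom filters
	})
	fmt.Println(splitDelta(d, rets)) // 100 -> pks 2,4; 101 -> pks 1,3
}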
12 changes: 6 additions & 6 deletions internal/datanode/compaction/l0_compactor_test.go
@@ -480,7 +480,7 @@ func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
101: bfs2,
102: bfs3,
}
deltaWriters := s.task.splitDelta(context.TODO(), []*storage.DeleteData{s.dData}, segmentBFs)
deltaWriters := s.task.splitDelta(context.TODO(), s.dData, segmentBFs)

s.NotEmpty(deltaWriters)
s.ElementsMatch(predicted, lo.Keys(deltaWriters))
@@ -523,16 +523,16 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
}

for _, test := range tests {
dDatas, err := s.task.loadDelta(ctx, test.paths)
dData, err := s.task.loadDelta(ctx, test.paths)

if test.expectError {
s.Error(err)
} else {
s.NoError(err)
s.NotEmpty(dDatas)
s.EqualValues(1, len(dDatas))
s.ElementsMatch(s.dData.Pks, dDatas[0].Pks)
s.Equal(s.dData.RowCount, dDatas[0].RowCount)
s.NotEmpty(dData)
s.NotNil(dData)
s.ElementsMatch(s.dData.Pks, dData.Pks)
s.Equal(s.dData.RowCount, dData.RowCount)
}
}
}
