enhance: Add trace for bf cost in l0 compactor (#33898)
pr: #33860

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 authored Jun 20, 2024
1 parent e0e2ed0 commit a7ae45c
Showing 1 changed file with 8 additions and 4 deletions.
internal/datanode/l0_compactor.go: 12 changes (8 additions, 4 deletions)
@@ -188,6 +188,8 @@ func getMaxBatchSize(totalSize int64) int {
 }
 
 func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
+	traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact serializeUpload")
+	defer span.End()
 	allBlobs := make(map[string][]byte)
 	results := make([]*datapb.CompactionSegment, 0)
 	for segID, writer := range segmentWriters {
@@ -227,7 +229,7 @@ func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
 		return nil, nil
 	}
 
-	if err := t.Upload(ctx, allBlobs); err != nil {
+	if err := t.Upload(traceCtx, allBlobs); err != nil {
 		log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err))
 		return nil, err
 	}
@@ -240,7 +242,7 @@ func (t *levelZeroCompactionTask) splitDelta(
 	allDelta []*storage.DeleteData,
 	targetSegIDs []int64,
 ) map[int64]*SegmentDeltaWriter {
-	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
+	traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
 	defer span.End()
 
 	allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) {
@@ -250,7 +252,7 @@
 	// segments shall be safe to read outside
 	segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...))
 	// spilt all delete data to segments
-	retMap := t.applyBFInParallel(allDelta, io.GetBFApplyPool(), segments)
+	retMap := t.applyBFInParallel(traceCtx, allDelta, io.GetBFApplyPool(), segments)
 
 	targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
 	retMap.Range(func(key int, value *BatchApplyRet) bool {
@@ -285,7 +287,9 @@ type BatchApplyRet = struct {
 	Segment2Hits map[int64][]bool
 }
 
-func (t *levelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
+func (t *levelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
+	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
+	defer span.End()
 	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
 
 	batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
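For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of what the diff does: each compaction step starts a named OpenTelemetry span, defers span.End(), and passes the returned context to the next step so the bloom-filter cost shows up as a nested child span. This assumes only the public go.opentelemetry.io/otel API; the tracer name "datanode" and the stub function bodies are illustrative stand-ins, not Milvus code.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel"
)

// applyBFInParallel opens a child span; because it receives traceCtx from
// splitDelta, the simulated bloom-filter cost is recorded under the parent.
func applyBFInParallel(ctx context.Context) {
	// The returned context is discarded with `_` since no further
	// spans are started from here (mirroring hunk 5 above).
	_, span := otel.Tracer("datanode").Start(ctx, "L0Compact applyBFInParallel")
	defer span.End()
	time.Sleep(10 * time.Millisecond) // stand-in for BF application work
}

func splitDelta(ctx context.Context) {
	// Bind the returned context instead of discarding it; children
	// started from the original ctx would not nest under this span.
	traceCtx, span := otel.Tracer("datanode").Start(ctx, "L0Compact splitDelta")
	defer span.End()
	applyBFInParallel(traceCtx)
}

func main() {
	// With no TracerProvider registered, otel falls back to a no-op
	// tracer, so this runs but exports nothing.
	splitDelta(context.Background())
	fmt.Println("compaction steps traced")
}
```

Note the asymmetry the diff introduces: splitDelta now binds the returned context (traceCtx) because it has a downstream call to trace, while applyBFInParallel still discards it with `_` because it starts no further spans.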
