diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index 169238ae5e742..f00a0768d1b31 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -188,6 +188,8 @@ func getMaxBatchSize(totalSize int64) int { } func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) { + traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact serializeUpload") + defer span.End() allBlobs := make(map[string][]byte) results := make([]*datapb.CompactionSegment, 0) for segID, writer := range segmentWriters { @@ -227,7 +229,7 @@ func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr return nil, nil } - if err := t.Upload(ctx, allBlobs); err != nil { + if err := t.Upload(traceCtx, allBlobs); err != nil { log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err)) return nil, err } @@ -240,7 +242,7 @@ func (t *levelZeroCompactionTask) splitDelta( allDelta []*storage.DeleteData, targetSegIDs []int64, ) map[int64]*SegmentDeltaWriter { - _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta") + traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta") defer span.End() allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) { @@ -250,7 +252,7 @@ func (t *levelZeroCompactionTask) splitDelta( // segments shall be safe to read outside segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...)) // spilt all delete data to segments - retMap := t.applyBFInParallel(allDelta, io.GetBFApplyPool(), segments) + retMap := t.applyBFInParallel(traceCtx, allDelta, io.GetBFApplyPool(), segments) targetSegBuffer := make(map[int64]*SegmentDeltaWriter) retMap.Range(func(key int, value *BatchApplyRet) bool { @@ -285,7 +287,9 @@ type BatchApplyRet = struct { Segment2Hits map[int64][]bool } -func (t *levelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] { +func (t *levelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs []*metacache.SegmentInfo) *typeutil.ConcurrentMap[int, *BatchApplyRet] { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel") + defer span.End() batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {