From 1127a392a5c5fb24a108086ff06d852d6ec4a6d4 Mon Sep 17 00:00:00 2001
From: Cai Zhang
Date: Tue, 18 Jun 2024 14:42:23 +0800
Subject: [PATCH] Optimize clustering compaction

Signed-off-by: Cai Zhang
---
 internal/datacoord/compaction.go              |  11 +-
 .../compaction/clustering_compactor.go        | 263 ++++++++++++------
 internal/datanode/io/binlog_io.go             |   7 +
 3 files changed, 196 insertions(+), 85 deletions(-)

diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 145849bec3c01..819699c8f1984 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -570,11 +570,12 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
 		}
 	case datapb.CompactionType_ClusteringCompaction:
 		task = &clusteringCompactionTask{
-			CompactionTask:   t,
-			meta:             c.meta,
-			sessions:         c.sessions,
-			handler:          c.handler,
-			analyzeScheduler: c.analyzeScheduler,
+			CompactionTask:      t,
+			meta:                c.meta,
+			sessions:            c.sessions,
+			handler:             c.handler,
+			analyzeScheduler:    c.analyzeScheduler,
+			lastUpdateStateTime: time.Now().UnixMilli(),
 		}
 	default:
 		return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index 1fb6b8c3acb71..e5d21e2579368 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -21,6 +21,8 @@ import (
 	"fmt"
 	"math"
 	"path"
+	"runtime"
+	"runtime/debug"
 	"sort"
 	"strconv"
 	"strings"
@@ -73,7 +75,8 @@ type clusteringCompactionTask struct {
 	// flush
 	flushMutex sync.Mutex
 	flushCount *atomic.Int64
-	flushChan  chan SpillSignal
+	flushChan  chan FlushSignal
+	doneChan   chan struct{}
 
 	// metrics
 	writtenRowNum *atomic.Int64
@@ -99,8 +102,9 @@ type clusteringCompactionTask struct {
 type ClusterBuffer struct {
 	id int
 
-	writer       *SegmentWriter
-	bufferRowNum atomic.Int64
+	writer           *SegmentWriter
+	bufferRowNum     atomic.Int64
+	bufferMemorySize atomic.Int64
 
 	flushedRowNum  atomic.Int64
 	flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog
@@ -111,8 +115,11 @@ type ClusterBuffer struct {
 	clusteringKeyFieldStats *storage.FieldStats
 }
 
-type SpillSignal struct {
-	buffer *ClusterBuffer
+type FlushSignal struct {
+	writer *SegmentWriter
+	pack   bool
+	id     int
+	done   bool
 }
 
 func NewClusteringCompactionTask(
@@ -130,7 +137,8 @@ func NewClusteringCompactionTask(
 		plan:               plan,
 		tr:                 timerecord.NewTimeRecorder("clustering_compaction"),
 		done:               make(chan struct{}, 1),
-		flushChan:          make(chan SpillSignal, 100),
+		flushChan:          make(chan FlushSignal, 100),
+		doneChan:           make(chan struct{}),
 		clusterBuffers:     make([]*ClusterBuffer, 0),
 		clusterBufferLocks: lock.NewKeyLock[int](),
 		flushCount:         atomic.NewInt64(0),
@@ -261,6 +269,8 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
 }
 
 func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) error {
+	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getScalarAnalyzeResult-%d", t.GetPlanID()))
+	defer span.End()
 	analyzeDict, err := t.scalarAnalyze(ctx)
 	if err != nil {
 		return err
 	}
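Note: each compaction phase entry point in this file now opens its own OpenTelemetry span, named "<phase>-<planID>", and closes it when the phase returns, so one clustering compaction shows up as a single trace with per-phase timings. Reduced to a standalone sketch (the tracer name and doAnalyze are illustrative stand-ins, not identifiers from this patch):

    package main

    import (
        "context"
        "fmt"

        "go.opentelemetry.io/otel"
    )

    // doAnalyze stands in for a phase such as scalarAnalyze or mapping.
    func doAnalyze(ctx context.Context, planID int64) error {
        // Start a child span; the derived ctx carries it to downstream calls.
        ctx, span := otel.Tracer("datanode").Start(ctx, fmt.Sprintf("scalarAnalyze-%d", planID))
        defer span.End()

        _ = ctx // the real phase would pass this ctx onward
        return nil
    }

    func main() {
        _ = doAnalyze(context.Background(), 42)
    }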
@@ -296,6 +306,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 }
 
 func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {
+	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("getVectorAnalyzeResult-%d", t.GetPlanID()))
+	defer span.End()
 	analyzeResultPath := t.plan.AnalyzeResultPath
 	centroidFilePath := path.Join(analyzeResultPath, metautil.JoinIDPath(t.collectionID, t.partitionID, t.clusteringKeyField.FieldID), common.Centroids)
 	offsetMappingFiles := make(map[int64]string, 0)
@@ -344,6 +356,8 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
 func (t *clusteringCompactionTask) mapping(ctx context.Context,
 	deltaPk2Ts map[interface{}]typeutil.Timestamp,
 ) ([]*datapb.CompactionSegment, *storage.PartitionStatsSnapshot, error) {
+	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mapping-%d", t.GetPlanID()))
+	defer span.End()
 	inputSegments := t.plan.GetSegmentBinlogs()
 	mapStart := time.Now()
@@ -367,6 +381,13 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 		return nil, nil, err
 	}
 
+	t.flushChan <- FlushSignal{
+		done: true,
+	}
+
+	// block until all writers have flushed.
+	<-t.doneChan
+
 	// force flush all buffers
 	err := t.flushAll(ctx)
 	if err != nil {
@@ -410,7 +431,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 func (t *clusteringCompactionTask) getUsedMemoryBufferSize() int64 {
 	var totalBufferSize int64 = 0
 	for _, buffer := range t.clusterBuffers {
-		totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize())
+		totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) + buffer.bufferMemorySize.Load()
 	}
 	return totalBufferSize
 }
@@ -421,7 +442,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 	segment *datapb.CompactionSegmentBinlogs,
 	delta map[interface{}]typeutil.Timestamp,
 ) error {
-	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("Compact-Map-%d", t.GetPlanID()))
+	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mappingSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
 	defer span.End()
 	log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollection()),
@@ -475,6 +496,11 @@ func (t *clusteringCompactionTask) mappingSegment(
 	}
 	for idx := 0; idx < binlogNum; idx++ {
 		var ps []string
+		if segment.GetFieldBinlogs()[0].GetBinlogs()[idx].GetEntriesNum() <= 0 {
+			log.Warn("receive empty binlog, skip it", zap.Int64("segmentID", segment.GetSegmentID()),
+				zap.Int("idx", idx))
+			continue
+		}
 		for _, f := range segment.GetFieldBinlogs() {
 			ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
 		}
@@ -543,19 +569,33 @@ func (t *clusteringCompactionTask) mappingSegment(
 
 			if (remained+1)%100 == 0 {
 				currentBufferSize := t.getUsedMemoryBufferSize()
+
 				// trigger flushBinlog
-				if clusterBuffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
+				currentBufferNum := clusterBuffer.bufferRowNum.Load()
+				if clusterBuffer.flushedRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() ||
+					clusterBuffer.writer.IsFull() {
 					// reach segment/binlog max size
-					t.flushChan <- SpillSignal{
-						buffer: clusterBuffer,
+					t.clusterBufferLocks.Lock(clusterBuffer.id)
+					writer := clusterBuffer.writer
+					pack, _ := t.refreshBufferWriter(clusterBuffer)
+					log.Debug("buffer needs to flush", zap.Int("bufferID", clusterBuffer.id),
+						zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum))
+					t.clusterBufferLocks.Unlock(clusterBuffer.id)
+
+					t.flushChan <- FlushSignal{
+						writer: writer,
+						pack:   pack,
+						id:     clusterBuffer.id,
 					}
+
 				} else if currentBufferSize >= t.getMemoryBufferHighWatermark() {
 					// reach flushBinlog trigger threshold
-					t.flushChan <- SpillSignal{}
+					log.Debug("largest buffer needs to flush", zap.Int64("currentBufferSize", currentBufferSize))
+					t.flushChan <- FlushSignal{}
 				}
 
 				// if the total buffer size is too large, block here, wait for memory release by flushBinlog
-				if currentBufferSize > t.getMemoryBufferBlockSpillThreshold() {
+				if currentBufferSize > t.getMemoryBufferBlockFlushThreshold() {
 			loop:
 					for {
 						select {
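Note: the hunks above replace the old SpillSignal, which only carried a buffer pointer, with a FlushSignal that snapshots the writer to flush, whether to pack the segment afterwards, and a done flag. Because a single background goroutine drains the channel in FIFO order, sending done and then blocking on doneChan acts as a barrier: every earlier flush signal has been handled before mapping() moves on to the final flushAll. A self-contained sketch with simplified stand-in types (not the patch's SegmentWriter):

    package main

    import "fmt"

    type flushSignal struct {
        id   int  // which cluster buffer to flush
        pack bool // seal the segment after flushing
        done bool // producer is finished; consumer should acknowledge
    }

    func main() {
        flushChan := make(chan flushSignal, 100)
        doneChan := make(chan struct{})

        go func() {
            for sig := range flushChan {
                if sig.done {
                    doneChan <- struct{}{} // acknowledge: all prior signals handled
                    return
                }
                fmt.Printf("flush buffer %d (pack=%v)\n", sig.id, sig.pack)
            }
        }()

        for id := 0; id < 3; id++ {
            flushChan <- flushSignal{id: id, pack: id == 2}
        }
        flushChan <- flushSignal{done: true}
        <-doneChan // block until all earlier flushes are done
        fmt.Println("safe to flushAll and pack the remaining buffers")
    }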
@@ -592,10 +632,8 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
 	defer t.clusterBufferLocks.Unlock(clusterBuffer.id)
 	// prepare
 	if clusterBuffer.writer == nil {
-		err := t.refreshBufferWriter(clusterBuffer)
-		if err != nil {
-			return err
-		}
+		log.Warn("unexpected behavior, please check", zap.Int("buffer id", clusterBuffer.id))
+		panic(fmt.Sprintf("unexpected behavior, please check buffer id: %d", clusterBuffer.id))
 	}
 	err := clusterBuffer.writer.Write(value)
 	if err != nil {
@@ -623,7 +661,7 @@ func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
 	return int64(float64(t.memoryBufferSize) * 0.9)
 }
 
-func (t *clusteringCompactionTask) getMemoryBufferBlockSpillThreshold() int64 {
+func (t *clusteringCompactionTask) getMemoryBufferBlockFlushThreshold() int64 {
 	return t.memoryBufferSize
 }
@@ -638,14 +676,19 @@ func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) {
 			return
 		case signal := <-t.flushChan:
 			var err error
-			if signal.buffer == nil {
+			if signal.done {
+				t.doneChan <- struct{}{}
+			} else if signal.writer == nil {
 				err = t.flushLargestBuffers(ctx)
 			} else {
-				err = func() error {
-					t.clusterBufferLocks.Lock(signal.buffer.id)
-					defer t.clusterBufferLocks.Unlock(signal.buffer.id)
-					return t.flushBinlog(ctx, signal.buffer)
-				}()
+				future := t.flushPool.Submit(func() (any, error) {
+					err := t.flushBinlog(ctx, t.clusterBuffers[signal.id], signal.writer, signal.pack)
+					if err != nil {
+						return nil, err
+					}
+					return struct{}{}, nil
+				})
+				err = conc.AwaitAll(future)
 			}
 			if err != nil {
 				log.Warn("fail to flushBinlog data", zap.Error(err))
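Note: backgroundFlush now submits the actual flush to a worker pool and waits on the returned future, instead of flushing inline while holding the buffer lock. conc.Pool, conc.Future, and conc.AwaitAll are Milvus-internal utilities; the same submit-then-await shape can be sketched with the stock errgroup package (an analogue for intuition, not the patch's API):

    package main

    import (
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func flushOne(id int) error {
        fmt.Printf("flushing buffer %d\n", id)
        return nil
    }

    func main() {
        var g errgroup.Group
        for id := 0; id < 4; id++ {
            id := id // capture loop variable (pre-Go 1.22 idiom, as in the patch)
            g.Go(func() error { return flushOne(id) })
        }
        // Equivalent of conc.AwaitAll: wait for every task, surface the first error.
        if err := g.Wait(); err != nil {
            fmt.Println("flush failed:", err)
        }
    }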
@@ -662,28 +705,54 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 		return nil
 	}
 	defer t.flushMutex.Unlock()
+	currentMemorySize := t.getUsedMemoryBufferSize()
+	if currentMemorySize <= t.getMemoryBufferLowWatermark() {
+		log.Info("memory is below the low watermark, skip flushing", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize()))
+		return nil
+	}
+	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "flushLargestBuffers")
+	defer span.End()
 	bufferIDs := make([]int, 0)
 	for _, buffer := range t.clusterBuffers {
 		bufferIDs = append(bufferIDs, buffer.id)
 	}
 	sort.Slice(bufferIDs, func(i, j int) bool {
-		return t.clusterBuffers[i].bufferRowNum.Load() > t.clusterBuffers[j].bufferRowNum.Load()
+		return t.clusterBuffers[bufferIDs[i]].bufferMemorySize.Load()+int64(t.clusterBuffers[bufferIDs[i]].writer.WrittenMemorySize()) >
+			t.clusterBuffers[bufferIDs[j]].bufferMemorySize.Load()+int64(t.clusterBuffers[bufferIDs[j]].writer.WrittenMemorySize())
 	})
-	log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs))
+	log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize))
+
+	futures := make([]*conc.Future[any], 0)
 	for _, bufferId := range bufferIDs {
-		err := func() error {
-			t.clusterBufferLocks.Lock(bufferId)
-			defer t.clusterBufferLocks.Unlock(bufferId)
-			return t.flushBinlog(ctx, t.clusterBuffers[bufferId])
-		}()
-		if err != nil {
-			return err
-		}
-		if t.getUsedMemoryBufferSize() <= t.getMemoryBufferLowWatermark() {
+		t.clusterBufferLocks.Lock(bufferId)
+		buffer := t.clusterBuffers[bufferId]
+		writer := buffer.writer
+		currentMemorySize -= int64(writer.WrittenMemorySize())
+		pack, _ := t.refreshBufferWriter(buffer)
+		t.clusterBufferLocks.Unlock(bufferId)
+
+		log.Info("currentMemorySize after triggering buffer flush",
+			zap.Int64("currentMemorySize", currentMemorySize),
+			zap.Int("bufferID", bufferId))
+		future := t.flushPool.Submit(func() (any, error) {
+			err := t.flushBinlog(ctx, buffer, writer, pack)
+			if err != nil {
+				return nil, err
+			}
+			return struct{}{}, nil
+		})
+		futures = append(futures, future)
+
+		if currentMemorySize <= t.getMemoryBufferLowWatermark() {
 			log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize()))
 			break
 		}
 	}
+	if err := conc.AwaitAll(futures...); err != nil {
+		return err
+	}
+
+	log.Info("flushLargestBuffers end", zap.Int64("currentMemorySize", currentMemorySize))
 	return nil
 }
@@ -691,26 +760,28 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
 	// only one flushLargestBuffers or flushAll should do at the same time
 	t.flushMutex.Lock()
 	defer t.flushMutex.Unlock()
+	futures := make([]*conc.Future[any], 0)
 	for _, buffer := range t.clusterBuffers {
-		err := func() error {
-			t.clusterBufferLocks.Lock(buffer.id)
-			defer t.clusterBufferLocks.Unlock(buffer.id)
-			err := t.flushBinlog(ctx, buffer)
+		buffer := buffer
+		future := t.flushPool.Submit(func() (any, error) {
+			err := t.flushBinlog(ctx, buffer, buffer.writer, true)
 			if err != nil {
-				log.Error("flushBinlog fail")
-				return err
+				return nil, err
 			}
-			err = t.packBufferToSegment(ctx, buffer)
-			return err
-		}()
-		if err != nil {
-			return err
-		}
+			return struct{}{}, nil
+		})
+		futures = append(futures, future)
+	}
+	if err := conc.AwaitAll(futures...); err != nil {
+		return err
 	}
+
 	return nil
 }
 
-func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer) error {
+func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error {
+	t.clusterBufferLocks.Lock(buffer.id)
+	defer t.clusterBufferLocks.Unlock(buffer.id)
 	if len(buffer.flushedBinlogs) == 0 {
 		return nil
 	}
@@ -718,7 +789,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 	for _, fieldBinlog := range buffer.flushedBinlogs {
 		insertLogs = append(insertLogs, fieldBinlog)
 	}
-	statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum.Load())
+	statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer, buffer.flushedRowNum.Load())
 	if err != nil {
 		return err
 	}
@@ -726,7 +797,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 	// pack current flushBinlog data into a segment
 	seg := &datapb.CompactionSegment{
 		PlanID:              t.plan.GetPlanID(),
-		SegmentID:           buffer.writer.GetSegmentID(),
+		SegmentID:           writer.GetSegmentID(),
 		NumOfRows:           buffer.flushedRowNum.Load(),
 		InsertLogs:          insertLogs,
 		Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
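Note: with flushing now concurrent, packBufferToSegment takes the per-buffer lock itself rather than relying on its caller. clusterBufferLocks comes from lock.NewKeyLock[int](), a Milvus-internal per-key lock utility. A toy equivalent, for intuition about the one-mutex-per-buffer-id idea only:

    package main

    import (
        "fmt"
        "sync"
    )

    // keyLock hands out one mutex per key so buffers can be locked independently.
    type keyLock struct {
        mu    sync.Mutex
        locks map[int]*sync.Mutex
    }

    func newKeyLock() *keyLock { return &keyLock{locks: map[int]*sync.Mutex{}} }

    func (k *keyLock) Lock(id int) {
        k.mu.Lock()
        m, ok := k.locks[id]
        if !ok {
            m = &sync.Mutex{}
            k.locks[id] = m
        }
        k.mu.Unlock()
        m.Lock()
    }

    func (k *keyLock) Unlock(id int) {
        k.mu.Lock()
        m := k.locks[id] // assumes Lock(id) was called first
        k.mu.Unlock()
        m.Unlock()
    }

    func main() {
        kl := newKeyLock()
        var wg sync.WaitGroup
        for id := 0; id < 3; id++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                kl.Lock(id) // buffers with different ids proceed concurrently
                defer kl.Unlock(id)
                fmt.Println("exclusive access to buffer", id)
            }(id)
        }
        wg.Wait()
    }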
zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("binlog", binlog.String())) + log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String())) } - log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.Any("segStats", segmentStats)) + log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.Any("segStats", segmentStats)) + + // set old writer nil + writer = nil return nil } -func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer) error { - log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()), zap.Int64("segmentID", buffer.writer.GetSegmentID())) - if buffer.writer.IsEmpty() { - return nil +func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter, pack bool) error { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", writer.GetSegmentID())) + defer span.End() + if writer == nil { + log.Warn("buffer writer is nil, please check", zap.Int("buffer id", buffer.id)) + panic("buffer writer is nil, please check") } - - future := t.flushPool.Submit(func() (any, error) { - kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer) - if err != nil { - log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) - return typeutil.NewPair(kvs, partialBinlogs), err + defer func() { + runtime.GC() + debug.FreeOSMemory() + }() + log := log.With(zap.Int("bufferID", buffer.id), + zap.Int64("writerRowNum", writer.GetRowNum()), + zap.Int64("segmentID", writer.GetSegmentID()), + zap.Bool("pack", pack)) + writtenMemorySize := int64(writer.WrittenMemorySize()) + log.Info("start flush binlog", zap.Int("bufferID", buffer.id), zap.Int64("writerSize", writtenMemorySize)) + if writer.GetRowNum() <= 0 { + log.Debug("writerRowNum is zero, skip flush") + if pack { + return t.packBufferToSegment(ctx, buffer, writer) } - return typeutil.NewPair(kvs, partialBinlogs), nil - }) - if err := conc.AwaitAll(future); err != nil { + return nil + } + start := time.Now() + kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, writer) + if err != nil { + log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) return err } - kvs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).A - partialBinlogs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).B if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) @@ -786,15 +870,18 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus buffer.flushedRowNum.Add(buffer.bufferRowNum.Load()) // clean buffer - buffer.bufferRowNum.Store(0) + //buffer.bufferRowNum.Store(0) + buffer.bufferMemorySize.Sub(writtenMemorySize) t.flushCount.Inc() - log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load())) - if buffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() { - if err := t.packBufferToSegment(ctx, buffer); err != nil { + if pack { + if err := t.packBufferToSegment(ctx, buffer, writer); err != nil { return err } } + log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load()), + zap.Int64("cost", time.Since(start).Milliseconds()), + 
zap.Int64("writtenMemorySize", writtenMemorySize)) return nil } @@ -824,6 +911,8 @@ func (t *clusteringCompactionTask) cleanUp(ctx context.Context) { } func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[interface{}]int64, error) { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyze-%d", t.GetPlanID())) + defer span.End() inputSegments := t.plan.GetSegmentBinlogs() futures := make([]*conc.Future[any], 0, len(inputSegments)) analyzeStart := time.Now() @@ -870,6 +959,8 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( ctx context.Context, segment *datapb.CompactionSegmentBinlogs, ) (map[interface{}]int64, error) { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyzeSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID())) + defer span.End() log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("segmentID", segment.GetSegmentID())) // vars @@ -1014,16 +1105,28 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in return buckets } -func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error { - segmentID, err := t.allocator.AllocOne() - if err != nil { - return err +func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) { + var segmentID int64 + var err error + var pack bool + if buffer.writer != nil { + segmentID = buffer.writer.GetSegmentID() + buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize())) } + if buffer.writer == nil || buffer.flushedRowNum.Load()+buffer.bufferRowNum.Load() > t.plan.GetMaxSegmentRows() { + pack = true + segmentID, err = t.allocator.AllocOne() + if err != nil { + return pack, err + } + } + writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID) if err != nil { - return err + return pack, err } + buffer.writer = writer buffer.bufferRowNum.Store(0) - return nil + return pack, nil } diff --git a/internal/datanode/io/binlog_io.go b/internal/datanode/io/binlog_io.go index 61a2ccf97615f..55274e8327e8e 100644 --- a/internal/datanode/io/binlog_io.go +++ b/internal/datanode/io/binlog_io.go @@ -18,6 +18,7 @@ package io import ( "context" + "time" "github.com/samber/lo" "go.opentelemetry.io/otel" @@ -55,6 +56,7 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, var val []byte var err error + start := time.Now() log.Debug("BinlogIO download", zap.String("path", path)) err = retry.Do(ctx, func() error { val, err = b.Read(ctx, path) @@ -64,6 +66,9 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, return err }) + log.Debug("BinlogIO download success", zap.String("path", path), zap.Int64("cost", time.Since(start).Milliseconds()), + zap.Error(err)) + return val, err }) futures = append(futures, future) @@ -88,6 +93,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error innerK, innerV := k, v future := b.pool.Submit(func() (any, error) { var err error + start := time.Now() log.Debug("BinlogIO upload", zap.String("paths", innerK)) err = retry.Do(ctx, func() error { err = b.Write(ctx, innerK, innerV) @@ -96,6 +102,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error } return err }) + log.Debug("BinlogIO upload success", zap.String("paths", innerK), zap.Int64("cost", time.Since(start).Milliseconds()), zap.Error(err)) return struct{}{}, err }) futures = append(futures, future)