diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index d6e0ab70940e5..3154456577608 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -608,7 +608,7 @@ func (t *clusteringCompactionTask) mappingSegment( pack: pack, id: clusterBuffer.id, } - } else if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() && !t.hasSignal.Load() { + } else if currentBufferTotalMemorySize > t.getMemoryBufferHighWatermark() && !t.hasSignal.Load() { // reach flushBinlog trigger threshold log.Debug("largest buffer need to flush", zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) @@ -632,8 +632,8 @@ func (t *clusteringCompactionTask) mappingSegment( default: // currentSize := t.getCurrentBufferWrittenMemorySize() currentSize := t.getBufferTotalUsedMemorySize() - if currentSize < t.getMemoryBufferBlockFlushThreshold() { - log.Debug("memory is already below the block watermark, continue writing", + if currentSize < t.getMemoryBufferHighWatermark() { + log.Debug("memory is already below the high watermark, continue writing", zap.Int64("currentSize", currentSize)) break loop } @@ -685,7 +685,7 @@ func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 { } func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 { - return int64(float64(t.memoryBufferSize) * 0.9) + return int64(float64(t.memoryBufferSize) * 0.7) } func (t *clusteringCompactionTask) getMemoryBufferBlockFlushThreshold() int64 {