Skip to content

Commit

Permalink
enhance: L0 compaction performance by bypassing serialization
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Jun 20, 2024
1 parent a7ae45c commit 8b0cfdb
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 114 deletions.
36 changes: 27 additions & 9 deletions internal/datanode/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/samber/lo"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -207,11 +208,11 @@ func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr

blobKey, _ := binlog.BuildLogPath(storage.DeleteBinlog, writer.collectionID, writer.partitionID, writer.segmentID, -1, logID)

allBlobs[blobKey] = blob.GetValue()
allBlobs[blobKey] = blob
deltalog := &datapb.Binlog{
EntriesNum: writer.GetRowNum(),
LogSize: int64(len(blob.GetValue())),
MemorySize: blob.GetMemorySize(),
LogSize: int64(len(blob)),
MemorySize: writer.GetMemorySize(),
LogPath: blobKey,
LogID: logID,
TimestampFrom: tr.GetMinTimestamp(),
Expand Down Expand Up @@ -241,7 +242,7 @@ func (t *levelZeroCompactionTask) splitDelta(
ctx context.Context,
allDelta []*storage.DeleteData,
targetSegIDs []int64,
) map[int64]*SegmentDeltaWriter {
) (map[int64]*SegmentDeltaWriter, error) {
traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()

Expand All @@ -254,13 +255,15 @@ func (t *levelZeroCompactionTask) splitDelta(
// spilt all delete data to segments
retMap := t.applyBFInParallel(traceCtx, allDelta, io.GetBFApplyPool(), segments)

var err error
targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
retMap.Range(func(key int, value *BatchApplyRet) bool {
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits

pks := allDelta[value.DeleteDataIdx].Pks
tss := allDelta[value.DeleteDataIdx].Tss
serialized := allDelta[value.DeleteDataIdx].Serialized

for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
Expand All @@ -271,14 +274,20 @@ func (t *levelZeroCompactionTask) splitDelta(
writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), t.getCollection())
targetSegBuffer[segmentID] = writer
}
writer.Write(pks[startIdx+i], tss[startIdx+i])
err = writer.WriteSerialized(serialized[startIdx+i], pks[startIdx+i], tss[startIdx+i])
if err != nil {
return false

Check warning on line 279 in internal/datanode/l0_compactor.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/l0_compactor.go#L279

Added line #L279 was not covered by tests
}
}
}
}
return true
})
if err != nil {
return nil, err

Check warning on line 287 in internal/datanode/l0_compactor.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/l0_compactor.go#L287

Added line #L287 was not covered by tests
}

return targetSegBuffer
return targetSegBuffer, nil
}

type BatchApplyRet = struct {
Expand Down Expand Up @@ -356,20 +365,29 @@ func (t *levelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
}

for i := 0; i < batch; i++ {
batchStart := time.Now()
left, right := i*batchSize, (i+1)*batchSize
if right > len(targetSegments) {
right = len(targetSegments)
}

batchSegments := targetSegments[left:right]
batchSegWriter := t.splitDelta(ctx, allDelta, batchSegments)
batchSegWriter, err := t.splitDelta(ctx, allDelta, batchSegments)
if err != nil {
log.Warn("L0 compaction splitDelta fail", zap.Error(err))
return nil, err

Check warning on line 378 in internal/datanode/l0_compactor.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/l0_compactor.go#L377-L378

Added lines #L377 - L378 were not covered by tests
}
batchResults, err := t.serializeUpload(ctx, batchSegWriter)
if err != nil {
log.Warn("L0 compaction serialize upload fail", zap.Error(err))
return nil, err
}

log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults)))
log.Info("L0 compaction finished one batch",
zap.Int("batch no.", i),
zap.Int("batch segment count", len(batchResults)),
zap.Duration("batch elapse", time.Since(batchStart)),
)
results = append(results, batchResults...)
}

Expand All @@ -390,7 +408,7 @@ func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]
for _, blob := range blobBytes {
blobs = append(blobs, &storage.Blob{Value: blob})
}
_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
_, _, dData, err := storage.NewDeleteCodec().DeserializeWithSerialized(blobs)
if err != nil {
return nil, err
}
Expand Down
45 changes: 34 additions & 11 deletions internal/datanode/l0_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,18 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
3: 20002,
}

s.dData = storage.NewDeleteData([]storage.PrimaryKey{}, []Timestamp{})
dData := storage.NewEmptyDeleteData()
for pk, ts := range pk2ts {
s.dData.Append(storage.NewInt64PrimaryKey(pk), ts)
dData.Append(storage.NewInt64PrimaryKey(pk), ts)
}

dataCodec := storage.NewDeleteCodec()
blob, err := dataCodec.Serialize(0, 0, 0, s.dData)
blob, err := storage.NewDeleteCodec().Serialize(0, 0, 0, dData)
s.Require().NoError(err)
s.dBlob = blob.GetValue()

_, _, serializedData, err := storage.NewDeleteCodec().DeserializeWithSerialized([]*storage.Blob{{Value: s.dBlob}})
s.Require().NoError(err)
s.dData = serializedData
}

func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() {
Expand Down Expand Up @@ -351,7 +354,10 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() {
s.SetupTest()
s.mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc wrong"))
writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)

for i := range s.dData.Pks {
writer.WriteSerialized(s.dData.Serialized[i], s.dData.Pks[i], s.dData.Tss[i])
}
writers := map[int64]*SegmentDeltaWriter{100: writer}

result, err := s.task.serializeUpload(ctx, writers)
Expand All @@ -365,7 +371,9 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() {
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)

writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)
for i := range s.dData.Pks {
writer.WriteSerialized(s.dData.Serialized[i], s.dData.Pks[i], s.dData.Tss[i])
}
writers := map[int64]*SegmentDeltaWriter{100: writer}
results, err := s.task.serializeUpload(ctx, writers)
s.Error(err)
Expand All @@ -377,7 +385,9 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() {
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)
for i := range s.dData.Pks {
writer.WriteSerialized(s.dData.Serialized[i], s.dData.Pks[i], s.dData.Tss[i])
}
writers := map[int64]*SegmentDeltaWriter{100: writer}
results, err := s.task.serializeUpload(ctx, writers)
s.NoError(err)
Expand All @@ -394,6 +404,7 @@ func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}})
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100}, bfs1)

bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 101}, bfs2)
Expand All @@ -406,17 +417,29 @@ func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
s.mockMeta.EXPECT().Collection().Return(1)

targetSegIDs := predicted
deltaWriters := s.task.splitDelta(context.TODO(), []*storage.DeleteData{s.dData}, targetSegIDs)
deltaWriters, err := s.task.splitDelta(context.TODO(), []*storage.DeleteData{s.dData}, targetSegIDs)
s.NoError(err)

expectedSegPK := map[int64][]int64{
100: {1, 3},
101: {3},
102: {3},
}

s.NotEmpty(deltaWriters)
s.ElementsMatch(predicted, lo.Keys(deltaWriters))
s.EqualValues(2, deltaWriters[100].GetRowNum())
s.EqualValues(1, deltaWriters[101].GetRowNum())
s.EqualValues(1, deltaWriters[102].GetRowNum())

s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, deltaWriters[100].deleteData.Pks)
s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[101].deleteData.Pks[0])
s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[102].deleteData.Pks[0])
for segID, writer := range deltaWriters {
gotBytes, _, err := writer.Finish()
s.NoError(err)

_, _, gotData, err := storage.NewDeleteCodec().Deserialize([]*storage.Blob{{Value: gotBytes}})
s.NoError(err)
s.ElementsMatch(expectedSegPK[segID], lo.Map(gotData.Pks, func(pk storage.PrimaryKey, _ int) int64 { return pk.(*storage.Int64PrimaryKey).Value }))
}
}

func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
Expand Down
38 changes: 20 additions & 18 deletions internal/datanode/segment_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,27 @@ import (

func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter {
return &SegmentDeltaWriter{
deleteData: &storage.DeleteData{},
segmentID: segmentID,
partitionID: partitionID,
collectionID: collectionID,
tsFrom: math.MaxUint64,
tsTo: 0,

writer: storage.NewDeleteSerializedWriter(collectionID, partitionID, segmentID),
}
}

type SegmentDeltaWriter struct {
deleteData *storage.DeleteData
segmentID int64
partitionID int64
collectionID int64

tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
writer *storage.DeleteSerializedWriter

rowCount int
memSize int64
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
}

func (w *SegmentDeltaWriter) GetCollectionID() int64 {
Expand All @@ -42,7 +46,11 @@ func (w *SegmentDeltaWriter) GetSegmentID() int64 {
}

func (w *SegmentDeltaWriter) GetRowNum() int64 {
return w.deleteData.RowCount
return int64(w.rowCount)
}

// GetMemorySize returns the accumulated in-memory size, in bytes, of the
// delete rows written so far. It is incremented by WriteSerialized as
// pk.Size()+8 per row (primary key size plus an 8-byte timestamp).
func (w *SegmentDeltaWriter) GetMemorySize() int64 {
return w.memSize
}

func (w *SegmentDeltaWriter) GetTimeRange() *writebuffer.TimeRange {
Expand All @@ -58,24 +66,18 @@ func (w *SegmentDeltaWriter) updateRange(ts typeutil.Timestamp) {
}
}

func (w *SegmentDeltaWriter) Write(pk storage.PrimaryKey, ts typeutil.Timestamp) {
w.deleteData.Append(pk, ts)
func (w *SegmentDeltaWriter) WriteSerialized(serializedRow string, pk storage.PrimaryKey, ts typeutil.Timestamp) error {
w.updateRange(ts)
w.memSize += pk.Size() + int64(8)
w.rowCount += 1
return w.writer.Write(serializedRow)
}

func (w *SegmentDeltaWriter) WriteBatch(pks []storage.PrimaryKey, tss []typeutil.Timestamp) {
w.deleteData.AppendBatch(pks, tss)

for _, ts := range tss {
w.updateRange(ts)
}
}

func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, error) {
blob, err := storage.NewDeleteCodec().Serialize(w.collectionID, w.partitionID, w.segmentID, w.deleteData)
// Finish returns serialized bytes and timestamp range of delete data
func (w *SegmentDeltaWriter) Finish() ([]byte, *writebuffer.TimeRange, error) {
blob, err := w.writer.Finish(w.tsFrom, w.tsTo)
if err != nil {
return nil, nil, err
}

return blob, w.GetTimeRange(), nil
}
Loading

0 comments on commit 8b0cfdb

Please sign in to comment.