Skip to content

Commit

Permalink
fix: Correct Size calculation of DeleteData (milvus-io#30397)
Browse files Browse the repository at this point in the history
This PR would correct the actual deltalog size

See also: milvus-io#30191

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Feb 2, 2024
1 parent 1c1dd48 commit d744962
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 56 deletions.
6 changes: 3 additions & 3 deletions internal/datanode/writebuffer/bf_write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ func (s *BFWriteBufferSuite) TestBufferData() {

value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5524)
s.MetricsEqual(value, 5604)

delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
s.MetricsEqual(value, 5684)
s.MetricsEqual(value, 5844)
})

s.Run("normal_run_varchar", func() {
Expand All @@ -259,7 +259,7 @@ func (s *BFWriteBufferSuite) TestBufferData() {

value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5884)
s.MetricsEqual(value, 7224)
})

s.Run("int_pk_type_not_match", func() {
Expand Down
13 changes: 2 additions & 11 deletions internal/datanode/writebuffer/delta_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"math"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -54,22 +53,14 @@ func (db *DeltaBuffer) Yield() *storage.DeleteData {
}

func (db *DeltaBuffer) Buffer(pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) (bufSize int64) {
beforeSize := db.buffer.Size()
rowCount := len(pks)

for i := 0; i < rowCount; i++ {
db.buffer.Append(pks[i], tss[i])

switch pks[i].Type() {
case schemapb.DataType_Int64:
bufSize += 8
case schemapb.DataType_VarChar:
varCharPk := pks[i].(*storage.VarCharPrimaryKey)
bufSize += int64(len(varCharPk.Value))
}
// accumulate buf size for timestamp, which is 8 bytes
bufSize += 8
}

bufSize = db.buffer.Size() - beforeSize
db.UpdateStatistics(int64(rowCount), bufSize, db.getTimestampRange(tss), startPos, endPos)

return bufSize
Expand Down
11 changes: 9 additions & 2 deletions internal/datanode/writebuffer/delta_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

Expand All @@ -25,7 +26,8 @@ func (s *DeltaBufferSuite) TestBuffer() {
pks := lo.Map(tss, func(ts uint64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(int64(ts)) })

memSize := deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.EqualValues(100*8*2, memSize)
// 24 = 16(pk) + 8(ts)
s.EqualValues(100*24, memSize)
})

s.Run("string_pk", func() {
Expand All @@ -37,7 +39,8 @@ func (s *DeltaBufferSuite) TestBuffer() {
})

memSize := deltaBuffer.Buffer(pks, tss, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.EqualValues(100*8+100*3, memSize)
// 40 = (3*8+8)(string pk) + 8(ts)
s.EqualValues(100*40, memSize)
})
}

Expand All @@ -61,6 +64,10 @@ func (s *DeltaBufferSuite) TestYield() {
s.ElementsMatch(pks, result.Pks)
}

func (s *DeltaBufferSuite) SetupSuite() {
paramtable.Init()
}

func TestDeltaBuffer(t *testing.T) {
suite.Run(t, new(DeltaBufferSuite))
}
4 changes: 2 additions & 2 deletions internal/datanode/writebuffer/l0_write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ func (s *L0WriteBufferSuite) TestBufferData() {

value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5524)
s.MetricsEqual(value, 5604)

delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
s.MetricsEqual(value, 5684)
s.MetricsEqual(value, 5844)
})

s.Run("pk_type_not_match", func() {
Expand Down
19 changes: 10 additions & 9 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"strings"

"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/common"
Expand Down Expand Up @@ -878,13 +880,15 @@ type DeleteData struct {
Pks []PrimaryKey // primary keys
Tss []Timestamp // timestamps
RowCount int64
memSize int64
}

func NewDeleteData(pks []PrimaryKey, tss []Timestamp) *DeleteData {
return &DeleteData{
Pks: pks,
Tss: tss,
RowCount: int64(len(pks)),
memSize: lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8),
}
}

Expand All @@ -893,32 +897,31 @@ func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp) {
data.Pks = append(data.Pks, pk)
data.Tss = append(data.Tss, ts)
data.RowCount++
data.memSize += pk.Size() + int64(8)
}

// Append append 1 pk&ts pair to DeleteData
func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp) {
data.Pks = append(data.Pks, pks...)
data.Tss = append(data.Tss, tss...)
data.RowCount += int64(len(pks))
data.memSize += lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
}

func (data *DeleteData) Merge(other *DeleteData) {
data.Pks = append(other.Pks, other.Pks...)
data.Tss = append(other.Tss, other.Tss...)
data.RowCount += other.RowCount
data.memSize += other.Size()

other.Pks = nil
other.Tss = nil
other.RowCount = 0
other.memSize = 0
}

func (data *DeleteData) Size() int64 {
var size int64
for _, pk := range data.Pks {
size += pk.Size()
}

return size
return data.memSize
}

// DeleteCodec serializes and deserializes the delete data
Expand Down Expand Up @@ -1048,13 +1051,11 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
}
}

result.Pks = append(result.Pks, deleteLog.Pk)
result.Tss = append(result.Tss, deleteLog.Ts)
result.Append(deleteLog.Pk, deleteLog.Ts)
}
eventReader.Close()
binlogReader.Close()
}
result.RowCount = int64(len(result.Pks))

return pid, sid, result, nil
}
Expand Down
72 changes: 43 additions & 29 deletions internal/storage/data_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -458,11 +459,7 @@ func TestDeleteCodec(t *testing.T) {
pk1 := &Int64PrimaryKey{
Value: 1,
}
deleteData := &DeleteData{
Pks: []PrimaryKey{pk1},
Tss: []uint64{43757345},
RowCount: int64(1),
}
deleteData := NewDeleteData([]PrimaryKey{pk1}, []uint64{43757345})

pk2 := &Int64PrimaryKey{
Value: 2,
Expand All @@ -481,11 +478,7 @@ func TestDeleteCodec(t *testing.T) {
t.Run("string pk", func(t *testing.T) {
deleteCodec := NewDeleteCodec()
pk1 := NewVarCharPrimaryKey("test1")
deleteData := &DeleteData{
Pks: []PrimaryKey{pk1},
Tss: []uint64{43757345},
RowCount: int64(1),
}
deleteData := NewDeleteData([]PrimaryKey{pk1}, []uint64{43757345})

pk2 := NewVarCharPrimaryKey("test2")
deleteData.Append(pk2, 23578294723)
Expand All @@ -498,25 +491,6 @@ func TestDeleteCodec(t *testing.T) {
assert.Equal(t, sid, int64(1))
assert.Equal(t, data, deleteData)
})

t.Run("merge", func(t *testing.T) {
first := &DeleteData{
Pks: []PrimaryKey{NewInt64PrimaryKey(1)},
Tss: []uint64{100},
RowCount: 1,
}

second := &DeleteData{
Pks: []PrimaryKey{NewInt64PrimaryKey(2)},
Tss: []uint64{100},
RowCount: 1,
}

first.Merge(second)
assert.Equal(t, len(first.Pks), 2)
assert.Equal(t, len(first.Tss), 2)
assert.Equal(t, first.RowCount, int64(2))
})
}

func TestUpgradeDeleteLog(t *testing.T) {
Expand Down Expand Up @@ -755,3 +729,43 @@ func TestMemorySize(t *testing.T) {
assert.Equal(t, insertDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4)
assert.Equal(t, insertDataEmpty.Data[FloatVectorField].GetMemorySize(), 4)
}

func TestDeleteData(t *testing.T) {
pks, err := GenInt64PrimaryKeys(1, 2, 3)
require.NoError(t, err)

t.Run("merge", func(t *testing.T) {
first := NewDeleteData(pks, []Timestamp{100, 101, 102})
second := NewDeleteData(pks, []Timestamp{100, 101, 102})
require.EqualValues(t, first.RowCount, second.RowCount)
require.EqualValues(t, first.Size(), second.Size())
require.EqualValues(t, 3, first.RowCount)
require.EqualValues(t, 72, first.Size())

first.Merge(second)
assert.Equal(t, len(first.Pks), 6)
assert.Equal(t, len(first.Tss), 6)
assert.EqualValues(t, first.RowCount, 6)
assert.EqualValues(t, first.Size(), 144)

assert.NotNil(t, second)
assert.EqualValues(t, 0, second.RowCount)
assert.EqualValues(t, 0, second.Size())
})

t.Run("append", func(t *testing.T) {
dData := NewDeleteData(nil, nil)
dData.Append(pks[0], 100)

assert.EqualValues(t, dData.RowCount, 1)
assert.EqualValues(t, dData.Size(), 24)
})

t.Run("append batch", func(t *testing.T) {
dData := NewDeleteData(nil, nil)
dData.AppendBatch(pks, []Timestamp{100, 101, 102})

assert.EqualValues(t, dData.RowCount, 3)
assert.EqualValues(t, dData.Size(), 72)
})
}

0 comments on commit d744962

Please sign in to comment.