diff --git a/internal/datanode/importv2/pool_test.go b/internal/datanode/importv2/pool_test.go index 06873c6d31ae5..4449a5031c812 100644 --- a/internal/datanode/importv2/pool_test.go +++ b/internal/datanode/importv2/pool_test.go @@ -20,9 +20,10 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/stretchr/testify/assert" ) func TestResizePools(t *testing.T) { diff --git a/internal/datanode/iterators/deltalog_iterator.go b/internal/datanode/iterators/deltalog_iterator.go index 41b020edc1dec..1f5f8e66196c7 100644 --- a/internal/datanode/iterators/deltalog_iterator.go +++ b/internal/datanode/iterators/deltalog_iterator.go @@ -17,7 +17,7 @@ type DeltalogIterator struct { disposedOnce sync.Once disposed atomic.Bool - data *storage.DeleteData + data *storage.DeltaData blobs []*storage.Blob label *Label pos int @@ -50,8 +50,8 @@ func (d *DeltalogIterator) Next() (*LabeledRowData, error) { } row := &DeltalogRow{ - Pk: d.data.Pks[d.pos], - Timestamp: d.data.Tss[d.pos], + Pk: d.data.DeletePks().Get(d.pos), + Timestamp: d.data.DeleteTimestamps()[d.pos], } d.pos++ @@ -76,7 +76,7 @@ func (d *DeltalogIterator) hasNext() bool { d.data = dData d.blobs = nil } - return int64(d.pos) < d.data.RowCount + return int64(d.pos) < d.data.DeleteRowCount() } func (d *DeltalogIterator) isDisposed() bool { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 2343607269ac9..c461cc16ae64d 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -1509,18 +1509,15 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() { func (s *DelegatorDataSuite) TestLevel0Deletions() { delegator := s.delegator partitionID := int64(10) - partitionDelPks := storage.NewInt64PrimaryKeys(1) - partitionDelPks.AppendRaw(1) - allPartitionDelPks := storage.NewInt64PrimaryKeys(1) - allPartitionDelPks.AppendRaw(2) - partitionDeleteData := &storage.DeltaData{ - DeletePks: partitionDelPks, - DeleteTimestamps: []storage.Timestamp{100}, - } - allPartitionDeleteData := &storage.DeltaData{ - DeletePks: allPartitionDelPks, - DeleteTimestamps: []storage.Timestamp{101}, - } + partitionDeleteData, err := storage.NewDeltaDataWithPkType(1, schemapb.DataType_Int64) + s.Require().NoError(err) + err = partitionDeleteData.Append(storage.NewInt64PrimaryKey(1), 100) + s.Require().NoError(err) + + allPartitionDeleteData, err := storage.NewDeltaDataWithPkType(1, schemapb.DataType_Int64) + s.Require().NoError(err) + err = allPartitionDeleteData.Append(storage.NewInt64PrimaryKey(2), 101) + s.Require().NoError(err) schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true) collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{ @@ -1549,29 +1546,29 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() { l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData) pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.True(pks[0].EQ(partitionDeleteData.DeletePks.Get(0))) + s.True(pks[0].EQ(partitionDeleteData.DeletePks().Get(0))) pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) s.Empty(pks) delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global) pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks.Get(0), allPartitionDeleteData.DeletePks.Get(0)}) + s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)}) bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed) - bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks.Get(0)}) + bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks().Get(0)}) pks, _ = delegator.GetLevel0Deletions(partitionID, bfs) // bf filtered segment s.Equal(len(pks), 1) - s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0))) + s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0))) delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All) pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0))) + s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0))) pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) - s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0))) + s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0))) delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All) pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing)) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 3cf67e4be7606..20578e32f91df 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1019,8 +1019,8 @@ func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fie } func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error { - pks, tss := deltaData.DeletePks, deltaData.DeleteTimestamps - rowNum := deltaData.DelRowCount + pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps() + rowNum := deltaData.DeleteRowCount() if !s.ptrLock.RLockIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index d2f88a200e908..0b28f8042fd12 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -155,10 +155,10 @@ func (s *L0Segment) LoadDeltaData(ctx context.Context, deltaData *storage.DeltaD s.dataGuard.Lock() defer s.dataGuard.Unlock() - for i := 0; i < deltaData.DeletePks.Len(); i++ { - s.pks = append(s.pks, deltaData.DeletePks.Get(i)) + for i := 0; i < int(deltaData.DeleteRowCount()); i++ { + s.pks = append(s.pks, deltaData.DeletePks().Get(i)) } - s.tss = append(s.tss, deltaData.DeleteTimestamps...) + s.tss = append(s.tss, deltaData.DeleteTimestamps()...) return nil } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 647b78f3bdcf5..eb9b513e8adec 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1212,22 +1212,13 @@ func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, return blob.RowNum }) - var deltaData *storage.DeltaData collection := loader.manager.Collection.Get(segment.Collection()) helper, _ := typeutil.CreateSchemaHelper(collection.Schema()) pkField, _ := helper.GetPrimaryKeyField() - switch pkField.DataType { - case schemapb.DataType_Int64: - deltaData = &storage.DeltaData{ - DeletePks: storage.NewInt64PrimaryKeys(int(rowNums)), - DeleteTimestamps: make([]uint64, 0, rowNums), - } - case schemapb.DataType_VarChar: - deltaData = &storage.DeltaData{ - DeletePks: storage.NewVarcharPrimaryKeys(int(rowNums)), - DeleteTimestamps: make([]uint64, 0, rowNums), - } + deltaData, err := storage.NewDeltaDataWithPkType(rowNums, pkField.DataType) + if err != nil { + return err } reader, err := storage.CreateDeltalogReader(blobs) @@ -1244,9 +1235,10 @@ func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, return err } dl := reader.Value() - deltaData.DeletePks.MustAppend(dl.Pk) - deltaData.DeleteTimestamps = append(deltaData.DeleteTimestamps, dl.Ts) - deltaData.DelRowCount++ + err = deltaData.Append(dl.Pk, dl.Ts) + if err != nil { + return err + } } err = segment.LoadDeltaData(ctx, deltaData) @@ -1254,7 +1246,7 @@ func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, return err } - log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.DelRowCount)) + log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.DeleteRowCount())) return nil } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 9a1726575c421..460e042458582 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -23,6 +23,8 @@ import ( "math" "sort" + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/pkg/common" @@ -761,13 +763,17 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni } // Deserialize deserializes the deltalog blobs into DeleteData -func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) { +func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeltaData, err error) { if len(blobs) == 0 { return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty") } + rowNum := lo.SumBy(blobs, func(blob *Blob) int64 { + return blob.RowNum + }) + var pid, sid UniqueID - result := &DeleteData{} + result := NewDeltaData(rowNum) deserializeBlob := func(blob *Blob) error { binlogReader, err := NewBinlogReader(blob.Value) @@ -801,7 +807,10 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID if err != nil { return err } - result.Append(deleteLog.Pk, deleteLog.Ts) + err = result.Append(deleteLog.Pk, deleteLog.Ts) + if err != nil { + return err + } } return nil } diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 37b0cf77dbaca..614167b2bf1d8 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -574,9 +574,11 @@ func TestDeleteCodec(t *testing.T) { pid, sid, data, err := deleteCodec.Deserialize([]*Blob{blob}) assert.NoError(t, err) + intPks, ok := data.DeletePks().(*Int64PrimaryKeys) + require.True(t, ok) assert.Equal(t, pid, int64(1)) assert.Equal(t, sid, int64(1)) - assert.Equal(t, data, deleteData) + assert.Equal(t, []int64{1, 2}, intPks.values) }) t.Run("string pk", func(t *testing.T) { @@ -591,9 +593,11 @@ func TestDeleteCodec(t *testing.T) { pid, sid, data, err := deleteCodec.Deserialize([]*Blob{blob}) assert.NoError(t, err) + strPks, ok := data.DeletePks().(*VarcharPrimaryKeys) + require.True(t, ok) assert.Equal(t, pid, int64(1)) assert.Equal(t, sid, int64(1)) - assert.Equal(t, data, deleteData) + assert.Equal(t, []string{"test1", "test2"}, strPks.values) }) } @@ -633,8 +637,10 @@ func TestUpgradeDeleteLog(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int64(1), parID) assert.Equal(t, int64(1), segID) - assert.ElementsMatch(t, dData.Pks, deleteData.Pks) - assert.ElementsMatch(t, dData.Tss, deleteData.Tss) + intPks, ok := deleteData.DeletePks().(*Int64PrimaryKeys) + require.True(t, ok) + assert.ElementsMatch(t, []int64{1, 2}, intPks.values) + assert.ElementsMatch(t, dData.Tss, deleteData.DeleteTimestamps()) }) t.Run("with split lenth error", func(t *testing.T) { diff --git a/internal/storage/delta_data.go b/internal/storage/delta_data.go index f698210b847d4..8bb7f7cdf797c 100644 --- a/internal/storage/delta_data.go +++ b/internal/storage/delta_data.go @@ -21,11 +21,13 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/samber/lo" "github.com/valyala/fastjson" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/util/merr" ) // parserPool use object pooling to reduce fastjson.Parser allocation. @@ -34,14 +36,84 @@ var parserPool = &fastjson.ParserPool{} // DeltaData stores delta data // currently only delete tuples are stored type DeltaData struct { - PkType schemapb.DataType + pkType schemapb.DataType // delete tuples - DeletePks PrimaryKeys - DeleteTimestamps []Timestamp + deletePks PrimaryKeys + deleteTimestamps []Timestamp // stats - DelRowCount int64 - MemSize int64 + delRowCount int64 + + initCap int64 + typeInitOnce sync.Once +} + +func (dd *DeltaData) initPkType(pkType schemapb.DataType) error { + var err error + dd.typeInitOnce.Do(func() { + switch pkType { + case schemapb.DataType_Int64: + dd.deletePks = NewInt64PrimaryKeys(dd.initCap) + case schemapb.DataType_VarChar: + dd.deletePks = NewVarcharPrimaryKeys(dd.initCap) + default: + err = merr.WrapErrServiceInternal("unsupported pk type", pkType.String()) + } + dd.pkType = pkType + }) + return err +} + +func (dd *DeltaData) PkType() schemapb.DataType { + return dd.pkType +} + +func (dd *DeltaData) DeletePks() PrimaryKeys { + return dd.deletePks +} + +func (dd *DeltaData) DeleteTimestamps() []Timestamp { + return dd.deleteTimestamps +} + +func (dd *DeltaData) Append(pk PrimaryKey, ts Timestamp) error { + dd.initPkType(pk.Type()) + err := dd.deletePks.Append(pk) + if err != nil { + return err + } + dd.deleteTimestamps = append(dd.deleteTimestamps, ts) + dd.delRowCount++ + return nil +} + +func (dd *DeltaData) DeleteRowCount() int64 { + return dd.delRowCount +} + +func (dd *DeltaData) MemSize() int64 { + var result int64 + if dd.deletePks != nil { + result += dd.deletePks.Size() + } + result += int64(len(dd.deleteTimestamps) * 8) + return result +} + +func NewDeltaData(cap int64) *DeltaData { + return &DeltaData{ + deleteTimestamps: make([]Timestamp, 0, cap), + initCap: cap, + } +} + +func NewDeltaDataWithPkType(cap int64, pkType schemapb.DataType) (*DeltaData, error) { + result := NewDeltaData(cap) + err := result.initPkType(pkType) + if err != nil { + return nil, err + } + return result, nil } type DeleteLog struct { diff --git a/internal/storage/delta_data_test.go b/internal/storage/delta_data_test.go index 4ee51de4ee0a9..21570b714efe5 100644 --- a/internal/storage/delta_data_test.go +++ b/internal/storage/delta_data_test.go @@ -20,6 +20,8 @@ import ( "testing" "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" ) type DeleteLogSuite struct { @@ -150,3 +152,135 @@ func (s *DeleteLogSuite) TestUnmarshalJSON() { func TestDeleteLog(t *testing.T) { suite.Run(t, new(DeleteLogSuite)) } + +type DeltaDataSuite struct { + suite.Suite +} + +func (s *DeltaDataSuite) TestCreateWithPkType() { + s.Run("int_pks", func() { + dd, err := NewDeltaDataWithPkType(10, schemapb.DataType_Int64) + s.Require().NoError(err) + + s.Equal(schemapb.DataType_Int64, dd.PkType()) + intPks, ok := dd.DeletePks().(*Int64PrimaryKeys) + s.Require().True(ok) + s.EqualValues(10, cap(intPks.values)) + }) + + s.Run("varchar_pks", func() { + dd, err := NewDeltaDataWithPkType(10, schemapb.DataType_VarChar) + s.Require().NoError(err) + + s.Equal(schemapb.DataType_VarChar, dd.PkType()) + intPks, ok := dd.DeletePks().(*VarcharPrimaryKeys) + s.Require().True(ok) + s.EqualValues(10, cap(intPks.values)) + }) + + s.Run("unsupport_pk_type", func() { + _, err := NewDeltaDataWithPkType(10, schemapb.DataType_Bool) + s.Error(err) + }) +} + +func (s *DeltaDataSuite) TestAppend() { + s.Run("normal_same_type", func() { + s.Run("int64_pk", func() { + dd, err := NewDeltaDataWithPkType(10, schemapb.DataType_Int64) + s.Require().NoError(err) + + err = dd.Append(NewInt64PrimaryKey(100), 100) + s.NoError(err) + + err = dd.Append(NewInt64PrimaryKey(200), 200) + s.NoError(err) + + s.EqualValues(2, dd.DeleteRowCount()) + s.Equal([]Timestamp{100, 200}, dd.DeleteTimestamps()) + intPks, ok := dd.DeletePks().(*Int64PrimaryKeys) + s.Require().True(ok) + s.Equal([]int64{100, 200}, intPks.values) + s.EqualValues(32, dd.MemSize()) + }) + + s.Run("varchar_pk", func() { + dd, err := NewDeltaDataWithPkType(10, schemapb.DataType_VarChar) + s.Require().NoError(err) + + err = dd.Append(NewVarCharPrimaryKey("100"), 100) + s.NoError(err) + + err = dd.Append(NewVarCharPrimaryKey("200"), 200) + s.NoError(err) + + s.EqualValues(2, dd.DeleteRowCount()) + s.Equal([]Timestamp{100, 200}, dd.DeleteTimestamps()) + varcharPks, ok := dd.DeletePks().(*VarcharPrimaryKeys) + s.Require().True(ok) + s.Equal([]string{"100", "200"}, varcharPks.values) + s.EqualValues(54, dd.MemSize()) + }) + }) + + s.Run("deduct_pk_type", func() { + s.Run("int64_pk", func() { + dd := NewDeltaData(10) + + err := dd.Append(NewInt64PrimaryKey(100), 100) + s.NoError(err) + s.Equal(schemapb.DataType_Int64, dd.PkType()) + + err = dd.Append(NewInt64PrimaryKey(200), 200) + s.NoError(err) + s.Equal(schemapb.DataType_Int64, dd.PkType()) + + s.EqualValues(2, dd.DeleteRowCount()) + s.Equal([]Timestamp{100, 200}, dd.DeleteTimestamps()) + intPks, ok := dd.DeletePks().(*Int64PrimaryKeys) + s.Require().True(ok) + s.Equal([]int64{100, 200}, intPks.values) + }) + + s.Run("varchar_pk", func() { + dd := NewDeltaData(10) + + err := dd.Append(NewVarCharPrimaryKey("100"), 100) + s.NoError(err) + s.Equal(schemapb.DataType_VarChar, dd.PkType()) + + err = dd.Append(NewVarCharPrimaryKey("200"), 200) + s.NoError(err) + s.Equal(schemapb.DataType_VarChar, dd.PkType()) + + s.EqualValues(2, dd.DeleteRowCount()) + s.Equal([]Timestamp{100, 200}, dd.DeleteTimestamps()) + varcharPks, ok := dd.DeletePks().(*VarcharPrimaryKeys) + s.Require().True(ok) + s.Equal([]string{"100", "200"}, varcharPks.values) + }) + }) + + // expect errors + s.Run("mixed_pk_type", func() { + s.Run("intpks_append_varchar", func() { + dd, err := NewDeltaDataWithPkType(10, schemapb.DataType_Int64) + s.Require().NoError(err) + + err = dd.Append(NewVarCharPrimaryKey("100"), 100) + s.Error(err) + }) + + s.Run("varcharpks_append_int", func() { + dd, err := NewDeltaDataWithPkType(10, schemapb.DataType_VarChar) + s.Require().NoError(err) + + err = dd.Append(NewInt64PrimaryKey(100), 100) + s.Error(err) + }) + }) +} + +func TestDeltaData(t *testing.T) { + suite.Run(t, new(DeltaDataSuite)) +} diff --git a/internal/storage/primary_keys.go b/internal/storage/primary_keys.go index 4f6be2e3a406c..58666356ed411 100644 --- a/internal/storage/primary_keys.go +++ b/internal/storage/primary_keys.go @@ -38,7 +38,7 @@ type Int64PrimaryKeys struct { values []int64 } -func NewInt64PrimaryKeys(cap int) *Int64PrimaryKeys { +func NewInt64PrimaryKeys(cap int64) *Int64PrimaryKeys { return &Int64PrimaryKeys{values: make([]int64, 0, cap)} } @@ -97,7 +97,7 @@ type VarcharPrimaryKeys struct { size int64 } -func NewVarcharPrimaryKeys(cap int) *VarcharPrimaryKeys { +func NewVarcharPrimaryKeys(cap int64) *VarcharPrimaryKeys { return &VarcharPrimaryKeys{ values: make([]string, 0, cap), }