enhance: support null in bulk insert of binlog to help backup null (#36526)

#36341

Signed-off-by: lixinguo <[email protected]>
Co-authored-by: lixinguo <[email protected]>
smellthemoon and lixinguo authored Sep 26, 2024
1 parent c94b69c commit b60164b
Showing 5 changed files with 127 additions and 56 deletions.
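
Note (not part of the commit): before this change, the binlog import path dropped the valid-data information and appended every row as non-null (AppendRows(rows, nil)), so nulls produced by a backup could not be restored. The self-contained Go sketch below models the core idea with hypothetical local types, not the Milvus storage API: a field buffer keeps a validity slice alongside its values, and both travel together on append.

// Self-contained model (hypothetical types, not the Milvus storage API) of
// keeping a validity slice alongside field values so nulls survive an import.
package main

import "fmt"

// int64Field models a nullable scalar column: Data holds the values and
// ValidData marks which rows carry real values (false == null).
type int64Field struct {
    Data      []int64
    ValidData []bool
}

// appendRows appends a batch of values together with its validity bits,
// mirroring the fieldData.AppendRows(rows, validDataRows[i]) call in the diff.
func (f *int64Field) appendRows(rows []int64, validData []bool) {
    f.Data = append(f.Data, rows...)
    f.ValidData = append(f.ValidData, validData...)
}

func main() {
    var col int64Field
    // One "event" restored from a backup binlog: the middle row is null.
    col.appendRows([]int64{10, 0, 30}, []bool{true, false, true})

    for i, v := range col.Data {
        if !col.ValidData[i] {
            fmt.Printf("row %d: NULL\n", i)
            continue
        }
        fmt.Printf("row %d: %d\n", i, v)
    }
}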
40 changes: 35 additions & 5 deletions internal/storage/insert_data.go
@@ -1152,38 +1152,68 @@ func (data *JSONFieldData) AppendValidDataRows(rows interface{}) error {
 // AppendValidDataRows appends FLATTEN vectors to field data.
 func (data *BinaryVectorFieldData) AppendValidDataRows(rows interface{}) error {
 	if rows != nil {
-		return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		v, ok := rows.([]bool)
+		if !ok {
+			return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type")
+		}
+		if len(v) != 0 {
+			return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		}
 	}
 	return nil
 }
 
 // AppendValidDataRows appends FLATTEN vectors to field data.
 func (data *FloatVectorFieldData) AppendValidDataRows(rows interface{}) error {
 	if rows != nil {
-		return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		v, ok := rows.([]bool)
+		if !ok {
+			return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type")
+		}
+		if len(v) != 0 {
+			return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		}
 	}
 	return nil
 }
 
 // AppendValidDataRows appends FLATTEN vectors to field data.
 func (data *Float16VectorFieldData) AppendValidDataRows(rows interface{}) error {
 	if rows != nil {
-		return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		v, ok := rows.([]bool)
+		if !ok {
+			return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type")
+		}
+		if len(v) != 0 {
+			return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		}
 	}
 	return nil
 }
 
 // AppendValidDataRows appends FLATTEN vectors to field data.
 func (data *BFloat16VectorFieldData) AppendValidDataRows(rows interface{}) error {
 	if rows != nil {
-		return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		v, ok := rows.([]bool)
+		if !ok {
+			return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type")
+		}
+		if len(v) != 0 {
+			return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		}
 	}
 	return nil
 }
 
 func (data *SparseFloatVectorFieldData) AppendValidDataRows(rows interface{}) error {
 	if rows != nil {
-		return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		v, ok := rows.([]bool)
+		if !ok {
+			return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type")
+		}
+		if len(v) != 0 {
+			return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
+		}
 	}
 	return nil
 }
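
Note (not part of the commit): for vector fields, nullability remains unsupported. The rewritten AppendValidDataRows methods above now tolerate a nil or empty []bool, which is what the import path may hand them, and only reject a non-empty validity slice or a value of the wrong type. A stand-alone sketch of that guard, with plain errors instead of the real merr wrappers:

// Stand-alone sketch of the guard added to the vector AppendValidDataRows
// methods; the error values are simplified and do not use merr.
package main

import (
    "errors"
    "fmt"
)

var errNullableVector = errors.New("not support Nullable in vector")

// appendValidDataRows mirrors the new check: nil is fine, an empty []bool is
// fine, and a wrong type or a non-empty validity slice is rejected.
func appendValidDataRows(rows interface{}) error {
    if rows != nil {
        v, ok := rows.([]bool)
        if !ok {
            return fmt.Errorf("wrong rows type: want []bool, got %T", rows)
        }
        if len(v) != 0 {
            return errNullableVector
        }
    }
    return nil
}

func main() {
    fmt.Println(appendValidDataRows(nil))            // <nil>
    fmt.Println(appendValidDataRows([]bool{}))       // <nil>
    fmt.Println(appendValidDataRows([]bool{true}))   // not support Nullable in vector
    fmt.Println(appendValidDataRows("not a bitmap")) // wrong rows type: want []bool, got string
}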
7 changes: 3 additions & 4 deletions internal/util/importutilv2/binlog/field_reader.go
@@ -44,13 +44,12 @@ func (r *fieldReader) Next() (storage.FieldData, error) {
 	if err != nil {
 		return nil, err
 	}
-	rowsSet, err := readData(r.reader, storage.InsertEventType)
+	rowsSet, validDataRows, err := readData(r.reader, storage.InsertEventType)
 	if err != nil {
 		return nil, err
 	}
-	// need append nulls
-	for _, rows := range rowsSet {
-		err = fieldData.AppendRows(rows, nil)
+	for i, rows := range rowsSet {
+		err = fieldData.AppendRows(rows, validDataRows[i])
 		if err != nil {
 			return nil, err
 		}
3 changes: 2 additions & 1 deletion internal/util/importutilv2/binlog/reader.go
@@ -116,7 +116,8 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage
 	if err != nil {
 		return nil, err
 	}
-	rowsSet, err := readData(reader, storage.DeleteEventType)
+	// no need to read nulls in DeleteEventType
+	rowsSet, _, err := readData(reader, storage.DeleteEventType)
 	if err != nil {
 		return nil, err
 	}
119 changes: 79 additions & 40 deletions internal/util/importutilv2/binlog/reader_test.go
@@ -70,7 +70,7 @@ func (suite *ReaderSuite) SetupTest() {
 
 func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.FieldData) []byte {
 	dataType := field.GetDataType()
-	w := storage.NewInsertBinlogWriter(dataType, 1, 1, 1, field.GetFieldID(), false)
+	w := storage.NewInsertBinlogWriter(dataType, 1, 1, 1, field.GetFieldID(), field.GetNullable())
 	assert.NotNil(t, w)
 	defer w.Close()
 
@@ -81,7 +81,7 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie
 		dim = 1
 	}
 
-	evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim)))
+	evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim)), storage.WithNullable(field.GetNullable()))
 	assert.NoError(t, err)
 
 	evt.SetEventTimestamp(1, math.MaxInt64)
@@ -94,42 +94,57 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie
 
 	switch dataType {
 	case schemapb.DataType_Bool:
-		err = evt.AddBoolToPayload(data.(*storage.BoolFieldData).Data, nil)
+		err = evt.AddBoolToPayload(data.(*storage.BoolFieldData).Data, data.(*storage.BoolFieldData).ValidData)
 		assert.NoError(t, err)
 	case schemapb.DataType_Int8:
-		err = evt.AddInt8ToPayload(data.(*storage.Int8FieldData).Data, nil)
+		err = evt.AddInt8ToPayload(data.(*storage.Int8FieldData).Data, data.(*storage.Int8FieldData).ValidData)
 		assert.NoError(t, err)
 	case schemapb.DataType_Int16:
-		err = evt.AddInt16ToPayload(data.(*storage.Int16FieldData).Data, nil)
+		err = evt.AddInt16ToPayload(data.(*storage.Int16FieldData).Data, data.(*storage.Int16FieldData).ValidData)
 		assert.NoError(t, err)
 	case schemapb.DataType_Int32:
-		err = evt.AddInt32ToPayload(data.(*storage.Int32FieldData).Data, nil)
+		err = evt.AddInt32ToPayload(data.(*storage.Int32FieldData).Data, data.(*storage.Int32FieldData).ValidData)
 		assert.NoError(t, err)
 	case schemapb.DataType_Int64:
-		err = evt.AddInt64ToPayload(data.(*storage.Int64FieldData).Data, nil)
+		err = evt.AddInt64ToPayload(data.(*storage.Int64FieldData).Data, data.(*storage.Int64FieldData).ValidData)
 		assert.NoError(t, err)
 	case schemapb.DataType_Float:
-		err = evt.AddFloatToPayload(data.(*storage.FloatFieldData).Data, nil)
+		err = evt.AddFloatToPayload(data.(*storage.FloatFieldData).Data, data.(*storage.FloatFieldData).ValidData)
 		assert.NoError(t, err)
 	case schemapb.DataType_Double:
-		err = evt.AddDoubleToPayload(data.(*storage.DoubleFieldData).Data, nil)
+		err = evt.AddDoubleToPayload(data.(*storage.DoubleFieldData).Data, data.(*storage.DoubleFieldData).ValidData)
 		assert.NoError(t, err)
 	case schemapb.DataType_VarChar:
 		values := data.(*storage.StringFieldData).Data
-		for _, val := range values {
-			err = evt.AddOneStringToPayload(val, true)
+		validValues := data.(*storage.StringFieldData).ValidData
+		for i, val := range values {
+			valid := true
+			if len(validValues) > 0 {
+				valid = validValues[i]
+			}
+			err = evt.AddOneStringToPayload(val, valid)
 			assert.NoError(t, err)
 		}
 	case schemapb.DataType_JSON:
 		rows := data.(*storage.JSONFieldData).Data
+		validValues := data.(*storage.JSONFieldData).ValidData
 		for i := 0; i < len(rows); i++ {
-			err = evt.AddOneJSONToPayload(rows[i], true)
+			valid := true
+			if len(validValues) > 0 {
+				valid = validValues[i]
+			}
+			err = evt.AddOneJSONToPayload(rows[i], valid)
 			assert.NoError(t, err)
 		}
 	case schemapb.DataType_Array:
 		rows := data.(*storage.ArrayFieldData).Data
+		validValues := data.(*storage.ArrayFieldData).ValidData
 		for i := 0; i < len(rows); i++ {
-			err = evt.AddOneArrayToPayload(rows[i], true)
+			valid := true
+			if len(validValues) > 0 {
+				valid = validValues[i]
+			}
+			err = evt.AddOneArrayToPayload(rows[i], valid)
 			assert.NoError(t, err)
 		}
 	case schemapb.DataType_BinaryVector:
@@ -176,7 +191,7 @@ func createDeltaBuf(t *testing.T, deletePKs []storage.PrimaryKey, deleteTss []in
 	return blob.Value
 }
 
-func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) {
+func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
 	const (
 		insertPrefix = "mock-insert-binlog-prefix"
 		deltaPrefix  = "mock-delta-binlog-prefix"
@@ -228,6 +243,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 				Name:        dataType.String(),
 				DataType:    dataType,
 				ElementType: elemType,
+				Nullable:    nullable,
 			},
 		},
 	}
@@ -308,7 +324,11 @@ OUTER:
 			expect := fieldData.GetRow(i)
 			actual := data.GetRow(i)
 			if fieldDataType == schemapb.DataType_Array {
-				suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData()))
+				if expect == nil {
+					suite.Nil(expect)
+				} else {
+					suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData()))
+				}
 			} else {
 				suite.Equal(expect, actual)
 			}
@@ -317,24 +337,43 @@ OUTER:
 }
 
 func (suite *ReaderSuite) TestReadScalarFields() {
-	suite.run(schemapb.DataType_Bool, schemapb.DataType_None)
-	suite.run(schemapb.DataType_Int8, schemapb.DataType_None)
-	suite.run(schemapb.DataType_Int16, schemapb.DataType_None)
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
-	suite.run(schemapb.DataType_Int64, schemapb.DataType_None)
-	suite.run(schemapb.DataType_Float, schemapb.DataType_None)
-	suite.run(schemapb.DataType_Double, schemapb.DataType_None)
-	suite.run(schemapb.DataType_VarChar, schemapb.DataType_None)
-	suite.run(schemapb.DataType_JSON, schemapb.DataType_None)
-
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Bool)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int8)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int16)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int32)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int64)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Float)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Double)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_String)
+	suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Float, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Double, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false)
+
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_String, false)
+
+	suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_Float, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_Double, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true)
+	suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true)
+
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true)
+	suite.run(schemapb.DataType_Array, schemapb.DataType_String, true)
 }
 
 func (suite *ReaderSuite) TestWithTSRangeAndDelete() {
Expand All @@ -350,7 +389,7 @@ func (suite *ReaderSuite) TestWithTSRangeAndDelete() {
suite.deleteTss = []int64{
8, 8, 1, 8,
}
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
}

func (suite *ReaderSuite) TestStringPK() {
@@ -367,20 +406,20 @@ func (suite *ReaderSuite) TestStringPK() {
 	suite.deleteTss = []int64{
 		8, 8, 1, 8,
 	}
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
 }
 
 func (suite *ReaderSuite) TestVector() {
 	suite.vecDataType = schemapb.DataType_BinaryVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
 	suite.vecDataType = schemapb.DataType_FloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
 	suite.vecDataType = schemapb.DataType_Float16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
 	suite.vecDataType = schemapb.DataType_BFloat16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
 	suite.vecDataType = schemapb.DataType_SparseFloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
 }
 
 func TestUtil(t *testing.T) {
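
Note (not part of the commit): the per-row payload writes for VarChar, JSON, and Array in the test above default to valid when the ValidData slice is empty (the non-nullable case) and otherwise use the per-row flag. A minimal stand-alone sketch of that defaulting pattern:

// Stand-alone sketch of the "valid defaults to true when no validity slice is
// present" pattern used by the per-row payload writers in the updated test.
package main

import "fmt"

// rowValid returns the validity flag for row i, treating an empty validValues
// slice (a non-nullable field) as "every row is valid".
func rowValid(validValues []bool, i int) bool {
    if len(validValues) > 0 {
        return validValues[i]
    }
    return true
}

func main() {
    fmt.Println(rowValid(nil, 2))                       // true: field is not nullable
    fmt.Println(rowValid([]bool{true, false, true}, 1)) // false: row 1 is null
}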
14 changes: 8 additions & 6 deletions internal/util/importutilv2/binlog/util.go
@@ -29,27 +29,29 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/merr"
 )
 
-func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, error) {
+func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, [][]bool, error) {
 	rowsSet := make([]any, 0)
+	validDataRowsSet := make([][]bool, 0)
 	for {
 		event, err := reader.NextEventReader()
 		if err != nil {
-			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err))
+			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err))
 		}
 		if event == nil {
 			break // end of the file
 		}
 		if event.TypeCode != et {
-			return nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s",
+			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s",
 				et.String(), event.TypeCode.String()))
 		}
-		rows, _, _, err := event.PayloadReaderInterface.GetDataFromPayload()
+		rows, validDataRows, _, err := event.PayloadReaderInterface.GetDataFromPayload()
 		if err != nil {
-			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err))
+			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err))
 		}
 		rowsSet = append(rowsSet, rows)
+		validDataRowsSet = append(validDataRowsSet, validDataRows)
 	}
-	return rowsSet, nil
+	return rowsSet, validDataRowsSet, nil
 }
 
 func newBinlogReader(ctx context.Context, cm storage.ChunkManager, path string) (*storage.BinlogReader, error) {
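
Note (not part of the commit): readData now returns two index-aligned slices, one rows entry and one validity entry per binlog event. The insert path in field_reader.go forwards both to AppendRows, while readDelete discards the validity with a blank identifier since deletes carry no nullable payload. A hedged sketch of the call shape, using a local stub rather than the real readData:

// Illustrative call shape only: readDataStub stands in for the updated
// readData, which returns rows and validity per binlog event.
package main

import "fmt"

func readDataStub() (rowsSet []any, validDataRowsSet [][]bool, err error) {
    rowsSet = []any{[]int64{1, 2, 3}}
    validDataRowsSet = [][]bool{{true, false, true}}
    return rowsSet, validDataRowsSet, nil
}

func main() {
    // Insert path: keep both return values and zip them by index.
    rowsSet, validDataRows, err := readDataStub()
    if err != nil {
        panic(err)
    }
    for i, rows := range rowsSet {
        fmt.Println(rows, validDataRows[i])
    }

    // Delete path: validity is irrelevant, so it is dropped with a blank identifier.
    pks, _, err := readDataStub()
    if err != nil {
        panic(err)
    }
    fmt.Println(len(pks))
}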
