diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index 98b3eb3d550e8..8f41c12fec852 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -1152,7 +1152,13 @@ func (data *JSONFieldData) AppendValidDataRows(rows interface{}) error { // AppendValidDataRows appends FLATTEN vectors to field data. func (data *BinaryVectorFieldData) AppendValidDataRows(rows interface{}) error { if rows != nil { - return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + if len(v) != 0 { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } } return nil } @@ -1160,7 +1166,13 @@ func (data *BinaryVectorFieldData) AppendValidDataRows(rows interface{}) error { // AppendValidDataRows appends FLATTEN vectors to field data. func (data *FloatVectorFieldData) AppendValidDataRows(rows interface{}) error { if rows != nil { - return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + if len(v) != 0 { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } } return nil } @@ -1168,7 +1180,13 @@ func (data *FloatVectorFieldData) AppendValidDataRows(rows interface{}) error { // AppendValidDataRows appends FLATTEN vectors to field data. 
func (data *Float16VectorFieldData) AppendValidDataRows(rows interface{}) error { if rows != nil { - return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + if len(v) != 0 { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } } return nil } @@ -1176,14 +1194,26 @@ func (data *Float16VectorFieldData) AppendValidDataRows(rows interface{}) error // AppendValidDataRows appends FLATTEN vectors to field data. func (data *BFloat16VectorFieldData) AppendValidDataRows(rows interface{}) error { if rows != nil { - return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + if len(v) != 0 { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } } return nil } func (data *SparseFloatVectorFieldData) AppendValidDataRows(rows interface{}) error { if rows != nil { - return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + v, ok := rows.([]bool) + if !ok { + return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type") + } + if len(v) != 0 { + return merr.WrapErrParameterInvalidMsg("not support Nullable in vector") + } } return nil } diff --git a/internal/util/importutilv2/binlog/field_reader.go b/internal/util/importutilv2/binlog/field_reader.go index 002ed47c0356a..c9bef62ff2ddd 100644 --- a/internal/util/importutilv2/binlog/field_reader.go +++ b/internal/util/importutilv2/binlog/field_reader.go @@ -44,13 +44,12 @@ func (r *fieldReader) Next() (storage.FieldData, error) { if err != nil { return nil, err } - rowsSet, err := readData(r.reader, storage.InsertEventType) + rowsSet, validDataRows, err := readData(r.reader, storage.InsertEventType) if err != nil { return nil, err } - // need append nulls - for _, rows := range rowsSet { - err = 
fieldData.AppendRows(rows, nil) + for i, rows := range rowsSet { + err = fieldData.AppendRows(rows, validDataRows[i]) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index ca49b05cfafca..bedfb369337d5 100644 --- a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -116,7 +116,8 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage if err != nil { return nil, err } - rowsSet, err := readData(reader, storage.DeleteEventType) + // no need to read nulls in DeleteEventType + rowsSet, _, err := readData(reader, storage.DeleteEventType) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go index f734786143927..e0249037fb120 100644 --- a/internal/util/importutilv2/binlog/reader_test.go +++ b/internal/util/importutilv2/binlog/reader_test.go @@ -70,7 +70,7 @@ func (suite *ReaderSuite) SetupTest() { func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.FieldData) []byte { dataType := field.GetDataType() - w := storage.NewInsertBinlogWriter(dataType, 1, 1, 1, field.GetFieldID(), false) + w := storage.NewInsertBinlogWriter(dataType, 1, 1, 1, field.GetFieldID(), field.GetNullable()) assert.NotNil(t, w) defer w.Close() @@ -81,7 +81,7 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie dim = 1 } - evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim))) + evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim)), storage.WithNullable(field.GetNullable())) assert.NoError(t, err) evt.SetEventTimestamp(1, math.MaxInt64) @@ -94,42 +94,57 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie switch dataType { case schemapb.DataType_Bool: - err = evt.AddBoolToPayload(data.(*storage.BoolFieldData).Data, nil) + err = 
evt.AddBoolToPayload(data.(*storage.BoolFieldData).Data, data.(*storage.BoolFieldData).ValidData) assert.NoError(t, err) case schemapb.DataType_Int8: - err = evt.AddInt8ToPayload(data.(*storage.Int8FieldData).Data, nil) + err = evt.AddInt8ToPayload(data.(*storage.Int8FieldData).Data, data.(*storage.Int8FieldData).ValidData) assert.NoError(t, err) case schemapb.DataType_Int16: - err = evt.AddInt16ToPayload(data.(*storage.Int16FieldData).Data, nil) + err = evt.AddInt16ToPayload(data.(*storage.Int16FieldData).Data, data.(*storage.Int16FieldData).ValidData) assert.NoError(t, err) case schemapb.DataType_Int32: - err = evt.AddInt32ToPayload(data.(*storage.Int32FieldData).Data, nil) + err = evt.AddInt32ToPayload(data.(*storage.Int32FieldData).Data, data.(*storage.Int32FieldData).ValidData) assert.NoError(t, err) case schemapb.DataType_Int64: - err = evt.AddInt64ToPayload(data.(*storage.Int64FieldData).Data, nil) + err = evt.AddInt64ToPayload(data.(*storage.Int64FieldData).Data, data.(*storage.Int64FieldData).ValidData) assert.NoError(t, err) case schemapb.DataType_Float: - err = evt.AddFloatToPayload(data.(*storage.FloatFieldData).Data, nil) + err = evt.AddFloatToPayload(data.(*storage.FloatFieldData).Data, data.(*storage.FloatFieldData).ValidData) assert.NoError(t, err) case schemapb.DataType_Double: - err = evt.AddDoubleToPayload(data.(*storage.DoubleFieldData).Data, nil) + err = evt.AddDoubleToPayload(data.(*storage.DoubleFieldData).Data, data.(*storage.DoubleFieldData).ValidData) assert.NoError(t, err) case schemapb.DataType_VarChar: values := data.(*storage.StringFieldData).Data - for _, val := range values { - err = evt.AddOneStringToPayload(val, true) + validValues := data.(*storage.StringFieldData).ValidData + for i, val := range values { + valid := true + if len(validValues) > 0 { + valid = validValues[i] + } + err = evt.AddOneStringToPayload(val, valid) assert.NoError(t, err) } case schemapb.DataType_JSON: rows := data.(*storage.JSONFieldData).Data + validValues 
:= data.(*storage.JSONFieldData).ValidData for i := 0; i < len(rows); i++ { - err = evt.AddOneJSONToPayload(rows[i], true) + valid := true + if len(validValues) > 0 { + valid = validValues[i] + } + err = evt.AddOneJSONToPayload(rows[i], valid) assert.NoError(t, err) } case schemapb.DataType_Array: rows := data.(*storage.ArrayFieldData).Data + validValues := data.(*storage.ArrayFieldData).ValidData for i := 0; i < len(rows); i++ { - err = evt.AddOneArrayToPayload(rows[i], true) + valid := true + if len(validValues) > 0 { + valid = validValues[i] + } + err = evt.AddOneArrayToPayload(rows[i], valid) assert.NoError(t, err) } case schemapb.DataType_BinaryVector: @@ -176,7 +191,7 @@ func createDeltaBuf(t *testing.T, deletePKs []storage.PrimaryKey, deleteTss []in return blob.Value } -func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) { +func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) { const ( insertPrefix = "mock-insert-binlog-prefix" deltaPrefix = "mock-delta-binlog-prefix" @@ -228,6 +243,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data Name: dataType.String(), DataType: dataType, ElementType: elemType, + Nullable: nullable, }, }, } @@ -308,7 +324,11 @@ OUTER: expect := fieldData.GetRow(i) actual := data.GetRow(i) if fieldDataType == schemapb.DataType_Array { - suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData())) + if expect == nil { + suite.Nil(actual) + } else { + suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData())) + } } else { suite.Equal(expect, actual) } @@ -317,24 +337,43 @@ OUTER: } func (suite *ReaderSuite) TestReadScalarFields() { - suite.run(schemapb.DataType_Bool, schemapb.DataType_None) - suite.run(schemapb.DataType_Int8, schemapb.DataType_None) - 
suite.run(schemapb.DataType_Int16, schemapb.DataType_None) - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) - suite.run(schemapb.DataType_Int64, schemapb.DataType_None) - suite.run(schemapb.DataType_Float, schemapb.DataType_None) - suite.run(schemapb.DataType_Double, schemapb.DataType_None) - suite.run(schemapb.DataType_VarChar, schemapb.DataType_None) - suite.run(schemapb.DataType_JSON, schemapb.DataType_None) - - suite.run(schemapb.DataType_Array, schemapb.DataType_Bool) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int8) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int16) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int32) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int64) - suite.run(schemapb.DataType_Array, schemapb.DataType_Float) - suite.run(schemapb.DataType_Array, schemapb.DataType_Double) - suite.run(schemapb.DataType_Array, schemapb.DataType_String) + suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Float, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Double, schemapb.DataType_None, false) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false) + + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false) + suite.run(schemapb.DataType_Array, 
schemapb.DataType_Double, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_String, false) + + suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Float, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Double, schemapb.DataType_None, true) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true) + + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_String, true) } func (suite *ReaderSuite) TestWithTSRangeAndDelete() { @@ -350,7 +389,7 @@ func (suite *ReaderSuite) TestWithTSRangeAndDelete() { suite.deleteTss = []int64{ 8, 8, 1, 8, } - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func (suite *ReaderSuite) TestStringPK() { @@ -367,20 +406,20 @@ func (suite *ReaderSuite) TestStringPK() { suite.deleteTss = []int64{ 8, 8, 1, 8, } - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func (suite *ReaderSuite) TestVector() { suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + 
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_SparseFloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/binlog/util.go b/internal/util/importutilv2/binlog/util.go index 6d10556755d98..645d2a1dcdd6e 100644 --- a/internal/util/importutilv2/binlog/util.go +++ b/internal/util/importutilv2/binlog/util.go @@ -29,27 +29,29 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" ) -func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, error) { +func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, [][]bool, error) { rowsSet := make([]any, 0) + validDataRowsSet := make([][]bool, 0) for { event, err := reader.NextEventReader() if err != nil { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err)) + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err)) } if event == nil { break // end of the file } if event.TypeCode != et { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s", + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s", et.String(), event.TypeCode.String())) } - rows, _, _, err := 
event.PayloadReaderInterface.GetDataFromPayload() + rows, validDataRows, _, err := event.PayloadReaderInterface.GetDataFromPayload() if err != nil { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err)) + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err)) } rowsSet = append(rowsSet, rows) + validDataRowsSet = append(validDataRowsSet, validDataRows) } - return rowsSet, nil + return rowsSet, validDataRowsSet, nil } func newBinlogReader(ctx context.Context, cm storage.ChunkManager, path string) (*storage.BinlogReader, error) {