Skip to content

Commit

Permalink
binlog writer fallback encoding
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jul 9, 2024
1 parent efdaed4 commit c05b548
Show file tree
Hide file tree
Showing 17 changed files with 445 additions and 280 deletions.
10 changes: 5 additions & 5 deletions internal/storage/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func TestInsertBinlog(t *testing.T) {
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)

e1, err := w.NextInsertEventWriter(false)
e1, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
assert.NoError(t, err)
Expand All @@ -49,7 +49,7 @@ func TestInsertBinlog(t *testing.T) {
assert.NoError(t, err)
e1.SetEventTimestamp(100, 200)

e2, err := w.NextInsertEventWriter(false)
e2, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e2.AddDataToPayload([]int64{7, 8, 9}, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func TestNewBinlogReaderError(t *testing.T) {

w.SetEventTimeStamp(1000, 2000)

e1, err := w.NextInsertEventWriter(false)
e1, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -1393,7 +1393,7 @@ func TestNewBinlogWriterTsError(t *testing.T) {

func TestInsertBinlogWriterCloseError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)
e1, err := insertWriter.NextInsertEventWriter(false)
e1, err := insertWriter.NextInsertEventWriter()
assert.NoError(t, err)

sizeTotal := 2000000
Expand All @@ -1406,7 +1406,7 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
err = insertWriter.Finish()
assert.NoError(t, err)
assert.NotNil(t, insertWriter.buffer)
insertEventWriter, err := insertWriter.NextInsertEventWriter(false)
insertEventWriter, err := insertWriter.NextInsertEventWriter()
assert.Nil(t, insertEventWriter)
assert.Error(t, err)
insertWriter.Close()
Expand Down
14 changes: 2 additions & 12 deletions internal/storage/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// BinlogType is to distinguish different files saving different data.
Expand Down Expand Up @@ -150,21 +149,12 @@ type InsertBinlogWriter struct {
}

// NextInsertEventWriter returns an event writer to write insert data to an event.
func (writer *InsertBinlogWriter) NextInsertEventWriter(nullable bool, dim ...int) (*insertEventWriter, error) {
func (writer *InsertBinlogWriter) NextInsertEventWriter(options ...PayloadWriterOptions) (*insertEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}

var event *insertEventWriter
var err error
if typeutil.IsVectorType(writer.PayloadDataType) && !typeutil.IsSparseFloatVectorType(writer.PayloadDataType) {
if len(dim) != 1 {
return nil, fmt.Errorf("incorrect input numbers")
}
event, err = newInsertEventWriter(writer.PayloadDataType, nullable, dim[0])
} else {
event, err = newInsertEventWriter(writer.PayloadDataType, nullable)
}
event, err := newInsertEventWriter(writer.PayloadDataType, options...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestBinlogWriterReader(t *testing.T) {

binlogWriter.SetEventTimeStamp(1000, 2000)
defer binlogWriter.Close()
eventWriter, err := binlogWriter.NextInsertEventWriter(false)
eventWriter, err := binlogWriter.NextInsertEventWriter()
assert.NoError(t, err)
err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}, nil)
assert.NoError(t, err)
Expand Down
33 changes: 10 additions & 23 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,31 +243,18 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
for _, field := range insertCodec.Schema.Schema.Fields {
// encode fields
writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID, field.GetNullable())
var eventWriter *insertEventWriter
var err error
var dim int64
if typeutil.IsVectorType(field.DataType) {
if field.GetNullable() {
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("vectorType not support null, fieldName: %s", field.GetName()))
}
switch field.DataType {
case schemapb.DataType_FloatVector,
schemapb.DataType_BinaryVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector:
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable(), int(dim))
case schemapb.DataType_SparseFloatVector:
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable())
default:
return nil, fmt.Errorf("undefined data type %d", field.DataType)

// get payload writing configs, including nullable and fallback encoding method
options := []PayloadWriterOptions{WithNullable(field.GetNullable()), WithWriterProps(GetWriterPropsByDataType(field.DataType))}

if typeutil.IsVectorType(field.DataType) && !typeutil.IsSparseFloatVectorType(field.DataType) {
dim, err := typeutil.GetDim(field)
if err != nil {
return nil, err
}
} else {
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable())
options = append(options, WithDim(int(dim)))
}
eventWriter, err := writer.NextInsertEventWriter(options...)
if err != nil {
writer.Close()
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/data_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func TestDeleteData(t *testing.T) {

func TestAddFieldDataToPayload(t *testing.T) {
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)
e, _ := w.NextInsertEventWriter(false)
e, _ := w.NextInsertEventWriter()
var err error
err = AddFieldDataToPayload(e, schemapb.DataType_Bool, &BoolFieldData{[]bool{}, nil})
assert.Error(t, err)
Expand Down
22 changes: 11 additions & 11 deletions internal/storage/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestInsertEvent(t *testing.T) {
}

t.Run("insert_bool", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Bool, false)
w, err := newInsertEventWriter(schemapb.DataType_Bool)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Bool, w,
func(w *insertEventWriter) error {
Expand All @@ -211,7 +211,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int8", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int8, false)
w, err := newInsertEventWriter(schemapb.DataType_Int8)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int8, w,
func(w *insertEventWriter) error {
Expand All @@ -227,7 +227,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int16", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int16, false)
w, err := newInsertEventWriter(schemapb.DataType_Int16)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int16, w,
func(w *insertEventWriter) error {
Expand All @@ -243,7 +243,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int32", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int32, false)
w, err := newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int32, w,
func(w *insertEventWriter) error {
Expand All @@ -259,7 +259,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int64", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int64, false)
w, err := newInsertEventWriter(schemapb.DataType_Int64)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int64, w,
func(w *insertEventWriter) error {
Expand All @@ -275,7 +275,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float32", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Float, false)
w, err := newInsertEventWriter(schemapb.DataType_Float)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Float, w,
func(w *insertEventWriter) error {
Expand All @@ -291,7 +291,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float64", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Double, false)
w, err := newInsertEventWriter(schemapb.DataType_Double)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Double, w,
func(w *insertEventWriter) error {
Expand All @@ -307,7 +307,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_binary_vector", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_BinaryVector, false, 16)
w, err := newInsertEventWriter(schemapb.DataType_BinaryVector, WithDim(16))
assert.NoError(t, err)
insertT(t, schemapb.DataType_BinaryVector, w,
func(w *insertEventWriter) error {
Expand All @@ -323,7 +323,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float_vector", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_FloatVector, false, 2)
w, err := newInsertEventWriter(schemapb.DataType_FloatVector, WithDim(2))
assert.NoError(t, err)
insertT(t, schemapb.DataType_FloatVector, w,
func(w *insertEventWriter) error {
Expand All @@ -339,7 +339,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_string", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String, false)
w, err := newInsertEventWriter(schemapb.DataType_String)
assert.NoError(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234", nil)
Expand Down Expand Up @@ -1101,7 +1101,7 @@ func TestEventReaderError(t *testing.T) {
}

func TestEventClose(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String, false)
w, err := newInsertEventWriter(schemapb.DataType_String)
assert.NoError(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234", nil)
Expand Down
27 changes: 8 additions & 19 deletions internal/storage/event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package storage
import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// EventTypeCode represents event type by code
Expand Down Expand Up @@ -222,17 +220,8 @@ func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int
return de
}

func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
var payloadWriter PayloadWriterInterface
var err error
if typeutil.IsVectorType(dataType) && !typeutil.IsSparseFloatVectorType(dataType) {
if len(dim) != 1 {
return nil, fmt.Errorf("incorrect input numbers")
}
payloadWriter, err = NewPayloadWriter(dataType, nullable, dim[0])
} else {
payloadWriter, err = NewPayloadWriter(dataType, nullable)
}
func newInsertEventWriter(dataType schemapb.DataType, options ...PayloadWriterOptions) (*insertEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, options...)
if err != nil {
return nil, err
}
Expand All @@ -254,7 +243,7 @@ func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int)
}

func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -280,7 +269,7 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -306,7 +295,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -332,7 +321,7 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -358,7 +347,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -380,7 +369,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
}

func newIndexFileEventWriter(dataType schemapb.DataType) (*indexFileEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/event_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func TestSizeofStruct(t *testing.T) {
}

func TestEventWriter(t *testing.T) {
insertEvent, err := newInsertEventWriter(schemapb.DataType_Int32, false)
insertEvent, err := newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
insertEvent.Close()

insertEvent, err = newInsertEventWriter(schemapb.DataType_Int32, false)
insertEvent, err = newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
defer insertEvent.Close()

Expand Down
Loading

0 comments on commit c05b548

Please sign in to comment.